1.settings.py

INSTALLED_APPS = [
'...',
'channels',
'...',
]
ASGI_APPLICATION = 'server.routing.application'
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [(os.getenv('REDIS_SERVER_HOST', '127.0.0.1'), int(os.getenv('REDIS_SERVER_PORT', '')))],
},
},
}

2.routing.py(settin.py同级)

# -*- coding: utf-8 -*-

from channels.routing import ProtocolTypeRouter, URLRouter

from device.consumers import QueryAuthMiddleware  # websocket中间件
import device.routing # 路由 application = ProtocolTypeRouter({
'websocket': QueryAuthMiddleware(
URLRouter(
device.routing.websocket_urlpatterns
)
)
})
class QueryAuthMiddleware:
"""
WebSocket 认证中间件
"""
def __init__(self, inner):
# Store the ASGI application we were passed
self.inner = inner def __call__(self, scope):
query = parse.parse_qs(scope["query_string"].decode())
token = query.get('token', [None])[0]
device_name = None
try:
login_info = json.loads(base64.b64decode(token).decode())
device_name = login_info['name']
logger.info('Device {} connect from {} port {}'.format(device_name, scope['client'][0], scope['client'][1])) # 验证时间
gen_time = datetime.strptime(login_info['time'], '%Y%m%d%H%M%S')
now_time = datetime.now()
if abs(gen_time - now_time).total_seconds() > 5 * 60:
raise ValueError('Device {} time validate fail: token time: {}, server time: {}'.format(
device_name, gen_time.strftime('%Y-%m-%d %H:%M:%S'), now_time.strftime('%Y-%m-%d %H:%M:%S'))) # 验证密码
device = DeviceInfoModel.objects.get(name=device_name) # 获取到设备
correct = device.check_password(login_info['password']) # 检查密码
if not correct:
raise ValueError('Device {} password validate fail'.format(device_name)) except DeviceInfoModel.DoesNotExist:
logger.info('Device {} can not find in DB'.format(device_name))
device = None except Exception as e:
logger.error('Connect error {}'.format(e))
device = None finally:
# Middleware 中必须手动关闭数据库连接
# http://channels.readthedocs.io/en/latest/topics/authentication.html#custom-authentication
close_old_connections() return self.inner(dict(scope, user=device))

此时,启动方式也应该稍作调整(项目根目录下新建asgi.py文件)

"""
ASGI entrypoint. Configures Django and then runs the application
defined in the ASGI_APPLICATION setting.
""" import os
import django
from channels.routing import get_default_application os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings")
django.setup()
application = get_default_application()

客户端示例:

import threading
import time
import json
import tarfile
import base64
import hashlib
import os
import pickle
from io import BytesIO
from datetime import datetime import websocket def on_message(ws, message):
print('on_message')
msg = json.loads(message)
print(msg)
# print(msg['result'])
# print(pickle.loads(base64.b64decode(msg['content']['2-5b4747af5f627dbcb5eaefeb']))) def on_error(ws, error):
print(error) def on_close(ws):
print("### closed ###") def on_open(ws):
def run(*args):
# for i in range(1):
# tarobj = BytesIO()
# tar = tarfile.open(fileobj=tarobj, mode='w:gz')
# dir_list = filter(lambda name: os.path.isdir(name) and not name.startswith('.'), os.listdir(os.path.abspath(os.path.dirname('__file__'))))
# for dir in dir_list:
# tar.add(dir)
# tar.close()
# tarobj.seek(0)
with open('D:\\software\\websocket\websocket\\face\\18032811042400\\0.jpg', 'rb') as f:
base64_data = base64.b64encode(f.read())
s = base64_data.decode() file_obj = {
'timestamp': datetime.now().timestamp(),
'image': s
} info = {
'command': 'TEST',
'content': file_obj,
# 'content': ['30-5b691d365f627dee73c4b58d-common', '71-5b695edf5f627d762d5d651b'],
}
ws.send(json.dumps(info))
time.sleep(100)
ws.close()
print("thread terminating...")
threading.Thread(target=run).start() if __name__ == "__main__": websocket.enableTrace(True)
login_info = {
'name': '',
'password': hashlib.md5(''.encode('utf8')).hexdigest(),
'time': datetime.now().strftime('%Y%m%d%H%M%S')
} b64_token = base64.b64encode(json.dumps(login_info).encode()).decode()
url = "ws://xx.xx.xx.xx:8000/push/device/?token={}".format(b64_token)
print(login_info)
print('Connect {}'.format(url))
ws = websocket.WebSocketApp(url,
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.on_open = on_open
ws.run_forever()

参考:https://channels.readthedocs.io/en/latest/deploying.html

最新文章

  1. 改变html中鼠标形状
  2. sqlserver查询所有表名、字段名、类型、长度和存储过程、视图的创建语句
  3. Volley源码分析(1)----Volley 队列
  4. sass、git、ruby的安装与使用。
  5. javascript的词法分析
  6. github 多帐户使用
  7. C#类索引器的使用
  8. C数组的相关知识
  9. [置顶] ruby变量详解(收集+整理)
  10. MicrosoftSQLServer中的锁模式
  11. linux命令: mount
  12. JavaWeb(六)之MVC与三层架构设计
  13. Iptables详解七层过滤
  14. 流API--流的映射
  15. lxml etree的一个问题
  16. 使用Vitrualbox虚拟Windows Server 2016系统的一些常见问题
  17. secureCRT免密码登陆Linux
  18. 丰富的else语句以及简介的with语句
  19. poj 2505 A multiplication game
  20. H.264 SVC

热门文章

  1. flutter 页面布局 Paddiing Row Column Expanded 组件
  2. 算法习题---5-5复合词(UVa10391)
  3. 果卿居士-《四种清净明诲》之不淫欲 -------------------------------------------------------------------------------- (转自学佛网:http://www.xuefo.net/nr/article19/186541.html)
  4. netty WEBSOKET 客户端 JAVA
  5. MySQL学习笔记——MySQL启动过程(一)
  6. nodejs应用转换png,jpg,gif为webp图片格式
  7. JMC(Java Mission Control)在mac下无法启动和显示界面
  8. Django:序列化的几种方法
  9. idea安装阿里云插件和sonar插件
  10. 《ucore lab5》实验报告