IO多路复用
import asyncio 这个是异步IO模块 这个还不知道怎么用 select poll epoll 都是IO多路复用 windows 仅支持select linux2.6以后 支持epoll epoll是相当厉害的 详细的描述参考:http://www.cnblogs.com/alex3714/articles/5876749.html select、poll、epoll区别:http://www.cnblogs.com/alex3714/p/4372426.html 先来写一个用select方式的socket模型
import select #引入select模块,这是利用操作系统的select处理方式,windows、linux、unix都支持
import socket
import sys
import queue server = socket.socket()
server.setblocking(0) server_addr = ('0.0.0.0',9999) print('starting up on %s port %s' % server_addr)
server.bind(server_addr) server.listen(5) inputs = [server, ] #自己也要监测呀,因为server本身也是个fd
outputs = [] message_queues = {} while True:
print("waiting for next event...") readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果没有任何fd就绪,那程序就会一直阻塞在这里 for s in readable: #每个s就是一个socket if s is server: #别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了,
#就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀
#新连接进来了,接受这个连接
conn, client_addr = s.accept()
print("new connection from",client_addr)
conn.setblocking(0)
inputs.append(conn) #为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接
#就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到
#readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的 message_queues[conn] = queue.Queue() #接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送 else: #s不是server的话,那就只能是一个 与客户端建立的连接的fd了
#客户端的数据过来了,在这接收
try:
data = s.recv(1024)
except ConnectionResetError as e:
data=None
print(e)
print("客户端断开了", s)
if s in outputs:
outputs.remove(s) # 清理已断开的连接
inputs.remove(s) # 清理已断开的连接
del message_queues[s] ##清理已断开的连接
if data:
print("收到来自[%s]的数据:" % s.getpeername()[0], data)
message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端
if s not in outputs:
outputs.append(s) #为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端 else:#如果收不到data代表什么呢? 代表客户端断开了呀
print("客户端断开了",s) if s in outputs:
outputs.remove(s) #清理已断开的连接
if s in inputs:
inputs.remove(s) #清理已断开的连接
if s in message_queues.keys():
del message_queues[s] ##清理已断开的连接 for s in writeable:
try :
next_msg = message_queues[s].get_nowait() except queue.Empty:
print("client [%s]" %s.getpeername()[0], "queue is empty..")
outputs.remove(s) else:
print("sending msg to [%s]"%s.getpeername()[0], next_msg)
s.send(next_msg.upper()) for s in exeptional:
print("handling exception for ",s.getpeername())
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close() del message_queues[s]

再来一个升级版本的,selectors模块,它底层是看操作系统的,默认是epoll,但是如果不支持,比如windows,linux kernel < 2.6,就用select模式

import selectors,socket #selectors 超牛的IO多路复用的模块,支持select poll epoll ,优先epoll

sel=selectors.DefaultSelector()

def accpect(sock,mask):  #创建业务连接的方法
conn,addr=sock.accept()
print("accept:",conn,"from:",addr)
conn.setblocking(False)
sel.register(fileobj=conn,events=selectors.EVENT_READ,data=read) def read(conn,mask): #连接以后 执行业务的方法
data=conn.recv(1024)
if data:
print("recv data:",data,"conn:",conn)
# print(conn.getsockname())
conn.send(data)
else:
print("close:",conn)
sel.unregister(fileobj=conn)
conn.close() server=socket.socket()
server.bind(("0.0.0.0",9999))
server.listen(10000)
server.setblocking(False) sel.register(fileobj=server,events=selectors.EVENT_READ,data=accpect) #只需要注册需要并发使用IO的应用,明确对应的回调data内容就行了 while True:
events=sel.select()
print("已经注册sel数量:")
for key,mask in events:
callback=key.data #获取注册时的sel.register()方法参数中的data对应的内容,之前放进去的有accpect和read函数
callback(key.fileobj,mask)

这里注意,有个非常非常尴尬的问题情景:

需求:

1、基于socket

2、要多进程处理业务(多线程由于GIL锁,多核也无法真正同时刻处理,所以用多进程)

3、进程内必须是selectors处理连接

此时,就非常尴尬了,原因是多进程multiprocessing模块,IO多路复用selectors模块,他俩有矛盾:

矛盾就是:

multiprocessing就是认为socket是阻塞的,

selectors要求socket必须是非阻塞的,不然没办法监听活动。

那么socket怎么办呢,这个问题,在windows还好,因为select方式还是慢,显不出来,linux epoll超快的,直接就严重影响业务了

我是这么解决这个矛盾的

需要多次接收或发送 或者 循环发送循环接收的业务情景,selectors监听到后,步骤:

1、从selectors实例中注销掉这个业务连接,

2、设定这个业务连接为阻塞,

3、开始进行上述情景业务,

4、再设定为非阻塞,

5、重新注册回selectors

最新文章

  1. Mac下环境变量配置
  2. LR通过snmp监控linux下的mysql
  3. 【Tyvj1601】魔兽争霸(主席树,树套树)
  4. Tomcat 集群模式下 Session 更新 Bug (redis memcached 及tomcat自已的集群)
  5. 常用&lt;meta&gt;标签
  6. Spark计算工作流
  7. 【mysql】【分组】后取每组的top2
  8. 关于UIText换行
  9. java 读取文件的路径
  10. vue 之 加载 iframe 的处理
  11. Dubbo简介---搭建一个最简单的Demo框架
  12. 使用GDI+进行图片处理时要注意的问题
  13. 超链接 a的小手
  14. 手把手教你写一个java的orm(四)
  15. HOJ 2139 Spiderman's workout(动态规划)
  16. 7503E-M-irf2配置以及bfd配置
  17. 使用CMake,且在GCC编译时指定相对源代码路径选项BUG的问题
  18. 杂项:WiKi
  19. SpringBoot 精简笔记
  20. laravel 多个where的连接使用

热门文章

  1. Luogu P2326 AKN&#39;s PPAP【按位贪心】
  2. Java | 基础归纳 | set
  3. SpringBoot | 读取配置文件信息
  4. django相关命令
  5. _bzoj1096 [ZJOI2007]仓库建设【斜率优化dp】
  6. 【C#】将数据库读出的数据转换为DataTable类型集合
  7. CF782A Andryusha and Socks
  8. 微信小程序 图片加载失败处理方案
  9. STM32&amp;AT指令NBIOT模组
  10. UVA 11419 SAM I AM (最小点覆盖,匈牙利算法)