Python的平凡之路(10)
2024-10-20 03:23:05
异步IO 数据库 队列 缓存
1、Gevent协程
定义:用户态的轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
优点:
- 无需线程上下文切换的开销
- 无需原子操作锁定及同步的开销
- 方便切换控制流,简化编程模型
- 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
greelet协程.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
#Author is wspikh
# -*- coding: encoding -*-
from greenlet import greenlet
def test1():
print('12')
gr2.switch()
print('--第二次切换完毕--')
print('34')
print('12')
gr2.switch()
print('--第二次切换完毕--')
print('34')
gr2.switch()
def test2():
print('--第一次切换完毕--')
print('56')
gr1.switch()
print('--第三次切换完毕--')
print('78')
gr1 = greenlet(test1) #启动一个协程
gr2 = greenlet(test2)
gr1.switch()#
gevent协程.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
#Author is wspikh
# -*- coding: encoding -*-
import gevent
def foo():
print('Running in foo')
gevent.sleep(3)
print('Running in foo')
gevent.sleep(3)
print('Explicit context switch to foo again')
def bar():
print('Explicit context to bar')
gevent.sleep(2)
print('Explicit context to bar')
gevent.sleep(2)
print('Implicit context switch back to bar')
def func():
print("running func on")
gevent.sleep(1)
print("running func on")
gevent.sleep(1)
print("running func again")
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
gevent.spawn(func)
])
gevent.spawn(foo),
gevent.spawn(bar),
gevent.spawn(func)
])
socket-server.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import sys
import socket
import time
import gevent
from gevent import socket, monkey
monkey.patch_all()
#Author is wspikh
# -*- coding: encoding -*-
import sys
import socket
import time
import gevent
from gevent import socket, monkey
monkey.patch_all()
def server(port):
s = socket.socket()
s.bind(('0.0.0.0', port))
s.listen(500)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli)
s = socket.socket()
s.bind(('0.0.0.0', port))
s.listen(500)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli)
def handle_request(conn):
try:
while True:
data = conn.recv(1024)
print("recv:", data)
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR)
except Exception as ex:
print(ex)
finally:
try:
while True:
data = conn.recv(1024)
print("recv:", data)
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR)
except Exception as ex:
print(ex)
finally:
conn.close()
if __name__ == '__main__':
server(8001)
socket-client.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
#_*_coding:utf-8_*_
import socket
HOST = 'localhost' # The remote host
PORT = 9999 # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
msg = bytes(input(">>:"), encoding="utf8")
s.sendall(msg)
data = s.recv(1024)
#print(data)
print('Received', repr(data))
s.close()
#Author is wspikh
# -*- coding: encoding -*-
#_*_coding:utf-8_*_
import socket
HOST = 'localhost' # The remote host
PORT = 9999 # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
msg = bytes(input(">>:"), encoding="utf8")
s.sendall(msg)
data = s.recv(1024)
#print(data)
print('Received', repr(data))
s.close()
2、Select\Poll\Epoll异步IO与事件驱动(详看各种模块)
(1)写程序代码的时候,每收到一个请求,放入一个事件列表,让主进程通过阻塞I/O方式处理请求是大多数网络服务器采用的方式。
(2)事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。
(3)阻塞IO,其特点是在IO执行的两个阶段都被Block了。
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
(4)非阻塞IO,其特点是用户进程需要不断的主动询问kernel数据好了没有
当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。
(5)IO多路复用
IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。
当用户进程调用了select,那么整个进程会被block
,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。
在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block
(6) 异步IO
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
(7)调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。
(8)synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。
(9)各个IO Model
通过上面的图片,可以发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据
(10)select例子
select-socket-client1.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import socket
import sys
messages = ['This is the message',
'It will be sent',
'in parts.',
]
server_address = ('localhost',10000)
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET,socket.SOCK_STREAM),
socket.socket(socket.AF_INET,socket.SOCK_STREAM),
]
# Connect the socket to the port where ther server is listning
print(sys.stderr,'connecting to %s port %s' % server_address)
for s in socks:
s.connect(server_address)
for message in messages:
# Send messages on both sockets
for s in socks:
print(sys.stderr, '%s: sending "%s"' %(s.getsockname(),message))
s.send(message.encode('utf-8'))
# Read responses on both sockets
for s in socks:
data = s.recv(1024)
print(sys.stderr,'%s: received "%s"' %(s.getsockname(),data))
if not data:
print(sys.stderr,'closing socket',s.getsockname())
s.close()
#Author is wspikh
# -*- coding: encoding -*-
import socket
import sys
messages = ['This is the message',
'It will be sent',
'in parts.',
]
server_address = ('localhost',10000)
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET,socket.SOCK_STREAM),
socket.socket(socket.AF_INET,socket.SOCK_STREAM),
]
# Connect the socket to the port where ther server is listning
print(sys.stderr,'connecting to %s port %s' % server_address)
for s in socks:
s.connect(server_address)
for message in messages:
# Send messages on both sockets
for s in socks:
print(sys.stderr, '%s: sending "%s"' %(s.getsockname(),message))
s.send(message.encode('utf-8'))
# Read responses on both sockets
for s in socks:
data = s.recv(1024)
print(sys.stderr,'%s: received "%s"' %(s.getsockname(),data))
if not data:
print(sys.stderr,'closing socket',s.getsockname())
s.close()
select-socket-server1.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import select
import socket
import sys
import queue
# Create a TCP/IP socket
server = socket.socket()
server.setblocking(False) #非阻塞模式
# Bind the socket to the port
server_address = ('localhost',10000)
print(sys.stderr,'starting up on %s port %s' % server_address)
server.bind(server_address)
# Listen for incoming connection
server.listen(200)
# Sockets from which we expect to read
inputs = [server,]
# Sockets to which we expect to write
outputs = [ ]
message_queues = {}
while True:
# Wait for at least on of the sockets to be ready for processing
print('\nwaiting for the next event')
readable, writeable, exceptional = select.select(inputs,outputs,inputs)
print(readable, writeable, exceptional)
# Handle inputs
for s in readable:
if s is server:
# A"readable" server socket is ready to accept a connection
conn,addr = s.accept()
print('new connection from',conn,addr)
#conn.setblocking(False)
inputs.append(conn)
# Give the connection a queue for data we want to send
message_queues[conn] = queue.Queue()
else:
data = s.recv(1024)
if data:
# A readable client socket has data
print(sys.stderr,'received "%s" from %s' %(data,s.getpeername()))
message_queues[s].put(data)
# Add output channel for response
if s not in outputs:
outputs.append(s)
else:
# Interpret empty result as closed connection
print('closing',addr,'after reading no data')
# Stop listening for input on the connection
if s in outputs:
outputs.remove(s)
inputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
# Handle outputs
#if __name__ == '__main__':
for s in writeable:
try:
next_msg = message_queues[s].get_nowait()
except queue.Empty:
# No messages waiting so stop checking for writebility
print('output queue for', s.getpeername(),'is empty')
outputs.remove(s)
else:
print('sending "%s" to %s' % (next_msg,s.getpeername()))
s.send(next_msg)
# Handle "exceptional conditions"
for s in exceptional:
print("Handing exceptional condition for",s.getpeername())
# Stop listening for input on the connection
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
#Author is wspikh
# -*- coding: encoding -*-
import select
import socket
import sys
import queue
# Create a TCP/IP socket
server = socket.socket()
server.setblocking(False) #非阻塞模式
# Bind the socket to the port
server_address = ('localhost',10000)
print(sys.stderr,'starting up on %s port %s' % server_address)
server.bind(server_address)
# Listen for incoming connection
server.listen(200)
# Sockets from which we expect to read
inputs = [server,]
# Sockets to which we expect to write
outputs = [ ]
message_queues = {}
while True:
# Wait for at least on of the sockets to be ready for processing
print('\nwaiting for the next event')
readable, writeable, exceptional = select.select(inputs,outputs,inputs)
print(readable, writeable, exceptional)
# Handle inputs
for s in readable:
if s is server:
# A"readable" server socket is ready to accept a connection
conn,addr = s.accept()
print('new connection from',conn,addr)
#conn.setblocking(False)
inputs.append(conn)
# Give the connection a queue for data we want to send
message_queues[conn] = queue.Queue()
else:
data = s.recv(1024)
if data:
# A readable client socket has data
print(sys.stderr,'received "%s" from %s' %(data,s.getpeername()))
message_queues[s].put(data)
# Add output channel for response
if s not in outputs:
outputs.append(s)
else:
# Interpret empty result as closed connection
print('closing',addr,'after reading no data')
# Stop listening for input on the connection
if s in outputs:
outputs.remove(s)
inputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
# Handle outputs
#if __name__ == '__main__':
for s in writeable:
try:
next_msg = message_queues[s].get_nowait()
except queue.Empty:
# No messages waiting so stop checking for writebility
print('output queue for', s.getpeername(),'is empty')
outputs.remove(s)
else:
print('sending "%s" to %s' % (next_msg,s.getpeername()))
s.send(next_msg)
# Handle "exceptional conditions"
for s in exceptional:
print("Handing exceptional condition for",s.getpeername())
# Stop listening for input on the connection
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
最新文章
- 【转】C/S,B/S区别
- 【转】logback logback.xml常用配置详解(三) <;filter>;
- DevExpress中的ASPxTreeView 递归显示checknodes并获得选中值
- 使用easyui时 进入一个新页面 前要经过一个页面混乱的时候 才到正常的页面去
- SQL Server死锁
- 炫酷JQUERY自定义对话框插件JDIALOG_JDIALOG弹出对话框和确认对话框插件
- HDU 4258 Covered Walkway 斜率优化DP
- 【剑指offer】顺时针打印矩阵
- 项目文件包含 ToolsVersion=";12.0"; 设置,而此版本的 MSBuild 不支持该工具版本
- 用Windows Live Writer发来
- 团体程序设计天梯赛-练习集L2-005. 集合相似度
- POJ 3414--Pots(BFS+回溯路径)
- 201521123096《Java程序设计》第十二周学习总结
- Spock - Document -01- introduction &; Getting Started
- java.util.concuttent Callable Future详解
- Spark记录-Scala语句(运算符-if-for-while-try-模式匹配)
- 高并发第十二弹:并发容器J.U.C -- Executor组件FutureTask、ForkJoin
- Logrotate还有谁记得它??
- IOTutility 一个轻量级的 IOT 基础操作库
- QtGui.QCheckBox