一 利用multiprocessing模块,开启多进程,实现socket通信并发

1. 开启子进程的两种方式

import time
import random
from multiprocessing import Process
def piao(name):
print('%s piaoing' %name)
time.sleep(random.randrange(1,5))
print('%s piao end' %name) p1=Process(target=piao,args=('egon',)) #必须加,号
p2=Process(target=piao,args=('alex',))
p3=Process(target=piao,args=('wupeqi',))
p4=Process(target=piao,args=('yuanhao',)) p1.start()
p2.start()
p3.start()
p4.start()
print('主线程')

定义函数的方式

#开进程的方法二:
import time
import random
from multiprocessing import Process class Piao(Process): #注意一定要继承Process
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
print('%s piaoing' %self.name) time.sleep(random.randrange(1,5))
print('%s piao end' %self.name) p1=Piao('egon')
p2=Piao('alex')
p3=Piao('wupeiqi')
p4=Piao('yuanhao') p1.start() #start会自动调用run
p2.start()
p3.start()
p4.start()
print('主进程')

定义类的方式

2.多进程实现socket并发通信

服务端

from socket import *
from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8081))
server.listen(5) def talk(conn,client_addr):
while True:
try: #若不用此句,客户端关闭时,服务端会因报错,停止
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break if __name__ == '__main__': #windows下start进程一定要写到这下面
while True:
conn,client_addr=server.accept()
p=Process(target=talk,args=(conn,client_addr))
p.start()

客户端

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081)) while True:
msg=input('>>: ').strip()
if not msg:
continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))

存在的问题:每来一个客户端,都在服务端开启一个进程,如果并发来一个万个客户端,要开启一万个进程吗,你自己尝试着在你自己的机器上开启一万个,10万个进程试一试。

解决办法:进程池3 进程池实现并发通信

3 进程池实现并发通信

使用进程池维护固定数目的进程

#Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())
#开启6个客户端,会发现2个客户端处于等待状态
#在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
from socket import *
from multiprocessing import Pool
import os server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5) def talk(conn,client_addr):
print('进程pid: %s' %os.getpid())
while True:
try:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break if __name__ == '__main__':
p=Pool()
while True:
conn,client_addr=server.accept()
p.apply_async(talk,args=(conn,client_addr))
# p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问

服务端

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081)) while True:
msg=input('>>: ').strip()
if not msg:
continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))

客户端

发现:并发开启多个客户端,服务端同一时间只有4个不同的pid,干掉一个客户端,另外一个客户端才会进来,被3个进程之一处理

二 利用threading模块,开启多线程,实现socket通信并发

1. 开启多线程的两种方式

#方式一
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print('%s say hello' %name) if __name__ == '__main__':
t=Thread(target=sayhi,args=('egon',))
t.start()
print('主线程')

定义函数的方式

#方式二
from threading import Thread
import time
class Sayhi(Thread): #注意继承Thread类
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
time.sleep(2)
print('%s say hello' % self.name) if __name__ == '__main__':
t = Sayhi('egon')
t.start()
print('主线程')

定义类的方式

2.多进程实现socket并发通信

服务端

from threading import Thread
from socket import * server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8081))
server.listen(5) def talk(conn,addr):
while True:
try:
msg=conn.recv(1024)
if not msg:
break
conn.send(msg.upper())
except Exception:
break if __name__ == '__main__':
while True:
conn,addr=server.accept()
t=Thread(target=talk,args=(conn,addr))
t.start()

客户端

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081)) while True:
msg=input('>>: ').strip()
if not msg:
continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))

三 利用concurrent模块,开启进程池、线程池实现socket通信并发

1. 进程池

服务端

from concurrent.futures import ProcessPoolExecutor
from socket import * server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8081))
server.listen(5) def talk(conn,addr):
while True:
try:
msg=conn.recv(1024)
if not msg:
break
conn.send(msg.upper())
except Exception:
break if __name__ == '__main__':
p=ProcessPoolExecutor() #不填则默认为cpu的个数
while True:
conn,addr=server.accept()
p.submit(talk,conn,addr)

客户端

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081)) while True:
msg=input('>>: ').strip()
if not msg:
continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))

客户端

2. 线程池

服务端

from concurrent.futures import ThreadPoolExecutor
from socket import * server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8081))
server.listen(5) def talk(conn,addr):
while True:
try:
msg=conn.recv(1024)
if not msg:
break
conn.send(msg.upper())
except Exception:
break if __name__ == '__main__':
p=ThreadPoolExecutor() #不填则默认为cpu的个数*5
while True:
conn,addr=server.accept()
p.submit(talk,conn,addr)

客户端

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081)) while True:
msg=input('>>: ').strip()
if not msg:
continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))

客户端

四 利用gevent模块,协程实现单线程下的socket通信并发

 通过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞)

from gevent import monkey;monkey.patch_all()
from socket import *
import gevent #如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket() def server(server_ip,port):
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind((server_ip,port))
s.listen(5)
while True:
conn,addr=s.accept()
gevent.spawn(talk,conn,addr) def talk(conn,addr):
try:
while True:
res=conn.recv(1024)
print('client %s:%s msg: %s' %(addr[0],addr[1],res))
conn.send(res.upper())
except Exception as e:
print(e)
finally:
conn.close() if __name__ == '__main__':
server('127.0.0.1',8080)

服务端

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080)) while True:
msg=input('>>: ').strip()
if not msg:continue client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))

客户端

from threading import Thread
from socket import *
import threading def client(server_ip,port):
c=socket(AF_INET,SOCK_STREAM) #套接字对象一定要加到函数内,即局部名称空间内,放在函数外则被所有线程共享,则大家公用一个套接字对象,那么客户端端口永远一样了
c.connect((server_ip,port)) count=0
while True:
c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
msg=c.recv(1024)
print(msg.decode('utf-8'))
count+=1
if __name__ == '__main__':
for i in range(500):
t=Thread(target=client,args=('127.0.0.1',8080))
t.start()

多线程并发多个客户端

五 利用selectors模块,实现socket并发通信

selectors模块,帮我们默认选择当前平台下最合适的IO多路复用模型(select、poll和epoll)

#服务端
from socket import *
import selectors sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
conn,addr=server_fileobj.accept()
sel.register(conn,selectors.EVENT_READ,read) def read(conn,mask):
try:
data=conn.recv(1024)
if not data:
print('closing',conn)
sel.unregister(conn)
conn.close()
return
conn.send(data.upper()+b'_SB')
except Exception:
print('closing', conn)
sel.unregister(conn)
conn.close() server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #设置socket的接口为非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept while True:
events=sel.select() #检测所有的fileobj,是否有完成wait data的
for sel_obj,mask in events:
callback=sel_obj.data #callback=accpet
callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

服务端

#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088)) while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))

客户端



最新文章

  1. yii asset 初步
  2. JavaMail和James的秘密花园
  3. Android调用默认浏览器打开指定Url
  4. (翻译)Emacs Hooks
  5. 谈谈javascript语法里一些难点问题(二)
  6. html5跨域通讯之postMessage的用法
  7. JQuery发送Ajax请求出现 500 Internal Server Error
  8. CSS打造经典鼠标触发显示选项
  9. 从官方下载 Bootstrap 版本 并写 第一个页面
  10. AngularJS 基础教程二:
  11. flash播放器遮挡页面中元素问题解决
  12. [C#]设置或取消开机启动(注册表形式)
  13. MAC使用小技巧之用好mac电脑?的10个必知的小技巧!
  14. 从明面上学习ASP.NET Core
  15. Java学习笔记——鸵鸟学习记(三)
  16. [swarthmore cs75] Lab 1 — OCaml Tree Programming
  17. 转://如何增加linux根目录的磁盘空间(基于LVM)?
  18. Python:SQLMap源码精读—基于错误的盲注(error-based blind)
  19. 理解HTML5中Range对象
  20. AJAX请求.net controller数据交互过程

热门文章

  1. Statusbar
  2. quartz实现定时任务调度
  3. Oracle 11g 分区拆分与合并
  4. cydia源
  5. 【转发】JS中如何判断null/ undefined/IsNull
  6. python之函数用法iter()
  7. 升级 asp.net core 1.1 到 2.0 preview
  8. leetcode || 58、Length of Last Word
  9. 【LeetCode】124. Binary Tree Maximum Path Sum
  10. access database in a helper function ?