bee_server.py

from sanic import Sanic
from sanic import response
from urlpool import UrlPool

# Initialize the URL pool and seed it with a start URL; adjust as needed.
urlpool = UrlPool(__file__)
urlpool.add('https://news.sina.com.cn/')
app = Sanic(__name__)


@app.listener("after_server_stop")
async def cache_urlpool(app, loop):
    """Drop the module-level URL pool once the server has stopped.

    NOTE(review): presumably ``UrlPool`` persists its state when the object
    is finalized, which is why the global is deleted here -- confirm against
    the ``urlpool`` module.
    """
    global urlpool
    print("caching urlpool after_server_stop")
    del urlpool
    print("bye!")


@app.route("/task")
async def task_get(request):
    """Hand out a batch of URLs popped from the pool as JSON.

    The optional ``count`` query argument selects the batch size; a missing
    or non-numeric value falls back to 10.
    """
    count = request.args.get("count", 10)
    try:
        count = int(count)
    except (TypeError, ValueError):
        # Only conversion failures are expected here; anything else should
        # surface instead of being silently replaced by the default.
        count = 10
    urls = urlpool.pop(count)
    return response.json(urls)


@app.route("/task", methods=["POST"])
async def task_post(request):
    """Record a crawl result reported by a client.

    The JSON body must be an object with the keys ``url``, ``url_real`` and
    ``status``; ``newurls`` (an iterable of URLs to add to the pool) is
    optional. Replies ``"ok"`` on success and HTTP 400 for a payload that
    lacks the required keys.
    """
    # ``request.json`` is a property in Sanic, not a method: calling it
    # would try to call the parsed payload itself.
    result = request.json
    required = ("url", "url_real", "status")
    if not isinstance(result, dict) or any(key not in result for key in required):
        return response.text("invalid payload", status=400)

    urlpool.set_status(result["url"], result["status"])
    # When the final URL differs from the requested one (both are reported
    # by the client), give it the same status.
    if result["url_real"] != result["url"]:
        urlpool.set_status(result["url_real"], result["status"])

    new_urls = result.get("newurls") or []
    if new_urls:
        print("receive URLs:", len(new_urls))
        for url in new_urls:
            urlpool.add(url)
    return response.text("ok")


if __name__ == '__main__':
    app.run(
        host='127.0.0.1',
        port=8080,
        debug=False,
        access_log=False,
        workers=1,
    )

bee_client.py

import aiohttp
import json
import asyncio
import traceback
import time


class CrawlerClient:
    """Asynchronous client for the URL task server.

    ``get_url`` fetches batches of URLs from ``GET /task`` into a local
    queue, and ``send_result`` reports crawl results via ``POST /task``.
    """

    def __init__(self):
        self._workers = 0
        self.workers_max = 10
        self.server_host = "localhost"
        self.server_port = 8080
        # The HTTP session is created on first use (see ``session``), so the
        # constructor works without an event loop.
        self._session = None
        # Must be an instance: the bare ``asyncio.Queue`` class has no usable
        # ``qsize()`` / ``put()`` state.
        self.queue = asyncio.Queue()

    @property
    def session(self):
        """Return the shared ``aiohttp.ClientSession``, creating it lazily.

        Creation is deferred until first access so that it normally happens
        inside a coroutine, which is where aiohttp expects sessions to be
        created.
        """
        if self._session is None:
            self._session = aiohttp.ClientSession()
        return self._session

    @session.setter
    def session(self, value):
        # Keeps ``client.session = ...`` assignments working.
        self._session = value

    async def close(self):
        """Close the HTTP session if one was created."""
        if self._session is not None:
            await self._session.close()
            self._session = None

    async def get_url(self):
        """Refill the local queue with URLs requested from the server.

        Requests as many URLs as there are free slots
        (``workers_max - queue.qsize()``). Returns ``None`` in every case;
        request and parsing errors are printed and otherwise ignored
        (best effort).
        """
        count = self.workers_max - self.queue.qsize()
        if count <= 0:
            print("no need to get urls this time")
            return  # nothing to fetch, so do not contact the server

        url = "http://%s:%s/task?count=%s" % (self.server_host, self.server_port, count)
        try:
            timeout = aiohttp.ClientTimeout(total=3)
            async with self.session.get(url, timeout=timeout) as response:
                if response.status not in (200, 201):
                    return
                jsn = await response.text()
            urls = json.loads(jsn)
            msg = ('get_urls() to get [%s] but got[%s],@%s') % (count, len(urls), time.strftime('%Y-%m-%d %H:%M:%S'))
            print(msg)
            # Assumes the server replies with a JSON object; each
            # (key, value) pair is queued as one item -- TODO confirm the
            # payload shape against UrlPool.pop().
            for lv in urls.items():
                await self.queue.put(lv)
            print()
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that task
            # cancellation and KeyboardInterrupt still propagate.
            traceback.print_exc()
            return

    async def send_result(self, result):
        '''
        Report one crawl result to the server (best effort).

        result = {
            "url": url,
            'url_real': response.url,
            'status': status,
            "newurls": newurls,
        }
        '''
        url = "http://%s:%s/task" % (self.server_host, self.server_port)
        try:
            timeout = aiohttp.ClientTimeout(total=3)
            async with self.session.post(url, json=result, timeout=timeout):
                # Fire-and-forget: the response body and status are not used.
                pass
        except Exception:
            # Failures are only logged; cancellation still propagates.
            traceback.print_exc()

最新文章

  1. 有时打开myeclipse,部署报错解决方案
  2. [转]EL表达式和JSTL表达式实例
  3. angularjs指令(一)
  4. Android composite adb interface
  5. 【JS】Intermediate8:jQuery:AJAX
  6. hdu2030java
  7. codevs 3223 素数密度
  8. wpf中数据绑定(Datacontext)的应用
  9. Leetcode 1——twosum
  10. Nginx(一)-windows下的安装配置
  11. 理解JS原型和原型链
  12. JavaScript匿名函数入门。
  13. NEW —— Code
  14. 初识Identity并添加身份验证管理页面
  15. 【C#进阶】拥抱Lambda(一)
  16. Linux常用命令&定位生产报错日志
  17. Flume的介绍和简单操作
  18. Nginx日志切割工具——logrotate 使用记录
  19. 【OpenGL】无法启动此程序,因为计算机中丢失 glut32.dll。尝试重新安装该程序以解决此问题。
  20. SocketServer模块中的几种类

热门文章

  1. logstash中output{}的另类写法
  2. WSL 2 上启用微软官方支持的 systemd
  3. 简书是如何把用户wo逼疯的
  4. 关于vmware虚拟机的ova/ovf转换成aws上的AMI镜像
  5. P1073 [NOIP2009 提高组] 最优贸易 (最短路spfa)
  6. 洛谷P6033 [NOIP2004 提高组] 合并果子 加强版 (单调队列)
  7. ByPass
  8. 集合元素的遍历操作,使用迭代器Iterator接口
  9. Resilience4J通过yml设置circuitBreaker
  10. PhpStorm 2020.1.2破解 | JetBrains PhpStorm 2020.1.2破解版 附破解文件