笔记-scrapy-请求-下载-结果处理流程
笔记-scrapy-请求-下载-结果处理流程
在使用时发现对scrpy的下载过程中的处理逻辑还是不太明晰,-写个文档温习一下。
1. 请求-下载-结果处理流程
从哪开始呢?
engine.py
@defer.inlineCallbacks
def open_spider(self, spider, start_requests=(), close_if_idle=True):
assert self.has_capacity(), "No free spider slot when opening %r" % \
spider.name
logger.info("Spider opened", extra={'spider': spider})
nextcall = CallLaterOnce(self._next_request, spider)
scheduler = self.scheduler_cls.from_crawler(self.crawler)
start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
self.slot = slot
self.spider = spider
yield scheduler.open(spider)
yield self.scraper.open_spider(spider)
self.crawler.stats.open_spider(spider)
yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
slot.nextcall.schedule() #
slot.heartbeat.start(5)
注意最后两句
nextcall 是自己写的一个reactor调用中间类,在这里实际是把self._next_request加入了reactor task队列。
slot.nextcall.schedule() 等于调用self._next_request
slot.heartbeat.start(5)声明每5秒调用一次nextcall.schedule
reactor是跑起来了
看到self._next_request
控制循环
def _next_request(self, spider):
slot = self.slot
if not slot:
return
if self.paused:
return
while not self._needs_backout(spider):
if not self._next_request_from_scheduler(spider):
break
if slot.start_requests and not self._needs_backout(spider):
try:
request = next(slot.start_requests)
except StopIteration:
slot.start_requests = None
except Exception:
slot.start_requests = None
logger.error('Error while obtaining start requests',
exc_info=True, extra={'spider': spider})
else:
self.crawl(request, spider)
if self.spider_is_idle(spider) and slot.close_if_idle:
self._spider_idle(spider)
_needs_backout是用于判断爬虫状态的函数
def _next_request_from_scheduler(self, spider):
slot = self.slot
request = slot.scheduler.next_request()
if not request:
return
d = self._download(request, spider)
d.addBoth(self._handle_downloader_output, request, spider)
d.addErrback(lambda f: logger.info('Error while handling downloader output',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
d.addBoth(lambda _: slot.remove_request(request))
d.addErrback(lambda f: logger.info('Error while removing request from slot',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
d.addBoth(lambda _: slot.nextcall.schedule())
d.addErrback(lambda f: logger.info('Error while scheduling new request',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
return d
d 是一个defer对象,然后为它添加了一堆回调函数,包括后续处理,从请求队列中删除,取下一个请求(执行nextcall.schedule(),也就是self._next_request())
Slot是当前请求的状态保存类;
下面要分两条线了,一条是下载_download(单列章节),一条是下载返回结果处理。
下载结果处理:
def _handle_downloader_output(self, response, request, spider):
assert isinstance(response, (Request, Response, Failure)), response
# downloader middleware can return requests (for example, redirects)
if isinstance(response, Request):
self.crawl(response, spider)
return
# response is a Response or Failure
d = self.scraper.enqueue_scrape(response, request, spider)
d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
return d
继续d = self.scraper.enqueue_scrape(response, request, spider)
def enqueue_scrape(self, response, request, spider):
slot = self.slot
dfd = slot.add_response_request(response, request)
def finish_scraping(_):
slot.finish_response(response, request) # slot状态更新
self._check_if_closing(spider, slot)
self._scrape_next(spider, slot) # 下一步
return _
dfd.addBoth(finish_scraping)
dfd.addErrback(
lambda f: logger.error('Scraper bug processing %(request)s',
{'request': request},
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
self._scrape_next(spider, slot) #下一步
return dfd
照例:slot用于保存scraper当前任务队列及状态
继续:
def _scrape_next(self, spider, slot):
while slot.queue:
response, request, deferred = slot.next_response_request_deferred()
self._scrape(response, request, spider).chainDeferred(deferred)
def _scrape(self, response, request, spider):
"""Handle the downloaded response or failure through the spider
callback/errback"""
assert isinstance(response, (Response, Failure))
dfd = self._scrape2(response, request, spider) # returns spiders processed output
dfd.addErrback(self.handle_spider_error, request, response, spider)
dfd.addCallback(self.handle_spider_output, request, response, spider)
return dfd
添加回调函数dfd.addCallback(self.handle_spider_output, request, response, spider)
注意:dfd = self._scrape2(response, request, spider) # returns spiders processed output会完成添加spider解析的回调函数。
def call_spider(self, result, request, spider):
result.request = request
dfd = defer_result(result)
dfd.addCallbacks(request.callback or spider.parse, request.errback)
return dfd.addCallback(iterate_spider_output)
关键句:dfd.addCallbacks(request.callback or spider.parse, request.errback)
继续看self.handle_spider_output,
def handle_spider_output(self, result, request, response, spider):
if not result:
return defer_succeed(None)
it = iter_errback(result, self.handle_spider_error, request, response, spider)
dfd = parallel(it, self.concurrent_items,
self._process_spidermw_output, request, response, spider)
return dfd
def _process_spidermw_output(self, output, request, response, spider):
"""Process each Request/Item (given in the output parameter) returned
from the given spider
"""
if isinstance(output, Request):
self.crawler.engine.crawl(request=output, spider=spider)
elif isinstance(output, (BaseItem, dict)):
self.slot.itemproc_size += 1
dfd = self.itemproc.process_item(output, spider)
dfd.addBoth(self._itemproc_finished, output, response, spider)
return dfd
elif output is None:
pass
else:
typename = type(output).__name__
logger.error('Spider must return Request, BaseItem, dict or None, '
'got %(typename)r in %(request)s',
{'request': request, 'typename': typename},
extra={'spider': spider})
如果返回的是请求,crawl()
如果是item或dict,self.itemproc.process_item(output, spider);也就是pipeline中写的process_item方法了
1.1. 小结
代码中对功能函数的拆分,调用还是有一点复杂的,但这样做的好处是代码块之间的耦合性不高,可以非常方便的进行某一函数或功能块的替换。
2. 下载器
下载流程解析
从engine.py中开始:
def _download(self, request, spider):
slot = self.slot
# slot 状态更新
slot.add_request(request)
def _on_success(response):
assert isinstance(response, (Response, Request))
if isinstance(response, Response):
response.request = request # tie request to response received
logkws = self.logformatter.crawled(request, response, spider)
logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
self.signals.send_catch_log(signal=signals.response_received, \
response=response, request=request, spider=spider)
return response
def _on_complete(_):
slot.nextcall.schedule()
return _
dwld = self.downloader.fetch(request, spider)
dwld.addCallbacks(_on_success)
dwld.addBoth(_on_complete)
return dwld
主要是声明了一个deffer并添加了一些回调函数用于状态更新,信号传递。
dwld = self.downloader.fetch(request, spider)
进入downloader.fetch()
def fetch(self, request, spider):
def _deactivate(response):
self.active.remove(request)
return response
self.active.add(request)
dfd = self.middleware.download(self._enqueue_request, request, spider)
return dfd.addBoth(_deactivate)
active用于保存当前下载的任务
添加deactive用于在下载完成后删除active中对应记录
继续,middleware.download()
def download(self, download_func, request, spider):
@defer.inlineCallbacks
def process_request(request):
for method in self.methods['process_request']:
response = yield method(request=request, spider=spider)
assert response is None or isinstance(response, (Response, Request)), \
'Middleware %s.process_request must return None, Response or Request, got %s' % \
(six.get_method_self(method).__class__.__name__, response.__class__.__name__)
if response:
defer.returnValue(response)
defer.returnValue((yield download_func(request=request,spider=spider)))
@defer.inlineCallbacks
def process_response(response):
assert response is not None, 'Received None in process_response'
if isinstance(response, Request):
defer.returnValue(response)
for method in self.methods['process_response']:
response = yield method(request=request, response=response,
spider=spider)
assert isinstance(response, (Response, Request)), \
'Middleware %s.process_response must return Response or Request, got %s' % \
(six.get_method_self(method).__class__.__name__, type(response))
if isinstance(response, Request):
defer.returnValue(response)
defer.returnValue(response)
@defer.inlineCallbacks
def process_exception(_failure):
exception = _failure.value
for method in self.methods['process_exception']:
response = yield method(request=request, exception=exception,
spider=spider)
assert response is None or isinstance(response, (Response, Request)), \
'Middleware %s.process_exception must return None, Response or Request, got %s' % \
(six.get_method_self(method).__class__.__name__, type(response))
if response:
defer.returnValue(response)
defer.returnValue(_failure)
deferred = mustbe_deferred(process_request, request)
deferred.addErrback(process_exception)
deferred.addCallback(process_response)
return deferred
这里完成了下载中间件的处理
然后调用defer.returnValue((yield download_func(request=request,spider=spider)))
实际上就是download的self._enqueue_request
向下走
def _enqueue_request(self, request, spider):
key, slot = self._get_slot(request, spider)
request.meta['download_slot'] = key
def _deactivate(response):
slot.active.remove(request)
return response
slot.active.add(request)
self.signals.send_catch_log(signal=signals.request_reached_downloader,
request=request,
spider=spider)
deferred = defer.Deferred().addBoth(_deactivate)
slot.queue.append((request, deferred))
self._process_queue(spider, slot)
return deferred
老规矩,slot保存任务信息
调用self._process_queue
下载延迟是在这里进行的。
def _process_queue(self, spider, slot):
if slot.latercall and slot.latercall.active():
return
# Delay queue processing if a download_delay is configured
now = time()
delay = slot.download_delay()
if delay:
penalty = delay - now + slot.lastseen
if penalty > 0:
slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
return
# Process enqueued requests if there are free slots to transfer for this slot
while slot.queue and slot.free_transfer_slots() > 0:
slot.lastseen = now
request, deferred = slot.queue.popleft()
dfd = self._download(slot, request, spider)
dfd.chainDeferred(deferred)
# prevent burst if inter-request delays were configured
if delay:
self._process_queue(spider, slot)
break
下载语句_download()
def _download(self, slot, request, spider):
# The order is very important for the following deferreds. Do not change!
# 1. Create the download deferred
dfd = mustbe_deferred(self.handlers.download_request, request, spider)
# 2. Notify response_downloaded listeners about the recent download
# before querying queue for next request
def _downloaded(response):
self.signals.send_catch_log(signal=signals.response_downloaded,
response=response,
request=request,
spider=spider)
return response
dfd.addCallback(_downloaded)
# 3. After response arrives, remove the request from transferring
# state to free up the transferring slot so it can be used by the
# following requests (perhaps those which came from the downloader
# middleware itself)
slot.transferring.add(request)
def finish_transferring(_):
slot.transferring.remove(request)
self._process_queue(spider, slot)
return _
return dfd.addBoth(finish_transferring)
在这里,也维护了一个下载队列,可根据配置达到延迟下载的要求。真正发起下载请求的是调用了self.handlers.download_request:
后面的水有点深了,涉及的知识点比较多,以后有机会再写。
每个下载handler可以理解为requests包,输入url输出response就可以了。
最新文章
- Struts1与Struts2的异同
- 无法生成临时类(result=1)。 error CS0229: “DCSoftDotfuscate.aam.a”与“DCSoftDotfuscate.aam.a()”之间存在二义性
- sql之表的表达式
- 汇编程序w=x*y+z-200
- HDU 1073 - Online Judge
- Java泛型介绍!!!
- BZOJ 1406: [AHOI2007]密码箱( 数论 )
- 在Repeater控件中使用if语句
- Ubuntu16.04+Theano环境
- tomcat替换.class文件并没有生效的原因(失效原因)(转)
- jQuery事件 (jQuery实现图片轮播)
- C语言的格式符
- Compass 更智能的搜索引擎(3)--高亮,排序,过滤以及各种搜索
- Learning-Python【30】:基于UDP协议通信的套接字
- string替换字符串,路径的斜杠替换为下划线
- mysqldump导出数据时,某些表不导出,排除某些表,不导出某些表
- HTTP 权威指南 第二章 URL 与资源
- Linux command nmon
- shiro(1) 介绍
- Map 合并
热门文章
- 关于css实现单行、多行省略标记
- sharepoint国内网站一览表(转发)
- Build 2017 | 今儿来说说火得不行的认知服务吧(内附微软开发者大会在线峰会报名地址)
- Java项目性能瓶颈定位
- PHP : url中出现乱码问题
- 【转】C内存操作函数
- POJ-2002 Squares---绕点旋转+Hash
- cocosBuilder生成cbbi文件,绑定到cocos2d-x
- 4. NBU文件备份与恢复,图形界面&;字符界面操作
- 2017.11.16 JavaWeb-------第八章 EL、JSTL、Ajax技术