笔记-scrapy-请求-下载-结果处理流程

在使用时发现对scrpy的下载过程中的处理逻辑还是不太明晰,-写个文档温习一下。

1.      请求-下载-结果处理流程

从哪开始呢?

engine.py

@defer.inlineCallbacks

def open_spider(self, spider, start_requests=(), close_if_idle=True):

assert self.has_capacity(), "No free spider slot when opening %r" % \

spider.name

logger.info("Spider opened", extra={'spider': spider})

nextcall = CallLaterOnce(self._next_request, spider)

scheduler = self.scheduler_cls.from_crawler(self.crawler)

start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)

slot = Slot(start_requests, close_if_idle, nextcall, scheduler)

self.slot = slot

self.spider = spider

yield scheduler.open(spider)

yield self.scraper.open_spider(spider)

self.crawler.stats.open_spider(spider)

yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)

slot.nextcall.schedule() #

slot.heartbeat.start(5)

注意最后两句

nextcall 是自己写的一个reactor调用中间类,在这里实际是把self._next_request加入了reactor task队列。

slot.nextcall.schedule() 等于调用self._next_request

slot.heartbeat.start(5)声明每5秒调用一次nextcall.schedule

reactor是跑起来了

看到self._next_request

控制循环

def _next_request(self, spider):

slot = self.slot

if not slot:

return

if self.paused:

return

while not self._needs_backout(spider):

if not self._next_request_from_scheduler(spider):

break

if slot.start_requests and not self._needs_backout(spider):

try:

request = next(slot.start_requests)

except StopIteration:

slot.start_requests = None

except Exception:

slot.start_requests = None

logger.error('Error while obtaining start requests',

exc_info=True, extra={'spider': spider})

else:

self.crawl(request, spider)

if self.spider_is_idle(spider) and slot.close_if_idle:

self._spider_idle(spider)

_needs_backout是用于判断爬虫状态的函数

def _next_request_from_scheduler(self, spider):

slot = self.slot

request = slot.scheduler.next_request()

if not request:

return

d = self._download(request, spider)

d.addBoth(self._handle_downloader_output, request, spider)

d.addErrback(lambda f: logger.info('Error while handling downloader output',

exc_info=failure_to_exc_info(f),

extra={'spider': spider}))

d.addBoth(lambda _: slot.remove_request(request))

d.addErrback(lambda f: logger.info('Error while removing request from slot',

exc_info=failure_to_exc_info(f),

extra={'spider': spider}))

d.addBoth(lambda _: slot.nextcall.schedule())

d.addErrback(lambda f: logger.info('Error while scheduling new request',

exc_info=failure_to_exc_info(f),

extra={'spider': spider}))

return d

d 是一个defer对象,然后为它添加了一堆回调函数,包括后续处理,从请求队列中删除,取下一个请求(执行nextcall.schedule(),也就是self._next_request())

Slot是当前请求的状态保存类;

下面要分两条线了,一条是下载_download(单列章节),一条是下载返回结果处理。

下载结果处理:

def _handle_downloader_output(self, response, request, spider):

assert isinstance(response, (Request, Response, Failure)), response

# downloader middleware can return requests (for example, redirects)

if isinstance(response, Request):

self.crawl(response, spider)

return

# response is a Response or Failure

d = self.scraper.enqueue_scrape(response, request, spider)

d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',

exc_info=failure_to_exc_info(f),

extra={'spider': spider}))

return d

继续d = self.scraper.enqueue_scrape(response, request, spider)

def enqueue_scrape(self, response, request, spider):

slot = self.slot

dfd = slot.add_response_request(response, request)

def finish_scraping(_):

slot.finish_response(response, request) # slot状态更新

self._check_if_closing(spider, slot)

self._scrape_next(spider, slot) # 下一步

return _

dfd.addBoth(finish_scraping)

dfd.addErrback(

lambda f: logger.error('Scraper bug processing %(request)s',

{'request': request},

exc_info=failure_to_exc_info(f),

extra={'spider': spider}))

self._scrape_next(spider, slot) #下一步

return dfd

照例:slot用于保存scraper当前任务队列及状态

继续:

def _scrape_next(self, spider, slot):

while slot.queue:

response, request, deferred = slot.next_response_request_deferred()

self._scrape(response, request, spider).chainDeferred(deferred)

def _scrape(self, response, request, spider):

"""Handle the downloaded response or failure through the spider

callback/errback"""

assert isinstance(response, (Response, Failure))

dfd = self._scrape2(response, request, spider) # returns spiders processed output

dfd.addErrback(self.handle_spider_error, request, response, spider)

dfd.addCallback(self.handle_spider_output, request, response, spider)

return dfd

添加回调函数dfd.addCallback(self.handle_spider_output, request, response, spider)

注意:dfd = self._scrape2(response, request, spider) # returns spiders processed output会完成添加spider解析的回调函数。

def call_spider(self, result, request, spider):

result.request = request

dfd = defer_result(result)

dfd.addCallbacks(request.callback or spider.parse, request.errback)

return dfd.addCallback(iterate_spider_output)

关键句:dfd.addCallbacks(request.callback or spider.parse, request.errback)

继续看self.handle_spider_output,

def handle_spider_output(self, result, request, response, spider):

if not result:

return defer_succeed(None)

it = iter_errback(result, self.handle_spider_error, request, response, spider)

dfd = parallel(it, self.concurrent_items,

self._process_spidermw_output, request, response, spider)

return dfd

def _process_spidermw_output(self, output, request, response, spider):

"""Process each Request/Item (given in the output parameter) returned

from the given spider

"""

if isinstance(output, Request):

self.crawler.engine.crawl(request=output, spider=spider)

elif isinstance(output, (BaseItem, dict)):

self.slot.itemproc_size += 1

dfd = self.itemproc.process_item(output, spider)

dfd.addBoth(self._itemproc_finished, output, response, spider)

return dfd

elif output is None:

pass

else:

typename = type(output).__name__

logger.error('Spider must return Request, BaseItem, dict or None, '

'got %(typename)r in %(request)s',

{'request': request, 'typename': typename},

extra={'spider': spider})

如果返回的是请求,crawl()

如果是item或dict,self.itemproc.process_item(output, spider);也就是pipeline中写的process_item方法了

1.1.    小结

代码中对功能函数的拆分,调用还是有一点复杂的,但这样做的好处是代码块之间的耦合性不高,可以非常方便的进行某一函数或功能块的替换。

2.      下载器

下载流程解析

从engine.py中开始:

def _download(self, request, spider):

slot = self.slot

# slot 状态更新

slot.add_request(request)

def _on_success(response):

assert isinstance(response, (Response, Request))

if isinstance(response, Response):

response.request = request # tie request to response received

logkws = self.logformatter.crawled(request, response, spider)

logger.log(*logformatter_adapter(logkws), extra={'spider': spider})

self.signals.send_catch_log(signal=signals.response_received, \

response=response, request=request, spider=spider)

return response

def _on_complete(_):

slot.nextcall.schedule()

return _

dwld = self.downloader.fetch(request, spider)

dwld.addCallbacks(_on_success)

dwld.addBoth(_on_complete)

return dwld

主要是声明了一个deffer并添加了一些回调函数用于状态更新,信号传递。

dwld = self.downloader.fetch(request, spider)

进入downloader.fetch()

def fetch(self, request, spider):

def _deactivate(response):

self.active.remove(request)

return response

self.active.add(request)

dfd = self.middleware.download(self._enqueue_request, request, spider)

return dfd.addBoth(_deactivate)

active用于保存当前下载的任务

添加deactive用于在下载完成后删除active中对应记录

继续,middleware.download()

def download(self, download_func, request, spider):

@defer.inlineCallbacks

def process_request(request):

for method in self.methods['process_request']:

response = yield method(request=request, spider=spider)

assert response is None or isinstance(response, (Response, Request)), \

'Middleware %s.process_request must return None, Response or Request, got %s' % \

(six.get_method_self(method).__class__.__name__, response.__class__.__name__)

if response:

defer.returnValue(response)

defer.returnValue((yield download_func(request=request,spider=spider)))

@defer.inlineCallbacks

def process_response(response):

assert response is not None, 'Received None in process_response'

if isinstance(response, Request):

defer.returnValue(response)

for method in self.methods['process_response']:

response = yield method(request=request, response=response,

spider=spider)

assert isinstance(response, (Response, Request)), \

'Middleware %s.process_response must return Response or Request, got %s' % \

(six.get_method_self(method).__class__.__name__, type(response))

if isinstance(response, Request):

defer.returnValue(response)

defer.returnValue(response)

@defer.inlineCallbacks

def process_exception(_failure):

exception = _failure.value

for method in self.methods['process_exception']:

response = yield method(request=request, exception=exception,

spider=spider)

assert response is None or isinstance(response, (Response, Request)), \

'Middleware %s.process_exception must return None, Response or Request, got %s' % \

(six.get_method_self(method).__class__.__name__, type(response))

if response:

defer.returnValue(response)

defer.returnValue(_failure)

deferred = mustbe_deferred(process_request, request)

deferred.addErrback(process_exception)

deferred.addCallback(process_response)

return deferred

这里完成了下载中间件的处理

然后调用defer.returnValue((yield download_func(request=request,spider=spider)))

实际上就是download的self._enqueue_request

向下走

def _enqueue_request(self, request, spider):

key, slot = self._get_slot(request, spider)

request.meta['download_slot'] = key

def _deactivate(response):

slot.active.remove(request)

return response

slot.active.add(request)

self.signals.send_catch_log(signal=signals.request_reached_downloader,

request=request,

spider=spider)

deferred = defer.Deferred().addBoth(_deactivate)

slot.queue.append((request, deferred))

self._process_queue(spider, slot)

return deferred

老规矩,slot保存任务信息

调用self._process_queue

下载延迟是在这里进行的。

def _process_queue(self, spider, slot):

if slot.latercall and slot.latercall.active():

return

# Delay queue processing if a download_delay is configured

now = time()

delay = slot.download_delay()

if delay:

penalty = delay - now + slot.lastseen

if penalty > 0:

slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)

return

# Process enqueued requests if there are free slots to transfer for this slot

while slot.queue and slot.free_transfer_slots() > 0:

slot.lastseen = now

request, deferred = slot.queue.popleft()

dfd = self._download(slot, request, spider)

dfd.chainDeferred(deferred)

# prevent burst if inter-request delays were configured

if delay:

self._process_queue(spider, slot)

break

下载语句_download()

def _download(self, slot, request, spider):

# The order is very important for the following deferreds. Do not change!

# 1. Create the download deferred

dfd = mustbe_deferred(self.handlers.download_request, request, spider)

# 2. Notify response_downloaded listeners about the recent download

# before querying queue for next request

def _downloaded(response):

self.signals.send_catch_log(signal=signals.response_downloaded,

response=response,

request=request,

spider=spider)

return response

dfd.addCallback(_downloaded)

# 3. After response arrives,  remove the request from transferring

# state to free up the transferring slot so it can be used by the

# following requests (perhaps those which came from the downloader

# middleware itself)

slot.transferring.add(request)

def finish_transferring(_):

slot.transferring.remove(request)

self._process_queue(spider, slot)

return _

return dfd.addBoth(finish_transferring)

在这里,也维护了一个下载队列,可根据配置达到延迟下载的要求。真正发起下载请求的是调用了self.handlers.download_request:

后面的水有点深了,涉及的知识点比较多,以后有机会再写。

每个下载handler可以理解为requests包,输入url输出response就可以了。

最新文章

  1. Struts1与Struts2的异同
  2. 无法生成临时类(result=1)。 error CS0229: “DCSoftDotfuscate.aam.a”与“DCSoftDotfuscate.aam.a()”之间存在二义性
  3. sql之表的表达式
  4. 汇编程序w=x*y+z-200
  5. HDU 1073 - Online Judge
  6. Java泛型介绍!!!
  7. BZOJ 1406: [AHOI2007]密码箱( 数论 )
  8. 在Repeater控件中使用if语句
  9. Ubuntu16.04+Theano环境
  10. tomcat替换.class文件并没有生效的原因(失效原因)(转)
  11. jQuery事件 (jQuery实现图片轮播)
  12. C语言的格式符
  13. Compass 更智能的搜索引擎(3)--高亮,排序,过滤以及各种搜索
  14. Learning-Python【30】:基于UDP协议通信的套接字
  15. string替换字符串,路径的斜杠替换为下划线
  16. mysqldump导出数据时,某些表不导出,排除某些表,不导出某些表
  17. HTTP 权威指南 第二章 URL 与资源
  18. Linux command nmon
  19. shiro(1) 介绍
  20. Map 合并

热门文章

  1. 关于css实现单行、多行省略标记
  2. sharepoint国内网站一览表(转发)
  3. Build 2017 | 今儿来说说火得不行的认知服务吧(内附微软开发者大会在线峰会报名地址)
  4. Java项目性能瓶颈定位
  5. PHP : url中出现乱码问题
  6. 【转】C内存操作函数
  7. POJ-2002 Squares---绕点旋转+Hash
  8. cocosBuilder生成cbbi文件,绑定到cocos2d-x
  9. 4. NBU文件备份与恢复,图形界面&字符界面操作
  10. 2017.11.16 JavaWeb-------第八章 EL、JSTL、Ajax技术