scrapy_redis原理分析并实现断点续爬以及分布式爬虫
时间:2023-3-1 19:06 作者:wen 分类: Python
一、下载github的demo
1、clone github scrapy-redis源文件
git clone https://github.com/rolando/scrapy-redis.git
2、下载扩展
pip install scrapy_redis
二、观察dmoz文件
在dmoz爬虫文件中,实现方法就是之前的crawlspider类型爬虫
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
class DmozSpider(CrawlSpider):
"""Follow categories and extract links."""
name = 'dmoz'
allowed_domains = ['dmoz-odp.org']
start_urls = ['http://www.dmoz-odp.org/']
rules = [
Rule(LinkExtractor(
restrict_css=('.top-cat', '.sub-cat', '.cat-item')
), callback='parse_directory', follow=True),
]
def parse_directory(self, response):
for div in response.css('.title-and-desc'):
yield {
'name': div.css('.site-title::text').extract_first(),
'description': div.css('.site-descr::text').extract_first().strip(),
'link': div.css('a::attr(href)').extract_first(),
}
但是在setting.py中多了以下内容,这几行表示scrapy_redis中重新实现了去重的类,以及调度器,并且使用RedisPipeline管道类
# 设置重复过滤器的模块
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# 设置调取器、scrapy_redis中的调度器具备与数据库交互的功能
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# 设置当爬虫结束的时候是否保持redis数据库中的去重集合与任务队列
SCHEDULER_PERSIST = True
#SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.SpiderPriorityQueue"
#SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.SpiderQueue"
#SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.SpiderStack"
ITEM_PIPELINES = {
'example.pipelines.ExamplePipeline': 300,
# 当开启该管道,该管道将会把数据存储Redis数据库中
'scrapy_redis.pipelines.RedisPipeline': 400,
}
三、运行dmoz爬虫,观察现象
1、首先我们需要添加redis的地址,程序才能够使用redis
# 设置redis数据库
REDIS_URL = "redis://127.0.0.1:6379"
# 或者使用下面的方式
# REDIS_HOST = '127.0.0.1'
# REDIS_PORT = 6379
我们执行domz的爬虫,会发现redis中一下多了三个键
- dmoz:requests:Scheduler队列,存放的待请求的request对象,获取的过程是pop操作,即获取一个会去除一个
- dmoz:dupefilter:指纹集合,存放的是已经进入Scheduler队列的request对象的指纹,指纹默认有请求方法,url和请求体组成
- dmoz:items:存放的获取到的item信息,在pipeline中开启RedisPipeline才会 存入
终止进程后再次运行dmoz爬虫,继续执行程序,会发现程序在前一次的基础上继续往后执行,所以domz爬虫是一个基于URL地址的增量式爬虫
四、scrapy_redis的原理分析
1、Scrapy_redis之RedisPipeline
RedisPipeline中观察process_item,进行数据的保存,存入了redis中
scrapy_redis.pipelines.RedisPipeline代码片段:
# 使用了process_item方法,实现数据的保存
def process_item(self, item, spider):
# 调用一个异步线程去处理这个item
return deferToThread(self._process_item, item, spider)
def _process_item(self, item, spider):
key = self.item_key(item, spider)
data = self.serialize(item)
# 向dmoz:item中添加item
self.server.rpush(key, data)
return item
2、Scrapy_redis之RFPDupeFilter
RFPDupeFilter实现了对request对象的加密
scrapy_redis.dupefilter.RFPDupeFilter代码片段:
def request_seen(self, request):
# 判断requests对象是否存在
fp = self.request_fingerprint(request)
# This returns the number of values added, zero if already exists.
added = self.server.sadd(self.key, fp)
# 返回0表示添加是吧,即已经存在,否则表示不存在
return added == 0
def request_fingerprint(self, request):
return request_fingerprint(request)
def request_fingerprint(
request: Request,
include_headers: Optional[Iterable[Union[bytes, str]]] = None,
keep_fragments: bool = False,
):
headers: Optional[Tuple[bytes, ...]] = None
if include_headers:
headers = tuple(to_bytes(h.lower()) for h in sorted(include_headers))
cache = _fingerprint_cache.setdefault(request, {})
cache_key = (headers, keep_fragments)
if cache_key not in cache:
# sha1加密
fp = hashlib.sha1()
# 请求方法
fp.update(to_bytes(request.method))
# 请求地址
fp.update(to_bytes(canonicalize_url(request.url, keep_fragments=keep_fragments)))
# 请求体,post请求才会有
fp.update(request.body or b'')
# 添加请求头,默认不添加请求头(因为header的cookies中包含有session_id,这在不同的网站是随机的,会给sha1的计算结果带来误差)
if headers:
for hdr in headers:
if hdr in request.headers:
fp.update(hdr)
for v in request.headers.getlist(hdr):
fp.update(v)
# 返回加密之后的16进制
cache[cache_key] = fp.hexdigest()
return cache[cache_key]
3、scrapy_redis之Scheduler
scrapy_redis调度器实现了决定什么时候把request对象加入带抓取的队列,同时把请求过的request对象过滤掉
scrapy_redis.scheduler.Scheduler代码片段:
def close(self, reason):
# 如果在setting中设置为不支持,那么在退出的时候回来清空
if not self.persist:
self.flush()
def flush(self):
# 指的是存放dupefilter的redis
self.df.clear()
# 指的是存放requests的redis
self.queue.clear()
def enqueue_request(self, request):
# 不能加入待爬队列的条件
# 当前url需要经过allow_domain过滤并且request不存在dp的时候
# 由此:对于像百度贴吧这种页面内容会更新的网址,可以设置dont_filter为True让其能够被反复抓取
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
if self.stats:
self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
self.queue.push(request)
return True
4、由此可以终结出request对象入队的条件
- request的指纹不在集合中
- request的dont_filter为True,即不过滤
- start_urls中的url地址会入队,因为他们默认是不过滤
5、实现单机断点续爬
五、实现分布式爬虫
打开example-project项目的myspider_redis.py文件
通过观察代码:
-
继承自父类为RedisSpider
-
增加了一个redis_key的键,没有start_urls,因为分布式中,如果每台电脑都请求一次start_url就会重复
-
多了init方法,该方法不是必须的,可以手动指定allow_domains
-
启动方法:
- 在每个节点正确的目录下执行scrapy crawl 爬虫名,使该节点的scrapy_redis爬虫程序就位
- 在共用的redis中 lpush redis_key 'start_url',使全部节点真正的开始运行
-
setting.py中关键配置
# 设置重复过滤器的模块
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# 设置调取器、scrapy_redis中的调度器具备与数据库交互的功能
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# 设置当爬虫结束的时候是否保持redis数据库中的去重集合与任务队列
SCHEDULER_PERSIST = True
#SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.SpiderPriorityQueue"
#SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.SpiderQueue"
#SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.SpiderStack"
ITEM_PIPELINES = {
'example.pipelines.ExamplePipeline': 300,
# 当开启该管道,该管道将会把数据存储Redis数据库中
'scrapy_redis.pipelines.RedisPipeline': 400,
}
# 设置redis数据库
REDIS_URL = "redis://127.0.0.1:6379"
-
分布式爬虫编写流程
- 编写普通爬虫
- 创建项目
- 明确目标
- 创建爬虫
- 保存内容
-
改造成分布式爬虫
- 改成爬虫
- 导入类
from scrapy_redis.spiders import RedisSpider
- 继承类
class MySpider(RedisSpider):
- 注销 start_urls & allowed_domains
- 设置redis_key获取start_url
- 设置init获取允许的域
- 导入类
- 改造配置文件
- copy配置参数
- 改成爬虫
标签: 爬虫