From a410275e183b455e77e31903d7afd5210a29d3f1 Mon Sep 17 00:00:00 2001 From: Boris Date: Tue, 30 Mar 2021 10:38:02 +0800 Subject: [PATCH] =?UTF-8?q?AirSpider=20=E6=94=AF=E6=8C=81itemz=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E5=85=A5=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- feapder/VERSION | 2 +- feapder/buffer/item_buffer.py | 11 ++++++-- feapder/core/parser_control.py | 7 ++++- feapder/core/spiders/air_spider.py | 16 ++++++++++- tests/air-spider/test_air_spider_item.py | 36 ++++++++++++++++++++++++ 5 files changed, 67 insertions(+), 5 deletions(-) create mode 100644 tests/air-spider/test_air_spider_item.py diff --git a/feapder/VERSION b/feapder/VERSION index 88c5fb89..f15838a7 100644 --- a/feapder/VERSION +++ b/feapder/VERSION @@ -1 +1 @@ -1.4.0 +1.4.1-beta1 diff --git a/feapder/buffer/item_buffer.py b/feapder/buffer/item_buffer.py index 78cc7ffd..a1e584fa 100644 --- a/feapder/buffer/item_buffer.py +++ b/feapder/buffer/item_buffer.py @@ -37,6 +37,7 @@ def __new__(cls, *args, **kwargs): class ItemBuffer(threading.Thread, Singleton): dedup = None + __redis_db = None def __init__(self, redis_key, task_table=None): if not hasattr(self, "_table_item"): @@ -48,7 +49,6 @@ def __init__(self, redis_key, task_table=None): self._task_table = task_table self._items_queue = Queue(maxsize=MAX_ITEM_COUNT) - self._db = RedisDB() self._table_item = setting.TAB_ITEM self._table_request = setting.TAB_REQUSETS.format(redis_key=redis_key) @@ -69,6 +69,13 @@ def __init__(self, redis_key, task_table=None): if setting.ITEM_FILTER_ENABLE and not self.__class__.dedup: self.__class__.dedup = Dedup(to_md5=False) + @property + def redis_db(self): + if self.__class__.__redis_db is None: + self.__class__.__redis_db = RedisDB() + + return self.__class__.__redis_db + def load_pipelines(self): pipelines = [] for pipeline_path in setting.ITEM_PIPELINES: @@ -335,7 +342,7 @@ def __add_item_to_db( # 删除做过的request if requests: - self._db.zrem(self._table_request, requests) + self.redis_db.zrem(self._table_request, requests) # 去重入库 if export_success and setting.ITEM_FILTER_ENABLE: diff --git a/feapder/core/parser_control.py b/feapder/core/parser_control.py index 658a54fe..c0ebb99d 100644 --- a/feapder/core/parser_control.py +++ b/feapder/core/parser_control.py @@ -13,6 +13,7 @@ import feapder.setting as setting import feapder.utils.tools as tools +from feapder.buffer.item_buffer import ItemBuffer from feapder.db.memory_db import MemoryDB from feapder.network.item import Item from feapder.network.request import Request @@ -399,12 +400,13 @@ class AirSpiderParserControl(PaserControl): _success_task_count = 0 _failed_task_count = 0 - def __init__(self, memory_db: MemoryDB): + def __init__(self, memory_db: MemoryDB, item_buffer: ItemBuffer): super(PaserControl, self).__init__() self._parsers = [] self._memory_db = memory_db self._thread_stop = False self._wait_task_time = 0 + self._item_buffer = item_buffer def run(self): while not self._thread_stop: @@ -500,6 +502,9 @@ def deal_requests(self, requests): # 将next_request 入库 self._memory_db.add(result) + elif isinstance(result, Item): + self._item_buffer.put_item(result) + except Exception as e: exception_type = ( str(type(e)).replace("", "") diff --git a/feapder/core/spiders/air_spider.py b/feapder/core/spiders/air_spider.py index b50bbee9..139291f7 100644 --- a/feapder/core/spiders/air_spider.py +++ b/feapder/core/spiders/air_spider.py @@ -12,6 +12,7 @@ import feapder.setting as setting import feapder.utils.tools as tools +from feapder.buffer.item_buffer import ItemBuffer from feapder.core.base_parser import BaseParser from feapder.core.parser_control import AirSpiderParserControl from feapder.db.memory_db import MemoryDB @@ -39,6 +40,7 @@ def __init__(self, thread_count=None): self._memory_db = MemoryDB() self._parser_controls = [] + self._item_buffer = ItemBuffer(redis_key="air_spider") def distribute_task(self): for request in self.start_requests(): @@ -59,17 +61,26 @@ def all_thread_is_done(self): if not self._memory_db.empty(): return False + # 检测 item_buffer 状态 + if ( + self._item_buffer.get_items_count() > 0 + or self._item_buffer.is_adding_to_db() + ): + return False + tools.delay_time(1) return True def run(self): for i in range(self._thread_count): - parser_control = AirSpiderParserControl(self._memory_db) + parser_control = AirSpiderParserControl(self._memory_db, self._item_buffer) parser_control.add_parser(self) parser_control.start() self._parser_controls.append(parser_control) + self._item_buffer.start() + self.distribute_task() while True: @@ -78,6 +89,9 @@ def run(self): for parser_control in self._parser_controls: parser_control.stop() + # 关闭item_buffer + self._item_buffer.stop() + # 关闭webdirver if Request.webdriver_pool: Request.webdriver_pool.close() diff --git a/tests/air-spider/test_air_spider_item.py b/tests/air-spider/test_air_spider_item.py new file mode 100644 index 00000000..fbdaabcb --- /dev/null +++ b/tests/air-spider/test_air_spider_item.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +""" +Created on 2021-03-30 10:27:21 +--------- +@summary: +--------- +@author: Boris +""" + +import feapder +from feapder import Item + + +class TestAirSpiderItem(feapder.AirSpider): + __custom_setting__ = dict( + MYSQL_IP="localhost", + MYSQL_PORT=3306, + MYSQL_DB="feapder", + MYSQL_USER_NAME="feapder", + MYSQL_USER_PASS="feapder123", + ) + + def start_requests(self): + yield feapder.Request("https://www.baidu.com") + + def parse(self, request, response): + title = response.xpath("string(//title)").extract_first() + item = Item() + item.table_name = "spider_data" + item.url = request.url + item.title = title + yield item + + +if __name__ == "__main__": + TestAirSpiderItem().start()