Skip to content

Commit

Permalink
AirSpider 支持itemz自动入库
Browse files Browse the repository at this point in the history
  • Loading branch information
Boris committed Mar 30, 2021
1 parent d04f071 commit a410275
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 5 deletions.
2 changes: 1 addition & 1 deletion feapder/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.4.0
1.4.1-beta1
11 changes: 9 additions & 2 deletions feapder/buffer/item_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __new__(cls, *args, **kwargs):

class ItemBuffer(threading.Thread, Singleton):
dedup = None
__redis_db = None

def __init__(self, redis_key, task_table=None):
if not hasattr(self, "_table_item"):
Expand All @@ -48,7 +49,6 @@ def __init__(self, redis_key, task_table=None):
self._task_table = task_table

self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)
self._db = RedisDB()

self._table_item = setting.TAB_ITEM
self._table_request = setting.TAB_REQUSETS.format(redis_key=redis_key)
Expand All @@ -69,6 +69,13 @@ def __init__(self, redis_key, task_table=None):
if setting.ITEM_FILTER_ENABLE and not self.__class__.dedup:
self.__class__.dedup = Dedup(to_md5=False)

@property
def redis_db(self):
if self.__class__.__redis_db is None:
self.__class__.__redis_db = RedisDB()

return self.__class__.__redis_db

def load_pipelines(self):
pipelines = []
for pipeline_path in setting.ITEM_PIPELINES:
Expand Down Expand Up @@ -335,7 +342,7 @@ def __add_item_to_db(

# 删除做过的request
if requests:
self._db.zrem(self._table_request, requests)
self.redis_db.zrem(self._table_request, requests)

# 去重入库
if export_success and setting.ITEM_FILTER_ENABLE:
Expand Down
7 changes: 6 additions & 1 deletion feapder/core/parser_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.buffer.item_buffer import ItemBuffer
from feapder.db.memory_db import MemoryDB
from feapder.network.item import Item
from feapder.network.request import Request
Expand Down Expand Up @@ -399,12 +400,13 @@ class AirSpiderParserControl(PaserControl):
_success_task_count = 0
_failed_task_count = 0

def __init__(self, memory_db: MemoryDB):
def __init__(self, memory_db: MemoryDB, item_buffer: ItemBuffer):
super(PaserControl, self).__init__()
self._parsers = []
self._memory_db = memory_db
self._thread_stop = False
self._wait_task_time = 0
self._item_buffer = item_buffer

def run(self):
while not self._thread_stop:
Expand Down Expand Up @@ -500,6 +502,9 @@ def deal_requests(self, requests):
# 将next_request 入库
self._memory_db.add(result)

elif isinstance(result, Item):
self._item_buffer.put_item(result)

except Exception as e:
exception_type = (
str(type(e)).replace("<class '", "").replace("'>", "")
Expand Down
16 changes: 15 additions & 1 deletion feapder/core/spiders/air_spider.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.buffer.item_buffer import ItemBuffer
from feapder.core.base_parser import BaseParser
from feapder.core.parser_control import AirSpiderParserControl
from feapder.db.memory_db import MemoryDB
Expand Down Expand Up @@ -39,6 +40,7 @@ def __init__(self, thread_count=None):

self._memory_db = MemoryDB()
self._parser_controls = []
self._item_buffer = ItemBuffer(redis_key="air_spider")

def distribute_task(self):
for request in self.start_requests():
Expand All @@ -59,17 +61,26 @@ def all_thread_is_done(self):
if not self._memory_db.empty():
return False

# 检测 item_buffer 状态
if (
self._item_buffer.get_items_count() > 0
or self._item_buffer.is_adding_to_db()
):
return False

tools.delay_time(1)

return True

def run(self):
for i in range(self._thread_count):
parser_control = AirSpiderParserControl(self._memory_db)
parser_control = AirSpiderParserControl(self._memory_db, self._item_buffer)
parser_control.add_parser(self)
parser_control.start()
self._parser_controls.append(parser_control)

self._item_buffer.start()

self.distribute_task()

while True:
Expand All @@ -78,6 +89,9 @@ def run(self):
for parser_control in self._parser_controls:
parser_control.stop()

# 关闭item_buffer
self._item_buffer.stop()

# 关闭webdirver
if Request.webdriver_pool:
Request.webdriver_pool.close()
Expand Down
36 changes: 36 additions & 0 deletions tests/air-spider/test_air_spider_item.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
"""
Created on 2021-03-30 10:27:21
---------
@summary:
---------
@author: Boris
"""

import feapder
from feapder import Item


class TestAirSpiderItem(feapder.AirSpider):
__custom_setting__ = dict(
MYSQL_IP="localhost",
MYSQL_PORT=3306,
MYSQL_DB="feapder",
MYSQL_USER_NAME="feapder",
MYSQL_USER_PASS="feapder123",
)

def start_requests(self):
yield feapder.Request("https://www.baidu.com")

def parse(self, request, response):
title = response.xpath("string(//title)").extract_first()
item = Item()
item.table_name = "spider_data"
item.url = request.url
item.title = title
yield item


if __name__ == "__main__":
TestAirSpiderItem().start()

0 comments on commit a410275

Please sign in to comment.