
Scrapy source using scrapy #332

Merged
merged 107 commits into master from source/scaping
Mar 5, 2024

Changes from 90 commits

Commits (107)
4d28bd9
Scrapy source using scrapy
Jan 25, 2024
ac63f7d
Add batching of results
Jan 26, 2024
59fd5f9
Add pytest-mock and scrapy
Jan 26, 2024
d80446c
Adjust tests
Jan 26, 2024
04d8841
Add pytest-twisted
Jan 26, 2024
2741da8
Add twisted to scrapy dependencies
Jan 26, 2024
a685dae
Add twisted to dev dependencies
Jan 26, 2024
78bcc6f
Add review comments
Jan 29, 2024
3208ea8
Add more checks and do not exit when queue is empty
Jan 30, 2024
167fffa
Create QueueClosedError and handle in listener to exit loop
Jan 30, 2024
93ed13a
Simplify code
Jan 30, 2024
2756610
Stop crawling if queue is closed
Jan 30, 2024
1d73c9b
Fix linting issues
Feb 1, 2024
61fd907
Fix linting issues
Feb 1, 2024
20e10a4
Adjust tests and disable telnet server for scrapy
Feb 1, 2024
b3bf863
Remove pytest-twisted
Feb 1, 2024
4812fcc
Refactor scrapy item pipeline
Feb 5, 2024
f79720f
Eliminate custom spider
Feb 5, 2024
4c727cd
Rename a function
Feb 5, 2024
8b8b417
Simplify code
Feb 5, 2024
a9193e8
Cleanup code
Feb 5, 2024
fc66d97
Update comment
Feb 5, 2024
a822218
Update comment
Feb 5, 2024
6d45468
Fix linting issues
Feb 5, 2024
7799bf1
Define abstract method
Feb 5, 2024
059837f
Update readme
Feb 5, 2024
ba52057
Add more tests
Feb 5, 2024
82963d0
Adjust tests
Feb 5, 2024
ba04471
Use pytest.mark.forked to run tests for ALL_DESTINATIONS
Feb 12, 2024
2f4a378
Add pytest-forked
Feb 12, 2024
07a140d
Update lockfile
Feb 12, 2024
c41fad2
Use scrapy signals
Feb 21, 2024
ade0069
Hide batching and retrieving logic inside queue
sultaniman Feb 22, 2024
7296324
Add more types
sultaniman Feb 22, 2024
7f44d14
Extend default scrapy settings
sultaniman Feb 22, 2024
7785e56
Extract pipeline and scrapy runners
sultaniman Feb 22, 2024
83ec743
Simplify helpers code
sultaniman Feb 22, 2024
aadd6e4
Cleanup code
sultaniman Feb 22, 2024
14a18a1
Add start_urls_file configuration option
sultaniman Feb 22, 2024
78f8777
Sync scrapy log level with dlt log level
sultaniman Feb 22, 2024
0e0d5ff
Expose simple scraping pipeline runner
sultaniman Feb 22, 2024
a2733bd
Adjust config file
sultaniman Feb 22, 2024
d8371de
Connect signals in ScrapyRunner.init
sultaniman Feb 22, 2024
47d24f5
Register source and do cleanups
sultaniman Feb 22, 2024
505acff
Better scrapy setting passing and minor cleanups
sultaniman Feb 23, 2024
de154bb
Remove reduntant code comments
sultaniman Feb 23, 2024
20358cd
Call engine_stopped callback in finally block
sultaniman Feb 23, 2024
f8fc527
Add more docstrings related to runners
sultaniman Feb 23, 2024
9024f6b
Adjust batch size
sultaniman Feb 23, 2024
9cf8ec1
Fix queue batching bugs
sultaniman Feb 23, 2024
7378e75
Pass crawler instance to item_scraped callback
sultaniman Feb 23, 2024
4f5bcb8
Add advanced example to pipeline code
sultaniman Feb 23, 2024
657714a
Access settings override for scrapy
sultaniman Feb 23, 2024
e030b51
Rewrite tests
sultaniman Feb 23, 2024
d38ef2a
Small readme update for bing wembaster
sultaniman Feb 26, 2024
e7ca332
Adjust queue read timeout
sultaniman Feb 27, 2024
a4d9290
Extract test utils for scraping source
sultaniman Feb 27, 2024
ee8f3cd
Add stream generator to queue to handle generator exit exception
sultaniman Feb 27, 2024
2e15f07
Extract singal registering and tearing down as context manager
sultaniman Feb 27, 2024
fc4a244
Adjust and cleanup example pipeline source file
sultaniman Feb 27, 2024
7334a6a
Cleanup scraping helpers
sultaniman Feb 27, 2024
398d732
Adjust tests for scraping pipeline
sultaniman Feb 27, 2024
f7347b1
Add callback access to scraping resource
sultaniman Feb 27, 2024
9139c00
Update readme
sultaniman Feb 27, 2024
f9affb2
Cleanup code
sultaniman Feb 27, 2024
e9a38e1
Import ParamSpec from typing extensions
sultaniman Feb 27, 2024
76ddfbc
Fix linting issues
sultaniman Feb 27, 2024
6ccb247
Fix linting issues
sultaniman Feb 27, 2024
1629a15
Set encoding when opening the file with urls
sultaniman Feb 27, 2024
f512b1d
Adjust typing for scraping testing utils
sultaniman Feb 27, 2024
36e55c5
Adjust typing for scraping testing utils
sultaniman Feb 27, 2024
dbd0d53
Use proper Union syntax
sultaniman Feb 27, 2024
c9da574
Adjust mock patch module path for scraping tests
sultaniman Feb 27, 2024
3cf9b23
Use latest dlt version
sultaniman Feb 27, 2024
e5d34c4
Adjust mock patch module path for scraping tests
sultaniman Feb 27, 2024
84767fb
Adjust tests and mark ones to skip
sultaniman Feb 28, 2024
6431b62
Cleanup tests and utils for scraping source
sultaniman Feb 28, 2024
9352f9d
Re-use spy on queue.close calls
sultaniman Feb 28, 2024
f43d602
Use append write_disposition by default for scraping source
sultaniman Feb 28, 2024
7eba014
Update test skip reason
sultaniman Feb 28, 2024
b5f0f06
Stop crawler manually
sultaniman Mar 1, 2024
2ef1350
Return self from __call__
sultaniman Mar 1, 2024
2c4fd60
Check if crawler.stop is actually called
sultaniman Mar 1, 2024
3ae0f1e
Check if crawling has already been stopping
sultaniman Mar 1, 2024
fb9ddc6
Test to verify resource name generation and override
sultaniman Mar 1, 2024
5a881f1
Adjust resource name selection
sultaniman Mar 1, 2024
9376720
Add more docstrings and update readme
sultaniman Mar 1, 2024
b03d564
Update readme
sultaniman Mar 1, 2024
c03e6ab
Add scrapy configuration in example pipeline
sultaniman Mar 1, 2024
2b4d64a
Adjust tests
sultaniman Mar 1, 2024
cad5924
Shutdown twisted reactor after module tests
sultaniman Mar 4, 2024
b95c9fe
Merge branch 'master' into source/scaping
sultaniman Mar 4, 2024
cb9a8a8
Update lockfile
sultaniman Mar 4, 2024
49847a8
Fix linting issues
sultaniman Mar 4, 2024
65a5019
Use simple run_pipeline
sultaniman Mar 4, 2024
7bfdc80
Close the queue after timeout
sultaniman Mar 4, 2024
d490620
Rewrite a comment and use break instead of return in while loop
sultaniman Mar 4, 2024
8667536
Update comments
sultaniman Mar 4, 2024
7cadb93
Mock queue with alternative implementation
sultaniman Mar 5, 2024
02e467b
Add docstrings and fix a bug in test queue
sultaniman Mar 5, 2024
ded3949
Format code
sultaniman Mar 5, 2024
56e8a54
Debug test queue
sultaniman Mar 5, 2024
2e587b9
Adjust mock patch path
sultaniman Mar 5, 2024
dd9d3f5
Add logging when scrapy stops and re-arrange code actions
sultaniman Mar 5, 2024
ed33d15
Stop crawler in on_engine_stopped
sultaniman Mar 5, 2024
9ef03bf
Call on_engine_stopped from on_item_scraped if the queue is closed
sultaniman Mar 5, 2024
eb8278c
Skip test
sultaniman Mar 5, 2024
7,594 changes: 3,992 additions & 3,602 deletions poetry.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions pyproject.toml
@@ -31,6 +31,9 @@ black = "^23.3.0"
pypdf2 = "^3.0.1"
greenlet = "<3.0.0"
confluent-kafka = "^2.3.0"
pytest-mock = "^3.12.0"
twisted = "22.10.0"
pytest-forked = "^1.6.0"

[tool.poetry.group.sql_database.dependencies]
sqlalchemy = ">=1.4"
@@ -80,6 +83,11 @@ pyairtable = "^2.1.0.post1"
[tool.poetry.group.filesystem.dependencies]
adlfs = ">=2023.9.0"


[tool.poetry.group.scrapy.dependencies]
scrapy = "^2.11.0"
twisted = "22.10.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
9 changes: 9 additions & 0 deletions sources/.dlt/config.toml
@@ -60,3 +60,12 @@ access_token_expires_at=1688821881

[sources.workable]
subdomain="dlthub-test"

[sources.scraping]
batch_size = 100
queue_size = 3000
queue_result_timeout = 3.0
start_urls_file="/path/to/urls.txt"
start_urls = [
"https://quotes.toscrape.com/page/1/"
]
2 changes: 1 addition & 1 deletion sources/bing_webmaster/README.md
@@ -57,7 +57,7 @@ api_key = "Please set me up!" # please set me up!
3. Now the pipeline can be run by using the command:

```bash
python3 bing_webmaster_pipeline.py
python bing_webmaster_pipeline.py
```

3. To make sure that everything is loaded as expected, use the command:
2 changes: 1 addition & 1 deletion sources/google_sheets/helpers/data_processing.py
@@ -224,7 +224,7 @@ def serial_date_to_datetime(
)
# int values are dates, float values are datetimes
if data_type == "date":
return conv_datetime.date() # type: ignore
return conv_datetime.date()

return conv_datetime

6 changes: 3 additions & 3 deletions sources/kinesis/helpers.py
@@ -2,7 +2,7 @@

import dlt
from dlt.common import pendulum
from dlt.common.typing import DictStrStr, StrStr
from dlt.common.typing import DictStrAny, StrAny, StrStr


def get_shard_iterator(
@@ -11,7 +11,7 @@ def get_shard_iterator(
shard_id: str,
last_msg: dlt.sources.incremental[StrStr],
initial_at_timestamp: pendulum.DateTime,
) -> Tuple[str, StrStr]:
) -> Tuple[str, StrAny]:
"""Gets shard `shard_id` of `stream_name` iterator. If `last_msg` incremental is present it may
contain last message sequence for shard_id. in that case AFTER_SEQUENCE_NUMBER is created.
If no message sequence is present, `initial_at_timestamp` is used for AT_TIMESTAMP or LATEST.
@@ -20,7 +20,7 @@
sequence_state = (
{} if last_msg is None else last_msg.last_value or last_msg.initial_value or {}
)
iterator_params: DictStrStr
iterator_params: DictStrAny
msg_sequence = sequence_state.get(shard_id, None)
if msg_sequence:
iterator_params = dict(
112 changes: 112 additions & 0 deletions sources/scraping/README.md
@@ -0,0 +1,112 @@
---
title: Scraping with DLT
description: dlt source to scrape web content
keywords: [scrapy, scraping, spiders, crawler, crawling]
---

# Scraping

The scraping source allows you to scrape content from the web and uses [Scrapy](https://doc.scrapy.org/en/latest/)
to enable this capability.

It is possible to access and manipulate the scraping resource (please see `scraping_pipeline.py` and the sketch after this list) via:

1. `on_before_start` callback which will receive a `DltResource` as the only argument,
2. The advanced scraping pipeline builder `scraping.helpers.create_pipeline_runner`
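
A minimal sketch of the first option. The spider class, module paths, and table name below are illustrative assumptions; the actual example lives in `scraping_pipeline.py`:

```py
import dlt
from dlt.sources import DltResource

from scraping import run_pipeline
from scraping_pipeline import MySpider  # hypothetical spider, for illustration only


def on_before_start(resource: DltResource) -> None:
    # Adjust the dlt resource before the run starts,
    # e.g. set the target table name and write disposition.
    resource.apply_hints(table_name="quotes", write_disposition="append")


pipeline = dlt.pipeline(
    pipeline_name="scraping_example",
    destination="duckdb",
    dataset_name="quotes",
)

run_pipeline(
    pipeline,
    MySpider,
    on_before_start=on_before_start,
    batch_size=10,
)
```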

## Initialize the pipeline

```bash
dlt init scraping duckdb
```

## 🎲 Configuration

It is possible to provide configuration via `.dlt/config.toml`; below you can see an example:

```toml
[sources.scraping]
# Batch size - how many scraped results to collect
# before dispatching to DLT pipeline
batch_size = 100
# Default queue size
queue_size = 3000
# How long to wait before exiting
queue_result_timeout = 3.0
start_urls = [
"https://quotes.toscrape.com/page/1/"
]
start_urls_file="/path/to/urls.txt"
```

When both `start_urls` and `start_urls_file` are provided, they will be merged and deduplicated so Scrapy
gets a unique set of `start_urls`.

## 🏎️ Running the pipeline

Install requirements and run the pipeline

```sh
pip install -r requirements.txt
python scraping_pipeline.py
```

## Implementing a spider

It is your responsibility to implement the spider and the data extraction logic for the responses,
because our runner expects a spider class. Please see the example spider in `scraping_pipeline.py` as a reference,
and the minimal sketch below. For more information about spider implementation please also see the [Scrapy docs](https://docs.scrapy.org/en/latest/topics/spiders.html).
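
A minimal sketch of a spider; the class name and selectors are illustrative, assuming the quotes.toscrape.com pages from the configuration above:

```py
import scrapy


class QuotesSpider(scrapy.Spider):
    name = "quotes"
    # start_urls are supplied by the runner from the
    # `start_urls` / `start_urls_file` configuration.

    def parse(self, response):
        # Yield one dict per quote; each yielded item is put on the
        # queue and loaded by the dlt resource.
        for quote in response.css("div.quote"):
            yield {
                "text": quote.css("span.text::text").get(),
                "author": quote.css("small.author::text").get(),
            }

        # Follow pagination links, if any.
        next_page = response.css("li.next a::attr(href)").get()
        if next_page is not None:
            yield response.follow(next_page, callback=self.parse)
```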

## Configuring Scrapy

You can pass scrapy settings via

1. `run_pipeline(..., scrapy_settings={...})`,
2. `create_pipeline_runner(..., scrapy_settings={...})`,
3. Overriding defaults in `settings.py`.

Example:
```py
run_pipeline(
pipeline,
MySpider,
scrapy_settings={
# How many sub pages to scrape
# https://docs.scrapy.org/en/latest/topics/settings.html#depth-limit
"DEPTH_LIMIT": 0,
"SPIDER_MIDDLEWARES": {
"scrapy.spidermiddlewares.depth.DepthMiddleware": 200,
"scrapy.spidermiddlewares.httperror.HttpErrorMiddleware": 300,
},
"HTTPERROR_ALLOW_ALL": True,
},
)
```

Note: this is just a shallow merge.
Also, the log level is automatically set in sync with the one
dlt provides, so passing it via `scrapy_settings` as `"LOG_LEVEL": "DEBUG"` will not work;
please see the [logging documentation](https://dlthub.com/docs/running-in-production/running#set-the-log-level-and-format) for dlt.

## 🧐 Introspection using streamlit

NOTE: you might need to install `streamlit` first: `pip install streamlit`

```sh
dlt pipeline <pipeline_name> show
```

## 🧠 How does it work?

Under the hood we run the DLT [pipeline](https://dlthub.com/docs/api_reference/pipeline) in a separate thread while Scrapy runs in the main thread.

Communication between the two is done via a queue (see the sketch after this list), where

- the spider is responsible for putting the results into the queue,
- the DLT resource collects and batches results from the queue.
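
A greatly simplified sketch of this producer/consumer pattern; the names and sizes below are illustrative, not the actual implementation:

```py
import queue
import threading

item_queue: "queue.Queue[dict]" = queue.Queue(maxsize=3000)  # queue_size


def consume_batches(batch_size: int = 100) -> None:
    # Runs alongside Scrapy: drain the queue and hand batches
    # of scraped items to the dlt pipeline.
    batch = []
    while True:
        try:
            item = item_queue.get(timeout=3.0)  # queue_result_timeout
        except queue.Empty:
            break  # nothing arrived in time, stop consuming
        batch.append(item)
        if len(batch) >= batch_size:
            ...  # pipeline.run(batch) in the real source
            batch = []
    if batch:
        ...  # flush the remaining items


threading.Thread(target=consume_batches).start()
# Meanwhile Scrapy runs in the main thread and the spider's items reach
# the queue via an item pipeline / signal handler calling item_queue.put(item).
```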

![simple diagram](./diagram.png)

<p align="center"><strong>Enjoy it!</strong></p>
<hr>
<p align="center">✨ 🚀 ✨</p>
74 changes: 74 additions & 0 deletions sources/scraping/__init__.py
@@ -0,0 +1,74 @@
"""Scraping source

Integrates Dlt and Scrapy to facilitate scraping pipelines.
"""
import inspect
import typing as t

import dlt

from dlt.sources import DltResource
from dlt.common.source import _SOURCES, SourceInfo

from scrapy import Spider # type: ignore

from .helpers import ScrapingConfig, create_pipeline_runner
from .types import P, AnyDict


def run_pipeline( # type: ignore[valid-type]
pipeline: dlt.Pipeline,
spider: t.Type[Spider],
*args: P.args,
on_before_start: t.Callable[[DltResource], None] = None,
scrapy_settings: t.Optional[AnyDict] = None,
batch_size: t.Optional[int] = None,
queue_size: t.Optional[int] = None,
queue_result_timeout: t.Optional[float] = None,
**kwargs: P.kwargs,
) -> None:
"""Simple runner for the scraping pipeline

You can pass all parameters via kwargs to `dlt.pipeline.run(....)`

```
destination: TDestinationReferenceArg = None,
staging: TDestinationReferenceArg = None,
dataset_name: str = None,
credentials: Any = None,
table_name: str = None,
write_disposition: TWriteDisposition = None,
columns: TAnySchemaColumns = None,
primary_key: TColumnNames = None,
schema: Schema = None,
loader_file_format: TLoaderFileFormat = None
```
"""
options: AnyDict = {}
if scrapy_settings:
options["scrapy_settings"] = scrapy_settings

if batch_size:
options["batch_size"] = batch_size

if queue_size:
options["queue_size"] = queue_size

if queue_result_timeout:
options["queue_result_timeout"] = queue_result_timeout

scraping_host = create_pipeline_runner(pipeline, spider, **options)

if on_before_start:
on_before_start(scraping_host.pipeline_runner.scraping_resource)

scraping_host.run(*args, **kwargs)


# This way we allow dlt init to detect scraping source it is indeed hacky
# and the core team is working to provide a better alternative.
_SOURCES[run_pipeline.__qualname__] = SourceInfo(
ScrapingConfig,
run_pipeline,
inspect.getmodule(run_pipeline),
)
Binary file added sources/scraping/diagram.png
99 changes: 99 additions & 0 deletions sources/scraping/helpers.py
@@ -0,0 +1,99 @@
import os
import typing as t

import dlt
from dlt.common.configuration.inject import with_config
from dlt.common.configuration.specs.base_configuration import (
configspec,
BaseConfiguration,
)

from scrapy import Spider # type: ignore

from .queue import ScrapingQueue
from .settings import SOURCE_SCRAPY_QUEUE_SIZE, SOURCE_SCRAPY_SETTINGS
from .runner import ScrapingHost, PipelineRunner, ScrapyRunner, Signals
from .types import AnyDict


@configspec
class ScrapingConfig(BaseConfiguration):
# Batch size for scraped items
batch_size: int = 100

# maxsize for queue
queue_size: t.Optional[int] = SOURCE_SCRAPY_QUEUE_SIZE

# result wait timeout for our queue
queue_result_timeout: t.Optional[float] = 1.0

# List of start urls
start_urls: t.List[str] = None
start_urls_file: str = None


@with_config(sections=("sources", "scraping"), spec=ScrapingConfig)
def resolve_start_urls(
start_urls: t.Optional[t.List[str]] = dlt.config.value,
start_urls_file: t.Optional[str] = dlt.config.value,
) -> t.List[str]:
"""Merges start urls
If both `start_urls` and `start_urls_file` given, we will merge them
and return deduplicated list of `start_urls` for scrapy spider.
"""
urls = set()
if os.path.exists(start_urls_file):
with open(start_urls_file, encoding="utf-8") as fp:
urls = {line for line in fp.readlines() if str(line).strip()}

if start_urls:
for url in start_urls:
urls.add(url)

return list(set(urls))


@with_config(sections=("sources", "scraping"), spec=ScrapingConfig)
def create_pipeline_runner(
pipeline: dlt.Pipeline,
spider: t.Type[Spider],
batch_size: int = dlt.config.value,
queue_size: int = dlt.config.value,
queue_result_timeout: float = dlt.config.value,
scrapy_settings: t.Optional[AnyDict] = None,
) -> ScrapingHost:
queue = ScrapingQueue( # type: ignore
maxsize=queue_size,
batch_size=batch_size,
read_timeout=queue_result_timeout,
)

signals = Signals(
pipeline_name=pipeline.pipeline_name,
queue=queue,
)

# Just to simple merge
settings = {**SOURCE_SCRAPY_SETTINGS}
if scrapy_settings:
settings = {**scrapy_settings}

scrapy_runner = ScrapyRunner(
spider=spider,
start_urls=resolve_start_urls(),
signals=signals,
settings=settings,
)

pipeline_runner = PipelineRunner(
pipeline=pipeline,
queue=queue,
)

scraping_host = ScrapingHost(
queue,
scrapy_runner,
pipeline_runner,
)

return scraping_host