Skip to content

Commit

Permalink
Adjust resource name selection
Browse files Browse the repository at this point in the history
  • Loading branch information
sultaniman committed Mar 1, 2024
1 parent fb9ddc6 commit 5a881f1
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 15 deletions.
12 changes: 6 additions & 6 deletions sources/scraping/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,11 @@ class PipelineRunner(Runnable):
option to host pipeline in a thread and communicate via the queue.
"""

def __init__(
self,
pipeline: dlt.Pipeline,
queue: ScrapingQueue[T],
) -> None:
def __init__(self, pipeline: dlt.Pipeline, queue: ScrapingQueue[T]) -> None:
self.pipeline = pipeline
self.queue = queue

if pipeline.dataset_name:
if pipeline.dataset_name and not self.is_default_dataset_name(pipeline):
resource_name = pipeline.dataset_name
else:
resource_name = f"{pipeline.pipeline_name}_results"
Expand All @@ -118,6 +114,10 @@ def __init__(
name=resource_name,
)

def is_default_dataset_name(self, pipeline: dlt.Pipeline) -> bool:
    """Return True if the pipeline's dataset name is the auto-generated default.

    dlt derives a default dataset name by appending ``DEFAULT_DATASET_SUFFIX``
    to the pipeline name; a match means the caller did not pick one explicitly,
    so it should not be reused as the resource name.
    """
    expected = f"{pipeline.pipeline_name}{pipeline.DEFAULT_DATASET_SUFFIX}"
    return expected == pipeline.dataset_name

def run(
self,
*args: P.args,
Expand Down
20 changes: 11 additions & 9 deletions tests/scraping/test_scraping_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@

from sources.scraping import run_pipeline
from sources.scraping.helpers import create_pipeline_runner

from sources.scraping.queue import ScrapingQueue
from sources.scraping.runner import PipelineRunner

from tests.utils import ALL_DESTINATIONS, load_table_counts

from .utils import (
Expand Down Expand Up @@ -73,24 +74,25 @@ def test_pipeline_runners_handle_extended_and_simple_use_cases(mocker):


def test_resource_name_assignment_and_generation():
queue = ScrapingQueue()
# If dataset_name is given to the pipeline, then the
# resource name is the same as dataset_name
pipeline1 = dlt.pipeline(
pipeline_name="pipeline_one",
destination="duckdb",
dataset_name="cookies",
)
scraping_host = create_pipeline_runner(pipeline1, MySpider)
scraping_host.pipeline_runner.scraping_resource.name == "cookies"
pipeline_runner = PipelineRunner(
pipeline=pipeline1,
queue=queue,
)
assert pipeline_runner.scraping_resource.name == "cookies"

# If dataset_name is not given to the pipeline, then the
# resource name is generated as pipeline_name + "_results" suffix
pipeline2 = dlt.pipeline(
pipeline_name="pipeline_one",
destination="duckdb",
)
scraping_host = create_pipeline_runner(pipeline2, MySpider)
scraping_host.pipeline_runner.scraping_resource.name == "pipeline_one_results"
pipeline2 = dlt.pipeline(pipeline_name="pipeline_two", destination="duckdb")
pipeline_runner2 = PipelineRunner(pipeline2, queue=queue)
assert pipeline_runner2.scraping_resource.name == "pipeline_two_results"


@pytest.mark.skip(
Expand Down

0 comments on commit 5a881f1

Please sign in to comment.