From 50e02797342ece1c514afa1dd5c38d31c4d641c2 Mon Sep 17 00:00:00 2001
From: Javier R
Date: Wed, 30 Oct 2024 20:34:16 +0000
Subject: [PATCH 1/4] feat: add `npm` downloads dlt collector

---
 warehouse/oso_dagster/assets/npm.py | 134 ++++++++++++++++++++++++++++
 1 file changed, 134 insertions(+)
 create mode 100644 warehouse/oso_dagster/assets/npm.py

diff --git a/warehouse/oso_dagster/assets/npm.py b/warehouse/oso_dagster/assets/npm.py
new file mode 100644
index 00000000..b36d8d82
--- /dev/null
+++ b/warehouse/oso_dagster/assets/npm.py
@@ -0,0 +1,134 @@
+from datetime import datetime, timedelta
+from typing import Generator, List, Optional
+
+import dlt
+import requests
+from dagster import AssetExecutionContext
+from pydantic import BaseModel
+
+from ..factories.dlt import pydantic_to_dlt_nullable_columns
+
+# Host for the NPM registry
+NPM_HOST = "https://api.npmjs.org"
+
+
+class NPMPackageInfo(BaseModel):
+    date: datetime
+    name: str
+    downloads: int
+
+
+def get_npm_package_downloads(
+    package_name: str, date_from: datetime, date_to: datetime
+) -> Generator[Optional[NPMPackageInfo], None, None]:
+    """
+    Fetches the download count for an NPM package between two dates.
+
+    Args:
+        package_name (str): The NPM package name
+        date_from (datetime): The start date
+        date_to (datetime): The end date
+
+    Yields:
+        Optional[NPMPackageInfo]: The download count for the package
+    """
+
+    str_from = date_from.strftime("%Y-%m-%d")
+    str_to = date_to.strftime("%Y-%m-%d")
+
+    endpoint = f"{NPM_HOST}/downloads/range/{str_from}:{str_to}/{package_name}"
+    response = requests.get(endpoint, timeout=10)
+
+    if not response.ok:
+        if response.status_code == 400 and "end date > start date" in response.text:
+            yield None
+
+        raise ValueError(f"Failed to fetch data for {package_name}: {response.text}")
+
+    data = response.json()
+
+    if data["package"] != package_name:
+        raise ValueError(
+            f"Unexpected package name: {data['package']} != {package_name}"
+        )
+
+    days_between = [
+        date_from + timedelta(days=i) for i in range((date_to - date_from).days + 1)
+    ]
+
+    for download in data["downloads"]:
+        date_day = datetime.strptime(download["day"], "%Y-%m-%d")
+
+        if date_day not in days_between:
+            raise ValueError(
+                f"Unexpected date for {package_name}: {date_day} not in {days_between}"
+            )
+
+        days_between.remove(date_day)
+
+        yield NPMPackageInfo(
+            date=date_day,
+            name=package_name,
+            downloads=download["downloads"],
+        )
+
+    if len(days_between) > 0:
+        raise ValueError(
+            f"Missing data for {package_name} between {date_from} and {date_to}: {
+                ", ".join(str(day) for day in days_between)
+            }"
+        )
+
+
+@dlt.resource(
+    columns=pydantic_to_dlt_nullable_columns(NPMPackageInfo),
+)
+def get_all_downloads(
+    context: AssetExecutionContext,
+    package_names: List[str],
+):
+    """
+    Fetches the download count for a list of NPM packages for the week
+    starting on the partition key date.
+
+    Args:
+        context (AssetExecutionContext): The asset execution
+        package_names (List[str]): List of NPM package names to fetch
+
+    Yields:
+        List[NPMPackageInfo]: The download count for each package
+
+    Example:
+        ```python
+        # NPM was launched on January 12, 2010
+        NPM_EPOCH = "2010-01-12T00:00:00Z"
+
+        @dlt_factory(
+            key_prefix="npm",
+            partitions_def=WeeklyPartitionsDefinition(
+                start_date=NPM_EPOCH.split("T", maxsplit=1)[0],
+                end_offset=1,
+            ),
+        )
+        def downloads(
+            context: AssetExecutionContext,
+        ):
+            yield get_all_downloads(
+                context,
+                package_names=[
+                    "@angular/core",
+                    "react",
+                    "vue",
+                    # ...
+ ], + ) + ``` + """ + + start = datetime.strptime(context.partition_key, "%Y-%m-%d") + end = start + timedelta(weeks=1) + + yield from ( + get_npm_package_downloads(package_name, start, end) + for package_name in package_names + ) From d26424a88a45762d6bf25584c8193f0ea9a3433f Mon Sep 17 00:00:00 2001 From: Javier R Date: Tue, 12 Nov 2024 09:31:25 +0000 Subject: [PATCH 2/4] fix: use `__annotations__` from source function --- warehouse/oso_dagster/factories/dlt.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/warehouse/oso_dagster/factories/dlt.py b/warehouse/oso_dagster/factories/dlt.py index a6a3bcaa..2f71e5ba 100644 --- a/warehouse/oso_dagster/factories/dlt.py +++ b/warehouse/oso_dagster/factories/dlt.py @@ -165,11 +165,11 @@ def _dlt_asset( source_args: Dict[str, Any] = extra_source_args source_args.update(resolved_secrets) - if "context" in source.__annotations__: + if "context" in f.__annotations__: source_args["context"] = context - if "dlt" in source.__annotations__: + if "dlt" in f.__annotations__: source_args["dlt"] = dlt - if "config" in source.__annotations__: + if "config" in f.__annotations__: source_args["config"] = config for resource in extra_resources: From 5fc8d13f4afeae23a1494c2d140725db51d72a5f Mon Sep 17 00:00:00 2001 From: Javier R Date: Tue, 12 Nov 2024 09:31:44 +0000 Subject: [PATCH 3/4] add: `npm` weekly downloads collector --- warehouse/oso_dagster/assets/npm.py | 80 +++++++++++++++++------------ 1 file changed, 48 insertions(+), 32 deletions(-) diff --git a/warehouse/oso_dagster/assets/npm.py b/warehouse/oso_dagster/assets/npm.py index b36d8d82..686c9f40 100644 --- a/warehouse/oso_dagster/assets/npm.py +++ b/warehouse/oso_dagster/assets/npm.py @@ -3,18 +3,22 @@ import dlt import requests -from dagster import AssetExecutionContext +from dagster import AssetExecutionContext, AssetKey, WeeklyPartitionsDefinition +from oso_dagster.cbt.cbt import CBTResource from pydantic import BaseModel -from ..factories.dlt import pydantic_to_dlt_nullable_columns +from ..factories.dlt import dlt_factory, pydantic_to_dlt_nullable_columns # Host for the NPM registry NPM_HOST = "https://api.npmjs.org" +# NPM was launched on January 12, 2010 +NPM_EPOCH = "2010-01-12T00:00:00Z" + class NPMPackageInfo(BaseModel): date: datetime - name: str + artifact_name: str downloads: int @@ -43,7 +47,10 @@ def get_npm_package_downloads( if response.status_code == 400 and "end date > start date" in response.text: yield None - raise ValueError(f"Failed to fetch data for {package_name}: {response.text}") + raise ValueError( + f"Failed to fetch data for { + package_name}: {response.text}" + ) data = response.json() @@ -61,14 +68,15 @@ def get_npm_package_downloads( if date_day not in days_between: raise ValueError( - f"Unexpected date for {package_name}: {date_day} not in {days_between}" + f"Unexpected date for {package_name}: {date_day.strftime('%Y-%m-%d')} not in " + f"{days_between[0].strftime('%Y-%m-%d')} => {days_between[-1].strftime('%Y-%m-%d')}" ) days_between.remove(date_day) yield NPMPackageInfo( date=date_day, - name=package_name, + artifact_name=package_name, downloads=download["downloads"], ) @@ -81,6 +89,8 @@ def get_npm_package_downloads( @dlt.resource( + primary_key="artifact_name", + name="downloads", columns=pydantic_to_dlt_nullable_columns(NPMPackageInfo), ) def get_all_downloads( @@ -97,32 +107,6 @@ def get_all_downloads( Yields: List[NPMPackageInfo]: The download count for each package - - Example: - ```python - # NPM was launched on January 12, 2010 - 
-        NPM_EPOCH = "2010-01-12T00:00:00Z"
-
-        @dlt_factory(
-            key_prefix="npm",
-            partitions_def=WeeklyPartitionsDefinition(
-                start_date=NPM_EPOCH.split("T", maxsplit=1)[0],
-                end_offset=1,
-            ),
-        )
-        def downloads(
-            context: AssetExecutionContext,
-        ):
-            yield get_all_downloads(
-                context,
-                package_names=[
-                    "@angular/core",
-                    "react",
-                    "vue",
-                    # ...
-                ],
-            )
-        ```
     """
 
     start = datetime.strptime(context.partition_key, "%Y-%m-%d")
@@ -132,3 +116,35 @@ def get_all_downloads(
         get_npm_package_downloads(package_name, start, end)
         for package_name in package_names
     )
+
+
+@dlt_factory(
+    key_prefix="npm",
+    partitions_def=WeeklyPartitionsDefinition(
+        start_date=NPM_EPOCH.split("T", maxsplit=1)[0],
+        end_offset=1,
+    ),
+    deps=[AssetKey(["dbt", "production", "artifacts_v1"])],
+)
+def downloads(
+    context: AssetExecutionContext,
+    cbt: CBTResource,
+):
+    unique_artifacts_query = """
+        SELECT
+            DISTINCT(artifact_name)
+        FROM
+            `oso.artifacts_v1`
+        WHERE
+            artifact_source = "NPM"
+    """
+
+    client = cbt.get(context.log)
+
+    yield get_all_downloads(
+        context,
+        package_names=[
+            row["artifact_name"]
+            for row in client.query_with_string(unique_artifacts_query)
+        ],
+    )

From 3436ba59600d142ff8b003def2fec7840535bccd Mon Sep 17 00:00:00 2001
From: Javier R
Date: Tue, 12 Nov 2024 09:32:14 +0000
Subject: [PATCH 4/4] add: `query_with_string` method to `cbt`

---
 warehouse/oso_dagster/cbt/cbt.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/warehouse/oso_dagster/cbt/cbt.py b/warehouse/oso_dagster/cbt/cbt.py
index 3549a33c..274925e7 100644
--- a/warehouse/oso_dagster/cbt/cbt.py
+++ b/warehouse/oso_dagster/cbt/cbt.py
@@ -76,12 +76,15 @@ def add_search_paths(self, search_paths: List[str]):
                 self.search_paths.append(p)
         self.load_env()
 
-    def query(self, model_file: str, timeout: float = 300, **vars):
+    def query_with_string(self, query_str: str, timeout: float = 300):
         with self.bigquery.get_client() as client:
-            rendered = self.render_model(model_file, **vars)
-            job = client.query(rendered)
+            job = client.query(query_str, timeout=timeout)
             return job.result()
 
+    def query(self, model_file: str, timeout: float = 300, **vars):
+        rendered = self.render_model(model_file, **vars)
+        return self.query_with_string(rendered, timeout)
+
     # we should transition to this instead of using jinja
     def query_with_sqlglot(
        self,
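
Reviewer note: everything `get_npm_package_downloads` does reduces to one call against the npm registry's downloads-range endpoint. Below is a minimal standalone sketch of that call, outside the Dagster/dlt wiring, for trying the API by hand; the package name and date range are arbitrary examples, and the `package` / `downloads[].day` / `downloads[].downloads` fields are the ones the collector validates and yields.

```python
from datetime import datetime, timedelta

import requests

# Same registry host the collector uses.
NPM_HOST = "https://api.npmjs.org"

# Arbitrary example: one week of downloads for the `react` package.
end = datetime(2024, 11, 10)
start = end - timedelta(days=6)

endpoint = f"{NPM_HOST}/downloads/range/{start:%Y-%m-%d}:{end:%Y-%m-%d}/react"
response = requests.get(endpoint, timeout=10)
response.raise_for_status()

data = response.json()

# The payload echoes the package name and carries one entry per day:
# {"package": "react", "downloads": [{"day": "YYYY-MM-DD", "downloads": <int>}, ...]}
for entry in data["downloads"]:
    print(entry["day"], entry["downloads"])
```

The endpoint rejects ranges whose end date precedes the start date with an HTTP 400, which is the case the collector special-cases before raising.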