diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index ccda00ba293ef..39a9cd4f29628 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -565,6 +565,10 @@ def _should_ingest_lineage(self) -> bool:
 
     def _get_projects(self, conn: bigquery.Client) -> List[BigqueryProject]:
         logger.info("Getting projects")
+        projects = self.get_projects(conn)
+        return self.filter_projects(conn, projects)
+
+    def get_projects(self, conn: bigquery.Client) -> List[BigqueryProject]:
         if self.config.project_ids or self.config.project_id:
             project_ids = self.config.project_ids or [self.config.project_id]  # type: ignore
             return [
@@ -574,6 +578,32 @@ def _get_projects(self, conn: bigquery.Client) -> List[BigqueryProject]:
         else:
             return list(self._get_project_list(conn))
 
+    def filter_projects(
+        self, conn: bigquery.Client, projects: List[BigqueryProject]
+    ) -> List[BigqueryProject]:
+        filtered_projects = []
+        for project in projects:
+            project.datasets = self.get_datasets_for_project_id(
+                conn=conn, project_id=project.id
+            )
+            if len(project.datasets) == 0:
+                more_info = (
+                    "Either there are no datasets in this project or missing bigquery.datasets.get permission. "
+                    "You can assign predefined roles/bigquery.metadataViewer role to your service account."
+                )
+                if self.config.exclude_empty_projects:
+                    self.report.report_dropped(project.id)
+                    warning_message = f"Excluded project '{project.id}' since no datasets found. {more_info}"
+                else:
+                    filtered_projects.append(project)
+                    warning_message = (
+                        f"No datasets found in project '{project.id}'. {more_info}"
+                    )
+                logger.warning(warning_message)
+            else:
+                filtered_projects.append(project)
+        return filtered_projects
+
     def _get_project_list(self, conn: bigquery.Client) -> Iterable[BigqueryProject]:
         try:
             projects = BigQueryDataDictionary.get_projects(conn)
@@ -606,27 +636,6 @@ def _process_project(
 
         yield from self.gen_project_id_containers(project_id)
 
-        try:
-            bigquery_project.datasets = (
-                BigQueryDataDictionary.get_datasets_for_project_id(conn, project_id)
-            )
-        except Exception as e:
-            error_message = f"Unable to get datasets for project {project_id}, skipping. The error was: {e}"
-            if self.config.profiling.enabled:
-                error_message = f"Unable to get datasets for project {project_id}, skipping. Does your service account has bigquery.datasets.get permission? The error was: {e}"
-            logger.error(error_message)
-            self.report.report_failure(
-                "metadata-extraction",
-                f"{project_id} - {error_message}",
-            )
-            return None
-
-        if len(bigquery_project.datasets) == 0:
-            logger.warning(
-                f"No dataset found in {project_id}. Either there are no datasets in this project or missing bigquery.datasets.get permission. You can assign predefined roles/bigquery.metadataViewer role to your service account."
-            )
-            return
-
         self.report.num_project_datasets_to_scan[project_id] = len(
             bigquery_project.datasets
         )
@@ -667,6 +676,25 @@ def _process_project(
                 tables=db_tables,
             )
 
+    def get_datasets_for_project_id(
+        self, conn: bigquery.Client, project_id: str
+    ) -> List[BigqueryDataset]:
+        try:
+            return BigQueryDataDictionary.get_datasets_for_project_id(conn, project_id)
+        except Exception as e:
+            error_message = f"Unable to get datasets for project {project_id}, skipping. The error was: {e}"
+            if self.config.profiling.enabled:
+                error_message = (
+                    f"Unable to get datasets for project {project_id}, skipping. "
+                    f"Does your service account have bigquery.datasets.get permission? The error was: {e}"
+                )
+            logger.error(error_message)
+            self.report.report_failure(
+                "metadata-extraction",
+                f"{project_id} - {error_message}",
+            )
+            return []
+
     def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]:
         logger.info(f"Generate lineage for {project_id}")
         lineage = self.lineage_extractor.calculate_lineage_for_project(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
index 7287dc1b67d73..49fce487d1522 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
@@ -223,6 +223,11 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
         description="Maximum number of entries for the in-memory caches of FileBacked data structures.",
     )
 
+    exclude_empty_projects: bool = Field(
+        default=False,
+        description="Option to exclude empty projects from being ingested.",
+    )
+
     def __init__(self, **data: Any):
         super().__init__(**data)
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
index b57e691411f75..3a72309834b8c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
@@ -90,6 +90,7 @@ class BigQueryV2Report(ProfilingSqlReport):
     usage_state_size: Optional[str] = None
     ingestion_stage: Optional[str] = None
     ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict)
+    exclude_empty_projects: Optional[bool] = None
 
     _timer: Optional[PerfTimer] = field(
         default=None, init=False, repr=False, compare=False
diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py
index fc8ca166b105a..39897baa54f3f 100644
--- a/metadata-ingestion/tests/unit/test_bigquery_source.py
+++ b/metadata-ingestion/tests/unit/test_bigquery_source.py
@@ -20,6 +20,7 @@
 from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
 from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
     BigQueryDataDictionary,
+    BigqueryDataset,
     BigqueryProject,
     BigqueryView,
 )
@@ -797,3 +798,47 @@ def test_get_table_name(full_table_name: str, datahub_full_table_name: str) -> N
         BigqueryTableIdentifier.from_string_name(full_table_name).get_table_name()
         == datahub_full_table_name
     )
+
+
+def test_default_config_for_excluding_projects_and_datasets():
+    config = BigQueryV2Config.parse_obj({})
+    assert config.exclude_empty_projects is False
+    config = BigQueryV2Config.parse_obj({"exclude_empty_projects": True})
+    assert config.exclude_empty_projects
+
+
+@patch(
+    "datahub.ingestion.source.bigquery_v2.bigquery.BigqueryV2Source.get_datasets_for_project_id"
+)
+@patch("datahub.ingestion.source.bigquery_v2.bigquery.BigqueryV2Source.get_projects")
+@patch("google.cloud.bigquery.client.Client")
+def test_excluding_empty_projects_from_ingestion(
+    client_mock, get_projects_mock, get_datasets_for_project_id_mock
+):
+    project_id_with_datasets = "project-id-with-datasets"
+    project_id_without_datasets = "project-id-without-datasets"
+    get_projects_mock.return_value = [
+        BigqueryProject(project_id_with_datasets, project_id_with_datasets),
+        BigqueryProject(project_id_without_datasets, project_id_without_datasets),
+    ]
+
+    def get_datasets_for_project_id_side_effect(*args, **kwargs):
+        return (
+            []
+            if kwargs["project_id"] == project_id_without_datasets
+            else [BigqueryDataset("some-dataset")]
+        )
+
+    get_datasets_for_project_id_mock.side_effect = (
+        get_datasets_for_project_id_side_effect
+    )
+
+    config = BigQueryV2Config.parse_obj({})
+    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test-1"))
+    filtered_projects = source._get_projects(client_mock)
+    assert len(list(filtered_projects)) == 2
+
+    config = BigQueryV2Config.parse_obj({"exclude_empty_projects": True})
+    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test-2"))
+    filtered_projects = source._get_projects(client_mock)
+    assert len(list(filtered_projects)) == 1