Skip to content

Commit

Permalink
feat(bigquery): exclude projects without any datasets from ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
upendra-vedullapalli committed Aug 1, 2023
1 parent 0593e23 commit 5d2794a
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,10 @@ def _should_ingest_lineage(self) -> bool:

def _get_projects(self, conn: bigquery.Client) -> List[BigqueryProject]:
    """Resolve the list of projects to ingest: fetch them, then filter out
    those without datasets (depending on configuration)."""
    logger.info("Getting projects")
    return self.filter_projects(conn, self.get_projects(conn))

def get_projects(self, conn: bigquery.Client) -> List[BigqueryProject]:
if self.config.project_ids or self.config.project_id:
project_ids = self.config.project_ids or [self.config.project_id] # type: ignore
return [
Expand All @@ -574,6 +578,32 @@ def _get_projects(self, conn: bigquery.Client) -> List[BigqueryProject]:
else:
return list(self._get_project_list(conn))

def filter_projects(
    self, conn: bigquery.Client, projects: List[BigqueryProject]
) -> List[BigqueryProject]:
    """Populate each project's datasets and decide which projects to keep.

    A project with no visible datasets is dropped when
    ``exclude_empty_projects`` is enabled (and reported as dropped);
    otherwise it is kept and a warning is logged either way, since "no
    datasets" may also mean a missing ``bigquery.datasets.get`` permission.
    """
    kept: List[BigqueryProject] = []
    for project in projects:
        project.datasets = self.get_datasets_for_project_id(
            conn=conn, project_id=project.id
        )
        if project.datasets:
            # Non-empty project: always ingested.
            kept.append(project)
            continue
        more_info = (
            "Either there are no datasets in this project or missing bigquery.datasets.get permission. "
            "You can assign predefined roles/bigquery.metadataViewer role to your service account."
        )
        if self.config.exclude_empty_projects:
            self.report.report_dropped(project.id)
            logger.warning(
                f"Excluded project '{project.id}' since no datasets found. {more_info}"
            )
        else:
            # Keep the empty project but surface the likely cause.
            kept.append(project)
            logger.warning(f"No datasets found in project '{project.id}'. {more_info}")
    return kept

def _get_project_list(self, conn: bigquery.Client) -> Iterable[BigqueryProject]:
try:
projects = BigQueryDataDictionary.get_projects(conn)
Expand Down Expand Up @@ -606,27 +636,6 @@ def _process_project(

yield from self.gen_project_id_containers(project_id)

try:
bigquery_project.datasets = (
BigQueryDataDictionary.get_datasets_for_project_id(conn, project_id)
)
except Exception as e:
error_message = f"Unable to get datasets for project {project_id}, skipping. The error was: {e}"
if self.config.profiling.enabled:
error_message = f"Unable to get datasets for project {project_id}, skipping. Does your service account has bigquery.datasets.get permission? The error was: {e}"
logger.error(error_message)
self.report.report_failure(
"metadata-extraction",
f"{project_id} - {error_message}",
)
return None

if len(bigquery_project.datasets) == 0:
logger.warning(
f"No dataset found in {project_id}. Either there are no datasets in this project or missing bigquery.datasets.get permission. You can assign predefined roles/bigquery.metadataViewer role to your service account."
)
return

self.report.num_project_datasets_to_scan[project_id] = len(
bigquery_project.datasets
)
Expand Down Expand Up @@ -667,6 +676,25 @@ def _process_project(
tables=db_tables,
)

def get_datasets_for_project_id(
    self, conn: bigquery.Client, project_id: str
) -> List[BigqueryDataset]:
    """Return all datasets visible in ``project_id``.

    On any failure the error is logged, recorded on the report as a
    metadata-extraction failure, and an empty list is returned so that
    ingestion can continue with the remaining projects.
    """
    try:
        return BigQueryDataDictionary.get_datasets_for_project_id(conn, project_id)
    except Exception as e:
        error_message = f"Unable to get datasets for project {project_id}, skipping. The error was: {e}"
        if self.config.profiling.enabled:
            # Profiling depends on dataset metadata, so hint at the
            # permission that is most likely missing.
            # Fix: "has" -> "have" (grammar in user-facing error message).
            error_message = (
                f"Unable to get datasets for project {project_id}, skipping. "
                f"Does your service account have bigquery.datasets.get permission? The error was: {e}"
            )
        logger.error(error_message)
        self.report.report_failure(
            "metadata-extraction",
            f"{project_id} - {error_message}",
        )
        return []

def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]:
logger.info(f"Generate lineage for {project_id}")
lineage = self.lineage_extractor.calculate_lineage_for_project(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
description="Maximum number of entries for the in-memory caches of FileBacked data structures.",
)

exclude_empty_projects: bool = Field(
default=False,
description="Option to exclude empty projects from being ingested.",
)

def __init__(self, **data: Any):
super().__init__(**data)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class BigQueryV2Report(ProfilingSqlReport):
usage_state_size: Optional[str] = None
ingestion_stage: Optional[str] = None
ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict)
exclude_empty_projects: Optional[bool] = None

_timer: Optional[PerfTimer] = field(
default=None, init=False, repr=False, compare=False
Expand Down
45 changes: 45 additions & 0 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
BigQueryDataDictionary,
BigqueryDataset,
BigqueryProject,
BigqueryView,
)
Expand Down Expand Up @@ -797,3 +798,47 @@ def test_get_table_name(full_table_name: str, datahub_full_table_name: str) -> N
BigqueryTableIdentifier.from_string_name(full_table_name).get_table_name()
== datahub_full_table_name
)


def test_default_config_for_excluding_projects_and_datasets():
    """``exclude_empty_projects`` defaults to False and is settable to True."""
    default_config = BigQueryV2Config.parse_obj({})
    assert default_config.exclude_empty_projects is False

    enabled_config = BigQueryV2Config.parse_obj({"exclude_empty_projects": True})
    assert enabled_config.exclude_empty_projects


@patch(
    "datahub.ingestion.source.bigquery_v2.bigquery.BigqueryV2Source.get_datasets_for_project_id"
)
@patch("datahub.ingestion.source.bigquery_v2.bigquery.BigqueryV2Source.get_projects")
@patch("google.cloud.bigquery.client.Client")
def test_excluding_empty_projects_from_ingestion(
    client_mock, get_projects_mock, get_datasets_for_project_id_mock
):
    """Empty projects are dropped only when ``exclude_empty_projects`` is set."""
    populated_project = "project-id-with-datasets"
    empty_project = "project-id-without-datasets"

    get_projects_mock.return_value = [
        BigqueryProject(populated_project, populated_project),
        BigqueryProject(empty_project, empty_project),
    ]
    # The empty project yields no datasets; every other project yields one.
    get_datasets_for_project_id_mock.side_effect = lambda *args, **kwargs: (
        [] if kwargs["project_id"] == empty_project else [BigqueryDataset("some-dataset")]
    )

    def ingested_projects(raw_config, run_id):
        source = BigqueryV2Source(
            config=BigQueryV2Config.parse_obj(raw_config),
            ctx=PipelineContext(run_id=run_id),
        )
        return list(source._get_projects(client_mock))

    # Default behavior keeps both projects.
    assert len(ingested_projects({}, "test-1")) == 2
    # With exclusion enabled, the dataset-less project is dropped.
    assert len(ingested_projects({"exclude_empty_projects": True}, "test-2")) == 1

0 comments on commit 5d2794a

Please sign in to comment.