feat(ingest): allow lower freq profiling based on date of month/day of week #8489

Merged · 5 commits · Aug 4, 2023
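This PR threads a new `operation_config` block into each source's profiling settings and gates profiling behind a shared `is_profiling_enabled()` check, so that profiling can be limited to a particular day of the week or date of the month. The `operation_config` module itself is not part of the diff excerpt below, so the sketch that follows only illustrates how such a gate could work; the field names (`lower_freq_profile_enabled`, `profile_day_of_week`, `profile_date_of_month`) and the exact rules are assumptions rather than code from this PR.

```python
# Illustrative sketch only: the real helper lives in
# datahub/ingestion/source_config/operation_config.py (not shown in this excerpt);
# the field names and gating rules here are assumptions.
import datetime
from typing import Optional

from pydantic import BaseModel, Field


class OperationConfigSketch(BaseModel):
    lower_freq_profile_enabled: bool = Field(default=False)
    profile_day_of_week: Optional[int] = Field(default=None)  # assumed: 0 = Monday ... 6 = Sunday
    profile_date_of_month: Optional[int] = Field(default=None)  # assumed: 1 - 31


def is_profiling_enabled_sketch(config: OperationConfigSketch) -> bool:
    """Return True unless lower-frequency profiling defers the run to another day."""
    if not config.lower_freq_profile_enabled:
        return True
    today = datetime.date.today()
    if config.profile_date_of_month is not None:
        return today.day == config.profile_date_of_month
    if config.profile_day_of_week is not None:
        return today.weekday() == config.profile_day_of_week
    return False


# Example: profile only on the first day of each month.
print(is_profiling_enabled_sketch(
    OperationConfigSketch(lower_freq_profile_enabled=True, profile_date_of_month=1)
))
```

In the diff, each source imports the real helper with `from datahub.ingestion.source_config.operation_config import is_profiling_enabled` and combines it with its existing `profiling.enabled` flag.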
10 changes: 9 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -67,6 +67,7 @@
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.ingestion.source_config.operation_config import is_profiling_enabled
from datahub.metadata.com.linkedin.pegasus2avro.common import Status, SubTypes
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
@@ -161,6 +162,11 @@ class GlueSourceConfig(
default=None, description=""
)

def is_profiling_enabled(self) -> bool:
return self.profiling is not None and is_profiling_enabled(
self.profiling.operation_config
)

@property
def glue_client(self):
return self.get_glue_client()
@@ -813,7 +819,9 @@ def _create_profile_mcp(
def get_profile_if_enabled(
self, mce: MetadataChangeEventClass, database_name: str, table_name: str
) -> Iterable[MetadataWorkUnit]:
if self.source_config.profiling:
# We don't need both checks, only the second one,
# but then lint believes that GlueProfilingConfig can be None
if self.source_config.profiling and self.source_config.is_profiling_enabled():
# for cross-account ingestion
kwargs = dict(
DatabaseName=database_name,
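A note on the comment in the Glue hunk above: `GlueSourceConfig.profiling` is optional (it defaults to `None`), and calling `is_profiling_enabled()` does not narrow that `Optional` for the type checker, which is why the seemingly redundant truthiness check is kept at the call site. A minimal, self-contained illustration follows; the class names are invented for the example and are not DataHub code.

```python
# Minimal sketch of the Optional-narrowing issue; names are invented for illustration.
from typing import Optional


class ProfilingConfig:
    enabled: bool = True


class SourceConfig:
    # Optional, mirroring GlueSourceConfig.profiling defaulting to None.
    profiling: Optional[ProfilingConfig] = None

    def is_profiling_enabled(self) -> bool:
        # The `is not None` check narrows the Optional inside this method only.
        return self.profiling is not None and self.profiling.enabled


def run(config: SourceConfig) -> None:
    # A bare `if config.is_profiling_enabled():` does not tell the type checker
    # that `config.profiling` is non-None, so later attribute access is flagged.
    # Keeping the extra truthiness check narrows the type at the call site.
    if config.profiling and config.is_profiling_enabled():
        print(config.profiling.enabled)


run(SourceConfig())
```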
@@ -310,7 +310,7 @@ def metadata_read_capability_test(
project_id=project_id,
dataset_name=result[0].name,
tables={},
with_data_read_permission=config.profiling.enabled,
with_data_read_permission=config.is_profiling_enabled(),
)
if len(list(tables)) == 0:
return CapabilityReport(
@@ -612,7 +612,7 @@ def _process_project(
)
except Exception as e:
error_message = f"Unable to get datasets for project {project_id}, skipping. The error was: {e}"
if self.config.profiling.enabled:
if self.config.is_profiling_enabled():
error_message = f"Unable to get datasets for project {project_id}, skipping. Does your service account has bigquery.datasets.get permission? The error was: {e}"
logger.error(error_message)
self.report.report_failure(
@@ -647,7 +647,7 @@ def _process_project(

except Exception as e:
error_message = f"Unable to get tables for dataset {bigquery_dataset.name} in project {project_id}, skipping. Does your service account has bigquery.tables.list, bigquery.routines.get, bigquery.routines.list permission? The error was: {e}"
if self.config.profiling.enabled:
if self.config.is_profiling_enabled():
error_message = f"Unable to get tables for dataset {bigquery_dataset.name} in project {project_id}, skipping. Does your service account has bigquery.tables.list, bigquery.routines.get, bigquery.routines.list permission, bigquery.tables.getData permission? The error was: {e}"

trace = traceback.format_exc()
@@ -659,7 +659,7 @@
)
continue

if self.config.profiling.enabled:
if self.config.is_profiling_enabled():
logger.info(f"Starting profiling project {project_id}")
self.report.set_ingestion_stage(project_id, "Profiling")
yield from self.profiler.get_workunits(
@@ -793,7 +793,7 @@ def _process_schema(
if self.config.include_views:
db_views[dataset_name] = list(
BigQueryDataDictionary.get_views_for_dataset(
conn, project_id, dataset_name, self.config.profiling.enabled
conn, project_id, dataset_name, self.config.is_profiling_enabled()
)
)

@@ -841,7 +841,7 @@ def _process_table(

# We only collect profile ignore list if profiling is enabled and profile_table_level_only is false
if (
self.config.profiling.enabled
self.config.is_profiling_enabled()
and not self.config.profiling.profile_table_level_only
):
table.columns_ignore_from_profiling = self.generate_profile_ignore_list(
@@ -1218,7 +1218,7 @@ def get_tables_for_dataset(
# https://cloud.google.com/bigquery/docs/information-schema-partitions
max_batch_size: int = (
self.config.number_of_datasets_process_in_batch
if not self.config.profiling.enabled
if not self.config.is_profiling_enabled()
else self.config.number_of_datasets_process_in_batch_if_profiling_enabled
)

@@ -1235,7 +1235,7 @@
project_id,
dataset_name,
items_to_get,
with_data_read_permission=self.config.profiling.enabled,
with_data_read_permission=self.config.is_profiling_enabled(),
)
items_to_get.clear()

@@ -1245,7 +1245,7 @@
project_id,
dataset_name,
items_to_get,
with_data_read_permission=self.config.profiling.enabled,
with_data_read_permission=self.config.is_profiling_enabled(),
)

self.report.metadata_extraction_sec[f"{project_id}.{dataset_name}"] = round(
@@ -35,6 +35,10 @@
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source_config.operation_config import (
OperationConfig,
is_profiling_enabled,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
SchemaField,
@@ -199,6 +203,10 @@ class ElasticProfiling(ConfigModel):
default=False,
description="Whether to enable profiling for the elastic search source.",
)
operation_config: OperationConfig = Field(
default_factory=OperationConfig,
description="Experimental feature. To specify operation configs.",
)


class CollapseUrns(ConfigModel):
@@ -296,6 +304,11 @@ class ElasticsearchSourceConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
default_factory=CollapseUrns,
)

def is_profiling_enabled(self) -> bool:
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config
)

@validator("host")
def host_colon_port_comma(cls, host_val: str) -> str:
for entry in host_val.split(","):
@@ -511,7 +524,7 @@ def _extract_mcps(
),
)

if self.source_config.profiling.enabled:
if self.source_config.is_profiling_enabled():
if self.cat_response is None:
self.cat_response = self.client.cat.indices(
params={
@@ -7,6 +7,7 @@
from pydantic.fields import Field

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.ingestion.source_config.operation_config import OperationConfig

_PROFILING_FLAGS_TO_REPORT = {
"turn_off_expensive_profiling_metrics",
@@ -22,6 +23,10 @@ class GEProfilingConfig(ConfigModel):
enabled: bool = Field(
default=False, description="Whether profiling should be done."
)
operation_config: OperationConfig = Field(
default_factory=OperationConfig,
description="Experimental feature. To specify operation configs.",
)
limit: Optional[int] = Field(
default=None,
description="Max number of documents to profile. By default, profiles all documents.",
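For the GE-based SQL sources, the new `operation_config` field sits next to `enabled` under `profiling`. The snippet below is a hedged example of opting in from a recipe-style dictionary: the import path matches the diff above, but the nested field names are assumptions, since the `operation_config` module is not shown in this excerpt.

```python
# Sketch: enabling lower-frequency profiling through the new operation_config.
# The import path comes from the diff; the field names passed to parse_obj are
# assumed and may differ in the actual module.
from datahub.ingestion.source_config.operation_config import (
    OperationConfig,
    is_profiling_enabled,
)

operation_config = OperationConfig.parse_obj(
    {
        "lower_freq_profile_enabled": True,  # assumed field name
        "profile_day_of_week": 4,  # assumed: 0 = Monday, so 4 = Friday
    }
)

# Sources combine their own profiling.enabled flag with this check, e.g.
# `self.profiling.enabled and is_profiling_enabled(self.profiling.operation_config)`.
if is_profiling_enabled(operation_config):
    print("Profiling would run on today's schedule")
```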
@@ -3,6 +3,7 @@
from pydantic.fields import Field

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.ingestion.source_config.operation_config import OperationConfig


class GlueProfilingConfig(ConfigModel):
@@ -54,3 +55,8 @@ class GlueProfilingConfig(ConfigModel):
default=AllowDenyPattern.allow_all(),
description="""Regex patterns for filtering partitions for profile. The pattern should be a string like: "{'key':'value'}".""",
)

operation_config: OperationConfig = Field(
default_factory=OperationConfig,
description="Experimental feature. To specify operation configs.",
)
@@ -199,7 +199,7 @@ def _create_iceberg_workunit(
if dpi_aspect:
yield dpi_aspect

if self.config.profiling.enabled:
if self.config.is_profiling_enabled():
profiler = IcebergProfiler(self.report, self.config.profiling)
yield from profiler.profile_table(dataset_name, dataset_urn, table)

@@ -22,6 +22,10 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.ingestion.source_config.operation_config import (
OperationConfig,
is_profiling_enabled,
)


class IcebergProfilingConfig(ConfigModel):
@@ -41,6 +45,10 @@
default=True,
description="Whether to profile for the max value of numeric columns.",
)
operation_config: OperationConfig = Field(
default_factory=OperationConfig,
description="Experimental feature. To specify operation configs.",
)
# Stats we cannot compute without looking at data
# include_field_mean_value: bool = True
# include_field_median_value: bool = True
@@ -82,6 +90,11 @@ class IcebergSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin)
)
profiling: IcebergProfilingConfig = IcebergProfilingConfig()

def is_profiling_enabled(self) -> bool:
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config
)

@root_validator()
def _ensure_one_filesystem_is_configured(
cls: "IcebergSourceConfig", values: Dict
@@ -51,7 +51,7 @@ def get_workunits(
) -> Iterable[MetadataWorkUnit]:
# Extra default SQLAlchemy option for better connection pooling and threading.
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
if self.config.profiling.enabled:
if self.config.is_profiling_enabled():
self.config.options.setdefault(
"max_overflow", self.config.profiling.max_workers
)
@@ -408,7 +408,7 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit
connection=connection, all_tables=all_tables, database=database
)

if self.config.profiling.enabled:
if self.config.is_profiling_enabled():
profiler = RedshiftProfiler(
config=self.config,
report=self.report,
6 changes: 6 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/s3/config.py
@@ -18,6 +18,7 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.ingestion.source_config.operation_config import is_profiling_enabled

# hide annoying debug errors from py4j
logging.getLogger("py4j").setLevel(logging.ERROR)
@@ -84,6 +85,11 @@ class DataLakeSourceConfig(
"path_spec", "path_specs", lambda path_spec: [path_spec]
)

def is_profiling_enabled(self) -> bool:
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config
)

@pydantic.validator("path_specs", always=True)
def check_path_specs_and_infer_platform(
cls, path_specs: List[PathSpec], values: Dict
@@ -5,12 +5,17 @@

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.source_config.operation_config import OperationConfig


class DataLakeProfilerConfig(ConfigModel):
enabled: bool = Field(
default=False, description="Whether profiling should be done."
)
operation_config: OperationConfig = Field(
default_factory=OperationConfig,
description="Experimental feature. To specify operation configs.",
)

# These settings will override the ones below.
profile_table_level_only: bool = Field(
11 changes: 7 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -264,14 +264,17 @@ def __init__(self, config: DataLakeSourceConfig, ctx: PipelineContext):
config_option: config.dict().get(config_option)
for config_option in config_options_to_report
}
config_report = {**config_report, "profiling_enabled": config.profiling.enabled}
config_report = {
**config_report,
"profiling_enabled": config.is_profiling_enabled(),
}

telemetry.telemetry_instance.ping(
"data_lake_config",
config_report,
)

if config.profiling.enabled:
if config.is_profiling_enabled():
telemetry.telemetry_instance.ping(
"data_lake_profiling_config",
{
@@ -659,7 +662,7 @@ def ingest_table(
table_data.table_path, dataset_urn
)

if self.source_config.profiling.enabled:
if self.source_config.is_profiling_enabled():
yield from self.get_table_profile(table_data, dataset_urn)

def get_prefix(self, relative_path: str) -> str:
@@ -884,7 +887,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
for guid, table_data in table_dict.items():
yield from self.ingest_table(table_data, path_spec)

if not self.source_config.profiling.enabled:
if not self.source_config.is_profiling_enabled():
return

total_time_taken = timer.elapsed_seconds()
15 changes: 14 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/salesforce.py
@@ -30,6 +30,10 @@
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source_config.operation_config import (
OperationConfig,
is_profiling_enabled,
)
from datahub.metadata.schema_classes import (
AuditStampClass,
BooleanTypeClass,
@@ -70,6 +74,10 @@ class SalesforceProfilingConfig(ConfigModel):
default=False,
description="Whether profiling should be done. Supports only table-level profiling at this stage",
)
operation_config: OperationConfig = Field(
default_factory=OperationConfig,
description="Experimental feature. To specify operation configs.",
)

# TODO - support field level profiling

@@ -124,6 +132,11 @@ class SalesforceConfig(DatasetSourceConfigMixin):
description="Regex patterns for profiles to filter in ingestion, allowed by the `object_pattern`.",
)

def is_profiling_enabled(self) -> bool:
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config
)

@validator("instance_url")
def remove_trailing_slash(cls, v):
return config_clean.remove_trailing_slashes(v)
@@ -329,7 +342,7 @@ def get_salesforce_object_workunits(
if self.config.domain is not None:
yield from self.get_domain_workunit(sObjectName, datasetUrn)

if self.config.profiling.enabled and self.config.profile_pattern.allowed(
if self.config.is_profiling_enabled() and self.config.profile_pattern.allowed(
sObjectName
):
yield from self.get_profile_workunit(sObjectName, datasetUrn)
@@ -58,7 +58,7 @@ def get_workunits(
) -> Iterable[MetadataWorkUnit]:
# Extra default SQLAlchemy option for better connection pooling and threading.
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
if self.config.profiling.enabled:
if self.config.is_profiling_enabled():
self.config.options.setdefault(
"max_overflow", self.config.profiling.max_workers
)
@@ -272,7 +272,7 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):
run_id=self.ctx.run_id,
)

if config.profiling.enabled:
if config.is_profiling_enabled():
# For profiling
self.profiler = SnowflakeProfiler(
config, self.report, self.profiling_state_handler
@@ -701,7 +701,7 @@ def _process_database(
for snowflake_schema in snowflake_db.schemas:
yield from self._process_schema(snowflake_schema, db_name)

if self.config.profiling.enabled and self.db_tables:
if self.config.is_profiling_enabled() and self.db_tables:
yield from self.profiler.get_workunits(snowflake_db, self.db_tables)

def fetch_schemas_for_database(self, snowflake_db, db_name):