diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 365da21208ecc..b9deb9854c370 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -122,6 +122,10 @@ "more_itertools", } +cachetools_lib = { + "cachetools", +} + sql_common = ( { # Required for all SQL sources. @@ -138,6 +142,7 @@ # https://github.com/ipython/traitlets/issues/741 "traitlets<5.2.2", "greenlet", + *cachetools_lib, } | usage_common | sqlglot_lib @@ -213,7 +218,7 @@ "pandas", "cryptography", "msal", - "cachetools", + *cachetools_lib, } | classification_lib trino = { @@ -457,7 +462,7 @@ | sqlglot_lib | classification_lib | {"db-dtypes"} # Pandas extension data types - | {"cachetools"}, + | cachetools_lib, "s3": {*s3_base, *data_lake_profiling}, "gcs": {*s3_base, *data_lake_profiling}, "abs": {*abs_base, *data_lake_profiling}, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py index 3ead59eed2d39..7d82d99412ffe 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py @@ -2,6 +2,8 @@ from abc import abstractmethod from typing import Any, Dict, Optional +import cachetools +import cachetools.keys import pydantic from pydantic import Field from sqlalchemy.engine import URL @@ -27,6 +29,7 @@ StatefulIngestionConfigBase, ) from datahub.ingestion.source_config.operation_config import is_profiling_enabled +from datahub.utilities.cachetools_keys import self_methodkey logger: logging.Logger = logging.getLogger(__name__) @@ -115,6 +118,13 @@ class SQLCommonConfig( # Custom Stateful Ingestion settings stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None + # TRICKY: The operation_config is time-dependent. Because we don't want to change + # whether or not we're running profiling mid-ingestion, we cache the result of this method. + # TODO: This decorator should be moved to the is_profiling_enabled(operation_config) method. + @cachetools.cached( + cache=cachetools.LRUCache(maxsize=1), + key=self_methodkey, + ) def is_profiling_enabled(self) -> bool: return self.profiling.enabled and is_profiling_enabled( self.profiling.operation_config diff --git a/metadata-ingestion/src/datahub/utilities/cachetools_keys.py b/metadata-ingestion/src/datahub/utilities/cachetools_keys.py new file mode 100644 index 0000000000000..e3c7d67c81cd3 --- /dev/null +++ b/metadata-ingestion/src/datahub/utilities/cachetools_keys.py @@ -0,0 +1,8 @@ +from typing import Any + +import cachetools.keys + + +def self_methodkey(self: Any, *args: Any, **kwargs: Any) -> Any: + # Keeps the id of self around + return cachetools.keys.hashkey(id(self), *args, **kwargs)