Clean up experimental orchestration implementation and alert features from it #6948

Merged
merged 67 commits on Nov 4, 2024
Commits (67)
a987571
Update nightly build to use recent nightly packages
nikelite Oct 9, 2024
23fb939
Merge branch 'tensorflow:master' into master
nikelite Oct 9, 2024
b8e5e13
Adjust version constraints
nikelite Oct 9, 2024
9031f22
Merge branch 'master' of https://github.com/nikelite/tfx
nikelite Oct 9, 2024
d099fe7
Adjust version constraints
nikelite Oct 9, 2024
4942170
Add pytest config to avoid absl.flags._exceptions.UnparsedFlagAccessE…
nikelite Oct 16, 2024
12327f6
Add pytest config to avoid absl.flags._exceptions.UnparsedFlagAccessE…
nikelite Oct 16, 2024
7e8d715
Revert "Add pytest config to avoid absl.flags._exceptions.UnparsedFla…
nikelite Oct 16, 2024
8c15807
Cleanup unused pytest import
nikelite Oct 16, 2024
d9813bc
Cleanup unused pytest import
nikelite Oct 16, 2024
8eb2958
Add xFail for kfp handler test
nikelite Oct 16, 2024
e8c142c
Include components testdata to the package build to pass unit tests
nikelite Oct 17, 2024
725b8aa
revert dependencies change
nikelite Oct 17, 2024
295b911
Merge branch 'master' of https://github.com/nikelite/tfx into pytest
nikelite Oct 17, 2024
999351f
Include components testdata to the package build to pass unit tests
nikelite Oct 17, 2024
1d0f92a
Add test constraints for unit-tests with nightly TFX libraries
nikelite Oct 18, 2024
e88e491
Update test constraints to use recent TFX libraries
nikelite Oct 18, 2024
f3f99f7
Merge branch 'master' of https://github.com/nikelite/tfx into pytest
nikelite Oct 19, 2024
57c1a0e
Merge branch 'tensorflow:master' into pytest
nikelite Oct 20, 2024
f104d60
update MANIFEST.in to include schema files
nikelite Oct 20, 2024
b3e6ae4
Revert xfail cases which conftest cannot resolve
nikelite Oct 20, 2024
aa195cb
Merge branch 'tensorflow:master' into master
nikelite Oct 20, 2024
5fa5403
Merge branch 'pytest' of https://github.com/nikelite/tfx into pytest
nikelite Oct 20, 2024
18e9de4
Re-enable xfail tests
nikelite Oct 21, 2024
c595e39
fix conflict
nikelite Oct 21, 2024
0da5561
Update vertex handler test to use mock_init inside the test function,…
nikelite Oct 22, 2024
09b3bd1
Update pytest to use pytest-subprocess plugin
nikelite Oct 22, 2024
60e630a
Fix xfail cases from __subclasses__() which is not cleaned up with py…
nikelite Oct 23, 2024
ccb4cf7
Fix lint errors
nikelite Oct 23, 2024
12de42f
Revert subprocess install
nikelite Oct 23, 2024
48eafca
Re-enable more xfail test cases
nikelite Oct 23, 2024
b7016bb
revert local pipeline test
nikelite Oct 23, 2024
e449598
Merge branch 'master' of https://github.com/nikelite/tfx
nikelite Oct 23, 2024
e1729bd
Fix xFail errors from pytest
nikelite Oct 26, 2024
1394f9d
Fix lint error
nikelite Oct 26, 2024
3354917
Fix lint error
nikelite Oct 26, 2024
b950644
Check pytest rootdir
nikelite Oct 26, 2024
970f1da
Update pytest command
nikelite Oct 26, 2024
5c4f89d
Add testdata into tfx package
nikelite Oct 27, 2024
e2c4e8d
Fix duplicate output artifact type name in test case with pytest
nikelite Oct 27, 2024
2a26c9e
keep xFail for some e2e tests
nikelite Oct 27, 2024
33724c1
Re-enable some xfail tests
nikelite Oct 27, 2024
b09ac2a
Reflect review comments
nikelite Oct 28, 2024
b371e7c
Fix import
nikelite Oct 28, 2024
6d563b3
Clean up disable_eager_execution()
nikelite Oct 28, 2024
8c508a2
remove unused pytest
nikelite Oct 28, 2024
ff9d236
fix typo
nikelite Oct 28, 2024
975993f
Re-enable more xfail cases
nikelite Oct 29, 2024
45d4b97
Merge branch 'master' of https://github.com/nikelite/tfx
nikelite Oct 29, 2024
5ffdf03
Clean up main function
nikelite Oct 29, 2024
7cf3232
remove unused pytest
nikelite Oct 29, 2024
b17695f
Update xfail description of sklearn_predict_extractor_test
nikelite Oct 29, 2024
423cc67
Update proto_placeholder_test.py to clean up cached proto classes
nikelite Oct 30, 2024
4e4e868
Update pipeline_pb2 clean-up codes and related tests
nikelite Oct 31, 2024
75cec0a
Cleanup struct_pb
nikelite Oct 31, 2024
698a9ef
Clean up struct_pb
nikelite Nov 1, 2024
022e94b
Exclude tests under tfx/orchestration/experimental/core
nikelite Nov 1, 2024
c0323b7
Revert changes under tfx/orchestration/experimental/core
nikelite Nov 1, 2024
6b08438
Rename cleanup functions
nikelite Nov 1, 2024
2c2f4ed
Cleanup experimental orchestration implementation and alert feature f…
nikelite Nov 1, 2024
4604ece
Merge branch 'tensorflow:master' into master
nikelite Nov 1, 2024
18d8f3e
Remove unused variable
nikelite Nov 1, 2024
6dd98be
Merge branch 'master' of https://github.com/nikelite/tfx
nikelite Nov 1, 2024
41fc68d
Fix lint error
nikelite Nov 1, 2024
2aadfce
Fix lint error
nikelite Nov 1, 2024
e87741f
Clean up lineage logging feature from experimental orchestration
nikelite Nov 1, 2024
5841682
Clean up unused orchestration codes
nikelite Nov 1, 2024
1 change: 0 additions & 1 deletion build/BUILD
@@ -24,7 +24,6 @@ sh_binary(
"//tfx/examples/custom_components/presto_example_gen/proto:presto_config_pb2.py",
"//tfx/extensions/experimental/kfp_compatibility/proto:kfp_component_spec_pb2.py",
"//tfx/extensions/google_cloud_big_query/experimental/elwc_example_gen/proto:elwc_config_pb2.py",
"//tfx/orchestration/experimental/core:component_generated_alert_pb2.py",
"//tfx/proto:bulk_inferrer_pb2.py",
"//tfx/proto:distribution_validator_pb2.py",
"//tfx/proto:evaluator_pb2.py",
1 change: 0 additions & 1 deletion setup.py
@@ -223,7 +223,6 @@ def run(self):
# These are the subpackages of `tfx.orchestration` necessary.
'tfx.orchestration',
'tfx.orchestration.config',
'tfx.orchestration.experimental.core',
'tfx.orchestration.launcher',
'tfx.orchestration.local',
'tfx.orchestration.local.legacy',
70 changes: 1 addition & 69 deletions tfx/components/distribution_validator/executor.py
@@ -24,8 +24,6 @@
from tfx.components.distribution_validator import utils
from tfx.components.statistics_gen import stats_artifact_utils
from tfx.dsl.components.base import base_executor
from tfx.orchestration.experimental.core import component_generated_alert_pb2
from tfx.orchestration.experimental.core import constants
from tfx.proto import distribution_validator_pb2
from tfx.proto.orchestration import execution_result_pb2
from tfx.types import artifact_utils
@@ -34,7 +32,6 @@
from tfx.utils import monitoring_utils
from tfx.utils import writer_utils

from google.protobuf import any_pb2
from tensorflow_metadata.proto.v0 import anomalies_pb2
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_metadata.proto.v0 import statistics_pb2
@@ -176,55 +173,6 @@ def _add_anomalies_for_missing_comparisons(
return anomalies


def _create_anomalies_alerts(
anomalies: anomalies_pb2.Anomalies,
split_pair: str,
span: str,
) -> list[component_generated_alert_pb2.ComponentGeneratedAlertInfo]:
"""Creates an alert for each anomaly in the anomalies artifact.

Args:
anomalies: The Anomalies proto.
split_pair: The tuple name of the data split, like (train, eval).
span: The span of the Anomalies.

Returns:
A list of component generated alerts, if any.
"""
results = []
# Information about dataset-level anomalies, such as "High num examples in
# current dataset versus the previous span."
if anomalies.HasField('dataset_anomaly_info'):
for reason in anomalies.dataset_anomaly_info.reason:
results.append(
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
f'[{split_pair}][span {span}] {reason.short_description}'
),
alert_body=(
f'[{split_pair}][span {span}] {reason.description}'
),
)
)
# Information about feature-level anomalies. Generates a single alert for all
# anomalous features.
features_with_anomalies = ', '.join(anomalies.anomaly_info.keys())
if features_with_anomalies:
results.append(
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
f'[{split_pair}][span {span}] Feature-level anomalies present'
),
alert_body=(
f'[{split_pair}][span {span}] Feature(s) '
f'{features_with_anomalies} contain(s) anomalies. '
f'See Anomalies artifact for more details.'
),
)
)
return results


def _get_distribution_validator_config(
input_dict: Dict[str, list[types.Artifact]], exec_properties: Dict[str, Any]
) -> Optional[distribution_validator_pb2.DistributionValidatorConfig]:
@@ -282,8 +230,7 @@ def Do(
exec_properties: A dict of execution properties.

Returns:
ExecutionResult proto with anomalies and the component generated alerts
execution property set with anomalies alerts, if any.
ExecutionResult proto with anomalies
"""
self._log_startup(input_dict, output_dict, exec_properties)

@@ -379,7 +326,6 @@ def Do(
)
)
current_stats_span = test_statistics.span
alerts = component_generated_alert_pb2.ComponentGeneratedAlertList()
for test_split, baseline_split in split_pairs:
split_pair = '%s_%s' % (test_split, baseline_split)
logging.info('Processing split pair %s', split_pair)
@@ -420,11 +366,6 @@ def Do(
current_stats_span,
validation_metrics_artifact,
)
alerts.component_generated_alert_list.extend(
_create_anomalies_alerts(
anomalies, split_pair, anomalies_artifact.span
)
)

# Set blessed custom property for Anomalies Artifact
anomalies_artifact.set_json_value_custom_property(
@@ -435,13 +376,4 @@ def Do(
standard_component_specs.ANOMALIES_KEY
].artifacts.append(anomalies_artifact.mlmd_artifact)

# Set component generated alerts execution property in ExecutorOutput if
# any anomalies alerts exist.
if alerts.component_generated_alert_list:
any_proto = any_pb2.Any()
any_proto.Pack(alerts)
executor_output.execution_properties[
constants.COMPONENT_GENERATED_ALERTS_KEY
].proto_value.CopyFrom(any_proto)

return executor_output
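
Note on the pattern removed from executor.py above: the executor packed a ComponentGeneratedAlertList into the ExecutorOutput's execution properties via google.protobuf.Any, and the tests below unpack it from the same key. The sketch that follows condenses that round trip into two standalone helpers. It assumes a TFX version that still ships tfx.orchestration.experimental.core (the package this PR removes); the helper names pack_alerts/unpack_alerts are illustrative, not TFX API.

```python
# Minimal sketch of the alert round trip removed by this PR. Only runs against
# a TFX release that still contains tfx.orchestration.experimental.core.
from google.protobuf import any_pb2
from tfx.orchestration.experimental.core import component_generated_alert_pb2
from tfx.orchestration.experimental.core import constants
from tfx.proto.orchestration import execution_result_pb2


def pack_alerts(
    executor_output: execution_result_pb2.ExecutorOutput,
    alerts: component_generated_alert_pb2.ComponentGeneratedAlertList,
) -> None:
  """Stores the alert list on the executor output, mirroring the removed Do() code."""
  if not alerts.component_generated_alert_list:
    return  # The executor only set the property when at least one alert existed.
  any_proto = any_pb2.Any()
  any_proto.Pack(alerts)
  executor_output.execution_properties[
      constants.COMPONENT_GENERATED_ALERTS_KEY
  ].proto_value.CopyFrom(any_proto)


def unpack_alerts(
    executor_output: execution_result_pb2.ExecutorOutput,
) -> component_generated_alert_pb2.ComponentGeneratedAlertList:
  """Reads the alert list back out, mirroring the removed test assertions."""
  alerts = component_generated_alert_pb2.ComponentGeneratedAlertList()
  executor_output.execution_properties[
      constants.COMPONENT_GENERATED_ALERTS_KEY
  ].proto_value.Unpack(alerts)
  return alerts
```

A test on the old code path could then assert on unpack_alerts(executor_output) instead of unpacking inline, which is essentially what the assertions removed from executor_test.py below did.
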
149 changes: 4 additions & 145 deletions tfx/components/distribution_validator/executor_test.py
@@ -22,8 +22,6 @@
from tensorflow_data_validation.anomalies.proto import custom_validation_config_pb2
from tfx.components.distribution_validator import executor
from tfx.dsl.io import fileio
from tfx.orchestration.experimental.core import component_generated_alert_pb2
from tfx.orchestration.experimental.core import constants
from tfx.proto import distribution_validator_pb2
from tfx.types import artifact_utils
from tfx.types import standard_artifacts
@@ -215,23 +213,6 @@ def testSplitPairs(self, split_pairs, expected_split_pair_names,
}
""",
'anomalies_blessed_value': 0,
'expected_alerts': (
component_generated_alert_pb2.ComponentGeneratedAlertList(
component_generated_alert_list=[
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
'[train_eval][span 2] Feature-level anomalies '
'present'
),
alert_body=(
'[train_eval][span 2] Feature(s) company, '
'dropoff_census_tract contain(s) anomalies. See '
'Anomalies artifact for more details.'
),
),
]
)
)
},
{
'testcase_name': 'dataset_constraint',
@@ -255,24 +236,6 @@ def testSplitPairs(self, split_pairs, expected_split_pair_names,
}
}""",
'anomalies_blessed_value': 0,
'expected_alerts': (
component_generated_alert_pb2.ComponentGeneratedAlertList(
component_generated_alert_list=[
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
'[train_eval][span 2] High num examples in '
'current dataset versus the previous span.'
),
alert_body=(
'[train_eval][span 2] The ratio of num examples '
'in the current dataset versus the previous span '
'is 2.02094 (up to six significant digits), '
'which is above the threshold 1.'
),
),
]
)
)
},
{
'testcase_name': 'no_anomalies',
@@ -305,9 +268,6 @@ def testSplitPairs(self, split_pairs, expected_split_pair_names,
}
""",
'anomalies_blessed_value': 1,
'expected_alerts': (
component_generated_alert_pb2.ComponentGeneratedAlertList()
),
},
{
'testcase_name': 'custom_anomalies',
@@ -367,23 +327,6 @@ def testSplitPairs(self, split_pairs, expected_split_pair_names,
}
""",
'anomalies_blessed_value': 0,
'expected_alerts': (
component_generated_alert_pb2.ComponentGeneratedAlertList(
component_generated_alert_list=[
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
'[train_eval][span 2] Feature-level anomalies '
'present'
),
alert_body=(
'[train_eval][span 2] Feature(s) company '
'contain(s) anomalies. See Anomalies artifact '
'for more details.'
),
)
]
)
)
},
)
def testAnomaliesGenerated(
@@ -392,7 +335,6 @@ def testAnomaliesGenerated(
custom_validation_config,
expected_anomalies,
anomalies_blessed_value,
expected_alerts,
):
source_data_dir = os.path.join(
os.path.dirname(os.path.dirname(__file__)), 'testdata')
@@ -438,7 +380,7 @@ def testAnomaliesGenerated(
}

distribution_validator_executor = executor.Executor()
executor_output = distribution_validator_executor.Do(
distribution_validator_executor.Do(
input_dict, output_dict, exec_properties
)

@@ -465,27 +407,6 @@ def testAnomaliesGenerated(
),
{'train_eval': anomalies_blessed_value},
)
actual_alerts = (
component_generated_alert_pb2.ComponentGeneratedAlertList()
)
executor_output.execution_properties[
constants.COMPONENT_GENERATED_ALERTS_KEY
].proto_value.Unpack(actual_alerts)
for alert in expected_alerts.component_generated_alert_list:
self.assertEqual(
alert.alert_name,
actual_alerts.component_generated_alert_list[0].alert_name
)
if 'Feature-level anomalies present' in alert.alert_name:
self.assertIn(
'See Anomalies artifact for more details.',
actual_alerts.component_generated_alert_list[0].alert_body,
)
else:
self.assertEqual(
alert.alert_body,
actual_alerts.component_generated_alert_list[0].alert_body
)

def testMissBaselineStats(self):

@@ -682,19 +603,6 @@ def testStructData(self):
}
}""", anomalies_pb2.Anomalies())

expected_alerts = component_generated_alert_pb2.ComponentGeneratedAlertList(
component_generated_alert_list=[
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
'[train_eval][span 3] Feature-level anomalies present'),
alert_body=(
'[train_eval][span 3] Feature(s) '
'parent_feature.value_feature contain(s) anomalies. See '
'Anomalies artifact for more details.'),
)
],
)

# Create stats artifacts with a struct feature.
for split_dir in ['Split-eval', 'Split-train']:
full_split_dir = os.path.join(stats_artifact.uri, split_dir)
@@ -733,7 +641,7 @@ def testStructData(self):
}

distribution_validator_executor = executor.Executor()
executor_output = distribution_validator_executor.Do(
distribution_validator_executor.Do(
input_dict, output_dict, exec_properties
)

@@ -752,14 +660,6 @@ def testStructData(self):
distribution_anomalies.ParseFromString(distribution_anomalies_bytes)
self.assertEqualExceptBaseline(expected_anomalies, distribution_anomalies)

actual_alerts = (
component_generated_alert_pb2.ComponentGeneratedAlertList()
)
executor_output.execution_properties[
constants.COMPONENT_GENERATED_ALERTS_KEY
].proto_value.Unpack(actual_alerts)
self.assertEqual(actual_alerts, expected_alerts)

@parameterized.named_parameters(
{
'testcase_name':
@@ -1076,7 +976,7 @@ def testEmptyData(self, stats_train, stats_eval, expected_anomalies):
}

distribution_validator_executor = executor.Executor()
executor_output = distribution_validator_executor.Do(
distribution_validator_executor.Do(
input_dict, output_dict, exec_properties
)

@@ -1099,26 +999,6 @@ def testEmptyData(self, stats_train, stats_eval, expected_anomalies):
distribution_anomalies.ParseFromString(distribution_anomalies_bytes)
self.assertEqualExceptBaseline(expected_anomalies, distribution_anomalies)

expected_alerts = component_generated_alert_pb2.ComponentGeneratedAlertList(
component_generated_alert_list=[
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
'[train_eval][span 4] Feature-level anomalies present'
),
alert_body=(
'[train_eval][span 4] Feature(s) first_feature contain(s) '
'anomalies. See Anomalies artifact for more details.'
),
),
]
)
actual_alerts = (
component_generated_alert_pb2.ComponentGeneratedAlertList()
)
executor_output.execution_properties[
constants.COMPONENT_GENERATED_ALERTS_KEY
].proto_value.Unpack(actual_alerts)
self.assertEqual(actual_alerts, expected_alerts)

def testAddOutput(self):
source_data_dir = os.path.join(
@@ -1184,7 +1064,7 @@ def testAddOutput(self):
}

distribution_validator_executor = executor.Executor()
executor_output = distribution_validator_executor.Do(
distribution_validator_executor.Do(
input_dict, output_dict, exec_properties
)

@@ -1193,27 +1073,6 @@ def testAddOutput(self):
)
self.assertTrue(fileio.exists(distribution_anomalies_path))

expected_alerts = component_generated_alert_pb2.ComponentGeneratedAlertList(
component_generated_alert_list=[
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
'[train_eval][span 5] Feature-level anomalies present'
),
alert_body=(
'[train_eval][span 5] Feature(s) '
'parent_feature.value_feature contain(s) anomalies. See '
'Anomalies artifact for more details.'
),
),
]
)
actual_alerts = (
component_generated_alert_pb2.ComponentGeneratedAlertList()
)
executor_output.execution_properties[
constants.COMPONENT_GENERATED_ALERTS_KEY
].proto_value.Unpack(actual_alerts)
self.assertEqual(actual_alerts, expected_alerts)

def testUseArtifactDVConfig(self):
source_data_dir = os.path.join(