Skip to content

Commit

Permalink
Clean up experimental orchestration implementation and alert features…
Browse files Browse the repository at this point in the history
… from it (#6948)

Clean up experimental orchestration implementation and alert features from it
  • Loading branch information
nikelite authored Nov 4, 2024
1 parent 9e66ed1 commit 381ccf6
Show file tree
Hide file tree
Showing 77 changed files with 18 additions and 24,646 deletions.
1 change: 0 additions & 1 deletion build/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ sh_binary(
"//tfx/examples/custom_components/presto_example_gen/proto:presto_config_pb2.py",
"//tfx/extensions/experimental/kfp_compatibility/proto:kfp_component_spec_pb2.py",
"//tfx/extensions/google_cloud_big_query/experimental/elwc_example_gen/proto:elwc_config_pb2.py",
"//tfx/orchestration/experimental/core:component_generated_alert_pb2.py",
"//tfx/proto:bulk_inferrer_pb2.py",
"//tfx/proto:distribution_validator_pb2.py",
"//tfx/proto:evaluator_pb2.py",
Expand Down
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ def run(self):
# These are the subpackages of `tfx.orchestration` necessary.
'tfx.orchestration',
'tfx.orchestration.config',
'tfx.orchestration.experimental.core',
'tfx.orchestration.launcher',
'tfx.orchestration.local',
'tfx.orchestration.local.legacy',
Expand Down
70 changes: 1 addition & 69 deletions tfx/components/distribution_validator/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
from tfx.components.distribution_validator import utils
from tfx.components.statistics_gen import stats_artifact_utils
from tfx.dsl.components.base import base_executor
from tfx.orchestration.experimental.core import component_generated_alert_pb2
from tfx.orchestration.experimental.core import constants
from tfx.proto import distribution_validator_pb2
from tfx.proto.orchestration import execution_result_pb2
from tfx.types import artifact_utils
Expand All @@ -34,7 +32,6 @@
from tfx.utils import monitoring_utils
from tfx.utils import writer_utils

from google.protobuf import any_pb2
from tensorflow_metadata.proto.v0 import anomalies_pb2
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_metadata.proto.v0 import statistics_pb2
Expand Down Expand Up @@ -176,55 +173,6 @@ def _add_anomalies_for_missing_comparisons(
return anomalies


def _create_anomalies_alerts(
anomalies: anomalies_pb2.Anomalies,
split_pair: str,
span: str,
) -> list[component_generated_alert_pb2.ComponentGeneratedAlertInfo]:
"""Creates an alert for each anomaly in the anomalies artifact.
Args:
anomalies: The Anomalies proto.
split_pair: The tuple name of the data split, like (train, eval).
span: The span of the Anomalies.
Returns:
A list of component generated alerts, if any.
"""
results = []
# Information about dataset-level anomalies, such as "High num examples in
# current dataset versus the previous span."
if anomalies.HasField('dataset_anomaly_info'):
for reason in anomalies.dataset_anomaly_info.reason:
results.append(
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
f'[{split_pair}][span {span}] {reason.short_description}'
),
alert_body=(
f'[{split_pair}][span {span}] {reason.description}'
),
)
)
# Information about feature-level anomalies. Generates a single alert for all
# anomalous features.
features_with_anomalies = ', '.join(anomalies.anomaly_info.keys())
if features_with_anomalies:
results.append(
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
f'[{split_pair}][span {span}] Feature-level anomalies present'
),
alert_body=(
f'[{split_pair}][span {span}] Feature(s) '
f'{features_with_anomalies} contain(s) anomalies. '
f'See Anomalies artifact for more details.'
),
)
)
return results


def _get_distribution_validator_config(
input_dict: Dict[str, list[types.Artifact]], exec_properties: Dict[str, Any]
) -> Optional[distribution_validator_pb2.DistributionValidatorConfig]:
Expand Down Expand Up @@ -282,8 +230,7 @@ def Do(
exec_properties: A dict of execution properties.
Returns:
ExecutionResult proto with anomalies and the component generated alerts
execution property set with anomalies alerts, if any.
ExecutionResult proto with anomalies
"""
self._log_startup(input_dict, output_dict, exec_properties)

Expand Down Expand Up @@ -379,7 +326,6 @@ def Do(
)
)
current_stats_span = test_statistics.span
alerts = component_generated_alert_pb2.ComponentGeneratedAlertList()
for test_split, baseline_split in split_pairs:
split_pair = '%s_%s' % (test_split, baseline_split)
logging.info('Processing split pair %s', split_pair)
Expand Down Expand Up @@ -420,11 +366,6 @@ def Do(
current_stats_span,
validation_metrics_artifact,
)
alerts.component_generated_alert_list.extend(
_create_anomalies_alerts(
anomalies, split_pair, anomalies_artifact.span
)
)

# Set blessed custom property for Anomalies Artifact
anomalies_artifact.set_json_value_custom_property(
Expand All @@ -435,13 +376,4 @@ def Do(
standard_component_specs.ANOMALIES_KEY
].artifacts.append(anomalies_artifact.mlmd_artifact)

# Set component generated alerts execution property in ExecutorOutput if
# any anomalies alerts exist.
if alerts.component_generated_alert_list:
any_proto = any_pb2.Any()
any_proto.Pack(alerts)
executor_output.execution_properties[
constants.COMPONENT_GENERATED_ALERTS_KEY
].proto_value.CopyFrom(any_proto)

return executor_output
149 changes: 4 additions & 145 deletions tfx/components/distribution_validator/executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
from tensorflow_data_validation.anomalies.proto import custom_validation_config_pb2
from tfx.components.distribution_validator import executor
from tfx.dsl.io import fileio
from tfx.orchestration.experimental.core import component_generated_alert_pb2
from tfx.orchestration.experimental.core import constants
from tfx.proto import distribution_validator_pb2
from tfx.types import artifact_utils
from tfx.types import standard_artifacts
Expand Down Expand Up @@ -215,23 +213,6 @@ def testSplitPairs(self, split_pairs, expected_split_pair_names,
}
""",
'anomalies_blessed_value': 0,
'expected_alerts': (
component_generated_alert_pb2.ComponentGeneratedAlertList(
component_generated_alert_list=[
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
'[train_eval][span 2] Feature-level anomalies '
'present'
),
alert_body=(
'[train_eval][span 2] Feature(s) company, '
'dropoff_census_tract contain(s) anomalies. See '
'Anomalies artifact for more details.'
),
),
]
)
)
},
{
'testcase_name': 'dataset_constraint',
Expand All @@ -255,24 +236,6 @@ def testSplitPairs(self, split_pairs, expected_split_pair_names,
}
}""",
'anomalies_blessed_value': 0,
'expected_alerts': (
component_generated_alert_pb2.ComponentGeneratedAlertList(
component_generated_alert_list=[
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
'[train_eval][span 2] High num examples in '
'current dataset versus the previous span.'
),
alert_body=(
'[train_eval][span 2] The ratio of num examples '
'in the current dataset versus the previous span '
'is 2.02094 (up to six significant digits), '
'which is above the threshold 1.'
),
),
]
)
)
},
{
'testcase_name': 'no_anomalies',
Expand Down Expand Up @@ -305,9 +268,6 @@ def testSplitPairs(self, split_pairs, expected_split_pair_names,
}
""",
'anomalies_blessed_value': 1,
'expected_alerts': (
component_generated_alert_pb2.ComponentGeneratedAlertList()
),
},
{
'testcase_name': 'custom_anomalies',
Expand Down Expand Up @@ -367,23 +327,6 @@ def testSplitPairs(self, split_pairs, expected_split_pair_names,
}
""",
'anomalies_blessed_value': 0,
'expected_alerts': (
component_generated_alert_pb2.ComponentGeneratedAlertList(
component_generated_alert_list=[
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
'[train_eval][span 2] Feature-level anomalies '
'present'
),
alert_body=(
'[train_eval][span 2] Feature(s) company '
'contain(s) anomalies. See Anomalies artifact '
'for more details.'
),
)
]
)
)
},
)
def testAnomaliesGenerated(
Expand All @@ -392,7 +335,6 @@ def testAnomaliesGenerated(
custom_validation_config,
expected_anomalies,
anomalies_blessed_value,
expected_alerts,
):
source_data_dir = os.path.join(
os.path.dirname(os.path.dirname(__file__)), 'testdata')
Expand Down Expand Up @@ -438,7 +380,7 @@ def testAnomaliesGenerated(
}

distribution_validator_executor = executor.Executor()
executor_output = distribution_validator_executor.Do(
distribution_validator_executor.Do(
input_dict, output_dict, exec_properties
)

Expand All @@ -465,27 +407,6 @@ def testAnomaliesGenerated(
),
{'train_eval': anomalies_blessed_value},
)
actual_alerts = (
component_generated_alert_pb2.ComponentGeneratedAlertList()
)
executor_output.execution_properties[
constants.COMPONENT_GENERATED_ALERTS_KEY
].proto_value.Unpack(actual_alerts)
for alert in expected_alerts.component_generated_alert_list:
self.assertEqual(
alert.alert_name,
actual_alerts.component_generated_alert_list[0].alert_name
)
if 'Feature-level anomalies present' in alert.alert_name:
self.assertIn(
'See Anomalies artifact for more details.',
actual_alerts.component_generated_alert_list[0].alert_body,
)
else:
self.assertEqual(
alert.alert_body,
actual_alerts.component_generated_alert_list[0].alert_body
)

def testMissBaselineStats(self):

Expand Down Expand Up @@ -682,19 +603,6 @@ def testStructData(self):
}
}""", anomalies_pb2.Anomalies())

expected_alerts = component_generated_alert_pb2.ComponentGeneratedAlertList(
component_generated_alert_list=[
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
'[train_eval][span 3] Feature-level anomalies present'),
alert_body=(
'[train_eval][span 3] Feature(s) '
'parent_feature.value_feature contain(s) anomalies. See '
'Anomalies artifact for more details.'),
)
],
)

# Create stats artifacts with a struct feature.
for split_dir in ['Split-eval', 'Split-train']:
full_split_dir = os.path.join(stats_artifact.uri, split_dir)
Expand Down Expand Up @@ -733,7 +641,7 @@ def testStructData(self):
}

distribution_validator_executor = executor.Executor()
executor_output = distribution_validator_executor.Do(
distribution_validator_executor.Do(
input_dict, output_dict, exec_properties
)

Expand All @@ -752,14 +660,6 @@ def testStructData(self):
distribution_anomalies.ParseFromString(distribution_anomalies_bytes)
self.assertEqualExceptBaseline(expected_anomalies, distribution_anomalies)

actual_alerts = (
component_generated_alert_pb2.ComponentGeneratedAlertList()
)
executor_output.execution_properties[
constants.COMPONENT_GENERATED_ALERTS_KEY
].proto_value.Unpack(actual_alerts)
self.assertEqual(actual_alerts, expected_alerts)

@parameterized.named_parameters(
{
'testcase_name':
Expand Down Expand Up @@ -1076,7 +976,7 @@ def testEmptyData(self, stats_train, stats_eval, expected_anomalies):
}

distribution_validator_executor = executor.Executor()
executor_output = distribution_validator_executor.Do(
distribution_validator_executor.Do(
input_dict, output_dict, exec_properties
)

Expand All @@ -1099,26 +999,6 @@ def testEmptyData(self, stats_train, stats_eval, expected_anomalies):
distribution_anomalies.ParseFromString(distribution_anomalies_bytes)
self.assertEqualExceptBaseline(expected_anomalies, distribution_anomalies)

expected_alerts = component_generated_alert_pb2.ComponentGeneratedAlertList(
component_generated_alert_list=[
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
'[train_eval][span 4] Feature-level anomalies present'
),
alert_body=(
'[train_eval][span 4] Feature(s) first_feature contain(s) '
'anomalies. See Anomalies artifact for more details.'
),
),
]
)
actual_alerts = (
component_generated_alert_pb2.ComponentGeneratedAlertList()
)
executor_output.execution_properties[
constants.COMPONENT_GENERATED_ALERTS_KEY
].proto_value.Unpack(actual_alerts)
self.assertEqual(actual_alerts, expected_alerts)

def testAddOutput(self):
source_data_dir = os.path.join(
Expand Down Expand Up @@ -1184,7 +1064,7 @@ def testAddOutput(self):
}

distribution_validator_executor = executor.Executor()
executor_output = distribution_validator_executor.Do(
distribution_validator_executor.Do(
input_dict, output_dict, exec_properties
)

Expand All @@ -1193,27 +1073,6 @@ def testAddOutput(self):
)
self.assertTrue(fileio.exists(distribution_anomalies_path))

expected_alerts = component_generated_alert_pb2.ComponentGeneratedAlertList(
component_generated_alert_list=[
component_generated_alert_pb2.ComponentGeneratedAlertInfo(
alert_name=(
'[train_eval][span 5] Feature-level anomalies present'
),
alert_body=(
'[train_eval][span 5] Feature(s) '
'parent_feature.value_feature contain(s) anomalies. See '
'Anomalies artifact for more details.'
),
),
]
)
actual_alerts = (
component_generated_alert_pb2.ComponentGeneratedAlertList()
)
executor_output.execution_properties[
constants.COMPONENT_GENERATED_ALERTS_KEY
].proto_value.Unpack(actual_alerts)
self.assertEqual(actual_alerts, expected_alerts)

def testUseArtifactDVConfig(self):
source_data_dir = os.path.join(
Expand Down
Loading

0 comments on commit 381ccf6

Please sign in to comment.