From ae9ec4612fbb6e816ab790d9c6a54db30e91a521 Mon Sep 17 00:00:00 2001 From: manu-sj Date: Mon, 9 Dec 2024 10:07:09 +0100 Subject: [PATCH 1/4] adding alias function to modify output column names --- python/hopsworks_common/constants.py | 8 ++ python/hsfs/core/feature_monitoring_config.py | 8 +- python/hsfs/hopsworks_udf.py | 65 +++++++++++-- python/hsfs/transformation_function.py | 96 ++++++++++++------- python/tests/test_transformation_function.py | 7 +- 5 files changed, 135 insertions(+), 49 deletions(-) diff --git a/python/hopsworks_common/constants.py b/python/hopsworks_common/constants.py index b98ed8497..20ec72e77 100644 --- a/python/hopsworks_common/constants.py +++ b/python/hopsworks_common/constants.py @@ -70,6 +70,14 @@ class OPENSEARCH_CONFIG: CA_CERTS = "ca_certs" +class FEATURES: + """ + Class that stores constants about a feature. + """ + + MAX_LENGTH_NAME = 63 + + class KAFKA_SSL_CONFIG: """ Kafka SSL constant strings for configuration diff --git a/python/hsfs/core/feature_monitoring_config.py b/python/hsfs/core/feature_monitoring_config.py index 5b7f9fb71..5668dcaf3 100644 --- a/python/hsfs/core/feature_monitoring_config.py +++ b/python/hsfs/core/feature_monitoring_config.py @@ -22,6 +22,7 @@ import humps from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.constants import FEATURES from hsfs import util from hsfs.core import ( feature_monitoring_config_engine, @@ -34,7 +35,6 @@ from hsfs.core.job_schedule import JobSchedule -MAX_LENGTH_NAME = 63 MAX_LENGTH_DESCRIPTION = 2000 @@ -686,8 +686,10 @@ def name(self, name: str): raise AttributeError("The name of a registered config is read-only.") elif not isinstance(name, str): raise TypeError("name must be of type str") - if len(name) > MAX_LENGTH_NAME: - raise ValueError("name must be less than {MAX_LENGTH_NAME} characters.") + if len(name) > FEATURES.MAX_LENGTH_NAME: + raise ValueError( + "name must be less than {FEATURES.MAX_LENGTH_NAME} characters." + ) self._name = name @property diff --git a/python/hsfs/hopsworks_udf.py b/python/hsfs/hopsworks_udf.py index 057c16b88..ce416c374 100644 --- a/python/hsfs/hopsworks_udf.py +++ b/python/hsfs/hopsworks_udf.py @@ -27,6 +27,7 @@ import humps from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.constants import FEATURES from hsfs import engine, util from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics from hsfs.decorators import typechecked @@ -173,6 +174,7 @@ def __init__( dropped_argument_names: Optional[List[str]] = None, dropped_feature_names: Optional[List[str]] = None, feature_name_prefix: Optional[str] = None, + output_column_names: Optional[str] = None, ): self._return_types: List[str] = HopsworksUdf._validate_and_convert_output_types( return_types @@ -191,6 +193,12 @@ def __init__( if isinstance(func, Callable) else func ) + + # The parameter `output_column_names` is initialized lazily. + # It is only initialized if the output column names are retrieved from the backend or explicitly specified using the `alias` function or is initialized with default column names if the UDF is accessed from a transformation function. + # Output column names are only stored in the backend when a model dependent or on demand transformation function is created using the defined UDF. + self._output_column_names: List[str] = [] + if not transformation_features: # New transformation function being declared so extract source code from function self._transformation_features: List[TransformationFeature] = ( @@ -211,6 +219,7 @@ def __init__( ) ) self._dropped_features = self._dropped_argument_names + else: self._transformation_features = transformation_features self._transformation_function_argument_names = ( @@ -222,6 +231,9 @@ def __init__( if dropped_feature_names else dropped_argument_names ) + self._output_column_names = ( + output_column_names if output_column_names else [] + ) self._formatted_function_source, self._module_imports = ( HopsworksUdf._format_source_code(self._function_source) @@ -229,8 +241,6 @@ def __init__( self._statistics: Optional[TransformationStatistics] = None - self._output_column_names: List[str] = [] - @staticmethod def _validate_and_convert_drop_features( dropped_features: Union[str, List[str]], @@ -691,6 +701,41 @@ def __call__(self, *features: List[str]) -> "HopsworksUdf": udf.dropped_features = updated_dropped_features return udf + def alias(self, *args: str): + """ + Set the names of the transformed features output by the UDF. + """ + if len(args) == 1 and isinstance(args[0], list): + # If a single list is passed, use it directly + output_col_names = args[0] + else: + # Otherwise, use the individual arguments as a list + output_col_names = list(args) + if any( + not isinstance(output_col_name, str) for output_col_name in output_col_names + ): + raise FeatureStoreException( + f"Invalid output feature names provided for the transformation function '{repr(self)}'. Please ensure all arguments are strings." + ) + + self.output_column_names = output_col_names + + return self + + def _validate_output_col_name(self, output_col_names): + if any( + len(output_col_name) > FEATURES.MAX_LENGTH_NAME + for output_col_name in output_col_names + ): + raise FeatureStoreException( + f"Invalid output feature names specified for the transformation function '{repr(self)}'. Please provide names shorter than {FEATURES.MAX_LENGTH_NAME} characters." + ) + + if output_col_names and len(output_col_names) != len(self.return_types): + raise FeatureStoreException( + f"Provided names for output columns does not match the number of columns returned from the UDF. Please provide {len(self.return_types)} names." + ) + def update_return_type_one_hot(self): self._return_types = [ self._return_types[0] @@ -765,6 +810,7 @@ def to_dict(self) -> Dict[str, Any]: "name": self.function_name, "featureNamePrefix": self._feature_name_prefix, "executionMode": self.execution_mode.value.upper(), + "outputColumnNames": self.output_column_names, } def json(self) -> str: @@ -826,6 +872,12 @@ def from_response_json( else None ) + output_column_names = ( + [feature.strip() for feature in json_decamelized["output_column_names"]] + if json_decamelized.get("output_column_names", None) + else None + ) + # Reconstructing statistics arguments. arg_list, _, _, _ = HopsworksUdf._parse_function_signature(function_source_code) @@ -870,6 +922,7 @@ def from_response_json( execution_mode=UDFExecutionMode.from_string( json_decamelized["execution_mode"] ), + output_column_names=output_column_names, ) # Set transformation features if already set. @@ -998,12 +1051,8 @@ def transformation_statistics( def output_column_names(self, output_col_names: Union[str, List[str]]) -> None: if not isinstance(output_col_names, List): output_col_names = [output_col_names] - if not output_col_names and len(output_col_names) != len(self.return_types): - raise FeatureStoreException( - f"Provided names for output columns does not match the number of columns returned from the UDF. Please provide {len(self.return_types)} names." - ) - else: - self._output_column_names = output_col_names + self._validate_output_col_name(output_col_names) + self._output_column_names = output_col_names def __repr__(self): return f'{self.function_name}({", ".join(self.transformation_features)})' diff --git a/python/hsfs/transformation_function.py b/python/hsfs/transformation_function.py index d39097b9a..ae91aee66 100644 --- a/python/hsfs/transformation_function.py +++ b/python/hsfs/transformation_function.py @@ -21,6 +21,7 @@ import humps from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.constants import FEATURES from hsfs import util from hsfs.core import transformation_function_engine from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics @@ -77,8 +78,13 @@ def __init__( raise FeatureStoreException( "Please use the hopsworks_udf decorator when defining transformation functions." ) + if not id and hopsworks_udf.output_column_names: + # Create a copy and reset the output column names of the UDF if the transformation function is newly created and the UDF has output column names assigned already. + # This happens for example if the same udf is used in a on-demand and a model-dependent transformation function. + hopsworks_udf._output_column_names = [] + hopsworks_udf = copy.copy(hopsworks_udf) - self._hopsworks_udf: HopsworksUdf = hopsworks_udf + self.__hopsworks_udf: HopsworksUdf = hopsworks_udf TransformationFunction._validate_transformation_type( transformation_type=transformation_type, hopsworks_udf=hopsworks_udf ) @@ -152,11 +158,8 @@ def __call__(self, *features: List[str]) -> TransformationFunction: """ # Deep copy so that the same transformation function can be used to create multiple new transformation function with different features. transformation = copy.deepcopy(self) - transformation._hopsworks_udf = transformation._hopsworks_udf(*features) - # Regenerate output column names when setting new transformation features. - transformation._hopsworks_udf.output_column_names = ( - transformation._get_output_column_names() - ) + transformation.__hopsworks_udf = transformation.__hopsworks_udf(*features) + return transformation @classmethod @@ -227,9 +230,17 @@ def to_dict(self) -> Dict[str, Any]: "id": self._id, "version": self._version, "featurestoreId": self._featurestore_id, - "hopsworksUdf": self._hopsworks_udf.to_dict(), + "hopsworksUdf": self.hopsworks_udf.to_dict(), } + def alias(self, *args: str): + """ + Set the names of the transformed features output by the transformation function. + """ + self.__hopsworks_udf.alias(*args) + + return self + def _get_output_column_names(self) -> str: """ Function that generates feature names for the transformed features @@ -240,33 +251,43 @@ def _get_output_column_names(self) -> str: # If function name matches the name of an input feature and the transformation function only returns one output feature then # then the transformed output feature would have the same name as the input feature. i.e the input feature will get overwritten. if ( - len(self._hopsworks_udf.return_types) == 1 + len(self.__hopsworks_udf.return_types) == 1 and any( [ - self.hopsworks_udf.function_name + self.__hopsworks_udf.function_name == transformation_feature.feature_name - for transformation_feature in self.hopsworks_udf._transformation_features + for transformation_feature in self.__hopsworks_udf._transformation_features ] ) and ( - not self.hopsworks_udf.dropped_features - or self.hopsworks_udf.function_name - not in self.hopsworks_udf.dropped_features + not self.__hopsworks_udf.dropped_features + or self.__hopsworks_udf.function_name + not in self.__hopsworks_udf.dropped_features ) ): - return [self.hopsworks_udf.function_name] + output_col_names = [self.__hopsworks_udf.function_name] if self.transformation_type == TransformationType.MODEL_DEPENDENT: - _BASE_COLUMN_NAME = f'{self._hopsworks_udf.function_name}_{"_".join(self._hopsworks_udf.transformation_features)}_' - if len(self._hopsworks_udf.return_types) > 1: - return [ + _BASE_COLUMN_NAME = f'{self.__hopsworks_udf.function_name}_{"_".join(self.__hopsworks_udf.transformation_features)}_' + if len(self.__hopsworks_udf.return_types) > 1: + output_col_names = [ f"{_BASE_COLUMN_NAME}{i}" - for i in range(len(self._hopsworks_udf.return_types)) + for i in range(len(self.__hopsworks_udf.return_types)) ] else: - return [f"{_BASE_COLUMN_NAME}"] + output_col_names = [f"{_BASE_COLUMN_NAME}"] elif self.transformation_type == TransformationType.ON_DEMAND: - return [self._hopsworks_udf.function_name] + output_col_names = [self.__hopsworks_udf.function_name] + + if any( + len(output_col_name) > FEATURES.MAX_LENGTH_NAME + for output_col_name in output_col_names + ): + raise FeatureStoreException( + f"The default names for output features generated by the transformation function `{repr(self.__hopsworks_udf)}` exceeds the maximum length of {FEATURES.MAX_LENGTH_NAME} characters. Please use the `alias` function to assign shorter names to the output features." + ) + + return output_col_names @staticmethod def _validate_transformation_type( @@ -311,7 +332,10 @@ def version(self, version: int) -> None: @property def hopsworks_udf(self) -> HopsworksUdf: """Meta data class for the user defined transformation function.""" - return self._hopsworks_udf + # Make sure that the output column names for a model-dependent or on-demand transformation function, when accessed externally from the class. + if self.transformation_type and not self.__hopsworks_udf.output_column_names: + self.__hopsworks_udf.output_column_names = self._get_output_column_names() + return self.__hopsworks_udf @property def transformation_type(self) -> TransformationType: @@ -321,41 +345,39 @@ def transformation_type(self) -> TransformationType: @transformation_type.setter def transformation_type(self, transformation_type) -> None: self._transformation_type = transformation_type - # Generate output column names when setting transformation type - self._hopsworks_udf.output_column_names = self._get_output_column_names() @property def transformation_statistics( self, ) -> Optional[TransformationStatistics]: """Feature statistics required for the defined UDF""" - return self.hopsworks_udf.transformation_statistics + return self.__hopsworks_udf.transformation_statistics @transformation_statistics.setter def transformation_statistics( self, statistics: List[FeatureDescriptiveStatistics] ) -> None: - self.hopsworks_udf.transformation_statistics = statistics + self.__hopsworks_udf.transformation_statistics = statistics # Generate output column names for one-hot encoder after transformation statistics is set. # This is done because the number of output columns for one-hot encoding dependents on number of unique values in training dataset statistics. - if self.hopsworks_udf.function_name == "one_hot_encoder": - self._hopsworks_udf.output_column_names = self._get_output_column_names() + if self.__hopsworks_udf.function_name == "one_hot_encoder": + self.__hopsworks_udf.output_column_names = self._get_output_column_names() @property def output_column_names(self) -> List[str]: """Names of the output columns generated by the transformation functions""" - if self._hopsworks_udf.function_name == "one_hot_encoder" and len( - self._hopsworks_udf.output_column_names - ) != len(self._hopsworks_udf.return_types): - self._hopsworks_udf.output_column_names = self._get_output_column_names() - return self._hopsworks_udf.output_column_names + if ( + self.__hopsworks_udf.function_name == "one_hot_encoder" + and len(self.__hopsworks_udf.output_column_names) + != len(self.__hopsworks_udf.return_types) + ) or not self.__hopsworks_udf.output_column_names: + self.__hopsworks_udf.output_column_names = self._get_output_column_names() + return self.__hopsworks_udf.output_column_names def __repr__(self): if self.transformation_type == TransformationType.MODEL_DEPENDENT: - return ( - f"Model-Dependent Transformation Function : {repr(self.hopsworks_udf)}" - ) + return f"Model-Dependent Transformation Function : {repr(self.__hopsworks_udf)}" elif self.transformation_type == TransformationType.ON_DEMAND: - return f"On-Demand Transformation Function : {repr(self.hopsworks_udf)}" + return f"On-Demand Transformation Function : {repr(self.__hopsworks_udf)}" else: - return f"Transformation Function : {repr(self.hopsworks_udf)}" + return f"Transformation Function : {repr(self.__hopsworks_udf)}" diff --git a/python/tests/test_transformation_function.py b/python/tests/test_transformation_function.py index 305fe04da..fb0886d61 100644 --- a/python/tests/test_transformation_function.py +++ b/python/tests/test_transformation_function.py @@ -237,7 +237,12 @@ def test2(col1): transformation_type=TransformationType.MODEL_DEPENDENT, ) - assert tf.hopsworks_udf == test2 + # Creating dict representation of udf. + udf_json = test2.to_dict() + # Adding output column names to dict for testing since it would be generated when UDF is accessed out the transformation function. + udf_json["outputColumnNames"] = ["test2_col1_"] + + assert tf.hopsworks_udf.to_dict() == udf_json def test_generate_output_column_names_one_argument_one_output_type(self): @udf(int) From 69c727991415a20da49284a5c26d213db6ea0255 Mon Sep 17 00:00:00 2001 From: manu-sj Date: Mon, 9 Dec 2024 12:11:38 +0100 Subject: [PATCH 2/4] adding unit tests --- python/hsfs/hopsworks_udf.py | 7 +- python/tests/test_hopswork_udf.py | 100 +++++++ python/tests/test_transformation_function.py | 297 +++++++++++++++++++ 3 files changed, 403 insertions(+), 1 deletion(-) diff --git a/python/hsfs/hopsworks_udf.py b/python/hsfs/hopsworks_udf.py index ce416c374..a8b97bff3 100644 --- a/python/hsfs/hopsworks_udf.py +++ b/python/hsfs/hopsworks_udf.py @@ -731,9 +731,14 @@ def _validate_output_col_name(self, output_col_names): f"Invalid output feature names specified for the transformation function '{repr(self)}'. Please provide names shorter than {FEATURES.MAX_LENGTH_NAME} characters." ) + if len(output_col_names) != len(set(output_col_names)): + raise FeatureStoreException( + f"Duplicate output feature names provided for the transformation function '{repr(self)}'. Please ensure all arguments names are unique." + ) + if output_col_names and len(output_col_names) != len(self.return_types): raise FeatureStoreException( - f"Provided names for output columns does not match the number of columns returned from the UDF. Please provide {len(self.return_types)} names." + f"The number of output feature names provided does not match the number of features returned by the transformation function '{repr(self)}'. Pease provide exactly {len(self.return_types)} feature name(s) to match the output." ) def update_return_type_one_hot(self): diff --git a/python/tests/test_hopswork_udf.py b/python/tests/test_hopswork_udf.py index d85e5425c..44de7cced 100644 --- a/python/tests/test_hopswork_udf.py +++ b/python/tests/test_hopswork_udf.py @@ -1236,3 +1236,103 @@ def test_validate_and_convert_drop_features_dropped_prefix_invalid(self): str(exp.value) == "Cannot drop features 'test_feature1', 'test_feature2' as they are not features given as arguments in the defined UDF." ) + + def test_alias_one_output(self): + @udf(int) + def add_one(feature): + return feature + 1 + + add_one = add_one.alias("feature_plus_one") + + assert add_one.output_column_names == ["feature_plus_one"] + + def test_alias_one_output_list(self): + @udf(int) + def add_one(feature): + return feature + 1 + + add_one = add_one.alias(["feature_plus_one"]) + + assert add_one.output_column_names == ["feature_plus_one"] + + def test_alias_multiple_output(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + add_one = add_and_sub.alias("feature_plus_one", "feature_minus_one") + + assert add_one.output_column_names == ["feature_plus_one", "feature_minus_one"] + + def test_alias_multiple_output_list(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + add_one = add_and_sub.alias(["feature_plus_one", "feature_minus_one"]) + + assert add_one.output_column_names == ["feature_plus_one", "feature_minus_one"] + + def test_alias_invalid_number_column_names(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + with pytest.raises(FeatureStoreException) as exp: + add_and_sub.alias(["feature_plus_one", "feature_minus_one", "invalid_col"]) + + assert ( + str(exp.value) + == "The number of output feature names provided does not match the number of features returned by the transformation function 'add_and_sub(feature)'. Pease provide exactly 2 feature name(s) to match the output." + ) + + def test_alias_invalid_type(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + with pytest.raises(FeatureStoreException) as exp: + add_and_sub.alias("feature_plus_one", {"name": "col1"}) + + assert ( + str(exp.value) + == "Invalid output feature names provided for the transformation function 'add_and_sub(feature)'. Please ensure all arguments are strings." + ) + + def test_alias_duplicates(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + with pytest.raises(FeatureStoreException) as exp: + add_and_sub.alias("feature_plus_one", "feature_plus_one") + + assert ( + str(exp.value) + == "Duplicate output feature names provided for the transformation function 'add_and_sub(feature)'. Please ensure all arguments names are unique." + ) + + def test_call_and_alias(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + add_one = add_and_sub("feature2").alias( + ["feature_plus_one", "feature_minus_one"] + ) + + assert add_one.output_column_names == ["feature_plus_one", "feature_minus_one"] + assert add_one.transformation_features == ["feature2"] + + def test_alias_invalid_length(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + with pytest.raises(FeatureStoreException) as exp: + add_and_sub.alias(["invalid" * 10, "feature_minus_one"]) + + assert ( + str(exp.value) + == "Invalid output feature names specified for the transformation function 'add_and_sub(feature)'. Please provide names shorter than 63 characters." + ) diff --git a/python/tests/test_transformation_function.py b/python/tests/test_transformation_function.py index fb0886d61..415d093d6 100644 --- a/python/tests/test_transformation_function.py +++ b/python/tests/test_transformation_function.py @@ -459,3 +459,300 @@ def test_func(col1, statistics=stats): str(exe.value) == "On-Demand Transformation functions cannot use statistics, please remove statistics parameters from the functions" ) + + def test_alias_one_output(self): + @udf(int) + def add_one(feature): + return feature + 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + mdt = mdt.alias("feature_plus_one_mdt") + + assert mdt.output_column_names == ["feature_plus_one_mdt"] + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + odt = odt.alias("feature_plus_one_odt") + + assert odt.output_column_names == ["feature_plus_one_odt"] + + def test_alias_one_output_list(self): + @udf(int) + def add_one(feature): + return feature + 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + mdt = mdt.alias(["feature_plus_one_mdt"]) + + assert mdt.output_column_names == ["feature_plus_one_mdt"] + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + odt = odt.alias(["feature_plus_one_odt"]) + + assert odt.output_column_names == ["feature_plus_one_odt"] + + def test_alias_multiple_output(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_and_sub, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + mdt = mdt.alias("feature_plus_one_mdt", "feature_minus_one_mdt") + + assert mdt.output_column_names == [ + "feature_plus_one_mdt", + "feature_minus_one_mdt", + ] + + def test_alias_multiple_output_list(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_and_sub, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + mdt = mdt.alias(["feature_plus_one_mdt", "feature_minus_one_mdt"]) + + assert mdt.output_column_names == [ + "feature_plus_one_mdt", + "feature_minus_one_mdt", + ] + + def test_alias_invalid_number_column_names(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_and_sub, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + with pytest.raises(FeatureStoreException) as exp: + mdt.alias(["feature_plus_one", "feature_minus_one", "invalid_col"]) + + assert ( + str(exp.value) + == "The number of output feature names provided does not match the number of features returned by the transformation function 'add_and_sub(feature)'. Pease provide exactly 2 feature name(s) to match the output." + ) + + @udf(int) + def add_one(feature): + return feature + 1 + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + with pytest.raises(FeatureStoreException) as exp: + odt.alias(["feature_plus_one", "feature_minus_one", "invalid_col"]) + + assert ( + str(exp.value) + == "The number of output feature names provided does not match the number of features returned by the transformation function 'add_one(feature)'. Pease provide exactly 1 feature name(s) to match the output." + ) + + def test_alias_invalid_type(self): + @udf([int]) + def add_one(feature): + return feature + 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + with pytest.raises(FeatureStoreException) as exp: + mdt.alias({"name": "col1"}) + + assert ( + str(exp.value) + == "Invalid output feature names provided for the transformation function 'add_one(feature)'. Please ensure all arguments are strings." + ) + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + with pytest.raises(FeatureStoreException) as exp: + odt.alias({"name": "col1"}) + + assert ( + str(exp.value) + == "Invalid output feature names provided for the transformation function 'add_one(feature)'. Please ensure all arguments are strings." + ) + + def test_alias_duplicates(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_and_sub, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + with pytest.raises(FeatureStoreException) as exp: + mdt.alias("feature_plus_one", "feature_plus_one") + + assert ( + str(exp.value) + == "Duplicate output feature names provided for the transformation function 'add_and_sub(feature)'. Please ensure all arguments names are unique." + ) + + def test_call_and_alias(self): + @udf(int) + def add_one(feature): + return feature + 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + mdt = mdt("feature2_mdt").alias(["feature_plus_one_mdt"]) + + assert mdt.output_column_names == ["feature_plus_one_mdt"] + assert mdt.hopsworks_udf.transformation_features == ["feature2_mdt"] + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + odt = odt("feature2_odt").alias(["feature_plus_one_odt"]) + + assert odt.output_column_names == ["feature_plus_one_odt"] + assert odt.hopsworks_udf.transformation_features == ["feature2_odt"] + + def test_alias_invalid_length(self): + @udf(int) + def add_one(feature): + return feature + 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + with pytest.raises(FeatureStoreException) as exp: + mdt.alias(["invalid" * 10]) + + assert ( + str(exp.value) + == "Invalid output feature names specified for the transformation function 'add_one(feature)'. Please provide names shorter than 63 characters." + ) + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + with pytest.raises(FeatureStoreException) as exp: + odt.alias(["invalid" * 10]) + + assert ( + str(exp.value) + == "Invalid output feature names specified for the transformation function 'add_one(feature)'. Please provide names shorter than 63 characters." + ) + + def test_generated_output_col_name_invalid_mdt(self): + @udf(int) + def test( + long_feature_name1, + long_feature_name2, + long_feature_name3, + long_feature_name4, + long_feature_name5, + long_feature_name6, + long_feature_name7, + ): + return long_feature_name1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=test, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + with pytest.raises(FeatureStoreException) as exp: + mdt._get_output_column_names() + + assert ( + str(exp.value) + == "The default names for output features generated by the transformation function `test(long_feature_name1, long_feature_name2, long_feature_name3, long_feature_name4, long_feature_name5, long_feature_name6, long_feature_name7)` exceeds the maximum length of 63 characters. Please use the `alias` function to assign shorter names to the output features." + ) + + def test_generate_output_col_name_invalid_odt(self): + @udf(int) + def really_long_function_name_that_exceed_63_characters_causing_invalid_name_for_on_demand_features( + features, + ): + return features + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=really_long_function_name_that_exceed_63_characters_causing_invalid_name_for_on_demand_features, + transformation_type=TransformationType.ON_DEMAND, + ) + + with pytest.raises(FeatureStoreException) as exp: + odt._get_output_column_names() + + assert ( + str(exp.value) + == "The default names for output features generated by the transformation function `really_long_function_name_that_exceed_63_characters_causing_invalid_name_for_on_demand_features(features)` exceeds the maximum length of 63 characters. Please use the `alias` function to assign shorter names to the output features." + ) + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=really_long_function_name_that_exceed_63_characters_causing_invalid_name_for_on_demand_features, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + with pytest.raises(FeatureStoreException) as exp: + mdt._get_output_column_names() + + assert ( + str(exp.value) + == "The default names for output features generated by the transformation function `really_long_function_name_that_exceed_63_characters_causing_invalid_name_for_on_demand_features(features)` exceeds the maximum length of 63 characters. Please use the `alias` function to assign shorter names to the output features." + ) From 93966e8b382f950175a110e1c42707582b3a3e7e Mon Sep 17 00:00:00 2001 From: manu-sj Date: Mon, 9 Dec 2024 17:18:30 +0100 Subject: [PATCH 3/4] fixing generation of output column names when udf is shared between transformation functions --- python/hsfs/hopsworks_udf.py | 13 +++++++++++-- python/hsfs/transformation_function.py | 10 +++++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/python/hsfs/hopsworks_udf.py b/python/hsfs/hopsworks_udf.py index a8b97bff3..ff3ab1507 100644 --- a/python/hsfs/hopsworks_udf.py +++ b/python/hsfs/hopsworks_udf.py @@ -147,7 +147,9 @@ class HopsworksUdf: transformation_function_argument_names : `Optional[List[TransformationFeature]]`. The argument names of the transformation function. dropped_argument_names : `Optional[List[str]]`. The arguments to be dropped from the finial DataFrame after the transformation functions are applied. dropped_feature_names : `Optional[List[str]]`. The feature name corresponding to the arguments names that are dropped - feature_name_prefix: `Optional[str]` = None. Prefixes if any used in the feature view. + feature_name_prefix: `Optional[str]`. Prefixes if any used in the feature view. + output_column_names: `Optional[List[str]]`. The names of the output columns returned from the transformation function. + generate_output_col_names: `Optional[bool]`. Generate default output column names for the transformation function. Default's to True. """ # Mapping for converting python types to spark types - required for creating pandas UDF's. @@ -175,6 +177,7 @@ def __init__( dropped_feature_names: Optional[List[str]] = None, feature_name_prefix: Optional[str] = None, output_column_names: Optional[str] = None, + generate_output_col_names: bool = True, ): self._return_types: List[str] = HopsworksUdf._validate_and_convert_output_types( return_types @@ -241,6 +244,10 @@ def __init__( self._statistics: Optional[TransformationStatistics] = None + # Denote if the output feature names have to be generated. + # Set to `False` if the output column names are saved in the backend and the udf is constructed from it using `from_response_json` function or if user has specified the output feature names using the `alias`` function. + self._generate_output_col_name: bool = generate_output_col_names + @staticmethod def _validate_and_convert_drop_features( dropped_features: Union[str, List[str]], @@ -718,6 +725,7 @@ def alias(self, *args: str): f"Invalid output feature names provided for the transformation function '{repr(self)}'. Please ensure all arguments are strings." ) + self._generate_output_col_name = False self.output_column_names = output_col_names return self @@ -915,7 +923,7 @@ def from_response_json( for arg_index in range(len(arg_list)) ] - hopsworks_udf = cls( + hopsworks_udf: HopsworksUdf = cls( func=function_source_code, return_types=output_types, name=function_name, @@ -928,6 +936,7 @@ def from_response_json( json_decamelized["execution_mode"] ), output_column_names=output_column_names, + generate_output_col_names=not output_column_names, # Do not generate output column names if they are retrieved from the back ) # Set transformation features if already set. diff --git a/python/hsfs/transformation_function.py b/python/hsfs/transformation_function.py index ae91aee66..b79c10fb2 100644 --- a/python/hsfs/transformation_function.py +++ b/python/hsfs/transformation_function.py @@ -78,11 +78,6 @@ def __init__( raise FeatureStoreException( "Please use the hopsworks_udf decorator when defining transformation functions." ) - if not id and hopsworks_udf.output_column_names: - # Create a copy and reset the output column names of the UDF if the transformation function is newly created and the UDF has output column names assigned already. - # This happens for example if the same udf is used in a on-demand and a model-dependent transformation function. - hopsworks_udf._output_column_names = [] - hopsworks_udf = copy.copy(hopsworks_udf) self.__hopsworks_udf: HopsworksUdf = hopsworks_udf TransformationFunction._validate_transformation_type( @@ -90,6 +85,11 @@ def __init__( ) self.transformation_type = transformation_type + if self.__hopsworks_udf._generate_output_col_name: + # Reset output column names so that they would be regenerated. + # Handles the use case in which the same UDF is used to define both on-demand and model dependent transformations. + self.__hopsworks_udf._output_column_names = [] + def save(self) -> None: """Save a transformation function into the backend. From ac8181733fef8cdb44ea5b76b5b6e9dfc86103e1 Mon Sep 17 00:00:00 2001 From: manu-sj Date: Wed, 11 Dec 2024 10:34:11 +0100 Subject: [PATCH 4/4] updating documentation --- docs/templates/api/hopsworks_udf.md | 6 +++++- python/auto_doc.py | 9 +++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/docs/templates/api/hopsworks_udf.md b/docs/templates/api/hopsworks_udf.md index cc155c42e..a408a946b 100644 --- a/docs/templates/api/hopsworks_udf.md +++ b/docs/templates/api/hopsworks_udf.md @@ -6,6 +6,10 @@ {{hopsworks_udf_properties}} +## Methods + +{{hopsworks_udf_methods}} + ## TransformationFeature -{{transformation_feature}} \ No newline at end of file +{{transformation_feature}} diff --git a/python/auto_doc.py b/python/auto_doc.py index 49129f52a..fe40e02f2 100644 --- a/python/auto_doc.py +++ b/python/auto_doc.py @@ -451,6 +451,15 @@ "hopsworks_udf_properties": keras_autodoc.get_properties( "hsfs.hopsworks_udf.HopsworksUdf" ), + "hopsworks_udf_methods": keras_autodoc.get_methods( + "hsfs.hopsworks_udf.HopsworksUdf", + exclude=[ + "update_return_type_one_hot", + "python_udf_wrapper", + "pandas_udf_wrapper", + "get_udf", + ], + ), "transformation_feature": ["hsfs.hopsworks_udf.TransformationFeature"], }, "api/transformation_statistics.md": {