diff --git a/python/hopsworks_common/constants.py b/python/hopsworks_common/constants.py index b98ed8497..20ec72e77 100644 --- a/python/hopsworks_common/constants.py +++ b/python/hopsworks_common/constants.py @@ -70,6 +70,14 @@ class OPENSEARCH_CONFIG: CA_CERTS = "ca_certs" +class FEATURES: + """ + Class that stores constants about a feature. + """ + + MAX_LENGTH_NAME = 63 + + class KAFKA_SSL_CONFIG: """ Kafka SSL constant strings for configuration diff --git a/python/hsfs/core/feature_monitoring_config.py b/python/hsfs/core/feature_monitoring_config.py index 5b7f9fb71..5668dcaf3 100644 --- a/python/hsfs/core/feature_monitoring_config.py +++ b/python/hsfs/core/feature_monitoring_config.py @@ -22,6 +22,7 @@ import humps from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.constants import FEATURES from hsfs import util from hsfs.core import ( feature_monitoring_config_engine, @@ -34,7 +35,6 @@ from hsfs.core.job_schedule import JobSchedule -MAX_LENGTH_NAME = 63 MAX_LENGTH_DESCRIPTION = 2000 @@ -686,8 +686,10 @@ def name(self, name: str): raise AttributeError("The name of a registered config is read-only.") elif not isinstance(name, str): raise TypeError("name must be of type str") - if len(name) > MAX_LENGTH_NAME: - raise ValueError("name must be less than {MAX_LENGTH_NAME} characters.") + if len(name) > FEATURES.MAX_LENGTH_NAME: + raise ValueError( + f"name must be less than {FEATURES.MAX_LENGTH_NAME} characters." 
+ ) self._name = name @property diff --git a/python/hsfs/hopsworks_udf.py b/python/hsfs/hopsworks_udf.py index 057c16b88..ce416c374 100644 --- a/python/hsfs/hopsworks_udf.py +++ b/python/hsfs/hopsworks_udf.py @@ -27,6 +27,7 @@ import humps from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.constants import FEATURES from hsfs import engine, util from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics from hsfs.decorators import typechecked @@ -173,6 +174,7 @@ def __init__( dropped_argument_names: Optional[List[str]] = None, dropped_feature_names: Optional[List[str]] = None, feature_name_prefix: Optional[str] = None, + output_column_names: Optional[List[str]] = None, ): self._return_types: List[str] = HopsworksUdf._validate_and_convert_output_types( return_types @@ -191,6 +193,12 @@ def __init__( if isinstance(func, Callable) else func ) + + # The parameter `output_column_names` is initialized lazily. + # It is only initialized if the output column names are retrieved from the backend or explicitly specified using the `alias` function or is initialized with default column names if the UDF is accessed from a transformation function. + # Output column names are only stored in the backend when a model dependent or on demand transformation function is created using the defined UDF. 
+ self._output_column_names: List[str] = [] + if not transformation_features: # New transformation function being declared so extract source code from function self._transformation_features: List[TransformationFeature] = ( @@ -211,6 +219,7 @@ def __init__( ) ) self._dropped_features = self._dropped_argument_names + else: self._transformation_features = transformation_features self._transformation_function_argument_names = ( @@ -222,6 +231,9 @@ def __init__( if dropped_feature_names else dropped_argument_names ) + self._output_column_names = ( + output_column_names if output_column_names else [] + ) self._formatted_function_source, self._module_imports = ( HopsworksUdf._format_source_code(self._function_source) @@ -229,8 +241,6 @@ def __init__( self._statistics: Optional[TransformationStatistics] = None - self._output_column_names: List[str] = [] - @staticmethod def _validate_and_convert_drop_features( dropped_features: Union[str, List[str]], @@ -691,6 +701,41 @@ def __call__(self, *features: List[str]) -> "HopsworksUdf": udf.dropped_features = updated_dropped_features return udf + def alias(self, *args: str): + """ + Set the names of the transformed features output by the UDF. + """ + if len(args) == 1 and isinstance(args[0], list): + # If a single list is passed, use it directly + output_col_names = args[0] + else: + # Otherwise, use the individual arguments as a list + output_col_names = list(args) + if any( + not isinstance(output_col_name, str) for output_col_name in output_col_names + ): + raise FeatureStoreException( + f"Invalid output feature names provided for the transformation function '{repr(self)}'. Please ensure all arguments are strings." 
+ ) + + self.output_column_names = output_col_names + + return self + + def _validate_output_col_name(self, output_col_names): + if any( + len(output_col_name) > FEATURES.MAX_LENGTH_NAME + for output_col_name in output_col_names + ): + raise FeatureStoreException( + f"Invalid output feature names specified for the transformation function '{repr(self)}'. Please provide names shorter than {FEATURES.MAX_LENGTH_NAME} characters." + ) + + if output_col_names and len(output_col_names) != len(self.return_types): + raise FeatureStoreException( + f"Provided names for output columns does not match the number of columns returned from the UDF. Please provide {len(self.return_types)} names." + ) + def update_return_type_one_hot(self): self._return_types = [ self._return_types[0] @@ -765,6 +810,7 @@ def to_dict(self) -> Dict[str, Any]: "name": self.function_name, "featureNamePrefix": self._feature_name_prefix, "executionMode": self.execution_mode.value.upper(), + "outputColumnNames": self.output_column_names, } def json(self) -> str: @@ -826,6 +872,12 @@ def from_response_json( else None ) + output_column_names = ( + [feature.strip() for feature in json_decamelized["output_column_names"]] + if json_decamelized.get("output_column_names", None) + else None + ) + # Reconstructing statistics arguments. arg_list, _, _, _ = HopsworksUdf._parse_function_signature(function_source_code) @@ -870,6 +922,7 @@ def from_response_json( execution_mode=UDFExecutionMode.from_string( json_decamelized["execution_mode"] ), + output_column_names=output_column_names, ) # Set transformation features if already set. 
@@ -998,12 +1051,8 @@ def transformation_statistics( def output_column_names(self, output_col_names: Union[str, List[str]]) -> None: if not isinstance(output_col_names, List): output_col_names = [output_col_names] - if not output_col_names and len(output_col_names) != len(self.return_types): - raise FeatureStoreException( - f"Provided names for output columns does not match the number of columns returned from the UDF. Please provide {len(self.return_types)} names." - ) - else: - self._output_column_names = output_col_names + self._validate_output_col_name(output_col_names) + self._output_column_names = output_col_names def __repr__(self): return f'{self.function_name}({", ".join(self.transformation_features)})' diff --git a/python/hsfs/transformation_function.py b/python/hsfs/transformation_function.py index d39097b9a..ae91aee66 100644 --- a/python/hsfs/transformation_function.py +++ b/python/hsfs/transformation_function.py @@ -21,6 +21,7 @@ import humps from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.constants import FEATURES from hsfs import util from hsfs.core import transformation_function_engine from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics @@ -77,8 +78,13 @@ def __init__( raise FeatureStoreException( "Please use the hopsworks_udf decorator when defining transformation functions." ) + if not id and hopsworks_udf.output_column_names: + # Create a copy and reset the output column names of the UDF if the transformation function is newly created and the UDF has output column names assigned already. + # This happens for example if the same udf is used in a on-demand and a model-dependent transformation function. 
+ hopsworks_udf = copy.copy(hopsworks_udf) + hopsworks_udf._output_column_names = [] - self._hopsworks_udf: HopsworksUdf = hopsworks_udf + self.__hopsworks_udf: HopsworksUdf = hopsworks_udf TransformationFunction._validate_transformation_type( transformation_type=transformation_type, hopsworks_udf=hopsworks_udf ) @@ -152,11 +158,8 @@ def __call__(self, *features: List[str]) -> TransformationFunction: """ # Deep copy so that the same transformation function can be used to create multiple new transformation function with different features. transformation = copy.deepcopy(self) - transformation._hopsworks_udf = transformation._hopsworks_udf(*features) - # Regenerate output column names when setting new transformation features. - transformation._hopsworks_udf.output_column_names = ( - transformation._get_output_column_names() - ) + transformation.__hopsworks_udf = transformation.__hopsworks_udf(*features) + return transformation @classmethod @@ -227,9 +230,17 @@ def to_dict(self) -> Dict[str, Any]: "id": self._id, "version": self._version, "featurestoreId": self._featurestore_id, - "hopsworksUdf": self._hopsworks_udf.to_dict(), + "hopsworksUdf": self.hopsworks_udf.to_dict(), } + def alias(self, *args: str): + """ + Set the names of the transformed features output by the transformation function. + """ + self.__hopsworks_udf.alias(*args) + + return self + def _get_output_column_names(self) -> str: """ Function that generates feature names for the transformed features @@ -240,33 +251,45 @@ def _get_output_column_names(self) -> str: # If function name matches the name of an input feature and the transformation function only returns one output feature then # then the transformed output feature would have the same name as the input feature. i.e the input feature will get overwritten. 
if ( - len(self._hopsworks_udf.return_types) == 1 + len(self.__hopsworks_udf.return_types) == 1 and any( [ - self.hopsworks_udf.function_name + self.__hopsworks_udf.function_name == transformation_feature.feature_name - for transformation_feature in self.hopsworks_udf._transformation_features + for transformation_feature in self.__hopsworks_udf._transformation_features ] ) and ( - not self.hopsworks_udf.dropped_features - or self.hopsworks_udf.function_name - not in self.hopsworks_udf.dropped_features + not self.__hopsworks_udf.dropped_features + or self.__hopsworks_udf.function_name + not in self.__hopsworks_udf.dropped_features ) ): - return [self.hopsworks_udf.function_name] + output_col_names = [self.__hopsworks_udf.function_name] - if self.transformation_type == TransformationType.MODEL_DEPENDENT: - _BASE_COLUMN_NAME = f'{self._hopsworks_udf.function_name}_{"_".join(self._hopsworks_udf.transformation_features)}_' - if len(self._hopsworks_udf.return_types) > 1: - return [ + elif self.transformation_type == TransformationType.MODEL_DEPENDENT: + _BASE_COLUMN_NAME = f'{self.__hopsworks_udf.function_name}_{"_".join(self.__hopsworks_udf.transformation_features)}_' + if len(self.__hopsworks_udf.return_types) > 1: + output_col_names = [ f"{_BASE_COLUMN_NAME}{i}" - for i in range(len(self._hopsworks_udf.return_types)) + for i in range(len(self.__hopsworks_udf.return_types)) ] else: - return [f"{_BASE_COLUMN_NAME}"] + output_col_names = [f"{_BASE_COLUMN_NAME}"] elif self.transformation_type == TransformationType.ON_DEMAND: - return [self._hopsworks_udf.function_name] + output_col_names = [self.__hopsworks_udf.function_name] + else: + output_col_names = [] + + if any( + len(output_col_name) > FEATURES.MAX_LENGTH_NAME + for output_col_name in output_col_names + ): + raise FeatureStoreException( + f"The default names for output features generated by the transformation function `{repr(self.__hopsworks_udf)}` exceeds the maximum length of {FEATURES.MAX_LENGTH_NAME} characters. Please use the `alias` function to assign shorter names to the output features." 
+ ) + + return output_col_names @staticmethod def _validate_transformation_type( @@ -311,7 +334,10 @@ def version(self, version: int) -> None: @property def hopsworks_udf(self) -> HopsworksUdf: """Meta data class for the user defined transformation function.""" - return self._hopsworks_udf + # Make sure that the output column names are generated for a model-dependent or on-demand transformation function when the UDF is accessed from outside the class. + if self.transformation_type and not self.__hopsworks_udf.output_column_names: + self.__hopsworks_udf.output_column_names = self._get_output_column_names() + return self.__hopsworks_udf @property def transformation_type(self) -> TransformationType: @@ -321,41 +347,39 @@ def transformation_type(self) -> TransformationType: @transformation_type.setter def transformation_type(self, transformation_type) -> None: self._transformation_type = transformation_type - # Generate output column names when setting transformation type - self._hopsworks_udf.output_column_names = self._get_output_column_names() @property def transformation_statistics( self, ) -> Optional[TransformationStatistics]: """Feature statistics required for the defined UDF""" - return self.hopsworks_udf.transformation_statistics + return self.__hopsworks_udf.transformation_statistics @transformation_statistics.setter def transformation_statistics( self, statistics: List[FeatureDescriptiveStatistics] ) -> None: - self.hopsworks_udf.transformation_statistics = statistics + self.__hopsworks_udf.transformation_statistics = statistics # Generate output column names for one-hot encoder after transformation statistics is set. # This is done because the number of output columns for one-hot encoding dependents on number of unique values in training dataset statistics. 
- if self.hopsworks_udf.function_name == "one_hot_encoder": - self._hopsworks_udf.output_column_names = self._get_output_column_names() + if self.__hopsworks_udf.function_name == "one_hot_encoder": + self.__hopsworks_udf.output_column_names = self._get_output_column_names() @property def output_column_names(self) -> List[str]: """Names of the output columns generated by the transformation functions""" - if self._hopsworks_udf.function_name == "one_hot_encoder" and len( - self._hopsworks_udf.output_column_names - ) != len(self._hopsworks_udf.return_types): - self._hopsworks_udf.output_column_names = self._get_output_column_names() - return self._hopsworks_udf.output_column_names + if ( + self.__hopsworks_udf.function_name == "one_hot_encoder" + and len(self.__hopsworks_udf.output_column_names) + != len(self.__hopsworks_udf.return_types) + ) or not self.__hopsworks_udf.output_column_names: + self.__hopsworks_udf.output_column_names = self._get_output_column_names() + return self.__hopsworks_udf.output_column_names def __repr__(self): if self.transformation_type == TransformationType.MODEL_DEPENDENT: - return ( - f"Model-Dependent Transformation Function : {repr(self.hopsworks_udf)}" - ) + return f"Model-Dependent Transformation Function : {repr(self.__hopsworks_udf)}" elif self.transformation_type == TransformationType.ON_DEMAND: - return f"On-Demand Transformation Function : {repr(self.hopsworks_udf)}" + return f"On-Demand Transformation Function : {repr(self.__hopsworks_udf)}" else: - return f"Transformation Function : {repr(self.hopsworks_udf)}" + return f"Transformation Function : {repr(self.__hopsworks_udf)}" diff --git a/python/tests/test_transformation_function.py b/python/tests/test_transformation_function.py index 305fe04da..fb0886d61 100644 --- a/python/tests/test_transformation_function.py +++ b/python/tests/test_transformation_function.py @@ -237,7 +237,12 @@ def test2(col1): transformation_type=TransformationType.MODEL_DEPENDENT, ) - assert 
tf.hopsworks_udf == test2 + # Creating dict representation of udf. + udf_json = test2.to_dict() + # Adding output column names to dict for testing since they would be generated when the UDF is accessed from the transformation function. + udf_json["outputColumnNames"] = ["test2_col1_"] + + assert tf.hopsworks_udf.to_dict() == udf_json def test_generate_output_column_names_one_argument_one_output_type(self): @udf(int)