Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1630] Model Dependent Transformation Functions creates feature names that are longer than 64 character causing logging feature group ingestion to fail #428

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/templates/api/hopsworks_udf.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

{{hopsworks_udf_properties}}

## Methods

{{hopsworks_udf_methods}}

## TransformationFeature

{{transformation_feature}}
{{transformation_feature}}
9 changes: 9 additions & 0 deletions python/auto_doc.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,15 @@
"hopsworks_udf_properties": keras_autodoc.get_properties(
"hsfs.hopsworks_udf.HopsworksUdf"
),
"hopsworks_udf_methods": keras_autodoc.get_methods(
"hsfs.hopsworks_udf.HopsworksUdf",
exclude=[
"update_return_type_one_hot",
"python_udf_wrapper",
"pandas_udf_wrapper",
"get_udf",
],
),
"transformation_feature": ["hsfs.hopsworks_udf.TransformationFeature"],
},
"api/transformation_statistics.md": {
Expand Down
8 changes: 8 additions & 0 deletions python/hopsworks_common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ class OPENSEARCH_CONFIG:
CA_CERTS = "ca_certs"


class FEATURES:
"""
Class that stores constants about a feature.
"""

MAX_LENGTH_NAME = 63


class KAFKA_SSL_CONFIG:
"""
Kafka SSL constant strings for configuration
Expand Down
8 changes: 5 additions & 3 deletions python/hsfs/core/feature_monitoring_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import humps
from hopsworks_common.client.exceptions import FeatureStoreException
from hopsworks_common.constants import FEATURES
from hsfs import util
from hsfs.core import (
feature_monitoring_config_engine,
Expand All @@ -34,7 +35,6 @@
from hsfs.core.job_schedule import JobSchedule


MAX_LENGTH_NAME = 63
MAX_LENGTH_DESCRIPTION = 2000


Expand Down Expand Up @@ -686,8 +686,10 @@ def name(self, name: str):
raise AttributeError("The name of a registered config is read-only.")
elif not isinstance(name, str):
raise TypeError("name must be of type str")
if len(name) > MAX_LENGTH_NAME:
raise ValueError("name must be less than {MAX_LENGTH_NAME} characters.")
if len(name) > FEATURES.MAX_LENGTH_NAME:
raise ValueError(
"name must be less than {FEATURES.MAX_LENGTH_NAME} characters."
)
self._name = name

@property
Expand Down
81 changes: 72 additions & 9 deletions python/hsfs/hopsworks_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import humps
from hopsworks_common.client.exceptions import FeatureStoreException
from hopsworks_common.constants import FEATURES
from hsfs import engine, util
from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics
from hsfs.decorators import typechecked
Expand Down Expand Up @@ -146,7 +147,9 @@ class HopsworksUdf:
transformation_function_argument_names : `Optional[List[TransformationFeature]]`. The argument names of the transformation function.
dropped_argument_names : `Optional[List[str]]`. The arguments to be dropped from the finial DataFrame after the transformation functions are applied.
dropped_feature_names : `Optional[List[str]]`. The feature name corresponding to the arguments names that are dropped
feature_name_prefix: `Optional[str]` = None. Prefixes if any used in the feature view.
feature_name_prefix: `Optional[str]`. Prefixes if any used in the feature view.
output_column_names: `Optional[List[str]]`. The names of the output columns returned from the transformation function.
generate_output_col_names: `Optional[bool]`. Generate default output column names for the transformation function. Default's to True.
"""

# Mapping for converting python types to spark types - required for creating pandas UDF's.
Expand All @@ -173,6 +176,8 @@ def __init__(
dropped_argument_names: Optional[List[str]] = None,
dropped_feature_names: Optional[List[str]] = None,
feature_name_prefix: Optional[str] = None,
output_column_names: Optional[str] = None,
generate_output_col_names: bool = True,
):
self._return_types: List[str] = HopsworksUdf._validate_and_convert_output_types(
return_types
Expand All @@ -191,6 +196,12 @@ def __init__(
if isinstance(func, Callable)
else func
)

# The parameter `output_column_names` is initialized lazily.
# It is only initialized if the output column names are retrieved from the backend or explicitly specified using the `alias` function or is initialized with default column names if the UDF is accessed from a transformation function.
# Output column names are only stored in the backend when a model dependent or on demand transformation function is created using the defined UDF.
self._output_column_names: List[str] = []

if not transformation_features:
# New transformation function being declared so extract source code from function
self._transformation_features: List[TransformationFeature] = (
Expand All @@ -211,6 +222,7 @@ def __init__(
)
)
self._dropped_features = self._dropped_argument_names

else:
self._transformation_features = transformation_features
self._transformation_function_argument_names = (
Expand All @@ -222,14 +234,19 @@ def __init__(
if dropped_feature_names
else dropped_argument_names
)
self._output_column_names = (
output_column_names if output_column_names else []
)

self._formatted_function_source, self._module_imports = (
HopsworksUdf._format_source_code(self._function_source)
)

self._statistics: Optional[TransformationStatistics] = None

self._output_column_names: List[str] = []
# Denote if the output feature names have to be generated.
# Set to `False` if the output column names are saved in the backend and the udf is constructed from it using `from_response_json` function or if user has specified the output feature names using the `alias`` function.
self._generate_output_col_name: bool = generate_output_col_names

@staticmethod
def _validate_and_convert_drop_features(
Expand Down Expand Up @@ -691,6 +708,47 @@ def __call__(self, *features: List[str]) -> "HopsworksUdf":
udf.dropped_features = updated_dropped_features
return udf

def alias(self, *args: str):
"""
Set the names of the transformed features output by the UDF.
"""
if len(args) == 1 and isinstance(args[0], list):
# If a single list is passed, use it directly
output_col_names = args[0]
else:
# Otherwise, use the individual arguments as a list
output_col_names = list(args)
if any(
not isinstance(output_col_name, str) for output_col_name in output_col_names
):
raise FeatureStoreException(
f"Invalid output feature names provided for the transformation function '{repr(self)}'. Please ensure all arguments are strings."
)

self._generate_output_col_name = False
self.output_column_names = output_col_names

return self

def _validate_output_col_name(self, output_col_names):
if any(
len(output_col_name) > FEATURES.MAX_LENGTH_NAME
for output_col_name in output_col_names
):
raise FeatureStoreException(
f"Invalid output feature names specified for the transformation function '{repr(self)}'. Please provide names shorter than {FEATURES.MAX_LENGTH_NAME} characters."
)

if len(output_col_names) != len(set(output_col_names)):
raise FeatureStoreException(
f"Duplicate output feature names provided for the transformation function '{repr(self)}'. Please ensure all arguments names are unique."
)

if output_col_names and len(output_col_names) != len(self.return_types):
raise FeatureStoreException(
f"The number of output feature names provided does not match the number of features returned by the transformation function '{repr(self)}'. Pease provide exactly {len(self.return_types)} feature name(s) to match the output."
)

def update_return_type_one_hot(self):
self._return_types = [
self._return_types[0]
Expand Down Expand Up @@ -765,6 +823,7 @@ def to_dict(self) -> Dict[str, Any]:
"name": self.function_name,
"featureNamePrefix": self._feature_name_prefix,
"executionMode": self.execution_mode.value.upper(),
"outputColumnNames": self.output_column_names,
}

def json(self) -> str:
Expand Down Expand Up @@ -826,6 +885,12 @@ def from_response_json(
else None
)

output_column_names = (
[feature.strip() for feature in json_decamelized["output_column_names"]]
if json_decamelized.get("output_column_names", None)
else None
)

# Reconstructing statistics arguments.
arg_list, _, _, _ = HopsworksUdf._parse_function_signature(function_source_code)

Expand Down Expand Up @@ -858,7 +923,7 @@ def from_response_json(
for arg_index in range(len(arg_list))
]

hopsworks_udf = cls(
hopsworks_udf: HopsworksUdf = cls(
func=function_source_code,
return_types=output_types,
name=function_name,
Expand All @@ -870,6 +935,8 @@ def from_response_json(
execution_mode=UDFExecutionMode.from_string(
json_decamelized["execution_mode"]
),
output_column_names=output_column_names,
generate_output_col_names=not output_column_names, # Do not generate output column names if they are retrieved from the back
)

# Set transformation features if already set.
Expand Down Expand Up @@ -998,12 +1065,8 @@ def transformation_statistics(
def output_column_names(self, output_col_names: Union[str, List[str]]) -> None:
if not isinstance(output_col_names, List):
output_col_names = [output_col_names]
if not output_col_names and len(output_col_names) != len(self.return_types):
raise FeatureStoreException(
f"Provided names for output columns does not match the number of columns returned from the UDF. Please provide {len(self.return_types)} names."
)
else:
self._output_column_names = output_col_names
self._validate_output_col_name(output_col_names)
self._output_column_names = output_col_names

def __repr__(self):
return f'{self.function_name}({", ".join(self.transformation_features)})'
Loading
Loading