diff --git a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py index 5121aea3..b011b0c1 100644 --- a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py @@ -108,12 +108,16 @@ def tensorflow_pipeline( ).set_display_name("Ingest data") # lookup champion model - champion_model = lookup_model( - model_name=model_name, - project_location=project_location, - project_id=project_id, - fail_on_model_not_found=True, - ).set_display_name("Look up champion model") + champion_model = ( + lookup_model( + model_name=model_name, + project_location=project_location, + project_id=project_id, + fail_on_model_not_found=True, + ) + .set_display_name("Look up champion model") + .set_caching_options(False) + ) # batch predict from BigQuery to BigQuery bigquery_source_input_uri = f"bq://{project_id}.{dataset_id}.{ingested_table}" diff --git a/pipelines/src/pipelines/tensorflow/training/assets/train_tf_model.py b/pipelines/src/pipelines/tensorflow/training/assets/train_tf_model.py index c89a7880..5d164ae7 100644 --- a/pipelines/src/pipelines/tensorflow/training/assets/train_tf_model.py +++ b/pipelines/src/pipelines/tensorflow/training/assets/train_tf_model.py @@ -219,6 +219,9 @@ def _get_temp_dir(dirpath, task_id): parser.add_argument("--hparams", default={}, type=json.loads) args = parser.parse_args() +if args.model.startswith("gs://"): + args.model = Path("/gcs/" + args.model[5:]) + # merge dictionaries by overwriting default_model_params if provided in model_params hparams = {**DEFAULT_HPARAMS, **args.hparams} logging.info(f"Using model hyper-parameters: {hparams}") @@ -261,9 +264,9 @@ def _get_temp_dir(dirpath, task_id): logging.info("not chief node, exiting now") sys.exit() -os.makedirs(args.model, exist_ok=True) logging.info(f"Save model to: {args.model}") -tf_model.save(args.model, save_format="tf") +args.model.mkdir(parents=True) +tf_model.save(str(args.model), save_format="tf") logging.info(f"Save metrics to: {args.metrics}") eval_metrics = dict(zip(tf_model.metrics_names, tf_model.evaluate(test_ds))) @@ -281,11 +284,13 @@ def _get_temp_dir(dirpath, task_id): json.dump(metrics, fp) # Persist URIs of training file(s) for model monitoring in batch predictions -path = Path(args.model) / TRAINING_DATASET_INFO +# See https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1beta1.types.ModelMonitoringObjectiveConfig.TrainingDataset # noqa: E501 +# for the expected schema. +path = args.model / TRAINING_DATASET_INFO training_dataset_for_monitoring = { "gcsSource": {"uris": [args.train_data]}, "dataFormat": "csv", - "targetField": hparams["label"], + "targetField": label, } logging.info(f"Save training dataset info for model monitoring: {path}") logging.info(f"Training dataset: {training_dataset_for_monitoring}") diff --git a/pipelines/src/pipelines/tensorflow/training/pipeline.py b/pipelines/src/pipelines/tensorflow/training/pipeline.py index cb99e091..2767d210 100644 --- a/pipelines/src/pipelines/tensorflow/training/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/training/pipeline.py @@ -215,6 +215,7 @@ def tensorflow_pipeline( fail_on_model_not_found=False, ) .set_display_name("Lookup past model") + .set_caching_options(False) .outputs["model_resource_name"] ) diff --git a/pipelines/src/pipelines/xgboost/prediction/pipeline.py b/pipelines/src/pipelines/xgboost/prediction/pipeline.py index 0bd1ec91..ed474eb8 100644 --- a/pipelines/src/pipelines/xgboost/prediction/pipeline.py +++ b/pipelines/src/pipelines/xgboost/prediction/pipeline.py @@ -102,12 +102,16 @@ def xgboost_pipeline( ).set_display_name("Ingest data") # lookup champion model - champion_model = lookup_model( - model_name=model_name, - project_location=project_location, - project_id=project_id, - fail_on_model_not_found=True, - ).set_display_name("Look up champion model") + champion_model = ( + lookup_model( + model_name=model_name, + project_location=project_location, + project_id=project_id, + fail_on_model_not_found=True, + ) + .set_display_name("Look up champion model") + .set_caching_options(False) + ) # batch predict from BigQuery to BigQuery bigquery_source_input_uri = f"bq://{project_id}.{dataset_id}.{ingested_table}" diff --git a/pipelines/src/pipelines/xgboost/training/assets/train_xgb_model.py b/pipelines/src/pipelines/xgboost/training/assets/train_xgb_model.py index 31d95247..71cc65b5 100644 --- a/pipelines/src/pipelines/xgboost/training/assets/train_xgb_model.py +++ b/pipelines/src/pipelines/xgboost/training/assets/train_xgb_model.py @@ -1,4 +1,6 @@ import argparse +from pathlib import Path + import joblib import json import os @@ -14,7 +16,9 @@ logging.basicConfig(level=logging.DEBUG) - +# used for monitoring during prediction time +TRAINING_DATASET_INFO = "training_dataset.json" +# numeric/categorical features in Chicago trips dataset to be preprocessed NUM_COLS = ["dayofweek", "hourofday", "trip_distance", "trip_miles", "trip_seconds"] ORD_COLS = ["company"] OHE_COLS = ["payment_type"] @@ -39,6 +43,9 @@ def indices_in_list(elements: list, base_list: list) -> list: parser.add_argument("--hparams", default={}, type=json.loads) args = parser.parse_args() +if args.model.startswith("gs://"): + args.model = Path("/gcs/" + args.model[5:]) + logging.info("Read csv files into dataframes") df_train = pd.read_csv(args.train_data) df_valid = pd.read_csv(args.valid_data) @@ -111,15 +118,25 @@ def indices_in_list(elements: list, base_list: list) -> list: "rootMeanSquaredLogError": np.sqrt(metrics.mean_squared_log_error(y_test, y_pred)), } -try: - model_path = args.model.replace("gs://", "/gcs/") - logging.info(f"Save model to: {model_path}") - os.makedirs(model_path, exist_ok=True) - joblib.dump(pipeline, model_path + "model.joblib") -except Exception as e: - print(e) - raise e +logging.info(f"Save model to: {args.model}") +args.model.mkdir(parents=True) +joblib.dump(pipeline, str(args.model / "model.joblib")) logging.info(f"Metrics: {metrics}") with open(args.metrics, "w") as fp: json.dump(metrics, fp) + +# Persist URIs of training file(s) for model monitoring in batch predictions +# See https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1beta1.types.ModelMonitoringObjectiveConfig.TrainingDataset # noqa: E501 +# for the expected schema. +path = args.model / TRAINING_DATASET_INFO +training_dataset_for_monitoring = { + "gcsSource": {"uris": [args.train_data]}, + "dataFormat": "csv", + "targetField": label, +} +logging.info(f"Training dataset info: {training_dataset_for_monitoring}") + +with open(path, "w") as fp: + logging.info(f"Save training dataset info for model monitoring: {path}") + json.dump(training_dataset_for_monitoring, fp) diff --git a/pipelines/src/pipelines/xgboost/training/pipeline.py b/pipelines/src/pipelines/xgboost/training/pipeline.py index 1db2023c..27cb47ed 100644 --- a/pipelines/src/pipelines/xgboost/training/pipeline.py +++ b/pipelines/src/pipelines/xgboost/training/pipeline.py @@ -212,6 +212,7 @@ def xgboost_pipeline( fail_on_model_not_found=False, ) .set_display_name("Lookup past model") + .set_caching_options(False) .outputs["model_resource_name"] )