diff --git a/src/bdc/steps/google_places.py b/src/bdc/steps/google_places.py
index 96dea27..94dde24 100644
--- a/src/bdc/steps/google_places.py
+++ b/src/bdc/steps/google_places.py
@@ -228,7 +228,7 @@ def get_first_place_candidate(self, query, input_type) -> (dict, int):
             return None, 0
 
         if not response["status"] == HTTPStatus.OK.name:
-            log.warning(
+            log.debug(
                 f"Failed to fetch data. Status code: {response['status']}",
             )
             return None, 0
diff --git a/src/bdc/steps/helpers/generate_hash_leads.py b/src/bdc/steps/helpers/generate_hash_leads.py
index ea66c0f..4ab3ef0 100644
--- a/src/bdc/steps/helpers/generate_hash_leads.py
+++ b/src/bdc/steps/helpers/generate_hash_leads.py
@@ -7,7 +7,6 @@
 
 import pandas as pd
 
-from bdc.steps.step import Step
 from database import get_database
 from logger import get_logger
 
@@ -56,12 +55,12 @@ def hash_check(
 
         if lead_hash in lookup_table:
             # If the hash exists in the lookup table, return the corresponding data
-            log.info(f"Hash {lead_hash} already exists in the lookup table.")
+            log.debug(f"Hash {lead_hash} already exists in the lookup table.")
             try:
                 previous_data = lead_data[fields_tofill]
                 return previous_data
             except KeyError as e:
-                log.info(
+                log.debug(
                     f"Hash is present but data fields {fields_tofill} were not found."
                 )
                 lookup_table[lead_hash] = lookup_table[lead_hash][:-1] + [
diff --git a/src/database/leads/local_repository.py b/src/database/leads/local_repository.py
index af36e8d..3bade62 100644
--- a/src/database/leads/local_repository.py
+++ b/src/database/leads/local_repository.py
@@ -24,9 +24,15 @@ class LocalRepository(Repository):
     DF_OUTPUT = os.path.abspath(
         os.path.join(BASE_PATH, "../../data/leads_enriched.csv")
     )
+    DF_HISTORICAL_OUTPUT = os.path.abspath(
+        os.path.join(BASE_PATH, "../../data/100k_historic_enriched.csv")
+    )
     DF_PREPROCESSED_INPUT = os.path.abspath(
         os.path.join(BASE_PATH, "../../data/preprocessed_data_files/")
     )
+    DF_PREDICTION_OUTPUT = os.path.abspath(
+        os.path.join(BASE_PATH, "../../data/leads_predicted_size.csv")
+    )
     REVIEWS = os.path.abspath(os.path.join(BASE_PATH, "../../data/reviews/"))
     SNAPSHOTS = os.path.abspath(os.path.join(BASE_PATH, "../../data/snapshots/"))
     GPT_RESULTS = os.path.abspath(os.path.join(BASE_PATH, "../../data/gpt-results/"))
@@ -51,6 +57,13 @@ def save_dataframe(self):
         self.df.to_csv(self.DF_OUTPUT, index=False)
         log.info(f"Saved enriched data locally to {self.DF_OUTPUT}")
 
+    def save_prediction(self, df):
+        """
+        Save dataframe in df parameter in chosen output location
+        """
+        df.to_csv(self.DF_PREDICTION_OUTPUT, index=False)
+        log.info(f"Saved prediction result locally to {self.DF_PREDICTION_OUTPUT}")
+
     def insert_data(self, data):
         """
         TODO: Insert new data into specified dataframe
@@ -68,7 +81,7 @@ def save_review(self, review, place_id, force_refresh=False):
         json_file_path = os.path.join(self.REVIEWS, file_name)
 
         if os.path.exists(json_file_path):
-            log.info(f"Reviews for {place_id} already exist")
+            log.debug(f"Reviews for {place_id} already exist")
             return
 
         with open(json_file_path, "w", encoding="utf-8") as json_file:
@@ -253,10 +266,17 @@ def save_classification_report(self, report, model_name: str):
         except Exception as e:
             log.error(f"Could not save report at {report_file_path}! Error: {str(e)}")
 
-    def load_preprocessed_data(
-        self, file_name: str = "historical_preprocessed_data.csv"
-    ):
+    def get_preprocessed_data_path(self, historical: bool = True):
+        file_name = (
+            "historical_preprocessed_data.csv"
+            if historical
+            else "preprocessed_data.csv"
+        )
+        file_path = os.path.join(self.DF_PREPROCESSED_INPUT, file_name)
+        return file_path
+
+    def load_preprocessed_data(self, historical: bool = True):
         try:
-            return pd.read_csv(os.path.join(self.DF_PREPROCESSED_INPUT, file_name))
+            return pd.read_csv(self.get_preprocessed_data_path(historical))
         except FileNotFoundError:
             log.error("Error: Could not find input file for preprocessed data.")
Error: {str(e)}") - def load_preprocessed_data( - self, file_name: str = "historical_preprocessed_data.csv" - ): + def get_preprocessed_data_path(self, historical: bool = True): + file_name = ( + "historical_preprocessed_data.csv" + if historical + else "preprocessed_data.csv" + ) + file_path = os.path.join(self.DF_PREPROCESSED_INPUT, file_name) + return file_path + + def load_preprocessed_data(self, historical: bool = True): try: - return pd.read_csv(os.path.join(self.DF_PREPROCESSED_INPUT, file_name)) + return pd.read_csv(self.get_preprocessed_data_path(historical)) except FileNotFoundError: log.error("Error: Could not find input file for preprocessed data.") diff --git a/src/database/leads/repository.py b/src/database/leads/repository.py index a44cc5b..90c9a78 100644 --- a/src/database/leads/repository.py +++ b/src/database/leads/repository.py @@ -18,12 +18,21 @@ def DF_INPUT(self): pass @property + @abstractmethod def DF_OUTPUT(self): """ Define database path to store dataframe """ pass + @property + @abstractmethod + def DF_HISTORICAL_OUTPUT(self): + """ + Define database path to store historical enriched dataframe (used for preprocessing input) + """ + pass + @property @abstractmethod def REVIEWS(self): @@ -65,7 +74,9 @@ def set_dataframe(self, df): def get_input_path(self): return self.DF_INPUT - def get_output_path(self): + def get_enriched_data_path(self, historical=False): + if historical: + return self.DF_HISTORICAL_OUTPUT return self.DF_OUTPUT @abstractmethod @@ -82,6 +93,13 @@ def save_dataframe(self): """ pass + @abstractmethod + def save_prediction(self, df): + """ + Save dataframe in df parameter in chosen output location + """ + pass + @abstractmethod def insert_data(self, data): """ @@ -221,7 +239,14 @@ def save_classification_report(self, report, model_name: str): pass @abstractmethod - def load_preprocessed_data(self, file_name: str): + def get_preprocessed_data_path(self, historical: bool = True): + """ + Returns the path for a preprocessed data file (either historical or current) + """ + pass + + @abstractmethod + def load_preprocessed_data(self, historical: bool = True): """ Load the preprocessed data from the given file """ diff --git a/src/database/leads/s3_repository.py b/src/database/leads/s3_repository.py index dbdf620..912f5b3 100644 --- a/src/database/leads/s3_repository.py +++ b/src/database/leads/s3_repository.py @@ -43,6 +43,10 @@ class S3Repository(Repository): MODELS_BUCKET = "amos--models" DF_INPUT = f"s3://{EVENTS_BUCKET}/leads/enriched.csv" DF_OUTPUT = f"s3://{EVENTS_BUCKET}/leads/enriched.csv" + DF_HISTORICAL_OUTPUT = ( + f"s3://{EVENTS_BUCKET}/historical_data/100k_historic_enriched.csv" + ) + DF_PREDICTION_OUTPUT = f"s3://{EVENTS_BUCKET}/leads/leads_predicted_size.csv" DF_PREPROCESSED_INPUT = f"s3://{FEATURES_BUCKET}/preprocessed_data_files/" REVIEWS = f"s3://{EVENTS_BUCKET}/reviews/" SNAPSHOTS = f"s3://{EVENTS_BUCKET}/snapshots/" @@ -131,6 +135,16 @@ def save_dataframe(self): self._save_to_s3(csv_buffer.getvalue(), bucket, obj_key) log.info(f"Successfully saved enriched leads to s3://{bucket}/{obj_key}") + def save_prediction(self, df): + """ + Save dataframe in df parameter in chosen output location + """ + bucket, obj_key = decode_s3_url(self.DF_PREDICTION_OUTPUT) + csv_buffer = StringIO() + df.to_csv(csv_buffer, index=False) + self._save_to_s3(csv_buffer.getvalue(), bucket, obj_key) + log.info(f"Successfully saved prediction result to s3://{bucket}/{obj_key}") + def _save_to_s3(self, data, bucket, key): s3.put_object( Bucket=bucket, @@ -374,15 
diff --git a/src/database/leads/s3_repository.py b/src/database/leads/s3_repository.py
index dbdf620..912f5b3 100644
--- a/src/database/leads/s3_repository.py
+++ b/src/database/leads/s3_repository.py
@@ -43,6 +43,10 @@ class S3Repository(Repository):
     MODELS_BUCKET = "amos--models"
     DF_INPUT = f"s3://{EVENTS_BUCKET}/leads/enriched.csv"
     DF_OUTPUT = f"s3://{EVENTS_BUCKET}/leads/enriched.csv"
+    DF_HISTORICAL_OUTPUT = (
+        f"s3://{EVENTS_BUCKET}/historical_data/100k_historic_enriched.csv"
+    )
+    DF_PREDICTION_OUTPUT = f"s3://{EVENTS_BUCKET}/leads/leads_predicted_size.csv"
     DF_PREPROCESSED_INPUT = f"s3://{FEATURES_BUCKET}/preprocessed_data_files/"
     REVIEWS = f"s3://{EVENTS_BUCKET}/reviews/"
     SNAPSHOTS = f"s3://{EVENTS_BUCKET}/snapshots/"
@@ -131,6 +135,16 @@ def save_dataframe(self):
         self._save_to_s3(csv_buffer.getvalue(), bucket, obj_key)
         log.info(f"Successfully saved enriched leads to s3://{bucket}/{obj_key}")
 
+    def save_prediction(self, df):
+        """
+        Save dataframe in df parameter in chosen output location
+        """
+        bucket, obj_key = decode_s3_url(self.DF_PREDICTION_OUTPUT)
+        csv_buffer = StringIO()
+        df.to_csv(csv_buffer, index=False)
+        self._save_to_s3(csv_buffer.getvalue(), bucket, obj_key)
+        log.info(f"Successfully saved prediction result to s3://{bucket}/{obj_key}")
+
     def _save_to_s3(self, data, bucket, key):
         s3.put_object(
             Bucket=bucket,
@@ -374,15 +388,17 @@ def save_classification_report(self, report, model_name: str):
         except Exception as e:
             log.error(f"Could not save report for '{model_name}' to S3: {str(e)}")
 
-    def load_preprocessed_data(
-        self, file_name: str = "historical_preprocessed_data.csv"
-    ):
+    def get_preprocessed_data_path(self, historical: bool = True):
+        file_name = (
+            "historical_preprocessed_data.csv"
+            if historical
+            else "preprocessed_data.csv"
+        )
         file_path = self.DF_PREPROCESSED_INPUT + file_name
-        if not file_path.startswith("s3://"):
-            log.error(
-                "S3 location has to be defined like this: s3://<BUCKET>/<OBJECT_KEY>"
-            )
-            return
+        return file_path
+
+    def load_preprocessed_data(self, historical: bool = True):
+        file_path = self.get_preprocessed_data_path(historical)
 
         source = None
         remote_dataset = None
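Note (illustration, not part of the patch): save_prediction above relies on decode_s3_url to split an S3 URL into a bucket and an object key. The helper itself is not shown in this diff; the sketch below only documents the behaviour the new code assumes, under a hypothetical name.

    from urllib.parse import urlparse

    def decode_s3_url_sketch(url: str) -> tuple[str, str]:
        # Assumed behaviour of decode_s3_url: "s3://<bucket>/<key>" -> (bucket, key)
        parsed = urlparse(url)  # scheme="s3", netloc=bucket, path="/key"
        return parsed.netloc, parsed.path.lstrip("/")

    bucket, obj_key = decode_s3_url_sketch("s3://amos--data--events/leads/leads_predicted_size.csv")
    # bucket == "amos--data--events", obj_key == "leads/leads_predicted_size.csv"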
diff --git a/src/demo/demos.py b/src/demo/demos.py
index 2a57764..4d90acc 100644
--- a/src/demo/demos.py
+++ b/src/demo/demos.py
@@ -8,6 +8,7 @@
 
 import re
+import warnings
 
 import pandas as pd
 import xgboost as xgb
 
@@ -33,6 +34,10 @@
 from logger import get_logger
 from preprocessing import Preprocessing
 
+warnings.simplefilter(action="ignore", category=pd.errors.PerformanceWarning)
+warnings.simplefilter(action="ignore", category=FutureWarning)
+
+
 log = get_logger()
 
 # Constants and configurations
@@ -202,7 +207,7 @@ def pipeline_demo():
     steps_info = "\n".join([str(step) for step in steps])
 
     log.info(
-        f"Running Pipeline with steps:\n{steps_info}\ninput_location={get_database().get_input_path()}\noutput_location={get_database().get_output_path()}"
+        f"Running Pipeline with steps:\n{steps_info}\ninput_location={get_database().get_input_path()}\noutput_location={get_database().get_enriched_data_path()}"
     )
 
     pipeline = Pipeline(
@@ -224,10 +229,9 @@ def preprocessing_demo():
         historical_bool = True
     else:
         historical_bool = False
-    S3_bool = DATABASE_TYPE == "S3"
 
     preprocessor = Preprocessing(
-        filter_null_data=filter_bool, historical_bool=historical_bool, S3_bool=S3_bool
+        filter_null_data=filter_bool, historical_bool=historical_bool
     )
     preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)
 
@@ -239,10 +243,7 @@
 def predict_MerchantSize_on_lead_data_demo():
     import os
     import sys
-    from io import BytesIO
 
-    import boto3
-    import joblib
     import pandas as pd
 
     log.info(
@@ -254,12 +255,13 @@ def predict_MerchantSize_on_lead_data_demo():
     current_dir = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
     parent_dir = os.path.join(current_dir, "..")
     sys.path.append(parent_dir)
+    from database import get_database
     from preprocessing import Preprocessing
 
+    db = get_database()
+
     log.info(f"Preprocessing the leads...")
-    preprocessor = Preprocessing(
-        filter_null_data=False, historical_bool=False, S3_bool=S3_bool
-    )
+    preprocessor = Preprocessing(filter_null_data=False, historical_bool=False)
     preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)
     df = preprocessor.implement_preprocessing_pipeline()
     preprocessor.save_preprocessed_data()
@@ -267,67 +269,34 @@ def predict_MerchantSize_on_lead_data_demo():
     ############################## adapting the preprocessing files ###########################
     log.info(f"Adapting the leads' preprocessed data for the ML model...")
     # load the data from the CSV files
-    historical_preprocessed_data = pd.read_csv(
-        "s3://amos--data--features/preprocessed_data_files/preprocessed_data.csv"
-    )
-    if S3_bool:
-        toBePredicted_preprocessed_data = pd.read_csv(
-            "s3://amos--data--events/leads/preprocessed_leads_data.csv"
-        )
-    else:
-        path_components = preprocessor.data_path.split(
-            "\\" if "\\" in preprocessor.data_path else "/"
-        )
-        path_components.pop()
-        path_components.append("preprocessed_data_files/leads_preprocessed_data.csv")
-        leads_preprocessed_data_path = "/".join(path_components)
-        toBePredicted_preprocessed_data = pd.read_csv(leads_preprocessed_data_path)
+    historical_preprocessed_data = db.load_preprocessed_data(historical=True)
+    unlabeled_preprocessed_data = db.load_preprocessed_data(historical=False)
 
     historical_columns_order = historical_preprocessed_data.columns
 
     missing_columns = set(historical_columns_order) - set(
-        toBePredicted_preprocessed_data.columns
+        unlabeled_preprocessed_data.columns
     )
-    for column in missing_columns:
-        toBePredicted_preprocessed_data[column] = 0
+    unlabeled_preprocessed_data[list(missing_columns)] = 0
 
-    for column in toBePredicted_preprocessed_data.columns:
+    for column in unlabeled_preprocessed_data.columns:
         if column not in historical_columns_order:
-            toBePredicted_preprocessed_data = toBePredicted_preprocessed_data.drop(
+            unlabeled_preprocessed_data = unlabeled_preprocessed_data.drop(
                 column, axis=1
             )
 
     # reorder columns
-    toBePredicted_preprocessed_data = toBePredicted_preprocessed_data[
-        historical_columns_order
-    ]
-    if S3_bool:
-        toBePredicted_output_path_s3 = (
-            "s3://amos--data--events/leads/toBePredicted_preprocessed_data_updated.csv"
-        )
-        toBePredicted_preprocessed_data.to_csv(
-            toBePredicted_output_path_s3,
-            index=False,
-        )
-        log.info(
-            f"Saving the adapted preprocessed data at {toBePredicted_output_path_s3}"
-        )
-    else:
-        path_components = preprocessor.data_path.split(
-            "\\" if "\\" in preprocessor.data_path else "/"
-        )
-        path_components.pop()
-        path_components.append("toBePredicted_preprocessed_data_updated.csv")
-        local_preprocessed_data_path = "/".join(path_components)
-        toBePredicted_preprocessed_data.to_csv(
-            local_preprocessed_data_path, index=False
-        )
-        log.info(
-            f"Saving the adapted preprocessed data at {local_preprocessed_data_path}"
-        )
+    unlabeled_preprocessed_data = unlabeled_preprocessed_data[historical_columns_order]
+    unlabeled_preprocessed_data.to_csv(
+        preprocessor.preprocessed_data_output_path,
+        index=False,
+    )
+    log.info(
+        f"Saving the adapted preprocessed data at {preprocessor.preprocessed_data_output_path}"
+    )
 
     # check if columns in both dataframe are in same order and same number
-    assert list(toBePredicted_preprocessed_data.columns) == list(
+    assert list(unlabeled_preprocessed_data.columns) == list(
         historical_preprocessed_data.columns
     ), "Column names are different"
 
@@ -343,57 +312,30 @@ def predict_MerchantSize_on_lead_data_demo():
     model_name = get_string_input(
         "Provide model file name in data/models local directory\nInput example: lightgbm_epochs(1)_f1(0.6375)_numclasses(5)_model.pkl\n"
     )
-    # file_key = "models/lightgbm_epochs(1)_f1(0.6375)_numclasses(5)_model_updated.pkl"  # adjust according to the desired model
-    model_name = model_name.replace(" ", "")
+    model_name = model_name.strip()
     xgb_bool = False
-    if model_name[:3].lower() == "xgb":
+    if model_name.lower().startswith("xgb"):
         xgb_bool = True
-    file_key = f"models/" + model_name
 
     def check_classification_task(string):
-        match = re.search(r"\d+", string)
+        match = re.search(r"numclasses\((\d+)\)", string)
         if match:
-            last_number = int(match.group())
+            last_number = int(match.group(1))
            if last_number == 3:
                 return True
             else:
                 False
 
-    classification_task_3 = check_classification_task(file_key)
+    classification_task_3 = check_classification_task(model_name)
     try:
-        if S3_bool:
-            # create an S3 client
-            s3 = boto3.client("s3")
-            # download the file from S3
-            response = s3.get_object(Bucket=bucket_name, Key=file_key)
-            model_content = response["Body"].read()
-            # load model
-            with BytesIO(model_content) as model_file:
-                model = joblib.load(model_file)
-                log.info(f"Loaded the model from S3 bucket sucessfully!")
-        else:
-            path_components = preprocessor.data_path.split(
-                "\\" if "\\" in preprocessor.data_path else "/"
-            )
-            path_components.pop()
-            path_components.append(file_key)
-            model_local_path = "/".join(path_components)
-            model = joblib.load(model_local_path)
-            log.info(f"Loaded the model from the local path sucessfully!")
+        model = db.load_ml_model(model_name)
+        log.info(f"Loaded the model {model_name}!")
     except:
         log.error("No model found with the given name!")
         return
 
-    if S3_bool:
-        data_path = (
-            "s3://amos--data--events/leads/toBePredicted_preprocessed_data_updated.csv"
-        )
-    else:
-        data_path = local_preprocessed_data_path
-
-    df = pd.read_csv(data_path)
+    df = pd.read_csv(preprocessor.preprocessed_data_output_path)
     input = df.drop("MerchantSizeByDPV", axis=1)
     if xgb_bool:
         input = xgb.DMatrix(input)
 
@@ -405,29 +347,10 @@ def check_classification_task(string):
     size_mapping = {0: "XS", 1: "S", 2: "M", 3: "L", 4: "XL"}
     remapped_predictions = [size_mapping[prediction] for prediction in predictions]
 
-    if S3_bool:
-        enriched_data = pd.read_csv("s3://amos--data--events/leads/enriched.csv")
-    else:
-        enriched_data = pd.read_csv(preprocessor.data_path)
+    enriched_data = pd.read_csv(preprocessor.data_path)
 
     # first 5 columns: Last Name,First Name,Company / Account,Phone,Email,
     raw_data = enriched_data.iloc[:, :5]
     raw_data["PredictedMerchantSize"] = remapped_predictions
 
-    if S3_bool:
-        raw_data.to_csv(
-            "s3://amos--data--events/leads/predicted_MerchantSize_of_leads.csv",
-            index=True,
-        )
-        log.info(
-            f"Saved the predicted Merchant Size of the leads at s3://amos--data--events/leads/predicted_MerchantSize_of_leads.csv"
-        )
-    else:
-        path_components = preprocessor.data_path.split(
-            "\\" if "\\" in preprocessor.data_path else "/"
-        )
-        path_components.pop()
-        path_components.append("predicted_MerchantSize_of_leads.csv")
-        output_path = "/".join(path_components)
-        raw_data.to_csv(output_path, index=True)
-        log.info(f"Saved the predicted Merchant Size of the leads at {output_path}")
+    db.save_prediction(raw_data)
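Illustration (not part of the patch): the column-adaptation logic above, condensed into one helper to make the intent explicit — the unlabeled leads must carry exactly the one-hot columns the model was trained on, in the same order. align_to_training_columns is a hypothetical name, not a function in the codebase.

    import pandas as pd

    def align_to_training_columns(df: pd.DataFrame, training_columns) -> pd.DataFrame:
        aligned = df.copy()
        missing = set(training_columns) - set(aligned.columns)
        if missing:
            aligned[list(missing)] = 0          # classes never seen in the new leads
        return aligned[list(training_columns)]  # drops unknown columns and fixes the order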
diff --git a/src/demo/pipeline_configs/demo_pipeline.json b/src/demo/pipeline_configs/demo_pipeline.json
new file mode 100644
index 0000000..85df118
--- /dev/null
+++ b/src/demo/pipeline_configs/demo_pipeline.json
@@ -0,0 +1,39 @@
+{
+    "description": "This config is optimized for demoing our software.",
+    "config": {
+        "steps": [
+            {
+                "name": "HashGenerator",
+                "force_refresh": true
+            },
+            {
+                "name": "AnalyzeEmails",
+                "force_refresh": true
+            },
+            {
+                "name": "PreprocessPhonenumbers",
+                "force_refresh": true
+            },
+            {
+                "name": "GooglePlaces",
+                "force_refresh": true
+            },
+            {
+                "name": "GooglePlacesDetailed",
+                "force_refresh": true
+            },
+            {
+                "name": "GPTReviewSentimentAnalyzer",
+                "force_refresh": true
+            },
+            {
+                "name": "SmartReviewInsightsEnhancer",
+                "force_refresh": true
+            },
+            {
+                "name": "RegionalAtlas",
+                "force_refresh": true
+            }
+        ]
+    }
+}
diff --git a/src/demo/pipeline_configs/demo_pipeline.json.license b/src/demo/pipeline_configs/demo_pipeline.json.license
new file mode 100644
index 0000000..875941a
--- /dev/null
+++ b/src/demo/pipeline_configs/demo_pipeline.json.license
@@ -0,0 +1,2 @@
+SPDX-License-Identifier: MIT
+SPDX-FileCopyrightText: 2024 Simon Zimmermann
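Illustration (not part of the patch): the shape of the new demo config — eight pipeline steps, each forced to re-run. A minimal sketch of reading it; the project's actual config loading for pipeline_demo() is not shown in this diff.

    import json

    with open("src/demo/pipeline_configs/demo_pipeline.json", encoding="utf-8") as f:
        cfg = json.load(f)

    for step_cfg in cfg["config"]["steps"]:
        # e.g. GooglePlaces True — force_refresh is true for every step in this demo config
        print(step_cfg["name"], step_cfg["force_refresh"])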
diff --git a/src/logger/logger.py b/src/logger/logger.py
index ccd9eb1..fe08296 100644
--- a/src/logger/logger.py
+++ b/src/logger/logger.py
@@ -54,7 +54,7 @@ def __init__(self, name, log_dir=None):
 
         # Create stream handler for logging to stdout (log all five levels)
         self.stdout_handler = logging.StreamHandler(sys.stdout)
-        self.stdout_handler.setLevel(logging.DEBUG)
+        self.stdout_handler.setLevel(logging.INFO)
         self.stdout_handler.setFormatter(StdOutFormatter())
         self.enable_console_output()
 
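Illustration (not part of the patch): the combined effect of this change and the log.info to log.debug demotions earlier in the diff — per-lead chatter no longer reaches the console, while summary messages still do. Whether debug records still land in the log file depends on the file handler, which this diff does not touch.

    from logger import get_logger

    log = get_logger()

    log.debug("Reviews for place_123 already exist")               # suppressed on stdout (handler level is INFO)
    log.info("Saved enriched data locally to leads_enriched.csv")   # still printed to the console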
diff --git a/src/main.py b/src/main.py
index 266f07f..33300ba 100644
--- a/src/main.py
+++ b/src/main.py
@@ -21,8 +21,8 @@
 DEMOS = {
     "Base Data Collector": pipeline_demo,
     "Data preprocessing": preprocessing_demo,
-    "Estimated Value Predictor": evp_demo,
-    "Merchant Size Prediction": predict_MerchantSize_on_lead_data_demo,
+    "ML model training": evp_demo,
+    "Merchant Size Predictor": predict_MerchantSize_on_lead_data_demo,
 }
 PROMPT = "Choose demo:\n"
 
@@ -31,10 +31,8 @@
 if __name__ == "__main__":
     options = list(DEMOS.keys()) + [EXIT]
     while True:
-        try:
-            choice = get_multiple_choice(PROMPT, options)
-            if choice == EXIT:
-                break
+        choice = get_multiple_choice(PROMPT, options)
+        if choice == EXIT:
+            break
+        if choice != None:
             DEMOS[choice]()
-        except ValueError:
-            print("Invalid choice")
diff --git a/src/preprocessing/preprocessing.py b/src/preprocessing/preprocessing.py
index a278b2a..aa91935 100644
--- a/src/preprocessing/preprocessing.py
+++ b/src/preprocessing/preprocessing.py
@@ -29,57 +29,13 @@ class Preprocessing:
-    def __init__(self, filter_null_data=True, historical_bool=True, S3_bool=False):
+    def __init__(self, filter_null_data=True, historical_bool=True):
         data_repo = get_database()
-        self.data_path = data_repo.get_output_path()
+        self.data_path = data_repo.get_enriched_data_path(historical=historical_bool)
         self.preprocessed_df = None
-        self.prerocessed_data_output_path = None
-        if historical_bool and S3_bool:
-            self.data_path = (
-                "s3://amos--data--events/historical_data/100k_historic_enriched.csv"
-            )
-            self.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/historical_preprocessed_data.csv"
-        elif historical_bool and not S3_bool:
-            # input path
-            input_path_components = self.data_path.split(
-                "\\" if "\\" in self.data_path else "/"
-            )
-            input_path_components.pop()
-            input_path_components.append("100k_historic_enriched.csv")
-            input_path = "/".join(input_path_components)
-            self.data_path = input_path
-
-            # output path
-            path_components = self.data_path.split(
-                "\\" if "\\" in self.data_path else "/"
-            )
-            path_components.pop()
-            path_components.append(
-                "preprocessed_data_files/historical_preprocessed_data.csv"
-            )
-            self.prerocessed_data_output_path = "/".join(path_components)
-        elif not historical_bool and S3_bool:
-            self.data_path = "s3://amos--data--events/leads/enriched.csv"
-            self.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/leads_preprocessed_data.csv"
-        elif not historical_bool and not S3_bool:
-            # input path
-            input_path_components = self.data_path.split(
-                "\\" if "\\" in self.data_path else "/"
-            )
-            input_path_components.pop()
-            input_path_components.append("leads_enriched.csv")
-            input_path = "/".join(input_path_components)
-            self.data_path = input_path
-
-            # output path
-            path_components = self.data_path.split(
-                "\\" if "\\" in self.data_path else "/"
-            )
-            path_components.pop()
-            path_components.append(
-                "preprocessed_data_files/leads_preprocessed_data.csv"
-            )
-            self.prerocessed_data_output_path = "/".join(path_components)
+        self.preprocessed_data_output_path = data_repo.get_preprocessed_data_path(
+            historical_bool
+        )
 
         self.filter_bool = filter_null_data
 
         # columns that would be added later after one-hot encoding each class
@@ -291,9 +247,9 @@ def save_preprocessed_data(self):
         except ValueError as e:
             log.error(f"Failed to save the selected columns for preprocessing! {e}")
         try:
-            selected_df.to_csv(self.prerocessed_data_output_path, index=False)
+            selected_df.to_csv(self.preprocessed_data_output_path, index=False)
             log.info(
-                f"Preprocessed dataframe of shape {self.preprocessed_df.shape} is saved at {self.prerocessed_data_output_path}"
+                f"Preprocessed dataframe of shape {self.preprocessed_df.shape} is saved at {self.preprocessed_data_output_path}"
             )
         except ValueError as e:
             log.error(f"Failed to save preprocessed data file! {e}")
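Illustration (not part of the patch): the slimmed-down Preprocessing constructor in use, mirroring the call sites in demos.py. The repository now supplies both the enriched input path and the preprocessed output path, so callers no longer branch on S3 versus local storage.

    import pandas as pd

    from preprocessing import Preprocessing

    # historical data (model training) ...
    preprocessor = Preprocessing(filter_null_data=True, historical_bool=True)
    # ... or the current leads that should be predicted:
    # preprocessor = Preprocessing(filter_null_data=False, historical_bool=False)

    preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)
    df = preprocessor.implement_preprocessing_pipeline()
    preprocessor.save_preprocessed_data()  # writes to preprocessor.preprocessed_data_output_path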