Demo #244

Merged (10 commits, Feb 7, 2024)
2 changes: 1 addition & 1 deletion src/bdc/steps/google_places.py
@@ -228,7 +228,7 @@ def get_first_place_candidate(self, query, input_type) -> (dict, int):
             return None, 0
 
         if not response["status"] == HTTPStatus.OK.name:
-            log.warning(
+            log.debug(
                 f"Failed to fetch data. Status code: {response['status']}",
             )
             return None, 0
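Reviewer note: the only change in this file demotes a per-query log from warning to debug; a non-OK status from the Places API (e.g. "ZERO_RESULTS") is routine when enriching leads, so it should not surface as a warning. A minimal sketch of the surrounding pattern, with `response` assumed to be the parsed Places API reply and the second tuple element standing in for whatever count or confidence value the real method returns:

```python
import logging
from http import HTTPStatus

log = logging.getLogger(__name__)

def first_place_candidate(response: dict):
    """Illustrative reduction of get_first_place_candidate's status check."""
    # The Places API reports status as a string such as "OK" or
    # "ZERO_RESULTS"; HTTPStatus.OK.name is simply the string "OK".
    if not response["status"] == HTTPStatus.OK.name:
        # Routine for many lead queries, hence debug rather than warning.
        log.debug(f"Failed to fetch data. Status code: {response['status']}")
        return None, 0
    candidates = response.get("candidates", [])
    return (candidates[0], 1) if candidates else (None, 0)
```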
5 changes: 2 additions & 3 deletions src/bdc/steps/helpers/generate_hash_leads.py
@@ -7,7 +7,6 @@
 
 import pandas as pd
 
-from bdc.steps.step import Step
 from database import get_database
 from logger import get_logger
 
@@ -56,12 +55,12 @@ def hash_check(
 
         if lead_hash in lookup_table:
             # If the hash exists in the lookup table, return the corresponding data
-            log.info(f"Hash {lead_hash} already exists in the lookup table.")
+            log.debug(f"Hash {lead_hash} already exists in the lookup table.")
            try:
                 previous_data = lead_data[fields_tofill]
                 return previous_data
             except KeyError as e:
-                log.info(
+                log.debug(
                     f"Hash is present but data fields {fields_tofill} were not found."
                 )
                 lookup_table[lead_hash] = lookup_table[lead_hash][:-1] + [
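Reviewer note: the same log-level demotion applies to the lead-hash cache, which fires on every previously seen lead. A sketch of the lookup pattern visible in this hunk, with the signature and surrounding class assumed (only the parameter names appear in the diff):

```python
import logging

import pandas as pd

log = logging.getLogger(__name__)

def hash_check(lead_data: pd.Series, lead_hash: str,
               fields_tofill: list, lookup_table: dict):
    """Sketch: return cached enrichment data for a known lead hash."""
    if lead_hash in lookup_table:
        # Cache hit: expected for every re-processed lead, so logged at
        # debug level to keep INFO output focused on actual progress.
        log.debug(f"Hash {lead_hash} already exists in the lookup table.")
        try:
            return lead_data[fields_tofill]  # reuse previously enriched fields
        except KeyError:
            log.debug(
                f"Hash is present but data fields {fields_tofill} were not found."
            )
    return None  # cache miss (or stale entry): caller re-runs the step
```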
30 changes: 25 additions & 5 deletions src/database/leads/local_repository.py
@@ -24,9 +24,15 @@ class LocalRepository(Repository):
     DF_OUTPUT = os.path.abspath(
         os.path.join(BASE_PATH, "../../data/leads_enriched.csv")
     )
+    DF_HISTORICAL_OUTPUT = os.path.abspath(
+        os.path.join(BASE_PATH, "../../data/100k_historic_enriched.csv")
+    )
     DF_PREPROCESSED_INPUT = os.path.abspath(
         os.path.join(BASE_PATH, "../../data/preprocessed_data_files/")
     )
+    DF_PREDICTION_OUTPUT = os.path.abspath(
+        os.path.join(BASE_PATH, "../../data/leads_predicted_size.csv")
+    )
     REVIEWS = os.path.abspath(os.path.join(BASE_PATH, "../../data/reviews/"))
     SNAPSHOTS = os.path.abspath(os.path.join(BASE_PATH, "../../data/snapshots/"))
     GPT_RESULTS = os.path.abspath(os.path.join(BASE_PATH, "../../data/gpt-results/"))
@@ -51,6 +57,13 @@ def save_dataframe(self):
         self.df.to_csv(self.DF_OUTPUT, index=False)
         log.info(f"Saved enriched data locally to {self.DF_OUTPUT}")
 
+    def save_prediction(self, df):
+        """
+        Save dataframe in df parameter in chosen output location
+        """
+        df.to_csv(self.DF_PREDICTION_OUTPUT, index=False)
+        log.info(f"Saved prediction result locally to {self.DF_PREDICTION_OUTPUT}")
+
     def insert_data(self, data):
         """
         TODO: Insert new data into specified dataframe
@@ -68,7 +81,7 @@ def save_review(self, review, place_id, force_refresh=False):
         json_file_path = os.path.join(self.REVIEWS, file_name)
 
         if os.path.exists(json_file_path):
-            log.info(f"Reviews for {place_id} already exist")
+            log.debug(f"Reviews for {place_id} already exist")
             return
 
         with open(json_file_path, "w", encoding="utf-8") as json_file:
@@ -253,10 +266,17 @@ def save_classification_report(self, report, model_name: str):
         except Exception as e:
             log.error(f"Could not save report at {report_file_path}! Error: {str(e)}")
 
-    def load_preprocessed_data(
-        self, file_name: str = "historical_preprocessed_data.csv"
-    ):
+    def get_preprocessed_data_path(self, historical: bool = True):
+        file_name = (
+            "historical_preprocessed_data.csv"
+            if historical
+            else "preprocessed_data.csv"
+        )
+        file_path = os.path.join(self.DF_PREPROCESSED_INPUT, file_name)
+        return file_path
+
+    def load_preprocessed_data(self, historical: bool = True):
         try:
-            return pd.read_csv(os.path.join(self.DF_PREPROCESSED_INPUT, file_name))
+            return pd.read_csv(self.get_preprocessed_data_path(historical))
         except FileNotFoundError:
             log.error("Error: Could not find input file for preprocessed data.")
29 changes: 27 additions & 2 deletions src/database/leads/repository.py
@@ -18,12 +18,21 @@ def DF_INPUT(self):
         pass
 
     @property
+    @abstractmethod
     def DF_OUTPUT(self):
         """
         Define database path to store dataframe
         """
         pass
 
+    @property
+    @abstractmethod
+    def DF_HISTORICAL_OUTPUT(self):
+        """
+        Define database path to store historical enriched dataframe (used for preprocessing input)
+        """
+        pass
+
     @property
     @abstractmethod
     def REVIEWS(self):
@@ -65,7 +74,9 @@ def set_dataframe(self, df):
     def get_input_path(self):
         return self.DF_INPUT
 
-    def get_output_path(self):
+    def get_enriched_data_path(self, historical=False):
+        if historical:
+            return self.DF_HISTORICAL_OUTPUT
         return self.DF_OUTPUT
 
     @abstractmethod
@@ -82,6 +93,13 @@ def save_dataframe(self):
         """
         pass
 
+    @abstractmethod
+    def save_prediction(self, df):
+        """
+        Save dataframe in df parameter in chosen output location
+        """
+        pass
+
     @abstractmethod
     def insert_data(self, data):
         """
@@ -221,7 +239,14 @@ def save_classification_report(self, report, model_name: str):
         pass
 
     @abstractmethod
-    def load_preprocessed_data(self, file_name: str):
+    def get_preprocessed_data_path(self, historical: bool = True):
+        """
+        Returns the path for a preprocessed data file (either historical or current)
+        """
+        pass
+
+    @abstractmethod
+    def load_preprocessed_data(self, historical: bool = True):
        """
        Load the preprocessed data from the given file
        """
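Reviewer note: since the new members are declared abstract, every concrete repository must implement them or it will fail to instantiate. The skeleton below shows only the members this PR touches; the class name is hypothetical, and the real ABC declares further abstract members (REVIEWS, insert_data, save_dataframe, ...) that are elided here, so this snippet alone would not instantiate:

```python
import pandas as pd

from database.leads.repository import Repository  # module path as in this PR

class CsvRepository(Repository):  # hypothetical subclass, for illustration only
    DF_INPUT = "in.csv"
    DF_OUTPUT = "out.csv"
    DF_HISTORICAL_OUTPUT = "historic.csv"  # now abstract, so it must be defined

    def save_prediction(self, df: pd.DataFrame):  # new abstract method
        df.to_csv("predicted.csv", index=False)

    def get_preprocessed_data_path(self, historical: bool = True):  # new
        return "historical.csv" if historical else "current.csv"

    def load_preprocessed_data(self, historical: bool = True):  # changed signature
        return pd.read_csv(self.get_preprocessed_data_path(historical))

# get_enriched_data_path() (renamed from get_output_path) dispatches on the
# new property:
#   repo.get_enriched_data_path()                 -> DF_OUTPUT
#   repo.get_enriched_data_path(historical=True)  -> DF_HISTORICAL_OUTPUT
```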
32 changes: 24 additions & 8 deletions src/database/leads/s3_repository.py
@@ -43,6 +43,10 @@ class S3Repository(Repository):
     MODELS_BUCKET = "amos--models"
     DF_INPUT = f"s3://{EVENTS_BUCKET}/leads/enriched.csv"
     DF_OUTPUT = f"s3://{EVENTS_BUCKET}/leads/enriched.csv"
+    DF_HISTORICAL_OUTPUT = (
+        f"s3://{EVENTS_BUCKET}/historical_data/100k_historic_enriched.csv"
+    )
+    DF_PREDICTION_OUTPUT = f"s3://{EVENTS_BUCKET}/leads/leads_predicted_size.csv"
     DF_PREPROCESSED_INPUT = f"s3://{FEATURES_BUCKET}/preprocessed_data_files/"
     REVIEWS = f"s3://{EVENTS_BUCKET}/reviews/"
     SNAPSHOTS = f"s3://{EVENTS_BUCKET}/snapshots/"
@@ -131,6 +135,16 @@ def save_dataframe(self):
         self._save_to_s3(csv_buffer.getvalue(), bucket, obj_key)
         log.info(f"Successfully saved enriched leads to s3://{bucket}/{obj_key}")
 
+    def save_prediction(self, df):
+        """
+        Save dataframe in df parameter in chosen output location
+        """
+        bucket, obj_key = decode_s3_url(self.DF_PREDICTION_OUTPUT)
+        csv_buffer = StringIO()
+        df.to_csv(csv_buffer, index=False)
+        self._save_to_s3(csv_buffer.getvalue(), bucket, obj_key)
+        log.info(f"Successfully saved prediction result to s3://{bucket}/{obj_key}")
+
     def _save_to_s3(self, data, bucket, key):
         s3.put_object(
             Bucket=bucket,
@@ -374,15 +388,17 @@ def save_classification_report(self, report, model_name: str):
         except Exception as e:
             log.error(f"Could not save report for '{model_name}' to S3: {str(e)}")
 
-    def load_preprocessed_data(
-        self, file_name: str = "historical_preprocessed_data.csv"
-    ):
+    def get_preprocessed_data_path(self, historical: bool = True):
+        file_name = (
+            "historical_preprocessed_data.csv"
+            if historical
+            else "preprocessed_data.csv"
+        )
         file_path = self.DF_PREPROCESSED_INPUT + file_name
-        if not file_path.startswith("s3://"):
-            log.error(
-                "S3 location has to be defined like this: s3://<BUCKET>/<OBJECT_KEY>"
-            )
-            return
+        return file_path
+
+    def load_preprocessed_data(self, historical: bool = True):
+        file_path = self.get_preprocessed_data_path(historical)
 
         source = None
         remote_dataset = None
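Reviewer note: the S3 variant serializes the frame into an in-memory StringIO buffer and uploads it through the repository's _save_to_s3 helper, which wraps s3.put_object as the context above shows. A self-contained sketch of that pattern using plain boto3, with decode_s3_url approximated by simple string splitting:

```python
from io import StringIO

import boto3
import pandas as pd

s3 = boto3.client("s3")

def save_csv_to_s3(df: pd.DataFrame, s3_url: str) -> None:
    """Upload a dataframe as CSV without touching the local filesystem."""
    # Rough stand-in for decode_s3_url: "s3://bucket/key" -> ("bucket", "key").
    bucket, _, key = s3_url.removeprefix("s3://").partition("/")
    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False)  # serialize in memory
    s3.put_object(Bucket=bucket, Key=key, Body=csv_buffer.getvalue())

# e.g. save_csv_to_s3(predictions, S3Repository.DF_PREDICTION_OUTPUT)
```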