Skip to content

Commit

Permalink
Merge pull request #1011 from shankari/go_back_to_confirmed_trips
Browse files Browse the repository at this point in the history
⏪ Go back to generating trip statistics from the confirmed trips
  • Loading branch information
shankari authored Jan 23, 2025
2 parents 33eda70 + 88bb35a commit 47ab8be
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 18 deletions.
41 changes: 31 additions & 10 deletions emission/analysis/result/user_stat.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ def update_user_profile(user_id: str, data: Dict[str, Any]) -> None:
logging.debug(f"New profile: {user.getProfile()}")


def get_and_store_user_stats(user_id: str, trip_key: str) -> None:
def get_and_store_pipeline_dependent_user_stats(user_id: str, trip_key: str) -> None:
"""
Aggregates and stores user statistics into the user profile.
Aggregates and stores pipeline dependent statistics into the user profile.
These are statistics based on analysed data such as trips or labels.
:param user_id: The UUID of the user.
:type user_id: str
Expand All @@ -52,7 +53,7 @@ def get_and_store_user_stats(user_id: str, trip_key: str) -> None:
:return: None
"""
try:
logging.info(f"Starting get_and_store_user_stats for user_id: {user_id}, trip_key: {trip_key}")
logging.info(f"Starting get_and_store_pipeline_dependent_user_stats for user_id: {user_id}, trip_key: {trip_key}")

ts = esta.TimeSeries.get_time_series(user_id)
start_ts_result = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING)
Expand All @@ -68,24 +69,44 @@ def get_and_store_user_stats(user_id: str, trip_key: str) -> None:
)

logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}")
logging.info(f"user_id type: {type(user_id)}")

last_call_ts = get_last_call_timestamp(ts)
logging.info(f"Last call timestamp: {last_call_ts}")

update_data = {
"pipeline_range": {
"start_ts": start_ts,
"end_ts": end_ts
},
"total_trips": total_trips,
"labeled_trips": labeled_trips,
"last_call_ts": last_call_ts
}

logging.info(f"user_id type: {type(user_id)}")
update_user_profile(user_id, update_data)

logging.debug("User profile updated successfully.")

except Exception as e:
logging.error(f"Error in get_and_store_user_stats for user_id {user_id}: {e}")
logging.error(f"Error in get_and_store_dependent_user_stats for user_id {user_id}: {e}")

def get_and_store_pipeline_independent_user_stats(user_id: str) -> None:
    """
    Aggregates and stores pipeline independent statistics into the user profile.
    These are statistics based on raw data, such as the last call, last push
    or last location received. Currently only the last call timestamp
    (``last_call_ts``) is computed and stored.

    Any exception raised while computing or storing the statistics is logged
    and swallowed, so this function never raises to its caller.

    :param user_id: The UUID of the user.
    :type user_id: str
    :return: None
    """
    try:
        logging.info(f"Starting get_and_store_pipeline_independent_user_stats for user_id: {user_id}")
        ts = esta.TimeSeries.get_time_series(user_id)
        last_call_ts = get_last_call_timestamp(ts)
        logging.info(f"Last call timestamp: {last_call_ts}")

        update_data = {
            "last_call_ts": last_call_ts
        }
        update_user_profile(user_id, update_data)

    except Exception as e:
        # Best-effort: a failure to refresh the stats is logged, not propagated.
        # The message uses the full function name so it can be correlated with
        # the "Starting ..." message logged above.
        logging.error(f"Error in get_and_store_pipeline_independent_user_stats for user_id {user_id}: {e}")
15 changes: 11 additions & 4 deletions emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False):
try:
run_intake_pipeline_for_user(uuid, skip_if_no_new_data)
with ect.Timer() as gsr:
logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip")
esds.store_pipeline_time(uuid, 'STORE_USER_STATS',
logging.info("*" * 10 + "UUID %s: storing pipeline independent user stats " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing pipeline independent user stats " % uuid + "*" * 10)
eaurs.get_and_store_pipeline_independent_user_stats(uuid)
esds.store_pipeline_time(uuid, 'STORE_PIPELINE_INDEPENDENT_USER_STATS',
time.time(), gsr.elapsed)
except Exception as e:
esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None)
Expand Down Expand Up @@ -206,3 +206,10 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_COMPOSITE_OBJECTS.name,
time.time(), crt.elapsed)

with ect.Timer() as gsr:
logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
eaurs.get_and_store_pipeline_dependent_user_stats(uuid, "analysis/composite_trip")

esds.store_pipeline_time(uuid, 'STORE_PIPELINE_DEPENDENT_USER_STATS',
time.time(), gsr.elapsed)
1 change: 0 additions & 1 deletion emission/tests/analysisTests/intakeTests/TestUserStat.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ def setUp(self):
# Initialize the profile if it does not exist
edb.get_profile_db().insert_one({"user_id": self.testUUID})

#etc.runIntakePipeline(self.testUUID)
etc.runIntakePipeline(self.testUUID)
logging.debug("UUID = %s" % (self.testUUID))

Expand Down
3 changes: 2 additions & 1 deletion emission/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ def runIntakePipeline(uuid):
eaue.populate_expectations(uuid)
eaum.create_confirmed_objects(uuid)
eapcc.create_composite_objects(uuid)
eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip")
eaurs.get_and_store_pipeline_dependent_user_stats(uuid, "analysis/composite_trip")
eaurs.get_and_store_pipeline_independent_user_stats(uuid)


def configLogging():
Expand Down
2 changes: 0 additions & 2 deletions emission/tests/netTests/TestPipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import emission.core.wrapper.localdate as ecwl
import emission.tests.common as etc
import emission.pipeline.intake_stage as epi
import emission.analysis.result.user_stat as eaurs

from emission.net.api import pipeline

Expand Down Expand Up @@ -39,7 +38,6 @@ def testNoAnalysisResults(self):
def testAnalysisResults(self):
self.assertEqual(pipeline.get_range(self.testUUID), (None, None))
epi.run_intake_pipeline_for_user(self.testUUID, skip_if_no_new_data = False)
eaurs.get_and_store_user_stats(self.testUUID, "analysis/composite_trip")
pr = pipeline.get_range(self.testUUID)
self.assertAlmostEqual(pr[0], 1440688739.672)
self.assertAlmostEqual(pr[1], 1440729142.709)
Expand Down

0 comments on commit 47ab8be

Please sign in to comment.