From 0ac3e41ff767a20fde7270d68fcd4fdb16f57986 Mon Sep 17 00:00:00 2001
From: star-nox
Date: Wed, 12 Jun 2024 17:05:21 -0500
Subject: [PATCH 1/4] initial commit to PR

---
 ai_ta_backend/beam/convo_analysis.py | 3 +++
 1 file changed, 3 insertions(+)
 create mode 100644 ai_ta_backend/beam/convo_analysis.py

diff --git a/ai_ta_backend/beam/convo_analysis.py b/ai_ta_backend/beam/convo_analysis.py
new file mode 100644
index 00000000..962a978c
--- /dev/null
+++ b/ai_ta_backend/beam/convo_analysis.py
@@ -0,0 +1,3 @@
+# script to analyze the conversation data
+
+# project owner, email, total conversations, total documents
\ No newline at end of file

From 8eea9376fe15c57e3ffd84be7af54c228fafcb93 Mon Sep 17 00:00:00 2001
From: star-nox
Date: Mon, 24 Jun 2024 16:36:02 -0500
Subject: [PATCH 2/4] fetching and uploading course-level data to SQL

---
 ai_ta_backend/beam/convo_analysis.py |   3 -
 ai_ta_backend/beam/usage_analysis.py | 108 +++++++++++++++++++++++++++
 2 files changed, 108 insertions(+), 3 deletions(-)
 delete mode 100644 ai_ta_backend/beam/convo_analysis.py
 create mode 100644 ai_ta_backend/beam/usage_analysis.py

diff --git a/ai_ta_backend/beam/convo_analysis.py b/ai_ta_backend/beam/convo_analysis.py
deleted file mode 100644
index 962a978c..00000000
--- a/ai_ta_backend/beam/convo_analysis.py
+++ /dev/null
@@ -1,3 +0,0 @@
-# script to analyze the conversation data
-
-# project owner, email, total conversations, total documents
\ No newline at end of file
diff --git a/ai_ta_backend/beam/usage_analysis.py b/ai_ta_backend/beam/usage_analysis.py
new file mode 100644
index 00000000..86ffaa11
--- /dev/null
+++ b/ai_ta_backend/beam/usage_analysis.py
@@ -0,0 +1,108 @@
+"""
+To deploy: beam deploy usage_analysis.py
+Use CAII gmail to auth.
+"""
+
+import os
+import json
+import supabase
+import requests
+from datetime import datetime
+from typing import Any, Dict, List
+import beam
+from beam import App, QueueDepthAutoscaler, Runtime
+from posthog import Posthog
+
+requirements = [
+    "posthog==3.1.0",
+    "supabase==2.0.2",
+]
+
+app = App(
+    "usage_analysis",
+    runtime=Runtime(
+        cpu=1,
+        memory="2Gi",
+        image=beam.Image(
+            python_version="python3.10",
+            python_packages=requirements,
+        ),
+    ))
+
+def loader():
+  """
+  The loader function will run once for each worker that starts up. https://docs.beam.cloud/deployment/loaders
+  """
+  posthog = Posthog(sync_mode=True, project_api_key=os.environ['POSTHOG_API_KEY'], host='https://app.posthog.com')
+  supabase_client = supabase.create_client(os.environ['SUPABASE_URL'], os.environ['SUPABASE_API_KEY'])  # type: ignore
+
+  return posthog, supabase_client
+
+autoscaler = QueueDepthAutoscaler(max_tasks_per_replica=2, max_replicas=3)
+
+
+@app.task_queue(
+    workers=1,
+    max_pending_tasks=15_000,
+    max_retries=3,
+    timeout=-1,
+    loader=loader,
+    autoscaler=autoscaler)
+def usage_analysis(**inputs: Dict[str, Any]):
+  """
+  This function queries supabase for the latest usage data and sends it to Posthog.
+  Details to report:
+  Args:
+    course_name (str): The name of the course.
+  """
+  course_name = str = inputs.get('course_name', '')
+  print("Running usage analysis for course:", course_name)
+
+  posthog_client, supabase_client = inputs['context']
+
+  if course_name:
+    # single course
+    print("Single course")
+    metrics = get_usage_data(course_name, supabase_client)
+    print("Metrics:", metrics)
+
+    # upload to Supabase
+    response = supabase_client.table('usage_metrics').insert(metrics).execute()
+    print("Response:", response)
+
+  else:
+    # all courses
+    print("All courses")
+
+  return "Success"
+
+def get_usage_data(course_name, supabase_client) -> Dict[str, Any]:
+  """
+  Get usage data from Supabase.
+  """
+  # get total documents
+  total_docs = supabase_client.table('documents').select('id', count='exact').eq('course_name', course_name).execute()
+  print("Total docs:", total_docs.count)
+
+  # get total conversations
+  total_conversations = supabase_client.table('llm-convo-monitor').select('id', count='exact').eq('course_name', course_name).execute()
+  print("Total conversations:", total_conversations.count)
+
+  # get most recent conversation
+  most_recent_conversation = supabase_client.table('llm-convo-monitor').select('id', 'created_at').eq('course_name', course_name).order('created_at', desc=True).limit(1).execute()
+  print("Most recent conversation:", most_recent_conversation)
+
+  # extract datetime
+  print(type(most_recent_conversation.data[0]['created_at']))
+  dt_str = most_recent_conversation.data[0]['created_at']
+  dt_object = datetime.fromisoformat(dt_str)
+
+  metrics = {
+      "course_name": course_name,
+      "total_docs": total_docs.count,
+      "total_convos": total_conversations.count,
+      "most_recent_convo": dt_str,
+  }
+
+  return metrics
+

From d032b7f333e21ed7fa7676eba2d6796066459f20 Mon Sep 17 00:00:00 2001
From: Kastan Day
Date: Wed, 26 Jun 2024 15:26:08 -0700
Subject: [PATCH 3/4] convert beam to cron job

---
 ai_ta_backend/beam/usage_analysis.py | 140 ++++++++++++++------------
 1 file changed, 73 insertions(+), 67 deletions(-)

diff --git a/ai_ta_backend/beam/usage_analysis.py b/ai_ta_backend/beam/usage_analysis.py
index 86ffaa11..c52c24b2 100644
--- a/ai_ta_backend/beam/usage_analysis.py
+++ b/ai_ta_backend/beam/usage_analysis.py
@@ -1,15 +1,17 @@
 """
 To deploy: beam deploy usage_analysis.py
+For local testing w/ hot reload: beam serve usage_analysis.py
 Use CAII gmail to auth.
 """
 
-import os
 import json
-import supabase
-import requests
+import os
 from datetime import datetime
 from typing import Any, Dict, List
+
 import beam
+import requests
+import supabase
 from beam import App, QueueDepthAutoscaler, Runtime
 from posthog import Posthog
 
@@ -18,91 +20,95 @@
     "supabase==2.0.2",
 ]
 
-app = App(
-    "usage_analysis",
-    runtime=Runtime(
-        cpu=1,
-        memory="2Gi",
-        image=beam.Image(
-            python_version="python3.10",
-            python_packages=requirements,
-        ),
-    ))
+app = App("usage_analysis",
+          runtime=Runtime(
+              cpu=1,
+              memory="2Gi",
+              image=beam.Image(
+                  python_version="python3.10",
+                  python_packages=requirements,
+              ),
+          ))
+
 
 def loader():
-  """
+  """
   The loader function will run once for each worker that starts up. https://docs.beam.cloud/deployment/loaders
   """
-  posthog = Posthog(sync_mode=True, project_api_key=os.environ['POSTHOG_API_KEY'], host='https://app.posthog.com')
-  supabase_client = supabase.create_client(os.environ['SUPABASE_URL'], os.environ['SUPABASE_API_KEY'])  # type: ignore
+  posthog = Posthog(sync_mode=True, project_api_key=os.environ['POSTHOG_API_KEY'], host='https://app.posthog.com')
+  supabase_client = supabase.create_client(os.environ['SUPABASE_URL'], os.environ['SUPABASE_API_KEY'])  # type: ignore
+
+  return posthog, supabase_client
 
-  return posthog, supabase_client
 
 autoscaler = QueueDepthAutoscaler(max_tasks_per_replica=2, max_replicas=3)
 
 
-@app.task_queue(
-    workers=1,
-    max_pending_tasks=15_000,
-    max_retries=3,
-    timeout=-1,
-    loader=loader,
-    autoscaler=autoscaler)
+@app.schedule(when="2 2 * * *")  # 2:02 AM daily
 def usage_analysis(**inputs: Dict[str, Any]):
-  """
+  """
   This function queries supabase for the latest usage data and sends it to Posthog.
   Details to report:
   Args:
     course_name (str): The name of the course.
   """
-  course_name = str = inputs.get('course_name', '')
-  print("Running usage analysis for course:", course_name)
-
-  posthog_client, supabase_client = inputs['context']
-
-  if course_name:
-    # single course
-    print("Single course")
-    metrics = get_usage_data(course_name, supabase_client)
-    print("Metrics:", metrics)
+  # course_name = str = inputs.get('course_name', '')
+  print("Running usage analysis for course:")
 
-    # upload to Supabase
-    response = supabase_client.table('usage_metrics').insert(metrics).execute()
-    print("Response:", response)
+  posthog, supabase_client = inputs['context']
 
-  else:
-    # all courses
-    print("All courses")
+  posthog.capture('distinct_id_of_the_user', event='usage_analysis_ran', properties={
+      'hi_josh': "👋",
+  })
 
-  return "Success"
+  # if course_name:
+  #   # single course
+  #   print("Single course")
+  #   metrics = get_usage_data(course_name, supabase_client)
+  #   print("Metrics:", metrics)
 
-def get_usage_data(course_name, supabase_client) -> Dict[str, Any]:
-  """
-  Get usage data from Supabase.
-  """
-  # get total documents
-  total_docs = supabase_client.table('documents').select('id', count='exact').eq('course_name', course_name).execute()
-  print("Total docs:", total_docs.count)
+  #   posthog.capture('usage_metrics', metrics)
 
-  # get total conversations
-  total_conversations = supabase_client.table('llm-convo-monitor').select('id', count='exact').eq('course_name', course_name).execute()
-  print("Total conversations:", total_conversations.count)
+  #   # upload to Supabase
+  #   response = supabase_client.table('usage_metrics').insert(metrics).execute()
+  #   print("Response:", response)
 
-  # get most recent conversation
-  most_recent_conversation = supabase_client.table('llm-convo-monitor').select('id', 'created_at').eq('course_name', course_name).order('created_at', desc=True).limit(1).execute()
-  print("Most recent conversation:", most_recent_conversation)
+  # else:
+  #   # all courses
+  #   print("All courses")
 
-  # extract datetime
-  print(type(most_recent_conversation.data[0]['created_at']))
-  dt_str = most_recent_conversation.data[0]['created_at']
-  dt_object = datetime.fromisoformat(dt_str)
+  return "Success"
 
-  metrics = {
-      "course_name": course_name,
-      "total_docs": total_docs.count,
-      "total_convos": total_conversations.count,
-      "most_recent_convo": dt_str,
-  }
 
-  return metrics
+def get_usage_data(course_name, supabase_client) -> Dict[str, Any]:
+  """
+  Get usage data from Supabase.
+  """
+  # get total documents
+  total_docs = supabase_client.table('documents').select('id', count='exact').eq('course_name', course_name).execute()
+  print("Total docs:", total_docs.count)
+
+  # get total conversations
+  total_conversations = supabase_client.table('llm-convo-monitor').select('id',
+                                                                          count='exact').eq('course_name',
+                                                                                            course_name).execute()
+  print("Total conversations:", total_conversations.count)
+
+  # get most recent conversation
+  most_recent_conversation = supabase_client.table('llm-convo-monitor').select('id', 'created_at').eq(
+      'course_name', course_name).order('created_at', desc=True).limit(1).execute()
+  print("Most recent conversation:", most_recent_conversation)
+
+  # extract datetime
+  print(type(most_recent_conversation.data[0]['created_at']))
+  dt_str = most_recent_conversation.data[0]['created_at']
+  dt_object = datetime.fromisoformat(dt_str)
+
+  metrics = {
+      "course_name": course_name,
+      "total_docs": total_docs.count,
+      "total_convos": total_conversations.count,
+      "most_recent_convo": dt_str,
+  }
+
+  return metrics

From 2664cd8083b89ca42d096a29e6dfdaac51685058 Mon Sep 17 00:00:00 2001
From: Kastan Day
Date: Wed, 26 Jun 2024 15:47:27 -0700
Subject: [PATCH 4/4] Working prototype

---
 ai_ta_backend/beam/usage_analysis.py | 29 +++++++----------------------
 1 file changed, 7 insertions(+), 22 deletions(-)

diff --git a/ai_ta_backend/beam/usage_analysis.py b/ai_ta_backend/beam/usage_analysis.py
index c52c24b2..846a2d24 100644
--- a/ai_ta_backend/beam/usage_analysis.py
+++ b/ai_ta_backend/beam/usage_analysis.py
@@ -4,15 +4,13 @@
 Use CAII gmail to auth.
 """
 
-import json
 import os
 from datetime import datetime
 from typing import Any, Dict, List
 
 import beam
-import requests
 import supabase
-from beam import App, QueueDepthAutoscaler, Runtime
+from beam import App, Runtime
 from posthog import Posthog
 
 requirements = [
@@ -31,19 +29,6 @@
           ))
 
 
-def loader():
-  """
-  The loader function will run once for each worker that starts up. https://docs.beam.cloud/deployment/loaders
-  """
-  posthog = Posthog(sync_mode=True, project_api_key=os.environ['POSTHOG_API_KEY'], host='https://app.posthog.com')
-  supabase_client = supabase.create_client(os.environ['SUPABASE_URL'], os.environ['SUPABASE_API_KEY'])  # type: ignore
-
-  return posthog, supabase_client
-
-
-autoscaler = QueueDepthAutoscaler(max_tasks_per_replica=2, max_replicas=3)
-
-
 @app.schedule(when="2 2 * * *")  # 2:02 AM daily
 def usage_analysis(**inputs: Dict[str, Any]):
   """
@@ -52,10 +37,10 @@ def usage_analysis(**inputs: Dict[str, Any]):
   """
   This function queries supabase for the latest usage data and sends it to Posthog.
   Details to report:
   Args:
     course_name (str): The name of the course.
   """
-  # course_name = str = inputs.get('course_name', '')
-  print("Running usage analysis for course:")
-
-  posthog, supabase_client = inputs['context']
+  print("Running usage analysis for course")
+  posthog = Posthog(sync_mode=True, project_api_key=os.environ['POSTHOG_API_KEY'], host='https://app.posthog.com')
+  supabase_client = supabase.create_client(os.environ['SUPABASE_URL'], os.environ['SUPABASE_API_KEY'])  # type: ignore
+  print("Instantiated Posthog and Supabase clients")
 
   posthog.capture('distinct_id_of_the_user', event='usage_analysis_ran', properties={
       'hi_josh': "👋",
@@ -64,13 +49,13 @@ def usage_analysis(**inputs: Dict[str, Any]):
   # if course_name:
   #   # single course
   #   print("Single course")
-  #   metrics = get_usage_data(course_name, supabase_client)
+  #   metrics = get_usage_data(course_name, supabase)
   #   print("Metrics:", metrics)
 
   #   posthog.capture('usage_metrics', metrics)
 
   #   # upload to Supabase
-  #   response = supabase_client.table('usage_metrics').insert(metrics).execute()
+  #   response = supabase.table('usage_metrics').insert(metrics).execute()
   #   print("Response:", response)
 
   # else: