Skip to content

Commit

Permalink
feat: Added time for data dump + fix 500 error!
Browse files Browse the repository at this point in the history
+ createdAt and updatedAt for hivemind data dump
+ fixed the 500 error while the question is being answered
+ removed the old unused codes.
  • Loading branch information
amindadgar committed Nov 1, 2024
1 parent abadf92 commit 6cb7d41
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 122 deletions.
30 changes: 17 additions & 13 deletions routers/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,20 @@ async def ask(payload: RequestPayload):
@router.get("/status")
async def status(task_id: str):
task = AsyncResult(task_id)

# persisting the data updates in db
persister = PersistPayload()

http_payload = HTTPPayload(
communityId=task.result["community_id"],
question=QuestionModel(message=task.result["question"]),
response=ResponseModel(message=task.result["response"]),
taskId=task.id,
)
persister.persist_http(http_payload, update=True)

return {"id": task.id, "status": task.status, "result": task.result}
if task.status == "SUCCESS":
# persisting the data updates in db
persister = PersistPayload()

http_payload = HTTPPayload(
communityId=task.result["community_id"],
question=QuestionModel(message=task.result["question"]),
response=ResponseModel(message=task.result["response"]),
taskId=task.id,
)
persister.persist_http(http_payload, update=True)

results = {"id": task.id, "status": task.status, "result": task.result}
else:
results = {"id": task.id, "status": task.status}

return results
14 changes: 12 additions & 2 deletions utils/persist_payload.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import datetime

from schema import AMQPPayload, HTTPPayload
from utils.mongo import MongoSingleton
Expand All @@ -24,7 +25,11 @@ def persist_amqp(self, payload: AMQPPayload) -> None:
community_id = payload.communityId
try:
self.client[self.db][self.internal_msgs_collection].insert_one(
payload.model_dump()
{
**payload.model_dump(),
"createdAt": datetime.now(),
"updatedAt": datetime.now(),
}
)
logging.info(
f"Payload for community id: {community_id} persisted successfully!"
Expand Down Expand Up @@ -60,7 +65,12 @@ def persist_http(self, payload: HTTPPayload, update: bool = False) -> None:
else:
self.client[self.db][self.external_msgs_collection].update_one(
{"taskId": payload.taskId},
{"$set": {"response": payload.response.model_dump()}},
{
"$set": {
"response": payload.response.model_dump(),
"updatedAt": datetime.now(),
}
},
upsert=True,
)
logging.info(
Expand Down
115 changes: 8 additions & 107 deletions worker/tasks.py
Original file line number Diff line number Diff line change
@@ -1,125 +1,26 @@
import gc
import json
import logging
from typing import Any

from celery.signals import task_postrun, task_prerun
from subquery import query_multiple_source
from tc_messageBroker.rabbit_mq.event import Event
from tc_messageBroker.rabbit_mq.payload.discord_bot.base_types.interaction_callback_data import (
InteractionCallbackData,
)
from tc_messageBroker.rabbit_mq.payload.discord_bot.chat_input_interaction import (
ChatInputCommandInteraction,
)
from tc_messageBroker.rabbit_mq.payload.payload import Payload
from tc_messageBroker.rabbit_mq.queue import Queue
from utils.data_source_selector import DataSourceSelector
from utils.traceloop import init_tracing
from worker.celery import app
from worker.utils.fire_event import job_send


@app.task
def ask_question_auto_search_discord_interaction(
question: str,
community_id: str,
bot_given_info: dict[str, Any],
) -> None:
"""
this task is for the case that the user asks a question
and use the discord interaction schema
it would first retrieve the search metadata from summaries
then perform a query on the filetred raw data to find answer
Parameters
------------
question : str
the user question
community_id : str
the community that the question was asked in
bot_given_info : tc_messageBroker.rabbit_mq.payload.discord_bot.chat_input_interaction.ChatInputCommandInteraction
the information data that needed to be sent back to the bot again.
This would be a dictionary representing the keys
- `event`
- `date`
- `content`: which is the `ChatInputCommandInteraction` as a dictionary
"""

prefix = f"COMMUNITY_ID: {community_id} | "
logging.info(f"{prefix}Processing question!")
interaction = json.loads(bot_given_info["content"]["interaction"])
chat_input_interaction = ChatInputCommandInteraction.from_dict(interaction)

try:
# create_interaction_content = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Create(
# interaction=chat_input_interaction,
# data=InteractionResponse(
# type=4,
# data=InteractionCallbackData(
# content="Processing your question ...", flags=64
# ),
# ),
# ).to_dict()

# logging.info(f"{prefix}Sending process question to discord-bot!")
# job_send(
# event=Event.DISCORD_BOT.INTERACTION_RESPONSE.CREATE,
# queue_name=Queue.DISCORD_BOT,
# content=create_interaction_content,
# )
logging.info(f"{prefix}Querying the data sources!")
# for now we have just the discord platform
response = query_data_sources(community_id=community_id, query=question)

# source_nodes_dict: list[dict[str, Any]] = []
# for node in source_nodes:
# node_dict = dict(node)
# node_dict.pop("relationships", None)
# source_nodes_dict.append(node_dict)

# results = {
# "response": response,
# The source of answers is commented for now
# "source_nodes": source_nodes_dict,
# }
results = f"**Question:** {question}\n**Answer:** {response}"

response_payload = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Edit(
interaction=chat_input_interaction,
data=InteractionCallbackData(
# content=json.dumps(results)
content=results
),
).to_dict()

logging.info(f"{prefix}Sending Edit response to discord-bot!")
job_send(
event=Event.DISCORD_BOT.INTERACTION_RESPONSE.EDIT,
queue_name=Queue.DISCORD_BOT,
content=response_payload,
)
except Exception as exp:
logging.error(f"Exception {exp} | during processing the question {question}")
response_payload = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Edit(
interaction=chat_input_interaction,
data=InteractionCallbackData(
content="Sorry, We cannot process your question at the moment."
),
).to_dict()
job_send(
event=Event.DISCORD_BOT.INTERACTION_RESPONSE.EDIT,
queue_name=Queue.DISCORD_BOT,
content=response_payload,
)


@app.task
def ask_question_auto_search(
community_id: str,
query: str,
) -> dict[str, str]:
response = query_data_sources(community_id=community_id, query=query)
try:
response = query_data_sources(community_id=community_id, query=query)
except Exception:
response = "Sorry, We cannot process your question at the moment."
logging.error(
f"Errors raised while processing the question for community: {community_id}!"
)

return {
"community_id": community_id,
"question": query,
Expand Down

0 comments on commit 6cb7d41

Please sign in to comment.