diff --git a/vocode/streaming/models/transcript.py b/vocode/streaming/models/transcript.py
index 18b044e6b..25bc28173 100644
--- a/vocode/streaming/models/transcript.py
+++ b/vocode/streaming/models/transcript.py
@@ -10,7 +10,7 @@ class EventLog(BaseModel):
     sender: Sender
-    timestamp: float
+    timestamp: float = Field(default_factory=time.time)
 
     def to_string(self, include_timestamp: bool = False) -> str:
         raise NotImplementedError
@@ -68,33 +68,55 @@ def to_string(self, include_timestamps: bool = False) -> str:
             for event in self.event_logs
         )
 
-    def add_message(
-        self,
-        text: str,
-        sender: Sender,
-        conversation_id: str,
+    def maybe_publish_transcript_event_from_message(
+        self, message: Message, conversation_id: str
     ):
-        timestamp = time.time()
-        self.event_logs.append(Message(text=text, sender=sender, timestamp=timestamp))
         if self.events_manager is not None:
             self.events_manager.publish_event(
                 TranscriptEvent(
-                    text=text,
-                    sender=sender,
-                    timestamp=time.time(),
+                    text=message.text,
+                    sender=message.sender,
+                    timestamp=message.timestamp,
                     conversation_id=conversation_id,
                 )
             )
 
+    def add_message_from_props(
+        self,
+        text: str,
+        sender: Sender,
+        conversation_id: str,
+        publish_to_events_manager: bool = True,
+    ):
+        timestamp = time.time()
+        message = Message(text=text, sender=sender, timestamp=timestamp)
+        self.event_logs.append(message)
+        if publish_to_events_manager:
+            self.maybe_publish_transcript_event_from_message(
+                message=message, conversation_id=conversation_id
+            )
+
+    def add_message(
+        self,
+        message: Message,
+        conversation_id: str,
+        publish_to_events_manager: bool = True,
+    ):
+        self.event_logs.append(message)
+        if publish_to_events_manager:
+            self.maybe_publish_transcript_event_from_message(
+                message=message, conversation_id=conversation_id
+            )
+
     def add_human_message(self, text: str, conversation_id: str):
-        self.add_message(
+        self.add_message_from_props(
             text=text,
             sender=Sender.HUMAN,
             conversation_id=conversation_id,
         )
 
     def add_bot_message(self, text: str, conversation_id: str):
-        self.add_message(
+        self.add_message_from_props(
             text=text,
             sender=Sender.BOT,
             conversation_id=conversation_id,
diff --git a/vocode/streaming/streaming_conversation.py b/vocode/streaming/streaming_conversation.py
index 9d9ca842a..4f37976d0 100644
--- a/vocode/streaming/streaming_conversation.py
+++ b/vocode/streaming/streaming_conversation.py
@@ -16,7 +16,12 @@
 )
 from vocode.streaming.agent.chat_gpt_agent import ChatGPTAgent
 from vocode.streaming.models.actions import ActionInput
-from vocode.streaming.models.transcript import Transcript, TranscriptCompleteEvent
+from vocode.streaming.models.events import Sender
+from vocode.streaming.models.transcript import (
+    Message,
+    Transcript,
+    TranscriptCompleteEvent,
+)
 from vocode.streaming.models.message import BaseMessage
 from vocode.streaming.models.transcriber import EndpointingConfig, TranscriberConfig
 from vocode.streaming.output_device.base_output_device import BaseOutputDevice
@@ -270,11 +275,6 @@ async def process(self, item: InterruptibleAgentResponseEvent[AgentResponse]):
                     item.agent_response_tracker.set()
                     await self.conversation.terminate()
                     return
-                if isinstance(agent_response, AgentResponseMessage):
-                    self.conversation.transcript.add_bot_message(
-                        text=agent_response.message.text,
-                        conversation_id=self.conversation.id,
-                    )
 
                 agent_response_message = typing.cast(
                     AgentResponseMessage, agent_response
@@ -320,11 +320,27 @@ async def process(
         ):
             try:
                 message, synthesis_result = item.payload
+                # create an empty transcript message and attach it to the transcript
+                transcript_message = Message(
+                    text="",
+                    sender=Sender.BOT,
+                )
+                self.conversation.transcript.add_message(
+                    message=transcript_message,
+                    conversation_id=self.conversation.id,
+                    publish_to_events_manager=False,
+                )
                 message_sent, cut_off = await self.conversation.send_speech_to_output(
                     message.text,
                     synthesis_result,
                     item.interruption_event,
                     TEXT_TO_SPEECH_CHUNK_SIZE_SECONDS,
+                    transcript_message=transcript_message,
+                )
+                # publish the transcript message now that it includes what was said during send_speech_to_output
+                self.conversation.transcript.maybe_publish_transcript_event_from_message(
+                    message=transcript_message,
+                    conversation_id=self.conversation.id,
                 )
                 item.agent_response_tracker.set()
                 self.conversation.logger.debug("Message sent: {}".format(message_sent))
@@ -332,9 +348,6 @@
                     self.conversation.agent.update_last_bot_message_on_cut_off(
                         message_sent
                     )
-                    self.conversation.transcript.update_last_bot_message_on_cut_off(
-                        message_sent
-                    )
                 if self.conversation.agent.agent_config.end_conversation_on_goodbye:
                     goodbye_detected_task = (
                         self.conversation.agent.create_goodbye_detection_task(
@@ -583,10 +596,12 @@ async def send_speech_to_output(
         synthesis_result: SynthesisResult,
         stop_event: threading.Event,
         seconds_per_chunk: int,
+        transcript_message: Optional[Message] = None,
         started_event: Optional[threading.Event] = None,
     ):
         """
         - Sends the speech chunk by chunk to the output device
+          - update the transcript message as chunks come in (transcript_message is always provided for non filler audio utterances)
         - If the stop_event is set, the output is stopped
         - Sets started_event when the first chunk is sent
@@ -606,19 +621,20 @@
             self.synthesizer.get_synthesizer_config().sampling_rate,
         )
         chunk_idx = 0
+        seconds_spoken = 0
         async for chunk_result in synthesis_result.chunk_generator:
             start_time = time.time()
             speech_length_seconds = seconds_per_chunk * (
                 len(chunk_result.chunk) / chunk_size
             )
+            seconds_spoken = chunk_idx * seconds_per_chunk
             if stop_event.is_set():
-                seconds = chunk_idx * seconds_per_chunk
                 self.logger.debug(
                     "Interrupted, stopping text to speech after {} chunks".format(
                         chunk_idx
                     )
                 )
-                message_sent = f"{synthesis_result.get_message_up_to(seconds)}-"
+                message_sent = f"{synthesis_result.get_message_up_to(seconds_spoken)}-"
                 cut_off = True
                 break
             if chunk_idx == 0:
@@ -639,9 +655,16 @@
             )
             self.mark_last_action_timestamp()
             chunk_idx += 1
+            seconds_spoken += seconds_per_chunk
+            if transcript_message:
+                transcript_message.text = synthesis_result.get_message_up_to(
+                    seconds_spoken
+                )
         if self.transcriber.get_transcriber_config().mute_during_speech:
             self.logger.debug("Unmuting transcriber")
             self.transcriber.unmute()
+        if transcript_message:
+            transcript_message.text = message_sent
         return message_sent, cut_off
 
     def mark_terminated(self):
diff --git a/vocode/streaming/synthesizer/azure_synthesizer.py b/vocode/streaming/synthesizer/azure_synthesizer.py
index 7b91f2f46..dab63b522 100644
--- a/vocode/streaming/synthesizer/azure_synthesizer.py
+++ b/vocode/streaming/synthesizer/azure_synthesizer.py
@@ -220,6 +220,7 @@ def get_message_up_to(
         for event in events:
             if event["audio_offset"] > seconds:
                 ssml_fragment = ssml[: event["text_offset"]]
+                # TODO: this is a little hacky, but it works for now
                 return ssml_fragment.split(">")[-1]
         return message
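
For reviewers, here is a minimal sketch of the pattern this diff introduces: the bot's transcript `Message` is appended with empty text before speech begins, mutated in place as each chunk is spoken, and only published to the events manager once the final text is known. The `Message`, `Transcript`, `publish`, and `send_speech` names below are simplified stand-ins for illustration, not the vocode classes themselves.

```python
import time
from typing import List, Optional


class Message:
    """Stand-in for vocode's pydantic Message model (mutable by default)."""

    def __init__(self, text: str, sender: str, timestamp: Optional[float] = None):
        self.text = text
        self.sender = sender
        self.timestamp = timestamp if timestamp is not None else time.time()


class Transcript:
    """Stand-in for vocode's Transcript."""

    def __init__(self) -> None:
        self.event_logs: List[Message] = []

    def add_message(self, message: Message, publish_to_events_manager: bool = True) -> None:
        # Attach the message object itself; later mutations are visible to
        # anyone reading event_logs.
        self.event_logs.append(message)
        if publish_to_events_manager:
            self.publish(message)

    def publish(self, message: Message) -> None:
        # Stand-in for maybe_publish_transcript_event_from_message.
        print(f"published {message.sender}: {message.text!r}")


def send_speech(transcript_message: Message, words: List[str], stop_after: Optional[int] = None) -> str:
    # Stand-in for send_speech_to_output: update the transcript message as
    # each "chunk" is spoken, and leave the truncated text in place if
    # speech is interrupted partway through.
    spoken: List[str] = []
    for i, word in enumerate(words):
        if stop_after is not None and i >= stop_after:
            transcript_message.text += "-"  # mark the cut-off, as the diff does
            break
        spoken.append(word)
        transcript_message.text = " ".join(spoken)
    return transcript_message.text


transcript = Transcript()
bot_message = Message(text="", sender="BOT")
# Attach first with publishing deferred (publish_to_events_manager=False in the diff).
transcript.add_message(bot_message, publish_to_events_manager=False)
send_speech(bot_message, ["never", "gonna", "give", "you", "up"], stop_after=3)
# Publish only after the message reflects what was actually spoken.
transcript.publish(bot_message)  # published BOT: 'never gonna give-'
```

The design choice this mirrors: because the message object is attached before speaking and mutated as chunks play out, any consumer of `Transcript.event_logs` sees the words actually spoken so far, and an interruption leaves the truncated text in place, which is why the separate `transcript.update_last_bot_message_on_cut_off` call is removed in the diff.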