From 24e58b7733fc461f108341a739a005f224404d55 Mon Sep 17 00:00:00 2001
From: Ajay Raj
Date: Fri, 4 Aug 2023 13:41:15 -0700
Subject: [PATCH 1/7] pass in transcript message to send_speech_to_output

---
 vocode/streaming/models/transcript.py      | 48 ++++++++++++++++------
 vocode/streaming/streaming_conversation.py | 41 +++++++++++++-----
 2 files changed, 65 insertions(+), 24 deletions(-)

diff --git a/vocode/streaming/models/transcript.py b/vocode/streaming/models/transcript.py
index 18b044e6b..25bc28173 100644
--- a/vocode/streaming/models/transcript.py
+++ b/vocode/streaming/models/transcript.py
@@ -10,7 +10,7 @@ class EventLog(BaseModel):
     sender: Sender
-    timestamp: float
+    timestamp: float = Field(default_factory=time.time)
 
     def to_string(self, include_timestamp: bool = False) -> str:
         raise NotImplementedError
 
 
@@ -68,33 +68,55 @@ def to_string(self, include_timestamps: bool = False) -> str:
             for event in self.event_logs
         )
 
-    def add_message(
-        self,
-        text: str,
-        sender: Sender,
-        conversation_id: str,
+    def maybe_publish_transcript_event_from_message(
+        self, message: Message, conversation_id: str
     ):
-        timestamp = time.time()
-        self.event_logs.append(Message(text=text, sender=sender, timestamp=timestamp))
         if self.events_manager is not None:
             self.events_manager.publish_event(
                 TranscriptEvent(
-                    text=text,
-                    sender=sender,
-                    timestamp=time.time(),
+                    text=message.text,
+                    sender=message.sender,
+                    timestamp=message.timestamp,
                     conversation_id=conversation_id,
                 )
             )
 
+    def add_message_from_props(
+        self,
+        text: str,
+        sender: Sender,
+        conversation_id: str,
+        publish_to_events_manager: bool = True,
+    ):
+        timestamp = time.time()
+        message = Message(text=text, sender=sender, timestamp=timestamp)
+        self.event_logs.append(message)
+        if publish_to_events_manager:
+            self.maybe_publish_transcript_event_from_message(
+                message=message, conversation_id=conversation_id
+            )
+
+    def add_message(
+        self,
+        message: Message,
+        conversation_id: str,
+        publish_to_events_manager: bool = True,
+    ):
+        self.event_logs.append(message)
+        if publish_to_events_manager:
+            self.maybe_publish_transcript_event_from_message(
+                message=message, conversation_id=conversation_id
+            )
+
     def add_human_message(self, text: str, conversation_id: str):
-        self.add_message(
+        self.add_message_from_props(
             text=text,
             sender=Sender.HUMAN,
             conversation_id=conversation_id,
         )
 
     def add_bot_message(self, text: str, conversation_id: str):
-        self.add_message(
+        self.add_message_from_props(
             text=text,
             sender=Sender.BOT,
             conversation_id=conversation_id,
diff --git a/vocode/streaming/streaming_conversation.py b/vocode/streaming/streaming_conversation.py
index 9d9ca842a..7ef254046 100644
--- a/vocode/streaming/streaming_conversation.py
+++ b/vocode/streaming/streaming_conversation.py
@@ -16,7 +16,12 @@
 )
 from vocode.streaming.agent.chat_gpt_agent import ChatGPTAgent
 from vocode.streaming.models.actions import ActionInput
-from vocode.streaming.models.transcript import Transcript, TranscriptCompleteEvent
+from vocode.streaming.models.events import Sender
+from vocode.streaming.models.transcript import (
+    Message,
+    Transcript,
+    TranscriptCompleteEvent,
+)
 from vocode.streaming.models.message import BaseMessage
 from vocode.streaming.models.transcriber import EndpointingConfig, TranscriberConfig
 from vocode.streaming.output_device.base_output_device import BaseOutputDevice
@@ -270,11 +275,11 @@ async def process(self, item: InterruptibleAgentResponseEvent[AgentResponse]):
                     item.agent_response_tracker.set()
                     await self.conversation.terminate()
                     return
-                if isinstance(agent_response, AgentResponseMessage):
-                    self.conversation.transcript.add_bot_message(
-                        text=agent_response.message.text,
-                        conversation_id=self.conversation.id,
-                    )
+                # if isinstance(agent_response, AgentResponseMessage):
+                #     self.conversation.transcript.add_bot_message(
+                #         text=agent_response.message.text,
+                #         conversation_id=self.conversation.id,
+                #     )
 
                 agent_response_message = typing.cast(
                     AgentResponseMessage, agent_response
@@ -320,11 +325,21 @@ async def process(
         ):
             try:
                 message, synthesis_result = item.payload
+                empty_transcript_message = Message(
+                    text="",
+                    sender=Sender.BOT,
+                )
+                self.conversation.transcript.add_message(
+                    message=empty_transcript_message,
+                    conversation_id=self.conversation.id,
+                    publish_to_events_manager=False,
+                )
                 message_sent, cut_off = await self.conversation.send_speech_to_output(
                     message.text,
                     synthesis_result,
                     item.interruption_event,
                     TEXT_TO_SPEECH_CHUNK_SIZE_SECONDS,
+                    transcript_message=empty_transcript_message,
                 )
                 item.agent_response_tracker.set()
                 self.conversation.logger.debug("Message sent: {}".format(message_sent))
@@ -332,9 +347,6 @@ async def process(
                     self.conversation.agent.update_last_bot_message_on_cut_off(
                         message_sent
                     )
-                    self.conversation.transcript.update_last_bot_message_on_cut_off(
-                        message_sent
-                    )
                 if self.conversation.agent.agent_config.end_conversation_on_goodbye:
                     goodbye_detected_task = (
                         self.conversation.agent.create_goodbye_detection_task(
@@ -583,6 +595,7 @@ async def send_speech_to_output(
         synthesis_result: SynthesisResult,
         stop_event: threading.Event,
         seconds_per_chunk: int,
+        transcript_message: Optional[Message] = None,
         started_event: Optional[threading.Event] = None,
     ):
         """
@@ -606,25 +619,30 @@ async def send_speech_to_output(
             self.synthesizer.get_synthesizer_config().sampling_rate,
         )
         chunk_idx = 0
+        seconds_spoken = 0
         async for chunk_result in synthesis_result.chunk_generator:
             start_time = time.time()
             speech_length_seconds = seconds_per_chunk * (
                 len(chunk_result.chunk) / chunk_size
             )
+            seconds_spoken = chunk_idx * seconds_per_chunk
             if stop_event.is_set():
-                seconds = chunk_idx * seconds_per_chunk
                 self.logger.debug(
                     "Interrupted, stopping text to speech after {} chunks".format(
                         chunk_idx
                     )
                 )
-                message_sent = f"{synthesis_result.get_message_up_to(seconds)}-"
+                message_sent = f"{synthesis_result.get_message_up_to(seconds_spoken)}-"
                 cut_off = True
                 break
             if chunk_idx == 0:
                 if started_event:
                     started_event.set()
             self.output_device.consume_nonblocking(chunk_result.chunk)
+            if transcript_message:
+                transcript_message.text = synthesis_result.get_message_up_to(
+                    seconds_spoken
+                )
             end_time = time.time()
             await asyncio.sleep(
                 max(
@@ -639,6 +657,7 @@ async def send_speech_to_output(
             )
             self.mark_last_action_timestamp()
             chunk_idx += 1
+            seconds_spoken += seconds_per_chunk
         if self.transcriber.get_transcriber_config().mute_during_speech:
             self.logger.debug("Unmuting transcriber")
             self.transcriber.unmute()
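
NOTE (illustrative, not part of the patch series): patch 1 splits the old Transcript.add_message into add_message_from_props (build the Message from text and sender) and add_message (append an existing Message object), with a publish_to_events_manager flag so the TranscriptEvent can be published later. A minimal sketch of that flow, using plain dataclasses as stand-ins for the pydantic models and the events manager in vocode/streaming/models/transcript.py:

    import time
    from dataclasses import dataclass, field
    from enum import Enum
    from typing import List, Optional


    class Sender(str, Enum):
        HUMAN = "human"
        BOT = "bot"


    @dataclass
    class Message:
        text: str
        sender: Sender
        timestamp: float = field(default_factory=time.time)


    @dataclass
    class Transcript:
        event_logs: List[Message] = field(default_factory=list)
        events_manager: Optional[object] = None  # stand-in for the real EventsManager

        def maybe_publish_transcript_event_from_message(
            self, message: Message, conversation_id: str
        ):
            if self.events_manager is not None:
                # the real code wraps this in a TranscriptEvent and publishes it
                print(f"publish {conversation_id}: [{message.sender.value}] {message.text!r}")

        def add_message(
            self,
            message: Message,
            conversation_id: str,
            publish_to_events_manager: bool = True,
        ):
            # the Message object is stored as-is, so later edits to message.text
            # show up in the transcript without further bookkeeping
            self.event_logs.append(message)
            if publish_to_events_manager:
                self.maybe_publish_transcript_event_from_message(message, conversation_id)


    # usage mirroring the streaming_conversation.py hunks above: append an empty
    # bot message without publishing, fill it in while speech is sent, publish last
    transcript = Transcript(events_manager=object())
    bot_message = Message(text="", sender=Sender.BOT)
    transcript.add_message(bot_message, "conv-1", publish_to_events_manager=False)
    bot_message.text = "Hello, how can I help"  # updated as chunks are spoken
    transcript.maybe_publish_transcript_event_from_message(bot_message, "conv-1")
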
From c89a9208521142ca31e0c3cbc27995db083205cb Mon Sep 17 00:00:00 2001
From: Ajay Raj
Date: Fri, 4 Aug 2023 13:43:50 -0700
Subject: [PATCH 2/7] publish to events manager

---
 vocode/streaming/streaming_conversation.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/vocode/streaming/streaming_conversation.py b/vocode/streaming/streaming_conversation.py
index 7ef254046..81e0a3778 100644
--- a/vocode/streaming/streaming_conversation.py
+++ b/vocode/streaming/streaming_conversation.py
@@ -325,12 +325,12 @@ async def process(
         ):
             try:
                 message, synthesis_result = item.payload
-                empty_transcript_message = Message(
+                transcript_message = Message(
                     text="",
                     sender=Sender.BOT,
                 )
                 self.conversation.transcript.add_message(
-                    message=empty_transcript_message,
+                    message=transcript_message,
                     conversation_id=self.conversation.id,
                     publish_to_events_manager=False,
                 )
@@ -339,7 +339,11 @@ async def process(
                     synthesis_result,
                     item.interruption_event,
                     TEXT_TO_SPEECH_CHUNK_SIZE_SECONDS,
-                    transcript_message=empty_transcript_message,
+                    transcript_message=transcript_message,
+                )
+                self.conversation.transcript.maybe_publish_transcript_event_from_message(
+                    message=transcript_message,
+                    conversation_id=self.conversation.id,
                 )
                 item.agent_response_tracker.set()
                 self.conversation.logger.debug("Message sent: {}".format(message_sent))

From b7e6fdacb52a280041c9f26b9c8334bf86f5eb3c Mon Sep 17 00:00:00 2001
From: Ajay Raj
Date: Fri, 4 Aug 2023 13:44:14 -0700
Subject: [PATCH 3/7] remove commented code

---
 vocode/streaming/streaming_conversation.py | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/vocode/streaming/streaming_conversation.py b/vocode/streaming/streaming_conversation.py
index 81e0a3778..ef501f289 100644
--- a/vocode/streaming/streaming_conversation.py
+++ b/vocode/streaming/streaming_conversation.py
@@ -275,11 +275,6 @@ async def process(self, item: InterruptibleAgentResponseEvent[AgentResponse]):
                     item.agent_response_tracker.set()
                     await self.conversation.terminate()
                     return
-                # if isinstance(agent_response, AgentResponseMessage):
-                #     self.conversation.transcript.add_bot_message(
-                #         text=agent_response.message.text,
-                #         conversation_id=self.conversation.id,
-                #     )
 
                 agent_response_message = typing.cast(
                     AgentResponseMessage, agent_response

From 5d96b12fe61c32fb48a557cb46ff713a9af1da9e Mon Sep 17 00:00:00 2001
From: Ajay Raj
Date: Fri, 4 Aug 2023 14:42:56 -0700
Subject: [PATCH 4/7] move transcript add to later in send_speech_to_output

---
 vocode/streaming/streaming_conversation.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/vocode/streaming/streaming_conversation.py b/vocode/streaming/streaming_conversation.py
index ef501f289..10ee7ae5a 100644
--- a/vocode/streaming/streaming_conversation.py
+++ b/vocode/streaming/streaming_conversation.py
@@ -638,10 +638,6 @@ async def send_speech_to_output(
                 if started_event:
                     started_event.set()
             self.output_device.consume_nonblocking(chunk_result.chunk)
-            if transcript_message:
-                transcript_message.text = synthesis_result.get_message_up_to(
-                    seconds_spoken
-                )
             end_time = time.time()
             await asyncio.sleep(
                 max(
@@ -657,6 +653,10 @@ async def send_speech_to_output(
             self.mark_last_action_timestamp()
             chunk_idx += 1
             seconds_spoken += seconds_per_chunk
+            if transcript_message:
+                transcript_message.text = synthesis_result.get_message_up_to(
+                    seconds_spoken
+                )
         if self.transcriber.get_transcriber_config().mute_during_speech:
             self.logger.debug("Unmuting transcriber")
             self.transcriber.unmute()
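
NOTE (illustrative, not part of the patch series): after patches 1, 4 and 5, send_speech_to_output keeps the passed-in transcript message up to date while audio streams out. A simplified, self-contained version of that chunk loop; the SynthesisResult, output device and stop event of the real method are replaced here by plain callables:

    from dataclasses import dataclass
    from typing import Callable, Iterable, Tuple


    @dataclass
    class TranscriptMessage:
        """Stand-in for the Message model whose .text the loop keeps updating."""
        text: str = ""


    def send_speech(
        message: str,
        chunks: Iterable[bytes],
        seconds_per_chunk: float,
        get_message_up_to: Callable[[float], str],  # stand-in for SynthesisResult.get_message_up_to
        play_chunk: Callable[[bytes], None],        # stand-in for output_device.consume_nonblocking
        is_interrupted: Callable[[], bool],         # stand-in for stop_event.is_set
        transcript_message: TranscriptMessage,
    ) -> Tuple[str, bool]:
        seconds_spoken = 0.0
        message_sent, cut_off = message, False
        for chunk_idx, chunk in enumerate(chunks):
            seconds_spoken = chunk_idx * seconds_per_chunk
            if is_interrupted():
                # patch 1: report only what was actually spoken when the bot is cut off
                message_sent = f"{get_message_up_to(seconds_spoken)}-"
                cut_off = True
                break
            play_chunk(chunk)
            seconds_spoken += seconds_per_chunk
            # patch 4: refresh the transcript text only after the chunk has gone out
            transcript_message.text = get_message_up_to(seconds_spoken)
        # patch 5: whatever was ultimately spoken becomes the final transcript text
        transcript_message.text = message_sent
        return message_sent, cut_off
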
From ded137f3da6b1d97dd74836fa959fc1ea36f0e5a Mon Sep 17 00:00:00 2001
From: Ajay Raj
Date: Fri, 4 Aug 2023 14:52:49 -0700
Subject: [PATCH 5/7] one more fix

---
 vocode/streaming/streaming_conversation.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/vocode/streaming/streaming_conversation.py b/vocode/streaming/streaming_conversation.py
index 10ee7ae5a..f44b3fdcf 100644
--- a/vocode/streaming/streaming_conversation.py
+++ b/vocode/streaming/streaming_conversation.py
@@ -660,6 +660,8 @@ async def send_speech_to_output(
         if self.transcriber.get_transcriber_config().mute_during_speech:
             self.logger.debug("Unmuting transcriber")
             self.transcriber.unmute()
+        if transcript_message:
+            transcript_message.text = message_sent
         return message_sent, cut_off
 
     def mark_terminated(self):

From e6bda3ffc3558066d0c5efaed9567043f1e263e6 Mon Sep 17 00:00:00 2001
From: Ajay Raj
Date: Fri, 4 Aug 2023 15:44:03 -0700
Subject: [PATCH 6/7] adds note on azure synthesizer

---
 vocode/streaming/synthesizer/azure_synthesizer.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/vocode/streaming/synthesizer/azure_synthesizer.py b/vocode/streaming/synthesizer/azure_synthesizer.py
index 7b91f2f46..dab63b522 100644
--- a/vocode/streaming/synthesizer/azure_synthesizer.py
+++ b/vocode/streaming/synthesizer/azure_synthesizer.py
@@ -220,6 +220,7 @@ def get_message_up_to(
         for event in events:
             if event["audio_offset"] > seconds:
                 ssml_fragment = ssml[: event["text_offset"]]
+                # TODO: this is a little hacky, but it works for now
                 return ssml_fragment.split(">")[-1]
         return message
 

From 1527f5e7d6e7aa320a5c20270a42603a0a93d3c6 Mon Sep 17 00:00:00 2001
From: Ajay Raj
Date: Fri, 4 Aug 2023 15:58:13 -0700
Subject: [PATCH 7/7] adds comments

---
 vocode/streaming/streaming_conversation.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/vocode/streaming/streaming_conversation.py b/vocode/streaming/streaming_conversation.py
index f44b3fdcf..4f37976d0 100644
--- a/vocode/streaming/streaming_conversation.py
+++ b/vocode/streaming/streaming_conversation.py
@@ -320,6 +320,7 @@ async def process(
         ):
             try:
                 message, synthesis_result = item.payload
+                # create an empty transcript message and attach it to the transcript
                 transcript_message = Message(
                     text="",
                     sender=Sender.BOT,
@@ -336,6 +337,7 @@ async def process(
                     TEXT_TO_SPEECH_CHUNK_SIZE_SECONDS,
                     transcript_message=transcript_message,
                 )
+                # publish the transcript message now that it includes what was said during send_speech_to_output
                 self.conversation.transcript.maybe_publish_transcript_event_from_message(
                     message=transcript_message,
                     conversation_id=self.conversation.id,
@@ -599,6 +601,7 @@ async def send_speech_to_output(
     ):
         """
         - Sends the speech chunk by chunk to the output device
+        - update the transcript message as chunks come in (transcript_message is always provided for non filler audio utterances)
         - If the stop_event is set, the output is stopped
         - Sets started_event when the first chunk is sent
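
NOTE (illustrative, not part of the patch series): the line flagged as hacky in patch 6 leans on Azure word-boundary events, which carry an audio offset and a character offset into the SSML that was synthesized. A standalone sketch of that lookup; the event fields and their units are an assumption based only on the keys and comparison used in the diff:

    from typing import List, TypedDict


    class WordBoundaryEvent(TypedDict):
        audio_offset: float  # presumably seconds into the synthesized audio
        text_offset: int     # character offset into the SSML string


    def get_message_up_to(
        message: str, ssml: str, seconds: float, events: List[WordBoundaryEvent]
    ) -> str:
        for event in events:
            if event["audio_offset"] > seconds:
                ssml_fragment = ssml[: event["text_offset"]]
                # the "hacky" part: everything after the last '>' in the fragment is
                # the plain text spoken so far, since the cut lands inside the tag body
                return ssml_fragment.split(">")[-1]
        return message


    ssml = '<speak><voice name="x"><prosody rate="1.0">hello world how are you</prosody></voice></speak>'
    events = [
        {"audio_offset": 0.2, "text_offset": ssml.index("hello")},
        {"audio_offset": 0.6, "text_offset": ssml.index("world")},
        {"audio_offset": 1.1, "text_offset": ssml.index("how")},
    ]
    print(get_message_up_to("hello world how are you", ssml, 0.7, events))  # -> 'hello world '
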