From dad4d884704901c8a9d3cfe97fcf0c9a9a11ecd0 Mon Sep 17 00:00:00 2001 From: Timothy Luong Date: Wed, 4 Sep 2024 17:21:00 -0700 Subject: [PATCH] Upgrading the message cutoff for Cartesia Synthesizer to use timestamps --- .../synthesizer/cartesia_synthesizer.py | 49 +++++++++++++++---- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/vocode/streaming/synthesizer/cartesia_synthesizer.py b/vocode/streaming/synthesizer/cartesia_synthesizer.py index 959ca5ba5..bb7cfa875 100644 --- a/vocode/streaming/synthesizer/cartesia_synthesizer.py +++ b/vocode/streaming/synthesizer/cartesia_synthesizer.py @@ -90,6 +90,8 @@ def __init__( self.client = self.cartesia_tts(api_key=self.api_key) self.ws = None self.ctx = None + self.ctx_message = BaseMessage(text="") + self.ctx_timestamps = [] self.no_more_inputs_task = None self.no_more_inputs_lock = asyncio.Lock() @@ -99,10 +101,14 @@ async def initialize_ws(self): async def initialize_ctx(self, is_first_text_chunk: bool): if self.ctx is None or self.ctx.is_closed(): + self.ctx_message = BaseMessage(text="") + self.ctx_timestamps = [] if self.ws: self.ctx = self.ws.context() else: if is_first_text_chunk: + self.ctx_message = BaseMessage(text="") + self.ctx_timestamps = [] if self.no_more_inputs_task: self.no_more_inputs_task.cancel() await self.ctx.no_more_inputs() @@ -144,6 +150,7 @@ async def create_speech_uncached( voice_id=self.voice_id, continue_=not is_sole_text_chunk, output_format=self.output_format, + add_timestamps=True, _experimental_voice_controls=self._experimental_voice_controls, ) if not is_sole_text_chunk: @@ -159,12 +166,20 @@ async def chunk_generator(context): try: async for event in context.receive(): audio = event.get("audio") - buffer.extend(audio) - while len(buffer) >= chunk_size: - yield SynthesisResult.ChunkResult( - chunk=buffer[:chunk_size], is_last_chunk=False - ) - buffer = buffer[chunk_size:] + word_timestamps = event.get("word_timestamps") + if word_timestamps: + words = word_timestamps['words'] + start_times = word_timestamps['start'] + end_times = word_timestamps['end'] + for word, start, end in zip(words, start_times, end_times): + self.ctx_timestamps.append((word, start, end)) + if audio: + buffer.extend(audio) + while len(buffer) >= chunk_size: + yield SynthesisResult.ChunkResult( + chunk=buffer[:chunk_size], is_last_chunk=False + ) + buffer = buffer[chunk_size:] except Exception as e: logger.info( f"Caught error while receiving audio chunks from CartesiaSynthesizer: {e}" @@ -180,11 +195,27 @@ async def chunk_generator(context): buffer.extend(b"\x00\x00" * padding_size) # 0 is silence in s16le yield SynthesisResult.ChunkResult(chunk=buffer, is_last_chunk=True) + self.ctx_message.text += transcript + + def get_message_cutoff_ctx(message, seconds, words_per_minute=150): + if seconds: + closest_index = 0 + if len(self.ctx_timestamps) > 0: + for index, word_timestamp in enumerate(self.ctx_timestamps): + _word, start, end = word_timestamp + closest_index = index + if end >= seconds: + break + if closest_index: + # Check if they're less than 2 seconds apart, fall back to words per minute otherwise + if self.ctx_timestamps[closest_index][2] - seconds < 2: + return " ".join([word for word, *_ in self.ctx_timestamps[:closest_index + 1]]) + return self.get_message_cutoff_from_voice_speed(message, seconds, words_per_minute) + + return SynthesisResult( chunk_generator=chunk_generator(self.ctx), - get_message_up_to=lambda seconds: self.get_message_cutoff_from_voice_speed( - message, seconds - ), + get_message_up_to=lambda seconds: get_message_cutoff_ctx(self.ctx_message, seconds), ) @classmethod