From d302cbb114d035757ffc7cc5ac04900756e592d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 25 Jun 2024 21:32:21 -0700 Subject: [PATCH 1/4] services(deepgram): allow transcriptions during interruptions If the user interrupts we can't just discard transcriptions because the user is actually interrupting and talking. --- src/pipecat/services/deepgram.py | 33 +++++++++----------------------- 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index e32e4c870..74b40cb90 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -19,7 +19,6 @@ InterimTranscriptionFrame, StartFrame, StartInterruptionFrame, - StopInterruptionFrame, SystemFrame, TranscriptionFrame) from pipecat.processors.frame_processor import FrameDirection @@ -118,16 +117,12 @@ def __init__(self, self._connection = self._client.listen.asynclive.v("1") self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message) - # This event will be used to ignore out-of-band transcriptions while we - # are itnerrupted. - self._is_interrupted_event = asyncio.Event() - self._create_push_task() async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame): + if isinstance(frame, StartInterruptionFrame): await self._handle_interruptions(frame) elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) @@ -153,21 +148,14 @@ async def cancel(self, frame: CancelFrame): await self._push_frame_task async def _handle_interruptions(self, frame: Frame): - if isinstance(frame, StartInterruptionFrame): - # Indicate we are interrupted, we should ignore any out-of-band - # transcriptions. - self._is_interrupted_event.set() - # Cancel the task. This will stop pushing frames downstream. - self._push_frame_task.cancel() - await self._push_frame_task - # Push an out-of-band frame (i.e. not using the ordered push - # frame task). - await self.push_frame(frame) - # Create a new queue and task. - self._create_push_task() - elif isinstance(frame, StopInterruptionFrame): - # We should now be able to receive transcriptions again. - self._is_interrupted_event.clear() + # Cancel the task. This will stop pushing frames downstream. + self._push_frame_task.cancel() + await self._push_frame_task + # Push an out-of-band frame (i.e. not using the ordered push + # frame task). + await self.push_frame(frame) + # Create a new queue and task. + self._create_push_task() def _create_push_task(self): self._push_queue = asyncio.Queue() @@ -184,9 +172,6 @@ async def _push_frame_task_handler(self): break async def _on_message(self, *args, **kwargs): - if self._is_interrupted_event.is_set(): - return - result = kwargs["result"] is_final = result.is_final transcript = result.channel.alternatives[0].transcript From dac033fe61628ff05e9c057b23d24e5e4476e128 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 25 Jun 2024 21:33:06 -0700 Subject: [PATCH 2/4] services(azure): allow transcriptions during interruptions If the user interrupts we can't just discard transcriptions because the user is actually interrupting and talking. --- src/pipecat/services/azure.py | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index e8e2acd34..f7fdbf16b 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -20,7 +20,6 @@ Frame, StartFrame, StartInterruptionFrame, - StopInterruptionFrame, SystemFrame, TranscriptionFrame, URLImageRawFrame) @@ -143,7 +142,7 @@ def __init__( async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame): + if isinstance(frame, StartInterruptionFrame): await self._handle_interruptions(frame) elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) @@ -166,21 +165,14 @@ async def cancel(self, frame: CancelFrame): await self._push_frame_task async def _handle_interruptions(self, frame: Frame): - if isinstance(frame, StartInterruptionFrame): - # Indicate we are interrupted, we should ignore any out-of-band - # transcriptions. - self._is_interrupted_event.set() - # Cancel the task. This will stop pushing frames downstream. - self._push_frame_task.cancel() - await self._push_frame_task - # Push an out-of-band frame (i.e. not using the ordered push - # frame task). - await self.push_frame(frame) - # Create a new queue and task. - self._create_push_task() - elif isinstance(frame, StopInterruptionFrame): - # We should now be able to receive transcriptions again. - self._is_interrupted_event.clear() + # Cancel the task. This will stop pushing frames downstream. + self._push_frame_task.cancel() + await self._push_frame_task + # Push an out-of-band frame (i.e. not using the ordered push + # frame task). + await self.push_frame(frame) + # Create a new queue and task. + self._create_push_task() def _create_push_task(self): self._push_queue = asyncio.Queue() @@ -197,9 +189,6 @@ async def _push_frame_task_handler(self): break def _on_handle_recognized(self, event): - if self._is_interrupted_event.is_set(): - return - if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0: direction = FrameDirection.DOWNSTREAM frame = TranscriptionFrame(event.result.text, "", int(time.time_ns() / 1000000)) From 4be3e8c87d04ae011476d480124f7f2106be66c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 25 Jun 2024 21:33:17 -0700 Subject: [PATCH 3/4] aggregators: revert using intermediate results --- src/pipecat/processors/aggregators/llm_response.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 757076f2c..4c60949bd 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -13,6 +13,7 @@ Frame, InterimTranscriptionFrame, LLMFullResponseEndFrame, + LLMFullResponseStartFrame, LLMResponseEndFrame, LLMResponseStartFrame, LLMMessagesFrame, @@ -151,8 +152,8 @@ def __init__(self, messages: List[dict] = []): super().__init__( messages=messages, role="assistant", - start_frame=LLMResponseStartFrame, - end_frame=LLMResponseEndFrame, + start_frame=LLMFullResponseStartFrame, + end_frame=LLMFullResponseEndFrame, accumulator_frame=TextFrame, handle_interruptions=True ) From 66e331248d3720252b8fec8869b301bf4c297806 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 25 Jun 2024 21:36:01 -0700 Subject: [PATCH 4/4] update CHANGELOG for 0.0.34 --- CHANGELOG.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 014d1c2da..032e7d59b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,16 @@ All notable changes to **pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.0.34] - 2024-06-25 + +### Fixed + +- Fixed an issue with asynchronous STT services (Deepgram and Azure) that could + interruptions to ignore transcriptions. + +- Fixed an issue introduced in 0.0.33 that would cause the LLM to generate + shorter output. + ## [0.0.33] - 2024-06-25 ### Changed