diff --git a/playground/streaming/benchmark.py b/playground/streaming/benchmark.py index 555dd7619..2717a2af3 100644 --- a/playground/streaming/benchmark.py +++ b/playground/streaming/benchmark.py @@ -94,7 +94,7 @@ # These synthesizers stream output so they need to be traced within this file. -STREAMING_SYNTHESIZERS = ["azure"] +STREAMING_SYNTHESIZERS = ["azure", "elevenlabs"] TRANSCRIBER_CHOICES = ["deepgram", "assemblyai"] diff --git a/poetry.lock b/poetry.lock index c45ec58c8..414c5b54e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1694,6 +1694,70 @@ files = [ [package.dependencies] traitlets = "*" +[[package]] +name = "miniaudio" +version = "1.59" +description = "python bindings for the miniaudio library and its decoders (mp3, flac, ogg vorbis, wav)" +optional = false +python-versions = "*" +files = [ + {file = "miniaudio-1.59-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:028d7e8e6d99441340c0bfb60660db7bd5789cae7c95fa599d830344901d6d72"}, + {file = "miniaudio-1.59-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:ba6f802376f49977e9698740411db46092ea005894ff86d805aeddde7e505c1e"}, + {file = "miniaudio-1.59-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9f24b44bd28ca631b830bc91bd910cb0209fba005401effa64cee9a8ba580992"}, + {file = "miniaudio-1.59-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8980dda51e92ea750ca8bcfb1d2c198eca7e4c844ab857faac12e20245322aa6"}, + {file = "miniaudio-1.59-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:e21f56d51c627cef612d1a7bbc73fc7c5e03908f5ebc22c98494951ab8ccd3c8"}, + {file = "miniaudio-1.59-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:1f6f1665ce8ed46caac48beec64c70e248a0b4e001590c698695cd22c1f634a0"}, + {file = "miniaudio-1.59-cp310-cp310-win32.whl", hash = "sha256:799b393adce56c8df1df16e7dc692a83125888df9ecf8ec0242c4905df6275ef"}, + {file = "miniaudio-1.59-cp310-cp310-win_amd64.whl", hash = "sha256:44c6b48f01d934784da282f7a17c40be9110ee6bc723f5f90916d2b2e729c9cc"}, + {file = "miniaudio-1.59-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:2a9387e85e6a63d66a873f4208fbaba93179d11423da08dc83c78dd1b68ba504"}, + {file = "miniaudio-1.59-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:12fbbe3934856ab54fa8889ab6ec6b62a97faa2f85a8830e286fe5a4e9584ca4"}, + {file = "miniaudio-1.59-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f4a9a1dc4352af4198f6ca0c20bac8b5b6a89d0d67e3535149ef08420a1ab3c9"}, + {file = "miniaudio-1.59-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4eb9eac3f23fd4c94925830fa9c172e98aebfb12cec1dbfa6a7693b9e4c1e555"}, + {file = "miniaudio-1.59-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:52882c36126c3a97fcdd8d753e4b1b07934bba9d4d3c431aa7f4914849090cac"}, + {file = "miniaudio-1.59-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:d6bb6deee6d5969292d22730d31ee85ed0edc2997ca79978db0cca269ab73761"}, + {file = "miniaudio-1.59-cp311-cp311-win32.whl", hash = "sha256:6afe7449d7d593ba3f8bd91085a392d0c6ca3be4c03b62af37cb46f6c0c0d9f4"}, + {file = "miniaudio-1.59-cp311-cp311-win_amd64.whl", hash = "sha256:a22832e449a31620317ae657f7fd20a42466e5768c48373f8f28c53f2b36f5cd"}, + {file = "miniaudio-1.59-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:fbfb3c853641c8bd835e0654f49fe0c09b4018b1ecc7e4c2436e02fbaf748c4b"}, + {file = "miniaudio-1.59-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:45ce3596c693053a578db725848a90ba0dfcc03d1d94124b9fddaa9e50a7533e"}, + {file = "miniaudio-1.59-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d07228047352b655a82711704a635eee41eb3977ceaaf672ee17d64a3ba261b7"}, + {file = "miniaudio-1.59-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:586f95f1d10f5d4f58b272c15c601c3ba13128bd34a839bce5bd28a839d5cf3c"}, + {file = "miniaudio-1.59-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:7644f19b1dc00bca3ec9e6066eb8879c0e638091955092a1d7085a38d7de6e0f"}, + {file = "miniaudio-1.59-cp37-cp37m-win32.whl", hash = "sha256:055fb3a2e00ddcce2a2809cd2a3d5e68234588a00c70533fa4b68f0178829dce"}, + {file = "miniaudio-1.59-cp37-cp37m-win_amd64.whl", hash = "sha256:e1acea13830a53c026e58d44856ba4951a26eb0d0a2fda2ce6dc7280b6f57f81"}, + {file = "miniaudio-1.59-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:7a8b6f1d4a2551061721cc7b22fb0eb3839aad9137553e01b4dde1a31c91ca45"}, + {file = "miniaudio-1.59-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:011b2b8d5c57a485a6b86476e24c1c6be4a61ec8c33c456e3052929269857d0d"}, + {file = "miniaudio-1.59-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9b270e9e1df5ec05d03659febffaa704709d7c9cb0b3597cb0a993875d73be84"}, + {file = "miniaudio-1.59-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3ab9d0468790109bafc88ce9d1c93454b2d384af0c14a6657620315115390c8d"}, + {file = "miniaudio-1.59-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:b0f0ffa87bef0b3bf932eff60aec97ac90e2adf7373e9170969c6f98ba1c5635"}, + {file = "miniaudio-1.59-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:1a03e514b12ba5d93fe48470ddc6a7f4025cd8de5f604233fc899507e5a054c1"}, + {file = "miniaudio-1.59-cp38-cp38-win32.whl", hash = "sha256:19c6406342989a6774305cd214314028553cbb9fcbc5ed43ab17a7f6b43aa46b"}, + {file = "miniaudio-1.59-cp38-cp38-win_amd64.whl", hash = "sha256:239bd4ff17aa7eede52b30a2153a28fe0bf2891aa43617a10348dd90f20d727e"}, + {file = "miniaudio-1.59-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:88930514e6a9a6258d3536638da5ceb3aaa1d6643122e0704db7fd08ff924368"}, + {file = "miniaudio-1.59-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:6d195800a6162464731ba484580cdab3be5bcc47a366c8440be6015f16d059b1"}, + {file = "miniaudio-1.59-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6fcd4484eea26629eb9602ebcab0181f515881e736deccb235c2c0d99d7b4215"}, + {file = "miniaudio-1.59-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fa0ff716a9eb79799ed623f619ed6b0bc1669046cd1f2070de20d3dc6737a822"}, + {file = "miniaudio-1.59-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:878623ffa77c366c76c4202d3095835943bd8f8f604bbdc1872030d646817e4a"}, + {file = "miniaudio-1.59-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:4aca5d9e9774ddbad5c4a5c65dfb1e7fba9e4570092ba6891f439f7c849d21ed"}, + {file = "miniaudio-1.59-cp39-cp39-win32.whl", hash = "sha256:0e05f09d58ed32e7cdc1bad9342155b81e378ff2101242cf8a8f06cd56dae043"}, + {file = "miniaudio-1.59-cp39-cp39-win_amd64.whl", hash = "sha256:0e51899a148b393e84ee3a201a72c8fa4d19413f5b1b705ff5ae9f8eee956a7a"}, + {file = "miniaudio-1.59-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:07de990f047e693681f15736be2e169e3f53064d1e6a7d0c37837297fa18929c"}, + {file = "miniaudio-1.59-pp37-pypy37_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e03beea7dabeb2be1a0a4718458e0b85e3c23692563b0f8841164d71ef2c2eef"}, + {file = "miniaudio-1.59-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0c2cb1fdc92ace828bceabb602b4d2f52040a361632d71b810144656b8334738"}, + {file = "miniaudio-1.59-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:5597384fa7dda1691adca46568ffaabcf39c0a224bf5b9548665a255b86a4f35"}, + {file = "miniaudio-1.59-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:d253c994a27062788f60a8fb1f8571320b72ddc44fd4128fdd85634cf38717a4"}, + {file = "miniaudio-1.59-pp38-pypy38_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bb71fb5d5491335fc565c157a01a8c6907180e113c3c0d9a6c22e28458be5919"}, + {file = "miniaudio-1.59-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:21569111381b877d5445c917b00dfe038b434cb9dacaa4c548ca8c6b5e6d5b55"}, + {file = "miniaudio-1.59-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:8686712f90189eaa6f9fc3eb503eb3487b09ca52417690fd14ac53306d55a125"}, + {file = "miniaudio-1.59-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:4674c49ee2489595f65a8cdc1c48a3e8bc0f577b1eb957d918a942b88480e5a8"}, + {file = "miniaudio-1.59-pp39-pypy39_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0fa69c55197215dabf85bc8f9f9d7128af08db402f12aea962e16bad1ac4dca8"}, + {file = "miniaudio-1.59-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ce2668b03594612dacea4521483e38da56b51ce8a92d06106643f520ce092a82"}, + {file = "miniaudio-1.59-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:6117c09da1944a59b7af3009d37c8eb6e0219174f5e00501b797687ddbe9c5a3"}, + {file = "miniaudio-1.59.tar.gz", hash = "sha256:2b68f496a8655497cde827c77aba3c3557f0f2bb7c38a6ecab34090ab4556277"}, +] + +[package.dependencies] +cffi = ">=1.12.0" + [[package]] name = "multidict" version = "6.0.4" @@ -3580,4 +3644,4 @@ transcribers = ["google-cloud-speech"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.12" -content-hash = "6e92cb9298f033894b86ab6c1f70fcebcb132948d8b1d111acb2d9e24405f939" +content-hash = "0551e88980b5e22957c5cbe641e6a7897a295a6e3d31b2cc4e0e074a1fa2e231" diff --git a/pyproject.toml b/pyproject.toml index 9f4aea8ee..1cae9f8f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ speechrecognition = "^3.10.0" aiohttp = "^3.8.4" langchain = "^0.0.198" google-cloud-aiplatform = {version = "^1.26.0", optional = true} +miniaudio = "^1.59" [tool.poetry.group.lint.dependencies] diff --git a/tests/synthesizer/conftest.py b/tests/synthesizer/conftest.py index fbd19ba19..cc304a00a 100644 --- a/tests/synthesizer/conftest.py +++ b/tests/synthesizer/conftest.py @@ -9,6 +9,8 @@ ) import re from tests.streaming.data.loader import get_audio_path +import asyncio +import pytest DEFAULT_PARAMS = {"sampling_rate": 16000, "audio_encoding": AudioEncoding.LINEAR16} @@ -38,23 +40,23 @@ def mock_eleven_labs_api(): @pytest.fixture(scope="module") -def eleven_labs_synthesizer_with_api_key(): +async def fixture_eleven_labs_synthesizer_with_api_key(): params = DEFAULT_PARAMS.copy() params["api_key"] = MOCK_API_KEY - yield ElevenLabsSynthesizer(ElevenLabsSynthesizerConfig(**params)) + return ElevenLabsSynthesizer(ElevenLabsSynthesizerConfig(**params)) @pytest.fixture(scope="module") -def eleven_labs_synthesizer_wrong_api_key(): +async def fixture_eleven_labs_synthesizer_wrong_api_key(): params = DEFAULT_PARAMS.copy() params["api_key"] = "wrong_api_key" - yield ElevenLabsSynthesizer(ElevenLabsSynthesizerConfig(**params)) + return ElevenLabsSynthesizer(ElevenLabsSynthesizerConfig(**params)) @pytest.fixture(scope="module") -def eleven_labs_synthesizer_env_api_key(): +async def fixture_eleven_labs_synthesizer_env_api_key(): params = DEFAULT_PARAMS.copy() import os os.environ["ELEVEN_LABS_API_KEY"] = MOCK_API_KEY - yield ElevenLabsSynthesizer(ElevenLabsSynthesizerConfig(**params)) + return ElevenLabsSynthesizer(ElevenLabsSynthesizerConfig(**params)) diff --git a/tests/synthesizer/test_eleven_labs.py b/tests/synthesizer/test_eleven_labs.py index 5ba55ad6b..24304d19b 100644 --- a/tests/synthesizer/test_eleven_labs.py +++ b/tests/synthesizer/test_eleven_labs.py @@ -1,3 +1,4 @@ +import asyncio from pydantic import ValidationError import pytest from vocode.streaming.synthesizer.base_synthesizer import SynthesisResult @@ -9,7 +10,7 @@ from pydub import AudioSegment -async def asset_synthesis_result_valid(synthesizer: ElevenLabsSynthesizer): +async def assert_synthesis_result_valid(synthesizer: ElevenLabsSynthesizer): response = await synthesizer.create_speech(BaseMessage(text="Hello, world!"), 1024) assert isinstance(response, SynthesisResult) assert response.chunk_generator is not None @@ -25,26 +26,26 @@ async def asset_synthesis_result_valid(synthesizer: ElevenLabsSynthesizer): @pytest.mark.asyncio async def test_with_api_key( - eleven_labs_synthesizer_with_api_key: ElevenLabsSynthesizer, + fixture_eleven_labs_synthesizer_with_api_key: ElevenLabsSynthesizer, mock_eleven_labs_api: aioresponses, ): - await asset_synthesis_result_valid(eleven_labs_synthesizer_with_api_key) + await assert_synthesis_result_valid(await fixture_eleven_labs_synthesizer_with_api_key) @pytest.mark.asyncio async def test_with_wrong_api_key( - eleven_labs_synthesizer_wrong_api_key: ElevenLabsSynthesizer, + fixture_eleven_labs_synthesizer_wrong_api_key: ElevenLabsSynthesizer, mock_eleven_labs_api: aioresponses, ): with pytest.raises(Exception, match="ElevenLabs API returned 401 status code"): - await eleven_labs_synthesizer_wrong_api_key.create_speech( + await (await fixture_eleven_labs_synthesizer_wrong_api_key).create_speech( BaseMessage(text="Hello, world!"), 1024 ) @pytest.mark.asyncio async def test_with_env_api_key( - eleven_labs_synthesizer_env_api_key: ElevenLabsSynthesizer, + fixture_eleven_labs_synthesizer_env_api_key: ElevenLabsSynthesizer, mock_eleven_labs_api: aioresponses, ): - await asset_synthesis_result_valid(eleven_labs_synthesizer_env_api_key) + await assert_synthesis_result_valid(await fixture_eleven_labs_synthesizer_env_api_key) diff --git a/vocode/streaming/models/synthesizer.py b/vocode/streaming/models/synthesizer.py index 652905592..13aab3974 100644 --- a/vocode/streaming/models/synthesizer.py +++ b/vocode/streaming/models/synthesizer.py @@ -104,9 +104,10 @@ class ElevenLabsSynthesizerConfig( ): api_key: Optional[str] = None voice_id: Optional[str] = ELEVEN_LABS_ADAM_VOICE_ID + optimize_streaming_latency: Optional[int] + experimental_streaming: Optional[bool] = False stability: Optional[float] similarity_boost: Optional[float] - optimize_streaming_latency: Optional[int] model_id: Optional[str] @validator("voice_id") @@ -135,13 +136,13 @@ def optimize_streaming_latency_check(cls, optimize_streaming_latency): RIME_DEFAULT_SAMPLE_RATE = 22050 RIME_DEFAULT_BASE_URL = "https://rjmopratfrdjgmfmaios.functions.supabase.co/rime-tts" + class RimeSynthesizerConfig(SynthesizerConfig, type=SynthesizerType.RIME.value): speaker: str = RIME_DEFAULT_SPEAKER sampling_rate: int = RIME_DEFAULT_SAMPLE_RATE base_url: str = RIME_DEFAULT_BASE_URL - COQUI_DEFAULT_SPEAKER_ID = "ebe2db86-62a6-49a1-907a-9a1360d4416e" diff --git a/vocode/streaming/synthesizer/eleven_labs_synthesizer.py b/vocode/streaming/synthesizer/eleven_labs_synthesizer.py index 6de8261ee..be7bdc51e 100644 --- a/vocode/streaming/synthesizer/eleven_labs_synthesizer.py +++ b/vocode/streaming/synthesizer/eleven_labs_synthesizer.py @@ -1,8 +1,10 @@ -import io +import asyncio import logging -from typing import Any, Optional +import time +from typing import Any, AsyncGenerator, Optional +import wave import aiohttp -from pydub import AudioSegment +from opentelemetry.trace import Span from vocode import getenv from vocode.streaming.synthesizer.base_synthesizer import ( @@ -16,8 +18,8 @@ ) from vocode.streaming.agent.bot_sentiment_analyser import BotSentiment from vocode.streaming.models.message import BaseMessage - -from opentelemetry.context.context import Context +from vocode.streaming.utils.mp3_helper import decode_mp3 +from vocode.streaming.synthesizer.miniaudio_worker import MiniaudioWorker ADAM_VOICE_ID = "pNInz6obpgDQGcFmaJgB" @@ -44,6 +46,52 @@ def __init__( self.model_id = synthesizer_config.model_id self.optimize_streaming_latency = synthesizer_config.optimize_streaming_latency self.words_per_minute = 150 + self.experimental_streaming = synthesizer_config.experimental_streaming + + # Create a PydubWorker instance as an attribute + if self.experimental_streaming: + self.miniaudio_worker = MiniaudioWorker( + synthesizer_config, asyncio.Queue(), asyncio.Queue() + ) + self.miniaudio_worker.start() + + async def tear_down(self): + await super().tear_down() + if self.experimental_streaming: + self.miniaudio_worker.terminate() + + async def output_generator( + self, + response: aiohttp.ClientResponse, + chunk_size: int, + create_speech_span: Optional[Span], + ) -> AsyncGenerator[SynthesisResult.ChunkResult, None]: + stream_reader = response.content + buffer = bytearray() + + # Create a task to send the mp3 chunks to the PydubWorker's input queue in a separate loop + async def send_chunks(): + async for chunk in stream_reader.iter_any(): + self.miniaudio_worker.consume_nonblocking((chunk, False)) + self.miniaudio_worker.consume_nonblocking((None, True)) + + asyncio.create_task(send_chunks()) + + # Await the output queue of the PydubWorker and yield the wav chunks in another loop + while True: + # Get the wav chunk and the flag from the output queue of the PydubWorker + wav_chunk, is_last = await self.miniaudio_worker.output_queue.get() + + if wav_chunk is not None: + buffer.extend(wav_chunk) + + if len(buffer) >= chunk_size or is_last: + yield SynthesisResult.ChunkResult(buffer, is_last) + buffer.clear() + # If this is the last chunk, break the loop + if is_last and create_speech_span is not None: + create_speech_span.end() + break async def create_speech( self, @@ -57,6 +105,10 @@ async def create_speech( stability=self.stability, similarity_boost=self.similarity_boost ) url = ELEVEN_LABS_BASE_URL + f"text-to-speech/{self.voice_id}" + + if self.experimental_streaming: + url += "/stream" + if self.optimize_streaming_latency: url += f"?optimize_streaming_latency={self.optimize_streaming_latency}" headers = {"xi-api-key": self.api_key} @@ -70,30 +122,34 @@ async def create_speech( create_speech_span = tracer.start_span( f"synthesizer.{SynthesizerType.ELEVEN_LABS.value.split('_', 1)[-1]}.create_total", ) - async with self.aiohttp_session.request( + + session = self.aiohttp_session + + response = await session.request( "POST", url, json=body, headers=headers, timeout=aiohttp.ClientTimeout(total=15), - ) as response: - if not response.ok: - raise Exception( - f"ElevenLabs API returned {response.status} status code" - ) + ) + if not response.ok: + raise Exception(f"ElevenLabs API returned {response.status} status code") + if self.experimental_streaming: + return SynthesisResult( + self.output_generator( + response, chunk_size, create_speech_span + ), # should be wav + lambda seconds: self.get_message_cutoff_from_voice_speed( + message, seconds, self.words_per_minute + ), + ) + else: audio_data = await response.read() create_speech_span.end() convert_span = tracer.start_span( f"synthesizer.{SynthesizerType.ELEVEN_LABS.value.split('_', 1)[-1]}.convert", ) - # TODO: probably needs to be in a thread - audio_segment: AudioSegment = AudioSegment.from_mp3( - io.BytesIO(audio_data) # type: ignore - ) - - output_bytes_io = io.BytesIO() - - audio_segment.export(output_bytes_io, format="wav") # type: ignore + output_bytes_io = decode_mp3(audio_data) result = self.create_synthesis_result_from_wav( file=output_bytes_io, diff --git a/vocode/streaming/synthesizer/miniaudio_worker.py b/vocode/streaming/synthesizer/miniaudio_worker.py new file mode 100644 index 000000000..85e254a09 --- /dev/null +++ b/vocode/streaming/synthesizer/miniaudio_worker.py @@ -0,0 +1,56 @@ +from __future__ import annotations +import queue + +from typing import Optional, Tuple +import asyncio +import miniaudio + +from vocode.streaming.models.synthesizer import SynthesizerConfig +from vocode.streaming.utils import convert_wav +from vocode.streaming.utils.mp3_helper import decode_mp3 +from vocode.streaming.utils.worker import ThreadAsyncWorker, logger + +QueueType = Tuple[Optional[bytes], bool] + + +class MiniaudioWorker(ThreadAsyncWorker[QueueType]): + def __init__( + self, + synthesizer_config: SynthesizerConfig, + input_queue: asyncio.Queue[QueueType], + output_queue: asyncio.Queue[QueueType], + ) -> None: + super().__init__(input_queue, output_queue) + self.synthesizer_config = synthesizer_config + self._ended = False + + def _run_loop(self): + while not self._ended: + # Get a tuple of (mp3_chunk, is_last) from the input queue + try: + mp3_chunk, is_last = self.input_janus_queue.sync_q.get(timeout=1) + except queue.Empty: + continue + if mp3_chunk is None: + self.output_janus_queue.sync_q.put((None, True)) + continue + try: + output_bytes_io = decode_mp3( + mp3_chunk, + ) + except miniaudio.DecodeError as e: + # How should I log this + logger.exception("MiniaudioWorker error: " + str(e), exc_info=True) + self.output_janus_queue.sync_q.put((None, True)) + continue + output_bytes_io = convert_wav( + output_bytes_io, + output_sample_rate=self.synthesizer_config.sampling_rate, + output_encoding=self.synthesizer_config.audio_encoding, + ) + # Put a tuple of (wav_chunk, is_last) in the output queue + self.output_janus_queue.sync_q.put((output_bytes_io, is_last)) + + def terminate(self): + self._ended = True + super().terminate() diff --git a/vocode/streaming/utils/mp3_helper.py b/vocode/streaming/utils/mp3_helper.py new file mode 100644 index 000000000..b0a2af915 --- /dev/null +++ b/vocode/streaming/utils/mp3_helper.py @@ -0,0 +1,20 @@ +import io +import wave +import miniaudio + + +# sampling_rate is the rate of the input, not expected output +def decode_mp3(mp3_bytes: bytes) -> io.BytesIO: + # Convert it to a wav chunk using miniaudio + wav_chunk = miniaudio.decode(mp3_bytes, nchannels=1) + + # Write wav_chunks.samples to io.BytesIO with builtin WAVE + output_bytes_io = io.BytesIO() + + with wave.open(output_bytes_io, "wb") as wave_obj: + wave_obj.setnchannels(1) + wave_obj.setsampwidth(2) + wave_obj.setframerate(44100) + wave_obj.writeframes(wav_chunk.samples) + output_bytes_io.seek(0) + return output_bytes_io diff --git a/vocode/streaming/utils/worker.py b/vocode/streaming/utils/worker.py index d9ee5b749..d01599407 100644 --- a/vocode/streaming/utils/worker.py +++ b/vocode/streaming/utils/worker.py @@ -7,6 +7,7 @@ from typing import TypeVar, Generic import logging + logger = logging.getLogger(__name__) WorkerInputType = TypeVar("WorkerInputType") @@ -42,15 +43,15 @@ def terminate(self): return False -class ThreadAsyncWorker(AsyncWorker): +class ThreadAsyncWorker(AsyncWorker[WorkerInputType]): def __init__( self, - input_queue: asyncio.Queue, + input_queue: asyncio.Queue[WorkerInputType], output_queue: asyncio.Queue = asyncio.Queue(), ) -> None: super().__init__(input_queue, output_queue) self.worker_thread: Optional[threading.Thread] = None - self.input_janus_queue: janus.Queue = janus.Queue() + self.input_janus_queue: janus.Queue[WorkerInputType] = janus.Queue() self.output_janus_queue: janus.Queue = janus.Queue() def start(self) -> asyncio.Task: