Skip to content

Commit

Permalink
[EPD-333] ElevenLabs Threaded MP3 Streaming (#316)
Browse files Browse the repository at this point in the history
* initial work, still blocking

* Add threaded mp3 worker

* fix tests add todo

* use miniaudio worker and fix sync issues

* attempt to handle error from short chunks?

* make streaming optional & refac miniaudio decoding

* teardown the experimental worker

* fix tests and mypy

* resolve comments

* use sentinel to fix /stream endpoint

* potentially fix typing

* Revert "potentially fix typing"

This reverts commit fdd37d7.

* forgot about __future__

* fix termination code

---------

Co-authored-by: Ajay Raj <[email protected]>
  • Loading branch information
zaptrem and ajar98 authored Aug 2, 2023
1 parent fa24e27 commit d078d19
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 39 deletions.
2 changes: 1 addition & 1 deletion playground/streaming/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@


# These synthesizers stream output so they need to be traced within this file.
STREAMING_SYNTHESIZERS = ["azure"]
STREAMING_SYNTHESIZERS = ["azure", "elevenlabs"]


TRANSCRIBER_CHOICES = ["deepgram", "assemblyai"]
Expand Down
66 changes: 65 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ speechrecognition = "^3.10.0"
aiohttp = "^3.8.4"
langchain = "^0.0.198"
google-cloud-aiplatform = {version = "^1.26.0", optional = true}
miniaudio = "^1.59"


[tool.poetry.group.lint.dependencies]
Expand Down
14 changes: 8 additions & 6 deletions tests/synthesizer/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
)
import re
from tests.streaming.data.loader import get_audio_path
import asyncio
import pytest

DEFAULT_PARAMS = {"sampling_rate": 16000,
"audio_encoding": AudioEncoding.LINEAR16}
Expand Down Expand Up @@ -38,23 +40,23 @@ def mock_eleven_labs_api():


@pytest.fixture(scope="module")
def eleven_labs_synthesizer_with_api_key():
async def fixture_eleven_labs_synthesizer_with_api_key():
params = DEFAULT_PARAMS.copy()
params["api_key"] = MOCK_API_KEY
yield ElevenLabsSynthesizer(ElevenLabsSynthesizerConfig(**params))
return ElevenLabsSynthesizer(ElevenLabsSynthesizerConfig(**params))


@pytest.fixture(scope="module")
def eleven_labs_synthesizer_wrong_api_key():
async def fixture_eleven_labs_synthesizer_wrong_api_key():
params = DEFAULT_PARAMS.copy()
params["api_key"] = "wrong_api_key"
yield ElevenLabsSynthesizer(ElevenLabsSynthesizerConfig(**params))
return ElevenLabsSynthesizer(ElevenLabsSynthesizerConfig(**params))


@pytest.fixture(scope="module")
def eleven_labs_synthesizer_env_api_key():
async def fixture_eleven_labs_synthesizer_env_api_key():
params = DEFAULT_PARAMS.copy()
import os

os.environ["ELEVEN_LABS_API_KEY"] = MOCK_API_KEY
yield ElevenLabsSynthesizer(ElevenLabsSynthesizerConfig(**params))
return ElevenLabsSynthesizer(ElevenLabsSynthesizerConfig(**params))
15 changes: 8 additions & 7 deletions tests/synthesizer/test_eleven_labs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from pydantic import ValidationError
import pytest
from vocode.streaming.synthesizer.base_synthesizer import SynthesisResult
Expand All @@ -9,7 +10,7 @@
from pydub import AudioSegment


async def asset_synthesis_result_valid(synthesizer: ElevenLabsSynthesizer):
async def assert_synthesis_result_valid(synthesizer: ElevenLabsSynthesizer):
response = await synthesizer.create_speech(BaseMessage(text="Hello, world!"), 1024)
assert isinstance(response, SynthesisResult)
assert response.chunk_generator is not None
Expand All @@ -25,26 +26,26 @@ async def asset_synthesis_result_valid(synthesizer: ElevenLabsSynthesizer):

@pytest.mark.asyncio
async def test_with_api_key(
eleven_labs_synthesizer_with_api_key: ElevenLabsSynthesizer,
fixture_eleven_labs_synthesizer_with_api_key: ElevenLabsSynthesizer,
mock_eleven_labs_api: aioresponses,
):
await asset_synthesis_result_valid(eleven_labs_synthesizer_with_api_key)
await assert_synthesis_result_valid(await fixture_eleven_labs_synthesizer_with_api_key)


@pytest.mark.asyncio
async def test_with_wrong_api_key(
eleven_labs_synthesizer_wrong_api_key: ElevenLabsSynthesizer,
fixture_eleven_labs_synthesizer_wrong_api_key: ElevenLabsSynthesizer,
mock_eleven_labs_api: aioresponses,
):
with pytest.raises(Exception, match="ElevenLabs API returned 401 status code"):
await eleven_labs_synthesizer_wrong_api_key.create_speech(
await (await fixture_eleven_labs_synthesizer_wrong_api_key).create_speech(
BaseMessage(text="Hello, world!"), 1024
)


@pytest.mark.asyncio
async def test_with_env_api_key(
eleven_labs_synthesizer_env_api_key: ElevenLabsSynthesizer,
fixture_eleven_labs_synthesizer_env_api_key: ElevenLabsSynthesizer,
mock_eleven_labs_api: aioresponses,
):
await asset_synthesis_result_valid(eleven_labs_synthesizer_env_api_key)
await assert_synthesis_result_valid(await fixture_eleven_labs_synthesizer_env_api_key)
5 changes: 3 additions & 2 deletions vocode/streaming/models/synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,10 @@ class ElevenLabsSynthesizerConfig(
):
api_key: Optional[str] = None
voice_id: Optional[str] = ELEVEN_LABS_ADAM_VOICE_ID
optimize_streaming_latency: Optional[int]
experimental_streaming: Optional[bool] = False
stability: Optional[float]
similarity_boost: Optional[float]
optimize_streaming_latency: Optional[int]
model_id: Optional[str]

@validator("voice_id")
Expand Down Expand Up @@ -135,13 +136,13 @@ def optimize_streaming_latency_check(cls, optimize_streaming_latency):
RIME_DEFAULT_SAMPLE_RATE = 22050
RIME_DEFAULT_BASE_URL = "https://rjmopratfrdjgmfmaios.functions.supabase.co/rime-tts"


class RimeSynthesizerConfig(SynthesizerConfig, type=SynthesizerType.RIME.value):
speaker: str = RIME_DEFAULT_SPEAKER
sampling_rate: int = RIME_DEFAULT_SAMPLE_RATE
base_url: str = RIME_DEFAULT_BASE_URL



COQUI_DEFAULT_SPEAKER_ID = "ebe2db86-62a6-49a1-907a-9a1360d4416e"


Expand Down
94 changes: 75 additions & 19 deletions vocode/streaming/synthesizer/eleven_labs_synthesizer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import io
import asyncio
import logging
from typing import Any, Optional
import time
from typing import Any, AsyncGenerator, Optional
import wave
import aiohttp
from pydub import AudioSegment
from opentelemetry.trace import Span

from vocode import getenv
from vocode.streaming.synthesizer.base_synthesizer import (
Expand All @@ -16,8 +18,8 @@
)
from vocode.streaming.agent.bot_sentiment_analyser import BotSentiment
from vocode.streaming.models.message import BaseMessage

from opentelemetry.context.context import Context
from vocode.streaming.utils.mp3_helper import decode_mp3
from vocode.streaming.synthesizer.miniaudio_worker import MiniaudioWorker


ADAM_VOICE_ID = "pNInz6obpgDQGcFmaJgB"
Expand All @@ -44,6 +46,52 @@ def __init__(
self.model_id = synthesizer_config.model_id
self.optimize_streaming_latency = synthesizer_config.optimize_streaming_latency
self.words_per_minute = 150
self.experimental_streaming = synthesizer_config.experimental_streaming

# Create a PydubWorker instance as an attribute
if self.experimental_streaming:
self.miniaudio_worker = MiniaudioWorker(
synthesizer_config, asyncio.Queue(), asyncio.Queue()
)
self.miniaudio_worker.start()

async def tear_down(self):
await super().tear_down()
if self.experimental_streaming:
self.miniaudio_worker.terminate()

async def output_generator(
self,
response: aiohttp.ClientResponse,
chunk_size: int,
create_speech_span: Optional[Span],
) -> AsyncGenerator[SynthesisResult.ChunkResult, None]:
stream_reader = response.content
buffer = bytearray()

# Create a task to send the mp3 chunks to the PydubWorker's input queue in a separate loop
async def send_chunks():
async for chunk in stream_reader.iter_any():
self.miniaudio_worker.consume_nonblocking((chunk, False))
self.miniaudio_worker.consume_nonblocking((None, True))

asyncio.create_task(send_chunks())

# Await the output queue of the PydubWorker and yield the wav chunks in another loop
while True:
# Get the wav chunk and the flag from the output queue of the PydubWorker
wav_chunk, is_last = await self.miniaudio_worker.output_queue.get()

if wav_chunk is not None:
buffer.extend(wav_chunk)

if len(buffer) >= chunk_size or is_last:
yield SynthesisResult.ChunkResult(buffer, is_last)
buffer.clear()
# If this is the last chunk, break the loop
if is_last and create_speech_span is not None:
create_speech_span.end()
break

async def create_speech(
self,
Expand All @@ -57,6 +105,10 @@ async def create_speech(
stability=self.stability, similarity_boost=self.similarity_boost
)
url = ELEVEN_LABS_BASE_URL + f"text-to-speech/{self.voice_id}"

if self.experimental_streaming:
url += "/stream"

if self.optimize_streaming_latency:
url += f"?optimize_streaming_latency={self.optimize_streaming_latency}"
headers = {"xi-api-key": self.api_key}
Expand All @@ -70,30 +122,34 @@ async def create_speech(
create_speech_span = tracer.start_span(
f"synthesizer.{SynthesizerType.ELEVEN_LABS.value.split('_', 1)[-1]}.create_total",
)
async with self.aiohttp_session.request(

session = self.aiohttp_session

response = await session.request(
"POST",
url,
json=body,
headers=headers,
timeout=aiohttp.ClientTimeout(total=15),
) as response:
if not response.ok:
raise Exception(
f"ElevenLabs API returned {response.status} status code"
)
)
if not response.ok:
raise Exception(f"ElevenLabs API returned {response.status} status code")
if self.experimental_streaming:
return SynthesisResult(
self.output_generator(
response, chunk_size, create_speech_span
), # should be wav
lambda seconds: self.get_message_cutoff_from_voice_speed(
message, seconds, self.words_per_minute
),
)
else:
audio_data = await response.read()
create_speech_span.end()
convert_span = tracer.start_span(
f"synthesizer.{SynthesizerType.ELEVEN_LABS.value.split('_', 1)[-1]}.convert",
)
# TODO: probably needs to be in a thread
audio_segment: AudioSegment = AudioSegment.from_mp3(
io.BytesIO(audio_data) # type: ignore
)

output_bytes_io = io.BytesIO()

audio_segment.export(output_bytes_io, format="wav") # type: ignore
output_bytes_io = decode_mp3(audio_data)

result = self.create_synthesis_result_from_wav(
file=output_bytes_io,
Expand Down
Loading

0 comments on commit d078d19

Please sign in to comment.