Skip to content

Commit

Permalink
finalize mp3 streaming (#332)
Browse files Browse the repository at this point in the history
* build up the buffer and lop off the end to respond

* make a new miniaudio worker per synthesis

* resolve comments

* one more comment

* pydub worker references
  • Loading branch information
ajar98 authored Aug 3, 2023
1 parent 129ae6c commit bb8b195
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 54 deletions.
74 changes: 37 additions & 37 deletions vocode/streaming/synthesizer/eleven_labs_synthesizer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
import time
from typing import Any, AsyncGenerator, Optional
from typing import Any, AsyncGenerator, Optional, Tuple, Union
import wave
import aiohttp
from opentelemetry.trace import Span
Expand Down Expand Up @@ -48,50 +48,50 @@ def __init__(
self.words_per_minute = 150
self.experimental_streaming = synthesizer_config.experimental_streaming

# Create a PydubWorker instance as an attribute
if self.experimental_streaming:
self.miniaudio_worker = MiniaudioWorker(
synthesizer_config, asyncio.Queue(), asyncio.Queue()
)
self.miniaudio_worker.start()

async def tear_down(self):
await super().tear_down()
if self.experimental_streaming:
self.miniaudio_worker.terminate()

async def output_generator(
async def experimental_streaming_output_generator(
self,
response: aiohttp.ClientResponse,
chunk_size: int,
create_speech_span: Optional[Span],
) -> AsyncGenerator[SynthesisResult.ChunkResult, None]:
miniaudio_worker_input_queue: asyncio.Queue[
Union[bytes, None]
] = asyncio.Queue()
miniaudio_worker_output_queue: asyncio.Queue[
Tuple[bytes, bool]
] = asyncio.Queue()
miniaudio_worker = MiniaudioWorker(
self.synthesizer_config,
chunk_size,
miniaudio_worker_input_queue,
miniaudio_worker_output_queue,
)
miniaudio_worker.start()
stream_reader = response.content
buffer = bytearray()

# Create a task to send the mp3 chunks to the PydubWorker's input queue in a separate loop
# Create a task to send the mp3 chunks to the MiniaudioWorker's input queue in a separate loop
async def send_chunks():
async for chunk in stream_reader.iter_any():
self.miniaudio_worker.consume_nonblocking((chunk, False))
self.miniaudio_worker.consume_nonblocking((None, True))

asyncio.create_task(send_chunks())

# Await the output queue of the PydubWorker and yield the wav chunks in another loop
while True:
# Get the wav chunk and the flag from the output queue of the PydubWorker
wav_chunk, is_last = await self.miniaudio_worker.output_queue.get()

if wav_chunk is not None:
buffer.extend(wav_chunk)

if len(buffer) >= chunk_size or is_last:
yield SynthesisResult.ChunkResult(buffer, is_last)
buffer.clear()
# If this is the last chunk, break the loop
if is_last and create_speech_span is not None:
create_speech_span.end()
break
miniaudio_worker.consume_nonblocking(chunk)
miniaudio_worker.consume_nonblocking(None) # sentinel

try:
asyncio.create_task(send_chunks())

# Await the output queue of the MiniaudioWorker and yield the wav chunks in another loop
while True:
# Get the wav chunk and the flag from the output queue of the MiniaudioWorker
wav_chunk, is_last = await miniaudio_worker.output_queue.get()

yield SynthesisResult.ChunkResult(wav_chunk, is_last)
# If this is the last chunk, break the loop
if is_last and create_speech_span is not None:
create_speech_span.end()
break
except asyncio.CancelledError:
pass
finally:
miniaudio_worker.terminate()

async def create_speech(
self,
Expand Down Expand Up @@ -136,7 +136,7 @@ async def create_speech(
raise Exception(f"ElevenLabs API returned {response.status} status code")
if self.experimental_streaming:
return SynthesisResult(
self.output_generator(
self.experimental_streaming_output_generator(
response, chunk_size, create_speech_span
), # should be wav
lambda seconds: self.get_message_cutoff_from_voice_speed(
Expand Down
63 changes: 46 additions & 17 deletions vocode/streaming/synthesizer/miniaudio_worker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations
import queue

from typing import Optional, Tuple
from typing import Optional, Tuple, Union
import asyncio
import miniaudio

Expand All @@ -10,46 +10,75 @@
from vocode.streaming.utils.mp3_helper import decode_mp3
from vocode.streaming.utils.worker import ThreadAsyncWorker, logger

QueueType = Tuple[Optional[bytes], bool]


class MiniaudioWorker(ThreadAsyncWorker[QueueType]):
class MiniaudioWorker(ThreadAsyncWorker[Union[bytes, None]]):
def __init__(
self,
synthesizer_config: SynthesizerConfig,
input_queue: asyncio.Queue[QueueType],
output_queue: asyncio.Queue[QueueType],
chunk_size: int,
input_queue: asyncio.Queue[Union[bytes, None]],
output_queue: asyncio.Queue[Tuple[bytes, bool]],
) -> None:
super().__init__(input_queue, output_queue)
self.output_queue = output_queue # for typing
self.synthesizer_config = synthesizer_config
self.chunk_size = chunk_size
self._ended = False

def _run_loop(self):
# tracks the mp3 so far
current_mp3_buffer = bytearray()
# tracks the wav so far
current_wav_buffer = bytearray()
# the leftover chunks of the wav that haven't been sent to the output queue yet
current_wav_output_buffer = bytearray()
while not self._ended:
# Get a tuple of (mp3_chunk, is_last) from the input queue
try:
mp3_chunk, is_last = self.input_janus_queue.sync_q.get(timeout=1)
mp3_chunk = self.input_janus_queue.sync_q.get(timeout=1)
except queue.Empty:
continue
if mp3_chunk is None:
self.output_janus_queue.sync_q.put((None, True))
current_mp3_buffer.clear()
current_wav_buffer.clear()
self.output_janus_queue.sync_q.put(
(bytes(current_wav_output_buffer), True)
)
current_wav_output_buffer.clear()
continue
try:
output_bytes_io = decode_mp3(
mp3_chunk,
)
current_mp3_buffer.extend(mp3_chunk)
output_bytes = decode_mp3(bytes(current_mp3_buffer))
except miniaudio.DecodeError as e:
# How should I log this
# TODO: better logging
logger.exception("MiniaudioWorker error: " + str(e), exc_info=True)
self.output_janus_queue.sync_q.put((None, True))
self.output_janus_queue.sync_q.put(
(bytes(current_wav_output_buffer), True)
) # sentinel
continue
output_bytes_io = convert_wav(
output_bytes_io,
converted_output_bytes = convert_wav(
output_bytes,
output_sample_rate=self.synthesizer_config.sampling_rate,
output_encoding=self.synthesizer_config.audio_encoding,
)
# Put a tuple of (wav_chunk, is_last) in the output queue
self.output_janus_queue.sync_q.put((output_bytes_io, is_last))
# take the difference between the current_wav_buffer and the converted_output_bytes
# and put the difference in the output buffer
new_bytes = converted_output_bytes[len(current_wav_buffer) :]
current_wav_output_buffer.extend(new_bytes)

# chunk up new_bytes in chunks of chunk_size bytes, but keep the last chunk (less than chunk size) in the wav output buffer
output_buffer_idx = 0
while output_buffer_idx < len(current_wav_output_buffer) - self.chunk_size:
chunk = current_wav_output_buffer[
output_buffer_idx : output_buffer_idx + self.chunk_size
]
self.output_janus_queue.sync_q.put(
(chunk, False)
) # don't need to use bytes() since we already sliced it (which is a copy)
output_buffer_idx += self.chunk_size

current_wav_output_buffer = current_wav_output_buffer[output_buffer_idx:]
current_wav_buffer.extend(new_bytes)

def terminate(self):
self._ended = True
Expand Down

0 comments on commit bb8b195

Please sign in to comment.