Skip to content

Commit

Permalink
Added support for service restarts, restart GCS after 305 secs
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan770 committed Apr 4, 2021
1 parent 5fe61bf commit 6a71cdf
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl HandlerDatabase {

/// Receive new WebSocket notification.
///
/// This method will yield to executor if there are no transcriptions in queue.
/// This method will yield to executor if there are no notifications in queue.
pub async fn recv_notification(&self) -> WsNotification {
self.ws_notifications.pop().await
}
Expand Down
25 changes: 23 additions & 2 deletions src/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use thiserror::Error;
use tonic::{
metadata::{errors::InvalidMetadataValue, MetadataValue},
transport::{Certificate, Channel, ClientTlsConfig, Error as TransportError},
Request, Status,
Code, Request, Status,
};
use tracing::warn;
use yup_oauth2::{
Expand Down Expand Up @@ -56,10 +56,27 @@ pub enum CloudSpeechError {
AuthenticationError(#[from] OauthError),

#[error("Unable to call gRPC API: {0}")]
CallError(#[from] Status),
CallError(Status),

#[error("Unable to send transcription to background service: {0}")]
FlumeError(#[from] SendError<String>),

#[error("Max GCS stream duration is 305 seconds.")]
TooLongStream(Status),
}

impl From<Status> for CloudSpeechError {
fn from(error: Status) -> Self {
match error.code() {
Code::OutOfRange
if error.message()
== "Exceeded maximum allowed stream duration of 305 seconds." =>
{
CloudSpeechError::TooLongStream(error)
}
_ => CloudSpeechError::CallError(error),
}
}
}

pub mod await_time {
Expand Down Expand Up @@ -249,6 +266,10 @@ where
.map_err(CloudSpeechError::from))
}
}

fn restartable(error: &Self::Error) -> bool {
matches!(error, CloudSpeechError::TooLongStream(_))
}
}

impl<'c> FromConfig<'c> for GoogleCloudSpeech
Expand Down
5 changes: 3 additions & 2 deletions src/recognition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct SpeechRecognitionResponse {
/// specific options via [`SpeechRecognitionConfig`]
///
/// [`application config`]: Config
#[derive(Clone)]
pub struct SpeechRecognitionServiceConfig<'c> {
/// Application configuration.
pub application_config: &'c Config,
Expand All @@ -54,9 +55,11 @@ pub struct SpeechRecognitionConfig {
pub language: String,

/// Enable profanity filter (if provider supports it)?
#[serde(default)]
pub profanity_filter: bool,

/// Enable punctuation guessing (if provider supports it)?
#[serde(default)]
pub punctuation: bool,
}

Expand All @@ -73,10 +76,8 @@ impl Default for SpeechRecognitionConfig {
#[pin_project]
pub struct SpeechRecognitionSink<E> {
id: Uuid,

#[pin]
database: Arc<HandlerDatabase>,

_error: PhantomData<E>,
}

Expand Down
39 changes: 31 additions & 8 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ where

/// Start process of audio streaming.
fn stream(self, stream: S) -> Self::Fut;

/// Determine if service should be restarted in case of error.
///
/// By default, in case of fail there will be no restart for a service.
fn restartable(_: &Self::Error) -> bool {
false
}
}

/// Create a [`Service`] from provided config, possibly failing to do so.
Expand Down Expand Up @@ -82,7 +89,8 @@ async fn spawn_service<C, I, O, S>(
mut sink: O,
) -> Result<(), <S as Service<I>>::Error>
where
I: Stream<Item = <S as Service<I>>::Input> + Send + Sync + 'static,
C: Clone,
I: Stream<Item = <S as Service<I>>::Input> + Clone + Send + Sync + 'static,
O: Sink<<<S as Service<I>>::Ok as TryStream>::Ok, Error = <S as Service<I>>::Error>
+ Send
+ Unpin,
Expand All @@ -94,13 +102,28 @@ where
<S as Service<I>>::Error:
From<<S as FromConfig<'static>>::Error> + From<<S::Ok as TryStream>::Error>,
{
S::from_config(config)
.await?
.stream(stream)
.await?
.map_err(<S as Service<I>>::Error::from)
.forward(&mut sink)
.await?;
macro_rules! spawner {
($config:expr, $stream:expr, $sink:expr) => {
S::from_config(config.clone())
.await?
.stream(stream.clone())
.await?
.map_err(<S as Service<I>>::Error::from)
.forward(&mut sink)
.await;
};
}

let mut spawned = spawner!(config.clone(), stream.clone(), &mut sink);

while let Err(ref e) = spawned {
if <S as Service<I>>::restartable(e) {
tracing::warn!("Restarting service due to error match");
spawned = spawner!(config.clone(), stream.clone(), &mut sink);
} else {
return spawned;
}
}

Ok(())
}
Expand Down

0 comments on commit 6a71cdf

Please sign in to comment.