Exception in streaming call not propagated to caller's scope #1166

Open
1 task
david-katz opened this issue Oct 22, 2024 · 2 comments
Labels
bug This issue is a bug.

david-katz commented Oct 22, 2024

Describe the bug

An exception thrown inside a streaming SDK call is not propagated to the caller's scope.

Regression Issue

  • Select this option if this issue appears to be a regression.

Expected behavior

A streaming SDK call made within a given coroutine scope should propagate its exceptions back to that scope. If the exception is not otherwise handled, the exception handler provided at launch should be called.
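
To make the expectation concrete, here is a minimal sketch using only kotlinx.coroutines. `failingStreamingCall` is a hypothetical stand-in for the SDK call, not the real API; it fails in a child coroutine of the caller, so the exception should reach the handler passed to `launch`.

```kotlin
import kotlinx.coroutines.*
import java.io.IOException

// Hypothetical stand-in for a streaming SDK call whose background work fails.
suspend fun failingStreamingCall() {
    coroutineScope {
        // launch here creates a child of the caller's coroutine, so the failure
        // is rethrown from coroutineScope instead of escaping to another scope.
        launch { throw IOException("stream was reset") }
    }
}

// Returns the exception delivered to the handler supplied at launch, or null
// if nothing arrives within one second.
fun exceptionSeenByCallerHandler(): Throwable? = runBlocking {
    val seen = CompletableDeferred<Throwable>()
    val handler = CoroutineExceptionHandler { _, e -> seen.complete(e) }
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    scope.launch(handler) { failingStreamingCall() }.join()
    withTimeoutOrNull(1_000) { seen.await() }
}

fun main() {
    println(exceptionSeenByCallerHandler())
}
```

Wrapping the call in `try`/`catch (e: IOException)` inside the launched block would catch the same exception before the handler is consulted, which is what the reproduction code in this issue relies on.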

Current behavior

Exceptions that occur within the stream and are not handled there are not propagated to the calling scope's exception handler. Instead, the app's top-level exception handler receives the following as an unhandled exception:

    FATAL EXCEPTION: DefaultDispatcher-worker-5
    Process: myprocess, PID: 14065
    okhttp3.internal.http2.StreamResetException: stream was reset: CANCEL
    at okhttp3.internal.http2.Http2Stream.checkOutNotClosed$okhttp(Http2Stream.kt:646)
    at okhttp3.internal.http2.Http2Stream$FramingSink.emitFrame(Http2Stream.kt:557)
    at okhttp3.internal.http2.Http2Stream$FramingSink.write(Http2Stream.kt:532)
    at okio.ForwardingSink.write(ForwardingSink.kt:29)
    at okhttp3.internal.connection.Exchange$RequestBodySink.write(Exchange.kt:223)
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
    at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
    at aws.smithy.kotlin.runtime.http.engine.okhttp.InstrumentedSink.write(MetricsInterceptor.kt:51)
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
    at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
    at okio.RealBufferedSink.emit(RealBufferedSink.kt:262)
    at aws.smithy.kotlin.runtime.io.AbstractBufferedSinkAdapter.emit(BufferedSinkAdapter.kt:90)
    at aws.smithy.kotlin.runtime.io.SdkByteReadChannelKt$readAll$2.invokeSuspend(SdkByteReadChannel.kt:114)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:101)
    at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:113)
    at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:589)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:823)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:720)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:707)
    Suppressed: okhttp3.internal.http2.StreamResetException: stream was reset: CANCEL
    at okhttp3.internal.http2.Http2Stream.checkOutNotClosed$okhttp(Http2Stream.kt:646)
    at okhttp3.internal.http2.Http2Stream$FramingSink.emitFrame(Http2Stream.kt:557)
    at okhttp3.internal.http2.Http2Stream$FramingSink.write(Http2Stream.kt:532)
    at okio.ForwardingSink.write(ForwardingSink.kt:29)
    at okhttp3.internal.connection.Exchange$RequestBodySink.write(Exchange.kt:223)
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
    at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
    at aws.smithy.kotlin.runtime.http.engine.okhttp.InstrumentedSink.write(MetricsInterceptor.kt:51)
    at okio.RealBufferedSink.close(RealBufferedSink.kt:280)
    at kotlin.io.CloseableKt.closeFinally(Closeable.kt:59)
    at aws.smithy.kotlin.runtime.http.engine.okhttp.StreamingRequestBody$doWriteTo$1.invokeSuspend(StreamingRequestBody.kt:64)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.UndispatchedCoroutine.afterResume(CoroutineContext.kt:266)
    at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:100)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
    ... 7 more
    Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [TelemetryContext(aws.smithy.kotlin.runtime.telemetry.DefaultTelemetryProvider@9fa78e0), LoggingContextElement({rpc=Transcribe Streaming.StartStreamTranscription, sdkInvocationId=563166c0-e5e1-4c22-bbb4-96f5faa14d9e}), aws.smithy.kotlin.runtime.telemetry.context.TelemetryContextElement@db9e799, CoroutineName(call-context:send-request-body), StandaloneCoroutine{Cancelling}@60e6a5e, Dispatchers.IO]

Steps to Reproduce

The streaming call is started within a coroutine as follows:

        val myDispatcher = Executors
            .newSingleThreadExecutor(VoiceInteractionThreadFactory)
            .asCoroutineDispatcher()

        // supervisor is SupervisorJob()
        val myScope = CoroutineScope(myDispatcher + supervisor)

        awsTranscribeJob =
                myScope.launch(RecognitionServiceExceptionHandler) {
                    withContext(myDispatcher) {
                        try {
                            awsTranscribe?.start()
                        } catch (e: AwsTranscribeException) {
                            Timber.tag(RECOGNITION_LOGGING_TAG).e(e, "Error processing transcript from AWS Transcribe")
                            launch(Dispatchers.Main) {
                                givenCallback?.error(SpeechRecognizer.ERROR_SERVER)
                            }
                        } catch (e: IOException) {
                            Timber.tag(RECOGNITION_LOGGING_TAG).e(e, "IO error getting transcript")
                            launch(Dispatchers.Main) {
                                givenCallback?.error(SpeechRecognizer.ERROR_NETWORK)
                            }
                        }
                    }
                }

    suspend fun start(): String {
        enableHttp2FrameLogging()
        fullMessage = StringBuilder()

        client =
            TranscribeStreamingClient {
                logMode = LogMode.LogRequest + LogMode.LogResponse
                credentialsProvider = myCredentialsProvider
                region = "us-east-1"
                httpClient =
                    OkHttp4Engine {
                        enableHttp2FrameLogging()
                    }

                interceptors = mutableListOf(AwsLoggingInterceptor())
            }

        val req =
            StartStreamTranscriptionRequest {
                audioStream = audioStreamFlow
                mediaSampleRateHertz = 16000
                mediaEncoding = MediaEncoding.Flac
                languageCode = LanguageCode.DeDe
                languageModelName = "my-language-model-de-DE-v1.2"
                enablePartialResultsStabilization = false
                // partialResultsStability = PartialResultsStability.Low
            }

        client?.startStreamTranscription(req) { resp ->
            resp.transcriptResultStream
                ?.collect { event ->
                    logger.debug("event received on transcriptResultStream")
                    when (event) {
                        is TranscriptResultStream.TranscriptEvent -> {
                            logger.debug("AwsTranscribe TranscriptEvent received")
                            event.value.transcript?.results?.forEach { result ->
                                val transcript = result.alternatives?.firstOrNull()?.transcript
                                if (result.isPartial) {
                                    speechRecognitionResultsListener
                                        ?.onPartialResult(
                                            transcript
                                                ?: "",
                                        )
                                } else {
                                    transcript?.let {
                                        fullMessage.append(it)
                                        speechRecognitionResultsListener?.onResults(fullMessage.toString())
                                        stop()
                                    }
                                }
                            }
                        }
                        else -> error("unknown event $event")
                    }
                }
        }
        return fullMessage.toString()
    }

Possible Solution

In aws.smithy.kotlin.runtime.http.engine.okhttp.StreamingRequestBody#doWriteTo, the request body of a duplex call is written from a coroutine started with GlobalScope.launch. Perhaps that is the issue?

    private fun doWriteTo(sink: BufferedSink) {
        val context = callContext + callContext.derivedName("send-request-body")
        if (isDuplex()) {
            // launch coroutine that writes to sink in the background
            GlobalScope.launch(context + Dispatchers.IO) {
                sink.use { transferBody(it) }
            }
        } else {
            // remove the current dispatcher (if it exists) and use the internal
            // runBlocking dispatcher that blocks the *current* thread
            val blockingContext = context.minusKey(CoroutineDispatcher)

            // Non-duplex (aka "normal") requests MUST write all of their request body
            // before this function returns. Requests are given a background thread to
            // do this work in, and it is safe and expected to block.
            // see: https://square.github.io/okhttp/4.x/okhttp/okhttp3/-request-body/is-duplex/
            runBlocking(blockingContext) {
                transferBody(sink)
            }
        }
    }
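
As a hedged illustration of why `GlobalScope.launch` could matter, the sketch below shows general kotlinx.coroutines behavior and is not a confirmed diagnosis of the SDK. A coroutine started from `GlobalScope` gets only the context passed to `launch`; when that context carries no `CoroutineExceptionHandler` and no parent `Job` from the caller, an uncaught exception goes to the thread's uncaught-exception handler. The diagnostic context in the trace above lists no `CoroutineExceptionHandler` element. The function name `demoGlobalScopeLeak` and the scopes used here are assumptions made for the example.

```kotlin
import kotlinx.coroutines.*
import java.io.IOException

// Returns (exception seen by the caller's handler,
//          exception seen by the JVM default uncaught-exception handler).
@OptIn(DelicateCoroutinesApi::class)
fun demoGlobalScopeLeak(): Pair<Throwable?, Throwable?> = runBlocking {
    val seenByCaller = CompletableDeferred<Throwable>()
    val seenByDefault = CompletableDeferred<Throwable>()

    // Temporarily capture what would otherwise be reported as a fatal exception.
    val previous = Thread.getDefaultUncaughtExceptionHandler()
    Thread.setDefaultUncaughtExceptionHandler { _, e -> seenByDefault.complete(e) }
    try {
        val callerHandler = CoroutineExceptionHandler { _, e -> seenByCaller.complete(e) }
        val callerScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

        callerScope.launch(callerHandler) {
            // Root coroutine: only the context passed here applies, so neither
            // the caller's Job nor the caller's handler is inherited.
            GlobalScope.launch(Dispatchers.IO) {
                throw IOException("stream was reset: CANCEL")
            }.join() // join() does not rethrow the failure of the joined job
        }.join()

        val viaDefault = withTimeoutOrNull(2_000) { seenByDefault.await() }
        val viaCaller = withTimeoutOrNull(200) { seenByCaller.await() }
        viaCaller to viaDefault
    } finally {
        Thread.setDefaultUncaughtExceptionHandler(previous)
    }
}

fun main() {
    val (viaCaller, viaDefault) = demoGlobalScopeLeak()
    println("caller handler saw: $viaCaller")
    println("default handler saw: $viaDefault")
}
```

On Android the default uncaught-exception handler terminates the process, which would match the FATAL EXCEPTION report. Whether `callContext` in the SDK carries a `Job` that changes this picture is not something this sketch can show.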

Context

No response

Smithy-Kotlin version

1.3.17

Platform (JVM/JS/Native)

JVM

Operating system and version

Android 13

david-katz added the bug and needs-triage labels Oct 22, 2024

david-katz (Author) commented

I tried adding a .catch block to the stream processing:

                resp.transcriptResultStream
                    ?.catch { e ->
                        if (e !is Exception) throw e
                        logger.error("error in transcriptResultStream: ${e.message}")
                        speechRecognitionResultsListener?.onError(e)
                        stop()
                    }
                    ?.collect { event ->

The catch block was called, for instance when Transcribe received no audio for 15 seconds, but the StreamResetException was still unhandled.
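
One possible explanation, offered as an assumption rather than a diagnosis: `Flow.catch` only handles exceptions thrown upstream in the flow it is applied to. If the StreamResetException is thrown in a separate coroutine that writes the request body (the trace names it `call-context:send-request-body`), the `.catch` on the response stream would never see it. The sketch below models that with two hypothetical pieces, a failing response flow and an independent writer coroutine with its own handler.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.io.IOException
import java.util.concurrent.CopyOnWriteArrayList

// Returns log lines recording which handler observed which failure.
fun demoCatchScope(): List<String> = runBlocking {
    val log = CopyOnWriteArrayList<String>()

    // Hypothetical request-body writer running outside the response flow.
    val writerHandler = CoroutineExceptionHandler { _, e -> log += "writer: ${e.message}" }
    val writer = CoroutineScope(SupervisorJob() + writerHandler).launch {
        throw IOException("stream was reset: CANCEL")
    }

    // Hypothetical response stream that fails upstream of catch.
    flow<String> { throw IllegalStateException("no audio for 15 seconds") }
        .catch { e -> log += "catch: ${e.message}" }
        .collect { }

    writer.join()
    log.toList()
}

fun main() {
    demoCatchScope().forEach(::println)
}
```

In this model the `.catch` operator records only the flow's own failure; the writer's exception reaches whatever handler its own coroutine context provides.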

ianbotsf self-assigned this Oct 22, 2024
ianbotsf removed the needs-triage label Oct 22, 2024

ianbotsf (Contributor) commented

I've been able to reproduce this and I'm working on a fix and additional testing within the SDK. I'll post an update here when I have a PR.
