
Exception in streaming call not propagated to caller's scope #1166

@david-katz

Describe the bug

An exception thrown within a streaming SDK call is not propagated to the caller's scope.

Regression Issue

  • Select this option if this issue appears to be a regression.

Expected behavior

A streaming SDK call made within a given scope should propagate its exceptions back to that scope. If the exception is not otherwise handled, the exception handler provided at launch should be called.
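
To make the expectation concrete, here is a minimal sketch using only kotlinx.coroutines, with no SDK involved. The function name and the failing child coroutine are hypothetical stand-ins for the streaming call. It assumes the failing work runs as a child of the coroutine started with `launch(handler)`, which is the structured-concurrency behavior this report expects from the SDK.

```kotlin
import java.io.IOException
import kotlinx.coroutines.*

// Returns the exception delivered to the handler installed at launch,
// or null if nothing arrives within the timeout.
fun exceptionSeenByLaunchHandler(): Throwable? = runBlocking {
    val seen = CompletableDeferred<Throwable>()
    val handler = CoroutineExceptionHandler { _, e -> seen.complete(e) }
    val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())

    scope.launch(handler) {
        // Hypothetical stand-in for the request-body writer: a child coroutine that fails.
        launch { throw IOException("stream was reset: CANCEL") }
        delay(10_000) // cancelled as soon as the child fails
    }.join()

    withTimeoutOrNull(1_000) { seen.await() }
}

fun main() {
    println(exceptionSeenByLaunchHandler())
}
```

Because the failing coroutine is a child of the launched one, its failure cancels the parent and ends up in the handler passed to `launch`, rather than in the thread's uncaught exception handler.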

Current behavior

Exceptions thrown inside the stream and not handled there are not propagated to the calling scope's exception handler. Instead, the app's top-level exception handler receives the following as an unhandled exception:

    FATAL EXCEPTION: DefaultDispatcher-worker-5
    Process: myprocess, PID: 14065
    okhttp3.internal.http2.StreamResetException: stream was reset: CANCEL
    at okhttp3.internal.http2.Http2Stream.checkOutNotClosed$okhttp(Http2Stream.kt:646)
    at okhttp3.internal.http2.Http2Stream$FramingSink.emitFrame(Http2Stream.kt:557)
    at okhttp3.internal.http2.Http2Stream$FramingSink.write(Http2Stream.kt:532)
    at okio.ForwardingSink.write(ForwardingSink.kt:29)
    at okhttp3.internal.connection.Exchange$RequestBodySink.write(Exchange.kt:223)
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
    at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
    at aws.smithy.kotlin.runtime.http.engine.okhttp.InstrumentedSink.write(MetricsInterceptor.kt:51)
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
    at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
    at okio.RealBufferedSink.emit(RealBufferedSink.kt:262)
    at aws.smithy.kotlin.runtime.io.AbstractBufferedSinkAdapter.emit(BufferedSinkAdapter.kt:90)
    at aws.smithy.kotlin.runtime.io.SdkByteReadChannelKt$readAll$2.invokeSuspend(SdkByteReadChannel.kt:114)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:101)
    at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:113)
    at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:589)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:823)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:720)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:707)
    Suppressed: okhttp3.internal.http2.StreamResetException: stream was reset: CANCEL
    at okhttp3.internal.http2.Http2Stream.checkOutNotClosed$okhttp(Http2Stream.kt:646)
    at okhttp3.internal.http2.Http2Stream$FramingSink.emitFrame(Http2Stream.kt:557)
    at okhttp3.internal.http2.Http2Stream$FramingSink.write(Http2Stream.kt:532)
    at okio.ForwardingSink.write(ForwardingSink.kt:29)
    at okhttp3.internal.connection.Exchange$RequestBodySink.write(Exchange.kt:223)
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
    at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
    at aws.smithy.kotlin.runtime.http.engine.okhttp.InstrumentedSink.write(MetricsInterceptor.kt:51)
    at okio.RealBufferedSink.close(RealBufferedSink.kt:280)
    at kotlin.io.CloseableKt.closeFinally(Closeable.kt:59)
    at aws.smithy.kotlin.runtime.http.engine.okhttp.StreamingRequestBody$doWriteTo$1.invokeSuspend(StreamingRequestBody.kt:64)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.UndispatchedCoroutine.afterResume(CoroutineContext.kt:266)
    at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:100)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
    ... 7 more
    Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [TelemetryContext(aws.smithy.kotlin.runtime.telemetry.DefaultTelemetryProvider@9fa78e0), LoggingContextElement({rpc=Transcribe Streaming.StartStreamTranscription, sdkInvocationId=563166c0-e5e1-4c22-bbb4-96f5faa14d9e}), aws.smithy.kotlin.runtime.telemetry.context.TelemetryContextElement@db9e799, CoroutineName(call-context:send-request-body), StandaloneCoroutine{Cancelling}@60e6a5e, Dispatchers.IO]

Steps to Reproduce

The streaming call is started within a coroutine as follows:

        val myDispatcher = Executors
            .newSingleThreadExecutor(VoiceInteractionThreadFactory)
            .asCoroutineDispatcher()

        // supervisor is SupervisorJob()
        val myScope = CoroutineScope(myDispatcher + supervisor)

        awsTranscribeJob =
            myScope.launch(RecognitionServiceExceptionHandler) {
                withContext(myDispatcher) {
                    try {
                        awsTranscribe?.start()
                    } catch (e: AwsTranscribeException) {
                        Timber.tag(RECOGNITION_LOGGING_TAG).e(e, "Error processing transcript from AWS Transcribe")
                        launch(Dispatchers.Main) {
                            givenCallback?.error(SpeechRecognizer.ERROR_SERVER)
                        }
                    } catch (e: IOException) {
                        Timber.tag(RECOGNITION_LOGGING_TAG).e(e, "IO error getting transcript")
                        launch(Dispatchers.Main) {
                            givenCallback?.error(SpeechRecognizer.ERROR_NETWORK)
                        }
                    }
                }
            }

    suspend fun start(): String {
        enableHttp2FrameLogging()
        fullMessage = StringBuilder()

        client =
            TranscribeStreamingClient {
                logMode = LogMode.LogRequest + LogMode.LogResponse
                credentialsProvider = myCredentialsProvider
                region = "us-east-1"
                httpClient =
                    OkHttp4Engine {
                        enableHttp2FrameLogging()
                    }

                interceptors = mutableListOf(AwsLoggingInterceptor())
            }

        val req =
            StartStreamTranscriptionRequest {
                audioStream = audioStreamFlow
                mediaSampleRateHertz = 16000
                mediaEncoding = MediaEncoding.Flac
                languageCode = LanguageCode.DeDe
                languageModelName = "my-language-model-de-DE-v1.2"
                enablePartialResultsStabilization = false
                // partialResultsStability = PartialResultsStability.Low
            }

        client?.startStreamTranscription(req) { resp ->
            resp.transcriptResultStream
                ?.collect { event ->
                    logger.debug("event received on transcriptResultStream")
                    when (event) {
                        is TranscriptResultStream.TranscriptEvent -> {
                            logger.debug("AwsTranscribe TranscriptEvent received")
                            event.value.transcript?.results?.forEach { result ->
                                val transcript = result.alternatives?.firstOrNull()?.transcript
                                if (result.isPartial) {
                                    speechRecognitionResultsListener
                                        ?.onPartialResult(
                                            transcript
                                                ?: "",
                                        )
                                } else {
                                    transcript?.let {
                                        fullMessage.append(it)
                                        speechRecognitionResultsListener?.onResults(fullMessage.toString())
                                        stop()
                                    }
                                }
                            }
                        }
                        else -> error("unknown event $event")
                    }
                }
        }
        return fullMessage.toString()
    }

Possible Solution

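Perhaps the GlobalScope.launch in aws.smithy.kotlin.runtime.http.engine.okhttp.StreamingRequestBody#doWriteTo is the issue? For duplex requests, it writes the request body in a background coroutine that is not launched from the caller's scope: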

    private fun doWriteTo(sink: BufferedSink) {
        val context = callContext + callContext.derivedName("send-request-body")
        if (isDuplex()) {
            // launch coroutine that writes to sink in the background
            GlobalScope.launch(context + Dispatchers.IO) {
                sink.use { transferBody(it) }
            }
        } else {
            // remove the current dispatcher (if it exists) and use the internal
            // runBlocking dispatcher that blocks the *current* thread
            val blockingContext = context.minusKey(CoroutineDispatcher)

            // Non-duplex (aka "normal") requests MUST write all of their request body
            // before this function returns. Requests are given a background thread to
            // do this work in, and it is safe and expected to block.
            // see: https://square.github.io/okhttp/4.x/okhttp/okhttp3/-request-body/is-duplex/
            runBlocking(blockingContext) {
                transferBody(sink)
            }
        }
    }
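
The suspicion can be checked in isolation with a small sketch that involves neither the SDK nor OkHttp. Everything here is an assumption made for illustration: `callContext` is a hypothetical stand-in holding its own `Job` and no `CoroutineExceptionHandler`, which is consistent with the context elements listed in the suppressed DiagnosticCoroutineContextException above but has not been verified against the SDK source. The function reports which handler ends up receiving the failure.

```kotlin
import java.io.IOException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.*

// Reports which handler received the failure: "caller" or "thread-default".
@OptIn(DelicateCoroutinesApi::class)
fun whoHandlesGlobalScopeFailure(): String {
    val receiver = CompletableFuture<String>()
    val previous = Thread.getDefaultUncaughtExceptionHandler()
    Thread.setDefaultUncaughtExceptionHandler { _, _ -> receiver.complete("thread-default") }
    try {
        val callerHandler = CoroutineExceptionHandler { _, _ -> receiver.complete("caller") }
        // Hypothetical stand-in for the SDK's callContext: its own Job, no caller handler.
        val callContext = Job() + CoroutineName("send-request-body")

        runBlocking {
            val caller = CoroutineScope(Dispatchers.Default + SupervisorJob())
            caller.launch(callerHandler) {
                // Mirrors the duplex branch: the writer is launched outside the caller's scope.
                GlobalScope.launch(callContext + Dispatchers.IO) {
                    throw IOException("stream was reset: CANCEL")
                }
                delay(200)
            }.join()
        }
        return receiver.get(2, TimeUnit.SECONDS)
    } finally {
        // Restore whatever handler was installed before the experiment.
        Thread.setDefaultUncaughtExceptionHandler(previous)
    }
}

fun main() {
    println(whoHandlesGlobalScopeFailure())
}
```

Under these assumptions, the caller's handler is never consulted, because the failing coroutine neither inherits the caller's context nor is a child of the caller's job. That would match the FATAL EXCEPTION reported on a DefaultDispatcher worker thread.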

Context

No response

Smithy-Kotlin version

1.3.17

Platform (JVM/JS/Native)

JVM

Operating system and version

Android 13

Labels

bug (This issue is a bug.), p2 (This is a standard priority issue.)
