-
Notifications
You must be signed in to change notification settings - Fork 31
Closed as not planned
Labels
bug (This issue is a bug.), p2 (This is a standard priority issue.)
Description
Describe the bug
An exception thrown inside a streaming SDK call is not propagated to the caller's coroutine scope.
Regression Issue
- Select this option if this issue appears to be a regression.
Expected behavior
A streaming sdk call made within a given scope should propagate its exceptions back to that scope. An exception handler provided at launch should be called if the exception is not otherwise handled.
Current behavior
Exceptions that occur within the stream and are not handled there are not propagated to the calling scope's exception handler. Instead, the app's top-level exception handler receives the following as an unhandled exception:
FATAL EXCEPTION: DefaultDispatcher-worker-5
Process: myprocess, PID: 14065
okhttp3.internal.http2.StreamResetException: stream was reset: CANCEL
at okhttp3.internal.http2.Http2Stream.checkOutNotClosed$okhttp(Http2Stream.kt:646)
at okhttp3.internal.http2.Http2Stream$FramingSink.emitFrame(Http2Stream.kt:557)
at okhttp3.internal.http2.Http2Stream$FramingSink.write(Http2Stream.kt:532)
at okio.ForwardingSink.write(ForwardingSink.kt:29)
at okhttp3.internal.connection.Exchange$RequestBodySink.write(Exchange.kt:223)
at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
at aws.smithy.kotlin.runtime.http.engine.okhttp.InstrumentedSink.write(MetricsInterceptor.kt:51)
at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
at okio.RealBufferedSink.emit(RealBufferedSink.kt:262)
at aws.smithy.kotlin.runtime.io.AbstractBufferedSinkAdapter.emit(BufferedSinkAdapter.kt:90)
at aws.smithy.kotlin.runtime.io.SdkByteReadChannelKt$readAll$2.invokeSuspend(SdkByteReadChannel.kt:114)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:101)
at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:113)
at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:589)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:823)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:720)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:707)
Suppressed: okhttp3.internal.http2.StreamResetException: stream was reset: CANCEL
at okhttp3.internal.http2.Http2Stream.checkOutNotClosed$okhttp(Http2Stream.kt:646)
at okhttp3.internal.http2.Http2Stream$FramingSink.emitFrame(Http2Stream.kt:557)
at okhttp3.internal.http2.Http2Stream$FramingSink.write(Http2Stream.kt:532)
at okio.ForwardingSink.write(ForwardingSink.kt:29)
at okhttp3.internal.connection.Exchange$RequestBodySink.write(Exchange.kt:223)
at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
at aws.smithy.kotlin.runtime.http.engine.okhttp.InstrumentedSink.write(MetricsInterceptor.kt:51)
at okio.RealBufferedSink.close(RealBufferedSink.kt:280)
at kotlin.io.CloseableKt.closeFinally(Closeable.kt:59)
at aws.smithy.kotlin.runtime.http.engine.okhttp.StreamingRequestBody$doWriteTo$1.invokeSuspend(StreamingRequestBody.kt:64)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.UndispatchedCoroutine.afterResume(CoroutineContext.kt:266)
at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:100)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
... 7 more
Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [TelemetryContext(aws.smithy.kotlin.runtime.telemetry.DefaultTelemetryProvider@9fa78e0), LoggingContextElement({rpc=Transcribe Streaming.StartStreamTranscription, sdkInvocationId=563166c0-e5e1-4c22-bbb4-96f5faa14d9e}), aws.smithy.kotlin.runtime.telemetry.context.TelemetryContextElement@db9e799, CoroutineName(call-context:send-request-body), StandaloneCoroutine{Cancelling}@60e6a5e, Dispatchers.IO]
Steps to Reproduce
The streaming call is being started within a coroutine as follows:
// Reproduction setup: calls awsTranscribe?.start() from a coroutine launched on a custom
// scope, with RecognitionServiceExceptionHandler passed as the launch context.
// Dispatcher backed by a single-thread executor.
val myDispatcher = Executors
.newSingleThreadExecutor(VoiceInteractionThreadFactory)
.asCoroutineDispatcher()
// supervisor is SupervisorJob()
val myScope = CoroutineScope(myDispatcher + supervisor)
awsTranscribeJob =
myScope.launch(RecognitionServiceExceptionHandler) {
// Same dispatcher instance the scope itself was created with.
withContext(myDispatcher) {
// Only AwsTranscribeException and IOException are caught below; any other throwable
// raised by start() propagates out of withContext and out of the launched coroutine.
try {
awsTranscribe?.start()
} catch (e: AwsTranscribeException) {
Timber.tag(RECOGNITION_LOGGING_TAG).e(e, "Error processing transcript from AWS Transcribe")
// Report the failure to the callback from a coroutine launched on Dispatchers.Main.
launch(Dispatchers.Main) {
givenCallback?.error(SpeechRecognizer.ERROR_SERVER)
}
} catch (e: IOException) {
Timber.tag(RECOGNITION_LOGGING_TAG).e(e, "IO error getting transcript")
// Report the failure to the callback from a coroutine launched on Dispatchers.Main.
launch(Dispatchers.Main) {
givenCallback?.error(SpeechRecognizer.ERROR_NETWORK)
}
}
}
}
/**
 * Runs one streaming transcription and returns the text accumulated in [fullMessage].
 *
 * Creates a fresh `TranscribeStreamingClient` (stored in `client`), sends a
 * `StartStreamTranscriptionRequest` whose audio comes from `audioStreamFlow`, and collects
 * the response's `transcriptResultStream`:
 *  - partial results are forwarded to `speechRecognitionResultsListener.onPartialResult`
 *    (an empty string is passed when no transcript text is present);
 *  - non-partial results with transcript text are appended to [fullMessage], the running
 *    text is passed to `onResults`, and `stop()` is called;
 *  - any event that is not a `TranscriptEvent` fails via `error("unknown event ...")`.
 *
 * @return the contents of [fullMessage] once `startStreamTranscription` returns.
 */
suspend fun start(): String {
    enableHttp2FrameLogging()
    fullMessage = StringBuilder()

    client =
        TranscribeStreamingClient {
            logMode = LogMode.LogRequest + LogMode.LogResponse
            credentialsProvider = myCredentialsProvider
            region = "us-east-1"
            httpClient = OkHttp4Engine { enableHttp2FrameLogging() }
            interceptors = mutableListOf(AwsLoggingInterceptor())
        }

    val transcriptionRequest =
        StartStreamTranscriptionRequest {
            audioStream = audioStreamFlow
            mediaSampleRateHertz = 16000
            mediaEncoding = MediaEncoding.Flac
            languageCode = LanguageCode.DeDe
            languageModelName = "my-language-model-de-DE-v1.2"
            enablePartialResultsStabilization = false
            // partialResultsStability = PartialResultsStability.Low
        }

    client?.startStreamTranscription(transcriptionRequest) { response ->
        response.transcriptResultStream?.collect { event ->
            logger.debug("event received on transcriptResultStream")

            // Anything other than a transcript event is treated as a hard failure.
            if (event !is TranscriptResultStream.TranscriptEvent) {
                error("unknown event $event")
            }
            logger.debug("AwsTranscribe TranscriptEvent received")

            for (result in event.value.transcript?.results.orEmpty()) {
                // Only the first alternative of each result is considered.
                val text = result.alternatives?.firstOrNull()?.transcript
                if (result.isPartial) {
                    speechRecognitionResultsListener?.onPartialResult(text.orEmpty())
                } else if (text != null) {
                    fullMessage.append(text)
                    speechRecognitionResultsListener?.onResults(fullMessage.toString())
                    stop()
                }
            }
        }
    }

    return fullMessage.toString()
}
Possible Solution
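The cause may be in aws.smithy.kotlin.runtime.http.engine.okhttp.StreamingRequestBody#doWriteTo (shown below).
For duplex requests the request body is written from a coroutine started with GlobalScope.launch; perhaps that is why the failure never reaches the caller's scope?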
/**
 * Writes the request body to [sink].
 *
 * Duplex requests: the transfer runs in a coroutine started with `GlobalScope.launch` on
 * `Dispatchers.IO`, and this function returns without waiting for it; [sink] is closed by
 * `use` when the transfer ends.
 *
 * Non-duplex requests: the transfer runs inside `runBlocking`, so this function returns
 * only after `transferBody` has completed.
 *
 * Both paths use `callContext` with a derived "send-request-body" name.
 */
private fun doWriteTo(sink: BufferedSink) {
    val bodyContext = callContext + callContext.derivedName("send-request-body")

    if (!isDuplex()) {
        // Non-duplex (aka "normal") requests MUST write all of their request body
        // before this function returns. Requests are given a background thread to
        // do this work in, and it is safe and expected to block.
        // see: https://square.github.io/okhttp/4.x/okhttp/okhttp3/-request-body/is-duplex/
        //
        // Remove the current dispatcher (if it exists) so that the internal
        // runBlocking dispatcher, which blocks the *current* thread, is used.
        runBlocking(bodyContext.minusKey(CoroutineDispatcher)) {
            transferBody(sink)
        }
        return
    }

    // Duplex: launch a coroutine that writes to the sink in the background.
    GlobalScope.launch(bodyContext + Dispatchers.IO) {
        sink.use { out -> transferBody(out) }
    }
}
Context
No response
Smithy-Kotlin version
1.3.17
Platform (JVM/JS/Native)
JVM
Operating system and version
Android 13
Metadata
Metadata
Assignees
Labels
bug (This issue is a bug.), p2 (This is a standard priority issue.)