Skip to content

fix: correctly return number of bytes read from chunked streams #1386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changes/7251f5e7-0e9a-4ea4-b4c7-30dad31f4622.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "7251f5e7-0e9a-4ea4-b4c7-30dad31f4622",
"type": "bugfix",
"description": "⚠️ **IMPORTANT**: Correctly return number of bytes read from chunked streams",
"issues": [
"awslabs/smithy-kotlin#1285"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package aws.smithy.kotlin.runtime.auth.awssigning

import aws.smithy.kotlin.runtime.InternalApi
import aws.smithy.kotlin.runtime.auth.awssigning.internal.AwsChunkedReader
import aws.smithy.kotlin.runtime.auth.awssigning.internal.StreamChunker
import aws.smithy.kotlin.runtime.http.DeferredHeaders
import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
Expand All @@ -30,34 +30,17 @@ public class AwsChunkedByteReadChannel(
private var previousSignature: ByteArray,
private val trailingHeaders: DeferredHeaders = DeferredHeaders.Empty,
) : SdkByteReadChannel by delegate {

private val chunkReader = AwsChunkedReader(
delegate.asStream(),
signer,
signingConfig,
previousSignature,
trailingHeaders,
)

override val isClosedForRead: Boolean
get() = chunkReader.chunk.size == 0L && chunkReader.hasLastChunkBeenSent && delegate.isClosedForRead

override val availableForRead: Int
get() = chunkReader.chunk.size.toInt() + delegate.availableForRead
private val chunker = StreamChunker(delegate.adapt(), signer, signingConfig, previousSignature, trailingHeaders)

override suspend fun read(sink: SdkBuffer, limit: Long): Long {
require(limit >= 0L) { "Invalid limit ($limit) must be >= 0L" }
if (!chunkReader.ensureValidChunk()) return -1L
return chunkReader.chunk.read(sink, limit)
return chunker.readAndMaybeWrite(sink, limit)
}
}

private fun SdkByteReadChannel.asStream(): AwsChunkedReader.Stream = object : AwsChunkedReader.Stream {
private val delegate = this@asStream

override fun isClosedForRead(): Boolean =
delegate.isClosedForRead
private fun SdkByteReadChannel.adapt(): StreamChunker.Adapter = object : StreamChunker.Adapter {
val delegate = this@adapt

override suspend fun read(sink: SdkBuffer, limit: Long): Long =
delegate.read(sink, limit)
override val eof: Boolean get() = delegate.isClosedForRead
override suspend fun read(buffer: SdkBuffer, limit: Long): Long = delegate.read(buffer, limit)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package aws.smithy.kotlin.runtime.auth.awssigning

import aws.smithy.kotlin.runtime.InternalApi
import aws.smithy.kotlin.runtime.auth.awssigning.internal.AwsChunkedReader
import aws.smithy.kotlin.runtime.auth.awssigning.internal.StreamChunker
import aws.smithy.kotlin.runtime.http.DeferredHeaders
import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkSource
Expand All @@ -32,35 +32,22 @@ public class AwsChunkedSource(
previousSignature: ByteArray,
trailingHeaders: DeferredHeaders = DeferredHeaders.Empty,
) : SdkSource {
private val chunkReader = AwsChunkedReader(
delegate.asStream(),
signer,
signingConfig,
previousSignature,
trailingHeaders,
)
private val chunker = StreamChunker(delegate.adapt(), signer, signingConfig, previousSignature, trailingHeaders)

override fun read(sink: SdkBuffer, limit: Long): Long {
require(limit >= 0L) { "Invalid limit ($limit) must be >= 0L" }
// COROUTINE SAFETY: runBlocking is allowed here because SdkSource is a synchronous blocking interface
val isChunkValid = runBlocking {
chunkReader.ensureValidChunk()
}
if (!isChunkValid) return -1L
return chunkReader.chunk.read(sink, limit)
return runBlocking { chunker.readAndMaybeWrite(sink, limit) }
}

override fun close() {
delegate.close()
}
}

private fun SdkSource.asStream(): AwsChunkedReader.Stream = object : AwsChunkedReader.Stream {
private val delegate = this@asStream.buffer()
private fun SdkSource.adapt(): StreamChunker.Adapter = object : StreamChunker.Adapter {
private val delegate = this@adapt.buffer()

override fun isClosedForRead(): Boolean =
delegate.exhausted()

override suspend fun read(sink: SdkBuffer, limit: Long): Long =
delegate.read(sink, limit)
override val eof: Boolean get() = delegate.exhausted()
override suspend fun read(buffer: SdkBuffer, limit: Long): Long = delegate.read(buffer, limit)
}

This file was deleted.

Loading
Loading