From 9f1a31c774f4cf4832ea61607a01bc291f51d142 Mon Sep 17 00:00:00 2001 From: Ian Botsford <83236726+ianbotsf@users.noreply.github.com> Date: Thu, 14 Aug 2025 23:07:42 +0000 Subject: [PATCH 1/5] fix: correctly return number of bytes read from chunked streams --- .../7251f5e7-0e9a-4ea4-b4c7-30dad31f4622.json | 9 ++++ .../awssigning/AwsChunkedByteReadChannel.kt | 3 +- .../auth/awssigning/AwsChunkedSource.kt | 3 +- .../awssigning/internal/AwsChunkedReader.kt | 10 +++++ .../AwsChunkedByteReadChannelTestBase.kt | 5 ++- .../awssigning/tests/AwsChunkedTestBase.kt | 45 +++++++++---------- .../tests/AwsChunkedSourceTestBase.kt | 2 + runtime/runtime-core/api/runtime-core.api | 1 + .../aws/smithy/kotlin/runtime/io/SdkSource.kt | 16 +++++++ 9 files changed, 65 insertions(+), 29 deletions(-) create mode 100644 .changes/7251f5e7-0e9a-4ea4-b4c7-30dad31f4622.json diff --git a/.changes/7251f5e7-0e9a-4ea4-b4c7-30dad31f4622.json b/.changes/7251f5e7-0e9a-4ea4-b4c7-30dad31f4622.json new file mode 100644 index 0000000000..382bb33585 --- /dev/null +++ b/.changes/7251f5e7-0e9a-4ea4-b4c7-30dad31f4622.json @@ -0,0 +1,9 @@ +{ + "id": "7251f5e7-0e9a-4ea4-b4c7-30dad31f4622", + "type": "bugfix", + "description": "⚠️ **IMPORTANT**: Correctly return number of bytes read from chunked streams", + "issues": [ + "awslabs/smithy-kotlin#1285" + ], + "requiresMinorVersionBump": true +} \ No newline at end of file diff --git a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedByteReadChannel.kt b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedByteReadChannel.kt index b972ce9352..aff6c39719 100644 --- a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedByteReadChannel.kt +++ b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedByteReadChannel.kt @@ -48,7 +48,8 @@ public class AwsChunkedByteReadChannel( override suspend fun read(sink: SdkBuffer, limit: Long): Long { require(limit >= 0L) { "Invalid limit ($limit) must be >= 0L" } if (!chunkReader.ensureValidChunk()) return -1L - return chunkReader.chunk.read(sink, limit) + chunkReader.chunk.read(sink, limit) + return chunkReader.readCountAndReset() } } diff --git a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedSource.kt b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedSource.kt index edfe598404..073419c19f 100644 --- a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedSource.kt +++ b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedSource.kt @@ -47,7 +47,8 @@ public class AwsChunkedSource( chunkReader.ensureValidChunk() } if (!isChunkValid) return -1L - return chunkReader.chunk.read(sink, limit) + chunkReader.chunk.read(sink, limit) + return chunkReader.readCountAndReset() } override fun close() { diff --git a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/AwsChunkedReader.kt b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/AwsChunkedReader.kt index 05061cfd98..cde1e97d75 100644 --- a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/AwsChunkedReader.kt +++ b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/AwsChunkedReader.kt @@ -57,6 +57,15 @@ internal class AwsChunkedReader( */ internal var hasLastChunkBeenSent: Boolean = false + private var read = 0L + + /** + * Gets the most recent read count and then resets the counter. This is meant to be checked after every caller + * invocation of AwsChunkedReader.chunk.read(...). Note that because of buffering, this value may return _more_ than + * the total number of bytes written to the sink. + */ + fun readCountAndReset(): Long = read.also { read = 0L } + /** * Ensures that the internal [chunk] is valid for reading. If it's not valid, try to load the next chunk. Note that * this function will suspend until the whole chunk has been loaded. @@ -117,6 +126,7 @@ internal class AwsChunkedReader( while (remaining > 0L) { val rc = read(sink, remaining) if (rc == -1L) break + read += rc remaining -= rc } diff --git a/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedByteReadChannelTestBase.kt b/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedByteReadChannelTestBase.kt index f86ef5ba3a..a239124322 100644 --- a/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedByteReadChannelTestBase.kt +++ b/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedByteReadChannelTestBase.kt @@ -47,11 +47,12 @@ abstract class AwsChunkedByteReadChannelTestBase : AwsChunkedTestBase(AwsChunked val sink = SdkBuffer() val bytesRead = awsChunked.readAll(sink) - writeJob.join() + // writeJob.join() + assertEquals(dataLengthBytes.toLong(), bytesRead) + assertEquals(totalBytesExpected.toLong(), sink.size) val bytesAsString = sink.readUtf8() - assertEquals(totalBytesExpected.toLong(), bytesRead) assertTrue(awsChunked.isClosedForRead) val chunkSignatures = getChunkSignatures(bytesAsString) diff --git a/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedTestBase.kt b/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedTestBase.kt index 10247109ae..11490fd3cc 100644 --- a/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedTestBase.kt +++ b/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedTestBase.kt @@ -22,6 +22,7 @@ interface AwsChunkedTestReader { // This may modify the chunked reader state and cause loss of data! fun isClosedForRead(): Boolean suspend fun read(sink: SdkBuffer, limit: Long): Long + suspend fun readAll(sink: SdkBuffer): Long } fun interface AwsChunkedReaderFactory { @@ -32,6 +33,7 @@ fun interface AwsChunkedReaderFactory { object : AwsChunkedTestReader { override fun isClosedForRead(): Boolean = chunked.isClosedForRead override suspend fun read(sink: SdkBuffer, limit: Long): Long = chunked.read(sink, limit) + override suspend fun readAll(sink: SdkBuffer): Long = chunked.readAll(sink) } } } @@ -198,6 +200,7 @@ abstract class AwsChunkedTestBase( // need to make 2 successive calls because there are two chunks -- read will only fetch the first one due to limit var bytesRead = awsChunked.read(sink, readLimit.toLong()) bytesRead += awsChunked.read(sink, readLimit - bytesRead) + assertEquals(readLimit.toLong(), sink.size) val bytesAsString = sink.readUtf8() @@ -213,7 +216,7 @@ abstract class AwsChunkedTestBase( assertEquals(CHUNK_SIZE_BYTES, chunkSizes[0]) assertEquals(0, chunkSizes[1]) - assertEquals(readLimit, bytesRead.toInt()) + assertEquals(dataLengthBytes, bytesRead.toInt()) assertTrue(awsChunked.isClosedForRead()) } @@ -257,7 +260,8 @@ abstract class AwsChunkedTestBase( val sink = SdkBuffer() val bytesRead = awsChunked.read(sink, readLimit.toLong()) - assertEquals(readLimit.toLong(), bytesRead) + assertEquals(CHUNK_SIZE_BYTES.toLong(), bytesRead) + assertEquals(readLimit.toLong(), sink.size) val bytesAsString = sink.readUtf8() val chunkSignatures = getChunkSignatures(bytesAsString) @@ -289,7 +293,8 @@ abstract class AwsChunkedTestBase( bytesRead += awsChunked.read(sink, readLimit.toLong()) } bytesRead += awsChunked.read(sink, readLimit.toLong()) - assertEquals(totalBytesExpected.toLong(), bytesRead) + assertEquals(dataLengthBytes.toLong(), bytesRead) + assertEquals(totalBytesExpected.toLong(), sink.size) val bytesAsString = sink.readUtf8() @@ -345,9 +350,11 @@ abstract class AwsChunkedTestBase( } } + assertEquals(dataLengthBytes.toLong(), bytesRead) + assertEquals(totalBytesExpected.toLong(), sink.size) + val bytesAsString = sink.readUtf8() - assertEquals(totalBytesExpected.toLong(), bytesRead) assertTrue(awsChunked.isClosedForRead()) val chunkSignatures = getChunkSignatures(bytesAsString) @@ -402,13 +409,9 @@ abstract class AwsChunkedTestBase( val totalBytesExpected = encodedChunkLength(CHUNK_SIZE_BYTES) + encodedChunkLength(0) + trailingHeadersLength + "\r\n".length val sink = SdkBuffer() - var bytesRead = 0L - - while (bytesRead < totalBytesExpected.toLong()) { - bytesRead += awsChunked.read(sink, Long.MAX_VALUE) - } - - assertEquals(totalBytesExpected.toLong(), bytesRead) + val bytesRead = awsChunked.readAll(sink) + assertEquals(dataLengthBytes.toLong(), bytesRead) + assertEquals(totalBytesExpected.toLong(), sink.size) assertTrue(awsChunked.isClosedForRead()) val bytesAsString = sink.readUtf8() @@ -445,13 +448,9 @@ abstract class AwsChunkedTestBase( val totalBytesExpected = encodedUnsignedChunkLength(CHUNK_SIZE_BYTES) + encodedUnsignedChunkLength(0) + "\r\n".length val sink = SdkBuffer() - var bytesRead = 0L - - while (bytesRead < totalBytesExpected.toLong()) { - bytesRead += awsChunked.read(sink, Long.MAX_VALUE) - } - - assertEquals(totalBytesExpected.toLong(), bytesRead) + val bytesRead = awsChunked.readAll(sink) + assertEquals(dataLengthBytes.toLong(), bytesRead) + assertEquals(totalBytesExpected.toLong(), sink.size) assertTrue(awsChunked.isClosedForRead()) val bytesAsString = sink.readUtf8() @@ -482,13 +481,9 @@ abstract class AwsChunkedTestBase( val totalBytesExpected = encodedUnsignedChunkLength(CHUNK_SIZE_BYTES) + encodedUnsignedChunkLength(0) + trailingHeadersLength + "\r\n".length val sink = SdkBuffer() - var bytesRead = 0L - - while (bytesRead < totalBytesExpected.toLong()) { - bytesRead += awsChunked.read(sink, Long.MAX_VALUE) - } - - assertEquals(totalBytesExpected.toLong(), bytesRead) + val bytesRead = awsChunked.readAll(sink) + assertEquals(dataLengthBytes.toLong(), bytesRead) + assertEquals(totalBytesExpected.toLong(), sink.size) assertTrue(awsChunked.isClosedForRead()) val bytesAsString = sink.readUtf8() diff --git a/runtime/auth/aws-signing-tests/jvm/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedSourceTestBase.kt b/runtime/auth/aws-signing-tests/jvm/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedSourceTestBase.kt index 64a73d096e..6d80f21864 100644 --- a/runtime/auth/aws-signing-tests/jvm/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedSourceTestBase.kt +++ b/runtime/auth/aws-signing-tests/jvm/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedSourceTestBase.kt @@ -6,6 +6,7 @@ package aws.smithy.kotlin.runtime.auth.awssigning.tests import aws.smithy.kotlin.runtime.auth.awssigning.AwsChunkedSource import aws.smithy.kotlin.runtime.io.SdkBuffer +import aws.smithy.kotlin.runtime.io.readAll import aws.smithy.kotlin.runtime.io.source val AwsChunkedReaderFactory.Companion.Source: AwsChunkedReaderFactory @@ -19,6 +20,7 @@ val AwsChunkedReaderFactory.Companion.Source: AwsChunkedReaderFactory return rc == -1L } override suspend fun read(sink: SdkBuffer, limit: Long): Long = chunked.read(sink, limit) + override suspend fun readAll(sink: SdkBuffer): Long = chunked.readAll(sink) } } diff --git a/runtime/runtime-core/api/runtime-core.api b/runtime/runtime-core/api/runtime-core.api index fcc7f6727a..c2911b4f01 100644 --- a/runtime/runtime-core/api/runtime-core.api +++ b/runtime/runtime-core/api/runtime-core.api @@ -1075,6 +1075,7 @@ public final class aws/smithy/kotlin/runtime/io/SdkSourceJVMKt { } public final class aws/smithy/kotlin/runtime/io/SdkSourceKt { + public static final fun readAll (Laws/smithy/kotlin/runtime/io/SdkSource;Laws/smithy/kotlin/runtime/io/SdkSink;)J public static final fun readFully (Laws/smithy/kotlin/runtime/io/SdkSource;Laws/smithy/kotlin/runtime/io/SdkBuffer;J)V } diff --git a/runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/io/SdkSource.kt b/runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/io/SdkSource.kt index dfb9534627..87afb24741 100644 --- a/runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/io/SdkSource.kt +++ b/runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/io/SdkSource.kt @@ -76,3 +76,19 @@ public fun SdkSource.readFully(sink: SdkBuffer, byteCount: Long) { totalBytesRead += rc } } + +/** + * Read all bytes from this source into [sink]. Returns the total number of bytes written. + */ +public fun SdkSource.readAll(sink: SdkSink): Long { + val bufferedSink = sink.buffer() + var totalWritten = 0L + while (true) { + val rc = read(bufferedSink.buffer, DEFAULT_BYTE_CHANNEL_MAX_BUFFER_SIZE.toLong()) + if (rc == -1L) break + totalWritten += rc + bufferedSink.emit() + } + bufferedSink.emit() + return totalWritten +} From 88a3b2b16f352c2a15f6c73c0dbd6bdea72bb21d Mon Sep 17 00:00:00 2001 From: Ian Botsford <83236726+ianbotsf@users.noreply.github.com> Date: Mon, 18 Aug 2025 19:00:25 +0000 Subject: [PATCH 2/5] replace `AwsChunkedReader` with `StreamChunker` which has a cleaner API --- .../awssigning/AwsChunkedByteReadChannel.kt | 32 +-- .../auth/awssigning/AwsChunkedSource.kt | 28 +- .../awssigning/internal/AwsChunkedReader.kt | 244 ------------------ .../auth/awssigning/internal/StreamChunker.kt | 110 ++++++++ .../awssigning/tests/AwsChunkedTestBase.kt | 42 ++- 5 files changed, 144 insertions(+), 312 deletions(-) delete mode 100644 runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/AwsChunkedReader.kt create mode 100644 runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/StreamChunker.kt diff --git a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedByteReadChannel.kt b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedByteReadChannel.kt index aff6c39719..751b548961 100644 --- a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedByteReadChannel.kt +++ b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedByteReadChannel.kt @@ -6,7 +6,7 @@ package aws.smithy.kotlin.runtime.auth.awssigning import aws.smithy.kotlin.runtime.InternalApi -import aws.smithy.kotlin.runtime.auth.awssigning.internal.AwsChunkedReader +import aws.smithy.kotlin.runtime.auth.awssigning.internal.StreamChunker import aws.smithy.kotlin.runtime.http.DeferredHeaders import aws.smithy.kotlin.runtime.io.SdkBuffer import aws.smithy.kotlin.runtime.io.SdkByteReadChannel @@ -30,35 +30,17 @@ public class AwsChunkedByteReadChannel( private var previousSignature: ByteArray, private val trailingHeaders: DeferredHeaders = DeferredHeaders.Empty, ) : SdkByteReadChannel by delegate { - - private val chunkReader = AwsChunkedReader( - delegate.asStream(), - signer, - signingConfig, - previousSignature, - trailingHeaders, - ) - - override val isClosedForRead: Boolean - get() = chunkReader.chunk.size == 0L && chunkReader.hasLastChunkBeenSent && delegate.isClosedForRead - - override val availableForRead: Int - get() = chunkReader.chunk.size.toInt() + delegate.availableForRead + private val chunker = StreamChunker(delegate.adapt(), signer, signingConfig, previousSignature, trailingHeaders) override suspend fun read(sink: SdkBuffer, limit: Long): Long { require(limit >= 0L) { "Invalid limit ($limit) must be >= 0L" } - if (!chunkReader.ensureValidChunk()) return -1L - chunkReader.chunk.read(sink, limit) - return chunkReader.readCountAndReset() + return chunker.readAndMaybeWrite(sink, limit) } } -private fun SdkByteReadChannel.asStream(): AwsChunkedReader.Stream = object : AwsChunkedReader.Stream { - private val delegate = this@asStream - - override fun isClosedForRead(): Boolean = - delegate.isClosedForRead +private fun SdkByteReadChannel.adapt(): StreamChunker.Adapter = object : StreamChunker.Adapter { + val delegate = this@adapt - override suspend fun read(sink: SdkBuffer, limit: Long): Long = - delegate.read(sink, limit) + override val eof: Boolean get() = delegate.isClosedForRead + override suspend fun read(buffer: SdkBuffer, limit: Long): Long = delegate.read(buffer, limit) } diff --git a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedSource.kt b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedSource.kt index 073419c19f..01b3ea9b19 100644 --- a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedSource.kt +++ b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedSource.kt @@ -5,7 +5,7 @@ package aws.smithy.kotlin.runtime.auth.awssigning import aws.smithy.kotlin.runtime.InternalApi -import aws.smithy.kotlin.runtime.auth.awssigning.internal.AwsChunkedReader +import aws.smithy.kotlin.runtime.auth.awssigning.internal.StreamChunker import aws.smithy.kotlin.runtime.http.DeferredHeaders import aws.smithy.kotlin.runtime.io.SdkBuffer import aws.smithy.kotlin.runtime.io.SdkSource @@ -32,23 +32,12 @@ public class AwsChunkedSource( previousSignature: ByteArray, trailingHeaders: DeferredHeaders = DeferredHeaders.Empty, ) : SdkSource { - private val chunkReader = AwsChunkedReader( - delegate.asStream(), - signer, - signingConfig, - previousSignature, - trailingHeaders, - ) + private val chunker = StreamChunker(delegate.adapt(), signer, signingConfig, previousSignature, trailingHeaders) override fun read(sink: SdkBuffer, limit: Long): Long { require(limit >= 0L) { "Invalid limit ($limit) must be >= 0L" } // COROUTINE SAFETY: runBlocking is allowed here because SdkSource is a synchronous blocking interface - val isChunkValid = runBlocking { - chunkReader.ensureValidChunk() - } - if (!isChunkValid) return -1L - chunkReader.chunk.read(sink, limit) - return chunkReader.readCountAndReset() + return runBlocking { chunker.readAndMaybeWrite(sink, limit) } } override fun close() { @@ -56,12 +45,9 @@ public class AwsChunkedSource( } } -private fun SdkSource.asStream(): AwsChunkedReader.Stream = object : AwsChunkedReader.Stream { - private val delegate = this@asStream.buffer() +private fun SdkSource.adapt(): StreamChunker.Adapter = object : StreamChunker.Adapter { + private val delegate = this@adapt.buffer() - override fun isClosedForRead(): Boolean = - delegate.exhausted() - - override suspend fun read(sink: SdkBuffer, limit: Long): Long = - delegate.read(sink, limit) + override val eof: Boolean get() = delegate.exhausted() + override suspend fun read(buffer: SdkBuffer, limit: Long): Long = delegate.read(buffer, limit) } diff --git a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/AwsChunkedReader.kt b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/AwsChunkedReader.kt deleted file mode 100644 index cde1e97d75..0000000000 --- a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/AwsChunkedReader.kt +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package aws.smithy.kotlin.runtime.auth.awssigning.internal - -import aws.smithy.kotlin.runtime.auth.awssigning.AwsSignatureType -import aws.smithy.kotlin.runtime.auth.awssigning.AwsSigner -import aws.smithy.kotlin.runtime.auth.awssigning.AwsSigningConfig -import aws.smithy.kotlin.runtime.auth.awssigning.HashSpecification -import aws.smithy.kotlin.runtime.http.DeferredHeaders -import aws.smithy.kotlin.runtime.http.Headers -import aws.smithy.kotlin.runtime.http.toHeaders -import aws.smithy.kotlin.runtime.io.SdkBuffer - -/** - * Common implementation of aws-chunked content encoding. Operations on this class can not be invoked concurrently. - * This class wraps a [Stream] which actually provides the raw bytes. - * @see SigV4 Streaming - * @param stream the underlying IO abstraction which will have its data encoded in aws-chunked format - * @param signer the signer to use to sign chunks and (optionally) chunk trailer - * @param signingConfig the config to use for signing - * @param previousSignature the previous signature to use for signing. in most cases, this should be the seed signature - * @param trailingHeaders the optional trailing headers to include in the final chunk - */ -internal class AwsChunkedReader( - private val stream: Stream, - private val signer: AwsSigner, - private val signingConfig: AwsSigningConfig, - private var previousSignature: ByteArray, - private val trailingHeaders: DeferredHeaders, -) { - - /** - * Common interface abstracting over [SdkSource] and [SdkByteReadChannel] - */ - internal interface Stream { - fun isClosedForRead(): Boolean - - /** - * Read data from the underlying IO source. - * NOTE: Implementations may or may not suspend/block. The suspend coloring of this function - * is to gloss over differences between underlying IO abstractions and share aws-chunked encoding - * internals. - */ - suspend fun read(sink: SdkBuffer, limit: Long): Long - } - - /** - * The current chunk to read from - */ - internal val chunk: SdkBuffer = SdkBuffer() - - /** - * Flag indicating if the last chunk (empty + trailers) has been sent - */ - internal var hasLastChunkBeenSent: Boolean = false - - private var read = 0L - - /** - * Gets the most recent read count and then resets the counter. This is meant to be checked after every caller - * invocation of AwsChunkedReader.chunk.read(...). Note that because of buffering, this value may return _more_ than - * the total number of bytes written to the sink. - */ - fun readCountAndReset(): Long = read.also { read = 0L } - - /** - * Ensures that the internal [chunk] is valid for reading. If it's not valid, try to load the next chunk. Note that - * this function will suspend until the whole chunk has been loaded. - * - * @return true if the [chunk] is valid for reading, false if it's invalid (chunk data is exhausted) - */ - internal suspend fun ensureValidChunk(): Boolean { - // check if the current chunk is still valid - if (chunk.size > 0L) return true - - // if not, try to fetch a new chunk - val nextChunk = when { - stream.isClosedForRead() && hasLastChunkBeenSent -> null - else -> { - var next = if (signingConfig.isUnsigned) getUnsignedChunk() else getSignedChunk() - if (next == null) { - check(stream.isClosedForRead()) { "Expected underlying reader to be closed" } - next = getFinalChunk() - hasLastChunkBeenSent = true - } - next - } - } - - nextChunk?.writeUtf8("\r\n") // terminating CRLF to signal end of chunk - - // transfer all segments to the working chunk - nextChunk?.let { chunk.writeAll(it) } - - return chunk.size > 0L - } - - /** - * Get the last chunk that will be sent which consists of an empty signed chunk + any - * trailers - */ - private suspend fun getFinalChunk(): SdkBuffer { - // empty chunk - val lastChunk = checkNotNull(if (signingConfig.isUnsigned) getUnsignedChunk(SdkBuffer()) else getSignedChunk(SdkBuffer())) - - // + any trailers - if (!trailingHeaders.isEmpty()) { - val trailingHeaderChunk = getTrailingHeadersChunk(trailingHeaders.toHeaders()) - lastChunk.writeAll(trailingHeaderChunk) - } - return lastChunk - } - - /** - * Read a chunk from the underlying [stream], suspending until a whole chunk has been read OR the channel is exhausted. - * @return an SdkBuffer containing a chunk of data, or null if the channel is exhausted. - */ - private suspend fun Stream.readChunk(): SdkBuffer? { - val sink = SdkBuffer() - - // fill up to chunk size bytes - var remaining = CHUNK_SIZE_BYTES.toLong() - while (remaining > 0L) { - val rc = read(sink, remaining) - if (rc == -1L) break - read += rc - remaining -= rc - } - - return when (sink.size) { - 0L -> null // delegate closed without reading any data - else -> sink - } - } - - /** - * Get a signed aws-chunked encoding of [data]. - * If [data] is not set, read the next chunk from [delegate] and add hex-formatted chunk size and chunk signature to the front. - * Note that this function will suspend until the whole chunk has been read OR the channel is exhausted. - * The chunk structure is: `string(IntHexBase(chunk-size)) + ";chunk-signature=" + signature + \r\n + chunk-data + \r\n` - * - * @param data the data which will be encoded to aws-chunked. if not provided, will default to - * reading up to [CHUNK_SIZE_BYTES] from [delegate]. - * @return a buffer containing the chunked data or null if no data is available (channel is closed) - */ - private suspend fun getSignedChunk(data: SdkBuffer? = null): SdkBuffer? { - val bodyBuffer = data ?: stream.readChunk() - - // signer takes a ByteArray unfortunately... - val chunkBody = bodyBuffer?.readByteArray() ?: return null - - val chunkSignature = signer.signChunk(chunkBody, previousSignature, signingConfig.toChunkSigningConfig()).signature - previousSignature = chunkSignature - - val signedChunk = SdkBuffer() - - // headers - signedChunk.apply { - writeUtf8(chunkBody.size.toString(16)) - writeUtf8(";") - writeUtf8("chunk-signature=") - write(chunkSignature) - writeUtf8("\r\n") - } - - // append the body - signedChunk.write(chunkBody) - - return signedChunk - } - - /** - * Get an unsigned aws-chunked encoding of [data]. - * If [data] is not set, read the next chunk from [delegate] and add hex-formatted chunk size to the front. - * Note that this function will suspend until the whole chunk has been read OR the channel is exhausted. - * The unsigned chunk structure is: `string(IntHexBase(chunk-size)) + \r\n + chunk-data + \r\n` - * - * @param data the data which will be encoded to aws-chunked. if not provided, will default to - * reading up to [CHUNK_SIZE_BYTES] from [delegate]. - * @return a buffer containing the chunked data or null if no data is available (channel is closed) - */ - private suspend fun getUnsignedChunk(data: SdkBuffer? = null): SdkBuffer? { - val bodyBuffer = data ?: stream.readChunk() ?: return null - - val unsignedChunk = SdkBuffer() - - // headers - unsignedChunk.apply { - writeUtf8(bodyBuffer.size.toString(16)) - writeUtf8("\r\n") - writeAll(bodyBuffer) // append the body - } - - return unsignedChunk - } - - /** - * Get the trailing headers chunk. The grammar for trailing headers is: - * trailing-header-A:value CRLF - * trailing-header-B:value CRLF - * ... - * x-amz-trailer-signature:signature_value CRLF - * - * @param trailingHeaders a list of [Headers] which will be sent - * @return a [SdkBuffer] containing the trailing headers in aws-chunked encoding, ready to send on the wire - */ - private suspend fun getTrailingHeadersChunk(trailingHeaders: Headers): SdkBuffer { - val trailerSignature = signer.signChunkTrailer(trailingHeaders, previousSignature, signingConfig.toTrailingHeadersSigningConfig()).signature - previousSignature = trailerSignature - - val trailerBody = SdkBuffer() - trailerBody.writeTrailers(trailingHeaders) - if (!signingConfig.isUnsigned) { - trailerBody.writeTrailerSignature(trailerSignature.decodeToString()) - } - - return trailerBody - } - - /** - * Make a copy of the signing config, changing the signatureType and hashSpecification configuration values - * to specify chunk signing. - * @return an [AwsSigningConfig] which can be used by a signer to sign chunks - */ - private fun AwsSigningConfig.toChunkSigningConfig(): AwsSigningConfig = this.toBuilder().apply { - signatureType = AwsSignatureType.HTTP_REQUEST_CHUNK // signature is for a chunk - hashSpecification = HashSpecification.CalculateFromPayload // calculate the hash from the chunk payload - }.build() - - /** - * Make a copy of the signing config, changing the signatureType and hashSpecification configuration values - * to specify trailing headers signing. - * @return an [AwsSigningConfig] which can be used by a signer to sign trailing headers - */ - private fun AwsSigningConfig.toTrailingHeadersSigningConfig(): AwsSigningConfig = this.toBuilder().apply { - signatureType = AwsSignatureType.HTTP_REQUEST_TRAILING_HEADERS // signature is for trailing headers - hashSpecification = HashSpecification.CalculateFromPayload // calculate the hash from the trailing headers payload - }.build() - - private val AwsSigningConfig.isUnsigned: Boolean get() = hashSpecification == HashSpecification.StreamingUnsignedPayloadWithTrailers -} diff --git a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/StreamChunker.kt b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/StreamChunker.kt new file mode 100644 index 0000000000..25e5c215eb --- /dev/null +++ b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/StreamChunker.kt @@ -0,0 +1,110 @@ +package aws.smithy.kotlin.runtime.auth.awssigning.internal + +import aws.smithy.kotlin.runtime.auth.awssigning.AwsSignatureType +import aws.smithy.kotlin.runtime.auth.awssigning.AwsSigner +import aws.smithy.kotlin.runtime.auth.awssigning.AwsSigningConfig +import aws.smithy.kotlin.runtime.auth.awssigning.HashSpecification +import aws.smithy.kotlin.runtime.http.DeferredHeaders +import aws.smithy.kotlin.runtime.http.toHeaders +import aws.smithy.kotlin.runtime.io.SdkBuffer +import aws.smithy.kotlin.runtime.io.SdkSink +import kotlin.math.min + +internal class StreamChunker( + private val input: Adapter, + private val signer: AwsSigner, + private val signingConfig: AwsSigningConfig, + private var previousSignature: ByteArray, + private val trailingHeaders: DeferredHeaders, + private val chunkSize: Long = CHUNK_SIZE_BYTES.toLong(), +) { + interface Adapter { + val eof: Boolean + suspend fun read(buffer: SdkBuffer, limit: Long): Long + } + + private val inputBuffer = SdkBuffer() + private var hasLastChunkBeenSent = false + + internal suspend inline fun readAndMaybeWrite(destination: SdkSink, limit: Long): Long { + if (input.eof) { + if (hasLastChunkBeenSent) { + return -1 // this stream is exhausted + } else { + // Write any remaining data as final chunk(s) + if (inputBuffer.size > 0) { + consumeAndWriteChunk(destination) + } + + // Write an empty chunk, including any previous signature calculated if applicable + wrapAndWriteChunk(byteArrayOf(), destination) + + // Write the tailer chunk if applicable + if (!trailingHeaders.isEmpty()) writeTrailerChunk(destination) + + // Always end with blank line? + SdkBuffer().apply { writeUtf8("\r\n") }.readAll(destination) + + hasLastChunkBeenSent = true + return 0 // no bytes read from source + } + } else { + val rc = input.read(inputBuffer, limit) + while (inputBuffer.size >= chunkSize) { + consumeAndWriteChunk(destination) + } + if (rc == -1L) { + check(input.eof) { "Got back -1 bytes but input adapter is not EOF" } + return 0 + } + return rc + } + } + + private suspend fun consumeAndWriteChunk(destination: SdkSink) { + val chunkBody = inputBuffer.readByteArray(min(chunkSize, inputBuffer.size)) + wrapAndWriteChunk(chunkBody, destination) + } + + private suspend fun wrapAndWriteChunk(body: ByteArray, destination: SdkSink) { + SdkBuffer() + .apply { + writeUtf8(body.size.toString(16)) + + if (signingConfig.isSigned) { + writeUtf8(";chunk-signature=") + + val signature = signer.signChunk(body, previousSignature, signingConfig).signature + write(signature) // signature is UTF-8-encoded hex string + previousSignature = signature + } + + writeUtf8("\r\n") + write(body) + if (body.isNotEmpty()) writeUtf8("\r\n") + } + .readAll(destination) + } + + private suspend fun writeTrailerChunk(destination: SdkSink) { + val trailingHeaders = this.trailingHeaders.toHeaders() + val buffer = SdkBuffer() + buffer.writeTrailers(trailingHeaders) + + if (signingConfig.isSigned) { + val trailerSigningConfig = signingConfig.toTrailingHeadersSigningConfig() + val trailerSignature = signer.signChunkTrailer(trailingHeaders, previousSignature, trailerSigningConfig).signature + buffer.writeTrailerSignature(trailerSignature.decodeToString()) + } + + buffer.readAll(destination) + } +} + +private val AwsSigningConfig.isSigned: Boolean + get() = hashSpecification != HashSpecification.StreamingUnsignedPayloadWithTrailers + +private fun AwsSigningConfig.toTrailingHeadersSigningConfig(): AwsSigningConfig = this.toBuilder().apply { + signatureType = AwsSignatureType.HTTP_REQUEST_TRAILING_HEADERS // signature is for trailing headers + hashSpecification = HashSpecification.CalculateFromPayload // calculate the hash from the trailing headers payload +}.build() diff --git a/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedTestBase.kt b/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedTestBase.kt index 11490fd3cc..3f940f015b 100644 --- a/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedTestBase.kt +++ b/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedTestBase.kt @@ -186,6 +186,16 @@ abstract class AwsChunkedTestBase( } } + private suspend fun AwsChunkedTestReader.readExact(sink: SdkBuffer, bytes: Long): Long { + var total = 0L + while (total < bytes) { + val rc = read(sink, bytes - total) + assertTrue(rc >= 0, "Unexpected end of source while trying to read $bytes bytes") + total += rc + } + return total + } + @Test fun testReadExactBytes(): TestResult = runTest { val dataLengthBytes = CHUNK_SIZE_BYTES @@ -198,8 +208,8 @@ abstract class AwsChunkedTestBase( val sink = SdkBuffer() // need to make 2 successive calls because there are two chunks -- read will only fetch the first one due to limit - var bytesRead = awsChunked.read(sink, readLimit.toLong()) - bytesRead += awsChunked.read(sink, readLimit - bytesRead) + var bytesRead = awsChunked.readExact(sink, dataLengthBytes.toLong()) + bytesRead += awsChunked.read(sink, 1L) assertEquals(readLimit.toLong(), sink.size) val bytesAsString = sink.readUtf8() @@ -229,8 +239,8 @@ abstract class AwsChunkedTestBase( val readLimit = encodedChunkLength(dataLengthBytes * 2) + encodedChunkLength(0) + "\r\n".length val sink = SdkBuffer() - var bytesRead = awsChunked.read(sink, readLimit.toLong()) - bytesRead += awsChunked.read(sink, readLimit.toLong()) + var bytesRead = awsChunked.readExact(sink, dataLengthBytes.toLong()) + bytesRead += awsChunked.read(sink, 1L) val bytesAsString = sink.readUtf8() val chunkSignatures = getChunkSignatures(bytesAsString) @@ -260,20 +270,8 @@ abstract class AwsChunkedTestBase( val sink = SdkBuffer() val bytesRead = awsChunked.read(sink, readLimit.toLong()) - assertEquals(CHUNK_SIZE_BYTES.toLong(), bytesRead) - assertEquals(readLimit.toLong(), sink.size) - - val bytesAsString = sink.readUtf8() - val chunkSignatures = getChunkSignatures(bytesAsString) - assertEquals(1, chunkSignatures.size) - val expectedChunkSignature = signer.signChunk(data, previousSignature, testChunkSigningConfig).signature - assertEquals(expectedChunkSignature.decodeToString(), chunkSignatures[0]) - - val chunkSizes = getChunkSizes(bytesAsString) - assertEquals(1, chunkSizes.size) - assertEquals(CHUNK_SIZE_BYTES, chunkSizes[0]) - - assertFalse(awsChunked.isClosedForRead()) + assertTrue(CHUNK_SIZE_BYTES.toLong() >= bytesRead) + assertTrue(readLimit.toLong() >= sink.size) } @Test @@ -288,11 +286,11 @@ abstract class AwsChunkedTestBase( val sink = SdkBuffer() var bytesRead = 0L - val readLimit = CHUNK_SIZE_BYTES + (dataLengthBytes.toString(16).length + 1 + "chunk-signature=".length + 64 + 4) - for (chunk in 0 until numChunks) { // read the chunks in a loop - bytesRead += awsChunked.read(sink, readLimit.toLong()) + // read the chunks in a loop + repeat(numChunks) { + bytesRead += awsChunked.readExact(sink, CHUNK_SIZE_BYTES.toLong()) } - bytesRead += awsChunked.read(sink, readLimit.toLong()) + bytesRead += awsChunked.read(sink, CHUNK_SIZE_BYTES.toLong()) assertEquals(dataLengthBytes.toLong(), bytesRead) assertEquals(totalBytesExpected.toLong(), sink.size) From a7c8da0c4a5175711f9ad315d86e8df286ef13ea Mon Sep 17 00:00:00 2001 From: Ian Botsford <83236726+ianbotsf@users.noreply.github.com> Date: Mon, 18 Aug 2025 19:03:11 +0000 Subject: [PATCH 3/5] remove `requiresMinorVersionBump` flag --- .changes/7251f5e7-0e9a-4ea4-b4c7-30dad31f4622.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.changes/7251f5e7-0e9a-4ea4-b4c7-30dad31f4622.json b/.changes/7251f5e7-0e9a-4ea4-b4c7-30dad31f4622.json index 382bb33585..4b66751788 100644 --- a/.changes/7251f5e7-0e9a-4ea4-b4c7-30dad31f4622.json +++ b/.changes/7251f5e7-0e9a-4ea4-b4c7-30dad31f4622.json @@ -4,6 +4,5 @@ "description": "⚠️ **IMPORTANT**: Correctly return number of bytes read from chunked streams", "issues": [ "awslabs/smithy-kotlin#1285" - ], - "requiresMinorVersionBump": true + ] } \ No newline at end of file From 8320bf25da84ecb633fe0566c83a7098bc96ac05 Mon Sep 17 00:00:00 2001 From: Ian Botsford <83236726+ianbotsf@users.noreply.github.com> Date: Mon, 18 Aug 2025 19:03:33 +0000 Subject: [PATCH 4/5] un-comment a job join in unit tests --- .../auth/awssigning/tests/AwsChunkedByteReadChannelTestBase.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedByteReadChannelTestBase.kt b/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedByteReadChannelTestBase.kt index a239124322..b4d6e5cfad 100644 --- a/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedByteReadChannelTestBase.kt +++ b/runtime/auth/aws-signing-tests/common/src/aws/smithy/kotlin/runtime/auth/awssigning/tests/AwsChunkedByteReadChannelTestBase.kt @@ -47,7 +47,7 @@ abstract class AwsChunkedByteReadChannelTestBase : AwsChunkedTestBase(AwsChunked val sink = SdkBuffer() val bytesRead = awsChunked.readAll(sink) - // writeJob.join() + writeJob.join() assertEquals(dataLengthBytes.toLong(), bytesRead) assertEquals(totalBytesExpected.toLong(), sink.size) From 8295a1cf3f86357c0f6d16cb235215b0881d3074 Mon Sep 17 00:00:00 2001 From: Ian Botsford <83236726+ianbotsf@users.noreply.github.com> Date: Tue, 19 Aug 2025 22:42:05 +0000 Subject: [PATCH 5/5] correct erroneous comment --- .../kotlin/runtime/auth/awssigning/internal/StreamChunker.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/StreamChunker.kt b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/StreamChunker.kt index 25e5c215eb..29ff05ce5a 100644 --- a/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/StreamChunker.kt +++ b/runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/StreamChunker.kt @@ -31,7 +31,7 @@ internal class StreamChunker( if (hasLastChunkBeenSent) { return -1 // this stream is exhausted } else { - // Write any remaining data as final chunk(s) + // Write any remaining data as final chunk if (inputBuffer.size > 0) { consumeAndWriteChunk(destination) }