Skip to content

Commit 88a3b2b

Browse files
committed
replace AwsChunkedReader with StreamChunker which has a cleaner API
1 parent 9f1a31c commit 88a3b2b

File tree

5 files changed

+144
-312
lines changed

5 files changed

+144
-312
lines changed

runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedByteReadChannel.kt

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package aws.smithy.kotlin.runtime.auth.awssigning
77

88
import aws.smithy.kotlin.runtime.InternalApi
9-
import aws.smithy.kotlin.runtime.auth.awssigning.internal.AwsChunkedReader
9+
import aws.smithy.kotlin.runtime.auth.awssigning.internal.StreamChunker
1010
import aws.smithy.kotlin.runtime.http.DeferredHeaders
1111
import aws.smithy.kotlin.runtime.io.SdkBuffer
1212
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
@@ -30,35 +30,17 @@ public class AwsChunkedByteReadChannel(
3030
private var previousSignature: ByteArray,
3131
private val trailingHeaders: DeferredHeaders = DeferredHeaders.Empty,
3232
) : SdkByteReadChannel by delegate {
33-
34-
private val chunkReader = AwsChunkedReader(
35-
delegate.asStream(),
36-
signer,
37-
signingConfig,
38-
previousSignature,
39-
trailingHeaders,
40-
)
41-
42-
override val isClosedForRead: Boolean
43-
get() = chunkReader.chunk.size == 0L && chunkReader.hasLastChunkBeenSent && delegate.isClosedForRead
44-
45-
override val availableForRead: Int
46-
get() = chunkReader.chunk.size.toInt() + delegate.availableForRead
33+
private val chunker = StreamChunker(delegate.adapt(), signer, signingConfig, previousSignature, trailingHeaders)
4734

4835
override suspend fun read(sink: SdkBuffer, limit: Long): Long {
4936
require(limit >= 0L) { "Invalid limit ($limit) must be >= 0L" }
50-
if (!chunkReader.ensureValidChunk()) return -1L
51-
chunkReader.chunk.read(sink, limit)
52-
return chunkReader.readCountAndReset()
37+
return chunker.readAndMaybeWrite(sink, limit)
5338
}
5439
}
5540

56-
private fun SdkByteReadChannel.asStream(): AwsChunkedReader.Stream = object : AwsChunkedReader.Stream {
57-
private val delegate = this@asStream
58-
59-
override fun isClosedForRead(): Boolean =
60-
delegate.isClosedForRead
41+
private fun SdkByteReadChannel.adapt(): StreamChunker.Adapter = object : StreamChunker.Adapter {
42+
val delegate = this@adapt
6143

62-
override suspend fun read(sink: SdkBuffer, limit: Long): Long =
63-
delegate.read(sink, limit)
44+
override val eof: Boolean get() = delegate.isClosedForRead
45+
override suspend fun read(buffer: SdkBuffer, limit: Long): Long = delegate.read(buffer, limit)
6446
}

runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedSource.kt

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package aws.smithy.kotlin.runtime.auth.awssigning
66

77
import aws.smithy.kotlin.runtime.InternalApi
8-
import aws.smithy.kotlin.runtime.auth.awssigning.internal.AwsChunkedReader
8+
import aws.smithy.kotlin.runtime.auth.awssigning.internal.StreamChunker
99
import aws.smithy.kotlin.runtime.http.DeferredHeaders
1010
import aws.smithy.kotlin.runtime.io.SdkBuffer
1111
import aws.smithy.kotlin.runtime.io.SdkSource
@@ -32,36 +32,22 @@ public class AwsChunkedSource(
3232
previousSignature: ByteArray,
3333
trailingHeaders: DeferredHeaders = DeferredHeaders.Empty,
3434
) : SdkSource {
35-
private val chunkReader = AwsChunkedReader(
36-
delegate.asStream(),
37-
signer,
38-
signingConfig,
39-
previousSignature,
40-
trailingHeaders,
41-
)
35+
private val chunker = StreamChunker(delegate.adapt(), signer, signingConfig, previousSignature, trailingHeaders)
4236

4337
override fun read(sink: SdkBuffer, limit: Long): Long {
4438
require(limit >= 0L) { "Invalid limit ($limit) must be >= 0L" }
4539
// COROUTINE SAFETY: runBlocking is allowed here because SdkSource is a synchronous blocking interface
46-
val isChunkValid = runBlocking {
47-
chunkReader.ensureValidChunk()
48-
}
49-
if (!isChunkValid) return -1L
50-
chunkReader.chunk.read(sink, limit)
51-
return chunkReader.readCountAndReset()
40+
return runBlocking { chunker.readAndMaybeWrite(sink, limit) }
5241
}
5342

5443
override fun close() {
5544
delegate.close()
5645
}
5746
}
5847

59-
private fun SdkSource.asStream(): AwsChunkedReader.Stream = object : AwsChunkedReader.Stream {
60-
private val delegate = this@asStream.buffer()
48+
private fun SdkSource.adapt(): StreamChunker.Adapter = object : StreamChunker.Adapter {
49+
private val delegate = this@adapt.buffer()
6150

62-
override fun isClosedForRead(): Boolean =
63-
delegate.exhausted()
64-
65-
override suspend fun read(sink: SdkBuffer, limit: Long): Long =
66-
delegate.read(sink, limit)
51+
override val eof: Boolean get() = delegate.exhausted()
52+
override suspend fun read(buffer: SdkBuffer, limit: Long): Long = delegate.read(buffer, limit)
6753
}

runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/AwsChunkedReader.kt

Lines changed: 0 additions & 244 deletions
This file was deleted.

0 commit comments

Comments
 (0)