Skip to content

Commit 4da983f

Browse files
authored
Set Content-Length header when outputting Json; Tune chunks size when… (#21)
1 parent 9d9c973 commit 4da983f

File tree

3 files changed

+25
-15
lines changed

3 files changed

+25
-15
lines changed

core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRpcHttpRoutes.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ object ConnectRpcHttpRoutes {
159159
)
160160
}).map { response =>
161161
val headers = org.http4s.Headers.empty ++
162-
responseHeaderMetadata.get().toHeaders ++
163-
responseTrailerMetadata.get().toTrailingHeaders
162+
responseHeaderMetadata.get.toHeaders ++
163+
responseTrailerMetadata.get.toTrailingHeaders
164164

165165
if (logger.isTraceEnabled) {
166166
logger.trace(s"<<< Headers: ${headers.redactSensitive}")

core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ import cats.Applicative
44
import cats.data.EitherT
55
import cats.effect.{Async, Sync}
66
import cats.implicits.*
7-
import fs2.Stream
7+
import com.google.protobuf.CodedOutputStream
88
import fs2.compression.Compression
99
import fs2.io.{readOutputStream, toInputStreamResource}
1010
import fs2.text.decodeWithCharset
11-
import org.http4s.headers.{`Content-Encoding`, `Content-Type`}
12-
import org.http4s.{ContentCoding, DecodeResult, Entity, EntityDecoder, EntityEncoder, Headers, MediaRange, MediaType}
11+
import fs2.{Chunk, Stream}
12+
import org.http4s.headers.`Content-Type`
13+
import org.http4s.{ContentCoding, DecodeResult, Entity, EntityDecoder, EntityEncoder, MediaRange, MediaType}
1314
import org.ivovk.connect_rpc_scala.ConnectRpcHttpRoutes.getClass
1415
import org.slf4j.{Logger, LoggerFactory}
1516
import scalapb.json4s.{JsonFormat, Printer}
@@ -48,7 +49,7 @@ class JsonMessageCodec[F[_] : Sync : Compression](printer: Printer) extends Mess
4849
case str: String =>
4950
Sync[F].delay(URLDecoder.decode(str, charset))
5051
case stream: Stream[F, Byte] =>
51-
decompressed(entity.headers, stream)
52+
decompressed(entity.encoding, stream)
5253
.through(decodeWithCharset(charset))
5354
.compile.string
5455
}
@@ -73,7 +74,12 @@ class JsonMessageCodec[F[_] : Sync : Compression](printer: Printer) extends Mess
7374
logger.trace(s"<<< JSON: $string")
7475
}
7576

76-
EntityEncoder.stringEncoder[F].toEntity(string)
77+
val bytes = string.getBytes()
78+
79+
Entity(
80+
body = Stream.chunk(Chunk.array(bytes)),
81+
length = Some(bytes.length.toLong),
82+
)
7783
}
7884

7985
}
@@ -92,7 +98,7 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] {
9298
Async[F].delay(base64dec.decode(str.getBytes(entity.charset.nioCharset)))
9399
.flatMap(arr => Async[F].delay(cmp.parseFrom(arr)))
94100
case stream: Stream[F, Byte] =>
95-
toInputStreamResource(decompressed(entity.headers, stream))
101+
toInputStreamResource(decompressed(entity.encoding, stream))
96102
.use(is => Async[F].delay(cmp.parseFrom(is)))
97103
}
98104

@@ -111,17 +117,18 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] {
111117
logger.trace(s"<<< Proto: ${message.toProtoString}")
112118
}
113119

120+
val dataLength = message.serializedSize
121+
val chunkSize = CodedOutputStream.DEFAULT_BUFFER_SIZE min dataLength
122+
114123
Entity(
115-
body = readOutputStream(2048)(os => Async[F].delay(message.writeTo(os))),
116-
length = Some(message.serializedSize.toLong),
124+
body = readOutputStream(chunkSize)(os => Async[F].delay(message.writeTo(os))),
125+
length = Some(dataLength.toLong),
117126
)
118127
}
119128

120129
}
121130

122-
def decompressed[F[_] : Compression](headers: Headers, body: Stream[F, Byte]): Stream[F, Byte] = {
123-
val encoding = headers.get[`Content-Encoding`].map(_.contentCoding)
124-
131+
def decompressed[F[_] : Compression](encoding: Option[ContentCoding], body: Stream[F, Byte]): Stream[F, Byte] = {
125132
body.through(encoding match {
126133
case Some(ContentCoding.gzip) =>
127134
Compression[F].gunzip().andThen(_.flatMap(_.content))

core/src/main/scala/org/ivovk/connect_rpc_scala/http/RequestEntity.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package org.ivovk.connect_rpc_scala.http
22

33
import cats.MonadThrow
44
import fs2.Stream
5-
import org.http4s.headers.`Content-Type`
6-
import org.http4s.{Charset, Headers, Media}
5+
import org.http4s.headers.{`Content-Encoding`, `Content-Type`}
6+
import org.http4s.{Charset, ContentCoding, Headers, Media}
77
import org.ivovk.connect_rpc_scala.http.Headers.`Connect-Timeout-Ms`
88
import scalapb.{GeneratedMessage as Message, GeneratedMessageCompanion as Companion}
99

@@ -27,6 +27,9 @@ case class RequestEntity[F[_]](
2727
def charset: Charset =
2828
contentType.flatMap(_.charset).getOrElse(Charset.`UTF-8`)
2929

30+
def encoding: Option[ContentCoding] =
31+
headers.get[`Content-Encoding`].map(_.contentCoding)
32+
3033
def timeout: Option[Long] =
3134
headers.get[`Connect-Timeout-Ms`].map(_.value)
3235

0 commit comments

Comments
 (0)