@@ -4,12 +4,13 @@ import cats.Applicative
4
4
import cats .data .EitherT
5
5
import cats .effect .{Async , Sync }
6
6
import cats .implicits .*
7
- import fs2 . Stream
7
+ import com . google . protobuf . CodedOutputStream
8
8
import fs2 .compression .Compression
9
9
import fs2 .io .{readOutputStream , toInputStreamResource }
10
10
import fs2 .text .decodeWithCharset
11
- import org .http4s .headers .{`Content-Encoding` , `Content-Type` }
12
- import org .http4s .{ContentCoding , DecodeResult , Entity , EntityDecoder , EntityEncoder , Headers , MediaRange , MediaType }
11
+ import fs2 .{Chunk , Stream }
12
+ import org .http4s .headers .`Content-Type`
13
+ import org .http4s .{ContentCoding , DecodeResult , Entity , EntityDecoder , EntityEncoder , MediaRange , MediaType }
13
14
import org .ivovk .connect_rpc_scala .ConnectRpcHttpRoutes .getClass
14
15
import org .slf4j .{Logger , LoggerFactory }
15
16
import scalapb .json4s .{JsonFormat , Printer }
@@ -48,7 +49,7 @@ class JsonMessageCodec[F[_] : Sync : Compression](printer: Printer) extends Mess
48
49
case str : String =>
49
50
Sync [F ].delay(URLDecoder .decode(str, charset))
50
51
case stream : Stream [F , Byte ] =>
51
- decompressed(entity.headers , stream)
52
+ decompressed(entity.encoding , stream)
52
53
.through(decodeWithCharset(charset))
53
54
.compile.string
54
55
}
@@ -73,7 +74,12 @@ class JsonMessageCodec[F[_] : Sync : Compression](printer: Printer) extends Mess
73
74
logger.trace(s " <<< JSON: $string" )
74
75
}
75
76
76
- EntityEncoder .stringEncoder[F ].toEntity(string)
77
+ val bytes = string.getBytes()
78
+
79
+ Entity (
80
+ body = Stream .chunk(Chunk .array(bytes)),
81
+ length = Some (bytes.length.toLong),
82
+ )
77
83
}
78
84
79
85
}
@@ -92,7 +98,7 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] {
92
98
Async [F ].delay(base64dec.decode(str.getBytes(entity.charset.nioCharset)))
93
99
.flatMap(arr => Async [F ].delay(cmp.parseFrom(arr)))
94
100
case stream : Stream [F , Byte ] =>
95
- toInputStreamResource(decompressed(entity.headers , stream))
101
+ toInputStreamResource(decompressed(entity.encoding , stream))
96
102
.use(is => Async [F ].delay(cmp.parseFrom(is)))
97
103
}
98
104
@@ -111,17 +117,18 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] {
111
117
logger.trace(s " <<< Proto: ${message.toProtoString}" )
112
118
}
113
119
120
+ val dataLength = message.serializedSize
121
+ val chunkSize = CodedOutputStream .DEFAULT_BUFFER_SIZE min dataLength
122
+
114
123
Entity (
115
- body = readOutputStream(2048 )(os => Async [F ].delay(message.writeTo(os))),
116
- length = Some (message.serializedSize .toLong),
124
+ body = readOutputStream(chunkSize )(os => Async [F ].delay(message.writeTo(os))),
125
+ length = Some (dataLength .toLong),
117
126
)
118
127
}
119
128
120
129
}
121
130
122
- def decompressed [F [_] : Compression ](headers : Headers , body : Stream [F , Byte ]): Stream [F , Byte ] = {
123
- val encoding = headers.get[`Content-Encoding`].map(_.contentCoding)
124
-
131
+ def decompressed [F [_] : Compression ](encoding : Option [ContentCoding ], body : Stream [F , Byte ]): Stream [F , Byte ] = {
125
132
body.through(encoding match {
126
133
case Some (ContentCoding .gzip) =>
127
134
Compression [F ].gunzip().andThen(_.flatMap(_.content))
0 commit comments