Skip to content

Commit de571fd

Browse files
authored
Support response compression (#52)
1 parent 68513b3 commit de571fd

File tree

6 files changed

+55
-15
lines changed

6 files changed

+55
-15
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ Known issues:
160160

161161
## Future improvements
162162

163-
- [x] Support GET-requests
164-
- [ ] Support `google.api.http` annotations (GRPC transcoding)
163+
- [x] Support GET-requests ([#10](https://github.com/igor-vovk/connect-rpc-scala/issues/10))
164+
- [ ] Support `google.api.http` annotations (GRPC transcoding) ([#51](https://github.com/igor-vovk/connect-rpc-scala/issues/51))
165165
- [ ] Support configurable timeouts
166166
- [ ] Support non-unary (streaming) methods

core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import org.ivovk.connect_rpc_scala.Mappings.*
1010
import org.ivovk.connect_rpc_scala.grpc.{ClientCalls, GrpcHeaders, MethodRegistry}
1111
import org.ivovk.connect_rpc_scala.http.Headers.`X-Test-Case-Name`
1212
import org.ivovk.connect_rpc_scala.http.RequestEntity
13-
import org.ivovk.connect_rpc_scala.http.codec.MessageCodec
1413
import org.ivovk.connect_rpc_scala.http.codec.MessageCodec.given
14+
import org.ivovk.connect_rpc_scala.http.codec.{Compressor, EncodeOptions, MessageCodec}
1515
import org.slf4j.{Logger, LoggerFactory}
1616
import scalapb.GeneratedMessage
1717

@@ -33,6 +33,10 @@ class ConnectHandler[F[_] : Async](
3333
req: RequestEntity[F],
3434
method: MethodRegistry.Entry,
3535
)(using MessageCodec[F]): F[Response[F]] = {
36+
given EncodeOptions = EncodeOptions(
37+
encoding = req.encoding.filter(Compressor.supportedEncodings.contains)
38+
)
39+
3640
method.descriptor.getType match
3741
case MethodType.UNARY =>
3842
handleUnary(req, method)
@@ -46,7 +50,7 @@ class ConnectHandler[F[_] : Async](
4650
private def handleUnary(
4751
req: RequestEntity[F],
4852
method: MethodRegistry.Entry,
49-
)(using MessageCodec[F]): F[Response[F]] = {
53+
)(using MessageCodec[F], EncodeOptions): F[Response[F]] = {
5054
if (logger.isTraceEnabled) {
5155
// Used in conformance tests
5256
req.headers.get[`X-Test-Case-Name`] match {

core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/Compressor.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@ package org.ivovk.connect_rpc_scala.http.codec
33
import cats.effect.Sync
44
import fs2.Stream
55
import fs2.compression.Compression
6-
import org.http4s.ContentCoding
6+
import org.http4s.{ContentCoding, Entity}
7+
8+
object Compressor {
9+
val supportedEncodings: Set[ContentCoding] = Set(ContentCoding.gzip)
10+
}
711

812
class Compressor[F[_] : Sync] {
913

@@ -13,8 +17,22 @@ class Compressor[F[_] : Sync] {
1317
body.through(encoding match {
1418
case Some(ContentCoding.gzip) =>
1519
Compression[F].gunzip().andThen(_.flatMap(_.content))
16-
case _ =>
20+
case Some(other) =>
21+
throw new IllegalArgumentException(s"Unsupported encoding: $other")
22+
case None =>
1723
identity
1824
})
1925

26+
def compressed(encoding: Option[ContentCoding], entity: Entity[F]): Entity[F] =
27+
encoding match {
28+
case Some(ContentCoding.gzip) =>
29+
Entity(
30+
body = entity.body.through(Compression[F].gzip()),
31+
)
32+
case Some(other) =>
33+
throw new IllegalArgumentException(s"Unsupported encoding: $other")
34+
case None =>
35+
entity
36+
}
37+
2038
}

core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/JsonMessageCodec.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class JsonMessageCodec[F[_] : Sync](
4646
.leftMap(e => InvalidMessageBodyFailure(e.getMessage, e.some))
4747
}
4848

49-
override def encode[A <: Message](message: A): Entity[F] = {
49+
override def encode[A <: Message](message: A, options: EncodeOptions): Entity[F] = {
5050
val string = printer.print(message)
5151

5252
if (logger.isTraceEnabled) {
@@ -55,10 +55,12 @@ class JsonMessageCodec[F[_] : Sync](
5555

5656
val bytes = string.getBytes()
5757

58-
Entity(
58+
val entity = Entity(
5959
body = Stream.chunk(Chunk.array(bytes)),
6060
length = Some(bytes.length.toLong),
6161
)
62+
63+
compressor.compressed(options.encoding, entity)
6264
}
6365

6466
}
Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,27 @@
11
package org.ivovk.connect_rpc_scala.http.codec
22

3-
import org.http4s.headers.`Content-Type`
4-
import org.http4s.{DecodeResult, Entity, EntityEncoder, MediaType}
3+
import org.http4s.headers.{`Content-Encoding`, `Content-Type`}
4+
import org.http4s.{ContentCoding, DecodeResult, Entity, EntityEncoder, Header, Headers, MediaType}
55
import org.ivovk.connect_rpc_scala.http.RequestEntity
66
import scalapb.{GeneratedMessage as Message, GeneratedMessageCompanion as Companion}
77

8+
import scala.util.chaining.*
9+
10+
case class EncodeOptions(
11+
encoding: Option[ContentCoding]
12+
)
13+
814
object MessageCodec {
9-
given [F[_], A <: Message](using codec: MessageCodec[F]): EntityEncoder[F, A] =
10-
EntityEncoder.encodeBy(`Content-Type`(codec.mediaType))(codec.encode)
15+
given [F[_], A <: Message](using codec: MessageCodec[F], options: EncodeOptions): EntityEncoder[F, A] = {
16+
val headers = Headers(`Content-Type`(codec.mediaType))
17+
.pipe(
18+
options.encoding match
19+
case Some(encoding) => _.put(`Content-Encoding`(encoding))
20+
case None => identity
21+
)
22+
23+
EntityEncoder.encodeBy(headers)(codec.encode(_, options))
24+
}
1125
}
1226

1327
trait MessageCodec[F[_]] {
@@ -16,6 +30,6 @@ trait MessageCodec[F[_]] {
1630

1731
def decode[A <: Message](m: RequestEntity[F])(using cmp: Companion[A]): DecodeResult[F, A]
1832

19-
def encode[A <: Message](message: A): Entity[F]
33+
def encode[A <: Message](message: A, options: EncodeOptions): Entity[F]
2034

2135
}

core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/ProtoMessageCodec.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,20 @@ class ProtoMessageCodec[F[_] : Async] extends MessageCodec[F] {
4444
.leftMap(e => InvalidMessageBodyFailure(e.getMessage, e.some))
4545
}
4646

47-
override def encode[A <: Message](message: A): Entity[F] = {
47+
override def encode[A <: Message](message: A, options: EncodeOptions): Entity[F] = {
4848
if (logger.isTraceEnabled) {
4949
logger.trace(s"<<< Proto: ${message.toProtoString}")
5050
}
5151

5252
val dataLength = message.serializedSize
5353
val chunkSize = CodedOutputStream.DEFAULT_BUFFER_SIZE min dataLength
5454

55-
Entity(
55+
val entity = Entity(
5656
body = readOutputStream(chunkSize)(os => Async[F].delay(message.writeTo(os))),
5757
length = Some(dataLength.toLong),
5858
)
59+
60+
compressor.compressed(options.encoding, entity)
5961
}
6062

6163
}

0 commit comments

Comments
 (0)