Skip to content

Commit fb1a8c1

Browse files
committed
Introduce separate type for Any's in error details. Split codec classes into files
1 parent fa13aa8 commit fb1a8c1

File tree

18 files changed

+323
-221
lines changed

18 files changed

+323
-221
lines changed

conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/ConformanceServiceImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrai
6969
val status = Status.fromCodeValue(code.value)
7070
.withDescription(message.orNull)
7171
.augmentDescription(
72-
TextFormat.printToSingleLineUnicodeString(requestInfo.toProtoErrorAny)
72+
TextFormat.printToSingleLineUnicodeString(requestInfo.toProtoErrorDetailsAny)
7373
)
7474

7575
throw new StatusRuntimeException(status)

conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/Main.scala

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import com.comcast.ip4s.{Port, host, port}
55
import connectrpc.conformance.v1.{ConformanceServiceFs2GrpcTrailers, ServerCompatRequest, ServerCompatResponse}
66
import org.http4s.ember.server.EmberServerBuilder
77
import org.ivovk.connect_rpc_scala.ConnectRouteBuilder
8-
import scalapb.json4s.TypeRegistry
8+
import org.ivovk.connect_rpc_scala.http.codec.JsonMessageCodecBuilder
99

1010
import java.io.InputStream
1111
import java.nio.ByteBuffer
@@ -36,19 +36,21 @@ object Main extends IOApp.Simple {
3636
ConformanceServiceImpl[IO]()
3737
)
3838

39-
app <- ConnectRouteBuilder.forService[IO](service)
39+
jsonCodec = JsonMessageCodecBuilder[IO]()
4040
// Registering message types in TypeRegistry is required to pass com.google.protobuf.any.Any
4141
// JSON-serialization conformance tests
42-
.withJsonPrinterConfigurator { p =>
43-
p.withTypeRegistry(
44-
TypeRegistry.default
45-
.addMessage[connectrpc.conformance.v1.UnaryRequest]
46-
.addMessage[connectrpc.conformance.v1.IdempotentUnaryRequest]
47-
.addMessage[connectrpc.conformance.v1.ConformancePayload.RequestInfo]
48-
)
42+
.withTypeRegistryConfigurator { tp =>
43+
tp
44+
.addMessage[connectrpc.conformance.v1.UnaryRequest]
45+
.addMessage[connectrpc.conformance.v1.IdempotentUnaryRequest]
46+
.addMessage[connectrpc.conformance.v1.ConformancePayload.RequestInfo]
4947
}
5048
.build
5149

50+
app <- ConnectRouteBuilder.forService[IO](service)
51+
.withJsonCodec(jsonCodec)
52+
.build
53+
5254
server <- EmberServerBuilder.default[IO]
5355
.withHost(host"127.0.0.1")
5456
.withPort(port"0") // random port

core/src/main/protobuf/connectrpc/error.proto

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,15 @@ syntax = "proto3";
1616

1717
package connectrpc;
1818

19-
import "google/protobuf/any.proto";
19+
// This message is similar to the google.protobuf.Any message.
20+
//
21+
// Separate type was needed to introduce a separate JSON serializer for this message, since Any in error details
22+
// has different JSON serialization rules compared to a generic google.protobuf.Any.
23+
// See https://github.com/connectrpc/conformance/issues/948#issuecomment-2511130448 for some details.
24+
message ErrorDetailsAny {
25+
string type = 1;
26+
bytes value = 2;
27+
}
2028

2129
// An error definition used for specifying a desired error response
2230
message Error {
@@ -30,5 +38,5 @@ message Error {
3038
optional string message = 2;
3139
// Errors in Connect and gRPC protocols can have arbitrary messages
3240
// attached to them, which are known as error details.
33-
repeated google.protobuf.Any details = 3;
41+
repeated ErrorDetailsAny details = 3;
3442
}

core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ import org.http4s.{Header, MediaType, MessageFailure, Method, Response}
1111
import org.ivovk.connect_rpc_scala.Mappings.*
1212
import org.ivovk.connect_rpc_scala.grpc.{MethodName, MethodRegistry}
1313
import org.ivovk.connect_rpc_scala.http.Headers.`X-Test-Case-Name`
14-
import org.ivovk.connect_rpc_scala.http.MessageCodec.given
15-
import org.ivovk.connect_rpc_scala.http.{MediaTypes, MessageCodec, MessageCodecRegistry, RequestEntity}
14+
import org.ivovk.connect_rpc_scala.http.codec.MessageCodec.given
15+
import org.ivovk.connect_rpc_scala.http.codec.{MessageCodec, MessageCodecRegistry}
16+
import org.ivovk.connect_rpc_scala.http.{MediaTypes, RequestEntity}
1617
import org.slf4j.{Logger, LoggerFactory}
1718
import scalapb.grpc.ClientCalls
1819
import scalapb.{GeneratedMessage, GeneratedMessageCompanion, TextFormat}
@@ -149,16 +150,16 @@ class ConnectHandler[F[_] : Async](
149150

150151
val messageWithDetails = rawMessage
151152
.map(
152-
_.split("\n").partition(m => !m.startsWith("type_url: "))
153+
_.split("\n").partition(m => !m.startsWith("type: "))
153154
)
154155
.map((messageParts, additionalDetails) =>
155156
val details = additionalDetails
156-
.map(TextFormat.fromAscii(com.google.protobuf.any.Any, _) match {
157+
.map(TextFormat.fromAscii(connectrpc.ErrorDetailsAny, _) match {
157158
case Right(details) => details
158159
case Left(e) =>
159160
logger.warn(s"Failed to parse additional details", e)
160161

161-
com.google.protobuf.wrappers.StringValue(e.msg).toProtoAny
162+
com.google.protobuf.wrappers.StringValue(e.msg).toProtoErrorDetailsAny
162163
})
163164
.toSeq
164165

core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRouteBuilder.scala

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@ import org.http4s.{HttpApp, HttpRoutes, Method, Uri}
99
import org.ivovk.connect_rpc_scala.grpc.*
1010
import org.ivovk.connect_rpc_scala.http.*
1111
import org.ivovk.connect_rpc_scala.http.QueryParams.*
12-
import org.ivovk.connect_rpc_scala.http.json.ConnectJsonRegistry
13-
import scalapb.json4s.{JsonFormat, Printer}
12+
import org.ivovk.connect_rpc_scala.http.codec.{JsonMessageCodec, JsonMessageCodecBuilder, MessageCodecRegistry, ProtoMessageCodec}
1413

1514
import java.util.concurrent.Executor
1615
import scala.concurrent.ExecutionContext
1716
import scala.concurrent.duration.*
18-
import scala.util.chaining.*
1917

2018
object ConnectRouteBuilder {
2119

@@ -32,9 +30,9 @@ object ConnectRouteBuilder {
3230

3331
case class ConnectRouteBuilder[F[_] : Async] private(
3432
services: Seq[ServerServiceDefinition],
35-
jsonPrinterConfigurator: Endo[Printer] = identity,
3633
serverConfigurator: Endo[ServerBuilder[_]] = identity,
3734
channelConfigurator: Endo[ManagedChannelBuilder[_]] = identity,
35+
customJsonCodec: Option[JsonMessageCodec[F]] = None,
3836
pathPrefix: Uri.Path = Uri.Path.Root,
3937
executor: Executor = ExecutionContext.global,
4038
waitForShutdown: Duration = 5.seconds,
@@ -43,8 +41,8 @@ case class ConnectRouteBuilder[F[_] : Async] private(
4341

4442
import Mappings.*
4543

46-
def withJsonPrinterConfigurator(method: Endo[Printer]): ConnectRouteBuilder[F] =
47-
copy(jsonPrinterConfigurator = method)
44+
def withJsonCodec(codec: JsonMessageCodec[F]): ConnectRouteBuilder[F] =
45+
copy(customJsonCodec = Some(codec))
4846

4947
def withServerConfigurator(method: Endo[ServerBuilder[_]]): ConnectRouteBuilder[F] =
5048
copy(serverConfigurator = method)
@@ -80,14 +78,9 @@ case class ConnectRouteBuilder[F[_] : Async] private(
8078
val httpDsl = Http4sDsl[F]
8179
import httpDsl.*
8280

83-
val compressor = Compressor[F]
84-
val jsonPrinter = JsonFormat.printer
85-
.withFormatRegistry(ConnectJsonRegistry.default)
86-
.pipe(jsonPrinterConfigurator)
87-
8881
val codecRegistry = MessageCodecRegistry[F](
89-
JsonMessageCodec[F](compressor, jsonPrinter),
90-
ProtoMessageCodec[F](compressor)
82+
customJsonCodec.getOrElse(JsonMessageCodecBuilder[F]().build),
83+
ProtoMessageCodec[F](),
9184
)
9285

9386
val methodRegistry = MethodRegistry(services)

core/src/main/scala/org/ivovk/connect_rpc_scala/Mappings.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,15 +106,18 @@ trait StatusCodeMappings {
106106
trait ProtoMappings {
107107

108108
extension [T <: GeneratedMessage](t: T) {
109-
private def any(typeUrlPrefix: String = "type.googleapis.com/"): com.google.protobuf.any.Any =
109+
def toProtoAny: com.google.protobuf.any.Any = {
110110
com.google.protobuf.any.Any(
111-
typeUrl = typeUrlPrefix + t.companion.scalaDescriptor.fullName,
111+
typeUrl = "type.googleapis.com/" + t.companion.scalaDescriptor.fullName,
112112
value = t.toByteString
113113
)
114+
}
114115

115-
def toProtoAny: com.google.protobuf.any.Any = any()
116-
117-
def toProtoErrorAny: com.google.protobuf.any.Any = any("")
116+
def toProtoErrorDetailsAny: connectrpc.ErrorDetailsAny =
117+
connectrpc.ErrorDetailsAny(
118+
`type` = t.companion.scalaDescriptor.fullName,
119+
value = t.toByteString
120+
)
118121

119122
}
120123

core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala

Lines changed: 0 additions & 145 deletions
This file was deleted.

core/src/main/scala/org/ivovk/connect_rpc_scala/http/RequestEntity.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import fs2.Stream
55
import org.http4s.headers.{`Content-Encoding`, `Content-Type`}
66
import org.http4s.{Charset, ContentCoding, Headers, Media}
77
import org.ivovk.connect_rpc_scala.http.Headers.`Connect-Timeout-Ms`
8+
import org.ivovk.connect_rpc_scala.http.codec.MessageCodec
89
import scalapb.{GeneratedMessage as Message, GeneratedMessageCompanion as Companion}
910

1011
object RequestEntity {
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package org.ivovk.connect_rpc_scala.http.codec
2+
3+
import cats.effect.Sync
4+
import fs2.Stream
5+
import fs2.compression.Compression
6+
import org.http4s.ContentCoding
7+
8+
class Compressor[F[_] : Sync] {
9+
10+
given Compression[F] = Compression.forSync[F]
11+
12+
def decompressed(encoding: Option[ContentCoding], body: Stream[F, Byte]): Stream[F, Byte] =
13+
body.through(encoding match {
14+
case Some(ContentCoding.gzip) =>
15+
Compression[F].gunzip().andThen(_.flatMap(_.content))
16+
case _ =>
17+
identity
18+
})
19+
20+
}

0 commit comments

Comments
 (0)