Skip to content

Commit 816566c

Browse files
committed
33 / 38 tests pass
1 parent 1ca49e0 commit 816566c

File tree

5 files changed

+132
-108
lines changed

5 files changed

+132
-108
lines changed

conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/Http4sClientLauncher.scala

Lines changed: 43 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,21 @@ package org.ivovk.connect_rpc_scala.conformance
33
import cats.effect.std.Dispatcher
44
import cats.effect.{IO, IOApp}
55
import cats.syntax.all.*
6+
import com.google.protobuf.any.Any
7+
import connectrpc.ErrorDetailsAny
68
import connectrpc.conformance.v1 as conformance
79
import connectrpc.conformance.v1.*
8-
import io.grpc.{Metadata, StatusRuntimeException}
10+
import io.grpc.Metadata
911
import org.http4s.Uri
1012
import org.http4s.ember.client.EmberClientBuilder
1113
import org.ivovk.connect_rpc_scala.conformance.util.{ConformanceHeadersConv, ProtoSerDeser}
12-
import org.ivovk.connect_rpc_scala.connect.StatusCodeMappings.toConnectCode
14+
import org.ivovk.connect_rpc_scala.connect.ErrorHandling
1315
import org.ivovk.connect_rpc_scala.http4s.ConnectHttp4sClientBuilder
1416
import org.slf4j.LoggerFactory
1517

18+
import java.util.concurrent.TimeUnit
19+
import scala.concurrent.duration.Duration
20+
1621
/**
1722
* Flow:
1823
*
@@ -39,11 +44,11 @@ object Http4sClientLauncher extends IOApp.Simple {
3944
.unfoldEval(0) { _ =>
4045
ProtoSerDeser[IO].read[ClientCompatRequest](System.in).option.map(_.map(_ -> 0))
4146
}
42-
.evalMap { req =>
47+
.evalMap { (spec: ClientCompatRequest) =>
4348
val f = for
44-
_ <- IO(logger.info(s"Received request: $req")).toResource
49+
_ <- IO(logger.info(s"Received request: $spec")).toResource
4550

46-
baseUri <- IO.fromEither(Uri.fromString(s"http://${req.host}:${req.port}")).toResource
51+
baseUri <- IO.fromEither(Uri.fromString(s"http://${spec.host}:${spec.port}")).toResource
4752

4853
channel <- ConnectHttp4sClientBuilder(httpClient)
4954
.withJsonCodecConfigurator(
@@ -53,10 +58,12 @@ object Http4sClientLauncher extends IOApp.Simple {
5358
.registerType[conformance.UnaryRequest]
5459
.registerType[conformance.IdempotentUnaryRequest]
5560
)
61+
.withRequestTimeout(spec.timeoutMs.map(Duration(_, TimeUnit.MILLISECONDS)))
5662
.build(baseUri)
63+
5764
stub = ConformanceServiceFs2GrpcTrailers.stub[IO](dispatcher, channel)
5865

59-
resp <- runTestCase(stub, req).toResource
66+
resp <- testClientCompat(stub, spec).toResource
6067

6168
_ <- ProtoSerDeser[IO].write(System.out, resp).toResource
6269
yield ()
@@ -72,28 +79,28 @@ object Http4sClientLauncher extends IOApp.Simple {
7279
}
7380
}
7481

75-
private def runTestCase(
82+
private def testClientCompat(
7683
stub: ConformanceServiceFs2GrpcTrailers[IO, Metadata],
77-
specReq: ClientCompatRequest,
84+
spec: ClientCompatRequest,
7885
): IO[ClientCompatResponse] = {
79-
logger.info(">>> Running conformance test: {}", specReq.testName)
86+
logger.info(">>> Running conformance test: {}", spec.testName)
8087

8188
require(
82-
specReq.service.contains("connectrpc.conformance.v1.ConformanceService"),
83-
s"Invalid service name: ${specReq.service}. Expected 'connectrpc.conformance.v1.ConformanceService'.",
89+
spec.service.contains("connectrpc.conformance.v1.ConformanceService"),
90+
s"Invalid service name: ${spec.service}. Expected 'connectrpc.conformance.v1.ConformanceService'.",
8491
)
8592

86-
specReq.method match {
93+
spec.method match {
8794
case Some("Unary") =>
88-
val req = specReq.requestMessages.head.unpack[UnaryRequest]
89-
val requestMetadata = ConformanceHeadersConv.toMetadata(specReq.requestHeaders)
95+
val req = spec.requestMessages.head.unpack[UnaryRequest]
96+
val requestMetadata = ConformanceHeadersConv.toMetadata(spec.requestHeaders)
9097

9198
logger.info("Decoded Request: {}", req)
9299
logger.info("Decoded Request metadata: {}", requestMetadata)
93100

94101
stub.unary(req, requestMetadata)
95102
.map { (resp, trailers) =>
96-
logger.info("<<< Conformance test completed: {}", specReq.testName)
103+
logger.info("<<< Conformance test completed: {}", spec.testName)
97104

98105
ClientCompatResponse.Result.Response(
99106
ClientResponseResult(
@@ -103,41 +110,37 @@ object Http4sClientLauncher extends IOApp.Simple {
103110
)
104111
)
105112
}
106-
.handleError {
107-
case e: StatusRuntimeException =>
108-
logger.error("Error during conformance test: {}", specReq.testName, e)
109-
110-
ClientCompatResponse.Result.Response(
111-
ClientResponseResult(
112-
responseHeaders = ConformanceHeadersConv.toHeaderSeq(e.getTrailers),
113-
error = Some(
114-
conformance.Error(
115-
code = conformance.Code.fromValue(e.getStatus.toConnectCode.value),
116-
message = Some(e.getMessage),
117-
details = Seq.empty,
118-
)
119-
),
120-
responseTrailers = ConformanceHeadersConv.toTrailingHeaderSeq(e.getTrailers),
121-
)
122-
)
123-
case e: Throwable =>
124-
ClientCompatResponse.Result.Error(
125-
ClientErrorResult(
126-
message = e.getMessage
127-
)
113+
.handleError { (t: Throwable) =>
114+
val errorDetails = ErrorHandling.extractDetails(t)
115+
logger.info(s"Error during the conformance test: ${spec.testName}. Error: $errorDetails")
116+
117+
ClientCompatResponse.Result.Response(
118+
ClientResponseResult(
119+
error = Some(
120+
conformance.Error(
121+
code = conformance.Code.fromValue(errorDetails.error.code.value),
122+
message = errorDetails.error.message,
123+
details = errorDetails.error.details
124+
// TODO: simplify
125+
.map(d => Any("type.googleapis.com/" + d.`type`, d.value).unpack[ErrorDetailsAny])
126+
.map(d => Any(d.`type`, d.value)),
127+
)
128+
),
129+
responseTrailers = ConformanceHeadersConv.toTrailingHeaderSeq(errorDetails.metadata),
128130
)
131+
)
129132
}
130-
.map(result => ClientCompatResponse(specReq.testName, result))
133+
.map(result => ClientCompatResponse(spec.testName, result))
131134
case Some(other) =>
132-
ClientCompatResponse(specReq.testName)
135+
ClientCompatResponse(spec.testName)
133136
.withError(
134137
ClientErrorResult(
135138
message = s"Unsupported method: $other. Only 'Unary' is supported in this client."
136139
)
137140
)
138141
.pure[IO]
139142
case None =>
140-
ClientCompatResponse(specReq.testName)
143+
ClientCompatResponse(spec.testName)
141144
.withError(
142145
ClientErrorResult(
143146
message = s"Method is not specified in the request."

core/src/main/scala/org/ivovk/connect_rpc_scala/connect/StatusCodeMappings.scala

Lines changed: 19 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ import io.grpc.Status
55
object StatusCodeMappings {
66

77
private val httpStatusCodesByGrpcStatusCode: Array[Int] = {
8-
val maxCode = Status.Code.values().map(_.value()).max
8+
val maxCode = Status.Code.values.map(_.value).max
99
val codes = new Array[Int](maxCode + 1)
1010

11-
Status.Code.values().foreach { code =>
12-
codes(code.value()) = code match {
11+
Status.Code.values.foreach { code =>
12+
codes(code.value) = code match {
1313
case Status.Code.CANCELLED => 499 // 499 Client Closed Request
1414
case Status.Code.UNKNOWN => 500 // 500 Internal Server Error
1515
case Status.Code.INVALID_ARGUMENT => 400 // 400 Bad Request
@@ -33,33 +33,21 @@ object StatusCodeMappings {
3333
codes
3434
}
3535

36-
private val connectErrorCodeByGrpcStatusCode: Array[connectrpc.Code] = {
37-
val maxCode = Status.Code.values().map(_.value()).max
38-
val codes = new Array[connectrpc.Code](maxCode + 1)
36+
// Used in the client to determine the gRPC status code from the HTTP status code
37+
// Surprisingly, not matching the previous mapping
38+
val GrpcStatusCodesByHttpStatusCode: Map[Int, Status.Code] = {
39+
val reverseMapping = Status.Code.values
40+
.map(code => httpStatusCodesByGrpcStatusCode(code.value) -> code)
41+
.toMap
3942

40-
Status.Code.values().foreach { code =>
41-
codes(code.value()) = code match {
42-
case Status.Code.CANCELLED => connectrpc.Code.Canceled
43-
case Status.Code.UNKNOWN => connectrpc.Code.Unknown
44-
case Status.Code.INVALID_ARGUMENT => connectrpc.Code.InvalidArgument
45-
case Status.Code.DEADLINE_EXCEEDED => connectrpc.Code.DeadlineExceeded
46-
case Status.Code.NOT_FOUND => connectrpc.Code.NotFound
47-
case Status.Code.ALREADY_EXISTS => connectrpc.Code.AlreadyExists
48-
case Status.Code.PERMISSION_DENIED => connectrpc.Code.PermissionDenied
49-
case Status.Code.RESOURCE_EXHAUSTED => connectrpc.Code.ResourceExhausted
50-
case Status.Code.FAILED_PRECONDITION => connectrpc.Code.FailedPrecondition
51-
case Status.Code.ABORTED => connectrpc.Code.Aborted
52-
case Status.Code.OUT_OF_RANGE => connectrpc.Code.OutOfRange
53-
case Status.Code.UNIMPLEMENTED => connectrpc.Code.Unimplemented
54-
case Status.Code.INTERNAL => connectrpc.Code.Internal
55-
case Status.Code.UNAVAILABLE => connectrpc.Code.Unavailable
56-
case Status.Code.DATA_LOSS => connectrpc.Code.DataLoss
57-
case Status.Code.UNAUTHENTICATED => connectrpc.Code.Unauthenticated
58-
case _ => connectrpc.Code.Internal
59-
}
60-
}
61-
62-
codes
43+
reverseMapping ++ Map(
44+
400 -> Status.Code.INTERNAL, // 400 Bad Request
45+
404 -> Status.Code.UNIMPLEMENTED, // 404 Not Found
46+
409 -> Status.Code.UNKNOWN, // 409 Conflict
47+
429 -> Status.Code.UNAVAILABLE, // 429 Too Many Requests
48+
502 -> Status.Code.UNAVAILABLE, // 502 Bad Gateway
49+
504 -> Status.Code.UNAVAILABLE, // 504 Gateway Timeout
50+
)
6351
}
6452

6553
extension (status: Status) {
@@ -73,10 +61,10 @@ object StatusCodeMappings {
7361
// Url: https://connectrpc.com/docs/protocol/#error-codes
7462
extension (code: Status.Code) {
7563
def toHttpStatusCode: Int =
76-
httpStatusCodesByGrpcStatusCode(code.value())
64+
httpStatusCodesByGrpcStatusCode(code.value)
7765

7866
def toConnectCode: connectrpc.Code =
79-
connectErrorCodeByGrpcStatusCode(code.value())
67+
connectrpc.Code.fromValue(code.value)
8068
}
8169

8270
}
Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package org.ivovk.connect_rpc_scala.syntax
22

3-
import io.grpc.{StatusException, StatusRuntimeException}
3+
import io.grpc.{Metadata, StatusException, StatusRuntimeException}
44
import org.ivovk.connect_rpc_scala.grpc.GrpcHeaders
55
import scalapb.GeneratedMessage
66

@@ -10,28 +10,25 @@ trait ExceptionSyntax {
1010

1111
extension (e: StatusRuntimeException) {
1212
def withDetails[T <: GeneratedMessage](t: T): StatusRuntimeException = {
13-
e.getTrailers.put(
14-
GrpcHeaders.ErrorDetailsKey,
15-
connectrpc.ErrorDetailsAny(
16-
`type` = t.companion.scalaDescriptor.fullName,
17-
value = t.toByteString,
18-
),
19-
)
13+
packDetails(e.getTrailers, t)
2014
e
2115
}
2216
}
2317

2418
extension (e: StatusException) {
2519
def withDetails[T <: GeneratedMessage](t: T): StatusException = {
26-
e.getTrailers.put(
27-
GrpcHeaders.ErrorDetailsKey,
28-
connectrpc.ErrorDetailsAny(
29-
`type` = t.companion.scalaDescriptor.fullName,
30-
value = t.toByteString,
31-
),
32-
)
20+
packDetails(e.getTrailers, t);
3321
e
3422
}
3523
}
3624

25+
def packDetails[T <: GeneratedMessage](metadata: Metadata, details: T): Unit =
26+
metadata.put(
27+
GrpcHeaders.ErrorDetailsKey,
28+
connectrpc.ErrorDetailsAny(
29+
`type` = details.companion.scalaDescriptor.fullName,
30+
value = details.toByteString,
31+
),
32+
)
33+
3734
}

http4s/src/main/scala/org/ivovk/connect_rpc_scala/http4s/ConnectHttp4sClientBuilder.scala

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,12 @@ import cats.effect.{Async, Resource}
66
import io.grpc.Channel
77
import org.http4s.Uri
88
import org.http4s.client.Client
9-
import org.ivovk.connect_rpc_scala.http.{HeaderMapping, HeadersFilter, MediaTypes}
10-
import org.ivovk.connect_rpc_scala.http.codec.{
11-
JsonSerDeser,
12-
JsonSerDeserBuilder,
13-
MessageCodecRegistry,
14-
ProtoMessageCodec,
15-
}
9+
import org.ivovk.connect_rpc_scala.http.codec.{JsonSerDeser, JsonSerDeserBuilder, ProtoMessageCodec}
10+
import org.ivovk.connect_rpc_scala.http.{HeaderMapping, HeadersFilter}
1611
import org.ivovk.connect_rpc_scala.http4s.client.Http4sChannel
1712

13+
import scala.concurrent.duration.Duration
14+
1815
object ConnectHttp4sClientBuilder {
1916

2017
def apply[F[_]: Async](client: Client[F]): ConnectHttp4sClientBuilder[F] =
@@ -23,6 +20,8 @@ object ConnectHttp4sClientBuilder {
2320
customJsonSerDeser = None,
2421
incomingHeadersFilter = HeaderMapping.DefaultIncomingHeadersFilter,
2522
outgoingHeadersFilter = HeaderMapping.DefaultOutgoingHeadersFilter,
23+
useBinaryFormat = false,
24+
requestTimeoutMs = None,
2625
)
2726
}
2827

@@ -31,18 +30,24 @@ class ConnectHttp4sClientBuilder[F[_]: Async] private (
3130
customJsonSerDeser: Option[JsonSerDeser[F]],
3231
incomingHeadersFilter: HeadersFilter,
3332
outgoingHeadersFilter: HeadersFilter,
33+
useBinaryFormat: Boolean,
34+
requestTimeoutMs: Option[Long],
3435
) {
3536

3637
private def copy(
3738
customJsonSerDeser: Option[JsonSerDeser[F]] = customJsonSerDeser,
3839
incomingHeadersFilter: HeadersFilter = incomingHeadersFilter,
3940
outgoingHeadersFilter: HeadersFilter = outgoingHeadersFilter,
41+
useBinaryFormat: Boolean = useBinaryFormat,
42+
requestTimeoutMs: Option[Long] = requestTimeoutMs,
4043
): ConnectHttp4sClientBuilder[F] =
4144
new ConnectHttp4sClientBuilder(
4245
client,
4346
customJsonSerDeser,
4447
incomingHeadersFilter,
4548
outgoingHeadersFilter,
49+
useBinaryFormat,
50+
requestTimeoutMs,
4651
)
4752

4853
def withJsonCodecConfigurator(method: Endo[JsonSerDeserBuilder[F]]): ConnectHttp4sClientBuilder[F] =
@@ -64,14 +69,23 @@ class ConnectHttp4sClientBuilder[F[_]: Async] private (
6469
def withOutgoingHeadersFilter(filter: String => Boolean): ConnectHttp4sClientBuilder[F] =
6570
copy(outgoingHeadersFilter = filter)
6671

72+
/**
73+
* Use protobuf binary format for messages.
74+
*
75+
* By default, JSON format is used.
76+
*/
77+
def withUseBinaryFormat(useBinaryFormat: Boolean): ConnectHttp4sClientBuilder[F] =
78+
copy(useBinaryFormat = useBinaryFormat)
79+
80+
def withRequestTimeout(timeout: Option[Duration]): ConnectHttp4sClientBuilder[F] =
81+
copy(requestTimeoutMs = timeout.map(_.toMillis).filter(_ > 0))
82+
6783
def build(baseUri: Uri): Resource[F, Channel] =
6884
for dispatcher <- Dispatcher.parallel[F](await = false)
6985
yield {
70-
val jsonSerDeser = customJsonSerDeser.getOrElse(JsonSerDeserBuilder[F]().build)
71-
val codecRegistry = MessageCodecRegistry[F](
72-
jsonSerDeser.codec,
73-
ProtoMessageCodec[F](),
74-
)
86+
val codec =
87+
if useBinaryFormat then new ProtoMessageCodec[F]()
88+
else customJsonSerDeser.getOrElse(JsonSerDeserBuilder[F]().build).codec
7589

7690
val headerMapping = Http4sHeaderMapping(
7791
incomingHeadersFilter,
@@ -80,11 +94,12 @@ class ConnectHttp4sClientBuilder[F[_]: Async] private (
8094
)
8195

8296
new Http4sChannel(
83-
client,
84-
dispatcher,
85-
codecRegistry.byMediaType(MediaTypes.`application/json`).get,
86-
headerMapping,
87-
baseUri,
97+
client = client,
98+
dispatcher = dispatcher,
99+
messageCodec = codec,
100+
headerMapping = headerMapping,
101+
baseUri = baseUri,
102+
timeoutMs = requestTimeoutMs,
88103
)
89104
}
90105

0 commit comments

Comments
 (0)