Skip to content

Commit df2461d

Browse files
committed
Work on client conformance tests
1 parent dac5746 commit df2461d

File tree

6 files changed

+176
-46
lines changed

6 files changed

+176
-46
lines changed

build/conformance/Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,15 @@ RUN mkdir "/logs"
3333

3434
ARG launcher
3535
ARG config="suite.yaml"
36+
ARG mode="server"
3637
ARG stable="true"
3738

3839
# Run stable tests first
3940
RUN echo ">>>>> Running stable tests <<<<<"
4041
RUN LOGS_PATH="/logs" \
4142
./connectconformance \
4243
--conf $config \
43-
--mode server \
44-
--parallel 1 \
44+
--mode $mode \
4545
-v -vv --trace \
4646
-- \
4747
/app/bin/conformance -main org.ivovk.connect_rpc_scala.conformance.$launcher; \
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# https://github.com/connectrpc/conformance/blob/main/docs/configuring_and_running_tests.md#configuration-files
2+
features:
3+
versions: [ HTTP_VERSION_1 ]
4+
protocols: [ PROTOCOL_CONNECT ]
5+
codecs: [ CODEC_JSON ]
6+
stream_types: [ STREAM_TYPE_UNARY ]
7+
supports_tls: false
8+
supports_trailers: false
9+
supports_connect_get: false

conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/ConformanceServiceImpl.scala

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ import com.google.protobuf.ByteString
66
import connectrpc.conformance.v1.*
77
import io.grpc.internal.GrpcUtil
88
import io.grpc.{Metadata, Status}
9-
import org.ivovk.connect_rpc_scala.syntax.all.{*, given}
9+
import org.ivovk.connect_rpc_scala.conformance.util.ConformanceHeadersConv
10+
import org.ivovk.connect_rpc_scala.syntax.all.*
1011
import scalapb.GeneratedMessage
1112

1213
import java.util.concurrent.TimeUnit
1314
import scala.concurrent.duration.Duration
14-
import scala.jdk.CollectionConverters.*
1515

1616
case class UnaryHandlerResponse(payload: ConformancePayload, trailers: Metadata)
1717

@@ -51,12 +51,12 @@ class ConformanceServiceImpl[F[_]: Async] extends ConformanceServiceFs2GrpcTrail
5151
ctx: Metadata,
5252
): F[UnaryHandlerResponse] = {
5353
val requestInfo = ConformancePayload.RequestInfo(
54-
requestHeaders = mkConformanceHeaders(ctx),
54+
requestHeaders = ConformanceHeadersConv.toHeaderSeq(ctx),
5555
timeoutMs = extractTimeoutMs(ctx),
5656
requests = requests.map(_.toProtoAny),
5757
)
5858

59-
val trailers = mkMetadata(
59+
val trailers = ConformanceHeadersConv.toMetadata(
6060
Seq.concat(
6161
responseDefinition.responseHeaders,
6262
responseDefinition.responseTrailers.map(h => h.copy(name = s"trailer-${h.name}")),
@@ -90,23 +90,6 @@ class ConformanceServiceImpl[F[_]: Async] extends ConformanceServiceFs2GrpcTrail
9090

9191
}
9292

93-
private def mkConformanceHeaders(metadata: Metadata): Seq[Header] =
94-
metadata.keys().asScala.map { key =>
95-
Header(key, metadata.getAll(asciiKey[String](key)).asScala.toSeq)
96-
}.toSeq
97-
98-
private def mkMetadata(headers: Seq[Header]): Metadata = {
99-
val metadata = new Metadata()
100-
headers.foreach { h =>
101-
val key = asciiKey[String](h.name)
102-
103-
h.value.foreach { v =>
104-
metadata.put(key, v)
105-
}
106-
}
107-
metadata
108-
}
109-
11093
private def extractTimeoutMs(metadata: Metadata): Option[Long] =
11194
Option(metadata.get(GrpcUtil.TIMEOUT_KEY)).map(_ / 1_000_000)
11295

Lines changed: 119 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
package org.ivovk.connect_rpc_scala.conformance
22

3+
import cats.syntax.all.*
4+
import cats.effect.std.Dispatcher
35
import cats.effect.{IO, IOApp}
4-
import connectrpc.conformance.v1.{
5-
ClientCompatRequest,
6-
ClientCompatResponse,
7-
ConformanceServiceFs2GrpcTrailers,
8-
}
9-
import org.ivovk.connect_rpc_scala.conformance.util.ProtoSerDeser
6+
import connectrpc.conformance.v1 as conformance
7+
import connectrpc.conformance.v1.*
8+
import io.grpc.{Metadata, StatusRuntimeException}
9+
import org.http4s.Uri
10+
import org.http4s.ember.client.EmberClientBuilder
11+
import org.ivovk.connect_rpc_scala.conformance.util.{ConformanceHeadersConv, ProtoSerDeser}
12+
import org.ivovk.connect_rpc_scala.connect.StatusCodeMappings.toConnectCode
13+
import org.ivovk.connect_rpc_scala.http4s.ConnectHttp4sClientBuilder
14+
import org.slf4j.LoggerFactory
1015

1116
/**
1217
* Flow:
1318
*
14-
* - Upon launch, `ServerCompatRequest` message is sent from the test runner to the server to STDIN.
15-
* - Server is started and listens on a random port.
16-
* - `ServerCompatResponse` is sent from the server to STDOUT, which instructs the test runner on which port
17-
* the server is listening.
19+
* TODO: Describe the flow of the conformance client.
1820
*
1921
* All diagnostics should be written to STDERR.
2022
*
@@ -24,23 +26,117 @@ import org.ivovk.connect_rpc_scala.conformance.util.ProtoSerDeser
2426
*/
2527
object Http4sClientLauncher extends IOApp.Simple {
2628

27-
// private val logger = LoggerFactory.getLogger(getClass)
29+
private val logger = LoggerFactory.getLogger(getClass)
30+
31+
override def run: IO[Unit] = {
32+
logger.info("Starting conformance client tests...")
33+
34+
val res = for
35+
dispatcher <- Dispatcher.parallel[IO]
36+
httpClient <- EmberClientBuilder.default[IO].build
37+
38+
_ <- fs2.Stream
39+
.unfoldEval(0) { _ =>
40+
ProtoSerDeser[IO].read[ClientCompatRequest](System.in).option.map(_.map(_ -> 0))
41+
}
42+
.evalMap { req =>
43+
val f = for
44+
_ <- IO(logger.info(s"Received request: $req")).toResource
45+
46+
baseUri <- IO.fromEither(Uri.fromString(s"http://${req.host}:${req.port}")).toResource
47+
48+
channel <- new ConnectHttp4sClientBuilder(httpClient).build(baseUri)
49+
stub = ConformanceServiceFs2GrpcTrailers.stub[IO](dispatcher, channel)
50+
51+
resp <- runTest(stub, req).toResource
52+
53+
_ <- ProtoSerDeser[IO].write(System.out, resp).toResource
54+
yield ()
55+
56+
f.use_
57+
}.compile.resource.drain
58+
yield logger.info("Conformance client tests finished.")
59+
60+
res
61+
.use_
62+
.onError { case e: Throwable =>
63+
IO(logger.error("An error occurred during conformance client tests.", e))
64+
}
65+
}
66+
67+
private def runTest(
68+
stub: ConformanceServiceFs2GrpcTrailers[IO, Metadata],
69+
specReq: ClientCompatRequest,
70+
): IO[ClientCompatResponse] = {
71+
logger.info(">>> Running conformance test: {}", specReq.testName)
72+
73+
require(
74+
specReq.service.contains("connectrpc.conformance.v1.ConformanceService"),
75+
s"Invalid service name: ${specReq.service}. Expected 'connectrpc.conformance.v1.ConformanceService'.",
76+
)
2877

29-
override def run: IO[Unit] =
30-
fs2.Stream.repeatEval {
31-
val f = for
32-
req <- ProtoSerDeser[IO].read[ClientCompatRequest](System.in).toResource
78+
specReq.method match {
79+
case Some("Unary") =>
80+
val req = UnaryRequest.parseFrom(specReq.requestMessages.head.value.newCodedInput())
81+
val requestMetadata = ConformanceHeadersConv.toMetadata(specReq.requestHeaders)
3382

34-
service <- ConformanceServiceFs2GrpcTrailers.bindServiceResource(
35-
ConformanceServiceImpl[IO]()
36-
)
83+
logger.info("Decoded Request: {}", req)
84+
logger.info("Decoded Request metadata: {}", requestMetadata)
3785

38-
resp = ClientCompatResponse(req.testName)
86+
stub.unary(req, requestMetadata)
87+
.map { (resp, trailers) =>
88+
logger.info("<<< Conformance test completed: {}", specReq.testName)
3989

40-
_ <- ProtoSerDeser[IO].write(System.out, resp).toResource
41-
yield ()
90+
ClientCompatResponse.Result.Response(
91+
ClientResponseResult(
92+
responseHeaders = ConformanceHeadersConv.toHeaderSeq(trailers),
93+
payloads = resp.payload.toSeq,
94+
responseTrailers = ConformanceHeadersConv.toTrailingHeaderSeq(trailers),
95+
)
96+
)
97+
}
98+
.handleError {
99+
case e: StatusRuntimeException =>
100+
logger.error("Error during conformance test: {}", specReq.testName, e)
42101

43-
f.use_
44-
}.compile.drain
102+
ClientCompatResponse.Result.Response(
103+
ClientResponseResult(
104+
responseHeaders = ConformanceHeadersConv.toHeaderSeq(e.getTrailers),
105+
error = Some(
106+
conformance.Error(
107+
code = conformance.Code.fromValue(e.getStatus.toConnectCode.value),
108+
message = Some(e.getMessage),
109+
details = Seq.empty,
110+
)
111+
),
112+
responseTrailers = ConformanceHeadersConv.toTrailingHeaderSeq(e.getTrailers),
113+
)
114+
)
115+
case e: Throwable =>
116+
ClientCompatResponse.Result.Error(
117+
ClientErrorResult(
118+
message = e.getMessage
119+
)
120+
)
121+
}
122+
.map(result => ClientCompatResponse(specReq.testName, result))
123+
case Some(other) =>
124+
ClientCompatResponse(specReq.testName)
125+
.withError(
126+
ClientErrorResult(
127+
message = s"Unsupported method: $other. Only 'Unary' is supported in this client."
128+
)
129+
)
130+
.pure[IO]
131+
case None =>
132+
ClientCompatResponse(specReq.testName)
133+
.withError(
134+
ClientErrorResult(
135+
message = s"Method is not specified in the request."
136+
)
137+
)
138+
.pure[IO]
139+
}
140+
}
45141

46142
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.ivovk.connect_rpc_scala.conformance.util
2+
3+
import connectrpc.conformance.v1.Header
4+
import io.grpc.Metadata
5+
import org.ivovk.connect_rpc_scala.syntax.all.{asciiKey, given}
6+
7+
import scala.jdk.CollectionConverters.given
8+
9+
object ConformanceHeadersConv {
10+
private val TrailingHeaderPrefix = "trailer-"
11+
12+
def toHeaderSeq(metadata: Metadata): Seq[Header] =
13+
metadata.keys().asScala
14+
.flatMap { key =>
15+
if key.startsWith(TrailingHeaderPrefix) then None // Skip trailing headers
16+
else Some(Header(key, metadata.getAll(asciiKey[String](key)).asScala.toSeq))
17+
}
18+
.toSeq
19+
20+
def toTrailingHeaderSeq(metadata: Metadata): Seq[Header] =
21+
metadata.keys().asScala
22+
.flatMap { key =>
23+
if key.startsWith(TrailingHeaderPrefix) then
24+
val trailerKey = key.stripPrefix(TrailingHeaderPrefix)
25+
Some(Header(trailerKey, metadata.getAll(asciiKey[String](key)).asScala.toSeq))
26+
else None
27+
}
28+
.toSeq
29+
30+
def toMetadata(headers: Seq[Header]): Metadata = {
31+
val metadata = new Metadata()
32+
headers.foreach { h =>
33+
val key = asciiKey[String](h.name)
34+
35+
h.value.foreach { v =>
36+
metadata.put(key, v)
37+
}
38+
}
39+
metadata
40+
}
41+
}

core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/MethodDescriptorExtensions.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import scalapb.grpc.ConcreteProtoMethodDescriptorSupplier
66
import scalapb.{GeneratedMessage as Message, GeneratedMessageCompanion as Companion}
77

88
object MethodDescriptorExtensions {
9+
// Field number for the 'http' extension in google.protobuf.MethodOptions (see google/api/annotations.proto)
910
private val HttpFieldNumber = 72295728
1011

1112
extension (md: MethodDescriptor[_, _]) {

0 commit comments

Comments
 (0)