Skip to content

Commit 5584051

Browse files
authored
Support idempotent (GET) requests (#14)
1 parent 7848f69 commit 5584051

File tree

12 files changed

+221
-47
lines changed

12 files changed

+221
-47
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ codecs: [ CODEC_JSON ]
6464
stream_types: [ STREAM_TYPE_UNARY ]
6565
supports_tls: false
6666
supports_trailers: false
67-
supports_connect_get: false
67+
supports_connect_get: true
6868
supports_message_receive_limit: false
6969
```
7070
@@ -87,7 +87,7 @@ Diagnostic data from the server itself is output in the `out/out.log` file.
8787

8888
### Conformance tests status
8989

90-
Current status: 6/78 tests pass
90+
Current status: 6/79 tests pass
9191

9292
Known issues:
9393

build.sbt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ lazy val noPublish = List(
2424
lazy val Versions = new {
2525
val grpc = "1.68.1"
2626
val http4s = "0.23.29"
27+
val logback = "1.5.12"
2728
}
2829

2930
lazy val core = project
@@ -51,6 +52,8 @@ lazy val core = project
5152
"org.http4s" %% "http4s-client" % Versions.http4s % Test,
5253

5354
"org.scalatest" %% "scalatest" % "3.2.19" % Test,
55+
56+
"ch.qos.logback" % "logback-classic" % Versions.logback % Test,
5457
),
5558
)
5659

@@ -63,7 +66,7 @@ lazy val conformance = project
6366
libraryDependencies ++= Seq(
6467
"org.http4s" %% "http4s-ember-server" % Versions.http4s,
6568

66-
"ch.qos.logback" % "logback-classic" % "1.5.12" % Runtime,
69+
"ch.qos.logback" % "logback-classic" % Versions.logback % Runtime,
6770
),
6871
)
6972

conformance-suite.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@ features:
66
stream_types: [ STREAM_TYPE_UNARY ]
77
supports_tls: false
88
supports_trailers: false
9-
supports_connect_get: false
9+
supports_connect_get: true
1010
supports_message_receive_limit: false

conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/ConformanceServiceImpl.scala

Lines changed: 56 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,48 @@ class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrai
1919

2020
private val logger = LoggerFactory.getLogger(getClass)
2121

22-
override def unary(request: UnaryRequest, ctx: Metadata): F[(UnaryResponse, Metadata)] = {
23-
val responseDefinition = request.getResponseDefinition
22+
override def unary(
23+
request: UnaryRequest,
24+
ctx: Metadata
25+
): F[(UnaryResponse, Metadata)] = {
26+
for
27+
payload <- handleUnaryRequest(
28+
request.getResponseDefinition,
29+
Seq(request.toProtoAny),
30+
ctx
31+
)
32+
yield (UnaryResponse(payload.some), new Metadata())
33+
}
2434

25-
val trailers = new Metadata()
26-
responseDefinition.responseTrailers.foreach { h =>
27-
val key = Metadata.Key.of(h.name, Metadata.ASCII_STRING_MARSHALLER)
28-
h.value.foreach(v => trailers.put(key, v))
29-
}
35+
override def idempotentUnary(
36+
request: IdempotentUnaryRequest,
37+
ctx: Metadata,
38+
): F[(IdempotentUnaryResponse, Metadata)] = {
39+
for
40+
payload <- handleUnaryRequest(
41+
request.getResponseDefinition,
42+
Seq(request.toProtoAny),
43+
ctx
44+
)
45+
yield (IdempotentUnaryResponse(payload.some), new Metadata())
46+
}
47+
48+
private def handleUnaryRequest(
49+
responseDefinition: UnaryResponseDefinition,
50+
requests: Seq[com.google.protobuf.any.Any],
51+
ctx: Metadata,
52+
): F[ConformancePayload] = {
53+
//val trailers = new Metadata()
54+
//responseDefinition.responseTrailers.foreach { h =>
55+
// val key = Metadata.Key.of(h.name, Metadata.ASCII_STRING_MARSHALLER)
56+
// h.value.foreach(v => trailers.put(key, v))
57+
//}
58+
59+
val requestInfo = ConformancePayload.RequestInfo(
60+
requestHeaders = mkConformanceHeaders(ctx),
61+
timeoutMs = extractTimeout(ctx),
62+
requests = requests
63+
)
3064

3165
val responseData = responseDefinition.response match {
3266
case UnaryResponseDefinition.Response.ResponseData(bs) =>
@@ -37,39 +71,33 @@ class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrai
3771
val status = Status.fromCodeValue(code.value)
3872
.withDescription(message.orNull)
3973
.augmentDescription(
40-
TextFormat.printToSingleLineUnicodeString(
41-
ConformancePayload.RequestInfo(
42-
requests = Seq(request.toProtoAny)
43-
).toProtoAny
44-
)
74+
TextFormat.printToSingleLineUnicodeString(requestInfo.toProtoAny)
4575
)
4676

47-
throw new StatusRuntimeException(status, trailers)
77+
throw new StatusRuntimeException(status)
4878
}
4979

50-
val timeout = Option(ctx.get(Metadata.Key.of("grpc-timeout", Metadata.ASCII_STRING_MARSHALLER)))
51-
.map(v => v.substring(0, v.length - 1).toLong / 1000)
52-
53-
val payload = ConformancePayload(
54-
data = responseData.getOrElse(ByteString.EMPTY),
55-
requestInfo = ConformancePayload.RequestInfo(
56-
requestHeaders = mkConformanceHeaders(ctx),
57-
timeoutMs = timeout,
58-
requests = Seq(request.toProtoAny),
59-
connectGetInfo = None,
60-
).some
61-
)
62-
6380
Async[F].sleep(Duration(responseDefinition.responseDelayMs, TimeUnit.MILLISECONDS)) *>
64-
Async[F].pure((UnaryResponse(payload.some), trailers))
81+
Async[F].pure(ConformancePayload(
82+
responseData.getOrElse(ByteString.EMPTY),
83+
requestInfo.some
84+
))
6585
}
6686

87+
private def keyof(key: String): Metadata.Key[String] =
88+
Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)
89+
6790
private def mkConformanceHeaders(metadata: Metadata): Seq[Header] = {
6891
metadata.keys().asScala.map { key =>
69-
Header(key, metadata.getAll(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)).asScala.toSeq)
92+
Header(key, metadata.getAll(keyof(key)).asScala.toSeq)
7093
}.toSeq
7194
}
7295

96+
private def extractTimeout(metadata: Metadata): Option[Long] = {
97+
Option(metadata.get(keyof("grpc-timeout")))
98+
.map(v => v.substring(0, v.length - 1).toLong / 1000)
99+
}
100+
73101
override def serverStream(
74102
request: ServerStreamRequest,
75103
ctx: Metadata
@@ -91,8 +119,4 @@ class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrai
91119
ctx: Metadata
92120
): F[(UnimplementedResponse, Metadata)] = ???
93121

94-
override def idempotentUnary(
95-
request: IdempotentUnaryRequest,
96-
ctx: Metadata
97-
): F[(IdempotentUnaryResponse, Metadata)] = ???
98122
}

conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/Main.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ object Main extends IOApp.Simple {
4646
p.withTypeRegistry(
4747
TypeRegistry.default
4848
.addMessage[connectrpc.conformance.v1.UnaryRequest]
49+
.addMessage[connectrpc.conformance.v1.IdempotentUnaryRequest]
4950
.addMessage[connectrpc.conformance.v1.ConformancePayload.RequestInfo]
5051
)
5152
}

core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRpcHttpRoutes.scala

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,17 @@ import cats.effect.Async
55
import cats.effect.kernel.Resource
66
import cats.implicits.*
77
import fs2.compression.Compression
8+
import fs2.{Chunk, Stream}
89
import io.grpc.*
910
import io.grpc.MethodDescriptor.MethodType
1011
import io.grpc.stub.MetadataUtils
1112
import org.http4s.*
1213
import org.http4s.dsl.Http4sDsl
1314
import org.http4s.headers.`Content-Type`
1415
import org.ivovk.connect_rpc_scala.http.*
15-
import org.ivovk.connect_rpc_scala.http.Headers.{`Connect-Timeout-Ms`, `X-Test-Case-Name`}
16+
import org.ivovk.connect_rpc_scala.http.Headers.*
1617
import org.ivovk.connect_rpc_scala.http.MessageCodec.given
18+
import org.ivovk.connect_rpc_scala.http.QueryParams.*
1719
import org.slf4j.{Logger, LoggerFactory}
1820
import scalapb.grpc.ClientCalls
1921
import scalapb.json4s.{JsonFormat, Printer}
@@ -54,6 +56,41 @@ object ConnectRpcHttpRoutes {
5456
ipChannel <- InProcessChannelBridge.create(services, configuration.waitForShutdown)
5557
yield
5658
HttpRoutes.of[F] {
59+
case req@Method.GET -> Root / serviceName / methodName :? EncodingQP(contentType) +& MessageQP(message) =>
60+
val grpcMethod = grpcMethodName(serviceName, methodName)
61+
62+
codecRegistry.byContentType(contentType) match {
63+
case Some(codec) =>
64+
given MessageCodec[F] = codec
65+
66+
val media = Media[F](Stream.chunk(Chunk.array(message.getBytes)), req.headers)
67+
68+
methodRegistry.get(grpcMethod) match {
69+
// Support GET-requests for all methods until https://github.com/scalapb/ScalaPB/pull/1774 is merged
70+
case Some(entry) if entry.methodDescriptor.isSafe || true =>
71+
entry.methodDescriptor.getType match
72+
case MethodType.UNARY =>
73+
handleUnary(dsl, entry, media, ipChannel)
74+
case unsupported =>
75+
NotImplemented(connectrpc.Error(
76+
code = io.grpc.Status.UNIMPLEMENTED.toConnectCode,
77+
message = s"Unsupported method type: $unsupported".some
78+
))
79+
case Some(_) =>
80+
Forbidden(connectrpc.Error(
81+
code = io.grpc.Status.PERMISSION_DENIED.toConnectCode,
82+
message = s"Method supports calling using POST: $grpcMethod".some
83+
))
84+
case None =>
85+
NotFound(connectrpc.Error(
86+
code = io.grpc.Status.NOT_FOUND.toConnectCode,
87+
message = s"Method not found: $grpcMethod".some
88+
))
89+
}
90+
case None =>
91+
UnsupportedMediaType(s"Unsupported content-type ${contentType.show}. " +
92+
s"Supported content types: ${MediaTypes.allSupported.map(_.show).mkString(", ")}")
93+
}
5794
case req@Method.POST -> Root / serviceName / methodName =>
5895
val grpcMethod = grpcMethodName(serviceName, methodName)
5996
val contentType = req.headers.get[`Content-Type`].map(_.mediaType)
@@ -79,7 +116,8 @@ object ConnectRpcHttpRoutes {
79116
))
80117
}
81118
case None =>
82-
UnsupportedMediaType(s"Unsupported Content-Type header ${contentType.map(_.show).orNull}")
119+
UnsupportedMediaType(s"Unsupported content-type ${contentType.map(_.show).orNull}. " +
120+
s"Supported content types: ${MediaTypes.allSupported.map(_.show).mkString(", ")}")
83121
}
84122
}
85123
}
@@ -88,7 +126,7 @@ object ConnectRpcHttpRoutes {
88126
private def handleUnary[F[_] : Async](
89127
dsl: Http4sDsl[F],
90128
entry: RegistryEntry,
91-
req: Request[F],
129+
req: Media[F],
92130
channel: Channel
93131
)(using codec: MessageCodec[F]): F[Response[F]] = {
94132
import dsl.*
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package org.ivovk.connect_rpc_scala.http
2+
3+
import org.http4s.MediaType
4+
5+
import scala.annotation.targetName
6+
7+
object MediaTypes {
8+
9+
@targetName("applicationJson")
10+
val `application/json`: MediaType = MediaType.application.json
11+
12+
@targetName("applicationProto")
13+
val `application/proto`: MediaType = MediaType.unsafeParse("application/proto")
14+
15+
val allSupported: Seq[MediaType] = List(`application/json`, `application/proto`)
16+
17+
}

core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class JsonMessageCodec[F[_] : Sync : Compression](jsonPrinter: Printer) extends
3636

3737
private val logger: Logger = LoggerFactory.getLogger(getClass)
3838

39-
override val mediaType: MediaType = MediaType.application.`json`
39+
override val mediaType: MediaType = MediaTypes.`application/json`
4040

4141
override def decode[A <: Message](m: Media[F])(using cmp: Companion[A]): DecodeResult[F, A] = {
4242
val charset = m.charset.getOrElse(Charset.`UTF-8`).nioCharset
@@ -72,8 +72,7 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] {
7272

7373
private val logger: Logger = LoggerFactory.getLogger(getClass)
7474

75-
override val mediaType: MediaType =
76-
MediaType.unsafeParse("application/proto")
75+
override val mediaType: MediaType = MediaTypes.`application/proto`
7776

7877
override def decode[A <: Message](m: Media[F])(using cmp: Companion[A]): DecodeResult[F, A] = {
7978
val f = toInputStreamResource(decompressed(m)).use { is =>
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.ivovk.connect_rpc_scala.http
2+
3+
import org.http4s.dsl.impl.QueryParamDecoderMatcher
4+
import org.http4s.{Charset, MediaType, ParseFailure, QueryParamDecoder}
5+
6+
import java.net.URLDecoder
7+
8+
object QueryParams {
9+
10+
private val encodingQPDecoder = QueryParamDecoder[String].emap {
11+
case "json" => Right(MediaTypes.`application/json`)
12+
case "proto" => Right(MediaTypes.`application/proto`)
13+
case other => Left(ParseFailure(other, "Invalid encoding"))
14+
}
15+
16+
object EncodingQP extends QueryParamDecoderMatcher[MediaType]("encoding")(encodingQPDecoder)
17+
18+
private val messageQPDecoder = QueryParamDecoder[String]
19+
.map(URLDecoder.decode(_, Charset.`UTF-8`.nioCharset))
20+
21+
object MessageQP extends QueryParamDecoderMatcher[String]("message")(messageQPDecoder)
22+
23+
}

core/src/test/protobuf/TestService.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ package org.ivovk.connect_rpc_scala.test;
44

55
service TestService {
66
rpc Add(AddRequest) returns (AddResponse) {}
7+
8+
// This method can be called using GET request
9+
rpc Get(GetRequest) returns (GetResponse) {
10+
option idempotency_level = NO_SIDE_EFFECTS;
11+
}
712
}
813

914
message AddRequest {
@@ -14,3 +19,11 @@ message AddRequest {
1419
message AddResponse {
1520
int32 sum = 1;
1621
}
22+
23+
message GetRequest {
24+
string key = 1;
25+
}
26+
27+
message GetResponse {
28+
string value = 1;
29+
}

0 commit comments

Comments
 (0)