Skip to content

Commit 68513b3

Browse files
authored
Initial preparations to support GRPC Transcoding (#50)
1 parent 8960a07 commit 68513b3

File tree

17 files changed

+213
-116
lines changed

17 files changed

+213
-116
lines changed

build.sbt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ lazy val core = project
5454
"io.grpc" % "grpc-protobuf" % Versions.grpc,
5555
"io.grpc" % "grpc-inprocess" % Versions.grpc,
5656

57+
"com.thesamet.scalapb.common-protos" %% "proto-google-common-protos-scalapb_0.11" % "2.9.6-0" % "protobuf",
58+
"com.thesamet.scalapb.common-protos" %% "proto-google-common-protos-scalapb_0.11" % "2.9.6-0",
59+
5760
"com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf",
5861

5962
"org.http4s" %% "http4s-dsl" % Versions.http4s,

conformance/src/main/protobuf/connectrpc/package.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ option (scalapb.options) = {
1111
enum_value_naming: CAMEL_CASE
1212
enum_strip_prefix: true
1313
preserve_unknown_fields: false
14+
scala3_sources: true
1415
};

conformance/src/main/resources/logback.xml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,6 @@
33

44
<configuration>
55

6-
<!-- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">-->
7-
<!-- <encoder>-->
8-
<!-- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>-->
9-
<!-- </encoder>-->
10-
<!-- </appender>-->
11-
12-
<!-- <appender name="NOOP" class="ch.qos.logback.core.helpers.NOPAppender"/>-->
13-
146
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
157
<file>${LOGS_PATH}/out.log</file>
168
<append>true</append>

core/src/main/protobuf/connectrpc/package.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ option (scalapb.options) = {
1111
enum_value_naming: CAMEL_CASE
1212
enum_strip_prefix: true
1313
preserve_unknown_fields: false
14+
scala3_sources: true
1415
};

core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala

Lines changed: 15 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
package org.ivovk.connect_rpc_scala
22

3-
import cats.data.EitherT
43
import cats.effect.Async
54
import cats.implicits.*
65
import io.grpc.*
76
import io.grpc.MethodDescriptor.MethodType
87
import org.http4s.dsl.Http4sDsl
9-
import org.http4s.{Header, MediaType, MessageFailure, Method, Response}
8+
import org.http4s.{Header, MessageFailure, Response}
109
import org.ivovk.connect_rpc_scala.Mappings.*
11-
import org.ivovk.connect_rpc_scala.grpc.{ClientCalls, GrpcHeaders, MethodName, MethodRegistry}
10+
import org.ivovk.connect_rpc_scala.grpc.{ClientCalls, GrpcHeaders, MethodRegistry}
1211
import org.ivovk.connect_rpc_scala.http.Headers.`X-Test-Case-Name`
12+
import org.ivovk.connect_rpc_scala.http.RequestEntity
13+
import org.ivovk.connect_rpc_scala.http.codec.MessageCodec
1314
import org.ivovk.connect_rpc_scala.http.codec.MessageCodec.given
14-
import org.ivovk.connect_rpc_scala.http.codec.{MessageCodec, MessageCodecRegistry}
15-
import org.ivovk.connect_rpc_scala.http.{MediaTypes, RequestEntity}
1615
import org.slf4j.{Logger, LoggerFactory}
1716
import scalapb.GeneratedMessage
1817

@@ -21,8 +20,6 @@ import scala.jdk.CollectionConverters.*
2120
import scala.util.chaining.*
2221

2322
class ConnectHandler[F[_] : Async](
24-
codecRegistry: MessageCodecRegistry[F],
25-
methodRegistry: MethodRegistry,
2623
channel: Channel,
2724
httpDsl: Http4sDsl[F],
2825
treatTrailersAsHeaders: Boolean,
@@ -33,53 +30,22 @@ class ConnectHandler[F[_] : Async](
3330
private val logger: Logger = LoggerFactory.getLogger(getClass)
3431

3532
def handle(
36-
httpMethod: Method,
37-
contentType: Option[MediaType],
38-
entity: RequestEntity[F],
39-
grpcMethodName: MethodName,
40-
): F[Response[F]] = {
41-
val eitherT = for
42-
given MessageCodec[F] <- EitherT.fromOptionM(
43-
contentType.flatMap(codecRegistry.byContentType).pure[F],
44-
UnsupportedMediaType(s"Unsupported content-type ${contentType.show}. " +
45-
s"Supported content types: ${MediaTypes.allSupported.map(_.show).mkString(", ")}")
46-
)
47-
48-
method <- EitherT.fromOptionM(
49-
methodRegistry.get(grpcMethodName).pure[F],
50-
NotFound(connectrpc.Error(
51-
code = io.grpc.Status.NOT_FOUND.toConnectCode,
52-
message = s"Method not found: ${grpcMethodName.fullyQualifiedName}".some
53-
))
54-
)
55-
56-
_ <- EitherT.cond[F](
57-
// Support GET-requests for all methods until https://github.com/scalapb/ScalaPB/pull/1774 is merged
58-
httpMethod == Method.POST || (httpMethod == Method.GET && method.descriptor.isSafe) || true,
59-
(),
60-
Forbidden(connectrpc.Error(
61-
code = io.grpc.Status.PERMISSION_DENIED.toConnectCode,
62-
message = s"Only POST-requests are allowed for method: ${grpcMethodName.fullyQualifiedName}".some
33+
req: RequestEntity[F],
34+
method: MethodRegistry.Entry,
35+
)(using MessageCodec[F]): F[Response[F]] = {
36+
method.descriptor.getType match
37+
case MethodType.UNARY =>
38+
handleUnary(req, method)
39+
case unsupported =>
40+
NotImplemented(connectrpc.Error(
41+
code = io.grpc.Status.UNIMPLEMENTED.toConnectCode,
42+
message = s"Unsupported method type: $unsupported".some
6343
))
64-
).leftSemiflatMap(identity)
65-
66-
response <- method.descriptor.getType match
67-
case MethodType.UNARY =>
68-
EitherT.right(handleUnary(method, entity, channel))
69-
case unsupported =>
70-
EitherT.left(NotImplemented(connectrpc.Error(
71-
code = io.grpc.Status.UNIMPLEMENTED.toConnectCode,
72-
message = s"Unsupported method type: $unsupported".some
73-
)))
74-
yield response
75-
76-
eitherT.merge
7744
}
7845

7946
private def handleUnary(
80-
method: MethodRegistry.Entry,
8147
req: RequestEntity[F],
82-
channel: Channel
48+
method: MethodRegistry.Entry,
8349
)(using MessageCodec[F]): F[Response[F]] = {
8450
if (logger.isTraceEnabled) {
8551
// Used in conformance tests

core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRouteBuilder.scala

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package org.ivovk.connect_rpc_scala
22

33
import cats.Endo
4+
import cats.data.OptionT
45
import cats.effect.{Async, Resource}
56
import cats.implicits.*
67
import io.grpc.{ManagedChannelBuilder, ServerBuilder, ServerServiceDefinition}
78
import org.http4s.dsl.Http4sDsl
8-
import org.http4s.{HttpApp, HttpRoutes, Method, Uri}
9+
import org.http4s.{HttpApp, HttpRoutes, MediaType, Method, Response, Uri}
910
import org.ivovk.connect_rpc_scala.grpc.*
1011
import org.ivovk.connect_rpc_scala.http.*
1112
import org.ivovk.connect_rpc_scala.http.QueryParams.*
12-
import org.ivovk.connect_rpc_scala.http.codec.{JsonMessageCodec, JsonMessageCodecBuilder, MessageCodecRegistry, ProtoMessageCodec}
13+
import org.ivovk.connect_rpc_scala.http.codec.*
1314

1415
import java.util.concurrent.Executor
1516
import scala.concurrent.ExecutionContext
@@ -124,29 +125,56 @@ final class ConnectRouteBuilder[F[_] : Async] private(
124125
)
125126
yield
126127
val handler = new ConnectHandler(
127-
codecRegistry,
128-
methodRegistry,
129128
channel,
130129
httpDsl,
131130
treatTrailersAsHeaders,
132131
)
133132

134-
HttpRoutes.of[F] {
135-
case req@Method.GET -> `pathPrefix` / serviceName / methodName :? EncodingQP(contentType) +& MessageQP(message) =>
136-
val grpcMethod = MethodName(serviceName, methodName)
137-
val entity = RequestEntity[F](message, req.headers)
138-
139-
handler.handle(Method.GET, contentType.some, entity, grpcMethod)
140-
case req@Method.POST -> `pathPrefix` / serviceName / methodName =>
141-
val grpcMethod = MethodName(serviceName, methodName)
142-
val contentType = req.contentType.map(_.mediaType)
143-
val entity = RequestEntity[F](req)
144-
145-
handler.handle(Method.POST, contentType, entity, grpcMethod)
133+
HttpRoutes[F] {
134+
case req@Method.GET -> `pathPrefix` / service / method :? EncodingQP(mediaType) +& MessageQP(message) =>
135+
OptionT.fromOption[F](methodRegistry.get(service, method))
136+
// Temporary support GET-requests for all methods,
137+
// until https://github.com/scalapb/ScalaPB/pull/1774 is merged
138+
.filter(_.descriptor.isSafe || true)
139+
.semiflatMap { methodEntry =>
140+
withCodec(httpDsl, codecRegistry, mediaType.some) { codec =>
141+
val entity = RequestEntity[F](message, req.headers)
142+
143+
handler.handle(entity, methodEntry)(using codec)
144+
}
145+
}
146+
case req@Method.POST -> `pathPrefix` / service / method =>
147+
OptionT.fromOption[F](methodRegistry.get(service, method))
148+
.semiflatMap { methodEntry =>
149+
withCodec(httpDsl, codecRegistry, req.contentType.map(_.mediaType)) { codec =>
150+
val entity = RequestEntity[F](req.body, req.headers)
151+
152+
handler.handle(entity, methodEntry)(using codec)
153+
}
154+
}
155+
case _ =>
156+
OptionT.none
146157
}
147158
}
148159

149160
def build: Resource[F, HttpApp[F]] =
150161
buildRoutes.map(_.orNotFound)
151162

163+
private def withCodec(
164+
dsl: Http4sDsl[F],
165+
registry: MessageCodecRegistry[F],
166+
mediaType: Option[MediaType]
167+
)(r: MessageCodec[F] => F[Response[F]]): F[Response[F]] = {
168+
import dsl.*
169+
170+
mediaType.flatMap(registry.byMediaType) match {
171+
case Some(codec) => r(codec)
172+
case None =>
173+
val message = s"Unsupported media-type ${mediaType.show}. " +
174+
s"Supported media types: ${MediaTypes.allSupported.map(_.show).mkString(", ")}"
175+
176+
UnsupportedMediaType(message)
177+
}
178+
}
179+
152180
}

core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/MethodName.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ type Service = String
66
type Method = String
77

88
object MethodName {
9-
def apply(descriptor: MethodDescriptor[_, _]): MethodName =
9+
def from(descriptor: MethodDescriptor[_, _]): MethodName =
1010
MethodName(descriptor.getServiceName, descriptor.getBareMethodName)
1111
}
1212

core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/MethodRegistry.scala

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
package org.ivovk.connect_rpc_scala.grpc
22

3+
import com.google.api.AnnotationsProto
4+
import com.google.api.http.HttpRule
35
import io.grpc.{MethodDescriptor, ServerMethodDefinition, ServerServiceDefinition}
6+
import scalapb.grpc.ConcreteProtoMethodDescriptorSupplier
47
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}
58

69
import scala.jdk.CollectionConverters.*
710

811
object MethodRegistry {
912

1013
case class Entry(
14+
name: MethodName,
1115
requestMessageCompanion: GeneratedMessageCompanion[GeneratedMessage],
16+
httpRule: Option[HttpRule],
1217
descriptor: MethodDescriptor[GeneratedMessage, GeneratedMessage],
1318
)
1419

@@ -30,23 +35,37 @@ object MethodRegistry {
3035
val requestCompanion = companionField.get(requestMarshaller)
3136
.asInstanceOf[GeneratedMessageCompanion[GeneratedMessage]]
3237

33-
val methodEntry = Entry(
38+
val httpRule = extractHttpRule(methodDescriptor)
39+
40+
Entry(
41+
name = MethodName.from(methodDescriptor),
3442
requestMessageCompanion = requestCompanion,
43+
httpRule = httpRule,
3544
descriptor = methodDescriptor,
3645
)
37-
38-
MethodName(methodDescriptor) -> methodEntry
3946
}
40-
.groupMapReduce((mn, _) => mn.service)((mn, m) => Map(mn.method -> m))(_ ++ _)
47+
.groupMapReduce(_.name.service)(e => Map(e.name.method -> e))(_ ++ _)
4148

4249
new MethodRegistry(entries)
4350
}
4451

52+
private def extractHttpRule(methodDescriptor: MethodDescriptor[_, _]): Option[HttpRule] = {
53+
methodDescriptor.getSchemaDescriptor match
54+
case sd: ConcreteProtoMethodDescriptorSupplier =>
55+
val fields = sd.getMethodDescriptor.getOptions.getUnknownFields
56+
val fieldNumber = AnnotationsProto.http.getNumber
57+
58+
if fields.hasField(fieldNumber) then
59+
Some(HttpRule.parseFrom(fields.getField(fieldNumber).getLengthDelimitedList.get(0).toByteArray))
60+
else None
61+
case _ => None
62+
}
63+
4564
}
4665

4766
class MethodRegistry private(entries: Map[Service, Map[Method, MethodRegistry.Entry]]) {
4867

49-
def get(methodName: MethodName): Option[MethodRegistry.Entry] =
50-
entries.getOrElse(methodName.service, Map.empty).get(methodName.method)
68+
def get(service: Service, method: Method): Option[MethodRegistry.Entry] =
69+
entries.getOrElse(service, Map.empty).get(method)
5170

5271
}

core/src/main/scala/org/ivovk/connect_rpc_scala/http/RequestEntity.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,11 @@ package org.ivovk.connect_rpc_scala.http
33
import cats.MonadThrow
44
import fs2.Stream
55
import org.http4s.headers.{`Content-Encoding`, `Content-Type`}
6-
import org.http4s.{Charset, ContentCoding, Headers, Media}
6+
import org.http4s.{Charset, ContentCoding, Headers}
77
import org.ivovk.connect_rpc_scala.http.Headers.`Connect-Timeout-Ms`
88
import org.ivovk.connect_rpc_scala.http.codec.MessageCodec
99
import scalapb.{GeneratedMessage as Message, GeneratedMessageCompanion as Companion}
1010

11-
object RequestEntity {
12-
def apply[F[_]](m: Media[F]): RequestEntity[F] =
13-
RequestEntity(m.body, m.headers)
14-
}
1511

1612
/**
1713
* Encoded message and headers with the knowledge how this message can be decoded.
@@ -23,7 +19,7 @@ case class RequestEntity[F[_]](
2319
headers: Headers,
2420
) {
2521

26-
def contentType: Option[`Content-Type`] =
22+
private def contentType: Option[`Content-Type`] =
2723
headers.get[`Content-Type`]
2824

2925
def charset: Charset =

core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/MessageCodec.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
package org.ivovk.connect_rpc_scala.http.codec
22

3-
import cats.Applicative
43
import org.http4s.headers.`Content-Type`
5-
import org.http4s.{DecodeResult, Entity, EntityDecoder, EntityEncoder, MediaRange, MediaType}
4+
import org.http4s.{DecodeResult, Entity, EntityEncoder, MediaType}
65
import org.ivovk.connect_rpc_scala.http.RequestEntity
76
import scalapb.{GeneratedMessage as Message, GeneratedMessageCompanion as Companion}
87

98
object MessageCodec {
10-
given [F[_] : Applicative, A <: Message](using codec: MessageCodec[F], cmp: Companion[A]): EntityDecoder[F, A] =
11-
EntityDecoder.decodeBy(MediaRange.`*/*`)(m => codec.decode(RequestEntity(m)))
12-
139
given [F[_], A <: Message](using codec: MessageCodec[F]): EntityEncoder[F, A] =
1410
EntityEncoder.encodeBy(`Content-Type`(codec.mediaType))(codec.encode)
1511
}

0 commit comments

Comments
 (0)