Skip to content

Commit d321218

Browse files
committed
Refactor: split into builder and handler; integrate with tpolecat extension
1 parent aec5296 commit d321218

File tree

11 files changed

+197
-177
lines changed

11 files changed

+197
-177
lines changed

README.md

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,7 @@ val httpServer: Resource[IO, org.http4s.server.Server] = {
9494
9595
for {
9696
// Create httpApp with Connect-RPC routes, specifying your GRPC services
97-
httpApp <- ConnectRpcHttpRoutes.create[IO](grpcServices)
98-
.map(_.orNotFound)
97+
httpApp <- ConnectRouteBuilder.forServices[IO](grpcServices).build
9998
10099
// Create http server
101100
httpServer <- EmberServerBuilder.default[IO]
@@ -121,15 +120,13 @@ Here is how you can integrate OpenTelemetry with the Connect-RPC server:
121120
val grpcServices: Seq[io.grpc.ServiceDefinition] = ??? // Your GRPC service(s)
122121
val grpcOtel : GrpcOpenTelemetry = ??? // GrpcOpenTelemetry instance
123122
124-
ConnectRpcHttpRoutes.create[IO](
125-
grpcServices,
126-
Configuration.default
127-
// Configure the server to use the same opentelemetry instance as the main server
128-
.withServerConfigurator { sb =>
129-
grpcOtel.configureServerBuilder(sb)
130-
sb
131-
}
132-
)
123+
ConnectRouteBuilder.forServices[IO](grpcServices)
124+
// Configure the server to use the same opentelemetry instance as the main server
125+
.withServerConfigurator { sb =>
126+
grpcOtel.configureServerBuilder(sb)
127+
sb
128+
}
129+
.build
133130
```
134131

135132
This will make sure that all the traffic going through the Connect-RPC server will be captured by the same

build.sbt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import org.typelevel.scalacoptions.ScalacOptions
2+
13
ThisBuild / scalaVersion := "3.3.4"
24

35
ThisBuild / organization := "io.github.igor-vovk"
@@ -14,6 +16,12 @@ ThisBuild / developers := List(
1416
)
1517
ThisBuild / sonatypeCredentialHost := xerial.sbt.Sonatype.sonatypeCentralHost
1618

19+
ThisBuild / tpolecatExcludeOptions ++= Set(
20+
ScalacOptions.warnNonUnitStatement,
21+
ScalacOptions.warnValueDiscard,
22+
)
23+
24+
1725
lazy val noPublish = List(
1826
publish := {},
1927
publishLocal := {},

conformance/src/main/protobuf/connectrpc/conformance/v1/config.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@ enum Codec {
133133
CODEC_UNSPECIFIED = 0;
134134
CODEC_PROTO = 1;
135135
CODEC_JSON = 2;
136-
CODEC_TEXT = 3 [deprecated = true]; // not used; will be ignored
137136
}
138137

139138
enum Compression {

conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/ConformanceServiceImpl.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import cats.implicits.*
55
import com.google.protobuf.ByteString
66
import connectrpc.conformance.v1.*
77
import io.grpc.{Metadata, Status, StatusRuntimeException}
8-
import org.slf4j.LoggerFactory
98
import scalapb.TextFormat
109

1110
import java.util.concurrent.TimeUnit
@@ -18,8 +17,6 @@ class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrai
1817

1918
import org.ivovk.connect_rpc_scala.Mappings.*
2019

21-
private val logger = LoggerFactory.getLogger(getClass)
22-
2320
override def unary(
2421
request: UnaryRequest,
2522
ctx: Metadata

conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/Main.scala

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import cats.effect.{IO, IOApp, Sync}
44
import com.comcast.ip4s.{Port, host, port}
55
import connectrpc.conformance.v1.{ConformanceServiceFs2GrpcTrailers, ServerCompatRequest, ServerCompatResponse}
66
import org.http4s.ember.server.EmberServerBuilder
7-
import org.ivovk.connect_rpc_scala.{Configuration, ConnectRpcHttpRoutes}
7+
import org.ivovk.connect_rpc_scala.ConnectRouteBuilder
88
import scalapb.json4s.TypeRegistry
99

1010
import java.io.InputStream
@@ -36,27 +36,23 @@ object Main extends IOApp.Simple {
3636
ConformanceServiceImpl[IO]()
3737
)
3838

39-
httpApp <- ConnectRpcHttpRoutes
40-
.create[IO](
41-
Seq(service),
42-
Configuration.default
43-
// Registering message types in TypeRegistry is required to pass com.google.protobuf.any.Any
44-
// JSON-serialization conformance tests
45-
.withJsonPrinterConfigurator { p =>
46-
p.withTypeRegistry(
47-
TypeRegistry.default
48-
.addMessage[connectrpc.conformance.v1.UnaryRequest]
49-
.addMessage[connectrpc.conformance.v1.IdempotentUnaryRequest]
50-
.addMessage[connectrpc.conformance.v1.ConformancePayload.RequestInfo]
51-
)
52-
}
53-
)
54-
.map(r => r.orNotFound)
39+
app <- ConnectRouteBuilder.forService[IO](service)
40+
// Registering message types in TypeRegistry is required to pass com.google.protobuf.any.Any
41+
// JSON-serialization conformance tests
42+
.withJsonPrinterConfigurator { p =>
43+
p.withTypeRegistry(
44+
TypeRegistry.default
45+
.addMessage[connectrpc.conformance.v1.UnaryRequest]
46+
.addMessage[connectrpc.conformance.v1.IdempotentUnaryRequest]
47+
.addMessage[connectrpc.conformance.v1.ConformancePayload.RequestInfo]
48+
)
49+
}
50+
.build
5551

5652
server <- EmberServerBuilder.default[IO]
5753
.withHost(host"127.0.0.1")
5854
.withPort(port"0") // random port
59-
.withHttpApp(httpApp)
55+
.withHttpApp(app)
6056
.build
6157

6258
addr = server.address

core/src/main/scala/org/ivovk/connect_rpc_scala/Configuration.scala

Lines changed: 0 additions & 33 deletions
This file was deleted.

core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRpcHttpRoutes.scala renamed to core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala

Lines changed: 55 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -2,125 +2,84 @@ package org.ivovk.connect_rpc_scala
22

33
import cats.Endo
44
import cats.data.EitherT
5-
import cats.effect.{Async, Resource}
5+
import cats.effect.Async
66
import cats.implicits.*
7-
import fs2.compression.Compression
8-
import io.grpc.*
7+
import io.grpc.{CallOptions, Channel, ClientInterceptors, Metadata, StatusException, StatusRuntimeException}
98
import io.grpc.MethodDescriptor.MethodType
109
import io.grpc.stub.MetadataUtils
11-
import org.http4s.*
1210
import org.http4s.dsl.Http4sDsl
13-
import org.ivovk.connect_rpc_scala.http.*
11+
import org.http4s.{MediaType, Method, Response}
12+
import org.ivovk.connect_rpc_scala.Mappings.*
1413
import org.ivovk.connect_rpc_scala.http.Headers.`X-Test-Case-Name`
1514
import org.ivovk.connect_rpc_scala.http.MessageCodec.given
16-
import org.ivovk.connect_rpc_scala.http.QueryParams.*
15+
import org.ivovk.connect_rpc_scala.http.{MediaTypes, MessageCodec, MessageCodecRegistry, RequestEntity}
1716
import org.slf4j.{Logger, LoggerFactory}
1817
import scalapb.grpc.ClientCalls
19-
import scalapb.json4s.JsonFormat
2018
import scalapb.{GeneratedMessage, GeneratedMessageCompanion, TextFormat}
2119

2220
import java.util.concurrent.atomic.AtomicReference
23-
import scala.concurrent.duration.MILLISECONDS
21+
import scala.concurrent.duration.*
2422
import scala.util.chaining.*
2523

26-
27-
object ConnectRpcHttpRoutes {
28-
29-
import Mappings.*
24+
class ConnectHandler[F[_]: Async](
25+
codecRegistry: MessageCodecRegistry[F],
26+
methodRegistry: MethodRegistry,
27+
channel: Channel,
28+
httpDsl: Http4sDsl[F],
29+
) {
30+
import httpDsl.*
3031

3132
private val logger: Logger = LoggerFactory.getLogger(getClass)
3233

33-
def create[F[_] : Async](
34-
services: Seq[ServerServiceDefinition],
35-
configuration: Configuration = Configuration.default
36-
): Resource[F, HttpRoutes[F]] = {
37-
val dsl = Http4sDsl[F]
38-
import dsl.*
39-
40-
val jsonPrinter = configuration.jsonPrinterConfigurator(JsonFormat.printer)
41-
42-
val codecRegistry = MessageCodecRegistry[F](
43-
JsonMessageCodec[F](jsonPrinter),
44-
ProtoMessageCodec[F],
45-
)
46-
47-
val methodRegistry = MethodRegistry(services)
48-
49-
for
50-
ipChannel <- InProcessChannelBridge.create(
51-
services,
52-
configuration.serverBuilderConfigurator,
53-
configuration.channelBuilderConfigurator,
54-
configuration.waitForShutdown,
34+
def handle(
35+
httpMethod: Method,
36+
contentType: Option[MediaType],
37+
entity: RequestEntity[F],
38+
grpcMethodName: String,
39+
): F[Response[F]] = {
40+
val eitherT = for
41+
given MessageCodec[F] <- EitherT.fromOptionM(
42+
contentType.flatMap(codecRegistry.byContentType).pure[F],
43+
UnsupportedMediaType(s"Unsupported content-type ${contentType.show}. " +
44+
s"Supported content types: ${MediaTypes.allSupported.map(_.show).mkString(", ")}")
5545
)
56-
yield
57-
def handle(
58-
httpMethod: Method,
59-
contentType: Option[MediaType],
60-
entity: RequestEntity[F],
61-
grpcMethod: String,
62-
): F[Response[F]] = {
63-
val eitherT = for
64-
given MessageCodec[F] <- EitherT.fromOptionM(
65-
contentType.flatMap(codecRegistry.byContentType).pure[F],
66-
UnsupportedMediaType(s"Unsupported content-type ${contentType.show}. " +
67-
s"Supported content types: ${MediaTypes.allSupported.map(_.show).mkString(", ")}")
68-
)
69-
70-
method <- EitherT.fromOptionM(
71-
methodRegistry.get(grpcMethod).pure[F],
72-
NotFound(connectrpc.Error(
73-
code = io.grpc.Status.NOT_FOUND.toConnectCode,
74-
message = s"Method not found: $grpcMethod".some
75-
))
76-
)
7746

78-
_ <- EitherT.cond[F](
79-
// Support GET-requests for all methods until https://github.com/scalapb/ScalaPB/pull/1774 is merged
80-
httpMethod == Method.POST || (httpMethod == Method.GET && method.descriptor.isSafe) || true,
81-
(),
82-
Forbidden(connectrpc.Error(
83-
code = io.grpc.Status.PERMISSION_DENIED.toConnectCode,
84-
message = s"Only POST-requests are allowed for method: $grpcMethod".some
85-
))
86-
).leftSemiflatMap(identity)
87-
88-
response <- method.descriptor.getType match
89-
case MethodType.UNARY =>
90-
EitherT.right(handleUnary(dsl, method, entity, ipChannel))
91-
case unsupported =>
92-
EitherT.left(NotImplemented(connectrpc.Error(
93-
code = io.grpc.Status.UNIMPLEMENTED.toConnectCode,
94-
message = s"Unsupported method type: $unsupported".some
95-
)))
96-
yield response
97-
98-
eitherT.merge
99-
}
100-
101-
HttpRoutes.of[F] {
102-
case req@Method.GET -> Root / serviceName / methodName :? EncodingQP(contentType) +& MessageQP(message) =>
103-
val grpcMethod = grpcMethodName(serviceName, methodName)
104-
val entity = RequestEntity[F](message, req.headers)
105-
106-
handle(Method.GET, contentType.some, entity, grpcMethod)
107-
case req@Method.POST -> Root / serviceName / methodName =>
108-
val grpcMethod = grpcMethodName(serviceName, methodName)
109-
val contentType = req.contentType.map(_.mediaType)
110-
val entity = RequestEntity[F](req)
47+
method <- EitherT.fromOptionM(
48+
methodRegistry.get(grpcMethodName).pure[F],
49+
NotFound(connectrpc.Error(
50+
code = io.grpc.Status.NOT_FOUND.toConnectCode,
51+
message = s"Method not found: $grpcMethodName".some
52+
))
53+
)
11154

112-
handle(Method.POST, contentType, entity, grpcMethod)
113-
}
55+
_ <- EitherT.cond[F](
56+
// Support GET-requests for all methods until https://github.com/scalapb/ScalaPB/pull/1774 is merged
57+
httpMethod == Method.POST || (httpMethod == Method.GET && method.descriptor.isSafe) || true,
58+
(),
59+
Forbidden(connectrpc.Error(
60+
code = io.grpc.Status.PERMISSION_DENIED.toConnectCode,
61+
message = s"Only POST-requests are allowed for method: $grpcMethodName".some
62+
))
63+
).leftSemiflatMap(identity)
64+
65+
response <- method.descriptor.getType match
66+
case MethodType.UNARY =>
67+
EitherT.right(handleUnary(method, entity, channel))
68+
case unsupported =>
69+
EitherT.left(NotImplemented(connectrpc.Error(
70+
code = io.grpc.Status.UNIMPLEMENTED.toConnectCode,
71+
message = s"Unsupported method type: $unsupported".some
72+
)))
73+
yield response
74+
75+
eitherT.merge
11476
}
11577

116-
private def handleUnary[F[_] : Async](
117-
dsl: Http4sDsl[F],
78+
private def handleUnary(
11879
method: MethodRegistry.Entry,
11980
req: RequestEntity[F],
12081
channel: Channel
12182
)(using codec: MessageCodec[F]): F[Response[F]] = {
122-
import dsl.*
123-
12483
if (logger.isTraceEnabled) {
12584
// Used in conformance tests
12685
req.headers.get[`X-Test-Case-Name`] match {
@@ -204,8 +163,8 @@ object ConnectRpcHttpRoutes {
204163
(messageParts.mkString("\n"), details)
205164
)
206165

207-
val message = messageWithDetails.map(_._1)
208-
val details = messageWithDetails.map(_._2).getOrElse(Seq.empty)
166+
//val message = messageWithDetails.map(_._1)
167+
//val details = messageWithDetails.map(_._2).getOrElse(Seq.empty)
209168

210169
val httpStatus = grpcStatus.toHttpStatus
211170
val connectCode = grpcStatus.toConnectCode
@@ -223,6 +182,4 @@ object ConnectRpcHttpRoutes {
223182
}
224183
}
225184

226-
private inline def grpcMethodName(service: String, method: String): String = service + "/" + method
227-
228185
}

0 commit comments

Comments
 (0)