Skip to content

Refactor: split into builder and handler; integrate with tpolecat ext… #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ val httpServer: Resource[IO, org.http4s.server.Server] = {

for {
// Create httpApp with Connect-RPC routes, specifying your GRPC services
httpApp <- ConnectRpcHttpRoutes.create[IO](grpcServices)
.map(_.orNotFound)
httpApp <- ConnectRouteBuilder.forServices[IO](grpcServices).build

// Create http server
httpServer <- EmberServerBuilder.default[IO]
Expand All @@ -121,15 +120,13 @@ Here is how you can integrate OpenTelemetry with the Connect-RPC server:
val grpcServices: Seq[io.grpc.ServiceDefinition] = ??? // Your GRPC service(s)
val grpcOtel : GrpcOpenTelemetry = ??? // GrpcOpenTelemetry instance

ConnectRpcHttpRoutes.create[IO](
grpcServices,
Configuration.default
// Configure the server to use the same opentelemetry instance as the main server
.withServerConfigurator { sb =>
grpcOtel.configureServerBuilder(sb)
sb
}
)
ConnectRouteBuilder.forServices[IO](grpcServices)
// Configure the server to use the same opentelemetry instance as the main server
.withServerConfigurator { sb =>
grpcOtel.configureServerBuilder(sb)
sb
}
.build
```

This will make sure that all the traffic going through the Connect-RPC server will be captured by the same
Expand Down
8 changes: 8 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import org.typelevel.scalacoptions.ScalacOptions

ThisBuild / scalaVersion := "3.3.4"

ThisBuild / organization := "io.github.igor-vovk"
Expand All @@ -14,6 +16,12 @@ ThisBuild / developers := List(
)
ThisBuild / sonatypeCredentialHost := xerial.sbt.Sonatype.sonatypeCentralHost

ThisBuild / tpolecatExcludeOptions ++= Set(
ScalacOptions.warnNonUnitStatement,
ScalacOptions.warnValueDiscard,
)


lazy val noPublish = List(
publish := {},
publishLocal := {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ enum Codec {
CODEC_UNSPECIFIED = 0;
CODEC_PROTO = 1;
CODEC_JSON = 2;
CODEC_TEXT = 3 [deprecated = true]; // not used; will be ignored
}

enum Compression {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import cats.implicits.*
import com.google.protobuf.ByteString
import connectrpc.conformance.v1.*
import io.grpc.{Metadata, Status, StatusRuntimeException}
import org.slf4j.LoggerFactory
import scalapb.TextFormat

import java.util.concurrent.TimeUnit
Expand All @@ -18,8 +17,6 @@ class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrai

import org.ivovk.connect_rpc_scala.Mappings.*

private val logger = LoggerFactory.getLogger(getClass)

override def unary(
request: UnaryRequest,
ctx: Metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import cats.effect.{IO, IOApp, Sync}
import com.comcast.ip4s.{Port, host, port}
import connectrpc.conformance.v1.{ConformanceServiceFs2GrpcTrailers, ServerCompatRequest, ServerCompatResponse}
import org.http4s.ember.server.EmberServerBuilder
import org.ivovk.connect_rpc_scala.{Configuration, ConnectRpcHttpRoutes}
import org.ivovk.connect_rpc_scala.ConnectRouteBuilder
import scalapb.json4s.TypeRegistry

import java.io.InputStream
Expand Down Expand Up @@ -36,27 +36,23 @@ object Main extends IOApp.Simple {
ConformanceServiceImpl[IO]()
)

httpApp <- ConnectRpcHttpRoutes
.create[IO](
Seq(service),
Configuration.default
// Registering message types in TypeRegistry is required to pass com.google.protobuf.any.Any
// JSON-serialization conformance tests
.withJsonPrinterConfigurator { p =>
p.withTypeRegistry(
TypeRegistry.default
.addMessage[connectrpc.conformance.v1.UnaryRequest]
.addMessage[connectrpc.conformance.v1.IdempotentUnaryRequest]
.addMessage[connectrpc.conformance.v1.ConformancePayload.RequestInfo]
)
}
)
.map(r => r.orNotFound)
app <- ConnectRouteBuilder.forService[IO](service)
// Registering message types in TypeRegistry is required to pass com.google.protobuf.any.Any
// JSON-serialization conformance tests
.withJsonPrinterConfigurator { p =>
p.withTypeRegistry(
TypeRegistry.default
.addMessage[connectrpc.conformance.v1.UnaryRequest]
.addMessage[connectrpc.conformance.v1.IdempotentUnaryRequest]
.addMessage[connectrpc.conformance.v1.ConformancePayload.RequestInfo]
)
}
.build

server <- EmberServerBuilder.default[IO]
.withHost(host"127.0.0.1")
.withPort(port"0") // random port
.withHttpApp(httpApp)
.withHttpApp(app)
.build

addr = server.address
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,125 +2,84 @@ package org.ivovk.connect_rpc_scala

import cats.Endo
import cats.data.EitherT
import cats.effect.{Async, Resource}
import cats.effect.Async
import cats.implicits.*
import fs2.compression.Compression
import io.grpc.*
import io.grpc.{CallOptions, Channel, ClientInterceptors, Metadata, StatusException, StatusRuntimeException}
import io.grpc.MethodDescriptor.MethodType
import io.grpc.stub.MetadataUtils
import org.http4s.*
import org.http4s.dsl.Http4sDsl
import org.ivovk.connect_rpc_scala.http.*
import org.http4s.{MediaType, Method, Response}
import org.ivovk.connect_rpc_scala.Mappings.*
import org.ivovk.connect_rpc_scala.http.Headers.`X-Test-Case-Name`
import org.ivovk.connect_rpc_scala.http.MessageCodec.given
import org.ivovk.connect_rpc_scala.http.QueryParams.*
import org.ivovk.connect_rpc_scala.http.{MediaTypes, MessageCodec, MessageCodecRegistry, RequestEntity}
import org.slf4j.{Logger, LoggerFactory}
import scalapb.grpc.ClientCalls
import scalapb.json4s.JsonFormat
import scalapb.{GeneratedMessage, GeneratedMessageCompanion, TextFormat}

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.MILLISECONDS
import scala.concurrent.duration.*
import scala.util.chaining.*


object ConnectRpcHttpRoutes {

import Mappings.*
class ConnectHandler[F[_]: Async](
codecRegistry: MessageCodecRegistry[F],
methodRegistry: MethodRegistry,
channel: Channel,
httpDsl: Http4sDsl[F],
) {
import httpDsl.*

private val logger: Logger = LoggerFactory.getLogger(getClass)

def create[F[_] : Async](
services: Seq[ServerServiceDefinition],
configuration: Configuration = Configuration.default
): Resource[F, HttpRoutes[F]] = {
val dsl = Http4sDsl[F]
import dsl.*

val jsonPrinter = configuration.jsonPrinterConfigurator(JsonFormat.printer)

val codecRegistry = MessageCodecRegistry[F](
JsonMessageCodec[F](jsonPrinter),
ProtoMessageCodec[F],
)

val methodRegistry = MethodRegistry(services)

for
ipChannel <- InProcessChannelBridge.create(
services,
configuration.serverBuilderConfigurator,
configuration.channelBuilderConfigurator,
configuration.waitForShutdown,
def handle(
httpMethod: Method,
contentType: Option[MediaType],
entity: RequestEntity[F],
grpcMethodName: String,
): F[Response[F]] = {
val eitherT = for
given MessageCodec[F] <- EitherT.fromOptionM(
contentType.flatMap(codecRegistry.byContentType).pure[F],
UnsupportedMediaType(s"Unsupported content-type ${contentType.show}. " +
s"Supported content types: ${MediaTypes.allSupported.map(_.show).mkString(", ")}")
)
yield
def handle(
httpMethod: Method,
contentType: Option[MediaType],
entity: RequestEntity[F],
grpcMethod: String,
): F[Response[F]] = {
val eitherT = for
given MessageCodec[F] <- EitherT.fromOptionM(
contentType.flatMap(codecRegistry.byContentType).pure[F],
UnsupportedMediaType(s"Unsupported content-type ${contentType.show}. " +
s"Supported content types: ${MediaTypes.allSupported.map(_.show).mkString(", ")}")
)

method <- EitherT.fromOptionM(
methodRegistry.get(grpcMethod).pure[F],
NotFound(connectrpc.Error(
code = io.grpc.Status.NOT_FOUND.toConnectCode,
message = s"Method not found: $grpcMethod".some
))
)

_ <- EitherT.cond[F](
// Support GET-requests for all methods until https://github.com/scalapb/ScalaPB/pull/1774 is merged
httpMethod == Method.POST || (httpMethod == Method.GET && method.descriptor.isSafe) || true,
(),
Forbidden(connectrpc.Error(
code = io.grpc.Status.PERMISSION_DENIED.toConnectCode,
message = s"Only POST-requests are allowed for method: $grpcMethod".some
))
).leftSemiflatMap(identity)

response <- method.descriptor.getType match
case MethodType.UNARY =>
EitherT.right(handleUnary(dsl, method, entity, ipChannel))
case unsupported =>
EitherT.left(NotImplemented(connectrpc.Error(
code = io.grpc.Status.UNIMPLEMENTED.toConnectCode,
message = s"Unsupported method type: $unsupported".some
)))
yield response

eitherT.merge
}

HttpRoutes.of[F] {
case req@Method.GET -> Root / serviceName / methodName :? EncodingQP(contentType) +& MessageQP(message) =>
val grpcMethod = grpcMethodName(serviceName, methodName)
val entity = RequestEntity[F](message, req.headers)

handle(Method.GET, contentType.some, entity, grpcMethod)
case req@Method.POST -> Root / serviceName / methodName =>
val grpcMethod = grpcMethodName(serviceName, methodName)
val contentType = req.contentType.map(_.mediaType)
val entity = RequestEntity[F](req)
method <- EitherT.fromOptionM(
methodRegistry.get(grpcMethodName).pure[F],
NotFound(connectrpc.Error(
code = io.grpc.Status.NOT_FOUND.toConnectCode,
message = s"Method not found: $grpcMethodName".some
))
)

handle(Method.POST, contentType, entity, grpcMethod)
}
_ <- EitherT.cond[F](
// Support GET-requests for all methods until https://github.com/scalapb/ScalaPB/pull/1774 is merged
httpMethod == Method.POST || (httpMethod == Method.GET && method.descriptor.isSafe) || true,
(),
Forbidden(connectrpc.Error(
code = io.grpc.Status.PERMISSION_DENIED.toConnectCode,
message = s"Only POST-requests are allowed for method: $grpcMethodName".some
))
).leftSemiflatMap(identity)

response <- method.descriptor.getType match
case MethodType.UNARY =>
EitherT.right(handleUnary(method, entity, channel))
case unsupported =>
EitherT.left(NotImplemented(connectrpc.Error(
code = io.grpc.Status.UNIMPLEMENTED.toConnectCode,
message = s"Unsupported method type: $unsupported".some
)))
yield response

eitherT.merge
}

private def handleUnary[F[_] : Async](
dsl: Http4sDsl[F],
private def handleUnary(
method: MethodRegistry.Entry,
req: RequestEntity[F],
channel: Channel
)(using codec: MessageCodec[F]): F[Response[F]] = {
import dsl.*

if (logger.isTraceEnabled) {
// Used in conformance tests
req.headers.get[`X-Test-Case-Name`] match {
Expand Down Expand Up @@ -204,8 +163,8 @@ object ConnectRpcHttpRoutes {
(messageParts.mkString("\n"), details)
)

val message = messageWithDetails.map(_._1)
val details = messageWithDetails.map(_._2).getOrElse(Seq.empty)
//val message = messageWithDetails.map(_._1)
//val details = messageWithDetails.map(_._2).getOrElse(Seq.empty)

val httpStatus = grpcStatus.toHttpStatus
val connectCode = grpcStatus.toConnectCode
Expand All @@ -223,6 +182,4 @@ object ConnectRpcHttpRoutes {
}
}

private inline def grpcMethodName(service: String, method: String): String = service + "/" + method

}
Loading