Skip to content

Commit 7ed47cb

Browse files
committed
Support external configuration for GRPC client and server builders
1 parent 4da983f commit 7ed47cb

File tree

3 files changed

+22
-7
lines changed

3 files changed

+22
-7
lines changed

conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/Main.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ object Main extends IOApp.Simple {
4040
.create[IO](
4141
Seq(service),
4242
Configuration(
43-
jsonPrinterConfiguration = { p =>
43+
jsonPrinterConfigurer = { p =>
4444
// Registering message types in TypeRegistry is required to pass com.google.protobuf.any.Any
4545
// JSON-serialization conformance tests
4646
p.withTypeRegistry(

core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRpcHttpRoutes.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ import scala.concurrent.duration.*
2525
import scala.util.chaining.*
2626

2727
case class Configuration(
28-
jsonPrinterConfiguration: Endo[Printer] = identity,
28+
jsonPrinterConfigurer: Endo[Printer] = identity,
29+
serverBuilderConfigurer: Endo[ServerBuilder[_]] = identity,
30+
channelBuilderConfigurer: Endo[ManagedChannelBuilder[_]] = identity,
2931
waitForShutdown: Duration = 10.seconds,
3032
)
3133

@@ -42,7 +44,7 @@ object ConnectRpcHttpRoutes {
4244
val dsl = Http4sDsl[F]
4345
import dsl.*
4446

45-
val jsonPrinter = configuration.jsonPrinterConfiguration(JsonFormat.printer)
47+
val jsonPrinter = configuration.jsonPrinterConfigurer(JsonFormat.printer)
4648

4749
val codecRegistry = MessageCodecRegistry[F](
4850
JsonMessageCodec[F](jsonPrinter),
@@ -52,7 +54,12 @@ object ConnectRpcHttpRoutes {
5254
val methodRegistry = MethodRegistry(services)
5355

5456
for
55-
ipChannel <- InProcessChannelBridge.create(services, configuration.waitForShutdown)
57+
ipChannel <- InProcessChannelBridge.create(
58+
services,
59+
configuration.serverBuilderConfigurer,
60+
configuration.channelBuilderConfigurer,
61+
configuration.waitForShutdown,
62+
)
5663
yield
5764
def handle(
5865
httpMethod: Method,

core/src/main/scala/org/ivovk/connect_rpc_scala/InProcessChannelBridge.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,42 @@
11
package org.ivovk.connect_rpc_scala
22

3+
import cats.Endo
34
import cats.effect.{Resource, Sync}
45
import cats.implicits.*
6+
import io.grpc.*
57
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
6-
import io.grpc.{Channel, ManagedChannel, Server, ServerServiceDefinition}
78

89
import java.util.concurrent.TimeUnit
910
import scala.concurrent.duration.Duration
1011
import scala.jdk.CollectionConverters.*
12+
import scala.util.chaining.*
1113

1214
object InProcessChannelBridge {
1315

1416
def create[F[_] : Sync](
1517
services: Seq[ServerServiceDefinition],
18+
serverBuilderConfigurer: Endo[ServerBuilder[?]] = identity,
19+
channelBuilderConfigurer: Endo[ManagedChannelBuilder[?]] = identity,
1620
waitForShutdown: Duration,
1721
): Resource[F, Channel] = {
1822
for
1923
name <- Resource.eval(Sync[F].delay(InProcessServerBuilder.generateName()))
20-
server <- createServer(name, services, waitForShutdown)
21-
channel <- createStub(name, waitForShutdown)
24+
server <- createServer(name, services, waitForShutdown, serverBuilderConfigurer)
25+
channel <- createStub(name, waitForShutdown, channelBuilderConfigurer)
2226
yield channel
2327
}
2428

2529
private def createServer[F[_] : Sync](
2630
name: String,
2731
services: Seq[ServerServiceDefinition],
2832
waitForShutdown: Duration,
33+
serverBuilderConfigurer: Endo[ServerBuilder[?]] = identity,
2934
): Resource[F, Server] = {
3035
val acquire = Sync[F].delay {
3136
InProcessServerBuilder.forName(name)
3237
.directExecutor()
3338
.addServices(services.asJava)
39+
.pipe(serverBuilderConfigurer)
3440
.build()
3541
.start()
3642
}
@@ -43,10 +49,12 @@ object InProcessChannelBridge {
4349
private def createStub[F[_] : Sync](
4450
name: String,
4551
waitForShutdown: Duration,
52+
channelBuilderConfigurer: Endo[ManagedChannelBuilder[?]] = identity,
4653
): Resource[F, ManagedChannel] = {
4754
val acquire = Sync[F].delay {
4855
InProcessChannelBuilder.forName(name)
4956
.directExecutor()
57+
.pipe(channelBuilderConfigurer)
5058
.build()
5159
}
5260
val release = (c: ManagedChannel) =>

0 commit comments

Comments
 (0)