1
1
package org .ivovk .connect_rpc_scala .http
2
2
3
3
import cats .Applicative
4
- import cats .data .EitherT
5
4
import cats .effect .{Async , Sync }
6
5
import cats .implicits .*
7
6
import com .google .protobuf .CodedOutputStream
@@ -10,7 +9,7 @@ import fs2.io.{readOutputStream, toInputStreamResource}
10
9
import fs2 .text .decodeWithCharset
11
10
import fs2 .{Chunk , Stream }
12
11
import org .http4s .headers .`Content-Type`
13
- import org .http4s .{ContentCoding , DecodeResult , Entity , EntityDecoder , EntityEncoder , MediaRange , MediaType }
12
+ import org .http4s .{ContentCoding , DecodeResult , Entity , EntityDecoder , EntityEncoder , InvalidMessageBodyFailure , MediaRange , MediaType }
14
13
import org .ivovk .connect_rpc_scala .ConnectRpcHttpRoutes .getClass
15
14
import org .slf4j .{Logger , LoggerFactory }
16
15
import scalapb .json4s .{JsonFormat , Printer }
@@ -54,17 +53,17 @@ class JsonMessageCodec[F[_] : Sync : Compression](printer: Printer) extends Mess
54
53
.compile.string
55
54
}
56
55
57
- val f = string
56
+ string
58
57
.flatMap { str =>
59
58
if (logger.isTraceEnabled) {
60
- logger.trace(s " >>> Headers: ${entity.headers.redactSensitive}" )
59
+ logger.trace(s " >>> Headers: ${entity.headers.redactSensitive() }" )
61
60
logger.trace(s " >>> JSON: $str" )
62
61
}
63
62
64
63
Sync [F ].delay(JsonFormat .fromJsonString(str))
65
64
}
66
-
67
- EitherT .right(f )
65
+ .attemptT
66
+ .leftMap(e => InvalidMessageBodyFailure (e.getMessage, e.some) )
68
67
}
69
68
70
69
override def encode [A <: Message ](message : A ): Entity [F ] = {
@@ -93,7 +92,7 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] {
93
92
override val mediaType : MediaType = MediaTypes .`application/proto`
94
93
95
94
override def decode [A <: Message ](entity : RequestEntity [F ])(using cmp : Companion [A ]): DecodeResult [F , A ] = {
96
- val f = entity.message match {
95
+ val msg = entity.message match {
97
96
case str : String =>
98
97
Async [F ].delay(base64dec.decode(str.getBytes(entity.charset.nioCharset)))
99
98
.flatMap(arr => Async [F ].delay(cmp.parseFrom(arr)))
@@ -102,14 +101,17 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] {
102
101
.use(is => Async [F ].delay(cmp.parseFrom(is)))
103
102
}
104
103
105
- EitherT .right(f.map { message =>
106
- if (logger.isTraceEnabled) {
107
- logger.trace(s " >>> Headers: ${entity.headers.redactSensitive}" )
108
- logger.trace(s " >>> Proto: ${message.toProtoString}" )
109
- }
104
+ msg
105
+ .map { message =>
106
+ if (logger.isTraceEnabled) {
107
+ logger.trace(s " >>> Headers: ${entity.headers.redactSensitive()}" )
108
+ logger.trace(s " >>> Proto: ${message.toProtoString}" )
109
+ }
110
110
111
- message
112
- })
111
+ message
112
+ }
113
+ .attemptT
114
+ .leftMap(e => InvalidMessageBodyFailure (e.getMessage, e.some))
113
115
}
114
116
115
117
override def encode [A <: Message ](message : A ): Entity [F ] = {
0 commit comments