Minor Changes
-
1493848
Thanks @floydspace! - Implement methods for defining and parsing kafka messages with effect SchemaHo to use:
- When you parse raw kafka message, you can use the
MessageRouter.schemaRaw
in combination withConsumerSchema
helper methods to define the schema of the message.
import { Console, Effect, Schema } from "effect"; import { Consumer, ConsumerSchema, MessageRouter } from "effect-kafka"; const ConsumerLive = MessageRouter.empty.pipe( MessageRouter.subscribe( "test-topic", MessageRouter.schemaRaw( Schema.Struct({ topic: Schema.Literal("test-topic"), partition: Schema.Number, offset: Schema.NumberFromString, key: Schema.NullOr(ConsumerSchema.Number), value: ConsumerSchema.String, }), ).pipe( Effect.flatMap(({ topic, partition, ...message }) => Console.log({ topic, partition, offset: message.offset, key: message.key, value: message.value, }), ), ), ), Consumer.serve({ groupId: "group" }), );
- When you parse raw kafka message, you can use the
MessageRouter.schemaJson
if you expect kafka messagevalue
to be a JSON string. Sovalue
schema property can be defined asSchema.Struct
.
import { Console, Effect, Schema } from "effect"; import { Consumer, ConsumerSchema, MessageRouter } from "effect-kafka"; const ConsumerLive = MessageRouter.empty.pipe( MessageRouter.subscribe( "test-topic", MessageRouter.schemaJson( Schema.Struct({ topic: Schema.Literal("test-topic"), partition: Schema.Number, offset: Schema.NumberFromString, key: Schema.NullOr(ConsumerSchema.Number), value: Schema.Struct({ message: Schema.String }), }), ).pipe( Effect.flatMap(({ topic, partition, ...message }) => Console.log({ topic, partition, offset: message.offset, key: message.key, value: message.value.message, }), ), ), ), Consumer.serve({ groupId: "group" }), );
- When you parse only kafka message raw
value
as raw value, you can use theConsumerRecord.schemaValueRaw
in combination withConsumerSchema
helper methods to define the schema of the message value.
import { Console, Effect } from "effect"; import { Consumer, ConsumerRecord, ConsumerSchema, MessageRouter } from "effect-kafka"; const ConsumerLive = MessageRouter.empty.pipe( MessageRouter.subscribe( "customers", ConsumerRecord.schemaValueRaw(ConsumerSchema.String).pipe(Effect.flatMap((value) => Console.log(value))), ), Consumer.serve({ groupId: "group" }), );
- When you parse only kafka message
value
as json value, you can use theConsumerRecord.schemaValueJson
. Sovalue
schema can be defined asSchema.Struct
.
import { Console, Effect } from "effect"; import { Consumer, ConsumerRecord, MessageRouter } from "effect-kafka"; const ConsumerLive = MessageRouter.empty.pipe( MessageRouter.subscribe( "customers", ConsumerRecord.schemaValueJson(Schema.Struct({ message: Schema.String })).pipe( Effect.flatMap((value) => Console.log(value)), ), ), Consumer.serve({ groupId: "group" }), );
- When you parse raw kafka message, you can use the
Patch Changes
f564387
Thanks @floydspace! - upgrade @confluentinc/kafka-javascript to the stable v1