Skip to content

v0.9.0

Latest
Compare
Choose a tag to compare
@github-actions github-actions released this 21 Jul 18:21
9eddcf9

Minor Changes

  • 1493848 Thanks @floydspace! - Implement methods for defining and parsing kafka messages with effect Schema

    Ho to use:

    1. When you parse raw kafka message, you can use the MessageRouter.schemaRaw in combination with ConsumerSchema helper methods to define the schema of the message.
    import { Console, Effect, Schema } from "effect";
    import { Consumer, ConsumerSchema, MessageRouter } from "effect-kafka";
    
    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "test-topic",
        MessageRouter.schemaRaw(
          Schema.Struct({
            topic: Schema.Literal("test-topic"),
            partition: Schema.Number,
            offset: Schema.NumberFromString,
            key: Schema.NullOr(ConsumerSchema.Number),
            value: ConsumerSchema.String,
          }),
        ).pipe(
          Effect.flatMap(({ topic, partition, ...message }) =>
            Console.log({
              topic,
              partition,
              offset: message.offset,
              key: message.key,
              value: message.value,
            }),
          ),
        ),
      ),
      Consumer.serve({ groupId: "group" }),
    );
    1. When you parse raw kafka message, you can use the MessageRouter.schemaJson if you expect kafka message value to be a JSON string. So value schema property can be defined as Schema.Struct.
    import { Console, Effect, Schema } from "effect";
    import { Consumer, ConsumerSchema, MessageRouter } from "effect-kafka";
    
    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "test-topic",
        MessageRouter.schemaJson(
          Schema.Struct({
            topic: Schema.Literal("test-topic"),
            partition: Schema.Number,
            offset: Schema.NumberFromString,
            key: Schema.NullOr(ConsumerSchema.Number),
            value: Schema.Struct({ message: Schema.String }),
          }),
        ).pipe(
          Effect.flatMap(({ topic, partition, ...message }) =>
            Console.log({
              topic,
              partition,
              offset: message.offset,
              key: message.key,
              value: message.value.message,
            }),
          ),
        ),
      ),
      Consumer.serve({ groupId: "group" }),
    );
    1. When you parse only kafka message raw value as raw value, you can use the ConsumerRecord.schemaValueRaw in combination with ConsumerSchema helper methods to define the schema of the message value.
    import { Console, Effect } from "effect";
    import { Consumer, ConsumerRecord, ConsumerSchema, MessageRouter } from "effect-kafka";
    
    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "customers",
        ConsumerRecord.schemaValueRaw(ConsumerSchema.String).pipe(Effect.flatMap((value) => Console.log(value))),
      ),
      Consumer.serve({ groupId: "group" }),
    );
    1. When you parse only kafka message value as json value, you can use the ConsumerRecord.schemaValueJson. So value schema can be defined as Schema.Struct.
    import { Console, Effect } from "effect";
    import { Consumer, ConsumerRecord, MessageRouter } from "effect-kafka";
    
    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "customers",
        ConsumerRecord.schemaValueJson(Schema.Struct({ message: Schema.String })).pipe(
          Effect.flatMap((value) => Console.log(value)),
        ),
      ),
      Consumer.serve({ groupId: "group" }),
    );

Patch Changes