Skip to content

Commit a1ecbcc

Browse files
committed
Add avro headers
1 parent bdba0fe commit a1ecbcc

File tree

2 files changed

+11
-7
lines changed

2 files changed

+11
-7
lines changed

src/sasquatchbackpack/sasquatch.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from safir.kafka import (
2525
KafkaConnectionSettings,
2626
PydanticSchemaManager,
27+
SchemaInfo,
2728
SchemaManagerSettings,
2829
)
2930

@@ -179,14 +180,17 @@ async def _dispatch(
179180
except Exception as e:
180181
return e
181182

182-
await schema_manager.register_model(type(schema))
183+
info: SchemaInfo = await schema_manager.register_model(type(schema))
183184

184185
for record in records:
185186
avro: bytes = await schema_manager.serialize(
186-
source.assemble_schema(record, namespace)
187+
source.assemble_schema(record["value"], namespace)
187188
)
188-
189-
await publisher.publish(avro)
189+
headers = {
190+
"content-type": "avro",
191+
"schema-id": str(info.schema_id),
192+
}
193+
await publisher.publish(avro, headers=headers)
190194
return None
191195

192196

@@ -229,7 +233,7 @@ def __init__(
229233
self.broker = broker_in if broker_in is not None else local_broker
230234
except NameError:
231235
if broker_in is None:
232-
raise ValueError from None
236+
raise ValueError("Kafka environment variables unset") from None
233237
self.broker = broker_in
234238

235239
try:
@@ -239,7 +243,7 @@ def __init__(
239243
)
240244
except NameError:
241245
if manager_in is None:
242-
raise ValueError from None
246+
raise ValueError("Schema environment variable unset") from None
243247
self.manager = manager_in
244248

245249
def create_topic(self) -> str:

tests/dispatcher_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def __init__(self, current_records: list[dict[str, str]]) -> None:
3232

3333
def assemble_schema(self, record: dict, namespace: str) -> AvroBaseModel:
3434
schema = {
35-
"id": record["value"]["id"],
35+
"id": record["id"],
3636
"namespace": namespace,
3737
}
3838
return TestSchema.parse_obj(data=schema)

0 commit comments

Comments
 (0)