|
3 | 3 | __all__ = ["BackpackDispatcher", "DataSource", "DispatcherConfig"]
|
4 | 4 |
|
5 | 5 | import asyncio
|
| 6 | +import json |
6 | 7 | import os
|
7 | 8 | from abc import ABC, abstractmethod
|
8 | 9 | from dataclasses import dataclass, field
|
9 | 10 | from string import Template
|
10 | 11 |
|
11 | 12 | import redis.asyncio as redis
|
12 | 13 | import requests
|
| 14 | +from faststream.kafka import KafkaBroker |
| 15 | +from safir.kafka import KafkaConnectionSettings, SecurityProtocol |
13 | 16 |
|
14 | 17 | # Code yoinked from https://github.com/lsst-sqre/
|
15 | 18 | # sasquatch/blob/main/examples/RestProxyAPIExample.ipynb
|
@@ -215,6 +218,25 @@ def _get_source_records(self) -> list[dict] | None:
|
215 | 218 | if self.redis.get(self.source.get_redis_key(record)) is None
|
216 | 219 | ]
|
217 | 220 |
|
| 221 | + async def direct_connect(self) -> None: |
| 222 | + """Assemble a schema and payload from the given source, |
| 223 | + and route data directly to kafka. |
| 224 | + """ |
| 225 | + config = KafkaConnectionSettings( |
| 226 | + bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVERS", ""), |
| 227 | + security_protocol=SecurityProtocol.SSL, |
| 228 | + ) |
| 229 | + kafka_broker = KafkaBroker(**config.to_faststream_params()) |
| 230 | + |
| 231 | + records = self._get_source_records() |
| 232 | + if records is None: |
| 233 | + return |
| 234 | + if len(records) == 0: |
| 235 | + return |
| 236 | + |
| 237 | + payload = json.dumps({"value_schema": self.schema, "records": records}) |
| 238 | + await kafka_broker.publish(payload, self.source.topic_name) |
| 239 | + |
218 | 240 | def post(self) -> tuple[str, list]:
|
219 | 241 | """Assemble schema and payload from the given source, then
|
220 | 242 | makes a POST request to kafka.
|
|
0 commit comments