Skip to content

Commit bdba0fe

Browse files
committed
Schema registry code passes local tests
1 parent 5702af5 commit bdba0fe

File tree

4 files changed

+50
-18
lines changed

4 files changed

+50
-18
lines changed

docs/documenteer.toml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,17 @@ package = "sasquatchbackpack"
99
rst_epilog_file = "_rst_epilog.rst"
1010
disable_primary_sidebars = ["index", "changelog"]
1111
extensions = ['sphinx_click']
12-
nitpick_ignore = [["py:class", "faststream.kafka.KafkaBroker"]]
12+
nitpick_ignore = [
13+
[
14+
"py:class",
15+
"faststream.kafka.KafkaBroker",
16+
],
17+
[
18+
"py:class",
19+
"dataclasses_avroschema.pydantic.main.AvroBaseModel",
20+
],
21+
]
1322

1423
[sphinx.intersphinx.projects]
24+
safir = "https://safir.lsst.io"
1525
python = "https://docs.python.org/3/"

src/sasquatchbackpack/sasquatch.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,18 @@ class DataSource(ABC):
4040
Specific source name, used as an identifier
4141
"""
4242

43-
def __init__(
44-
self, topic_name: str, schema: str, *, uses_redis: bool
45-
) -> None:
43+
def __init__(self, topic_name: str, *, uses_redis: bool) -> None:
4644
self.topic_name = topic_name
47-
self.schema = schema
4845
self.uses_redis = uses_redis
4946

47+
@abstractmethod
48+
def get_schema(self, namespace: str) -> AvroBaseModel:
49+
pass
50+
51+
@abstractmethod
52+
def assemble_schema(self, records: dict, namespace: str) -> AvroBaseModel:
53+
pass
54+
5055
@abstractmethod
5156
def get_records(self) -> list[dict]:
5257
pass
@@ -66,6 +71,7 @@ class RedisManager:
6671
"""
6772

6873
def __init__(self, address: str) -> None:
74+
nest_asyncio.apply()
6975
self.address = address
7076
self.model = redis.from_url(self.address)
7177

@@ -173,10 +179,10 @@ async def _dispatch(
173179
except Exception as e:
174180
return e
175181

176-
await schema_manager.register_model(schema)
182+
await schema_manager.register_model(type(schema))
177183

178184
for record in records:
179-
avro: bytes = schema_manager.serialize(
185+
avro: bytes = await schema_manager.serialize(
180186
source.assemble_schema(record, namespace)
181187
)
182188

src/sasquatchbackpack/sources/usgs/scripts.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from dataclasses import dataclass, field
44
from datetime import datetime, timedelta
55

6+
from dataclasses_avroschema.pydantic import AvroBaseModel
67
from libcomcat.search import search
78

89
from sasquatchbackpack.sasquatch import DataSource
@@ -99,9 +100,7 @@ def __init__(
99100
self,
100101
config: USGSConfig,
101102
) -> None:
102-
super().__init__(
103-
config.topic_name, config.schema, uses_redis=config.uses_redis
104-
)
103+
super().__init__(config.topic_name, uses_redis=config.uses_redis)
105104
self.duration = config.duration
106105
self.config = config
107106
self.radius = config.radius
@@ -144,9 +143,7 @@ def get_records(self) -> list[dict]:
144143
f"A connection error occurred while fetching records: {ce}"
145144
) from ce
146145

147-
def assemble_schema(
148-
self, record: dict, namespace: str
149-
) -> EarthquakeSchema.avro_schema():
146+
def assemble_schema(self, record: dict, namespace: str) -> AvroBaseModel:
150147
schema = {
151148
"timestamp": record["timestamp"],
152149
"id": record["id"],
@@ -156,13 +153,19 @@ def assemble_schema(
156153
"magnitude": record["magnitude"],
157154
"namespace": namespace,
158155
}
159-
return EarthquakeSchema(**schema)
156+
return EarthquakeSchema.parse_obj(data=schema)
160157

161-
def get_schema(self, namespace: str) -> EarthquakeSchema.avro_schema():
158+
def get_schema(self, namespace: str) -> AvroBaseModel:
162159
schema = {
160+
"timestamp": 1,
161+
"id": "default",
162+
"latitude": 1.0,
163+
"longitude": 1.0,
164+
"depth": 1.0,
165+
"magnitude": 1.0,
163166
"namespace": namespace,
164167
}
165-
return EarthquakeSchema(**schema)
168+
return EarthquakeSchema.parse_obj(data=schema)
166169

167170
def get_redis_key(self, datapoint: dict) -> str:
168171
"""Allow USGS API to format its own redis keys.

tests/dispatcher_test.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,32 @@ class TestSchema(AvroBaseModel):
1818
class Meta:
1919
"""Schema metadata."""
2020

21-
namespace = "$namespace"
21+
namespace = "Default"
2222
schema_name = "testSchema"
2323

2424

2525
class TestSource(sasquatch.DataSource):
2626
def __init__(self, current_records: list[dict[str, str]]) -> None:
2727
super().__init__(
2828
"test",
29-
TestSchema.avro_schema().replace("double", "float"),
3029
uses_redis=True,
3130
)
3231
self.records = current_records
3332

33+
def assemble_schema(self, record: dict, namespace: str) -> AvroBaseModel:
34+
schema = {
35+
"id": record["value"]["id"],
36+
"namespace": namespace,
37+
}
38+
return TestSchema.parse_obj(data=schema)
39+
40+
def get_schema(self, namespace: str) -> AvroBaseModel:
41+
schema = {
42+
"id": "default",
43+
"namespace": namespace,
44+
}
45+
return TestSchema.parse_obj(data=schema)
46+
3447
def get_records(self) -> list[dict]:
3548
return [{"value": {"id": record["id"]}} for record in self.records]
3649

0 commit comments

Comments
 (0)