Skip to content

Commit 39589fb

Browse files
committed
Merge get_schema functionality into assemble_schema
1 parent ab137df commit 39589fb

File tree

3 files changed

+39
-41
lines changed

3 files changed

+39
-41
lines changed

src/sasquatchbackpack/sasquatch.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,7 @@ def __init__(self, topic_name: str, *, uses_redis: bool) -> None:
4646
self.uses_redis = uses_redis
4747

4848
@abstractmethod
49-
def get_schema(self, namespace: str) -> AvroBaseModel:
50-
pass
51-
52-
@abstractmethod
53-
def assemble_schema(self, records: dict, namespace: str) -> AvroBaseModel:
49+
def assemble_schema(self, namespace: str, records: dict) -> AvroBaseModel:
5450
pass
5551

5652
@abstractmethod
@@ -184,7 +180,7 @@ async def _dispatch(
184180

185181
for record in records:
186182
avro: bytes = await schema_manager.serialize(
187-
source.assemble_schema(record["value"], namespace)
183+
source.assemble_schema(namespace, record["value"])
188184
)
189185
headers = {
190186
"content-type": "avro",
@@ -220,7 +216,7 @@ def __init__(
220216
self.source = source
221217
self.config = DispatcherConfig()
222218

223-
self.schema = self.source.get_schema(self.config.namespace)
219+
self.schema = self.source.assemble_schema(self.config.namespace)
224220
nest_asyncio.apply()
225221
self.redis = RedisManager(
226222
self.config.redis_address

src/sasquatchbackpack/sources/usgs/scripts.py

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -143,28 +143,29 @@ def get_records(self) -> list[dict]:
143143
f"A connection error occurred while fetching records: {ce}"
144144
) from ce
145145

146-
def assemble_schema(self, record: dict, namespace: str) -> AvroBaseModel:
147-
schema = {
148-
"timestamp": record["timestamp"],
149-
"id": record["id"],
150-
"latitude": record["latitude"],
151-
"longitude": record["longitude"],
152-
"depth": record["depth"],
153-
"magnitude": record["magnitude"],
154-
"namespace": namespace,
155-
}
156-
return EarthquakeSchema.parse_obj(data=schema)
157-
158-
def get_schema(self, namespace: str) -> AvroBaseModel:
159-
schema = {
160-
"timestamp": 1,
161-
"id": "default",
162-
"latitude": 1.0,
163-
"longitude": 1.0,
164-
"depth": 1.0,
165-
"magnitude": 1.0,
166-
"namespace": namespace,
167-
}
146+
def assemble_schema(
147+
self, namespace: str, record: dict = {}
148+
) -> AvroBaseModel:
149+
if record == {}:
150+
schema = {
151+
"timestamp": 1,
152+
"id": "default",
153+
"latitude": 1.0,
154+
"longitude": 1.0,
155+
"depth": 1.0,
156+
"magnitude": 1.0,
157+
"namespace": namespace,
158+
}
159+
else:
160+
schema = {
161+
"timestamp": record["timestamp"],
162+
"id": record["id"],
163+
"latitude": record["latitude"],
164+
"longitude": record["longitude"],
165+
"depth": record["depth"],
166+
"magnitude": record["magnitude"],
167+
"namespace": namespace,
168+
}
168169
return EarthquakeSchema.parse_obj(data=schema)
169170

170171
def get_redis_key(self, datapoint: dict) -> str:

tests/dispatcher_test.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,19 @@ def __init__(self, current_records: list[dict[str, str]]) -> None:
3030
)
3131
self.records = current_records
3232

33-
def assemble_schema(self, record: dict, namespace: str) -> AvroBaseModel:
34-
schema = {
35-
"id": record["id"],
36-
"namespace": namespace,
37-
}
38-
return TestSchema.parse_obj(data=schema)
39-
40-
def get_schema(self, namespace: str) -> AvroBaseModel:
41-
schema = {
42-
"id": "default",
43-
"namespace": namespace,
44-
}
33+
def assemble_schema(
34+
self, namespace: str, record: dict = {}
35+
) -> AvroBaseModel:
36+
if record == {}:
37+
schema = {
38+
"id": "default",
39+
"namespace": namespace,
40+
}
41+
else:
42+
schema = {
43+
"id": record["id"],
44+
"namespace": namespace,
45+
}
4546
return TestSchema.parse_obj(data=schema)
4647

4748
def get_records(self) -> list[dict]:

0 commit comments

Comments
 (0)