
Commit 9bb945b

Merge pull request #109 from lsst-sqre/tickets/schema-registry
Readd schema registry support
2 parents: 3efefd9 + 89a093c


13 files changed: +2384, -1596 lines


.pre-commit-config.yaml

Lines changed: 2 additions & 2 deletions
@@ -1,13 +1,13 @@
 repos:
   - repo: https://github.com/pre-commit/pre-commit-hooks
-    rev: v5.0.0
+    rev: v6.0.0
     hooks:
       - id: check-toml
       - id: check-yaml
       - id: trailing-whitespace

   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.12.3
+    rev: v0.12.9
     hooks:
       - id: ruff
         args: [--fix, --exit-non-zero-on-fix]

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
@@ -6,6 +6,13 @@ Find changes for the upcoming release in the project's [changelog.d directory](h

 <!-- scriv-insert-here -->

+<a id='changelog-0.4.1'></a>
+## 0.4.1 (2025-08-15)
+
+### New features
+
+- Readded support for avro schemas via direct connection
+
 <a id='changelog-0.4.0'></a>
 ## 0.4.0 (2025-07-16)

docs/dev/add_datasources.rst

Lines changed: 28 additions & 7 deletions
@@ -79,10 +79,6 @@ Use this as an opportunity to debug your commands so they work as intended befor
 Add Schemas
 ===========

-.. warning ::
-
-   Sasquatch-backpack version 0.4.0 does not support non-json schemas in its default publishing method. Support is planned, but in the meantime either opt to use the old publish method (PublishMethod.REST_API), or serialize any schema data to json before publishing.
-
 `Sasquatch <https://sasquatch.lsst.io>`__ (the wearer of the proverbial backpack), uses `Avro schemas <https://sasquatch.lsst.io/user-guide/avro.html>`__ for data serialization.
 Navigate to your Datasource's folder and create a ``schemas.py`` file for your Datasource.
 Inside, use `pydantic's AvroBaseModel <https://marcosschroh.github.io/dataclasses-avroschema/pydantic/>`__ to create an avro schema.
@@ -104,7 +100,7 @@ Below is a quick sample of usage.
        class Meta:
            """Schema metadata."""

-           namespace = "$namespace"
+           namespace = "Default"
            schema_name = "topic_name_goes_here"

 Make one such schema for each command or API call you wish to make.
@@ -128,7 +124,7 @@ While not required, giving each entry a unique ID is strongly reccommended to id
 Note 3: Meta
 ------------
 The Meta subclass is required, and must contain both namespace and schema_name values.
-The namespace will be replaced with its actual value later on when the file is parsed, so simply keep its value as shown above, in "$thing" format.
+The namespace will be replaced with its actual value later on when the file is parsed, so simply keep its value "Default", as shown above.
 The schema_name, on the other hand, should be hardcoded in.

 Add Configs
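
For orientation, a complete schema in schemas.py might look something like the sketch below, using the pydantic AvroBaseModel from dataclasses-avroschema. The field names (timestamp, id, namespace) are illustrative placeholders rather than fields from this commit; only the Meta fragment and the "Default" namespace convention come from the diff above.

    from dataclasses_avroschema.pydantic import AvroBaseModel


    class CommandnameSchema(AvroBaseModel):
        """Hypothetical schema for one command's records."""

        # Illustrative fields; a real schema declares every field the command returns.
        timestamp: int
        id: str
        # Assumed data field that assemble_schema() (shown further below) fills in
        # with the real namespace at publish time.
        namespace: str

        class Meta:
            """Schema metadata."""

            # Kept as "Default"; the real value is substituted when the file is parsed.
            namespace = "Default"
            schema_name = "topic_name_goes_here"
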
@@ -161,7 +157,7 @@ Add Datasources
 ===============
 Now you're finally ready to add Datasources.
 From within your ``scripts.py`` file, for each command you have, make a new Datasource class inheriting from ``sasquatchbackpack.sasquatch.Datasource``.
-These new classes will require two methods: ``get_records()`` and ``get_redis_key()``.
+These new classes will require three methods: ``get_records()``, ``assemble_schema()``, and ``get_redis_key()``.

 ``get_records()`` should call the Datasource's respective ``scripts.py`` function, then return the encoded results in an array.
 This should be surrounded with a "try" like so:
@@ -179,6 +175,31 @@ This should be surrounded with a "try" like so:
             f"A connection error occurred while fetching records: {ce}"
         ) from ce

+``assemble_schema()`` should take the namespace and one of the list items returned by ``get_records()`` (or ``None``). This is where the "Default" namespace value above is substituted. The function has two purposes: to create a full schema object from a provided record, and to produce a compliant boilerplate schema when no record is given. This is achieved like so:
+
+.. code-block:: python
+
+    def assemble_schema(self, namespace: str, record: dict | None = None) -> AvroBaseModel:
+        """Docstring: the third."""
+        if record is None:
+            schema = {
+                "timestamp": 1,
+                "id": "default",
+                # Every field in the schema needs to be here, given a boilerplate value,
+                # e.g. "percentage": 1.0,
+                "namespace": namespace,
+            }
+        else:
+            schema = {
+                "timestamp": record["timestamp"],
+                "id": record["id"],
+                # Again, every field in the schema needs to be here, given its record value,
+                # e.g. "percentage": record["percentage"],
+                "namespace": namespace,
+            }
+
+        return CommandnameSchema.parse_obj(schema)
+
 ``get_redis_key()`` can safely return an empty string if your config has set uses_redis to false, and you don't intend to integrate this source with backpack's redis instance.
 Otherwise, this method should return a unique string structured as such: ``f"{self.topic_name}:uniqueItemIdentifierHere"``.
 This identifier is best suited as an integer id number, as stated above in Note #2; however, it can be anything that uniquely identifies this specific object.
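
Putting the three required methods together, a Datasource subclass might be sketched as below. This is an illustration under stated assumptions, not code from this commit: commandname_script(), the record fields, the datapoint parameter on get_redis_key(), and CommandnameSchema (from the sketch above) are all hypothetical placeholders, and the exact signatures expected by sasquatchbackpack.sasquatch.Datasource are not shown in this diff.

    from dataclasses_avroschema.pydantic import AvroBaseModel

    from sasquatchbackpack import sasquatch


    def commandname_script() -> list[dict]:
        """Hypothetical stand-in for the command's scripts.py function."""
        return [{"timestamp": 1723708800, "id": "example-1"}]


    class CommandnameDatasource(sasquatch.Datasource):
        """Hypothetical Datasource wiring the three required methods together."""

        def get_records(self) -> list[dict]:
            """Call the scripts.py function and return its results as a list."""
            try:
                return commandname_script()
            except ConnectionError as ce:
                raise ConnectionError(
                    f"A connection error occurred while fetching records: {ce}"
                ) from ce

        def assemble_schema(
            self, namespace: str, record: dict | None = None
        ) -> AvroBaseModel:
            """Build a schema from a record, or a boilerplate one if record is None."""
            data = record if record is not None else {"timestamp": 1, "id": "default"}
            # CommandnameSchema is the hypothetical schema class sketched earlier.
            return CommandnameSchema.parse_obj({**data, "namespace": namespace})

        def get_redis_key(self, datapoint: dict) -> str:
            """Return a unique key shaped like f"{self.topic_name}:<unique id>"."""
            return f"{self.topic_name}:{datapoint['id']}"

The boilerplate branch of assemble_schema() presumably lets backpack produce a registry-compliant schema even before any records have been fetched, which is what the reintroduced schema registry support in this commit relies on.
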

docs/dev/publish_methods.rst

Lines changed: 0 additions & 4 deletions
@@ -13,10 +13,6 @@ Direct Connection

 PublishMethod.DIRECT_CONNECTION is the current default option. With sasquatch and backpack running on the same clusters, it just makes sense to directly connect in. This runs up a connection to sasquatch when publish is called, and sends published data through that tunnel.

-.. warning ::
-
-   Sasquatch-backpack version 0.4.0 does not support non-json schemas in its default publishing method. Support is planned, but in the meantime either opt to use the old publish method (PublishMethod.REST_API), or serialize any schema data to json before publishing.
-
 REST_API
 ========

docs/documenteer.toml

Lines changed: 11 additions & 1 deletion
@@ -9,7 +9,17 @@ package = "sasquatchbackpack"
 rst_epilog_file = "_rst_epilog.rst"
 disable_primary_sidebars = ["index", "changelog"]
 extensions = ['sphinx_click']
-nitpick_ignore = [["py:class", "faststream.kafka.KafkaBroker"]]
+nitpick_ignore = [
+    [
+        "py:class",
+        "faststream.kafka.KafkaBroker",
+    ],
+    [
+        "py:class",
+        "dataclasses_avroschema.pydantic.main.AvroBaseModel",
+    ],
+]

 [sphinx.intersphinx.projects]
+safir = "https://safir.lsst.io"
 python = "https://docs.python.org/3/"
