2 files changed: +8 -3

@@ -223,6 +223,7 @@ async def direct_connect(self) -> None:
         """
         config = KafkaConnectionSettings()
         kafka_broker = KafkaBroker(**config.to_faststream_params())
+        prepared_publisher = kafka_broker.publisher(self.source.topic_name)

         records = self._get_source_records()
         if records is None:
@@ -231,7 +232,13 @@ async def direct_connect(self) -> None:
             return

         payload = json.dumps({"value_schema": self.schema, "records": records})
-        await kafka_broker.publish(payload, self.source.topic_name)
+        await prepared_publisher.publish(
+            payload,
+            headers={
+                "Content-Type": "application/vnd.kafka.avro.v2+json",
+                "Accept": "application/vnd.kafka.v2+json",
+            },
+        )

     def post(self) -> tuple[str, list]:
         """Assemble schema and payload from the given source, then
Second file:

 from datetime import UTC, datetime, timedelta

 import click
-import nest_asyncio

 from sasquatchbackpack import sasquatch
 from sasquatchbackpack.sources.usgs import scripts
@@ -201,7 +200,6 @@ def usgs_earthquake_data(
         click.echo("Post mode enabled: Sending data...")
         click.echo(f"Querying redis at {backpack_dispatcher.redis.address}")

-        nest_asyncio.apply()
         loop = asyncio.new_event_loop()
         loop.run_until_complete(backpack_dispatcher.direct_connect())
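After this change the command drives a fresh event loop directly, so nest_asyncio.apply() is no longer needed. A minimal sketch of that pattern, with a placeholder coroutine standing in for backpack_dispatcher.direct_connect():

import asyncio


async def direct_connect() -> None:
    # Placeholder for backpack_dispatcher.direct_connect().
    await asyncio.sleep(0)


def run_from_cli() -> None:
    # A brand-new loop is created and driven to completion; no already
    # running loop is re-entered, so nest_asyncio is unnecessary.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(direct_connect())
    finally:
        loop.close()


if __name__ == "__main__":
    run_from_cli()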