File tree Expand file tree Collapse file tree 1 file changed +14
-7
lines changed Expand file tree Collapse file tree 1 file changed +14
-7
lines changed Original file line number Diff line number Diff line change @@ -219,7 +219,7 @@ def _get_source_records(self) -> list[dict] | None:
219
219
if self .redis .get (self .source .get_redis_key (record )) is None
220
220
]
221
221
222
- async def direct_connect (self ) -> None :
222
+ async def direct_connect (self ) -> tuple [ str , list ] :
223
223
"""Assemble a schema and payload from the given source,
224
224
and route data directly to kafka.
225
225
"""
@@ -228,15 +228,22 @@ async def direct_connect(self) -> None:
228
228
await kafka_broker .connect ()
229
229
230
230
@kafka_broker .publisher (self .config .namespace )
231
- async def dothing (self : Self ) -> list [ dict ] | None :
231
+ async def dothing (self : Self ) -> tuple [ str , list ] :
232
232
records : list [dict ] | None = self ._get_source_records ()
233
233
if records is None :
234
- return None
234
+ return (
235
+ "Warning: No entries found, aborting POST request" ,
236
+ [],
237
+ )
235
238
if len (records ) == 0 :
236
- return None
237
- return records
238
-
239
- await dothing (self )
239
+ return (
240
+ "Warning: All entries already present,"
241
+ " aborting POST request" ,
242
+ records ,
243
+ )
244
+ return "Sent thing" , records
245
+
246
+ return await dothing (self )
240
247
241
248
def post (self ) -> tuple [str , list ]:
242
249
"""Assemble schema and payload from the given source, then
You can’t perform that action at this time.
0 commit comments