@@ -185,9 +185,7 @@ def _get_topic_exists(self) -> bool | str:
185
185
186
186
return bool (any (topic ["topic_name" ] == topic_name for topic in topics ))
187
187
188
- def _handle_topic_exists (
189
- self , request : dict [str , dict [str , str ]]
190
- ) -> bool | None :
188
+ def _topic_exists (self , request : dict [str , dict [str , str ]]) -> bool | None :
191
189
"""Handle errors and format responses for topic checking.
192
190
193
191
Parameters
@@ -267,7 +265,10 @@ def _post_new_topic(self) -> dict[str, str]:
267
265
"message" : f"Error Creating Topic during POST: { e } " ,
268
266
}
269
267
270
- return {"status" : "Success" , "message" : response .text }
268
+ return {
269
+ "status" : "Success" ,
270
+ "message" : "A new kafka topic has been created." ,
271
+ }
271
272
272
273
def _get_source_records (self ) -> list [dict ] | None :
273
274
"""Get source records and check the redis server for any
@@ -299,6 +300,20 @@ def _get_source_records(self) -> list[dict] | None:
299
300
if self .redis .get (self .source .get_redis_key (record )) is None
300
301
]
301
302
303
+ def _check_success (self , response : dict ) -> None :
304
+ """Check whether the request has suceeded.
305
+
306
+ Parameters
307
+ ----------
308
+ response: dict
309
+ response object from post().
310
+ """
311
+ for item in response ["requests" ]:
312
+ if response ["requests" ][item ]["status" ] == "Error" :
313
+ response ["succeeded" ] = False
314
+ break
315
+ response ["succeeded" ] = True
316
+
302
317
def post (self , * , force_post : bool = False ) -> str :
303
318
"""Assemble schema and payload from the given source, then
304
319
makes a POST request to kafka.
@@ -337,19 +352,19 @@ def post(self, *, force_post: bool = False) -> str:
337
352
},
338
353
"records" : [],
339
354
}
340
- requests = response ["requests" ]
355
+ reqs = response ["requests" ]
341
356
342
- topic_exists = self ._handle_topic_exists ( requests )
357
+ topic_exists = self ._topic_exists ( reqs )
343
358
344
359
if topic_exists is None :
345
360
return json .dumps (response )
346
361
347
362
if force_post or not topic_exists :
348
- requests ["create_topic" ] = self ._post_new_topic ()
349
- if requests ["create_topic" ]["status" ] == "Error" :
363
+ reqs ["create_topic" ] = self ._post_new_topic ()
364
+ if reqs ["create_topic" ]["status" ] == "Error" :
350
365
return json .dumps (response )
351
366
else :
352
- requests ["create_topic" ] = {
367
+ reqs ["create_topic" ] = {
353
368
"status" : "Success" ,
354
369
"message" : (
355
370
"Relevant topic already exists in kafka, "
@@ -360,19 +375,22 @@ def post(self, *, force_post: bool = False) -> str:
360
375
records = self ._get_source_records ()
361
376
362
377
if records is None :
363
- requests ["write_values" ] = {
378
+ reqs ["write_values" ] = {
364
379
"status" : "Warning" ,
365
380
"message" : "No entries found, aborting POST request" ,
366
381
}
367
382
return json .dumps (response )
368
383
384
+ response ["records" ] = records
385
+
369
386
if len (records ) == 0 :
370
- requests ["write_values" ] = {
387
+ reqs ["write_values" ] = {
371
388
"status" : "Warning" ,
372
389
"message" : (
373
390
"All entries already present, aborting POST request"
374
391
),
375
392
}
393
+ self ._check_success (response )
376
394
return json .dumps (response )
377
395
378
396
payload = {"value_schema" : self .schema , "records" : records }
@@ -411,6 +429,9 @@ def post(self, *, force_post: bool = False) -> str:
411
429
412
430
response ["write_values" ] = {
413
431
"status" : "Success" ,
414
- "message" : post_response . text ,
432
+ "message" : "Data successfully sent!" ,
415
433
}
434
+
435
+ self ._check_success (response )
436
+
416
437
return json .dumps (response )
0 commit comments