diff --git a/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java b/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java index 5739dd5..738118c 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java @@ -16,16 +16,16 @@ public class AsynchronousDeliveryStrategy implements DeliveryStrategy { public boolean send(Producer producer, ProducerRecord record, final E event, final FailedDeliveryCallback failedDeliveryCallback) { try { - producer.send(record, new Callback() { - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception != null) { - failedDeliveryCallback.onFailedDelivery(event, exception); - } + producer.send(record, (metadata, exception) -> { + if (exception != null) { + failedDeliveryCallback.onFailedDelivery(event, exception); } }); return true; - } catch (BufferExhaustedException | TimeoutException e) { + } catch (Exception e) { + if (e instanceof org.apache.kafka.common.errors.InterruptException) { + Thread.currentThread().interrupt(); + } failedDeliveryCallback.onFailedDelivery(event, e); return false; } diff --git a/src/test/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategyTest.java b/src/test/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategyTest.java index 5becb31..f23be9a 100644 --- a/src/test/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategyTest.java +++ b/src/test/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategyTest.java @@ -4,6 +4,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.junit.Test; @@ -74,4 +75,16 @@ public void testCallbackWillTriggerOnFailedDeliveryOnProducerSendTimeout() { verify(failedDeliveryCallback).onFailedDelivery(eq("msg"), same(exception)); } + @Test + public void testCallbackWillTriggerOnFailedDeliveryOnAnyError() { + final Exception exception = new KafkaException("miau"); + final ProducerRecord record = new ProducerRecord("topic", 0, null, "msg"); + + when(producer.send(same(record), any(Callback.class))).thenThrow(exception); + + unit.send(producer, record, "msg", failedDeliveryCallback); + + verify(failedDeliveryCallback).onFailedDelivery(eq("msg"), same(exception)); + } + }