@@ -219,35 +219,39 @@ public void start() {
219
219
.thenReturn (entry );
220
220
case MessageStreamEntry .QueueEmpty _ -> Mono .just (entry );
221
221
}, MESSAGE_SENDER_MAX_CONCURRENCY )
222
- // `ConflictingMessageConsumerException` is handled before processing messages
223
- .doOnError (throwable -> !(throwable instanceof ConflictingMessageConsumerException ), throwable -> {
224
- measureSendMessageErrors (throwable );
225
-
226
- if (!client .isOpen ()) {
227
- logger .debug ("Client disconnected before queue cleared" );
228
- return ;
229
- }
230
-
231
- client .close (1011 , "Failed to retrieve messages" );
232
- })
233
- // Make sure we process message acknowledgements before sending the "queue clear" signal
234
- .doOnNext (entry -> {
235
- if (entry instanceof MessageStreamEntry .QueueEmpty ) {
236
- final Duration drainDuration = Duration .ofNanos (System .nanoTime () - queueDrainStartNanos );
237
-
238
- Metrics .summary (INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME , platformTag ).record (sentMessageCounter .sum ());
239
- Metrics .timer (INITIAL_QUEUE_DRAIN_TIMER_NAME , platformTag ).record (drainDuration );
240
-
241
- if (drainDuration .compareTo (SLOW_DRAIN_THRESHOLD ) > 0 ) {
242
- Metrics .counter (SLOW_QUEUE_DRAIN_COUNTER_NAME , platformTag ).increment ();
243
- }
244
-
245
- client .sendRequest ("PUT" , "/api/v1/queue/empty" ,
246
- Collections .singletonList (HeaderUtils .getTimestampHeader ()), Optional .empty ());
247
- }
248
- })
249
222
.subscribeOn (messageDeliveryScheduler )
250
- .subscribe ());
223
+ .subscribe (
224
+ entry -> {
225
+ if (entry instanceof MessageStreamEntry .QueueEmpty ) {
226
+ final Duration drainDuration = Duration .ofNanos (System .nanoTime () - queueDrainStartNanos );
227
+
228
+ Metrics .summary (INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME , platformTag ).record (sentMessageCounter .sum ());
229
+ Metrics .timer (INITIAL_QUEUE_DRAIN_TIMER_NAME , platformTag ).record (drainDuration );
230
+
231
+ if (drainDuration .compareTo (SLOW_DRAIN_THRESHOLD ) > 0 ) {
232
+ Metrics .counter (SLOW_QUEUE_DRAIN_COUNTER_NAME , platformTag ).increment ();
233
+ }
234
+
235
+ client .sendRequest ("PUT" , "/api/v1/queue/empty" ,
236
+ Collections .singletonList (HeaderUtils .getTimestampHeader ()), Optional .empty ());
237
+ }
238
+ },
239
+ throwable -> {
240
+ // `ConflictingMessageConsumerException` is handled before processing messages
241
+ if (throwable instanceof ConflictingMessageConsumerException ) {
242
+ return ;
243
+ }
244
+
245
+ measureSendMessageErrors (throwable );
246
+
247
+ if (!client .isOpen ()) {
248
+ logger .debug ("Client disconnected before queue cleared" );
249
+ return ;
250
+ }
251
+
252
+ client .close (1011 , "Failed to retrieve messages" );
253
+ }
254
+ ));
251
255
}
252
256
253
257
public void stop () {
0 commit comments