18
18
import java .util .Collections ;
19
19
import java .util .List ;
20
20
import java .util .Optional ;
21
- import java .util .Random ;
22
21
import java .util .UUID ;
23
22
import java .util .concurrent .CompletableFuture ;
24
- import java .util .concurrent .ScheduledExecutorService ;
25
- import java .util .concurrent .ScheduledFuture ;
26
23
import java .util .concurrent .Semaphore ;
27
24
import java .util .concurrent .TimeUnit ;
28
25
import java .util .concurrent .TimeoutException ;
29
26
import java .util .concurrent .atomic .AtomicBoolean ;
30
- import java .util .concurrent .atomic .AtomicInteger ;
31
27
import java .util .concurrent .atomic .AtomicLong ;
32
28
import java .util .concurrent .atomic .AtomicReference ;
33
29
import java .util .concurrent .atomic .LongAdder ;
34
- import javax .annotation .Nullable ;
35
30
import org .apache .commons .lang3 .StringUtils ;
36
31
import org .eclipse .jetty .util .StaticException ;
37
32
import org .reactivestreams .Publisher ;
48
43
import org .whispersystems .textsecuregcm .metrics .MessageMetrics ;
49
44
import org .whispersystems .textsecuregcm .metrics .MetricsUtil ;
50
45
import org .whispersystems .textsecuregcm .metrics .UserAgentTagUtil ;
46
+ import org .whispersystems .textsecuregcm .push .MessageAvailabilityListener ;
51
47
import org .whispersystems .textsecuregcm .push .PushNotificationManager ;
52
48
import org .whispersystems .textsecuregcm .push .PushNotificationScheduler ;
53
49
import org .whispersystems .textsecuregcm .push .ReceiptSender ;
54
- import org .whispersystems .textsecuregcm .push .MessageAvailabilityListener ;
55
50
import org .whispersystems .textsecuregcm .storage .Account ;
56
51
import org .whispersystems .textsecuregcm .storage .ClientReleaseManager ;
57
52
import org .whispersystems .textsecuregcm .storage .Device ;
65
60
import reactor .core .publisher .Flux ;
66
61
import reactor .core .publisher .Mono ;
67
62
import reactor .core .scheduler .Scheduler ;
63
+ import reactor .util .retry .Retry ;
68
64
69
65
public class WebSocketConnection implements MessageAvailabilityListener , DisconnectionRequestListener {
70
66
@@ -80,7 +76,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Disconn
80
76
"initialQueueLength" );
81
77
private static final String INITIAL_QUEUE_DRAIN_TIMER_NAME = name (WebSocketConnection .class , "drainInitialQueue" );
82
78
private static final String SLOW_QUEUE_DRAIN_COUNTER_NAME = name (WebSocketConnection .class , "slowQueueDrain" );
83
- private static final String QUEUE_DRAIN_RETRY_COUNTER_NAME = name (WebSocketConnection .class , "queueDrainRetry" );
84
79
private static final String DISPLACEMENT_COUNTER_NAME = name (WebSocketConnection .class , "displacement" );
85
80
private static final String NON_SUCCESS_RESPONSE_COUNTER_NAME = name (WebSocketConnection .class ,
86
81
"clientNonSuccessResponse" );
@@ -105,11 +100,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Disconn
105
100
@ VisibleForTesting
106
101
static final int MESSAGE_SENDER_MAX_CONCURRENCY = 256 ;
107
102
108
- @ VisibleForTesting
109
- static final int MAX_CONSECUTIVE_RETRIES = 5 ;
110
- private static final long RETRY_DELAY_MILLIS = 1_000 ;
111
- private static final int RETRY_DELAY_JITTER_MILLIS = 500 ;
112
-
113
103
private static final int DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS = 5 * 60 * 1000 ;
114
104
115
105
private static final Duration CLOSE_WITH_PENDING_MESSAGES_NOTIFICATION_DELAY = Duration .ofMinutes (1 );
@@ -130,19 +120,14 @@ public class WebSocketConnection implements MessageAvailabilityListener, Disconn
130
120
131
121
private final int sendFuturesTimeoutMillis ;
132
122
133
- private final ScheduledExecutorService scheduledExecutorService ;
134
-
135
123
private final Semaphore processStoredMessagesSemaphore = new Semaphore (1 );
136
124
private final AtomicReference <StoredMessageState > storedMessageState = new AtomicReference <>(
137
125
StoredMessageState .PERSISTED_NEW_MESSAGES_AVAILABLE );
138
126
private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean (false );
139
127
private final LongAdder sentMessageCounter = new LongAdder ();
140
128
private final AtomicLong queueDrainStartTime = new AtomicLong ();
141
- private final AtomicInteger consecutiveRetries = new AtomicInteger ();
142
- private final AtomicReference <ScheduledFuture <?>> retryFuture = new AtomicReference <>();
143
129
private final AtomicReference <Disposable > messageSubscription = new AtomicReference <>();
144
130
145
- private final Random random = new Random ();
146
131
private final Scheduler messageDeliveryScheduler ;
147
132
148
133
private final ClientReleaseManager clientReleaseManager ;
@@ -161,7 +146,6 @@ public WebSocketConnection(ReceiptSender receiptSender,
161
146
Account authenticatedAccount ,
162
147
Device authenticatedDevice ,
163
148
WebSocketClient client ,
164
- ScheduledExecutorService scheduledExecutorService ,
165
149
Scheduler messageDeliveryScheduler ,
166
150
ClientReleaseManager clientReleaseManager ,
167
151
MessageDeliveryLoopMonitor messageDeliveryLoopMonitor ,
@@ -176,7 +160,6 @@ public WebSocketConnection(ReceiptSender receiptSender,
176
160
authenticatedDevice ,
177
161
client ,
178
162
DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS ,
179
- scheduledExecutorService ,
180
163
messageDeliveryScheduler ,
181
164
clientReleaseManager ,
182
165
messageDeliveryLoopMonitor , experimentEnrollmentManager );
@@ -192,7 +175,6 @@ public WebSocketConnection(ReceiptSender receiptSender,
192
175
Device authenticatedDevice ,
193
176
WebSocketClient client ,
194
177
int sendFuturesTimeoutMillis ,
195
- ScheduledExecutorService scheduledExecutorService ,
196
178
Scheduler messageDeliveryScheduler ,
197
179
ClientReleaseManager clientReleaseManager ,
198
180
MessageDeliveryLoopMonitor messageDeliveryLoopMonitor ,
@@ -207,7 +189,6 @@ public WebSocketConnection(ReceiptSender receiptSender,
207
189
this .authenticatedDevice = authenticatedDevice ;
208
190
this .client = client ;
209
191
this .sendFuturesTimeoutMillis = sendFuturesTimeoutMillis ;
210
- this .scheduledExecutorService = scheduledExecutorService ;
211
192
this .messageDeliveryScheduler = messageDeliveryScheduler ;
212
193
this .clientReleaseManager = clientReleaseManager ;
213
194
this .messageDeliveryLoopMonitor = messageDeliveryLoopMonitor ;
@@ -221,12 +202,6 @@ public void start() {
221
202
}
222
203
223
204
public void stop () {
224
- final ScheduledFuture <?> future = retryFuture .get ();
225
-
226
- if (future != null ) {
227
- future .cancel (false );
228
- }
229
-
230
205
final Disposable subscription = messageSubscription .get ();
231
206
if (subscription != null ) {
232
207
subscription .dispose ();
@@ -342,7 +317,6 @@ void processStoredMessages() {
342
317
}
343
318
344
319
// Cleared the queue! Send a queue empty message if we need to
345
- consecutiveRetries .set (0 );
346
320
if (sentInitialQueueEmptyMessage .compareAndSet (false , true )) {
347
321
final Tags tags = Tags .of (UserAgentTagUtil .getPlatformTag (client .getUserAgent ()));
348
322
final long drainDuration = System .currentTimeMillis () - queueDrainStartTime .get ();
@@ -362,42 +336,23 @@ void processStoredMessages() {
362
336
}
363
337
})
364
338
// Potentially kick off more work, must happen after we release the semaphore
365
- .whenComplete ((ignored , cause ) -> processMoreIfRequested (cause ));
366
- }
367
- }
368
-
369
- /**
370
- * After processing messages, kick off another processing job if more messages came in or if there was an error
371
- *
372
- * @param cause An error that was encountered when processing the message queue, if there was one
373
- */
374
- private void processMoreIfRequested (final @ Nullable Throwable cause ) {
375
- if (cause == null ) {
376
- // Success, but check if more messages came in while we were processing
377
- if (storedMessageState .get () != StoredMessageState .EMPTY ) {
378
- processStoredMessages ();
379
- }
380
- return ;
381
- }
339
+ .whenComplete ((ignored , cause ) -> {
340
+ if (cause != null ) {
341
+ if (!client .isOpen ()) {
342
+ logger .debug ("Client disconnected before queue cleared" );
343
+ return ;
344
+ }
382
345
383
- if (!client .isOpen ()) {
384
- logger .debug ("Client disconnected before queue cleared" );
385
- return ;
386
- }
346
+ client .close (1011 , "Failed to retrieve messages" );
347
+ return ;
348
+ }
387
349
388
- if (consecutiveRetries .incrementAndGet () > MAX_CONSECUTIVE_RETRIES ) {
389
- logger .warn ("Max consecutive retries exceeded" , cause );
390
- client .close (1011 , "Failed to retrieve messages" );
391
- return ;
350
+ // Success, but check if more messages came in while we were processing
351
+ if (storedMessageState .get () != StoredMessageState .EMPTY ) {
352
+ processStoredMessages ();
353
+ }
354
+ });
392
355
}
393
-
394
- logger .debug ("Failed to clear queue" , cause );
395
- final Tags tags = Tags .of (UserAgentTagUtil .getPlatformTag (client .getUserAgent ()));
396
-
397
- Metrics .counter (QUEUE_DRAIN_RETRY_COUNTER_NAME , tags ).increment ();
398
-
399
- final long delay = RETRY_DELAY_MILLIS + random .nextInt (RETRY_DELAY_JITTER_MILLIS );
400
- retryFuture .set (scheduledExecutorService .schedule (this ::processStoredMessages , delay , TimeUnit .MILLISECONDS ));
401
356
}
402
357
403
358
private CompletableFuture <Void > sendMessages (final boolean cachedMessagesOnly ) {
@@ -407,7 +362,6 @@ private CompletableFuture<Void> sendMessages(final boolean cachedMessagesOnly) {
407
362
messagesManager .getMessagesForDeviceReactive (authenticatedAccount .getIdentifier (IdentityType .ACI ), authenticatedDevice , cachedMessagesOnly );
408
363
409
364
final AtomicBoolean hasSentFirstMessage = new AtomicBoolean ();
410
- final AtomicBoolean hasErrored = new AtomicBoolean ();
411
365
412
366
final Disposable subscription = Flux .from (messages )
413
367
.name (SEND_MESSAGES_FLUX_NAME )
@@ -423,19 +377,12 @@ private CompletableFuture<Void> sendMessages(final boolean cachedMessagesOnly) {
423
377
}
424
378
})
425
379
.flatMapSequential (envelope ->
426
- Mono .fromFuture (() -> sendMessage (envelope )
427
- .orTimeout (sendFuturesTimeoutMillis , TimeUnit .MILLISECONDS ))
428
- .onErrorResume (
429
- // let the first error pass through to terminate the subscription
430
- e -> {
431
- final boolean firstError = !hasErrored .getAndSet (true );
432
- measureSendMessageErrors (e , firstError );
433
-
434
- return !firstError ;
435
- },
436
- // otherwise just emit nothing
437
- e -> Mono .empty ()
438
- ), MESSAGE_SENDER_MAX_CONCURRENCY )
380
+ Mono .defer (() -> Mono .fromFuture (() -> sendMessage (envelope ).orTimeout (sendFuturesTimeoutMillis , TimeUnit .MILLISECONDS )))
381
+ .doOnError (this ::measureSendMessageErrors )
382
+ // Note that this will retry both for "send to client" timeouts and failures to delete messages on
383
+ // acknowledgement
384
+ .retryWhen (Retry .backoff (4 , Duration .ofSeconds (1 ))),
385
+ MESSAGE_SENDER_MAX_CONCURRENCY )
439
386
.subscribeOn (messageDeliveryScheduler )
440
387
.subscribe (
441
388
// no additional consumer of values - it is Flux<Void> by now
@@ -450,7 +397,7 @@ private CompletableFuture<Void> sendMessages(final boolean cachedMessagesOnly) {
450
397
return queueCleared ;
451
398
}
452
399
453
- private void measureSendMessageErrors (final Throwable e , final boolean terminal ) {
400
+ private void measureSendMessageErrors (final Throwable e ) {
454
401
final String errorType ;
455
402
456
403
if (e instanceof TimeoutException ) {
@@ -461,7 +408,7 @@ private void measureSendMessageErrors(final Throwable e, final boolean terminal)
461
408
(e instanceof StaticException staticException && "Closed" .equals (staticException .getMessage ()))) {
462
409
errorType = "connectionClosed" ;
463
410
} else {
464
- logger .warn (terminal ? "Send message failure terminated stream" : "Send message failed" , e );
411
+ logger .warn ("Send message failed" , e );
465
412
errorType = "other" ;
466
413
}
467
414
0 commit comments