Skip to content

Commit 470e179

Browse files
authored
Use MessageStream in WebSocketConnection
1 parent 4c5dc11 commit 470e179

File tree

6 files changed

+666
-925
lines changed

6 files changed

+666
-925
lines changed

service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -994,8 +994,9 @@ protected void configureServer(final ServerBuilder<?> serverBuilder) {
994994
config.idlePrimaryDeviceReminderConfiguration().minIdleDuration(), Clock.systemUTC()));
995995
webSocketEnvironment.setConnectListener(
996996
new AuthenticatedConnectListener(accountsManager, receiptSender, messagesManager, messageMetrics, pushNotificationManager,
997-
pushNotificationScheduler, redisMessageAvailabilityManager, disconnectionRequestManager,
998-
messageDeliveryScheduler, clientReleaseManager, messageDeliveryLoopMonitor, experimentEnrollmentManager));
997+
pushNotificationScheduler, disconnectionRequestManager,
998+
messageDeliveryScheduler, clientReleaseManager, messageDeliveryLoopMonitor, experimentEnrollmentManager
999+
));
9991000
webSocketEnvironment.jersey().register(new RateLimitByIpFilter(rateLimiters));
10001001
webSocketEnvironment.jersey().register(new RequestStatisticsFilter(TrafficSource.WEBSOCKET));
10011002
webSocketEnvironment.jersey().register(MultiRecipientMessageProvider.class);

service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java

Lines changed: 51 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@
77

88
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
99

10+
import com.google.common.annotations.VisibleForTesting;
1011
import io.micrometer.core.instrument.Tags;
11-
1212
import java.time.Duration;
1313
import java.util.Optional;
14+
import java.util.function.Function;
1415
import org.slf4j.Logger;
1516
import org.slf4j.LoggerFactory;
1617
import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice;
@@ -23,12 +24,12 @@
2324
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
2425
import org.whispersystems.textsecuregcm.push.PushNotificationScheduler;
2526
import org.whispersystems.textsecuregcm.push.ReceiptSender;
26-
import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager;
2727
import org.whispersystems.textsecuregcm.storage.Account;
2828
import org.whispersystems.textsecuregcm.storage.AccountsManager;
2929
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
3030
import org.whispersystems.textsecuregcm.storage.Device;
3131
import org.whispersystems.textsecuregcm.storage.MessagesManager;
32+
import org.whispersystems.websocket.WebSocketClient;
3233
import org.whispersystems.websocket.session.WebSocketSessionContext;
3334
import org.whispersystems.websocket.setup.WebSocketConnectListener;
3435
import reactor.core.scheduler.Scheduler;
@@ -45,69 +46,82 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
4546
private static final Logger log = LoggerFactory.getLogger(AuthenticatedConnectListener.class);
4647

4748
private final AccountsManager accountsManager;
48-
private final ReceiptSender receiptSender;
49-
private final MessagesManager messagesManager;
50-
private final MessageMetrics messageMetrics;
51-
private final PushNotificationManager pushNotificationManager;
52-
private final PushNotificationScheduler pushNotificationScheduler;
53-
private final RedisMessageAvailabilityManager redisMessageAvailabilityManager;
5449
private final DisconnectionRequestManager disconnectionRequestManager;
55-
private final Scheduler messageDeliveryScheduler;
56-
private final ClientReleaseManager clientReleaseManager;
57-
private final MessageDeliveryLoopMonitor messageDeliveryLoopMonitor;
58-
private final ExperimentEnrollmentManager experimentEnrollmentManager;
50+
private final WebSocketConnectionBuilder webSocketConnectionBuilder;
5951

6052
private final OpenWebSocketCounter openAuthenticatedWebSocketCounter;
6153
private final OpenWebSocketCounter openUnauthenticatedWebSocketCounter;
6254

55+
@VisibleForTesting
56+
@FunctionalInterface
57+
interface WebSocketConnectionBuilder {
58+
WebSocketConnection buildWebSocketConnection(Account account, Device device, WebSocketClient client);
59+
}
60+
6361
public AuthenticatedConnectListener(
6462
final AccountsManager accountsManager,
6563
final ReceiptSender receiptSender,
6664
final MessagesManager messagesManager,
6765
final MessageMetrics messageMetrics,
6866
final PushNotificationManager pushNotificationManager,
6967
final PushNotificationScheduler pushNotificationScheduler,
70-
final RedisMessageAvailabilityManager redisMessageAvailabilityManager,
7168
final DisconnectionRequestManager disconnectionRequestManager,
7269
final Scheduler messageDeliveryScheduler,
7370
final ClientReleaseManager clientReleaseManager,
7471
final MessageDeliveryLoopMonitor messageDeliveryLoopMonitor,
7572
final ExperimentEnrollmentManager experimentEnrollmentManager) {
7673

74+
this(accountsManager,
75+
disconnectionRequestManager,
76+
(account, device, client) -> new WebSocketConnection(receiptSender,
77+
messagesManager,
78+
messageMetrics,
79+
pushNotificationManager,
80+
pushNotificationScheduler,
81+
account,
82+
device,
83+
client,
84+
messageDeliveryScheduler,
85+
clientReleaseManager,
86+
messageDeliveryLoopMonitor,
87+
experimentEnrollmentManager),
88+
authenticated -> new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME,
89+
NEW_CONNECTION_COUNTER_NAME,
90+
CONNECTED_DURATION_TIMER_NAME,
91+
Duration.ofHours(3),
92+
Tags.of(AUTHENTICATED_TAG_NAME, String.valueOf(authenticated)))
93+
);
94+
}
95+
96+
@VisibleForTesting AuthenticatedConnectListener(
97+
final AccountsManager accountsManager,
98+
final DisconnectionRequestManager disconnectionRequestManager,
99+
final WebSocketConnectionBuilder webSocketConnectionBuilder,
100+
final Function<Boolean, OpenWebSocketCounter> openWebSocketCounterBuilder) {
101+
77102
this.accountsManager = accountsManager;
78-
this.receiptSender = receiptSender;
79-
this.messagesManager = messagesManager;
80-
this.messageMetrics = messageMetrics;
81-
this.pushNotificationManager = pushNotificationManager;
82-
this.pushNotificationScheduler = pushNotificationScheduler;
83-
this.redisMessageAvailabilityManager = redisMessageAvailabilityManager;
84103
this.disconnectionRequestManager = disconnectionRequestManager;
85-
this.messageDeliveryScheduler = messageDeliveryScheduler;
86-
this.clientReleaseManager = clientReleaseManager;
87-
this.messageDeliveryLoopMonitor = messageDeliveryLoopMonitor;
88-
this.experimentEnrollmentManager = experimentEnrollmentManager;
89-
90-
openAuthenticatedWebSocketCounter =
91-
new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, NEW_CONNECTION_COUNTER_NAME, CONNECTED_DURATION_TIMER_NAME, Duration.ofHours(3), Tags.of(AUTHENTICATED_TAG_NAME, "true"));
104+
this.webSocketConnectionBuilder = webSocketConnectionBuilder;
92105

93-
openUnauthenticatedWebSocketCounter =
94-
new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, NEW_CONNECTION_COUNTER_NAME, CONNECTED_DURATION_TIMER_NAME, Duration.ofHours(3), Tags.of(AUTHENTICATED_TAG_NAME, "false"));
106+
openAuthenticatedWebSocketCounter = openWebSocketCounterBuilder.apply(true);
107+
openUnauthenticatedWebSocketCounter = openWebSocketCounterBuilder.apply(false);
95108
}
96109

97110
@Override
98111
public void onWebSocketConnect(final WebSocketSessionContext context) {
99112

100113
final boolean authenticated = (context.getAuthenticated() != null);
101-
final OpenWebSocketCounter openWebSocketCounter =
102-
authenticated ? openAuthenticatedWebSocketCounter : openUnauthenticatedWebSocketCounter;
103114

104-
openWebSocketCounter.countOpenWebSocket(context);
115+
(authenticated ? openAuthenticatedWebSocketCounter : openUnauthenticatedWebSocketCounter).countOpenWebSocket(context);
105116

106117
if (authenticated) {
107118
final AuthenticatedDevice auth = context.getAuthenticated(AuthenticatedDevice.class);
108119

109-
final Optional<Account> maybeAuthenticatedAccount = accountsManager.getByAccountIdentifier(auth.accountIdentifier());
110-
final Optional<Device> maybeAuthenticatedDevice = maybeAuthenticatedAccount.flatMap(account -> account.getDevice(auth.deviceId()));
120+
final Optional<Account> maybeAuthenticatedAccount =
121+
accountsManager.getByAccountIdentifier(auth.accountIdentifier());
122+
123+
final Optional<Device> maybeAuthenticatedDevice =
124+
maybeAuthenticatedAccount.flatMap(account -> account.getDevice(auth.deviceId()));
111125

112126
if (maybeAuthenticatedAccount.isEmpty() || maybeAuthenticatedDevice.isEmpty()) {
113127
log.warn("{}:{} not found when opening authenticated WebSocket", auth.accountIdentifier(), auth.deviceId());
@@ -116,18 +130,10 @@ public void onWebSocketConnect(final WebSocketSessionContext context) {
116130
return;
117131
}
118132

119-
final WebSocketConnection connection = new WebSocketConnection(receiptSender,
120-
messagesManager,
121-
messageMetrics,
122-
pushNotificationManager,
123-
pushNotificationScheduler,
124-
maybeAuthenticatedAccount.get(),
125-
maybeAuthenticatedDevice.get(),
126-
context.getClient(),
127-
messageDeliveryScheduler,
128-
clientReleaseManager,
129-
messageDeliveryLoopMonitor,
130-
experimentEnrollmentManager);
133+
final WebSocketConnection connection =
134+
webSocketConnectionBuilder.buildWebSocketConnection(maybeAuthenticatedAccount.get(),
135+
maybeAuthenticatedDevice.get(),
136+
context.getClient());
131137

132138
disconnectionRequestManager.addListener(maybeAuthenticatedAccount.get().getIdentifier(IdentityType.ACI),
133139
maybeAuthenticatedDevice.get().getId(),
@@ -138,27 +144,11 @@ public void onWebSocketConnect(final WebSocketSessionContext context) {
138144
maybeAuthenticatedDevice.get().getId(),
139145
connection);
140146

141-
// We begin the shutdown process by removing this client's "presence," which means it will again begin to
142-
// receive push notifications for inbound messages. We should do this first because, at this point, the
143-
// connection has already closed and attempts to actually deliver a message via the connection will not succeed.
144-
// It's preferable to start sending push notifications as soon as possible.
145-
redisMessageAvailabilityManager.handleClientDisconnected(auth.accountIdentifier(), auth.deviceId());
146-
147-
// Finally, stop trying to deliver messages and send a push notification if the connection is aware of any
148-
// undelivered messages.
149147
connection.stop();
150148
});
151149

152150
try {
153-
// Once we "start" the websocket connection, we'll cancel any scheduled "you may have new messages" push
154-
// notifications and begin delivering any stored messages for the connected device. We have not yet declared the
155-
// client as "present" yet. If a message arrives at this point, we will update the message availability state
156-
// correctly, but we may also send a spurious push notification.
157151
connection.start();
158-
159-
// Finally, we register this client's presence, which suppresses push notifications. We do this last because
160-
// receiving extra push notifications is generally preferable to missing out on a push notification.
161-
redisMessageAvailabilityManager.handleClientConnected(auth.accountIdentifier(), auth.deviceId(), connection);
162152
} catch (final Exception e) {
163153
log.warn("Failed to initialize websocket", e);
164154
context.getClient().close(1011, "Unexpected error initializing connection");

0 commit comments

Comments
 (0)