Skip to content

Commit b76eaa1

Browse files
authored
Add a bounded virtual executor service
1 parent c883cd8 commit b76eaa1

10 files changed

+411
-20
lines changed

service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ public class WhisperServerConfiguration extends Configuration {
316316
@Valid
317317
@NotNull
318318
@JsonProperty
319-
private VirtualThreadConfiguration virtualThread = new VirtualThreadConfiguration(Duration.ofMillis(1));
319+
private VirtualThreadConfiguration virtualThread = new VirtualThreadConfiguration();
320320

321321
@Valid
322322
@NotNull

service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@
258258
import org.whispersystems.textsecuregcm.subscriptions.StripeManager;
259259
import org.whispersystems.textsecuregcm.util.BufferingInterceptor;
260260
import org.whispersystems.textsecuregcm.util.ManagedAwsCrt;
261+
import org.whispersystems.textsecuregcm.util.ManagedExecutors;
261262
import org.whispersystems.textsecuregcm.util.SystemMapper;
262263
import org.whispersystems.textsecuregcm.util.UsernameHashZkProofVerifier;
263264
import org.whispersystems.textsecuregcm.util.VirtualExecutorServiceProvider;
@@ -395,8 +396,10 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
395396

396397
environment.lifecycle().manage(new ManagedAwsCrt());
397398

398-
final ExecutorService awsSdkMetricsExecutor = environment.lifecycle()
399-
.virtualExecutorService(name(getClass(), "awsSdkMetrics-%d"));
399+
final ExecutorService awsSdkMetricsExecutor = ManagedExecutors.newVirtualThreadPerTaskExecutor(
400+
"awsSdkMetrics",
401+
config.getVirtualThreadConfiguration().maxConcurrentThreadsPerExecutor(),
402+
environment);
400403

401404
final DynamoDbAsyncClient dynamoDbAsyncClient = config.getDynamoDbClientConfiguration()
402405
.buildAsyncClient(awsCredentialsProvider, new MicrometerAwsSdkMetricPublisher(awsSdkMetricsExecutor, "dynamoDbAsync"));
@@ -561,14 +564,23 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
561564
.maxThreads(2)
562565
.minThreads(2)
563566
.build();
564-
ExecutorService googlePlayBillingExecutor = environment.lifecycle()
565-
.virtualExecutorService(name(getClass(), "googlePlayBilling-%d"));
566-
ExecutorService appleAppStoreExecutor = environment.lifecycle()
567-
.virtualExecutorService(name(getClass(), "appleAppStore-%d"));
568-
ExecutorService clientEventExecutor = environment.lifecycle()
569-
.virtualExecutorService(name(getClass(), "clientEvent-%d"));
570-
ExecutorService disconnectionRequestListenerExecutor = environment.lifecycle()
571-
.virtualExecutorService(name(getClass(), "disconnectionRequest-%d"));
567+
568+
ExecutorService googlePlayBillingExecutor = ManagedExecutors.newVirtualThreadPerTaskExecutor(
569+
"googlePlayBilling",
570+
config.getVirtualThreadConfiguration().maxConcurrentThreadsPerExecutor(),
571+
environment);
572+
ExecutorService appleAppStoreExecutor = ManagedExecutors.newVirtualThreadPerTaskExecutor(
573+
"appleAppStore",
574+
config.getVirtualThreadConfiguration().maxConcurrentThreadsPerExecutor(),
575+
environment);
576+
ExecutorService clientEventExecutor = ManagedExecutors.newVirtualThreadPerTaskExecutor(
577+
"clientEvent",
578+
config.getVirtualThreadConfiguration().maxConcurrentThreadsPerExecutor(),
579+
environment);
580+
ExecutorService disconnectionRequestListenerExecutor = ManagedExecutors.newVirtualThreadPerTaskExecutor(
581+
"disconnectionRequest",
582+
config.getVirtualThreadConfiguration().maxConcurrentThreadsPerExecutor(),
583+
environment);
572584

573585
ScheduledExecutorService appleAppStoreRetryExecutor = ScheduledExecutorServiceBuilder.of(environment, "appleAppStoreRetry").threads(1).build();
574586
ScheduledExecutorService subscriptionProcessorRetryExecutor = ScheduledExecutorServiceBuilder.of(environment, "subscriptionProcessorRetry").threads(1).build();
@@ -976,7 +988,9 @@ protected void configureServer(final ServerBuilder<?> serverBuilder) {
976988
environment.jersey().register(new BufferingInterceptor());
977989
environment.jersey().register(new RestDeprecationFilter(dynamicConfigurationManager, experimentEnrollmentManager));
978990

979-
environment.jersey().register(new VirtualExecutorServiceProvider("managed-async-virtual-thread-"));
991+
environment.jersey().register(new VirtualExecutorServiceProvider(
992+
"managed-async-virtual-thread",
993+
config.getVirtualThreadConfiguration().maxConcurrentThreadsPerExecutor()));
980994
environment.jersey().register(new RateLimitByIpFilter(rateLimiters));
981995
environment.jersey().register(new RequestStatisticsFilter(TrafficSource.HTTP));
982996
environment.jersey().register(MultiRecipientMessageProvider.class);
@@ -987,7 +1001,9 @@ protected void configureServer(final ServerBuilder<?> serverBuilder) {
9871001
///
9881002
WebSocketEnvironment<AuthenticatedDevice> webSocketEnvironment = new WebSocketEnvironment<>(environment,
9891003
config.getWebSocketConfiguration(), Duration.ofMillis(90000));
990-
webSocketEnvironment.jersey().register(new VirtualExecutorServiceProvider("managed-async-websocket-virtual-thread-"));
1004+
webSocketEnvironment.jersey().register(new VirtualExecutorServiceProvider(
1005+
"managed-async-websocket-virtual-thread",
1006+
config.getVirtualThreadConfiguration().maxConcurrentThreadsPerExecutor()));
9911007
webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(accountAuthenticator));
9921008
webSocketEnvironment.setAuthenticatedWebSocketUpgradeFilter(new IdlePrimaryDeviceAuthenticatedWebSocketUpgradeFilter(
9931009
config.idlePrimaryDeviceReminderConfiguration().minIdleDuration(), Clock.systemUTC()));

service/src/main/java/org/whispersystems/textsecuregcm/configuration/VirtualThreadConfiguration.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,20 @@
66

77
import java.time.Duration;
88

9-
public record VirtualThreadConfiguration(Duration pinEventThreshold) {}
9+
public record VirtualThreadConfiguration(
10+
Duration pinEventThreshold,
11+
Integer maxConcurrentThreadsPerExecutor) {
12+
13+
public VirtualThreadConfiguration() {
14+
this(null, null);
15+
}
16+
17+
public VirtualThreadConfiguration {
18+
if (maxConcurrentThreadsPerExecutor == null) {
19+
maxConcurrentThreadsPerExecutor = 1_000_000;
20+
}
21+
if (pinEventThreshold == null) {
22+
pinEventThreshold = Duration.ofMillis(1);
23+
}
24+
}
25+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2025 Signal Messenger, LLC
3+
* SPDX-License-Identifier: AGPL-3.0-only
4+
*/
5+
package org.whispersystems.textsecuregcm.util;
6+
7+
import com.google.common.annotations.VisibleForTesting;
8+
import io.micrometer.core.instrument.Counter;
9+
import io.micrometer.core.instrument.Metrics;
10+
import io.micrometer.core.instrument.Tags;
11+
import java.util.concurrent.ThreadFactory;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
16+
17+
/**
18+
* A thread factory that creates virtual threads but limits the total number of virtual threads created.
19+
*/
20+
public class BoundedVirtualThreadFactory implements ThreadFactory {
21+
22+
private static final Logger logger = LoggerFactory.getLogger(BoundedVirtualThreadFactory.class);
23+
24+
private final AtomicInteger runningThreads = new AtomicInteger();
25+
private final ThreadFactory delegate;
26+
private final int maxConcurrentThreads;
27+
28+
private final Counter created;
29+
private final Counter completed;
30+
31+
public BoundedVirtualThreadFactory(final String threadPoolName, final int maxConcurrentThreads) {
32+
this.maxConcurrentThreads = maxConcurrentThreads;
33+
34+
final Tags tags = Tags.of("pool", threadPoolName);
35+
Metrics.gauge(
36+
MetricsUtil.name(BoundedVirtualThreadFactory.class, "active"),
37+
tags, runningThreads, (rt) -> (double) rt.get());
38+
this.created = Metrics.counter(MetricsUtil.name(BoundedVirtualThreadFactory.class, "created"), tags);
39+
this.completed = Metrics.counter(MetricsUtil.name(BoundedVirtualThreadFactory.class, "completed"), tags);
40+
41+
// The virtual thread factory will initialize thread names by appending the thread index to the provided prefix
42+
this.delegate = Thread.ofVirtual().name(threadPoolName + "-", 0).factory();
43+
44+
}
45+
46+
@Override
47+
public Thread newThread(final Runnable r) {
48+
if (!tryAcquire()) {
49+
return null;
50+
}
51+
Thread thread = null;
52+
try {
53+
final Runnable wrapped = () -> {
54+
try {
55+
r.run();
56+
} finally {
57+
release();
58+
}
59+
};
60+
thread = delegate.newThread(wrapped);
61+
} finally {
62+
if (thread == null) {
63+
release();
64+
}
65+
}
66+
return thread;
67+
}
68+
69+
70+
@VisibleForTesting
71+
int getRunningThreads() {
72+
return runningThreads.get();
73+
}
74+
75+
private boolean tryAcquire() {
76+
int old;
77+
do {
78+
old = runningThreads.get();
79+
if (old >= maxConcurrentThreads) {
80+
return false;
81+
}
82+
} while (!runningThreads.compareAndSet(old, old + 1));
83+
created.increment();
84+
return true;
85+
}
86+
87+
private void release() {
88+
int updated = runningThreads.decrementAndGet();
89+
if (updated < 0) {
90+
logger.error("Released a thread and count was {}, which should never happen", updated);
91+
}
92+
completed.increment();
93+
}
94+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2025 Signal Messenger, LLC
3+
* SPDX-License-Identifier: AGPL-3.0-only
4+
*/
5+
package org.whispersystems.textsecuregcm.util;
6+
7+
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
8+
9+
import io.dropwizard.core.setup.Environment;
10+
import io.dropwizard.lifecycle.ExecutorServiceManager;
11+
import io.dropwizard.util.Duration;
12+
import io.micrometer.core.instrument.MeterRegistry;
13+
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
16+
17+
/**
18+
* Build Executor Services managed by dropwizard, supplementing executors provided by
19+
* {@link io.dropwizard.lifecycle.setup.LifecycleEnvironment#executorService}
20+
*/
21+
public class ManagedExecutors {
22+
23+
private static final Duration SHUTDOWN_DURATION = Duration.seconds(5);
24+
25+
private ManagedExecutors() {
26+
}
27+
28+
public static ExecutorService newVirtualThreadPerTaskExecutor(
29+
final String threadNamePrefix,
30+
final int maxConcurrentThreads,
31+
final Environment environment) {
32+
33+
final BoundedVirtualThreadFactory threadFactory =
34+
new BoundedVirtualThreadFactory(threadNamePrefix, maxConcurrentThreads);
35+
final ExecutorService virtualThreadExecutor = Executors.newThreadPerTaskExecutor(threadFactory);
36+
environment.lifecycle()
37+
.manage(new ExecutorServiceManager(virtualThreadExecutor, SHUTDOWN_DURATION, threadNamePrefix));
38+
return virtualThreadExecutor;
39+
}
40+
}

service/src/main/java/org/whispersystems/textsecuregcm/util/VirtualExecutorServiceProvider.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,32 @@
1515

1616
@ManagedAsyncExecutor
1717
public class VirtualExecutorServiceProvider implements ExecutorServiceProvider {
18+
1819
private static final Logger logger = LoggerFactory.getLogger(VirtualExecutorServiceProvider.class);
1920

2021

2122
/**
2223
* Default thread pool executor termination timeout in milliseconds.
2324
*/
2425
public static final int TERMINATION_TIMEOUT = 5000;
26+
2527
private final String virtualThreadNamePrefix;
28+
private final int maxConcurrentThreads;
2629

27-
public VirtualExecutorServiceProvider(final String virtualThreadNamePrefix) {
30+
public VirtualExecutorServiceProvider(
31+
final String virtualThreadNamePrefix,
32+
final int maxConcurrentThreads) {
2833
this.virtualThreadNamePrefix = virtualThreadNamePrefix;
34+
this.maxConcurrentThreads = maxConcurrentThreads;
2935
}
3036

3137

3238
@Override
3339
public ExecutorService getExecutorService() {
3440
logger.info("Creating executor service with virtual thread per task");
35-
return Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name(virtualThreadNamePrefix, 0).factory());
41+
final ExecutorService executor = Executors.newThreadPerTaskExecutor(
42+
new BoundedVirtualThreadFactory(virtualThreadNamePrefix, maxConcurrentThreads));
43+
return executor;
3644
}
3745

3846
@Override

service/src/test/java/org/whispersystems/textsecuregcm/BufferingInterceptorIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void run(final Configuration configuration, final Environment environment
3434
final TestController testController = new TestController();
3535
environment.jersey().register(testController);
3636
environment.jersey().register(new BufferingInterceptor());
37-
environment.jersey().register(new VirtualExecutorServiceProvider("virtual-thread-"));
37+
environment.jersey().register(new VirtualExecutorServiceProvider("virtual-thread-", 10));
3838
JettyWebSocketServletContainerInitializer.configure(environment.getApplicationContext(), null);
3939
}
4040
}

service/src/test/java/org/whispersystems/textsecuregcm/WhisperServerServiceTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,17 @@
1818
import java.net.URI;
1919
import java.util.List;
2020
import java.util.Map;
21+
import java.util.concurrent.ArrayBlockingQueue;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.Semaphore;
25+
import java.util.concurrent.ThreadFactory;
26+
import java.util.concurrent.atomic.AtomicInteger;
2127
import org.eclipse.jetty.util.component.LifeCycle;
2228
import org.eclipse.jetty.websocket.api.Session;
2329
import org.eclipse.jetty.websocket.api.StatusCode;
2430
import org.eclipse.jetty.websocket.client.WebSocketClient;
31+
import org.jetbrains.annotations.NotNull;
2532
import org.junit.jupiter.api.AfterAll;
2633
import org.junit.jupiter.api.BeforeAll;
2734
import org.junit.jupiter.api.Test;
@@ -34,6 +41,7 @@
3441
import org.whispersystems.textsecuregcm.tests.util.TestWebsocketListener;
3542
import org.whispersystems.textsecuregcm.util.AttributeValues;
3643
import org.whispersystems.textsecuregcm.util.HeaderUtils;
44+
import org.whispersystems.textsecuregcm.util.Util;
3745
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
3846
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
3947
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;

0 commit comments

Comments
 (0)