21
21
import io .dropwizard .core .setup .Bootstrap ;
22
22
import io .dropwizard .core .setup .Environment ;
23
23
import io .dropwizard .jetty .HttpsConnectorFactory ;
24
- import io .dropwizard .lifecycle .setup .LifecycleEnvironment ;
25
24
import io .grpc .ServerBuilder ;
26
25
import io .lettuce .core .metrics .MicrometerCommandLatencyRecorder ;
27
26
import io .lettuce .core .metrics .MicrometerOptions ;
@@ -365,8 +364,8 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
365
364
366
365
UncaughtExceptionHandler .register ();
367
366
368
- ScheduledExecutorService dynamicConfigurationExecutor = ScheduledExecutorServiceBuilder . of ( environment , "dynamicConfiguration" )
369
- .threads (1 ).build ();
367
+ ScheduledExecutorService dynamicConfigurationExecutor = environment . lifecycle ( )
368
+ .scheduledExecutorService ( name ( getClass (), "dynamicConfiguration-%d" )). threads (1 ).build ();
370
369
371
370
DynamicConfigurationManager <DynamicConfiguration > dynamicConfigurationManager =
372
371
new DynamicConfigurationManager <>(
@@ -398,7 +397,8 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
398
397
399
398
environment .lifecycle ().manage (new ManagedAwsCrt ());
400
399
401
- final ExecutorService awsSdkMetricsExecutor = virtualExecutorService (environment , "awsSdkMetrics" );
400
+ final ExecutorService awsSdkMetricsExecutor = environment .lifecycle ()
401
+ .virtualExecutorService (name (getClass (), "awsSdkMetrics-%d" ));
402
402
403
403
final DynamoDbAsyncClient dynamoDbAsyncClient = config .getDynamoDbClientConfiguration ()
404
404
.buildAsyncClient (awsCredentialsProvider , new MicrometerAwsSdkMetricPublisher (awsSdkMetricsExecutor , "dynamoDbAsync" ));
@@ -415,7 +415,8 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
415
415
BlockingQueue <Runnable > messageDeletionQueue = new LinkedBlockingQueue <>();
416
416
Metrics .gaugeCollectionSize (name (getClass (), "messageDeletionQueueSize" ), Collections .emptyList (),
417
417
messageDeletionQueue );
418
- ExecutorService messageDeletionAsyncExecutor = ExecutorServiceBuilder .of (environment , "messageDeletionAsyncExecutor" )
418
+ ExecutorService messageDeletionAsyncExecutor = environment .lifecycle ()
419
+ .executorService (name (getClass (), "messageDeletionAsyncExecutor-%d" ))
419
420
.minThreads (2 )
420
421
.maxThreads (2 )
421
422
.allowCoreThreadTimeOut (true )
@@ -502,79 +503,98 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
502
503
Metrics .gaugeCollectionSize (MetricsUtil .name (getClass (), "messageDeliveryQueue" ), Collections .emptyList (),
503
504
messageDeliveryQueue );
504
505
505
- ScheduledExecutorService recurringJobExecutor = ScheduledExecutorServiceBuilder .of (environment , "recurringJob" ).threads (6 ).build ();
506
- ScheduledExecutorService websocketScheduledExecutor = ScheduledExecutorServiceBuilder .of (environment , "websocket" ).threads (8 ).build ();
507
- ExecutorService apnSenderExecutor = ExecutorServiceBuilder .of (environment , "apnSender" )
506
+ ScheduledExecutorService recurringJobExecutor = environment .lifecycle ()
507
+ .scheduledExecutorService (name (getClass (), "recurringJob-%d" )).threads (6 ).build ();
508
+ ScheduledExecutorService websocketScheduledExecutor = environment .lifecycle ()
509
+ .scheduledExecutorService (name (getClass (), "websocket-%d" )).threads (8 ).build ();
510
+ ExecutorService apnSenderExecutor = environment .lifecycle ().executorService (name (getClass (), "apnSender-%d" ))
508
511
.maxThreads (1 ).minThreads (1 ).build ();
509
- ExecutorService fcmSenderExecutor = ExecutorServiceBuilder . of ( environment , "fcmSender" )
512
+ ExecutorService fcmSenderExecutor = environment . lifecycle (). executorService ( name ( getClass () , "fcmSender-%d" ) )
510
513
.maxThreads (32 ).minThreads (32 ).workQueue (fcmSenderQueue ).build ();
511
- ExecutorService secureValueRecoveryServiceExecutor = ExecutorServiceBuilder . of ( environment , "secureValueRecoveryService" )
512
- .maxThreads (1 ).minThreads (1 ).build ();
513
- ExecutorService storageServiceExecutor = ExecutorServiceBuilder . of ( environment , "storageService" )
514
- .maxThreads (1 ).minThreads (1 ).build ();
515
- ExecutorService virtualThreadEventLoggerExecutor = ExecutorServiceBuilder . of ( environment , "virtualThreadEventLogger" )
516
- .minThreads (1 ).maxThreads (1 ).build ();
517
- ExecutorService asyncOperationQueueingExecutor = ExecutorServiceBuilder . of ( environment , "asyncOperationQueueing" )
518
- .minThreads (1 ).maxThreads (1 ).build ();
519
- ScheduledExecutorService secureValueRecoveryServiceRetryExecutor =
520
- ScheduledExecutorServiceBuilder . of ( environment , "secureValueRecoveryServiceRetry" ).threads (1 ).build ();
521
- ScheduledExecutorService storageServiceRetryExecutor =
522
- ScheduledExecutorServiceBuilder . of ( environment , "storageServiceRetry" ).threads (1 ).build ();
523
- ScheduledExecutorService remoteStorageRetryExecutor =
524
- ScheduledExecutorServiceBuilder . of ( environment , "remoteStorageRetry" ).threads (1 ).build ();
525
- ScheduledExecutorService registrationIdentityTokenRefreshExecutor =
526
- ScheduledExecutorServiceBuilder . of ( environment , "registrationIdentityTokenRefresh" ).threads (1 ).build ();
514
+ ExecutorService secureValueRecoveryServiceExecutor = environment . lifecycle ( )
515
+ .executorService ( name ( getClass (), "secureValueRecoveryService-%d" )). maxThreads (1 ).minThreads (1 ).build ();
516
+ ExecutorService storageServiceExecutor = environment . lifecycle ( )
517
+ .executorService ( name ( getClass (), "storageService-%d" )). maxThreads (1 ).minThreads (1 ).build ();
518
+ ExecutorService virtualThreadEventLoggerExecutor = environment . lifecycle ( )
519
+ .executorService ( name ( getClass (), "virtualThreadEventLogger-%d" )). minThreads (1 ).maxThreads (1 ).build ();
520
+ ExecutorService asyncOperationQueueingExecutor = environment . lifecycle ( )
521
+ .executorService ( name ( getClass (), "asyncOperationQueueing-%d" )). minThreads (1 ).maxThreads (1 ).build ();
522
+ ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment . lifecycle ()
523
+ . scheduledExecutorService ( name ( getClass () , "secureValueRecoveryServiceRetry-%d" ) ).threads (1 ).build ();
524
+ ScheduledExecutorService storageServiceRetryExecutor = environment . lifecycle ()
525
+ . scheduledExecutorService ( name ( getClass () , "storageServiceRetry-%d" ) ).threads (1 ).build ();
526
+ ScheduledExecutorService remoteStorageRetryExecutor = environment . lifecycle ()
527
+ . scheduledExecutorService ( name ( getClass () , "remoteStorageRetry-%d" ) ).threads (1 ).build ();
528
+ ScheduledExecutorService registrationIdentityTokenRefreshExecutor = environment . lifecycle ()
529
+ . scheduledExecutorService ( name ( getClass () , "registrationIdentityTokenRefresh-%d" ) ).threads (1 ).build ();
527
530
528
531
Scheduler messageDeliveryScheduler = Schedulers .fromExecutorService (
529
- ExecutorServiceBuilder .of (environment , "messageDelivery" )
530
- .minThreads (20 )
531
- .maxThreads (20 )
532
- .workQueue (messageDeliveryQueue )
533
- .build (),
532
+ ExecutorServiceMetrics .monitor (Metrics .globalRegistry ,
533
+ environment .lifecycle ().executorService (name (getClass (), "messageDelivery-%d" ))
534
+ .minThreads (20 )
535
+ .maxThreads (20 )
536
+ .workQueue (messageDeliveryQueue )
537
+ .build (),
538
+ MetricsUtil .name (getClass (), "messageDeliveryExecutor" ), MetricsUtil .PREFIX ),
534
539
"messageDelivery" );
535
540
536
541
// TODO: generally speaking this is a DynamoDB I/O executor for the accounts table; we should eventually have a general executor for speaking to the accounts table, but most of the server is still synchronous so this isn't widely useful yet
537
- ExecutorService batchIdentityCheckExecutor = ExecutorServiceBuilder .of (environment , "batchIdentityCheck" ).minThreads (32 ).maxThreads (32 ).build ();
538
- ExecutorService subscriptionProcessorExecutor = ExecutorServiceBuilder .of (environment , "subscriptionProcessor" )
542
+ ExecutorService batchIdentityCheckExecutor = environment .lifecycle ().executorService (name (getClass (), "batchIdentityCheck-%d" )).minThreads (32 ).maxThreads (32 ).build ();
543
+ ExecutorService subscriptionProcessorExecutor = environment .lifecycle ()
544
+ .executorService (name (getClass (), "subscriptionProcessor-%d" ))
539
545
.maxThreads (availableProcessors ) // mostly this is IO bound so tying to number of processors is tenuous at best
540
546
.minThreads (availableProcessors ) // mostly this is IO bound so tying to number of processors is tenuous at best
541
547
.allowCoreThreadTimeOut (true ).
542
548
build ();
543
- ExecutorService receiptSenderExecutor = ExecutorServiceBuilder .of (environment , "receiptSender" )
549
+ ExecutorService receiptSenderExecutor = environment .lifecycle ()
550
+ .executorService (name (getClass (), "receiptSender-%d" ))
544
551
.maxThreads (2 )
545
552
.minThreads (2 )
546
553
.workQueue (receiptSenderQueue )
547
554
.rejectedExecutionHandler (new ThreadPoolExecutor .CallerRunsPolicy ())
548
555
.build ();
549
- ExecutorService registrationCallbackExecutor = ExecutorServiceBuilder .of (environment , "registration" )
556
+ ExecutorService registrationCallbackExecutor = environment .lifecycle ()
557
+ .executorService (name (getClass (), "registration-%d" ))
550
558
.maxThreads (2 )
551
559
.minThreads (2 )
552
560
.build ();
553
- ExecutorService accountLockExecutor = ExecutorServiceBuilder .of (environment , "accountLock" )
561
+ ExecutorService accountLockExecutor = environment .lifecycle ()
562
+ .executorService (name (getClass (), "accountLock-%d" ))
554
563
.minThreads (8 )
555
564
.maxThreads (8 )
556
565
.build ();
557
566
// unbounded executor (same as cachedThreadPool)
558
- ExecutorService remoteStorageHttpExecutor = ExecutorServiceBuilder .of (environment , "remoteStorage" )
567
+ ExecutorService remoteStorageHttpExecutor = environment .lifecycle ()
568
+ .executorService (name (getClass (), "remoteStorage-%d" ))
559
569
.minThreads (0 )
560
570
.maxThreads (Integer .MAX_VALUE )
561
571
.workQueue (new SynchronousQueue <>())
562
572
.keepAliveTime (io .dropwizard .util .Duration .seconds (60L ))
563
573
.build ();
564
- ExecutorService cloudflareTurnHttpExecutor = ExecutorServiceBuilder .of (environment , "cloudflareTurn" )
574
+ ExecutorService cloudflareTurnHttpExecutor = environment .lifecycle ()
575
+ .executorService (name (getClass (), "cloudflareTurn-%d" ))
565
576
.maxThreads (2 )
566
577
.minThreads (2 )
567
578
.build ();
568
- ExecutorService googlePlayBillingExecutor = virtualExecutorService (environment , "googlePlayBilling" );
569
- ExecutorService appleAppStoreExecutor = virtualExecutorService (environment , "appleAppStore" );
570
- ExecutorService clientEventExecutor = virtualExecutorService (environment , "clientEvent" );
571
- ExecutorService disconnectionRequestListenerExecutor = virtualExecutorService (environment , "disconnectionRequest" );
572
-
573
- ScheduledExecutorService appleAppStoreRetryExecutor = ScheduledExecutorServiceBuilder .of (environment , "appleAppStoreRetry" ).threads (1 ).build ();
574
- ScheduledExecutorService subscriptionProcessorRetryExecutor = ScheduledExecutorServiceBuilder .of (environment , "subscriptionProcessorRetry" ).threads (1 ).build ();
575
- ScheduledExecutorService cloudflareTurnRetryExecutor = ScheduledExecutorServiceBuilder .of (environment , "cloudflareTurnRetry" ).threads (1 ).build ();
576
- ScheduledExecutorService messagePollExecutor = ScheduledExecutorServiceBuilder .of (environment , "messagePollExecutor" ).threads (1 ).build ();
577
- ScheduledExecutorService provisioningWebsocketTimeoutExecutor = ScheduledExecutorServiceBuilder .of (environment , "provisioningWebsocketTimeout" ).threads (1 ).build ();
579
+ ExecutorService googlePlayBillingExecutor = environment .lifecycle ()
580
+ .virtualExecutorService (name (getClass (), "googlePlayBilling-%d" ));
581
+ ExecutorService appleAppStoreExecutor = environment .lifecycle ()
582
+ .virtualExecutorService (name (getClass (), "appleAppStore-%d" ));
583
+ ExecutorService clientEventExecutor = environment .lifecycle ()
584
+ .virtualExecutorService (name (getClass (), "clientEvent-%d" ));
585
+ ExecutorService disconnectionRequestListenerExecutor = environment .lifecycle ()
586
+ .virtualExecutorService (name (getClass (), "disconnectionRequest-%d" ));
587
+
588
+ ScheduledExecutorService appleAppStoreRetryExecutor = environment .lifecycle ()
589
+ .scheduledExecutorService (name (getClass (), "appleAppStoreRetry-%d" )).threads (1 ).build ();
590
+ ScheduledExecutorService subscriptionProcessorRetryExecutor = environment .lifecycle ()
591
+ .scheduledExecutorService (name (getClass (), "subscriptionProcessorRetry-%d" )).threads (1 ).build ();
592
+ ScheduledExecutorService cloudflareTurnRetryExecutor = environment .lifecycle ()
593
+ .scheduledExecutorService (name (getClass (), "cloudflareTurnRetry-%d" )).threads (1 ).build ();
594
+ ScheduledExecutorService messagePollExecutor = environment .lifecycle ()
595
+ .scheduledExecutorService (name (getClass (), "messagePollExecutor-%d" )).threads (1 ).build ();
596
+ ScheduledExecutorService provisioningWebsocketTimeoutExecutor = environment .lifecycle ()
597
+ .scheduledExecutorService (name (getClass (), "provisioningWebsocketTimeout-%d" )).threads (1 ).build ();
578
598
579
599
final ManagedNioEventLoopGroup dnsResolutionEventLoopGroup = new ManagedNioEventLoopGroup ();
580
600
final DnsNameResolver cloudflareDnsResolver = new DnsNameResolverBuilder (dnsResolutionEventLoopGroup .next ())
@@ -902,7 +922,8 @@ protected void configureServer(final ServerBuilder<?> serverBuilder) {
902
922
noiseWebSocketTlsPrivateKey = null ;
903
923
}
904
924
905
- final ExecutorService noiseWebSocketDelegatedTaskExecutor = ExecutorServiceBuilder .of (environment , "noiseWebsocketDelegatedTask" )
925
+ final ExecutorService noiseWebSocketDelegatedTaskExecutor = environment .lifecycle ()
926
+ .executorService (name (getClass (), "noiseWebsocketDelegatedTask-%d" ))
906
927
.minThreads (8 )
907
928
.maxThreads (8 )
908
929
.allowCoreThreadTimeOut (false )
@@ -1206,47 +1227,6 @@ private void registerExceptionMappers(Environment environment,
1206
1227
});
1207
1228
}
1208
1229
1209
- private static class ExecutorServiceBuilder extends io .dropwizard .lifecycle .setup .ExecutorServiceBuilder {
1210
- private final String baseName ;
1211
-
1212
- public ExecutorServiceBuilder (final LifecycleEnvironment environment , final String baseName ) {
1213
- super (environment , name (WhisperServerService .class , baseName ) + "-%d" );
1214
- this .baseName = baseName ;
1215
- }
1216
-
1217
- @ Override
1218
- public ExecutorService build () {
1219
- return ExecutorServiceMetrics .monitor (Metrics .globalRegistry , super .build (), baseName , MetricsUtil .PREFIX );
1220
- }
1221
-
1222
- public static ExecutorServiceBuilder of (final Environment environment , final String name ) {
1223
- return new ExecutorServiceBuilder (environment .lifecycle (), name );
1224
- }
1225
- }
1226
-
1227
- private static class ScheduledExecutorServiceBuilder extends io .dropwizard .lifecycle .setup .ScheduledExecutorServiceBuilder {
1228
- private final String baseName ;
1229
-
1230
- public ScheduledExecutorServiceBuilder (final LifecycleEnvironment environment , final String baseName ) {
1231
- super (environment , name (WhisperServerService .class , baseName ) + "-%d" , false );
1232
- this .baseName = baseName ;
1233
- }
1234
-
1235
- @ Override
1236
- public ScheduledExecutorService build () {
1237
- return ExecutorServiceMetrics .monitor (Metrics .globalRegistry , super .build (), baseName , MetricsUtil .PREFIX );
1238
- }
1239
-
1240
- public static ScheduledExecutorServiceBuilder of (final Environment environment , final String name ) {
1241
- return new ScheduledExecutorServiceBuilder (environment .lifecycle (), name );
1242
- }
1243
- }
1244
-
1245
- private ExecutorService virtualExecutorService (final Environment environment , final String name ) {
1246
- return ExecutorServiceMetrics .monitor (
1247
- Metrics .globalRegistry , environment .lifecycle ().virtualExecutorService (name (getClass (), name ) + "-%d" ), name , MetricsUtil .PREFIX );
1248
- }
1249
-
1250
1230
public static void main (String [] args ) throws Exception {
1251
1231
new WhisperServerService ().run (args );
1252
1232
}
0 commit comments