1
1
package io .scalecube .cluster ;
2
2
3
+ import static reactor .core .publisher .Sinks .EmitResult .FAIL_NON_SERIALIZED ;
4
+
3
5
import io .scalecube .cluster .fdetector .FailureDetectorConfig ;
4
6
import io .scalecube .cluster .fdetector .FailureDetectorImpl ;
5
7
import io .scalecube .cluster .gossip .GossipConfig ;
43
45
import reactor .core .Disposable ;
44
46
import reactor .core .Disposables ;
45
47
import reactor .core .Exceptions ;
46
- import reactor .core .publisher .DirectProcessor ;
47
48
import reactor .core .publisher .Flux ;
48
- import reactor .core .publisher .FluxSink ;
49
49
import reactor .core .publisher .Mono ;
50
- import reactor .core .publisher .MonoProcessor ;
50
+ import reactor .core .publisher .SignalType ;
51
+ import reactor .core .publisher .Sinks ;
52
+ import reactor .core .publisher .Sinks .EmitFailureHandler ;
53
+ import reactor .core .publisher .Sinks .EmitResult ;
51
54
import reactor .core .scheduler .Scheduler ;
52
55
import reactor .core .scheduler .Schedulers ;
53
56
@@ -79,17 +82,17 @@ public final class ClusterImpl implements Cluster {
79
82
cluster -> new ClusterMessageHandler () {};
80
83
81
84
// Subject
82
- private final DirectProcessor <MembershipEvent > membershipEvents = DirectProcessor . create ();
83
- private final FluxSink < MembershipEvent > membershipSink = membershipEvents . sink ();
85
+ private final Sinks . Many <MembershipEvent > membershipSink =
86
+ Sinks . many (). multicast (). directBestEffort ();
84
87
85
88
// Disposables
86
89
private final Disposable .Composite actionsDisposables = Disposables .composite ();
87
90
88
91
// Lifecycle
89
- private final MonoProcessor <Void > start = MonoProcessor . create ();
90
- private final MonoProcessor <Void > onStart = MonoProcessor . create ();
91
- private final MonoProcessor <Void > shutdown = MonoProcessor . create ();
92
- private final MonoProcessor <Void > onShutdown = MonoProcessor . create ();
92
+ private final Sinks . One <Void > start = Sinks . one ();
93
+ private final Sinks . One <Void > onStart = Sinks . one ();
94
+ private final Sinks . One <Void > shutdown = Sinks . one ();
95
+ private final Sinks . One <Void > onShutdown = Sinks . one ();
93
96
94
97
// Cluster components
95
98
private Transport transport ;
@@ -119,14 +122,16 @@ private ClusterImpl(ClusterImpl that) {
119
122
120
123
private void initLifecycle () {
121
124
start
125
+ .asMono ()
122
126
.then (doStart ())
123
- .doOnSuccess (avoid -> onStart .onComplete ( ))
124
- .doOnError (onStart :: onError )
127
+ .doOnSuccess (avoid -> onStart .emitEmpty ( RetryEmitFailureHandler . INSTANCE ))
128
+ .doOnError (th -> onStart . emitError ( th , RetryEmitFailureHandler . INSTANCE ) )
125
129
.subscribe (null , th -> LOGGER .error ("[{}][doStart] Exception occurred:" , localMember , th ));
126
130
127
131
shutdown
132
+ .asMono ()
128
133
.then (doShutdown ())
129
- .doFinally (s -> onShutdown .onComplete ( ))
134
+ .doFinally (s -> onShutdown .emitEmpty ( RetryEmitFailureHandler . INSTANCE ))
130
135
.subscribe (
131
136
null ,
132
137
th ->
@@ -232,8 +237,8 @@ public ClusterImpl handler(Function<Cluster, ClusterMessageHandler> handler) {
232
237
public Mono <Cluster > start () {
233
238
return Mono .defer (
234
239
() -> {
235
- start .onComplete ( );
236
- return onStart .thenReturn (this );
240
+ start .emitEmpty ( RetryEmitFailureHandler . INSTANCE );
241
+ return onStart .asMono (). thenReturn (this );
237
242
});
238
243
}
239
244
@@ -248,9 +253,9 @@ private Mono<Cluster> doStart() {
248
253
private Mono <Cluster > doStart0 () {
249
254
return TransportImpl .bind (config .transportConfig ())
250
255
.flatMap (
251
- transport1 -> {
252
- localMember = createLocalMember (transport1 .address ());
253
- transport = new SenderAwareTransport (transport1 , localMember .address ());
256
+ boundTransport -> {
257
+ localMember = createLocalMember (boundTransport .address ());
258
+ transport = new SenderAwareTransport (boundTransport , localMember .address ());
254
259
255
260
cidGenerator = new CorrelationIdGenerator (localMember .id ());
256
261
scheduler = Schedulers .newSingle ("sc-cluster-" + localMember .address ().port (), true );
@@ -260,7 +265,7 @@ private Mono<Cluster> doStart0() {
260
265
new FailureDetectorImpl (
261
266
localMember ,
262
267
transport ,
263
- membershipEvents .onBackpressureBuffer (),
268
+ membershipSink . asFlux () .onBackpressureBuffer (),
264
269
config .failureDetectorConfig (),
265
270
scheduler ,
266
271
cidGenerator );
@@ -269,7 +274,7 @@ private Mono<Cluster> doStart0() {
269
274
new GossipProtocolImpl (
270
275
localMember ,
271
276
transport ,
272
- membershipEvents .onBackpressureBuffer (),
277
+ membershipSink . asFlux () .onBackpressureBuffer (),
273
278
config .gossipConfig (),
274
279
scheduler );
275
280
@@ -294,8 +299,11 @@ private Mono<Cluster> doStart0() {
294
299
membership
295
300
.listen ()
296
301
/*.publishOn(scheduler)*/
297
- // Dont uncomment, already beign executed inside sc-cluster thread
298
- .subscribe (membershipSink ::next , this ::onError , membershipSink ::complete ));
302
+ // Dont uncomment, already beign executed inside scalecube-cluster thread
303
+ .subscribe (
304
+ event -> membershipSink .emitNext (event , RetryEmitFailureHandler .INSTANCE ),
305
+ ex -> LOGGER .error ("[{}][membership][error] cause:" , localMember , ex ),
306
+ () -> membershipSink .emitComplete (RetryEmitFailureHandler .INSTANCE )));
299
307
300
308
return Mono .fromRunnable (() -> failureDetector .start ())
301
309
.then (Mono .fromRunnable (() -> gossip .start ()))
@@ -317,30 +325,45 @@ private void validateConfiguration() {
317
325
if (metadataCodec == null ) {
318
326
Object metadata = config .metadata ();
319
327
if (metadata != null && !(metadata instanceof Serializable )) {
320
- throw new IllegalArgumentException (
321
- "Invalid cluster configuration: metadata must be Serializable" );
328
+ throw new IllegalArgumentException ("Invalid cluster config: metadata must be Serializable" );
322
329
}
323
330
}
324
331
332
+ Objects .requireNonNull (
333
+ config .transportConfig ().transportFactory (),
334
+ "Invalid cluster config: transportFactory must be specified" );
335
+
325
336
Objects .requireNonNull (
326
337
config .transportConfig ().messageCodec (),
327
- "Invalid cluster configuration: transport. messageCodec must be specified" );
338
+ "Invalid cluster config: messageCodec must be specified" );
328
339
329
340
Objects .requireNonNull (
330
341
config .membershipConfig ().namespace (),
331
- "Invalid cluster configuration : membership. namespace must be specified" );
342
+ "Invalid cluster config : membership namespace must be specified" );
332
343
333
344
if (!NAMESPACE_PATTERN .matcher (config .membershipConfig ().namespace ()).matches ()) {
334
345
throw new IllegalArgumentException (
335
- "Invalid cluster config: membership. namespace format is invalid" );
346
+ "Invalid cluster config: membership namespace format is invalid" );
336
347
}
337
348
}
338
349
339
350
private void startHandler () {
340
351
ClusterMessageHandler handler = this .handler .apply (this );
341
- actionsDisposables .add (listenMessage ().subscribe (handler ::onMessage , this ::onError ));
342
- actionsDisposables .add (listenMembership ().subscribe (handler ::onMembershipEvent , this ::onError ));
343
- actionsDisposables .add (listenGossip ().subscribe (handler ::onGossip , this ::onError ));
352
+ actionsDisposables .add (
353
+ listenMessage ()
354
+ .subscribe (
355
+ handler ::onMessage ,
356
+ ex -> LOGGER .error ("[{}][onMessage][error] cause:" , localMember , ex )));
357
+ actionsDisposables .add (
358
+ listenMembership ()
359
+ .subscribe (
360
+ handler ::onMembershipEvent ,
361
+ ex -> LOGGER .error ("[{}][onMembershipEvent][error] cause:" , localMember , ex )));
362
+ actionsDisposables .add (
363
+ listenGossip ()
364
+ .subscribe (
365
+ handler ::onGossip ,
366
+ ex -> LOGGER .error ("[{}][onGossip][error] cause:" , localMember , ex )));
344
367
}
345
368
346
369
private void startJmxMonitor () {
@@ -357,10 +380,6 @@ private void startJmxMonitor() {
357
380
}
358
381
}
359
382
360
- private void onError (Throwable th ) {
361
- LOGGER .error ("[{}] Received unexpected error:" , localMember , th );
362
- }
363
-
364
383
private Flux <Message > listenMessage () {
365
384
// filter out system messages
366
385
return transport .listen ().filter (msg -> !SYSTEM_MESSAGES .contains (msg .qualifier ()));
@@ -373,7 +392,7 @@ private Flux<Message> listenGossip() {
373
392
374
393
private Flux <MembershipEvent > listenMembership () {
375
394
// listen on live stream
376
- return membershipEvents .onBackpressureBuffer ();
395
+ return membershipSink . asFlux () .onBackpressureBuffer ();
377
396
}
378
397
379
398
/**
@@ -481,7 +500,7 @@ public <T> Mono<Void> updateMetadata(T metadata) {
481
500
482
501
@ Override
483
502
public void shutdown () {
484
- shutdown .onComplete ( );
503
+ shutdown .emitEmpty ( RetryEmitFailureHandler . INSTANCE );
485
504
}
486
505
487
506
private Mono <Void > doShutdown () {
@@ -524,12 +543,12 @@ private Mono<Void> dispose() {
524
543
525
544
@ Override
526
545
public Mono <Void > onShutdown () {
527
- return onShutdown ;
546
+ return onShutdown . asMono () ;
528
547
}
529
548
530
549
@ Override
531
550
public boolean isShutdown () {
532
- return onShutdown .isDisposed ();
551
+ return onShutdown .asMono (). toFuture (). isDone ();
533
552
}
534
553
535
554
private static class SenderAwareTransport implements Transport {
@@ -581,4 +600,14 @@ private Message enhanceWithSender(Message message) {
581
600
return Message .with (message ).sender (address ).build ();
582
601
}
583
602
}
603
+
604
+ private static class RetryEmitFailureHandler implements EmitFailureHandler {
605
+
606
+ private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler ();
607
+
608
+ @ Override
609
+ public boolean onEmitFailure (SignalType signalType , EmitResult emitResult ) {
610
+ return emitResult == FAIL_NON_SERIALIZED ;
611
+ }
612
+ }
584
613
}
0 commit comments