Skip to content

Commit 32aeafb

Browse files
authored
Added RetryEmitFailureHandler (#358)
* Added RetryEmitFailureHandler
1 parent 13c89ac commit 32aeafb

File tree

7 files changed

+122
-20
lines changed

7 files changed

+122
-20
lines changed

cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.scalecube.cluster;
22

3+
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;
4+
35
import io.scalecube.cluster.fdetector.FailureDetectorConfig;
46
import io.scalecube.cluster.fdetector.FailureDetectorImpl;
57
import io.scalecube.cluster.gossip.GossipConfig;
@@ -45,7 +47,10 @@
4547
import reactor.core.Exceptions;
4648
import reactor.core.publisher.Flux;
4749
import reactor.core.publisher.Mono;
50+
import reactor.core.publisher.SignalType;
4851
import reactor.core.publisher.Sinks;
52+
import reactor.core.publisher.Sinks.EmitFailureHandler;
53+
import reactor.core.publisher.Sinks.EmitResult;
4954
import reactor.core.scheduler.Scheduler;
5055
import reactor.core.scheduler.Schedulers;
5156

@@ -283,7 +288,8 @@ private Mono<Cluster> doStart0() {
283288
failureDetector,
284289
gossip,
285290
metadataStore,
286-
config,
291+
config.membershipConfig(),
292+
config.failureDetectorConfig(),
287293
scheduler,
288294
cidGenerator,
289295
monitorModelBuilder);
@@ -293,8 +299,11 @@ private Mono<Cluster> doStart0() {
293299
membership
294300
.listen()
295301
/*.publishOn(scheduler)*/
296-
// Dont uncomment, already beign executed inside sc-cluster thread
297-
.subscribe(sink::tryEmitNext, this::onError, sink::tryEmitComplete));
302+
// Dont uncomment, already beign executed inside scalecube-cluster thread
303+
.subscribe(
304+
event -> sink.emitNext(event, RetryEmitFailureHandler.INSTANCE),
305+
th -> LOGGER.error("[{}][membership][error] cause:", localMember, th),
306+
() -> sink.emitComplete(RetryEmitFailureHandler.INSTANCE)));
298307

299308
return Mono.fromRunnable(() -> failureDetector.start())
300309
.then(Mono.fromRunnable(() -> gossip.start()))
@@ -580,4 +589,14 @@ private Message enhanceWithSender(Message message) {
580589
return Message.with(message).sender(address).build();
581590
}
582591
}
592+
593+
private static class RetryEmitFailureHandler implements EmitFailureHandler {
594+
595+
private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();
596+
597+
@Override
598+
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
599+
return emitResult == FAIL_NON_SERIALIZED;
600+
}
601+
}
583602
}

cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.scalecube.cluster.fdetector;
22

3+
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;
4+
35
import io.scalecube.cluster.CorrelationIdGenerator;
46
import io.scalecube.cluster.Member;
57
import io.scalecube.cluster.fdetector.PingData.AckType;
@@ -21,7 +23,10 @@
2123
import reactor.core.Disposable;
2224
import reactor.core.Disposables;
2325
import reactor.core.publisher.Flux;
26+
import reactor.core.publisher.SignalType;
2427
import reactor.core.publisher.Sinks;
28+
import reactor.core.publisher.Sinks.EmitFailureHandler;
29+
import reactor.core.publisher.Sinks.EmitResult;
2530
import reactor.core.scheduler.Scheduler;
2631

2732
public final class FailureDetectorImpl implements FailureDetector {
@@ -106,7 +111,7 @@ public void stop() {
106111
actionsDisposables.dispose();
107112

108113
// Stop publishing events
109-
sink.tryEmitComplete();
114+
sink.emitComplete(RetryEmitFailureHandler.INSTANCE);
110115
}
111116

112117
@Override
@@ -371,7 +376,7 @@ private List<Member> selectPingReqMembers(Member pingMember) {
371376

372377
private void publishPingResult(long period, Member member, MemberStatus status) {
373378
LOGGER.debug("[{}][{}] Member {} detected as {}", localMember, period, member, status);
374-
sink.tryEmitNext(new FailureDetectorEvent(member, status));
379+
sink.emitNext(new FailureDetectorEvent(member, status), RetryEmitFailureHandler.INSTANCE);
375380
}
376381

377382
private MemberStatus computeMemberStatus(Message message, long period) {
@@ -419,4 +424,14 @@ private boolean isTransitPingAck(Message message) {
419424
Transport getTransport() {
420425
return transport;
421426
}
427+
428+
private static class RetryEmitFailureHandler implements EmitFailureHandler {
429+
430+
private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();
431+
432+
@Override
433+
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
434+
return emitResult == FAIL_NON_SERIALIZED;
435+
}
436+
}
422437
}

cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.scalecube.cluster.gossip;
22

3+
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;
4+
35
import io.scalecube.cluster.ClusterMath;
46
import io.scalecube.cluster.Member;
57
import io.scalecube.cluster.membership.MembershipEvent;
@@ -24,7 +26,10 @@
2426
import reactor.core.publisher.Flux;
2527
import reactor.core.publisher.Mono;
2628
import reactor.core.publisher.MonoSink;
29+
import reactor.core.publisher.SignalType;
2730
import reactor.core.publisher.Sinks;
31+
import reactor.core.publisher.Sinks.EmitFailureHandler;
32+
import reactor.core.publisher.Sinks.EmitResult;
2833
import reactor.core.scheduler.Scheduler;
2934

3035
public final class GossipProtocolImpl implements GossipProtocol {
@@ -113,7 +118,7 @@ public void stop() {
113118
actionsDisposables.dispose();
114119

115120
// Stop publishing events
116-
sink.tryEmitComplete();
121+
sink.emitComplete(RetryEmitFailureHandler.INSTANCE);
117122
}
118123

119124
@Override
@@ -201,7 +206,7 @@ private void onGossipReq(Message message) {
201206
if (gossipState == null) { // new gossip
202207
gossipState = new GossipState(gossip, period);
203208
gossips.put(gossip.gossipId(), gossipState);
204-
sink.tryEmitNext(gossip.message());
209+
sink.emitNext(gossip.message(), RetryEmitFailureHandler.INSTANCE);
205210
}
206211
gossipState.addToInfected(gossipRequest.from());
207212
}
@@ -378,4 +383,14 @@ Transport getTransport() {
378383
Member getMember() {
379384
return localMember;
380385
}
386+
387+
private static class RetryEmitFailureHandler implements EmitFailureHandler {
388+
389+
private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();
390+
391+
@Override
392+
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
393+
return emitResult == FAIL_NON_SERIALIZED;
394+
}
395+
}
381396
}

cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
import static io.scalecube.cluster.membership.MemberStatus.ALIVE;
44
import static io.scalecube.cluster.membership.MemberStatus.DEAD;
55
import static io.scalecube.cluster.membership.MemberStatus.LEAVING;
6+
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;
67

7-
import io.scalecube.cluster.ClusterConfig;
88
import io.scalecube.cluster.ClusterMath;
99
import io.scalecube.cluster.CorrelationIdGenerator;
1010
import io.scalecube.cluster.Member;
@@ -47,7 +47,10 @@
4747
import reactor.core.publisher.Flux;
4848
import reactor.core.publisher.Mono;
4949
import reactor.core.publisher.MonoSink;
50+
import reactor.core.publisher.SignalType;
5051
import reactor.core.publisher.Sinks;
52+
import reactor.core.publisher.Sinks.EmitFailureHandler;
53+
import reactor.core.publisher.Sinks.EmitResult;
5154
import reactor.core.scheduler.Scheduler;
5255

5356
public final class MembershipProtocolImpl implements MembershipProtocol {
@@ -108,7 +111,8 @@ private enum MembershipUpdateReason {
108111
* @param failureDetector failure detector
109112
* @param gossipProtocol gossip protocol
110113
* @param metadataStore metadata store
111-
* @param config cluster config parameters
114+
* @param membershipConfig membershipConfig
115+
* @param failureDetectorConfig failureDetectorConfig
112116
* @param scheduler scheduler
113117
* @param cidGenerator correlation id generator
114118
* @param monitorModelBuilder monitor model builder
@@ -119,7 +123,8 @@ public MembershipProtocolImpl(
119123
FailureDetector failureDetector,
120124
GossipProtocol gossipProtocol,
121125
MetadataStore metadataStore,
122-
ClusterConfig config,
126+
MembershipConfig membershipConfig,
127+
FailureDetectorConfig failureDetectorConfig,
123128
Scheduler scheduler,
124129
CorrelationIdGenerator cidGenerator,
125130
ClusterMonitorModel.Builder monitorModelBuilder) {
@@ -132,8 +137,8 @@ public MembershipProtocolImpl(
132137
this.scheduler = Objects.requireNonNull(scheduler);
133138
this.cidGenerator = Objects.requireNonNull(cidGenerator);
134139
this.monitorModelBuilder = Objects.requireNonNull(monitorModelBuilder);
135-
this.membershipConfig = Objects.requireNonNull(config).membershipConfig();
136-
this.failureDetectorConfig = Objects.requireNonNull(config).failureDetectorConfig();
140+
this.membershipConfig = Objects.requireNonNull(membershipConfig);
141+
this.failureDetectorConfig = Objects.requireNonNull(failureDetectorConfig);
137142

138143
// Prepare seeds
139144
seedMembers = cleanUpSeedMembers(membershipConfig.seedMembers());
@@ -302,7 +307,7 @@ public void stop() {
302307
suspicionTimeoutTasks.clear();
303308

304309
// Stop publishing events
305-
sink.tryEmitComplete();
310+
sink.emitComplete(RetryEmitFailureHandler.INSTANCE);
306311
}
307312

308313
@Override
@@ -730,7 +735,7 @@ private Mono<Void> onLeavingDetected(MembershipRecord r0, MembershipRecord r1) {
730735

731736
private void publishEvent(MembershipEvent event) {
732737
LOGGER.info("[{}][publishEvent] {}", localMember, event);
733-
sink.tryEmitNext(event);
738+
sink.emitNext(event, RetryEmitFailureHandler.INSTANCE);
734739
}
735740

736741
private Mono<Void> onDeadMemberDetected(MembershipRecord r1) {
@@ -937,4 +942,14 @@ private void onMemberRemoved(MembershipEvent event) {
937942
removedMembersHistory.remove(0);
938943
}
939944
}
945+
946+
private static class RetryEmitFailureHandler implements EmitFailureHandler {
947+
948+
private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();
949+
950+
@Override
951+
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
952+
return emitResult == FAIL_NON_SERIALIZED;
953+
}
954+
}
940955
}

cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static org.junit.jupiter.api.Assertions.assertEquals;
44
import static org.junit.jupiter.api.Assertions.assertTrue;
5+
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;
56

67
import io.scalecube.cluster.BaseTest;
78
import io.scalecube.cluster.ClusterConfig;
@@ -34,7 +35,10 @@
3435
import reactor.core.Exceptions;
3536
import reactor.core.publisher.Flux;
3637
import reactor.core.publisher.Mono;
38+
import reactor.core.publisher.SignalType;
3739
import reactor.core.publisher.Sinks;
40+
import reactor.core.publisher.Sinks.EmitFailureHandler;
41+
import reactor.core.publisher.Sinks.EmitResult;
3842
import reactor.core.scheduler.Scheduler;
3943
import reactor.core.scheduler.Schedulers;
4044
import reactor.util.retry.Retry;
@@ -1158,7 +1162,8 @@ private MembershipProtocolImpl createMembership(Transport transport, ClusterConf
11581162
failureDetector,
11591163
gossipProtocol,
11601164
metadataStore,
1161-
config,
1165+
config.membershipConfig(),
1166+
config.failureDetectorConfig(),
11621167
scheduler,
11631168
cidGenerator,
11641169
new ClusterMonitorModel.Builder());
@@ -1285,7 +1290,20 @@ private Sinks.Many<MembershipEvent> startRecordingRemoved(MembershipProtocolImpl
12851290
membership
12861291
.listen()
12871292
.filter(MembershipEvent::isRemoved)
1288-
.subscribe(sink::tryEmitNext, sink::tryEmitError, sink::tryEmitComplete);
1293+
.subscribe(
1294+
event -> sink.emitNext(event, RetryEmitFailureHandler.INSTANCE),
1295+
error -> sink.emitError(error, RetryEmitFailureHandler.INSTANCE),
1296+
() -> sink.emitComplete(RetryEmitFailureHandler.INSTANCE));
12891297
return sink;
12901298
}
1299+
1300+
private static class RetryEmitFailureHandler implements EmitFailureHandler {
1301+
1302+
private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();
1303+
1304+
@Override
1305+
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
1306+
return emitResult == FAIL_NON_SERIALIZED;
1307+
}
1308+
}
12911309
}

pom.xml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
35
<modelVersion>4.0.0</modelVersion>
46

57
<parent>

transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.scalecube.transport.netty;
22

3+
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;
4+
35
import io.netty.buffer.ByteBuf;
46
import io.netty.buffer.ByteBufAllocator;
57
import io.netty.buffer.ByteBufInputStream;
@@ -28,7 +30,10 @@
2830
import reactor.core.Exceptions;
2931
import reactor.core.publisher.Flux;
3032
import reactor.core.publisher.Mono;
33+
import reactor.core.publisher.SignalType;
3134
import reactor.core.publisher.Sinks;
35+
import reactor.core.publisher.Sinks.EmitFailureHandler;
36+
import reactor.core.publisher.Sinks.EmitResult;
3237
import reactor.netty.Connection;
3338
import reactor.netty.DisposableServer;
3439
import reactor.netty.resources.LoopResources;
@@ -39,7 +44,7 @@ public final class TransportImpl implements Transport {
3944

4045
private final MessageCodec messageCodec;
4146

42-
// Sink
47+
// Subject
4348
private final Sinks.Many<Message> sink = Sinks.many().multicast().directBestEffort();
4449

4550
// Close handler
@@ -156,7 +161,10 @@ public Mono<Transport> start() {
156161
context ->
157162
context.put(
158163
ReceiverContext.class,
159-
new ReceiverContext(loopResources, this::toMessage, sink::tryEmitNext)));
164+
new ReceiverContext(
165+
loopResources,
166+
this::toMessage,
167+
message -> sink.emitNext(message, RetryEmitFailureHandler.INSTANCE))));
160168
}
161169

162170
@Override
@@ -183,7 +191,7 @@ private Mono<Void> doStop() {
183191
() -> {
184192
LOGGER.info("[{}][doStop] Stopping", address);
185193
// Complete incoming messages observable
186-
sink.tryEmitComplete();
194+
sink.emitComplete(RetryEmitFailureHandler.INSTANCE);
187195
return Flux.concatDelayError(closeServer(), shutdownLoopResources())
188196
.then()
189197
.doFinally(s -> connections.clear())
@@ -343,4 +351,14 @@ public Function<Message, ByteBuf> messageEncoder() {
343351
return messageEncoder;
344352
}
345353
}
354+
355+
private static class RetryEmitFailureHandler implements EmitFailureHandler {
356+
357+
private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();
358+
359+
@Override
360+
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
361+
return emitResult == FAIL_NON_SERIALIZED;
362+
}
363+
}
346364
}

0 commit comments

Comments
 (0)