Skip to content

Commit 2a94fc5

Browse files
author
rostyslav.baldovskyi
committed
Fix comments, Improve error handler for updateMembership
1 parent b757da6 commit 2a94fc5

File tree

5 files changed

+13
-52
lines changed

5 files changed

+13
-52
lines changed

cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ public final class FailureDetectorImpl implements FailureDetector {
4343
private final Transport transport;
4444
private final FailureDetectorConfig config;
4545

46-
private final TransportWrapper transportWrapper;
47-
4846
// State
4947

5048
private final List<Member> pingMembers = new ArrayList<>();
@@ -84,8 +82,6 @@ public FailureDetectorImpl(
8482
this.config = Objects.requireNonNull(config);
8583
this.scheduler = Objects.requireNonNull(scheduler);
8684

87-
this.transportWrapper = new TransportWrapper(this.transport);
88-
8985
// Subscribe
9086
actionsDisposables.addAll(
9187
Arrays.asList(
@@ -151,8 +147,7 @@ private void doPing() {
151147

152148
LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember);
153149
List<Address> addresses = pingMember.addresses();
154-
transportWrapper
155-
.requestResponse(addresses, pingMsg)
150+
TransportWrapper.requestResponse(transport, addresses, pingMsg)
156151
.timeout(Duration.ofMillis(config.pingTimeout()), scheduler)
157152
.publishOn(scheduler)
158153
.subscribe(
@@ -194,8 +189,7 @@ private void doPingReq(
194189
Duration timeout = Duration.ofMillis(config.pingInterval() - config.pingTimeout());
195190
pingReqMembers.forEach(
196191
member ->
197-
transportWrapper
198-
.requestResponse(member.addresses(), pingReqMsg)
192+
TransportWrapper.requestResponse(transport, member.addresses(), pingReqMsg)
199193
.timeout(timeout, scheduler)
200194
.publishOn(scheduler)
201195
.subscribe(
@@ -256,8 +250,7 @@ private void onPing(Message message) {
256250
Message.withData(data).qualifier(PING_ACK).correlationId(correlationId).build();
257251
List<Address> addresses = data.getFrom().addresses();
258252
LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, addresses);
259-
transportWrapper
260-
.send(addresses, ackMessage)
253+
TransportWrapper.send(transport, addresses, ackMessage)
261254
.subscribe(
262255
null,
263256
ex ->
@@ -282,8 +275,7 @@ private void onPingReq(Message message) {
282275
Message.withData(pingReqData).qualifier(PING).correlationId(correlationId).build();
283276
List<Address> addresses = target.addresses();
284277
LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, addresses);
285-
transportWrapper
286-
.send(addresses, pingMessage)
278+
TransportWrapper.send(transport, addresses, pingMessage)
287279
.subscribe(
288280
null,
289281
ex ->
@@ -312,8 +304,7 @@ private void onTransitPingAck(Message message) {
312304
Message.withData(originalAckData).qualifier(PING_ACK).correlationId(correlationId).build();
313305
List<Address> addresses = target.addresses();
314306
LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, addresses);
315-
transportWrapper
316-
.send(addresses, originalAckMessage)
307+
TransportWrapper.send(transport, addresses, originalAckMessage)
317308
.subscribe(
318309
null,
319310
ex ->

cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,6 @@ private enum MembershipUpdateReason {
8080
private final GossipProtocol gossipProtocol;
8181
private final MetadataStore metadataStore;
8282

83-
private final TransportWrapper transportWrapper;
84-
8583
// State
8684

8785
private final Map<String, MembershipRecord> membershipTable = new HashMap<>();
@@ -129,8 +127,6 @@ public MembershipProtocolImpl(
129127
this.membershipConfig = Objects.requireNonNull(config).membershipConfig();
130128
this.failureDetectorConfig = Objects.requireNonNull(config).failureDetectorConfig();
131129

132-
this.transportWrapper = new TransportWrapper(this.transport);
133-
134130
// Prepare seeds
135131
seedMembers = cleanUpSeedMembers(membershipConfig.seedMembers());
136132

@@ -355,8 +351,7 @@ private void doSync() {
355351

356352
Message message = prepareSyncDataMsg(SYNC, null);
357353
LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, addresses);
358-
transportWrapper
359-
.send(addresses, message)
354+
TransportWrapper.send(transport, addresses, message)
360355
.subscribe(
361356
null,
362357
ex ->
@@ -413,8 +408,7 @@ private Mono<Void> onSync(Message syncMsg) {
413408
.doOnSuccess(
414409
avoid -> {
415410
Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId());
416-
transportWrapper
417-
.send(sender, message)
411+
TransportWrapper.send(transport, sender, message)
418412
.subscribe(
419413
null,
420414
ex ->
@@ -443,8 +437,7 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) {
443437
// alive with inc + 1
444438
Message syncMsg = prepareSyncDataMsg(SYNC, null);
445439
List<Address> addresses = fdEvent.member().addresses();
446-
transportWrapper
447-
.send(addresses, syncMsg)
440+
TransportWrapper.send(transport, addresses, syncMsg)
448441
.subscribe(
449442
null,
450443
ex ->
@@ -532,11 +525,11 @@ private Mono<Void> syncMembership(SyncData syncData, boolean onStart) {
532525
updateMembership(r1, reason)
533526
.doOnError(
534527
ex ->
535-
LOGGER.warn(
528+
LOGGER.error(
536529
"[{}][syncMembership][{}][error] cause: {}",
537530
localMember,
538531
reason,
539-
ex.toString()))
532+
ex))
540533
.onErrorResume(ex -> Mono.empty()))
541534
.toArray(Mono[]::new);
542535

cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ public class MetadataStoreImpl implements MetadataStore {
3939
private final Transport transport;
4040
private final ClusterConfig config;
4141

42-
private final TransportWrapper transportWrapper;
43-
4442
// State
4543

4644
private final Map<Member, ByteBuffer> membersMetadata = new HashMap<>();
@@ -73,8 +71,6 @@ public MetadataStoreImpl(
7371
this.config = Objects.requireNonNull(config);
7472
this.scheduler = Objects.requireNonNull(scheduler);
7573
this.localMetadata = localMetadata; // optional
76-
77-
this.transportWrapper = new TransportWrapper(this.transport);
7874
}
7975

8076
@Override
@@ -164,12 +160,9 @@ public Mono<ByteBuffer> fetchMetadata(Member member) {
164160
.data(new GetMetadataRequest(member))
165161
.build();
166162

167-
// TODO. Make transport abstraction around this logic
168-
169163
List<Address> addresses = member.addresses();
170164

171-
return transportWrapper
172-
.requestResponse(addresses, request)
165+
return TransportWrapper.requestResponse(transport, addresses, request)
173166
.timeout(Duration.ofMillis(config.metadataTimeout()), scheduler)
174167
.publishOn(scheduler)
175168
.doOnSuccess(
@@ -230,8 +223,7 @@ private void onMetadataRequest(Message message) {
230223
.build();
231224

232225
LOGGER.debug("[{}] Send GetMetadataResp to {}", localMember, sender);
233-
transportWrapper
234-
.send(sender, response)
226+
TransportWrapper.send(transport, sender, response)
235227
.subscribe(
236228
null,
237229
ex ->

transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,6 @@ public List<Address> sender() {
201201
}
202202

203203
return Arrays.stream(headerValue.split(","))
204-
.map(String::trim) // Removes leading and trailing spaces.
205204
.map(Address::from)
206205
.collect(Collectors.toList());
207206
}

transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportWrapper.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,6 @@
66

77
public class TransportWrapper {
88

9-
private final Transport transport;
10-
11-
public TransportWrapper(Transport transport) {
12-
this.transport = transport;
13-
}
14-
15-
public Mono<Message> requestResponse(List<Address> addresses, Message request) {
16-
return requestResponse(transport, addresses, 0, request);
17-
}
18-
199
public static Mono<Message> requestResponse(
2010
Transport transport, List<Address> addresses, Message request) {
2111
return requestResponse(transport, addresses, 0, request);
@@ -24,18 +14,14 @@ public static Mono<Message> requestResponse(
2414
private static Mono<Message> requestResponse(
2515
Transport transport, List<Address> addresses, int currentIndex, Message request) {
2616
if (currentIndex >= addresses.size()) {
27-
return Mono.error(new RuntimeException("All addresses have been tried and failed."));
17+
return Mono.error(new RuntimeException("All addresses have been tried and failed"));
2818
}
2919

3020
return transport
3121
.requestResponse(addresses.get(currentIndex), request)
3222
.onErrorResume(th -> requestResponse(transport, addresses, currentIndex + 1, request));
3323
}
3424

35-
public Mono<Void> send(List<Address> addresses, Message request) {
36-
return send(transport, addresses, 0, request);
37-
}
38-
3925
public static Mono<Void> send(Transport transport, List<Address> addresses, Message request) {
4026
return send(transport, addresses, 0, request);
4127
}

0 commit comments

Comments
 (0)