Skip to content

Commit b757da6

Browse files
author
rostyslav.baldovskyi
committed
Make compile (WIP)
1 parent 884c42e commit b757da6

File tree

26 files changed

+391
-200
lines changed

26 files changed

+391
-200
lines changed

cluster-api/src/main/java/io/scalecube/cluster/Cluster.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.scalecube.cluster.transport.api.Message;
44
import io.scalecube.net.Address;
55
import java.util.Collection;
6+
import java.util.List;
67
import java.util.Optional;
78
import reactor.core.publisher.Mono;
89

@@ -14,7 +15,7 @@ public interface Cluster {
1415
*
1516
* @return cluster address
1617
*/
17-
Address address();
18+
List<Address> addresses();
1819

1920
/**
2021
* Send a msg from this member (src) to target member (specified in parameters).
@@ -34,6 +35,15 @@ public interface Cluster {
3435
*/
3536
Mono<Void> send(Address address, Message message);
3637

38+
/**
39+
* Send a msg from this member (src) to target member (specified in parameters).
40+
*
41+
* @param addresses target addresses
42+
* @param message msg
43+
* @return promise telling success or failure
44+
*/
45+
Mono<Void> send(List<Address> addresses, Message message);
46+
3747
/**
3848
* Sends message to the given address. It will issue connect in case if no transport channel by
3949
* given transport {@code address} exists already. Send is an async operation and expecting a

cluster-api/src/main/java/io/scalecube/cluster/Member.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.io.ObjectInput;
88
import java.io.ObjectOutput;
99
import java.util.ArrayList;
10+
import java.util.Collections;
1011
import java.util.List;
1112
import java.util.Objects;
1213
import java.util.StringJoiner;
@@ -42,6 +43,18 @@ public Member(String id, String alias, List<Address> addresses, String namespace
4243
this.namespace = Objects.requireNonNull(namespace, "namespace");
4344
}
4445

46+
/**
47+
* Constructor.
48+
*
49+
* @param id member id
50+
* @param alias member alias (optional)
51+
* @param address member address
52+
* @param namespace namespace
53+
*/
54+
public Member(String id, String alias, Address address, String namespace) {
55+
this(id, alias, Collections.singletonList(address), namespace);
56+
}
57+
4558
/**
4659
* Returns cluster member local id.
4760
*

cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.time.Duration;
66
import java.util.Arrays;
77
import java.util.Collection;
8+
import java.util.List;
89
import java.util.Map;
910
import java.util.StringJoiner;
1011
import java.util.concurrent.ConcurrentHashMap;
@@ -213,15 +214,40 @@ public InboundSettings inboundSettings(Address destination) {
213214
return inboundSettings.getOrDefault(destination, defaultInboundSettings);
214215
}
215216

217+
/**
218+
* Returns network inbound settings applied to the given destination.
219+
*
220+
* @param destinations addresses of target endpoint
221+
* @return network inbound settings
222+
*/
223+
public InboundSettings inboundSettings(List<Address> destinations) {
224+
if (destinations.isEmpty()) {
225+
return defaultInboundSettings;
226+
}
227+
228+
for (Address destination : destinations) {
229+
InboundSettings inboundSettings = this.inboundSettings.get(destination);
230+
231+
if (inboundSettings != null) {
232+
return inboundSettings;
233+
}
234+
}
235+
236+
return defaultInboundSettings;
237+
}
238+
216239
/**
217240
* Setter for network emulator inbound settings for specific destination.
218241
*
219242
* @param shallPass shallPass inbound flag
220243
*/
221-
public void inboundSettings(Address destination, boolean shallPass) {
244+
public void inboundSettings(List<Address> destinations, boolean shallPass) {
222245
InboundSettings settings = new InboundSettings(shallPass);
223-
inboundSettings.put(destination, settings);
224-
LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);
246+
247+
destinations.forEach(destination -> {
248+
inboundSettings.put(destination, settings);
249+
LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);
250+
});
225251
}
226252

227253
/**

cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.scalecube.cluster.transport.api.Message;
44
import io.scalecube.cluster.transport.api.Transport;
55
import io.scalecube.net.Address;
6+
import java.util.Collections;
67
import reactor.core.publisher.Flux;
78
import reactor.core.publisher.Mono;
89

@@ -83,6 +84,6 @@ public Flux<Message> listen() {
8384
}
8485

8586
private Message enhanceWithSender(Message message) {
86-
return Message.with(message).sender(transport.address()).build();
87+
return Message.with(message).sender(Collections.singletonList(transport.address())).build();
8788
}
8889
}

cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.scalecube.cluster.transport.api.Transport;
1717
import io.scalecube.cluster.transport.api.TransportConfig;
1818
import io.scalecube.cluster.transport.api.TransportFactory;
19+
import io.scalecube.cluster.transport.api.TransportWrapper;
1920
import io.scalecube.net.Address;
2021
import io.scalecube.utils.ServiceLoaderUtil;
2122
import java.io.Serializable;
@@ -243,7 +244,8 @@ private Mono<Cluster> doStart0() {
243244
.flatMap(
244245
boundTransport -> {
245246
localMember = createLocalMember(boundTransport.address());
246-
transport = new SenderAwareTransport(boundTransport, localMember.address());
247+
248+
transport = new SenderAwareTransport(boundTransport, localMember.addresses());
247249

248250
final String name =
249251
"sc-cluster-" + Integer.toHexString(System.identityHashCode(this));
@@ -379,7 +381,7 @@ private Member createLocalMember(Address address) {
379381
// First address comes as "fair" listen address
380382
memberAddresses.add(address);
381383

382-
// Tail goes as externalHosts, if the exist
384+
// Tail goes as externalHosts, if exists
383385
final List<String> externalHosts = config.externalHosts();
384386
if (externalHosts != null) {
385387
for (String externalHost : externalHosts) {
@@ -396,28 +398,33 @@ private Member createLocalMember(Address address) {
396398
}
397399

398400
@Override
399-
public Address address() {
400-
return member().address();
401+
public List<Address> addresses() {
402+
return member().addresses();
401403
}
402404

403405
@Override
404406
public Mono<Void> send(Member member, Message message) {
405-
return send(member.address(), message);
407+
return TransportWrapper.send(transport, member.addresses(), message);
406408
}
407409

408410
@Override
409411
public Mono<Void> send(Address address, Message message) {
410412
return transport.send(address, message);
411413
}
412414

415+
@Override
416+
public Mono<Void> send(List<Address> addresses, Message message) {
417+
return TransportWrapper.send(transport, addresses, message);
418+
}
419+
413420
@Override
414421
public Mono<Message> requestResponse(Address address, Message request) {
415422
return transport.requestResponse(address, request);
416423
}
417424

418425
@Override
419426
public Mono<Message> requestResponse(Member member, Message request) {
420-
return transport.requestResponse(member.address(), request);
427+
return TransportWrapper.requestResponse(transport, member.addresses(), request);
421428
}
422429

423430
@Override
@@ -526,11 +533,11 @@ public Mono<Void> onShutdown() {
526533
private static class SenderAwareTransport implements Transport {
527534

528535
private final Transport transport;
529-
private final Address address;
536+
private final List<Address> addresses;
530537

531-
private SenderAwareTransport(Transport transport, Address address) {
538+
private SenderAwareTransport(Transport transport, List<Address> addresses) {
532539
this.transport = Objects.requireNonNull(transport);
533-
this.address = Objects.requireNonNull(address);
540+
this.addresses = Objects.requireNonNull(addresses);
534541
}
535542

536543
@Override
@@ -569,7 +576,7 @@ public Flux<Message> listen() {
569576
}
570577

571578
private Message enhanceWithSender(Message message) {
572-
return Message.with(message).sender(address).build();
579+
return Message.with(message).sender(addresses).build();
573580
}
574581
}
575582
}

cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.scalecube.cluster.membership.MembershipEvent;
99
import io.scalecube.cluster.transport.api.Message;
1010
import io.scalecube.cluster.transport.api.Transport;
11+
import io.scalecube.cluster.transport.api.TransportWrapper;
1112
import io.scalecube.net.Address;
1213
import java.time.Duration;
1314
import java.util.ArrayList;
@@ -42,6 +43,8 @@ public final class FailureDetectorImpl implements FailureDetector {
4243
private final Transport transport;
4344
private final FailureDetectorConfig config;
4445

46+
private final TransportWrapper transportWrapper;
47+
4548
// State
4649

4750
private final List<Member> pingMembers = new ArrayList<>();
@@ -81,6 +84,8 @@ public FailureDetectorImpl(
8184
this.config = Objects.requireNonNull(config);
8285
this.scheduler = Objects.requireNonNull(scheduler);
8386

87+
this.transportWrapper = new TransportWrapper(this.transport);
88+
8489
// Subscribe
8590
actionsDisposables.addAll(
8691
Arrays.asList(
@@ -145,9 +150,9 @@ private void doPing() {
145150
Message pingMsg = Message.withData(pingData).qualifier(PING).correlationId(cid).build();
146151

147152
LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember);
148-
Address address = pingMember.address();
149-
transport
150-
.requestResponse(address, pingMsg)
153+
List<Address> addresses = pingMember.addresses();
154+
transportWrapper
155+
.requestResponse(addresses, pingMsg)
151156
.timeout(Duration.ofMillis(config.pingTimeout()), scheduler)
152157
.publishOn(scheduler)
153158
.subscribe(
@@ -189,8 +194,8 @@ private void doPingReq(
189194
Duration timeout = Duration.ofMillis(config.pingInterval() - config.pingTimeout());
190195
pingReqMembers.forEach(
191196
member ->
192-
transport
193-
.requestResponse(member.address(), pingReqMsg)
197+
transportWrapper
198+
.requestResponse(member.addresses(), pingReqMsg)
194199
.timeout(timeout, scheduler)
195200
.publishOn(scheduler)
196201
.subscribe(
@@ -232,7 +237,7 @@ private void onMessage(Message message) {
232237
/** Listens to PING message and answers with ACK. */
233238
private void onPing(Message message) {
234239
long period = this.currentPeriod;
235-
Address sender = message.sender();
240+
List<Address> sender = message.sender();
236241
LOGGER.debug("[{}][{}] Received Ping from {}", localMember, period, sender);
237242
PingData data = message.data();
238243
data = data.withAckType(AckType.DEST_OK);
@@ -249,18 +254,18 @@ private void onPing(Message message) {
249254
String correlationId = message.correlationId();
250255
Message ackMessage =
251256
Message.withData(data).qualifier(PING_ACK).correlationId(correlationId).build();
252-
Address address = data.getFrom().address();
253-
LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, address);
254-
transport
255-
.send(address, ackMessage)
257+
List<Address> addresses = data.getFrom().addresses();
258+
LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, addresses);
259+
transportWrapper
260+
.send(addresses, ackMessage)
256261
.subscribe(
257262
null,
258263
ex ->
259264
LOGGER.debug(
260265
"[{}][{}] Failed to send PingAck to {}, cause: {}",
261266
localMember,
262267
period,
263-
address,
268+
addresses,
264269
ex.toString()));
265270
}
266271

@@ -275,18 +280,18 @@ private void onPingReq(Message message) {
275280
PingData pingReqData = new PingData(localMember, target, originalIssuer);
276281
Message pingMessage =
277282
Message.withData(pingReqData).qualifier(PING).correlationId(correlationId).build();
278-
Address address = target.address();
279-
LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, address);
280-
transport
281-
.send(address, pingMessage)
283+
List<Address> addresses = target.addresses();
284+
LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, addresses);
285+
transportWrapper
286+
.send(addresses, pingMessage)
282287
.subscribe(
283288
null,
284289
ex ->
285290
LOGGER.debug(
286291
"[{}][{}] Failed to send transit Ping to {}, cause: {}",
287292
localMember,
288293
period,
289-
address,
294+
addresses,
290295
ex.toString()));
291296
}
292297

@@ -305,18 +310,18 @@ private void onTransitPingAck(Message message) {
305310
PingData originalAckData = new PingData(target, data.getTo()).withAckType(ackType);
306311
Message originalAckMessage =
307312
Message.withData(originalAckData).qualifier(PING_ACK).correlationId(correlationId).build();
308-
Address address = target.address();
309-
LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, address);
310-
transport
311-
.send(address, originalAckMessage)
313+
List<Address> addresses = target.addresses();
314+
LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, addresses);
315+
transportWrapper
316+
.send(addresses, originalAckMessage)
312317
.subscribe(
313318
null,
314319
ex ->
315320
LOGGER.debug(
316321
"[{}][{}] Failed to resend transit PingAck to {}, cause: {}",
317322
localMember,
318323
period,
319-
address,
324+
addresses,
320325
ex.toString()));
321326
}
322327

cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.scalecube.cluster.membership.MembershipEvent;
88
import io.scalecube.cluster.transport.api.Message;
99
import io.scalecube.cluster.transport.api.Transport;
10+
import io.scalecube.cluster.transport.api.TransportWrapper;
1011
import io.scalecube.net.Address;
1112
import java.util.ArrayList;
1213
import java.util.Arrays;
@@ -287,14 +288,13 @@ private void spreadGossipsTo(long period, Member member) {
287288
}
288289

289290
// Send gossip request
290-
Address address = member.address();
291+
List<Address> addresses = member.addresses();
291292

292293
gossips.stream()
293294
.map(this::buildGossipRequestMessage)
294295
.forEach(
295296
message ->
296-
transport
297-
.send(address, message)
297+
TransportWrapper.send(transport, addresses, message)
298298
.subscribe(
299299
null,
300300
ex ->
@@ -303,7 +303,7 @@ private void spreadGossipsTo(long period, Member member) {
303303
localMember,
304304
period,
305305
message,
306-
address,
306+
addresses,
307307
ex.toString())));
308308
}
309309

0 commit comments

Comments
 (0)