Skip to content

Commit 43c0ea7

Browse files
committed
Done with TransportWrapperTest.java
1 parent 0f16819 commit 43c0ea7

File tree

2 files changed

+36
-47
lines changed

2 files changed

+36
-47
lines changed

cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java

Lines changed: 29 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public class TransportWrapper {
1313

1414
private final Transport transport;
1515

16-
private final Map<Member, Integer> addressIndexByMember = new ConcurrentHashMap<>();
16+
private final Map<Member, AtomicInteger> addressIndexByMember = new ConcurrentHashMap<>();
1717

1818
public TransportWrapper(Transport transport) {
1919
this.transport = transport;
@@ -27,27 +27,21 @@ public TransportWrapper(Transport transport) {
2727
* @return mono result
2828
*/
2929
public Mono<Message> requestResponse(Member member, Message request) {
30+
final List<Address> addresses = member.addresses();
31+
final AtomicInteger currentIndex =
32+
addressIndexByMember.computeIfAbsent(member, m -> new AtomicInteger());
3033
return Mono.defer(
31-
() -> {
32-
final List<Address> addresses = member.addresses();
33-
final int numRetries = addresses.size() - 1;
34-
final Integer index = addressIndexByMember.getOrDefault(member, 0);
35-
final AtomicInteger currentIndex = new AtomicInteger(index);
36-
37-
return Mono.defer(
38-
() -> {
39-
int increment = currentIndex.getAndIncrement();
40-
41-
if (increment == addresses.size()) {
42-
increment = 0;
43-
currentIndex.set(1);
44-
}
45-
46-
final Address address = addresses.get(increment);
47-
return transport.requestResponse(address, request);
48-
})
49-
.retry(numRetries);
50-
});
34+
() -> {
35+
synchronized (this) {
36+
if (currentIndex.get() == addresses.size()) {
37+
currentIndex.set(0);
38+
}
39+
final Address address = addresses.get(currentIndex.getAndIncrement());
40+
return transport.requestResponse(address, request);
41+
}
42+
})
43+
.retry(addresses.size() - 1)
44+
.doOnError(throwable -> addressIndexByMember.remove(member, currentIndex));
5145
}
5246

5347
/**
@@ -58,26 +52,20 @@ public Mono<Message> requestResponse(Member member, Message request) {
5852
* @return mono result
5953
*/
6054
public Mono<Void> send(Member member, Message request) {
55+
final List<Address> addresses = member.addresses();
56+
final AtomicInteger currentIndex =
57+
addressIndexByMember.computeIfAbsent(member, m -> new AtomicInteger());
6158
return Mono.defer(
62-
() -> {
63-
final List<Address> addresses = member.addresses();
64-
final int numRetries = addresses.size() - 1;
65-
final Integer index = addressIndexByMember.getOrDefault(member, 0);
66-
final AtomicInteger currentIndex = new AtomicInteger(index);
67-
68-
return Mono.defer(
69-
() -> {
70-
int increment = currentIndex.getAndIncrement();
71-
72-
if (increment == addresses.size()) {
73-
increment = 0;
74-
currentIndex.set(1);
75-
}
76-
77-
final Address address = addresses.get(increment);
78-
return transport.send(address, request);
79-
})
80-
.retry(numRetries);
81-
});
59+
() -> {
60+
synchronized (this) {
61+
if (currentIndex.get() == addresses.size()) {
62+
currentIndex.set(0);
63+
}
64+
final Address address = addresses.get(currentIndex.getAndIncrement());
65+
return transport.send(address, request);
66+
}
67+
})
68+
.retry(addresses.size() - 1)
69+
.doOnError(throwable -> addressIndexByMember.remove(member, currentIndex));
8270
}
8371
}

cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.Collections;
1212
import java.util.List;
1313
import java.util.Map;
14+
import java.util.concurrent.atomic.AtomicInteger;
1415
import java.util.stream.Stream;
1516
import java.util.stream.Stream.Builder;
1617
import org.junit.jupiter.api.Assertions;
@@ -75,12 +76,12 @@ static void populateBuilder(Builder<Arguments> builder, int size) {
7576
}
7677
}
7778

78-
private Map<Member, Integer> addressIndexByMember()
79+
private Map<Member, AtomicInteger> addressIndexByMember()
7980
throws NoSuchFieldException, IllegalAccessException {
8081
final Field field = TransportWrapper.class.getDeclaredField("addressIndexByMember");
8182
field.setAccessible(true);
8283
//noinspection unchecked
83-
return (Map<Member, Integer>) field.get(transportWrapper);
84+
return (Map<Member, AtomicInteger>) field.get(transportWrapper);
8485
}
8586

8687
@ParameterizedTest
@@ -94,7 +95,7 @@ void requestResponseShouldWorkByRoundRobin(int size, int startIndex, int success
9495
}
9596

9697
if (startIndex > 0) {
97-
addressIndexByMember().put(member, startIndex);
98+
addressIndexByMember().put(member, new AtomicInteger(startIndex));
9899
}
99100

100101
for (int i = 0; i < size; i++) {
@@ -162,7 +163,7 @@ void requestResponseShouldFailByRoundRobin(int size, int startIndex, int ignore)
162163
}
163164

164165
if (startIndex > 0) {
165-
addressIndexByMember().put(member, startIndex);
166+
addressIndexByMember().put(member, new AtomicInteger(startIndex));
166167
}
167168

168169
for (int i = 0; i < size; i++) {
@@ -186,7 +187,7 @@ void sendShouldWorkByRoundRobin(int size, int startIndex, int successIndex) thro
186187
}
187188

188189
if (startIndex > 0) {
189-
addressIndexByMember().put(member, startIndex);
190+
addressIndexByMember().put(member, new AtomicInteger(startIndex));
190191
}
191192

192193
for (int i = 0; i < size; i++) {
@@ -212,7 +213,7 @@ void sendShouldFailByRoundRobin(int size, int startIndex, int ignore) throws Exc
212213
}
213214

214215
if (startIndex > 0) {
215-
addressIndexByMember().put(member, startIndex);
216+
addressIndexByMember().put(member, new AtomicInteger(startIndex));
216217
}
217218

218219
for (int i = 0; i < size; i++) {

0 commit comments

Comments
 (0)