Skip to content

Commit 23d1e4d

Browse files
authored
Merge pull request #371 from matyasberry/feature/transport-tweaks
Alternative transport implementations
2 parents 730ce0d + e324722 commit 23d1e4d

File tree

8 files changed

+48
-11
lines changed

8 files changed

+48
-11
lines changed

transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.scalecube.cluster.transport.api;
22

3+
import io.scalecube.net.Address;
34
import java.util.StringJoiner;
5+
import java.util.function.Function;
46
import reactor.core.Exceptions;
57

68
public final class TransportConfig implements Cloneable {
@@ -20,6 +22,7 @@ public final class TransportConfig implements Cloneable {
2022
private MessageCodec messageCodec = MessageCodec.INSTANCE;
2123
private int maxFrameLength = 2 * 1024 * 1024; // 2 MB
2224
private TransportFactory transportFactory;
25+
private Function<Address, Address> addressMapper = Function.identity();
2326

2427
public TransportConfig() {}
2528

@@ -134,6 +137,22 @@ public TransportConfig maxFrameLength(int maxFrameLength) {
134137
return t;
135138
}
136139

140+
/**
141+
* Setter for {@code addressMapper}.
142+
*
143+
* @param addressMapper address mapper
144+
* @return new {@code TransportConfig} instance
145+
*/
146+
public TransportConfig addressMapper(Function<Address, Address> addressMapper) {
147+
TransportConfig t = clone();
148+
t.addressMapper = addressMapper;
149+
return t;
150+
}
151+
152+
public Function<Address, Address> addressMapper() {
153+
return addressMapper;
154+
}
155+
137156
public TransportFactory transportFactory() {
138157
return transportFactory;
139158
}
@@ -168,6 +187,7 @@ public String toString() {
168187
.add("messageCodec=" + messageCodec)
169188
.add("maxFrameLength=" + maxFrameLength)
170189
.add("transportFactory=" + transportFactory)
190+
.add("addressMapper=" + addressMapper)
171191
.toString();
172192
}
173193
}

transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,27 @@ public final class TransportImpl implements Transport {
5656
// Transport factory
5757
private final Receiver receiver;
5858
private final Sender sender;
59+
private final Function<Address, Address> addressMapper;
5960

6061
/**
61-
* Constructor with cofig as parameter.
62+
* Constructor with config as parameter.
6263
*
6364
* @param messageCodec message codec
6465
* @param receiver transport receiver part
6566
* @param sender transport sender part
67+
* @param addressMapper function to map addresses. Useful when running against NAT-ed
68+
* environments. Used during connection setup so that the actual connection is established
69+
* against <code>addressMapper.apply(origAddress) destination</code>
6670
*/
67-
public TransportImpl(MessageCodec messageCodec, Receiver receiver, Sender sender) {
71+
public TransportImpl(
72+
MessageCodec messageCodec,
73+
Receiver receiver,
74+
Sender sender,
75+
Function<Address, Address> addressMapper) {
6876
this.messageCodec = messageCodec;
6977
this.receiver = receiver;
7078
this.sender = sender;
79+
this.addressMapper = addressMapper;
7180
}
7281

7382
private static Address prepareAddress(DisposableServer server) {
@@ -216,8 +225,9 @@ private ByteBuf encodeMessage(Message message) {
216225
}
217226

218227
private Mono<? extends Connection> connect(Address remoteAddress) {
228+
final Address mappedAddr = addressMapper.apply(remoteAddress);
219229
return sender
220-
.connect(remoteAddress)
230+
.connect(mappedAddr)
221231
.doOnSuccess(
222232
connection -> {
223233
connection

transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import reactor.netty.DisposableServer;
99
import reactor.netty.tcp.TcpServer;
1010

11-
final class TcpReceiver implements Receiver {
11+
public final class TcpReceiver implements Receiver {
1212

1313
private final TransportConfig config;
1414

transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import reactor.netty.Connection;
1111
import reactor.netty.tcp.TcpClient;
1212

13-
final class TcpSender implements Sender {
13+
public final class TcpSender implements Sender {
1414

1515
private final TransportConfig config;
1616

transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpTransportFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ public final class TcpTransportFactory implements TransportFactory {
99

1010
@Override
1111
public Transport createTransport(TransportConfig config) {
12-
return new TransportImpl(config.messageCodec(), new TcpReceiver(config), new TcpSender(config));
12+
return new TransportImpl(
13+
config.messageCodec(),
14+
new TcpReceiver(config),
15+
new TcpSender(config),
16+
config.addressMapper());
1317
}
1418
}

transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketReceiver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@
1212
import reactor.netty.http.websocket.WebsocketInbound;
1313
import reactor.netty.http.websocket.WebsocketOutbound;
1414

15-
final class WebsocketReceiver implements Receiver {
15+
public final class WebsocketReceiver implements Receiver {
1616

1717
private final TransportConfig config;
1818

19-
WebsocketReceiver(TransportConfig config) {
19+
public WebsocketReceiver(TransportConfig config) {
2020
this.config = config;
2121
}
2222

transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@
1212
import reactor.netty.http.client.HttpClient;
1313
import reactor.netty.http.client.WebsocketClientSpec;
1414

15-
final class WebsocketSender implements Sender {
15+
public final class WebsocketSender implements Sender {
1616

1717
private final TransportConfig config;
1818

19-
WebsocketSender(TransportConfig config) {
19+
public WebsocketSender(TransportConfig config) {
2020
this.config = config;
2121
}
2222

transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketTransportFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ public final class WebsocketTransportFactory implements TransportFactory {
1010
@Override
1111
public Transport createTransport(TransportConfig config) {
1212
return new TransportImpl(
13-
config.messageCodec(), new WebsocketReceiver(config), new WebsocketSender(config));
13+
config.messageCodec(),
14+
new WebsocketReceiver(config),
15+
new WebsocketSender(config),
16+
config.addressMapper());
1417
}
1518
}

0 commit comments

Comments
 (0)