Skip to content

Commit dd09249

Browse files
committed
~ ConnectionListener for NettyTcpClientTransport
1 parent 6ba5c82 commit dd09249

File tree

1 file changed

+50
-3
lines changed

1 file changed

+50
-3
lines changed

modbus-tcp/src/main/java/com/digitalpetri/modbus/client/NettyTcpClientTransport.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import io.netty.channel.SimpleChannelInboundHandler;
2020
import io.netty.channel.socket.SocketChannel;
2121
import io.netty.channel.socket.nio.NioSocketChannel;
22+
import java.util.List;
2223
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.CompletionStage;
25+
import java.util.concurrent.CopyOnWriteArrayList;
2426
import java.util.concurrent.atomic.AtomicReference;
2527
import java.util.function.Consumer;
2628
import org.slf4j.Logger;
@@ -36,6 +38,8 @@ public class NettyTcpClientTransport implements ModbusTcpClientTransport {
3638

3739
private final AtomicReference<Consumer<ModbusTcpFrame>> frameReceiver = new AtomicReference<>();
3840

41+
private final List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<>();
42+
3943
private final ChannelFsm channelFsm;
4044
private final ExecutionQueue executionQueue;
4145

@@ -54,12 +58,23 @@ public NettyTcpClientTransport(NettyClientTransportConfig config) {
5458
.build()
5559
);
5660

61+
executionQueue = new ExecutionQueue(config.executor());
62+
5763
channelFsm.addTransitionListener(
58-
(from, to, via) ->
59-
logger.debug("onStateTransition: {} -> {} via {}", from, to, via)
64+
(from, to, via) -> {
65+
logger.debug("onStateTransition: {} -> {} via {}", from, to, via);
66+
67+
executionQueue.submit(() -> handleStateTransition(from, to, via));
68+
}
6069
);
70+
}
6171

62-
executionQueue = new ExecutionQueue(config.executor());
72+
private void handleStateTransition(State from, State to, Event via) {
73+
if (from != State.Connected && to == State.Connected) {
74+
connectionListeners.forEach(ConnectionListener::onConnection);
75+
} else if (from == State.Connected && to != State.Connected) {
76+
connectionListeners.forEach(ConnectionListener::onConnectionLost);
77+
}
6378
}
6479

6580
@Override
@@ -99,6 +114,24 @@ public boolean isConnected() {
99114
return channelFsm.getState() == State.Connected;
100115
}
101116

117+
/**
118+
* Add a {@link ConnectionListener} to this transport.
119+
*
120+
* @param listener the listener to add.
121+
*/
122+
public void addConnectionListener(ConnectionListener listener) {
123+
connectionListeners.add(listener);
124+
}
125+
126+
/**
127+
* Remove a {@link ConnectionListener} from this transport.
128+
*
129+
* @param listener the listener to remove.
130+
*/
131+
public void removeConnectionListener(ConnectionListener listener) {
132+
connectionListeners.remove(listener);
133+
}
134+
102135
private class ModbusTcpFrameHandler extends SimpleChannelInboundHandler<ModbusTcpFrame> {
103136

104137
@Override
@@ -179,4 +212,18 @@ public static NettyTcpClientTransport create(
179212
return new NettyTcpClientTransport(config);
180213
}
181214

215+
public interface ConnectionListener {
216+
217+
/**
218+
* Callback invoked when the transport has connected.
219+
*/
220+
void onConnection();
221+
222+
/**
223+
* Callback invoked when the transport has disconnected.
224+
*/
225+
void onConnectionLost();
226+
227+
}
228+
182229
}

0 commit comments

Comments
 (0)