From 35e7ee3530d076a175d74030fae827e4b98b27a2 Mon Sep 17 00:00:00 2001 From: Kevin Herron Date: Thu, 26 Sep 2024 03:00:05 -0700 Subject: [PATCH 1/6] ~ ConnectionListener for NettyTcpClientTransport --- .../tcp/client/NettyTcpClientTransport.java | 53 +++++++++++++++++-- 1 file changed, 50 insertions(+), 3 deletions(-) diff --git a/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java b/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java index 58e0c94..4d263e5 100644 --- a/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java +++ b/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java @@ -20,8 +20,10 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.slf4j.Logger; @@ -37,6 +39,8 @@ public class NettyTcpClientTransport implements ModbusTcpClientTransport { private final AtomicReference> frameReceiver = new AtomicReference<>(); + private final List connectionListeners = new CopyOnWriteArrayList<>(); + private final ChannelFsm channelFsm; private final ExecutionQueue executionQueue; @@ -55,12 +59,23 @@ public NettyTcpClientTransport(NettyClientTransportConfig config) { .build() ); + executionQueue = new ExecutionQueue(config.executor()); + channelFsm.addTransitionListener( - (from, to, via) -> - logger.debug("onStateTransition: {} -> {} via {}", from, to, via) + (from, to, via) -> { + logger.debug("onStateTransition: {} -> {} via {}", from, to, via); + + executionQueue.submit(() -> handleStateTransition(from, to, via)); + } ); + } - executionQueue = new ExecutionQueue(config.executor()); + private void handleStateTransition(State from, State to, Event via) { + if (from != State.Connected && to == State.Connected) { + connectionListeners.forEach(ConnectionListener::onConnection); + } else if (from == State.Connected && to != State.Connected) { + connectionListeners.forEach(ConnectionListener::onConnectionLost); + } } @Override @@ -100,6 +115,24 @@ public boolean isConnected() { return channelFsm.getState() == State.Connected; } + /** + * Add a {@link ConnectionListener} to this transport. + * + * @param listener the listener to add. + */ + public void addConnectionListener(ConnectionListener listener) { + connectionListeners.add(listener); + } + + /** + * Remove a {@link ConnectionListener} from this transport. + * + * @param listener the listener to remove. + */ + public void removeConnectionListener(ConnectionListener listener) { + connectionListeners.remove(listener); + } + private class ModbusTcpFrameHandler extends SimpleChannelInboundHandler { @Override @@ -180,4 +213,18 @@ public static NettyTcpClientTransport create( return new NettyTcpClientTransport(config); } + public interface ConnectionListener { + + /** + * Callback invoked when the transport has connected. + */ + void onConnection(); + + /** + * Callback invoked when the transport has disconnected. + */ + void onConnectionLost(); + + } + } From c986806b701fa940a362fe0a231f795abc0cd981 Mon Sep 17 00:00:00 2001 From: Kevin Herron Date: Thu, 26 Sep 2024 03:08:12 -0700 Subject: [PATCH 2/6] ~ note about reconnects --- .../modbus/tcp/client/NettyTcpClientTransport.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java b/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java index 4d263e5..0438065 100644 --- a/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java +++ b/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java @@ -222,6 +222,9 @@ public interface ConnectionListener { /** * Callback invoked when the transport has disconnected. + * + *

Note that implementations do not need to initiate a reconnect, as this is handled + * automatically by {@link NettyTcpClientTransport}. */ void onConnectionLost(); From d08983d8865adeff75abded0b5acb6c334e57aea Mon Sep 17 00:00:00 2001 From: Kevin Herron Date: Thu, 26 Sep 2024 03:08:40 -0700 Subject: [PATCH 3/6] ~ format --- .../digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java b/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java index 0438065..46f8111 100644 --- a/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java +++ b/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java @@ -223,7 +223,7 @@ public interface ConnectionListener { /** * Callback invoked when the transport has disconnected. * - *

Note that implementations do not need to initiate a reconnect, as this is handled + *

Note that implementations do not need to initiate a reconnect, as this is handled * automatically by {@link NettyTcpClientTransport}. */ void onConnectionLost(); From 28aa745d2e496e9f630fd60fd791332c3278433f Mon Sep 17 00:00:00 2001 From: Kevin Herron Date: Thu, 26 Sep 2024 03:15:29 -0700 Subject: [PATCH 4/6] ~ access to underlying ChannelFsm --- .../tcp/client/NettyTcpClientTransport.java | 29 ++++++++++++++++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java b/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java index 46f8111..72fbacd 100644 --- a/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java +++ b/modbus-tcp/src/main/java/com/digitalpetri/modbus/tcp/client/NettyTcpClientTransport.java @@ -7,6 +7,7 @@ import com.digitalpetri.modbus.tcp.ModbusTcpCodec; import com.digitalpetri.netty.fsm.ChannelActions; import com.digitalpetri.netty.fsm.ChannelFsm; +import com.digitalpetri.netty.fsm.ChannelFsm.TransitionListener; import com.digitalpetri.netty.fsm.ChannelFsmConfig; import com.digitalpetri.netty.fsm.ChannelFsmFactory; import com.digitalpetri.netty.fsm.Event; @@ -65,16 +66,24 @@ public NettyTcpClientTransport(NettyClientTransportConfig config) { (from, to, via) -> { logger.debug("onStateTransition: {} -> {} via {}", from, to, via); - executionQueue.submit(() -> handleStateTransition(from, to, via)); + maybeNotifyConnectionListeners(from, to); } ); } - private void handleStateTransition(State from, State to, Event via) { + private void maybeNotifyConnectionListeners(State from, State to) { + if (connectionListeners.isEmpty()) { + return; + } + if (from != State.Connected && to == State.Connected) { - connectionListeners.forEach(ConnectionListener::onConnection); + executionQueue.submit(() -> + connectionListeners.forEach(ConnectionListener::onConnection) + ); } else if (from == State.Connected && to != State.Connected) { - connectionListeners.forEach(ConnectionListener::onConnectionLost); + executionQueue.submit(() -> + connectionListeners.forEach(ConnectionListener::onConnectionLost) + ); } } @@ -115,6 +124,18 @@ public boolean isConnected() { return channelFsm.getState() == State.Connected; } + /** + * Get the {@link ChannelFsm} used by this transport. + * + *

This should not generally be used by client code except perhaps to add a + * {@link TransitionListener} to receive more detailed callbacks about the connection status. + * + * @return the {@link ChannelFsm} used by this transport. + */ + public ChannelFsm getChannelFsm() { + return channelFsm; + } + /** * Add a {@link ConnectionListener} to this transport. * From 7bfe34850f5280c1ca71c8d0275d67ca0aa3b4a4 Mon Sep 17 00:00:00 2001 From: Kevin Herron Date: Wed, 13 Nov 2024 07:20:14 -0800 Subject: [PATCH 5/6] ~ ConnectionListener callback test --- .../modbus/test/ModbusTcpClientServerIT.java | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/modbus-tests/src/test/java/com/digitalpetri/modbus/test/ModbusTcpClientServerIT.java b/modbus-tests/src/test/java/com/digitalpetri/modbus/test/ModbusTcpClientServerIT.java index c2a2a4b..7f3d99c 100644 --- a/modbus-tests/src/test/java/com/digitalpetri/modbus/test/ModbusTcpClientServerIT.java +++ b/modbus-tests/src/test/java/com/digitalpetri/modbus/test/ModbusTcpClientServerIT.java @@ -1,5 +1,7 @@ package com.digitalpetri.modbus.test; +import static org.junit.jupiter.api.Assertions.assertTrue; + import com.digitalpetri.modbus.ModbusPduSerializer.DefaultRequestSerializer; import com.digitalpetri.modbus.client.ModbusClient; import com.digitalpetri.modbus.client.ModbusTcpClient; @@ -11,10 +13,13 @@ import com.digitalpetri.modbus.server.ReadWriteModbusServices; import com.digitalpetri.modbus.tcp.Netty; import com.digitalpetri.modbus.tcp.client.NettyTcpClientTransport; +import com.digitalpetri.modbus.tcp.client.NettyTcpClientTransport.ConnectionListener; import com.digitalpetri.modbus.tcp.client.NettyTimeoutScheduler; import com.digitalpetri.modbus.tcp.server.NettyTcpServerTransport; import java.nio.ByteBuffer; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -24,6 +29,8 @@ public class ModbusTcpClientServerIT extends ClientServerIT { ModbusTcpClient client; ModbusTcpServer server; + NettyTcpClientTransport clientTransport; + @BeforeEach void setup() throws Exception { var processImage = new ProcessImage(); @@ -59,7 +66,7 @@ protected Optional getProcessImage(int unitId) { } final var port = serverPort; - var clientTransport = NettyTcpClientTransport.create( + clientTransport = NettyTcpClientTransport.create( cfg -> { cfg.hostname = "localhost"; cfg.port = port; @@ -112,4 +119,28 @@ void sendRaw() throws Exception { System.out.println("responsePduBytes: " + Hex.format(responsePduBytes)); } + @Test + void connectionListener() throws Exception { + var onConnection = new CountDownLatch(1); + var onConnectionLost = new CountDownLatch(1); + + clientTransport.addConnectionListener(new ConnectionListener() { + @Override + public void onConnection() { + onConnection.countDown(); + } + + @Override + public void onConnectionLost() { + onConnectionLost.countDown(); + } + }); + + client.disconnect(); + assertTrue(onConnectionLost.await(1, TimeUnit.SECONDS)); + + client.connect(); + assertTrue(onConnection.await(1, TimeUnit.SECONDS)); + } + } From 8422744e0e15dd7114855694f2f8f8fdf83dcb36 Mon Sep 17 00:00:00 2001 From: Kevin Herron Date: Wed, 13 Nov 2024 07:24:51 -0800 Subject: [PATCH 6/6] ~ test tweak --- .../com/digitalpetri/modbus/test/ModbusTcpClientServerIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modbus-tests/src/test/java/com/digitalpetri/modbus/test/ModbusTcpClientServerIT.java b/modbus-tests/src/test/java/com/digitalpetri/modbus/test/ModbusTcpClientServerIT.java index 7f3d99c..5cda666 100644 --- a/modbus-tests/src/test/java/com/digitalpetri/modbus/test/ModbusTcpClientServerIT.java +++ b/modbus-tests/src/test/java/com/digitalpetri/modbus/test/ModbusTcpClientServerIT.java @@ -136,6 +136,8 @@ public void onConnectionLost() { } }); + assertTrue(client.isConnected()); + client.disconnect(); assertTrue(onConnectionLost.await(1, TimeUnit.SECONDS));