8
8
import com .digitalpetri .modbus .ModbusRtuResponseFrameParser .ParserState ;
9
9
import com .digitalpetri .modbus .client .ModbusRtuClientTransport ;
10
10
import com .digitalpetri .modbus .internal .util .ExecutionQueue ;
11
+ import com .digitalpetri .modbus .tcp .client .NettyTcpClientTransport .ConnectionListener ;
11
12
import com .digitalpetri .netty .fsm .ChannelActions ;
12
13
import com .digitalpetri .netty .fsm .ChannelFsm ;
14
+ import com .digitalpetri .netty .fsm .ChannelFsm .TransitionListener ;
13
15
import com .digitalpetri .netty .fsm .ChannelFsmConfig ;
14
16
import com .digitalpetri .netty .fsm .ChannelFsmFactory ;
15
17
import com .digitalpetri .netty .fsm .Event ;
29
31
import io .netty .handler .ssl .SslContext ;
30
32
import io .netty .handler .ssl .SslContextBuilder ;
31
33
import io .netty .handler .ssl .SslProtocols ;
34
+ import java .util .List ;
32
35
import java .util .concurrent .CompletableFuture ;
33
36
import java .util .concurrent .CompletionStage ;
37
+ import java .util .concurrent .CopyOnWriteArrayList ;
34
38
import java .util .concurrent .atomic .AtomicReference ;
35
39
import java .util .function .Consumer ;
36
40
import org .slf4j .Logger ;
@@ -47,6 +51,8 @@ public class NettyRtuClientTransport implements ModbusRtuClientTransport {
47
51
private final ModbusRtuResponseFrameParser frameParser = new ModbusRtuResponseFrameParser ();
48
52
private final AtomicReference <Consumer <ModbusRtuFrame >> frameReceiver = new AtomicReference <>();
49
53
54
+ private final List <ConnectionListener > connectionListeners = new CopyOnWriteArrayList <>();
55
+
50
56
private final ChannelFsm channelFsm ;
51
57
private final ExecutionQueue executionQueue ;
52
58
@@ -65,11 +71,29 @@ public NettyRtuClientTransport(NettyClientTransportConfig config) {
65
71
.build ());
66
72
67
73
channelFsm .addTransitionListener (
68
- (from , to , via ) -> logger .debug ("onStateTransition: {} -> {} via {}" , from , to , via ));
74
+ (from , to , via ) -> {
75
+ logger .debug ("onStateTransition: {} -> {} via {}" , from , to , via );
76
+
77
+ maybeNotifyConnectionListeners (from , to );
78
+ });
69
79
70
80
executionQueue = new ExecutionQueue (config .executor ());
71
81
}
72
82
83
+ @ SuppressWarnings ("DuplicatedCode" )
84
+ private void maybeNotifyConnectionListeners (State from , State to ) {
85
+ if (connectionListeners .isEmpty ()) {
86
+ return ;
87
+ }
88
+
89
+ if (from != State .Connected && to == State .Connected ) {
90
+ executionQueue .submit (() -> connectionListeners .forEach (ConnectionListener ::onConnection ));
91
+ } else if (from == State .Connected && to != State .Connected ) {
92
+ executionQueue .submit (
93
+ () -> connectionListeners .forEach (ConnectionListener ::onConnectionLost ));
94
+ }
95
+ }
96
+
73
97
@ Override
74
98
public CompletableFuture <Void > connect () {
75
99
return channelFsm .connect ().thenApply (c -> null );
@@ -85,6 +109,36 @@ public boolean isConnected() {
85
109
return channelFsm .getState () == State .Connected ;
86
110
}
87
111
112
+ /**
113
+ * Get the {@link ChannelFsm} used by this transport.
114
+ *
115
+ * <p>This should not generally be used by client code except perhaps to add a {@link
116
+ * TransitionListener} to receive more detailed callbacks about the connection status.
117
+ *
118
+ * @return the {@link ChannelFsm} used by this transport.
119
+ */
120
+ public ChannelFsm getChannelFsm () {
121
+ return channelFsm ;
122
+ }
123
+
124
+ /**
125
+ * Add a {@link ConnectionListener} to this transport.
126
+ *
127
+ * @param listener the listener to add.
128
+ */
129
+ public void addConnectionListener (ConnectionListener listener ) {
130
+ connectionListeners .add (listener );
131
+ }
132
+
133
+ /**
134
+ * Remove a {@link ConnectionListener} from this transport.
135
+ *
136
+ * @param listener the listener to remove.
137
+ */
138
+ public void removeConnectionListener (ConnectionListener listener ) {
139
+ connectionListeners .remove (listener );
140
+ }
141
+
88
142
@ Override
89
143
public CompletionStage <Void > send (ModbusRtuFrame frame ) {
90
144
return channelFsm
0 commit comments