Skip to content

Commit 6e27857

Browse files
committed
Organize into a ConnectionManager with Transport layers
1 parent d9bd984 commit 6e27857

File tree

10 files changed

+896
-718
lines changed

10 files changed

+896
-718
lines changed
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Copyright 2024-present HiveMQ and the HiveMQ Community
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
namespace HiveMQtt.Client.Connection;
17+
18+
using System.Diagnostics;
19+
using HiveMQtt.Client.Internal;
20+
using HiveMQtt.Client.Transport;
21+
using HiveMQtt.MQTT5;
22+
using HiveMQtt.MQTT5.Packets;
23+
using HiveMQtt.MQTT5.Types;
24+
25+
/// <summary>
26+
/// Represents a connection manager for the MQTT client.
27+
/// </summary>
28+
public partial class ConnectionManager : IDisposable
29+
{
30+
private static readonly NLog.Logger Logger = NLog.LogManager.GetCurrentClassLogger();
31+
32+
// The HiveMQClient this ConnectionManager is associated with
33+
internal HiveMQClient Client { get; }
34+
35+
// This is how we kill innocent and not so innocent Tasks
36+
private CancellationTokenSource cancellationTokenSource;
37+
38+
// The state of the connection
39+
internal ConnectState State { get; set; }
40+
41+
// The protocol specific transport layer (TCP, WebSocket, etc.)
42+
internal BaseTransport Transport { get; set; }
43+
44+
// The MQTT Properties for the active connection.
45+
internal MQTT5Properties ConnectionProperties { get; set; } = new();
46+
47+
// The outgoing publish packets queue. Publish packets are separated from other control packets
48+
// so that we can correctly respect the Broker's flow control.
49+
internal AwaitableQueueX<PublishPacket> OutgoingPublishQueue { get; } = new();
50+
51+
// Non-publish control packets queue; everything else
52+
internal AwaitableQueueX<ControlPacket> SendQueue { get; } = new();
53+
54+
// Received control packets queue
55+
internal AwaitableQueueX<ControlPacket> ReceivedQueue { get; } = new();
56+
57+
// Incoming Publish QoS > 0 in-flight transactions indexed by packet identifier
58+
internal BoundedDictionaryX<int, List<ControlPacket>> IPubTransactionQueue { get; set; }
59+
60+
// Outgoing Publish QoS > 0 in-flight transactions indexed by packet identifier
61+
internal BoundedDictionaryX<int, List<ControlPacket>> OPubTransactionQueue { get; set; }
62+
63+
// We generate new PacketIDs here.
64+
internal PacketIDManager PacketIDManager { get; } = new();
65+
66+
// This is used to know if and when we need to send a MQTT PingReq
67+
private readonly Stopwatch lastCommunicationTimer = new();
68+
69+
/// <summary>
70+
/// Initializes a new instance of the <see cref="ConnectionManager"/> class.
71+
/// </summary>
72+
/// <param name="client">The HiveMQClient this ConnectionManager is associated with.</param>
73+
public ConnectionManager(HiveMQClient client)
74+
{
75+
this.Client = client;
76+
this.Transport = new BaseTransport();
77+
this.cancellationTokenSource = new CancellationTokenSource();
78+
this.IPubTransactionQueue = new BoundedDictionaryX<int, List<ControlPacket>>(this.Client.Options.ClientReceiveMaximum);
79+
this.OPubTransactionQueue = new BoundedDictionaryX<int, List<ControlPacket>>(65535);
80+
this.State = ConnectState.Disconnected;
81+
82+
Logger.Trace("Trace Level Logging Legend:");
83+
Logger.Trace(" -(W)- == ConnectionWriter");
84+
Logger.Trace(" -(PW)- == ConnectionPublishWriter");
85+
Logger.Trace(" -(R)- == ConnectionReader");
86+
Logger.Trace(" -(CM)- == ConnectionMonitor");
87+
Logger.Trace(" -(RPH)- == ReceivedPacketsHandler");
88+
}
89+
90+
internal async Task<bool> ConnectAsync()
91+
{
92+
// Connect the appropriate transport
93+
if (this.Client.Options.Host.StartsWith("ws://", StringComparison.OrdinalIgnoreCase) ||
94+
this.Client.Options.Host.StartsWith("wss://", StringComparison.OrdinalIgnoreCase))
95+
{
96+
this.Transport = new WebSocketTransport(this.Client.Options);
97+
}
98+
else
99+
{
100+
this.Transport = new TCPTransport(this.Client.Options);
101+
}
102+
103+
// Reset the CancellationTokenSource in case this is a reconnect
104+
this.cancellationTokenSource.Dispose();
105+
this.cancellationTokenSource = new CancellationTokenSource();
106+
107+
var connected = await this.Transport.ConnectAsync().ConfigureAwait(false);
108+
109+
if (!connected)
110+
{
111+
Logger.Error("Failed to connect to broker");
112+
return false;
113+
}
114+
115+
// Start the traffic processors
116+
this.ConnectionPublishWriterTask = this.ConnectionPublishWriterAsync(this.cancellationTokenSource.Token);
117+
this.ConnectionWriterTask = this.ConnectionWriterAsync(this.cancellationTokenSource.Token);
118+
this.ConnectionReaderTask = this.ConnectionReaderAsync(this.cancellationTokenSource.Token);
119+
this.ReceivedPacketsHandlerTask = this.ReceivedPacketsHandlerAsync(this.cancellationTokenSource.Token);
120+
this.ConnectionMonitorTask = this.ConnectionMonitorAsync(this.cancellationTokenSource.Token);
121+
122+
return true;
123+
}
124+
125+
/// <summary>
126+
/// Cancel all background tasks.
127+
/// </summary>
128+
/// <returns>A task representing the asynchronous operation.</returns>
129+
internal async Task CancelBackgroundTasksAsync()
130+
{
131+
// Don't use CancelAsync here to maintain backwards compatibility
132+
// with >=.net6.0. CancelAsync was introduced in .net8.0
133+
this.cancellationTokenSource.Cancel();
134+
135+
// Delay for a short period to allow the tasks to cancel
136+
await Task.Delay(1000).ConfigureAwait(false);
137+
138+
// Reset the tasks
139+
if (this.ConnectionPublishWriterTask is not null && this.ConnectionPublishWriterTask.IsCompleted)
140+
{
141+
this.ConnectionPublishWriterTask = null;
142+
}
143+
else
144+
{
145+
Logger.Error("ConnectionPublishWriterTask did not complete");
146+
}
147+
148+
if (this.ConnectionWriterTask is not null && this.ConnectionWriterTask.IsCompleted)
149+
{
150+
this.ConnectionWriterTask = null;
151+
}
152+
else
153+
{
154+
Logger.Error("ConnectionWriterTask did not complete");
155+
}
156+
157+
if (this.ConnectionReaderTask is not null && this.ConnectionReaderTask.IsCompleted)
158+
{
159+
this.ConnectionReaderTask = null;
160+
}
161+
else
162+
{
163+
Logger.Error("ConnectionReaderTask did not complete");
164+
}
165+
166+
if (this.ReceivedPacketsHandlerTask is not null && this.ReceivedPacketsHandlerTask.IsCompleted)
167+
{
168+
this.ReceivedPacketsHandlerTask = null;
169+
}
170+
else
171+
{
172+
Logger.Error("ReceivedPacketsHandlerTask did not complete");
173+
}
174+
175+
if (this.ConnectionMonitorTask is not null && this.ConnectionMonitorTask.IsCompleted)
176+
{
177+
this.ConnectionMonitorTask = null;
178+
}
179+
else
180+
{
181+
Logger.Error("ConnectionMonitorTask did not complete");
182+
}
183+
}
184+
185+
/// <summary>
186+
/// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0.
187+
/// </summary>
188+
public void Dispose()
189+
{
190+
this.Dispose();
191+
/*
192+
This object will be cleaned up by the Dispose method.
193+
Therefore, you should call GC.SuppressFinalize to
194+
take this object off the finalization queue
195+
and prevent finalization code for this object
196+
from executing a second time.
197+
*/
198+
GC.SuppressFinalize(this);
199+
}
200+
}

0 commit comments

Comments
 (0)