Skip to content

Commit 80ed481

Browse files
committed
TCPTransport Implementation
1 parent 6e27857 commit 80ed481

File tree

9 files changed

+199
-148
lines changed

9 files changed

+199
-148
lines changed

Source/HiveMQtt/Client/Connection/ConnectionManager.cs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,23 @@ public partial class ConnectionManager : IDisposable
7373
public ConnectionManager(HiveMQClient client)
7474
{
7575
this.Client = client;
76-
this.Transport = new BaseTransport();
7776
this.cancellationTokenSource = new CancellationTokenSource();
7877
this.IPubTransactionQueue = new BoundedDictionaryX<int, List<ControlPacket>>(this.Client.Options.ClientReceiveMaximum);
7978
this.OPubTransactionQueue = new BoundedDictionaryX<int, List<ControlPacket>>(65535);
8079
this.State = ConnectState.Disconnected;
8180

81+
// Connect the appropriate transport
82+
if (this.Client.Options.Host.StartsWith("ws://", StringComparison.OrdinalIgnoreCase) ||
83+
this.Client.Options.Host.StartsWith("wss://", StringComparison.OrdinalIgnoreCase))
84+
{
85+
// this.Transport = new WebSocketTransport(this.Client.Options);
86+
this.Transport = new TCPTransport(this.Client.Options);
87+
}
88+
else
89+
{
90+
this.Transport = new TCPTransport(this.Client.Options);
91+
}
92+
8293
Logger.Trace("Trace Level Logging Legend:");
8394
Logger.Trace(" -(W)- == ConnectionWriter");
8495
Logger.Trace(" -(PW)- == ConnectionPublishWriter");
@@ -93,7 +104,7 @@ internal async Task<bool> ConnectAsync()
93104
if (this.Client.Options.Host.StartsWith("ws://", StringComparison.OrdinalIgnoreCase) ||
94105
this.Client.Options.Host.StartsWith("wss://", StringComparison.OrdinalIgnoreCase))
95106
{
96-
this.Transport = new WebSocketTransport(this.Client.Options);
107+
// this.Transport = new WebSocketTransport(this.Client.Options);
97108
}
98109
else
99110
{
@@ -122,6 +133,19 @@ internal async Task<bool> ConnectAsync()
122133
return true;
123134
}
124135

136+
/// <summary>
137+
/// Close the connection.
138+
/// </summary>
139+
/// <returns>A boolean indicating if the connection was closed successfully.</returns>
140+
internal async Task<bool> CloseAsync()
141+
{
142+
// Cancel all background tasks
143+
await this.CancelBackgroundTasksAsync().ConfigureAwait(false);
144+
145+
// Close the transport
146+
return await this.Transport.CloseAsync().ConfigureAwait(false);
147+
}
148+
125149
/// <summary>
126150
/// Cancel all background tasks.
127151
/// </summary>

Source/HiveMQtt/Client/Connection/ConnectionManagerHandlers.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,6 @@ internal async Task HandleIncomingPublishPacketAsync(PublishPacket publishPacket
150150
/// </summary>
151151
/// <param name="pubAckPacket">The received PubAck packet.</param>
152152
/// <returns>A task that represents the asynchronous operation.</returns>
153-
/// <exception cref="HiveMQttClientException">Raised if the packet identifier is unknown.</exception>
154153
internal async Task HandleIncomingPubAckPacketAsync(PubAckPacket pubAckPacket)
155154
{
156155
Logger.Trace($"{this.Client.Options.ClientId}-(RPH)- <-- Received PubAck id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}");
@@ -265,7 +264,6 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket)
265264
/// Handle an incoming PubComp packet.
266265
/// </summary>
267266
/// <param name="pubAckPacket">The received PubComp packet.</param>
268-
/// <exception cref="HiveMQttClientException">Raised if the packet identifier is unknown.</exception>
269267
/// <returns>A task that represents the asynchronous operation.</returns>
270268
internal async Task HandleSentPubAckPacketAsync(PubAckPacket pubAckPacket)
271269
{
@@ -333,7 +331,6 @@ internal async Task HandleSentPubCompPacketAsync(PubCompPacket pubCompPacket)
333331
/// Handle an incoming PubComp packet.
334332
/// </summary>
335333
/// <param name="pubCompPacket">The received PubComp packet.</param>
336-
/// <exception cref="HiveMQttClientException">Raised if the packet identifier is unknown.</exception>
337334
/// <returns>A task that represents the asynchronous operation.</returns>
338335
internal async Task HandleIncomingPubCompPacketAsync(PubCompPacket pubCompPacket)
339336
{

Source/HiveMQtt/Client/Connection/ConnectionManagerTasks.cs

Lines changed: 116 additions & 109 deletions
Large diffs are not rendered by default.

Source/HiveMQtt/Client/HiveMQClientConnection.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ namespace HiveMQtt.Client;
2020

2121
using HiveMQtt.Client.Events;
2222
using HiveMQtt.Client.Exceptions;
23-
using HiveMQtt.Client.Internal;
2423
using HiveMQtt.MQTT5.ReasonCodes;
2524

2625
/// <inheritdoc />
@@ -83,5 +82,4 @@ private static async void AutomaticReconnectHandler(object? sender, AfterDisconn
8382
}
8483
}
8584
}
86-
8785
}

Source/HiveMQtt/Client/HiveMQClientUtil.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,15 +168,15 @@ protected virtual void Dispose(bool disposing)
168168
// and unmanaged resources.
169169
if (disposing)
170170
{
171-
if (this.ConnectState == Internal.ConnectState.Connected)
171+
if (this.Connection.State == Internal.ConnectState.Connected)
172172
{
173173
Logger.Trace("HiveMQClient Dispose: Disconnecting connected client.");
174174
_ = Task.Run(async () => await this.DisconnectAsync().ConfigureAwait(false));
175175
}
176176

177177
// Dispose managed resources.
178-
this.cancellationTokenSource.Cancel();
179-
this.cancellationTokenSource.Dispose();
178+
// this.cancellationTokenSource.Cancel();
179+
// this.cancellationTokenSource.Dispose();
180180
}
181181

182182
// Call the appropriate methods to clean up

Source/HiveMQtt/Client/Transport/BaseTransport.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,21 @@ namespace HiveMQtt.Client.Transport;
1919
using System.Net.Sockets;
2020
using HiveMQtt.Client.Exceptions;
2121

22-
public class BaseTransport
22+
public abstract class BaseTransport
2323
{
2424
protected static readonly NLog.Logger Logger = NLog.LogManager.GetCurrentClassLogger();
2525

26-
public async Task<bool> CloseAsync() =>
27-
throw new NotImplementedException("Close must be implemented in derived classes");
26+
public abstract Task<bool> ConnectAsync();
2827

29-
public async Task<bool> ConnectAsync() =>
30-
throw new NotImplementedException("ConnectAsync must be implemented in derived classes");
28+
public abstract Task<bool> CloseAsync(bool? shutdownPipeline = true);
29+
30+
public abstract Task<bool> WriteAsync(byte[] buffer, CancellationToken cancellationToken = default);
31+
32+
public abstract Task<TransportReadResult> ReadAsync(CancellationToken cancellationToken = default);
33+
34+
public abstract void AdvanceTo(SequencePosition consumed);
35+
36+
public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined);
3137

3238
/// <summary>
3339
/// Lookup the hostname and return the IP address.

Source/HiveMQtt/Client/Transport/TCPTransport.cs

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,7 @@ public class TCPTransport : BaseTransport, IDisposable
3636

3737
internal HiveMQClientOptions Options { get; }
3838

39-
public TCPTransport(HiveMQClientOptions options)
40-
{
41-
this.Options = options;
42-
}
39+
public TCPTransport(HiveMQClientOptions options) => this.Options = options;
4340

4441
/// <summary>
4542
/// SSLStream Callback. This is used to always allow invalid broker certificates.
@@ -160,10 +157,10 @@ private async Task<bool> CreateTLSConnectionAsync(Stream stream)
160157
/// Make a TCP connection to a remote broker.
161158
/// </summary>
162159
/// <returns>A boolean representing the success or failure of the operation.</returns>
163-
internal async Task<bool> ConnectAsync()
160+
public override async Task<bool> ConnectAsync()
164161
{
165162
IPEndPoint ipEndPoint;
166-
var ipAddress = await this.LookupHostNameAsync(this.Options.Host, this.Options.PreferIPv6).ConfigureAwait(false);
163+
var ipAddress = await LookupHostNameAsync(this.Options.Host, this.Options.PreferIPv6).ConfigureAwait(false);
167164

168165
// Create the IPEndPoint depending on whether it is a host name or IP address.
169166
if (ipAddress == null)
@@ -212,10 +209,13 @@ internal async Task<bool> ConnectAsync()
212209
return socketConnected;
213210
}
214211

215-
internal async Task<bool> CloseSocketAsync(bool? shutdownPipeline = true)
212+
/// <summary>
213+
/// Close the TCP connection.
214+
/// </summary>
215+
/// <param name="shutdownPipeline">A boolean indicating whether to shutdown the pipeline.</param>
216+
/// <returns>A boolean indicating whether the operation was successful.</returns>
217+
public override async Task<bool> CloseAsync(bool? shutdownPipeline = true)
216218
{
217-
await this.CancelBackgroundTasksAsync().ConfigureAwait(false);
218-
219219
if (shutdownPipeline == true)
220220
{
221221
if (this.Reader != null && this.Writer != null)
@@ -254,39 +254,59 @@ internal async Task<bool> CloseSocketAsync(bool? shutdownPipeline = true)
254254
/// <summary>
255255
/// Write a buffer to the stream.
256256
/// </summary>
257-
/// <param name="source">The buffer to write.</param>
257+
/// <param name="buffer">The buffer to write.</param>
258258
/// <param name="cancellationToken">The cancellation token.</param>
259-
/// <returns>A FlushResult wrapped in a ValueTask.</returns>
259+
/// <returns>A boolean indicating whether the write was successful.</returns>
260260
/// <exception cref="HiveMQttClientException">Raised if the writer is null.</exception>
261-
internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
261+
public override async Task<bool> WriteAsync(byte[] buffer, CancellationToken cancellationToken = default)
262262
{
263-
if (this.Writer is null)
263+
if (this.Writer == null)
264+
{
265+
throw new HiveMQttClientException("TCP Transport Writer is null");
266+
}
267+
268+
var source = new ReadOnlyMemory<byte>(buffer);
269+
var writeResult = await this.Writer.WriteAsync(source, cancellationToken).ConfigureAwait(false);
270+
271+
if (writeResult.IsCompleted || writeResult.IsCanceled)
264272
{
265-
throw new HiveMQttClientException("Writer is null");
273+
Logger.Debug($"-(TCP)- WriteAsync: The party is over. IsCompleted={writeResult.IsCompleted} IsCancelled={writeResult.IsCanceled}");
274+
return false;
266275
}
267276

268-
var writeResult = this.Writer.WriteAsync(source, cancellationToken);
269-
this.lastCommunicationTimer.Restart();
270-
return writeResult;
277+
return true;
271278
}
272279

273280
/// <summary>
274281
/// Read a buffer from the stream.
275282
/// </summary>
276283
/// <param name="cancellationToken">The cancellation token.</param>
277-
/// <returns>A ReadResult wrapped in a ValueTask.</returns>
284+
/// <returns>A TransportReadResult object containing the buffer.</returns>
278285
/// <exception cref="HiveMQttClientException">Raised if the reader is null.</exception>
279-
internal async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
286+
public override async Task<TransportReadResult> ReadAsync(CancellationToken cancellationToken = default)
280287
{
281-
if (this.Reader is null)
288+
if (this.Reader == null)
282289
{
283290
throw new HiveMQttClientException("Reader is null");
284291
}
285292

286293
var readResult = await this.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
287-
return readResult;
294+
295+
if (readResult.IsCanceled || readResult.IsCompleted)
296+
{
297+
Logger.Debug($"-(TCP)- ReadAsync: The party is over. IsCompleted={readResult.IsCompleted} IsCancelled={readResult.IsCanceled}");
298+
return new TransportReadResult(true);
299+
}
300+
301+
// var bytesRead = readResult.Buffer.Length;
302+
// this.Reader.AdvanceTo(readResult.Buffer.End);
303+
// return bytesRead;
304+
return new TransportReadResult(readResult.Buffer);
288305
}
289306

307+
public override void AdvanceTo(SequencePosition consumed) => this.Reader?.AdvanceTo(consumed);
308+
309+
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) => this.Reader?.AdvanceTo(consumed, examined);
290310

291311
/// <summary>
292312
/// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0.

Source/HiveMQtt/Client/Transport/WebSocketTransport.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@ namespace HiveMQtt.Client.Transport;
1717

1818
using HiveMQtt.Client.Options;
1919

20-
public class WebSocketTransport : BaseTransport
20+
public class WebSocketTransport// : BaseTransport
2121
{
2222
public WebSocketTransport(HiveMQClientOptions options)
2323
{
2424
// ...
2525
}
26-
2726
}

Source/HiveMQtt/HiveMQtt.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
</ItemGroup>
3939
<ItemGroup>
4040
<PackageReference Include="System.IO.Pipelines" Version="7.0.0" />
41-
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.1" />
41+
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
4242
<PackageReference Include="NLog" Version="5.2.3" />
4343
<PackageReference Include="Websocket.Client" Version="5.1.2" />
4444
</ItemGroup>

0 commit comments

Comments
 (0)