Skip to content

Commit 93f7ff2

Browse files
authored
Better Background Tasks (#229)
1 parent def7629 commit 93f7ff2

File tree

6 files changed

+66
-74
lines changed

6 files changed

+66
-74
lines changed

Source/HiveMQtt/Client/Connection/ConnectionManager.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ internal async Task<bool> ConnectAsync()
129129
this.ConnectionWriterTask = this.ConnectionWriterAsync(this.cancellationTokenSource.Token);
130130
this.ConnectionReaderTask = this.ConnectionReaderAsync(this.cancellationTokenSource.Token);
131131
this.ReceivedPacketsHandlerTask = this.ReceivedPacketsHandlerAsync(this.cancellationTokenSource.Token);
132-
this.ConnectionMonitorTask = this.ConnectionMonitorAsync(this.cancellationTokenSource.Token);
132+
this.ConnectionMonitorThread = this.LaunchConnectionMonitorThread();
133133

134134
return true;
135135
}
@@ -184,13 +184,13 @@ internal async Task CancelBackgroundTasksAsync()
184184
Logger.Trace("ReceivedPacketsHandlerTask did not complete in time");
185185
}
186186

187-
if (this.ConnectionMonitorTask is not null && this.ConnectionMonitorTask.IsCompleted)
187+
if (this.ConnectionMonitorThread is not null && !this.ConnectionMonitorThread.IsAlive)
188188
{
189-
this.ConnectionMonitorTask = null;
189+
this.ConnectionMonitorThread = null;
190190
}
191191
else
192192
{
193-
Logger.Trace("ConnectionMonitorTask did not complete in time");
193+
Logger.Trace("ConnectionMonitorThread did not complete in time");
194194
}
195195
}
196196

Source/HiveMQtt/Client/Connection/ConnectionManagerTasks.cs

Lines changed: 53 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ public partial class ConnectionManager
2121

2222
internal Task? ReceivedPacketsHandlerTask { get; set; }
2323

24-
internal Task? ConnectionMonitorTask { get; set; }
24+
internal Thread? ConnectionMonitorThread { get; set; }
2525

2626
/// <summary>
2727
/// Health check method to assure that tasks haven't faulted unexpectedly.
2828
/// </summary>
29-
private async Task RunTaskHealthCheckAsync(Task? task, string taskName)
29+
private void RunTaskHealthCheck(Task? task, string taskName)
3030
{
3131
if (task is null)
3232
{
@@ -38,85 +38,74 @@ private async Task RunTaskHealthCheckAsync(Task? task, string taskName)
3838
{
3939
Logger.Error($"{this.Client.Options.ClientId}-(CM)- {taskName} Faulted: {task.Exception}");
4040
Logger.Error($"{this.Client.Options.ClientId}-(CM)- {taskName} died. Disconnecting.");
41-
await this.HandleDisconnectionAsync(false).ConfigureAwait(false);
41+
_ = Task.Run(async () => await this.HandleDisconnectionAsync(false).ConfigureAwait(false));
4242
}
4343
}
4444
}
4545

46+
private Thread LaunchConnectionMonitorThread()
47+
{
48+
var thread = new Thread(this.ConnectionMonitor);
49+
thread.Start();
50+
return thread;
51+
}
52+
4653
/// <summary>
4754
/// Asynchronous background task that monitors the connection state and sends PingReq packets when
4855
/// necessary.
4956
/// </summary>
50-
/// <param name="cancellationToken">The cancellation token.</param>
51-
private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task.Run(
52-
async () =>
57+
private void ConnectionMonitor()
58+
{
59+
Logger.Trace($"{this.Client.Options.ClientId}-(CM)- Starting...{this.State}");
60+
if (this.Client.Options.KeepAlive == 0)
5361
{
54-
Logger.Trace($"{this.Client.Options.ClientId}-(CM)- Starting...{this.State}");
55-
if (this.Client.Options.KeepAlive == 0)
56-
{
57-
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- KeepAlive is 0. No pings will be sent.");
58-
}
62+
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- KeepAlive is 0. No pings will be sent.");
63+
}
5964

60-
var keepAlivePeriod = this.Client.Options.KeepAlive;
61-
this.lastCommunicationTimer.Start();
65+
var keepAlivePeriod = this.Client.Options.KeepAlive;
66+
this.lastCommunicationTimer.Start();
6267

63-
while (true)
68+
while (true)
69+
{
70+
try
6471
{
65-
try
72+
// If connected and no recent packets have been sent, send a ping
73+
if (this.State == ConnectState.Connected)
6674
{
67-
// If connected and no recent packets have been sent, send a ping
68-
if (this.State == ConnectState.Connected)
75+
if (this.Client.Options.KeepAlive > 0 && this.lastCommunicationTimer.Elapsed > TimeSpan.FromSeconds(keepAlivePeriod))
6976
{
70-
if (this.Client.Options.KeepAlive > 0 && this.lastCommunicationTimer.Elapsed > TimeSpan.FromSeconds(keepAlivePeriod))
71-
{
72-
// Send PingReq
73-
Logger.Trace($"{this.Client.Options.ClientId}-(CM)- --> PingReq");
74-
this.SendQueue.Enqueue(new PingReqPacket());
75-
}
76-
}
77-
78-
// Dumping Client State
79-
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- {this.State}: last communications {this.lastCommunicationTimer.Elapsed} ago");
80-
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- SendQueue:...............{this.SendQueue.Count}");
81-
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- ReceivedQueue:...........{this.ReceivedQueue.Count}");
82-
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- OutgoingPublishQueue:....{this.OutgoingPublishQueue.Count}");
83-
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- OPubTransactionQueue:....{this.OPubTransactionQueue.Count}/{this.OPubTransactionQueue.Capacity}");
84-
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- IPubTransactionQueue:....{this.IPubTransactionQueue.Count}/{this.IPubTransactionQueue.Capacity}");
85-
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- # of Subscriptions:......{this.Client.Subscriptions.Count}");
86-
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- PacketIDsInUse:..........{this.PacketIDManager.Count}");
87-
88-
// Background Tasks Health Check
89-
await this.RunTaskHealthCheckAsync(this.ConnectionWriterTask, "ConnectionWriter").ConfigureAwait(false);
90-
await this.RunTaskHealthCheckAsync(this.ConnectionReaderTask, "ConnectionReader").ConfigureAwait(false);
91-
await this.RunTaskHealthCheckAsync(this.ConnectionPublishWriterTask, "ConnectionPublishWriter").ConfigureAwait(false);
92-
await this.RunTaskHealthCheckAsync(this.ReceivedPacketsHandlerTask, "ReceivedPacketsHandler").ConfigureAwait(false);
93-
94-
// Sleep cycle
95-
await Task.Delay(2000, cancellationToken).ConfigureAwait(false);
96-
97-
// Check for cancellation
98-
if (cancellationToken.IsCancellationRequested)
99-
{
100-
Logger.Trace($"{this.Client.Options.ClientId}-(CM)- Canceled & exiting...");
101-
break;
102-
}
103-
}
104-
catch (Exception ex)
105-
{
106-
if (ex is TaskCanceledException || cancellationToken.IsCancellationRequested)
107-
{
108-
break;
109-
}
110-
else
111-
{
112-
Logger.Error($"{this.Client.Options.ClientId}-(CM)- Exception: {ex}");
113-
throw;
77+
// Send PingReq
78+
Logger.Trace($"{this.Client.Options.ClientId}-(CM)- --> PingReq");
79+
this.SendQueue.Enqueue(new PingReqPacket());
11480
}
11581
}
116-
}
11782

118-
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- Exiting...{this.State}, cancellationRequested={cancellationToken.IsCancellationRequested}");
119-
}, cancellationToken);
83+
// Dumping Client State
84+
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- {this.State}: last communications {this.lastCommunicationTimer.Elapsed} ago");
85+
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- SendQueue:...............{this.SendQueue.Count}");
86+
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- ReceivedQueue:...........{this.ReceivedQueue.Count}");
87+
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- OutgoingPublishQueue:....{this.OutgoingPublishQueue.Count}");
88+
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- OPubTransactionQueue:....{this.OPubTransactionQueue.Count}/{this.OPubTransactionQueue.Capacity}");
89+
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- IPubTransactionQueue:....{this.IPubTransactionQueue.Count}/{this.IPubTransactionQueue.Capacity}");
90+
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- # of Subscriptions:......{this.Client.Subscriptions.Count}");
91+
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- PacketIDsInUse:..........{this.PacketIDManager.Count}");
92+
93+
// Background Tasks Health Check
94+
this.RunTaskHealthCheck(this.ConnectionWriterTask, "ConnectionWriter");
95+
this.RunTaskHealthCheck(this.ConnectionReaderTask, "ConnectionReader");
96+
this.RunTaskHealthCheck(this.ConnectionPublishWriterTask, "ConnectionPublishWriter");
97+
this.RunTaskHealthCheck(this.ReceivedPacketsHandlerTask, "ReceivedPacketsHandler");
98+
99+
// Sleep cycle
100+
Thread.Sleep(2000);
101+
}
102+
catch (Exception ex)
103+
{
104+
Logger.Error($"{this.Client.Options.ClientId}-(CM)- Exception: {ex}");
105+
throw;
106+
}
107+
} // while (true)
108+
}
120109

121110
/// <summary>
122111
/// Asynchronous background task that handles the outgoing publish packets queued in OutgoingPublishQueue.

Source/HiveMQtt/Client/Transport/TCPTransport.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ public override async Task<bool> CloseAsync(bool? shutdownPipeline = true)
242242
if (this.Stream != null)
243243
{
244244
// Dispose of the Stream
245+
await this.Stream.FlushAsync().ConfigureAwait(false);
245246
this.Stream.Close();
246247
await this.Stream.DisposeAsync().ConfigureAwait(false);
247248
this.Stream = null;

Source/HiveMQtt/MQTT5/ControlPacket.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,7 @@ protected void EncodeProperties(MemoryStream writer)
556556

557557
_ = EncodeVariableByteInteger(writer, propertiesLength);
558558

559+
propertyStream.Flush();
559560
_ = propertyStream.Seek(0, SeekOrigin.Begin);
560561
propertyStream.CopyTo(writer);
561562
}

Source/HiveMQtt/MQTT5/Packets/ConnectPacket.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ protected void EncodeLastWillProperties(MemoryStream writer)
316316
}
317317

318318
_ = EncodeVariableByteInteger(writer, propertiesLength);
319+
propertyStream.Flush();
319320

320321
_ = propertyStream.Seek(0, SeekOrigin.Begin);
321322
propertyStream.CopyTo(writer);

Tests/HiveMQtt.Test/HiveMQClient/ClientTest.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public async Task ClientStateAsync()
5454
Assert.Null(client.Connection.ConnectionWriterTask);
5555
Assert.Null(client.Connection.ConnectionReaderTask);
5656
Assert.Null(client.Connection.ReceivedPacketsHandlerTask);
57-
Assert.Null(client.Connection.ConnectionMonitorTask);
57+
Assert.Null(client.Connection.ConnectionMonitorThread);
5858

5959
// Queues
6060
Assert.NotNull(client.Connection.SendQueue);
@@ -76,12 +76,12 @@ public async Task ClientStateAsync()
7676
Assert.NotNull(client.Connection.ConnectionWriterTask);
7777
Assert.NotNull(client.Connection.ConnectionReaderTask);
7878
Assert.NotNull(client.Connection.ReceivedPacketsHandlerTask);
79-
Assert.NotNull(client.Connection.ConnectionMonitorTask);
79+
Assert.NotNull(client.Connection.ConnectionMonitorThread);
8080

8181
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ConnectionWriterTask.Status);
8282
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ConnectionReaderTask.Status);
8383
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ReceivedPacketsHandlerTask.Status);
84-
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ConnectionMonitorTask.Status);
84+
Assert.True(client.Connection.ConnectionMonitorThread.IsAlive);
8585

8686
// Queues
8787
Assert.NotNull(client.Connection.SendQueue);
@@ -113,12 +113,12 @@ public async Task ClientStateAsync()
113113
Assert.NotNull(client.Connection.ConnectionWriterTask);
114114
Assert.NotNull(client.Connection.ConnectionReaderTask);
115115
Assert.NotNull(client.Connection.ReceivedPacketsHandlerTask);
116-
Assert.NotNull(client.Connection.ConnectionMonitorTask);
116+
Assert.NotNull(client.Connection.ConnectionMonitorThread);
117117

118118
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ConnectionWriterTask.Status);
119119
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ConnectionReaderTask.Status);
120120
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ReceivedPacketsHandlerTask.Status);
121-
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ConnectionMonitorTask.Status);
121+
Assert.True(client.Connection.ConnectionMonitorThread.IsAlive);
122122

123123
// Queues
124124
Assert.NotNull(client.Connection.SendQueue);
@@ -136,7 +136,7 @@ public async Task ClientStateAsync()
136136
Assert.Null(client.Connection.ConnectionWriterTask);
137137
Assert.Null(client.Connection.ConnectionReaderTask);
138138
Assert.Null(client.Connection.ReceivedPacketsHandlerTask);
139-
Assert.Null(client.Connection.ConnectionMonitorTask);
139+
Assert.Null(client.Connection.ConnectionMonitorThread);
140140

141141
// Queues
142142
Assert.NotNull(client.Connection.SendQueue);

0 commit comments

Comments
 (0)