Skip to content

Commit e08f90e

Browse files
committed
fix: ConnectionMonitor is stopped by cancellation token during disconnect
1 parent 3129caf commit e08f90e

File tree

3 files changed

+37
-15
lines changed

3 files changed

+37
-15
lines changed

Source/HiveMQtt/Client/Connection/ConnectionManager.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ internal async Task<bool> ConnectAsync()
129129
this.ConnectionWriterTask = this.ConnectionWriterAsync(this.cancellationTokenSource.Token);
130130
this.ConnectionReaderTask = this.ConnectionReaderAsync(this.cancellationTokenSource.Token);
131131
this.ReceivedPacketsHandlerTask = this.ReceivedPacketsHandlerAsync(this.cancellationTokenSource.Token);
132-
this.ConnectionMonitorThread = this.LaunchConnectionMonitorThread();
132+
this.ConnectionMonitorThread = this.LaunchConnectionMonitorThreadAsync(this.cancellationTokenSource.Token);
133133

134134
return true;
135135
}
@@ -184,7 +184,7 @@ internal async Task CancelBackgroundTasksAsync()
184184
Logger.Trace("ReceivedPacketsHandlerTask did not complete in time");
185185
}
186186

187-
if (this.ConnectionMonitorThread is not null && !this.ConnectionMonitorThread.IsAlive)
187+
if (this.ConnectionMonitorThread is not null && this.ConnectionMonitorThread.IsCompleted)
188188
{
189189
this.ConnectionMonitorThread = null;
190190
}

Source/HiveMQtt/Client/Connection/ConnectionManagerTasks.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public partial class ConnectionManager
2121

2222
internal Task? ReceivedPacketsHandlerTask { get; set; }
2323

24-
internal Thread? ConnectionMonitorThread { get; set; }
24+
internal Task? ConnectionMonitorThread { get; set; }
2525

2626
/// <summary>
2727
/// Health check method to assure that tasks haven't faulted unexpectedly.
@@ -43,18 +43,14 @@ private void RunTaskHealthCheck(Task? task, string taskName)
4343
}
4444
}
4545

46-
private Thread LaunchConnectionMonitorThread()
47-
{
48-
var thread = new Thread(this.ConnectionMonitor);
49-
thread.Start();
50-
return thread;
51-
}
46+
private Task LaunchConnectionMonitorThreadAsync(CancellationToken cancellationToken) =>
47+
Task.Run(() => this.ConnectionMonitorAsync(cancellationToken), cancellationToken);
5248

5349
/// <summary>
5450
/// Asynchronous background task that monitors the connection state and sends PingReq packets when
5551
/// necessary.
5652
/// </summary>
57-
private void ConnectionMonitor()
53+
private async Task ConnectionMonitorAsync(CancellationToken cancellationToken)
5854
{
5955
Logger.Trace($"{this.Client.Options.ClientId}-(CM)- Starting...{this.State}");
6056
if (this.Client.Options.KeepAlive == 0)
@@ -65,7 +61,7 @@ private void ConnectionMonitor()
6561
var keepAlivePeriod = this.Client.Options.KeepAlive;
6662
this.lastCommunicationTimer.Start();
6763

68-
while (true)
64+
while (!cancellationToken.IsCancellationRequested)
6965
{
7066
try
7167
{
@@ -97,7 +93,11 @@ private void ConnectionMonitor()
9793
this.RunTaskHealthCheck(this.ReceivedPacketsHandlerTask, "ReceivedPacketsHandler");
9894

9995
// Sleep cycle
100-
Thread.Sleep(2000);
96+
await Task.Delay(2000, cancellationToken).ConfigureAwait(false);
97+
}
98+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
99+
{
100+
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- Stopped by cancellation token");
101101
}
102102
catch (Exception ex)
103103
{

Tests/HiveMQtt.Test/HiveMQClient/ClientTest.cs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public async Task ClientStateAsync()
8181
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ConnectionWriterTask.Status);
8282
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ConnectionReaderTask.Status);
8383
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ReceivedPacketsHandlerTask.Status);
84-
Assert.True(client.Connection.ConnectionMonitorThread.IsAlive);
84+
Assert.False(client.Connection.ConnectionMonitorThread.IsCompleted);
8585

8686
// Queues
8787
Assert.NotNull(client.Connection.SendQueue);
@@ -118,7 +118,7 @@ public async Task ClientStateAsync()
118118
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ConnectionWriterTask.Status);
119119
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ConnectionReaderTask.Status);
120120
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ReceivedPacketsHandlerTask.Status);
121-
Assert.True(client.Connection.ConnectionMonitorThread.IsAlive);
121+
Assert.False(client.Connection.ConnectionMonitorThread.IsCompleted);
122122

123123
// Queues
124124
Assert.NotNull(client.Connection.SendQueue);
@@ -136,7 +136,10 @@ public async Task ClientStateAsync()
136136
Assert.Null(client.Connection.ConnectionWriterTask);
137137
Assert.Null(client.Connection.ConnectionReaderTask);
138138
Assert.Null(client.Connection.ReceivedPacketsHandlerTask);
139-
Assert.True(client.Connection.ConnectionMonitorThread.IsAlive);
139+
140+
// The task should be completed and null at this point since with every new call to ConnectAsync
141+
// a new task is started and during DisconnectAsync this task should be stopped and removed
142+
Assert.Null(client.Connection.ConnectionMonitorThread);
140143

141144
// Queues
142145
Assert.NotNull(client.Connection.SendQueue);
@@ -145,4 +148,23 @@ public async Task ClientStateAsync()
145148
// State
146149
Assert.Equal(ConnectState.Disconnected, client.Connection.State);
147150
}
151+
152+
[Fact]
153+
public async Task AfterDisconnect_ConnectionMonitorThread_ShouldBe_StoppedAsync()
154+
{
155+
using var client = new HiveMQClient();
156+
await client.ConnectAsync().ConfigureAwait(false);
157+
158+
// Hold the reference to the task since it's removed in the client
159+
// after DisconnectAsync
160+
var monitorThread = client.Connection.ConnectionMonitorThread;
161+
162+
Assert.True(monitorThread is not null && !monitorThread.IsCompleted);
163+
164+
await client.DisconnectAsync().ConfigureAwait(false);
165+
166+
// Task should be completed at this point. During DisconnectAsync the cancellation token
167+
// should be cancelled and the task should stop
168+
Assert.True(monitorThread is not null && monitorThread.IsCompleted);
169+
}
148170
}

0 commit comments

Comments
 (0)