Skip to content

fix: ConnectionMonitor is stopped by cancellation token during discon… #251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Source/HiveMQtt/Client/Connection/ConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
this.ConnectionWriterTask = this.ConnectionWriterAsync(this.cancellationTokenSource.Token);
this.ConnectionReaderTask = this.ConnectionReaderAsync(this.cancellationTokenSource.Token);
this.ReceivedPacketsHandlerTask = this.ReceivedPacketsHandlerAsync(this.cancellationTokenSource.Token);
this.ConnectionMonitorThread = this.LaunchConnectionMonitorThread();
this.ConnectionMonitorThread = this.LaunchConnectionMonitorThreadAsync(this.cancellationTokenSource.Token);

return true;
}
Expand All @@ -142,7 +142,7 @@
{
// Don't use CancelAsync here to maintain backwards compatibility
// with >=.net6.0. CancelAsync was introduced in .net8.0
this.cancellationTokenSource.Cancel();

Check warning on line 145 in Source/HiveMQtt/Client/Connection/ConnectionManager.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-9.0.x

Check warning on line 145 in Source/HiveMQtt/Client/Connection/ConnectionManager.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x


// Delay for a short period to allow the tasks to cancel
await Task.Delay(1000).ConfigureAwait(false);
Expand Down Expand Up @@ -184,7 +184,7 @@
Logger.Trace("ReceivedPacketsHandlerTask did not complete in time");
}

if (this.ConnectionMonitorThread is not null && !this.ConnectionMonitorThread.IsAlive)
if (this.ConnectionMonitorThread is not null && this.ConnectionMonitorThread.IsCompleted)
{
this.ConnectionMonitorThread = null;
}
Expand Down
20 changes: 10 additions & 10 deletions Source/HiveMQtt/Client/Connection/ConnectionManagerTasks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public partial class ConnectionManager

internal Task? ReceivedPacketsHandlerTask { get; set; }

internal Thread? ConnectionMonitorThread { get; set; }
internal Task? ConnectionMonitorThread { get; set; }

/// <summary>
/// Health check method to assure that tasks haven't faulted unexpectedly.
Expand All @@ -43,18 +43,14 @@ private void RunTaskHealthCheck(Task? task, string taskName)
}
}

private Thread LaunchConnectionMonitorThread()
{
var thread = new Thread(this.ConnectionMonitor);
thread.Start();
return thread;
}
private Task LaunchConnectionMonitorThreadAsync(CancellationToken cancellationToken) =>
Task.Run(() => this.ConnectionMonitorAsync(cancellationToken), cancellationToken);

/// <summary>
/// Asynchronous background task that monitors the connection state and sends PingReq packets when
/// necessary.
/// </summary>
private void ConnectionMonitor()
private async Task ConnectionMonitorAsync(CancellationToken cancellationToken)
{
Logger.Trace($"{this.Client.Options.ClientId}-(CM)- Starting...{this.State}");
if (this.Client.Options.KeepAlive == 0)
Expand All @@ -65,7 +61,7 @@ private void ConnectionMonitor()
var keepAlivePeriod = this.Client.Options.KeepAlive;
this.lastCommunicationTimer.Start();

while (true)
while (!cancellationToken.IsCancellationRequested)
{
try
{
Expand Down Expand Up @@ -97,7 +93,11 @@ private void ConnectionMonitor()
this.RunTaskHealthCheck(this.ReceivedPacketsHandlerTask, "ReceivedPacketsHandler");

// Sleep cycle
Thread.Sleep(2000);
await Task.Delay(2000, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
Logger.Debug($"{this.Client.Options.ClientId}-(CM)- Stopped by cancellation token");
}
catch (Exception ex)
{
Expand Down
28 changes: 25 additions & 3 deletions Tests/HiveMQtt.Test/HiveMQClient/ClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public async Task ClientStateAsync()
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ConnectionWriterTask.Status);
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ConnectionReaderTask.Status);
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ReceivedPacketsHandlerTask.Status);
Assert.True(client.Connection.ConnectionMonitorThread.IsAlive);
Assert.False(client.Connection.ConnectionMonitorThread.IsCompleted);

// Queues
Assert.NotNull(client.Connection.SendQueue);
Expand Down Expand Up @@ -118,7 +118,7 @@ public async Task ClientStateAsync()
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ConnectionWriterTask.Status);
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ConnectionReaderTask.Status);
Assert.Equal(TaskStatus.WaitingForActivation, client.Connection.ReceivedPacketsHandlerTask.Status);
Assert.True(client.Connection.ConnectionMonitorThread.IsAlive);
Assert.False(client.Connection.ConnectionMonitorThread.IsCompleted);

// Queues
Assert.NotNull(client.Connection.SendQueue);
Expand All @@ -136,7 +136,10 @@ public async Task ClientStateAsync()
Assert.Null(client.Connection.ConnectionWriterTask);
Assert.Null(client.Connection.ConnectionReaderTask);
Assert.Null(client.Connection.ReceivedPacketsHandlerTask);
Assert.True(client.Connection.ConnectionMonitorThread.IsAlive);

// The task should be completed and null at this point since with every new call to ConnectAsync
// a new task is started and during DisconnectAsync this task should be stopped and removed
Assert.Null(client.Connection.ConnectionMonitorThread);

// Queues
Assert.NotNull(client.Connection.SendQueue);
Expand All @@ -145,4 +148,23 @@ public async Task ClientStateAsync()
// State
Assert.Equal(ConnectState.Disconnected, client.Connection.State);
}

[Fact]
public async Task AfterDisconnect_ConnectionMonitorThread_ShouldBe_StoppedAsync()
{
using var client = new HiveMQClient();
await client.ConnectAsync().ConfigureAwait(false);

// Hold the reference to the task since it's removed in the client
// after DisconnectAsync
var monitorThread = client.Connection.ConnectionMonitorThread;

Assert.True(monitorThread is not null && !monitorThread.IsCompleted);

await client.DisconnectAsync().ConfigureAwait(false);

// Task should be completed at this point. During DisconnectAsync the cancellation token
// should be cancelled and the task should stop
Assert.True(monitorThread is not null && monitorThread.IsCompleted);
}
}
Loading