Skip to content

fix: lock subscriptions #247

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 52 additions & 8 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
/// <inheritdoc />
public List<Subscription> Subscriptions { get; } = new();

private SemaphoreSlim SubscriptionsSemaphore { get; } = new(1, 1);

/// <inheritdoc />
public bool IsConnected() => this.Connection.State == ConnectState.Connected;

Expand Down Expand Up @@ -479,8 +481,16 @@
subscription.MessageReceivedHandler = handler.Value;
}
}
}

this.Subscriptions.Add(subscription);
try
{
await this.SubscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
this.Subscriptions.AddRange(subscribeResult.Subscriptions);
}
finally
{
_ = this.SubscriptionsSemaphore.Release();
}

// Fire the corresponding event
Expand Down Expand Up @@ -508,9 +518,17 @@
/// <inheritdoc />
public async Task<UnsubscribeResult> UnsubscribeAsync(Subscription subscription)
{
if (!this.Subscriptions.Contains(subscription))
try
{
throw new HiveMQttClientException("No such subscription found. Make sure to take subscription(s) from HiveMQClient.Subscriptions[] or HiveMQClient.GetSubscriptionByTopic().");
await this.SubscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
if (!this.Subscriptions.Contains(subscription))
{
throw new HiveMQttClientException("No such subscription found. Make sure to take subscription(s) from HiveMQClient.Subscriptions[] or HiveMQClient.GetSubscriptionByTopic().");
}
}
finally
{
_ = this.SubscriptionsSemaphore.Release();
}

var unsubOptions = new UnsubscribeOptionsBuilder()
Expand All @@ -523,11 +541,22 @@
/// <inheritdoc />
public async Task<UnsubscribeResult> UnsubscribeAsync(List<Subscription> subscriptions)
{
for (var i = 0; i < subscriptions.Count; i++)
HashSet<Subscription> currentSubscriptions;
try
{
if (!this.Subscriptions.Contains(subscriptions[i]))
await this.SubscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
currentSubscriptions = this.Subscriptions.ToHashSet();

Check warning on line 548 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0305)

Check warning on line 548 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0305)

Check warning on line 548 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-9.0.x

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0305)
}
finally
{
_ = this.SubscriptionsSemaphore.Release();
}

foreach (var sub in subscriptions)
{
if (!currentSubscriptions.Contains(sub))
{
throw new HiveMQttClientException("No such subscription found. Make sure to take subscription(s) from HiveMQClient.Subscriptions[] or HiveMQClient.GetSubscriptionByTopic().");
throw new HiveMQttClientException("No such subscription found. Make sure to take subscription(s) from HiveMQClient.Subscriptions[] or HiveMQClient.GetSubscriptionByTopic().");
}
}

Expand Down Expand Up @@ -589,13 +618,28 @@
};

var counter = 0;
var subscriptionsToRemove = new List<Subscription>();
foreach (var reasonCode in unsubAck.ReasonCodes)
{
unsubscribeResult.Subscriptions[counter].UnsubscribeReasonCode = reasonCode;
if (reasonCode == UnsubAckReasonCode.Success)
{
// Remove the subscription from the client
this.Subscriptions.Remove(unsubscribeResult.Subscriptions[counter]);
// Collect subscriptions which need to be removed
subscriptionsToRemove.Add(unsubscribeResult.Subscriptions[counter]);

Check warning on line 628 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Fix formatting (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0055)

Check warning on line 628 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Fix formatting (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0055)

Check warning on line 628 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-9.0.x

Fix formatting (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0055)

Check warning on line 628 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-9.0.x

Fix formatting (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0055)

Check warning on line 628 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Fix formatting (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0055)

Check warning on line 628 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Fix formatting (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0055)

Check warning on line 628 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Fix formatting (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0055)

Check warning on line 628 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Fix formatting (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0055)

Check warning on line 628 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Fix formatting

Check warning on line 628 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Fix formatting
}
}

if (subscriptionsToRemove.Count != 0)
{
try
{
// remove subscriptions from client while locking them
await this.SubscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
_ = this.Subscriptions.RemoveAll(subscriptionsToRemove.Contains);
}
finally
{
_ = this.SubscriptionsSemaphore.Release();
}
}

Expand Down
50 changes: 35 additions & 15 deletions Source/HiveMQtt/Client/HiveMQClientEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -266,26 +266,46 @@
messageHandled = true;
}

if (packet.Message.Topic is null)
{
return;
}

// Per Subscription Event Handler
foreach (var subscription in this.Subscriptions)
// use ToList, so the iteration goes through a copy and changes at the list make not problems
// otherwise it would be necessary to lock the Subscriptions with the semaphore of HiveMQClient
List<Subscription> tempList;
try
{
this.SubscriptionsSemaphore.Wait();
tempList = this.Subscriptions.ToList();

Check warning on line 281 in Source/HiveMQtt/Client/HiveMQClientEvents.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0305)

Check warning on line 281 in Source/HiveMQtt/Client/HiveMQClientEvents.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0305)

Check warning on line 281 in Source/HiveMQtt/Client/HiveMQClientEvents.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-9.0.x

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0305)
}
finally
{
_ = this.SubscriptionsSemaphore.Release();
}

var matchingSubscriptions = tempList.Where(sub =>
sub.MessageReceivedHandler is not null &&
MatchTopic(sub.TopicFilter.Topic, packet.Message.Topic));

foreach (var subscription in matchingSubscriptions)
{
if (packet.Message.Topic != null && MatchTopic(subscription.TopicFilter.Topic, packet.Message.Topic))
// We have a per-subscription message handler.
_ = Task.Run(() =>
{
if (subscription.MessageReceivedHandler != null && subscription.MessageReceivedHandler.GetInvocationList().Length > 0)
try
{
// We have a per-subscription message handler.
_ = Task.Run(() => subscription.MessageReceivedHandler?.Invoke(this, eventArgs)).ContinueWith(
t =>
{
if (t.IsFaulted)
{
Logger.Error($"per-subscription MessageReceivedEventLauncher faulted ({packet.Message.Topic}): {t.Exception?.Message}");
}
}, TaskScheduler.Default);

messageHandled = true;
subscription.MessageReceivedHandler?.Invoke(this, eventArgs);
}
}
catch (Exception e)
{
Logger.Error(
$"per-subscription MessageReceivedEventLauncher faulted ({packet.Message.Topic}): {e.Message}");
}
});

messageHandled = true;
}

if (!messageHandled)
Expand Down
31 changes: 16 additions & 15 deletions Source/HiveMQtt/Client/HiveMQClientUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,18 @@
/// <returns>A boolean indicating whether the subscription exists.</returns>
internal bool SubscriptionExists(Subscription subscription)
{
if (this.Subscriptions.Contains(subscription))
List<Subscription> tempList;
try
{
return true;
this.SubscriptionsSemaphore.Wait();
tempList = this.Subscriptions.ToList();

Check warning on line 38 in Source/HiveMQtt/Client/HiveMQClientUtil.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0305)

Check warning on line 38 in Source/HiveMQtt/Client/HiveMQClientUtil.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0305)

Check warning on line 38 in Source/HiveMQtt/Client/HiveMQClientUtil.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-9.0.x

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0305)
}

foreach (var candidate in this.Subscriptions)
finally
{
if (candidate.TopicFilter.Topic == subscription.TopicFilter.Topic)
{
return true;
}
_ = this.SubscriptionsSemaphore.Release();
}

return false;
return tempList.Any(s => s.TopicFilter.Topic == subscription.TopicFilter.Topic);
}

/// <summary>
Expand All @@ -54,15 +52,18 @@
/// <returns>The subscription or null if not found.</returns>
internal Subscription? GetSubscriptionByTopic(string topic)
{
foreach (var subscription in this.Subscriptions)
List<Subscription> tempList;
try
{
if (subscription.TopicFilter.Topic == topic)
{
return subscription;
}
this.SubscriptionsSemaphore.Wait();
tempList = this.Subscriptions.ToList();

Check warning on line 59 in Source/HiveMQtt/Client/HiveMQClientUtil.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0305)

Check warning on line 59 in Source/HiveMQtt/Client/HiveMQClientUtil.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-9.0.x

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0305)
}
finally
{
_ = this.SubscriptionsSemaphore.Release();
}

return null;
return tempList.FirstOrDefault(s => s.TopicFilter.Topic == topic);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
namespace HiveMQtt.Test.HiveMQClient;

using System.Collections.Concurrent;
using Client;
using Client.Options;
using MQTT5.ReasonCodes;
using MQTT5.Types;
using Xunit;
using Xunit.Abstractions;

public class ThreadSafeSubscribeUnsubscribeTest
{
private readonly ITestOutputHelper testOutputHelper;

public ThreadSafeSubscribeUnsubscribeTest(ITestOutputHelper testOutputHelper)
{
this.testOutputHelper = testOutputHelper;

Check warning on line 17 in Tests/HiveMQtt.Test/HiveMQClient/ThreadSafeSubscribeUnsubscribeTest.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Use expression body for constructor (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0021)

Check warning on line 17 in Tests/HiveMQtt.Test/HiveMQClient/ThreadSafeSubscribeUnsubscribeTest.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Use expression body for constructor (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0021)

Check warning on line 17 in Tests/HiveMQtt.Test/HiveMQClient/ThreadSafeSubscribeUnsubscribeTest.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Use expression body for constructor (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0021)

Check warning on line 17 in Tests/HiveMQtt.Test/HiveMQClient/ThreadSafeSubscribeUnsubscribeTest.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Use expression body for constructor (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0021)

Check warning on line 17 in Tests/HiveMQtt.Test/HiveMQClient/ThreadSafeSubscribeUnsubscribeTest.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Use expression body for constructors

Check warning on line 17 in Tests/HiveMQtt.Test/HiveMQClient/ThreadSafeSubscribeUnsubscribeTest.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Use expression body for constructors
}

[Fact]
public async Task SubscribeUnsubscribe_InManyThreadsAsync()
{
const int workerCount = 100;
const int iterationsPerWorker = 100;
const int topicsPerIteration = 10;
const int publishesPerIteration = 10;
const int totalExpectedSuccesses = workerCount * iterationsPerWorker;

var options = new HiveMQClientOptionsBuilder().WithClientId("ConcurrentSubscribeUnsubscribeAndPublish").Build();
options.ResponseTimeoutInMs = 20000;
var client = new HiveMQClient(options);
Assert.NotNull(client);

var connectResult = await client.ConnectAsync().ConfigureAwait(false);
Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success);
Assert.True(client.IsConnected());

client.OnMessageReceived += (_, args) => { };
_ = await client.SubscribeAsync("/test/#").ConfigureAwait(false);

var exceptionMessages = new ConcurrentBag<string>();
var successCount = 0;
var tasks = new List<Task>();

foreach (var workerId in Enumerable.Range(0, workerCount))
{
tasks.Add(Task.Run(async () =>
{
for (var i = 0; i < iterationsPerWorker; i++)
{
var topicPrefix = $"/test/topic/{workerId}/{i}";

var topicsToManage = Enumerable.Range(0, topicsPerIteration)
.Select(j => $"{topicPrefix}/{(char)('a' + j)}")
.ToList();

try
{
var topicFilters = topicsToManage.Select(topic => new TopicFilter(topic, QualityOfService.ExactlyOnceDelivery)).ToList();
var subscribeOptions = new SubscribeOptions { TopicFilters = topicFilters };
_ = await client.SubscribeAsync(subscribeOptions).ConfigureAwait(false);

var publishTasks = new List<Task>(publishesPerIteration * 3);
for (var j = 0; j < publishesPerIteration; j++)
{
publishTasks.Add(client.PublishAsync(topicsToManage.First(), "Hello World"));
publishTasks.Add(client.PublishAsync(topicsToManage.Last(), "Hello World"));
publishTasks.Add(client.PublishAsync("/unknown/topic", "Hello World"));
}

await Task.WhenAll(publishTasks).ConfigureAwait(false);

var subscriptions = topicsToManage.Select(topic => new Subscription(new TopicFilter(topic))).ToList();
var unsubscribeOptions = new UnsubscribeOptions { Subscriptions = subscriptions };
_ = await client.UnsubscribeAsync(unsubscribeOptions).ConfigureAwait(false);

_ = Interlocked.Increment(ref successCount);
}
catch (Exception e)
{
var errorMessage = $"Worker {workerId}, Iteration {i}: {e}";
exceptionMessages.Add(errorMessage);
this.testOutputHelper.WriteLine(errorMessage);
}
}
}));
}

await Task.WhenAll(tasks).ConfigureAwait(false);

Assert.Equal(totalExpectedSuccesses, successCount);
Assert.Empty(exceptionMessages);
}
}
Loading