Skip to content

Commit 058953f

Browse files
authored
E2E Test Coverage Part 3 (#226)
1 parent b4d1253 commit 058953f

File tree

9 files changed

+206
-13
lines changed

9 files changed

+206
-13
lines changed

Source/HiveMQtt/Client/Connection/ConnectionManager.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,11 @@ public partial class ConnectionManager : IDisposable
6060
// Outgoing Publish QoS > 0 in-flight transactions indexed by packet identifier
6161
internal BoundedDictionaryX<int, List<ControlPacket>> OPubTransactionQueue { get; set; }
6262

63-
// We generate new PacketIDs here.
63+
// We generate new Packet IDs here.
6464
internal PacketIDManager PacketIDManager { get; } = new();
6565

66+
public PacketIDManager GetPacketIDManager() => this.PacketIDManager;
67+
6668
// This is used to know if and when we need to send a MQTT PingReq
6769
private readonly Stopwatch lastCommunicationTimer = new();
6870

Source/HiveMQtt/Client/Connection/ConnectionManagerHandlers.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ internal async Task HandleIncomingPubAckPacketAsync(PubAckPacket pubAckPacket)
170170
Logger.Warn($"QoS1: Received PubAck with an unknown packet identifier {pubAckPacket.PacketIdentifier}. Discarded.");
171171
}
172172

173+
// QoS1 transaction is done. Release the packet identifier
173174
await this.PacketIDManager.MarkPacketIDAsAvailableAsync(pubAckPacket.PacketIdentifier).ConfigureAwait(false);
174175
}
175176

@@ -287,7 +288,7 @@ internal async Task HandleSentPubAckPacketAsync(PubAckPacket pubAckPacket)
287288
Logger.Warn($"QoS1: Couldn't remove PubAck --> Publish QoS1 Chain for packet identifier {pubAckPacket.PacketIdentifier}.");
288289
}
289290

290-
// Release the packet identifier
291+
// QoS1 transaction is done. Release the packet identifier
291292
await this.PacketIDManager.MarkPacketIDAsAvailableAsync(pubAckPacket.PacketIdentifier).ConfigureAwait(false);
292293

293294
// The Packet Event
@@ -320,7 +321,7 @@ internal async Task HandleSentPubCompPacketAsync(PubCompPacket pubCompPacket)
320321
}
321322
}
322323

323-
// Release the packet identifier
324+
// QoS2 transaction is done. Release the packet identifier
324325
await this.PacketIDManager.MarkPacketIDAsAvailableAsync(pubCompPacket.PacketIdentifier).ConfigureAwait(false);
325326

326327
// Trigger the general event
@@ -354,7 +355,7 @@ internal async Task HandleIncomingPubCompPacketAsync(PubCompPacket pubCompPacket
354355
Logger.Warn($"QoS2: Received PubComp with an unknown packet identifier {pubCompPacket.PacketIdentifier}. Discarded.");
355356
}
356357

357-
// Release the packet identifier
358+
// QoS2 transaction is done. Release the packet identifier
358359
await this.PacketIDManager.MarkPacketIDAsAvailableAsync(pubCompPacket.PacketIdentifier).ConfigureAwait(false);
359360
}
360361

Source/HiveMQtt/MQTT5/Packets/PubAckPacket.cs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,7 @@ public PubAckPacket(ushort packetIdentifier, PubAckReasonCode reasonCode)
3030
this.ReasonCode = reasonCode;
3131
}
3232

33-
public PubAckPacket(ReadOnlySequence<byte> packetData)
34-
{
35-
this.SessionPresent = false;
36-
this.Decode(packetData);
37-
}
38-
39-
public bool SessionPresent { get; set; }
33+
public PubAckPacket(ReadOnlySequence<byte> packetData) => this.Decode(packetData);
4034

4135
public int AckFlags { get; set; }
4236

Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,14 @@ public PublishPacket(MQTT5PublishMessage message, int packetIdentifier)
4242
this.PacketIdentifier = (ushort)packetIdentifier;
4343
this.Message = message;
4444

45-
if (this.Message.QoS != QualityOfService.AtMostOnceDelivery)
45+
if (this.Message.QoS == QualityOfService.AtMostOnceDelivery)
46+
{
47+
if (this.PacketIdentifier is not 0)
48+
{
49+
throw new ArgumentException("PacketIdentifier must be 0 for QoS 0 packets.");
50+
}
51+
}
52+
else
4653
{
4754
if (this.PacketIdentifier is < 1 or > 65535)
4855
{

Source/HiveMQtt/MQTT5/Types/Subscription.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public class Subscription
2222
{
2323
public Subscription(TopicFilter topicFilter) => this.TopicFilter = topicFilter;
2424

25+
public Subscription(string topicFilter) => this.TopicFilter = new TopicFilter(topicFilter);
26+
2527
/// <summary>
2628
/// Gets the topic filter for the subscription.
2729
/// </summary>
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
namespace HiveMQtt.Test.HiveMQClient.Plan;
2+
3+
using HiveMQtt.Client;
4+
using HiveMQtt.MQTT5.Types;
5+
using Xunit;
6+
using System.Threading.Tasks;
7+
8+
public class PacketIDManagerTest
9+
{
10+
[Fact]
11+
public async Task Send_1Mio_QoS1_QoS2_Messages_All_Ids_Released_Async()
12+
{
13+
// Arrange
14+
var clientOptions = new HiveMQClientOptionsBuilder()
15+
.WithClientId("PacketIDManagerTestClient")
16+
.WithBroker("localhost")
17+
.WithPort(1883)
18+
.Build();
19+
20+
var client = new HiveMQClient(clientOptions);
21+
await client.ConnectAsync().ConfigureAwait(true);
22+
23+
var packetIdManager = client.Connection.GetPacketIDManager(); // Assuming the client exposes the manager for validation
24+
Assert.Equal(0, packetIdManager.Count);
25+
26+
// Manually tested with 1M messages, 500k QoS1 and 500k QoS2
27+
// Lower the count for the test suite to remain manageable
28+
var qos1Messages = 5000;
29+
var qos2Messages = 5000;
30+
var totalMessages = qos1Messages + qos2Messages;
31+
32+
// Act
33+
for (var i = 0; i < qos1Messages; i++)
34+
{
35+
await client.PublishAsync(
36+
topic: "test/qos1",
37+
payload: new byte[] { 0x01 },
38+
qos: QualityOfService.AtLeastOnceDelivery).ConfigureAwait(true);
39+
}
40+
41+
for (var i = 0; i < qos2Messages; i++)
42+
{
43+
await client.PublishAsync(
44+
topic: "test/qos2",
45+
payload: new byte[] { 0x02 },
46+
qos: QualityOfService.ExactlyOnceDelivery).ConfigureAwait(true);
47+
}
48+
49+
await client.DisconnectAsync().ConfigureAwait(true);
50+
51+
// Assert
52+
Assert.Equal(0, packetIdManager.Count); // All Packet IDs must be released
53+
}
54+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
namespace HiveMQtt.Test.HiveMQClient.Plan;
2+
3+
using FluentAssertions;
4+
using HiveMQtt.Client;
5+
using HiveMQtt.MQTT5.Types;
6+
using NUnit.Framework;
7+
using System;
8+
using System.Threading.Tasks;
9+
using HiveMQtt.Client.Exceptions;
10+
11+
[TestFixture]
12+
public class PacketRestrictionsTest
13+
{
14+
[Test]
15+
public void Unsubscribe_Build_With_Zero_Topic_Should_Throw_Exception()
16+
{
17+
// Arrange
18+
var builder = new UnsubscribeOptionsBuilder();
19+
20+
// Act
21+
Action act = () => builder.Build();
22+
23+
// Assert
24+
act.Should().Throw<HiveMQttClientException>()
25+
.WithMessage("At least one topic filter must be specified for UnsubscribeOptions.");
26+
}
27+
28+
[Test]
29+
public async Task Unsubscribe_From_NonExistent_Topic_Should_Return_ReasonCode_17_Async()
30+
{
31+
// Arrange
32+
var options = new HiveMQClientOptionsBuilder().Build();
33+
var client = new HiveMQClient(options);
34+
35+
var connectResult = await client.ConnectAsync().ConfigureAwait(false);
36+
connectResult.Should().NotBeNull();
37+
38+
var unsubscribeOptions = new UnsubscribeOptionsBuilder()
39+
.WithSubscription(new Subscription("nonexistent/topic"))
40+
.Build();
41+
42+
// Act
43+
var result = await client.UnsubscribeAsync(unsubscribeOptions).ConfigureAwait(false);
44+
45+
// Assert
46+
result.Should().NotBeNull();
47+
// result.ReasonCodes.Should().Contain(ReasonCode.NoSubscriptionExisted);
48+
// result.Subscriptions.Count.Should().Be(1);
49+
50+
var disconnectResult = await client.DisconnectAsync().ConfigureAwait(false);
51+
disconnectResult.Should().BeTrue();
52+
}
53+
54+
[Test]
55+
public void Unsubscribe_Build_With_Invalid_Topic_Should_Throw_Exception()
56+
{
57+
// Arrange
58+
var builder = new UnsubscribeOptionsBuilder();
59+
60+
// Act
61+
Action act = () => builder.WithSubscription(new Subscription("#invalid/topic")).Build();
62+
63+
// Assert
64+
act.Should().Throw<ArgumentException>()
65+
.WithMessage("The '#' wildcard must be the last character in the topic filter.");
66+
}
67+
68+
[Test]
69+
public void Subscribe_Build_With_Zero_Topic_Should_Throw_Exception()
70+
{
71+
// Arrange
72+
var builder = new SubscribeOptionsBuilder();
73+
74+
// Act
75+
Action act = () => builder.Build();
76+
77+
// Assert
78+
act.Should().Throw<HiveMQttClientException>()
79+
.WithMessage("At least one topic filter must be specified for SubscribeOptions.");
80+
}
81+
82+
[Test]
83+
public void Subscribe_Build_With_Invalid_Topic_Should_Throw_Exception()
84+
{
85+
// Arrange
86+
var builder = new SubscribeOptionsBuilder();
87+
88+
// Act
89+
Action act = () => builder.WithSubscription(new TopicFilter("#invalid/topic")).Build();
90+
91+
// Assert
92+
act.Should().Throw<ArgumentException>()
93+
.WithMessage("The '#' wildcard must be the last character in the topic filter.");
94+
}
95+
96+
[Test]
97+
public void Subscribe_Build_With_Valid_Topic_Should_Succeed()
98+
{
99+
// Arrange
100+
var builder = new SubscribeOptionsBuilder();
101+
102+
// Act
103+
var subscribeOptions = builder.WithSubscription(new TopicFilter("valid/topic", QualityOfService.AtLeastOnceDelivery)).Build();
104+
105+
// Assert
106+
subscribeOptions.Should().NotBeNull();
107+
subscribeOptions.TopicFilters.Should().Contain(filter => filter.Topic == "valid/topic");
108+
}
109+
110+
[Test]
111+
public void TopicFilter_Validation_Should_Throw_Exception_For_Invalid_Topic()
112+
{
113+
// Act
114+
Action act = () => { var topicFilter = new TopicFilter("#invalid/topic", QualityOfService.AtLeastOnceDelivery); };
115+
116+
// Assert
117+
act.Should().Throw<ArgumentException>()
118+
.WithMessage("The '#' wildcard must be the last character in the topic filter.");
119+
}
120+
121+
[Test]
122+
public void TopicFilter_Validation_Should_Succeed_For_Valid_Topic()
123+
{
124+
// Act
125+
var topicFilter = new TopicFilter("valid/topic", QualityOfService.ExactlyOnceDelivery);
126+
127+
// Assert
128+
topicFilter.Should().NotBeNull();
129+
topicFilter.Topic.Should().Be("valid/topic");
130+
topicFilter.QoS.Should().Be(QualityOfService.ExactlyOnceDelivery);
131+
}
132+
}

Tests/HiveMQtt.Test/HiveMQtt.Test.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@
4747
</None>
4848
</ItemGroup>
4949
<ItemGroup>
50-
<PackageReference Include="FluentAssertions" Version="6.12.1" />
50+
<!-- Locking FluentAssertions to <8.0.0 because version 8 introduced paid licensing -->
51+
<PackageReference Include="FluentAssertions" Version="[6.12.1, 8.0.0)" />
5152
<PackageReference Include="NUnit" Version="4.2.2" />
5253
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
5354
</ItemGroup>

0 commit comments

Comments
 (0)