diff --git a/Source/HiveMQtt/Client/HiveMQClient.cs b/Source/HiveMQtt/Client/HiveMQClient.cs index 426d0e1..ceee2ed 100644 --- a/Source/HiveMQtt/Client/HiveMQClient.cs +++ b/Source/HiveMQtt/Client/HiveMQClient.cs @@ -234,9 +234,14 @@ public async Task PublishAsync(MQTT5PublishMessage message, Cance throw new HiveMQttClientException("Retained messages are not supported by the broker"); } - if (message.QoS.HasValue && this.Connection.ConnectionProperties.MaximumQoS.HasValue && + if (message.QoS.HasValue && this.Connection?.ConnectionProperties?.MaximumQoS.HasValue == true && (ushort)message.QoS.Value > this.Connection.ConnectionProperties.MaximumQoS.Value) { + if (this.Connection == null) + { + throw new HiveMQttClientException("Connection is not available"); + } + Logger.Debug($"Reducing message QoS from {message.QoS} to broker enforced maximum of {this.Connection.ConnectionProperties.MaximumQoS}"); message.QoS = (QualityOfService)this.Connection.ConnectionProperties.MaximumQoS.Value; } @@ -247,12 +252,17 @@ public async Task PublishAsync(MQTT5PublishMessage message, Cance var publishPacket = new PublishPacket(message, 0); Logger.Trace($"Queuing QoS 0 publish packet for send: {publishPacket.GetType().Name}"); - this.Connection.OutgoingPublishQueue.Enqueue(publishPacket); + this.Connection?.OutgoingPublishQueue.Enqueue(publishPacket); return new PublishResult(publishPacket.Message); } else if (message.QoS == QualityOfService.AtLeastOnceDelivery) { // QoS 1: Acknowledged Delivery + if (this.Connection == null) + { + throw new HiveMQttClientException("Connection is not available"); + } + var packetIdentifier = await this.Connection.PacketIDManager.GetAvailablePacketIDAsync().ConfigureAwait(false); var publishPacket = new PublishPacket(message, (ushort)packetIdentifier); PubAckPacket pubAckPacket; @@ -278,6 +288,11 @@ public async Task PublishAsync(MQTT5PublishMessage message, Cance else if (message.QoS == QualityOfService.ExactlyOnceDelivery) { // QoS 2: Assured Delivery + if (this.Connection == null) + { + throw new HiveMQttClientException("Connection is not available"); + } + var packetIdentifier = await this.Connection.PacketIDManager.GetAvailablePacketIDAsync().ConfigureAwait(false); var publishPacket = new PublishPacket(message, (ushort)packetIdentifier); var publishResult = new PublishResult(publishPacket.Message); @@ -406,6 +421,11 @@ public async Task SubscribeAsync(SubscribeOptions options) // Fire the corresponding event this.BeforeSubscribeEventLauncher(options); + if (this.Connection == null) + { + throw new HiveMQttClientException("Connection is not available"); + } + // FIXME: We should only ever have one subscribe in flight at any time (for now) // Construct the MQTT Subscribe packet var packetIdentifier = await this.Connection.PacketIDManager.GetAvailablePacketIDAsync().ConfigureAwait(false); diff --git a/Source/HiveMQtt/Client/HiveMQClientEvents.cs b/Source/HiveMQtt/Client/HiveMQClientEvents.cs index 894775c..d854457 100644 --- a/Source/HiveMQtt/Client/HiveMQClientEvents.cs +++ b/Source/HiveMQtt/Client/HiveMQClientEvents.cs @@ -262,6 +262,7 @@ internal virtual void OnMessageReceivedEventLauncher(PublishPacket packet) } }, TaskScheduler.Default); } + messageHandled = true; }