From d4c30df8344247d271c36f31d5faad5b2b291053 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 7 Apr 2024 23:28:12 +0530 Subject: [PATCH 01/12] fix: v5 doesn't write outgoing packets onto network (#842) * fix: v5 doesn't write outgoing packets onto network https://github.com/bytebeamio/rumqtt/pull/825#issuecomment-2041392647 * same for pingreq --- rumqttc/src/v5/eventloop.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index ab1edb17c..a59094807 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -209,7 +209,9 @@ impl EventLoop { self.options.pending_throttle ), if !self.pending.is_empty() || (!inflight_full && !collision) => match o { Ok(request) => { - self.state.handle_outgoing_packet(request)?; + if let Some(outgoing) = self.state.handle_outgoing_packet(request)? { + network.write(outgoing).await?; + } network.flush().await?; Ok(self.state.events.pop_front().unwrap()) } @@ -228,7 +230,9 @@ impl EventLoop { let timeout = self.keepalive_timeout.as_mut().unwrap(); timeout.as_mut().reset(Instant::now() + self.options.keep_alive); - self.state.handle_outgoing_packet(Request::PingReq)?; + if let Some(outgoing) = self.state.handle_outgoing_packet(Request::PingReq)? { + network.write(outgoing).await?; + } network.flush().await?; Ok(self.state.events.pop_front().unwrap()) } From f72acc8b1aeb3a0dbe39c0799bfeb9d5058a0abc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 9 Apr 2024 22:47:43 +0530 Subject: [PATCH 02/12] chore(deps): bump h2 from 0.4.2 to 0.4.4 (#839) --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b7396d431..75a7ced4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -847,9 +847,9 @@ checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "h2" -version = "0.4.2" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +checksum = "816ec7294445779408f36fe57bc5b7fc1cf59664059096c65f905c1c61f58069" dependencies = [ "bytes", "fnv", From e191890341b3e4229015436151988e4725670152 Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Thu, 25 Apr 2024 05:39:38 +0000 Subject: [PATCH 03/12] Add error code for V5 PubAck/Rec/Comp, improve Notify flow --- rumqttc/src/lib.rs | 10 ++++++++- rumqttc/src/v5/state.rs | 47 +++++++++++++++++++++++++++++------------ 2 files changed, 42 insertions(+), 15 deletions(-) diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index a1fe6a5c7..b5a391163 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -156,7 +156,9 @@ pub use proxy::{Proxy, ProxyAuth, ProxyType}; pub type Incoming = Packet; -use v5::mqttbytes::v5::{SubscribeReasonCode as V5SubscribeReasonCode, UnsubAckReason}; +use v5::mqttbytes::v5::{SubscribeReasonCode as V5SubscribeReasonCode, + UnsubAckReason, + PubAckReason, PubRecReason, PubCompReason}; #[derive(Debug, thiserror::Error)] pub enum NoticeError { @@ -168,6 +170,12 @@ pub enum NoticeError { V5Subscribe(V5SubscribeReasonCode), #[error(" v5 Unsubscription Failure Reason: {0:?}")] V5Unsubscribe(UnsubAckReason), + #[error(" v5 Publish Ack Failure Reason Code: {0:?}")] + V5PubAck(PubAckReason), + #[error(" v5 Publish Rec Failure Reason Code: {0:?}")] + V5PubRec(PubRecReason), + #[error(" v5 Publish Comp Failure Reason Code: {0:?}")] + V5PubComp(PubCompReason), } impl From for NoticeError { diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 63c88c312..d4f9128c3 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -269,7 +269,7 @@ impl MqttState { } _ => { if let Some(tx) = tx { - tx.error(NoticeError::V5Subscribe(*reason)) + tx.error(NoticeError::V5Subscribe(*reason)); } return Err(StateError::SubFail { reason: *reason }); @@ -278,7 +278,7 @@ impl MqttState { } if let Some(tx) = tx { - tx.success() + tx.success(); } Ok(None) @@ -301,7 +301,7 @@ impl MqttState { for reason in unsuback.reasons.iter() { if reason != &UnsubAckReason::Success { if let Some(tx) = tx { - tx.error(NoticeError::V5Unsubscribe(*reason)) + tx.error(NoticeError::V5Unsubscribe(*reason)); } return Err(StateError::UnsubFail { reason: *reason }); @@ -309,7 +309,7 @@ impl MqttState { } if let Some(tx) = tx { - tx.success() + tx.success(); } Ok(None) @@ -409,23 +409,27 @@ impl MqttState { return Err(StateError::Unsolicited(puback.pkid)); } - if let (_, Some(tx)) = self + let (_, tx) = self .outgoing_pub .remove(&puback.pkid) - .ok_or(StateError::Unsolicited(puback.pkid))? - { - tx.success() - } + .ok_or(StateError::Unsolicited(puback.pkid))?; self.inflight -= 1; if puback.reason != PubAckReason::Success && puback.reason != PubAckReason::NoMatchingSubscribers { + if let Some(tx) = tx { + tx.error(NoticeError::V5PubAck(puback.reason)); + } return Err(StateError::PubAckFail { reason: puback.reason, }); } + if let Some(tx) = tx + { + tx.success(); + } let packet = self.check_collision(puback.pkid).map(|(publish, tx)| { self.outgoing_pub @@ -456,6 +460,9 @@ impl MqttState { if pubrec.reason != PubRecReason::Success && pubrec.reason != PubRecReason::NoMatchingSubscribers { + if let Some(tx) = tx { + tx.error(NoticeError::V5PubRec(pubrec.reason)); + } return Err(StateError::PubRecFail { reason: pubrec.reason, }); @@ -497,15 +504,27 @@ impl MqttState { error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid); return Err(StateError::Unsolicited(pubcomp.pkid)); } - if let Some(tx) = self + + let tx = self .outgoing_rel .remove(&pubcomp.pkid) - .ok_or(StateError::Unsolicited(pubcomp.pkid))? + .ok_or(StateError::Unsolicited(pubcomp.pkid))?; + + self.inflight -= 1; + + if pubcomp.reason != PubCompReason::Success { - tx.success() + if let Some(tx) = tx { + tx.error(NoticeError::V5PubComp(pubcomp.reason)); + } + return Err(StateError::PubCompFail { + reason: pubcomp.reason, + }); + } + if let Some(tx) = tx { + tx.success(); } - self.inflight -= 1; let packet = self.check_collision(pubcomp.pkid).map(|(publish, tx)| { self.outgoing_pub .insert(pubcomp.pkid, (publish.clone(), tx)); @@ -551,7 +570,7 @@ impl MqttState { self.outgoing_pub.insert(pkid, (publish.clone(), notice_tx)); self.inflight += 1; } else if let Some(tx) = notice_tx { - tx.success() + tx.success(); } debug!( From 4cb80e1484b25372444a4b52f5c52b1599a00b59 Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Thu, 25 Apr 2024 07:06:33 +0000 Subject: [PATCH 04/12] Remove pkid things, modify change log. --- rumqttc/CHANGELOG.md | 2 +- rumqttc/examples/pkid_promise.rs | 70 ----------------------------- rumqttc/examples/pkid_promise_v5.rs | 70 ----------------------------- 3 files changed, 1 insertion(+), 141 deletions(-) delete mode 100644 rumqttc/examples/pkid_promise.rs delete mode 100644 rumqttc/examples/pkid_promise_v5.rs diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index 254bd31a4..35c7014e3 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -56,7 +56,7 @@ To update your code simply remove `Key::ECC()` or `Key::RSA()` from the initiali `rusttls-pemfile` to `2.0.0`, `async-tungstenite` to `0.24.0`, `ws_stream_tungstenite` to `0.12.0` and `http` to `1.0.0`. This is a breaking change as types from some of these crates are part of the public API. -- `publish` / `subscribe` / `unsubscribe` methods on `AsyncClient` and `Client` now return a `PkidPromise` which resolves into the identifier value chosen by the `EventLoop` when handling the packet. +- `publish` / `subscribe` / `unsubscribe` methods on `AsyncClient` and `Client` now return a `NoticeFuture` which is noticed after the packet is released (sent in QoS0, ACKed in QoS1, COMPed in QoS2). ### Deprecated diff --git a/rumqttc/examples/pkid_promise.rs b/rumqttc/examples/pkid_promise.rs deleted file mode 100644 index 0fefd093e..000000000 --- a/rumqttc/examples/pkid_promise.rs +++ /dev/null @@ -1,70 +0,0 @@ -use futures_util::stream::StreamExt; -use tokio::{ - select, - task::{self, JoinSet}, -}; -use tokio_util::time::DelayQueue; - -use rumqttc::{AsyncClient, MqttOptions, QoS}; -use std::error::Error; -use std::time::Duration; - -#[tokio::main(flavor = "current_thread")] -async fn main() -> Result<(), Box> { - pretty_env_logger::init(); - // color_backtrace::install(); - - let mut mqttoptions = MqttOptions::new("test-1", "broker.emqx.io", 1883); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - - let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); - task::spawn(async move { - requests(client).await; - }); - - loop { - let event = eventloop.poll().await; - match &event { - Ok(v) => { - println!("Event = {v:?}"); - } - Err(e) => { - println!("Error = {e:?}"); - return Ok(()); - } - } - } -} - -async fn requests(client: AsyncClient) { - let mut joins = JoinSet::new(); - joins.spawn( - client - .subscribe("hello/world", QoS::AtMostOnce) - .await - .unwrap() - .wait_async(), - ); - - let mut queue = DelayQueue::new(); - for i in 1..=10 { - queue.insert(i as usize, Duration::from_secs(i)); - } - - loop { - select! { - Some(i) = queue.next() => { - joins.spawn( - client - .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i.into_inner()]) - .await - .unwrap().wait_async(), - ); - } - Some(Ok(Ok(pkid))) = joins.join_next() => { - println!("Pkid: {:?}", pkid); - } - else => break, - } - } -} diff --git a/rumqttc/examples/pkid_promise_v5.rs b/rumqttc/examples/pkid_promise_v5.rs deleted file mode 100644 index e4bd3d31c..000000000 --- a/rumqttc/examples/pkid_promise_v5.rs +++ /dev/null @@ -1,70 +0,0 @@ -use futures_util::stream::StreamExt; -use tokio::{ - select, - task::{self, JoinSet}, -}; -use tokio_util::time::DelayQueue; - -use rumqttc::v5::{mqttbytes::QoS, AsyncClient, MqttOptions}; -use std::error::Error; -use std::time::Duration; - -#[tokio::main(flavor = "current_thread")] -async fn main() -> Result<(), Box> { - pretty_env_logger::init(); - // color_backtrace::install(); - - let mut mqttoptions = MqttOptions::new("test-1", "broker.emqx.io", 1883); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - - let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); - task::spawn(async move { - requests(client).await; - }); - - loop { - let event = eventloop.poll().await; - match &event { - Ok(v) => { - println!("Event = {v:?}"); - } - Err(e) => { - println!("Error = {e:?}"); - return Ok(()); - } - } - } -} - -async fn requests(client: AsyncClient) { - let mut joins = JoinSet::new(); - joins.spawn( - client - .subscribe("hello/world", QoS::AtMostOnce) - .await - .unwrap() - .wait_async(), - ); - - let mut queue = DelayQueue::new(); - for i in 1..=10 { - queue.insert(i as usize, Duration::from_secs(i)); - } - - loop { - select! { - Some(i) = queue.next() => { - joins.spawn( - client - .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i.into_inner()]) - .await - .unwrap().wait_async(), - ); - } - Some(Ok(Ok(pkid))) = joins.join_next() => { - println!("Pkid: {:?}", pkid); - } - else => break, - } - } -} From f46920a96bbc555e028c18723ec57d5a88ca7e89 Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Thu, 23 May 2024 11:51:47 +0800 Subject: [PATCH 05/12] Fix testing issue. --- rumqttc/Cargo.toml | 1 + rumqttc/src/state.rs | 40 ++++++++++++++++++++++++++-------------- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/rumqttc/Cargo.toml b/rumqttc/Cargo.toml index 149e49fd0..bcf0d7752 100644 --- a/rumqttc/Cargo.toml +++ b/rumqttc/Cargo.toml @@ -30,6 +30,7 @@ bytes = "1.5" log = "0.4" flume = { version = "0.11", default-features = false, features = ["async"] } thiserror = "1" +linked-hash-map = "0.5" # Optional # rustls diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index 43bcb7ba0..25ef1a25f 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -2,8 +2,9 @@ use crate::{Event, Incoming, NoticeError, NoticeTx, Outgoing, Request}; use crate::mqttbytes::v4::*; use crate::mqttbytes::{self, *}; -use std::collections::{HashMap, VecDeque}; +use std::collections::VecDeque; use std::{io, time::Instant}; +use linked_hash_map::LinkedHashMap; /// Errors during state handling #[derive(Debug, thiserror::Error)] @@ -62,14 +63,14 @@ pub struct MqttState { /// Maximum number of allowed inflight pub(crate) max_inflight: u16, /// Outgoing QoS 1, 2 publishes which aren't acked yet - pub(crate) outgoing_pub: HashMap)>, + pub(crate) outgoing_pub: LinkedHashMap)>, /// Packet ids of released QoS 2 publishes - pub(crate) outgoing_rel: HashMap>, + pub(crate) outgoing_rel: LinkedHashMap>, /// Packet ids on incoming QoS 2 publishes pub(crate) incoming_pub: Vec>, - outgoing_sub: HashMap>, - outgoing_unsub: HashMap>, + outgoing_sub: LinkedHashMap>, + outgoing_unsub: LinkedHashMap>, /// Last collision due to broker not acking in order pub collision: Option<(Publish, Option)>, @@ -94,11 +95,11 @@ impl MqttState { inflight: 0, max_inflight, // index 0 is wasted as 0 is not a valid packet id - outgoing_pub: HashMap::new(), - outgoing_rel: HashMap::new(), - incoming_pub: vec![None; std::u16::MAX as usize + 1], - outgoing_sub: HashMap::new(), - outgoing_unsub: HashMap::new(), + outgoing_pub: LinkedHashMap::new(), + outgoing_rel: LinkedHashMap::new(), + incoming_pub: vec![None; u16::MAX as usize + 1], + outgoing_sub: LinkedHashMap::new(), + outgoing_unsub: LinkedHashMap::new(), collision: None, // TODO: Optimize these sizes later events: VecDeque::with_capacity(100), @@ -110,10 +111,21 @@ impl MqttState { pub fn clean(&mut self) -> Vec { let mut pending = Vec::with_capacity(100); + let mut second_half = Vec::with_capacity(100); + let mut last_pkid_found = false; for (_, (publish, tx)) in self.outgoing_pub.drain() { + let this_pkid = publish.pkid; let request = Request::Publish(tx, publish); - pending.push(request); + if !last_pkid_found { + second_half.push(request); + if this_pkid == self.last_puback { + last_pkid_found = true; + } + } else { + pending.push(request); + } } + pending.extend(second_half); // remove and collect pending releases for (pkid, tx) in self.outgoing_rel.drain() { @@ -579,7 +591,7 @@ impl MqttState { #[cfg(test)] mod test { - use std::collections::HashMap; + use linked_hash_map::LinkedHashMap; use super::{MqttState, StateError}; use crate::mqttbytes::v4::*; @@ -886,8 +898,8 @@ mod test { fn clean_is_calculating_pending_correctly() { let mut mqtt = build_mqttstate(); - fn build_outgoing_pub() -> HashMap)> { - let mut outgoing_pub = HashMap::new(); + fn build_outgoing_pub() -> LinkedHashMap)> { + let mut outgoing_pub = LinkedHashMap::new(); outgoing_pub.insert( 2, ( From 8005ce407dc4b60268c90819403c67fc85291796 Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Fri, 31 May 2024 15:17:21 +0800 Subject: [PATCH 06/12] Let user notified on incoming pubrec. --- rumqttc/examples/ack_notif_v5.rs | 65 ++++++++++++++++++++++++++++++++ rumqttc/src/state.rs | 14 +++++-- rumqttc/src/v5/state.rs | 8 +++- 3 files changed, 82 insertions(+), 5 deletions(-) create mode 100644 rumqttc/examples/ack_notif_v5.rs diff --git a/rumqttc/examples/ack_notif_v5.rs b/rumqttc/examples/ack_notif_v5.rs new file mode 100644 index 000000000..6e97268a4 --- /dev/null +++ b/rumqttc/examples/ack_notif_v5.rs @@ -0,0 +1,65 @@ +use tokio::task::{self, JoinSet}; + +use rumqttc::v5::{AsyncClient, MqttOptions, mqttbytes::QoS}; +use std::error::Error; +use std::time::Duration; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + pretty_env_logger::init(); + // color_backtrace::install(); + + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + task::spawn(async move { + loop { + let event = eventloop.poll().await; + match &event { + Ok(v) => { + println!("Event = {v:?}"); + } + Err(e) => { + println!("Error = {e:?}"); + } + } + } + }); + + // Subscribe and wait for broker acknowledgement + client + .subscribe("hello/world", QoS::AtMostOnce) + .await + .unwrap() + .wait_async() + .await + .unwrap(); + + // Publish and spawn wait for notification + let mut set = JoinSet::new(); + + let future = client + .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1024]) + .await + .unwrap(); + set.spawn(future.wait_async()); + + let future = client + .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 1024]) + .await + .unwrap(); + set.spawn(future.wait_async()); + + let future = client + .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 1024]) + .await + .unwrap(); + set.spawn(future.wait_async()); + + while let Some(res) = set.join_next().await { + println!("Acknoledged = {:?}", res?); + } + + Ok(()) +} diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index 25ef1a25f..2d55ba1d0 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -68,9 +68,10 @@ pub struct MqttState { pub(crate) outgoing_rel: LinkedHashMap>, /// Packet ids on incoming QoS 2 publishes pub(crate) incoming_pub: Vec>, - - outgoing_sub: LinkedHashMap>, - outgoing_unsub: LinkedHashMap>, + /// Outgoing subscribes + pub(crate) outgoing_sub: LinkedHashMap>, + /// Outgoing unsubscribes + pub(crate) outgoing_unsub: LinkedHashMap>, /// Last collision due to broker not acking in order pub collision: Option<(Publish, Option)>, @@ -321,8 +322,13 @@ impl MqttState { .remove(&pubrec.pkid) .ok_or(StateError::Unsolicited(pubrec.pkid))?; + // Notify user about the publish, pubrel and pubcomp will be handled in background + if let Some(tx) = tx { + tx.success(); + } + // NOTE: Inflight - 1 for qos2 in comp - self.outgoing_rel.insert(pubrec.pkid, tx); + self.outgoing_rel.insert(pubrec.pkid, None); let pubrel = PubRel { pkid: pubrec.pkid }; let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid)); self.events.push_back(event); diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index d4f9128c3..0d19bc3a3 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -466,10 +466,16 @@ impl MqttState { return Err(StateError::PubRecFail { reason: pubrec.reason, }); + } else { + + // Notifying the PUBREC from broker, PUBREL and PUBCOMP will be in backgroud + if let Some(tx) = tx { + tx.success(); + } } // NOTE: Inflight - 1 for qos2 in comp - self.outgoing_rel.insert(pubrec.pkid, tx); + self.outgoing_rel.insert(pubrec.pkid, None); let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid)); self.events.push_back(event); From 8774ed20e1058f7d8ed17e85adab79c131a5d7d0 Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Fri, 31 May 2024 16:16:35 +0800 Subject: [PATCH 07/12] Remove Notify for PUBREL. --- rumqttc/src/lib.rs | 2 +- rumqttc/src/state.rs | 16 +++++++--------- rumqttc/src/v5/mod.rs | 6 +++--- rumqttc/src/v5/state.rs | 12 +++++------- 4 files changed, 16 insertions(+), 20 deletions(-) diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index b5a391163..144f859a8 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -253,7 +253,7 @@ pub enum Request { PubAck(PubAck), PubRec(PubRec), PubComp(PubComp), - PubRel(Option, PubRel), + PubRel(PubRel), PingReq(PingReq), PingResp(PingResp), Subscribe(Option, Subscribe), diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index 2d55ba1d0..795960d71 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -129,8 +129,8 @@ impl MqttState { pending.extend(second_half); // remove and collect pending releases - for (pkid, tx) in self.outgoing_rel.drain() { - let request = Request::PubRel(tx, PubRel::new(pkid)); + for (pkid, _) in self.outgoing_rel.drain() { + let request = Request::PubRel(PubRel::new(pkid)); pending.push(request); } @@ -157,7 +157,7 @@ impl MqttState { ) -> Result, StateError> { let packet = match request { Request::Publish(tx, publish) => self.outgoing_publish(publish, tx)?, - Request::PubRel(tx, pubrel) => self.outgoing_pubrel(pubrel, tx)?, + Request::PubRel(pubrel) => self.outgoing_pubrel(pubrel)?, Request::Subscribe(tx, subscribe) => self.outgoing_subscribe(subscribe, tx)?, Request::Unsubscribe(tx, unsubscribe) => self.outgoing_unsubscribe(unsubscribe, tx)?, Request::PingReq(_) => self.outgoing_ping()?, @@ -432,10 +432,9 @@ impl MqttState { fn outgoing_pubrel( &mut self, - pubrel: PubRel, - notice_tx: Option, + pubrel: PubRel ) -> Result, StateError> { - let pubrel = self.save_pubrel(pubrel, notice_tx)?; + let pubrel = self.save_pubrel(pubrel)?; debug!("Pubrel. Pkid = {}", pubrel.pkid); let event = Event::Outgoing(Outgoing::PubRel(pubrel.pkid)); @@ -558,8 +557,7 @@ impl MqttState { fn save_pubrel( &mut self, - mut pubrel: PubRel, - notice_tx: Option, + mut pubrel: PubRel ) -> Result { let pubrel = match pubrel.pkid { // consider PacketIdentifier(0) as uninitialized packets @@ -570,7 +568,7 @@ impl MqttState { _ => pubrel, }; - self.outgoing_rel.insert(pubrel.pkid, notice_tx); + self.outgoing_rel.insert(pubrel.pkid, None); self.inflight += 1; Ok(pubrel) } diff --git a/rumqttc/src/v5/mod.rs b/rumqttc/src/v5/mod.rs index 9e707568f..28ed6edac 100644 --- a/rumqttc/src/v5/mod.rs +++ b/rumqttc/src/v5/mod.rs @@ -39,7 +39,7 @@ pub enum Request { PubAck(PubAck), PubRec(PubRec), PubComp(PubComp), - PubRel(Option, PubRel), + PubRel(PubRel), PingReq, PingResp, Subscribe(Option, Subscribe), @@ -55,7 +55,7 @@ impl Clone for Request { Self::Publish(_, p) => Self::Publish(None, p.clone()), Self::PubAck(p) => Self::PubAck(p.clone()), Self::PubRec(p) => Self::PubRec(p.clone()), - Self::PubRel(_, p) => Self::PubRel(None, p.clone()), + Self::PubRel(p) => Self::PubRel(p.clone()), Self::PubComp(p) => Self::PubComp(p.clone()), Self::Subscribe(_, p) => Self::Subscribe(None, p.clone()), Self::SubAck(p) => Self::SubAck(p.clone()), @@ -74,7 +74,7 @@ impl PartialEq for Request { (Self::Publish(_, p1), Self::Publish(_, p2)) => p1 == p2, (Self::PubAck(p1), Self::PubAck(p2)) => p1 == p2, (Self::PubRec(p1), Self::PubRec(p2)) => p1 == p2, - (Self::PubRel(_, p1), Self::PubRel(_, p2)) => p1 == p2, + (Self::PubRel(p1), Self::PubRel(p2)) => p1 == p2, (Self::PubComp(p1), Self::PubComp(p2)) => p1 == p2, (Self::Subscribe(_, p1), Self::Subscribe(_, p2)) => p1 == p2, (Self::SubAck(p1), Self::SubAck(p2)) => p1 == p2, diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 0d19bc3a3..913fe99da 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -169,8 +169,8 @@ impl MqttState { } // remove and collect pending releases - for (pkid, tx) in self.outgoing_rel.drain() { - let request = Request::PubRel(tx, PubRel::new(pkid, None)); + for (pkid, _) in self.outgoing_rel.drain() { + let request = Request::PubRel(PubRel::new(pkid, None)); pending.push(request); } @@ -197,7 +197,7 @@ impl MqttState { ) -> Result, StateError> { let packet = match request { Request::Publish(tx, publish) => self.outgoing_publish(publish, tx)?, - Request::PubRel(tx, pubrel) => self.outgoing_pubrel(pubrel, tx)?, + Request::PubRel(pubrel) => self.outgoing_pubrel(pubrel)?, Request::Subscribe(tx, subscribe) => self.outgoing_subscribe(subscribe, tx)?, Request::Unsubscribe(tx, unsubscribe) => self.outgoing_unsubscribe(unsubscribe, tx)?, Request::PingReq => self.outgoing_ping()?, @@ -610,9 +610,8 @@ impl MqttState { fn outgoing_pubrel( &mut self, pubrel: PubRel, - notice_tx: Option, ) -> Result, StateError> { - let pubrel = self.save_pubrel(pubrel, notice_tx)?; + let pubrel = self.save_pubrel(pubrel)?; debug!("Pubrel. Pkid = {}", pubrel.pkid); @@ -738,7 +737,6 @@ impl MqttState { fn save_pubrel( &mut self, mut pubrel: PubRel, - notice_tx: Option, ) -> Result { let pubrel = match pubrel.pkid { // consider PacketIdentifier(0) as uninitialized packets @@ -749,7 +747,7 @@ impl MqttState { _ => pubrel, }; - self.outgoing_rel.insert(pubrel.pkid, notice_tx); + self.outgoing_rel.insert(pubrel.pkid, None); self.inflight += 1; Ok(pubrel) } From 07c25b0cc49b44d81d5e5824c9b0a564296231ef Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Tue, 4 Jun 2024 10:30:23 +0800 Subject: [PATCH 08/12] Notice wait drop due to session reset. --- rumqttc/src/eventloop.rs | 22 ++++++++++++++++------ rumqttc/src/lib.rs | 2 ++ rumqttc/src/v5/eventloop.rs | 22 ++++++++++++++++------ 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index a9b1ce8c5..c3e227b32 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -1,6 +1,7 @@ use crate::{framed::Network, Transport}; use crate::{Incoming, MqttState, NetworkOptions, Packet, Request, StateError}; use crate::{MqttOptions, Outgoing}; +use crate::NoticeError; use crate::framed::AsyncReadWrite; use crate::mqttbytes::v4::*; @@ -149,13 +150,22 @@ impl EventLoop { Ok(inner) => inner?, Err(_) => return Err(ConnectionError::NetworkTimeout), }; + // Last session might contain packets which aren't acked. If it's a new session, clear the pending packets. + if !connack.session_present { + for request in self.pending.drain(..) { + // If the request is a publish request, send an error to the future that is waiting for the ack. + if let Request::Publish(Some(tx), _) = request { + tx.error(NoticeError::SessionReset) + } + } + } self.network = Some(network); if self.keepalive_timeout.is_none() && !self.mqtt_options.keep_alive.is_zero() { self.keepalive_timeout = Some(Box::pin(time::sleep(self.mqtt_options.keep_alive))); } - return Ok(Event::Incoming(connack)); + return Ok(Event::Incoming(Packet::ConnAck(connack))); } match self.select().await { @@ -294,14 +304,14 @@ impl EventLoop { async fn connect( mqtt_options: &MqttOptions, network_options: NetworkOptions, -) -> Result<(Network, Incoming), ConnectionError> { +) -> Result<(Network, ConnAck), ConnectionError> { // connect to the broker let mut network = network_connect(mqtt_options, network_options).await?; // make MQTT connection request (which internally awaits for ack) - let packet = mqtt_connect(mqtt_options, &mut network).await?; + let connack = mqtt_connect(mqtt_options, &mut network).await?; - Ok((network, packet)) + Ok((network, connack)) } pub(crate) async fn socket_connect( @@ -469,7 +479,7 @@ async fn network_connect( async fn mqtt_connect( options: &MqttOptions, network: &mut Network, -) -> Result { +) -> Result { let keep_alive = options.keep_alive().as_secs() as u16; let clean_session = options.clean_session(); let last_will = options.last_will(); @@ -486,7 +496,7 @@ async fn mqtt_connect( // validate connack match network.read().await? { Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => { - Ok(Packet::ConnAck(connack)) + Ok(connack) } Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)), packet => Err(ConnectionError::NotConnAck(packet)), diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index 144f859a8..020120b5b 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -176,6 +176,8 @@ pub enum NoticeError { V5PubRec(PubRecReason), #[error(" v5 Publish Comp Failure Reason Code: {0:?}")] V5PubComp(PubCompReason), + #[error(" Dropped due to session reconnect with previous state expire/lost")] + SessionReset, } impl From for NoticeError { diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index 996bdcf2b..94e9391cb 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -3,6 +3,7 @@ use super::mqttbytes::v5::*; use super::{Incoming, MqttOptions, MqttState, Outgoing, Request, StateError, Transport}; use crate::eventloop::socket_connect; use crate::framed::AsyncReadWrite; +use crate::NoticeError; use flume::{bounded, Receiver, Sender}; use tokio::select; @@ -141,13 +142,22 @@ impl EventLoop { connect(&mut self.options), ) .await??; + // Last session might contain packets which aren't acked. If it's a new session, clear the pending packets. + if !connack.session_present { + for request in self.pending.drain(..) { + // If the request is a publish request, send an error to the future that is waiting for the ack. + if let Request::Publish(Some(tx), _) = request { + tx.error(NoticeError::SessionReset) + } + } + } self.network = Some(network); if self.keepalive_timeout.is_none() { self.keepalive_timeout = Some(Box::pin(time::sleep(self.options.keep_alive))); } - self.state.handle_incoming_packet(connack)?; + self.state.handle_incoming_packet(Packet::ConnAck(connack))?; } match self.select().await { @@ -263,19 +273,19 @@ impl EventLoop { /// the stream. /// This function (for convenience) includes internal delays for users to perform internal sleeps /// between re-connections so that cancel semantics can be used during this sleep -async fn connect(options: &mut MqttOptions) -> Result<(Network, Incoming), ConnectionError> { +async fn connect(options: &mut MqttOptions) -> Result<(Network, ConnAck), ConnectionError> { // connect to the broker let mut network = network_connect(options).await?; // make MQTT connection request (which internally awaits for ack) - let packet = mqtt_connect(options, &mut network).await?; + let connack = mqtt_connect(options, &mut network).await?; // Last session might contain packets which aren't acked. MQTT says these packets should be // republished in the next session // move pending messages from state to eventloop // let pending = self.state.clean(); // self.pending = pending.into_iter(); - Ok((network, packet)) + Ok((network, connack)) } async fn network_connect(options: &MqttOptions) -> Result { @@ -387,7 +397,7 @@ async fn network_connect(options: &MqttOptions) -> Result Result { +) -> Result { let keep_alive = options.keep_alive().as_secs() as u16; let clean_start = options.clean_start(); let client_id = options.client_id(); @@ -413,7 +423,7 @@ async fn mqtt_connect( } network.set_max_outgoing_size(props.max_packet_size); } - Ok(Packet::ConnAck(connack)) + Ok(connack) } Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)), packet => Err(ConnectionError::NotConnAck(Box::new(packet))), From 576bd79b348192b1e4bdfcfadb815cae3b53d5f2 Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Wed, 3 Jul 2024 10:54:09 +0800 Subject: [PATCH 09/12] Update tests. --- rumqttc/examples/ack_notif.rs | 23 +++++++++++++++++++---- rumqttc/examples/ack_notif_v5.rs | 23 +++++++++++++++++++---- 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/rumqttc/examples/ack_notif.rs b/rumqttc/examples/ack_notif.rs index bb98599a4..970394f69 100644 --- a/rumqttc/examples/ack_notif.rs +++ b/rumqttc/examples/ack_notif.rs @@ -1,4 +1,4 @@ -use tokio::task::{self, JoinSet}; +use tokio::{task::{self, JoinSet}, time}; use rumqttc::{AsyncClient, MqttOptions, QoS}; use std::error::Error; @@ -35,24 +35,38 @@ async fn main() -> Result<(), Box> { .wait_async() .await .unwrap(); + client + .subscribe("hello/world", QoS::AtLeastOnce) + .await + .unwrap() + .wait_async() + .await + .unwrap(); + client + .subscribe("hello/world", QoS::ExactlyOnce) + .await + .unwrap() + .wait_async() + .await + .unwrap(); // Publish and spawn wait for notification let mut set = JoinSet::new(); let future = client - .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1024]) + .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1]) .await .unwrap(); set.spawn(future.wait_async()); let future = client - .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 1024]) + .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2]) .await .unwrap(); set.spawn(future.wait_async()); let future = client - .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 1024]) + .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3]) .await .unwrap(); set.spawn(future.wait_async()); @@ -61,5 +75,6 @@ async fn main() -> Result<(), Box> { println!("Acknoledged = {:?}", res?); } + time::sleep(Duration::from_secs(6)).await; Ok(()) } diff --git a/rumqttc/examples/ack_notif_v5.rs b/rumqttc/examples/ack_notif_v5.rs index 6e97268a4..78e99ef8f 100644 --- a/rumqttc/examples/ack_notif_v5.rs +++ b/rumqttc/examples/ack_notif_v5.rs @@ -1,4 +1,4 @@ -use tokio::task::{self, JoinSet}; +use tokio::{task::{self, JoinSet}, time}; use rumqttc::v5::{AsyncClient, MqttOptions, mqttbytes::QoS}; use std::error::Error; @@ -35,24 +35,38 @@ async fn main() -> Result<(), Box> { .wait_async() .await .unwrap(); + client + .subscribe("hello/world", QoS::AtLeastOnce) + .await + .unwrap() + .wait_async() + .await + .unwrap(); + client + .subscribe("hello/world", QoS::ExactlyOnce) + .await + .unwrap() + .wait_async() + .await + .unwrap(); // Publish and spawn wait for notification let mut set = JoinSet::new(); let future = client - .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1024]) + .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1]) .await .unwrap(); set.spawn(future.wait_async()); let future = client - .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 1024]) + .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2]) .await .unwrap(); set.spawn(future.wait_async()); let future = client - .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 1024]) + .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3]) .await .unwrap(); set.spawn(future.wait_async()); @@ -61,5 +75,6 @@ async fn main() -> Result<(), Box> { println!("Acknoledged = {:?}", res?); } + time::sleep(Duration::from_secs(6)).await; Ok(()) } From a63216fc3abf9d9bd008213e651a0536f4a344a7 Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Wed, 3 Jul 2024 10:59:17 +0800 Subject: [PATCH 10/12] Publish NoticeTx. --- rumqttc/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index e547a3aee..017ec18e8 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -141,8 +141,7 @@ pub use client::{ pub use eventloop::{ConnectionError, Event, EventLoop}; pub use mqttbytes::v4::*; pub use mqttbytes::*; -use notice::NoticeTx; -pub use notice::{NoticeError, NoticeFuture}; +pub use notice::{NoticeTx, NoticeError, NoticeFuture}; #[cfg(feature = "use-rustls")] use rustls_native_certs::load_native_certs; pub use state::{MqttState, StateError}; From fb290fbe34f14f4de24ce3aae765a4a7314a66f2 Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Wed, 3 Jul 2024 11:14:50 +0800 Subject: [PATCH 11/12] Notify on QoS2 pubcomp --- rumqttc/src/state.rs | 4 ---- rumqttc/src/v5/state.rs | 5 ----- 2 files changed, 9 deletions(-) diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index b0096c98b..e870414e7 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -312,10 +312,6 @@ impl MqttState { .remove(&pubrec.pkid) .ok_or(StateError::Unsolicited(pubrec.pkid))?; - // TODO: deprecated - // Notify user about the publish, pubrel and pubcomp will be handled in background - // tx.success(); - // NOTE: Inflight - 1 for qos2 in comp self.outgoing_rel.insert(pubrec.pkid, tx); let pubrel = PubRel { pkid: pubrec.pkid }; diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 0d09b0a81..d11d5cd27 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -448,11 +448,6 @@ impl MqttState { return Err(StateError::PubRecFail { reason: pubrec.reason, }); - } else { - - // TODO: deprecated - // Notifying the PUBREC from broker, PUBREL and PUBCOMP will be in backgroud - // tx.success(); } // NOTE: Inflight - 1 for qos2 in comp From 68fcb73b1d736449626e39d5528d74dbad82287e Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Thu, 4 Jul 2024 09:57:27 +0800 Subject: [PATCH 12/12] Squashed commit of the following: commit 518773d906bcb1cb9e148b12d0a58b7d92ff0468 Author: CQ Xiao <53201544+xiaocq2001@users.noreply.github.com> Date: Tue May 21 14:47:32 2024 +0800 rumqttc: resume session only if CONNACK with session present 1 (#864) * Check if session present to restore pending publishes. * Modify changelog. * remove changes that don't seem to be related * refactor: improve readability * feat: apply changes to v4 * Remove session_expiry_interval related code. * test: set clean session * test: broker saved session * test: fix resume reconnect --------- Co-authored-by: Devdutt Shenoi Co-authored-by: Devdutt Shenoi commit 67d9ca7bebfc049220fbfaad284dc76864391122 Author: CQ Xiao <53201544+xiaocq2001@users.noreply.github.com> Date: Thu May 16 23:05:10 2024 +0800 feat(rumqttc): set `session_expiry_interval` in v5 (#854) Co-authored-by: Devdutt Shenoi commit 98997d1085b1de7fb9cf684d0c80b66782bcb420 Author: CQ Xiao Date: Wed Jul 3 18:13:54 2024 +0800 HashMap -> VecDeque, cleanup --- rumqttc/CHANGELOG.md | 2 + rumqttc/examples/async_manual_acks_v5.rs | 1 + rumqttc/src/eventloop.rs | 2 + rumqttc/src/state.rs | 182 ++++++++++++++--------- rumqttc/src/v5/eventloop.rs | 8 +- rumqttc/src/v5/mod.rs | 21 +++ rumqttc/tests/broker.rs | 9 +- rumqttc/tests/reliability.rs | 38 ++--- 8 files changed, 173 insertions(+), 90 deletions(-) diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index 35c7014e3..03f320ea4 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * `size()` method on `Packet` calculates size once serialized. * `read()` and `write()` methods on `Packet`. * `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection +* `set_session_expiry_interval` and `session_expiry_interval` methods on `MqttOptions`. ### Changed @@ -27,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Validate filters while creating subscription requests. * Make v4::Connect::write return correct value +* Resume session only if broker sends `CONNACK` with `session_present == 1`. ### Security diff --git a/rumqttc/examples/async_manual_acks_v5.rs b/rumqttc/examples/async_manual_acks_v5.rs index bcf1bf356..7597a6164 100644 --- a/rumqttc/examples/async_manual_acks_v5.rs +++ b/rumqttc/examples/async_manual_acks_v5.rs @@ -10,6 +10,7 @@ fn create_conn() -> (AsyncClient, EventLoop) { let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); mqttoptions .set_keep_alive(Duration::from_secs(5)) + .set_session_expiry_interval(u32::MAX.into()) .set_manual_acks(true) .set_clean_start(false); diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index 0b09c1388..237cfbc28 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -172,6 +172,8 @@ impl EventLoop { match self.select().await { Ok(v) => Ok(v), Err(e) => { + // MQTT requires that packets pending acknowledgement should be republished on session resume. + // Move pending messages from state to eventloop. self.clean(); Err(e) } diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index e870414e7..220fa9a18 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -5,7 +5,6 @@ use crate::mqttbytes::v4::*; use crate::mqttbytes::{self, *}; use std::collections::VecDeque; use std::{io, time::Instant}; -use linked_hash_map::LinkedHashMap; /// Errors during state handling #[derive(Debug, thiserror::Error)] @@ -64,15 +63,16 @@ pub struct MqttState { /// Maximum number of allowed inflight pub(crate) max_inflight: u16, /// Outgoing QoS 1, 2 publishes which aren't acked yet - pub(crate) outgoing_pub: LinkedHashMap, + pub(crate) outgoing_pub1: VecDeque<(Publish, NoticeTx)>, + pub(crate) outgoing_pub2: VecDeque<(Publish, NoticeTx)>, /// Packet ids of released QoS 2 publishes - pub(crate) outgoing_rel: LinkedHashMap, + pub(crate) outgoing_rel: VecDeque<(u16, NoticeTx)>, /// Packet ids on incoming QoS 2 publishes pub(crate) incoming_pub: Vec>, /// Outgoing subscribes - pub(crate) outgoing_sub: LinkedHashMap, + pub(crate) outgoing_sub: VecDeque<(u16, NoticeTx)>, /// Outgoing unsubscribes - pub(crate) outgoing_unsub: LinkedHashMap, + pub(crate) outgoing_unsub: VecDeque<(u16, NoticeTx)>, /// Last collision due to broker not acking in order pub(crate) collision: Option<(Publish, NoticeTx)>, @@ -97,11 +97,12 @@ impl MqttState { inflight: 0, max_inflight, // index 0 is wasted as 0 is not a valid packet id - outgoing_pub: LinkedHashMap::new(), - outgoing_rel: LinkedHashMap::new(), + outgoing_pub1: VecDeque::with_capacity(10), + outgoing_pub2: VecDeque::with_capacity(10), + outgoing_rel: VecDeque::with_capacity(10), incoming_pub: vec![None; u16::MAX as usize + 1], - outgoing_sub: LinkedHashMap::new(), - outgoing_unsub: LinkedHashMap::new(), + outgoing_sub: VecDeque::with_capacity(10), + outgoing_unsub: VecDeque::with_capacity(10), collision: None, // TODO: Optimize these sizes later events: VecDeque::with_capacity(100), @@ -113,24 +114,27 @@ impl MqttState { pub fn clean(&mut self) -> Vec<(NoticeTx, Request)> { let mut pending = Vec::with_capacity(100); - let mut second_half = Vec::with_capacity(100); - let mut last_pkid_found = false; - for (_, (publish, tx)) in self.outgoing_pub.drain() { - let this_pkid = publish.pkid; - let request = Request::Publish(publish); - if !last_pkid_found { - second_half.push((tx, request)); - if this_pkid == self.last_puback { - last_pkid_found = true; + for outgoing_pub in [&mut self.outgoing_pub1, &mut self.outgoing_pub2] { + let mut second_half = Vec::with_capacity(100); + let mut last_pkid_found = false; + for (publish, tx) in outgoing_pub.drain(..) { + let this_pkid = publish.pkid; + let request = Request::Publish(publish); + if !last_pkid_found { + second_half.push((tx, request)); + if this_pkid == self.last_puback { + last_pkid_found = true; + } + } else { + pending.push((tx, request)); } - } else { - pending.push((tx, request)); } + pending.extend(second_half); } - pending.extend(second_half); + println!("pending = {:?}", pending.len()); // remove and collect pending releases - for (pkid, tx) in self.outgoing_rel.drain() { + for (pkid, tx) in self.outgoing_rel.drain(..) { let request = Request::PubRel(PubRel::new(pkid)); pending.push((tx, request)); } @@ -206,11 +210,14 @@ impl MqttState { error!("Unsolicited suback packet: {:?}", suback.pkid); return Err(StateError::Unsolicited(suback.pkid)); } - - let tx = self + // No expecting ordered acks for suback + // Search outgoing_sub to find the right suback.pkid + let pos = self .outgoing_sub - .remove(&suback.pkid) + .iter() + .position(|(pkid, _)| *pkid == suback.pkid) .ok_or(StateError::Unsolicited(suback.pkid))?; + let (_, tx) = self.outgoing_sub.remove(pos).unwrap(); for reason in suback.return_codes.iter() { match reason { @@ -236,10 +243,15 @@ impl MqttState { error!("Unsolicited unsuback packet: {:?}", unsuback.pkid); return Err(StateError::Unsolicited(unsuback.pkid)); } - self.outgoing_sub - .remove(&unsuback.pkid) - .ok_or(StateError::Unsolicited(unsuback.pkid))? - .success(); + // No expecting ordered acks for unsuback + // Search outgoing_unsub to find the right suback.pkid + let pos = self + .outgoing_unsub + .iter() + .position(|(pkid, _)| *pkid == unsuback.pkid) + .ok_or(StateError::Unsolicited(unsuback.pkid))?; + let (_, tx) = self.outgoing_unsub.remove(pos).unwrap(); + tx.success(); Ok(None) } @@ -276,10 +288,11 @@ impl MqttState { error!("Unsolicited puback packet: {:?}", puback.pkid); return Err(StateError::Unsolicited(puback.pkid)); } - + // Expecting ordered acks for puback + // Check front of outgoing_pub to see if it's in order let (_, tx) = self - .outgoing_pub - .remove(&puback.pkid) + .outgoing_pub1 + .pop_front() .ok_or(StateError::Unsolicited(puback.pkid))?; tx.success(); @@ -287,8 +300,11 @@ impl MqttState { self.inflight -= 1; let packet = self.check_collision(puback.pkid).map(|(publish, tx)| { - self.outgoing_pub - .insert(publish.pkid, (publish.clone(), tx)); + if publish.qos == QoS::AtLeastOnce { + self.outgoing_pub1.push_back((publish.clone(), tx)); + } else if publish.qos == QoS::AtMostOnce { + self.outgoing_pub2.push_back((publish.clone(), tx)); + } self.inflight += 1; let event = Event::Outgoing(Outgoing::Publish(publish.pkid)); @@ -307,13 +323,19 @@ impl MqttState { return Err(StateError::Unsolicited(pubrec.pkid)); } - let (_, tx) = self - .outgoing_pub - .remove(&pubrec.pkid) + // Expecting ordered acks for pubrec + // Check front of outgoing_pub to see if it's in order + let (publish, tx) = self + .outgoing_pub2 + .pop_front() .ok_or(StateError::Unsolicited(pubrec.pkid))?; + if publish.pkid != pubrec.pkid { + error!("Unsolicited pubrec packet: {:?}", pubrec.pkid); + return Err(StateError::Unsolicited(pubrec.pkid)); + } // NOTE: Inflight - 1 for qos2 in comp - self.outgoing_rel.insert(pubrec.pkid, tx); + self.outgoing_rel.push_back((pubrec.pkid, tx)); let pubrel = PubRel { pkid: pubrec.pkid }; let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid)); self.events.push_back(event); @@ -344,15 +366,25 @@ impl MqttState { error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid); return Err(StateError::Unsolicited(pubcomp.pkid)); } - self.outgoing_rel - .remove(&pubcomp.pkid) - .ok_or(StateError::Unsolicited(pubcomp.pkid))? - .success(); + // Expecting ordered acks for pubcomp + // Check front of outgoing_pub to see if it's in order + let (pkid, tx) = self + .outgoing_rel + .pop_front() + .ok_or(StateError::Unsolicited(pubcomp.pkid))?; + if pkid != pubcomp.pkid { + error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid); + return Err(StateError::Unsolicited(pubcomp.pkid)); + } + tx.success(); self.inflight -= 1; let packet = self.check_collision(pubcomp.pkid).map(|(publish, tx)| { - self.outgoing_pub - .insert(pubcomp.pkid, (publish.clone(), tx)); + if publish.qos == QoS::AtLeastOnce { + self.outgoing_pub1.push_back((publish.clone(), tx)); + } else if publish.qos == QoS::AtMostOnce { + self.outgoing_pub2.push_back((publish.clone(), tx)); + } let event = Event::Outgoing(Outgoing::Publish(publish.pkid)); self.events.push_back(event); self.collision_ping_count = 0; @@ -383,7 +415,13 @@ impl MqttState { let pkid = publish.pkid; - if self.outgoing_pub.get(&publish.pkid).is_some() { + let outgoing_pub = match publish.qos { + QoS::AtLeastOnce => &mut self.outgoing_pub1, + QoS::ExactlyOnce => &mut self.outgoing_pub2, + _ => unreachable!(), + }; + if let Some(pos) = outgoing_pub.iter().position(|(publish, _)| publish.pkid == pkid) { + outgoing_pub.get(pos); info!("Collision on packet id = {:?}", publish.pkid); self.collision = Some((publish, notice_tx)); let event = Event::Outgoing(Outgoing::AwaitAck(pkid)); @@ -393,7 +431,7 @@ impl MqttState { // if there is an existing publish at this pkid, this implies that broker hasn't acked this // packet yet. This error is possible only when broker isn't acking sequentially - self.outgoing_pub.insert(pkid, (publish.clone(), notice_tx)); + outgoing_pub.push_back((publish.clone(), notice_tx)); self.inflight += 1; } else { notice_tx.success() @@ -492,7 +530,7 @@ impl MqttState { subscription.filters, subscription.pkid ); - self.outgoing_sub.insert(pkid, notice_tx); + self.outgoing_sub.push_back((pkid, notice_tx)); let event = Event::Outgoing(Outgoing::Subscribe(subscription.pkid)); self.events.push_back(event); @@ -512,7 +550,7 @@ impl MqttState { unsub.topics, unsub.pkid ); - self.outgoing_unsub.insert(pkid, notice_tx); + self.outgoing_unsub.push_back((pkid, notice_tx)); let event = Event::Outgoing(Outgoing::Unsubscribe(unsub.pkid)); self.events.push_back(event); @@ -552,8 +590,7 @@ impl MqttState { _ => pubrel, }; - self.outgoing_rel.insert(pubrel.pkid, notice_tx); - self.inflight += 1; + self.outgoing_rel.push_back((pubrel.pkid, notice_tx)); Ok(pubrel) } @@ -579,7 +616,7 @@ impl MqttState { #[cfg(test)] mod test { - use linked_hash_map::LinkedHashMap; + use std::collections::VecDeque; use super::{MqttState, StateError}; use crate::mqttbytes::v4::*; @@ -763,11 +800,18 @@ mod test { mqtt.handle_incoming_puback(&PubAck::new(1)).unwrap(); assert_eq!(mqtt.inflight, 1); - mqtt.handle_incoming_puback(&PubAck::new(2)).unwrap(); + mqtt.handle_incoming_pubrec(&PubRec::new(2)).unwrap(); + assert_eq!(mqtt.inflight, 1); + + let (tx, _) = NoticeTx::new(); + mqtt.outgoing_pubrel(PubRel::new(2), tx).unwrap(); + assert_eq!(mqtt.inflight, 1); + + mqtt.handle_incoming_pubcomp(&PubComp::new(2)).unwrap(); assert_eq!(mqtt.inflight, 0); - assert!(mqtt.outgoing_pub.get(&1).is_none()); - assert!(mqtt.outgoing_pub.get(&2).is_none()); + assert!(mqtt.outgoing_pub1.get(0).is_none()); + assert!(mqtt.outgoing_pub2.get(0).is_none()); } #[test] @@ -798,11 +842,11 @@ mod test { assert_eq!(mqtt.inflight, 2); // check if the remaining element's pkid is 1 - let (backup, _) = mqtt.outgoing_pub.get(&1).unwrap(); + let (backup, _) = mqtt.outgoing_pub1.get(0).unwrap(); assert_eq!(backup.pkid, 1); // check if the qos2 element's release pkik has been set - assert!(mqtt.outgoing_rel.get(&2).is_some()); + assert!(mqtt.outgoing_rel.get(0).is_some()); } #[test] @@ -898,11 +942,11 @@ mod test { fn clean_is_calculating_pending_correctly() { let mut mqtt = build_mqttstate(); - fn build_outgoing_pub() -> LinkedHashMap { - let mut outgoing_pub = LinkedHashMap::new(); + fn build_outgoing_pub() -> VecDeque<(Publish, NoticeTx)> { + let mut outgoing_pub = VecDeque::new(); let (tx, _) = NoticeTx::new(); - outgoing_pub.insert( - 2, + outgoing_pub.push_back( + // 2, ( Publish { dup: false, @@ -916,8 +960,8 @@ mod test { ), ); let (tx, _) = NoticeTx::new(); - outgoing_pub.insert( - 3, + outgoing_pub.push_back( + // 3, ( Publish { dup: false, @@ -931,8 +975,8 @@ mod test { ), ); let (tx, _) = NoticeTx::new(); - outgoing_pub.insert( - 4, + outgoing_pub.push_back( + // 4, ( Publish { dup: false, @@ -946,8 +990,8 @@ mod test { ), ); let (tx, _) = NoticeTx::new(); - outgoing_pub.insert( - 7, + outgoing_pub.push_back( + // 7, ( Publish { dup: false, @@ -964,7 +1008,7 @@ mod test { outgoing_pub } - mqtt.outgoing_pub = build_outgoing_pub(); + mqtt.outgoing_pub1 = build_outgoing_pub(); mqtt.last_puback = 3; let requests = mqtt.clean(); let res = vec![6, 1, 2, 3]; @@ -976,7 +1020,7 @@ mod test { } } - mqtt.outgoing_pub = build_outgoing_pub(); + mqtt.outgoing_pub1 = build_outgoing_pub(); mqtt.last_puback = 0; let requests = mqtt.clean(); let res = vec![1, 2, 3, 6]; @@ -988,7 +1032,7 @@ mod test { } } - mqtt.outgoing_pub = build_outgoing_pub(); + mqtt.outgoing_pub1 = build_outgoing_pub(); mqtt.last_puback = 6; let requests = mqtt.clean(); let res = vec![1, 2, 3, 6]; diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index b01d627cc..0db89296d 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -163,6 +163,8 @@ impl EventLoop { match self.select().await { Ok(v) => Ok(v), Err(e) => { + // MQTT requires that packets pending acknowledgement should be republished on session resume. + // Move pending messages from state to eventloop. self.clean(); Err(e) } @@ -417,12 +419,16 @@ async fn mqtt_connect( // validate connack match network.read().await? { Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => { - // Override local keep_alive value if set by server. if let Some(props) = &connack.properties { if let Some(keep_alive) = props.server_keep_alive { options.keep_alive = Duration::from_secs(keep_alive as u64); } network.set_max_outgoing_size(props.max_packet_size); + + // Override local session_expiry_interval value if set by server. + if props.session_expiry_interval.is_some() { + options.set_session_expiry_interval(props.session_expiry_interval); + } } Ok(connack) } diff --git a/rumqttc/src/v5/mod.rs b/rumqttc/src/v5/mod.rs index a2bafc9a6..00ad6df68 100644 --- a/rumqttc/src/v5/mod.rs +++ b/rumqttc/src/v5/mod.rs @@ -353,6 +353,27 @@ impl MqttOptions { self.connect_properties.clone() } + /// set session expiry interval on connection properties + pub fn set_session_expiry_interval(&mut self, interval: Option) -> &mut Self { + if let Some(conn_props) = &mut self.connect_properties { + conn_props.session_expiry_interval = interval; + self + } else { + let mut conn_props = ConnectProperties::new(); + conn_props.session_expiry_interval = interval; + self.set_connect_properties(conn_props) + } + } + + /// get session expiry interval on connection properties + pub fn session_expiry_interval(&self) -> Option { + if let Some(conn_props) = &self.connect_properties { + conn_props.session_expiry_interval + } else { + None + } + } + /// set receive maximum on connection properties pub fn set_receive_maximum(&mut self, recv_max: Option) -> &mut Self { if let Some(conn_props) = &mut self.connect_properties { diff --git a/rumqttc/tests/broker.rs b/rumqttc/tests/broker.rs index ea66448f5..760a2ab37 100644 --- a/rumqttc/tests/broker.rs +++ b/rumqttc/tests/broker.rs @@ -21,7 +21,7 @@ pub struct Broker { impl Broker { /// Create a new broker which accepts 1 mqtt connection - pub async fn new(port: u16, connack: u8) -> Broker { + pub async fn new(port: u16, connack: u8, session_saved: bool) -> Broker { let addr = format!("127.0.0.1:{port}"); let listener = TcpListener::bind(&addr).await.unwrap(); @@ -32,9 +32,12 @@ impl Broker { framed.readb(&mut incoming).await.unwrap(); match incoming.pop_front().unwrap() { - Packet::Connect(_) => { + Packet::Connect(connect) => { let connack = match connack { - 0 => ConnAck::new(ConnectReturnCode::Success, false), + 0 => ConnAck::new( + ConnectReturnCode::Success, + !connect.clean_session && session_saved, + ), 1 => ConnAck::new(ConnectReturnCode::BadUserNamePassword, false), _ => { return Broker { diff --git a/rumqttc/tests/reliability.rs b/rumqttc/tests/reliability.rs index 0a83d57ce..633ca4706 100644 --- a/rumqttc/tests/reliability.rs +++ b/rumqttc/tests/reliability.rs @@ -72,7 +72,7 @@ async fn _tick( #[tokio::test] async fn connection_should_timeout_on_time() { task::spawn(async move { - let _broker = Broker::new(1880, 3).await; + let _broker = Broker::new(1880, 3, false).await; time::sleep(Duration::from_secs(10)).await; }); @@ -125,7 +125,7 @@ async fn idle_connection_triggers_pings_on_time() { run(&mut eventloop, false).await.unwrap(); }); - let mut broker = Broker::new(1885, 0).await; + let mut broker = Broker::new(1885, 0, false).await; let mut count = 0; let mut start = Instant::now(); @@ -169,7 +169,7 @@ async fn some_outgoing_and_no_incoming_should_trigger_pings_on_time() { run(&mut eventloop, false).await.unwrap(); }); - let mut broker = Broker::new(1886, 0).await; + let mut broker = Broker::new(1886, 0, false).await; let mut count = 0; let mut start = Instant::now(); @@ -204,7 +204,7 @@ async fn some_incoming_and_no_outgoing_should_trigger_pings_on_time() { run(&mut eventloop, false).await.unwrap(); }); - let mut broker = Broker::new(2000, 0).await; + let mut broker = Broker::new(2000, 0, false).await; let mut count = 0; // Start sending qos 0 publishes to the client. This triggers @@ -238,7 +238,7 @@ async fn detects_halfopen_connections_in_the_second_ping_request() { // A broker which consumes packets but doesn't reply task::spawn(async move { - let mut broker = Broker::new(2001, 0).await; + let mut broker = Broker::new(2001, 0, false).await; broker.blackhole().await; }); @@ -279,7 +279,7 @@ async fn requests_are_blocked_after_max_inflight_queue_size() { run(&mut eventloop, false).await.unwrap(); }); - let mut broker = Broker::new(1887, 0).await; + let mut broker = Broker::new(1887, 0, false).await; for i in 1..=10 { let packet = broker.read_publish().await; @@ -306,7 +306,7 @@ async fn requests_are_recovered_after_inflight_queue_size_falls_below_max() { run(&mut eventloop, true).await.unwrap(); }); - let mut broker = Broker::new(1888, 0).await; + let mut broker = Broker::new(1888, 0, false).await; // packet 1, 2, and 3 assert!(broker.read_publish().await.is_some()); @@ -341,7 +341,7 @@ async fn packet_id_collisions_are_detected_and_flow_control_is_applied() { }); task::spawn(async move { - let mut broker = Broker::new(1891, 0).await; + let mut broker = Broker::new(1891, 0, false).await; // read all incoming packets first for i in 1..=4 { @@ -449,8 +449,8 @@ async fn next_poll_after_connect_failure_reconnects() { let options = MqttOptions::new("dummy", "127.0.0.1", 3000); task::spawn(async move { - let _broker = Broker::new(3000, 1).await; - let _broker = Broker::new(3000, 0).await; + let _broker = Broker::new(3000, 1, false).await; + let _broker = Broker::new(3000, 0, false).await; time::sleep(Duration::from_secs(15)).await; }); @@ -474,7 +474,9 @@ async fn next_poll_after_connect_failure_reconnects() { #[tokio::test] async fn reconnection_resumes_from_the_previous_state() { let mut options = MqttOptions::new("dummy", "127.0.0.1", 3001); - options.set_keep_alive(Duration::from_secs(5)); + options + .set_keep_alive(Duration::from_secs(5)) + .set_clean_session(false); // start sending qos0 publishes. Makes sure that there is out activity but no in activity let (client, mut eventloop) = AsyncClient::new(options, 5); @@ -489,7 +491,7 @@ async fn reconnection_resumes_from_the_previous_state() { }); // broker connection 1 - let mut broker = Broker::new(3001, 0).await; + let mut broker = Broker::new(3001, 0, false).await; for i in 1..=2 { let packet = broker.read_publish().await.unwrap(); assert_eq!(i, packet.payload[0]); @@ -503,7 +505,7 @@ async fn reconnection_resumes_from_the_previous_state() { // a block around broker with {} is closing the connection as expected // broker connection 2 - let mut broker = Broker::new(3001, 0).await; + let mut broker = Broker::new(3001, 0, true).await; for i in 3..=4 { let packet = broker.read_publish().await.unwrap(); assert_eq!(i, packet.payload[0]); @@ -514,7 +516,9 @@ async fn reconnection_resumes_from_the_previous_state() { #[tokio::test] async fn reconnection_resends_unacked_packets_from_the_previous_connection_first() { let mut options = MqttOptions::new("dummy", "127.0.0.1", 3002); - options.set_keep_alive(Duration::from_secs(5)); + options + .set_keep_alive(Duration::from_secs(5)) + .set_clean_session(false); // start sending qos0 publishes. this makes sure that there is // outgoing activity but no incoming activity @@ -530,14 +534,14 @@ async fn reconnection_resends_unacked_packets_from_the_previous_connection_first }); // broker connection 1. receive but don't ack - let mut broker = Broker::new(3002, 0).await; + let mut broker = Broker::new(3002, 0, false).await; for i in 1..=2 { let packet = broker.read_publish().await.unwrap(); assert_eq!(i, packet.payload[0]); } // broker connection 2 receives from scratch - let mut broker = Broker::new(3002, 0).await; + let mut broker = Broker::new(3002, 0, true).await; for i in 1..=6 { let packet = broker.read_publish().await.unwrap(); assert_eq!(i, packet.payload[0]); @@ -559,7 +563,7 @@ async fn state_is_being_cleaned_properly_and_pending_request_calculated_properly }); task::spawn(async move { - let mut broker = Broker::new(3004, 0).await; + let mut broker = Broker::new(3004, 0, false).await; while (broker.read_packet().await).is_some() { time::sleep(Duration::from_secs_f64(0.5)).await; }