Skip to content

Commit 10d843c

Browse files
author
Devdutt Shenoi
committed
test: reliability of ack promises
1 parent 066783a commit 10d843c

File tree

2 files changed

+208
-1
lines changed

2 files changed

+208
-1
lines changed

rumqttc/tests/broker.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,18 @@ impl Broker {
122122
self.framed.write(packet).await.unwrap();
123123
}
124124

125+
/// Sends a publish record
126+
pub async fn pubrec(&mut self, pkid: u16) {
127+
let packet = Packet::PubRec(PubRec::new(pkid));
128+
self.framed.write(packet).await.unwrap();
129+
}
130+
131+
/// Sends a publish complete
132+
pub async fn pubcomp(&mut self, pkid: u16) {
133+
let packet = Packet::PubComp(PubComp::new(pkid));
134+
self.framed.write(packet).await.unwrap();
135+
}
136+
125137
/// Sends an acknowledgement
126138
pub async fn pingresp(&mut self) {
127139
let packet = Packet::PingResp;

rumqttc/tests/reliability.rs

Lines changed: 196 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use matches::assert_matches;
22
use std::time::{Duration, Instant};
3-
use tokio::{task, time};
3+
use tokio::{
4+
task,
5+
time::{self, timeout},
6+
};
47

58
mod broker;
69

@@ -585,3 +588,195 @@ async fn state_is_being_cleaned_properly_and_pending_request_calculated_properly
585588
});
586589
handle.await.unwrap();
587590
}
591+
592+
#[tokio::test]
593+
async fn resolve_on_qos0_before_write_to_tcp_buffer() {
594+
let options = MqttOptions::new("dummy", "127.0.0.1", 3004);
595+
let (client, mut eventloop) = AsyncClient::new(options, 5);
596+
597+
task::spawn(async move {
598+
let res = run(&mut eventloop, false).await;
599+
if let Err(e) = res {
600+
match e {
601+
ConnectionError::FlushTimeout => {
602+
assert!(eventloop.network.is_none());
603+
println!("State is being clean properly");
604+
}
605+
_ => {
606+
println!("Couldn't fill the TCP send buffer to run this test properly. Try reducing the size of buffer.");
607+
}
608+
}
609+
}
610+
});
611+
612+
let mut broker = Broker::new(3004, 0, false).await;
613+
614+
let token = client
615+
.publish("hello/world", QoS::AtMostOnce, false, [1; 1])
616+
.await
617+
.unwrap();
618+
619+
// Token can resolve as soon as it was processed by eventloop
620+
assert_eq!(
621+
timeout(Duration::from_secs(1), token)
622+
.await
623+
.unwrap()
624+
.unwrap(),
625+
0
626+
);
627+
628+
// Verify the packet still reached broker
629+
// NOTE: this can't always be guaranteed
630+
let Packet::Publish(Publish {
631+
qos,
632+
topic,
633+
pkid,
634+
payload,
635+
..
636+
}) = broker.read_packet().await.unwrap()
637+
else {
638+
unreachable!()
639+
};
640+
assert_eq!(topic, "hello/world");
641+
assert_eq!(qos, QoS::AtMostOnce);
642+
assert_eq!(payload.to_vec(), [1; 1]);
643+
assert_eq!(pkid, 0);
644+
}
645+
646+
#[tokio::test]
647+
async fn resolve_on_qos1_ack_from_broker() {
648+
let options = MqttOptions::new("dummy", "127.0.0.1", 3004);
649+
let (client, mut eventloop) = AsyncClient::new(options, 5);
650+
651+
task::spawn(async move {
652+
let res = run(&mut eventloop, false).await;
653+
if let Err(e) = res {
654+
match e {
655+
ConnectionError::FlushTimeout => {
656+
assert!(eventloop.network.is_none());
657+
println!("State is being clean properly");
658+
}
659+
_ => {
660+
println!("Couldn't fill the TCP send buffer to run this test properly. Try reducing the size of buffer.");
661+
}
662+
}
663+
}
664+
});
665+
666+
let mut broker = Broker::new(3004, 0, false).await;
667+
668+
let mut token = client
669+
.publish("hello/world", QoS::AtLeastOnce, false, [1; 1])
670+
.await
671+
.unwrap();
672+
673+
// Token shouldn't resolve before reaching broker
674+
timeout(Duration::from_secs(1), &mut token)
675+
.await
676+
.unwrap_err();
677+
678+
let Packet::Publish(Publish {
679+
qos,
680+
topic,
681+
pkid,
682+
payload,
683+
..
684+
}) = broker.read_packet().await.unwrap()
685+
else {
686+
unreachable!()
687+
};
688+
assert_eq!(topic, "hello/world");
689+
assert_eq!(qos, QoS::AtLeastOnce);
690+
assert_eq!(payload.to_vec(), [1; 1]);
691+
assert_eq!(pkid, 1);
692+
693+
// Token shouldn't resolve until packet is acked
694+
timeout(Duration::from_secs(1), &mut token)
695+
.await
696+
.unwrap_err();
697+
698+
// Finally ack the packet
699+
broker.ack(1).await;
700+
701+
// Token shouldn't resolve until packet is acked
702+
assert_eq!(
703+
timeout(Duration::from_secs(1), &mut token)
704+
.await
705+
.unwrap()
706+
.unwrap(),
707+
1
708+
);
709+
}
710+
711+
#[tokio::test]
712+
async fn resolve_on_qos2_ack_from_broker() {
713+
let options = MqttOptions::new("dummy", "127.0.0.1", 3004);
714+
let (client, mut eventloop) = AsyncClient::new(options, 5);
715+
716+
task::spawn(async move {
717+
let res = run(&mut eventloop, false).await;
718+
if let Err(e) = res {
719+
match e {
720+
ConnectionError::FlushTimeout => {
721+
assert!(eventloop.network.is_none());
722+
println!("State is being clean properly");
723+
}
724+
_ => {
725+
println!("Couldn't fill the TCP send buffer to run this test properly. Try reducing the size of buffer.");
726+
}
727+
}
728+
}
729+
});
730+
731+
let mut broker = Broker::new(3004, 0, false).await;
732+
733+
let mut token = client
734+
.publish("hello/world", QoS::ExactlyOnce, false, [1; 1])
735+
.await
736+
.unwrap();
737+
738+
// Token shouldn't resolve before reaching broker
739+
timeout(Duration::from_secs(1), &mut token)
740+
.await
741+
.unwrap_err();
742+
743+
let Packet::Publish(Publish {
744+
qos,
745+
topic,
746+
pkid,
747+
payload,
748+
..
749+
}) = broker.read_packet().await.unwrap()
750+
else {
751+
unreachable!()
752+
};
753+
assert_eq!(topic, "hello/world");
754+
assert_eq!(qos, QoS::ExactlyOnce);
755+
assert_eq!(payload.to_vec(), [1; 1]);
756+
assert_eq!(pkid, 1);
757+
758+
// Token shouldn't resolve till publish recorded
759+
timeout(Duration::from_secs(1), &mut token)
760+
.await
761+
.unwrap_err();
762+
763+
// Record the publish message
764+
broker.pubrec(1).await;
765+
766+
// Token shouldn't resolve till publish complete
767+
timeout(Duration::from_secs(1), &mut token)
768+
.await
769+
.unwrap_err();
770+
771+
// Complete the publish message ack
772+
broker.pubcomp(1).await;
773+
774+
// Finally the publish is QoS2 acked
775+
assert_eq!(
776+
timeout(Duration::from_secs(1), &mut token)
777+
.await
778+
.unwrap()
779+
.unwrap(),
780+
1
781+
);
782+
}

0 commit comments

Comments
 (0)