Skip to content

Commit 31887ca

Browse files
author
Devdutt Shenoi
committed
feat: return reason of request failure
1 parent db7c322 commit 31887ca

File tree

7 files changed

+207
-81
lines changed

7 files changed

+207
-81
lines changed

rumqttc/examples/ack_promise.rs

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,38 +28,46 @@ async fn main() -> Result<(), Box<dyn Error>> {
2828
});
2929

3030
// Subscribe and wait for broker acknowledgement
31-
let pkid = client
31+
match client
3232
.subscribe("hello/world", QoS::AtMostOnce)
3333
.await
3434
.unwrap()
3535
.await
36-
.unwrap();
37-
println!("Acknowledged Subscribe({pkid})");
36+
{
37+
Ok(pkid) => println!("Acknowledged Sub({pkid})"),
38+
Err(e) => println!("Subscription failed: {e:?}"),
39+
}
3840

3941
// Publish at all QoS levels and wait for broker acknowledgement
40-
let pkid = client
42+
match client
4143
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
4244
.await
4345
.unwrap()
4446
.await
45-
.unwrap();
46-
println!("Acknowledged Pub({pkid})");
47+
{
48+
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
49+
Err(e) => println!("Publish failed: {e:?}"),
50+
}
4751

48-
let pkid = client
52+
match client
4953
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
5054
.await
5155
.unwrap()
5256
.await
53-
.unwrap();
54-
println!("Acknowledged Pub({pkid})");
57+
{
58+
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
59+
Err(e) => println!("Publish failed: {e:?}"),
60+
}
5561

56-
let pkid = client
62+
match client
5763
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
5864
.await
5965
.unwrap()
6066
.await
61-
.unwrap();
62-
println!("Acknowledged Pub({pkid})");
67+
{
68+
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
69+
Err(e) => println!("Publish failed: {e:?}"),
70+
}
6371

6472
// Publish and spawn wait for notification
6573
let mut set = JoinSet::new();
@@ -82,8 +90,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
8290
.unwrap();
8391
set.spawn(async { future.await });
8492

85-
while let Some(Ok(Ok(pkid))) = set.join_next().await {
86-
println!("Acknowledged Pub({pkid})");
93+
while let Some(Ok(res)) = set.join_next().await {
94+
match res {
95+
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
96+
Err(e) => println!("Publish failed: {e:?}"),
97+
}
98+
}
99+
100+
// Unsubscribe and wait for broker acknowledgement
101+
match client.unsubscribe("hello/world").await.unwrap().await {
102+
Ok(pkid) => println!("Acknowledged Unsub({pkid})"),
103+
Err(e) => println!("Unsubscription failed: {e:?}"),
87104
}
88105

89106
Ok(())

rumqttc/examples/ack_promise_sync.rs

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,34 +26,42 @@ fn main() -> Result<(), Box<dyn Error>> {
2626
});
2727

2828
// Subscribe and wait for broker acknowledgement
29-
let pkid = client
29+
match client
3030
.subscribe("hello/world", QoS::AtMostOnce)
3131
.unwrap()
32-
.blocking_recv()
33-
.unwrap();
34-
println!("Acknowledged Subscribe({pkid})");
32+
.blocking_wait()
33+
{
34+
Ok(pkid) => println!("Acknowledged Sub({pkid})"),
35+
Err(e) => println!("Subscription failed: {e:?}"),
36+
}
3537

3638
// Publish at all QoS levels and wait for broker acknowledgement
37-
let pkid = client
39+
match client
3840
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
3941
.unwrap()
40-
.blocking_recv()
41-
.unwrap();
42-
println!("Acknowledged Pub({pkid})");
42+
.blocking_wait()
43+
{
44+
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
45+
Err(e) => println!("Publish failed: {e:?}"),
46+
}
4347

44-
let pkid = client
48+
match client
4549
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
4650
.unwrap()
47-
.blocking_recv()
48-
.unwrap();
49-
println!("Acknowledged Pub({pkid})");
51+
.blocking_wait()
52+
{
53+
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
54+
Err(e) => println!("Publish failed: {e:?}"),
55+
}
5056

51-
let pkid = client
57+
match client
5258
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
5359
.unwrap()
54-
.blocking_recv()
55-
.unwrap();
56-
println!("Acknowledged Pub({pkid})");
60+
.blocking_wait()
61+
{
62+
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
63+
Err(e) => println!("Publish failed: {e:?}"),
64+
}
5765

5866
// Spawn threads for each publish, use channel to notify result
5967
let (tx, rx) = bounded(1);
@@ -63,7 +71,7 @@ fn main() -> Result<(), Box<dyn Error>> {
6371
.unwrap();
6472
let tx_clone = tx.clone();
6573
thread::spawn(move || {
66-
let res = future.blocking_recv();
74+
let res = future.blocking_wait();
6775
tx_clone.send(res).unwrap()
6876
});
6977

@@ -72,20 +80,29 @@ fn main() -> Result<(), Box<dyn Error>> {
7280
.unwrap();
7381
let tx_clone = tx.clone();
7482
thread::spawn(move || {
75-
let res = future.blocking_recv();
83+
let res = future.blocking_wait();
7684
tx_clone.send(res).unwrap()
7785
});
7886

7987
let future = client
8088
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
8189
.unwrap();
8290
thread::spawn(move || {
83-
let res = future.blocking_recv();
91+
let res = future.blocking_wait();
8492
tx.send(res).unwrap()
8593
});
8694

87-
while let Ok(Ok(pkid)) = rx.recv() {
88-
println!("Acknowledged Pub({:?})", pkid);
95+
while let Ok(res) = rx.recv() {
96+
match res {
97+
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
98+
Err(e) => println!("Publish failed: {e:?}"),
99+
}
100+
}
101+
102+
// Unsubscribe and wait for broker acknowledgement
103+
match client.unsubscribe("hello/world").unwrap().blocking_wait() {
104+
Ok(pkid) => println!("Acknowledged Unsub({pkid})"),
105+
Err(e) => println!("Unsubscription failed: {e:?}"),
89106
}
90107

91108
Ok(())

rumqttc/examples/ack_promise_v5.rs

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,38 +28,46 @@ async fn main() -> Result<(), Box<dyn Error>> {
2828
});
2929

3030
// Subscribe and wait for broker acknowledgement
31-
let pkid = client
31+
match client
3232
.subscribe("hello/world", QoS::AtMostOnce)
3333
.await
3434
.unwrap()
3535
.await
36-
.unwrap();
37-
println!("Acknowledged Subscribe({pkid})");
36+
{
37+
Ok(pkid) => println!("Acknowledged Sub({pkid})"),
38+
Err(e) => println!("Subscription failed: {e:?}"),
39+
}
3840

3941
// Publish at all QoS levels and wait for broker acknowledgement
40-
let pkid = client
42+
match client
4143
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
4244
.await
4345
.unwrap()
4446
.await
45-
.unwrap();
46-
println!("Acknowledged Pub({pkid})");
47+
{
48+
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
49+
Err(e) => println!("Publish failed: {e:?}"),
50+
}
4751

48-
let pkid = client
52+
match client
4953
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
5054
.await
5155
.unwrap()
5256
.await
53-
.unwrap();
54-
println!("Acknowledged Pub({pkid})");
57+
{
58+
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
59+
Err(e) => println!("Publish failed: {e:?}"),
60+
}
5561

56-
let pkid = client
62+
match client
5763
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
5864
.await
5965
.unwrap()
6066
.await
61-
.unwrap();
62-
println!("Acknowledged Pub({pkid})");
67+
{
68+
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
69+
Err(e) => println!("Publish failed: {e:?}"),
70+
}
6371

6472
// Publish and spawn wait for notification
6573
let mut set = JoinSet::new();
@@ -82,8 +90,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
8290
.unwrap();
8391
set.spawn(async { future.await });
8492

85-
while let Some(Ok(Ok(pkid))) = set.join_next().await {
86-
println!("Acknowledged Pub({pkid})");
93+
while let Some(Ok(res)) = set.join_next().await {
94+
match res {
95+
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
96+
Err(e) => println!("Publish failed: {e:?}"),
97+
}
98+
}
99+
100+
// Unsubscribe and wait for broker acknowledgement
101+
match client.unsubscribe("hello/world").await.unwrap().await {
102+
Ok(pkid) => println!("Acknowledged Unsub({pkid})"),
103+
Err(e) => println!("Unsubscription failed: {e:?}"),
87104
}
88105

89106
Ok(())

rumqttc/examples/ack_promise_v5_sync.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,29 +29,29 @@ fn main() -> Result<(), Box<dyn Error>> {
2929
let pkid = client
3030
.subscribe("hello/world", QoS::AtMostOnce)
3131
.unwrap()
32-
.blocking_recv()
32+
.blocking_wait()
3333
.unwrap();
3434
println!("Acknowledged Subscribe({pkid})");
3535

3636
// Publish at all QoS levels and wait for broker acknowledgement
3737
let pkid = client
3838
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
3939
.unwrap()
40-
.blocking_recv()
40+
.blocking_wait()
4141
.unwrap();
4242
println!("Acknowledged Pub({pkid})");
4343

4444
let pkid = client
4545
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
4646
.unwrap()
47-
.blocking_recv()
47+
.blocking_wait()
4848
.unwrap();
4949
println!("Acknowledged Pub({pkid})");
5050

5151
let pkid = client
5252
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
5353
.unwrap()
54-
.blocking_recv()
54+
.blocking_wait()
5555
.unwrap();
5656
println!("Acknowledged Pub({pkid})");
5757

@@ -63,7 +63,7 @@ fn main() -> Result<(), Box<dyn Error>> {
6363
.unwrap();
6464
let tx_clone = tx.clone();
6565
thread::spawn(move || {
66-
let res = future.blocking_recv();
66+
let res = future.blocking_wait();
6767
tx_clone.send(res).unwrap()
6868
});
6969

@@ -72,20 +72,29 @@ fn main() -> Result<(), Box<dyn Error>> {
7272
.unwrap();
7373
let tx_clone = tx.clone();
7474
thread::spawn(move || {
75-
let res = future.blocking_recv();
75+
let res = future.blocking_wait();
7676
tx_clone.send(res).unwrap()
7777
});
7878

7979
let future = client
8080
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
8181
.unwrap();
8282
thread::spawn(move || {
83-
let res = future.blocking_recv();
83+
let res = future.blocking_wait();
8484
tx.send(res).unwrap()
8585
});
8686

87-
while let Ok(Ok(pkid)) = rx.recv() {
88-
println!("Acknowledged Pub({:?})", pkid);
87+
while let Ok(res) = rx.recv() {
88+
match res {
89+
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
90+
Err(e) => println!("Publish failed: {e:?}"),
91+
}
92+
}
93+
94+
// Unsubscribe and wait for broker acknowledgement
95+
match client.unsubscribe("hello/world").unwrap().blocking_wait() {
96+
Ok(pkid) => println!("Acknowledged Unsub({pkid})"),
97+
Err(e) => println!("Unsubscription failed: {e:?}"),
8998
}
9099

91100
Ok(())

0 commit comments

Comments
 (0)