Skip to content

Commit 8ad2223

Browse files
author
Devdutt Shenoi
committed
feat: return pkid of ack
1 parent 2632ab1 commit 8ad2223

File tree

3 files changed

+15
-14
lines changed

3 files changed

+15
-14
lines changed

rumqttc/src/lib.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,11 +223,12 @@ impl From<Unsubscribe> for Request {
223223
}
224224
}
225225

226-
pub type AckPromise = oneshot::Receiver<()>;
226+
pub type Pkid = u16;
227+
pub type AckPromise = oneshot::Receiver<Pkid>;
227228

228229
#[derive(Debug)]
229230
pub struct PromiseTx {
230-
inner: oneshot::Sender<()>,
231+
inner: oneshot::Sender<Pkid>,
231232
}
232233

233234
impl PromiseTx {
@@ -237,8 +238,8 @@ impl PromiseTx {
237238
(PromiseTx { inner }, promise)
238239
}
239240

240-
fn resolve(self) {
241-
if self.inner.send(()).is_err() {
241+
fn resolve(self, pkid: Pkid) {
242+
if self.inner.send(pkid).is_err() {
242243
trace!("Promise was drpped")
243244
}
244245
}

rumqttc/src/state.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ impl MqttState {
206206
.any(|x| matches!(x, SubscribeReasonCode::Success(_)))
207207
{
208208
if let Some(tx) = self.ack_waiter[suback.pkid as usize].take() {
209-
tx.resolve();
209+
tx.resolve(suback.pkid);
210210
}
211211
}
212212

@@ -223,7 +223,7 @@ impl MqttState {
223223
}
224224

225225
if let Some(tx) = self.ack_waiter[unsuback.pkid as usize].take() {
226-
tx.resolve();
226+
tx.resolve(unsuback.pkid);
227227
}
228228

229229
Ok(None)
@@ -271,7 +271,7 @@ impl MqttState {
271271

272272
if let Some(tx) = self.ack_waiter[puback.pkid as usize].take() {
273273
// Resolve promise for QoS 1
274-
tx.resolve();
274+
tx.resolve(puback.pkid);
275275
}
276276

277277
self.inflight -= 1;
@@ -333,7 +333,7 @@ impl MqttState {
333333

334334
if let Some(tx) = self.ack_waiter[pubcomp.pkid as usize].take() {
335335
// Resolve promise for QoS 2
336-
tx.resolve();
336+
tx.resolve(pubcomp.pkid);
337337
}
338338

339339
self.outgoing_rel.set(pubcomp.pkid as usize, false);
@@ -398,7 +398,7 @@ impl MqttState {
398398
let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
399399
self.events.push_back(event);
400400
match (publish.qos, tx) {
401-
(QoS::AtMostOnce, Some(tx)) => tx.resolve(),
401+
(QoS::AtMostOnce, Some(tx)) => tx.resolve(publish.pkid),
402402
(_, tx) => self.ack_waiter[publish.pkid as usize] = tx,
403403
}
404404

rumqttc/src/v5/state.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ impl MqttState {
269269
.any(|x| matches!(x, SubscribeReasonCode::Success(_)))
270270
{
271271
if let Some(tx) = self.ack_waiter[suback.pkid as usize].take() {
272-
tx.resolve();
272+
tx.resolve(suback.pkid);
273273
}
274274
}
275275

@@ -293,7 +293,7 @@ impl MqttState {
293293

294294
if unsuback.reasons.contains(&UnsubAckReason::Success) {
295295
if let Some(tx) = self.ack_waiter[unsuback.pkid as usize].take() {
296-
tx.resolve();
296+
tx.resolve(unsuback.pkid);
297297
}
298298
}
299299

@@ -401,7 +401,7 @@ impl MqttState {
401401

402402
if let Some(tx) = self.ack_waiter[puback.pkid as usize].take() {
403403
// Resolve promise for QoS 1
404-
tx.resolve();
404+
tx.resolve(puback.pkid);
405405
}
406406

407407
self.inflight -= 1;
@@ -490,7 +490,7 @@ impl MqttState {
490490

491491
if let Some(tx) = self.ack_waiter[pubcomp.pkid as usize].take() {
492492
// Resolve promise for QoS 2
493-
tx.resolve();
493+
tx.resolve(pubcomp.pkid);
494494
}
495495

496496
self.outgoing_rel.set(pubcomp.pkid as usize, false);
@@ -577,7 +577,7 @@ impl MqttState {
577577
let event = Event::Outgoing(Outgoing::Publish(pkid));
578578
self.events.push_back(event);
579579
match (publish.qos, tx) {
580-
(QoS::AtMostOnce, Some(tx)) => tx.resolve(),
580+
(QoS::AtMostOnce, Some(tx)) => tx.resolve(0),
581581
(_, tx) => self.ack_waiter[publish.pkid as usize] = tx,
582582
}
583583

0 commit comments

Comments
 (0)