Skip to content

fix: token interfaces #946

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions rumqttc/examples/ack_promise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Sub({pkid})"),
Ok(pkid) => println!("Acknowledged Sub({pkid:?})"),
Err(e) => println!("Subscription failed: {e:?}"),
}

Expand All @@ -46,7 +46,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Ok(ack) => println!("Acknowledged Pub({ack:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}
Expand All @@ -66,14 +66,14 @@ async fn main() -> Result<(), Box<dyn Error>> {

while let Some(Ok(res)) = set.join_next().await {
match res {
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Ok(ack) => println!("Acknowledged Pub({ack:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

// Unsubscribe and wait for broker acknowledgement
match client.unsubscribe("hello/world").await.unwrap().await {
Ok(pkid) => println!("Acknowledged Unsub({pkid})"),
Ok(ack) => println!("Acknowledged Unsub({ack:?})"),
Err(e) => println!("Unsubscription failed: {e:?}"),
}

Expand Down
8 changes: 4 additions & 4 deletions rumqttc/examples/ack_promise_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn main() -> Result<(), Box<dyn Error>> {
.unwrap()
.wait()
{
Ok(pkid) => println!("Acknowledged Sub({pkid})"),
Ok(pkid) => println!("Acknowledged Sub({pkid:?})"),
Err(e) => println!("Subscription failed: {e:?}"),
}

Expand All @@ -42,7 +42,7 @@ fn main() -> Result<(), Box<dyn Error>> {
.unwrap()
.wait()
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Ok(ack) => println!("Acknowledged Pub({ack:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}
Expand Down Expand Up @@ -83,14 +83,14 @@ fn main() -> Result<(), Box<dyn Error>> {

while let Ok(res) = rx.recv() {
match res {
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Ok(ack) => println!("Acknowledged Pub({ack:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

// Unsubscribe and wait for broker acknowledgement
match client.unsubscribe("hello/world").unwrap().wait() {
Ok(pkid) => println!("Acknowledged Unsub({pkid})"),
Ok(ack) => println!("Acknowledged Unsub({ack:?})"),
Err(e) => println!("Unsubscription failed: {e:?}"),
}

Expand Down
8 changes: 4 additions & 4 deletions rumqttc/examples/ack_promise_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Sub({pkid})"),
Ok(pkid) => println!("Acknowledged Sub({pkid:?})"),
Err(e) => println!("Subscription failed: {e:?}"),
}

Expand All @@ -46,7 +46,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Ok(pkid) => println!("Acknowledged Pub({pkid:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}
Expand All @@ -66,14 +66,14 @@ async fn main() -> Result<(), Box<dyn Error>> {

while let Some(Ok(res)) = set.join_next().await {
match res {
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Ok(pkid) => println!("Acknowledged Pub({pkid:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

// Unsubscribe and wait for broker acknowledgement
match client.unsubscribe("hello/world").await.unwrap().await {
Ok(pkid) => println!("Acknowledged Unsub({pkid})"),
Ok(pkid) => println!("Acknowledged Unsub({pkid:?})"),
Err(e) => println!("Unsubscription failed: {e:?}"),
}

Expand Down
58 changes: 34 additions & 24 deletions rumqttc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::time::Duration;
use crate::mqttbytes::{v4::*, QoS};
use crate::tokens::{NoResponse, Resolver, Token};
use crate::{
valid_filter, valid_topic, ConnectionError, Event, EventLoop, MqttOptions, Pkid, Request,
valid_filter, valid_topic, AckOfAck, AckOfPub, ConnectionError, Event, EventLoop, MqttOptions,
Request,
};

use bytes::Bytes;
Expand Down Expand Up @@ -75,7 +76,7 @@ impl AsyncClient {
qos: QoS,
retain: bool,
payload: V,
) -> Result<Token<Pkid>, ClientError>
) -> Result<Token<AckOfPub>, ClientError>
where
S: Into<String>,
V: Into<Vec<u8>>,
Expand All @@ -100,7 +101,7 @@ impl AsyncClient {
qos: QoS,
retain: bool,
payload: V,
) -> Result<Token<Pkid>, ClientError>
) -> Result<Token<AckOfPub>, ClientError>
where
S: Into<String>,
V: Into<Vec<u8>>,
Expand All @@ -119,7 +120,7 @@ impl AsyncClient {
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub async fn ack(&self, publish: &Publish) -> Result<Token<NoResponse>, ClientError> {
pub async fn ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
let (resolver, token) = Resolver::new();
let ack = get_ack_req(publish, resolver);
if let Some(ack) = ack {
Expand All @@ -130,7 +131,7 @@ impl AsyncClient {
}

/// Attempts to send a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_ack(&self, publish: &Publish) -> Result<Token<NoResponse>, ClientError> {
pub fn try_ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
let (resolver, token) = Resolver::new();
let ack = get_ack_req(publish, resolver);
if let Some(ack) = ack {
Expand All @@ -147,7 +148,7 @@ impl AsyncClient {
qos: QoS,
retain: bool,
payload: Bytes,
) -> Result<Token<Pkid>, ClientError>
) -> Result<Token<AckOfPub>, ClientError>
where
S: Into<String>,
{
Expand All @@ -165,7 +166,7 @@ impl AsyncClient {
&self,
topic: S,
qos: QoS,
) -> Result<Token<Pkid>, ClientError> {
) -> Result<Token<SubAck>, ClientError> {
let (resolver, token) = Resolver::new();
let subscribe = Subscribe::new(topic, qos);

Expand All @@ -184,7 +185,7 @@ impl AsyncClient {
&self,
topic: S,
qos: QoS,
) -> Result<Token<Pkid>, ClientError> {
) -> Result<Token<SubAck>, ClientError> {
let (resolver, token) = Resolver::new();
let subscribe = Subscribe::new(topic, qos);
let is_valid = subscribe_has_valid_filters(&subscribe);
Expand All @@ -198,7 +199,7 @@ impl AsyncClient {
}

/// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
pub async fn subscribe_many<T>(&self, topics: T) -> Result<Token<Pkid>, ClientError>
pub async fn subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
{
Expand All @@ -216,7 +217,7 @@ impl AsyncClient {
}

/// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`
pub fn try_subscribe_many<T>(&self, topics: T) -> Result<Token<Pkid>, ClientError>
pub fn try_subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
{
Expand All @@ -233,7 +234,10 @@ impl AsyncClient {
}

/// Sends a MQTT Unsubscribe to the `EventLoop`
pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<Token<Pkid>, ClientError> {
pub async fn unsubscribe<S: Into<String>>(
&self,
topic: S,
) -> Result<Token<UnsubAck>, ClientError> {
let (resolver, token) = Resolver::new();
let unsubscribe = Unsubscribe::new(topic.into());
let request = Request::Unsubscribe(unsubscribe, resolver);
Expand All @@ -243,7 +247,10 @@ impl AsyncClient {
}

/// Attempts to send a MQTT Unsubscribe to the `EventLoop`
pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<Token<Pkid>, ClientError> {
pub fn try_unsubscribe<S: Into<String>>(
&self,
topic: S,
) -> Result<Token<UnsubAck>, ClientError> {
let (resolver, token) = Resolver::new();
let unsubscribe = Unsubscribe::new(topic.into());
let request = Request::Unsubscribe(unsubscribe, resolver);
Expand Down Expand Up @@ -271,10 +278,10 @@ impl AsyncClient {
}
}

fn get_ack_req(publish: &Publish, resolver: Resolver<()>) -> Option<Request> {
fn get_ack_req(publish: &Publish, resolver: Resolver<AckOfAck>) -> Option<Request> {
let ack = match publish.qos {
QoS::AtMostOnce => {
resolver.resolve(());
resolver.resolve(AckOfAck::None);
return None;
}
QoS::AtLeastOnce => Request::PubAck(PubAck::new(publish.pkid), resolver),
Expand Down Expand Up @@ -331,7 +338,7 @@ impl Client {
qos: QoS,
retain: bool,
payload: V,
) -> Result<Token<Pkid>, ClientError>
) -> Result<Token<AckOfPub>, ClientError>
where
S: Into<String>,
V: Into<Vec<u8>>,
Expand All @@ -355,7 +362,7 @@ impl Client {
qos: QoS,
retain: bool,
payload: V,
) -> Result<Token<Pkid>, ClientError>
) -> Result<Token<AckOfPub>, ClientError>
where
S: Into<String>,
V: Into<Vec<u8>>,
Expand All @@ -364,7 +371,7 @@ impl Client {
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn ack(&self, publish: &Publish) -> Result<Token<NoResponse>, ClientError> {
pub fn ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
let (resolver, token) = Resolver::new();
let ack = get_ack_req(publish, resolver);
if let Some(ack) = ack {
Expand All @@ -375,7 +382,7 @@ impl Client {
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_ack(&self, publish: &Publish) -> Result<Token<NoResponse>, ClientError> {
pub fn try_ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
self.client.try_ack(publish)
}

Expand All @@ -384,7 +391,7 @@ impl Client {
&self,
topic: S,
qos: QoS,
) -> Result<Token<Pkid>, ClientError> {
) -> Result<Token<SubAck>, ClientError> {
let (resolver, token) = Resolver::new();
let subscribe = Subscribe::new(topic, qos);
let is_valid = subscribe_has_valid_filters(&subscribe);
Expand All @@ -402,12 +409,12 @@ impl Client {
&self,
topic: S,
qos: QoS,
) -> Result<Token<Pkid>, ClientError> {
) -> Result<Token<SubAck>, ClientError> {
self.client.try_subscribe(topic, qos)
}

/// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
pub fn subscribe_many<T>(&self, topics: T) -> Result<Token<Pkid>, ClientError>
pub fn subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
{
Expand All @@ -424,15 +431,15 @@ impl Client {
Ok(token)
}

pub fn try_subscribe_many<T>(&self, topics: T) -> Result<Token<Pkid>, ClientError>
pub fn try_subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
{
self.client.try_subscribe_many(topics)
}

/// Sends a MQTT Unsubscribe to the `EventLoop`
pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<Token<Pkid>, ClientError> {
pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<Token<UnsubAck>, ClientError> {
let (resolver, token) = Resolver::new();
let unsubscribe = Unsubscribe::new(topic.into());
let request = Request::Unsubscribe(unsubscribe, resolver);
Expand All @@ -442,7 +449,10 @@ impl Client {
}

/// Sends a MQTT Unsubscribe to the `EventLoop`
pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<Token<Pkid>, ClientError> {
pub fn try_unsubscribe<S: Into<String>>(
&self,
topic: S,
) -> Result<Token<UnsubAck>, ClientError> {
self.client.try_unsubscribe(topic)
}

Expand Down
27 changes: 21 additions & 6 deletions rumqttc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,21 @@ pub use proxy::{Proxy, ProxyAuth, ProxyType};

pub type Incoming = Packet;

/// Used to encapsulate all publish/pubrec acknowledgements in v4
#[derive(Debug, PartialEq)]
pub enum AckOfPub {
PubAck(PubAck),
PubComp(PubComp),
None,
}

/// Used to encapsulate all ack/pubrel acknowledgements in v4
#[derive(Debug)]
pub enum AckOfAck {
None,
PubRel(PubRel),
}

/// Current outgoing activity on the eventloop
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Outgoing {
Expand Down Expand Up @@ -191,12 +206,12 @@ pub enum Outgoing {
/// handled one by one.
#[derive(Debug)]
pub enum Request {
Publish(Publish, Resolver<Pkid>),
PubAck(PubAck, Resolver<()>),
PubRec(PubRec, Resolver<()>),
PubRel(PubRel, Resolver<Pkid>),
Subscribe(Subscribe, Resolver<Pkid>),
Unsubscribe(Unsubscribe, Resolver<Pkid>),
Publish(Publish, Resolver<AckOfPub>),
PubAck(PubAck, Resolver<AckOfAck>),
PubRec(PubRec, Resolver<AckOfAck>),
PubRel(PubRel, Resolver<AckOfPub>),
Subscribe(Subscribe, Resolver<SubAck>),
Unsubscribe(Unsubscribe, Resolver<UnsubAck>),
Disconnect(Resolver<()>),
PingReq,
}
Expand Down
Loading