Skip to content

Commit b5b820e

Browse files
author
Devdutt Shenoi
committed
feat: LinkRx::recv_async
1 parent db1f261 commit b5b820e

File tree

1 file changed

+17
-0
lines changed

1 file changed

+17
-0
lines changed

rumqttd/src/link/local.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,23 @@ impl LinkRx {
338338
}
339339
}
340340

341+
pub async fn recv_async(&mut self) -> Result<Option<Notification>, LinkError> {
342+
// Read from cache first
343+
// One router_rx trigger signifies a bunch of notifications. So we
344+
// should always check cache first
345+
match self.cache.pop_front() {
346+
Some(v) => Ok(Some(v)),
347+
None => {
348+
// If cache is empty, check for router trigger and get fresh notifications
349+
self.router_rx.recv_async().await?;
350+
// Collect 'all' the data in the buffer after a notification.
351+
// Notification means fresh data which isn't previously collected
352+
mem::swap(&mut *self.send_buffer.lock(), &mut self.cache);
353+
Ok(self.cache.pop_front())
354+
}
355+
}
356+
}
357+
341358
pub fn recv_deadline(&mut self, deadline: Instant) -> Result<Option<Notification>, LinkError> {
342359
// Read from cache first
343360
// One router_rx trigger signifies a bunch of notifications. So we

0 commit comments

Comments
 (0)