Skip to content

Commit b5b32d3

Browse files
committed
Fixes client downlink runtime attachment bug
1 parent e3bf823 commit b5b32d3

File tree

2 files changed

+14
-18
lines changed

2 files changed

+14
-18
lines changed

swimos_client/src/pending.rs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,6 @@ impl<'f> PendingConnections<'f> {
7979
)
8080
}
8181

82-
fn map_waiters(
83-
col: Option<FnvHashMap<Key, Vec<PendingDownlink>>>,
84-
) -> impl Iterator<Item = (Key, PendingDownlink)> {
85-
col.unwrap_or_default()
86-
.into_iter()
87-
.flat_map(|(key, downlinks)| downlinks.into_iter().map(move |dl| (key.clone(), dl)))
88-
}
89-
9082
pub fn feed_task(&self, task: BoxFuture<'f, Either<PendingDns, PendingHandshake>>) {
9183
self.tasks.push(task)
9284
}
@@ -110,14 +102,18 @@ impl<'f> PendingConnections<'f> {
110102
&mut self,
111103
host: Text,
112104
) -> impl Iterator<Item = (Key, PendingDownlink)> {
113-
Self::map_waiters(self.waiters.remove(&WaiterKey::Connection(host)))
105+
self.waiters
106+
.remove(&WaiterKey::Connection(host))
107+
.unwrap_or_default()
108+
.into_iter()
109+
.flat_map(|(key, downlinks)| downlinks.into_iter().map(move |dl| (key.clone(), dl)))
114110
}
115111

116-
pub fn drain_runtime_queue(
117-
&mut self,
118-
addr: SocketAddr,
119-
) -> impl Iterator<Item = (Key, PendingDownlink)> {
120-
Self::map_waiters(self.waiters.remove(&WaiterKey::Runtime(addr)))
112+
pub fn drain_runtime_queue(&mut self, addr: SocketAddr, key: &Key) -> Vec<PendingDownlink> {
113+
match self.waiters.get_mut(&WaiterKey::Runtime(addr)) {
114+
Some(waiters) => waiters.remove(key).unwrap_or_default(),
115+
None => Vec::new(),
116+
}
121117
}
122118

123119
pub fn waiting_on(&self, addr: SocketAddr, key: &Key) -> bool {

swimos_client/src/runtime.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ async fn runtime_task<Net, Ws, Provider>(
516516
.boxed(),
517517
);
518518

519-
for (_key, pending_downlink) in pending.drain_runtime_queue(sock) {
519+
for pending_downlink in pending.drain_runtime_queue(sock, &peer_key) {
520520
attachment_tasks
521521
.push(attach_downlink(attach.clone(), pending_downlink).boxed());
522522
}
@@ -526,7 +526,7 @@ async fn runtime_task<Net, Ws, Provider>(
526526
None => {
527527
let error =
528528
Err(DownlinkRuntimeError::new(DownlinkErrorKind::RemoteStopped).shared());
529-
for (_key, pending_downlink) in pending.drain_runtime_queue(sock) {
529+
for pending_downlink in pending.drain_runtime_queue(sock, &key) {
530530
let PendingDownlink {
531531
callback,
532532
address,
@@ -543,7 +543,7 @@ async fn runtime_task<Net, Ws, Provider>(
543543
RuntimeEvent::DownlinkRuntimeStarted {
544544
sock,
545545
result: Err((cause, host)),
546-
..
546+
key,
547547
} => {
548548
error!(error = %cause, host = %host, "Failed to start a downlink runtime to host: ");
549549

@@ -552,7 +552,7 @@ async fn runtime_task<Net, Ws, Provider>(
552552
cause,
553553
)
554554
.shared());
555-
for (_key, pending_downlink) in pending.drain_runtime_queue(sock) {
555+
for pending_downlink in pending.drain_runtime_queue(sock, &key) {
556556
let PendingDownlink {
557557
callback,
558558
address,

0 commit comments

Comments
 (0)