Skip to content

Commit 7332289

Browse files
authored
Merge branch 'main' into flexible-map-dls
2 parents 9f4373d + c21a7ea commit 7332289

File tree

7 files changed

+616
-159
lines changed

7 files changed

+616
-159
lines changed

server/swimos_agent/src/lanes/join/map/downlink/mod.rs

Lines changed: 135 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515
use std::{collections::HashSet, hash::Hash};
1616
use swimos_agent_protocol::MapMessage;
1717
use swimos_api::address::Address;
18+
use swimos_form::Form;
1819
use swimos_model::Text;
1920

2021
use crate::{
2122
agent_model::AgentDescription,
2223
downlink_lifecycle::{OnConsumeEvent, OnFailed, OnLinked, OnSynced, OnUnlinked},
2324
event_handler::{
24-
ActionContext, AndThen, AndThenContextual, ConstHandler, ContextualTrans, FollowedBy,
25-
HandlerAction, HandlerActionExt, HandlerTrans, Modification, StepResult,
25+
ActionContext, AndThen, AndThenContextual, ConstHandler, ContextualTrans, Described,
26+
FollowedBy, HandlerAction, HandlerActionExt, HandlerTrans, Modification, StepResult,
2627
},
2728
item::AgentItem,
2829
lanes::{join::DownlinkStatus, LinkClosedResponse},
@@ -34,7 +35,7 @@ use super::{
3435
on_failed::OnJoinMapFailed, on_linked::OnJoinMapLinked, on_synced::OnJoinMapSynced,
3536
on_unlinked::OnJoinMapUnlinked,
3637
},
37-
JoinMapLane,
38+
JoinMapAddDownlink, JoinMapLane,
3839
};
3940

4041
#[cfg(test)]
@@ -338,13 +339,13 @@ where
338339

339340
type OnUnlinkedWithCleanup<'a, L, K, V, Context, LC> = AndThen<
340341
<LC as OnJoinMapUnlinked<L, K, Context>>::OnJoinMapUnlinkedHandler<'a>,
341-
AfterClosed<L, K, V, Context>,
342-
AfterClosedTrans<L, K, V, Context>,
342+
AfterClosed<'a, L, K, V, Context>,
343+
AfterClosedTrans<'a, L, K, V, Context>,
343344
>;
344345
type OnFailedWithCleanup<'a, L, K, V, Context, LC> = AndThen<
345346
<LC as OnJoinMapFailed<L, K, Context>>::OnJoinMapFailedHandler<'a>,
346-
AfterClosed<L, K, V, Context>,
347-
AfterClosedTrans<L, K, V, Context>,
347+
AfterClosed<'a, L, K, V, Context>,
348+
AfterClosedTrans<'a, L, K, V, Context>,
348349
>;
349350

350351
type JoinMapOnUnlinked<'a, L, K, V, Context, LC> = AndThenContextual<
@@ -361,9 +362,12 @@ type JoinMapOnFailed<'a, L, K, V, Context, LC> = AndThenContextual<
361362

362363
impl<L, K, V, LC, Context> OnUnlinked<Context> for JoinMapDownlink<L, K, V, LC, Context>
363364
where
364-
Context: AgentDescription,
365-
L: Clone + Hash + Eq + Send,
366-
K: Clone + Hash + Eq + Send,
365+
Context: AgentDescription + 'static,
366+
L: Clone + Hash + Eq + Send + 'static,
367+
K: Form + Clone + Hash + Ord + Eq + Send + 'static,
368+
K::Rec: Send,
369+
V: Form + Send + 'static,
370+
V::BodyRec: Send,
367371
LC: OnJoinMapUnlinked<L, K, Context>,
368372
{
369373
type OnUnlinkedHandler<'a> = JoinMapOnUnlinked<'a, L, K, V, Context, LC>
@@ -388,9 +392,12 @@ where
388392

389393
impl<L, K, V, LC, Context> OnFailed<Context> for JoinMapDownlink<L, K, V, LC, Context>
390394
where
391-
Context: AgentDescription,
392-
L: Clone + Hash + Eq + Send,
393-
K: Clone + Hash + Eq + Send,
395+
Context: AgentDescription + 'static,
396+
L: Clone + Hash + Eq + Send + 'static,
397+
K: Form + Clone + Hash + Ord + Eq + Send + 'static,
398+
K::Rec: Send,
399+
V: Form + Send + 'static,
400+
V::BodyRec: Send,
394401
LC: OnJoinMapFailed<L, K, Context>,
395402
{
396403
type OnFailedHandler<'a> = JoinMapOnFailed<'a, L, K, V, Context, LC>
@@ -456,10 +463,13 @@ impl<'a, L, K, V, Context, LC> RunOnFailedTrans<'a, L, K, V, Context, LC> {
456463
impl<'a, L, K, V, Context, LC> ContextualTrans<Context, L>
457464
for RunOnUnlinkedTrans<'a, L, K, V, Context, LC>
458465
where
459-
Context: AgentDescription,
466+
Context: AgentDescription + 'static,
460467
LC: OnJoinMapUnlinked<L, K, Context>,
461-
L: Clone + Hash + Eq,
462-
K: Clone + Hash + Eq,
468+
L: Clone + Hash + Eq + Send + 'static,
469+
K: Form + Clone + Hash + Ord + Eq + Send + 'static,
470+
K::Rec: Send,
471+
V: Form + Send + 'static,
472+
V::BodyRec: Send,
463473
{
464474
type Out = OnUnlinkedWithCleanup<'a, L, K, V, Context, LC>;
465475

@@ -480,6 +490,7 @@ where
480490
.on_unlinked(link_key.clone(), remote, keys.clone())
481491
.and_then(AfterClosedTrans {
482492
projection,
493+
address: remote_lane,
483494
link_key,
484495
keys,
485496
})
@@ -489,10 +500,13 @@ where
489500
impl<'a, L, K, V, Context, LC> ContextualTrans<Context, L>
490501
for RunOnFailedTrans<'a, L, K, V, Context, LC>
491502
where
492-
Context: AgentDescription,
503+
Context: AgentDescription + 'static,
493504
LC: OnJoinMapFailed<L, K, Context>,
494-
L: Clone + Hash + Eq,
495-
K: Clone + Hash + Eq,
505+
L: Clone + Hash + Eq + Send + 'static,
506+
K: Form + Clone + Hash + Ord + Eq + Send + 'static,
507+
K::Rec: Send,
508+
V: Form + Send + 'static,
509+
V::BodyRec: Send,
496510
{
497511
type Out = OnFailedWithCleanup<'a, L, K, V, Context, LC>;
498512

@@ -513,89 +527,121 @@ where
513527
.on_failed(link_key.clone(), remote, keys.clone())
514528
.and_then(AfterClosedTrans {
515529
projection,
530+
address: remote_lane,
516531
link_key,
517532
keys,
518533
})
519534
}
520535
}
521536

522-
pub struct AfterClosedTrans<L, K, V, Context> {
537+
pub struct AfterClosedTrans<'a, L, K, V, Context> {
523538
projection: fn(&Context) -> &JoinMapLane<L, K, V>,
539+
address: &'a Address<Text>,
524540
link_key: L,
525541
keys: HashSet<K>,
526542
}
527543

528-
impl<L, K, V, Context> HandlerTrans<LinkClosedResponse> for AfterClosedTrans<L, K, V, Context> {
529-
type Out = AfterClosed<L, K, V, Context>;
544+
impl<'a, L, K, V, Context> HandlerTrans<LinkClosedResponse>
545+
for AfterClosedTrans<'a, L, K, V, Context>
546+
{
547+
type Out = AfterClosed<'a, L, K, V, Context>;
530548

531-
fn transform(self, input: LinkClosedResponse) -> Self::Out {
549+
fn transform(self, response: LinkClosedResponse) -> Self::Out {
532550
let AfterClosedTrans {
533551
projection,
552+
address,
534553
link_key,
535554
keys,
536555
} = self;
537-
AfterClosed {
556+
AfterClosed::Cleanup {
538557
projection,
558+
address,
539559
link_key,
540-
response: Some(input),
560+
response,
541561
keys,
542562
}
543563
}
544564
}
545565

546566
/// An event handler that cleans up after a downlink unlinks or fails.
547-
pub struct AfterClosed<L, K, V, Context> {
548-
link_key: L,
549-
projection: fn(&Context) -> &JoinMapLane<L, K, V>,
550-
response: Option<LinkClosedResponse>,
551-
keys: HashSet<K>,
567+
#[derive(Default)]
568+
pub enum AfterClosed<'a, L, K, V, Context> {
569+
Cleanup {
570+
link_key: L,
571+
projection: fn(&Context) -> &JoinMapLane<L, K, V>,
572+
address: &'a Address<Text>,
573+
response: LinkClosedResponse,
574+
keys: HashSet<K>,
575+
},
576+
Restarting(JoinMapAddDownlink<Context, L, K, V>),
577+
#[default]
578+
Done,
552579
}
553580

554-
impl<L, K, V, Context> HandlerAction<Context> for AfterClosed<L, K, V, Context>
581+
impl<'a, L, K, V, Context> HandlerAction<Context> for AfterClosed<'a, L, K, V, Context>
555582
where
556-
Context: AgentDescription,
557-
L: Clone + Hash + Eq,
558-
K: Clone + Hash + Eq,
583+
Context: AgentDescription + 'static,
584+
L: Clone + Hash + Eq + Send + 'static,
585+
K: Form + Clone + Hash + Ord + Eq + Send + 'static,
586+
K::Rec: Send,
587+
V: Form + Send + 'static,
588+
V::BodyRec: Send,
559589
{
560590
type Completion = ();
561591

562592
fn step(
563593
&mut self,
564-
_action_context: &mut ActionContext<Context>,
565-
_meta: AgentMetadata,
594+
action_context: &mut ActionContext<Context>,
595+
meta: AgentMetadata,
566596
context: &Context,
567597
) -> StepResult<Self::Completion> {
568-
let AfterClosed {
569-
link_key,
570-
projection,
571-
response,
572-
keys,
573-
} = self;
574-
if let Some(response) = response.take() {
575-
let JoinMapLane {
576-
inner,
577-
link_tracker,
578-
} = projection(context);
579-
link_tracker.borrow_mut().remove_link(link_key);
580-
match response {
581-
LinkClosedResponse::Retry => todo!(),
582-
LinkClosedResponse::Abandon => StepResult::done(()),
583-
LinkClosedResponse::Delete => {
584-
if keys.is_empty() {
585-
StepResult::done(())
586-
} else {
587-
for key in keys.iter() {
588-
inner.remove(key);
598+
loop {
599+
match std::mem::take(self) {
600+
AfterClosed::Cleanup {
601+
link_key,
602+
projection,
603+
address,
604+
response,
605+
keys,
606+
} => {
607+
let JoinMapLane {
608+
inner,
609+
link_tracker,
610+
} = projection(context);
611+
link_tracker.borrow_mut().remove_link(&link_key);
612+
match response {
613+
LinkClosedResponse::Abandon => break StepResult::done(()),
614+
LinkClosedResponse::Delete => {
615+
break if keys.is_empty() {
616+
StepResult::done(())
617+
} else {
618+
for key in keys.iter() {
619+
inner.remove(key);
620+
}
621+
StepResult::Complete {
622+
modified_item: Some(Modification::of(inner.id())),
623+
result: (),
624+
}
625+
};
589626
}
590-
StepResult::Complete {
591-
modified_item: Some(Modification::of(inner.id())),
592-
result: (),
627+
LinkClosedResponse::Retry => {
628+
*self = AfterClosed::Restarting(JoinMapAddDownlink::new(
629+
projection,
630+
link_key,
631+
address.clone(),
632+
));
593633
}
594634
}
595635
}
636+
AfterClosed::Restarting(mut handler) => {
637+
let result = handler.step(action_context, meta, context);
638+
if result.is_cont() {
639+
*self = AfterClosed::Restarting(handler);
640+
}
641+
break result;
642+
}
643+
AfterClosed::Done => break StepResult::after_done(),
596644
}
597-
} else {
598-
StepResult::after_done()
599645
}
600646
}
601647

@@ -604,17 +650,32 @@ where
604650
context: &Context,
605651
f: &mut std::fmt::Formatter<'_>,
606652
) -> Result<(), std::fmt::Error> {
607-
let AfterClosed {
608-
projection,
609-
response,
610-
..
611-
} = self;
612-
let lane = (projection)(context);
613-
let name = context.item_name(lane.id());
614-
f.debug_struct("AfterClosed")
615-
.field("id", &lane.id())
616-
.field("lane_name", &name.as_ref().map(|s| s.as_ref()))
617-
.field("consumed", &response.is_none())
618-
.finish()
653+
match self {
654+
AfterClosed::Cleanup {
655+
projection,
656+
address,
657+
response,
658+
..
659+
} => {
660+
let lane = (projection)(context);
661+
let name = context.item_name(lane.id());
662+
f.debug_struct("AfterClosed")
663+
.field("state", &"Cleanup")
664+
.field("id", &lane.id())
665+
.field("lane_name", &name.as_ref().map(|s| s.as_ref()))
666+
.field("address", address)
667+
.field("response", response)
668+
.finish()
669+
}
670+
AfterClosed::Restarting(handler) => f
671+
.debug_struct("AfterClosed")
672+
.field("state", &"Restarting")
673+
.field("handler", &Described::new(context, handler))
674+
.finish(),
675+
AfterClosed::Done => f
676+
.debug_struct("AfterClosed")
677+
.field("state", &"Done")
678+
.finish(),
679+
}
619680
}
620681
}

0 commit comments

Comments
 (0)