Skip to content

Commit 5ae774c

Browse files
Merge pull request #699 from swimos/simplified-context
Improvements to opening downlinks and simplification of the ActionContext.
2 parents abc11b6 + 048579e commit 5ae774c

File tree

32 files changed

+1389
-874
lines changed

32 files changed

+1389
-874
lines changed

server/swimos_agent/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ json = ["dep:serde", "dep:serde_json"]
1414

1515
[dependencies]
1616
futures = { workspace = true }
17-
swimos_utilities = { workspace = true, features = ["io", "trigger", "circular_buffer", "encoding"] }
17+
swimos_utilities = { workspace = true, features = ["io", "trigger", "circular_buffer", "encoding", "errors"] }
1818
swimos_model = { workspace = true }
1919
swimos_form = { workspace = true }
2020
swimos_recon = { workspace = true }

server/swimos_agent/src/agent_lifecycle/utility/tests.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use swimos_utilities::routing::RouteUri;
2323
use crate::{
2424
event_handler::{ActionContext, HandlerAction, LocalBoxEventHandler, StepResult},
2525
meta::AgentMetadata,
26-
test_context::{no_downlink, DummyAgentContext, NO_DYN_LANES},
26+
test_context::{NO_DOWNLINKS, NO_DYN_LANES},
2727
};
2828

2929
use super::HandlerContext;
@@ -101,8 +101,7 @@ async fn suspend_repeatedly() {
101101
match handler.step(
102102
&mut ActionContext::new(
103103
&spawner,
104-
&DummyAgentContext,
105-
&no_downlink,
104+
&NO_DOWNLINKS,
106105
&NO_DYN_LANES,
107106
&mut join_lane_init,
108107
&mut ad_hoc_buffer,

server/swimos_agent/src/agent_model/downlink/hosted/event/mod.rs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use tracing::{debug, error, info, trace};
3636
use crate::{
3737
agent_model::downlink::{
3838
BoxDownlinkChannel, DownlinkChannel, DownlinkChannelError, DownlinkChannelEvent,
39+
DownlinkChannelFactory,
3940
},
4041
config::SimpleDownlinkConfig,
4142
downlink_lifecycle::EventDownlinkLifecycle,
@@ -80,10 +81,23 @@ where
8081
}
8182
}
8283

83-
pub fn create<Context>(self, receiver: ByteReader) -> BoxDownlinkChannel<Context>
84-
where
85-
LC: EventDownlinkLifecycle<T, Context> + 'static,
86-
{
84+
pub fn dl_state(&self) -> &Arc<AtomicU8> {
85+
&self.dl_state
86+
}
87+
}
88+
89+
impl<T, LC, Context> DownlinkChannelFactory<Context> for EventDownlinkFactory<T, LC>
90+
where
91+
T: StructuralReadable + Send + 'static,
92+
T::Rec: Send,
93+
LC: EventDownlinkLifecycle<T, Context> + 'static,
94+
{
95+
fn create(
96+
self,
97+
_context: &Context,
98+
_: ByteWriter,
99+
receiver: ByteReader,
100+
) -> BoxDownlinkChannel<Context> {
87101
let EventDownlinkFactory {
88102
address,
89103
lifecycle,
@@ -107,8 +121,21 @@ where
107121
Box::new(chan)
108122
}
109123

110-
pub fn dl_state(&self) -> &Arc<AtomicU8> {
111-
&self.dl_state
124+
fn create_box(
125+
self: Box<Self>,
126+
context: &Context,
127+
tx: ByteWriter,
128+
rx: ByteReader,
129+
) -> BoxDownlinkChannel<Context> {
130+
(*self).create(context, tx, rx)
131+
}
132+
133+
fn kind(&self) -> DownlinkKind {
134+
if self.map_events {
135+
DownlinkKind::MapEvent
136+
} else {
137+
DownlinkKind::Event
138+
}
112139
}
113140
}
114141

server/swimos_agent/src/agent_model/downlink/hosted/event/tests.rs

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use tokio_util::codec::FramedWrite;
3030

3131
use super::{EventDownlinkFactory, SimpleDownlinkConfig};
3232
use crate::{
33-
agent_model::downlink::{BoxDownlinkChannel, DownlinkChannelEvent},
33+
agent_model::downlink::{BoxDownlinkChannel, DownlinkChannelEvent, DownlinkChannelFactory},
3434
downlink_lifecycle::{OnConsumeEvent, OnFailed, OnLinked, OnSynced, OnUnlinked},
3535
event_handler::{HandlerActionExt, LocalBoxEventHandler, SideEffect},
3636
};
@@ -132,24 +132,25 @@ struct TestContext {
132132
stop_tx: Option<trigger::Sender>,
133133
}
134134

135-
fn make_hosted_input(config: SimpleDownlinkConfig) -> TestContext {
135+
fn make_hosted_input(context: &FakeAgent, config: SimpleDownlinkConfig) -> TestContext {
136136
let inner: Events = Default::default();
137137
let lc = FakeLifecycle {
138138
inner: inner.clone(),
139139
};
140140

141-
let (tx, rx) = byte_channel::byte_channel(BUFFER_SIZE);
141+
let (in_tx, in_rx) = byte_channel::byte_channel(BUFFER_SIZE);
142+
let (out_tx, _) = byte_channel::byte_channel(BUFFER_SIZE);
142143
let (stop_tx, stop_rx) = trigger::trigger();
143144

144145
let address = Address::new(None, Text::new("/node"), Text::new("lane"));
145146

146147
let fac = EventDownlinkFactory::new(address, lc, config, stop_rx, false);
147148

148-
let chan = fac.create(rx);
149+
let chan = fac.create(context, out_tx, in_rx);
149150
TestContext {
150151
channel: chan,
151152
events: inner,
152-
sender: FramedWrite::new(tx, Default::default()),
153+
sender: FramedWrite::new(in_tx, Default::default()),
153154
stop_tx: Some(stop_tx),
154155
}
155156
}
@@ -206,12 +207,13 @@ async fn expect_no_unlink_shutdown(channel: &mut BoxDownlinkChannel<FakeAgent>,
206207

207208
#[tokio::test]
208209
async fn event_dl_shutdown_when_input_stops() {
210+
let agent = FakeAgent;
209211
let TestContext {
210212
mut channel,
211213
sender,
212214
stop_tx: _stop_tx,
213215
events: _events,
214-
} = make_hosted_input(SimpleDownlinkConfig::default());
216+
} = make_hosted_input(&agent, SimpleDownlinkConfig::default());
215217

216218
let agent = FakeAgent;
217219

@@ -224,12 +226,14 @@ async fn event_dl_shutdown_when_input_stops() {
224226

225227
#[tokio::test]
226228
async fn event_dl_shutdown_on_stop_signal() {
229+
let agent = FakeAgent;
230+
227231
let TestContext {
228232
mut channel,
229233
sender: _sender,
230234
stop_tx,
231235
events: _events,
232-
} = make_hosted_input(SimpleDownlinkConfig::default());
236+
} = make_hosted_input(&agent, SimpleDownlinkConfig::default());
233237

234238
let agent = FakeAgent;
235239

@@ -242,12 +246,14 @@ async fn event_dl_shutdown_on_stop_signal() {
242246

243247
#[tokio::test]
244248
async fn event_dl_terminate_on_error() {
249+
let agent = FakeAgent;
250+
245251
let TestContext {
246252
mut channel,
247253
mut sender,
248254
events,
249255
stop_tx: _stop_tx,
250-
} = make_hosted_input(SimpleDownlinkConfig::default());
256+
} = make_hosted_input(&agent, SimpleDownlinkConfig::default());
251257

252258
let agent = FakeAgent;
253259

@@ -316,7 +322,9 @@ async fn run_with_expectations(
316322

317323
#[tokio::test]
318324
async fn event_dl_emit_linked_handler() {
319-
let mut context = make_hosted_input(SimpleDownlinkConfig::default());
325+
let agent = FakeAgent;
326+
327+
let mut context = make_hosted_input(&agent, SimpleDownlinkConfig::default());
320328

321329
let agent = FakeAgent;
322330

@@ -334,7 +342,9 @@ async fn event_dl_emit_linked_handler() {
334342

335343
#[tokio::test]
336344
async fn event_dl_emit_synced_handler() {
337-
let mut context = make_hosted_input(SimpleDownlinkConfig::default());
345+
let agent = FakeAgent;
346+
347+
let mut context = make_hosted_input(&agent, SimpleDownlinkConfig::default());
338348

339349
let agent = FakeAgent;
340350

@@ -356,7 +366,9 @@ async fn event_dl_emit_synced_handler() {
356366

357367
#[tokio::test]
358368
async fn event_dl_emit_event_handlers() {
359-
let mut context = make_hosted_input(SimpleDownlinkConfig::default());
369+
let agent = FakeAgent;
370+
371+
let mut context = make_hosted_input(&agent, SimpleDownlinkConfig::default());
360372

361373
let agent = FakeAgent;
362374

@@ -382,11 +394,13 @@ async fn event_dl_emit_event_handlers() {
382394

383395
#[tokio::test]
384396
async fn event_dl_emit_events_before_synced() {
397+
let agent = FakeAgent;
398+
385399
let config = SimpleDownlinkConfig {
386400
events_when_not_synced: true,
387401
terminate_on_unlinked: true,
388402
};
389-
let mut context = make_hosted_input(config);
403+
let mut context = make_hosted_input(&agent, config);
390404

391405
let agent = FakeAgent;
392406

@@ -411,7 +425,9 @@ async fn event_dl_emit_events_before_synced() {
411425

412426
#[tokio::test]
413427
async fn event_dl_emit_unlinked_handler() {
414-
let mut context = make_hosted_input(SimpleDownlinkConfig::default());
428+
let agent = FakeAgent;
429+
430+
let mut context = make_hosted_input(&agent, SimpleDownlinkConfig::default());
415431

416432
let agent = FakeAgent;
417433

@@ -437,12 +453,14 @@ async fn event_dl_emit_unlinked_handler() {
437453

438454
#[tokio::test]
439455
async fn event_dl_revive_unlinked_downlink() {
456+
let agent = FakeAgent;
457+
440458
let config = SimpleDownlinkConfig {
441459
events_when_not_synced: true,
442460
terminate_on_unlinked: false,
443461
};
444462

445-
let mut context = make_hosted_input(config);
463+
let mut context = make_hosted_input(&agent, config);
446464

447465
let agent = FakeAgent;
448466

server/swimos_agent/src/agent_model/downlink/hosted/map/mod.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use tokio::sync::mpsc;
4545
use tokio_util::codec::{FramedRead, FramedWrite};
4646
use tracing::{debug, error, info, trace};
4747

48-
use crate::event_handler::LocalBoxEventHandler;
48+
use crate::{agent_model::downlink::DownlinkChannelFactory, event_handler::LocalBoxEventHandler};
4949
use crate::{
5050
agent_model::downlink::{
5151
BoxDownlinkChannel, DownlinkChannel, DownlinkChannelError, DownlinkChannelEvent,
@@ -273,15 +273,25 @@ where
273273
}
274274
}
275275

276-
pub fn create<Context>(
276+
pub fn dl_state(&self) -> &Arc<AtomicU8> {
277+
&self.dl_state
278+
}
279+
}
280+
281+
impl<K, V, LC, Context> DownlinkChannelFactory<Context> for MapDownlinkFactory<K, V, LC>
282+
where
283+
K: Hash + Eq + Ord + Clone + Form + Send + 'static,
284+
V: Form + Send + 'static,
285+
K::Rec: Send,
286+
V::Rec: Send,
287+
LC: MapDownlinkLifecycle<K, V, Context> + 'static,
288+
{
289+
fn create(
277290
self,
278291
context: &Context,
279292
sender: ByteWriter,
280293
receiver: ByteReader,
281-
) -> BoxDownlinkChannel<Context>
282-
where
283-
LC: MapDownlinkLifecycle<K, V, Context> + 'static,
284-
{
294+
) -> BoxDownlinkChannel<Context> {
285295
let MapDownlinkFactory {
286296
address,
287297
state,
@@ -306,8 +316,17 @@ where
306316
Box::new(chan)
307317
}
308318

309-
pub fn dl_state(&self) -> &Arc<AtomicU8> {
310-
&self.dl_state
319+
fn create_box(
320+
self: Box<Self>,
321+
context: &Context,
322+
tx: ByteWriter,
323+
rx: ByteReader,
324+
) -> BoxDownlinkChannel<Context> {
325+
(*self).create(context, tx, rx)
326+
}
327+
328+
fn kind(&self) -> DownlinkKind {
329+
DownlinkKind::Map
311330
}
312331
}
313332

server/swimos_agent/src/agent_model/downlink/hosted/map/tests.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ use tokio::{io::AsyncWriteExt, sync::mpsc};
4848
use tokio_util::codec::{Encoder, FramedRead, FramedWrite};
4949

5050
use crate::{
51-
agent_model::downlink::{BoxDownlinkChannel, DownlinkChannelEvent, MapDownlinkHandle},
51+
agent_model::downlink::{
52+
BoxDownlinkChannel, DownlinkChannelEvent, DownlinkChannelFactory, MapDownlinkHandle,
53+
},
5254
config::MapDownlinkConfig,
5355
downlink_lifecycle::{
5456
OnDownlinkClear, OnDownlinkRemove, OnDownlinkUpdate, OnFailed, OnLinked, OnSynced,

0 commit comments

Comments
 (0)