Skip to content

Commit 646db0f

Browse files
committed
Tests for downlink starting.
1 parent 3e03253 commit 646db0f

File tree

2 files changed

+191
-7
lines changed
  • server/swimos_agent/src/agent_model

2 files changed

+191
-7
lines changed

server/swimos_agent/src/agent_model/mod.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,7 +1140,7 @@ where
11401140
maybe_downlink = downlinks.next(), if !downlinks.is_empty() => {
11411141
maybe_downlink.map(|downlink_event| TaskEvent::DownlinkReady { downlink_event })
11421142
}
1143-
maybe_req = lane_readers.next() => {
1143+
maybe_req = lane_readers.next(), if !lane_readers.is_empty() => {
11441144
maybe_req.map(|req| {
11451145
match req {
11461146
(id, Ok(LaneReadEvent::Value(request))) => TaskEvent::ValueRequest{
@@ -1158,8 +1158,10 @@ where
11581158
}
11591159
})
11601160
}
1161+
else => None,
11611162
}
11621163
};
1164+
11631165
let task_event: TaskEvent<ItemModel> = tokio::select! {
11641166
biased;
11651167
maybe_cmd_result = &mut cmd_send_fut, if !cmd_send_fut.is_terminated() => {
@@ -1264,7 +1266,15 @@ where
12641266
}
12651267
OpenDownlinkResult::Failed(error, retry) => {
12661268
if error.is_fatal() {
1267-
error!(error = %error, address = %{request.path}, kind = ?{request.kind}, "Start a downlink failed with a fatal error.");
1269+
let DownlinkSpawnRequest {
1270+
path,
1271+
kind,
1272+
on_done,
1273+
..
1274+
} = request;
1275+
error!(error = %error, address = %path, kind = ?kind, "Start a downlink failed with a fatal error.");
1276+
let handler = on_done(Err(error));
1277+
exec_handler!(handler);
12681278
} else {
12691279
error!(address = %{&request.path}, kind = ?{request.kind}, error = %error, "Starting a downlink failed. Attempting to retry.");
12701280
let fut = open_new_downlink(
@@ -1778,6 +1788,17 @@ struct DownlinkSpawnRequest<Context> {
17781788
on_done: DownlinkSpawnOnDone<Context>,
17791789
}
17801790

1791+
impl<Context> std::fmt::Debug for DownlinkSpawnRequest<Context> {
1792+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1793+
f.debug_struct("DownlinkSpawnRequest")
1794+
.field("path", &self.path)
1795+
.field("kind", &self.kind)
1796+
.field("make_channel", &"...")
1797+
.field("on_done", &"...")
1798+
.finish()
1799+
}
1800+
}
1801+
17811802
impl<Context> DownlinkSpawner<Context> for RefCell<Vec<DownlinkSpawnRequest<Context>>> {
17821803
fn spawn_downlink(
17831804
&self,
@@ -1868,6 +1889,7 @@ where
18681889
}
18691890
}
18701891

1892+
#[derive(Debug)]
18711893
enum OpenDownlinkResult {
18721894
Success(ByteWriter, ByteReader),
18731895
Failed(DownlinkRuntimeError, RetryStrategy),

server/swimos_agent/src/agent_model/tests/downlinks/mod.rs

Lines changed: 167 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use swimos_api::{
3030
AgentConfig, AgentContext, AgentTask, DownlinkKind, HttpLaneRequestChannel, LaneConfig,
3131
StoreKind, WarpLaneKind,
3232
},
33-
error::{AgentRuntimeError, DownlinkRuntimeError, OpenStoreError},
33+
error::{AgentRuntimeError, DownlinkFailureReason, DownlinkRuntimeError, OpenStoreError},
3434
};
3535
use swimos_model::Text;
3636
use swimos_utilities::{
@@ -92,12 +92,32 @@ impl DlTestContext {
9292
ad_hoc_channel: Arc::new(Mutex::new((ad_hoc_reader, Some(ad_hoc_writer)))),
9393
}
9494
}
95+
96+
fn with_errors(
97+
expected_address: Address<Text>,
98+
expected_kind: DownlinkKind,
99+
errors: Vec<DownlinkRuntimeError>,
100+
io: Option<(ByteWriter, ByteReader)>,
101+
) -> Self {
102+
let (ad_hoc_writer, ad_hoc_reader) = byte_channel(BUFFER_SIZE);
103+
DlTestContext {
104+
expected_address,
105+
expected_kind,
106+
responses: Arc::new(Mutex::new(DownlinkResponses::new(errors, io))),
107+
ad_hoc_channel: Arc::new(Mutex::new((ad_hoc_reader, Some(ad_hoc_writer)))),
108+
}
109+
}
95110
}
96111

97112
impl AgentContext for DlTestContext {
98113
fn ad_hoc_commands(&self) -> BoxFuture<'static, Result<ByteWriter, DownlinkRuntimeError>> {
99114
let DlTestContext { ad_hoc_channel, .. } = self;
100-
ready(Ok(ad_hoc_channel.lock().1.take().expect("Reader taken twice."))).boxed()
115+
ready(Ok(ad_hoc_channel
116+
.lock()
117+
.1
118+
.take()
119+
.expect("Reader taken twice.")))
120+
.boxed()
101121
}
102122

103123
fn add_lane(
@@ -126,7 +146,7 @@ impl AgentContext for DlTestContext {
126146
assert_eq!(addr, expected_address.borrow_parts());
127147
assert_eq!(kind, *expected_kind);
128148
let result = responses.lock().pop();
129-
ready(result).boxed()
149+
async move { result }.boxed()
130150
}
131151

132152
fn add_store(
@@ -146,6 +166,7 @@ impl AgentContext for DlTestContext {
146166
}
147167

148168
struct TestContext {
169+
_tx: mpsc::UnboundedSender<DlResult>,
149170
rx: mpsc::UnboundedReceiver<DlResult>,
150171
}
151172

@@ -163,6 +184,7 @@ async fn init_agent(context: Box<DlTestContext>) -> (AgentTask, TestContext) {
163184
let address = addr();
164185

165186
let (tx, rx) = mpsc::unbounded_channel();
187+
let tx_cpy = tx.clone();
166188

167189
let model =
168190
AgentModel::<EmptyAgent, StartDownlinkLifecycle>::from_fn(EmptyAgent::default, move || {
@@ -173,7 +195,7 @@ async fn init_agent(context: Box<DlTestContext>) -> (AgentTask, TestContext) {
173195
.initialize_agent(make_uri(), HashMap::new(), config(), context)
174196
.await
175197
.expect("Initialization failed.");
176-
(task, TestContext { rx })
198+
(task, TestContext { rx, _tx: tx_cpy })
177199
}
178200

179201
type DlResult = Result<(), DownlinkRuntimeError>;
@@ -207,10 +229,72 @@ async fn immediately_successful_downlink() {
207229

208230
let agent_context = DlTestContext::new(addr(), DownlinkKind::Value, (out_tx, in_rx));
209231

210-
let (agent_task, TestContext { mut rx }) = init_agent(Box::new(agent_context)).await;
232+
let (agent_task, TestContext { mut rx, _tx }) = init_agent(Box::new(agent_context)).await;
233+
234+
let check = async move {
235+
let result = rx.recv().await.expect("Result expected.");
236+
assert!(result.is_ok());
237+
};
238+
239+
let (result, _) = join(agent_task, check).await;
240+
assert!(result.is_ok());
241+
})
242+
.await
243+
.expect("Test timed out.");
244+
}
245+
246+
#[tokio::test]
247+
async fn immediate_fatal_error_downlink() {
248+
tokio::time::timeout(TEST_TIMEOUT, async {
249+
let agent_context = DlTestContext::with_errors(
250+
addr(),
251+
DownlinkKind::Value,
252+
vec![DownlinkRuntimeError::DownlinkConnectionFailed(
253+
DownlinkFailureReason::InvalidUrl,
254+
)],
255+
None,
256+
);
257+
258+
let (agent_task, TestContext { mut rx, _tx }) = init_agent(Box::new(agent_context)).await;
259+
260+
let check = async move {
261+
let result = rx.recv().await.expect("Result expected.");
262+
263+
assert!(matches!(
264+
result,
265+
Err(DownlinkRuntimeError::DownlinkConnectionFailed(
266+
DownlinkFailureReason::InvalidUrl
267+
))
268+
));
269+
};
270+
271+
let (result, _) = join(agent_task, check).await;
272+
assert!(result.is_ok());
273+
})
274+
.await
275+
.expect("Test timed out.");
276+
}
277+
278+
#[tokio::test]
279+
async fn error_recovery_open_downlink() {
280+
tokio::time::timeout(TEST_TIMEOUT, async {
281+
let (_in_tx, in_rx) = byte_channel(BUFFER_SIZE);
282+
let (out_tx, _out_rx) = byte_channel(BUFFER_SIZE);
283+
284+
let agent_context = DlTestContext::with_errors(
285+
addr(),
286+
DownlinkKind::Value,
287+
vec![DownlinkRuntimeError::DownlinkConnectionFailed(
288+
DownlinkFailureReason::DownlinkStopped,
289+
)],
290+
Some((out_tx, in_rx)),
291+
);
292+
293+
let (agent_task, TestContext { mut rx, _tx }) = init_agent(Box::new(agent_context)).await;
211294

212295
let check = async move {
213296
let result = rx.recv().await.expect("Result expected.");
297+
214298
assert!(result.is_ok());
215299
};
216300

@@ -220,3 +304,81 @@ async fn immediately_successful_downlink() {
220304
.await
221305
.expect("Test timed out.");
222306
}
307+
308+
#[tokio::test]
309+
async fn eventual_fatal_error_downlink() {
310+
tokio::time::timeout(TEST_TIMEOUT, async {
311+
let agent_context = DlTestContext::with_errors(
312+
addr(),
313+
DownlinkKind::Value,
314+
vec![
315+
DownlinkRuntimeError::DownlinkConnectionFailed(
316+
DownlinkFailureReason::DownlinkStopped,
317+
),
318+
DownlinkRuntimeError::DownlinkConnectionFailed(DownlinkFailureReason::InvalidUrl),
319+
],
320+
None,
321+
);
322+
323+
let (agent_task, TestContext { mut rx, _tx }) = init_agent(Box::new(agent_context)).await;
324+
325+
let check = async move {
326+
let result = rx.recv().await.expect("Result expected.");
327+
328+
assert!(matches!(
329+
result,
330+
Err(DownlinkRuntimeError::DownlinkConnectionFailed(
331+
DownlinkFailureReason::InvalidUrl
332+
))
333+
));
334+
};
335+
336+
let (result, _) = join(agent_task, check).await;
337+
assert!(result.is_ok());
338+
})
339+
.await
340+
.expect("Test timed out.");
341+
}
342+
343+
#[tokio::test]
344+
async fn exhaust_open_downlink_retries() {
345+
tokio::time::timeout(TEST_TIMEOUT, async {
346+
let (_in_tx, in_rx) = byte_channel(BUFFER_SIZE);
347+
let (out_tx, _out_rx) = byte_channel(BUFFER_SIZE);
348+
349+
let agent_context = DlTestContext::with_errors(
350+
addr(),
351+
DownlinkKind::Value,
352+
vec![
353+
DownlinkRuntimeError::DownlinkConnectionFailed(
354+
DownlinkFailureReason::DownlinkStopped,
355+
),
356+
DownlinkRuntimeError::DownlinkConnectionFailed(
357+
DownlinkFailureReason::DownlinkStopped,
358+
),
359+
DownlinkRuntimeError::DownlinkConnectionFailed(
360+
DownlinkFailureReason::DownlinkStopped,
361+
),
362+
],
363+
Some((out_tx, in_rx)),
364+
);
365+
366+
let (agent_task, TestContext { mut rx, _tx }) = init_agent(Box::new(agent_context)).await;
367+
368+
let check = async move {
369+
let result = rx.recv().await.expect("Result expected.");
370+
371+
assert!(matches!(
372+
result,
373+
Err(DownlinkRuntimeError::DownlinkConnectionFailed(
374+
DownlinkFailureReason::DownlinkStopped
375+
))
376+
));
377+
};
378+
379+
let (result, _) = join(agent_task, check).await;
380+
assert!(result.is_ok());
381+
})
382+
.await
383+
.expect("Test timed out.");
384+
}

0 commit comments

Comments
 (0)