Skip to content

Commit 1924a79

Browse files
committed
Commander integration test.
1 parent 91d8c6e commit 1924a79

File tree

3 files changed

+141
-61
lines changed

3 files changed

+141
-61
lines changed

server/swimos_agent/src/agent_model/mod.rs

Lines changed: 8 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,8 @@ use futures::{
3030
};
3131
use futures::{Future, FutureExt};
3232
use pin_project::pin_project;
33-
use swimos_agent_protocol::encoding::ad_hoc::CommandMessageEncoder;
3433
use swimos_agent_protocol::encoding::store::{RawMapStoreInitDecoder, RawValueStoreInitDecoder};
35-
use swimos_agent_protocol::{CommandMessage, LaneRequest, MapMessage};
34+
use swimos_agent_protocol::{LaneRequest, MapMessage};
3635
use swimos_api::agent::DownlinkKind;
3736
use swimos_api::agent::{HttpLaneRequest, LaneConfig, RawHttpLaneResponse};
3837
use swimos_api::error::{
@@ -52,7 +51,6 @@ use swimos_utilities::future::RetryStrategy;
5251
use swimos_utilities::routing::RouteUri;
5352
use tokio::io::AsyncWriteExt;
5453
use tokio::sync::mpsc;
55-
use tokio_util::codec::Encoder;
5654
use tracing::{debug, error, info, trace};
5755
use uuid::Uuid;
5856

@@ -1064,9 +1062,8 @@ where
10641062
mut suspended,
10651063
link_requests:
10661064
LinkRequestCollector {
1067-
next_commander_id,
10681065
downlinks: downlink_requests,
1069-
commanders: commander_requests,
1066+
commander_ids,
10701067
},
10711068
mut join_lane_init,
10721069
mut ad_hoc_buffer,
@@ -1078,7 +1075,7 @@ where
10781075
let mut pending_writes = FuturesUnordered::new();
10791076
let mut link_futures = FuturesUnordered::new();
10801077
let mut external_item_ids_rev = HashMap::new();
1081-
let cmd_ids = RefCell::new(CommanderIds::new(next_commander_id));
1078+
let cmd_ids = RefCell::new(commander_ids);
10821079

10831080
for (name, id) in external_item_ids.iter() {
10841081
external_item_ids_rev.insert(*id, name);
@@ -1106,17 +1103,6 @@ where
11061103
link_futures.push(LinkFuture::Opening(Some(request), fut));
11071104
}
11081105

1109-
// Request commanders from the init phase.
1110-
let mut cmd_encoder = CommandMessageEncoder::default();
1111-
for request in commander_requests {
1112-
let CommanderRequest { path, id } = request;
1113-
1114-
let msg = CommandMessage::<_, ()>::register(path, id);
1115-
cmd_encoder
1116-
.encode(msg, &mut ad_hoc_buffer)
1117-
.expect("Encoding should be infallible.");
1118-
}
1119-
11201106
for ((name, kind), (tx, rx)) in lane_io {
11211107
if kind.map_like() {
11221108
let id = external_item_ids[&name];
@@ -1934,24 +1920,16 @@ async fn open_with_retry(
19341920
}
19351921
}
19361922

1937-
#[derive(Debug)]
1938-
struct CommanderRequest {
1939-
path: Address<Text>,
1940-
id: u16,
1941-
}
1942-
19431923
struct LinkRequestCollector<Context> {
1944-
next_commander_id: u16,
19451924
downlinks: Vec<DownlinkSpawnRequest<Context>>,
1946-
commanders: Vec<CommanderRequest>,
1925+
commander_ids: CommanderIds,
19471926
}
19481927

19491928
impl<Context> Default for LinkRequestCollector<Context> {
19501929
fn default() -> Self {
19511930
Self {
1952-
next_commander_id: 0,
19531931
downlinks: Default::default(),
1954-
commanders: Default::default(),
1932+
commander_ids: CommanderIds::default(),
19551933
}
19561934
}
19571935
}
@@ -1960,7 +1938,7 @@ impl<Context> std::fmt::Debug for LinkRequestCollector<Context> {
19601938
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19611939
f.debug_struct("LinkRequestCollector")
19621940
.field("downlinks", &self.downlinks)
1963-
.field("commanders", &self.commanders)
1941+
.field("commander_ids", &self.commander_ids)
19641942
.finish()
19651943
}
19661944
}
@@ -1982,14 +1960,7 @@ impl<Context> LinkSpawner<Context> for RefCell<LinkRequestCollector<Context>> {
19821960

19831961
fn register_commander(&self, path: Address<Text>) -> Result<u16, CommanderRegistrationError> {
19841962
let mut guard = self.borrow_mut();
1985-
let id = guard.next_commander_id;
1986-
if let Some(next) = guard.next_commander_id.checked_add(1) {
1987-
guard.next_commander_id = next;
1988-
} else {
1989-
return Err(CommanderRegistrationError::CommanderIdOverflow);
1990-
}
1991-
guard.commanders.push(CommanderRequest { path, id });
1992-
Ok(id)
1963+
guard.commander_ids.get_request(&path)
19931964
}
19941965
}
19951966

@@ -2017,19 +1988,13 @@ where
20171988
}
20181989
}
20191990

1991+
#[derive(Default, Debug)]
20201992
struct CommanderIds {
20211993
next_id: u16,
20221994
assigned: HashMap<Address<Text>, u16>,
20231995
}
20241996

20251997
impl CommanderIds {
2026-
fn new(next_id: u16) -> Self {
2027-
CommanderIds {
2028-
next_id,
2029-
assigned: HashMap::new(),
2030-
}
2031-
}
2032-
20331998
fn get_request(&mut self, address: &Address<Text>) -> Result<u16, CommanderRegistrationError> {
20341999
let id = if let Some(id) = self.assigned.get(address) {
20352000
*id

server/swimos_agent/src/agent_model/tests/external_links/commander_lifecycle.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
17+
use parking_lot::Mutex;
1518
use swimos_api::address::Address;
1619
use swimos_model::Text;
20+
use swimos_utilities::trigger;
1721

1822
use crate::{
1923
agent_lifecycle::{
@@ -28,11 +32,21 @@ use super::empty_agent::EmptyAgent;
2832

2933
pub struct CommanderLifecycle {
3034
address: Address<Text>,
35+
stop_rx: Arc<Mutex<Option<trigger::Receiver>>>,
36+
}
37+
38+
impl CommanderLifecycle {
39+
pub fn new(address: Address<Text>, stop_rx: trigger::Receiver) -> Self {
40+
CommanderLifecycle {
41+
address,
42+
stop_rx: Arc::new(Mutex::new(Some(stop_rx))),
43+
}
44+
}
3145
}
3246

3347
impl CommanderLifecycle {
34-
pub fn new(address: Address<Text>) -> Self {
35-
CommanderLifecycle { address }
48+
fn take_stop_rx(&self) -> trigger::Receiver {
49+
self.stop_rx.lock().take().expect("Already taken.")
3650
}
3751
}
3852

@@ -51,14 +65,23 @@ impl OnInit<EmptyAgent> for CommanderLifecycle {
5165
impl OnStart<EmptyAgent> for CommanderLifecycle {
5266
fn on_start(&self) -> impl EventHandler<EmptyAgent> + '_ {
5367
let context: HandlerContext<EmptyAgent> = Default::default();
68+
69+
let stop_rx = self.take_stop_rx();
70+
let stop_handler = context.suspend(async move {
71+
assert!(stop_rx.await.is_ok());
72+
context.stop()
73+
});
74+
5475
let Address { host, node, lane } = &self.address;
5576
let create = context.create_commander(host.as_ref(), node, lane);
5677

57-
create.and_then(move |commander: Commander<EmptyAgent>| {
78+
let commands = create.and_then(move |commander: Commander<EmptyAgent>| {
5879
commander
5980
.send(7)
6081
.followed_by(context.suspend(async move { commander.send(22) }))
61-
})
82+
});
83+
84+
stop_handler.followed_by(commands)
6285
}
6386
}
6487

0 commit comments

Comments
 (0)