Skip to content

Commit 37a651e

Browse files
committed
Added end to end test for MQTT ingress connector.
1 parent 60ebd76 commit 37a651e

File tree

6 files changed

+255
-21
lines changed

6 files changed

+255
-21
lines changed

server/swimos_connector/src/config/ingress.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,17 @@ pub struct ValueRelaySpecification {
9191
pub required: bool,
9292
}
9393

94+
impl ValueRelaySpecification {
95+
pub fn new<S: Into<String>>(node: S, lane: S, payload: S, required: bool) -> Self {
96+
ValueRelaySpecification {
97+
node: node.into(),
98+
lane: lane.into(),
99+
payload: payload.into(),
100+
required,
101+
}
102+
}
103+
}
104+
94105
/// Specification of a map relay for the connector.
95106
#[derive(Clone, Debug, Form, PartialEq, Eq)]
96107
#[form(tag = "MapRelaySpec")]
@@ -111,6 +122,26 @@ pub struct MapRelaySpecification {
111122
pub remove_when_no_value: bool,
112123
}
113124

125+
impl MapRelaySpecification {
126+
pub fn new<S: Into<String>>(
127+
node: S,
128+
lane: S,
129+
key: S,
130+
value: S,
131+
required: bool,
132+
remove_when_no_value: bool,
133+
) -> Self {
134+
MapRelaySpecification {
135+
node: node.into(),
136+
lane: lane.into(),
137+
key: key.into(),
138+
value: value.into(),
139+
required,
140+
remove_when_no_value,
141+
}
142+
}
143+
}
144+
114145
/// Specification of a relay for the connector.
115146
#[derive(Clone, Debug, Form, PartialEq, Eq)]
116147
pub enum RelaySpecification {
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::time::Duration;
16+
17+
use bytes::BytesMut;
18+
use futures::{future::join, TryStreamExt};
19+
use swimos_agent::agent_model::{AgentSpec, ItemDescriptor, ItemFlags};
20+
use swimos_agent_protocol::{encoding::command::CommandMessageDecoder, CommandMessage};
21+
use swimos_api::{address::Address, agent::WarpLaneKind};
22+
use swimos_connector::{
23+
config::{
24+
format::DataFormat, IngressValueLaneSpec, RelaySpecification, ValueRelaySpecification,
25+
},
26+
BaseConnector, ConnectorAgent, IngressConnector, IngressContext,
27+
};
28+
use swimos_connector_util::{run_handler_with_futures, run_handler_with_futures_and_cmds};
29+
use swimos_model::Value;
30+
use swimos_utilities::trigger;
31+
use tokio::time::timeout;
32+
use tokio_util::codec::Decoder;
33+
34+
use crate::{
35+
facade::MqttFactory,
36+
generator::{generate_data, TOPIC},
37+
MqttIngressConfiguration, MqttIngressConnector, Subscription,
38+
};
39+
40+
const CLIENT_URL: &str = "mqtt://localhost:1883?client_id=test";
41+
const GEN_URL: &str = "mqtt://localhost:1883?client_id=generator";
42+
const LANE_NAME: &str = "value_lane";
43+
44+
fn make_configuration() -> MqttIngressConfiguration {
45+
MqttIngressConfiguration {
46+
url: CLIENT_URL.to_string(),
47+
value_lanes: vec![IngressValueLaneSpec::new(Some(LANE_NAME), "$payload", true)],
48+
map_lanes: vec![],
49+
relays: vec![RelaySpecification::Value(ValueRelaySpecification::new(
50+
"/node", "lane", "$payload", true,
51+
))],
52+
payload_deserializer: DataFormat::String,
53+
subscription: Subscription::Topic(TOPIC.to_string()),
54+
keep_alive_secs: None,
55+
max_packet_size: None,
56+
channel_size: Some(0),
57+
credentials: None,
58+
}
59+
}
60+
61+
struct TestContext<'a> {
62+
agent: &'a ConnectorAgent,
63+
}
64+
65+
impl<'a> IngressContext for TestContext<'a> {
66+
fn open_lane(&mut self, name: &str, kind: WarpLaneKind) {
67+
let TestContext { agent } = self;
68+
agent
69+
.register_dynamic_item(
70+
name,
71+
ItemDescriptor::WarpLane {
72+
kind,
73+
flags: ItemFlags::TRANSIENT,
74+
},
75+
)
76+
.expect("Registering lane failed.");
77+
}
78+
}
79+
80+
async fn init_connector(agent: &ConnectorAgent) -> MqttIngressConnector<MqttFactory> {
81+
let config = make_configuration();
82+
let connector = MqttIngressConnector::for_config(config);
83+
84+
let mut init_context = TestContext { agent };
85+
connector
86+
.initialize(&mut init_context)
87+
.expect("Initialization failed.");
88+
89+
let (done_tx, done_rx) = trigger::trigger();
90+
let handler = connector.on_start(done_tx);
91+
92+
assert!(run_handler_with_futures(agent, handler).await.is_empty());
93+
done_rx.await.expect("Starting connector failed.");
94+
connector
95+
}
96+
97+
async fn drive_connector(
98+
agent: &mut ConnectorAgent,
99+
connector: &MqttIngressConnector<MqttFactory>,
100+
events: usize,
101+
) {
102+
let mut stream = connector.create_stream().expect("Creating stream failed.");
103+
let mut command_buffer = BytesMut::new();
104+
let mut prev = None;
105+
for _ in 0..events {
106+
let handler = stream
107+
.try_next()
108+
.await
109+
.expect("Receiving message failed.")
110+
.expect("Stream stopped.");
111+
command_buffer.clear();
112+
run_handler_with_futures_and_cmds(agent, handler, &mut command_buffer).await;
113+
check_value(&mut prev, agent, &mut command_buffer);
114+
}
115+
}
116+
117+
fn check_value(
118+
prev: &mut Option<String>,
119+
agent: &mut ConnectorAgent,
120+
command_buffer: &mut BytesMut,
121+
) {
122+
let guard = agent.value_lane(LANE_NAME).expect("Lane not defined");
123+
let mut decoder = CommandMessageDecoder::<String, Value>::default();
124+
let expected_addr = Address::new(None, "/node", "lane").owned();
125+
match guard.read(Value::clone) {
126+
Value::Text(s) => {
127+
let string = s.to_string();
128+
if let Some(p) = prev.take() {
129+
assert_ne!(string, p);
130+
}
131+
let cmd_message = decoder
132+
.decode_eof(command_buffer)
133+
.expect("Decoding failed.")
134+
.expect("Expected command.");
135+
match cmd_message {
136+
CommandMessage::Addressed {
137+
target,
138+
command,
139+
overwrite_permitted,
140+
} => {
141+
assert_eq!(target, expected_addr);
142+
assert_eq!(command, Value::text(&string));
143+
assert!(!overwrite_permitted);
144+
}
145+
ow => panic!("Unexpected command message: {:?}", ow),
146+
}
147+
*prev = Some(string);
148+
}
149+
_ => panic!("Expected text."),
150+
}
151+
}
152+
153+
async fn connector_task(done_tx: trigger::Sender) {
154+
let mut agent = ConnectorAgent::default();
155+
let connector = init_connector(&agent).await;
156+
drive_connector(&mut agent, &connector, 10).await;
157+
done_tx.trigger();
158+
}
159+
160+
const TEST_TIMEOUT: Duration = Duration::from_secs(5);
161+
162+
#[tokio::test]
163+
#[ignore] // Ignored by default as this relies on an external service being present.
164+
async fn run_ingress_connector() {
165+
let (done_tx, done_rx) = trigger::trigger();
166+
let run_connector = connector_task(done_tx);
167+
let generator = generate_data(GEN_URL.to_string(), done_rx);
168+
let (gen_result, _) = timeout(TEST_TIMEOUT, join(generator, run_connector))
169+
.await
170+
.expect("Timed out.");
171+
assert!(gen_result.is_ok());
172+
}

server/swimos_connector_mqqt/src/connector/ingress/tests/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod end_to_end;
1516
mod integration;
1617
mod mock;

server/swimos_connector_mqqt/src/generator.rs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::pin::pin;
15+
use std::time::Duration;
1616

1717
use bytes::Bytes;
1818
use futures::future::join;
@@ -21,24 +21,30 @@ use rumqttc::{ClientError, MqttOptions, QoS};
2121
use swimos_utilities::trigger;
2222
use tracing::debug;
2323

24-
async fn generate_data(mqtt_uri: String, mut stop: trigger::Receiver) -> Result<(), Box<dyn std::error::Error>> {
24+
pub const TOPIC: &str = "test/topic";
2525

26+
pub async fn generate_data(
27+
mqtt_uri: String,
28+
mut stop: trigger::Receiver,
29+
) -> Result<(), Box<dyn std::error::Error>> {
2630
let opts = MqttOptions::parse_url(mqtt_uri)?;
2731

2832
let (client, mut event_loop) = rumqttc::AsyncClient::new(opts, 0);
2933
let stop_cpy = stop.clone();
30-
let events_task = async move {
34+
let events_task = async move {
3135
loop {
32-
tokio::select! {
33-
biased;
34-
_ = &mut stop => break Ok(()),
35-
event = event_loop.poll() => {
36-
match event {
37-
Ok(ev) => debug!(event = ?ev, "Processed MQTT event."),
38-
Err(err) => break Err(err),
39-
}
40-
}
41-
}
36+
tokio::select! {
37+
biased;
38+
_ = &mut stop => break Ok(()),
39+
event = event_loop.poll() => {
40+
match event {
41+
Ok(ev) => debug!(event = ?ev, "Processed MQTT event."),
42+
Err(err) => {
43+
break Err(err);
44+
}
45+
}
46+
}
47+
}
4248
}
4349
};
4450

@@ -52,7 +58,10 @@ async fn generate_data(mqtt_uri: String, mut stop: trigger::Receiver) -> Result<
5258
for _ in 0..10 {
5359
data.push(rand.gen_range('A'..='Z'));
5460
}
55-
client.publish_bytes("test/topic", QoS::AtMostOnce, true, Bytes::from(data)).await?;
61+
client
62+
.publish_bytes(TOPIC, QoS::AtMostOnce, true, Bytes::from(data))
63+
.await?;
64+
tokio::time::sleep(Duration::from_millis(100)).await;
5665
}
5766
Ok(()) as Result<(), ClientError>
5867
};
@@ -61,4 +70,4 @@ async fn generate_data(mqtt_uri: String, mut stop: trigger::Receiver) -> Result<
6170
ev_res?;
6271
gen_res?;
6372
Ok(())
64-
}
73+
}

server/swimos_connector_mqqt/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,4 @@ pub use connector::{MqttEgressConnector, MqttIngressConnector, MqttSender};
2525
pub use error::MqttConnectorError;
2626

2727
#[cfg(test)]
28-
mod generator;
28+
mod generator;

server/swimos_connector_util/src/lib.rs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,26 +164,47 @@ where
164164
A: AgentSpec,
165165
{
166166
let mut spawner = TestSpawner::with_downlinks();
167-
let modified = run_handler_with_futures_inner(agent, handler, &mut spawner).await;
167+
let mut command_buffer = BytesMut::new();
168+
let modified =
169+
run_handler_with_futures_inner(agent, handler, &mut spawner, &mut command_buffer).await;
168170
(modified, spawner.take_downlinks())
169171
}
170172

171173
pub async fn run_handler_with_futures<A, H: EventHandler<A>>(agent: &A, handler: H) -> HashSet<u64>
172174
where
173175
A: AgentSpec,
174176
{
175-
run_handler_with_futures_inner(agent, handler, &mut TestSpawner::default()).await
177+
run_handler_with_futures_inner(
178+
agent,
179+
handler,
180+
&mut TestSpawner::default(),
181+
&mut BytesMut::new(),
182+
)
183+
.await
184+
}
185+
186+
pub async fn run_handler_with_futures_and_cmds<A, H: EventHandler<A>>(
187+
agent: &A,
188+
handler: H,
189+
command_buffer: &mut BytesMut,
190+
) -> HashSet<u64>
191+
where
192+
A: AgentSpec,
193+
{
194+
run_handler_with_futures_inner(agent, handler, &mut TestSpawner::default(), command_buffer)
195+
.await
176196
}
177197

178198
async fn run_handler_with_futures_inner<A, H: EventHandler<A>>(
179199
agent: &A,
180200
handler: H,
181201
spawner: &mut TestSpawner<A>,
202+
command_buffer: &mut BytesMut,
182203
) -> HashSet<u64>
183204
where
184205
A: AgentSpec,
185206
{
186-
let mut modified = run_handler(agent, spawner, handler);
207+
let mut modified = run_handler_with_commands(agent, spawner, handler, command_buffer);
187208
let mut handlers = vec![];
188209
let reg = move |req: LaneRequest<A>| {
189210
let LaneRequest {
@@ -211,10 +232,10 @@ where
211232

212233
while !(handlers.is_empty() && spawner.suspended.is_empty()) {
213234
let m = if let Some(h) = handlers.pop() {
214-
run_handler(agent, spawner, h)
235+
run_handler_with_commands(agent, spawner, h, command_buffer)
215236
} else {
216237
let h = spawner.suspended.next().await.expect("No handler.");
217-
run_handler(agent, spawner, h)
238+
run_handler_with_commands(agent, spawner, h, command_buffer)
218239
};
219240
modified.extend(m);
220241
for request in

0 commit comments

Comments
 (0)