|
| 1 | +// Copyright 2015-2024 Swim Inc. |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +// you may not use this file except in compliance with the License. |
| 5 | +// You may obtain a copy of the License at |
| 6 | +// |
| 7 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +// |
| 9 | +// Unless required by applicable law or agreed to in writing, software |
| 10 | +// distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +// See the License for the specific language governing permissions and |
| 13 | +// limitations under the License. |
| 14 | + |
| 15 | +use std::{collections::HashMap, time::Duration}; |
| 16 | + |
| 17 | +use futures::{future::join, TryFutureExt}; |
| 18 | +use rumqttc::{Event, Incoming, MqttOptions, QoS}; |
| 19 | +use swimos_agent::agent_model::{AgentSpec, ItemDescriptor, ItemFlags}; |
| 20 | +use swimos_api::{address::Address, agent::WarpLaneKind}; |
| 21 | +use swimos_connector::{ |
| 22 | + config::format::DataFormat, BaseConnector, ConnectorAgent, EgressConnector, |
| 23 | + EgressConnectorSender, EgressContext, MessageSource, SendResult, |
| 24 | +}; |
| 25 | +use swimos_connector_util::run_handler_with_futures; |
| 26 | +use swimos_model::Value; |
| 27 | +use swimos_utilities::trigger; |
| 28 | +use tokio::time::timeout; |
| 29 | +use tracing::debug; |
| 30 | + |
| 31 | +use crate::{ |
| 32 | + config::{ExtractionSpec, TopicSpecifier}, |
| 33 | + facade::MqttFactory, |
| 34 | + EgressDownlinkSpec, EgressLaneSpec, MqttEgressConfiguration, MqttEgressConnector, |
| 35 | +}; |
| 36 | + |
| 37 | +const CLIENT_URL: &str = "mqtt://localhost:1883?client_id=test"; |
| 38 | +const CONSUMER_URL: &str = "mqtt://localhost:1883?client_id=consumer"; |
| 39 | +const LANE_NAME: &str = "lane_name"; |
| 40 | +const TOPIC: &str = "test/egress"; |
| 41 | + |
| 42 | +fn make_config() -> MqttEgressConfiguration { |
| 43 | + MqttEgressConfiguration { |
| 44 | + url: CLIENT_URL.to_string(), |
| 45 | + fixed_topic: Some(TOPIC.to_string()), |
| 46 | + value_lanes: vec![EgressLaneSpec { |
| 47 | + name: LANE_NAME.to_string(), |
| 48 | + extractor: ExtractionSpec { |
| 49 | + topic_specifier: TopicSpecifier::Fixed, |
| 50 | + payload_selector: None, |
| 51 | + }, |
| 52 | + }], |
| 53 | + map_lanes: vec![], |
| 54 | + value_downlinks: vec![EgressDownlinkSpec { |
| 55 | + address: Address::new(None, "/node", "lane").owned(), |
| 56 | + extractor: ExtractionSpec { |
| 57 | + topic_specifier: TopicSpecifier::Fixed, |
| 58 | + payload_selector: None, |
| 59 | + }, |
| 60 | + }], |
| 61 | + map_downlinks: vec![], |
| 62 | + payload_serializer: DataFormat::String, |
| 63 | + keep_alive_secs: None, |
| 64 | + max_packet_size: None, |
| 65 | + max_inflight: None, |
| 66 | + channel_size: Some(0), |
| 67 | + credentials: None, |
| 68 | + } |
| 69 | +} |
| 70 | + |
| 71 | +struct TestContext<'a> { |
| 72 | + agent: &'a ConnectorAgent, |
| 73 | +} |
| 74 | + |
| 75 | +impl<'a> EgressContext for TestContext<'a> { |
| 76 | + fn open_lane(&mut self, name: &str, kind: WarpLaneKind) { |
| 77 | + self.agent |
| 78 | + .register_dynamic_item( |
| 79 | + name, |
| 80 | + ItemDescriptor::WarpLane { |
| 81 | + kind, |
| 82 | + flags: ItemFlags::TRANSIENT, |
| 83 | + }, |
| 84 | + ) |
| 85 | + .expect("Failed to register lane."); |
| 86 | + } |
| 87 | + |
| 88 | + fn open_event_downlink(&mut self, address: Address<&str>) { |
| 89 | + assert_eq!(address, Address::new(None, "/node", "lane")); |
| 90 | + } |
| 91 | + |
| 92 | + fn open_map_downlink(&mut self, _address: Address<&str>) { |
| 93 | + panic!("Unexpected map downlink."); |
| 94 | + } |
| 95 | +} |
| 96 | + |
| 97 | +async fn init_connector( |
| 98 | + agent: &ConnectorAgent, |
| 99 | + connector: &MqttEgressConnector<MqttFactory>, |
| 100 | + done_tx: trigger::Sender, |
| 101 | +) { |
| 102 | + let mut context = TestContext { agent }; |
| 103 | + |
| 104 | + assert!(connector.initialize(&mut context).is_ok()); |
| 105 | + |
| 106 | + let handler = connector.on_start(done_tx); |
| 107 | + assert!(run_handler_with_futures(agent, handler).await.is_empty()); |
| 108 | +} |
| 109 | + |
| 110 | +async fn read_from_topic(num_messages: usize, subscribed: trigger::Sender) -> Vec<String> { |
| 111 | + let opts = MqttOptions::parse_url(CONSUMER_URL).expect("BAD URL."); |
| 112 | + |
| 113 | + let (client, mut event_loop) = rumqttc::AsyncClient::new(opts, 0); |
| 114 | + |
| 115 | + let sub = async { |
| 116 | + client |
| 117 | + .subscribe(TOPIC, QoS::AtMostOnce) |
| 118 | + .await |
| 119 | + .expect("Subscription request not sent."); |
| 120 | + }; |
| 121 | + |
| 122 | + let mut messages = Vec::with_capacity(num_messages); |
| 123 | + |
| 124 | + let mut sub_trigger = Some(subscribed); |
| 125 | + |
| 126 | + let events = async move { |
| 127 | + while messages.len() < num_messages { |
| 128 | + match event_loop.poll().await.expect("Client failed.") { |
| 129 | + Event::Incoming(Incoming::SubAck(_)) => { |
| 130 | + if let Some(s) = sub_trigger.take() { |
| 131 | + s.trigger(); |
| 132 | + } |
| 133 | + } |
| 134 | + Event::Incoming(Incoming::Publish(body)) => { |
| 135 | + let bytes = body.payload.as_ref(); |
| 136 | + let string = std::str::from_utf8(bytes) |
| 137 | + .expect("Bad payload.") |
| 138 | + .to_string(); |
| 139 | + messages.push(string); |
| 140 | + } |
| 141 | + ow => debug!(event = ?ow, "Processed MQTT event."), |
| 142 | + } |
| 143 | + } |
| 144 | + messages |
| 145 | + }; |
| 146 | + |
| 147 | + let (_, messages) = join(sub, events).await; |
| 148 | + messages |
| 149 | +} |
| 150 | + |
| 151 | +async fn drive_connector( |
| 152 | + agent: &ConnectorAgent, |
| 153 | + connector: &MqttEgressConnector<MqttFactory>, |
| 154 | + lane_messages: Vec<String>, |
| 155 | + dl_messages: Vec<String>, |
| 156 | + init_done: trigger::Receiver, |
| 157 | + subscribed: trigger::Receiver, |
| 158 | +) { |
| 159 | + init_done.await.expect("Initialization did not complete."); |
| 160 | + subscribed.await.expect("Subscription failed."); |
| 161 | + let sender = connector |
| 162 | + .make_sender(&HashMap::new()) |
| 163 | + .expect("Failed to create sender."); |
| 164 | + |
| 165 | + for message in lane_messages { |
| 166 | + let value = Value::text(&message); |
| 167 | + match sender |
| 168 | + .send(MessageSource::Lane(LANE_NAME), None, &value) |
| 169 | + .expect("Expected future.") |
| 170 | + { |
| 171 | + SendResult::Suspend(fut) => { |
| 172 | + let handler = fut.into_future().await.expect("Send failed."); |
| 173 | + assert!(run_handler_with_futures(agent, handler).await.is_empty()); |
| 174 | + } |
| 175 | + _ => panic!("Expected future."), |
| 176 | + } |
| 177 | + } |
| 178 | + |
| 179 | + let addr = Address::new(None, "/node", "lane").owned(); |
| 180 | + |
| 181 | + for message in dl_messages { |
| 182 | + let value = Value::text(&message); |
| 183 | + match sender |
| 184 | + .send(MessageSource::Downlink(&addr), None, &value) |
| 185 | + .expect("Expected future.") |
| 186 | + { |
| 187 | + SendResult::Suspend(fut) => { |
| 188 | + let handler = fut.into_future().await.expect("Send failed."); |
| 189 | + assert!(run_handler_with_futures(agent, handler).await.is_empty()); |
| 190 | + } |
| 191 | + _ => panic!("Expected future."), |
| 192 | + } |
| 193 | + } |
| 194 | +} |
| 195 | + |
| 196 | +const TEST_TIMEOUT: Duration = Duration::from_secs(5); |
| 197 | + |
| 198 | +#[tokio::test] |
| 199 | +#[ignore] // Ignored by default as this relies on an external service being present. |
| 200 | +async fn run_egress_connector() { |
| 201 | + timeout(TEST_TIMEOUT, async { |
| 202 | + let config = make_config(); |
| 203 | + let agent = ConnectorAgent::default(); |
| 204 | + let connector = MqttEgressConnector::for_config(config); |
| 205 | + let (done_tx, done_rx) = trigger::trigger(); |
| 206 | + let (sub_tx, sub_rx) = trigger::trigger(); |
| 207 | + |
| 208 | + let background = init_connector(&agent, &connector, done_tx); |
| 209 | + |
| 210 | + let lane_messages = vec!["lane1".to_string(), "lane2".to_string()]; |
| 211 | + let dl_messages = vec!["downlink1".to_string(), "downlink2".to_string()]; |
| 212 | + |
| 213 | + let connector_task = drive_connector( |
| 214 | + &agent, |
| 215 | + &connector, |
| 216 | + lane_messages.clone(), |
| 217 | + dl_messages.clone(), |
| 218 | + done_rx, |
| 219 | + sub_rx, |
| 220 | + ); |
| 221 | + |
| 222 | + let consume_task = read_from_topic(lane_messages.len() + dl_messages.len(), sub_tx); |
| 223 | + |
| 224 | + let messages = tokio::select! { |
| 225 | + (_, messages) = join(connector_task, consume_task) => messages, |
| 226 | + _ = background => panic!("Background task stopped."), |
| 227 | + }; |
| 228 | + |
| 229 | + let expected_messages = lane_messages |
| 230 | + .into_iter() |
| 231 | + .chain(dl_messages.into_iter()) |
| 232 | + .collect::<Vec<_>>(); |
| 233 | + assert_eq!(messages, expected_messages); |
| 234 | + }) |
| 235 | + .await |
| 236 | + .expect("Timed out."); |
| 237 | +} |
0 commit comments