Skip to content

Commit f53badf

Browse files
committed
Added egress connector example.
1 parent c72830a commit f53badf

File tree

8 files changed

+293
-10
lines changed

8 files changed

+293
-10
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ members = [
4646
"example_apps/kafka_ingress_connector",
4747
"example_apps/kafka_egress_connector",
4848
"example_apps/mqtt_ingress_connector",
49+
"example_apps/mqtt_egress_connector"
4950
]
5051

5152
[workspace.package]
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
[package]
2+
name = "mqtt-egress-connector"
3+
version.workspace = true
4+
edition.workspace = true
5+
publish = false
6+
7+
[dependencies]
8+
swimos = { workspace = true, features = ["server", "agent"] }
9+
swimos_client = { workspace = true }
10+
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "fs"] }
11+
example-util = { path = "../example_util" }
12+
swimos_form = { workspace = true }
13+
swimos_connector = { workspace = true }
14+
swimos_connector_mqtt = { workspace = true, features = ["json"] }
15+
swimos_recon = { workspace = true }
16+
clap = { workspace = true, features = ["derive"]}
17+
tracing = { workspace = true }
18+
tracing-subscriber = { workspace = true, features = ["env-filter"] }
19+
rand = { workspace = true }
20+
21+
[[bin]]
22+
name = "mqtt_connector_client"
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::time::SystemTime;
16+
17+
use rand::Rng;
18+
use swimos_form::Form;
19+
20+
#[derive(Debug, Clone, Copy, Form)]
21+
pub struct Status {
22+
pub severity: f64,
23+
}
24+
25+
#[derive(Debug, Clone, Copy, Form)]
26+
pub struct Run {
27+
pub mean_ul_sinr: i32,
28+
pub rrc_re_establishment_failures: u32,
29+
pub recorded_time: u64,
30+
}
31+
32+
#[derive(Debug, Clone, Copy, Form)]
33+
pub struct Event {
34+
pub status: Status,
35+
#[form(name = "ranLatest")]
36+
pub ran_latest: Run,
37+
}
38+
39+
#[derive(Debug, Clone, Copy, Form)]
40+
pub struct EventWithKey {
41+
pub key: i32,
42+
pub payload: Event,
43+
}
44+
45+
pub fn random_record() -> EventWithKey {
46+
let mut rng = rand::thread_rng();
47+
let severity = rng.gen_range(0.0..2.0);
48+
let mean_ul_sinr = rng.gen_range(1..50);
49+
let rrc_re_establishment_failures = rng.gen_range(1u32..10u32);
50+
51+
let t = SystemTime::now()
52+
.duration_since(SystemTime::UNIX_EPOCH)
53+
.expect("Time out of range.");
54+
let recorded_time = t.as_secs() * 1000 + t.subsec_millis() as u64;
55+
56+
let status = Status { severity };
57+
let ran_latest = Run {
58+
mean_ul_sinr,
59+
rrc_re_establishment_failures,
60+
recorded_time,
61+
};
62+
63+
let key = rng.gen_range(50..4000);
64+
EventWithKey {
65+
key,
66+
payload: Event { status, ran_latest },
67+
}
68+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::error::Error;
16+
use swimos_client::Commander;
17+
use tokio::time::{sleep, Duration};
18+
19+
mod model;
20+
21+
#[tokio::main]
22+
async fn main() -> Result<(), Box<dyn Error>> {
23+
let host = "ws://127.0.0.1:8080";
24+
25+
let mut commander = Commander::default();
26+
27+
for _ in 0..10 {
28+
let record = model::random_record();
29+
commander
30+
.send_command(host, "/mqtt", "event", &record)
31+
.await?;
32+
sleep(Duration::from_secs(5)).await;
33+
}
34+
commander.close().await;
35+
36+
Ok(())
37+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! An example demonstrating an MQTT egress connector.
16+
//!
17+
//! Run the server using the following:
18+
//! ```text
19+
//! $ cargo run --bin mqtt-egress-connector
20+
//! ```
21+
//!
22+
//! And run the client with the following:
23+
//! ```text
24+
//! $ cargo run --bin mqtt_connector_client
25+
//! ```
26+
27+
use std::{error::Error, str::FromStr, time::Duration};
28+
29+
use clap::Parser;
30+
use example_util::{example_filter, manage_handle};
31+
use swimos::{
32+
route::{RoutePattern, RouteUri},
33+
server::{Server, ServerBuilder},
34+
};
35+
use swimos_connector::EgressConnectorModel;
36+
use swimos_connector_mqtt::{MqttEgressConfiguration, MqttEgressConnector};
37+
use swimos_recon::parser::parse_recognize;
38+
39+
mod params;
40+
41+
use params::Params;
42+
use tracing::error;
43+
use tracing_subscriber::filter::LevelFilter;
44+
45+
#[tokio::main]
46+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
47+
let Params {
48+
config,
49+
enable_logging,
50+
broker_url,
51+
} = Params::parse();
52+
if enable_logging {
53+
setup_logging()?;
54+
}
55+
56+
let connector_config = load_config(config, broker_url).await?;
57+
58+
let route = RoutePattern::parse_str("/mqtt")?;
59+
60+
let connector_agent = EgressConnectorModel::for_fn(move || {
61+
MqttEgressConnector::for_config(connector_config.clone())
62+
});
63+
64+
let server = ServerBuilder::with_plane_name("Example Plane")
65+
.set_bind_addr("127.0.0.1:8080".parse()?)
66+
.add_route(route, connector_agent)
67+
.update_config(|config| {
68+
config.agent_runtime.inactive_timeout = Duration::from_secs(5 * 60);
69+
})
70+
.build()
71+
.await?;
72+
73+
let (task, handle) = server.run();
74+
75+
let uri = RouteUri::from_str("/mqtt")?;
76+
77+
let shutdown = async move {
78+
if let Err(error) = handle.start_agent(uri).await {
79+
error!(error = %error, "Failed to start connector agent.");
80+
}
81+
manage_handle(handle).await
82+
};
83+
84+
let (_, result) = tokio::join!(shutdown, task);
85+
86+
result?;
87+
println!("Server stopped successfully.");
88+
Ok(())
89+
}
90+
91+
const CONNECTOR_CONFIG: &str = include_str!("mqtt_connector.recon");
92+
93+
async fn load_config(
94+
path: Option<String>,
95+
broker_url: Option<String>,
96+
) -> Result<MqttEgressConfiguration, Box<dyn Error + Send + Sync>> {
97+
let recon = if let Some(path) = path {
98+
tokio::fs::read_to_string(path).await?
99+
} else if let Some(url) = broker_url {
100+
CONNECTOR_CONFIG.replace("##BROKER##", &url)
101+
} else {
102+
CONNECTOR_CONFIG.replace("##BROKER##", "mqtt://localhost:1883?client_id=example")
103+
};
104+
let config = parse_recognize::<MqttEgressConfiguration>(recon.as_str(), true)?;
105+
Ok(config)
106+
}
107+
108+
pub fn setup_logging() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
109+
let filter = example_filter()?.add_directive(LevelFilter::INFO.into());
110+
tracing_subscriber::fmt().with_env_filter(filter).init();
111+
Ok(())
112+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
@mqtt {
2+
url: "##BROKER##",
3+
fixed_topic: "example/topic",
4+
value_lanes: {
5+
@EgressLaneSpec {
6+
name: event,
7+
extractor: @ExtractionSpec {
8+
topic_specifier: @Fixed,
9+
payload_selector: "$value"
10+
}
11+
}
12+
},
13+
map_lanes: {},
14+
value_downlinks: {},
15+
map_downlinks: {},
16+
payload_serializer: @Json,
17+
channel_size: 0
18+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use clap::Parser;
16+
17+
#[derive(Parser)]
18+
#[command(author, version, about, long_about = None)]
19+
pub struct Params {
20+
/// Specify a Recon configuration file for the connector.
21+
#[arg(long)]
22+
pub config: Option<String>,
23+
#[arg(long, default_value = "false")]
24+
/// Specify that logging should be enabled.
25+
pub enable_logging: bool,
26+
/// Specify the MQTT broker (used if --config is not specified.)
27+
#[arg(short, long)]
28+
pub broker_url: Option<String>,
29+
}
Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,14 @@
11
@mqtt {
22
url: "mqtt://localhost:1883?client_id=example",
3-
fixed_topic: "example/topic",
43
value_lanes: {
5-
@EgressLaneSpec {
4+
@ValueLaneSpec {
65
name: value,
7-
extractor: @ExtractionSpec {
8-
topic_specifier: @Fixed,
9-
payload_selector: "$value"
10-
}
6+
selector: "$payload",
7+
required: true
118
}
129
},
1310
map_lanes: {},
14-
value_downlinks: {},
15-
map_downlinks: {},
16-
payload_serializer: @Json,
17-
channel_size: 0
11+
relays: {},
12+
payload_deserializer: @Json,
13+
subscription: @topic("example/topic")
1814
}

0 commit comments

Comments
 (0)