Skip to content

Commit 8a5ed6c

Browse files
authored
Merge pull request #719 from swimos/kafka-egress-example
Example app for Kafka egress connector.
2 parents a41bc96 + c706f8a commit 8a5ed6c

File tree

14 files changed

+329
-11
lines changed

14 files changed

+329
-11
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ members = [
4343
"example_apps/devguide",
4444
"example_apps/ripple",
4545
"example_apps/stocks_simulated",
46-
"example_apps/kafka_connector"
46+
"example_apps/kafka_ingress_connector",
47+
"example_apps/kafka_egress_connector"
4748
]
4849

4950
[workspace.package]
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
[package]
2+
name = "kafka-egress-connector"
3+
version.workspace = true
4+
edition.workspace = true
5+
publish = false
6+
7+
[dependencies]
8+
swimos = { workspace = true, features = ["server", "agent"] }
9+
swimos_client = { workspace = true }
10+
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "fs"] }
11+
example-util = { path = "../example_util" }
12+
swimos_form = { workspace = true }
13+
swimos_connector = { workspace = true }
14+
swimos_connector_kafka = { workspace = true, features = ["json"] }
15+
swimos_recon = { workspace = true }
16+
clap = { workspace = true, features = ["derive"]}
17+
tracing = { workspace = true }
18+
tracing-subscriber = { workspace = true, features = ["env-filter"] }
19+
rand = { workspace = true }
20+
21+
[[bin]]
22+
name = "kafka_connector_client"
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::error::Error;
16+
use swimos_client::Commander;
17+
use tokio::time::{sleep, Duration};
18+
19+
mod model;
20+
21+
#[tokio::main]
22+
async fn main() -> Result<(), Box<dyn Error>> {
23+
let host = "ws://127.0.0.1:8080";
24+
25+
let mut commander = Commander::default();
26+
27+
for _ in 0..10 {
28+
let record = model::random_record();
29+
commander
30+
.send_command(host, "/kafka", "event", &record)
31+
.await?;
32+
sleep(Duration::from_secs(5)).await;
33+
}
34+
commander.close().await;
35+
36+
Ok(())
37+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::time::SystemTime;
16+
17+
use rand::Rng;
18+
use swimos_form::Form;
19+
20+
#[derive(Debug, Clone, Copy, Form)]
21+
pub struct Status {
22+
pub severity: f64,
23+
}
24+
25+
#[derive(Debug, Clone, Copy, Form)]
26+
pub struct Run {
27+
pub mean_ul_sinr: i32,
28+
pub rrc_re_establishment_failures: u32,
29+
pub recorded_time: u64,
30+
}
31+
32+
#[derive(Debug, Clone, Copy, Form)]
33+
pub struct Event {
34+
pub status: Status,
35+
#[form(name = "ranLatest")]
36+
pub ran_latest: Run,
37+
}
38+
39+
#[derive(Debug, Clone, Copy, Form)]
40+
pub struct EventWithKey {
41+
pub key: i32,
42+
pub payload: Event,
43+
}
44+
45+
pub fn random_record() -> EventWithKey {
46+
let mut rng = rand::thread_rng();
47+
let severity = rng.gen_range(0.0..2.0);
48+
let mean_ul_sinr = rng.gen_range(1..50);
49+
let rrc_re_establishment_failures = rng.gen_range(1u32..10u32);
50+
51+
let t = SystemTime::now()
52+
.duration_since(SystemTime::UNIX_EPOCH)
53+
.expect("Time out of range.");
54+
let recorded_time = t.as_secs() * 1000 + t.subsec_millis() as u64;
55+
56+
let status = Status { severity };
57+
let ran_latest = Run {
58+
mean_ul_sinr,
59+
rrc_re_establishment_failures,
60+
recorded_time,
61+
};
62+
63+
let key = rng.gen_range(50..4000);
64+
EventWithKey {
65+
key,
66+
payload: Event { status, ran_latest },
67+
}
68+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
@kafka {
2+
properties: {
3+
"message.timeout.ms": "5000",
4+
"group.id": rust-consumer-test,
5+
"bootstrap.servers": "##SERVERS##"
6+
},
7+
log_level: @Debug,
8+
key_serializer: @Int32(@BigEndian),
9+
payload_serializer: @Json,
10+
fixed_topic: cellular-integer-json,
11+
value_lanes: {
12+
@LaneSpec {
13+
name: event,
14+
extractor: @ExtractionSpec {
15+
topic_specifier: @Fixed,
16+
key_selector: "$value.key",
17+
payload_selector: "$value.payload"
18+
}
19+
}
20+
},
21+
map_lanes: {},
22+
value_downlinks: {},
23+
map_downlinks: {},
24+
retry_timeout_ms: 5000
25+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! An example demonstrating a Kafka egress connector.
16+
//!
17+
//! Run the server using the following:
18+
//! ```text
19+
//! $ cargo run --bin kafka-egress-connector
20+
//! ```
21+
//!
22+
//! And run the client with the following:
23+
//! ```text
24+
//! $ cargo run --bin kafka_connector_client
25+
//! ```
26+
27+
use std::{error::Error, str::FromStr, time::Duration};
28+
29+
use clap::Parser;
30+
use example_util::{example_filter, manage_handle};
31+
use swimos::{
32+
route::{RoutePattern, RouteUri},
33+
server::{Server, ServerBuilder},
34+
};
35+
use swimos_connector::EgressConnectorModel;
36+
use swimos_connector_kafka::{KafkaEgressConfiguration, KafkaEgressConnector};
37+
use swimos_recon::parser::parse_recognize;
38+
39+
mod params;
40+
41+
use params::Params;
42+
use tracing::error;
43+
use tracing_subscriber::filter::LevelFilter;
44+
45+
#[tokio::main]
46+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
47+
let Params {
48+
config,
49+
enable_logging,
50+
bootstrap_servers,
51+
} = Params::parse();
52+
if enable_logging {
53+
setup_logging()?;
54+
}
55+
56+
let connector_config = load_config(config, bootstrap_servers).await?;
57+
58+
let route = RoutePattern::parse_str("/kafka")?;
59+
60+
let connector_agent = EgressConnectorModel::for_fn(move || {
61+
KafkaEgressConnector::for_config(connector_config.clone())
62+
});
63+
64+
let server = ServerBuilder::with_plane_name("Example Plane")
65+
.set_bind_addr("127.0.0.1:8080".parse()?)
66+
.add_route(route, connector_agent)
67+
.update_config(|config| {
68+
config.agent_runtime.inactive_timeout = Duration::from_secs(5 * 60);
69+
})
70+
.build()
71+
.await?;
72+
73+
let (task, handle) = server.run();
74+
75+
let uri = RouteUri::from_str("/kafka")?;
76+
77+
let shutdown = async move {
78+
if let Err(error) = handle.start_agent(uri).await {
79+
error!(error = %error, "Failed to start connector agent.");
80+
}
81+
manage_handle(handle).await
82+
};
83+
84+
let (_, result) = tokio::join!(shutdown, task);
85+
86+
result?;
87+
println!("Server stopped successfully.");
88+
Ok(())
89+
}
90+
91+
const CONNECTOR_CONFIG: &str = include_str!("kafka_connector.recon");
92+
93+
async fn load_config(
94+
path: Option<String>,
95+
bootstrap_servers: Option<String>,
96+
) -> Result<KafkaEgressConfiguration, Box<dyn Error + Send + Sync>> {
97+
let recon = if let Some(path) = path {
98+
tokio::fs::read_to_string(path).await?
99+
} else if let Some(bootstrap) = bootstrap_servers {
100+
CONNECTOR_CONFIG.replace("##SERVERS##", &bootstrap)
101+
} else {
102+
panic!("Either a configuration file or bootstrap servers must be specified.");
103+
};
104+
let config = parse_recognize::<KafkaEgressConfiguration>(recon.as_str(), true)?;
105+
Ok(config)
106+
}
107+
108+
pub fn setup_logging() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
109+
let filter = example_filter()?.add_directive(LevelFilter::INFO.into());
110+
tracing_subscriber::fmt().with_env_filter(filter).init();
111+
Ok(())
112+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use clap::Parser;
16+
17+
#[derive(Parser)]
18+
#[command(author, version, about, long_about = None)]
19+
pub struct Params {
20+
/// Specify a Recon configuration file for the connector.
21+
#[arg(long)]
22+
pub config: Option<String>,
23+
#[arg(long, default_value = "false")]
24+
/// Specify that logging should be enabled.
25+
pub enable_logging: bool,
26+
/// Specify the Kafka bootstrap servers (used if --config is not specified.)
27+
#[arg(short, long)]
28+
pub bootstrap_servers: Option<String>,
29+
}

example_apps/kafka_connector/Cargo.toml renamed to example_apps/kafka_ingress_connector/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[package]
2-
name = "kafka-connector"
2+
name = "kafka-ingress-connector"
33
version.workspace = true
44
edition.workspace = true
55
publish = false

example_apps/kafka_connector/src/kafka_connector.recon renamed to example_apps/kafka_ingress_connector/src/kafka_connector.recon

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
required: true
2323
}
2424
},
25-
key_deserializer: @Int32(@LittleEndian),
25+
key_deserializer: @Int32(@BigEndian),
2626
payload_deserializer: @Json,
2727
topics: {
2828
cellular-integer-json

0 commit comments

Comments
 (0)