Skip to content

Commit abc11b6

Browse files
authored
Merge pull request #698 from swimos/kafka-example
Kafka connector example app.
2 parents 48f05ac + 1f2c967 commit abc11b6

File tree

9 files changed

+230
-5
lines changed

9 files changed

+230
-5
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ members = [
4343
"example_apps/devguide",
4444
"example_apps/ripple",
4545
"example_apps/stocks_simulated",
46+
"example_apps/kafka_connector"
4647
]
4748

4849
[workspace.package]

example_apps/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ binaries for their corresponding client applications. Each directory contains de
1313
- [http_lane](http_lane): Application demonstrating HTTP Lanes.
1414
- [join_map](join_map): Application demonstrating Join Map Lanes.
1515
- [join_value](join_value): Application demonstrating Join Value Lanes.
16+
- [kafka_connector](kafka_connector): Application demonstrating consuming data from Apache Kafka.
1617
- [local_downlink](local_downlink): Application demonstrating Downlinks.
1718
- [map_downlink](map_downlink): Application demonstrating Map Downlinks.
1819
- [map_lane](map_lane): Application demonstrating Map Lanes.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
[package]
2+
name = "kafka-connector"
3+
version.workspace = true
4+
edition.workspace = true
5+
publish = false
6+
7+
[dependencies]
8+
swimos = { workspace = true, features = ["server", "agent"] }
9+
swimos_client = { workspace = true }
10+
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "fs"] }
11+
example-util = { path = "../example_util" }
12+
swimos_connector = { workspace = true }
13+
swimos_connector_kafka = { workspace = true, features = ["json"] }
14+
swimos_recon = { workspace = true }
15+
clap = { workspace = true, features = ["derive"]}
16+
tracing = { workspace = true }
17+
tracing-subscriber = { workspace = true, features = ["env-filter"] }
18+
19+
[[bin]]
20+
name = "kafka_connector_client"
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::error::Error;
16+
use swimos_client::{BasicValueDownlinkLifecycle, RemotePath, SwimClientBuilder};
17+
18+
#[tokio::main]
19+
async fn main() -> Result<(), Box<dyn Error>> {
20+
let host = "ws://127.0.0.1:8080";
21+
let (client, task) = SwimClientBuilder::default().build().await;
22+
let task_handle = tokio::spawn(task);
23+
24+
let client_handle = client.handle();
25+
26+
let lifecycle =
27+
BasicValueDownlinkLifecycle::default().on_set_blocking(|old_value, new_value| {
28+
println!("'{old_value:?}' -> '{new_value:?}'");
29+
});
30+
let _view = client_handle
31+
.value_downlink::<i32>(RemotePath::new(host, "/kafka", "latest_key"))
32+
.lifecycle(lifecycle)
33+
.open()
34+
.await?;
35+
36+
task_handle.await?;
37+
Ok(())
38+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
@kafka {
2+
properties: {
3+
"group.id": rust-consumer-test,
4+
"message.timeout.ms": "5000",
5+
"bootstrap.servers": "datagen.nstream.cloud:9092",
6+
"auto.offset.reset": smallest
7+
},
8+
log_level: @Debug,
9+
value_lanes: {
10+
@ValueLaneSpec {
11+
name: latest_key,
12+
selector: "$key",
13+
required: true
14+
}
15+
},
16+
map_lanes: {
17+
@MapLaneSpec {
18+
name: times,
19+
key_selector: "$payload.ranLatest.mean_ul_sinr",
20+
value_selector: "$payload.ranLatest.recorded_time",
21+
remove_when_no_value: false,
22+
required: true
23+
}
24+
},
25+
key_deserializer: @Int32(@LittleEndian),
26+
payload_deserializer: @Json,
27+
topics: {
28+
cellular-integer-json
29+
}
30+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! An example demonstrating Value Lanes.
16+
//!
17+
//! Run the server using the following:
18+
//! ```text
19+
//! $ cargo run --bin value-lane
20+
//! ```
21+
//!
22+
//! And run the client with the following:
23+
//! ```text
24+
//! $ cargo run --bin value_client
25+
//! ```
26+
27+
use std::{error::Error, str::FromStr, time::Duration};
28+
29+
use clap::Parser;
30+
use example_util::{example_filter, manage_handle};
31+
use swimos::{
32+
route::{RoutePattern, RouteUri},
33+
server::{Server, ServerBuilder},
34+
};
35+
use swimos_connector::ConnectorModel;
36+
use swimos_connector_kafka::{KafkaConnector, KafkaConnectorConfiguration};
37+
use swimos_recon::parser::parse_recognize;
38+
39+
mod params;
40+
41+
use params::Params;
42+
use tracing::error;
43+
use tracing_subscriber::filter::LevelFilter;
44+
45+
#[tokio::main]
46+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
47+
let Params {
48+
config,
49+
enable_logging,
50+
} = Params::parse();
51+
if enable_logging {
52+
setup_logging()?;
53+
}
54+
55+
let connector_config = load_config(config).await?;
56+
57+
let route = RoutePattern::parse_str("/kafka")?;
58+
59+
let connector_agent =
60+
ConnectorModel::for_fn(move || KafkaConnector::for_config(connector_config.clone()));
61+
62+
let server = ServerBuilder::with_plane_name("Example Plane")
63+
.set_bind_addr("127.0.0.1:8080".parse()?)
64+
.add_route(route, connector_agent)
65+
.update_config(|config| {
66+
config.agent_runtime.inactive_timeout = Duration::from_secs(5 * 60);
67+
})
68+
.build()
69+
.await?;
70+
71+
let (task, handle) = server.run();
72+
73+
let uri = RouteUri::from_str("/kafka")?;
74+
75+
let shutdown = async move {
76+
if let Err(error) = handle.start_agent(uri).await {
77+
error!(error = %error, "Failed to start connector agent.");
78+
}
79+
manage_handle(handle).await
80+
};
81+
82+
let (_, result) = tokio::join!(shutdown, task);
83+
84+
result?;
85+
println!("Server stopped successfully.");
86+
Ok(())
87+
}
88+
89+
const CONNECTOR_CONFIG: &str = include_str!("kafka_connector.recon");
90+
91+
async fn load_config(
92+
path: Option<String>,
93+
) -> Result<KafkaConnectorConfiguration, Box<dyn Error + Send + Sync>> {
94+
let content: String;
95+
let recon = if let Some(path) = path {
96+
content = tokio::fs::read_to_string(path).await?;
97+
&content
98+
} else {
99+
CONNECTOR_CONFIG
100+
};
101+
let config = parse_recognize::<KafkaConnectorConfiguration>(recon, true)?;
102+
Ok(config)
103+
}
104+
105+
pub fn setup_logging() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
106+
let filter = example_filter()?.add_directive(LevelFilter::INFO.into());
107+
tracing_subscriber::fmt().with_env_filter(filter).init();
108+
Ok(())
109+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use clap::Parser;
16+
17+
#[derive(Parser)]
18+
#[command(author, version, about, long_about = None)]
19+
pub struct Params {
20+
/// Specify a Recon configuration file for the connector.
21+
#[arg(long)]
22+
pub config: Option<String>,
23+
#[arg(long, default_value = "false")]
24+
/// Specify that logging should be enabled.
25+
pub enable_logging: bool,
26+
}

server/swimos_connector_kafka/src/config.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::{
2727
};
2828

2929
/// Configuration parameters for the Kafka connector.
30-
#[derive(Clone, Debug, Form)]
30+
#[derive(Clone, Debug, Form, PartialEq, Eq)]
3131
#[form(tag = "kafka")]
3232
pub struct KafkaConnectorConfiguration {
3333
/// Properties to configure the Kafka consumer.
@@ -49,7 +49,7 @@ pub struct KafkaConnectorConfiguration {
4949
}
5050

5151
/// Specification of a value lane for the Kafka connector.
52-
#[derive(Clone, Debug, Form)]
52+
#[derive(Clone, Debug, Form, PartialEq, Eq)]
5353
pub struct ValueLaneSpec {
5454
/// A name to use for the lane. If not specified, the connector will attempt to infer one from the selector.
5555
pub name: Option<String>,
@@ -76,7 +76,7 @@ impl ValueLaneSpec {
7676
}
7777

7878
/// Specification of a value lane for the Kafka connector.
79-
#[derive(Clone, Debug, Form)]
79+
#[derive(Clone, Debug, Form, PartialEq, Eq)]
8080
pub struct MapLaneSpec {
8181
/// The name of the lane.
8282
pub name: String,
@@ -119,7 +119,7 @@ impl MapLaneSpec {
119119
}
120120

121121
/// Supported deserialization formats to use to interpret a component of a Kafka message.
122-
#[derive(Clone, Form, Debug, Default)]
122+
#[derive(Clone, Form, Debug, Default, PartialEq, Eq)]
123123
pub enum DeserializationFormat {
124124
#[default]
125125
Bytes,

server/swimos_connector_kafka/src/deser/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ impl MessageDeserializer for ReconDeserializer {
155155
}
156156

157157
/// Endianness for numeric deserializers.
158-
#[derive(Clone, Copy, Default, Debug, Form)]
158+
#[derive(Clone, Copy, Default, Debug, Form, PartialEq, Eq)]
159159
pub enum Endianness {
160160
#[default]
161161
LittleEndian,

0 commit comments

Comments
 (0)