Skip to content

Commit 803e2e1

Browse files
committed
Merge branch 'mqtt' into rust-1.82
2 parents ee186e9 + ae059cc commit 803e2e1

File tree

11 files changed

+347
-70
lines changed

11 files changed

+347
-70
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ members = [
4545
"example_apps/stocks_simulated",
4646
"example_apps/kafka_ingress_connector",
4747
"example_apps/kafka_egress_connector",
48+
"example_apps/fluvio_ingress_connector",
4849
"example_apps/mqtt_ingress_connector",
4950
"example_apps/mqtt_egress_connector"
5051
]
@@ -88,6 +89,7 @@ swimos_server_app = { path = "server/swimos_server_app", version = "0.1.1" }
8889
swimos_connector = { path = "server/swimos_connector", version = "0.1.1" }
8990
swimos_connector_kafka = { path = "server/swimos_connector_kafka", version = "0.1.1" }
9091
swimos_connector_mqtt = { path = "server/swimos_connector_mqtt", version = "0.1.1" }
92+
swimos_connector_fluvio = { path = "server/swimos_connector_fluvio", version = "0.1.1" }
9193
swimos = { path = "swimos", version = "0.1.1" }
9294
swimos_client = { path = "swimos_client", version = "0.1.1" }
9395
swimos_downlink = { path = "swimos_downlink", version = "0.1.1" }
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
[package]
2+
name = "fluvio_ingress_connector"
3+
version.workspace = true
4+
authors.workspace = true
5+
edition.workspace = true
6+
license.workspace = true
7+
homepage.workspace = true
8+
publish = false
9+
10+
[dependencies]
11+
swimos = { workspace = true, features = ["server", "agent"] }
12+
swimos_client = { workspace = true }
13+
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "fs"] }
14+
example-util = { path = "../example_util" }
15+
swimos_connector = { workspace = true }
16+
swimos_connector_fluvio = { workspace = true, features = ["json"] }
17+
swimos_recon = { workspace = true }
18+
clap = { workspace = true, features = ["derive"] }
19+
tracing = { workspace = true }
20+
tracing-subscriber = { workspace = true, features = ["env-filter"] }
21+
serde_json = { workspace = true }
22+
rand = { workspace = true }
23+
fluvio = { workspace = true }
24+
bytes = { workspace = true }
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
use swimos::agent::agent_lifecycle::HandlerContext;
17+
use swimos::agent::event_handler::{EventHandler, HandlerActionExt};
18+
use swimos::agent::lanes::{MapLane, ValueLane};
19+
use swimos::agent::{lifecycle, AgentLaneModel};
20+
21+
/// Sensor Agent model.
22+
#[derive(AgentLaneModel)]
23+
pub struct SensorAgent {
24+
/// The latest temperature reading.
25+
temperature: ValueLane<i64>,
26+
/// The latest voltage reading.
27+
/// Key: timestamp that the key was updated.
28+
/// Value: voltage.
29+
voltage: MapLane<i64, f64>,
30+
}
31+
32+
/// Sensor Agent lifecycle.
33+
#[derive(Clone)]
34+
pub struct SensorLifecycle;
35+
36+
#[lifecycle(SensorAgent)]
37+
impl SensorLifecycle {
38+
#[on_start]
39+
pub fn on_start(&self, context: HandlerContext<SensorAgent>) -> impl EventHandler<SensorAgent> {
40+
context.get_agent_uri().and_then(move |uri| {
41+
context.effect(move || {
42+
println!("Starting agent at: {}", uri);
43+
})
44+
})
45+
}
46+
47+
#[on_stop]
48+
pub fn on_stop(&self, context: HandlerContext<SensorAgent>) -> impl EventHandler<SensorAgent> {
49+
context.get_agent_uri().and_then(move |uri| {
50+
context.effect(move || {
51+
println!("Stopping agent at: {}", uri);
52+
})
53+
})
54+
}
55+
56+
#[on_event(temperature)]
57+
pub fn on_temperature(
58+
&self,
59+
context: HandlerContext<SensorAgent>,
60+
value: &i64,
61+
) -> impl EventHandler<SensorAgent> {
62+
let n = *value;
63+
context.effect(move || {
64+
println!("Setting temperature to: {}", n);
65+
})
66+
}
67+
68+
#[on_update(voltage)]
69+
pub fn on_update(
70+
&self,
71+
context: HandlerContext<SensorAgent>,
72+
_map: &HashMap<i64, f64>,
73+
timestamp: i64,
74+
_prev: Option<f64>,
75+
new_value: &f64,
76+
) -> impl EventHandler<SensorAgent> + '_ {
77+
let new_value = *new_value;
78+
context.effect(move || {
79+
println!("Setting voltage entry for {} to '{}'", timestamp, new_value);
80+
})
81+
}
82+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
@fluvio {
2+
topic: "sensors",
3+
addr: "127.0.0.1:9003",
4+
partition: 0,
5+
offset: @End {
6+
},
7+
value_lanes: {
8+
@ValueLaneSpec {
9+
name: temperature,
10+
selector: "$payload.temperature",
11+
required: true
12+
}
13+
},
14+
map_lanes: {
15+
},
16+
relays: {
17+
@Value {
18+
node: "/sensors/$key",
19+
lane: "temperature",
20+
payload: "$payload.temperature",
21+
required: true
22+
}
23+
},
24+
key_deserializer: @UInt32(@LittleEndian),
25+
payload_deserializer: @Json
26+
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! An example demonstrating a Fluvio connector.
16+
//!
17+
//! Run the application using the following:
18+
//! ```text
19+
//! $ cargo run --bin fluvio_connector
20+
//! ```
21+
22+
use clap::Parser;
23+
use example_util::{example_filter, manage_handle};
24+
use fluvio::RecordKey;
25+
use rand::Rng;
26+
use serde_json::json;
27+
use std::collections::HashSet;
28+
use std::{error::Error, str::FromStr, time::Duration};
29+
use swimos::{
30+
route::{RoutePattern, RouteUri},
31+
server::{Server, ServerBuilder},
32+
};
33+
use swimos_connector::IngressConnectorModel;
34+
use tokio::time::sleep;
35+
36+
mod agent;
37+
mod params;
38+
39+
use crate::agent::{SensorAgent, SensorLifecycle};
40+
use params::Params;
41+
use swimos::agent::agent_model::AgentModel;
42+
use swimos_connector_fluvio::{FluvioIngressConfiguration, FluvioIngressConnector};
43+
use tracing::error;
44+
use tracing_subscriber::filter::LevelFilter;
45+
46+
const FLUVIO_TOPIC: &str = "sensors";
47+
const MAX_AGENTS: usize = 50;
48+
49+
#[tokio::main]
50+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
51+
let Params {
52+
config,
53+
enable_logging,
54+
} = Params::parse();
55+
if enable_logging {
56+
setup_logging()?;
57+
}
58+
59+
let connector_config = load_config(config).await?;
60+
61+
let route = RoutePattern::parse_str("/fluvio")?;
62+
63+
let connector_agent = IngressConnectorModel::for_fn(move || {
64+
FluvioIngressConnector::for_config(connector_config.clone())
65+
});
66+
let sensor_agent = AgentModel::new(SensorAgent::default, SensorLifecycle.into_lifecycle());
67+
68+
let server = ServerBuilder::with_plane_name("Example Plane")
69+
.set_bind_addr("127.0.0.1:8080".parse()?)
70+
.add_route(route, connector_agent)
71+
.add_route(RoutePattern::parse_str("/sensors/:id")?, sensor_agent)
72+
.update_config(|config| {
73+
config.agent_runtime.inactive_timeout = Duration::from_secs(5 * 60);
74+
})
75+
.build()
76+
.await?;
77+
78+
let (task, handle) = server.run();
79+
80+
let uri = RouteUri::from_str("/fluvio")?;
81+
82+
let shutdown = async move {
83+
if let Err(error) = handle.start_agent(uri).await {
84+
error!(error = %error, "Failed to start connector agent.");
85+
}
86+
manage_handle(handle).await
87+
};
88+
89+
let (_, task_result, producer_result) = tokio::join!(shutdown, task, run_fluvio());
90+
91+
producer_result?;
92+
task_result?;
93+
println!("Server stopped successfully.");
94+
Ok(())
95+
}
96+
97+
async fn run_fluvio() -> Result<(), Box<dyn Error + Send + Sync>> {
98+
let producer = fluvio::producer(FLUVIO_TOPIC).await?;
99+
let mut agent_ids = HashSet::new();
100+
101+
loop {
102+
let (agent_id, payload) = {
103+
let len = agent_ids.len();
104+
let mut rng = rand::thread_rng();
105+
106+
let agent_id = if len == MAX_AGENTS {
107+
rng.gen_range(0..len)
108+
} else {
109+
let id = len + 1;
110+
agent_ids.insert(id);
111+
id
112+
};
113+
114+
let payload = json! {
115+
{
116+
"temperature": rng.gen_range(10..100),
117+
"voltage": rng.gen_range::<f64, _>(0.0..12.0)
118+
}
119+
};
120+
121+
(agent_id, serde_json::to_vec(&payload)?)
122+
};
123+
124+
producer
125+
.send(RecordKey::from((agent_id as u32).to_le_bytes()), payload)
126+
.await?;
127+
producer.flush().await?;
128+
129+
sleep(Duration::from_millis(500)).await;
130+
}
131+
}
132+
133+
const CONNECTOR_CONFIG: &str = include_str!("fluvio_connector.recon");
134+
135+
async fn load_config(
136+
path: Option<String>,
137+
) -> Result<FluvioIngressConfiguration, Box<dyn Error + Send + Sync>> {
138+
let content: String;
139+
let recon = if let Some(path) = path {
140+
content = tokio::fs::read_to_string(path).await?;
141+
&content
142+
} else {
143+
CONNECTOR_CONFIG
144+
};
145+
FluvioIngressConfiguration::from_str(recon)
146+
}
147+
148+
pub fn setup_logging() -> Result<(), Box<dyn Error + Send + Sync>> {
149+
let filter = example_filter()?
150+
.add_directive(LevelFilter::INFO.into())
151+
.add_directive("swimos_connector_fluvio=trace".parse()?)
152+
.add_directive("swimos_connector=info".parse()?);
153+
tracing_subscriber::fmt().with_env_filter(filter).init();
154+
Ok(())
155+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use clap::Parser;
16+
17+
#[derive(Parser)]
18+
#[command(author, version, about, long_about = None)]
19+
pub struct Params {
20+
/// Specify a Recon configuration file for the connector.
21+
#[arg(long)]
22+
pub config: Option<String>,
23+
#[arg(long, default_value = "false")]
24+
/// Specify that logging should be enabled.
25+
pub enable_logging: bool,
26+
}

runtime/swimos_runtime/src/agent/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -537,9 +537,6 @@ pub enum AgentExecError {
537537
/// Initializing the agent failed.
538538
#[error("Failed to initialize agent: {0}")]
539539
FailedInit(#[from] AgentInitError),
540-
/// Initialization completed but no lanes were registered.
541-
#[error("The agent did not register any lanes.")]
542-
NoInitialLanes,
543540
/// The runtime loop of the agent failed.
544541
#[error("The agent task failed: {0}")]
545542
FailedTask(#[from] AgentTaskError),

runtime/swimos_runtime/src/agent/task/init/mod.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -185,19 +185,16 @@ impl<Store: AgentPersistence + Send + Sync> AgentInitTask<Store> {
185185
result?;
186186

187187
let Initialization { reporting, .. } = initialization;
188-
if endpoints.lane_endpoints.is_empty() {
189-
Err(AgentExecError::NoInitialLanes)
190-
} else {
191-
Ok((
192-
InitialEndpoints::new(
193-
reporting,
194-
request_stream.into_inner(),
195-
endpoints,
196-
ext_link_state,
197-
),
198-
store,
199-
))
200-
}
188+
189+
Ok((
190+
InitialEndpoints::new(
191+
reporting,
192+
request_stream.into_inner(),
193+
endpoints,
194+
ext_link_state,
195+
),
196+
store,
197+
))
201198
}
202199
}
203200

runtime/swimos_runtime/src/agent/task/init/tests/no_store.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use crate::agent::{
4141
AgentRuntimeRequest, Endpoints, InitialEndpoints, LaneEndpoint, LaneRuntimeSpec,
4242
StoreRuntimeSpec,
4343
},
44-
AgentExecError, Io, LinkRequest,
44+
Io, LinkRequest,
4545
};
4646

4747
type RawMapOperation = MapOperation<Bytes, BytesMut>;
@@ -183,12 +183,6 @@ impl TestInit for SingleLaneInit {
183183
}
184184
}
185185

186-
#[tokio::test]
187-
async fn no_lanes() {
188-
let (result, _) = run_test(NoLanesInit, StoreDisabled).await;
189-
assert!(matches!(result, Err(AgentExecError::NoInitialLanes)));
190-
}
191-
192186
#[tokio::test]
193187
async fn single_lane() {
194188
for config in CONFIGS {

0 commit comments

Comments
 (0)