Skip to content

Fluvio connector example #722

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ members = [
"example_apps/ripple",
"example_apps/stocks_simulated",
"example_apps/kafka_ingress_connector",
"example_apps/kafka_egress_connector"
"example_apps/kafka_egress_connector",
"example_apps/fluvio_ingress_connector"
]

[workspace.package]
Expand Down Expand Up @@ -85,6 +86,7 @@ swimos_introspection = { path = "server/swimos_introspection", version = "0.1.1"
swimos_server_app = { path = "server/swimos_server_app", version = "0.1.1" }
swimos_connector = { path = "server/swimos_connector", version = "0.1.1" }
swimos_connector_kafka = { path = "server/swimos_connector_kafka", version = "0.1.1" }
swimos_connector_fluvio = { path = "server/swimos_connector_fluvio", version = "0.1.1" }
swimos = { path = "swimos", version = "0.1.1" }
swimos_client = { path = "swimos_client", version = "0.1.1" }
swimos_downlink = { path = "swimos_downlink", version = "0.1.1" }
Expand Down
24 changes: 24 additions & 0 deletions example_apps/fluvio_ingress_connector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "fluvio_ingress_connector"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
homepage.workspace = true
publish = false

[dependencies]
swimos = { workspace = true, features = ["server", "agent"] }
swimos_client = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "fs"] }
example-util = { path = "../example_util" }
swimos_connector = { workspace = true }
swimos_connector_fluvio = { workspace = true, features = ["json"] }
swimos_recon = { workspace = true }
clap = { workspace = true, features = ["derive"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
serde_json = { workspace = true }
rand = { workspace = true }
fluvio = { workspace = true }
bytes = { workspace = true }
82 changes: 82 additions & 0 deletions example_apps/fluvio_ingress_connector/src/agent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2015-2024 Swim Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use swimos::agent::agent_lifecycle::HandlerContext;
use swimos::agent::event_handler::{EventHandler, HandlerActionExt};
use swimos::agent::lanes::{MapLane, ValueLane};
use swimos::agent::{lifecycle, AgentLaneModel};

/// Sensor Agent model.
#[derive(AgentLaneModel)]
pub struct SensorAgent {
/// The latest temperature reading.
temperature: ValueLane<i64>,
/// The latest voltage reading.
/// Key: timestamp that the key was updated.
/// Value: voltage.
voltage: MapLane<i64, f64>,
}

/// Sensor Agent lifecycle.
#[derive(Clone)]
pub struct SensorLifecycle;

#[lifecycle(SensorAgent)]
impl SensorLifecycle {
#[on_start]
pub fn on_start(&self, context: HandlerContext<SensorAgent>) -> impl EventHandler<SensorAgent> {
context.get_agent_uri().and_then(move |uri| {
context.effect(move || {
println!("Starting agent at: {}", uri);
})
})
}

#[on_stop]
pub fn on_stop(&self, context: HandlerContext<SensorAgent>) -> impl EventHandler<SensorAgent> {
context.get_agent_uri().and_then(move |uri| {
context.effect(move || {
println!("Stopping agent at: {}", uri);
})
})
}

#[on_event(temperature)]
pub fn on_temperature(
&self,
context: HandlerContext<SensorAgent>,
value: &i64,
) -> impl EventHandler<SensorAgent> {
let n = *value;
context.effect(move || {
println!("Setting temperature to: {}", n);
})
}

#[on_update(voltage)]
pub fn on_update(
&self,
context: HandlerContext<SensorAgent>,
_map: &HashMap<i64, f64>,
timestamp: i64,
_prev: Option<f64>,
new_value: &f64,
) -> impl EventHandler<SensorAgent> + '_ {
let new_value = *new_value;
context.effect(move || {
println!("Setting voltage entry for {} to '{}'", timestamp, new_value);
})
}
}
26 changes: 26 additions & 0 deletions example_apps/fluvio_ingress_connector/src/fluvio_connector.recon
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
@fluvio {
topic: "sensors",
addr: "127.0.0.1:9003",
partition: 0,
offset: @End {
},
value_lanes: {
@ValueLaneSpec {
name: temperature,
selector: "$payload.temperature",
required: true
}
},
map_lanes: {
},
relays: {
@Value {
node: "/sensors/$key",
lane: "temperature",
payload: "$payload.temperature",
required: true
}
},
key_deserializer: @UInt32(@LittleEndian),
payload_deserializer: @Json
}
155 changes: 155 additions & 0 deletions example_apps/fluvio_ingress_connector/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright 2015-2024 Swim Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! An example demonstrating a Fluvio connector.
//!
//! Run the application using the following:
//! ```text
//! $ cargo run --bin fluvio_connector
//! ```

use clap::Parser;
use example_util::{example_filter, manage_handle};
use fluvio::RecordKey;
use rand::Rng;
use serde_json::json;
use std::collections::HashSet;
use std::{error::Error, str::FromStr, time::Duration};
use swimos::{
route::{RoutePattern, RouteUri},
server::{Server, ServerBuilder},
};
use swimos_connector::IngressConnectorModel;
use tokio::time::sleep;

mod agent;
mod params;

use crate::agent::{SensorAgent, SensorLifecycle};
use params::Params;
use swimos::agent::agent_model::AgentModel;
use swimos_connector_fluvio::{FluvioIngressConfiguration, FluvioIngressConnector};
use tracing::error;
use tracing_subscriber::filter::LevelFilter;

const FLUVIO_TOPIC: &str = "sensors";
const MAX_AGENTS: usize = 50;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let Params {
config,
enable_logging,
} = Params::parse();
if enable_logging {
setup_logging()?;
}

let connector_config = load_config(config).await?;

let route = RoutePattern::parse_str("/fluvio")?;

let connector_agent = IngressConnectorModel::for_fn(move || {
FluvioIngressConnector::for_config(connector_config.clone())
});
let sensor_agent = AgentModel::new(SensorAgent::default, SensorLifecycle.into_lifecycle());

let server = ServerBuilder::with_plane_name("Example Plane")
.set_bind_addr("127.0.0.1:8080".parse()?)
.add_route(route, connector_agent)
.add_route(RoutePattern::parse_str("/sensors/:id")?, sensor_agent)
.update_config(|config| {
config.agent_runtime.inactive_timeout = Duration::from_secs(5 * 60);
})
.build()
.await?;

let (task, handle) = server.run();

let uri = RouteUri::from_str("/fluvio")?;

let shutdown = async move {
if let Err(error) = handle.start_agent(uri).await {
error!(error = %error, "Failed to start connector agent.");
}
manage_handle(handle).await
};

let (_, task_result, producer_result) = tokio::join!(shutdown, task, run_fluvio());

producer_result?;
task_result?;
println!("Server stopped successfully.");
Ok(())
}

async fn run_fluvio() -> Result<(), Box<dyn Error + Send + Sync>> {
let producer = fluvio::producer(FLUVIO_TOPIC).await?;
let mut agent_ids = HashSet::new();

loop {
let (agent_id, payload) = {
let len = agent_ids.len();
let mut rng = rand::thread_rng();

let agent_id = if len == MAX_AGENTS {
rng.gen_range(0..len)
} else {
let id = len + 1;
agent_ids.insert(id);
id
};

let payload = json! {
{
"temperature": rng.gen_range(10..100),
"voltage": rng.gen_range::<f64, _>(0.0..12.0)
}
};

(agent_id, serde_json::to_vec(&payload)?)
};

producer
.send(RecordKey::from((agent_id as u32).to_le_bytes()), payload)
.await?;
producer.flush().await?;

sleep(Duration::from_millis(500)).await;
}
}

const CONNECTOR_CONFIG: &str = include_str!("fluvio_connector.recon");

async fn load_config(
path: Option<String>,
) -> Result<FluvioIngressConfiguration, Box<dyn Error + Send + Sync>> {
let content: String;
let recon = if let Some(path) = path {
content = tokio::fs::read_to_string(path).await?;
&content
} else {
CONNECTOR_CONFIG
};
FluvioIngressConfiguration::from_str(recon)
}

pub fn setup_logging() -> Result<(), Box<dyn Error + Send + Sync>> {
let filter = example_filter()?
.add_directive(LevelFilter::INFO.into())
.add_directive("swimos_connector_fluvio=trace".parse()?)
.add_directive("swimos_connector=info".parse()?);
tracing_subscriber::fmt().with_env_filter(filter).init();
Ok(())
}
26 changes: 26 additions & 0 deletions example_apps/fluvio_ingress_connector/src/params.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2015-2024 Swim Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use clap::Parser;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
pub struct Params {
/// Specify a Recon configuration file for the connector.
#[arg(long)]
pub config: Option<String>,
#[arg(long, default_value = "false")]
/// Specify that logging should be enabled.
pub enable_logging: bool,
}
3 changes: 0 additions & 3 deletions runtime/swimos_runtime/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,9 +537,6 @@ pub enum AgentExecError {
/// Initializing the agent failed.
#[error("Failed to initialize agent: {0}")]
FailedInit(#[from] AgentInitError),
/// Initialization completed but no lanes were registered.
#[error("The agent did not register any lanes.")]
NoInitialLanes,
/// The runtime loop of the agent failed.
#[error("The agent task failed: {0}")]
FailedTask(#[from] AgentTaskError),
Expand Down
23 changes: 10 additions & 13 deletions runtime/swimos_runtime/src/agent/task/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,16 @@ impl<Store: AgentPersistence + Send + Sync> AgentInitTask<Store> {
result?;

let Initialization { reporting, .. } = initialization;
if endpoints.lane_endpoints.is_empty() {
Err(AgentExecError::NoInitialLanes)
} else {
Ok((
InitialEndpoints::new(
reporting,
request_stream.into_inner(),
endpoints,
ext_link_state,
),
store,
))
}

Ok((
InitialEndpoints::new(
reporting,
request_stream.into_inner(),
endpoints,
ext_link_state,
),
store,
))
}
}

Expand Down
8 changes: 1 addition & 7 deletions runtime/swimos_runtime/src/agent/task/init/tests/no_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::agent::{
AgentRuntimeRequest, Endpoints, InitialEndpoints, LaneEndpoint, LaneRuntimeSpec,
StoreRuntimeSpec,
},
AgentExecError, Io, LinkRequest,
Io, LinkRequest,
};

type RawMapOperation = MapOperation<Bytes, BytesMut>;
Expand Down Expand Up @@ -183,12 +183,6 @@ impl TestInit for SingleLaneInit {
}
}

#[tokio::test]
async fn no_lanes() {
let (result, _) = run_test(NoLanesInit, StoreDisabled).await;
assert!(matches!(result, Err(AgentExecError::NoInitialLanes)));
}

#[tokio::test]
async fn single_lane() {
for config in CONFIGS {
Expand Down
Loading
Loading