Skip to content

Commit 7e0bd0c

Browse files
authored
Merge pull request #693 from swimos/stocks
Simulated Stocks Demo
2 parents 841f64b + ae4d8b6 commit 7e0bd0c

32 files changed

+8993
-2
lines changed

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ members = [
4040
"example_apps/join_value",
4141
"example_apps/aggregations",
4242
"example_apps/time_series",
43-
"example_apps/devguide", "example_apps/ripple",
43+
"example_apps/devguide",
44+
"example_apps/ripple",
45+
"example_apps/stocks_simulated",
4446
]
4547

4648
[workspace.package]
@@ -172,4 +174,4 @@ waker-fn = "1.1.0"
172174
num = "0.4"
173175
smol_str = "0.2.0"
174176
http-body-util = "0.1.2"
175-
hyper-util = "0.1.5"
177+
hyper-util = "0.1.5"

example_apps/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ binaries for their corresponding client applications. Each directory contains de
2121
- [map_store_persistence](map_store_persistence): Application demonstrating Map Stores with persistence.
2222
- [ripple](ripple): Real-time synchronous shared multiplayer experience.
2323
- [supply_lane](supply_lane): Application demonstrating Supply Lanes.
24+
- [stocks_simulated](stocks_simulated): Application demonstrating simulated stock market data.
2425
- [time_series](time_series): Reference code for the [Time Series](https://www.swimos.org/server/rust/time-series/)
2526
guide.
2627
- [transit](transit): Rust port of the [Transit](https://github.com/swimos/transit) application.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
[package]
2+
name = "stocks_simulated"
3+
version.workspace = true
4+
authors.workspace = true
5+
edition.workspace = true
6+
license.workspace = true
7+
homepage.workspace = true
8+
9+
[dependencies]
10+
swimos = { workspace = true, features = ["server", "agent"] }
11+
swimos_form = { workspace = true }
12+
example-util = { path = "../example_util" }
13+
tracing = { workspace = true }
14+
tracing-subscriber = { workspace = true, features = ["env-filter"] }
15+
futures = { workspace = true }
16+
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
17+
rand = { workspace = true }
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use futures::stream::unfold;
16+
use futures::StreamExt;
17+
use rand::rngs::ThreadRng;
18+
use rand::Rng;
19+
use std::time::Duration;
20+
use swimos::agent::agent_lifecycle::HandlerContext;
21+
use swimos::agent::event_handler::{EventHandler, HandlerActionExt};
22+
use swimos::agent::lanes::{CommandLane, JoinValueLane, ValueLane};
23+
use swimos::agent::{lifecycle, projections, AgentLaneModel};
24+
use swimos::model::Timestamp;
25+
use swimos_form::Form;
26+
use tokio::time::sleep;
27+
use tracing::info;
28+
29+
/// Agent for tracking all the stocks.
30+
#[projections]
31+
#[derive(AgentLaneModel)]
32+
pub struct SymbolsAgent {
33+
/// Lane for receiving stocks to track.
34+
add: CommandLane<String>,
35+
/// Stock downlinks.
36+
stocks: JoinValueLane<String, Stock>,
37+
}
38+
39+
/// Symbols Agent Lifecycle.
40+
#[derive(Clone)]
41+
pub struct SymbolsLifecycle;
42+
43+
#[lifecycle(SymbolsAgent)]
44+
impl SymbolsLifecycle {
45+
/// Lifecycle event handler which is invoked exactly once when `add` lane receives a command to
46+
/// add a downlink to `id`.
47+
#[on_command(add)]
48+
pub fn add(
49+
&self,
50+
context: HandlerContext<SymbolsAgent>,
51+
id: &str,
52+
) -> impl EventHandler<SymbolsAgent> {
53+
let node = format!("/stock/{id}");
54+
context.add_downlink(
55+
SymbolsAgent::STOCKS,
56+
id.to_string(),
57+
None,
58+
node.as_str(),
59+
"status",
60+
)
61+
}
62+
}
63+
64+
/// Stock Agent implementation.
65+
#[projections]
66+
#[derive(AgentLaneModel)]
67+
pub struct StockAgent {
68+
/// The state of the current stock.
69+
status: ValueLane<Stock>,
70+
}
71+
72+
/// Stock model.
73+
#[derive(Form, Clone)]
74+
pub struct Stock {
75+
/// The timestamp that the stock was last updated.
76+
timestamp: Timestamp,
77+
/// The stock's current price.
78+
price: f64,
79+
/// The stock's current volume.
80+
volume: f64,
81+
/// The stock's current bid.
82+
bid: f64,
83+
/// The stock's current add.
84+
ask: f64,
85+
/// The stock's movement since it was last updated.
86+
movement: f64,
87+
}
88+
89+
impl Default for Stock {
90+
fn default() -> Self {
91+
Stock::select_random(0.0)
92+
}
93+
}
94+
95+
impl Stock {
96+
/// Generates a stock. Tracking its movement since `previous_price`.
97+
fn select_random(previous_price: f64) -> Stock {
98+
let mut rng = ThreadRng::default();
99+
let current_price = truncate(rng.gen());
100+
101+
Stock {
102+
timestamp: Timestamp::now(),
103+
price: current_price,
104+
volume: truncate(rng.gen::<f64>() * 400.0).powi(3),
105+
bid: truncate(rng.gen::<f64>() * 100.0),
106+
ask: truncate(rng.gen::<f64>() * 100.0),
107+
movement: current_price - previous_price,
108+
}
109+
}
110+
}
111+
112+
/// Truncates `f` to two decimal places.
113+
fn truncate(f: f64) -> f64 {
114+
(f * 100.0).round() / 100.0
115+
}
116+
117+
/// Stock Agent's lifecycle implementation.
118+
#[derive(Clone)]
119+
pub struct StockLifecycle;
120+
121+
#[lifecycle(StockAgent)]
122+
impl StockLifecycle {
123+
/// Lifecycle event handler which is invoked exactly once when the agent starts.
124+
///
125+
/// The handler will spawn a stream which will generate random stocks and send a command to the
126+
/// `SymbolsAgent` to add a downlink to this agent.
127+
#[on_start]
128+
pub fn on_start(&self, context: HandlerContext<StockAgent>) -> impl EventHandler<StockAgent> {
129+
let stream = unfold(Duration::default(), move |delay| async move {
130+
sleep(delay).await;
131+
132+
let mut rng = ThreadRng::default();
133+
let handler = generate_stock(context);
134+
let next_delay = Duration::from_secs(rng.gen_range(5..=10));
135+
136+
Some((handler, next_delay))
137+
});
138+
context
139+
.get_parameter("symbol")
140+
.and_then(move |symbol_opt: Option<String>| {
141+
let symbol = symbol_opt.expect("Missing symbol for stock");
142+
context.send_command(None, "/symbols", "add", symbol)
143+
})
144+
.followed_by(context.suspend_schedule(stream.boxed()))
145+
.followed_by(context.get_agent_uri())
146+
.and_then(move |uri| context.effect(move || info!(%uri, "Started agent")))
147+
}
148+
}
149+
150+
/// Returns an event handler which will generate a new stock and set it to the `status` lane.
151+
fn generate_stock(context: HandlerContext<StockAgent>) -> impl EventHandler<StockAgent> {
152+
context
153+
.get_value(StockAgent::STATUS)
154+
.and_then(move |previous_stock: Stock| {
155+
context.set_value(
156+
StockAgent::STATUS,
157+
Stock::select_random(previous_stock.price),
158+
)
159+
})
160+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Copyright 2015-2024 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! Rust port of the [Simulated Stock Demo](https://github.com/nstreamio/sim-stock-demo) application.
16+
//!
17+
//! Run the application using the following:
18+
//! ```shell
19+
//! $ cargo run --bin stocks_simulated
20+
//! ```
21+
//!
22+
//! Run the UI with the following:
23+
//! ```shell
24+
//! cd ui
25+
//! npm install
26+
//! npm run dev
27+
//! ```
28+
//! Then head to `localhost:5173` to see it in action.
29+
//! ```
30+
31+
use std::str::FromStr;
32+
use std::{error::Error, time::Duration};
33+
34+
use rand::{thread_rng, Rng};
35+
use tracing_subscriber::filter::LevelFilter;
36+
37+
use swimos::route::RouteUri;
38+
use swimos::server::ServerHandle;
39+
use swimos::{
40+
agent::agent_model::AgentModel,
41+
route::RoutePattern,
42+
server::{Server, ServerBuilder},
43+
};
44+
45+
use crate::agent::{StockAgent, StockLifecycle, SymbolsAgent, SymbolsLifecycle};
46+
47+
mod agent;
48+
49+
const CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
50+
51+
#[tokio::main]
52+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
53+
example_logging()?;
54+
run_swim_server().await?;
55+
56+
Ok(())
57+
}
58+
59+
/// Spawns the Swim server on 0.0.0.0:9001.
60+
async fn run_swim_server() -> Result<(), Box<dyn Error + Send + Sync>> {
61+
let symbols_agent = AgentModel::new(SymbolsAgent::default, SymbolsLifecycle.into_lifecycle());
62+
let stock_agent = AgentModel::new(StockAgent::default, StockLifecycle.into_lifecycle());
63+
64+
let swim_server = ServerBuilder::with_plane_name("Ripple Plane")
65+
.set_bind_addr("0.0.0.0:9001".parse().unwrap())
66+
.add_route(RoutePattern::parse_str("/symbols")?, symbols_agent)
67+
.add_route(RoutePattern::parse_str("/stock/:symbol")?, stock_agent)
68+
.update_config(|config| {
69+
config.agent_runtime.inactive_timeout = Duration::from_secs(5 * 60);
70+
})
71+
.build()
72+
.await?;
73+
74+
let (task, handle) = swim_server.run();
75+
let task = tokio::spawn(task);
76+
77+
start_agents(&handle).await?;
78+
task.await??;
79+
80+
println!("Server stopped");
81+
82+
Ok(())
83+
}
84+
85+
async fn start_agents(handle: &ServerHandle) -> Result<(), Box<dyn Error + Send + Sync>> {
86+
for _ in 0..20000 {
87+
let stock_id = {
88+
let mut rng = thread_rng();
89+
(0..4)
90+
.map(|_| {
91+
let idx = rng.gen_range(0..CHARSET.len());
92+
CHARSET[idx] as char
93+
})
94+
.collect::<String>()
95+
};
96+
handle
97+
.start_agent(RouteUri::from_str(format!("/stock/{stock_id}").as_str())?)
98+
.await?;
99+
}
100+
Ok(())
101+
}
102+
103+
/// Enables logging if `--enable-logging` was provided.
104+
fn example_logging() -> Result<(), Box<dyn Error + Send + Sync>> {
105+
let args = std::env::args().collect::<Vec<_>>();
106+
if args.get(1).map(String::as_str) == Some("--enable-logging") {
107+
let filter = example_util::example_filter()?
108+
.add_directive("stocks_simulated=trace".parse()?)
109+
.add_directive(LevelFilter::WARN.into());
110+
tracing_subscriber::fmt().with_env_filter(filter).init();
111+
}
112+
Ok(())
113+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
module.exports = {
2+
root: true,
3+
env: { browser: true, es2020: true },
4+
extends: ["eslint:recommended", "plugin:@typescript-eslint/recommended", "plugin:react-hooks/recommended"],
5+
ignorePatterns: ["dist", ".eslintrc.cjs"],
6+
parser: "@typescript-eslint/parser",
7+
plugins: ["react-refresh"],
8+
rules: {
9+
"react-refresh/only-export-components": ["warn", { allowConstantExport: true }],
10+
"@typescript-eslint/no-unnecessary-type-constraint": "warn",
11+
"@typescript-eslint/no-explicit-any": "off",
12+
"@typescript-eslint/no-unused-vars": "warn",
13+
},
14+
};
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Logs
2+
logs
3+
*.log
4+
npm-debug.log*
5+
yarn-debug.log*
6+
yarn-error.log*
7+
pnpm-debug.log*
8+
lerna-debug.log*
9+
10+
node_modules
11+
dist
12+
dist-ssr
13+
*.local
14+
15+
# Editor directories and files
16+
.vscode/*
17+
!.vscode/extensions.json
18+
.idea
19+
.DS_Store
20+
*.suo
21+
*.ntvs*
22+
*.njsproj
23+
*.sln
24+
*.sw?
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Stocks Demo
2+
3+
This UI displays a real-time table of a simulated collection of stocks that one might find listed on an exchange. Their
4+
key properties such as current price and daily movement are simulated in the Rust Swim application located in the `/src`
5+
directory of this repository. A single MapDownlink is opened by the UI to the `stocks` lane of the backend application's
6+
`SymbolsAgent`. This downlink syncs with the lane's state containing pricing for all entries and then receives follow-on
7+
price updates until the downlink is closed. With each update received, UI local state and table content is updated.
8+
9+
The UI in this folder was bootstrapped with React + TypeScript + Vite and uses `ag-grid-react` for its table component.
10+
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<!doctype html>
2+
<html lang="en">
3+
<head>
4+
<meta charset="UTF-8" />
5+
<link rel="icon" type="image/png" href="/favicon.ico" />
6+
<link href='https://fonts.googleapis.com/css?family=Open Sans' rel='stylesheet'>
7+
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
8+
<title>NStream Stocks Demo</title>
9+
</head>
10+
<body>
11+
<div id="root"></div>
12+
<script type="module" src="/src/main.tsx"></script>
13+
</body>
14+
</html>

0 commit comments

Comments
 (0)