Skip to content

Commit 48f05ac

Browse files
Merge pull request #697 from swimos/connector-api2
Adds the first version of the connector API and Kafka connector.
2 parents 2ac5ce6 + 69f0d22 commit 48f05ac

File tree

60 files changed

+8948
-132
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+8948
-132
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ jobs:
126126
- name: Checkout repository
127127
uses: actions/checkout@v2
128128

129-
- name: Install Clang
130-
# Required for rocksdb
131-
run: apt-get update && apt-get install -y llvm llvm-dev clang
129+
- name: Install Clang & cmake
130+
# Required for rocksdb and libdkafka
131+
run: apt-get update && apt-get install -y llvm llvm-dev clang cmake
132132

133133
- name: Set libclang path
134134
run: echo "LIBCLANG_PATH=$(llvm-config --libdir)" >> $GITHUB_ENV

Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ swimos_agent = { path = "server/swimos_agent", version = "0.1.0" }
8181
swimos_agent_derive = { path = "server/swimos_agent_derive", version = "0.1.0" }
8282
swimos_introspection = { path = "server/swimos_introspection", version = "0.1.0" }
8383
swimos_server_app = { path = "server/swimos_server_app", version = "0.1.0" }
84+
swimos_connector = { path = "server/swimos_connector", version = "0.1.0"}
85+
swimos_connector_kafka = { path = "server/swimos_connector_kafka", version = "0.1.0"}
8486
swimos = { path = "swimos", version = "0.1.0" }
8587
swimos_client = { path = "swimos_client", version = "0.1.0" }
8688
swimos_downlink = { path = "swimos_downlink", version = "0.1.0" }
@@ -174,4 +176,7 @@ waker-fn = "1.1.0"
174176
num = "0.4"
175177
smol_str = "0.2.0"
176178
http-body-util = "0.1.2"
177-
hyper-util = "0.1.5"
179+
hyper-util = "0.1.5"
180+
rdkafka = "0.25"
181+
apache-avro = "0.16.0"
182+
time = "0.3.36"

api/formats/swimos_recon/src/recon_parser/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub enum ParseError {
2929
line: u32,
3030
column: usize,
3131
},
32-
/// The parsed strucuture was not valid for the target type.
32+
/// The parsed structure was not valid for the target type.
3333
Structure(ReadError),
3434
/// The parser produced an invalid stream of events. This likely indicates
3535
/// a bug in the parser.

api/swimos_api/src/error/mod.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use swimos_utilities::{errors::Recoverable, routing::UnapplyError, trigger::prom
2424
use thiserror::Error;
2525
use tokio::sync::{mpsc, oneshot, watch};
2626

27+
use crate::agent::WarpLaneKind;
2728
use crate::{address::RelativeAddress, agent::StoreKind};
2829

2930
mod introspection;
@@ -127,6 +128,34 @@ pub enum OpenStoreError {
127128
},
128129
}
129130

131+
/// Error type for requests to a running agent to register a new lane.
132+
#[derive(Clone, Debug, Error, PartialEq, Eq)]
133+
pub enum DynamicRegistrationError {
134+
#[error("This agent does not support dynamically registered items.")]
135+
DynamicRegistrationsNotSupported,
136+
#[error("This agent only supports dynamic registration during initialization.")]
137+
AfterInitialization,
138+
#[error("The requested item name '{0}' is already in use.")]
139+
DuplicateName(String),
140+
#[error("This agent does not support dynamically adding lanes of type: {0}")]
141+
LaneKindUnsupported(WarpLaneKind),
142+
#[error("This agent does not support dynamically adding stores of type: {0}")]
143+
StoreKindUnsupported(StoreKind),
144+
#[error("This agent does not support dynamically adding HTTP lanes.")]
145+
HttpLanesUnsupported,
146+
}
147+
148+
/// Error type for an operation to add a new lane to a running agent.
149+
#[derive(Clone, Debug, Error, PartialEq, Eq)]
150+
pub enum LaneSpawnError {
151+
/// The agent runtime stopped before the request could be completed,
152+
#[error(transparent)]
153+
Runtime(#[from] AgentRuntimeError),
154+
/// The agent refused to register the lane.
155+
#[error(transparent)]
156+
Registration(#[from] DynamicRegistrationError),
157+
}
158+
130159
impl<T> From<mpsc::error::SendError<T>> for AgentRuntimeError {
131160
fn from(_: mpsc::error::SendError<T>) -> Self {
132161
AgentRuntimeError::Terminated
@@ -235,6 +264,8 @@ pub enum AgentInitError {
235264
UserCodeError(Box<dyn std::error::Error + Send>),
236265
#[error("Initializing the state of an agent lane failed: {0}")]
237266
LaneInitializationFailure(FrameIoError),
267+
#[error("Attempting to dynamically register a lane failed: {0}")]
268+
RegistrationFailure(#[from] DynamicRegistrationError),
238269
#[error(
239270
"Requested a store of kind {requested} for item {name} but store was of kind {actual}."
240271
)]

example_apps/console/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ console-views = { path = "console_views" }
2222
http = { workspace = true }
2323
rand = { workspace = true }
2424
duration-str = { workspace = true }
25+
time = { workspace = true }
2526
thiserror = { workspace = true }
2627

2728
[target.'cfg(windows)'.dependencies]

server/swimos_agent/src/agent_lifecycle/on_start.rs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@ use super::utility::HandlerContext;
2020

2121
/// Lifecycle event for the `on_start` event of an agent.
2222
pub trait OnStart<Context>: Send {
23-
type OnStartHandler<'a>: EventHandler<Context> + 'a
24-
where
25-
Self: 'a;
26-
27-
fn on_start(&self) -> Self::OnStartHandler<'_>;
23+
fn on_start(&self) -> impl EventHandler<Context> + '_;
2824
}
2925

3026
/// Lifecycle event for the `on_start` event of an agent where the event handler
@@ -46,10 +42,8 @@ pub trait OnStartShared<Context, Shared>: Send {
4642
}
4743

4844
impl<Context> OnStart<Context> for NoHandler {
49-
type OnStartHandler<'a> = UnitHandler where Self: 'a;
50-
51-
fn on_start(&self) -> Self::OnStartHandler<'_> {
52-
Default::default()
45+
fn on_start(&self) -> impl EventHandler<Context> + '_ {
46+
UnitHandler::default()
5347
}
5448
}
5549

@@ -58,11 +52,7 @@ where
5852
F: Fn() -> H + Send,
5953
H: EventHandler<Context> + 'static,
6054
{
61-
type OnStartHandler<'a> = H
62-
where
63-
Self: 'a;
64-
65-
fn on_start(&self) -> Self::OnStartHandler<'_> {
55+
fn on_start(&self) -> impl EventHandler<Context> + '_ {
6656
let FnHandler(f) = self;
6757
f()
6858
}

server/swimos_agent/src/agent_lifecycle/on_stop.rs

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@ use super::utility::HandlerContext;
2020

2121
/// Lifecycle event for the `on_stop` event of an agent.
2222
pub trait OnStop<Context>: Send {
23-
type OnStopHandler<'a>: EventHandler<Context> + 'a
24-
where
25-
Self: 'a;
26-
27-
fn on_stop(&self) -> Self::OnStopHandler<'_>;
23+
fn on_stop(&self) -> impl EventHandler<Context> + '_;
2824
}
2925

3026
/// Lifecycle event for the `on_stop` event of an agent where the event handler
@@ -46,12 +42,8 @@ pub trait OnStopShared<Context, Shared>: Send {
4642
}
4743

4844
impl<Context> OnStop<Context> for NoHandler {
49-
type OnStopHandler<'a> = UnitHandler
50-
where
51-
Self: 'a;
52-
53-
fn on_stop(&self) -> Self::OnStopHandler<'_> {
54-
Default::default()
45+
fn on_stop(&self) -> impl EventHandler<Context> + '_ {
46+
UnitHandler::default()
5547
}
5648
}
5749

@@ -75,11 +67,7 @@ where
7567
F: Fn() -> H + Send,
7668
H: EventHandler<Context> + 'static,
7769
{
78-
type OnStopHandler<'a> = H
79-
where
80-
Self: 'a;
81-
82-
fn on_stop(&self) -> Self::OnStopHandler<'_> {
70+
fn on_stop(&self) -> impl EventHandler<Context> + '_ {
8371
let FnHandler(f) = self;
8472
f()
8573
}

server/swimos_agent/src/agent_lifecycle/stateful/mod.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
use static_assertions::assert_impl_all;
1616
use swimos_utilities::handlers::{FnHandler, NoHandler};
1717

18-
use crate::{agent_lifecycle::AgentLifecycle, event_handler::ActionContext, meta::AgentMetadata};
18+
use crate::{
19+
agent_lifecycle::AgentLifecycle,
20+
event_handler::{ActionContext, EventHandler},
21+
meta::AgentMetadata,
22+
};
1923

2024
use super::{
2125
item_event::{ItemEvent, ItemEventShared},
@@ -110,9 +114,7 @@ where
110114
FStop: Send,
111115
ItemEv: Send,
112116
{
113-
type OnStartHandler<'a> = FStart::OnStartHandler<'a> where Self: 'a;
114-
115-
fn on_start(&self) -> Self::OnStartHandler<'_> {
117+
fn on_start(&self) -> impl EventHandler<Context> + '_ {
116118
let StatefulAgentLifecycle {
117119
state,
118120
handler_context,
@@ -132,11 +134,7 @@ where
132134
FStart: Send,
133135
ItemEv: Send,
134136
{
135-
type OnStopHandler<'a> = FStop::OnStopHandler<'a>
136-
where
137-
Self: 'a;
138-
139-
fn on_stop(&self) -> Self::OnStopHandler<'_> {
137+
fn on_stop(&self) -> impl EventHandler<Context> + '_ {
140138
let StatefulAgentLifecycle {
141139
state,
142140
handler_context,

server/swimos_agent/src/agent_lifecycle/utility/mod.rs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use std::{collections::HashMap, marker::PhantomData};
2121

2222
use futures::stream::unfold;
2323
use futures::{Future, FutureExt, Stream, StreamExt};
24+
use swimos_api::agent::WarpLaneKind;
25+
use swimos_api::error::LaneSpawnError;
2426
use tokio::time::Instant;
2527

2628
use swimos_api::address::Address;
@@ -37,7 +39,7 @@ use crate::config::{MapDownlinkConfig, SimpleDownlinkConfig};
3739
use crate::downlink_lifecycle::ValueDownlinkLifecycle;
3840
use crate::downlink_lifecycle::{EventDownlinkLifecycle, MapDownlinkLifecycle};
3941
use crate::event_handler::{
40-
run_after, run_schedule, run_schedule_async, ConstHandler, EventHandler, GetParameter,
42+
run_after, run_schedule, run_schedule_async, ConstHandler, EventHandler, Fail, GetParameter,
4143
HandlerActionExt, SendCommand, Sequentially, Stop, Suspend, UnitHandler,
4244
};
4345
use crate::event_handler::{GetAgentUri, HandlerAction, SideEffect};
@@ -51,7 +53,7 @@ use crate::lanes::demand_map::CueKey;
5153
use crate::lanes::join_map::JoinMapAddDownlink;
5254
use crate::lanes::join_value::{JoinValueAddDownlink, JoinValueLane};
5355
use crate::lanes::supply::{Supply, SupplyLane};
54-
use crate::lanes::{DemandMapLane, JoinMapLane};
56+
use crate::lanes::{DemandMapLane, JoinMapLane, OpenLane};
5557

5658
pub use self::downlink_builder::event::{
5759
StatefulEventDownlinkBuilder, StatelessEventDownlinkBuilder,
@@ -862,6 +864,56 @@ impl<Agent: 'static> HandlerContext<Agent> {
862864
pub fn stop(&self) -> impl EventHandler<Agent> + Send + 'static {
863865
HandlerActionExt::<Agent>::discard(Stop)
864866
}
867+
868+
/// Create a handler that will fail with the provided error.
869+
///
870+
/// # Arguments
871+
/// * `error` - The error.
872+
pub fn fail<T, E>(&self, error: E) -> impl HandlerAction<Agent, Completion = T> + Send + 'static
873+
where
874+
T: Send + 'static,
875+
E: std::error::Error + Send + 'static,
876+
{
877+
Fail::<T, E>::new(error)
878+
}
879+
880+
/// Attempt to open a new value lane on this agent. Note that the implementation of the underlying agent
881+
/// used must support this.
882+
///
883+
/// #Arguments
884+
///
885+
/// * `name` - The name of the lane.
886+
/// * `on_done` - A callback that will create an event handler that will be executed when the request completes.
887+
pub fn open_value_lane<OnDone, H>(
888+
&self,
889+
name: &str,
890+
on_done: OnDone,
891+
) -> impl EventHandler<Agent> + Send + 'static
892+
where
893+
OnDone: FnOnce(Result<(), LaneSpawnError>) -> H + Send + 'static,
894+
H: EventHandler<Agent> + Send + 'static,
895+
{
896+
OpenLane::new(name.to_string(), WarpLaneKind::Value, on_done)
897+
}
898+
899+
/// Attempt to open a new map lane on this agent. Note that the implementation of the underlying agent
900+
/// used must support this.
901+
///
902+
/// #Arguments
903+
///
904+
/// * `name` - The name of the lane.
905+
/// * `on_done` - A callback that will create an event handler that will be executed when the request completes.
906+
pub fn open_map_lane<OnDone, H>(
907+
&self,
908+
name: &str,
909+
on_done: OnDone,
910+
) -> impl EventHandler<Agent> + Send + 'static
911+
where
912+
OnDone: FnOnce(Result<(), LaneSpawnError>) -> H + Send + 'static,
913+
H: EventHandler<Agent> + Send + 'static,
914+
{
915+
OpenLane::new(name.to_string(), WarpLaneKind::Map, on_done)
916+
}
865917
}
866918

867919
/// Context passed to agent methods used to construct lifecycles for [`JoinValueLane`]s.

server/swimos_agent/src/agent_lifecycle/utility/tests.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use swimos_utilities::routing::RouteUri;
2323
use crate::{
2424
event_handler::{ActionContext, HandlerAction, LocalBoxEventHandler, StepResult},
2525
meta::AgentMetadata,
26-
test_context::{no_downlink, DummyAgentContext},
26+
test_context::{no_downlink, DummyAgentContext, NO_DYN_LANES},
2727
};
2828

2929
use super::HandlerContext;
@@ -103,6 +103,7 @@ async fn suspend_repeatedly() {
103103
&spawner,
104104
&DummyAgentContext,
105105
&no_downlink,
106+
&NO_DYN_LANES,
106107
&mut join_lane_init,
107108
&mut ad_hoc_buffer,
108109
),

0 commit comments

Comments
 (0)