Skip to content

Commit 58d1d99

Browse files
committed
Merge branch 'kafka-egress-example' into connector-lanes
2 parents f2b5ab7 + 06eb12e commit 58d1d99

File tree

16 files changed

+56
-72
lines changed

16 files changed

+56
-72
lines changed

example_apps/aggregations/src/car.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,15 @@ impl CarLifecycle {
5454
// deregister this car with its current area
5555
let deregister_handler = context.send_command(
5656
None,
57-
format!("/area/{old_area}"),
58-
"registrations".to_string(),
57+
&format!("/area/{old_area}"),
58+
"registrations",
5959
Action::Deregister(car_id),
6060
);
6161
// register this car with its new assigned area
6262
let register_handler = context.send_command(
6363
None,
64-
format!("/area/{new_area}"),
65-
"registrations".to_string(),
64+
&format!("/area/{new_area}"),
65+
"registrations",
6666
Action::Register(car_id),
6767
);
6868

example_apps/join_value/src/room.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ impl RoomLifecycle {
4545

4646
context.send_command(
4747
None,
48-
format!("/buildings/{building_name}"),
49-
"register_room".to_string(),
48+
&format!("/buildings/{building_name}"),
49+
"register_room",
5050
room_id_str,
5151
)
5252
})

example_apps/transit/src/agents/agency.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,7 @@ impl AgencyLifecycle {
8080
let state_uri = self.agency.state_uri();
8181

8282
//Associate this agency with the state that contains it.
83-
let add_to_state = context.send_command(
84-
None,
85-
state_uri,
86-
"addAgency".to_string(),
87-
self.agency.clone(),
88-
);
83+
let add_to_state = context.send_command(None, &state_uri, "addAgency", self.agency.clone());
8984
context
9085
.get_agent_uri()
9186
.and_then(move |uri| {
@@ -254,8 +249,7 @@ fn process_new_vehicles(
254249
let additions = new_vehicles
255250
.into_iter()
256251
.map(move |(k, v)| {
257-
let to_vehicle_agent =
258-
context.send_command(None, v.uri.clone(), "addVehicle".to_string(), v.clone());
252+
let to_vehicle_agent = context.send_command(None, &v.uri, "addVehicle", v.clone());
259253
let add_vehicle = context.update(AgencyAgent::VEHICLES, k, v);
260254
to_vehicle_agent.followed_by(add_vehicle)
261255
})

example_apps/transit/src/agents/state.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,7 @@ impl StateLifecycle {
8989
&agency_uri,
9090
"speed",
9191
);
92-
let add_to_country =
93-
context.send_command(None, country_uri, "addAgency".to_string(), agency.clone());
92+
let add_to_country = context.send_command(None, &country_uri, "addAgency", agency.clone());
9493
log_uri
9594
.followed_by(link_count)
9695
.followed_by(link_speed)

runtime/swimos_rocks_store/src/server/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ impl ServerPersistence for RocksServerPersistence {
279279
}
280280

281281
/// Open a RocksDB persistence store from a path in the local filesystem. If the specified database does
282-
/// not exist it will be crated.
282+
/// not exist it will be created.
283283
///
284284
/// # Arguments
285285
/// * `path` - The filesystem path to the database. If none is specified, a new database will be created in a temporary directory.

server/swimos_agent/src/agent_lifecycle/utility/downlink_builder/event.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub struct StatelessEventDownlinkBuilder<
4343
address: Address<Text>,
4444
config: SimpleDownlinkConfig,
4545
inner: LC,
46+
// This determines whether then downlink reports a kind of Event or MapEvent and makes no functional difference.
4647
map_events: bool,
4748
}
4849

@@ -74,7 +75,9 @@ impl<Context, T> StatelessEventDownlinkBuilder<Context, T> {
7475
}
7576

7677
impl<Context, K, V> StatelessEventDownlinkBuilder<Context, MapMessage<K, V>> {
77-
pub fn new_map(address: Address<Text>, config: SimpleDownlinkConfig) -> Self {
78+
// Creates a lifecycle for an event downlink that consumes events from a remote map lane. This alters
79+
// the type reported by the downlink from Event to MapEvent and makes not function difference.
80+
pub(crate) fn new_map(address: Address<Text>, config: SimpleDownlinkConfig) -> Self {
7881
StatelessEventDownlinkBuilder {
7982
_type: PhantomData,
8083
address,
@@ -97,18 +100,6 @@ impl<Context, T, State> StatefulEventDownlinkBuilder<Context, T, State> {
97100
}
98101
}
99102

100-
impl<Context, K, V, State> StatefulEventDownlinkBuilder<Context, MapMessage<K, V>, State> {
101-
pub fn new_map(address: Address<Text>, config: SimpleDownlinkConfig, state: State) -> Self {
102-
StatefulEventDownlinkBuilder {
103-
_type: PhantomData,
104-
address,
105-
config,
106-
inner: StatefulEventDownlinkLifecycle::new(state),
107-
map_events: false,
108-
}
109-
}
110-
}
111-
112103
impl<Context, T, LC> StatelessEventDownlinkBuilder<Context, T, LC>
113104
where
114105
LC: StatelessEventLifecycle<Context, T>,

server/swimos_agent/src/agent_lifecycle/utility/mod.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use futures::{Future, FutureExt, Stream, StreamExt};
2424
use swimos_agent_protocol::MapMessage;
2525
use swimos_api::agent::WarpLaneKind;
2626
use swimos_api::error::LaneSpawnError;
27-
use swimos_model::Text;
2827
use tokio::time::Instant;
2928

3029
use swimos_api::address::Address;
@@ -130,18 +129,17 @@ impl<Agent: 'static> HandlerContext<Agent> {
130129
/// * `node` - The target node hosting the lane.
131130
/// * `lane` - The name of the target lane.
132131
/// * `command` - The value to send.
133-
pub fn send_command<'a, S, T>(
132+
pub fn send_command<'a, T>(
134133
&self,
135-
host: Option<S>,
136-
node: S,
137-
lane: S,
134+
host: Option<&str>,
135+
node: &str,
136+
lane: &str,
138137
command: T,
139138
) -> impl EventHandler<Agent> + 'a
140139
where
141-
S: AsRef<str> + 'a,
142140
T: StructuralWritable + 'a,
143141
{
144-
let addr = Address::new(host, node, lane);
142+
let addr = Address::text(host, node, lane);
145143
SendCommand::new(addr, command, true)
146144
}
147145

@@ -962,11 +960,11 @@ impl<Agent: 'static> HandlerContext<Agent> {
962960
/// * `lane` - The name of the target lane.
963961
pub fn create_commander(
964962
&self,
965-
host: Option<impl Into<Text>>,
966-
node: impl Into<Text>,
967-
lane: impl Into<Text>,
963+
host: Option<&str>,
964+
node: &str,
965+
lane: &str,
968966
) -> impl HandlerAction<Agent, Completion = Commander<Agent>> + 'static {
969-
let address = Address::new(host.map(Into::into), node.into(), lane.into());
967+
let address = Address::text(host, node, lane);
970968
RegisterCommander::new(address)
971969
}
972970

server/swimos_agent/src/agent_model/downlink/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ impl<T, LC> OpenValueDownlinkAction<T, LC> {
8383
}
8484

8585
impl<T, LC> OpenEventDownlinkAction<T, LC> {
86+
/// # Arguments
87+
/// * `address` - The address of the remote lane to link to.
88+
/// * `lifecycle` - The lifecycle events associated with the downlink.
89+
/// * `config` - Runtime configuration parameters for the downlink.
90+
/// * `map_events` - Determines whether the downlink will report [`DownlinkKind::Event`] or
91+
/// [`DownlinkKind::MapEvent`] as its kind (and makes no functional difference).
8692
pub fn new(
8793
address: Address<Text>,
8894
lifecycle: LC,

server/swimos_agent/src/agent_model/tests/external_links/commander_lifecycle.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ impl OnStart<EmptyAgent> for CommanderLifecycle {
7272
context.stop()
7373
});
7474

75-
let Address { host, node, lane } = &self.address;
76-
let create = context.create_commander(host.as_ref(), node, lane);
75+
let Address { host, node, lane } = self.address.borrow_parts();
76+
let create = context.create_commander(host, node, lane);
7777

7878
let commands = create.and_then(move |commander: Commander<EmptyAgent>| {
7979
commander

server/swimos_connector/src/connector/egress/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::{collections::HashMap, error::Error, time::Duration};
15+
use std::{collections::HashMap, time::Duration};
1616

1717
use swimos_api::{address::Address, agent::WarpLaneKind};
1818
use swimos_model::Value;
@@ -26,28 +26,28 @@ use super::{BaseConnector, ConnectorFuture};
2626
/// lanes, by default, but allows for them to be added dynamically by the lifecycle. The lanes that a connector
2727
/// registers can be derived from static configuration or inferred from the external data source itself. Currently,
2828
/// it is only possible to register dynamic lanes in the initialization phase of the agent (during the `on_start`
29-
/// event). This restriction should be relaxed in the future.
29+
/// event). This restriction may be relaxed in the future.
3030
///
3131
/// When the connector starts, it will open some number of value and map lanes. Additionally, a number of event and
3232
/// map-event downlinks may be opened to remote lanes on other agents. Each time a changes is made to one of the
33-
/// lanes or an update is received on a downlinks, a message will be sent on a sender, crated by the
33+
/// lanes or an update is received on a downlinks, a message will be sent on a sender, created by the
3434
/// [`EgressConnector::make_sender`] method.
3535
///
3636
/// Note that the sender must implement [`Clone`] so that it can be shared between the agent's lanes and the
3737
/// downlinks.
3838
pub trait EgressConnector: BaseConnector {
3939
/// The type of the errors produced by the connector.
40-
type SendError: Error + Send + 'static;
40+
type Error: std::error::Error + Send + 'static;
4141

4242
/// The type of the sender created by this connector.
43-
type Sender: EgressConnectorSender<Self::SendError> + 'static;
43+
type Sender: EgressConnectorSender<Self::Error> + 'static;
4444

4545
/// Open the lanes and downlinks required by the connector. This is called during the agent's `on_start`
4646
/// event.
4747
///
4848
/// # Arguments
4949
/// * `context` - The connector makes calls to the context to request the lanes and downlinks.
50-
fn initialize(&self, context: &mut dyn EgressContext) -> Result<(), Self::SendError>;
50+
fn initialize(&self, context: &mut dyn EgressContext) -> Result<(), Self::Error>;
5151

5252
/// Create sender for the connector which is used to send messages to the external data sink. This is called
5353
/// exactly ones during the agent's `on_start` event but must implement [`Clone`] so that copies can be passed
@@ -58,7 +58,7 @@ pub trait EgressConnector: BaseConnector {
5858
fn make_sender(
5959
&self,
6060
agent_params: &HashMap<String, String>,
61-
) -> Result<Self::Sender, Self::SendError>;
61+
) -> Result<Self::Sender, Self::Error>;
6262
}
6363

6464
/// Possible results of sending a message to the external sink.

0 commit comments

Comments
 (0)