Skip to content

Commit 6281c84

Browse files
committed
Merge branch 'main' of https://github.com/swimos/swim-rust into fluvio
2 parents 583c3b4 + e0e0885 commit 6281c84

File tree

6 files changed

+17
-33
lines changed

6 files changed

+17
-33
lines changed

server/swimos_connector/src/connector/egress/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ pub trait EgressConnectorSender<SendError>: Send + Clone {
142142
/// A reference to an egress context is passed to an [egress connector](`EgressConnector`) when it starts
143143
/// allowing it to request that lanes or downlinks to remote lanes be opened.
144144
pub trait EgressContext {
145+
/// Request a new, dynamic WARP lane be opened on the agent.
146+
///
147+
/// # Arguments
148+
/// * `name` - The name of the lane.
149+
/// * `kind` - The kind of the lane.
145150
fn open_lane(&mut self, name: &str, kind: WarpLaneKind);
146151

147152
/// Request an event downlink to a remote lane.

server/swimos_connector/src/connector/ingress/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,5 +98,10 @@ where
9898
/// A reference to an ingress context is passed to an [ingress connector](`IngressConnector`) when it starts
9999
/// allowing it to request that lanes be opened.
100100
pub trait IngressContext {
101+
/// Request a new, dynamic WARP lane be opened on the agent.
102+
///
103+
/// # Arguments
104+
/// * `name` - The name of the lane.
105+
/// * `kind` - The kind of the lane.
101106
fn open_lane(&mut self, name: &str, kind: WarpLaneKind);
102107
}

server/swimos_connector_kafka/src/config/egress.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ impl From<&DownlinkAddress> for Address<String> {
9595
}
9696
}
9797

98-
impl DownlinkAddress {
99-
pub fn borrow_as_addr(&self) -> Address<&str> {
100-
let DownlinkAddress { host, node, lane } = self;
98+
impl<'a> From<&'a DownlinkAddress> for Address<&'a str> {
99+
fn from(value: &'a DownlinkAddress) -> Self {
100+
let DownlinkAddress { host, node, lane } = value;
101101
Address {
102102
host: host.as_ref().map(|s| s.as_str()),
103103
node,

server/swimos_connector_kafka/src/connector/egress/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,10 +251,10 @@ fn open_downlinks(config: &KafkaEgressConfiguration, context: &mut dyn EgressCon
251251
..
252252
} = config;
253253
for value_dl in value_downlinks {
254-
context.open_event_downlink(value_dl.address.borrow_as_addr());
254+
context.open_event_downlink(<Address<&str>>::from(&value_dl.address));
255255
}
256256
for map_dl in map_downlinks {
257-
context.open_map_downlink(map_dl.address.borrow_as_addr());
257+
context.open_map_downlink(<Address<&str>>::from(&map_dl.address));
258258
}
259259
}
260260

server/swimos_connector_kafka/src/connector/egress/tests/integration.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,8 @@ fn initialize_connector() {
277277
.into_iter()
278278
.collect::<HashMap<_, _>>();
279279

280-
assert_eq!(value_downlinks, vec![Address::from(&addr1())]);
281-
assert_eq!(map_downlinks, vec![Address::from(&addr2())]);
280+
assert_eq!(value_downlinks, vec![Address::<String>::from(&addr1())]);
281+
assert_eq!(map_downlinks, vec![Address::<String>::from(&addr2())]);
282282
assert_eq!(lanes_map, expected_lanes);
283283
}
284284

server/swimos_connector_kafka/src/connector/egress/tests/mod.rs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -87,32 +87,6 @@ fn downlinks_config() -> KafkaEgressConfiguration {
8787
}
8888
}
8989

90-
/* #[tokio::test]
91-
async fn open_lanes() {
92-
let config = lanes_config();
93-
let agent = ConnectorAgent::default();
94-
let selectors = MessageSelectors::try_from(&config).expect("Bad configuration.");
95-
96-
let semaphore = Arc::new(Semaphore::new(0));
97-
let (done_tx, done_rx) = trigger::trigger();
98-
let handler = selectors.open_lanes(done_tx, semaphore, 0);
99-
100-
let handler_task = run_handler_with_futures(&agent, handler);
101-
102-
let (modified, done_result) = timeout(TEST_TIMEOUT, join(handler_task, done_rx))
103-
.await
104-
.expect("Test timed out.");
105-
106-
assert!(modified.is_empty());
107-
assert!(done_result.is_ok());
108-
109-
let expected_value_lanes = [VALUE_LANE.to_string()].into_iter().collect::<HashSet<_>>();
110-
let expected_map_lanes = [MAP_LANE.to_string()].into_iter().collect::<HashSet<_>>();
111-
112-
assert_eq!(agent.value_lanes(), expected_value_lanes);
113-
assert_eq!(agent.map_lanes(), expected_map_lanes);
114-
} */
115-
11690
#[derive(Default, Debug, PartialEq, Eq)]
11791
struct TestEgressContext {
11892
lanes: Vec<(String, WarpLaneKind)>,

0 commit comments

Comments
 (0)