Skip to content

Commit d3293b1

Browse files
committed
Resolves PR comments
1 parent 47f7726 commit d3293b1

File tree

33 files changed

+2495
-1745
lines changed

33 files changed

+2495
-1745
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ swimos_rtree = { path = "swimos_utilities/swimos_rtree", version = "0.1.1" }
104104
swimos_sync = { path = "swimos_utilities/swimos_sync", version = "0.1.1" }
105105
swimos_time = { path = "swimos_utilities/swimos_time", version = "0.1.1" }
106106
swimos_encoding = { path = "swimos_utilities/swimos_encoding", version = "0.1.1" }
107+
swimos_connector_util = { path = "server/swimos_connector_util", version = "0.1.1" }
107108

108109
bytes = "1.3"
109110
tokio = "1.22"

server/swimos_connector/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,5 @@ tokio = { workspace = true, features = ["rt", "test-util", "time"] }
4444
parking_lot = { workspace = true }
4545
serde = { workspace = true }
4646
serde_json = { workspace = true }
47-
uuid = { workspace = true, features = ["v4"] }
47+
uuid = { workspace = true, features = ["v4"] }
48+
swimos_connector_util = { workspace = true }

server/swimos_connector/src/config/ingress.rs

Lines changed: 51 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::selector::{BadSelector, Relay, Relays};
1+
use crate::selector::{BadSelector, PubSubSelector, Relay, Relays};
22
use swimos_form::Form;
33

44
/// Specification of a value lane for the connector.
@@ -73,66 +73,97 @@ impl IngressMapLaneSpec {
7373
}
7474
}
7575

76+
/// Specification of a value relay for the connector.
77+
#[cfg(feature = "pubsub")]
7678
#[derive(Clone, Debug, Form, PartialEq, Eq)]
7779
#[form(tag = "ValueRelaySpec")]
78-
pub struct ValueRelaySpecification {
80+
pub struct PubSubValueRelaySpecification {
81+
/// A node URI selector. See [`crate::selector::NodeSelector`] for more information.
7982
pub node: String,
83+
/// A lane URI selector. See [`crate::selector::LaneSelector`] for more information.
8084
pub lane: String,
85+
/// A payload URI selector. See [`crate::selector::RelayPayloadSelector::value`] for more information.
8186
pub payload: String,
87+
/// Whether the payload selector must yield a value. If it does not, then the selector will
88+
/// yield an error.
8289
pub required: bool,
8390
}
8491

92+
/// Specification of a map relay for the connector.
93+
#[cfg(feature = "pubsub")]
8594
#[derive(Clone, Debug, Form, PartialEq, Eq)]
8695
#[form(tag = "MapRelaySpec")]
87-
pub struct MapRelaySpecification {
96+
pub struct PubSubMapRelaySpecification {
97+
/// A node URI selector. See [`crate::selector::NodeSelector`] for more information.
8898
pub node: String,
99+
/// A lane URI selector. See [`crate::selector::LaneSelector`] for more information.
89100
pub lane: String,
101+
/// A payload URI selector. See [`crate::selector::RelayPayloadSelector::map`] for more information.
90102
pub key: String,
103+
/// A payload URI selector. See [`crate::selector::RelayPayloadSelector::map`] for more information.
91104
pub value: String,
105+
/// Whether the payload selector must yield a value. If it does not, then the selector will
106+
/// yield an error.
92107
pub required: bool,
108+
/// If the value selector fails to select, then it will emit a map remove command to remove the
109+
/// corresponding entry.
93110
pub remove_when_no_value: bool,
94111
}
95112

113+
/// Specification of a relay for the connector.
114+
#[cfg(feature = "pubsub")]
96115
#[derive(Clone, Debug, Form, PartialEq, Eq)]
97-
pub enum RelaySpecification {
98-
Value(ValueRelaySpecification),
99-
Map(MapRelaySpecification),
116+
pub enum PubSubRelaySpecification {
117+
/// Specification of a value relay for the connector.
118+
Value(PubSubValueRelaySpecification),
119+
/// Specification of a map relay for the connector.
120+
Map(PubSubMapRelaySpecification),
100121
}
101122

102-
impl TryFrom<Vec<RelaySpecification>> for Relays {
123+
#[cfg(feature = "pubsub")]
124+
impl TryFrom<Vec<PubSubRelaySpecification>> for Relays<PubSubSelector> {
103125
type Error = BadSelector;
104126

105-
fn try_from(value: Vec<RelaySpecification>) -> Result<Self, Self::Error> {
127+
fn try_from(value: Vec<PubSubRelaySpecification>) -> Result<Self, Self::Error> {
128+
use crate::selector::{
129+
parse_lane_selector, parse_map_selector, parse_node_selector, parse_value_selector,
130+
};
131+
106132
let mut chain = Vec::with_capacity(value.len());
107133

108134
for spec in value {
109135
match spec {
110-
RelaySpecification::Value(ValueRelaySpecification {
136+
PubSubRelaySpecification::Value(PubSubValueRelaySpecification {
111137
node,
112138
lane,
113139
payload,
114140
required,
115141
}) => {
116-
let relay =
117-
Relay::value(node.as_str(), lane.as_str(), payload.as_str(), required)?;
142+
let relay = Relay::new(
143+
parse_node_selector(node.as_str())?,
144+
parse_lane_selector(lane.as_str())?,
145+
parse_value_selector(payload.as_str(), required)?,
146+
);
118147
chain.push(relay);
119148
}
120-
RelaySpecification::Map(MapRelaySpecification {
149+
PubSubRelaySpecification::Map(PubSubMapRelaySpecification {
121150
node,
122151
lane,
123152
key,
124153
value,
125154
required,
126155
remove_when_no_value,
127156
}) => {
128-
let relay = Relay::map(
129-
node.as_str(),
130-
lane.as_str(),
131-
key.as_str(),
132-
value.as_str(),
133-
required,
134-
remove_when_no_value,
135-
)?;
157+
let relay = Relay::new(
158+
parse_node_selector(node.as_str())?,
159+
parse_lane_selector(lane.as_str())?,
160+
parse_map_selector(
161+
key.as_str(),
162+
value.as_str(),
163+
required,
164+
remove_when_no_value,
165+
)?,
166+
);
136167
chain.push(relay);
137168
}
138169
}

server/swimos_connector/src/deser/mod.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ pub use json::JsonDeserializer;
2626
#[cfg(feature = "avro")]
2727
pub use avro::AvroDeserializer;
2828

29-
use std::cell::RefCell;
3029
use std::error::Error;
3130
use std::{array::TryFromSliceError, convert::Infallible};
3231
use swimos_form::Form;
@@ -242,40 +241,37 @@ where
242241
pub struct Deferred<'a> {
243242
buf: &'a [u8],
244243
deser: &'a BoxMessageDeserializer,
245-
state: RefCell<Option<Value>>,
244+
state: Option<Value>,
246245
}
247246

248247
impl<'a> Deferred<'a> {
249248
pub fn new(buf: &'a [u8], deser: &'a BoxMessageDeserializer) -> Deferred<'a> {
250249
Deferred {
251250
buf,
252251
deser,
253-
state: RefCell::new(None),
252+
state: None,
254253
}
255254
}
256255

257-
pub fn get(&self) -> Result<Value, DeserializationError> {
256+
pub fn get(&mut self) -> Result<Value, DeserializationError> {
258257
let Deferred { buf, deser, state } = self;
259-
if let Some(v) = &*state.borrow() {
258+
if let Some(v) = state {
260259
Ok(v.clone())
261260
} else {
262-
let inner = &mut *state.borrow_mut();
263-
Ok(inner.insert(deser.deserialize(buf)?).clone())
261+
Ok(state.insert(deser.deserialize(buf)?).clone())
264262
}
265263
}
266264

267-
pub fn with<F>(&self, f: F) -> Result<Option<Value>, DeserializationError>
265+
pub fn with<F>(&mut self, f: F) -> Result<Option<Value>, DeserializationError>
268266
where
269267
F: FnOnce(&Value) -> Option<Value>,
270268
{
271269
let Deferred { buf, deser, state } = self;
272-
{
273-
if let Some(v) = &*state.borrow() {
274-
return Ok(f(v));
275-
}
270+
if let Some(v) = state {
271+
Ok(f(v))
272+
} else {
273+
let val = state.insert(deser.deserialize(buf)?);
274+
Ok(f(val))
276275
}
277-
let inner = &mut *state.borrow_mut();
278-
let val = inner.insert(deser.deserialize(buf)?);
279-
Ok(f(val))
280276
}
281277
}

server/swimos_connector/src/ingress/lanes.rs

Lines changed: 52 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,40 +13,50 @@
1313
// limitations under the License.
1414

1515
use crate::config::{IngressMapLaneSpec, IngressValueLaneSpec};
16-
use crate::ingress::check_selectors;
17-
use crate::selector::{
18-
InvalidLanes, MapLaneSelector, PubSubMapLaneSelector, PubSubValueLaneSelector,
19-
ValueLaneSelector,
20-
};
16+
use crate::selector::{InvalidLaneSpec, InvalidLanes, MapLaneSelector, ValueLaneSelector};
17+
use std::collections::HashSet;
2118

2219
// Information about the lanes of the connector. These are computed from the configuration in the `on_start` handler
2320
// and stored in the lifecycle to be used to start the consumer stream.
24-
#[derive(Debug, Default, Clone)]
25-
pub struct Lanes {
26-
value_lanes: Vec<PubSubValueLaneSelector>,
27-
map_lanes: Vec<PubSubMapLaneSelector>,
21+
#[derive(Debug, Clone)]
22+
pub struct Lanes<S> {
23+
value_lanes: Vec<ValueLaneSelector<S>>,
24+
map_lanes: Vec<MapLaneSelector<S, S>>,
2825
}
2926

30-
impl Lanes {
31-
pub fn value_lanes(&self) -> &[PubSubValueLaneSelector] {
27+
impl<S> Default for Lanes<S> {
28+
fn default() -> Self {
29+
Lanes {
30+
value_lanes: vec![],
31+
map_lanes: vec![],
32+
}
33+
}
34+
}
35+
36+
impl<S> Lanes<S> {
37+
pub fn value_lanes(&self) -> &[ValueLaneSelector<S>] {
3238
&self.value_lanes
3339
}
3440

35-
pub fn map_lanes(&self) -> &[PubSubMapLaneSelector] {
41+
pub fn map_lanes(&self) -> &[MapLaneSelector<S, S>] {
3642
&self.map_lanes
3743
}
3844

39-
pub fn try_from_lane_specs(
40-
value_lanes: &[IngressValueLaneSpec],
41-
map_lanes: &[IngressMapLaneSpec],
42-
) -> Result<Lanes, InvalidLanes> {
45+
pub fn try_from_lane_specs<'a>(
46+
value_lanes: &'a [IngressValueLaneSpec],
47+
map_lanes: &'a [IngressMapLaneSpec],
48+
) -> Result<Lanes<S>, InvalidLanes>
49+
where
50+
ValueLaneSelector<S>: TryFrom<&'a IngressValueLaneSpec, Error = InvalidLaneSpec>,
51+
MapLaneSelector<S, S>: TryFrom<&'a IngressMapLaneSpec, Error = InvalidLaneSpec>,
52+
{
4353
let value_selectors = value_lanes
4454
.iter()
45-
.map(ValueLaneSelector::try_from)
55+
.map(ValueLaneSelector::<S>::try_from)
4656
.collect::<Result<Vec<_>, _>>()?;
4757
let map_selectors = map_lanes
4858
.iter()
49-
.map(MapLaneSelector::try_from)
59+
.map(MapLaneSelector::<S, S>::try_from)
5060
.collect::<Result<Vec<_>, _>>()?;
5161
check_selectors(&value_selectors, &map_selectors)?;
5262
Ok(Lanes {
@@ -55,3 +65,27 @@ impl Lanes {
5565
})
5666
}
5767
}
68+
69+
fn check_selectors<S>(
70+
value_selectors: &[ValueLaneSelector<S>],
71+
map_selectors: &[MapLaneSelector<S, S>],
72+
) -> Result<(), InvalidLanes> {
73+
let mut names = HashSet::new();
74+
for value_selector in value_selectors {
75+
let name = value_selector.name();
76+
if names.contains(name) {
77+
return Err(InvalidLanes::NameCollision(name.to_string()));
78+
} else {
79+
names.insert(name);
80+
}
81+
}
82+
for map_selector in map_selectors {
83+
let name = map_selector.name();
84+
if names.contains(name) {
85+
return Err(InvalidLanes::NameCollision(name.to_string()));
86+
} else {
87+
names.insert(name);
88+
}
89+
}
90+
Ok(())
91+
}

0 commit comments

Comments
 (0)