From 661f0e079aab716ad0396e6234cd5de1530cc3e7 Mon Sep 17 00:00:00 2001 From: Greg Holland <30577851+horned-sphere@users.noreply.github.com> Date: Tue, 26 Nov 2024 11:19:15 +0000 Subject: [PATCH 01/10] Added deserializer container to AgentSpec trait. --- server/swimos_agent/src/agent_model/mod.rs | 32 +++++++++++---- .../tests/external_links/empty_agent.rs | 24 ++++++++--- .../src/agent_model/tests/fake_agent.rs | 24 ++++++++--- .../src/lane_model_derive/mod.rs | 21 +++++++--- server/swimos_connector/src/generic/mod.rs | 40 ++++++++++++++++--- server/swimos_connector/src/generic/tests.rs | 5 ++- swimos/tests/deriveagentlanemodel.rs | 17 ++++++-- 7 files changed, 131 insertions(+), 32 deletions(-) diff --git a/server/swimos_agent/src/agent_model/mod.rs b/server/swimos_agent/src/agent_model/mod.rs index b00b2e316..ece7b2b06 100644 --- a/server/swimos_agent/src/agent_model/mod.rs +++ b/server/swimos_agent/src/agent_model/mod.rs @@ -204,10 +204,14 @@ pub trait AgentDescription { /// although it will not provided any lifecycle events for the agent or its lanes. pub trait AgentSpec: AgentDescription + Sized + Send { /// The type of handler to run when a command is received for a value lane. - type ValCommandHandler: HandlerAction + Send + 'static; + type ValCommandHandler<'a>: HandlerAction + Send + 'static + where + Self: 'a; /// The type of handler to run when a command is received for a map lane. - type MapCommandHandler: HandlerAction + Send + 'static; + type MapCommandHandler<'a>: HandlerAction + Send + 'static + where + Self: 'a; /// The type of handler to run when a request is received to sync with a lane. type OnSyncHandler: HandlerAction + Send + 'static; @@ -215,6 +219,10 @@ pub trait AgentSpec: AgentDescription + Sized + Send { /// The type of the handler to run when an HTTP request is received for a lane. type HttpRequestHandler: HandlerAction + Send + 'static; + type Deserializers: Send + 'static; + + fn initializer_deserializers(&self) -> Self::Deserializers; + /// The names and flags of all items (lanes and stores) in the agent. fn item_specs() -> HashMap<&'static str, ItemSpec>; @@ -225,7 +233,12 @@ pub trait AgentSpec: AgentDescription + Sized + Send { /// # Arguments /// * `lane` - The name of the lane. /// * `body` - The content of the command. - fn on_value_command(&self, lane: &str, body: BytesMut) -> Option; + fn on_value_command<'a>( + &self, + deserializers: &'a mut Self::Deserializers, + lane: &str, + body: BytesMut, + ) -> Option>; /// Create an initializer that will consume the state of a value-like item, as reported by the runtime. /// @@ -249,11 +262,12 @@ pub trait AgentSpec: AgentDescription + Sized + Send { /// # Arguments /// * `lane` - The name of the lane. /// * `body` - The content of the command. - fn on_map_command( + fn on_map_command<'a>( &self, + deserializers: &'a mut Self::Deserializers, lane: &str, body: MapMessage, - ) -> Option; + ) -> Option>; /// Create a handler that will update the state of an agent when a request is made to /// sync with a lane. There will be no handler if the lane does not exist. @@ -1217,6 +1231,7 @@ where let add_commander = |address: Address| cmd_ids.borrow_mut().get_request(&address); let add_link = (add_downlink, add_commander); let add_lane = NoDynLanes; + let mut deserializers = item_model.initializer_deserializers(); // Calling run_handler is very verbose so is pulled out into this macro to make the code easier to read. macro_rules! exec_handler { @@ -1399,7 +1414,8 @@ where match request { LaneRequest::Command(body) => { trace!(name = %name, "Received a command for a value-like lane."); - if let Some(handler) = item_model.on_value_command(name.as_str(), body) + if let Some(handler) = + item_model.on_value_command(&mut deserializers, name.as_str(), body) { let result = run_handler( &mut ActionContext::new( @@ -1453,7 +1469,9 @@ where match request { LaneRequest::Command(body) => { trace!(name = %name, "Received a command for a map-like lane."); - if let Some(handler) = item_model.on_map_command(name.as_str(), body) { + if let Some(handler) = + item_model.on_map_command(&mut deserializers, name.as_str(), body) + { let result = run_handler( &mut ActionContext::new( &suspended, diff --git a/server/swimos_agent/src/agent_model/tests/external_links/empty_agent.rs b/server/swimos_agent/src/agent_model/tests/external_links/empty_agent.rs index b94899c2a..427f5cfdd 100644 --- a/server/swimos_agent/src/agent_model/tests/external_links/empty_agent.rs +++ b/server/swimos_agent/src/agent_model/tests/external_links/empty_agent.rs @@ -37,19 +37,32 @@ impl AgentDescription for EmptyAgent { } impl AgentSpec for EmptyAgent { - type ValCommandHandler = UnitHandler; + type ValCommandHandler<'a> = UnitHandler + where + Self: 'a; - type MapCommandHandler = UnitHandler; + type MapCommandHandler<'a> = UnitHandler + where + Self: 'a; type OnSyncHandler = UnitHandler; type HttpRequestHandler = UnitHandler; + type Deserializers = (); + + fn initializer_deserializers(&self) -> Self::Deserializers {} + fn item_specs() -> HashMap<&'static str, ItemSpec> { HashMap::new() } - fn on_value_command(&self, _lane: &str, _body: BytesMut) -> Option { + fn on_value_command( + &self, + _: &mut (), + _lane: &str, + _body: BytesMut, + ) -> Option> { None } @@ -67,11 +80,12 @@ impl AgentSpec for EmptyAgent { None } - fn on_map_command( + fn on_map_command<'a>( &self, + _: &'a mut (), _lane: &str, _body: MapMessage, - ) -> Option { + ) -> Option> { None } diff --git a/server/swimos_agent/src/agent_model/tests/fake_agent.rs b/server/swimos_agent/src/agent_model/tests/fake_agent.rs index bb9b95107..1d5eb2cd1 100644 --- a/server/swimos_agent/src/agent_model/tests/fake_agent.rs +++ b/server/swimos_agent/src/agent_model/tests/fake_agent.rs @@ -168,9 +168,13 @@ impl AgentDescription for TestAgent { } impl AgentSpec for TestAgent { - type ValCommandHandler = TestHandler; + type ValCommandHandler<'a> = TestHandler + where + Self: 'a; - type MapCommandHandler = TestHandler; + type MapCommandHandler<'a> = TestHandler + where + Self: 'a; type OnSyncHandler = TestHandler; @@ -219,7 +223,12 @@ impl AgentSpec for TestAgent { lanes } - fn on_value_command(&self, lane: &str, body: BytesMut) -> Option { + fn on_value_command<'a>( + &self, + _: &'a mut (), + lane: &str, + body: BytesMut, + ) -> Option> { match lane { VAL_LANE => Some( TestEvent::Value { @@ -264,11 +273,12 @@ impl AgentSpec for TestAgent { None } - fn on_map_command( + fn on_map_command<'a>( &self, + _: &'a mut (), lane: &str, body: MapMessage, - ) -> Option { + ) -> Option> { match lane { MAP_LANE => Some( TestEvent::Map { @@ -426,6 +436,10 @@ impl AgentSpec for TestAgent { Err(DynamicRegistrationError::DuplicateName(name.to_string())) } } + + type Deserializers = (); + + fn initializer_deserializers(&self) -> Self::Deserializers {} } impl HandlerAction for TestHandler { diff --git a/server/swimos_agent_derive/src/lane_model_derive/mod.rs b/server/swimos_agent_derive/src/lane_model_derive/mod.rs index d29ef8412..e27c764dc 100644 --- a/server/swimos_agent_derive/src/lane_model_derive/mod.rs +++ b/server/swimos_agent_derive/src/lane_model_derive/mod.rs @@ -214,32 +214,43 @@ impl<'a> ToTokens for DeriveAgentLaneModel<'a> { #[automatically_derived] impl #root::agent_model::AgentSpec for #agent_type { - type ValCommandHandler = #value_handler; + type ValCommandHandler<'a> = #value_handler + where + Self: 'a; - type MapCommandHandler = #map_handler; + type MapCommandHandler<'a> = #map_handler + where + Self: 'a; type OnSyncHandler = #sync_handler; type HttpRequestHandler = #http_handler; + type Deserializers = (); + + fn initializer_deserializers(&self) -> Self::Deserializers { + + } + fn item_specs() -> ::std::collections::HashMap<&'static str, #root::agent_model::ItemSpec> { let mut lanes = ::std::collections::HashMap::new(); #(#item_specs;)* lanes } - fn on_value_command(&self, lane: &str, body: #root::reexport::bytes::BytesMut) -> ::core::option::Option { + fn on_value_command<'a>(&self, deserializers: &'a mut Self::Deserializers, lane: &str, body: #root::reexport::bytes::BytesMut) -> ::core::option::Option> { match lane { #(#value_match_blocks,)* _ => ::core::option::Option::None, } } - fn on_map_command( + fn on_map_command<'a>( &self, + deserializers: &'a mut Self::Deserializers, lane: &str, body: #root::model::MapMessage<#root::reexport::bytes::BytesMut, #root::reexport::bytes::BytesMut>, - ) -> ::core::option::Option { + ) -> ::core::option::Option> { match lane { #(#map_match_blocks,)* _ => ::core::option::Option::None, diff --git a/server/swimos_connector/src/generic/mod.rs b/server/swimos_connector/src/generic/mod.rs index 685388cd5..c90de5de1 100644 --- a/server/swimos_connector/src/generic/mod.rs +++ b/server/swimos_connector/src/generic/mod.rs @@ -43,7 +43,9 @@ use swimos_api::{ agent::{HttpLaneRequest, WarpLaneKind}, error::DynamicRegistrationError, }; +use swimos_form::read::RecognizerReadable; use swimos_model::Value; +use swimos_recon::parser::RecognizerDecoder; use tracing::{error, info}; type GenericValueLane = ValueLane; @@ -174,20 +176,47 @@ impl AgentDescription for ConnectorAgent { } } +pub struct ConnectorAgentDeserializers { + value: RecognizerDecoder<::Rec>, +} + +impl Default for ConnectorAgentDeserializers { + fn default() -> Self { + Self { + value: RecognizerDecoder::new(Value::make_recognizer()), + } + } +} + impl AgentSpec for ConnectorAgent { - type ValCommandHandler = ValueHandler; + type ValCommandHandler<'a> = ValueHandler + where + Self: 'a; - type MapCommandHandler = MapHandler; + type MapCommandHandler<'a> = MapHandler + where + Self: 'a; type OnSyncHandler = Coprod!(ValueSync, MapSync); type HttpRequestHandler = UnitHandler; + type Deserializers = ConnectorAgentDeserializers; + + fn initializer_deserializers(&self) -> Self::Deserializers { + ConnectorAgentDeserializers::default() + } + fn item_specs() -> HashMap<&'static str, ItemSpec> { HashMap::new() } - fn on_value_command(&self, lane: &str, body: BytesMut) -> Option { + fn on_value_command<'a>( + &self, + deserializers: &'a mut ConnectorAgentDeserializers, + lane: &str, + body: BytesMut, + ) -> Option> { if self.value_lanes.borrow().contains_key(lane) { Some(decode_and_select_set( body, @@ -212,11 +241,12 @@ impl AgentSpec for ConnectorAgent { None } - fn on_map_command( + fn on_map_command<'a>( &self, + deserializers: &'a mut ConnectorAgentDeserializers, lane: &str, body: MapMessage, - ) -> Option { + ) -> Option> { if self.map_lanes.borrow().contains_key(lane) { Some(decode_and_select_apply( body, diff --git a/server/swimos_connector/src/generic/tests.rs b/server/swimos_connector/src/generic/tests.rs index eeef65dc8..c413ed4a5 100644 --- a/server/swimos_connector/src/generic/tests.rs +++ b/server/swimos_connector/src/generic/tests.rs @@ -285,9 +285,10 @@ fn with_map_lane(agent: &ConnectorAgent, f: impl FnOnce(&GenericMapLane)) { #[test] fn value_lane_command() { let agent = ConnectorAgent::default(); + let mut deserializers = agent.initializer_deserializers(); let (val_id, _) = init(&agent); let handler = agent - .on_value_command("value_lane", to_buffer(Value::from(45))) + .on_value_command(&mut deserializers, "value_lane", to_buffer(Value::from(45))) .expect("No handler."); let ids = run_handler(&agent, handler); assert_eq!(ids, [val_id].into_iter().collect()); @@ -331,9 +332,11 @@ fn value_lane_sync() { #[test] fn map_lane_command() { let agent = ConnectorAgent::default(); + let mut deserializers = agent.initializer_deserializers(); let (_, map_id) = init(&agent); let handler = agent .on_map_command( + &mut deserializers, "map_lane", MapMessage::Update { key: to_buffer(Value::text("a")), diff --git a/swimos/tests/deriveagentlanemodel.rs b/swimos/tests/deriveagentlanemodel.rs index ce72d55a4..b38f7c759 100644 --- a/swimos/tests/deriveagentlanemodel.rs +++ b/swimos/tests/deriveagentlanemodel.rs @@ -121,6 +121,7 @@ where { let agent = A::default(); let expected = specs.into_iter().collect::>(); + let mut deserializers = agent.initializer_deserializers(); assert_eq!(A::item_specs(), expected); @@ -130,17 +131,25 @@ where match descriptor { ItemDescriptor::WarpLane { kind, .. } => { if kind.map_like() { - assert!(agent.on_map_command(name, MapMessage::Clear).is_some()); + assert!(agent + .on_map_command(&mut deserializers, name, MapMessage::Clear) + .is_some()); assert!(agent.on_sync(name, SYNC_ID).is_some()); } else { - assert!(agent.on_value_command(name, get_i32_buffer(4)).is_some()); + assert!(agent + .on_value_command(&mut deserializers, name, get_i32_buffer(4)) + .is_some()); assert!(agent.on_sync(name, SYNC_ID).is_some()); } } ItemDescriptor::Store { .. } => { - assert!(agent.on_map_command(name, MapMessage::Clear).is_none()); + assert!(agent + .on_map_command(&mut deserializers, name, MapMessage::Clear) + .is_none()); assert!(agent.on_sync(name, SYNC_ID).is_none()); - assert!(agent.on_value_command(name, get_i32_buffer(4)).is_none()); + assert!(agent + .on_value_command(&mut deserializers, name, get_i32_buffer(4)) + .is_none()); assert!(agent.on_sync(name, SYNC_ID).is_none()); } ItemDescriptor::Http => { From b2b369e0888e921e4a07db3cabf280dfb56f69f0 Mon Sep 17 00:00:00 2001 From: Greg Holland <30577851+horned-sphere@users.noreply.github.com> Date: Tue, 26 Nov 2024 15:25:09 +0000 Subject: [PATCH 02/10] Added borrowing decode handlers. --- server/swimos_agent/src/event_handler/mod.rs | 37 ++++++++++ server/swimos_agent/src/lanes/map/mod.rs | 76 +++++++++++++++++++- 2 files changed, 111 insertions(+), 2 deletions(-) diff --git a/server/swimos_agent/src/event_handler/mod.rs b/server/swimos_agent/src/event_handler/mod.rs index 355443ed4..d5fe88c15 100644 --- a/server/swimos_agent/src/event_handler/mod.rs +++ b/server/swimos_agent/src/event_handler/mod.rs @@ -1340,6 +1340,43 @@ impl HandlerAction for Decode { } } +pub struct DecodeRef<'a, T: RecognizerReadable> { + decoder: Option<&'a mut RecognizerDecoder>, + buffer: &'a mut BytesMut, +} + +impl<'a, T: RecognizerReadable> DecodeRef<'a, T> { + pub fn new(decoder: &'a mut RecognizerDecoder, buffer: &'a mut BytesMut) -> Self { + DecodeRef { + decoder: Some(decoder), + buffer, + } + } +} + +impl<'a, T: RecognizerReadable, Context> HandlerAction for DecodeRef<'a, T> { + type Completion = T; + + fn step( + &mut self, + _action_context: &mut ActionContext, + _meta: AgentMetadata, + _context: &Context, + ) -> StepResult { + let DecodeRef { decoder, buffer } = self; + if let Some(decoder) = decoder.take() { + decoder.reset(); + match decoder.decode_eof(buffer) { + Ok(Some(value)) => StepResult::done(value), + Ok(_) => StepResult::Fail(EventHandlerError::IncompleteCommand), + Err(e) => StepResult::Fail(EventHandlerError::BadCommand(e)), + } + } else { + StepResult::after_done() + } + } +} + impl HandlerAction for Either where H1: HandlerAction, diff --git a/server/swimos_agent/src/lanes/map/mod.rs b/server/swimos_agent/src/lanes/map/mod.rs index 195653b27..fe5c9b329 100644 --- a/server/swimos_agent/src/lanes/map/mod.rs +++ b/server/swimos_agent/src/lanes/map/mod.rs @@ -630,9 +630,15 @@ impl DecodeMapMessage { } } -//TODO: The decoders should be shifted elsewhere so they don't need constantly recreating. -fn try_decode(mut buffer: BytesMut) -> Result { +fn try_decode(buffer: BytesMut) -> Result { let mut decoder = RecognizerDecoder::new(T::make_recognizer()); + try_with_decoder(&mut decoder, buffer) +} + +fn try_with_decoder( + decoder: &mut RecognizerDecoder, + mut buffer: BytesMut, +) -> Result { match decoder.decode_eof(&mut buffer) { Ok(Some(value)) => Ok(value), Ok(_) => Err(EventHandlerError::IncompleteCommand), @@ -705,6 +711,72 @@ where } } +pub struct DecodeMapMessageRef<'a, K: RecognizerReadable, V: RecognizerReadable> { + key_decoder: &'a mut RecognizerDecoder, + value_decoder: &'a mut RecognizerDecoder, + message: Option>, +} + +impl<'a, K: RecognizerReadable, V: RecognizerReadable> DecodeMapMessageRef<'a, K, V> { + pub fn new( + key_decoder: &'a mut RecognizerDecoder, + value_decoder: &'a mut RecognizerDecoder, + message: MapMessage, + ) -> Self { + DecodeMapMessageRef { + key_decoder, + value_decoder, + message: Some(message), + } + } +} + +impl<'a, K: RecognizerReadable, V: RecognizerReadable, Context> HandlerAction + for DecodeMapMessageRef<'a, K, V> +{ + type Completion = MapMessage; + + fn step( + &mut self, + _action_context: &mut ActionContext, + _meta: AgentMetadata, + _context: &Context, + ) -> StepResult { + let DecodeMapMessageRef { + key_decoder, + value_decoder, + message, + } = self; + if let Some(message) = message.take() { + match message { + MapMessage::Update { key, value } => { + key_decoder.reset(); + value_decoder.reset(); + match try_with_decoder::(key_decoder, key).and_then(|k| { + try_with_decoder::(value_decoder, value) + .map(|v| (MapMessage::Update { key: k, value: v })) + }) { + Ok(msg) => StepResult::done(msg), + Err(e) => StepResult::Fail(e), + } + } + MapMessage::Remove { key } => { + key_decoder.reset(); + match try_with_decoder::(key_decoder, key) { + Ok(k) => StepResult::done(MapMessage::Remove { key: k }), + Err(e) => StepResult::Fail(e), + } + } + MapMessage::Clear => StepResult::done(MapMessage::Clear), + MapMessage::Take(n) => StepResult::done(MapMessage::Take(n)), + MapMessage::Drop(n) => StepResult::done(MapMessage::Drop(n)), + } + } else { + StepResult::after_done() + } + } +} + pub type DecodeAndApply> = AndThen, MapLaneHandler, ProjTransform>>; From af6359cf3012377e4497776eb8aaa8c7755a7d93 Mon Sep 17 00:00:00 2001 From: Greg Holland <30577851+horned-sphere@users.noreply.github.com> Date: Wed, 27 Nov 2024 09:52:56 +0000 Subject: [PATCH 03/10] Adapted ConnectorAgent to reuse decoders. --- server/swimos_agent/src/agent_model/mod.rs | 4 +- server/swimos_agent/src/event_handler/mod.rs | 4 +- server/swimos_agent/src/lanes/map/mod.rs | 204 ++++++++++++++----- server/swimos_agent/src/lanes/value/mod.rs | 108 +++++++++- server/swimos_connector/src/generic/mod.rs | 38 ++-- 5 files changed, 285 insertions(+), 73 deletions(-) diff --git a/server/swimos_agent/src/agent_model/mod.rs b/server/swimos_agent/src/agent_model/mod.rs index ece7b2b06..6af03054d 100644 --- a/server/swimos_agent/src/agent_model/mod.rs +++ b/server/swimos_agent/src/agent_model/mod.rs @@ -204,12 +204,12 @@ pub trait AgentDescription { /// although it will not provided any lifecycle events for the agent or its lanes. pub trait AgentSpec: AgentDescription + Sized + Send { /// The type of handler to run when a command is received for a value lane. - type ValCommandHandler<'a>: HandlerAction + Send + 'static + type ValCommandHandler<'a>: HandlerAction + Send + 'a where Self: 'a; /// The type of handler to run when a command is received for a map lane. - type MapCommandHandler<'a>: HandlerAction + Send + 'static + type MapCommandHandler<'a>: HandlerAction + Send + 'a where Self: 'a; diff --git a/server/swimos_agent/src/event_handler/mod.rs b/server/swimos_agent/src/event_handler/mod.rs index d5fe88c15..fb28918b8 100644 --- a/server/swimos_agent/src/event_handler/mod.rs +++ b/server/swimos_agent/src/event_handler/mod.rs @@ -1342,11 +1342,11 @@ impl HandlerAction for Decode { pub struct DecodeRef<'a, T: RecognizerReadable> { decoder: Option<&'a mut RecognizerDecoder>, - buffer: &'a mut BytesMut, + buffer: BytesMut, } impl<'a, T: RecognizerReadable> DecodeRef<'a, T> { - pub fn new(decoder: &'a mut RecognizerDecoder, buffer: &'a mut BytesMut) -> Self { + pub fn new(decoder: &'a mut RecognizerDecoder, buffer: BytesMut) -> Self { DecodeRef { decoder: Some(decoder), buffer, diff --git a/server/swimos_agent/src/lanes/map/mod.rs b/server/swimos_agent/src/lanes/map/mod.rs index fe5c9b329..8bc8d7d94 100644 --- a/server/swimos_agent/src/lanes/map/mod.rs +++ b/server/swimos_agent/src/lanes/map/mod.rs @@ -777,6 +777,65 @@ impl<'a, K: RecognizerReadable, V: RecognizerReadable, Context> HandlerAction { + decoder: &'a mut RecognizerDecoder, + message: Option>, +} + +impl<'a, T: RecognizerReadable> DecodeMapMessageSharedRef<'a, T> { + pub fn new( + decoder: &'a mut RecognizerDecoder, + message: MapMessage, + ) -> Self { + DecodeMapMessageSharedRef { + decoder, + message: Some(message), + } + } +} + +impl<'a, T: RecognizerReadable, Context> HandlerAction + for DecodeMapMessageSharedRef<'a, T> +{ + type Completion = MapMessage; + + fn step( + &mut self, + _action_context: &mut ActionContext, + _meta: AgentMetadata, + _context: &Context, + ) -> StepResult { + let DecodeMapMessageSharedRef { decoder, message } = self; + if let Some(message) = message.take() { + match message { + MapMessage::Update { key, value } => { + decoder.reset(); + match try_with_decoder::(decoder, key).and_then(|k| { + decoder.reset(); + try_with_decoder::(decoder, value) + .map(|v| (MapMessage::Update { key: k, value: v })) + }) { + Ok(msg) => StepResult::done(msg), + Err(e) => StepResult::Fail(e), + } + } + MapMessage::Remove { key } => { + decoder.reset(); + match try_with_decoder::(decoder, key) { + Ok(k) => StepResult::done(MapMessage::Remove { key: k }), + Err(e) => StepResult::Fail(e), + } + } + MapMessage::Clear => StepResult::done(MapMessage::Clear), + MapMessage::Take(n) => StepResult::done(MapMessage::Take(n)), + MapMessage::Drop(n) => StepResult::done(MapMessage::Drop(n)), + } + } else { + StepResult::after_done() + } + } +} + pub type DecodeAndApply> = AndThen, MapLaneHandler, ProjTransform>>; @@ -1213,10 +1272,65 @@ where } } +pub type DecodeAndSelectApply = + DecodeWithAndSelectApply, C, K, V, F>; +pub type DecodeRefAndSelectApply<'a, C, K, V, F> = + DecodeWithAndSelectApply, C, K, V, F>; +pub type DecodeSharedRefAndSelectApply<'a, C, T, F> = + DecodeWithAndSelectApply, C, T, T, F>; + +/// Create an event handler that will decode an incoming map message and apply the value into a map lane. +pub fn decode_and_select_apply( + message: MapMessage, + projection: F, +) -> DecodeAndSelectApply +where + K: Clone + Eq + Hash + RecognizerReadable, + V: RecognizerReadable, + F: SelectorFn>, + M: MapOps, +{ + let decode: DecodeMapMessage = DecodeMapMessage::new(message); + DecodeAndSelectApply::Decoding(decode, projection) +} + +/// Create an event handler that will decode an incoming map message and apply the value into a map lane. +pub fn decode_ref_and_select_apply<'a, C, K, V, M, F>( + key_decoder: &'a mut RecognizerDecoder, + value_decoder: &'a mut RecognizerDecoder, + message: MapMessage, + projection: F, +) -> DecodeRefAndSelectApply<'a, C, K, V, F> +where + K: Clone + Eq + Hash + RecognizerReadable, + V: RecognizerReadable, + F: SelectorFn>, + M: MapOps, +{ + let decode: DecodeMapMessageRef = + DecodeMapMessageRef::new(key_decoder, value_decoder, message); + DecodeRefAndSelectApply::Decoding(decode, projection) +} + +/// Create an event handler that will decode an incoming map message and apply the value into a map lane. +pub fn decode_shared_ref_and_select_apply( + decoder: &mut RecognizerDecoder, + message: MapMessage, + projection: F, +) -> DecodeSharedRefAndSelectApply<'_, C, T, F> +where + T: Clone + Eq + Hash + RecognizerReadable, + F: SelectorFn>, + M: MapOps, +{ + let decode: DecodeMapMessageSharedRef = DecodeMapMessageSharedRef::new(decoder, message); + DecodeSharedRefAndSelectApply::Decoding(decode, projection) +} + #[derive(Default)] #[doc(hidden)] -pub enum DecodeAndSelectApply { - Decoding(DecodeMapMessage, F), +pub enum DecodeWithAndSelectApply { + Decoding(H, F), Updating(MapLaneSelectUpdate), Removing(MapLaneSelectRemove), Clearing(MapLaneSelectClear), @@ -1225,8 +1339,9 @@ pub enum DecodeAndSelectApply { Done, } -impl HandlerAction for DecodeAndSelectApply +impl HandlerAction for DecodeWithAndSelectApply where + H: HandlerAction>, C: AgentDescription, K: Form + Clone + Eq + Hash, V: RecognizerReadable, @@ -1242,10 +1357,10 @@ where context: &C, ) -> StepResult { match std::mem::take(self) { - DecodeAndSelectApply::Decoding(mut decoding, selector) => { + DecodeWithAndSelectApply::Decoding(mut decoding, selector) => { match decoding.step(action_context, meta, context) { StepResult::Continue { modified_item } => { - *self = DecodeAndSelectApply::Decoding(decoding, selector); + *self = DecodeWithAndSelectApply::Decoding(decoding, selector); StepResult::Continue { modified_item } } StepResult::Fail(err) => StepResult::Fail(err), @@ -1255,27 +1370,27 @@ where } => { match result { MapMessage::Update { key, value } => { - *self = DecodeAndSelectApply::Updating(MapLaneSelectUpdate::new( - selector, key, value, - )); + *self = DecodeWithAndSelectApply::Updating( + MapLaneSelectUpdate::new(selector, key, value), + ); } MapMessage::Remove { key } => { - *self = DecodeAndSelectApply::Removing(MapLaneSelectRemove::new( - selector, key, - )); + *self = DecodeWithAndSelectApply::Removing( + MapLaneSelectRemove::new(selector, key), + ); } MapMessage::Clear => { - *self = DecodeAndSelectApply::Clearing(MapLaneSelectClear::new( - selector, - )); + *self = DecodeWithAndSelectApply::Clearing( + MapLaneSelectClear::new(selector), + ); } MapMessage::Drop(n) => { - *self = DecodeAndSelectApply::DroppingOrTaking( + *self = DecodeWithAndSelectApply::DroppingOrTaking( MapLaneSelectDropOrTake::new(selector, DropOrTake::Drop, n), ); } MapMessage::Take(n) => { - *self = DecodeAndSelectApply::DroppingOrTaking( + *self = DecodeWithAndSelectApply::DroppingOrTaking( MapLaneSelectDropOrTake::new(selector, DropOrTake::Take, n), ); } @@ -1284,89 +1399,74 @@ where } } } - DecodeAndSelectApply::Updating(mut selector) => { + DecodeWithAndSelectApply::Updating(mut selector) => { let result = selector.step(action_context, meta, context); if result.is_cont() { - *self = DecodeAndSelectApply::Updating(selector); + *self = DecodeWithAndSelectApply::Updating(selector); } result } - DecodeAndSelectApply::Removing(mut selector) => { + DecodeWithAndSelectApply::Removing(mut selector) => { let result = selector.step(action_context, meta, context); if result.is_cont() { - *self = DecodeAndSelectApply::Removing(selector); + *self = DecodeWithAndSelectApply::Removing(selector); } result } - DecodeAndSelectApply::Clearing(mut selector) => { + DecodeWithAndSelectApply::Clearing(mut selector) => { let result = selector.step(action_context, meta, context); if result.is_cont() { - *self = DecodeAndSelectApply::Clearing(selector); + *self = DecodeWithAndSelectApply::Clearing(selector); } result } - DecodeAndSelectApply::DroppingOrTaking(mut selector) => { + DecodeWithAndSelectApply::DroppingOrTaking(mut selector) => { let result = selector.step(action_context, meta, context); if result.is_cont() { - *self = DecodeAndSelectApply::DroppingOrTaking(selector); + *self = DecodeWithAndSelectApply::DroppingOrTaking(selector); } result } - DecodeAndSelectApply::Done => StepResult::after_done(), + DecodeWithAndSelectApply::Done => StepResult::after_done(), } } fn describe(&self, context: &C, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { match self { - DecodeAndSelectApply::Decoding(decode_map_message, proj) => f - .debug_struct("DecodeAndSelectApply") + DecodeWithAndSelectApply::Decoding(decode_map_message, proj) => f + .debug_struct("DecodeWithAndSelectApply") .field("state", &"Decoding") .field("decoder", &Described::new(context, decode_map_message)) .field("lane_name", &proj.name()) .finish(), - DecodeAndSelectApply::Updating(selector) => f - .debug_struct("DecodeAndSelectApply") + DecodeWithAndSelectApply::Updating(selector) => f + .debug_struct("DecodeWithAndSelectApply") .field("state", &"Updating") .field("selector", &Described::new(context, selector)) .finish(), - DecodeAndSelectApply::Removing(selector) => f - .debug_struct("DecodeAndSelectApply") + DecodeWithAndSelectApply::Removing(selector) => f + .debug_struct("DecodeWithAndSelectApply") .field("state", &"Removing") .field("selector", &Described::new(context, selector)) .finish(), - DecodeAndSelectApply::Clearing(selector) => f - .debug_struct("DecodeAndSelectApply") + DecodeWithAndSelectApply::Clearing(selector) => f + .debug_struct("DecodeWithAndSelectApply") .field("state", &"Clearing") .field("selector", &Described::new(context, selector)) .finish(), - DecodeAndSelectApply::DroppingOrTaking(selector) => f - .debug_struct("DecodeAndSelectApply") + DecodeWithAndSelectApply::DroppingOrTaking(selector) => f + .debug_struct("DecodeWithAndSelectApply") .field("state", &"DroppingOrTaking") .field("selector", &Described::new(context, selector)) .finish(), - DecodeAndSelectApply::Done => f - .debug_tuple("DecodeAndSelectSet") + DecodeWithAndSelectApply::Done => f + .debug_tuple("DecodeWithAndSelectApply") .field(&"<>") .finish(), } } } -/// Create an event handler that will decode an incoming map message and apply the value into a map lane. -pub fn decode_and_select_apply( - message: MapMessage, - projection: F, -) -> DecodeAndSelectApply -where - K: Clone + Eq + Hash + RecognizerReadable, - V: RecognizerReadable, - F: SelectorFn>, - M: MapOps, -{ - let decode: DecodeMapMessage = DecodeMapMessage::new(message); - DecodeAndSelectApply::Decoding(decode, projection) -} - type SelectType = fn(&C) -> (&K, &V); /// An [event handler](crate::event_handler::EventHandler) that will request a sync from a map lane diff --git a/server/swimos_agent/src/lanes/value/mod.rs b/server/swimos_agent/src/lanes/value/mod.rs index 26c8aae74..e936260f3 100644 --- a/server/swimos_agent/src/lanes/value/mod.rs +++ b/server/swimos_agent/src/lanes/value/mod.rs @@ -30,13 +30,14 @@ use bytes::BytesMut; use static_assertions::assert_impl_all; use swimos_agent_protocol::{encoding::lane::ValueLaneResponseEncoder, LaneResponse}; use swimos_form::{read::RecognizerReadable, write::StructuralWritable}; +use swimos_recon::parser::RecognizerDecoder; use tokio_util::codec::Encoder; use uuid::Uuid; use crate::{ agent_model::{AgentDescription, WriteResult}, event_handler::{ - ActionContext, AndThen, Decode, Described, EventHandlerError, HandlerAction, + ActionContext, AndThen, Decode, DecodeRef, Described, EventHandlerError, HandlerAction, HandlerActionExt, HandlerTrans, Modification, StepResult, }, item::{AgentItem, MutableValueLikeItem, ValueItem, ValueLikeItem}, @@ -390,6 +391,9 @@ impl HandlerTrans for ProjTransform> { pub type DecodeAndSet = AndThen, ValueLaneSet, ProjTransform>>; +pub type DecodeRefAndSet<'a, C, T> = + AndThen, ValueLaneSet, ProjTransform>>; + /// Create an event handler that will decode an incoming command and set the value into a value lane. pub fn decode_and_set( buffer: BytesMut, @@ -399,6 +403,16 @@ pub fn decode_and_set( decode.and_then(ProjTransform::new(projection)) } +/// Create an event handler that will decode an incoming command and set the value into a value lane. +pub fn decode_ref_and_set<'a, C: AgentDescription, T: RecognizerReadable>( + decoder: &'a mut RecognizerDecoder, + buffer: BytesMut, + projection: fn(&C) -> &ValueLane, +) -> DecodeRefAndSet<'a, C, T> { + let decode: DecodeRef<'a, T> = DecodeRef::new(decoder, buffer); + decode.and_then(ProjTransform::new(projection)) +} + impl ValueLikeItem for ValueLane where T: Clone + Send + 'static, @@ -611,6 +625,84 @@ where } } +#[derive(Default)] +#[doc(hidden)] +pub enum DecodeRefAndSelectSet<'a, C, T: RecognizerReadable, F> { + Decoding(DecodeRef<'a, T>, F), + Selecting(ValueLaneSelectSet), + #[default] + Done, +} + +impl<'a, C, T, F> HandlerAction for DecodeRefAndSelectSet<'a, C, T, F> +where + C: AgentDescription, + T: RecognizerReadable, + F: SelectorFn>, +{ + type Completion = (); + + fn step( + &mut self, + action_context: &mut ActionContext, + meta: AgentMetadata, + context: &C, + ) -> StepResult { + match std::mem::take(self) { + DecodeRefAndSelectSet::Decoding(mut decoding, selector) => { + match decoding.step(action_context, meta, context) { + StepResult::Continue { modified_item } => { + *self = DecodeRefAndSelectSet::Decoding(decoding, selector); + StepResult::Continue { modified_item } + } + StepResult::Fail(err) => StepResult::Fail(err), + StepResult::Complete { + modified_item, + result, + } => { + *self = DecodeRefAndSelectSet::Selecting(ValueLaneSelectSet::new( + selector, result, + )); + StepResult::Continue { modified_item } + } + } + } + DecodeRefAndSelectSet::Selecting(mut selector) => { + let result = selector.step(action_context, meta, context); + if !result.is_cont() { + *self = DecodeRefAndSelectSet::Done; + } + result + } + DecodeRefAndSelectSet::Done => StepResult::after_done(), + } + } + + fn describe( + &self, + context: &C, + f: &mut std::fmt::Formatter<'_>, + ) -> Result<(), std::fmt::Error> { + match self { + DecodeRefAndSelectSet::Decoding(decode, proj) => f + .debug_struct("DecodeRefAndSelectSet") + .field("state", &"Decoding") + .field("decoder", &Described::new(context, decode)) + .field("lane_name", &proj.name()) + .finish(), + DecodeRefAndSelectSet::Selecting(selector) => f + .debug_struct("DecodeRefAndSelectSet") + .field("state", &"Selecting") + .field("selector", &Described::new(context, selector)) + .finish(), + DecodeRefAndSelectSet::Done => f + .debug_tuple("DecodeRefAndSelectSet") + .field(&"<>") + .finish(), + } + } +} + /// Create an event handler that will decode an incoming command and set the value into a value lane. pub fn decode_and_select_set( buffer: BytesMut, @@ -624,6 +716,20 @@ where DecodeAndSelectSet::Decoding(decode, projection) } +/// Create an event handler that will decode an incoming command and set the value into a value lane. +pub fn decode_ref_and_select_set( + decoder: &mut RecognizerDecoder, + buffer: BytesMut, + projection: F, +) -> DecodeRefAndSelectSet<'_, C, T, F> +where + T: RecognizerReadable, + F: SelectorFn>, +{ + let decode: DecodeRef = DecodeRef::new(decoder, buffer); + DecodeRefAndSelectSet::Decoding(decode, projection) +} + /// An [event handler](crate::event_handler::EventHandler)`] that will request a sync from the lane. pub struct ValueLaneSelectSync { _type: PhantomData &T>, diff --git a/server/swimos_connector/src/generic/mod.rs b/server/swimos_connector/src/generic/mod.rs index c90de5de1..2967bb0da 100644 --- a/server/swimos_connector/src/generic/mod.rs +++ b/server/swimos_connector/src/generic/mod.rs @@ -32,8 +32,10 @@ use swimos_agent::{ }, event_handler::{ActionContext, HandlerAction, StepResult, UnitHandler}, lanes::{ - map::{decode_and_select_apply, DecodeAndSelectApply, MapLaneSelectSync}, - value::{decode_and_select_set, DecodeAndSelectSet, ValueLaneSelectSync}, + map::{ + decode_shared_ref_and_select_apply, DecodeSharedRefAndSelectApply, MapLaneSelectSync, + }, + value::{decode_ref_and_select_set, DecodeRefAndSelectSet, ValueLaneSelectSync}, LaneItem, MapLane, Selector, SelectorFn, ValueLane, }, AgentItem, AgentMetadata, @@ -64,8 +66,8 @@ pub struct ConnectorAgent { flags: Cell, } -type ValueHandler = DecodeAndSelectSet; -type MapHandler = DecodeAndSelectApply; +type ValueHandler<'a> = DecodeRefAndSelectSet<'a, ConnectorAgent, Value, ValueLaneSelectorFn>; +type MapHandler<'a> = DecodeSharedRefAndSelectApply<'a, ConnectorAgent, Value, MapLaneSelectorFn>; type ValueSync = ValueLaneSelectSync; type MapSync = MapLaneSelectSync; @@ -176,24 +178,24 @@ impl AgentDescription for ConnectorAgent { } } -pub struct ConnectorAgentDeserializers { - value: RecognizerDecoder<::Rec>, +pub struct GenericDeserializer { + value_deser: RecognizerDecoder<::Rec>, } -impl Default for ConnectorAgentDeserializers { +impl Default for GenericDeserializer { fn default() -> Self { Self { - value: RecognizerDecoder::new(Value::make_recognizer()), + value_deser: RecognizerDecoder::new(Value::make_recognizer()), } } } impl AgentSpec for ConnectorAgent { - type ValCommandHandler<'a> = ValueHandler + type ValCommandHandler<'a> = ValueHandler<'a> where Self: 'a; - type MapCommandHandler<'a> = MapHandler + type MapCommandHandler<'a> = MapHandler<'a> where Self: 'a; @@ -201,10 +203,10 @@ impl AgentSpec for ConnectorAgent { type HttpRequestHandler = UnitHandler; - type Deserializers = ConnectorAgentDeserializers; + type Deserializers = GenericDeserializer; fn initializer_deserializers(&self) -> Self::Deserializers { - ConnectorAgentDeserializers::default() + GenericDeserializer::default() } fn item_specs() -> HashMap<&'static str, ItemSpec> { @@ -213,12 +215,14 @@ impl AgentSpec for ConnectorAgent { fn on_value_command<'a>( &self, - deserializers: &'a mut ConnectorAgentDeserializers, + deserializers: &'a mut GenericDeserializer, lane: &str, body: BytesMut, ) -> Option> { + let GenericDeserializer { value_deser } = deserializers; if self.value_lanes.borrow().contains_key(lane) { - Some(decode_and_select_set( + Some(decode_ref_and_select_set( + value_deser, body, ValueLaneSelectorFn::new(lane.to_string()), )) @@ -243,12 +247,14 @@ impl AgentSpec for ConnectorAgent { fn on_map_command<'a>( &self, - deserializers: &'a mut ConnectorAgentDeserializers, + deserializers: &'a mut GenericDeserializer, lane: &str, body: MapMessage, ) -> Option> { + let GenericDeserializer { value_deser } = deserializers; if self.map_lanes.borrow().contains_key(lane) { - Some(decode_and_select_apply( + Some(decode_shared_ref_and_select_apply( + value_deser, body, MapLaneSelectorFn::new(lane.to_string()), )) From 48726a10fa15ab2db2bbc9291aafa232109041a9 Mon Sep 17 00:00:00 2001 From: Greg Holland <30577851+horned-sphere@users.noreply.github.com> Date: Wed, 27 Nov 2024 11:30:02 +0000 Subject: [PATCH 04/10] Added deserializers to agent derive macro. --- .../src/recon_parser/async_parser/mod.rs | 1 + server/swimos_agent/src/event_handler/mod.rs | 5 +- server/swimos_agent/src/lanes/map/mod.rs | 27 +++++------ server/swimos_agent/src/lanes/value/mod.rs | 6 +-- server/swimos_agent/src/lib.rs | 46 +++++++++++++++++++ .../src/lane_model_derive/mod.rs | 43 ++++++++++++++++- server/swimos_connector/src/generic/mod.rs | 15 ++---- swimos/src/agent.rs | 1 + 8 files changed, 110 insertions(+), 34 deletions(-) diff --git a/api/formats/swimos_recon/src/recon_parser/async_parser/mod.rs b/api/formats/swimos_recon/src/recon_parser/async_parser/mod.rs index 015e708b9..aae92cf4d 100644 --- a/api/formats/swimos_recon/src/recon_parser/async_parser/mod.rs +++ b/api/formats/swimos_recon/src/recon_parser/async_parser/mod.rs @@ -289,6 +289,7 @@ impl LocationTracker { /// it completes. Not that this is cannot be used as a stand-alone decoder as it has no concept of /// a separator between frames. It needs to be incorporated into another decoder that can determine /// where one record ends and another begins. +#[derive(Debug)] pub struct RecognizerDecoder { parser: IncrementalReconParser, recognizer: R, diff --git a/server/swimos_agent/src/event_handler/mod.rs b/server/swimos_agent/src/event_handler/mod.rs index fb28918b8..290238991 100644 --- a/server/swimos_agent/src/event_handler/mod.rs +++ b/server/swimos_agent/src/event_handler/mod.rs @@ -45,6 +45,7 @@ use crate::{ }, lanes::JoinLaneKind, meta::AgentMetadata, + ReconDecoder, }; use bitflags::bitflags; @@ -1341,12 +1342,12 @@ impl HandlerAction for Decode { } pub struct DecodeRef<'a, T: RecognizerReadable> { - decoder: Option<&'a mut RecognizerDecoder>, + decoder: Option<&'a mut ReconDecoder>, buffer: BytesMut, } impl<'a, T: RecognizerReadable> DecodeRef<'a, T> { - pub fn new(decoder: &'a mut RecognizerDecoder, buffer: BytesMut) -> Self { + pub fn new(decoder: &'a mut ReconDecoder, buffer: BytesMut) -> Self { DecodeRef { decoder: Some(decoder), buffer, diff --git a/server/swimos_agent/src/lanes/map/mod.rs b/server/swimos_agent/src/lanes/map/mod.rs index 8bc8d7d94..6527c00a4 100644 --- a/server/swimos_agent/src/lanes/map/mod.rs +++ b/server/swimos_agent/src/lanes/map/mod.rs @@ -26,7 +26,6 @@ use std::{ }; use swimos_agent_protocol::{encoding::lane::MapLaneResponseEncoder, MapMessage}; use swimos_form::{read::RecognizerReadable, write::StructuralWritable, Form}; -use swimos_recon::parser::RecognizerDecoder; use tokio_util::codec::{Decoder, Encoder}; use uuid::Uuid; @@ -47,6 +46,7 @@ use crate::{ drop_or_take, DropOrTake, MapOps, MapOpsWithEntry, MapStoreInner, TransformEntryResult, }, meta::AgentMetadata, + ReconDecoder, }; use super::{queues::WriteQueues, Selector, SelectorFn}; @@ -631,12 +631,12 @@ impl DecodeMapMessage { } fn try_decode(buffer: BytesMut) -> Result { - let mut decoder = RecognizerDecoder::new(T::make_recognizer()); + let mut decoder = ReconDecoder::::default(); try_with_decoder(&mut decoder, buffer) } fn try_with_decoder( - decoder: &mut RecognizerDecoder, + decoder: &mut ReconDecoder, mut buffer: BytesMut, ) -> Result { match decoder.decode_eof(&mut buffer) { @@ -712,15 +712,15 @@ where } pub struct DecodeMapMessageRef<'a, K: RecognizerReadable, V: RecognizerReadable> { - key_decoder: &'a mut RecognizerDecoder, - value_decoder: &'a mut RecognizerDecoder, + key_decoder: &'a mut ReconDecoder, + value_decoder: &'a mut ReconDecoder, message: Option>, } impl<'a, K: RecognizerReadable, V: RecognizerReadable> DecodeMapMessageRef<'a, K, V> { pub fn new( - key_decoder: &'a mut RecognizerDecoder, - value_decoder: &'a mut RecognizerDecoder, + key_decoder: &'a mut ReconDecoder, + value_decoder: &'a mut ReconDecoder, message: MapMessage, ) -> Self { DecodeMapMessageRef { @@ -778,15 +778,12 @@ impl<'a, K: RecognizerReadable, V: RecognizerReadable, Context> HandlerAction { - decoder: &'a mut RecognizerDecoder, + decoder: &'a mut ReconDecoder, message: Option>, } impl<'a, T: RecognizerReadable> DecodeMapMessageSharedRef<'a, T> { - pub fn new( - decoder: &'a mut RecognizerDecoder, - message: MapMessage, - ) -> Self { + pub fn new(decoder: &'a mut ReconDecoder, message: MapMessage) -> Self { DecodeMapMessageSharedRef { decoder, message: Some(message), @@ -1296,8 +1293,8 @@ where /// Create an event handler that will decode an incoming map message and apply the value into a map lane. pub fn decode_ref_and_select_apply<'a, C, K, V, M, F>( - key_decoder: &'a mut RecognizerDecoder, - value_decoder: &'a mut RecognizerDecoder, + key_decoder: &'a mut ReconDecoder, + value_decoder: &'a mut ReconDecoder, message: MapMessage, projection: F, ) -> DecodeRefAndSelectApply<'a, C, K, V, F> @@ -1314,7 +1311,7 @@ where /// Create an event handler that will decode an incoming map message and apply the value into a map lane. pub fn decode_shared_ref_and_select_apply( - decoder: &mut RecognizerDecoder, + decoder: &mut ReconDecoder, message: MapMessage, projection: F, ) -> DecodeSharedRefAndSelectApply<'_, C, T, F> diff --git a/server/swimos_agent/src/lanes/value/mod.rs b/server/swimos_agent/src/lanes/value/mod.rs index e936260f3..eb166c024 100644 --- a/server/swimos_agent/src/lanes/value/mod.rs +++ b/server/swimos_agent/src/lanes/value/mod.rs @@ -30,7 +30,6 @@ use bytes::BytesMut; use static_assertions::assert_impl_all; use swimos_agent_protocol::{encoding::lane::ValueLaneResponseEncoder, LaneResponse}; use swimos_form::{read::RecognizerReadable, write::StructuralWritable}; -use swimos_recon::parser::RecognizerDecoder; use tokio_util::codec::Encoder; use uuid::Uuid; @@ -43,6 +42,7 @@ use crate::{ item::{AgentItem, MutableValueLikeItem, ValueItem, ValueLikeItem}, meta::AgentMetadata, stores::value::ValueStore, + ReconDecoder, }; use super::{LaneItem, ProjTransform, Selector, SelectorFn}; @@ -405,7 +405,7 @@ pub fn decode_and_set( /// Create an event handler that will decode an incoming command and set the value into a value lane. pub fn decode_ref_and_set<'a, C: AgentDescription, T: RecognizerReadable>( - decoder: &'a mut RecognizerDecoder, + decoder: &'a mut ReconDecoder, buffer: BytesMut, projection: fn(&C) -> &ValueLane, ) -> DecodeRefAndSet<'a, C, T> { @@ -718,7 +718,7 @@ where /// Create an event handler that will decode an incoming command and set the value into a value lane. pub fn decode_ref_and_select_set( - decoder: &mut RecognizerDecoder, + decoder: &mut ReconDecoder, buffer: BytesMut, projection: F, ) -> DecodeRefAndSelectSet<'_, C, T, F> diff --git a/server/swimos_agent/src/lib.rs b/server/swimos_agent/src/lib.rs index 181f5e252..43ee97bec 100644 --- a/server/swimos_agent/src/lib.rs +++ b/server/swimos_agent/src/lib.rs @@ -74,6 +74,8 @@ mod test_util; #[cfg(test)] mod tests; +use std::fmt::Debug; + #[cfg(test)] pub use agent_model::AgentSpec; @@ -84,6 +86,9 @@ pub use meta::AgentMetadata; #[doc(hidden)] pub use map_storage::MapBacking; +use swimos_form::read::RecognizerReadable; +use swimos_recon::parser::{AsyncParseError, RecognizerDecoder}; +use tokio_util::codec::Decoder; #[doc(hidden)] pub mod model { @@ -107,3 +112,44 @@ pub mod reexport { pub use uuid::Uuid; } } + +#[doc(hidden)] +pub struct ReconDecoder { + decoder: RecognizerDecoder, +} + +impl Default for ReconDecoder { + fn default() -> Self { + Self { + decoder: RecognizerDecoder::new(T::make_recognizer()), + } + } +} + +impl Debug for ReconDecoder +where + T: RecognizerReadable, + T::Rec: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ReconDecoder") + .field("decoder", &self.decoder) + .finish() + } +} + +impl Decoder for ReconDecoder { + type Item = T; + + type Error = AsyncParseError; + + fn decode(&mut self, src: &mut bytes::BytesMut) -> Result, Self::Error> { + self.decoder.decode(src) + } +} + +impl ReconDecoder { + pub fn reset(&mut self) { + self.decoder.reset(); + } +} diff --git a/server/swimos_agent_derive/src/lane_model_derive/mod.rs b/server/swimos_agent_derive/src/lane_model_derive/mod.rs index e27c764dc..03817fe53 100644 --- a/server/swimos_agent_derive/src/lane_model_derive/mod.rs +++ b/server/swimos_agent_derive/src/lane_model_derive/mod.rs @@ -140,6 +140,14 @@ impl<'a> ToTokens for DeriveAgentLaneModel<'a> { no_handler }; + let deser_types = warp_lane_models + .iter() + .map(|model| DecoderType(model.clone()).into_tokens(root)); + let deser_tup_type = quote!((#(#deser_types),*)); + let deser_inits = + (0..warp_lane_models.len()).map(|_| quote!(::core::default::Default::default())); + let deser_init_statement = quote!((#(#deser_inits),*)); + let item_specs = item_models .iter() .map(|model| LaneSpecInsert(model.ordinal, model.model.clone())) @@ -213,6 +221,7 @@ impl<'a> ToTokens for DeriveAgentLaneModel<'a> { } #[automatically_derived] + #[allow(clippy::unused_unit)] impl #root::agent_model::AgentSpec for #agent_type { type ValCommandHandler<'a> = #value_handler where @@ -226,10 +235,10 @@ impl<'a> ToTokens for DeriveAgentLaneModel<'a> { type HttpRequestHandler = #http_handler; - type Deserializers = (); + type Deserializers = #deser_tup_type; fn initializer_deserializers(&self) -> Self::Deserializers { - + #deser_init_statement } fn item_specs() -> ::std::collections::HashMap<&'static str, #root::agent_model::ItemSpec> { @@ -466,6 +475,8 @@ struct SyncHandlerType<'a>(OrdinalWarpLaneModel<'a>); struct HttpHandlerType<'a>(OrdinalHttpLaneModel<'a>); +struct DecoderType<'a>(OrdinalWarpLaneModel<'a>); + impl<'a> HandlerType<'a> { fn into_tokens(self, root: &syn::Path) -> impl ToTokens { let HandlerType(OrdinalWarpLaneModel { @@ -556,6 +567,34 @@ impl<'a> HttpHandlerType<'a> { } } +impl<'a> DecoderType<'a> { + fn into_tokens(self, root: &syn::Path) -> impl ToTokens { + let DecoderType(OrdinalWarpLaneModel { + model: WarpLaneModel { kind, .. }, + .. + }) = self; + + match kind { + WarpLaneSpec::Command(t) => { + quote!(#root::ReconDecoder<#t>) + } + WarpLaneSpec::Value(t) => { + quote!(#root::ReconDecoder<#t>) + } + WarpLaneSpec::Map(k, v, _) => { + quote!((#root::ReconDecoder<#k>, #root::ReconDecoder<#v>)) + } + WarpLaneSpec::Demand(_) + | WarpLaneSpec::DemandMap(_, _) + | WarpLaneSpec::JoinValue(_, _) + | WarpLaneSpec::JoinMap(_, _, _) + | WarpLaneSpec::Supply(_) => { + quote!(()) + } + } + } +} + struct WarpLaneHandlerMatch<'a> { group_ordinal: usize, model: OrdinalWarpLaneModel<'a>, diff --git a/server/swimos_connector/src/generic/mod.rs b/server/swimos_connector/src/generic/mod.rs index 2967bb0da..1699272d8 100644 --- a/server/swimos_connector/src/generic/mod.rs +++ b/server/swimos_connector/src/generic/mod.rs @@ -38,16 +38,14 @@ use swimos_agent::{ value::{decode_ref_and_select_set, DecodeRefAndSelectSet, ValueLaneSelectSync}, LaneItem, MapLane, Selector, SelectorFn, ValueLane, }, - AgentItem, AgentMetadata, + AgentItem, AgentMetadata, ReconDecoder, }; use swimos_agent_protocol::MapMessage; use swimos_api::{ agent::{HttpLaneRequest, WarpLaneKind}, error::DynamicRegistrationError, }; -use swimos_form::read::RecognizerReadable; use swimos_model::Value; -use swimos_recon::parser::RecognizerDecoder; use tracing::{error, info}; type GenericValueLane = ValueLane; @@ -178,16 +176,9 @@ impl AgentDescription for ConnectorAgent { } } +#[derive(Default)] pub struct GenericDeserializer { - value_deser: RecognizerDecoder<::Rec>, -} - -impl Default for GenericDeserializer { - fn default() -> Self { - Self { - value_deser: RecognizerDecoder::new(Value::make_recognizer()), - } - } + value_deser: ReconDecoder, } impl AgentSpec for ConnectorAgent { diff --git a/swimos/src/agent.rs b/swimos/src/agent.rs index 52ce31d4f..c17e8fc0f 100644 --- a/swimos/src/agent.rs +++ b/swimos/src/agent.rs @@ -262,6 +262,7 @@ pub mod agent_model { } pub use swimos_agent::AgentItem; +pub use swimos_agent::ReconDecoder; /// Defines the lane types that can be included in agent specifications. Lanes are exposed externally by the runtime, /// using the WARP protocol (or HTTP in the case of [HTTP lanes](`lanes::HttpLane`)). The states of lanes may be stored From fd9c78b792774014cee506b75d3da8eff146f4c8 Mon Sep 17 00:00:00 2001 From: Greg Holland <30577851+horned-sphere@users.noreply.github.com> Date: Wed, 27 Nov 2024 15:11:35 +0000 Subject: [PATCH 05/10] Adapted agent derive macro to share decoders. --- server/swimos_agent/src/lanes/command/mod.rs | 16 +++++++- server/swimos_agent/src/lanes/map/mod.rs | 23 +++++++++++ .../src/lane_model_derive/mod.rs | 38 ++++++++++++------- swimos/src/agent.rs | 6 +-- 4 files changed, 66 insertions(+), 17 deletions(-) diff --git a/server/swimos_agent/src/lanes/command/mod.rs b/server/swimos_agent/src/lanes/command/mod.rs index ec76d6f7d..3af6de6b9 100644 --- a/server/swimos_agent/src/lanes/command/mod.rs +++ b/server/swimos_agent/src/lanes/command/mod.rs @@ -26,11 +26,12 @@ use tokio_util::codec::Encoder; use crate::{ agent_model::{AgentDescription, WriteResult}, event_handler::{ - ActionContext, AndThen, Decode, HandlerAction, HandlerActionExt, HandlerTrans, + ActionContext, AndThen, Decode, DecodeRef, HandlerAction, HandlerActionExt, HandlerTrans, Modification, StepResult, }, item::AgentItem, meta::AgentMetadata, + ReconDecoder, }; use super::{LaneItem, ProjTransform}; @@ -160,6 +161,9 @@ impl HandlerTrans for ProjTransform> { pub type DecodeAndCommand = AndThen, DoCommand, ProjTransform>>; +pub type DecodeRefAndCommand<'a, C, T> = + AndThen, DoCommand, ProjTransform>>; + /// Create an event handler that will decode an incoming command and apply it to a command lane. pub fn decode_and_command( buffer: BytesMut, @@ -169,6 +173,16 @@ pub fn decode_and_command( decode.and_then(ProjTransform::new(projection)) } +/// Create an event handler that will decode an incoming command and apply it to a command lane. +pub fn decode_ref_and_command( + decoder: &mut ReconDecoder, + buffer: BytesMut, + projection: fn(&C) -> &CommandLane, +) -> DecodeRefAndCommand { + let decode: DecodeRef = DecodeRef::new(decoder, buffer); + decode.and_then(ProjTransform::new(projection)) +} + impl LaneItem for CommandLane { fn write_to_buffer(&self, buffer: &mut BytesMut) -> WriteResult { let CommandLane { diff --git a/server/swimos_agent/src/lanes/map/mod.rs b/server/swimos_agent/src/lanes/map/mod.rs index 6527c00a4..e7b043151 100644 --- a/server/swimos_agent/src/lanes/map/mod.rs +++ b/server/swimos_agent/src/lanes/map/mod.rs @@ -835,6 +835,11 @@ impl<'a, T: RecognizerReadable, Context> HandlerAction pub type DecodeAndApply> = AndThen, MapLaneHandler, ProjTransform>>; +pub type DecodeRefAndApply<'a, C, K, V, M = HashMap> = AndThen< + DecodeMapMessageRef<'a, K, V>, + MapLaneHandler, + ProjTransform>, +>; /// Create an event handler that will decode an incoming map message and apply the value into a map lane. pub fn decode_and_apply( @@ -851,6 +856,24 @@ where decode.and_then(ProjTransform::new(projection)) } +/// Create an event handler that will decode an incoming map message and apply the value into a map lane. +pub fn decode_ref_and_apply<'a, C, K, V, M>( + decoders: &'a mut (ReconDecoder, ReconDecoder), + message: MapMessage, + projection: fn(&C) -> &MapLane, +) -> DecodeRefAndApply<'a, C, K, V, M> +where + C: AgentDescription, + K: Form + Clone + Eq + Hash, + V: RecognizerReadable, + M: MapOps, +{ + let (key_decoder, value_decoder) = decoders; + let decode: DecodeMapMessageRef<'a, K, V> = + DecodeMapMessageRef::new(key_decoder, value_decoder, message); + decode.and_then(ProjTransform::new(projection)) +} + /// An (event handler)[`crate::event_handler::EventHandler`] that will alter an entry in the map. pub struct MapLaneTransformEntry> { projection: for<'a> fn(&'a C) -> &'a MapLane, diff --git a/server/swimos_agent_derive/src/lane_model_derive/mod.rs b/server/swimos_agent_derive/src/lane_model_derive/mod.rs index 03817fe53..00fba0a64 100644 --- a/server/swimos_agent_derive/src/lane_model_derive/mod.rs +++ b/server/swimos_agent_derive/src/lane_model_derive/mod.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::iter::once; + use proc_macro2::TokenStream; use quote::{quote, ToTokens, TokenStreamExt}; use swimos_utilities::errors::{Errors, Validation}; @@ -142,10 +144,15 @@ impl<'a> ToTokens for DeriveAgentLaneModel<'a> { let deser_types = warp_lane_models .iter() - .map(|model| DecoderType(model.clone()).into_tokens(root)); + .map(|model| { + let tokens = DecoderType(model.clone()).into_tokens(root); + let t: syn::Type = parse_quote!(#tokens); + t + }) + .chain(once(parse_quote!(()))); let deser_tup_type = quote!((#(#deser_types),*)); let deser_inits = - (0..warp_lane_models.len()).map(|_| quote!(::core::default::Default::default())); + (0..=warp_lane_models.len()).map(|_| quote!(::core::default::Default::default())); let deser_init_statement = quote!((#(#deser_inits),*)); let item_specs = item_models @@ -487,16 +494,16 @@ impl<'a> HandlerType<'a> { match kind { WarpLaneSpec::Command(t) => { - quote!(#root::lanes::command::DecodeAndCommand<#agent_name, #t>) + quote!(#root::lanes::command::DecodeRefAndCommand<'a, #agent_name, #t>) } WarpLaneSpec::Value(t) => { - quote!(#root::lanes::value::DecodeAndSet<#agent_name, #t>) + quote!(#root::lanes::value::DecodeRefAndSet<'a, #agent_name, #t>) } WarpLaneSpec::Map(k, v, m) => { if let Some(map_t) = m { - quote!(#root::lanes::map::DecodeAndApply<#agent_name, #k, #v, #map_t>) + quote!(#root::lanes::map::DecodeRefAndApply<'a, #agent_name, #k, #v, #map_t>) } else { - quote!(#root::lanes::map::DecodeAndApply<#agent_name, #k, #v>) + quote!(#root::lanes::map::DecodeRefAndApply<'a, #agent_name, #k, #v>) } } WarpLaneSpec::Demand(_) @@ -611,26 +618,31 @@ impl<'a> WarpLaneHandlerMatch<'a> { fn into_tokens(self, root: &syn::Path) -> impl ToTokens { let WarpLaneHandlerMatch { group_ordinal, - model: OrdinalWarpLaneModel { - agent_name, model, .. - }, + model: + OrdinalWarpLaneModel { + agent_name, + model, + lane_ordinal, + .. + }, } = self; + let index = syn::Index::from(lane_ordinal); let name_lit = model.literal(); let WarpLaneModel { name, kind, .. } = model; let handler_base: syn::Expr = parse_quote!(handler); let coprod_con = coproduct_constructor(root, handler_base, group_ordinal); let lane_handler_expr = match kind { WarpLaneSpec::Command(ty) => { - quote!(#root::lanes::command::decode_and_command::<#agent_name, #ty>(body, |agent: &#agent_name| &agent.#name)) + quote!(#root::lanes::command::decode_ref_and_command::<#agent_name, #ty>(&mut deserializers.#index, body, |agent: &#agent_name| &agent.#name)) } WarpLaneSpec::Value(ty) => { - quote!(#root::lanes::value::decode_and_set::<#agent_name, #ty>(body, |agent: &#agent_name| &agent.#name)) + quote!(#root::lanes::value::decode_ref_and_set::<#agent_name, #ty>(&mut deserializers.#index, body, |agent: &#agent_name| &agent.#name)) } WarpLaneSpec::Map(k, v, m) => { if let Some(map_t) = m { - quote!(#root::lanes::map::decode_and_apply::<#agent_name, #k, #v, #map_t>(body, |agent: &#agent_name| &agent.#name)) + quote!(#root::lanes::map::decode_ref_and_apply::<#agent_name, #k, #v, #map_t>(&mut deserializers.#index, body, |agent: &#agent_name| &agent.#name)) } else { - quote!(#root::lanes::map::decode_and_apply::<#agent_name, #k, #v, _>(body, |agent: &#agent_name| &agent.#name)) + quote!(#root::lanes::map::decode_ref_and_apply::<#agent_name, #k, #v, _>(&mut deserializers.#index, body, |agent: &#agent_name| &agent.#name)) } } WarpLaneSpec::Demand(_) diff --git a/swimos/src/agent.rs b/swimos/src/agent.rs index c17e8fc0f..b7aa066e5 100644 --- a/swimos/src/agent.rs +++ b/swimos/src/agent.rs @@ -277,7 +277,7 @@ pub mod lanes { #[doc(hidden)] pub mod command { - pub use swimos_agent::lanes::command::{decode_and_command, DecodeAndCommand}; + pub use swimos_agent::lanes::command::{decode_ref_and_command, DecodeRefAndCommand}; pub mod lifecycle { pub use swimos_agent::lanes::command::lifecycle::StatefulCommandLaneLifecycle; } @@ -303,7 +303,7 @@ pub mod lanes { #[doc(hidden)] pub mod value { - pub use swimos_agent::lanes::value::{decode_and_set, DecodeAndSet, ValueLaneSync}; + pub use swimos_agent::lanes::value::{decode_ref_and_set, DecodeRefAndSet, ValueLaneSync}; pub mod lifecycle { pub use swimos_agent::lanes::value::lifecycle::StatefulValueLaneLifecycle; } @@ -311,7 +311,7 @@ pub mod lanes { #[doc(hidden)] pub mod map { - pub use swimos_agent::lanes::map::{decode_and_apply, DecodeAndApply, MapLaneSync}; + pub use swimos_agent::lanes::map::{decode_ref_and_apply, DecodeRefAndApply, MapLaneSync}; pub mod lifecycle { pub use swimos_agent::lanes::map::lifecycle::StatefulMapLaneLifecycle; } From 1f6ef847de8b9e65b67855480752fc061ff2670f Mon Sep 17 00:00:00 2001 From: Greg Holland <30577851+horned-sphere@users.noreply.github.com> Date: Wed, 27 Nov 2024 15:32:35 +0000 Subject: [PATCH 06/10] Removed dead code and simplified names. --- server/swimos_agent/src/event_handler/mod.rs | 70 +---- .../swimos_agent/src/event_handler/tests.rs | 11 +- server/swimos_agent/src/lanes/command/mod.rs | 22 +- server/swimos_agent/src/lanes/map/mod.rs | 244 +++++++----------- server/swimos_agent/src/lanes/value/mod.rs | 131 +--------- .../src/lane_model_derive/mod.rs | 16 +- server/swimos_connector/src/generic/mod.rs | 14 +- swimos/src/agent.rs | 6 +- 8 files changed, 145 insertions(+), 369 deletions(-) diff --git a/server/swimos_agent/src/event_handler/mod.rs b/server/swimos_agent/src/event_handler/mod.rs index 290238991..602ad0885 100644 --- a/server/swimos_agent/src/event_handler/mod.rs +++ b/server/swimos_agent/src/event_handler/mod.rs @@ -33,7 +33,7 @@ use swimos_api::{ }; use swimos_form::{read::RecognizerReadable, write::StructuralWritable}; use swimos_model::Text; -use swimos_recon::parser::{AsyncParseError, RecognizerDecoder}; +use swimos_recon::parser::AsyncParseError; use swimos_utilities::{never::Never, routing::RouteUri}; use thiserror::Error; use tokio::time::Instant; @@ -1284,23 +1284,21 @@ impl> HandlerAction for GetParameter { /// An event handler that will attempt to decode a [readable](`swimos_form::read::StructuralReadable`) type /// from a buffer, immediately returning the result or an error. -pub struct Decode { - _target_type: PhantomData T>, +pub struct Decode<'a, T: RecognizerReadable> { + decoder: Option<&'a mut ReconDecoder>, buffer: BytesMut, - complete: bool, } -impl Decode { - pub fn new(buffer: BytesMut) -> Self { +impl<'a, T: RecognizerReadable> Decode<'a, T> { + pub fn new(decoder: &'a mut ReconDecoder, buffer: BytesMut) -> Self { Decode { - _target_type: PhantomData, + decoder: Some(decoder), buffer, - complete: false, } } } -impl HandlerAction for Decode { +impl<'a, T: RecognizerReadable, Context> HandlerAction for Decode<'a, T> { type Completion = T; fn step( @@ -1309,27 +1307,24 @@ impl HandlerAction for Decode { _meta: AgentMetadata, _context: &Context, ) -> StepResult { - let Decode { - buffer, complete, .. - } = self; - if *complete { - StepResult::after_done() - } else { - let mut decoder = RecognizerDecoder::new(T::make_recognizer()); - *complete = true; + let Decode { decoder, buffer } = self; + if let Some(decoder) = decoder.take() { + decoder.reset(); match decoder.decode_eof(buffer) { Ok(Some(value)) => StepResult::done(value), Ok(_) => StepResult::Fail(EventHandlerError::IncompleteCommand), Err(e) => StepResult::Fail(EventHandlerError::BadCommand(e)), } + } else { + StepResult::after_done() } } fn describe(&self, _context: &Context, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { let Decode { - buffer, complete, .. + buffer, decoder, .. } = self; - let content = if *complete { + let content = if decoder.is_none() { CONSUMED } else { std::str::from_utf8(buffer.as_ref()).unwrap_or("<>") @@ -1341,43 +1336,6 @@ impl HandlerAction for Decode { } } -pub struct DecodeRef<'a, T: RecognizerReadable> { - decoder: Option<&'a mut ReconDecoder>, - buffer: BytesMut, -} - -impl<'a, T: RecognizerReadable> DecodeRef<'a, T> { - pub fn new(decoder: &'a mut ReconDecoder, buffer: BytesMut) -> Self { - DecodeRef { - decoder: Some(decoder), - buffer, - } - } -} - -impl<'a, T: RecognizerReadable, Context> HandlerAction for DecodeRef<'a, T> { - type Completion = T; - - fn step( - &mut self, - _action_context: &mut ActionContext, - _meta: AgentMetadata, - _context: &Context, - ) -> StepResult { - let DecodeRef { decoder, buffer } = self; - if let Some(decoder) = decoder.take() { - decoder.reset(); - match decoder.decode_eof(buffer) { - Ok(Some(value)) => StepResult::done(value), - Ok(_) => StepResult::Fail(EventHandlerError::IncompleteCommand), - Err(e) => StepResult::Fail(EventHandlerError::BadCommand(e)), - } - } else { - StepResult::after_done() - } - } -} - impl HandlerAction for Either where H1: HandlerAction, diff --git a/server/swimos_agent/src/event_handler/tests.rs b/server/swimos_agent/src/event_handler/tests.rs index 48bcc9c42..112e00b64 100644 --- a/server/swimos_agent/src/event_handler/tests.rs +++ b/server/swimos_agent/src/event_handler/tests.rs @@ -23,9 +23,10 @@ use tokio::time::Instant; use crate::agent_model::AgentDescription; use crate::event_handler::check_step::{check_is_complete, check_is_continue}; -use crate::event_handler::{GetParameter, ModificationFlags, WithParameters}; +use crate::event_handler::{Decode, GetParameter, ModificationFlags, WithParameters}; use crate::test_context::{NO_DOWNLINKS, NO_DYN_LANES}; +use crate::ReconDecoder; use crate::{ event_handler::{ ConstHandler, EventHandlerError, GetAgentUri, HandlerActionExt, Sequentially, SideEffects, @@ -36,7 +37,7 @@ use crate::{ }; use super::{ - join, ActionContext, Decode, HandlerAction, HandlerFuture, Modification, ScheduleTimerEvent, + join, ActionContext, HandlerAction, HandlerFuture, Modification, ScheduleTimerEvent, SideEffect, Spawner, StepResult, }; @@ -526,6 +527,7 @@ fn followed_by_handler() { #[test] fn decoding_handler_success() { + let mut decoder = ReconDecoder::::default(); let uri = make_uri(); let route_params = HashMap::new(); let meta = make_meta(&uri, &route_params); @@ -533,7 +535,7 @@ fn decoding_handler_success() { let mut buffer = BytesMut::new(); write!(buffer, "56").expect("Write failed."); - let mut handler = Decode::::new(buffer); + let mut handler = Decode::::new(&mut decoder, buffer); let result = handler.step( &mut dummy_context(&mut HashMap::new(), &mut BytesMut::new()), @@ -561,6 +563,7 @@ fn decoding_handler_success() { #[test] fn decoding_handler_failure() { + let mut decoder = ReconDecoder::::default(); let uri = make_uri(); let route_params = HashMap::new(); let meta = make_meta(&uri, &route_params); @@ -568,7 +571,7 @@ fn decoding_handler_failure() { let mut buffer = BytesMut::new(); write!(buffer, "boom").expect("Write failed."); - let mut handler = Decode::::new(buffer); + let mut handler = Decode::::new(&mut decoder, buffer); let result = handler.step( &mut dummy_context(&mut HashMap::new(), &mut BytesMut::new()), diff --git a/server/swimos_agent/src/lanes/command/mod.rs b/server/swimos_agent/src/lanes/command/mod.rs index 3af6de6b9..d55c3e4d0 100644 --- a/server/swimos_agent/src/lanes/command/mod.rs +++ b/server/swimos_agent/src/lanes/command/mod.rs @@ -26,7 +26,7 @@ use tokio_util::codec::Encoder; use crate::{ agent_model::{AgentDescription, WriteResult}, event_handler::{ - ActionContext, AndThen, Decode, DecodeRef, HandlerAction, HandlerActionExt, HandlerTrans, + ActionContext, AndThen, Decode, HandlerAction, HandlerActionExt, HandlerTrans, Modification, StepResult, }, item::AgentItem, @@ -158,28 +158,16 @@ impl HandlerTrans for ProjTransform> { } } -pub type DecodeAndCommand = - AndThen, DoCommand, ProjTransform>>; - -pub type DecodeRefAndCommand<'a, C, T> = - AndThen, DoCommand, ProjTransform>>; +pub type DecodeAndCommand<'a, C, T> = + AndThen, DoCommand, ProjTransform>>; /// Create an event handler that will decode an incoming command and apply it to a command lane. pub fn decode_and_command( - buffer: BytesMut, - projection: fn(&C) -> &CommandLane, -) -> DecodeAndCommand { - let decode: Decode = Decode::new(buffer); - decode.and_then(ProjTransform::new(projection)) -} - -/// Create an event handler that will decode an incoming command and apply it to a command lane. -pub fn decode_ref_and_command( decoder: &mut ReconDecoder, buffer: BytesMut, projection: fn(&C) -> &CommandLane, -) -> DecodeRefAndCommand { - let decode: DecodeRef = DecodeRef::new(decoder, buffer); +) -> DecodeAndCommand { + let decode: Decode = Decode::new(decoder, buffer); decode.and_then(ProjTransform::new(projection)) } diff --git a/server/swimos_agent/src/lanes/map/mod.rs b/server/swimos_agent/src/lanes/map/mod.rs index e7b043151..74e41c21a 100644 --- a/server/swimos_agent/src/lanes/map/mod.rs +++ b/server/swimos_agent/src/lanes/map/mod.rs @@ -616,26 +616,7 @@ impl HandlerTrans> for ProjTransform { - _target_type: PhantomData MapMessage>, - message: Option>, -} - -impl DecodeMapMessage { - pub fn new(message: MapMessage) -> Self { - DecodeMapMessage { - _target_type: Default::default(), - message: Some(message), - } - } -} - -fn try_decode(buffer: BytesMut) -> Result { - let mut decoder = ReconDecoder::::default(); - try_with_decoder(&mut decoder, buffer) -} - -fn try_with_decoder( +fn try_decode( decoder: &mut ReconDecoder, mut buffer: BytesMut, ) -> Result { @@ -646,84 +627,19 @@ fn try_with_decoder( } } -impl HandlerAction for DecodeMapMessage -where - Context: AgentDescription, - K: RecognizerReadable, - V: RecognizerReadable, -{ - type Completion = MapMessage; - - fn step( - &mut self, - _action_context: &mut ActionContext, - _meta: AgentMetadata, - _context: &Context, - ) -> StepResult { - let DecodeMapMessage { message, .. } = self; - if let Some(message) = message.take() { - match message { - MapMessage::Update { key, value } => { - match try_decode::(key).and_then(|k| { - try_decode::(value).map(|v| (MapMessage::Update { key: k, value: v })) - }) { - Ok(msg) => StepResult::done(msg), - Err(e) => StepResult::Fail(e), - } - } - MapMessage::Remove { key } => match try_decode::(key) { - Ok(k) => StepResult::done(MapMessage::Remove { key: k }), - Err(e) => StepResult::Fail(e), - }, - MapMessage::Clear => StepResult::done(MapMessage::Clear), - MapMessage::Take(n) => StepResult::done(MapMessage::Take(n)), - MapMessage::Drop(n) => StepResult::done(MapMessage::Drop(n)), - } - } else { - StepResult::after_done() - } - } - - fn describe(&self, _context: &Context, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { - let DecodeMapMessage { message, .. } = self; - let content = message.as_ref().map(|msg| match msg { - MapMessage::Update { key, value } => { - let key_str = std::str::from_utf8(key.as_ref()).unwrap_or("<>"); - let value_str = std::str::from_utf8(value.as_ref()).unwrap_or("<>"); - MapMessage::Update { - key: key_str, - value: value_str, - } - } - MapMessage::Remove { key } => { - let key_str = std::str::from_utf8(key.as_ref()).unwrap_or("<>"); - MapMessage::Remove { key: key_str } - } - MapMessage::Clear => MapMessage::Clear, - MapMessage::Take(n) => MapMessage::Take(*n), - MapMessage::Drop(n) => MapMessage::Drop(*n), - }); - f.debug_struct("Decode") - .field("key_type", &type_name::()) - .field("value_type", &type_name::()) - .field("content", &content) - .finish() - } -} - -pub struct DecodeMapMessageRef<'a, K: RecognizerReadable, V: RecognizerReadable> { +pub struct DecodeMapMessage<'a, K: RecognizerReadable, V: RecognizerReadable> { key_decoder: &'a mut ReconDecoder, value_decoder: &'a mut ReconDecoder, message: Option>, } -impl<'a, K: RecognizerReadable, V: RecognizerReadable> DecodeMapMessageRef<'a, K, V> { +impl<'a, K: RecognizerReadable, V: RecognizerReadable> DecodeMapMessage<'a, K, V> { pub fn new( key_decoder: &'a mut ReconDecoder, value_decoder: &'a mut ReconDecoder, message: MapMessage, ) -> Self { - DecodeMapMessageRef { + DecodeMapMessage { key_decoder, value_decoder, message: Some(message), @@ -732,7 +648,7 @@ impl<'a, K: RecognizerReadable, V: RecognizerReadable> DecodeMapMessageRef<'a, K } impl<'a, K: RecognizerReadable, V: RecognizerReadable, Context> HandlerAction - for DecodeMapMessageRef<'a, K, V> + for DecodeMapMessage<'a, K, V> { type Completion = MapMessage; @@ -742,7 +658,7 @@ impl<'a, K: RecognizerReadable, V: RecognizerReadable, Context> HandlerAction StepResult { - let DecodeMapMessageRef { + let DecodeMapMessage { key_decoder, value_decoder, message, @@ -752,8 +668,8 @@ impl<'a, K: RecognizerReadable, V: RecognizerReadable, Context> HandlerAction { key_decoder.reset(); value_decoder.reset(); - match try_with_decoder::(key_decoder, key).and_then(|k| { - try_with_decoder::(value_decoder, value) + match try_decode::(key_decoder, key).and_then(|k| { + try_decode::(value_decoder, value) .map(|v| (MapMessage::Update { key: k, value: v })) }) { Ok(msg) => StepResult::done(msg), @@ -762,7 +678,7 @@ impl<'a, K: RecognizerReadable, V: RecognizerReadable, Context> HandlerAction { key_decoder.reset(); - match try_with_decoder::(key_decoder, key) { + match try_decode::(key_decoder, key) { Ok(k) => StepResult::done(MapMessage::Remove { key: k }), Err(e) => StepResult::Fail(e), } @@ -775,25 +691,49 @@ impl<'a, K: RecognizerReadable, V: RecognizerReadable, Context> HandlerAction) -> Result<(), std::fmt::Error> { + let DecodeMapMessage { message, .. } = self; + let content = message.as_ref().map(|msg| match msg { + MapMessage::Update { key, value } => { + let key_str = std::str::from_utf8(key.as_ref()).unwrap_or("<>"); + let value_str = std::str::from_utf8(value.as_ref()).unwrap_or("<>"); + MapMessage::Update { + key: key_str, + value: value_str, + } + } + MapMessage::Remove { key } => { + let key_str = std::str::from_utf8(key.as_ref()).unwrap_or("<>"); + MapMessage::Remove { key: key_str } + } + MapMessage::Clear => MapMessage::Clear, + MapMessage::Take(n) => MapMessage::Take(*n), + MapMessage::Drop(n) => MapMessage::Drop(*n), + }); + f.debug_struct("DecodeMapMessage") + .field("key_type", &type_name::()) + .field("value_type", &type_name::()) + .field("content", &content) + .finish() + } } -pub struct DecodeMapMessageSharedRef<'a, T: RecognizerReadable> { +pub struct DecodeMapMessageShared<'a, T: RecognizerReadable> { decoder: &'a mut ReconDecoder, message: Option>, } -impl<'a, T: RecognizerReadable> DecodeMapMessageSharedRef<'a, T> { +impl<'a, T: RecognizerReadable> DecodeMapMessageShared<'a, T> { pub fn new(decoder: &'a mut ReconDecoder, message: MapMessage) -> Self { - DecodeMapMessageSharedRef { + DecodeMapMessageShared { decoder, message: Some(message), } } } -impl<'a, T: RecognizerReadable, Context> HandlerAction - for DecodeMapMessageSharedRef<'a, T> -{ +impl<'a, T: RecognizerReadable, Context> HandlerAction for DecodeMapMessageShared<'a, T> { type Completion = MapMessage; fn step( @@ -802,14 +742,14 @@ impl<'a, T: RecognizerReadable, Context> HandlerAction _meta: AgentMetadata, _context: &Context, ) -> StepResult { - let DecodeMapMessageSharedRef { decoder, message } = self; + let DecodeMapMessageShared { decoder, message } = self; if let Some(message) = message.take() { match message { MapMessage::Update { key, value } => { decoder.reset(); - match try_with_decoder::(decoder, key).and_then(|k| { + match try_decode::(decoder, key).and_then(|k| { decoder.reset(); - try_with_decoder::(decoder, value) + try_decode::(decoder, value) .map(|v| (MapMessage::Update { key: k, value: v })) }) { Ok(msg) => StepResult::done(msg), @@ -818,7 +758,7 @@ impl<'a, T: RecognizerReadable, Context> HandlerAction } MapMessage::Remove { key } => { decoder.reset(); - match try_with_decoder::(decoder, key) { + match try_decode::(decoder, key) { Ok(k) => StepResult::done(MapMessage::Remove { key: k }), Err(e) => StepResult::Fail(e), } @@ -831,37 +771,46 @@ impl<'a, T: RecognizerReadable, Context> HandlerAction StepResult::after_done() } } + + fn describe(&self, _context: &Context, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { + let DecodeMapMessageShared { message, .. } = self; + let content = message.as_ref().map(|msg| match msg { + MapMessage::Update { key, value } => { + let key_str = std::str::from_utf8(key.as_ref()).unwrap_or("<>"); + let value_str = std::str::from_utf8(value.as_ref()).unwrap_or("<>"); + MapMessage::Update { + key: key_str, + value: value_str, + } + } + MapMessage::Remove { key } => { + let key_str = std::str::from_utf8(key.as_ref()).unwrap_or("<>"); + MapMessage::Remove { key: key_str } + } + MapMessage::Clear => MapMessage::Clear, + MapMessage::Take(n) => MapMessage::Take(*n), + MapMessage::Drop(n) => MapMessage::Drop(*n), + }); + f.debug_struct("DecodeMapMessageShared") + .field("key_type", &type_name::()) + .field("value_type", &type_name::()) + .field("content", &content) + .finish() + } } -pub type DecodeAndApply> = - AndThen, MapLaneHandler, ProjTransform>>; -pub type DecodeRefAndApply<'a, C, K, V, M = HashMap> = AndThen< - DecodeMapMessageRef<'a, K, V>, +pub type DecodeAndApply<'a, C, K, V, M = HashMap> = AndThen< + DecodeMapMessage<'a, K, V>, MapLaneHandler, ProjTransform>, >; /// Create an event handler that will decode an incoming map message and apply the value into a map lane. -pub fn decode_and_apply( - message: MapMessage, - projection: fn(&C) -> &MapLane, -) -> DecodeAndApply -where - C: AgentDescription, - K: Form + Clone + Eq + Hash, - V: RecognizerReadable, - M: MapOps, -{ - let decode: DecodeMapMessage = DecodeMapMessage::new(message); - decode.and_then(ProjTransform::new(projection)) -} - -/// Create an event handler that will decode an incoming map message and apply the value into a map lane. -pub fn decode_ref_and_apply<'a, C, K, V, M>( +pub fn decode_and_apply<'a, C, K, V, M>( decoders: &'a mut (ReconDecoder, ReconDecoder), message: MapMessage, projection: fn(&C) -> &MapLane, -) -> DecodeRefAndApply<'a, C, K, V, M> +) -> DecodeAndApply<'a, C, K, V, M> where C: AgentDescription, K: Form + Clone + Eq + Hash, @@ -869,8 +818,8 @@ where M: MapOps, { let (key_decoder, value_decoder) = decoders; - let decode: DecodeMapMessageRef<'a, K, V> = - DecodeMapMessageRef::new(key_decoder, value_decoder, message); + let decode: DecodeMapMessage<'a, K, V> = + DecodeMapMessage::new(key_decoder, value_decoder, message); decode.and_then(ProjTransform::new(projection)) } @@ -1292,59 +1241,42 @@ where } } -pub type DecodeAndSelectApply = - DecodeWithAndSelectApply, C, K, V, F>; -pub type DecodeRefAndSelectApply<'a, C, K, V, F> = - DecodeWithAndSelectApply, C, K, V, F>; -pub type DecodeSharedRefAndSelectApply<'a, C, T, F> = - DecodeWithAndSelectApply, C, T, T, F>; +pub type DecodeAndSelectApply<'a, C, K, V, F> = + DecodeWithAndSelectApply, C, K, V, F>; +pub type DecodeSharedAndSelectApply<'a, C, T, F> = + DecodeWithAndSelectApply, C, T, T, F>; /// Create an event handler that will decode an incoming map message and apply the value into a map lane. -pub fn decode_and_select_apply( - message: MapMessage, - projection: F, -) -> DecodeAndSelectApply -where - K: Clone + Eq + Hash + RecognizerReadable, - V: RecognizerReadable, - F: SelectorFn>, - M: MapOps, -{ - let decode: DecodeMapMessage = DecodeMapMessage::new(message); - DecodeAndSelectApply::Decoding(decode, projection) -} - -/// Create an event handler that will decode an incoming map message and apply the value into a map lane. -pub fn decode_ref_and_select_apply<'a, C, K, V, M, F>( +pub fn decode_and_select_apply<'a, C, K, V, M, F>( key_decoder: &'a mut ReconDecoder, value_decoder: &'a mut ReconDecoder, message: MapMessage, projection: F, -) -> DecodeRefAndSelectApply<'a, C, K, V, F> +) -> DecodeAndSelectApply<'a, C, K, V, F> where K: Clone + Eq + Hash + RecognizerReadable, V: RecognizerReadable, F: SelectorFn>, M: MapOps, { - let decode: DecodeMapMessageRef = - DecodeMapMessageRef::new(key_decoder, value_decoder, message); - DecodeRefAndSelectApply::Decoding(decode, projection) + let decode: DecodeMapMessage = DecodeMapMessage::new(key_decoder, value_decoder, message); + DecodeAndSelectApply::Decoding(decode, projection) } /// Create an event handler that will decode an incoming map message and apply the value into a map lane. -pub fn decode_shared_ref_and_select_apply( +/// Specialized for the case where the key and value types are the same and the decoder can be shared. +pub fn decode_shared_and_select_apply( decoder: &mut ReconDecoder, message: MapMessage, projection: F, -) -> DecodeSharedRefAndSelectApply<'_, C, T, F> +) -> DecodeSharedAndSelectApply<'_, C, T, F> where T: Clone + Eq + Hash + RecognizerReadable, F: SelectorFn>, M: MapOps, { - let decode: DecodeMapMessageSharedRef = DecodeMapMessageSharedRef::new(decoder, message); - DecodeSharedRefAndSelectApply::Decoding(decode, projection) + let decode: DecodeMapMessageShared = DecodeMapMessageShared::new(decoder, message); + DecodeSharedAndSelectApply::Decoding(decode, projection) } #[derive(Default)] diff --git a/server/swimos_agent/src/lanes/value/mod.rs b/server/swimos_agent/src/lanes/value/mod.rs index eb166c024..df6f8e697 100644 --- a/server/swimos_agent/src/lanes/value/mod.rs +++ b/server/swimos_agent/src/lanes/value/mod.rs @@ -36,7 +36,7 @@ use uuid::Uuid; use crate::{ agent_model::{AgentDescription, WriteResult}, event_handler::{ - ActionContext, AndThen, Decode, DecodeRef, Described, EventHandlerError, HandlerAction, + ActionContext, AndThen, Decode, Described, EventHandlerError, HandlerAction, HandlerActionExt, HandlerTrans, Modification, StepResult, }, item::{AgentItem, MutableValueLikeItem, ValueItem, ValueLikeItem}, @@ -388,28 +388,16 @@ impl HandlerTrans for ProjTransform> { } } -pub type DecodeAndSet = - AndThen, ValueLaneSet, ProjTransform>>; - -pub type DecodeRefAndSet<'a, C, T> = - AndThen, ValueLaneSet, ProjTransform>>; +pub type DecodeAndSet<'a, C, T> = + AndThen, ValueLaneSet, ProjTransform>>; /// Create an event handler that will decode an incoming command and set the value into a value lane. -pub fn decode_and_set( - buffer: BytesMut, - projection: fn(&C) -> &ValueLane, -) -> DecodeAndSet { - let decode: Decode = Decode::new(buffer); - decode.and_then(ProjTransform::new(projection)) -} - -/// Create an event handler that will decode an incoming command and set the value into a value lane. -pub fn decode_ref_and_set<'a, C: AgentDescription, T: RecognizerReadable>( +pub fn decode_and_set<'a, C: AgentDescription, T: RecognizerReadable>( decoder: &'a mut ReconDecoder, buffer: BytesMut, projection: fn(&C) -> &ValueLane, -) -> DecodeRefAndSet<'a, C, T> { - let decode: DecodeRef<'a, T> = DecodeRef::new(decoder, buffer); +) -> DecodeAndSet<'a, C, T> { + let decode: Decode<'a, T> = Decode::new(decoder, buffer); decode.and_then(ProjTransform::new(projection)) } @@ -549,14 +537,14 @@ where #[derive(Default)] #[doc(hidden)] -pub enum DecodeAndSelectSet { - Decoding(Decode, F), +pub enum DecodeAndSelectSet<'a, C, T: RecognizerReadable, F> { + Decoding(Decode<'a, T>, F), Selecting(ValueLaneSelectSet), #[default] Done, } -impl HandlerAction for DecodeAndSelectSet +impl<'a, C, T, F> HandlerAction for DecodeAndSelectSet<'a, C, T, F> where C: AgentDescription, T: RecognizerReadable, @@ -607,95 +595,17 @@ where ) -> Result<(), std::fmt::Error> { match self { DecodeAndSelectSet::Decoding(decode, proj) => f - .debug_struct("DecodeAndSelectSet") - .field("state", &"Decoding") - .field("decoder", &Described::new(context, decode)) - .field("lane_name", &proj.name()) - .finish(), - DecodeAndSelectSet::Selecting(selector) => f - .debug_struct("DecodeAndSelectSet") - .field("state", &"Selecting") - .field("selector", &Described::new(context, selector)) - .finish(), - DecodeAndSelectSet::Done => f - .debug_tuple("DecodeAndSelectSet") - .field(&"<>") - .finish(), - } - } -} - -#[derive(Default)] -#[doc(hidden)] -pub enum DecodeRefAndSelectSet<'a, C, T: RecognizerReadable, F> { - Decoding(DecodeRef<'a, T>, F), - Selecting(ValueLaneSelectSet), - #[default] - Done, -} - -impl<'a, C, T, F> HandlerAction for DecodeRefAndSelectSet<'a, C, T, F> -where - C: AgentDescription, - T: RecognizerReadable, - F: SelectorFn>, -{ - type Completion = (); - - fn step( - &mut self, - action_context: &mut ActionContext, - meta: AgentMetadata, - context: &C, - ) -> StepResult { - match std::mem::take(self) { - DecodeRefAndSelectSet::Decoding(mut decoding, selector) => { - match decoding.step(action_context, meta, context) { - StepResult::Continue { modified_item } => { - *self = DecodeRefAndSelectSet::Decoding(decoding, selector); - StepResult::Continue { modified_item } - } - StepResult::Fail(err) => StepResult::Fail(err), - StepResult::Complete { - modified_item, - result, - } => { - *self = DecodeRefAndSelectSet::Selecting(ValueLaneSelectSet::new( - selector, result, - )); - StepResult::Continue { modified_item } - } - } - } - DecodeRefAndSelectSet::Selecting(mut selector) => { - let result = selector.step(action_context, meta, context); - if !result.is_cont() { - *self = DecodeRefAndSelectSet::Done; - } - result - } - DecodeRefAndSelectSet::Done => StepResult::after_done(), - } - } - - fn describe( - &self, - context: &C, - f: &mut std::fmt::Formatter<'_>, - ) -> Result<(), std::fmt::Error> { - match self { - DecodeRefAndSelectSet::Decoding(decode, proj) => f .debug_struct("DecodeRefAndSelectSet") .field("state", &"Decoding") .field("decoder", &Described::new(context, decode)) .field("lane_name", &proj.name()) .finish(), - DecodeRefAndSelectSet::Selecting(selector) => f + DecodeAndSelectSet::Selecting(selector) => f .debug_struct("DecodeRefAndSelectSet") .field("state", &"Selecting") .field("selector", &Described::new(context, selector)) .finish(), - DecodeRefAndSelectSet::Done => f + DecodeAndSelectSet::Done => f .debug_tuple("DecodeRefAndSelectSet") .field(&"<>") .finish(), @@ -705,29 +615,16 @@ where /// Create an event handler that will decode an incoming command and set the value into a value lane. pub fn decode_and_select_set( - buffer: BytesMut, - projection: F, -) -> DecodeAndSelectSet -where - T: RecognizerReadable, - F: SelectorFn>, -{ - let decode: Decode = Decode::new(buffer); - DecodeAndSelectSet::Decoding(decode, projection) -} - -/// Create an event handler that will decode an incoming command and set the value into a value lane. -pub fn decode_ref_and_select_set( decoder: &mut ReconDecoder, buffer: BytesMut, projection: F, -) -> DecodeRefAndSelectSet<'_, C, T, F> +) -> DecodeAndSelectSet<'_, C, T, F> where T: RecognizerReadable, F: SelectorFn>, { - let decode: DecodeRef = DecodeRef::new(decoder, buffer); - DecodeRefAndSelectSet::Decoding(decode, projection) + let decode: Decode = Decode::new(decoder, buffer); + DecodeAndSelectSet::Decoding(decode, projection) } /// An [event handler](crate::event_handler::EventHandler)`] that will request a sync from the lane. diff --git a/server/swimos_agent_derive/src/lane_model_derive/mod.rs b/server/swimos_agent_derive/src/lane_model_derive/mod.rs index 00fba0a64..8d2860ee5 100644 --- a/server/swimos_agent_derive/src/lane_model_derive/mod.rs +++ b/server/swimos_agent_derive/src/lane_model_derive/mod.rs @@ -494,16 +494,16 @@ impl<'a> HandlerType<'a> { match kind { WarpLaneSpec::Command(t) => { - quote!(#root::lanes::command::DecodeRefAndCommand<'a, #agent_name, #t>) + quote!(#root::lanes::command::DecodeAndCommand<'a, #agent_name, #t>) } WarpLaneSpec::Value(t) => { - quote!(#root::lanes::value::DecodeRefAndSet<'a, #agent_name, #t>) + quote!(#root::lanes::value::DecodeAndSet<'a, #agent_name, #t>) } WarpLaneSpec::Map(k, v, m) => { if let Some(map_t) = m { - quote!(#root::lanes::map::DecodeRefAndApply<'a, #agent_name, #k, #v, #map_t>) + quote!(#root::lanes::map::DecodeAndApply<'a, #agent_name, #k, #v, #map_t>) } else { - quote!(#root::lanes::map::DecodeRefAndApply<'a, #agent_name, #k, #v>) + quote!(#root::lanes::map::DecodeAndApply<'a, #agent_name, #k, #v>) } } WarpLaneSpec::Demand(_) @@ -633,16 +633,16 @@ impl<'a> WarpLaneHandlerMatch<'a> { let coprod_con = coproduct_constructor(root, handler_base, group_ordinal); let lane_handler_expr = match kind { WarpLaneSpec::Command(ty) => { - quote!(#root::lanes::command::decode_ref_and_command::<#agent_name, #ty>(&mut deserializers.#index, body, |agent: &#agent_name| &agent.#name)) + quote!(#root::lanes::command::decode_and_command::<#agent_name, #ty>(&mut deserializers.#index, body, |agent: &#agent_name| &agent.#name)) } WarpLaneSpec::Value(ty) => { - quote!(#root::lanes::value::decode_ref_and_set::<#agent_name, #ty>(&mut deserializers.#index, body, |agent: &#agent_name| &agent.#name)) + quote!(#root::lanes::value::decode_and_set::<#agent_name, #ty>(&mut deserializers.#index, body, |agent: &#agent_name| &agent.#name)) } WarpLaneSpec::Map(k, v, m) => { if let Some(map_t) = m { - quote!(#root::lanes::map::decode_ref_and_apply::<#agent_name, #k, #v, #map_t>(&mut deserializers.#index, body, |agent: &#agent_name| &agent.#name)) + quote!(#root::lanes::map::decode_and_apply::<#agent_name, #k, #v, #map_t>(&mut deserializers.#index, body, |agent: &#agent_name| &agent.#name)) } else { - quote!(#root::lanes::map::decode_ref_and_apply::<#agent_name, #k, #v, _>(&mut deserializers.#index, body, |agent: &#agent_name| &agent.#name)) + quote!(#root::lanes::map::decode_and_apply::<#agent_name, #k, #v, _>(&mut deserializers.#index, body, |agent: &#agent_name| &agent.#name)) } } WarpLaneSpec::Demand(_) diff --git a/server/swimos_connector/src/generic/mod.rs b/server/swimos_connector/src/generic/mod.rs index 1699272d8..d6aab7271 100644 --- a/server/swimos_connector/src/generic/mod.rs +++ b/server/swimos_connector/src/generic/mod.rs @@ -32,10 +32,8 @@ use swimos_agent::{ }, event_handler::{ActionContext, HandlerAction, StepResult, UnitHandler}, lanes::{ - map::{ - decode_shared_ref_and_select_apply, DecodeSharedRefAndSelectApply, MapLaneSelectSync, - }, - value::{decode_ref_and_select_set, DecodeRefAndSelectSet, ValueLaneSelectSync}, + map::{decode_shared_and_select_apply, DecodeSharedAndSelectApply, MapLaneSelectSync}, + value::{decode_and_select_set, DecodeAndSelectSet, ValueLaneSelectSync}, LaneItem, MapLane, Selector, SelectorFn, ValueLane, }, AgentItem, AgentMetadata, ReconDecoder, @@ -64,8 +62,8 @@ pub struct ConnectorAgent { flags: Cell, } -type ValueHandler<'a> = DecodeRefAndSelectSet<'a, ConnectorAgent, Value, ValueLaneSelectorFn>; -type MapHandler<'a> = DecodeSharedRefAndSelectApply<'a, ConnectorAgent, Value, MapLaneSelectorFn>; +type ValueHandler<'a> = DecodeAndSelectSet<'a, ConnectorAgent, Value, ValueLaneSelectorFn>; +type MapHandler<'a> = DecodeSharedAndSelectApply<'a, ConnectorAgent, Value, MapLaneSelectorFn>; type ValueSync = ValueLaneSelectSync; type MapSync = MapLaneSelectSync; @@ -212,7 +210,7 @@ impl AgentSpec for ConnectorAgent { ) -> Option> { let GenericDeserializer { value_deser } = deserializers; if self.value_lanes.borrow().contains_key(lane) { - Some(decode_ref_and_select_set( + Some(decode_and_select_set( value_deser, body, ValueLaneSelectorFn::new(lane.to_string()), @@ -244,7 +242,7 @@ impl AgentSpec for ConnectorAgent { ) -> Option> { let GenericDeserializer { value_deser } = deserializers; if self.map_lanes.borrow().contains_key(lane) { - Some(decode_shared_ref_and_select_apply( + Some(decode_shared_and_select_apply( value_deser, body, MapLaneSelectorFn::new(lane.to_string()), diff --git a/swimos/src/agent.rs b/swimos/src/agent.rs index b7aa066e5..c17e8fc0f 100644 --- a/swimos/src/agent.rs +++ b/swimos/src/agent.rs @@ -277,7 +277,7 @@ pub mod lanes { #[doc(hidden)] pub mod command { - pub use swimos_agent::lanes::command::{decode_ref_and_command, DecodeRefAndCommand}; + pub use swimos_agent::lanes::command::{decode_and_command, DecodeAndCommand}; pub mod lifecycle { pub use swimos_agent::lanes::command::lifecycle::StatefulCommandLaneLifecycle; } @@ -303,7 +303,7 @@ pub mod lanes { #[doc(hidden)] pub mod value { - pub use swimos_agent::lanes::value::{decode_ref_and_set, DecodeRefAndSet, ValueLaneSync}; + pub use swimos_agent::lanes::value::{decode_and_set, DecodeAndSet, ValueLaneSync}; pub mod lifecycle { pub use swimos_agent::lanes::value::lifecycle::StatefulValueLaneLifecycle; } @@ -311,7 +311,7 @@ pub mod lanes { #[doc(hidden)] pub mod map { - pub use swimos_agent::lanes::map::{decode_ref_and_apply, DecodeRefAndApply, MapLaneSync}; + pub use swimos_agent::lanes::map::{decode_and_apply, DecodeAndApply, MapLaneSync}; pub mod lifecycle { pub use swimos_agent::lanes::map::lifecycle::StatefulMapLaneLifecycle; } From 86011d028e7ef71379b84866636e7926f25e7998 Mon Sep 17 00:00:00 2001 From: Greg Holland <30577851+horned-sphere@users.noreply.github.com> Date: Wed, 27 Nov 2024 16:05:12 +0000 Subject: [PATCH 07/10] FIxed typo. --- server/swimos_agent/src/agent_model/mod.rs | 4 ++-- .../src/agent_model/tests/external_links/empty_agent.rs | 2 +- server/swimos_agent/src/agent_model/tests/fake_agent.rs | 2 +- server/swimos_agent_derive/src/lane_model_derive/mod.rs | 2 +- server/swimos_connector/src/generic/mod.rs | 2 +- server/swimos_connector/src/generic/tests.rs | 4 ++-- swimos/tests/deriveagentlanemodel.rs | 2 +- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/swimos_agent/src/agent_model/mod.rs b/server/swimos_agent/src/agent_model/mod.rs index 6af03054d..b2fa59f36 100644 --- a/server/swimos_agent/src/agent_model/mod.rs +++ b/server/swimos_agent/src/agent_model/mod.rs @@ -221,7 +221,7 @@ pub trait AgentSpec: AgentDescription + Sized + Send { type Deserializers: Send + 'static; - fn initializer_deserializers(&self) -> Self::Deserializers; + fn initialize_deserializers(&self) -> Self::Deserializers; /// The names and flags of all items (lanes and stores) in the agent. fn item_specs() -> HashMap<&'static str, ItemSpec>; @@ -1231,7 +1231,7 @@ where let add_commander = |address: Address| cmd_ids.borrow_mut().get_request(&address); let add_link = (add_downlink, add_commander); let add_lane = NoDynLanes; - let mut deserializers = item_model.initializer_deserializers(); + let mut deserializers = item_model.initialize_deserializers(); // Calling run_handler is very verbose so is pulled out into this macro to make the code easier to read. macro_rules! exec_handler { diff --git a/server/swimos_agent/src/agent_model/tests/external_links/empty_agent.rs b/server/swimos_agent/src/agent_model/tests/external_links/empty_agent.rs index 427f5cfdd..7f218a86f 100644 --- a/server/swimos_agent/src/agent_model/tests/external_links/empty_agent.rs +++ b/server/swimos_agent/src/agent_model/tests/external_links/empty_agent.rs @@ -51,7 +51,7 @@ impl AgentSpec for EmptyAgent { type Deserializers = (); - fn initializer_deserializers(&self) -> Self::Deserializers {} + fn initialize_deserializers(&self) -> Self::Deserializers {} fn item_specs() -> HashMap<&'static str, ItemSpec> { HashMap::new() diff --git a/server/swimos_agent/src/agent_model/tests/fake_agent.rs b/server/swimos_agent/src/agent_model/tests/fake_agent.rs index 1d5eb2cd1..3f5216986 100644 --- a/server/swimos_agent/src/agent_model/tests/fake_agent.rs +++ b/server/swimos_agent/src/agent_model/tests/fake_agent.rs @@ -439,7 +439,7 @@ impl AgentSpec for TestAgent { type Deserializers = (); - fn initializer_deserializers(&self) -> Self::Deserializers {} + fn initialize_deserializers(&self) -> Self::Deserializers {} } impl HandlerAction for TestHandler { diff --git a/server/swimos_agent_derive/src/lane_model_derive/mod.rs b/server/swimos_agent_derive/src/lane_model_derive/mod.rs index 8d2860ee5..a58cfe824 100644 --- a/server/swimos_agent_derive/src/lane_model_derive/mod.rs +++ b/server/swimos_agent_derive/src/lane_model_derive/mod.rs @@ -244,7 +244,7 @@ impl<'a> ToTokens for DeriveAgentLaneModel<'a> { type Deserializers = #deser_tup_type; - fn initializer_deserializers(&self) -> Self::Deserializers { + fn initialize_deserializers(&self) -> Self::Deserializers { #deser_init_statement } diff --git a/server/swimos_connector/src/generic/mod.rs b/server/swimos_connector/src/generic/mod.rs index d6aab7271..e6fe2c392 100644 --- a/server/swimos_connector/src/generic/mod.rs +++ b/server/swimos_connector/src/generic/mod.rs @@ -194,7 +194,7 @@ impl AgentSpec for ConnectorAgent { type Deserializers = GenericDeserializer; - fn initializer_deserializers(&self) -> Self::Deserializers { + fn initialize_deserializers(&self) -> Self::Deserializers { GenericDeserializer::default() } diff --git a/server/swimos_connector/src/generic/tests.rs b/server/swimos_connector/src/generic/tests.rs index c413ed4a5..bd7309db2 100644 --- a/server/swimos_connector/src/generic/tests.rs +++ b/server/swimos_connector/src/generic/tests.rs @@ -285,7 +285,7 @@ fn with_map_lane(agent: &ConnectorAgent, f: impl FnOnce(&GenericMapLane)) { #[test] fn value_lane_command() { let agent = ConnectorAgent::default(); - let mut deserializers = agent.initializer_deserializers(); + let mut deserializers = agent.initialize_deserializers(); let (val_id, _) = init(&agent); let handler = agent .on_value_command(&mut deserializers, "value_lane", to_buffer(Value::from(45))) @@ -332,7 +332,7 @@ fn value_lane_sync() { #[test] fn map_lane_command() { let agent = ConnectorAgent::default(); - let mut deserializers = agent.initializer_deserializers(); + let mut deserializers = agent.initialize_deserializers(); let (_, map_id) = init(&agent); let handler = agent .on_map_command( diff --git a/swimos/tests/deriveagentlanemodel.rs b/swimos/tests/deriveagentlanemodel.rs index b38f7c759..fa1e7a333 100644 --- a/swimos/tests/deriveagentlanemodel.rs +++ b/swimos/tests/deriveagentlanemodel.rs @@ -121,7 +121,7 @@ where { let agent = A::default(); let expected = specs.into_iter().collect::>(); - let mut deserializers = agent.initializer_deserializers(); + let mut deserializers = agent.initialize_deserializers(); assert_eq!(A::item_specs(), expected); From 440ba8e7fba905b2580d5844c414e474893afcd6 Mon Sep 17 00:00:00 2001 From: Greg Holland <30577851+horned-sphere@users.noreply.github.com> Date: Wed, 27 Nov 2024 16:07:01 +0000 Subject: [PATCH 08/10] Added missing doc comments. --- server/swimos_agent/src/agent_model/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/swimos_agent/src/agent_model/mod.rs b/server/swimos_agent/src/agent_model/mod.rs index b2fa59f36..a0a679a2c 100644 --- a/server/swimos_agent/src/agent_model/mod.rs +++ b/server/swimos_agent/src/agent_model/mod.rs @@ -219,8 +219,11 @@ pub trait AgentSpec: AgentDescription + Sized + Send { /// The type of the handler to run when an HTTP request is received for a lane. type HttpRequestHandler: HandlerAction + Send + 'static; + /// A store of persistent deserializers that may be used by [`AgentSpec::on_value_command`] + /// and [`AgentSpec::on_map_command`]. type Deserializers: Send + 'static; + /// Create th store of deserializers that can be reused any number of times. fn initialize_deserializers(&self) -> Self::Deserializers; /// The names and flags of all items (lanes and stores) in the agent. @@ -231,6 +234,7 @@ pub trait AgentSpec: AgentDescription + Sized + Send { /// accept commands. /// /// # Arguments + /// * `deserializers` - The store of persistent deserializers for the agent. /// * `lane` - The name of the lane. /// * `body` - The content of the command. fn on_value_command<'a>( @@ -260,6 +264,7 @@ pub trait AgentSpec: AgentDescription + Sized + Send { /// for a map lane. There will be no handler if the lane does not exist or does not /// accept commands. /// # Arguments + /// * `deserializers` - The store of persistent deserializers for the agent. /// * `lane` - The name of the lane. /// * `body` - The content of the command. fn on_map_command<'a>( From 61f5d81b932d2645f7653083b24f86748da72f04 Mon Sep 17 00:00:00 2001 From: Greg Holland <30577851+horned-sphere@users.noreply.github.com> Date: Wed, 27 Nov 2024 16:09:32 +0000 Subject: [PATCH 09/10] Corrected description names. --- server/swimos_agent/src/lanes/value/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/swimos_agent/src/lanes/value/mod.rs b/server/swimos_agent/src/lanes/value/mod.rs index df6f8e697..3d791f6b7 100644 --- a/server/swimos_agent/src/lanes/value/mod.rs +++ b/server/swimos_agent/src/lanes/value/mod.rs @@ -595,18 +595,18 @@ where ) -> Result<(), std::fmt::Error> { match self { DecodeAndSelectSet::Decoding(decode, proj) => f - .debug_struct("DecodeRefAndSelectSet") + .debug_struct("DecodeAndSelectSet") .field("state", &"Decoding") .field("decoder", &Described::new(context, decode)) .field("lane_name", &proj.name()) .finish(), DecodeAndSelectSet::Selecting(selector) => f - .debug_struct("DecodeRefAndSelectSet") + .debug_struct("DecodeAndSelectSet") .field("state", &"Selecting") .field("selector", &Described::new(context, selector)) .finish(), DecodeAndSelectSet::Done => f - .debug_tuple("DecodeRefAndSelectSet") + .debug_tuple("DecodeAndSelectSet") .field(&"<>") .finish(), } From da287264f10e0ca613caa009b6a9af12fa69830b Mon Sep 17 00:00:00 2001 From: Greg Holland <30577851+horned-sphere@users.noreply.github.com> Date: Wed, 27 Nov 2024 16:11:53 +0000 Subject: [PATCH 10/10] Added missing doc comment. --- server/swimos_connector/src/generic/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/server/swimos_connector/src/generic/mod.rs b/server/swimos_connector/src/generic/mod.rs index e6fe2c392..dd2517014 100644 --- a/server/swimos_connector/src/generic/mod.rs +++ b/server/swimos_connector/src/generic/mod.rs @@ -174,6 +174,7 @@ impl AgentDescription for ConnectorAgent { } } +/// A single deserializer for [`Value`] objects, shared between all lanes of a [`ConnectorAgent`]. #[derive(Default)] pub struct GenericDeserializer { value_deser: ReconDecoder,