Skip to content

Commit fede497

Browse files
committed
WIP: Annotated decoder
This is an experiment to provide `runAnnotatedPeer`, which is like `runPeer' but allows us to run a decoder which has access to bytes used when decoding a message. This allows one to record offsets and decode record ByteString from which a piece of data was decoded, e.g. for each `tx` inside `MsgReplyTxs`. The `Codec` type in `typed-protocols` was generalised for this purpose. The core functionality is implemented in `runAnnotatedDecoderWithChannel` which runs `AnnotatedCodec` against a `Channel` which does incremental decoding & recording bytes used so far. We also expose `runAnnotatedPeer` which runs a `Peer` against `Channel` using an `AnnotatedCodec` (using `annotatedDriverSimple`). TODO: * `runAnnotatedPipelinedPeer` * `runAnnotatedPeerWithLimits` * `runAnnotatedPipelinedPeerWithLimits` It's actually the last one that we will need in `tx-submission`. TODO: Find a nice way so we won't need to maintain two codecs for `tx-submission`, e.g. `Codec` and `AnnotatedCodec`.
1 parent ed11046 commit fede497

File tree

3 files changed

+177
-34
lines changed
  • ouroboros-network-framework/src/Ouroboros/Network/Driver
  • ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2

3 files changed

+177
-34
lines changed

cabal.project

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,12 @@ package network-mux
5454
package ouroboros-network
5555
flags: +asserts +cddl
5656

57+
58+
source-repository-package
59+
type: git
60+
location: https://github.com/input-output-hk/typed-protocols
61+
tag: 9a0acda4cd34e37b53e53986e7a71a76bba2ca8c
62+
subdir: typed-protocols
63+
typed-protocols-cborg
64+
allow-newer: typed-protocols:io-classes
65+

ouroboros-network-framework/src/Ouroboros/Network/Driver/Simple.hs

Lines changed: 136 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
{-# LANGUAGE QuantifiedConstraints #-}
77
{-# LANGUAGE RankNTypes #-}
88
{-# LANGUAGE ScopedTypeVariables #-}
9-
{-# LANGUAGE StandaloneDeriving #-}
109
{-# LANGUAGE TypeFamilies #-}
1110
-- @UndecidableInstances@ extensions is required for defining @Show@ instance
1211
-- of @'TraceSendRecv'@.
@@ -19,10 +18,12 @@ module Ouroboros.Network.Driver.Simple
1918
-- $intro
2019
-- * Normal peers
2120
runPeer
21+
, runAnnotatedPeer
2222
, TraceSendRecv (..)
2323
, DecoderFailure (..)
2424
-- * Pipelined peers
2525
, runPipelinedPeer
26+
, runPipelinedAnnotatedPeer
2627
-- * Connected peers
2728
-- TODO: move these to a test lib
2829
, Role (..)
@@ -43,6 +44,9 @@ import Ouroboros.Network.Channel
4344
import Control.Monad.Class.MonadAsync
4445
import Control.Monad.Class.MonadThrow
4546
import Control.Tracer (Tracer (..), contramap, traceWith)
47+
import Data.Maybe (fromMaybe)
48+
import Data.Functor.Identity (Identity)
49+
import Control.Monad.Identity (Identity(..))
4650

4751

4852
-- $intro
@@ -107,18 +111,31 @@ instance Show DecoderFailure where
107111
instance Exception DecoderFailure where
108112

109113

110-
driverSimple :: forall ps failure bytes m.
111-
( MonadThrow m
112-
, Show failure
113-
, forall (st :: ps). Show (ClientHasAgency st)
114-
, forall (st :: ps). Show (ServerHasAgency st)
115-
, ShowProxy ps
116-
)
117-
=> Tracer m (TraceSendRecv ps)
118-
-> Codec ps failure m bytes
119-
-> Channel m bytes
120-
-> Driver ps (Maybe bytes) m
121-
driverSimple tracer Codec{encode, decode} channel@Channel{send} =
114+
mkSimpleDriver :: forall ps failure bytes m f annotator.
115+
( MonadThrow m
116+
, Show failure
117+
, forall (st :: ps). Show (ClientHasAgency st)
118+
, forall (st :: ps). Show (ServerHasAgency st)
119+
, ShowProxy ps
120+
)
121+
=> (forall a.
122+
Channel m bytes
123+
-> Maybe bytes
124+
-> DecodeStep bytes failure m (f a)
125+
-> m (Either failure (a, Maybe bytes))
126+
)
127+
-- ^ run incremental decoder against a channel
128+
129+
-> (forall st. annotator st -> f (SomeMessage st))
130+
-- ^ transform annotator to a container holding the decoded
131+
-- message
132+
133+
-> Tracer m (TraceSendRecv ps)
134+
-> Codec' ps failure m annotator bytes
135+
-> Channel m bytes
136+
-> Driver ps (Maybe bytes) m
137+
138+
mkSimpleDriver runDecodeSteps nat tracer Codec{encode, decode} channel@Channel{send} =
122139
Driver { sendMessage, recvMessage, startDState = Nothing }
123140
where
124141
sendMessage :: forall (pr :: PeerRole) (st :: ps) (st' :: ps).
@@ -135,7 +152,7 @@ driverSimple tracer Codec{encode, decode} channel@Channel{send} =
135152
-> m (SomeMessage st, Maybe bytes)
136153
recvMessage stok trailing = do
137154
decoder <- decode stok
138-
result <- runDecoderWithChannel channel trailing decoder
155+
result <- runDecodeSteps channel trailing (nat <$> decoder)
139156
case result of
140157
Right x@(SomeMessage msg, _trailing') -> do
141158
traceWith tracer (TraceRecvMsg (AnyMessageAndAgency stok msg))
@@ -144,6 +161,36 @@ driverSimple tracer Codec{encode, decode} channel@Channel{send} =
144161
throwIO (DecoderFailure stok failure)
145162

146163

164+
simpleDriver :: forall ps failure bytes m.
165+
( MonadThrow m
166+
, Show failure
167+
, forall (st :: ps). Show (ClientHasAgency st)
168+
, forall (st :: ps). Show (ServerHasAgency st)
169+
, ShowProxy ps
170+
)
171+
=> Tracer m (TraceSendRecv ps)
172+
-> Codec ps failure m bytes
173+
-> Channel m bytes
174+
-> Driver ps (Maybe bytes) m
175+
simpleDriver = mkSimpleDriver runDecoderWithChannel Identity
176+
177+
178+
annotatedSimpleDriver
179+
:: forall ps failure bytes m.
180+
( MonadThrow m
181+
, Monoid bytes
182+
, Show failure
183+
, forall (st :: ps). Show (ClientHasAgency st)
184+
, forall (st :: ps). Show (ServerHasAgency st)
185+
, ShowProxy ps
186+
)
187+
=> Tracer m (TraceSendRecv ps)
188+
-> AnnotatedCodec ps failure m bytes
189+
-> Channel m bytes
190+
-> Driver ps (Maybe bytes) m
191+
annotatedSimpleDriver = mkSimpleDriver runAnnotatedDecoderWithChannel runAnnotator
192+
193+
147194
-- | Run a peer with the given channel via the given codec.
148195
--
149196
-- This runs the peer to completion (if the protocol allows for termination).
@@ -164,7 +211,31 @@ runPeer
164211
runPeer tracer codec channel peer =
165212
runPeerWithDriver driver peer (startDState driver)
166213
where
167-
driver = driverSimple tracer codec channel
214+
driver = simpleDriver tracer codec channel
215+
216+
217+
-- | Run a peer with the given channel via the given annotated codec.
218+
--
219+
-- This runs the peer to completion (if the protocol allows for termination).
220+
--
221+
runAnnotatedPeer
222+
:: forall ps (st :: ps) pr failure bytes m a .
223+
( MonadThrow m
224+
, Monoid bytes
225+
, Show failure
226+
, forall (st' :: ps). Show (ClientHasAgency st')
227+
, forall (st' :: ps). Show (ServerHasAgency st')
228+
, ShowProxy ps
229+
)
230+
=> Tracer m (TraceSendRecv ps)
231+
-> AnnotatedCodec ps failure m bytes
232+
-> Channel m bytes
233+
-> Peer ps pr st m a
234+
-> m (a, Maybe bytes)
235+
runAnnotatedPeer tracer codec channel peer =
236+
runPeerWithDriver driver peer (startDState driver)
237+
where
238+
driver = annotatedSimpleDriver tracer codec channel
168239

169240

170241
-- | Run a pipelined peer with the given channel via the given codec.
@@ -191,7 +262,35 @@ runPipelinedPeer
191262
runPipelinedPeer tracer codec channel peer =
192263
runPipelinedPeerWithDriver driver peer (startDState driver)
193264
where
194-
driver = driverSimple tracer codec channel
265+
driver = simpleDriver tracer codec channel
266+
267+
268+
-- | Run a pipelined peer with the given channel via the given annotated codec.
269+
--
270+
-- This runs the peer to completion (if the protocol allows for termination).
271+
--
272+
-- Unlike normal peers, running pipelined peers rely on concurrency, hence the
273+
-- 'MonadAsync' constraint.
274+
--
275+
runPipelinedAnnotatedPeer
276+
:: forall ps (st :: ps) pr failure bytes m a.
277+
( MonadAsync m
278+
, MonadThrow m
279+
, Monoid bytes
280+
, Show failure
281+
, forall (st' :: ps). Show (ClientHasAgency st')
282+
, forall (st' :: ps). Show (ServerHasAgency st')
283+
, ShowProxy ps
284+
)
285+
=> Tracer m (TraceSendRecv ps)
286+
-> AnnotatedCodec ps failure m bytes
287+
-> Channel m bytes
288+
-> PeerPipelined ps pr st m a
289+
-> m (a, Maybe bytes)
290+
runPipelinedAnnotatedPeer tracer codec channel peer =
291+
runPipelinedPeerWithDriver driver peer (startDState driver)
292+
where
293+
driver = annotatedSimpleDriver tracer codec channel
195294

196295

197296
--
@@ -204,17 +303,36 @@ runPipelinedPeer tracer codec channel peer =
204303
runDecoderWithChannel :: Monad m
205304
=> Channel m bytes
206305
-> Maybe bytes
207-
-> DecodeStep bytes failure m a
306+
-> DecodeStep bytes failure m (Identity a)
208307
-> m (Either failure (a, Maybe bytes))
209308

210309
runDecoderWithChannel Channel{recv} = go
211310
where
212-
go _ (DecodeDone x trailing) = return (Right (x, trailing))
311+
go _ (DecodeDone (Identity x) trailing) = return (Right (x, trailing))
213312
go _ (DecodeFail failure) = return (Left failure)
214313
go Nothing (DecodePartial k) = recv >>= k >>= go Nothing
215314
go (Just trailing) (DecodePartial k) = k (Just trailing) >>= go Nothing
216315

217316

317+
runAnnotatedDecoderWithChannel
318+
:: forall m bytes failure a.
319+
( Monad m
320+
, Monoid bytes
321+
)
322+
=> Channel m bytes
323+
-> Maybe bytes
324+
-> DecodeStep bytes failure m (bytes -> a)
325+
-> m (Either failure (a, Maybe bytes))
326+
327+
runAnnotatedDecoderWithChannel Channel{recv} bs0 = go (fromMaybe mempty bs0) bs0
328+
where
329+
go :: bytes -> Maybe bytes -> DecodeStep bytes failure m (bytes -> a) -> m (Either failure (a, Maybe bytes))
330+
go bytes _ (DecodeDone f trailing) = return $ Right (f bytes, trailing)
331+
go _bytes _ (DecodeFail failure) = return (Left failure)
332+
go bytes Nothing (DecodePartial k) = recv >>= \bs -> k bs >>= go (bytes <> fromMaybe mempty bs) Nothing
333+
go bytes (Just trailing) (DecodePartial k) = k (Just trailing) >>= go (bytes <> trailing) Nothing
334+
335+
218336
data Role = Client | Server
219337

220338
-- | Run two 'Peer's via a pair of connected 'Channel's and a common 'Codec'.

ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Codec.hs

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,26 +64,30 @@ timeLimitsTxSubmission2 = ProtocolTimeLimits stateToLimit
6464

6565

6666
codecTxSubmission2
67-
:: forall txid tx m.
67+
:: forall txid tx annotator m.
6868
MonadST m
6969
=> (txid -> CBOR.Encoding)
7070
-> (forall s . CBOR.Decoder s txid)
7171
-> (tx -> CBOR.Encoding)
7272
-> (forall s . CBOR.Decoder s tx)
73-
-> Codec (TxSubmission2 txid tx) CBOR.DeserialiseFailure m ByteString
73+
-- the codec is polymorphic in annotator. The primary use case is an
74+
-- `Identity` functor or `Annotator LBS.ByteString`.
75+
-> (forall st. SomeMessage st -> annotator st)
76+
-> Codec' (TxSubmission2 txid tx) CBOR.DeserialiseFailure m annotator ByteString
7477
codecTxSubmission2 encodeTxId decodeTxId
75-
encodeTx decodeTx =
78+
encodeTx decodeTx
79+
annotate =
7680
mkCodecCborLazyBS
7781
(encodeTxSubmission2 encodeTxId encodeTx)
7882
decode
7983
where
8084
decode :: forall (pr :: PeerRole) (st :: TxSubmission2 txid tx).
8185
PeerHasAgency pr st
82-
-> forall s. CBOR.Decoder s (SomeMessage st)
86+
-> forall s. CBOR.Decoder s (annotator st)
8387
decode stok = do
8488
len <- CBOR.decodeListLen
8589
key <- CBOR.decodeWord
86-
decodeTxSubmission2 decodeTxId decodeTx stok len key
90+
decodeTxSubmission2 decodeTxId decodeTx annotate stok len key
8791

8892
encodeTxSubmission2
8993
:: forall txid tx.
@@ -149,30 +153,31 @@ encodeTxSubmission2 encodeTxId encodeTx = encode
149153

150154

151155
decodeTxSubmission2
152-
:: forall txid tx.
156+
:: forall txid tx annotator.
153157
(forall s . CBOR.Decoder s txid)
154158
-> (forall s . CBOR.Decoder s tx)
159+
-> (forall st. SomeMessage st -> annotator st)
155160
-> (forall (pr :: PeerRole) (st :: TxSubmission2 txid tx) s.
156161
PeerHasAgency pr st
157162
-> Int
158163
-> Word
159-
-> CBOR.Decoder s (SomeMessage st))
160-
decodeTxSubmission2 decodeTxId decodeTx = decode
164+
-> CBOR.Decoder s (annotator st))
165+
decodeTxSubmission2 decodeTxId decodeTx annotate = decode
161166
where
162167
decode :: forall (pr :: PeerRole) s (st :: TxSubmission2 txid tx).
163168
PeerHasAgency pr st
164169
-> Int
165170
-> Word
166-
-> CBOR.Decoder s (SomeMessage st)
171+
-> CBOR.Decoder s (annotator st)
167172
decode stok len key = do
168173
case (stok, len, key) of
169174
(ClientAgency TokInit, 1, 6) ->
170-
return (SomeMessage MsgInit)
175+
return (annotate $ SomeMessage MsgInit)
171176
(ServerAgency TokIdle, 4, 0) -> do
172177
blocking <- CBOR.decodeBool
173178
ackNo <- NumTxIdsToAck <$> CBOR.decodeWord16
174179
reqNo <- NumTxIdsToReq <$> CBOR.decodeWord16
175-
return $!
180+
return $! annotate $
176181
if blocking
177182
then SomeMessage (MsgRequestTxIds TokBlocking ackNo reqNo)
178183
else SomeMessage (MsgRequestTxIds TokNonBlocking ackNo reqNo)
@@ -187,11 +192,11 @@ decodeTxSubmission2 decodeTxId decodeTx = decode
187192
return (txid, SizeInBytes sz))
188193
case (b, txids) of
189194
(TokBlocking, t:ts) ->
190-
return $
195+
return $ annotate $
191196
SomeMessage (MsgReplyTxIds (BlockingReply (t NonEmpty.:| ts)))
192197

193198
(TokNonBlocking, ts) ->
194-
return $
199+
return $ annotate $
195200
SomeMessage (MsgReplyTxIds (NonBlockingReply ts))
196201

197202
(TokBlocking, []) ->
@@ -201,15 +206,26 @@ decodeTxSubmission2 decodeTxId decodeTx = decode
201206
(ServerAgency TokIdle, 2, 2) -> do
202207
CBOR.decodeListLenIndef
203208
txids <- CBOR.decodeSequenceLenIndef (flip (:)) [] reverse decodeTxId
204-
return (SomeMessage (MsgRequestTxs txids))
209+
return (annotate $ SomeMessage (MsgRequestTxs txids))
205210

206211
(ClientAgency TokTxs, 2, 3) -> do
207212
CBOR.decodeListLenIndef
208213
txids <- CBOR.decodeSequenceLenIndef (flip (:)) [] reverse decodeTx
209-
return (SomeMessage (MsgReplyTxs txids))
214+
-- ^ TODO: `txids -> txs` :grin:
215+
216+
-- TODO: here we have access to bytes from which the message was decoded.
217+
-- we can use `Codec.CBOR.Decoding.decodeWithByteSpan`
218+
-- around each `tx` and wrap each `tx` in `WithBytes`.
219+
--
220+
-- `decodeTxSubmission2` can be polymorphic by adding an
221+
-- extra argument of type
222+
-- `ByteString -> ByteOffSet -> ByteOffset -> tx -> a`
223+
-- this way we could wrap `tx` in `WithBytes` or just
224+
-- return `tx`.
225+
return (annotate $ SomeMessage (MsgReplyTxs txids))
210226

211227
(ClientAgency (TokTxIds TokBlocking), 1, 4) ->
212-
return (SomeMessage MsgDone)
228+
return (annotate $ SomeMessage MsgDone)
213229

214230
--
215231
-- failures per protocol state

0 commit comments

Comments
 (0)