Skip to content

Commit 56b9fc2

Browse files
committed
netdriver: Implement driver that works over the network
1 parent 9e48622 commit 56b9fc2

File tree

13 files changed

+891
-45
lines changed

13 files changed

+891
-45
lines changed

badgerdriver/driver.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ func (d *Driver) Commit(cmd *flowstate.CommitCommand) error {
310310
for {
311311

312312
if err := d.db.Update(func(txn *badger.Txn) error {
313-
for i, subCmd0 := range cmd.Commands {
313+
for _, subCmd0 := range cmd.Commands {
314314
nextRev, err := getRev()
315315
if err != nil {
316316
return fmt.Errorf("get next sequence: %w", err)
@@ -335,9 +335,7 @@ func (d *Driver) Commit(cmd *flowstate.CommitCommand) error {
335335
return err
336336
}
337337
if stateCtx.Committed.Rev != commitedRev {
338-
conflictErr := &flowstate.ErrRevMismatch{}
339-
conflictErr.Add(fmt.Sprintf("%T", cmd.Commands[i]), stateCtx.Current.ID, nil)
340-
return conflictErr
338+
return &flowstate.ErrRevMismatch{IDS: []flowstate.StateID{stateCtx.Current.ID}}
341339
}
342340

343341
commitedState := stateCtx.Current.CopyTo(&flowstate.State{})

delay.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,10 @@ func NewDelayer(e Engine, l *slog.Logger) (*Delayer, error) {
173173
} else if err != nil {
174174
return nil, fmt.Errorf("commit meta state: %w", err)
175175
}
176+
} else if err != nil {
177+
return nil, fmt.Errorf("get meta state: %w", err)
176178
}
179+
177180
d.metaStateCtx = metaStateCtx
178181
d.commitSince, d.commitOffset = getDelayerMetaState(metaStateCtx)
179182
d.since, d.offset = d.commitSince, d.commitOffset
@@ -346,7 +349,7 @@ func getDelayerMetaState(metaStateCtx *StateCtx) (time.Time, int64) {
346349
offset0 := metaStateCtx.Current.Annotations[`flowstate.delayer.offset`]
347350
offset, err := strconv.ParseInt(offset0, 10, 64)
348351
if err != nil {
349-
panic(fmt.Errorf("cannot parse flowstate.delayer.offset=%s into int64: %w", offset0, err))
352+
panic(fmt.Errorf("cannot parse flowstate.delayer.offset='%s' into int64: %w", offset0, err))
350353
}
351354

352355
since0 := metaStateCtx.Current.Annotations[`flowstate.delayer.since`]

errors.go

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package flowstate
22

3-
import "errors"
3+
import (
4+
"errors"
5+
)
46

57
// ErrRevMismatch is an error that indicates a revision mismatch during a commit operation.
68
type ErrRevMismatch struct {
7-
cmds []string
8-
stateIDs []StateID
9-
errs []error
9+
IDS []StateID
1010
}
1111

1212
func (err ErrRevMismatch) As(target interface{}) bool {
@@ -19,30 +19,28 @@ func (err ErrRevMismatch) As(target interface{}) bool {
1919
}
2020

2121
func (err ErrRevMismatch) Error() string {
22-
msg := "conflict;"
23-
for i := range err.cmds {
24-
msg += " cmd: " + err.cmds[i] + " sid: " + string(err.stateIDs[i]) + ";"
25-
if err.errs[i] != nil {
26-
msg += " err: " + err.errs[i].Error() + ";"
22+
msg := "rev mismatch: "
23+
for i, id := range err.IDS {
24+
if i > 0 {
25+
msg += ", "
2726
}
28-
}
2927

28+
msg += string(id)
29+
}
3030
return msg
3131
}
3232

33-
func (err *ErrRevMismatch) Add(cmd string, sID StateID, cmdErr error) {
34-
err.cmds = append(err.cmds, cmd)
35-
err.stateIDs = append(err.stateIDs, sID)
36-
err.errs = append(err.errs, cmdErr)
33+
func (err *ErrRevMismatch) Add(id StateID) {
34+
err.IDS = append(err.IDS, id)
3735
}
3836

39-
func (err *ErrRevMismatch) TaskIDs() []StateID {
40-
return err.stateIDs
37+
func (err *ErrRevMismatch) All() []StateID {
38+
return err.IDS
4139
}
4240

43-
func (err *ErrRevMismatch) Contains(sID StateID) bool {
44-
for _, s := range err.stateIDs {
45-
if s == sID {
41+
func (err *ErrRevMismatch) Contains(id StateID) bool {
42+
for _, s := range err.IDS {
43+
if s == id {
4644
return true
4745
}
4846
}

memdriver/driver.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,7 @@ func (d *Driver) Commit(cmd *flowstate.CommitCommand) error {
192192
}
193193

194194
if _, rev := d.stateLog.GetLatestByID(stateCtx.Current.ID); rev != stateCtx.Committed.Rev {
195-
conflictErr := &flowstate.ErrRevMismatch{}
196-
conflictErr.Add(fmt.Sprintf("%T", cmd), stateCtx.Current.ID, fmt.Errorf("rev mismatch"))
197-
return conflictErr
195+
return &flowstate.ErrRevMismatch{IDS: []flowstate.StateID{stateCtx.Current.ID}}
198196
}
199197

200198
d.stateLog.Append(stateCtx)

0 commit comments

Comments
 (0)