Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions badgerdriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (d *Driver) Commit(cmd *flowstate.CommitCommand) error {
for {

if err := d.db.Update(func(txn *badger.Txn) error {
for i, subCmd0 := range cmd.Commands {
for _, subCmd0 := range cmd.Commands {
nextRev, err := getRev()
if err != nil {
return fmt.Errorf("get next sequence: %w", err)
Expand All @@ -335,9 +335,7 @@ func (d *Driver) Commit(cmd *flowstate.CommitCommand) error {
return err
}
if stateCtx.Committed.Rev != commitedRev {
conflictErr := &flowstate.ErrRevMismatch{}
conflictErr.Add(fmt.Sprintf("%T", cmd.Commands[i]), stateCtx.Current.ID, nil)
return conflictErr
return &flowstate.ErrRevMismatch{IDS: []flowstate.StateID{stateCtx.Current.ID}}
}

commitedState := stateCtx.Current.CopyTo(&flowstate.State{})
Expand Down
5 changes: 4 additions & 1 deletion delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ func NewDelayer(e Engine, l *slog.Logger) (*Delayer, error) {
} else if err != nil {
return nil, fmt.Errorf("commit meta state: %w", err)
}
} else if err != nil {
return nil, fmt.Errorf("get meta state: %w", err)
}

d.metaStateCtx = metaStateCtx
d.commitSince, d.commitOffset = getDelayerMetaState(metaStateCtx)
d.since, d.offset = d.commitSince, d.commitOffset
Expand Down Expand Up @@ -346,7 +349,7 @@ func getDelayerMetaState(metaStateCtx *StateCtx) (time.Time, int64) {
offset0 := metaStateCtx.Current.Annotations[`flowstate.delayer.offset`]
offset, err := strconv.ParseInt(offset0, 10, 64)
if err != nil {
panic(fmt.Errorf("cannot parse flowstate.delayer.offset=%s into int64: %w", offset0, err))
panic(fmt.Errorf("cannot parse flowstate.delayer.offset='%s' into int64: %w", offset0, err))
}

since0 := metaStateCtx.Current.Annotations[`flowstate.delayer.since`]
Expand Down
36 changes: 17 additions & 19 deletions errors.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package flowstate

import "errors"
import (
"errors"
)

// ErrRevMismatch is an error that indicates a revision mismatch during a commit operation.
type ErrRevMismatch struct {
cmds []string
stateIDs []StateID
errs []error
IDS []StateID
}

func (err ErrRevMismatch) As(target interface{}) bool {
Expand All @@ -19,30 +19,28 @@ func (err ErrRevMismatch) As(target interface{}) bool {
}

func (err ErrRevMismatch) Error() string {
msg := "conflict;"
for i := range err.cmds {
msg += " cmd: " + err.cmds[i] + " sid: " + string(err.stateIDs[i]) + ";"
if err.errs[i] != nil {
msg += " err: " + err.errs[i].Error() + ";"
msg := "rev mismatch: "
for i, id := range err.IDS {
if i > 0 {
msg += ", "
}
}

msg += string(id)
}
return msg
}

func (err *ErrRevMismatch) Add(cmd string, sID StateID, cmdErr error) {
err.cmds = append(err.cmds, cmd)
err.stateIDs = append(err.stateIDs, sID)
err.errs = append(err.errs, cmdErr)
func (err *ErrRevMismatch) Add(id StateID) {
err.IDS = append(err.IDS, id)
}

func (err *ErrRevMismatch) TaskIDs() []StateID {
return err.stateIDs
func (err *ErrRevMismatch) All() []StateID {
return err.IDS
}

func (err *ErrRevMismatch) Contains(sID StateID) bool {
for _, s := range err.stateIDs {
if s == sID {
func (err *ErrRevMismatch) Contains(id StateID) bool {
for _, s := range err.IDS {
if s == id {
return true
}
}
Expand Down
4 changes: 1 addition & 3 deletions memdriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,7 @@ func (d *Driver) Commit(cmd *flowstate.CommitCommand) error {
}

if _, rev := d.stateLog.GetLatestByID(stateCtx.Current.ID); rev != stateCtx.Committed.Rev {
conflictErr := &flowstate.ErrRevMismatch{}
conflictErr.Add(fmt.Sprintf("%T", cmd), stateCtx.Current.ID, fmt.Errorf("rev mismatch"))
return conflictErr
return &flowstate.ErrRevMismatch{IDS: []flowstate.StateID{stateCtx.Current.ID}}
}

d.stateLog.Append(stateCtx)
Expand Down
Loading