Skip to content

Commit a433b7e

Browse files
authored
Merge pull request #71 from makasim/move-recovery-pkg-to-flowstate
Move recovery logic into flowstate package
2 parents a160fdc + 7bdad64 commit a433b7e

File tree

3 files changed

+164
-173
lines changed

3 files changed

+164
-173
lines changed

recovery/recoverer.go renamed to recovery.go

Lines changed: 132 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package recovery
1+
package flowstate
22

33
import (
44
"context"
@@ -8,16 +8,92 @@ import (
88
"strconv"
99
"sync"
1010
"time"
11-
12-
"github.com/makasim/flowstate"
1311
)
1412

15-
var stateID = flowstate.StateID(`flowstate.recovery.state`)
13+
// RecoveryEnabledAnnotation is the state annotation key that controls whether
// a state takes part in recovery. Only the exact value "false" turns recovery
// off; a missing annotation or any other value leaves it enabled.
var RecoveryEnabledAnnotation = `flowstate.recovery.enabled`
14+
15+
func DisableRecovery(stateCtx *StateCtx) {
16+
stateCtx.Current.SetAnnotation(RecoveryEnabledAnnotation, "false")
17+
}
18+
19+
func recoveryEnabled(state State) bool {
20+
return state.Annotations[RecoveryEnabledAnnotation] != "false"
21+
}
22+
23+
// RecoveryAttemptAnnotation is the transition annotation key under which the
// recovery attempt counter is stored as a decimal integer string.
var RecoveryAttemptAnnotation = `flowstate.recovery.attempt`
24+
25+
func RecoveryAttempt(state State) int {
26+
attempt, _ := strconv.Atoi(state.Transition.Annotations[RecoveryAttemptAnnotation])
27+
return attempt
28+
}
29+
30+
func setRecoveryAttempt(stateCtx *StateCtx, attempt int) {
31+
stateCtx.Current.Transition.SetAnnotation(RecoveryAttemptAnnotation, strconv.Itoa(attempt))
32+
}
33+
34+
// Settings that bound how many recovery attempts a state may receive.
var (
	// DefaultMaxRecoveryAttempts is the limit applied when a state carries no
	// positive, parsable MaxRecoveryAttemptsAnnotation value.
	DefaultMaxRecoveryAttempts = 3

	// MaxRecoveryAttemptsAnnotation is the state annotation key holding the
	// per-state attempt limit as a decimal integer string.
	MaxRecoveryAttemptsAnnotation = `flowstate.recovery.max_attempts`
)
36+
37+
func MaxRecoveryAttempts(state State) int {
38+
attempt, _ := strconv.Atoi(state.Annotations[MaxRecoveryAttemptsAnnotation])
39+
if attempt <= 0 {
40+
return DefaultMaxRecoveryAttempts
41+
}
42+
43+
return attempt
44+
}
45+
46+
func SetMaxRecoveryAttempts(stateCtx *StateCtx, attempts int) {
47+
if attempts <= 0 {
48+
attempts = DefaultMaxRecoveryAttempts
49+
}
50+
51+
stateCtx.Current.SetAnnotation(MaxRecoveryAttemptsAnnotation, strconv.Itoa(attempts))
52+
}
53+
54+
// Settings that control how long after its commit a state is retried.
var (
	// DefaultRetryAfter is the delay used when a state has no
	// RetryAfterAnnotation value.
	DefaultRetryAfter = time.Minute * 2

	// MinRetryAfter is the lower bound every retry delay is raised to.
	MinRetryAfter = time.Minute

	// MaxRetryAfter is the upper bound every retry delay is lowered to.
	MaxRetryAfter = time.Minute * 5

	// RetryAfterAnnotation is the state annotation key holding the retry
	// delay in time.Duration string form (for example "2m0s").
	RetryAfterAnnotation = `flowstate.recovery.retry_after`
)
58+
59+
func SetRetryAfter(stateCtx *StateCtx, retryAfter time.Duration) {
60+
if retryAfter < MinRetryAfter {
61+
retryAfter = MinRetryAfter
62+
}
63+
if retryAfter > MaxRetryAfter {
64+
retryAfter = MaxRetryAfter
65+
}
66+
67+
stateCtx.Current.SetAnnotation(RetryAfterAnnotation, retryAfter.String())
68+
}
69+
70+
func retryAt(state State) time.Time {
71+
retryAfterStr := state.Annotations[RetryAfterAnnotation]
72+
if retryAfterStr == "" {
73+
retryAfterStr = DefaultRetryAfter.String()
74+
}
75+
76+
retryAfter, err := time.ParseDuration(retryAfterStr)
77+
if err != nil {
78+
return state.CommittedAt().Add(MinRetryAfter)
79+
}
80+
81+
if retryAfter < MinRetryAfter {
82+
retryAfter = MinRetryAfter
83+
}
84+
if retryAfter > MaxRetryAfter {
85+
retryAfter = MaxRetryAfter
86+
}
87+
88+
return state.CommittedAt().Add(retryAfter)
89+
}
90+
91+
// recoveryStateID is the ID of the state the Recoverer loads, commits and
// watches for its own bookkeeping (for example the since_rev annotation).
// NOTE(review): presumably shared by all Recoverer instances using the same
// storage, so that only one of them is active at a time — confirm.
var recoveryStateID = StateID(`flowstate.recovery.meta`)
1692

1793
type Recoverer struct {
1894
mux sync.Mutex
1995

20-
recoveryStateCtx *flowstate.StateCtx
96+
recoveryStateCtx *StateCtx
2197

2298
active bool
2399
sinceRev int64
@@ -26,7 +102,7 @@ type Recoverer struct {
26102
tailRev int64
27103
tailTime time.Time
28104

29-
states map[flowstate.StateID]retryableState
105+
states map[StateID]retryableState
30106
statesMaxSize int
31107
statesMaxTailHeadDur time.Duration
32108

@@ -36,13 +112,13 @@ type Recoverer struct {
36112
dropped int64
37113
commited int64
38114

39-
e flowstate.Engine
115+
e Engine
40116
doneCh chan struct{}
41117
stoppedCh chan struct{}
42118
l *slog.Logger
43119
}
44120

45-
func New(e flowstate.Engine, l *slog.Logger) *Recoverer {
121+
func NewRecoverer(e Engine, l *slog.Logger) *Recoverer {
46122
return &Recoverer{
47123
e: e,
48124
l: l,
@@ -56,21 +132,21 @@ func New(e flowstate.Engine, l *slog.Logger) *Recoverer {
56132
}
57133

58134
func (r *Recoverer) Init() error {
59-
recoverStateCtx := &flowstate.StateCtx{}
135+
recoverStateCtx := &StateCtx{}
60136
active := true
61-
if err := r.e.Do(flowstate.GetByID(recoverStateCtx, stateID, 0)); errors.Is(err, flowstate.ErrNotFound) {
62-
recoverStateCtx = &flowstate.StateCtx{
63-
Current: flowstate.State{
64-
ID: stateID,
137+
if err := r.e.Do(GetByID(recoverStateCtx, recoveryStateID, 0)); errors.Is(err, ErrNotFound) {
138+
recoverStateCtx = &StateCtx{
139+
Current: State{
140+
ID: recoveryStateID,
65141
Rev: 0,
66142
},
67143
}
68-
Disable(recoverStateCtx)
69-
setSinceRev(recoverStateCtx, 0)
144+
DisableRecovery(recoverStateCtx)
145+
setRecoverySinceRev(recoverStateCtx, 0)
70146

71-
if err := r.e.Do(flowstate.Commit(
72-
flowstate.Transit(recoverStateCtx, `na`),
73-
)); flowstate.IsErrRevMismatch(err) {
147+
if err := r.e.Do(Commit(
148+
Transit(recoverStateCtx, `na`),
149+
)); IsErrRevMismatch(err) {
74150
// another process is already doing recovery, we can continue in standby mode
75151
active = false
76152
} else if err != nil {
@@ -80,7 +156,7 @@ func (r *Recoverer) Init() error {
80156
} else if err != nil {
81157
return fmt.Errorf("get recovery state: %w", err)
82158
} else {
83-
active = flowstate.Paused(recoverStateCtx.Current)
159+
active = Paused(recoverStateCtx.Current)
84160
}
85161

86162
r.reset(recoverStateCtx, active)
@@ -110,8 +186,8 @@ func (r *Recoverer) Shutdown(ctx context.Context) error {
110186

111187
select {
112188
case <-r.stoppedCh:
113-
setSinceRev(r.recoveryStateCtx, r.nextSinceRev())
114-
if err := r.e.Do(flowstate.Commit(flowstate.Pause(r.recoveryStateCtx))); flowstate.IsErrRevMismatch(err) {
189+
setRecoverySinceRev(r.recoveryStateCtx, r.nextSinceRev())
190+
if err := r.e.Do(Commit(Pause(r.recoveryStateCtx))); IsErrRevMismatch(err) {
115191
return nil
116192
} else if err != nil {
117193
return fmt.Errorf("commit: pause recovery state: %w", err)
@@ -125,7 +201,7 @@ func (r *Recoverer) Shutdown(ctx context.Context) error {
125201
}
126202
}
127203

128-
type Stats struct {
204+
type RecovererStats struct {
129205
HeadRev int64
130206
HeadTime time.Time
131207
TailRev int64
@@ -140,11 +216,11 @@ type Stats struct {
140216
Active bool
141217
}
142218

143-
func (r *Recoverer) Stats() Stats {
219+
func (r *Recoverer) Stats() RecovererStats {
144220
r.mux.Lock()
145221
defer r.mux.Unlock()
146222

147-
return Stats{
223+
return RecovererStats{
148224
HeadRev: r.headRev,
149225
HeadTime: r.headTime,
150226
TailRev: r.tailRev,
@@ -198,7 +274,7 @@ func (r *Recoverer) doUpdateHead(dur time.Duration) error {
198274
// return nil
199275
//}
200276

201-
getManyCmd := flowstate.GetManyByLabels(nil).WithSinceRev(r.sinceRev)
277+
getManyCmd := GetManyByLabels(nil).WithSinceRev(r.sinceRev)
202278
if err := r.e.Do(getManyCmd); err != nil {
203279
return fmt.Errorf("get many states: %w; since_rev=%d", err, r.sinceRev)
204280
}
@@ -211,16 +287,16 @@ func (r *Recoverer) doUpdateHead(dur time.Duration) error {
211287
for _, state := range res.States {
212288
r.sinceRev = state.Rev
213289

214-
if state.ID == stateID {
290+
if state.ID == recoveryStateID {
215291
r.recoveryStateCtx = state.CopyToCtx(r.recoveryStateCtx)
216292
if state.Rev > r.recoveryStateCtx.Committed.Rev {
217-
active := flowstate.Paused(state)
293+
active := Paused(state)
218294
r.reset(r.recoveryStateCtx, active)
219295
}
220296
continue
221297
}
222298

223-
if !enabled(state) {
299+
if !recoveryEnabled(state) {
224300
continue
225301
}
226302

@@ -234,15 +310,15 @@ func (r *Recoverer) doUpdateHead(dur time.Duration) error {
234310
if !r.active {
235311
continue
236312
}
237-
if flowstate.Ended(state) || flowstate.Paused(state) {
313+
if Ended(state) || Paused(state) {
238314
delete(r.states, state.ID)
239315
r.completed++
240316

241317
continue
242318
}
243319

244320
r.states[state.ID] = retryableState{
245-
State: state.CopyTo(&flowstate.State{}),
321+
State: state.CopyTo(&State{}),
246322
retryAt: retryAt(state),
247323
}
248324
r.added++
@@ -295,10 +371,10 @@ func (r *Recoverer) doUpdateTail() error {
295371

296372
if !r.active {
297373
commitedAt := r.recoveryStateCtx.Committed.CommittedAt()
298-
if (commitedAt.Add(MaxRetryAfter+time.Minute).Before(time.Now()) && r.nextSinceRev() > getSinceRev(r.recoveryStateCtx)) ||
299-
flowstate.Paused(r.recoveryStateCtx.Current) {
300-
nextRecoveryStateCtx := r.recoveryStateCtx.CopyTo(&flowstate.StateCtx{})
301-
if err := r.e.Do(flowstate.Commit(flowstate.Resume(r.recoveryStateCtx))); flowstate.IsErrRevMismatch(err) {
374+
if (commitedAt.Add(MaxRetryAfter+time.Minute).Before(time.Now()) && r.nextSinceRev() > getRecoverySinceRev(r.recoveryStateCtx)) ||
375+
Paused(r.recoveryStateCtx.Current) {
376+
nextRecoveryStateCtx := r.recoveryStateCtx.CopyTo(&StateCtx{})
377+
if err := r.e.Do(Commit(Resume(r.recoveryStateCtx))); IsErrRevMismatch(err) {
302378
r.reset(r.recoveryStateCtx, false)
303379
return nil
304380
} else if err != nil {
@@ -331,12 +407,12 @@ func (r *Recoverer) doUpdateTail() error {
331407
r.tailTime = tailTime
332408

333409
commitedAt := r.recoveryStateCtx.Committed.CommittedAt()
334-
if r.tailRev > getSinceRev(r.recoveryStateCtx)+1000 ||
335-
(commitedAt.Add(MaxRetryAfter).Before(now) && r.nextSinceRev() > getSinceRev(r.recoveryStateCtx)) {
336-
nextStateCtx := r.recoveryStateCtx.CopyTo(&flowstate.StateCtx{})
410+
if r.tailRev > getRecoverySinceRev(r.recoveryStateCtx)+1000 ||
411+
(commitedAt.Add(MaxRetryAfter).Before(now) && r.nextSinceRev() > getRecoverySinceRev(r.recoveryStateCtx)) {
412+
nextStateCtx := r.recoveryStateCtx.CopyTo(&StateCtx{})
337413

338-
setSinceRev(nextStateCtx, r.nextSinceRev())
339-
if err := r.e.Do(flowstate.Commit(flowstate.Resume(nextStateCtx))); flowstate.IsErrRevMismatch(err) {
414+
setRecoverySinceRev(nextStateCtx, r.nextSinceRev())
415+
if err := r.e.Do(Commit(Resume(nextStateCtx))); IsErrRevMismatch(err) {
340416
r.reset(r.recoveryStateCtx, false)
341417
return nil
342418
} else if err != nil {
@@ -355,25 +431,25 @@ func (r *Recoverer) doRetry() error {
355431
return nil
356432
}
357433

358-
states := make([]flowstate.State, 0)
434+
states := make([]State, 0)
359435

360436
for id, retState := range r.states {
361437
if retState.retryAt.After(r.headTime) {
362438
continue
363439
}
364440

365-
states = append(states, retState.State.CopyTo(&flowstate.State{}))
441+
states = append(states, retState.State.CopyTo(&State{}))
366442
delete(r.states, id)
367443
}
368444

369445
for _, state := range states {
370-
maxAttempts := MaxAttempts(state)
371-
attempt := Attempt(state) + 1
372-
stateCtx := state.CopyToCtx(&flowstate.StateCtx{})
446+
maxAttempts := MaxRecoveryAttempts(state)
447+
attempt := RecoveryAttempt(state) + 1
448+
stateCtx := state.CopyToCtx(&StateCtx{})
373449

374-
setAttempt(stateCtx, attempt)
450+
setRecoveryAttempt(stateCtx, attempt)
375451
if attempt > maxAttempts {
376-
if err := r.e.Do(flowstate.Commit(flowstate.End(stateCtx))); flowstate.IsErrRevMismatch(err) {
452+
if err := r.e.Do(Commit(End(stateCtx))); IsErrRevMismatch(err) {
377453
continue
378454
} else if err != nil {
379455
return fmt.Errorf("commit state %s:%d reached max retry attempts %d and forcfully ended: %s", state.ID, state.Rev, maxAttempts, err)
@@ -384,9 +460,9 @@ func (r *Recoverer) doRetry() error {
384460
}
385461

386462
if err := r.e.Do(
387-
flowstate.Commit(flowstate.CommitStateCtx(stateCtx)),
388-
flowstate.Execute(stateCtx),
389-
); flowstate.IsErrRevMismatch(err) {
463+
Commit(CommitStateCtx(stateCtx)),
464+
Execute(stateCtx),
465+
); IsErrRevMismatch(err) {
390466
continue
391467
} else if err != nil {
392468
return fmt.Errorf("commit state %s:%d recovery attempt %d: %s", state.ID, state.Rev, attempt, err)
@@ -398,17 +474,17 @@ func (r *Recoverer) doRetry() error {
398474
return nil
399475
}
400476

401-
func (r *Recoverer) reset(recoveryStateCtx *flowstate.StateCtx, active bool) {
477+
func (r *Recoverer) reset(recoveryStateCtx *StateCtx, active bool) {
402478
r.active = active
403-
r.recoveryStateCtx = recoveryStateCtx.CopyTo(&flowstate.StateCtx{})
479+
r.recoveryStateCtx = recoveryStateCtx.CopyTo(&StateCtx{})
404480

405-
r.sinceRev = getSinceRev(r.recoveryStateCtx)
481+
r.sinceRev = getRecoverySinceRev(r.recoveryStateCtx)
406482
r.headRev = r.sinceRev
407483
r.headTime = time.Time{}
408484
r.tailRev = r.headRev
409485
r.tailTime = time.Time{}
410486

411-
r.states = make(map[flowstate.StateID]retryableState)
487+
r.states = make(map[StateID]retryableState)
412488
}
413489

414490
func (r *Recoverer) nextSinceRev() int64 {
@@ -419,7 +495,7 @@ func (r *Recoverer) nextSinceRev() int64 {
419495
return r.tailRev - 1
420496
}
421497

422-
func getSinceRev(stateCtx *flowstate.StateCtx) int64 {
498+
func getRecoverySinceRev(stateCtx *StateCtx) int64 {
423499
sinceRevStr := stateCtx.Current.Annotations[`flowstate.recovery.since_rev`]
424500
if sinceRevStr == "" {
425501
return 0
@@ -429,11 +505,11 @@ func getSinceRev(stateCtx *flowstate.StateCtx) int64 {
429505
return sinceRev
430506
}
431507

432-
func setSinceRev(stateCtx *flowstate.StateCtx, sinceRev int64) {
508+
func setRecoverySinceRev(stateCtx *StateCtx, sinceRev int64) {
433509
stateCtx.Current.SetAnnotation(`flowstate.recovery.since_rev`, strconv.FormatInt(sinceRev, 10))
434510
}
435511

436512
type retryableState struct {
437-
flowstate.State
513+
State
438514
retryAt time.Time
439515
}

0 commit comments

Comments
 (0)