Skip to content

Commit 5948f08

Browse files
committed
netflow: add slow path to Flow func
for case where flow has not been yet synced. follow up on #85
1 parent bee733c commit 5948f08

File tree

2 files changed

+85
-51
lines changed

2 files changed

+85
-51
lines changed

netflow/registry.go

Lines changed: 51 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type Registry struct {
2020

2121
hostsFlowsMux sync.Mutex
2222
hostFlows map[string]*Flow
23+
notFoundFlows map[string]struct{}
2324
closeCh chan struct{}
2425
closedCh chan struct{}
2526

@@ -43,7 +44,20 @@ func NewRegistry(httpHost string, d flowstate.Driver, l *slog.Logger) *Registry
4344
}
4445

4546
func (fr *Registry) Flow(id flowstate.TransitionID) (flowstate.Flow, error) {
46-
return fr.fr.Flow(id)
47+
f, err := fr.fr.Flow(id)
48+
49+
// slow path, flow not found locally, might not synced yet
50+
if errors.Is(err, flowstate.ErrFlowNotFound) {
51+
stateCtx := &flowstate.StateCtx{}
52+
if err := fr.d.GetStateByID(flowstate.GetStateByID(stateCtx, flowStateID(id), 0)); err != nil {
53+
return nil, err
54+
}
55+
56+
fr.setFlow(stateCtx.Current)
57+
return fr.fr.Flow(id)
58+
}
59+
60+
return f, nil
4761
}
4862

4963
func (fr *Registry) SetFlow(id flowstate.TransitionID, flow flowstate.Flow) error {
@@ -162,52 +176,54 @@ func (fr *Registry) sync() bool {
162176

163177
for _, state := range res.States {
164178
fr.sinceRev = state.Rev
179+
fr.setFlow(state)
180+
}
165181

166-
if state.Annotations[`flowstate.flow.http_host`] == `` {
167-
fr.l.Warn("state has no 'flowstate.flow.http_host' annotation set, skipping", "state_id", state.ID, "state_rev", state.Rev)
168-
continue
169-
}
170-
httpHost := state.Annotations[`flowstate.flow.http_host`]
182+
return res.More
183+
}
171184

172-
// local flow, skip
173-
if httpHost == fr.httpHost {
174-
continue
175-
}
185+
func (fr *Registry) setFlow(state flowstate.State) {
186+
if state.Annotations[`flowstate.flow.http_host`] == `` {
187+
fr.l.Warn("state has no 'flowstate.flow.http_host' annotation set, skipping", "state_id", state.ID, "state_rev", state.Rev)
188+
}
189+
httpHost := state.Annotations[`flowstate.flow.http_host`]
176190

177-
if state.Annotations[`flowstate.flow.transition_id`] == `` {
178-
fr.l.Warn("flow state has no 'flowstate.flow.transition_id' annotation set, skipping", "state_id", state.ID, "state_rev", state.Rev)
179-
continue
180-
}
181-
tsID := flowstate.TransitionID(state.Annotations[`flowstate.flow.transition_id`])
191+
// local flow, skip
192+
if httpHost == fr.httpHost {
193+
return
194+
}
182195

183-
if flowstate.Ended(state) {
184-
if err := fr.fr.UnsetFlow(tsID); err != nil {
185-
fr.l.Warn("flow registry: unset flow failed", "error", err, "transition_id", tsID, "state_id", state.ID, "state_rev", state.Rev)
186-
}
196+
if state.Annotations[`flowstate.flow.transition_id`] == `` {
197+
fr.l.Warn("flow state has no 'flowstate.flow.transition_id' annotation set, skipping", "state_id", state.ID, "state_rev", state.Rev)
198+
return
199+
}
200+
tsID := flowstate.TransitionID(state.Annotations[`flowstate.flow.transition_id`])
187201

188-
continue
202+
if flowstate.Ended(state) {
203+
if err := fr.fr.UnsetFlow(tsID); err != nil {
204+
fr.l.Warn("flow registry: unset flow failed", "error", err, "transition_id", tsID, "state_id", state.ID, "state_rev", state.Rev)
189205
}
190206

191-
fr.hostsFlowsMux.Lock()
192-
193-
if fr.hostFlows == nil {
194-
fr.hostFlows = make(map[string]*Flow)
195-
}
207+
return
208+
}
196209

197-
f, ok := fr.hostFlows[httpHost]
198-
if !ok {
199-
f = New(httpHost)
200-
fr.hostFlows[httpHost] = f
201-
}
210+
fr.hostsFlowsMux.Lock()
202211

203-
fr.hostsFlowsMux.Unlock()
212+
if fr.hostFlows == nil {
213+
fr.hostFlows = make(map[string]*Flow)
214+
}
204215

205-
if err := fr.fr.SetFlow(tsID, f); err != nil {
206-
fr.l.Warn("flow registry: set flow failed", "error", err, "transition_id", tsID, "state_id", state.ID, "state_rev", state.Rev)
207-
}
216+
f, ok := fr.hostFlows[httpHost]
217+
if !ok {
218+
f = New(httpHost)
219+
fr.hostFlows[httpHost] = f
208220
}
209221

210-
return res.More
222+
fr.hostsFlowsMux.Unlock()
223+
224+
if err := fr.fr.SetFlow(tsID, f); err != nil {
225+
fr.l.Warn("flow registry: set flow failed", "error", err, "transition_id", tsID, "state_id", state.ID, "state_rev", state.Rev)
226+
}
211227
}
212228

213229
func flowStateID(tsID flowstate.TransitionID) flowstate.StateID {

netflow/registry_synctest_test.go

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,22 +44,7 @@ func TestRegistry(t *testing.T) {
4444
})); err != nil {
4545
t.Fatalf("expected no error, got %v", err)
4646
}
47-
48-
// only local flows should be available
49-
50-
if _, err := firstFR.Flow(`aFlowOnFirstFR`); err != nil {
51-
t.Fatalf("expected no error, got %v", err)
52-
}
53-
if _, err := firstFR.Flow(`aFlowOnSecondFR`); err == nil {
54-
t.Fatalf("expected no error, got %v", err)
55-
}
56-
if _, err := secondFR.Flow(`aFlowOnSecondFR`); err != nil {
57-
t.Fatalf("expected no error, got %v", err)
58-
}
59-
if _, err := secondFR.Flow(`aFlowOnFirstFR`); err == nil {
60-
t.Fatalf("expected no error, got %v", err)
61-
}
62-
47+
6348
time.Sleep(time.Second * 7)
6449

6550
// now we should be able to get all flows
@@ -143,3 +128,36 @@ func TestRegistry(t *testing.T) {
143128
}
144129
})
145130
}
131+
132+
func TestRegistry_Flow_SlowPath(t *testing.T) {
133+
synctest.Run(func() {
134+
lh := slogassert.New(t, slog.LevelDebug, nil)
135+
l := slog.New(slogassert.New(t, slog.LevelDebug, lh))
136+
l = slog.Default()
137+
138+
d := memdriver.New(l)
139+
140+
fr := netflow.NewRegistry(`http://aHost:8080`, d, l)
141+
defer fr.Close()
142+
143+
// make sure we skipped the first round of Registry.watchFlows call
144+
time.Sleep(time.Second * 5)
145+
146+
// simulate a flow registered by someone else, but not yet available in the registry
147+
stateCtx := &flowstate.StateCtx{
148+
Current: flowstate.State{
149+
ID: `flowstate.flow.aFlowID`,
150+
},
151+
}
152+
stateCtx.Current.SetLabel(`flow.type`, `remote`)
153+
stateCtx.Current.SetAnnotation(`flowstate.flow.transition_id`, `aFlowID`)
154+
stateCtx.Current.SetAnnotation(`flowstate.flow.http_host`, `http://anotherHost:8080`)
155+
if err := d.Commit(flowstate.Commit(flowstate.Pause(stateCtx))); err != nil {
156+
t.Fatalf("expected no error, got %v", err)
157+
}
158+
159+
if _, err := fr.Flow(`aFlowID`); err != nil {
160+
t.Fatalf("expected no error, got %v", err)
161+
}
162+
})
163+
}

0 commit comments

Comments
 (0)