Skip to content

Commit 57d5769

Browse files
authored
Merge pull request #84 from makasim/netflow
Flow execute over network
2 parents 0469cae + e6cfc62 commit 57d5769

File tree

4 files changed

+313
-18
lines changed

4 files changed

+313
-18
lines changed

.github/workflows/go.yml

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,51 @@ on:
1111

1212
jobs:
1313

14+
run-examples:
15+
runs-on: ubuntu-latest
16+
17+
steps:
18+
- uses: actions/checkout@v4
19+
20+
- name: Set up Go
21+
uses: actions/setup-go@v4
22+
with:
23+
go-version: '1.24'
24+
25+
- name: Examples
26+
run: |
27+
set -e # Exit on any error
28+
29+
for file in examples/*.go; do
30+
if [[ "$file" == "examples/setup.go" ]]; then
31+
continue
32+
fi
33+
34+
echo "Running $file"
35+
go run "$file" examples/setup.go
36+
echo "----------------------"
37+
echo
38+
done
39+
1440
build:
1541
runs-on: ubuntu-latest
1642

43+
steps:
44+
- uses: actions/checkout@v4
45+
46+
- name: Set up Go
47+
uses: actions/setup-go@v4
48+
with:
49+
go-version: '1.24'
50+
51+
- name: build
52+
run: |
53+
rm -rf examples/
54+
go build -v ./...
55+
56+
test:
57+
runs-on: ubuntu-latest
58+
1759
services:
1860
postgres:
1961
image: postgres:16
@@ -37,29 +79,12 @@ jobs:
3779
with:
3880
go-version: '1.24'
3981

40-
- name: Build
41-
run: go build ./...
42-
4382
- name: Test
4483
run: |
4584
until pg_isready -h 127.0.0.1 -p 5432 -U user; do
4685
echo "Waiting for Postgres...";
4786
sleep 3;
4887
done
4988
89+
rm -rf examples/
5090
GOEXPERIMENT=synctest GODEBUG=asynctimerchan=0 go test -v ./...
51-
52-
- name: Examples
53-
run: |
54-
set -e # Exit on any error
55-
56-
for file in examples/*.go; do
57-
if [[ "$file" == "examples/setup.go" ]]; then
58-
continue
59-
fi
60-
61-
echo "Running $file"
62-
go run "$file" examples/setup.go
63-
echo "----------------------"
64-
echo
65-
done

netflow/flow.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package netflow
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"strings"
9+
10+
"github.com/VictoriaMetrics/easyproto"
11+
"github.com/makasim/flowstate"
12+
)
13+
14+
type Config struct {
15+
}
16+
17+
type Flow struct {
18+
httpHost string
19+
20+
c *http.Client
21+
}
22+
23+
func New(httpHost string) *Flow {
24+
return &Flow{
25+
httpHost: httpHost,
26+
27+
c: &http.Client{},
28+
}
29+
}
30+
31+
func (f *Flow) Execute(stateCtx *flowstate.StateCtx, _ flowstate.Engine) (flowstate.Command, error) {
32+
b := flowstate.MarshalStateCtx(stateCtx, nil)
33+
req, err := http.NewRequest(`POST`, strings.TrimRight(f.httpHost, `/`)+`/flowstate.v1.Flow/Execute`, bytes.NewBuffer(b))
34+
if err != nil {
35+
return nil, fmt.Errorf("new request: %w", err)
36+
}
37+
38+
resp, err := f.c.Do(req)
39+
if err != nil {
40+
return nil, fmt.Errorf("do request: %w", err)
41+
}
42+
defer resp.Body.Close()
43+
44+
b, err = io.ReadAll(resp.Body)
45+
if err != nil {
46+
return nil, fmt.Errorf("read response body: %w", err)
47+
}
48+
49+
if http.StatusOK != resp.StatusCode {
50+
code, message, err := unmarshalError(b)
51+
if err != nil {
52+
return nil, fmt.Errorf("response status code: %d; unmarshal error: %s", resp.StatusCode, err)
53+
}
54+
55+
return nil, fmt.Errorf("%s: %s", code, message)
56+
}
57+
58+
cmd, err := flowstate.UnmarshalCommand(b)
59+
if err != nil {
60+
return nil, fmt.Errorf("unmarshal response: %w", err)
61+
}
62+
return cmd, nil
63+
}
64+
65+
func unmarshalError(src []byte) (code, message string, err error) {
66+
var fc easyproto.FieldContext
67+
for len(src) > 0 {
68+
src, err = fc.NextField(src)
69+
if err != nil {
70+
return "", "", fmt.Errorf("cannot read next field")
71+
}
72+
switch fc.FieldNum {
73+
case 1:
74+
v, ok := fc.String()
75+
if !ok {
76+
return "", "", fmt.Errorf("cannot read code field")
77+
}
78+
code = strings.Clone(v)
79+
case 2:
80+
v, ok := fc.String()
81+
if !ok {
82+
return "", "", fmt.Errorf("cannot read message field")
83+
}
84+
message = strings.Clone(v)
85+
}
86+
}
87+
88+
return code, message, nil
89+
}

netflow/flow_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package netflow_test
2+
3+
import (
4+
"fmt"
5+
"log/slog"
6+
"net/http"
7+
"net/http/httptest"
8+
"testing"
9+
"time"
10+
11+
"github.com/makasim/flowstate"
12+
"github.com/makasim/flowstate/memdriver"
13+
"github.com/makasim/flowstate/netflow"
14+
"github.com/thejerf/slogassert"
15+
)
16+
17+
func TestFlowExecute(t *testing.T) {
18+
lh := slogassert.New(t, slog.LevelDebug, nil)
19+
l := slog.New(slogassert.New(t, slog.LevelDebug, lh))
20+
21+
d := memdriver.New(l)
22+
executedCh := make(chan struct{})
23+
if err := d.SetFlow(`aFlow`, flowstate.FlowFunc(func(stateCtx *flowstate.StateCtx, _ flowstate.Engine) (flowstate.Command, error) {
24+
close(executedCh)
25+
return flowstate.End(stateCtx), nil
26+
})); err != nil {
27+
t.Fatalf("failed to set flow: %v", err)
28+
}
29+
30+
e, err := flowstate.NewEngine(d, l)
31+
if err != nil {
32+
t.Fatalf("failed to create flowstate engine: %v", err)
33+
}
34+
35+
srv := startSrv(t, e)
36+
37+
f := netflow.New(srv.URL)
38+
39+
stateCtx := &flowstate.StateCtx{
40+
Current: flowstate.State{
41+
ID: `anID`,
42+
Transition: flowstate.Transition{
43+
To: `aFlow`,
44+
},
45+
},
46+
}
47+
48+
cmd0, err := f.Execute(stateCtx, e)
49+
if err != nil {
50+
t.Fatalf("failed to execute flow: %v", err)
51+
}
52+
if cmd0 == nil {
53+
t.Fatal("expected command, got nil")
54+
}
55+
56+
cmd, ok := cmd0.(*flowstate.NoopCommand)
57+
if !ok {
58+
t.Fatalf("expected NoopCommand, got %T", cmd0)
59+
}
60+
if cmd.StateCtx.Current.ID != `anID` {
61+
t.Fatalf("expected state ID to be 'anID', got '%s'", cmd.StateCtx.Current.ID)
62+
}
63+
64+
select {
65+
case <-executedCh:
66+
// OK
67+
case <-time.After(time.Second * 3):
68+
t.Fatal("remote flow was not executed")
69+
}
70+
}
71+
72+
func startSrv(t *testing.T, e flowstate.Engine) *httptest.Server {
73+
74+
srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
75+
if r.URL.Path == `/health` {
76+
rw.WriteHeader(http.StatusOK)
77+
return
78+
}
79+
if netflow.HandleExecute(rw, r, e) {
80+
return
81+
}
82+
83+
http.Error(rw, fmt.Sprintf("path %s not supported", r.URL.Path), http.StatusNotFound)
84+
}))
85+
86+
t.Cleanup(srv.Close)
87+
88+
timeoutT := time.NewTimer(time.Second)
89+
defer timeoutT.Stop()
90+
readyT := time.NewTicker(time.Millisecond * 50)
91+
defer readyT.Stop()
92+
93+
loop:
94+
for {
95+
select {
96+
case <-timeoutT.C:
97+
t.Fatalf("app not ready within %s", time.Second)
98+
case <-readyT.C:
99+
100+
resp, err := http.Get(srv.URL + `/health`)
101+
if err != nil {
102+
continue loop
103+
}
104+
resp.Body.Close()
105+
106+
return srv
107+
}
108+
}
109+
}

netflow/server.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package netflow
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net/http"
7+
8+
"github.com/VictoriaMetrics/easyproto"
9+
"github.com/makasim/flowstate"
10+
)
11+
12+
func HandleExecute(rw http.ResponseWriter, r *http.Request, e flowstate.Engine) bool {
13+
if r.URL.Path != "/flowstate.v1.Flow/Execute" {
14+
return false
15+
}
16+
17+
reqBody, err := io.ReadAll(r.Body)
18+
if err != nil {
19+
writeInvalidArgumentError(rw, "failed to read request body: "+err.Error())
20+
return true
21+
}
22+
23+
stateCtx := &flowstate.StateCtx{}
24+
if err := flowstate.UnmarshalStateCtx(reqBody, stateCtx); err != nil {
25+
writeInvalidArgumentError(rw, "failed to unmarshal command: "+err.Error())
26+
return true
27+
}
28+
resStateCtx := stateCtx.CopyTo(&flowstate.StateCtx{})
29+
30+
if err := e.Do(flowstate.Execute(stateCtx)); err != nil {
31+
writeUnknownError(rw, fmt.Sprintf("failed to execute command: %s", err.Error()))
32+
return true
33+
}
34+
35+
writeCmd(rw, flowstate.Noop(resStateCtx))
36+
return true
37+
}
38+
39+
func writeCmd(rw http.ResponseWriter, cmd flowstate.Command) {
40+
rw.Header().Set("Content-Type", "application/proto")
41+
rw.WriteHeader(http.StatusOK)
42+
43+
_, _ = rw.Write(flowstate.MarshalCommand(cmd, nil))
44+
}
45+
46+
func writeUnknownError(rw http.ResponseWriter, message string) {
47+
rw.Header().Set("Content-Type", "application/proto")
48+
rw.WriteHeader(http.StatusInternalServerError)
49+
50+
_, _ = rw.Write(marshalError("unknown", message))
51+
}
52+
53+
func writeInvalidArgumentError(rw http.ResponseWriter, message string) {
54+
rw.Header().Set("Content-Type", "application/proto")
55+
rw.WriteHeader(http.StatusBadRequest)
56+
57+
_, _ = rw.Write(marshalError("invalid_argument", message))
58+
}
59+
60+
func marshalError(code, message string) []byte {
61+
m := &easyproto.Marshaler{}
62+
mm := m.MessageMarshaler()
63+
64+
if code != "" {
65+
mm.AppendString(1, code)
66+
}
67+
if message != "" {
68+
mm.AppendString(2, message)
69+
}
70+
71+
return m.Marshal(nil)
72+
}

0 commit comments

Comments
 (0)