diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index ec46b14..e0ba660 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -11,9 +11,51 @@ on: jobs: + run-examples: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.24' + + - name: Examples + run: | + set -e # Exit on any error + + for file in examples/*.go; do + if [[ "$file" == "examples/setup.go" ]]; then + continue + fi + + echo "Running $file" + go run "$file" examples/setup.go + echo "----------------------" + echo + done + build: runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.24' + + - name: build + run: | + rm -rf examples/ + go build -v ./... + + test: + runs-on: ubuntu-latest + services: postgres: image: postgres:16 @@ -37,9 +79,6 @@ jobs: with: go-version: '1.24' - - name: Build - run: go build ./... - - name: Test run: | until pg_isready -h 127.0.0.1 -p 5432 -U user; do @@ -47,19 +86,5 @@ jobs: sleep 3; done + rm -rf examples/ GOEXPERIMENT=synctest GODEBUG=asynctimerchan=0 go test -v ./... - - - name: Examples - run: | - set -e # Exit on any error - - for file in examples/*.go; do - if [[ "$file" == "examples/setup.go" ]]; then - continue - fi - - echo "Running $file" - go run "$file" examples/setup.go - echo "----------------------" - echo - done diff --git a/netflow/flow.go b/netflow/flow.go new file mode 100644 index 0000000..b7c94eb --- /dev/null +++ b/netflow/flow.go @@ -0,0 +1,89 @@ +package netflow + +import ( + "bytes" + "fmt" + "io" + "net/http" + "strings" + + "github.com/VictoriaMetrics/easyproto" + "github.com/makasim/flowstate" +) + +type Config struct { +} + +type Flow struct { + httpHost string + + c *http.Client +} + +func New(httpHost string) *Flow { + return &Flow{ + httpHost: httpHost, + + c: &http.Client{}, + } +} + +func (f *Flow) Execute(stateCtx *flowstate.StateCtx, _ flowstate.Engine) (flowstate.Command, error) { + b := flowstate.MarshalStateCtx(stateCtx, nil) + req, err := http.NewRequest(`POST`, strings.TrimRight(f.httpHost, `/`)+`/flowstate.v1.Flow/Execute`, bytes.NewBuffer(b)) + if err != nil { + return nil, fmt.Errorf("new request: %w", err) + } + + resp, err := f.c.Do(req) + if err != nil { + return nil, fmt.Errorf("do request: %w", err) + } + defer resp.Body.Close() + + b, err = io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read response body: %w", err) + } + + if http.StatusOK != resp.StatusCode { + code, message, err := unmarshalError(b) + if err != nil { + return nil, fmt.Errorf("response status code: %d; unmarshal error: %s", resp.StatusCode, err) + } + + return nil, fmt.Errorf("%s: %s", code, message) + } + + cmd, err := flowstate.UnmarshalCommand(b) + if err != nil { + return nil, fmt.Errorf("unmarshal response: %w", err) + } + return cmd, nil +} + +func unmarshalError(src []byte) (code, message string, err error) { + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return "", "", fmt.Errorf("cannot read next field") + } + switch fc.FieldNum { + case 1: + v, ok := fc.String() + if !ok { + return "", "", fmt.Errorf("cannot read code field") + } + code = strings.Clone(v) + case 2: + v, ok := fc.String() + if !ok { + return "", "", fmt.Errorf("cannot read message field") + } + message = strings.Clone(v) + } + } + + return code, message, nil +} diff --git a/netflow/flow_test.go b/netflow/flow_test.go new file mode 100644 index 0000000..e5fe377 --- /dev/null +++ b/netflow/flow_test.go @@ -0,0 +1,109 @@ +package netflow_test + +import ( + "fmt" + "log/slog" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/makasim/flowstate" + "github.com/makasim/flowstate/memdriver" + "github.com/makasim/flowstate/netflow" + "github.com/thejerf/slogassert" +) + +func TestFlowExecute(t *testing.T) { + lh := slogassert.New(t, slog.LevelDebug, nil) + l := slog.New(slogassert.New(t, slog.LevelDebug, lh)) + + d := memdriver.New(l) + executedCh := make(chan struct{}) + if err := d.SetFlow(`aFlow`, flowstate.FlowFunc(func(stateCtx *flowstate.StateCtx, _ flowstate.Engine) (flowstate.Command, error) { + close(executedCh) + return flowstate.End(stateCtx), nil + })); err != nil { + t.Fatalf("failed to set flow: %v", err) + } + + e, err := flowstate.NewEngine(d, l) + if err != nil { + t.Fatalf("failed to create flowstate engine: %v", err) + } + + srv := startSrv(t, e) + + f := netflow.New(srv.URL) + + stateCtx := &flowstate.StateCtx{ + Current: flowstate.State{ + ID: `anID`, + Transition: flowstate.Transition{ + To: `aFlow`, + }, + }, + } + + cmd0, err := f.Execute(stateCtx, e) + if err != nil { + t.Fatalf("failed to execute flow: %v", err) + } + if cmd0 == nil { + t.Fatal("expected command, got nil") + } + + cmd, ok := cmd0.(*flowstate.NoopCommand) + if !ok { + t.Fatalf("expected NoopCommand, got %T", cmd0) + } + if cmd.StateCtx.Current.ID != `anID` { + t.Fatalf("expected state ID to be 'anID', got '%s'", cmd.StateCtx.Current.ID) + } + + select { + case <-executedCh: + // OK + case <-time.After(time.Second * 3): + t.Fatal("remote flow was not executed") + } +} + +func startSrv(t *testing.T, e flowstate.Engine) *httptest.Server { + + srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + if r.URL.Path == `/health` { + rw.WriteHeader(http.StatusOK) + return + } + if netflow.HandleExecute(rw, r, e) { + return + } + + http.Error(rw, fmt.Sprintf("path %s not supported", r.URL.Path), http.StatusNotFound) + })) + + t.Cleanup(srv.Close) + + timeoutT := time.NewTimer(time.Second) + defer timeoutT.Stop() + readyT := time.NewTicker(time.Millisecond * 50) + defer readyT.Stop() + +loop: + for { + select { + case <-timeoutT.C: + t.Fatalf("app not ready within %s", time.Second) + case <-readyT.C: + + resp, err := http.Get(srv.URL + `/health`) + if err != nil { + continue loop + } + resp.Body.Close() + + return srv + } + } +} diff --git a/netflow/server.go b/netflow/server.go new file mode 100644 index 0000000..9ceec44 --- /dev/null +++ b/netflow/server.go @@ -0,0 +1,72 @@ +package netflow + +import ( + "fmt" + "io" + "net/http" + + "github.com/VictoriaMetrics/easyproto" + "github.com/makasim/flowstate" +) + +func HandleExecute(rw http.ResponseWriter, r *http.Request, e flowstate.Engine) bool { + if r.URL.Path != "/flowstate.v1.Flow/Execute" { + return false + } + + reqBody, err := io.ReadAll(r.Body) + if err != nil { + writeInvalidArgumentError(rw, "failed to read request body: "+err.Error()) + return true + } + + stateCtx := &flowstate.StateCtx{} + if err := flowstate.UnmarshalStateCtx(reqBody, stateCtx); err != nil { + writeInvalidArgumentError(rw, "failed to unmarshal command: "+err.Error()) + return true + } + resStateCtx := stateCtx.CopyTo(&flowstate.StateCtx{}) + + if err := e.Do(flowstate.Execute(stateCtx)); err != nil { + writeUnknownError(rw, fmt.Sprintf("failed to execute command: %s", err.Error())) + return true + } + + writeCmd(rw, flowstate.Noop(resStateCtx)) + return true +} + +func writeCmd(rw http.ResponseWriter, cmd flowstate.Command) { + rw.Header().Set("Content-Type", "application/proto") + rw.WriteHeader(http.StatusOK) + + _, _ = rw.Write(flowstate.MarshalCommand(cmd, nil)) +} + +func writeUnknownError(rw http.ResponseWriter, message string) { + rw.Header().Set("Content-Type", "application/proto") + rw.WriteHeader(http.StatusInternalServerError) + + _, _ = rw.Write(marshalError("unknown", message)) +} + +func writeInvalidArgumentError(rw http.ResponseWriter, message string) { + rw.Header().Set("Content-Type", "application/proto") + rw.WriteHeader(http.StatusBadRequest) + + _, _ = rw.Write(marshalError("invalid_argument", message)) +} + +func marshalError(code, message string) []byte { + m := &easyproto.Marshaler{} + mm := m.MessageMarshaler() + + if code != "" { + mm.AppendString(1, code) + } + if message != "" { + mm.AppendString(2, message) + } + + return m.Marshal(nil) +}