Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 43 additions & 18 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,51 @@ on:

jobs:

run-examples:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.24'

- name: Examples
run: |
set -e # Exit on any error
for file in examples/*.go; do
if [[ "$file" == "examples/setup.go" ]]; then
continue
fi
echo "Running $file"
go run "$file" examples/setup.go
echo "----------------------"
echo
done
build:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.24'

- name: build
run: |
rm -rf examples/
go build -v ./...
test:
runs-on: ubuntu-latest

services:
postgres:
image: postgres:16
Expand All @@ -37,29 +79,12 @@ jobs:
with:
go-version: '1.24'

- name: Build
run: go build ./...

- name: Test
run: |
until pg_isready -h 127.0.0.1 -p 5432 -U user; do
echo "Waiting for Postgres...";
sleep 3;
done
rm -rf examples/
GOEXPERIMENT=synctest GODEBUG=asynctimerchan=0 go test -v ./...
- name: Examples
run: |
set -e # Exit on any error
for file in examples/*.go; do
if [[ "$file" == "examples/setup.go" ]]; then
continue
fi

echo "Running $file"
go run "$file" examples/setup.go
echo "----------------------"
echo
done
89 changes: 89 additions & 0 deletions netflow/flow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package netflow

import (
"bytes"
"fmt"
"io"
"net/http"
"strings"

"github.com/VictoriaMetrics/easyproto"
"github.com/makasim/flowstate"
)

type Config struct {
}

type Flow struct {
httpHost string

c *http.Client
}

func New(httpHost string) *Flow {
return &Flow{
httpHost: httpHost,

c: &http.Client{},
}
}

func (f *Flow) Execute(stateCtx *flowstate.StateCtx, _ flowstate.Engine) (flowstate.Command, error) {
b := flowstate.MarshalStateCtx(stateCtx, nil)
req, err := http.NewRequest(`POST`, strings.TrimRight(f.httpHost, `/`)+`/flowstate.v1.Flow/Execute`, bytes.NewBuffer(b))
if err != nil {
return nil, fmt.Errorf("new request: %w", err)
}

resp, err := f.c.Do(req)
if err != nil {
return nil, fmt.Errorf("do request: %w", err)
}
defer resp.Body.Close()

b, err = io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("read response body: %w", err)
}

if http.StatusOK != resp.StatusCode {
code, message, err := unmarshalError(b)
if err != nil {
return nil, fmt.Errorf("response status code: %d; unmarshal error: %s", resp.StatusCode, err)
}

return nil, fmt.Errorf("%s: %s", code, message)
}

cmd, err := flowstate.UnmarshalCommand(b)
if err != nil {
return nil, fmt.Errorf("unmarshal response: %w", err)
}
return cmd, nil
}

func unmarshalError(src []byte) (code, message string, err error) {
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return "", "", fmt.Errorf("cannot read next field")
}
switch fc.FieldNum {
case 1:
v, ok := fc.String()
if !ok {
return "", "", fmt.Errorf("cannot read code field")
}
code = strings.Clone(v)
case 2:
v, ok := fc.String()
if !ok {
return "", "", fmt.Errorf("cannot read message field")
}
message = strings.Clone(v)
}
}

return code, message, nil
}
109 changes: 109 additions & 0 deletions netflow/flow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package netflow_test

import (
"fmt"
"log/slog"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/makasim/flowstate"
"github.com/makasim/flowstate/memdriver"
"github.com/makasim/flowstate/netflow"
"github.com/thejerf/slogassert"
)

func TestFlowExecute(t *testing.T) {
lh := slogassert.New(t, slog.LevelDebug, nil)
l := slog.New(slogassert.New(t, slog.LevelDebug, lh))

d := memdriver.New(l)
executedCh := make(chan struct{})
if err := d.SetFlow(`aFlow`, flowstate.FlowFunc(func(stateCtx *flowstate.StateCtx, _ flowstate.Engine) (flowstate.Command, error) {
close(executedCh)
return flowstate.End(stateCtx), nil
})); err != nil {
t.Fatalf("failed to set flow: %v", err)
}

e, err := flowstate.NewEngine(d, l)
if err != nil {
t.Fatalf("failed to create flowstate engine: %v", err)
}

srv := startSrv(t, e)

f := netflow.New(srv.URL)

stateCtx := &flowstate.StateCtx{
Current: flowstate.State{
ID: `anID`,
Transition: flowstate.Transition{
To: `aFlow`,
},
},
}

cmd0, err := f.Execute(stateCtx, e)
if err != nil {
t.Fatalf("failed to execute flow: %v", err)
}
if cmd0 == nil {
t.Fatal("expected command, got nil")
}

cmd, ok := cmd0.(*flowstate.NoopCommand)
if !ok {
t.Fatalf("expected NoopCommand, got %T", cmd0)
}
if cmd.StateCtx.Current.ID != `anID` {
t.Fatalf("expected state ID to be 'anID', got '%s'", cmd.StateCtx.Current.ID)
}

select {
case <-executedCh:
// OK
case <-time.After(time.Second * 3):
t.Fatal("remote flow was not executed")
}
}

func startSrv(t *testing.T, e flowstate.Engine) *httptest.Server {

srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
if r.URL.Path == `/health` {
rw.WriteHeader(http.StatusOK)
return
}
if netflow.HandleExecute(rw, r, e) {
return
}

http.Error(rw, fmt.Sprintf("path %s not supported", r.URL.Path), http.StatusNotFound)
}))

t.Cleanup(srv.Close)

timeoutT := time.NewTimer(time.Second)
defer timeoutT.Stop()
readyT := time.NewTicker(time.Millisecond * 50)
defer readyT.Stop()

loop:
for {
select {
case <-timeoutT.C:
t.Fatalf("app not ready within %s", time.Second)
case <-readyT.C:

resp, err := http.Get(srv.URL + `/health`)
if err != nil {
continue loop
}
resp.Body.Close()

return srv
}
}
}
72 changes: 72 additions & 0 deletions netflow/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package netflow

import (
"fmt"
"io"
"net/http"

"github.com/VictoriaMetrics/easyproto"
"github.com/makasim/flowstate"
)

func HandleExecute(rw http.ResponseWriter, r *http.Request, e flowstate.Engine) bool {
if r.URL.Path != "/flowstate.v1.Flow/Execute" {
return false
}

reqBody, err := io.ReadAll(r.Body)
if err != nil {
writeInvalidArgumentError(rw, "failed to read request body: "+err.Error())
return true
}

stateCtx := &flowstate.StateCtx{}
if err := flowstate.UnmarshalStateCtx(reqBody, stateCtx); err != nil {
writeInvalidArgumentError(rw, "failed to unmarshal command: "+err.Error())
return true
}
resStateCtx := stateCtx.CopyTo(&flowstate.StateCtx{})

if err := e.Do(flowstate.Execute(stateCtx)); err != nil {
writeUnknownError(rw, fmt.Sprintf("failed to execute command: %s", err.Error()))
return true
}

writeCmd(rw, flowstate.Noop(resStateCtx))
return true
}

func writeCmd(rw http.ResponseWriter, cmd flowstate.Command) {
rw.Header().Set("Content-Type", "application/proto")
rw.WriteHeader(http.StatusOK)

_, _ = rw.Write(flowstate.MarshalCommand(cmd, nil))
}

func writeUnknownError(rw http.ResponseWriter, message string) {
rw.Header().Set("Content-Type", "application/proto")
rw.WriteHeader(http.StatusInternalServerError)

_, _ = rw.Write(marshalError("unknown", message))
}

func writeInvalidArgumentError(rw http.ResponseWriter, message string) {
rw.Header().Set("Content-Type", "application/proto")
rw.WriteHeader(http.StatusBadRequest)

_, _ = rw.Write(marshalError("invalid_argument", message))
}

func marshalError(code, message string) []byte {
m := &easyproto.Marshaler{}
mm := m.MessageMarshaler()

if code != "" {
mm.AppendString(1, code)
}
if message != "" {
mm.AppendString(2, message)
}

return m.Marshal(nil)
}