Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions badgerdriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,20 @@ func (d *Driver) GetDelayedStates(cmd *flowstate.GetDelayedStatesCommand) (*flow
it.Seek(seekPrefix)

for ; it.Valid(); it.Next() {
delayedStateKey, err := it.Item().ValueCopy(nil)
if err != nil {
return fmt.Errorf("get delayed state execute at: %w", err)
}
delayedState := flowstate.DelayedState{}
if err := getGOB(txn, delayedStateKey, &delayedState); err != nil {
return fmt.Errorf("get delayed state: %w", err)
if err := it.Item().Value(func(delayedStateKey []byte) error {
item, err := txn.Get(delayedStateKey)
if err != nil {
return fmt.Errorf("get delayed state: %w", err)
}

return item.Value(func(d []byte) error {
return flowstate.UnmarshalDelayedState(d, &delayedState)
})
}); err != nil {
return err
}

if delayedState.ExecuteAt.Unix() <= cmd.Since.Unix() {
continue
}
Expand Down
67 changes: 16 additions & 51 deletions badgerdriver/op.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package badgerdriver

import (
"bytes"
"encoding/binary"
"encoding/gob"
"errors"
"fmt"
"time"
Expand All @@ -13,12 +11,22 @@ import (
)

func setState(txn *badger.Txn, state flowstate.State) error {
return setGOB(txn, stateKey(state), state)
return txn.Set(
stateKey(state),
flowstate.MarshalState(state, nil),
)
}

func getState(txn *badger.Txn, stateID flowstate.StateID, stateRev int64) (flowstate.State, error) {
state := flowstate.State{ID: stateID, Rev: stateRev}
if err := getGOB(txn, stateKey(state), &state); err != nil {
item, err := txn.Get(stateKey(state))
if err != nil {
return flowstate.State{}, err
}

if err := item.Value(func(val []byte) error {
return flowstate.UnmarshalState(val, &state)
}); err != nil {
return flowstate.State{}, err
}

Expand All @@ -29,42 +37,6 @@ func stateKey(state flowstate.State) []byte {
return []byte(fmt.Sprintf(`flowstate.states.%020d.%s`, state.Rev, state.ID))
}

func setGOB(txn *badger.Txn, k []byte, m any) error {
var v bytes.Buffer
if err := gob.NewEncoder(&v).Encode(m); err != nil {
return err
}

if err := txn.Set(k, v.Bytes()); err != nil {
return err
}

return nil
}

func getGOB(txn *badger.Txn, k []byte, m any) error {
item, err := txn.Get(k)
if err != nil {
return err
}

return getItemGOB(item, m)
}

func getItemGOB(item *badger.Item, m any) error {
if err := item.Value(func(val []byte) error {
if err := gob.NewDecoder(bytes.NewBuffer(val)).Decode(m); err != nil {
return err
}

return nil
}); err != nil {
return err
}

return nil
}

func setLatestRevIndex(txn *badger.Txn, state flowstate.State) error {
return setInt64(txn, latestRevKey(state.ID), state.Rev)
}
Expand Down Expand Up @@ -203,17 +175,10 @@ func getData(txn *badger.Txn, data *flowstate.Data) error {
}

func setDelayedState(txn *badger.Txn, delayedState flowstate.DelayedState) error {
return setGOB(txn, delayedStateKey(delayedState.ExecuteAt.Unix(), delayedState.Offset), delayedState)
}

func getDelayedState(txn *badger.Txn, executeAt, offset int64) (flowstate.DelayedState, error) {
delayedState := flowstate.DelayedState{}

if err := getGOB(txn, delayedStateKey(executeAt, offset), delayedState); err != nil {
return flowstate.DelayedState{}, fmt.Errorf("get delayed state: %w", err)
}

return delayedState, nil
return txn.Set(
delayedStateKey(delayedState.ExecuteAt.Unix(), delayedState.Offset),
flowstate.MarshalDelayedState(delayedState, nil),
)
}

func delayedStateKey(executeAt, offset int64) []byte {
Expand Down
4 changes: 2 additions & 2 deletions badgerdriver/orlabelsiterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestOrLabelsIterator(t *testing.T) {
{
ID: "id2",
Rev: 2,
Labels: map[string]string{},
Labels: nil,
},
{
ID: "id3",
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestOrLabelsIterator(t *testing.T) {
{
ID: "id2",
Rev: 2,
Labels: map[string]string{},
Labels: nil,
},
{
ID: "id1",
Expand Down
5 changes: 2 additions & 3 deletions cmd_deserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package flowstate

import (
"encoding/base64"
"encoding/json"
"fmt"
)

Expand Down Expand Up @@ -33,8 +32,8 @@ func (cmd *DeserializeCommand) Do() error {
return fmt.Errorf("base64 decode: %s", err)
}

if err := json.Unmarshal(b, cmd.DeserializedStateCtx); err != nil {
return fmt.Errorf("json unmarshal: %s", err)
if err := UnmarshalStateCtx(b, cmd.DeserializedStateCtx); err != nil {
return fmt.Errorf("unmarshal: %s", err)
}

cmd.StateCtx.Current.Annotations[cmd.Annotation] = ``
Expand Down
6 changes: 1 addition & 5 deletions cmd_serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package flowstate

import (
"encoding/base64"
"encoding/json"
"fmt"
)

Expand Down Expand Up @@ -30,10 +29,7 @@ func (cmd *SerializeCommand) Do() error {
return fmt.Errorf("store annotation already set")
}

b, err := json.Marshal(cmd.SerializableStateCtx)
if err != nil {
return fmt.Errorf("json marshal prev state ctx: %s", err)
}
b := MarshalStateCtx(cmd.SerializableStateCtx, nil)
serialized := base64.StdEncoding.EncodeToString(b)

cmd.StateCtx.Current.SetAnnotation(cmd.Annotation, serialized)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/makasim/flowstate
go 1.24.0

require (
github.com/VictoriaMetrics/easyproto v0.1.4
github.com/dgraph-io/badger/v4 v4.7.0
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
github.com/jackc/pgx/v5 v5.6.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/VictoriaMetrics/easyproto v0.1.4 h1:r8cNvo8o6sR4QShBXQd1bKw/VVLSQma/V2KhTBPf+Sc=
github.com/VictoriaMetrics/easyproto v0.1.4/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo3/GThPs2KH23mv710=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
189 changes: 189 additions & 0 deletions proto/flowstate/v1/messages.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
syntax = "proto3";

package flowstate.v1;

message State {
string id = 1;
int64 rev = 2;
map<string, string> annotations = 3;
map<string, string> labels = 4;
int64 committed_at_unix_milli = 5;
Transition transition = 6;
}

message StateCtx {
State committed = 1;
State current = 2;
repeated Transition transitions = 3;
}

message StateRef {
string id = 1;
int64 rev = 2;
}

message DelayedState {
State state = 1;
int64 offset = 2;
int64 execute_at_sec = 3;
}

message Transition {
string from = 1;
string to = 2;
map<string, string> annotations = 3;
}

message Data {
string id = 1;
int64 rev = 2;
bool binary = 3;
string b = 4;
}

message DataRef {
string id = 1;
int64 rev = 2;
}

message Command {
repeated StateCtx state_ctxs = 1;
repeated Data datas = 2;

TransitCommand transit = 3;
PauseCommand pause = 4;
ResumeCommand resume = 5;
EndCommand end = 6;
ExecuteCommand execute = 7;
DelayCommand delay = 8;
CommitCommand commit = 9;
NoopCommand noop = 10;
SerializeCommand serialize = 11;
DeserializeCommand deserialize = 12;
StoreDataCommand store_data = 13;
GetDataCommand get_data = 14;
ReferenceDataCommand reference_data = 15;
DereferenceDataCommand dereference_data = 16;
GetStateByIDCommand get_state_by_id = 17;
GetStateByLabelsCommand get_state_by_labels = 18;
GetStatesCommand get_states = 19;
GetDelayedStatesCommand get_delayed_states = 20;
CommitStateCtxCommand commit_state = 21;
}

message TransitCommand {
StateRef state_ref = 1;
string flow_id = 2;
}

message PauseCommand {
StateRef state_ref = 1;
string flow_id = 2;
}

message ResumeCommand {
StateRef state_ref = 1;
}

message EndCommand {
StateRef state_ref = 1;
}

message ExecuteCommand {
StateRef state_ref = 1;
}

message DelayCommand {
StateRef state_ref = 1;
State delaying_state = 2;
int64 execute_at_sec = 3;
bool commit = 4;
}

message CommitCommand {
repeated Command commands = 1;
}

message NoopCommand {
StateRef state_ref = 1;
}

message SerializeCommand {
StateRef serializable_state_ref = 1;
StateRef state_ref = 2;
string annotation = 3;
}

message DeserializeCommand {
StateRef deserialized_state_ref = 1;
StateRef state_ref = 2;
string annotation = 3;
}

message StoreDataCommand {
DataRef data_ref = 1;
}

message GetDataCommand {
DataRef data_ref = 1;
}

message ReferenceDataCommand {
StateRef state_ref = 1;
DataRef data_ref = 2;
string annotation = 3;
}

message DereferenceDataCommand {
StateRef state_ref = 1;
DataRef data_ref = 2;
string annotation = 3;
}

message GetStateByIDCommand {
string id = 1;
int64 rev = 2;
StateRef state_ref = 3;
}

message GetStateByLabelsCommand {
map<string, string> labels = 1;
StateRef state_ref = 2;
}

message GetStatesCommand {
message Labels {
map<string, string> labels = 1;
}

message Result {
repeated State states = 1;
bool more = 2;
}

int64 since_rev = 1;
int64 since_time_usec = 2;
repeated Labels labels = 3;
bool latest_only = 4;
int64 limit = 5;

Result result = 6;
}

message GetDelayedStatesCommand {
message Result {
repeated DelayedState delayed_states = 1;
bool more = 2;
}

int64 since_time_sec = 1;
int64 until_time_sec = 2;
int64 offset = 3;
int64 limit = 4;

Result result = 5;
}

message CommitStateCtxCommand {
StateRef state_ref = 1;
}
Loading