From 0447a00d469edb8fb639c263dce97e8485ffdd46 Mon Sep 17 00:00:00 2001 From: David Ansermino <14164624+ansermino@users.noreply.github.com> Date: Wed, 6 Nov 2024 03:47:57 -0500 Subject: [PATCH 1/3] Add modified ConsoleReader --- console_reader/console_reader.go | 349 +++++++++++++++++++++++++++++++ console_reader/metrics.go | 11 + 2 files changed, 360 insertions(+) create mode 100644 console_reader/console_reader.go create mode 100644 console_reader/metrics.go diff --git a/console_reader/console_reader.go b/console_reader/console_reader.go new file mode 100644 index 0000000..83c562c --- /dev/null +++ b/console_reader/console_reader.go @@ -0,0 +1,349 @@ +package firecore + +import ( + "fmt" + "github.com/golang/protobuf/proto" + "io" + "strconv" + "strings" + "sync" + "time" + + "github.com/streamingfast/bstream" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/dmetrics" + "github.com/streamingfast/firehose-core/node-manager/mindreader" + "github.com/streamingfast/logging" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const FirePrefix = "FIRE " +const FirePrefixLen = len(FirePrefix) +const InitLogPrefix = "INIT " +const InitLogPrefixLen = len(InitLogPrefix) +const BlockLogPrefix = "BLOCK " +const BlockLogPrefixLen = len(BlockLogPrefix) + +// Requried by NewConsoleReader but never used +type BlockEncoder interface { + Encode(block Block) (blk *pbbstream.Block, err error) +} + +// Block represents the chain-specific Protobuf block. Chain specific's block +// model must implement this interface so that Firehose core is able to properly +// marshal/unmarshal your block into/to the Firehose block envelope binary format. +// +// All the methods are prefixed with `GetFirehoseBlock` to avoid any potential +// conflicts with the fields/getters of your chain's block model that would +// prevent you from implementing this interface. +// +// Consumer of your chain's protobuf block model don't need to be aware of those +// details, they are internal Firehose core information that are required to function +// properly. +// +// The value you return for each of those methods must be done respecting Firehose rules +// which are enumarated in the documentation of each method. +type Block interface { + proto.Message + + // GetFirehoseBlockID returns the block ID as a string, usually in the representation + // used by your chain (hex, base58, base64, etc.). The block ID must be unique across + // all blocks that will ever exist on your chain. + GetFirehoseBlockID() string + + // GetFirehoseBlockNumber returns the block number as an unsigned integer. The block + // number could be shared by multiple blocks in which case one is the canonical one + // and the others are forks (resolution of forks is handled by Firehose core later in the + // block processing pipeline). + // + // The value should be sequentially ordered which means that a block with block number 10 + // has come before block 11. Firehose core will deal with block skips without problem though + // (e.g. block 1, is produced then block 3 where block 3's parent is block 1). + GetFirehoseBlockNumber() uint64 + + // GetFirehoseBlockParentID returns the block ID of the parent block as a string. All blocks + // ever produced must have a parent block ID except for the genesis block which is the first + // one. The value must be the same as the one returned by GetFirehoseBlockID() of the parent. + // + // If it's the genesis block, return an empty string. + GetFirehoseBlockParentID() string + + // GetFirehoseBlockParentNumber returns the block number of the parent block as a uint64. + // The value must be the same as the one returned by GetFirehoseBlockNumber() of the parent + // or `0` if the block has no parent + // + // This is useful on chains that have holes. On other chains, this is as simple as "BlockNumber - 1". + GetFirehoseBlockParentNumber() uint64 + + // GetFirehoseBlockTime returns the block timestamp as a time.Time of when the block was + // produced. This should the consensus agreed time of the block. + GetFirehoseBlockTime() time.Time +} + +type ParsingStats struct { +} + +type ConsoleReader struct { + lines chan string + done chan interface{} + closeOnce sync.Once + logger *zap.Logger + tracer logging.Tracer + + // Parsing context + readerProtocolVersion string + protoMessageType string + lastBlock bstream.BlockRef + lastParentBlock bstream.BlockRef + lastBlockTimestamp time.Time + + lib uint64 + + blockRate *dmetrics.AvgRatePromCounter +} + +func NewConsoleReader(lines chan string, blockEncoder BlockEncoder, logger *zap.Logger, tracer logging.Tracer) (mindreader.ConsolerReader, error) { + reader := newConsoleReader(lines, logger, tracer) + + delayBetweenStats := 30 * time.Second + if tracer.Enabled() { + delayBetweenStats = 5 * time.Second + } + + go func() { + defer reader.blockRate.Stop() + + for { + select { + case <-reader.done: + return + case <-time.After(delayBetweenStats): + reader.printStats() + } + } + }() + + return reader, nil +} + +func newConsoleReader(lines chan string, logger *zap.Logger, tracer logging.Tracer) *ConsoleReader { + return &ConsoleReader{ + lines: lines, + done: make(chan interface{}), + logger: logger, + tracer: tracer, + + blockRate: dmetrics.MustNewAvgRateFromPromCounter(ConsoleReaderBlockReadCount, 1*time.Second, 30*time.Second, "blocks"), + } +} + +func (r *ConsoleReader) Done() <-chan interface{} { + return r.done +} + +func (r *ConsoleReader) Close() error { + r.closeOnce.Do(func() { + r.blockRate.SyncNow() + r.printStats() + + r.logger.Info("console reader done") + close(r.done) + }) + + return nil +} + +type blockRefView struct { + ref bstream.BlockRef +} + +func (v blockRefView) String() string { + if v.ref == nil { + return "" + } + + return v.ref.String() +} + +type blockRefViewTimestamp struct { + ref bstream.BlockRef + timestamp time.Time +} + +func (v blockRefViewTimestamp) String() string { + return fmt.Sprintf("%s @ %s", blockRefView{v.ref}, v.timestamp.Local().Format(time.RFC822Z)) +} + +func (r *ConsoleReader) printStats() { + r.logger.Info("console reader stats", + zap.Stringer("block_rate", r.blockRate), + zap.Stringer("last_block", blockRefViewTimestamp{r.lastBlock, r.lastBlockTimestamp}), + zap.Stringer("last_parent_block", blockRefView{r.lastParentBlock}), + zap.Uint64("lib", r.lib), + ) +} + +func (r *ConsoleReader) ReadBlock() (out *pbbstream.Block, err error) { + out, err = r.next() + if err != nil { + return nil, err + } + + return out, nil +} + +func (r *ConsoleReader) next() (out *pbbstream.Block, err error) { + for line := range r.lines { + if !strings.HasPrefix(line, "FIRE ") { + continue + } + + line = line[FirePrefixLen:] + + switch { + case strings.HasPrefix(line, BlockLogPrefix): + out, err = r.readBlock(line[BlockLogPrefixLen:]) + + case strings.HasPrefix(line, InitLogPrefix): + err = r.readInit(line[InitLogPrefixLen:]) + default: + if r.tracer.Enabled() { + r.logger.Debug("skipping unknown Firehose log line", zap.String("line", line)) + } + continue + } + + if err != nil { + chunks := strings.SplitN(line, " ", 2) + return nil, fmt.Errorf("%s: %s (line %q)", chunks[0], err, line) + } + + if out != nil { + return out, nil + } + } + + r.Close() + + return nil, io.EOF +} + +// Formats +// [READER_PROTOCOL_VERSION] sf.ethereum.type.v2.Block +func (r *ConsoleReader) readInit(line string) error { + chunks, err := splitInBoundedChunks(line, 2) + if err != nil { + return fmt.Errorf("split: %s", err) + } + + r.readerProtocolVersion = chunks[0] + + switch r.readerProtocolVersion { + // Implementation of RPC poller were set to use 1.0 so we keep support for it for now + case "1.0", "3.0": + // Supported + default: + return fmt.Errorf("major version of Firehose exchange protocol is unsupported (expected: one of [1.0, 3.0], found %s), you are most probably running an incompatible version of the Firehose aware node client/node poller", r.readerProtocolVersion) + } + + protobufFullyQualifiedName := chunks[1] + if protobufFullyQualifiedName == "" { + return fmt.Errorf("protobuf fully qualified name is empty, it must be set to a valid Protobuf fully qualified message type representing your block format") + } + + r.setProtoMessageType(protobufFullyQualifiedName) + + r.logger.Info("console reader protocol version init", + zap.String("version", r.readerProtocolVersion), + zap.String("protobuf_fully_qualified_name", protobufFullyQualifiedName), + ) + + return nil +} + +// Formats +// [block_num:342342342] [block_hash] [parent_num] [parent_hash] [lib:123123123] [timestamp:unix_nano] B64ENCODED_any +func (r *ConsoleReader) readBlock(line string) (out *pbbstream.Block, err error) { + if r.readerProtocolVersion == "" { + return nil, fmt.Errorf("reader protocol version not set, did you forget to send the 'FIRE INIT ' line?") + } + + chunks, err := splitInBoundedChunks(line, 5) + if err != nil { + return nil, fmt.Errorf("splitting block log line: %w", err) + } + + height, err := strconv.ParseUint(chunks[0], 10, 64) + if err != nil { + return nil, fmt.Errorf("parsing block num %q: %w", chunks[0], err) + } + + blockCount, err := strconv.ParseUint(chunks[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("parsing block num %q: %w", chunks[0], err) + } + + timestampUnixNano, err := strconv.ParseUint(chunks[2], 10, 64) + if err != nil { + return nil, fmt.Errorf("parsing timestamp %q: %w", chunks[5], err) + } + + timestamp := time.Unix(0, int64(timestampUnixNano)) + + tipsetKey := chunks[3] + + parentTipsetKey := chunks[4] + + parentNum := height - 1 + + libNum := uint64(1) + + blockPayload := &anypb.Any{ + TypeUrl: r.protoMessageType, + Value: []byte{byte(blockCount)}, + } + + block := &pbbstream.Block{ + Id: tipsetKey, + Number: height, + ParentId: parentTipsetKey, + ParentNum: parentNum, + Timestamp: timestamppb.New(timestamp), + LibNum: 1, + Payload: blockPayload, + } + + ConsoleReaderBlockReadCount.Inc() + r.lastBlock = bstream.NewBlockRef(tipsetKey, height) + r.lastParentBlock = bstream.NewBlockRef(parentTipsetKey, parentNum) + r.lastBlockTimestamp = timestamp + r.lib = libNum + + return block, nil +} + +func (r *ConsoleReader) setProtoMessageType(typeURL string) { + if strings.HasPrefix(typeURL, "type.googleapis.com/") { + r.protoMessageType = typeURL + return + } + + if strings.Contains(typeURL, "/") { + panic(fmt.Sprintf("invalid type url %q, expecting type.googleapis.com/", typeURL)) + } + + r.protoMessageType = "type.googleapis.com/" + typeURL +} + +// splitInBoundedChunks splits the line in `count` chunks and returns the slice `chunks[1:count]` (so exclusive end), +// but will accumulate all trailing chunks within the last (for free-form strings, or JSON objects) +func splitInBoundedChunks(line string, count int) ([]string, error) { + chunks := strings.SplitN(line, " ", count) + if len(chunks) != count { + return nil, fmt.Errorf("%d fields required but found %d fields for line %q", count, len(chunks), line) + } + + return chunks, nil +} diff --git a/console_reader/metrics.go b/console_reader/metrics.go new file mode 100644 index 0000000..d8e30d8 --- /dev/null +++ b/console_reader/metrics.go @@ -0,0 +1,11 @@ +package firecore + +import "github.com/streamingfast/dmetrics" + +func RegisterMetrics() { + metrics.Register() +} + +var metrics = dmetrics.NewSet() + +var ConsoleReaderBlockReadCount = metrics.NewCounter("firecore_console_reader_block_read_count", "Number of blocks read by the console reader") From 1497c68792bf33d0f6b1c1c714200d9827fbe937 Mon Sep 17 00:00:00 2001 From: David Ansermino <14164624+ansermino@users.noreply.github.com> Date: Wed, 6 Nov 2024 03:57:06 -0500 Subject: [PATCH 2/3] Fix imports --- console_reader/console_reader.go | 62 ++------------------------------ 1 file changed, 3 insertions(+), 59 deletions(-) diff --git a/console_reader/console_reader.go b/console_reader/console_reader.go index 83c562c..6c6b8e8 100644 --- a/console_reader/console_reader.go +++ b/console_reader/console_reader.go @@ -1,8 +1,8 @@ -package firecore +package console_reader import ( "fmt" - "github.com/golang/protobuf/proto" + firecore "github.com/streamingfast/firehose-core" "io" "strconv" "strings" @@ -26,62 +26,6 @@ const InitLogPrefixLen = len(InitLogPrefix) const BlockLogPrefix = "BLOCK " const BlockLogPrefixLen = len(BlockLogPrefix) -// Requried by NewConsoleReader but never used -type BlockEncoder interface { - Encode(block Block) (blk *pbbstream.Block, err error) -} - -// Block represents the chain-specific Protobuf block. Chain specific's block -// model must implement this interface so that Firehose core is able to properly -// marshal/unmarshal your block into/to the Firehose block envelope binary format. -// -// All the methods are prefixed with `GetFirehoseBlock` to avoid any potential -// conflicts with the fields/getters of your chain's block model that would -// prevent you from implementing this interface. -// -// Consumer of your chain's protobuf block model don't need to be aware of those -// details, they are internal Firehose core information that are required to function -// properly. -// -// The value you return for each of those methods must be done respecting Firehose rules -// which are enumarated in the documentation of each method. -type Block interface { - proto.Message - - // GetFirehoseBlockID returns the block ID as a string, usually in the representation - // used by your chain (hex, base58, base64, etc.). The block ID must be unique across - // all blocks that will ever exist on your chain. - GetFirehoseBlockID() string - - // GetFirehoseBlockNumber returns the block number as an unsigned integer. The block - // number could be shared by multiple blocks in which case one is the canonical one - // and the others are forks (resolution of forks is handled by Firehose core later in the - // block processing pipeline). - // - // The value should be sequentially ordered which means that a block with block number 10 - // has come before block 11. Firehose core will deal with block skips without problem though - // (e.g. block 1, is produced then block 3 where block 3's parent is block 1). - GetFirehoseBlockNumber() uint64 - - // GetFirehoseBlockParentID returns the block ID of the parent block as a string. All blocks - // ever produced must have a parent block ID except for the genesis block which is the first - // one. The value must be the same as the one returned by GetFirehoseBlockID() of the parent. - // - // If it's the genesis block, return an empty string. - GetFirehoseBlockParentID() string - - // GetFirehoseBlockParentNumber returns the block number of the parent block as a uint64. - // The value must be the same as the one returned by GetFirehoseBlockNumber() of the parent - // or `0` if the block has no parent - // - // This is useful on chains that have holes. On other chains, this is as simple as "BlockNumber - 1". - GetFirehoseBlockParentNumber() uint64 - - // GetFirehoseBlockTime returns the block timestamp as a time.Time of when the block was - // produced. This should the consensus agreed time of the block. - GetFirehoseBlockTime() time.Time -} - type ParsingStats struct { } @@ -104,7 +48,7 @@ type ConsoleReader struct { blockRate *dmetrics.AvgRatePromCounter } -func NewConsoleReader(lines chan string, blockEncoder BlockEncoder, logger *zap.Logger, tracer logging.Tracer) (mindreader.ConsolerReader, error) { +func NewConsoleReader(lines chan string, blockEncoder firecore.BlockEncoder, logger *zap.Logger, tracer logging.Tracer) (mindreader.ConsolerReader, error) { reader := newConsoleReader(lines, logger, tracer) delayBetweenStats := 30 * time.Second From 01c8563f0e1f5a03c5985c76eb175d5c0975bd27 Mon Sep 17 00:00:00 2001 From: David Ansermino <14164624+ansermino@users.noreply.github.com> Date: Wed, 6 Nov 2024 04:28:39 -0500 Subject: [PATCH 3/3] Adds tests, update cmd --- cmd/firefil/main.go | 3 +- console_reader/console_reader.go | 25 +++++--- console_reader/console_reader_test.go | 83 +++++++++++++++++++++++++++ console_reader/metrics.go | 2 +- 4 files changed, 102 insertions(+), 11 deletions(-) create mode 100644 console_reader/console_reader_test.go diff --git a/cmd/firefil/main.go b/cmd/firefil/main.go index ee1a48d..7247d39 100644 --- a/cmd/firefil/main.go +++ b/cmd/firefil/main.go @@ -1,6 +1,7 @@ package main import ( + "github.com/chainsafe/firehose-filecoin/console_reader" pbfilecoin "github.com/chainsafe/firehose-filecoin/pb/sf/filecoin/type/v1" firecore "github.com/streamingfast/firehose-core" fhCmd "github.com/streamingfast/firehose-core/cmd" @@ -17,7 +18,7 @@ func main() { FirstStreamableBlock: 1, BlockFactory: func() firecore.Block { return new(pbfilecoin.Block) }, - ConsoleReaderFactory: firecore.NewConsoleReader, + ConsoleReaderFactory: console_reader.NewConsoleReader, Tools: &firecore.ToolsConfig[*pbfilecoin.Block]{}, }) diff --git a/console_reader/console_reader.go b/console_reader/console_reader.go index 6c6b8e8..432ca1d 100644 --- a/console_reader/console_reader.go +++ b/console_reader/console_reader.go @@ -2,6 +2,7 @@ package console_reader import ( "fmt" + "github.com/ipfs/go-cid" firecore "github.com/streamingfast/firehose-core" "io" "strconv" @@ -221,24 +222,30 @@ func (r *ConsoleReader) readBlock(line string) (out *pbbstream.Block, err error) height, err := strconv.ParseUint(chunks[0], 10, 64) if err != nil { - return nil, fmt.Errorf("parsing block num %q: %w", chunks[0], err) + return nil, fmt.Errorf("parsing height %q: %w", chunks[0], err) } blockCount, err := strconv.ParseUint(chunks[1], 10, 64) if err != nil { - return nil, fmt.Errorf("parsing block num %q: %w", chunks[0], err) + return nil, fmt.Errorf("parsing block count %q: %w", chunks[1], err) } timestampUnixNano, err := strconv.ParseUint(chunks[2], 10, 64) if err != nil { - return nil, fmt.Errorf("parsing timestamp %q: %w", chunks[5], err) + return nil, fmt.Errorf("parsing timestamp %q: %w", chunks[2], err) } timestamp := time.Unix(0, int64(timestampUnixNano)) - tipsetKey := chunks[3] + tipsetCID, err := cid.Decode(chunks[3]) + if err != nil { + return nil, fmt.Errorf("parsing tipset CID %q: %w", chunks[3], err) + } - parentTipsetKey := chunks[4] + parentTipsetCID, err := cid.Decode(chunks[4]) + if err != nil { + return nil, fmt.Errorf("parsing parent tipset CID %q: %w", chunks[4], err) + } parentNum := height - 1 @@ -250,9 +257,9 @@ func (r *ConsoleReader) readBlock(line string) (out *pbbstream.Block, err error) } block := &pbbstream.Block{ - Id: tipsetKey, + Id: tipsetCID.String(), Number: height, - ParentId: parentTipsetKey, + ParentId: parentTipsetCID.String(), ParentNum: parentNum, Timestamp: timestamppb.New(timestamp), LibNum: 1, @@ -260,8 +267,8 @@ func (r *ConsoleReader) readBlock(line string) (out *pbbstream.Block, err error) } ConsoleReaderBlockReadCount.Inc() - r.lastBlock = bstream.NewBlockRef(tipsetKey, height) - r.lastParentBlock = bstream.NewBlockRef(parentTipsetKey, parentNum) + r.lastBlock = bstream.NewBlockRef(tipsetCID.String(), height) + r.lastParentBlock = bstream.NewBlockRef(parentTipsetCID.String(), parentNum) r.lastBlockTimestamp = timestamp r.lib = libNum diff --git a/console_reader/console_reader_test.go b/console_reader/console_reader_test.go new file mode 100644 index 0000000..ae49f1b --- /dev/null +++ b/console_reader/console_reader_test.go @@ -0,0 +1,83 @@ +package console_reader + +import ( + "fmt" + "github.com/ipfs/go-cid" + "testing" + "time" + + "github.com/streamingfast/logging" + "github.com/stretchr/testify/require" +) + +var zlogTest, tracerTest = logging.PackageLogger("test", "github.com/streamingfast/firehose-core/firecore") + +// Example log: +// FIRE BLOCK 2118596 1 1730884260 bafy2bzacebfhqge6ulu6hlgqv6xhnimrhgdxdzg4lccyztitpfpdpx7oe7hzk bafy2bzacebc2l6w2kzfr752pijk4xyhdg7ptiags34s35buim4ilyrmcvchmg + +func Test_Ctx_readBlock(t *testing.T) { + reader := &ConsoleReader{ + logger: zlogTest, + tracer: tracerTest, + + readerProtocolVersion: "1.0", + protoMessageType: "type.googleapis.com/sf.ethereum.type.v2.Block", + } + + height := uint64(2118551) + blockCount := 1 + tipsetCID, _ := cid.Decode("bafy2bzacedv2decyusd7n2kpknsgpiaopxh7ve3grzyfkrxouaza5ohypoi2c") + parentTipsetCID, _ := cid.Decode("bafy2bzacebgnvekynqmddtbkdw77ygogoc4omoy5fsgtrhbd7kxwfsbgcaphm") + + //pbBlock := test.Block{ + // Hash: tipsetCID.Bytes(), + // Number: height, + //} + + //anypbBlock, err := anypb.New(&pbBlock) + + //require.NoError(t, err) + nowNano := time.Now().UnixNano() + line := fmt.Sprintf( + "%d %d %d %s %s", + height, + blockCount, + nowNano, + tipsetCID.String(), + parentTipsetCID.String(), + ) + + block, err := reader.readBlock(line) + require.NoError(t, err) + + require.Equal(t, height, block.Number) + require.Equal(t, tipsetCID.String(), block.Id) + require.Equal(t, parentTipsetCID.String(), block.ParentId) + require.Equal(t, uint64(1), block.LibNum) + require.Equal(t, int32(time.Unix(0, nowNano).Nanosecond()), block.Timestamp.Nanos) + + require.NoError(t, err) + //require.Equal(t, anypbBlock.GetValue(), block.Payload.Value) + +} + +func Test_GetNext(t *testing.T) { + lines := make(chan string, 2) + reader := newConsoleReader(lines, zlogTest, tracerTest) + + initLine := "FIRE INIT 1.0 sf.filecoin.type.v1.Tipset" + blockLine := "FIRE BLOCK 2118596 1 1730884260 bafy2bzacebfhqge6ulu6hlgqv6xhnimrhgdxdzg4lccyztitpfpdpx7oe7hzk bafy2bzacebc2l6w2kzfr752pijk4xyhdg7ptiags34s35buim4ilyrmcvchmg" + + lines <- initLine + lines <- blockLine + close(lines) + + block, err := reader.ReadBlock() + require.NoError(t, err) + + require.Equal(t, uint64(2118596), block.Number) + require.Equal(t, "bafy2bzacebfhqge6ulu6hlgqv6xhnimrhgdxdzg4lccyztitpfpdpx7oe7hzk", block.Id) + require.Equal(t, "bafy2bzacebc2l6w2kzfr752pijk4xyhdg7ptiags34s35buim4ilyrmcvchmg", block.ParentId) + require.Equal(t, uint64(1), block.LibNum) + require.Equal(t, int32(time.Unix(0, 1730884260).Nanosecond()), block.Timestamp.Nanos) +} diff --git a/console_reader/metrics.go b/console_reader/metrics.go index d8e30d8..b9a0d69 100644 --- a/console_reader/metrics.go +++ b/console_reader/metrics.go @@ -1,4 +1,4 @@ -package firecore +package console_reader import "github.com/streamingfast/dmetrics"