From f355604b53d7079cb0c395732126787760419c5c Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Wed, 9 Aug 2023 03:10:47 -0700 Subject: [PATCH 01/12] [epociask/issue-141-resilient-pipes] Event Query Reprocessing Logic --- internal/app/app_test.go | 4 +- internal/common/common_test.go | 30 +++++ internal/common/dlq.go | 63 ++++++++++ internal/etl/pipeline/manager_test.go | 6 +- internal/etl/registry/pipe/event_log.go | 115 ++++++++++++++++--- internal/etl/registry/pipe/event_log_test.go | 66 +++++++++++ internal/mocks/context.go | 24 +++- 7 files changed, 282 insertions(+), 26 deletions(-) create mode 100644 internal/common/dlq.go create mode 100644 internal/etl/registry/pipe/event_log_test.go diff --git a/internal/app/app_test.go b/internal/app/app_test.go index 58a4f6db..6a4d0e70 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -10,15 +10,13 @@ import ( "github.com/base-org/pessimism/internal/client" "github.com/base-org/pessimism/internal/config" "github.com/base-org/pessimism/internal/metrics" - "github.com/base-org/pessimism/internal/mocks" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" ) func Test_AppFlow(t *testing.T) { - ctx := mocks.Context(context.Background(), gomock.NewController(t)) + ctx := context.Background() cfg := &config.Config{ ServerConfig: &server.Config{ diff --git a/internal/common/common_test.go b/internal/common/common_test.go index c001d792..4318e6ec 100644 --- a/internal/common/common_test.go +++ b/internal/common/common_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/base-org/pessimism/internal/common" + "github.com/base-org/pessimism/internal/core" geth_common "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" @@ -32,3 +33,32 @@ func Test_SliceToAddresses(t *testing.T) { []geth_common.Address{geth_common.HexToAddress("0x00000000"), geth_common.HexToAddress("0x00000001")}) } + +// Test_DLQ ... Tests all DLQ functionality +func Test_DLQ(t *testing.T) { + dlq := common.NewTransitDLQ(5) + + // A. Add 5 elements and test size + for i := 0; i < 5; i++ { + td := core.NewTransitData(core.RegisterType(0), nil) + + err := dlq.Add(&td) + assert.NoError(t, err) + } + + // B. Add 6th element and test error + td := core.NewTransitData(core.RegisterType(0), nil) + err := dlq.Add(&td) + + assert.Error(t, err) + + // C. Pop 1 element and test size + elem, err := dlq.Pop() + assert.Equal(t, elem.Type, core.RegisterType(0)) + assert.NoError(t, err) + + // D. Pop all elements and test size + entries := dlq.PopAll() + assert.Equal(t, len(entries), 4) + assert.True(t, dlq.Empty(), true) +} diff --git a/internal/common/dlq.go b/internal/common/dlq.go new file mode 100644 index 00000000..03c01d02 --- /dev/null +++ b/internal/common/dlq.go @@ -0,0 +1,63 @@ +package common + +import ( + "fmt" + + "github.com/base-org/pessimism/internal/core" +) + +const ( + dlqFullMsg = "the dead letter queue is full with %d elements" + dlqEmptyMsg = "the dead letter queue is empty" +) + +// DLQ ... Dead Letter Queue construct +// Used to store block hashes of ETL events +// that failed to be processed +type DLQ[E any] struct { + size int + dlq []*E +} + +// NewTransitDLQ ... Initializer +func NewTransitDLQ(size int) *DLQ[core.TransitData] { + return &DLQ[core.TransitData]{ + size: size, + dlq: make([]*core.TransitData, 0, size), + } +} + +// Add ... Adds an entry to the DLQ if it is not full +func (d *DLQ[E]) Add(entry *E) error { + if len(d.dlq) >= d.size { + return fmt.Errorf(dlqFullMsg, d.size) + } + + d.dlq = append(d.dlq, entry) + return nil +} + +// Pop ... Removes the first element from the DLQ, +// typically for re-processing +func (d *DLQ[E]) Pop() (*E, error) { + if len(d.dlq) == 0 { + return nil, fmt.Errorf(dlqEmptyMsg) + } + + entry := d.dlq[0] + d.dlq = d.dlq[1:] + return entry, nil +} + +// PopAll ... Removes all elements from the DLQ, +// typically for re-processing +func (d *DLQ[E]) PopAll() []*E { + entries := d.dlq + d.dlq = make([]*E, 0, d.size) + return entries +} + +// Empty ... Checks if the DLQ is empty +func (d *DLQ[E]) Empty() bool { + return len(d.dlq) == 0 +} diff --git a/internal/etl/pipeline/manager_test.go b/internal/etl/pipeline/manager_test.go index 77e8dd8a..6c804d99 100644 --- a/internal/etl/pipeline/manager_test.go +++ b/internal/etl/pipeline/manager_test.go @@ -32,7 +32,9 @@ func Test_Manager(t *testing.T) { reg := registry.NewRegistry() ctrl := gomock.NewController(t) - ctx := mocks.Context(context.Background(), ctrl) + ctx, _ := mocks.Context(context.Background(), ctrl) + + ctx = context.WithValue(ctx, core.State, state.NewMemState()) return NewManager(ctx, NewAnalyzer(reg), reg, NewEtlStore(), NewComponentGraph(), nil) }, @@ -63,7 +65,7 @@ func Test_Manager(t *testing.T) { reg := registry.NewRegistry() ctrl := gomock.NewController(t) - ctx := mocks.Context(context.Background(), ctrl) + ctx, _ := mocks.Context(context.Background(), ctrl) ctx = context.WithValue(ctx, core.State, state.NewMemState()) diff --git a/internal/etl/registry/pipe/event_log.go b/internal/etl/registry/pipe/event_log.go index 55ebac73..83155ba1 100644 --- a/internal/etl/registry/pipe/event_log.go +++ b/internal/etl/registry/pipe/event_log.go @@ -5,7 +5,7 @@ import ( "fmt" "github.com/base-org/pessimism/internal/client" - pess_common "github.com/base-org/pessimism/internal/common" + p_common "github.com/base-org/pessimism/internal/common" "github.com/base-org/pessimism/internal/core" "github.com/base-org/pessimism/internal/etl/component" "github.com/base-org/pessimism/internal/logging" @@ -21,16 +21,16 @@ import ( // EventDefinition ... type EventDefinition struct { client client.EthClient - sk *core.StateKey - pUUID core.PUUID - cfg *core.ClientConfig - ss state.Store + dlq *p_common.DLQ[core.TransitData] + + sk *core.StateKey + pUUID core.PUUID + ss state.Store } -// NewEventParserPipe ... Initializer -func NewEventParserPipe(ctx context.Context, cfg *core.ClientConfig, - opts ...component.Option) (component.Component, error) { - client, err := client.FromContext(ctx, cfg.Network) +func NewEventDefinition(ctx context.Context, n core.Network) (*EventDefinition, error) { + // 1. Load dependencies from context + client, err := client.FromContext(ctx, n) if err != nil { return nil, err } @@ -40,17 +40,33 @@ func NewEventParserPipe(ctx context.Context, cfg *core.ClientConfig, return nil, err } + // 2. Construct the pipe definition ed := &EventDefinition{ - cfg: cfg, + dlq: p_common.NewTransitDLQ(10), client: client, ss: ss, } + return ed, nil +} +// NewEventParserPipe ... Initializer +func NewEventParserPipe(ctx context.Context, cfg *core.ClientConfig, + opts ...component.Option) (component.Component, error) { + // 1. Construct the pipe definition + ed, err := NewEventDefinition(ctx, cfg.Network) + if err != nil { + return nil, err + } + + // 2. Embed the definition into a generic pipe construction p, err := component.NewPipe(ctx, ed, core.GethBlock, core.EventLog, opts...) if err != nil { return nil, err } + // 3. Set the post component construction fields on the definition + // There's likely a more extensible way to construct this definition fields + // given that they're used by component implementations across the ETL ed.sk = p.StateKey().Clone() ed.pUUID = p.PUUID() return p, nil @@ -59,8 +75,10 @@ func NewEventParserPipe(ctx context.Context, cfg *core.ClientConfig, // getEventsToMonitor ... Gets the smart contract events to monitor from the state store func (ed *EventDefinition) getTopics(ctx context.Context, addresses []string, ss state.Store) [][]common.Hash { - events := make([]common.Hash, 0) + sigs := make([]common.Hash, 0) + // 1. Iterate over addresses and construct nested state keys + // to lookup the associated events to monitor for _, address := range addresses { innerKey := &core.StateKey{ Nesting: false, @@ -69,25 +87,84 @@ func (ed *EventDefinition) getTopics(ctx context.Context, PUUID: ed.sk.PUUID, } - sigs, err := ss.GetSlice(ctx, innerKey) + // 1.1 Attempt to fetch the events to monitor from the state store + // and continue if there is an error + events, err := ss.GetSlice(ctx, innerKey) if err != nil { logging.WithContext(ctx).Error(err.Error()) + continue } - for _, sig := range sigs { - events = append(events, crypto.Keccak256Hash([]byte(sig))) + // 1.2 Compute signatures for the function declaration strings + for _, event := range events { + sigs = append(sigs, crypto.Keccak256Hash([]byte(event))) } } + // 2. Construct the topics slice to be used in the filter query + // via populating the first index of 2D Topics slice with the event signatures to monitor topics := make([][]common.Hash, 1) - topics[0] = events + topics[0] = sigs return topics } -// Transform ... Gets the events from the block, filters them and -// returns them if they are in the list of events to monitor +// Transform ... Attempts to reprocess previously failed queries first +// before attempting to process the current block data func (ed *EventDefinition) Transform(ctx context.Context, td core.TransitData) ([]core.TransitData, error) { + // 1. Check to see if there are any failed queries to reprocess + // If failures occur again, add the caller (Transform) + // function input to the DLQ and return + tds, err := ed.attemptDLQ(ctx) + if err != nil { + err = ed.dlq.Add(&td) + if err != nil { + return tds, err + } + } + + // 2. If there are no failed queries, then process the current block data + // and add a data input to the DLQ if it fails for reprocessing next block + tds, err = ed.transformFunc(ctx, td) + if err != nil { + err = ed.dlq.Add(&td) + if err != nil { + return tds, err + } + } + + return tds, nil +} + +// attemptDLQ ... Attempts to reprocess previously failed queries +func (ed *EventDefinition) attemptDLQ(ctx context.Context) ([]core.TransitData, error) { + failedInputs := ed.dlq.PopAll() + + // 1. Attempt to reprocess failed inputs + tds := make([]core.TransitData, 0) + for _, td := range failedInputs { + result, err := ed.transformFunc(ctx, *td) + // 2. If the reprocessing fails, then the function will return an error + if err != nil { + err = ed.dlq.Add(td) + if err != nil { + return tds, err + } + // NOTE ... Returning here is intentional to ensure that block events + // downstream are processed in the sequential order for which they came in + return tds, err + } + + // 3. If the reprocessing succeeds, append result to return slice + tds = append(tds, result...) + } + + return tds, nil +} + +// transformFunc ... Gets the events from the block, filters them and +// returns them if they are in the list of events to monitor +func (ed *EventDefinition) transformFunc(ctx context.Context, td core.TransitData) ([]core.TransitData, error) { // 1. Convert arbitrary transit data to go-ethereum compatible block type block, success := td.Value.(types.Block) if !success { @@ -106,11 +183,11 @@ func (ed *EventDefinition) Transform(ctx context.Context, td core.TransitData) ( topics := ed.getTopics(ctx, addresses, ed.ss) hash := block.Header().Hash() - // 3. Construct and execute a filter query on the provided block + // 3. Construct and execute a filter query on the provided block hash // to get the relevant logs query := ethereum.FilterQuery{ BlockHash: &hash, - Addresses: pess_common.SliceToAddresses(addresses), + Addresses: p_common.SliceToAddresses(addresses), Topics: topics, } diff --git a/internal/etl/registry/pipe/event_log_test.go b/internal/etl/registry/pipe/event_log_test.go new file mode 100644 index 00000000..501c2cd7 --- /dev/null +++ b/internal/etl/registry/pipe/event_log_test.go @@ -0,0 +1,66 @@ +package pipe_test + +import ( + "context" + "testing" + + "github.com/base-org/pessimism/internal/core" + "github.com/base-org/pessimism/internal/etl/registry/pipe" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + + "github.com/base-org/pessimism/internal/etl/component" + "github.com/base-org/pessimism/internal/mocks" +) + +// testSuite ... Test suite for the event log pipe +type testSuite struct { + ctx context.Context + def component.PipeDefinition + mockSuite *mocks.MockSuite +} + +// defConstructor ... Default constructor for the test suite +func defConstructor(t *testing.T) *testSuite { + ctrl := gomock.NewController(t) + ctx, suite := mocks.Context(context.Background(), ctrl) + + p, err := pipe.NewEventDefinition(ctx, core.Layer1) + + if err != nil { + t.Fatal(err) + } + + return &testSuite{ + ctx: ctx, + def: p, + mockSuite: suite, + } +} + +// TestEventLogPipe ... Tests the event log pipe +func TestEventLogPipe(t *testing.T) { + var tests = []struct { + name string + constructor func(t *testing.T) *testSuite + runner func(t *testing.T, suite *testSuite) + }{ + { + name: "No Error When no Events to Monitor", + constructor: defConstructor, + runner: func(t *testing.T, suite *testSuite) { + + _, err := suite.def.Transform(suite.ctx, core.TransitData{}) + assert.NoError(t, err) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + suite := tt.constructor(t) + tt.runner(t, suite) + }) + } + +} diff --git a/internal/mocks/context.go b/internal/mocks/context.go index c28fe779..4b455051 100644 --- a/internal/mocks/context.go +++ b/internal/mocks/context.go @@ -4,16 +4,36 @@ import ( context "context" "github.com/base-org/pessimism/internal/core" + "github.com/base-org/pessimism/internal/state" gomock "github.com/golang/mock/gomock" ) +type MockSuite struct { + Ctrl *gomock.Controller + MockL1 *MockEthClient + MockL2 *MockEthClient + SS state.Store +} + // Context ... Creates a context with mocked clients -func Context(ctx context.Context, ctrl *gomock.Controller) context.Context { +func Context(ctx context.Context, ctrl *gomock.Controller) (context.Context, *MockSuite) { + // 1. Construct mocked clients mockedL1Client := NewMockEthClient(ctrl) mockedL2Client := NewMockEthClient(ctrl) + ss := state.NewMemState() + // 2. Bind to context ctx = context.WithValue(ctx, core.L1Client, mockedL1Client) ctx = context.WithValue(ctx, core.L2Client, mockedL2Client) + ctx = context.WithValue(ctx, core.State, ss) + + // 3. Generate mock suite + mockSuite := &MockSuite{ + Ctrl: ctrl, + MockL1: mockedL1Client, + MockL2: mockedL2Client, + SS: ss, + } - return ctx + return ctx, mockSuite } From b13f55170cd3a52e4475ea0ab841d075e5d44b60 Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Wed, 9 Aug 2023 03:28:01 -0700 Subject: [PATCH 02/12] [epociask/issue-141-resilient-pipes] Fleshing out unit tests --- internal/etl/registry/pipe/event_log.go | 11 +++--- internal/etl/registry/pipe/event_log_test.go | 40 ++++++++++++++++++-- internal/mocks/context.go | 11 +++--- 3 files changed, 47 insertions(+), 15 deletions(-) diff --git a/internal/etl/registry/pipe/event_log.go b/internal/etl/registry/pipe/event_log.go index 83155ba1..40803c00 100644 --- a/internal/etl/registry/pipe/event_log.go +++ b/internal/etl/registry/pipe/event_log.go @@ -23,9 +23,10 @@ type EventDefinition struct { client client.EthClient dlq *p_common.DLQ[core.TransitData] - sk *core.StateKey pUUID core.PUUID ss state.Store + + SK *core.StateKey } func NewEventDefinition(ctx context.Context, n core.Network) (*EventDefinition, error) { @@ -67,7 +68,7 @@ func NewEventParserPipe(ctx context.Context, cfg *core.ClientConfig, // 3. Set the post component construction fields on the definition // There's likely a more extensible way to construct this definition fields // given that they're used by component implementations across the ETL - ed.sk = p.StateKey().Clone() + ed.SK = p.StateKey().Clone() ed.pUUID = p.PUUID() return p, nil } @@ -82,9 +83,9 @@ func (ed *EventDefinition) getTopics(ctx context.Context, for _, address := range addresses { innerKey := &core.StateKey{ Nesting: false, - Prefix: ed.sk.Prefix, + Prefix: ed.SK.Prefix, ID: address, - PUUID: ed.sk.PUUID, + PUUID: ed.SK.PUUID, } // 1.1 Attempt to fetch the events to monitor from the state store @@ -175,7 +176,7 @@ func (ed *EventDefinition) transformFunc(ctx context.Context, td core.TransitDat logging.NoContext().Debug("Getting addresses", zap.String(logging.PUUIDKey, ed.pUUID.String())) - addresses, err := ed.ss.GetSlice(ctx, ed.sk) + addresses, err := ed.ss.GetSlice(ctx, ed.SK) if err != nil { return []core.TransitData{}, err } diff --git a/internal/etl/registry/pipe/event_log_test.go b/internal/etl/registry/pipe/event_log_test.go index 501c2cd7..58d6f85b 100644 --- a/internal/etl/registry/pipe/event_log_test.go +++ b/internal/etl/registry/pipe/event_log_test.go @@ -6,6 +6,8 @@ import ( "github.com/base-org/pessimism/internal/core" "github.com/base-org/pessimism/internal/etl/registry/pipe" + "github.com/base-org/pessimism/internal/state" + "github.com/ethereum/go-ethereum/core/types" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -25,15 +27,17 @@ func defConstructor(t *testing.T) *testSuite { ctrl := gomock.NewController(t) ctx, suite := mocks.Context(context.Background(), ctrl) - p, err := pipe.NewEventDefinition(ctx, core.Layer1) - + ed, err := pipe.NewEventDefinition(ctx, core.Layer1) if err != nil { t.Fatal(err) } + nilKey := &core.StateKey{} + ed.SK = nilKey + return &testSuite{ ctx: ctx, - def: p, + def: ed, mockSuite: suite, } } @@ -50,7 +54,35 @@ func TestEventLogPipe(t *testing.T) { constructor: defConstructor, runner: func(t *testing.T, suite *testSuite) { - _, err := suite.def.Transform(suite.ctx, core.TransitData{}) + _, err := suite.def.Transform(suite.ctx, core.TransitData{ + Value: types.Block{}, + }) + assert.NoError(t, err) + }, + }, + { + name: "No Error When no Events to Monitor", + constructor: func(t *testing.T) *testSuite { + ts := defConstructor(t) + + state.InsertUnique(ts.ctx, &core.StateKey{ + Nesting: true, + }, "0x00000000") + + innerKey := &core.StateKey{ + Nesting: false, + ID: "0x00000000", + } + + state.InsertUnique(ts.ctx, innerKey, "transfer(address,address,uint256)") + return ts + }, + runner: func(t *testing.T, suite *testSuite) { + suite.mockSuite.MockL1.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return(nil, nil) + + _, err := suite.def.Transform(suite.ctx, core.TransitData{ + Value: types.NewBlockWithHeader(&types.Header{}), + }) assert.NoError(t, err) }, }, diff --git a/internal/mocks/context.go b/internal/mocks/context.go index 4b455051..e5c74e4b 100644 --- a/internal/mocks/context.go +++ b/internal/mocks/context.go @@ -18,20 +18,19 @@ type MockSuite struct { // Context ... Creates a context with mocked clients func Context(ctx context.Context, ctrl *gomock.Controller) (context.Context, *MockSuite) { // 1. Construct mocked clients - mockedL1Client := NewMockEthClient(ctrl) - mockedL2Client := NewMockEthClient(ctrl) + mockedClient := NewMockEthClient(ctrl) ss := state.NewMemState() // 2. Bind to context - ctx = context.WithValue(ctx, core.L1Client, mockedL1Client) - ctx = context.WithValue(ctx, core.L2Client, mockedL2Client) + ctx = context.WithValue(ctx, core.L1Client, mockedClient) + ctx = context.WithValue(ctx, core.L2Client, mockedClient) ctx = context.WithValue(ctx, core.State, ss) // 3. Generate mock suite mockSuite := &MockSuite{ Ctrl: ctrl, - MockL1: mockedL1Client, - MockL2: mockedL2Client, + MockL1: mockedClient, + MockL2: mockedClient, SS: ss, } From 9688fbeaaf04db1e4a1c538803c06e4e16e94077 Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Wed, 9 Aug 2023 03:29:48 -0700 Subject: [PATCH 03/12] [epociask/issue-141-resilient-pipes] Fix spacing issue --- internal/etl/registry/pipe/event_log_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/etl/registry/pipe/event_log_test.go b/internal/etl/registry/pipe/event_log_test.go index 58d6f85b..259f3913 100644 --- a/internal/etl/registry/pipe/event_log_test.go +++ b/internal/etl/registry/pipe/event_log_test.go @@ -53,7 +53,6 @@ func TestEventLogPipe(t *testing.T) { name: "No Error When no Events to Monitor", constructor: defConstructor, runner: func(t *testing.T, suite *testSuite) { - _, err := suite.def.Transform(suite.ctx, core.TransitData{ Value: types.Block{}, }) From c1c0f1c0d08334a5a58d7326db69bfc3473f15dc Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Wed, 9 Aug 2023 15:18:24 -0700 Subject: [PATCH 04/12] [epociask/issue-141-resilient-pipes] Added more tests --- internal/common/dlq.go | 5 ++ internal/etl/component/pipe.go | 21 +++-- internal/etl/registry/pipe/event_log.go | 30 +++++-- internal/etl/registry/pipe/event_log_test.go | 92 ++++++++++++++++---- 4 files changed, 118 insertions(+), 30 deletions(-) diff --git a/internal/common/dlq.go b/internal/common/dlq.go index 03c01d02..e65d7132 100644 --- a/internal/common/dlq.go +++ b/internal/common/dlq.go @@ -61,3 +61,8 @@ func (d *DLQ[E]) PopAll() []*E { func (d *DLQ[E]) Empty() bool { return len(d.dlq) == 0 } + +// Size ... Returns the size of the DLQ +func (d *DLQ[E]) Size() int { + return len(d.dlq) +} diff --git a/internal/etl/component/pipe.go b/internal/etl/component/pipe.go index 8c2f9427..78086b23 100644 --- a/internal/etl/component/pipe.go +++ b/internal/etl/component/pipe.go @@ -75,13 +75,17 @@ func (p *Pipe) EventLoop() error { for { select { case inputData := <-inChan: + // 1. Transform the input data to some output data + // NOTE ... Continuing to process data even if there is an error + // is done in the event of partial processing where some data + // may be successfully processed and some may not outputData, err := p.def.Transform(p.ctx, inputData) if err != nil { // TODO - Introduce metrics service (`prometheus`) call logger.Error(err.Error(), zap.String("ID", p.id.String())) - continue } + // 2. Determine if component is at the end of a pipeline, emit metrics if so if p.egressHandler.PathEnd() { latency := float64(time.Since(inputData.OriginTS).Milliseconds()) @@ -90,16 +94,17 @@ func (p *Pipe) EventLoop() error { latency) } - if length := len(outputData); length > 0 { - logger.Debug("Received tranformation output data", - zap.String("ID", p.id.String()), - zap.Int("Length", length)) - } else { - logger.Debug("Received output data of length 0", - zap.String("ID", p.id.String())) + // 3. Verify that some output data was produced and continue if not + length := len(outputData) + logger.Debug("Received transformation output data", + zap.String("ID", p.id.String()), + zap.Int("Length", length)) + + if length == 0 { continue } + // 4. Batch send output data to subscribed downstream components logger.Debug("Sending data batch", zap.String("ID", p.id.String()), zap.String("Type", p.OutputType().String())) diff --git a/internal/etl/registry/pipe/event_log.go b/internal/etl/registry/pipe/event_log.go index 40803c00..a4430347 100644 --- a/internal/etl/registry/pipe/event_log.go +++ b/internal/etl/registry/pipe/event_log.go @@ -18,7 +18,16 @@ import ( "go.uber.org/zap" ) -// EventDefinition ... +const ( + // dlqMaxSize ... Max size of the DLQ + // NOTE ... This could be made configurable via env vars + // or some other mechanism if needed + dlqMaxSize = 100 +) + +// EventDefinition ... Represents the stateful definition of the event log pipe component +// Used to transform block data into event logs (block->events) +// Uses a DLQ to reprocess failed queries & state store to get events to monitor type EventDefinition struct { client client.EthClient dlq *p_common.DLQ[core.TransitData] @@ -29,6 +38,7 @@ type EventDefinition struct { SK *core.StateKey } +// NewEventDefinition ... Initializes the event log pipe definition func NewEventDefinition(ctx context.Context, n core.Network) (*EventDefinition, error) { // 1. Load dependencies from context client, err := client.FromContext(ctx, n) @@ -43,7 +53,7 @@ func NewEventDefinition(ctx context.Context, n core.Network) (*EventDefinition, // 2. Construct the pipe definition ed := &EventDefinition{ - dlq: p_common.NewTransitDLQ(10), + dlq: p_common.NewTransitDLQ(dlqMaxSize), client: client, ss: ss, } @@ -126,14 +136,22 @@ func (ed *EventDefinition) Transform(ctx context.Context, td core.TransitData) ( // 2. If there are no failed queries, then process the current block data // and add a data input to the DLQ if it fails for reprocessing next block - tds, err = ed.transformFunc(ctx, td) + tds2, err := ed.transformFunc(ctx, td) if err != nil { - err = ed.dlq.Add(&td) - if err != nil { - return tds, err + err2 := ed.dlq.Add(&td) + if err2 != nil { + logging.WithContext(ctx).Error(err2.Error()) } + + logging.WithContext(ctx).Error("Failed to process block data", + zap.Int("dlq_size", ed.dlq.Size())) + + return tds, err } + // 3. Concatenate the results from the failed queries and the current block data + // and return + tds = append(tds, tds2...) return tds, nil } diff --git a/internal/etl/registry/pipe/event_log_test.go b/internal/etl/registry/pipe/event_log_test.go index 259f3913..f8c278e2 100644 --- a/internal/etl/registry/pipe/event_log_test.go +++ b/internal/etl/registry/pipe/event_log_test.go @@ -2,11 +2,13 @@ package pipe_test import ( "context" + "fmt" "testing" "github.com/base-org/pessimism/internal/core" "github.com/base-org/pessimism/internal/etl/registry/pipe" "github.com/base-org/pessimism/internal/state" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -27,6 +29,19 @@ func defConstructor(t *testing.T) *testSuite { ctrl := gomock.NewController(t) ctx, suite := mocks.Context(context.Background(), ctrl) + // Populate the state store with the events to monitor + // NOTE - There's likely a more extensible way to handle nested keys in the state store + _ = state.InsertUnique(ctx, &core.StateKey{ + Nesting: true, + }, "0x00000000") + + innerKey := &core.StateKey{ + Nesting: false, + ID: "0x00000000", + } + + _ = state.InsertUnique(ctx, innerKey, "transfer(address,address,uint256)") + ed, err := pipe.NewEventDefinition(ctx, core.Layer1) if err != nil { t.Fatal(err) @@ -50,39 +65,84 @@ func TestEventLogPipe(t *testing.T) { runner func(t *testing.T, suite *testSuite) }{ { - name: "No Error When no Events to Monitor", + name: "Error when failed FilterQuery", constructor: defConstructor, runner: func(t *testing.T, suite *testSuite) { + suite.mockSuite.MockL1.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("unknown block")) + _, err := suite.def.Transform(suite.ctx, core.TransitData{ - Value: types.Block{}, - }) - assert.NoError(t, err) + Value: *types.NewBlockWithHeader(&types.Header{})}) + assert.Error(t, err) }, }, { - name: "No Error When no Events to Monitor", + name: "No Error When Successful Filter Query", constructor: func(t *testing.T) *testSuite { ts := defConstructor(t) - state.InsertUnique(ts.ctx, &core.StateKey{ - Nesting: true, - }, "0x00000000") + return ts + }, + runner: func(t *testing.T, suite *testSuite) { + suite.mockSuite.MockL1.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return(nil, nil) - innerKey := &core.StateKey{ - Nesting: false, - ID: "0x00000000", - } + tds, err := suite.def.Transform(suite.ctx, core.TransitData{ + Value: *types.NewBlockWithHeader(&types.Header{}), + }) + assert.NoError(t, err) + assert.Empty(t, tds) + }, + }, + { + name: "DLQ Retry When Failed Filter Query", + constructor: func(t *testing.T) *testSuite { + ts := defConstructor(t) - state.InsertUnique(ts.ctx, innerKey, "transfer(address,address,uint256)") return ts }, runner: func(t *testing.T, suite *testSuite) { - suite.mockSuite.MockL1.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return(nil, nil) + // 1. Fail the first filter query and assert that the DLQ is populated + suite.mockSuite.MockL1.EXPECT(). + FilterLogs(gomock.Any(), gomock.Any()). + Return(nil, fmt.Errorf("unknown block")) - _, err := suite.def.Transform(suite.ctx, core.TransitData{ - Value: types.NewBlockWithHeader(&types.Header{}), + tds, err := suite.def.Transform(suite.ctx, core.TransitData{ + Value: *types.NewBlockWithHeader(&types.Header{}), + }) + assert.Error(t, err) + assert.Empty(t, tds) + + log1 := types.Log{ + Address: common.HexToAddress("0x0"), + } + + log2 := types.Log{ + Address: common.HexToAddress("0x1"), + } + + // 2. Retry the filter query and assert that the DLQ is empty + suite.mockSuite.MockL1.EXPECT(). + FilterLogs(gomock.Any(), gomock.Any()). + Return([]types.Log{log1}, nil) + + suite.mockSuite.MockL1.EXPECT(). + FilterLogs(gomock.Any(), gomock.Any()). + Return([]types.Log{log2}, nil) + + tds, err = suite.def.Transform(suite.ctx, core.TransitData{ + Value: *types.NewBlockWithHeader(&types.Header{}), }) + assert.NoError(t, err) + assert.NotEmpty(t, tds) + + actualLog1, ok := tds[0].Value.(types.Log) + assert.True(t, ok) + + actualLog2, ok := tds[1].Value.(types.Log) + assert.True(t, ok) + + assert.Equal(t, actualLog1, log1) + assert.Equal(t, actualLog2, log2) }, }, } From 4e5c11199a4d1d135e719168c8678f724e8fa75e Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Wed, 9 Aug 2023 16:23:03 -0700 Subject: [PATCH 05/12] [epociask/issue-141-resilient-pipes] Added additional logging and code comments --- internal/common/dlq.go | 2 +- internal/etl/registry/pipe/event_log.go | 24 +++++++++++++++++++----- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/internal/common/dlq.go b/internal/common/dlq.go index e65d7132..e4e0870f 100644 --- a/internal/common/dlq.go +++ b/internal/common/dlq.go @@ -12,7 +12,7 @@ const ( ) // DLQ ... Dead Letter Queue construct -// Used to store block hashes of ETL events +// Used to store ETL events // that failed to be processed type DLQ[E any] struct { size int diff --git a/internal/etl/registry/pipe/event_log.go b/internal/etl/registry/pipe/event_log.go index a4430347..c0979b4c 100644 --- a/internal/etl/registry/pipe/event_log.go +++ b/internal/etl/registry/pipe/event_log.go @@ -102,7 +102,9 @@ func (ed *EventDefinition) getTopics(ctx context.Context, // and continue if there is an error events, err := ss.GetSlice(ctx, innerKey) if err != nil { - logging.WithContext(ctx).Error(err.Error()) + logging.WithContext(ctx).Error("Failed to get events to monitor", + zap.String(logging.PUUIDKey, ed.pUUID.String()), + zap.Error(err)) continue } @@ -123,15 +125,27 @@ func (ed *EventDefinition) getTopics(ctx context.Context, // Transform ... Attempts to reprocess previously failed queries first // before attempting to process the current block data func (ed *EventDefinition) Transform(ctx context.Context, td core.TransitData) ([]core.TransitData, error) { + logger := logging.WithContext(ctx) // 1. Check to see if there are any failed queries to reprocess // If failures occur again, add the caller (Transform) // function input to the DLQ and return - tds, err := ed.attemptDLQ(ctx) - if err != nil { - err = ed.dlq.Add(&td) + var tds []core.TransitData + + if !ed.dlq.Empty() { + logger.Debug("Attempting to reprocess failed queries", + zap.Int("dlq_size", ed.dlq.Size())) + + tds, err := ed.attemptDLQ(ctx) + // NOTE ... Returning here is intentional to ensure that block events + // downstream are processed in the sequential order for which they came in if err != nil { - return tds, err + err = ed.dlq.Add(&td) + if err != nil { + return tds, err + } } + logger.Debug("Successfully reprocessed failed queries", + zap.String(logging.PUUIDKey, ed.pUUID.String())) } // 2. If there are no failed queries, then process the current block data From 7f4f11ce1aa3777adce2036f34d0681302ed33f8 Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Wed, 9 Aug 2023 16:35:23 -0700 Subject: [PATCH 06/12] [epociask/issue-141-resilient-pipes] Fixed failing test --- internal/etl/registry/pipe/event_log.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/etl/registry/pipe/event_log.go b/internal/etl/registry/pipe/event_log.go index c0979b4c..34dd67ab 100644 --- a/internal/etl/registry/pipe/event_log.go +++ b/internal/etl/registry/pipe/event_log.go @@ -129,13 +129,16 @@ func (ed *EventDefinition) Transform(ctx context.Context, td core.TransitData) ( // 1. Check to see if there are any failed queries to reprocess // If failures occur again, add the caller (Transform) // function input to the DLQ and return - var tds []core.TransitData + var ( + tds []core.TransitData + err error + ) if !ed.dlq.Empty() { logger.Debug("Attempting to reprocess failed queries", zap.Int("dlq_size", ed.dlq.Size())) - tds, err := ed.attemptDLQ(ctx) + tds, err = ed.attemptDLQ(ctx) // NOTE ... Returning here is intentional to ensure that block events // downstream are processed in the sequential order for which they came in if err != nil { @@ -146,6 +149,7 @@ func (ed *EventDefinition) Transform(ctx context.Context, td core.TransitData) ( } logger.Debug("Successfully reprocessed failed queries", zap.String(logging.PUUIDKey, ed.pUUID.String())) + } // 2. If there are no failed queries, then process the current block data From 4d82ddb618991392354a7d9b6f202b84b93243d9 Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Wed, 9 Aug 2023 16:37:19 -0700 Subject: [PATCH 07/12] [epociask/issue-141-resilient-pipes] Fixed linting bug --- internal/etl/registry/pipe/event_log.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/etl/registry/pipe/event_log.go b/internal/etl/registry/pipe/event_log.go index 34dd67ab..b354f039 100644 --- a/internal/etl/registry/pipe/event_log.go +++ b/internal/etl/registry/pipe/event_log.go @@ -149,7 +149,6 @@ func (ed *EventDefinition) Transform(ctx context.Context, td core.TransitData) ( } logger.Debug("Successfully reprocessed failed queries", zap.String(logging.PUUIDKey, ed.pUUID.String())) - } // 2. If there are no failed queries, then process the current block data From dc93a5eb6157044c569dccbba7e6dd128364356f Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Wed, 9 Aug 2023 20:05:47 -0700 Subject: [PATCH 08/12] [epociask/issue-141-resilient-pipes] Update DLQ logic --- internal/common/dlq.go | 5 +++++ internal/etl/registry/pipe/event_log.go | 16 +++++++++++++--- internal/metrics/factory.go | 3 +++ internal/metrics/metrics.go | 19 ++++++++++++++++++- 4 files changed, 39 insertions(+), 4 deletions(-) diff --git a/internal/common/dlq.go b/internal/common/dlq.go index e4e0870f..03cf290d 100644 --- a/internal/common/dlq.go +++ b/internal/common/dlq.go @@ -66,3 +66,8 @@ func (d *DLQ[E]) Empty() bool { func (d *DLQ[E]) Size() int { return len(d.dlq) } + +// Full ... Checks if the DLQ is full +func (d *DLQ[E]) Full() bool { + return len(d.dlq) >= d.size +} diff --git a/internal/etl/registry/pipe/event_log.go b/internal/etl/registry/pipe/event_log.go index b354f039..84f7cb71 100644 --- a/internal/etl/registry/pipe/event_log.go +++ b/internal/etl/registry/pipe/event_log.go @@ -9,6 +9,7 @@ import ( "github.com/base-org/pessimism/internal/core" "github.com/base-org/pessimism/internal/etl/component" "github.com/base-org/pessimism/internal/logging" + "github.com/base-org/pessimism/internal/metrics" "github.com/base-org/pessimism/internal/state" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" @@ -155,11 +156,20 @@ func (ed *EventDefinition) Transform(ctx context.Context, td core.TransitData) ( // and add a data input to the DLQ if it fails for reprocessing next block tds2, err := ed.transformFunc(ctx, td) if err != nil { - err2 := ed.dlq.Add(&td) - if err2 != nil { - logging.WithContext(ctx).Error(err2.Error()) + if ed.dlq.Full() { + // NOTE ... If the DLQ is full, then we pop the oldest entry + // to make room for the new entry + lostVal, _ := ed.dlq.Pop() + logger.Warn("DLQ is full, popping oldest entry", + zap.String(logging.PUUIDKey, ed.pUUID.String()), + zap.Any("lost_value", lostVal)) + + metrics.WithContext(ctx). + IncMissedBlock(ed.pUUID) } + // NOTE ... If the DLQ is not full, then we add the entry to the DLQ + _ = ed.dlq.Add(&td) logging.WithContext(ctx).Error("Failed to process block data", zap.Int("dlq_size", ed.dlq.Size())) diff --git a/internal/metrics/factory.go b/internal/metrics/factory.go index 869c7c56..3641536d 100644 --- a/internal/metrics/factory.go +++ b/internal/metrics/factory.go @@ -12,6 +12,7 @@ const ( CounterType = "counter" ) +// Factory ... Interface for creating prometheus metrics type Factory interface { NewCounter(opts prometheus.CounterOpts) prometheus.Counter NewCounterVec(opts prometheus.CounterOpts, labelNames []string) *prometheus.CounterVec @@ -24,6 +25,7 @@ type Factory interface { Document() []DocumentedMetric } +// DocumentedMetric ... Struct for documenting metrics type DocumentedMetric struct { Type string `json:"type"` Name string `json:"name"` @@ -31,6 +33,7 @@ type DocumentedMetric struct { Labels []string `json:"labels"` } +// documentor ... Factory implementation that documents metrics type documentor struct { metrics []DocumentedMetric factory promauto.Factory diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 53245103..5db00de5 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -20,8 +20,10 @@ const ( SubsystemEtl = "etl" ) +// serverShutdownTimeout ... Timeout for shutting down the metrics server const serverShutdownTimeout = 10 * time.Second +// Config ... Metrics server configuration type Config struct { Host string Port int @@ -29,7 +31,9 @@ type Config struct { ReadHeaderTimeout int } +// Metricer ... Interface for metrics type Metricer interface { + IncMissedBlock(pUUID core.PUUID) IncActiveHeuristics(ht core.HeuristicType, network core.Network, pipelineType core.PipelineType) IncActivePipelines(pipelineType core.PipelineType, network core.Network) DecActivePipelines(pipelineType core.PipelineType, network core.Network) @@ -46,6 +50,7 @@ type Metricer interface { Document() []DocumentedMetric } +// Metrics ... Metrics struct type Metrics struct { Up prometheus.Gauge ActivePipelines *prometheus.GaugeVec @@ -53,6 +58,7 @@ type Metrics struct { HeuristicRuns *prometheus.CounterVec AlertsGenerated *prometheus.CounterVec NodeErrors *prometheus.CounterVec + MissedBlocks *prometheus.CounterVec BlockLatency *prometheus.GaugeVec PipelineLatency *prometheus.GaugeVec InvExecutionTime *prometheus.GaugeVec @@ -145,6 +151,11 @@ func New(ctx context.Context, cfg *Config) (Metricer, func(), error) { Help: "Number of errors generated by heuristic executions", Namespace: metricsNamespace, }, []string{"heuristic"}), + MissedBlocks: factory.NewCounterVec(prometheus.CounterOpts{ + Name: "missed_blocks_total", + Help: "Number of missed blocks", + Namespace: metricsNamespace, + }, []string{"puuid"}), registry: registry, factory: factory, @@ -182,6 +193,11 @@ func (m *Metrics) RecordInvExecutionTime(h heuristic.Heuristic, latency float64) m.InvExecutionTime.WithLabelValues(ht).Set(latency) } +// IncMissedBlock ... Increments the number of missed blocks +func (m *Metrics) IncMissedBlock(pUUID core.PUUID) { + m.MissedBlocks.WithLabelValues(pUUID.String()).Inc() +} + // IncActiveHeuristics ... Increments the number of active heuristics func (m *Metrics) IncActiveHeuristics(ht core.HeuristicType, n core.Network, pipelineType core.PipelineType) { @@ -243,7 +259,8 @@ type noopMetricer struct{} var NoopMetrics Metricer = new(noopMetricer) -func (n *noopMetricer) RecordUp() {} +func (n *noopMetricer) IncMissedBlock(_ core.PUUID) {} +func (n *noopMetricer) RecordUp() {} func (n *noopMetricer) IncActiveHeuristics(_ core.HeuristicType, _ core.Network, _ core.PipelineType) { } func (n *noopMetricer) RecordInvExecutionTime(_ heuristic.Heuristic, _ float64) {} From 05ef8d0b5f8705ecb200b933ef00ea400090d805 Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Wed, 9 Aug 2023 20:07:33 -0700 Subject: [PATCH 09/12] [epociask/issue-141-resilient-pipes] Fix merge conflicts --- internal/etl/registry/pipe/event_log.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/internal/etl/registry/pipe/event_log.go b/internal/etl/registry/pipe/event_log.go index 536c13bf..84f7cb71 100644 --- a/internal/etl/registry/pipe/event_log.go +++ b/internal/etl/registry/pipe/event_log.go @@ -60,7 +60,6 @@ func NewEventDefinition(ctx context.Context, n core.Network) (*EventDefinition, } return ed, nil } -<<<<<<< HEAD // NewEventParserPipe ... Initializer func NewEventParserPipe(ctx context.Context, cfg *core.ClientConfig, @@ -71,18 +70,6 @@ func NewEventParserPipe(ctx context.Context, cfg *core.ClientConfig, return nil, err } -======= - -// NewEventParserPipe ... Initializer -func NewEventParserPipe(ctx context.Context, cfg *core.ClientConfig, - opts ...component.Option) (component.Component, error) { - // 1. Construct the pipe definition - ed, err := NewEventDefinition(ctx, cfg.Network) - if err != nil { - return nil, err - } - ->>>>>>> d9da3aec27e458936600f46e40aed8141688994d // 2. Embed the definition into a generic pipe construction p, err := component.NewPipe(ctx, ed, core.GethBlock, core.EventLog, opts...) if err != nil { From bafd15ce2f5e53b5cbd3451b83bca8ef6e84cc6f Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Wed, 9 Aug 2023 20:10:20 -0700 Subject: [PATCH 10/12] [epociask/issue-141-resilient-pipes] Removed unecessary comment --- internal/etl/registry/pipe/event_log.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/etl/registry/pipe/event_log.go b/internal/etl/registry/pipe/event_log.go index 84f7cb71..3f54ac10 100644 --- a/internal/etl/registry/pipe/event_log.go +++ b/internal/etl/registry/pipe/event_log.go @@ -168,7 +168,6 @@ func (ed *EventDefinition) Transform(ctx context.Context, td core.TransitData) ( IncMissedBlock(ed.pUUID) } - // NOTE ... If the DLQ is not full, then we add the entry to the DLQ _ = ed.dlq.Add(&td) logging.WithContext(ctx).Error("Failed to process block data", zap.Int("dlq_size", ed.dlq.Size())) From 4d6298685869c6f0b00ab207b1ee5a88e76c2b2c Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Thu, 10 Aug 2023 00:09:01 -0700 Subject: [PATCH 11/12] [epociask/large-withdrawal-poc] Large Withdrawal PoC --- internal/core/constants.go | 7 + internal/engine/registry/large_withdrawal.go | 142 +++++++++++++++++++ internal/engine/registry/registry.go | 18 +++ 3 files changed, 167 insertions(+) create mode 100644 internal/engine/registry/large_withdrawal.go diff --git a/internal/core/constants.go b/internal/core/constants.go index bca816ff..084f3ec8 100644 --- a/internal/core/constants.go +++ b/internal/core/constants.go @@ -93,6 +93,7 @@ const ( ContractEvent WithdrawalEnforcement FaultDetector + LargeWithdrawal ) // String ... Converts a heuristic type to a string @@ -110,6 +111,9 @@ func (it HeuristicType) String() string { case FaultDetector: return "fault_detector" + case LargeWithdrawal: + return "large_withdrawal" + default: return "unknown" } @@ -130,6 +134,9 @@ func StringToHeuristicType(stringType string) HeuristicType { case "fault_detector": return FaultDetector + case "large_withdrawal": + return LargeWithdrawal + default: return HeuristicType(0) } diff --git a/internal/engine/registry/large_withdrawal.go b/internal/engine/registry/large_withdrawal.go new file mode 100644 index 00000000..42590889 --- /dev/null +++ b/internal/engine/registry/large_withdrawal.go @@ -0,0 +1,142 @@ +package registry + +import ( + "context" + "encoding/json" + "fmt" + "math/big" + "time" + + "github.com/base-org/pessimism/internal/client" + "github.com/base-org/pessimism/internal/core" + "github.com/base-org/pessimism/internal/engine/heuristic" + "github.com/base-org/pessimism/internal/logging" + "github.com/ethereum-optimism/optimism/op-bindings/bindings" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + + "go.uber.org/zap" +) + +const largeWithdrawalMsg = ` + Large withdrawal has been proven on L1 + L1PortalAddress: %s + L2ToL1Address: %s + + Session UUID: %s + L1 Proving Transaction Hash: %s + L2 Initialization Transaction Hash: %s + Withdrawal Size: %d +` + +// LargeWithdrawalCfg ... Configuration for the balance heuristic +type LargeWithdrawalCfg struct { + Threshold *big.Int `json:"threshold"` + + L1PortalAddress string `json:"l1_portal_address"` + L2ToL1Address string `json:"l2_to_l1_address"` +} + +// LargeWithdrawHeuristic ... LargeWithdrawal heuristic implementation +type LargeWithdrawHeuristic struct { + eventHash common.Hash + cfg *LargeWithdrawalCfg + l2tol1MessagePasser *bindings.L2ToL1MessagePasserFilterer + l1PortalFilter *bindings.OptimismPortalFilterer + + heuristic.Heuristic +} + +// Unmarshal ... Converts a general config to a LargeWithdrawal heuristic config +func (cfg *LargeWithdrawalCfg) Unmarshal(isp *core.SessionParams) error { + return json.Unmarshal(isp.Bytes(), &cfg) +} + +// NewLargeWithdrawHeuristic ... Initializer +func NewLargeWithdrawHeuristic(ctx context.Context, cfg *LargeWithdrawalCfg) (heuristic.Heuristic, error) { + l2Client, err := client.FromContext(ctx, core.Layer2) + if err != nil { + return nil, err + } + + l1Client, err := client.FromContext(ctx, core.Layer1) + if err != nil { + return nil, err + } + + withdrawalHash := crypto.Keccak256Hash([]byte(WithdrawalProvenEvent)) + + addr := common.HexToAddress(cfg.L2ToL1Address) + addr2 := common.HexToAddress(cfg.L1PortalAddress) + l2MessagePasser, err := bindings.NewL2ToL1MessagePasserFilterer(addr, l2Client) + if err != nil { + return nil, err + } + + filter, err := bindings.NewOptimismPortalFilterer(addr2, l1Client) + if err != nil { + return nil, err + } + + return &LargeWithdrawHeuristic{ + cfg: cfg, + + eventHash: withdrawalHash, + l1PortalFilter: filter, + l2tol1MessagePasser: l2MessagePasser, + + Heuristic: heuristic.NewBaseHeuristic(core.EventLog), + }, nil +} + +// Assess ... Verifies than an L1 WithdrawalProven has a correlating hash +// to the withdrawal storage of the L2ToL1MessagePasser +func (wi *LargeWithdrawHeuristic) Assess(td core.TransitData) (*core.Activation, bool, error) { + logging.NoContext().Debug("Checking activation for withdrawal enforcement heuristic", + zap.String("data", fmt.Sprintf("%v", td))) + + // 1. Validate and extract data input + if td.Type != wi.InputType() { + return nil, false, fmt.Errorf("invalid type supplied") + } + + if td.Address.String() != wi.cfg.L1PortalAddress { + return nil, false, fmt.Errorf(invalidAddrErr, td.Address.String(), wi.cfg.L1PortalAddress) + } + + log, success := td.Value.(types.Log) + if !success { + return nil, false, fmt.Errorf(couldNotCastErr, "types.Log") + } + + // 2. Parse the log to a WithdrawalProven structured type + provenWithdrawal, err := wi.l1PortalFilter.ParseWithdrawalProven(log) + if err != nil { + return nil, false, err + } + + // 3. Check if the withdrawal exists in the message outbox of the L2ToL1MessagePasser contract + iterator, err := wi.l2tol1MessagePasser.FilterMessagePassed(nil, + []*big.Int{}, []common.Address{provenWithdrawal.From}, []common.Address{provenWithdrawal.To}) + if err != nil { + return nil, false, err + } + + for iterator.Next() { + if iterator.Event.WithdrawalHash == provenWithdrawal.WithdrawalHash { // Found the associated withdrawal on L2 + // 4. Check if the withdrawal amount is greater than the threshold + if iterator.Event.Value.Cmp(wi.cfg.Threshold) == 1 { + return &core.Activation{ + TimeStamp: time.Now(), + Message: fmt.Sprintf(largeWithdrawalMsg, + wi.cfg.L1PortalAddress, wi.cfg.L2ToL1Address, + wi.SUUID(), log.TxHash.Hex(), iterator.Event.Raw.TxHash, + iterator.Event.Value), + }, true, nil + } + } + } + + return nil, false, nil +} diff --git a/internal/engine/registry/registry.go b/internal/engine/registry/registry.go index 938d3c0e..1060736d 100644 --- a/internal/engine/registry/registry.go +++ b/internal/engine/registry/registry.go @@ -48,6 +48,12 @@ func NewHeuristicTable() HeuristicTable { InputType: core.EventLog, Constructor: constructWithdrawalEnforce, }, + core.LargeWithdrawal: { + PrepareValidate: WithdrawEnforcePrepare, + Policy: core.OnlyLayer1, + InputType: core.EventLog, + Constructor: constructLargeWithdrawal, + }, } return tbl @@ -101,6 +107,18 @@ func constructWithdrawalEnforce(ctx context.Context, isp *core.SessionParams) (h return NewWithdrawalEnforceInv(ctx, cfg) } +// constructLargeWithdrawal ... Constructs a large withdrawal heuristic instance +func constructLargeWithdrawal(ctx context.Context, isp *core.SessionParams) (heuristic.Heuristic, error) { + cfg := &LargeWithdrawalCfg{} + err := cfg.Unmarshal(isp) + + if err != nil { + return nil, err + } + + return NewLargeWithdrawHeuristic(ctx, cfg) +} + // ValidateEventTracking ... Ensures that an address and nested args exist in the session params func ValidateEventTracking(cfg *core.SessionParams) error { err := ValidateAddressing(cfg) From d36e3af7a6f4ba1435a05e218196d580a184de00 Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Wed, 25 Oct 2023 11:38:13 -0700 Subject: [PATCH 12/12] [epociask/large-withdrawal-poc] Indexer integration --- internal/engine/engine.go | 21 +++++-- internal/engine/registry/constants.go | 9 +++ internal/engine/registry/large_withdrawal.go | 63 +++++++++----------- 3 files changed, 53 insertions(+), 40 deletions(-) diff --git a/internal/engine/engine.go b/internal/engine/engine.go index 9f79fab8..8a4f3dc7 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -8,6 +8,7 @@ import ( "github.com/base-org/pessimism/internal/engine/heuristic" "github.com/base-org/pessimism/internal/logging" "github.com/base-org/pessimism/internal/metrics" + "github.com/ethereum-optimism/optimism/op-service/retry" "go.uber.org/zap" ) @@ -95,11 +96,23 @@ func (hce *hardCodedEngine) EventLoop(ctx context.Context) { logger.Debug("Heuristic input received", zap.String(logging.SUUIDKey, execInput.h.SUUID().String())) - // (1) Execute heuristic + // (1) Execute heuristic with retry strategy start := time.Now() - outcome, activated := hce.Execute(ctx, execInput.hi.Input, execInput.h) - metrics.WithContext(ctx).RecordHeuristicRun(execInput.h) - metrics.WithContext(ctx).RecordInvExecutionTime(execInput.h, float64(time.Since(start).Nanoseconds())) + + var outcome *core.Activation + var activated bool + + retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250} + if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) { + outcome, activated = hce.Execute(ctx, execInput.hi.Input, execInput.h) + metrics.WithContext(ctx).RecordHeuristicRun(execInput.h) + metrics.WithContext(ctx).RecordInvExecutionTime(execInput.h, float64(time.Since(start).Nanoseconds())) + // a-ok! + return nil, nil + }); err != nil { + logger.Error("Failed to execute heuristic", zap.Error(err)) + metrics.WithContext(ctx).RecordAssessmentError(execInput.h) + } // (2) Send alert if activated if activated { diff --git a/internal/engine/registry/constants.go b/internal/engine/registry/constants.go index 1ba3503c..7d2c4ede 100644 --- a/internal/engine/registry/constants.go +++ b/internal/engine/registry/constants.go @@ -1,5 +1,9 @@ package registry +import ( + "github.com/ethereum/go-ethereum/crypto" +) + const ( // Error constant strings invalidAddrErr = "invalid address provided for heuristic. expected %s, got %s" @@ -10,4 +14,9 @@ const ( // Event declaration strings OutputProposedEvent = "OutputProposed(bytes32,uint256,uint256,uint256)" WithdrawalProvenEvent = "WithdrawalProven(bytes32,address,address)" + MessagePassed = "MessagePassed(uint256,address,address,uint256,uint256,bytes,bytes32)" +) + +var ( + MessagePassedSig = crypto.Keccak256Hash([]byte(MessagePassed)) ) diff --git a/internal/engine/registry/large_withdrawal.go b/internal/engine/registry/large_withdrawal.go index 42590889..c8bf0738 100644 --- a/internal/engine/registry/large_withdrawal.go +++ b/internal/engine/registry/large_withdrawal.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "math/big" + "strconv" "time" "github.com/base-org/pessimism/internal/client" @@ -40,10 +41,10 @@ type LargeWithdrawalCfg struct { // LargeWithdrawHeuristic ... LargeWithdrawal heuristic implementation type LargeWithdrawHeuristic struct { - eventHash common.Hash - cfg *LargeWithdrawalCfg - l2tol1MessagePasser *bindings.L2ToL1MessagePasserFilterer - l1PortalFilter *bindings.OptimismPortalFilterer + eventHash common.Hash + cfg *LargeWithdrawalCfg + indexerClient client.IndexerClient + l1PortalFilter *bindings.OptimismPortalFilterer heuristic.Heuristic } @@ -55,26 +56,15 @@ func (cfg *LargeWithdrawalCfg) Unmarshal(isp *core.SessionParams) error { // NewLargeWithdrawHeuristic ... Initializer func NewLargeWithdrawHeuristic(ctx context.Context, cfg *LargeWithdrawalCfg) (heuristic.Heuristic, error) { - l2Client, err := client.FromContext(ctx, core.Layer2) - if err != nil { - return nil, err - } - - l1Client, err := client.FromContext(ctx, core.Layer1) + clients, err := client.FromContext(ctx) if err != nil { return nil, err } withdrawalHash := crypto.Keccak256Hash([]byte(WithdrawalProvenEvent)) + portalAddr := common.HexToAddress(cfg.L1PortalAddress) - addr := common.HexToAddress(cfg.L2ToL1Address) - addr2 := common.HexToAddress(cfg.L1PortalAddress) - l2MessagePasser, err := bindings.NewL2ToL1MessagePasserFilterer(addr, l2Client) - if err != nil { - return nil, err - } - - filter, err := bindings.NewOptimismPortalFilterer(addr2, l1Client) + filter, err := bindings.NewOptimismPortalFilterer(portalAddr, clients.L1Client) if err != nil { return nil, err } @@ -82,9 +72,9 @@ func NewLargeWithdrawHeuristic(ctx context.Context, cfg *LargeWithdrawalCfg) (he return &LargeWithdrawHeuristic{ cfg: cfg, - eventHash: withdrawalHash, - l1PortalFilter: filter, - l2tol1MessagePasser: l2MessagePasser, + eventHash: withdrawalHash, + l1PortalFilter: filter, + indexerClient: clients.IndexerClient, Heuristic: heuristic.NewBaseHeuristic(core.EventLog), }, nil @@ -116,25 +106,26 @@ func (wi *LargeWithdrawHeuristic) Assess(td core.TransitData) (*core.Activation, return nil, false, err } - // 3. Check if the withdrawal exists in the message outbox of the L2ToL1MessagePasser contract - iterator, err := wi.l2tol1MessagePasser.FilterMessagePassed(nil, - []*big.Int{}, []common.Address{provenWithdrawal.From}, []common.Address{provenWithdrawal.To}) + // 3. Get withdrawal metadata from OP Indexer API + withdrawals, err := wi.indexerClient.GetAllWithdrawalsByAddress(provenWithdrawal.From) if err != nil { return nil, false, err } - for iterator.Next() { - if iterator.Event.WithdrawalHash == provenWithdrawal.WithdrawalHash { // Found the associated withdrawal on L2 - // 4. Check if the withdrawal amount is greater than the threshold - if iterator.Event.Value.Cmp(wi.cfg.Threshold) == 1 { - return &core.Activation{ - TimeStamp: time.Now(), - Message: fmt.Sprintf(largeWithdrawalMsg, - wi.cfg.L1PortalAddress, wi.cfg.L2ToL1Address, - wi.SUUID(), log.TxHash.Hex(), iterator.Event.Raw.TxHash, - iterator.Event.Value), - }, true, nil - } + for _, withdrawal := range withdrawals { + asInt, err := strconv.Atoi(withdrawal.Amount) + if err != nil { + return nil, false, err + } + + if asInt > int(wi.cfg.Threshold.Int64()) { + return &core.Activation{ + TimeStamp: time.Now(), + Message: fmt.Sprintf(largeWithdrawalMsg, + wi.cfg.L1PortalAddress, wi.cfg.L2ToL1Address, + wi.SUUID(), log.TxHash.Hex(), withdrawal.TransactionHash, + withdrawal.Amount), + }, true, nil } }