diff --git a/internal/common/dlq.go b/internal/common/dlq.go index e4e0870f..03cf290d 100644 --- a/internal/common/dlq.go +++ b/internal/common/dlq.go @@ -66,3 +66,8 @@ func (d *DLQ[E]) Empty() bool { func (d *DLQ[E]) Size() int { return len(d.dlq) } + +// Full ... Checks if the DLQ is full +func (d *DLQ[E]) Full() bool { + return len(d.dlq) >= d.size +} diff --git a/internal/core/constants.go b/internal/core/constants.go index c7405143..0393592f 100644 --- a/internal/core/constants.go +++ b/internal/core/constants.go @@ -91,6 +91,7 @@ const ( ContractEvent WithdrawalEnforcement FaultDetector + LargeWithdrawal ) // String ... Converts a heuristic type to a string @@ -108,6 +109,9 @@ func (it HeuristicType) String() string { case FaultDetector: return "fault_detector" + case LargeWithdrawal: + return "large_withdrawal" + default: return "unknown" } @@ -128,6 +132,9 @@ func StringToHeuristicType(stringType string) HeuristicType { case "fault_detector": return FaultDetector + case "large_withdrawal": + return LargeWithdrawal + default: return HeuristicType(0) } diff --git a/internal/engine/engine.go b/internal/engine/engine.go index 9f79fab8..8a4f3dc7 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -8,6 +8,7 @@ import ( "github.com/base-org/pessimism/internal/engine/heuristic" "github.com/base-org/pessimism/internal/logging" "github.com/base-org/pessimism/internal/metrics" + "github.com/ethereum-optimism/optimism/op-service/retry" "go.uber.org/zap" ) @@ -95,11 +96,23 @@ func (hce *hardCodedEngine) EventLoop(ctx context.Context) { logger.Debug("Heuristic input received", zap.String(logging.SUUIDKey, execInput.h.SUUID().String())) - // (1) Execute heuristic + // (1) Execute heuristic with retry strategy start := time.Now() - outcome, activated := hce.Execute(ctx, execInput.hi.Input, execInput.h) - metrics.WithContext(ctx).RecordHeuristicRun(execInput.h) - metrics.WithContext(ctx).RecordInvExecutionTime(execInput.h, float64(time.Since(start).Nanoseconds())) + + var outcome *core.Activation + var activated bool + + retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250} + if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) { + outcome, activated = hce.Execute(ctx, execInput.hi.Input, execInput.h) + metrics.WithContext(ctx).RecordHeuristicRun(execInput.h) + metrics.WithContext(ctx).RecordInvExecutionTime(execInput.h, float64(time.Since(start).Nanoseconds())) + // a-ok! + return nil, nil + }); err != nil { + logger.Error("Failed to execute heuristic", zap.Error(err)) + metrics.WithContext(ctx).RecordAssessmentError(execInput.h) + } // (2) Send alert if activated if activated { diff --git a/internal/engine/registry/constants.go b/internal/engine/registry/constants.go index 1ba3503c..7d2c4ede 100644 --- a/internal/engine/registry/constants.go +++ b/internal/engine/registry/constants.go @@ -1,5 +1,9 @@ package registry +import ( + "github.com/ethereum/go-ethereum/crypto" +) + const ( // Error constant strings invalidAddrErr = "invalid address provided for heuristic. expected %s, got %s" @@ -10,4 +14,9 @@ const ( // Event declaration strings OutputProposedEvent = "OutputProposed(bytes32,uint256,uint256,uint256)" WithdrawalProvenEvent = "WithdrawalProven(bytes32,address,address)" + MessagePassed = "MessagePassed(uint256,address,address,uint256,uint256,bytes,bytes32)" +) + +var ( + MessagePassedSig = crypto.Keccak256Hash([]byte(MessagePassed)) ) diff --git a/internal/engine/registry/large_withdrawal.go b/internal/engine/registry/large_withdrawal.go new file mode 100644 index 00000000..c8bf0738 --- /dev/null +++ b/internal/engine/registry/large_withdrawal.go @@ -0,0 +1,133 @@ +package registry + +import ( + "context" + "encoding/json" + "fmt" + "math/big" + "strconv" + "time" + + "github.com/base-org/pessimism/internal/client" + "github.com/base-org/pessimism/internal/core" + "github.com/base-org/pessimism/internal/engine/heuristic" + "github.com/base-org/pessimism/internal/logging" + "github.com/ethereum-optimism/optimism/op-bindings/bindings" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + + "go.uber.org/zap" +) + +const largeWithdrawalMsg = ` + Large withdrawal has been proven on L1 + L1PortalAddress: %s + L2ToL1Address: %s + + Session UUID: %s + L1 Proving Transaction Hash: %s + L2 Initialization Transaction Hash: %s + Withdrawal Size: %d +` + +// LargeWithdrawalCfg ... Configuration for the balance heuristic +type LargeWithdrawalCfg struct { + Threshold *big.Int `json:"threshold"` + + L1PortalAddress string `json:"l1_portal_address"` + L2ToL1Address string `json:"l2_to_l1_address"` +} + +// LargeWithdrawHeuristic ... LargeWithdrawal heuristic implementation +type LargeWithdrawHeuristic struct { + eventHash common.Hash + cfg *LargeWithdrawalCfg + indexerClient client.IndexerClient + l1PortalFilter *bindings.OptimismPortalFilterer + + heuristic.Heuristic +} + +// Unmarshal ... Converts a general config to a LargeWithdrawal heuristic config +func (cfg *LargeWithdrawalCfg) Unmarshal(isp *core.SessionParams) error { + return json.Unmarshal(isp.Bytes(), &cfg) +} + +// NewLargeWithdrawHeuristic ... Initializer +func NewLargeWithdrawHeuristic(ctx context.Context, cfg *LargeWithdrawalCfg) (heuristic.Heuristic, error) { + clients, err := client.FromContext(ctx) + if err != nil { + return nil, err + } + + withdrawalHash := crypto.Keccak256Hash([]byte(WithdrawalProvenEvent)) + portalAddr := common.HexToAddress(cfg.L1PortalAddress) + + filter, err := bindings.NewOptimismPortalFilterer(portalAddr, clients.L1Client) + if err != nil { + return nil, err + } + + return &LargeWithdrawHeuristic{ + cfg: cfg, + + eventHash: withdrawalHash, + l1PortalFilter: filter, + indexerClient: clients.IndexerClient, + + Heuristic: heuristic.NewBaseHeuristic(core.EventLog), + }, nil +} + +// Assess ... Verifies than an L1 WithdrawalProven has a correlating hash +// to the withdrawal storage of the L2ToL1MessagePasser +func (wi *LargeWithdrawHeuristic) Assess(td core.TransitData) (*core.Activation, bool, error) { + logging.NoContext().Debug("Checking activation for withdrawal enforcement heuristic", + zap.String("data", fmt.Sprintf("%v", td))) + + // 1. Validate and extract data input + if td.Type != wi.InputType() { + return nil, false, fmt.Errorf("invalid type supplied") + } + + if td.Address.String() != wi.cfg.L1PortalAddress { + return nil, false, fmt.Errorf(invalidAddrErr, td.Address.String(), wi.cfg.L1PortalAddress) + } + + log, success := td.Value.(types.Log) + if !success { + return nil, false, fmt.Errorf(couldNotCastErr, "types.Log") + } + + // 2. Parse the log to a WithdrawalProven structured type + provenWithdrawal, err := wi.l1PortalFilter.ParseWithdrawalProven(log) + if err != nil { + return nil, false, err + } + + // 3. Get withdrawal metadata from OP Indexer API + withdrawals, err := wi.indexerClient.GetAllWithdrawalsByAddress(provenWithdrawal.From) + if err != nil { + return nil, false, err + } + + for _, withdrawal := range withdrawals { + asInt, err := strconv.Atoi(withdrawal.Amount) + if err != nil { + return nil, false, err + } + + if asInt > int(wi.cfg.Threshold.Int64()) { + return &core.Activation{ + TimeStamp: time.Now(), + Message: fmt.Sprintf(largeWithdrawalMsg, + wi.cfg.L1PortalAddress, wi.cfg.L2ToL1Address, + wi.SUUID(), log.TxHash.Hex(), withdrawal.TransactionHash, + withdrawal.Amount), + }, true, nil + } + } + + return nil, false, nil +} diff --git a/internal/engine/registry/registry.go b/internal/engine/registry/registry.go index 938d3c0e..1060736d 100644 --- a/internal/engine/registry/registry.go +++ b/internal/engine/registry/registry.go @@ -48,6 +48,12 @@ func NewHeuristicTable() HeuristicTable { InputType: core.EventLog, Constructor: constructWithdrawalEnforce, }, + core.LargeWithdrawal: { + PrepareValidate: WithdrawEnforcePrepare, + Policy: core.OnlyLayer1, + InputType: core.EventLog, + Constructor: constructLargeWithdrawal, + }, } return tbl @@ -101,6 +107,18 @@ func constructWithdrawalEnforce(ctx context.Context, isp *core.SessionParams) (h return NewWithdrawalEnforceInv(ctx, cfg) } +// constructLargeWithdrawal ... Constructs a large withdrawal heuristic instance +func constructLargeWithdrawal(ctx context.Context, isp *core.SessionParams) (heuristic.Heuristic, error) { + cfg := &LargeWithdrawalCfg{} + err := cfg.Unmarshal(isp) + + if err != nil { + return nil, err + } + + return NewLargeWithdrawHeuristic(ctx, cfg) +} + // ValidateEventTracking ... Ensures that an address and nested args exist in the session params func ValidateEventTracking(cfg *core.SessionParams) error { err := ValidateAddressing(cfg) diff --git a/internal/etl/registry/pipe/event_log.go b/internal/etl/registry/pipe/event_log.go index 035fcbe2..b1cba92b 100644 --- a/internal/etl/registry/pipe/event_log.go +++ b/internal/etl/registry/pipe/event_log.go @@ -9,6 +9,7 @@ import ( "github.com/base-org/pessimism/internal/core" "github.com/base-org/pessimism/internal/etl/component" "github.com/base-org/pessimism/internal/logging" + "github.com/base-org/pessimism/internal/metrics" "github.com/base-org/pessimism/internal/state" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" @@ -155,11 +156,19 @@ func (ed *EventDefinition) Transform(ctx context.Context, td core.TransitData) ( // and add a data input to the DLQ if it fails for reprocessing next block tds2, err := ed.transformFunc(ctx, td) if err != nil { - err2 := ed.dlq.Add(&td) - if err2 != nil { - logging.WithContext(ctx).Error(err2.Error()) + if ed.dlq.Full() { + // NOTE ... If the DLQ is full, then we pop the oldest entry + // to make room for the new entry + lostVal, _ := ed.dlq.Pop() + logger.Warn("DLQ is full, popping oldest entry", + zap.String(logging.PUUIDKey, ed.pUUID.String()), + zap.Any("lost_value", lostVal)) + + metrics.WithContext(ctx). + IncMissedBlock(ed.pUUID) } + _ = ed.dlq.Add(&td) logging.WithContext(ctx).Error("Failed to process block data", zap.Int("dlq_size", ed.dlq.Size())) diff --git a/internal/metrics/factory.go b/internal/metrics/factory.go index 869c7c56..3641536d 100644 --- a/internal/metrics/factory.go +++ b/internal/metrics/factory.go @@ -12,6 +12,7 @@ const ( CounterType = "counter" ) +// Factory ... Interface for creating prometheus metrics type Factory interface { NewCounter(opts prometheus.CounterOpts) prometheus.Counter NewCounterVec(opts prometheus.CounterOpts, labelNames []string) *prometheus.CounterVec @@ -24,6 +25,7 @@ type Factory interface { Document() []DocumentedMetric } +// DocumentedMetric ... Struct for documenting metrics type DocumentedMetric struct { Type string `json:"type"` Name string `json:"name"` @@ -31,6 +33,7 @@ type DocumentedMetric struct { Labels []string `json:"labels"` } +// documentor ... Factory implementation that documents metrics type documentor struct { metrics []DocumentedMetric factory promauto.Factory diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index d1b663d0..611ec7ed 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -20,8 +20,10 @@ const ( SubsystemEtl = "etl" ) +// serverShutdownTimeout ... Timeout for shutting down the metrics server const serverShutdownTimeout = 10 * time.Second +// Config ... Metrics server configuration type Config struct { Host string Port int @@ -29,7 +31,9 @@ type Config struct { ReadHeaderTimeout int } +// Metricer ... Interface for metrics type Metricer interface { + IncMissedBlock(pUUID core.PUUID) IncActiveHeuristics(ht core.HeuristicType, network core.Network, pipelineType core.PipelineType) IncActivePipelines(pipelineType core.PipelineType, network core.Network) DecActivePipelines(pipelineType core.PipelineType, network core.Network) @@ -46,6 +50,7 @@ type Metricer interface { Document() []DocumentedMetric } +// Metrics ... Metrics struct type Metrics struct { Up prometheus.Gauge ActivePipelines *prometheus.GaugeVec @@ -53,6 +58,7 @@ type Metrics struct { HeuristicRuns *prometheus.CounterVec AlertsGenerated *prometheus.CounterVec NodeErrors *prometheus.CounterVec + MissedBlocks *prometheus.CounterVec BlockLatency *prometheus.GaugeVec PipelineLatency *prometheus.GaugeVec InvExecutionTime *prometheus.GaugeVec @@ -145,6 +151,11 @@ func New(ctx context.Context, cfg *Config) (Metricer, func(), error) { Help: "Number of errors generated by heuristic executions", Namespace: metricsNamespace, }, []string{"heuristic"}), + MissedBlocks: factory.NewCounterVec(prometheus.CounterOpts{ + Name: "missed_blocks_total", + Help: "Number of missed blocks", + Namespace: metricsNamespace, + }, []string{"puuid"}), registry: registry, factory: factory, @@ -182,6 +193,11 @@ func (m *Metrics) RecordInvExecutionTime(h heuristic.Heuristic, latency float64) m.InvExecutionTime.WithLabelValues(ht).Set(latency) } +// IncMissedBlock ... Increments the number of missed blocks +func (m *Metrics) IncMissedBlock(pUUID core.PUUID) { + m.MissedBlocks.WithLabelValues(pUUID.String()).Inc() +} + // IncActiveHeuristics ... Increments the number of active heuristics func (m *Metrics) IncActiveHeuristics(ht core.HeuristicType, n core.Network, pipelineType core.PipelineType) { @@ -243,7 +259,8 @@ type noopMetricer struct{} var NoopMetrics Metricer = new(noopMetricer) -func (n *noopMetricer) RecordUp() {} +func (n *noopMetricer) IncMissedBlock(_ core.PUUID) {} +func (n *noopMetricer) RecordUp() {} func (n *noopMetricer) IncActiveHeuristics(_ core.HeuristicType, _ core.Network, _ core.PipelineType) { } func (n *noopMetricer) RecordInvExecutionTime(_ heuristic.Heuristic, _ float64) {}