Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Large Withdrawal Heuristic PoC #146

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/common/dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,8 @@ func (d *DLQ[E]) Empty() bool {
func (d *DLQ[E]) Size() int {
return len(d.dlq)
}

// Full ... Checks if the DLQ is full
func (d *DLQ[E]) Full() bool {
return len(d.dlq) >= d.size
}
7 changes: 7 additions & 0 deletions internal/core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ const (
ContractEvent
WithdrawalEnforcement
FaultDetector
LargeWithdrawal
)

// String ... Converts a heuristic type to a string
Expand All @@ -108,6 +109,9 @@ func (it HeuristicType) String() string {
case FaultDetector:
return "fault_detector"

case LargeWithdrawal:
return "large_withdrawal"

default:
return "unknown"
}
Expand All @@ -128,6 +132,9 @@ func StringToHeuristicType(stringType string) HeuristicType {
case "fault_detector":
return FaultDetector

case "large_withdrawal":
return LargeWithdrawal

default:
return HeuristicType(0)
}
Expand Down
21 changes: 17 additions & 4 deletions internal/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"github.com/base-org/pessimism/internal/engine/heuristic"
"github.com/base-org/pessimism/internal/logging"
"github.com/base-org/pessimism/internal/metrics"
"github.com/ethereum-optimism/optimism/op-service/retry"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -95,11 +96,23 @@
logger.Debug("Heuristic input received",
zap.String(logging.SUUIDKey, execInput.h.SUUID().String()))

// (1) Execute heuristic
// (1) Execute heuristic with retry strategy
start := time.Now()
outcome, activated := hce.Execute(ctx, execInput.hi.Input, execInput.h)
metrics.WithContext(ctx).RecordHeuristicRun(execInput.h)
metrics.WithContext(ctx).RecordInvExecutionTime(execInput.h, float64(time.Since(start).Nanoseconds()))

var outcome *core.Activation
var activated bool

retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}

Check failure on line 105 in internal/engine/engine.go

View workflow job for this annotation

GitHub Actions / lint

mnd: Magic number: 1000, in <assign> detected (gomnd)
if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {

Check failure on line 106 in internal/engine/engine.go

View workflow job for this annotation

GitHub Actions / lint

mnd: Magic number: 10, in <argument> detected (gomnd)
outcome, activated = hce.Execute(ctx, execInput.hi.Input, execInput.h)
metrics.WithContext(ctx).RecordHeuristicRun(execInput.h)
metrics.WithContext(ctx).RecordInvExecutionTime(execInput.h, float64(time.Since(start).Nanoseconds()))
// a-ok!
return nil, nil

Check failure on line 111 in internal/engine/engine.go

View workflow job for this annotation

GitHub Actions / lint

return both the `nil` error and invalid value: use a sentinel error instead (nilnil)
}); err != nil {
logger.Error("Failed to execute heuristic", zap.Error(err))
metrics.WithContext(ctx).RecordAssessmentError(execInput.h)
}

// (2) Send alert if activated
if activated {
Expand Down
9 changes: 9 additions & 0 deletions internal/engine/registry/constants.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package registry

import (
"github.com/ethereum/go-ethereum/crypto"
)

const (
// Error constant strings
invalidAddrErr = "invalid address provided for heuristic. expected %s, got %s"
Expand All @@ -10,4 +14,9 @@ const (
// Event declaration strings
OutputProposedEvent = "OutputProposed(bytes32,uint256,uint256,uint256)"
WithdrawalProvenEvent = "WithdrawalProven(bytes32,address,address)"
MessagePassed = "MessagePassed(uint256,address,address,uint256,uint256,bytes,bytes32)"
)

var (
MessagePassedSig = crypto.Keccak256Hash([]byte(MessagePassed))
)
133 changes: 133 additions & 0 deletions internal/engine/registry/large_withdrawal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package registry

import (
"context"
"encoding/json"
"fmt"
"math/big"
"strconv"
"time"

"github.com/base-org/pessimism/internal/client"
"github.com/base-org/pessimism/internal/core"
"github.com/base-org/pessimism/internal/engine/heuristic"
"github.com/base-org/pessimism/internal/logging"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"

"go.uber.org/zap"
)

const largeWithdrawalMsg = `
Large withdrawal has been proven on L1
L1PortalAddress: %s
L2ToL1Address: %s

Session UUID: %s
L1 Proving Transaction Hash: %s
L2 Initialization Transaction Hash: %s
Withdrawal Size: %d
`

// LargeWithdrawalCfg ... Configuration for the balance heuristic
type LargeWithdrawalCfg struct {
Threshold *big.Int `json:"threshold"`

L1PortalAddress string `json:"l1_portal_address"`
L2ToL1Address string `json:"l2_to_l1_address"`
}

// LargeWithdrawHeuristic ... LargeWithdrawal heuristic implementation
type LargeWithdrawHeuristic struct {
eventHash common.Hash
cfg *LargeWithdrawalCfg
indexerClient client.IndexerClient
l1PortalFilter *bindings.OptimismPortalFilterer

heuristic.Heuristic
}

// Unmarshal ... Converts a general config to a LargeWithdrawal heuristic config
func (cfg *LargeWithdrawalCfg) Unmarshal(isp *core.SessionParams) error {
return json.Unmarshal(isp.Bytes(), &cfg)
}

// NewLargeWithdrawHeuristic ... Initializer
func NewLargeWithdrawHeuristic(ctx context.Context, cfg *LargeWithdrawalCfg) (heuristic.Heuristic, error) {
clients, err := client.FromContext(ctx)
if err != nil {
return nil, err
}

withdrawalHash := crypto.Keccak256Hash([]byte(WithdrawalProvenEvent))
portalAddr := common.HexToAddress(cfg.L1PortalAddress)

filter, err := bindings.NewOptimismPortalFilterer(portalAddr, clients.L1Client)
if err != nil {
return nil, err
}

return &LargeWithdrawHeuristic{
cfg: cfg,

eventHash: withdrawalHash,
l1PortalFilter: filter,
indexerClient: clients.IndexerClient,

Heuristic: heuristic.NewBaseHeuristic(core.EventLog),
}, nil
}

// Assess ... Verifies than an L1 WithdrawalProven has a correlating hash
// to the withdrawal storage of the L2ToL1MessagePasser
func (wi *LargeWithdrawHeuristic) Assess(td core.TransitData) (*core.Activation, bool, error) {
logging.NoContext().Debug("Checking activation for withdrawal enforcement heuristic",
zap.String("data", fmt.Sprintf("%v", td)))

// 1. Validate and extract data input
if td.Type != wi.InputType() {
return nil, false, fmt.Errorf("invalid type supplied")
}

if td.Address.String() != wi.cfg.L1PortalAddress {
return nil, false, fmt.Errorf(invalidAddrErr, td.Address.String(), wi.cfg.L1PortalAddress)
}

log, success := td.Value.(types.Log)
if !success {
return nil, false, fmt.Errorf(couldNotCastErr, "types.Log")
}

// 2. Parse the log to a WithdrawalProven structured type
provenWithdrawal, err := wi.l1PortalFilter.ParseWithdrawalProven(log)
if err != nil {
return nil, false, err
}

// 3. Get withdrawal metadata from OP Indexer API
withdrawals, err := wi.indexerClient.GetAllWithdrawalsByAddress(provenWithdrawal.From)
if err != nil {
return nil, false, err
}

for _, withdrawal := range withdrawals {
asInt, err := strconv.Atoi(withdrawal.Amount)
if err != nil {
return nil, false, err
}

if asInt > int(wi.cfg.Threshold.Int64()) {
return &core.Activation{
TimeStamp: time.Now(),
Message: fmt.Sprintf(largeWithdrawalMsg,

Check failure on line 124 in internal/engine/registry/large_withdrawal.go

View workflow job for this annotation

GitHub Actions / lint

SA5009: Printf format %d has arg #6 of wrong type string (staticcheck)

Check failure on line 124 in internal/engine/registry/large_withdrawal.go

View workflow job for this annotation

GitHub Actions / go-test

fmt.Sprintf format %d has arg withdrawal.Amount of wrong type string
wi.cfg.L1PortalAddress, wi.cfg.L2ToL1Address,
wi.SUUID(), log.TxHash.Hex(), withdrawal.TransactionHash,
withdrawal.Amount),
}, true, nil
}
}

return nil, false, nil
}
18 changes: 18 additions & 0 deletions internal/engine/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ func NewHeuristicTable() HeuristicTable {
InputType: core.EventLog,
Constructor: constructWithdrawalEnforce,
},
core.LargeWithdrawal: {
PrepareValidate: WithdrawEnforcePrepare,
Policy: core.OnlyLayer1,
InputType: core.EventLog,
Constructor: constructLargeWithdrawal,
},
}

return tbl
Expand Down Expand Up @@ -101,6 +107,18 @@ func constructWithdrawalEnforce(ctx context.Context, isp *core.SessionParams) (h
return NewWithdrawalEnforceInv(ctx, cfg)
}

// constructLargeWithdrawal ... Constructs a large withdrawal heuristic instance
func constructLargeWithdrawal(ctx context.Context, isp *core.SessionParams) (heuristic.Heuristic, error) {
cfg := &LargeWithdrawalCfg{}
err := cfg.Unmarshal(isp)

if err != nil {
return nil, err
}

return NewLargeWithdrawHeuristic(ctx, cfg)
}

// ValidateEventTracking ... Ensures that an address and nested args exist in the session params
func ValidateEventTracking(cfg *core.SessionParams) error {
err := ValidateAddressing(cfg)
Expand Down
15 changes: 12 additions & 3 deletions internal/etl/registry/pipe/event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/base-org/pessimism/internal/core"
"github.com/base-org/pessimism/internal/etl/component"
"github.com/base-org/pessimism/internal/logging"
"github.com/base-org/pessimism/internal/metrics"
"github.com/base-org/pessimism/internal/state"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -155,11 +156,19 @@ func (ed *EventDefinition) Transform(ctx context.Context, td core.TransitData) (
// and add a data input to the DLQ if it fails for reprocessing next block
tds2, err := ed.transformFunc(ctx, td)
if err != nil {
err2 := ed.dlq.Add(&td)
if err2 != nil {
logging.WithContext(ctx).Error(err2.Error())
if ed.dlq.Full() {
// NOTE ... If the DLQ is full, then we pop the oldest entry
// to make room for the new entry
lostVal, _ := ed.dlq.Pop()
logger.Warn("DLQ is full, popping oldest entry",
zap.String(logging.PUUIDKey, ed.pUUID.String()),
zap.Any("lost_value", lostVal))

metrics.WithContext(ctx).
IncMissedBlock(ed.pUUID)
}

_ = ed.dlq.Add(&td)
logging.WithContext(ctx).Error("Failed to process block data",
zap.Int("dlq_size", ed.dlq.Size()))

Expand Down
3 changes: 3 additions & 0 deletions internal/metrics/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
CounterType = "counter"
)

// Factory ... Interface for creating prometheus metrics
type Factory interface {
NewCounter(opts prometheus.CounterOpts) prometheus.Counter
NewCounterVec(opts prometheus.CounterOpts, labelNames []string) *prometheus.CounterVec
Expand All @@ -24,13 +25,15 @@ type Factory interface {
Document() []DocumentedMetric
}

// DocumentedMetric ... Struct for documenting metrics
type DocumentedMetric struct {
Type string `json:"type"`
Name string `json:"name"`
Help string `json:"help"`
Labels []string `json:"labels"`
}

// documentor ... Factory implementation that documents metrics
type documentor struct {
metrics []DocumentedMetric
factory promauto.Factory
Expand Down
19 changes: 18 additions & 1 deletion internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ const (
SubsystemEtl = "etl"
)

// serverShutdownTimeout ... Timeout for shutting down the metrics server
const serverShutdownTimeout = 10 * time.Second

// Config ... Metrics server configuration
type Config struct {
Host string
Port int
Enabled bool
ReadHeaderTimeout int
}

// Metricer ... Interface for metrics
type Metricer interface {
IncMissedBlock(pUUID core.PUUID)
IncActiveHeuristics(ht core.HeuristicType, network core.Network, pipelineType core.PipelineType)
IncActivePipelines(pipelineType core.PipelineType, network core.Network)
DecActivePipelines(pipelineType core.PipelineType, network core.Network)
Expand All @@ -46,13 +50,15 @@ type Metricer interface {
Document() []DocumentedMetric
}

// Metrics ... Metrics struct
type Metrics struct {
Up prometheus.Gauge
ActivePipelines *prometheus.GaugeVec
ActiveHeuristics *prometheus.GaugeVec
HeuristicRuns *prometheus.CounterVec
AlertsGenerated *prometheus.CounterVec
NodeErrors *prometheus.CounterVec
MissedBlocks *prometheus.CounterVec
BlockLatency *prometheus.GaugeVec
PipelineLatency *prometheus.GaugeVec
InvExecutionTime *prometheus.GaugeVec
Expand Down Expand Up @@ -145,6 +151,11 @@ func New(ctx context.Context, cfg *Config) (Metricer, func(), error) {
Help: "Number of errors generated by heuristic executions",
Namespace: metricsNamespace,
}, []string{"heuristic"}),
MissedBlocks: factory.NewCounterVec(prometheus.CounterOpts{
Name: "missed_blocks_total",
Help: "Number of missed blocks",
Namespace: metricsNamespace,
}, []string{"puuid"}),

registry: registry,
factory: factory,
Expand Down Expand Up @@ -182,6 +193,11 @@ func (m *Metrics) RecordInvExecutionTime(h heuristic.Heuristic, latency float64)
m.InvExecutionTime.WithLabelValues(ht).Set(latency)
}

// IncMissedBlock ... Increments the number of missed blocks
func (m *Metrics) IncMissedBlock(pUUID core.PUUID) {
m.MissedBlocks.WithLabelValues(pUUID.String()).Inc()
}

// IncActiveHeuristics ... Increments the number of active heuristics
func (m *Metrics) IncActiveHeuristics(ht core.HeuristicType, n core.Network,
pipelineType core.PipelineType) {
Expand Down Expand Up @@ -243,7 +259,8 @@ type noopMetricer struct{}

var NoopMetrics Metricer = new(noopMetricer)

func (n *noopMetricer) RecordUp() {}
func (n *noopMetricer) IncMissedBlock(_ core.PUUID) {}
func (n *noopMetricer) RecordUp() {}
func (n *noopMetricer) IncActiveHeuristics(_ core.HeuristicType, _ core.Network, _ core.PipelineType) {
}
func (n *noopMetricer) RecordInvExecutionTime(_ heuristic.Heuristic, _ float64) {}
Expand Down
Loading