Skip to content

Commit

Permalink
sync recent miner/ccc changes (#1062)
Browse files Browse the repository at this point in the history
* refactor: eliminate double re-execution in AsyncChecker (#1036)

* fix: make reorg mode explicit (#1049)

* fix: avoid committing empty blocks after the deadline (#1051)

* fix: initialize pending block with an empty block (#1052)

* fix: reuse timestamp for blocks failing CCC (#1031)

we assume that same height wont trigger multiple reorgs to be able
to put an upper bound on the reorg depth. We rely on the fact that
AsyncChecker executes transactions one-by-one and tells worker the
safe set of transactions to include in the replacement block that
wont trigger another error on the same block. If worker changes the
timestamp and that causes significant changes to the execution flow
of included transactions; we might have a height where multiple
reorgs happen.

---------

Co-authored-by: Ömer Faruk Irmak <[email protected]>
  • Loading branch information
0xmountaintop and omerfirmak authored Oct 10, 2024
1 parent 01eefe5 commit 3d88e87
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 49 deletions.
51 changes: 33 additions & 18 deletions miner/scroll_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type work struct {
cccLogger *ccc.Logger
vmConfig vm.Config

reorging bool
reorgReason error

// accumulated state
Expand Down Expand Up @@ -287,7 +288,7 @@ func (w *worker) mainLoop() {
var retryableCommitError *retryableCommitError
if errors.As(err, &retryableCommitError) {
log.Warn("failed to commit to a block, retrying", "err", err)
if _, err = w.tryCommitNewWork(time.Now(), w.current.header.ParentHash, w.current.reorgReason); err != nil {
if _, err = w.tryCommitNewWork(time.Now(), w.current.header.ParentHash, w.current.reorging, w.current.reorgReason); err != nil {
continue
}
} else if err != nil {
Expand All @@ -305,21 +306,20 @@ func (w *worker) mainLoop() {
return
}
}

_, err = w.tryCommitNewWork(time.Now(), w.chain.CurrentHeader().Hash(), nil)
_, err = w.tryCommitNewWork(time.Now(), w.chain.CurrentHeader().Hash(), false, nil)
case trigger := <-w.reorgCh:
idleTimer.UpdateSince(idleStart)
err = w.handleReorg(&trigger)
case chainHead := <-w.chainHeadCh:
idleTimer.UpdateSince(idleStart)
if w.isCanonical(chainHead.Block.Header()) {
_, err = w.tryCommitNewWork(time.Now(), chainHead.Block.Hash(), nil)
_, err = w.tryCommitNewWork(time.Now(), chainHead.Block.Hash(), false, nil)
}
case <-w.current.deadlineCh():
idleTimer.UpdateSince(idleStart)
w.current.deadlineReached = true
if len(w.current.txs) > 0 {
_, err = w.commit(false)
_, err = w.commit()
}
case ev := <-w.txsCh:
idleTimer.UpdateSince(idleStart)
Expand All @@ -330,8 +330,8 @@ func (w *worker) mainLoop() {
// be automatically eliminated.
if w.current != nil {
shouldCommit, _ := w.processTxnSlice(ev.Txs)
if shouldCommit || w.current.deadlineReached {
_, err = w.commit(false)
if shouldCommit || (w.current.deadlineReached && len(w.current.txs) > 0) {
_, err = w.commit()
}
}
w.newTxs.Add(int32(len(ev.Txs)))
Expand Down Expand Up @@ -370,7 +370,7 @@ func (w *worker) collectPendingL1Messages(startIndex uint64) []types.L1MessageTx
}

// newWork
func (w *worker) newWork(now time.Time, parentHash common.Hash, reorgReason error) error {
func (w *worker) newWork(now time.Time, parentHash common.Hash, reorging bool, reorgReason error) error {
parent := w.chain.GetBlockByHash(parentHash)
header := &types.Header{
ParentHash: parent.Hash(),
Expand All @@ -380,6 +380,13 @@ func (w *worker) newWork(now time.Time, parentHash common.Hash, reorgReason erro
Time: uint64(now.Unix()),
}

if reorgReason != nil {
// if we are replacing a failing block, reuse the timestamp to make sure
// the information we get from AsyncChecker is reliable. Changing timestamp
// might alter execution flow of reorged transactions.
header.Time = w.chain.GetHeaderByNumber(header.Number.Uint64()).Time
}

parentState, err := w.chain.StateAt(parent.Root())
if err != nil {
return fmt.Errorf("failed to fetch parent state: %w", err)
Expand Down Expand Up @@ -431,14 +438,19 @@ func (w *worker) newWork(now time.Time, parentHash common.Hash, reorgReason erro
coalescedLogs: []*types.Log{},
gasPool: new(core.GasPool).AddGas(header.GasLimit),
nextL1MsgIndex: nextL1MsgIndex,
reorging: reorging,
reorgReason: reorgReason,
}

// initiliaze pending block with an empty block to make sure we always have
// a pending block to serve RPC requests
w.updateSnapshot()
return nil
}

// tryCommitNewWork
func (w *worker) tryCommitNewWork(now time.Time, parent common.Hash, reorgReason error) (common.Hash, error) {
err := w.newWork(now, parent, reorgReason)
func (w *worker) tryCommitNewWork(now time.Time, parent common.Hash, reorging bool, reorgReason error) (common.Hash, error) {
err := w.newWork(now, parent, reorging, reorgReason)
if err != nil {
return common.Hash{}, fmt.Errorf("failed creating new work: %w", err)
}
Expand All @@ -449,8 +461,7 @@ func (w *worker) tryCommitNewWork(now time.Time, parent common.Hash, reorgReason
}

// check if we are reorging
reorging := w.chain.GetBlockByNumber(w.current.header.Number.Uint64()) != nil
if !shouldCommit && reorging {
if !shouldCommit && w.current.reorging {
shouldCommit, err = w.processReorgedTxns(w.current.reorgReason)
}
if err != nil {
Expand All @@ -468,7 +479,7 @@ func (w *worker) tryCommitNewWork(now time.Time, parent common.Hash, reorgReason
// if reorging, force committing even if we are not "running"
// this can happen when sequencer is instructed to shutdown while handling a reorg
// we should make sure reorg is not interrupted
if blockHash, err := w.commit(reorging); err != nil {
if blockHash, err := w.commit(); err != nil {
return common.Hash{}, fmt.Errorf("failed committing new work: %w", err)
} else {
return blockHash, nil
Expand Down Expand Up @@ -595,6 +606,10 @@ func (w *worker) processTxnSlice(txns types.Transactions) (bool, error) {
// processReorgedTxns
func (w *worker) processReorgedTxns(reason error) (bool, error) {
reorgedBlock := w.chain.GetBlockByNumber(w.current.header.Number.Uint64())
if reorgedBlock == nil {
return false, nil
}

commitGasCounter.Dec(int64(reorgedBlock.GasUsed()))
reorgedTxns := reorgedBlock.Transactions()
var errorWithTxnIdx *ccc.ErrorWithTxnIdx
Expand Down Expand Up @@ -729,14 +744,14 @@ func (e retryableCommitError) Unwrap() error {

// commit runs any post-transaction state modifications, assembles the final block
// and commits new work if consensus engine is running.
func (w *worker) commit(reorging bool) (common.Hash, error) {
func (w *worker) commit() (common.Hash, error) {
sealDelay := time.Duration(0)
defer func(t0 time.Time) {
l2CommitTimer.Update(time.Since(t0) - sealDelay)
}(time.Now())

w.updateSnapshot()
if !w.isRunning() && !reorging {
if !w.isRunning() && !w.current.reorging {
return common.Hash{}, nil
}

Expand Down Expand Up @@ -813,7 +828,7 @@ func (w *worker) commit(reorging bool) (common.Hash, error) {

currentHeight := w.current.header.Number.Uint64()
maxReorgDepth := uint64(w.config.CCCMaxWorkers + 1)
if !reorging && currentHeight > maxReorgDepth {
if !w.current.reorging && currentHeight > maxReorgDepth {
ancestorHeight := currentHeight - maxReorgDepth
ancestorHash := w.chain.GetHeaderByNumber(ancestorHeight).Hash()
if rawdb.ReadBlockRowConsumption(w.chain.Database(), ancestorHash) == nil {
Expand Down Expand Up @@ -980,7 +995,7 @@ func (w *worker) handleReorg(trigger *reorgTrigger) error {
return nil
}

newBlockHash, err := w.tryCommitNewWork(time.Now(), parentHash, reorgReason)
newBlockHash, err := w.tryCommitNewWork(time.Now(), parentHash, true, reorgReason)
if err != nil {
return err
}
Expand All @@ -989,7 +1004,7 @@ func (w *worker) handleReorg(trigger *reorgTrigger) error {
if newBlockHash == (common.Hash{}) {
// force committing the new canonical head to trigger a reorg in blockchain
// otherwise we might ignore CCC errors from the new side chain since it is not canonical yet
newBlockHash, err = w.commit(true)
newBlockHash, err = w.commit()
if err != nil {
return err
}
Expand Down
35 changes: 4 additions & 31 deletions rollup/ccc/async_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,6 @@ func (c *AsyncChecker) checkerTask(block *types.Block, ccc *Checker, forkCtx con
}

header := block.Header()
header.GasUsed = 0
gasPool := new(core.GasPool).AddGas(header.GasLimit)
ccc.Reset()

accRc := new(types.RowConsumption)
Expand All @@ -184,7 +182,7 @@ func (c *AsyncChecker) checkerTask(block *types.Block, ccc *Checker, forkCtx con
}

var curRc *types.RowConsumption
curRc, err = c.checkTxAndApply(parent, header, statedb, gasPool, tx, ccc)
curRc, err = c.checkTx(parent, header, statedb, tx, ccc)
if err != nil {
err = &ErrorWithTxnIdx{
TxIdx: uint(txIdx),
Expand All @@ -208,39 +206,14 @@ func (c *AsyncChecker) checkerTask(block *types.Block, ccc *Checker, forkCtx con
}
}

func (c *AsyncChecker) checkTxAndApply(parent *types.Block, header *types.Header, state *state.StateDB, gasPool *core.GasPool, tx *types.Transaction, ccc *Checker) (*types.RowConsumption, error) {
// don't commit the state during tracing for circuit capacity checker, otherwise we cannot revert.
// and even if we don't commit the state, the `refund` value will still be correct, as explained in `CommitTransaction`
commitStateAfterApply := false
snap := state.Snapshot()

// 1. we have to check circuit capacity before `core.ApplyTransaction`,
// because if the tx can be successfully executed but circuit capacity overflows, it will be inconvenient to revert.
// 2. even if we don't commit to the state during the tracing (which means `clearJournalAndRefund` is not called during the tracing),
// the `refund` value will still be correct, because:
// 2.1 when starting handling the first tx, `state.refund` is 0 by default,
// 2.2 after tracing, the state is either committed in `core.ApplyTransaction`, or reverted, so the `state.refund` can be cleared,
// 2.3 when starting handling the following txs, `state.refund` comes as 0
func (c *AsyncChecker) checkTx(parent *types.Block, header *types.Header, state *state.StateDB, tx *types.Transaction, ccc *Checker) (*types.RowConsumption, error) {
trace, err := tracing.NewTracerWrapper().CreateTraceEnvAndGetBlockTrace(c.bc.Config(), c.bc, c.bc.Engine(), c.bc.Database(),
state, parent.Header(), types.NewBlockWithHeader(header).WithBody([]*types.Transaction{tx}, nil), commitStateAfterApply)
// `w.current.traceEnv.State` & `w.current.state` share a same pointer to the state, so only need to revert `w.current.state`
// revert to snapshot for calling `core.ApplyMessage` again, (both `traceEnv.GetBlockTrace` & `core.ApplyTransaction` will call `core.ApplyMessage`)
state.RevertToSnapshot(snap)
state, parent.Header(), types.NewBlockWithHeader(header).WithBody([]*types.Transaction{tx}, nil), true)
if err != nil {
return nil, err
}

rc, err := ccc.ApplyTransaction(trace)
if err != nil {
return rc, err
}

_, err = core.ApplyTransaction(c.bc.Config(), c.bc, nil /* coinbase will default to chainConfig.Scroll.FeeVaultAddress */, gasPool,
state, header, tx, &header.GasUsed, *c.bc.GetVMConfig())
if err != nil {
return nil, err
}
return rc, nil
return ccc.ApplyTransaction(trace)
}

// ScheduleError forces a block to error on a given transaction index
Expand Down

0 comments on commit 3d88e87

Please sign in to comment.