Skip to content

Commit

Permalink
add ability and stubs to validate and process IL
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech committed Mar 17, 2024
1 parent 13f6ccc commit c515cb7
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 13 deletions.
4 changes: 4 additions & 0 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ export function getBeaconBlockApi({
const ilSummary = ssz.electra.ILSummary.defaultValue();
const ilTransactions = [ssz.electra.ILTransactions.defaultValue()];
inclusionLists = [ssz.electra.NewInclusionListRequest.defaultValue()];
inclusionLists[0].slot = slot;
inclusionLists[0].parentBlockHash = (
signedBlock as electra.SignedBeaconBlock
).message.body.executionPayload.parentHash;
blockData = {fork, blobs: blobSidecars, blobsBytes: [null], ilSummary, inclusionLists: ilTransactions};
}

Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
Epoch,
ValidatorIndex,
deneb,
electra,
Wei,
bellatrix,
isBlindedBeaconBlock,
Expand Down Expand Up @@ -630,6 +631,10 @@ export class BeaconChain implements IBeaconChain {
return this.blockProcessor.processBlocksJob(blocks, opts);
}

async processInclusionList(inclusionList: electra.NewInclusionListRequest, _opts?: ImportBlockOpts): Promise<void> {
await this.executionEngine.notifyNewInclusionList(inclusionList);
}

getStatus(): phase0.Status {
const head = this.forkChoice.getHead();
const finalizedCheckpoint = this.forkChoice.getFinalizedCheckpoint();
Expand Down
21 changes: 21 additions & 0 deletions packages/beacon-node/src/chain/errors/inclusionListError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import {Slot, RootHex, ValidatorIndex} from "@lodestar/types";
import {GossipActionError} from "./gossipValidation.js";

export enum InclusionListErrorCode {
ALREADY_KNOWN = "INCLUSION_LIST_ERROR_ALREADY_KNOWN",
PARENT_UNKNOWN = "INCLUSION_LIST_ERROR_PARENT_UNKNOWN",
NOT_LATER_THAN_PARENT = "INCLUSION_LIST_ERROR_NOT_LATER_THAN_PARENT",
PROPOSAL_SIGNATURE_INVALID = "INCLUSION_LIST_ERROR_PROPOSAL_SIGNATURE_INVALID",
INCORRECT_PROPOSER = "INCLUSION_LIST_INCORRECT_PROPOSER",
EXECUTION_ENGINE_ERROR = "INCLUSION_LIST_EXECUTION_ENGINE_ERROR",
}

export type InclusionListErrorType =
| {code: InclusionListErrorCode.ALREADY_KNOWN; root: RootHex}
| {code: InclusionListErrorCode.PARENT_UNKNOWN; parentRoot: RootHex}
| {code: InclusionListErrorCode.NOT_LATER_THAN_PARENT; parentSlot: Slot; slot: Slot}
| {code: InclusionListErrorCode.PROPOSAL_SIGNATURE_INVALID}
| {code: InclusionListErrorCode.INCORRECT_PROPOSER; proposerIndex: ValidatorIndex}
| {code: InclusionListErrorCode.EXECUTION_ENGINE_ERROR};

export class InclusionListGossipError extends GossipActionError<InclusionListErrorType> {}
1 change: 1 addition & 0 deletions packages/beacon-node/src/chain/errors/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export * from "./attestationError.js";
export * from "./attesterSlashingError.js";
export * from "./blobSidecarError.js";
export * from "./inclusionListError.js";
export * from "./blockError.js";
export * from "./gossipValidation.js";
export * from "./proposerSlashingError.js";
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
Wei,
capella,
altair,
electra,
} from "@lodestar/types";
import {
BeaconStateAllForks,
Expand Down Expand Up @@ -176,6 +177,7 @@ export interface IBeaconChain {
processBlock(block: BlockInput, opts?: ImportBlockOpts): Promise<void>;
/** Process a chain of blocks until complete */
processChainSegment(blocks: BlockInput[], opts?: ImportBlockOpts): Promise<void>;
processInclusionList(inclusionList: electra.NewInclusionListRequest, opts?: ImportBlockOpts): Promise<void>;

getStatus(): phase0.Status;

Expand Down
25 changes: 20 additions & 5 deletions packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type GossipedBlockInput =
};

type BlockInputCacheType = {
fork: ForkName;
block?: allForks.SignedBeaconBlock;
blockBytes?: Uint8Array | null;
cachedData?: CachedData;
Expand All @@ -55,9 +56,13 @@ const MAX_GOSSIPINPUT_CACHE = 5;
*/
export class SeenGossipBlockInput {
private blockInputCache = new Map<RootHex, BlockInputCacheType>();
private seenILsByParentHash = new Map<RootHex, electra.NewInclusionListRequest>();
private blockInputRootByParentHash = new Map<RootHex, RootHex>();

prune(): void {
pruneSetToMax(this.blockInputCache, MAX_GOSSIPINPUT_CACHE);
pruneSetToMax(this.seenILsByParentHash, MAX_GOSSIPINPUT_CACHE);
pruneSetToMax(this.blockInputRootByParentHash, MAX_GOSSIPINPUT_CACHE);
}

hasBlock(blockRoot: RootHex): boolean {
Expand All @@ -76,13 +81,23 @@ export class SeenGossipBlockInput {
| {
blockInput: NullBlockInput;
blockInputMeta: {pending: GossipedInputType.block; haveBlobs: number; expectedBlobs: null};
} {
}
| null {
let blockHex;
let blockCache;
let fork;

if (gossipedInput.type === GossipedInputType.ilist) {
throw Error("Inclusion list gossip handling not implemented");
const {inclusionList} = gossipedInput;
const parentBlockHashHex = toHexString(inclusionList.parentBlockHash);
this.seenILsByParentHash.set(parentBlockHashHex, inclusionList);

blockHex = this.blockInputRootByParentHash.get(parentBlockHashHex);
blockCache = blockHex ? this.blockInputCache.get(blockHex) : undefined;
if (blockHex === undefined || blockCache === undefined) {
return null;
}
fork = blockCache.fork;
} else if (gossipedInput.type === GossipedInputType.block) {
const {signedBlock, blockBytes} = gossipedInput;
fork = config.getForkName(signedBlock.message.slot);
Expand Down Expand Up @@ -214,7 +229,7 @@ function getEmptyBlockInputCacheEntry(fork: ForkName): BlockInputCacheType {
}

if (!isForkBlobs(fork)) {
return {blockInputPromise, resolveBlockInput};
return {fork, blockInputPromise, resolveBlockInput};
}
const blobsCache = new Map();

Expand All @@ -229,7 +244,7 @@ function getEmptyBlockInputCacheEntry(fork: ForkName): BlockInputCacheType {
throw Error("Promise Constructor was not executed immediately");
}
const cachedData: CachedData = {fork, blobsCache, availabilityPromise, resolveAvailability};
return {blockInputPromise, resolveBlockInput, cachedData};
return {fork, blockInputPromise, resolveBlockInput, cachedData};
} else {
// il availability (with blobs)
let resolveAvailability: ((blobs: BlockInputDataIls) => void) | null = null;
Expand All @@ -241,6 +256,6 @@ function getEmptyBlockInputCacheEntry(fork: ForkName): BlockInputCacheType {
throw Error("Promise Constructor was not executed immediately");
}
const cachedData: CachedData = {fork, blobsCache, availabilityPromise, resolveAvailability, inclusionLists: []};
return {blockInputPromise, resolveBlockInput, cachedData};
return {fork, blockInputPromise, resolveBlockInput, cachedData};
}
}
4 changes: 4 additions & 0 deletions packages/beacon-node/src/execution/engine/disabled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ export class ExecutionEngineDisabled implements IExecutionEngine {
throw Error("Execution engine disabled");
}

async notifyNewInclusionList(): Promise<never> {
throw Error("Execution engine disabled");
}

async notifyForkchoiceUpdate(): Promise<never> {
throw Error("Execution engine disabled");
}
Expand Down
9 changes: 8 additions & 1 deletion packages/beacon-node/src/execution/engine/http.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {Root, RootHex, allForks, Wei} from "@lodestar/types";
import {toHexString} from "@chainsafe/ssz";
import {Root, RootHex, allForks, Wei, electra} from "@lodestar/types";
import {SLOTS_PER_EPOCH, ForkName, ForkSeq} from "@lodestar/params";
import {Logger} from "@lodestar/logger";
import {
Expand Down Expand Up @@ -144,6 +145,12 @@ export class ExecutionEngineHttp implements IExecutionEngine {
});
}

async notifyNewInclusionList(inclusionList: electra.NewInclusionListRequest): Promise<ExecutePayloadResponse> {
// return dummy status, don't use latestValidHash anyway
const latestValidHash = toHexString(inclusionList.parentBlockHash);
return {status: ExecutionPayloadStatus.VALID, latestValidHash, validationError: null};
}

/**
* `engine_newPayloadV1`
* From: https://github.com/ethereum/execution-apis/blob/v1.0.0-alpha.6/src/engine/specification.md#engine_newpayloadv1
Expand Down
3 changes: 2 additions & 1 deletion packages/beacon-node/src/execution/engine/interface.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {ForkName} from "@lodestar/params";
import {KZGCommitment, Blob, KZGProof} from "@lodestar/types/deneb";
import {Root, RootHex, allForks, capella, Wei} from "@lodestar/types";
import {Root, RootHex, allForks, capella, electra, Wei} from "@lodestar/types";

import {DATA} from "../../eth1/provider/utils.js";
import {PayloadIdCache, PayloadId, WithdrawalV1} from "./payloadIdCache.js";
Expand Down Expand Up @@ -90,6 +90,7 @@ export type VersionedHashes = Uint8Array[];
*/
export interface IExecutionEngine {
payloadIdCache: PayloadIdCache;
notifyNewInclusionList(InclusionList: electra.NewInclusionListRequest): Promise<ExecutePayloadResponse>;
/**
* A state transition function which applies changes to the self.execution_state.
* Returns ``True`` iff ``execution_payload`` is valid with respect to ``self.execution_state``.
Expand Down
19 changes: 18 additions & 1 deletion packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {EpochTransitionStep, StateCloneSource, StateHashTreeRootSource} from "@l
import {allForks} from "@lodestar/types";
import {BlockSource} from "../../chain/blocks/types.js";
import {JobQueueItemType} from "../../chain/bls/index.js";
import {BlockErrorCode} from "../../chain/errors/index.js";
import {BlockErrorCode, InclusionListErrorCode} from "../../chain/errors/index.js";
import {InsertOutcome} from "../../chain/opPools/types.js";
import {RegenCaller, RegenFnName} from "../../chain/regen/interface.js";
import {ReprocessStatus} from "../../chain/reprocess.js";
Expand Down Expand Up @@ -777,6 +777,23 @@ export function createLodestarMetrics(
buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4],
}),
},
gossipInclusionList: {
recvToValidation: register.histogram({
name: "lodestar_gossip_inclusion_list_received_to_gossip_validate",
help: "Time elapsed between inclusion list received and validation",
buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4],
}),
validationTime: register.histogram({
name: "lodestar_gossip_inclusion_list_gossip_validate_time",
help: "Time elapsed for inclusion list validation",
buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4],
}),
processInclusionListErrors: register.gauge<{error: InclusionListErrorCode | "NOT_INCLUSION_LIST_ERROR"}>({
name: "lodestar_gossip_inclusion_list_process_errors",
help: "Count of errors, by error type, while processing inclusionlist",
labelNames: ["error"],
}),
},
importBlock: {
persistBlockNoSerializedDataCount: register.gauge({
name: "lodestar_import_block_persist_block_no_serialized_data_count",
Expand Down
83 changes: 78 additions & 5 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import {
BlockGossipError,
BlobSidecarErrorCode,
BlobSidecarGossipError,
InclusionListGossipError,
InclusionListErrorCode,
GossipAction,
GossipActionError,
SyncCommitteeError,
Expand Down Expand Up @@ -136,6 +138,10 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
},
metrics
);
if (blockInputRes === null) {
throw Error("Invalid blockInputRes=null returned for gossip block in cache processing");
}

const blockInput = blockInputRes.blockInput;
// blockInput can't be returned null, improve by enforcing via return types
if (blockInput.block === null) {
Expand Down Expand Up @@ -200,7 +206,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
const delaySec = chain.clock.secFromSlot(slot, seenTimestampSec);
const recvToValLatency = Date.now() / 1000 - seenTimestampSec;

const {blockInput, blockInputMeta} = chain.seenGossipBlockInput.getGossipBlockInput(
const blockInputRes = chain.seenGossipBlockInput.getGossipBlockInput(
config,
{
type: GossipedInputType.blob,
Expand All @@ -209,7 +215,11 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
},
metrics
);
if (blockInputRes === null) {
throw Error("Invalid blockInputRes=null returned for gossip blob in cache processing");
}

const {blockInput, blockInputMeta} = blockInputRes;
try {
await validateGossipBlobSidecar(chain, blobSidecar, gossipIndex);
const recvToValidation = Date.now() / 1000 - seenTimestampSec;
Expand Down Expand Up @@ -258,12 +268,12 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
inclusionListBytes: Uint8Array,
peerIdStr: string,
seenTimestampSec: number
): Promise<BlockInput | NullBlockInput> {
): Promise<BlockInput | NullBlockInput | null> {
const slot = inclusionList.slot;
const delaySec = chain.clock.secFromSlot(slot, seenTimestampSec);
const recvToValLatency = Date.now() / 1000 - seenTimestampSec;

const {blockInput, blockInputMeta} = chain.seenGossipBlockInput.getGossipBlockInput(
const blockInputRes = chain.seenGossipBlockInput.getGossipBlockInput(
config,
{
type: GossipedInputType.ilist,
Expand All @@ -273,7 +283,9 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
metrics
);

const blockInput = blockInputRes?.blockInput ?? null;
try {
const blockInputMeta = blockInputRes?.blockInputMeta ?? {};
await validateGossipInclusionList(chain, inclusionList);
const recvToValidation = Date.now() / 1000 - seenTimestampSec;
const validationTime = recvToValidation - recvToValLatency;
Expand All @@ -295,7 +307,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
} catch (e) {
if (e instanceof BlobSidecarGossipError) {
// Don't trigger this yet if full block and blobs haven't arrived yet
if (e.type.code === BlobSidecarErrorCode.PARENT_UNKNOWN && blockInput.block !== null) {
if (e.type.code === BlobSidecarErrorCode.PARENT_UNKNOWN && blockInput !== null && blockInput.block !== null) {
logger.debug("Gossip inclusion list has error", {code: e.type.code});
events.emit(NetworkEvent.unknownBlockParent, {blockInput, peer: peerIdStr});
}
Expand Down Expand Up @@ -388,6 +400,64 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
});
}

function handleValidInclusionList(
inclusionList: electra.NewInclusionListRequest,
peerIdStr: string,
seenTimestampSec: number
): void {
// Handler - MUST NOT `await`, to allow validation result to be propagated

// metrics?.registerInclusionList(OpSource.gossip, seenTimestampSec, inclusionList);
// if blobs are not yet fully available start an aggressive blob pull

chain
.processInclusionList(inclusionList, {
// block may be downloaded and processed by UnknownBlockSync
ignoreIfKnown: true,
// proposer signature already checked in validateBeaconBlock()
validProposerSignature: true,
blsVerifyOnMainThread: true,
// to track block process steps
seenTimestampSec,
})
.then(() => {
// Returns the delay between the start of `block.slot` and `current time`
const delaySec = chain.clock.secFromSlot(inclusionList.slot);
metrics?.gossipBlock.elapsedTimeTillProcessed.observe(delaySec);
chain.seenGossipBlockInput.prune();
})
.catch((e) => {
// Adjust verbosity based on error type
let logLevel: LogLevel;

if (e instanceof InclusionListGossipError) {
switch (e.type.code) {
// ALREADY_KNOWN should not happen with ignoreIfKnown=true above
// PARENT_UNKNOWN should not happen, we handled this in validateBeaconBlock() function above
case InclusionListErrorCode.ALREADY_KNOWN:
case InclusionListErrorCode.PARENT_UNKNOWN:
case InclusionListErrorCode.EXECUTION_ENGINE_ERROR:
// Errors might indicate an issue with our node or the connected EL client
logLevel = LogLevel.error;
break;
default:
// TODO: Should it use PeerId or string?
core.reportPeer(peerIdStr, PeerAction.LowToleranceError, "BadGossipInclusionList");
// Misbehaving peer, but could highlight an issue in another client
logLevel = LogLevel.warn;
}
} else {
// Any unexpected error
logLevel = LogLevel.error;
}
metrics?.gossipInclusionList.processInclusionListErrors.inc({
error: e instanceof InclusionListGossipError ? e.type.code : "NOT_INCLUSION_LIST_ERROR",
});
logger[logLevel]("Error receiving inclusion list", {slot: inclusionList.slot, peer: peerIdStr}, e as Error);
chain.seenGossipBlockInput.prune();
});
}

return {
[GossipType.beacon_block]: async ({
gossipData,
Expand Down Expand Up @@ -480,7 +550,10 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
}

const blockInput = await validateInclusionList(inclusionList, serializedData, peerIdStr, seenTimestampSec);
if (blockInput.block !== null) {
handleValidInclusionList(inclusionList, peerIdStr, seenTimestampSec);
if (blockInput === null) {
return;
} else if (blockInput.block !== null) {
// we can just queue up the blockInput in the processor, but block gossip handler would have already
// queued it up.
//
Expand Down

0 comments on commit c515cb7

Please sign in to comment.