From c7f3ec6a827ea4ecdabf37ff5d50a4bb7a7adce7 Mon Sep 17 00:00:00 2001 From: harkamal Date: Sat, 16 Mar 2024 21:44:01 +0530 Subject: [PATCH] extend blockinput for blobs and il data --- .../beacon-node/src/chain/blocks/types.ts | 64 +++++++----- .../chain/seenCache/seenGossipBlockInput.ts | 99 +++++++++++-------- 2 files changed, 96 insertions(+), 67 deletions(-) diff --git a/packages/beacon-node/src/chain/blocks/types.ts b/packages/beacon-node/src/chain/blocks/types.ts index e2c7b5a32e0a..7416331e10c1 100644 --- a/packages/beacon-node/src/chain/blocks/types.ts +++ b/packages/beacon-node/src/chain/blocks/types.ts @@ -1,7 +1,7 @@ import {CachedBeaconStateAllForks, computeEpochAtSlot, DataAvailableStatus} from "@lodestar/state-transition"; import {MaybeValidExecutionStatus} from "@lodestar/fork-choice"; -import {allForks, deneb, Slot, RootHex} from "@lodestar/types"; -import {ForkSeq} from "@lodestar/params"; +import {allForks, deneb, electra, Slot, RootHex} from "@lodestar/types"; +import {ForkSeq, ForkName, ForkPreBlobs, ForkILs} from "@lodestar/params"; import {ChainForkConfig} from "@lodestar/config"; export enum BlockInputType { @@ -23,20 +23,38 @@ export enum GossipedInputType { blob = "blob", } +type ForkBlobsInfo = {fork: ForkName.deneb}; +type ForkILsInfo = {fork: ForkILs}; + export type BlobsCache = Map; -export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]}; -type CachedBlobs = { - blobsCache: BlobsCache; - availabilityPromise: Promise; - resolveAvailability: (blobs: BlockInputBlobs) => void; -}; + +type BlobsData = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]}; +type ILsData = BlobsData & {ilSummary: electra.ILSummary; inclusionLists: electra.ILTransactions[]}; + +export type BlockInputDataBlobs = ForkBlobsInfo & BlobsData; +export type BlockInputDataIls = ForkILsInfo & ILsData; +export type BlockInputData = BlockInputDataBlobs | BlockInputDataIls; + +type BlobsInputCache = {blobsCache: BlobsCache}; +type ForkILsCache = BlobsInputCache & {ilSummary?: electra.ILSummary; inclusionLists: electra.ILTransactions[]}; + +export type BlockInputCacheBlobs = ForkBlobsInfo & BlobsInputCache; +export type BlockInputCacheILs = ForkILsInfo & ForkILsCache; +export type BlockInputCache = (ForkBlobsInfo & BlobsInputCache) | (ForkILsInfo & ForkILsCache); + +type Availability = {availabilityPromise: Promise; resolveAvailability: (data: T) => void}; +export type CachedData = + | (ForkBlobsInfo & BlobsInputCache & Availability) + | (ForkILsInfo & ForkILsCache & Availability); export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & ( | {type: BlockInputType.preDeneb} - | ({type: BlockInputType.postDeneb} & BlockInputBlobs) - | ({type: BlockInputType.blobsPromise} & CachedBlobs) + | ({type: BlockInputType.postDeneb} & {blockData: BlockInputData}) + | ({type: BlockInputType.blobsPromise} & {cachedData: CachedData}) ); -export type NullBlockInput = {block: null; blockRootHex: RootHex; blockInputPromise: Promise} & CachedBlobs; +export type NullBlockInput = {block: null; blockRootHex: RootHex; blockInputPromise: Promise} & { + cachedData: CachedData; +}; export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clockSlot: Slot): boolean { return ( @@ -67,10 +85,9 @@ export const getBlockInput = { postDeneb( config: ChainForkConfig, block: allForks.SignedBeaconBlock, - source: BlockSource, - blobs: deneb.BlobSidecars, blockBytes: Uint8Array | null, - blobsBytes: (Uint8Array | null)[] + blockData: BlockInputData, + source: BlockSource ): BlockInput { if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) { throw Error(`Pre Deneb block slot ${block.message.slot}`); @@ -78,21 +95,18 @@ export const getBlockInput = { return { type: BlockInputType.postDeneb, block, - source, - blobs, blockBytes, - blobsBytes, + blockData, + source, }; }, blobsPromise( config: ChainForkConfig, block: allForks.SignedBeaconBlock, - source: BlockSource, - blobsCache: BlobsCache, blockBytes: Uint8Array | null, - availabilityPromise: Promise, - resolveAvailability: (blobs: BlockInputBlobs) => void + cachedData: CachedData, + source: BlockSource ): BlockInput { if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) { throw Error(`Pre Deneb block slot ${block.message.slot}`); @@ -100,16 +114,14 @@ export const getBlockInput = { return { type: BlockInputType.blobsPromise, block, - source, - blobsCache, blockBytes, - availabilityPromise, - resolveAvailability, + source, + cachedData, }; }, }; -export function getBlockInputBlobs(blobsCache: BlobsCache): BlockInputBlobs { +export function getBlockInputBlobs(blobsCache: BlobsCache): BlobsData { const blobs = []; const blobsBytes = []; diff --git a/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts b/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts index c652eaad9a9b..9694aace5032 100644 --- a/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts +++ b/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts @@ -2,15 +2,15 @@ import {toHexString} from "@chainsafe/ssz"; import {deneb, RootHex, ssz, allForks} from "@lodestar/types"; import {ChainForkConfig} from "@lodestar/config"; import {pruneSetToMax} from "@lodestar/utils"; -import {BLOBSIDECAR_FIXED_SIZE, ForkSeq} from "@lodestar/params"; +import {BLOBSIDECAR_FIXED_SIZE, ForkSeq, ForkName} from "@lodestar/params"; import { BlockInput, NullBlockInput, getBlockInput, BlockSource, - BlockInputBlobs, - BlobsCache, + BlockInputDataBlobs, + CachedData, GossipedInputType, getBlockInputBlobs, } from "../blocks/types.js"; @@ -28,10 +28,7 @@ type GossipedBlockInput = type BlockInputCacheType = { block?: allForks.SignedBeaconBlock; blockBytes?: Uint8Array | null; - blobsCache: BlobsCache; - // blobs promise and its callback cached for delayed resolution - availabilityPromise: Promise; - resolveAvailability: (blobs: BlockInputBlobs) => void; + cachedData?: CachedData; // block promise and its callback cached for delayed resolution blockInputPromise: Promise; resolveBlockInput: (blockInput: BlockInput) => void; @@ -76,25 +73,29 @@ export class SeenGossipBlockInput { } { let blockHex; let blockCache; + let fork; if (gossipedInput.type === GossipedInputType.block) { const {signedBlock, blockBytes} = gossipedInput; + fork = config.getForkName(signedBlock.message.slot); blockHex = toHexString( config.getForkTypes(signedBlock.message.slot).BeaconBlock.hashTreeRoot(signedBlock.message) ); - blockCache = this.blockInputCache.get(blockHex) ?? getEmptyBlockInputCacheEntry(); + blockCache = this.blockInputCache.get(blockHex) ?? getEmptyBlockInputCacheEntry(fork); blockCache.block = signedBlock; blockCache.blockBytes = blockBytes; } else { const {blobSidecar, blobBytes} = gossipedInput; const blockRoot = ssz.phase0.BeaconBlockHeader.hashTreeRoot(blobSidecar.signedBlockHeader.message); + fork = config.getForkName(blobSidecar.signedBlockHeader.message.slot); + blockHex = toHexString(blockRoot); - blockCache = this.blockInputCache.get(blockHex) ?? getEmptyBlockInputCacheEntry(); + blockCache = this.blockInputCache.get(blockHex) ?? getEmptyBlockInputCacheEntry(fork); // TODO: freetheblobs check if its the same blob or a duplicate and throw/take actions - blockCache.blobsCache.set(blobSidecar.index, { + blockCache.cachedData?.blobsCache.set(blobSidecar.index, { blobSidecar, // easily splice out the unsigned message as blob is a fixed length type blobBytes: blobBytes?.slice(0, BLOBSIDECAR_FIXED_SIZE) ?? null, @@ -105,23 +106,21 @@ export class SeenGossipBlockInput { this.blockInputCache.set(blockHex, blockCache); } - const { - block: signedBlock, - blockBytes, - blobsCache, - availabilityPromise, - resolveAvailability, - blockInputPromise, - resolveBlockInput, - } = blockCache; + const {block: signedBlock, blockBytes, blockInputPromise, resolveBlockInput, cachedData} = blockCache; if (signedBlock !== undefined) { - if (config.getForkSeq(signedBlock.message.slot) < ForkSeq.deneb) { + if (ForkSeq[fork] < ForkSeq.deneb) { return { blockInput: getBlockInput.preDeneb(config, signedBlock, BlockSource.gossip, blockBytes ?? null), blockInputMeta: {pending: null, haveBlobs: 0, expectedBlobs: 0}, }; } + + if (cachedData === undefined) { + throw Error("Missing cached Data for deneb+ block"); + } + const {blobsCache} = cachedData; + // block is available, check if all blobs have shown up const {slot, body} = signedBlock.message; const {blobKzgCommitments} = body as deneb.BeaconBlockBody; @@ -135,16 +134,20 @@ export class SeenGossipBlockInput { if (blobKzgCommitments.length === blobsCache.size) { const allBlobs = getBlockInputBlobs(blobsCache); - resolveAvailability(allBlobs); + const blockData = {fork: ForkName.deneb, ...allBlobs} as BlockInputDataBlobs; + if (cachedData.fork === ForkName.deneb) { + cachedData.resolveAvailability(blockData); + } else { + throw Error("il availability not implemented"); + } metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.GOSSIP}); - const {blobs, blobsBytes} = allBlobs; + const {blobs} = allBlobs; const blockInput = getBlockInput.postDeneb( config, signedBlock, - BlockSource.gossip, - blobs, blockBytes ?? null, - blobsBytes + blockData, + BlockSource.gossip ); resolveBlockInput(blockInput); @@ -156,11 +159,9 @@ export class SeenGossipBlockInput { const blockInput = getBlockInput.blobsPromise( config, signedBlock, - BlockSource.gossip, - blobsCache, blockBytes ?? null, - availabilityPromise, - resolveAvailability + cachedData, + BlockSource.gossip ); resolveBlockInput(blockInput); @@ -174,14 +175,17 @@ export class SeenGossipBlockInput { }; } } else { + if (cachedData === undefined) { + throw Error("Missing cachedData for deneb+ blobs"); + } + const {blobsCache} = cachedData; + // will need to wait for the block to showup return { blockInput: { block: null, blockRootHex: blockHex, - blobsCache, - availabilityPromise, - resolveAvailability, + cachedData, blockInputPromise, }, blockInputMeta: {pending: GossipedInputType.block, haveBlobs: blobsCache.size, expectedBlobs: null}, @@ -190,22 +194,35 @@ export class SeenGossipBlockInput { } } -function getEmptyBlockInputCacheEntry(): BlockInputCacheType { +function getEmptyBlockInputCacheEntry(fork: ForkName): BlockInputCacheType { // Capture both the promise and its callbacks for blockInput and final availability // It is not spec'ed but in tests in Firefox and NodeJS the promise constructor is run immediately let resolveBlockInput: ((block: BlockInput) => void) | null = null; const blockInputPromise = new Promise((resolveCB) => { resolveBlockInput = resolveCB; }); - - let resolveAvailability: ((blobs: BlockInputBlobs) => void) | null = null; - const availabilityPromise = new Promise((resolveCB) => { - resolveAvailability = resolveCB; - }); - - if (resolveAvailability === null || resolveBlockInput === null) { + if (resolveBlockInput === null) { throw Error("Promise Constructor was not executed immediately"); } + + if (ForkSeq[fork] < ForkSeq.deneb) { + return {blockInputPromise, resolveBlockInput}; + } + const blobsCache = new Map(); - return {blockInputPromise, resolveBlockInput, availabilityPromise, resolveAvailability, blobsCache}; + + if (fork === ForkName.deneb) { + let resolveAvailability: ((blobs: BlockInputDataBlobs) => void) | null = null; + const availabilityPromise = new Promise((resolveCB) => { + resolveAvailability = resolveCB; + }); + + if (resolveAvailability === null) { + throw Error("Promise Constructor was not executed immediately"); + } + const cachedData: CachedData = {fork, blobsCache, availabilityPromise, resolveAvailability}; + return {blockInputPromise, resolveBlockInput, cachedData}; + } else { + throw Error("il cache not implemented"); + } }