Skip to content

Commit

Permalink
extend blockinput for blobs and il data
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech committed Mar 16, 2024
1 parent 8b053ca commit c7f3ec6
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 67 deletions.
64 changes: 38 additions & 26 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {CachedBeaconStateAllForks, computeEpochAtSlot, DataAvailableStatus} from "@lodestar/state-transition";
import {MaybeValidExecutionStatus} from "@lodestar/fork-choice";
import {allForks, deneb, Slot, RootHex} from "@lodestar/types";
import {ForkSeq} from "@lodestar/params";
import {allForks, deneb, electra, Slot, RootHex} from "@lodestar/types";
import {ForkSeq, ForkName, ForkPreBlobs, ForkILs} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";

export enum BlockInputType {
Expand All @@ -23,20 +23,38 @@ export enum GossipedInputType {
blob = "blob",
}

type ForkBlobsInfo = {fork: ForkName.deneb};
type ForkILsInfo = {fork: ForkILs};

export type BlobsCache = Map<number, {blobSidecar: deneb.BlobSidecar; blobBytes: Uint8Array | null}>;
export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]};
type CachedBlobs = {
blobsCache: BlobsCache;
availabilityPromise: Promise<BlockInputBlobs>;
resolveAvailability: (blobs: BlockInputBlobs) => void;
};

type BlobsData = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]};
type ILsData = BlobsData & {ilSummary: electra.ILSummary; inclusionLists: electra.ILTransactions[]};

export type BlockInputDataBlobs = ForkBlobsInfo & BlobsData;
export type BlockInputDataIls = ForkILsInfo & ILsData;
export type BlockInputData = BlockInputDataBlobs | BlockInputDataIls;

type BlobsInputCache = {blobsCache: BlobsCache};
type ForkILsCache = BlobsInputCache & {ilSummary?: electra.ILSummary; inclusionLists: electra.ILTransactions[]};

export type BlockInputCacheBlobs = ForkBlobsInfo & BlobsInputCache;
export type BlockInputCacheILs = ForkILsInfo & ForkILsCache;
export type BlockInputCache = (ForkBlobsInfo & BlobsInputCache) | (ForkILsInfo & ForkILsCache);

type Availability<T> = {availabilityPromise: Promise<T>; resolveAvailability: (data: T) => void};
export type CachedData =
| (ForkBlobsInfo & BlobsInputCache & Availability<BlockInputDataBlobs>)
| (ForkILsInfo & ForkILsCache & Availability<BlockInputDataIls>);

export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & (
| {type: BlockInputType.preDeneb}
| ({type: BlockInputType.postDeneb} & BlockInputBlobs)
| ({type: BlockInputType.blobsPromise} & CachedBlobs)
| ({type: BlockInputType.postDeneb} & {blockData: BlockInputData})
| ({type: BlockInputType.blobsPromise} & {cachedData: CachedData})
);
export type NullBlockInput = {block: null; blockRootHex: RootHex; blockInputPromise: Promise<BlockInput>} & CachedBlobs;
export type NullBlockInput = {block: null; blockRootHex: RootHex; blockInputPromise: Promise<BlockInput>} & {
cachedData: CachedData;
};

export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clockSlot: Slot): boolean {
return (
Expand Down Expand Up @@ -67,49 +85,43 @@ export const getBlockInput = {
postDeneb(
config: ChainForkConfig,
block: allForks.SignedBeaconBlock,
source: BlockSource,
blobs: deneb.BlobSidecars,
blockBytes: Uint8Array | null,
blobsBytes: (Uint8Array | null)[]
blockData: BlockInputData,
source: BlockSource
): BlockInput {
if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) {
throw Error(`Pre Deneb block slot ${block.message.slot}`);
}
return {
type: BlockInputType.postDeneb,
block,
source,
blobs,
blockBytes,
blobsBytes,
blockData,
source,
};
},

blobsPromise(
config: ChainForkConfig,
block: allForks.SignedBeaconBlock,
source: BlockSource,
blobsCache: BlobsCache,
blockBytes: Uint8Array | null,
availabilityPromise: Promise<BlockInputBlobs>,
resolveAvailability: (blobs: BlockInputBlobs) => void
cachedData: CachedData,
source: BlockSource
): BlockInput {
if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) {
throw Error(`Pre Deneb block slot ${block.message.slot}`);
}
return {
type: BlockInputType.blobsPromise,
block,
source,
blobsCache,
blockBytes,
availabilityPromise,
resolveAvailability,
source,
cachedData,
};
},
};

export function getBlockInputBlobs(blobsCache: BlobsCache): BlockInputBlobs {
export function getBlockInputBlobs(blobsCache: BlobsCache): BlobsData {
const blobs = [];
const blobsBytes = [];

Expand Down
99 changes: 58 additions & 41 deletions packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ import {toHexString} from "@chainsafe/ssz";
import {deneb, RootHex, ssz, allForks} from "@lodestar/types";
import {ChainForkConfig} from "@lodestar/config";
import {pruneSetToMax} from "@lodestar/utils";
import {BLOBSIDECAR_FIXED_SIZE, ForkSeq} from "@lodestar/params";
import {BLOBSIDECAR_FIXED_SIZE, ForkSeq, ForkName} from "@lodestar/params";

import {
BlockInput,
NullBlockInput,
getBlockInput,
BlockSource,
BlockInputBlobs,
BlobsCache,
BlockInputDataBlobs,
CachedData,
GossipedInputType,
getBlockInputBlobs,
} from "../blocks/types.js";
Expand All @@ -28,10 +28,7 @@ type GossipedBlockInput =
type BlockInputCacheType = {
block?: allForks.SignedBeaconBlock;
blockBytes?: Uint8Array | null;
blobsCache: BlobsCache;
// blobs promise and its callback cached for delayed resolution
availabilityPromise: Promise<BlockInputBlobs>;
resolveAvailability: (blobs: BlockInputBlobs) => void;
cachedData?: CachedData;
// block promise and its callback cached for delayed resolution
blockInputPromise: Promise<BlockInput>;
resolveBlockInput: (blockInput: BlockInput) => void;
Expand Down Expand Up @@ -76,25 +73,29 @@ export class SeenGossipBlockInput {
} {
let blockHex;
let blockCache;
let fork;

if (gossipedInput.type === GossipedInputType.block) {
const {signedBlock, blockBytes} = gossipedInput;
fork = config.getForkName(signedBlock.message.slot);

blockHex = toHexString(
config.getForkTypes(signedBlock.message.slot).BeaconBlock.hashTreeRoot(signedBlock.message)
);
blockCache = this.blockInputCache.get(blockHex) ?? getEmptyBlockInputCacheEntry();
blockCache = this.blockInputCache.get(blockHex) ?? getEmptyBlockInputCacheEntry(fork);

blockCache.block = signedBlock;
blockCache.blockBytes = blockBytes;
} else {
const {blobSidecar, blobBytes} = gossipedInput;
const blockRoot = ssz.phase0.BeaconBlockHeader.hashTreeRoot(blobSidecar.signedBlockHeader.message);
fork = config.getForkName(blobSidecar.signedBlockHeader.message.slot);

blockHex = toHexString(blockRoot);
blockCache = this.blockInputCache.get(blockHex) ?? getEmptyBlockInputCacheEntry();
blockCache = this.blockInputCache.get(blockHex) ?? getEmptyBlockInputCacheEntry(fork);

// TODO: freetheblobs check if its the same blob or a duplicate and throw/take actions
blockCache.blobsCache.set(blobSidecar.index, {
blockCache.cachedData?.blobsCache.set(blobSidecar.index, {
blobSidecar,
// easily splice out the unsigned message as blob is a fixed length type
blobBytes: blobBytes?.slice(0, BLOBSIDECAR_FIXED_SIZE) ?? null,
Expand All @@ -105,23 +106,21 @@ export class SeenGossipBlockInput {
this.blockInputCache.set(blockHex, blockCache);
}

const {
block: signedBlock,
blockBytes,
blobsCache,
availabilityPromise,
resolveAvailability,
blockInputPromise,
resolveBlockInput,
} = blockCache;
const {block: signedBlock, blockBytes, blockInputPromise, resolveBlockInput, cachedData} = blockCache;

if (signedBlock !== undefined) {
if (config.getForkSeq(signedBlock.message.slot) < ForkSeq.deneb) {
if (ForkSeq[fork] < ForkSeq.deneb) {
return {
blockInput: getBlockInput.preDeneb(config, signedBlock, BlockSource.gossip, blockBytes ?? null),
blockInputMeta: {pending: null, haveBlobs: 0, expectedBlobs: 0},
};
}

if (cachedData === undefined) {
throw Error("Missing cached Data for deneb+ block");
}
const {blobsCache} = cachedData;

// block is available, check if all blobs have shown up
const {slot, body} = signedBlock.message;
const {blobKzgCommitments} = body as deneb.BeaconBlockBody;
Expand All @@ -135,16 +134,20 @@ export class SeenGossipBlockInput {

if (blobKzgCommitments.length === blobsCache.size) {
const allBlobs = getBlockInputBlobs(blobsCache);
resolveAvailability(allBlobs);
const blockData = {fork: ForkName.deneb, ...allBlobs} as BlockInputDataBlobs;
if (cachedData.fork === ForkName.deneb) {
cachedData.resolveAvailability(blockData);
} else {
throw Error("il availability not implemented");
}
metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.GOSSIP});
const {blobs, blobsBytes} = allBlobs;
const {blobs} = allBlobs;
const blockInput = getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.gossip,
blobs,
blockBytes ?? null,
blobsBytes
blockData,
BlockSource.gossip
);

resolveBlockInput(blockInput);
Expand All @@ -156,11 +159,9 @@ export class SeenGossipBlockInput {
const blockInput = getBlockInput.blobsPromise(
config,
signedBlock,
BlockSource.gossip,
blobsCache,
blockBytes ?? null,
availabilityPromise,
resolveAvailability
cachedData,
BlockSource.gossip
);

resolveBlockInput(blockInput);
Expand All @@ -174,14 +175,17 @@ export class SeenGossipBlockInput {
};
}
} else {
if (cachedData === undefined) {
throw Error("Missing cachedData for deneb+ blobs");
}
const {blobsCache} = cachedData;

// will need to wait for the block to showup
return {
blockInput: {
block: null,
blockRootHex: blockHex,
blobsCache,
availabilityPromise,
resolveAvailability,
cachedData,
blockInputPromise,
},
blockInputMeta: {pending: GossipedInputType.block, haveBlobs: blobsCache.size, expectedBlobs: null},
Expand All @@ -190,22 +194,35 @@ export class SeenGossipBlockInput {
}
}

function getEmptyBlockInputCacheEntry(): BlockInputCacheType {
function getEmptyBlockInputCacheEntry(fork: ForkName): BlockInputCacheType {
// Capture both the promise and its callbacks for blockInput and final availability
// It is not spec'ed but in tests in Firefox and NodeJS the promise constructor is run immediately
let resolveBlockInput: ((block: BlockInput) => void) | null = null;
const blockInputPromise = new Promise<BlockInput>((resolveCB) => {
resolveBlockInput = resolveCB;
});

let resolveAvailability: ((blobs: BlockInputBlobs) => void) | null = null;
const availabilityPromise = new Promise<BlockInputBlobs>((resolveCB) => {
resolveAvailability = resolveCB;
});

if (resolveAvailability === null || resolveBlockInput === null) {
if (resolveBlockInput === null) {
throw Error("Promise Constructor was not executed immediately");
}

if (ForkSeq[fork] < ForkSeq.deneb) {
return {blockInputPromise, resolveBlockInput};
}

const blobsCache = new Map();
return {blockInputPromise, resolveBlockInput, availabilityPromise, resolveAvailability, blobsCache};

if (fork === ForkName.deneb) {
let resolveAvailability: ((blobs: BlockInputDataBlobs) => void) | null = null;
const availabilityPromise = new Promise<BlockInputDataBlobs>((resolveCB) => {
resolveAvailability = resolveCB;
});

if (resolveAvailability === null) {
throw Error("Promise Constructor was not executed immediately");
}
const cachedData: CachedData = {fork, blobsCache, availabilityPromise, resolveAvailability};
return {blockInputPromise, resolveBlockInput, cachedData};
} else {
throw Error("il cache not implemented");
}
}

0 comments on commit c7f3ec6

Please sign in to comment.