Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: peerdas - ensure there are at least n peers per sampling column subnet #7274

Open
wants to merge 10 commits into
base: peerDAS
Choose a base branch
from
30 changes: 27 additions & 3 deletions packages/beacon-node/src/network/core/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {RegistryMetricCreator} from "../../metrics/utils/registryMetricCreator.js";
import {SubnetType} from "../metadata.js";
import {DiscoveredPeerStatus} from "../peers/discover.js";
import {DiscoveredPeerStatus, NotDialReason} from "../peers/discover.js";
import {PeerRequestedSubnetType} from "../peers/peerManager.js";
import {SubnetSource} from "../subnets/attnetsService.js";

export type NetworkCoreMetrics = ReturnType<typeof createNetworkCoreMetrics>;
Expand Down Expand Up @@ -31,6 +32,11 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
help: "Histogram of current count of long lived attnets of connected peers",
buckets: [0, 4, 16, 32, 64],
}),
peerColumnSubnetCount: register.histogram({
name: "lodestar_peer_column_subnet_count",
help: "Histogram of current count of column subnets of connected peers",
buckets: [0, 4, 8, 16, 32, 64, 128],
}),
peerScoreByClient: register.histogram<{client: string}>({
name: "lodestar_app_peer_score",
help: "Current peer score at lodestar app side",
Expand Down Expand Up @@ -90,12 +96,12 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
help: "Prioritization results total peers count requested to disconnect",
labelNames: ["reason"],
}),
peersRequestedSubnetsToQuery: register.gauge<{type: SubnetType}>({
peersRequestedSubnetsToQuery: register.gauge<{type: PeerRequestedSubnetType}>({
name: "lodestar_peers_requested_total_subnets_to_query",
help: "Prioritization results total subnets to query and discover peers in",
labelNames: ["type"],
}),
peersRequestedSubnetsPeerCount: register.gauge<{type: SubnetType}>({
peersRequestedSubnetsPeerCount: register.gauge<{type: PeerRequestedSubnetType}>({
name: "lodestar_peers_requested_total_subnets_peers_count",
help: "Prioritization results total peers in subnets to query and discover peers in",
labelNames: ["type"],
Expand All @@ -105,6 +111,11 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
help: "network.reportPeer count by reason",
labelNames: ["reason"],
}),
peerCountPerSamplingColumnSubnet: register.gauge<{subnet: number}>({
name: "lodestar_peer_count_per_sampling_column_subnet",
help: "Current count of peers per sampling column subnet",
labelNames: ["subnet"],
}),
peerManager: {
heartbeatDuration: register.histogram({
name: "lodestar_peer_manager_heartbeat_duration_seconds",
Expand All @@ -127,11 +138,19 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
help: "Current peers to connect count from discoverPeers requests",
labelNames: ["type"],
}),
subnetColumnPeersToConnect: register.gauge({
name: "lodestar_discovery_subnet_column_peers_to_connect",
help: "Current peers to connect count from discoverPeers requests",
}),
subnetsToConnect: register.gauge<{type: SubnetType}>({
name: "lodestar_discovery_subnets_to_connect",
help: "Current subnets to connect count from discoverPeers requests",
labelNames: ["type"],
}),
columnSubnetsToConnect: register.gauge({
name: "lodestar_discovery_column_subnets_to_connect",
help: "Current column subnets to connect count from discoverPeers requests",
}),
cachedENRsSize: register.gauge({
name: "lodestar_discovery_cached_enrs_size",
help: "Current size of the cachedENRs Set",
Expand All @@ -155,6 +174,11 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
help: "Total count of status results of PeerDiscovery.onDiscovered() function",
labelNames: ["status"],
}),
notDialReason: register.gauge<{reason: NotDialReason}>({
name: "lodestar_discovery_not_dial_reason_total_count",
help: "Total count of not dial reasons",
labelNames: ["reason"],
}),
dialAttempts: register.gauge({
name: "lodestar_discovery_total_dial_attempts",
help: "Total dial attempts by peer discovery",
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/network/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ export const defaultNetworkOptions: NetworkOptions = {
beaconAttestationBatchValidation: true,
// This will enable the light client server by default
disableLightClientServer: false,
// for PeerDAS, this is the same as TARGET_SUBNET_PEERS, should re-evaluate after devnets
targetColumnSubnetPeers: 6,
};
136 changes: 108 additions & 28 deletions packages/beacon-node/src/network/peers/discover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {BeaconConfig} from "@lodestar/config";
import {pruneSetToMax, sleep} from "@lodestar/utils";
import {ATTESTATION_SUBNET_COUNT, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {LoggerNode} from "@lodestar/logger/node";
import {ssz} from "@lodestar/types";
import {ColumnIndex} from "@lodestar/types";
import {bytesToInt} from "@lodestar/utils";
import {NetworkCoreMetrics} from "../core/metrics.js";
import {Libp2p} from "../interface.js";
Expand All @@ -18,6 +18,7 @@ import {NodeId, computeNodeId} from "../subnets/interface.js";
import {getDataColumnSubnets} from "../../util/dataColumns.js";
import {deserializeEnrSubnets, zeroAttnets, zeroSyncnets} from "./utils/enrSubnetsDeserialize.js";
import {IPeerRpcScoreStore, ScoreState} from "./score/index.js";
import {ColumnSubnetId} from "./peersData.js";

/** Max number of cached ENRs after discovering a good peer */
const MAX_CACHED_ENRS = 100;
Expand All @@ -30,6 +31,7 @@ export type PeerDiscoveryOpts = {
discv5: LodestarDiscv5Opts;
connectToDiscv5Bootnodes?: boolean;
// experimental flags for debugging
// TODO-das: remove
onlyConnectToBiggerDataNodes?: boolean;
onlyConnectToMinimalCustodyOverlapNodes?: boolean;
};
Expand Down Expand Up @@ -62,6 +64,11 @@ export enum DiscoveredPeerStatus {
no_multiaddrs = "no_multiaddrs",
}

export enum NotDialReason {
not_contain_requested_subnet_sampling_columns = "not_contain_requested_subnet_sampling_columns",
not_contain_requested_attnet_syncnet_subnets = "not_contain_requested_attnet_syncnet_subnets",
}

type UnixMs = number;
/**
* Maintain peersToConnect to avoid having too many topic peers at some point.
Expand All @@ -80,12 +87,17 @@ export type SubnetDiscvQueryMs = {
maxPeersToDiscover: number;
};

/**
* A map of column subnet id to maxPeersToDiscover
*/
type ColumnSubnetQueries = Map<ColumnSubnetId, number>;

type CachedENR = {
peerId: PeerId;
multiaddrTCP: Multiaddr;
subnets: Record<SubnetType, boolean[]>;
addedUnixMs: number;
custodySubnetCount: number;
peerCustodySubnets: number[];
Copy link
Contributor

@g11tech g11tech Jan 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this ain't the spec right? if not are you proposing a spec change in line with this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is implementation specific and not part of the spec
here we store peerCustodySubnets once to reuse later

};

/**
Expand All @@ -95,27 +107,28 @@ type CachedENR = {
export class PeerDiscovery {
readonly discv5: Discv5Worker;
private libp2p: Libp2p;
// TODO-das: remove nodeId and sampleSubnets once we remove onlyConnect* flag
private nodeId: NodeId;
private sampleSubnets: number[];
private peerRpcScores: IPeerRpcScoreStore;
private metrics: NetworkCoreMetrics | null;
private logger: LoggerNode;
private config: BeaconConfig;
private cachedENRs = new Map<PeerIdStr, CachedENR>();
private peerIdToCustodySubnetCount = new Map<PeerIdStr, number>();
private randomNodeQuery: QueryStatus = {code: QueryStatusCode.NotActive};
private peersToConnect = 0;
private subnetRequests: Record<SubnetType, Map<number, SubnetRequestInfo>> = {
attnets: new Map(),
syncnets: new Map(),
};
// map of column subnet to max number of peers to connect
private columnSubnetRequests: Map<number, number>;

/** The maximum number of peers we allow (exceptions for subnet peers) */
private maxPeers: number;
private discv5StartMs: number;
private discv5FirstQueryDelayMs: number;

private connectToDiscv5BootnodesOnStart: boolean | undefined = false;
// TODO-das: remove
private onlyConnectToBiggerDataNodes: boolean | undefined = false;
private onlyConnectToMinimalCustodyOverlapNodes: boolean | undefined = false;

Expand All @@ -127,14 +140,15 @@ export class PeerDiscovery {
this.logger = logger;
this.config = config;
this.discv5 = discv5;
// TODO-das: remove
this.nodeId = nodeId;
// we will only connect to peers that can provide us custody
this.sampleSubnets = getDataColumnSubnets(
nodeId,
Math.max(config.CUSTODY_REQUIREMENT, config.NODE_CUSTODY_REQUIREMENT, config.SAMPLES_PER_SLOT)
);
this.columnSubnetRequests = new Map();

this.maxPeers = opts.maxPeers;
this.discv5StartMs = 0;
this.discv5StartMs = Date.now();
this.discv5FirstQueryDelayMs = opts.discv5FirstQueryDelayMs;
Expand Down Expand Up @@ -167,6 +181,13 @@ export class PeerDiscovery {
metrics.discovery.cachedENRsSize.addCollect(() => {
metrics.discovery.cachedENRsSize.set(this.cachedENRs.size);
metrics.discovery.peersToConnect.set(this.peersToConnect);

// PeerDAS metrics
const columnSubnetsToConnect = Array.from(this.columnSubnetRequests.values());
const subnetColumnPeersToConnect = columnSubnetsToConnect.reduce((acc, elem) => acc + elem, 0);
metrics.discovery.subnetColumnPeersToConnect.set(subnetColumnPeersToConnect);
metrics.discovery.columnSubnetsToConnect.set(columnSubnetsToConnect.filter((elem) => elem > 0).length);

for (const type of [SubnetType.attnets, SubnetType.syncnets]) {
const subnetPeersToConnect = Array.from(this.subnetRequests[type].values()).reduce(
(acc, {peersToConnect}) => acc + peersToConnect,
Expand Down Expand Up @@ -200,7 +221,11 @@ export class PeerDiscovery {
/**
* Request to find peers, both on specific subnets and in general
*/
discoverPeers(peersToConnect: number, subnetRequests: SubnetDiscvQueryMs[] = []): void {
discoverPeers(
peersToConnect: number,
columnSubnetRequests: ColumnSubnetQueries,
subnetRequests: SubnetDiscvQueryMs[] = []
): void {
const subnetsToDiscoverPeers: SubnetDiscvQueryMs[] = [];
const cachedENRsToDial = new Map<PeerIdStr, CachedENR>();
// Iterate in reverse to consider first the most recent ENRs
Expand All @@ -226,15 +251,40 @@ export class PeerDiscovery {

this.peersToConnect += peersToConnect;

// starting from PeerDAS, we need to prioritize column subnet peers first in order to have stable subnet sampling
const columnSubnetsToDiscover = new Set<ColumnIndex>();
let columnPeersToDiscover = 0;
column: for (const [subnet, maxPeersToConnect] of columnSubnetRequests) {
let cachedENRsInColumn = 0;
for (const cachedENR of cachedENRsReverse) {
if (cachedENR.peerCustodySubnets.includes(subnet)) {
cachedENRsToDial.set(cachedENR.peerId.toString(), cachedENR);

if (++cachedENRsInColumn >= maxPeersToConnect) {
continue column;
}
}

const columnPeersToConnect = Math.max(maxPeersToConnect - cachedENRsInColumn, 0);
this.columnSubnetRequests.set(subnet, columnPeersToConnect);
columnSubnetsToDiscover.add(subnet);
columnPeersToDiscover += columnPeersToConnect;
}
}

subnet: for (const subnetRequest of subnetRequests) {
// Get cached ENRs from the discovery service that are in the requested `subnetId`, but not connected yet
let cachedENRsInSubnet = 0;
for (const cachedENR of cachedENRsReverse) {
if (cachedENR.subnets[subnetRequest.type][subnetRequest.subnet]) {
cachedENRsToDial.set(cachedENR.peerId.toString(), cachedENR);

if (++cachedENRsInSubnet >= subnetRequest.maxPeersToDiscover) {
continue subnet;
// only dial attnet/syncnet peers if subnet sampling peers are stable
if (columnPeersToDiscover === 0) {
for (const cachedENR of cachedENRsReverse) {
if (cachedENR.subnets[subnetRequest.type][subnetRequest.subnet]) {
cachedENRsToDial.set(cachedENR.peerId.toString(), cachedENR);

if (++cachedENRsInSubnet >= subnetRequest.maxPeersToDiscover) {
continue subnet;
}
}
}
}
Expand Down Expand Up @@ -282,6 +332,8 @@ export class PeerDiscovery {
peersToConnect,
peersAvailableToDial: cachedENRsToDial.size,
subnetsToDiscover: subnetsToDiscoverPeers.length,
columnSubnetsToDiscover: Array.from(columnSubnetsToDiscover).join(","),
columnPeersToDiscover,
shouldRunFindRandomNodeQuery,
});
}
Expand Down Expand Up @@ -336,12 +388,8 @@ export class PeerDiscovery {

const attnets = zeroAttnets;
const syncnets = zeroSyncnets;
const custodySubnetCount = this.peerIdToCustodySubnetCount.get(id.toString());
if (custodySubnetCount === undefined) {
this.logger.warn("onDiscoveredPeer with unknown custodySubnetCount assuming 4", {peerId: id.toString()});
}

const status = this.handleDiscoveredPeer(id, multiaddrs[0], attnets, syncnets, custodySubnetCount ?? 4);
const status = this.handleDiscoveredPeer(id, multiaddrs[0], attnets, syncnets, undefined);
this.logger.debug("Discovered peer via libp2p", {peer: prettyPrintPeerId(id), status});
this.metrics?.discovery.discoveredStatus.inc({status});
};
Expand Down Expand Up @@ -376,13 +424,16 @@ export class PeerDiscovery {
// never throw and treat too long or too short bitfields as zero-ed
const attnets = attnetsBytes ? deserializeEnrSubnets(attnetsBytes, ATTESTATION_SUBNET_COUNT) : zeroAttnets;
const syncnets = syncnetsBytes ? deserializeEnrSubnets(syncnetsBytes, SYNC_COMMITTEE_SUBNET_COUNT) : zeroSyncnets;
const custodySubnetCount = custodySubnetCountBytes
? bytesToInt(custodySubnetCountBytes, "be")
: this.config.CUSTODY_REQUIREMENT;
this.peerIdToCustodySubnetCount.set(peerId.toString(), custodySubnetCount);

const status = this.handleDiscoveredPeer(peerId, multiaddrTCP, attnets, syncnets, custodySubnetCount);
this.logger.debug("Discovered peer via discv5", {peer: prettyPrintPeerId(peerId), status});
const custodySubnetCount = custodySubnetCountBytes ? bytesToInt(custodySubnetCountBytes, "be") : undefined;

const status = this.handleDiscoveredPeer(
peerId,
multiaddrTCP,
attnets,
syncnets,
custodySubnetCount
);
this.logger.debug("Discovered peer via discv5", {peer: prettyPrintPeerId(peerId), status, custodySubnetCount});
this.metrics?.discovery.discoveredStatus.inc({status});
};

Expand All @@ -394,7 +445,7 @@ export class PeerDiscovery {
multiaddrTCP: Multiaddr,
attnets: boolean[],
syncnets: boolean[],
custodySubnetCount: number
custodySubnetCount?: number
): DiscoveredPeerStatus {
const nodeId = computeNodeId(peerId);
this.logger.warn("handleDiscoveredPeer", {nodeId: toHexString(nodeId), peerId: peerId.toString()});
Expand Down Expand Up @@ -424,7 +475,7 @@ export class PeerDiscovery {
multiaddrTCP,
subnets: {attnets, syncnets},
addedUnixMs: Date.now(),
custodySubnetCount,
peerCustodySubnets: getDataColumnSubnets(nodeId, custodySubnetCount ?? this.config.CUSTODY_REQUIREMENT),
};

// Only dial peer if necessary
Expand All @@ -445,9 +496,11 @@ export class PeerDiscovery {
}

private shouldDialPeer(peer: CachedENR): boolean {
// begin onlyConnect* experimental logic
// TODO-das: remove
const nodeId = computeNodeId(peer.peerId);
const peerCustodySubnetCount = peer.custodySubnetCount;
const peerCustodySubnets = getDataColumnSubnets(nodeId, peerCustodySubnetCount);
const {peerCustodySubnets} = peer;
const peerCustodySubnetCount = peerCustodySubnets.length;

const matchingSubnetsNum = this.sampleSubnets.reduce(
(acc, elem) => acc + (peerCustodySubnets.includes(elem) ? 1 : 0),
Expand All @@ -468,10 +521,36 @@ export class PeerDiscovery {
if (this.onlyConnectToBiggerDataNodes && !hasAllColumns) {
return false;
}

if (this.onlyConnectToMinimalCustodyOverlapNodes && !hasMinCustodyMatchingColumns) {
return false;
}
// end onlyConnect* experimental logic

// starting from PeerDAS fork, we need to make sure we have stable subnet sampling peers first
// given CUSTODY_REQUIREMENT = 4 and 100 peers, we have 400 custody columns from peers
// with NUMBER_OF_COLUMNS = 128, we have 400 / 128 = 3.125 peers per column on average
// it would not be hard to find TARGET_SUBNET_PEERS(6) peers per SAMPLES_PER_SLOT(8) columns
// after some first heartbeats, we should have no more column requested, then go with conditions of prior forks
let hasMatchingColumn = false;
let columnRequestCount = 0;
for (const [column, peersToConnect] of this.columnSubnetRequests.entries()) {
if (peersToConnect <= 0) {
this.columnSubnetRequests.delete(column);
} else if (peerCustodySubnets.includes(column)) {
this.columnSubnetRequests.set(column, Math.max(0, peersToConnect - 1));
hasMatchingColumn = true;
columnRequestCount += peersToConnect;
}
}

// if subnet sampling peers are not stable and this peer is not in the requested columns, ignore it
if (columnRequestCount > 0 && !hasMatchingColumn) {
this.metrics?.discovery.notDialReason.inc({reason: NotDialReason.not_contain_requested_subnet_sampling_columns});
return false;
}

// logic up to Deneb fork
for (const type of [SubnetType.attnets, SubnetType.syncnets]) {
for (const [subnet, {toUnixMs, peersToConnect}] of this.subnetRequests[type].entries()) {
if (toUnixMs < Date.now() || peersToConnect === 0) {
Expand All @@ -497,6 +576,7 @@ export class PeerDiscovery {
return true;
}

this.metrics?.discovery.notDialReason.inc({reason: NotDialReason.not_contain_requested_attnet_syncnet_subnets});
return false;
}

Expand Down
Loading
Loading