Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: simplify connection upgrade #2719

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
expect(upgradeSpy.callCount).to.equal(2)
})

it('should not handle connection if upgradeInbound throws', async () => {
sinon.stub(upgrader, 'upgradeInbound').throws()
it('should not handle connection if upgradeInbound rejects', async () => {
sinon.stub(upgrader, 'upgradeInbound').rejects()

const listen = listener.createListener({
upgrader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export interface ConnectionManager {
acceptIncomingConnection(maConn: MultiaddrConnection): Promise<boolean>

/**
* Invoked after upgrading a multiaddr connection has finished
* Invoked after upgrading an inbound multiaddr connection has finished
*/
afterUpgradeInbound(): void

Expand Down
2 changes: 1 addition & 1 deletion packages/interface/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ export interface ConnectionProtector {
* between its two peers from the PSK the Protector instance was
* created with.
*/
protect(connection: MultiaddrConnection): Promise<MultiaddrConnection>
protect(connection: MultiaddrConnection, options?: AbortOptions): Promise<MultiaddrConnection>
}

export interface MultiaddrConnectionTimeline {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
export const DIAL_TIMEOUT = 5e3

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#inboundUpgradeTimeout
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#upgradeTimeout
*/
export const INBOUND_UPGRADE_TIMEOUT = 2e3
export const UPGRADE_TIMEOUT = 3e3

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#protocolNegotiationTimeout
*/
export const PROTOCOL_NEGOTIATION_TIMEOUT = 2e3

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxPeerAddrsToDial
Expand Down
96 changes: 52 additions & 44 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,34 @@
/**
* How long a dial attempt is allowed to take, including DNS resolution
* of the multiaddr, opening a socket and upgrading it to a Connection.
*
* @default 5000
*/
dialTimeout?: number

/**
* When a new inbound connection is opened, the upgrade process (e.g. protect,
* encrypt, multiplex etc) must complete within this number of ms.
* When a new incoming connection is opened, the upgrade process (e.g.
* protect, encrypt, multiplex etc) must complete within this number of ms.
*
* @default 30000
* @default 3000
*/
inboundUpgradeTimeout?: number

/**
* When a new outbound connection is opened, the upgrade process (e.g.
* protect, encrypt, multiplex etc) must complete within this number of ms.
*
* @default 3000
*/
outboundUpgradeTimeout?: number

/**
* Protocol negotiation must complete within this number of ms
*
* @default 2000
*/
protocolNegotiationTimeout?: number

/**
* Multiaddr resolvers to use when dialling
*/
Expand Down Expand Up @@ -164,7 +181,6 @@
private readonly deny: Multiaddr[]
private readonly maxIncomingPendingConnections: number
private incomingPendingConnections: number
private outboundPendingConnections: number
private readonly maxConnections: number

public readonly dialQueue: DialQueue
Expand Down Expand Up @@ -203,7 +219,6 @@
this.allow = (init.allow ?? []).map(ma => multiaddr(ma))
this.deny = (init.deny ?? []).map(ma => multiaddr(ma))

this.outboundPendingConnections = 0
this.incomingPendingConnections = 0
this.maxIncomingPendingConnections = init.maxIncomingPendingConnections ?? defaultOptions.maxIncomingPendingConnections

Expand Down Expand Up @@ -266,8 +281,7 @@
const metric = {
inbound: 0,
'inbound pending': this.incomingPendingConnections,
outbound: 0,
'outbound pending': this.outboundPendingConnections
outbound: 0

Check warning on line 284 in packages/libp2p/src/connection-manager/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection-manager/index.ts#L284

Added line #L284 was not covered by tests
}

for (const conns of this.connections.values()) {
Expand Down Expand Up @@ -468,54 +482,48 @@

options.signal?.throwIfAborted()

try {
this.outboundPendingConnections++
const { peerId } = getPeerAddress(peerIdOrMultiaddr)

const { peerId } = getPeerAddress(peerIdOrMultiaddr)
if (peerId != null && options.force !== true) {
this.log('dial %p', peerId)
const existingConnection = this.getConnections(peerId)
.find(conn => conn.limits == null)

if (peerId != null && options.force !== true) {
this.log('dial %p', peerId)
const existingConnection = this.getConnections(peerId)
.find(conn => conn.limits == null)
if (existingConnection != null) {
this.log('had an existing non-limited connection to %p', peerId)

if (existingConnection != null) {
this.log('had an existing non-limited connection to %p', peerId)

options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
return existingConnection
}
options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
return existingConnection
}
}

const connection = await this.dialQueue.dial(peerIdOrMultiaddr, {
...options,
priority: options.priority ?? DEFAULT_DIAL_PRIORITY
})
let peerConnections = this.connections.get(connection.remotePeer)

if (peerConnections == null) {
peerConnections = []
this.connections.set(connection.remotePeer, peerConnections)
}
const connection = await this.dialQueue.dial(peerIdOrMultiaddr, {
...options,
priority: options.priority ?? DEFAULT_DIAL_PRIORITY
})
let peerConnections = this.connections.get(connection.remotePeer)

// we get notified of connections via the Upgrader emitting "connection"
// events, double check we aren't already tracking this connection before
// storing it
let trackedConnection = false
if (peerConnections == null) {
peerConnections = []
this.connections.set(connection.remotePeer, peerConnections)
}

for (const conn of peerConnections) {
if (conn.id === connection.id) {
trackedConnection = true
}
}
// we get notified of connections via the Upgrader emitting "connection"
// events, double check we aren't already tracking this connection before
// storing it
let trackedConnection = false

if (!trackedConnection) {
peerConnections.push(connection)
for (const conn of peerConnections) {
if (conn.id === connection.id) {
trackedConnection = true
}
}

return connection
} finally {
this.outboundPendingConnections--
if (!trackedConnection) {
peerConnections.push(connection)
}

return connection
}

async closeConnections (peerId: PeerId, options: AbortOptions = {}): Promise<void> {
Expand Down
3 changes: 2 additions & 1 deletion packages/libp2p/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter
this.components.upgrader = new DefaultUpgrader(this.components, {
connectionEncrypters: (init.connectionEncrypters ?? []).map((fn, index) => this.configureComponent(`connection-encryption-${index}`, fn(this.components))),
streamMuxers: (init.streamMuxers ?? []).map((fn, index) => this.configureComponent(`stream-muxers-${index}`, fn(this.components))),
inboundUpgradeTimeout: init.connectionManager?.inboundUpgradeTimeout
inboundUpgradeTimeout: init.connectionManager?.inboundUpgradeTimeout,
outboundUpgradeTimeout: init.connectionManager?.outboundUpgradeTimeout
})

// Setup the transport manager
Expand Down
Loading
Loading