diff --git a/packages/interface-compliance-tests/src/transport/listen-test.ts b/packages/interface-compliance-tests/src/transport/listen-test.ts index ddc585506d..eadf69db60 100644 --- a/packages/interface-compliance-tests/src/transport/listen-test.ts +++ b/packages/interface-compliance-tests/src/transport/listen-test.ts @@ -106,8 +106,8 @@ export default (common: TestSetup): void => { expect(upgradeSpy.callCount).to.equal(2) }) - it('should not handle connection if upgradeInbound throws', async () => { - sinon.stub(upgrader, 'upgradeInbound').throws() + it('should not handle connection if upgradeInbound rejects', async () => { + sinon.stub(upgrader, 'upgradeInbound').rejects() const listen = listener.createListener({ upgrader diff --git a/packages/interface-internal/src/connection-manager/index.ts b/packages/interface-internal/src/connection-manager/index.ts index 860c8c9f62..de9d036e08 100644 --- a/packages/interface-internal/src/connection-manager/index.ts +++ b/packages/interface-internal/src/connection-manager/index.ts @@ -71,7 +71,7 @@ export interface ConnectionManager { acceptIncomingConnection(maConn: MultiaddrConnection): Promise /** - * Invoked after upgrading a multiaddr connection has finished + * Invoked after upgrading an inbound multiaddr connection has finished */ afterUpgradeInbound(): void diff --git a/packages/interface/src/connection/index.ts b/packages/interface/src/connection/index.ts index 696c74830d..981b9cf578 100644 --- a/packages/interface/src/connection/index.ts +++ b/packages/interface/src/connection/index.ts @@ -354,7 +354,7 @@ export interface ConnectionProtector { * between its two peers from the PSK the Protector instance was * created with. */ - protect(connection: MultiaddrConnection): Promise + protect(connection: MultiaddrConnection, options?: AbortOptions): Promise } export interface MultiaddrConnectionTimeline { diff --git a/packages/libp2p/src/connection-manager/constants.defaults.ts b/packages/libp2p/src/connection-manager/constants.defaults.ts index 628088e8ed..95d9b32692 100644 --- a/packages/libp2p/src/connection-manager/constants.defaults.ts +++ b/packages/libp2p/src/connection-manager/constants.defaults.ts @@ -4,9 +4,14 @@ export const DIAL_TIMEOUT = 5e3 /** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#inboundUpgradeTimeout + * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#upgradeTimeout */ -export const INBOUND_UPGRADE_TIMEOUT = 2e3 +export const UPGRADE_TIMEOUT = 3e3 + +/** + * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#protocolNegotiationTimeout + */ +export const PROTOCOL_NEGOTIATION_TIMEOUT = 2e3 /** * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxPeerAddrsToDial diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index 6cc859aade..8889113d01 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -57,17 +57,34 @@ export interface ConnectionManagerInit { /** * How long a dial attempt is allowed to take, including DNS resolution * of the multiaddr, opening a socket and upgrading it to a Connection. + * + * @default 5000 */ dialTimeout?: number /** - * When a new inbound connection is opened, the upgrade process (e.g. protect, - * encrypt, multiplex etc) must complete within this number of ms. + * When a new incoming connection is opened, the upgrade process (e.g. + * protect, encrypt, multiplex etc) must complete within this number of ms. * - * @default 30000 + * @default 3000 */ inboundUpgradeTimeout?: number + /** + * When a new outbound connection is opened, the upgrade process (e.g. + * protect, encrypt, multiplex etc) must complete within this number of ms. + * + * @default 3000 + */ + outboundUpgradeTimeout?: number + + /** + * Protocol negotiation must complete within this number of ms + * + * @default 2000 + */ + protocolNegotiationTimeout?: number + /** * Multiaddr resolvers to use when dialling */ @@ -164,7 +181,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { private readonly deny: Multiaddr[] private readonly maxIncomingPendingConnections: number private incomingPendingConnections: number - private outboundPendingConnections: number private readonly maxConnections: number public readonly dialQueue: DialQueue @@ -203,7 +219,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { this.allow = (init.allow ?? []).map(ma => multiaddr(ma)) this.deny = (init.deny ?? []).map(ma => multiaddr(ma)) - this.outboundPendingConnections = 0 this.incomingPendingConnections = 0 this.maxIncomingPendingConnections = init.maxIncomingPendingConnections ?? defaultOptions.maxIncomingPendingConnections @@ -266,8 +281,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { const metric = { inbound: 0, 'inbound pending': this.incomingPendingConnections, - outbound: 0, - 'outbound pending': this.outboundPendingConnections + outbound: 0 } for (const conns of this.connections.values()) { @@ -468,54 +482,48 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { options.signal?.throwIfAborted() - try { - this.outboundPendingConnections++ + const { peerId } = getPeerAddress(peerIdOrMultiaddr) - const { peerId } = getPeerAddress(peerIdOrMultiaddr) + if (peerId != null && options.force !== true) { + this.log('dial %p', peerId) + const existingConnection = this.getConnections(peerId) + .find(conn => conn.limits == null) - if (peerId != null && options.force !== true) { - this.log('dial %p', peerId) - const existingConnection = this.getConnections(peerId) - .find(conn => conn.limits == null) + if (existingConnection != null) { + this.log('had an existing non-limited connection to %p', peerId) - if (existingConnection != null) { - this.log('had an existing non-limited connection to %p', peerId) - - options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected')) - return existingConnection - } + options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected')) + return existingConnection } + } - const connection = await this.dialQueue.dial(peerIdOrMultiaddr, { - ...options, - priority: options.priority ?? DEFAULT_DIAL_PRIORITY - }) - let peerConnections = this.connections.get(connection.remotePeer) - - if (peerConnections == null) { - peerConnections = [] - this.connections.set(connection.remotePeer, peerConnections) - } + const connection = await this.dialQueue.dial(peerIdOrMultiaddr, { + ...options, + priority: options.priority ?? DEFAULT_DIAL_PRIORITY + }) + let peerConnections = this.connections.get(connection.remotePeer) - // we get notified of connections via the Upgrader emitting "connection" - // events, double check we aren't already tracking this connection before - // storing it - let trackedConnection = false + if (peerConnections == null) { + peerConnections = [] + this.connections.set(connection.remotePeer, peerConnections) + } - for (const conn of peerConnections) { - if (conn.id === connection.id) { - trackedConnection = true - } - } + // we get notified of connections via the Upgrader emitting "connection" + // events, double check we aren't already tracking this connection before + // storing it + let trackedConnection = false - if (!trackedConnection) { - peerConnections.push(connection) + for (const conn of peerConnections) { + if (conn.id === connection.id) { + trackedConnection = true } + } - return connection - } finally { - this.outboundPendingConnections-- + if (!trackedConnection) { + peerConnections.push(connection) } + + return connection } async closeConnections (peerId: PeerId, options: AbortOptions = {}): Promise { diff --git a/packages/libp2p/src/libp2p.ts b/packages/libp2p/src/libp2p.ts index 16221de1c3..912afeb716 100644 --- a/packages/libp2p/src/libp2p.ts +++ b/packages/libp2p/src/libp2p.ts @@ -110,7 +110,8 @@ export class Libp2p extends TypedEventEmitter this.components.upgrader = new DefaultUpgrader(this.components, { connectionEncrypters: (init.connectionEncrypters ?? []).map((fn, index) => this.configureComponent(`connection-encryption-${index}`, fn(this.components))), streamMuxers: (init.streamMuxers ?? []).map((fn, index) => this.configureComponent(`stream-muxers-${index}`, fn(this.components))), - inboundUpgradeTimeout: init.connectionManager?.inboundUpgradeTimeout + inboundUpgradeTimeout: init.connectionManager?.inboundUpgradeTimeout, + outboundUpgradeTimeout: init.connectionManager?.outboundUpgradeTimeout }) // Setup the transport manager diff --git a/packages/libp2p/src/upgrader.ts b/packages/libp2p/src/upgrader.ts index 059f816ec8..92baa1e773 100644 --- a/packages/libp2p/src/upgrader.ts +++ b/packages/libp2p/src/upgrader.ts @@ -1,16 +1,15 @@ -import { InvalidMultiaddrError, InvalidPeerIdError, TooManyInboundProtocolStreamsError, TooManyOutboundProtocolStreamsError, LimitedConnectionError, TimeoutError, setMaxListeners } from '@libp2p/interface' +import { InvalidMultiaddrError, TooManyInboundProtocolStreamsError, TooManyOutboundProtocolStreamsError, LimitedConnectionError, setMaxListeners } from '@libp2p/interface' import * as mss from '@libp2p/multistream-select' import { peerIdFromString } from '@libp2p/peer-id' +import { anySignal } from 'any-signal' import { CustomProgressEvent } from 'progress-events' import { createConnection } from './connection/index.js' -import { INBOUND_UPGRADE_TIMEOUT } from './connection-manager/constants.js' +import { PROTOCOL_NEGOTIATION_TIMEOUT, UPGRADE_TIMEOUT } from './connection-manager/constants.js' import { ConnectionDeniedError, ConnectionInterceptedError, EncryptionFailedError, MuxerUnavailableError } from './errors.js' import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js' import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader, UpgraderOptions, ConnectionLimits, SecureConnectionOptions } from '@libp2p/interface' import type { ConnectionManager, Registrar } from '@libp2p/interface-internal' -const DEFAULT_PROTOCOL_SELECT_TIMEOUT = 30000 - interface CreateConnectionOptions { cryptoProtocol: string direction: 'inbound' | 'outbound' @@ -36,10 +35,34 @@ export interface UpgraderInit { streamMuxers: StreamMuxerFactory[] /** - * An amount of ms by which an inbound connection upgrade - * must complete + * An amount of ms by which an inbound connection upgrade must complete + * + * @default 3000 */ inboundUpgradeTimeout?: number + + /** + * An amount of ms by which an outbound connection upgrade must complete + * + * @default 3000 + */ + outboundUpgradeTimeout?: number + + /** + * When a new incoming stream is opened on a multiplexed connection, protocol + * negotiation on that stream must complete within this many ms + * + * @default 2000 + */ + inboundStreamProtocolNegotiationTimeout?: number + + /** + * When a new incoming stream is opened on a multiplexed connection, protocol + * negotiation on that stream must complete within this many ms + * + * @default 2000 + */ + outboundStreamProtocolNegotiationTimeout?: number } function findIncomingStreamLimit (protocol: string, registrar: Registrar): number | undefined { @@ -103,6 +126,9 @@ export class DefaultUpgrader implements Upgrader { private readonly connectionEncrypters: Map private readonly streamMuxers: Map private readonly inboundUpgradeTimeout: number + private readonly outboundUpgradeTimeout: number + private readonly inboundStreamProtocolNegotiationTimeout: number + private readonly outboundStreamProtocolNegotiationTimeout: number private readonly events: TypedEventTarget constructor (components: DefaultUpgraderComponents, init: UpgraderInit) { @@ -119,135 +145,46 @@ export class DefaultUpgrader implements Upgrader { this.streamMuxers.set(muxer.protocol, muxer) }) - this.inboundUpgradeTimeout = init.inboundUpgradeTimeout ?? INBOUND_UPGRADE_TIMEOUT + this.inboundUpgradeTimeout = init.inboundUpgradeTimeout ?? UPGRADE_TIMEOUT + this.outboundUpgradeTimeout = init.outboundUpgradeTimeout ?? UPGRADE_TIMEOUT + this.inboundStreamProtocolNegotiationTimeout = init.inboundStreamProtocolNegotiationTimeout ?? PROTOCOL_NEGOTIATION_TIMEOUT + this.outboundStreamProtocolNegotiationTimeout = init.outboundStreamProtocolNegotiationTimeout ?? PROTOCOL_NEGOTIATION_TIMEOUT this.events = components.events } readonly [Symbol.toStringTag] = '@libp2p/upgrader' - async shouldBlockConnection (remotePeer: PeerId, maConn: MultiaddrConnection, connectionType: ConnectionDeniedType): Promise { - const connectionGater = this.components.connectionGater[connectionType] + async shouldBlockConnection (connectionType: 'denyInboundConnection', maConn: MultiaddrConnection): Promise + async shouldBlockConnection (connectionType: ConnectionDeniedType, remotePeer: PeerId, maConn: MultiaddrConnection): Promise + async shouldBlockConnection (method: ConnectionDeniedType | 'denyInboundConnection', ...args: any[]): Promise { + const denyOperation: any = this.components.connectionGater[method] - if (connectionGater !== undefined) { - if (await connectionGater(remotePeer, maConn)) { - throw new ConnectionInterceptedError(`The multiaddr connection is blocked by gater.${connectionType}`) - } + if (denyOperation == null) { + return + } + + const result = await denyOperation.apply(this.components.connectionGater, args) + + if (result === true) { + throw new ConnectionInterceptedError(`The multiaddr connection is blocked by gater.${method}`) } } /** * Upgrades an inbound connection */ - async upgradeInbound (maConn: MultiaddrConnection, opts?: UpgraderOptions): Promise { - const accept = await this.components.connectionManager.acceptIncomingConnection(maConn) - - if (!accept) { - throw new ConnectionDeniedError('connection denied') - } - - let encryptedConn: MultiaddrConnection - let remotePeer - let upgradedConn: MultiaddrConnection - let muxerFactory: StreamMuxerFactory | undefined - let cryptoProtocol - - const signal = AbortSignal.timeout(this.inboundUpgradeTimeout) - - const onAbort = (): void => { - maConn.abort(new TimeoutError('inbound upgrade timeout')) - } - - signal.addEventListener('abort', onAbort, { once: true }) - - setMaxListeners(Infinity, signal) - + async upgradeInbound (maConn: MultiaddrConnection, opts: UpgraderOptions = {}): Promise { try { - if ((await this.components.connectionGater.denyInboundConnection?.(maConn)) === true) { - throw new ConnectionInterceptedError('The multiaddr connection is blocked by gater.acceptConnection') - } - - this.components.metrics?.trackMultiaddrConnection(maConn) + const accept = await this.components.connectionManager.acceptIncomingConnection(maConn) - maConn.log('starting the inbound connection upgrade') - - // Protect - let protectedConn = maConn - - if (opts?.skipProtection !== true) { - const protector = this.components.connectionProtector - - if (protector != null) { - maConn.log('protecting the inbound connection') - protectedConn = await protector.protect(maConn) - } + if (!accept) { + throw new ConnectionDeniedError('connection denied') } - try { - // Encrypt the connection - encryptedConn = protectedConn - if (opts?.skipEncryption !== true) { - opts?.onProgress?.(new CustomProgressEvent('upgrader:encrypt-inbound-connection')); - - ({ - conn: encryptedConn, - remotePeer, - protocol: cryptoProtocol - } = await this._encryptInbound(protectedConn)) - - const maConn: MultiaddrConnection = { - ...protectedConn, - ...encryptedConn - } + await this.shouldBlockConnection('denyInboundConnection', maConn) - await this.shouldBlockConnection(remotePeer, maConn, 'denyInboundEncryptedConnection') - } else { - const idStr = maConn.remoteAddr.getPeerId() - - if (idStr == null) { - throw new InvalidMultiaddrError('inbound connection that skipped encryption must have a peer id') - } - - const remotePeerId = peerIdFromString(idStr) - - cryptoProtocol = 'native' - remotePeer = remotePeerId - } - - upgradedConn = encryptedConn - if (opts?.muxerFactory != null) { - muxerFactory = opts.muxerFactory - } else if (this.streamMuxers.size > 0) { - opts?.onProgress?.(new CustomProgressEvent('upgrader:multiplex-inbound-connection')) - - // Multiplex the connection - const multiplexed = await this._multiplexInbound({ - ...protectedConn, - ...encryptedConn - }, this.streamMuxers) - muxerFactory = multiplexed.muxerFactory - upgradedConn = multiplexed.stream - } - } catch (err: any) { - maConn.log.error('failed to upgrade inbound connection', err) - throw err - } - - await this.shouldBlockConnection(remotePeer, maConn, 'denyInboundUpgradedConnection') - - maConn.log('successfully upgraded inbound connection') - - return this._createConnection({ - cryptoProtocol, - direction: 'inbound', - maConn, - upgradedConn, - muxerFactory, - remotePeer, - limits: opts?.limits - }) + return await this._performUpgrade(maConn, 'inbound', opts) } finally { - signal.removeEventListener('abort', onAbort) - this.components.connectionManager.afterUpgradeInbound() } } @@ -255,36 +192,43 @@ export class DefaultUpgrader implements Upgrader { /** * Upgrades an outbound connection */ - async upgradeOutbound (maConn: MultiaddrConnection, opts?: UpgraderOptions): Promise { + async upgradeOutbound (maConn: MultiaddrConnection, opts: UpgraderOptions = {}): Promise { const idStr = maConn.remoteAddr.getPeerId() let remotePeerId: PeerId | undefined if (idStr != null) { remotePeerId = peerIdFromString(idStr) - - await this.shouldBlockConnection(remotePeerId, maConn, 'denyOutboundConnection') + await this.shouldBlockConnection('denyOutboundConnection', remotePeerId, maConn) } + return this._performUpgrade(maConn, 'outbound', opts) + } + + private async _performUpgrade (maConn: MultiaddrConnection, direction: 'inbound' | 'outbound', opts: UpgraderOptions): Promise { let encryptedConn: MultiaddrConnection let remotePeer: PeerId let upgradedConn: MultiaddrConnection + let muxerFactory: StreamMuxerFactory | undefined let cryptoProtocol - let muxerFactory - this.components.metrics?.trackMultiaddrConnection(maConn) + const upgradeTimeoutSignal = AbortSignal.timeout(direction === 'inbound' ? this.inboundUpgradeTimeout : this.outboundUpgradeTimeout) + const signal = anySignal([upgradeTimeoutSignal, opts.signal]) + setMaxListeners(Infinity, upgradeTimeoutSignal, signal) + opts.signal = signal - maConn.log('starting the outbound connection upgrade') + this.components.metrics?.trackMultiaddrConnection(maConn) - // If the transport natively supports encryption, skip connection - // protector and encryption + maConn.log('starting the %s connection upgrade', direction) // Protect let protectedConn = maConn + if (opts?.skipProtection !== true) { const protector = this.components.connectionProtector if (protector != null) { - protectedConn = await protector.protect(maConn) + maConn.log('protecting the %s connection', direction) + protectedConn = await protector.protect(maConn, opts) } } @@ -292,26 +236,38 @@ export class DefaultUpgrader implements Upgrader { // Encrypt the connection encryptedConn = protectedConn if (opts?.skipEncryption !== true) { + opts?.onProgress?.(new CustomProgressEvent(`upgrader:encrypt-${direction}-connection`)); + ({ conn: encryptedConn, remotePeer, protocol: cryptoProtocol - } = await this._encryptOutbound(protectedConn, { - ...opts, - remotePeer: remotePeerId - })) + } = await (direction === 'inbound' + ? this._encryptInbound(protectedConn, { + ...opts, + signal + }) + : this._encryptOutbound(protectedConn, { + ...opts, + signal + }) + )) const maConn: MultiaddrConnection = { ...protectedConn, ...encryptedConn } - await this.shouldBlockConnection(remotePeer, maConn, 'denyOutboundEncryptedConnection') + await this.shouldBlockConnection(direction === 'inbound' ? 'denyInboundEncryptedConnection' : 'denyOutboundEncryptedConnection', remotePeer, maConn) } else { - if (remotePeerId == null) { - throw new InvalidPeerIdError('Encryption was skipped but no peer id was passed') + const idStr = maConn.remoteAddr.getPeerId() + + if (idStr == null) { + throw new InvalidMultiaddrError(`${direction} connection that skipped encryption must have a peer id`) } + const remotePeerId = peerIdFromString(idStr) + cryptoProtocol = 'native' remotePeer = remotePeerId } @@ -320,27 +276,33 @@ export class DefaultUpgrader implements Upgrader { if (opts?.muxerFactory != null) { muxerFactory = opts.muxerFactory } else if (this.streamMuxers.size > 0) { + opts?.onProgress?.(new CustomProgressEvent(`upgrader:multiplex-${direction}-connection`)) + // Multiplex the connection - const multiplexed = await this._multiplexOutbound({ - ...protectedConn, - ...encryptedConn - }, this.streamMuxers) + const multiplexed = await (direction === 'inbound' + ? this._multiplexInbound({ + ...protectedConn, + ...encryptedConn + }, this.streamMuxers, opts) + : this._multiplexOutbound({ + ...protectedConn, + ...encryptedConn + }, this.streamMuxers, opts)) muxerFactory = multiplexed.muxerFactory upgradedConn = multiplexed.stream } } catch (err: any) { - maConn.log.error('failed to upgrade outbound connection', err) - await maConn.close(err) + maConn.log.error('failed to upgrade inbound connection', err) throw err } - await this.shouldBlockConnection(remotePeer, maConn, 'denyOutboundUpgradedConnection') + await this.shouldBlockConnection(direction === 'inbound' ? 'denyInboundUpgradedConnection' : 'denyOutboundUpgradedConnection', remotePeer, maConn) - maConn.log('successfully upgraded outbound connection') + maConn.log('successfully %s inbound connection', direction) return this._createConnection({ cryptoProtocol, - direction: 'outbound', + direction, maConn, upgradedConn, muxerFactory, @@ -380,7 +342,11 @@ export class DefaultUpgrader implements Upgrader { void Promise.resolve() .then(async () => { const protocols = this.components.registrar.getProtocols() + const signal = AbortSignal.timeout(this.inboundStreamProtocolNegotiationTimeout) + setMaxListeners(Infinity, signal) + const { stream, protocol } = await mss.handle(muxedStream, protocols, { + signal, log: muxedStream.log, yieldBytes: false }) @@ -455,7 +421,7 @@ export class DefaultUpgrader implements Upgrader { if (options.signal == null) { muxedStream.log('no abort signal was passed while trying to negotiate protocols %s falling back to default timeout', protocols) - const signal = AbortSignal.timeout(DEFAULT_PROTOCOL_SELECT_TIMEOUT) + const signal = AbortSignal.timeout(this.outboundStreamProtocolNegotiationTimeout) setMaxListeners(Infinity, signal) options = { @@ -632,6 +598,7 @@ export class DefaultUpgrader implements Upgrader { try { const { stream, protocol } = await mss.handle(connection, protocols, { + ...options, log: connection.log }) const encrypter = this.connectionEncrypters.get(protocol) @@ -656,7 +623,7 @@ export class DefaultUpgrader implements Upgrader { * Attempts to encrypt the given `connection` with the provided connection encrypters. * The first `ConnectionEncrypter` module to succeed will be used */ - async _encryptOutbound (connection: MultiaddrConnection, options?: SecureConnectionOptions): Promise { + async _encryptOutbound (connection: MultiaddrConnection, options: SecureConnectionOptions): Promise { const protocols = Array.from(this.connectionEncrypters.keys()) connection.log('selecting outbound crypto protocol', protocols) @@ -667,6 +634,7 @@ export class DefaultUpgrader implements Upgrader { stream, protocol } = await mss.select(connection, protocols, { + ...options, log: connection.log, yieldBytes: true }) @@ -693,7 +661,7 @@ export class DefaultUpgrader implements Upgrader { * Selects one of the given muxers via multistream-select. That * muxer will be used for all future streams on the connection. */ - async _multiplexOutbound (connection: MultiaddrConnection, muxers: Map): Promise<{ stream: MultiaddrConnection, muxerFactory?: StreamMuxerFactory }> { + async _multiplexOutbound (connection: MultiaddrConnection, muxers: Map, options: AbortOptions): Promise<{ stream: MultiaddrConnection, muxerFactory?: StreamMuxerFactory }> { const protocols = Array.from(muxers.keys()) connection.log('outbound selecting muxer %s', protocols) try { @@ -703,6 +671,7 @@ export class DefaultUpgrader implements Upgrader { stream, protocol } = await mss.select(connection, protocols, { + ...options, log: connection.log, yieldBytes: true }) @@ -721,11 +690,12 @@ export class DefaultUpgrader implements Upgrader { * Registers support for one of the given muxers via multistream-select. The * selected muxer will be used for all future streams on the connection. */ - async _multiplexInbound (connection: MultiaddrConnection, muxers: Map): Promise<{ stream: MultiaddrConnection, muxerFactory?: StreamMuxerFactory }> { + async _multiplexInbound (connection: MultiaddrConnection, muxers: Map, options: AbortOptions): Promise<{ stream: MultiaddrConnection, muxerFactory?: StreamMuxerFactory }> { const protocols = Array.from(muxers.keys()) connection.log('inbound handling muxers %s', protocols) try { const { stream, protocol } = await mss.handle(connection, protocols, { + ...options, log: connection.log }) const muxerFactory = muxers.get(protocol) diff --git a/packages/transport-tcp/src/listener.ts b/packages/transport-tcp/src/listener.ts index 39254c198b..3f4808ea1d 100644 --- a/packages/transport-tcp/src/listener.ts +++ b/packages/transport-tcp/src/listener.ts @@ -8,21 +8,9 @@ import { type NetConfig } from './utils.js' import type { TCPCreateListenerOptions } from './index.js' -import type { ComponentLogger, Logger, LoggerOptions, MultiaddrConnection, Connection, CounterGroup, MetricGroup, Metrics, Listener, ListenerEvents, Upgrader } from '@libp2p/interface' +import type { ComponentLogger, Logger, MultiaddrConnection, Connection, CounterGroup, MetricGroup, Metrics, Listener, ListenerEvents, Upgrader } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' -/** - * Attempts to close the given maConn. If a failure occurs, it will be logged - */ -async function attemptClose (maConn: MultiaddrConnection, options: LoggerOptions): Promise { - try { - await maConn.close() - } catch (err: any) { - options.log.error('an error occurred closing the connection', err) - maConn.abort(err) - } -} - export interface CloseServerOnMaxConnectionsOpts { /** * Server listens once connection count is less than `listenBelow` @@ -205,69 +193,51 @@ export class TCPListener extends TypedEventEmitter implements Li this.log('new inbound connection %s', maConn.remoteAddr) - try { - this.context.upgrader.upgradeInbound(maConn) - .then((conn) => { - this.log('inbound connection upgraded %s', maConn.remoteAddr) - this.connections.add(maConn) - - socket.once('close', () => { - this.connections.delete(maConn) - - if ( - this.context.closeServerOnMaxConnections != null && - this.connections.size < this.context.closeServerOnMaxConnections.listenBelow - ) { - // The most likely case of error is if the port taken by this - // application is bound by another process during the time the - // server if closed. In that case there's not much we can do. - // resume() will be called again every time a connection is - // dropped, which acts as an eventual retry mechanism. - // onListenError allows the consumer act on this. - this.resume().catch(e => { - this.log.error('error attempting to listen server once connection count under limit', e) - this.context.closeServerOnMaxConnections?.onListenError?.(e as Error) - }) - } - }) + this.context.upgrader.upgradeInbound(maConn) + .then((conn) => { + this.log('inbound connection upgraded %s', maConn.remoteAddr) + this.connections.add(maConn) - if (this.context.handler != null) { - this.context.handler(conn) - } + socket.once('close', () => { + this.connections.delete(maConn) if ( this.context.closeServerOnMaxConnections != null && - this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove + this.connections.size < this.context.closeServerOnMaxConnections.listenBelow ) { - this.pause(false).catch(e => { - this.log.error('error attempting to close server once connection count over limit', e) + // The most likely case of error is if the port taken by this + // application is bound by another process during the time the + // server if closed. In that case there's not much we can do. + // resume() will be called again every time a connection is + // dropped, which acts as an eventual retry mechanism. + // onListenError allows the consumer act on this. + this.resume().catch(e => { + this.log.error('error attempting to listen server once connection count under limit', e) + this.context.closeServerOnMaxConnections?.onListenError?.(e as Error) }) } - - this.safeDispatchEvent('connection', { detail: conn }) }) - .catch(async err => { - this.log.error('inbound connection failed', err) - this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true }) - await attemptClose(maConn, { - log: this.log + if (this.context.handler != null) { + this.context.handler(conn) + } + + if ( + this.context.closeServerOnMaxConnections != null && + this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove + ) { + this.pause(false).catch(e => { + this.log.error('error attempting to close server once connection count over limit', e) }) - }) - .catch(err => { - this.log.error('closing inbound connection failed', err) - }) - } catch (err) { - this.log.error('inbound connection failed', err) + } - attemptClose(maConn, { - log: this.log + this.safeDispatchEvent('connection', { detail: conn }) + }) + .catch(async err => { + this.log.error('inbound connection upgrade failed', err) + this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true }) + maConn.abort(err) }) - .catch(err => { - this.log.error('closing inbound connection failed', err) - this.metrics?.errors.increment({ [`${this.addr} inbound_closing_failed`]: true }) - }) - } } getAddrs (): Multiaddr[] { diff --git a/packages/transport-tcp/src/tcp.ts b/packages/transport-tcp/src/tcp.ts index 5316940ab7..9b039988f0 100644 --- a/packages/transport-tcp/src/tcp.ts +++ b/packages/transport-tcp/src/tcp.ts @@ -89,28 +89,14 @@ export class TCP implements Transport { logger: this.components.logger }) - const onAbort = (): void => { - maConn.close().catch(err => { - this.log.error('Error closing maConn after abort', err) - }) + try { + this.log('new outbound connection %s', maConn.remoteAddr) + return await options.upgrader.upgradeOutbound(maConn, options) + } catch (err: any) { + this.log.error('error upgrading outbound connection', err) + maConn.abort(err) + throw err } - options.signal?.addEventListener('abort', onAbort, { once: true }) - - this.log('new outbound connection %s', maConn.remoteAddr) - const conn = await options.upgrader.upgradeOutbound(maConn) - this.log('outbound connection %s upgraded', maConn.remoteAddr) - - options.signal?.removeEventListener('abort', onAbort) - - if (options.signal?.aborted === true) { - conn.close().catch(err => { - this.log.error('Error closing conn after abort', err) - }) - - throw new AbortError() - } - - return conn } async _connect (ma: Multiaddr, options: TCPDialOptions): Promise { diff --git a/packages/transport-tcp/test/listen-dial.spec.ts b/packages/transport-tcp/test/listen-dial.spec.ts index b165267463..fe90b4eecd 100644 --- a/packages/transport-tcp/test/listen-dial.spec.ts +++ b/packages/transport-tcp/test/listen-dial.spec.ts @@ -1,6 +1,6 @@ import os from 'os' import path from 'path' -import { TypedEventEmitter } from '@libp2p/interface' +import { AbortError, TypedEventEmitter } from '@libp2p/interface' import { mockRegistrar, mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' import { defaultLogger } from '@libp2p/logger' import { multiaddr } from '@multiformats/multiaddr' @@ -343,12 +343,14 @@ describe('dial', () => { const maConnPromise = pDefer() // @ts-expect-error missing return value - upgrader.upgradeOutbound = async (maConn) => { + upgrader.upgradeOutbound = async (maConn, opts) => { maConnPromise.resolve(maConn) - // take a long time to give us time to abort the dial - await new Promise((resolve) => { - setTimeout(() => { resolve() }, 100) + // abort the upgrade if the signal aborts + await new Promise((resolve, reject) => { + opts?.signal?.addEventListener('abort', () => { + reject(new AbortError()) + }) }) } @@ -360,7 +362,9 @@ describe('dial', () => { const abortController = new AbortController() // abort once the upgrade process has started - void maConnPromise.promise.then(() => { abortController.abort() }) + void maConnPromise.promise.then(() => { + abortController.abort() + }) await expect(transport.dial(ma, { upgrader,