From 89328614579a7270c09729dcfc20e3470529eaa3 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 25 Jan 2024 16:38:12 +0200 Subject: [PATCH] fix: Do not reset peer state on new established connection libp2p may establish multiple connections at the same time to a peer and we shouldn't reset the state. --- src/client.rs | 34 +++++++++++++++++++++++++--------- src/lib.rs | 10 +++++++++- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/client.rs b/src/client.rs index 1bb4770..43d2d39 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::collections::{hash_map, VecDeque}; use std::fmt; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -81,6 +81,7 @@ where #[derive(Debug)] struct PeerState { + established_connections_num: usize, sending: Arc>, wantlist: WantlistState, send_full: bool, @@ -124,14 +125,19 @@ where } pub(crate) fn new_connection_handler(&mut self, peer: PeerId) -> ClientConnectionHandler { - self.peers.insert( - peer, - PeerState { - sending: Arc::new(Mutex::new(SendingState::Ready)), - wantlist: WantlistState::new(), - send_full: true, - }, - ); + match self.peers.entry(peer) { + hash_map::Entry::Occupied(mut entry) => { + entry.get_mut().established_connections_num += 1; + } + hash_map::Entry::Vacant(entry) => { + entry.insert(PeerState { + established_connections_num: 1, + sending: Arc::new(Mutex::new(SendingState::Ready)), + wantlist: WantlistState::new(), + send_full: true, + }); + } + } ClientConnectionHandler { protocol: self.protocol.clone(), @@ -143,6 +149,16 @@ where } } + pub(crate) fn on_connection_closed(&mut self, peer: PeerId) { + if let hash_map::Entry::Occupied(mut entry) = self.peers.entry(peer) { + entry.get_mut().established_connections_num -= 1; + + if entry.get_mut().established_connections_num == 0 { + entry.remove(); + } + } + } + fn next_query_id(&mut self) -> BitswapQueryId { let id = BitswapQueryId(self.next_query_id); self.next_query_id += 1; diff --git a/src/lib.rs b/src/lib.rs index f5aeb9b..88d3784 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ use blockstore::{Blockstore, BlockstoreError}; use cid::CidGeneric; use client::SendingState; use futures::{stream::SelectAll, StreamExt}; +use libp2p::swarm::ConnectionClosed; use libp2p::{ core::{upgrade::ReadyUpgrade, Endpoint}, swarm::{ @@ -127,7 +128,14 @@ where }) } - fn on_swarm_event(&mut self, _event: FromSwarm) {} + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) => { + self.client.on_connection_closed(peer_id); + } + _ => {} + } + } fn on_connection_handler_event( &mut self,