Skip to content

Commit

Permalink
fix: Do not reset peer state on new established connection
Browse files Browse the repository at this point in the history
libp2p may establish multiple connections at the same time
to a peer and we shouldn't reset the state.
  • Loading branch information
oblique committed Jan 25, 2024
1 parent 4b57f45 commit 8932861
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 10 deletions.
34 changes: 25 additions & 9 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::VecDeque;
use std::collections::{hash_map, VecDeque};
use std::fmt;
use std::sync::Arc;
use std::task::{ready, Context, Poll};
Expand Down Expand Up @@ -81,6 +81,7 @@ where

#[derive(Debug)]
struct PeerState<const S: usize> {
established_connections_num: usize,
sending: Arc<Mutex<SendingState>>,
wantlist: WantlistState<S>,
send_full: bool,
Expand Down Expand Up @@ -124,14 +125,19 @@ where
}

pub(crate) fn new_connection_handler(&mut self, peer: PeerId) -> ClientConnectionHandler<S> {
self.peers.insert(
peer,
PeerState {
sending: Arc::new(Mutex::new(SendingState::Ready)),
wantlist: WantlistState::new(),
send_full: true,
},
);
match self.peers.entry(peer) {
hash_map::Entry::Occupied(mut entry) => {
entry.get_mut().established_connections_num += 1;
}
hash_map::Entry::Vacant(entry) => {
entry.insert(PeerState {
established_connections_num: 1,
sending: Arc::new(Mutex::new(SendingState::Ready)),
wantlist: WantlistState::new(),
send_full: true,
});
}
}

ClientConnectionHandler {
protocol: self.protocol.clone(),
Expand All @@ -143,6 +149,16 @@ where
}
}

pub(crate) fn on_connection_closed(&mut self, peer: PeerId) {
if let hash_map::Entry::Occupied(mut entry) = self.peers.entry(peer) {
entry.get_mut().established_connections_num -= 1;

if entry.get_mut().established_connections_num == 0 {
entry.remove();
}
}
}

fn next_query_id(&mut self) -> BitswapQueryId {
let id = BitswapQueryId(self.next_query_id);
self.next_query_id += 1;
Expand Down
10 changes: 9 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use blockstore::{Blockstore, BlockstoreError};
use cid::CidGeneric;
use client::SendingState;
use futures::{stream::SelectAll, StreamExt};
use libp2p::swarm::ConnectionClosed;
use libp2p::{
core::{upgrade::ReadyUpgrade, Endpoint},
swarm::{
Expand Down Expand Up @@ -127,7 +128,14 @@ where
})
}

fn on_swarm_event(&mut self, _event: FromSwarm) {}
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) => {
self.client.on_connection_closed(peer_id);
}
_ => {}
}
}

fn on_connection_handler_event(
&mut self,
Expand Down

0 comments on commit 8932861

Please sign in to comment.