Skip to content

Commit

Permalink
feat: Send full wantlist every 30 seconds
Browse files Browse the repository at this point in the history
This behavior is part boxo/bitswap. It is not in the spec but it
doesn't violate it.
  • Loading branch information
oblique committed Jan 19, 2024
1 parent 95a90d7 commit 10d2ede
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 17 deletions.
92 changes: 92 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ bytes = "1.5.0"
cid = "0.11.0"
fnv = "1.0.7"
futures = "0.3.30"
futures-timer = "3.0.2"
libp2p = "0.53.2"
multihash = "0.19.1"
multihash-codetable = "0.1.1"
Expand All @@ -25,3 +26,6 @@ void = "1.0.2"
hex = "0.4.3"
multihash-codetable = { version = "0.1.1", features = ["digest", "sha2"] }
tokio = { version = "1.35.1", features = ["rt", "macros", "time"] }

[features]
wasm-bindgen = ["futures-timer/wasm-bindgen"]
39 changes: 22 additions & 17 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use futures::future::{AbortHandle, Abortable, BoxFuture};
use futures::stream::FuturesUnordered;
use futures::task::AtomicWaker;
use futures::{FutureExt, SinkExt, StreamExt};
use futures_timer::Delay;
use libp2p::swarm::NotifyHandler;
use libp2p::PeerId;
use libp2p::{
Expand Down Expand Up @@ -75,13 +76,14 @@ where
next_query_id: u64,
waker: Arc<AtomicWaker>,
multihasher: Arc<MultihasherTable<S>>,
send_full_timer: Delay,
}

#[derive(Debug)]
struct PeerState<const S: usize> {
sending: Arc<Mutex<SendingState>>,
wantlist: WantlistState<S>,
last_send_full_tm: Option<Instant>,
send_full: bool,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -117,6 +119,7 @@ where
next_query_id: 0,
waker: Arc::new(AtomicWaker::new()),
multihasher,
send_full_timer: Delay::new(SEND_FULL_INTERVAL),
})
}

Expand All @@ -126,7 +129,7 @@ where
PeerState {
sending: Arc::new(Mutex::new(SendingState::Ready)),
wantlist: WantlistState::new(),
last_send_full_tm: None,
send_full: true,
},
);

Expand Down Expand Up @@ -294,12 +297,7 @@ where
continue;
}
}
SendingState::Ready => match state.last_send_full_tm {
// Send full list if interval time is elapsed.
Some(tm) => tm.elapsed() >= SEND_FULL_INTERVAL,
// Send full list the first time.
None => true,
},
SendingState::Ready => state.send_full,
// State is poisoned, send full list to recover.
SendingState::Poisoned => true,
};
Expand All @@ -310,18 +308,15 @@ where
state.wantlist.generate_proto_update(&self.wantlist)
};

if wantlist.entries.is_empty() {
// Nothing to send
//
// TODO: What if the send_full is true? Shouldn't we send it to clear
// the wantlist? However we should do it once.
// Allow empty entries to be sent when send_full flag is set.
if send_full {
// Reset flag
state.send_full = false;
} else if wantlist.entries.is_empty() {
// No updates to be sent for this peer
continue;
}

if wantlist.full {
state.last_send_full_tm = Some(Instant::now());
}

self.queue.push_back(ToSwarm::NotifyHandler {
peer_id: peer.to_owned(),
handler: NotifyHandler::Any,
Expand All @@ -345,6 +340,16 @@ where
return Poll::Ready(ev);
}

if self.send_full_timer.poll_unpin(cx).is_ready() {
for state in self.peers.values_mut() {
state.send_full = true;
}

// Reset timer and loop again to get it registered
self.send_full_timer.reset(SEND_FULL_INTERVAL);
continue;
}

if let Poll::Ready(Some(task_result)) = self.tasks.poll_next_unpin(cx) {
match task_result {
// Blockstore already has the data so return them to the user
Expand Down

0 comments on commit 10d2ede

Please sign in to comment.