Skip to content

Commit

Permalink
Bind DHT model to a single store
Browse files Browse the repository at this point in the history
  • Loading branch information
ThetaSinner committed Jan 2, 2025
1 parent b4fa680 commit 01f402c
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 83 deletions.
50 changes: 26 additions & 24 deletions crates/dht/src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod tests;
/// itself to determine if they are in sync and which regions to sync if they are not.
pub struct Dht {
partition: PartitionedHashes,
store: DynOpStore,
}

/// The next action to take after comparing two DHT snapshots.
Expand Down Expand Up @@ -67,9 +68,10 @@ impl Dht {
partition: PartitionedHashes::try_from_store(
14,
current_time,
store,
store.clone(),
)
.await?,
store,
})
}

Expand All @@ -86,12 +88,10 @@ impl Dht {
/// `current_time`.
///
/// See also [PartitionedHashes::update] and [PartitionedTime::update](crate::time::PartitionedTime::update).
pub async fn update(
&mut self,
current_time: Timestamp,
store: DynOpStore,
) -> K2Result<()> {
self.partition.update(store, current_time).await
pub async fn update(&mut self, current_time: Timestamp) -> K2Result<()> {
self.partition
.update(self.store.clone(), current_time)
.await
}

/// Inform the DHT model that some ops have been stored.
Expand All @@ -102,10 +102,11 @@ impl Dht {
/// See also [PartitionedHashes::inform_ops_stored] for more details.
pub async fn inform_ops_stored(
&mut self,
store: DynOpStore,
stored_ops: Vec<StoredOp>,
) -> K2Result<()> {
self.partition.inform_ops_stored(store, stored_ops).await
self.partition
.inform_ops_stored(self.store.clone(), stored_ops)
.await
}

/// Get a minimal snapshot of the DHT model.
Expand All @@ -123,14 +124,15 @@ impl Dht {
pub async fn snapshot_minimal(
&self,
arc_set: &ArcSet,
store: DynOpStore,
) -> K2Result<DhtSnapshot> {
if arc_set.covered_sector_count() == 0 {
return Err(K2Error::other("No arcs to snapshot"));
}

let (disc_top_hash, disc_boundary) =
self.partition.disc_top_hash(arc_set, store).await?;
let (disc_top_hash, disc_boundary) = self
.partition
.disc_top_hash(arc_set, self.store.clone())
.await?;

Ok(DhtSnapshot::Minimal {
disc_top_hash,
Expand Down Expand Up @@ -189,7 +191,6 @@ impl Dht {
their_snapshot: &DhtSnapshot,
our_previous_snapshot: Option<DhtSnapshot>,
arc_set: &ArcSet,
store: DynOpStore,
) -> K2Result<DhtSnapshotNextAction> {
if arc_set.covered_sector_count() == 0 {
return Err(K2Error::other("No arcs to snapshot"));
Expand All @@ -208,10 +209,10 @@ impl Dht {
// already computed snapshot.
let our_snapshot = match &their_snapshot {
DhtSnapshot::Minimal { .. } => {
self.snapshot_minimal(arc_set, store.clone()).await?
self.snapshot_minimal(arc_set).await?
}
DhtSnapshot::DiscSectors { .. } => {
self.snapshot_disc_sectors(arc_set, store.clone()).await?
self.snapshot_disc_sectors(arc_set).await?
}
DhtSnapshot::DiscSectorDetails {
disc_sector_hashes, ..
Expand All @@ -223,7 +224,7 @@ impl Dht {
.snapshot_disc_sector_details(
disc_sector_hashes.keys().cloned().collect(),
arc_set,
store.clone(),
self.store.clone(),
)
.await?;

Expand All @@ -241,7 +242,7 @@ impl Dht {
self.snapshot_disc_sector_details(
disc_sector_hashes.keys().cloned().collect(),
arc_set,
store.clone(),
self.store.clone(),
)
.await?
}
Expand Down Expand Up @@ -287,15 +288,15 @@ impl Dht {
}
SnapshotDiff::DiscMismatch => {
Ok(DhtSnapshotNextAction::NewSnapshot(
self.snapshot_disc_sectors(arc_set, store).await?,
self.snapshot_disc_sectors(arc_set).await?,
))
}
SnapshotDiff::DiscSectorMismatches(mismatched_sectors) => {
Ok(DhtSnapshotNextAction::NewSnapshot(
self.snapshot_disc_sector_details(
mismatched_sectors,
arc_set,
store,
self.store.clone(),
)
.await?,
))
Expand Down Expand Up @@ -328,7 +329,7 @@ impl Dht {
};

out.extend(
store
self.store
.retrieve_op_hashes_in_time_slice(
arc, start, end,
)
Expand Down Expand Up @@ -382,7 +383,7 @@ impl Dht {
};

out.extend(
store
self.store
.retrieve_op_hashes_in_time_slice(
arc, start, end,
)
Expand All @@ -406,10 +407,11 @@ impl Dht {
async fn snapshot_disc_sectors(
&self,
arc_set: &ArcSet,
store: DynOpStore,
) -> K2Result<DhtSnapshot> {
let (disc_sector_top_hashes, disc_boundary) =
self.partition.disc_sector_hashes(arc_set, store).await?;
let (disc_sector_top_hashes, disc_boundary) = self
.partition
.disc_sector_hashes(arc_set, self.store.clone())
.await?;

Ok(DhtSnapshot::DiscSectors {
disc_sector_top_hashes,
Expand Down
9 changes: 2 additions & 7 deletions crates/dht/src/dht/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn take_minimal_snapshot() {

let arc_set = ArcSet::new(SECTOR_SIZE, vec![DhtArc::FULL]).unwrap();

let snapshot = dht.snapshot_minimal(&arc_set, store.clone()).await.unwrap();
let snapshot = dht.snapshot_minimal(&arc_set).await.unwrap();
match snapshot {
DhtSnapshot::Minimal {
disc_boundary,
Expand All @@ -58,10 +58,7 @@ async fn cannot_take_minimal_snapshot_with_empty_arc_set() {

let err = dht1
.dht
.snapshot_minimal(
&ArcSet::new(SECTOR_SIZE, vec![dht1.arc]).unwrap(),
dht1.store.clone(),
)
.snapshot_minimal(&ArcSet::new(SECTOR_SIZE, vec![dht1.arc]).unwrap())
.await
.unwrap_err();
assert_eq!("No arcs to snapshot (src: None)", err.to_string());
Expand All @@ -77,7 +74,6 @@ async fn cannot_handle_snapshot_with_empty_arc_set() {
.dht
.snapshot_minimal(
&ArcSet::new(SECTOR_SIZE, vec![DhtArc::FULL]).unwrap(),
dht1.store.clone(),
)
.await
.unwrap();
Expand All @@ -89,7 +85,6 @@ async fn cannot_handle_snapshot_with_empty_arc_set() {
&snapshot,
None,
&ArcSet::new(SECTOR_SIZE, vec![DhtArc::Empty]).unwrap(),
dht1.store.clone(),
)
.await
.unwrap_err();
Expand Down
63 changes: 11 additions & 52 deletions crates/dht/src/dht/tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ impl DhtSyncHarness {

self.dht
.inform_ops_stored(
self.store.clone(),
op_list.into_iter().map(|op| op.into()).collect(),
)
.await?;
Expand Down Expand Up @@ -112,18 +111,10 @@ impl DhtSyncHarness {
let arc_set_1 = ArcSet::new(SECTOR_SIZE, vec![self.arc])?;
let arc_set_2 = ArcSet::new(SECTOR_SIZE, vec![other.arc])?;
let arc_set = arc_set_1.intersection(&arc_set_2);
let initial_snapshot = self
.dht
.snapshot_minimal(&arc_set, self.store.clone())
.await?;
let initial_snapshot = self.dht.snapshot_minimal(&arc_set).await?;
match other
.dht
.handle_snapshot(
&initial_snapshot,
None,
&arc_set,
other.store.clone(),
)
.handle_snapshot(&initial_snapshot, None, &arc_set)
.await?
{
DhtSnapshotNextAction::Identical => Ok(true),
Expand All @@ -140,20 +131,12 @@ impl DhtSyncHarness {
let arc_set = arc_set_1.intersection(&arc_set_2);

// Create the initial snapshot locally
let initial_snapshot = self
.dht
.snapshot_minimal(&arc_set, self.store.clone())
.await?;
let initial_snapshot = self.dht.snapshot_minimal(&arc_set).await?;

// Send it to the other agent and have them diff against it
let outcome = other
.dht
.handle_snapshot(
&initial_snapshot,
None,
&arc_set,
other.store.clone(),
)
.handle_snapshot(&initial_snapshot, None, &arc_set)
.await?;

match outcome {
Expand Down Expand Up @@ -209,10 +192,8 @@ impl DhtSyncHarness {

// We expect the sync to have been initiated by self, so the disc snapshot should be
// coming back to us
let outcome = self
.dht
.handle_snapshot(&snapshot, None, arc_set, self.store.clone())
.await?;
let outcome =
self.dht.handle_snapshot(&snapshot, None, arc_set).await?;

let our_details_snapshot = match outcome {
DhtSnapshotNextAction::NewSnapshot(new_snapshot) => new_snapshot,
Expand Down Expand Up @@ -240,12 +221,7 @@ impl DhtSyncHarness {
// Now we need to ask the other agent to diff against this details snapshot
let outcome = other
.dht
.handle_snapshot(
&our_details_snapshot,
None,
arc_set,
other.store.clone(),
)
.handle_snapshot(&our_details_snapshot, None, arc_set)
.await?;

let (snapshot, hash_list_from_other) = match outcome {
Expand Down Expand Up @@ -276,12 +252,7 @@ impl DhtSyncHarness {
// back our ops
let outcome = self
.dht
.handle_snapshot(
&snapshot,
Some(our_details_snapshot),
arc_set,
self.store.clone(),
)
.handle_snapshot(&snapshot, Some(our_details_snapshot), arc_set)
.await?;

let hash_list_from_self = match outcome {
Expand Down Expand Up @@ -333,12 +304,7 @@ impl DhtSyncHarness {
// have been sent to us
let outcome = self
.dht
.handle_snapshot(
&other_details_snapshot,
None,
arc_set,
self.store.clone(),
)
.handle_snapshot(&other_details_snapshot, None, arc_set)
.await?;

let (snapshot, hash_list_from_self) = match outcome {
Expand Down Expand Up @@ -367,12 +333,7 @@ impl DhtSyncHarness {
// produce a hash list for us
let outcome = other
.dht
.handle_snapshot(
&snapshot,
Some(other_details_snapshot),
arc_set,
other.store.clone(),
)
.handle_snapshot(&snapshot, Some(other_details_snapshot), arc_set)
.await?;

let hash_list_from_other = match outcome {
Expand Down Expand Up @@ -421,9 +382,7 @@ async fn transfer_ops(
.into_iter()
.map(|op| Kitsune2MemoryOp::try_from(op).unwrap().into())
.collect::<Vec<StoredOp>>();
target_dht
.inform_ops_stored(target.clone(), stored_ops)
.await?;
target_dht.inform_ops_stored(stored_ops).await?;

Ok(())
}

0 comments on commit 01f402c

Please sign in to comment.