From 21cc5f372d216deb4f2bfa617d3d1506b374b31d Mon Sep 17 00:00:00 2001
From: ThetaSinner
Date: Wed, 18 Dec 2024 21:01:40 +0000
Subject: [PATCH] Add docs and work on naming

---
 crates/api/src/op_store.rs                     |  3 +-
 crates/dht/src/dht.rs                          | 11 ++--
 crates/dht/src/hash.rs                         | 30 ++++++++---
 crates/dht/src/time.rs                         | 52 +++++++++++++------
 crates/memory/src/op_store.rs                  |  2 +-
 .../src/op_store/time_slice_hash_store.rs      |  2 +-
 6 files changed, 64 insertions(+), 36 deletions(-)

diff --git a/crates/api/src/op_store.rs b/crates/api/src/op_store.rs
index 112cb9e6..d2bd33eb 100644
--- a/crates/api/src/op_store.rs
+++ b/crates/api/src/op_store.rs
@@ -3,7 +3,6 @@
 use crate::{DhtArc, K2Result, OpId, Timestamp};
 use futures::future::BoxFuture;
 use std::cmp::Ordering;
-use std::collections::HashMap;
 use std::sync::Arc;
 
 /// An op with metadata.
@@ -105,7 +104,7 @@ pub trait OpStore: 'static + Send + Sync + std::fmt::Debug {
     fn retrieve_slice_hashes(
         &self,
         arc: DhtArc,
-    ) -> BoxFuture<'_, K2Result<HashMap<u64, bytes::Bytes>>>;
+    ) -> BoxFuture<'_, K2Result<Vec<(u64, bytes::Bytes)>>>;
 }
 
 /// Trait-object version of kitsune2 op store.
diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs
index 3b0170f8..d142d50a 100644
--- a/crates/dht/src/dht.rs
+++ b/crates/dht/src/dht.rs
@@ -61,15 +61,13 @@
         arc_set: &ArcSet,
         store: DynOpStore,
     ) -> K2Result<DhtSnapshot> {
-        let (disc_top_hash, disc_boundary) = self
-            .partition
-            .full_time_slice_disc_top_hash(arc_set, store)
-            .await?;
+        let (disc_top_hash, disc_boundary) =
+            self.partition.disc_top_hash(arc_set, store).await?;
 
         Ok(DhtSnapshot::Minimal {
             disc_top_hash,
             disc_boundary,
-            ring_top_hashes: self.partition.partial_time_rings(arc_set),
+            ring_top_hashes: self.partition.ring_top_hashes(arc_set),
         })
     }
 
@@ -137,9 +135,6 @@
                 continue;
             };
 
-            // TODO handle an empty list of missing slices here?
-            // if that's empty then we actually need to send the whole sector?
-
             for missing_slice in missing_slices {
                 let Ok((start, end)) = self
                     .partition
diff --git a/crates/dht/src/hash.rs b/crates/dht/src/hash.rs
index b52f4296..8c3bc43c 100644
--- a/crates/dht/src/hash.rs
+++ b/crates/dht/src/hash.rs
@@ -223,7 +223,19 @@ impl PartitionedHashes {
 
 // Query implementation
 impl PartitionedHashes {
-    pub async fn full_time_slice_disc_top_hash(
+    /// Compute the disc top hash for the given arc set.
+    ///
+    /// Consider the hash space as a circle, with time represented outwards from the center in
+    /// each sector. This function requests the top hash of each sector, over full time slices,
+    /// and then combines them into a single hash. It works around the circle from 0 and skips
+    /// any sectors that are not included in the arc set.
+    ///
+    /// If there are no sectors included in the arc set, then an empty hash is returned.
+    ///
+    /// Along with the disc top hash, the end timestamp of the last full time slice is returned.
+    /// This should be used when comparing the disc top hash of one DHT model with that of
+    /// another node, to ensure that both nodes are using a common reference point.
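+    ///
+    /// As a rough sketch of the combining step (illustrative only; `sectors` and
+    /// `top_hash` are hypothetical stand-ins for the per-sector storage and hash
+    /// lookup used in the body below):
+    ///
+    /// ```ignore
+    /// let mut combined = bytes::BytesMut::new();
+    /// for (sector_id, sector) in sectors.iter().enumerate() {
+    ///     // Work around the circle from sector 0, skipping excluded sectors.
+    ///     if !arc_set.includes_sector_id(sector_id as u32) {
+    ///         continue;
+    ///     }
+    ///     combine_hashes(&mut combined, sector.top_hash());
+    /// }
+    /// // An empty arc set yields an empty combined hash.
+    /// let disc_top_hash = combined.freeze();
+    /// ```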
+    pub async fn disc_top_hash(
         &self,
         arc_set: &ArcSet,
         store: DynOpStore,
     ) -> K2Result<(bytes::Bytes, Timestamp)> {
@@ -234,8 +246,7 @@
             if !arc_set.includes_sector_id(sector_id as u32) {
                 continue;
             }
 
-            let hash =
-                sector.combined_full_time_slice_hash(store.clone()).await?;
+            let hash = sector.full_time_slice_top_hash(store.clone()).await?;
             if !hash.is_empty() {
                 combine_hashes(&mut combined, hash);
             }
@@ -246,7 +257,7 @@
         Ok((combined.freeze(), timestamp))
     }
 
-    pub fn partial_time_rings(&self, arc_set: &ArcSet) -> Vec<bytes::Bytes> {
+    pub fn ring_top_hashes(&self, arc_set: &ArcSet) -> Vec<bytes::Bytes> {
         let mut partials = Vec::with_capacity(arc_set.covered_sector_count());
 
         for (sector_id, sector) in self.partitioned_hashes.iter().enumerate() {
@@ -254,7 +265,7 @@
                 continue;
             }
 
-            partials.push(sector.combined_partial_slice_hashes().peekable());
+            partials.push(sector.partial_slice_combined_hashes().peekable());
         }
 
         let mut out = Vec::new();
@@ -285,8 +296,7 @@
                 continue;
             }
 
-            let hash =
-                sector.combined_full_time_slice_hash(store.clone()).await?;
+            let hash = sector.full_time_slice_top_hash(store.clone()).await?;
             if !hash.is_empty() {
                 out.insert(sector_id as u32, hash);
             }
@@ -318,7 +328,11 @@
 
             out.insert(
                 sector_id as u32,
-                sector.full_time_slice_hashes(store.clone()).await?,
+                sector
+                    .full_time_slice_hashes(store.clone())
+                    .await?
+                    .into_iter()
+                    .collect(),
             );
         }
diff --git a/crates/dht/src/time.rs b/crates/dht/src/time.rs
index 18eb426e..ff2dd061 100644
--- a/crates/dht/src/time.rs
+++ b/crates/dht/src/time.rs
@@ -58,9 +58,9 @@ use crate::constant::UNIT_TIME;
 use kitsune2_api::{
     DhtArc, DynOpStore, K2Error, K2Result, StoredOp, Timestamp, UNIX_TIMESTAMP,
 };
-use std::collections::HashMap;
 use std::time::Duration;
 
+/// The partitioned time structure.
 #[derive(Debug)]
 #[cfg_attr(test, derive(Clone, PartialEq))]
 pub struct PartitionedTime {
@@ -102,6 +102,10 @@ pub struct PartitionedTime {
     arc_constraint: DhtArc,
 }
 
+/// A slice of recent time that has a combined hash of all the ops in that time slice.
+///
+/// This is used to represent a slice of recent time that is not yet ready to be stored as a full
+/// time slice.
 #[derive(Debug)]
 #[cfg_attr(test, derive(Clone, PartialEq))]
 pub struct PartialSlice {
@@ -350,15 +354,29 @@ impl PartitionedTime {
 
 // Public, query methods
 impl PartitionedTime {
-    pub async fn combined_full_time_slice_hash(
+    /// Compute a top hash over the full time slice combined hashes owned by this [PartitionedTime].
+    ///
+    /// This method will fetch the hashes of all the full time slices within the arc constraint
+    /// for this [PartitionedTime]. Those are expected to be ordered by slice id, which implies
+    /// that they are ordered by time. It will then combine those hashes into a single hash.
+    pub async fn full_time_slice_top_hash(
         &self,
         store: DynOpStore,
     ) -> K2Result<bytes::Bytes> {
         let hashes = store.retrieve_slice_hashes(self.arc_constraint).await?;
-        Ok(combine::combine_op_hashes(hashes.values().cloned()).freeze())
+        Ok(
+            combine::combine_op_hashes(
+                hashes.into_iter().map(|(_, hash)| hash),
+            )
+            .freeze(),
+        )
     }
 
-    pub fn combined_partial_slice_hashes(
+    /// Get the combined hashes of the partial slices owned by this [PartitionedTime].
+    ///
+    /// This method takes the current partial slices and returns their pre-computed hashes.
+    /// These are combined hashes over all the ops in each partial slice, ordered by time.
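+    ///
+    /// For example (illustrative only; `pt` is assumed to be a populated
+    /// [PartitionedTime]):
+    ///
+    /// ```ignore
+    /// // One pre-computed combined hash per partial slice, oldest first.
+    /// let ring_hashes: Vec<bytes::Bytes> =
+    ///     pt.partial_slice_combined_hashes().collect();
+    /// ```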
+    pub fn partial_slice_combined_hashes(
         &self,
     ) -> impl Iterator<Item = bytes::Bytes> + use<'_> {
         self.partial_slices
@@ -366,24 +384,26 @@
             .iter()
             .map(|partial| partial.hash.clone().freeze())
     }
 
-    pub async fn full_time_slice_hash(
-        &self,
-        slice_id: u64,
-        store: DynOpStore,
-    ) -> K2Result<bytes::Bytes> {
-        Ok(store
-            .retrieve_slice_hash(self.arc_constraint, slice_id)
-            .await?
-            .unwrap_or_else(bytes::Bytes::new))
-    }
-
+    /// Get the combined hashes of all the full time slices owned by this [PartitionedTime].
+    ///
+    /// This is a pass-through to the provided store, using the arc constraint of this
+    /// [PartitionedTime].
     pub async fn full_time_slice_hashes(
         &self,
         store: DynOpStore,
-    ) -> K2Result<HashMap<u64, bytes::Bytes>> {
+    ) -> K2Result<Vec<(u64, bytes::Bytes)>> {
         store.retrieve_slice_hashes(self.arc_constraint).await
     }
 
+    /// Get the combined hash of a partial slice by its slice id.
+    ///
+    /// Note that the number of partial slices changes over time and the start point of the
+    /// partial slices moves. It is important that the slice id is only used to refer to a
+    /// specific slice at a specific point in time. This can be achieved by not calling
+    /// [PartitionedTime::update] while expecting the slice id to refer to the same slice.
+    ///
+    /// # Errors
+    ///
+    /// This method will return an error if the requested slice id is beyond the current
+    /// partial slices.
     pub fn partial_slice_hash(&self, slice_id: u32) -> K2Result<bytes::Bytes> {
         let slice_id = slice_id as usize;
         if slice_id > self.partial_slices.len() {
diff --git a/crates/memory/src/op_store.rs b/crates/memory/src/op_store.rs
index 4175529c..5f077793 100644
--- a/crates/memory/src/op_store.rs
+++ b/crates/memory/src/op_store.rs
@@ -217,7 +217,7 @@ impl OpStore for Kitsune2MemoryOpStore {
     fn retrieve_slice_hashes(
         &self,
         arc: DhtArc,
-    ) -> BoxFuture<'_, K2Result<HashMap<u64, bytes::Bytes>>> {
+    ) -> BoxFuture<'_, K2Result<Vec<(u64, bytes::Bytes)>>> {
         async move {
             let self_lock = self.read().await;
             Ok(self_lock.time_slice_hashes.get_all(&arc))
diff --git a/crates/memory/src/op_store/time_slice_hash_store.rs b/crates/memory/src/op_store/time_slice_hash_store.rs
index b74b2f55..f8d966c4 100644
--- a/crates/memory/src/op_store/time_slice_hash_store.rs
+++ b/crates/memory/src/op_store/time_slice_hash_store.rs
@@ -47,7 +47,7 @@ impl TimeSliceHashStore {
             .cloned()
     }
 
-    pub(super) fn get_all(&self, arc: &DhtArc) -> HashMap<u64, bytes::Bytes> {
+    pub(super) fn get_all(&self, arc: &DhtArc) -> Vec<(u64, bytes::Bytes)> {
         self.inner
             .get(arc)
             .map(|by_arc| {