Add docs and work on naming
ThetaSinner committed Dec 18, 2024
1 parent 93cbc34 commit 21cc5f3
Showing 6 changed files with 64 additions and 36 deletions.
3 changes: 1 addition & 2 deletions crates/api/src/op_store.rs
@@ -3,7 +3,6 @@
use crate::{DhtArc, K2Result, OpId, Timestamp};
use futures::future::BoxFuture;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;

/// An op with metadata.
@@ -105,7 +104,7 @@ pub trait OpStore: 'static + Send + Sync + std::fmt::Debug {
fn retrieve_slice_hashes(
&self,
arc: DhtArc,
) -> BoxFuture<'_, K2Result<HashMap<u64, bytes::Bytes>>>;
) -> BoxFuture<'_, K2Result<Vec<(u64, bytes::Bytes)>>>;
}

/// Trait-object version of kitsune2 op store.
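The signature change above (returning `Vec<(u64, bytes::Bytes)>` instead of `HashMap<u64, bytes::Bytes>`) pairs with the new docs in `crates/dht/src/time.rs` below, which expect the slice hashes to arrive ordered by slice id before being combined into a top hash. A `HashMap` iterates in an unspecified order, so a `Vec` of `(slice_id, hash)` pairs is the safer shape. A minimal, self-contained sketch of why order matters, with a toy order-sensitive combinator standing in for the real `combine_op_hashes`:

```rust
/// Toy, order-sensitive stand-in for the real hash combinator: it simply
/// concatenates the input hashes, so the result depends on iteration order.
fn combine(hashes: impl IntoIterator<Item = Vec<u8>>) -> Vec<u8> {
    let mut out = Vec::new();
    for h in hashes {
        out.extend_from_slice(&h);
    }
    out
}

fn main() {
    // Slice hashes keyed by slice id, in whatever order the store produced them.
    let mut slice_hashes = vec![
        (2u64, b"hash-2".to_vec()),
        (0u64, b"hash-0".to_vec()),
        (1u64, b"hash-1".to_vec()),
    ];

    // A Vec lets the caller guarantee the order before combining; a HashMap's
    // iteration order is unspecified, so two nodes could disagree on the result.
    slice_hashes.sort_by_key(|(slice_id, _)| *slice_id);
    let top_hash = combine(slice_hashes.into_iter().map(|(_, hash)| hash));
    assert_eq!(top_hash, b"hash-0hash-1hash-2".to_vec());
}
```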
11 changes: 3 additions & 8 deletions crates/dht/src/dht.rs
@@ -61,15 +61,13 @@ impl Dht {
arc_set: &ArcSet,
store: DynOpStore,
) -> K2Result<DhtSnapshot> {
let (disc_top_hash, disc_boundary) = self
.partition
.full_time_slice_disc_top_hash(arc_set, store)
.await?;
let (disc_top_hash, disc_boundary) =
self.partition.disc_top_hash(arc_set, store).await?;

Ok(DhtSnapshot::Minimal {
disc_top_hash,
disc_boundary,
ring_top_hashes: self.partition.partial_time_rings(arc_set),
ring_top_hashes: self.partition.ring_top_hashes(arc_set),
})
}

@@ -137,9 +135,6 @@ impl Dht {
continue;
};

// TODO handle an empty list of missing slices here?
// if that's empty then we actually need to send the whole sector?

for missing_slice in missing_slices {
let Ok((start, end)) = self
.partition
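The first hunk above builds a `DhtSnapshot::Minimal` from the renamed `disc_top_hash` and `ring_top_hashes` calls. The snapshot type and its comparison rules are not part of this commit, so the following is only a rough, hypothetical sketch of how such a snapshot might be compared between two nodes (the `MinimalSnapshot` type and the comparison rules are assumptions, not the kitsune2 API); the `disc_boundary` plays the role of the common reference point mentioned in the new `disc_top_hash` docs:

```rust
/// Illustrative stand-in for the `DhtSnapshot::Minimal` variant built above.
#[derive(Debug, Clone, PartialEq)]
struct MinimalSnapshot {
    disc_top_hash: Vec<u8>,
    disc_boundary: i64, // end timestamp of the last full time slice
    ring_top_hashes: Vec<Vec<u8>>,
}

/// A plausible comparison: refuse to compare snapshots taken against different
/// time boundaries, otherwise report whether the disc or any ring disagrees.
fn compare(ours: &MinimalSnapshot, theirs: &MinimalSnapshot) -> Result<bool, String> {
    if ours.disc_boundary != theirs.disc_boundary {
        return Err("snapshots use different reference points, cannot compare".into());
    }
    Ok(ours.disc_top_hash == theirs.disc_top_hash
        && ours.ring_top_hashes == theirs.ring_top_hashes)
}

fn main() {
    let a = MinimalSnapshot {
        disc_top_hash: vec![1, 2, 3],
        disc_boundary: 1_700_000_000,
        ring_top_hashes: vec![vec![9], vec![8]],
    };
    let b = MinimalSnapshot { disc_top_hash: vec![1, 2, 3], ..a.clone() };
    assert_eq!(compare(&a, &b), Ok(true));
}
```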
30 changes: 22 additions & 8 deletions crates/dht/src/hash.rs
@@ -223,7 +223,19 @@ impl PartitionedHashes {

// Query implementation
impl PartitionedHashes {
pub async fn full_time_slice_disc_top_hash(
/// Compute the disc top hash for the given arc set.
///
/// Considering the hash space as a circle, with time represented outwards from the center in
/// each sector, this function requests the top hash of each sector over full time slices and
/// then combines them into a single hash. It works around the circle from sector 0 and skips
/// any sectors that are not included in the arc set.
///
/// If there are no sectors included in the arc set, then an empty hash is returned.
///
/// Along with the disc top hash, the end timestamp of the last full time slice is returned.
/// This should be used when comparing the disc top hash of one DHT model with that of another
/// node, to ensure that both nodes are using a common reference point.
pub async fn disc_top_hash(
&self,
arc_set: &ArcSet,
store: DynOpStore,
@@ -234,8 +246,7 @@ impl PartitionedHashes {
continue;
}

let hash =
sector.combined_full_time_slice_hash(store.clone()).await?;
let hash = sector.full_time_slice_top_hash(store.clone()).await?;
if !hash.is_empty() {
combine_hashes(&mut combined, hash);
}
@@ -246,15 +257,15 @@ impl PartitionedHashes {
Ok((combined.freeze(), timestamp))
}
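Read alongside the new doc comment, `disc_top_hash` walks the sectors in order from sector 0, skips any sector outside the arc set, and folds each sector's full-time-slice top hash into one running hash, yielding an empty hash when no sectors are covered. A self-contained sketch of that shape, using simplified stand-ins for the sector and arc-set types and a toy `combine_hashes` (the real method also returns the end timestamp of the last full time slice):

```rust
use std::collections::HashSet;

/// Stand-in for one sector's pre-computed top hash over its full time slices.
/// An empty Vec models a sector with no ops.
type SectorTopHash = Vec<u8>;

/// Toy, order-sensitive stand-in for `combine_hashes`.
fn combine_hashes(combined: &mut Vec<u8>, hash: &[u8]) {
    combined.extend_from_slice(hash);
}

/// Walk the circle from sector 0, skip sectors outside the arc set, and fold
/// the non-empty sector hashes into a single disc top hash.
fn disc_top_hash(sectors: &[SectorTopHash], arc_set: &HashSet<u32>) -> Vec<u8> {
    let mut combined = Vec::new();
    for (sector_id, sector_hash) in sectors.iter().enumerate() {
        if !arc_set.contains(&(sector_id as u32)) {
            continue;
        }
        if !sector_hash.is_empty() {
            combine_hashes(&mut combined, sector_hash);
        }
    }
    // If no sectors are covered, the result is an empty hash, matching the docs.
    combined
}

fn main() {
    let sectors = vec![b"s0".to_vec(), Vec::new(), b"s2".to_vec(), b"s3".to_vec()];
    let arc_set: HashSet<u32> = [0, 1, 2].into_iter().collect();
    assert_eq!(disc_top_hash(&sectors, &arc_set), b"s0s2".to_vec());
}
```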

pub fn partial_time_rings(&self, arc_set: &ArcSet) -> Vec<bytes::Bytes> {
pub fn ring_top_hashes(&self, arc_set: &ArcSet) -> Vec<bytes::Bytes> {
let mut partials = Vec::with_capacity(arc_set.covered_sector_count());

for (sector_id, sector) in self.partitioned_hashes.iter().enumerate() {
if !arc_set.includes_sector_id(sector_id as u32) {
continue;
}

partials.push(sector.combined_partial_slice_hashes().peekable());
partials.push(sector.partial_slice_combined_hashes().peekable());
}

let mut out = Vec::new();
@@ -285,8 +296,7 @@ impl PartitionedHashes {
continue;
}

let hash =
sector.combined_full_time_slice_hash(store.clone()).await?;
let hash = sector.full_time_slice_top_hash(store.clone()).await?;
if !hash.is_empty() {
out.insert(sector_id as u32, hash);
}
@@ -318,7 +328,11 @@ impl PartitionedHashes {

out.insert(
sector_id as u32,
sector.full_time_slice_hashes(store.clone()).await?,
sector
.full_time_slice_hashes(store.clone())
.await?
.into_iter()
.collect(),
);
}

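`ring_top_hashes` above collects one peekable iterator of partial-slice hashes per covered sector; the rest of that function is cut off in this view. One plausible reading, given the name and the ring/disc picture in the `disc_top_hash` docs, is that the i-th partial slice of every covered sector is combined into the i-th ring hash. The sketch below is that guess only, not code from this commit:

```rust
/// Toy, order-sensitive stand-in for combining hashes within one ring.
fn combine(hashes: impl IntoIterator<Item = Vec<u8>>) -> Vec<u8> {
    let mut out = Vec::new();
    for h in hashes {
        out.extend_from_slice(&h);
    }
    out
}

/// Merge per-sector partial-slice hashes into ring hashes: ring `i` combines
/// the `i`-th partial slice of every covered sector. Sectors are assumed to
/// expose the same number of partial slices.
fn ring_top_hashes(per_sector: Vec<Vec<Vec<u8>>>) -> Vec<Vec<u8>> {
    let rings = per_sector.iter().map(|s| s.len()).max().unwrap_or(0);
    (0..rings)
        .map(|i| {
            combine(
                per_sector
                    .iter()
                    .filter_map(|sector| sector.get(i).cloned()),
            )
        })
        .collect()
}

fn main() {
    // Two covered sectors, each with two partial slices (innermost ring first).
    let per_sector = vec![
        vec![b"a0".to_vec(), b"a1".to_vec()],
        vec![b"b0".to_vec(), b"b1".to_vec()],
    ];
    assert_eq!(
        ring_top_hashes(per_sector),
        vec![b"a0b0".to_vec(), b"a1b1".to_vec()]
    );
}
```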
52 changes: 36 additions & 16 deletions crates/dht/src/time.rs
@@ -58,9 +58,9 @@ use crate::constant::UNIT_TIME;
use kitsune2_api::{
DhtArc, DynOpStore, K2Error, K2Result, StoredOp, Timestamp, UNIX_TIMESTAMP,
};
use std::collections::HashMap;
use std::time::Duration;

/// The partitioned time structure.
#[derive(Debug)]
#[cfg_attr(test, derive(Clone, PartialEq))]
pub struct PartitionedTime {
@@ -102,6 +102,10 @@ pub struct PartitionedTime {
arc_constraint: DhtArc,
}

/// A slice of recent time, with a combined hash of all the ops in that time slice.
///
/// This represents recent time that is not yet ready to be stored as a full time slice.
#[derive(Debug)]
#[cfg_attr(test, derive(Clone, PartialEq))]
pub struct PartialSlice {
@@ -350,40 +354,56 @@ impl PartitionedTime {

// Public, query methods
impl PartitionedTime {
pub async fn combined_full_time_slice_hash(
/// Compute a top hash over the full time slice combined hashes owned by this [PartitionedTime].
///
/// This method will fetch the hashes of all the full time slices within the arc constraint
/// for this [PartitionedTime]. Those are expected to be ordered by slice id, which implies
/// that they are ordered by time. It will then combine those hashes into a single hash.
pub async fn full_time_slice_top_hash(
&self,
store: DynOpStore,
) -> K2Result<bytes::Bytes> {
let hashes = store.retrieve_slice_hashes(self.arc_constraint).await?;
Ok(combine::combine_op_hashes(hashes.values().cloned()).freeze())
Ok(
combine::combine_op_hashes(
hashes.into_iter().map(|(_, hash)| hash),
)
.freeze(),
)
}

pub fn combined_partial_slice_hashes(
/// Get the combined hash of each partial slice owned by this [PartitionedTime].
///
/// This method takes the current partial slices and returns their pre-computed hashes.
/// These are combined hashes over all the ops in each partial slice, ordered by time.
pub fn partial_slice_combined_hashes(
&self,
) -> impl Iterator<Item = bytes::Bytes> + use<'_> {
self.partial_slices
.iter()
.map(|partial| partial.hash.clone().freeze())
}

pub async fn full_time_slice_hash(
&self,
slice_id: u64,
store: DynOpStore,
) -> K2Result<bytes::Bytes> {
Ok(store
.retrieve_slice_hash(self.arc_constraint, slice_id)
.await?
.unwrap_or_else(bytes::Bytes::new))
}

/// Get the combined hashes of all the full time slices owned by this [PartitionedTime].
///
/// This is a pass-through to the provided store, using the arc constraint of this [PartitionedTime].
pub async fn full_time_slice_hashes(
&self,
store: DynOpStore,
) -> K2Result<HashMap<u64, bytes::Bytes>> {
) -> K2Result<Vec<(u64, bytes::Bytes)>> {
store.retrieve_slice_hashes(self.arc_constraint).await
}

/// Get the combined hash of a partial slice by its slice id.
///
/// Note that the number of partial slices changes over time and the start point of the partial
/// slices moves. A slice id should therefore only be used to refer to a specific slice at a
/// specific point in time, which can be ensured by not calling [PartitionedTime::update] while
/// expecting the slice id to refer to the same slice.
///
/// # Errors
///
/// This method will return an error if the requested slice id is beyond the current partial slices.
pub fn partial_slice_hash(&self, slice_id: u32) -> K2Result<bytes::Bytes> {
let slice_id = slice_id as usize;
if slice_id > self.partial_slices.len() {
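The new doc comment on `partial_slice_hash` warns that partial slice ids are only meaningful at a specific point in time, because `update` can change the number and start point of the partial slices. A small, hypothetical usage pattern illustrating that caveat (the `PartialSlices` type here is a simplified stand-in, not `PartitionedTime`):

```rust
/// Hypothetical, simplified stand-in for the partial-slice bookkeeping.
struct PartialSlices {
    hashes: Vec<Vec<u8>>,
}

impl PartialSlices {
    /// Resolve a slice id at a specific point in time.
    fn partial_slice_hash(&self, slice_id: u32) -> Result<Vec<u8>, String> {
        self.hashes
            .get(slice_id as usize)
            .cloned()
            .ok_or_else(|| format!("slice id {slice_id} is beyond the current partial slices"))
    }

    /// After an update the number and start point of partial slices can change,
    /// so ids issued before this call must not be reused afterwards.
    fn update(&mut self) {
        self.hashes.remove(0);
    }
}

fn main() {
    let mut slices = PartialSlices { hashes: vec![b"p0".to_vec(), b"p1".to_vec()] };

    // Safe: ids are resolved and used within one "generation", without an update in between.
    let wanted = [0u32, 1u32];
    let hashes: Vec<_> = wanted.iter().map(|id| slices.partial_slice_hash(*id)).collect();
    assert!(hashes.iter().all(|h| h.is_ok()));

    // After an update, the same ids may refer to different slices or be out of range.
    slices.update();
    assert!(slices.partial_slice_hash(1).is_err());
}
```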
2 changes: 1 addition & 1 deletion crates/memory/src/op_store.rs
@@ -217,7 +217,7 @@ impl OpStore for Kitsune2MemoryOpStore {
fn retrieve_slice_hashes(
&self,
arc: DhtArc,
) -> BoxFuture<'_, K2Result<HashMap<u64, bytes::Bytes>>> {
) -> BoxFuture<'_, K2Result<Vec<(u64, bytes::Bytes)>>> {
async move {
let self_lock = self.read().await;
Ok(self_lock.time_slice_hashes.get_all(&arc))
2 changes: 1 addition & 1 deletion crates/memory/src/op_store/time_slice_hash_store.rs
@@ -47,7 +47,7 @@ impl TimeSliceHashStore {
.cloned()
}

pub(super) fn get_all(&self, arc: &DhtArc) -> HashMap<u64, bytes::Bytes> {
pub(super) fn get_all(&self, arc: &DhtArc) -> Vec<(u64, bytes::Bytes)> {
self.inner
.get(arc)
.map(|by_arc| {
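With this change the in-memory store's `get_all` hands back `(slice_id, hash)` pairs rather than a map. How the ordering is produced is not visible in this hunk; one simple way an in-memory store could keep slice hashes sorted by slice id is a `BTreeMap` per arc, as in this hypothetical sketch (field names and layout are assumptions, not the actual `TimeSliceHashStore`):

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical arc key; the real store is keyed by `DhtArc`.
type ArcKey = (u32, u32);

/// Hypothetical in-memory layout: per arc, slice hashes keyed by slice id.
/// A BTreeMap keeps the ids sorted, so `get_all` returns an ordered Vec for free.
#[derive(Default)]
struct TimeSliceHashes {
    inner: HashMap<ArcKey, BTreeMap<u64, Vec<u8>>>,
}

impl TimeSliceHashes {
    fn insert(&mut self, arc: ArcKey, slice_id: u64, hash: Vec<u8>) {
        self.inner.entry(arc).or_default().insert(slice_id, hash);
    }

    fn get_all(&self, arc: &ArcKey) -> Vec<(u64, Vec<u8>)> {
        self.inner
            .get(arc)
            .map(|by_arc| by_arc.iter().map(|(id, hash)| (*id, hash.clone())).collect())
            .unwrap_or_default()
    }
}

fn main() {
    let mut store = TimeSliceHashes::default();
    let arc = (0, 128);
    store.insert(arc, 3, b"h3".to_vec());
    store.insert(arc, 1, b"h1".to_vec());
    assert_eq!(
        store.get_all(&arc),
        vec![(1, b"h1".to_vec()), (3, b"h3".to_vec())]
    );
}
```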
