diff --git a/Cargo.lock b/Cargo.lock index 82694cf4..951fa9e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1029,6 +1029,7 @@ dependencies = [ "kitsune2_api", "kitsune2_memory", "kitsune2_test_utils", + "rand", "tokio", "tracing", ] diff --git a/crates/api/src/op_store.rs b/crates/api/src/op_store.rs index 9c9849c8..d2bd33eb 100644 --- a/crates/api/src/op_store.rs +++ b/crates/api/src/op_store.rs @@ -70,6 +70,14 @@ pub trait OpStore: 'static + Send + Sync + std::fmt::Debug { end: Timestamp, ) -> BoxFuture<'_, K2Result>>; + /// Retrieve a list of ops by their op ids. + /// + /// This should be used to get op data for ops. + fn retrieve_ops( + &self, + op_ids: Vec, + ) -> BoxFuture<'_, K2Result>>; + /// Store the combined hash of a time slice. fn store_slice_hash( &self, @@ -91,6 +99,12 @@ pub trait OpStore: 'static + Send + Sync + std::fmt::Debug { arc: DhtArc, slice_id: u64, ) -> BoxFuture<'_, K2Result>>; + + /// Retrieve all slice hashes for a given arc. + fn retrieve_slice_hashes( + &self, + arc: DhtArc, + ) -> BoxFuture<'_, K2Result>>; } /// Trait-object version of kitsune2 op store. diff --git a/crates/dht/Cargo.toml b/crates/dht/Cargo.toml index 4a919a22..f286741a 100644 --- a/crates/dht/Cargo.toml +++ b/crates/dht/Cargo.toml @@ -20,3 +20,4 @@ kitsune2_memory = { workspace = true } kitsune2_test_utils = { workspace = true } tokio = { workspace = true, features = ["macros", "rt"] } +rand = { workspace = true } diff --git a/crates/dht/src/arc_set.rs b/crates/dht/src/arc_set.rs new file mode 100644 index 00000000..c112ae4d --- /dev/null +++ b/crates/dht/src/arc_set.rs @@ -0,0 +1,320 @@ +//! Represents a set of [DhtArc]s. +//! +//! A set of [DhtArc]s is combined as a set union into an [ArcSet]. +//! +//! To restrict [crate::dht::Dht] operations to a specific set of sectors, the [ArcSet]s of two +//! DHTs can be intersected to find the common sectors, using [ArcSet::intersection]. + +use kitsune2_api::{DhtArc, K2Error, K2Result}; +use std::collections::HashSet; + +/// Represents a set of [DhtArc]s. +#[derive(Debug)] +#[cfg_attr(test, derive(PartialEq))] +pub struct ArcSet { + inner: HashSet, +} + +impl ArcSet { + /// Create a new arc set from a list of arcs. + /// + /// The size parameter determines the size of each sector. When divided into U32::MAX + 1, the + /// resulting factor must be a power of 2. This is the same sizing logic found in + /// [PartitionedHashes::try_from_store](crate::hash::PartitionedHashes::try_from_store). + /// + /// The resulting arc set represents the union of the input arcs. 
+ pub fn new(size: u32, arcs: Vec) -> K2Result { + let factor = u32::MAX / size + 1; + + // The original factor should have been a power of 2 + if factor == 0 || factor & (factor - 1) != 0 { + return Err(K2Error::other("Invalid size")); + } + + let mut inner = HashSet::new(); + for arc in arcs { + // If we have reached full arc then there's no need to keep going + if inner.len() == factor as usize { + break; + } + + match arc { + DhtArc::Empty => { + continue; + } + DhtArc::Arc(start, end) => { + let num_sectors_covered = if start > end { + let length = u32::MAX - start + end + 1; + length / size + 1 + } else { + (end - start) / size + 1 + }; + + let mut start = start; + for _ in 0..num_sectors_covered { + inner.insert(start / size); + start = start.overflowing_add(size).0; + } + + if start != end.overflowing_add(1).0 + && !(end == u32::MAX && start == 0) + { + return Err(K2Error::other(format!( + "Invalid arc, expected end at {} but arc specifies {}", + start, end + ))); + } + } + } + } + + Ok(ArcSet { inner }) + } + + /// Get the intersection of two arc sets as a new [ArcSet]. + /// + /// # Example + /// + /// ```rust + /// use kitsune2_api::DhtArc; + /// use kitsune2_dht::ArcSet; + /// + /// # fn main() -> kitsune2_api::K2Result<()> { + /// let arc_size = 1 << 23; + /// let arc_1 = DhtArc::Arc(0, 2 * arc_size - 1); + /// let arc_set_1 = ArcSet::new(arc_size, vec![arc_1])?; + /// + /// let arc_2 = DhtArc::Arc(arc_size, 4 * arc_size - 1); + /// let arc_set_2 = ArcSet::new(arc_size, vec![arc_2])?; + /// + /// assert_eq!(1, arc_set_1.intersection(&arc_set_2).covered_sector_count()); + /// # Ok(()) + /// # } + /// ``` + pub fn intersection(&self, other: &Self) -> Self { + ArcSet { + inner: self.inner.intersection(&other.inner).copied().collect(), + } + } + + /// The number of sectors covered by this arc set.
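+ ///
+ /// A full arc covers all `(u32::MAX / size) + 1` sectors, while an empty arc covers none,
+ /// so this count can be checked on an [ArcSet::intersection] to decide whether two DHT
+ /// models have any common sectors worth comparing.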
+ /// + /// # Example + /// + /// ```rust + /// use kitsune2_api::DhtArc; + /// use kitsune2_dht::ArcSet; + /// + /// # fn main() -> kitsune2_api::K2Result<()> { + /// let arc_size = 1 << 23; + /// let arc_1 = DhtArc::Arc(0, 2 * arc_size - 1); + /// let arc_2 = DhtArc::Arc(2 * arc_size, 4 * arc_size - 1); + /// let arc_set = ArcSet::new(arc_size, vec![arc_1, arc_2])?; + /// + /// assert_eq!(4, arc_set.covered_sector_count()); + /// # Ok(()) + /// # } + /// ``` + pub fn covered_sector_count(&self) -> usize { + self.inner.len() + } + + pub(crate) fn includes_sector_index(&self, value: u32) -> bool { + self.inner.contains(&value) + } +} + +#[cfg(test)] +mod test { + use super::*; + + const SECTOR_SIZE: u32 = 1u32 << 23; + + #[test] + fn new_with_no_arcs() { + let set = ArcSet::new(SECTOR_SIZE, vec![]).unwrap(); + + assert!(set.inner.is_empty()); + } + + #[test] + fn new_with_full_arc() { + let set = ArcSet::new(SECTOR_SIZE, vec![DhtArc::FULL]).unwrap(); + + // Sufficient to check that all the right values are included + assert_eq!(512, set.inner.len()); + assert_eq!(511, *set.inner.iter().max().unwrap()); + } + + #[test] + fn new_with_two_arcs() { + let set = ArcSet::new( + SECTOR_SIZE, + vec![ + DhtArc::Arc(0, 255 * SECTOR_SIZE - 1), + DhtArc::Arc(255 * SECTOR_SIZE, u32::MAX), + ], + ) + .unwrap(); + + // Should become a full arc + assert_eq!(512, set.inner.len()); + assert_eq!(511, *set.inner.iter().max().unwrap()); + } + + #[test] + fn overlapping_arcs() { + let set = ArcSet::new( + SECTOR_SIZE, + vec![ + DhtArc::Arc(0, 3 * SECTOR_SIZE - 1), + DhtArc::Arc(2 * SECTOR_SIZE, 4 * SECTOR_SIZE - 1), + ], + ) + .unwrap(); + + assert_eq!(4, set.inner.len()); + assert_eq!(3, *set.inner.iter().max().unwrap()); + } + + #[test] + fn wrapping_arc() { + let set = ArcSet::new( + SECTOR_SIZE, + vec![DhtArc::Arc(510 * SECTOR_SIZE, 3 * SECTOR_SIZE - 1)], + ) + .unwrap(); + + assert_eq!(5, set.inner.len(), "Set is {:?}", set.inner); + assert_eq!( + set.inner.len(), + set.inner + .intersection(&vec![510, 511, 0, 1, 2].into_iter().collect()) + .count() + ); + } + + #[test] + fn overlapping_wrapping_arcs() { + let set = ArcSet::new( + SECTOR_SIZE, + vec![ + DhtArc::Arc(510 * SECTOR_SIZE, 3 * SECTOR_SIZE - 1), + DhtArc::Arc(2 * SECTOR_SIZE, 4 * SECTOR_SIZE - 1), + ], + ) + .unwrap(); + + assert_eq!(6, set.inner.len(), "Set is {:?}", set.inner); + assert_eq!( + set.inner.len(), + set.inner + .intersection(&vec![510, 511, 0, 1, 2, 3].into_iter().collect()) + .count() + ); + } + + #[test] + fn arc_not_on_boundaries() { + let set = ArcSet::new(SECTOR_SIZE, vec![DhtArc::Arc(0, 50)]); + + assert!(set.is_err()); + assert_eq!( + "Invalid arc, expected end at 8388608 but arc specifies 50 (src: None)", + set.unwrap_err().to_string() + ); + } + + #[test] + fn valid_and_invalid_arcs() { + let set = ArcSet::new( + SECTOR_SIZE, + vec![ + DhtArc::Arc(0, SECTOR_SIZE - 1), + DhtArc::Arc(u32::MAX, u32::MAX), + ], + ); + + assert!(set.is_err()); + assert_eq!( + "Invalid arc, expected end at 8388607 but arc specifies 4294967295 (src: None)", + set.unwrap_err().to_string() + ); + } + + #[test] + fn intersect_non_overlapping_sets() { + let set1 = + ArcSet::new(SECTOR_SIZE, vec![DhtArc::Arc(0, SECTOR_SIZE - 1)]) + .unwrap(); + let set2 = ArcSet::new( + SECTOR_SIZE, + vec![DhtArc::Arc(2 * SECTOR_SIZE, 3 * SECTOR_SIZE - 1)], + ) + .unwrap(); + + let intersection = set1.intersection(&set2); + + assert!(intersection.inner.is_empty()); + } + + #[test] + fn intersect_overlapping_by_one() { + let set1 = + ArcSet::new(SECTOR_SIZE, 
vec![DhtArc::Arc(0, 2 * SECTOR_SIZE - 1)]) + .unwrap(); + let set2 = ArcSet::new( + SECTOR_SIZE, + vec![DhtArc::Arc(SECTOR_SIZE, 3 * SECTOR_SIZE - 1)], + ) + .unwrap(); + + let intersection = set1.intersection(&set2); + + assert_eq!(1, intersection.inner.len()); + assert!(intersection.inner.contains(&1)); + } + + #[test] + fn intersect_overlapping_by_multiple() { + let set1 = ArcSet::new( + SECTOR_SIZE, + vec![DhtArc::Arc(0, 10 * SECTOR_SIZE - 1)], + ) + .unwrap(); + let set2 = ArcSet::new( + SECTOR_SIZE, + vec![DhtArc::Arc(SECTOR_SIZE, 3 * SECTOR_SIZE - 1)], + ) + .unwrap(); + + let intersection = set1.intersection(&set2); + + assert_eq!(2, intersection.inner.len()); + assert!(intersection.inner.contains(&1)); + assert!(intersection.inner.contains(&2)); + } + + #[test] + fn preserves_full_arc() { + let full_set = ArcSet::new(SECTOR_SIZE, vec![DhtArc::FULL]).unwrap(); + assert_eq!( + full_set, + full_set.intersection( + &ArcSet::new(SECTOR_SIZE, vec![DhtArc::FULL]).unwrap() + ) + ); + } + + #[test] + fn preserves_empty() { + let empty_set = ArcSet::new(SECTOR_SIZE, vec![DhtArc::Empty]).unwrap(); + assert_eq!( + empty_set, + empty_set.intersection( + &ArcSet::new(SECTOR_SIZE, vec![DhtArc::FULL]).unwrap() + ) + ); + } +} diff --git a/crates/dht/src/combine.rs b/crates/dht/src/combine.rs new file mode 100644 index 00000000..f997b693 --- /dev/null +++ b/crates/dht/src/combine.rs @@ -0,0 +1,47 @@ +/// Combine a series of op hashes into a single hash. +/// +/// Requires that the op hashes are already ordered. +/// If the input is empty, then the output is an empty byte array. +pub fn combine_op_hashes< + T: IntoIterator, + I: Clone + Into, +>( + hashes: T, +) -> bytes::BytesMut { + let mut hashes = hashes.into_iter().peekable(); + let mut out = if let Some(first) = hashes.peek() { + bytes::BytesMut::zeroed(first.clone().into().len()) + } else { + // `Bytes::new` does not allocate, so if there was no input, then return an empty + // byte array without allocating. + return bytes::BytesMut::new(); + }; + + for hash in hashes { + combine_hashes(&mut out, hash.into()); + } + + out +} + +/// Combine a hash into an existing, mutable hash. +/// +/// This is intended to be used for incrementally updating an existing combined hash. That makes it +/// an alternative to [combine_op_hashes] when it is possible to avoid loading all the existing +/// hashes and computing a new combined hash that includes this incoming hash. +pub fn combine_hashes(into: &mut bytes::BytesMut, other: bytes::Bytes) { + // Properly initialise the target from the source if the target is empty. + // Otherwise, the loop below would run 0 times. + if into.is_empty() && !other.is_empty() { + into.extend_from_slice(&other); + return; + } + + if into.len() != other.len() { + panic!("Refusing to combine hashes of different lengths ({} != {}), this data should have been rejected by the host", into.len(), other.len()); + } + + for (into_byte, other_byte) in into.iter_mut().zip(other.iter()) { + *into_byte ^= other_byte; + } +} diff --git a/crates/dht/src/constant.rs b/crates/dht/src/constant.rs index 83127e9c..ede7f5cc 100644 --- a/crates/dht/src/constant.rs +++ b/crates/dht/src/constant.rs @@ -1,3 +1,6 @@ +//! Constants used in the DHT model. + use std::time::Duration; +/// The smallest unit of time in the DHT. 
pub const UNIT_TIME: Duration = Duration::from_secs(15 * 60); // 15 minutes diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs new file mode 100644 index 00000000..2472b877 --- /dev/null +++ b/crates/dht/src/dht.rs @@ -0,0 +1,456 @@ +//! Top-level DHT model. +//! +//! This module is largely implemented in terms of the [PartitionedHashes] and [PartitionedTime](crate::time::PartitionedTime) +//! types. It combines these types into a single model that can be used to track the state of a +//! distributed hash table. +//! +//! What this adds on top of the inner types is the ability to compare two DHT models and determine +//! a set of op hashes that may need to be fetched from one model to the other to bring them into +//! sync. The comparison process is symmetric, meaning that both parties will end up with the same +//! list of op hashes to fetch regardless of who initiated the comparison. Comparison is initiated +//! using the [Dht::snapshot_minimal] method which produces a minimal snapshot of the DHT model. +//! +//! The set of op hashes to fetch is unlikely to be the exact ops that are missing but rather a +//! tradeoff between the number of steps required to determine the missing ops and the number of op +//! hashes that have to be sent. + +use crate::arc_set::ArcSet; +use crate::PartitionedHashes; +use kitsune2_api::{DynOpStore, K2Error, K2Result, OpId, StoredOp, Timestamp}; +use snapshot::{DhtSnapshot, SnapshotDiff}; + +pub mod snapshot; +#[cfg(test)] +mod tests; + +/// The top-level DHT model. +/// +/// Represents a distributed hash table (DHT) model that can be compared with other instances of +/// itself to determine if they are in sync and which regions to sync if they are not. +pub struct Dht { + partition: PartitionedHashes, + store: DynOpStore, +} + +/// The next action to take after comparing two DHT snapshots. +#[derive(Debug)] +pub enum DhtSnapshotNextAction { + /// No further action, these DHTs are in sync. + Identical, + /// The two DHT snapshots cannot be compared. + /// + /// This can happen if the time slices of the two DHTs are not aligned or one side is following + /// a different comparison flow to what we're expecting. + CannotCompare, + /// The two DHT snapshots are different, and we need to drill down to the next level of detail. + NewSnapshot(DhtSnapshot), + /// The two DHT snapshots are different, and we have drilled down to the most detailed level. + /// + /// The yielded op hashes should be checked by the other party and any missing ops should be + /// fetched from us. + NewSnapshotAndHashList(DhtSnapshot, Vec), + /// The two DHT snapshots are different, and we have drilled down to the most detailed level. + /// + /// This is the final step in the comparison process. The yielded op hashes should be fetched + /// from the other party. No further snapshots are required for this comparison. + HashList(Vec), +} + +impl Dht { + /// Create a new DHT from an op store. + /// + /// Creates the inner [PartitionedHashes] using the store. + pub async fn try_from_store( + current_time: Timestamp, + store: DynOpStore, + ) -> K2Result { + Ok(Dht { + partition: PartitionedHashes::try_from_store( + 14, + current_time, + store.clone(), + ) + .await?, + store, + }) + } + + /// Get the next time at which the DHT model should be updated. + /// + /// When this time is reached, [Dht::update] should be called. + pub fn next_update_at(&self) -> Timestamp { + self.partition.next_update_at() + } + + /// Update the DHT model. 
+ /// + /// This recomputes the full and partial time slices to be accurate for the provided + /// `current_time`. + /// + /// See also [PartitionedHashes::update] and [PartitionedTime::update](crate::time::PartitionedTime::update). + pub async fn update(&mut self, current_time: Timestamp) -> K2Result<()> { + self.partition + .update(self.store.clone(), current_time) + .await + } + + /// Inform the DHT model that some ops have been stored. + /// + /// This will figure out where the incoming ops belong in the DHT model based on their hash + /// and timestamp. + /// + /// See also [PartitionedHashes::inform_ops_stored] for more details. + pub async fn inform_ops_stored( + &mut self, + stored_ops: Vec, + ) -> K2Result<()> { + self.partition + .inform_ops_stored(self.store.clone(), stored_ops) + .await + } + + /// Get a minimal snapshot of the DHT model. + /// + /// This is the entry point for comparing state with another DHT model. A minimal snapshot may + /// be enough to check that two DHTs are in sync. The receiver should call [Dht::handle_snapshot] + /// which will determine if the two DHTs are in sync or if a more detailed snapshot is required. + /// + /// # Errors + /// + /// Returns an error if there are no arcs to snapshot. If there is no overlap between the arc + /// sets of two DHT models then there is no point in comparing them because it will always + /// yield an empty diff. The [ArcSet::covered_sector_count] should be checked before calling + /// this method. + pub async fn snapshot_minimal( + &self, + arc_set: &ArcSet, + ) -> K2Result { + if arc_set.covered_sector_count() == 0 { + return Err(K2Error::other("No arcs to snapshot")); + } + + let (disc_top_hash, disc_boundary) = self + .partition + .disc_top_hash(arc_set, self.store.clone()) + .await?; + + Ok(DhtSnapshot::Minimal { + disc_top_hash, + disc_boundary, + ring_top_hashes: self.partition.ring_top_hashes(arc_set), + }) + } + + /// Handle a snapshot from another DHT model. + /// + /// This is a two-step process. First the type of the incoming snapshot is checked and a + /// snapshot of the same type is computed. Secondly, the two snapshots are compared to determine + /// what action should be taken next. + /// + /// The state flow is as follows: + /// - If the two snapshots are identical, the function will return [DhtSnapshotNextAction::Identical]. + /// - If the two snapshots cannot be compared, the function will return [DhtSnapshotNextAction::CannotCompare]. + /// This can happen if the time slices of the two DHTs are not aligned or one side is + /// following a different flow to what we're expecting. + /// - If the snapshots are different, the function will return [DhtSnapshotNextAction::NewSnapshot] + /// with a more detailed snapshot of the DHT model. + /// - When the most detailed snapshot type is reached, the function will return [DhtSnapshotNextAction::NewSnapshotAndHashList] + /// - The new snapshot from [DhtSnapshotNextAction::NewSnapshotAndHashList] should be sent to + /// the other party so that they can compare it with their own snapshot and determine which op + /// hashes they need to fetch. + /// - On the final comparison step, the function will return [DhtSnapshotNextAction::HashList] + /// with a list of op hashes. This list should be sent to the other party so that they can + /// fetch any missing ops. + /// + /// Notice that the final step would require re-computing the most detailed snapshot type. This + /// is expensive. 
To avoid having to recompute a snapshot we've already computed, the caller + /// MUST capture the snapshot from [DhtSnapshotNextAction::NewSnapshot] when it contains either + /// [DhtSnapshot::DiscSectorDetails] or [DhtSnapshot::RingSectorDetails]. This snapshot can be + /// provided back to this function in the `our_previous_snapshot` parameter. In all other cases, + /// the caller should provide `None` for `our_previous_snapshot`. + /// + /// Note also that there are two possible routes through the comparison process. The first is + /// when the historical disc mismatches, the second is when the recent rings mismatch. The + /// historical disc mismatch is prioritised, so if a mismatch is detected there then the sync + /// process will resolve that. Otherwise, the recent rings mismatch will be resolved. That means + /// that it may take up to two rounds of sync to resolve all mismatches. Of course, both the + /// disc and the rings must be considered a moving target so it cannot be assumed that 2 rounds + /// are actually enough to resolve all mismatches. + /// + /// The `arc_set` parameter is used to determine which arcs are relevant to the DHT model. This + /// should be the [ArcSet::intersection] of the arc sets of the two DHT models to be compared. + /// + /// # Errors + /// + /// Returns an error if there are no arcs to snapshot. If there is no overlap between the arc + /// sets of two DHT models then there is no point in comparing them because it will always + /// yield an empty diff. The [ArcSet::covered_sector_count] should be checked before calling + /// this method. + pub async fn handle_snapshot( + &self, + their_snapshot: &DhtSnapshot, + our_previous_snapshot: Option, + arc_set: &ArcSet, + ) -> K2Result { + if arc_set.covered_sector_count() == 0 { + return Err(K2Error::other("No arcs to snapshot")); + } + + let is_final = matches!( + our_previous_snapshot, + Some( + DhtSnapshot::DiscSectorDetails { .. } + | DhtSnapshot::RingSectorDetails { .. } + ) + ); + + // Check what snapshot we've been sent and compute a matching snapshot. + // In the case where we've already produced a most details snapshot type, we can use the + // already computed snapshot. + let our_snapshot = match &their_snapshot { + DhtSnapshot::Minimal { .. } => { + self.snapshot_minimal(arc_set).await? + } + DhtSnapshot::DiscSectors { .. } => { + self.snapshot_disc_sectors(arc_set).await? + } + DhtSnapshot::DiscSectorDetails { + disc_sector_hashes, .. + } => match our_previous_snapshot { + Some(snapshot @ DhtSnapshot::DiscSectorDetails { .. }) => { + #[cfg(test)] + { + let would_have_used = self + .snapshot_disc_sector_details( + disc_sector_hashes.keys().cloned().collect(), + arc_set, + self.store.clone(), + ) + .await?; + + assert_eq!(would_have_used, snapshot); + } + + // There is no value in recomputing if we already have a matching snapshot. + // The disc sector details only requires a list of mismatched sectors which + // we already had when we computed the previous detailed snapshot. + // What we were missing previously was the detailed snapshot from the other + // party, which we now have and can use to produce a hash list. + snapshot + } + _ => { + self.snapshot_disc_sector_details( + disc_sector_hashes.keys().cloned().collect(), + arc_set, + self.store.clone(), + ) + .await? + } + }, + DhtSnapshot::RingSectorDetails { + ring_sector_hashes, .. + } => { + match our_previous_snapshot { + Some(snapshot @ DhtSnapshot::RingSectorDetails { .. 
}) => { + #[cfg(test)] + { + let would_have_used = self + .snapshot_ring_sector_details( + ring_sector_hashes + .keys() + .cloned() + .collect(), + arc_set, + )?; + + assert_eq!(would_have_used, snapshot); + } + + // No need to recompute, see the comment above for DiscSectorDetails + snapshot + } + _ => self.snapshot_ring_sector_details( + ring_sector_hashes.keys().cloned().collect(), + arc_set, + )?, + } + } + }; + + // Now compare the snapshots to determine what to do next. + // We will either send a more detailed snapshot back or a list of possible mismatched op + // hashes. In the case that we produce a most detailed snapshot type, we can send the list + // of op hashes at the same time. + match our_snapshot.compare(their_snapshot) { + SnapshotDiff::Identical => Ok(DhtSnapshotNextAction::Identical), + SnapshotDiff::CannotCompare => { + Ok(DhtSnapshotNextAction::CannotCompare) + } + SnapshotDiff::DiscMismatch => { + Ok(DhtSnapshotNextAction::NewSnapshot( + self.snapshot_disc_sectors(arc_set).await?, + )) + } + SnapshotDiff::DiscSectorMismatches(mismatched_sectors) => { + Ok(DhtSnapshotNextAction::NewSnapshot( + self.snapshot_disc_sector_details( + mismatched_sectors, + arc_set, + self.store.clone(), + ) + .await?, + )) + } + SnapshotDiff::DiscSectorSliceMismatches( + mismatched_slice_indices, + ) => { + let mut out = Vec::new(); + for (sector_index, missing_slices) in mismatched_slice_indices { + let Ok(arc) = + self.partition.dht_arc_for_sector_index(sector_index) + else { + tracing::error!( + "Sector index {} out of bounds, ignoring", + sector_index + ); + continue; + }; + + for missing_slice in missing_slices { + let Ok((start, end)) = self + .partition + .time_bounds_for_full_slice_index(missing_slice) + else { + tracing::error!( + "Missing slice {} out of bounds, ignoring", + missing_slice + ); + continue; + }; + + out.extend( + self.store + .retrieve_op_hashes_in_time_slice( + arc, start, end, + ) + .await?, + ); + } + } + + Ok(if is_final { + DhtSnapshotNextAction::HashList(out) + } else { + DhtSnapshotNextAction::NewSnapshotAndHashList( + our_snapshot, + out, + ) + }) + } + SnapshotDiff::RingMismatches(mismatched_rings) => { + Ok(DhtSnapshotNextAction::NewSnapshot( + self.snapshot_ring_sector_details( + mismatched_rings, + arc_set, + )?, + )) + } + SnapshotDiff::RingSectorMismatches(mismatched_sectors) => { + let mut out = Vec::new(); + + for (ring_index, missing_sectors) in mismatched_sectors { + for sector_index in missing_sectors { + let Ok(arc) = self + .partition + .dht_arc_for_sector_index(sector_index) + else { + tracing::error!( + "Sector index {} out of bounds, ignoring", + sector_index + ); + continue; + }; + + let Ok((start, end)) = self + .partition + .time_bounds_for_partial_slice_index(ring_index) + else { + tracing::error!( + "Partial slice index {} out of bounds, ignoring", + ring_index + ); + continue; + }; + + out.extend( + self.store + .retrieve_op_hashes_in_time_slice( + arc, start, end, + ) + .await?, + ); + } + } + + Ok(if is_final { + DhtSnapshotNextAction::HashList(out) + } else { + DhtSnapshotNextAction::NewSnapshotAndHashList( + our_snapshot, + out, + ) + }) + } + } + } + + async fn snapshot_disc_sectors( + &self, + arc_set: &ArcSet, + ) -> K2Result { + let (disc_sector_top_hashes, disc_boundary) = self + .partition + .disc_sector_hashes(arc_set, self.store.clone()) + .await?; + + Ok(DhtSnapshot::DiscSectors { + disc_sector_top_hashes, + disc_boundary, + }) + } + + async fn snapshot_disc_sector_details( + &self, + mismatched_sector_indices: Vec, + 
arc_set: &ArcSet, + store: DynOpStore, + ) -> K2Result { + let (disc_sector_hashes, disc_boundary) = self + .partition + .disc_sector_sector_details( + mismatched_sector_indices, + arc_set, + store, + ) + .await?; + + Ok(DhtSnapshot::DiscSectorDetails { + disc_sector_hashes, + disc_boundary, + }) + } + + fn snapshot_ring_sector_details( + &self, + mismatched_rings: Vec, + arc_set: &ArcSet, + ) -> K2Result { + let (ring_sector_hashes, disc_boundary) = + self.partition.ring_details(mismatched_rings, arc_set)?; + + Ok(DhtSnapshot::RingSectorDetails { + ring_sector_hashes, + disc_boundary, + }) + } +} diff --git a/crates/dht/src/dht/snapshot.rs b/crates/dht/src/dht/snapshot.rs new file mode 100644 index 00000000..3d70363d --- /dev/null +++ b/crates/dht/src/dht/snapshot.rs @@ -0,0 +1,776 @@ +//! A snapshot of the DHT state at a given point in time. +//! +//! This module is public because its types need to be communicated between DHT instances, but it is +//! largely opaque to the user. See [crate::dht::Dht::snapshot_minimal] and +//! [crate::dht::Dht::handle_snapshot] for more information about using this module. + +use kitsune2_api::Timestamp; +use std::collections::{HashMap, HashSet}; + +/// A snapshot of the DHT state at a given point in time. +/// +/// This is largely opaque to the user of the [crate::dht::Dht] model. It is intended to be sent +/// between nodes to compare their DHT states and compared with [DhtSnapshot::compare]. +#[derive(Debug, Eq, PartialEq)] +pub enum DhtSnapshot { + /// The default, smallest snapshot type. + /// + /// It contains enough information to make further decisions about where mismatches might be + /// but does its best to compress historical information. The assumption being that the more + /// recent data is more likely to contain mismatches than older data. + /// + /// Requires 4 bytes for the timestamp. It then requires at most + /// 2 * (`time_factor` - 1) * `HASH_SIZE` + 1 bytes for the disc hash and the ring hashes. + /// Where `HASH_SIZE` is a host implementation detail. Assuming 4-byte hashes and a + /// `time_factor` of 14, this would be 4 + 2 * (14 - 1) * 4 + 1 = 109 bytes. + /// + /// Note that the calculation above is a maximum. The snapshot only contains the hashes that + /// are relevant to the pair of nodes that are comparing snapshots. Also, some sectors may be + /// empty and will be sent as an empty hash. + Minimal { + /// Disc top hash, representing the combined hash of the full time slice top hashes. + disc_top_hash: bytes::Bytes, + /// The end timestamp of the most recent full time slice. + disc_boundary: Timestamp, + /// Ring top hashes, representing the combined hashes of the partial time slices. + ring_top_hashes: Vec, + }, + /// A snapshot to be used when there is a [DhtSnapshot::Minimal] mismatch in the disc top hash. + DiscSectors { + /// Similar to the `disc_top_hash` except the sector hashes are not combined. + disc_sector_top_hashes: HashMap, + /// The end timestamp of the most recent full time slice. + disc_boundary: Timestamp, + }, + /// A snapshot to be used when there is a [DhtSnapshot::DiscSectors] mismatch. + /// + /// For each mismatched disc sector, the snapshot will contain the sector index and all the + /// hashes for that sector. + DiscSectorDetails { + /// Similar to the `disc_sector_top_hashes` except the full time slice hashes are not + /// combined. + disc_sector_hashes: HashMap>, + /// The end timestamp of the most recent full time slice. 
+ disc_boundary: Timestamp, + }, + /// A snapshot to be used when there is a [DhtSnapshot::Minimal] mismatch in the ring top + /// hashes. + RingSectorDetails { + /// Similar to the `ring_top_hashes` except the sector hashes are not combined. + ring_sector_hashes: HashMap>, + /// The end timestamp of the most recent full time slice. + disc_boundary: Timestamp, + }, +} + +impl DhtSnapshot { + /// Compare two snapshots to determine how they differ. + /// + /// Produces a [SnapshotDiff] that describes the differences between the two snapshots. + /// This should not be use directly, please see [crate::dht::Dht::handle_snapshot]. + pub fn compare(&self, other: &Self) -> SnapshotDiff { + // Check if they match exactly, before doing further work to check how they differ. + if self == other { + return SnapshotDiff::Identical; + } + + match (self, other) { + ( + DhtSnapshot::Minimal { + disc_top_hash: our_disc_top_hash, + disc_boundary: our_disc_boundary, + ring_top_hashes: our_ring_top_hashes, + }, + DhtSnapshot::Minimal { + disc_top_hash: other_disc_top_hash, + disc_boundary: other_disc_boundary, + ring_top_hashes: other_ring_top_hashes, + }, + ) => { + // If the historical time boundary doesn't match, we can't compare. + // This won't happen very often so it's okay to just fail this match. + if our_disc_boundary != other_disc_boundary { + return SnapshotDiff::CannotCompare; + } + + // If the disc hash mismatches, then there is a historical mismatch. + // This shouldn't be common, but we sync forwards through time, so if we + // find a historical mismatch then focus on fixing that first. + if our_disc_top_hash != other_disc_top_hash { + return SnapshotDiff::DiscMismatch; + } + + // This is more common, it can happen if we're close to a UNIT_TIME boundary + // and there is a small clock difference or just one node calculated this snapshot + // before the other did. Still, we'll have to wait until we next compare to + // our DHT state. + if our_ring_top_hashes.len() != other_ring_top_hashes.len() { + return SnapshotDiff::CannotCompare; + } + + // There should always be at least one mismatched ring, otherwise the snapshots + // would have been identical which has already been checked. + SnapshotDiff::RingMismatches(hash_mismatch_indices( + our_ring_top_hashes, + other_ring_top_hashes, + )) + } + ( + DhtSnapshot::DiscSectors { + disc_sector_top_hashes: our_disc_sector_top_hashes, + disc_boundary: our_disc_boundary, + }, + DhtSnapshot::DiscSectors { + disc_sector_top_hashes: other_disc_sector_top_hashes, + disc_boundary: other_disc_boundary, + }, + ) => { + if our_disc_boundary != other_disc_boundary { + // TODO Don't expect the boundary to move during a comparison so treat this as + // an error at this point? We should have stopped before now. 
+ return SnapshotDiff::CannotCompare; + } + + // If one side has a hash for a sector and the other doesn't then that is a mismatch + let our_indices = + our_disc_sector_top_hashes.keys().collect::>(); + let other_indices = + other_disc_sector_top_hashes.keys().collect::>(); + let mut mismatched_sector_indices = our_indices + .symmetric_difference(&other_indices) + .map(|index| **index) + .collect::>(); + + // Then for any common sectors, check if the hashes match + let common_indices = our_indices + .intersection(&other_indices) + .collect::>(); + for index in common_indices { + if our_disc_sector_top_hashes[index] + != other_disc_sector_top_hashes[index] + { + // We found a mismatched sector, store it + mismatched_sector_indices.push(**index); + } + } + + SnapshotDiff::DiscSectorMismatches(mismatched_sector_indices) + } + ( + DhtSnapshot::DiscSectorDetails { + disc_sector_hashes: our_disc_sector_hashes, + disc_boundary: our_disc_boundary, + }, + DhtSnapshot::DiscSectorDetails { + disc_sector_hashes: other_disc_sector_hashes, + disc_boundary: other_disc_boundary, + }, + ) => { + if our_disc_boundary != other_disc_boundary { + // TODO Don't expect the boundary to move during a comparison so treat this as + // an error at this point? We should have stopped before now. + return SnapshotDiff::CannotCompare; + } + + let our_indices = + our_disc_sector_hashes.keys().collect::>(); + let other_indices = + other_disc_sector_hashes.keys().collect::>(); + + // If one side has a sector and the other doesn't then that is a mismatch + let mut mismatched_sector_indices = our_indices + .symmetric_difference(&other_indices) + .map(|index| { + ( + **index, + our_disc_sector_hashes + .get(*index) + .unwrap_or_else(|| { + &other_disc_sector_hashes[*index] + }) + .keys() + .copied() + .collect::>(), + ) + }) + .collect::>(); + + // Then for any common sectors, check if the hashes match + let common_sector_indices = our_indices + .intersection(&other_indices) + .collect::>(); + for sector_index in common_sector_indices { + let our_slice_indices = &our_disc_sector_hashes + [sector_index] + .keys() + .collect::>(); + let other_slice_indices = &other_disc_sector_hashes + [sector_index] + .keys() + .collect::>(); + + let mut mismatched_slice_indices = our_slice_indices + .symmetric_difference(other_slice_indices) + .map(|index| **index) + .collect::>(); + + let common_slice_indices = our_slice_indices + .intersection(other_slice_indices) + .collect::>(); + + for slice_index in common_slice_indices { + if our_disc_sector_hashes[sector_index][slice_index] + != other_disc_sector_hashes[sector_index] + [slice_index] + { + mismatched_slice_indices.push(**slice_index); + } + } + + if !mismatched_slice_indices.is_empty() { + mismatched_sector_indices + .entry(**sector_index) + .or_insert_with(Vec::new) + .extend(mismatched_slice_indices); + } + } + + SnapshotDiff::DiscSectorSliceMismatches( + mismatched_sector_indices, + ) + } + ( + DhtSnapshot::RingSectorDetails { + ring_sector_hashes: our_ring_sector_hashes, + disc_boundary: our_disc_boundary, + }, + DhtSnapshot::RingSectorDetails { + ring_sector_hashes: other_ring_sector_hashes, + disc_boundary: other_disc_boundary, + }, + ) => { + if our_disc_boundary != other_disc_boundary { + // TODO Don't expect the boundary to move during a comparison so treat this as + // an error at this point? We should have stopped before now. 
+ return SnapshotDiff::CannotCompare; + } + + let our_indices = + our_ring_sector_hashes.keys().collect::>(); + let other_indices = + other_ring_sector_hashes.keys().collect::>(); + + // The mismatched rings should have been figured out from the minimal snapshot + // or from the ring mismatch in the previous step. They should be identical + // regardless of which side computed this snapshot. + if our_indices.len() != other_indices.len() + || our_indices != other_indices + { + return SnapshotDiff::CannotCompare; + } + + // Then for any common rings, check if the hashes match + let common_ring_indices = our_indices + .intersection(&other_indices) + .collect::>(); + let mut mismatched_ring_sectors = + HashMap::with_capacity(common_ring_indices.len()); + for ring_index in common_ring_indices { + let our_sector_indices = &our_ring_sector_hashes + [ring_index] + .keys() + .collect::>(); + let other_sector_indices = &other_ring_sector_hashes + [ring_index] + .keys() + .collect::>(); + + let mut mismatched_sector_indices = our_sector_indices + .symmetric_difference(other_sector_indices) + .map(|index| **index) + .collect::>(); + + let common_sector_indices = our_sector_indices + .intersection(other_sector_indices) + .collect::>(); + + for sector_index in common_sector_indices { + if our_ring_sector_hashes[ring_index][sector_index] + != other_ring_sector_hashes[ring_index] + [sector_index] + { + mismatched_sector_indices.push(**sector_index); + } + } + + mismatched_ring_sectors + .insert(**ring_index, mismatched_sector_indices); + } + + SnapshotDiff::RingSectorMismatches(mismatched_ring_sectors) + } + (theirs, other) => { + tracing::error!( + "Mismatched snapshot types: ours: {:?}, theirs: {:?}", + theirs, + other + ); + SnapshotDiff::CannotCompare + } + } + } +} + +/// The differences between two snapshots. +#[derive(Debug)] +#[cfg_attr(test, derive(Eq, PartialEq))] +pub enum SnapshotDiff { + /// The snapshots are identical. + Identical, + /// The snapshots cannot be compared. + /// + /// This can happen if the historical time boundary doesn't match or if the snapshot types + /// don't match. + CannotCompare, + // Historical mismatch + /// The disc hashes do not match. + DiscMismatch, + /// These disc sectors are missing or do not match. + DiscSectorMismatches(Vec), + /// These disc sector slices are missing or do not match, and further these slices are missing + /// or do not match. + DiscSectorSliceMismatches(HashMap>), + // Recent mismatch + /// These rings do not match. + RingMismatches(Vec), + /// These rings do not match, and further these sectors within those rings do not match. 
+ RingSectorMismatches(HashMap>), +} + +fn hash_mismatch_indices( + left: &[bytes::Bytes], + right: &[bytes::Bytes], +) -> Vec { + left.iter() + .enumerate() + .zip(right.iter()) + .filter_map(|((idx, left), right)| { + if left != right { + Some(idx as u32) + } else { + None + } + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn minimal_self_identical() { + let snapshot = DhtSnapshot::Minimal { + disc_top_hash: bytes::Bytes::from(vec![1; 32]), + disc_boundary: Timestamp::now(), + ring_top_hashes: vec![bytes::Bytes::from(vec![2; 32])], + }; + + assert_eq!(SnapshotDiff::Identical, snapshot.compare(&snapshot)); + } + + #[test] + fn minimal_disc_hash_mismatch() { + let timestamp = Timestamp::now(); + let snapshot_1 = DhtSnapshot::Minimal { + disc_boundary: timestamp, + disc_top_hash: bytes::Bytes::from(vec![1; 32]), + ring_top_hashes: vec![], + }; + + let snapshot_2 = DhtSnapshot::Minimal { + disc_boundary: timestamp, + disc_top_hash: bytes::Bytes::from(vec![2; 32]), + ring_top_hashes: vec![], + }; + + assert_eq!(snapshot_1.compare(&snapshot_2), SnapshotDiff::DiscMismatch); + } + + #[test] + fn minimal_disc_boundary_mismatch() { + let snapshot_1 = DhtSnapshot::Minimal { + disc_boundary: Timestamp::now(), + disc_top_hash: bytes::Bytes::from(vec![1; 32]), + ring_top_hashes: vec![], + }; + + let snapshot_2 = DhtSnapshot::Minimal { + // Just to be sure, we're using `::now()` twice but it can return the same value. + disc_boundary: Timestamp::now() + Duration::from_secs(1), + disc_top_hash: bytes::Bytes::from(vec![1; 32]), + ring_top_hashes: vec![], + }; + + assert_eq!( + snapshot_1.compare(&snapshot_2), + SnapshotDiff::CannotCompare + ); + } + + #[test] + fn minimal_ring_mismatch() { + let timestamp = Timestamp::now(); + let snapshot_1 = DhtSnapshot::Minimal { + disc_boundary: timestamp, + disc_top_hash: bytes::Bytes::from(vec![1; 32]), + ring_top_hashes: vec![ + bytes::Bytes::from(vec![1]), + bytes::Bytes::from(vec![2]), + ], + }; + + let snapshot_2 = DhtSnapshot::Minimal { + disc_boundary: timestamp, + disc_top_hash: bytes::Bytes::from(vec![1; 32]), + ring_top_hashes: vec![ + bytes::Bytes::from(vec![1]), + bytes::Bytes::from(vec![3]), + ], + }; + + assert_eq!( + snapshot_1.compare(&snapshot_2), + SnapshotDiff::RingMismatches(vec![1]) + ); + } + + #[test] + fn minimal_disc_and_ring_mismatch() { + let timestamp = Timestamp::now(); + let snapshot_1 = DhtSnapshot::Minimal { + disc_boundary: timestamp, + disc_top_hash: bytes::Bytes::from(vec![1; 32]), + ring_top_hashes: vec![ + bytes::Bytes::from(vec![1]), + bytes::Bytes::from(vec![7]), + ], + }; + + let snapshot_2 = DhtSnapshot::Minimal { + disc_boundary: timestamp, + disc_top_hash: bytes::Bytes::from(vec![2; 32]), + ring_top_hashes: vec![ + bytes::Bytes::from(vec![1]), + bytes::Bytes::from(vec![3]), + ], + }; + + // Always chooses the disc mismatch over the ring mismatch, to prioritise historical data. 
+ assert_eq!(snapshot_1.compare(&snapshot_2), SnapshotDiff::DiscMismatch); + } + + #[test] + fn minimal_disc_wrong_number_of_rings() { + let timestamp = Timestamp::now(); + let snapshot_1 = DhtSnapshot::Minimal { + disc_boundary: timestamp, + disc_top_hash: bytes::Bytes::from(vec![1; 32]), + ring_top_hashes: vec![ + bytes::Bytes::from(vec![1]), + bytes::Bytes::from(vec![4]), + ], + }; + + let snapshot_2 = DhtSnapshot::Minimal { + disc_boundary: timestamp, + disc_top_hash: bytes::Bytes::from(vec![1; 32]), + ring_top_hashes: vec![ + bytes::Bytes::from(vec![1]), + bytes::Bytes::from(vec![3]), + bytes::Bytes::from(vec![5]), + ], + }; + + assert_eq!( + snapshot_1.compare(&snapshot_2), + SnapshotDiff::CannotCompare + ); + } + + #[test] + fn disc_sector_boundary_mismatch() { + let timestamp = Timestamp::now(); + let snapshot_1 = DhtSnapshot::DiscSectors { + disc_boundary: timestamp, + disc_sector_top_hashes: HashMap::new(), + }; + + let snapshot_2 = DhtSnapshot::DiscSectors { + disc_boundary: timestamp + Duration::from_secs(1), + disc_sector_top_hashes: HashMap::new(), + }; + + assert_eq!( + snapshot_1.compare(&snapshot_2), + SnapshotDiff::CannotCompare + ); + } + + #[test] + fn disc_sector_mismatch_finds_missing_sectors() { + let timestamp = Timestamp::now(); + let snapshot_1 = DhtSnapshot::DiscSectors { + disc_boundary: timestamp, + disc_sector_top_hashes: vec![ + (0, bytes::Bytes::new()), + (1, bytes::Bytes::new()), + ] + .into_iter() + .collect(), + }; + + let snapshot_2 = DhtSnapshot::DiscSectors { + disc_boundary: timestamp, + disc_sector_top_hashes: vec![ + (0, bytes::Bytes::new()), + (2, bytes::Bytes::new()), + ] + .into_iter() + .collect(), + }; + + assert_eq!( + snapshot_1.compare(&snapshot_2), + SnapshotDiff::DiscSectorMismatches(vec![1, 2]) + ); + } + + #[test] + fn disc_sector_mismatch_finds_mismatched_sectors() { + let timestamp = Timestamp::now(); + let snapshot_1 = DhtSnapshot::DiscSectors { + disc_boundary: timestamp, + disc_sector_top_hashes: vec![ + (0, bytes::Bytes::new()), + (1, bytes::Bytes::from_static(&[1])), + ] + .into_iter() + .collect(), + }; + + let snapshot_2 = DhtSnapshot::DiscSectors { + disc_boundary: timestamp, + disc_sector_top_hashes: vec![ + (0, bytes::Bytes::new()), + (1, bytes::Bytes::from_static(&[2])), + ] + .into_iter() + .collect(), + }; + + assert_eq!( + snapshot_1.compare(&snapshot_2), + SnapshotDiff::DiscSectorMismatches(vec![1]) + ); + } + + #[test] + fn disc_sector_details_boundary_mismatch() { + let timestamp = Timestamp::now(); + let snapshot_1 = DhtSnapshot::DiscSectorDetails { + disc_boundary: timestamp, + disc_sector_hashes: HashMap::new(), + }; + + let snapshot_2 = DhtSnapshot::DiscSectorDetails { + disc_boundary: timestamp + Duration::from_secs(1), + disc_sector_hashes: HashMap::new(), + }; + + assert_eq!( + snapshot_1.compare(&snapshot_2), + SnapshotDiff::CannotCompare + ); + } + + #[test] + fn disc_sector_details_mismatch_preserves_missing_sectors() { + let timestamp = Timestamp::now(); + let slices_1 = vec![(0, bytes::Bytes::from_static(&[1]))] + .into_iter() + .collect(); + let snapshot_1 = DhtSnapshot::DiscSectorDetails { + disc_boundary: timestamp, + disc_sector_hashes: vec![(0, HashMap::new()), (1, slices_1)] + .into_iter() + .collect(), + }; + + let slices_2 = vec![(0, bytes::Bytes::from_static(&[7]))] + .into_iter() + .collect(); + let snapshot_2 = DhtSnapshot::DiscSectorDetails { + disc_boundary: timestamp, + disc_sector_hashes: vec![(0, HashMap::new()), (2, slices_2)] + .into_iter() + .collect(), + }; + + assert_eq!( + 
snapshot_1.compare(&snapshot_2), + SnapshotDiff::DiscSectorSliceMismatches( + vec![(1, vec![0]), (2, vec![0])].into_iter().collect() + ) + ); + } + + #[test] + fn disc_sector_details_mismatch_finds_missing_slices() { + let timestamp = Timestamp::now(); + let slices_1 = vec![(10, bytes::Bytes::from_static(&[1]))] + .into_iter() + .collect(); + let snapshot_1 = DhtSnapshot::DiscSectorDetails { + disc_boundary: timestamp, + disc_sector_hashes: vec![(1, slices_1)].into_iter().collect(), + }; + + let slices_2 = vec![(20, bytes::Bytes::from_static(&[7]))] + .into_iter() + .collect(); + let snapshot_2 = DhtSnapshot::DiscSectorDetails { + disc_boundary: timestamp, + disc_sector_hashes: vec![(1, slices_2)].into_iter().collect(), + }; + + assert_eq!( + snapshot_1.compare(&snapshot_2), + SnapshotDiff::DiscSectorSliceMismatches( + vec![(1, vec![10, 20])].into_iter().collect() + ) + ); + } + + #[test] + fn disc_sector_details_mismatch_finds_slice_mismatches() { + let timestamp = Timestamp::now(); + let slices_1 = vec![(10, bytes::Bytes::from_static(&[1]))] + .into_iter() + .collect(); + let snapshot_1 = DhtSnapshot::DiscSectorDetails { + disc_boundary: timestamp, + disc_sector_hashes: vec![(1, slices_1)].into_iter().collect(), + }; + + let slices_2 = vec![(10, bytes::Bytes::from_static(&[7]))] + .into_iter() + .collect(); + let snapshot_2 = DhtSnapshot::DiscSectorDetails { + disc_boundary: timestamp, + disc_sector_hashes: vec![(1, slices_2)].into_iter().collect(), + }; + + assert_eq!( + snapshot_1.compare(&snapshot_2), + SnapshotDiff::DiscSectorSliceMismatches( + vec![(1, vec![10])].into_iter().collect() + ) + ); + } + + #[test] + fn ring_sector_details_boundary_mismatch() { + let timestamp = Timestamp::now(); + let snapshot_1 = DhtSnapshot::RingSectorDetails { + disc_boundary: timestamp, + ring_sector_hashes: HashMap::new(), + }; + + let snapshot_2 = DhtSnapshot::RingSectorDetails { + disc_boundary: timestamp + Duration::from_secs(1), + ring_sector_hashes: HashMap::new(), + }; + + assert_eq!( + snapshot_1.compare(&snapshot_2), + SnapshotDiff::CannotCompare + ); + } + + #[test] + fn ring_sector_details_mismatch_cannot_compare_different_number_of_rings() { + let timestamp = Timestamp::now(); + let snapshot_1 = DhtSnapshot::RingSectorDetails { + disc_boundary: timestamp, + ring_sector_hashes: vec![(0, HashMap::new()), (1, HashMap::new())] + .into_iter() + .collect(), + }; + + let snapshot_2 = DhtSnapshot::RingSectorDetails { + disc_boundary: timestamp, + ring_sector_hashes: vec![(0, HashMap::new())].into_iter().collect(), + }; + + assert_eq!( + snapshot_1.compare(&snapshot_2), + SnapshotDiff::CannotCompare + ); + } + + #[test] + fn ring_sector_details_mismatch_cannot_compare_different_rings() { + let timestamp = Timestamp::now(); + let snapshot_1 = DhtSnapshot::RingSectorDetails { + disc_boundary: timestamp, + ring_sector_hashes: vec![(0, HashMap::new())].into_iter().collect(), + }; + + let snapshot_2 = DhtSnapshot::RingSectorDetails { + disc_boundary: timestamp, + ring_sector_hashes: vec![(1, HashMap::new())].into_iter().collect(), + }; + + assert_eq!( + snapshot_1.compare(&snapshot_2), + SnapshotDiff::CannotCompare + ); + } + + #[test] + fn ring_sector_details_mismatch_detects_sector_mismatches() { + let timestamp = Timestamp::now(); + let snapshot_1 = DhtSnapshot::RingSectorDetails { + disc_boundary: timestamp, + ring_sector_hashes: vec![( + 0, + vec![(0, bytes::Bytes::from_static(&[1]))] + .into_iter() + .collect(), + )] + .into_iter() + .collect(), + }; + + let snapshot_2 = 
DhtSnapshot::RingSectorDetails { + disc_boundary: timestamp, + ring_sector_hashes: vec![( + 0, + vec![(0, bytes::Bytes::from_static(&[5]))] + .into_iter() + .collect(), + )] + .into_iter() + .collect(), + }; + + assert_eq!( + snapshot_1.compare(&snapshot_2), + SnapshotDiff::RingSectorMismatches( + vec![(0, vec![0])].into_iter().collect() + ) + ); + } +} diff --git a/crates/dht/src/dht/tests.rs b/crates/dht/src/dht/tests.rs new file mode 100644 index 00000000..3582de11 --- /dev/null +++ b/crates/dht/src/dht/tests.rs @@ -0,0 +1,587 @@ +mod harness; + +use super::*; +use crate::dht::tests::harness::SyncWithOutcome; +use harness::DhtSyncHarness; +use kitsune2_api::{DhtArc, OpId, OpStore, UNIX_TIMESTAMP}; +use kitsune2_memory::{Kitsune2MemoryOp, Kitsune2MemoryOpStore}; +use std::sync::Arc; +use std::time::Duration; + +const SECTOR_SIZE: u32 = 1u32 << 23; + +#[tokio::test] +async fn from_store_empty() { + let store = Arc::new(Kitsune2MemoryOpStore::default()); + Dht::try_from_store(UNIX_TIMESTAMP, store).await.unwrap(); +} + +#[tokio::test] +async fn take_minimal_snapshot() { + let store = Arc::new(Kitsune2MemoryOpStore::default()); + store + .process_incoming_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(vec![7; 32])), + UNIX_TIMESTAMP, + vec![], + ) + .try_into() + .unwrap()]) + .await + .unwrap(); + + let dht = Dht::try_from_store(Timestamp::now(), store.clone()) + .await + .unwrap(); + + let arc_set = ArcSet::new(SECTOR_SIZE, vec![DhtArc::FULL]).unwrap(); + + let snapshot = dht.snapshot_minimal(&arc_set).await.unwrap(); + match snapshot { + DhtSnapshot::Minimal { + disc_boundary, + disc_top_hash, + ring_top_hashes, + } => { + assert_eq!(dht.partition.full_slice_end_timestamp(), disc_boundary); + assert_eq!(bytes::Bytes::from(vec![7; 32]), disc_top_hash); + assert!(!ring_top_hashes.is_empty()); + } + s => panic!("Unexpected snapshot type: {:?}", s), + } +} + +#[tokio::test] +async fn cannot_take_minimal_snapshot_with_empty_arc_set() { + let current_time = Timestamp::now(); + let dht1 = DhtSyncHarness::new(current_time, DhtArc::Empty).await; + + let err = dht1 + .dht + .snapshot_minimal(&ArcSet::new(SECTOR_SIZE, vec![dht1.arc]).unwrap()) + .await + .unwrap_err(); + assert_eq!("No arcs to snapshot (src: None)", err.to_string()); +} + +#[tokio::test] +async fn cannot_handle_snapshot_with_empty_arc_set() { + let current_time = Timestamp::now(); + let dht1 = DhtSyncHarness::new(current_time, DhtArc::Empty).await; + + // Declare a full arc to get a snapshot + let snapshot = dht1 + .dht + .snapshot_minimal( + &ArcSet::new(SECTOR_SIZE, vec![DhtArc::FULL]).unwrap(), + ) + .await + .unwrap(); + + // Now try to compare that snapshot to ourselves with an empty arc set + let err = dht1 + .dht + .handle_snapshot( + &snapshot, + None, + &ArcSet::new(SECTOR_SIZE, vec![DhtArc::Empty]).unwrap(), + ) + .await + .unwrap_err(); + + assert_eq!("No arcs to snapshot (src: None)", err.to_string()); +} + +#[tokio::test] +async fn empty_dht_is_in_sync_with_empty() { + let current_time = Timestamp::now(); + let dht1 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + let dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + + assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); + assert!(dht2.is_in_sync_with(&dht1).await.unwrap()); +} + +#[tokio::test] +async fn one_way_disc_sync_from_initiator() { + let current_time = Timestamp::now(); + let mut dht1 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + + // 
Put historical data in the first DHT + let op_id = OpId::from(bytes::Bytes::from(vec![41; 32])); + dht1.inject_ops(vec![Kitsune2MemoryOp::new( + op_id.clone(), + UNIX_TIMESTAMP, + vec![], + )]) + .await + .unwrap(); + + // Try a sync + let outcome = dht1.sync_with(&mut dht2).await.unwrap(); + assert!(matches!(outcome, SyncWithOutcome::SyncedDisc)); + + // We shouldn't learn about any ops + assert!(dht1.discovered_ops.is_empty()); + + // The other agent should have learned about the op + assert_eq!(1, dht2.discovered_ops.len()); + assert_eq!(1, dht2.discovered_ops[&dht1.agent_id].len()); + assert_eq!(vec![op_id], dht2.discovered_ops[&dht1.agent_id]); + + assert!(!dht1.is_in_sync_with(&dht2).await.unwrap()); + + // Move any discovered ops between the two DHTs + dht1.apply_op_sync(&mut dht2).await.unwrap(); + + // Now the two DHTs should be in sync + assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); +} + +#[tokio::test] +async fn one_way_disc_sync_from_acceptor() { + let current_time = Timestamp::now(); + let mut dht1 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + + // Put historical data in the second DHT + dht2.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(vec![41; 32])), + UNIX_TIMESTAMP, + vec![], + )]) + .await + .unwrap(); + + // Try a sync initiated by the first agent + let outcome = dht1.sync_with(&mut dht2).await.unwrap(); + assert!(matches!(outcome, SyncWithOutcome::SyncedDisc)); + + // They shouldn't learn about any ops + assert!(dht2.discovered_ops.is_empty()); + + // We should have learned about the op + assert_eq!(1, dht1.discovered_ops.len()); + assert_eq!(1, dht1.discovered_ops[&dht2.agent_id].len()); + + // Move any discovered ops between the two DHTs + dht1.apply_op_sync(&mut dht2).await.unwrap(); + + // Now the two DHTs should be in sync + assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); +} + +#[tokio::test] +async fn two_way_disc_sync() { + let current_time = Timestamp::now(); + let mut dht1 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + + // Put historical data in both DHTs + dht1.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(vec![7; 32])), + UNIX_TIMESTAMP, + vec![], + )]) + .await + .unwrap(); + dht2.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(vec![43; 32])), + // Two weeks later + UNIX_TIMESTAMP + Duration::from_secs(14 * 24 * 60 * 60), + vec![], + )]) + .await + .unwrap(); + + // Try a sync initiated by the first agent + let outcome = dht1.sync_with(&mut dht2).await.unwrap(); + assert!(matches!(outcome, SyncWithOutcome::SyncedDisc)); + + // Each learns about one op + assert_eq!(1, dht1.discovered_ops.len()); + assert_eq!(1, dht1.discovered_ops[&dht2.agent_id].len()); + assert_eq!(1, dht2.discovered_ops.len()); + assert_eq!(1, dht2.discovered_ops[&dht1.agent_id].len()); + + // Move any discovered ops between the two DHTs + dht1.apply_op_sync(&mut dht2).await.unwrap(); + + // Now the two DHTs should be in sync + assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); +} + +#[tokio::test] +async fn one_way_ring_sync_from_initiator() { + let current_time = Timestamp::now(); + let mut dht1 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + + // Put recent data in the first ring of the first DHT + dht1.inject_ops(vec![Kitsune2MemoryOp::new( + 
OpId::from(bytes::Bytes::from(vec![41; 32])), + dht1.dht.partition.full_slice_end_timestamp(), + vec![], + )]) + .await + .unwrap(); + + // Try a sync + let outcome = dht1.sync_with(&mut dht2).await.unwrap(); + assert!(matches!(outcome, SyncWithOutcome::SyncedRings)); + + // We shouldn't learn about any ops + assert!(dht1.discovered_ops.is_empty()); + + // The other agent should have learned about the op + assert_eq!(1, dht2.discovered_ops.len()); + assert_eq!(1, dht2.discovered_ops[&dht1.agent_id].len()); + + // Move any discovered ops between the two DHTs + dht1.apply_op_sync(&mut dht2).await.unwrap(); + + // Now the two DHTs should be in sync + assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); +} + +#[tokio::test] +async fn one_way_ring_sync_from_acceptor() { + let current_time = Timestamp::now(); + let mut dht1 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + + // Put recent data in the first ring of the second DHT + dht2.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(vec![41; 32])), + dht1.dht.partition.full_slice_end_timestamp(), + vec![], + )]) + .await + .unwrap(); + + // Try a sync initiated by the first agent + let outcome = dht1.sync_with(&mut dht2).await.unwrap(); + assert!(matches!(outcome, SyncWithOutcome::SyncedRings)); + + // They shouldn't learn about any ops + assert!(dht2.discovered_ops.is_empty()); + + // We should have learned about the op + assert_eq!(1, dht1.discovered_ops.len()); + assert_eq!(1, dht1.discovered_ops[&dht2.agent_id].len()); + + // Move any discovered ops between the two DHTs + dht1.apply_op_sync(&mut dht2).await.unwrap(); + + // Now the two DHTs should be in sync + assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); +} + +#[tokio::test] +async fn two_way_ring_sync() { + let current_time = Timestamp::now(); + let mut dht1 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + + // Put recent data in the first ring of both DHTs + dht1.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(vec![7; 32])), + dht1.dht.partition.full_slice_end_timestamp(), + vec![], + )]) + .await + .unwrap(); + dht2.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(vec![43; 32])), + // Two weeks later + dht1.dht.partition.full_slice_end_timestamp(), + vec![], + )]) + .await + .unwrap(); + + // Try a sync initiated by the first agent + let outcome = dht1.sync_with(&mut dht2).await.unwrap(); + assert!(matches!(outcome, SyncWithOutcome::SyncedRings)); + + // Each learns about one op + assert_eq!(1, dht1.discovered_ops.len()); + assert_eq!(1, dht1.discovered_ops[&dht2.agent_id].len()); + assert_eq!(1, dht2.discovered_ops.len()); + assert_eq!(1, dht2.discovered_ops[&dht1.agent_id].len()); + + // Move any discovered ops between the two DHTs + dht1.apply_op_sync(&mut dht2).await.unwrap(); + + // Now the two DHTs should be in sync + assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); +} + +#[tokio::test] +async fn ring_sync_with_matching_disc() { + let current_time = Timestamp::now(); + let mut dht1 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + + // Put historical data in both DHTs + let historical_ops = vec![ + Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(vec![7; 4])), + UNIX_TIMESTAMP, + vec![], + ), + Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from( + 
(u32::MAX / 2).to_le_bytes().to_vec(), + )), + UNIX_TIMESTAMP + Duration::from_secs(14 * 24 * 60 * 60), + vec![], + ), + ]; + dht1.inject_ops(historical_ops.clone()).await.unwrap(); + dht2.inject_ops(historical_ops).await.unwrap(); + + // Put recent data in the first ring of both DHTs + dht1.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(vec![7; 4])), + dht1.dht.partition.full_slice_end_timestamp(), + vec![], + )]) + .await + .unwrap(); + + dht2.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(vec![13; 4])), + dht1.dht.partition.full_slice_end_timestamp(), + vec![], + )]) + .await + .unwrap(); + + // Try a sync initiated by the first agent + let outcome = dht1.sync_with(&mut dht2).await.unwrap(); + assert!(matches!(outcome, SyncWithOutcome::SyncedRings)); + + // Each learns about one op + assert_eq!(1, dht1.discovered_ops.len()); + assert_eq!(1, dht1.discovered_ops[&dht2.agent_id].len()); + assert_eq!(1, dht2.discovered_ops.len()); + assert_eq!(1, dht2.discovered_ops[&dht1.agent_id].len()); + + // Move any discovered ops between the two DHTs + dht1.apply_op_sync(&mut dht2).await.unwrap(); + + // Now the two DHTs should be in sync + assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); +} + +#[tokio::test] +async fn two_stage_sync_with_symmetry() { + let current_time = Timestamp::now(); + let mut dht1 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; + + // Put mismatched historical data in both DHTs + dht1.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(vec![7; 32])), + UNIX_TIMESTAMP, + vec![], + )]) + .await + .unwrap(); + dht2.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(vec![13; 32])), + UNIX_TIMESTAMP, + vec![], + )]) + .await + .unwrap(); + + // Put mismatched recent data in the first ring of both DHTs + dht1.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(vec![11; 32])), + dht1.dht.partition.full_slice_end_timestamp(), + vec![], + )]) + .await + .unwrap(); + dht2.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(vec![17; 32])), + dht1.dht.partition.full_slice_end_timestamp(), + vec![], + )]) + .await + .unwrap(); + + // Try a sync initiated by the first agent + let outcome = dht1.sync_with(&mut dht2).await.unwrap(); + assert!(matches!(outcome, SyncWithOutcome::SyncedDisc)); + + let learned1 = dht1.discovered_ops.clone(); + let learned2 = dht2.discovered_ops.clone(); + dht1.discovered_ops.clear(); + dht2.discovered_ops.clear(); + + // Try a sync initiated by the second agent + let outcome = dht2.sync_with(&mut dht1).await.unwrap(); + assert!(matches!(outcome, SyncWithOutcome::SyncedDisc)); + + // The outcome should be identical, regardless of who initiated the sync + assert_eq!(learned1, dht1.discovered_ops); + assert_eq!(learned2, dht2.discovered_ops); + + // Move any discovered ops between the two DHTs + dht1.apply_op_sync(&mut dht2).await.unwrap(); + + // That's the disc sync done, now we need to check the rings + let outcome = dht1.sync_with(&mut dht2).await.unwrap(); + assert!(matches!(outcome, SyncWithOutcome::SyncedRings)); + + let learned1 = dht1.discovered_ops.clone(); + let learned2 = dht2.discovered_ops.clone(); + dht1.discovered_ops.clear(); + dht2.discovered_ops.clear(); + + // Try a sync initiated by the second agent + let outcome = dht2.sync_with(&mut dht1).await.unwrap(); + assert!(matches!(outcome, SyncWithOutcome::SyncedRings)); + + // The outcome 
should be identical, regardless of who initiated the sync + assert_eq!(learned1, dht1.discovered_ops); + assert_eq!(learned2, dht2.discovered_ops); + + // Move any discovered ops between the two DHTs + dht1.apply_op_sync(&mut dht2).await.unwrap(); + + // Now the two DHTs should be in sync + assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); +} + +#[tokio::test] +async fn disc_sync_respects_arc() { + let current_time = Timestamp::now(); + let mut dht1 = + DhtSyncHarness::new(current_time, DhtArc::Arc(0, 3 * SECTOR_SIZE - 1)) + .await; + let mut dht2 = DhtSyncHarness::new( + current_time, + DhtArc::Arc(2 * SECTOR_SIZE, 4 * SECTOR_SIZE - 1), + ) + .await; + + // Put mismatched historical data in both DHTs, but in sectors that don't overlap + dht1.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(SECTOR_SIZE.to_le_bytes().to_vec())), + UNIX_TIMESTAMP, + vec![], + )]) + .await + .unwrap(); + dht2.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from( + (3 * SECTOR_SIZE).to_le_bytes().to_vec(), + )), + UNIX_TIMESTAMP, + vec![], + )]) + .await + .unwrap(); + + // At this point, the DHTs should be in sync, because the mismatch is not in common sectors + assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); + + // Now put mismatched data in the common sector + dht1.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from( + (2 * SECTOR_SIZE).to_le_bytes().to_vec(), + )), + UNIX_TIMESTAMP, + vec![], + )]) + .await + .unwrap(); + dht2.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from( + (2 * SECTOR_SIZE + 1).to_le_bytes().to_vec(), + )), + UNIX_TIMESTAMP, + vec![], + )]) + .await + .unwrap(); + + // Try to sync the DHTs + let outcome = dht1.sync_with(&mut dht2).await.unwrap(); + assert!(matches!(outcome, SyncWithOutcome::SyncedDisc)); + + // Sync the ops + dht1.apply_op_sync(&mut dht2).await.unwrap(); + + // Now the DHTs should be in sync + assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); +} + +#[tokio::test] +async fn ring_sync_respects_arc() { + let current_time = Timestamp::now(); + let mut dht1 = + DhtSyncHarness::new(current_time, DhtArc::Arc(0, 3 * SECTOR_SIZE - 1)) + .await; + let mut dht2 = DhtSyncHarness::new( + current_time, + DhtArc::Arc(2 * SECTOR_SIZE, 4 * SECTOR_SIZE - 1), + ) + .await; + + // Put mismatched recent data in both DHTs, but in sectors that don't overlap + dht1.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from(SECTOR_SIZE.to_le_bytes().to_vec())), + dht1.dht.partition.full_slice_end_timestamp(), + vec![], + )]) + .await + .unwrap(); + dht2.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from( + (3 * SECTOR_SIZE).to_le_bytes().to_vec(), + )), + dht1.dht.partition.full_slice_end_timestamp(), + vec![], + )]) + .await + .unwrap(); + + // At this point, the DHTs should be in sync, because the mismatch is not in common sectors + assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); + + // Now put mismatched data in the common sector + dht1.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from( + (2 * SECTOR_SIZE).to_le_bytes().to_vec(), + )), + dht1.dht.partition.full_slice_end_timestamp(), + vec![], + )]) + .await + .unwrap(); + dht2.inject_ops(vec![Kitsune2MemoryOp::new( + OpId::from(bytes::Bytes::from( + (2 * SECTOR_SIZE + 1).to_le_bytes().to_vec(), + )), + dht1.dht.partition.full_slice_end_timestamp(), + vec![], + )]) + .await + .unwrap(); + + // Try to sync the DHTs + let outcome = dht1.sync_with(&mut dht2).await.unwrap(); + 
assert!(matches!(outcome, SyncWithOutcome::SyncedRings)); + + // Sync the ops + dht1.apply_op_sync(&mut dht2).await.unwrap(); + + // Now the DHTs should be in sync + assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); +} diff --git a/crates/dht/src/dht/tests/harness.rs b/crates/dht/src/dht/tests/harness.rs new file mode 100644 index 00000000..ec504716 --- /dev/null +++ b/crates/dht/src/dht/tests/harness.rs @@ -0,0 +1,388 @@ +use crate::arc_set::ArcSet; +use crate::dht::snapshot::DhtSnapshot; +use crate::dht::tests::SECTOR_SIZE; +use crate::{Dht, DhtSnapshotNextAction}; +use kitsune2_api::{ + AgentId, DhtArc, DynOpStore, K2Result, OpId, OpStore, StoredOp, Timestamp, +}; +use kitsune2_memory::{Kitsune2MemoryOp, Kitsune2MemoryOpStore}; +use rand::RngCore; +use std::collections::HashMap; +use std::sync::Arc; + +/// Intended to represent a single agent in a network, which knows how to sync with +/// some other agent. +pub(crate) struct DhtSyncHarness { + pub(crate) store: Arc, + pub(crate) dht: Dht, + pub(crate) arc: DhtArc, + pub(crate) agent_id: AgentId, + pub(crate) discovered_ops: HashMap>, +} + +pub(crate) enum SyncWithOutcome { + InSync, + CannotCompare, + SyncedDisc, + SyncedRings, + DiscoveredInSync, +} + +impl DhtSyncHarness { + pub(crate) async fn new(current_time: Timestamp, arc: DhtArc) -> Self { + let store = Arc::new(Kitsune2MemoryOpStore::default()); + let dht = Dht::try_from_store(current_time, store.clone()) + .await + .unwrap(); + + let mut bytes = [0; 32]; + rand::thread_rng().fill_bytes(&mut bytes); + let agent_id = AgentId::from(bytes::Bytes::copy_from_slice(&bytes)); + + Self { + store, + dht, + arc, + agent_id, + discovered_ops: HashMap::new(), + } + } + + pub(crate) async fn inject_ops( + &mut self, + op_list: Vec, + ) -> K2Result<()> { + self.store + .process_incoming_ops( + op_list + .iter() + .map(|op| op.clone().try_into().unwrap()) + .collect(), + ) + .await?; + + self.dht + .inform_ops_stored( + op_list.into_iter().map(|op| op.into()).collect(), + ) + .await?; + + Ok(()) + } + + pub(crate) async fn apply_op_sync( + &mut self, + other: &mut Self, + ) -> K2Result<()> { + // Sync from other to self, using op IDs we've discovered from other + if let Some(ops) = self + .discovered_ops + .get_mut(&other.agent_id) + .map(std::mem::take) + { + transfer_ops( + other.store.clone(), + self.store.clone(), + &mut self.dht, + ops, + ) + .await?; + } + + // Sync from self to other, using op that the other has discovered from us + if let Some(ops) = other + .discovered_ops + .get_mut(&self.agent_id) + .map(std::mem::take) + { + transfer_ops( + self.store.clone(), + other.store.clone(), + &mut other.dht, + ops, + ) + .await?; + } + + Ok(()) + } + + pub(crate) async fn is_in_sync_with(&self, other: &Self) -> K2Result { + let arc_set_1 = ArcSet::new(SECTOR_SIZE, vec![self.arc])?; + let arc_set_2 = ArcSet::new(SECTOR_SIZE, vec![other.arc])?; + let arc_set = arc_set_1.intersection(&arc_set_2); + let initial_snapshot = self.dht.snapshot_minimal(&arc_set).await?; + match other + .dht + .handle_snapshot(&initial_snapshot, None, &arc_set) + .await? 
+ { + DhtSnapshotNextAction::Identical => Ok(true), + _ => Ok(false), + } + } + + pub(crate) async fn sync_with( + &mut self, + other: &mut Self, + ) -> K2Result { + let arc_set_1 = ArcSet::new(SECTOR_SIZE, vec![self.arc])?; + let arc_set_2 = ArcSet::new(SECTOR_SIZE, vec![other.arc])?; + let arc_set = arc_set_1.intersection(&arc_set_2); + + // Create the initial snapshot locally + let initial_snapshot = self.dht.snapshot_minimal(&arc_set).await?; + + // Send it to the other agent and have them diff against it + let outcome = other + .dht + .handle_snapshot(&initial_snapshot, None, &arc_set) + .await?; + + match outcome { + DhtSnapshotNextAction::Identical => { + // Nothing to do, the agents are in sync + Ok(SyncWithOutcome::InSync) + } + DhtSnapshotNextAction::CannotCompare => { + // Permit this for testing purposes, it would be treated as an error in + // real usage + Ok(SyncWithOutcome::CannotCompare) + } + DhtSnapshotNextAction::NewSnapshot(new_snapshot) => { + match new_snapshot { + DhtSnapshot::Minimal { .. } => { + panic!("A minimal snapshot cannot be produced from a minimal snapshot"); + } + DhtSnapshot::DiscSectors { .. } => { + // This means there's a historical mismatch, so we need to continue + // down this path another step. Do that in another function! + self.sync_disc_with(other, &arc_set, new_snapshot).await + } + DhtSnapshot::DiscSectorDetails { .. } => { + panic!("A sector details snapshot cannot be produced from a minimal snapshot"); + } + DhtSnapshot::RingSectorDetails { .. } => { + // This means there's a recent mismatch in the partial time slices. + // Similar to above, continue in another function. + self.sync_rings_with(other, &arc_set, new_snapshot) + .await + } + } + } + DhtSnapshotNextAction::NewSnapshotAndHashList(_, _) + | DhtSnapshotNextAction::HashList(_) => { + panic!( + "A hash list cannot be produced from a minimal snapshot" + ); + } + } + } + + async fn sync_disc_with( + &mut self, + other: &mut Self, + arc_set: &ArcSet, + snapshot: DhtSnapshot, + ) -> K2Result { + match snapshot { + DhtSnapshot::DiscSectors { .. } => {} + _ => panic!("Expected a DiscSectors snapshot"), + } + + // We expect the sync to have been initiated by self, so the disc snapshot should be + // coming back to us + let outcome = + self.dht.handle_snapshot(&snapshot, None, arc_set).await?; + + let our_details_snapshot = match outcome { + DhtSnapshotNextAction::NewSnapshot(new_snapshot) => new_snapshot, + DhtSnapshotNextAction::Identical => { + // This can't happen in tests but in a real implementation it's possible that + // missing ops might show up while we're syncing so this isn't an error, just + // a shortcut and we can stop syncing + return Ok(SyncWithOutcome::DiscoveredInSync); + } + DhtSnapshotNextAction::NewSnapshotAndHashList(_, _) + | DhtSnapshotNextAction::HashList(_) + | DhtSnapshotNextAction::CannotCompare => { + // A real implementation would treat these as errors because they are logic + // errors + panic!("Unexpected outcome: {:?}", outcome); + } + }; + + // At this point, the snapshot must be a disc details snapshot + match our_details_snapshot { + DhtSnapshot::DiscSectorDetails { .. 
} => {} + _ => panic!("Expected a DiscSectorDetails snapshot"), + } + + // Now we need to ask the other agent to diff against this details snapshot + let outcome = other + .dht + .handle_snapshot(&our_details_snapshot, None, arc_set) + .await?; + + let (snapshot, hash_list_from_other) = match outcome { + DhtSnapshotNextAction::NewSnapshotAndHashList( + new_snapshot, + hash_list, + ) => (new_snapshot, hash_list), + DhtSnapshotNextAction::Identical => { + // Nothing to do, the agents are in sync + return Ok(SyncWithOutcome::InSync); + } + DhtSnapshotNextAction::NewSnapshot(_) + | DhtSnapshotNextAction::HashList(_) + | DhtSnapshotNextAction::CannotCompare => { + // A real implementation would treat these as errors because they are logic + // errors + panic!("Unexpected outcome: {:?}", outcome); + } + }; + + // This snapshot must also be a disc details snapshot + match snapshot { + DhtSnapshot::DiscSectorDetails { .. } => {} + _ => panic!("Expected a DiscSectorDetails snapshot"), + } + + // Finally, we need to receive the details snapshot from the other agent and send them + // back our ops + let outcome = self + .dht + .handle_snapshot(&snapshot, Some(our_details_snapshot), arc_set) + .await?; + + let hash_list_from_self = match outcome { + DhtSnapshotNextAction::HashList(hash_list) => hash_list, + DhtSnapshotNextAction::Identical => { + // Nothing to do, the agents are in sync + return Ok(SyncWithOutcome::InSync); + } + DhtSnapshotNextAction::NewSnapshot(_) + | DhtSnapshotNextAction::NewSnapshotAndHashList(_, _) + | DhtSnapshotNextAction::CannotCompare => { + // A real implementation would treat these as errors because they are logic + // errors + panic!("Unexpected outcome: {:?}", outcome); + } + }; + + // Capture the discovered ops, but don't actually transfer them yet. + // Let the test decide when to do that. + if !hash_list_from_other.is_empty() { + self.discovered_ops + .entry(other.agent_id.clone()) + .or_default() + .extend(hash_list_from_other); + } + if !hash_list_from_self.is_empty() { + other + .discovered_ops + .entry(self.agent_id.clone()) + .or_default() + .extend(hash_list_from_self); + } + + Ok(SyncWithOutcome::SyncedDisc) + } + + async fn sync_rings_with( + &mut self, + other: &mut Self, + arc_set: &ArcSet, + other_details_snapshot: DhtSnapshot, + ) -> K2Result { + match other_details_snapshot { + DhtSnapshot::RingSectorDetails { .. } => {} + _ => panic!("Expected a RingSectorDetails snapshot"), + } + + // We expect the sync to have been initiated by self, so the ring sector details should + // have been sent to us + let outcome = self + .dht + .handle_snapshot(&other_details_snapshot, None, arc_set) + .await?; + + let (snapshot, hash_list_from_self) = match outcome { + DhtSnapshotNextAction::Identical => { + // Nothing to do, the agents are in sync + return Ok(SyncWithOutcome::InSync); + } + DhtSnapshotNextAction::NewSnapshotAndHashList( + new_snapshot, + hash_list, + ) => (new_snapshot, hash_list), + DhtSnapshotNextAction::CannotCompare + | DhtSnapshotNextAction::HashList(_) + | DhtSnapshotNextAction::NewSnapshot(_) => { + panic!("Unexpected outcome: {:?}", outcome); + } + }; + + // This snapshot must also be a ring sector details snapshot + match snapshot { + DhtSnapshot::RingSectorDetails { .. 
} => {} + _ => panic!("Expected a RingSectorDetails snapshot"), + } + + // Finally, we need to send the ring sector details back to the other agent so they can + // produce a hash list for us + let outcome = other + .dht + .handle_snapshot(&snapshot, Some(other_details_snapshot), arc_set) + .await?; + + let hash_list_from_other = match outcome { + DhtSnapshotNextAction::Identical => { + // Nothing to do, the agents are in sync + return Ok(SyncWithOutcome::InSync); + } + DhtSnapshotNextAction::HashList(hash_list) => hash_list, + DhtSnapshotNextAction::CannotCompare + | DhtSnapshotNextAction::NewSnapshotAndHashList(_, _) + | DhtSnapshotNextAction::NewSnapshot(_) => { + panic!("Unexpected outcome: {:?}", outcome); + } + }; + + // Capture the discovered ops, but don't actually transfer them yet. + // Let the test decide when to do that. + if !hash_list_from_other.is_empty() { + self.discovered_ops + .entry(other.agent_id.clone()) + .or_default() + .extend(hash_list_from_other); + } + if !hash_list_from_self.is_empty() { + other + .discovered_ops + .entry(self.agent_id.clone()) + .or_default() + .extend(hash_list_from_self); + } + + Ok(SyncWithOutcome::SyncedRings) + } +} + +async fn transfer_ops( + source: DynOpStore, + target: DynOpStore, + target_dht: &mut Dht, + requested_ops: Vec, +) -> K2Result<()> { + let selected = source.retrieve_ops(requested_ops).await?; + target.process_incoming_ops(selected.clone()).await?; + + let stored_ops = selected + .into_iter() + .map(|op| Kitsune2MemoryOp::try_from(op).unwrap().into()) + .collect::>(); + target_dht.inform_ops_stored(stored_ops).await?; + + Ok(()) +} diff --git a/crates/dht/src/hash.rs b/crates/dht/src/hash.rs index dd7fb9f8..3f14549e 100644 --- a/crates/dht/src/hash.rs +++ b/crates/dht/src/hash.rs @@ -60,8 +60,12 @@ //! served by a node is not the only storage it may require. It is just a lower bound on the //! free space that a node will need to have available. +use crate::arc_set::ArcSet; +use crate::combine::combine_hashes; use crate::PartitionedTime; -use kitsune2_api::{DhtArc, DynOpStore, K2Result, StoredOp, Timestamp}; +use kitsune2_api::{ + DhtArc, DynOpStore, K2Error, K2Result, StoredOp, Timestamp, +}; use std::collections::HashMap; /// A partitioned hash structure. @@ -79,6 +83,9 @@ pub struct PartitionedHashes { partitioned_hashes: Vec, } +pub(crate) type PartialTimeSliceDetails = + HashMap>; + impl PartitionedHashes { /// Create a new partitioned hash structure. /// @@ -190,6 +197,244 @@ impl PartitionedHashes { Ok(()) } + + /// For a given sector index, return the DHT arc that the sector is responsible for. + /// + /// This is actually stored on the [PartitionedTime] structure, so this function must find the + /// relevant [PartitionedTime] structure and then return the arc constraint from that. + pub(crate) fn dht_arc_for_sector_index( + &self, + sector_index: u32, + ) -> K2Result { + let sector_index = sector_index as usize; + if sector_index >= self.partitioned_hashes.len() { + return Err(K2Error::other("Sector index out of bounds")); + } + + Ok(*self.partitioned_hashes[sector_index].arc_constraint()) + } + + /// Get the time bounds for a full slice index. + /// + /// This is actually stored on the [PartitionedTime] structure, so this function must find the + /// relevant [PartitionedTime] structure and then return the time bounds from that. 
+ pub(crate) fn time_bounds_for_full_slice_index( + &self, + slice_index: u64, + ) -> K2Result<(Timestamp, Timestamp)> { + self.partitioned_hashes[0].time_bounds_for_full_slice_index(slice_index) + } + + /// Get the time bounds for a partial slice index. + /// + /// This is actually stored on the [PartitionedTime] structure, so this function must find the + /// relevant [PartitionedTime] structure and then return the time bounds from that. + pub(crate) fn time_bounds_for_partial_slice_index( + &self, + slice_index: u32, + ) -> K2Result<(Timestamp, Timestamp)> { + self.partitioned_hashes[0] + .time_bounds_for_partial_slice_index(slice_index) + } +} + +// Query implementation +impl PartitionedHashes { + /// Compute the disc top hash for the given arc set. + /// + /// Considering the hash space as a circle, with time represented outwards from the center in + /// each sector. This function requests the top hash of each sector, over full time slices, and + /// then combines them into a single hash. It works around the circle from 0 and skips any + /// sectors that are not included in the arc set. + /// + /// If there are no sectors included in the arc set, then an empty hash is returned. + /// + /// Along with the disc top hash, the end timestamp of the last full time slice is returned. + /// This should be used when comparing disc top hash of one DHT model with that of another node + /// to ensure that both nodes are using a common reference point. + pub(crate) async fn disc_top_hash( + &self, + arc_set: &ArcSet, + store: DynOpStore, + ) -> K2Result<(bytes::Bytes, Timestamp)> { + let mut combined = bytes::BytesMut::new(); + for (sector_index, sector) in self.partitioned_hashes.iter().enumerate() + { + if !arc_set.includes_sector_index(sector_index as u32) { + continue; + } + + let hash = sector.full_time_slice_top_hash(store.clone()).await?; + if !hash.is_empty() { + combine_hashes(&mut combined, hash); + } + } + + let timestamp = self.partitioned_hashes[0].full_slice_end_timestamp(); + + Ok((combined.freeze(), timestamp)) + } + + /// Computes a top hash over the sector hashes for each partial time slice. + /// + /// Retrieves the partial slice combined hashes for each sector in the arc set. It then combines + /// the hashes for each partial time slice, working around the circle from 0. + /// + /// Note that this function does not return a disc boundary. This means it MUST be used with + /// [PartitionedHashes::disc_top_hash] to ensure that the result from this function can be + /// compared. + pub(crate) fn ring_top_hashes( + &self, + arc_set: &ArcSet, + ) -> Vec { + let mut partials = Vec::with_capacity(arc_set.covered_sector_count()); + + for (sector_index, sector) in self.partitioned_hashes.iter().enumerate() + { + if !arc_set.includes_sector_index(sector_index as u32) { + continue; + } + + partials.push(sector.partial_slice_combined_hashes().peekable()); + } + + let mut out = Vec::new(); + let mut combined = bytes::BytesMut::new(); + while partials[0].peek().is_some() { + combined.clear(); + for partial in &mut partials { + if let Some(hash) = partial.next() { + if !hash.is_empty() { + combine_hashes(&mut combined, hash); + } + } + } + out.push(combined.clone().freeze()); + } + + out + } + + /// Compute the disc sector hashes for the given arc set. + /// + /// This function does a similar job to [PartitionedHashes::disc_top_hash] but, it does not + /// combine the sector hashes. Instead, any sector that has a non-empty hash is returned in the + /// hash set. 
+ /// + /// Along with the sector hashes, the end timestamp of the last full time slice is returned. + /// This should be used when comparing sector hashes of one DHT model with that of another node + /// to ensure that both nodes are using a common reference point. + pub(crate) async fn disc_sector_hashes( + &self, + arc_set: &ArcSet, + store: DynOpStore, + ) -> K2Result<(HashMap, Timestamp)> { + let mut out = HashMap::new(); + for (sector_index, sector) in self.partitioned_hashes.iter().enumerate() + { + if !arc_set.includes_sector_index(sector_index as u32) { + continue; + } + + let hash = sector.full_time_slice_top_hash(store.clone()).await?; + if !hash.is_empty() { + out.insert(sector_index as u32, hash); + } + } + + let timestamp = self.partitioned_hashes[0].full_slice_end_timestamp(); + + Ok((out, timestamp)) + } + + /// Compute the disc sector details for the given arc set. + /// + /// Does a similar job to [PartitionedHashes::disc_sector_hashes] but, it returns the full time + /// slice combined hashes for each sector that is both in the arc set and in the + /// `sector_indices` input. + /// + /// Along with the sector detail hashes, the end timestamp of the last full time slice is + /// returned. This should be used when comparing sector details hashes of one DHT model with + /// that of another node to ensure that both nodes are using a common reference point. + pub(crate) async fn disc_sector_sector_details( + &self, + sector_indices: Vec, + arc_set: &ArcSet, + store: DynOpStore, + ) -> K2Result<(HashMap>, Timestamp)> { + let sectors_indices = sector_indices + .into_iter() + .collect::>(); + + let mut out = HashMap::new(); + + for (sector_index, sector) in self.partitioned_hashes.iter().enumerate() + { + if !arc_set.includes_sector_index(sector_index as u32) + || !sectors_indices.contains(&(sector_index as u32)) + { + continue; + } + + out.insert( + sector_index as u32, + sector + .full_time_slice_hashes(store.clone()) + .await? + .into_iter() + .collect(), + ); + } + + let timestamp = self.partitioned_hashes[0].full_slice_end_timestamp(); + + Ok((out, timestamp)) + } + + /// Compute the ring details for the given arc set. + /// + /// Does a similar job to [PartitionedHashes::ring_top_hashes] but, it returns the partial time + /// slice combined hashes for each sector that is both in the arc set and in the `ring_indices`. + /// + /// Along with the ring details hashes, the end timestamp of the last full time slice is + /// returned. This should be used when comparing ring details hashes of one DHT model with + /// that of another node to ensure that both nodes are using a common reference point. + pub(crate) fn ring_details( + &self, + ring_indices: Vec, + arc_set: &ArcSet, + ) -> K2Result<(PartialTimeSliceDetails, Timestamp)> { + let mut out = HashMap::new(); + + for (sector_index, sector) in self.partitioned_hashes.iter().enumerate() + { + if !arc_set.includes_sector_index(sector_index as u32) { + continue; + } + + for ring_index in &ring_indices { + let hash = sector.partial_slice_hash(*ring_index)?; + + // Important to capture that the ring didn't match even if the hash is empty and + // therefore we won't communicate this sector. 
+ let entry = out.entry(*ring_index).or_insert_with(HashMap::new); + if !hash.is_empty() { + entry.insert(sector_index as u32, hash); + } + } + } + + let timestamp = self.partitioned_hashes[0].full_slice_end_timestamp(); + + Ok((out, timestamp)) + } +} + +#[cfg(test)] +impl PartitionedHashes { + pub fn full_slice_end_timestamp(&self) -> Timestamp { + self.partitioned_hashes[0].full_slice_end_timestamp() + } } #[cfg(test)] @@ -296,7 +541,7 @@ mod tests { .slice_hash_count(DhtArc::Arc(ph.size, 2 * ph.size - 1)) .await .unwrap(); - // Note that this is because we've stored at id 1, not that two hashes ended up in this + // Note that this is because we've stored at index 1, not that two hashes ended up in this // partition. assert_eq!(2, count); diff --git a/crates/dht/src/lib.rs b/crates/dht/src/lib.rs index 9c9eb9b0..b0e37db5 100644 --- a/crates/dht/src/lib.rs +++ b/crates/dht/src/lib.rs @@ -1,8 +1,16 @@ -pub mod constant; -pub use constant::*; +#![deny(missing_docs)] +//! A distributed hash table (DHT) implementation for use in Kitsune2. +pub mod arc_set; +pub mod constant; +pub mod dht; pub mod hash; -pub use hash::*; - pub mod time; + +mod combine; + +pub use arc_set::*; +pub use constant::*; +pub use dht::*; +pub use hash::*; pub use time::*; diff --git a/crates/dht/src/time.rs b/crates/dht/src/time.rs index 352e420b..09561129 100644 --- a/crates/dht/src/time.rs +++ b/crates/dht/src/time.rs @@ -53,13 +53,14 @@ //! comparing time slices with another peer. A higher factor allocates more recent time and fewer //! slices to be stored but is less granular when comparing time slices with another peer. +use crate::combine; use crate::constant::UNIT_TIME; use kitsune2_api::{ - DhtArc, DynOpStore, K2Error, K2Result, OpId, StoredOp, Timestamp, - UNIX_TIMESTAMP, + DhtArc, DynOpStore, K2Error, K2Result, StoredOp, Timestamp, UNIX_TIMESTAMP, }; use std::time::Duration; +/// The partitioned time structure. #[derive(Debug)] #[cfg_attr(test, derive(Clone, PartialEq))] pub struct PartitionedTime { @@ -101,6 +102,10 @@ pub struct PartitionedTime { arc_constraint: DhtArc, } +/// A slice of recent time that has a combined hash of all the ops in that time slice. +/// +/// This is used to represent a slice of recent time that is not yet ready to be stored as a full +/// time slice. #[derive(Debug)] #[cfg_attr(test, derive(Clone, PartialEq))] pub struct PartialSlice { @@ -239,22 +244,25 @@ impl PartitionedTime { // here. tracing::info!("Historical update detected. 
Seeing many of these places load on our system, but it is expected if we've been offline or a network partition has been resolved."); - let slice_id = op.timestamp.as_micros() + let slice_index = op.timestamp.as_micros() / (self.full_slice_duration.as_micros() as i64); let current_hash = store - .retrieve_slice_hash(self.arc_constraint, slice_id as u64) + .retrieve_slice_hash( + self.arc_constraint, + slice_index as u64, + ) .await?; match current_hash { Some(hash) => { let mut hash = bytes::BytesMut::from(hash); // Combine the stored hash with the new op hash - combine_hashes(&mut hash, op.op_id.0 .0); + combine::combine_hashes(&mut hash, op.op_id.0 .0); // and store the new value store .store_slice_hash( self.arc_constraint, - slice_id as u64, + slice_index as u64, hash.freeze(), ) .await?; @@ -264,7 +272,7 @@ impl PartitionedTime { store .store_slice_hash( self.arc_constraint, - slice_id as u64, + slice_index as u64, op.op_id.0 .0, ) .await?; @@ -297,7 +305,10 @@ impl PartitionedTime { // bound of the partials to find out which partial slice this op belongs to. for partial in self.partial_slices.iter_mut().rev() { if op.timestamp >= partial.start { - combine_hashes(&mut partial.hash, op.op_id.0 .0); + combine::combine_hashes( + &mut partial.hash, + op.op_id.0 .0, + ); // Belongs in exactly one partial, stop after finding the right one. break; @@ -308,6 +319,108 @@ impl PartitionedTime { Ok(()) } + + pub(crate) fn time_bounds_for_full_slice_index( + &self, + slice_index: u64, + ) -> K2Result<(Timestamp, Timestamp)> { + if slice_index > self.full_slices { + return Err(K2Error::other( + "Requested slice index is beyond the current full slices", + )); + } + + let start = UNIX_TIMESTAMP + + Duration::from_secs( + slice_index * self.full_slice_duration.as_secs(), + ); + let end = start + self.full_slice_duration; + + Ok((start, end)) + } + + pub(crate) fn time_bounds_for_partial_slice_index( + &self, + slice_index: u32, + ) -> K2Result<(Timestamp, Timestamp)> { + let slice_index = slice_index as usize; + if slice_index > self.partial_slices.len() { + return Err(K2Error::other( + "Requested slice index is beyond the current partial slices", + )); + } + + let partial = &self.partial_slices[slice_index]; + Ok((partial.start, partial.end())) + } +} + +// Public query methods +impl PartitionedTime { + /// Compute a top hash over the full time slice combined hashes owned by this [PartitionedTime]. + /// + /// This method will fetch the hashes of all the full time slices within the arc constraint + /// for this [PartitionedTime]. Those are expected to be ordered by slice index, which implies + /// that they are ordered by time. It will then combine those hashes into a single hash. + pub async fn full_time_slice_top_hash( + &self, + store: DynOpStore, + ) -> K2Result { + let hashes = store.retrieve_slice_hashes(self.arc_constraint).await?; + Ok( + combine::combine_op_hashes( + hashes.into_iter().map(|(_, hash)| hash), + ) + .freeze(), + ) + } + + /// Get the combined hash of all the partial slices owned by this [PartitionedTime]. + /// + /// This method takes the current partial slices and returns their pre-computed hashes. + /// These are combined hashes over all the ops in each partial slice, ordered by time. + pub fn partial_slice_combined_hashes( + &self, + ) -> impl Iterator + use<'_> { + self.partial_slices + .iter() + .map(|partial| partial.hash.clone().freeze()) + } + + /// Gets the combined hashes of all the full time slices owned by this [PartitionedTime]. 
+ /// + /// This is a pass-through to the provided store, using the arc constraint of this [PartitionedTime]. + pub async fn full_time_slice_hashes( + &self, + store: DynOpStore, + ) -> K2Result> { + store.retrieve_slice_hashes(self.arc_constraint).await + } + + /// Get the combined hash of a partial slice by its slice index. + /// + /// Note that the number of partial slices changes over time and the start point of the partial + /// slices moves. It is important that the slice index is only used to refer to a specific slice + /// at a specific point in time. This can be achieved by not calling [PartitionedTime::update] + /// while expecting the slice index to refer to the same slice. + /// + /// # Errors + /// + /// This method will return an error if the requested slice index is beyond the current partial + /// slices. + pub fn partial_slice_hash( + &self, + slice_index: u32, + ) -> K2Result { + let slice_index = slice_index as usize; + if slice_index > self.partial_slices.len() { + return Err(K2Error::other( + "Requested slice index is beyond the current partial slices", + )); + } + + Ok(self.partial_slices[slice_index].hash.clone().freeze()) + } } // Private methods @@ -345,6 +458,10 @@ impl PartitionedTime { UNIX_TIMESTAMP + full_slices_duration } + pub(crate) fn arc_constraint(&self) -> &DhtArc { + &self.arc_constraint + } + /// Figure out how many new full slices need to be allocated. /// /// This is done by checking how many full slices fit between the current end of the last @@ -395,7 +512,7 @@ impl PartitionedTime { ) .await?; - let hash = combine_op_hashes(op_hashes); + let hash = combine::combine_op_hashes(op_hashes); if !hash.is_empty() { store @@ -491,7 +608,7 @@ impl PartitionedTime { + Duration::from_secs( (1u64 << size) * UNIT_TIME.as_secs(), ); - combine_op_hashes( + combine::combine_op_hashes( store .retrieve_op_hashes_in_time_slice( self.arc_constraint, @@ -518,11 +635,6 @@ impl PartitionedTime { pub(crate) fn partials(&self) -> &[PartialSlice] { &self.partial_slices } - - #[cfg(test)] - pub(crate) fn arc_constraint(&self) -> &DhtArc { - &self.arc_constraint - } } /// Computes what duration is required for a series of time slices. @@ -547,48 +659,10 @@ fn residual_duration_for_factor(factor: u8) -> K2Result { Ok(Duration::from_secs(sum * UNIT_TIME.as_secs())) } -/// Combine a series of op hashes into a single hash. -/// -/// Requires that the op hashes are already ordered. -/// If the input is empty, then the output is an empty byte array. -fn combine_op_hashes(hashes: Vec) -> bytes::BytesMut { - let mut out = if let Some(first) = hashes.first() { - bytes::BytesMut::zeroed(first.0.len()) - } else { - // `Bytes::new` does not allocate, so if there was no input, then return an empty - // byte array without allocating. - return bytes::BytesMut::new(); - }; - - let iter = hashes.into_iter().map(|x| x.0 .0); - for hash in iter { - combine_hashes(&mut out, hash); - } - - out -} - -fn combine_hashes(into: &mut bytes::BytesMut, other: bytes::Bytes) { - // Properly initialise the target from the source if the target is empty. - // Otherwise, the loop below would run 0 times. - if into.is_empty() && !other.is_empty() { - into.extend_from_slice(&other); - return; - } - - if into.len() != other.len() { - tracing::debug!("Combining hashes of different lengths. 
This is undefined behaviour.") - } - - for (into_byte, other_byte) in into.iter_mut().zip(other.iter()) { - *into_byte ^= other_byte; - } -} - #[cfg(test)] mod tests { use super::*; - use kitsune2_api::OpStore; + use kitsune2_api::{OpId, OpStore}; use kitsune2_memory::{Kitsune2MemoryOp, Kitsune2MemoryOpStore}; use kitsune2_test_utils::enable_tracing; use std::sync::Arc; @@ -1307,7 +1381,7 @@ mod tests { let current_time = Timestamp::now(); let store = Arc::new(Kitsune2MemoryOpStore::default()); - let arc_constraint = DhtArc::Arc(0, 2); + let arc_constraint = DhtArc::FULL; let mut pt = PartitionedTime::try_from_store( factor, current_time, @@ -1343,7 +1417,7 @@ mod tests { store .process_incoming_ops(vec![Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![7; 32])), - (Timestamp::now() - pt.full_slice_duration).unwrap(), + pt.full_slice_end_timestamp(), vec![], ) .try_into() diff --git a/crates/memory/src/op_store.rs b/crates/memory/src/op_store.rs index 2a5a8486..5f077793 100644 --- a/crates/memory/src/op_store.rs +++ b/crates/memory/src/op_store.rs @@ -39,6 +39,16 @@ impl From for StoredOp { } } +impl TryFrom for Kitsune2MemoryOp { + type Error = serde_json::Error; + + fn try_from(value: MetaOp) -> serde_json::Result { + let op = serde_json::from_slice(value.op_data.as_slice())?; + + Ok(op) + } +} + impl TryFrom for MetaOp { type Error = K2Error; @@ -79,10 +89,8 @@ impl OpStore for Kitsune2MemoryOpStore { let ops_to_add = op_list .iter() .map(|op| -> serde_json::Result<(OpId, Kitsune2MemoryOp)> { - Ok(( - op.op_id.clone(), - serde_json::from_slice(op.op_data.as_slice())?, - )) + let op = Kitsune2MemoryOp::try_from(op.clone())?; + Ok((op.op_id.clone(), op)) }) .collect::, _>>().map_err(|e| { K2Error::other_src("Failed to deserialize op data, are you using `Kitsune2MemoryOp`s?", e) @@ -117,6 +125,26 @@ impl OpStore for Kitsune2MemoryOpStore { .boxed() } + fn retrieve_ops( + &self, + op_ids: Vec, + ) -> BoxFuture<'_, K2Result>> { + async move { + let self_lock = self.read().await; + Ok(op_ids + .iter() + .filter_map(|op_id| { + self_lock.op_list.get(op_id).map(|op| MetaOp { + op_id: op.op_id.clone(), + op_data: serde_json::to_vec(op) + .expect("Failed to serialize op"), + }) + }) + .collect()) + } + .boxed() + } + /// Store the combined hash of a time slice. /// /// The `slice_id` is the index of the time slice. This is a 0-based index. So for a given @@ -184,4 +212,16 @@ impl OpStore for Kitsune2MemoryOpStore { } .boxed() } + + /// Retrieve the hashes of all time slices. + fn retrieve_slice_hashes( + &self, + arc: DhtArc, + ) -> BoxFuture<'_, K2Result>> { + async move { + let self_lock = self.read().await; + Ok(self_lock.time_slice_hashes.get_all(&arc)) + } + .boxed() + } } diff --git a/crates/memory/src/op_store/time_slice_hash_store.rs b/crates/memory/src/op_store/time_slice_hash_store.rs index 36cdc4aa..f8d966c4 100644 --- a/crates/memory/src/op_store/time_slice_hash_store.rs +++ b/crates/memory/src/op_store/time_slice_hash_store.rs @@ -47,7 +47,19 @@ impl TimeSliceHashStore { .cloned() } - pub fn highest_stored_id(&self, arc: &DhtArc) -> Option { + pub(super) fn get_all(&self, arc: &DhtArc) -> Vec<(u64, bytes::Bytes)> { + self.inner + .get(arc) + .map(|by_arc| { + by_arc + .iter() + .map(|(id, hash)| (*id, hash.clone())) + .collect() + }) + .unwrap_or_default() + } + + pub(super) fn highest_stored_id(&self, arc: &DhtArc) -> Option { self.inner .get(arc) .and_then(|by_arc| by_arc.iter().last().map(|(id, _)| *id))