From a897b39629c39a96973ec7f1b30b244217d77086 Mon Sep 17 00:00:00 2001 From: ThetaSinner Date: Wed, 15 Jan 2025 14:55:59 +0000 Subject: [PATCH 1/3] Refactor op store to properly handle op data vs metadata --- Cargo.lock | 1 - crates/api/src/fetch.rs | 50 ++-- crates/api/src/op_store.rs | 31 +-- crates/core/Cargo.toml | 1 - crates/core/src/factories/core_fetch.rs | 4 +- .../factories/core_fetch/message_handler.rs | 9 +- crates/core/src/factories/core_fetch/test.rs | 26 +-- .../core_fetch/test/incoming_request_queue.rs | 27 +-- crates/core/src/factories/mem_op_store.rs | 114 ++++++--- .../core/src/factories/mem_op_store/test.rs | 25 ++ crates/dht/src/dht/tests.rs | 174 +++++--------- crates/dht/src/dht/tests/harness.rs | 8 +- crates/dht/src/hash.rs | 21 +- crates/dht/src/time.rs | 216 +++++++----------- 14 files changed, 300 insertions(+), 407 deletions(-) create mode 100644 crates/core/src/factories/mem_op_store/test.rs diff --git a/Cargo.lock b/Cargo.lock index 3c3a60f..32b4b49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1031,7 +1031,6 @@ dependencies = [ "rand", "serde", "serde_json", - "sha2", "tokio", "tracing", "ureq", diff --git a/crates/api/src/fetch.rs b/crates/api/src/fetch.rs index eee9a84..6523846 100644 --- a/crates/api/src/fetch.rs +++ b/crates/api/src/fetch.rs @@ -2,7 +2,7 @@ use crate::{ builder, config, peer_store::DynPeerStore, transport::DynTransport, - AgentId, BoxFut, DynOpStore, K2Result, MetaOp, OpId, SpaceId, + AgentId, BoxFut, DynOpStore, K2Result, OpId, SpaceId, }; use bytes::{Bytes, BytesMut}; use k2_fetch_message::FetchMessageType; @@ -25,8 +25,8 @@ impl From for Vec { } } -impl From> for Response { - fn from(value: Vec) -> Self { +impl From> for Response { + fn from(value: Vec) -> Self { Self { ops: value.into_iter().map(Into::into).collect(), } @@ -57,7 +57,7 @@ pub fn serialize_request_message(value: Vec) -> Bytes { } /// Serialize list of ops to response. -pub fn serialize_response(value: Vec) -> Bytes { +pub fn serialize_response(value: Vec) -> Bytes { let mut out = BytesMut::new(); Response::from(value) .encode(&mut out) @@ -65,9 +65,9 @@ pub fn serialize_response(value: Vec) -> Bytes { out.freeze() } -/// Serialize list of ops to fetch resopnse message. -pub fn serialize_response_message(value: Vec) -> Bytes { - let mut out = bytes::BytesMut::new(); +/// Serialize list of ops to fetch response message. +pub fn serialize_response_message(value: Vec) -> Bytes { + let mut out = BytesMut::new(); let data = serialize_response(value); let fetch_message = K2FetchMessage { fetch_message_type: FetchMessageType::Response.into(), @@ -115,7 +115,7 @@ pub type DynFetchFactory = Arc; #[cfg(test)] mod test { use super::*; - use crate::{id::Id, MetaOp}; + use crate::id::Id; use prost::Message; #[test] @@ -135,24 +135,18 @@ mod test { #[test] fn happy_response_encode_decode() { - let op_1 = MetaOp { - op_id: OpId::from(bytes::Bytes::from_static(b"some_op_id")), - op_data: vec![0], - }; - let op_2 = MetaOp { - op_id: OpId::from(bytes::Bytes::from_static(b"another_op_id")), - op_data: vec![1], - }; - let op_vec = vec![op_1, op_2]; - let ops_enc = serialize_response(op_vec.clone()); - let expected_ops_data = - op_vec.into_iter().map(|op| op.op_data).collect::>(); + // Not real op payloads, any bytes will do to check the round trip encoding/decoding + // of the response type + let op_1 = bytes::Bytes::from(vec![0]); + let op_2 = bytes::Bytes::from(vec![1]); + let expected_ops_data = vec![op_1, op_2]; + let ops_enc = serialize_response(expected_ops_data.clone()); let response = Response::decode(ops_enc).unwrap(); let actual_ops_data = response .ops .into_iter() - .map(|op| op.data.to_vec()) + .map(|op| op.data) .collect::>(); assert_eq!(expected_ops_data, actual_ops_data); } @@ -179,16 +173,10 @@ mod test { #[test] fn happy_fetch_response_encode_decode() { - let op = MetaOp { - op_id: OpId::from(bytes::Bytes::from_static(b"some_op_id")), - op_data: vec![0], - }; - let ops = vec![op]; - let fetch_response = serialize_response_message(ops.clone()); - let expected_ops_data = ops - .into_iter() - .map(|op| Bytes::from(op.op_data)) - .collect::>(); + let op = bytes::Bytes::from(vec![0]); + let expected_ops_data = vec![op]; + let fetch_response = + serialize_response_message(expected_ops_data.clone()); let fetch_message_dec = K2FetchMessage::decode(fetch_response).unwrap(); assert_eq!( diff --git a/crates/api/src/op_store.rs b/crates/api/src/op_store.rs index 245da47..20cec6e 100644 --- a/crates/api/src/op_store.rs +++ b/crates/api/src/op_store.rs @@ -3,7 +3,6 @@ use crate::{ builder, config, BoxFut, DhtArc, K2Result, OpId, SpaceId, Timestamp, }; -use bytes::Bytes; use futures::future::BoxFuture; #[cfg(feature = "mockall")] use mockall::automock; @@ -19,22 +18,14 @@ pub struct MetaOp { pub op_id: OpId, /// The actual op data. - pub op_data: Vec, + pub op_data: bytes::Bytes, } include!("../proto/gen/kitsune2.op_store.rs"); -impl From for Op { - fn from(value: MetaOp) -> Self { - Self { - data: value.op_data.into(), - } - } -} - -impl From for Bytes { - fn from(value: MetaOp) -> Self { - value.op_data.into() +impl From for Op { + fn from(value: bytes::Bytes) -> Self { + Self { data: value } } } @@ -54,12 +45,12 @@ pub struct StoredOp { /// /// Note that this means any op implementation must include a consistent timestamp in the op /// data so that it can be provided back to Kitsune. - pub timestamp: Timestamp, + pub created_at: Timestamp, } impl Ord for StoredOp { fn cmp(&self, other: &Self) -> Ordering { - (&self.timestamp, &self.op_id).cmp(&(&other.timestamp, &other.op_id)) + (&self.created_at, &self.op_id).cmp(&(&other.created_at, &other.op_id)) } } @@ -78,7 +69,7 @@ pub trait OpStore: 'static + Send + Sync + std::fmt::Debug { /// if it is able to process them. fn process_incoming_ops( &self, - op_list: Vec, + op_list: Vec, ) -> BoxFuture<'_, K2Result<()>>; /// Retrieve a batch of ops from the host by time range. @@ -151,18 +142,12 @@ pub type DynOpStoreFactory = Arc; #[cfg(test)] mod test { - use crate::MetaOp; - use super::*; use prost::Message; #[test] fn happy_meta_op_encode_decode() { - let meta_op = MetaOp { - op_id: OpId::from(bytes::Bytes::from_static(b"some_op_id")), - op_data: vec![1; 128], - }; - let op = Op::from(meta_op); + let op = Op::from(bytes::Bytes::from(vec![1; 128])); let op_enc = op.encode_to_vec(); let op_dec = Op::decode(op_enc.as_slice()).unwrap(); diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index baa8c4a..b994a30 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -34,5 +34,4 @@ ed25519-dalek = { workspace = true, features = ["rand_core"] } kitsune2_api = { workspace = true, features = ["mockall"] } kitsune2_test_utils = { workspace = true } rand = { workspace = true } -sha2 = { workspace = true } tokio = { workspace = true, features = ["full"] } diff --git a/crates/core/src/factories/core_fetch.rs b/crates/core/src/factories/core_fetch.rs index e24a5d9..cef5480 100644 --- a/crates/core/src/factories/core_fetch.rs +++ b/crates/core/src/factories/core_fetch.rs @@ -424,7 +424,9 @@ impl CoreFetch { tracing::error!("could not read ops from store: {err}"); continue; } - Ok(ops) => ops, + Ok(ops) => { + ops.into_iter().map(|op| op.op_data).collect::>() + } }; if ops.is_empty() { diff --git a/crates/core/src/factories/core_fetch/message_handler.rs b/crates/core/src/factories/core_fetch/message_handler.rs index 62596a4..342abcf 100644 --- a/crates/core/src/factories/core_fetch/message_handler.rs +++ b/crates/core/src/factories/core_fetch/message_handler.rs @@ -188,12 +188,9 @@ mod test { let peer = Url::from_str("wss://127.0.0.1:1").unwrap(); let op = make_op(vec![0]); - let received_ops = vec![op]; - let request_message = serialize_response_message(received_ops.clone()); - let expected_ops_data = received_ops - .into_iter() - .map(|op| Bytes::from(op.op_data)) - .collect::>(); + let expected_ops_data = vec![op.into()]; + let request_message = + serialize_response_message(expected_ops_data.clone()); let task_handle = tokio::task::spawn(async move { let ops = incoming_response_rx diff --git a/crates/core/src/factories/core_fetch/test.rs b/crates/core/src/factories/core_fetch/test.rs index 995e0a6..eaf49df 100644 --- a/crates/core/src/factories/core_fetch/test.rs +++ b/crates/core/src/factories/core_fetch/test.rs @@ -3,9 +3,9 @@ mod outgoing_request_queue; #[cfg(test)] pub(crate) mod utils { - use crate::factories::Kitsune2MemoryOp; + use crate::factories::MemoryOp; use bytes::Bytes; - use kitsune2_api::{id::Id, AgentId, MetaOp, OpId, Timestamp}; + use kitsune2_api::{id::Id, AgentId, OpId, Timestamp}; use rand::Rng; pub fn random_id() -> Id { @@ -33,25 +33,7 @@ pub(crate) mod utils { ops } - pub fn hash_op(input: &bytes::Bytes) -> OpId { - use sha2::{Digest, Sha256}; - let mut hasher = Sha256::new(); - hasher.update(input); - let result = hasher.finalize(); - let hash_bytes = bytes::Bytes::from(result.to_vec()); - hash_bytes.into() - } - - pub fn make_op(data: Vec) -> MetaOp { - let op_id = hash_op(&data.clone().into()); - MetaOp { - op_id: op_id.clone(), - op_data: serde_json::to_vec(&Kitsune2MemoryOp::new( - op_id, - Timestamp::now(), - data, - )) - .unwrap(), - } + pub fn make_op(data: Vec) -> MemoryOp { + MemoryOp::new(Timestamp::now(), data) } } diff --git a/crates/core/src/factories/core_fetch/test/incoming_request_queue.rs b/crates/core/src/factories/core_fetch/test/incoming_request_queue.rs index fc291f1..3ab31d5 100644 --- a/crates/core/src/factories/core_fetch/test/incoming_request_queue.rs +++ b/crates/core/src/factories/core_fetch/test/incoming_request_queue.rs @@ -2,11 +2,8 @@ use super::utils::random_op_id; use crate::{ default_test_builder, factories::{ - core_fetch::{ - test::utils::{hash_op, make_op}, - CoreFetch, CoreFetchConfig, - }, - Kitsune2MemoryOp, MemOpStoreFactory, + core_fetch::{test::utils::make_op, CoreFetch, CoreFetchConfig}, + MemOpStoreFactory, MemoryOp, }, }; use bytes::Bytes; @@ -98,10 +95,12 @@ async fn respond_to_multiple_requests() { mock_transport.clone(), ); - let requested_op_ids_1 = - serialize_request_message(vec![op_1.op_id.clone(), op_2.op_id.clone()]); + let requested_op_ids_1 = serialize_request_message(vec![ + op_1.compute_op_id(), + op_2.compute_op_id(), + ]); let requested_op_ids_2 = - serialize_request_message(vec![op_3.op_id.clone(), random_op_id()]); + serialize_request_message(vec![op_3.compute_op_id(), random_op_id()]); fetch .message_handler .recv_module_msg( @@ -218,13 +217,11 @@ async fn fail_to_respond_once_then_succeed() { .ops .into_iter() .map(|op| { - let op_data = - serde_json::from_slice::(&op.data) - .unwrap(); - let op_id = hash_op(&bytes::Bytes::from(op_data.payload)); + let memory_op = + serde_json::from_slice::(&op.data).unwrap(); MetaOp { - op_id, - op_data: op.data.into(), + op_id: memory_op.compute_op_id(), + op_data: op.data, } }) .collect::>(); @@ -251,7 +248,7 @@ async fn fail_to_respond_once_then_succeed() { ); // Handle op request. - let data = serialize_request_message(vec![op.op_id]); + let data = serialize_request_message(vec![op.compute_op_id()]); fetch .message_handler .recv_module_msg( diff --git a/crates/core/src/factories/mem_op_store.rs b/crates/core/src/factories/mem_op_store.rs index 3f268a0..63dab85 100644 --- a/crates/core/src/factories/mem_op_store.rs +++ b/crates/core/src/factories/mem_op_store.rs @@ -1,7 +1,6 @@ //! The mem op store implementation provided by Kitsune2. use crate::factories::mem_op_store::time_slice_hash_store::TimeSliceHashStore; -use bytes::Bytes; use futures::future::BoxFuture; use kitsune2_api::builder::Builder; use kitsune2_api::config::Config; @@ -16,6 +15,9 @@ use tokio::sync::RwLock; mod time_slice_hash_store; +#[cfg(test)] +mod test; + /// The mem op store implementation provided by Kitsune2. #[derive(Debug)] pub struct MemOpStoreFactory {} @@ -48,56 +50,91 @@ impl OpStoreFactory for MemOpStoreFactory { /// This is a stub implementation of an op that will be serialized /// via serde_json (with inefficient encoding of the payload) to be /// used for testing purposes. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Kitsune2MemoryOp { - /// The id (hash) of the op - pub op_id: OpId, +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct MemoryOp { /// The creation timestamp of this op - pub timestamp: Timestamp, - /// The payload of the op - pub payload: Vec, + pub created_at: Timestamp, + /// The data for the op + pub op_data: Vec, } -impl Kitsune2MemoryOp { - /// Create a new [Kitsune2MemoryOp] - pub fn new(op_id: OpId, timestamp: Timestamp, payload: Vec) -> Self { +impl MemoryOp { + /// Create a new [MemoryOp]. + pub fn new(timestamp: Timestamp, payload: Vec) -> Self { Self { - op_id, - timestamp, - payload, + created_at: timestamp, + op_data: payload, } } -} -impl From for StoredOp { - fn from(value: Kitsune2MemoryOp) -> Self { - StoredOp { - op_id: value.op_id, - timestamp: value.timestamp, - } + /// Compute the op id for this op. + /// + /// Note that this produces predictable op ids for testing purposes. + /// It is simply the first 32 bytes of the op data. + pub fn compute_op_id(&self) -> OpId { + let mut value = + self.op_data.as_slice()[..32.min(self.op_data.len())].to_vec(); + value.resize(32, 0); + OpId::from(bytes::Bytes::from(value)) } } -impl From for Kitsune2MemoryOp { - fn from(value: Bytes) -> Self { +impl From for MemoryOp { + fn from(value: bytes::Bytes) -> Self { serde_json::from_slice(&value) - .expect("failed to deserialize Kitsune2MemoryOp from Op") + .expect("failed to deserialize MemoryOp from bytes") } } -impl From for Bytes { - fn from(value: Kitsune2MemoryOp) -> Self { +impl From for bytes::Bytes { + fn from(value: MemoryOp) -> Self { serde_json::to_vec(&value) - .expect("failed to serialize Op to Kitsune2MemoryOp") + .expect("failed to serialize MemoryOp to bytes") .into() } } -impl TryFrom for Kitsune2MemoryOp { +/// This is the storage record for an op with computed fields. +/// +/// Test data should create [MemoryOp]s and not be aware of this type. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct MemoryOpRecord { + /// The id (hash) of the op + pub op_id: OpId, + /// The creation timestamp of this op + pub created_at: Timestamp, + /// The timestamp at which this op was stored by us + pub stored_at: Timestamp, + /// The data for the op + pub op_data: Vec, +} + +impl From for MemoryOpRecord { + fn from(value: bytes::Bytes) -> Self { + let inner: MemoryOp = value.into(); + Self { + op_id: inner.compute_op_id(), + created_at: inner.created_at, + stored_at: Timestamp::now(), + op_data: inner.op_data, + } + } +} + +impl From for StoredOp { + fn from(value: MemoryOp) -> Self { + StoredOp { + op_id: value.compute_op_id(), + created_at: value.created_at, + } + } +} + +impl TryFrom for MemoryOp { type Error = serde_json::Error; fn try_from(value: Op) -> Result { - serde_json::from_slice(&value.data) + Ok(value.data.into()) } } @@ -126,20 +163,20 @@ impl std::ops::Deref for Kitsune2MemoryOpStore { #[derive(Debug, Default)] struct Kitsune2MemoryOpStoreInner { - op_list: HashMap, + op_list: HashMap, time_slice_hashes: TimeSliceHashStore, } impl OpStore for Kitsune2MemoryOpStore { fn process_incoming_ops( &self, - op_list: Vec, + op_list: Vec, ) -> BoxFuture<'_, K2Result<()>> { Box::pin(async move { let ops_to_add = op_list .iter() - .map(|op| -> serde_json::Result<(OpId, Kitsune2MemoryOp)> { - let op = Kitsune2MemoryOp::from(op.clone()); + .map(|op| -> serde_json::Result<(OpId, MemoryOpRecord)> { + let op = MemoryOpRecord::from(op.clone()); Ok((op.op_id.clone(), op)) }) .collect::, _>>().map_err(|e| { @@ -164,8 +201,8 @@ impl OpStore for Kitsune2MemoryOpStore { .iter() .filter(|(_, op)| { let loc = op.op_id.loc(); - op.timestamp >= start - && op.timestamp < end + op.created_at >= start + && op.created_at < end && arc.contains(loc) }) .map(|(op_id, _)| op_id.clone()) @@ -184,8 +221,11 @@ impl OpStore for Kitsune2MemoryOpStore { .filter_map(|op_id| { self_lock.op_list.get(op_id).map(|op| MetaOp { op_id: op.op_id.clone(), - op_data: serde_json::to_vec(op) - .expect("Failed to serialize op"), + op_data: MemoryOp { + created_at: op.created_at, + op_data: op.op_data.clone(), + } + .into(), }) }) .collect()) diff --git a/crates/core/src/factories/mem_op_store/test.rs b/crates/core/src/factories/mem_op_store/test.rs new file mode 100644 index 0000000..14a85ff --- /dev/null +++ b/crates/core/src/factories/mem_op_store/test.rs @@ -0,0 +1,25 @@ +use crate::factories::mem_op_store::Kitsune2MemoryOpStore; +use crate::factories::MemoryOp; +use kitsune2_api::{DynOpStore, SpaceId, Timestamp}; +use std::sync::Arc; + +#[tokio::test] +async fn process_and_retrieve_op() { + let op_store: DynOpStore = Arc::new(Kitsune2MemoryOpStore::new( + SpaceId::from(bytes::Bytes::from_static(b"test")), + )); + + let op = MemoryOp::new(Timestamp::now(), vec![1, 2, 3]); + let op_id = op.compute_op_id(); + op_store + .process_incoming_ops(vec![op.clone().into()]) + .await + .unwrap(); + + let retrieved = op_store.retrieve_ops(vec![op_id.clone()]).await.unwrap(); + assert_eq!(retrieved.len(), 1); + assert_eq!(retrieved[0].op_id, op_id); + + let out: MemoryOp = serde_json::from_slice(&retrieved[0].op_data).unwrap(); + assert_eq!(op, out); +} diff --git a/crates/dht/src/dht/tests.rs b/crates/dht/src/dht/tests.rs index e06cc01..29e9a67 100644 --- a/crates/dht/src/dht/tests.rs +++ b/crates/dht/src/dht/tests.rs @@ -2,8 +2,8 @@ use super::*; use crate::dht::tests::harness::SyncWithOutcome; use crate::test::test_store; use harness::DhtSyncHarness; -use kitsune2_api::{DhtArc, OpId, UNIX_TIMESTAMP}; -use kitsune2_core::factories::Kitsune2MemoryOp; +use kitsune2_api::{DhtArc, UNIX_TIMESTAMP}; +use kitsune2_core::factories::MemoryOp; use std::time::Duration; mod harness; @@ -21,12 +21,9 @@ async fn from_store_empty() { async fn take_minimal_snapshot() { let store = test_store().await; store - .process_incoming_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![7; 32])), - UNIX_TIMESTAMP, - vec![], - ) - .into()]) + .process_incoming_ops(vec![ + MemoryOp::new(UNIX_TIMESTAMP, vec![7; 32]).into() + ]) .await .unwrap(); @@ -107,14 +104,9 @@ async fn one_way_disc_sync_from_initiator() { let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; // Put historical data in the first DHT - let op_id = OpId::from(bytes::Bytes::from(vec![41; 32])); - dht1.inject_ops(vec![Kitsune2MemoryOp::new( - op_id.clone(), - UNIX_TIMESTAMP, - vec![], - )]) - .await - .unwrap(); + let op = MemoryOp::new(UNIX_TIMESTAMP, vec![41; 32]); + let op_id = op.compute_op_id(); + dht1.inject_ops(vec![op]).await.unwrap(); // Try a sync let outcome = dht1.sync_with(&mut dht2).await.unwrap(); @@ -144,13 +136,9 @@ async fn one_way_disc_sync_from_acceptor() { let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; // Put historical data in the second DHT - dht2.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![41; 32])), - UNIX_TIMESTAMP, - vec![], - )]) - .await - .unwrap(); + dht2.inject_ops(vec![MemoryOp::new(UNIX_TIMESTAMP, vec![41; 32])]) + .await + .unwrap(); // Try a sync initiated by the first agent let outcome = dht1.sync_with(&mut dht2).await.unwrap(); @@ -177,18 +165,13 @@ async fn two_way_disc_sync() { let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; // Put historical data in both DHTs - dht1.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![7; 32])), - UNIX_TIMESTAMP, - vec![], - )]) - .await - .unwrap(); - dht2.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![43; 32])), + dht1.inject_ops(vec![MemoryOp::new(UNIX_TIMESTAMP, vec![7; 32])]) + .await + .unwrap(); + dht2.inject_ops(vec![MemoryOp::new( // Two weeks later UNIX_TIMESTAMP + Duration::from_secs(14 * 24 * 60 * 60), - vec![], + vec![43; 32], )]) .await .unwrap(); @@ -217,10 +200,9 @@ async fn one_way_ring_sync_from_initiator() { let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; // Put recent data in the first ring of the first DHT - dht1.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![41; 32])), + dht1.inject_ops(vec![MemoryOp::new( dht1.dht.partition.full_slice_end_timestamp(), - vec![], + vec![41; 32], )]) .await .unwrap(); @@ -250,10 +232,9 @@ async fn one_way_ring_sync_from_acceptor() { let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; // Put recent data in the first ring of the second DHT - dht2.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![41; 32])), + dht2.inject_ops(vec![MemoryOp::new( dht1.dht.partition.full_slice_end_timestamp(), - vec![], + vec![41; 32], )]) .await .unwrap(); @@ -283,18 +264,16 @@ async fn two_way_ring_sync() { let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; // Put recent data in the first ring of both DHTs - dht1.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![7; 32])), + dht1.inject_ops(vec![MemoryOp::new( dht1.dht.partition.full_slice_end_timestamp(), - vec![], + vec![7; 32], )]) .await .unwrap(); - dht2.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![43; 32])), + dht2.inject_ops(vec![MemoryOp::new( // Two weeks later dht1.dht.partition.full_slice_end_timestamp(), - vec![], + vec![43; 32], )]) .await .unwrap(); @@ -324,35 +303,26 @@ async fn ring_sync_with_matching_disc() { // Put historical data in both DHTs let historical_ops = vec![ - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![7; 4])), - UNIX_TIMESTAMP, - vec![], - ), - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from( - (u32::MAX / 2).to_le_bytes().to_vec(), - )), + MemoryOp::new(UNIX_TIMESTAMP, vec![7; 4]), + MemoryOp::new( UNIX_TIMESTAMP + Duration::from_secs(14 * 24 * 60 * 60), - vec![], + (u32::MAX / 2).to_le_bytes().to_vec(), ), ]; dht1.inject_ops(historical_ops.clone()).await.unwrap(); dht2.inject_ops(historical_ops).await.unwrap(); // Put recent data in the first ring of both DHTs - dht1.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![7; 4])), + dht1.inject_ops(vec![MemoryOp::new( dht1.dht.partition.full_slice_end_timestamp(), - vec![], + vec![7; 4], )]) .await .unwrap(); - dht2.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![13; 4])), + dht2.inject_ops(vec![MemoryOp::new( dht1.dht.partition.full_slice_end_timestamp(), - vec![], + vec![13; 4], )]) .await .unwrap(); @@ -381,33 +351,23 @@ async fn two_stage_sync_with_symmetry() { let mut dht2 = DhtSyncHarness::new(current_time, DhtArc::FULL).await; // Put mismatched historical data in both DHTs - dht1.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![7; 32])), - UNIX_TIMESTAMP, - vec![], - )]) - .await - .unwrap(); - dht2.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![13; 32])), - UNIX_TIMESTAMP, - vec![], - )]) - .await - .unwrap(); + dht1.inject_ops(vec![MemoryOp::new(UNIX_TIMESTAMP, vec![7; 32])]) + .await + .unwrap(); + dht2.inject_ops(vec![MemoryOp::new(UNIX_TIMESTAMP, vec![13; 32])]) + .await + .unwrap(); // Put mismatched recent data in the first ring of both DHTs - dht1.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![11; 32])), + dht1.inject_ops(vec![MemoryOp::new( dht1.dht.partition.full_slice_end_timestamp(), - vec![], + vec![11; 32], )]) .await .unwrap(); - dht2.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![17; 32])), + dht2.inject_ops(vec![MemoryOp::new( dht1.dht.partition.full_slice_end_timestamp(), - vec![], + vec![17; 32], )]) .await .unwrap(); @@ -469,19 +429,15 @@ async fn disc_sync_respects_arc() { .await; // Put mismatched historical data in both DHTs, but in sectors that don't overlap - dht1.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(SECTOR_SIZE.to_le_bytes().to_vec())), + dht1.inject_ops(vec![MemoryOp::new( UNIX_TIMESTAMP, - vec![], + SECTOR_SIZE.to_le_bytes().to_vec(), )]) .await .unwrap(); - dht2.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from( - (3 * SECTOR_SIZE).to_le_bytes().to_vec(), - )), + dht2.inject_ops(vec![MemoryOp::new( UNIX_TIMESTAMP, - vec![], + (3 * SECTOR_SIZE).to_le_bytes().to_vec(), )]) .await .unwrap(); @@ -490,21 +446,15 @@ async fn disc_sync_respects_arc() { assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); // Now put mismatched data in the common sector - dht1.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from( - (2 * SECTOR_SIZE).to_le_bytes().to_vec(), - )), + dht1.inject_ops(vec![MemoryOp::new( UNIX_TIMESTAMP, - vec![], + (2 * SECTOR_SIZE).to_le_bytes().to_vec(), )]) .await .unwrap(); - dht2.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from( - (2 * SECTOR_SIZE + 1).to_le_bytes().to_vec(), - )), + dht2.inject_ops(vec![MemoryOp::new( UNIX_TIMESTAMP, - vec![], + (2 * SECTOR_SIZE + 1).to_le_bytes().to_vec(), )]) .await .unwrap(); @@ -533,19 +483,15 @@ async fn ring_sync_respects_arc() { .await; // Put mismatched recent data in both DHTs, but in sectors that don't overlap - dht1.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(SECTOR_SIZE.to_le_bytes().to_vec())), + dht1.inject_ops(vec![MemoryOp::new( dht1.dht.partition.full_slice_end_timestamp(), - vec![], + SECTOR_SIZE.to_le_bytes().to_vec(), )]) .await .unwrap(); - dht2.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from( - (3 * SECTOR_SIZE).to_le_bytes().to_vec(), - )), + dht2.inject_ops(vec![MemoryOp::new( dht1.dht.partition.full_slice_end_timestamp(), - vec![], + (3 * SECTOR_SIZE).to_le_bytes().to_vec(), )]) .await .unwrap(); @@ -554,21 +500,15 @@ async fn ring_sync_respects_arc() { assert!(dht1.is_in_sync_with(&dht2).await.unwrap()); // Now put mismatched data in the common sector - dht1.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from( - (2 * SECTOR_SIZE).to_le_bytes().to_vec(), - )), + dht1.inject_ops(vec![MemoryOp::new( dht1.dht.partition.full_slice_end_timestamp(), - vec![], + (2 * SECTOR_SIZE).to_le_bytes().to_vec(), )]) .await .unwrap(); - dht2.inject_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from( - (2 * SECTOR_SIZE + 1).to_le_bytes().to_vec(), - )), + dht2.inject_ops(vec![MemoryOp::new( dht1.dht.partition.full_slice_end_timestamp(), - vec![], + (2 * SECTOR_SIZE + 1).to_le_bytes().to_vec(), )]) .await .unwrap(); diff --git a/crates/dht/src/dht/tests/harness.rs b/crates/dht/src/dht/tests/harness.rs index 9a97570..c97784a 100644 --- a/crates/dht/src/dht/tests/harness.rs +++ b/crates/dht/src/dht/tests/harness.rs @@ -5,7 +5,7 @@ use crate::{Dht, DhtSnapshotNextAction}; use kitsune2_api::{ AgentId, DhtArc, DynOpStore, K2Result, OpId, StoredOp, Timestamp, }; -use kitsune2_core::factories::Kitsune2MemoryOp; +use kitsune2_core::factories::MemoryOp; use rand::RngCore; use std::collections::HashMap; @@ -49,7 +49,7 @@ impl DhtSyncHarness { pub(crate) async fn inject_ops( &mut self, - op_list: Vec, + op_list: Vec, ) -> K2Result<()> { self.store .process_incoming_ops( @@ -375,13 +375,13 @@ async fn transfer_ops( .retrieve_ops(requested_ops) .await? .into_iter() - .map(Into::into) + .map(|op| op.op_data) .collect::>(); target.process_incoming_ops(selected.clone()).await?; let stored_ops = selected .into_iter() - .map(|op| Kitsune2MemoryOp::from(op).into()) + .map(|op| MemoryOp::from(op).into()) .collect::>(); target_dht.inform_ops_stored(stored_ops).await?; diff --git a/crates/dht/src/hash.rs b/crates/dht/src/hash.rs index 48f51cb..92defc3 100644 --- a/crates/dht/src/hash.rs +++ b/crates/dht/src/hash.rs @@ -444,7 +444,7 @@ mod tests { use crate::test::test_store; use crate::UNIT_TIME; use kitsune2_api::{OpId, UNIX_TIMESTAMP}; - use kitsune2_core::factories::Kitsune2MemoryOp; + use kitsune2_core::factories::MemoryOp; use kitsune2_test_utils::enable_tracing; use std::time::Duration; @@ -510,11 +510,11 @@ mod tests { vec![ StoredOp { op_id: OpId::from(op_id_bytes_1.clone()), - timestamp: UNIX_TIMESTAMP, + created_at: UNIX_TIMESTAMP, }, StoredOp { op_id: OpId::from(op_id_bytes_2.clone()), - timestamp: UNIX_TIMESTAMP + created_at: UNIX_TIMESTAMP + ph.sectors[0].full_slice_duration(), }, ], @@ -569,12 +569,12 @@ mod tests { // Stored in the first time slice of the first space partition. StoredOp { op_id: OpId::from(op_id_bytes_1.clone()), - timestamp: ph.sectors[0].full_slice_end_timestamp(), + created_at: ph.sectors[0].full_slice_end_timestamp(), }, // Stored in the second time slice of the first space partition. StoredOp { op_id: OpId::from(op_id_bytes_2.clone()), - timestamp: ph.sectors[0].full_slice_end_timestamp() + created_at: ph.sectors[0].full_slice_end_timestamp() + Duration::from_secs((1 << 13) * UNIT_TIME.as_secs()), }, ], @@ -634,18 +634,15 @@ mod tests { assert_eq!(512, ph.sectors.len()); for h in ph.sectors.iter() { - let (start, end) = match h.sector_constraint() { + let (start, _) = match h.sector_constraint() { DhtArc::Arc(s, e) => (s, e), _ => panic!("Expected an arc"), }; store - .process_incoming_ops(vec![Kitsune2MemoryOp::new( - // Place the op within the current space partition - OpId::from(bytes::Bytes::copy_from_slice( - (start + 1).to_le_bytes().as_slice(), - )), + .process_incoming_ops(vec![MemoryOp::new( now, - end.to_be_bytes().to_vec(), + // Place the op within the current space partition + (start + 1).to_le_bytes().as_slice().to_vec(), ) .into()]) .await diff --git a/crates/dht/src/time.rs b/crates/dht/src/time.rs index ae409e6..51e3335 100644 --- a/crates/dht/src/time.rs +++ b/crates/dht/src/time.rs @@ -243,14 +243,14 @@ impl TimePartition { let full_slice_end = self.full_slice_end_timestamp(); for op in stored_ops { - if op.timestamp < full_slice_end { + if op.created_at < full_slice_end { // This is a historical update. We don't really expect this to happen too often. // If we're syncing because we've been offline then it's okay and we should // try to detect that when it's happening but otherwise it'd be good to log a warning // here. tracing::info!("Historical update detected. Seeing many of these places load on our system, but it is expected if we've been offline or a network partition has been resolved."); - let slice_index = op.timestamp.as_micros() + let slice_index = op.created_at.as_micros() / (self.full_slice_duration.as_micros() as i64); let current_hash = store .retrieve_slice_hash( @@ -299,7 +299,7 @@ impl TimePartition { } }; - if op.timestamp >= end_of_partials { + if op.created_at >= end_of_partials { // This new op is not yet included in the partial slices. That's okay, there // is expected to be a small amount of recent time that isn't covered by // partial slices. This op will get included in a future update of the partials. @@ -310,7 +310,7 @@ impl TimePartition { // the partial slices. We can easily iterate backwards and check just the start // bound of the partials to find out which partial slice this op belongs to. for partial in self.partial_slices.iter_mut().rev() { - if op.timestamp >= partial.start { + if op.created_at >= partial.start { combine::combine_hashes( &mut partial.hash, op.op_id.0 .0, @@ -675,7 +675,7 @@ mod tests { use super::*; use crate::test::test_store; use kitsune2_api::OpId; - use kitsune2_core::factories::Kitsune2MemoryOp; + use kitsune2_core::factories::MemoryOp; use kitsune2_test_utils::enable_tracing; #[test] @@ -854,16 +854,11 @@ mod tests { let store = test_store().await; store .process_incoming_ops(vec![ - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![7; 32])), + MemoryOp::new((current_time - UNIT_TIME).unwrap(), vec![7; 32]) + .into(), + MemoryOp::new( (current_time - UNIT_TIME).unwrap(), - vec![], - ) - .into(), - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![23; 32])), - (current_time - UNIT_TIME).unwrap(), - vec![], + vec![23; 32], ) .into(), ]) @@ -941,18 +936,8 @@ mod tests { let store = test_store().await; store .process_incoming_ops(vec![ - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![7; 32])), - UNIX_TIMESTAMP, - vec![], - ) - .into(), - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![23; 32])), - UNIX_TIMESTAMP, - vec![], - ) - .into(), + MemoryOp::new(UNIX_TIMESTAMP, vec![7; 32]).into(), + MemoryOp::new(UNIX_TIMESTAMP, vec![23; 32]).into(), ]) .await .unwrap(); @@ -1001,28 +986,16 @@ mod tests { let store = test_store().await; store .process_incoming_ops(vec![ - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![7; 32])), - UNIX_TIMESTAMP, - vec![], - ) - .into(), - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![23; 32])), - UNIX_TIMESTAMP, - vec![], - ) - .into(), - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![11; 32])), + MemoryOp::new(UNIX_TIMESTAMP, vec![7; 32]).into(), + MemoryOp::new(UNIX_TIMESTAMP, vec![23; 32]).into(), + MemoryOp::new( (current_time - UNIT_TIME).unwrap(), - vec![], + vec![11; 32], ) .into(), - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![29; 32])), + MemoryOp::new( (current_time - UNIT_TIME).unwrap(), - vec![], + vec![29; 32], ) .into(), ]) @@ -1076,16 +1049,10 @@ mod tests { let store = test_store().await; store .process_incoming_ops(vec![ - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![7; 32])), - UNIX_TIMESTAMP, - vec![], - ) - .into(), - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![11; 32])), + MemoryOp::new(UNIX_TIMESTAMP, vec![7; 32]).into(), + MemoryOp::new( (current_time - UNIT_TIME).unwrap(), - vec![], + vec![11; 32], ) .into(), ]) @@ -1137,16 +1104,11 @@ mod tests { .unwrap(); store .process_incoming_ops(vec![ - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![7; 32])), + MemoryOp::new((current_time - UNIT_TIME).unwrap(), vec![7; 32]) + .into(), + MemoryOp::new( (current_time - UNIT_TIME).unwrap(), - vec![], - ) - .into(), - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![29; 32])), - (current_time - UNIT_TIME).unwrap(), - vec![], + vec![29; 32], ) .into(), ]) @@ -1168,10 +1130,9 @@ mod tests { // Store a new op which will currently be outside the last partial slice store - .process_incoming_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![13; 32])), + .process_incoming_ops(vec![MemoryOp::new( current_time, - vec![], + vec![13; 32], ) .into()]) .await @@ -1206,18 +1167,8 @@ mod tests { let store = test_store().await; store .process_incoming_ops(vec![ - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![7; 32])), - UNIX_TIMESTAMP, - vec![], - ) - .into(), - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![23; 32])), - UNIX_TIMESTAMP, - vec![], - ) - .into(), + MemoryOp::new(UNIX_TIMESTAMP, vec![7; 32]).into(), + MemoryOp::new(UNIX_TIMESTAMP, vec![23; 32]).into(), ]) .await .unwrap(); @@ -1244,10 +1195,9 @@ mod tests { // Store a new op, currently in the first partial slice, but will be in the next full slice. store - .process_incoming_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![13; 32])), + .process_incoming_ops(vec![MemoryOp::new( pt.full_slice_end_timestamp(), // Start of the next full slice - vec![], + vec![13; 32], ) .into()]) .await @@ -1308,30 +1258,18 @@ mod tests { store .process_incoming_ops(vec![ // Store two new ops at the unix timestamp, to go into the first complete slice - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![7; 32])), - UNIX_TIMESTAMP, - vec![], - ) - .into(), - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![23; 32])), - UNIX_TIMESTAMP, - vec![], - ) - .into(), + MemoryOp::new(UNIX_TIMESTAMP, vec![7; 32]).into(), + MemoryOp::new(UNIX_TIMESTAMP, vec![23; 32]).into(), // Store two new ops at the unix timestamp plus one full time slice, // to go into the second complete slice - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![11; 32])), + MemoryOp::new( UNIX_TIMESTAMP + pt.full_slice_duration, - vec![], + vec![11; 32], ) .into(), - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![37; 32])), + MemoryOp::new( UNIX_TIMESTAMP + pt.full_slice_duration, - vec![], + vec![37; 32], ) .into(), ]) @@ -1406,10 +1344,9 @@ mod tests { // Now insert an op at the current time store - .process_incoming_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::from(vec![7; 32])), + .process_incoming_ops(vec![MemoryOp::new( pt.full_slice_end_timestamp(), - vec![], + vec![7; 32], ) .into()]) .await @@ -1457,7 +1394,7 @@ mod tests { op_id: OpId::from(bytes::Bytes::copy_from_slice(&[ 11, 0, 0, 0, ])), - timestamp: UNIX_TIMESTAMP, + created_at: UNIX_TIMESTAMP, }], store.clone(), ) @@ -1482,10 +1419,9 @@ mod tests { let store = test_store().await; // Insert a single op in the first time slice store - .process_incoming_ops(vec![Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::copy_from_slice(&[7, 0, 0, 0])), + .process_incoming_ops(vec![MemoryOp::new( UNIX_TIMESTAMP, - vec![], + vec![7, 0, 0, 0], ) .into()]) .await @@ -1506,15 +1442,17 @@ mod tests { .await .unwrap() .unwrap(); - assert_eq!(vec![7, 0, 0, 0], initial_hash); + let mut expected = vec![7]; + expected.resize(32, 0); + assert_eq!(expected, initial_hash); // Receive a new op into the same time slice + let mut inner_op_id = vec![23]; + inner_op_id.resize(32, 0); pt.inform_ops_stored( vec![StoredOp { - op_id: OpId::from(bytes::Bytes::copy_from_slice(&[ - 23, 0, 0, 0, - ])), - timestamp: UNIX_TIMESTAMP, + op_id: OpId::from(bytes::Bytes::from(inner_op_id)), + created_at: UNIX_TIMESTAMP, }], store.clone(), ) @@ -1527,7 +1465,9 @@ mod tests { .await .unwrap() .unwrap(); - assert_eq!(vec![7 ^ 23, 0, 0, 0], updated_hash); + let mut expected = vec![7 ^ 23]; + expected.resize(32, 0); + assert_eq!(expected, updated_hash); } #[tokio::test] @@ -1559,13 +1499,14 @@ mod tests { op_id: OpId::from(bytes::Bytes::copy_from_slice(&[ 11, 0, 0, 0, ])), - timestamp: UNIX_TIMESTAMP, + created_at: UNIX_TIMESTAMP, }, StoredOp { op_id: OpId::from(bytes::Bytes::copy_from_slice(&[ 29, 0, 0, 0, ])), - timestamp: (current_time - Duration::from_secs(5)).unwrap(), + created_at: (current_time - Duration::from_secs(5)) + .unwrap(), }, ], store.clone(), @@ -1587,16 +1528,14 @@ mod tests { let store = test_store().await; store .process_incoming_ops(vec![ - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::copy_from_slice(&[7, 0, 0, 0])), + MemoryOp::new( UNIX_TIMESTAMP + Duration::from_secs(10), - vec![], + vec![7, 0, 0, 0], ) .into(), - Kitsune2MemoryOp::new( - OpId::from(bytes::Bytes::copy_from_slice(&[31, 0, 0, 0])), + MemoryOp::new( (current_time - Duration::from_secs(30)).unwrap(), - vec![], + vec![31, 0, 0, 0], ) .into(), ]) @@ -1613,23 +1552,28 @@ mod tests { .await .unwrap(); - assert_eq!(vec![7, 0, 0, 0], pt.partial_slices.first().unwrap().hash); - assert_eq!(vec![31, 0, 0, 0], pt.partial_slices.last().unwrap().hash); + let mut expected = vec![7]; + expected.resize(32, 0); + assert_eq!(expected, pt.partial_slices.first().unwrap().hash); + let mut expected = vec![31]; + expected.resize(32, 0); + assert_eq!(expected, pt.partial_slices.last().unwrap().hash); // Receive an op into the first and last time slices + let mut inner_op_id_1 = vec![11]; + inner_op_id_1.resize(32, 0); + let mut inner_op_id_2 = vec![29]; + inner_op_id_2.resize(32, 0); pt.inform_ops_stored( vec![ StoredOp { - op_id: OpId::from(bytes::Bytes::copy_from_slice(&[ - 11, 0, 0, 0, - ])), - timestamp: UNIX_TIMESTAMP, + op_id: OpId::from(bytes::Bytes::from(inner_op_id_1)), + created_at: UNIX_TIMESTAMP, }, StoredOp { - op_id: OpId::from(bytes::Bytes::copy_from_slice(&[ - 29, 0, 0, 0, - ])), - timestamp: (current_time - Duration::from_secs(5)).unwrap(), + op_id: OpId::from(bytes::Bytes::from(inner_op_id_2)), + created_at: (current_time - Duration::from_secs(5)) + .unwrap(), }, ], store.clone(), @@ -1637,14 +1581,12 @@ mod tests { .await .unwrap(); - assert_eq!( - vec![7 ^ 11, 0, 0, 0], - pt.partial_slices.first().unwrap().hash - ); - assert_eq!( - vec![31 ^ 29, 0, 0, 0], - pt.partial_slices.last().unwrap().hash - ); + let mut expected = vec![7 ^ 11]; + expected.resize(32, 0); + assert_eq!(expected, pt.partial_slices.first().unwrap().hash); + let mut expected = vec![31 ^ 29]; + expected.resize(32, 0); + assert_eq!(expected, pt.partial_slices.last().unwrap().hash); } fn validate_partial_slices(pt: &TimePartition) { From fa30b5f3b6c5bfef2e7e7f28f0b09fd6f4ff31df Mon Sep 17 00:00:00 2001 From: ThetaSinner Date: Wed, 15 Jan 2025 18:16:29 +0000 Subject: [PATCH 2/3] Return op ids after processing --- crates/api/src/op_store.rs | 2 +- crates/core/src/factories/mem_op_store.rs | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/api/src/op_store.rs b/crates/api/src/op_store.rs index 20cec6e..9b10fdf 100644 --- a/crates/api/src/op_store.rs +++ b/crates/api/src/op_store.rs @@ -70,7 +70,7 @@ pub trait OpStore: 'static + Send + Sync + std::fmt::Debug { fn process_incoming_ops( &self, op_list: Vec, - ) -> BoxFuture<'_, K2Result<()>>; + ) -> BoxFuture<'_, K2Result>>; /// Retrieve a batch of ops from the host by time range. /// diff --git a/crates/core/src/factories/mem_op_store.rs b/crates/core/src/factories/mem_op_store.rs index 63dab85..4074d6d 100644 --- a/crates/core/src/factories/mem_op_store.rs +++ b/crates/core/src/factories/mem_op_store.rs @@ -171,7 +171,7 @@ impl OpStore for Kitsune2MemoryOpStore { fn process_incoming_ops( &self, op_list: Vec, - ) -> BoxFuture<'_, K2Result<()>> { + ) -> BoxFuture<'_, K2Result>> { Box::pin(async move { let ops_to_add = op_list .iter() @@ -183,8 +183,12 @@ impl OpStore for Kitsune2MemoryOpStore { K2Error::other_src("Failed to deserialize op data, are you using `Kitsune2MemoryOp`s?", e) })?; + let op_ids = ops_to_add + .iter() + .map(|(op_id, _)| op_id.clone()) + .collect::>(); self.write().await.op_list.extend(ops_to_add); - Ok(()) + Ok(op_ids) }) } From 3e3dafdfe67f360addfdf0897a908c148c597ea5 Mon Sep 17 00:00:00 2001 From: ThetaSinner Date: Thu, 16 Jan 2025 14:37:31 +0000 Subject: [PATCH 3/3] Remove more uses of MetaOp --- .../core_fetch/test/incoming_request_queue.rs | 16 +++------------- crates/core/src/factories/mem_op_store.rs | 8 +++----- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/crates/core/src/factories/core_fetch/test/incoming_request_queue.rs b/crates/core/src/factories/core_fetch/test/incoming_request_queue.rs index 3ab31d5..4155f71 100644 --- a/crates/core/src/factories/core_fetch/test/incoming_request_queue.rs +++ b/crates/core/src/factories/core_fetch/test/incoming_request_queue.rs @@ -14,7 +14,7 @@ use kitsune2_api::{ }, id::Id, transport::MockTransport, - K2Error, MetaOp, SpaceId, Url, + K2Error, SpaceId, Url, }; use kitsune2_test_utils::enable_tracing; use prost::Message; @@ -213,18 +213,8 @@ async fn fail_to_respond_once_then_succeed() { let response = Response::decode(K2FetchMessage::decode(data).unwrap().data) .unwrap(); - let ops = response - .ops - .into_iter() - .map(|op| { - let memory_op = - serde_json::from_slice::(&op.data).unwrap(); - MetaOp { - op_id: memory_op.compute_op_id(), - op_data: op.data, - } - }) - .collect::>(); + let ops: Vec = + response.ops.into_iter().map(Into::into).collect::>(); responses_sent.lock().unwrap().push((ops, peer)); Box::pin(async move { Ok(()) }) } diff --git a/crates/core/src/factories/mem_op_store.rs b/crates/core/src/factories/mem_op_store.rs index 4074d6d..bd13a96 100644 --- a/crates/core/src/factories/mem_op_store.rs +++ b/crates/core/src/factories/mem_op_store.rs @@ -130,11 +130,9 @@ impl From for StoredOp { } } -impl TryFrom for MemoryOp { - type Error = serde_json::Error; - - fn try_from(value: Op) -> Result { - Ok(value.data.into()) +impl From for MemoryOp { + fn from(value: Op) -> Self { + value.data.into() } }