diff --git a/crates/api/src/fetch.rs b/crates/api/src/fetch.rs index 01397a5..fef1b90 100644 --- a/crates/api/src/fetch.rs +++ b/crates/api/src/fetch.rs @@ -2,6 +2,7 @@ use std::sync::Arc; +use bytes::Bytes; use prost::Message; use crate::{ @@ -25,6 +26,12 @@ impl From for Vec { } } +impl From for Vec { + fn from(value: Ops) -> Self { + value.op_list.into_iter().map(|op| op.data).collect() + } +} + impl From> for Ops { fn from(value: Vec) -> Self { Self { diff --git a/crates/api/src/op_store.rs b/crates/api/src/op_store.rs index bd19773..8935b44 100644 --- a/crates/api/src/op_store.rs +++ b/crates/api/src/op_store.rs @@ -3,6 +3,7 @@ use crate::{ builder, config, BoxFut, DhtArc, K2Result, OpId, SpaceId, Timestamp, }; +use bytes::Bytes; use futures::future::BoxFuture; use std::cmp::Ordering; use std::sync::Arc; @@ -29,6 +30,12 @@ impl From for Op { } } +impl From for Bytes { + fn from(value: MetaOp) -> Self { + value.op_data.into() + } +} + /// An op that has been stored by the Kitsune host. /// /// This is the basic unit of data that the host is expected to store. Whether that storage is @@ -68,7 +75,7 @@ pub trait OpStore: 'static + Send + Sync + std::fmt::Debug { /// if it is able to process them. fn process_incoming_ops( &self, - op_list: Vec, + op_list: Vec, ) -> BoxFuture<'_, K2Result<()>>; /// Retrieve a batch of ops from the host by time range. diff --git a/crates/core/src/factories/core_fetch/test/request_queue.rs b/crates/core/src/factories/core_fetch/test/request_queue.rs index e95988e..43f255c 100644 --- a/crates/core/src/factories/core_fetch/test/request_queue.rs +++ b/crates/core/src/factories/core_fetch/test/request_queue.rs @@ -112,7 +112,7 @@ async fn fetch_queue() { let mut num_requests_sent = requests_sent.lock().unwrap().len(); // Wait for tasks to settle all requests. - tokio::time::timeout(Duration::from_millis(20), async { + tokio::time::timeout(Duration::from_millis(30), async { loop { tokio::time::sleep(Duration::from_millis(1)).await; let current_num_requests_sent = requests_sent.lock().unwrap().len(); diff --git a/crates/core/src/factories/core_fetch/test/response_queue.rs b/crates/core/src/factories/core_fetch/test/response_queue.rs index 4901ff9..3d868bb 100644 --- a/crates/core/src/factories/core_fetch/test/response_queue.rs +++ b/crates/core/src/factories/core_fetch/test/response_queue.rs @@ -6,6 +6,7 @@ use crate::{ Kitsune2MemoryOp, MemOpStoreFactory, }, }; +use bytes::Bytes; use kitsune2_api::{ fetch::{serialize_op_ids, Ops}, id::Id, @@ -19,9 +20,9 @@ use std::{ time::Duration, }; -type ResponsesSent = Vec<(Vec, Url)>; +type ResponsesSent = Vec<(Vec, Url)>; -const SPACE_ID: SpaceId = SpaceId(Id(bytes::Bytes::from_static(b"space_id"))); +const SPACE_ID: SpaceId = SpaceId(Id(Bytes::from_static(b"space_id"))); fn hash_op(input: &bytes::Bytes) -> OpId { use sha2::{Digest, Sha256}; @@ -32,17 +33,9 @@ fn hash_op(input: &bytes::Bytes) -> OpId { hash_bytes.into() } -fn make_op(data: Vec) -> MetaOp { +fn make_op(data: Vec) -> Kitsune2MemoryOp { let op_id = hash_op(&data.clone().into()); - MetaOp { - op_id: op_id.clone(), - op_data: serde_json::to_vec(&Kitsune2MemoryOp::new( - op_id, - Timestamp::now(), - data, - )) - .unwrap(), - } + Kitsune2MemoryOp::new(op_id, Timestamp::now(), data) } fn make_mock_transport( @@ -57,21 +50,12 @@ fn make_mock_transport( move |peer, space, module, data| { assert_eq!(space, SPACE_ID); assert_eq!(module, crate::factories::core_fetch::MOD_NAME); - let ops = Ops::decode(data).unwrap(); - let ops = ops + let ops = Ops::decode(data) + .unwrap() .op_list .into_iter() - .map(|op| { - let op_data = - serde_json::from_slice::(&op.data) - .unwrap(); - let op_id = hash_op(&bytes::Bytes::from(op_data.payload)); - MetaOp { - op_id, - op_data: op.data.into(), - } - }) - .collect(); + .map(|op| op.data) + .collect::>(); responses_sent.lock().unwrap().push((ops, peer)); Box::pin(async move { Ok(()) }) } @@ -100,11 +84,12 @@ async fn respond_to_multiple_requests() { let op_2 = make_op(vec![2; 128]); let op_3 = make_op(vec![3; 128]); // Insert op 1, 2, 3 into op store. Op 4 will not be returned in the response. - let stored_op = vec![op_1.clone(), op_2.clone(), op_3.clone()]; - op_store - .process_incoming_ops(stored_op.clone()) - .await - .unwrap(); + let stored_ops = vec![ + op_1.clone().into(), + op_2.clone().into(), + op_3.clone().into(), + ]; + op_store.process_incoming_ops(stored_ops).await.unwrap(); let fetch = CoreFetch::new( config.clone(), @@ -151,12 +136,12 @@ async fn respond_to_multiple_requests() { assert!(responses_sent .lock() .unwrap() - .contains(&(vec![op_1, op_2], agent_url_1))); + .contains(&(vec![op_1.into(), op_2.into()], agent_url_1))); // Only op 3 is in op store. assert!(responses_sent .lock() .unwrap() - .contains(&(vec![op_3], agent_url_2))); + .contains(&(vec![op_3.into()], agent_url_2))); } #[tokio::test(flavor = "multi_thread")] @@ -249,7 +234,7 @@ async fn fail_to_respond_once_then_succeed() { let op = make_op(vec![1; 128]); op_store - .process_incoming_ops(vec![op.clone()]) + .process_incoming_ops(vec![op.clone().into()]) .await .unwrap(); let agent_url = Url::from_str("wss://127.0.0.1:1").unwrap(); diff --git a/crates/core/src/factories/mem_op_store.rs b/crates/core/src/factories/mem_op_store.rs index 7399493..dbe541d 100644 --- a/crates/core/src/factories/mem_op_store.rs +++ b/crates/core/src/factories/mem_op_store.rs @@ -1,6 +1,7 @@ //! The mem op store implementation provided by Kitsune2. use crate::factories::mem_op_store::time_slice_hash_store::TimeSliceHashStore; +use bytes::Bytes; use futures::future::BoxFuture; use kitsune2_api::builder::Builder; use kitsune2_api::config::Config; @@ -77,27 +78,18 @@ impl From for StoredOp { } } -impl TryFrom for Kitsune2MemoryOp { - type Error = serde_json::Error; - - fn try_from(value: MetaOp) -> serde_json::Result { - let op = serde_json::from_slice(value.op_data.as_slice())?; - - Ok(op) +impl From for Kitsune2MemoryOp { + fn from(value: Bytes) -> Self { + serde_json::from_slice(&value) + .expect("failed to deserialize Kitsune2MemoryOp from Op") } } -impl TryFrom for MetaOp { - type Error = K2Error; - - fn try_from(value: Kitsune2MemoryOp) -> K2Result { - let op_data = serde_json::to_vec(&value).map_err(|e| { - K2Error::other_src("Failed to serialize Kitsune2MemoryOp", e) - })?; - Ok(MetaOp { - op_id: value.op_id, - op_data, - }) +impl From for Bytes { + fn from(value: Kitsune2MemoryOp) -> Self { + serde_json::to_vec(&value) + .expect("failed to serialize Op to Kitsune2MemoryOp") + .into() } } @@ -133,13 +125,13 @@ struct Kitsune2MemoryOpStoreInner { impl OpStore for Kitsune2MemoryOpStore { fn process_incoming_ops( &self, - op_list: Vec, + op_list: Vec, ) -> BoxFuture<'_, K2Result<()>> { Box::pin(async move { let ops_to_add = op_list .iter() .map(|op| -> serde_json::Result<(OpId, Kitsune2MemoryOp)> { - let op = Kitsune2MemoryOp::try_from(op.clone())?; + let op = Kitsune2MemoryOp::from(op.clone()); Ok((op.op_id.clone(), op)) }) .collect::, _>>().map_err(|e| { diff --git a/crates/dht/src/dht/tests.rs b/crates/dht/src/dht/tests.rs index 6522afa..e06cc01 100644 --- a/crates/dht/src/dht/tests.rs +++ b/crates/dht/src/dht/tests.rs @@ -26,8 +26,7 @@ async fn take_minimal_snapshot() { UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap()]) + .into()]) .await .unwrap(); diff --git a/crates/dht/src/dht/tests/harness.rs b/crates/dht/src/dht/tests/harness.rs index e72b4f2..9a97570 100644 --- a/crates/dht/src/dht/tests/harness.rs +++ b/crates/dht/src/dht/tests/harness.rs @@ -53,10 +53,7 @@ impl DhtSyncHarness { ) -> K2Result<()> { self.store .process_incoming_ops( - op_list - .iter() - .map(|op| op.clone().try_into().unwrap()) - .collect(), + op_list.iter().map(|op| op.clone().into()).collect(), ) .await?; @@ -374,12 +371,17 @@ async fn transfer_ops( target_dht: &mut Dht, requested_ops: Vec, ) -> K2Result<()> { - let selected = source.retrieve_ops(requested_ops).await?; + let selected = source + .retrieve_ops(requested_ops) + .await? + .into_iter() + .map(Into::into) + .collect::>(); target.process_incoming_ops(selected.clone()).await?; let stored_ops = selected .into_iter() - .map(|op| Kitsune2MemoryOp::try_from(op).unwrap().into()) + .map(|op| Kitsune2MemoryOp::from(op).into()) .collect::>(); target_dht.inform_ops_stored(stored_ops).await?; diff --git a/crates/dht/src/hash.rs b/crates/dht/src/hash.rs index 1f683fd..48f51cb 100644 --- a/crates/dht/src/hash.rs +++ b/crates/dht/src/hash.rs @@ -647,8 +647,7 @@ mod tests { now, end.to_be_bytes().to_vec(), ) - .try_into() - .unwrap()]) + .into()]) .await .unwrap(); } diff --git a/crates/dht/src/time.rs b/crates/dht/src/time.rs index 86b50f8..ae409e6 100644 --- a/crates/dht/src/time.rs +++ b/crates/dht/src/time.rs @@ -859,15 +859,13 @@ mod tests { (current_time - UNIT_TIME).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![23; 32])), (current_time - UNIT_TIME).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap(); @@ -948,15 +946,13 @@ mod tests { UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![23; 32])), UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap(); @@ -1010,29 +1006,25 @@ mod tests { UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![23; 32])), UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![11; 32])), (current_time - UNIT_TIME).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![29; 32])), (current_time - UNIT_TIME).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap(); @@ -1089,15 +1081,13 @@ mod tests { UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![11; 32])), (current_time - UNIT_TIME).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap(); @@ -1152,15 +1142,13 @@ mod tests { (current_time - UNIT_TIME).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![29; 32])), (current_time - UNIT_TIME).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap(); @@ -1185,8 +1173,7 @@ mod tests { current_time, vec![], ) - .try_into() - .unwrap()]) + .into()]) .await .unwrap(); @@ -1224,15 +1211,13 @@ mod tests { UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![23; 32])), UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap(); @@ -1264,8 +1249,7 @@ mod tests { pt.full_slice_end_timestamp(), // Start of the next full slice vec![], ) - .try_into() - .unwrap()]) + .into()]) .await .unwrap(); @@ -1329,15 +1313,13 @@ mod tests { UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![23; 32])), UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), // Store two new ops at the unix timestamp plus one full time slice, // to go into the second complete slice Kitsune2MemoryOp::new( @@ -1345,15 +1327,13 @@ mod tests { UNIX_TIMESTAMP + pt.full_slice_duration, vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![37; 32])), UNIX_TIMESTAMP + pt.full_slice_duration, vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap(); @@ -1431,8 +1411,7 @@ mod tests { pt.full_slice_end_timestamp(), vec![], ) - .try_into() - .unwrap()]) + .into()]) .await .unwrap(); // and compute the new state in the future @@ -1508,8 +1487,7 @@ mod tests { UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap()]) + .into()]) .await .unwrap(); @@ -1614,15 +1592,13 @@ mod tests { UNIX_TIMESTAMP + Duration::from_secs(10), vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::copy_from_slice(&[31, 0, 0, 0])), (current_time - Duration::from_secs(30)).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap();