diff --git a/Cargo.lock b/Cargo.lock index 471af58..3e734cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1059,7 +1059,6 @@ dependencies = [ "kitsune2_dht", "kitsune2_test_utils", "prost", - "serde", "tokio", "tracing", ] diff --git a/crates/api/src/builder.rs b/crates/api/src/builder.rs index 8251d4c..438edd4 100644 --- a/crates/api/src/builder.rs +++ b/crates/api/src/builder.rs @@ -43,6 +43,10 @@ pub struct Builder { /// [op_store::OpStore] instances. pub op_store: op_store::DynOpStoreFactory, + /// The [peer_meta_store::PeerMetaStoreFactory] to be used for creating + /// [peer_meta_store::PeerMetaStore] instances. + pub meta_store: peer_meta_store::DynPeerMetaStoreFactory, + /// The [gossip::GossipFactory] to be used for creating /// [gossip::Gossip] instances. pub gossip: gossip::DynGossipFactory, @@ -64,6 +68,7 @@ impl Builder { fetch, transport, op_store, + meta_store, gossip, } = &mut self; @@ -74,6 +79,7 @@ impl Builder { fetch.default_config(config)?; transport.default_config(config)?; op_store.default_config(config)?; + meta_store.default_config(config)?; gossip.default_config(config)?; config.mark_defaults_set(); diff --git a/crates/api/src/fetch.rs b/crates/api/src/fetch.rs index 01397a5..fef1b90 100644 --- a/crates/api/src/fetch.rs +++ b/crates/api/src/fetch.rs @@ -2,6 +2,7 @@ use std::sync::Arc; +use bytes::Bytes; use prost::Message; use crate::{ @@ -25,6 +26,12 @@ impl From for Vec { } } +impl From for Vec { + fn from(value: Ops) -> Self { + value.op_list.into_iter().map(|op| op.data).collect() + } +} + impl From> for Ops { fn from(value: Vec) -> Self { Self { diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index 88e1c83..cf28f82 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -59,6 +59,9 @@ pub use timestamp::*; pub mod fetch; +mod peer_meta_store; +pub use peer_meta_store::*; + mod gossip; pub use gossip::*; diff --git a/crates/api/src/op_store.rs b/crates/api/src/op_store.rs index bd19773..8935b44 100644 --- a/crates/api/src/op_store.rs +++ b/crates/api/src/op_store.rs @@ -3,6 +3,7 @@ use crate::{ builder, config, BoxFut, DhtArc, K2Result, OpId, SpaceId, Timestamp, }; +use bytes::Bytes; use futures::future::BoxFuture; use std::cmp::Ordering; use std::sync::Arc; @@ -29,6 +30,12 @@ impl From for Op { } } +impl From for Bytes { + fn from(value: MetaOp) -> Self { + value.op_data.into() + } +} + /// An op that has been stored by the Kitsune host. /// /// This is the basic unit of data that the host is expected to store. Whether that storage is @@ -68,7 +75,7 @@ pub trait OpStore: 'static + Send + Sync + std::fmt::Debug { /// if it is able to process them. fn process_incoming_ops( &self, - op_list: Vec, + op_list: Vec, ) -> BoxFuture<'_, K2Result<()>>; /// Retrieve a batch of ops from the host by time range. diff --git a/crates/api/src/peer_meta_store.rs b/crates/api/src/peer_meta_store.rs new file mode 100644 index 0000000..a23ee2d --- /dev/null +++ b/crates/api/src/peer_meta_store.rs @@ -0,0 +1,55 @@ +use crate::{builder, config, AgentId, BoxFut, K2Result, SpaceId, Timestamp}; +use futures::future::BoxFuture; +use std::sync::Arc; + +/// A store for peer metadata. +/// +/// This is expected to be backed by a key-value store that keys by space, agent_id and key. +pub trait PeerMetaStore: 'static + Send + Sync + std::fmt::Debug { + /// Store a key-value pair for a given space and agent. + fn put( + &self, + space: SpaceId, + agent: AgentId, + key: String, + value: bytes::Bytes, + expiry: Option, + ) -> BoxFuture<'_, K2Result<()>>; + + /// Get a value by key for a given space and agent. + fn get( + &self, + space: SpaceId, + agent: AgentId, + key: String, + ) -> BoxFuture<'_, K2Result>>; + + /// Delete a key-value pair for a given space and agent. + fn delete( + &self, + space: SpaceId, + agent: AgentId, + key: String, + ) -> BoxFuture<'_, K2Result<()>>; +} + +/// Trait-object version of kitsune2 [PeerMetaStore]. +pub type DynPeerMetaStore = Arc; + +/// A factory for constructing [PeerMetaStore] instances. +pub trait PeerMetaStoreFactory: + 'static + Send + Sync + std::fmt::Debug +{ + /// Help the builder construct a default config from the chosen + /// module factories. + fn default_config(&self, config: &mut config::Config) -> K2Result<()>; + + /// Construct a meta store instance. + fn create( + &self, + builder: Arc, + ) -> BoxFut<'static, K2Result>; +} + +/// Trait-object [PeerMetaStoreFactory]. +pub type DynPeerMetaStoreFactory = Arc; diff --git a/crates/core/src/factories.rs b/crates/core/src/factories.rs index 5b6980e..df267ff 100644 --- a/crates/core/src/factories.rs +++ b/crates/core/src/factories.rs @@ -15,6 +15,9 @@ pub use mem_bootstrap::MemBootstrapFactory; pub mod core_bootstrap; pub use core_bootstrap::CoreBootstrapFactory; +mod mem_peer_meta_store; +pub use mem_peer_meta_store::*; + pub mod core_fetch; pub use core_fetch::CoreFetchFactory; diff --git a/crates/core/src/factories/core_fetch/test/request_queue.rs b/crates/core/src/factories/core_fetch/test/request_queue.rs index c75da14..e3c2b51 100644 --- a/crates/core/src/factories/core_fetch/test/request_queue.rs +++ b/crates/core/src/factories/core_fetch/test/request_queue.rs @@ -113,7 +113,7 @@ async fn fetch_queue() { let mut num_requests_sent = requests_sent.lock().unwrap().len(); // Wait for tasks to settle all requests. - tokio::time::timeout(Duration::from_millis(20), async { + tokio::time::timeout(Duration::from_millis(30), async { loop { tokio::time::sleep(Duration::from_millis(1)).await; let current_num_requests_sent = requests_sent.lock().unwrap().len(); diff --git a/crates/core/src/factories/core_fetch/test/response_queue.rs b/crates/core/src/factories/core_fetch/test/response_queue.rs index a07ee8f..10fcdce 100644 --- a/crates/core/src/factories/core_fetch/test/response_queue.rs +++ b/crates/core/src/factories/core_fetch/test/response_queue.rs @@ -6,6 +6,7 @@ use crate::{ Kitsune2MemoryOp, MemOpStoreFactory, }, }; +use bytes::Bytes; use kitsune2_api::{ fetch::{serialize_op_ids, Ops}, id::Id, @@ -19,9 +20,9 @@ use std::{ time::Duration, }; -type ResponsesSent = Vec<(Vec, Url)>; +type ResponsesSent = Vec<(Vec, Url)>; -const SPACE_ID: SpaceId = SpaceId(Id(bytes::Bytes::from_static(b"space_id"))); +const SPACE_ID: SpaceId = SpaceId(Id(Bytes::from_static(b"space_id"))); fn hash_op(input: &bytes::Bytes) -> OpId { use sha2::{Digest, Sha256}; @@ -32,17 +33,9 @@ fn hash_op(input: &bytes::Bytes) -> OpId { hash_bytes.into() } -fn make_op(data: Vec) -> MetaOp { +fn make_op(data: Vec) -> Kitsune2MemoryOp { let op_id = hash_op(&data.clone().into()); - MetaOp { - op_id: op_id.clone(), - op_data: serde_json::to_vec(&Kitsune2MemoryOp::new( - op_id, - Timestamp::now(), - data, - )) - .unwrap(), - } + Kitsune2MemoryOp::new(op_id, Timestamp::now(), data) } fn make_mock_transport( @@ -57,21 +50,12 @@ fn make_mock_transport( move |peer, space, module, data| { assert_eq!(space, SPACE_ID); assert_eq!(module, crate::factories::core_fetch::MOD_NAME); - let ops = Ops::decode(data).unwrap(); - let ops = ops + let ops = Ops::decode(data) + .unwrap() .op_list .into_iter() - .map(|op| { - let op_data = - serde_json::from_slice::(&op.data) - .unwrap(); - let op_id = hash_op(&bytes::Bytes::from(op_data.payload)); - MetaOp { - op_id, - op_data: op.data.into(), - } - }) - .collect(); + .map(|op| op.data) + .collect::>(); responses_sent.lock().unwrap().push((ops, peer)); Box::pin(async move { Ok(()) }) } @@ -101,11 +85,12 @@ async fn respond_to_multiple_requests() { let op_2 = make_op(vec![2; 128]); let op_3 = make_op(vec![3; 128]); // Insert op 1, 2, 3 into op store. Op 4 will not be returned in the response. - let stored_op = vec![op_1.clone(), op_2.clone(), op_3.clone()]; - op_store - .process_incoming_ops(stored_op.clone()) - .await - .unwrap(); + let stored_ops = vec![ + op_1.clone().into(), + op_2.clone().into(), + op_3.clone().into(), + ]; + op_store.process_incoming_ops(stored_ops).await.unwrap(); let fetch = CoreFetch::new( config.clone(), @@ -152,12 +137,12 @@ async fn respond_to_multiple_requests() { assert!(responses_sent .lock() .unwrap() - .contains(&(vec![op_1, op_2], agent_url_1))); + .contains(&(vec![op_1.into(), op_2.into()], agent_url_1))); // Only op 3 is in op store. assert!(responses_sent .lock() .unwrap() - .contains(&(vec![op_3], agent_url_2))); + .contains(&(vec![op_3.into()], agent_url_2))); } #[tokio::test(flavor = "multi_thread")] @@ -252,7 +237,7 @@ async fn fail_to_respond_once_then_succeed() { let op = make_op(vec![1; 128]); op_store - .process_incoming_ops(vec![op.clone()]) + .process_incoming_ops(vec![op.clone().into()]) .await .unwrap(); let agent_url = Url::from_str("wss://127.0.0.1:1").unwrap(); diff --git a/crates/core/src/factories/mem_op_store.rs b/crates/core/src/factories/mem_op_store.rs index 7399493..dbe541d 100644 --- a/crates/core/src/factories/mem_op_store.rs +++ b/crates/core/src/factories/mem_op_store.rs @@ -1,6 +1,7 @@ //! The mem op store implementation provided by Kitsune2. use crate::factories::mem_op_store::time_slice_hash_store::TimeSliceHashStore; +use bytes::Bytes; use futures::future::BoxFuture; use kitsune2_api::builder::Builder; use kitsune2_api::config::Config; @@ -77,27 +78,18 @@ impl From for StoredOp { } } -impl TryFrom for Kitsune2MemoryOp { - type Error = serde_json::Error; - - fn try_from(value: MetaOp) -> serde_json::Result { - let op = serde_json::from_slice(value.op_data.as_slice())?; - - Ok(op) +impl From for Kitsune2MemoryOp { + fn from(value: Bytes) -> Self { + serde_json::from_slice(&value) + .expect("failed to deserialize Kitsune2MemoryOp from Op") } } -impl TryFrom for MetaOp { - type Error = K2Error; - - fn try_from(value: Kitsune2MemoryOp) -> K2Result { - let op_data = serde_json::to_vec(&value).map_err(|e| { - K2Error::other_src("Failed to serialize Kitsune2MemoryOp", e) - })?; - Ok(MetaOp { - op_id: value.op_id, - op_data, - }) +impl From for Bytes { + fn from(value: Kitsune2MemoryOp) -> Self { + serde_json::to_vec(&value) + .expect("failed to serialize Op to Kitsune2MemoryOp") + .into() } } @@ -133,13 +125,13 @@ struct Kitsune2MemoryOpStoreInner { impl OpStore for Kitsune2MemoryOpStore { fn process_incoming_ops( &self, - op_list: Vec, + op_list: Vec, ) -> BoxFuture<'_, K2Result<()>> { Box::pin(async move { let ops_to_add = op_list .iter() .map(|op| -> serde_json::Result<(OpId, Kitsune2MemoryOp)> { - let op = Kitsune2MemoryOp::try_from(op.clone())?; + let op = Kitsune2MemoryOp::from(op.clone()); Ok((op.op_id.clone(), op)) }) .collect::, _>>().map_err(|e| { diff --git a/crates/core/src/factories/mem_peer_meta_store.rs b/crates/core/src/factories/mem_peer_meta_store.rs new file mode 100644 index 0000000..e592438 --- /dev/null +++ b/crates/core/src/factories/mem_peer_meta_store.rs @@ -0,0 +1,108 @@ +use futures::future::BoxFuture; +use kitsune2_api::{ + AgentId, DynPeerMetaStore, DynPeerMetaStoreFactory, K2Result, + PeerMetaStore, PeerMetaStoreFactory, SpaceId, Timestamp, +}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::Mutex; + +#[cfg(test)] +mod test; + +type MemPeerMetaInner = + HashMap<(SpaceId, AgentId), HashMap>; + +/// An in-memory implementation of the [PeerMetaStore]. +/// +/// This is useful for testing, but peer metadata is supposed to be persistent in a real deployment. +#[derive(Debug)] +pub struct MemPeerMetaStore { + inner: Arc>, +} + +impl MemPeerMetaStore { + /// Create a new [MemPeerMetaStore]. + pub fn create() -> DynPeerMetaStore { + let inner = Arc::new(Mutex::new(HashMap::new())); + Arc::new(MemPeerMetaStore { inner }) + } +} + +impl PeerMetaStore for MemPeerMetaStore { + fn put( + &self, + space: SpaceId, + agent: AgentId, + key: String, + value: bytes::Bytes, + _expiry: Option, + ) -> BoxFuture<'_, K2Result<()>> { + let inner = self.inner.clone(); + Box::pin(async move { + let mut inner = inner.lock().await; + let entry = + inner.entry((space, agent)).or_insert_with(HashMap::new); + entry.insert(key, value); + Ok(()) + }) + } + + fn get( + &self, + space: SpaceId, + agent: AgentId, + key: String, + ) -> BoxFuture<'_, K2Result>> { + let inner = self.inner.clone(); + Box::pin(async move { + let inner = inner.lock().await; + Ok(inner + .get(&(space, agent)) + .and_then(|entry| entry.get(&key).cloned())) + }) + } + + fn delete( + &self, + space: SpaceId, + agent: AgentId, + key: String, + ) -> BoxFuture<'_, K2Result<()>> { + let inner = self.inner.clone(); + Box::pin(async move { + let mut inner = inner.lock().await; + if let Some(entry) = inner.get_mut(&(space, agent)) { + entry.remove(&key); + } + Ok(()) + }) + } +} + +/// A factory for creating [MemPeerMetaStore] instances. +#[derive(Debug)] +pub struct MemPeerMetaStoreFactory; + +impl MemPeerMetaStoreFactory { + /// Construct a new [MemPeerMetaStoreFactory]. + pub fn create() -> DynPeerMetaStoreFactory { + Arc::new(MemPeerMetaStoreFactory) + } +} + +impl PeerMetaStoreFactory for MemPeerMetaStoreFactory { + fn default_config( + &self, + _config: &mut crate::config::Config, + ) -> K2Result<()> { + Ok(()) + } + + fn create( + &self, + _builder: Arc, + ) -> BoxFuture<'static, K2Result> { + Box::pin(async move { Ok(MemPeerMetaStore::create()) }) + } +} diff --git a/crates/core/src/factories/mem_peer_meta_store/test.rs b/crates/core/src/factories/mem_peer_meta_store/test.rs new file mode 100644 index 0000000..0f9b0b5 --- /dev/null +++ b/crates/core/src/factories/mem_peer_meta_store/test.rs @@ -0,0 +1,123 @@ +use super::*; +use crate::default_test_builder; +use kitsune2_api::Timestamp; +use std::time::Duration; + +struct TestPeerMetaStore { + inner: DynPeerMetaStore, + space: SpaceId, +} + +impl TestPeerMetaStore { + async fn new(inner: DynPeerMetaStore, space: &str) -> K2Result { + let space: SpaceId = + bytes::Bytes::from(space.as_bytes().to_vec()).into(); + + Ok(Self { inner, space }) + } + + async fn last_gossip_timestamp(&self, agent: AgentId) -> Option { + self.inner + .get( + self.space.clone(), + agent, + "gossip:last_timestamp".to_string(), + ) + .await + .unwrap() + .map(|v| { + Timestamp::from_micros(i64::from_be_bytes( + v.to_vec().as_slice().try_into().unwrap(), + )) + }) + } + + async fn set_last_gossip_timestamp( + &mut self, + agent: AgentId, + timestamp: Timestamp, + ) -> K2Result<()> { + let value = bytes::Bytes::from( + timestamp.as_micros().to_be_bytes().as_slice().to_vec(), + ); + + self.inner + .put( + self.space.clone(), + agent, + "gossip:last_timestamp".to_string(), + value, + None, + ) + .await?; + + Ok(()) + } +} + +#[tokio::test] +async fn mem_meta_store() { + let factory = MemPeerMetaStoreFactory::create(); + let store = factory + .create(Arc::new( + default_test_builder().with_default_config().unwrap(), + )) + .await + .unwrap(); + + let agent = AgentId::from(bytes::Bytes::from_static(b"agent-1")); + let mut agent_store = TestPeerMetaStore::new(store.clone(), "space") + .await + .unwrap(); + + assert_eq!(agent_store.last_gossip_timestamp(agent.clone()).await, None); + + let timestamp = Timestamp::now(); + agent_store + .set_last_gossip_timestamp(agent.clone(), timestamp) + .await + .unwrap(); + + assert_eq!( + agent_store.last_gossip_timestamp(agent).await, + Some(timestamp) + ); +} + +#[tokio::test] +async fn store_with_multiple_agents() { + let factory = MemPeerMetaStoreFactory::create(); + let store = factory + .create(Arc::new( + default_test_builder().with_default_config().unwrap(), + )) + .await + .unwrap(); + + let agent_1 = AgentId::from(bytes::Bytes::from_static(b"agent-1")); + let agent_2 = AgentId::from(bytes::Bytes::from_static(b"agent-2")); + let mut agent_store = TestPeerMetaStore::new(store.clone(), "space") + .await + .unwrap(); + + let timestamp_1 = Timestamp::now(); + let timestamp_2 = timestamp_1 + Duration::from_secs(1); + + agent_store + .set_last_gossip_timestamp(agent_1.clone(), timestamp_1) + .await + .unwrap(); + agent_store + .set_last_gossip_timestamp(agent_2.clone(), timestamp_2) + .await + .unwrap(); + + assert_eq!( + agent_store.last_gossip_timestamp(agent_1).await, + Some(timestamp_1) + ); + assert_eq!( + agent_store.last_gossip_timestamp(agent_2).await, + Some(timestamp_2) + ); +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 5b61a8e..a878d9d 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -146,6 +146,9 @@ impl agent::LocalAgent for Ed25519LocalAgent { /// - `bootstrap` - The default bootstrap is [factories::MemBootstrapFactory]. /// - `fetch` - The default fetch module is [factories::CoreFetchFactory]. /// - `transport` - The default transport is [factories::MemTransportFactory]. +/// - `op_store` - The default op store is [factories::MemOpStoreFactory]. +/// - `meta_store` - The default meta store is [factories::MemPeerMetaStoreFactory]. +/// - `gossip` - The default gossip module is [factories::CoreGossipStubFactory]. pub fn default_test_builder() -> Builder { Builder { config: Config::default(), @@ -157,6 +160,7 @@ pub fn default_test_builder() -> Builder { fetch: factories::CoreFetchFactory::create(), transport: factories::MemTransportFactory::create(), op_store: factories::MemOpStoreFactory::create(), + meta_store: factories::MemPeerMetaStoreFactory::create(), gossip: factories::CoreGossipStubFactory::create(), } } diff --git a/crates/dht/src/dht/tests.rs b/crates/dht/src/dht/tests.rs index 6522afa..e06cc01 100644 --- a/crates/dht/src/dht/tests.rs +++ b/crates/dht/src/dht/tests.rs @@ -26,8 +26,7 @@ async fn take_minimal_snapshot() { UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap()]) + .into()]) .await .unwrap(); diff --git a/crates/dht/src/dht/tests/harness.rs b/crates/dht/src/dht/tests/harness.rs index e72b4f2..9a97570 100644 --- a/crates/dht/src/dht/tests/harness.rs +++ b/crates/dht/src/dht/tests/harness.rs @@ -53,10 +53,7 @@ impl DhtSyncHarness { ) -> K2Result<()> { self.store .process_incoming_ops( - op_list - .iter() - .map(|op| op.clone().try_into().unwrap()) - .collect(), + op_list.iter().map(|op| op.clone().into()).collect(), ) .await?; @@ -374,12 +371,17 @@ async fn transfer_ops( target_dht: &mut Dht, requested_ops: Vec, ) -> K2Result<()> { - let selected = source.retrieve_ops(requested_ops).await?; + let selected = source + .retrieve_ops(requested_ops) + .await? + .into_iter() + .map(Into::into) + .collect::>(); target.process_incoming_ops(selected.clone()).await?; let stored_ops = selected .into_iter() - .map(|op| Kitsune2MemoryOp::try_from(op).unwrap().into()) + .map(|op| Kitsune2MemoryOp::from(op).into()) .collect::>(); target_dht.inform_ops_stored(stored_ops).await?; diff --git a/crates/dht/src/hash.rs b/crates/dht/src/hash.rs index 1f683fd..48f51cb 100644 --- a/crates/dht/src/hash.rs +++ b/crates/dht/src/hash.rs @@ -647,8 +647,7 @@ mod tests { now, end.to_be_bytes().to_vec(), ) - .try_into() - .unwrap()]) + .into()]) .await .unwrap(); } diff --git a/crates/dht/src/time.rs b/crates/dht/src/time.rs index 86b50f8..ae409e6 100644 --- a/crates/dht/src/time.rs +++ b/crates/dht/src/time.rs @@ -859,15 +859,13 @@ mod tests { (current_time - UNIT_TIME).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![23; 32])), (current_time - UNIT_TIME).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap(); @@ -948,15 +946,13 @@ mod tests { UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![23; 32])), UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap(); @@ -1010,29 +1006,25 @@ mod tests { UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![23; 32])), UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![11; 32])), (current_time - UNIT_TIME).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![29; 32])), (current_time - UNIT_TIME).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap(); @@ -1089,15 +1081,13 @@ mod tests { UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![11; 32])), (current_time - UNIT_TIME).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap(); @@ -1152,15 +1142,13 @@ mod tests { (current_time - UNIT_TIME).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![29; 32])), (current_time - UNIT_TIME).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap(); @@ -1185,8 +1173,7 @@ mod tests { current_time, vec![], ) - .try_into() - .unwrap()]) + .into()]) .await .unwrap(); @@ -1224,15 +1211,13 @@ mod tests { UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![23; 32])), UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap(); @@ -1264,8 +1249,7 @@ mod tests { pt.full_slice_end_timestamp(), // Start of the next full slice vec![], ) - .try_into() - .unwrap()]) + .into()]) .await .unwrap(); @@ -1329,15 +1313,13 @@ mod tests { UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![23; 32])), UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap(), + .into(), // Store two new ops at the unix timestamp plus one full time slice, // to go into the second complete slice Kitsune2MemoryOp::new( @@ -1345,15 +1327,13 @@ mod tests { UNIX_TIMESTAMP + pt.full_slice_duration, vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::from(vec![37; 32])), UNIX_TIMESTAMP + pt.full_slice_duration, vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap(); @@ -1431,8 +1411,7 @@ mod tests { pt.full_slice_end_timestamp(), vec![], ) - .try_into() - .unwrap()]) + .into()]) .await .unwrap(); // and compute the new state in the future @@ -1508,8 +1487,7 @@ mod tests { UNIX_TIMESTAMP, vec![], ) - .try_into() - .unwrap()]) + .into()]) .await .unwrap(); @@ -1614,15 +1592,13 @@ mod tests { UNIX_TIMESTAMP + Duration::from_secs(10), vec![], ) - .try_into() - .unwrap(), + .into(), Kitsune2MemoryOp::new( OpId::from(bytes::Bytes::copy_from_slice(&[31, 0, 0, 0])), (current_time - Duration::from_secs(30)).unwrap(), vec![], ) - .try_into() - .unwrap(), + .into(), ]) .await .unwrap();