Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(op-store)!: pass ops as bytes to op store #71

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/api/src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::sync::Arc;

use bytes::Bytes;
use prost::Message;

use crate::{
Expand All @@ -25,6 +26,12 @@ impl From<OpIds> for Vec<OpId> {
}
}

impl From<Ops> for Vec<Bytes> {
fn from(value: Ops) -> Self {
value.op_list.into_iter().map(|op| op.data).collect()
}
}

impl From<Vec<MetaOp>> for Ops {
fn from(value: Vec<MetaOp>) -> Self {
Self {
Expand Down
9 changes: 8 additions & 1 deletion crates/api/src/op_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::{
builder, config, BoxFut, DhtArc, K2Result, OpId, SpaceId, Timestamp,
};
use bytes::Bytes;
use futures::future::BoxFuture;
use std::cmp::Ordering;
use std::sync::Arc;
Expand All @@ -29,6 +30,12 @@ impl From<MetaOp> for Op {
}
}

impl From<MetaOp> for Bytes {
fn from(value: MetaOp) -> Self {
value.op_data.into()
}
}

/// An op that has been stored by the Kitsune host.
///
/// This is the basic unit of data that the host is expected to store. Whether that storage is
Expand Down Expand Up @@ -68,7 +75,7 @@ pub trait OpStore: 'static + Send + Sync + std::fmt::Debug {
/// if it is able to process them.
fn process_incoming_ops(
&self,
op_list: Vec<MetaOp>,
op_list: Vec<Bytes>,
) -> BoxFuture<'_, K2Result<()>>;

/// Retrieve a batch of ops from the host by time range.
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/factories/core_fetch/test/request_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ async fn fetch_queue() {
let mut num_requests_sent = requests_sent.lock().unwrap().len();

// Wait for tasks to settle all requests.
tokio::time::timeout(Duration::from_millis(20), async {
tokio::time::timeout(Duration::from_millis(30), async {
loop {
tokio::time::sleep(Duration::from_millis(1)).await;
let current_num_requests_sent = requests_sent.lock().unwrap().len();
Expand Down
51 changes: 18 additions & 33 deletions crates/core/src/factories/core_fetch/test/response_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
Kitsune2MemoryOp, MemOpStoreFactory,
},
};
use bytes::Bytes;
use kitsune2_api::{
fetch::{serialize_op_ids, Ops},
id::Id,
Expand All @@ -19,9 +20,9 @@ use std::{
time::Duration,
};

type ResponsesSent = Vec<(Vec<MetaOp>, Url)>;
type ResponsesSent = Vec<(Vec<Bytes>, Url)>;

const SPACE_ID: SpaceId = SpaceId(Id(bytes::Bytes::from_static(b"space_id")));
const SPACE_ID: SpaceId = SpaceId(Id(Bytes::from_static(b"space_id")));

fn hash_op(input: &bytes::Bytes) -> OpId {
use sha2::{Digest, Sha256};
Expand All @@ -32,17 +33,9 @@ fn hash_op(input: &bytes::Bytes) -> OpId {
hash_bytes.into()
}

fn make_op(data: Vec<u8>) -> MetaOp {
fn make_op(data: Vec<u8>) -> Kitsune2MemoryOp {
let op_id = hash_op(&data.clone().into());
MetaOp {
op_id: op_id.clone(),
op_data: serde_json::to_vec(&Kitsune2MemoryOp::new(
op_id,
Timestamp::now(),
data,
))
.unwrap(),
}
Kitsune2MemoryOp::new(op_id, Timestamp::now(), data)
}

fn make_mock_transport(
Expand All @@ -57,21 +50,12 @@ fn make_mock_transport(
move |peer, space, module, data| {
assert_eq!(space, SPACE_ID);
assert_eq!(module, crate::factories::core_fetch::MOD_NAME);
let ops = Ops::decode(data).unwrap();
let ops = ops
let ops = Ops::decode(data)
.unwrap()
.op_list
.into_iter()
.map(|op| {
let op_data =
serde_json::from_slice::<Kitsune2MemoryOp>(&op.data)
.unwrap();
let op_id = hash_op(&bytes::Bytes::from(op_data.payload));
MetaOp {
op_id,
op_data: op.data.into(),
}
})
.collect();
.map(|op| op.data)
.collect::<Vec<_>>();
responses_sent.lock().unwrap().push((ops, peer));
Box::pin(async move { Ok(()) })
}
Expand Down Expand Up @@ -100,11 +84,12 @@ async fn respond_to_multiple_requests() {
let op_2 = make_op(vec![2; 128]);
let op_3 = make_op(vec![3; 128]);
// Insert op 1, 2, 3 into op store. Op 4 will not be returned in the response.
let stored_op = vec![op_1.clone(), op_2.clone(), op_3.clone()];
op_store
.process_incoming_ops(stored_op.clone())
.await
.unwrap();
let stored_ops = vec![
op_1.clone().into(),
op_2.clone().into(),
op_3.clone().into(),
];
op_store.process_incoming_ops(stored_ops).await.unwrap();

let fetch = CoreFetch::new(
config.clone(),
Expand Down Expand Up @@ -151,12 +136,12 @@ async fn respond_to_multiple_requests() {
assert!(responses_sent
.lock()
.unwrap()
.contains(&(vec![op_1, op_2], agent_url_1)));
.contains(&(vec![op_1.into(), op_2.into()], agent_url_1)));
// Only op 3 is in op store.
assert!(responses_sent
.lock()
.unwrap()
.contains(&(vec![op_3], agent_url_2)));
.contains(&(vec![op_3.into()], agent_url_2)));
}

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -249,7 +234,7 @@ async fn fail_to_respond_once_then_succeed() {

let op = make_op(vec![1; 128]);
op_store
.process_incoming_ops(vec![op.clone()])
.process_incoming_ops(vec![op.clone().into()])
.await
.unwrap();
let agent_url = Url::from_str("wss://127.0.0.1:1").unwrap();
Expand Down
32 changes: 12 additions & 20 deletions crates/core/src/factories/mem_op_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! The mem op store implementation provided by Kitsune2.

use crate::factories::mem_op_store::time_slice_hash_store::TimeSliceHashStore;
use bytes::Bytes;
use futures::future::BoxFuture;
use kitsune2_api::builder::Builder;
use kitsune2_api::config::Config;
Expand Down Expand Up @@ -77,27 +78,18 @@ impl From<Kitsune2MemoryOp> for StoredOp {
}
}

impl TryFrom<MetaOp> for Kitsune2MemoryOp {
type Error = serde_json::Error;

fn try_from(value: MetaOp) -> serde_json::Result<Self> {
let op = serde_json::from_slice(value.op_data.as_slice())?;

Ok(op)
impl From<Bytes> for Kitsune2MemoryOp {
fn from(value: Bytes) -> Self {
serde_json::from_slice(&value)
.expect("failed to deserialize Kitsune2MemoryOp from Op")
}
}

impl TryFrom<Kitsune2MemoryOp> for MetaOp {
type Error = K2Error;

fn try_from(value: Kitsune2MemoryOp) -> K2Result<Self> {
let op_data = serde_json::to_vec(&value).map_err(|e| {
K2Error::other_src("Failed to serialize Kitsune2MemoryOp", e)
})?;
Ok(MetaOp {
op_id: value.op_id,
op_data,
})
impl From<Kitsune2MemoryOp> for Bytes {
fn from(value: Kitsune2MemoryOp) -> Self {
serde_json::to_vec(&value)
.expect("failed to serialize Op to Kitsune2MemoryOp")
.into()
}
}

Expand Down Expand Up @@ -133,13 +125,13 @@ struct Kitsune2MemoryOpStoreInner {
impl OpStore for Kitsune2MemoryOpStore {
fn process_incoming_ops(
&self,
op_list: Vec<MetaOp>,
op_list: Vec<Bytes>,
) -> BoxFuture<'_, K2Result<()>> {
Box::pin(async move {
let ops_to_add = op_list
.iter()
.map(|op| -> serde_json::Result<(OpId, Kitsune2MemoryOp)> {
let op = Kitsune2MemoryOp::try_from(op.clone())?;
let op = Kitsune2MemoryOp::from(op.clone());
Ok((op.op_id.clone(), op))
})
.collect::<Result<Vec<_>, _>>().map_err(|e| {
Expand Down
3 changes: 1 addition & 2 deletions crates/dht/src/dht/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ async fn take_minimal_snapshot() {
UNIX_TIMESTAMP,
vec![],
)
.try_into()
.unwrap()])
.into()])
.await
.unwrap();

Expand Down
14 changes: 8 additions & 6 deletions crates/dht/src/dht/tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@ impl DhtSyncHarness {
) -> K2Result<()> {
self.store
.process_incoming_ops(
op_list
.iter()
.map(|op| op.clone().try_into().unwrap())
.collect(),
op_list.iter().map(|op| op.clone().into()).collect(),
)
.await?;

Expand Down Expand Up @@ -374,12 +371,17 @@ async fn transfer_ops(
target_dht: &mut Dht,
requested_ops: Vec<OpId>,
) -> K2Result<()> {
let selected = source.retrieve_ops(requested_ops).await?;
let selected = source
.retrieve_ops(requested_ops)
.await?
.into_iter()
.map(Into::into)
.collect::<Vec<_>>();
target.process_incoming_ops(selected.clone()).await?;

let stored_ops = selected
.into_iter()
.map(|op| Kitsune2MemoryOp::try_from(op).unwrap().into())
.map(|op| Kitsune2MemoryOp::from(op).into())
.collect::<Vec<StoredOp>>();
target_dht.inform_ops_stored(stored_ops).await?;

Expand Down
3 changes: 1 addition & 2 deletions crates/dht/src/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,8 +647,7 @@ mod tests {
now,
end.to_be_bytes().to_vec(),
)
.try_into()
.unwrap()])
.into()])
.await
.unwrap();
}
Expand Down
Loading
Loading