Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Op store split metadata #74

Merged
merged 3 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 19 additions & 31 deletions crates/api/src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use crate::{
builder, config, peer_store::DynPeerStore, transport::DynTransport,
AgentId, BoxFut, DynOpStore, K2Result, MetaOp, OpId, SpaceId,
AgentId, BoxFut, DynOpStore, K2Result, OpId, SpaceId,
};
use bytes::{Bytes, BytesMut};
use k2_fetch_message::FetchMessageType;
Expand All @@ -25,8 +25,8 @@ impl From<Request> for Vec<OpId> {
}
}

impl From<Vec<MetaOp>> for Response {
fn from(value: Vec<MetaOp>) -> Self {
impl From<Vec<Bytes>> for Response {
fn from(value: Vec<Bytes>) -> Self {
Self {
ops: value.into_iter().map(Into::into).collect(),
}
Expand Down Expand Up @@ -57,17 +57,17 @@ pub fn serialize_request_message(value: Vec<OpId>) -> Bytes {
}

/// Serialize list of ops to response.
pub fn serialize_response(value: Vec<MetaOp>) -> Bytes {
pub fn serialize_response(value: Vec<Bytes>) -> Bytes {
let mut out = BytesMut::new();
Response::from(value)
.encode(&mut out)
.expect("failed to encode response");
out.freeze()
}

/// Serialize list of ops to fetch resopnse message.
pub fn serialize_response_message(value: Vec<MetaOp>) -> Bytes {
let mut out = bytes::BytesMut::new();
/// Serialize list of ops to fetch response message.
ThetaSinner marked this conversation as resolved.
Show resolved Hide resolved
pub fn serialize_response_message(value: Vec<Bytes>) -> Bytes {
let mut out = BytesMut::new();
let data = serialize_response(value);
let fetch_message = K2FetchMessage {
fetch_message_type: FetchMessageType::Response.into(),
Expand Down Expand Up @@ -115,7 +115,7 @@ pub type DynFetchFactory = Arc<dyn FetchFactory>;
#[cfg(test)]
mod test {
use super::*;
use crate::{id::Id, MetaOp};
use crate::id::Id;
use prost::Message;

#[test]
Expand All @@ -135,24 +135,18 @@ mod test {

#[test]
fn happy_response_encode_decode() {
let op_1 = MetaOp {
op_id: OpId::from(bytes::Bytes::from_static(b"some_op_id")),
op_data: vec![0],
};
let op_2 = MetaOp {
op_id: OpId::from(bytes::Bytes::from_static(b"another_op_id")),
op_data: vec![1],
};
let op_vec = vec![op_1, op_2];
let ops_enc = serialize_response(op_vec.clone());
let expected_ops_data =
op_vec.into_iter().map(|op| op.op_data).collect::<Vec<_>>();
// Not real op payloads, any bytes will do to check the round trip encoding/decoding
// of the response type
let op_1 = bytes::Bytes::from(vec![0]);
let op_2 = bytes::Bytes::from(vec![1]);
let expected_ops_data = vec![op_1, op_2];
let ops_enc = serialize_response(expected_ops_data.clone());

let response = Response::decode(ops_enc).unwrap();
let actual_ops_data = response
.ops
.into_iter()
.map(|op| op.data.to_vec())
.map(|op| op.data)
.collect::<Vec<_>>();
assert_eq!(expected_ops_data, actual_ops_data);
}
Expand All @@ -179,16 +173,10 @@ mod test {

#[test]
fn happy_fetch_response_encode_decode() {
let op = MetaOp {
op_id: OpId::from(bytes::Bytes::from_static(b"some_op_id")),
op_data: vec![0],
};
let ops = vec![op];
let fetch_response = serialize_response_message(ops.clone());
let expected_ops_data = ops
.into_iter()
.map(|op| Bytes::from(op.op_data))
.collect::<Vec<_>>();
let op = bytes::Bytes::from(vec![0]);
let expected_ops_data = vec![op];
let fetch_response =
serialize_response_message(expected_ops_data.clone());

let fetch_message_dec = K2FetchMessage::decode(fetch_response).unwrap();
assert_eq!(
Expand Down
33 changes: 9 additions & 24 deletions crates/api/src/op_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use crate::{
builder, config, BoxFut, DhtArc, K2Result, OpId, SpaceId, Timestamp,
};
use bytes::Bytes;
use futures::future::BoxFuture;
#[cfg(feature = "mockall")]
use mockall::automock;
Expand All @@ -19,22 +18,14 @@ pub struct MetaOp {
pub op_id: OpId,

/// The actual op data.
pub op_data: Vec<u8>,
pub op_data: bytes::Bytes,
}

include!("../proto/gen/kitsune2.op_store.rs");

impl From<MetaOp> for Op {
fn from(value: MetaOp) -> Self {
Self {
data: value.op_data.into(),
}
}
}

impl From<MetaOp> for Bytes {
fn from(value: MetaOp) -> Self {
value.op_data.into()
impl From<bytes::Bytes> for Op {
fn from(value: bytes::Bytes) -> Self {
Self { data: value }
}
}

Expand All @@ -54,12 +45,12 @@ pub struct StoredOp {
///
/// Note that this means any op implementation must include a consistent timestamp in the op
/// data so that it can be provided back to Kitsune.
pub timestamp: Timestamp,
pub created_at: Timestamp,
}

impl Ord for StoredOp {
fn cmp(&self, other: &Self) -> Ordering {
(&self.timestamp, &self.op_id).cmp(&(&other.timestamp, &other.op_id))
(&self.created_at, &self.op_id).cmp(&(&other.created_at, &other.op_id))
}
}

Expand All @@ -78,8 +69,8 @@ pub trait OpStore: 'static + Send + Sync + std::fmt::Debug {
/// if it is able to process them.
fn process_incoming_ops(
&self,
op_list: Vec<Bytes>,
) -> BoxFuture<'_, K2Result<()>>;
op_list: Vec<bytes::Bytes>,
) -> BoxFuture<'_, K2Result<Vec<OpId>>>;

/// Retrieve a batch of ops from the host by time range.
///
Expand Down Expand Up @@ -151,18 +142,12 @@ pub type DynOpStoreFactory = Arc<dyn OpStoreFactory>;

#[cfg(test)]
mod test {
use crate::MetaOp;

use super::*;
use prost::Message;

#[test]
fn happy_meta_op_encode_decode() {
let meta_op = MetaOp {
op_id: OpId::from(bytes::Bytes::from_static(b"some_op_id")),
op_data: vec![1; 128],
};
let op = Op::from(meta_op);
let op = Op::from(bytes::Bytes::from(vec![1; 128]));
let op_enc = op.encode_to_vec();
let op_dec = Op::decode(op_enc.as_slice()).unwrap();

Expand Down
1 change: 0 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,4 @@ ed25519-dalek = { workspace = true, features = ["rand_core"] }
kitsune2_api = { workspace = true, features = ["mockall"] }
kitsune2_test_utils = { workspace = true }
rand = { workspace = true }
sha2 = { workspace = true }
tokio = { workspace = true, features = ["full"] }
4 changes: 3 additions & 1 deletion crates/core/src/factories/core_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,9 @@ impl CoreFetch {
tracing::error!("could not read ops from store: {err}");
continue;
}
Ok(ops) => ops,
Ok(ops) => {
ops.into_iter().map(|op| op.op_data).collect::<Vec<_>>()
}
};

if ops.is_empty() {
Expand Down
9 changes: 3 additions & 6 deletions crates/core/src/factories/core_fetch/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,9 @@ mod test {
let peer = Url::from_str("wss://127.0.0.1:1").unwrap();

let op = make_op(vec![0]);
let received_ops = vec![op];
let request_message = serialize_response_message(received_ops.clone());
let expected_ops_data = received_ops
.into_iter()
.map(|op| Bytes::from(op.op_data))
.collect::<Vec<_>>();
let expected_ops_data = vec![op.into()];
let request_message =
serialize_response_message(expected_ops_data.clone());

let task_handle = tokio::task::spawn(async move {
let ops = incoming_response_rx
Expand Down
26 changes: 4 additions & 22 deletions crates/core/src/factories/core_fetch/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ mod outgoing_request_queue;

#[cfg(test)]
pub(crate) mod utils {
use crate::factories::Kitsune2MemoryOp;
use crate::factories::MemoryOp;
use bytes::Bytes;
use kitsune2_api::{id::Id, AgentId, MetaOp, OpId, Timestamp};
use kitsune2_api::{id::Id, AgentId, OpId, Timestamp};
use rand::Rng;

pub fn random_id() -> Id {
Expand Down Expand Up @@ -33,25 +33,7 @@ pub(crate) mod utils {
ops
}

pub fn hash_op(input: &bytes::Bytes) -> OpId {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(input);
let result = hasher.finalize();
let hash_bytes = bytes::Bytes::from(result.to_vec());
hash_bytes.into()
}

pub fn make_op(data: Vec<u8>) -> MetaOp {
let op_id = hash_op(&data.clone().into());
MetaOp {
op_id: op_id.clone(),
op_data: serde_json::to_vec(&Kitsune2MemoryOp::new(
op_id,
Timestamp::now(),
data,
))
.unwrap(),
}
pub fn make_op(data: Vec<u8>) -> MemoryOp {
MemoryOp::new(Timestamp::now(), data)
}
}
35 changes: 11 additions & 24 deletions crates/core/src/factories/core_fetch/test/incoming_request_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ use super::utils::random_op_id;
use crate::{
default_test_builder,
factories::{
core_fetch::{
test::utils::{hash_op, make_op},
CoreFetch, CoreFetchConfig,
},
Kitsune2MemoryOp, MemOpStoreFactory,
core_fetch::{test::utils::make_op, CoreFetch, CoreFetchConfig},
MemOpStoreFactory, MemoryOp,
},
};
use bytes::Bytes;
Expand All @@ -17,7 +14,7 @@ use kitsune2_api::{
},
id::Id,
transport::MockTransport,
K2Error, MetaOp, SpaceId, Url,
K2Error, SpaceId, Url,
};
use kitsune2_test_utils::enable_tracing;
use prost::Message;
Expand Down Expand Up @@ -98,10 +95,12 @@ async fn respond_to_multiple_requests() {
mock_transport.clone(),
);

let requested_op_ids_1 =
serialize_request_message(vec![op_1.op_id.clone(), op_2.op_id.clone()]);
let requested_op_ids_1 = serialize_request_message(vec![
op_1.compute_op_id(),
op_2.compute_op_id(),
]);
let requested_op_ids_2 =
serialize_request_message(vec![op_3.op_id.clone(), random_op_id()]);
serialize_request_message(vec![op_3.compute_op_id(), random_op_id()]);
fetch
.message_handler
.recv_module_msg(
Expand Down Expand Up @@ -214,20 +213,8 @@ async fn fail_to_respond_once_then_succeed() {
let response =
Response::decode(K2FetchMessage::decode(data).unwrap().data)
.unwrap();
let ops = response
.ops
.into_iter()
.map(|op| {
let op_data =
serde_json::from_slice::<Kitsune2MemoryOp>(&op.data)
.unwrap();
let op_id = hash_op(&bytes::Bytes::from(op_data.payload));
MetaOp {
op_id,
op_data: op.data.into(),
}
})
.collect::<Vec<_>>();
let ops: Vec<MemoryOp> =
response.ops.into_iter().map(Into::into).collect::<Vec<_>>();
responses_sent.lock().unwrap().push((ops, peer));
Box::pin(async move { Ok(()) })
}
Expand All @@ -251,7 +238,7 @@ async fn fail_to_respond_once_then_succeed() {
);

// Handle op request.
let data = serialize_request_message(vec![op.op_id]);
let data = serialize_request_message(vec![op.compute_op_id()]);
fetch
.message_handler
.recv_module_msg(
Expand Down
Loading
Loading