Skip to content

Commit

Permalink
Op store split metadata (#74)
Browse files Browse the repository at this point in the history
* Refactor op store to properly handle op data vs metadata

* Return op ids after processing

* Remove more uses of MetaOp
  • Loading branch information
ThetaSinner authored Jan 16, 2025
1 parent c0328be commit 84239d1
Show file tree
Hide file tree
Showing 14 changed files with 306 additions and 421 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 19 additions & 31 deletions crates/api/src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::{
builder, config, peer_store::DynPeerStore, transport::DynTransport,
AgentId, BoxFut, DynOpStore, K2Result, MetaOp, OpId, SpaceId,
AgentId, BoxFut, DynOpStore, K2Result, OpId, SpaceId,
};
use bytes::{Bytes, BytesMut};
use k2_fetch_message::FetchMessageType;
Expand All @@ -25,8 +25,8 @@ impl From<Request> for Vec<OpId> {
}
}

impl From<Vec<MetaOp>> for Response {
fn from(value: Vec<MetaOp>) -> Self {
impl From<Vec<Bytes>> for Response {
fn from(value: Vec<Bytes>) -> Self {
Self {
ops: value.into_iter().map(Into::into).collect(),
}
Expand Down Expand Up @@ -57,17 +57,17 @@ pub fn serialize_request_message(value: Vec<OpId>) -> Bytes {
}

/// Serialize list of ops to response.
pub fn serialize_response(value: Vec<MetaOp>) -> Bytes {
pub fn serialize_response(value: Vec<Bytes>) -> Bytes {
let mut out = BytesMut::new();
Response::from(value)
.encode(&mut out)
.expect("failed to encode response");
out.freeze()
}

/// Serialize list of ops to fetch resopnse message.
pub fn serialize_response_message(value: Vec<MetaOp>) -> Bytes {
let mut out = bytes::BytesMut::new();
/// Serialize list of ops to fetch response message.
pub fn serialize_response_message(value: Vec<Bytes>) -> Bytes {
let mut out = BytesMut::new();
let data = serialize_response(value);
let fetch_message = K2FetchMessage {
fetch_message_type: FetchMessageType::Response.into(),
Expand Down Expand Up @@ -115,7 +115,7 @@ pub type DynFetchFactory = Arc<dyn FetchFactory>;
#[cfg(test)]
mod test {
use super::*;
use crate::{id::Id, MetaOp};
use crate::id::Id;
use prost::Message;

#[test]
Expand All @@ -135,24 +135,18 @@ mod test {

#[test]
fn happy_response_encode_decode() {
let op_1 = MetaOp {
op_id: OpId::from(bytes::Bytes::from_static(b"some_op_id")),
op_data: vec![0],
};
let op_2 = MetaOp {
op_id: OpId::from(bytes::Bytes::from_static(b"another_op_id")),
op_data: vec![1],
};
let op_vec = vec![op_1, op_2];
let ops_enc = serialize_response(op_vec.clone());
let expected_ops_data =
op_vec.into_iter().map(|op| op.op_data).collect::<Vec<_>>();
// These are not real op payloads; any bytes will do for checking the round-trip
// encoding/decoding of the response type.
let op_1 = bytes::Bytes::from(vec![0]);
let op_2 = bytes::Bytes::from(vec![1]);
let expected_ops_data = vec![op_1, op_2];
let ops_enc = serialize_response(expected_ops_data.clone());

let response = Response::decode(ops_enc).unwrap();
let actual_ops_data = response
.ops
.into_iter()
.map(|op| op.data.to_vec())
.map(|op| op.data)
.collect::<Vec<_>>();
assert_eq!(expected_ops_data, actual_ops_data);
}
Expand All @@ -179,16 +173,10 @@ mod test {

#[test]
fn happy_fetch_response_encode_decode() {
let op = MetaOp {
op_id: OpId::from(bytes::Bytes::from_static(b"some_op_id")),
op_data: vec![0],
};
let ops = vec![op];
let fetch_response = serialize_response_message(ops.clone());
let expected_ops_data = ops
.into_iter()
.map(|op| Bytes::from(op.op_data))
.collect::<Vec<_>>();
let op = bytes::Bytes::from(vec![0]);
let expected_ops_data = vec![op];
let fetch_response =
serialize_response_message(expected_ops_data.clone());

let fetch_message_dec = K2FetchMessage::decode(fetch_response).unwrap();
assert_eq!(
Expand Down
33 changes: 9 additions & 24 deletions crates/api/src/op_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use crate::{
builder, config, BoxFut, DhtArc, K2Result, OpId, SpaceId, Timestamp,
};
use bytes::Bytes;
use futures::future::BoxFuture;
#[cfg(feature = "mockall")]
use mockall::automock;
Expand All @@ -19,22 +18,14 @@ pub struct MetaOp {
pub op_id: OpId,

/// The actual op data.
pub op_data: Vec<u8>,
pub op_data: bytes::Bytes,
}

include!("../proto/gen/kitsune2.op_store.rs");

impl From<MetaOp> for Op {
fn from(value: MetaOp) -> Self {
Self {
data: value.op_data.into(),
}
}
}

impl From<MetaOp> for Bytes {
fn from(value: MetaOp) -> Self {
value.op_data.into()
impl From<bytes::Bytes> for Op {
fn from(value: bytes::Bytes) -> Self {
Self { data: value }
}
}

Expand All @@ -54,12 +45,12 @@ pub struct StoredOp {
///
/// Note that this means any op implementation must include a consistent timestamp in the op
/// data so that it can be provided back to Kitsune.
pub timestamp: Timestamp,
pub created_at: Timestamp,
}

impl Ord for StoredOp {
fn cmp(&self, other: &Self) -> Ordering {
(&self.timestamp, &self.op_id).cmp(&(&other.timestamp, &other.op_id))
(&self.created_at, &self.op_id).cmp(&(&other.created_at, &other.op_id))
}
}

Expand All @@ -78,8 +69,8 @@ pub trait OpStore: 'static + Send + Sync + std::fmt::Debug {
/// if it is able to process them.
fn process_incoming_ops(
&self,
op_list: Vec<Bytes>,
) -> BoxFuture<'_, K2Result<()>>;
op_list: Vec<bytes::Bytes>,
) -> BoxFuture<'_, K2Result<Vec<OpId>>>;

/// Retrieve a batch of ops from the host by time range.
///
Expand Down Expand Up @@ -151,18 +142,12 @@ pub type DynOpStoreFactory = Arc<dyn OpStoreFactory>;

#[cfg(test)]
mod test {
use crate::MetaOp;

use super::*;
use prost::Message;

#[test]
fn happy_meta_op_encode_decode() {
let meta_op = MetaOp {
op_id: OpId::from(bytes::Bytes::from_static(b"some_op_id")),
op_data: vec![1; 128],
};
let op = Op::from(meta_op);
let op = Op::from(bytes::Bytes::from(vec![1; 128]));
let op_enc = op.encode_to_vec();
let op_dec = Op::decode(op_enc.as_slice()).unwrap();

Expand Down
1 change: 0 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,4 @@ ed25519-dalek = { workspace = true, features = ["rand_core"] }
kitsune2_api = { workspace = true, features = ["mockall"] }
kitsune2_test_utils = { workspace = true }
rand = { workspace = true }
sha2 = { workspace = true }
tokio = { workspace = true, features = ["full"] }
4 changes: 3 additions & 1 deletion crates/core/src/factories/core_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,9 @@ impl CoreFetch {
tracing::error!("could not read ops from store: {err}");
continue;
}
Ok(ops) => ops,
Ok(ops) => {
ops.into_iter().map(|op| op.op_data).collect::<Vec<_>>()
}
};

if ops.is_empty() {
Expand Down
9 changes: 3 additions & 6 deletions crates/core/src/factories/core_fetch/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,9 @@ mod test {
let peer = Url::from_str("wss://127.0.0.1:1").unwrap();

let op = make_op(vec![0]);
let received_ops = vec![op];
let request_message = serialize_response_message(received_ops.clone());
let expected_ops_data = received_ops
.into_iter()
.map(|op| Bytes::from(op.op_data))
.collect::<Vec<_>>();
let expected_ops_data = vec![op.into()];
let request_message =
serialize_response_message(expected_ops_data.clone());

let task_handle = tokio::task::spawn(async move {
let ops = incoming_response_rx
Expand Down
26 changes: 4 additions & 22 deletions crates/core/src/factories/core_fetch/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ mod outgoing_request_queue;

#[cfg(test)]
pub(crate) mod utils {
use crate::factories::Kitsune2MemoryOp;
use crate::factories::MemoryOp;
use bytes::Bytes;
use kitsune2_api::{id::Id, AgentId, MetaOp, OpId, Timestamp};
use kitsune2_api::{id::Id, AgentId, OpId, Timestamp};
use rand::Rng;

pub fn random_id() -> Id {
Expand Down Expand Up @@ -33,25 +33,7 @@ pub(crate) mod utils {
ops
}

pub fn hash_op(input: &bytes::Bytes) -> OpId {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(input);
let result = hasher.finalize();
let hash_bytes = bytes::Bytes::from(result.to_vec());
hash_bytes.into()
}

pub fn make_op(data: Vec<u8>) -> MetaOp {
let op_id = hash_op(&data.clone().into());
MetaOp {
op_id: op_id.clone(),
op_data: serde_json::to_vec(&Kitsune2MemoryOp::new(
op_id,
Timestamp::now(),
data,
))
.unwrap(),
}
pub fn make_op(data: Vec<u8>) -> MemoryOp {
MemoryOp::new(Timestamp::now(), data)
}
}
35 changes: 11 additions & 24 deletions crates/core/src/factories/core_fetch/test/incoming_request_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ use super::utils::random_op_id;
use crate::{
default_test_builder,
factories::{
core_fetch::{
test::utils::{hash_op, make_op},
CoreFetch, CoreFetchConfig,
},
Kitsune2MemoryOp, MemOpStoreFactory,
core_fetch::{test::utils::make_op, CoreFetch, CoreFetchConfig},
MemOpStoreFactory, MemoryOp,
},
};
use bytes::Bytes;
Expand All @@ -17,7 +14,7 @@ use kitsune2_api::{
},
id::Id,
transport::MockTransport,
K2Error, MetaOp, SpaceId, Url,
K2Error, SpaceId, Url,
};
use kitsune2_test_utils::enable_tracing;
use prost::Message;
Expand Down Expand Up @@ -98,10 +95,12 @@ async fn respond_to_multiple_requests() {
mock_transport.clone(),
);

let requested_op_ids_1 =
serialize_request_message(vec![op_1.op_id.clone(), op_2.op_id.clone()]);
let requested_op_ids_1 = serialize_request_message(vec![
op_1.compute_op_id(),
op_2.compute_op_id(),
]);
let requested_op_ids_2 =
serialize_request_message(vec![op_3.op_id.clone(), random_op_id()]);
serialize_request_message(vec![op_3.compute_op_id(), random_op_id()]);
fetch
.message_handler
.recv_module_msg(
Expand Down Expand Up @@ -214,20 +213,8 @@ async fn fail_to_respond_once_then_succeed() {
let response =
Response::decode(K2FetchMessage::decode(data).unwrap().data)
.unwrap();
let ops = response
.ops
.into_iter()
.map(|op| {
let op_data =
serde_json::from_slice::<Kitsune2MemoryOp>(&op.data)
.unwrap();
let op_id = hash_op(&bytes::Bytes::from(op_data.payload));
MetaOp {
op_id,
op_data: op.data.into(),
}
})
.collect::<Vec<_>>();
let ops: Vec<MemoryOp> =
response.ops.into_iter().map(Into::into).collect::<Vec<_>>();
responses_sent.lock().unwrap().push((ops, peer));
Box::pin(async move { Ok(()) })
}
Expand All @@ -251,7 +238,7 @@ async fn fail_to_respond_once_then_succeed() {
);

// Handle op request.
let data = serialize_request_message(vec![op.op_id]);
let data = serialize_request_message(vec![op.compute_op_id()]);
fetch
.message_handler
.recv_module_msg(
Expand Down
Loading

0 comments on commit 84239d1

Please sign in to comment.