Skip to content

Commit

Permalink
refactor: move message handler to separate file & test it
Browse files Browse the repository at this point in the history
  • Loading branch information
jost-s committed Jan 13, 2025
1 parent 8c932c1 commit 25780f7
Show file tree
Hide file tree
Showing 10 changed files with 441 additions and 417 deletions.
40 changes: 14 additions & 26 deletions crates/api/proto/fetch.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,21 @@ message Response {
repeated kitsune2.op_store.Op ops = 1;
}

// Fetch message.
// message K2FetchProto {
// // Fetch message type.
// enum FetchMessageType {
// // A fetch request.
// REQUEST = 0;
// // A fetch response.
// RESPONSE = 1;
// }
//
// // Message type.
// FetchMessageType fetch_message_type = 1;
//
// // Request with op ids.
// optional Request request = 2;
//
// // Response with ops.
// optional Response response = 3;
// }

// Fetch message.
message K2FetchProto {
// Fetch message type.
oneof fetch_message {
// Request with op ids.
Request request = 1;
// Response with ops.
Response response = 2;
enum FetchMessageType {
// Default value.
UNSPECIFIED = 0;
// A fetch request.
REQUEST = 1;
// A fetch response.
RESPONSE = 2;
}
}

// Message type.
FetchMessageType fetch_message_type = 1;

// Message.
bytes data = 2;
}
58 changes: 47 additions & 11 deletions crates/api/proto/gen/kitsune2.fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,56 @@ pub struct Response {
/// Fetch message.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct K2FetchProto {
/// Fetch message type.
#[prost(oneof = "k2_fetch_proto::FetchMessage", tags = "1, 2")]
pub fetch_message: ::core::option::Option<k2_fetch_proto::FetchMessage>,
/// Message type.
#[prost(enumeration = "k2_fetch_proto::FetchMessageType", tag = "1")]
pub fetch_message_type: i32,
/// Message.
#[prost(bytes = "bytes", tag = "2")]
pub data: ::prost::bytes::Bytes,
}
/// Nested message and enum types in `K2FetchProto`.
pub mod k2_fetch_proto {
/// Fetch message type.
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum FetchMessage {
/// Request with op ids.
#[prost(message, tag = "1")]
Request(super::Request),
/// Response with ops.
#[prost(message, tag = "2")]
Response(super::Response),
#[derive(
Clone,
Copy,
Debug,
PartialEq,
Eq,
Hash,
PartialOrd,
Ord,
::prost::Enumeration
)]
#[repr(i32)]
pub enum FetchMessageType {
/// Default value.
Unspecified = 0,
/// A fetch request.
Request = 1,
/// A fetch response.
Response = 2,
}
impl FetchMessageType {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Self::Unspecified => "UNSPECIFIED",
Self::Request => "REQUEST",
Self::Response => "RESPONSE",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"UNSPECIFIED" => Some(Self::Unspecified),
"REQUEST" => Some(Self::Request),
"RESPONSE" => Some(Self::Response),
_ => None,
}
}
}
}
83 changes: 63 additions & 20 deletions crates/api/src/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! Kitsune2 fetch types.
use std::sync::Arc;

use k2_fetch_proto::FetchMessage;
use bytes::BytesMut;
use k2_fetch_proto::FetchMessageType;
use prost::Message;

use crate::{
builder, config, peer_store::DynPeerStore, transport::DynTransport,
AgentId, BoxFut, DynOpStore, K2Result, MetaOp, OpId, SpaceId,
};
use std::sync::Arc;

include!("../proto/gen/kitsune2.fetch.rs");

Expand Down Expand Up @@ -36,18 +36,29 @@ impl From<Vec<MetaOp>> for Response {

/// Serialize list of op ids for sending over the wire.
pub fn serialize_request(value: Vec<OpId>) -> bytes::Bytes {
let mut out = bytes::BytesMut::new();
let fetch_message = FetchMessage::Request(Request::from(value));
fetch_message.encode(&mut out);
let mut data = BytesMut::new();
Request::from(value).encode(&mut data).unwrap();
let data = data.freeze();
let fetch_message = K2FetchProto {
fetch_message_type: FetchMessageType::Request.into(),
data,
};
let mut out = BytesMut::new();
fetch_message.encode(&mut out).unwrap();
out.freeze()
}

/// Serialize list of ops for sending over the wire.
pub fn serialize_response(value: Vec<MetaOp>) -> bytes::Bytes {
let mut data = BytesMut::new();
Response::from(value).encode(&mut data).unwrap();
let data = data.freeze();
let fetch_message = K2FetchProto {
fetch_message_type: FetchMessageType::Response.into(),
data,
};
let mut out = bytes::BytesMut::new();
Response::from(value)
.encode(&mut out)
.expect("failed to serialize ops");
fetch_message.encode(&mut out).unwrap();
out.freeze()
}

Expand Down Expand Up @@ -89,7 +100,8 @@ mod test {
use crate::{id::Id, MetaOp};

use super::*;
use k2_fetch_proto::FetchMessage;
use bytes::BytesMut;
use k2_fetch_proto::FetchMessageType;
use prost::Message;

#[test]
Expand All @@ -107,6 +119,23 @@ mod test {
assert_eq!(op_id_vec, op_ids_dec_vec);
}

#[test]
fn unhappy_decode() {
let op_1 = MetaOp {
op_id: OpId::from(bytes::Bytes::from_static(b"some_op_id")),
op_data: vec![0],
};
println!("op id {}", op_1.op_id);
let ops_enc = Response::from(vec![op_1]).encode_to_vec();
let ops_dec = Request::decode(ops_enc.as_slice()).unwrap();
let op_ids_dec_vec = ops_dec
.op_ids
.into_iter()
.map(Into::into)
.collect::<Vec<OpId>>();
println!("{op_ids_dec_vec:?}");
}

#[test]
fn happy_response_encode_decode() {
let op_1 = MetaOp {
Expand All @@ -129,16 +158,23 @@ mod test {
#[test]
fn happy_fetch_encode_decode() {
let op_id = OpId(Id(bytes::Bytes::from_static(b"id_1")));
let mut data = BytesMut::new();
Request::from(vec![op_id]).encode(&mut data).unwrap();
let data = data.freeze();
let fetch_request = K2FetchProto {
fetch_message_type: FetchMessageType::Request.into(),
data,
};
// let fetch_request = K2FetchProto {
// fetch_message_type: FetchMessageType::Request.into(),
// request: Some(Request::from(vec![op_id])),
// ..Default::default()
// };
let fetch_request = K2FetchProto {
fetch_message: Some(FetchMessage::Request(Request::from(vec![
op_id,
]))),
};
// let fetch_request = K2FetchProto {
// fetch_message: Some(FetchMessage::Request(Request::from(vec![
// op_id,
// ]))),
// };

let request_enc = fetch_request.encode_to_vec();

Expand All @@ -149,16 +185,23 @@ mod test {
op_id: OpId::from(bytes::Bytes::from_static(b"some_op_id")),
op_data: vec![0],
};
let mut data = BytesMut::new();
Response::from(vec![op]).encode(&mut data).unwrap();
let data = data.freeze();
let fetch_response = K2FetchProto {
fetch_message_type: FetchMessageType::Response.into(),
data,
};
// let fetch_response = K2FetchProto {
// fetch_message_type: FetchMessageType::Response.into(),
// response: Some(Response::from(vec![op])),
// ..Default::default()
// };
let fetch_response = K2FetchProto {
fetch_message: Some(FetchMessage::Response(Response::from(vec![
op,
]))),
};
// let fetch_response = K2FetchProto {
// fetch_message: Some(FetchMessage::Response(Response::from(vec![
// op,
// ]))),
// };

let response_enc = fetch_response.encode_to_vec();

Expand Down
Loading

0 comments on commit 25780f7

Please sign in to comment.