Skip to content

Commit

Permalink
Allow containers to specify their own serialization (#604)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Nov 25, 2024
1 parent b411957 commit 3307e10
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 8 deletions.
73 changes: 67 additions & 6 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,78 @@ impl<T, C: Container> Message<T, C> {
impl<T, C> crate::communication::Bytesable for Message<T, C>
where
T: Serialize + for<'a> Deserialize<'a>,
C: Serialize + for<'a> Deserialize<'a>,
C: ContainerBytes,
{
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
fn from_bytes(mut bytes: crate::bytes::arc::Bytes) -> Self {
let mut slice = &bytes[..];
let from: usize = ::bincode::deserialize(&mut slice).expect("bincode::deserialize() failed");
let seq: usize = ::bincode::deserialize(&mut slice).expect("bincode::deserialize() failed");
let time: T = ::bincode::deserialize(&mut slice).expect("bincode::deserialize() failed");
let bytes_read = bytes.len() - slice.len();
bytes.extract_to(bytes_read);
let data: C = ContainerBytes::from_bytes(bytes);
Self { time, data, from, seq }
}

fn length_in_bytes(&self) -> usize {
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
::bincode::serialized_size(&self.from).expect("bincode::serialized_size() failed") as usize +
::bincode::serialized_size(&self.seq).expect("bincode::serialized_size() failed") as usize +
::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize +
self.data.length_in_bytes()
}

fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
::bincode::serialize_into(&mut *writer, &self.from).expect("bincode::serialize_into() failed");
::bincode::serialize_into(&mut *writer, &self.seq).expect("bincode::serialize_into() failed");
::bincode::serialize_into(&mut *writer, &self.time).expect("bincode::serialize_into() failed");
self.data.into_bytes(&mut *writer);
}
}
}


/// A container-oriented version of `Bytesable` that can be implemented here for `Vec<T>` and other containers.
pub trait ContainerBytes {
/// Wrap bytes as `Self`.
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self;

/// The number of bytes required to serialize the data.
fn length_in_bytes(&self) -> usize;

/// Writes the binary representation into `writer`.
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W);
}

mod implementations {

use serde::{Serialize, Deserialize};
use crate::dataflow::channels::ContainerBytes;

impl<T: Serialize + for<'a> Deserialize<'a>> ContainerBytes for Vec<T> {
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
}

fn length_in_bytes(&self) -> usize {
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
}

fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
}
}

use crate::container::flatcontainer::FlatStack;
impl<T: Serialize + for<'a> Deserialize<'a> + crate::container::flatcontainer::Region> ContainerBytes for FlatStack<T> {
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
}

fn length_in_bytes(&self) -> usize {
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
}

fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
}
}
}
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ where
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
impl<T: Timestamp, C, H: 'static> ParallelizationContract<T, C> for ExchangeCore<C, H>
where
C: ExchangeData + PushPartitioned,
C: ExchangeData + PushPartitioned + crate::dataflow::channels::ContainerBytes,
for<'a> H: FnMut(&C::Item<'a>) -> u64
{
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Message<T, C>>>>, H>;
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub trait Exchange<C: PushPartitioned> {

impl<G: Scope, C> Exchange<C> for StreamCore<G, C>
where
C: PushPartitioned + ExchangeData,
C: PushPartitioned + ExchangeData + crate::dataflow::channels::ContainerBytes,
{
fn exchange<F>(&self, route: F) -> StreamCore<G, C>
where
Expand Down

0 comments on commit 3307e10

Please sign in to comment.