Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align Bytesable messages to u64 #614

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ impl<T, C: Container> Message<T, C> {
}

// Instructions for serialization of `Message`.
// Intended to swap out the constraint on `C` for `C: Bytesable`.
//
// Serialization of each field is meant to be `u64` aligned, so that each has tha ability
// to be decoded using safe transmutation, e.g. `bytemuck`.
impl<T, C> crate::communication::Bytesable for Message<T, C>
where
T: Serialize + for<'a> Deserialize<'a>,
Expand All @@ -69,24 +71,29 @@ where
let from: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
let seq: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
let time: T = ::bincode::deserialize_from(&mut slice).expect("bincode::deserialize() failed");
let bytes_read = bytes.len() - slice.len();
let time_size = ::bincode::serialized_size(&time).expect("bincode::serialized_size() failed") as usize;
// We expect to find the `data` payload at `8 + 8 + round_up(time_size)`;
let bytes_read = 8 + 8 + ((time_size + 7) & !7);
// let bytes_read = bytes.len() - slice.len();
bytes.extract_to(bytes_read);
let data: C = ContainerBytes::from_bytes(bytes);
Self { time, data, from, seq }
}

fn length_in_bytes(&self) -> usize {
let time_size = ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize;
// 16 comes from the two `u64` fields: `from` and `seq`.
16 +
::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize +
self.data.length_in_bytes()
16 + ((time_size + 7) & !7) + self.data.length_in_bytes()
}

fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
use byteorder::WriteBytesExt;
writer.write_u64::<byteorder::LittleEndian>(self.from.try_into().unwrap()).unwrap();
writer.write_u64::<byteorder::LittleEndian>(self.seq.try_into().unwrap()).unwrap();
::bincode::serialize_into(&mut *writer, &self.time).expect("bincode::serialize_into() failed");
let time_size = ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize;
let time_slop = ((time_size + 7) & !7) - time_size;
writer.write(&[0u8; 8][..time_slop]).unwrap();
self.data.into_bytes(&mut *writer);
}
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub trait Partition<G: Scope, C: Container> {
/// Produces `parts` output streams, containing records produced and assigned by `route`.
///
/// # Examples
/// ```
/// ```ignore
/// use timely::dataflow::operators::ToStream;
/// use timely::dataflow::operators::core::{Partition, Inspect};
///
Expand Down
17 changes: 12 additions & 5 deletions timely/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ impl<T: Clone+'static> Data for T { }
///
/// The `ExchangeData` trait extends `Data` with any requirements imposed by the `timely_communication`
/// `Data` trait, which describes requirements for communication along channels.
pub trait ExchangeData: Data + encoding::Data { }
impl<T: Data + encoding::Data> ExchangeData for T { }
pub trait ExchangeData: Data + encoding::Data + columnar::Columnar { }
impl<T: Data + encoding::Data + columnar::Columnar> ExchangeData for T { }
Comment on lines +111 to +112
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This .. worked out ok but I realized is from a prior attempt that wanted to columnarize things. Unnecessary for this PR.


#[doc = include_str!("../../README.md")]
#[cfg(doctest)]
Expand Down Expand Up @@ -141,18 +141,25 @@ mod encoding {
}
}

// We will pad out anything we write to make the result `u64` aligned.
impl<T: Data> Bytesable for Bincode<T> {
fn from_bytes(bytes: Bytes) -> Self {
let typed = ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed");
let typed_size = ::bincode::serialized_size(&typed).expect("bincode::serialized_size() failed") as usize;
assert_eq!(bytes.len(), (typed_size + 7) & !7);
Bincode { payload: typed }
}

fn length_in_bytes(&self) -> usize {
::bincode::serialized_size(&self.payload).expect("bincode::serialized_size() failed") as usize
let typed_size = ::bincode::serialized_size(&self.payload).expect("bincode::serialized_size() failed") as usize;
(typed_size + 7) & !7
}

fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
::bincode::serialize_into(writer, &self.payload).expect("bincode::serialize_into() failed");
fn into_bytes<W: ::std::io::Write>(&self, mut writer: &mut W) {
let typed_size = ::bincode::serialized_size(&self.payload).expect("bincode::serialized_size() failed") as usize;
let typed_slop = ((typed_size + 7) & !7) - typed_size;
::bincode::serialize_into(&mut writer, &self.payload).expect("bincode::serialize_into() failed");
writer.write(&[0u8; 8][..typed_slop]).unwrap();
}
}

Expand Down
Loading