Rust updates, better doc testing (#598)
Capture a bunch of changes I had sitting around for a while. Fixes a bug
in the mdbook, but otherwise no functional changes.

Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru authored Nov 11, 2024
1 parent 5323fd6 commit 229c114
Showing 47 changed files with 89 additions and 160 deletions.
16 changes: 6 additions & 10 deletions README.md
@@ -10,16 +10,14 @@ Be sure to read the [documentation for timely dataflow](https://docs.rs/timely).

To use timely dataflow, add the following to the dependencies section of your project's `Cargo.toml` file:

-```
+```toml
[dependencies]
timely="*"
```

This will bring in the [`timely` crate](https://crates.io/crates/timely) from [crates.io](http://crates.io), which should allow you to start writing timely dataflow programs like this one (also available in [timely/examples/simple.rs](https://github.com/timelydataflow/timely-dataflow/blob/master/timely/examples/simple.rs)):

```rust
-extern crate timely;
-
use timely::dataflow::operators::*;

fn main() {
@@ -32,7 +30,7 @@ fn main() {

You can run this example from the root directory of the `timely-dataflow` repository by typing

-```
+```text
% cargo run --example simple
Running `target/debug/examples/simple`
seen: 0
@@ -54,8 +52,6 @@ This is a very simple example (it's in the name), which only just suggests at ho
For a more involved example, consider the very similar (but more explicit) [examples/hello.rs](https://github.com/timelydataflow/timely-dataflow/blob/master/timely/examples/hello.rs), which creates and drives the dataflow separately:

```rust
-extern crate timely;
-
use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

@@ -96,7 +92,7 @@ We first build a dataflow graph creating an input stream (with `input_from`), wh
We then drive the computation by repeatedly introducing rounds of data, where the `round` itself is used as the data. In each round, each worker introduces the same data, and then repeatedly takes dataflow steps until the `probe` reveals that all workers have processed all work for that epoch, at which point the computation proceeds.
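The shape of that driving loop can be sketched in miniature. This is a hand-rolled illustration, not the timely API: `Probe`, `Worker`, and `drive` below are stand-ins, and one `step` is assumed to retire one outstanding epoch.

```rust
// A miniature, self-contained sketch of the driving loop described above.
// NOT the timely API: `Probe` stands in for a ProbeHandle and `step`
// pretends that one scheduling step finishes one outstanding epoch.

struct Probe { completed: u64 }

impl Probe {
    /// True while some epoch strictly before `time` is still in flight.
    fn less_than(&self, time: u64) -> bool {
        self.completed < time
    }
}

struct Worker { probe: Probe }

impl Worker {
    /// One scheduling step; here it simply retires the oldest epoch.
    fn step(&mut self) {
        self.probe.completed += 1;
    }
}

/// Introduce `rounds` epochs of data, stepping after each until the
/// probe shows that all work for that epoch has been processed.
fn drive(rounds: u64) -> u64 {
    let mut worker = Worker { probe: Probe { completed: 0 } };
    for round in 0..rounds {
        // (introduce data for `round` here, then advance the input)
        while worker.probe.less_than(round + 1) {
            worker.step();
        }
    }
    worker.probe.completed
}

fn main() {
    println!("completed epochs: {}", drive(10));
}
```

The real loop differs only in that `input.advance_to` moves the input frontier and `worker.step()` actually schedules dataflow operators.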

With two workers, the output looks like
-```
+```text
% cargo run --example hello -- -w2
Running `target/debug/examples/hello -w2`
worker 0: hello 0
@@ -120,7 +116,7 @@ The `hello.rs` program above will by default use a single worker thread. To use
To use multiple processes, you will need to use the `-h` or `--hostfile` option to specify a text file whose lines are `hostname:port` entries corresponding to the locations you plan on spawning the processes. You will need to use the `-n` or `--processes` argument to indicate how many processes you will spawn (a prefix of the host file), and each process must use the `-p` or `--process` argument to indicate their index out of this number.

Said differently, you want a hostfile that looks like so,
-```
+```text
% cat hostfile.txt
host0:port
host1:port
@@ -129,7 +125,7 @@
host3:port
...
```
and then to launch the processes like so:
-```
+```text
host0% cargo run -- -w 2 -h hostfile.txt -n 4 -p 0
host1% cargo run -- -w 2 -h hostfile.txt -n 4 -p 1
host2% cargo run -- -w 2 -h hostfile.txt -n 4 -p 2
@@ -187,7 +183,7 @@ The communication layer is based on a type `Content<T>` which can be backed by t

**NOTE**: Differential dataflow demonstrates how to do this at the user level in its `operators/arrange.rs`, if somewhat sketchily (with a wrapper that lies about the properties of the type it transports).

-This would allow us to safely pass Rc<T> types around, as long as we use the `Pipeline` parallelization contract.
+This would allow us to safely pass `Rc<T>` types around, as long as we use the `Pipeline` parallelization contract.
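As a mental model only (the actual `Content<T>` definition in timely differs), a payload that can be backed by either typed or binary data might look like:

```rust
// Illustrative only: a payload backed by either typed data or serialized
// bytes, loosely modeled on the `Content<T>` idea described above.
enum Content<T> {
    Typed(T),
    Binary(Vec<u8>), // a plain Vec<u8> stands in for a shared byte buffer
}

// Report which backing a given message currently has.
fn backing<T>(c: &Content<T>) -> &'static str {
    match c {
        Content::Typed(_) => "typed",
        Content::Binary(_) => "binary",
    }
}

fn main() {
    let typed: Content<u64> = Content::Typed(42);
    let binary: Content<u64> = Content::Binary(vec![0x2a, 0, 0, 0]);
    println!("{} / {}", backing(&typed), backing(&binary));
}
```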

## Coarse- vs fine-grained timestamps

4 changes: 2 additions & 2 deletions communication/Cargo.toml
@@ -17,10 +17,10 @@ license = "MIT"
default = ["getopts"]

[dependencies]
-getopts = { version = "0.2.14", optional = true }
+getopts = { version = "0.2.21", optional = true }
bincode = { version = "1.0" }
byteorder = "1.5"
serde = { version = "1.0", features = ["derive"] }
timely_bytes = { path = "../bytes", version = "0.12" }
timely_logging = { path = "../logging", version = "0.12" }
-crossbeam-channel = "0.5.0"
+crossbeam-channel = "0.5"
2 changes: 0 additions & 2 deletions communication/examples/comm_hello.rs
@@ -1,5 +1,3 @@
-extern crate timely_communication;
-
use std::ops::Deref;
use timely_communication::{Message, Allocate};

4 changes: 2 additions & 2 deletions communication/src/allocator/zero_copy/allocator.rs
@@ -4,7 +4,7 @@ use std::cell::RefCell;
use std::collections::{VecDeque, HashMap, hash_map::Entry};
use crossbeam_channel::{Sender, Receiver};

-use bytes::arc::Bytes;
+use timely_bytes::arc::Bytes;

use crate::networking::MessageHeader;

@@ -274,4 +274,4 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
fn await_events(&self, duration: Option<std::time::Duration>) {
self.inner.await_events(duration);
}
}
4 changes: 2 additions & 2 deletions communication/src/allocator/zero_copy/allocator_process.rs
@@ -5,7 +5,7 @@ use std::cell::RefCell;
use std::collections::{VecDeque, HashMap, hash_map::Entry};
use crossbeam_channel::{Sender, Receiver};

-use bytes::arc::Bytes;
+use timely_bytes::arc::Bytes;

use crate::networking::MessageHeader;

@@ -250,4 +250,4 @@ impl Allocate for ProcessAllocator {
}
}
}
}
3 changes: 1 addition & 2 deletions communication/src/allocator/zero_copy/bytes_exchange.rs
@@ -3,7 +3,7 @@
use std::sync::{Arc, Mutex};
use std::collections::VecDeque;

-use bytes::arc::Bytes;
+use timely_bytes::arc::Bytes;
use super::bytes_slab::BytesSlab;

/// A target for `Bytes`.
@@ -177,4 +177,3 @@ impl<P: BytesPush> Drop for SendEndpoint<P> {
self.send_buffer();
}
}

4 changes: 2 additions & 2 deletions communication/src/allocator/zero_copy/bytes_slab.rs
@@ -1,6 +1,6 @@
//! A large binary allocation for writing and sharing.
-use bytes::arc::Bytes;
+use timely_bytes::arc::Bytes;

/// A large binary allocation for writing and sharing.
///
@@ -91,4 +91,4 @@ impl BytesSlab {
}
}
}
}
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/initialize.rs
@@ -31,7 +31,7 @@ impl Drop for CommsGuard {
}

use crate::logging::{CommunicationSetup, CommunicationEvent};
-use logging_core::Logger;
+use timely_logging::Logger;

/// Initializes network connections
pub fn initialize_networking(
4 changes: 2 additions & 2 deletions communication/src/allocator/zero_copy/push_pull.rs
@@ -4,7 +4,7 @@ use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque;

-use bytes::arc::Bytes;
+use timely_bytes::arc::Bytes;

use crate::allocator::canary::Canary;
use crate::networking::MessageHeader;
@@ -139,4 +139,4 @@ impl<T:Data> Pull<Message<T>> for PullerInner<T> {
&mut self.current
}
}
}
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/tcp.rs
@@ -9,7 +9,7 @@ use super::bytes_slab::BytesSlab;
use super::bytes_exchange::MergeQueue;
use super::stream::Stream;

-use logging_core::Logger;
+use timely_logging::Logger;

use crate::logging::{CommunicationEvent, CommunicationSetup, MessageEvent, StateEvent};

2 changes: 1 addition & 1 deletion communication/src/initialize.rs
@@ -15,7 +15,7 @@ use crate::allocator::zero_copy::allocator_process::ProcessBuilder;
use crate::allocator::zero_copy::initialize::initialize_networking;

use crate::logging::{CommunicationSetup, CommunicationEvent};
-use logging_core::Logger;
+use timely_logging::Logger;
use std::fmt::{Debug, Formatter};


14 changes: 3 additions & 11 deletions communication/src/lib.rs
@@ -10,7 +10,7 @@
//!
//! To be communicated, a type must implement the [`Serialize`](serde::Serialize) trait.
//!
-//! Channel endpoints also implement a lower-level `push` and `pull` interface (through the [`Push`](Push) and [`Pull`](Pull)
+//! Channel endpoints also implement a lower-level `push` and `pull` interface (through the [`Push`] and [`Pull`]
//! traits), which is used for more precise control of resources.
//!
//! # Examples
@@ -59,7 +59,7 @@
//! else { println!("error in computation"); }
//! ```
//!
-//! The should produce output like:
+//! This should produce output like:
//!
//! ```ignore
//! worker 0 started
@@ -74,14 +74,6 @@
#![forbid(missing_docs)]

-#[cfg(feature = "getopts")]
-extern crate getopts;
-extern crate bincode;
-extern crate serde;
-
-extern crate timely_bytes as bytes;
-extern crate timely_logging as logging_core;
-
pub mod allocator;
pub mod networking;
pub mod initialize;
@@ -171,4 +163,4 @@ fn promise_futures<T>(sends: usize, recvs: usize) -> (Vec<Vec<Sender<T>>>, Vec<V
}

(senders, recvers)
}
2 changes: 1 addition & 1 deletion communication/src/message.rs
@@ -1,6 +1,6 @@
//! Types wrapping typed data.
use bytes::arc::Bytes;
use timely_bytes::arc::Bytes;
use crate::Data;

/// A wrapped message which supports serialization and deserialization.
Expand Down
18 changes: 7 additions & 11 deletions mdbook/src/chapter_0/chapter_0_0.md
@@ -3,16 +3,14 @@
Let's start with what may be the simplest non-trivial timely dataflow program.

```rust
-extern crate timely;
+# extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect};

-fn main() {
-    timely::example(|scope| {
-        (0..10).to_stream(scope)
-            .inspect(|x| println!("seen: {:?}", x));
-    });
-}
+timely::example(|scope| {
+    (0..10).to_stream(scope)
+        .inspect(|x| println!("seen: {:?}", x));
+});
```

This program gives us a bit of a flavor for what a timely dataflow program might look like, including a bit of what Rust looks like, without getting too bogged down in weird stream processing details. Not to worry; we will do that in just a moment!
@@ -39,9 +37,7 @@ If we run the program up above, we see it print out the numbers zero through nin
This isn't very different from a Rust program that would do this much more simply, namely the program

```rust
-fn main() {
-    (0..10).for_each(|x| println!("seen: {:?}", x));
-}
+(0..10).for_each(|x| println!("seen: {:?}", x));
```

Why would we want to make our life so complicated? The main reason is that we can make our program *reactive*, so that we can run it without knowing ahead of time the data we will use, and it will respond as we produce new data.
50 changes: 24 additions & 26 deletions mdbook/src/chapter_0/chapter_0_1.md
@@ -10,33 +10,31 @@ extern crate timely;
use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

-fn main() {
-    // initializes and runs a timely dataflow.
-    timely::execute_from_args(std::env::args(), |worker| {
-
-        let index = worker.index();
-        let mut input = InputHandle::new();
-
-        // create a new input, exchange data, and inspect its output
-        let probe = worker.dataflow(|scope|
-            scope.input_from(&mut input)
-                 .exchange(|x| *x)
-                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
-                 .probe()
-        );
-
-        // introduce data and watch!
-        for round in 0..10 {
-            if index == 0 {
-                input.send(round);
-            }
-            input.advance_to(round + 1);
-            while probe.less_than(input.time()) {
-                worker.step();
-            }
-        }
-    }).unwrap();
-}
+// initializes and runs a timely dataflow.
+timely::execute_from_args(std::env::args(), |worker| {
+
+    let index = worker.index();
+    let mut input = InputHandle::new();
+
+    // create a new input, exchange data, and inspect its output
+    let probe = worker.dataflow(|scope|
+        scope.input_from(&mut input)
+             .exchange(|x| *x)
+             .inspect(move |x| println!("worker {}:\thello {}", index, x))
+             .probe()
+    );
+
+    // introduce data and watch!
+    for round in 0..10 {
+        if index == 0 {
+            input.send(round);
+        }
+        input.advance_to(round + 1);
+        while probe.less_than(input.time()) {
+            worker.step();
+        }
+    }
+}).unwrap();
```

We can run this program in a variety of configurations: with just a single worker thread, with one process and multiple worker threads, and with multiple processes each with multiple worker threads.
2 changes: 1 addition & 1 deletion mdbook/src/chapter_4/chapter_4_1.md
@@ -123,4 +123,4 @@

});
}
```
8 changes: 4 additions & 4 deletions mdbook/src/chapter_4/chapter_4_4.md
@@ -114,7 +114,7 @@ We can check out the examples `examples/capture_send.rs` and `examples/capture_r

The `capture_send` example creates a new TCP connection for each worker, which it wraps and uses as an `EventPusher`. Timely dataflow takes care of all the serialization and stuff like that (warning: it uses abomonation, so this is not great for long-term storage).

-```rust,ignore
+```rust,no_run
extern crate timely;
use std::net::TcpStream;
@@ -138,7 +138,7 @@ fn main() {

The `capture_recv` example is more complicated, because we may have a different number of workers replaying the stream than initially captured it.

-```rust,ignore
+```rust,no_run
extern crate timely;
use std::net::TcpListener;
@@ -158,10 +158,10 @@ fn main() {
.collect::<Vec<_>>()
.into_iter()
.map(|l| l.incoming().next().unwrap().unwrap())
-.map(|r| EventReader::<_,u64,_>::new(r))
+.map(|r| EventReader::<_,Vec<u64>,_>::new(r))
.collect::<Vec<_>>();
-worker.dataflow::<u64,_,_>(|scope| {
+worker.dataflow::<u64,_,_>(move |scope| {
replayers
.replay_into(scope)
.inspect(|x| println!("replayed: {:?}", x));