Skip to content

Commit

Permalink
Revert "Add support for native async stream sources (#357)"
Browse files Browse the repository at this point in the history
This reverts commit 935b7e9.
  • Loading branch information
antiguru committed Feb 5, 2024
1 parent 806540c commit f93530d
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 96 deletions.
1 change: 0 additions & 1 deletion timely/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ timely_logging = { path = "../logging", version = "0.12" }
timely_communication = { path = "../communication", version = "0.12", default-features = false }
timely_container = { path = "../container", version = "0.12" }
crossbeam-channel = "0.5.0"
futures-util = "0.3"

[dev-dependencies]
# timely_sort="0.1.6"
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub use self::delay::Delay;
pub use self::exchange::Exchange;
pub use self::broadcast::Broadcast;
pub use self::probe::Probe;
pub use self::to_stream::{ToStream, ToStreamCore, ToStreamAsync, Event};
pub use self::to_stream::{ToStream, ToStreamCore};
pub use self::capture::Capture;
pub use self::branch::{Branch, BranchWhen};
pub use self::ok_err::OkErr;
Expand Down
88 changes: 2 additions & 86 deletions timely/src/dataflow/operators/to_stream.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
//! Conversion to the `Stream` type from iterators.
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::Container;

use crate::dataflow::operators::generic::operator::source;
use crate::dataflow::operators::CapabilitySet;
use crate::dataflow::{StreamCore, Scope, Stream};
use crate::progress::Timestamp;
use crate::Data;
use crate::dataflow::operators::generic::operator::source;
use crate::dataflow::{StreamCore, Stream, Scope};

/// Converts to a timely `Stream`.
pub trait ToStream<T: Timestamp, D: Data> {
Expand Down Expand Up @@ -112,82 +107,3 @@ impl<T: Timestamp, I: IntoIterator+'static> ToStreamCore<T, I::Item> for I where
})
}
}

/// Data and progress events of the native stream.
pub enum Event<F: IntoIterator, D> {
/// Indicates that timestamps have advanced to frontier F
Progress(F),
/// Indicates that event D happened at time T
Message(F::Item, D),
}

/// Converts to a timely `Stream`.
pub trait ToStreamAsync<T: Timestamp, D: Data> {
/// Converts a [native `Stream`](futures_util::stream::Stream) of [`Event`s](Event) into a [timely
/// `Stream`](crate::dataflow::Stream).
///
/// # Examples
///
/// ```
/// use futures_util::stream;
///
/// use timely::dataflow::operators::{Capture, Event, ToStream, ToStreamAsync};
/// use timely::dataflow::operators::capture::Extract;
///
/// let native_stream = stream::iter(vec![
/// Event::Message(0, 0),
/// Event::Message(0, 1),
/// Event::Message(0, 2),
/// Event::Progress(Some(0)),
/// ]);
///
/// let native_stream = Box::pin(native_stream);
///
/// let (data1, data2) = timely::example(|scope| {
/// let data1 = native_stream.to_stream(scope).capture();
/// let data2 = vec![0,1,2].to_stream(scope).capture();
///
/// (data1, data2)
/// });
///
/// assert_eq!(data1.extract(), data2.extract());
/// ```
fn to_stream<S: Scope<Timestamp = T>>(self: Pin<Box<Self>>, scope: &S) -> Stream<S, D>;
}

impl<T, D, F, I> ToStreamAsync<T, D> for I
where
D: Data,
T: Timestamp,
F: IntoIterator<Item = T>,
I: futures_util::stream::Stream<Item = Event<F, D>> + ?Sized + 'static,
{
fn to_stream<S: Scope<Timestamp = T>>(mut self: Pin<Box<Self>>, scope: &S) -> Stream<S, D> {
source(scope, "ToStreamAsync", move |capability, info| {
let activator = Arc::new(scope.sync_activator_for(&info.address[..]));

let mut cap_set = CapabilitySet::from_elem(capability);

move |output| {
let waker = futures_util::task::waker_ref(&activator);
let mut context = Context::from_waker(&waker);

// Consume all the ready items of the source_stream and issue them to the operator
while let Poll::Ready(item) = self.as_mut().poll_next(&mut context) {
match item {
Some(Event::Progress(time)) => {
cap_set.downgrade(time);
}
Some(Event::Message(time, data)) => {
output.session(&cap_set.delayed(&time)).give(data);
}
None => {
cap_set.downgrade(&[]);
break;
}
}
}
}
})
}
}
8 changes: 0 additions & 8 deletions timely/src/scheduling/activate.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
//! Parking and unparking timely fibers.
use std::rc::Rc;
use std::sync::Arc;
use std::cell::RefCell;
use std::thread::Thread;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};
use std::cmp::Reverse;
use crossbeam_channel::{Sender, Receiver};
use futures_util::task::ArcWake;

/// Methods required to act as a timely scheduler.
///
Expand Down Expand Up @@ -274,12 +272,6 @@ impl SyncActivator {
}
}

impl ArcWake for SyncActivator {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.activate().unwrap();
}
}

/// The error returned when activation fails across thread boundaries because
/// the receiving end has hung up.
#[derive(Clone, Copy, Debug)]
Expand Down

0 comments on commit f93530d

Please sign in to comment.