Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report when shared memory region is mapped to allow faster cleanup #673

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 3 additions & 50 deletions apis/rust/node/src/event_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use dora_message::{
daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
daemon_to_node::{DaemonCommunication, DaemonReply},
id::DataId,
node_to_daemon::{DaemonRequest, Timestamped},
DataflowId,
Expand All @@ -18,10 +18,7 @@ use futures::{
use futures_timer::Delay;
use scheduler::{Scheduler, NON_INPUT_EVENT};

use self::{
event::SharedMemoryData,
thread::{EventItem, EventStreamThreadHandle},
};
use self::thread::{EventItem, EventStreamThreadHandle};
use crate::daemon_connection::DaemonChannel;
use dora_core::{
config::{Input, NodeId},
Expand Down Expand Up @@ -198,51 +195,7 @@ impl EventStream {

fn convert_event_item(item: EventItem) -> Event {
match item {
EventItem::NodeEvent { event, ack_channel } => match event {
NodeEvent::Stop => Event::Stop,
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let data = match data {
None => Ok(None),
Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
Some(DataMessage::SharedMemory {
shared_memory_id,
len,
drop_token: _, // handled in `event_stream_loop`
}) => unsafe {
MappedInputData::map(&shared_memory_id, len).map(|data| {
Some(RawData::SharedMemory(SharedMemoryData {
data,
_drop: ack_channel,
}))
})
},
};
let data = data.and_then(|data| {
let raw_data = data.unwrap_or(RawData::Empty);
raw_data
.into_arrow_array(&metadata.type_info)
.map(arrow::array::make_array)
});
match data {
Ok(data) => Event::Input {
id,
metadata,
data: data.into(),
},
Err(err) => Event::Error(format!("{err:?}")),
}
}
NodeEvent::AllInputsClosed => {
let err = eyre!(
"received `AllInputsClosed` event, which should be handled by background task"
);
tracing::error!("{err:?}");
Event::Error(err.wrap_err("internal error").to_string())
}
},

EventItem::NodeEvent { event } => event,
EventItem::FatalError(err) => {
Event::Error(format!("fatal event stream error: {err:?}"))
}
Expand Down
10 changes: 2 additions & 8 deletions apis/rust/node/src/event_stream/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::collections::{HashMap, VecDeque};

use dora_message::{daemon_to_node::NodeEvent, id::DataId};

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / cross-check (ubuntu-20.04, x86_64-unknown-linux-gnu)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / cross-check (ubuntu-20.04, armv7-unknown-linux-musleabihf)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / cross-check (ubuntu-20.04, aarch64-unknown-linux-musl)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / cross-check (ubuntu-20.04, i686-unknown-linux-gnu)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / cross-check (ubuntu-20.04, aarch64-unknown-linux-gnu)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / cross-check (macos-13, x86_64-apple-darwin)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / cross-check (macos-13, aarch64-apple-darwin)

unused import: `daemon_to_node::NodeEvent`

use super::thread::EventItem;
use super::{thread::EventItem, Event};
pub const NON_INPUT_EVENT: &str = "dora/non_input_event";

// This scheduler will make sure that there is fairness between
Expand All @@ -25,7 +25,7 @@
impl Scheduler {
pub fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self {
let topic = VecDeque::from_iter(
event_queues

Check warning on line 28 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Clippy

useless conversion to the same type: `std::collections::hash_map::Keys<'_, dora_core::config::DataId, (usize, std::collections::VecDeque<event_stream::thread::EventItem>)>`
.keys()
.into_iter()
.filter(|t| **t != DataId::from(NON_INPUT_EVENT.to_string()))
Expand All @@ -40,13 +40,7 @@
pub fn add_event(&mut self, event: EventItem) {
let event_id = match &event {
EventItem::NodeEvent {
event:
NodeEvent::Input {
id,
metadata: _,
data: _,
},
ack_channel: _,
event: Event::Input { id, .. },
} => id,
_ => &DataId::from(NON_INPUT_EVENT.to_string()),
};
Expand Down
85 changes: 61 additions & 24 deletions apis/rust/node/src/event_stream/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use dora_core::{
uhlc::{self, Timestamp},
};
use dora_message::{
common::{DataMessage, DropTokenState, DropTokenStatus},
daemon_to_node::{DaemonReply, NodeEvent},
node_to_daemon::{DaemonRequest, DropToken, Timestamped},
};
Expand All @@ -15,6 +16,8 @@ use std::{

use crate::daemon_connection::DaemonChannel;

use super::{event::SharedMemoryData, Event, MappedInputData, RawData};

pub fn init(
node_id: NodeId,
tx: flume::Sender<EventItem>,
Expand All @@ -28,10 +31,7 @@ pub fn init(

#[derive(Debug)]
pub enum EventItem {
NodeEvent {
event: NodeEvent,
ack_channel: flume::Sender<()>,
},
NodeEvent { event: super::Event },
FatalError(eyre::Report),
TimeoutError(eyre::Report),
}
Expand Down Expand Up @@ -130,25 +130,60 @@ fn event_stream_loop(
if let Err(err) = clock.update_with_timestamp(&timestamp) {
tracing::warn!("failed to update HLC: {err}");
}
let drop_token = match &inner {
NodeEvent::Input {
data: Some(data), ..
} => data.drop_token(),

let event = match inner {
NodeEvent::Stop => Event::Stop,
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let data = match data {
None => Ok(None),
Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
Some(DataMessage::SharedMemory {
shared_memory_id,
len,
drop_token,
}) => unsafe {
let (drop_tx, drop_rx) = flume::bounded(0);
let data = MappedInputData::map(&shared_memory_id, len).map(|data| {
Some(RawData::SharedMemory(SharedMemoryData {
data,
_drop: drop_tx,
}))
});
drop_tokens.push(DropTokenStatus {
token: drop_token,
state: DropTokenState::Mapped,
});
pending_drop_tokens.push((drop_token, drop_rx, Instant::now(), 1));
data
},
};
let data = data.and_then(|data| {
let raw_data = data.unwrap_or(RawData::Empty);
raw_data
.into_arrow_array(&metadata.type_info)
.map(arrow::array::make_array)
});
match data {
Ok(data) => Event::Input {
id,
metadata,
data: data.into(),
},
Err(err) => Event::Error(format!("{err:?}")),
}
}
NodeEvent::AllInputsClosed => {
// close the event stream
tx = None;
// skip this internal event
continue;
}
_ => None,
};

if let Some(tx) = tx.as_ref() {
let (drop_tx, drop_rx) = flume::bounded(0);
match tx.send(EventItem::NodeEvent {
event: inner,
ack_channel: drop_tx,
}) {
match tx.send(EventItem::NodeEvent { event }) {
Ok(()) => {}
Err(send_error) => {
let event = send_error.into_inner();
Expand All @@ -159,12 +194,8 @@ fn event_stream_loop(
break 'outer Ok(());
}
}

if let Some(token) = drop_token {
pending_drop_tokens.push((token, drop_rx, Instant::now(), 1));
}
} else {
tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`");
tracing::warn!("dropping event because event `tx` was already closed: `{event:?}`");
}
}
};
Expand Down Expand Up @@ -196,15 +227,18 @@ fn event_stream_loop(

fn handle_pending_drop_tokens(
pending_drop_tokens: &mut Vec<(DropToken, flume::Receiver<()>, Instant, u64)>,
drop_tokens: &mut Vec<DropToken>,
drop_tokens: &mut Vec<DropTokenStatus>,
) -> eyre::Result<()> {
let mut still_pending = Vec::new();
for (token, rx, since, warn) in pending_drop_tokens.drain(..) {
match rx.try_recv() {
Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")),
Err(flume::TryRecvError::Disconnected) => {
// the event was dropped -> add the drop token to the list
drop_tokens.push(token);
drop_tokens.push(DropTokenStatus {
token,
state: DropTokenState::Dropped,
});
}
Err(flume::TryRecvError::Empty) => {
let duration = Duration::from_secs(30 * warn);
Expand All @@ -221,7 +255,7 @@ fn handle_pending_drop_tokens(

fn report_remaining_drop_tokens(
mut channel: DaemonChannel,
mut drop_tokens: Vec<DropToken>,
mut drop_tokens: Vec<DropTokenStatus>,
mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)>,
timestamp: Timestamp,
) -> eyre::Result<()> {
Expand All @@ -234,7 +268,10 @@ fn report_remaining_drop_tokens(
Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")),
Err(flume::RecvTimeoutError::Disconnected) => {
// the event was dropped -> add the drop token to the list
drop_tokens.push(token);
drop_tokens.push(DropTokenStatus {
token,
state: DropTokenState::Dropped,
});
}
Err(flume::RecvTimeoutError::Timeout) => {
let duration = Duration::from_secs(1);
Expand All @@ -259,7 +296,7 @@ fn report_remaining_drop_tokens(
}

fn report_drop_tokens(
drop_tokens: &mut Vec<DropToken>,
drop_tokens: &mut Vec<DropTokenStatus>,
channel: &mut DaemonChannel,
timestamp: Timestamp,
) -> Result<(), eyre::ErrReport> {
Expand Down
35 changes: 21 additions & 14 deletions apis/rust/node/src/node/drop_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ use std::{sync::Arc, time::Duration};
use crate::daemon_connection::DaemonChannel;
use dora_core::{config::NodeId, uhlc};
use dora_message::{
common::{DropTokenState, DropTokenStatus},
daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent},
node_to_daemon::{DaemonRequest, DropToken, Timestamped},
node_to_daemon::{DaemonRequest, Timestamped},
DataflowId,
};
use eyre::{eyre, Context};
use flume::RecvTimeoutError;

pub struct DropStream {
receiver: flume::Receiver<DropToken>,
receiver: flume::Receiver<DropTokenStatus>,
_thread_handle: DropStreamThreadHandle,
}

Expand Down Expand Up @@ -82,7 +83,7 @@ impl DropStream {
}

impl std::ops::Deref for DropStream {
type Target = flume::Receiver<DropToken>;
type Target = flume::Receiver<DropTokenStatus>;

fn deref(&self) -> &Self::Target {
&self.receiver
Expand All @@ -92,7 +93,7 @@ impl std::ops::Deref for DropStream {
#[tracing::instrument(skip(tx, channel, clock))]
fn drop_stream_loop(
node_id: NodeId,
tx: flume::Sender<DropToken>,
tx: flume::Sender<DropTokenStatus>,
mut channel: DaemonChannel,
clock: Arc<uhlc::HLC>,
) {
Expand Down Expand Up @@ -125,16 +126,22 @@ fn drop_stream_loop(
if let Err(err) = clock.update_with_timestamp(&timestamp) {
tracing::warn!("failed to update HLC: {err}");
}
match inner {
NodeDropEvent::OutputDropped { drop_token } => {
if tx.send(drop_token).is_err() {
tracing::warn!(
"drop channel was closed already, could not forward \
drop token`{drop_token:?}`"
);
break 'outer;
}
}
let event = match inner {
NodeDropEvent::OutputMapped { drop_token } => DropTokenStatus {
token: drop_token,
state: DropTokenState::Mapped,
},
NodeDropEvent::OutputDropped { drop_token } => DropTokenStatus {
token: drop_token,
state: DropTokenState::Dropped,
},
};
if tx.send(event).is_err() {
tracing::warn!(
"drop channel was closed already, could not forward \
drop token event `{event:?}`"
);
break 'outer;
}
}
}
Expand Down
Loading
Loading