Skip to content

Commit

Permalink
Merge pull request #557 from dora-rs/convert-pyevent-into-dictionary
Browse files Browse the repository at this point in the history
Transform custom PyEvent into standard python dictionary for easier d…
  • Loading branch information
haixuanTao authored Jun 17, 2024
2 parents 62d1343 + a8d4c07 commit 42255f5
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 92 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ jobs:
dora start dataflow.yml --name ci-python-test
sleep 10
dora stop --name ci-python-test --grace-duration 5s
pip install opencv-python
pip install "numpy<2.0.0" opencv-python
dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic
python ../examples/python-dataflow/plot_dynamic.py
sleep 5
Expand Down
1 change: 0 additions & 1 deletion apis/python/node/dora/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

from .dora import (
Node,
PyEvent,
Ros2Context,
Ros2Node,
Ros2NodeOptions,
Expand Down
13 changes: 2 additions & 11 deletions apis/python/node/dora/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ from dora import Node
node = Node()
```"""

def __init__(self) -> None:
def __init__(self, node_id: str=None) -> None:
"""The custom node API lets you integrate `dora` into your application.
It allows you to retrieve input and send output in any fashion you want.
Expand All @@ -46,7 +46,7 @@ This method returns the parsed dataflow YAML file."""
"""Merge an external event stream with dora main loop.
This currently only work with ROS2."""

def next(self, timeout: float=None) -> dora.PyEvent:
def next(self, timeout: float=None) -> dict:
"""`.next()` gives you the next input that the node has received.
It blocks until the next event becomes available.
You can use timeout in seconds to return if no input is available.
Expand Down Expand Up @@ -88,15 +88,6 @@ node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})
def __next__(self) -> typing.Any:
"""Implement next(self)."""

@typing.final
class PyEvent:
"""Dora Event"""

def inner(self):...

def __getitem__(self, key: typing.Any) -> typing.Any:
"""Return self[key]."""

@typing.final
class Ros2Context:
"""ROS2 Context holding all messages definition for receiving and sending messages to ROS2.
Expand Down
26 changes: 17 additions & 9 deletions apis/python/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use pyo3::types::{PyBytes, PyDict};
/// node = Node()
/// ```
///
/// :type node_id: str, optional
#[pyclass]
pub struct Node {
events: Events,
Expand Down Expand Up @@ -67,11 +68,18 @@ impl Node {
/// ```
///
/// :type timeout: float, optional
/// :rtype: dora.PyEvent
/// :rtype: dict
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self, py: Python, timeout: Option<f32>) -> PyResult<Option<PyEvent>> {
pub fn next(&mut self, py: Python, timeout: Option<f32>) -> PyResult<Option<Py<PyDict>>> {
let event = py.allow_threads(|| self.events.recv(timeout.map(Duration::from_secs_f32)));
Ok(event)
if let Some(event) = event {
let dict = event
.to_py_dict(py)
.context("Could not convert event into a dict")?;
Ok(Some(dict))
} else {
Ok(None)
}
}

/// You can iterate over the event stream with a loop
Expand All @@ -84,10 +92,11 @@ impl Node {
/// case "image":
/// ```
///
/// :rtype: dora.PyEvent
pub fn __next__(&mut self, py: Python) -> PyResult<Option<PyEvent>> {
let event = py.allow_threads(|| self.events.recv(None));
Ok(event)
/// Default behaviour is to timeout after 2 seconds.
///
/// :rtype: dict
pub fn __next__(&mut self, py: Python) -> PyResult<Option<Py<PyDict>>> {
self.next(py, None)
}

/// You can iterate over the event stream with a loop
Expand All @@ -100,7 +109,7 @@ impl Node {
/// case "image":
/// ```
///
/// :rtype: dora.PyEvent
/// :rtype: dict
fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}
Expand Down Expand Up @@ -262,7 +271,6 @@ fn dora(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {

m.add_function(wrap_pyfunction!(start_runtime, &m)?)?;
m.add_class::<Node>()?;
m.add_class::<PyEvent>()?;
m.setattr("__version__", env!("CARGO_PKG_VERSION"))?;
m.setattr("__author__", "Dora-rs Authors")?;

Expand Down
92 changes: 35 additions & 57 deletions apis/python/operator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,69 +1,52 @@
use arrow::{array::ArrayRef, pyarrow::ToPyArrow};
use std::collections::HashMap;

use arrow::pyarrow::ToPyArrow;
use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters};
use eyre::{Context, Result};
use pyo3::{exceptions::PyLookupError, prelude::*, pybacked::PyBackedStr, types::PyDict};
use pyo3::{
prelude::*,
pybacked::PyBackedStr,
types::{IntoPyDict, PyDict},
};

/// Dora Event
#[pyclass]
#[derive(Debug)]
pub struct PyEvent {
event: MergedEvent<PyObject>,
data: Option<ArrayRef>,
}

// Dora Event
#[pymethods]
impl PyEvent {
///
/// :rtype: dora.PyObject
pub fn __getitem__(&self, key: &str, py: Python<'_>) -> PyResult<Option<PyObject>> {
if key == "kind" {
let kind = match &self.event {
MergedEvent::Dora(_) => "dora",
MergedEvent::External(_) => "external",
};
return Ok(Some(kind.to_object(py)));
}
pub fn to_py_dict(self, py: Python<'_>) -> PyResult<Py<PyDict>> {
let mut pydict = HashMap::new();
match &self.event {
MergedEvent::Dora(_) => pydict.insert("kind", "dora".to_object(py)),
MergedEvent::External(_) => pydict.insert("kind", "external".to_object(py)),
};
match &self.event {
MergedEvent::Dora(event) => {
let value = match key {
"type" => Some(Self::ty(event).to_object(py)),
"id" => Self::id(event).map(|v| v.to_object(py)),
"value" => self.value(py)?,
"metadata" => Self::metadata(event, py),
"error" => Self::error(event).map(|v| v.to_object(py)),
other => {
return Err(PyLookupError::new_err(format!(
"event has no property `{other}`"
)))
}
};
Ok(value)
if let Some(id) = Self::id(event) {
pydict.insert("id", id.into_py(py));
}
pydict.insert("type", Self::ty(event).to_object(py));

if let Some(value) = self.value(py)? {
pydict.insert("value", value);
}
if let Some(metadata) = Self::metadata(event, py) {
pydict.insert("metadata", metadata);
}
if let Some(error) = Self::error(event) {
pydict.insert("error", error.to_object(py));
}
}
MergedEvent::External(event) => {
let value = match key {
"value" => event,
_ => todo!(),
};

Ok(Some(value.clone()))
pydict.insert("value", event.clone());
}
}
}

pub fn inner(&mut self) -> Option<&PyObject> {
match &self.event {
MergedEvent::Dora(_) => None,
MergedEvent::External(event) => Some(event),
}
Ok(pydict.into_py_dict_bound(py).unbind())
}

fn __str__(&self) -> PyResult<String> {
Ok(format!("{:#?}", &self.event))
}
}

impl PyEvent {
fn ty(event: &Event) -> &str {
match event {
Event::Stop => "STOP",
Expand All @@ -84,9 +67,9 @@ impl PyEvent {

/// Returns the payload of an input event as an arrow array (if any).
fn value(&self, py: Python<'_>) -> PyResult<Option<PyObject>> {
match (&self.event, &self.data) {
(MergedEvent::Dora(Event::Input { .. }), Some(data)) => {
// TODO: Does this call leak data?
match &self.event {
MergedEvent::Dora(Event::Input { data, .. }) => {
// TODO: Does this call leak data?&
let array_data = data.to_data().to_pyarrow(py)?;
Ok(Some(array_data))
}
Expand Down Expand Up @@ -116,13 +99,8 @@ impl From<Event> for PyEvent {
}

impl From<MergedEvent<PyObject>> for PyEvent {
fn from(mut event: MergedEvent<PyObject>) -> Self {
let data = if let MergedEvent::Dora(Event::Input { data, .. }) = &mut event {
Some(data.clone())
} else {
None
};
Self { event, data }
fn from(event: MergedEvent<PyObject>) -> Self {
Self { event }
}
}

Expand Down
4 changes: 3 additions & 1 deletion binaries/runtime/src/operator/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,9 @@ pub fn run(
metadata.parameters.open_telemetry_context = string_cx;
}

let py_event = PyEvent::from(event);
let py_event = PyEvent::from(event)
.to_py_dict(py)
.context("Could not convert event to pydict bound")?;

let status_enum = operator
.call_method1(py, "on_event", (py_event, send_output.clone()))
Expand Down
5 changes: 5 additions & 0 deletions examples/python-dataflow/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from dora import Node

node = Node("plot")

event = node.next()
4 changes: 2 additions & 2 deletions examples/python-dataflow/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ ultralytics
gitpython
ipython # interactive notebook
matplotlib>=3.2.2
numpy>=1.18.5
numpy<2.0.0 # See: https://github.com/opencv/opencv-python/issues/997
opencv-python>=4.1.1
Pillow>=7.1.2
psutil # system resources
Expand Down Expand Up @@ -44,4 +44,4 @@ seaborn>=0.11.0
# roboflow

opencv-python>=4.1.1
maturin
maturin
7 changes: 0 additions & 7 deletions examples/python-dataflow/run.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use dora_core::{get_pip_path, get_python_path, run};
use dora_download::download_file;
use dora_tracing::set_up_tracing;
use eyre::{bail, ContextCompat, WrapErr};
use std::path::Path;
Expand Down Expand Up @@ -73,12 +72,6 @@ async fn main() -> eyre::Result<()> {
)
.await
.context("maturin develop failed")?;
download_file(
"https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt",
Path::new("yolov8n.pt"),
)
.await
.context("Could not download weights.")?;

let dataflow = Path::new("dataflow.yml");
run_dataflow(dataflow).await?;
Expand Down
2 changes: 1 addition & 1 deletion examples/python-operator-dataflow/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ ultralytics
gitpython
ipython # interactive notebook
matplotlib>=3.2.2
numpy>=1.18.5
numpy<2.0.0
opencv-python>=4.1.1
Pillow>=7.1.2
psutil # system resources
Expand Down
4 changes: 2 additions & 2 deletions examples/python-ros2-dataflow/random_turtle.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@

# ROS2 Event
elif event_kind == "external":
pose = event.inner()[0].as_py()
pose = event["value"][0].as_py()
min_x = min([min_x, pose["x"]])
max_x = max([max_x, pose["x"]])
min_y = min([min_y, pose["y"]])
max_y = max([max_y, pose["y"]])
dora_node.send_output("turtle_pose", event.inner())
dora_node.send_output("turtle_pose", event["value"])

assert max_x - min_x > 1 or max_y - min_y > 1, "no turtle movement"

0 comments on commit 42255f5

Please sign in to comment.