diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 22c81c632..e5a4017c4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -318,7 +318,7 @@ jobs: dora start dataflow.yml --name ci-python-test sleep 10 dora stop --name ci-python-test --grace-duration 5s - pip install opencv-python + pip install "numpy<2.0.0" opencv-python dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic python ../examples/python-dataflow/plot_dynamic.py sleep 5 diff --git a/apis/python/node/dora/__init__.py b/apis/python/node/dora/__init__.py index 7269bda14..354d9ad40 100644 --- a/apis/python/node/dora/__init__.py +++ b/apis/python/node/dora/__init__.py @@ -13,7 +13,6 @@ from .dora import ( Node, - PyEvent, Ros2Context, Ros2Node, Ros2NodeOptions, diff --git a/apis/python/node/dora/__init__.pyi b/apis/python/node/dora/__init__.pyi index 3dc336763..a6dcb659e 100644 --- a/apis/python/node/dora/__init__.pyi +++ b/apis/python/node/dora/__init__.pyi @@ -22,7 +22,7 @@ from dora import Node node = Node() ```""" - def __init__(self) -> None: + def __init__(self, node_id: str=None) -> None: """The custom node API lets you integrate `dora` into your application. It allows you to retrieve input and send output in any fashion you want. @@ -46,7 +46,7 @@ This method returns the parsed dataflow YAML file.""" """Merge an external event stream with dora main loop. This currently only work with ROS2.""" - def next(self, timeout: float=None) -> dora.PyEvent: + def next(self, timeout: float=None) -> dict: """`.next()` gives you the next input that the node has received. It blocks until the next event becomes available. You can use timeout in seconds to return if no input is available. @@ -88,15 +88,6 @@ node.send_output("string", b"string", {"open_telemetry_context": "7632e76"}) def __next__(self) -> typing.Any: """Implement next(self).""" -@typing.final -class PyEvent: - """Dora Event""" - - def inner(self):... - - def __getitem__(self, key: typing.Any) -> typing.Any: - """Return self[key].""" - @typing.final class Ros2Context: """ROS2 Context holding all messages definition for receiving and sending messages to ROS2. diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 3c1fcb2a4..45125ec0f 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -24,6 +24,7 @@ use pyo3::types::{PyBytes, PyDict}; /// node = Node() /// ``` /// +/// :type node_id: str, optional #[pyclass] pub struct Node { events: Events, @@ -67,11 +68,18 @@ impl Node { /// ``` /// /// :type timeout: float, optional - /// :rtype: dora.PyEvent + /// :rtype: dict #[allow(clippy::should_implement_trait)] - pub fn next(&mut self, py: Python, timeout: Option) -> PyResult> { + pub fn next(&mut self, py: Python, timeout: Option) -> PyResult>> { let event = py.allow_threads(|| self.events.recv(timeout.map(Duration::from_secs_f32))); - Ok(event) + if let Some(event) = event { + let dict = event + .to_py_dict(py) + .context("Could not convert event into a dict")?; + Ok(Some(dict)) + } else { + Ok(None) + } } /// You can iterate over the event stream with a loop @@ -84,10 +92,11 @@ impl Node { /// case "image": /// ``` /// - /// :rtype: dora.PyEvent - pub fn __next__(&mut self, py: Python) -> PyResult> { - let event = py.allow_threads(|| self.events.recv(None)); - Ok(event) + /// Default behaviour is to timeout after 2 seconds. + /// + /// :rtype: dict + pub fn __next__(&mut self, py: Python) -> PyResult>> { + self.next(py, None) } /// You can iterate over the event stream with a loop @@ -100,7 +109,7 @@ impl Node { /// case "image": /// ``` /// - /// :rtype: dora.PyEvent + /// :rtype: dict fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { slf } @@ -262,7 +271,6 @@ fn dora(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(start_runtime, &m)?)?; m.add_class::()?; - m.add_class::()?; m.setattr("__version__", env!("CARGO_PKG_VERSION"))?; m.setattr("__author__", "Dora-rs Authors")?; diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index bc74279e9..2c027e9ac 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -1,69 +1,52 @@ -use arrow::{array::ArrayRef, pyarrow::ToPyArrow}; +use std::collections::HashMap; + +use arrow::pyarrow::ToPyArrow; use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters}; use eyre::{Context, Result}; -use pyo3::{exceptions::PyLookupError, prelude::*, pybacked::PyBackedStr, types::PyDict}; +use pyo3::{ + prelude::*, + pybacked::PyBackedStr, + types::{IntoPyDict, PyDict}, +}; /// Dora Event -#[pyclass] #[derive(Debug)] pub struct PyEvent { event: MergedEvent, - data: Option, } -// Dora Event -#[pymethods] impl PyEvent { - /// - /// :rtype: dora.PyObject - pub fn __getitem__(&self, key: &str, py: Python<'_>) -> PyResult> { - if key == "kind" { - let kind = match &self.event { - MergedEvent::Dora(_) => "dora", - MergedEvent::External(_) => "external", - }; - return Ok(Some(kind.to_object(py))); - } + pub fn to_py_dict(self, py: Python<'_>) -> PyResult> { + let mut pydict = HashMap::new(); + match &self.event { + MergedEvent::Dora(_) => pydict.insert("kind", "dora".to_object(py)), + MergedEvent::External(_) => pydict.insert("kind", "external".to_object(py)), + }; match &self.event { MergedEvent::Dora(event) => { - let value = match key { - "type" => Some(Self::ty(event).to_object(py)), - "id" => Self::id(event).map(|v| v.to_object(py)), - "value" => self.value(py)?, - "metadata" => Self::metadata(event, py), - "error" => Self::error(event).map(|v| v.to_object(py)), - other => { - return Err(PyLookupError::new_err(format!( - "event has no property `{other}`" - ))) - } - }; - Ok(value) + if let Some(id) = Self::id(event) { + pydict.insert("id", id.into_py(py)); + } + pydict.insert("type", Self::ty(event).to_object(py)); + + if let Some(value) = self.value(py)? { + pydict.insert("value", value); + } + if let Some(metadata) = Self::metadata(event, py) { + pydict.insert("metadata", metadata); + } + if let Some(error) = Self::error(event) { + pydict.insert("error", error.to_object(py)); + } } MergedEvent::External(event) => { - let value = match key { - "value" => event, - _ => todo!(), - }; - - Ok(Some(value.clone())) + pydict.insert("value", event.clone()); } } - } - pub fn inner(&mut self) -> Option<&PyObject> { - match &self.event { - MergedEvent::Dora(_) => None, - MergedEvent::External(event) => Some(event), - } + Ok(pydict.into_py_dict_bound(py).unbind()) } - fn __str__(&self) -> PyResult { - Ok(format!("{:#?}", &self.event)) - } -} - -impl PyEvent { fn ty(event: &Event) -> &str { match event { Event::Stop => "STOP", @@ -84,9 +67,9 @@ impl PyEvent { /// Returns the payload of an input event as an arrow array (if any). fn value(&self, py: Python<'_>) -> PyResult> { - match (&self.event, &self.data) { - (MergedEvent::Dora(Event::Input { .. }), Some(data)) => { - // TODO: Does this call leak data? + match &self.event { + MergedEvent::Dora(Event::Input { data, .. }) => { + // TODO: Does this call leak data?& let array_data = data.to_data().to_pyarrow(py)?; Ok(Some(array_data)) } @@ -116,13 +99,8 @@ impl From for PyEvent { } impl From> for PyEvent { - fn from(mut event: MergedEvent) -> Self { - let data = if let MergedEvent::Dora(Event::Input { data, .. }) = &mut event { - Some(data.clone()) - } else { - None - }; - Self { event, data } + fn from(event: MergedEvent) -> Self { + Self { event } } } diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 82d01ebb9..fa4883887 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -208,7 +208,9 @@ pub fn run( metadata.parameters.open_telemetry_context = string_cx; } - let py_event = PyEvent::from(event); + let py_event = PyEvent::from(event) + .to_py_dict(py) + .context("Could not convert event to pydict bound")?; let status_enum = operator .call_method1(py, "on_event", (py_event, send_output.clone())) diff --git a/examples/python-dataflow/example.py b/examples/python-dataflow/example.py new file mode 100644 index 000000000..c9221a3a5 --- /dev/null +++ b/examples/python-dataflow/example.py @@ -0,0 +1,5 @@ +from dora import Node + +node = Node("plot") + +event = node.next() diff --git a/examples/python-dataflow/requirements.txt b/examples/python-dataflow/requirements.txt index a8f02b0a0..9c1fc9157 100644 --- a/examples/python-dataflow/requirements.txt +++ b/examples/python-dataflow/requirements.txt @@ -6,7 +6,7 @@ ultralytics gitpython ipython # interactive notebook matplotlib>=3.2.2 -numpy>=1.18.5 +numpy<2.0.0 # See: https://github.com/opencv/opencv-python/issues/997 opencv-python>=4.1.1 Pillow>=7.1.2 psutil # system resources @@ -44,4 +44,4 @@ seaborn>=0.11.0 # roboflow opencv-python>=4.1.1 -maturin \ No newline at end of file +maturin diff --git a/examples/python-dataflow/run.rs b/examples/python-dataflow/run.rs index a14b553f0..65ae5831e 100644 --- a/examples/python-dataflow/run.rs +++ b/examples/python-dataflow/run.rs @@ -1,5 +1,4 @@ use dora_core::{get_pip_path, get_python_path, run}; -use dora_download::download_file; use dora_tracing::set_up_tracing; use eyre::{bail, ContextCompat, WrapErr}; use std::path::Path; @@ -73,12 +72,6 @@ async fn main() -> eyre::Result<()> { ) .await .context("maturin develop failed")?; - download_file( - "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt", - Path::new("yolov8n.pt"), - ) - .await - .context("Could not download weights.")?; let dataflow = Path::new("dataflow.yml"); run_dataflow(dataflow).await?; diff --git a/examples/python-operator-dataflow/requirements.txt b/examples/python-operator-dataflow/requirements.txt index 68020faa6..bd15e396a 100644 --- a/examples/python-operator-dataflow/requirements.txt +++ b/examples/python-operator-dataflow/requirements.txt @@ -6,7 +6,7 @@ ultralytics gitpython ipython # interactive notebook matplotlib>=3.2.2 -numpy>=1.18.5 +numpy<2.0.0 opencv-python>=4.1.1 Pillow>=7.1.2 psutil # system resources diff --git a/examples/python-ros2-dataflow/random_turtle.py b/examples/python-ros2-dataflow/random_turtle.py index 1e690d079..3b25ac21a 100755 --- a/examples/python-ros2-dataflow/random_turtle.py +++ b/examples/python-ros2-dataflow/random_turtle.py @@ -55,11 +55,11 @@ # ROS2 Event elif event_kind == "external": - pose = event.inner()[0].as_py() + pose = event["value"][0].as_py() min_x = min([min_x, pose["x"]]) max_x = max([max_x, pose["x"]]) min_y = min([min_y, pose["y"]]) max_y = max([max_y, pose["y"]]) - dora_node.send_output("turtle_pose", event.inner()) + dora_node.send_output("turtle_pose", event["value"]) assert max_x - min_x > 1 or max_y - min_y > 1, "no turtle movement"