From b325881df15c8fcd85f3ba568c42868c8fa20ae4 Mon Sep 17 00:00:00 2001 From: Andrew Straw Date: Wed, 25 Oct 2023 11:14:49 +0200 Subject: [PATCH] pybraidz-chunked-iter: also allow chunking by num frames --- braidz-parser/braidz-chunked-iter/Cargo.toml | 5 - .../braidz-chunked-iter/examples/one.rs | 32 ---- .../examples/read_to_pandas.py | 5 +- .../pybraidz-chunked-iter/examples/simple.py | 2 +- .../pybraidz-chunked-iter/src/lib.rs | 57 +++++--- braidz-parser/braidz-chunked-iter/src/lib.rs | 137 +++++++++--------- .../users-guide/src/braidz-files.md | 5 +- 7 files changed, 117 insertions(+), 126 deletions(-) delete mode 100644 braidz-parser/braidz-chunked-iter/examples/one.rs diff --git a/braidz-parser/braidz-chunked-iter/Cargo.toml b/braidz-parser/braidz-chunked-iter/Cargo.toml index b684738f4..6d9dcd787 100644 --- a/braidz-parser/braidz-chunked-iter/Cargo.toml +++ b/braidz-parser/braidz-chunked-iter/Cargo.toml @@ -3,17 +3,12 @@ name = "braidz-chunked-iter" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] anyhow = "1" -libflate = "1.2.0" csv = "1.1" chrono = "0.4.31" -csv-eof = { path = "../../csv-eof" } flydra-types = { path = "../../flydra-types" } -zip-or-dir = { path = "../../zip-or-dir", features = ["with-gz"] } [dev-dependencies] clap = { version = "4", features = ["derive"] } diff --git a/braidz-parser/braidz-chunked-iter/examples/one.rs b/braidz-parser/braidz-chunked-iter/examples/one.rs deleted file mode 100644 index b5f0fec93..000000000 --- a/braidz-parser/braidz-chunked-iter/examples/one.rs +++ /dev/null @@ -1,32 +0,0 @@ -use std::path::PathBuf; - -use anyhow::Result; -use clap::Parser; - -#[derive(Parser, Debug)] -#[command(author, version, about)] -struct Cli { - src: PathBuf, -} - -fn main() -> Result<()> { - let cli = Cli::parse(); - let mut archive = zip_or_dir::ZipDirArchive::auto_from_path(&cli.src)?; - let mut start = None; - let chunk_dur = std::time::Duration::from_millis(1000); - for chunk in braidz_chunked_iter::chunk_by_duration(&mut archive, chunk_dur)? { - println!("---- duration chunk ----"); - for row in chunk.rows { - if start.is_none() { - start = Some(row.timestamp.as_ref().unwrap().as_f64()); - } - let t0 = start.unwrap(); - println!( - " frame: {}, timestamp: {}", - row.frame.0, - row.timestamp.unwrap().as_f64() - t0 - ); - } - } - Ok(()) -} diff --git a/braidz-parser/braidz-chunked-iter/pybraidz-chunked-iter/examples/read_to_pandas.py b/braidz-parser/braidz-chunked-iter/pybraidz-chunked-iter/examples/read_to_pandas.py index 77d6cb84f..0ae24101a 100644 --- a/braidz-parser/braidz-chunked-iter/pybraidz-chunked-iter/examples/read_to_pandas.py +++ b/braidz-parser/braidz-chunked-iter/pybraidz-chunked-iter/examples/read_to_pandas.py @@ -6,7 +6,10 @@ braidz_fname = sys.argv[1] # Open the braidz file and create chunks of 60 second durations. -estimates_chunker = pybraidz_chunked_iter.KalmanEstimatesChunker(braidz_fname, 60) +estimates_chunker = pybraidz_chunked_iter.chunk_on_timestamp(braidz_fname, 60) + +# One could also create chunks with 100 frames of data. +# estimates_chunker = pybraidz_chunked_iter.chunk_on_num_frames(braidz_fname, 100) # Iterate over each chunk for chunk in estimates_chunker: diff --git a/braidz-parser/braidz-chunked-iter/pybraidz-chunked-iter/examples/simple.py b/braidz-parser/braidz-chunked-iter/pybraidz-chunked-iter/examples/simple.py index 8bf128013..50019cffc 100644 --- a/braidz-parser/braidz-chunked-iter/pybraidz-chunked-iter/examples/simple.py +++ b/braidz-parser/braidz-chunked-iter/pybraidz-chunked-iter/examples/simple.py @@ -5,7 +5,7 @@ braidz_fname = sys.argv[1] # Open the braidz file and create chunks of 60 second durations. -estimates_chunker = pybraidz_chunked_iter.KalmanEstimatesChunker(braidz_fname, 60) +estimates_chunker = pybraidz_chunked_iter.chunk_on_timestamp(braidz_fname, 60) # Iterate over each chunk for chunk in estimates_chunker: diff --git a/braidz-parser/braidz-chunked-iter/pybraidz-chunked-iter/src/lib.rs b/braidz-parser/braidz-chunked-iter/pybraidz-chunked-iter/src/lib.rs index 7af422a2c..0596a9bda 100644 --- a/braidz-parser/braidz-chunked-iter/pybraidz-chunked-iter/src/lib.rs +++ b/braidz-parser/braidz-chunked-iter/pybraidz-chunked-iter/src/lib.rs @@ -1,34 +1,22 @@ use numpy::PyArray; use pyo3::{exceptions::PyValueError, prelude::*, types::PyDict}; -use braidz_chunked_iter::{DurationChunk, ToChunkIter}; +use braidz_chunked_iter::{ChunkSize, DurationChunk, ToChunkIter}; use csv_eof::EarlyEofOk; use zip_or_dir::{MaybeGzReader, ZipDirArchive}; -/// Iterate over duration-defined chunks of the `kalman_estimates` table. -/// -/// Parameters -/// ---------- -/// path : str -/// The path of the `.braidz` file (or `.braid` directory) to open. -/// duration_seconds: float -/// The duration of each chunk, in seconds. - #[pyclass(unsendable)] struct KalmanEstimatesChunker { chunker: &'static mut dyn Iterator, } -#[pymethods] impl KalmanEstimatesChunker { - #[new] - fn new(path: &str, duration_seconds: f64) -> PyResult { + fn new(path: &str, sz: ChunkSize) -> PyResult { let archive = zip_or_dir::ZipDirArchive::auto_from_path(&path).map_err(|e| { PyErr::new::(format!("Could not open file {}: '{}'", path, e)) })?; // leak to get static lifetime let archive: &'static mut ZipDirArchive<_> = Box::leak(Box::new(archive)); - let chunk_dur = std::time::Duration::from_secs_f64(duration_seconds); let mut first_row = None; let src_fname = flydra_types::KALMAN_ESTIMATES_CSV_FNAME; @@ -57,10 +45,9 @@ impl KalmanEstimatesChunker { let t1: csv::Reader> = csv::Reader::from_reader(rdr); let inner_iter = t1.into_deserialize().early_eof_ok(); - let my_iter = - ToChunkIter::to_chunk_iter(inner_iter, first_row, chunk_dur).map_err(|e| { - PyErr::new::(format!("Could chunk based on duration: '{e}'")) - })?; + let my_iter = ToChunkIter::to_chunk_iter(inner_iter, first_row, sz).map_err(|e| { + PyErr::new::(format!("Could chunk based on duration: '{e}'")) + })?; let chunker = Box::new(my_iter); // leak to get static lifetime let chunker = Box::leak(chunker); @@ -71,7 +58,10 @@ impl KalmanEstimatesChunker { ))) } } +} +#[pymethods] +impl KalmanEstimatesChunker { fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { slf } @@ -158,9 +148,40 @@ impl KalmanEstimatesChunker { } } +/// Iterate over duration-defined chunks of the `kalman_estimates` table. +/// +/// Parameters +/// ---------- +/// path : str +/// The path of the `.braidz` file (or `.braid` directory) to open. +/// duration_seconds: float +/// The duration of each chunk, in seconds. +#[pyfunction] +fn chunk_on_duration(path: &str, duration_seconds: f64) -> PyResult { + let chunk_dur = std::time::Duration::from_secs_f64(duration_seconds); + let sz = ChunkSize::TimestampDuration(chunk_dur); + KalmanEstimatesChunker::new(path, sz) +} + +/// Iterate over duration-defined chunks of the `kalman_estimates` table. +/// +/// Parameters +/// ---------- +/// path : str +/// The path of the `.braidz` file (or `.braid` directory) to open. +/// num_frames: int +/// The number of frames included in each chunk. +#[pyfunction] +fn chunk_on_num_frames(path: &str, num_frames: usize) -> PyResult { + let sz = ChunkSize::FrameNumber(num_frames); + KalmanEstimatesChunker::new(path, sz) +} + /// Chunked iteration over tables in `.braidz` files. #[pymodule] fn pybraidz_chunked_iter(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; + m.add_function(wrap_pyfunction!(chunk_on_duration, m)?)?; + m.add_function(wrap_pyfunction!(chunk_on_num_frames, m)?)?; Ok(()) } diff --git a/braidz-parser/braidz-chunked-iter/src/lib.rs b/braidz-parser/braidz-chunked-iter/src/lib.rs index 6ce471c43..53b9e110c 100644 --- a/braidz-parser/braidz-chunked-iter/src/lib.rs +++ b/braidz-parser/braidz-chunked-iter/src/lib.rs @@ -1,20 +1,24 @@ -use std::{fs::File, io::BufReader}; - use anyhow::Result; -use csv::DeserializeRecordsIntoIter; -use csv_eof::{EarlyEofOk, TerminateEarlyOnUnexpectedEof}; use flydra_types::{FlydraFloatTimestampLocal, KalmanEstimatesRow, Triggerbox}; -use zip_or_dir::{MaybeGzReader, ZipDirArchive}; type KItem = std::result::Result; +pub enum ChunkSize { + TimestampDuration(std::time::Duration), + FrameNumber(usize), +} + +enum ChunkStartAndDuration { + Timestamp(chrono::DateTime, std::time::Duration), + Frame(u64, usize), +} + pub struct ChunkIter where I: Iterator, { - start_stamp: chrono::DateTime, - dur: std::time::Duration, + source: ChunkStartAndDuration, next_chunk_index: usize, inner: std::iter::Peekable, } @@ -27,22 +31,53 @@ where fn next(&mut self) -> Option { let cur: u32 = self.next_chunk_index.try_into().unwrap(); let next = cur + 1; - let stop_dur = self.dur * next; - let stop_time = FlydraFloatTimestampLocal::::from(self.start_stamp + stop_dur); + + let mut stop_time = None; + let mut stop_frame = None; + match &self.source { + ChunkStartAndDuration::Timestamp(start_stamp, dur) => { + let stop_dur = *dur * next; + stop_time = Some(FlydraFloatTimestampLocal::::from( + *start_stamp + stop_dur, + )); + } + ChunkStartAndDuration::Frame(start_frame, n_frames_in_chunk) => { + let next_u64: u64 = next.try_into().unwrap(); + let n_frames_in_chunk_u64: u64 = (*n_frames_in_chunk).try_into().unwrap(); + let stop_dur = n_frames_in_chunk_u64 * next_u64; + stop_frame = Some(*start_frame + stop_dur); + } + } self.next_chunk_index += 1; let mut rows: Vec = vec![]; let mut do_return_rows = false; while let Some(Ok(peek_row)) = self.inner.peek() { - if let Some(ref this_timestamp) = &peek_row.timestamp { - if this_timestamp.as_f64() >= stop_time.as_f64() { - do_return_rows = true; - // done iterating - break; + match &self.source { + ChunkStartAndDuration::Timestamp(_start_stamp, _dur) => { + let stop_time = stop_time.as_ref().unwrap(); + + if let Some(ref this_timestamp) = &peek_row.timestamp { + if this_timestamp.as_f64() >= stop_time.as_f64() { + do_return_rows = true; + // done iterating + break; + } + } else { + // return Error - no timestamp on row + panic!("row {} has no timestamp", peek_row.frame); + } + } + + ChunkStartAndDuration::Frame(_start_frame, _n_frames_in_chunk) => { + let stop_frame = stop_frame.as_ref().unwrap(); + let this_frame = &peek_row.frame.0; + if this_frame >= stop_frame { + do_return_rows = true; + // done iterating + break; + } } - } else { - // return Error - no timestamp on row - panic!("row {} has no timestamp", peek_row.frame); } do_return_rows = true; rows.push(self.inner.next().unwrap().unwrap()); @@ -65,66 +100,32 @@ pub trait ToChunkIter where I: Iterator, { - fn to_chunk_iter( - self, - first_row: KalmanEstimatesRow, - dur: std::time::Duration, - ) -> Result>; + fn to_chunk_iter(self, first_row: KalmanEstimatesRow, sz: ChunkSize) -> Result>; } impl ToChunkIter for I where I: Iterator, { - fn to_chunk_iter( - self, - first_row: KalmanEstimatesRow, - dur: std::time::Duration, - ) -> Result> { - let start_stamp = first_row - .timestamp - .as_ref() - .ok_or_else(|| anyhow::anyhow!("no timestamp in first row"))?; - let start_stamp = start_stamp.into(); + fn to_chunk_iter(self, first_row: KalmanEstimatesRow, sz: ChunkSize) -> Result> { + let source = match sz { + ChunkSize::TimestampDuration(dur) => { + let start_stamp = first_row + .timestamp + .as_ref() + .ok_or_else(|| anyhow::anyhow!("no timestamp in first row"))?; + let start_stamp = start_stamp.into(); + ChunkStartAndDuration::Timestamp(start_stamp, dur) + } + ChunkSize::FrameNumber(num_frames) => { + let start_frame = first_row.frame.0; + ChunkStartAndDuration::Frame(start_frame, num_frames) + } + }; Ok(ChunkIter { - start_stamp, - dur, + source, next_chunk_index: 0, inner: self.peekable(), }) } } - -pub fn chunk_by_duration<'a>( - archive: &'a mut ZipDirArchive>, - dur: std::time::Duration, -) -> Result< - ChunkIter< - TerminateEarlyOnUnexpectedEof< - DeserializeRecordsIntoIter, KalmanEstimatesRow>, - KalmanEstimatesRow, - >, - >, -> { - let mut first_row = None; - let src_fname = flydra_types::KALMAN_ESTIMATES_CSV_FNAME; - - { - let rdr = archive.open_raw_or_gz(src_fname)?; - let kest_reader = csv::Reader::from_reader(rdr); - - if let Some(row) = kest_reader.into_deserialize().early_eof_ok().next() { - let row = row?; - first_row = Some(row); - } - } - if let Some(first_row) = first_row { - let rdr = archive.open_raw_or_gz(src_fname)?; - let t1: csv::Reader> = csv::Reader::from_reader(rdr); - - let inner_iter = t1.into_deserialize().early_eof_ok(); - Ok(ToChunkIter::to_chunk_iter(inner_iter, first_row, dur)?) - } else { - anyhow::bail!("no rows in {src_fname}"); - } -} diff --git a/strand-braid-user/users-guide/src/braidz-files.md b/strand-braid-user/users-guide/src/braidz-files.md index 4425ece4a..230b4b8c6 100644 --- a/strand-braid-user/users-guide/src/braidz-files.md +++ b/strand-braid-user/users-guide/src/braidz-files.md @@ -139,7 +139,10 @@ import pandas as pd braidz_fname = "20201104_174158.braidz" # Open the braidz file and create chunks of 60 second durations. -estimates_chunker = pybraidz_chunked_iter.KalmanEstimatesChunker(braidz_fname, 60) +estimates_chunker = pybraidz_chunked_iter.chunk_on_timestamp(braidz_fname, 60) + +# One could also create chunks with 100 frames of data. +# estimates_chunker = pybraidz_chunked_iter.chunk_on_num_frames(braidz_fname, 100) # Iterate over each chunk for chunk in estimates_chunker: