pybraidz-chunked-iter: also allow chunking by num frames
astraw committed Oct 25, 2023
1 parent 744552a commit b325881
Showing 7 changed files with 117 additions and 126 deletions.
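
In brief: the existing duration-based chunking is kept (now behind a `ChunkSize::TimestampDuration` variant) and a frame-count mode (`ChunkSize::FrameNumber`) is added, exposed to Python as `chunk_on_num_frames`. A minimal usage sketch of the two Python entry points defined in the diff below, not part of the commit itself; the file name is just a placeholder taken from the users guide:

import pybraidz_chunked_iter

braidz_fname = "20201104_174158.braidz"  # placeholder path

# Existing behavior: each chunk covers 60 seconds of data.
by_time = pybraidz_chunked_iter.chunk_on_duration(braidz_fname, 60)

# New in this commit: each chunk contains 100 frames of data.
by_frames = pybraidz_chunked_iter.chunk_on_num_frames(braidz_fname, 100)

for chunk in by_frames:
    ...  # process each chunk of kalman_estimates rows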
5 changes: 0 additions & 5 deletions braidz-parser/braidz-chunked-iter/Cargo.toml
@@ -3,17 +3,12 @@ name = "braidz-chunked-iter"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1"
libflate = "1.2.0"
csv = "1.1"
chrono = "0.4.31"

csv-eof = { path = "../../csv-eof" }
flydra-types = { path = "../../flydra-types" }
zip-or-dir = { path = "../../zip-or-dir", features = ["with-gz"] }

[dev-dependencies]
clap = { version = "4", features = ["derive"] }
32 changes: 0 additions & 32 deletions braidz-parser/braidz-chunked-iter/examples/one.rs

This file was deleted.

@@ -6,7 +6,10 @@
braidz_fname = sys.argv[1]

# Open the braidz file and create chunks of 60 second durations.
estimates_chunker = pybraidz_chunked_iter.KalmanEstimatesChunker(braidz_fname, 60)
estimates_chunker = pybraidz_chunked_iter.chunk_on_duration(braidz_fname, 60)

# One could also create chunks with 100 frames of data.
# estimates_chunker = pybraidz_chunked_iter.chunk_on_num_frames(braidz_fname, 100)

# Iterate over each chunk
for chunk in estimates_chunker:
@@ -5,7 +5,7 @@
braidz_fname = sys.argv[1]

# Open the braidz file and create chunks of 60 second durations.
estimates_chunker = pybraidz_chunked_iter.KalmanEstimatesChunker(braidz_fname, 60)
estimates_chunker = pybraidz_chunked_iter.chunk_on_duration(braidz_fname, 60)

# Iterate over each chunk
for chunk in estimates_chunker:
57 changes: 39 additions & 18 deletions braidz-parser/braidz-chunked-iter/pybraidz-chunked-iter/src/lib.rs
@@ -1,34 +1,22 @@
use numpy::PyArray;
use pyo3::{exceptions::PyValueError, prelude::*, types::PyDict};

use braidz_chunked_iter::{DurationChunk, ToChunkIter};
use braidz_chunked_iter::{ChunkSize, DurationChunk, ToChunkIter};
use csv_eof::EarlyEofOk;
use zip_or_dir::{MaybeGzReader, ZipDirArchive};

/// Iterate over duration-defined chunks of the `kalman_estimates` table.
///
/// Parameters
/// ----------
/// path : str
/// The path of the `.braidz` file (or `.braid` directory) to open.
/// duration_seconds: float
/// The duration of each chunk, in seconds.
#[pyclass(unsendable)]
struct KalmanEstimatesChunker {
chunker: &'static mut dyn Iterator<Item = DurationChunk>,
}

#[pymethods]
impl KalmanEstimatesChunker {
#[new]
fn new(path: &str, duration_seconds: f64) -> PyResult<Self> {
fn new(path: &str, sz: ChunkSize) -> PyResult<Self> {
let archive = zip_or_dir::ZipDirArchive::auto_from_path(&path).map_err(|e| {
PyErr::new::<PyValueError, _>(format!("Could not open file {}: '{}'", path, e))
})?;
// leak to get static lifetime
let archive: &'static mut ZipDirArchive<_> = Box::leak(Box::new(archive));
let chunk_dur = std::time::Duration::from_secs_f64(duration_seconds);

let mut first_row = None;
let src_fname = flydra_types::KALMAN_ESTIMATES_CSV_FNAME;
@@ -57,10 +45,9 @@ impl KalmanEstimatesChunker {
let t1: csv::Reader<MaybeGzReader<'_>> = csv::Reader::from_reader(rdr);

let inner_iter = t1.into_deserialize().early_eof_ok();
let my_iter =
ToChunkIter::to_chunk_iter(inner_iter, first_row, chunk_dur).map_err(|e| {
PyErr::new::<PyValueError, _>(format!("Could chunk based on duration: '{e}'"))
})?;
let my_iter = ToChunkIter::to_chunk_iter(inner_iter, first_row, sz).map_err(|e| {
PyErr::new::<PyValueError, _>(format!("Could chunk based on duration: '{e}'"))
})?;
let chunker = Box::new(my_iter);
// leak to get static lifetime
let chunker = Box::leak(chunker);
@@ -71,7 +58,10 @@ impl KalmanEstimatesChunker {
)))
}
}
}

#[pymethods]
impl KalmanEstimatesChunker {
fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}
@@ -158,9 +148,40 @@ impl KalmanEstimatesChunker {
}
}

/// Iterate over duration-defined chunks of the `kalman_estimates` table.
///
/// Parameters
/// ----------
/// path : str
/// The path of the `.braidz` file (or `.braid` directory) to open.
/// duration_seconds: float
/// The duration of each chunk, in seconds.
#[pyfunction]
fn chunk_on_duration(path: &str, duration_seconds: f64) -> PyResult<KalmanEstimatesChunker> {
let chunk_dur = std::time::Duration::from_secs_f64(duration_seconds);
let sz = ChunkSize::TimestampDuration(chunk_dur);
KalmanEstimatesChunker::new(path, sz)
}

/// Iterate over chunks of the `kalman_estimates` table, each containing a fixed number of frames.
///
/// Parameters
/// ----------
/// path : str
/// The path of the `.braidz` file (or `.braid` directory) to open.
/// num_frames: int
/// The number of frames included in each chunk.
#[pyfunction]
fn chunk_on_num_frames(path: &str, num_frames: usize) -> PyResult<KalmanEstimatesChunker> {
let sz = ChunkSize::FrameNumber(num_frames);
KalmanEstimatesChunker::new(path, sz)
}

/// Chunked iteration over tables in `.braidz` files.
#[pymodule]
fn pybraidz_chunked_iter(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<KalmanEstimatesChunker>()?;
m.add_function(wrap_pyfunction!(chunk_on_duration, m)?)?;
m.add_function(wrap_pyfunction!(chunk_on_num_frames, m)?)?;
Ok(())
}
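
A quick note on error handling for the binding above (a sketch, not part of the commit): both `chunk_on_duration` and `chunk_on_num_frames` build a `KalmanEstimatesChunker`, and failures such as an unreadable archive are raised as `PyValueError`, which Python code sees as `ValueError`. The file name below is hypothetical.

import pybraidz_chunked_iter

try:
    # 2.5-second chunks; the duration is a float number of seconds.
    chunker = pybraidz_chunked_iter.chunk_on_duration("does-not-exist.braidz", 2.5)
except ValueError as err:
    # The Rust side maps archive-open failures to PyValueError (see new() above).
    print(f"could not open braidz archive: {err}")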
137 changes: 69 additions & 68 deletions braidz-parser/braidz-chunked-iter/src/lib.rs
@@ -1,20 +1,24 @@
use std::{fs::File, io::BufReader};

use anyhow::Result;
use csv::DeserializeRecordsIntoIter;

use csv_eof::{EarlyEofOk, TerminateEarlyOnUnexpectedEof};
use flydra_types::{FlydraFloatTimestampLocal, KalmanEstimatesRow, Triggerbox};
use zip_or_dir::{MaybeGzReader, ZipDirArchive};

type KItem = std::result::Result<KalmanEstimatesRow, csv::Error>;

pub enum ChunkSize {
TimestampDuration(std::time::Duration),
FrameNumber(usize),
}

enum ChunkStartAndDuration {
Timestamp(chrono::DateTime<chrono::Utc>, std::time::Duration),
Frame(u64, usize),
}

pub struct ChunkIter<I>
where
I: Iterator<Item = KItem>,
{
start_stamp: chrono::DateTime<chrono::Utc>,
dur: std::time::Duration,
source: ChunkStartAndDuration,
next_chunk_index: usize,
inner: std::iter::Peekable<I>,
}
@@ -27,22 +31,53 @@ where
fn next(&mut self) -> Option<Self::Item> {
let cur: u32 = self.next_chunk_index.try_into().unwrap();
let next = cur + 1;
let stop_dur = self.dur * next;
let stop_time = FlydraFloatTimestampLocal::<Triggerbox>::from(self.start_stamp + stop_dur);

let mut stop_time = None;
let mut stop_frame = None;
match &self.source {
ChunkStartAndDuration::Timestamp(start_stamp, dur) => {
let stop_dur = *dur * next;
stop_time = Some(FlydraFloatTimestampLocal::<Triggerbox>::from(
*start_stamp + stop_dur,
));
}
ChunkStartAndDuration::Frame(start_frame, n_frames_in_chunk) => {
let next_u64: u64 = next.try_into().unwrap();
let n_frames_in_chunk_u64: u64 = (*n_frames_in_chunk).try_into().unwrap();
let stop_dur = n_frames_in_chunk_u64 * next_u64;
stop_frame = Some(*start_frame + stop_dur);
}
}
self.next_chunk_index += 1;

let mut rows: Vec<KalmanEstimatesRow> = vec![];
let mut do_return_rows = false;
while let Some(Ok(peek_row)) = self.inner.peek() {
if let Some(ref this_timestamp) = &peek_row.timestamp {
if this_timestamp.as_f64() >= stop_time.as_f64() {
do_return_rows = true;
// done iterating
break;
match &self.source {
ChunkStartAndDuration::Timestamp(_start_stamp, _dur) => {
let stop_time = stop_time.as_ref().unwrap();

if let Some(ref this_timestamp) = &peek_row.timestamp {
if this_timestamp.as_f64() >= stop_time.as_f64() {
do_return_rows = true;
// done iterating
break;
}
} else {
// return Error - no timestamp on row
panic!("row {} has no timestamp", peek_row.frame);
}
}

ChunkStartAndDuration::Frame(_start_frame, _n_frames_in_chunk) => {
let stop_frame = stop_frame.as_ref().unwrap();
let this_frame = &peek_row.frame.0;
if this_frame >= stop_frame {
do_return_rows = true;
// done iterating
break;
}
}
} else {
// return Error - no timestamp on row
panic!("row {} has no timestamp", peek_row.frame);
}
do_return_rows = true;
rows.push(self.inner.next().unwrap().unwrap());
@@ -65,66 +100,32 @@ pub trait ToChunkIter<I>
where
I: Iterator<Item = KItem>,
{
fn to_chunk_iter(
self,
first_row: KalmanEstimatesRow,
dur: std::time::Duration,
) -> Result<ChunkIter<I>>;
fn to_chunk_iter(self, first_row: KalmanEstimatesRow, sz: ChunkSize) -> Result<ChunkIter<I>>;
}

impl<I> ToChunkIter<I> for I
where
I: Iterator<Item = KItem>,
{
fn to_chunk_iter(
self,
first_row: KalmanEstimatesRow,
dur: std::time::Duration,
) -> Result<ChunkIter<I>> {
let start_stamp = first_row
.timestamp
.as_ref()
.ok_or_else(|| anyhow::anyhow!("no timestamp in first row"))?;
let start_stamp = start_stamp.into();
fn to_chunk_iter(self, first_row: KalmanEstimatesRow, sz: ChunkSize) -> Result<ChunkIter<I>> {
let source = match sz {
ChunkSize::TimestampDuration(dur) => {
let start_stamp = first_row
.timestamp
.as_ref()
.ok_or_else(|| anyhow::anyhow!("no timestamp in first row"))?;
let start_stamp = start_stamp.into();
ChunkStartAndDuration::Timestamp(start_stamp, dur)
}
ChunkSize::FrameNumber(num_frames) => {
let start_frame = first_row.frame.0;
ChunkStartAndDuration::Frame(start_frame, num_frames)
}
};
Ok(ChunkIter {
start_stamp,
dur,
source,
next_chunk_index: 0,
inner: self.peekable(),
})
}
}

pub fn chunk_by_duration<'a>(
archive: &'a mut ZipDirArchive<BufReader<File>>,
dur: std::time::Duration,
) -> Result<
ChunkIter<
TerminateEarlyOnUnexpectedEof<
DeserializeRecordsIntoIter<MaybeGzReader<'a>, KalmanEstimatesRow>,
KalmanEstimatesRow,
>,
>,
> {
let mut first_row = None;
let src_fname = flydra_types::KALMAN_ESTIMATES_CSV_FNAME;

{
let rdr = archive.open_raw_or_gz(src_fname)?;
let kest_reader = csv::Reader::from_reader(rdr);

if let Some(row) = kest_reader.into_deserialize().early_eof_ok().next() {
let row = row?;
first_row = Some(row);
}
}
if let Some(first_row) = first_row {
let rdr = archive.open_raw_or_gz(src_fname)?;
let t1: csv::Reader<MaybeGzReader<'a>> = csv::Reader::from_reader(rdr);

let inner_iter = t1.into_deserialize().early_eof_ok();
Ok(ToChunkIter::to_chunk_iter(inner_iter, first_row, dur)?)
} else {
anyhow::bail!("no rows in {src_fname}");
}
}
5 changes: 4 additions & 1 deletion strand-braid-user/users-guide/src/braidz-files.md
@@ -139,7 +139,10 @@ import pandas as pd
braidz_fname = "20201104_174158.braidz"

# Open the braidz file and create chunks of 60 second durations.
estimates_chunker = pybraidz_chunked_iter.KalmanEstimatesChunker(braidz_fname, 60)
estimates_chunker = pybraidz_chunked_iter.chunk_on_duration(braidz_fname, 60)

# One could also create chunks with 100 frames of data.
# estimates_chunker = pybraidz_chunked_iter.chunk_on_num_frames(braidz_fname, 100)

# Iterate over each chunk
for chunk in estimates_chunker:
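
The loop body from the users guide is truncated by this diff view. Purely as a hypothetical illustration (the real chunk structure is not visible here), each chunk might be handed to pandas along these lines:

import pandas as pd

for chunk in estimates_chunker:
    # Hypothetical: assumes a chunk converts to a DataFrame; the guide's
    # actual loop body is not shown in this excerpt.
    df = pd.DataFrame(chunk)
    print(len(df), "rows in this chunk")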
