Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimenting towards allocator-free rendering #368

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ rubato = "0.14"
smallvec = "1.11"
symphonia = { version = "0.5", default-features = false }
vecmath = "1.0"
assert_no_alloc = { git = "https://github.com/Windfisch/rust-assert-no-alloc/", default-features = false, features = ["backtrace"] }

[dev-dependencies]
alloc_counter = "0.0.4"
env_logger = "0.10"
iai = "0.1.1"
rand = "0.8"
Expand Down
1 change: 1 addition & 0 deletions examples/audio_buffer_source_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ fn main() {
let now = context.current_time();
src.start_at(now);
src.stop_at(now + 1.);
drop(src);

std::thread::sleep(std::time::Duration::from_secs(4));
}
12 changes: 10 additions & 2 deletions src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ pub struct AudioBufferOptions {
///
#[derive(Clone, Debug)]
pub struct AudioBuffer {
pub(crate) channels: Vec<ChannelData>,
pub(crate) sample_rate: f32,
channels: Vec<ChannelData>,
sample_rate: f32,
}

impl AudioBuffer {
Expand All @@ -91,6 +91,14 @@ impl AudioBuffer {
}
}

/// Builds a placeholder `AudioBuffer` that has no channels and a sample rate of zero.
///
/// The result is not a valid buffer for playback. It exists so that a real buffer can be
/// moved out of a slot (e.g. with `std::mem::replace`) while leaving something behind.
/// Constructing it does not allocate, since an empty `Vec` holds no heap memory.
pub(crate) fn tombstone() -> Self {
    Self {
        channels: Vec::new(),
        sample_rate: 0.,
    }
}

/// Convert raw samples to an AudioBuffer
///
/// The outer Vec determines the channels. The inner Vecs should have the same length.
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
#![warn(clippy::clone_on_ref_ptr)]
#![deny(trivial_numeric_casts)]

// Installs `assert_no_alloc::AllocDisabler` as the process-wide allocator.
//
// NOTE(review): this lives in the library crate root, so it applies to every binary that
// links this crate. Rust allows only one `#[global_allocator]` in the whole crate graph, so
// a downstream crate that declares its own would fail to build. Presumably acceptable for
// this experiment only - TODO gate behind a feature or move to tests/examples before merging.
#[global_allocator]
static A: assert_no_alloc::AllocDisabler = assert_no_alloc::AllocDisabler;

use std::error::Error;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

Expand Down
46 changes: 26 additions & 20 deletions src/node/audio_buffer_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ impl AudioBufferSourceNode {
loop_state: loop_state.clone(),
render_state: AudioBufferRendererState::default(),
ended_triggered: false,
buffer_deallocator: Some(llq::Node::new(Box::new(AudioBuffer::tombstone()))),
};

let mut node = Self {
Expand Down Expand Up @@ -371,6 +372,7 @@ struct AudioBufferSourceRenderer {
loop_state: LoopState,
render_state: AudioBufferRendererState,
ended_triggered: bool,
buffer_deallocator: Option<llq::Node<Box<dyn Any + Send>>>,
}

impl AudioBufferSourceRenderer {
Expand All @@ -387,6 +389,26 @@ impl AudioBufferSourceRenderer {
ControlMessage::LoopEnd(loop_end) => self.loop_state.end = *loop_end,
}
}

/// Finishes playback: emits the `ended` event and hands the buffer off for disposal.
///
/// May be called repeatedly once the source has ended; only the first call has any effect.
fn on_end(&mut self, scope: &RenderScope) {
    // Set the flag and bail out if it was already set, so the work below runs only once.
    if std::mem::replace(&mut self.ended_triggered, true) {
        return;
    }

    // Let the control thread know that this source has finished.
    scope.send_ended_event();

    // Nothing left to dispose of when no buffer is held.
    let finished = match self.buffer.take() {
        Some(buffer) => buffer,
        None => return,
    };

    // The pre-built queue node wraps a tombstone AudioBuffer, which can be dropped
    // without deallocation when it is overwritten below.
    let mut carrier = self.buffer_deallocator.take().unwrap();
    // Swap the real buffer into the node in place of the tombstone.
    *carrier.downcast_mut().unwrap() = finished;
    // Ship the node away so the buffer is freed asynchronously, not in the render thread.
    scope.deallocate_async(carrier);
}
}

impl AudioProcessor for AudioBufferSourceRenderer {
Expand Down Expand Up @@ -453,13 +475,7 @@ impl AudioProcessor for AudioBufferSourceRenderer {
|| self.render_state.buffer_time_elapsed >= self.duration
{
output.make_silent(); // also converts to mono

// @note: we need this check because this is called until the program
// ends, such as if the node was never removed from the graph
if !self.ended_triggered {
scope.send_ended_event();
self.ended_triggered = true;
}
self.on_end(scope);
return false;
}

Expand All @@ -471,19 +487,13 @@ impl AudioProcessor for AudioBufferSourceRenderer {
if !is_looping {
if computed_playback_rate > 0. && buffer_time >= buffer_duration {
output.make_silent(); // also converts to mono
if !self.ended_triggered {
scope.send_ended_event();
self.ended_triggered = true;
}
self.on_end(scope);
return false;
}

if computed_playback_rate < 0. && buffer_time < 0. {
output.make_silent(); // also converts to mono
if !self.ended_triggered {
scope.send_ended_event();
self.ended_triggered = true;
}
self.on_end(scope);
return false;
}
}
Expand Down Expand Up @@ -764,11 +774,7 @@ impl AudioProcessor for AudioBufferSourceRenderer {
std::mem::swap(current_buffer, buffer);
} else {
// Creating the tombstone buffer does not cause allocations.
let tombstone_buffer = AudioBuffer {
channels: Default::default(),
sample_rate: Default::default(),
};
self.buffer = Some(std::mem::replace(buffer, tombstone_buffer));
self.buffer = Some(std::mem::replace(buffer, AudioBuffer::tombstone()));
}
return;
};
Expand Down
2 changes: 1 addition & 1 deletion src/param.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ struct AudioParamEventTimeline {
impl AudioParamEventTimeline {
fn new() -> Self {
Self {
inner: Vec::new(),
inner: Vec::with_capacity(5),
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a real solution to the problem

dirty: false,
}
}
Expand Down
60 changes: 60 additions & 0 deletions src/render/garbage_collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::cell::RefCell;
use std::rc::Rc;

use std::any::Any;
use std::time::Duration;

/// Producer half of the queue that carries type-erased values to the collector thread.
type AnyChannel = llq::Producer<Box<dyn Any + Send>>;

/// Handle for shipping values to a separate thread, where they are dropped.
///
/// A default-constructed handle has no thread attached; call
/// `spawn_garbage_collector_thread` to start one. Clones share the same queue producer.
#[derive(Clone, Default)]
pub(crate) struct GarbageCollector {
    /// `None` until the collector thread has been spawned.
    gc: Option<Rc<RefCell<AnyChannel>>>,
}

impl GarbageCollector {
    /// Pushes `value` onto the queue consumed by the collector thread.
    ///
    /// When no collector thread has been spawned, `value` is simply dropped when this
    /// function returns.
    pub fn deallocate_async(&self, value: llq::Node<Box<dyn Any + Send>>) {
        match &self.gc {
            Some(producer) => {
                producer.borrow_mut().push(value);
            }
            None => {}
        }
    }

    /// Starts the collector thread and wires up its queue, unless that already happened.
    pub fn spawn_garbage_collector_thread(&mut self) {
        if self.gc.is_some() {
            return;
        }

        let (producer, consumer) = llq::Queue::new().split();
        // Resolves to the free function of the same name, which owns the consumer half.
        spawn_garbage_collector_thread(consumer);
        self.gc = Some(Rc::new(RefCell::new(producer)));
    }
}

// Controls the polling frequency of the garbage collector thread: this is how long the
// thread sleeps after finding its queue empty before it checks again.
const GARBAGE_COLLECTOR_THREAD_TIMEOUT: Duration = Duration::from_millis(100);

// Poison pill that terminates the garbage collector thread: when the thread pops a node
// holding a value of this type, it leaves its loop and exits.
#[derive(Debug)]
pub(crate) struct TerminateGarbageCollectorThread;

// Launches the sidecar thread of the `RenderThread` whose job is dropping resources.
// The join handle is discarded, so the thread runs detached.
fn spawn_garbage_collector_thread(consumer: llq::Consumer<Box<dyn Any + Send>>) {
    let _detached = std::thread::spawn(move || {
        run_garbage_collector_thread(consumer)
    });
}

// Body of the garbage collector thread: pops nodes off the queue and drops them, sleeping
// whenever the queue is empty, until a `TerminateGarbageCollectorThread` value arrives.
fn run_garbage_collector_thread(mut consumer: llq::Consumer<Box<dyn Any + Send>>) {
    log::debug!("Entering garbage collector thread");
    loop {
        match consumer.pop() {
            // Queue is empty: wait a while before polling again.
            None => std::thread::sleep(GARBAGE_COLLECTOR_THREAD_TIMEOUT),
            Some(node) => {
                let is_poison_pill = node
                    .as_ref()
                    .downcast_ref::<TerminateGarbageCollectorThread>()
                    .is_some();
                if is_poison_pill {
                    log::info!("Terminating garbage collector thread");
                    break;
                }
                // `node` goes out of scope here, which drops the received value.
            }
        }
    }
    log::info!("Exiting garbage collector thread");
}
28 changes: 17 additions & 11 deletions src/render/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use super::{Alloc, AudioParamValues, AudioProcessor, AudioRenderQuantum, NodeCol
use crate::node::ChannelConfig;
use crate::render::RenderScope;

const INITIAL_GRAPH_SIZE: usize = 16;
const INITIAL_CHANNEL_DATA_COUNT: usize = INITIAL_GRAPH_SIZE * 4;
Copy link
Collaborator

@b-ma b-ma Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would not per se solve the allocation problem, but taking a very large value for INITIAL_GRAPH_SIZE could help (4096 or 8185), as vector growth seems to be exponential (cf. https://nnethercote.github.io/perf-book/heap-allocations.html?highlight=borrow#vec-growth)


/// Connection between two audio nodes
struct OutgoingEdge {
/// index of the current Nodes output port
Expand All @@ -27,9 +30,9 @@ pub struct Node {
/// Renderer: converts inputs to outputs
processor: Box<dyn AudioProcessor>,
/// Reusable input buffers
inputs: Vec<AudioRenderQuantum>,
inputs: SmallVec<[AudioRenderQuantum; 2]>,
/// Reusable output buffers, consumed by subsequent Nodes in this graph
outputs: Vec<AudioRenderQuantum>,
outputs: SmallVec<[AudioRenderQuantum; 2]>,
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are not real solutions to the deallocation problems of this struct

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, maybe I'm missing something but it seems to me that the only nodes that have multiple inputs / outputs are the ChannelSplitterNode and ChannelMergerNode which are clamped to MAX_CHANNELS, cf. https://webaudio.github.io/web-audio-api/#dom-baseaudiocontext-createchannelmerger, no?

Then maybe we could simply use an ArrayVec<AudioRenderQuantum, MAX_CHANNELS> to fix this?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True that, but of course we also allow users to implement their own AudioNodes which I think could use an unbounded number of inputs/outputs.

Scratch that, in the JS interface the https://webaudio.github.io/web-audio-api/#AudioWorkletNode is bound to 1 input and 1 output. So technically I could restrict the raw AudioNode trait and panic when the user supplies a 32+ input/output count.

We need to check performance though, ArrayVec<AudioRenderQuantum, MAX_CHANNELS> is a rather large object to have on the stack, and rust will probably memcpy it around a lot, so we may want to have it on the heap anyway. We can experiment with pre-allocating that Vec on the control thread

/// Channel configuration: determines up/down-mixing of inputs
channel_config: ChannelConfig,
/// Outgoing edges: tuple of outgoing node reference, our output index and their input index
Expand Down Expand Up @@ -97,14 +100,14 @@ pub(crate) struct Graph {
impl Graph {
pub fn new(reclaim_id_channel: llq::Producer<AudioNodeId>) -> Self {
Graph {
nodes: NodeCollection::new(),
alloc: Alloc::with_capacity(64),
nodes: NodeCollection::with_capacity(INITIAL_GRAPH_SIZE),
alloc: Alloc::with_capacity(INITIAL_CHANNEL_DATA_COUNT),
reclaim_id_channel,
ordered: vec![],
marked: vec![],
marked_temp: vec![],
in_cycle: vec![],
cycle_breakers: vec![],
ordered: Vec::with_capacity(INITIAL_GRAPH_SIZE),
marked: Vec::with_capacity(INITIAL_GRAPH_SIZE),
marked_temp: Vec::with_capacity(INITIAL_GRAPH_SIZE),
in_cycle: Vec::with_capacity(INITIAL_GRAPH_SIZE),
cycle_breakers: Vec::with_capacity(INITIAL_GRAPH_SIZE),
}
}

Expand All @@ -127,8 +130,8 @@ impl Graph {

// set input and output buffers to single channel of silence, will be upmixed when
// necessary
let inputs = vec![AudioRenderQuantum::from(self.alloc.silence()); number_of_inputs];
let outputs = vec![AudioRenderQuantum::from(self.alloc.silence()); number_of_outputs];
let inputs = smallvec![AudioRenderQuantum::from(self.alloc.silence()); number_of_inputs];
let outputs = smallvec![AudioRenderQuantum::from(self.alloc.silence()); number_of_outputs];

self.nodes.insert(
index,
Expand Down Expand Up @@ -487,6 +490,7 @@ impl Graph {
#[cfg(test)]
mod tests {
use super::*;
use crate::render::GarbageCollector;

#[derive(Debug, Clone)]
struct TestNode {
Expand Down Expand Up @@ -682,6 +686,7 @@ mod tests {
sample_rate: 48000.,
node_id: std::cell::Cell::new(AudioNodeId(0)),
event_sender: None,
garbage_collector: GarbageCollector::default(),
};
graph.render(&scope);

Expand Down Expand Up @@ -733,6 +738,7 @@ mod tests {
sample_rate: 48000.,
node_id: std::cell::Cell::new(AudioNodeId(0)),
event_sender: None,
garbage_collector: GarbageCollector::default(),
};
graph.render(&scope);

Expand Down
5 changes: 4 additions & 1 deletion src/render/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ pub(crate) use thread::*;
// public mods
mod processor;
pub use processor::*;

mod quantum;
pub use quantum::*;

mod node_collection;
pub(crate) use node_collection::NodeCollection;

pub use quantum::*;
mod garbage_collector;
pub(crate) use garbage_collector::{GarbageCollector, TerminateGarbageCollectorThread};
6 changes: 3 additions & 3 deletions src/render/node_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ pub(crate) struct NodeCollection {
}

impl NodeCollection {
pub fn new() -> Self {
pub fn with_capacity(capacity: usize) -> Self {
let mut instance = Self {
nodes: Vec::with_capacity(64),
nodes: Vec::with_capacity(capacity),
};
instance.ensure_capacity(64);
instance.ensure_capacity(capacity);
instance
}

Expand Down
7 changes: 6 additions & 1 deletion src/render/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::context::{AudioNodeId, AudioParamId};
use crate::events::{ErrorEvent, EventDispatch};
use crate::{Event, RENDER_QUANTUM_SIZE};

use super::{graph::Node, AudioRenderQuantum, NodeCollection};
use super::{graph::Node, AudioRenderQuantum, GarbageCollector, NodeCollection};

use crossbeam_channel::Sender;
use std::cell::Cell;
Expand All @@ -24,6 +24,7 @@ pub struct RenderScope {

pub(crate) node_id: Cell<AudioNodeId>,
pub(crate) event_sender: Option<Sender<EventDispatch>>,
pub(crate) garbage_collector: GarbageCollector,
}

impl RenderScope {
Expand Down Expand Up @@ -61,6 +62,10 @@ impl RenderScope {
let _ = sender.try_send(EventDispatch::processor_error(self.node_id.get(), event));
}
}

/// Forwards `value` to this scope's `GarbageCollector`, which takes over dropping it.
pub(crate) fn deallocate_async(&self, value: llq::Node<Box<dyn Any + Send>>) {
    self.garbage_collector.deallocate_async(value);
}
}

/// Interface for audio processing code that runs on the audio rendering thread.
Expand Down
6 changes: 3 additions & 3 deletions src/render/quantum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ mod tests {
let alloc = Alloc::with_capacity(2);
assert_eq!(alloc.pool_size(), 2);

alloc_counter::deny_alloc(|| {
assert_no_alloc::assert_no_alloc(|| {
{
// take a buffer out of the pool
let a = alloc.allocate();
Expand Down Expand Up @@ -712,7 +712,7 @@ mod tests {
let a = alloc.allocate();
let b = alloc.allocate();

let c = alloc_counter::allow_alloc(|| {
let c = assert_no_alloc::permit_alloc(|| {
// we can allocate beyond the pool size
let c = alloc.allocate();
assert_eq!(alloc.pool_size(), 0);
Expand All @@ -728,7 +728,7 @@ mod tests {
};

// dropping c will cause a re-allocation: the pool capacity is extended
alloc_counter::allow_alloc(move || {
assert_no_alloc::permit_alloc(move || {
std::mem::drop(c);
});

Expand Down
Loading
Loading