Skip to content

Commit

Permalink
Separate reading & writing into scoped threads
Browse files Browse the repository at this point in the history
If the index is on a SSD, sync perfomance should improve.
  • Loading branch information
romanz committed Dec 26, 2024
1 parent da1d38a commit 9338d34
Showing 1 changed file with 38 additions and 16 deletions.
54 changes: 38 additions & 16 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use bitcoin::hashes::Hash;
use bitcoin::{BlockHash, OutPoint, Txid};
use bitcoin_slices::{bsl, Visit, Visitor};
use std::ops::ControlFlow;
use std::thread;

use crate::{
chain::{Chain, NewHeader},
Expand Down Expand Up @@ -188,22 +189,48 @@ impl Index {
return Ok(true); // no more blocks to index (done for now)
}
}
for chunk in new_headers.chunks(self.batch_size) {
exit_flag.poll().with_context(|| {
format!(
"indexing interrupted at height: {}",
chunk.first().unwrap().height()
)
})?;
self.sync_blocks(daemon, chunk)?;
}

thread::scope(|s| -> Result<()> {
let (tx, rx) = crossbeam_channel::bounded(1);

let chunks = new_headers.chunks(self.batch_size);
let index = &self;
let reader = s.spawn(move || -> Result<()> {
for chunk in chunks {
exit_flag.poll().with_context(|| {
format!(
"indexing interrupted at height: {}",
chunk.first().unwrap().height()
)
})?;
let batch = index.index_blocks(daemon, chunk)?;
tx.send(batch).context("writer disconnected")?;
}
Ok(())
});

let index = &self;
let writer = s.spawn(move || {
let stats = &index.stats;
for mut batch in rx {
stats.observe_duration("sort", || batch.sort());
stats.observe_batch(&batch);
stats.observe_duration("write", || index.store.write(&batch));
stats.observe_db(&index.store);
}
});

reader.join().expect("reader thread panic")?;
writer.join().expect("writer thread panic");
Ok(())
})?;
self.chain.update(new_headers);
self.stats.observe_chain(&self.chain);
self.flush_needed = true;
Ok(false) // sync is not done
}

fn sync_blocks(&mut self, daemon: &Daemon, chunk: &[NewHeader]) -> Result<()> {
fn index_blocks(&self, daemon: &Daemon, chunk: &[NewHeader]) -> Result<WriteBatch> {
let blockhashes: Vec<BlockHash> = chunk.iter().map(|h| h.hash()).collect();
let mut heights = chunk.iter().map(|h| h.height());

Expand All @@ -222,12 +249,7 @@ impl Index {
"some blocks were not indexed: {:?}",
heights
);
batch.sort();
self.stats.observe_batch(&batch);
self.stats
.observe_duration("write", || self.store.write(&batch));
self.stats.observe_db(&self.store);
Ok(())
Ok(batch)
}

pub(crate) fn is_ready(&self) -> bool {
Expand Down

0 comments on commit 9338d34

Please sign in to comment.