Skip to content

Commit

Permalink
Name scope threads properly
Browse files Browse the repository at this point in the history
  • Loading branch information
romanz committed Jan 1, 2025
1 parent ed6f38f commit 2543e76
Showing 1 changed file with 28 additions and 22 deletions.
50 changes: 28 additions & 22 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,30 +195,36 @@ impl Index {

let chunks = new_headers.chunks(self.batch_size);
let index = &self; // to be moved into reader thread
let reader = scope.spawn(move || -> Result<()> {
for chunk in chunks {
exit_flag.poll().with_context(|| {
format!(
"indexing interrupted at height: {}",
chunk.first().unwrap().height()
)
})?;
let batch = index.index_blocks(daemon, chunk)?;
tx.send(batch).context("writer disconnected")?;
}
Ok(()) // `tx` is dropped, to stop the iteration on `rx`
});
let reader = thread::Builder::new()
.name("index_build".into())
.spawn_scoped(scope, move || -> Result<()> {
for chunk in chunks {
exit_flag.poll().with_context(|| {
format!(
"indexing interrupted at height: {}",
chunk.first().unwrap().height()
)
})?;
let batch = index.index_blocks(daemon, chunk)?;
tx.send(batch).context("writer disconnected")?;
}
Ok(()) // `tx` is dropped, to stop the iteration on `rx`
})
.expect("spawn failed");

let index = &self; // to be moved into writer thread
let writer = scope.spawn(move || {
let stats = &index.stats;
for mut batch in rx {
stats.observe_duration("sort", || batch.sort()); // pre-sort to optimize DB writes
stats.observe_batch(&batch);
stats.observe_duration("write", || index.store.write(&batch));
stats.observe_db(&index.store);
}
});
let writer = thread::Builder::new()
.name("index_write".into())
.spawn_scoped(scope, move || {
let stats = &index.stats;
for mut batch in rx {
stats.observe_duration("sort", || batch.sort()); // pre-sort to optimize DB writes
stats.observe_batch(&batch);
stats.observe_duration("write", || index.store.write(&batch));
stats.observe_db(&index.store);
}
})
.expect("spawn failed");

reader.join().expect("reader thread panic")?;
writer.join().expect("writer thread panic");
Expand Down

0 comments on commit 2543e76

Please sign in to comment.