From 46f2efe61596dee0e2c4fd3a6bebf8fc73c4159c Mon Sep 17 00:00:00 2001 From: satan Date: Fri, 29 Nov 2024 11:25:29 +0100 Subject: [PATCH 1/8] Made applying scheduled migrations atomic wrt commit block --- crates/node/src/shell/mod.rs | 27 +++++++++++++++++++++------ crates/sdk/src/migrations.rs | 17 +++-------------- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/crates/node/src/shell/mod.rs b/crates/node/src/shell/mod.rs index ffb176ff1a..cef20338b9 100644 --- a/crates/node/src/shell/mod.rs +++ b/crates/node/src/shell/mod.rs @@ -781,16 +781,31 @@ where /// hash. pub fn commit(&mut self) -> shim::Response { self.bump_last_processed_eth_block(); + let committed_height = self.state.in_mem().get_last_block_height(); + + let migration = match self.scheduled_migration.as_ref() { + Some(migration) if committed_height == migration.height => Some( + self.scheduled_migration + .take() + .unwrap() + .load() + .expect("The scheduled migration is not valid."), + ), + _ => None, + }; self.state .commit_block() .expect("Encountered a storage error while committing a block"); - let committed_height = self.state.in_mem().get_last_block_height(); - migrations::commit( - self.state.db(), - committed_height, - &mut self.scheduled_migration, - ); + + if let Some(migration) = migration { + migrations::commit(self.state.db(), migration); + self.state.commit_block().expect( + "Encountered a storage error while persisting changes \ + post-migration", + ); + } + let merkle_root = self.state.in_mem().merkle_root(); tracing::info!( diff --git a/crates/sdk/src/migrations.rs b/crates/sdk/src/migrations.rs index 062670e0d7..30dd222423 100644 --- a/crates/sdk/src/migrations.rs +++ b/crates/sdk/src/migrations.rs @@ -380,7 +380,7 @@ where Ok(scheduled_migration) } - fn load(&self) -> eyre::Result> { + pub fn load(&self) -> eyre::Result> { let update_json = self.validate()?; serde_json::from_str(&update_json) .map_err(|_| eyre!("Could not parse the updates file as json")) @@ -493,21 +493,10 @@ impl Display for UpdateStatus { /// Check if a scheduled migration should take place at this block height. /// If so, apply it to the DB. -pub fn commit( - db: &D, - height: BlockHeight, - migration: &mut Option>, -) where +pub fn commit(db: &D, migration: impl IntoIterator) +where D::Migrator: DeserializeOwned, { - let maybe_migration = migration; - let migration = match maybe_migration.as_ref() { - Some(migration) if height == migration.height => { - maybe_migration.take().unwrap().load().unwrap() - } - _ => return, - }; - tracing::info!( "A migration is scheduled to take place at this block height. \ Starting..." 
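For reference, the migrations file consumed here is the JSON serialization of
a `DbChanges` value, scheduled together with the SHA-256 hash of the file and
the block height at which to apply it. A minimal sketch against the
post-series API, mirroring the `test_scheduled_migration_validate` test
further below (the key, amount, file name, and exact import paths are
illustrative assumptions, not part of the patches):

    use namada_core::chain::BlockHeight;
    use namada_core::hash::Hash;
    use namada_core::storage;
    use namada_core::token::Amount;
    use namada_sdk::migrations::{DbChanges, DbUpdateType, ScheduledMigration};
    use namada_storage::DbColFam;

    fn schedule_example_migration() -> eyre::Result<ScheduledMigration> {
        // A single write of an `Amount` under a subspace key; with
        // `force: false`, the value already in storage must type-check
        let changes = DbChanges {
            changes: vec![DbUpdateType::Add {
                key: storage::Key::parse("example/key").expect("valid key"),
                cf: DbColFam::SUBSPACE,
                value: Amount::native_whole(1337).into(),
                force: false,
            }],
        };
        let json = serde_json::to_string(&changes)?;
        // The file's hash is verified when the migration is scheduled and
        // again when it is loaded at the scheduled block height
        let hash = Hash::sha256(json.as_bytes());
        std::fs::write("migration_example.json", &json)?;
        ScheduledMigration::from_path(
            "migration_example.json",
            hash,
            BlockHeight(6),
        )
    }
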
From 1497e3bb68e42c909fed489c912a08cb5cfd2b30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Zemanovi=C4=8D?= Date: Mon, 2 Dec 2024 15:09:13 +0000 Subject: [PATCH 2/8] node: ensure that migration can be decoded on start-up --- crates/node/src/shell/mod.rs | 2 +- crates/sdk/src/migrations.rs | 22 +++++++++------------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/crates/node/src/shell/mod.rs b/crates/node/src/shell/mod.rs index cef20338b9..024f2b6f06 100644 --- a/crates/node/src/shell/mod.rs +++ b/crates/node/src/shell/mod.rs @@ -788,7 +788,7 @@ where self.scheduled_migration .take() .unwrap() - .load() + .load_and_validate() .expect("The scheduled migration is not valid."), ), _ => None, diff --git a/crates/sdk/src/migrations.rs b/crates/sdk/src/migrations.rs index 30dd222423..bf1d1d116c 100644 --- a/crates/sdk/src/migrations.rs +++ b/crates/sdk/src/migrations.rs @@ -376,26 +376,22 @@ where hash, phantom: Default::default(), }; - scheduled_migration.validate()?; + scheduled_migration.load_and_validate()?; Ok(scheduled_migration) } - pub fn load(&self) -> eyre::Result> { - let update_json = self.validate()?; - serde_json::from_str(&update_json) + pub fn load_and_validate(&self) -> eyre::Result> { + let update_json = self.load_bytes_and_validate()?; + serde_json::from_slice(&update_json) .map_err(|_| eyre!("Could not parse the updates file as json")) } - fn validate(&self) -> eyre::Result { - let update_json = - std::fs::read_to_string(&self.path).map_err(|_| { - eyre!( - "Could not find or read updates file at the specified \ - path." - ) - })?; + fn load_bytes_and_validate(&self) -> eyre::Result> { + let update_json = std::fs::read(&self.path).map_err(|_| { + eyre!("Could not find or read updates file at the specified path.") + })?; // validate contents against provided hash - if Hash::sha256(update_json.as_bytes()) != self.hash { + if Hash::sha256(&update_json) != self.hash { Err(eyre!( "Provided hash did not match the contents at the specified \ path." From 407ee7296af8d1d02fde6341abbbc3daeb020d9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Zemanovi=C4=8D?= Date: Mon, 2 Dec 2024 17:02:30 +0000 Subject: [PATCH 3/8] test/e2e/migration: ensure that node can be restarted --- .github/workflows/scripts/e2e.json | 4 ++-- crates/tests/src/e2e/ledger_tests.rs | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/.github/workflows/scripts/e2e.json b/.github/workflows/scripts/e2e.json index 51ac256df2..de3d28724d 100644 --- a/.github/workflows/scripts/e2e.json +++ b/.github/workflows/scripts/e2e.json @@ -17,7 +17,7 @@ "e2e::ledger_tests::run_ledger": 5, "e2e::ledger_tests::run_ledger_load_state_and_reset": 23, "e2e::ledger_tests::test_namada_shuts_down_if_tendermint_dies": 2, - "e2e::ledger_tests::test_namada_db_migration": 30, + "e2e::ledger_tests::test_namada_db_migration": 80, "e2e::ledger_tests::test_genesis_validators": 14, "e2e::ledger_tests::test_node_connectivity_and_consensus": 28, "e2e::ledger_tests::test_epoch_sleep": 12, @@ -34,4 +34,4 @@ "e2e::ledger_tests::masp_txs_and_queries": 82, "e2e::ledger_tests::test_genesis_chain_id_change": 35, "e2e::ledger_tests::test_genesis_manipulation": 103 -} +} \ No newline at end of file diff --git a/crates/tests/src/e2e/ledger_tests.rs b/crates/tests/src/e2e/ledger_tests.rs index cdccd01470..4906f282b1 100644 --- a/crates/tests/src/e2e/ledger_tests.rs +++ b/crates/tests/src/e2e/ledger_tests.rs @@ -369,16 +369,16 @@ fn test_db_migration() -> Result<()> { None, ); - // 1. 
Run the ledger node, halting at height 2 + // 1. Run the ledger node, halting at height 6 let mut ledger = run_as!( test, Who::Validator(0), Bin::Node, - &["ledger", "run-until", "--block-height", "2", "--halt",], + &["ledger", "run-until", "--block-height", "6", "--halt",], Some(40) )?; // Wait to commit a block - ledger.exp_string("Reached block height 2, halting the chain.")?; + ledger.exp_string("Reached block height 6, halting the chain.")?; ledger.exp_string(LEDGER_SHUTDOWN)?; ledger.exp_eof()?; drop(ledger); @@ -396,15 +396,16 @@ fn test_db_migration() -> Result<()> { "--path", migrations_json_path.to_string_lossy().as_ref(), "--block-height", - "2", + "6", ], Some(30), )?; - session.exp_eof()?; + session.assert_success(); let mut ledger = run_as!(test, Who::Validator(0), Bin::Node, &["ledger"], Some(40))?; ledger.exp_regex(r"Committed block hash.*, height: [0-9]+")?; + // 5. Check that a key was changed successfully let mut query = run_as!( test, From ed2addcb9487e3c6148f31ef8116dcba89ddb3cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Zemanovi=C4=8D?= Date: Thu, 5 Dec 2024 18:33:55 +0000 Subject: [PATCH 4/8] rewrite migrations --- crates/apps/src/bin/namada-node/cli.rs | 38 +---- crates/apps_lib/src/cli.rs | 62 -------- crates/node/src/lib.rs | 40 +---- crates/node/src/shell/mod.rs | 12 +- crates/node/src/storage/rocksdb.rs | 117 ++++++-------- crates/sdk/src/migrations.rs | 206 +++++++++++++++++-------- crates/storage/src/db.rs | 51 ++++-- crates/storage/src/mockdb.rs | 44 ++++++ crates/tests/src/e2e/ledger_tests.rs | 17 +- 9 files changed, 284 insertions(+), 303 deletions(-) diff --git a/crates/apps/src/bin/namada-node/cli.rs b/crates/apps/src/bin/namada-node/cli.rs index 884380cdd9..a13657fcac 100644 --- a/crates/apps/src/bin/namada-node/cli.rs +++ b/crates/apps/src/bin/namada-node/cli.rs @@ -3,9 +3,7 @@ use eyre::{Context, Result}; use namada_apps_lib::cli::cmds::TestGenesis; use namada_apps_lib::cli::{self, cmds}; -use namada_apps_lib::config::{ - Action, ActionAtHeight, NodeLocalConfig, ValidatorLocalConfig, -}; +use namada_apps_lib::config::{NodeLocalConfig, ValidatorLocalConfig}; #[cfg(not(feature = "migrations"))] use namada_apps_lib::display_line; use namada_apps_lib::migrations::ScheduledMigration; @@ -58,40 +56,6 @@ pub fn main() -> Result<()> { node::rollback(chain_ctx.config.ledger) .wrap_err("Failed to rollback the Namada node")?; } - cmds::Ledger::UpdateDB(cmds::LedgerUpdateDB(args)) => { - #[cfg(not(feature = "migrations"))] - { - panic!( - "This command is only available if built with the \ - \"migrations\" feature." 
- ) - } - let mut chain_ctx = ctx.take_chain_or_exit(); - #[cfg(feature = "migrations")] - node::update_db_keys( - chain_ctx.config.ledger.clone(), - args.updates, - args.dry_run, - ); - if !args.dry_run { - let wasm_dir = chain_ctx.wasm_dir(); - chain_ctx.config.ledger.shell.action_at_height = - Some(ActionAtHeight { - height: args.last_height.checked_add(2).unwrap(), - action: Action::Halt, - }); - std::env::set_var( - "NAMADA_INITIAL_HEIGHT", - args.last_height.to_string(), - ); - // don't stop on panics - let handle = std::thread::spawn(|| { - node::run(chain_ctx.config.ledger, wasm_dir, None) - }); - _ = handle.join(); - std::env::remove_var("NAMADA_INITIAL_HEIGHT"); - } - } cmds::Ledger::QueryDB(cmds::LedgerQueryDB(args)) => { #[cfg(not(feature = "migrations"))] { diff --git a/crates/apps_lib/src/cli.rs b/crates/apps_lib/src/cli.rs index 36e0de0d7c..86ba075f3d 100644 --- a/crates/apps_lib/src/cli.rs +++ b/crates/apps_lib/src/cli.rs @@ -893,7 +893,6 @@ pub mod cmds { RunUntil(LedgerRunUntil), Reset(LedgerReset), DumpDb(LedgerDumpDb), - UpdateDB(LedgerUpdateDB), QueryDB(LedgerQueryDB), RollBack(LedgerRollBack), } @@ -906,13 +905,11 @@ pub mod cmds { let run = SubCmd::parse(matches).map(Self::Run); let reset = SubCmd::parse(matches).map(Self::Reset); let dump_db = SubCmd::parse(matches).map(Self::DumpDb); - let update_db = SubCmd::parse(matches).map(Self::UpdateDB); let query_db = SubCmd::parse(matches).map(Self::QueryDB); let rollback = SubCmd::parse(matches).map(Self::RollBack); let run_until = SubCmd::parse(matches).map(Self::RunUntil); run.or(reset) .or(dump_db) - .or(update_db) .or(query_db) .or(rollback) .or(run_until) @@ -936,7 +933,6 @@ pub mod cmds { .subcommand(LedgerRunUntil::def()) .subcommand(LedgerReset::def()) .subcommand(LedgerDumpDb::def()) - .subcommand(LedgerUpdateDB::def()) .subcommand(LedgerQueryDB::def()) .subcommand(LedgerRollBack::def()) } @@ -1022,29 +1018,6 @@ pub mod cmds { } } - #[derive(Clone, Debug)] - pub struct LedgerUpdateDB(pub args::LedgerUpdateDb); - - impl SubCmd for LedgerUpdateDB { - const CMD: &'static str = "update-db"; - - fn parse(matches: &ArgMatches) -> Option { - matches - .subcommand_matches(Self::CMD) - .map(|matches| Self(args::LedgerUpdateDb::parse(matches))) - } - - fn def() -> App { - App::new(Self::CMD) - .about(wrap!( - "Applies a set of updates to the DB for hard forking. The \ - input should be a path to a file dictating a set of keys \ - and their new values. Can be dry-run for testing." - )) - .add_args::() - } - } - #[derive(Clone, Debug)] pub struct LedgerQueryDB(pub args::LedgerQueryDb); @@ -3901,41 +3874,6 @@ pub mod args { } } - #[derive(Clone, Debug)] - pub struct LedgerUpdateDb { - pub updates: PathBuf, - pub dry_run: bool, - pub last_height: BlockHeight, - } - - impl Args for LedgerUpdateDb { - fn parse(matches: &ArgMatches) -> Self { - let updates = PATH.parse(matches); - let dry_run = DRY_RUN_TX.parse(matches); - let last_height = BLOCK_HEIGHT.parse(matches); - Self { - updates, - dry_run, - last_height, - } - } - - fn def(app: App) -> App { - app.arg(PATH.def().help(wrap!( - "The path to a json of key-value pairs to update the DB with." - ))) - .arg(DRY_RUN_TX.def().help(wrap!( - "If set, applies the updates but does not persist them. Using \ - for testing and debugging." - ))) - .arg( - BLOCK_HEIGHT.def().help(wrap!( - "The height at which the hard fork is happening." 
- )), - ) - } - } - #[derive(Clone, Debug)] pub struct LedgerQueryDb { pub key: storage::Key, diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index ad827f6d3d..65db933767 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -354,13 +354,14 @@ pub fn query_db( type_hash: &[u8; 32], cf: &DbColFam, ) { - use namada_sdk::migrations::DBUpdateVisitor; + use namada_apps_lib::storage::DBUpdateVisitor; + let chain_id = config.chain_id; let db_path = config.shell.db_dir(&chain_id); let db = storage::PersistentDB::open(db_path, None); - let db_visitor = storage::RocksDBUpdateVisitor::new(&db); - let bytes = db_visitor.read(key, cf).unwrap(); + let db_visitor = storage::RocksDBUpdateVisitor::default(); + let bytes = db_visitor.read(&db, key, cf).unwrap(); let deserializer = namada_migrations::get_deserializer(type_hash) .unwrap_or_else(|| { @@ -382,39 +383,6 @@ pub fn query_db( ); } -/// Change the funds of an account in-place. Use with -/// caution, as this modifies state in storage without -/// going through the consensus protocol. -#[cfg(feature = "migrations")] -pub fn update_db_keys(config: config::Ledger, updates: PathBuf, dry_run: bool) { - use std::io::Read; - - let mut update_json = String::new(); - let mut file = std::fs::File::open(updates) - .expect("Could not fine updates file at the specified path."); - file.read_to_string(&mut update_json) - .expect("Unable to read the updates json file"); - let updates: namada_sdk::migrations::DbChanges = - serde_json::from_str(&update_json) - .expect("Could not parse the updates file as json"); - let cometbft_path = config.cometbft_dir(); - let chain_id = config.chain_id; - let db_path = config.shell.db_dir(&chain_id); - - let db = storage::PersistentDB::open(db_path, None); - let batch = db.apply_migration_to_batch(updates.changes).unwrap(); - if !dry_run { - tracing::info!("Persisting DB changes..."); - db.exec_batch(batch).expect("Failed to execute write batch"); - db.flush(true).expect("Failed to flush data to disk"); - - // reset CometBFT's state, such that we can resume with a different appq - // hash - tendermint_node::reset_state(cometbft_path) - .expect("Failed to reset CometBFT state"); - } -} - /// Roll Namada state back to the previous height pub fn rollback(config: config::Ledger) -> Result<(), shell::Error> { shell::rollback(config) diff --git a/crates/node/src/shell/mod.rs b/crates/node/src/shell/mod.rs index 024f2b6f06..49fd58b777 100644 --- a/crates/node/src/shell/mod.rs +++ b/crates/node/src/shell/mod.rs @@ -378,7 +378,7 @@ where /// Log of events emitted by `FinalizeBlock` ABCI calls. event_log: EventLog, /// A migration that can be scheduled at a given block height - pub scheduled_migration: Option>, + pub scheduled_migration: Option, /// When set, indicates after how many blocks a new snapshot /// will be taken (counting from the first block) pub blocks_between_snapshots: Option, @@ -477,7 +477,7 @@ where broadcast_sender: UnboundedSender>, eth_oracle: Option, db_cache: Option<&D::Cache>, - scheduled_migration: Option>, + scheduled_migration: Option, vp_wasm_compilation_cache: u64, tx_wasm_compilation_cache: u64, ) -> Self { @@ -781,10 +781,10 @@ where /// hash. 
pub fn commit(&mut self) -> shim::Response { self.bump_last_processed_eth_block(); - let committed_height = self.state.in_mem().get_last_block_height(); + let height_to_commit = self.state.in_mem().block.height; let migration = match self.scheduled_migration.as_ref() { - Some(migration) if committed_height == migration.height => Some( + Some(migration) if height_to_commit == migration.height => Some( self.scheduled_migration .take() .unwrap() @@ -799,7 +799,7 @@ where .expect("Encountered a storage error while committing a block"); if let Some(migration) = migration { - migrations::commit(self.state.db(), migration); + migrations::commit(&mut self.state, migration); self.state.commit_block().expect( "Encountered a storage error while persisting changes \ post-migration", @@ -809,7 +809,7 @@ where let merkle_root = self.state.in_mem().merkle_root(); tracing::info!( - "Committed block hash: {merkle_root}, height: {committed_height}", + "Committed block hash: {merkle_root}, height: {height_to_commit}", ); self.broadcast_queued_txs(); diff --git a/crates/node/src/storage/rocksdb.rs b/crates/node/src/storage/rocksdb.rs index ac4717ae44..9d74e63e41 100644 --- a/crates/node/src/storage/rocksdb.rs +++ b/crates/node/src/storage/rocksdb.rs @@ -62,7 +62,6 @@ use namada_sdk::eth_bridge::storage::bridge_pool; use namada_sdk::eth_bridge::storage::proof::BridgePoolRootProof; use namada_sdk::gas::Gas; use namada_sdk::hash::Hash; -use namada_sdk::migrations::{DBUpdateVisitor, DbUpdateType}; use namada_sdk::state::merkle_tree::{ tree_key_prefix_with_epoch, tree_key_prefix_with_height, }; @@ -72,8 +71,9 @@ use namada_sdk::state::{ StoreType, DB, }; use namada_sdk::storage::{ - BlockHeader, BlockHeight, DbColFam, Epoch, Key, KeySeg, BLOCK_CF, DIFFS_CF, - REPLAY_PROTECTION_CF, ROLLBACK_CF, STATE_CF, SUBSPACE_CF, + BlockHeader, BlockHeight, DBUpdateVisitor, DbColFam, Epoch, Key, KeySeg, + BLOCK_CF, DIFFS_CF, REPLAY_PROTECTION_CF, ROLLBACK_CF, STATE_CF, + SUBSPACE_CF, }; use namada_sdk::{decode, encode, ethereum_events}; use rayon::prelude::*; @@ -85,7 +85,6 @@ use rocksdb::{ }; use crate::config::utils::num_of_threads; -use crate::storage; // TODO the DB schema will probably need some kind of versioning @@ -1077,7 +1076,7 @@ impl DbSnapshot { impl DB for RocksDB { type Cache = rocksdb::Cache; - type Migrator = DbUpdateType; + type Migrator = RocksDBUpdateVisitor; type RestoreSource<'a> = (&'a rocksdb::Cache, &'a mut std::fs::File); type WriteBatch = RocksDBWriteBatch; @@ -1817,100 +1816,65 @@ impl DB for RocksDB { Ok(()) } - #[inline] - fn apply_migration_to_batch( - &self, - updates: impl IntoIterator, - ) -> Result { - let mut db_visitor = storage::RocksDBUpdateVisitor::new(self); - for change in updates.into_iter() { - match change.update(&mut db_visitor) { - Ok(status) => { - tracing::info!("{}", status); - } - Err(e) => { - let error = format!( - "Attempt to write to key/pattern <{}> failed:\n{}.", - change.pattern(), - e - ); - tracing::error!(error); - return Err(Error::DBError(error)); - } - } - } - Ok(db_visitor.take_batch()) + fn migrator() -> Self::Migrator { + RocksDBUpdateVisitor::default() } } /// A struct that can visit a set of updates, /// registering them all in the batch -pub struct RocksDBUpdateVisitor<'db> { - db: &'db RocksDB, +#[derive(Default)] +pub struct RocksDBUpdateVisitor { batch: RocksDBWriteBatch, } -impl<'db> RocksDBUpdateVisitor<'db> { - pub fn new(db: &'db RocksDB) -> Self { - Self { - db, - batch: Default::default(), - } - } +impl DBUpdateVisitor for RocksDBUpdateVisitor { + 
type DB = RocksDB; - pub fn take_batch(self) -> RocksDBWriteBatch { - self.batch - } -} - -impl<'db> DBUpdateVisitor for RocksDBUpdateVisitor<'db> { - fn read(&self, key: &Key, cf: &DbColFam) -> Option> { + fn read(&self, db: &Self::DB, key: &Key, cf: &DbColFam) -> Option> { match cf { - DbColFam::SUBSPACE => self - .db + DbColFam::SUBSPACE => db .read_subspace_val(key) .expect("Failed to read from storage"), _ => { let cf_str = cf.to_str(); - let cf = self - .db + let cf = db .get_column_family(cf_str) .expect("Failed to read column family from storage"); - self.db - .read_value_bytes(cf, key.to_string()) + db.read_value_bytes(cf, key.to_string()) .expect("Failed to get key from storage") } } } - fn write(&mut self, key: &Key, cf: &DbColFam, value: impl AsRef<[u8]>) { - self.db - .overwrite_entry(&mut self.batch, cf, key, value) + fn write( + &mut self, + db: &Self::DB, + key: &Key, + cf: &DbColFam, + value: impl AsRef<[u8]>, + ) { + db.overwrite_entry(&mut self.batch, cf, key, value) .expect("Failed to overwrite a key in storage") } - fn delete(&mut self, key: &Key, cf: &DbColFam) { - let state_cf = self.db.get_column_family(STATE_CF).unwrap(); - let last_height: BlockHeight = self - .db - .read_value(state_cf, BLOCK_HEIGHT_KEY) - .unwrap() - .unwrap(); + fn delete(&mut self, db: &Self::DB, key: &Key, cf: &DbColFam) { + let state_cf = db.get_column_family(STATE_CF).unwrap(); + let last_height: BlockHeight = + db.read_value(state_cf, BLOCK_HEIGHT_KEY).unwrap().unwrap(); match cf { DbColFam::SUBSPACE => { - self.db - .batch_delete_subspace_val( - &mut self.batch, - last_height, - key, - true, - ) - .expect("Failed to delete key from storage"); + db.batch_delete_subspace_val( + &mut self.batch, + last_height, + key, + true, + ) + .expect("Failed to delete key from storage"); } _ => { let cf_str = cf.to_str(); - let cf = self - .db + let cf = db .get_column_family(cf_str) .expect("Failed to get read column family from storage"); self.batch.0.delete_cf(cf, key.to_string()); @@ -1918,12 +1882,19 @@ impl<'db> DBUpdateVisitor for RocksDBUpdateVisitor<'db> { }; } - fn get_pattern(&self, pattern: Regex) -> Vec<(String, Vec)> { - self.db - .iter_pattern(None, pattern) + fn get_pattern( + &self, + db: &Self::DB, + pattern: Regex, + ) -> Vec<(String, Vec)> { + db.iter_pattern(None, pattern) .map(|(k, v, _)| (k, v)) .collect() } + + fn commit(self, db: &Self::DB) -> Result<()> { + db.exec_batch(self.batch) + } } impl<'iter> DBIter<'iter> for RocksDB { diff --git a/crates/sdk/src/migrations.rs b/crates/sdk/src/migrations.rs index bf1d1d116c..2adf73980d 100644 --- a/crates/sdk/src/migrations.rs +++ b/crates/sdk/src/migrations.rs @@ -2,7 +2,6 @@ use core::fmt::{Display, Formatter}; use core::str::FromStr; -use std::marker::PhantomData; use std::path::{Path, PathBuf}; use borsh::{BorshDeserialize, BorshSerialize}; @@ -11,24 +10,19 @@ use data_encoding::HEXUPPER; use eyre::eyre; use namada_core::chain::BlockHeight; use namada_core::hash::Hash; -use namada_core::storage::Key; +use namada_core::storage; use namada_macros::{derive_borshdeserializer, typehash}; use namada_migrations::{TypeHash, *}; -use namada_storage::{DbColFam, DbMigration, DB}; +use namada_state::merkle_tree::NO_DIFF_KEY_PREFIX; +use namada_state::{DBIter, FullAccessState, KeySeg, StorageHasher}; +use namada_storage::{DBUpdateVisitor, DbColFam, DB}; use regex::Regex; -use serde::de::{DeserializeOwned, Error, Visitor}; +use serde::de::{Error, Visitor}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; /// The maximum number of 
character printed per value. const PRINTLN_CUTOFF: usize = 300; -pub trait DBUpdateVisitor { - fn read(&self, key: &Key, cf: &DbColFam) -> Option>; - fn write(&mut self, key: &Key, cf: &DbColFam, value: impl AsRef<[u8]>); - fn delete(&mut self, key: &Key, cf: &DbColFam); - fn get_pattern(&self, pattern: Regex) -> Vec<(String, Vec)>; -} - #[derive(Debug, Clone, BorshSerialize, BorshDeserialize)] enum UpdateBytes { Raw { @@ -167,12 +161,12 @@ impl<'de> Deserialize<'de> for UpdateValue { /// An update to the database pub enum DbUpdateType { Add { - key: Key, + key: storage::Key, cf: DbColFam, value: UpdateValue, force: bool, }, - Delete(Key, DbColFam), + Delete(storage::Key, DbColFam), RepeatAdd { pattern: String, cf: DbColFam, @@ -182,8 +176,6 @@ pub enum DbUpdateType { RepeatDelete(String, DbColFam), } -impl DbMigration for DbUpdateType {} - impl DbUpdateType { /// Get the key or pattern being modified as string pub fn pattern(&self) -> String { @@ -273,15 +265,20 @@ impl DbUpdateType { /// Validate a DB change and persist it if so. The debug representation of /// the new value is returned for logging purposes. - pub fn update( + pub fn update( &self, - db: &mut DB, - ) -> eyre::Result { - match self { + state: &mut FullAccessState, + ) -> eyre::Result + where + D: 'static + DB + for<'iter> DBIter<'iter>, + H: 'static + StorageHasher, + { + let mut migrator = D::migrator(); + let status = match self { Self::Add { key, cf, value, .. } => { let (deserialized, deserializer) = self.validate()?; if let (Some(prev), Some(des)) = - (db.read(key, cf), deserializer) + (migrator.read(state.db(), key, cf), deserializer) { des(prev).ok_or_else(|| { eyre::eyre!( @@ -292,11 +289,39 @@ impl DbUpdateType { ) })?; } - db.write(key, cf, value.to_write()); + let value = value.bytes(); + if let DbColFam::SUBSPACE = cf { + // Update the merkle tree + let persist_diffs = (state.diff_key_filter)(key); + let merk_key = if !persist_diffs { + let prefix = storage::Key::from( + NO_DIFF_KEY_PREFIX.to_string().to_db_key(), + ); + &prefix.join(key) + } else { + key + }; + state.in_mem_mut().block.tree.update(merk_key, value)?; + } + + migrator.write(state.db(), key, cf, value); Ok(UpdateStatus::Add(vec![(key.to_string(), deserialized)])) } Self::Delete(key, cf) => { - db.delete(key, cf); + migrator.delete(state.db(), key, cf); + if let DbColFam::SUBSPACE = cf { + // Update the merkle tree + let persist_diffs = (state.diff_key_filter)(key); + let merk_key = if !persist_diffs { + let prefix = storage::Key::from( + NO_DIFF_KEY_PREFIX.to_string().to_db_key(), + ); + &prefix.join(key) + } else { + key + }; + state.in_mem_mut().block.tree.delete(merk_key)?; + } Ok(UpdateStatus::Deleted(vec![key.to_string()])) } DbUpdateType::RepeatAdd { @@ -305,7 +330,9 @@ impl DbUpdateType { let pattern = Regex::new(pattern).unwrap(); let mut pairs = vec![]; let (deserialized, deserializer) = self.validate()?; - for (key, prev) in db.get_pattern(pattern.clone()) { + for (key, prev) in + migrator.get_pattern(state.db(), pattern.clone()) + { if let Some(des) = deserializer { des(prev).ok_or_else(|| { eyre::eyre!( @@ -316,53 +343,87 @@ impl DbUpdateType { deserialized, ) })?; - pairs.push((key.to_string(), deserialized.clone())); + pairs.push((key.clone(), deserialized.clone())); } else { - pairs.push((key.to_string(), deserialized.clone())); + pairs.push((key.clone(), deserialized.clone())); } - db.write( - &Key::from_str(&key).unwrap(), - cf, - value.to_write(), - ); + let key = storage::Key::from_str(&key).unwrap(); + let value = 
value.bytes(); + if let DbColFam::SUBSPACE = cf { + // Update the merkle tree + let persist_diffs = (state.diff_key_filter)(&key); + let merk_key = if !persist_diffs { + let prefix = storage::Key::from( + NO_DIFF_KEY_PREFIX.to_string().to_db_key(), + ); + &prefix.join(&key) + } else { + &key + }; + state + .in_mem_mut() + .block + .tree + .update(merk_key, value)?; + } + migrator.write(state.db(), &key, cf, value); } - Ok(UpdateStatus::Add(pairs)) + Ok::<_, eyre::Error>(UpdateStatus::Add(pairs)) } DbUpdateType::RepeatDelete(pattern, cf) => { let pattern = Regex::new(pattern).unwrap(); Ok(UpdateStatus::Deleted( - db.get_pattern(pattern.clone()) + migrator + .get_pattern(state.db(), pattern.clone()) .into_iter() - .map(|(key, _)| { - db.delete(&Key::from_str(&key).unwrap(), cf); - key + .map(|(raw_key, _)| { + let key = storage::Key::from_str(&raw_key).unwrap(); + if let DbColFam::SUBSPACE = cf { + // Update the merkle tree + let persist_diffs = + (state.diff_key_filter)(&key); + let merk_key = if !persist_diffs { + let prefix = storage::Key::from( + NO_DIFF_KEY_PREFIX + .to_string() + .to_db_key(), + ); + &prefix.join(&key) + } else { + &key + }; + state + .in_mem_mut() + .block + .tree + .delete(merk_key)?; + } + + migrator.delete(state.db(), &key, cf); + Ok(raw_key) }) - .collect(), + .collect::>>()?, )) } - } + }?; + migrator.commit(state.db())?; + Ok(status) } } /// A set of key-value changes to be applied to /// the db at a specified height. #[derive(Debug, Clone)] -pub struct ScheduledMigration { +pub struct ScheduledMigration { /// The height at which to perform the changes pub height: BlockHeight, /// The actual set of changes pub path: PathBuf, /// A hash of the expected contents in the file pub hash: Hash, - /// For keeping track of what data type we deserialize the - /// contents of the file to. - phantom: PhantomData, } -impl ScheduledMigration -where - D: DbMigration + DeserializeOwned, -{ +impl ScheduledMigration { /// Read in a migrations json and a hash to verify the contents /// against. Also needs a height for which the changes are scheduled. pub fn from_path( @@ -374,13 +435,12 @@ where height, path: path.as_ref().to_path_buf(), hash, - phantom: Default::default(), }; scheduled_migration.load_and_validate()?; Ok(scheduled_migration) } - pub fn load_and_validate(&self) -> eyre::Result> { + pub fn load_and_validate(&self) -> eyre::Result { let update_json = self.load_bytes_and_validate()?; serde_json::from_slice(&update_json) .map_err(|_| eyre!("Could not parse the updates file as json")) @@ -403,13 +463,13 @@ where } #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct DbChanges { - pub changes: Vec, +pub struct DbChanges { + pub changes: Vec, } -impl IntoIterator for DbChanges { - type IntoIter = std::vec::IntoIter; - type Item = D; +impl IntoIterator for DbChanges { + type IntoIter = std::vec::IntoIter; + type Item = DbUpdateType; fn into_iter(self) -> Self::IntoIter { self.changes.into_iter() @@ -489,26 +549,36 @@ impl Display for UpdateStatus { /// Check if a scheduled migration should take place at this block height. /// If so, apply it to the DB. -pub fn commit(db: &D, migration: impl IntoIterator) -where - D::Migrator: DeserializeOwned, +pub fn commit( + state: &mut FullAccessState, + migration: impl IntoIterator, +) where + D: 'static + DB + for<'iter> DBIter<'iter>, + H: 'static + StorageHasher, { tracing::info!( "A migration is scheduled to take place at this block height. \ Starting..." 
); - match db.apply_migration_to_batch(migration) { - Ok(batch) => { - tracing::info!("Persisting DB changes..."); - db.exec_batch(batch).expect("Failed to execute write batch"); - } - Err(e) => { - panic!( - "Failed to execute migration, no changes persisted. \ - Encountered error: {}", - e - ); + for change in migration.into_iter() { + match change.update(state) { + Ok(status) => { + tracing::info!("{status}"); + } + Err(e) => { + let error = format!( + "Attempt to write to key/pattern <{}> failed:\n{}.", + change.pattern(), + e + ); + tracing::error!(error); + panic!( + "Failed to execute migration, no changes persisted. \ + Encountered error: {}", + e + ); + } } } } @@ -557,7 +627,7 @@ mod test_migrations { fn test_scheduled_migration_validate() { let file = tempfile::Builder::new().tempfile().expect("Test failed"); let updates = [DbUpdateType::Add { - key: Key::parse("bing/fucking/bong").expect("Test failed"), + key: storage::Key::parse("bing/fucking/bong").expect("Test failed"), cf: DbColFam::SUBSPACE, value: Amount::native_whole(1337).into(), force: false, @@ -568,7 +638,7 @@ mod test_migrations { let json = serde_json::to_string(&changes).expect("Test failed"); let hash = Hash::sha256("derpy derp".as_bytes()); std::fs::write(file.path(), json).expect("Test failed"); - let migration = ScheduledMigration::::from_path( + let migration = ScheduledMigration::from_path( file.path(), hash, Default::default(), diff --git a/crates/storage/src/db.rs b/crates/storage/src/db.rs index c4abb55d28..7de9ccbfb5 100644 --- a/crates/storage/src/db.rs +++ b/crates/storage/src/db.rs @@ -14,8 +14,6 @@ use namada_merkle_tree::{ StoreType, }; use regex::Regex; -use serde::de::DeserializeOwned; -use serde::Serialize; use thiserror::Error; use crate::conversion_state::ConversionState; @@ -124,9 +122,8 @@ pub trait DB: Debug { /// A handle for batch writes type WriteBatch: DBWriteBatch; - /// A type that can apply a key-value - /// change to DB. - type Migrator: DbMigration + DeserializeOwned; + /// A type placeholder for DB migration implementation. + type Migrator: DBUpdateVisitor; /// Source data to restore a database. type RestoreSource<'a>; @@ -299,14 +296,39 @@ pub trait DB: Debug { new_value: impl AsRef<[u8]>, ) -> Result<()>; - /// Apply a series of key-value changes - /// to the DB. - fn apply_migration_to_batch( + /// Get an instance of DB migrator + fn migrator() -> Self::Migrator; +} + +/// A CRUD DB access +pub trait DBUpdateVisitor { + /// The DB type + type DB; + + /// Try to read key's value from a DB column + fn read(&self, db: &Self::DB, key: &Key, cf: &DbColFam) -> Option>; + + /// Write key's value to a DB column + fn write( + &mut self, + db: &Self::DB, + key: &Key, + cf: &DbColFam, + value: impl AsRef<[u8]>, + ); + + /// Delete key-val + fn delete(&mut self, db: &Self::DB, key: &Key, cf: &DbColFam); + + /// Get key-vals matching the pattern + fn get_pattern( &self, - _updates: impl IntoIterator, - ) -> Result { - unimplemented!() - } + db: &Self::DB, + pattern: Regex, + ) -> Vec<(String, Vec)>; + + /// Commit the changes + fn commit(self, db: &Self::DB) -> Result<()>; } /// A database prefix iterator. @@ -359,8 +381,3 @@ pub trait DBIter<'iter> { /// Atomic batch write. 
pub trait DBWriteBatch {} - -/// A type that can apply a key-value change to a DB -pub trait DbMigration: Debug + Clone + Serialize {} - -impl DbMigration for () {} diff --git a/crates/storage/src/mockdb.rs b/crates/storage/src/mockdb.rs index eb8e849ca0..30d14af139 100644 --- a/crates/storage/src/mockdb.rs +++ b/crates/storage/src/mockdb.rs @@ -24,6 +24,7 @@ use crate::db::{ BlockStateRead, BlockStateWrite, DBIter, DBWriteBatch, Error, Result, DB, }; use crate::types::{KVBytes, PatternIterator, PrefixIterator}; +use crate::DBUpdateVisitor; const SUBSPACE_CF: &str = "subspace"; @@ -644,6 +645,10 @@ impl DB for MockDB { ) -> Result<()> { unimplemented!() } + + fn migrator() -> Self::Migrator { + unimplemented!("Migration isn't implemented in MockDB") + } } impl<'iter> DBIter<'iter> for MockDB { @@ -822,3 +827,42 @@ impl Iterator for MockPatternIterator { } impl DBWriteBatch for MockDBWriteBatch {} + +impl DBUpdateVisitor for () { + type DB = crate::mockdb::MockDB; + + fn read( + &self, + _db: &Self::DB, + _key: &Key, + _cf: &DbColFam, + ) -> Option> { + unimplemented!() + } + + fn write( + &mut self, + _db: &Self::DB, + _key: &Key, + _cf: &DbColFam, + _value: impl AsRef<[u8]>, + ) { + unimplemented!() + } + + fn delete(&mut self, _db: &Self::DB, _key: &Key, _cf: &DbColFam) { + unimplemented!() + } + + fn get_pattern( + &self, + _db: &Self::DB, + _pattern: Regex, + ) -> Vec<(String, Vec)> { + unimplemented!() + } + + fn commit(self, _db: &Self::DB) -> Result<()> { + unimplemented!() + } +} diff --git a/crates/tests/src/e2e/ledger_tests.rs b/crates/tests/src/e2e/ledger_tests.rs index 4906f282b1..1e9fce003c 100644 --- a/crates/tests/src/e2e/ledger_tests.rs +++ b/crates/tests/src/e2e/ledger_tests.rs @@ -385,22 +385,31 @@ fn test_db_migration() -> Result<()> { let migrations_json_path = working_dir() .join("examples") .join("migration_example.json"); + let migration_hash = namada_core::hash::Hash::sha256( + std::fs::read(&migrations_json_path).unwrap(), + ) + .to_string(); // 2. 
Update the db - let mut session = run_as!( + let mut ledger = run_as!( test, Who::Validator(0), Bin::Node, &[ "ledger", - "update-db", + "run", "--path", migrations_json_path.to_string_lossy().as_ref(), - "--block-height", + "--height", "6", + "--hash", + &migration_hash ], Some(30), )?; - session.assert_success(); + + ledger.exp_regex(r"Committed block hash.*, height: [0-9]+")?; + ledger.interrupt()?; + ledger.exp_eof()?; let mut ledger = run_as!(test, Who::Validator(0), Bin::Node, &["ledger"], Some(40))?; From 5d711737bcb87dad5aa0d1ada126d5b221714840 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Zemanovi=C4=8D?= Date: Fri, 6 Dec 2024 11:58:17 +0000 Subject: [PATCH 5/8] update merkle tree stores after migration --- crates/node/src/shell/mod.rs | 7 ++--- crates/node/src/storage/rocksdb.rs | 45 ++++++++++++++++++++++++++++++ crates/state/src/wl_state.rs | 7 +++++ crates/storage/src/db.rs | 6 ++++ crates/storage/src/mockdb.rs | 34 +++++++++++++++++++++- 5 files changed, 94 insertions(+), 5 deletions(-) diff --git a/crates/node/src/shell/mod.rs b/crates/node/src/shell/mod.rs index 49fd58b777..06310598c3 100644 --- a/crates/node/src/shell/mod.rs +++ b/crates/node/src/shell/mod.rs @@ -800,10 +800,9 @@ where if let Some(migration) = migration { migrations::commit(&mut self.state, migration); - self.state.commit_block().expect( - "Encountered a storage error while persisting changes \ - post-migration", - ); + self.state + .update_last_block_merkle_tree() + .expect("Must update merkle tree after migration"); } let merkle_root = self.state.in_mem().merkle_root(); diff --git a/crates/node/src/storage/rocksdb.rs b/crates/node/src/storage/rocksdb.rs index 9d74e63e41..fe64affd08 100644 --- a/crates/node/src/storage/rocksdb.rs +++ b/crates/node/src/storage/rocksdb.rs @@ -1819,6 +1819,51 @@ impl DB for RocksDB { fn migrator() -> Self::Migrator { RocksDBUpdateVisitor::default() } + + fn update_last_block_merkle_tree( + &self, + merkle_tree_stores: namada_vp::state::MerkleTreeStoresWrite<'_>, + ) -> Result<()> { + let state_cf = self.get_column_family(STATE_CF)?; + let block_cf = self.get_column_family(BLOCK_CF)?; + + // Read the last block's height + let height: BlockHeight = + self.read_value(state_cf, BLOCK_HEIGHT_KEY)?.unwrap(); + + // Read the last block's epoch + let prefix = height.raw(); + let epoch_key = format!("{prefix}/{EPOCH_KEY_SEGMENT}"); + let epoch = self.read_value(block_cf, epoch_key)?.unwrap(); + + let mut batch = RocksDBWriteBatch::default(); + for st in StoreType::iter() { + if st.is_stored_every_block() { + let key_prefix = if st.is_stored_every_block() { + tree_key_prefix_with_height(st, height) + } else { + tree_key_prefix_with_epoch(st, epoch) + }; + let root_key = + format!("{key_prefix}/{MERKLE_TREE_ROOT_KEY_SEGMENT}"); + self.add_value_to_batch( + block_cf, + root_key, + merkle_tree_stores.root(st), + &mut batch, + ); + let store_key = + format!("{key_prefix}/{MERKLE_TREE_STORE_KEY_SEGMENT}"); + self.add_value_bytes_to_batch( + block_cf, + store_key, + merkle_tree_stores.store(st).encode(), + &mut batch, + ); + } + } + self.exec_batch(batch) + } } /// A struct that can visit a set of updates, diff --git a/crates/state/src/wl_state.rs b/crates/state/src/wl_state.rs index f148fb8a84..739bbc7062 100644 --- a/crates/state/src/wl_state.rs +++ b/crates/state/src/wl_state.rs @@ -680,6 +680,13 @@ where self.db.exec_batch(batch)?; Ok(()) } + + /// Update the merkle tree written for the committed last block + pub fn update_last_block_merkle_tree(&self) -> Result<()> { + 
self.db
+            .update_last_block_merkle_tree(self.in_mem.block.tree.stores())?;
+        Ok(())
+    }
 }
 
 impl WlState
diff --git a/crates/storage/src/db.rs b/crates/storage/src/db.rs
index 7de9ccbfb5..a00b569365 100644
--- a/crates/storage/src/db.rs
+++ b/crates/storage/src/db.rs
@@ -298,6 +298,12 @@ pub trait DB: Debug {
 
     /// Get an instance of DB migrator
     fn migrator() -> Self::Migrator;
+
+    /// Update the merkle tree written for the committed last block
+    fn update_last_block_merkle_tree(
+        &self,
+        merkle_tree_stores: MerkleTreeStoresWrite<'_>,
+    ) -> Result<()>;
 }
 
 /// A CRUD DB access
diff --git a/crates/storage/src/mockdb.rs b/crates/storage/src/mockdb.rs
index 30d14af139..fc764eb72b 100644
--- a/crates/storage/src/mockdb.rs
+++ b/crates/storage/src/mockdb.rs
@@ -15,7 +15,7 @@ use namada_core::{decode, encode, ethereum_events};
 use namada_gas::Gas;
 use namada_merkle_tree::{
     tree_key_prefix_with_epoch, tree_key_prefix_with_height,
-    MerkleTreeStoresRead, StoreType,
+    MerkleTreeStoresRead, MerkleTreeStoresWrite, StoreType,
 };
 use namada_replay_protection as replay_protection;
 use regex::Regex;
@@ -649,6 +649,38 @@ impl DB for MockDB {
     fn migrator() -> Self::Migrator {
         unimplemented!("Migration isn't implemented in MockDB")
     }
+
+    fn update_last_block_merkle_tree(
+        &self,
+        merkle_tree_stores: MerkleTreeStoresWrite<'_>,
+    ) -> Result<()> {
+        // Read the last block's height
+        let height: BlockHeight = self.read_value(BLOCK_HEIGHT_KEY)?.unwrap();
+
+        // Read the last block's epoch
+        let prefix = height.raw();
+        let epoch_key = format!("{prefix}/{EPOCH_KEY_SEGMENT}");
+        let epoch: Epoch = self.read_value(epoch_key)?.unwrap();
+
+        for st in StoreType::iter() {
+            if st.is_stored_every_block() {
+                let key_prefix = if st.is_stored_every_block() {
+                    tree_key_prefix_with_height(st, height)
+                } else {
+                    tree_key_prefix_with_epoch(st, epoch)
+                };
+                let root_key =
+                    format!("{key_prefix}/{MERKLE_TREE_ROOT_KEY_SEGMENT}");
+                self.write_value(root_key, merkle_tree_stores.root(st));
+                let store_key =
+                    format!("{key_prefix}/{MERKLE_TREE_STORE_KEY_SEGMENT}");
+                self.0
+                    .borrow_mut()
+                    .insert(store_key, merkle_tree_stores.store(st).encode());
+            }
+        }
+        Ok(())
+    }
 }

From c1f2b14cef6f73c128e8d963e14857b7588d372a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Zemanovi=C4=8D?= 
Date: Mon, 2 Dec 2024 17:30:13 +0000
Subject: [PATCH 6/8] changelog: add #4135

---
 .changelog/unreleased/bug-fixes/4135-migration-fixes.md | 3 +++
 1 file changed, 3 insertions(+)
 create mode 100644 .changelog/unreleased/bug-fixes/4135-migration-fixes.md

diff --git a/.changelog/unreleased/bug-fixes/4135-migration-fixes.md b/.changelog/unreleased/bug-fixes/4135-migration-fixes.md
new file mode 100644
index 0000000000..a67dfc9fd4
--- /dev/null
+++ b/.changelog/unreleased/bug-fixes/4135-migration-fixes.md
@@ -0,0 +1,3 @@
+- Strengthen migration checks and ensure that the merkle tree is updated
+  during migrations. The redundant `namadan update-db` command has been
+  removed in the process. 
([\#4135](https://github.com/anoma/namada/pull/4135)) \ No newline at end of file From d59a87b1341a1e98b9437d1b703b3ebac3983dba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Zemanovi=C4=8D?= Date: Fri, 6 Dec 2024 16:52:54 +0000 Subject: [PATCH 7/8] use diff key filter for migrations --- crates/node/src/storage/rocksdb.rs | 16 ++++++++++++---- crates/sdk/src/migrations.rs | 22 +++++++++++++--------- crates/storage/src/db.rs | 10 +++++++++- crates/storage/src/mockdb.rs | 10 +++++++++- 4 files changed, 43 insertions(+), 15 deletions(-) diff --git a/crates/node/src/storage/rocksdb.rs b/crates/node/src/storage/rocksdb.rs index fe64affd08..1a5ea90850 100644 --- a/crates/node/src/storage/rocksdb.rs +++ b/crates/node/src/storage/rocksdb.rs @@ -1788,6 +1788,7 @@ impl DB for RocksDB { cf: &DbColFam, key: &Key, new_value: impl AsRef<[u8]>, + persist_diffs: bool, ) -> Result<()> { self.insert_entry(batch, cf, key, new_value.as_ref())?; let state_cf = self.get_column_family(STATE_CF)?; @@ -1798,7 +1799,7 @@ impl DB for RocksDB { })?; // If the CF is subspace, additionally update the diffs - if cf == &DbColFam::SUBSPACE { + if persist_diffs && cf == &DbColFam::SUBSPACE { let diffs_cf = self.get_column_family(DIFFS_CF)?; let diffs_key = Key::from(last_height.to_db_key()) .with_segment("new".to_owned()) @@ -1898,12 +1899,19 @@ impl DBUpdateVisitor for RocksDBUpdateVisitor { key: &Key, cf: &DbColFam, value: impl AsRef<[u8]>, + persist_diffs: bool, ) { - db.overwrite_entry(&mut self.batch, cf, key, value) + db.overwrite_entry(&mut self.batch, cf, key, value, persist_diffs) .expect("Failed to overwrite a key in storage") } - fn delete(&mut self, db: &Self::DB, key: &Key, cf: &DbColFam) { + fn delete( + &mut self, + db: &Self::DB, + key: &Key, + cf: &DbColFam, + persist_diffs: bool, + ) { let state_cf = db.get_column_family(STATE_CF).unwrap(); let last_height: BlockHeight = db.read_value(state_cf, BLOCK_HEIGHT_KEY).unwrap().unwrap(); @@ -1913,7 +1921,7 @@ impl DBUpdateVisitor for RocksDBUpdateVisitor { &mut self.batch, last_height, key, - true, + persist_diffs, ) .expect("Failed to delete key from storage"); } diff --git a/crates/sdk/src/migrations.rs b/crates/sdk/src/migrations.rs index 2adf73980d..03f96ce145 100644 --- a/crates/sdk/src/migrations.rs +++ b/crates/sdk/src/migrations.rs @@ -290,9 +290,9 @@ impl DbUpdateType { })?; } let value = value.bytes(); + let persist_diffs = (state.diff_key_filter)(key); if let DbColFam::SUBSPACE = cf { // Update the merkle tree - let persist_diffs = (state.diff_key_filter)(key); let merk_key = if !persist_diffs { let prefix = storage::Key::from( NO_DIFF_KEY_PREFIX.to_string().to_db_key(), @@ -304,14 +304,14 @@ impl DbUpdateType { state.in_mem_mut().block.tree.update(merk_key, value)?; } - migrator.write(state.db(), key, cf, value); + migrator.write(state.db(), key, cf, value, persist_diffs); Ok(UpdateStatus::Add(vec![(key.to_string(), deserialized)])) } Self::Delete(key, cf) => { - migrator.delete(state.db(), key, cf); + let persist_diffs = (state.diff_key_filter)(key); + migrator.delete(state.db(), key, cf, persist_diffs); if let DbColFam::SUBSPACE = cf { // Update the merkle tree - let persist_diffs = (state.diff_key_filter)(key); let merk_key = if !persist_diffs { let prefix = storage::Key::from( NO_DIFF_KEY_PREFIX.to_string().to_db_key(), @@ -349,9 +349,9 @@ impl DbUpdateType { } let key = storage::Key::from_str(&key).unwrap(); let value = value.bytes(); + let persist_diffs = (state.diff_key_filter)(&key); if let DbColFam::SUBSPACE = cf { // Update the 
merkle tree - let persist_diffs = (state.diff_key_filter)(&key); let merk_key = if !persist_diffs { let prefix = storage::Key::from( NO_DIFF_KEY_PREFIX.to_string().to_db_key(), @@ -366,7 +366,7 @@ impl DbUpdateType { .tree .update(merk_key, value)?; } - migrator.write(state.db(), &key, cf, value); + migrator.write(state.db(), &key, cf, value, persist_diffs); } Ok::<_, eyre::Error>(UpdateStatus::Add(pairs)) } @@ -378,10 +378,9 @@ impl DbUpdateType { .into_iter() .map(|(raw_key, _)| { let key = storage::Key::from_str(&raw_key).unwrap(); + let persist_diffs = (state.diff_key_filter)(&key); if let DbColFam::SUBSPACE = cf { // Update the merkle tree - let persist_diffs = - (state.diff_key_filter)(&key); let merk_key = if !persist_diffs { let prefix = storage::Key::from( NO_DIFF_KEY_PREFIX @@ -399,7 +398,12 @@ impl DbUpdateType { .delete(merk_key)?; } - migrator.delete(state.db(), &key, cf); + migrator.delete( + state.db(), + &key, + cf, + persist_diffs, + ); Ok(raw_key) }) .collect::>>()?, diff --git a/crates/storage/src/db.rs b/crates/storage/src/db.rs index a00b569365..d0865494e5 100644 --- a/crates/storage/src/db.rs +++ b/crates/storage/src/db.rs @@ -294,6 +294,7 @@ pub trait DB: Debug { cf: &DbColFam, key: &Key, new_value: impl AsRef<[u8]>, + persist_diffs: bool, ) -> Result<()>; /// Get an instance of DB migrator @@ -321,10 +322,17 @@ pub trait DBUpdateVisitor { key: &Key, cf: &DbColFam, value: impl AsRef<[u8]>, + persist_diffs: bool, ); /// Delete key-val - fn delete(&mut self, db: &Self::DB, key: &Key, cf: &DbColFam); + fn delete( + &mut self, + db: &Self::DB, + key: &Key, + cf: &DbColFam, + persist_diffs: bool, + ); /// Get key-vals matching the pattern fn get_pattern( diff --git a/crates/storage/src/mockdb.rs b/crates/storage/src/mockdb.rs index fc764eb72b..be3c6cd6f1 100644 --- a/crates/storage/src/mockdb.rs +++ b/crates/storage/src/mockdb.rs @@ -642,6 +642,7 @@ impl DB for MockDB { _cf: &DbColFam, _key: &Key, _new_value: impl AsRef<[u8]>, + _persist_diffs: bool, ) -> Result<()> { unimplemented!() } @@ -878,11 +879,18 @@ impl DBUpdateVisitor for () { _key: &Key, _cf: &DbColFam, _value: impl AsRef<[u8]>, + _persist_diffs: bool, ) { unimplemented!() } - fn delete(&mut self, _db: &Self::DB, _key: &Key, _cf: &DbColFam) { + fn delete( + &mut self, + _db: &Self::DB, + _key: &Key, + _cf: &DbColFam, + _persist_diffs: bool, + ) { unimplemented!() } From ffefc11da05b9977856e1c5cd7f8e8194904cf7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Zemanovi=C4=8D?= Date: Mon, 9 Dec 2024 13:26:59 +0000 Subject: [PATCH 8/8] check if the block is full commit on migrations merkle tree update --- crates/node/src/storage/rocksdb.rs | 3 ++- crates/state/src/wl_state.rs | 16 ++++++++++++---- crates/storage/src/db.rs | 1 + crates/storage/src/mockdb.rs | 3 ++- 4 files changed, 17 insertions(+), 6 deletions(-) diff --git a/crates/node/src/storage/rocksdb.rs b/crates/node/src/storage/rocksdb.rs index 1a5ea90850..2cbbd3d725 100644 --- a/crates/node/src/storage/rocksdb.rs +++ b/crates/node/src/storage/rocksdb.rs @@ -1824,6 +1824,7 @@ impl DB for RocksDB { fn update_last_block_merkle_tree( &self, merkle_tree_stores: namada_vp::state::MerkleTreeStoresWrite<'_>, + is_full_commit: bool, ) -> Result<()> { let state_cf = self.get_column_family(STATE_CF)?; let block_cf = self.get_column_family(BLOCK_CF)?; @@ -1839,7 +1840,7 @@ impl DB for RocksDB { let mut batch = RocksDBWriteBatch::default(); for st in StoreType::iter() { - if st.is_stored_every_block() { + if st.is_stored_every_block() || is_full_commit { let 
key_prefix = if st.is_stored_every_block() {
                     tree_key_prefix_with_height(st, height)
                 } else {
diff --git a/crates/state/src/wl_state.rs b/crates/state/src/wl_state.rs
index 739bbc7062..25b120263c 100644
--- a/crates/state/src/wl_state.rs
+++ b/crates/state/src/wl_state.rs
@@ -616,8 +616,7 @@ where
         mut batch: D::WriteBatch,
     ) -> Result<()> {
         // All states are written only when the first height or a new epoch
-        let is_full_commit = self.in_mem.block.height.0 == 1
-            || self.in_mem.last_epoch != self.in_mem.block.epoch;
+        let is_full_commit = self.is_full_commit();
 
         // For convenience in tests, fill-in a header if it's missing.
         // Normally, the header is added in `FinalizeBlock`.
@@ -681,10 +680,19 @@ where
         Ok(())
     }
 
+    /// A full commit is executed only at the first height or on a new epoch.
+    pub fn is_full_commit(&self) -> bool {
+        self.in_mem.block.height.0 == 1
+            || self.in_mem.last_epoch != self.in_mem.block.epoch
+    }
+
     /// Update the merkle tree written for the committed last block
     pub fn update_last_block_merkle_tree(&self) -> Result<()> {
-        self.db
-            .update_last_block_merkle_tree(self.in_mem.block.tree.stores())?;
+        let is_full_commit = self.is_full_commit();
+        self.db.update_last_block_merkle_tree(
+            self.in_mem.block.tree.stores(),
+            is_full_commit,
+        )?;
         Ok(())
     }
 }
diff --git a/crates/storage/src/db.rs b/crates/storage/src/db.rs
index d0865494e5..b6e697b186 100644
--- a/crates/storage/src/db.rs
+++ b/crates/storage/src/db.rs
@@ -304,6 +304,7 @@ pub trait DB: Debug {
     fn update_last_block_merkle_tree(
         &self,
         merkle_tree_stores: MerkleTreeStoresWrite<'_>,
+        is_full_commit: bool,
     ) -> Result<()>;
 }
 
diff --git a/crates/storage/src/mockdb.rs b/crates/storage/src/mockdb.rs
index be3c6cd6f1..272e177923 100644
--- a/crates/storage/src/mockdb.rs
+++ b/crates/storage/src/mockdb.rs
@@ -654,6 +654,7 @@ impl DB for MockDB {
     fn update_last_block_merkle_tree(
         &self,
         merkle_tree_stores: MerkleTreeStoresWrite<'_>,
+        is_full_commit: bool,
     ) -> Result<()> {
         // Read the last block's height
         let height: BlockHeight = self.read_value(BLOCK_HEIGHT_KEY)?.unwrap();
@@ -664,7 +665,7 @@ impl DB for MockDB {
         let epoch: Epoch = self.read_value(epoch_key)?.unwrap();
 
         for st in StoreType::iter() {
-            if st.is_stored_every_block() {
+            if st.is_stored_every_block() || is_full_commit {
                 let key_prefix = if st.is_stored_every_block() {
                     tree_key_prefix_with_height(st, height)
                 } else {
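
With the full series applied, scheduling a migration goes through the node
itself, as exercised by `test_db_migration` above: the node is started with
something like `namadan ledger run --path migration_example.json --height 6
--hash <SHA-256 of the file>` (values illustrative), and the changes are
applied atomically when the block at the scheduled height is committed, with
the merkle tree stores updated in the same step.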