diff --git a/chain-signatures/contract/src/config/impls.rs b/chain-signatures/contract/src/config/impls.rs index 36c7d5b58..ad8f9823d 100644 --- a/chain-signatures/contract/src/config/impls.rs +++ b/chain-signatures/contract/src/config/impls.rs @@ -46,6 +46,7 @@ impl Default for TripleConfig { min_triples: 1024, max_triples: 1024 * MAX_EXPECTED_PARTICIPANTS * NETWORK_MULTIPLIER, generation_timeout: min_to_ms(10), + preview_limit: 128, other: Default::default(), } @@ -58,6 +59,7 @@ impl Default for PresignatureConfig { min_presignatures: 512, max_presignatures: 512 * MAX_EXPECTED_PARTICIPANTS * NETWORK_MULTIPLIER, generation_timeout: secs_to_ms(45), + preview_limit: 128, other: Default::default(), } diff --git a/chain-signatures/contract/src/config/mod.rs b/chain-signatures/contract/src/config/mod.rs index 682e2408f..742da8313 100644 --- a/chain-signatures/contract/src/config/mod.rs +++ b/chain-signatures/contract/src/config/mod.rs @@ -59,6 +59,8 @@ pub struct TripleConfig { pub max_triples: u32, /// Timeout for triple generation in milliseconds. pub generation_timeout: u64, + /// Max amount of Triple IDs allowed to be previewed in state. + pub preview_limit: u32, /// The remaining entries that can be present in future forms of the configuration. #[serde(flatten)] @@ -73,6 +75,8 @@ pub struct PresignatureConfig { pub max_presignatures: u32, /// Timeout for presignature generation in milliseconds. pub generation_timeout: u64, + /// Max amount of Presignature IDs allowed to be previewed in state. + pub preview_limit: u32, /// The remaining entries that can be present in future forms of the configuration. #[serde(flatten)] @@ -110,12 +114,14 @@ mod tests { "triple": { "min_triples": 10, "max_triples": 100, - "generation_timeout": 10000 + "generation_timeout": 10000, + "preview_limit": 128, }, "presignature": { "min_presignatures": 10, "max_presignatures": 100, - "generation_timeout": 10000 + "generation_timeout": 10000, + "preview_limit": 256, }, "signature": { "generation_timeout": 10000, diff --git a/chain-signatures/contract/src/lib.rs b/chain-signatures/contract/src/lib.rs index c75fccb90..41d8eecc9 100644 --- a/chain-signatures/contract/src/lib.rs +++ b/chain-signatures/contract/src/lib.rs @@ -352,6 +352,7 @@ impl VersionedMpcContract { join_votes, .. }) => { + log!("voting for {candidate:?} in {candidates:?}"); let candidate_info = candidates .get(&candidate) .ok_or(VoteError::JoinNotCandidate)?; diff --git a/chain-signatures/contract/src/primitives.rs b/chain-signatures/contract/src/primitives.rs index 9fc321b8e..cae052491 100644 --- a/chain-signatures/contract/src/primitives.rs +++ b/chain-signatures/contract/src/primitives.rs @@ -205,8 +205,8 @@ impl Candidates { self.candidates.insert(account_id, candidate); } - pub fn remove(&mut self, account_id: &AccountId) { - self.candidates.remove(account_id); + pub fn remove(&mut self, account_id: &AccountId) -> Option { + self.candidates.remove(account_id) } pub fn get(&self, account_id: &AccountId) -> Option<&CandidateInfo> { diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 02768ddf3..dfa98c178 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -216,6 +216,14 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { client_headers.insert(http::header::REFERER, referer_param.parse().unwrap()); } + let config = Arc::new(RwLock::new(Config::new(LocalConfig { + over: override_config.unwrap_or_else(Default::default), + network: NetworkConfig { + cipher_pk: hpke::PublicKey::try_from_bytes(&hex::decode(cipher_pk)?)?, + sign_sk, + }, + }))); + tracing::debug!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized"); let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk); let (protocol, protocol_state) = MpcSignProtocol::init( @@ -228,13 +236,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { sign_queue, key_storage, triple_storage, - Config::new(LocalConfig { - over: override_config.unwrap_or_else(Default::default), - network: NetworkConfig { - cipher_pk: hpke::PublicKey::try_from_bytes(&hex::decode(cipher_pk)?)?, - sign_sk, - }, - }), + config.clone(), ); rt.block_on(async { @@ -243,7 +245,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { tracing::debug!("protocol thread spawned"); let cipher_sk = hpke::SecretKey::try_from_bytes(&hex::decode(cipher_sk)?)?; let web_handle = tokio::spawn(async move { - web::run(web_port, sender, cipher_sk, protocol_state, indexer).await + web::run(web_port, sender, cipher_sk, protocol_state, indexer, config).await }); tracing::debug!("protocol http server spawned"); diff --git a/chain-signatures/node/src/indexer.rs b/chain-signatures/node/src/indexer.rs index 2ecdb958f..1a8ea06a0 100644 --- a/chain-signatures/node/src/indexer.rs +++ b/chain-signatures/node/src/indexer.rs @@ -336,9 +336,8 @@ pub fn run( let Ok(lake) = rt.block_on(async { let latest = context.indexer.latest_block_height().await; - if i > 0 { - tracing::warn!("indexer latest height {latest}, restart count={i}"); - } + tracing::info!("indexer latest height {latest}"); + let mut lake_builder = LakeBuilder::default() .s3_bucket_name(&options.s3_bucket) .s3_region_name(&options.s3_region) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index ef6029ac3..3bc4e89c2 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::time::{Duration, Instant}; use cait_sith::protocol::Participant; @@ -6,6 +6,8 @@ use tokio::sync::RwLock; use url::Url; use crate::protocol::contract::primitives::Participants; +use crate::protocol::presignature::PresignatureId; +use crate::protocol::triple::TripleId; use crate::protocol::ProtocolState; use crate::web::StateView; @@ -19,16 +21,19 @@ pub struct Pool { http: reqwest::Client, connections: RwLock, potential_connections: RwLock, - status: RwLock>, - /// The currently active participants for this epoch. current_active: RwLock>, // Potentially active participants that we can use to establish a connection in the next epoch. potential_active: RwLock>, + + pub status: RwLock>, } impl Pool { - pub async fn ping(&self) -> Participants { + pub async fn ping( + &mut self, + previews: Option<(HashSet, HashSet)>, + ) -> Participants { if let Some((ref active, timestamp)) = *self.current_active.read().await { if timestamp.elapsed() < DEFAULT_TIMEOUT { return active.clone(); @@ -37,7 +42,21 @@ impl Pool { let connections = self.connections.read().await; + let mut params = HashMap::new(); + if let Some((triples, presignatures)) = previews { + if !triples.is_empty() { + params.insert("triple_preview", triples); + } + if !presignatures.is_empty() { + params.insert("presignature_preview", presignatures); + } + } + let mut status = self.status.write().await; + // Clear the status before we overwrite it just so we don't have any stale participant + // statuses that are no longer in the network after a reshare. + status.clear(); + let mut participants = Participants::default(); for (participant, info) in connections.iter() { let Ok(Ok(url)) = Url::parse(&info.url).map(|url| url.join("/state")) else { @@ -49,13 +68,21 @@ impl Pool { continue; }; - let Ok(resp) = self.http.get(url.clone()).send().await else { - tracing::warn!( - "Pool.ping resp err participant {:?} url {}", - participant, - url - ); - continue; + let mut req = self.http.get(url.clone()); + if !params.is_empty() { + req = req.header("content-type", "application/json").json(¶ms); + } + let resp = match req.send().await { + Ok(resp) => resp, + Err(err) => { + tracing::warn!( + ?err, + "Pool.ping resp err participant {:?} url {}", + participant, + url + ); + continue; + } }; let Ok(state): Result = resp.json().await else { @@ -77,7 +104,10 @@ impl Pool { participants } - pub async fn ping_potential(&self) -> Participants { + pub async fn ping_potential( + &mut self, + previews: Option<(HashSet, HashSet)>, + ) -> Participants { if let Some((ref active, timestamp)) = *self.potential_active.read().await { if timestamp.elapsed() < DEFAULT_TIMEOUT { return active.clone(); @@ -86,6 +116,16 @@ impl Pool { let connections = self.potential_connections.read().await; + let mut params = HashMap::new(); + if let Some((triples, presignatures)) = previews { + if !triples.is_empty() { + params.insert("triple_preview", triples); + } + if !presignatures.is_empty() { + params.insert("presignature_preview", presignatures); + } + } + let mut status = self.status.write().await; let mut participants = Participants::default(); for (participant, info) in connections.iter() { @@ -93,8 +133,21 @@ impl Pool { continue; }; - let Ok(resp) = self.http.get(url).send().await else { - continue; + let mut req = self.http.get(url.clone()); + if !params.is_empty() { + req = req.header("content-type", "application/json").json(¶ms); + } + let resp = match req.send().await { + Ok(resp) => resp, + Err(err) => { + tracing::warn!( + ?err, + "Pool.ping_potential resp err participant {:?} url {}", + participant, + url + ); + continue; + } }; let Ok(state): Result = resp.json().await else { @@ -104,7 +157,6 @@ impl Pool { status.insert(*participant, state); participants.insert(participant, info.clone()); } - drop(status); let mut potential_active = self.potential_active.write().await; *potential_active = Some((participants.clone(), Instant::now())); @@ -156,6 +208,7 @@ impl Pool { .get(participant) .map_or(false, |state| match state { StateView::Running { is_stable, .. } => *is_stable, + StateView::Resharing { is_stable, .. } => *is_stable, _ => false, }) } diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs index 56b7b9ace..3a7b82dd7 100644 --- a/chain-signatures/node/src/mesh/mod.rs +++ b/chain-signatures/node/src/mesh/mod.rs @@ -1,5 +1,12 @@ +use std::collections::{HashMap, HashSet}; + +use cait_sith::protocol::Participant; + use crate::protocol::contract::primitives::Participants; +use crate::protocol::presignature::PresignatureId; +use crate::protocol::triple::TripleId; use crate::protocol::ProtocolState; +use crate::web::StateView; pub mod connection; @@ -64,7 +71,12 @@ impl Mesh { self.connections .establish_participants(contract_state) .await; - self.ping().await; + } + + /// Ping the active participants such that we can see who is alive. + pub async fn ping(&mut self, previews: Option<(HashSet, HashSet)>) { + self.active_participants = self.connections.ping(previews.clone()).await; + self.active_potential_participants = self.connections.ping_potential(previews).await; tracing::debug!( active = ?self.active_participants.account_ids(), @@ -73,9 +85,7 @@ impl Mesh { ); } - /// Ping the active participants such that we can see who is alive. - pub async fn ping(&mut self) { - self.active_participants = self.connections.ping().await; - self.active_potential_participants = self.connections.ping_potential().await; + pub async fn state_views(&self) -> HashMap { + self.connections.status.read().await.clone() } } diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index 8b6065ea2..f67734d0c 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -31,6 +31,7 @@ use url::Url; use near_account_id::AccountId; use near_crypto::InMemorySigner; +#[async_trait::async_trait] pub trait ConsensusCtx { fn my_account_id(&self) -> &AccountId; fn http_client(&self) -> &reqwest::Client; @@ -41,7 +42,7 @@ pub trait ConsensusCtx { fn sign_queue(&self) -> Arc>; fn secret_storage(&self) -> &SecretNodeStorageBox; fn triple_storage(&self) -> LockTripleNodeStorageBox; - fn cfg(&self) -> &Config; + async fn cfg(&self) -> Config; } #[derive(thiserror::Error, Debug)] @@ -658,12 +659,13 @@ impl ConsensusProtocol for JoiningState { tracing::info!( "joining(running): sending a transaction to join the participant set" ); + let cfg = ctx.cfg().await; ctx.rpc_client() .call(ctx.signer(), ctx.mpc_contract_id(), "join") .args_json(json!({ "url": ctx.my_address(), - "cipher_pk": ctx.cfg().local.network.cipher_pk.to_bytes(), - "sign_pk": ctx.cfg().local.network.sign_sk.public_key(), + "cipher_pk": cfg.local.network.cipher_pk.to_bytes(), + "sign_pk": cfg.local.network.sign_sk.public_key(), })) .max_gas() .retry_exponential(10, 3) diff --git a/chain-signatures/node/src/protocol/cryptography.rs b/chain-signatures/node/src/protocol/cryptography.rs index 294a8242c..f35f8d60c 100644 --- a/chain-signatures/node/src/protocol/cryptography.rs +++ b/chain-signatures/node/src/protocol/cryptography.rs @@ -23,7 +23,7 @@ pub trait CryptographicCtx { fn signer(&self) -> &InMemorySigner; fn mpc_contract_id(&self) -> &AccountId; fn secret_storage(&mut self) -> &mut SecretNodeStorageBox; - fn cfg(&self) -> &Config; + async fn cfg(&self) -> Config; /// Active participants is the active participants at the beginning of each protocol loop. fn mesh(&self) -> &Mesh; @@ -74,6 +74,7 @@ impl CryptographicProtocol for GeneratingState { mut self, mut ctx: C, ) -> Result { + let cfg = ctx.cfg().await; tracing::info!(active = ?ctx.mesh().active_participants().keys_vec(), "generating: progressing key generation"); let mut protocol = self.protocol.write().await; loop { @@ -97,10 +98,10 @@ impl CryptographicProtocol for GeneratingState { .await .send_encrypted( ctx.me().await, - &ctx.cfg().local.network.sign_sk, + &cfg.local.network.sign_sk, ctx.http_client(), ctx.mesh().active_participants(), - &ctx.cfg().protocol, + &cfg.protocol, ) .await; if !failures.is_empty() { @@ -159,10 +160,10 @@ impl CryptographicProtocol for GeneratingState { .await .send_encrypted( ctx.me().await, - &ctx.cfg().local.network.sign_sk, + &cfg.local.network.sign_sk, ctx.http_client(), ctx.mesh().active_participants(), - &ctx.cfg().protocol, + &cfg.protocol, ) .await; if !failures.is_empty() { @@ -191,16 +192,17 @@ impl CryptographicProtocol for WaitingForConsensusState { mut self, ctx: C, ) -> Result { + let cfg = ctx.cfg().await; let failures = self .messages .write() .await .send_encrypted( ctx.me().await, - &ctx.cfg().local.network.sign_sk, + &cfg.local.network.sign_sk, ctx.http_client(), ctx.mesh().active_participants(), - &ctx.cfg().protocol, + &cfg.protocol, ) .await; if !failures.is_empty() { @@ -221,6 +223,8 @@ impl CryptographicProtocol for ResharingState { mut self, mut ctx: C, ) -> Result { + let cfg = ctx.cfg().await; + // TODO: we are not using active potential participants here, but we should in the future. // Currently resharing protocol does not timeout and restart with new set of participants. // So if it picks up a participant that is not active, it will never be able to send a message to it. @@ -253,10 +257,10 @@ impl CryptographicProtocol for ResharingState { .await .send_encrypted( ctx.me().await, - &ctx.cfg().local.network.sign_sk, + &cfg.local.network.sign_sk, ctx.http_client(), &active, - &ctx.cfg().protocol, + &cfg.protocol, ) .await; if !failures.is_empty() { @@ -321,10 +325,10 @@ impl CryptographicProtocol for ResharingState { .await .send_encrypted( ctx.me().await, - &ctx.cfg().local.network.sign_sk, + &cfg.local.network.sign_sk, ctx.http_client(), &active, - &ctx.cfg().protocol, + &cfg.protocol, ) .await; if !failures.is_empty() { @@ -356,10 +360,12 @@ impl CryptographicProtocol for RunningState { mut self, ctx: C, ) -> Result { - let protocol_cfg = &ctx.cfg().protocol; + let cfg = ctx.cfg().await; + let protocol_cfg = &cfg.protocol; let active = ctx.mesh().active_participants(); + let state_views = ctx.mesh().state_views().await; if active.len() < self.threshold { - tracing::info!( + tracing::warn!( active = ?active.keys_vec(), "running: not enough participants to progress" ); @@ -397,6 +403,7 @@ impl CryptographicProtocol for RunningState { if let Err(err) = presignature_manager .stockpile( active, + &state_views, &self.public_key, &self.private_share, &mut triple_manager, @@ -445,6 +452,7 @@ impl CryptographicProtocol for RunningState { signature_manager.handle_requests( self.threshold, &stable, + &state_views, my_requests, &mut presignature_manager, protocol_cfg, @@ -463,7 +471,7 @@ impl CryptographicProtocol for RunningState { let failures = messages .send_encrypted( ctx.me().await, - &ctx.cfg().local.network.sign_sk, + &cfg.local.network.sign_sk, ctx.http_client(), active, protocol_cfg, diff --git a/chain-signatures/node/src/protocol/message.rs b/chain-signatures/node/src/protocol/message.rs index 77ba0b2cb..d071c4f82 100644 --- a/chain-signatures/node/src/protocol/message.rs +++ b/chain-signatures/node/src/protocol/message.rs @@ -23,7 +23,7 @@ use tokio::sync::RwLock; pub trait MessageCtx { async fn me(&self) -> Participant; fn mesh(&self) -> &Mesh; - fn cfg(&self) -> &crate::config::Config; + async fn cfg(&self) -> crate::config::Config; } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] @@ -232,7 +232,7 @@ impl MessageHandler for RunningState { ctx: C, queue: &mut MpcMessageQueue, ) -> Result<(), MessageHandleError> { - let protocol_cfg = &ctx.cfg().protocol; + let protocol_cfg = &ctx.cfg().await.protocol; let participants = ctx.mesh().active_participants(); let mut triple_manager = self.triple_manager.write().await; diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index 3762c9400..8a12d1e37 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ b/chain-signatures/node/src/protocol/mod.rs @@ -50,10 +50,11 @@ struct Ctx { sign_queue: Arc>, secret_storage: SecretNodeStorageBox, triple_storage: LockTripleNodeStorageBox, - cfg: Config, + cfg: Arc>, mesh: Mesh, } +#[async_trait::async_trait] impl ConsensusCtx for &mut MpcSignProtocol { fn my_account_id(&self) -> &AccountId { &self.ctx.account_id @@ -87,8 +88,8 @@ impl ConsensusCtx for &mut MpcSignProtocol { &self.ctx.secret_storage } - fn cfg(&self) -> &Config { - &self.ctx.cfg + async fn cfg(&self) -> Config { + self.ctx.cfg.read().await.clone() } fn triple_storage(&self) -> LockTripleNodeStorageBox { @@ -122,8 +123,8 @@ impl CryptographicCtx for &mut MpcSignProtocol { &mut self.ctx.secret_storage } - fn cfg(&self) -> &Config { - &self.ctx.cfg + async fn cfg(&self) -> Config { + self.ctx.cfg.read().await.clone() } fn mesh(&self) -> &Mesh { @@ -141,8 +142,8 @@ impl MessageCtx for &MpcSignProtocol { &self.ctx.mesh } - fn cfg(&self) -> &Config { - &self.ctx.cfg + async fn cfg(&self) -> Config { + self.ctx.cfg.read().await.clone() } } @@ -164,7 +165,7 @@ impl MpcSignProtocol { sign_queue: Arc>, secret_storage: SecretNodeStorageBox, triple_storage: LockTripleNodeStorageBox, - cfg: Config, + cfg: Arc>, ) -> (Self, Arc>) { let my_address = my_address.into_url().unwrap(); let rpc_url = rpc_client.rpc_addr(); @@ -213,15 +214,17 @@ impl MpcSignProtocol { let mut last_state_update = Instant::now(); let mut last_config_update = Instant::now(); let mut last_pinged = Instant::now(); + let mut require_ping = false; // Sets the latest configurations from the contract: - if let Err(err) = self - .ctx - .cfg - .fetch_inplace(&self.ctx.rpc_client, &self.ctx.mpc_contract_id) - .await { - tracing::error!("could not fetch contract's config on startup: {err:?}"); + let mut cfg = self.ctx.cfg.write().await; + if let Err(err) = cfg + .fetch_inplace(&self.ctx.rpc_client, &self.ctx.mpc_contract_id) + .await + { + tracing::error!("could not fetch contract's config on startup: {err:?}"); + } } loop { @@ -263,6 +266,7 @@ impl MpcSignProtocol { // set which participants are currently active in the protocol and determines who will be // receiving messages. self.ctx.mesh.establish_participants(&contract_state).await; + require_ping = true; last_state_update = Instant::now(); Some(contract_state) @@ -272,9 +276,8 @@ impl MpcSignProtocol { if last_config_update.elapsed() > Duration::from_secs(5 * 60) { // Sets the latest configurations from the contract: - if let Err(err) = self - .ctx - .cfg + let mut cfg = self.ctx.cfg.write().await; + if let Err(err) = cfg .fetch_inplace(&self.ctx.rpc_client, &self.ctx.mpc_contract_id) .await { @@ -283,16 +286,19 @@ impl MpcSignProtocol { last_config_update = Instant::now(); } - if last_pinged.elapsed() > Duration::from_millis(300) { - self.ctx.mesh.ping().await; - last_pinged = Instant::now(); - } - let state = { let guard = self.state.read().await; guard.clone() }; + if require_ping || last_pinged.elapsed() > Duration::from_millis(300) { + let protocol_cfg = &self.ctx.cfg.read().await.protocol; + let planned_previews = state.plan_preview(protocol_cfg).await; + self.ctx.mesh.ping(planned_previews).await; + last_pinged = Instant::now(); + require_ping = false; + } + let crypto_time = Instant::now(); let mut state = match state.progress(&mut self).await { Ok(state) => { diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index 0859d0228..5bd306437 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -3,6 +3,7 @@ use super::triple::{Triple, TripleId, TripleManager}; use crate::protocol::contract::primitives::Participants; use crate::types::{PresignatureProtocol, SecretKeyShare}; use crate::util::AffinePointExt; +use crate::web::StateView; use cait_sith::protocol::{Action, InitializationError, Participant, ProtocolError}; use cait_sith::{KeygenOutput, PresignArguments, PresignOutput}; @@ -107,7 +108,7 @@ pub struct PresignatureManager { /// Ongoing presignature generation protocols. generators: HashMap, /// List of presignature ids generation of which was initiated by the current node. - mine: VecDeque, + pub mine: VecDeque, /// The set of presignatures that were introduced to the system by the current node. introduced: HashSet, /// Garbage collection for presignatures that have either been taken or failed. This @@ -173,7 +174,7 @@ impl PresignatureManager { #[allow(clippy::too_many_arguments)] fn generate_internal( - participants: &Participants, + participants: &[Participant], me: Participant, threshold: usize, triple0: Triple, @@ -183,13 +184,12 @@ impl PresignatureManager { mine: bool, timeout: u64, ) -> Result { - let participants: Vec<_> = participants.keys().cloned().collect(); let protocol = Box::new(cait_sith::presign( - &participants, + participants, me, // These paramaters appear to be to make it easier to use different indexing schemes for triples // Introduced in this PR https://github.com/LIT-Protocol/cait-sith/pull/7 - &participants, + participants, me, PresignArguments { triple0: (triple0.share, triple0.public), @@ -203,7 +203,7 @@ impl PresignatureManager { )?); Ok(PresignatureGenerator::new( protocol, - participants, + participants.into(), triple0.id, triple1.id, mine, @@ -214,7 +214,7 @@ impl PresignatureManager { /// Starts a new presignature generation protocol. pub fn generate( &mut self, - participants: &Participants, + participants: &[Participant], triple0: Triple, triple1: Triple, public_key: &PublicKey, @@ -260,57 +260,118 @@ impl PresignatureManager { pub async fn stockpile( &mut self, active: &Participants, + state_views: &HashMap, pk: &PublicKey, sk_share: &SecretKeyShare, triple_manager: &mut TripleManager, cfg: &ProtocolConfig, ) -> Result<(), InitializationError> { - let not_enough_presignatures = { + let enough_presignatures = { // Stopgap to prevent too many presignatures in the system. This should be around min_presig*nodes*2 // for good measure so that we have enough presignatures to do sig generation while also maintain // the minimum number of presignature where a single node can't flood the system. - if self.potential_len() >= cfg.presignature.max_presignatures as usize { + if self.potential_len() < cfg.presignature.max_presignatures as usize { false } else { // We will always try to generate a new triple if we have less than the minimum - self.my_len() < cfg.presignature.min_presignatures as usize - && self.introduced.len() < cfg.max_concurrent_introduction as usize + self.my_len() >= cfg.presignature.min_presignatures as usize + || self.introduced.len() >= cfg.max_concurrent_introduction as usize } }; - if not_enough_presignatures { - tracing::trace!("not enough presignatures, generating"); - // To ensure there is no contention between different nodes we are only using triples - // that we proposed. This way in a non-BFT environment we are guaranteed to never try - // to use the same triple as any other node. - if let Some((triple0, triple1)) = triple_manager.take_two_mine().await { - let presig_participants = active - .intersection(&[&triple0.public.participants, &triple1.public.participants]); - if presig_participants.len() < self.threshold { - tracing::debug!( - participants = ?presig_participants.keys_vec(), - "running: the intersection of participants is less than the threshold" - ); - - // Insert back the triples to be used later since this active set of - // participants were not able to make use of these triples. - triple_manager.insert_mine(triple0).await; - triple_manager.insert_mine(triple1).await; + if enough_presignatures { + return Ok(()); + } + + tracing::trace!("not enough presignatures, generating"); + // To ensure there is no contention between different nodes we are only using triples + // that we proposed. This way in a non-BFT environment we are guaranteed to never try + // to use the same triple as any other node. + let Some((triple0, triple1)) = triple_manager.peek_two_mine() else { + tracing::debug!( + triple_mine = triple_manager.my_len(), + triple_potential = triple_manager.potential_len(), + "running: we don't have enough triples to generate a presignature" + ); + return Ok(()); + }; + let id0 = triple0.id; + let id1 = triple1.id; + let presig_participants = + active.intersection(&[&triple0.public.participants, &triple1.public.participants]); + if presig_participants.len() < self.threshold { + tracing::warn!( + id0, + id1, + threshold = self.threshold, + triple0 = ?triple0.public.participants, + triple1 = ?triple1.public.participants, + active = ?active.keys_vec(), + "running: the intersection of participants is less than the threshold" + ); + return Ok(()); + } + + // Filter out the active participants with the state views that have the triples we want to use. + let active_filtered = presig_participants + .iter() + .filter_map(|(p, _)| Some((*p, state_views.get(p)?))) + .filter(|(_, state_view)| { + if let StateView::Running { + triple_postview, .. + } = state_view + { + triple_postview.contains(&triple0.id) && triple_postview.contains(&triple1.id) } else { - self.generate( - &presig_participants, - triple0, - triple1, - pk, - sk_share, - cfg.presignature.generation_timeout, - )?; + false } - } else { - tracing::debug!("running: we don't have enough triples to generate a presignature"); - } + }) + .map(|(p, _)| p) + .collect::>(); + + if active_filtered.len() < self.threshold { + tracing::warn!( + id0, + id1, + threshold = self.threshold, + triple0 = ?triple0.public.participants, + triple1 = ?triple1.public.participants, + active = ?active.keys_vec(), + ?active_filtered, + ?state_views, + "running: filtered participants are less than threshold for presignature generation" + ); + return Ok(()); } + // Actually take the triples now that we have done the necessary checks. + let Some((triple0, triple1)) = triple_manager.take_two_mine().await else { + tracing::warn!( + id0, + id1, + potential = triple_manager.potential_len(), + "running: popping after peeking should have succeeded", + ); + return Ok(()); + }; + + if let Err(err @ InitializationError::BadParameters(_)) = self.generate( + &active_filtered, + triple0, + triple1, + pk, + sk_share, + cfg.presignature.generation_timeout, + ) { + tracing::warn!( + id0, + id1, + ?err, + ?active_filtered, + "we had to trash two triples due to bad parameters" + ); + return Err(err); + } Ok(()) } @@ -382,7 +443,7 @@ impl PresignatureManager { }, }; let generator = Self::generate_internal( - participants, + &participants.keys_vec(), self.me, self.threshold, triple0, @@ -536,6 +597,14 @@ impl PresignatureManager { messages } + + pub fn preview(&self, presignatures: &HashSet) -> HashSet { + presignatures + .iter() + .filter(|id| self.presignatures.contains_key(id)) + .cloned() + .collect() + } } pub fn hash_as_id(triple0: TripleId, triple1: TripleId) -> PresignatureId { diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 19d2e4f7e..ae7b4edb0 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -5,7 +5,7 @@ use crate::indexer::ContractSignRequest; use crate::kdf::{derive_delta, into_eth_sig}; use crate::types::SignatureProtocol; use crate::util::AffinePointExt; -use near_primitives::hash::CryptoHash; +use crate::web::StateView; use cait_sith::protocol::{Action, InitializationError, Participant, ProtocolError}; use cait_sith::{FullSignature, PresignOutput}; @@ -105,12 +105,6 @@ impl SignQueue { my_account_id: &AccountId, ) { if stable.len() < threshold { - tracing::warn!( - "Require at least {} stable participants to organize, got {}: {:?}", - threshold, - stable.len(), - stable.keys_vec() - ); return; } for request in self.unorganized_requests.drain(..) { @@ -165,7 +159,7 @@ pub struct SignatureGenerator { pub presignature_id: PresignatureId, pub request: ContractSignRequest, pub epsilon: Scalar, - pub receipt_id: CryptoHash, + pub receipt_id: ReceiptId, pub entropy: [u8; 32], pub sign_request_timestamp: Instant, pub generator_timestamp: Instant, @@ -182,7 +176,7 @@ impl SignatureGenerator { presignature_id: PresignatureId, request: ContractSignRequest, epsilon: Scalar, - receipt_id: CryptoHash, + receipt_id: ReceiptId, entropy: [u8; 32], sign_request_timestamp: Instant, cfg: &ProtocolConfig, @@ -227,7 +221,7 @@ pub struct GenerationRequest { pub proposer: Participant, pub request: ContractSignRequest, pub epsilon: Scalar, - pub receipt_id: CryptoHash, + pub receipt_id: ReceiptId, pub entropy: [u8; 32], pub sign_request_timestamp: Instant, } @@ -304,14 +298,13 @@ impl SignatureManager { #[allow(clippy::too_many_arguments)] #[allow(clippy::result_large_err)] fn generate_internal( - participants: &Participants, + participants: &[Participant], me: Participant, public_key: PublicKey, presignature: Presignature, req: GenerationRequest, cfg: &ProtocolConfig, ) -> Result { - let participants = participants.keys_vec(); let GenerationRequest { proposer, request, @@ -331,7 +324,7 @@ impl SignatureManager { let presignature_id = presignature.id; let protocol = Box::new( cait_sith::sign( - &participants, + participants, me, derive_key(public_key, epsilon), output, @@ -341,7 +334,7 @@ impl SignatureManager { ); Ok(SignatureGenerator::new( protocol, - participants, + participants.into(), proposer, presignature_id, request, @@ -359,10 +352,10 @@ impl SignatureManager { receipt_id: ReceiptId, req: GenerationRequest, presignature: Presignature, - participants: &Participants, + participants: &[Participant], cfg: &ProtocolConfig, ) -> Result<(), (Presignature, InitializationError)> { - tracing::info!(receipt_id = %receipt_id, participants = ?participants.keys_vec(), "restarting failed protocol to generate signature"); + tracing::info!(%receipt_id, ?participants, "restarting failed protocol to generate signature"); let generator = Self::generate_internal( participants, self.me, @@ -383,7 +376,7 @@ impl SignatureManager { #[allow(clippy::result_large_err)] pub fn generate( &mut self, - participants: &Participants, + participants: &[Participant], receipt_id: ReceiptId, presignature: Presignature, request: ContractSignRequest, @@ -396,7 +389,7 @@ impl SignatureManager { %receipt_id, me = ?self.me, presignature_id = presignature.id, - participants = ?participants.keys_vec(), + ?participants, "starting protocol to generate a new signature", ); let generator = Self::generate_internal( @@ -465,7 +458,7 @@ impl SignatureManager { }; tracing::info!(me = ?self.me, presignature_id, "found presignature: ready to start signature generation"); let generator = match Self::generate_internal( - participants, + &participants.keys_vec(), self.me, self.public_key, presignature, @@ -608,13 +601,14 @@ impl SignatureManager { &mut self, threshold: usize, stable: &Participants, + state_views: &HashMap, my_requests: &mut ParticipantRequests, presignature_manager: &mut PresignatureManager, cfg: &ProtocolConfig, ) { if stable.len() < threshold { tracing::warn!( - "Require at least {} stable participants to handle_requests, got {}: {:?}", + "require at least {} stable participants to handle_requests, got {}: {:?}", threshold, stable.len(), stable.keys_vec() @@ -622,68 +616,114 @@ impl SignatureManager { return; } let mut failed_presigs = Vec::new(); - while let Some(mut presignature) = { + let mut alternate = false; + while let Some(presignature) = { if self.failed.is_empty() && my_requests.is_empty() { None } else { presignature_manager.take_mine() } } { + let id = presignature.id; let sig_participants = stable.intersection(&[&presignature.participants]); if sig_participants.len() < threshold { tracing::warn!( + id, + threshold, + stable = ?stable.keys_vec(), participants = ?sig_participants.keys_vec(), - "intersection of stable participants and presignature participants is less than threshold" + "intersection of stable and presignature participants is less than threshold" + ); + failed_presigs.push(presignature); + continue; + } + + // Filter out the active participants with the state views that have the triples we want to use. + let stable_filtered = sig_participants + .iter() + .filter_map(|(p, _)| Some((*p, state_views.get(p)?))) + .filter(|(_, state_view)| { + if let StateView::Running { + presignature_postview, + .. + } = state_view + { + presignature_postview.contains(&presignature.id) + } else { + false + } + }) + .map(|(p, _)| p) + .collect::>(); + + if stable_filtered.len() < threshold { + tracing::warn!( + id, + threshold, + stable = ?stable.keys_vec(), + participants = ?presignature.participants, + ?state_views, + "unable to use presignature for signature generation", ); failed_presigs.push(presignature); continue; } - let presig_id = presignature.id; // NOTE: this prioritizes old requests first then tries to do new ones if there's enough presignatures. // TODO: we need to decide how to prioritize certain requests over others such as with gas or time of // when the request made it into the NEAR network. // issue: https://github.com/near/mpc-recovery/issues/596 - if let Some((receipt_id, failed_req)) = self.failed.pop_front() { + + alternate = !alternate; + if alternate && !self.failed.is_empty() { + let Some((receipt_id, failed_req)) = self.failed.pop_front() else { + failed_presigs.push(presignature); + continue; + }; + if let Err((presignature, InitializationError::BadParameters(err))) = self .retry_failed_generation( receipt_id, failed_req, presignature, - &sig_participants, + &stable_filtered, cfg, ) { - tracing::warn!(%receipt_id, presig_id, ?err, "failed to retry signature generation: trashing presignature"); failed_presigs.push(presignature); - continue; + tracing::warn!( + %receipt_id, + id, + ?stable_filtered, + ?err, + "failed to retry signature generation: trashing presignature", + ); } - - if let Some(another_presignature) = presignature_manager.take_mine() { - presignature = another_presignature; - } else { - break; + } else { + let Some((receipt_id, my_request)) = my_requests.pop_front() else { + failed_presigs.push(presignature); + continue; + }; + if let Err((presignature, InitializationError::BadParameters(err))) = self.generate( + &stable_filtered, + receipt_id, + presignature, + my_request.request, + my_request.epsilon, + my_request.entropy, + my_request.time_added, + cfg, + ) { + failed_presigs.push(presignature); + tracing::warn!( + %receipt_id, + id, + ?stable_filtered, + ?err, + "failed to start signature generation: trashing presignature", + ); } } - - let Some((receipt_id, my_request)) = my_requests.pop_front() else { - failed_presigs.push(presignature); - continue; - }; - if let Err((presignature, InitializationError::BadParameters(err))) = self.generate( - &sig_participants, - receipt_id, - presignature, - my_request.request, - my_request.epsilon, - my_request.entropy, - my_request.time_added, - cfg, - ) { - failed_presigs.push(presignature); - tracing::warn!(%receipt_id, presig_id, ?err, "failed to start signature generation: trashing presignature"); - continue; - } } // add back the failed presignatures that were incompatible to be made into diff --git a/chain-signatures/node/src/protocol/state.rs b/chain-signatures/node/src/protocol/state.rs index 58fd4a506..c6e4b9c4c 100644 --- a/chain-signatures/node/src/protocol/state.rs +++ b/chain-signatures/node/src/protocol/state.rs @@ -1,9 +1,9 @@ use super::contract::primitives::{ParticipantInfo, Participants}; use super::cryptography::CryptographicError; use super::monitor::StuckMonitor; -use super::presignature::PresignatureManager; +use super::presignature::{PresignatureId, PresignatureManager}; use super::signature::SignatureManager; -use super::triple::TripleManager; +use super::triple::{TripleId, TripleManager}; use super::SignQueue; use crate::http_client::MessageQueue; use crate::storage::triple_storage::TripleData; @@ -11,8 +11,10 @@ use crate::types::{KeygenProtocol, ReshareProtocol, SecretKeyShare}; use cait_sith::protocol::Participant; use crypto_shared::PublicKey; +use mpc_contract::config::ProtocolConfig; use near_account_id::AccountId; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use std::fmt; use std::fmt::{Display, Formatter}; use std::sync::Arc; @@ -205,6 +207,34 @@ impl NodeState { NodeState::Joining(state) => state.participants.find_participant_info(account_id), } } + + pub async fn plan_preview( + &self, + cfg: &ProtocolConfig, + ) -> Option<(HashSet, HashSet)> { + match self { + NodeState::Running(state) => { + let triple_manager = state.triple_manager.read().await; + let triple_preview = triple_manager + .mine + .iter() + .take(cfg.triple.preview_limit as usize) + .cloned() + .collect(); + + let presignature_manager = state.presignature_manager.read().await; + let presignature_preview = presignature_manager + .mine + .iter() + .take(cfg.presignature.preview_limit as usize) + .cloned() + .collect(); + + Some((triple_preview, presignature_preview)) + } + _ => None, + } + } } fn fetch_participant<'a>( diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index ab49ba48a..3e9424f9c 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -359,8 +359,7 @@ impl TripleManager { let id1 = self.mine.pop_front()?; tracing::info!(id0, id1, me = ?self.me, "trying to take two mine triples"); - let take_two_result = self.take_two(id0, id1).await; - match take_two_result { + match self.take_two(id0, id1).await { Err(error) if matches!( error, @@ -390,6 +389,17 @@ impl TripleManager { } } + pub fn peek_two_mine(&self) -> Option<(&Triple, &Triple)> { + if self.mine.len() < 2 { + return None; + } + let id0 = self.mine.front()?; + let id1 = self.mine.get(1)?; + let triple0 = self.triples.get(id0)?; + let triple1 = self.triples.get(id1)?; + Some((triple0, triple1)) + } + pub async fn insert_mine(&mut self, triple: Triple) { tracing::trace!(id = triple.id, "inserting mine triple"); self.mine.push_back(triple.id); @@ -608,6 +618,14 @@ impl TripleManager { let _ = tokio_retry::Retry::spawn(retry_strategy, action).await; } } + + pub fn preview(&self, triples: &HashSet) -> HashSet { + triples + .iter() + .filter(|id| self.triples.contains_key(id)) + .cloned() + .collect() + } } #[cfg(test)] diff --git a/chain-signatures/node/src/web/mod.rs b/chain-signatures/node/src/web/mod.rs index c38e2b4c6..9ebf58700 100644 --- a/chain-signatures/node/src/web/mod.rs +++ b/chain-signatures/node/src/web/mod.rs @@ -1,8 +1,11 @@ mod error; use self::error::Error; +use crate::config::Config; use crate::indexer::Indexer; use crate::protocol::message::SignedMessage; +use crate::protocol::presignature::PresignatureId; +use crate::protocol::triple::TripleId; use crate::protocol::{MpcMessage, NodeState}; use crate::web::error::Result; use anyhow::Context; @@ -15,6 +18,7 @@ use mpc_keys::hpke::{self, Ciphered}; use near_primitives::types::BlockHeight; use prometheus::{Encoder, TextEncoder}; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use std::{net::SocketAddr, sync::Arc}; use tokio::sync::{mpsc::Sender, RwLock}; @@ -23,6 +27,7 @@ struct AxumState { protocol_state: Arc>, cipher_sk: hpke::SecretKey, indexer: Indexer, + config: Arc>, } pub async fn run( @@ -31,6 +36,7 @@ pub async fn run( cipher_sk: hpke::SecretKey, protocol_state: Arc>, indexer: Indexer, + config: Arc>, ) -> anyhow::Result<()> { tracing::debug!("running a node"); let axum_state = AxumState { @@ -38,6 +44,7 @@ pub async fn run( protocol_state, cipher_sk, indexer, + config, }; let app = Router::new() @@ -99,6 +106,14 @@ async fn msg( } #[derive(Debug, Serialize, Deserialize)] +pub struct StateParams { + #[serde(default)] + pub triple_preview: Vec, + #[serde(default)] + pub presignature_preview: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] #[serde(tag = "type")] #[serde(rename_all = "snake_case")] #[non_exhaustive] @@ -108,9 +123,13 @@ pub enum StateView { triple_count: usize, triple_mine_count: usize, triple_potential_count: usize, + #[serde(default, skip_serializing_if = "HashSet::is_empty")] + triple_postview: HashSet, presignature_count: usize, presignature_mine_count: usize, presignature_potential_count: usize, + #[serde(default, skip_serializing_if = "HashSet::is_empty")] + presignature_postview: HashSet, latest_block_height: BlockHeight, is_stable: bool, }, @@ -128,10 +147,14 @@ pub enum StateView { } #[tracing::instrument(level = "debug", skip_all)] -async fn state(Extension(state): Extension>) -> Result> { +async fn state( + Extension(state): Extension>, + params: Option>, +) -> Result> { tracing::trace!("fetching state"); let latest_block_height = state.indexer.latest_block_height().await; let is_stable = state.indexer.is_on_track().await; + let config = state.config.read().await; let protocol_state = state.protocol_state.read().await; match &*protocol_state { @@ -146,14 +169,38 @@ async fn state(Extension(state): Extension>) -> Result anyhow::Result<()> { let vote_futures = accounts .iter() .map(|account| { - tracing::info!( - "{} voting for new participant: {}", - account.id(), - account_id - ); + tracing::info!("{} voting for new participant: {}", account.id(), candidate); account .call(mpc_contract, "vote_join") .args_json(serde_json::json!({ - "candidate": account_id + "candidate": candidate })) .transact() }) .collect::>(); - futures::future::join_all(vote_futures) + let successes = futures::future::join_all(vote_futures) .await .iter() - .for_each(|result| { - assert!(result.as_ref().unwrap().failures().is_empty()); + .fold(0, |acc, next| { + let outcome = next.as_ref().unwrap(); + if outcome.is_failure() { + tracing::error!("voting failed: {:?}", outcome); + } + + let val = outcome.is_success() as usize; + acc + val }); + assert!( + successes >= threshold, + "Voting did not pass the threshold={} for new participant to join", + threshold + ); Ok(()) } diff --git a/integration-tests/chain-signatures/tests/cases/mod.rs b/integration-tests/chain-signatures/tests/cases/mod.rs index 2cb2cad70..2476b9743 100644 --- a/integration-tests/chain-signatures/tests/cases/mod.rs +++ b/integration-tests/chain-signatures/tests/cases/mod.rs @@ -28,7 +28,7 @@ async fn test_multichain_reshare() -> anyhow::Result<()> { actions::single_signature_production(&ctx, &state).await?; tracing::info!("!!! Add participant 3"); - assert!(ctx.add_participant(None).await.is_ok()); + ctx.add_participant(None).await.unwrap(); let state = wait_for::running_mpc(&ctx, None).await?; wait_for::has_at_least_triples(&ctx, 2).await?; wait_for::has_at_least_presignatures(&ctx, 2).await?; @@ -39,7 +39,7 @@ async fn test_multichain_reshare() -> anyhow::Result<()> { state.participants.keys().nth(2).unwrap().clone().as_ref(), ) .unwrap(); - assert!(ctx.remove_participant(Some(&account_2)).await.is_ok()); + ctx.remove_participant(Some(&account_2)).await.unwrap(); let account_0 = near_workspaces::types::AccountId::from_str( state.participants.keys().next().unwrap().clone().as_ref(), ) @@ -63,7 +63,7 @@ async fn test_multichain_reshare() -> anyhow::Result<()> { actions::single_signature_production(&ctx, &state).await?; tracing::info!("!!! Add back participant 0"); - assert!(ctx.add_participant(Some(node_cfg_0)).await.is_ok()); + ctx.add_participant(Some(node_cfg_0)).await.unwrap(); let state = wait_for::running_mpc(&ctx, None).await?; wait_for::has_at_least_triples(&ctx, 2).await?; wait_for::has_at_least_presignatures(&ctx, 2).await?; diff --git a/integration-tests/chain-signatures/tests/lib.rs b/integration-tests/chain-signatures/tests/lib.rs index 74fab5526..9323a6e72 100644 --- a/integration-tests/chain-signatures/tests/lib.rs +++ b/integration-tests/chain-signatures/tests/lib.rs @@ -61,7 +61,7 @@ impl MultichainTestContext<'_> { self.nodes.start_node(&self.cfg, &node_account).await?; // Wait for new node to add itself as a candidate - tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; // T number of participants should vote let participants = self.participant_accounts().await?; @@ -70,13 +70,14 @@ impl MultichainTestContext<'_> { .take(state.threshold) .cloned() .collect::>(); - assert!(vote_join( + vote_join( &voting_participants, self.contract().id(), node_account.id(), + self.cfg.threshold, ) .await - .is_ok()); + .unwrap(); let new_state = wait_for::running_mpc(self, Some(state.epoch + 1)).await?; assert_eq!(new_state.participants.len(), state.participants.len() + 1);