From 108523845ab3156c749c76888e9cace5604eb008 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ceyhun=20=C5=9Een?= Date: Thu, 9 Jan 2025 16:01:23 +0300 Subject: [PATCH 1/2] Parallelism for deposits in aggregator (#391) * WIP * aggregator: Fix first part of errors. * aggregator: Turn tasks return type into result. * Make it compile (#393) * aggregator: Fix error handlings for the new tasks. * aggregator: Remove unwraps. * aggregator: Spawn deposit tasks in a thread. * aggregator: Don't spawn threads for distributor tasks. * aggregator: Update comments. --------- Co-authored-by: Ekrem BAL Co-authored-by: Roman --- core/src/errors.rs | 12 +- core/src/rpc/aggregator.rs | 543 ++++++++++++++++++++++++------------- core/src/rpc/operator.rs | 2 +- core/tests/rpc.rs | 55 ---- 4 files changed, 365 insertions(+), 247 deletions(-) diff --git a/core/src/errors.rs b/core/src/errors.rs index ecddf86d..0e99857c 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -88,12 +88,18 @@ pub enum BridgeError { /// Merkle Proof Error #[error("MerkleProofError")] MerkleProofError, - /// JSON RPC call failed + #[error("JsonRpcError: {0}")] JsonRpcError(#[from] jsonrpsee::core::client::Error), - /// RPC interface requires a parameter #[error("RPC function field {0} is required!")] - RPCRequiredFieldError(&'static str), + RPCRequiredParam(&'static str), + #[error("RPC function parameter {0} is malformed: {1}")] + RPCParamMalformed(&'static str, String), + #[error("RPC stream ended unexpectedly: {0}")] + RPCStreamEndedUnexpectedly(String), + #[error("Invalid response from an RPC endpoint: {0}")] + RPCInvalidResponse(String), + /// ConfigError is returned when the configuration is invalid #[error("ConfigError: {0}")] ConfigError(String), diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index 6b23b025..f89b0826 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -1,6 +1,8 @@ use super::clementine::{ - clementine_aggregator_server::ClementineAggregator, DepositParams, Empty, RawSignedMoveTx, + clementine_aggregator_server::ClementineAggregator, verifier_deposit_finalize_params, + DepositParams, Empty, RawSignedMoveTx, VerifierDepositFinalizeParams, }; +use crate::rpc::clementine::clementine_verifier_client::ClementineVerifierClient; use crate::{ aggregator::Aggregator, builder::sighash::{calculate_num_required_sigs, create_nofn_sighash_stream}, @@ -9,10 +11,225 @@ use crate::{ rpc::clementine::{self, DepositSignSession}, ByteArray32, ByteArray66, EVMAddress, }; -use bitcoin::{hashes::Hash, Amount}; -use futures::{future::try_join_all, stream::BoxStream, FutureExt, StreamExt}; -use std::pin::pin; -use tonic::{async_trait, Request, Response, Status}; +use bitcoin::{hashes::Hash, Amount, TapSighash}; +use futures::{future::try_join_all, stream::BoxStream, FutureExt, Stream, StreamExt}; +use std::thread; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tonic::{async_trait, Request, Response, Status, Streaming}; + +struct AggNonceQueueItem { + agg_nonce: ByteArray66, + sighash: TapSighash, +} + +struct FinalSigQueueItem { + final_sig: Vec, +} + +/// Collects public nonces from given streams and aggregates them. +async fn nonce_aggregator( + mut nonce_streams: Vec< + impl Stream> + Unpin + Send + 'static, + >, + mut sighash_stream: impl Stream> + Unpin + Send + 'static, + agg_nonce_sender: Sender, +) -> Result<(), BridgeError> { + while let Ok(sighash) = sighash_stream.next().await.transpose() { + let pub_nonces = try_join_all(nonce_streams.iter_mut().map(|s| async { + s.next().await.ok_or_else(|| { + BridgeError::RPCStreamEndedUnexpectedly("Not enough nonces".into()) + })? + })) + .await?; + + let agg_nonce = aggregate_nonces(pub_nonces); + + agg_nonce_sender + .send(AggNonceQueueItem { + agg_nonce, + sighash: sighash.ok_or(BridgeError::RPCStreamEndedUnexpectedly( + "Not enough sighashes".into(), + ))?, + }) + .await + .map_err(|e| { + BridgeError::RPCStreamEndedUnexpectedly(format!( + "Can't send aggregated nonces: {}", + e + )) + })?; + } + + Ok(()) +} + +/// Reroutes aggregated nonces to the signature aggregator. +async fn nonce_distributor( + mut agg_nonce_receiver: Receiver, + mut partial_sig_streams: Vec<( + Streaming, + Sender, + )>, + partial_sig_sender: Sender<(Vec, AggNonceQueueItem)>, +) -> Result<(), BridgeError> { + while let Some(queue_item) = agg_nonce_receiver.recv().await { + let agg_nonce_wrapped = clementine::VerifierDepositSignParams { + params: Some(clementine::verifier_deposit_sign_params::Params::AggNonce( + queue_item.agg_nonce.0.to_vec(), + )), + }; + + for (_, tx) in partial_sig_streams.iter_mut() { + tx.send(agg_nonce_wrapped.clone()).await.map_err(|e| { + BridgeError::RPCStreamEndedUnexpectedly(format!( + "Can't send aggregated nonces: {}", + e + )) + })?; + } + + let partial_sigs = try_join_all(partial_sig_streams.iter_mut().map(|(stream, _)| async { + let partial_sig = stream + .message() + .await? + .ok_or(BridgeError::Error("No partial sig received".into()))?; + + Ok::<_, BridgeError>(ByteArray32(partial_sig.partial_sig.try_into().unwrap())) + })) + .await?; + + partial_sig_sender + .send((partial_sigs, queue_item)) + .await + .map_err(|e| { + BridgeError::RPCStreamEndedUnexpectedly(format!("Can't send partial sigs: {}", e)) + })?; + } + + Ok(()) +} + +/// Collects partial signatures from given stream and aggregates them. +async fn signature_aggregator( + mut partial_sig_receiver: Receiver<(Vec, AggNonceQueueItem)>, + verifiers_public_keys: Vec, + final_sig_sender: Sender, +) -> Result<(), BridgeError> { + while let Some((partial_sigs, queue_item)) = partial_sig_receiver.recv().await { + let final_sig = crate::musig2::aggregate_partial_signatures( + verifiers_public_keys.clone(), + None, + false, + &queue_item.agg_nonce, + partial_sigs, + ByteArray32(queue_item.sighash.to_byte_array()), + )?; + + final_sig_sender + .send(FinalSigQueueItem { + final_sig: final_sig.to_vec(), + }) + .await + .map_err(|e| { + BridgeError::RPCStreamEndedUnexpectedly(format!("Can't send final sigs: {}", e)) + })?; + } + + Ok(()) +} + +/// Reroutes aggregated signatures to the caller. +async fn signature_distributor( + mut final_sig_receiver: Receiver, + deposit_finalize_sender: Vec>, +) -> Result<(), BridgeError> { + while let Some(queue_item) = final_sig_receiver.recv().await { + let final_params = VerifierDepositFinalizeParams { + params: Some(verifier_deposit_finalize_params::Params::SchnorrSig( + queue_item.final_sig, + )), + }; + + for tx in &deposit_finalize_sender { + tx.send(final_params.clone()).await.map_err(|e| { + BridgeError::RPCStreamEndedUnexpectedly(format!("Can't send final params: {}", e)) + })?; + } + } + + Ok(()) +} + +/// Creates a stream of nonces from verifiers. +/// This will automatically get's the first response from the verifiers. +/// +/// # Returns +/// +/// - Vec<[`clementine::NonceGenFirstResponse`]>: First response from each verifier +/// - Vec>>: Stream of nonces from each verifier +async fn create_nonce_streams( + verifier_clients: Vec>, + num_nonces: u32, +) -> Result< + ( + Vec, + Vec>>, + ), + BridgeError, +> { + let mut nonce_streams = try_join_all(verifier_clients.into_iter().map(|client| { + let mut client = client.clone(); + + async move { + let response_stream = client + .nonce_gen(tonic::Request::new(clementine::NonceGenRequest { + num_nonces, + })) + .await?; + + Ok::<_, Status>(response_stream.into_inner()) + } + })) + .await?; + + // Get the first responses. + let first_responses: Vec = + try_join_all(nonce_streams.iter_mut().map(|stream| async { + let nonce_gen_first_response = stream + .message() + .await? + .ok_or(BridgeError::RPCStreamEndedUnexpectedly( + "NonceGen returns nothing".to_string(), + ))? + .response + .ok_or(BridgeError::RPCStreamEndedUnexpectedly( + "NonceGen response field is empty".to_string(), + ))?; + + if let clementine::nonce_gen_response::Response::FirstResponse( + nonce_gen_first_response, + ) = nonce_gen_first_response + { + Ok(nonce_gen_first_response) + } else { + Err(BridgeError::RPCInvalidResponse( + "NonceGen response is not FirstResponse".to_string(), + )) + } + })) + .await?; + + let transformed_streams: Vec>> = nonce_streams + .into_iter() + .map(|stream| { + stream + .map(|result| Aggregator::extract_pub_nonce(result?.response)) + .boxed() + }) + .collect(); + + Ok((first_responses, transformed_streams)) +} impl Aggregator { // Extracts pub_nonce from given stream. @@ -31,75 +248,6 @@ impl Aggregator { )), } } - - /// Creates a stream of nonces from verifiers. - /// This will automatically get's the first response from the verifiers. - async fn create_nonce_streams( - &self, - num_nonces: u32, - ) -> Result< - ( - Vec, - Vec>>, - ), - BridgeError, - > { - // generate nonces from all verifiers - let mut nonce_streams = try_join_all(self.verifier_clients.iter().map(|client| { - // Clone each client to avoid mutable borrow. - // https://github.com/hyperium/tonic/issues/33#issuecomment-538150828 - let mut client = client.clone(); - - async move { - let response_stream = client - .nonce_gen(tonic::Request::new(clementine::NonceGenRequest { - num_nonces, - })) - .await?; - - Ok::<_, Status>(response_stream.into_inner()) - } - })) - .await?; - - // Get the first responses from each stream - let first_responses: Vec = - try_join_all(nonce_streams.iter_mut().map(|s| async { - let nonce_gen_first_response = s.message().await?; - let nonce_gen_first_response = nonce_gen_first_response - .ok_or(BridgeError::Error("NonceGen returns nothing".to_string()))? - .response - .ok_or(BridgeError::Error( - "NonceGen response field is empty".to_string(), - ))?; - // this response is an enum, so we need to match on it - match nonce_gen_first_response { - clementine::nonce_gen_response::Response::FirstResponse( - nonce_gen_first_response, - ) => Ok(nonce_gen_first_response), - _ => Err(BridgeError::Error( - "NonceGen response is not FirstResponse".to_string(), - )), - } - })) - .await?; - - let transformed_streams: Vec>> = nonce_streams - .into_iter() - .map(|stream| { - stream - .map(|result| { - result - .map_err(BridgeError::from) - .and_then(|nonce_gen_response| { - Self::extract_pub_nonce(nonce_gen_response.response) - }) - }) - .boxed() - }) - .collect(); - Ok((first_responses, transformed_streams)) - } } #[async_trait] @@ -189,7 +337,7 @@ impl ClementineAggregator for Aggregator { Ok(Response::new(Empty {})) } - // #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] + #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] async fn new_deposit( &self, deposit_params_req: Request, @@ -202,46 +350,50 @@ impl ClementineAggregator for Aggregator { .clone() .deposit_outpoint .ok_or(Status::internal("No deposit outpoint received")) - .unwrap() + .clone()? + .try_into()?; + let evm_address: EVMAddress = deposit_params + .evm_address + .clone() .try_into() - .unwrap(); - let evm_address: EVMAddress = deposit_params.clone().evm_address.try_into().unwrap(); + .map_err(|e: &str| BridgeError::RPCParamMalformed("evm_address", e.to_string()))?; let recovery_taproot_address = deposit_params .clone() .recovery_taproot_address + .clone() .parse::>() - .unwrap(); - let user_takes_after = deposit_params.clone().user_takes_after; + .map_err(|e| { + BridgeError::RPCParamMalformed("recovery_taproot_address", e.to_string()) + })?; + let user_takes_after = deposit_params.user_takes_after; let verifiers_public_keys = self.config.verifiers_public_keys.clone(); tracing::debug!("Parsed deposit params"); - // generate nonces from all verifiers + // Generate nonce streams for all verifiers. let num_required_sigs = calculate_num_required_sigs( self.config.num_operators, self.config.num_time_txs, self.config.num_watchtowers, ); - let (first_responses, mut nonce_streams) = - self.create_nonce_streams(num_required_sigs as u32).await?; + let (first_responses, nonce_streams) = + create_nonce_streams(self.verifier_clients.clone(), num_required_sigs as u32).await?; - // Open the streams for deposit_sign for each verifier - let mut partial_sig_streams = try_join_all(self.verifier_clients.iter().map(|v| { - let mut client = v.clone(); // Clone each client to avoid mutable borrow - // https://github.com/hyperium/tonic/issues/33#issuecomment-538150828 - async move { - let (tx, rx) = tokio::sync::mpsc::channel(1280); - let receiver_stream = tokio_stream::wrappers::ReceiverStream::new(rx); - // let x = tokio_stream::iter(1..usize::MAX).map(|i| i.to_string()); - let stream = client.deposit_sign(receiver_stream).await?; - Ok::<_, Status>((stream.into_inner(), tx)) - // Return the stream - } - })) - .await - .map_err(|e| { - Status::internal(format!("Failed to generate partial sig streams: {:?}", e)) - })?; + // Open the deposit signing streams for each verifier. + let mut partial_sig_streams = + try_join_all(self.verifier_clients.iter().map(|verifier_client| { + let mut verifier_client = verifier_client.clone(); + + async move { + let (tx, rx) = tokio::sync::mpsc::channel(1280); + let stream = verifier_client + .deposit_sign(tokio_stream::wrappers::ReceiverStream::new(rx)) + .await?; + + Ok::<_, Status>((stream.into_inner(), tx)) + } + })) + .await?; tracing::debug!("Generated partial sig streams"); @@ -262,26 +414,24 @@ impl ClementineAggregator for Aggregator { ), }) .await - .unwrap(); + .map_err(|e| { + Status::internal(format!("Failed to send deposit sign session: {:?}", e)) + })?; } let mut deposit_finalize_clients = self.verifier_clients.clone(); - // Open the streams for deposit_finalize - let deposit_finalize_streams = try_join_all(deposit_finalize_clients.iter_mut().map(|v| { - async move { + let deposit_finalize_streams = try_join_all(deposit_finalize_clients.iter_mut().map( + |verifier_client| async move { let (tx, rx) = tokio::sync::mpsc::channel(1280); let receiver_stream = tokio_stream::wrappers::ReceiverStream::new(rx); - // Move `client` into this async block and use it directly - let deposit_finalize_futures = v.deposit_finalize(receiver_stream).boxed(); + let deposit_finalize_futures = + verifier_client.deposit_finalize(receiver_stream).boxed(); Ok::<_, Status>((deposit_finalize_futures, tx)) - } - })) - .await - .map_err(|e| { - Status::internal(format!("Failed to generate partial sig streams: {:?}", e)) - })?; + }, + )) + .await?; let (mut deposit_finalize_futures, deposit_finalize_sender): (Vec<_>, Vec<_>) = deposit_finalize_streams.into_iter().unzip(); @@ -294,10 +444,17 @@ impl ClementineAggregator for Aggregator { }; for tx in deposit_finalize_sender.iter() { - tx.send(deposit_finalize_first_param.clone()).await.unwrap(); + tx.send(deposit_finalize_first_param.clone()) + .await + .map_err(|e| { + Status::internal(format!( + "Failed to send deposit finalize first param: {:?}", + e + )) + })?; } - let mut sighash_stream = pin!(create_nofn_sighash_stream( + let sighash_stream = create_nofn_sighash_stream( self.db.clone(), self.config.clone(), deposit_outpoint, @@ -310,82 +467,46 @@ impl ClementineAggregator for Aggregator { 100, self.config.bridge_amount_sats, self.config.network, - )); - - let num_required_sigs = calculate_num_required_sigs( - self.config.num_operators, - self.config.num_time_txs, - self.config.num_watchtowers, ); - - for _ in 0..num_required_sigs { - // Get the next nonce from each stream - let pub_nonces = try_join_all(nonce_streams.iter_mut().map(|s| async { - s.next() - .await - .ok_or_else(|| Status::internal("Stream ended unexpectedly"))? // Handle if stream ends early - .map_err(|e| Status::internal(format!("Failed to get nonce: {:?}", e))) - // Handle if there's an error in the stream item - })) - .await?; - - tracing::debug!("RECEIVED PUB NONCES: {:?}", pub_nonces); - - // Aggregate the nonces - let agg_nonce = aggregate_nonces(pub_nonces); - - let agg_nonce_wrapped = clementine::VerifierDepositSignParams { - params: Some(clementine::verifier_deposit_sign_params::Params::AggNonce( - agg_nonce.0.to_vec(), - )), - }; - - // Send the aggregated nonce to each verifier - for (_, tx) in partial_sig_streams.iter_mut() { - tx.send(agg_nonce_wrapped.clone()).await.unwrap(); - } - - // Get the partial signatures from each verifier - let partial_sigs = try_join_all(partial_sig_streams.iter_mut().map(|(s, _)| async { - let partial_sig = s.message().await?; - let partial_sig = partial_sig.ok_or(Status::internal("No partial sig received"))?; - Ok::<_, Box>(ByteArray32( - partial_sig.partial_sig.try_into().unwrap(), + let sighash_stream = Box::pin(sighash_stream); + + let (agg_nonce_sender, agg_nonce_receiver) = channel(32); + let (partial_sig_sender, partial_sig_receiver) = channel(32); + let (final_sig_sender, final_sig_receiver) = channel(32); + + // Spawn all pipeline tasks + let nonce_agg_handle = thread::spawn(|| { + tokio::runtime::Runtime::new() + .unwrap() + .block_on(nonce_aggregator( + nonce_streams, + sighash_stream, + agg_nonce_sender, )) - })) - .await - .map_err(|e| Status::internal(format!("Failed to get partial sig: {:?}", e)))?; - - tracing::trace!("Received partial sigs: {:?}", partial_sigs); - - let sighash = sighash_stream.next().await.unwrap().unwrap(); - - tracing::debug!("Aggregator found sighash: {:?}", sighash); - - let final_sig = crate::musig2::aggregate_partial_signatures( - verifiers_public_keys.clone(), - None, - false, - &agg_nonce, - partial_sigs, - ByteArray32(sighash.to_byte_array()), - ) - .unwrap(); + }); + let nonce_dist_handle = tokio::spawn(nonce_distributor( + agg_nonce_receiver, + partial_sig_streams, + partial_sig_sender, + )); + let sig_agg_handle = thread::spawn(|| { + tokio::runtime::Runtime::new() + .unwrap() + .block_on(signature_aggregator( + partial_sig_receiver, + verifiers_public_keys, + final_sig_sender, + )) + }); + let sig_dist_handle = tokio::spawn(signature_distributor( + final_sig_receiver, + deposit_finalize_sender, + )); - tracing::debug!("Final signature: {:?}", final_sig); - - for tx in deposit_finalize_sender.iter() { - tx.send(clementine::VerifierDepositFinalizeParams { - params: Some( - clementine::verifier_deposit_finalize_params::Params::SchnorrSig( - final_sig.to_vec(), - ), - ), - }) - .await - .unwrap(); - } - } + nonce_agg_handle.join().unwrap().unwrap(); + try_join_all(vec![nonce_dist_handle]).await.unwrap(); + sig_agg_handle.join().unwrap().unwrap(); + try_join_all(vec![sig_dist_handle]).await.unwrap(); tracing::debug!("Waiting for deposit finalization"); @@ -405,12 +526,15 @@ impl ClementineAggregator for Aggregator { #[cfg(test)] mod tests { + use bitcoin::Txid; + use crate::{ config::BridgeConfig, create_test_config_with_thread_name, database::Database, errors::BridgeError, initialize_database, + rpc::clementine::DepositParams, servers::{ create_aggregator_grpc_server, create_operator_grpc_server, create_verifier_grpc_server, create_watchtower_grpc_server, @@ -424,7 +548,7 @@ mod tests { verifier::Verifier, watchtower::Watchtower, }; - use std::{env, thread}; + use std::{env, str::FromStr, thread}; #[tokio::test] #[serial_test::serial] @@ -490,4 +614,47 @@ mod tests { verifier_wpks ); } + + #[tokio::test] + #[serial_test::serial] + async fn aggregator_setup_and_deposit() { + let mut config = create_test_config_with_thread_name!(None); + + // Change default values for making the test faster. + config.num_time_txs = 1; + config.num_operators = 1; + config.num_verifiers = 1; + config.num_watchtowers = 1; + + let aggregator = create_actors!(config).2; + let mut aggregator_client = + ClementineAggregatorClient::connect(format!("http://{}", aggregator.0)) + .await + .unwrap(); + + aggregator_client + .setup(tonic::Request::new(clementine::Empty {})) + .await + .unwrap(); + + aggregator_client + .new_deposit(DepositParams { + deposit_outpoint: Some( + bitcoin::OutPoint { + txid: Txid::from_str( + "17e3fc7aae1035e77a91e96d1ba27f91a40a912cf669b367eb32c13a8f82bb02", + ) + .unwrap(), + vout: 0, + } + .into(), + ), + evm_address: [1u8; 20].to_vec(), + recovery_taproot_address: + "tb1pk8vus63mx5zwlmmmglq554kwu0zm9uhswqskxg99k66h8m3arguqfrvywa".to_string(), + user_takes_after: 5, + }) + .await + .unwrap(); + } } diff --git a/core/src/rpc/operator.rs b/core/src/rpc/operator.rs index 04604726..4d382ee3 100644 --- a/core/src/rpc/operator.rs +++ b/core/src/rpc/operator.rs @@ -99,7 +99,7 @@ impl ClementineOperator for Operator { .get_ref() .deposit_outpoint .clone() - .ok_or(BridgeError::RPCRequiredFieldError("deposit_outpoint"))? + .ok_or(BridgeError::RPCRequiredParam("deposit_outpoint"))? .try_into()?; self.withdrawal_proved_on_citrea(withdrawal_idx, deposit_outpoint) diff --git a/core/tests/rpc.rs b/core/tests/rpc.rs index 5818c140..fa65b4f1 100644 --- a/core/tests/rpc.rs +++ b/core/tests/rpc.rs @@ -2,63 +2,8 @@ //! //! This tests checks if typical RPC flows works or not. -use bitcoin::Txid; -use clementine_core::errors::BridgeError; -use clementine_core::extended_rpc::ExtendedRpc; -use clementine_core::rpc::clementine::clementine_aggregator_client::ClementineAggregatorClient; -use clementine_core::rpc::clementine::{self, DepositParams}; -use clementine_core::servers::{ - create_aggregator_grpc_server, create_operator_grpc_server, create_verifier_grpc_server, - create_watchtower_grpc_server, -}; -use clementine_core::{config::BridgeConfig, database::Database, utils::initialize_logger}; -use std::str::FromStr; -use std::{env, thread}; - mod common; -#[tokio::test] -async fn aggregator_setup_and_deposit() { - let mut config = create_test_config_with_thread_name!(None); - - // Change default values for making the test faster. - config.num_time_txs = 1; - config.num_operators = 1; - config.num_verifiers = 1; - config.num_watchtowers = 1; - - let aggregator = create_actors!(config).2; - let mut aggregator_client = - ClementineAggregatorClient::connect(format!("http://{}", aggregator.0)) - .await - .unwrap(); - - aggregator_client - .setup(tonic::Request::new(clementine::Empty {})) - .await - .unwrap(); - - aggregator_client - .new_deposit(DepositParams { - deposit_outpoint: Some( - bitcoin::OutPoint { - txid: Txid::from_str( - "17e3fc7aae1035e77a91e96d1ba27f91a40a912cf669b367eb32c13a8f82bb02", - ) - .unwrap(), - vout: 0, - } - .into(), - ), - evm_address: [1u8; 20].to_vec(), - recovery_taproot_address: - "tb1pk8vus63mx5zwlmmmglq554kwu0zm9uhswqskxg99k66h8m3arguqfrvywa".to_string(), - user_takes_after: 5, - }) - .await - .unwrap(); -} - // #[ignore = "We are switching to gRPC"] // #[tokio::test] // #[serial_test::serial] From 32a0ac6789a5c9cb0d7f120c93c5d0dabfb571bd Mon Sep 17 00:00:00 2001 From: atacann <111396231+atacann@users.noreply.github.com> Date: Thu, 9 Jan 2025 14:25:57 +0100 Subject: [PATCH 2/2] disprove tx (#388) * fix vout of operator challenge nack txin and some typos * fix some typos * remove unnecessary parameter * implement disprove tx * fix formatting * add nofn sigs of disprove to stream * implement challenge tx * add nofn sighash of challenge tx * fix formatting * fix prevout script bug in challenge tx * Implement happy reimburse tx * add nofn sighash of happy reimburse * Moved signing functions used from actor to builder/sighash.rs * remove tracing * function name refactor * fix sighash * reimburse to reimbursement_address instead * fix function calls in sighash * fix happy reimburse * add other sighash types --- core/src/builder/sighash.rs | 366 +++++++++++++++++++++----------- core/src/builder/transaction.rs | 192 ++++++++++++++++- core/src/constants.rs | 2 +- 3 files changed, 428 insertions(+), 132 deletions(-) diff --git a/core/src/builder/sighash.rs b/core/src/builder/sighash.rs index 88d7db81..254372a7 100644 --- a/core/src/builder/sighash.rs +++ b/core/src/builder/sighash.rs @@ -1,9 +1,12 @@ +use crate::builder::transaction::TxHandler; use crate::config::BridgeConfig; use crate::constants::NUM_INTERMEDIATE_STEPS; use crate::errors::BridgeError; -use crate::{actor::Actor, builder, database::Database, EVMAddress}; +use crate::{builder, database::Database, EVMAddress}; use async_stream::try_stream; -use bitcoin::{address::NetworkUnchecked, Address, Amount, OutPoint}; +use bitcoin::sighash::SighashCache; +use bitcoin::taproot::LeafVersion; +use bitcoin::{address::NetworkUnchecked, Address, Amount, OutPoint, TapLeafHash, TapSighashType}; use bitcoin::{TapSighash, Txid}; use futures_core::stream::Stream; @@ -15,6 +18,67 @@ pub fn calculate_num_required_sigs( num_operators * num_time_txs * (1 + 3 * num_watchtowers + 1) } +pub fn convert_tx_to_pubkey_spend( + tx_handler: &mut TxHandler, + txin_index: usize, + sighash_type: Option, +) -> Result { + let mut sighash_cache: SighashCache<&mut bitcoin::Transaction> = + SighashCache::new(&mut tx_handler.tx); + let prevouts = &match sighash_type { + Some(TapSighashType::SinglePlusAnyoneCanPay) + | Some(TapSighashType::AllPlusAnyoneCanPay) + | Some(TapSighashType::NonePlusAnyoneCanPay) => { + bitcoin::sighash::Prevouts::One(txin_index, tx_handler.prevouts[txin_index].clone()) + } + _ => bitcoin::sighash::Prevouts::All(&tx_handler.prevouts), + }; + + let sig_hash = sighash_cache.taproot_key_spend_signature_hash( + txin_index, + prevouts, + sighash_type.unwrap_or(TapSighashType::Default), + )?; + + Ok(sig_hash) +} + +pub fn convert_tx_to_script_spend( + tx_handler: &mut TxHandler, + txin_index: usize, + script_index: usize, + sighash_type: Option, +) -> Result { + let mut sighash_cache: SighashCache<&mut bitcoin::Transaction> = + SighashCache::new(&mut tx_handler.tx); + + let prevouts = &match sighash_type { + Some(TapSighashType::SinglePlusAnyoneCanPay) + | Some(TapSighashType::AllPlusAnyoneCanPay) + | Some(TapSighashType::NonePlusAnyoneCanPay) => { + bitcoin::sighash::Prevouts::One(txin_index, tx_handler.prevouts[txin_index].clone()) + } + _ => bitcoin::sighash::Prevouts::All(&tx_handler.prevouts), + }; + let leaf_hash = TapLeafHash::from_script( + tx_handler + .scripts + .get(txin_index) + .ok_or(BridgeError::NoScriptsForTxIn(txin_index))? + .get(script_index) + .ok_or(BridgeError::NoScriptAtIndex(script_index))?, + LeafVersion::TapScript, + ); + let sig_hash = sighash_cache.taproot_script_spend_signature_hash( + txin_index, + prevouts, + leaf_hash, + sighash_type.unwrap_or(TapSighashType::Default), + )?; + + Ok(sig_hash) +} + /// First iterate over operators /// For each operator, iterate over time txs /// For each time tx, create kickoff txid @@ -38,149 +102,209 @@ pub fn create_nofn_sighash_stream( network: bitcoin::Network, ) -> impl Stream> { try_stream! { - let move_txid = builder::transaction::create_move_tx( - deposit_outpoint, - nofn_xonly_pk, - bridge_amount_sats, - network, - ) - .compute_txid(); - - let operators: Vec<(secp256k1::XOnlyPublicKey, bitcoin::Address, Txid)> = - db.get_operators(None).await?; - if operators.len() < config.num_operators { - panic!("Not enough operators"); - } + let move_txid = builder::transaction::create_move_tx( + deposit_outpoint, + nofn_xonly_pk, + bridge_amount_sats, + network, + ) + .compute_txid(); - for (operator_idx, (operator_xonly_pk, _operator_reimburse_address, collateral_funding_txid)) in - operators.iter().enumerate() - { - // Get watchtower Winternitz pubkeys for this operator. - let watchtower_challenge_wotss = (0..config.num_watchtowers) - .map(|i| db.get_watchtower_winternitz_public_keys(None, i as u32, operator_idx as u32)) - .collect::>(); - let watchtower_challenge_wotss = - futures::future::try_join_all(watchtower_challenge_wotss).await?; - - let mut input_txid = *collateral_funding_txid; - let mut input_amount = collateral_funding_amount; - - for time_tx_idx in 0..config.num_time_txs { - let time_txid = builder::transaction::create_time_tx( - *operator_xonly_pk, - input_txid, - input_amount, - timeout_block_count, - max_withdrawal_time_block_count, - network, - ) - .compute_txid(); - - let kickoff_txid = builder::transaction::create_kickoff_tx( - time_txid, - nofn_xonly_pk, - *operator_xonly_pk, - move_txid, - operator_idx, - network, - ) - .compute_txid(); + let operators: Vec<(secp256k1::XOnlyPublicKey, bitcoin::Address, Txid)> = + db.get_operators(None).await?; + if operators.len() < config.num_operators { + panic!("Not enough operators"); + } - let watchtower_wots = (0..config.num_watchtowers) - .map(|i| watchtower_challenge_wotss[i][time_tx_idx].clone()) + for (operator_idx, (operator_xonly_pk, _operator_reimburse_address, collateral_funding_txid)) in + operators.iter().enumerate() + { + // Get watchtower Winternitz pubkeys for this operator. + let watchtower_challenge_wotss = (0..config.num_watchtowers) + .map(|i| db.get_watchtower_winternitz_public_keys(None, i as u32, operator_idx as u32)) .collect::>(); + let watchtower_challenge_wotss = + futures::future::try_join_all(watchtower_challenge_wotss).await?; - let mut watchtower_challenge_page_tx_handler = - builder::transaction::create_watchtower_challenge_page_txhandler( - kickoff_txid, + let mut input_txid = *collateral_funding_txid; + let mut input_amount = collateral_funding_amount; + + for time_tx_idx in 0..config.num_time_txs { + let time_txid = builder::transaction::create_time_tx( + *operator_xonly_pk, + input_txid, + input_amount, + timeout_block_count, + max_withdrawal_time_block_count, + network, + ) + .compute_txid(); + + let kickoff_txid = builder::transaction::create_kickoff_tx( + time_txid, nofn_xonly_pk, - config.num_watchtowers as u32, - watchtower_wots.clone(), + *operator_xonly_pk, + move_txid, + operator_idx, network, + ) + .compute_txid(); + + let mut challenge_tx = builder::transaction::create_challenge_txhandler( + kickoff_txid, + nofn_xonly_pk, + *operator_xonly_pk, + _operator_reimburse_address, + network ); - yield Actor::convert_tx_to_sighash_pubkey_spend( - &mut watchtower_challenge_page_tx_handler, - 0, - )?; + yield convert_tx_to_pubkey_spend( + &mut challenge_tx, + 0, + Some(bitcoin::sighash::TapSighashType::SinglePlusAnyoneCanPay) + )?; - let wcp_txid = watchtower_challenge_page_tx_handler.tx.compute_txid(); + let mut happy_reimburse_tx = builder::transaction::create_happy_reimburse_txhandler( + move_txid, + kickoff_txid, + nofn_xonly_pk, + *operator_xonly_pk, + _operator_reimburse_address, + bridge_amount_sats, + network, + ); - for (i, watchtower_wots) in watchtower_wots.iter().enumerate().take(config.num_watchtowers) { - let mut watchtower_challenge_txhandler = - builder::transaction::create_watchtower_challenge_txhandler( - wcp_txid, - i, - watchtower_wots.clone(), - &[0u8; 20], - nofn_xonly_pk, - *operator_xonly_pk, - network, - ); - yield Actor::convert_tx_to_sighash_script_spend( - &mut watchtower_challenge_txhandler, - 0, + // move utxo + yield convert_tx_to_pubkey_spend( + &mut happy_reimburse_tx, 0, + None )?; + // nofn_or_nofn3week utxo + yield convert_tx_to_pubkey_spend( + &mut happy_reimburse_tx, + 2, + None + )?; + + + let watchtower_wots = (0..config.num_watchtowers) + .map(|i| watchtower_challenge_wotss[i][time_tx_idx].clone()) + .collect::>(); - let mut operator_challenge_nack_txhandler = - builder::transaction::create_operator_challenge_nack_txhandler( - watchtower_challenge_txhandler.tx.compute_txid(), - time_txid, + let mut watchtower_challenge_page_tx_handler = + builder::transaction::create_watchtower_challenge_page_txhandler( kickoff_txid, - input_amount, - &[0u8; 20], nofn_xonly_pk, - *operator_xonly_pk, + config.num_watchtowers as u32, + watchtower_wots.clone(), network, ); - yield Actor::convert_tx_to_sighash_script_spend( - &mut operator_challenge_nack_txhandler, + + yield convert_tx_to_pubkey_spend( + &mut watchtower_challenge_page_tx_handler, 0, - 1, + None, )?; - yield Actor::convert_tx_to_sighash_pubkey_spend( - &mut operator_challenge_nack_txhandler, - 1, + + let wcp_txid = watchtower_challenge_page_tx_handler.tx.compute_txid(); + + for (i, watchtower_wots) in watchtower_wots.iter().enumerate().take(config.num_watchtowers) { + let mut watchtower_challenge_txhandler = + builder::transaction::create_watchtower_challenge_txhandler( + wcp_txid, + i, + watchtower_wots.clone(), + &[0u8; 20], + nofn_xonly_pk, + *operator_xonly_pk, + network, + ); + yield convert_tx_to_script_spend( + &mut watchtower_challenge_txhandler, + 0, + 0, + None, + )?; + + let mut operator_challenge_nack_txhandler = + builder::transaction::create_operator_challenge_nack_txhandler( + watchtower_challenge_txhandler.tx.compute_txid(), + time_txid, + kickoff_txid, + input_amount, + &[0u8; 20], + nofn_xonly_pk, + *operator_xonly_pk, + network, + ); + yield convert_tx_to_script_spend( + &mut operator_challenge_nack_txhandler, + 0, + 1, + None, + )?; + yield convert_tx_to_pubkey_spend( + &mut operator_challenge_nack_txhandler, + 1, + None, + )?; + } + + let intermediate_wots = + vec![vec![vec![[0u8; 20]; 48]; NUM_INTERMEDIATE_STEPS]; config.num_time_txs]; // TODO: Fetch from db + let assert_begin_txid = builder::transaction::create_assert_begin_txhandler( + kickoff_txid, + nofn_xonly_pk, + *operator_xonly_pk, + intermediate_wots[time_tx_idx].clone(), + network, + ) + .tx + .compute_txid(); + + let mut assert_end_tx = builder::transaction::create_assert_end_txhandler( + kickoff_txid, + assert_begin_txid, + nofn_xonly_pk, + *operator_xonly_pk, + network, + ); + yield convert_tx_to_pubkey_spend( + &mut assert_end_tx, + NUM_INTERMEDIATE_STEPS, + None, )?; - } - let intermediate_wots = - vec![vec![vec![[0u8; 20]; 48]; NUM_INTERMEDIATE_STEPS]; config.num_time_txs]; // TODO: Fetch from db - let assert_begin_txid = builder::transaction::create_assert_begin_txhandler( - kickoff_txid, - nofn_xonly_pk, - *operator_xonly_pk, - intermediate_wots[time_tx_idx].clone(), - network, - ) - .tx - .compute_txid(); - - let mut assert_end_tx = builder::transaction::create_assert_end_txhandler( - kickoff_txid, - assert_begin_txid, - nofn_xonly_pk, - *operator_xonly_pk, - network, - ); - yield Actor::convert_tx_to_sighash_pubkey_spend( - &mut assert_end_tx, - NUM_INTERMEDIATE_STEPS, - )?; + let mut disprove_tx = builder::transaction::create_disprove_txhandler( + assert_end_tx.tx.compute_txid(), + time_txid, + nofn_xonly_pk, + network, + ); - let time2_tx = builder::transaction::create_time2_tx( - *operator_xonly_pk, - time_txid, - input_amount, - network, - ); + // sign for all disprove scripts + for i in 0..NUM_INTERMEDIATE_STEPS { + yield convert_tx_to_script_spend( + &mut disprove_tx, + 0, + i, + Some(bitcoin::sighash::TapSighashType::None), + )?; + } - input_txid = time2_tx.compute_txid(); - input_amount = time2_tx.output[0].value; + let time2_tx = builder::transaction::create_time2_tx( + *operator_xonly_pk, + time_txid, + input_amount, + network, + ); + + input_txid = time2_tx.compute_txid(); + input_amount = time2_tx.output[0].value; + } } } - } } pub fn create_timout_tx_sighash_stream( @@ -213,7 +337,7 @@ pub fn create_timout_tx_sighash_stream( network, ); - yield Actor::convert_tx_to_sighash_script_spend(&mut timeout_tx_handler, 0, 0)?; + yield convert_tx_to_script_spend(&mut timeout_tx_handler, 0, 0, None)?; let time2_tx = builder::transaction::create_time2_tx( operator_xonly_pk, diff --git a/core/src/builder/transaction.rs b/core/src/builder/transaction.rs index c80232f5..47c1a757 100644 --- a/core/src/builder/transaction.rs +++ b/core/src/builder/transaction.rs @@ -5,7 +5,7 @@ use super::address::create_taproot_address; use crate::builder; -use crate::constants::{NUM_DISRPOVE_SCRIPTS, NUM_INTERMEDIATE_STEPS}; +use crate::constants::{NUM_DISPROVE_SCRIPTS, NUM_INTERMEDIATE_STEPS}; use crate::{utils, EVMAddress, UTXO}; use bitcoin::address::NetworkUnchecked; use bitcoin::hashes::Hash; @@ -44,6 +44,7 @@ pub const TIME2_TX_MIN_RELAY_FEE: Amount = Amount::from_sat(350); pub const KICKOFF_INPUT_AMOUNT: Amount = Amount::from_sat(100_000); pub const OPERATOR_REIMBURSE_CONNECTOR_AMOUNT: Amount = Amount::from_sat(330); pub const ANCHOR_AMOUNT: Amount = Amount::from_sat(330); +pub const OPERATOR_CHALLENGE_AMOUNT: Amount = Amount::from_sat(200_000_000); /// Creates the `time_tx`. It will always use `input_txid`'s first vout as the input. /// @@ -324,14 +325,15 @@ pub fn create_kickoff_tx( txid: time_txid, vout: 2, }]); - let nofn_1week = builder::script::generate_relative_timelock_script(nofn_xonly_pk, 7 * 24 * 6); + let operator_1week = + builder::script::generate_relative_timelock_script(operator_xonly_pk, 7 * 24 * 6); let operator_2week = builder::script::generate_relative_timelock_script(operator_xonly_pk, 2 * 7 * 24 * 6); let nofn_3week = builder::script::generate_relative_timelock_script(nofn_xonly_pk, 3 * 7 * 24 * 6); - let (nofn_or_nofn_1week, _) = - builder::address::create_taproot_address(&[nofn_1week], Some(nofn_xonly_pk), network); + let (nofn_or_operator_1week, _) = + builder::address::create_taproot_address(&[operator_1week], Some(nofn_xonly_pk), network); let (nofn_or_operator_2week, _) = builder::address::create_taproot_address(&[operator_2week], Some(nofn_xonly_pk), network); @@ -341,12 +343,16 @@ pub fn create_kickoff_tx( let (nofn_taproot_address, _) = builder::address::create_musig2_address(nofn_xonly_pk, network); + // TODO: change to normal sats let mut tx_outs = create_tx_outs(vec![ ( KICKOFF_UTXO_AMOUNT_SATS, nofn_taproot_address.script_pubkey(), ), - (KICKOFF_UTXO_AMOUNT_SATS, nofn_or_nofn_1week.script_pubkey()), + ( + KICKOFF_UTXO_AMOUNT_SATS, + nofn_or_operator_1week.script_pubkey(), + ), ( KICKOFF_UTXO_AMOUNT_SATS, nofn_or_operator_2week.script_pubkey(), @@ -667,19 +673,19 @@ pub fn create_assert_end_txhandler( vout: 3, }); - let mut disprve_scripts = vec![]; - for _ in 0..NUM_DISRPOVE_SCRIPTS { - disprve_scripts.push(builder::script::checksig_script(nofn_xonly_pk)); // TODO: ADD actual disprove scripts here + let mut disprove_scripts = vec![]; + for _ in 0..NUM_DISPROVE_SCRIPTS { + disprove_scripts.push(builder::script::checksig_script(nofn_xonly_pk)); // TODO: ADD actual disprove scripts here } let (disprove_address, _disprove_taproot_spend_info) = builder::address::create_taproot_address( - &disprve_scripts.clone(), + &disprove_scripts.clone(), Some(nofn_xonly_pk), network, ); let tx_outs = vec![ TxOut { - value: Amount::from_sat(330), // TOOD: Hand calculate this + value: Amount::from_sat(330), // TODO: Hand calculate this script_pubkey: disprove_address.script_pubkey(), }, builder::script::anyone_can_spend_txout(), @@ -728,6 +734,172 @@ pub fn create_assert_end_txhandler( } } +pub fn create_disprove_txhandler( + assert_end_txid: Txid, + time_txid: Txid, + nofn_xonly_pk: XOnlyPublicKey, + network: bitcoin::Network, +) -> TxHandler { + let tx_ins = create_tx_ins(vec![ + OutPoint { + txid: assert_end_txid, + vout: 0, + }, + OutPoint { + txid: time_txid, + vout: 0, + }, + ]); + + let tx_outs = vec![builder::script::anyone_can_spend_txout()]; + + let disprove_tx = create_btc_tx(tx_ins, tx_outs); + + let mut disprove_scripts = vec![]; + for _ in 0..NUM_DISPROVE_SCRIPTS { + disprove_scripts.push(builder::script::checksig_script(nofn_xonly_pk)); // TODO: ADD actual disprove scripts here + } + let (disprove_address, disprove_taproot_spend_info) = + builder::address::create_taproot_address(&disprove_scripts, Some(nofn_xonly_pk), network); + let prevouts = vec![TxOut { + value: Amount::from_sat(330), // TODO: Hand calculate this + script_pubkey: disprove_address.script_pubkey(), + }]; + + TxHandler { + tx: disprove_tx, + prevouts, + scripts: vec![disprove_scripts, vec![]], + taproot_spend_infos: vec![disprove_taproot_spend_info], + } +} + +pub fn create_challenge_txhandler( + kickoff_txid: Txid, + nofn_xonly_pk: XOnlyPublicKey, + operator_xonly_pk: XOnlyPublicKey, + operator_reimbursement_address: &bitcoin::Address, + network: bitcoin::Network, +) -> TxHandler { + let tx_ins = create_tx_ins(vec![OutPoint { + txid: kickoff_txid, + vout: 1, + }]); + + let tx_outs = vec![TxOut { + value: OPERATOR_CHALLENGE_AMOUNT, + script_pubkey: operator_reimbursement_address.script_pubkey(), + }]; + + let challenge_tx = create_btc_tx(tx_ins, tx_outs); + + let operator_1week = + builder::script::generate_relative_timelock_script(operator_xonly_pk, 7 * 24 * 6); + + let (nofn_or_operator_1week, nofn_or_operator_1week_spend_info) = + builder::address::create_taproot_address( + &[operator_1week.clone()], + Some(nofn_xonly_pk), + network, + ); + + let prevouts = vec![TxOut { + script_pubkey: nofn_or_operator_1week.script_pubkey(), + value: KICKOFF_UTXO_AMOUNT_SATS, + }]; + + TxHandler { + tx: challenge_tx, + prevouts, + scripts: vec![vec![operator_1week]], + taproot_spend_infos: vec![nofn_or_operator_1week_spend_info], + } +} + +pub fn create_happy_reimburse_txhandler( + move_txid: Txid, + kickoff_txid: Txid, + nofn_xonly_pk: XOnlyPublicKey, + operator_xonly_pk: XOnlyPublicKey, + operator_reimbursement_address: &bitcoin::Address, + bridge_amount_sats: Amount, + network: bitcoin::Network, +) -> TxHandler { + let tx_ins = create_tx_ins(vec![ + OutPoint { + txid: move_txid, + vout: 0, + }, + OutPoint { + txid: kickoff_txid, + vout: 1, + }, + OutPoint { + txid: kickoff_txid, + vout: 3, + }, + ]); + let (nofn_taproot_address, nofn_taproot_spend) = + builder::address::create_taproot_address(&[], Some(nofn_xonly_pk), network); + + let anyone_can_spend_txout = builder::script::anyone_can_spend_txout(); + + let tx_outs = vec![ + TxOut { + // value in create_move_tx currently + value: bridge_amount_sats - MOVE_TX_MIN_RELAY_FEE - anyone_can_spend_txout.value, + script_pubkey: operator_reimbursement_address.script_pubkey(), + }, + anyone_can_spend_txout.clone(), + ]; + + let happy_reimburse_tx = create_btc_tx(tx_ins, tx_outs); + + let operator_1week = + builder::script::generate_relative_timelock_script(operator_xonly_pk, 7 * 24 * 6); + let nofn_3week = + builder::script::generate_relative_timelock_script(nofn_xonly_pk, 3 * 7 * 24 * 6); + + let (nofn_or_operator_1week, nofn_or_operator_1week_spend) = + builder::address::create_taproot_address( + &[operator_1week.clone()], + Some(nofn_xonly_pk), + network, + ); + + let (nofn_or_nofn_3week, nofn_or_nofn_3week_spend) = builder::address::create_taproot_address( + &[nofn_3week.clone()], + Some(nofn_xonly_pk), + network, + ); + + let prevouts = vec![ + TxOut { + script_pubkey: nofn_taproot_address.script_pubkey(), + value: bridge_amount_sats - MOVE_TX_MIN_RELAY_FEE - anyone_can_spend_txout.value, + }, + TxOut { + value: KICKOFF_UTXO_AMOUNT_SATS, + script_pubkey: nofn_or_operator_1week.script_pubkey(), + }, + TxOut { + value: KICKOFF_UTXO_AMOUNT_SATS, + script_pubkey: nofn_or_nofn_3week.script_pubkey(), + }, + ]; + + TxHandler { + tx: happy_reimburse_tx, + prevouts, + scripts: vec![vec![], vec![operator_1week], vec![nofn_3week]], + taproot_spend_infos: vec![ + nofn_taproot_spend, + nofn_or_operator_1week_spend, + nofn_or_nofn_3week_spend, + ], + } +} + pub fn create_slash_or_take_tx( deposit_outpoint: OutPoint, kickoff_utxo: UTXO, diff --git a/core/src/constants.rs b/core/src/constants.rs index de7074e9..d9931e19 100644 --- a/core/src/constants.rs +++ b/core/src/constants.rs @@ -36,4 +36,4 @@ pub type VerifierChallenge = (BlockHash, U256, u8); pub const NUM_INTERMEDIATE_STEPS: usize = 100; -pub const NUM_DISRPOVE_SCRIPTS: usize = 100; +pub const NUM_DISPROVE_SCRIPTS: usize = 100;