Skip to content

Commit

Permalink
aggregator: Simplify create_nonce_streams.
Browse files Browse the repository at this point in the history
  • Loading branch information
ceyhunsen committed Jan 3, 2025
1 parent b339085 commit 0e0ffe9
Showing 1 changed file with 24 additions and 28 deletions.
52 changes: 24 additions & 28 deletions core/src/rpc/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ impl Aggregator {

/// Creates a stream of nonces from verifiers.
/// This will automatically get's the first response from the verifiers.
///
/// # Returns
///
/// - Vec<[`clementine::NonceGenFirstResponse`]>: First response from each verifier
/// - Vec<BoxStream<Result<[`MuSigPubNonce`], BridgeError>>>: Stream of nonces from each verifier
async fn create_nonce_streams(
&self,
num_nonces: u32,
Expand All @@ -44,12 +49,9 @@ impl Aggregator {
),
BridgeError,
> {
// generate nonces from all verifiers
// Generate nonces from all verifiers.
let mut nonce_streams = try_join_all(self.verifier_clients.iter().map(|client| {
// Clone each client to avoid mutable borrow.
// https://github.com/hyperium/tonic/issues/33#issuecomment-538150828
let mut client = client.clone();

async move {
let response_stream = client
.nonce_gen(tonic::Request::new(clementine::NonceGenRequest {
Expand All @@ -62,24 +64,27 @@ impl Aggregator {
}))
.await?;

// Get the first responses from each stream
// Get the first responses from each stream.
let first_responses: Vec<clementine::NonceGenFirstResponse> =
try_join_all(nonce_streams.iter_mut().map(|s| async {
let nonce_gen_first_response = s.message().await?;
let nonce_gen_first_response = nonce_gen_first_response
let nonce_gen_first_response = s
.message()
.await?
.ok_or(BridgeError::Error("NonceGen returns nothing".to_string()))?
.response
.ok_or(BridgeError::Error(
"NonceGen response field is empty".to_string(),
))?;
// this response is an enum, so we need to match on it
match nonce_gen_first_response {
clementine::nonce_gen_response::Response::FirstResponse(
nonce_gen_first_response,
) => Ok(nonce_gen_first_response),
_ => Err(BridgeError::Error(

if let clementine::nonce_gen_response::Response::FirstResponse(
nonce_gen_first_response,
) = nonce_gen_first_response
{
Ok(nonce_gen_first_response)
} else {
Err(BridgeError::Error(
"NonceGen response is not FirstResponse".to_string(),
)),
))
}
}))
.await?;
Expand All @@ -88,16 +93,11 @@ impl Aggregator {
.into_iter()
.map(|stream| {
stream
.map(|result| {
result
.map_err(BridgeError::from)
.and_then(|nonce_gen_response| {
Self::extract_pub_nonce(nonce_gen_response.response)
})
})
.map(|result| Self::extract_pub_nonce(result?.response))
.boxed()
})
.collect();

Ok((first_responses, transformed_streams))
}
}
Expand Down Expand Up @@ -189,22 +189,18 @@ impl ClementineAggregator for Aggregator {
Ok(Response::new(Empty {}))
}

// #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
#[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
async fn new_deposit(
&self,
deposit_params_req: Request<DepositParams>,
) -> Result<Response<RawSignedMoveTx>, Status> {
tracing::info!("Recieved new deposit request: {:?}", deposit_params_req);

let deposit_params = deposit_params_req.into_inner();

let deposit_outpoint: bitcoin::OutPoint = deposit_params
.clone()
.deposit_outpoint
.ok_or(Status::internal("No deposit outpoint received"))
.unwrap()
.try_into()
.unwrap();
.ok_or(Status::internal("No deposit outpoint received"))?
.try_into()?;
let evm_address: EVMAddress = deposit_params.clone().evm_address.try_into().unwrap();
let recovery_taproot_address = deposit_params
.clone()
Expand Down

0 comments on commit 0e0ffe9

Please sign in to comment.