Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ekrembal committed Jan 6, 2025
1 parent b339085 commit e32eb52
Showing 1 changed file with 231 additions and 68 deletions.
299 changes: 231 additions & 68 deletions core/src/rpc/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,180 @@ use bitcoin::{hashes::Hash, Amount};
use futures::{future::try_join_all, stream::BoxStream, FutureExt, StreamExt};
use std::pin::pin;
use tonic::{async_trait, Request, Response, Status};
use tokio::sync::mpsc::{channel, Sender, Receiver};

struct AggNonceQueueItem {
agg_nonce: ByteArray32,
sighash: bitcoin::Sighash,
}

struct FinalSigQueueItem {
final_sig: Vec<u8>,
}

async fn nonce_aggregator_task(
mut nonce_streams: Vec<BoxStream<Result<MuSigPubNonce, BridgeError>>>,
mut sighash_stream: impl Stream<Item = Result<bitcoin::Sighash, BridgeError>> + Unpin + Send + 'static,
agg_nonce_sender: Sender<AggNonceQueueItem>,
) {
while let Ok(sighash) = sighash_stream.next().await.transpose() {
let sighash = match sighash {
Ok(s) => s,
Err(e) => {
tracing::error!("Failed to get sighash: {:?}", e);
break;
}
};

// Collect nonces from all verifiers
let pub_nonces = match try_join_all(nonce_streams.iter_mut().map(|s| async {
s.next()
.await
.ok_or_else(|| BridgeError::Error("Stream ended unexpectedly".into()))?
.map_err(|e| BridgeError::Error(format!("Failed to get nonce: {:?}", e)))
}))
.await
{
Ok(nonces) => nonces,
Err(e) => {
tracing::error!("Failed to collect nonces: {:?}", e);
break;
}
};

// Aggregate nonces in blocking thread
let agg_nonce = match tokio::task::spawn_blocking(move || aggregate_nonces(pub_nonces)).await {
Ok(nonce) => nonce,
Err(e) => {
tracing::error!("Failed to aggregate nonces: {:?}", e);
break;
}
};

if agg_nonce_sender
.send(AggNonceQueueItem {
agg_nonce,
sighash,
})
.await
.is_err()
{
tracing::error!("Failed to send aggregated nonce to queue");
break;
}
}
}

async fn nonce_distributor_task(
mut agg_nonce_receiver: Receiver<AggNonceQueueItem>,
partial_sig_streams: Vec<(tonic::Streaming<clementine::VerifierPartialSig>, tokio::sync::mpsc::Sender<clementine::VerifierDepositSignParams>)>,
partial_sig_sender: Sender<(Vec<ByteArray32>, AggNonceQueueItem)>,
) {
while let Some(queue_item) = agg_nonce_receiver.recv().await {
let agg_nonce_wrapped = clementine::VerifierDepositSignParams {
params: Some(clementine::verifier_deposit_sign_params::Params::AggNonce(
queue_item.agg_nonce.0.to_vec(),
)),
};

// Send aggregated nonce to all verifiers
for (_, tx) in partial_sig_streams.iter() {
if tx.send(agg_nonce_wrapped.clone()).await.is_err() {
tracing::error!("Failed to send aggregated nonce to verifier");
return;
}
}

// Collect partial signatures
let partial_sigs = match try_join_all(partial_sig_streams.iter().map(|(s, _)| async {
let partial_sig = s.message().await?;
let partial_sig = partial_sig.ok_or(BridgeError::Error("No partial sig received".into()))?;
Ok::<_, BridgeError>(ByteArray32(partial_sig.partial_sig.try_into().unwrap()))
}))
.await
{
Ok(sigs) => sigs,
Err(e) => {
tracing::error!("Failed to collect partial signatures: {:?}", e);
return;
}
};

if partial_sig_sender
.send((partial_sigs, queue_item))
.await
.is_err()
{
tracing::error!("Failed to send partial signatures to queue");
return;
}
}
}

async fn signature_aggregator_task(
mut partial_sig_receiver: Receiver<(Vec<ByteArray32>, AggNonceQueueItem)>,
verifiers_public_keys: Vec<Vec<u8>>,
final_sig_sender: Sender<FinalSigQueueItem>,
) {
while let Some((partial_sigs, queue_item)) = partial_sig_receiver.recv().await {
// Aggregate signatures in blocking thread
let final_sig = match tokio::task::spawn_blocking(move || {
crate::musig2::aggregate_partial_signatures(
verifiers_public_keys.clone(),
None,
false,
&queue_item.agg_nonce,
partial_sigs,
ByteArray32(queue_item.sighash.to_byte_array()),
)
})
.await
{
Ok(Ok(sig)) => sig,
Ok(Err(e)) => {
tracing::error!("Failed to aggregate signatures: {:?}", e);
continue;
}
Err(e) => {
tracing::error!("Failed to run signature aggregation: {:?}", e);
continue;
}
};

if final_sig_sender
.send(FinalSigQueueItem {
final_sig: final_sig.to_vec(),
})
.await
.is_err()
{
tracing::error!("Failed to send final signature to queue");
return;
}
}
}

async fn signature_distributor_task(
mut final_sig_receiver: Receiver<FinalSigQueueItem>,
deposit_finalize_sender: Vec<tokio::sync::mpsc::Sender<clementine::VerifierDepositFinalizeParams>>,
) {
while let Some(queue_item) = final_sig_receiver.recv().await {
let final_params = clementine::VerifierDepositFinalizeParams {
params: Some(
clementine::verifier_deposit_finalize_params::Params::SchnorrSig(
queue_item.final_sig,
),
),
};

for tx in &deposit_finalize_sender {
if tx.send(final_params.clone()).await.is_err() {
tracing::error!("Failed to send final signature to verifier");
return;
}
}
}
}

impl Aggregator {
// Extracts pub_nonce from given stream.
Expand Down Expand Up @@ -100,6 +274,55 @@ impl Aggregator {
.collect();
Ok((first_responses, transformed_streams))
}

async fn process_signature_rounds(
nonce_streams: Vec<BoxStream<Result<MuSigPubNonce, BridgeError>>>,
partial_sig_streams: Vec<(tonic::Streaming<clementine::VerifierPartialSig>, tokio::sync::mpsc::Sender<clementine::VerifierDepositSignParams>)>,
deposit_finalize_sender: Vec<tokio::sync::mpsc::Sender<clementine::VerifierDepositFinalizeParams>>,
verifiers_public_keys: Vec<Vec<u8>>,
sighash_stream: impl Stream<Item = Result<bitcoin::Sighash, BridgeError>> + Unpin + Send + 'static,
) -> Result<(), Status> {
// Create channels for the pipeline stages
let (agg_nonce_sender, agg_nonce_receiver) = channel(32);
let (partial_sig_sender, partial_sig_receiver) = channel(32);
let (final_sig_sender, final_sig_receiver) = channel(32);

// Spawn all pipeline tasks
let nonce_agg_handle = tokio::spawn(nonce_aggregator_task(
nonce_streams,
sighash_stream,
agg_nonce_sender,
));

let nonce_dist_handle = tokio::spawn(nonce_distributor_task(
agg_nonce_receiver,
partial_sig_streams,
partial_sig_sender,
));

let sig_agg_handle = tokio::spawn(signature_aggregator_task(
partial_sig_receiver,
verifiers_public_keys,
final_sig_sender,
));

let sig_dist_handle = tokio::spawn(signature_distributor_task(
final_sig_receiver,
deposit_finalize_sender,
));

// Wait for all tasks to complete
try_join_all(vec![
nonce_agg_handle,
nonce_dist_handle,
sig_agg_handle,
sig_dist_handle,
])
.await
.map_err(|e| Status::internal(format!("Pipeline task failed: {:?}", e)))?;

Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -318,74 +541,14 @@ impl ClementineAggregator for Aggregator {
self.config.num_watchtowers,
);

for _ in 0..num_required_sigs {
// Get the next nonce from each stream
let pub_nonces = try_join_all(nonce_streams.iter_mut().map(|s| async {
s.next()
.await
.ok_or_else(|| Status::internal("Stream ended unexpectedly"))? // Handle if stream ends early
.map_err(|e| Status::internal(format!("Failed to get nonce: {:?}", e)))
// Handle if there's an error in the stream item
}))
.await?;

tracing::debug!("RECEIVED PUB NONCES: {:?}", pub_nonces);

// Aggregate the nonces
let agg_nonce = aggregate_nonces(pub_nonces);

let agg_nonce_wrapped = clementine::VerifierDepositSignParams {
params: Some(clementine::verifier_deposit_sign_params::Params::AggNonce(
agg_nonce.0.to_vec(),
)),
};

// Send the aggregated nonce to each verifier
for (_, tx) in partial_sig_streams.iter_mut() {
tx.send(agg_nonce_wrapped.clone()).await.unwrap();
}

// Get the partial signatures from each verifier
let partial_sigs = try_join_all(partial_sig_streams.iter_mut().map(|(s, _)| async {
let partial_sig = s.message().await?;
let partial_sig = partial_sig.ok_or(Status::internal("No partial sig received"))?;
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(ByteArray32(
partial_sig.partial_sig.try_into().unwrap(),
))
}))
.await
.map_err(|e| Status::internal(format!("Failed to get partial sig: {:?}", e)))?;

tracing::trace!("Received partial sigs: {:?}", partial_sigs);

let sighash = sighash_stream.next().await.unwrap().unwrap();

tracing::debug!("Aggregator found sighash: {:?}", sighash);

let final_sig = crate::musig2::aggregate_partial_signatures(
verifiers_public_keys.clone(),
None,
false,
&agg_nonce,
partial_sigs,
ByteArray32(sighash.to_byte_array()),
)
.unwrap();

tracing::debug!("Final signature: {:?}", final_sig);

for tx in deposit_finalize_sender.iter() {
tx.send(clementine::VerifierDepositFinalizeParams {
params: Some(
clementine::verifier_deposit_finalize_params::Params::SchnorrSig(
final_sig.to_vec(),
),
),
})
.await
.unwrap();
}
}
self.process_signature_rounds(
nonce_streams,
partial_sig_streams,
deposit_finalize_sender,
verifiers_public_keys.clone(),
sighash_stream,
)
.await?;

tracing::debug!("Waiting for deposit finalization");

Expand Down

0 comments on commit e32eb52

Please sign in to comment.