diff --git a/src/cli/command.rs b/src/cli/command.rs index 8260844fa..b5f5ce1eb 100644 --- a/src/cli/command.rs +++ b/src/cli/command.rs @@ -21,7 +21,6 @@ use std::{ use std::{str::FromStr, thread::sleep}; use internet2::{NodeAddr, RemoteSocketAddr, ToNodeAddr}; -use lnp::{message, LIGHTNING_P2P_DEFAULT_PORT}; use microservices::shell::Exec; use farcaster_core::{ diff --git a/src/farcasterd/runtime.rs b/src/farcasterd/runtime.rs index b438aa292..ed76b756a 100644 --- a/src/farcasterd/runtime.rs +++ b/src/farcasterd/runtime.rs @@ -19,10 +19,10 @@ use crate::{ BitcoinAddress, BitcoinFundingInfo, FundingInfo, Keys, LaunchSwap, MoneroAddress, MoneroFundingInfo, Outcome, PubOffer, RequestId, Reveal, Token, }, + service::Endpoints, swapd::get_swap_id, syncerd::opts::Coin, walletd::NodeSecrets, - Senders, }; use amplify::Wrapper; use clap::IntoApp; @@ -239,7 +239,6 @@ impl Stats { impl esb::Handler for Runtime { type Request = Request; - type Address = ServiceId; type Error = Error; fn identity(&self) -> ServiceId { @@ -248,19 +247,23 @@ impl esb::Handler for Runtime { fn handle( &mut self, - senders: &mut esb::SenderList, + endpoints: &mut Endpoints, bus: ServiceBus, source: ServiceId, request: Request, ) -> Result<(), Self::Error> { match bus { - ServiceBus::Msg => self.handle_rpc_msg(senders, source, request), - ServiceBus::Ctl => self.handle_rpc_ctl(senders, source, request), + ServiceBus::Msg => self.handle_rpc_msg(endpoints, source, request), + ServiceBus::Ctl => self.handle_rpc_ctl(endpoints, source, request), _ => Err(Error::NotSupported(ServiceBus::Bridge, request.get_type())), } } - fn handle_err(&mut self, _: esb::Error) -> Result<(), esb::Error> { + fn handle_err( + &mut self, + _: Endpoints, + _: esb::Error, + ) -> Result<(), esb::Error> { // We do nothing and do not propagate error; it's already being reported // with `error!` macro by the controller. If we propagate error here // this will make whole daemon panic @@ -272,10 +275,10 @@ impl Runtime { fn clean_up_after_swap( &mut self, swapid: &SwapId, - senders: &mut esb::SenderList, + endpoints: &mut Endpoints, ) -> Result<(), Error> { if self.running_swaps.remove(swapid) { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), ServiceId::Swap(*swapid), @@ -315,7 +318,7 @@ impl Runtime { }); if self.connections.remove(&connectionid) { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, identity.clone(), ServiceId::Peer(connectionid), @@ -336,7 +339,7 @@ impl Runtime { } else { let service_id = ServiceId::Syncer(coin, network); info!("Terminating {}", service_id); - if senders + if endpoints .send_to( ServiceBus::Ctl, identity.clone(), @@ -367,8 +370,12 @@ impl Runtime { self.consumed_offers.contains_key(offerid) } - fn _send_walletd(&self, senders: &mut Senders, message: request::Request) -> Result<(), Error> { - senders.send_to(ServiceBus::Ctl, self.identity(), ServiceId::Wallet, message)?; + fn _send_walletd( + &self, + endpoints: &mut Endpoints, + message: request::Request, + ) -> Result<(), Error> { + endpoints.send_to(ServiceBus::Ctl, self.identity(), ServiceId::Wallet, message)?; Ok(()) } fn node_ids(&self) -> Vec { @@ -391,7 +398,7 @@ impl Runtime { } fn handle_rpc_msg( &mut self, - senders: &mut esb::SenderList, + endpoints: &mut Endpoints, source: ServiceId, request: Request, ) -> Result<(), Error> { @@ -423,7 +430,7 @@ impl Runtime { if let Some(arb_addr) = self.arb_addrs.remove(&public_offer.id()) { let btc_addr_req = Request::BitcoinAddress(BitcoinAddress(*swap_id, arb_addr)); - senders.send_to( + endpoints.send_to( ServiceBus::Msg, self.identity(), ServiceId::Wallet, @@ -435,7 +442,7 @@ impl Runtime { if let Some(acc_addr) = self.acc_addrs.remove(&public_offer.id()) { let xmr_addr_req = Request::MoneroAddress(MoneroAddress(*swap_id, acc_addr)); - senders.send_to( + endpoints.send_to( ServiceBus::Msg, self.identity(), ServiceId::Wallet, @@ -448,7 +455,7 @@ impl Runtime { self.peerd_ids .insert(public_offer.offer.id(), source.clone()); - senders.send_to(ServiceBus::Msg, source, ServiceId::Wallet, request)?; + endpoints.send_to(ServiceBus::Msg, source, ServiceId::Wallet, request)?; } return Ok(()); } @@ -462,7 +469,7 @@ impl Runtime { fn handle_rpc_ctl( &mut self, - senders: &mut esb::SenderList, + endpoints: &mut Endpoints, source: ServiceId, request: Request, ) -> Result<(), Error> { @@ -570,7 +577,7 @@ impl Runtime { &self.config, )?; // FIXME msgs should go to walletd? - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), source.clone(), @@ -617,7 +624,7 @@ impl Runtime { &self.config, )?; // FIXME msgs should go to walletd? - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), source.clone(), @@ -641,7 +648,7 @@ impl Runtime { Request::SwapOutcome(success) => { let swapid = get_swap_id(&source)?; - self.clean_up_after_swap(&swapid, senders)?; + self.clean_up_after_swap(&swapid, endpoints)?; self.stats.incr_outcome(&success); match success { Outcome::Buy => { @@ -746,14 +753,14 @@ impl Runtime { }?; trace!("Procede executing pending request"); // recurse with request containing key - self.handle_rpc_ctl(senders, source, req)? + self.handle_rpc_ctl(endpoints, source, req)? } else { error!("Received unexpected peer keys"); } } Request::GetInfo => { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, ServiceId::Farcasterd, // source source, // destination @@ -776,7 +783,7 @@ impl Runtime { } Request::ListPeers => { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, ServiceId::Farcasterd, // source source, // destination @@ -785,7 +792,7 @@ impl Runtime { } Request::ListSwaps => { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, ServiceId::Farcasterd, // source source, // destination @@ -801,7 +808,7 @@ impl Runtime { .filter(|k| !self.consumed_offers_contains(&k.offer.id())) .cloned() .collect(); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, ServiceId::Farcasterd, // source source, // destination @@ -810,7 +817,7 @@ impl Runtime { } // Request::ListOfferIds => { - // senders.send_to( + // endpoints.send_to( // ServiceBus::Ctl, // ServiceId::Farcasterd, // source // source, // destination @@ -824,7 +831,7 @@ impl Runtime { .values() .map(|listen| listen.to_url_string()), ); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, ServiceId::Farcasterd, // source source, // destination @@ -854,7 +861,7 @@ impl Runtime { let resp = match (bindaddr, peer_secret_key, peer_public_key) { (None, None, None) => { trace!("Push MakeOffer to pending_requests and requesting a secret from Wallet"); - return self.get_secret(senders, source, request); + return self.get_secret(endpoints, source, request); } (None, Some(sk), Some(pk)) => { self.listens.insert(offer.id(), bind_addr); @@ -967,7 +974,7 @@ impl Runtime { // Connect let peer_connected_is_ok = match (self.connections.contains(&peer), peer_secret_key) { - (false, None) => return self.get_secret(senders, source, request), + (false, None) => return self.get_secret(endpoints, source, request), (false, Some(sk)) => { trace!( "{} to remote peer {}", @@ -1019,7 +1026,7 @@ impl Runtime { internal_address, peer_secret_key: None, }); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), ServiceId::Wallet, @@ -1063,7 +1070,7 @@ impl Runtime { } else { s!("Unknown swapd") }; - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), source, @@ -1245,7 +1252,7 @@ impl Runtime { res }) .collect(); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), source, @@ -1274,7 +1281,7 @@ impl Runtime { res }) .collect(); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), source, @@ -1297,7 +1304,7 @@ impl Runtime { respond_to.bright_yellow_bold(), resp.bright_blue_bold(), ); - senders.send_to(ServiceBus::Ctl, self.identity(), respond_to, resp)?; + endpoints.send_to(ServiceBus::Ctl, self.identity(), respond_to, resp)?; } } debug!("processed all cli notifications"); @@ -1383,7 +1390,7 @@ impl Runtime { fn get_secret( &mut self, - senders: &mut esb::SenderList, + endpoints: &mut Endpoints, source: ServiceId, request: Request, ) -> Result<(), Error> { @@ -1395,7 +1402,7 @@ impl Runtime { self.pending_requests .insert(req_id.clone(), (request, source)); let wallet_token = GetKeys(self.wallet_token.clone(), req_id); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, ServiceId::Farcasterd, ServiceId::Wallet, diff --git a/src/peerd/runtime.rs b/src/peerd/runtime.rs index daf4e72d5..229aea8a2 100644 --- a/src/peerd/runtime.rs +++ b/src/peerd/runtime.rs @@ -12,6 +12,7 @@ // along with this software. // If not, see . +use crate::service::Endpoints; use std::time::{Duration, SystemTime}; use std::{collections::HashMap, sync::Arc}; use std::{rc::Rc, thread::spawn}; @@ -21,7 +22,7 @@ use bitcoin::secp256k1::rand::{self, Rng}; use bitcoin::secp256k1::PublicKey; use internet2::{addr::InetSocketAddr, CreateUnmarshaller, Unmarshall, Unmarshaller}; use internet2::{presentation, transport, zmqsocket, NodeAddr, TypedEnum, ZmqType, ZMQ_CONTEXT}; -use lnp::{message, Messages}; +use lnp::{message, Message}; use microservices::esb::{self, Handler}; use microservices::node::TryService; use microservices::peer::{self, PeerConnection, PeerSender, SendMessage}; @@ -111,7 +112,7 @@ impl esb::Handler for BridgeHandler { fn handle( &mut self, - _senders: &mut Endpoints, + _endpoints: &mut Endpoints, _bus: ServiceBus, _addr: ServiceId, request: Request, @@ -121,7 +122,11 @@ impl esb::Handler for BridgeHandler { Ok(()) } - fn handle_err(&mut self, err: esb::Error) -> Result<(), esb::Error> { + fn handle_err( + &mut self, + _: &mut Endpoints, + err: esb::Error, + ) -> Result<(), esb::Error> { // We simply propagate the error since it's already being reported Err(err) } @@ -215,23 +220,19 @@ impl CtlServer for Runtime {} impl esb::Handler for Runtime { type Request = Request; - type Address = ServiceId; type Error = Error; fn identity(&self) -> ServiceId { self.identity.clone() } - fn on_ready( - &mut self, - _senders: &mut esb::SenderList, - ) -> Result<(), Error> { + fn on_ready(&mut self, _endpoints: &mut Endpoints) -> Result<(), Error> { if self.connect { info!( "{} with the remote peer", "Initializing connection".bright_blue_bold() ); - // self.send_ctl(senders, ServiceId::Wallet, request::PeerSecret) + // self.send_ctl(endpoints, ServiceId::Wallet, request::PeerSecret) // self.sender.send_message(Messages::Init(message::Init { // global_features: none!(), @@ -247,19 +248,23 @@ impl esb::Handler for Runtime { fn handle( &mut self, - senders: &mut esb::SenderList, + endpoints: &mut Endpoints, bus: ServiceBus, source: ServiceId, request: Request, ) -> Result<(), Self::Error> { match bus { - ServiceBus::Msg => self.handle_rpc_msg(senders, source, request), - ServiceBus::Ctl => self.handle_rpc_ctl(senders, source, request), - ServiceBus::Bridge => self.handle_bridge(senders, source, request), + ServiceBus::Msg => self.handle_rpc_msg(endpoints, source, request), + ServiceBus::Ctl => self.handle_rpc_ctl(endpoints, source, request), + ServiceBus::Bridge => self.handle_bridge(endpoints, source, request), } } - fn handle_err(&mut self, _: esb::Error) -> Result<(), esb::Error> { + fn handle_err( + &mut self, + _: &mut Endpoints, + _: esb::Error, + ) -> Result<(), esb::Error> { // We do nothing and do not propagate error; it's already being reported // with `error!` macro by the controller. If we propagate error here // this will make whole daemon panic @@ -271,7 +276,7 @@ impl Runtime { /// send messages over the bridge fn handle_rpc_msg( &mut self, - _senders: &mut esb::SenderList, + _endpoints: &mut Endpoints, _source: ServiceId, request: Request, ) -> Result<(), Error> { @@ -311,7 +316,7 @@ impl Runtime { fn handle_rpc_ctl( &mut self, - senders: &mut esb::SenderList, + endpoints: &mut Endpoints, source: ServiceId, request: Request, ) -> Result<(), Error> { @@ -348,7 +353,7 @@ impl Runtime { connected: !self.connect, awaits_pong: self.awaited_pong.is_some(), }; - self.send_ctl(senders, source, Request::PeerInfo(info))?; + self.send_ctl(endpoints, source, Request::PeerInfo(info))?; } _ => { @@ -361,7 +366,7 @@ impl Runtime { /// receive messages arriving over the bridge fn handle_bridge( &mut self, - senders: &mut esb::SenderList, + endpoints: &mut Endpoints, _source: ServiceId, request: Request, ) -> Result<(), Error> { @@ -397,7 +402,7 @@ impl Runtime { // swap initiation message Request::Protocol(Msg::TakerCommit(_)) => { - senders.send_to( + endpoints.send_to( ServiceBus::Msg, self.identity(), ServiceId::Farcasterd, @@ -405,7 +410,7 @@ impl Runtime { )?; } Request::Protocol(msg) => { - senders.send_to( + endpoints.send_to( ServiceBus::Msg, self.identity(), ServiceId::Swap(msg.swap_id()), @@ -414,7 +419,7 @@ impl Runtime { } // } // Request::PeerMessage(Messages::OpenChannel(_)) => { - // senders.send_to( + // endpoints.send_to( // ServiceBus::Msg, // self.identity(), // ServiceId::Farcasterd, @@ -425,14 +430,14 @@ impl Runtime { // Request::PeerMessage(Messages::AcceptChannel(accept_channel)) => { // let channeld: ServiceId = accept_channel.temporary_channel_id.into(); // self.routing.insert(channeld.clone(), channeld.clone()); - // senders.send_to(ServiceBus::Msg, self.identity(), channeld, request)?; + // endpoints.send_to(ServiceBus::Msg, self.identity(), channeld, request)?; // } // Request::PeerMessage(Messages::FundingCreated(message::FundingCreated { // temporary_channel_id, // .. // })) => { - // senders.send_to( + // endpoints.send_to( // ServiceBus::Msg, // self.identity(), // temporary_channel_id.clone().into(), @@ -464,7 +469,7 @@ impl Runtime { // message::UpdateFailMalformedHtlc { channel_id, .. }, // )) => { // let channeld: ServiceId = channel_id.clone().into(); - // senders.send_to( + // endpoints.send_to( // ServiceBus::Msg, // self.identity(), // self.routing.get(&channeld).cloned().unwrap_or(channeld), diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 6642a18bd..04cdd18e9 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -12,6 +12,7 @@ // along with this software. // If not, see . +use crate::service::Endpoints; use std::convert::TryInto; use std::thread::sleep; use std::time::Duration; @@ -140,7 +141,6 @@ pub struct Handler { impl esb::Handler for Handler { type Request = Request; - type Address = ServiceId; type Error = Error; fn identity(&self) -> ServiceId { @@ -149,7 +149,7 @@ impl esb::Handler for Handler { fn handle( &mut self, - _senders: &mut esb::SenderList, + _endpoints: &mut Endpoints, _bus: ServiceBus, _addr: ServiceId, _request: Request, @@ -158,7 +158,11 @@ impl esb::Handler for Handler { Ok(()) } - fn handle_err(&mut self, err: esb::Error) -> Result<(), esb::Error> { + fn handle_err( + &mut self, + _: Endpoints, + err: esb::Error, + ) -> Result<(), esb::Error> { // We simply propagate the error since it's already being reported Err(err) } diff --git a/src/service.rs b/src/service.rs index 3e33f9af9..b11c0cfc2 100644 --- a/src/service.rs +++ b/src/service.rs @@ -190,19 +190,21 @@ where } else { None }; - let esb = esb::Controller::with( - map! { - ServiceBus::Msg => esb::BusConfig::with_locator( - config.msg_endpoint.try_into() - .expect("Only ZMQ RPC is currently supported"), - router.clone() - ), - ServiceBus::Ctl => esb::BusConfig::with_locator( - config.ctl_endpoint.try_into() + let services = map! { + ServiceBus::Msg => esb::BusConfig::with_locator( + config.msg_endpoint.try_into() .expect("Only ZMQ RPC is currently supported"), - router - ) - }, + router.clone() + ), + ServiceBus::Ctl => esb::BusConfig::with_locator( + config.ctl_endpoint.try_into() + .expect("Only ZMQ RPC is currently supported"), + router + ) + }; + + let esb = esb::Controller::with( + services, runtime, if broker { ZmqType::RouterBind @@ -256,7 +258,7 @@ where } } -pub type Senders = esb::EndpointList; +pub type Endpoints = esb::EndpointList; pub trait TryToServiceId { fn try_to_service_id(&self) -> Option; @@ -282,12 +284,12 @@ impl TryToServiceId for Option { pub trait CtlServer where - Self: esb::Handler, + Self: esb::Handler, esb::Error: From, { fn report_success_to( &mut self, - senders: &mut Senders, + senders: &mut Endpoints, dest: impl TryToServiceId, msg: Option, ) -> Result<(), Error> { @@ -304,7 +306,7 @@ where fn report_progress_to( &mut self, - senders: &mut Senders, + senders: &mut Endpoints, dest: impl TryToServiceId, msg: impl ToString, ) -> Result<(), Error> { @@ -321,7 +323,7 @@ where fn report_failure_to( &mut self, - senders: &mut Senders, + senders: &mut Endpoints, dest: impl TryToServiceId, failure: impl Into, ) -> Error { @@ -340,7 +342,7 @@ where fn send_ctl( &mut self, - senders: &mut Senders, + senders: &mut Endpoints, dest: impl TryToServiceId, request: Request, ) -> Result<(), Error> { @@ -353,7 +355,7 @@ where fn send_wallet( &mut self, bus: ServiceBus, - senders: &mut Senders, + senders: &mut Endpoints, request: Request, ) -> Result<(), Error> { let source = self.identity(); diff --git a/src/swapd/runtime.rs b/src/swapd/runtime.rs index b341e8df5..1d36f6c4f 100644 --- a/src/swapd/runtime.rs +++ b/src/swapd/runtime.rs @@ -13,6 +13,7 @@ // along with this software. // If not, see . +use crate::service::Endpoints; use crate::{ rpc::request::Outcome, rpc::request::{BitcoinFundingInfo, FundingInfo, MoneroFundingInfo}, @@ -37,7 +38,7 @@ use crate::rpc::{ request::{self, Msg}, Request, ServiceBus, }; -use crate::{CtlServer, Error, LogStyle, Senders, Service, ServiceConfig, ServiceId}; +use crate::{CtlServer, Error, LogStyle, Service, ServiceConfig, ServiceId}; use bitcoin::{consensus::Encodable, secp256k1}; use bitcoin::{ hashes::{hex::FromHex, sha256, Hash, HashEngine}, @@ -689,7 +690,6 @@ impl CtlServer for Runtime {} impl esb::Handler for Runtime { type Request = Request; - type Address = ServiceId; type Error = Error; fn identity(&self) -> ServiceId { @@ -698,19 +698,23 @@ impl esb::Handler for Runtime { fn handle( &mut self, - senders: &mut esb::SenderList, + endpoints: &mut Endpoints, bus: ServiceBus, source: ServiceId, request: Request, ) -> Result<(), Self::Error> { match bus { - ServiceBus::Msg => self.handle_rpc_msg(senders, source, request), - ServiceBus::Ctl => self.handle_rpc_ctl(senders, source, request), + ServiceBus::Msg => self.handle_rpc_msg(endpoints, source, request), + ServiceBus::Ctl => self.handle_rpc_ctl(endpoints, source, request), _ => Err(Error::NotSupported(ServiceBus::Bridge, request.get_type())), } } - fn handle_err(&mut self, _: esb::Error) -> Result<(), esb::Error> { + fn handle_err( + &mut self, + _: Endpoints, + _: esb::Error, + ) -> Result<(), esb::Error> { // We do nothing and do not propagate error; it's already being reported // with `error!` macro by the controller. If we propagate error here // this will make whole daemon panic @@ -971,9 +975,9 @@ impl SyncerState { } } impl Runtime { - fn send_peer(&self, senders: &mut Senders, msg: request::Msg) -> Result<(), Error> { + fn send_peer(&self, endpoints: &mut Endpoints, msg: request::Msg) -> Result<(), Error> { trace!("sending peer message {}", msg.bright_yellow_bold()); - senders.send_to( + endpoints.send_to( ServiceBus::Msg, self.identity(), self.peer_service.clone(), // = ServiceId::Loopback @@ -991,7 +995,7 @@ impl Runtime { } } - fn state_update(&mut self, senders: &mut Senders, next_state: State) -> Result<(), Error> { + fn state_update(&mut self, endpoints: &mut Endpoints, next_state: State) -> Result<(), Error> { let msg = format!( "State transition: {} -> {}", self.state.bright_white_bold(), @@ -999,7 +1003,7 @@ impl Runtime { ); info!("{} | {}", self.swap_id.bright_blue_italic(), &msg); self.state = next_state; - self.report_success_to(senders, self.enquirer.clone(), Some(msg))?; + self.report_success_to(endpoints, self.enquirer.clone(), Some(msg))?; Ok(()) } @@ -1007,7 +1011,7 @@ impl Runtime { &mut self, tx: bitcoin::Transaction, tx_label: TxLabel, - senders: &mut Senders, + endpoints: &mut Endpoints, ) -> Result<(), Error> { let req = Request::SyncerTask(Task::BroadcastTransaction(BroadcastTransaction { id: self.syncer_state.tasks.new_taskid(), @@ -1020,7 +1024,7 @@ impl Runtime { tx_label.bright_white_bold(), tx.txid().bright_yellow_italic() ); - Ok(senders.send_to( + Ok(endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), @@ -1030,7 +1034,7 @@ impl Runtime { fn handle_rpc_msg( &mut self, - senders: &mut Senders, + endpoints: &mut Endpoints, source: ServiceId, request: Request, ) -> Result<(), Error> { @@ -1074,7 +1078,7 @@ impl Runtime { .syncer_state .watch_addr_btc(addr.script_pubkey(), txlabel); self.send_ctl( - senders, + endpoints, self.syncer_state.bitcoin_syncer(), Request::SyncerTask(task), )?; @@ -1086,7 +1090,7 @@ impl Runtime { id: self.syncer_state.tasks.new_taskid(), lifetime: self.syncer_state.task_lifetime(Coin::Bitcoin), }); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), @@ -1098,13 +1102,13 @@ impl Runtime { id: self.syncer_state.tasks.new_taskid(), lifetime: self.syncer_state.task_lifetime(Coin::Monero), }); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.monero_syncer(), Request::SyncerTask(watch_height_monero), )?; - self.send_wallet(msg_bus, senders, request)?; + self.send_wallet(msg_bus, endpoints, request)?; } Msg::TakerCommit(_) => { unreachable!( @@ -1141,7 +1145,7 @@ impl Runtime { &ServiceId::Wallet, &ServiceBus::Msg ); - self.send_wallet(msg_bus, senders, request)? + self.send_wallet(msg_bus, endpoints, request)? } } } @@ -1177,7 +1181,7 @@ impl Runtime { &ServiceId::Wallet, &ServiceBus::Msg ); - self.send_wallet(msg_bus, senders, request)? + self.send_wallet(msg_bus, endpoints, request)? } SwapRole::Bob => { // sending this request will initialize the @@ -1220,7 +1224,7 @@ impl Runtime { self.syncer_state.awaiting_funding = true; if let Some(enquirer) = self.enquirer.clone() { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), enquirer, @@ -1244,7 +1248,7 @@ impl Runtime { .syncer_state .watch_addr_btc(addr.script_pubkey(), txlabel); self.send_ctl( - senders, + endpoints, self.syncer_state.bitcoin_syncer(), Request::SyncerTask(watch_addr_task), )?; @@ -1255,7 +1259,7 @@ impl Runtime { id: self.syncer_state.tasks.new_taskid(), lifetime: self.syncer_state.task_lifetime(Coin::Bitcoin), }); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), @@ -1267,7 +1271,7 @@ impl Runtime { id: self.syncer_state.tasks.new_taskid(), lifetime: self.syncer_state.task_lifetime(Coin::Monero), }); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.monero_syncer(), @@ -1295,7 +1299,7 @@ impl Runtime { let txid = tx.txid(); if !self.syncer_state.is_watched_tx(&tx_label) { let task = self.syncer_state.watch_tx_btc(txid, tx_label); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), @@ -1306,11 +1310,11 @@ impl Runtime { self.syncer_state.tasks.txids.insert(TxLabel::Refund, txid); } } - self.send_wallet(msg_bus, senders, request)?; + self.send_wallet(msg_bus, endpoints, request)?; } // bob receives, alice sends Msg::RefundProcedureSignatures(_) if self.state.b_core_arb() => { - self.send_wallet(msg_bus, senders, request)?; + self.send_wallet(msg_bus, endpoints, request)?; } // alice receives, bob sends Msg::BuyProcedureSignature(BuyProcedureSignature { buy, .. }) @@ -1322,14 +1326,14 @@ impl Runtime { if !self.syncer_state.is_watched_tx(&tx_label) { let txid = buy.clone().extract_tx().txid(); let task = self.syncer_state.watch_tx_btc(txid, tx_label); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), Request::SyncerTask(task), )?; } - self.send_wallet(msg_bus, senders, request)? + self.send_wallet(msg_bus, endpoints, request)? } // bob and alice @@ -1352,7 +1356,7 @@ impl Runtime { fn handle_rpc_ctl( &mut self, - senders: &mut Senders, + endpoints: &mut Endpoints, source: ServiceId, request: Request, ) -> Result<(), Error> { @@ -1459,10 +1463,10 @@ impl Runtime { self.maker_peer = Some(addr.clone()); } let local_commit = - self.taker_commit(senders, local_params.clone()) + self.taker_commit(endpoints, local_params.clone()) .map_err(|err| { self.report_failure_to( - senders, + endpoints, &report_to, microservices::rpc::Failure { code: 0, // TODO: Create error type system @@ -1486,8 +1490,8 @@ impl Runtime { public_offer, swap_id, }; - self.send_peer(senders, Msg::TakerCommit(take_swap))?; - self.state_update(senders, next_state)?; + self.send_peer(endpoints, Msg::TakerCommit(take_swap))?; + self.state_update(endpoints, next_state)?; } Request::Protocol(Msg::Reveal(reveal)) if self.state.commit() && self.state.remote_commit().is_some() => @@ -1498,13 +1502,13 @@ impl Runtime { .expect("commit state has local_params"); let reveal_proof = Msg::Reveal(reveal); let swap_id = reveal_proof.swap_id(); - self.send_peer(senders, reveal_proof)?; + self.send_peer(endpoints, reveal_proof)?; trace!("sent reveal_proof to peerd"); let reveal_params: Reveal = (swap_id, local_params.clone()).into(); - self.send_peer(senders, Msg::Reveal(reveal_params))?; + self.send_peer(endpoints, Msg::Reveal(reveal_params))?; trace!("sent reveal_proof to peerd"); let next_state = self.state.clone().sup_commit_to_reveal(); - self.state_update(senders, next_state)?; + self.state_update(endpoints, next_state)?; } Request::MakeSwap(InitSwap { @@ -1522,10 +1526,10 @@ impl Runtime { self.enquirer = report_to.clone(); self.local_params = Some(local_params.clone()); let local_commit = self - .maker_commit(senders, &peerd, swap_id, &local_params) + .maker_commit(endpoints, &peerd, swap_id, &local_params) .map_err(|err| { self.report_failure_to( - senders, + endpoints, &report_to, microservices::rpc::Failure { code: 0, // TODO: Create error type system @@ -1541,8 +1545,8 @@ impl Runtime { ); trace!("sending peer MakerCommit msg {}", &local_commit); - self.send_peer(senders, Msg::MakerCommit(local_commit))?; - self.state_update(senders, next_state)?; + self.send_peer(endpoints, Msg::MakerCommit(local_commit))?; + self.state_update(endpoints, next_state)?; } Request::FundingUpdated if source == ServiceId::Wallet @@ -1586,7 +1590,7 @@ impl Runtime { &dest_proof, &bus_id_proof ); - senders.send_to(bus_id_proof, self.identity(), dest_proof, request_proof)? + endpoints.send_to(bus_id_proof, self.identity(), dest_proof, request_proof)? } else { error!("Not the expected request: found {:?}", request); } @@ -1605,7 +1609,7 @@ impl Runtime { &dest_parameters, &bus_id_parameters ); - senders.send_to( + endpoints.send_to( bus_id_parameters, self.identity(), dest_parameters, @@ -1642,7 +1646,7 @@ impl Runtime { let txlabel = TxLabel::AccLock; if !self.syncer_state.is_watched_tx(&txlabel) { if self.syncer_state.awaiting_funding { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), ServiceId::Farcasterd, @@ -1651,7 +1655,7 @@ impl Runtime { self.syncer_state.awaiting_funding = false; } let task = self.syncer_state.watch_tx_xmr(hash.clone(), txlabel); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.monero_syncer(), @@ -1663,7 +1667,7 @@ impl Runtime { task_target: TaskTarget::TaskId(*id), respond: Boolean::False, }); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.monero_syncer(), @@ -1693,7 +1697,7 @@ impl Runtime { if !self.syncer_state.is_watched_tx(&tx_label) { let watch_tx = self.syncer_state.watch_tx_xmr(hash.clone(), tx_label); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.monero_syncer(), @@ -1705,7 +1709,7 @@ impl Runtime { task_target: TaskTarget::TaskId(*id), respond: Boolean::False, }); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.monero_syncer(), @@ -1746,7 +1750,7 @@ impl Runtime { self.swap_id.bright_blue_italic(), self.syncer_state.monero_height.bright_white_bold() ); - senders.send_to(bus_id, self.identity(), dest, request)?; + endpoints.send_to(bus_id, self.identity(), dest, request)?; } else { error!( "Not the sweep task {} or not Ctl bus found {}", @@ -1781,11 +1785,11 @@ impl Runtime { if let (Request::Protocol(Msg::BuyProcedureSignature(_)), ServiceBus::Msg) = (&request, &bus_id) { - senders.send_to(bus_id, self.identity(), dest, request)?; + endpoints.send_to(bus_id, self.identity(), dest, request)?; debug!("sent buyproceduresignature at state {}", &self.state); let next_state = State::Bob(BobState::BuySigB(BuySigB { buy_tx_seen: false })); - self.state_update(senders, next_state)?; + self.state_update(endpoints, next_state)?; } else { error!( "Not buyproceduresignatures {} or not Msg bus found {}", @@ -1837,7 +1841,7 @@ impl Runtime { self.syncer_state.awaiting_funding = false; match self.state.swap_role() { SwapRole::Alice => { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), ServiceId::Farcasterd, @@ -1845,7 +1849,7 @@ impl Runtime { )?; } SwapRole::Bob => { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), ServiceId::Farcasterd, @@ -1858,13 +1862,13 @@ impl Runtime { task_target: TaskTarget::AllTasks, respond: Boolean::False, }); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.monero_syncer(), Request::SyncerTask(abort_all.clone()), )?; - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), @@ -1872,13 +1876,13 @@ impl Runtime { )?; let success = if self.state.b_buy_sig() { self.state_update( - senders, + endpoints, State::Bob(BobState::FinishB(Outcome::Buy)), )?; Some(Outcome::Buy) } else if self.state.a_refund_seen() { self.state_update( - senders, + endpoints, State::Alice(AliceState::FinishA(Outcome::Refund)), )?; Some(Outcome::Refund) @@ -1888,8 +1892,8 @@ impl Runtime { }; if let Some(success) = success { let swap_success_req = Request::SwapOutcome(success); - self.send_ctl(senders, ServiceId::Wallet, swap_success_req.clone())?; - self.send_ctl(senders, ServiceId::Farcasterd, swap_success_req)?; + self.send_ctl(endpoints, ServiceId::Wallet, swap_success_req.clone())?; + self.send_ctl(endpoints, ServiceId::Farcasterd, swap_success_req)?; // remove txs to invalidate outdated states self.txs.remove(&TxLabel::Cancel); self.txs.remove(&TxLabel::Refund); @@ -1926,7 +1930,7 @@ impl Runtime { log_tx_seen(self.swap_id, txlabel, &tx.txid()); if self.syncer_state.awaiting_funding { self.syncer_state.awaiting_funding = false; - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), ServiceId::Farcasterd, @@ -1934,7 +1938,7 @@ impl Runtime { )?; } let req = Request::Tx(Tx::Funding(tx)); - self.send_wallet(ServiceBus::Ctl, senders, req)?; + self.send_wallet(ServiceBus::Ctl, endpoints, req)?; } txlabel => { @@ -1962,7 +1966,7 @@ impl Runtime { log_tx_seen(self.swap_id, &txlabel, &tx.txid()); self.state.b_sup_buysig_buy_tx_seen(); let req = Request::Tx(Tx::Buy(tx.clone())); - self.send_wallet(ServiceBus::Ctl, senders, req)? + self.send_wallet(ServiceBus::Ctl, endpoints, req)? } TxLabel::Buy => { warn!( @@ -1981,7 +1985,7 @@ impl Runtime { { log_tx_seen(self.swap_id, &txlabel, &tx.txid()); let req = Request::Tx(Tx::Refund(tx.clone())); - self.send_wallet(ServiceBus::Ctl, senders, req)? + self.send_wallet(ServiceBus::Ctl, endpoints, req)? } txlabel => { error!( @@ -1998,7 +2002,7 @@ impl Runtime { let (_tx_label, task) = self.syncer_state.tasks.retrieving_txs.get(id).unwrap(); std::thread::sleep(core::time::Duration::from_millis(500)); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), @@ -2070,7 +2074,7 @@ impl Runtime { ); self.syncer_state.awaiting_funding = true; if let Some(enquirer) = self.enquirer.clone() { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), enquirer, @@ -2081,7 +2085,7 @@ impl Runtime { if !self.syncer_state.is_watched_addr(&txlabel) { let watch_addr_task = self.syncer_state.watch_addr_xmr(spend, view, txlabel); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.monero_syncer(), @@ -2099,7 +2103,7 @@ impl Runtime { { let (tx_label, cancel_tx) = self.txs.remove_entry(&TxLabel::Cancel).unwrap(); - self.broadcast(cancel_tx, tx_label, senders)? + self.broadcast(cancel_tx, tx_label, endpoints)? } TxLabel::Lock if self.temporal_safety.safe_buy(*confirmations) @@ -2113,7 +2117,7 @@ impl Runtime { if let Some((txlabel, buy_tx)) = self.txs.remove_entry(&TxLabel::Buy) { - self.broadcast(buy_tx, txlabel, senders)?; + self.broadcast(buy_tx, txlabel, endpoints)?; self.state = State::Alice(AliceState::RefundSigA(RefundSigA { buy_published: true, xmr_locked, @@ -2141,7 +2145,7 @@ impl Runtime { self.swap_id.bright_blue_italic() ); self.syncer_state.awaiting_funding = false; - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), ServiceId::Farcasterd, @@ -2164,7 +2168,7 @@ impl Runtime { if !self.syncer_state.is_watched_tx(&tx_label) { let txid = punish_tx.clone().txid(); let task = self.syncer_state.watch_tx_btc(txid, tx_label); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), @@ -2172,7 +2176,7 @@ impl Runtime { )?; } - self.broadcast(punish_tx, tx_label, senders)?; + self.broadcast(punish_tx, tx_label, endpoints)?; } TxLabel::Cancel @@ -2183,7 +2187,7 @@ impl Runtime { trace!("here Bob publishes refund tx"); let (tx_label, refund_tx) = self.txs.remove_entry(&TxLabel::Refund).unwrap(); - self.broadcast(refund_tx, tx_label, senders)?; + self.broadcast(refund_tx, tx_label, endpoints)?; } TxLabel::Cancel if (self.state.swap_role() == SwapRole::Alice @@ -2194,7 +2198,7 @@ impl Runtime { self.swap_id.bright_blue_italic() ); if self.syncer_state.awaiting_funding { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), ServiceId::Farcasterd, @@ -2203,20 +2207,20 @@ impl Runtime { self.syncer_state.awaiting_funding = false; } self.state_update( - senders, + endpoints, State::Alice(AliceState::FinishA(Outcome::Refund)), )?; let abort_all = Task::Abort(Abort { task_target: TaskTarget::AllTasks, respond: Boolean::False, }); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.monero_syncer(), Request::SyncerTask(abort_all.clone()), )?; - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), @@ -2225,10 +2229,10 @@ impl Runtime { let swap_success_req = Request::SwapOutcome(Outcome::Refund); self.send_wallet( ServiceBus::Ctl, - senders, + endpoints, swap_success_req.clone(), )?; - self.send_ctl(senders, ServiceId::Farcasterd, swap_success_req)?; + self.send_ctl(endpoints, ServiceId::Farcasterd, swap_success_req)?; self.txs.remove(&TxLabel::Buy); self.txs.remove(&TxLabel::Cancel); self.txs.remove(&TxLabel::Punish); @@ -2241,20 +2245,20 @@ impl Runtime { // FIXME: swap ends here for alice // wallet + farcaster self.state_update( - senders, + endpoints, State::Alice(AliceState::FinishA(Outcome::Buy)), )?; let abort_all = Task::Abort(Abort { task_target: TaskTarget::AllTasks, respond: Boolean::False, }); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.monero_syncer(), Request::SyncerTask(abort_all.clone()), )?; - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), @@ -2263,10 +2267,10 @@ impl Runtime { let swap_success_req = Request::SwapOutcome(Outcome::Buy); self.send_wallet( ServiceBus::Ctl, - senders, + endpoints, swap_success_req.clone(), )?; - self.send_ctl(senders, ServiceId::Farcasterd, swap_success_req)?; + self.send_ctl(endpoints, ServiceId::Farcasterd, swap_success_req)?; self.txs.remove(&TxLabel::Cancel); self.txs.remove(&TxLabel::Punish); } @@ -2278,7 +2282,7 @@ impl Runtime { let (txlabel, txid) = self.syncer_state.tasks.txids.remove_entry(txlabel).unwrap(); let task = self.syncer_state.retrieve_tx_btc(txid, txlabel); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), @@ -2295,7 +2299,7 @@ impl Runtime { let (txlabel, txid) = self.syncer_state.tasks.txids.remove_entry(txlabel).unwrap(); let task = self.syncer_state.retrieve_tx_btc(txid, txlabel); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), @@ -2308,29 +2312,29 @@ impl Runtime { task_target: TaskTarget::AllTasks, respond: Boolean::False, }); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.monero_syncer(), Request::SyncerTask(abort_all.clone()), )?; - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), Request::SyncerTask(abort_all), )?; self.state_update( - senders, + endpoints, State::Bob(BobState::FinishB(Outcome::Refund)), )?; let swap_success_req = Request::SwapOutcome(Outcome::Refund); self.send_ctl( - senders, + endpoints, ServiceId::Wallet, swap_success_req.clone(), )?; - self.send_ctl(senders, ServiceId::Farcasterd, swap_success_req)?; + self.send_ctl(endpoints, ServiceId::Farcasterd, swap_success_req)?; // remove txs to invalidate outdated states self.txs.remove(&TxLabel::Cancel); self.txs.remove(&TxLabel::Refund); @@ -2342,13 +2346,13 @@ impl Runtime { task_target: TaskTarget::AllTasks, respond: Boolean::False, }); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.monero_syncer(), Request::SyncerTask(abort_all.clone()), )?; - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), @@ -2356,24 +2360,24 @@ impl Runtime { )?; match self.state.swap_role() { SwapRole::Alice => self.state_update( - senders, + endpoints, State::Alice(AliceState::FinishA(Outcome::Punish)), )?, SwapRole::Bob => { warn!("{}", "You were punished!".err()); self.state_update( - senders, + endpoints, State::Bob(BobState::FinishB(Outcome::Punish)), )? } } let swap_success_req = Request::SwapOutcome(Outcome::Punish); self.send_ctl( - senders, + endpoints, ServiceId::Wallet, swap_success_req.clone(), )?; - self.send_ctl(senders, ServiceId::Farcasterd, swap_success_req)?; + self.send_ctl(endpoints, ServiceId::Farcasterd, swap_success_req)?; // remove txs to invalidate outdated states self.txs.remove(&TxLabel::Cancel); self.txs.remove(&TxLabel::Refund); @@ -2431,7 +2435,7 @@ impl Runtime { if !self.syncer_state.is_watched_tx(&tx_label) { let txid = tx.clone().extract_tx().txid(); let task = self.syncer_state.watch_tx_btc(txid, tx_label); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), @@ -2440,14 +2444,14 @@ impl Runtime { } } trace!("sending peer CoreArbitratingSetup msg: {}", &core_arb_setup); - self.send_peer(senders, Msg::CoreArbitratingSetup(core_arb_setup.clone()))?; + self.send_peer(endpoints, Msg::CoreArbitratingSetup(core_arb_setup.clone()))?; let next_state = State::Bob(BobState::CorearbB(core_arb_setup, false)); - self.state_update(senders, next_state)?; + self.state_update(endpoints, next_state)?; } Request::Tx(Tx::Lock(btc_lock)) if self.state.b_core_arb() => { log_tx_received(self.swap_id, TxLabel::Lock); - self.broadcast(btc_lock, TxLabel::Lock, senders)?; + self.broadcast(btc_lock, TxLabel::Lock, endpoints)?; if let (Some(Params::Bob(bob_params)), Some(Params::Alice(alice_params))) = (&self.local_params, &self.remote_params) { @@ -2456,7 +2460,7 @@ impl Runtime { let txlabel = TxLabel::AccLock; if !self.syncer_state.is_watched_addr(&txlabel) { let task = self.syncer_state.watch_addr_xmr(spend, view, txlabel); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.monero_syncer(), @@ -2494,13 +2498,13 @@ impl Runtime { match transaction { Tx::Cancel(_) | Tx::Buy(_) => { if let Some(lock_tx_confs_req) = self.syncer_state.lock_tx_confs.clone() { - self.handle_rpc_ctl(senders, source, lock_tx_confs_req)?; + self.handle_rpc_ctl(endpoints, source, lock_tx_confs_req)?; } } Tx::Refund(_) | Tx::Punish(_) => { if let Some(cancel_tx_confs_req) = self.syncer_state.cancel_tx_confs.clone() { - self.handle_rpc_ctl(senders, source, cancel_tx_confs_req)?; + self.handle_rpc_ctl(endpoints, source, cancel_tx_confs_req)?; } } _ => {} @@ -2510,7 +2514,7 @@ impl Runtime { Request::Protocol(Msg::RefundProcedureSignatures(refund_proc_sigs)) if self.state.reveal() => { - self.send_peer(senders, Msg::RefundProcedureSignatures(refund_proc_sigs))?; + self.send_peer(endpoints, Msg::RefundProcedureSignatures(refund_proc_sigs))?; trace!("sent peer RefundProcedureSignatures msg"); let next_state = State::Alice(AliceState::RefundSigA(RefundSigA { xmr_locked: false, @@ -2518,7 +2522,7 @@ impl Runtime { cancel_seen: false, refund_seen: false, })); - self.state_update(senders, next_state)?; + self.state_update(endpoints, next_state)?; } Request::Protocol(Msg::BuyProcedureSignature(ref buy_proc_sig)) @@ -2533,7 +2537,7 @@ impl Runtime { let tx_label = TxLabel::Buy; if !self.syncer_state.is_watched_tx(&tx_label) { let task = self.syncer_state.watch_tx_btc(txid, tx_label); - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), self.syncer_state.bitcoin_syncer(), @@ -2592,7 +2596,7 @@ impl Runtime { local_keys: dumb!(), remote_keys: bmap(&self.maker_peer, &dumb!()), }; - self.send_ctl(senders, source, Request::SwapInfo(info))?; + self.send_ctl(endpoints, source, Request::SwapInfo(info))?; } _ => { @@ -2607,7 +2611,7 @@ impl Runtime { impl Runtime { pub fn taker_commit( &mut self, - senders: &mut Senders, + endpoints: &mut Endpoints, params: Params, ) -> Result { let msg = format!( @@ -2628,14 +2632,14 @@ impl Runtime { // Ignoring possible reporting errors here and after: do not want to // halt the swap just because the client disconnected let enquirer = self.enquirer.clone(); - let _ = self.report_progress_to(senders, &enquirer, msg); + let _ = self.report_progress_to(endpoints, &enquirer, msg); Ok(commitment) } pub fn maker_commit( &mut self, - senders: &mut Senders, + endpoints: &mut Endpoints, peerd: &ServiceId, swap_id: SwapId, params: &Params, @@ -2651,7 +2655,7 @@ impl Runtime { // Ignoring possible reporting errors here and after: do not want to // halt the channel just because the client disconnected let enquirer = self.enquirer.clone(); - let _ = self.report_progress_to(senders, &enquirer, msg); + let _ = self.report_progress_to(endpoints, &enquirer, msg); let engine = CommitmentEngine; let commitment = match params.clone() { @@ -2669,7 +2673,7 @@ impl Runtime { swap_id.bright_green_italic(), peerd.bright_green_italic() ); - let _ = self.report_success_to(senders, &enquirer, Some(msg)); + let _ = self.report_success_to(endpoints, &enquirer, Some(msg)); Ok(commitment) } } diff --git a/src/syncerd/runtime.rs b/src/syncerd/runtime.rs index b5515245b..11ec46e64 100644 --- a/src/syncerd/runtime.rs +++ b/src/syncerd/runtime.rs @@ -12,6 +12,7 @@ // along with this software. // If not, see . +use crate::service::Endpoints; use crate::syncerd::bitcoin_syncer::BitcoinSyncer; use crate::syncerd::monero_syncer::MoneroSyncer; use crate::syncerd::opts::{Coin, Opts}; @@ -108,7 +109,6 @@ pub struct Runtime { impl esb::Handler for Runtime { type Request = Request; - type Address = ServiceId; type Error = Error; fn identity(&self) -> ServiceId { @@ -117,19 +117,23 @@ impl esb::Handler for Runtime { fn handle( &mut self, - senders: &mut esb::SenderList, + endpoints: &mut Endpoints, bus: ServiceBus, source: ServiceId, request: Request, ) -> Result<(), Self::Error> { match bus { - ServiceBus::Msg => self.handle_rpc_msg(senders, source, request), - ServiceBus::Ctl => self.handle_rpc_ctl(senders, source, request), - ServiceBus::Bridge => self.handle_bridge(senders, source, request), + ServiceBus::Msg => self.handle_rpc_msg(endpoints, source, request), + ServiceBus::Ctl => self.handle_rpc_ctl(endpoints, source, request), + ServiceBus::Bridge => self.handle_bridge(endpoints, source, request), } } - fn handle_err(&mut self, _: esb::Error) -> Result<(), esb::Error> { + fn handle_err( + &mut self, + _: Endpoints, + _: esb::Error, + ) -> Result<(), esb::Error> { // We do nothing and do not propagate error; it's already being reported // with `error!` macro by the controller. If we propagate error here // this will make whole daemon panic @@ -140,7 +144,7 @@ impl esb::Handler for Runtime { impl Runtime { fn handle_rpc_msg( &mut self, - _senders: &mut esb::SenderList, + _endpoints: &mut Endpoints, _source: ServiceId, request: Request, ) -> Result<(), Error> { @@ -158,7 +162,7 @@ impl Runtime { } fn handle_rpc_ctl( &mut self, - senders: &mut esb::SenderList, + endpoints: &mut Endpoints, source: ServiceId, request: Request, ) -> Result<(), Error> { @@ -182,7 +186,7 @@ impl Runtime { }; } (Request::GetInfo, _) => { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), source, @@ -201,7 +205,7 @@ impl Runtime { } (Request::ListTasks, ServiceId::Client(_)) => { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), source.clone(), @@ -224,21 +228,21 @@ impl Runtime { } if let Some((Some(respond_to), resp)) = notify_cli { - senders.send_to(ServiceBus::Ctl, self.identity(), respond_to, resp)?; + endpoints.send_to(ServiceBus::Ctl, self.identity(), respond_to, resp)?; } Ok(()) } fn handle_bridge( &mut self, - senders: &mut esb::SenderList, + endpoints: &mut Endpoints, _source: ServiceId, request: Request, ) -> Result<(), Error> { debug!("Syncerd BRIDGE RPC request: {}", request); match request { Request::SyncerdBridgeEvent(syncerd_bridge_event) => { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), syncerd_bridge_event.source,