Skip to content

Commit

Permalink
Resolve new endpoint type
Browse files Browse the repository at this point in the history
  • Loading branch information
TheCharlatan committed Feb 21, 2022
1 parent a56945a commit 4cbdcd8
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 207 deletions.
1 change: 0 additions & 1 deletion src/cli/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::{
use std::{str::FromStr, thread::sleep};

use internet2::{NodeAddr, RemoteSocketAddr, ToNodeAddr};
use lnp::{message, LIGHTNING_P2P_DEFAULT_PORT};
use microservices::shell::Exec;

use farcaster_core::{
Expand Down
79 changes: 43 additions & 36 deletions src/farcasterd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use crate::{
BitcoinAddress, BitcoinFundingInfo, FundingInfo, Keys, LaunchSwap, MoneroAddress,
MoneroFundingInfo, Outcome, PubOffer, RequestId, Reveal, Token,
},
service::Endpoints,
swapd::get_swap_id,
syncerd::opts::Coin,
walletd::NodeSecrets,
Senders,
};
use amplify::Wrapper;
use clap::IntoApp;
Expand Down Expand Up @@ -239,7 +239,6 @@ impl Stats {

impl esb::Handler<ServiceBus> for Runtime {
type Request = Request;
type Address = ServiceId;
type Error = Error;

fn identity(&self) -> ServiceId {
Expand All @@ -248,19 +247,23 @@ impl esb::Handler<ServiceBus> for Runtime {

fn handle(
&mut self,
senders: &mut esb::SenderList<ServiceBus, ServiceId>,
endpoints: &mut Endpoints,
bus: ServiceBus,
source: ServiceId,
request: Request,
) -> Result<(), Self::Error> {
match bus {
ServiceBus::Msg => self.handle_rpc_msg(senders, source, request),
ServiceBus::Ctl => self.handle_rpc_ctl(senders, source, request),
ServiceBus::Msg => self.handle_rpc_msg(endpoints, source, request),
ServiceBus::Ctl => self.handle_rpc_ctl(endpoints, source, request),
_ => Err(Error::NotSupported(ServiceBus::Bridge, request.get_type())),
}
}

fn handle_err(&mut self, _: esb::Error<ServiceId>) -> Result<(), esb::Error<ServiceId>> {
fn handle_err(
&mut self,
_: Endpoints,
_: esb::Error<ServiceId>,
) -> Result<(), esb::Error<ServiceId>> {
// We do nothing and do not propagate error; it's already being reported
// with `error!` macro by the controller. If we propagate error here
// this will make whole daemon panic
Expand All @@ -272,10 +275,10 @@ impl Runtime {
fn clean_up_after_swap(
&mut self,
swapid: &SwapId,
senders: &mut esb::SenderList<ServiceBus, ServiceId>,
endpoints: &mut Endpoints,
) -> Result<(), Error> {
if self.running_swaps.remove(swapid) {
senders.send_to(
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
ServiceId::Swap(*swapid),
Expand Down Expand Up @@ -315,7 +318,7 @@ impl Runtime {
});

if self.connections.remove(&connectionid) {
senders.send_to(
endpoints.send_to(
ServiceBus::Ctl,
identity.clone(),
ServiceId::Peer(connectionid),
Expand All @@ -336,7 +339,7 @@ impl Runtime {
} else {
let service_id = ServiceId::Syncer(coin, network);
info!("Terminating {}", service_id);
if senders
if endpoints
.send_to(
ServiceBus::Ctl,
identity.clone(),
Expand Down Expand Up @@ -367,8 +370,12 @@ impl Runtime {
self.consumed_offers.contains_key(offerid)
}

fn _send_walletd(&self, senders: &mut Senders, message: request::Request) -> Result<(), Error> {
senders.send_to(ServiceBus::Ctl, self.identity(), ServiceId::Wallet, message)?;
fn _send_walletd(
&self,
endpoints: &mut Endpoints,
message: request::Request,
) -> Result<(), Error> {
endpoints.send_to(ServiceBus::Ctl, self.identity(), ServiceId::Wallet, message)?;
Ok(())
}
fn node_ids(&self) -> Vec<PublicKey> {
Expand All @@ -391,7 +398,7 @@ impl Runtime {
}
fn handle_rpc_msg(
&mut self,
senders: &mut esb::SenderList<ServiceBus, ServiceId>,
endpoints: &mut Endpoints,
source: ServiceId,
request: Request,
) -> Result<(), Error> {
Expand Down Expand Up @@ -423,7 +430,7 @@ impl Runtime {
if let Some(arb_addr) = self.arb_addrs.remove(&public_offer.id()) {
let btc_addr_req =
Request::BitcoinAddress(BitcoinAddress(*swap_id, arb_addr));
senders.send_to(
endpoints.send_to(
ServiceBus::Msg,
self.identity(),
ServiceId::Wallet,
Expand All @@ -435,7 +442,7 @@ impl Runtime {
if let Some(acc_addr) = self.acc_addrs.remove(&public_offer.id()) {
let xmr_addr_req =
Request::MoneroAddress(MoneroAddress(*swap_id, acc_addr));
senders.send_to(
endpoints.send_to(
ServiceBus::Msg,
self.identity(),
ServiceId::Wallet,
Expand All @@ -448,7 +455,7 @@ impl Runtime {
self.peerd_ids
.insert(public_offer.offer.id(), source.clone());

senders.send_to(ServiceBus::Msg, source, ServiceId::Wallet, request)?;
endpoints.send_to(ServiceBus::Msg, source, ServiceId::Wallet, request)?;
}
return Ok(());
}
Expand All @@ -462,7 +469,7 @@ impl Runtime {

fn handle_rpc_ctl(
&mut self,
senders: &mut esb::SenderList<ServiceBus, ServiceId>,
endpoints: &mut Endpoints,
source: ServiceId,
request: Request,
) -> Result<(), Error> {
Expand Down Expand Up @@ -570,7 +577,7 @@ impl Runtime {
&self.config,
)?;
// FIXME msgs should go to walletd?
senders.send_to(
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
source.clone(),
Expand Down Expand Up @@ -617,7 +624,7 @@ impl Runtime {
&self.config,
)?;
// FIXME msgs should go to walletd?
senders.send_to(
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
source.clone(),
Expand All @@ -641,7 +648,7 @@ impl Runtime {

Request::SwapOutcome(success) => {
let swapid = get_swap_id(&source)?;
self.clean_up_after_swap(&swapid, senders)?;
self.clean_up_after_swap(&swapid, endpoints)?;
self.stats.incr_outcome(&success);
match success {
Outcome::Buy => {
Expand Down Expand Up @@ -746,14 +753,14 @@ impl Runtime {
}?;
trace!("Procede executing pending request");
// recurse with request containing key
self.handle_rpc_ctl(senders, source, req)?
self.handle_rpc_ctl(endpoints, source, req)?
} else {
error!("Received unexpected peer keys");
}
}

Request::GetInfo => {
senders.send_to(
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd, // source
source, // destination
Expand All @@ -776,7 +783,7 @@ impl Runtime {
}

Request::ListPeers => {
senders.send_to(
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd, // source
source, // destination
Expand All @@ -785,7 +792,7 @@ impl Runtime {
}

Request::ListSwaps => {
senders.send_to(
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd, // source
source, // destination
Expand All @@ -801,7 +808,7 @@ impl Runtime {
.filter(|k| !self.consumed_offers_contains(&k.offer.id()))
.cloned()
.collect();
senders.send_to(
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd, // source
source, // destination
Expand All @@ -810,7 +817,7 @@ impl Runtime {
}

// Request::ListOfferIds => {
// senders.send_to(
// endpoints.send_to(
// ServiceBus::Ctl,
// ServiceId::Farcasterd, // source
// source, // destination
Expand All @@ -824,7 +831,7 @@ impl Runtime {
.values()
.map(|listen| listen.to_url_string()),
);
senders.send_to(
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd, // source
source, // destination
Expand Down Expand Up @@ -854,7 +861,7 @@ impl Runtime {
let resp = match (bindaddr, peer_secret_key, peer_public_key) {
(None, None, None) => {
trace!("Push MakeOffer to pending_requests and requesting a secret from Wallet");
return self.get_secret(senders, source, request);
return self.get_secret(endpoints, source, request);
}
(None, Some(sk), Some(pk)) => {
self.listens.insert(offer.id(), bind_addr);
Expand Down Expand Up @@ -967,7 +974,7 @@ impl Runtime {
// Connect
let peer_connected_is_ok =
match (self.connections.contains(&peer), peer_secret_key) {
(false, None) => return self.get_secret(senders, source, request),
(false, None) => return self.get_secret(endpoints, source, request),
(false, Some(sk)) => {
trace!(
"{} to remote peer {}",
Expand Down Expand Up @@ -1019,7 +1026,7 @@ impl Runtime {
internal_address,
peer_secret_key: None,
});
senders.send_to(
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
ServiceId::Wallet,
Expand Down Expand Up @@ -1063,7 +1070,7 @@ impl Runtime {
} else {
s!("Unknown swapd")
};
senders.send_to(
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
source,
Expand Down Expand Up @@ -1245,7 +1252,7 @@ impl Runtime {
res
})
.collect();
senders.send_to(
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
source,
Expand Down Expand Up @@ -1274,7 +1281,7 @@ impl Runtime {
res
})
.collect();
senders.send_to(
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
source,
Expand All @@ -1297,7 +1304,7 @@ impl Runtime {
respond_to.bright_yellow_bold(),
resp.bright_blue_bold(),
);
senders.send_to(ServiceBus::Ctl, self.identity(), respond_to, resp)?;
endpoints.send_to(ServiceBus::Ctl, self.identity(), respond_to, resp)?;
}
}
debug!("processed all cli notifications");
Expand Down Expand Up @@ -1383,7 +1390,7 @@ impl Runtime {

fn get_secret(
&mut self,
senders: &mut esb::SenderList<ServiceBus, ServiceId>,
endpoints: &mut Endpoints,
source: ServiceId,
request: Request,
) -> Result<(), Error> {
Expand All @@ -1395,7 +1402,7 @@ impl Runtime {
self.pending_requests
.insert(req_id.clone(), (request, source));
let wallet_token = GetKeys(self.wallet_token.clone(), req_id);
senders.send_to(
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd,
ServiceId::Wallet,
Expand Down
Loading

0 comments on commit 4cbdcd8

Please sign in to comment.