Reconnect peerd if connection is dropped during running swap #426

Merged: 32 commits, Jun 7, 2022
Changes from 29 commits
Commits
712c638
Swapd: Correct call ordering in Msg::Reveal handling
TheCharlatan May 4, 2022
811b267
Reconnect peerd if the connection is dropped
TheCharlatan May 5, 2022
9e0f12e
Peerd: Add hacked-in message to exchange identity before starting pee…
TheCharlatan May 6, 2022
49a4e22
Functional swap test: Add test for peerd reconnection
TheCharlatan May 6, 2022
2676677
Add sequencediagram for reconnect procedure
TheCharlatan May 10, 2022
ddad77e
Peerd: Disclaim the critical role of the identity
TheCharlatan May 10, 2022
6416a62
Taker reconnect: Move reconnect procedure to peerd Runtime
TheCharlatan May 12, 2022
0090a0f
Peerd Runtime: Remove commented code and replace TODO with comment
TheCharlatan May 12, 2022
e2199fb
Service: rename to match the name of the wrapped function
TheCharlatan May 12, 2022
94121ef
Update sequence reconnect sequencediagram
TheCharlatan May 12, 2022
65dd343
Peerd: Trigger reconnect routine also on failed send
TheCharlatan May 12, 2022
ee87e5c
Swap functional test: Change peerd kill heuristic
TheCharlatan May 12, 2022
bcecc27
Swap test: Fix lint
TheCharlatan May 12, 2022
03feb6c
Swap test: Remove needless clone
TheCharlatan May 12, 2022
76a9457
Update reconnect sequence diagram
TheCharlatan May 12, 2022
3517cde
Peerd: Remove unneeded runtime arguments
TheCharlatan May 12, 2022
033130a
rename peerd::Runtime::connect to forked_from_listener
Lederstrumpf May 15, 2022
47a446b
Peerd: Handle ping failing in receiver runtime
TheCharlatan May 15, 2022
0f6fcfe
Peerd: Handle all connection errors through receiver runtime termination
TheCharlatan May 16, 2022
257eb67
Peerd: Report receiver runtime errors to log.
TheCharlatan May 16, 2022
8dbd688
Peerd: Log bridge sending error
TheCharlatan May 16, 2022
f99900b
Rebase fixups
TheCharlatan May 27, 2022
d30b28f
Correct connect/bind zmq socket ordering
TheCharlatan May 28, 2022
34574ec
Improve peerd error reporting
TheCharlatan May 29, 2022
9b8e0cf
Peerd: set zmq immediate=true and linger=0
TheCharlatan May 30, 2022
7bf7081
Peerd: change zmq PAIR sockets to PUSH/PULL
TheCharlatan May 31, 2022
0e97200
Improve peer reconnect logging and error handling
TheCharlatan May 31, 2022
5b3a422
Service: s/add_service_bus/add_bridge_service_bus/g
TheCharlatan May 31, 2022
c6c5919
Farcasterd: Correct peerd termination comment
TheCharlatan May 31, 2022
f222148
Peerd: Log error on unexpected pong
TheCharlatan Jun 3, 2022
2c940f3
Farcasterd: Simplify find_map into pure find
TheCharlatan Jun 3, 2022
b41a750
Peerd: Attempt reconnect in while loop
TheCharlatan Jun 3, 2022
1 change: 1 addition & 0 deletions doc/reconnect_sequencediagram.svg
24 changes: 24 additions & 0 deletions doc/reconnect_sequencediagram.txt
@@ -0,0 +1,24 @@
title Peer Reconnect Procedure

participant t_peerd_run
participant t_peerd_recv
participant m_peerd_recv
participant m_peerd_run
participant m_farcasterd
participant m_swapd

==Peer reconnect procedure==
m_swapd -> m_farcasterd : Ctl PeerdUnreachable
m_farcasterd -> m_peerd_run : Ctl Terminate
m_peerd_run -> m_farcasterd : Ctl PeerdTerminated
m_peerd_recv -> m_peerd_recv: terminate
m_peerd_run -> m_peerd_run: terminate
t_peerd_recv -> t_peerd_run : Ctl PeerdListenerRuntimeShutdown
t_peerd_recv -> t_peerd_recv: terminate
==Taker peerd restarts connection, connects to maker forked peerd==
m_peerd_run -> m_peerd_run : forked from listener
t_peerd_run -> m_peerd_run : NodeId (raw)
t_peerd_run -> t_peerd_recv: spawn
m_peerd_run -> m_farcasterd: Ctl Hello
m_farcasterd -> m_swapd: Ctl PeerdReconnected
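
The second half of the diagram has the taker's peerd re-establish the connection after its receiver runtime shuts down. A minimal sketch of such a retry loop follows. The helper name `reconnect_with_retry`, the attempt cap, and the fixed delay are assumptions made for illustration and are not taken from this PR; the `connect` closure stands in for whatever call actually opens the session.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper (not part of the PR): calls `connect` until it
/// succeeds or `max_attempts` tries have failed, sleeping `delay` after
/// each failed try. At least one attempt is always made.
fn reconnect_with_retry<T, E, F>(
    mut connect: F,
    max_attempts: u32,
    delay: Duration,
) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
{
    let mut attempt = 1;
    loop {
        match connect() {
            // Connection is back: hand it to the caller.
            Ok(conn) => return Ok(conn),
            // Out of attempts: surface the last error.
            Err(err) if attempt >= max_attempts => return Err(err),
            // Otherwise wait a little and try again.
            Err(_) => {
                attempt += 1;
                sleep(delay);
            }
        }
    }
}

fn main() {
    // Simulated peer that refuses the first two connection attempts.
    let mut calls = 0;
    let result: Result<&str, &str> = reconnect_with_retry(
        || {
            calls += 1;
            if calls < 3 {
                Err("connection refused")
            } else {
                Ok("session established")
            }
        },
        5,
        Duration::from_millis(10),
    );
    println!("{:?} after {} calls", result, calls);
}
```

Whether the real loop should be bounded at all is a design choice: an unbounded loop keeps a running swap alive as long as possible, while a cap makes failures visible sooner.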

55 changes: 19 additions & 36 deletions src/bin/peerd.rs
@@ -101,11 +101,10 @@ use std::net::TcpListener;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;

use bitcoin::secp256k1::PublicKey;
use farcaster_node::peerd::{self, Opts};
use farcaster_node::LogStyle;
use farcaster_node::ServiceConfig;
use internet2::{session, FramingProtocol, NodeAddr, RemoteNodeAddr, RemoteSocketAddr};
use internet2::{session, FramingProtocol, RemoteNodeAddr, RemoteSocketAddr};
use microservices::peer::PeerConnection;

/*
@@ -177,26 +176,23 @@ fn main() {
*/

let local_node = opts.peer_key_opts.local_node();
let local_id = local_node.node_id();
info!(
"{}: {}",
"Local node id".bright_green_bold(),
local_id.bright_yellow_bold()
local_node.node_id().bright_yellow_bold()
);

let peer_socket = PeerSocket::from(opts.clone());
debug!("Peer socket parameter interpreted as {}", peer_socket);

let internal_id: NodeAddr;
let mut local_socket: Option<InetSocketAddr> = None;
let mut remote_id: Option<PublicKey> = None;
let mut remote_socket: InetSocketAddr;
let connect: bool;
let mut remote_node_addr: Option<RemoteNodeAddr> = None;
let forked_from_listener: bool;
let connection = match peer_socket {
PeerSocket::Listen(RemoteSocketAddr::Ftcp(inet_addr)) => {
debug!("Running in LISTEN mode");

connect = false;
forked_from_listener = true;
local_socket = Some(inet_addr);

debug!("Binding TCP socket {}", inet_addr);
@@ -213,8 +209,6 @@ fn main() {
.expect("Error accepting incoming peer connection");
debug!("New connection from {}", remote_socket_addr);

remote_socket = remote_socket_addr.into();

// TODO: Support multithread mode
debug!("Forking child process");
if let ForkResult::Child = unsafe { fork().expect("Unable to fork child process") }
@@ -228,30 +222,25 @@ fn main() {
session::Raw::with_brontide(stream, local_node.private_key(), inet_addr)
.expect("Unable to establish session with the remote peer");

internal_id = NodeAddr::Remote(RemoteNodeAddr {
node_id: opts.peer_key_opts.internal_node().node_id(),
remote_addr: RemoteSocketAddr::Ftcp(remote_socket),
});
debug!(
"Session successfully established with new unique id: {}",
internal_id
"Session successfully established with {}",
remote_socket_addr
);

break PeerConnection::with(session);
}
debug!("Child forked; returning into main listener event loop");
continue;
}
}
PeerSocket::Connect(remote_node_addr) => {
debug!("Running in CONNECT mode");
PeerSocket::Connect(remote_node) => {
debug!("Peerd running in CONNECT mode");

connect = true;
internal_id = NodeAddr::Remote(remote_node_addr.clone());
remote_id = Some(remote_node_addr.node_id);
remote_socket = remote_node_addr.remote_addr.into();
forked_from_listener = false;
remote_node_addr = Some(remote_node.clone());

debug!("Connecting to {}", &remote_node_addr.addr());
PeerConnection::connect(remote_node_addr, &local_node)
debug!("Connecting to {}", &remote_node.addr());
PeerConnection::connect(remote_node, &local_node)
.expect("Unable to connect to the remote peer")
}
_ => unimplemented!(),
@@ -260,27 +249,21 @@ fn main() {
debug!("Starting runtime ...");

/* A maker / listener passes the following content
internal_id: local key and remote address
remote_id: None
remote_node_addr: none
local_socket: local inet address
remote_socket: address of the remote socket
connect: false

A taker / connecter passes the following content
internal_id: remote key and remote address
remote_id: remote peer id
remote_node_addr: full internet2 remote node address
local_socket: None
remote_socket: remote node addr
connect: true */
peerd::run(
service_config,
connection,
internal_id,
local_id,
remote_id,
remote_node_addr,
local_socket,
remote_socket,
connect,
local_node,
forked_from_listener,
)
.expect("Error running peerd runtime");

65 changes: 57 additions & 8 deletions src/farcasterd/runtime.rs
@@ -105,6 +105,7 @@ pub fn run(
listens: none!(),
started: SystemTime::now(),
connections: none!(),
report_peerd_reconnect: none!(),
running_swaps: none!(),
spawning_services: none!(),
making_swaps: none!(),
@@ -135,16 +136,17 @@ pub struct Runtime {
listens: HashMap<OfferId, RemoteSocketAddr>,
started: SystemTime,
connections: HashSet<NodeAddr>,
report_peerd_reconnect: HashMap<NodeAddr, ServiceId>,
running_swaps: HashSet<SwapId>,
spawning_services: HashMap<ServiceId, ServiceId>,
making_swaps: HashMap<ServiceId, (request::InitSwap, Network)>,
taking_swaps: HashMap<ServiceId, (request::InitSwap, Network)>,
public_offers: HashSet<PublicOffer<BtcXmr>>,
arb_addrs: HashMap<PublicOfferId, bitcoin::Address>,
acc_addrs: HashMap<PublicOfferId, monero::Address>,
consumed_offers: HashMap<OfferId, SwapId>,
node_ids: HashMap<OfferId, PublicKey>, // TODO is it possible? HashMap<SwapId, PublicKey>
peerd_ids: HashMap<OfferId, ServiceId>,
consumed_offers: HashMap<OfferId, (SwapId, ServiceId)>,
node_ids: HashMap<OfferId, PublicKey>, // Only populated by maker. TODO is it possible? HashMap<SwapId, PublicKey>
peerd_ids: HashMap<OfferId, ServiceId>, // Only populated by maker.
wallet_token: Token,
pending_requests: HashMap<request::RequestId, (Request, ServiceId)>,
syncer_services: HashMap<(Coin, Network), ServiceId>,
@@ -285,9 +287,9 @@ impl Runtime {
self.consumed_offers = self
.consumed_offers
.drain()
.filter_map(|(k, v)| {
if swapid != &v {
Some((k, v))
.filter_map(|(k, (swap_id, service_id))| {
if swapid != &swap_id {
Some((k, (swap_id, service_id)))
} else {
offerid = Some(k);
None
@@ -496,6 +498,17 @@ impl Runtime {
connection_id.bright_blue_italic(),
self.connections.len().bright_blue_bold()
);
if let Some(swap_service_id) =
self.report_peerd_reconnect.remove(connection_id)
{
debug!("Letting {} know of peer reconnection.", swap_service_id);
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
swap_service_id,
Request::PeerdReconnected,
)?;
}
} else {
warn!(
"Connection {} was already registered; the service probably was relaunched",
@@ -694,7 +707,7 @@ impl Runtime {
);

self.consumed_offers
.insert(public_offer.offer.id(), swap_id);
.insert(public_offer.offer.id(), (swap_id, peer.clone()));
self.stats.incr_initiated();
launch_swapd(
self,
@@ -715,7 +728,7 @@ impl Runtime {
}

Request::Keys(Keys(sk, pk, id)) if self.pending_requests.contains_key(&id) => {
trace!("received peerd keys");
debug!("received peerd keys {}", sk.display_secret());
if let Some((request, source)) = self.pending_requests.remove(&id) {
// storing node_id
trace!("Received expected peer keys, injecting key in request");
@@ -1295,10 +1308,46 @@ impl Runtime {
"removed connection {} from farcasterd registered connections",
addr
);

// log a message if a swap running over this connection
// is not completed, and thus present in consumed_offers
let peerd_id = ServiceId::Peer(addr.clone());
if self
.consumed_offers
.iter()
.find_map(|(_, (_, service_id))| {
if service_id.clone() == peerd_id {
Some(0)
} else {
None
}
})
.is_some()
{
info!("a swap is still running over the terminated peer {}, the counterparty will attempt to reconnect.", addr);
}
}
}
}

Request::PeerdUnreachable(ServiceId::Peer(addr)) => {
if self.connections.contains(&addr) {
warn!(
"Peerd {} was reported to be unreachable, attempting to
terminate to kick-off re-connect procedure, if we are
taker and the swap is still running.",
addr
);
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd,
ServiceId::Peer(addr.clone()),
Request::Terminate,
)?;
}
self.report_peerd_reconnect.insert(addr, source);
}

req => {
error!("Ignoring unsupported request: {}", req.err());
}
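
The farcasterd changes above amount to a small piece of bookkeeping: remember which service reported a peerd as unreachable, then notify that service once a peerd with the same address registers again. A minimal sketch of that bookkeeping follows, assuming string identifiers in place of `NodeAddr` and `ServiceId`. The type `ReconnectBook` and its method names are hypothetical, and gating the notification on a fresh insert into `connections` is inferred from the surrounding branch rather than confirmed by the diff.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical model of the two `Runtime` fields involved in reconnects.
#[derive(Default)]
struct ReconnectBook {
    connections: HashSet<String>,
    report_peerd_reconnect: HashMap<String, String>,
}

impl ReconnectBook {
    /// Handles a `PeerdUnreachable` report. Returns true when a `Terminate`
    /// should be sent, i.e. when the peerd is still registered. The reporter
    /// is remembered in either case.
    fn on_peerd_unreachable(&mut self, peer: &str, reporter: &str) -> bool {
        let send_terminate = self.connections.contains(peer);
        self.report_peerd_reconnect
            .insert(peer.to_string(), reporter.to_string());
        send_terminate
    }

    /// Handles a terminated peerd by dropping it from the registered set.
    fn on_peerd_terminated(&mut self, peer: &str) {
        self.connections.remove(peer);
    }

    /// Handles a `Hello` from a peerd. Returns the service that should be
    /// sent `PeerdReconnected`, if one was waiting and the connection is new.
    fn on_peerd_hello(&mut self, peer: &str) -> Option<String> {
        if self.connections.insert(peer.to_string()) {
            self.report_peerd_reconnect.remove(peer)
        } else {
            None
        }
    }
}

fn main() {
    let mut book = ReconnectBook::default();
    book.on_peerd_hello("peer-a");
    let terminate = book.on_peerd_unreachable("peer-a", "swap-1");
    book.on_peerd_terminated("peer-a");
    let notify = book.on_peerd_hello("peer-a");
    println!("terminate sent: {}, notify: {:?}", terminate, notify);
}
```

Removing the entry on notification means each report produces at most one `PeerdReconnected`, so a later unrelated `Hello` from the same address does not notify the swap again.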