From a3be6c34ea3033124f91dd8dfc3f3570a4c6812c Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Fri, 29 Nov 2024 16:48:44 +0800 Subject: [PATCH 01/14] Add shadowsocks-service crate for tentacle Signed-off-by: Eval EXEC --- tentacle/Cargo.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tentacle/Cargo.toml b/tentacle/Cargo.toml index 1182704b..28174ce4 100644 --- a/tentacle/Cargo.toml +++ b/tentacle/Cargo.toml @@ -44,6 +44,8 @@ igd = { version = "0.15", optional = true, package = "igd-next" } #tls tokio-rustls = { version = "0.26.0", optional = true } +shadowsocks-service = { version = "1.21.2", features = ["local"]} +shadowsocks = "1.21.0" [target.'cfg(not(target_family = "wasm"))'.dependencies] # rand 0.8 not support wasm32 From 3b739645987c5cd1d3cb1cf0ef149f83bb2256a3 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Sat, 30 Nov 2024 01:32:02 +0800 Subject: [PATCH 02/14] Add socks5::connect to tentacle runtime --- tentacle/src/runtime/mod.rs | 2 ++ tentacle/src/runtime/socks5.rs | 43 ++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) create mode 100644 tentacle/src/runtime/socks5.rs diff --git a/tentacle/src/runtime/mod.rs b/tentacle/src/runtime/mod.rs index 8348ac30..aaed3fd1 100644 --- a/tentacle/src/runtime/mod.rs +++ b/tentacle/src/runtime/mod.rs @@ -56,6 +56,8 @@ mod generic_split { } } +mod socks5; + mod budget; pub use budget::*; diff --git a/tentacle/src/runtime/socks5.rs b/tentacle/src/runtime/socks5.rs new file mode 100644 index 00000000..7132e199 --- /dev/null +++ b/tentacle/src/runtime/socks5.rs @@ -0,0 +1,43 @@ +use log::trace; +use shadowsocks::relay::socks5::{ + self, Address, Command, Error, HandshakeRequest, HandshakeResponse, Reply, TcpRequestHeader, + TcpResponseHeader, +}; +use tokio::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + net::{TcpStream, ToSocketAddrs}, +}; + +pub async fn connect(addr: A, proxy: P) -> Result +where + A: Into
, + P: ToSocketAddrs, +{ + let mut s = TcpStream::connect(proxy).await?; + + // 1. Handshake + let hs = HandshakeRequest::new(vec![socks5::SOCKS5_AUTH_METHOD_NONE]); + trace!("client connected, going to send handshake: {:?}", hs); + + hs.write_to(&mut s).await?; + + let hsp = HandshakeResponse::read_from(&mut s).await?; + + trace!("got handshake response: {:?}", hsp); + assert_eq!(hsp.chosen_method, socks5::SOCKS5_AUTH_METHOD_NONE); + + // 2. Send request header + let h = TcpRequestHeader::new(Command::TcpConnect, addr.into()); + trace!("going to connect, req: {:?}", h); + h.write_to(&mut s).await?; + + let hp = TcpResponseHeader::read_from(&mut s).await?; + + trace!("got response: {:?}", hp); + match hp.reply { + Reply::Succeeded => (), + r => return Err(Error::Reply(r)), + } + + Ok(s) +} From 70e62bf9db509786f34c9afc5f081726ca99b0de Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Sat, 30 Nov 2024 01:32:26 +0800 Subject: [PATCH 03/14] Add ProxyConfig to TcpSocketConfig Signed-off-by: Eval EXEC --- tentacle/Cargo.toml | 1 + tentacle/src/builder.rs | 14 ++- .../mod.rs} | 4 +- tentacle/src/runtime/mod.rs | 2 - tentacle/src/runtime/socks5.rs | 43 ------- .../mod.rs} | 46 +++++--- tentacle/src/runtime/tokio_runtime/socks5.rs | 111 ++++++++++++++++++ tentacle/src/service.rs | 1 + tentacle/src/service/config.rs | 38 +++--- 9 files changed, 178 insertions(+), 82 deletions(-) rename tentacle/src/runtime/{async_runtime.rs => async_runtime/mod.rs} (98%) delete mode 100644 tentacle/src/runtime/socks5.rs rename tentacle/src/runtime/{tokio_runtime.rs => tokio_runtime/mod.rs} (76%) create mode 100644 tentacle/src/runtime/tokio_runtime/socks5.rs diff --git a/tentacle/Cargo.toml b/tentacle/Cargo.toml index 28174ce4..9056d377 100644 --- a/tentacle/Cargo.toml +++ b/tentacle/Cargo.toml @@ -46,6 +46,7 @@ igd = { version = "0.15", optional = true, package = "igd-next" } tokio-rustls = { version = "0.26.0", optional = true } shadowsocks-service = { version = "1.21.2", features = ["local"]} shadowsocks = "1.21.0" +url = "2.5.4" [target.'cfg(not(target_family = "wasm"))'.dependencies] # rand 0.8 not support wasm32 diff --git a/tentacle/src/builder.rs b/tentacle/src/builder.rs index 7be24c65..7b7b4112 100644 --- a/tentacle/src/builder.rs +++ b/tentacle/src/builder.rs @@ -1,5 +1,6 @@ use std::{io, sync::Arc, time::Duration}; +use crate::service::config::ProxyConfig; use nohash_hasher::IntMap; use tokio_util::codec::LengthDelimitedCodec; @@ -219,7 +220,14 @@ where where F: Fn(TcpSocket) -> Result + Send + Sync + 'static, { - self.config.tcp_config.tcp = Arc::new(f); + self.config.tcp_config.tcp.tcp_socket_config = Arc::new(f); + self + } + + /// Proxy config for tcp + #[cfg(not(target_family = "wasm"))] + pub fn tcp_proxy_config(mut self, proxy_conifg: Option) -> Self { + self.config.tcp_config.tcp.proxy_config = proxy_conifg; self } @@ -230,7 +238,7 @@ where where F: Fn(TcpSocket) -> Result + Send + Sync + 'static, { - self.config.tcp_config.ws = Arc::new(f); + self.config.tcp_config.ws.tcp_socket_config = Arc::new(f); self } @@ -254,7 +262,7 @@ where where F: Fn(TcpSocket) -> Result + Send + Sync + 'static, { - self.config.tcp_config.tls = Arc::new(f); + self.config.tcp_config.tls.tcp_socket_config = Arc::new(f); self } } diff --git a/tentacle/src/runtime/async_runtime.rs b/tentacle/src/runtime/async_runtime/mod.rs similarity index 98% rename from tentacle/src/runtime/async_runtime.rs rename to tentacle/src/runtime/async_runtime/mod.rs index 143ca58a..e7e1bb33 100644 --- a/tentacle/src/runtime/async_runtime.rs +++ b/tentacle/src/runtime/async_runtime/mod.rs @@ -149,7 +149,7 @@ mod os { let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; let socket = { - let t = tcp_config(TcpSocket { inner: socket })?; + let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?; t.inner }; // `bind` twice will return error @@ -188,7 +188,7 @@ mod os { // user can disable it on tcp_config #[cfg(not(windows))] socket.set_reuse_address(true)?; - let t = tcp_config(TcpSocket { inner: socket })?; + let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?; t.inner }; diff --git a/tentacle/src/runtime/mod.rs b/tentacle/src/runtime/mod.rs index aaed3fd1..8348ac30 100644 --- a/tentacle/src/runtime/mod.rs +++ b/tentacle/src/runtime/mod.rs @@ -56,8 +56,6 @@ mod generic_split { } } -mod socks5; - mod budget; pub use budget::*; diff --git a/tentacle/src/runtime/socks5.rs b/tentacle/src/runtime/socks5.rs deleted file mode 100644 index 7132e199..00000000 --- a/tentacle/src/runtime/socks5.rs +++ /dev/null @@ -1,43 +0,0 @@ -use log::trace; -use shadowsocks::relay::socks5::{ - self, Address, Command, Error, HandshakeRequest, HandshakeResponse, Reply, TcpRequestHeader, - TcpResponseHeader, -}; -use tokio::{ - io::{AsyncRead, AsyncWrite, ReadBuf}, - net::{TcpStream, ToSocketAddrs}, -}; - -pub async fn connect(addr: A, proxy: P) -> Result -where - A: Into
, - P: ToSocketAddrs, -{ - let mut s = TcpStream::connect(proxy).await?; - - // 1. Handshake - let hs = HandshakeRequest::new(vec![socks5::SOCKS5_AUTH_METHOD_NONE]); - trace!("client connected, going to send handshake: {:?}", hs); - - hs.write_to(&mut s).await?; - - let hsp = HandshakeResponse::read_from(&mut s).await?; - - trace!("got handshake response: {:?}", hsp); - assert_eq!(hsp.chosen_method, socks5::SOCKS5_AUTH_METHOD_NONE); - - // 2. Send request header - let h = TcpRequestHeader::new(Command::TcpConnect, addr.into()); - trace!("going to connect, req: {:?}", h); - h.write_to(&mut s).await?; - - let hp = TcpResponseHeader::read_from(&mut s).await?; - - trace!("got response: {:?}", hp); - match hp.reply { - Reply::Succeeded => (), - r => return Err(Error::Reply(r)), - } - - Ok(s) -} diff --git a/tentacle/src/runtime/tokio_runtime.rs b/tentacle/src/runtime/tokio_runtime/mod.rs similarity index 76% rename from tentacle/src/runtime/tokio_runtime.rs rename to tentacle/src/runtime/tokio_runtime/mod.rs index eda4dbef..e74dc4f2 100644 --- a/tentacle/src/runtime/tokio_runtime.rs +++ b/tentacle/src/runtime/tokio_runtime/mod.rs @@ -1,3 +1,5 @@ +pub(crate) mod socks5; +use socks5::Socks5Config; pub use tokio::{ net::{TcpListener, TcpStream}, spawn, @@ -88,7 +90,7 @@ pub(crate) fn listen(addr: SocketAddr, tcp_config: TcpSocketConfig) -> io::Resul // user can disable it on tcp_config #[cfg(not(windows))] socket.set_reuse_address(true)?; - let t = tcp_config(TcpSocket { inner: socket })?; + let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?; t.inner.set_nonblocking(true)?; // safety: fd convert by socket2 unsafe { @@ -117,21 +119,31 @@ pub(crate) async fn connect( addr: SocketAddr, tcp_config: TcpSocketConfig, ) -> io::Result { - let domain = Domain::for_address(addr); - let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; - - let socket = { - let t = tcp_config(TcpSocket { inner: socket })?; - t.inner.set_nonblocking(true)?; - // safety: fd convert by socket2 - unsafe { - #[cfg(unix)] - let socket = TokioTcp::from_raw_fd(t.into_raw_fd()); - #[cfg(windows)] - let socket = TokioTcp::from_raw_socket(t.into_raw_socket()); - socket + match tcp_config.proxy_config { + Some(proxy_config) => { + let proxy_config: Socks5Config = super::socks5::parse(&proxy_config.proxy_url)?; + super::socks5::connect(addr, proxy_config) + .await + .map_err(|err| io::Error::other(err)) } - }; - - socket.connect(addr).await + None => { + let domain = Domain::for_address(addr); + let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; + + let socket = { + let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?; + t.inner.set_nonblocking(true)?; + // safety: fd convert by socket2 + unsafe { + #[cfg(unix)] + let socket = TokioTcp::from_raw_fd(t.into_raw_fd()); + #[cfg(windows)] + let socket = TokioTcp::from_raw_socket(t.into_raw_socket()); + socket + } + }; + + socket.connect(addr).await + } + } } diff --git a/tentacle/src/runtime/tokio_runtime/socks5.rs b/tentacle/src/runtime/tokio_runtime/socks5.rs new file mode 100644 index 00000000..f73725ea --- /dev/null +++ b/tentacle/src/runtime/tokio_runtime/socks5.rs @@ -0,0 +1,111 @@ +use std::io; + +use log::debug; +use shadowsocks::relay::socks5::{ + self, Address, Command, Error as Socks5Error, HandshakeRequest, HandshakeResponse, + PasswdAuthRequest, PasswdAuthResponse, Reply, TcpRequestHeader, TcpResponseHeader, +}; +use tokio::net::TcpStream; + +#[derive(Debug)] +pub(crate) struct Socks5Config { + pub(crate) proxy_url: String, + pub(crate) auth: Option<(String, String)>, +} + +// parse proxy url like "socks5://username:password@localhost:1080" to Socks5Config +pub(crate) fn parse(proxy_url: &str) -> io::Result { + let parsed_url = url::Url::parse(proxy_url).map_err(|err| io::Error::other(err))?; + let scheme = parsed_url.scheme(); + match scheme { + "socks5" => { + let auth = match parsed_url.username() { + "" => None, + username => Some(( + username.to_string(), + parsed_url.password().unwrap_or("").to_string(), + )), + }; + let port = parsed_url.port().ok_or(io::Error::other("missing port"))?; + let proxy_url = String::new() + + parsed_url + .host_str() + .ok_or(io::Error::other("missing host"))? + + ":" + + &format!("{port}"); + Ok(Socks5Config { proxy_url, auth }) + } + _ => Err(io::Error::other(format!( + "tentacle doesn't support proxy scheme: {}", + scheme + ))), + } +} + +pub async fn connect(addr: A, socks5_config: Socks5Config) -> Result +where + A: Into
, +{ + debug!( + "client connecting proxy server: config {}, with auth: {}", + socks5_config.proxy_url, + socks5_config.auth.is_some() + ); + // destruct socks5_config + let Socks5Config { auth, proxy_url } = socks5_config; + + let mut s = TcpStream::connect(proxy_url).await?; + + // 1. Handshake + let hs = { + if auth.is_some() { + HandshakeRequest::new(vec![socks5::SOCKS5_AUTH_METHOD_PASSWORD]) + } else { + HandshakeRequest::new(vec![socks5::SOCKS5_AUTH_METHOD_NONE]) + } + }; + debug!("client connected, going to send handshake: {:?}", hs); + + hs.write_to(&mut s).await?; + + let hsp = HandshakeResponse::read_from(&mut s).await?; + + debug!("got handshake response: {:?}", hsp); + match hsp.chosen_method { + socks5::SOCKS5_AUTH_METHOD_NONE => (), + socks5::SOCKS5_AUTH_METHOD_PASSWORD => { + if let Some((uname, passwd)) = auth { + let pr = PasswdAuthRequest::new(uname, passwd); + pr.write_to(&mut s).await?; + let prp = PasswdAuthResponse::read_from(&mut s).await?; + match Reply::from_u8(prp.status) { + Reply::Succeeded => debug!("password auth succeeded"), + r => return Err(Socks5Error::Reply(r)), + } + } else { + return Err(Socks5Error::PasswdAuthInvalidRequest); + } + } + _ => { + return Err(Socks5Error::IoError(io::Error::other(format!( + "unsupported auth method: {}", + hsp.chosen_method + )))) + } + } + + // 2. Send request header + let h = TcpRequestHeader::new(Command::TcpConnect, addr.into()); + debug!("going to connect, req: {:?}", h); + h.write_to(&mut s).await?; + + let hp = TcpResponseHeader::read_from(&mut s).await?; + + debug!("got response: {:?}", hp); + match hp.reply { + Reply::Succeeded => (), + r => return Err(Socks5Error::Reply(r)), + } + + Ok(s) +} diff --git a/tentacle/src/service.rs b/tentacle/src/service.rs index 1d917aeb..7ef9c44f 100644 --- a/tentacle/src/service.rs +++ b/tentacle/src/service.rs @@ -53,6 +53,7 @@ pub use crate::service::{ }; use bytes::Bytes; +pub use crate::service::config::ProxyConfig; #[cfg(feature = "tls")] pub use crate::service::config::TlsConfig; diff --git a/tentacle/src/service/config.rs b/tentacle/src/service/config.rs index 3e568d64..d009d980 100644 --- a/tentacle/src/service/config.rs +++ b/tentacle/src/service/config.rs @@ -86,11 +86,31 @@ impl Default for SessionConfig { } } -pub(crate) type TcpSocketConfig = - Arc Result + Send + Sync + 'static>; +/// Proxy related config +#[derive(Clone)] +pub struct ProxyConfig { + /// proxy url, like: socks5://127.0.0.1:9050 + pub proxy_url: String, +} -/// This config Allow users to set various underlying parameters of TCP #[derive(Clone)] +pub(crate) struct TcpSocketConfig { + pub(crate) tcp_socket_config: + Arc Result + Send + Sync + 'static>, + pub(crate) proxy_config: Option, +} + +impl Default for TcpSocketConfig { + fn default() -> Self { + Self { + tcp_socket_config: Arc::new(Ok), + proxy_config: None, + } + } +} + +/// This config Allow users to set various underlying parameters of TCP +#[derive(Clone, Default)] pub(crate) struct TcpConfig { /// When dial/listen on tcp, tentacle will call it allow user to set all tcp socket config pub tcp: TcpSocketConfig, @@ -102,18 +122,6 @@ pub(crate) struct TcpConfig { pub tls: TcpSocketConfig, } -impl Default for TcpConfig { - fn default() -> Self { - TcpConfig { - tcp: Arc::new(Ok), - #[cfg(feature = "ws")] - ws: Arc::new(Ok), - #[cfg(feature = "tls")] - tls: Arc::new(Ok), - } - } -} - /// A TCP socket that has not yet been converted to a `TcpStream` or /// `TcpListener`. /// From e32db26de68d1bfc2afb58c4c0fb0d6afb8b704d Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Tue, 24 Sep 2024 15:51:27 +0800 Subject: [PATCH 04/14] Support and `Onion3` Protocol Signed-off-by: Eval EXEC --- multiaddr/Cargo.toml | 3 ++ multiaddr/src/error.rs | 2 + multiaddr/src/lib.rs | 2 + multiaddr/src/onion_addr.rs | 51 +++++++++++++++++++++ multiaddr/src/protocol.rs | 88 ++++++++++++++++++++++++++++++++++++- 5 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 multiaddr/src/onion_addr.rs diff --git a/multiaddr/Cargo.toml b/multiaddr/Cargo.toml index 3edbc8b1..0169271c 100644 --- a/multiaddr/Cargo.toml +++ b/multiaddr/Cargo.toml @@ -15,6 +15,9 @@ bytes = "1.0" bs58 = "0.5.0" sha2 = "0.10.0" serde = "1" +byteorder = "1.5.0" +data-encoding = "2.6.0" +arrayref = "0.3.9" [dev-dependencies] parity-multiaddr = "0.11" diff --git a/multiaddr/src/error.rs b/multiaddr/src/error.rs index a185b2b2..87722ba4 100644 --- a/multiaddr/src/error.rs +++ b/multiaddr/src/error.rs @@ -5,6 +5,7 @@ use unsigned_varint::decode; pub enum Error { DataLessThanLen, InvalidMultiaddr, + InvalidOnion3Addr, InvalidProtocolString, InvalidUvar(decode::Error), ParsingError(Box), @@ -19,6 +20,7 @@ impl fmt::Display for Error { Error::DataLessThanLen => f.write_str("we have less data than indicated by length"), Error::InvalidMultiaddr => f.write_str("invalid multiaddr"), Error::InvalidProtocolString => f.write_str("invalid protocol string"), + Error::InvalidOnion3Addr => f.write_str("invalid onion3 address"), Error::InvalidUvar(e) => write!(f, "failed to decode unsigned varint: {}", e), Error::ParsingError(e) => write!(f, "failed to parse: {}", e), Error::UnknownHash => write!(f, "unknown hash"), diff --git a/multiaddr/src/lib.rs b/multiaddr/src/lib.rs index aea9eac7..43ff1227 100644 --- a/multiaddr/src/lib.rs +++ b/multiaddr/src/lib.rs @@ -1,8 +1,10 @@ ///! Mini Implementation of [multiaddr](https://github.com/jbenet/multiaddr) in Rust. mod error; +mod onion_addr; mod protocol; pub use self::error::Error; +pub use self::onion_addr::Onion3Addr; pub use self::protocol::Protocol; use bytes::{Bytes, BytesMut}; use serde::{ diff --git a/multiaddr/src/onion_addr.rs b/multiaddr/src/onion_addr.rs new file mode 100644 index 00000000..08cedb70 --- /dev/null +++ b/multiaddr/src/onion_addr.rs @@ -0,0 +1,51 @@ +use std::{borrow::Cow, fmt}; + +/// Represents an Onion v3 address +#[derive(Clone)] +pub struct Onion3Addr<'a>(Cow<'a, [u8; 35]>, u16); + +impl<'a> Onion3Addr<'a> { + /// Return the hash of the public key as bytes + pub fn hash(&self) -> &[u8; 35] { + self.0.as_ref() + } + + /// Return the port + pub fn port(&self) -> u16 { + self.1 + } + + /// Consume this instance and create an owned version containing the same address + pub fn acquire<'b>(self) -> Onion3Addr<'b> { + Onion3Addr(Cow::Owned(self.0.into_owned()), self.1) + } +} + +impl PartialEq for Onion3Addr<'_> { + fn eq(&self, other: &Self) -> bool { + self.1 == other.1 && self.0[..] == other.0[..] + } +} + +impl Eq for Onion3Addr<'_> {} + +impl From<([u8; 35], u16)> for Onion3Addr<'_> { + fn from(parts: ([u8; 35], u16)) -> Self { + Self(Cow::Owned(parts.0), parts.1) + } +} + +impl<'a> From<(&'a [u8; 35], u16)> for Onion3Addr<'a> { + fn from(parts: (&'a [u8; 35], u16)) -> Self { + Self(Cow::Borrowed(parts.0), parts.1) + } +} + +impl fmt::Debug for Onion3Addr<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + f.debug_tuple("Onion3Addr") + .field(&format!("{:02x?}", &self.0[..])) + .field(&self.1) + .finish() + } +} diff --git a/multiaddr/src/protocol.rs b/multiaddr/src/protocol.rs index 7c2e5962..9cc24f51 100644 --- a/multiaddr/src/protocol.rs +++ b/multiaddr/src/protocol.rs @@ -1,4 +1,7 @@ +use arrayref::array_ref; +use byteorder::{BigEndian, ByteOrder}; use bytes::{Buf, BufMut}; +use data_encoding::BASE32; use std::{ borrow::Cow, fmt, @@ -7,7 +10,7 @@ use std::{ str::{self, FromStr}, }; -use crate::error::Error; +use crate::{error::Error, Onion3Addr}; const DNS4: u32 = 0x36; const DNS6: u32 = 0x37; @@ -19,6 +22,7 @@ const TLS: u32 = 0x01c0; const WS: u32 = 0x01dd; const WSS: u32 = 0x01de; const MEMORY: u32 = 0x0309; +const ONION3: u32 = 0x01bd; const SHA256_CODE: u16 = 0x12; const SHA256_SIZE: u8 = 32; @@ -37,6 +41,7 @@ pub enum Protocol<'a> { Wss, /// Contains the "port" to contact. Similar to TCP or UDP, 0 means "assign me a port". Memory(u64), + Onion3(Onion3Addr<'a>), } impl<'a> Protocol<'a> { @@ -87,6 +92,11 @@ impl<'a> Protocol<'a> { let s = iter.next().ok_or(Error::InvalidProtocolString)?; Ok(Protocol::Memory(s.parse()?)) } + "onion3" => iter + .next() + .ok_or(Error::InvalidProtocolString) + .and_then(|s| read_onion3(&s.to_uppercase())) + .map(|(a, p)| Protocol::Onion3((a, p).into())), _ => Err(Error::UnknownProtocolString), } } @@ -101,6 +111,14 @@ impl<'a> Protocol<'a> { } Ok(input.split_at(n)) } + + fn split_at(n: usize, input: &[u8]) -> Result<(&[u8], &[u8]), Error> { + if input.len() < n { + return Err(Error::DataLessThanLen); + } + Ok(input.split_at(n)) + } + let (id, input) = decode::u32(input)?; match id { DNS4 => { @@ -160,6 +178,14 @@ impl<'a> Protocol<'a> { let num = rdr.get_u64(); Ok((Protocol::Memory(num), rest)) } + ONION3 => { + let (data, rest) = split_at(37, input)?; + let port = BigEndian::read_u16(&data[35..]); + Ok(( + Protocol::Onion3((array_ref!(data, 0, 35), port).into()), + rest, + )) + } _ => Err(Error::UnknownProtocolId(id)), } } @@ -213,6 +239,11 @@ impl<'a> Protocol<'a> { w.put(encode::u32(MEMORY, &mut buf)); w.put_u64(*port) } + Protocol::Onion3(addr) => { + w.put(encode::u32(ONION3, &mut buf)); + w.put(addr.hash().as_ref()); + w.put_u16(addr.port()); + } } } @@ -229,6 +260,7 @@ impl<'a> Protocol<'a> { Protocol::Ws => Protocol::Ws, Protocol::Wss => Protocol::Wss, Protocol::Memory(a) => Protocol::Memory(a), + Protocol::Onion3(addr) => Protocol::Onion3(addr.acquire()), } } } @@ -247,6 +279,10 @@ impl<'a> fmt::Display for Protocol<'a> { Ws => write!(f, "/ws"), Wss => write!(f, "/wss"), Memory(port) => write!(f, "/memory/{}", port), + Onion3(addr) => { + let s = BASE32.encode(addr.hash()); + write!(f, "/onion3/{}:{}", s.to_lowercase(), addr.port()) + } } } } @@ -291,3 +327,53 @@ fn check_p2p(data: &[u8]) -> Result<(), Error> { } Ok(()) } + +macro_rules! read_onion_impl { + ($name:ident, $len:expr, $encoded_len:expr) => { + fn $name(s: &str) -> Result<([u8; $len], u16), Error> { + let mut parts = s.split(':'); + + // address part (without ".onion") + let b32 = parts.next().ok_or(Error::InvalidMultiaddr)?; + if b32.len() != $encoded_len { + return Err(Error::InvalidMultiaddr); + } + + // port number + let port = parts + .next() + .ok_or(Error::InvalidMultiaddr) + .and_then(|p| str::parse(p).map_err(From::from))?; + + // port == 0 is not valid for onion + if port == 0 { + return Err(Error::InvalidMultiaddr); + } + + // nothing else expected + if parts.next().is_some() { + return Err(Error::InvalidMultiaddr); + } + + if $len + != BASE32 + .decode_len(b32.len()) + .map_err(|_| Error::InvalidMultiaddr)? + { + return Err(Error::InvalidMultiaddr); + } + + let mut buf = [0u8; $len]; + BASE32 + .decode_mut(b32.as_bytes(), &mut buf) + .map_err(|_| Error::InvalidMultiaddr)?; + + Ok((buf, port)) + } + }; +} + +// Parse a version 3 onion address and return its binary representation. +// +// Format: ":" +read_onion_impl!(read_onion3, 35, 56); From cff8a06270e67207df68bf30a0997b3f6b87bd73 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Wed, 18 Dec 2024 15:04:59 +0800 Subject: [PATCH 05/14] Add `TransportType::Onion` Signed-off-by: Eval EXEC --- tentacle/src/runtime/tokio_runtime/mod.rs | 18 ++++++++- tentacle/src/transports/mod.rs | 30 ++++++++++++++- tentacle/src/transports/onion.rs | 46 +++++++++++++++++++++++ tentacle/src/utils.rs | 4 ++ tentacle/src/utils/dns.rs | 4 ++ 5 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 tentacle/src/transports/onion.rs diff --git a/tentacle/src/runtime/tokio_runtime/mod.rs b/tentacle/src/runtime/tokio_runtime/mod.rs index e74dc4f2..5f6f854c 100644 --- a/tentacle/src/runtime/tokio_runtime/mod.rs +++ b/tentacle/src/runtime/tokio_runtime/mod.rs @@ -1,4 +1,5 @@ pub(crate) mod socks5; +use multiaddr::MultiAddr; use socks5::Socks5Config; pub use tokio::{ net::{TcpListener, TcpStream}, @@ -12,7 +13,7 @@ use socket2::{Domain, Protocol as SocketProtocol, Socket, Type}; use std::os::unix::io::{FromRawFd, IntoRawFd}; #[cfg(windows)] use std::os::windows::io::{FromRawSocket, IntoRawSocket}; -use std::{io, net::SocketAddr}; +use std::{io, net::SocketAddr, str::FromStr}; use tokio::net::TcpSocket as TokioTcp; #[cfg(feature = "tokio-timer")] @@ -147,3 +148,18 @@ pub(crate) async fn connect( } } } + +pub(crate) async fn connect_tor_proxy( + onion_addr: MultiAddr, + tcp_config: TcpSocketConfig, +) -> io::Result { + let proxy_config = tcp_config.proxy_config.ok_or(std::io::Error::other( + "need tor proxy server to connect to onion address", + ))?; + let socks5_config: Socks5Config = super::socks5::parse(&proxy_config.proxy_url)?; + let address = shadowsocks::relay::Address::from_str(onion_addr.to_string().as_str()) + .map_err(|err| std::io::Error::other(err))?; + super::socks5::connect(address, socks5_config) + .await + .map_err(|err| io::Error::other(err)) +} diff --git a/tentacle/src/transports/mod.rs b/tentacle/src/transports/mod.rs index 9078273c..9b3ffe59 100644 --- a/tentacle/src/transports/mod.rs +++ b/tentacle/src/transports/mod.rs @@ -16,6 +16,8 @@ mod browser; #[cfg(not(target_family = "wasm"))] mod memory; #[cfg(not(target_family = "wasm"))] +mod onion; +#[cfg(not(target_family = "wasm"))] mod tcp; #[cfg(not(target_family = "wasm"))] mod tcp_base_listen; @@ -95,6 +97,8 @@ mod os { }; use futures::{prelude::Stream, FutureExt, StreamExt}; + use multiaddr::MultiAddr; + use onion::OnionTransport; use std::{ collections::HashMap, fmt, @@ -162,7 +166,7 @@ mod os { fn listen(self, address: Multiaddr) -> Result { match find_type(&address) { - TransportType::Tcp => { + TransportType::Tcp | TransportType::Onion => { match TcpTransport::from_multi_transport(self, TcpListenMode::Tcp) .listen(address) { @@ -216,6 +220,12 @@ mod os { Err(e) => Err(e), } } + TransportType::Onion => { + match OnionTransport::new(self.timeout, self.tcp_config.tcp).dial(address) { + Ok(res) => Ok(MultiDialFuture::Tcp(res)), + Err(e) => Err(e), + } + } #[cfg(feature = "ws")] TransportType::Ws => { match WsTransport::new(self.timeout, self.tcp_config.ws).dial(address) { @@ -436,6 +446,24 @@ mod os { Ok(res) => res.map_err(Into::into), } } + + /// onion common dial realization + #[inline(always)] + pub async fn onion_dial( + onion_addr: MultiAddr, + tcp_config: TcpSocketConfig, + timeout: Duration, + ) -> Result { + match crate::runtime::timeout( + timeout, + crate::runtime::connect_tor_proxy(onion_addr, tcp_config), + ) + .await + { + Err(_) => Err(TransportErrorKind::Io(io::ErrorKind::TimedOut.into())), + Ok(res) => res.map_err(Into::into), + } + } } #[cfg(target_family = "wasm")] diff --git a/tentacle/src/transports/onion.rs b/tentacle/src/transports/onion.rs new file mode 100644 index 00000000..13ad96f8 --- /dev/null +++ b/tentacle/src/transports/onion.rs @@ -0,0 +1,46 @@ +use crate::{ + multiaddr::Multiaddr, + runtime::TcpStream, + service::config::TcpSocketConfig, + transports::{onion_dial, Result, TransportDial, TransportFuture}, +}; +use futures::future::ok; +use std::{future::Future, pin::Pin, time::Duration}; + +/// Onion connect +async fn connect( + onion_address: impl Future>, + timeout: Duration, + tcp_config: TcpSocketConfig, +) -> Result<(Multiaddr, TcpStream)> { + let onion_addr = onion_address.await?; + let stream = onion_dial(onion_addr.clone(), tcp_config, timeout).await?; + Ok((onion_addr, stream)) +} + +/// Onion transport +pub struct OnionTransport { + timeout: Duration, + tcp_config: TcpSocketConfig, +} + +impl OnionTransport { + pub fn new(timeout: Duration, tcp_config: TcpSocketConfig) -> Self { + Self { + timeout, + tcp_config, + } + } +} + +pub type OnionDialFuture = + TransportFuture> + Send>>>; + +impl TransportDial for OnionTransport { + type DialFuture = OnionDialFuture; + + fn dial(self, address: Multiaddr) -> Result { + let dial = connect(ok(address), self.timeout, self.tcp_config); + Ok(TransportFuture::new(Box::pin(dial))) + } +} diff --git a/tentacle/src/utils.rs b/tentacle/src/utils.rs index a35894ae..1e83b035 100644 --- a/tentacle/src/utils.rs +++ b/tentacle/src/utils.rs @@ -127,6 +127,8 @@ pub enum TransportType { Tls, /// Memory Memory, + /// Onion + Onion, } /// Confirm the transport used by multiaddress @@ -142,6 +144,8 @@ pub fn find_type(addr: &Multiaddr) -> TransportType { Some(TransportType::Tls) } else if let Protocol::Memory(_) = proto { Some(TransportType::Memory) + } else if let Protocol::Onion3(_) = proto { + Some(TransportType::Onion) } else { None } diff --git a/tentacle/src/utils/dns.rs b/tentacle/src/utils/dns.rs index edf39b96..d38e689b 100644 --- a/tentacle/src/utils/dns.rs +++ b/tentacle/src/utils/dns.rs @@ -1,4 +1,5 @@ use futures::FutureExt; +use log::warn; use std::{ borrow::Cow, future::Future, @@ -86,6 +87,9 @@ impl DnsResolver { } TransportType::Ws => address.push(Protocol::Ws), TransportType::Wss => address.push(Protocol::Wss), + TransportType::Onion => { + warn!("Onion transport type should not have dns resolve") + } } if let Some(peer_id) = self.peer_id.take() { From e4e74c99836489d6c1cb30a2654bca6b6776bf14 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Wed, 18 Dec 2024 20:08:28 +0800 Subject: [PATCH 06/14] Extract socks5_config from tokio_runtime --- tentacle/src/runtime/mod.rs | 1 + tentacle/src/runtime/socks5_config.rs | 36 ++++++++++++++++++++ tentacle/src/runtime/tokio_runtime/mod.rs | 19 ++++++----- tentacle/src/runtime/tokio_runtime/socks5.rs | 35 +------------------ 4 files changed, 48 insertions(+), 43 deletions(-) create mode 100644 tentacle/src/runtime/socks5_config.rs diff --git a/tentacle/src/runtime/mod.rs b/tentacle/src/runtime/mod.rs index 8348ac30..52928afb 100644 --- a/tentacle/src/runtime/mod.rs +++ b/tentacle/src/runtime/mod.rs @@ -16,6 +16,7 @@ mod async_runtime; all(target_family = "wasm", feature = "wasm-timer") ))] mod generic_timer; +pub(crate) mod socks5_config; #[cfg(all(not(target_family = "wasm"), feature = "tokio-runtime"))] mod tokio_runtime; #[cfg(target_family = "wasm")] diff --git a/tentacle/src/runtime/socks5_config.rs b/tentacle/src/runtime/socks5_config.rs new file mode 100644 index 00000000..71b31416 --- /dev/null +++ b/tentacle/src/runtime/socks5_config.rs @@ -0,0 +1,36 @@ +use std::io; + +#[derive(Debug)] +pub(crate) struct Socks5Config { + pub(crate) proxy_url: String, + pub(crate) auth: Option<(String, String)>, +} + +// parse proxy url like "socks5://username:password@localhost:1080" to Socks5Config +pub(crate) fn parse(proxy_url: &str) -> io::Result { + let parsed_url = url::Url::parse(proxy_url).map_err(io::Error::other)?; + let scheme = parsed_url.scheme(); + match scheme { + "socks5" => { + let auth = match parsed_url.username() { + "" => None, + username => Some(( + username.to_string(), + parsed_url.password().unwrap_or("").to_string(), + )), + }; + let port = parsed_url.port().ok_or(io::Error::other("missing port"))?; + let proxy_url = String::new() + + parsed_url + .host_str() + .ok_or(io::Error::other("missing host"))? + + ":" + + &format!("{port}"); + Ok(Socks5Config { proxy_url, auth }) + } + _ => Err(io::Error::other(format!( + "tentacle doesn't support proxy scheme: {}", + scheme + ))), + } +} diff --git a/tentacle/src/runtime/tokio_runtime/mod.rs b/tentacle/src/runtime/tokio_runtime/mod.rs index 5f6f854c..fa6c3e70 100644 --- a/tentacle/src/runtime/tokio_runtime/mod.rs +++ b/tentacle/src/runtime/tokio_runtime/mod.rs @@ -1,6 +1,6 @@ -pub(crate) mod socks5; +use super::socks5_config; +mod socks5; use multiaddr::MultiAddr; -use socks5::Socks5Config; pub use tokio::{ net::{TcpListener, TcpStream}, spawn, @@ -122,10 +122,11 @@ pub(crate) async fn connect( ) -> io::Result { match tcp_config.proxy_config { Some(proxy_config) => { - let proxy_config: Socks5Config = super::socks5::parse(&proxy_config.proxy_url)?; - super::socks5::connect(addr, proxy_config) + let proxy_config: socks5_config::Socks5Config = + socks5_config::parse(&proxy_config.proxy_url)?; + super::tokio_runtime::socks5::connect(addr, proxy_config) .await - .map_err(|err| io::Error::other(err)) + .map_err(io::Error::other) } None => { let domain = Domain::for_address(addr); @@ -156,10 +157,10 @@ pub(crate) async fn connect_tor_proxy( let proxy_config = tcp_config.proxy_config.ok_or(std::io::Error::other( "need tor proxy server to connect to onion address", ))?; - let socks5_config: Socks5Config = super::socks5::parse(&proxy_config.proxy_url)?; + let socks5_config: socks5_config::Socks5Config = socks5_config::parse(&proxy_config.proxy_url)?; let address = shadowsocks::relay::Address::from_str(onion_addr.to_string().as_str()) - .map_err(|err| std::io::Error::other(err))?; - super::socks5::connect(address, socks5_config) + .map_err(std::io::Error::other)?; + socks5::connect(address, socks5_config) .await - .map_err(|err| io::Error::other(err)) + .map_err(io::Error::other) } diff --git a/tentacle/src/runtime/tokio_runtime/socks5.rs b/tentacle/src/runtime/tokio_runtime/socks5.rs index f73725ea..25f28846 100644 --- a/tentacle/src/runtime/tokio_runtime/socks5.rs +++ b/tentacle/src/runtime/tokio_runtime/socks5.rs @@ -7,40 +7,7 @@ use shadowsocks::relay::socks5::{ }; use tokio::net::TcpStream; -#[derive(Debug)] -pub(crate) struct Socks5Config { - pub(crate) proxy_url: String, - pub(crate) auth: Option<(String, String)>, -} - -// parse proxy url like "socks5://username:password@localhost:1080" to Socks5Config -pub(crate) fn parse(proxy_url: &str) -> io::Result { - let parsed_url = url::Url::parse(proxy_url).map_err(|err| io::Error::other(err))?; - let scheme = parsed_url.scheme(); - match scheme { - "socks5" => { - let auth = match parsed_url.username() { - "" => None, - username => Some(( - username.to_string(), - parsed_url.password().unwrap_or("").to_string(), - )), - }; - let port = parsed_url.port().ok_or(io::Error::other("missing port"))?; - let proxy_url = String::new() - + parsed_url - .host_str() - .ok_or(io::Error::other("missing host"))? - + ":" - + &format!("{port}"); - Ok(Socks5Config { proxy_url, auth }) - } - _ => Err(io::Error::other(format!( - "tentacle doesn't support proxy scheme: {}", - scheme - ))), - } -} +use super::super::socks5_config::Socks5Config; pub async fn connect(addr: A, socks5_config: Socks5Config) -> Result where From 3a47039e86764768a611961f44f08e64312abb36 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Wed, 18 Dec 2024 21:31:11 +0800 Subject: [PATCH 07/14] Allow tcp_config_transformer make affect to proxy connection --- tentacle/src/runtime/tokio_runtime/mod.rs | 99 +++++++++++++------- tentacle/src/runtime/tokio_runtime/socks5.rs | 12 ++- tentacle/src/service/config.rs | 6 +- tentacle/src/transports/mod.rs | 2 +- 4 files changed, 75 insertions(+), 44 deletions(-) diff --git a/tentacle/src/runtime/tokio_runtime/mod.rs b/tentacle/src/runtime/tokio_runtime/mod.rs index fa6c3e70..6cbce569 100644 --- a/tentacle/src/runtime/tokio_runtime/mod.rs +++ b/tentacle/src/runtime/tokio_runtime/mod.rs @@ -7,7 +7,10 @@ pub use tokio::{ task::{block_in_place, spawn_blocking, yield_now, JoinHandle}, }; -use crate::service::config::{TcpSocket, TcpSocketConfig}; +use crate::service::{ + config::{TcpSocket, TcpSocketConfig, TcpSocketTransformer}, + ProxyConfig, +}; use socket2::{Domain, Protocol as SocketProtocol, Socket, Type}; #[cfg(unix)] use std::os::unix::io::{FromRawFd, IntoRawFd}; @@ -116,51 +119,75 @@ pub(crate) fn listen(addr: SocketAddr, tcp_config: TcpSocketConfig) -> io::Resul socket.listen(1024) } -pub(crate) async fn connect( +async fn connect_direct( addr: SocketAddr, - tcp_config: TcpSocketConfig, + tcp_socket_transformer: TcpSocketTransformer, ) -> io::Result { - match tcp_config.proxy_config { - Some(proxy_config) => { - let proxy_config: socks5_config::Socks5Config = - socks5_config::parse(&proxy_config.proxy_url)?; - super::tokio_runtime::socks5::connect(addr, proxy_config) - .await - .map_err(io::Error::other) - } - None => { - let domain = Domain::for_address(addr); - let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; - - let socket = { - let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?; - t.inner.set_nonblocking(true)?; - // safety: fd convert by socket2 - unsafe { - #[cfg(unix)] - let socket = TokioTcp::from_raw_fd(t.into_raw_fd()); - #[cfg(windows)] - let socket = TokioTcp::from_raw_socket(t.into_raw_socket()); - socket - } - }; - - socket.connect(addr).await + let domain = Domain::for_address(addr); + let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; + + let socket = { + let t = tcp_socket_transformer(TcpSocket { inner: socket })?; + t.inner.set_nonblocking(true)?; + // safety: fd convert by socket2 + unsafe { + #[cfg(unix)] + let socket = TokioTcp::from_raw_fd(t.into_raw_fd()); + #[cfg(windows)] + let socket = TokioTcp::from_raw_socket(t.into_raw_socket()); + socket } + }; + + socket.connect(addr).await +} + +async fn connect_by_proxy( + target_addr: A, + tcp_socket_transformer: TcpSocketTransformer, + proxy_config: ProxyConfig, +) -> io::Result +where + A: Into, +{ + let socks5_config = socks5_config::parse(&proxy_config.proxy_url)?; + + let dial_addr: SocketAddr = socks5_config.proxy_url.parse().map_err(io::Error::other)?; + let stream = connect_direct(dial_addr, tcp_socket_transformer).await?; + + super::tokio_runtime::socks5::establish_connection(stream, target_addr, socks5_config) + .await + .map_err(io::Error::other) +} + +pub(crate) async fn connect( + target_addr: SocketAddr, + tcp_config: TcpSocketConfig, +) -> io::Result { + let TcpSocketConfig { + tcp_socket_config, + proxy_config, + } = tcp_config; + + match proxy_config { + Some(proxy_config) => connect_by_proxy(target_addr, tcp_socket_config, proxy_config).await, + None => connect_direct(target_addr, tcp_socket_config).await, } } -pub(crate) async fn connect_tor_proxy( +pub(crate) async fn connect_onion( onion_addr: MultiAddr, tcp_config: TcpSocketConfig, ) -> io::Result { - let proxy_config = tcp_config.proxy_config.ok_or(std::io::Error::other( + let TcpSocketConfig { + tcp_socket_config, + proxy_config, + } = tcp_config; + let proxy_config = proxy_config.ok_or(io::Error::other( "need tor proxy server to connect to onion address", ))?; - let socks5_config: socks5_config::Socks5Config = socks5_config::parse(&proxy_config.proxy_url)?; - let address = shadowsocks::relay::Address::from_str(onion_addr.to_string().as_str()) + let onion_address = shadowsocks::relay::Address::from_str(onion_addr.to_string().as_str()) .map_err(std::io::Error::other)?; - socks5::connect(address, socks5_config) - .await - .map_err(io::Error::other) + + connect_by_proxy(onion_address, tcp_socket_config, proxy_config).await } diff --git a/tentacle/src/runtime/tokio_runtime/socks5.rs b/tentacle/src/runtime/tokio_runtime/socks5.rs index 25f28846..d374609a 100644 --- a/tentacle/src/runtime/tokio_runtime/socks5.rs +++ b/tentacle/src/runtime/tokio_runtime/socks5.rs @@ -9,7 +9,11 @@ use tokio::net::TcpStream; use super::super::socks5_config::Socks5Config; -pub async fn connect(addr: A, socks5_config: Socks5Config) -> Result +pub async fn establish_connection( + mut s: TcpStream, + target_addr: A, + socks5_config: Socks5Config, +) -> Result where A: Into
, { @@ -19,9 +23,7 @@ where socks5_config.auth.is_some() ); // destruct socks5_config - let Socks5Config { auth, proxy_url } = socks5_config; - - let mut s = TcpStream::connect(proxy_url).await?; + let Socks5Config { auth, proxy_url: _ } = socks5_config; // 1. Handshake let hs = { @@ -62,7 +64,7 @@ where } // 2. Send request header - let h = TcpRequestHeader::new(Command::TcpConnect, addr.into()); + let h = TcpRequestHeader::new(Command::TcpConnect, target_addr.into()); debug!("going to connect, req: {:?}", h); h.write_to(&mut s).await?; diff --git a/tentacle/src/service/config.rs b/tentacle/src/service/config.rs index d009d980..f3ce52f9 100644 --- a/tentacle/src/service/config.rs +++ b/tentacle/src/service/config.rs @@ -93,10 +93,12 @@ pub struct ProxyConfig { pub proxy_url: String, } +pub(crate) type TcpSocketTransformer = + Arc Result + Send + Sync + 'static>; #[derive(Clone)] + pub(crate) struct TcpSocketConfig { - pub(crate) tcp_socket_config: - Arc Result + Send + Sync + 'static>, + pub(crate) tcp_socket_config: TcpSocketTransformer, pub(crate) proxy_config: Option, } diff --git a/tentacle/src/transports/mod.rs b/tentacle/src/transports/mod.rs index 9b3ffe59..52bf63e2 100644 --- a/tentacle/src/transports/mod.rs +++ b/tentacle/src/transports/mod.rs @@ -456,7 +456,7 @@ mod os { ) -> Result { match crate::runtime::timeout( timeout, - crate::runtime::connect_tor_proxy(onion_addr, tcp_config), + crate::runtime::connect_onion(onion_addr, tcp_config), ) .await { From bdc08ce28fcbddd8ec0b3fbe09da230c57b440ff Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Wed, 18 Dec 2024 21:37:37 +0800 Subject: [PATCH 08/14] Extract async_runtime's inner os and os::time crate Signed-off-by: Eval EXEC --- tentacle/src/runtime/async_runtime/mod.rs | 278 +-------------------- tentacle/src/runtime/async_runtime/os.rs | 209 ++++++++++++++++ tentacle/src/runtime/async_runtime/time.rs | 62 +++++ 3 files changed, 272 insertions(+), 277 deletions(-) create mode 100644 tentacle/src/runtime/async_runtime/os.rs create mode 100644 tentacle/src/runtime/async_runtime/time.rs diff --git a/tentacle/src/runtime/async_runtime/mod.rs b/tentacle/src/runtime/async_runtime/mod.rs index e7e1bb33..599b0e3c 100644 --- a/tentacle/src/runtime/async_runtime/mod.rs +++ b/tentacle/src/runtime/async_runtime/mod.rs @@ -12,280 +12,4 @@ where pub use os::*; #[cfg(not(target_family = "wasm"))] -mod os { - use crate::{ - runtime::CompatStream2, - service::config::{TcpSocket, TcpSocketConfig}, - }; - use async_io::Async; - use async_std::net::{TcpListener as AsyncListener, TcpStream as AsyncStream, ToSocketAddrs}; - use futures::{ - channel::{ - mpsc::{channel, Receiver}, - oneshot::{self, Sender}, - }, - future::select, - FutureExt, SinkExt, StreamExt, - }; - use std::{ - pin::Pin, - task::{Context, Poll}, - }; - use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; - - #[derive(Debug)] - pub struct TcpListener { - /// Why does this need to be handled here? - /// - /// https://www.driftluo.com/article/9e85ea7c-219a-4b25-ab32-e66c5d3027d0 - /// - /// Not only because of idempotent operation reasons, at the same time, - /// after the task is registered to the event monitor, the relationship between - /// the task and the corresponding waker needs to be ensured. If the task is dropped - /// immediately after registration, the waker cannot wake up the corresponding task. - /// - /// Since the async-std api was designed without leaving the corresponding poll interface, - /// this will force users to ensure that they are used in an async environment - recv: Receiver>, - local_addr: SocketAddr, - _close_sender: Sender<()>, - } - - impl TcpListener { - fn new(listener: AsyncListener, local_addr: SocketAddr) -> TcpListener { - let (mut tx, rx) = channel(24); - let (tx_c, rx_c) = oneshot::channel::<()>(); - let task = async move { - loop { - let res = listener.accept().await; - let _ignore = tx.send(res).await; - } - } - .boxed(); - crate::runtime::spawn(select(task, rx_c)); - TcpListener { - recv: rx, - local_addr, - _close_sender: tx_c, - } - } - - pub async fn bind(addrs: A) -> io::Result { - let listener = AsyncListener::bind(addrs).await?; - let local_addr = listener.local_addr()?; - Ok(Self::new(listener, local_addr)) - } - - pub fn local_addr(&self) -> io::Result { - Ok(self.local_addr) - } - - pub fn poll_accept( - &mut self, - cx: &mut Context, - ) -> Poll> { - match self.recv.poll_next_unpin(cx) { - Poll::Ready(Some(res)) => { - Poll::Ready(res.map(|x| (TcpStream(CompatStream2::new(x.0)), x.1))) - } - Poll::Pending => Poll::Pending, - Poll::Ready(None) => Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())), - } - } - } - - #[derive(Debug)] - pub struct TcpStream(CompatStream2); - - impl TcpStream { - pub fn peer_addr(&self) -> io::Result { - self.0.get_ref().peer_addr() - } - - pub async fn peek(&self, buf: &mut [u8]) -> io::Result { - self.0.get_ref().peek(buf).await - } - } - - impl AsyncRead for TcpStream { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - AsyncRead::poll_read(Pin::new(&mut self.0), cx, buf) - } - } - - impl AsyncWrite for TcpStream { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.0).poll_flush(cx) - } - - #[inline] - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.0).poll_shutdown(cx) - } - } - - #[cfg(feature = "async-timer")] - pub use async_std::future::timeout; - #[cfg(feature = "async-timer")] - pub use time::*; - - use socket2::{Domain, Protocol as SocketProtocol, Socket, Type}; - use std::{io, net::SocketAddr}; - - pub(crate) fn listen(addr: SocketAddr, tcp_config: TcpSocketConfig) -> io::Result { - let domain = Domain::for_address(addr); - let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; - - let socket = { - let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?; - t.inner - }; - // `bind` twice will return error - // - // code 22 means: - // EINVAL The socket is already bound to an address. - // ref: https://man7.org/linux/man-pages/man2/bind.2.html - if let Err(e) = socket.bind(&addr.into()) { - if Some(22) != e.raw_os_error() { - return Err(e); - } - } - socket.listen(1024)?; - - let listen = std::net::TcpListener::from(socket); - let addr = listen.local_addr()?; - Ok(TcpListener::new(AsyncListener::from(listen), addr)) - } - - pub(crate) async fn connect( - addr: SocketAddr, - tcp_config: TcpSocketConfig, - ) -> io::Result { - let domain = Domain::for_address(addr); - let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; - - let socket = { - // On platforms with Berkeley-derived sockets, this allows to quickly - // rebind a socket, without needing to wait for the OS to clean up the - // previous one. - // - // On Windows, this allows rebinding sockets which are actively in use, - // which allows “socket hijacking”, so we explicitly don't set it here. - // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse - // - // user can disable it on tcp_config - #[cfg(not(windows))] - socket.set_reuse_address(true)?; - let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?; - t.inner - }; - - // Begin async connect and ignore the inevitable "in progress" error. - socket.set_nonblocking(true)?; - socket.connect(&addr.into()).or_else(|err| { - // Check for EINPROGRESS on Unix and WSAEWOULDBLOCK on Windows. - #[cfg(unix)] - let in_progress = err.raw_os_error() == Some(libc::EINPROGRESS); - #[cfg(windows)] - let in_progress = err.kind() == io::ErrorKind::WouldBlock; - - // If connect results with an "in progress" error, that's not an error. - if in_progress { - Ok(()) - } else { - Err(err) - } - })?; - let stream = Async::new(std::net::TcpStream::from(socket))?; - - // The stream becomes writable when connected. - stream.writable().await?; - - // Check if there was an error while connecting. - match stream.get_ref().take_error()? { - None => { - let tcp = stream.into_inner().unwrap(); - Ok(TcpStream(CompatStream2::new(AsyncStream::from(tcp)))) - } - Some(err) => Err(err), - } - } - - #[cfg(feature = "async-timer")] - mod time { - use async_io::Timer; - use futures::{Future, Stream}; - use std::{ - pin::Pin, - task::{Context, Poll}, - time::{Duration, Instant}, - }; - - pub struct Delay(Timer); - - impl Delay { - pub fn new(duration: Duration) -> Self { - Delay(Timer::after(duration)) - } - } - - impl Future for Delay { - type Output = Instant; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.0).poll(cx) - } - } - - pub fn delay_for(duration: Duration) -> Delay { - Delay::new(duration) - } - - pub struct Interval { - delay: Delay, - period: Duration, - } - - impl Interval { - fn new(period: Duration) -> Self { - Self { - delay: Delay::new(period), - period, - } - } - } - - impl Stream for Interval { - type Item = (); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.delay).poll(cx) { - Poll::Ready(_) => { - let dur = self.period; - self.delay.0.set_after(dur); - Poll::Ready(Some(())) - } - Poll::Pending => Poll::Pending, - } - } - } - - pub fn interval(period: Duration) -> Interval { - assert!(period > Duration::new(0, 0), "`period` must be non-zero."); - - Interval::new(period) - } - } -} +mod os; diff --git a/tentacle/src/runtime/async_runtime/os.rs b/tentacle/src/runtime/async_runtime/os.rs new file mode 100644 index 00000000..3c3bbe58 --- /dev/null +++ b/tentacle/src/runtime/async_runtime/os.rs @@ -0,0 +1,209 @@ +use crate::{ + runtime::CompatStream2, + service::config::{TcpSocket, TcpSocketConfig}, +}; +use async_io::Async; +use async_std::net::{TcpListener as AsyncListener, TcpStream as AsyncStream, ToSocketAddrs}; +use futures::{ + channel::{ + mpsc::{channel, Receiver}, + oneshot::{self, Sender}, + }, + future::select, + FutureExt, SinkExt, StreamExt, +}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +#[cfg(feature = "async-timer")] +mod time; + +#[derive(Debug)] +pub struct TcpListener { + /// Why does this need to be handled here? + /// + /// https://www.driftluo.com/article/9e85ea7c-219a-4b25-ab32-e66c5d3027d0 + /// + /// Not only because of idempotent operation reasons, at the same time, + /// after the task is registered to the event monitor, the relationship between + /// the task and the corresponding waker needs to be ensured. If the task is dropped + /// immediately after registration, the waker cannot wake up the corresponding task. + /// + /// Since the async-std api was designed without leaving the corresponding poll interface, + /// this will force users to ensure that they are used in an async environment + recv: Receiver>, + local_addr: SocketAddr, + _close_sender: Sender<()>, +} + +impl TcpListener { + fn new(listener: AsyncListener, local_addr: SocketAddr) -> TcpListener { + let (mut tx, rx) = channel(24); + let (tx_c, rx_c) = oneshot::channel::<()>(); + let task = async move { + loop { + let res = listener.accept().await; + let _ignore = tx.send(res).await; + } + } + .boxed(); + crate::runtime::spawn(select(task, rx_c)); + TcpListener { + recv: rx, + local_addr, + _close_sender: tx_c, + } + } + + pub async fn bind(addrs: A) -> io::Result { + let listener = AsyncListener::bind(addrs).await?; + let local_addr = listener.local_addr()?; + Ok(Self::new(listener, local_addr)) + } + + pub fn local_addr(&self) -> io::Result { + Ok(self.local_addr) + } + + pub fn poll_accept(&mut self, cx: &mut Context) -> Poll> { + match self.recv.poll_next_unpin(cx) { + Poll::Ready(Some(res)) => { + Poll::Ready(res.map(|x| (TcpStream(CompatStream2::new(x.0)), x.1))) + } + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())), + } + } +} + +#[derive(Debug)] +pub struct TcpStream(CompatStream2); + +impl TcpStream { + pub fn peer_addr(&self) -> io::Result { + self.0.get_ref().peer_addr() + } + + pub async fn peek(&self, buf: &mut [u8]) -> io::Result { + self.0.get_ref().peek(buf).await + } +} + +impl AsyncRead for TcpStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + AsyncRead::poll_read(Pin::new(&mut self.0), cx, buf) + } +} + +impl AsyncWrite for TcpStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + + #[inline] + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.0).poll_shutdown(cx) + } +} + +#[cfg(feature = "async-timer")] +pub use async_std::future::timeout; +#[cfg(feature = "async-timer")] +pub use time::*; + +use socket2::{Domain, Protocol as SocketProtocol, Socket, Type}; +use std::{io, net::SocketAddr}; + +pub(crate) fn listen(addr: SocketAddr, tcp_config: TcpSocketConfig) -> io::Result { + let domain = Domain::for_address(addr); + let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; + + let socket = { + let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?; + t.inner + }; + // `bind` twice will return error + // + // code 22 means: + // EINVAL The socket is already bound to an address. + // ref: https://man7.org/linux/man-pages/man2/bind.2.html + if let Err(e) = socket.bind(&addr.into()) { + if Some(22) != e.raw_os_error() { + return Err(e); + } + } + socket.listen(1024)?; + + let listen = std::net::TcpListener::from(socket); + let addr = listen.local_addr()?; + Ok(TcpListener::new(AsyncListener::from(listen), addr)) +} + +pub(crate) async fn connect( + addr: SocketAddr, + tcp_config: TcpSocketConfig, +) -> io::Result { + let domain = Domain::for_address(addr); + let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; + + let socket = { + // On platforms with Berkeley-derived sockets, this allows to quickly + // rebind a socket, without needing to wait for the OS to clean up the + // previous one. + // + // On Windows, this allows rebinding sockets which are actively in use, + // which allows “socket hijacking”, so we explicitly don't set it here. + // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse + // + // user can disable it on tcp_config + #[cfg(not(windows))] + socket.set_reuse_address(true)?; + let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?; + t.inner + }; + + // Begin async connect and ignore the inevitable "in progress" error. + socket.set_nonblocking(true)?; + socket.connect(&addr.into()).or_else(|err| { + // Check for EINPROGRESS on Unix and WSAEWOULDBLOCK on Windows. + #[cfg(unix)] + let in_progress = err.raw_os_error() == Some(libc::EINPROGRESS); + #[cfg(windows)] + let in_progress = err.kind() == io::ErrorKind::WouldBlock; + + // If connect results with an "in progress" error, that's not an error. + if in_progress { + Ok(()) + } else { + Err(err) + } + })?; + let stream = Async::new(std::net::TcpStream::from(socket))?; + + // The stream becomes writable when connected. + stream.writable().await?; + + // Check if there was an error while connecting. + match stream.get_ref().take_error()? { + None => { + let tcp = stream.into_inner().unwrap(); + Ok(TcpStream(CompatStream2::new(AsyncStream::from(tcp)))) + } + Some(err) => Err(err), + } +} diff --git a/tentacle/src/runtime/async_runtime/time.rs b/tentacle/src/runtime/async_runtime/time.rs new file mode 100644 index 00000000..3fcbd3b8 --- /dev/null +++ b/tentacle/src/runtime/async_runtime/time.rs @@ -0,0 +1,62 @@ +use async_io::Timer; +use futures::{Future, Stream}; +use std::{ + pin::Pin, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +pub struct Delay(Timer); + +impl Delay { + pub fn new(duration: Duration) -> Self { + Delay(Timer::after(duration)) + } +} + +impl Future for Delay { + type Output = Instant; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.0).poll(cx) + } +} + +pub fn delay_for(duration: Duration) -> Delay { + Delay::new(duration) +} + +pub struct Interval { + delay: Delay, + period: Duration, +} + +impl Interval { + fn new(period: Duration) -> Self { + Self { + delay: Delay::new(period), + period, + } + } +} + +impl Stream for Interval { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.delay).poll(cx) { + Poll::Ready(_) => { + let dur = self.period; + self.delay.0.set_after(dur); + Poll::Ready(Some(())) + } + Poll::Pending => Poll::Pending, + } + } +} + +pub fn interval(period: Duration) -> Interval { + assert!(period > Duration::new(0, 0), "`period` must be non-zero."); + + Interval::new(period) +} From 3b72031b6d894a86269f50ce12658d78eca0e084 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Wed, 18 Dec 2024 21:47:27 +0800 Subject: [PATCH 09/14] Implement connect_by_proxy and connect_onion for async_runtime Signed-off-by: Eval EXEC --- tentacle/src/runtime/async_runtime/os.rs | 33 ++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/tentacle/src/runtime/async_runtime/os.rs b/tentacle/src/runtime/async_runtime/os.rs index 3c3bbe58..c58a6aed 100644 --- a/tentacle/src/runtime/async_runtime/os.rs +++ b/tentacle/src/runtime/async_runtime/os.rs @@ -1,6 +1,9 @@ use crate::{ - runtime::CompatStream2, - service::config::{TcpSocket, TcpSocketConfig}, + runtime::{proxy::socks5_config, CompatStream2}, + service::{ + config::{TcpSocket, TcpSocketConfig, TcpSocketTransformer}, + ProxyConfig, + }, }; use async_io::Async; use async_std::net::{TcpListener as AsyncListener, TcpStream as AsyncStream, ToSocketAddrs}; @@ -12,6 +15,7 @@ use futures::{ future::select, FutureExt, SinkExt, StreamExt, }; +use multiaddr::MultiAddr; use std::{ pin::Pin, task::{Context, Poll}, @@ -207,3 +211,28 @@ pub(crate) async fn connect( Some(err) => Err(err), } } + +async fn connect_by_proxy( + target_addr: A, + tcp_socket_transformer: TcpSocketTransformer, + proxy_config: ProxyConfig, +) -> io::Result +where + A: Into, +{ + let socks5_config = socks5_config::parse(&proxy_config.proxy_url)?; + + let dial_addr: SocketAddr = socks5_config.proxy_url.parse().map_err(io::Error::other)?; + let stream = connect_direct(dial_addr, tcp_socket_transformer).await?; + + crate::runtime::proxy::socks5::establish_connection(stream, target_addr, socks5_config) + .await + .map_err(io::Error::other) +} + +pub(crate) async fn connect_onion( + onion_addr: MultiAddr, + tcp_config: TcpSocketConfig, +) -> io::Result { + todo!() +} From e13e69732b7b4d28ea2369919a197f7ccbb2f109 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Wed, 18 Dec 2024 22:01:04 +0800 Subject: [PATCH 10/14] Let async_runtime and tokio_runtime reuse socks5 establish Signed-off-by: Eval EXEC --- tentacle/Cargo.toml | 3 +- .../async_runtime/{os.rs => os/mod.rs} | 51 ++++++++++++++++--- .../runtime/async_runtime/{ => os}/time.rs | 0 tentacle/src/runtime/mod.rs | 2 +- tentacle/src/runtime/proxy/mod.rs | 3 ++ .../{tokio_runtime => proxy}/socks5.rs | 11 ++-- .../src/runtime/{ => proxy}/socks5_config.rs | 0 tentacle/src/runtime/tokio_runtime/mod.rs | 22 ++++++-- 8 files changed, 72 insertions(+), 20 deletions(-) rename tentacle/src/runtime/async_runtime/{os.rs => os/mod.rs} (83%) rename tentacle/src/runtime/async_runtime/{ => os}/time.rs (100%) create mode 100644 tentacle/src/runtime/proxy/mod.rs rename tentacle/src/runtime/{tokio_runtime => proxy}/socks5.rs (91%) rename tentacle/src/runtime/{ => proxy}/socks5_config.rs (100%) diff --git a/tentacle/Cargo.toml b/tentacle/Cargo.toml index 9056d377..574e2357 100644 --- a/tentacle/Cargo.toml +++ b/tentacle/Cargo.toml @@ -44,14 +44,13 @@ igd = { version = "0.15", optional = true, package = "igd-next" } #tls tokio-rustls = { version = "0.26.0", optional = true } -shadowsocks-service = { version = "1.21.2", features = ["local"]} -shadowsocks = "1.21.0" url = "2.5.4" [target.'cfg(not(target_family = "wasm"))'.dependencies] # rand 0.8 not support wasm32 rand = "0.8" socket2 = { version = "0.5.0", features = ["all"] } +shadowsocks = { version = "1.21.0", default-features = false } [target.'cfg(target_family = "wasm")'.dependencies] js-sys = "0.3" diff --git a/tentacle/src/runtime/async_runtime/os.rs b/tentacle/src/runtime/async_runtime/os/mod.rs similarity index 83% rename from tentacle/src/runtime/async_runtime/os.rs rename to tentacle/src/runtime/async_runtime/os/mod.rs index c58a6aed..9ee26bf1 100644 --- a/tentacle/src/runtime/async_runtime/os.rs +++ b/tentacle/src/runtime/async_runtime/os/mod.rs @@ -1,3 +1,5 @@ +mod time; + use crate::{ runtime::{proxy::socks5_config, CompatStream2}, service::{ @@ -18,13 +20,11 @@ use futures::{ use multiaddr::MultiAddr; use std::{ pin::Pin, + str::FromStr, task::{Context, Poll}, }; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -#[cfg(feature = "async-timer")] -mod time; - #[derive(Debug)] pub struct TcpListener { /// Why does this need to be handled here? @@ -158,9 +158,9 @@ pub(crate) fn listen(addr: SocketAddr, tcp_config: TcpSocketConfig) -> io::Resul Ok(TcpListener::new(AsyncListener::from(listen), addr)) } -pub(crate) async fn connect( +async fn connect_direct( addr: SocketAddr, - tcp_config: TcpSocketConfig, + tcp_socket_transformer: TcpSocketTransformer, ) -> io::Result { let domain = Domain::for_address(addr); let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; @@ -177,7 +177,7 @@ pub(crate) async fn connect( // user can disable it on tcp_config #[cfg(not(windows))] socket.set_reuse_address(true)?; - let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?; + let t = tcp_socket_transformer(TcpSocket { inner: socket })?; t.inner }; @@ -230,9 +230,46 @@ where .map_err(io::Error::other) } +pub(crate) async fn connect( + addr: SocketAddr, + tcp_config: TcpSocketConfig, +) -> io::Result { + let TcpSocketConfig { + tcp_socket_config, + proxy_config, + } = tcp_config; + match proxy_config { + Some(proxy_config) => connect_by_proxy(addr, tcp_socket_config, proxy_config).await, + None => connect_direct(addr, tcp_socket_config).await, + } +} + pub(crate) async fn connect_onion( onion_addr: MultiAddr, tcp_config: TcpSocketConfig, ) -> io::Result { - todo!() + let TcpSocketConfig { + tcp_socket_config, + proxy_config, + } = tcp_config; + let proxy_config = proxy_config.ok_or(io::Error::other( + "need tor proxy server to connect to onion address", + ))?; + + let onion_protocol = onion_addr.iter().next().ok_or(io::Error::other( + "connect_onion need Protocol::Onion3 multiaddr", + ))?; + // onion_str looks like: "/onion3/wsglappcvp4y4e2ff3ubowpkoxuoaudzvmih6gc54442vfabebwf42ad:8114" + let onion_str = onion_protocol.to_string(); + // remove prefix "/onion3/", if not contains /onion3/, return error + let onion_str = onion_str + .strip_prefix("/onion3/") + .ok_or(io::Error::other(format!( + "connect_onion need Protocol::Onion3 multiaddr, but got {}", + onion_str + )))?; + let onion_address = + shadowsocks::relay::Address::from_str(onion_str).map_err(std::io::Error::other)?; + + connect_by_proxy(onion_address, tcp_socket_config, proxy_config).await } diff --git a/tentacle/src/runtime/async_runtime/time.rs b/tentacle/src/runtime/async_runtime/os/time.rs similarity index 100% rename from tentacle/src/runtime/async_runtime/time.rs rename to tentacle/src/runtime/async_runtime/os/time.rs diff --git a/tentacle/src/runtime/mod.rs b/tentacle/src/runtime/mod.rs index 52928afb..478a04d9 100644 --- a/tentacle/src/runtime/mod.rs +++ b/tentacle/src/runtime/mod.rs @@ -16,7 +16,7 @@ mod async_runtime; all(target_family = "wasm", feature = "wasm-timer") ))] mod generic_timer; -pub(crate) mod socks5_config; +pub(crate) mod proxy; #[cfg(all(not(target_family = "wasm"), feature = "tokio-runtime"))] mod tokio_runtime; #[cfg(target_family = "wasm")] diff --git a/tentacle/src/runtime/proxy/mod.rs b/tentacle/src/runtime/proxy/mod.rs new file mode 100644 index 00000000..ab299fe4 --- /dev/null +++ b/tentacle/src/runtime/proxy/mod.rs @@ -0,0 +1,3 @@ +#[cfg(not(target_family = "wasm"))] +pub(crate) mod socks5; +pub(crate) mod socks5_config; diff --git a/tentacle/src/runtime/tokio_runtime/socks5.rs b/tentacle/src/runtime/proxy/socks5.rs similarity index 91% rename from tentacle/src/runtime/tokio_runtime/socks5.rs rename to tentacle/src/runtime/proxy/socks5.rs index d374609a..6ec4e381 100644 --- a/tentacle/src/runtime/tokio_runtime/socks5.rs +++ b/tentacle/src/runtime/proxy/socks5.rs @@ -5,16 +5,17 @@ use shadowsocks::relay::socks5::{ self, Address, Command, Error as Socks5Error, HandshakeRequest, HandshakeResponse, PasswdAuthRequest, PasswdAuthResponse, Reply, TcpRequestHeader, TcpResponseHeader, }; -use tokio::net::TcpStream; +use tokio::io::{AsyncRead, AsyncWrite}; -use super::super::socks5_config::Socks5Config; +use super::socks5_config::Socks5Config; -pub async fn establish_connection( - mut s: TcpStream, +pub async fn establish_connection( + mut s: S, target_addr: A, socks5_config: Socks5Config, -) -> Result +) -> Result where + S: AsyncRead + AsyncWrite + Unpin, A: Into
, { debug!( diff --git a/tentacle/src/runtime/socks5_config.rs b/tentacle/src/runtime/proxy/socks5_config.rs similarity index 100% rename from tentacle/src/runtime/socks5_config.rs rename to tentacle/src/runtime/proxy/socks5_config.rs diff --git a/tentacle/src/runtime/tokio_runtime/mod.rs b/tentacle/src/runtime/tokio_runtime/mod.rs index 6cbce569..d30dd773 100644 --- a/tentacle/src/runtime/tokio_runtime/mod.rs +++ b/tentacle/src/runtime/tokio_runtime/mod.rs @@ -1,5 +1,4 @@ -use super::socks5_config; -mod socks5; +use super::proxy::socks5_config; use multiaddr::MultiAddr; pub use tokio::{ net::{TcpListener, TcpStream}, @@ -155,7 +154,7 @@ where let dial_addr: SocketAddr = socks5_config.proxy_url.parse().map_err(io::Error::other)?; let stream = connect_direct(dial_addr, tcp_socket_transformer).await?; - super::tokio_runtime::socks5::establish_connection(stream, target_addr, socks5_config) + super::proxy::socks5::establish_connection(stream, target_addr, socks5_config) .await .map_err(io::Error::other) } @@ -186,8 +185,21 @@ pub(crate) async fn connect_onion( let proxy_config = proxy_config.ok_or(io::Error::other( "need tor proxy server to connect to onion address", ))?; - let onion_address = shadowsocks::relay::Address::from_str(onion_addr.to_string().as_str()) - .map_err(std::io::Error::other)?; + let onion_protocol = onion_addr.iter().next().ok_or(io::Error::other( + "connect_onion need Protocol::Onion3 multiaddr", + ))?; + // onion_str looks like: "/onion3/wsglappcvp4y4e2ff3ubowpkoxuoaudzvmih6gc54442vfabebwf42ad:8114" + let onion_str = onion_protocol.to_string(); + // remove prefix "/onion3/", if not contains /onion3/, return error + let onion_str = onion_str + .strip_prefix("/onion3/") + .ok_or(io::Error::other(format!( + "connect_onion need Protocol::Onion3 multiaddr, but got {}", + onion_str + )))?; + let onion_str = onion_str.replace(":", ".onion:"); + let onion_address = + shadowsocks::relay::Address::from_str(&onion_str).map_err(std::io::Error::other)?; connect_by_proxy(onion_address, tcp_socket_config, proxy_config).await } From 20df0ad858b843a25923f01e274511bf7c1bfa58 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Thu, 19 Dec 2024 09:45:10 +0800 Subject: [PATCH 11/14] Rename tcp_socket_config to socket_transformer since it accept a TcpSocket and return a Result Signed-off-by: Eval EXEC --- tentacle/src/builder.rs | 6 +++--- tentacle/src/runtime/async_runtime/os/mod.rs | 20 ++++++++++---------- tentacle/src/runtime/tokio_runtime/mod.rs | 20 ++++++++++---------- tentacle/src/service/config.rs | 6 +++--- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/tentacle/src/builder.rs b/tentacle/src/builder.rs index 7b7b4112..5ca626f0 100644 --- a/tentacle/src/builder.rs +++ b/tentacle/src/builder.rs @@ -220,7 +220,7 @@ where where F: Fn(TcpSocket) -> Result + Send + Sync + 'static, { - self.config.tcp_config.tcp.tcp_socket_config = Arc::new(f); + self.config.tcp_config.tcp.socket_transformer = Arc::new(f); self } @@ -238,7 +238,7 @@ where where F: Fn(TcpSocket) -> Result + Send + Sync + 'static, { - self.config.tcp_config.ws.tcp_socket_config = Arc::new(f); + self.config.tcp_config.ws.socket_transformer = Arc::new(f); self } @@ -262,7 +262,7 @@ where where F: Fn(TcpSocket) -> Result + Send + Sync + 'static, { - self.config.tcp_config.tls.tcp_socket_config = Arc::new(f); + self.config.tcp_config.tls.socket_transformer = Arc::new(f); self } } diff --git a/tentacle/src/runtime/async_runtime/os/mod.rs b/tentacle/src/runtime/async_runtime/os/mod.rs index 9ee26bf1..b2ea1a5b 100644 --- a/tentacle/src/runtime/async_runtime/os/mod.rs +++ b/tentacle/src/runtime/async_runtime/os/mod.rs @@ -138,7 +138,7 @@ pub(crate) fn listen(addr: SocketAddr, tcp_config: TcpSocketConfig) -> io::Resul let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; let socket = { - let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?; + let t = (tcp_config.socket_transformer)(TcpSocket { inner: socket })?; t.inner }; // `bind` twice will return error @@ -160,7 +160,7 @@ pub(crate) fn listen(addr: SocketAddr, tcp_config: TcpSocketConfig) -> io::Resul async fn connect_direct( addr: SocketAddr, - tcp_socket_transformer: TcpSocketTransformer, + socket_transformer: TcpSocketTransformer, ) -> io::Result { let domain = Domain::for_address(addr); let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; @@ -177,7 +177,7 @@ async fn connect_direct( // user can disable it on tcp_config #[cfg(not(windows))] socket.set_reuse_address(true)?; - let t = tcp_socket_transformer(TcpSocket { inner: socket })?; + let t = socket_transformer(TcpSocket { inner: socket })?; t.inner }; @@ -214,7 +214,7 @@ async fn connect_direct( async fn connect_by_proxy( target_addr: A, - tcp_socket_transformer: TcpSocketTransformer, + socket_transformer: TcpSocketTransformer, proxy_config: ProxyConfig, ) -> io::Result where @@ -223,7 +223,7 @@ where let socks5_config = socks5_config::parse(&proxy_config.proxy_url)?; let dial_addr: SocketAddr = socks5_config.proxy_url.parse().map_err(io::Error::other)?; - let stream = connect_direct(dial_addr, tcp_socket_transformer).await?; + let stream = connect_direct(dial_addr, socket_transformer).await?; crate::runtime::proxy::socks5::establish_connection(stream, target_addr, socks5_config) .await @@ -235,12 +235,12 @@ pub(crate) async fn connect( tcp_config: TcpSocketConfig, ) -> io::Result { let TcpSocketConfig { - tcp_socket_config, + socket_transformer, proxy_config, } = tcp_config; match proxy_config { - Some(proxy_config) => connect_by_proxy(addr, tcp_socket_config, proxy_config).await, - None => connect_direct(addr, tcp_socket_config).await, + Some(proxy_config) => connect_by_proxy(addr, socket_transformer, proxy_config).await, + None => connect_direct(addr, socket_transformer).await, } } @@ -249,7 +249,7 @@ pub(crate) async fn connect_onion( tcp_config: TcpSocketConfig, ) -> io::Result { let TcpSocketConfig { - tcp_socket_config, + socket_transformer, proxy_config, } = tcp_config; let proxy_config = proxy_config.ok_or(io::Error::other( @@ -271,5 +271,5 @@ pub(crate) async fn connect_onion( let onion_address = shadowsocks::relay::Address::from_str(onion_str).map_err(std::io::Error::other)?; - connect_by_proxy(onion_address, tcp_socket_config, proxy_config).await + connect_by_proxy(onion_address, socket_transformer, proxy_config).await } diff --git a/tentacle/src/runtime/tokio_runtime/mod.rs b/tentacle/src/runtime/tokio_runtime/mod.rs index d30dd773..d54d61c7 100644 --- a/tentacle/src/runtime/tokio_runtime/mod.rs +++ b/tentacle/src/runtime/tokio_runtime/mod.rs @@ -93,7 +93,7 @@ pub(crate) fn listen(addr: SocketAddr, tcp_config: TcpSocketConfig) -> io::Resul // user can disable it on tcp_config #[cfg(not(windows))] socket.set_reuse_address(true)?; - let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?; + let t = (tcp_config.socket_transformer)(TcpSocket { inner: socket })?; t.inner.set_nonblocking(true)?; // safety: fd convert by socket2 unsafe { @@ -120,13 +120,13 @@ pub(crate) fn listen(addr: SocketAddr, tcp_config: TcpSocketConfig) -> io::Resul async fn connect_direct( addr: SocketAddr, - tcp_socket_transformer: TcpSocketTransformer, + socket_transformer: TcpSocketTransformer, ) -> io::Result { let domain = Domain::for_address(addr); let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; let socket = { - let t = tcp_socket_transformer(TcpSocket { inner: socket })?; + let t = socket_transformer(TcpSocket { inner: socket })?; t.inner.set_nonblocking(true)?; // safety: fd convert by socket2 unsafe { @@ -143,7 +143,7 @@ async fn connect_direct( async fn connect_by_proxy( target_addr: A, - tcp_socket_transformer: TcpSocketTransformer, + socket_transformer: TcpSocketTransformer, proxy_config: ProxyConfig, ) -> io::Result where @@ -152,7 +152,7 @@ where let socks5_config = socks5_config::parse(&proxy_config.proxy_url)?; let dial_addr: SocketAddr = socks5_config.proxy_url.parse().map_err(io::Error::other)?; - let stream = connect_direct(dial_addr, tcp_socket_transformer).await?; + let stream = connect_direct(dial_addr, socket_transformer).await?; super::proxy::socks5::establish_connection(stream, target_addr, socks5_config) .await @@ -164,13 +164,13 @@ pub(crate) async fn connect( tcp_config: TcpSocketConfig, ) -> io::Result { let TcpSocketConfig { - tcp_socket_config, + socket_transformer, proxy_config, } = tcp_config; match proxy_config { - Some(proxy_config) => connect_by_proxy(target_addr, tcp_socket_config, proxy_config).await, - None => connect_direct(target_addr, tcp_socket_config).await, + Some(proxy_config) => connect_by_proxy(target_addr, socket_transformer, proxy_config).await, + None => connect_direct(target_addr, socket_transformer).await, } } @@ -179,7 +179,7 @@ pub(crate) async fn connect_onion( tcp_config: TcpSocketConfig, ) -> io::Result { let TcpSocketConfig { - tcp_socket_config, + socket_transformer, proxy_config, } = tcp_config; let proxy_config = proxy_config.ok_or(io::Error::other( @@ -201,5 +201,5 @@ pub(crate) async fn connect_onion( let onion_address = shadowsocks::relay::Address::from_str(&onion_str).map_err(std::io::Error::other)?; - connect_by_proxy(onion_address, tcp_socket_config, proxy_config).await + connect_by_proxy(onion_address, socket_transformer, proxy_config).await } diff --git a/tentacle/src/service/config.rs b/tentacle/src/service/config.rs index f3ce52f9..df6abe0d 100644 --- a/tentacle/src/service/config.rs +++ b/tentacle/src/service/config.rs @@ -95,17 +95,17 @@ pub struct ProxyConfig { pub(crate) type TcpSocketTransformer = Arc Result + Send + Sync + 'static>; -#[derive(Clone)] +#[derive(Clone)] pub(crate) struct TcpSocketConfig { - pub(crate) tcp_socket_config: TcpSocketTransformer, + pub(crate) socket_transformer: TcpSocketTransformer, pub(crate) proxy_config: Option, } impl Default for TcpSocketConfig { fn default() -> Self { Self { - tcp_socket_config: Arc::new(Ok), + socket_transformer: Arc::new(Ok), proxy_config: None, } } From 1eb0f491e74b9bff6e166e6fa890b65bc6bb1abf Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Mon, 23 Dec 2024 16:02:36 +0800 Subject: [PATCH 12/14] Add `TransformerContext` to `TcpSocketTransformer` --- tentacle/src/service/config.rs | 38 ++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/tentacle/src/service/config.rs b/tentacle/src/service/config.rs index df6abe0d..58280f8b 100644 --- a/tentacle/src/service/config.rs +++ b/tentacle/src/service/config.rs @@ -13,7 +13,7 @@ use std::os::{ fd::AsFd, unix::io::{AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}, }; -use std::{sync::Arc, time::Duration}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; #[cfg(feature = "tls")] use tokio_rustls::rustls::{ClientConfig, ServerConfig}; @@ -93,8 +93,38 @@ pub struct ProxyConfig { pub proxy_url: String, } -pub(crate) type TcpSocketTransformer = - Arc Result + Send + Sync + 'static>; +pub enum SocketState { + Listen, + Dial, +} + +pub struct TransformerContext { + pub state: SocketState, + // if dial, remote address; if listen, local address + pub address: SocketAddr, +} + +impl TransformerContext { + pub fn new_listen(address: SocketAddr) -> Self { + TransformerContext { + state: SocketState::Listen, + address, + } + } + pub fn new_dial(address: SocketAddr) -> Self { + TransformerContext { + state: SocketState::Dial, + address, + } + } +} + +pub(crate) type TcpSocketTransformer = Arc< + dyn Fn(TcpSocket, TransformerContext) -> Result + + Send + + Sync + + 'static, +>; #[derive(Clone)] pub(crate) struct TcpSocketConfig { @@ -105,7 +135,7 @@ pub(crate) struct TcpSocketConfig { impl Default for TcpSocketConfig { fn default() -> Self { Self { - socket_transformer: Arc::new(Ok), + socket_transformer: Arc::new(|tcp_socket, _| Ok(tcp_socket)), proxy_config: None, } } From f620bfe595ba7f8c839d321ef6985b1948a8ec3f Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Mon, 23 Dec 2024 16:03:21 +0800 Subject: [PATCH 13/14] Create TransformerContext before executing socket_transformer Signed-off-by: Eval EXEC --- tentacle/src/builder.rs | 10 +++++++--- tentacle/src/runtime/async_runtime/os/mod.rs | 8 +++++--- tentacle/src/runtime/tokio_runtime/mod.rs | 8 +++++--- tentacle/src/service.rs | 3 ++- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/tentacle/src/builder.rs b/tentacle/src/builder.rs index 5ca626f0..391312ea 100644 --- a/tentacle/src/builder.rs +++ b/tentacle/src/builder.rs @@ -1,6 +1,7 @@ use std::{io, sync::Arc, time::Duration}; use crate::service::config::ProxyConfig; +use crate::service::config::TransformerContext; use nohash_hasher::IntMap; use tokio_util::codec::LengthDelimitedCodec; @@ -218,7 +219,10 @@ where #[cfg(not(target_family = "wasm"))] pub fn tcp_config(mut self, f: F) -> Self where - F: Fn(TcpSocket) -> Result + Send + Sync + 'static, + F: Fn(TcpSocket, TransformerContext) -> Result + + Send + + Sync + + 'static, { self.config.tcp_config.tcp.socket_transformer = Arc::new(f); self @@ -236,7 +240,7 @@ where #[cfg_attr(docsrs, doc(cfg(feature = "ws")))] pub fn tcp_config_on_ws(mut self, f: F) -> Self where - F: Fn(TcpSocket) -> Result + Send + Sync + 'static, + F: Fn(TcpSocket, TransformerContext) -> Result + Send + Sync + 'static, { self.config.tcp_config.ws.socket_transformer = Arc::new(f); self @@ -260,7 +264,7 @@ where #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] pub fn tcp_config_on_tls(mut self, f: F) -> Self where - F: Fn(TcpSocket) -> Result + Send + Sync + 'static, + F: Fn(TcpSocket, TransformerContext) -> Result + Send + Sync + 'static, { self.config.tcp_config.tls.socket_transformer = Arc::new(f); self diff --git a/tentacle/src/runtime/async_runtime/os/mod.rs b/tentacle/src/runtime/async_runtime/os/mod.rs index b2ea1a5b..c9b2037a 100644 --- a/tentacle/src/runtime/async_runtime/os/mod.rs +++ b/tentacle/src/runtime/async_runtime/os/mod.rs @@ -3,7 +3,7 @@ mod time; use crate::{ runtime::{proxy::socks5_config, CompatStream2}, service::{ - config::{TcpSocket, TcpSocketConfig, TcpSocketTransformer}, + config::{TcpSocket, TcpSocketConfig, TcpSocketTransformer, TransformerContext}, ProxyConfig, }, }; @@ -138,7 +138,8 @@ pub(crate) fn listen(addr: SocketAddr, tcp_config: TcpSocketConfig) -> io::Resul let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; let socket = { - let t = (tcp_config.socket_transformer)(TcpSocket { inner: socket })?; + let transformer_context = TransformerContext::new_listen(addr); + let t = (tcp_config.socket_transformer)(TcpSocket { inner: socket }, transformer_context)?; t.inner }; // `bind` twice will return error @@ -177,7 +178,8 @@ async fn connect_direct( // user can disable it on tcp_config #[cfg(not(windows))] socket.set_reuse_address(true)?; - let t = socket_transformer(TcpSocket { inner: socket })?; + let transformer_context = TransformerContext::new_dial(addr); + let t = socket_transformer(TcpSocket { inner: socket }, transformer_context)?; t.inner }; diff --git a/tentacle/src/runtime/tokio_runtime/mod.rs b/tentacle/src/runtime/tokio_runtime/mod.rs index d54d61c7..9d174905 100644 --- a/tentacle/src/runtime/tokio_runtime/mod.rs +++ b/tentacle/src/runtime/tokio_runtime/mod.rs @@ -7,7 +7,7 @@ pub use tokio::{ }; use crate::service::{ - config::{TcpSocket, TcpSocketConfig, TcpSocketTransformer}, + config::{TcpSocket, TcpSocketConfig, TcpSocketTransformer, TransformerContext}, ProxyConfig, }; use socket2::{Domain, Protocol as SocketProtocol, Socket, Type}; @@ -93,7 +93,8 @@ pub(crate) fn listen(addr: SocketAddr, tcp_config: TcpSocketConfig) -> io::Resul // user can disable it on tcp_config #[cfg(not(windows))] socket.set_reuse_address(true)?; - let t = (tcp_config.socket_transformer)(TcpSocket { inner: socket })?; + let transformer_context = TransformerContext::new_listen(addr); + let t = (tcp_config.socket_transformer)(TcpSocket { inner: socket }, transformer_context)?; t.inner.set_nonblocking(true)?; // safety: fd convert by socket2 unsafe { @@ -126,7 +127,8 @@ async fn connect_direct( let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?; let socket = { - let t = socket_transformer(TcpSocket { inner: socket })?; + let transformer_context = TransformerContext::new_dial(addr); + let t = socket_transformer(TcpSocket { inner: socket }, transformer_context)?; t.inner.set_nonblocking(true)?; // safety: fd convert by socket2 unsafe { diff --git a/tentacle/src/service.rs b/tentacle/src/service.rs index 7ef9c44f..260d2375 100644 --- a/tentacle/src/service.rs +++ b/tentacle/src/service.rs @@ -45,7 +45,8 @@ mod helper; pub use crate::service::{ config::{ - HandshakeType, ProtocolHandle, ProtocolMeta, TargetProtocol, TargetSession, TcpSocket, + HandshakeType, ProtocolHandle, ProtocolMeta, SocketState, TargetProtocol, TargetSession, + TcpSocket, TransformerContext, }, control::{ServiceAsyncControl, ServiceControl}, event::{ServiceError, ServiceEvent}, From 3330101ae8049957c144bedfc94184a001972300 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Fri, 10 Jan 2025 16:20:49 +0800 Subject: [PATCH 14/14] Add onion_url for Onion Transport Signed-off-by: Eval EXEC --- tentacle/src/builder.rs | 22 ++++++++++++++----- tentacle/src/runtime/async_runtime/os/mod.rs | 23 ++++++++++---------- tentacle/src/runtime/tokio_runtime/mod.rs | 23 ++++++++++---------- tentacle/src/service.rs | 1 - tentacle/src/service/config.rs | 13 ++++------- 5 files changed, 44 insertions(+), 38 deletions(-) diff --git a/tentacle/src/builder.rs b/tentacle/src/builder.rs index 391312ea..a187b287 100644 --- a/tentacle/src/builder.rs +++ b/tentacle/src/builder.rs @@ -1,6 +1,5 @@ use std::{io, sync::Arc, time::Duration}; -use crate::service::config::ProxyConfig; use crate::service::config::TransformerContext; use nohash_hasher::IntMap; use tokio_util::codec::LengthDelimitedCodec; @@ -230,8 +229,15 @@ where /// Proxy config for tcp #[cfg(not(target_family = "wasm"))] - pub fn tcp_proxy_config(mut self, proxy_conifg: Option) -> Self { - self.config.tcp_config.tcp.proxy_config = proxy_conifg; + pub fn tcp_proxy_config(mut self, proxy_url: Option) -> Self { + self.config.tcp_config.tcp.proxy_url = proxy_url; + self + } + + /// Onion config for tcp + #[cfg(not(target_family = "wasm"))] + pub fn tcp_onion_config(mut self, onion_url: Option) -> Self { + self.config.tcp_config.tcp.onion_url = onion_url; self } @@ -240,7 +246,10 @@ where #[cfg_attr(docsrs, doc(cfg(feature = "ws")))] pub fn tcp_config_on_ws(mut self, f: F) -> Self where - F: Fn(TcpSocket, TransformerContext) -> Result + Send + Sync + 'static, + F: Fn(TcpSocket, TransformerContext) -> Result + + Send + + Sync + + 'static, { self.config.tcp_config.ws.socket_transformer = Arc::new(f); self @@ -264,7 +273,10 @@ where #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] pub fn tcp_config_on_tls(mut self, f: F) -> Self where - F: Fn(TcpSocket, TransformerContext) -> Result + Send + Sync + 'static, + F: Fn(TcpSocket, TransformerContext) -> Result + + Send + + Sync + + 'static, { self.config.tcp_config.tls.socket_transformer = Arc::new(f); self diff --git a/tentacle/src/runtime/async_runtime/os/mod.rs b/tentacle/src/runtime/async_runtime/os/mod.rs index c9b2037a..84205a37 100644 --- a/tentacle/src/runtime/async_runtime/os/mod.rs +++ b/tentacle/src/runtime/async_runtime/os/mod.rs @@ -2,10 +2,7 @@ mod time; use crate::{ runtime::{proxy::socks5_config, CompatStream2}, - service::{ - config::{TcpSocket, TcpSocketConfig, TcpSocketTransformer, TransformerContext}, - ProxyConfig, - }, + service::config::{TcpSocket, TcpSocketConfig, TcpSocketTransformer, TransformerContext}, }; use async_io::Async; use async_std::net::{TcpListener as AsyncListener, TcpStream as AsyncStream, ToSocketAddrs}; @@ -217,12 +214,12 @@ async fn connect_direct( async fn connect_by_proxy( target_addr: A, socket_transformer: TcpSocketTransformer, - proxy_config: ProxyConfig, + proxy_url: String, ) -> io::Result where A: Into, { - let socks5_config = socks5_config::parse(&proxy_config.proxy_url)?; + let socks5_config = socks5_config::parse(&proxy_url)?; let dial_addr: SocketAddr = socks5_config.proxy_url.parse().map_err(io::Error::other)?; let stream = connect_direct(dial_addr, socket_transformer).await?; @@ -238,10 +235,11 @@ pub(crate) async fn connect( ) -> io::Result { let TcpSocketConfig { socket_transformer, - proxy_config, + proxy_url, + onion_url: _, } = tcp_config; - match proxy_config { - Some(proxy_config) => connect_by_proxy(addr, socket_transformer, proxy_config).await, + match proxy_url { + Some(proxy_url) => connect_by_proxy(addr, socket_transformer, proxy_url).await, None => connect_direct(addr, socket_transformer).await, } } @@ -252,9 +250,10 @@ pub(crate) async fn connect_onion( ) -> io::Result { let TcpSocketConfig { socket_transformer, - proxy_config, + proxy_url, + onion_url, } = tcp_config; - let proxy_config = proxy_config.ok_or(io::Error::other( + let onion_url = onion_url.or(proxy_url).ok_or(io::Error::other( "need tor proxy server to connect to onion address", ))?; @@ -273,5 +272,5 @@ pub(crate) async fn connect_onion( let onion_address = shadowsocks::relay::Address::from_str(onion_str).map_err(std::io::Error::other)?; - connect_by_proxy(onion_address, socket_transformer, proxy_config).await + connect_by_proxy(onion_address, socket_transformer, onion_url).await } diff --git a/tentacle/src/runtime/tokio_runtime/mod.rs b/tentacle/src/runtime/tokio_runtime/mod.rs index 9d174905..1ae24072 100644 --- a/tentacle/src/runtime/tokio_runtime/mod.rs +++ b/tentacle/src/runtime/tokio_runtime/mod.rs @@ -6,9 +6,8 @@ pub use tokio::{ task::{block_in_place, spawn_blocking, yield_now, JoinHandle}, }; -use crate::service::{ - config::{TcpSocket, TcpSocketConfig, TcpSocketTransformer, TransformerContext}, - ProxyConfig, +use crate::service::config::{ + TcpSocket, TcpSocketConfig, TcpSocketTransformer, TransformerContext, }; use socket2::{Domain, Protocol as SocketProtocol, Socket, Type}; #[cfg(unix)] @@ -146,12 +145,12 @@ async fn connect_direct( async fn connect_by_proxy( target_addr: A, socket_transformer: TcpSocketTransformer, - proxy_config: ProxyConfig, + proxy_url: String, ) -> io::Result where A: Into, { - let socks5_config = socks5_config::parse(&proxy_config.proxy_url)?; + let socks5_config = socks5_config::parse(&proxy_url)?; let dial_addr: SocketAddr = socks5_config.proxy_url.parse().map_err(io::Error::other)?; let stream = connect_direct(dial_addr, socket_transformer).await?; @@ -167,11 +166,12 @@ pub(crate) async fn connect( ) -> io::Result { let TcpSocketConfig { socket_transformer, - proxy_config, + proxy_url, + onion_url: _, } = tcp_config; - match proxy_config { - Some(proxy_config) => connect_by_proxy(target_addr, socket_transformer, proxy_config).await, + match proxy_url { + Some(proxy_url) => connect_by_proxy(target_addr, socket_transformer, proxy_url).await, None => connect_direct(target_addr, socket_transformer).await, } } @@ -182,9 +182,10 @@ pub(crate) async fn connect_onion( ) -> io::Result { let TcpSocketConfig { socket_transformer, - proxy_config, + proxy_url, + onion_url, } = tcp_config; - let proxy_config = proxy_config.ok_or(io::Error::other( + let onion_url = onion_url.or(proxy_url).ok_or(io::Error::other( "need tor proxy server to connect to onion address", ))?; let onion_protocol = onion_addr.iter().next().ok_or(io::Error::other( @@ -203,5 +204,5 @@ pub(crate) async fn connect_onion( let onion_address = shadowsocks::relay::Address::from_str(&onion_str).map_err(std::io::Error::other)?; - connect_by_proxy(onion_address, socket_transformer, proxy_config).await + connect_by_proxy(onion_address, socket_transformer, onion_url).await } diff --git a/tentacle/src/service.rs b/tentacle/src/service.rs index 260d2375..133a6c94 100644 --- a/tentacle/src/service.rs +++ b/tentacle/src/service.rs @@ -54,7 +54,6 @@ pub use crate::service::{ }; use bytes::Bytes; -pub use crate::service::config::ProxyConfig; #[cfg(feature = "tls")] pub use crate::service::config::TlsConfig; diff --git a/tentacle/src/service/config.rs b/tentacle/src/service/config.rs index 58280f8b..94cd7f5c 100644 --- a/tentacle/src/service/config.rs +++ b/tentacle/src/service/config.rs @@ -86,13 +86,6 @@ impl Default for SessionConfig { } } -/// Proxy related config -#[derive(Clone)] -pub struct ProxyConfig { - /// proxy url, like: socks5://127.0.0.1:9050 - pub proxy_url: String, -} - pub enum SocketState { Listen, Dial, @@ -129,14 +122,16 @@ pub(crate) type TcpSocketTransformer = Arc< #[derive(Clone)] pub(crate) struct TcpSocketConfig { pub(crate) socket_transformer: TcpSocketTransformer, - pub(crate) proxy_config: Option, + pub(crate) proxy_url: Option, + pub(crate) onion_url: Option, } impl Default for TcpSocketConfig { fn default() -> Self { Self { socket_transformer: Arc::new(|tcp_socket, _| Ok(tcp_socket)), - proxy_config: None, + proxy_url: None, + onion_url: None, } } }