diff --git a/.github/workflows/pipelines.yml b/.github/workflows/pipelines.yml index 9d4c177..2a69405 100644 --- a/.github/workflows/pipelines.yml +++ b/.github/workflows/pipelines.yml @@ -14,7 +14,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - rust: [nightly-2022-03-17] + rust: [nightly, beta, stable] steps: - uses: actions/checkout@v2 @@ -35,7 +35,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - rust: [nightly-2022-03-17] + rust: [nightly, beta, stable] steps: - uses: actions/checkout@v2 @@ -58,7 +58,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - rust: [nightly-2022-03-17] + rust: [nightly, beta, stable] steps: - uses: actions/checkout@v2 @@ -82,7 +82,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest] - rust: [nightly-2022-03-17] + rust: [nightly, beta, stable] steps: - uses: actions/checkout@v2 @@ -103,7 +103,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest] - rust: [nightly-2022-03-17] + rust: [nightly, beta, stable] steps: - uses: actions/checkout@v2 diff --git a/Cargo.lock b/Cargo.lock index 5406bf2..f20b020 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10,7 +10,7 @@ checksum = "96cf8829f67d2eab0b2dfa42c5d0ef737e0724e4a82b01b3e292456202b19716" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.96", ] [[package]] @@ -76,6 +76,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "block-padding" version = "0.2.1" @@ -189,6 +198,22 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "data-encoding" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" + [[package]] name = "derive_more" version = "0.99.17" @@ -199,7 +224,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn", + "syn 1.0.96", ] [[package]] @@ -211,6 +236,16 @@ dependencies = [ "generic-array", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer 0.10.4", + "crypto-common", +] + [[package]] name = "dtoa" version = "0.4.8" @@ -350,38 +385,38 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-macro" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.43", ] [[package]] name = "futures-sink" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-core", "futures-macro", @@ -402,14 +437,25 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "groestl" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2432787a9b8f0d58dca43fe2240399479b7582dc8afa2126dc7652b864029e47" dependencies = [ - "block-buffer", - "digest", + "block-buffer 0.9.0", + "digest 0.9.0", "opaque-debug", ] @@ -624,9 +670,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.126" +version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" +checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" [[package]] name = "log" @@ -670,6 +716,7 @@ dependencies = [ "sha3", "simple_logger", "tokio", + "tokio-tungstenite", ] [[package]] @@ -703,9 +750,9 @@ dependencies = [ [[package]] name = "native-tls" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd7e2f3618557f980e0b17e8856252eee3c97fa12c54dff0ca290fb6266ca4a9" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" dependencies = [ "lazy_static", "libc", @@ -803,7 +850,7 @@ checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.96", ] [[package]] @@ -858,6 +905,12 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "primitives" version = "0.1.0" @@ -870,18 +923,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.39" +version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c54b25569025b7fc9651de43004ae593a75ad88543b17178aa5e1b9c4f15f56f" +checksum = "75cb1540fadbd5b8fbccc4dddad2734eba435053f725621c070711a14bb5f4b8" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.18" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] @@ -907,7 +960,7 @@ checksum = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca" dependencies = [ "autocfg 0.1.8", "libc", - "rand_chacha", + "rand_chacha 0.1.1", "rand_core 0.4.2", "rand_hc", "rand_isaac", @@ -918,6 +971,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + [[package]] name = "rand_chacha" version = "0.1.1" @@ -928,6 +992,16 @@ dependencies = [ "rand_core 0.3.1", ] +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", +] + [[package]] name = "rand_core" version = "0.3.1" @@ -943,6 +1017,15 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "rand_hc" version = "0.1.0" @@ -1072,8 +1155,8 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2eca4ecc81b7f313189bf73ce724400a07da2a6dac19588b03c8bd76a2dcc251" dependencies = [ - "block-buffer", - "digest", + "block-buffer 0.9.0", + "digest 0.9.0", "opaque-debug", ] @@ -1179,7 +1262,7 @@ checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.96", ] [[package]] @@ -1210,23 +1293,34 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" dependencies = [ - "block-buffer", + "block-buffer 0.9.0", "cfg-if", "cpufeatures", - "digest", + "digest 0.9.0", "opaque-debug", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha2" version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" dependencies = [ - "block-buffer", + "block-buffer 0.9.0", "cfg-if", "cpufeatures", - "digest", + "digest 0.9.0", "opaque-debug", ] @@ -1236,8 +1330,8 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f81199417d4e5de3f04b1e871023acea7389672c4135918f05aa9cbf2f2fa809" dependencies = [ - "block-buffer", - "digest", + "block-buffer 0.9.0", + "digest 0.9.0", "keccak", "opaque-debug", ] @@ -1312,6 +1406,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee659fb5f3d355364e1f3e5bc10fb82068efbf824a1e9d1c9504244a6469ad53" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "tempfile" version = "3.3.0" @@ -1351,7 +1456,7 @@ checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.96", ] [[package]] @@ -1422,19 +1527,33 @@ checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.96", ] [[package]] name = "tokio-native-tls" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" dependencies = [ "native-tls", "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +dependencies = [ + "futures-util", + "log 0.4.17", + "native-tls", + "tokio", + "tokio-native-tls", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.6.10" @@ -1495,6 +1614,26 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log 0.4.17", + "native-tls", + "rand 0.8.5", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.15.0" @@ -1563,6 +1702,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "vcpkg" version = "0.2.15" @@ -1612,7 +1757,7 @@ dependencies = [ "log 0.4.17", "proc-macro2", "quote", - "syn", + "syn 1.0.96", "wasm-bindgen-shared", ] @@ -1634,7 +1779,7 @@ checksum = "7d94ac45fcf608c1f45ef53e748d35660f168490c10b23704c7779ab8f5c3048" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.96", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/Cargo.toml b/Cargo.toml index 360b8ec..c30a127 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,3 +2,5 @@ members = [ "main", ] + +resolver = "2" diff --git a/main/Cargo.toml b/main/Cargo.toml index 9032ba3..536ee5f 100644 --- a/main/Cargo.toml +++ b/main/Cargo.toml @@ -23,7 +23,8 @@ serde = "1.0.137" serde_json = "1.0.81" sha3 = "0.9" simple_logger = "2.1.0" -tokio = { version = "1.12.0", default-features = false, features = ["macros", "rt-multi-thread"] } +tokio = { version = "1.12.0", default-features = false, features = ["macros", "rt-multi-thread", "sync", "time"] } +tokio-tungstenite = { version = "0.20.0", features = ["native-tls"] } # From our sources bitcrypto = { git = "https://github.com/KomodoPlatform/atomicDEX-API", branch = "dev" } ethkey = { git = "https://github.com/artemii235/parity-ethereum.git" } diff --git a/main/src/ctx.rs b/main/src/ctx.rs index 7bb7843..4bf1f39 100644 --- a/main/src/ctx.rs +++ b/main/src/ctx.rs @@ -1,8 +1,9 @@ -use super::*; +use std::env; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; -use std::env; + +use super::*; const DEFAULT_TOKEN_EXPIRATION_TIME: i64 = 3600; static CONFIG: OnceCell = OnceCell::new(); @@ -28,7 +29,9 @@ pub(crate) struct AppConfig { pub(crate) struct ProxyRoute { pub(crate) inbound_route: String, pub(crate) outbound_route: String, + #[serde(default)] pub(crate) authorized: bool, + #[serde(default)] pub(crate) allowed_methods: Vec, } @@ -53,6 +56,18 @@ impl AppConfig { self.token_expiration_time .unwrap_or(DEFAULT_TOKEN_EXPIRATION_TIME) } + + pub(crate) fn get_proxy_route_by_inbound(&self, inbound: String) -> Option<&ProxyRoute> { + let route_index = self.proxy_routes.iter().position(|r| { + r.inbound_route == inbound || r.inbound_route.to_owned() + "/" == inbound + }); + + if let Some(index) = route_index { + return Some(&self.proxy_routes[index]); + } + + None + } } #[cfg(test)] diff --git a/main/src/db.rs b/main/src/db.rs index c1cec3c..e07c3d0 100644 --- a/main/src/db.rs +++ b/main/src/db.rs @@ -1,9 +1,9 @@ -use super::*; - use ctx::AppConfig; use once_cell::sync::OnceCell; use redis::aio::MultiplexedConnection; +use super::*; + static REDIS_CLIENT: OnceCell = OnceCell::new(); pub(crate) fn get_redis_client(cfg: &AppConfig) -> &'static redis::Client { diff --git a/main/src/main.rs b/main/src/main.rs index 3cac011..d4528eb 100644 --- a/main/src/main.rs +++ b/main/src/main.rs @@ -1,8 +1,6 @@ -#![feature(ip)] - use ctx::get_app_config; use db::get_redis_connection; -use http::serve; +use server::serve; #[path = "security/address_status.rs"] mod address_status; @@ -18,8 +16,12 @@ mod proof_of_funding; mod rate_limiter; #[path = "net/rpc.rs"] mod rpc; +#[path = "net/server.rs"] +mod server; #[path = "security/sign.rs"] mod sign; +#[path = "net/websocket.rs"] +mod websocket; #[cfg(all(target_os = "linux", target_arch = "x86_64", target_env = "gnu"))] #[global_allocator] diff --git a/main/src/net/http.rs b/main/src/net/http.rs index f1188ff..6fc68e4 100644 --- a/main/src/net/http.rs +++ b/main/src/net/http.rs @@ -1,4 +1,4 @@ -use super::*; +use std::net::SocketAddr; use address_status::{ get_address_status_list, post_address_status, AddressStatus, AddressStatusOperations, @@ -6,11 +6,9 @@ use address_status::{ use ctx::{AppConfig, ProxyRoute}; use db::*; use hyper::header::HeaderName; -use hyper::server::conn::AddrStream; -use hyper::service::{make_service_fn, service_fn}; use hyper::{ header::{self, HeaderValue}, - Body, HeaderMap, Method, Request, Response, Server, StatusCode, + Body, HeaderMap, Method, Request, Response, StatusCode, }; use hyper_tls::HttpsConnector; use jwt::{get_cached_token_or_generate_one, JwtClaims}; @@ -19,27 +17,9 @@ use rate_limiter::RateLimitOperations; use serde::{Deserialize, Serialize}; use serde_json::json; use sign::SignedMessage; -use std::net::{IpAddr, SocketAddr}; -use std::str::FromStr; - -macro_rules! http_log_format { - ($ip: expr, $address: expr, $path: expr, $format: expr, $($args: tt)+) => {format!(concat!("[Ip: {} | Address: {} | Path: {}] ", $format), $ip, $address, $path, $($args)+)}; - ($ip: expr, $address: expr, $path: expr, $format: expr) => {format!(concat!("[Ip: {} | Pubkey: {} | Address: {}] ", $format), $ip, $address, $path)} -} - -impl AppConfig { - pub(crate) fn get_proxy_route_by_inbound(&self, inbound: String) -> Option<&ProxyRoute> { - let route_index = self.proxy_routes.iter().position(|r| { - r.inbound_route == inbound || r.inbound_route.to_owned() + "/" == inbound - }); - - if let Some(index) = route_index { - return Some(&self.proxy_routes[index]); - } - None - } -} +use super::*; +use crate::server::is_private_ip; async fn get_healthcheck() -> GenericResult> { let json = json!({ @@ -117,7 +97,7 @@ async fn proxy( if !proxy_route.allowed_methods.contains(&payload.method) { log::warn!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), payload.signed_message.address, req.uri(), @@ -135,7 +115,7 @@ async fn proxy( { log::error!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), payload.signed_message.address, req.uri(), @@ -151,7 +131,7 @@ async fn proxy( Err(_) => { log::error!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), payload.signed_message.address, original_req_uri, @@ -193,7 +173,7 @@ async fn proxy( Err(_) => { log::warn!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), payload.signed_message.address, original_req_uri, @@ -208,42 +188,19 @@ async fn proxy( Ok(res) } -#[allow(dead_code)] -fn get_real_address(req: &Request, remote_addr: &SocketAddr) -> GenericResult { - if let Some(ip) = req.headers().get("x-forwarded-for") { - let addr = IpAddr::from_str(ip.to_str()?)?; - - return Ok(SocketAddr::new(addr, remote_addr.port())); - } - - Ok(*remote_addr) -} - -async fn router( +pub(crate) async fn http_handler( cfg: &AppConfig, req: Request, remote_addr: SocketAddr, ) -> GenericResult> { - let remote_addr = match get_real_address(&req, &remote_addr) { - Ok(t) => t, - _ => { - log::error!( - "{}", - http_log_format!( - remote_addr.ip(), - String::from("-"), - req.uri(), - "Reading real remote address failed, returning 500." - ) - ); - return response_by_status(StatusCode::INTERNAL_SERVER_ERROR); - } - }; + let req_uri = req.uri().clone(); + + let is_private_ip = is_private_ip(&remote_addr.ip()); - if !remote_addr.ip().is_global() { + if is_private_ip { log::info!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), String::from("-"), req.uri(), @@ -251,7 +208,7 @@ async fn router( ) ); - match (req.method(), req.uri().path()) { + match (req.method(), req_uri.path()) { (&Method::GET, "/") => return get_healthcheck().await, (&Method::GET, "/address-status") => return get_address_status_list(cfg).await, (&Method::POST, "/address-status") => return post_address_status(cfg, req).await, @@ -263,16 +220,15 @@ async fn router( return handle_preflight(); } - let req_path = req.uri().clone(); let (req, payload) = match parse_payload(req).await { Ok(t) => t, Err(_) => { log::warn!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), String::from("-"), - req_path, + req_uri, "Recieved invalid http payload, returning 401." ) ); @@ -282,10 +238,10 @@ async fn router( log::info!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), payload.signed_message.address, - req_path, + req_uri, "Request received." ) ); @@ -295,10 +251,10 @@ async fn router( Err(_) => { log::error!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), payload.signed_message.address, - req_path, + req_uri, "Error type casting of IpAddr into HeaderValue, returning 500." ) ); @@ -311,7 +267,7 @@ async fn router( None => { log::warn!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), payload.signed_message.address, req.uri(), @@ -322,7 +278,7 @@ async fn router( } }; - if !remote_addr.ip().is_global() { + if is_private_ip { return proxy( cfg, req, @@ -354,10 +310,10 @@ async fn router( AddressStatus::Blocked => { log::warn!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), payload.signed_message.address, - req_path, + req_uri, "Request blocked." ) ); @@ -370,10 +326,10 @@ async fn router( if let Err(ProofOfFundingError::InvalidSignedMessage) = signed_message_status { log::warn!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), payload.signed_message.address, - req_path, + req_uri, "Request has invalid signed message, returning 401." ) ); @@ -394,10 +350,10 @@ async fn router( _ => { log::warn!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), payload.signed_message.address, - req_path, + req_uri, "Rate exceed for {}, checking balance for {} address.", rate_limiter_key, payload.signed_message.address @@ -409,10 +365,10 @@ async fn router( Err(ProofOfFundingError::InsufficientBalance) => { log::warn!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), payload.signed_message.address, - req_path, + req_uri, "Wallet {} has insufficient balance for coin {}, returning 406.", payload.signed_message.coin_ticker, payload.signed_message.address @@ -423,10 +379,10 @@ async fn router( e => { log::error!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), payload.signed_message.address, - req_path, + req_uri, "verify_message_and_balance failed in coin {}: {:?}", payload.signed_message.coin_ticker, e @@ -441,10 +397,10 @@ async fn router( if db.rate_address(rate_limiter_key).await.is_err() { log::error!( "{}", - http_log_format!( + log_format!( remote_addr.ip(), payload.signed_message.address, - req_path, + req_uri, "Rate incrementing failed." ) ); @@ -463,21 +419,6 @@ async fn router( } } -pub(crate) async fn serve(cfg: &'static AppConfig) -> GenericResult<()> { - let addr = format!("0.0.0.0:{}", cfg.port.unwrap_or(5000)).parse()?; - - let router = make_service_fn(move |c_stream: &AddrStream| { - let remote_addr = c_stream.remote_addr(); - async move { Ok::<_, GenericError>(service_fn(move |req| router(cfg, req, remote_addr))) } - }); - - let server = Server::bind(&addr).serve(router); - - log::info!("AtomicDEX Auth API serving on http://{}", addr); - - Ok(server.await?) -} - #[test] fn test_rpc_payload_serialzation_and_deserialization() { let json_payload = json!({ @@ -549,25 +490,6 @@ fn test_respond_by_status() { } } -#[test] -fn test_get_real_address() { - let mut req = Request::new(Body::from(Vec::new())); - - let addr = IpAddr::from_str("127.0.0.1").unwrap(); - let socket_addr = SocketAddr::new(addr, 80); - - let remote_addr = get_real_address(&req, &socket_addr).unwrap(); - assert_eq!("127.0.0.1", remote_addr.ip().to_string()); - - req.headers_mut().insert( - HeaderName::from_static("x-forwarded-for"), - "0.0.0.0".parse().unwrap(), - ); - - let remote_addr = get_real_address(&req, &socket_addr).unwrap(); - assert_eq!("0.0.0.0", remote_addr.ip().to_string()); -} - #[tokio::test] async fn test_parse_payload() { let serialized_payload = json!({ diff --git a/main/src/net/rpc.rs b/main/src/net/rpc.rs index b0178c6..932098b 100644 --- a/main/src/net/rpc.rs +++ b/main/src/net/rpc.rs @@ -1,5 +1,3 @@ -use super::*; - use bytes::Buf; use ctx::AppConfig; use http::insert_jwt_to_http_header; @@ -7,6 +5,8 @@ use hyper::{body::aggregate, header, Body, Request}; use hyper_tls::HttpsConnector; use serde_json::from_reader; +use super::*; + pub(crate) type Json = serde_json::Value; #[derive(Debug, PartialEq)] diff --git a/main/src/net/server.rs b/main/src/net/server.rs new file mode 100644 index 0000000..48db1b1 --- /dev/null +++ b/main/src/net/server.rs @@ -0,0 +1,119 @@ +use std::net::{IpAddr, SocketAddr}; +use std::str::FromStr; + +use hyper::server::conn::AddrStream; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Request, Response, Server, StatusCode}; + +use crate::http::{http_handler, response_by_status}; +use crate::log_format; +use crate::websocket::{should_upgrade_to_socket_conn, socket_handler}; +use crate::{ctx::AppConfig, GenericError, GenericResult}; + +#[macro_export] +macro_rules! log_format { + ($ip: expr, $address: expr, $path: expr, $format: expr, $($args: tt)+) => {format!(concat!("[Ip: {} | Address: {} | Path: {}] ", $format), $ip, $address, $path, $($args)+)}; + ($ip: expr, $address: expr, $path: expr, $format: expr) => {format!(concat!("[Ip: {} | Pubkey: {} | Address: {}] ", $format), $ip, $address, $path)} +} + +pub(crate) fn is_private_ip(ip: &IpAddr) -> bool { + match ip { + IpAddr::V4(v4) => v4.is_private() || v4.is_loopback(), + // We don't support IPv6s yet + IpAddr::V6(_) => false, + } +} + +fn get_real_address(req: &Request, remote_addr: &SocketAddr) -> GenericResult { + if let Some(ip) = req.headers().get("x-forwarded-for") { + let addr = IpAddr::from_str(ip.to_str()?)?; + + return Ok(SocketAddr::new(addr, remote_addr.port())); + } + + Ok(*remote_addr) +} + +async fn connection_handler( + cfg: &AppConfig, + req: Request, + remote_addr: SocketAddr, +) -> GenericResult> { + let remote_addr = match get_real_address(&req, &remote_addr) { + Ok(t) => t, + _ => { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + String::from("-"), + req.uri(), + "Reading real remote address failed, returning 500." + ) + ); + return response_by_status(StatusCode::INTERNAL_SERVER_ERROR); + } + }; + + if should_upgrade_to_socket_conn(&req) { + socket_handler(cfg, req, remote_addr).await + } else { + http_handler(cfg, req, remote_addr).await + } +} + +pub(crate) async fn serve(cfg: &'static AppConfig) -> GenericResult<()> { + let addr = format!("0.0.0.0:{}", cfg.port.unwrap_or(5000)).parse()?; + + let handler = make_service_fn(move |c_stream: &AddrStream| { + let remote_addr = c_stream.remote_addr(); + async move { + Ok::<_, GenericError>(service_fn(move |req| { + connection_handler(cfg, req, remote_addr) + })) + } + }); + + let server = Server::bind(&addr).serve(handler); + + log::info!("AtomicDEX Auth API serving on http://{}", addr); + + Ok(server.await?) +} + +#[test] +fn test_get_real_address() { + let mut req = Request::new(Body::from(Vec::new())); + + let addr = IpAddr::from_str("127.0.0.1").unwrap(); + let socket_addr = SocketAddr::new(addr, 80); + + let remote_addr = get_real_address(&req, &socket_addr).unwrap(); + assert_eq!("127.0.0.1", remote_addr.ip().to_string()); + + req.headers_mut().insert( + hyper::header::HeaderName::from_static("x-forwarded-for"), + "0.0.0.0".parse().unwrap(), + ); + + let remote_addr = get_real_address(&req, &socket_addr).unwrap(); + assert_eq!("0.0.0.0", remote_addr.ip().to_string()); +} + +#[test] +fn test_is_private_ip_v4() { + let private_ip = "192.168.1.1".parse().unwrap(); + assert!(is_private_ip(&private_ip)); + + let private_ip = "10.0.0.1".parse().unwrap(); + assert!(is_private_ip(&private_ip)); + + let private_ip = "172.16.0.1".parse().unwrap(); + assert!(is_private_ip(&private_ip)); + + let public_ip = "8.8.8.8".parse().unwrap(); + assert!(!is_private_ip(&public_ip)); + + let public_ip = "203.0.113.1".parse().unwrap(); + assert!(!is_private_ip(&public_ip)); +} diff --git a/main/src/net/websocket.rs b/main/src/net/websocket.rs new file mode 100644 index 0000000..6b6d160 --- /dev/null +++ b/main/src/net/websocket.rs @@ -0,0 +1,164 @@ +use std::{net::SocketAddr, time::Duration}; + +use futures_util::{FutureExt, SinkExt, StreamExt}; +use hyper::{header::HeaderValue, upgrade, Body, Request, Response, StatusCode}; +use tokio::time; +use tokio_tungstenite::{ + tungstenite::{handshake, Message}, + WebSocketStream, +}; + +use crate::{ctx::AppConfig, http::response_by_status, log_format, GenericResult}; + +pub(crate) fn should_upgrade_to_socket_conn(req: &Request) -> bool { + let expected = HeaderValue::from_static("websocket"); + Some(&expected) == req.headers().get("upgrade") +} + +pub(crate) async fn socket_handler( + cfg: &AppConfig, + mut req: Request, + remote_addr: SocketAddr, +) -> GenericResult> { + let inbound_route = req.uri().to_string(); + let proxy_route = match cfg.get_proxy_route_by_inbound(inbound_route) { + Some(proxy_route) => proxy_route, + None => { + log::warn!( + "{}", + log_format!( + remote_addr.ip(), + String::from("-"), + req.uri(), + "Proxy route not found for socket, returning 404." + ) + ); + return response_by_status(StatusCode::NOT_FOUND); + } + }; + + let outbound_addr = proxy_route.outbound_route.clone(); + + match handshake::server::create_response_with_body(&req, Body::empty) { + Ok(response) => { + tokio::spawn(async move { + match upgrade::on(&mut req).await { + Ok(upgraded) => { + let mut inbound_socket = WebSocketStream::from_raw_socket( + upgraded, + tokio_tungstenite::tungstenite::protocol::Role::Server, + None, + ) + .await; + + match tokio_tungstenite::connect_async(outbound_addr).await { + Ok((mut outbound_socket, _)) => { + let mut keepalive_interval = + time::interval(Duration::from_secs(10)); + + loop { + futures_util::select! { + _ = keepalive_interval.tick().fuse() => { + if let Err(e) = outbound_socket.send(Message::Ping(Vec::new())).await { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + String::from("-"), + req.uri(), + "{:?}", + e + ) + ); + }; + + if let Err(e) = inbound_socket.send(Message::Ping(Vec::new())).await { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + String::from("-"), + req.uri(), + "{:?}", + e + ) + ); + } + } + + msg = outbound_socket.next() => { + match msg { + Some(Ok(msg)) => { + if let Err(e) = inbound_socket.send(msg).await { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + String::from("-"), + req.uri(), + "{:?}", + e + ) + ); + }; + }, + _ => break, + }; + }, + + msg = inbound_socket.next() => { + match msg { + Some(Ok(msg)) => { + if let Err(e) = outbound_socket.send(msg).await { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + String::from("-"), + req.uri(), + "{:?}", + e + ) + ); + }; + }, + _ => break + }; + } + }; + } + } + e => { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + String::from("-"), + req.uri(), + "{:?}", + e + ) + ); + } + }; + } + Err(e) => { + log::error!( + "{}", + log_format!(remote_addr.ip(), String::from("-"), req.uri(), "{}", e) + ); + } + } + }); + + Ok(response) + } + Err(e) => { + log::error!( + "{}", + log_format!(remote_addr.ip(), String::from("-"), req.uri(), "{}", e) + ); + response_by_status(StatusCode::SERVICE_UNAVAILABLE) + } + } +} diff --git a/main/src/security/address_status.rs b/main/src/security/address_status.rs index 46a33d1..1bb8181 100644 --- a/main/src/security/address_status.rs +++ b/main/src/security/address_status.rs @@ -1,5 +1,3 @@ -use super::*; - use async_trait::async_trait; use bytes::Buf; use ctx::AppConfig; @@ -8,6 +6,8 @@ use hyper::{header, Body, Request, Response, StatusCode}; use redis::FromRedisValue; use serde::{Deserialize, Serialize}; +use super::*; + pub(crate) const DB_STATUS_LIST: &str = "status_list"; #[derive(Debug, Serialize, Deserialize, PartialEq)] diff --git a/main/src/security/jwt.rs b/main/src/security/jwt.rs index a276355..0e3e6f4 100644 --- a/main/src/security/jwt.rs +++ b/main/src/security/jwt.rs @@ -1,15 +1,16 @@ -use super::*; - -use ctx::AppConfig; -use jsonwebtoken::*; -use once_cell::sync::OnceCell; -use serde::{Deserialize, Serialize}; use std::{ fs::File, io::Read, time::{SystemTime, UNIX_EPOCH}, }; +use ctx::AppConfig; +use jsonwebtoken::*; +use once_cell::sync::OnceCell; +use serde::{Deserialize, Serialize}; + +use super::*; + const TOKEN_ISSUER: &str = "ATOMICDEX-AUTH"; #[derive(Debug, Serialize, Deserialize, PartialEq)] diff --git a/main/src/security/proof_of_funding.rs b/main/src/security/proof_of_funding.rs index 2399004..6bc9e95 100644 --- a/main/src/security/proof_of_funding.rs +++ b/main/src/security/proof_of_funding.rs @@ -1,16 +1,17 @@ -use super::*; - use ctx::{AppConfig, ProxyRoute}; use http::RpcPayload; use rpc::Json; use serde_json::json; use sign::SignOps; +use super::*; + #[derive(Debug)] pub(crate) enum ProofOfFundingError { InvalidSignedMessage, InsufficientBalance, ErrorFromRpcCall, + #[allow(dead_code)] RpcCallFailed(String), } diff --git a/main/src/security/rate_limiter.rs b/main/src/security/rate_limiter.rs index e036499..eeea27e 100644 --- a/main/src/security/rate_limiter.rs +++ b/main/src/security/rate_limiter.rs @@ -1,10 +1,10 @@ -use super::*; - use async_trait::async_trait; use ctx::RateLimiter; use db::Db; use redis::Pipeline; +use super::*; + pub(crate) const DB_RP_1_MIN: &str = "rp:1_min"; pub(crate) const DB_RP_5_MIN: &str = "rp:5_min"; pub(crate) const DB_RP_15_MIN: &str = "rp:15_min"; @@ -94,7 +94,7 @@ impl RateLimitOperations for Db { ) -> GenericResult { let rate: u16 = redis::cmd("HGET") .arg(db) - .arg(&address) + .arg(address) .query_async(&mut self.connection) .await .unwrap_or(0); diff --git a/main/src/security/sign.rs b/main/src/security/sign.rs index 421f30a..0e8400c 100644 --- a/main/src/security/sign.rs +++ b/main/src/security/sign.rs @@ -1,13 +1,14 @@ -use super::*; +use core::{convert::From, str::FromStr}; +use std::time::{SystemTime, UNIX_EPOCH}; use bitcrypto::keccak256; -use core::{convert::From, str::FromStr}; use ethereum_types::{Address, H256}; use ethkey::{sign, verify_address, Secret, Signature}; use serde::{Deserialize, Serialize}; use serialization::{CompactInteger, Serializable, Stream}; use sha3::{Digest, Keccak256}; -use std::time::{SystemTime, UNIX_EPOCH}; + +use super::*; pub(crate) trait SignOps { fn sign_message_hash(&self) -> [u8; 32]; @@ -58,6 +59,7 @@ impl SignOps for SignedMessage { let hash = hasher.finalize(); let mut result: String = "0x".into(); for (i, c) in addr.chars().enumerate() { + #[allow(clippy::is_digit_ascii_radix)] if c.is_digit(10) { result.push(c); } else { diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 97b8f4d..fd74a04 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] components = [ "rustfmt", "clippy" ] -channel = "nightly-2022-03-17" +channel = "1.75"