diff --git a/Cargo.lock b/Cargo.lock index 882c0118..0caf970f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 4 +version = 3 [[package]] name = "addr2line" @@ -77,9 +77,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.93" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" +checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7" [[package]] name = "async-channel" @@ -225,9 +225,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.2" +version = "1.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc" +checksum = "9157bbaa6b165880c27a4293a474c91cdcf265cc68cc829bf10be0964a391caf" dependencies = [ "shlex", ] @@ -246,9 +246,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "clap" -version = "4.5.22" +version = "4.5.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69371e34337c4c984bbe322360c2547210bf632eb2814bbe78a6e87a2935bd2b" +checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84" dependencies = [ "clap_builder", "clap_derive", @@ -256,9 +256,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.22" +version = "4.5.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e24c1b4099818523236a8ca881d2b45db98dadfb4625cf6608c12069fcbbde1" +checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838" dependencies = [ "anstream", "anstyle", @@ -281,9 +281,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" [[package]] name = "colorchoice" @@ -326,9 +326,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.20" +version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crypto-common" @@ -426,6 +426,7 @@ checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871" dependencies = [ "curve25519-dalek", "ed25519", + "rand_core", "serde", "sha2", "subtle", @@ -477,9 +478,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "fiat-crypto" @@ -636,9 +637,9 @@ checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] name = "h2" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" +checksum = "ccae279728d634d083c00f6099cb58f01cc99c145b84b8be2f6c74618d79922e" dependencies = [ "atomic-waker", "bytes", @@ -719,9 +720,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.5.1" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97818827ef4f364230e16705d4706e2897df2bb60617d6ca15d598025a3c481f" +checksum = "256fb8d4bd6413123cc9d91832d78325c48ff41677595be797d90f42969beae0" dependencies = [ "bytes", "futures-channel", @@ -957,6 +958,8 @@ dependencies = [ "serde_json", "tempfile", "tokio", + "tracing", + "tracing-subscriber", "ureq", ] @@ -964,6 +967,7 @@ dependencies = [ name = "kitsune2_core" version = "0.0.1-alpha" dependencies = [ + "axum", "bytes", "ed25519-dalek", "futures", @@ -974,6 +978,7 @@ dependencies = [ "serde_json", "tokio", "tracing", + "ureq", ] [[package]] @@ -1016,9 +1021,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.167" +version = "0.2.168" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc" +checksum = "5aaeb2981e0606ca11d79718f8bb01164f1d6ed75080182d3abf017e6d244b6d" [[package]] name = "linux-raw-sys" @@ -1077,9 +1082,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "miniz_oxide" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +checksum = "a2ef2593ffb6958c941575cee70c8e257438749971869c4ae5acf6f91a168a61" dependencies = [ "adler2", ] @@ -1251,9 +1256,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +checksum = "2c0fef6c4230e4ccf618a35c59d7ede15dea37de8427500f50aff708806e42ec" dependencies = [ "bytes", "prost-derive", @@ -1261,11 +1266,10 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" +checksum = "d0f3e5beed80eb580c68e2c600937ac2c4eedabdfd5ef1e5b7ea4f3fba84497b" dependencies = [ - "bytes", "heck", "itertools", "log", @@ -1282,9 +1286,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +checksum = "157c5a9d7ea5c2ed2d9fb8f495b64759f7816c7eaea54ba3978f0d63000162e3" dependencies = [ "anyhow", "itertools", @@ -1295,9 +1299,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670" +checksum = "cc2f1e56baa61e93533aebc21af4d2134b70f66275e0fcdf3cbe43d77ff7e8fc" dependencies = [ "prost", ] @@ -1343,9 +1347,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.7" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" +checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" dependencies = [ "bitflags", ] @@ -1426,22 +1430,22 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.41" +version = "0.38.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6" +checksum = "f93dc38ecbab2eb790ff964bb77fa94faf256fd3e73285fd7ba0903b76bedb85" dependencies = [ "bitflags", "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] name = "rustls" -version = "0.23.19" +version = "0.23.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "934b404430bb06b3fae2cba809eb45a1ab1aecd64491213d7c3301b88393f8d1" +checksum = "5065c3f250cbd332cd894be57c40fa52387247659b14a2d6041d121547903b1b" dependencies = [ "log", "once_cell", @@ -1454,9 +1458,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.10.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" +checksum = "d2bf47e6ff922db3825eb750c4e2ff784c6ff8fb9e13046ef6a1d1c5401b0b37" [[package]] name = "rustls-webpki" @@ -1489,24 +1493,24 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "semver" -version = "1.0.23" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" +checksum = "3cb6eb87a131f756572d7fb904f6e7b68633f09cca868c5df1c4b8d1a694bbba" [[package]] name = "serde" -version = "1.0.215" +version = "1.0.216" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" +checksum = "0b9781016e935a97e8beecf0c933758c97a5520d32930e460142b4cd80c6338e" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.215" +version = "1.0.216" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" +checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" dependencies = [ "proc-macro2", "quote", @@ -1681,18 +1685,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.4" +version = "2.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f49a1853cf82743e3b7950f77e0f4d622ca36cf4317cba00c767838bac8d490" +checksum = "93605438cbd668185516ab499d589afb7ee1859ea3d5fc8f6b0755e1c7443767" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "2.0.4" +version = "2.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8381894bb3efe0c4acac3ded651301ceee58a15d47c2e34885ed1908ad667061" +checksum = "e1d8749b4531af2117677a5fcd12b1348a3fe2b81e36e61ffeac5c4aa3273e36" dependencies = [ "proc-macro2", "quote", @@ -1750,9 +1754,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.12" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" +checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" dependencies = [ "bytes", "futures-core", @@ -1838,6 +1842,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.19" @@ -1848,12 +1862,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] @@ -1876,9 +1893,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "ureq" -version = "2.12.0" +version = "2.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3193f92e105038f98ae68af40c008e3c94f2f046926e0f95e6c835dc6459bac8" +checksum = "02d1a66277ed75f640d608235660df48c8e3c19f3b4edb6a263315626cc3c01d" dependencies = [ "base64", "flate2", diff --git a/Cargo.toml b/Cargo.toml index 739faf1d..0db4f32e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,8 @@ futures = "0.3" tempfile = "3.14.0" # kitsune2 internally uses a mix of std::io::Error and thiserror derivation. thiserror = "2.0.3" +# ureq is used in the bootstrap client module. +ureq = "2.10.1" # url validation used by the api crate. url = "2.5.4" # kitsune2 uses tracing to log events. A consumer can choose any subscriber @@ -69,9 +71,9 @@ prost-build = "0.13.3" # Please be careful to only include them in dev dependencies or move them # above this section. # --- dev-dependencies --- +kitsune2_bootstrap_srv = { path = "crates/bootstrap_srv" } kitsune2_memory = { path = "crates/memory" } kitsune2_test_utils = { path = "crates/test_utils" } rand = "0.8.5" -ureq = "2.10.1" tracing-subscriber = "0.3" diff --git a/Makefile b/Makefile index c2d44b8f..fe4836db 100644 --- a/Makefile +++ b/Makefile @@ -2,9 +2,9 @@ # tasks both in automation and locally until we figure out better # release automation tools. -.PHONY: all fmt clippy doc build test +.PHONY: all static-toml fmt clippy doc build test -all: fmt clippy doc test +all: static-toml fmt clippy doc build test static-toml: taplo format --check diff --git a/crates/api/src/agent.rs b/crates/api/src/agent.rs index dc7f1c3c..68fddb1e 100644 --- a/crates/api/src/agent.rs +++ b/crates/api/src/agent.rs @@ -105,6 +105,17 @@ pub trait Verifier: std::fmt::Debug { /// Trait-object [Verifier]. pub type DynVerifier = Arc; +impl Verifier for DynVerifier { + fn verify( + &self, + agent_info: &AgentInfo, + message: &[u8], + signature: &[u8], + ) -> bool { + (**self).verify(agent_info, message, signature) + } +} + /// A "Local" agent is an agent that is connected to the local Kitsune2 node, /// and is able to sign messages and agent infos. pub trait LocalAgent: Signer + 'static + Send + Sync + std::fmt::Debug { @@ -170,7 +181,6 @@ pub struct AgentInfo { } /// Signed agent information. -#[derive(Debug)] pub struct AgentInfoSigned { /// The decoded information associated with this agent. agent_info: AgentInfo, @@ -182,6 +192,14 @@ pub struct AgentInfoSigned { signature: bytes::Bytes, } +impl std::fmt::Debug for AgentInfoSigned { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("AgentInfoSigned(")?; + f.write_str(&self.encoded)?; + f.write_str(")") + } +} + impl AgentInfoSigned { /// Generate a signed agent info by signing an agent info. pub async fn sign( @@ -215,16 +233,44 @@ impl AgentInfoSigned { } let v: Ref = serde_json::from_slice(encoded) .map_err(|e| K2Error::other_src("decoding agent_info", e))?; - let agent_info: AgentInfo = serde_json::from_str(&v.agent_info) + Self::inner_decode_one(verifier, v.agent_info, v.signature) + } + + /// Decode a canonical json encoding of a list of signed agent infos. + pub fn decode_list( + verifier: &V, + encoded: &[u8], + ) -> K2Result>>> { + #[derive(serde::Deserialize)] + #[serde(rename_all = "camelCase")] + struct Ref { + agent_info: String, + #[serde(with = "crate::serde_bytes_base64")] + signature: bytes::Bytes, + } + let v: Vec = serde_json::from_slice(encoded) + .map_err(|e| K2Error::other_src("decoding agent_info", e))?; + Ok(v.into_iter() + .map(|v| { + Self::inner_decode_one(verifier, v.agent_info, v.signature) + }) + .collect()) + } + + fn inner_decode_one( + verifier: &V, + agent_info: String, + signature: bytes::Bytes, + ) -> K2Result> { + let info: AgentInfo = serde_json::from_str(&agent_info) .map_err(|e| K2Error::other_src("decoding inner agent_info", e))?; - if !verifier.verify(&agent_info, v.agent_info.as_bytes(), &v.signature) - { + if !verifier.verify(&info, agent_info.as_bytes(), &signature) { return Err(K2Error::other("InvalidSignature")); } Ok(std::sync::Arc::new(Self { - agent_info, - encoded: v.agent_info, - signature: v.signature, + agent_info: info, + encoded: agent_info, + signature, })) } diff --git a/crates/api/src/bootstrap.rs b/crates/api/src/bootstrap.rs new file mode 100644 index 00000000..07f55eef --- /dev/null +++ b/crates/api/src/bootstrap.rs @@ -0,0 +1,41 @@ +//! Kitsune2 bootstrap related types. + +use crate::*; +use std::sync::Arc; + +/// Method for bootstrapping WAN discovery of peers. +/// +/// The internal implementation will take care of whatever polling +/// or managing of message queues is required to be notified of +/// remote peers both on initialization and over runtime. +pub trait Bootstrap: 'static + Send + Sync + std::fmt::Debug { + /// Put an agent info onto a bootstrap server. + /// + /// This method takes responsibility for retrying the send in the case + /// of server error until such time as: + /// - the Put succeeds + /// - we receive a new info that supersedes the previous + /// - or the info expires + fn put(&self, info: Arc); +} + +/// Trait-object [Bootstrap]. +pub type DynBootstrap = Arc; + +/// A factory for constructing Bootstrap instances. +pub trait BootstrapFactory: 'static + Send + Sync + std::fmt::Debug { + /// Help the builder construct a default config from the chosen + /// module factories. + fn default_config(&self, config: &mut config::Config) -> K2Result<()>; + + /// Construct a bootstrap instance. + fn create( + &self, + builder: Arc, + peer_store: peer_store::DynPeerStore, + space: SpaceId, + ) -> BoxFut<'static, K2Result>; +} + +/// Trait-object [BootstrapFactory]. +pub type DynBootstrapFactory = Arc; diff --git a/crates/api/src/builder.rs b/crates/api/src/builder.rs index 64dea2b8..9967e77d 100644 --- a/crates/api/src/builder.rs +++ b/crates/api/src/builder.rs @@ -27,9 +27,14 @@ pub struct Builder { /// [peer_store::PeerStore] instances. pub peer_store: peer_store::DynPeerStoreFactory, + /// The [bootstrap::BootstrapFactory] to be used for creating + /// [bootstrap::Bootstrap] instances for initial WAN discovery. + pub bootstrap: bootstrap::DynBootstrapFactory, + /// The [fetch::FetchFactory] to be used for creating /// [fetch::Fetch] instances. pub fetch: fetch::DynFetchFactory, + /// The [transport::TransportFactory] to be used for creating /// [transport::Transport] instances. pub transport: transport::DynTransportFactory, @@ -47,6 +52,7 @@ impl Builder { kitsune, space, peer_store, + bootstrap, fetch, transport, } = &mut self; @@ -54,6 +60,7 @@ impl Builder { kitsune.default_config(config)?; space.default_config(config)?; peer_store.default_config(config)?; + bootstrap.default_config(config)?; fetch.default_config(config)?; transport.default_config(config)?; diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index 41a479b0..de01277f 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -40,6 +40,7 @@ pub mod agent; pub mod arc; pub use arc::*; +pub mod bootstrap; pub mod builder; pub mod config; pub mod kitsune; diff --git a/crates/bootstrap_srv/Cargo.toml b/crates/bootstrap_srv/Cargo.toml index 92166913..378ba3ca 100644 --- a/crates/bootstrap_srv/Cargo.toml +++ b/crates/bootstrap_srv/Cargo.toml @@ -27,6 +27,8 @@ serde = { workspace = true } serde_json = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true, features = ["time", "rt", "rt-multi-thread"] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } [dev-dependencies] ureq = { workspace = true } diff --git a/crates/bootstrap_srv/src/bin/kitsune2-bootstrap-srv.rs b/crates/bootstrap_srv/src/bin/kitsune2-bootstrap-srv.rs index 3640b2cd..1dbb83c5 100644 --- a/crates/bootstrap_srv/src/bin/kitsune2-bootstrap-srv.rs +++ b/crates/bootstrap_srv/src/bin/kitsune2-bootstrap-srv.rs @@ -14,6 +14,9 @@ pub struct Args { /// on a single given machine, you can set this "production" mode. #[arg(long)] pub production: bool, + /// Output tracing in json format. + #[arg(long)] + pub json: bool, // TODO - Implement the ability to specify TLS certificates // TODO - Implement the ability to specify the listening address // TODO - Implement the ability to override any other relevant @@ -23,13 +26,29 @@ pub struct Args { fn main() { let args = ::parse(); + let t = tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::builder() + .with_default_directive(tracing::Level::DEBUG.into()) + .from_env_lossy(), + ) + .with_file(true) + .with_line_number(true); + + if args.json { + t.json().try_init() + } else { + t.try_init() + } + .expect("failed to init tracing"); + let config = if args.production { Config::production() } else { Config::testing() }; - println!("{args:?}--{config:?}"); + tracing::info!(?args, ?config); let (send, recv) = std::sync::mpsc::channel(); @@ -42,8 +61,8 @@ fn main() { let _ = recv.recv(); - println!("Terminating..."); + tracing::info!("Terminating..."); drop(srv); - println!("Done."); + tracing::info!("Exit Process."); std::process::exit(0); } diff --git a/crates/bootstrap_srv/src/server.rs b/crates/bootstrap_srv/src/server.rs index cc95bd8c..5760d3c4 100644 --- a/crates/bootstrap_srv/src/server.rs +++ b/crates/bootstrap_srv/src/server.rs @@ -19,7 +19,7 @@ struct ThreadGuard(&'static str); impl Drop for ThreadGuard { fn drop(&mut self) { - eprintln!("{}", self.0); + tracing::debug!("{}", self.0); } } @@ -37,6 +37,9 @@ pub struct BootstrapSrv { impl Drop for BootstrapSrv { fn drop(&mut self) { + let _g = ThreadGuard("Server Shutdown Complete!"); + + tracing::debug!("begin server shutdown..."); let _ = self.shutdown(); } } @@ -68,7 +71,12 @@ impl BootstrapSrv { // get the address that was assigned let addrs = server.server_addrs().to_vec(); - println!("Listening at {:?}", addrs); + tracing::info!(?addrs, "Listening"); + for addr in addrs.iter() { + // print these incase someone wants to parse for them + println!("#kitsune2_bootstrap_srv#listening#{addr:?}#"); + } + println!("#kitsune2_bootstrap_srv#running#"); // spawn our worker threads let mut workers = Vec::with_capacity(config.worker_thread_count + 1); @@ -104,11 +112,16 @@ impl BootstrapSrv { let mut is_err = false; self.cont.store(false, std::sync::atomic::Ordering::SeqCst); drop(self.server.take()); - for worker in self.workers.drain(..) { - if worker.join().is_err() { + while !self.workers.is_empty() { + tracing::debug!( + "waiting on {} threads to close...", + self.workers.len() + ); + if self.workers.pop().unwrap().join().is_err() { is_err = true; } } + tracing::debug!("all threads closed."); if is_err { Err(std::io::Error::other("Failure shutting down worker thread")) } else { @@ -127,7 +140,7 @@ fn prune_worker( cont: Arc, space_map: crate::SpaceMap, ) -> std::io::Result<()> { - let _g = ThreadGuard("WARN: prune_worker thread has ended"); + let _g = ThreadGuard("prune_worker thread has ended"); let mut last_check = std::time::Instant::now(); @@ -151,7 +164,7 @@ fn worker( recv: HttpReceiver, space_map: crate::SpaceMap, ) -> std::io::Result<()> { - let _g = ThreadGuard("WARN: worker thread has ended"); + let _g = ThreadGuard("worker thread has ended"); while cont.load(std::sync::atomic::Ordering::SeqCst) { let (req, res) = match recv.recv() { @@ -168,6 +181,7 @@ fn worker( handler.handle(req)?; } + Ok(()) } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 4c9a5be3..e526bdac 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -18,9 +18,15 @@ kitsune2_api = { workspace = true } prost = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true, features = ["sync", "rt"] } +tokio = { workspace = true, features = ["macros", "rt", "sync", "time"] } tracing = { workspace = true } +ureq = { workspace = true } [dev-dependencies] +axum = { workspace = true, default-features = false, features = [ + "http1", + "tokio", +] } +ed25519-dalek = { workspace = true, features = ["rand_core"] } rand = { workspace = true } tokio = { workspace = true, features = ["full"] } diff --git a/crates/core/src/factories.rs b/crates/core/src/factories.rs index 62a18716..947f384f 100644 --- a/crates/core/src/factories.rs +++ b/crates/core/src/factories.rs @@ -12,8 +12,14 @@ pub use mem_peer_store::MemPeerStoreFactory; #[cfg(test)] pub(crate) use mem_peer_store::test_utils; -mod core_fetch; -pub use core_fetch::*; +pub mod mem_bootstrap; +pub use mem_bootstrap::MemBootstrapFactory; + +pub mod core_bootstrap; +pub use core_bootstrap::CoreBootstrapFactory; + +pub mod core_fetch; +pub use core_fetch::CoreFetchFactory; mod mem_transport; pub use mem_transport::*; diff --git a/crates/core/src/factories/core_bootstrap.rs b/crates/core/src/factories/core_bootstrap.rs new file mode 100644 index 00000000..50153605 --- /dev/null +++ b/crates/core/src/factories/core_bootstrap.rs @@ -0,0 +1,290 @@ +//! The core bootstrap implementation provided by Kitsune2. + +use kitsune2_api::{bootstrap::*, config::*, *}; +use std::sync::Arc; + +/// CoreBootstrap configuration types. +pub mod config { + /// Configuration parameters for [CoreBootstrapFactory](super::CoreBootstrapFactory). + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] + #[serde(rename_all = "camelCase")] + pub struct CoreBootstrapConfig { + /// The url of the kitsune2 bootstrap server. E.g. `https://boot.kitsu.ne`. + pub server_url: String, + + /// Minimum backoff in ms to use for both push and poll retry loops. + /// Default: 5 seconds. + pub backoff_min_ms: u32, + + /// Maximum backoff in ms to use for both push and poll retry loops. + /// Default: 5 minutes. + pub backoff_max_ms: u32, + } + + impl Default for CoreBootstrapConfig { + fn default() -> Self { + Self { + server_url: "".into(), + backoff_min_ms: 1000 * 5, + backoff_max_ms: 1000 * 60 * 5, + } + } + } + + impl CoreBootstrapConfig { + /// Get the minimum backoff duration. + pub fn backoff_min(&self) -> std::time::Duration { + std::time::Duration::from_millis(self.backoff_min_ms as u64) + } + + /// Get the maximum backoff duration. + pub fn backoff_max(&self) -> std::time::Duration { + std::time::Duration::from_millis(self.backoff_max_ms as u64) + } + } + + /// Module-level configuration for CoreBootstrap. + #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] + #[serde(rename_all = "camelCase")] + pub struct CoreBootstrapModConfig { + /// CoreBootstrap configuration. + pub core_bootstrap: CoreBootstrapConfig, + } +} + +use config::*; + +/// The core bootstrap implementation provided by Kitsune2. +#[derive(Debug)] +pub struct CoreBootstrapFactory {} + +impl CoreBootstrapFactory { + /// Construct a new CoreBootstrapFactory. + pub fn create() -> DynBootstrapFactory { + let out: DynBootstrapFactory = Arc::new(CoreBootstrapFactory {}); + out + } +} + +impl BootstrapFactory for CoreBootstrapFactory { + fn default_config(&self, config: &mut Config) -> K2Result<()> { + config.set_module_config(&CoreBootstrapModConfig::default()) + } + + fn create( + &self, + builder: Arc, + peer_store: peer_store::DynPeerStore, + space: SpaceId, + ) -> BoxFut<'static, K2Result> { + Box::pin(async move { + let config: CoreBootstrapModConfig = + builder.config.get_module_config()?; + let out: DynBootstrap = Arc::new(CoreBootstrap::new( + builder, + config.core_bootstrap, + peer_store, + space, + )); + Ok(out) + }) + } +} + +type PushSend = tokio::sync::mpsc::Sender>; +type PushRecv = tokio::sync::mpsc::Receiver>; + +#[derive(Debug)] +struct CoreBootstrap { + space: SpaceId, + push_send: PushSend, + push_task: tokio::task::JoinHandle<()>, + poll_task: tokio::task::JoinHandle<()>, +} + +impl Drop for CoreBootstrap { + fn drop(&mut self) { + self.push_task.abort(); + self.poll_task.abort(); + } +} + +impl CoreBootstrap { + pub fn new( + builder: Arc, + config: CoreBootstrapConfig, + peer_store: peer_store::DynPeerStore, + space: SpaceId, + ) -> Self { + let server_url: Arc = + config.server_url.clone().into_boxed_str().into(); + + let (push_send, push_recv) = tokio::sync::mpsc::channel(1024); + + let push_task = tokio::task::spawn(push_task( + config.clone(), + server_url.clone(), + push_send.clone(), + push_recv, + )); + + let poll_task = tokio::task::spawn(poll_task( + builder, + config, + server_url, + space.clone(), + peer_store, + )); + + Self { + space, + push_send, + push_task, + poll_task, + } + } +} + +impl Bootstrap for CoreBootstrap { + fn put(&self, info: Arc) { + // ignore puts outside our space. + if info.space != self.space { + tracing::error!( + ?info, + "Logic Error: Attempting to put an agent outside of this space" + ); + return; + } + + // if we can't push onto our large buffer channel... we've got problems + if let Err(err) = self.push_send.try_send(info) { + tracing::warn!(?err, "Bootstrap overloaded, dropping put"); + } + } +} + +async fn push_task( + config: CoreBootstrapConfig, + server_url: Arc, + push_send: PushSend, + mut push_recv: PushRecv, +) { + let mut wait = None; + + while let Some(info) = push_recv.recv().await { + let url = + format!("{server_url}/bootstrap/{}/{}", &info.space, &info.agent); + let enc = match info.encode() { + Err(err) => { + tracing::error!(?err, "Could not encode agent info, dropping"); + continue; + } + Ok(enc) => enc, + }; + match tokio::task::spawn_blocking(move || { + ureq::put(&url).send_string(&enc) + }) + .await + { + Ok(Ok(_)) => { + // the put was successful, we don't need to wait + // before sending the next info if it is ready + wait = None; + } + _ => { + let now = Timestamp::now(); + + // the put failed, send it back to try again if not expired + if info.expires_at > now { + let _ = push_send.try_send(info); + } + + // we need to configure a backoff so we don't hammer the server + match wait { + None => wait = Some(config.backoff_min()), + Some(p) => { + let mut p = p * 2; + if p > config.backoff_max() { + p = config.backoff_max(); + } + wait = Some(p); + } + } + + // wait for the backoff time + if let Some(wait) = &wait { + tokio::time::sleep(*wait).await; + } + } + } + } +} + +async fn poll_task( + builder: Arc, + config: CoreBootstrapConfig, + server_url: Arc, + space: SpaceId, + peer_store: peer_store::DynPeerStore, +) { + let mut wait = config.backoff_min(); + + loop { + let url = format!("{server_url}/bootstrap/{space}"); + match tokio::task::spawn_blocking(move || { + ureq::get(&url) + .call() + .map_err(K2Error::other)? + .into_string() + .map_err(K2Error::other) + }) + .await + .map_err(|_| K2Error::other("task join error")) + { + Err(err) | Ok(Err(err)) => { + tracing::debug!(?err, "failure contacting bootstrap server"); + } + Ok(Ok(data)) => { + match agent::AgentInfoSigned::decode_list( + &builder.verifier, + data.as_bytes(), + ) { + Err(err) => tracing::debug!( + ?err, + "failure decoding bootstrap server response" + ), + Ok(list) => { + // count decoding a success, and set the wait to max + wait = config.backoff_max(); + + let list = list + .into_iter() + .filter_map(|l| match l { + Ok(l) => Some(l), + Err(err) => { + tracing::debug!( + ?err, + "failure decoding bootstrap agent info" + ); + None + } + }) + .collect::>(); + + let _ = peer_store.insert(list).await; + } + } + } + } + + wait *= 2; + if wait > config.backoff_max() { + wait = config.backoff_max(); + } + + tokio::time::sleep(wait).await; + } +} + +#[cfg(test)] +mod test; diff --git a/crates/core/src/factories/core_bootstrap/test.rs b/crates/core/src/factories/core_bootstrap/test.rs new file mode 100644 index 00000000..34fd44e9 --- /dev/null +++ b/crates/core/src/factories/core_bootstrap/test.rs @@ -0,0 +1,286 @@ +use kitsune2_api::{id::*, *}; +use std::sync::{Arc, Mutex}; + +#[derive(Debug)] +struct TestCrypto; + +impl agent::Signer for TestCrypto { + fn sign( + &self, + agent_info: &agent::AgentInfo, + encoded: &[u8], + ) -> BoxFut<'_, K2Result> { + use ed25519_dalek::*; + + let s1: AgentId = serde_json::from_str(&format!( + "\"{}\"", + agent_info.url.as_ref().unwrap().peer_id().unwrap() + )) + .unwrap(); + + let mut s2 = [0_u8; 32]; + s2.copy_from_slice(&s1[..]); + let s3 = SigningKey::from_bytes(&s2); + let sig = s3.sign(encoded); + let sig = bytes::Bytes::copy_from_slice(&sig.to_bytes()); + Box::pin(async move { Ok(sig) }) + } +} + +impl agent::Verifier for TestCrypto { + fn verify( + &self, + agent_info: &agent::AgentInfo, + message: &[u8], + signature: &[u8], + ) -> bool { + crate::Ed25519Verifier.verify(agent_info, message, signature) + } +} + +const S1: SpaceId = SpaceId(Id(bytes::Bytes::from_static(b"space-1"))); + +struct Test { + peer_store: peer_store::DynPeerStore, + boot: bootstrap::DynBootstrap, +} + +impl Test { + pub async fn new(server: &str) -> Self { + let builder = builder::Builder { + verifier: Arc::new(TestCrypto), + bootstrap: super::CoreBootstrapFactory::create(), + ..crate::default_builder() + } + .with_default_config() + .unwrap(); + builder + .config + .set_module_config(&super::CoreBootstrapModConfig { + core_bootstrap: super::CoreBootstrapConfig { + server_url: server.into(), + backoff_min_ms: 10, + backoff_max_ms: 10, + }, + }) + .unwrap(); + let builder = Arc::new(builder); + println!("{}", serde_json::to_string(&builder.config).unwrap()); + + let peer_store = + builder.peer_store.create(builder.clone()).await.unwrap(); + + let boot = builder + .bootstrap + .create(builder.clone(), peer_store.clone(), S1.clone()) + .await + .unwrap(); + + Self { peer_store, boot } + } + + pub async fn push_agent(&self) -> AgentId { + let secret = + ed25519_dalek::SigningKey::generate(&mut rand::thread_rng()); + let pubkey = secret.verifying_key(); + + let agent = + AgentId::from(bytes::Bytes::copy_from_slice(pubkey.as_bytes())); + + let secret = + AgentId::from(bytes::Bytes::copy_from_slice(secret.as_bytes())); + + let url = + Some(Url::from_str(format!("ws://test.com:80/{secret}")).unwrap()); + let storage_arc = DhtArc::Arc(42, u32::MAX / 13); + + let info = agent::AgentInfoSigned::sign( + &TestCrypto, + agent::AgentInfo { + agent: agent.clone(), + space: S1.clone(), + created_at: Timestamp::now(), + expires_at: Timestamp::now() + + std::time::Duration::from_secs(60 * 20), + is_tombstone: false, + url, + storage_arc, + }, + ) + .await + .unwrap(); + + self.boot.put(info); + + agent + } + + pub async fn check_agent(&self, agent: AgentId) -> K2Result<()> { + self.peer_store.get(agent.clone()).await.map(|a| { + a.ok_or_else(|| { + let err = K2Error::other(format!("{agent} not found")); + println!("{err}"); + err + }) + .map(|a| { + println!("GOT AGENT: {a:?}"); + }) + })? + } +} + +pub struct Srv { + kill: Option>, + task: tokio::task::JoinHandle>, + halt: Arc, + addr: String, +} + +impl Drop for Srv { + fn drop(&mut self) { + if let Some(kill) = self.kill.take() { + let _ = kill.send(()); + } + self.task.abort(); + } +} + +impl Srv { + pub async fn new() -> Self { + use axum::*; + use std::sync::atomic::*; + + let (kill, kill_r) = tokio::sync::oneshot::channel(); + let kill = Some(kill); + let kill_r = async move { + let _ = kill_r.await; + }; + + let l = tokio::net::TcpListener::bind(std::net::SocketAddr::from(( + [127, 0, 0, 1], + 0, + ))) + .await + .unwrap(); + let addr = format!("http://{:?}", l.local_addr().unwrap()); + + let halt = Arc::new(std::sync::atomic::AtomicBool::new(true)); + + #[derive(Clone)] + struct State { + halt: Arc, + data: Arc>>, + } + + let get_state = State { + halt: halt.clone(), + data: Arc::new(Mutex::new(Vec::new())), + }; + let put_state = get_state.clone(); + + let app: Router = Router::new() + .route( + "/bootstrap/:space", + routing::get(move || async move { + if get_state.halt.load(Ordering::SeqCst) { + return Err( + http::status::StatusCode::INTERNAL_SERVER_ERROR, + ); + } + let mut out = "[".to_string(); + let mut is_first = true; + for d in get_state.data.lock().unwrap().iter() { + if is_first { + is_first = false; + } else { + out.push(','); + } + out.push_str(d); + } + out.push(']'); + Ok(out) + }), + ) + .route( + "/bootstrap/:space/:agent", + routing::put(move |body: String| async move { + if put_state.halt.load(Ordering::SeqCst) { + return Err( + http::status::StatusCode::INTERNAL_SERVER_ERROR, + ); + } + put_state.data.lock().unwrap().push(body); + Ok("{}".to_string()) + }), + ); + + let task = tokio::task::spawn(std::future::IntoFuture::into_future( + serve(l, app).with_graceful_shutdown(kill_r), + )); + + Self { + kill, + task, + halt, + addr, + } + } + + pub fn set_halt(&self, halt: bool) { + self.halt.store(halt, std::sync::atomic::Ordering::SeqCst); + } + + pub fn addr(&self) -> &str { + &self.addr + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn bootstrap_delayed_online() { + // this custom server will reject all requests with 500 errors + // until we call set_halt(false) on it. + let srv = Srv::new().await; + + println!("addr: {}", srv.addr()); + + let t1 = Test::new(srv.addr()).await; + let t2 = Test::new(srv.addr()).await; + + let a1 = t1.push_agent().await; + let a2 = t2.push_agent().await; + + // we should NOT get the infos yet, the server is erroring + + for _ in 0..5 { + println!("checking..."); + if t1.check_agent(a2.clone()).await.is_ok() + && t2.check_agent(a1.clone()).await.is_ok() + { + println!("found too soon!"); + panic!("the server is halting!! how did we get the data?!?!?!"); + } + println!("not found - yay, this is what we want here."); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + // set_halt(false). now the push retry loop will successfully push agents + // now the poll retry loop will successfully pull agents + + srv.set_halt(false); + + for _ in 0..5 { + println!("checking..."); + if t1.check_agent(a2.clone()).await.is_ok() + && t2.check_agent(a1.clone()).await.is_ok() + { + println!("found!"); + return; + } + println!("not found :("); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + panic!("failed to bootstrap both created agents in time"); +} diff --git a/crates/core/src/factories/core_fetch.rs b/crates/core/src/factories/core_fetch.rs index b1c762dc..f295f46e 100644 --- a/crates/core/src/factories/core_fetch.rs +++ b/crates/core/src/factories/core_fetch.rs @@ -10,9 +10,9 @@ //! - persisting ops to the data store //! - removing op ids from in-memory data object //! -//! ### State object [CoreFetch] +//! ### State object CoreFetch //! -//! - Exposes public method [CoreFetch::add_ops] that takes a list of op ids and an agent id. +//! - Exposes public method CoreFetch::add_ops that takes a list of op ids and an agent id. //! - Stores pairs of ([OpId][AgentId]) in a set. //! - A hash set is used to look up elements by key efficiently. Ops may be added redundantly //! to the set with different sources to fetch from, so the set is keyed by op and agent id together. diff --git a/crates/core/src/factories/mem_bootstrap.rs b/crates/core/src/factories/mem_bootstrap.rs new file mode 100644 index 00000000..f9288d59 --- /dev/null +++ b/crates/core/src/factories/mem_bootstrap.rs @@ -0,0 +1,158 @@ +//! The mem bootstrap implementation provided by Kitsune2. + +use kitsune2_api::{bootstrap::*, config::*, *}; +use std::sync::{Arc, Mutex}; + +/// MemBootstrap configuration types. +pub mod config { + /// Configuration parameters for [MemBootstrapFactory](super::MemBootstrapFactory). + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] + #[serde(rename_all = "camelCase")] + pub struct MemBootstrapConfig { + /// Since rust test runs multiple tests in the same process, + /// we cannot just have a single global bootstrap test store. + /// This defaults to the current thread id when this config instance + /// is constructed. This should be sufficient for most needs. + /// However, if you are creating kitsune nodes in tests from + /// different tasks, you may need to pick an explicit id for this value. + pub test_id: String, + + /// How often in ms to update the peer store with bootstrap infos. + /// Defaults to 5s. + pub poll_freq_ms: u32, + } + + impl Default for MemBootstrapConfig { + fn default() -> Self { + Self { + test_id: format!("{:?}", std::thread::current().id()), + poll_freq_ms: 5000, + } + } + } + + /// Module-level configuration for MemBootstrap. + #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] + #[serde(rename_all = "camelCase")] + pub struct MemBootstrapModConfig { + /// MemBootstrap configuration. + pub mem_bootstrap: MemBootstrapConfig, + } +} + +use config::*; + +/// The mem bootstrap implementation provided by Kitsune2. +#[derive(Debug)] +pub struct MemBootstrapFactory {} + +impl MemBootstrapFactory { + /// Construct a new MemBootstrapFactory. + pub fn create() -> DynBootstrapFactory { + let out: DynBootstrapFactory = Arc::new(MemBootstrapFactory {}); + out + } + + /// Testing hook to trigger an immediate bootstrap pull of all + /// polling tasks that are currently registered. + pub fn trigger_immediate_poll() { + NOTIFY.notify_waiters(); + } +} + +impl BootstrapFactory for MemBootstrapFactory { + fn default_config(&self, config: &mut Config) -> K2Result<()> { + config.set_module_config(&MemBootstrapModConfig::default()) + } + + fn create( + &self, + builder: Arc, + peer_store: peer_store::DynPeerStore, + _space: SpaceId, + ) -> BoxFut<'static, K2Result> { + Box::pin(async move { + let config: MemBootstrapModConfig = + builder.config.get_module_config()?; + let out: DynBootstrap = + Arc::new(MemBootstrap::new(config.mem_bootstrap, peer_store)); + Ok(out) + }) + } +} + +#[derive(Debug)] +struct MemBootstrap { + test_id: Arc, + task: tokio::task::JoinHandle<()>, +} + +impl Drop for MemBootstrap { + fn drop(&mut self) { + self.task.abort(); + } +} + +impl MemBootstrap { + pub fn new( + config: MemBootstrapConfig, + peer_store: peer_store::DynPeerStore, + ) -> Self { + let test_id: Arc = config.test_id.into_boxed_str().into(); + let test_id2 = test_id.clone(); + let poll_freq = + std::time::Duration::from_millis(config.poll_freq_ms as u64); + let task = tokio::task::spawn(async move { + loop { + let info_list = stat_process(test_id2.clone(), None); + peer_store.insert(info_list).await.unwrap(); + tokio::select! { + _ = tokio::time::sleep(poll_freq) => (), + _ = NOTIFY.notified() => (), + } + } + }); + Self { test_id, task } + } +} + +impl Bootstrap for MemBootstrap { + fn put(&self, info: Arc) { + let _ = stat_process(self.test_id.clone(), Some(info)); + } +} + +static NOTIFY: tokio::sync::Notify = tokio::sync::Notify::const_new(); + +type Store = Vec>; +type Map = std::collections::HashMap, Store>; +static STAT: std::sync::OnceLock> = std::sync::OnceLock::new(); +fn stat_process( + id: Arc, + info: Option>, +) -> Vec> { + let mut lock = STAT.get_or_init(Default::default).lock().unwrap(); + let store = lock.entry(id).or_default(); + let now = Timestamp::now(); + store.retain(|a| { + if let Some(info) = info.as_ref() { + if a.agent == info.agent { + return false; + } + } + if a.expires_at <= now { + return false; + } + true + }); + if let Some(info) = info { + while store.len() > 31 { + store.remove(16); + } + store.push(info); + } + store.clone() +} + +#[cfg(test)] +mod test; diff --git a/crates/core/src/factories/mem_bootstrap/test.rs b/crates/core/src/factories/mem_bootstrap/test.rs new file mode 100644 index 00000000..1ab79dce --- /dev/null +++ b/crates/core/src/factories/mem_bootstrap/test.rs @@ -0,0 +1,131 @@ +use kitsune2_api::{id::*, *}; +use std::sync::Arc; + +#[derive(Debug)] +struct TestCrypto; + +const SIG: bytes::Bytes = bytes::Bytes::from_static(b"TEST-SIGNATURE"); + +impl agent::Signer for TestCrypto { + fn sign( + &self, + _agent_info: &agent::AgentInfo, + _encoded: &[u8], + ) -> BoxFut<'_, K2Result> { + Box::pin(async move { Ok(SIG.clone()) }) + } +} + +impl agent::Verifier for TestCrypto { + fn verify( + &self, + _agent_info: &agent::AgentInfo, + _message: &[u8], + signature: &[u8], + ) -> bool { + signature == &SIG[..] + } +} + +const S1: SpaceId = SpaceId(Id(bytes::Bytes::from_static(b"space-1"))); + +struct Test { + peer_store: peer_store::DynPeerStore, + boot: bootstrap::DynBootstrap, +} + +impl Test { + pub async fn new() -> Self { + let builder = Arc::new( + builder::Builder { + verifier: Arc::new(TestCrypto), + ..crate::default_builder() + } + .with_default_config() + .unwrap(), + ); + println!("{}", serde_json::to_string(&builder.config).unwrap()); + + let peer_store = + builder.peer_store.create(builder.clone()).await.unwrap(); + + let boot = builder + .bootstrap + .create(builder.clone(), peer_store.clone(), S1.clone()) + .await + .unwrap(); + + Self { peer_store, boot } + } + + pub async fn push_agent(&self) -> AgentId { + use std::sync::atomic::*; + + static NXT: AtomicU64 = AtomicU64::new(1); + let nxt = NXT.fetch_add(1, Ordering::Relaxed); + let agent = + AgentId::from(bytes::Bytes::copy_from_slice(&nxt.to_le_bytes())); + + let url = None; + let storage_arc = DhtArc::Arc(42, u32::MAX / 13); + + let info = agent::AgentInfoSigned::sign( + &TestCrypto, + agent::AgentInfo { + agent: agent.clone(), + space: S1.clone(), + created_at: Timestamp::now(), + expires_at: Timestamp::now() + + std::time::Duration::from_secs(60 * 20), + is_tombstone: false, + url, + storage_arc, + }, + ) + .await + .unwrap(); + + self.boot.put(info); + + agent + } + + pub async fn check_agent(&self, agent: AgentId) -> K2Result<()> { + self.peer_store.get(agent.clone()).await.map(|a| { + a.ok_or_else(|| { + let err = K2Error::other(format!("{agent} not found")); + println!("{err}"); + err + }) + .map(|a| { + println!("GOT AGENT: {a:?}"); + }) + })? + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn mem_bootstrap_sanity() { + let t1 = Test::new().await; + let t2 = Test::new().await; + + let a1 = t1.push_agent().await; + let a2 = t2.push_agent().await; + + for _ in 0..5 { + super::MemBootstrapFactory::trigger_immediate_poll(); + + println!("checking..."); + if t1.check_agent(a2.clone()).await.is_ok() + && t2.check_agent(a1.clone()).await.is_ok() + { + println!("found!"); + return; + } + println!("not found :("); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + panic!("failed to bootstrap both created agents in time"); +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index b532d019..979e5724 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -44,6 +44,7 @@ impl agent::Verifier for Ed25519Verifier { /// [factories::CoreKitsuneFactory]. /// - `space` - The default space module is [factories::CoreSpaceFactory]. /// - `peer_store` - The default peer store is [factories::MemPeerStoreFactory]. +/// - `bootstrap` - The default bootstrap is [factories::MemBootstrapFactory]. /// - `fetch` - The default fetch module is [factories::CoreFetchFactory]. /// - `transport` - The default transport is [factories::MemTransportFactory]. pub fn default_builder() -> Builder { @@ -53,6 +54,7 @@ pub fn default_builder() -> Builder { kitsune: factories::CoreKitsuneFactory::create(), space: factories::CoreSpaceFactory::create(), peer_store: factories::MemPeerStoreFactory::create(), + bootstrap: factories::MemBootstrapFactory::create(), fetch: factories::CoreFetchFactory::create(), transport: factories::MemTransportFactory::create(), } diff --git a/crates/test_utils/src/lib.rs b/crates/test_utils/src/lib.rs index c362c82f..ea0537f6 100644 --- a/crates/test_utils/src/lib.rs +++ b/crates/test_utils/src/lib.rs @@ -1,3 +1,6 @@ +#![deny(missing_docs)] +//! Shared testing types for kitsune2 crates. + /// Enable tracing with the RUST_LOG environment variable. /// /// This is intended to be used in tests, so it defaults to DEBUG level.