diff --git a/Cargo.toml b/Cargo.toml index 0e32704..95b4453 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,19 +64,19 @@ config = "0.11" chrono = "0.4" # required for Polkadot substrate, Aleph Zero blockchain integration -#subxt = { version = "0.37.0", optional = true } -#subxt-signer = { version = "0.37.0", optional = true } +subxt = { version = "0.37.0", optional = true } +subxt-signer = { version = "0.37.0", optional = true } # Substrate specific -#frame-support = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true } -#frame-system = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true } -#pallet-balances = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true } -#pallet-identity = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true } -#pallet-staking = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true } +frame-support = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true } +frame-system = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true } +pallet-balances = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true } +pallet-identity = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true } +pallet-staking = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true } -#sp-core = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true} -#sp-keyring = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true} -#sp-runtime = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true} -#sp-weights = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true} +sp-core = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true} +sp-keyring = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true} +sp-runtime = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true} +sp-weights = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true} sha2 = "0.10" parking_lot = "0.12" @@ -113,10 +113,22 @@ http-body-util = "0.1" [features] strict = [] #traceability = ["tracing-subscriber", "tokio/rt-multi-thread", "intaglio"] -default = ["merkle_audit","rocksdb_store","compressed_store"] +#default = ["merkle_audit","rocksdb_store","compressed_store"] compressed_store = [] merkle_audit = [] -az_audit = [] +az_audit = [ + "subxt", + "subxt-signer", + "frame-support", + "frame-system", + "pallet-balances", + "pallet-identity", + "pallet-staking", + "sp-core", + "sp-keyring", + "sp-runtime", + "sp-weights" +] redis_store = [] rocksdb_store = [] metrics = ["prometheus", "hyper"] diff --git a/config/settings.toml b/config/settings.toml index 7b925ac..369b0f3 100644 --- a/config/settings.toml +++ b/config/settings.toml @@ -34,7 +34,8 @@ module_filter = [ "graymamba=info", "graymamba::kernel::vfs::api=debug", #"data_room=debug", - "graymamba::backingstore::rocksdb_data_store=debug" + "graymamba::backingstore::rocksdb_data_store=debug", + "graymamba::audit_adapters::substrate_based_audit=debug" ] [storage] diff --git a/src/audit_adapters/mod.rs b/src/audit_adapters/mod.rs index c71a17a..d88d00e 100644 --- a/src/audit_adapters/mod.rs +++ b/src/audit_adapters/mod.rs @@ -1,7 +1,7 @@ pub mod audit_system; pub mod merkle_audit; pub mod merkle_tree; -//pub mod substrate_based_audit; +pub mod substrate_based_audit; pub mod poseidon_hash; pub mod snark_proof; pub mod irrefutable_audit; \ No newline at end of file diff --git a/src/audit_adapters/substrate_based_audit.rs b/src/audit_adapters/substrate_based_audit.rs index 6b59c52..a96eff0 100644 --- a/src/audit_adapters/substrate_based_audit.rs +++ b/src/audit_adapters/substrate_based_audit.rs @@ -1,29 +1,28 @@ use crate::audit_adapters::irrefutable_audit::{IrrefutableAudit, AuditEvent, AuditError}; use async_trait::async_trait; use std::error::Error; - -use std::sync::mpsc::{self, Receiver, Sender}; -use std::thread; +use std::sync::Arc; +use tokio::sync::mpsc::{self as tokio_mpsc}; use config::{Config, File}; use subxt::backend::legacy::LegacyRpcMethods; use subxt::backend::rpc::RpcClient; -//use subxt::utils::AccountId32; use subxt::OnlineClient; use subxt::PolkadotConfig; use subxt_signer::sr25519::dev; use subxt_signer::sr25519::Keypair; -use tokio::runtime::Runtime; +use subxt::ext::codec::Decode; #[subxt::subxt(runtime_metadata_path = "metadata.scale")] pub mod pallet_template {} +use tracing::debug; + +#[derive(Clone)] pub struct SubstrateBasedAudit { api: OnlineClient, - //account_id: AccountId32, signer: Keypair, - tx_sender: Sender, - //rpc: LegacyRpcMethods, + tx_sender: tokio_mpsc::Sender, } @@ -67,12 +66,13 @@ impl IrrefutableAudit for SubstrateBasedAudit { ))?; let _rpc = LegacyRpcMethods::::new(rpc_client); - println!("āœ… Connection with Substrate Node established."); + println!("āœ… Connection with Substrate/AZ Node established."); //let account_id: AccountId32 = dev::alice().public_key().into(); let signer = dev::alice(); + println!("šŸ”‘ Using account Alice: {}", hex::encode(signer.public_key().0)); - let (tx_sender, rx) = mpsc::channel(); + let (tx_sender, rx) = tokio_mpsc::channel(100); let audit = SubstrateBasedAudit { api, @@ -80,37 +80,54 @@ impl IrrefutableAudit for SubstrateBasedAudit { tx_sender, }; + // Verify metadata + audit.verify_metadata().await?; + // Spawn the event handler - Self::spawn_event_handler(rx)?; + Self::spawn_event_handler(Arc::new(audit.clone()), rx)?; Ok(audit) } - fn get_sender(&self) -> &Sender { + fn get_sender(&self) -> &tokio_mpsc::Sender { &self.tx_sender } - fn spawn_event_handler(receiver: Receiver) -> Result<(), Box> { - thread::spawn(move || { - let rt = Runtime::new().unwrap(); - rt.block_on(async move { - while let Ok(event) = receiver.recv() { - // Just process the event - no new connection needed - debug!("Processing event: {:?}", event); - // TODO: Add actual event processing logic here + fn spawn_event_handler( + audit: Arc, + mut receiver: tokio_mpsc::Receiver + ) -> Result<(), Box> { + tokio::spawn(async move { + while let Some(event) = receiver.recv().await { + debug!("Processing event: {:?}", event); + + if let Err(e) = audit.process_event(event).await { + eprintln!("Error processing event: {}", e); } - }); + } }); Ok(()) } async fn process_event(&self, event: AuditEvent) -> Result<(), Box> { - debug("Processing event: {:?}", event); + let event_type_bytes = event.event_type.as_bytes().to_vec(); + let creation_time = event.creation_time.to_string().as_bytes().to_vec(); + let file_path = event.file_path.as_bytes().to_vec(); + let event_key = event.event_key.as_bytes().to_vec(); - let event_type_bytes = event.event_type.clone().into_bytes(); - let creation_time = event.creation_time.into_bytes(); - let file_path = event.file_path.into_bytes(); - let event_key = event.event_key.into_bytes(); + // Validate input lengths before sending + self.validate_input_lengths( + &event_type_bytes, + &creation_time, + &file_path, + &event_key, + )?; + + debug!("šŸ“¤ Sending event to blockchain:"); + debug!(" Type: {} ({} bytes)", event.event_type, event_type_bytes.len()); + debug!(" Time: {} ({} bytes)", event.creation_time, creation_time.len()); + debug!(" Path: {} ({} bytes)", event.file_path, file_path.len()); + debug!(" Key: {} ({} bytes)", event.event_key, event_key.len()); match event.event_type.as_str() { "disassembled" => { @@ -118,24 +135,110 @@ impl IrrefutableAudit for SubstrateBasedAudit { .template_module() .disassembled(event_type_bytes, creation_time, file_path, event_key); - self.api.tx() + let tx_hash = self.api.tx() .sign_and_submit_default(&tx, &self.signer) .await .map_err(|e| Box::new(AuditError::TransactionError(e.to_string())))?; - println!("Disassembled event processed."); + debug!("šŸ”— Transaction submitted to blockchain"); + debug!("šŸ“ Transaction hash: {}", tx_hash); + debug!("šŸ‘¤ Sender: {}", hex::encode(self.signer.public_key().0)); + + // Subscribe to events using the events subscription + let mut sub = self.api.blocks().subscribe_finalized().await?; + + while let Some(block) = sub.next().await { + let block = block?; + debug!("šŸ” Block #{}", block.header().number); + + // Get events for this block + if let Ok(events) = block.events().await { + for event in events.iter() { + if let Ok(event) = event { + if event.pallet_name() == "TemplateModule" { + debug!(" āœØ Found pallet event!"); + debug!(" ā€¢ Name: {}", event.variant_name()); + debug!(" ā€¢ Phase: {:?}", event.phase()); + + return Ok(()); // Exit after finding our event + } + } + } + } + } + + debug!("Disassembled event processed."); } "reassembled" => { let tx = pallet_template::tx() .template_module() .reassembled(event_type_bytes, creation_time, file_path, event_key); - self.api.tx() + let tx_hash = self.api.tx() .sign_and_submit_default(&tx, &self.signer) .await .map_err(|e| Box::new(AuditError::TransactionError(e.to_string())))?; - println!("Reassembled event processed."); + debug!("šŸ”— Transaction submitted to blockchain"); + debug!("šŸ“ Transaction hash: {}", tx_hash); + debug!("šŸ‘¤ Sender: {}", hex::encode(self.signer.public_key().0)); + + // Subscribe to events using the events subscription + let mut sub = self.api.blocks().subscribe_finalized().await?; + + while let Some(block) = sub.next().await { + let block = block?; + debug!("šŸ” Block #{}", block.header().number); + + // Get events for this block + if let Ok(events) = block.events().await { + for event in events.iter() { + if let Ok(event) = event { + if event.pallet_name() == "TemplateModule" { + debug!(" āœØ Found pallet event!"); + debug!(" ā€¢ Name: {}", event.variant_name()); + debug!(" ā€¢ Phase: {:?}", event.phase()); + + if let Ok(fields) = event.field_values() { + let fields_str = format!("{:?}", fields); + + // The structure is Named([...]) + if let Some(content) = fields_str.strip_prefix("Named([") { + if let Some(inner) = content.strip_suffix("])") { + // Split on "), (" to get each field + let fields: Vec<&str> = inner.split("), (").collect(); + + for field in fields { + // Extract field name + if let Some(name_end) = field.find("\", ") { + let name = field + .trim_start_matches('(') + .trim_start_matches('"') + .split('"') + .next() + .unwrap_or(""); + debug!(" ā€¢ Field: {}", name); + + // If this is the event field, parse its inner structure + if name == "event" { + if let Some(event_start) = field.find("Named([") { + let event_content = &field[event_start..]; + //debug!(" ā€¢ Event content: {}", event_content); + } + } + } + } + } + } + } + return Ok(()); // Exit after finding our event + } + } + } + } + } + + debug!("Reassembled event processed."); } _ => return Err(Box::new(AuditError::EventProcessingError( "Unknown event type".to_string() @@ -144,13 +247,76 @@ impl IrrefutableAudit for SubstrateBasedAudit { Ok(()) } + fn shutdown(&self) -> Result<(), Box> { - // Drop sender to signal handler thread to stop - // Additional cleanup if needed + // Close the channel by dropping the sender + let _ = self.tx_sender.send(AuditEvent { + event_type: "shutdown".to_string(), + creation_time: "".to_string(), + file_path: "".to_string(), + event_key: "".to_string(), + }); + + // Give a short grace period for pending events to complete + tokio::time::sleep(tokio::time::Duration::from_secs(1)); + + println!("Substrate/AZ audit system shutdown complete"); Ok(()) } } +impl SubstrateBasedAudit { + fn validate_input_lengths( + &self, + event_type: &[u8], + creation_time: &[u8], + file_path: &[u8], + event_key: &[u8], + ) -> Result<(), Box> { + if event_type.len() > 64 { + return Err("Event type exceeds 64 bytes".into()); + } + if creation_time.len() > 64 { + return Err("Creation time exceeds 64 bytes".into()); + } + if file_path.len() > 256 { + return Err("File path exceeds 256 bytes".into()); + } + if event_key.len() > 128 { + return Err("Event key exceeds 128 bytes".into()); + } + Ok(()) + } + + async fn verify_metadata(&self) -> Result<(), Box> { + debug!("\nšŸ” Verifying Metadata:"); + + // Get metadata from the API + let metadata = self.api.metadata(); + + // Look for our pallet + if let Some(pallet) = metadata.pallet_by_name("TemplateModule") { + debug!("āœ… Found pallet: {}", pallet.name()); + + // Check calls + debug!("\nCalls:"); + for call in pallet.call_variants().unwrap_or_default() { + debug!(" ā€¢ {}", call.name); + } + + // Check events + debug!("\nEvents:"); + for event in pallet.event_variants().unwrap_or_default() { + debug!(" ā€¢ {}", event.name); + } + + Ok(()) + } else { + Err("Pallet 'TemplateModule' not found in metadata!".into()) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/bin/graymamba/main.rs b/src/bin/graymamba/main.rs index a199498..b5823b4 100644 --- a/src/bin/graymamba/main.rs +++ b/src/bin/graymamba/main.rs @@ -7,7 +7,7 @@ use graymamba::audit_adapters::irrefutable_audit::IrrefutableAudit; use graymamba::audit_adapters::merkle_audit::MerkleBasedAuditSystem; #[cfg(feature = "az_audit")] -use graymamba::audit_adapters::substrate_audit::SubstrateAuditSystem; +use graymamba::audit_adapters::substrate_based_audit::SubstrateBasedAudit; use config::{Config, File as ConfigFile}; @@ -174,7 +174,7 @@ async fn main() -> Result<(), Box> { #[cfg(feature = "az_audit")] { - match SubstrateAuditSystem::new().await { + match SubstrateBasedAudit::new().await { Ok(audit) => { println!("āœ… Aleph Zero audit initialization successful"); Arc::new(audit)