
Commit

Merge pull request #39 from gmawdo/fix-AZ-issues-caused-by-refactor-in-nov24

Fix AZ issues caused by refactor in Nov 2024
gmawdo authored Jan 8, 2025
2 parents 8716287 + 9b1ba66 commit 3df21c6
Showing 5 changed files with 228 additions and 49 deletions.
38 changes: 25 additions & 13 deletions Cargo.toml
@@ -64,19 +64,19 @@ config = "0.11"
chrono = "0.4"

# required for Polkadot substrate, Aleph Zero blockchain integration
#subxt = { version = "0.37.0", optional = true }
#subxt-signer = { version = "0.37.0", optional = true }
subxt = { version = "0.37.0", optional = true }
subxt-signer = { version = "0.37.0", optional = true }
# Substrate specific
#frame-support = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true }
#frame-system = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true }
#pallet-balances = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true }
#pallet-identity = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true }
#pallet-staking = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true }
frame-support = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true }
frame-system = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true }
pallet-balances = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true }
pallet-identity = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true }
pallet-staking = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master", optional = true }

#sp-core = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true}
#sp-keyring = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true}
#sp-runtime = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true}
#sp-weights = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true}
sp-core = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true}
sp-keyring = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true}
sp-runtime = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true}
sp-weights = { git = "https://github.com/paritytech/polkadot-sdk.git", branch = "master" , optional = true}

sha2 = "0.10"
parking_lot = "0.12"
@@ -113,10 +113,22 @@ http-body-util = "0.1"
[features]
strict = []
#traceability = ["tracing-subscriber", "tokio/rt-multi-thread", "intaglio"]
default = ["merkle_audit","rocksdb_store","compressed_store"]
#default = ["merkle_audit","rocksdb_store","compressed_store"]
compressed_store = []
merkle_audit = []
az_audit = []
az_audit = [
"subxt",
"subxt-signer",
"frame-support",
"frame-system",
"pallet-balances",
"pallet-identity",
"pallet-staking",
"sp-core",
"sp-keyring",
"sp-runtime",
"sp-weights"
]
redis_store = []
rocksdb_store = []
metrics = ["prometheus", "hyper"]
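The `az_audit` feature now lists every optional chain dependency it needs (subxt, subxt-signer, the frame/pallet crates and the sp-* crates), so building with `--features az_audit` pulls in the Substrate stack while the default `merkle_audit`/`rocksdb_store` build leaves it out. A minimal, self-contained sketch of how code elsewhere in the crate could branch on these feature names — the function and strings below are illustrative, not taken from the repository:

```rust
// Illustrative only: branching on the Cargo features declared above.
// cfg!(feature = "...") is resolved at compile time from the enabled features.
fn audit_backend_name() -> &'static str {
    if cfg!(feature = "az_audit") {
        "substrate/aleph-zero audit"
    } else if cfg!(feature = "merkle_audit") {
        "merkle audit"
    } else {
        "no audit backend"
    }
}

fn main() {
    println!("configured backend: {}", audit_backend_name());
}
```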
3 changes: 2 additions & 1 deletion config/settings.toml
@@ -34,7 +34,8 @@ module_filter = [
"graymamba=info",
"graymamba::kernel::vfs::api=debug",
#"data_room=debug",
"graymamba::backingstore::rocksdb_data_store=debug"
"graymamba::backingstore::rocksdb_data_store=debug",
"graymamba::audit_adapters::substrate_based_audit=debug"
]

[storage]
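The added module_filter entry enables debug-level tracing for the re-enabled substrate_based_audit module. A sketch of how such directives might be fed to tracing-subscriber's EnvFilter — only the directive strings come from settings.toml; the wiring below is an assumption, not graymamba's actual startup code:

```rust
// Sketch, assuming the module_filter entries are joined into an EnvFilter.
// Requires the tracing-subscriber crate with its "env-filter" feature.
use tracing_subscriber::EnvFilter;

fn init_tracing(module_filter: &[&str]) {
    // e.g. "graymamba=info,...,graymamba::audit_adapters::substrate_based_audit=debug"
    let directives = module_filter.join(",");
    let filter = EnvFilter::try_new(&directives)
        .unwrap_or_else(|_| EnvFilter::new("info"));
    tracing_subscriber::fmt().with_env_filter(filter).init();
}

fn main() {
    init_tracing(&[
        "graymamba=info",
        "graymamba::kernel::vfs::api=debug",
        "graymamba::backingstore::rocksdb_data_store=debug",
        "graymamba::audit_adapters::substrate_based_audit=debug",
    ]);
}
```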
2 changes: 1 addition & 1 deletion src/audit_adapters/mod.rs
@@ -1,7 +1,7 @@
pub mod audit_system;
pub mod merkle_audit;
pub mod merkle_tree;
//pub mod substrate_based_audit;
pub mod substrate_based_audit;
pub mod poseidon_hash;
pub mod snark_proof;
pub mod irrefutable_audit;
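Re-exporting substrate_based_audit plugs the adapter back into the IrrefutableAudit plumbing from irrefutable_audit. The trait itself is not shown in this diff; the sketch below reconstructs its surface from how the impl further down uses it (method names match the diff, but the exact signatures, bounds and AuditEvent field types are assumptions):

```rust
// Reconstructed sketch of the trait the adapter implements; signatures
// and the AuditEvent shape are inferred from substrate_based_audit.rs.
use async_trait::async_trait;
use std::error::Error;
use std::sync::Arc;
use tokio::sync::mpsc;

#[derive(Debug, Clone)]
pub struct AuditEvent {
    pub event_type: String,
    pub creation_time: String,
    pub file_path: String,
    pub event_key: String,
}

#[async_trait]
pub trait IrrefutableAudit: Send + Sync {
    async fn new() -> Result<Self, Box<dyn Error>>
    where
        Self: Sized;
    fn get_sender(&self) -> &mpsc::Sender<AuditEvent>;
    fn spawn_event_handler(
        audit: Arc<dyn IrrefutableAudit>,
        receiver: mpsc::Receiver<AuditEvent>,
    ) -> Result<(), Box<dyn Error>>
    where
        Self: Sized;
    async fn process_event(&self, event: AuditEvent) -> Result<(), Box<dyn Error>>;
    fn shutdown(&self) -> Result<(), Box<dyn Error>>;
}
```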
230 changes: 198 additions & 32 deletions src/audit_adapters/substrate_based_audit.rs
@@ -1,29 +1,28 @@
use crate::audit_adapters::irrefutable_audit::{IrrefutableAudit, AuditEvent, AuditError};
use async_trait::async_trait;
use std::error::Error;

use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;
use std::sync::Arc;
use tokio::sync::mpsc::{self as tokio_mpsc};

use config::{Config, File};
use subxt::backend::legacy::LegacyRpcMethods;
use subxt::backend::rpc::RpcClient;
//use subxt::utils::AccountId32;
use subxt::OnlineClient;
use subxt::PolkadotConfig;
use subxt_signer::sr25519::dev;
use subxt_signer::sr25519::Keypair;
use tokio::runtime::Runtime;
use subxt::ext::codec::Decode;

#[subxt::subxt(runtime_metadata_path = "metadata.scale")]
pub mod pallet_template {}

use tracing::debug;

#[derive(Clone)]
pub struct SubstrateBasedAudit {
api: OnlineClient<PolkadotConfig>,
//account_id: AccountId32,
signer: Keypair,
tx_sender: Sender<AuditEvent>,
//rpc: LegacyRpcMethods<PolkadotConfig>,
tx_sender: tokio_mpsc::Sender<AuditEvent>,
}


@@ -67,75 +66,179 @@ impl IrrefutableAudit for SubstrateBasedAudit {
))?;

let _rpc = LegacyRpcMethods::<PolkadotConfig>::new(rpc_client);
println!("✅ Connection with Substrate Node established.");
println!("✅ Connection with Substrate/AZ Node established.");

//let account_id: AccountId32 = dev::alice().public_key().into();
let signer = dev::alice();
println!("🔑 Using account Alice: {}", hex::encode(signer.public_key().0));

let (tx_sender, rx) = mpsc::channel();
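// Bounded tokio channel (capacity 100): the spawned async handler task awaits events from here.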
let (tx_sender, rx) = tokio_mpsc::channel(100);

let audit = SubstrateBasedAudit {
api,
signer,
tx_sender,
};

// Verify metadata
audit.verify_metadata().await?;

// Spawn the event handler
Self::spawn_event_handler(rx)?;
Self::spawn_event_handler(Arc::new(audit.clone()), rx)?;

Ok(audit)
}

fn get_sender(&self) -> &Sender<AuditEvent> {
fn get_sender(&self) -> &tokio_mpsc::Sender<AuditEvent> {
&self.tx_sender
}

fn spawn_event_handler(receiver: Receiver<AuditEvent>) -> Result<(), Box<dyn Error>> {
thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
while let Ok(event) = receiver.recv() {
// Just process the event - no new connection needed
debug!("Processing event: {:?}", event);
// TODO: Add actual event processing logic here
fn spawn_event_handler(
audit: Arc<dyn IrrefutableAudit>,
mut receiver: tokio_mpsc::Receiver<AuditEvent>
) -> Result<(), Box<dyn Error>> {
tokio::spawn(async move {
while let Some(event) = receiver.recv().await {
debug!("Processing event: {:?}", event);

if let Err(e) = audit.process_event(event).await {
eprintln!("Error processing event: {}", e);
}
});
}
});
Ok(())
}

async fn process_event(&self, event: AuditEvent) -> Result<(), Box<dyn Error>> {
debug!("Processing event: {:?}", event);
let event_type_bytes = event.event_type.as_bytes().to_vec();
let creation_time = event.creation_time.to_string().as_bytes().to_vec();
let file_path = event.file_path.as_bytes().to_vec();
let event_key = event.event_key.as_bytes().to_vec();

let event_type_bytes = event.event_type.clone().into_bytes();
let creation_time = event.creation_time.into_bytes();
let file_path = event.file_path.into_bytes();
let event_key = event.event_key.into_bytes();
// Validate input lengths before sending
self.validate_input_lengths(
&event_type_bytes,
&creation_time,
&file_path,
&event_key,
)?;

debug!("📤 Sending event to blockchain:");
debug!(" Type: {} ({} bytes)", event.event_type, event_type_bytes.len());
debug!(" Time: {} ({} bytes)", event.creation_time, creation_time.len());
debug!(" Path: {} ({} bytes)", event.file_path, file_path.len());
debug!(" Key: {} ({} bytes)", event.event_key, event_key.len());

match event.event_type.as_str() {
"disassembled" => {
let tx = pallet_template::tx()
.template_module()
.disassembled(event_type_bytes, creation_time, file_path, event_key);

self.api.tx()
let tx_hash = self.api.tx()
.sign_and_submit_default(&tx, &self.signer)
.await
.map_err(|e| Box::new(AuditError::TransactionError(e.to_string())))?;

println!("Disassembled event processed.");
debug!("🔗 Transaction submitted to blockchain");
debug!("📝 Transaction hash: {}", tx_hash);
debug!("👤 Sender: {}", hex::encode(self.signer.public_key().0));

// Subscribe to finalized blocks and watch for the pallet's event confirming inclusion
let mut sub = self.api.blocks().subscribe_finalized().await?;

while let Some(block) = sub.next().await {
let block = block?;
debug!("🔍 Block #{}", block.header().number);

// Get events for this block
if let Ok(events) = block.events().await {
for event in events.iter() {
if let Ok(event) = event {
if event.pallet_name() == "TemplateModule" {
debug!(" ✨ Found pallet event!");
debug!(" • Name: {}", event.variant_name());
debug!(" • Phase: {:?}", event.phase());

return Ok(()); // Exit after finding our event
}
}
}
}
}

debug!("Disassembled event processed.");
}
"reassembled" => {
let tx = pallet_template::tx()
.template_module()
.reassembled(event_type_bytes, creation_time, file_path, event_key);

self.api.tx()
let tx_hash = self.api.tx()
.sign_and_submit_default(&tx, &self.signer)
.await
.map_err(|e| Box::new(AuditError::TransactionError(e.to_string())))?;

println!("Reassembled event processed.");
debug!("🔗 Transaction submitted to blockchain");
debug!("📝 Transaction hash: {}", tx_hash);
debug!("👤 Sender: {}", hex::encode(self.signer.public_key().0));

// Subscribe to finalized blocks and watch for the pallet's event confirming inclusion
let mut sub = self.api.blocks().subscribe_finalized().await?;

while let Some(block) = sub.next().await {
let block = block?;
debug!("🔍 Block #{}", block.header().number);

// Get events for this block
if let Ok(events) = block.events().await {
for event in events.iter() {
if let Ok(event) = event {
if event.pallet_name() == "TemplateModule" {
debug!(" ✨ Found pallet event!");
debug!(" • Name: {}", event.variant_name());
debug!(" • Phase: {:?}", event.phase());

if let Ok(fields) = event.field_values() {
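// No typed event decoding is wired up here; fall back to string-parsing the Debug rendering (Named([...])) of the event's field values.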
let fields_str = format!("{:?}", fields);

// The structure is Named([...])
if let Some(content) = fields_str.strip_prefix("Named([") {
if let Some(inner) = content.strip_suffix("])") {
// Split on "), (" to get each field
let fields: Vec<&str> = inner.split("), (").collect();

for field in fields {
// Extract field name
if let Some(name_end) = field.find("\", ") {
let name = field
.trim_start_matches('(')
.trim_start_matches('"')
.split('"')
.next()
.unwrap_or("");
debug!(" • Field: {}", name);

// If this is the event field, parse its inner structure
if name == "event" {
if let Some(event_start) = field.find("Named([") {
let event_content = &field[event_start..];
//debug!(" • Event content: {}", event_content);
}
}
}
}
}
}
}
return Ok(()); // Exit after finding our event
}
}
}
}
}

debug!("Reassembled event processed.");
}
_ => return Err(Box::new(AuditError::EventProcessingError(
"Unknown event type".to_string()
@@ -144,13 +247,76 @@ impl IrrefutableAudit for SubstrateBasedAudit {
Ok(())
}


fn shutdown(&self) -> Result<(), Box<dyn Error>> {
// Drop sender to signal handler thread to stop
// Additional cleanup if needed
// Signal the handler task to stop by queueing a sentinel shutdown event
// (try_send, since shutdown() is not async and cannot await).
let _ = self.tx_sender.try_send(AuditEvent {
event_type: "shutdown".to_string(),
creation_time: "".to_string(),
file_path: "".to_string(),
event_key: "".to_string(),
});

// Give a short grace period for pending events to complete
std::thread::sleep(std::time::Duration::from_secs(1));

println!("Substrate/AZ audit system shutdown complete");
Ok(())
}
}

impl SubstrateBasedAudit {
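// Reject oversized fields before submission; the limits below are presumed to mirror the pallet's bounded storage types.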
fn validate_input_lengths(
&self,
event_type: &[u8],
creation_time: &[u8],
file_path: &[u8],
event_key: &[u8],
) -> Result<(), Box<dyn Error>> {
if event_type.len() > 64 {
return Err("Event type exceeds 64 bytes".into());
}
if creation_time.len() > 64 {
return Err("Creation time exceeds 64 bytes".into());
}
if file_path.len() > 256 {
return Err("File path exceeds 256 bytes".into());
}
if event_key.len() > 128 {
return Err("Event key exceeds 128 bytes".into());
}
Ok(())
}

async fn verify_metadata(&self) -> Result<(), Box<dyn Error>> {
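// Startup sanity check: confirm the connected node's runtime metadata exposes the TemplateModule pallet used by process_event, and log its calls and events.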
debug!("\n🔍 Verifying Metadata:");

// Get metadata from the API
let metadata = self.api.metadata();

// Look for our pallet
if let Some(pallet) = metadata.pallet_by_name("TemplateModule") {
debug!("✅ Found pallet: {}", pallet.name());

// Check calls
debug!("\nCalls:");
for call in pallet.call_variants().unwrap_or_default() {
debug!(" • {}", call.name);
}

// Check events
debug!("\nEvents:");
for event in pallet.event_variants().unwrap_or_default() {
debug!(" • {}", event.name);
}

Ok(())
} else {
Err("Pallet 'TemplateModule' not found in metadata!".into())
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down