diff --git a/Cargo.lock b/Cargo.lock index 9667b2e..567fbb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -229,9 +229,9 @@ dependencies = [ [[package]] name = "chirpstack_api" -version = "4.8.1" +version = "4.9.0-test.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74e4ac52b7aabb481f0f05f7dfc6f66e61357ae705751b882bf688ab70bce54e" +checksum = "ef50294bd90c389f76dcce5b7214adf48b92feef5b4b231d17d4fa3589fb8951" dependencies = [ "hex", "pbjson", diff --git a/Cargo.toml b/Cargo.toml index 4699e50..55453b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ "usage", "derive", ] } - chirpstack_api = { version = "4.8", default-features = false, features = [ + chirpstack_api = { version = "4.9.0-test.1", default-features = false, features = [ "json", ] } lrwn_filters = { version = "4.7", features = ["serde"] } diff --git a/src/backend/concentratord.rs b/src/backend/concentratord.rs index e0610bd..8004d38 100644 --- a/src/backend/concentratord.rs +++ b/src/backend/concentratord.rs @@ -13,7 +13,7 @@ use tokio::task; use super::Backend as BackendTrait; use crate::config::Configuration; use crate::metadata; -use crate::mqtt::{send_gateway_stats, send_tx_ack, send_uplink_frame}; +use crate::mqtt::{send_gateway_stats, send_mesh_stats, send_tx_ack, send_uplink_frame}; pub struct Backend { gateway_id: String, @@ -291,6 +291,11 @@ async fn handle_event_msg( pl.metadata.extend(metadata::get().await?); send_gateway_stats(&pl).await?; } + "mesh_stats" => { + let pl = gw::MeshStats::decode(pl)?; + info!("Received mesh stats"); + send_mesh_stats(&pl).await?; + } _ => { return Err(anyhow!("Unexpected event: {}", event)); } diff --git a/src/mqtt.rs b/src/mqtt.rs index c015621..78c4ef9 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -286,6 +286,21 @@ pub async fn send_gateway_stats(pl: &gw::GatewayStats) -> Result<()> { Ok(()) } +pub async fn send_mesh_stats(pl: &gw::MeshStats) -> Result<()> { + let state = STATE.get().ok_or_else(|| anyhow!("STATE is not set"))?; + + let b = match state.json { + true => serde_json::to_vec(&pl)?, + false => pl.encode_to_vec(), + }; + let topic = get_event_topic(&state.topic_prefix, &state.gateway_id, "mesh-stats"); + info!("Sending mesh stats event, topic: {}", topic); + state.client.publish(topic, state.qos, false, b).await?; + trace!("Message published"); + + Ok(()) +} + pub async fn send_tx_ack(pl: &gw::DownlinkTxAck) -> Result<()> { let state = STATE.get().ok_or_else(|| anyhow!("STATE is not set"))?; diff --git a/tests/concentratord_test.rs b/tests/concentratord_test.rs index 77140dd..5a87acc 100644 --- a/tests/concentratord_test.rs +++ b/tests/concentratord_test.rs @@ -187,6 +187,36 @@ async fn end_to_end() { pl ); + // Mesh Stats + let mesh_stats_pl = gw::MeshStats { + gateway_id: "0102030405060708".into(), + ..Default::default() + }; + thread::spawn({ + let zmq_pub = zmq_pub.clone(); + let mesh_stats_pl = mesh_stats_pl.encode_to_vec(); + + move || { + let zmq_pub = zmq_pub.lock().unwrap(); + zmq_pub.send("mesh_stats", zmq::SNDMORE).unwrap(); + zmq_pub.send(mesh_stats_pl, 0).unwrap(); + } + }); + + let mqtt_msg = mqtt_rx.recv().await.unwrap(); + assert_eq!( + "eu868/gateway/0102030405060708/event/mesh-stats", + String::from_utf8(mqtt_msg.topic.to_vec()).unwrap() + ); + let pl = gw::MeshStats::decode(&mut Cursor::new(mqtt_msg.payload.to_vec())).unwrap(); + assert_eq!( + gw::MeshStats { + gateway_id: "0102030405060708".into(), + ..Default::default() + }, + pl + ); + // Downlink let down_pl = gw::DownlinkFrame { gateway_id: "0102030405060708".into(),