Skip to content

Commit

Permalink
Implement handling of mesh_stats.
Browse files Browse the repository at this point in the history
This message is generated by the ChirpStack Gateway Mesh component, when
using this as a proxy to the ChirpStack Concentratord in case of a mesh
setup.
  • Loading branch information
brocaar committed Jun 21, 2024
1 parent 963cf74 commit 3464617
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 4 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"usage",
"derive",
] }
chirpstack_api = { version = "4.8", default-features = false, features = [
chirpstack_api = { version = "4.9.0-test.1", default-features = false, features = [
"json",
] }
lrwn_filters = { version = "4.7", features = ["serde"] }
Expand Down
7 changes: 6 additions & 1 deletion src/backend/concentratord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::task;
use super::Backend as BackendTrait;
use crate::config::Configuration;
use crate::metadata;
use crate::mqtt::{send_gateway_stats, send_tx_ack, send_uplink_frame};
use crate::mqtt::{send_gateway_stats, send_mesh_stats, send_tx_ack, send_uplink_frame};

pub struct Backend {
gateway_id: String,
Expand Down Expand Up @@ -291,6 +291,11 @@ async fn handle_event_msg(
pl.metadata.extend(metadata::get().await?);
send_gateway_stats(&pl).await?;
}
"mesh_stats" => {
let pl = gw::MeshStats::decode(pl)?;
info!("Received mesh stats");
send_mesh_stats(&pl).await?;
}
_ => {
return Err(anyhow!("Unexpected event: {}", event));
}
Expand Down
15 changes: 15 additions & 0 deletions src/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,21 @@ pub async fn send_gateway_stats(pl: &gw::GatewayStats) -> Result<()> {
Ok(())
}

pub async fn send_mesh_stats(pl: &gw::MeshStats) -> Result<()> {
let state = STATE.get().ok_or_else(|| anyhow!("STATE is not set"))?;

let b = match state.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
let topic = get_event_topic(&state.topic_prefix, &state.gateway_id, "mesh-stats");
info!("Sending mesh stats event, topic: {}", topic);
state.client.publish(topic, state.qos, false, b).await?;
trace!("Message published");

Ok(())
}

pub async fn send_tx_ack(pl: &gw::DownlinkTxAck) -> Result<()> {
let state = STATE.get().ok_or_else(|| anyhow!("STATE is not set"))?;

Expand Down
30 changes: 30 additions & 0 deletions tests/concentratord_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,36 @@ async fn end_to_end() {
pl
);

// Mesh Stats
let mesh_stats_pl = gw::MeshStats {
gateway_id: "0102030405060708".into(),
..Default::default()
};
thread::spawn({
let zmq_pub = zmq_pub.clone();
let mesh_stats_pl = mesh_stats_pl.encode_to_vec();

move || {
let zmq_pub = zmq_pub.lock().unwrap();
zmq_pub.send("mesh_stats", zmq::SNDMORE).unwrap();
zmq_pub.send(mesh_stats_pl, 0).unwrap();
}
});

let mqtt_msg = mqtt_rx.recv().await.unwrap();
assert_eq!(
"eu868/gateway/0102030405060708/event/mesh-stats",
String::from_utf8(mqtt_msg.topic.to_vec()).unwrap()
);
let pl = gw::MeshStats::decode(&mut Cursor::new(mqtt_msg.payload.to_vec())).unwrap();
assert_eq!(
gw::MeshStats {
gateway_id: "0102030405060708".into(),
..Default::default()
},
pl
);

// Downlink
let down_pl = gw::DownlinkFrame {
gateway_id: "0102030405060708".into(),
Expand Down

0 comments on commit 3464617

Please sign in to comment.