Skip to content

Commit

Permalink
Move all MQTT-related logic to Mqtt
Browse files Browse the repository at this point in the history
  • Loading branch information
lptr committed Aug 10, 2024
1 parent f20c1ec commit 90139da
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 86 deletions.
48 changes: 5 additions & 43 deletions src/kernel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,14 @@ use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::mutex::Mutex;
use esp_idf_hal::modem::Modem;
use esp_idf_svc::mdns::EspMdns;
use esp_idf_svc::mqtt::client::EspAsyncMqttClient;
use esp_idf_svc::mqtt::client::EspAsyncMqttConnection;
use esp_idf_svc::mqtt::client::QoS;
use esp_idf_svc::sntp::EspSntp;
use esp_idf_svc::timer::EspTaskTimerService;
use esp_idf_svc::wifi::AsyncWifi;
use esp_idf_svc::wifi::EspWifi;
use esp_idf_svc::{eventloop::EspSystemEventLoop, nvs::EspDefaultNvsPartition};
use esp_idf_sys::{esp_pm_config_esp32_t, esp_pm_configure};
use mqtt::Mqtt;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::ffi::c_void;
use std::sync::Arc;

Expand All @@ -39,9 +36,7 @@ pub struct Device {
pub config: DeviceConfig,
_wifi: Arc<Mutex<CriticalSectionRawMutex, AsyncWifi<EspWifi<'static>>>>,
_sntp: EspSntp<'static>,
mqtt: Arc<Mutex<CriticalSectionRawMutex, EspAsyncMqttClient>>,
_conn: Arc<Mutex<CriticalSectionRawMutex, EspAsyncMqttConnection>>,
topic_root: String,
pub mqtt: Mqtt,
}

impl Device {
Expand Down Expand Up @@ -69,52 +64,19 @@ impl Device {

// TODO Use some async mDNS instead to avoid blocking the executor
let mdns = EspMdns::take()?;
let (sntp, mqtts) = join(
let (sntp, mqtt) = join(
rtc::init_rtc(&mdns),
mqtt::init_mqtt(&mdns, &config.instance),
mqtt::Mqtt::create(&mdns, &config.instance),
)
.await;
let (mqtt, conn, topic_root) = mqtts?;

// TODO Figure out how to avoid this warning
#[allow(clippy::arc_with_non_send_sync)]
let mqtt = Arc::new(Mutex::new(mqtt));

Ok(Self {
config,
_wifi: wifi,
_sntp: sntp?,
mqtt,
// TODO Do we need to keep this alive?
_conn: conn,
topic_root,
mqtt: mqtt?,
})
}

pub async fn publish_mqtt(&self, topic: &str, payload: Value) -> Result<()> {
let topic = format!("{}/{}", self.topic_root, topic);
self.mqtt
.lock()
.await
.publish(
&topic,
QoS::AtMostOnce,
false,
payload.to_string().as_bytes(),
)
.await?;
Ok(())
}

pub async fn subscribe_mqtt(&self, topic: &str) -> Result<()> {
let topic = format!("{}/{}", self.topic_root, topic);
self.mqtt
.lock()
.await
.subscribe(&topic, QoS::AtMostOnce)
.await?;
Ok(())
}
}

fn load_device_config() -> Result<DeviceConfig> {
Expand Down
106 changes: 71 additions & 35 deletions src/kernel/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,87 @@ use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::mutex::Mutex;
use embassy_sync::signal::Signal;
use esp_idf_svc::mdns::EspMdns;
use esp_idf_svc::mqtt::client::EventPayload;
use esp_idf_svc::mqtt::client::{
EspAsyncMqttClient, EspAsyncMqttConnection, MqttClientConfiguration,
};
use esp_idf_svc::mqtt::client::{EventPayload, MessageId, QoS};
use serde_json::Value;
use std::sync::Arc;

pub async fn init_mqtt(
mdns: &EspMdns,
instance: &str,
) -> Result<(
EspAsyncMqttClient,
Arc<Mutex<CriticalSectionRawMutex, EspAsyncMqttConnection>>,
String,
)> {
let mqtt = mdns::query_mdns(mdns, "_mqtt", "_tcp")?.unwrap_or_else(|| mdns::Service {
hostname: String::from("bumblebee.local"),
port: 1883,
});
log::info!("MDNS query result: {:?}", mqtt);
pub struct Mqtt {
topic_root: String,
mqtt: Arc<Mutex<CriticalSectionRawMutex, EspAsyncMqttClient>>,
_conn: Arc<Mutex<CriticalSectionRawMutex, EspAsyncMqttConnection>>,
}

impl Mqtt {
pub async fn create(mdns: &EspMdns, instance: &str) -> Result<Mqtt> {
let mqtt = mdns::query_mdns(mdns, "_mqtt", "_tcp")?.unwrap_or_else(|| mdns::Service {
hostname: String::from("bumblebee.local"),
port: 1883,
});
log::info!("MDNS query result: {:?}", mqtt);

let url: &str = &format!("mqtt://{}:{}", mqtt.hostname, mqtt.port);
let (mqtt, conn) = EspAsyncMqttClient::new(
url,
&MqttClientConfiguration {
client_id: Some(instance),
..Default::default()
},
)
.expect("Couldn't connect to MQTT");

let topic_root = format!("devices/ugly-duckling/{}", instance);

let url: &str = &format!("mqtt://{}:{}", mqtt.hostname, mqtt.port);
let (mqtt, conn) = EspAsyncMqttClient::new(
url,
&MqttClientConfiguration {
client_id: Some(instance),
..Default::default()
},
)
.expect("Couldn't connect to MQTT");
// TODO Figure out how to avoid this warning
#[allow(clippy::arc_with_non_send_sync)]
let mqtt = Arc::new(Mutex::new(mqtt));

let topic_root = format!("devices/ugly-duckling/{}", instance);
// TODO Figure out how to avoid this warning
#[allow(clippy::arc_with_non_send_sync)]
let conn = Arc::new(Mutex::new(conn));

// TODO Figure out how to avoid this warning
#[allow(clippy::arc_with_non_send_sync)]
let conn = Arc::new(Mutex::new(conn));
// TODO Need more robust reconnection logic, but for the time being it will do
let connected = Arc::new(Signal::<CriticalSectionRawMutex, ()>::new());
Spawner::for_current_executor()
.await
.spawn(handle_mqtt_events(conn.clone(), connected.clone()))
.expect("Couldn't spawn MQTT handler");
connected.wait().await;

// TODO Need something more robust than this, but for the time being it will do
let connected = Arc::new(Signal::<CriticalSectionRawMutex, ()>::new());
Spawner::for_current_executor()
.await
.spawn(handle_mqtt_events(conn.clone(), connected.clone()))
.expect("Couldn't spawn MQTT handler");
connected.wait().await;
Ok(Self {
topic_root,
mqtt,
// TODO Do we need this?
_conn: conn,
})
}

pub async fn publish(&self, topic: &str, payload: Value) -> Result<MessageId> {
let topic = format!("{}/{}", self.topic_root, topic);
self.mqtt
.lock()
.await
.publish(
&topic,
QoS::AtMostOnce,
false,
payload.to_string().as_bytes(),
)
.await
.map_err(anyhow::Error::from)
}

Ok((mqtt, conn, topic_root))
pub async fn subscribe(&self, topic: &str) -> Result<MessageId> {
let topic = format!("{}/{}", self.topic_root, topic);
self.mqtt
.lock()
.await
.subscribe(&topic, QoS::AtMostOnce)
.await
.map_err(anyhow::Error::from)
}
}

#[embassy_executor::task]
Expand Down
16 changes: 8 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,20 @@ async fn task<Fut: Future<Output = Result<()>>>(future: Fut) {
}

async fn start_device(modem: Modem) -> Result<()> {
let device = kernel::Device::init(modem).await?;
let kernel = kernel::Device::init(modem).await?;

let uptime_us = unsafe { esp_idf_sys::esp_timer_get_time() };
log::info!("Device started in {} ms", uptime_us as f64 / 1000.0);

device.subscribe_mqtt("commands/ping").await?;
kernel.mqtt.subscribe("commands/ping").await?;

device.publish_mqtt("init", json!({
kernel.mqtt.publish("init", json!({
"type": "ugly-duckling",
"model": "mk6",
"id": device.config.id,
"instance": device.config.instance,
"id": kernel.config.id,
"instance": kernel.config.instance,
// TODO Add mac
"deviceConfig": serde_json::to_string(&device.config)?,
"deviceConfig": serde_json::to_string(&kernel.config)?,
// TODO Do we need this?
"app": "ugly-duckling-rs",
// TODO Extract this to static variable
Expand All @@ -92,8 +92,8 @@ async fn start_device(modem: Modem) -> Result<()> {
})).await?;

loop {
device
.publish_mqtt("telemetry", json!({
kernel.mqtt
.publish("telemetry", json!({
"uptime": Duration::from_micros(unsafe { esp_idf_sys::esp_timer_get_time() as u64 }).as_millis(),
"memory": unsafe { esp_idf_sys::esp_get_free_heap_size() },
}))
Expand Down

0 comments on commit 90139da

Please sign in to comment.