From 74007f13bec338c09e782f62de8762ec98fbb482 Mon Sep 17 00:00:00 2001 From: DGAlexandru Date: Wed, 8 Jan 2025 09:55:06 +0200 Subject: [PATCH] fix(MQTT): Attempt to fix the reconfigure mutex never being left (main branch) --- backend/lib/mqtt/MqttController.js | 30 ++++++++++++++++++--- backend/lib/mqtt/handles/RobotMqttHandle.js | 15 ++++++----- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/backend/lib/mqtt/MqttController.js b/backend/lib/mqtt/MqttController.js index a6860e2..b944841 100644 --- a/backend/lib/mqtt/MqttController.js +++ b/backend/lib/mqtt/MqttController.js @@ -566,6 +566,16 @@ class MqttController { } /** + * Whether we're connected to the broker + * + * @public + * @return {boolean} + */ + get isConnected() { + return this.client && this.client.connected === true && this.client.disconnecting !== true; + } + + /** * Set device state * * @private @@ -573,10 +583,14 @@ class MqttController { * @return {Promise} */ async setState(state) { - if (this.client && this.client.connected === true && this.client.disconnecting !== true) { + if (state === this.state) { // No point in setting the same state again + return; + } + + if (this.isConnected) { await this.publish(this.currentConfig.stateTopic, state, { // @ts-ignore - qos: MqttCommonAttributes.QOS.AT_LEAST_ONCE, + qos: MqttCommonAttributes.QOS.AT_MOST_ONCE, // Anything other than QoS 0 can potentially block for a long time retain: true }); } @@ -642,9 +656,19 @@ class MqttController { Object.assign(reconfOptions, options); } + const previousState = this.state; + try { await this.setState(reconfOptions.reconfigState); await cb(); + + // Since the ready state is used by the handles to determine whether they should publish stuff, + // we must never exit reconfigure in the READY state if we are in fact not READY + if (reconfOptions.targetState === HomieCommonAttributes.STATE.READY && !this.isConnected) { + Logger.debug(`Overriding reconfigure target state '${reconfOptions.targetState}' with '${previousState}' since we're not connected.`); + reconfOptions.targetState = previousState; + } + await this.setState(reconfOptions.targetState); this.mutexes.reconfigure.leave(); @@ -895,7 +919,7 @@ class MqttController { //@ts-ignore if (this.client?.stream?.writableLength > 1024 * 1024) { //Allow for 1MiB of buffered messages Logger.warn(`Stale MQTT connection detected. Dropping message for ${topic}`); - } else if (this.asyncClient) { + } else if (this.isConnected) { //This looks like an afterthought because it is one. :( const hasChanged = this.messageDeduplicationCache.update(topic, message); diff --git a/backend/lib/mqtt/handles/RobotMqttHandle.js b/backend/lib/mqtt/handles/RobotMqttHandle.js index 57b5c0c..37366d9 100644 --- a/backend/lib/mqtt/handles/RobotMqttHandle.js +++ b/backend/lib/mqtt/handles/RobotMqttHandle.js @@ -128,12 +128,15 @@ class RobotMqttHandle extends MqttHandle { })); } - await this.deconfigure({ - cleanHomie: false, - unsubscribe: false - }); - - await this.configure(); + if (this.controller.isConnected) { + await this.deconfigure({ + cleanHomie: false, + unsubscribe: false + }); + await this.configure(); + } else { + Logger.debug("Skipping (de)configure on robot status attribute discovery, as we're not connected to MQTT."); + } }); }