From 48bd5482afb9cec894e9bd1ee33414e398e4d02f Mon Sep 17 00:00:00 2001 From: Brooks Townsend Date: Tue, 28 Nov 2023 14:03:46 -0500 Subject: [PATCH] feat(status): prevent duplicate status publishes Signed-off-by: Brooks Townsend --- bin/main.rs | 9 +++++-- src/scaler/daemonscaler/mod.rs | 2 +- src/scaler/spreadscaler/mod.rs | 2 +- src/workers/event.rs | 26 +++++++++---------- src/workers/event_helpers.rs | 46 ++++++++++++++++++++++++++++------ 5 files changed, 60 insertions(+), 25 deletions(-) diff --git a/bin/main.rs b/bin/main.rs index 258c0c63..9391897f 100644 --- a/bin/main.rs +++ b/bin/main.rs @@ -274,6 +274,7 @@ async fn main() -> anyhow::Result<()> { command_topic_prefix: DEFAULT_COMMANDS_TOPIC.trim_matches(trimmer).to_owned(), publisher: context.clone(), notify_stream, + status_stream: status_stream.clone(), }; let events_manager: ConsumerManager = ConsumerManager::new( permit_pool.clone(), @@ -373,6 +374,7 @@ struct EventWorkerCreator { command_topic_prefix: String, publisher: Context, notify_stream: Stream, + status_stream: Stream, } #[async_trait::async_trait] @@ -392,8 +394,11 @@ where self.publisher.clone(), &format!("{}.{lattice_id}", self.command_topic_prefix), ); - let status_publisher = - StatusPublisher::new(self.publisher.clone(), &format!("wadm.status.{lattice_id}")); + let status_publisher = StatusPublisher::new( + self.publisher.clone(), + Some(self.status_stream.clone()), + &format!("wadm.status.{lattice_id}"), + ); let manager = ScalerManager::new( self.publisher.clone(), self.notify_stream.clone(), diff --git a/src/scaler/daemonscaler/mod.rs b/src/scaler/daemonscaler/mod.rs index f0201a59..24d8b433 100644 --- a/src/scaler/daemonscaler/mod.rs +++ b/src/scaler/daemonscaler/mod.rs @@ -771,7 +771,7 @@ mod test { }, ); let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter"); - let status_publisher = StatusPublisher::new(NoopPublisher, "doesntmatter"); + let status_publisher = StatusPublisher::new(NoopPublisher,None, "doesntmatter"); let worker = EventWorker::new( store.clone(), lattice_source.clone(), diff --git a/src/scaler/spreadscaler/mod.rs b/src/scaler/spreadscaler/mod.rs index fa8cdbe4..b76107a9 100644 --- a/src/scaler/spreadscaler/mod.rs +++ b/src/scaler/spreadscaler/mod.rs @@ -1260,7 +1260,7 @@ mod test { let lattice_source = TestLatticeSource::default(); let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter"); - let status_publisher = StatusPublisher::new(NoopPublisher, "doesntmatter"); + let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter"); let worker = EventWorker::new( store.clone(), lattice_source.clone(), diff --git a/src/workers/event.rs b/src/workers/event.rs index 3e6160b6..90648b1a 100644 --- a/src/workers/event.rs +++ b/src/workers/event.rs @@ -1201,7 +1201,7 @@ mod test { let lattice_id = "all_state"; let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter"); - let status_publisher = StatusPublisher::new(NoopPublisher, "doesntmatter"); + let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter"); let worker = EventWorker::new( store.clone(), lattice_source.clone(), @@ -1490,7 +1490,7 @@ mod test { ActorDescription { id: actor1.public_key.to_string(), image_ref: None, - /// The individual instances of this actor that are running + // The individual instances of this actor that are running instances: vec![ ActorInstance { annotations: None, @@ -1512,7 +1512,7 @@ mod test { ActorDescription { id: actor2.public_key.to_string(), image_ref: None, - /// The individual instances of this actor that are running + // The individual instances of this actor that are running instances: vec![ ActorInstance { annotations: None, @@ -1565,7 +1565,7 @@ mod test { ActorDescription { id: actor1.public_key.to_string(), image_ref: None, - /// The individual instances of this actor that are running + // The individual instances of this actor that are running instances: vec![ ActorInstance { annotations: None, @@ -1587,7 +1587,7 @@ mod test { ActorDescription { id: actor2.public_key.to_string(), image_ref: None, - /// The individual instances of this actor that are running + // The individual instances of this actor that are running instances: vec![ ActorInstance { annotations: None, @@ -1795,7 +1795,7 @@ mod test { actors: vec![ActorDescription { id: actor2.public_key.to_string(), image_ref: None, - /// The individual instances of this actor that are running + // The individual instances of this actor that are running instances: vec![ ActorInstance { annotations: None, @@ -1828,7 +1828,7 @@ mod test { actors: vec![ActorDescription { id: actor2.public_key.to_string(), image_ref: None, - /// The individual instances of this actor that are running + // The individual instances of this actor that are running instances: vec![ ActorInstance { annotations: None, @@ -1992,7 +1992,7 @@ mod test { ..Default::default() }; let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter"); - let status_publisher = StatusPublisher::new(NoopPublisher, "doesntmatter"); + let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter"); let worker = EventWorker::new( store.clone(), lattice_source.clone(), @@ -2023,7 +2023,7 @@ mod test { ActorDescription { id: actor1_id.to_string(), image_ref: None, - /// The individual instances of this actor that are running + // The individual instances of this actor that are running instances: vec![ ActorInstance { annotations: None, @@ -2045,7 +2045,7 @@ mod test { ActorDescription { id: actor2_id.to_string(), image_ref: None, - /// The individual instances of this actor that are running + // The individual instances of this actor that are running instances: vec![ActorInstance { annotations: None, instance_id: "3".to_string(), @@ -2155,7 +2155,7 @@ mod test { let lattice_source = TestLatticeSource::default(); let lattice_id = "provider_status"; let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter"); - let status_publisher = StatusPublisher::new(NoopPublisher, "doesntmatter"); + let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter"); let worker = EventWorker::new( store.clone(), lattice_source.clone(), @@ -2272,7 +2272,7 @@ mod test { let lattice_id = "provider_contract_id"; let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter"); - let status_publisher = StatusPublisher::new(NoopPublisher, "doesntmatter"); + let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter"); let worker = EventWorker::new( store.clone(), lattice_source.clone(), @@ -2376,7 +2376,7 @@ mod test { let lattice_id = "update_data"; let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter"); - let status_publisher = StatusPublisher::new(NoopPublisher, "doesntmatter"); + let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter"); let worker = EventWorker::new( store.clone(), lattice_source.clone(), diff --git a/src/workers/event_helpers.rs b/src/workers/event_helpers.rs index f2e25d50..b72055e5 100644 --- a/src/workers/event_helpers.rs +++ b/src/workers/event_helpers.rs @@ -1,7 +1,8 @@ +use async_nats::jetstream::stream::Stream; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; -use tracing::{instrument, warn}; +use tracing::{instrument, trace, warn}; use wasmcloud_control_interface::{HostInventory, LinkDefinition}; use crate::{commands::Command, publisher::Publisher, server::StatusInfo, APP_SPEC_ANNOTATION}; @@ -98,6 +99,8 @@ impl LinkSource for wasmcloud_control_interface::Client { #[derive(Clone)] pub struct StatusPublisher { publisher: Pub, + // Stream for querying current status to avoid duplicate updates + status_stream: Option, // Topic prefix, e.g. wadm.status.default topic_prefix: String, } @@ -105,9 +108,14 @@ pub struct StatusPublisher { impl StatusPublisher { /// Creates an new status publisher configured with the given publisher that will send to the /// manifest status topic using the given prefix - pub fn new(publisher: Pub, topic_prefix: &str) -> StatusPublisher { + pub fn new( + publisher: Pub, + status_stream: Option, + topic_prefix: &str, + ) -> StatusPublisher { StatusPublisher { publisher, + status_stream, topic_prefix: topic_prefix.to_owned(), } } @@ -116,12 +124,34 @@ impl StatusPublisher { impl StatusPublisher { #[instrument(level = "trace", skip(self))] pub async fn publish_status(&self, name: &str, status: StatusInfo) -> anyhow::Result<()> { - self.publisher - .publish( - serde_json::to_vec(&status)?, - Some(&format!("{}.{name}", self.topic_prefix)), - ) - .await + let topic = format!("{}.{name}", self.topic_prefix); + + // NOTE(brooksmtownsend): This direct get may not always query the jetstream leader. In the + // worst case where the last message isn't all the way updated, we may publish a duplicate + // status. This is an acceptable tradeoff to not have to query the leader directly every time. + let prev_status = if let Some(status_stream) = &self.status_stream { + status_stream + .direct_get_last_for_subject(&topic) + .await + .map(|m| serde_json::from_slice::(&m.payload).ok()) + .ok() + .flatten() + } else { + None + }; + + match prev_status { + // If the status hasn't changed, skip publishing + Some(prev_status) if prev_status == status => { + trace!(%name, "Status hasn't changed since last update. Skipping"); + Ok(()) + } + _ => { + self.publisher + .publish(serde_json::to_vec(&status)?, Some(&topic)) + .await + } + } } }