Skip to content

Commit

Permalink
feat(status): prevent duplicate status publishes
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <[email protected]>
  • Loading branch information
brooksmtownsend committed Nov 28, 2023
1 parent 7c5f5d8 commit 42a9135
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 8 deletions.
4 changes: 3 additions & 1 deletion bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ async fn main() -> anyhow::Result<()> {
command_topic_prefix: DEFAULT_COMMANDS_TOPIC.trim_matches(trimmer).to_owned(),
publisher: context.clone(),
notify_stream,
status_stream: status_stream.clone(),
};
let events_manager: ConsumerManager<EventConsumer> = ConsumerManager::new(
permit_pool.clone(),
Expand Down Expand Up @@ -373,6 +374,7 @@ struct EventWorkerCreator<StateStore> {
command_topic_prefix: String,
publisher: Context,
notify_stream: Stream,
status_stream: Stream,
}

#[async_trait::async_trait]
Expand All @@ -393,7 +395,7 @@ where
&format!("{}.{lattice_id}", self.command_topic_prefix),
);
let status_publisher =
StatusPublisher::new(self.publisher.clone(), &format!("wadm.status.{lattice_id}"));
StatusPublisher::new(self.publisher.clone(), self.status_stream.clone(), &format!("wadm.status.{lattice_id}"));
let manager = ScalerManager::new(
self.publisher.clone(),
self.notify_stream.clone(),
Expand Down
35 changes: 28 additions & 7 deletions src/workers/event_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use async_nats::jetstream::stream::Stream;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;

use tracing::{instrument, warn};
use tracing::{instrument, trace, warn};
use wasmcloud_control_interface::{HostInventory, LinkDefinition};

use crate::{commands::Command, publisher::Publisher, server::StatusInfo, APP_SPEC_ANNOTATION};
Expand Down Expand Up @@ -98,16 +99,19 @@ impl LinkSource for wasmcloud_control_interface::Client {
#[derive(Clone)]
pub struct StatusPublisher<Pub> {
publisher: Pub,
// Stream for querying current status to avoid duplicate updates
status_stream: Stream,
// Topic prefix, e.g. wadm.status.default
topic_prefix: String,
}

impl<Pub> StatusPublisher<Pub> {
/// Creates an new status publisher configured with the given publisher that will send to the
/// manifest status topic using the given prefix
pub fn new(publisher: Pub, topic_prefix: &str) -> StatusPublisher<Pub> {
pub fn new(publisher: Pub, status_stream: Stream, topic_prefix: &str) -> StatusPublisher<Pub> {
StatusPublisher {
publisher,
status_stream,
topic_prefix: topic_prefix.to_owned(),
}
}
Expand All @@ -116,12 +120,29 @@ impl<Pub> StatusPublisher<Pub> {
impl<Pub: Publisher> StatusPublisher<Pub> {
#[instrument(level = "trace", skip(self))]
pub async fn publish_status(&self, name: &str, status: StatusInfo) -> anyhow::Result<()> {
self.publisher
.publish(
serde_json::to_vec(&status)?,
Some(&format!("{}.{name}", self.topic_prefix)),
)
let topic = format!("{}.{name}", self.topic_prefix);

// NOTE(brooksmtownsend): This direct get may not always query the jetstream leader. In the
// worst case where the last message isn't all the way updated, we may publish a duplicate
// status. This is an acceptable tradeoff to not have to query the leader directly every time.
match self
.status_stream
.direct_get_last_for_subject(&topic)
.await
.map(|m| serde_json::from_slice::<StatusInfo>(&m.payload))
.map_err(|e| anyhow::anyhow!("{e:?}"))
{
// If the status hasn't changed, skip publishing
Ok(Ok(prev_status)) if prev_status == status => {
trace!(%name, "Status hasn't changed since last update. Skipping");
Ok(())
}
_ => {
self.publisher
.publish(serde_json::to_vec(&status)?, Some(&topic))
.await
}
}
}
}

Expand Down

0 comments on commit 42a9135

Please sign in to comment.