Skip to content

Commit

Permalink
feat: presence typings
Browse files Browse the repository at this point in the history
  • Loading branch information
roberts-pumpurs committed Nov 24, 2024
1 parent 25a7e93 commit d868643
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 19 deletions.
3 changes: 0 additions & 3 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
[net]
git-fetch-with-cli = true

[build]
rustflags = ["-Z", "threads=8", "-Ctarget-cpu=native", "-Zthreads=10"]

[alias]
xtask = "run --package xtask --"

Expand Down
4 changes: 2 additions & 2 deletions crates/examples/src/broadcast_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ async fn main() {
client.subscribe_to_changes(payload).await.unwrap();
client
.broadcast(Broadcast {
r#type: "broadcast".to_string(),
event: "update".to_string(),
r#type: "broadcast".to_owned(),
event: "update".to_owned(),
payload: simd_json::json!({"aaa": "bbbb"}),
})
.await
Expand Down
147 changes: 137 additions & 10 deletions crates/supabase-realtime/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub enum ProtocolPayload {
PresenceState(presence_state::PresenceState),
#[serde(rename = "broadcast")]
Broadcast(broadcast::Broadcast),
#[serde(rename = "presence_diff")]
PresenceDiff(presence_diff::PresenceDiff),
#[serde(rename = "system")]
System(system::System),
#[serde(rename = "phx_error")]
Expand Down Expand Up @@ -376,30 +378,68 @@ pub mod phx_join {
}

pub mod presence_state {
use super::*;
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PresenceState;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PresenceState(pub HashMap<String, Presence>);

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Presence {
pub metas: Vec<PresenceMeta>,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PresenceMeta {
pub phx_ref: String,
pub name: String,
pub t: f64,
}

#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;

use super::*;
use crate::message::{ProtocolMessage, ProtocolPayload};

#[test]
fn test_presence_state_serialization() {
fn test_presence_state_deserialization() {
let json_data = r#"
{
"event": "presence_state",
"payload": {},
"ref": null,
"topic": "realtime:db"
"event": "presence_state",
"payload": {
"1c4ed5ca-aaa4-11ef-bce9-0242ac120004": {
"metas": [
{
"phx_ref": "GAsCC3FpEhdb4wgk",
"name": "service_role_75",
"t": 22866011
}
]
}
},
"topic": "realtime:af"
}
"#;

let mut state_map = HashMap::new();
state_map.insert(
"1c4ed5ca-aaa4-11ef-bce9-0242ac120004".to_owned(),
Presence {
metas: vec![PresenceMeta {
phx_ref: "GAsCC3FpEhdb4wgk".to_owned(),
name: "service_role_75".to_owned(),
t: 22866011.0,
}],
},
);

let expected_struct = ProtocolMessage {
topic: "realtime:db".to_owned(),
payload: ProtocolPayload::PresenceState(PresenceState),
topic: "realtime:af".to_owned(),
payload: ProtocolPayload::PresenceState(PresenceState(state_map)),
ref_field: None,
join_ref: None,
};
Expand Down Expand Up @@ -511,6 +551,93 @@ pub mod broadcast {
}
}
}
pub mod presence_diff {
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PresenceDiff {
pub joins: HashMap<String, Presence>,
pub leaves: HashMap<String, Presence>,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Presence {
pub metas: Vec<PresenceMeta>,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PresenceMeta {
pub phx_ref: String,
pub name: String,
pub t: f64,
}

#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;

use super::*;
use crate::message::{ProtocolMessage, ProtocolPayload};

#[test]
fn test_presence_diff_deserialization() {
let json_data = r#"
{
"ref": null,
"event": "presence_diff",
"payload": {
"joins": {
"fe9f9386-aaa1-11ef-a588-0242ac120004": {
"metas": [
{
"phx_ref": "GAsBN9izrRlb40jh",
"name": "service_role_47",
"t": 21957173.599999905
}
]
}
},
"leaves": {}
},
"topic": "realtime:af"
}
"#;

let expected_struct = ProtocolMessage {
topic: "realtime:af".to_owned(),
payload: ProtocolPayload::PresenceDiff(PresenceDiff {
joins: {
let mut joins = HashMap::new();
joins.insert(
"fe9f9386-aaa1-11ef-a588-0242ac120004".to_owned(),
Presence {
metas: vec![PresenceMeta {
phx_ref: "GAsBN9izrRlb40jh".to_owned(),
name: "service_role_47".to_owned(),
t: 21957173.599999905,
}],
},
);
joins
},
leaves: HashMap::new(),
}),
ref_field: None,
join_ref: None,
};

let serialzed = simd_json::to_string_pretty(&expected_struct).unwrap();
dbg!(serialzed);

let deserialized_struct: ProtocolMessage =
simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap();

assert_eq!(deserialized_struct, expected_struct);
}
}
}

pub mod heartbeat {
use super::*;
Expand Down
7 changes: 3 additions & 4 deletions crates/supabase-realtime/src/realtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use alloc::rc::Rc;
use alloc::sync::Arc;
use core::task::Poll;
use std::sync::Arc;

use fastwebsockets::Frame;
use futures::stream::FuturesUnordered;
Expand Down Expand Up @@ -56,7 +55,7 @@ impl RealtimeConnection {
pub fn new(config: rp_supabase_auth::jwt_stream::SupabaseAuthConfig, topic: &str) -> Self {
let prefix = "realtime";
let topic = [prefix, topic].join(":");
Self { config, topic }
Self { topic, config }
}

#[tracing::instrument(skip_all, err)]
Expand Down Expand Up @@ -203,7 +202,7 @@ impl RealtimeBaseConnection {
let con = Arc::clone(&con);
async move {
let con = Arc::clone(&con);
read_from_ws(&con, tx).await
read_from_ws(&con, tx).await;
}
};
reat_future.push(read_task);
Expand Down

0 comments on commit d868643

Please sign in to comment.