Skip to content

Commit

Permalink
feat: add broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
roberts-pumpurs committed Nov 24, 2024
1 parent 6033685 commit 25a7e93
Show file tree
Hide file tree
Showing 8 changed files with 776 additions and 182 deletions.
585 changes: 438 additions & 147 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions crates/examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,7 @@ path = "src/jwt_stream_example.rs"
[[bin]]
name = "auth-example"
path = "src/auth_example.rs"

[[bin]]
name = "broadcast-example"
path = "src/broadcast_example.rs"
86 changes: 86 additions & 0 deletions crates/examples/src/broadcast_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use core::time::Duration;

use clap::Parser;
use rp_supabase_auth::jwt_stream::SupabaseAuthConfig;
use rp_supabase_auth::types::LoginCredentials;
use rp_supabase_auth::url;
use rp_supabase_realtime::futures::StreamExt as _;
use rp_supabase_realtime::message::broadcast::Broadcast;
use rp_supabase_realtime::message::phx_join;
use rp_supabase_realtime::realtime;
use tracing_subscriber::EnvFilter;

#[derive(Parser, Debug)]
#[command(version, about)]
struct Args {
#[arg(short, long)]
supabase_api_url: url::Url,

#[arg(short, long)]
annon_key: String,

#[arg(short, long)]
email: String,

#[arg(short, long)]
pass: String,
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.from_env()
.unwrap()
.add_directive("rp_supabase_auth=info".to_owned().parse().unwrap())
.add_directive("rp_supabase_realtime=info".to_owned().parse().unwrap())
.add_directive("examples=info".to_owned().parse().unwrap())
.add_directive("broadcast_example=info".to_owned().parse().unwrap()),
)
.init();
color_eyre::install().unwrap();

let args = Args::parse();

let config = SupabaseAuthConfig {
api_key: args.annon_key,
max_reconnect_attempts: 5,
reconnect_interval: Duration::from_secs(3),
url: args.supabase_api_url.clone(),
};
let login_credentials = LoginCredentials::builder()
.email(args.email)
.password(args.pass)
.build();
let (mut realtime, mut client) = realtime::RealtimeConnection::new(config, "af")
.connect(login_credentials)
.await
.unwrap();

let payload = phx_join::PhxJoin {
config: phx_join::JoinConfig {
broadcast: phx_join::BroadcastConfig {
self_item: true,
ack: true,
},
presence: phx_join::PresenceConfig { key: String::new() },
postgres_changes: vec![],
},
access_token: None,
};
client.subscribe_to_changes(payload).await.unwrap();
client
.broadcast(Broadcast {
r#type: "broadcast".to_string(),
event: "update".to_string(),
payload: simd_json::json!({"aaa": "bbbb"}),
})
.await
.unwrap();
tracing::info!("pooling realtime connection");
while let Some(msg) = realtime.next().await {
tracing::info!(?msg, "message");
}
tracing::error!("realtime connection exited");
}
2 changes: 1 addition & 1 deletion crates/examples/src/realtime_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn main() {
.email(args.email)
.password(args.pass)
.build();
let (mut realtime, mut client) = realtime::RealtimeConnection::new(config)
let (mut realtime, mut client) = realtime::RealtimeConnection::new_db_updates(config)
.connect(login_credentials)
.await
.unwrap();
Expand Down
103 changes: 101 additions & 2 deletions crates/supabase-realtime/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ProtocolMessage {
pub topic: String,
#[serde(flatten)]
Expand All @@ -12,7 +12,7 @@ pub struct ProtocolMessage {
pub join_ref: Option<String>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "event", content = "payload", rename_all = "snake_case")]
pub enum ProtocolPayload {
#[serde(rename = "heartbeat")]
Expand All @@ -27,6 +27,8 @@ pub enum ProtocolPayload {
PhxReply(phx_reply::PhxReply),
#[serde(rename = "presence_state")]
PresenceState(presence_state::PresenceState),
#[serde(rename = "broadcast")]
Broadcast(broadcast::Broadcast),
#[serde(rename = "system")]
System(system::System),
#[serde(rename = "phx_error")]
Expand Down Expand Up @@ -413,6 +415,103 @@ pub mod presence_state {
}
}

pub mod broadcast {
use simd_json::OwnedValue;

use super::*;

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Broadcast {
#[serde(rename = "type")]
pub r#type: String,
pub event: String,
pub payload: OwnedValue,
}

#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use simd_json::json;

use super::*;

#[test]
fn test_broadcast_deserialization() {
let json_data = r#"{
"ref": null,
"event": "broadcast",
"payload": {
"event": "Test message",
"payload": {
"message": "Hello World"
},
"type": "broadcast"
},
"topic": "realtime:af"
}"#;

let expected_struct = ProtocolMessage {
topic: "realtime:af".to_owned(),
payload: ProtocolPayload::Broadcast(Broadcast {
r#type: "broadcast".to_owned(),
event: "Test message".to_owned(),
payload: json!({
"message": "Hello World"
}),
}),
ref_field: None,
join_ref: None,
};

let serialzed = simd_json::to_string_pretty(&expected_struct).unwrap();
dbg!(serialzed);

let deserialized_struct: ProtocolMessage =
simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap();

assert_eq!(deserialized_struct, expected_struct);
}

#[test]
fn test_broadcast_deserialization_second_example() {
let json_data = r#"{
"topic": "realtime:af",
"event": "broadcast",
"payload": {
"type": "broadcast",
"event": "message",
"payload": {
"content": "dddd"
}
},
"ref": "3",
"join_ref": "1"
}"#;

let expected_struct = ProtocolMessage {
topic: "realtime:af".to_owned(),
payload: ProtocolPayload::Broadcast(Broadcast {
r#type: "broadcast".to_owned(),
event: "message".to_owned(),
payload: json!({
"content": "dddd"
}),
}),
ref_field: Some("3".to_owned()),
join_ref: Some("1".to_owned()),
};

let serialzed = simd_json::to_string_pretty(&expected_struct).unwrap();
dbg!(serialzed);

let deserialized_struct: ProtocolMessage =
simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap();

assert_eq!(deserialized_struct, expected_struct);
}
}
}

pub mod heartbeat {
use super::*;

Expand Down
Loading

0 comments on commit 25a7e93

Please sign in to comment.