diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 58b82d801..25e879fe1 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -117,23 +117,15 @@ pub async fn create_client( legacy_signed_private_key_proto, ); - let xmtp_client: RustXmtpClient = match history_sync_url { - Some(url) => { - ClientBuilder::new(identity_strategy) - .api_client(api_client) - .store(store) - .history_sync_url(&url) - .build() - .await? - } - None => { - ClientBuilder::new(identity_strategy) - .api_client(api_client) - .store(store) - .build() - .await? - } - }; + let mut builder = ClientBuilder::new(identity_strategy) + .api_client(api_client) + .store(store); + + if let Some(url) = &history_sync_url { + builder = builder.history_sync_url(url); + } + + let xmtp_client = builder.build().await?; log::info!( "Created XMTP client for inbox_id: {}", @@ -397,6 +389,26 @@ impl FfiXmtpClient { .register_identity(signature_request.clone()) .await?; + self.maybe_start_sync_worker().await?; + + Ok(()) + } + + /// Starts the sync worker if the history sync url is present. + async fn maybe_start_sync_worker(&self) -> Result<(), GenericError> { + if self.inner_client.history_sync_url().is_none() { + return Ok(()); + } + + let provider = self + .inner_client + .mls_provider() + .map_err(GenericError::from_error)?; + self.inner_client + .start_sync_worker(&provider) + .await + .map_err(GenericError::from_error)?; + Ok(()) } diff --git a/dev/docker/docker-compose.yml b/dev/docker/docker-compose.yml index 902586445..f9b8197a3 100644 --- a/dev/docker/docker-compose.yml +++ b/dev/docker/docker-compose.yml @@ -36,6 +36,12 @@ services: ports: - 8545:8545 + history-server: + image: ghcr.io/xmtp/message-history-server:main + platform: linux/amd64 + ports: + - 5558:5558 + db: image: postgres:13 environment: diff --git a/examples/cli/cli-client.rs b/examples/cli/cli-client.rs index b14b14076..746c3890d 100755 --- a/examples/cli/cli-client.rs +++ b/examples/cli/cli-client.rs @@ -125,9 +125,6 @@ enum Commands { account_addresses: Vec, }, RequestHistorySync {}, - ReplyToHistorySyncRequest {}, - ProcessHistorySyncReply {}, - ProcessConsentSyncReply {}, ListHistorySyncMessages {}, /// Information about the account that owns the DB Info {}, @@ -415,60 +412,19 @@ async fn main() -> color_eyre::eyre::Result<()> { ); } Commands::RequestHistorySync {} => { - let conn = client.store().conn()?; - let provider = client.mls_provider()?; - client.sync_welcomes(&conn).await?; - client.enable_sync(&provider).await?; - let (group_id, _) = client - .send_sync_request(&provider, DeviceSyncKind::MessageHistory) - .await?; - let group_id_str = hex::encode(group_id); - info!( - group_id = group_id_str, - "Sent history sync request in sync group {group_id_str}" - ); - } - Commands::ReplyToHistorySyncRequest {} => { - let provider = client.mls_provider()?; - let group = client.get_sync_group()?; - let group_id_str = hex::encode(group.group_id); - let reply = client - .reply_to_sync_request(&provider, DeviceSyncKind::MessageHistory) - .await?; - - info!( - group_id = group_id_str, - "Sent history sync reply in sync group {group_id_str}" - ); - info!("Reply: {:?}", reply); - } - Commands::ProcessHistorySyncReply {} => { - let conn = client.store().conn()?; - let provider = client.mls_provider()?; - client.sync_welcomes(&conn).await?; - client.enable_sync(&provider).await?; + let conn = client.store().conn().unwrap(); + let provider = client.mls_provider().unwrap(); + client.sync_welcomes(&conn).await.unwrap(); + client.start_sync_worker(&provider).await.unwrap(); client - .process_sync_reply(&provider, DeviceSyncKind::MessageHistory) - .await?; - - info!("History bundle downloaded and inserted into user DB") - } - Commands::ProcessConsentSyncReply {} => { - let conn = client.store().conn()?; - let provider = client.mls_provider()?; - client.sync_welcomes(&conn).await?; - client.enable_sync(&provider).await?; - client - .process_sync_reply(&provider, DeviceSyncKind::Consent) - .await?; - - info!("Consent bundle downloaded and inserted into user DB") + .send_sync_request(&provider, DeviceSyncKind::MessageHistory) + .await + .unwrap(); + info!("Sent history sync request in sync group.") } Commands::ListHistorySyncMessages {} => { let conn = client.store().conn()?; - let provider = client.mls_provider()?; client.sync_welcomes(&conn).await?; - client.enable_sync(&provider).await?; let group = client.get_sync_group()?; let group_id_str = hex::encode(group.group_id.clone()); group.sync().await?; @@ -504,7 +460,7 @@ async fn main() -> color_eyre::eyre::Result<()> { Ok(()) } -async fn create_client( +async fn create_client( cli: &Cli, account: IdentityStrategy, grpc: C, @@ -531,7 +487,7 @@ async fn register( client: C, ) -> Result<(), CliError> where - C: XmtpApi, + C: XmtpApi + 'static, { let w: Wallet = if let Some(seed_phrase) = maybe_seed_phrase { Wallet::LocalWallet( diff --git a/xmtp_mls/src/builder.rs b/xmtp_mls/src/builder.rs index f48cb0eda..f7c3672b1 100644 --- a/xmtp_mls/src/builder.rs +++ b/xmtp_mls/src/builder.rs @@ -43,6 +43,8 @@ pub enum ClientBuilderError { GroupError(#[from] crate::groups::GroupError), #[error(transparent)] ApiError(#[from] xmtp_proto::api_client::Error), + #[error(transparent)] + DeviceSync(#[from] crate::groups::device_sync::DeviceSyncError), } pub struct ClientBuilder> { @@ -108,8 +110,8 @@ impl ClientBuilder { impl ClientBuilder where - ApiClient: XmtpApi, - V: SmartContractSignatureVerifier, + ApiClient: XmtpApi + 'static, + V: SmartContractSignatureVerifier + 'static, { /// Build with a custom smart contract wallet verifier pub async fn build_with_verifier(self) -> Result, ClientBuilderError> { @@ -120,7 +122,7 @@ where impl ClientBuilder> where - ApiClient: XmtpApi, + ApiClient: XmtpApi + 'static, { /// Build with the default [`RemoteSignatureVerifier`] pub async fn build(self) -> Result, ClientBuilderError> { @@ -161,8 +163,8 @@ async fn inner_build( api_client: Arc, ) -> Result, ClientBuilderError> where - C: XmtpApi, - V: SmartContractSignatureVerifier, + C: XmtpApi + 'static, + V: SmartContractSignatureVerifier + 'static, { let ClientBuilder { mut store, @@ -198,15 +200,13 @@ where ) .await?; - let client = Client::new( + Ok(Client::new( api_client_wrapper, identity, store, scw_verifier, - history_sync_url, - ); - - Ok(client) + history_sync_url.clone(), + )) } #[cfg(test)] diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index a14f2dcdd..2eadd83b2 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -47,11 +47,10 @@ use crate::{ mutex_registry::MutexRegistry, retry::Retry, retry_async, retryable, - storage::group::GroupQueryArgs, storage::{ consent_record::{ConsentState, ConsentType, StoredConsentRecord}, db_connection::DbConnection, - group::{GroupMembershipState, StoredGroup}, + group::{GroupMembershipState, GroupQueryArgs, StoredGroup}, group_message::StoredGroupMessage, refresh_state::EntityKind, sql_key_store, EncryptedMessageStore, StorageError, @@ -311,7 +310,8 @@ where let intents = Arc::new(Intents { context: context.clone(), }); - let (tx, _) = broadcast::channel(10); + let (tx, _) = broadcast::channel(32); + Self { api_client: api_client.into(), context, @@ -350,6 +350,10 @@ where self.context.mls_provider() } + pub fn history_sync_url(&self) -> Option<&String> { + self.history_sync_url.as_ref() + } + /// Calls the server to look up the `inbox_id` associated with a given address pub async fn find_inbox_id_from_address( &self, diff --git a/xmtp_mls/src/groups/device_sync.rs b/xmtp_mls/src/groups/device_sync.rs index b00c22edb..266c23301 100644 --- a/xmtp_mls/src/groups/device_sync.rs +++ b/xmtp_mls/src/groups/device_sync.rs @@ -1,12 +1,13 @@ use super::group_metadata::ConversationType; use super::{GroupError, MlsGroup}; use crate::configuration::NS_IN_HOUR; +use crate::retry::{RetryBuilder, RetryableError}; use crate::storage::group::GroupQueryArgs; use crate::storage::group_message::MsgQueryArgs; use crate::storage::DbConnection; +use crate::subscriptions::{StreamMessages, SubscribeError, SyncMessage}; use crate::utils::time::now_ns; use crate::xmtp_openmls_provider::XmtpOpenMlsProvider; -use crate::Store; use crate::{ client::ClientError, storage::{ @@ -17,16 +18,19 @@ use crate::{ }, Client, }; +use crate::{retry_async, Store}; use aes_gcm::aead::generic_array::GenericArray; use aes_gcm::{ aead::{Aead, KeyInit}, Aes256Gcm, }; +use futures::{pin_mut, Stream, StreamExt}; use rand::{ distributions::{Alphanumeric, DistString}, Rng, RngCore, }; use serde::{Deserialize, Serialize}; +use std::time::Duration; use thiserror::Error; use tracing::warn; use xmtp_cryptography::utils as crypto_utils; @@ -101,57 +105,153 @@ pub enum DeviceSyncError { UnspecifiedDeviceSyncKind, #[error("sync reply is too old")] SyncReplyTimestamp, + #[error(transparent)] + Subscribe(#[from] SubscribeError), +} + +impl RetryableError for DeviceSyncError { + fn is_retryable(&self) -> bool { + true + } } impl Client where - ApiClient: XmtpApi, - V: SmartContractSignatureVerifier, + ApiClient: XmtpApi + Send + Sync + 'static, + V: SmartContractSignatureVerifier + Send + Sync + 'static, { - pub async fn reply_to_sync_request( + pub async fn start_sync_worker( &self, provider: &XmtpOpenMlsProvider, - kind: DeviceSyncKind, - ) -> Result { - let conn = provider.conn_ref(); + ) -> Result<(), DeviceSyncError> { + self.sync_init(provider).await?; - let (_msg, request) = self.pending_sync_request(provider, kind).await?; - let records = match kind { - DeviceSyncKind::Consent => vec![self.syncable_consent_records(conn)?], - DeviceSyncKind::MessageHistory => { - vec![self.syncable_groups(conn)?, self.syncable_messages(conn)?] + crate::spawn(None, { + let client = self.clone(); + + let receiver = client.local_events.subscribe(); + let sync_stream = receiver.stream_sync_messages(); + + async move { + pin_mut!(sync_stream); + + while let Err(err) = client.sync_worker(&mut sync_stream).await { + tracing::error!("Sync worker error: {err}"); + } } - DeviceSyncKind::Unspecified => return Err(DeviceSyncError::UnspecifiedDeviceSyncKind), - }; + }); - let reply = self - .create_sync_reply(&request.request_id, &records, kind) - .await?; - self.send_sync_reply(provider, reply.clone()).await?; + Ok(()) + } +} - Ok(reply) +impl Client +where + ApiClient: XmtpApi, + V: SmartContractSignatureVerifier, +{ + pub(crate) async fn sync_worker( + &self, + sync_stream: &mut (impl Stream> + Unpin), + ) -> Result<(), DeviceSyncError> { + let provider = self.mls_provider()?; + + let query_retry = RetryBuilder::default() + .retries(5) + .duration(Duration::from_millis(20)) + .build(); + + while let Some(msg) = sync_stream.next().await { + let msg = msg?; + match msg { + SyncMessage::Reply { message_id } => { + let conn = provider.conn_ref(); + let msg = retry_async!( + &query_retry, + (async { + conn.get_group_message(&message_id)? + .ok_or(DeviceSyncError::Storage(StorageError::NotFound(format!( + "Message id {message_id:?} not found." + )))) + }) + )?; + + let msg_content: DeviceSyncContent = + serde_json::from_slice(&msg.decrypted_message_bytes)?; + let DeviceSyncContent::Reply(reply) = msg_content else { + unreachable!(); + }; + + if let Err(err) = self.process_sync_reply(&provider, reply).await { + tracing::warn!("Sync worker error: {err}"); + } + } + SyncMessage::Request { message_id } => { + let conn = provider.conn_ref(); + let msg = retry_async!( + &query_retry, + (async { + conn.get_group_message(&message_id)? + .ok_or(DeviceSyncError::Storage(StorageError::NotFound(format!( + "Message id {message_id:?} not found." + )))) + }) + )?; + + let msg_content: DeviceSyncContent = + serde_json::from_slice(&msg.decrypted_message_bytes)?; + let DeviceSyncContent::Request(request) = msg_content else { + unreachable!(); + }; + + if let Err(err) = self.reply_to_sync_request(&provider, request).await { + tracing::warn!("Sync worker error: {err}"); + } + } + } + } + + Ok(()) + } + + /** + * Ideally called when the client is registered. + * Will auto-send a sync request if sync group is created. + */ + pub async fn sync_init(&self, provider: &XmtpOpenMlsProvider) -> Result<(), DeviceSyncError> { + if self.get_sync_group().is_err() { + self.ensure_sync_group(provider).await?; + + self.send_sync_request(provider, DeviceSyncKind::Consent) + .await?; + self.send_sync_request(provider, DeviceSyncKind::MessageHistory) + .await?; + } + + Ok(()) } - pub async fn enable_sync(&self, provider: &XmtpOpenMlsProvider) -> Result<(), GroupError> { + async fn ensure_sync_group( + &self, + provider: &XmtpOpenMlsProvider, + ) -> Result, GroupError> { let sync_group = match self.get_sync_group() { Ok(group) => group, Err(_) => self.create_sync_group()?, }; - sync_group .maybe_update_installations(provider, None) .await?; - sync_group.sync_with_conn(provider).await?; - Ok(()) + Ok(sync_group) } pub async fn send_sync_request( &self, provider: &XmtpOpenMlsProvider, kind: DeviceSyncKind, - ) -> Result<(String, String), DeviceSyncError> { + ) -> Result { let request = DeviceSyncRequest::new(kind); // find the sync group @@ -162,33 +262,54 @@ where // lookup if a request has already been made if let Ok((_msg, request)) = self.pending_sync_request(provider, request.kind).await { - return Ok((request.request_id, request.pin_code)); + return Ok(request); } // build the request let request: DeviceSyncRequestProto = request.into(); - let pin_code = request.pin_code.clone(); - let request_id = request.request_id.clone(); let content = DeviceSyncContent::Request(request.clone()); let content_bytes = serde_json::to_vec(&content)?; - let _message_id = - sync_group.prepare_message(&content_bytes, provider.conn_ref(), move |_time_ns| { - PlaintextEnvelope { - content: Some(Content::V2(V2 { - message_type: Some(Request(request)), - idempotency_key: new_request_id(), - })), - } - })?; + let _message_id = sync_group.prepare_message(&content_bytes, provider.conn_ref(), { + let request = request.clone(); + move |_time_ns| PlaintextEnvelope { + content: Some(Content::V2(V2 { + message_type: Some(Request(request)), + idempotency_key: new_request_id(), + })), + } + })?; // publish the intent if let Err(err) = sync_group.publish_intents(provider).await { tracing::error!("error publishing sync group intents: {:?}", err); } - Ok((request_id, pin_code)) + Ok(request) + } + + pub(crate) async fn reply_to_sync_request( + &self, + provider: &XmtpOpenMlsProvider, + request: DeviceSyncRequestProto, + ) -> Result { + let conn = provider.conn_ref(); + + let records = match request.kind() { + DeviceSyncKind::Consent => vec![self.syncable_consent_records(conn)?], + DeviceSyncKind::MessageHistory => { + vec![self.syncable_groups(conn)?, self.syncable_messages(conn)?] + } + DeviceSyncKind::Unspecified => return Err(DeviceSyncError::UnspecifiedDeviceSyncKind), + }; + + let reply = self + .create_sync_reply(&request.request_id, &records, request.kind()) + .await?; + self.send_sync_reply(provider, reply.clone()).await?; + + Ok(reply) } async fn send_sync_reply( @@ -264,41 +385,37 @@ where Err(DeviceSyncError::NoPendingRequest) } - /// Look for sync reply by kind, returns NoReplyToProcess error if not found. + #[cfg(test)] async fn sync_reply( &self, provider: &XmtpOpenMlsProvider, kind: DeviceSyncKind, - ) -> Result { + ) -> Result, DeviceSyncError> { let sync_group = self.get_sync_group()?; - sync_group.sync_with_conn(provider).await?; + let messages = sync_group .find_messages(&MsgQueryArgs::default().kind(GroupMessageKind::Application))?; - for msg in messages.iter().rev() { - // ignore this installation's messages - if msg.sender_installation_id == self.installation_public_key() { - continue; - } - - let content: DeviceSyncContent = serde_json::from_slice(&msg.decrypted_message_bytes)?; - if let DeviceSyncContent::Reply(reply) = content { - if reply.kind() == kind { - return Ok(reply); + for msg in messages.into_iter().rev() { + let msg_content: DeviceSyncContent = + serde_json::from_slice(&msg.decrypted_message_bytes)?; + match msg_content { + DeviceSyncContent::Reply(reply) if reply.kind() == kind => { + return Ok(Some((msg, reply))); } + _ => {} } } - Err(DeviceSyncError::NoReplyToProcess) + Ok(None) } pub async fn process_sync_reply( &self, provider: &XmtpOpenMlsProvider, - kind: DeviceSyncKind, + reply: DeviceSyncReplyProto, ) -> Result<(), DeviceSyncError> { - let reply = self.sync_reply(provider, kind).await?; let conn = provider.conn_ref(); let time_diff = reply.timestamp_ns.abs_diff(now_ns() as u64); @@ -353,10 +470,11 @@ where let Some(url) = &self.history_sync_url else { return Err(DeviceSyncError::MissingHistorySyncUrl); }; - tracing::info!("Using upload url {url}upload"); + let upload_url = format!("{url}/upload"); + tracing::info!("Using upload url {upload_url}"); let response = reqwest::Client::new() - .post(format!("{url}/upload")) + .post(upload_url) .body(payload) .send() .await?; @@ -385,7 +503,7 @@ where } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] pub enum DeviceSyncContent { Request(DeviceSyncRequestProto), Reply(DeviceSyncReplyProto), diff --git a/xmtp_mls/src/groups/device_sync/consent_sync.rs b/xmtp_mls/src/groups/device_sync/consent_sync.rs index d1bfdf9a5..113e5b869 100644 --- a/xmtp_mls/src/groups/device_sync/consent_sync.rs +++ b/xmtp_mls/src/groups/device_sync/consent_sync.rs @@ -22,40 +22,27 @@ where #[cfg(all(not(target_arch = "wasm32"), test))] pub(crate) mod tests { - const HISTORY_SERVER_HOST: &str = "0.0.0.0"; + const HISTORY_SERVER_HOST: &str = "localhost"; const HISTORY_SERVER_PORT: u16 = 5558; + use std::time::{Duration, Instant}; + use super::*; use crate::{ assert_ok, builder::ClientBuilder, storage::consent_record::{ConsentState, ConsentType}, }; - use mockito; use xmtp_cryptography::utils::generate_local_wallet; use xmtp_id::InboxOwner; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_consent_sync() { - let options = mockito::ServerOpts { - host: HISTORY_SERVER_HOST, - port: HISTORY_SERVER_PORT + 1, - ..Default::default() - }; - let mut server = mockito::Server::new_with_opts_async(options).await; - - let _m = server - .mock("POST", "/upload") - .with_status(201) - .with_body("12345") - .create(); - - let history_sync_url = - format!("http://{}:{}", HISTORY_SERVER_HOST, HISTORY_SERVER_PORT + 1); + let history_sync_url = format!("http://{}:{}", HISTORY_SERVER_HOST, HISTORY_SERVER_PORT); let wallet = generate_local_wallet(); - let mut amal_a = ClientBuilder::new_test_client(&wallet).await; - amal_a.history_sync_url = Some(history_sync_url.clone()); + let amal_a = ClientBuilder::new_test_client_with_history(&wallet, &history_sync_url).await; + let amal_a_provider = amal_a.mls_provider().unwrap(); let amal_a_conn = amal_a_provider.conn_ref(); @@ -72,75 +59,49 @@ pub(crate) mod tests { let syncable_consent_records = amal_a.syncable_consent_records(amal_a_conn).unwrap(); assert_eq!(syncable_consent_records.len(), 1); - // The first installation should have zero sync groups. - let amal_a_sync_groups = amal_a.store().conn().unwrap().latest_sync_group().unwrap(); - assert!(amal_a_sync_groups.is_none()); - - // Create a second installation for amal. - let amal_b = ClientBuilder::new_test_client(&wallet).await; + // Create a second installation for amal with sync. + let amal_b = ClientBuilder::new_test_client_with_history(&wallet, &history_sync_url).await; let amal_b_provider = amal_b.mls_provider().unwrap(); - // Turn on history sync for the second installation. - assert_ok!(amal_b.enable_sync(&amal_b_provider).await); + let amal_b_conn = amal_b_provider.conn_ref(); + + let consent_records_b = amal_b.syncable_consent_records(amal_b_conn).unwrap(); + assert_eq!(consent_records_b.len(), 0); + + let old_group_id = amal_a.get_sync_group().unwrap().group_id; // Check for new welcomes to new groups in the first installation (should be welcomed to a new sync group from amal_b). - amal_a - .sync_welcomes(amal_a_conn) - .await - .expect("sync_welcomes"); - - // Have the second installation request for a consent sync. - let (_group_id, _pin_code) = amal_b - .send_sync_request(&amal_b_provider, DeviceSyncKind::Consent) - .await - .expect("history request"); - - // The first installation should now be a part of the sync group created by the second installation. - let amal_a_sync_groups = amal_a_conn.latest_sync_group().unwrap(); - assert!(amal_a_sync_groups.is_some()); - - // Have first installation reply. - // This is to make sure it finds the request in its sync group history, - // verifies the pin code, - // has no problem packaging the consent records, - // and sends a reply message to the first installation. - let reply = amal_a - .reply_to_sync_request(&amal_a_provider, DeviceSyncKind::Consent) - .await - .unwrap(); - - // recreate the encrypted payload that was uploaded to our mock server using the same encryption key... - let (enc_payload, _key) = encrypt_syncables_with_key( - &[amal_a.syncable_consent_records(amal_a_conn).unwrap()], - reply.encryption_key.unwrap().try_into().unwrap(), - ) - .unwrap(); - - // have the mock server reply with the payload - let file_path = reply.url.replace(&history_sync_url, ""); - let _m = server - .mock("GET", &*file_path) - .with_status(200) - .with_body(&enc_payload) - .create(); - - // The second installation has consented to nobody - let consent_records = amal_b.store().conn().unwrap().consent_records().unwrap(); - assert_eq!(consent_records.len(), 0); - - // Have the second installation process the reply. - amal_b - .process_sync_reply(&amal_b_provider, DeviceSyncKind::Consent) - .await - .unwrap(); - - // Load consents of both installations - let consent_records_a = amal_a.store().conn().unwrap().consent_records().unwrap(); - let consent_records_b = amal_b.store().conn().unwrap().consent_records().unwrap(); - - // Ensure the consent is synced. - assert_eq!(consent_records_a.len(), 2); // 2 consents - alix, and the group sync - assert_eq!(consent_records_b.len(), 2); - for record in &consent_records_a { - assert!(consent_records_b.contains(record)); + amal_a.sync_welcomes(amal_a_conn).await.unwrap(); + let new_group_id = amal_a.get_sync_group().unwrap().group_id; + // group id should have changed to the new sync group created by the second installation + assert_ne!(old_group_id, new_group_id); + + let consent_a = amal_a.syncable_consent_records(amal_a_conn).unwrap().len(); + + // Have amal_a receive the message (and auto-process) + let amal_a_sync_group = amal_a.get_sync_group().unwrap(); + assert_ok!(amal_a_sync_group.sync_with_conn(&amal_a_provider).await); + + // Wait for up to 3 seconds for the reply on amal_b (usually is almost instant) + let start = Instant::now(); + let mut reply = None; + while reply.is_none() { + reply = amal_b + .sync_reply(&amal_b_provider, DeviceSyncKind::Consent) + .await + .unwrap(); + if start.elapsed() > Duration::from_secs(3) { + panic!("Did not receive consent reply."); + } + } + + // Wait up to 3 seconds for sync to process (typically is almost instant) + let mut consent_b = 0; + let start = Instant::now(); + while consent_b != consent_a { + consent_b = amal_b.syncable_consent_records(amal_b_conn).unwrap().len(); + + if start.elapsed() > Duration::from_secs(3) { + panic!("Consent sync did not work. Consent: {consent_b}/{consent_a}"); + } } } } diff --git a/xmtp_mls/src/groups/device_sync/message_sync.rs b/xmtp_mls/src/groups/device_sync/message_sync.rs index 7e16e355a..f427e0b18 100644 --- a/xmtp_mls/src/groups/device_sync/message_sync.rs +++ b/xmtp_mls/src/groups/device_sync/message_sync.rs @@ -46,36 +46,22 @@ pub(crate) mod tests { #[cfg(target_arch = "wasm32")] wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_dedicated_worker); - const HISTORY_SERVER_HOST: &str = "0.0.0.0"; + const HISTORY_SERVER_HOST: &str = "localhost"; const HISTORY_SERVER_PORT: u16 = 5558; use super::*; use crate::{assert_ok, builder::ClientBuilder, groups::GroupMetadataOptions}; - use mockito; + use std::time::{Duration, Instant}; use xmtp_cryptography::utils::generate_local_wallet; use xmtp_id::InboxOwner; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_message_history_sync() { - let options = mockito::ServerOpts { - host: HISTORY_SERVER_HOST, - port: HISTORY_SERVER_PORT + 1, - ..Default::default() - }; - let mut server = mockito::Server::new_with_opts_async(options).await; - - let _m = server - .mock("POST", "/upload") - .with_status(201) - .with_body("12345") - .create(); - - let history_sync_url = - format!("http://{}:{}", HISTORY_SERVER_HOST, HISTORY_SERVER_PORT + 1); + let history_sync_url = format!("http://{}:{}", HISTORY_SERVER_HOST, HISTORY_SERVER_PORT); let wallet = generate_local_wallet(); - let mut amal_a = ClientBuilder::new_test_client(&wallet).await; - amal_a.history_sync_url = Some(history_sync_url.clone()); + let amal_a = ClientBuilder::new_test_client_with_history(&wallet, &history_sync_url).await; + let amal_a_provider = amal_a.mls_provider().unwrap(); let amal_a_conn = amal_a_provider.conn_ref(); @@ -84,7 +70,6 @@ pub(crate) mod tests { let alix = ClientBuilder::new_test_client(&alix_wallet).await; // Have amal_a create a group and add alix to that group, then send a message. - let group = amal_a .create_group(None, GroupMetadataOptions::default()) .unwrap(); @@ -100,85 +85,59 @@ pub(crate) mod tests { let syncable_messages = amal_a.syncable_messages(amal_a_conn).unwrap(); assert_eq!(syncable_messages.len(), 2); // welcome message, and message that was just sent - // The first installation should have zero sync groups. - let amal_a_sync_group = amal_a_conn.latest_sync_group().unwrap(); - assert!(amal_a_sync_group.is_none()); - // Create a second installation for amal. - let amal_b = ClientBuilder::new_test_client(&wallet).await; + let amal_b = ClientBuilder::new_test_client_with_history(&wallet, &history_sync_url).await; let amal_b_provider = amal_b.mls_provider().unwrap(); let amal_b_conn = amal_b_provider.conn_ref(); - // Turn on history sync for the second installation. - assert_ok!(amal_b.enable_sync(&amal_b_provider).await); + + let groups_b = amal_b.syncable_groups(amal_b_conn).unwrap(); + assert_eq!(groups_b.len(), 0); + + let old_group_id = amal_a.get_sync_group().unwrap().group_id; // Check for new welcomes to new groups in the first installation (should be welcomed to a new sync group from amal_b). amal_a .sync_welcomes(amal_a_conn) .await .expect("sync_welcomes"); + let new_group_id = amal_a.get_sync_group().unwrap().group_id; + // group id should have changed to the new sync group created by the second installation + assert_ne!(old_group_id, new_group_id); + // Have the second installation request for a consent sync. - let (_group_id, _pin_code) = amal_b + amal_b .send_sync_request(&amal_b_provider, DeviceSyncKind::MessageHistory) .await - .expect("history request"); - - // The first installation should now be a part of the sync group created by the second installation. - let amal_a_sync_group = amal_a_conn.latest_sync_group().unwrap(); - assert!(amal_a_sync_group.is_some()); - - // Have first installation reply. - // This is to make sure it finds the request in its sync group history, - // verifies the pin code, - // has no problem packaging the consent records, - // and sends a reply message to the first installation. - let reply = amal_a - .reply_to_sync_request(&amal_a_provider, DeviceSyncKind::MessageHistory) - .await .unwrap(); - // recreate the encrypted payload that was uploaded to our mock server using the same encryption key... - let (enc_payload, _key) = encrypt_syncables_with_key( - &[ - amal_a.syncable_groups(amal_a_conn).unwrap(), - amal_a.syncable_messages(amal_a_conn).unwrap(), - ], - reply.encryption_key.unwrap().try_into().unwrap(), - ) - .unwrap(); - - // have the mock server reply with the payload - let file_path = reply.url.replace(&history_sync_url, ""); - let _m = server - .mock("GET", &*file_path) - .with_status(200) - .with_body(&enc_payload) - .create(); - - // The second installation has no groups - assert_eq!(amal_b.syncable_groups(amal_b_conn).unwrap().len(), 0); - assert_eq!(amal_b.syncable_messages(amal_b_conn).unwrap().len(), 0); - - // Have the second installation process the reply. - amal_b - .process_sync_reply(&amal_b_provider, DeviceSyncKind::MessageHistory) - .await - .unwrap(); + // Have amal_a receive the message (and auto-process) + let amal_a_sync_group = amal_a.get_sync_group().unwrap(); + assert_ok!(amal_a_sync_group.sync_with_conn(&amal_a_provider).await); - // Load consents of both installations - let groups_a = amal_a.syncable_groups(amal_a_conn).unwrap(); - let groups_b = amal_b.syncable_groups(amal_b_conn).unwrap(); - let messages_a = amal_a.syncable_messages(amal_a_conn).unwrap(); - let messages_b = amal_b.syncable_messages(amal_b_conn).unwrap(); - - // Ensure the groups and messages are synced. - assert_eq!(groups_a.len(), 1); - assert_eq!(groups_b.len(), 1); - for record in &groups_a { - assert!(groups_b.contains(record)); + // Wait for up to 3 seconds for the reply on amal_b (usually is almost instant) + let start = Instant::now(); + let mut reply = None; + while reply.is_none() { + reply = amal_b + .sync_reply(&amal_b_provider, DeviceSyncKind::MessageHistory) + .await + .unwrap(); + if start.elapsed() > Duration::from_secs(3) { + panic!("Did not receive consent reply."); + } } - assert_eq!(messages_a.len(), 2); - assert_eq!(messages_b.len(), 2); - for record in &messages_a { - assert!(messages_b.contains(record)); + + // Wait up to 3 seconds for sync to process (typically is almost instant) + let [mut groups_a, mut groups_b, mut messages_a, mut messages_b] = [0; 4]; + let start = Instant::now(); + while groups_a != groups_b || messages_a != messages_b { + groups_a = amal_a.syncable_groups(amal_a_conn).unwrap().len(); + groups_b = amal_b.syncable_groups(amal_b_conn).unwrap().len(); + messages_a = amal_a.syncable_messages(amal_a_conn).unwrap().len(); + messages_b = amal_b.syncable_messages(amal_b_conn).unwrap().len(); + + if start.elapsed() > Duration::from_secs(3) { + panic!("Message sync did not work. Groups: {groups_a}/{groups_b} | Messages: {messages_a}/{messages_b}"); + } } } @@ -201,20 +160,17 @@ pub(crate) mod tests { #[tokio::test] async fn test_externals_cant_join_sync_group() { + let history_sync_url = format!("http://{}:{}", HISTORY_SERVER_HOST, HISTORY_SERVER_PORT); let wallet = generate_local_wallet(); - let amal = ClientBuilder::new_test_client(&wallet).await; - assert_ok!(amal.enable_sync(&amal.mls_provider().unwrap()).await); + let amal = ClientBuilder::new_test_client_with_history(&wallet, &history_sync_url).await; amal.sync_welcomes(&amal.store().conn().unwrap()) .await .expect("sync welcomes"); let external_wallet = generate_local_wallet(); - let external_client = ClientBuilder::new_test_client(&external_wallet).await; - assert_ok!( - external_client - .enable_sync(&external_client.mls_provider().unwrap()) - .await - ); + let external_client = + ClientBuilder::new_test_client_with_history(&external_wallet, &history_sync_url).await; + external_client .sync_welcomes(&external_client.store().conn().unwrap()) .await diff --git a/xmtp_mls/src/groups/node_sync.rs b/xmtp_mls/src/groups/node_sync.rs index 46d1f3316..5026a2217 100644 --- a/xmtp_mls/src/groups/node_sync.rs +++ b/xmtp_mls/src/groups/node_sync.rs @@ -14,7 +14,6 @@ use super::{ GroupError, MlsGroup, ScopedGroupClient, }; -use crate::groups::device_sync::DeviceSyncContent; use crate::{ client::MessageProcessingError, codecs::{group_updated::GroupUpdatedCodec, ContentCodec}, @@ -35,10 +34,12 @@ use crate::{ refresh_state::EntityKind, serialization::{db_deserialize, db_serialize}, }, + subscriptions::LocalEvents, utils::{hash::sha256, id::calculate_message_id}, xmtp_openmls_provider::XmtpOpenMlsProvider, Delete, Fetch, StoreOrIgnore, }; +use crate::{groups::device_sync::DeviceSyncContent, subscriptions::SyncMessage}; use futures::future::try_join_all; use openmls::{ credentials::BasicCredential, @@ -362,6 +363,8 @@ where let decrypted_message = openmls_group.process_message(provider, message)?; let (sender_inbox_id, sender_installation_id) = extract_message_sender(openmls_group, &decrypted_message, envelope_timestamp_ns)?; + let sent_from_this_installation = + sender_installation_id == self.context().installation_public_key(); tracing::info!( "[{}] extracted sender inbox id: {}", self.context().inbox_id(), @@ -414,7 +417,7 @@ where // store the request message StoredGroupMessage { - id: message_id, + id: message_id.clone(), group_id: self.group_id.clone(), decrypted_message_bytes: content_bytes, sent_at_ns: envelope_timestamp_ns as i64, @@ -424,6 +427,13 @@ where delivery_status: DeliveryStatus::Published, } .store_or_ignore(provider.conn_ref())?; + + // Ignore this installation's sync messages + if !sent_from_this_installation { + let _ = self.client.local_events().send(LocalEvents::SyncMessage( + SyncMessage::Request { message_id }, + )); + } } Some(Reply(history_reply)) => { @@ -438,7 +448,7 @@ where // store the reply message StoredGroupMessage { - id: message_id, + id: message_id.clone(), group_id: self.group_id.clone(), decrypted_message_bytes: content_bytes, sent_at_ns: envelope_timestamp_ns as i64, @@ -448,6 +458,13 @@ where delivery_status: DeliveryStatus::Published, } .store_or_ignore(provider.conn_ref())?; + + // Ignore this installation's sync messages + if !sent_from_this_installation { + let _ = self.client.local_events().send(LocalEvents::SyncMessage( + SyncMessage::Reply { message_id }, + )); + } } _ => { return Err(MessageProcessingError::InvalidPayload); diff --git a/xmtp_mls/src/groups/scoped_client.rs b/xmtp_mls/src/groups/scoped_client.rs index be4b814d7..0808188ae 100644 --- a/xmtp_mls/src/groups/scoped_client.rs +++ b/xmtp_mls/src/groups/scoped_client.rs @@ -1,5 +1,5 @@ use std::sync::Arc; - +use tokio::sync::broadcast; use xmtp_id::{associations::AssociationState, scw_verifier::SmartContractSignatureVerifier}; use xmtp_proto::{api_client::trait_impls::XmtpApi, xmtp::mls::api::v1::GroupMessage}; @@ -9,6 +9,7 @@ use crate::{ identity_updates::{InstallationDiff, InstallationDiffError}, intents::Intents, storage::{DbConnection, EncryptedMessageStore}, + subscriptions::LocalEvents, verified_key_package_v2::VerifiedKeyPackageV2, xmtp_openmls_provider::XmtpOpenMlsProvider, Client, @@ -28,6 +29,8 @@ pub trait LocalScopedGroupClient: Send + Sync + Sized { self.context_ref().store() } + fn local_events(&self) -> &broadcast::Sender>; + fn inbox_id(&self) -> String { self.context().inbox_id() } @@ -88,6 +91,8 @@ pub trait ScopedGroupClient: Sized { self.context_ref().store() } + fn local_events(&self) -> &broadcast::Sender>; + fn inbox_id(&self) -> String { self.context().inbox_id() } @@ -148,6 +153,10 @@ where &self.api_client } + fn local_events(&self) -> &broadcast::Sender> { + &self.local_events + } + fn context_ref(&self) -> &Arc { Client::::context(self) } @@ -227,6 +236,10 @@ where (**self).api() } + fn local_events(&self) -> &broadcast::Sender> { + (**self).local_events() + } + fn store(&self) -> &EncryptedMessageStore { (**self).store() } @@ -317,6 +330,10 @@ where (**self).store() } + fn local_events(&self) -> &broadcast::Sender> { + (**self).local_events() + } + fn intents(&self) -> &Arc { (**self).intents() } diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index 34588857a..b23808a9c 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -184,6 +184,7 @@ pub mod private { tracing::info!("Migrations successful"); Ok::<_, StorageError>(()) })?; + Ok::<_, StorageError>(()) } diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index 2a0939db0..a41e121e7 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -1,19 +1,23 @@ -use std::{collections::HashMap, sync::Arc}; - use futures::{FutureExt, Stream, StreamExt}; use prost::Message; -use tokio::{sync::oneshot, task::JoinHandle}; +use std::{collections::HashMap, sync::Arc}; +use tokio::{ + sync::{broadcast, oneshot}, + task::JoinHandle, +}; +use tokio_stream::wrappers::BroadcastStream; use xmtp_id::scw_verifier::SmartContractSignatureVerifier; use xmtp_proto::{api_client::XmtpMlsStreams, xmtp::mls::api::v1::WelcomeMessage}; use crate::{ client::{extract_welcome_message, ClientError, MessageProcessingError}, groups::{group_metadata::ConversationType, subscriptions, GroupError, MlsGroup}, - retry::Retry, - retry::RetryableError, + retry::{Retry, RetryableError}, retry_async, retryable, storage::{ - group::GroupQueryArgs, group::StoredGroup, group_message::StoredGroupMessage, StorageError, + group::{GroupQueryArgs, StoredGroup}, + group_message::StoredGroupMessage, + StorageError, }, Client, XmtpApi, }; @@ -28,32 +32,55 @@ pub struct StreamHandle { /// Events local to this client /// are broadcast across all senders/receivers of streams -pub(crate) enum LocalEvents { +#[derive(Clone)] +pub enum LocalEvents { // a new group was created NewGroup(MlsGroup), + SyncMessage(SyncMessage), +} + +#[derive(Clone)] +pub enum SyncMessage { + Request { message_id: Vec }, + Reply { message_id: Vec }, } impl LocalEvents { fn group_filter(self) -> Option> { use LocalEvents::*; // this is just to protect against any future variants - #[allow(unreachable_patterns)] match self { NewGroup(c) => Some(c), _ => None, } } -} -impl Clone for LocalEvents { - fn clone(&self) -> LocalEvents { + pub(crate) fn sync_filter(self) -> Option { use LocalEvents::*; match self { - NewGroup(group) => NewGroup(group.clone()), + SyncMessage(msg) => Some(msg), + _ => None, } } } +pub(crate) trait StreamMessages { + fn stream_sync_messages(self) -> impl Stream>; +} + +impl StreamMessages for broadcast::Receiver> +where + C: Clone + Send + Sync + 'static, +{ + fn stream_sync_messages(self) -> impl Stream> { + BroadcastStream::new(self).filter_map(|event| async { + crate::optify!(event, "Missed message due to event queue lag") + .and_then(LocalEvents::sync_filter) + .map(Result::Ok) + }) + } +} + impl StreamHandle { /// Waits for the stream to be fully spawned pub async fn wait_for_ready(&mut self) { @@ -199,7 +226,7 @@ where ) .filter_map(|event| async { crate::optify!(event, "Missed messages due to event queue lag") - .and_then(LocalEvents::<_>::group_filter) + .and_then(LocalEvents::group_filter) .map(Result::Ok) }); diff --git a/xmtp_mls/src/utils/test/mod.rs b/xmtp_mls/src/utils/test/mod.rs index d86a8177b..1616556ea 100755 --- a/xmtp_mls/src/utils/test/mod.rs +++ b/xmtp_mls/src/utils/test/mod.rs @@ -110,6 +110,22 @@ impl ClientBuilder { owner, api_client, MockSmartContractSignatureVerifier::new(true), + None, + ) + .await + } + + pub async fn new_test_client_with_history( + owner: &impl InboxOwner, + history_sync_url: &str, + ) -> FullXmtpClient { + let api_client = ::create_local().await; + + build_with_verifier( + owner, + api_client, + MockSmartContractSignatureVerifier::new(true), + Some(history_sync_url), ) .await } @@ -124,6 +140,7 @@ impl ClientBuilder { owner, api_client, MockSmartContractSignatureVerifier::new(true), + None, ) .await } @@ -153,7 +170,7 @@ impl ClientBuilder { async fn inner_build(owner: impl InboxOwner, api_client: A) -> Client where - A: XmtpApi, + A: XmtpApi + 'static, { let nonce = 1; let inbox_id = generate_inbox_id(&owner.get_address(), &nonce).unwrap(); @@ -182,32 +199,39 @@ async fn build_with_verifier( owner: impl InboxOwner, api_client: A, scw_verifier: V, + history_sync_url: Option<&str>, ) -> Client where - A: XmtpApi, - V: SmartContractSignatureVerifier, + A: XmtpApi + Send + Sync + 'static, + V: SmartContractSignatureVerifier + Send + Sync + 'static, { let nonce = 1; let inbox_id = generate_inbox_id(&owner.get_address(), &nonce).unwrap(); - let client = Client::::builder(IdentityStrategy::CreateIfNotFound( + let mut builder = Client::::builder(IdentityStrategy::CreateIfNotFound( inbox_id, owner.get_address(), nonce, None, - )); + )) + .temp_store() + .await + .api_client(api_client) + .scw_signature_verifier(scw_verifier); + + if let Some(history_sync_url) = history_sync_url { + builder = builder.history_sync_url(history_sync_url); + } - let client = client - .temp_store() - .await - .api_client(api_client) - .scw_signature_verifier(scw_verifier) - .build_with_verifier() - .await - .unwrap(); + let client = builder.build_with_verifier().await.unwrap(); register_client(&client, owner).await; + if client.history_sync_url.is_some() { + let provider = client.mls_provider().unwrap(); + client.start_sync_worker(&provider).await.unwrap(); + } + client }