Skip to content

Commit

Permalink
Automatic sync - sync worker (#1207)
Browse files Browse the repository at this point in the history
* make local events available

* wip

* wip

* bindings

* node bindings

* wasm bindings

* cli

* lint

* cleanup

* stream

* process replies, and respond to requests

* wip

* wip

* lifetime pollution

* wip

* consent test passes

* complete the test

* improve consent sync test

* cleanup

* update history sync test

* message history server

* flaky tests

* cleanup

* cleanup

* cleanup

* worker don't die

* debounce

* lint

* fix

* remove unwraps

* start the sync worker after registering

* fix wasm tests

* add streaming cfg for wasm

* wip

* wip

* move the constraints up a level

* cleanup

* rename

* unreachable

* error update

* wasm send theory

* wip

* cleanup

---------

Co-authored-by: Mojtaba Chenani <[email protected]>
  • Loading branch information
codabrink and mchenani authored Nov 7, 2024
1 parent 424319a commit 0d3bf35
Show file tree
Hide file tree
Showing 13 changed files with 447 additions and 348 deletions.
46 changes: 29 additions & 17 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,23 +117,15 @@ pub async fn create_client(
legacy_signed_private_key_proto,
);

let xmtp_client: RustXmtpClient = match history_sync_url {
Some(url) => {
ClientBuilder::new(identity_strategy)
.api_client(api_client)
.store(store)
.history_sync_url(&url)
.build()
.await?
}
None => {
ClientBuilder::new(identity_strategy)
.api_client(api_client)
.store(store)
.build()
.await?
}
};
let mut builder = ClientBuilder::new(identity_strategy)
.api_client(api_client)
.store(store);

if let Some(url) = &history_sync_url {
builder = builder.history_sync_url(url);
}

let xmtp_client = builder.build().await?;

log::info!(
"Created XMTP client for inbox_id: {}",
Expand Down Expand Up @@ -397,6 +389,26 @@ impl FfiXmtpClient {
.register_identity(signature_request.clone())
.await?;

self.maybe_start_sync_worker().await?;

Ok(())
}

/// Starts the sync worker if the history sync url is present.
async fn maybe_start_sync_worker(&self) -> Result<(), GenericError> {
if self.inner_client.history_sync_url().is_none() {
return Ok(());
}

let provider = self
.inner_client
.mls_provider()
.map_err(GenericError::from_error)?;
self.inner_client
.start_sync_worker(&provider)
.await
.map_err(GenericError::from_error)?;

Ok(())
}

Expand Down
6 changes: 6 additions & 0 deletions dev/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ services:
ports:
- 8545:8545

history-server:
image: ghcr.io/xmtp/message-history-server:main
platform: linux/amd64
ports:
- 5558:5558

db:
image: postgres:13
environment:
Expand Down
64 changes: 10 additions & 54 deletions examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ enum Commands {
account_addresses: Vec<String>,
},
RequestHistorySync {},
ReplyToHistorySyncRequest {},
ProcessHistorySyncReply {},
ProcessConsentSyncReply {},
ListHistorySyncMessages {},
/// Information about the account that owns the DB
Info {},
Expand Down Expand Up @@ -415,60 +412,19 @@ async fn main() -> color_eyre::eyre::Result<()> {
);
}
Commands::RequestHistorySync {} => {
let conn = client.store().conn()?;
let provider = client.mls_provider()?;
client.sync_welcomes(&conn).await?;
client.enable_sync(&provider).await?;
let (group_id, _) = client
.send_sync_request(&provider, DeviceSyncKind::MessageHistory)
.await?;
let group_id_str = hex::encode(group_id);
info!(
group_id = group_id_str,
"Sent history sync request in sync group {group_id_str}"
);
}
Commands::ReplyToHistorySyncRequest {} => {
let provider = client.mls_provider()?;
let group = client.get_sync_group()?;
let group_id_str = hex::encode(group.group_id);
let reply = client
.reply_to_sync_request(&provider, DeviceSyncKind::MessageHistory)
.await?;

info!(
group_id = group_id_str,
"Sent history sync reply in sync group {group_id_str}"
);
info!("Reply: {:?}", reply);
}
Commands::ProcessHistorySyncReply {} => {
let conn = client.store().conn()?;
let provider = client.mls_provider()?;
client.sync_welcomes(&conn).await?;
client.enable_sync(&provider).await?;
let conn = client.store().conn().unwrap();
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&conn).await.unwrap();
client.start_sync_worker(&provider).await.unwrap();
client
.process_sync_reply(&provider, DeviceSyncKind::MessageHistory)
.await?;

info!("History bundle downloaded and inserted into user DB")
}
Commands::ProcessConsentSyncReply {} => {
let conn = client.store().conn()?;
let provider = client.mls_provider()?;
client.sync_welcomes(&conn).await?;
client.enable_sync(&provider).await?;
client
.process_sync_reply(&provider, DeviceSyncKind::Consent)
.await?;

info!("Consent bundle downloaded and inserted into user DB")
.send_sync_request(&provider, DeviceSyncKind::MessageHistory)
.await
.unwrap();
info!("Sent history sync request in sync group.")
}
Commands::ListHistorySyncMessages {} => {
let conn = client.store().conn()?;
let provider = client.mls_provider()?;
client.sync_welcomes(&conn).await?;
client.enable_sync(&provider).await?;
let group = client.get_sync_group()?;
let group_id_str = hex::encode(group.group_id.clone());
group.sync().await?;
Expand Down Expand Up @@ -504,7 +460,7 @@ async fn main() -> color_eyre::eyre::Result<()> {
Ok(())
}

async fn create_client<C: XmtpApi>(
async fn create_client<C: XmtpApi + 'static>(
cli: &Cli,
account: IdentityStrategy,
grpc: C,
Expand All @@ -531,7 +487,7 @@ async fn register<C>(
client: C,
) -> Result<(), CliError>
where
C: XmtpApi,
C: XmtpApi + 'static,
{
let w: Wallet = if let Some(seed_phrase) = maybe_seed_phrase {
Wallet::LocalWallet(
Expand Down
20 changes: 10 additions & 10 deletions xmtp_mls/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub enum ClientBuilderError {
GroupError(#[from] crate::groups::GroupError),
#[error(transparent)]
ApiError(#[from] xmtp_proto::api_client::Error),
#[error(transparent)]
DeviceSync(#[from] crate::groups::device_sync::DeviceSyncError),
}

pub struct ClientBuilder<ApiClient, V = RemoteSignatureVerifier<ApiClient>> {
Expand Down Expand Up @@ -108,8 +110,8 @@ impl<ApiClient, V> ClientBuilder<ApiClient, V> {

impl<ApiClient, V> ClientBuilder<ApiClient, V>
where
ApiClient: XmtpApi,
V: SmartContractSignatureVerifier,
ApiClient: XmtpApi + 'static,
V: SmartContractSignatureVerifier + 'static,
{
/// Build with a custom smart contract wallet verifier
pub async fn build_with_verifier(self) -> Result<Client<ApiClient, V>, ClientBuilderError> {
Expand All @@ -120,7 +122,7 @@ where

impl<ApiClient> ClientBuilder<ApiClient, RemoteSignatureVerifier<ApiClient>>
where
ApiClient: XmtpApi,
ApiClient: XmtpApi + 'static,
{
/// Build with the default [`RemoteSignatureVerifier`]
pub async fn build(self) -> Result<Client<ApiClient>, ClientBuilderError> {
Expand Down Expand Up @@ -161,8 +163,8 @@ async fn inner_build<C, V>(
api_client: Arc<C>,
) -> Result<Client<C, V>, ClientBuilderError>
where
C: XmtpApi,
V: SmartContractSignatureVerifier,
C: XmtpApi + 'static,
V: SmartContractSignatureVerifier + 'static,
{
let ClientBuilder {
mut store,
Expand Down Expand Up @@ -198,15 +200,13 @@ where
)
.await?;

let client = Client::new(
Ok(Client::new(
api_client_wrapper,
identity,
store,
scw_verifier,
history_sync_url,
);

Ok(client)
history_sync_url.clone(),
))
}

#[cfg(test)]
Expand Down
10 changes: 7 additions & 3 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ use crate::{
mutex_registry::MutexRegistry,
retry::Retry,
retry_async, retryable,
storage::group::GroupQueryArgs,
storage::{
consent_record::{ConsentState, ConsentType, StoredConsentRecord},
db_connection::DbConnection,
group::{GroupMembershipState, StoredGroup},
group::{GroupMembershipState, GroupQueryArgs, StoredGroup},
group_message::StoredGroupMessage,
refresh_state::EntityKind,
sql_key_store, EncryptedMessageStore, StorageError,
Expand Down Expand Up @@ -311,7 +310,8 @@ where
let intents = Arc::new(Intents {
context: context.clone(),
});
let (tx, _) = broadcast::channel(10);
let (tx, _) = broadcast::channel(32);

Self {
api_client: api_client.into(),
context,
Expand Down Expand Up @@ -350,6 +350,10 @@ where
self.context.mls_provider()
}

pub fn history_sync_url(&self) -> Option<&String> {
self.history_sync_url.as_ref()
}

/// Calls the server to look up the `inbox_id` associated with a given address
pub async fn find_inbox_id_from_address(
&self,
Expand Down
Loading

0 comments on commit 0d3bf35

Please sign in to comment.