From 5f9ced6325be4f8aa4f4394e37b3c3c7f779d6b5 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Thu, 19 Dec 2024 17:03:25 -0800 Subject: [PATCH] bindings filter messages by type --- Cargo.lock | 2 + bindings_ffi/Cargo.toml | 2 + bindings_ffi/src/mls.rs | 154 +++++++++++++++++++++++++++++++++++++--- 3 files changed, 148 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cf5f4a245..0822fc8d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7511,6 +7511,7 @@ dependencies = [ "futures", "paranoid-android", "parking_lot 0.12.3", + "prost", "rand", "thiserror 2.0.6", "tokio", @@ -7521,6 +7522,7 @@ dependencies = [ "uuid 1.11.0", "xmtp_api_grpc", "xmtp_common", + "xmtp_content_types", "xmtp_cryptography", "xmtp_id", "xmtp_mls", diff --git a/bindings_ffi/Cargo.toml b/bindings_ffi/Cargo.toml index 5831c3d7d..fc33b5e52 100644 --- a/bindings_ffi/Cargo.toml +++ b/bindings_ffi/Cargo.toml @@ -12,10 +12,12 @@ futures.workspace = true tracing.workspace = true tracing-subscriber = { workspace = true, features = ["registry", "env-filter", "fmt", "json"] } parking_lot.workspace = true +prost.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["macros"] } uniffi = { version = "0.28.0", default-features = false, features = ["tokio"] } xmtp_api_grpc = { path = "../xmtp_api_grpc" } +xmtp_content_types = { path = "../xmtp_content_types" } xmtp_cryptography = { path = "../xmtp_cryptography" } xmtp_id = { path = "../xmtp_id" } xmtp_mls = { path = "../xmtp_mls" } diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 193ea8732..7ec5bf587 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -19,8 +19,8 @@ use xmtp_mls::groups::device_sync::preference_sync::UserPreferenceUpdate; use xmtp_mls::groups::scoped_client::LocalScopedGroupClient; use xmtp_mls::groups::HmacKey; use xmtp_mls::storage::group::ConversationType; -use xmtp_mls::storage::group_message::MsgQueryArgs; use xmtp_mls::storage::group_message::SortDirection; +use xmtp_mls::storage::group_message::{ContentType, MsgQueryArgs}; use xmtp_mls::{ api::ApiClientWrapper, builder::ClientBuilder, @@ -1259,6 +1259,38 @@ pub struct FfiListMessagesOptions { pub limit: Option, pub delivery_status: Option, pub direction: Option, + pub content_types: Option>, +} + +#[derive(uniffi::Enum, Clone)] +pub enum FfiContentType { + Unknown, + Text, + GroupMembershipChange, + GroupUpdated, + Reaction, + ReadReceipt, + Reply, + Attachment, + RemoteAttachment, + TransactionReference, +} + +impl From for ContentType { + fn from(value: FfiContentType) -> Self { + match value { + FfiContentType::Unknown => ContentType::Unknown, + FfiContentType::Text => ContentType::Text, + FfiContentType::GroupMembershipChange => ContentType::GroupMembershipChange, + FfiContentType::GroupUpdated => ContentType::GroupUpdated, + FfiContentType::Reaction => ContentType::Reaction, + FfiContentType::ReadReceipt => ContentType::ReadReceipt, + FfiContentType::Reply => ContentType::Reply, + FfiContentType::Attachment => ContentType::Attachment, + FfiContentType::RemoteAttachment => ContentType::RemoteAttachment, + FfiContentType::TransactionReference => ContentType::TransactionReference, + } + } } #[derive(uniffi::Record, Clone, Default)] @@ -1331,7 +1363,11 @@ impl FfiConversation { .maybe_kind(kind) .maybe_delivery_status(delivery_status) .maybe_limit(opts.limit) - .maybe_direction(direction), + .maybe_direction(direction) + .maybe_content_types( + opts.content_types + .map(|types| types.into_iter().map(|t| t.into()).collect()), + ), )? .into_iter() .map(|msg| msg.into()) @@ -1876,20 +1912,26 @@ mod tests { FfiPreferenceUpdate, FfiXmtpClient, }; use crate::{ - connect_to_backend, get_inbox_id_for_address, inbox_owner::SigningError, FfiConsent, - FfiConsentEntityType, FfiConsentState, FfiConversation, FfiConversationCallback, - FfiConversationMessageKind, FfiCreateGroupOptions, FfiGroupPermissionsOptions, - FfiInboxOwner, FfiListConversationsOptions, FfiListMessagesOptions, FfiMetadataField, - FfiPermissionPolicy, FfiPermissionPolicySet, FfiPermissionUpdateType, FfiSubscribeError, + connect_to_backend, get_inbox_id_for_address, inbox_owner::SigningError, + v2::FfiSortDirection, FfiConsent, FfiConsentEntityType, FfiConsentState, FfiContentType, + FfiConversation, FfiConversationCallback, FfiConversationMessageKind, + FfiCreateGroupOptions, FfiDirection, FfiGroupPermissionsOptions, FfiInboxOwner, + FfiListConversationsOptions, FfiListMessagesOptions, FfiMetadataField, FfiPermissionPolicy, + FfiPermissionPolicySet, FfiPermissionUpdateType, FfiSubscribeError, }; use ethers::utils::hex; - use std::sync::{ - atomic::{AtomicU32, Ordering}, - Arc, Mutex, + use prost::Message; + use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, Mutex, + }, }; use tokio::{sync::Notify, time::error::Elapsed}; use xmtp_common::tmp_path; use xmtp_common::{wait_for_eq, wait_for_ok}; + use xmtp_content_types::{read_receipt, text::TextCodec, ContentCodec}; use xmtp_cryptography::{signature::RecoverableSignature, utils::rng}; use xmtp_id::associations::{ generate_inbox_id, @@ -1900,6 +1942,7 @@ mod tests { storage::EncryptionKey, InboxOwner, }; + use xmtp_proto::xmtp::mls::message_contents::{ContentTypeId, EncodedContent}; const HISTORY_SYNC_URL: &str = "http://localhost:5558"; @@ -4832,4 +4875,95 @@ mod tests { Ok(_) => panic!("Expected an error, but got Ok"), } } + + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_can_list_messages_with_content_types() { + // Create test clients + let alix = new_test_client().await; + let bo = new_test_client().await; + + // Alix creates group with Bo + let alix_group = alix + .conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + // Bo syncs to get the group + bo.conversations().sync().await.unwrap(); + let bo_group = bo.conversation(alix_group.id()).unwrap(); + + // Alix sends first message + alix_group.send("hey".as_bytes().to_vec()).await.unwrap(); + + // Bo syncs and responds + bo_group.sync().await.unwrap(); + let bo_message_response = TextCodec::encode("hey alix".to_string()).unwrap(); + let mut buf = Vec::new(); + bo_message_response.encode(&mut buf).unwrap(); + bo_group.send(buf).await.unwrap(); + + // Bo sends read receipt + let read_receipt_content_id = ContentTypeId { + authority_id: "xmtp.org".to_string(), + type_id: read_receipt::ReadReceiptCodec::TYPE_ID.to_string(), + version_major: 1, + version_minor: 0, + }; + let read_receipt_encoded_content = EncodedContent { + r#type: Some(read_receipt_content_id), + parameters: HashMap::new(), + fallback: None, + compression: None, + content: vec![], + }; + + let mut buf = Vec::new(); + read_receipt_encoded_content.encode(&mut buf).unwrap(); + bo_group.send(buf).await.unwrap(); + + // Alix syncs and gets all messages + alix_group.sync().await.unwrap(); + let latest_message = alix_group + // ... existing code ... + .find_messages(FfiListMessagesOptions { + direction: Some(FfiDirection::Descending), + limit: Some(1), + ..Default::default() + }) + .await + .unwrap(); + + // Verify last message is the read receipt + assert_eq!(latest_message.len(), 1); + let latest_message_encoded_content = + EncodedContent::decode(latest_message.last().unwrap().content.clone().as_slice()) + .unwrap(); + assert_eq!( + latest_message_encoded_content.r#type.unwrap().type_id, + "readReceipt" + ); + + // Get only text messages + let text_messages = alix_group + .find_messages(FfiListMessagesOptions { + content_types: Some(vec![FfiContentType::Text]), + direction: Some(FfiDirection::Descending), + limit: Some(1), + ..Default::default() + }) + .await + .unwrap(); + + // Verify last message is "hey alix" when filtered + assert_eq!(text_messages.len(), 1); + let latest_message_encoded_content = + EncodedContent::decode(text_messages.last().unwrap().content.clone().as_slice()) + .unwrap(); + let text_message = TextCodec::decode(latest_message_encoded_content).unwrap(); + assert_eq!(text_message, "hey alix"); + } }