Skip to content

Commit

Permalink
Add sync control methods (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
leemaguire authored Dec 21, 2023
1 parent 2515c1a commit bd6ba61
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 1 deletion.
28 changes: 28 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,31 @@
X.Y.Z Release notes (YYYY-MM-DD)
=============================================================

### Fixed
* None

### Enhancements
* Add the following methods for control over the sync session state:
- `realm::sync_session::pause()`
- `realm::sync_session::resume()`
- `realm::sync_session::reconnect()`
- `realm::sync_session::state()`
- `realm::sync_session::connection_state()`
- `realm::sync_session::observe_connection_change(std::function<void(enum connection_state old_state,
enum connection_state new_state)>&& callback)`
- `realm::sync_session::unregister_connection_change_observer(uint64_t token)`

### Breaking Changes
* None

### Compatibility
* Fileformat: Generates files with format v23. Reads and automatically upgrade from fileformat v5.

### Internals
* None

----------------------------------------------

0.6.1 Release notes (2023-12-19)
=============================================================

Expand Down
59 changes: 59 additions & 0 deletions src/cpprealm/internal/bridge/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,30 @@

namespace realm::internal::bridge {

static_assert(static_cast<int>(::realm::SyncSession::State::Active) == static_cast<int>(sync_session::state::active));
static_assert(static_cast<int>(::realm::SyncSession::State::Dying) == static_cast<int>(sync_session::state::dying));
static_assert(static_cast<int>(::realm::SyncSession::State::Inactive) == static_cast<int>(sync_session::state::inactive));
static_assert(static_cast<int>(::realm::SyncSession::State::WaitingForAccessToken) == static_cast<int>(sync_session::state::waiting_for_access_token));
static_assert(static_cast<int>(::realm::SyncSession::State::Paused) == static_cast<int>(sync_session::state::paused));

static_assert(static_cast<int>(::realm::SyncSession::ConnectionState::Disconnected) == static_cast<int>(sync_session::connection_state::disconnected));
static_assert(static_cast<int>(::realm::SyncSession::ConnectionState::Connecting) == static_cast<int>(sync_session::connection_state::connecting));
static_assert(static_cast<int>(::realm::SyncSession::ConnectionState::Connected) == static_cast<int>(sync_session::connection_state::connected));

enum sync_session::state sync_session::state() const {
if (auto session = m_session.lock()) {
return static_cast<enum sync_session::state>(session->state());
}
throw std::runtime_error("Realm: Error accessing sync_session which has been destroyed.");
}

enum sync_session::connection_state sync_session::connection_state() const {
if (auto session = m_session.lock()) {
return static_cast<enum sync_session::connection_state>(session->connection_state());
}
throw std::runtime_error("Realm: Error accessing sync_session which has been destroyed.");
}

void sync_session::wait_for_download_completion(std::function<void(status)> &&callback) {
if (auto session = m_session.lock()) {
session->wait_for_download_completion([cb = std::move(callback)](::realm::Status s) {
Expand Down Expand Up @@ -75,4 +92,46 @@ namespace realm::internal::bridge {
sync_session::sync_session(const std::shared_ptr<SyncSession> &v) {
m_session = v;
}

void sync_session::pause() {
if (auto session = m_session.lock()) {
session->pause();
} else {
throw std::runtime_error("Realm: Error accessing sync_session which has been destroyed.");
}
}

void sync_session::resume() {
if (auto session = m_session.lock()) {
session->resume();
} else {
throw std::runtime_error("Realm: Error accessing sync_session which has been destroyed.");
}
}

void sync_session::reconnect() {
if (auto session = m_session.lock()) {
session->handle_reconnect();
} else {
throw std::runtime_error("Realm: Error accessing sync_session which has been destroyed.");
}
}

uint64_t sync_session::observe_connection_change(std::function<void(enum connection_state old_state, enum connection_state new_state)>&& callback) {
if (auto session = m_session.lock()) {
return session->register_connection_change_callback([fn = std::move(callback)](::realm::SyncSession::ConnectionState old_state,
::realm::SyncSession::ConnectionState new_state) {
fn(static_cast<enum sync_session::connection_state>(old_state), static_cast<enum sync_session::connection_state>(new_state));
});
} else {
throw std::runtime_error("Realm: Error accessing sync_session which has been destroyed.");
}
}
void sync_session::unregister_connection_change_observer(uint64_t token) {
if (auto session = m_session.lock()) {
session->unregister_connection_change_callback(token);
} else {
throw std::runtime_error("Realm: Error accessing sync_session which has been destroyed.");
}
}
}
64 changes: 63 additions & 1 deletion src/cpprealm/internal/bridge/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,30 @@ namespace realm {
class SyncSession;
namespace internal::bridge {
struct status;
struct realm;

struct sync_session {

enum class state {
active,
dying,
inactive,
waiting_for_access_token,
paused
};

sync_session(const std::shared_ptr<SyncSession> &);
enum class connection_state {
disconnected,
connecting,
connected
};

// The session's current state.
enum state state() const;

// The session's current connection state.
enum connection_state connection_state() const;

// Register a callback that will be called when all pending uploads have completed.
// The callback is run asynchronously, and upon whatever thread the underlying sync client
// chooses to run it on.
Expand All @@ -29,14 +42,63 @@ namespace realm {
// Register a callback that will be called when all pending downloads have been completed.
// Works the same way as `wait_for_upload_completion()`.
void wait_for_download_completion(std::function<void(status)>&& callback);

// Register a callback that will be called when all pending uploads have completed.
// The callback is run asynchronously, and upon whatever thread the underlying sync client
// chooses to run it on.
std::future<void> wait_for_upload_completion();

// Register a callback that will be called when all pending downloads have been completed.
std::future<void> wait_for_download_completion();

/**
Temporarily suspend syncronization and disconnect from the server.
The session will not attempt to connect to Atlas App Services until `resume`
is called or the Realm file is closed and re-opened.
*/
void pause();

/**
Resume syncronization and reconnect to Atlas App Services after suspending.
This is a no-op if the session was already active or if the session is invalid.
Newly created sessions begin in the Active state and do not need to be resumed.
*/
void resume();

/**
Request an immediate reconnection to the server if the session is disconnected.
Realm automatically reconnects after disconnects with an exponential backoff,
which is reset when the reachability handler reports a network status change.
In some scenarios an application may wish to skip the reconnect delay, such as
when an application receives a wake from background notification, which can be
done by calling this method. Calling this method is never required.
This method is asynchronous and merely skips the current reconnect delay, so
the connection state will still normally be disconnected immediately after
calling it.
Has no effect if the session is currently connected.
*/
void reconnect();

/**
Registers a callback that is invoked when the underlying sync session changes
its connection state
*/
uint64_t observe_connection_change(std::function<void(enum connection_state old_state, enum connection_state new_state)>&& callback);

/**
Unregister a previously registered notifier. If the token is invalid,
this method does nothing.
*/
void unregister_connection_change_observer(uint64_t token);
private:
std::weak_ptr<SyncSession> m_session;
sync_session(const std::shared_ptr<SyncSession> &);
friend struct internal::bridge::realm;
};
}
}
Expand Down
52 changes: 52 additions & 0 deletions tests/experimental/sync/flexible_sync_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,56 @@ TEST_CASE("set collection sync", "[set]") {
auto objs = realm2.objects<experimental::AllTypesObject>();
CHECK(objs.size() == 1);
}
}

TEST_CASE("pause_resume_sync", "[sync]") {
auto app = realm::App(realm::App::configuration({Admin::shared().cached_app_id(), Admin::shared().base_url()}));

SECTION("pause_resume") {
auto user = app.login(realm::App::credentials::anonymous()).get();

auto flx_sync_config = user.flexible_sync_configuration();
auto synced_realm = experimental::db(flx_sync_config);
auto update_success = synced_realm.subscriptions().update([](realm::mutable_sync_subscription_set &subs) {
subs.clear();
}).get();
CHECK(update_success == true);
update_success = synced_realm.subscriptions().update([](realm::mutable_sync_subscription_set &subs) {
subs.add<experimental::AllTypesObject>("foo-strings");
subs.add<experimental::AllTypesObjectLink>("foo-link");
})
.get();
CHECK(update_success == true);

auto wait_for_state = [](enum realm::sync_session::connection_state expected_state,
const experimental::db& realm,
std::function<void(const experimental::db&)> fn) {
std::promise<void> p;
std::future<void> f = p.get_future();
auto token = realm.get_sync_session()->observe_connection_change([&]
(enum realm::sync_session::connection_state,
enum realm::sync_session::connection_state new_state) {
if (new_state == expected_state)
p.set_value();
});
fn(realm);

if (f.wait_for(std::chrono::milliseconds(10000)) == std::future_status::ready) {
realm.get_sync_session()->unregister_connection_change_observer(token);
return f.get();
} else {
throw std::runtime_error("Timeout exceeded");
}
};

wait_for_state(realm::sync_session::connection_state::disconnected, synced_realm, [](const experimental::db& realm) {
realm.get_sync_session()->pause();
});
CHECK(synced_realm.get_sync_session()->state() == realm::sync_session::state::paused);

wait_for_state(realm::sync_session::connection_state::connected, synced_realm, [](const experimental::db& realm) {
realm.get_sync_session()->resume();
});
CHECK(synced_realm.get_sync_session()->state() == realm::sync_session::state::active);
}
}

0 comments on commit bd6ba61

Please sign in to comment.