Skip to content

Commit

Permalink
Implement zone aware replica reads
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt-Woz committed Jan 13, 2025
1 parent 160fd14 commit e1cd348
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 30 deletions.
5 changes: 5 additions & 0 deletions ext/rcb_backend.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ initialize_cluster_options(const core::utils::connection_string& connstr,
cluster_options.network().preferred_network(param.value());
}

static const auto server_group = rb_id2sym(rb_intern("preferred_server_group"));
if (auto group = options::get_string(options, server_group); group) {
cluster_options.network().preferred_server_group(group.value());
}

static const auto sym_use_ip_protocol = rb_id2sym(rb_intern("use_ip_protocol"));
if (auto proto = options::get_symbol(options, sym_use_ip_protocol); proto) {
static const auto sym_any = rb_id2sym(rb_intern("any"));
Expand Down
81 changes: 51 additions & 30 deletions ext/rcb_crud.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -128,31 +128,40 @@ cb_Backend_document_get_any_replica(VALUE self,
VALUE id,
VALUE options)
{
auto cluster = cb_backend_to_public_api_cluster(self);
auto cluster = cb_backend_to_core_api_cluster(self);

Check_Type(bucket, T_STRING);
Check_Type(scope, T_STRING);
Check_Type(collection, T_STRING);
Check_Type(id, T_STRING);

try {
couchbase::get_any_replica_options opts;
set_timeout(opts, options);
core::document_id doc_id{
cb_string_new(bucket),
cb_string_new(scope),
cb_string_new(collection),
cb_string_new(id),
};

auto f = cluster.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
.get_any_replica(cb_string_new(id), opts);
auto [ctx, resp] = cb_wait_for_future(f);
if (ctx.ec()) {
cb_throw_error(ctx, "unable to get replica of the document");
core::operations::get_any_replica_request req{ doc_id };
cb_extract_timeout(req, options);
cb_extract_read_preference(req, options);

std::promise<core::operations::get_any_replica_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
auto resp = cb_wait_for_future(f);
if (resp.ctx.ec()) {
cb_throw_error(resp.ctx, "unable to get replica of the document");
}

auto value = resp.content_as<passthrough_transcoder>();
VALUE res = rb_hash_new();
rb_hash_aset(res, rb_id2sym(rb_intern("content")), cb_str_new(value.data));
rb_hash_aset(res, rb_id2sym(rb_intern("cas")), cb_cas_to_num(resp.cas()));
rb_hash_aset(res, rb_id2sym(rb_intern("flags")), UINT2NUM(value.flags));
rb_hash_aset(res, rb_id2sym(rb_intern("content")), cb_str_new(resp.value));
rb_hash_aset(res, rb_id2sym(rb_intern("cas")), cb_cas_to_num(resp.cas));
rb_hash_aset(res, rb_id2sym(rb_intern("flags")), UINT2NUM(resp.flags));
rb_hash_aset(res, rb_id2sym(rb_intern("replica")), resp.replica ? Qtrue : Qfalse);
return res;
} catch (const std::system_error& se) {
rb_exc_raise(cb_map_error_code(
Expand All @@ -171,33 +180,43 @@ cb_Backend_document_get_all_replicas(VALUE self,
VALUE id,
VALUE options)
{
auto cluster = cb_backend_to_public_api_cluster(self);
auto cluster = cb_backend_to_core_api_cluster(self);

Check_Type(bucket, T_STRING);
Check_Type(scope, T_STRING);
Check_Type(collection, T_STRING);
Check_Type(id, T_STRING);

try {
couchbase::get_all_replicas_options opts;
set_timeout(opts, options);
core::document_id doc_id{
cb_string_new(bucket),
cb_string_new(scope),
cb_string_new(collection),
cb_string_new(id),
};

auto f = cluster.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
.get_all_replicas(cb_string_new(id), opts);
auto [ctx, resp] = cb_wait_for_future(f);
if (ctx.ec()) {
cb_throw_error(ctx, "unable to get all replicas for the document");
core::operations::get_all_replicas_request req{ doc_id };
cb_extract_timeout(req, options);
cb_extract_read_preference(req, options);

std::promise<core::operations::get_all_replicas_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
auto resp = cb_wait_for_future(f);
if (resp.ctx.ec()) {
cb_throw_error(resp.ctx, "unable to get all replicas for the document");
}

VALUE res = rb_ary_new_capa(static_cast<long>(resp.size()));
for (const auto& entry : resp) {
VALUE res = rb_ary_new_capa(static_cast<long>(resp.entries.size()));

for (const auto& entry : resp.entries) {
VALUE response = rb_hash_new();
auto value = entry.content_as<passthrough_transcoder>();
rb_hash_aset(response, rb_id2sym(rb_intern("content")), cb_str_new(value.data));
rb_hash_aset(response, rb_id2sym(rb_intern("cas")), cb_cas_to_num(entry.cas()));
rb_hash_aset(response, rb_id2sym(rb_intern("flags")), UINT2NUM(value.flags));
rb_hash_aset(response, rb_id2sym(rb_intern("content")), cb_str_new(entry.value));
rb_hash_aset(response, rb_id2sym(rb_intern("cas")), cb_cas_to_num(entry.cas));
rb_hash_aset(response, rb_id2sym(rb_intern("flags")), UINT2NUM(entry.flags));
rb_hash_aset(response, rb_id2sym(rb_intern("replica")), entry.replica ? Qtrue : Qfalse);
rb_ary_push(res, response);
}
return res;
Expand Down Expand Up @@ -1107,6 +1126,7 @@ cb_Backend_document_lookup_in_any_replica(VALUE self,

core::operations::lookup_in_any_replica_request req{ doc_id };
cb_extract_timeout(req, options);
cb_extract_read_preference(req, options);

static VALUE xattr_property = rb_id2sym(rb_intern("xattr"));
static VALUE path_property = rb_id2sym(rb_intern("path"));
Expand Down Expand Up @@ -1234,6 +1254,7 @@ cb_Backend_document_lookup_in_all_replicas(VALUE self,

core::operations::lookup_in_all_replicas_request req{ doc_id };
cb_extract_timeout(req, options);
cb_extract_read_preference(req, options);

static VALUE xattr_property = rb_id2sym(rb_intern("xattr"));
static VALUE path_property = rb_id2sym(rb_intern("path"));
Expand Down
32 changes: 32 additions & 0 deletions ext/rcb_utils.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <couchbase/cas.hxx>
#include <couchbase/durability_level.hxx>
#include <couchbase/persist_to.hxx>
#include <couchbase/read_preference.hxx>
#include <couchbase/replicate_to.hxx>
#include <couchbase/store_semantics.hxx>

Expand Down Expand Up @@ -128,6 +129,37 @@ cb_extract_timeout(Request& req, VALUE options)
}
}

template<typename Request>
inline void
cb_extract_read_preference(Request& req, VALUE options)
{
static VALUE property_name = rb_id2sym(rb_intern("read_preference"));
if (!NIL_P(options)) {
if (TYPE(options) != T_HASH) {
throw ruby_exception(rb_eArgError,
rb_sprintf("expected options to be Hash, given %+" PRIsVALUE, options));
}

VALUE val = rb_hash_aref(options, property_name);
if (NIL_P(val)) {
return;
}
if (TYPE(val) != T_SYMBOL) {
throw ruby_exception(
rb_eArgError, rb_sprintf("read_preference must be a Symbol, but given %+" PRIsVALUE, val));
}

if (ID mode = rb_sym2id(val); mode == rb_intern("no_preference")) {
req.read_preference = couchbase::read_preference::no_preference;
} else if (mode == rb_intern("selected_server_group")) {
req.read_preference = couchbase::read_preference::selected_server_group;
} else {
throw ruby_exception(rb_eArgError,
rb_sprintf("unexpected read_preference, given %+" PRIsVALUE, val));
}
}
}

template<typename Field>
inline void
cb_extract_duration(Field& field, VALUE options, const char* name)
Expand Down
Loading

0 comments on commit e1cd348

Please sign in to comment.