Skip to content

Commit

Permalink
RCBC-463: Add DocumentNotLocked error & C++ core update (#128)
Browse files Browse the repository at this point in the history
  • Loading branch information
DemetrisChr authored Dec 18, 2023
1 parent 2a19f12 commit 17d2cf1
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 30 deletions.
2 changes: 1 addition & 1 deletion bin/check-clang-static-analyzer
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ rm -rf "${BUILD_DIR}" "${REPORT_DIR}"
mkdir -p "${BUILD_DIR}"
cd "${BUILD_DIR}"

SCAN_BUILD_ARGS="--exclude third_party -o ${REPORT_DIR} --use-analyzer=${CB_CC} --use-c++=${CB_CXX} --use-cc=${CB_CC} --status-bugs"
SCAN_BUILD_ARGS="--exclude third_party --exclude _deps -o ${REPORT_DIR} --use-analyzer=${CB_CC} --use-c++=${CB_CXX} --use-cc=${CB_CC} --status-bugs"

RUBY_HDR_DIR=$(ruby -rrbconfig -e "puts RbConfig::CONFIG['rubyhdrdir']")
RUBY_ARCH_HDR_DIR=$(ruby -rrbconfig -e "puts RbConfig::CONFIG['rubyarchhdrdir']")
Expand Down
3 changes: 3 additions & 0 deletions ext/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ target_link_libraries(
PRIVATE project_options
project_warnings
couchbase_cxx_client
Microsoft.GSL::GSL
asio
taocpp::json
snappy)
if(RUBY_LIBRUBY)
target_link_directories(couchbase PRIVATE "${RUBY_LIBRARY_DIR}")
Expand Down
2 changes: 1 addition & 1 deletion ext/couchbase
65 changes: 39 additions & 26 deletions ext/couchbase.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <core/meta/version.hxx>

#include <asio.hpp>
#include <openssl/crypto.h>
#include <spdlog/cfg/env.h>
#include <spdlog/sinks/base_sink.h>
#include <spdlog/spdlog.h>
Expand All @@ -37,6 +38,7 @@
#include <core/agent_group.hxx>
#include <core/design_document_namespace_fmt.hxx>
#include <core/logger/configuration.hxx>
#include <core/logger/logger.hxx>
#include <core/operations.hxx>

#include <core/operations/management/analytics.hxx>
Expand Down Expand Up @@ -485,7 +487,7 @@ cb_Backend_allocate(VALUE klass)
cb_backend_data* backend = nullptr;
VALUE obj = TypedData_Make_Struct(klass, cb_backend_data, &cb_backend_type, backend);
backend->ctx = std::make_unique<asio::io_context>();
backend->cluster = couchbase::core::cluster::create(*backend->ctx);
backend->cluster = std::make_shared<couchbase::core::cluster>(*backend->ctx);
backend->worker = std::thread([backend]() { backend->ctx->run(); });
return obj;
}
Expand Down Expand Up @@ -527,6 +529,7 @@ static VALUE eDocumentExists;
static VALUE eDocumentIrretrievable;
static VALUE eDocumentLocked;
static VALUE eDocumentNotFound;
static VALUE eDocumentNotLocked;
static VALUE eDocumentNotJson;
static VALUE eDurabilityAmbiguous;
static VALUE eDurabilityImpossible;
Expand Down Expand Up @@ -623,6 +626,7 @@ init_exceptions(VALUE mCouchbase)
eDocumentIrretrievable = rb_define_class_under(mError, "DocumentIrretrievable", eCouchbaseError);
eDocumentLocked = rb_define_class_under(mError, "DocumentLocked", eCouchbaseError);
eDocumentNotFound = rb_define_class_under(mError, "DocumentNotFound", eCouchbaseError);
eDocumentNotLocked = rb_define_class_under(mError, "DocumentNotLocked", eCouchbaseError);
eDocumentNotJson = rb_define_class_under(mError, "DocumentNotJson", eCouchbaseError);
eDurabilityAmbiguous = rb_define_class_under(mError, "DurabilityAmbiguous", eCouchbaseError);
eDurabilityImpossible = rb_define_class_under(mError, "DurabilityImpossible", eCouchbaseError);
Expand Down Expand Up @@ -776,6 +780,9 @@ cb_map_error_code(std::error_code ec, const std::string& message, bool include_e
case couchbase::errc::key_value::document_locked:
return rb_exc_new_cstr(eDocumentLocked, what.c_str());

case couchbase::errc::key_value::document_not_locked:
return rb_exc_new_cstr(eDocumentNotLocked, what.c_str());

case couchbase::errc::key_value::value_too_large:
return rb_exc_new_cstr(eValueTooLarge, what.c_str());

Expand Down Expand Up @@ -1802,6 +1809,12 @@ cb_extract_timeout(std::chrono::milliseconds& field, VALUE options)
cb_extract_duration(field, options, "timeout");
}

static void
cb_extract_timeout(std::optional<std::chrono::milliseconds>& field, VALUE options)
{
cb_extract_duration(field, options, "timeout");
}

static void
cb_extract_cas(couchbase::cas& field, VALUE cas)
{
Expand Down Expand Up @@ -2454,9 +2467,12 @@ cb_Backend_ping(VALUE self, VALUE bucket, VALUE options)
}
}
}
std::optional<std::chrono::milliseconds> timeout{};
cb_extract_timeout(timeout, options);

auto barrier = std::make_shared<std::promise<couchbase::core::diag::ping_result>>();
auto f = barrier->get_future();
cluster->ping(report_id, bucket_name, selected_services, [barrier](couchbase::core::diag::ping_result&& resp) {
cluster->ping(report_id, bucket_name, selected_services, timeout, [barrier](couchbase::core::diag::ping_result&& resp) {
barrier->set_value(std::move(resp));
});
auto resp = cb_wait_for_future(f);
Expand Down Expand Up @@ -2583,7 +2599,7 @@ cb_Backend_document_get_any_replica(VALUE self, VALUE bucket, VALUE scope, VALUE
couchbase::get_any_replica_options opts;
couchbase::ruby::set_timeout(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
Expand Down Expand Up @@ -2621,7 +2637,7 @@ cb_Backend_document_get_all_replicas(VALUE self, VALUE bucket, VALUE scope, VALU
couchbase::get_all_replicas_options opts;
couchbase::ruby::set_timeout(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
Expand Down Expand Up @@ -3018,12 +3034,11 @@ cb_Backend_document_upsert(VALUE self, VALUE bucket, VALUE scope, VALUE collecti
couchbase::ruby::set_durability(opts, options);
couchbase::ruby::set_preserve_expiry(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
.upsert<couchbase::ruby::passthrough_transcoder>(
cb_string_new(id), couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, opts);
.upsert(cb_string_new(id), couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, opts);

auto [ctx, resp] = cb_wait_for_future(f);
if (ctx.ec()) {
Expand Down Expand Up @@ -3051,7 +3066,7 @@ cb_Backend_document_upsert_multi(VALUE self, VALUE bucket, VALUE scope, VALUE co
couchbase::ruby::set_durability(opts, options);
couchbase::ruby::set_preserve_expiry(opts, options);

auto c = couchbase::cluster(core).bucket(cb_string_new(bucket)).scope(cb_string_new(scope)).collection(cb_string_new(collection));
auto c = couchbase::cluster(*core).bucket(cb_string_new(bucket)).scope(cb_string_new(scope)).collection(cb_string_new(collection));

std::vector<std::pair<std::string, couchbase::codec::encoded_value>> tuples{};
cb_extract_array_of_id_content(tuples, id_content);
Expand All @@ -3061,7 +3076,7 @@ cb_Backend_document_upsert_multi(VALUE self, VALUE bucket, VALUE scope, VALUE co
futures.reserve(num_of_tuples);

for (auto& [id, content] : tuples) {
futures.emplace_back(c.upsert<couchbase::ruby::passthrough_transcoder>(std::move(id), content, opts));
futures.emplace_back(c.upsert(std::move(id), content, opts));
}

VALUE res = rb_ary_new_capa(static_cast<long>(num_of_tuples));
Expand Down Expand Up @@ -3102,7 +3117,7 @@ cb_Backend_document_append(VALUE self, VALUE bucket, VALUE scope, VALUE collecti
couchbase::ruby::set_timeout(opts, options);
couchbase::ruby::set_durability(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
Expand Down Expand Up @@ -3142,7 +3157,7 @@ cb_Backend_document_prepend(VALUE self, VALUE bucket, VALUE scope, VALUE collect
couchbase::ruby::set_timeout(opts, options);
couchbase::ruby::set_durability(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
Expand Down Expand Up @@ -3186,12 +3201,11 @@ cb_Backend_document_replace(VALUE self, VALUE bucket, VALUE scope, VALUE collect
couchbase::ruby::set_preserve_expiry(opts, options);
couchbase::ruby::set_cas(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
.replace<couchbase::ruby::passthrough_transcoder>(
cb_string_new(id), couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, opts);
.replace(cb_string_new(id), couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, opts);

auto [ctx, resp] = cb_wait_for_future(f);
if (ctx.ec()) {
Expand Down Expand Up @@ -3228,12 +3242,11 @@ cb_Backend_document_insert(VALUE self, VALUE bucket, VALUE scope, VALUE collecti
couchbase::ruby::set_expiry(opts, options);
couchbase::ruby::set_durability(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
.insert<couchbase::ruby::passthrough_transcoder>(
cb_string_new(id), couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, opts);
.insert(cb_string_new(id), couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, opts);

auto [ctx, resp] = cb_wait_for_future(f);
if (ctx.ec()) {
Expand Down Expand Up @@ -3268,7 +3281,7 @@ cb_Backend_document_remove(VALUE self, VALUE bucket, VALUE scope, VALUE collecti
couchbase::ruby::set_durability(opts, options);
couchbase::ruby::set_cas(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
Expand Down Expand Up @@ -3305,7 +3318,7 @@ cb_Backend_document_remove_multi(VALUE self, VALUE bucket, VALUE scope, VALUE co
std::vector<std::pair<std::string, couchbase::cas>> tuples{};
cb_extract_array_of_id_cas(tuples, id_cas);

auto c = couchbase::cluster(core).bucket(cb_string_new(bucket)).scope(cb_string_new(scope)).collection(cb_string_new(collection));
auto c = couchbase::cluster(*core).bucket(cb_string_new(bucket)).scope(cb_string_new(scope)).collection(cb_string_new(collection));

auto num_of_tuples = tuples.size();
std::vector<std::future<std::pair<couchbase::key_value_error_context, couchbase::mutation_result>>> futures;
Expand Down Expand Up @@ -3357,7 +3370,7 @@ cb_Backend_document_increment(VALUE self, VALUE bucket, VALUE scope, VALUE colle
couchbase::ruby::set_delta(opts, options);
couchbase::ruby::set_initial_value(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
Expand Down Expand Up @@ -3401,7 +3414,7 @@ cb_Backend_document_decrement(VALUE self, VALUE bucket, VALUE scope, VALUE colle
couchbase::ruby::set_delta(opts, options);
couchbase::ruby::set_initial_value(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
Expand Down Expand Up @@ -3873,7 +3886,7 @@ cb_Backend_document_mutate_in(VALUE self, VALUE bucket, VALUE scope, VALUE colle
}
}

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
Expand Down Expand Up @@ -4095,7 +4108,7 @@ cb_Backend_document_scan_create(VALUE self, VALUE bucket, VALUE scope, VALUE col
auto collection_name = cb_string_new(collection);

// Getting the operation agent
auto agent_group = couchbase::core::agent_group(cluster->io_context(), couchbase::core::agent_group_config{ { cluster } });
auto agent_group = couchbase::core::agent_group(cluster->io_context(), couchbase::core::agent_group_config{ { *cluster } });
agent_group.open_bucket(bucket_name);
auto agent = agent_group.get_agent(bucket_name);
if (!agent.has_value()) {
Expand Down Expand Up @@ -6939,11 +6952,11 @@ cb_Backend_search_get_stats(VALUE self, VALUE timeout)
const auto& cluster = cb_backend_to_cluster(self);

try {
couchbase::core::operations::management::search_index_stats_request req{};
couchbase::core::operations::management::search_get_stats_request req{};
cb_extract_timeout(req, timeout);
auto barrier = std::make_shared<std::promise<couchbase::core::operations::management::search_index_stats_response>>();
auto barrier = std::make_shared<std::promise<couchbase::core::operations::management::search_get_stats_response>>();
auto f = barrier->get_future();
cluster->execute(req, [barrier](couchbase::core::operations::management::search_index_stats_response&& resp) {
cluster->execute(req, [barrier](couchbase::core::operations::management::search_get_stats_response&& resp) {
barrier->set_value(std::move(resp));
});
auto resp = cb_wait_for_future(f);
Expand Down
4 changes: 4 additions & 0 deletions lib/couchbase/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ class DocumentIrretrievable < CouchbaseError
class DocumentLocked < CouchbaseError
end

# Thrown when the server reports that the document is not locked when an unlocking operation is being performed.
class DocumentNotLocked < CouchbaseError
end

# Thrown when the request is too big for some reason.
class ValueTooLarge < CouchbaseError
end
Expand Down
2 changes: 1 addition & 1 deletion lib/couchbase/protostellar/response_converter/kv.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def self.to_mutate_in_result(resp, specs, options)
Couchbase::Collection::MutateInResult.new do |res|
res.cas = resp.cas
res.transcoder = options.transcoder
res.deleted = nil # TODO: gRPC response has no deleted field
res.deleted = nil # TODO: gRPC response has no deleted field
res.mutation_token = extract_mutation_token(resp)
res.encoded = resp.specs.each_with_index.map do |s, idx|
Couchbase::Collection::SubDocumentField.new do |f|
Expand Down
2 changes: 1 addition & 1 deletion lib/couchbase/protostellar/response_converter/search.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def self.convert_meta_data(proto_meta_data)
Couchbase::Cluster::SearchMetaData.new do |meta|
proto_metrics = proto_meta_data.metrics
dur = proto_metrics.execution_time
meta.metrics.took = (dur.seconds * 1000) + (dur.nanos / 1000.0).round # `took` is in milliseconds
meta.metrics.took = (dur.seconds * 1000) + (dur.nanos / 1000.0).round # `took` is in milliseconds
meta.metrics.total_rows = proto_metrics.total_rows
meta.metrics.max_score = proto_metrics.max_score
meta.metrics.success_partition_count = proto_metrics.success_partition_count
Expand Down
20 changes: 20 additions & 0 deletions test/crud_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,26 @@ def test_get_and_lock_protects_document_from_mutations
@collection.upsert(doc_id, document)
end

def test_unlock_document_not_locked
skip("#{name}: Server does not support the not_locked KV status") unless env.server_version.trinity?
skip("#{name}: The mock server does not support the not_locked KV status") if use_caves?

# TODO: Remove protostellar skip once it's added
skip("#{name}: The #{Couchbase::Protostellar::NAME} protocol does not support DocumentNotLocked yet") if env.protostellar?

doc_id = uniq_id(:foo)
document = {"value" => 42}
@collection.upsert(doc_id, document)

res = @collection.get_and_lock(doc_id, 20)
cas = res.cas
@collection.unlock(doc_id, cas)

assert_raises(Couchbase::Error::DocumentNotLocked) do
@collection.unlock(doc_id, cas)
end
end

def test_insert_fails_when_document_exists_already
doc_id = uniq_id(:foo)
document = {"value" => 42}
Expand Down

0 comments on commit 17d2cf1

Please sign in to comment.