RCBC-463: Add DocumentNotLocked error & C++ core update #128

Merged 2 commits on Dec 18, 2023
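
This change maps the server's not_locked key-value status to a dedicated `Couchbase::Error::DocumentNotLocked` class, so unlocking a document that is no longer locked no longer surfaces as a generic error. A minimal sketch of the user-facing behaviour, mirroring the new test below; connection parameters, bucket name, and document id are illustrative:

```ruby
require "couchbase"

# Connection parameters, bucket name, and document id are illustrative.
cluster = Couchbase::Cluster.connect("couchbase://localhost", "Administrator", "password")
collection = cluster.bucket("default").default_collection

collection.upsert("user::42", {"value" => 42})

res = collection.get_and_lock("user::42", 20) # lock the document for 20 seconds
collection.unlock("user::42", res.cas)        # releasing the lock succeeds

begin
  collection.unlock("user::42", res.cas)      # the document is no longer locked
rescue Couchbase::Error::DocumentNotLocked
  # raised by servers that report the not_locked KV status
end
```
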
bin/check-clang-static-analyzer (2 changes: 1 addition & 1 deletion)
@@ -35,7 +35,7 @@ rm -rf "${BUILD_DIR}" "${REPORT_DIR}"
mkdir -p "${BUILD_DIR}"
cd "${BUILD_DIR}"

SCAN_BUILD_ARGS="--exclude third_party -o ${REPORT_DIR} --use-analyzer=${CB_CC} --use-c++=${CB_CXX} --use-cc=${CB_CC} --status-bugs"
SCAN_BUILD_ARGS="--exclude third_party --exclude _deps -o ${REPORT_DIR} --use-analyzer=${CB_CC} --use-c++=${CB_CXX} --use-cc=${CB_CC} --status-bugs"

RUBY_HDR_DIR=$(ruby -rrbconfig -e "puts RbConfig::CONFIG['rubyhdrdir']")
RUBY_ARCH_HDR_DIR=$(ruby -rrbconfig -e "puts RbConfig::CONFIG['rubyarchhdrdir']")
ext/CMakeLists.txt (3 changes: 3 additions & 0 deletions)
@@ -51,6 +51,9 @@ target_link_libraries(
PRIVATE project_options
project_warnings
couchbase_cxx_client
Microsoft.GSL::GSL
asio
taocpp::json
snappy)
if(RUBY_LIBRUBY)
target_link_directories(couchbase PRIVATE "${RUBY_LIBRARY_DIR}")
ext/couchbase (2 changes: 1 addition & 1 deletion)
ext/couchbase.cxx (65 changes: 39 additions & 26 deletions)
@@ -21,6 +21,7 @@
#include <core/meta/version.hxx>

#include <asio.hpp>
#include <openssl/crypto.h>
#include <spdlog/cfg/env.h>
#include <spdlog/sinks/base_sink.h>
#include <spdlog/spdlog.h>
@@ -37,6 +38,7 @@
#include <core/agent_group.hxx>
#include <core/design_document_namespace_fmt.hxx>
#include <core/logger/configuration.hxx>
#include <core/logger/logger.hxx>
#include <core/operations.hxx>

#include <core/operations/management/analytics.hxx>
@@ -485,7 +487,7 @@ cb_Backend_allocate(VALUE klass)
cb_backend_data* backend = nullptr;
VALUE obj = TypedData_Make_Struct(klass, cb_backend_data, &cb_backend_type, backend);
backend->ctx = std::make_unique<asio::io_context>();
backend->cluster = couchbase::core::cluster::create(*backend->ctx);
backend->cluster = std::make_shared<couchbase::core::cluster>(*backend->ctx);
backend->worker = std::thread([backend]() { backend->ctx->run(); });
return obj;
}
@@ -527,6 +529,7 @@ static VALUE eDocumentExists;
static VALUE eDocumentIrretrievable;
static VALUE eDocumentLocked;
static VALUE eDocumentNotFound;
static VALUE eDocumentNotLocked;
static VALUE eDocumentNotJson;
static VALUE eDurabilityAmbiguous;
static VALUE eDurabilityImpossible;
@@ -623,6 +626,7 @@ init_exceptions(VALUE mCouchbase)
eDocumentIrretrievable = rb_define_class_under(mError, "DocumentIrretrievable", eCouchbaseError);
eDocumentLocked = rb_define_class_under(mError, "DocumentLocked", eCouchbaseError);
eDocumentNotFound = rb_define_class_under(mError, "DocumentNotFound", eCouchbaseError);
eDocumentNotLocked = rb_define_class_under(mError, "DocumentNotLocked", eCouchbaseError);
eDocumentNotJson = rb_define_class_under(mError, "DocumentNotJson", eCouchbaseError);
eDurabilityAmbiguous = rb_define_class_under(mError, "DurabilityAmbiguous", eCouchbaseError);
eDurabilityImpossible = rb_define_class_under(mError, "DurabilityImpossible", eCouchbaseError);
@@ -776,6 +780,9 @@ cb_map_error_code(std::error_code ec, const std::string& message, bool include_e
case couchbase::errc::key_value::document_locked:
return rb_exc_new_cstr(eDocumentLocked, what.c_str());

case couchbase::errc::key_value::document_not_locked:
return rb_exc_new_cstr(eDocumentNotLocked, what.c_str());

case couchbase::errc::key_value::value_too_large:
return rb_exc_new_cstr(eValueTooLarge, what.c_str());

@@ -1802,6 +1809,12 @@ cb_extract_timeout(std::chrono::milliseconds& field, VALUE options)
cb_extract_duration(field, options, "timeout");
}

static void
cb_extract_timeout(std::optional<std::chrono::milliseconds>& field, VALUE options)
{
cb_extract_duration(field, options, "timeout");
}

static void
cb_extract_cas(couchbase::cas& field, VALUE cas)
{
@@ -2454,9 +2467,12 @@ cb_Backend_ping(VALUE self, VALUE bucket, VALUE options)
}
}
}
std::optional<std::chrono::milliseconds> timeout{};
cb_extract_timeout(timeout, options);

auto barrier = std::make_shared<std::promise<couchbase::core::diag::ping_result>>();
auto f = barrier->get_future();
cluster->ping(report_id, bucket_name, selected_services, [barrier](couchbase::core::diag::ping_result&& resp) {
cluster->ping(report_id, bucket_name, selected_services, timeout, [barrier](couchbase::core::diag::ping_result&& resp) {
barrier->set_value(std::move(resp));
});
auto resp = cb_wait_for_future(f);
@@ -2583,7 +2599,7 @@ cb_Backend_document_get_any_replica(VALUE self, VALUE bucket, VALUE scope, VALUE
couchbase::get_any_replica_options opts;
couchbase::ruby::set_timeout(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
@@ -2621,7 +2637,7 @@ cb_Backend_document_get_all_replicas(VALUE self, VALUE bucket, VALUE scope, VALU
couchbase::get_all_replicas_options opts;
couchbase::ruby::set_timeout(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
@@ -3018,12 +3034,11 @@ cb_Backend_document_upsert(VALUE self, VALUE bucket, VALUE scope, VALUE collecti
couchbase::ruby::set_durability(opts, options);
couchbase::ruby::set_preserve_expiry(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
.upsert<couchbase::ruby::passthrough_transcoder>(
cb_string_new(id), couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, opts);
.upsert(cb_string_new(id), couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, opts);

auto [ctx, resp] = cb_wait_for_future(f);
if (ctx.ec()) {
@@ -3051,7 +3066,7 @@ cb_Backend_document_upsert_multi(VALUE self, VALUE bucket, VALUE scope, VALUE co
couchbase::ruby::set_durability(opts, options);
couchbase::ruby::set_preserve_expiry(opts, options);

auto c = couchbase::cluster(core).bucket(cb_string_new(bucket)).scope(cb_string_new(scope)).collection(cb_string_new(collection));
auto c = couchbase::cluster(*core).bucket(cb_string_new(bucket)).scope(cb_string_new(scope)).collection(cb_string_new(collection));

std::vector<std::pair<std::string, couchbase::codec::encoded_value>> tuples{};
cb_extract_array_of_id_content(tuples, id_content);
@@ -3061,7 +3076,7 @@
futures.reserve(num_of_tuples);

for (auto& [id, content] : tuples) {
futures.emplace_back(c.upsert<couchbase::ruby::passthrough_transcoder>(std::move(id), content, opts));
futures.emplace_back(c.upsert(std::move(id), content, opts));
}

VALUE res = rb_ary_new_capa(static_cast<long>(num_of_tuples));
@@ -3102,7 +3117,7 @@ cb_Backend_document_append(VALUE self, VALUE bucket, VALUE scope, VALUE collecti
couchbase::ruby::set_timeout(opts, options);
couchbase::ruby::set_durability(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
@@ -3142,7 +3157,7 @@ cb_Backend_document_prepend(VALUE self, VALUE bucket, VALUE scope, VALUE collect
couchbase::ruby::set_timeout(opts, options);
couchbase::ruby::set_durability(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
@@ -3186,12 +3201,11 @@ cb_Backend_document_replace(VALUE self, VALUE bucket, VALUE scope, VALUE collect
couchbase::ruby::set_preserve_expiry(opts, options);
couchbase::ruby::set_cas(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
.replace<couchbase::ruby::passthrough_transcoder>(
cb_string_new(id), couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, opts);
.replace(cb_string_new(id), couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, opts);

auto [ctx, resp] = cb_wait_for_future(f);
if (ctx.ec()) {
@@ -3228,12 +3242,11 @@ cb_Backend_document_insert(VALUE self, VALUE bucket, VALUE scope, VALUE collecti
couchbase::ruby::set_expiry(opts, options);
couchbase::ruby::set_durability(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
.insert<couchbase::ruby::passthrough_transcoder>(
cb_string_new(id), couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, opts);
.insert(cb_string_new(id), couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, opts);

auto [ctx, resp] = cb_wait_for_future(f);
if (ctx.ec()) {
@@ -3268,7 +3281,7 @@ cb_Backend_document_remove(VALUE self, VALUE bucket, VALUE scope, VALUE collecti
couchbase::ruby::set_durability(opts, options);
couchbase::ruby::set_cas(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
@@ -3305,7 +3318,7 @@ cb_Backend_document_remove_multi(VALUE self, VALUE bucket, VALUE scope, VALUE co
std::vector<std::pair<std::string, couchbase::cas>> tuples{};
cb_extract_array_of_id_cas(tuples, id_cas);

auto c = couchbase::cluster(core).bucket(cb_string_new(bucket)).scope(cb_string_new(scope)).collection(cb_string_new(collection));
auto c = couchbase::cluster(*core).bucket(cb_string_new(bucket)).scope(cb_string_new(scope)).collection(cb_string_new(collection));

auto num_of_tuples = tuples.size();
std::vector<std::future<std::pair<couchbase::key_value_error_context, couchbase::mutation_result>>> futures;
@@ -3357,7 +3370,7 @@ cb_Backend_document_increment(VALUE self, VALUE bucket, VALUE scope, VALUE colle
couchbase::ruby::set_delta(opts, options);
couchbase::ruby::set_initial_value(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
@@ -3401,7 +3414,7 @@ cb_Backend_document_decrement(VALUE self, VALUE bucket, VALUE scope, VALUE colle
couchbase::ruby::set_delta(opts, options);
couchbase::ruby::set_initial_value(opts, options);

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
@@ -3873,7 +3886,7 @@ cb_Backend_document_mutate_in(VALUE self, VALUE bucket, VALUE scope, VALUE colle
}
}

auto f = couchbase::cluster(core)
auto f = couchbase::cluster(*core)
.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
@@ -4095,7 +4108,7 @@ cb_Backend_document_scan_create(VALUE self, VALUE bucket, VALUE scope, VALUE col
auto collection_name = cb_string_new(collection);

// Getting the operation agent
auto agent_group = couchbase::core::agent_group(cluster->io_context(), couchbase::core::agent_group_config{ { cluster } });
auto agent_group = couchbase::core::agent_group(cluster->io_context(), couchbase::core::agent_group_config{ { *cluster } });
agent_group.open_bucket(bucket_name);
auto agent = agent_group.get_agent(bucket_name);
if (!agent.has_value()) {
@@ -6939,11 +6952,11 @@ cb_Backend_search_get_stats(VALUE self, VALUE timeout)
const auto& cluster = cb_backend_to_cluster(self);

try {
couchbase::core::operations::management::search_index_stats_request req{};
couchbase::core::operations::management::search_get_stats_request req{};
cb_extract_timeout(req, timeout);
auto barrier = std::make_shared<std::promise<couchbase::core::operations::management::search_index_stats_response>>();
auto barrier = std::make_shared<std::promise<couchbase::core::operations::management::search_get_stats_response>>();
auto f = barrier->get_future();
cluster->execute(req, [barrier](couchbase::core::operations::management::search_index_stats_response&& resp) {
cluster->execute(req, [barrier](couchbase::core::operations::management::search_get_stats_response&& resp) {
barrier->set_value(std::move(resp));
});
auto resp = cb_wait_for_future(f);
lib/couchbase/errors.rb (4 changes: 4 additions & 0 deletions)
@@ -159,6 +159,10 @@ class DocumentIrretrievable < CouchbaseError
class DocumentLocked < CouchbaseError
end

# Thrown when the server reports that the document is not locked when an unlocking operation is being performed.
class DocumentNotLocked < CouchbaseError
end

# Thrown when the request is too big for some reason.
class ValueTooLarge < CouchbaseError
end
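
For callers that treat a redundant unlock as a no-op, the new class can be rescued narrowly; a small sketch (the helper name is illustrative):

```ruby
# Releases a lock, treating "already unlocked" as success. The helper name is
# illustrative; DocumentNotLocked is only reported by servers that support the
# not_locked KV status.
def safe_unlock(collection, id, cas)
  collection.unlock(id, cas)
rescue Couchbase::Error::DocumentNotLocked
  nil
end
```
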
lib/couchbase/protostellar/response_converter/kv.rb (2 changes: 1 addition & 1 deletion)
@@ -106,7 +106,7 @@ def self.to_mutate_in_result(resp, specs, options)
Couchbase::Collection::MutateInResult.new do |res|
res.cas = resp.cas
res.transcoder = options.transcoder
res.deleted = nil # TODO: gRPC response has no deleted field
res.deleted = nil # TODO: gRPC response has no deleted field
res.mutation_token = extract_mutation_token(resp)
res.encoded = resp.specs.each_with_index.map do |s, idx|
Couchbase::Collection::SubDocumentField.new do |f|
lib/couchbase/protostellar/response_converter/search.rb (2 changes: 1 addition & 1 deletion)
@@ -121,7 +121,7 @@ def self.convert_meta_data(proto_meta_data)
Couchbase::Cluster::SearchMetaData.new do |meta|
proto_metrics = proto_meta_data.metrics
dur = proto_metrics.execution_time
meta.metrics.took = (dur.seconds * 1000) + (dur.nanos / 1000.0).round # `took` is in milliseconds
meta.metrics.took = (dur.seconds * 1000) + (dur.nanos / 1000.0).round # `took` is in milliseconds
meta.metrics.total_rows = proto_metrics.total_rows
meta.metrics.max_score = proto_metrics.max_score
meta.metrics.success_partition_count = proto_metrics.success_partition_count
test/crud_test.rb (20 changes: 20 additions & 0 deletions)
@@ -165,6 +165,26 @@ def test_get_and_lock_protects_document_from_mutations
@collection.upsert(doc_id, document)
end

def test_unlock_document_not_locked
skip("#{name}: Server does not support the not_locked KV status") unless env.server_version.trinity?
skip("#{name}: The mock server does not support the not_locked KV status") if use_caves?

# TODO: Remove protostellar skip once it's added
skip("#{name}: The #{Couchbase::Protostellar::NAME} protocol does not support DocumentNotLocked yet") if env.protostellar?

doc_id = uniq_id(:foo)
document = {"value" => 42}
@collection.upsert(doc_id, document)

res = @collection.get_and_lock(doc_id, 20)
cas = res.cas
@collection.unlock(doc_id, cas)

assert_raises(Couchbase::Error::DocumentNotLocked) do
@collection.unlock(doc_id, cas)
end
end

def test_insert_fails_when_document_exists_already
doc_id = uniq_id(:foo)
document = {"value" => 42}