Skip to content

Commit

Permalink
[#21597, #21598] xCluster: Return proper errors in some failure cases
Browse files Browse the repository at this point in the history
Summary:
#21597: Table creation converts every NotFound status into a "Database not found" error. We now wrap bootstrap errors from the source universe in an InvalidArgument status.
#21598: XClusterClient was not surfacing replication errors. IsCreateXClusterReplicationDone now returns an IsOperationDoneResult that carries the replication error status.

Fixes #21597
Fixes #21598
Jira: DB-10486, DB-10487

Test Plan: XClusterDBScopedTest

Reviewers: jhe, slingam, xCluster

Reviewed By: slingam

Subscribers: ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D33398
  • Loading branch information
hari90 committed Mar 21, 2024
1 parent 88629db commit cd12fe3
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 24 deletions.
8 changes: 6 additions & 2 deletions src/yb/client/xcluster_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Status XClusterClient::CreateXClusterReplication(
return Status::OK();
}

Result<bool> XClusterClient::IsCreateXClusterReplicationDone(
Result<IsOperationDoneResult> XClusterClient::IsCreateXClusterReplicationDone(
const xcluster::ReplicationGroupId& replication_group_id,
const std::string& target_master_addresses) {
SCHECK(!replication_group_id.empty(), InvalidArgument, "Replication group id is empty");
Expand All @@ -83,7 +83,11 @@ Result<bool> XClusterClient::IsCreateXClusterReplicationDone(
return StatusFromPB(resp.error().status());
}

return resp.done();
if (resp.has_replication_error()) {
return IsOperationDoneResult::Done(StatusFromPB(resp.replication_error()));
}

return IsOperationDoneResult::Done();
}

Result<std::vector<NamespaceId>> XClusterClient::XClusterCreateOutboundReplicationGroup(
Expand Down
2 changes: 1 addition & 1 deletion src/yb/client/xcluster_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class XClusterClient {
const xcluster::ReplicationGroupId& replication_group_id,
const std::string& target_master_addresses);

virtual Result<bool> IsCreateXClusterReplicationDone(
virtual Result<IsOperationDoneResult> IsCreateXClusterReplicationDone(
const xcluster::ReplicationGroupId& replication_group_id,
const std::string& target_master_addresses);

Expand Down
9 changes: 7 additions & 2 deletions src/yb/integration-tests/xcluster/xcluster_db_scoped-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ TEST_F(XClusterDBScopedTest, TestCreateWithCheckpoint) {
ASSERT_OK(SetUpClusters());

ASSERT_OK(CheckpointReplicationGroup());

ASSERT_NOK(CreateReplicationFromCheckpoint("bad-master-addr"));

ASSERT_OK(CreateReplicationFromCheckpoint());

// Verify that universe was setup on consumer.
Expand All @@ -80,8 +83,10 @@ TEST_F(XClusterDBScopedTest, CreateTable) {
ASSERT_OK(CreateReplicationFromCheckpoint());

// Creating a new table on target first should fail.
ASSERT_NOK(CreateYsqlTable(
/*idx=*/1, /*num_tablets=*/3, &consumer_cluster_));
auto status = CreateYsqlTable(
/*idx=*/1, /*num_tablets=*/3, &consumer_cluster_);
ASSERT_NOK(status);
ASSERT_STR_CONTAINS(status.ToString(), "Table public.test_table_1 not found");

auto new_producer_table_name = ASSERT_RESULT(CreateYsqlTable(
/*idx=*/1, /*num_tablets=*/3, &producer_cluster_));
Expand Down
26 changes: 18 additions & 8 deletions src/yb/integration-tests/xcluster/xcluster_ysql_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "yb/tserver/mini_tablet_server.h"
#include "yb/tserver/tablet_server.h"
#include "yb/util/backoff_waiter.h"
#include "yb/util/is_operation_done_result.h"
#include "yb/util/thread.h"
#include "yb/yql/pgwrapper/libpq_utils.h"

Expand Down Expand Up @@ -823,28 +824,37 @@ Status XClusterYsqlTestBase::CheckpointReplicationGroup() {
return Status::OK();
}

Status XClusterYsqlTestBase::WaitForCreateReplicationToFinish() {
Status XClusterYsqlTestBase::WaitForCreateReplicationToFinish(
const std::string& target_master_addresses) {
RETURN_NOT_OK(LoggedWaitFor(
[this]() {
return client::XClusterClient(*producer_client())
.IsCreateXClusterReplicationDone(
kReplicationGroupId, consumer_cluster()->GetMasterAddresses());
[this, &target_master_addresses]() -> Result<bool> {
auto result = VERIFY_RESULT(
client::XClusterClient(*producer_client())
.IsCreateXClusterReplicationDone(kReplicationGroupId, target_master_addresses));
if (!result.status().ok()) {
return result.status();
}
return result.done();
},
MonoDelta::FromSeconds(kRpcTimeout), __func__));

// Wait for the xcluster safe time to propagate to the tserver nodes.
return WaitForSafeTimeToAdvanceToNow();
}

Status XClusterYsqlTestBase::CreateReplicationFromCheckpoint() {
Status XClusterYsqlTestBase::CreateReplicationFromCheckpoint(
const std::string& target_master_addresses) {
RETURN_NOT_OK(SetupCertificates(kReplicationGroupId));

auto master_addr = consumer_cluster()->GetMasterAddresses();
auto master_addr = target_master_addresses;
if (master_addr.empty()) {
master_addr = consumer_cluster()->GetMasterAddresses();
}

RETURN_NOT_OK(client::XClusterClient(*producer_client())
.CreateXClusterReplication(kReplicationGroupId, master_addr));

return WaitForCreateReplicationToFinish();
return WaitForCreateReplicationToFinish(master_addr);
}

} // namespace yb
4 changes: 2 additions & 2 deletions src/yb/integration-tests/xcluster/xcluster_ysql_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ class XClusterYsqlTestBase : public XClusterTestBase {
bool delete_op = false, bool use_transaction = false);

Status CheckpointReplicationGroup();
Status CreateReplicationFromCheckpoint();
Status WaitForCreateReplicationToFinish();
Status CreateReplicationFromCheckpoint(const std::string& target_master_addresses = {});
Status WaitForCreateReplicationToFinish(const std::string& target_master_addresses);

protected:
void TestReplicationWithSchemaChanges(TableId producer_table_id, bool bootstrap);
Expand Down
9 changes: 6 additions & 3 deletions src/yb/master/xcluster/add_table_to_xcluster_target_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ Status AddTableToXClusterTargetTask::AddTableToReplicationGroup(
client::BootstrapProducerResult bootstrap_result) {
const auto& replication_group_id = universe_->ReplicationGroupId();

auto [producer_table_ids, bootstrap_ids, bootstrap_time] = VERIFY_RESULT_PREPEND(
std::move(bootstrap_result),
Format("Failed to bootstrap table for xCluster replication group $0", replication_group_id));
SCHECK_EC_FORMAT(
bootstrap_result, InvalidArgument, MasterError(MasterErrorPB::INVALID_REQUEST),
"Failed to bootstrap table on the source universe of xCluster replication group $0: $1",
replication_group_id, bootstrap_result.status().ToString());

auto& [producer_table_ids, bootstrap_ids, bootstrap_time] = *bootstrap_result;

CHECK_EQ(producer_table_ids.size(), 1);
CHECK_EQ(bootstrap_ids.size(), 1);
Expand Down
13 changes: 7 additions & 6 deletions src/yb/tools/yb-admin_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
#include "yb/master/master_test.proxy.h"
#include "yb/master/master_types.pb.h"
#include "yb/master/master_util.h"
#include "yb/master/sys_catalog.h"
#include "yb/master/sys_catalog_constants.h"

#include "yb/rpc/messenger.h"
#include "yb/rpc/proxy.h"
Expand All @@ -92,16 +92,16 @@

#include "yb/encryption/encryption_util.h"

#include "yb/util/debug-util.h"
#include "yb/util/format.h"
#include "yb/util/is_operation_done_result.h"
#include "yb/util/net/net_util.h"
#include "yb/util/pb_util.h"
#include "yb/util/protobuf_util.h"
#include "yb/util/random_util.h"
#include "yb/util/status_format.h"
#include "yb/util/stol_utils.h"
#include "yb/util/string_case.h"
#include "yb/util/string_util.h"
#include "yb/util/flags.h"
#include "yb/util/tostring.h"

DEFINE_NON_RUNTIME_bool(wait_if_no_leader_master, false,
Expand Down Expand Up @@ -4578,9 +4578,10 @@ Status ClusterAdminClient::WaitForCreateXClusterReplication(
SCHECK(!replication_group_id.empty(), InvalidArgument, "Replication group id is empty");

for (;;) {
if (VERIFY_RESULT(XClusterClient().IsCreateXClusterReplicationDone(
replication_group_id, target_master_addresses))) {
return Status::OK();
auto result = XClusterClient().IsCreateXClusterReplicationDone(
replication_group_id, target_master_addresses);
if (result && result->done()) {
return result->status();
}

std::this_thread::sleep_for(100ms);
Expand Down

0 comments on commit cd12fe3

Please sign in to comment.