Skip to content

Commit

Permalink
[#16971, #20206] xCluster: Handle dropped table in DB Scoped replication
Browse files Browse the repository at this point in the history
Summary:
When a YSQL table is dropped it will now properly clean it up from xCluster.
On the target cluster, when the YSQL table is dropped, it will be removed from all the `UniverseReplicationGroups`. As part of this step we delete the stream on the source cluster.
On the source cluster when the YSQL table is dropped, it is hidden and kept until all its xCluster streams are dropped. This ensures data gets replicated to target clusters that are lagging behind. When the target table is dropped it will delete the source stream which will then delete the source tablets, and table. Also, the source streams belonging to dropped tables are automatically deleted after `FLAGS_cdc_wal_retention_time_secs`. This will cover the case when the two clusters are network partitioned, or the target cluster is down.
When the source cluster xCluster stream is deleted as part of its cleanup, we now delete it from the xCluster Outbound ReplicationGroups.

Dropping YCQL tables that are under xCluster replication is blocked.
Dropping YSQL databases that contain tables under xCluster replication is blocked.

Moved all code related to removing tables from xCluster out of catalog manager. `RemoveTablesFromReplication` has been renamed to `RemoveTablesFromReplicationGroup` and moved to `xcluster_replication_group.cc`. `DeleteXReplStatesForIndexTables` has been moved to `XClusterTargetManager::RemoveDroppedTablesOnConsumer`.
Added a Background task in `XClusterTargetManager` to `RemoveDroppedTablesFromReplication`

Fixes #16971
Fixes #20206

Jira: DB-6295, DB-9160

Test Plan:
XClusterYsqlTest, DropTableOnConsumerThenProducer
XClusterYsqlTest, ConsumerMasterRestartAfterTableDrop
XClusterYsqlTest, DropTableOnProducerThenConsumer
XClusterYsqlTest, ProducerMasterRestartAfterTableDrop
XClusterYsqlTest, DropTableWithWorkload
XClusterYsqlTest, DropTableOnProducerOnly
XClusterDBScopedTest, DropTableOnProducerThenConsumer

Reviewers: jhe, slingam, xCluster

Reviewed By: slingam

Subscribers: ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D33051
  • Loading branch information
hari90 committed Mar 22, 2024
1 parent a586d7b commit e5655b5
Show file tree
Hide file tree
Showing 27 changed files with 1,141 additions and 570 deletions.
43 changes: 0 additions & 43 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1179,49 +1179,6 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAddTableAfterDropTableAndMast
ASSERT_EQ(expected_tablet_ids, tablets_found);
}

// Creates an xCluster stream on a YSQL table, drops the table, and then checks that the stream's
// entries disappear from the cdc_state table and that an explicit delete of the stream fails.
TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestDropTableBeforeXClusterStreamDelete)) {
  // Bring up the cluster and create the table under test.
  ASSERT_OK(SetUpWithParams(1, 1, false));
  auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName));

  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
  ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version = */ nullptr));

  TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));

  // Create an xCluster stream on the table.
  RpcController rpc;
  CreateCDCStreamRequestPB create_req;
  CreateCDCStreamResponsePB create_resp;
  create_req.set_table_id(table_id);
  create_req.set_source_type(XCLUSTER);
  ASSERT_OK(cdc_proxy_->CreateCDCStream(create_req, &create_resp, &rpc));

  // Dropping a YSQL table deletes its associated xCluster streams.
  DropTable(&test_cluster_, kTableName);

  // Returns true once no cdc_state row refers to the stream created above.
  CDCStateTable cdc_state_table(test_client());
  auto stream_rows_gone = [&]() -> Result<bool> {
    Status iteration_status;
    for (auto entry : VERIFY_RESULT(cdc_state_table.GetTableRange(
             CDCStateTableEntrySelector().IncludeCheckpoint(), &iteration_status))) {
      RETURN_NOT_OK(entry);
      auto& state_row = *entry;
      if (state_row.key.stream_id.ToString() == create_resp.stream_id()) {
        return false;
      }
    }
    RETURN_NOT_OK(iteration_status);
    return true;
  };

  // Wait for the background thread to clean up the stream's entries from cdc_state.
  ASSERT_OK(WaitFor(
      stream_rows_gone, MonoDelta::FromSeconds(60), "Waiting for stream metadata cleanup."));

  // The stream is already gone, so deleting it explicitly must fail.
  auto dropped_stream_id =
      ASSERT_RESULT(xrepl::StreamId::FromString(create_resp.stream_id()));
  ASSERT_EQ(DeleteCDCStream(dropped_stream_id), false);
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCheckPointPersistencyNodeRestart)) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_update_local_peer_min_index) = false;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1;
Expand Down
53 changes: 31 additions & 22 deletions src/yb/integration-tests/xcluster/xcluster_db_scoped-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
//

#include "yb/client/table.h"
#include "yb/client/xcluster_client.h"
#include "yb/client/yb_table_name.h"
#include "yb/integration-tests/xcluster/xcluster_ysql_test_base.h"
#include "yb/master/master_replication.proxy.h"
#include "yb/master/mini_master.h"
#include "yb/util/backoff_waiter.h"

DECLARE_bool(TEST_enable_xcluster_api_v2);
DECLARE_string(certs_for_cdc_dir);

using namespace std::chrono_literals;

Expand All @@ -28,32 +27,13 @@ const MonoDelta kTimeout = 60s * kTimeMultiplier;

// Test fixture for DB-scoped xCluster replication tests. Builds on XClusterYsqlTestBase and
// turns on the FLAGS_TEST_enable_xcluster_api_v2 test flag during SetUp().
class XClusterDBScopedTest : public XClusterYsqlTestBase {
public:
// Cluster layout options forwarded to XClusterYsqlTestBase::SetUpWithParams.
struct SetupParams {
// Tablet counts for the consumer tables (presumably one entry per table — confirm).
std::vector<uint32_t> num_consumer_tablets = {3};
// Tablet counts for the producer tables (presumably one entry per table — confirm).
std::vector<uint32_t> num_producer_tablets = {3};
uint32_t replication_factor = 3;
uint32_t num_masters = 1;
bool ranged_partitioned = false;
};

XClusterDBScopedTest() = default;
~XClusterDBScopedTest() = default;

// Runs the base-class setup, then enables the xCluster v2 API test flag.
virtual void SetUp() override {
XClusterYsqlTestBase::SetUp();
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_enable_xcluster_api_v2) = true;
}

// Sets up the clusters using a default-constructed SetupParams.
Status SetUpClusters() {
static const SetupParams kDefaultParams;
return SetUpClusters(kDefaultParams);
}

// Sets up the clusters with the given layout by delegating to
// XClusterYsqlTestBase::SetUpWithParams.
Status SetUpClusters(const SetupParams& params) {
return XClusterYsqlTestBase::SetUpWithParams(
params.num_consumer_tablets, params.num_producer_tablets, params.replication_factor,
params.num_masters, params.ranged_partitioned);
}
};

TEST_F(XClusterDBScopedTest, TestCreateWithCheckpoint) {
Expand Down Expand Up @@ -116,4 +96,33 @@ TEST_F(XClusterDBScopedTest, CreateTable) {
ASSERT_OK(VerifyWrittenRecords(new_producer_table, new_consumer_table));
}

// Drops a replicated table on the producer and then on the consumer, and verifies that looking up
// the table's xCluster streams on the producer afterwards fails with a "not found" error.
TEST_F(XClusterDBScopedTest, DropTableOnProducerThenConsumer) {
  // Setup replication with two tables.
  SetupParams params;
  params.num_consumer_tablets = params.num_producer_tablets = {3, 3};
  ASSERT_OK(SetUpClusters(params));

  ASSERT_OK(CheckpointReplicationGroup());
  ASSERT_OK(CreateReplicationFromCheckpoint());

  // Perform the drop on producer cluster.
  ASSERT_OK(DropYsqlTable(producer_cluster_, *producer_table_));

  // Perform the drop on consumer cluster.
  ASSERT_OK(DropYsqlTable(consumer_cluster_, *consumer_table_));

  auto namespace_id = ASSERT_RESULT(GetNamespaceId(producer_client()));
  std::promise<Result<master::GetXClusterStreamsResponsePB>> promise;
  client::XClusterClient remote_client(*producer_client());
  // Check that the request itself was issued successfully. If the returned status were ignored
  // and the call failed before invoking the callback, the wait on the future below would block
  // forever instead of failing the test.
  ASSERT_OK(remote_client.GetXClusterStreams(
      CoarseMonoClock::Now() + kTimeout, kReplicationGroupId, namespace_id,
      {producer_table_->name().table_name()}, {producer_table_->name().pgschema_name()},
      [&promise](Result<master::GetXClusterStreamsResponsePB> result) {
        promise.set_value(std::move(result));
      }));

  // The dropped table must no longer be found; the response is only printed if the lookup
  // unexpectedly succeeds.
  auto result = promise.get_future().get();
  ASSERT_NOK(result) << result->DebugString();
  ASSERT_STR_CONTAINS(result.status().ToString(), "test_table_0 not found in namespace");
}

} // namespace yb
21 changes: 21 additions & 0 deletions src/yb/integration-tests/xcluster/xcluster_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ Status XClusterTestBase::CreateDatabase(
"CREATE DATABASE $0$1", namespace_name, colocated ? " colocated = true" : "");
}

// Opens a connection to `cluster` and issues `DROP DATABASE <namespace_name>` over it.
// Returns early with the connection error if connecting fails; otherwise returns the status of
// the DROP statement.
Status XClusterTestBase::DropDatabase(Cluster& cluster, const std::string& namespace_name) {
  auto connection = VERIFY_RESULT(cluster.Connect());
  auto drop_status = connection.ExecuteFormat("DROP DATABASE $0", namespace_name);
  return drop_status;
}

Result<YBTableName> XClusterTestBase::CreateTable(
YBClient* client, const std::string& namespace_name, const std::string& table_name,
uint32_t num_tablets, const client::YBSchema* schema) {
Expand Down Expand Up @@ -497,6 +502,22 @@ Status XClusterTestBase::ChangeXClusterRole(const cdc::XClusterRole role, Cluste
return Status::OK();
}

// Sends a GetUniverseReplication RPC for `replication_group_id` to the leader master of `cluster`
// and returns the response. Fails if the leader master cannot be resolved or the RPC itself
// fails; an error carried inside the response is not inspected here and is left to the caller.
Result<master::GetUniverseReplicationResponsePB> XClusterTestBase::GetUniverseReplicationInfo(
    Cluster& cluster, const xcluster::ReplicationGroupId& replication_group_id) {
  // Resolve the current leader master and build a replication proxy bound to its RPC address.
  auto leader_master = VERIFY_RESULT(cluster.mini_cluster_->GetLeaderMiniMaster());
  auto replication_proxy = std::make_shared<master::MasterReplicationProxy>(
      &cluster.client_->proxy_cache(), leader_master->bound_rpc_addr());

  master::GetUniverseReplicationRequestPB request;
  request.set_replication_group_id(replication_group_id.ToString());

  rpc::RpcController controller;
  controller.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

  master::GetUniverseReplicationResponsePB response;
  RETURN_NOT_OK(replication_proxy->GetUniverseReplication(request, &response, &controller));
  return response;
}

Status XClusterTestBase::VerifyUniverseReplicationDeleted(
MiniCluster* consumer_cluster, YBClient* consumer_client,
const xcluster::ReplicationGroupId& replication_group_id, int timeout) {
Expand Down
5 changes: 5 additions & 0 deletions src/yb/integration-tests/xcluster/xcluster_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class XClusterTestBase : public YBTest {

Status CreateDatabase(
Cluster* cluster, const std::string& namespace_name, bool colocated = false);
// Drops the database `namespace_name` on `cluster` by executing a DROP DATABASE statement.
Status DropDatabase(Cluster& cluster, const std::string& namespace_name);

static Result<client::YBTableName> CreateTable(
YBClient* client, const std::string& namespace_name, const std::string& table_name,
Expand Down Expand Up @@ -216,6 +217,10 @@ class XClusterTestBase : public YBTest {
MiniCluster* consumer_cluster, YBClient* consumer_client,
const xcluster::ReplicationGroupId& replication_group_id, bool is_enabled);

// Fetches the GetUniverseReplication response for `replication_group_id` from the leader master
// of `cluster`. Defaults to kReplicationGroupId when no group id is given.
Result<master::GetUniverseReplicationResponsePB> GetUniverseReplicationInfo(
Cluster& cluster,
const xcluster::ReplicationGroupId& replication_group_id = kReplicationGroupId);

Status VerifyUniverseReplicationDeleted(
MiniCluster* consumer_cluster, YBClient* consumer_client,
const xcluster::ReplicationGroupId& replication_group_id, int timeout);
Expand Down
Loading

0 comments on commit e5655b5

Please sign in to comment.