[BACKPORT 2.16.2][#16147] CDCSDK: Correctly set cdc_sdk_min_checkpoint_op_id to -1.-1 in upgrade scenarios

Summary:
Original commit: 8950c7b / D23283
If a cluster is upgraded from any version lower than 2.14 to 2.14 or greater, the per-tablet CDCSDK checkpoint is initialised incorrectly.
The value is initialised from the "cdc_sdk_min_checkpoint_op_id" field of the raft superblock.
For tablets created before the upgrade, this value defaults to OpId() (term: 0, index: 0) after the upgrade.
This has side effects, because any checkpoint value other than -1.-1 is taken to mean that CDC has been enabled for the tablet.
A previous fix (https://phabricator.dev.yugabyte.com/D22998) handles the case where the superblock does not have "cdc_sdk_min_checkpoint_op_id" set:
in that case we manually set it to -1.-1.
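
The misreading described above can be illustrated with a minimal, self-contained sketch. `SketchOpId`, `kSketchInvalid`, `LooksCdcEnabled` and `CheckpointAfterD22998` are hypothetical stand-ins, not the real `yb::OpId` API. The only facts taken from this commit message are that a default OpId is 0.0, that -1.-1 means "no CDC", and that D22998 substitutes -1.-1 when the field is absent.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical stand-in for yb::OpId: value-initialised to 0.0, like OpId().
struct SketchOpId {
  int64_t term = 0;
  int64_t index = 0;
};

inline bool operator==(const SketchOpId& a, const SketchOpId& b) {
  return a.term == b.term && a.index == b.index;
}

// Stand-in for OpId::Invalid(), written "-1.-1" in the commit message.
constexpr SketchOpId kSketchInvalid{-1, -1};

// Any checkpoint other than -1.-1 is read as "CDC is enabled on this tablet".
inline bool LooksCdcEnabled(const SketchOpId& checkpoint) {
  return !(checkpoint == kSketchInvalid);
}

// Behaviour after D22998 as described above: an absent field becomes -1.-1,
// but a field that is present and holds 0.0 is still passed through unchanged.
inline SketchOpId CheckpointAfterD22998(const std::optional<SketchOpId>& field) {
  return field.has_value() ? *field : kSketchInvalid;
}
```

Under these assumptions, `CheckpointAfterD22998(std::nullopt)` yields -1.-1, while a persisted 0.0 still makes `LooksCdcEnabled` return true, which is the gap the next paragraph describes.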

But another scenario was missed. For example, if the cluster was already on a version where "cdc_sdk_min_checkpoint_op_id" had been added (2.14 or later),
but below a version with the fix (2.14.7), the cluster would retain "cdc_sdk_min_checkpoint_op_id" as 0.0,
again causing unnecessary intent retention after restarts.

We have now introduced a new boolean field in the raft superblock: 'is_under_cdc_sdk_replication'.
This field is set to true whenever CDC's logic sets 'cdc_sdk_min_checkpoint_op_id' to a valid OpId, and back to false when the checkpoint is set to OpId::Max() or -1.-1, which means the tablet no longer has an active CDC stream.
During tablet bootstrap, if this field is false and the checkpoint is 0.0, we infer that this is the upgrade scenario described above and reset
"cdc_sdk_min_checkpoint_op_id" to -1.-1.
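
The two rules above (how the flag follows the checkpoint, and how bootstrap interprets the pair) can be sketched in isolation. This is a simplified model that mirrors the conditions in the tablet_metadata.cc hunk of this commit, without locking or flushing. All `Sketch*` names are hypothetical, and the definitions of `valid()` (non-negative term and index) and of the Max sentinel (both fields at the int64 maximum) are assumptions about `yb::OpId`, not something this commit shows.

```cpp
#include <cstdint>
#include <limits>
#include <optional>

// Hypothetical stand-in for yb::OpId.
struct SketchOpId {
  int64_t term = 0;
  int64_t index = 0;
  // Assumption: an OpId is "valid" when both fields are non-negative.
  bool valid() const { return term >= 0 && index >= 0; }
};

inline bool operator==(const SketchOpId& a, const SketchOpId& b) {
  return a.term == b.term && a.index == b.index;
}

constexpr SketchOpId kSketchInvalid{-1, -1};  // "-1.-1", no CDC stream.
// Assumption: the Max sentinel has both fields at the int64 maximum.
constexpr SketchOpId kSketchMax{std::numeric_limits<int64_t>::max(),
                                std::numeric_limits<int64_t>::max()};

// Persisted form: both fields may be absent in an older superblock.
struct SketchSuperBlock {
  std::optional<SketchOpId> cdc_sdk_min_checkpoint_op_id;
  std::optional<bool> is_under_cdc_sdk_replication;
};

// In-memory form, defaulting to "no CDC".
struct SketchMetadata {
  SketchOpId checkpoint = kSketchInvalid;
  bool under_replication = false;
};

// Setter rule: Max or Invalid clears the flag, any other valid OpId sets it.
inline void SetCheckpoint(SketchMetadata* meta, const SketchOpId& op_id) {
  meta->checkpoint = op_id;
  if (op_id == kSketchMax || op_id == kSketchInvalid) {
    meta->under_replication = false;
  } else if (op_id.valid()) {
    meta->under_replication = true;
  }
}

// Bootstrap rule: a missing field, or 0.0 without the flag, loads as -1.-1.
inline SketchMetadata LoadFromSuperBlock(const SketchSuperBlock& sb) {
  SketchMetadata meta;
  if (!sb.cdc_sdk_min_checkpoint_op_id.has_value()) {
    return meta;  // Upgrade from a version that never wrote the field.
  }
  const SketchOpId checkpoint = *sb.cdc_sdk_min_checkpoint_op_id;
  const bool flag = sb.is_under_cdc_sdk_replication.value_or(false);
  if (checkpoint == SketchOpId{} && !flag) {
    return meta;  // 0.0 left behind by an earlier upgrade, CDC never ran.
  }
  meta.checkpoint = checkpoint;
  meta.under_replication = flag;
  return meta;
}
```

In this model a superblock holding 0.0 with the flag set to true keeps its 0.0 checkpoint across a restart, whereas the same checkpoint without the flag is loaded as -1.-1.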

There however remains one small window where the checkpoint could legitimately be set to 0.0 while the 'is_under_cdc_sdk_replication' field is unset.
This happens if the cluster was on a version that supports CDC but does not have this fix, and a CDC snapshot was in progress.
The first step of the snapshot process sets the checkpoint to 0.0, and an immediately following RPC sets it to something higher (based on GetLastReplicatedData).
If a rolling upgrade happens between these two calls, the fix will reset 'cdc_sdk_min_checkpoint_op_id' to -1.-1, which would be wrong.
We will document that rolling upgrades should not be run during the CDC snapshot process.
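
A small sketch may help show why this window cannot be closed from the superblock contents alone: under the assumptions below, a tablet that never had CDC and a tablet caught between the two snapshot calls persist exactly the same state on a pre-fix version. The types and the two builder functions are hypothetical illustrations, not code from this commit.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical stand-in for yb::OpId, value-initialised to 0.0.
struct SketchOpId {
  int64_t term = 0;
  int64_t index = 0;
};

inline bool operator==(const SketchOpId& a, const SketchOpId& b) {
  return a.term == b.term && a.index == b.index;
}

// Hypothetical persisted state; a pre-fix binary never writes the flag.
struct SketchSuperBlock {
  std::optional<SketchOpId> cdc_sdk_min_checkpoint_op_id;
  std::optional<bool> is_under_cdc_sdk_replication;
};

inline bool operator==(const SketchSuperBlock& a, const SketchSuperBlock& b) {
  return a.cdc_sdk_min_checkpoint_op_id == b.cdc_sdk_min_checkpoint_op_id &&
         a.is_under_cdc_sdk_replication == b.is_under_cdc_sdk_replication;
}

// Tablet that never had CDC, but whose checkpoint was left at 0.0 by an
// earlier upgrade.
inline SketchSuperBlock StaleUpgradeLeftover() {
  SketchSuperBlock sb;
  sb.cdc_sdk_min_checkpoint_op_id = SketchOpId{};
  return sb;
}

// Tablet on a pre-fix version where the first snapshot step has written 0.0
// and the follow-up RPC has not yet raised the checkpoint.
inline SketchSuperBlock SnapshotFirstStepOnOldVersion() {
  SketchSuperBlock sb;
  sb.cdc_sdk_min_checkpoint_op_id = SketchOpId{};
  return sb;
}

// The bootstrap condition from this fix: 0.0 with the flag absent or false.
inline bool WouldBeResetAtBootstrap(const SketchSuperBlock& sb) {
  return sb.cdc_sdk_min_checkpoint_op_id.has_value() &&
         *sb.cdc_sdk_min_checkpoint_op_id == SketchOpId{} &&
         !sb.is_under_cdc_sdk_replication.value_or(false);
}
```

Because both builders return equal superblocks, any rule based only on these two fields treats them identically, which is why the restriction on rolling upgrades during a snapshot is left to documentation.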

Also, we have now added a ctest: TestCheckPointWithNoCDCStream, which asserts that 'cdc_sdk_min_checkpoint_op_id' is set to -1.-1 in cases without CDC.

Test Plan:
Jenkins: urgent

ybd --cxx-test cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestCheckPointWithNoCDCStream
ybd --cxx-test cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestIsUnderCDCSDKReplicationField

Reviewers: skumar

Reviewed By: skumar

Subscribers: ycdcxcluster

Differential Revision: https://phabricator.dev.yugabyte.com/D23325
Adithya Bharadwaj committed Mar 4, 2023
1 parent b101372 commit c84fd01
Showing 6 changed files with 140 additions and 8 deletions.
97 changes: 97 additions & 0 deletions ent/src/yb/integration-tests/cdcsdk_ysql-test.cc
@@ -10716,6 +10716,103 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestLeadershipChangeAndSnapshotAf
ASSERT_GT(checkpoint_after_leadership_change, checkpoint_after_snapshot);
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCheckPointWithNoCDCStream)) {
ASSERT_OK(SetUpWithParams(3, 1, false));

const uint32_t num_tablets = 1;
auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName, num_tablets));
google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr));
ASSERT_EQ(tablets.size(), num_tablets);

std::string table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));

// Assert the cdc_sdk_min_checkpoint_op_id is -1.-1.
for (size_t i = 0; i < test_cluster()->num_tablet_servers(); ++i) {
for (const auto& peer : test_cluster()->GetTabletPeers(i)) {
if (peer->tablet_id() == tablets[0].tablet_id()) {
// Whatever checkpoint is persisted in the RAFT logs should be the same as the one held in
// memory by the transaction participant of the tablet peer.
ASSERT_EQ(peer->cdc_sdk_min_checkpoint_op_id(), OpId::Invalid());
ASSERT_EQ(
peer->cdc_sdk_min_checkpoint_op_id(),
peer->tablet()->transaction_participant()->GetRetainOpId());
}
}
}

// Restart one tablet server (index 1).
SleepFor(MonoDelta::FromSeconds(1));
test_cluster()->mini_tablet_server(1)->Shutdown();
ASSERT_OK(test_cluster()->mini_tablet_server(1)->Start());
ASSERT_OK(test_cluster()->mini_tablet_server(1)->WaitStarted());

// Re-Assert the cdc_sdk_min_checkpoint_op_id is -1.-1, even after restart
for (size_t i = 0; i < test_cluster()->num_tablet_servers(); ++i) {
for (const auto& peer : test_cluster()->GetTabletPeers(i)) {
if (peer->tablet_id() == tablets[0].tablet_id()) {
// Whatever checkpoint is persisted in the RAFT logs should be the same as the one held in
// memory by the transaction participant of the tablet peer.
ASSERT_EQ(peer->cdc_sdk_min_checkpoint_op_id(), OpId::Invalid());
ASSERT_EQ(
peer->cdc_sdk_min_checkpoint_op_id(),
peer->tablet()->transaction_participant()->GetRetainOpId());
}
}
}
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestIsUnderCDCSDKReplicationField)) {
FLAGS_update_min_cdc_indices_interval_secs = 1;
FLAGS_update_metrics_interval_ms = 1;
ASSERT_OK(SetUpWithParams(3, 1, false));

const uint32_t num_tablets = 1;
auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName, num_tablets));
google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr));
ASSERT_EQ(tablets.size(), num_tablets);

TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));
CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream(IMPLICIT));

EnableCDCServiceInAllTserver(3);
auto resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets));
ASSERT_FALSE(resp.has_error());

auto check_is_under_cdc_sdk_replication = [&](bool expected_value) {
for (size_t i = 0; i < test_cluster()->num_tablet_servers(); ++i) {
for (const auto& peer : test_cluster()->GetTabletPeers(i)) {
if (peer->tablet_id() == tablets[0].tablet_id()) {
// Check value of 'is_under_cdc_sdk_replication' in all tablet peers.
ASSERT_EQ(peer->is_under_cdc_sdk_replication(), expected_value);
}
}
}
};

// Assert that 'is_under_cdc_sdk_replication' is true before the restart; it is checked again after the restart.
check_is_under_cdc_sdk_replication(true);

// Restart all the nodes.
SleepFor(MonoDelta::FromSeconds(1));
for (size_t i = 0; i < test_cluster()->num_tablet_servers(); ++i) {
test_cluster()->mini_tablet_server(i)->Shutdown();
ASSERT_OK(test_cluster()->mini_tablet_server(i)->Start());
}
LOG(INFO) << "All nodes restarted";
EnableCDCServiceInAllTserver(3);

check_is_under_cdc_sdk_replication(true);

ASSERT_EQ(DeleteCDCStream(stream_id), true);
VerifyStreamDeletedFromCdcState(test_client(), stream_id, tablets.Get(0).tablet_id());
VerifyTransactionParticipant(tablets.Get(0).tablet_id(), OpId::Max());

// Assert that after deleting the stream, 'is_under_cdc_sdk_replication' will be set to 'false'.
check_is_under_cdc_sdk_replication(false);
}

} // namespace enterprise
} // namespace cdc
} // namespace yb
3 changes: 3 additions & 0 deletions src/yb/tablet/metadata.proto
@@ -208,6 +208,9 @@ message RaftGroupReplicaSuperBlockPB {
optional OpIdPB cdc_sdk_min_checkpoint_op_id = 33;

optional fixed64 cdc_sdk_safe_time = 34;

// Is this tablet currently being replicated with CDC.
optional bool is_under_cdc_sdk_replication = 36;
}

message FilePB {
38 changes: 30 additions & 8 deletions src/yb/tablet/tablet_metadata.cc
@@ -783,16 +783,24 @@ Status RaftGroupMetadata::LoadFromSuperBlock(const RaftGroupReplicaSuperBlockPB&
}
cdc_min_replicated_index_ = superblock.cdc_min_replicated_index();

{
if (superblock.has_cdc_sdk_min_checkpoint_op_id()) {
cdc_sdk_min_checkpoint_op_id_ = OpId::FromPB(superblock.cdc_sdk_min_checkpoint_op_id());
} else {
// If a cluster is upgraded from any version lesser than 2.14,
// 'cdc_sdk_min_checkpoint_op_id' would be absent from the superblock, and we need to set
// 'cdc_sdk_min_checkpoint_op_id_' to OpId::Invalid() as this indicates that there are no
// active CDC streams on this tablet.
if (superblock.has_cdc_sdk_min_checkpoint_op_id()) {
auto cdc_sdk_checkpoint = OpId::FromPB(superblock.cdc_sdk_min_checkpoint_op_id());
if (cdc_sdk_checkpoint == OpId() && (!superblock.has_is_under_cdc_sdk_replication() ||
!superblock.is_under_cdc_sdk_replication())) {
// This indicates that 'cdc_sdk_min_checkpoint_op_id' has been set to 0.0 during a prior
// upgrade even without CDC running. Hence we reset it to -1.-1.
LOG_WITH_PREFIX(WARNING) << "Setting cdc_sdk_min_checkpoint_op_id_ to OpId::Invalid(), "
"since 'is_under_cdc_sdk_replication' is not set";
cdc_sdk_min_checkpoint_op_id_ = OpId::Invalid();
} else {
cdc_sdk_min_checkpoint_op_id_ = cdc_sdk_checkpoint;
is_under_cdc_sdk_replication_ = superblock.is_under_cdc_sdk_replication();
}
} else {
// If a cluster is upgraded from any version lesser than 2.14, 'cdc_sdk_min_checkpoint_op_id'
// would be absent from the superblock, and we need to set 'cdc_sdk_min_checkpoint_op_id_' to
// OpId::Invalid() as this indicates that there are no active CDC streams on this tablet.
cdc_sdk_min_checkpoint_op_id_ = OpId::Invalid();
}

cdc_sdk_safe_time_ = HybridTime::FromPB(superblock.cdc_sdk_safe_time());
@@ -936,6 +944,7 @@ void RaftGroupMetadata::ToSuperBlockUnlocked(RaftGroupReplicaSuperBlockPB* super
if (restoration_hybrid_time_) {
pb.set_restoration_hybrid_time(restoration_hybrid_time_.ToUint64());
}
pb.set_is_under_cdc_sdk_replication(is_under_cdc_sdk_replication_);

if (!split_op_id_.empty()) {
split_op_id_.ToPB(pb.mutable_split_op_id());
@@ -1187,10 +1196,23 @@ HybridTime RaftGroupMetadata::cdc_sdk_safe_time() const {
return cdc_sdk_safe_time_;
}

bool RaftGroupMetadata::is_under_cdc_sdk_replication() const {
std::lock_guard<MutexType> lock(data_mutex_);
return is_under_cdc_sdk_replication_;
}

Status RaftGroupMetadata::set_cdc_sdk_min_checkpoint_op_id(const OpId& cdc_min_checkpoint_op_id) {
{
std::lock_guard<MutexType> lock(data_mutex_);
cdc_sdk_min_checkpoint_op_id_ = cdc_min_checkpoint_op_id;

if (cdc_min_checkpoint_op_id == OpId::Max() || cdc_min_checkpoint_op_id == OpId::Invalid()) {
// This means we no longer have an active CDC stream for the tablet.
is_under_cdc_sdk_replication_ = false;
} else if (cdc_min_checkpoint_op_id.valid()) {
// Any OpId less than OpId::Max() indicates we are actively streaming from this tablet.
is_under_cdc_sdk_replication_ = true;
}
}
return Flush();
}
4 changes: 4 additions & 0 deletions src/yb/tablet/tablet_metadata.h
@@ -359,6 +359,8 @@ class RaftGroupMetadata : public RefCountedThreadSafe<RaftGroupMetadata>,

Status SetIsUnderTwodcReplicationAndFlush(bool is_under_twodc_replication);

bool is_under_cdc_sdk_replication() const;

bool is_under_twodc_replication() const;

bool has_been_fully_compacted() const {
@@ -652,6 +654,8 @@ class RaftGroupMetadata : public RefCountedThreadSafe<RaftGroupMetadata>,

bool is_under_twodc_replication_ GUARDED_BY(data_mutex_) = false;

bool is_under_cdc_sdk_replication_ GUARDED_BY(data_mutex_) = false;

bool hidden_ GUARDED_BY(data_mutex_) = false;

HybridTime restoration_hybrid_time_ GUARDED_BY(data_mutex_) = HybridTime::kMin;
4 changes: 4 additions & 0 deletions src/yb/tablet/tablet_peer.cc
@@ -1095,6 +1095,10 @@ CoarseTimePoint TabletPeer::cdc_sdk_min_checkpoint_op_id_expiration() {
return CoarseTimePoint();
}

bool TabletPeer::is_under_cdc_sdk_replication() {
return meta_->is_under_cdc_sdk_replication();
}

OpId TabletPeer::GetLatestCheckPoint() {
auto tablet = shared_tablet();
if (tablet) {
2 changes: 2 additions & 0 deletions src/yb/tablet/tablet_peer.h
@@ -432,6 +432,8 @@ class TabletPeer : public std::enable_shared_from_this<TabletPeer>,

CoarseTimePoint cdc_sdk_min_checkpoint_op_id_expiration();

bool is_under_cdc_sdk_replication();

Status SetCDCSDKRetainOpIdAndTime(
const OpId& cdc_sdk_op_id, const MonoDelta& cdc_sdk_op_id_expiration,
const HybridTime& cdc_sdk_safe_time = HybridTime::kInvalid);