Skip to content

Commit

Permalink
[#25381] CDC: Virtual WAL changes to send LSN values depending on LSN…
Browse files Browse the repository at this point in the history
… type

Summary:
This diff adds the support to CDC service and virtual WAL to send records with LSN values depending on the LSN type. If LSN type is
* `SEQUENCE` then send the standard LSN values.
* `HYBRID_TIME` then send the commit time of each record as the LSN values.
Jira: DB-14611

Test Plan: Run existing tests.

Reviewers: siddharth.shah, sumukh.phalgaonkar, xCluster, hsunder, skumar, stiwary

Reviewed By: skumar

Subscribers: svc_phabricator, yql, ybase, ycdcxcluster

Differential Revision: https://phorge.dev.yugabyte.com/D38886
  • Loading branch information
vaibhav-yb committed Jan 15, 2025
1 parent da72841 commit ec49a06
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 5 deletions.
17 changes: 16 additions & 1 deletion src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "yb/cdc/cdc_error.h"
#include "yb/cdc/cdc_producer.h"
#include "yb/cdc/cdc_service.pb.h"
#include "yb/cdc/cdc_util.h"
#include "yb/cdc/xcluster_rpc.h"
#include "yb/cdc/cdc_service.proxy.h"
Expand Down Expand Up @@ -4852,6 +4853,19 @@ void CDCServiceImpl::InitVirtualWALForCDC(
auto session_id = req->session_id();
auto stream_id = RPC_VERIFY_STRING_TO_STREAM_ID(req->stream_id());
std::shared_ptr<CDCSDKVirtualWAL> virtual_wal;

// Ensure that stream metadata is populated with the newly created stream_id and
// it is present in the tserver cache.
auto stream_metadata_result = GetStream(stream_id, RefreshStreamMapOption::kAlways);

RPC_CHECK_AND_RETURN_ERROR(
stream_metadata_result.ok(),
STATUS_FORMAT(NotFound, "Stream metadata not found for stream id $0", stream_id),
resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);

auto lsn_type = stream_metadata_result->get()->GetReplicationSlotLsnType().value_or(
ReplicationSlotLsnType_SEQUENCE);

// Get an exclusive lock to prevent multiple threads from creating VirtualWAL instance for the
// same session_id.
{
Expand All @@ -4863,7 +4877,8 @@ void CDCServiceImpl::InitVirtualWALForCDC(
session_id),
resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);

virtual_wal = std::make_shared<CDCSDKVirtualWAL>(this, stream_id, session_id);
virtual_wal = std::make_shared<CDCSDKVirtualWAL>(
this, stream_id, session_id, lsn_type);
session_virtual_wal_[session_id] = virtual_wal;
}

Expand Down
38 changes: 35 additions & 3 deletions src/yb/cdc/cdcsdk_virtual_wal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ using RecordInfo = CDCSDKVirtualWAL::RecordInfo;
using TabletRecordInfoPair = CDCSDKVirtualWAL::TabletRecordInfoPair;

CDCSDKVirtualWAL::CDCSDKVirtualWAL(
CDCServiceImpl* cdc_service, const xrepl::StreamId& stream_id, const uint64_t session_id)
CDCServiceImpl* cdc_service, const xrepl::StreamId& stream_id, const uint64_t session_id,
ReplicationSlotLsnType lsn_type)
: cdc_service_(cdc_service),
stream_id_(stream_id),
vwal_session_id_(session_id),
log_prefix_(Format("VWAL [$0:$1]: ", stream_id_, vwal_session_id_)) {}
log_prefix_(Format("VWAL [$0:$1]: ", stream_id_, vwal_session_id_)),
slot_lsn_type_(lsn_type) {}

std::string CDCSDKVirtualWAL::LogPrefix() const {
return log_prefix_;
Expand Down Expand Up @@ -526,17 +528,24 @@ Status CDCSDKVirtualWAL::GetConsistentChangesInternal(
metadata.min_lsn = std::min(metadata.min_lsn, row_message->pg_lsn());
metadata.max_lsn = std::max(metadata.max_lsn, row_message->pg_lsn());

auto& record_entry = metadata.txn_id_to_ct_records_map_[*txn_id_result];
switch (record->row_message().op()) {
case RowMessage_Op_INSERT: {
metadata.insert_records++;
record_entry.second += 1;
record_entry.first = *lsn_result;
break;
}
case RowMessage_Op_UPDATE: {
metadata.update_records++;
record_entry.second += 1;
record_entry.first = *lsn_result;
break;
}
case RowMessage_Op_DELETE: {
metadata.delete_records++;
record_entry.second += 1;
record_entry.first = *lsn_result;
break;
}
case RowMessage_Op_BEGIN: {
Expand Down Expand Up @@ -570,6 +579,8 @@ Status CDCSDKVirtualWAL::GetConsistentChangesInternal(
break;
}

VLOG_WITH_PREFIX(4) << "shipping record: " << record->ShortDebugString();

auto records = resp->add_cdc_sdk_proto_records();
records->CopyFrom(*record);
}
Expand Down Expand Up @@ -618,6 +629,15 @@ Status CDCSDKVirtualWAL::GetConsistentChangesInternal(
<< ", VWAL lag: " << (metadata.commit_records > 0 ? Format("$0 ms", vwal_lag_in_ms) : "-1")
<< ", Number of unacked txns in VWAL: " << unacked_txn;

if (metadata.txn_ids.size() > 0) {
oss << ", Records per txn details:";

for (const auto& entry : metadata.txn_id_to_ct_records_map_) {
oss << ", {txn_id, ct, dml}: {" << entry.first << ", " << entry.second.first << ", "
<< entry.second.second << "}";
}
}

YB_CDC_LOG_WITH_PREFIX_EVERY_N_SECS_OR_VLOG(oss, 300, 1);
}

Expand Down Expand Up @@ -884,7 +904,19 @@ Result<uint64_t> CDCSDKVirtualWAL::GetRecordLSN(
// duplicate records like BEGIN/COMMIT that can be received in case of multi-shard transaction or
// multiple transactions with same commit_time.
if (curr_unique_record_id->GreaterThanDistributedLSN(last_seen_unique_record_id_)) {
last_seen_lsn_ += 1;
switch (slot_lsn_type_) {
case ReplicationSlotLsnType_SEQUENCE:
last_seen_lsn_ += 1;
break;
case ReplicationSlotLsnType_HYBRID_TIME:
last_seen_lsn_ = curr_unique_record_id->GetCommitTime();
break;
default:
return STATUS_FORMAT(
IllegalState,
Format("Invalid LSN type specified $0 for stream $1", slot_lsn_type_, stream_id_));
}

return last_seen_lsn_;
}

Expand Down
7 changes: 6 additions & 1 deletion src/yb/cdc/cdcsdk_virtual_wal.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ namespace cdc {
class CDCSDKVirtualWAL {
public:
explicit CDCSDKVirtualWAL(
CDCServiceImpl* cdc_service, const xrepl::StreamId& stream_id, const uint64_t session_id);
CDCServiceImpl* cdc_service, const xrepl::StreamId& stream_id, const uint64_t session_id,
ReplicationSlotLsnType lsn_type);

using RecordInfo =
std::pair<std::shared_ptr<CDCSDKUniqueRecordID>, std::shared_ptr<CDCSDKProtoRecordPB>>;
Expand Down Expand Up @@ -105,6 +106,7 @@ class CDCSDKVirtualWAL {
uint64_t max_lsn = 0;
bool is_last_txn_fully_sent = false;
bool contains_publication_refresh_record = false;
std::map<uint32_t, std::pair<uint64_t, int>> txn_id_to_ct_records_map_;
};

using TabletRecordPriorityQueue = std::priority_queue<
Expand Down Expand Up @@ -280,6 +282,9 @@ class CDCSDKVirtualWAL {

// ordered map in increasing order of LSN of commit record.
std::map<uint64_t, CommitMetadataAndLastSentRequest> commit_meta_and_last_req_map_;

// Store the LSN type the replication slot is created with.
ReplicationSlotLsnType slot_lsn_type_;
};

} // namespace cdc
Expand Down
1 change: 1 addition & 0 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
#include "yb/util/tsan_util.h"

#include "yb/yql/cql/ql/ptree/pt_option.h"
#include "yb/yql/pggate/ybc_pg_typedefs.h"

using namespace std::literals;

Expand Down

0 comments on commit ec49a06

Please sign in to comment.