From ec49a06b9626612f6f7f90f37bb2834a897b218c Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 15 Jan 2025 11:00:30 +0530 Subject: [PATCH] [#25381] CDC: Virtual WAL changes to send LSN values depending on LSN type Summary: This diff adds the support to CDC service and virtual WAL to send records with LSN values depending on the LSN type. If LSN type is * `SEQUENCE` then send the standard LSN values. * `HYBRID_TIME` then send the commit time of each record as the LSN values. Jira: DB-14611 Test Plan: Run existing tests. Reviewers: siddharth.shah, sumukh.phalgaonkar, xCluster, hsunder, skumar, stiwary Reviewed By: skumar Subscribers: svc_phabricator, yql, ybase, ycdcxcluster Differential Revision: https://phorge.dev.yugabyte.com/D38886 --- src/yb/cdc/cdc_service.cc | 17 +++++++++++++- src/yb/cdc/cdcsdk_virtual_wal.cc | 38 +++++++++++++++++++++++++++++--- src/yb/cdc/cdcsdk_virtual_wal.h | 7 +++++- src/yb/client/client.cc | 1 + 4 files changed, 58 insertions(+), 5 deletions(-) diff --git a/src/yb/cdc/cdc_service.cc b/src/yb/cdc/cdc_service.cc index 8547c1ca929a..f76578c9d50c 100644 --- a/src/yb/cdc/cdc_service.cc +++ b/src/yb/cdc/cdc_service.cc @@ -25,6 +25,7 @@ #include "yb/cdc/cdc_error.h" #include "yb/cdc/cdc_producer.h" +#include "yb/cdc/cdc_service.pb.h" #include "yb/cdc/cdc_util.h" #include "yb/cdc/xcluster_rpc.h" #include "yb/cdc/cdc_service.proxy.h" @@ -4852,6 +4853,19 @@ void CDCServiceImpl::InitVirtualWALForCDC( auto session_id = req->session_id(); auto stream_id = RPC_VERIFY_STRING_TO_STREAM_ID(req->stream_id()); std::shared_ptr virtual_wal; + + // Ensure that stream metadata is populated with the newly created stream_id and + // it is present in the tserver cache. + auto stream_metadata_result = GetStream(stream_id, RefreshStreamMapOption::kAlways); + + RPC_CHECK_AND_RETURN_ERROR( + stream_metadata_result.ok(), + STATUS_FORMAT(NotFound, "Stream metadata not found for stream id $0", stream_id), + resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); + + auto lsn_type = stream_metadata_result->get()->GetReplicationSlotLsnType().value_or( + ReplicationSlotLsnType_SEQUENCE); + // Get an exclusive lock to prevent multiple threads from creating VirtualWAL instance for the // same session_id. { @@ -4863,7 +4877,8 @@ void CDCServiceImpl::InitVirtualWALForCDC( session_id), resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context); - virtual_wal = std::make_shared(this, stream_id, session_id); + virtual_wal = std::make_shared( + this, stream_id, session_id, lsn_type); session_virtual_wal_[session_id] = virtual_wal; } diff --git a/src/yb/cdc/cdcsdk_virtual_wal.cc b/src/yb/cdc/cdcsdk_virtual_wal.cc index 26ea935faf2e..a5c543c8c2b3 100644 --- a/src/yb/cdc/cdcsdk_virtual_wal.cc +++ b/src/yb/cdc/cdcsdk_virtual_wal.cc @@ -79,11 +79,13 @@ using RecordInfo = CDCSDKVirtualWAL::RecordInfo; using TabletRecordInfoPair = CDCSDKVirtualWAL::TabletRecordInfoPair; CDCSDKVirtualWAL::CDCSDKVirtualWAL( - CDCServiceImpl* cdc_service, const xrepl::StreamId& stream_id, const uint64_t session_id) + CDCServiceImpl* cdc_service, const xrepl::StreamId& stream_id, const uint64_t session_id, + ReplicationSlotLsnType lsn_type) : cdc_service_(cdc_service), stream_id_(stream_id), vwal_session_id_(session_id), - log_prefix_(Format("VWAL [$0:$1]: ", stream_id_, vwal_session_id_)) {} + log_prefix_(Format("VWAL [$0:$1]: ", stream_id_, vwal_session_id_)), + slot_lsn_type_(lsn_type) {} std::string CDCSDKVirtualWAL::LogPrefix() const { return log_prefix_; @@ -526,17 +528,24 @@ Status CDCSDKVirtualWAL::GetConsistentChangesInternal( metadata.min_lsn = std::min(metadata.min_lsn, row_message->pg_lsn()); metadata.max_lsn = std::max(metadata.max_lsn, row_message->pg_lsn()); + auto& record_entry = metadata.txn_id_to_ct_records_map_[*txn_id_result]; switch (record->row_message().op()) { case RowMessage_Op_INSERT: { metadata.insert_records++; + record_entry.second += 1; + record_entry.first = *lsn_result; break; } case RowMessage_Op_UPDATE: { metadata.update_records++; + record_entry.second += 1; + record_entry.first = *lsn_result; break; } case RowMessage_Op_DELETE: { metadata.delete_records++; + record_entry.second += 1; + record_entry.first = *lsn_result; break; } case RowMessage_Op_BEGIN: { @@ -570,6 +579,8 @@ Status CDCSDKVirtualWAL::GetConsistentChangesInternal( break; } + VLOG_WITH_PREFIX(4) << "shipping record: " << record->ShortDebugString(); + auto records = resp->add_cdc_sdk_proto_records(); records->CopyFrom(*record); } @@ -618,6 +629,15 @@ Status CDCSDKVirtualWAL::GetConsistentChangesInternal( << ", VWAL lag: " << (metadata.commit_records > 0 ? Format("$0 ms", vwal_lag_in_ms) : "-1") << ", Number of unacked txns in VWAL: " << unacked_txn; + if (metadata.txn_ids.size() > 0) { + oss << ", Records per txn details:"; + + for (const auto& entry : metadata.txn_id_to_ct_records_map_) { + oss << ", {txn_id, ct, dml}: {" << entry.first << ", " << entry.second.first << ", " + << entry.second.second << "}"; + } + } + YB_CDC_LOG_WITH_PREFIX_EVERY_N_SECS_OR_VLOG(oss, 300, 1); } @@ -884,7 +904,19 @@ Result CDCSDKVirtualWAL::GetRecordLSN( // duplicate records like BEGIN/COMMIT that can be received in case of multi-shard transaction or // multiple transactions with same commit_time. if (curr_unique_record_id->GreaterThanDistributedLSN(last_seen_unique_record_id_)) { - last_seen_lsn_ += 1; + switch (slot_lsn_type_) { + case ReplicationSlotLsnType_SEQUENCE: + last_seen_lsn_ += 1; + break; + case ReplicationSlotLsnType_HYBRID_TIME: + last_seen_lsn_ = curr_unique_record_id->GetCommitTime(); + break; + default: + return STATUS_FORMAT( + IllegalState, + Format("Invalid LSN type specified $0 for stream $1", slot_lsn_type_, stream_id_)); + } + return last_seen_lsn_; } diff --git a/src/yb/cdc/cdcsdk_virtual_wal.h b/src/yb/cdc/cdcsdk_virtual_wal.h index 64f5969d7fa2..c5230d7e4e09 100644 --- a/src/yb/cdc/cdcsdk_virtual_wal.h +++ b/src/yb/cdc/cdcsdk_virtual_wal.h @@ -31,7 +31,8 @@ namespace cdc { class CDCSDKVirtualWAL { public: explicit CDCSDKVirtualWAL( - CDCServiceImpl* cdc_service, const xrepl::StreamId& stream_id, const uint64_t session_id); + CDCServiceImpl* cdc_service, const xrepl::StreamId& stream_id, const uint64_t session_id, + ReplicationSlotLsnType lsn_type); using RecordInfo = std::pair, std::shared_ptr>; @@ -105,6 +106,7 @@ class CDCSDKVirtualWAL { uint64_t max_lsn = 0; bool is_last_txn_fully_sent = false; bool contains_publication_refresh_record = false; + std::map> txn_id_to_ct_records_map_; }; using TabletRecordPriorityQueue = std::priority_queue< @@ -280,6 +282,9 @@ class CDCSDKVirtualWAL { // ordered map in increasing order of LSN of commit record. std::map commit_meta_and_last_req_map_; + + // Store the LSN type the replication slot is created with. + ReplicationSlotLsnType slot_lsn_type_; }; } // namespace cdc diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index d6f11d542fcb..ddd2f021c602 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -123,6 +123,7 @@ #include "yb/util/tsan_util.h" #include "yb/yql/cql/ql/ptree/pt_option.h" +#include "yb/yql/pggate/ybc_pg_typedefs.h" using namespace std::literals;