diff --git a/src/yb/client/transaction.cc b/src/yb/client/transaction.cc index f7756af900bb..9adf0b6ea937 100644 --- a/src/yb/client/transaction.cc +++ b/src/yb/client/transaction.cc @@ -486,11 +486,16 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { prev_tablet_id = &tablet_id; tablets_[tablet_id].has_metadata = true; } + // 'num_completed_batches' for the involved tablet should be updated only when the op + // performs a write at the tablet (explicit write/read with explicit locks), which is + // checked for in the above 'if' clause. Else, we could run into scenarios where a txn + // promotion request is sent to a participant which hasn't yet registered the txn, and + // would lead to a 40001 being returned to pg. + ++tablets_[tablet_id].num_completed_batches; } if (transaction_status_move_tablets_.count(tablet_id)) { schedule_status_moved = true; } - ++tablets_[tablet_id].num_completed_batches; } } else { const TransactionError txn_err(status); @@ -1188,6 +1193,11 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { return; } + // TODO: In case of transaction promotion, DoCommit is called only after we hearback on all + // sent transation status move requests. So either the promotion went through, in which case + // transaction participants of all involved tablets check the txn's state against the new + // status tablet, or the promotion failed, in which case we anyways abort the transaction. + // TBD if we do require a successfull PENDING heartbeat to go through. if (old_status_tablet_ && last_old_heartbeat_failed_.load(std::memory_order_acquire)) { auto rpc = PrepareOldStatusTabletFinalHeartbeat(deadline, seal_only, status, transaction); lock.unlock(); @@ -1693,10 +1703,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { VLOG_WITH_PREFIX(4) << __func__ << "(" << TransactionStatus_Name(status) << ", " << send_to_new_tablet << ")"; - auto old_status_tablet_state = old_status_tablet_state_.load(std::memory_order_acquire); - if (!send_to_new_tablet && - old_status_tablet_state != OldTransactionState::kNone && - old_status_tablet_state != OldTransactionState::kRunning) { + if (!ShouldContinueHeartbeats(send_to_new_tablet)) { VLOG_WITH_PREFIX(1) << "Old status tablet is no longer in use, cancelling heartbeat"; return; } @@ -1749,6 +1756,34 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { FATAL_INVALID_ENUM_VALUE(TransactionState, current_state); } + // Returns whether the heartbeater thread should continue heartbeating. + // + // When 'send_to_new_tablet' is true, we should continue sending/processing heartbeat requests/ + // responses as they are with the new status tablet stored in 'status_tablet_'. + // + // When 'send_to_new_tablet' is false, we should continue heartbeats in the following cases: + // 1. Heartbeating to 'status_tablet_': + // - happens when the transaction started off as a global txn. In this case + // 'old_status_tablet_state_' remains in 'kNone'. + // - transaction started off as local, and promotion hasn't been kicked off yet (new status + // tablet hasn't been picked yet). 'old_status_tablet_state_' remains in 'kNone' until the + // 'status_tablet_' switch happens. + // 2. Heartbeating to 'old_status_tablet_': + // - transaction started off as local, and was promoted. 'old_status_tablet_state_' remains + // in 'kRunning' until abort is sent to 'old_status_tablet_'. + // Once an abort request is sent to the old status tablet, 'old_status_tablet_state_' transitions + // to 'kAborting', and to 'kAborted' from there. Hence, we should not process heartbeat responses + // or initiate new heartbeat requests when not in ('kNone', 'kRunning') states, as they wouldn't + // reflect the actual status of the transaction. + bool ShouldContinueHeartbeats(SendHeartbeatToNewTablet send_to_new_tablet) { + if (send_to_new_tablet) { + return true; + } + auto old_status_tablet_state = old_status_tablet_state_.load(std::memory_order_acquire); + return old_status_tablet_state == OldTransactionState::kNone || + old_status_tablet_state == OldTransactionState::kRunning; + } + void HeartbeatDone(Status status, const tserver::UpdateTransactionRequestPB& request, const tserver::UpdateTransactionResponsePB& response, @@ -1778,6 +1813,20 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { << TransactionStatus_Name(transaction_status) << ", " << send_to_new_tablet << ")"; + if (!ShouldContinueHeartbeats(send_to_new_tablet)) { + // It could have happended that all UpdateTransactionStatusLocation requests went through + // and we issued an abort to the old status tablet. We should ignore any heartbeat response + // we receive after that. Since the promotion went through, the involved txn participants + // would check against the latest status tablet for any course of action. So we need not + // be concerned on whether this was a genuine heartbeat error and can safely exit without + // error handling or cleanup (they will be handled by the other heartbeater thread hitting + // the new status tablet in 'status_tablet_'). + VLOG_WITH_PREFIX_AND_FUNC(1) + << "Skipping cleanup because the heartbeat response from the old status tablet was " + << "received after sending abort to old status tablet. " + << old_status_tablet_state_.load(std::memory_order_relaxed); + return; + } if (!send_to_new_tablet) { last_old_heartbeat_failed_.store(!status.ok(), std::memory_order_release); } diff --git a/src/yb/tablet/running_transaction.cc b/src/yb/tablet/running_transaction.cc index d6f10545988b..24abcf9e65f0 100644 --- a/src/yb/tablet/running_transaction.cc +++ b/src/yb/tablet/running_transaction.cc @@ -13,6 +13,8 @@ #include "yb/tablet/running_transaction.h" +#include + #include "yb/client/transaction_rpc.h" #include "yb/common/hybrid_time.h" @@ -40,6 +42,9 @@ DEFINE_UNKNOWN_int64(transaction_abort_check_interval_ms, 5000 * yb::kTimeMultip DEFINE_UNKNOWN_int64(transaction_abort_check_timeout_ms, 30000 * yb::kTimeMultiplier, "Timeout used when checking for aborted transactions."); +DEFINE_test_flag(bool, pause_sending_txn_status_requests, false, + "When set, hold off sending transaction status requests until the flag is reset."); + namespace yb { namespace tablet { @@ -135,9 +140,9 @@ void RunningTransaction::RequestStatusAt(const StatusRequest& request, "Existing status knowledge ($0, $1) does not satisfy requested: $2, sending: $3", TransactionStatus_Name(last_known_status_), last_known_status_hybrid_time_, request, request_id); - + auto status_tablet = shared_self->status_tablet(); lock->unlock(); - SendStatusRequest(request_id, shared_self); + SendStatusRequest(status_tablet, request_id, shared_self); } bool RunningTransaction::WasAborted() const { @@ -165,13 +170,15 @@ void RunningTransaction::Abort(client::YBClient* client, } bool was_empty = abort_waiters_.empty(); abort_waiters_.push_back(std::move(callback)); + auto status_tablet = this->status_tablet(); + abort_request_in_progress_ = true; lock->unlock(); VLOG_WITH_PREFIX(3) << "Abort request: " << was_empty; if (!was_empty) { return; } tserver::AbortTransactionRequestPB req; - req.set_tablet_id(metadata_.status_tablet); + req.set_tablet_id(status_tablet); req.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size()); req.set_propagated_hybrid_time(context_.participant_context_.Now().ToUint64()); context_.rpcs_.RegisterAndStart( @@ -180,7 +187,8 @@ void RunningTransaction::Abort(client::YBClient* client, nullptr /* tablet */, client, &req, - std::bind(&RunningTransaction::AbortReceived, this, _1, _2, shared_from_this())), + std::bind(&RunningTransaction::AbortReceived, this, status_tablet, + _1, _2, shared_from_this())), &abort_handle_); } @@ -225,17 +233,33 @@ boost::optional RunningTransaction::GetStatusAt( return TransactionStatus::PENDING; } return boost::none; + case TransactionStatus::CREATED: { + // This can happen in case of transaction promotion. The first status request to the old + // status tablet could have arrived and the transaction could have undergone promoted in + // the interim. In that case, we just return the past known status (which could be CREATED + // if this was the first ever txn status request). + return boost::none; + } default: FATAL_INVALID_ENUM_VALUE(TransactionStatus, last_known_status); } } void RunningTransaction::SendStatusRequest( - int64_t serial_no, const RunningTransactionPtr& shared_self) { + const TabletId& status_tablet, int64_t serial_no, const RunningTransactionPtr& shared_self) { TRACE_FUNC(); VTRACE(1, yb::ToString(metadata_.transaction_id)); + if (PREDICT_FALSE(FLAGS_TEST_pause_sending_txn_status_requests)) { + VLOG_WITH_PREFIX_AND_FUNC(4) << "FLAGS_TEST_pause_sending_txn_status_requests set. Holding " + << "off sending transaction status requests until flag is reset."; + while (FLAGS_TEST_pause_sending_txn_status_requests) { + SleepFor(10ms); + } + VLOG_WITH_PREFIX_AND_FUNC(4) << "Resume sending transaction status requests " + << "against status tablet: " << status_tablet; + } tserver::GetTransactionStatusRequestPB req; - req.set_tablet_id(metadata_.status_tablet); + req.set_tablet_id(status_tablet); req.add_transaction_id()->assign( pointer_cast(metadata_.transaction_id.data()), metadata_.transaction_id.size()); req.set_propagated_hybrid_time(context_.participant_context_.Now().ToUint64()); @@ -245,11 +269,13 @@ void RunningTransaction::SendStatusRequest( nullptr /* tablet */, context_.participant_context_.client_future().get(), &req, - std::bind(&RunningTransaction::StatusReceived, this, _1, _2, serial_no, shared_self)), + std::bind(&RunningTransaction::StatusReceived, this, status_tablet, _1, _2, serial_no, + shared_self)), &get_status_handle_); } void RunningTransaction::StatusReceived( + const TabletId& status_tablet, const Status& status, const tserver::GetTransactionStatusResponsePB& response, int64_t serial_no, @@ -258,16 +284,24 @@ void RunningTransaction::StatusReceived( if (delay_usec > 0) { context_.delayer().Delay( MonoTime::Now() + MonoDelta::FromMicroseconds(delay_usec), - std::bind(&RunningTransaction::DoStatusReceived, this, status, response, + std::bind(&RunningTransaction::DoStatusReceived, this, status_tablet, status, response, serial_no, shared_self)); } else { - DoStatusReceived(status, response, serial_no, shared_self); + DoStatusReceived(status_tablet, status, response, serial_no, shared_self); } } bool RunningTransaction::UpdateStatus( - TransactionStatus transaction_status, HybridTime time_of_status, + const TabletId& status_tablet, TransactionStatus transaction_status, HybridTime time_of_status, HybridTime coordinator_safe_time, SubtxnSet aborted_subtxn_set) { + if (status_tablet != this->status_tablet()) { + // Can happen in case of transaction promotion, should be okay to return an existing older + // status as the subsequent requests would return the latest status. + VLOG_WITH_PREFIX_AND_FUNC(4) << "Passed in status tablet isn't the current active status " + << "tablet of the txn. Not updating the txn status."; + return last_known_status_ == TransactionStatus::ABORTED; + } + if (!local_commit_time_ && transaction_status != TransactionStatus::ABORTED) { // If we've already committed locally, then last_known_aborted_subtxn_set_ is already set // properly. Otherwise, we should update it here. @@ -294,13 +328,14 @@ bool RunningTransaction::UpdateStatus( return transaction_status == TransactionStatus::ABORTED; } -void RunningTransaction::DoStatusReceived(const Status& status, +void RunningTransaction::DoStatusReceived(const TabletId& status_tablet, + const Status& status, const tserver::GetTransactionStatusResponsePB& response, int64_t serial_no, const RunningTransactionPtr& shared_self) { TRACE("$0: $1", __func__, response.ShortDebugString()); VLOG_WITH_PREFIX(4) << __func__ << "(" << status << ", " << response.ShortDebugString() << ", " - << serial_no << ")"; + << serial_no << ")" << " from status tablet: " << status_tablet; if (response.has_propagated_hybrid_time()) { context_.participant_context_.UpdateClock(HybridTime(response.propagated_hybrid_time())); @@ -313,6 +348,7 @@ void RunningTransaction::DoStatusReceived(const Status& status, SubtxnSet aborted_subtxn_set; const bool ok = status.ok(); int64_t new_request_id = -1; + TabletId current_status_tablet; { MinRunningNotifier min_running_notifier(&context_.applier_); std::unique_lock lock(context_.mutex_); @@ -357,7 +393,8 @@ void RunningTransaction::DoStatusReceived(const Status& status, auto coordinator_safe_time = response.coordinator_safe_time().size() == 1 ? HybridTime::FromPB(response.coordinator_safe_time(0)) : HybridTime(); auto did_abort_txn = UpdateStatus( - transaction_status, time_of_status, coordinator_safe_time, aborted_subtxn_set); + status_tablet, transaction_status, time_of_status, coordinator_safe_time, + aborted_subtxn_set); if (did_abort_txn) { context_.NotifyAborted(id()); context_.EnqueueRemoveUnlocked(id(), RemoveReason::kStatusReceived, &min_running_notifier); @@ -390,9 +427,10 @@ void RunningTransaction::DoStatusReceived(const Status& status, } } } + current_status_tablet = shared_self->status_tablet(); } if (new_request_id >= 0) { - SendStatusRequest(new_request_id, shared_self); + SendStatusRequest(current_status_tablet, new_request_id, shared_self); } NotifyWaiters(serial_no, time_of_status, transaction_status, aborted_subtxn_set, status_waiters); } @@ -469,7 +507,8 @@ Result RunningTransaction::MakeAbortResult( return TransactionStatusResult{response.status(), status_time, SubtxnSet()}; } -void RunningTransaction::AbortReceived(const Status& status, +void RunningTransaction::AbortReceived(const TabletId& status_tablet, + const Status& status, const tserver::AbortTransactionResponsePB& response, const RunningTransactionPtr& shared_self) { if (response.has_propagated_hybrid_time()) { @@ -484,6 +523,15 @@ void RunningTransaction::AbortReceived(const Status& status, { MinRunningNotifier min_running_notifier(&context_.applier_); std::lock_guard lock(context_.mutex_); + LOG_IF(DFATAL, !abort_request_in_progress_) + << "AbortReceived executed with abort_request_in_progress_ unset. Could lead to data " + << "inconsistentcy issues in case of Geo-Partition workloads."; + abort_request_in_progress_ = false; + + LOG_IF(DFATAL, status_tablet != shared_self->status_tablet()) + << "Status Tablet switched while Abort txn request was in progress. This might lead " + << "to data consistency issues."; + context_.rpcs_.Unregister(&abort_handle_); abort_waiters_.swap(abort_waiters); // kMax status_time means that this status is not yet replicated and could be rejected. @@ -491,7 +539,8 @@ void RunningTransaction::AbortReceived(const Status& status, if (result.ok() && result->status_time != HybridTime::kMax) { auto coordinator_safe_time = HybridTime::FromPB(response.coordinator_safe_time()); if (UpdateStatus( - result->status, result->status_time, coordinator_safe_time, result->aborted_subtxn_set)) { + status_tablet, result->status, result->status_time, coordinator_safe_time, + result->aborted_subtxn_set)) { context_.NotifyAborted(id()); context_.EnqueueRemoveUnlocked(id(), RemoveReason::kAbortReceived, &min_running_notifier); } diff --git a/src/yb/tablet/running_transaction.h b/src/yb/tablet/running_transaction.h index 55b8abfe0b2d..372f911a5a40 100644 --- a/src/yb/tablet/running_transaction.h +++ b/src/yb/tablet/running_transaction.h @@ -56,8 +56,8 @@ class RunningTransaction : public std::enable_shared_from_this* lock); + Status CheckPromotionAllowed() { + if (last_known_status_ == TransactionStatus::ABORTED) { + return STATUS_FORMAT( + IllegalState, Format("Transaction $0 in ABORTED state. Cannot be promoted.", id())); + } + if (abort_request_in_progress_) { + return STATUS_FORMAT( + IllegalState, Format("Request to abort txn $0 in progress. Cannot be promoted.", id())); + } + return Status::OK(); + } + std::string ToString() const; void ScheduleRemoveIntents(const RunningTransactionPtr& shared_self, RemoveReason reason); @@ -140,14 +152,17 @@ class RunningTransaction : public std::enable_shared_from_this& status_infos) { - std::vector aborted; + std::vector aborted; for (const auto& info : status_infos) { VLOG_WITH_PREFIX(4) << "Transaction status: " << info.ToString(); if (info.status == TransactionStatus::COMMITTED) { @@ -1012,7 +1012,7 @@ class TransactionParticipant::Impl committed_ids.push_back(info.transaction_id); } } else if (info.status == TransactionStatus::ABORTED) { - aborted.push_back(info.transaction_id); + aborted.push_back(info); } else { LOG_IF_WITH_PREFIX(DFATAL, info.status != TransactionStatus::PENDING) << "Transaction is in unexpected state: " << info.ToString(); @@ -1024,8 +1024,19 @@ class TransactionParticipant::Impl if (!aborted.empty()) { MinRunningNotifier min_running_notifier(&applier_); std::lock_guard lock(mutex_); - for (const auto& id : aborted) { - EnqueueRemoveUnlocked(id, RemoveReason::kStatusReceived, &min_running_notifier); + for (const auto& info : aborted) { + // TODO: Refactor so that the clean up code can use the established iterator + // instead of executing find again. + auto it = transactions_.find(info.transaction_id); + if (it != transactions_.end() && (*it)->status_tablet() != info.status_tablet) { + VLOG_WITH_PREFIX(2) << "Dropping Aborted status for txn " + << info.transaction_id.ToString() + << " from old status tablet " << info.status_tablet + << ". New status tablet " << (*it)->status_tablet(); + continue; + } + EnqueueRemoveUnlocked( + info.transaction_id, RemoveReason::kStatusReceived, &min_running_notifier); } } }); @@ -1197,6 +1208,8 @@ class TransactionParticipant::Impl } auto& transaction = *it; + RETURN_NOT_OK_SET_CODE(transaction->CheckPromotionAllowed(), + PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE)); metadata = transaction->metadata(); VLOG_WITH_PREFIX(2) << "Update transaction status location for transaction: " << metadata.transaction_id << " from tablet " << metadata.status_tablet @@ -1626,7 +1639,8 @@ class TransactionParticipant::Impl continue; } if ((**it).UpdateStatus( - info.status, info.status_ht, info.coordinator_safe_time, info.aborted_subtxn_set)) { + info.status_tablet, info.status, info.status_ht, info.coordinator_safe_time, + info.aborted_subtxn_set)) { NotifyAborted(info.transaction_id); EnqueueRemoveUnlocked( info.transaction_id, RemoveReason::kStatusReceived, &min_running_notifier); diff --git a/src/yb/tablet/transaction_status_resolver.cc b/src/yb/tablet/transaction_status_resolver.cc index eafbcb1704da..cf34646fb7f8 100644 --- a/src/yb/tablet/transaction_status_resolver.cc +++ b/src/yb/tablet/transaction_status_resolver.cc @@ -187,6 +187,7 @@ class TransactionStatusResolver::Impl { status_infos_.resize(request_size); for (size_t i = 0; i != request_size; ++i) { auto& status_info = status_infos_[i]; + status_info.status_tablet = it->first; status_info.transaction_id = queue.front(); status_info.status = TransactionStatus::ABORTED; status_info.status_ht = HybridTime::kMax; @@ -247,6 +248,7 @@ class TransactionStatusResolver::Impl { status_infos_.resize(response.status().size()); for (int i = 0; i != response.status().size(); ++i) { auto& status_info = status_infos_[i]; + status_info.status_tablet = it->first; status_info.transaction_id = queue.front(); status_info.status = response.status(i); diff --git a/src/yb/tablet/transaction_status_resolver.h b/src/yb/tablet/transaction_status_resolver.h index 8e5b6620b250..b11f5796c48a 100644 --- a/src/yb/tablet/transaction_status_resolver.h +++ b/src/yb/tablet/transaction_status_resolver.h @@ -28,6 +28,7 @@ namespace yb { namespace tablet { struct TransactionStatusInfo { + TabletId status_tablet; TransactionId transaction_id = TransactionId::Nil(); TransactionStatus status; SubtxnSet aborted_subtxn_set; @@ -35,7 +36,8 @@ struct TransactionStatusInfo { HybridTime coordinator_safe_time; std::string ToString() const { - return YB_STRUCT_TO_STRING(transaction_id, status, status_ht, coordinator_safe_time); + return YB_STRUCT_TO_STRING( + status_tablet, transaction_id, status, status_ht, coordinator_safe_time); } }; diff --git a/src/yb/yql/pgwrapper/geo_transactions_promotion-test.cc b/src/yb/yql/pgwrapper/geo_transactions_promotion-test.cc index de2427356e46..491291d83e19 100644 --- a/src/yb/yql/pgwrapper/geo_transactions_promotion-test.cc +++ b/src/yb/yql/pgwrapper/geo_transactions_promotion-test.cc @@ -22,6 +22,7 @@ #include "yb/yql/pgwrapper/geo_transactions_test_base.h" DECLARE_bool(TEST_consider_all_local_transaction_tables_local); +DECLARE_bool(TEST_pause_sending_txn_status_requests); DECLARE_bool(TEST_select_all_status_tablets); DECLARE_bool(TEST_txn_status_moved_rpc_force_fail); DECLARE_bool(auto_create_local_transaction_tables); @@ -29,6 +30,7 @@ DECLARE_bool(auto_promote_nonlocal_transactions_to_global); DECLARE_bool(enable_deadlock_detection); DECLARE_bool(enable_wait_queues); DECLARE_bool(force_global_transactions); +DECLARE_bool(yb_enable_read_committed_isolation); DECLARE_double(transaction_max_missed_heartbeat_periods); DECLARE_int32(TEST_old_txn_status_abort_delay_ms); DECLARE_int32(TEST_transaction_inject_flushed_delay_ms); @@ -817,5 +819,156 @@ TEST_F(GeoPartitionedDeadlockTest, YB_DISABLE_TEST_IN_TSAN(TestDeadlockAcrossTab thread_holder.WaitAndStop(35s * kTimeMultiplier); } +class GeoPartitionedReadCommiittedTest : public GeoTransactionsTestBase { + protected: + void SetUp() override { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_read_committed_isolation) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_wait_queues) = false; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_auto_create_local_transaction_tables) = true; + GeoTransactionsTestBase::SetUp(); + SetupTablespaces(); + } + + // Sets up a partitioned table with primary key(state, country) partitioned on state into 3 + // partitions. partition P_i hosts states S_(2*i-1), S_(2*i). Partitioned table P1 is split + // into 2 tablets and the other partitioned tables are split into 1 tablet. + void SetUpPartitionedTable(const std::string& table_name) { + auto current_version = GetCurrentVersion(); + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.ExecuteFormat( + "CREATE TABLE $0(people int, state VARCHAR, country VARCHAR, $1) PARTITION BY LIST (state)", + table_name, "PRIMARY KEY(state, country)")); + + std::vector partition_list; + partition_list.reserve(NumRegions()); + for (size_t i = 1; i <= NumRegions(); i++) { + partition_list.push_back(Format("('S$0', 'S$1')", 2*i - 1, 2*i)); + } + for (size_t i = 1; i <= NumRegions(); i++) { + auto num_tablets = Format("SPLIT INTO $0 TABLETS", i == 1 ? 2 : 1); + ASSERT_OK(conn.ExecuteFormat( + "CREATE TABLE $0$1 PARTITION OF $0 FOR VALUES IN $2 TABLESPACE tablespace$1 $3", + table_name, i, partition_list[i - 1], num_tablets)); + + if (ANNOTATE_UNPROTECTED_READ(FLAGS_auto_create_local_transaction_tables)) { + WaitForStatusTabletsVersion(current_version + 1); + ++current_version; + } + } + ASSERT_OK(conn.ExecuteFormat( + "CREATE TABLE $0_defaut PARTITION OF $0 DEFAULT TABLESPACE tablespace$1", table_name, 3)); + + // For states S[1-6], insert countries C[0-10]. + for (size_t i = 0; i <= 10; i++) { + for (size_t j = 1; j <= 2 * NumRegions(); j++) { + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES(0, 'S$1', 'C$2')", table_name, j, i)); + } + } + } +}; + +// The below test helps assert that transaction promotion requests are sent only to involved +// tablets that have already processed a write of the transaction (explicit write/read with locks). +TEST_F(GeoPartitionedReadCommiittedTest, TestPromotionAmidstConflicts) { + auto table_name = "foo"; + SetUpPartitionedTable(table_name); + ANNOTATE_UNPROTECTED_WRITE(FLAGS_force_global_transactions) = false; + + TestThreadHolder thread_holder; + const auto num_sessions = 5; + const auto num_iterations = 20; + for (int i = 1; i <= num_sessions; i++) { + thread_holder.AddThreadFunctor([this, i, table_name] { + for (int j = 1; j <= num_iterations; j++) { + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute("SET force_global_transaction = false")); + ASSERT_OK(conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED")); + // Start off as a local txn, hitting just one tablet of the local partition. + ASSERT_OK(conn.ExecuteFormat( + "UPDATE $0 SET people=people+1 WHERE country='C0' AND state='S$1'", + table_name, i%2 + 1)); + // The below would trigger transaction promotion since it would launch read ops across all + // tablets. This would lead to transaction promotion amidst conflicting writes. + ASSERT_OK(conn.ExecuteFormat( + "UPDATE $0 SET people=people+1 WHERE country='C$1'", table_name, (j + 1)/2)); + ASSERT_OK(conn.CommitTransaction()); + } + }); + } + thread_holder.WaitAndStop(60s * kTimeMultiplier); + + // Assert that the conflicting updates above go through successfully. + auto conn = ASSERT_RESULT(Connect()); + for (int i = 1; i <= num_iterations/2; i++) { + auto rows_res = ASSERT_RESULT(conn.FetchFormat( + Format("SELECT people FROM $0 WHERE country='C$1'", table_name, i))); + auto num_fetched_rows = PQntuples(rows_res.get()); + for (int j = 0; j < num_fetched_rows; j++) { + ASSERT_EQ(num_sessions * 2, ASSERT_RESULT(pgwrapper::GetValue(rows_res.get(), j, 0))); + } + } +} + +// The test asserts that the transaction participant ignores status responses from old status tablet +// for transactions that underwent promotion. If not, the participant could end up cleaning intents +// and silently let the commit go through, thus leading to data loss/inconsistency in case of +// promoted transactions. Refer #19535 for details. +TEST_F(GeoPartitionedReadCommiittedTest, + YB_DISABLE_TEST_IN_TSAN(TestParticipantIgnoresAbortFromOldStatusTablet)) { + auto table_name = "foo"; + const auto kLocalState = "S1"; + const auto kOtherState = "S6"; + SetUpPartitionedTable(table_name); + ANNOTATE_UNPROTECTED_WRITE(FLAGS_force_global_transactions) = false; + + auto update_query = Format( + "UPDATE $0 SET people=people+1 WHERE country='C0' AND state='$1'", table_name, kLocalState); + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute("SET force_global_transaction = false")); + ASSERT_OK(conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED")); + // Start off as a local txn. Will end up writing intents. + ASSERT_OK(conn.Execute(update_query)); + // Delay sending all status requests for this txn from the txn participant end. + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_pause_sending_txn_status_requests) = true; + + const auto num_sessions = 5; + CountDownLatch requested_status_of_txn_with_intents{num_sessions}; + TestThreadHolder thread_holder; + for (int i = 0; i < num_sessions; i++) { + thread_holder.AddThreadFunctor( + [this, &requested_status_of_txn_with_intents, update_query] { + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute("SET force_global_transaction = false")); + ASSERT_OK(conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED")); + // Try performing conflicting updates which end up requesting status of the txn with intents. + auto status_future = std::async(std::launch::async, [&conn, update_query]() { + return conn.Execute(update_query); + }); + ASSERT_OK(WaitFor([&conn] () { + return conn.IsBusy(); + }, 1s * kTimeMultiplier, "Wait for blocking request to be submitted to the query layer")); + requested_status_of_txn_with_intents.CountDown(); + ASSERT_OK(status_future.get()); + ASSERT_OK(conn.CommitTransaction()); + }); + } + // Wait for the status requests for this transaction to pile up with the current status tablet. + ASSERT_TRUE(requested_status_of_txn_with_intents.WaitFor(10s * kTimeMultiplier)); + SleepFor(5s * kTimeMultiplier); + // Make the transaction undergo transaction promotion, which switches its status tablet. + ASSERT_OK(conn.ExecuteFormat( + "UPDATE $0 SET people=people+1 WHERE country='C1' AND state='$1'", table_name, kOtherState)); + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_pause_sending_txn_status_requests) = false; + // Wait for the piled up status requests to get executed and process the response from the old + // status tablet. + SleepFor(5s * kTimeMultiplier); + ASSERT_OK(conn.CommitTransaction()); + + thread_holder.WaitAndStop(60s * kTimeMultiplier); + auto res = ASSERT_RESULT(conn.FetchValue(Format( + "SELECT people FROM $0 WHERE country='C0' AND state='$1'", table_name, kLocalState))); + ASSERT_EQ(res, num_sessions + 1); +} + } // namespace client } // namespace yb