[BACKPORT 2.18.4][#18081][#19535] DocDB: Update TabletState only for tablets that are involved in write, Ignore statuses from old status tablet on promotion

Summary:
Original commit: 0880949 / D29626
This diff fixes three issues with transaction promotion.

1. `tablets_`, maintained in `YBTransaction::Impl`, is the list of tablets considered "involved" in a transaction, i.e., tablets at which the transaction has successfully processed some sort of write, as determined by the involved `YBOperation`'s check `op.yb_op->applied() && op.yb_op->should_apply_intents(metadata_.isolation)`. The transaction promotion codepath should ideally not send promotion requests to tablets that haven't processed a write from this transaction yet.

In the current implementation, we update `num_completed_batches` for an entry of `tablets_` on every successfully completed operation, regardless of whether the operation processed a write at the tablet. This wrongly affects the transaction promotion codepath, where promotion requests are sent to all tablets in `transaction_status_move_tablets_` (a consistent copy of `tablets_`) that have `num_completed_batches > 0`. When a txn participant sees a promotion request for an unknown transaction, it returns a 40001. In a highly conflicting workload with read committed isolation this is frequent, so we error out with 40001. With the existing code, we would also hit this error when a promotion request is sent to a tablet that processed only a vanilla read (without `row_mark_type` set).

This diff addresses the issue by updating `num_completed_batches` only when `op.yb_op->applied() && op.yb_op->should_apply_intents(metadata_.isolation)` is true, i.e., the op successfully wrote something to the tablet.
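
For clarity, a minimal self-contained sketch of this gating rule follows; `FakeOp`, `TabletState`, and `OnBatchDone` are hypothetical stand-ins, not the actual client types (the real logic lives in the flush callback of `YBTransaction::Impl` in `src/yb/client/transaction.cc`):

```cpp
#include <cstddef>
#include <string>
#include <unordered_map>

// Hypothetical, simplified stand-ins for the client-side types.
struct FakeOp {
  bool applied = false;               // did the tablet acknowledge the op?
  bool should_apply_intents = false;  // false for plain reads without row_mark_type
};

struct TabletState {
  std::size_t num_completed_batches = 0;
};

// Only ops that actually wrote intents at the tablet mark it as "involved",
// because num_completed_batches > 0 is what later makes the tablet a target
// of a transaction-status-move (promotion) request.
void OnBatchDone(const FakeOp& op, const std::string& tablet_id,
                 std::unordered_map<std::string, TabletState>* tablets) {
  if (op.applied && op.should_apply_intents) {
    ++(*tablets)[tablet_id].num_completed_batches;
  }
  // Otherwise leave the counter untouched, so promotion never reaches a
  // participant that has not registered the txn (which would return a 40001).
}
```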

2. In the existing implementation, when processing the received status of a `RunningTransaction` at the participant, we don't track whether the response came from the `old_status_tablet` or the current `status_tablet`. For transaction promotion, this leads to additional issues. Consider a transaction that starts off as a local txn, successfully completes writes at a tablet T, and then undergoes promotion. In the process, an abort is sent to the old status tablet. If the `RunningTransaction` sees this abort, it initiates a cleanup at the participant. When the promoted transaction then commits, we don't error out and the updates get undone, leading to data inconsistency.

This diff addresses the issue by tracking the status tablet on which the status request was initiated. On receiving the status response, if we see that the transaction has undergone promotion in the interim, we return the already known status for the transaction instead of applying the newly arrived one. Since the status tablet is now updated, subsequent status requests will get the latest state. Additionally, we reject promotion if there's an active request to abort the transaction.
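
A rough sketch of the participant-side guard described above, using hypothetical `RunningTxnSketch`/`TxnStatus` names (the real check is in `RunningTransaction::UpdateStatus` in the diff below):

```cpp
#include <string>

// Hypothetical, simplified stand-ins for the participant-side state.
enum class TxnStatus { CREATED, PENDING, COMMITTED, ABORTED };

struct RunningTxnSketch {
  std::string current_status_tablet;
  TxnStatus last_known_status = TxnStatus::CREATED;

  // Returns true iff the caller should treat the transaction as aborted.
  bool UpdateStatus(const std::string& responding_status_tablet, TxnStatus incoming) {
    if (responding_status_tablet != current_status_tablet) {
      // Promotion switched the status tablet while this request was in flight.
      // Ignore the (possibly ABORTED) response from the old status tablet;
      // subsequent requests target the new tablet and return fresh state.
      return last_known_status == TxnStatus::ABORTED;
    }
    last_known_status = incoming;
    return incoming == TxnStatus::ABORTED;
  }
};
```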

3. At the query layer (`client/transaction.cc`), we could have two active heartbeating threads for a promoted transaction. In the existing implementation, we stop sending requests to the old status tablet once we have sent an abort to it (`state != OldTransactionState::kNone && state != OldTransactionState::kRunning`). But if we receive a response to an already sent request, we still do the error handling and proactive cleanup (requests to involved txn participants) even if the abort to the old status tablet was already sent (`state == OldTransactionState::kAborting || state == OldTransactionState::kAborted`). This could lead to unnecessary failures for subsequent operations of the transaction.

This diff addresses the issue by dropping the status response if an abort to the old status tablet was already sent.
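
A minimal sketch of this drop rule, assuming a simplified state enum and free function (the actual check is `YBTransaction::Impl::ShouldContinueHeartbeats` in the diff below):

```cpp
// Hypothetical enum/function names mirroring the states described above.
enum class OldTabletState { kNone, kRunning, kAborting, kAborted };

bool ShouldProcessHeartbeat(bool send_to_new_tablet, OldTabletState old_state) {
  if (send_to_new_tablet) {
    return true;  // responses tied to the current status tablet are always processed
  }
  // kNone: no promotion (or new status tablet not picked yet); kRunning: promoted,
  // but no abort sent to the old status tablet yet. Anything later (kAborting,
  // kAborted) means the old tablet's response is stale and must be dropped
  // without error handling or proactive cleanup.
  return old_state == OldTabletState::kNone || old_state == OldTabletState::kRunning;
}
```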

Additional context: Holding off promotion requests until the tablet has seen a write was introduced in [[ 21c5918 | commit ]]. Earlier, transaction promotion used to be retried on failure; that retry was removed in [[ 1ae14b8 | commit ]], so we no longer retry failed promotions.

Test Plan:
Jenkins
./yb_build.sh --cxx-test pgwrapper_geo_transactions_promotion-test --gtest_filter GeoPartitionedReadCommiittedTest.TestPromotionAmidstConflicts -n 20
./yb_build.sh --cxx-test pgwrapper_geo_transactions_promotion-test --gtest_filter GeoPartitionedReadCommiittedTest.TestParticipantIgnoresAbortFromOldStatusTablet

The first test fails consistently without the changes and reproduces all three issues in the description.

Elaborating on what the test does: we have a couple of transactions starting off as local transactions by issuing a `PGSQL_WRITE` on a tablet (say T_P1) in partition P1. This is followed by a few more `PGSQL_READ` operations launched against all tablets (2 tablets in P1, 1 tablet in P2, and 1 tablet in P3). These read ops trigger a transaction promotion request. Depending on the order in which these ops get flushed, if the reads to T_P1 and T_P2 get flushed before the promotion requests are sent, the existing code inserts these tablets into the list of "involved tablets", even though these read ops don't have `row_mark_type` set. The promotion codepath then errors out, returning a 40001 to the backend.

With the changes in the diff, since we now only update `num_completed_batches` when `op.yb_op->applied() && op.yb_op->should_apply_intents(metadata_.isolation)` holds, we shouldn't run into this issue.

We also put up another, simpler test to validate point 2 in the description; it fails consistently without the current changes in place.
./yb_build.sh --cxx-test pgwrapper_geo_transactions_promotion-test --gtest_filter GeoPartitionedReadCommiittedTest.TestParticipantIgnoresAbortFromOldStatusTablet

Reviewers: sergei, rsami, rthallam, esheng

Reviewed By: esheng

Subscribers: yql, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D29787
basavaraj29 committed Oct 31, 2023
1 parent d1287b8 commit a39a0df
Showing 7 changed files with 322 additions and 33 deletions.
59 changes: 54 additions & 5 deletions src/yb/client/transaction.cc
@@ -486,11 +486,16 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
prev_tablet_id = &tablet_id;
tablets_[tablet_id].has_metadata = true;
}
// 'num_completed_batches' for the involved tablet should be updated only when the op
// performs a write at the tablet (explicit write/read with explicit locks), which is
// checked for in the above 'if' clause. Else, we could run into scenarios where a txn
// promotion request is sent to a participant which hasn't yet registered the txn, and
// would lead to a 40001 being returned to pg.
++tablets_[tablet_id].num_completed_batches;
}
if (transaction_status_move_tablets_.count(tablet_id)) {
schedule_status_moved = true;
}
++tablets_[tablet_id].num_completed_batches;
}
} else {
const TransactionError txn_err(status);
@@ -1188,6 +1193,11 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
return;
}

// TODO: In case of transaction promotion, DoCommit is called only after we hear back on all
// sent transaction status move requests. So either the promotion went through, in which case
// transaction participants of all involved tablets check the txn's state against the new
// status tablet, or the promotion failed, in which case we abort the transaction anyway.
// TBD if we do require a successful PENDING heartbeat to go through.
if (old_status_tablet_ && last_old_heartbeat_failed_.load(std::memory_order_acquire)) {
auto rpc = PrepareOldStatusTabletFinalHeartbeat(deadline, seal_only, status, transaction);
lock.unlock();
@@ -1693,10 +1703,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
VLOG_WITH_PREFIX(4) << __func__ << "(" << TransactionStatus_Name(status) << ", "
<< send_to_new_tablet << ")";

auto old_status_tablet_state = old_status_tablet_state_.load(std::memory_order_acquire);
if (!send_to_new_tablet &&
old_status_tablet_state != OldTransactionState::kNone &&
old_status_tablet_state != OldTransactionState::kRunning) {
if (!ShouldContinueHeartbeats(send_to_new_tablet)) {
VLOG_WITH_PREFIX(1) << "Old status tablet is no longer in use, cancelling heartbeat";
return;
}
@@ -1749,6 +1756,34 @@
FATAL_INVALID_ENUM_VALUE(TransactionState, current_state);
}

// Returns whether the heartbeater thread should continue heartbeating.
//
// When 'send_to_new_tablet' is true, we should continue sending/processing heartbeat requests/
// responses as they are with the new status tablet stored in 'status_tablet_'.
//
// When 'send_to_new_tablet' is false, we should continue heartbeats in the following cases:
// 1. Heartbeating to 'status_tablet_':
// - happens when the transaction started off as a global txn. In this case
// 'old_status_tablet_state_' remains in 'kNone'.
// - transaction started off as local, and promotion hasn't been kicked off yet (new status
// tablet hasn't been picked yet). 'old_status_tablet_state_' remains in 'kNone' until the
// 'status_tablet_' switch happens.
// 2. Heartbeating to 'old_status_tablet_':
// - transaction started off as local, and was promoted. 'old_status_tablet_state_' remains
// in 'kRunning' until abort is sent to 'old_status_tablet_'.
// Once an abort request is sent to the old status tablet, 'old_status_tablet_state_' transitions
// to 'kAborting', and to 'kAborted' from there. Hence, we should not process heartbeat responses
// or initiate new heartbeat requests when not in ('kNone', 'kRunning') states, as they wouldn't
// reflect the actual status of the transaction.
bool ShouldContinueHeartbeats(SendHeartbeatToNewTablet send_to_new_tablet) {
if (send_to_new_tablet) {
return true;
}
auto old_status_tablet_state = old_status_tablet_state_.load(std::memory_order_acquire);
return old_status_tablet_state == OldTransactionState::kNone ||
old_status_tablet_state == OldTransactionState::kRunning;
}

void HeartbeatDone(Status status,
const tserver::UpdateTransactionRequestPB& request,
const tserver::UpdateTransactionResponsePB& response,
@@ -1778,6 +1813,20 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
<< TransactionStatus_Name(transaction_status) << ", "
<< send_to_new_tablet << ")";

if (!ShouldContinueHeartbeats(send_to_new_tablet)) {
// It could have happened that all UpdateTransactionStatusLocation requests went through
// and we issued an abort to the old status tablet. We should ignore any heartbeat response
// we receive after that. Since the promotion went through, the involved txn participants
// would check against the latest status tablet for any course of action. So we need not
// be concerned about whether this was a genuine heartbeat error and can safely exit without
// error handling or cleanup (they will be handled by the other heartbeater thread hitting
// the new status tablet in 'status_tablet_').
VLOG_WITH_PREFIX_AND_FUNC(1)
<< "Skipping cleanup because the heartbeat response from the old status tablet was "
<< "received after sending abort to old status tablet. "
<< old_status_tablet_state_.load(std::memory_order_relaxed);
return;
}
if (!send_to_new_tablet) {
last_old_heartbeat_failed_.store(!status.ok(), std::memory_order_release);
}
81 changes: 65 additions & 16 deletions src/yb/tablet/running_transaction.cc
@@ -13,6 +13,8 @@

#include "yb/tablet/running_transaction.h"

#include <glog/logging.h>

#include "yb/client/transaction_rpc.h"

#include "yb/common/hybrid_time.h"
@@ -40,6 +42,9 @@ DEFINE_UNKNOWN_int64(transaction_abort_check_interval_ms, 5000 * yb::kTimeMultip
DEFINE_UNKNOWN_int64(transaction_abort_check_timeout_ms, 30000 * yb::kTimeMultiplier,
"Timeout used when checking for aborted transactions.");

DEFINE_test_flag(bool, pause_sending_txn_status_requests, false,
"When set, hold off sending transaction status requests until the flag is reset.");

namespace yb {
namespace tablet {

@@ -135,9 +140,9 @@ void RunningTransaction::RequestStatusAt(const StatusRequest& request,
"Existing status knowledge ($0, $1) does not satisfy requested: $2, sending: $3",
TransactionStatus_Name(last_known_status_), last_known_status_hybrid_time_, request,
request_id);

auto status_tablet = shared_self->status_tablet();
lock->unlock();
SendStatusRequest(request_id, shared_self);
SendStatusRequest(status_tablet, request_id, shared_self);
}

bool RunningTransaction::WasAborted() const {
@@ -165,13 +170,15 @@ void RunningTransaction::Abort(client::YBClient* client,
}
bool was_empty = abort_waiters_.empty();
abort_waiters_.push_back(std::move(callback));
auto status_tablet = this->status_tablet();
abort_request_in_progress_ = true;
lock->unlock();
VLOG_WITH_PREFIX(3) << "Abort request: " << was_empty;
if (!was_empty) {
return;
}
tserver::AbortTransactionRequestPB req;
req.set_tablet_id(metadata_.status_tablet);
req.set_tablet_id(status_tablet);
req.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
req.set_propagated_hybrid_time(context_.participant_context_.Now().ToUint64());
context_.rpcs_.RegisterAndStart(
@@ -180,7 +187,8 @@
nullptr /* tablet */,
client,
&req,
std::bind(&RunningTransaction::AbortReceived, this, _1, _2, shared_from_this())),
std::bind(&RunningTransaction::AbortReceived, this, status_tablet,
_1, _2, shared_from_this())),
&abort_handle_);
}

@@ -225,17 +233,33 @@ boost::optional<TransactionStatus> RunningTransaction::GetStatusAt(
return TransactionStatus::PENDING;
}
return boost::none;
case TransactionStatus::CREATED: {
// This can happen in case of transaction promotion. The first status request to the old
// status tablet could have arrived and the transaction could have undergone promotion in
// the interim. In that case, we just return the past known status (which could be CREATED
// if this was the first ever txn status request).
return boost::none;
}
default:
FATAL_INVALID_ENUM_VALUE(TransactionStatus, last_known_status);
}
}

void RunningTransaction::SendStatusRequest(
int64_t serial_no, const RunningTransactionPtr& shared_self) {
const TabletId& status_tablet, int64_t serial_no, const RunningTransactionPtr& shared_self) {
TRACE_FUNC();
VTRACE(1, yb::ToString(metadata_.transaction_id));
if (PREDICT_FALSE(FLAGS_TEST_pause_sending_txn_status_requests)) {
VLOG_WITH_PREFIX_AND_FUNC(4) << "FLAGS_TEST_pause_sending_txn_status_requests set. Holding "
<< "off sending transaction status requests until flag is reset.";
while (FLAGS_TEST_pause_sending_txn_status_requests) {
SleepFor(10ms);
}
VLOG_WITH_PREFIX_AND_FUNC(4) << "Resume sending transaction status requests "
<< "against status tablet: " << status_tablet;
}
tserver::GetTransactionStatusRequestPB req;
req.set_tablet_id(metadata_.status_tablet);
req.set_tablet_id(status_tablet);
req.add_transaction_id()->assign(
pointer_cast<const char*>(metadata_.transaction_id.data()), metadata_.transaction_id.size());
req.set_propagated_hybrid_time(context_.participant_context_.Now().ToUint64());
@@ -245,11 +269,13 @@ void RunningTransaction::SendStatusRequest(
nullptr /* tablet */,
context_.participant_context_.client_future().get(),
&req,
std::bind(&RunningTransaction::StatusReceived, this, _1, _2, serial_no, shared_self)),
std::bind(&RunningTransaction::StatusReceived, this, status_tablet, _1, _2, serial_no,
shared_self)),
&get_status_handle_);
}

void RunningTransaction::StatusReceived(
const TabletId& status_tablet,
const Status& status,
const tserver::GetTransactionStatusResponsePB& response,
int64_t serial_no,
@@ -258,16 +284,24 @@
if (delay_usec > 0) {
context_.delayer().Delay(
MonoTime::Now() + MonoDelta::FromMicroseconds(delay_usec),
std::bind(&RunningTransaction::DoStatusReceived, this, status, response,
std::bind(&RunningTransaction::DoStatusReceived, this, status_tablet, status, response,
serial_no, shared_self));
} else {
DoStatusReceived(status, response, serial_no, shared_self);
DoStatusReceived(status_tablet, status, response, serial_no, shared_self);
}
}

bool RunningTransaction::UpdateStatus(
TransactionStatus transaction_status, HybridTime time_of_status,
const TabletId& status_tablet, TransactionStatus transaction_status, HybridTime time_of_status,
HybridTime coordinator_safe_time, SubtxnSet aborted_subtxn_set) {
if (status_tablet != this->status_tablet()) {
// Can happen in case of transaction promotion, should be okay to return an existing older
// status as the subsequent requests would return the latest status.
VLOG_WITH_PREFIX_AND_FUNC(4) << "Passed in status tablet isn't the current active status "
<< "tablet of the txn. Not updating the txn status.";
return last_known_status_ == TransactionStatus::ABORTED;
}

if (!local_commit_time_ && transaction_status != TransactionStatus::ABORTED) {
// If we've already committed locally, then last_known_aborted_subtxn_set_ is already set
// properly. Otherwise, we should update it here.
@@ -294,13 +328,14 @@ bool RunningTransaction::UpdateStatus(
return transaction_status == TransactionStatus::ABORTED;
}

void RunningTransaction::DoStatusReceived(const Status& status,
void RunningTransaction::DoStatusReceived(const TabletId& status_tablet,
const Status& status,
const tserver::GetTransactionStatusResponsePB& response,
int64_t serial_no,
const RunningTransactionPtr& shared_self) {
TRACE("$0: $1", __func__, response.ShortDebugString());
VLOG_WITH_PREFIX(4) << __func__ << "(" << status << ", " << response.ShortDebugString() << ", "
<< serial_no << ")";
<< serial_no << ")" << " from status tablet: " << status_tablet;

if (response.has_propagated_hybrid_time()) {
context_.participant_context_.UpdateClock(HybridTime(response.propagated_hybrid_time()));
@@ -313,6 +348,7 @@ void RunningTransaction::DoStatusReceived(const Status& status,
SubtxnSet aborted_subtxn_set;
const bool ok = status.ok();
int64_t new_request_id = -1;
TabletId current_status_tablet;
{
MinRunningNotifier min_running_notifier(&context_.applier_);
std::unique_lock<std::mutex> lock(context_.mutex_);
@@ -357,7 +393,8 @@ auto coordinator_safe_time = response.coordinator_safe_time().size() == 1
auto coordinator_safe_time = response.coordinator_safe_time().size() == 1
? HybridTime::FromPB(response.coordinator_safe_time(0)) : HybridTime();
auto did_abort_txn = UpdateStatus(
transaction_status, time_of_status, coordinator_safe_time, aborted_subtxn_set);
status_tablet, transaction_status, time_of_status, coordinator_safe_time,
aborted_subtxn_set);
if (did_abort_txn) {
context_.NotifyAborted(id());
context_.EnqueueRemoveUnlocked(id(), RemoveReason::kStatusReceived, &min_running_notifier);
@@ -390,9 +427,10 @@ void RunningTransaction::DoStatusReceived(const Status& status,
}
}
}
current_status_tablet = shared_self->status_tablet();
}
if (new_request_id >= 0) {
SendStatusRequest(new_request_id, shared_self);
SendStatusRequest(current_status_tablet, new_request_id, shared_self);
}
NotifyWaiters(serial_no, time_of_status, transaction_status, aborted_subtxn_set, status_waiters);
}
@@ -469,7 +507,8 @@ Result<TransactionStatusResult> RunningTransaction::MakeAbortResult(
return TransactionStatusResult{response.status(), status_time, SubtxnSet()};
}

void RunningTransaction::AbortReceived(const Status& status,
void RunningTransaction::AbortReceived(const TabletId& status_tablet,
const Status& status,
const tserver::AbortTransactionResponsePB& response,
const RunningTransactionPtr& shared_self) {
if (response.has_propagated_hybrid_time()) {
@@ -484,14 +523,24 @@
{
MinRunningNotifier min_running_notifier(&context_.applier_);
std::lock_guard<std::mutex> lock(context_.mutex_);
LOG_IF(DFATAL, !abort_request_in_progress_)
<< "AbortReceived executed with abort_request_in_progress_ unset. Could lead to data "
<< "inconsistentcy issues in case of Geo-Partition workloads.";
abort_request_in_progress_ = false;

LOG_IF(DFATAL, status_tablet != shared_self->status_tablet())
<< "Status Tablet switched while Abort txn request was in progress. This might lead "
<< "to data consistency issues.";

context_.rpcs_.Unregister(&abort_handle_);
abort_waiters_.swap(abort_waiters);
// kMax status_time means that this status is not yet replicated and could be rejected.
// So we could use it as reply to Abort, but cannot store it as transaction status.
if (result.ok() && result->status_time != HybridTime::kMax) {
auto coordinator_safe_time = HybridTime::FromPB(response.coordinator_safe_time());
if (UpdateStatus(
result->status, result->status_time, coordinator_safe_time, result->aborted_subtxn_set)) {
status_tablet, result->status, result->status_time, coordinator_safe_time,
result->aborted_subtxn_set)) {
context_.NotifyAborted(id());
context_.EnqueueRemoveUnlocked(id(), RemoveReason::kAbortReceived, &min_running_notifier);
}
32 changes: 26 additions & 6 deletions src/yb/tablet/running_transaction.h
@@ -56,8 +56,8 @@ class RunningTransaction : public std::enable_shared_from_this<RunningTransactio
}

MUST_USE_RESULT bool UpdateStatus(
TransactionStatus transaction_status, HybridTime time_of_status,
HybridTime coordinator_safe_time, SubtxnSet aborted_subtxn_set);
const TabletId& status_tablet, TransactionStatus transaction_status,
HybridTime time_of_status, HybridTime coordinator_safe_time, SubtxnSet aborted_subtxn_set);

void UpdateAbortCheckHT(HybridTime now, UpdateAbortCheckHTMode mode);

Expand Down Expand Up @@ -109,6 +109,18 @@ class RunningTransaction : public std::enable_shared_from_this<RunningTransactio
TransactionStatusCallback callback,
std::unique_lock<std::mutex>* lock);

Status CheckPromotionAllowed() {
if (last_known_status_ == TransactionStatus::ABORTED) {
return STATUS_FORMAT(
IllegalState, "Transaction $0 in ABORTED state. Cannot be promoted.", id());
}
if (abort_request_in_progress_) {
return STATUS_FORMAT(
IllegalState, "Request to abort txn $0 in progress. Cannot be promoted.", id());
}
return Status::OK();
}

std::string ToString() const;
void ScheduleRemoveIntents(const RunningTransactionPtr& shared_self, RemoveReason reason);

Expand Down Expand Up @@ -140,14 +152,17 @@ class RunningTransaction : public std::enable_shared_from_this<RunningTransactio
TransactionStatus last_known_status,
bool external_transaction);

void SendStatusRequest(int64_t serial_no, const RunningTransactionPtr& shared_self);
void SendStatusRequest(
const TabletId& status_tablet, int64_t serial_no, const RunningTransactionPtr& shared_self);

void StatusReceived(const Status& status,
void StatusReceived(const TabletId& status_tablet,
const Status& status,
const tserver::GetTransactionStatusResponsePB& response,
int64_t serial_no,
const RunningTransactionPtr& shared_self);

void DoStatusReceived(const Status& status,
void DoStatusReceived(const TabletId& status_tablet,
const Status& status,
const tserver::GetTransactionStatusResponsePB& response,
int64_t serial_no,
const RunningTransactionPtr& shared_self);
@@ -167,7 +182,8 @@ class RunningTransaction : public std::enable_shared_from_this<RunningTransactio
const Status& status,
const tserver::AbortTransactionResponsePB& response);

void AbortReceived(const Status& status,
void AbortReceived(const TabletId& status_tablet,
const Status& status,
const tserver::AbortTransactionResponsePB& response,
const RunningTransactionPtr& shared_self);

@@ -195,6 +211,10 @@

// Time of the next check whether this transaction has been aborted.
HybridTime abort_check_ht_;
// True if an external request to abort the transaction is in progress, i.e., an AbortTransaction
// RPC to the transaction coordinator is in flight. Set in RunningTransaction::Abort and
// reset in RunningTransaction::AbortReceived.
bool abort_request_in_progress_ = false;
};

Status MakeAbortedStatus(const TransactionId& id);