Skip to content

Commit

Permalink
[#25534] docdb: Add robust mutex
Browse files Browse the repository at this point in the history
Summary:
This diff adds `RobustMutex`, which is a mutex that is robust to process crash even when held.

This is implemented with `PTHREAD_MUTEX_ROBUST`. When the owner of the mutex crashes,
the next mutex acquire will succeed and be notified that the previous owner had crashed. We
use this to trigger a cleanup callback which will perform any necessary work needed to bring the
state guarded by the mutex back to a consistent state.

`PTHREAD_MUTEX_ROBUST` is not implemented on OS X (which follows an older version of UNIX
that does not have it). Crashes while holding the mutex should ideally never happen, and OS X is
not used for production deployment, so we just use a normal mutex in shared memory for OS X.

This is needed for future changes to move the table locks lock manager into shared memory. For more
context: D40272.
Jira: DB-14789

Test Plan: Added tests: `./yb_build.sh --cxx-test 'util_robust_mutex-test'`

Reviewers: sergei, bkolagani, amitanand

Reviewed By: sergei

Subscribers: svc_phabricator, ybase, rthallam, zdrudi, amitanand

Differential Revision: https://phorge.dev.yugabyte.com/D41082
  • Loading branch information
es1024 committed Jan 15, 2025
1 parent 2b4598f commit c75a6f4
Show file tree
Hide file tree
Showing 7 changed files with 359 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/yb/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ set(UTIL_SRCS
rw_semaphore.cc
rwc_lock.cc
shared_mem.cc
shmem/robust_mutex.cc
signal_util.cc
slice.cc
slice_parts.cc
Expand Down Expand Up @@ -414,6 +415,7 @@ if (NOT "${YB_BUILD_TYPE}" STREQUAL "asan")
endif()
ADD_YB_TEST(scope_exit-test)
ADD_YB_TEST(shared_mem-test)
ADD_YB_TEST(shmem/robust_mutex-test)
ADD_YB_TEST(slice-test)
ADD_YB_TEST(spinlock_profiling-test)
ADD_YB_TEST(split-test)
Expand Down
115 changes: 115 additions & 0 deletions src/yb/util/shmem/robust_mutex-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include <unistd.h>

#include <chrono>

#include "yb/util/shared_mem.h"
#include "yb/util/shmem/robust_mutex.h"
#include "yb/util/monotime.h"
#include "yb/util/test_macros.h"
#include "yb/util/test_util.h"
#include "yb/util/types.h"

using namespace std::literals;

namespace yb {

// Test fixture for RobustMutex tests; adds nothing on top of YBTest.
class RobustMutexTest : public YBTest {};

// No-op cleanup callback for tests that do not exercise crash recovery. The argument (the
// pointer handed to the cleanup callback) is ignored.
void CleanupDoNothing(void* /* unused */) {}

TEST_F(RobustMutexTest, TestSimple) {
// Exercises lock/try_lock/unlock on two shared-memory mutexes (M1, M2) from two processes.
// P1 is this (parent) process, P2 is the forked child. Intended interleaving:
//   P1: M1 lock, then wait.
//   P2: wait, then M2 lock.
//   P2: M1 try_lock - fails, P1 still holds M1.
//   P2: M1 lock - blocks.
//   P1: M1 unlock.
//   P2: M1 lock returns.
//   P1: M2 lock - may block until P2 releases M2.
//   P2: M1 unlock, M2 unlock, exit.
//   P1: M2 lock returns.
//   P1: M1 try_lock - succeeds.
//   P1: M1 unlock, M2 unlock.
//
// The following duration is the base wait: P2 sleeps for it once before starting, and P1 holds
// M1 for twice as long.
constexpr auto kWaitDurationMs = 1000;

using Mutex = RobustMutex<CleanupDoNothing>;

auto m1 = ASSERT_RESULT(SharedMemoryObject<Mutex>::Create());
auto m2 = ASSERT_RESULT(SharedMemoryObject<Mutex>::Create());

// The first lambda runs in the forked child (P2), the second one in this process (P1).
ASSERT_OK(ForkAndRunToCompletion([&m1, &m2, kWaitDurationMs]() NO_THREAD_SAFETY_ANALYSIS {
SleepFor(kWaitDurationMs * 1ms);

LOG(INFO) << "P2: M2 lock";
m2->lock();
LOG(INFO) << "P2: M1 try_lock";
ASSERT_FALSE(m1->try_lock());
LOG(INFO) << "P2: M1 lock";
m1->lock();
LOG(INFO) << "P2: M1 unlock";
m1->unlock();
LOG(INFO) << "P2: M2 unlock";
m2->unlock();
LOG(INFO) << "OK";
},
[&m1, &m2, kWaitDurationMs]() NO_THREAD_SAFETY_ANALYSIS {
LOG(INFO) << "P1: M1 lock";
m1->lock();

// Hold M1 long enough for P2 to wake up, take M2 and observe M1 as busy.
SleepFor(kWaitDurationMs * 2ms);

LOG(INFO) << "P1: M1 unlock";
m1->unlock();
LOG(INFO) << "P1: M2 lock";
m2->lock();
LOG(INFO) << "P1: M1 try_lock";
ASSERT_TRUE(m1->try_lock());
LOG(INFO) << "P1: M1 unlock";
m1->unlock();
LOG(INFO) << "P1: M2 unlock";
m2->unlock();
}));
}

// Shared-memory test state: a robust mutex plus a flag recording that the mutex's cleanup
// callback has run.
struct MutexAndFlag {
  // Cleanup callback for `mutex`. `p` points at the `mutex` member; the enclosing MutexAndFlag is
  // recovered from it so the flag can be set.
  static void CleanupOnCrash(void* p) NO_THREAD_SAFETY_ANALYSIS {
    LOG(INFO) << "Cleanup function called";

    auto* owner = MEMBER_PTR_TO_CONTAINER(MutexAndFlag, mutex, p);
    owner->cleaned_up_crash = true;
  }

  RobustMutex<MutexAndFlag::CleanupOnCrash> mutex;

  // Set to true by CleanupOnCrash.
  bool cleaned_up_crash GUARDED_BY(mutex) = false;
};

// Checks that when a child process exits while holding the mutex, the next acquisition in this
// process runs the cleanup callback (observed through `cleaned_up_crash`).
TEST_F(RobustMutexTest, YB_DISABLE_TEST_ON_MACOS(TestCrashRecovery)) {
  auto mutex_and_flag = ASSERT_RESULT(SharedMemoryObject<MutexAndFlag>::Create());

  // Runs in the forked child: take the lock and terminate immediately without releasing it.
  const auto exit_while_locked = [&mutex_and_flag] {
    std::lock_guard lock(mutex_and_flag->mutex);
    LOG(INFO) << "Exiting with lock held";
    std::_Exit(0);
  };
  ASSERT_OK(ForkAndRunToCompletion(exit_while_locked));

  LOG(INFO) << "Child process exited, grabbing lock";
  std::lock_guard lock(mutex_and_flag->mutex);
  ASSERT_TRUE(mutex_and_flag->cleaned_up_crash);
}

} // namespace yb
77 changes: 77 additions & 0 deletions src/yb/util/shmem/robust_mutex.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/util/shmem/robust_mutex.h"

#include <fcntl.h>
#include <pthread.h>

#include "yb/util/logging.h"

namespace yb {

// Initializes the underlying pthread mutex as process-shared and, when
// PTHREAD_MUTEX_ROBUST_SUPPORTED is set, robust. Errors from the individual pthread calls are
// reported through DCHECK only.
RobustMutexImpl::RobustMutexImpl() {
  pthread_mutexattr_t attr;
  int rv = pthread_mutexattr_init(&attr);
  DCHECK_EQ(0, rv) << strerror(rv);

  // The mutex is meant to be locked from multiple processes.
  rv = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
  DCHECK_EQ(0, rv) << strerror(rv);

#if PTHREAD_MUTEX_ROBUST_SUPPORTED
  // Robust mutexes report EOWNERDEAD to the next locker (handled in PostAcquire).
  rv = pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
  DCHECK_EQ(0, rv) << strerror(rv);
#endif

  rv = pthread_mutex_init(&native_handle_, &attr);
  DCHECK_EQ(0, rv) << strerror(rv);

  // The attribute object is only needed for initialization.
  rv = pthread_mutexattr_destroy(&attr);
  DCHECK_EQ(0, rv) << strerror(rv);
}

// Destroys the underlying pthread mutex; a failure is reported through DCHECK only.
RobustMutexImpl::~RobustMutexImpl() {
  const int rv = pthread_mutex_destroy(&native_handle_);
  DCHECK_EQ(0, rv) << strerror(rv);
}

// Locks the mutex via pthread_mutex_lock and classifies the outcome with PostAcquire.
// A kFailed outcome trips a DCHECK, but is still returned to the caller.
// NOTE(review): if DCHECK is compiled out in release builds, a failed lock is only visible through
// the returned value - confirm callers handle kFailed.
AcquireResult RobustMutexImpl::Acquire() {
  const int rv = pthread_mutex_lock(&native_handle_);
  const AcquireResult result = PostAcquire(rv);
  DCHECK_NE(AcquireResult::kFailed, result) << strerror(rv);
  return result;
}

void RobustMutexImpl::Release() {
int rv = pthread_mutex_unlock(&native_handle_);
DCHECK(!rv) << strerror(rv);
}

// Attempts to lock the mutex without blocking (pthread_mutex_trylock) and classifies the outcome
// with PostAcquire. Unlike Acquire, kFailed is an expected result here and is not DCHECKed.
AcquireResult RobustMutexImpl::TryAcquire() {
  return PostAcquire(pthread_mutex_trylock(&native_handle_));
}

// Marks the mutex as consistent again (pthread_mutex_consistent) after a kCleanupRequired
// acquisition. Does nothing when PTHREAD_MUTEX_ROBUST_SUPPORTED is 0.
void RobustMutexImpl::MarkCleanedUp() {
#if PTHREAD_MUTEX_ROBUST_SUPPORTED
  const int rv = pthread_mutex_consistent(&native_handle_);
  DCHECK_EQ(0, rv) << strerror(rv);
#endif
}

// Translates the return code of a pthread lock/trylock call into an AcquireResult:
//   0          -> kAcquired
//   EOWNERDEAD -> kCleanupRequired (only when PTHREAD_MUTEX_ROBUST_SUPPORTED is set)
//   otherwise  -> kFailed
AcquireResult RobustMutexImpl::PostAcquire(int lock_rv) {
  if (lock_rv == 0) {
    return AcquireResult::kAcquired;
  }
#if PTHREAD_MUTEX_ROBUST_SUPPORTED
  if (lock_rv == EOWNERDEAD) {
    return AcquireResult::kCleanupRequired;
  }
#endif
  return AcquireResult::kFailed;
}

} // namespace yb
101 changes: 101 additions & 0 deletions src/yb/util/shmem/robust_mutex.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#pragma once

#include <pthread.h>

#include <type_traits>

#include "yb/util/enums.h"

#include "yb/gutil/thread_annotations.h"

namespace yb {

// Outcome of a lock attempt on RobustMutexImpl:
//   kAcquired        - the underlying pthread call returned 0.
//   kCleanupRequired - the underlying pthread call returned EOWNERDEAD (only reported when
//                      PTHREAD_MUTEX_ROBUST_SUPPORTED is non-zero).
//   kFailed          - the underlying pthread call returned any other error code.
YB_DEFINE_ENUM(AcquireResult, (kAcquired)(kCleanupRequired)(kFailed));

// PTHREAD_MUTEX_ROBUST is only available as of POSIX.1-2008, which is available for Linux kernels
// 2.6.23+ (so all the versions we support), but is not available on OS X. On OS X, we just use
// a non-robust mutex and hope for the best (no crashes while locked).
//
// PTHREAD_MUTEX_ROBUST_SUPPORTED is always defined, as 1 or 0, so it can be tested with plain
// `#if`. It guards the calls to pthread_mutexattr_setrobust and pthread_mutex_consistent and the
// handling of EOWNERDEAD in robust_mutex.cc.
#if (_XOPEN_SOURCE >= 700 || _POSIX_C_SOURCE >= 200809L)
#define PTHREAD_MUTEX_ROBUST_SUPPORTED 1
#else
#define PTHREAD_MUTEX_ROBUST_SUPPORTED 0
#endif

// Non-template part of RobustMutex: owns the pthread mutex and reports the outcome of each lock
// attempt as an AcquireResult. It does not run any cleanup itself; the caller is expected to do
// so and then call MarkCleanedUp() when a lock attempt yields kCleanupRequired.
class RobustMutexImpl {
 public:
  RobustMutexImpl();
  ~RobustMutexImpl();

  // Not copyable or movable.
  RobustMutexImpl(const RobustMutexImpl&) = delete;
  RobustMutexImpl& operator=(const RobustMutexImpl&) = delete;
  RobustMutexImpl(RobustMutexImpl&&) = delete;
  RobustMutexImpl& operator=(RobustMutexImpl&&) = delete;

  // Blocking lock.
  AcquireResult Acquire();

  // Non-blocking lock attempt.
  AcquireResult TryAcquire();

  // Unlocks the mutex.
  void Release();

  // Declares the mutex consistent again after a kCleanupRequired acquisition.
  void MarkCleanedUp();

 private:
  // Maps the return code of a pthread lock call to an AcquireResult.
  AcquireResult PostAcquire(int lock_rv);

  pthread_mutex_t native_handle_;
};

static_assert(std::is_standard_layout_v<RobustMutexImpl>);

// Mutex for interprocess use in shared memory, which ensures we do not enter a deadlock when one
// process dies while holding the mutex. This mutex assumes that any state guarded by the mutex
// remains consistent in event of a crash or can be cleaned up to a consistent state.
//
// The Cleanup function takes a pointer to the mutex, and should perform cleanup with the associated
// state to make it consistent again. This function is passed as a template parameter instead of
// stored inside the class, because RobustMutex is meant to be shared across processes and cleanup
// may be called in any process (and any stored function pointer would be process-specific).
template<void (*Cleanup)(void*)>
class CAPABILITY("mutex") RobustMutex {
 public:
  // Blocks until the mutex is acquired. If the acquisition reports kCleanupRequired, Cleanup is
  // run (with a pointer to this mutex) before returning.
  void Acquire() ACQUIRE() {
    const auto outcome = impl_.Acquire();
    DoCleanupIfNeeded(outcome);
  }

  // Unlocks the mutex.
  void Release() RELEASE() {
    impl_.Release();
  }

  // Attempts to acquire the mutex without blocking. Returns false if the attempt failed;
  // otherwise runs Cleanup when required and returns true.
  bool TryAcquire() TRY_ACQUIRE(true) {
    const auto outcome = impl_.TryAcquire();
    if (outcome == AcquireResult::kFailed) {
      return false;
    }
    DoCleanupIfNeeded(outcome);
    return true;
  }

  // Lower-case aliases so the mutex works with std::lock_guard and similar wrappers.
  void lock() ACQUIRE() { Acquire(); }
  void unlock() RELEASE() { Release(); }
  bool try_lock() TRY_ACQUIRE(true) { return TryAcquire(); }

 private:
  // Runs Cleanup and then marks the mutex as cleaned up, but only for kCleanupRequired.
  void DoCleanupIfNeeded(AcquireResult result) {
    if (result != AcquireResult::kCleanupRequired) {
      return;
    }
    Cleanup(this);
    impl_.MarkCleanedUp();
  }

  RobustMutexImpl impl_;
};

} // namespace yb
25 changes: 25 additions & 0 deletions src/yb/util/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@

#include "yb/util/test_util.h"

#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

#include <cerrno>
#include <cstdlib>
#include <cstring>

#include <gtest/gtest-spi.h>

#include "yb/gutil/casts.h"
Expand Down Expand Up @@ -415,4 +421,23 @@ Status CorruptFile(
return file->Close();
}

// Forks the current process, runs `child` in the forked process and `parent` (if non-empty) in
// the calling process, then waits for the child to terminate.
//
// The child process exits with a non-zero code iff gtest has recorded a failure once `child`
// returns.
//
// Returns OK only if the child exited normally with exit code 0. A RuntimeError status is
// returned when:
//   - fork() or waitpid() fails,
//   - the child did not exit normally (e.g. it was killed by a signal),
//   - the child exited with a non-zero code.
Status ForkAndRunToCompletion(const std::function<void(void)>& child,
                              const std::function<void(void)>& parent) {
  const pid_t pid = fork();
  if (pid < 0) {
    return STATUS(RuntimeError, std::string("fork failed: ") + strerror(errno));
  }

  if (pid == 0) {
    child();
    std::exit(testing::Test::HasFailure());
  }

  if (parent) {
    parent();
  }

  int wstatus = 0;
  pid_t waited;
  do {
    // Retry if the wait is interrupted by a signal delivered to this process.
    waited = waitpid(pid, &wstatus, 0 /* options */);
  } while (waited < 0 && errno == EINTR);
  if (waited < 0) {
    return STATUS(RuntimeError, std::string("waitpid failed: ") + strerror(errno));
  }

  // WEXITSTATUS is only meaningful for a normal exit; for a child killed by a signal it would
  // typically read as 0 and hide the crash.
  if (!WIFEXITED(wstatus)) {
    return STATUS(RuntimeError, "Child process terminated abnormally");
  }
  if (WEXITSTATUS(wstatus) != 0) {
    return STATUS(RuntimeError, "Child process has test failures");
  }
  return Status::OK();
}

} // namespace yb
4 changes: 4 additions & 0 deletions src/yb/util/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <dirent.h>

#include <atomic>
#include <functional>
#include <future>
#include <string>
#include <type_traits>
Expand Down Expand Up @@ -273,6 +274,9 @@ Status CorruptFile(
const std::string& file_path, int64_t offset, size_t bytes_to_corrupt,
CorruptionType corruption_type);

// Forks the current process. `child` runs in the forked process, which then exits with a non-zero
// code iff gtest has recorded a failure. `parent`, if non-empty, runs in the calling process,
// which afterwards waits for the child to terminate. Returns a RuntimeError status if the child's
// exit code is non-zero.
Status ForkAndRunToCompletion(const std::function<void(void)>& child,
const std::function<void(void)>& parent = {});

} // namespace yb

// Gives ability to define custom parent class for test fixture.
Expand Down
Loading

0 comments on commit c75a6f4

Please sign in to comment.