Skip to content

Commit

Permalink
[#5091] Only Update CDC Metrics for Tablet Leaders
Browse files Browse the repository at this point in the history
Test Plan: Unit tests with 1 tablet 3 peers. Write a couple values, make sure the leader metric is > 0 but the follower stays at 0 the whole time.

Reviewers: hector, nicolas, neha

Reviewed By: neha

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D8909
  • Loading branch information
rahuldesirazu committed Jul 15, 2020
1 parent 6bb1ab4 commit a5f0657
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 92 deletions.
5 changes: 3 additions & 2 deletions ent/src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,9 @@ void CDCServiceImpl::UpdateLagMetrics() {
for (auto it = tablet_checkpoints.begin(); it != tablet_checkpoints.end(); it++) {
std::shared_ptr<tablet::TabletPeer> tablet_peer;
Status s = tablet_manager_->GetTabletPeer(it->tablet_id(), &tablet_peer);
if (!s.ok()) {
LOG(WARNING) << s;
if (s.IsNotFound() ||
tablet_peer->LeaderStatus() != consensus::LeaderStatus::LEADER_AND_READY) {
// We either couldn't find the tablet or we're not the leader for this tablet, skip.
continue;
}
auto tablet_metric = GetCDCTabletMetrics(it->producer_tablet_info, tablet_peer);
Expand Down
252 changes: 162 additions & 90 deletions ent/src/yb/integration-tests/cdc_service-int-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ DECLARE_int32(update_min_cdc_indices_interval_secs);
DECLARE_int64(TEST_simulate_free_space_bytes);
DECLARE_int64(log_stop_retaining_min_disk_mb);
DECLARE_uint64(log_segment_size_bytes);
DECLARE_int32(update_metrics_interval_ms);

METRIC_DECLARE_entity(cdc);
METRIC_DECLARE_gauge_int64(last_read_opid_index);
Expand Down Expand Up @@ -482,96 +483,6 @@ TEST_F(CDCServiceTest, TestDeleteCDCStream) {
}
}

TEST_F(CDCServiceTest, TestUpdateLagMetrics) {
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);

std::string tablet_id;
GetTablet(&tablet_id);

const auto& tserver = cluster_->mini_tablet_server(0)->server();
// Use proxy for to most accurately simulate normal requests.
const auto& proxy = tserver->proxy();

auto cdc_service = dynamic_cast<CDCServiceImpl*>(
tserver->rpc_server()->service_pool("yb.cdc.CDCService")->TEST_get_service().get());

GetChangesRequestPB change_req;
GetChangesResponsePB change_resp;
change_req.set_tablet_id(tablet_id);
change_req.set_stream_id(stream_id);
change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);
{
RpcController rpc;
SCOPED_TRACE(change_req.DebugString());
ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
}

// Insert test rows, one at a time so they have different hybrid times.
tserver::WriteRequestPB write_req;
tserver::WriteResponsePB write_resp;
write_req.set_tablet_id(tablet_id);
{
RpcController rpc;
AddTestRowInsert(1, 11, "key1", &write_req);
SCOPED_TRACE(write_req.DebugString());
ASSERT_OK(proxy->Write(write_req, &write_resp, &rpc));
SCOPED_TRACE(write_resp.DebugString());
ASSERT_FALSE(write_resp.has_error());
}

{
write_req.Clear();
write_req.set_tablet_id(tablet_id);
RpcController rpc;
AddTestRowInsert(2, 22, "key2", &write_req);
SCOPED_TRACE(write_req.DebugString());
ASSERT_OK(proxy->Write(write_req, &write_resp, &rpc));
SCOPED_TRACE(write_resp.DebugString());
ASSERT_FALSE(write_resp.has_error());
}

ASSERT_OK(WaitFor([&]() -> Result<bool> {
auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id, tablet_id});
return metrics->async_replication_sent_lag_micros->value() > 0 &&
metrics->async_replication_committed_lag_micros->value() > 0;
}, MonoDelta::FromSeconds(10), "Wait for Lag > 0"));

change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint());
change_resp.Clear();
{
RpcController rpc;
SCOPED_TRACE(change_req.DebugString());
ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
}

// When we GetChanges the first time, only the read lag metric should be 0.
ASSERT_OK(WaitFor([&]() -> Result<bool> {
auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id, tablet_id});
return metrics->async_replication_sent_lag_micros->value() == 0 &&
metrics->async_replication_committed_lag_micros->value() > 0;
}, MonoDelta::FromSeconds(10), "Wait for Read Lag = 0"));

change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint());
change_resp.Clear();
{
RpcController rpc;
SCOPED_TRACE(change_req.DebugString());
ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
}

LOG(INFO) << "After second time";

// When we GetChanges the second time, both the lag metrics should be 0.
ASSERT_OK(WaitFor([&]() -> Result<bool> {
auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id, tablet_id});
return metrics->async_replication_sent_lag_micros->value() == 0 &&
metrics->async_replication_committed_lag_micros->value() == 0;
}, MonoDelta::FromSeconds(10), "Wait for Lag = 0"));

}

TEST_F(CDCServiceTest, TestGetChanges) {
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);
Expand Down Expand Up @@ -743,6 +654,167 @@ TEST_F(CDCServiceTest, TestGetCheckpoint) {
}
}

class CDCServiceTestMultipleServersOneTablet : public CDCServiceTest {
virtual int server_count() override { return 3; }
virtual int tablet_count() override { return 1; }
};

TEST_F_EX(CDCServiceTest, TestUpdateLagMetrics, CDCServiceTestMultipleServersOneTablet) {
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);

std::string tablet_id;
GetTablet(&tablet_id);

// Get the leader and a follower for the tablet.
tserver::MiniTabletServer* leader_mini_tserver;
tserver::MiniTabletServer* follower_mini_tserver;

ASSERT_OK(WaitFor([&]() -> Result<bool> {
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
std::shared_ptr<tablet::TabletPeer> tablet_peer;
Status s = cluster_->mini_tablet_server(i)->server()->tablet_manager()->
GetTabletPeer(tablet_id, &tablet_peer);
if (!s.ok()) {
continue;
}
if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) {
leader_mini_tserver = cluster_->mini_tablet_server(i);
} else {
follower_mini_tserver = cluster_->mini_tablet_server(i);
}
}
return leader_mini_tserver != nullptr && follower_mini_tserver != nullptr;
}, MonoDelta::FromSeconds(30), "Wait for tablet to have a leader."));


auto leader_proxy = std::make_unique<CDCServiceProxy>(
&client_->proxy_cache(),
HostPort::FromBoundEndpoint(leader_mini_tserver->bound_rpc_addr()));

auto follower_proxy = std::make_unique<CDCServiceProxy>(
&client_->proxy_cache(),
HostPort::FromBoundEndpoint(follower_mini_tserver->bound_rpc_addr()));

auto leader_tserver = leader_mini_tserver->server();
auto follower_tserver = follower_mini_tserver->server();
// Use proxy for to most accurately simulate normal requests.
const auto& proxy = leader_tserver->proxy();

auto cdc_service = dynamic_cast<CDCServiceImpl*>(
leader_tserver->rpc_server()->service_pool("yb.cdc.CDCService")->TEST_get_service().get());
auto cdc_service_follower = dynamic_cast<CDCServiceImpl*>(
follower_tserver->rpc_server()->service_pool("yb.cdc.CDCService")->TEST_get_service().get());

// At the start of time, assert both leader and follower at 0 lag.
ASSERT_OK(WaitFor([&]() -> Result<bool> {
{
// Leader metrics
auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id, tablet_id});
if (!(metrics->async_replication_sent_lag_micros->value() == 0 &&
metrics->async_replication_committed_lag_micros->value() == 0)) {
return false;
}
}
{
// Follower metrics
auto follower_metrics =
cdc_service_follower->GetCDCTabletMetrics({"" /* UUID */, stream_id, tablet_id});
return follower_metrics->async_replication_sent_lag_micros->value() == 0 &&
follower_metrics->async_replication_committed_lag_micros->value() == 0;
}
}, MonoDelta::FromSeconds(10), "At start, wait for Lag = 0"));


// Create the in-memory structures for both follower and leader by polling for the tablet.
GetChangesRequestPB change_req;
GetChangesResponsePB change_resp;
change_req.set_tablet_id(tablet_id);
change_req.set_stream_id(stream_id);
change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);
{
RpcController rpc;
SCOPED_TRACE(change_req.DebugString());
ASSERT_OK(leader_proxy->GetChanges(change_req, &change_resp, &rpc));
change_resp.Clear();
rpc.Reset();
ASSERT_OK(follower_proxy->GetChanges(change_req, &change_resp, &rpc));
}

// Insert test rows, one at a time so they have different hybrid times.
tserver::WriteRequestPB write_req;
tserver::WriteResponsePB write_resp;
write_req.set_tablet_id(tablet_id);
{
RpcController rpc;
AddTestRowInsert(1, 11, "key1", &write_req);
SCOPED_TRACE(write_req.DebugString());
ASSERT_OK(proxy->Write(write_req, &write_resp, &rpc));
SCOPED_TRACE(write_resp.DebugString());
ASSERT_FALSE(write_resp.has_error());
}

{
write_req.Clear();
write_req.set_tablet_id(tablet_id);
RpcController rpc;
AddTestRowInsert(2, 22, "key2", &write_req);
SCOPED_TRACE(write_req.DebugString());
ASSERT_OK(proxy->Write(write_req, &write_resp, &rpc));
SCOPED_TRACE(write_resp.DebugString());
ASSERT_FALSE(write_resp.has_error());
}

// Assert that leader lag > 0.
ASSERT_OK(WaitFor([&]() -> Result<bool> {
auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id, tablet_id});
return metrics->async_replication_sent_lag_micros->value() > 0 &&
metrics->async_replication_committed_lag_micros->value() > 0;
}, MonoDelta::FromSeconds(10), "Wait for Lag > 0"));

{
// Make sure we wait for follower update thread to run at least once.
SleepFor(MonoDelta::FromMilliseconds(FLAGS_update_metrics_interval_ms));
// On the follower, we shouldn't create metrics for tablets that we're not leader for, so these
// should be 0 even if there are un-polled for records.
auto metrics_follower = cdc_service_follower->
GetCDCTabletMetrics({"" /* UUID */, stream_id, tablet_id});
ASSERT_TRUE(metrics_follower->async_replication_sent_lag_micros->value() == 0 &&
metrics_follower->async_replication_committed_lag_micros->value() == 0);
}

change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint());
change_resp.Clear();
{
RpcController rpc;
SCOPED_TRACE(change_req.DebugString());
ASSERT_OK(leader_proxy->GetChanges(change_req, &change_resp, &rpc));
}

// When we GetChanges the first time, only the read lag metric should be 0.
ASSERT_OK(WaitFor([&]() -> Result<bool> {
auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id, tablet_id});
return metrics->async_replication_sent_lag_micros->value() == 0 &&
metrics->async_replication_committed_lag_micros->value() > 0;
}, MonoDelta::FromSeconds(10), "Wait for Read Lag = 0"));

change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint());
change_resp.Clear();
{
RpcController rpc;
SCOPED_TRACE(change_req.DebugString());
ASSERT_OK(leader_proxy->GetChanges(change_req, &change_resp, &rpc));
}

// When we GetChanges the second time, both the lag metrics should be 0.
ASSERT_OK(WaitFor([&]() -> Result<bool> {
auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id, tablet_id});
return metrics->async_replication_sent_lag_micros->value() == 0 &&
metrics->async_replication_committed_lag_micros->value() == 0;
}, MonoDelta::FromSeconds(10), "Wait for All Lag = 0"));
}

class CDCServiceTestMultipleServers : public CDCServiceTest {
public:
virtual int server_count() override { return 2; }
Expand Down

0 comments on commit a5f0657

Please sign in to comment.