Skip to content

Commit

Permalink
04_19 fix binlog & add date, hour (#223)
Browse files Browse the repository at this point in the history
* 04_19 fix binlog & add date, hour

* 04_19 fix binlog & add date, hour

* 04_19 fix binlog & add date, hour

* fix truncate
  • Loading branch information
lgqss authored Jul 19, 2023
1 parent b5ff95b commit 84ef9c2
Show file tree
Hide file tree
Showing 14 changed files with 163 additions and 59 deletions.
9 changes: 0 additions & 9 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -822,12 +822,3 @@ baikaldb_proto_library(
include = "proto",
visibility = ["//visibility:public"],
)

cc_binary(
name = "test_date_time",
srcs = ["test/test_date_time.cpp"],
copts = ["-Iexternal/gtest/include"],
deps = [
":common",
],
)
2 changes: 1 addition & 1 deletion include/common/datetime.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ extern time_t str_to_timestamp(const char* str_time);
// encode DATETIME to string format
// ref: https://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html
extern std::string datetime_to_str(uint64_t datetime);
extern uint64_t str_to_datetime(const char* str_time);
extern uint64_t str_to_datetime(const char* str_time, bool* is_full_datetime = nullptr);

extern time_t datetime_to_timestamp(uint64_t datetime);
extern uint64_t timestamp_to_datetime(time_t timestamp);
Expand Down
7 changes: 2 additions & 5 deletions include/exec/fetcher_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -536,10 +536,8 @@ class FetcherStore {
if (client_conn->need_send_binlog()) {
return true;
}
} else if (op_type == pb::OP_ROLLBACK) {
if (state->open_binlog() && binlog_prepare_success) {
return true;
}
} else if (op_type == pb::OP_ROLLBACK && state->open_binlog()) {
return true;
}
return false;
}
Expand Down Expand Up @@ -577,7 +575,6 @@ class FetcherStore {
bool is_cancelled = false;
BthreadCond binlog_cond;
NetworkSocket* client_conn = nullptr;
bool binlog_prepare_success = false;
std::atomic<bool> primary_timestamp_updated{false};
std::set<int64_t> no_copy_cache_plan_set;
int64_t dynamic_timeout_ms = -1;
Expand Down
2 changes: 2 additions & 0 deletions include/expr/internal_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ ExprValue curtime(const std::vector<ExprValue>& input);
ExprValue current_time(const std::vector<ExprValue>& input);
ExprValue current_timestamp(const std::vector<ExprValue>& input);
ExprValue timestamp(const std::vector<ExprValue>& input);
ExprValue date(const std::vector<ExprValue>& input);
ExprValue hour(const std::vector<ExprValue>& input);
ExprValue day(const std::vector<ExprValue>& input);
ExprValue dayname(const std::vector<ExprValue>& input);
ExprValue dayofweek(const std::vector<ExprValue>& input);
Expand Down
2 changes: 1 addition & 1 deletion include/sqlparser/sql_parse.y
Original file line number Diff line number Diff line change
Expand Up @@ -2035,7 +2035,7 @@ FunctionaNameCurdate:
;

FunctionaNameDateRelate:
DAY | MONTH | YEAR
HOUR | DAY | MONTH | YEAR | DATE
;

FunctionCallNonKeyword:
Expand Down
22 changes: 21 additions & 1 deletion src/common/datetime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,12 @@ std::string datetime_to_str(uint64_t datetime) {
return std::string(buf);
}

uint64_t str_to_datetime(const char* str_time) {
uint64_t str_to_datetime(const char* str_time, bool* is_full_datetime) {
//[YY]YY-MM-DD HH:MM:SS.xxxxxx
//[YY]YYMMDDHHMMSS.xxxxxx

bool is_full = false;

while (*str_time == ' ') {
str_time++;
}
Expand Down Expand Up @@ -112,6 +115,7 @@ uint64_t str_to_datetime(const char* str_time) {
sscanf(buf, "%4lu%*[^0-9a-z]%2lu%*[^0-9a-z]%2lu"
"%*[^0-9a-z]%2lu%*[^0-9a-z]%2lu%*[^0-9a-z]%2lu.%6lu",
&year, &month, &day, &hour, &minute, &second, &macrosec);
is_full = true;
} else {
if (idx <= 6) {
sscanf(buf, "%2lu%2lu%2lu", &year, &month, &day);
Expand All @@ -120,11 +124,14 @@ uint64_t str_to_datetime(const char* str_time) {
} else if (idx == 12) {
sscanf(buf, "%2lu%2lu%2lu%2lu%2lu%2lu.%6lu",
&year, &month, &day, &hour, &minute, &second, &macrosec);
is_full = true;
} else if (idx <= 13) {
sscanf(buf, "%2lu%2lu%2lu%2lu%2lu%2lu", &year, &month, &day, &hour, &minute, &second);
is_full = true;
} else if (idx >= 14) {
sscanf(buf, "%4lu%2lu%2lu%2lu%2lu%2lu.%6lu",
&year, &month, &day, &hour, &minute, &second, &macrosec);
is_full = true;
} else {
return 0;
}
Expand All @@ -144,6 +151,10 @@ uint64_t str_to_datetime(const char* str_time) {
return 0;
}

if (is_full_datetime != nullptr) {
*is_full_datetime = is_full;
}

//datetime中间计算时会转化成int64, 最高位必须为0
uint64_t datetime = 0;
uint64_t year_month = year * 13 + month;
Expand Down Expand Up @@ -363,6 +374,15 @@ int32_t str_to_time(const char* str_time) {
}
}

// 先判断是否是完整的datetime类型字符串, 12为YYMMDDHHMMSS
if (idx >= 12) {
bool is_full_datetime = false;
uint64_t datetime = str_to_datetime(str_time, &is_full_datetime);
if (is_full_datetime) {
return datetime_to_time(datetime);
}
}

if (has_blank) {
sscanf(str_time, "%d %u:%2u:%2u",
&day, &hour, &minute, &second);
Expand Down
3 changes: 2 additions & 1 deletion src/engine/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1509,8 +1509,10 @@ int Transaction::remove_columns(const TableKey& primary_key) {

static TimeCost print_lock_last_time;
void Transaction::print_txninfo_holding_lock(const std::string& key) {
return; //pthread会卡bthread,基本不用这个追case了
//内部有pthread锁
if (print_lock_last_time.get_time() > 10 * 1000 * 1000) {
print_lock_last_time.reset();
auto lock_info = _db->get_db()->GetLockStatusData();
for (auto& it : lock_info) {
if (it.second.key.size() == key.size() && it.second.key == key) {
Expand All @@ -1521,7 +1523,6 @@ void Transaction::print_txninfo_holding_lock(const std::string& key) {
break;
}
}
print_lock_last_time.reset();
}
}
} //nanespace baikaldb
65 changes: 32 additions & 33 deletions src/exec/fetcher_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace baikaldb {
DEFINE_int64(retry_interval_us, 500 * 1000, "retry interval ");
DEFINE_int32(single_store_concurrency, 20, "max request for one store");
DEFINE_int64(max_select_rows, 10000000, "query will be fail when select too much rows");
DEFINE_int64(max_affected_rows, 10000000, "query will be fail when select too much rows");
DEFINE_int64(max_affected_rows, 10000000, "query will be fail when affect too much rows");
DEFINE_int64(print_time_us, 10000, "print log when time_cost > print_time_us(us)");
DEFINE_int64(baikaldb_alive_time_s, 10 * 60, "obervation time length in baikaldb, default:10 min");
BRPC_VALIDATE_GFLAG(print_time_us, brpc::NonNegativeInteger);
Expand Down Expand Up @@ -828,7 +828,8 @@ ErrorType OnRPCDone::handle_response(const std::string& remote_side) {
if (_op_type != pb::OP_SELECT && _op_type != pb::OP_SELECT_FOR_UPDATE && _op_type != pb::OP_ROLLBACK) {
_fetcher_store->affected_rows += _response.affected_rows();
_client_conn->txn_affected_rows += _response.affected_rows();
if (_client_conn->txn_affected_rows > FLAGS_max_affected_rows) {
// 事务限制affected_rows,非事务限制会导致部分成功
if (_client_conn->txn_affected_rows > FLAGS_max_affected_rows && _state->txn_id != 0) {
DB_DONE(FATAL, "_affected_row:%ld > %ld FLAGS_max_affected_rows",
_client_conn->txn_affected_rows.load(), FLAGS_max_affected_rows);
return E_BIG_SQL;
Expand Down Expand Up @@ -1025,39 +1026,37 @@ ErrorType FetcherStore::process_binlog_start(RuntimeState* state, pb::OpType op_
if (need_process_binlog(state, op_type)) {
auto binlog_ctx = client_conn->get_binlog_ctx();
uint64_t log_id = state->log_id();
if (op_type == pb::OP_PREPARE || binlog_prepare_success) {
binlog_cond.increase();
auto write_binlog_func = [this, state, binlog_ctx, op_type, log_id]() {
ON_SCOPE_EXIT([this]() {
binlog_cond.decrease_signal();
});
if (op_type == pb::OP_PREPARE) {
int64_t timestamp = TsoFetcher::get_instance()->get_tso(binlog_ctx->tso_count());
if (timestamp < 0) {
DB_WARNING("get tso failed log_id: %lu txn_id:%lu op_type:%s", log_id, state->txn_id,
pb::OpType_Name(op_type).c_str());
error = E_FATAL;
return;
}
write_binlog_param.txn_id = state->txn_id;
write_binlog_param.log_id = log_id;
write_binlog_param.primary_region_id = client_conn->primary_region_id;
write_binlog_param.global_conn_id = client_conn->get_global_conn_id();
write_binlog_param.username = client_conn->user_info->username;
write_binlog_param.ip = client_conn->ip;
write_binlog_param.client_conn = client_conn;
write_binlog_param.fetcher_store = this;
binlog_ctx->set_start_ts(timestamp);
}
write_binlog_param.op_type = op_type;
auto ret = binlog_ctx->write_binlog(&write_binlog_param);
if (ret != E_OK) {
binlog_cond.increase();
auto write_binlog_func = [this, state, binlog_ctx, op_type, log_id]() {
ON_SCOPE_EXIT([this]() {
binlog_cond.decrease_signal();
});
if (op_type == pb::OP_PREPARE) {
int64_t timestamp = TsoFetcher::get_instance()->get_tso(binlog_ctx->tso_count());
if (timestamp < 0) {
DB_WARNING("get tso failed log_id: %lu txn_id:%lu op_type:%s", log_id, state->txn_id,
pb::OpType_Name(op_type).c_str());
error = E_FATAL;
return;
}
};
Bthread bth(&BTHREAD_ATTR_SMALL);
bth.run(write_binlog_func);
}
write_binlog_param.txn_id = state->txn_id;
write_binlog_param.log_id = log_id;
write_binlog_param.primary_region_id = client_conn->primary_region_id;
write_binlog_param.global_conn_id = client_conn->get_global_conn_id();
write_binlog_param.username = client_conn->user_info->username;
write_binlog_param.ip = client_conn->ip;
write_binlog_param.client_conn = client_conn;
write_binlog_param.fetcher_store = this;
binlog_ctx->set_start_ts(timestamp);
}
write_binlog_param.op_type = op_type;
auto ret = binlog_ctx->write_binlog(&write_binlog_param);
if (ret != E_OK) {
error = E_FATAL;
}
};
Bthread bth(&BTHREAD_ATTR_SMALL);
bth.run(write_binlog_func);
return E_OK;
}
return E_OK;
Expand Down
2 changes: 2 additions & 0 deletions src/expr/fn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ void FunctionManager::register_operators() {
register_object_ret("current_time", current_time, pb::TIME);
register_object_ret("current_timestamp", current_timestamp, pb::TIMESTAMP);
register_object_ret("timestamp", timestamp, pb::TIMESTAMP);
register_object_ret("date", date, pb::DATE);
register_object_ret("hour", hour, pb::UINT32);
register_object_ret("day", day, pb::UINT32);
register_object_ret("dayname", dayname, pb::STRING);
register_object_ret("dayofweek", dayofweek, pb::UINT32);
Expand Down
27 changes: 27 additions & 0 deletions src/expr/internal_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,33 @@ ExprValue curtime(const std::vector<ExprValue>& input) {
ExprValue current_time(const std::vector<ExprValue>& input) {
return curtime(input);
}
ExprValue date(const std::vector<ExprValue>& input) {
if (input.size() == 0 || input[0].is_null()) {
return ExprValue::Null();
}
ExprValue in = input[0];
if (in.type == pb::INT64) {
in.cast_to(pb::STRING);
}
ExprValue tmp(pb::DATE);
uint64_t dt = in.cast_to(pb::DATETIME)._u.uint64_val;
tmp._u.uint32_val = datetime_to_date(dt);
return tmp;
}
ExprValue hour(const std::vector<ExprValue>& input) {
// Mysql最大值为838,BaikalDB最大值为1023
if (input.size() == 0 || input[0].is_null()) {
return ExprValue::Null();
}
ExprValue in = input[0];
time_t t = in.cast_to(pb::TIME)._u.int32_val;
if (t < 0) {
t = -t;
}
ExprValue tmp(pb::UINT32);
tmp._u.uint32_val = (t >> 12) & 0x3FF;
return tmp;
}
ExprValue day(const std::vector<ExprValue>& input) {
if (input.size() == 0 || input[0].is_null()) {
return ExprValue::Null();
Expand Down
5 changes: 3 additions & 2 deletions src/session/binlog_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ int BinlogContext::send_binlog_data(const WriteBinlogParam* param, const SmartPa
req.set_region_id(region_id);
req.set_region_version(info.version());
int retry_times = 0;
bool binlog_prepare_success = false;
do {
brpc::Channel channel;
brpc::Controller cntl;
Expand Down Expand Up @@ -603,7 +604,7 @@ int BinlogContext::send_binlog_data(const WriteBinlogParam* param, const SmartPa
return -1;
} else {
// success
param->fetcher_store->binlog_prepare_success = true;
binlog_prepare_success = true;
break;
}
} while (retry_times < 5);
Expand All @@ -612,7 +613,7 @@ int BinlogContext::send_binlog_data(const WriteBinlogParam* param, const SmartPa
DB_WARNING("write binlog region_id:%ld log_id:%lu txn_id:%ld cost time:%ld op_type:%s ip:%s",
region_id, param->log_id, param->txn_id, query_cost, pb::OpType_Name(param->op_type).c_str(), info.leader().c_str());
}
if (param->fetcher_store->binlog_prepare_success) {
if (binlog_prepare_success) {
if (param->op_type == pb::OP_PREPARE) {
partition_binlog_ptr->binlog_prewrite_time.reset();
} else if (param->op_type == pb::OP_COMMIT) {
Expand Down
5 changes: 3 additions & 2 deletions src/store/region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ void Region::exec_in_txn_query(google::protobuf::RpcController* controller,
remote_side, _region_id, txn_id, pb::OpType_Name(op_type).c_str(), last_seq, seq_id, log_id);
// OP_SELECT_FOR_UPDATE强制KV模式保证一致
// 不走幂等,重新执行一遍,这样db那边能拿到数据
if (op_type != pb::OP_SELECT_FOR_UPDATE) {
if (op_type != pb::OP_SELECT_FOR_UPDATE && op_type != pb::OP_SELECT) {
txn->load_last_response(*response);
response->set_affected_rows(txn->dml_num_affected_rows);
response->set_errcode(txn->err_code);
Expand Down Expand Up @@ -2827,7 +2827,8 @@ int Region::select(const pb::StoreReq& request,
}
txn->push_cmd_to_cache(seq_id, plan_item);
//DB_WARNING("put txn cmd to cache: region_id: %ld, txn_id: %lu:%d", _region_id, txn_info.txn_id(), seq_id);
txn->save_last_response(response);
// OP_SELECT_FOR_UPDATE幂等重新执行一次,不保存response
//txn->save_last_response(response);
}

//DB_NOTICE("select rows:%d", rows);
Expand Down
12 changes: 9 additions & 3 deletions src/store/region_binlog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,9 +686,15 @@ int Region::binlog_update_map_when_apply(const std::map<std::string, ExprValue>&

auto iter = _binlog_param.ts_binlog_map.find(start_ts);
if (iter == _binlog_param.ts_binlog_map.end()) {
DB_FATAL("region_id: %ld, type: %s, txn_id: %ld, commit_ts: %ld, %s, start_ts: %ld, %s can not find in map, remote_side: %s",
_region_id, binlog_type_name(type), txn_id, ts, ts_to_datetime_str(ts).c_str(), start_ts, ts_to_datetime_str(start_ts).c_str(),
remote_side.c_str());
if (type == COMMIT_BINLOG) {
DB_FATAL("region_id: %ld, type: %s, txn_id: %ld, commit_ts: %ld, %s, start_ts: %ld, %s can not find in map, remote_side: %s",
_region_id, binlog_type_name(type), txn_id, ts, ts_to_datetime_str(ts).c_str(), start_ts, ts_to_datetime_str(start_ts).c_str(),
remote_side.c_str());
} else {
DB_WARNING("region_id: %ld, type: %s, txn_id: %ld, commit_ts: %ld, %s, start_ts: %ld, %s can not find in map, remote_side: %s",
_region_id, binlog_type_name(type), txn_id, ts, ts_to_datetime_str(ts).c_str(), start_ts, ts_to_datetime_str(start_ts).c_str(),
remote_side.c_str());
}
return 0;
} else {
bool repeated_commit = false;
Expand Down
Loading

0 comments on commit 84ef9c2

Please sign in to comment.