Skip to content

Commit

Permalink
Merge branch '1.2-dev' into tn_dedup_metrics_to_1.2
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jun 17, 2024
2 parents 0fea4b6 + 6cfdcaf commit 4a5092c
Show file tree
Hide file tree
Showing 28 changed files with 1,670 additions and 133 deletions.
5 changes: 2 additions & 3 deletions pkg/fileservice/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
)

var (
connectTimeout = time.Second * 3
readWriteTimeout = time.Second * 2
connectTimeout = time.Second * 5
readWriteTimeout = time.Second * 20
maxIdleConns = 100
maxIdleConnsPerHost = 100
maxConnsPerHost = 1000
Expand Down Expand Up @@ -60,7 +60,6 @@ func newHTTPClient(args ObjectStorageArguments) *http.Client {
MaxConnsPerHost: maxConnsPerHost,
TLSHandshakeTimeout: connectTimeout,
ResponseHeaderTimeout: readWriteTimeout,
ExpectContinueTimeout: readWriteTimeout,
ForceAttemptHTTP2: true,
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
"github.com/matrixorigin/matrixone/pkg/sql/util"
"github.com/matrixorigin/matrixone/pkg/util/metric/mometric"
"github.com/matrixorigin/matrixone/pkg/util/sysview"
"github.com/matrixorigin/matrixone/pkg/util/trace"
Expand Down Expand Up @@ -1495,6 +1496,14 @@ var (
}
)

func init() {
tables := make([]string, 0)
for tbl := range predefinedTables {
tables = append(tables, tbl)
}
util.InitPredefinedTables(tables)
}

func getSqlForAccountIdAndStatus(ctx context.Context, accName string, check bool) (string, error) {
if check && accountNameIsInvalid(accName) {
return "", moerr.NewInternalError(ctx, fmt.Sprintf("account name %s is invalid", accName))
Expand Down
25 changes: 9 additions & 16 deletions pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2833,7 +2833,7 @@ func ExecRequest(ses *Session, execCtx *ExecCtx, req *Request) (resp *Response,
case COM_QUIT:
return resp, moerr.GetMysqlClientQuit()
case COM_QUERY:
var query = string(req.GetData().([]byte))
var query = util.UnsafeBytesToString(req.GetData().([]byte))
ses.addSqlCount(1)
ses.Debug(execCtx.reqCtx, "query trace", logutil.QueryField(SubStringFromBegin(query, int(getGlobalPu().SV.LengthOfQueryPrinted))))
err = doComQuery(ses, execCtx, &UserInput{sql: query})
Expand All @@ -2842,7 +2842,7 @@ func ExecRequest(ses *Session, execCtx *ExecCtx, req *Request) (resp *Response,
}
return resp, nil
case COM_INIT_DB:
var dbname = string(req.GetData().([]byte))
var dbname = util.UnsafeBytesToString(req.GetData().([]byte))
ses.addSqlCount(1)
query := "use `" + dbname + "`"
err = doComQuery(ses, execCtx, &UserInput{sql: query})
Expand All @@ -2852,7 +2852,7 @@ func ExecRequest(ses *Session, execCtx *ExecCtx, req *Request) (resp *Response,

return resp, nil
case COM_FIELD_LIST:
var payload = string(req.GetData().([]byte))
var payload = util.UnsafeBytesToString(req.GetData().([]byte))
ses.addSqlCount(1)
query := makeCmdFieldListSql(payload)
err = doComQuery(ses, execCtx, &UserInput{sql: query})
Expand All @@ -2868,7 +2868,7 @@ func ExecRequest(ses *Session, execCtx *ExecCtx, req *Request) (resp *Response,

case COM_STMT_PREPARE:
ses.SetCmd(COM_STMT_PREPARE)
sql = string(req.GetData().([]byte))
sql = util.UnsafeBytesToString(req.GetData().([]byte))
ses.addSqlCount(1)

// rewrite to "Prepare stmt_name from 'xxx'"
Expand All @@ -2885,9 +2885,8 @@ func ExecRequest(ses *Session, execCtx *ExecCtx, req *Request) (resp *Response,

case COM_STMT_EXECUTE:
ses.SetCmd(COM_STMT_EXECUTE)
data := req.GetData().([]byte)
var prepareStmt *PrepareStmt
sql, prepareStmt, err = parseStmtExecute(execCtx.reqCtx, ses, data)
sql, prepareStmt, err = parseStmtExecute(execCtx.reqCtx, ses, req.GetData().([]byte))
if err != nil {
return NewGeneralErrorResponse(COM_STMT_EXECUTE, ses.GetTxnHandler().GetServerStatus(), err), nil
}
Expand All @@ -2905,19 +2904,16 @@ func ExecRequest(ses *Session, execCtx *ExecCtx, req *Request) (resp *Response,

case COM_STMT_SEND_LONG_DATA:
ses.SetCmd(COM_STMT_SEND_LONG_DATA)
data := req.GetData().([]byte)
err = parseStmtSendLongData(execCtx.reqCtx, ses, data)
err = parseStmtSendLongData(execCtx.reqCtx, ses, req.GetData().([]byte))
if err != nil {
resp = NewGeneralErrorResponse(COM_STMT_SEND_LONG_DATA, ses.GetTxnHandler().GetServerStatus(), err)
return resp, nil
}
return nil, nil

case COM_STMT_CLOSE:
data := req.GetData().([]byte)

// rewrite to "deallocate Prepare stmt_name"
stmtID := binary.LittleEndian.Uint32(data[0:4])
stmtID := binary.LittleEndian.Uint32(req.GetData().([]byte)[0:4])
stmtName := getPrepareStmtName(stmtID)
sql = fmt.Sprintf("deallocate prepare %s", stmtName)
ses.Debug(execCtx.reqCtx, "query trace", logutil.QueryField(sql))
Expand All @@ -2929,10 +2925,8 @@ func ExecRequest(ses *Session, execCtx *ExecCtx, req *Request) (resp *Response,
return resp, nil

case COM_STMT_RESET:
data := req.GetData().([]byte)

//Payload of COM_STMT_RESET
stmtID := binary.LittleEndian.Uint32(data[0:4])
stmtID := binary.LittleEndian.Uint32(req.GetData().([]byte)[0:4])
stmtName := getPrepareStmtName(stmtID)
sql = fmt.Sprintf("reset prepare %s", stmtName)
ses.Debug(execCtx.reqCtx, "query trace", logutil.QueryField(sql))
Expand All @@ -2943,8 +2937,7 @@ func ExecRequest(ses *Session, execCtx *ExecCtx, req *Request) (resp *Response,
return resp, nil

case COM_SET_OPTION:
data := req.GetData().([]byte)
err := handleSetOption(ses, execCtx, data)
err = handleSetOption(ses, execCtx, req.GetData().([]byte))
if err != nil {
resp = NewGeneralErrorResponse(COM_SET_OPTION, ses.GetTxnHandler().GetServerStatus(), err)
}
Expand Down
25 changes: 24 additions & 1 deletion pkg/frontend/routine_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ func (rm *RoutineManager) Handler(rs goetty.IOSession, msg interface{}, received

length := packet.Length
payload := packet.Payload
payloads := make([][]byte, 1)
payloads[0] = payload
for uint32(length) == MaxPayloadSize {
msg, err = protocol.Read(goetty.ReadOptions{})
if err != nil {
Expand All @@ -414,10 +416,31 @@ func (rm *RoutineManager) Handler(rs goetty.IOSession, msg interface{}, received

protocol.SetU8(SEQUENCEID, uint8(packet.SequenceID+1))
seq = protocol.GetU8(SEQUENCEID)
payload = append(payload, packet.Payload...)
payloads = append(payloads, packet.Payload)
length = packet.Length
}

//combine these payloads
if len(payloads) > 1 {
tLen := 0
for _, bytes := range payloads {
tLen += len(bytes)
}
alloc := getGlobalSessionAlloc()
var temp []byte
temp, err = alloc.Alloc(tLen)
defer alloc.Free(temp)
if err != nil {
return err
}
offset := 0
for _, bytes := range payloads {
copy(temp[offset:offset+len(bytes)], bytes)
offset += len(bytes)
}
payload = temp
}

// finish handshake process
if !protocol.GetBool(ESTABLISHED) {
tempCtx, tempCancel := context.WithTimeout(ctx, getGlobalPu().SV.SessionTimeout.Duration)
Expand Down
12 changes: 11 additions & 1 deletion pkg/frontend/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ var globalRtMgr atomic.Value
var globalPu atomic.Value
var globalAicm atomic.Value
var moServerStarted atomic.Bool
var globalSessionAlloc atomic.Value

func getGlobalSessionAlloc() *SessionAllocator {
return globalSessionAlloc.Load().(*SessionAllocator)
}

func setGlobalSessionAlloc(s *SessionAllocator) {
globalSessionAlloc.Store(s)
}

func setGlobalRtMgr(rtMgr *RoutineManager) {
globalRtMgr.Store(rtMgr)
Expand Down Expand Up @@ -132,6 +141,7 @@ func NewMOServer(
) *MOServer {
setGlobalPu(pu)
setGlobalAicm(aicm)
setGlobalSessionAlloc(NewSessionAllocator(pu))
codec := NewSqlCodec()
rm, err := NewRoutineManager(ctx)
if err != nil {
Expand Down Expand Up @@ -163,7 +173,7 @@ func NewMOServer(
goetty.WithSessionCodec(codec),
goetty.WithSessionLogger(logutil.GetGlobalLogger()),
goetty.WithSessionRWBUfferSize(DefaultRpcBufferSize, DefaultRpcBufferSize),
goetty.WithSessionAllocator(NewSessionAllocator(pu))),
goetty.WithSessionAllocator(getGlobalSessionAlloc())),
goetty.WithAppSessionAware(rm),
//when the readTimeout expires the goetty will close the tcp connection.
goetty.WithReadTimeout(pu.SV.SessionTimeout.Duration))
Expand Down
28 changes: 28 additions & 0 deletions pkg/sql/colexec/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,10 +873,38 @@ func getBatchData(param *ExternalParam, plh *ParseLineHandler, proc *process.Pro
if param.ClusterTable != nil && param.ClusterTable.GetIsClusterTable() {
//the column account_id of the cluster table do need to be filled here
if len(line)+1 < getRealAttrCnt(param.Attrs, param.Cols) {
logutil.Errorf("load %s failed", param.TblName)
logutil.Errorf("extern values is %s\n", param.CreateSql)
for i, c := range param.Cols {
if !c.Hidden && i < len(line) {
str := fmt.Sprintf("col name is %s and ", c.Name)
if line[i].IsNull {
str += "line value is null\n"
logutil.Error(str)
} else {
str += fmt.Sprintf("line value is %s\n", line[i].Val)
logutil.Error(str)
}
}
}
return nil, moerr.NewInternalError(proc.Ctx, ColumnCntLargerErrorInfo)
}
} else {
if !param.Extern.SysTable && len(line) < getRealAttrCnt(param.Attrs, param.Cols) {
logutil.Errorf("load %s failed", param.TblName)
logutil.Errorf("extern values is %s\n", param.CreateSql)
for i, c := range param.Cols {
if !c.Hidden && i < len(line) {
str := fmt.Sprintf("col name is %s and ", c.Name)
if line[i].IsNull {
str += "line value is null\n"
logutil.Error(str)
} else {
str += fmt.Sprintf("line value is %s\n", line[i].Val)
logutil.Error(str)
}
}
}
return nil, moerr.NewInternalError(proc.Ctx, ColumnCntLargerErrorInfo)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/external/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type ExParamConst struct {
Extern *tree.ExternParam
tableDef *plan.TableDef
ClusterTable *plan.ClusterTable
TblName string // for debug moc3421
}

type ExParam struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/compile/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ func constructExternal(n *plan.Node, param *tree.ExternParam, ctx context.Contex
FileList: fileList,
FileSize: FileSize,
ClusterTable: n.GetClusterTable(),
TblName: n.TableDef.Name,
},
ExParam: external.ExParam{
Fileparam: new(external.ExFileparam),
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/parsers/dialect/mysql/mysql_sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/parsers/dialect/mysql/mysql_sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -1370,7 +1370,7 @@ parallel_opt:
}
strict_opt:
{
$$ = false
$$ = true
}
| STRICT STRING
{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/parsers/dialect/mysql/mysql_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ var (
output: "load data infile /root/lineorder_flat_10.tbl into table lineorder_flat fields terminated by '' optionally enclosed by '' lines terminated by ''",
}, {
input: "load data infile '/root/lineorder_flat_10.tbl' into table lineorder_flat FIELDS TERMINATED BY '' OPTIONALLY ENCLOSED BY '' LINES TERMINATED BY '' parallel 'true';",
output: "load data infile /root/lineorder_flat_10.tbl into table lineorder_flat fields terminated by '' optionally enclosed by '' lines terminated by '' parallel true ",
output: "load data infile /root/lineorder_flat_10.tbl into table lineorder_flat fields terminated by '' optionally enclosed by '' lines terminated by '' parallel true strict true ",
}, {
input: "load data infile '/root/lineorder_flat_10.tbl' into table lineorder_flat FIELDS TERMINATED BY '' OPTIONALLY ENCLOSED BY '' LINES TERMINATED BY '' parallel 'true' strict 'true';",
output: "load data infile /root/lineorder_flat_10.tbl into table lineorder_flat fields terminated by '' optionally enclosed by '' lines terminated by '' parallel true strict true ",
Expand Down
65 changes: 23 additions & 42 deletions pkg/sql/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

var (
//every account have these tables.
predefinedTables []string
specialTables = map[string]struct{}{
catalog.MO_DATABASE: {},
catalog.MO_TABLES: {},
catalog.MO_COLUMNS: {},
}
)

func InitPredefinedTables(tables []string) {
predefinedTables = tables
}

func CopyBatch(bat *batch.Batch, proc *process.Process) (*batch.Batch, error) {
rbat := batch.NewWithSize(len(bat.Vecs))
rbat.Attrs = append(rbat.Attrs, bat.Attrs...)
Expand Down Expand Up @@ -200,48 +214,15 @@ func BuildMoColumnsFilter(curAccountId uint64) tree.Expr {
// datname in ('mo_catalog')
inExpr := tree.NewComparisonExpr(tree.IN, att_dblnameColName, inValues)

mo_userConst := tree.NewNumValWithType(constant.MakeString("mo_user"), "mo_user", false, tree.P_char)
mo_roleConst := tree.NewNumValWithType(constant.MakeString("mo_role"), "mo_role", false, tree.P_char)
mo_user_grantConst := tree.NewNumValWithType(constant.MakeString("mo_user_grant"), "mo_user_grant", false, tree.P_char)
mo_role_grantConst := tree.NewNumValWithType(constant.MakeString("mo_role_grant"), "mo_role_grant", false, tree.P_char)
mo_role_privsConst := tree.NewNumValWithType(constant.MakeString("mo_role_privs"), "mo_role_privs", false, tree.P_char)
mo_user_defined_functionConst := tree.NewNumValWithType(constant.MakeString("mo_user_defined_function"), "mo_user_defined_function", false, tree.P_char)
mo_mysql_compatibility_modeConst := tree.NewNumValWithType(constant.MakeString("mo_mysql_compatibility_mode"), "mo_mysql_compatibility_mode", false, tree.P_char)
mo_indexes := tree.NewNumValWithType(constant.MakeString("mo_indexes"), "mo_indexes", false, tree.P_char)
mo_table_partitions := tree.NewNumValWithType(constant.MakeString("mo_table_partitions"), "mo_table_partitions", false, tree.P_char)
mo_pubs := tree.NewNumValWithType(constant.MakeString("mo_pubs"), "mo_pubs", false, tree.P_char)
mo_stored_procedure := tree.NewNumValWithType(constant.MakeString("mo_stored_procedure"), "mo_stored_procedure", false, tree.P_char)
mo_stages := tree.NewNumValWithType(constant.MakeString("mo_stages"), "mo_stages", false, tree.P_char)
mo_snapshots := tree.NewNumValWithType(constant.MakeString("mo_snapshots"), "mo_snapshots", false, tree.P_char)

mo_locks := tree.NewNumValWithType(constant.MakeString("mo_locks"), "mo_locks", false, tree.P_char)
mo_variables := tree.NewNumValWithType(constant.MakeString("mo_variables"), "mo_variables", false, tree.P_char)
mo_transactions := tree.NewNumValWithType(constant.MakeString("mo_transactions"), "mo_transactions", false, tree.P_char)
mo_cache := tree.NewNumValWithType(constant.MakeString("mo_cache"), "mo_cache", false, tree.P_char)
mo_sessions := tree.NewNumValWithType(constant.MakeString("mo_sessions"), "mo_sessions", false, tree.P_char)
mo_configurations := tree.NewNumValWithType(constant.MakeString("mo_configurations"), "mo_configurations", false, tree.P_char)

notInValues := tree.NewTuple(tree.Exprs{
mo_userConst,
mo_roleConst,
mo_user_grantConst,
mo_role_grantConst,
mo_role_privsConst,
mo_user_defined_functionConst,
mo_mysql_compatibility_modeConst,
mo_indexes,
mo_table_partitions,
mo_pubs,
mo_stored_procedure,
mo_stages,
mo_snapshots,
mo_locks,
mo_variables,
mo_transactions,
mo_cache,
mo_sessions,
mo_configurations,
})
exprs := make([]tree.Expr, 0)
for _, table := range predefinedTables {
if _, ok := specialTables[table]; ok {
continue
}
exprs = append(exprs, tree.NewNumValWithType(constant.MakeString(table), table, false, tree.P_char))
}

notInValues := tree.NewTuple(exprs)

notInexpr := tree.NewComparisonExpr(tree.NOT_IN, att_relnameColName, notInValues)

Expand Down
Loading

0 comments on commit 4a5092c

Please sign in to comment.