Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save complie to prepare in session #17088

Merged
6 changes: 6 additions & 0 deletions pkg/common/moerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ const (
ErrDuplicateKeyName uint16 = 20470
ErrFKNoReferencedRow2 uint16 = 20471
ErrBlobCantHaveDefault uint16 = 20472
ErrCantCompileForPrepare uint16 = 20473

// Group 5: rpc timeout
// ErrRPCTimeout rpc timeout
Expand Down Expand Up @@ -450,6 +451,7 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{
ErrRetryForCNRollingRestart: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "retry for CN rolling restart"},
ErrNewTxnInCNRollingRestart: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "new txn in CN rolling restart"},
ErrPrevCheckpointNotFinished: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "prev checkpoint not finished"},
ErrCantCompileForPrepare: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "can not compile for prepare"},

// Group 7: lock service
ErrDeadLockDetected: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "deadlock detected"},
Expand Down Expand Up @@ -1403,6 +1405,10 @@ func NewErrBlobCantHaveDefault(ctx context.Context, arg any) *Error {
return newError(ctx, ErrBlobCantHaveDefault, arg)
}

func NewCantCompileForPrepare(ctx context.Context) *Error {
return newError(ctx, ErrCantCompileForPrepare)
}

var contextFunc atomic.Value

func SetContextFunc(f func() context.Context) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/moerr/error_no_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,7 @@ func NewReplicaNotFound(replica string) *Error {
func NewReplicaNotMatch(current, received string) *Error {
return newError(Context(), ErrReplicaNotMatch, current, received)
}

func NewCantCompileForPrepareNoCtx() *Error {
return newError(Context(), ErrCantCompileForPrepare)
}
2 changes: 1 addition & 1 deletion pkg/frontend/compiler_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (tcc *TxnCompilerContext) SetSnapshot(snapshot *plan2.Snapshot) {
}

func (tcc *TxnCompilerContext) ReplacePlan(execPlan *plan.Execute) (*plan.Plan, tree.Statement, error) {
p, st, _, err := replacePlan(tcc.execCtx.reqCtx, tcc.execCtx.ses.(*Session), tcc.tcw.(*TxnComputationWrapper), execPlan)
_, p, st, _, err := replacePlan(tcc.execCtx.reqCtx, tcc.execCtx.ses.(*Session), tcc.tcw.(*TxnComputationWrapper), execPlan)
return p, st, err
}

Expand Down
212 changes: 123 additions & 89 deletions pkg/frontend/computation_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (cwft *TxnComputationWrapper) GetColumns(ctx context.Context) ([]interface{
return columns, err
}

func (cwft *TxnComputationWrapper) GetClock() clock.Clock {
func GetClock() clock.Clock {
rt := runtime.ProcessLevelRuntime()
return rt.Clock()
}
Expand Down Expand Up @@ -222,7 +222,7 @@ func (cwft *TxnComputationWrapper) Compile(any any, fill func(*batch.Batch) erro

if _, ok := cwft.stmt.(*tree.Execute); ok {
executePlan := cwft.plan.GetDcl().GetExecute()
plan, stmt, sql, err := replacePlan(execCtx.reqCtx, cwft.ses.(*Session), cwft, executePlan)
retComp, plan, stmt, sql, err := replacePlan(execCtx.reqCtx, cwft.ses.(*Session), cwft, executePlan)
if err != nil {
return nil, err
}
Expand All @@ -243,94 +243,33 @@ func (cwft *TxnComputationWrapper) Compile(any any, fill func(*batch.Batch) erro
return nil, nil
}

if retComp == nil {
cwft.compile, err = createCompile(execCtx, cwft.ses, cwft.proc, cwft.ses.GetSql(), cwft.stmt, cwft.plan, fill, false)
if err != nil {
return nil, err
}
cwft.compile.SetOriginSQL(originSQL)
} else {
// retComp
cwft.proc.Ctx = execCtx.reqCtx
retComp.Reset(cwft.proc, getStatementStartAt(execCtx.reqCtx), fill, cwft.ses.GetSql())
cwft.compile = retComp
}

//check privilege
/* prepare not need check privilege
err = authenticateUserCanExecutePrepareOrExecute(requestCtx, cwft.ses, prepareStmt.PrepareStmt, newPlan)
if err != nil {
return nil, err
}
*/
}

addr := ""
if len(getGlobalPu().ClusterNodes) > 0 {
addr = getGlobalPu().ClusterNodes[0].Addr
}
cwft.proc.Ctx = execCtx.reqCtx
cwft.proc.FileService = getGlobalPu().FileService

var tenant string
tInfo := cwft.ses.GetTenantInfo()
if tInfo != nil {
tenant = tInfo.GetTenant()
}

stats := statistic.StatsInfoFromContext(execCtx.reqCtx)
stats.CompileStart()
defer stats.CompileEnd()
cwft.compile = compile.NewCompile(
addr,
cwft.ses.GetDatabaseName(),
cwft.ses.GetSql(),
tenant,
cwft.ses.GetUserName(),
execCtx.reqCtx,
cwft.ses.GetTxnHandler().GetStorage(),
cwft.proc,
cwft.stmt,
cwft.ses.GetIsInternal(),
deepcopy.Copy(cwft.ses.getCNLabels()).(map[string]string),
getStatementStartAt(execCtx.reqCtx),
)
defer func() {
if err != nil {
cwft.compile.Release()
}
}()
cwft.compile.SetBuildPlanFunc(func() (*plan2.Plan, error) {
plan, err := buildPlan(execCtx.reqCtx, cwft.ses, cwft.ses.GetTxnCompileCtx(), cwft.stmt)
} else {
cwft.compile, err = createCompile(execCtx, cwft.ses, cwft.proc, execCtx.sqlOfStmt, cwft.stmt, cwft.plan, fill, false)
if err != nil {
return nil, err
}
if plan.IsPrepare {
_, _, err = plan2.ResetPreparePlan(cwft.ses.GetTxnCompileCtx(), plan)
}
return plan, err
})

if _, ok := cwft.stmt.(*tree.ExplainAnalyze); ok {
fill = func(bat *batch.Batch) error { return nil }
}
err = cwft.compile.Compile(execCtx.reqCtx, cwft.plan, fill)
if err != nil {
return nil, err
}
// check if it is necessary to initialize the temporary engine
if !cwft.ses.GetTxnHandler().HasTempEngine() && cwft.compile.NeedInitTempEngine() {
// 0. init memory-non-dist storage
err = cwft.ses.GetTxnHandler().CreateTempStorage(cwft.GetClock())
if err != nil {
return nil, err
}

// temporary storage is passed through Ctx
updateTempStorageInCtx(execCtx, cwft.proc, cwft.ses.GetTxnHandler().GetTempStorage())

// 1. init memory-non-dist engine
cwft.ses.GetTxnHandler().CreateTempEngine()
tempEngine := cwft.ses.GetTxnHandler().GetTempEngine()

// 2. bind the temporary engine to the session and txnHandler
cwft.compile.SetTempEngine(tempEngine, cwft.ses.GetTxnHandler().GetTempStorage())

// 3. init temp-db to store temporary relations
txnOp2 := cwft.ses.GetTxnHandler().GetTxn()
err = tempEngine.Create(execCtx.reqCtx, defines.TEMPORARY_DBNAME, txnOp2)
if err != nil {
return nil, err
}
}
cwft.compile.SetOriginSQL(originSQL)
return cwft.compile, err
}

Expand Down Expand Up @@ -387,12 +326,12 @@ func getStatementStartAt(ctx context.Context) time.Time {

// replacePlan replaces the plan of the EXECUTE by the plan generated by
// the PREPARE and setups the params for the plan.
func replacePlan(reqCtx context.Context, ses *Session, cwft *TxnComputationWrapper, execPlan *plan.Execute) (*plan.Plan, tree.Statement, string, error) {
func replacePlan(reqCtx context.Context, ses *Session, cwft *TxnComputationWrapper, execPlan *plan.Execute) (*compile.Compile, *plan.Plan, tree.Statement, string, error) {
originSQL := ""
stmtName := execPlan.GetName()
prepareStmt, err := ses.GetPrepareStmt(reqCtx, stmtName)
if err != nil {
return nil, nil, originSQL, err
return nil, nil, nil, originSQL, err
}
if txnTrace.GetService().Enabled(txnTrace.FeatureTraceTxn) {
originSQL = tree.String(prepareStmt.PrepareStmt, dialect.MYSQL)
Expand All @@ -403,10 +342,10 @@ func replacePlan(reqCtx context.Context, ses *Session, cwft *TxnComputationWrapp
for _, obj := range preparePlan.GetSchemas() {
newObj, newTableDef := ses.txnCompileCtx.Resolve(obj.SchemaName, obj.ObjName, plan2.Snapshot{TS: &timestamp.Timestamp{}})
if newObj == nil {
return nil, nil, originSQL, moerr.NewInternalError(reqCtx, "table '%s' in prepare statement '%s' does not exist anymore", obj.ObjName, stmtName)
return nil, nil, nil, originSQL, moerr.NewInternalError(reqCtx, "table '%s' in prepare statement '%s' does not exist anymore", obj.ObjName, stmtName)
}
if newObj.Obj != obj.Obj || newTableDef.Version != uint32(obj.Server) {
return nil, nil, originSQL, moerr.NewInternalError(reqCtx, "table '%s' has been changed, please reset prepare statement '%s'", obj.ObjName, stmtName)
return nil, nil, nil, originSQL, moerr.NewInternalError(reqCtx, "table '%s' has been changed, please reset prepare statement '%s'", obj.ObjName, stmtName)
}
}

Expand All @@ -420,36 +359,131 @@ func replacePlan(reqCtx context.Context, ses *Session, cwft *TxnComputationWrapp
numParams := len(preparePlan.ParamTypes)
if prepareStmt.params != nil && prepareStmt.params.Length() > 0 { // use binary protocol
if prepareStmt.params.Length() != numParams {
return nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
return nil, nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
}
cwft.proc.SetPrepareParams(prepareStmt.params)
} else if len(execPlan.Args) > 0 {
if len(execPlan.Args) != numParams {
return nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
return nil, nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
}
params := cwft.proc.GetVector(types.T_text.ToType())
paramVals := make([]any, numParams)
for i, arg := range execPlan.Args {
exprImpl := arg.Expr.(*plan.Expr_V)
param, err := cwft.proc.GetResolveVariableFunc()(exprImpl.V.Name, exprImpl.V.System, exprImpl.V.Global)
if err != nil {
return nil, nil, originSQL, err
return nil, nil, nil, originSQL, err
}
if param == nil {
return nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
return nil, nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
}
err = util.AppendAnyToStringVector(cwft.proc, param, params)
if err != nil {
return nil, nil, originSQL, err
return nil, nil, nil, originSQL, err
}
paramVals[i] = param
}
cwft.proc.SetPrepareParams(params)
cwft.paramVals = paramVals
} else {
if numParams > 0 {
return nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
return nil, nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
}
}
return prepareStmt.compile, preparePlan.Plan, prepareStmt.PrepareStmt, originSQL, nil
}

func createCompile(
execCtx *ExecCtx,
ses FeSession,
proc *process.Process,
originSQL string,
stmt tree.Statement,
plan *plan2.Plan,
fill func(*batch.Batch) error,
isPrepare bool,
) (retCompile *compile.Compile, err error) {

addr := ""
if len(getGlobalPu().ClusterNodes) > 0 {
addr = getGlobalPu().ClusterNodes[0].Addr
}
proc.Ctx = execCtx.reqCtx
proc.FileService = getGlobalPu().FileService

var tenant string
tInfo := ses.GetTenantInfo()
if tInfo != nil {
tenant = tInfo.GetTenant()
}

stats := statistic.StatsInfoFromContext(execCtx.reqCtx)
stats.CompileStart()
defer stats.CompileEnd()
defer func() {
if err != nil && retCompile != nil {
retCompile.Release()
retCompile = nil
}
}()
retCompile = compile.NewCompile(
addr,
ses.GetDatabaseName(),
ses.GetSql(),
tenant,
ses.GetUserName(),
execCtx.reqCtx,
ses.GetTxnHandler().GetStorage(),
proc,
stmt,
ses.GetIsInternal(),
deepcopy.Copy(ses.getCNLabels()).(map[string]string),
getStatementStartAt(execCtx.reqCtx),
)
retCompile.SetIsPrepare(isPrepare)
retCompile.SetBuildPlanFunc(func() (*plan2.Plan, error) {
plan, err := buildPlan(execCtx.reqCtx, ses, ses.GetTxnCompileCtx(), stmt)
if err != nil {
return nil, err
}
if plan.IsPrepare {
_, _, err = plan2.ResetPreparePlan(ses.GetTxnCompileCtx(), plan)
}
return plan, err
})

if _, ok := stmt.(*tree.ExplainAnalyze); ok {
fill = func(bat *batch.Batch) error { return nil }
}
err = retCompile.Compile(execCtx.reqCtx, plan, fill)
if err != nil {
return
}
// check if it is necessary to initialize the temporary engine
if !ses.GetTxnHandler().HasTempEngine() && retCompile.NeedInitTempEngine() {
// 0. init memory-non-dist storage
err = ses.GetTxnHandler().CreateTempStorage(GetClock())
if err != nil {
return
}

// temporary storage is passed through Ctx
updateTempStorageInCtx(execCtx, proc, ses.GetTxnHandler().GetTempStorage())

// 1. init memory-non-dist engine
ses.GetTxnHandler().CreateTempEngine()
tempEngine := ses.GetTxnHandler().GetTempEngine()

// 2. bind the temporary engine to the session and txnHandler
retCompile.SetTempEngine(tempEngine, ses.GetTxnHandler().GetTempStorage())

// 3. init temp-db to store temporary relations
txnOp2 := ses.GetTxnHandler().GetTxn()
err = tempEngine.Create(execCtx.reqCtx, defines.TEMPORARY_DBNAME, txnOp2)
if err != nil {
return
}
}
return preparePlan.Plan, prepareStmt.PrepareStmt, originSQL, nil
retCompile.SetOriginSQL(originSQL)
return
}
Loading
Loading