Skip to content

Commit

Permalink
Save complie to prepare in session (#17088)
Browse files Browse the repository at this point in the history
Save complie to prepare in session ( but not used to run for now.  will used it next PR)

Approved by: @daviszhen, @m-schen, @heni02, @zhangxu19830126, @aunjgr
  • Loading branch information
ouyuanning authored Jun 24, 2024
1 parent 9f1c2df commit 4bb2193
Show file tree
Hide file tree
Showing 15 changed files with 286 additions and 122 deletions.
6 changes: 6 additions & 0 deletions pkg/common/moerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ const (
ErrDuplicateKeyName uint16 = 20470
ErrFKNoReferencedRow2 uint16 = 20471
ErrBlobCantHaveDefault uint16 = 20472
ErrCantCompileForPrepare uint16 = 20473

// Group 5: rpc timeout
// ErrRPCTimeout rpc timeout
Expand Down Expand Up @@ -450,6 +451,7 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{
ErrRetryForCNRollingRestart: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "retry for CN rolling restart"},
ErrNewTxnInCNRollingRestart: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "new txn in CN rolling restart"},
ErrPrevCheckpointNotFinished: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "prev checkpoint not finished"},
ErrCantCompileForPrepare: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "can not compile for prepare"},

// Group 7: lock service
ErrDeadLockDetected: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "deadlock detected"},
Expand Down Expand Up @@ -1403,6 +1405,10 @@ func NewErrBlobCantHaveDefault(ctx context.Context, arg any) *Error {
return newError(ctx, ErrBlobCantHaveDefault, arg)
}

func NewCantCompileForPrepare(ctx context.Context) *Error {
return newError(ctx, ErrCantCompileForPrepare)
}

var contextFunc atomic.Value

func SetContextFunc(f func() context.Context) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/moerr/error_no_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,7 @@ func NewReplicaNotFound(replica string) *Error {
func NewReplicaNotMatch(current, received string) *Error {
return newError(Context(), ErrReplicaNotMatch, current, received)
}

func NewCantCompileForPrepareNoCtx() *Error {
return newError(Context(), ErrCantCompileForPrepare)
}
2 changes: 1 addition & 1 deletion pkg/frontend/compiler_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (tcc *TxnCompilerContext) SetSnapshot(snapshot *plan2.Snapshot) {
}

func (tcc *TxnCompilerContext) ReplacePlan(execPlan *plan.Execute) (*plan.Plan, tree.Statement, error) {
p, st, _, err := replacePlan(tcc.execCtx.reqCtx, tcc.execCtx.ses.(*Session), tcc.tcw.(*TxnComputationWrapper), execPlan)
_, p, st, _, err := replacePlan(tcc.execCtx.reqCtx, tcc.execCtx.ses.(*Session), tcc.tcw.(*TxnComputationWrapper), execPlan)
return p, st, err
}

Expand Down
212 changes: 123 additions & 89 deletions pkg/frontend/computation_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (cwft *TxnComputationWrapper) GetColumns(ctx context.Context) ([]interface{
return columns, err
}

func (cwft *TxnComputationWrapper) GetClock() clock.Clock {
func GetClock() clock.Clock {
rt := runtime.ProcessLevelRuntime()
return rt.Clock()
}
Expand Down Expand Up @@ -222,7 +222,7 @@ func (cwft *TxnComputationWrapper) Compile(any any, fill func(*batch.Batch) erro

if _, ok := cwft.stmt.(*tree.Execute); ok {
executePlan := cwft.plan.GetDcl().GetExecute()
plan, stmt, sql, err := replacePlan(execCtx.reqCtx, cwft.ses.(*Session), cwft, executePlan)
retComp, plan, stmt, sql, err := replacePlan(execCtx.reqCtx, cwft.ses.(*Session), cwft, executePlan)
if err != nil {
return nil, err
}
Expand All @@ -243,94 +243,33 @@ func (cwft *TxnComputationWrapper) Compile(any any, fill func(*batch.Batch) erro
return nil, nil
}

if retComp == nil {
cwft.compile, err = createCompile(execCtx, cwft.ses, cwft.proc, cwft.ses.GetSql(), cwft.stmt, cwft.plan, fill, false)
if err != nil {
return nil, err
}
cwft.compile.SetOriginSQL(originSQL)
} else {
// retComp
cwft.proc.Ctx = execCtx.reqCtx
retComp.Reset(cwft.proc, getStatementStartAt(execCtx.reqCtx), fill, cwft.ses.GetSql())
cwft.compile = retComp
}

//check privilege
/* prepare not need check privilege
err = authenticateUserCanExecutePrepareOrExecute(requestCtx, cwft.ses, prepareStmt.PrepareStmt, newPlan)
if err != nil {
return nil, err
}
*/
}

addr := ""
if len(getGlobalPu().ClusterNodes) > 0 {
addr = getGlobalPu().ClusterNodes[0].Addr
}
cwft.proc.Ctx = execCtx.reqCtx
cwft.proc.FileService = getGlobalPu().FileService

var tenant string
tInfo := cwft.ses.GetTenantInfo()
if tInfo != nil {
tenant = tInfo.GetTenant()
}

stats := statistic.StatsInfoFromContext(execCtx.reqCtx)
stats.CompileStart()
defer stats.CompileEnd()
cwft.compile = compile.NewCompile(
addr,
cwft.ses.GetDatabaseName(),
cwft.ses.GetSql(),
tenant,
cwft.ses.GetUserName(),
execCtx.reqCtx,
cwft.ses.GetTxnHandler().GetStorage(),
cwft.proc,
cwft.stmt,
cwft.ses.GetIsInternal(),
deepcopy.Copy(cwft.ses.getCNLabels()).(map[string]string),
getStatementStartAt(execCtx.reqCtx),
)
defer func() {
if err != nil {
cwft.compile.Release()
}
}()
cwft.compile.SetBuildPlanFunc(func() (*plan2.Plan, error) {
plan, err := buildPlan(execCtx.reqCtx, cwft.ses, cwft.ses.GetTxnCompileCtx(), cwft.stmt)
} else {
cwft.compile, err = createCompile(execCtx, cwft.ses, cwft.proc, execCtx.sqlOfStmt, cwft.stmt, cwft.plan, fill, false)
if err != nil {
return nil, err
}
if plan.IsPrepare {
_, _, err = plan2.ResetPreparePlan(cwft.ses.GetTxnCompileCtx(), plan)
}
return plan, err
})

if _, ok := cwft.stmt.(*tree.ExplainAnalyze); ok {
fill = func(bat *batch.Batch) error { return nil }
}
err = cwft.compile.Compile(execCtx.reqCtx, cwft.plan, fill)
if err != nil {
return nil, err
}
// check if it is necessary to initialize the temporary engine
if !cwft.ses.GetTxnHandler().HasTempEngine() && cwft.compile.NeedInitTempEngine() {
// 0. init memory-non-dist storage
err = cwft.ses.GetTxnHandler().CreateTempStorage(cwft.GetClock())
if err != nil {
return nil, err
}

// temporary storage is passed through Ctx
updateTempStorageInCtx(execCtx, cwft.proc, cwft.ses.GetTxnHandler().GetTempStorage())

// 1. init memory-non-dist engine
cwft.ses.GetTxnHandler().CreateTempEngine()
tempEngine := cwft.ses.GetTxnHandler().GetTempEngine()

// 2. bind the temporary engine to the session and txnHandler
cwft.compile.SetTempEngine(tempEngine, cwft.ses.GetTxnHandler().GetTempStorage())

// 3. init temp-db to store temporary relations
txnOp2 := cwft.ses.GetTxnHandler().GetTxn()
err = tempEngine.Create(execCtx.reqCtx, defines.TEMPORARY_DBNAME, txnOp2)
if err != nil {
return nil, err
}
}
cwft.compile.SetOriginSQL(originSQL)
return cwft.compile, err
}

Expand Down Expand Up @@ -387,12 +326,12 @@ func getStatementStartAt(ctx context.Context) time.Time {

// replacePlan replaces the plan of the EXECUTE by the plan generated by
// the PREPARE and setups the params for the plan.
func replacePlan(reqCtx context.Context, ses *Session, cwft *TxnComputationWrapper, execPlan *plan.Execute) (*plan.Plan, tree.Statement, string, error) {
func replacePlan(reqCtx context.Context, ses *Session, cwft *TxnComputationWrapper, execPlan *plan.Execute) (*compile.Compile, *plan.Plan, tree.Statement, string, error) {
originSQL := ""
stmtName := execPlan.GetName()
prepareStmt, err := ses.GetPrepareStmt(reqCtx, stmtName)
if err != nil {
return nil, nil, originSQL, err
return nil, nil, nil, originSQL, err
}
if txnTrace.GetService().Enabled(txnTrace.FeatureTraceTxn) {
originSQL = tree.String(prepareStmt.PrepareStmt, dialect.MYSQL)
Expand All @@ -403,10 +342,10 @@ func replacePlan(reqCtx context.Context, ses *Session, cwft *TxnComputationWrapp
for _, obj := range preparePlan.GetSchemas() {
newObj, newTableDef := ses.txnCompileCtx.Resolve(obj.SchemaName, obj.ObjName, plan2.Snapshot{TS: &timestamp.Timestamp{}})
if newObj == nil {
return nil, nil, originSQL, moerr.NewInternalError(reqCtx, "table '%s' in prepare statement '%s' does not exist anymore", obj.ObjName, stmtName)
return nil, nil, nil, originSQL, moerr.NewInternalError(reqCtx, "table '%s' in prepare statement '%s' does not exist anymore", obj.ObjName, stmtName)
}
if newObj.Obj != obj.Obj || newTableDef.Version != uint32(obj.Server) {
return nil, nil, originSQL, moerr.NewInternalError(reqCtx, "table '%s' has been changed, please reset prepare statement '%s'", obj.ObjName, stmtName)
return nil, nil, nil, originSQL, moerr.NewInternalError(reqCtx, "table '%s' has been changed, please reset prepare statement '%s'", obj.ObjName, stmtName)
}
}

Expand All @@ -420,36 +359,131 @@ func replacePlan(reqCtx context.Context, ses *Session, cwft *TxnComputationWrapp
numParams := len(preparePlan.ParamTypes)
if prepareStmt.params != nil && prepareStmt.params.Length() > 0 { // use binary protocol
if prepareStmt.params.Length() != numParams {
return nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
return nil, nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
}
cwft.proc.SetPrepareParams(prepareStmt.params)
} else if len(execPlan.Args) > 0 {
if len(execPlan.Args) != numParams {
return nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
return nil, nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
}
params := cwft.proc.GetVector(types.T_text.ToType())
paramVals := make([]any, numParams)
for i, arg := range execPlan.Args {
exprImpl := arg.Expr.(*plan.Expr_V)
param, err := cwft.proc.GetResolveVariableFunc()(exprImpl.V.Name, exprImpl.V.System, exprImpl.V.Global)
if err != nil {
return nil, nil, originSQL, err
return nil, nil, nil, originSQL, err
}
if param == nil {
return nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
return nil, nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
}
err = util.AppendAnyToStringVector(cwft.proc, param, params)
if err != nil {
return nil, nil, originSQL, err
return nil, nil, nil, originSQL, err
}
paramVals[i] = param
}
cwft.proc.SetPrepareParams(params)
cwft.paramVals = paramVals
} else {
if numParams > 0 {
return nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
return nil, nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE")
}
}
return prepareStmt.compile, preparePlan.Plan, prepareStmt.PrepareStmt, originSQL, nil
}

func createCompile(
execCtx *ExecCtx,
ses FeSession,
proc *process.Process,
originSQL string,
stmt tree.Statement,
plan *plan2.Plan,
fill func(*batch.Batch) error,
isPrepare bool,
) (retCompile *compile.Compile, err error) {

addr := ""
if len(getGlobalPu().ClusterNodes) > 0 {
addr = getGlobalPu().ClusterNodes[0].Addr
}
proc.Ctx = execCtx.reqCtx
proc.FileService = getGlobalPu().FileService

var tenant string
tInfo := ses.GetTenantInfo()
if tInfo != nil {
tenant = tInfo.GetTenant()
}

stats := statistic.StatsInfoFromContext(execCtx.reqCtx)
stats.CompileStart()
defer stats.CompileEnd()
defer func() {
if err != nil && retCompile != nil {
retCompile.Release()
retCompile = nil
}
}()
retCompile = compile.NewCompile(
addr,
ses.GetDatabaseName(),
ses.GetSql(),
tenant,
ses.GetUserName(),
execCtx.reqCtx,
ses.GetTxnHandler().GetStorage(),
proc,
stmt,
ses.GetIsInternal(),
deepcopy.Copy(ses.getCNLabels()).(map[string]string),
getStatementStartAt(execCtx.reqCtx),
)
retCompile.SetIsPrepare(isPrepare)
retCompile.SetBuildPlanFunc(func() (*plan2.Plan, error) {
plan, err := buildPlan(execCtx.reqCtx, ses, ses.GetTxnCompileCtx(), stmt)
if err != nil {
return nil, err
}
if plan.IsPrepare {
_, _, err = plan2.ResetPreparePlan(ses.GetTxnCompileCtx(), plan)
}
return plan, err
})

if _, ok := stmt.(*tree.ExplainAnalyze); ok {
fill = func(bat *batch.Batch) error { return nil }
}
err = retCompile.Compile(execCtx.reqCtx, plan, fill)
if err != nil {
return
}
// check if it is necessary to initialize the temporary engine
if !ses.GetTxnHandler().HasTempEngine() && retCompile.NeedInitTempEngine() {
// 0. init memory-non-dist storage
err = ses.GetTxnHandler().CreateTempStorage(GetClock())
if err != nil {
return
}

// temporary storage is passed through Ctx
updateTempStorageInCtx(execCtx, proc, ses.GetTxnHandler().GetTempStorage())

// 1. init memory-non-dist engine
ses.GetTxnHandler().CreateTempEngine()
tempEngine := ses.GetTxnHandler().GetTempEngine()

// 2. bind the temporary engine to the session and txnHandler
retCompile.SetTempEngine(tempEngine, ses.GetTxnHandler().GetTempStorage())

// 3. init temp-db to store temporary relations
txnOp2 := ses.GetTxnHandler().GetTxn()
err = tempEngine.Create(execCtx.reqCtx, defines.TEMPORARY_DBNAME, txnOp2)
if err != nil {
return
}
}
return preparePlan.Plan, prepareStmt.PrepareStmt, originSQL, nil
retCompile.SetOriginSQL(originSQL)
return
}
Loading

0 comments on commit 4bb2193

Please sign in to comment.