diff --git a/pkg/common/moerr/error.go b/pkg/common/moerr/error.go index fd36fa6a8b1f6..a8a1abc8349c0 100644 --- a/pkg/common/moerr/error.go +++ b/pkg/common/moerr/error.go @@ -161,6 +161,7 @@ const ( ErrDuplicateKeyName uint16 = 20470 ErrFKNoReferencedRow2 uint16 = 20471 ErrBlobCantHaveDefault uint16 = 20472 + ErrCantCompileForPrepare uint16 = 20473 // Group 5: rpc timeout // ErrRPCTimeout rpc timeout @@ -450,6 +451,7 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{ ErrRetryForCNRollingRestart: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "retry for CN rolling restart"}, ErrNewTxnInCNRollingRestart: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "new txn in CN rolling restart"}, ErrPrevCheckpointNotFinished: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "prev checkpoint not finished"}, + ErrCantCompileForPrepare: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "can not compile for prepare"}, // Group 7: lock service ErrDeadLockDetected: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "deadlock detected"}, @@ -1403,6 +1405,10 @@ func NewErrBlobCantHaveDefault(ctx context.Context, arg any) *Error { return newError(ctx, ErrBlobCantHaveDefault, arg) } +func NewCantCompileForPrepare(ctx context.Context) *Error { + return newError(ctx, ErrCantCompileForPrepare) +} + var contextFunc atomic.Value func SetContextFunc(f func() context.Context) { diff --git a/pkg/common/moerr/error_no_ctx.go b/pkg/common/moerr/error_no_ctx.go index 8459a3d913044..2dad755d9fa0a 100644 --- a/pkg/common/moerr/error_no_ctx.go +++ b/pkg/common/moerr/error_no_ctx.go @@ -382,3 +382,7 @@ func NewReplicaNotFound(replica string) *Error { func NewReplicaNotMatch(current, received string) *Error { return newError(Context(), ErrReplicaNotMatch, current, received) } + +func NewCantCompileForPrepareNoCtx() *Error { + return newError(Context(), ErrCantCompileForPrepare) +} diff --git a/pkg/frontend/compiler_context.go b/pkg/frontend/compiler_context.go index ff7787fb59261..7957860112f75 100644 --- a/pkg/frontend/compiler_context.go +++ b/pkg/frontend/compiler_context.go @@ -100,7 +100,7 @@ func (tcc *TxnCompilerContext) SetSnapshot(snapshot *plan2.Snapshot) { } func (tcc *TxnCompilerContext) ReplacePlan(execPlan *plan.Execute) (*plan.Plan, tree.Statement, error) { - p, st, _, err := replacePlan(tcc.execCtx.reqCtx, tcc.execCtx.ses.(*Session), tcc.tcw.(*TxnComputationWrapper), execPlan) + _, p, st, _, err := replacePlan(tcc.execCtx.reqCtx, tcc.execCtx.ses.(*Session), tcc.tcw.(*TxnComputationWrapper), execPlan) return p, st, err } diff --git a/pkg/frontend/computation_wrapper.go b/pkg/frontend/computation_wrapper.go index 48aecb3f49452..70fa81a82e092 100644 --- a/pkg/frontend/computation_wrapper.go +++ b/pkg/frontend/computation_wrapper.go @@ -173,7 +173,7 @@ func (cwft *TxnComputationWrapper) GetColumns(ctx context.Context) ([]interface{ return columns, err } -func (cwft *TxnComputationWrapper) GetClock() clock.Clock { +func GetClock() clock.Clock { rt := runtime.ProcessLevelRuntime() return rt.Clock() } @@ -222,7 +222,7 @@ func (cwft *TxnComputationWrapper) Compile(any any, fill func(*batch.Batch) erro if _, ok := cwft.stmt.(*tree.Execute); ok { executePlan := cwft.plan.GetDcl().GetExecute() - plan, stmt, sql, err := replacePlan(execCtx.reqCtx, cwft.ses.(*Session), cwft, executePlan) + retComp, plan, stmt, sql, err := replacePlan(execCtx.reqCtx, cwft.ses.(*Session), cwft, executePlan) if err != nil { return nil, err } @@ -243,6 +243,19 @@ func (cwft *TxnComputationWrapper) Compile(any any, fill func(*batch.Batch) erro return nil, nil } + if retComp == nil { + cwft.compile, err = createCompile(execCtx, cwft.ses, cwft.proc, cwft.ses.GetSql(), cwft.stmt, cwft.plan, fill, false) + if err != nil { + return nil, err + } + cwft.compile.SetOriginSQL(originSQL) + } else { + // retComp + cwft.proc.Ctx = execCtx.reqCtx + retComp.Reset(cwft.proc, getStatementStartAt(execCtx.reqCtx), fill, cwft.ses.GetSql()) + cwft.compile = retComp + } + //check privilege /* prepare not need check privilege err = authenticateUserCanExecutePrepareOrExecute(requestCtx, cwft.ses, prepareStmt.PrepareStmt, newPlan) @@ -250,87 +263,13 @@ func (cwft *TxnComputationWrapper) Compile(any any, fill func(*batch.Batch) erro return nil, err } */ - } - - addr := "" - if len(getGlobalPu().ClusterNodes) > 0 { - addr = getGlobalPu().ClusterNodes[0].Addr - } - cwft.proc.Ctx = execCtx.reqCtx - cwft.proc.FileService = getGlobalPu().FileService - - var tenant string - tInfo := cwft.ses.GetTenantInfo() - if tInfo != nil { - tenant = tInfo.GetTenant() - } - - stats := statistic.StatsInfoFromContext(execCtx.reqCtx) - stats.CompileStart() - defer stats.CompileEnd() - cwft.compile = compile.NewCompile( - addr, - cwft.ses.GetDatabaseName(), - cwft.ses.GetSql(), - tenant, - cwft.ses.GetUserName(), - execCtx.reqCtx, - cwft.ses.GetTxnHandler().GetStorage(), - cwft.proc, - cwft.stmt, - cwft.ses.GetIsInternal(), - deepcopy.Copy(cwft.ses.getCNLabels()).(map[string]string), - getStatementStartAt(execCtx.reqCtx), - ) - defer func() { - if err != nil { - cwft.compile.Release() - } - }() - cwft.compile.SetBuildPlanFunc(func() (*plan2.Plan, error) { - plan, err := buildPlan(execCtx.reqCtx, cwft.ses, cwft.ses.GetTxnCompileCtx(), cwft.stmt) + } else { + cwft.compile, err = createCompile(execCtx, cwft.ses, cwft.proc, execCtx.sqlOfStmt, cwft.stmt, cwft.plan, fill, false) if err != nil { return nil, err } - if plan.IsPrepare { - _, _, err = plan2.ResetPreparePlan(cwft.ses.GetTxnCompileCtx(), plan) - } - return plan, err - }) - - if _, ok := cwft.stmt.(*tree.ExplainAnalyze); ok { - fill = func(bat *batch.Batch) error { return nil } } - err = cwft.compile.Compile(execCtx.reqCtx, cwft.plan, fill) - if err != nil { - return nil, err - } - // check if it is necessary to initialize the temporary engine - if !cwft.ses.GetTxnHandler().HasTempEngine() && cwft.compile.NeedInitTempEngine() { - // 0. init memory-non-dist storage - err = cwft.ses.GetTxnHandler().CreateTempStorage(cwft.GetClock()) - if err != nil { - return nil, err - } - // temporary storage is passed through Ctx - updateTempStorageInCtx(execCtx, cwft.proc, cwft.ses.GetTxnHandler().GetTempStorage()) - - // 1. init memory-non-dist engine - cwft.ses.GetTxnHandler().CreateTempEngine() - tempEngine := cwft.ses.GetTxnHandler().GetTempEngine() - - // 2. bind the temporary engine to the session and txnHandler - cwft.compile.SetTempEngine(tempEngine, cwft.ses.GetTxnHandler().GetTempStorage()) - - // 3. init temp-db to store temporary relations - txnOp2 := cwft.ses.GetTxnHandler().GetTxn() - err = tempEngine.Create(execCtx.reqCtx, defines.TEMPORARY_DBNAME, txnOp2) - if err != nil { - return nil, err - } - } - cwft.compile.SetOriginSQL(originSQL) return cwft.compile, err } @@ -387,12 +326,12 @@ func getStatementStartAt(ctx context.Context) time.Time { // replacePlan replaces the plan of the EXECUTE by the plan generated by // the PREPARE and setups the params for the plan. -func replacePlan(reqCtx context.Context, ses *Session, cwft *TxnComputationWrapper, execPlan *plan.Execute) (*plan.Plan, tree.Statement, string, error) { +func replacePlan(reqCtx context.Context, ses *Session, cwft *TxnComputationWrapper, execPlan *plan.Execute) (*compile.Compile, *plan.Plan, tree.Statement, string, error) { originSQL := "" stmtName := execPlan.GetName() prepareStmt, err := ses.GetPrepareStmt(reqCtx, stmtName) if err != nil { - return nil, nil, originSQL, err + return nil, nil, nil, originSQL, err } if txnTrace.GetService().Enabled(txnTrace.FeatureTraceTxn) { originSQL = tree.String(prepareStmt.PrepareStmt, dialect.MYSQL) @@ -403,10 +342,10 @@ func replacePlan(reqCtx context.Context, ses *Session, cwft *TxnComputationWrapp for _, obj := range preparePlan.GetSchemas() { newObj, newTableDef := ses.txnCompileCtx.Resolve(obj.SchemaName, obj.ObjName, plan2.Snapshot{TS: ×tamp.Timestamp{}}) if newObj == nil { - return nil, nil, originSQL, moerr.NewInternalError(reqCtx, "table '%s' in prepare statement '%s' does not exist anymore", obj.ObjName, stmtName) + return nil, nil, nil, originSQL, moerr.NewInternalError(reqCtx, "table '%s' in prepare statement '%s' does not exist anymore", obj.ObjName, stmtName) } if newObj.Obj != obj.Obj || newTableDef.Version != uint32(obj.Server) { - return nil, nil, originSQL, moerr.NewInternalError(reqCtx, "table '%s' has been changed, please reset prepare statement '%s'", obj.ObjName, stmtName) + return nil, nil, nil, originSQL, moerr.NewInternalError(reqCtx, "table '%s' has been changed, please reset prepare statement '%s'", obj.ObjName, stmtName) } } @@ -420,12 +359,12 @@ func replacePlan(reqCtx context.Context, ses *Session, cwft *TxnComputationWrapp numParams := len(preparePlan.ParamTypes) if prepareStmt.params != nil && prepareStmt.params.Length() > 0 { // use binary protocol if prepareStmt.params.Length() != numParams { - return nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE") + return nil, nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE") } cwft.proc.SetPrepareParams(prepareStmt.params) } else if len(execPlan.Args) > 0 { if len(execPlan.Args) != numParams { - return nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE") + return nil, nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE") } params := cwft.proc.GetVector(types.T_text.ToType()) paramVals := make([]any, numParams) @@ -433,14 +372,14 @@ func replacePlan(reqCtx context.Context, ses *Session, cwft *TxnComputationWrapp exprImpl := arg.Expr.(*plan.Expr_V) param, err := cwft.proc.GetResolveVariableFunc()(exprImpl.V.Name, exprImpl.V.System, exprImpl.V.Global) if err != nil { - return nil, nil, originSQL, err + return nil, nil, nil, originSQL, err } if param == nil { - return nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE") + return nil, nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE") } err = util.AppendAnyToStringVector(cwft.proc, param, params) if err != nil { - return nil, nil, originSQL, err + return nil, nil, nil, originSQL, err } paramVals[i] = param } @@ -448,8 +387,103 @@ func replacePlan(reqCtx context.Context, ses *Session, cwft *TxnComputationWrapp cwft.paramVals = paramVals } else { if numParams > 0 { - return nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE") + return nil, nil, nil, originSQL, moerr.NewInvalidInput(reqCtx, "Incorrect arguments to EXECUTE") + } + } + return prepareStmt.compile, preparePlan.Plan, prepareStmt.PrepareStmt, originSQL, nil +} + +func createCompile( + execCtx *ExecCtx, + ses FeSession, + proc *process.Process, + originSQL string, + stmt tree.Statement, + plan *plan2.Plan, + fill func(*batch.Batch) error, + isPrepare bool, +) (retCompile *compile.Compile, err error) { + + addr := "" + if len(getGlobalPu().ClusterNodes) > 0 { + addr = getGlobalPu().ClusterNodes[0].Addr + } + proc.Ctx = execCtx.reqCtx + proc.FileService = getGlobalPu().FileService + + var tenant string + tInfo := ses.GetTenantInfo() + if tInfo != nil { + tenant = tInfo.GetTenant() + } + + stats := statistic.StatsInfoFromContext(execCtx.reqCtx) + stats.CompileStart() + defer stats.CompileEnd() + defer func() { + if err != nil && retCompile != nil { + retCompile.Release() + retCompile = nil + } + }() + retCompile = compile.NewCompile( + addr, + ses.GetDatabaseName(), + ses.GetSql(), + tenant, + ses.GetUserName(), + execCtx.reqCtx, + ses.GetTxnHandler().GetStorage(), + proc, + stmt, + ses.GetIsInternal(), + deepcopy.Copy(ses.getCNLabels()).(map[string]string), + getStatementStartAt(execCtx.reqCtx), + ) + retCompile.SetIsPrepare(isPrepare) + retCompile.SetBuildPlanFunc(func() (*plan2.Plan, error) { + plan, err := buildPlan(execCtx.reqCtx, ses, ses.GetTxnCompileCtx(), stmt) + if err != nil { + return nil, err + } + if plan.IsPrepare { + _, _, err = plan2.ResetPreparePlan(ses.GetTxnCompileCtx(), plan) + } + return plan, err + }) + + if _, ok := stmt.(*tree.ExplainAnalyze); ok { + fill = func(bat *batch.Batch) error { return nil } + } + err = retCompile.Compile(execCtx.reqCtx, plan, fill) + if err != nil { + return + } + // check if it is necessary to initialize the temporary engine + if !ses.GetTxnHandler().HasTempEngine() && retCompile.NeedInitTempEngine() { + // 0. init memory-non-dist storage + err = ses.GetTxnHandler().CreateTempStorage(GetClock()) + if err != nil { + return + } + + // temporary storage is passed through Ctx + updateTempStorageInCtx(execCtx, proc, ses.GetTxnHandler().GetTempStorage()) + + // 1. init memory-non-dist engine + ses.GetTxnHandler().CreateTempEngine() + tempEngine := ses.GetTxnHandler().GetTempEngine() + + // 2. bind the temporary engine to the session and txnHandler + retCompile.SetTempEngine(tempEngine, ses.GetTxnHandler().GetTempStorage()) + + // 3. init temp-db to store temporary relations + txnOp2 := ses.GetTxnHandler().GetTxn() + err = tempEngine.Create(execCtx.reqCtx, defines.TEMPORARY_DBNAME, txnOp2) + if err != nil { + return } } - return preparePlan.Plan, prepareStmt.PrepareStmt, originSQL, nil + retCompile.SetOriginSQL(originSQL) + return } diff --git a/pkg/frontend/mysql_cmd_executor.go b/pkg/frontend/mysql_cmd_executor.go index 1cb8f1c62980f..346edb271edac 100644 --- a/pkg/frontend/mysql_cmd_executor.go +++ b/pkg/frontend/mysql_cmd_executor.go @@ -1011,31 +1011,25 @@ func handleExplainStmt(ses FeSession, execCtx *ExecCtx, stmt *tree.ExplainStmt) return doExplainStmt(execCtx.reqCtx, ses.(*Session), stmt) } -func doPrepareStmt(ctx context.Context, ses *Session, st *tree.PrepareStmt, sql string, paramTypes []byte) (*PrepareStmt, error) { - preparePlan, err := buildPlan(ctx, ses, ses.GetTxnCompileCtx(), st) +func doPrepareStmt(execCtx *ExecCtx, ses *Session, st *tree.PrepareStmt, sql string, paramTypes []byte) (*PrepareStmt, error) { + idx := strings.Index(strings.ToLower(sql[:(len(st.Name)+20)]), "from") + 5 + originSql := strings.TrimLeft(sql[idx:], " ") + // fmt.Print(originSql) + prepareStmt, err := createPrepareStmt(execCtx, ses, originSql, st, st.Stmt) if err != nil { return nil, err } - - prepareStmt := &PrepareStmt{ - Name: preparePlan.GetDcl().GetPrepare().GetName(), - Sql: sql, - PreparePlan: preparePlan, - PrepareStmt: st.Stmt, - getFromSendLongData: make(map[int]struct{}), - } if len(paramTypes) > 0 { prepareStmt.ParamTypes = paramTypes } - prepareStmt.InsertBat = ses.GetTxnCompileCtx().GetProcess().GetPrepareBatch() - err = ses.SetPrepareStmt(ctx, preparePlan.GetDcl().GetPrepare().GetName(), prepareStmt) + err = ses.SetPrepareStmt(execCtx.reqCtx, prepareStmt.Name, prepareStmt) return prepareStmt, err } // handlePrepareStmt -func handlePrepareStmt(ses FeSession, execCtx *ExecCtx, st *tree.PrepareStmt) (*PrepareStmt, error) { - return doPrepareStmt(execCtx.reqCtx, ses.(*Session), st, execCtx.sqlOfStmt, execCtx.executeParamTypes) +func handlePrepareStmt(ses FeSession, execCtx *ExecCtx, st *tree.PrepareStmt, sql string) (*PrepareStmt, error) { + return doPrepareStmt(execCtx, ses.(*Session), st, sql, execCtx.executeParamTypes) } func doPrepareString(ses *Session, execCtx *ExecCtx, st *tree.PrepareString) (*PrepareStmt, error) { @@ -1049,18 +1043,12 @@ func doPrepareString(ses *Session, execCtx *ExecCtx, st *tree.PrepareString) (*P return nil, err } - preparePlan, err := buildPlan(execCtx.reqCtx, ses, ses.GetTxnCompileCtx(), st) + prepareStmt, err := createPrepareStmt(execCtx, ses, st.Sql, st, stmts[0]) if err != nil { return nil, err } - prepareStmt := &PrepareStmt{ - Name: preparePlan.GetDcl().GetPrepare().GetName(), - Sql: st.Sql, - PreparePlan: preparePlan, - PrepareStmt: stmts[0], - } - prepareStmt.InsertBat = ses.GetTxnCompileCtx().GetProcess().GetPrepareBatch() - err = ses.SetPrepareStmt(execCtx.reqCtx, preparePlan.GetDcl().GetPrepare().GetName(), prepareStmt) + + err = ses.SetPrepareStmt(execCtx.reqCtx, prepareStmt.Name, prepareStmt) return prepareStmt, err } @@ -1069,6 +1057,52 @@ func handlePrepareString(ses FeSession, execCtx *ExecCtx, st *tree.PrepareString return doPrepareString(ses.(*Session), execCtx, st) } +func createPrepareStmt( + execCtx *ExecCtx, + ses *Session, + originSQL string, + stmt tree.Statement, + saveStmt tree.Statement) (*PrepareStmt, error) { + + preparePlan, err := buildPlan(execCtx.reqCtx, ses, ses.GetTxnCompileCtx(), stmt) + if err != nil { + return nil, err + } + + var comp *compile.Compile + if _, ok := preparePlan.GetDcl().GetPrepare().Plan.Plan.(*plan.Plan_Query); ok { + //only DQL & DML will pre compile + comp, err = createCompile(execCtx, ses, ses.proc, originSQL, saveStmt, preparePlan.GetDcl().GetPrepare().Plan, ses.GetOutputCallback(execCtx), true) + if err != nil { + if !moerr.IsMoErrCode(err, moerr.ErrCantCompileForPrepare) { + return nil, err + } + } + // do not save ap query now() + if comp != nil && !comp.IsTpQuery() { + comp.Release() + comp = nil + } + + // @xxx when refactor prepare finish, remove this code + if comp != nil { + comp.Release() + comp = nil + } + } + + prepareStmt := &PrepareStmt{ + Name: preparePlan.GetDcl().GetPrepare().GetName(), + Sql: originSQL, + compile: comp, + PreparePlan: preparePlan, + PrepareStmt: saveStmt, + getFromSendLongData: make(map[int]struct{}), + } + prepareStmt.InsertBat = ses.GetTxnCompileCtx().GetProcess().GetPrepareBatch() + return prepareStmt, nil +} + func doDeallocate(ses *Session, execCtx *ExecCtx, st *tree.Deallocate) error { deallocatePlan, err := buildPlan(execCtx.reqCtx, ses, ses.GetTxnCompileCtx(), st) if err != nil { diff --git a/pkg/frontend/mysql_cmd_executor_test.go b/pkg/frontend/mysql_cmd_executor_test.go index 2722ca2eab3ac..7b722dbf65ad6 100644 --- a/pkg/frontend/mysql_cmd_executor_test.go +++ b/pkg/frontend/mysql_cmd_executor_test.go @@ -828,7 +828,7 @@ func Test_HandlePrepareStmt(t *testing.T) { runTestHandle("handlePrepareStmt", t, func(ses *Session) error { stmt := stmt.(*tree.PrepareStmt) - _, err := handlePrepareStmt(ses, ec, stmt) + _, err := handlePrepareStmt(ses, ec, stmt, "Prepare stmt1 from select 1, 2") return err }) } diff --git a/pkg/frontend/self_handle.go b/pkg/frontend/self_handle.go index 4fc6b9593f13a..65c60598a85df 100644 --- a/pkg/frontend/self_handle.go +++ b/pkg/frontend/self_handle.go @@ -67,7 +67,7 @@ func execInFrontend(ses *Session, execCtx *ExecCtx) (err error) { case *tree.PrepareStmt: ses.EnterFPrint(13) defer ses.ExitFPrint(13) - execCtx.prepareStmt, err = handlePrepareStmt(ses, execCtx, st) + execCtx.prepareStmt, err = handlePrepareStmt(ses, execCtx, st, execCtx.sqlOfStmt) if err != nil { return } diff --git a/pkg/frontend/txn_test.go b/pkg/frontend/txn_test.go index c5e650bee828a..fe1d56c9f721d 100644 --- a/pkg/frontend/txn_test.go +++ b/pkg/frontend/txn_test.go @@ -49,12 +49,13 @@ type testWorkspace struct { func (txn *testWorkspace) UpdateSnapshotWriteOffset() { //TODO implement me - panic("implement me") + // panic("implement me") } func (txn *testWorkspace) GetSnapshotWriteOffset() int { //TODO implement me - panic("implement me") + // panic("implement me") + return 0 } func newTestWorkspace() *testWorkspace { diff --git a/pkg/frontend/types.go b/pkg/frontend/types.go index d11915ef2edbd..81ad69c51d8f0 100644 --- a/pkg/frontend/types.go +++ b/pkg/frontend/types.go @@ -36,6 +36,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" "github.com/matrixorigin/matrixone/pkg/sql/colexec" + "github.com/matrixorigin/matrixone/pkg/sql/compile" "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/txn/client" @@ -122,6 +123,8 @@ type PrepareStmt struct { params *vector.Vector getFromSendLongData map[int]struct{} + + compile *compile.Compile } /* @@ -230,6 +233,10 @@ func (prepareStmt *PrepareStmt) Close() { } } } + if prepareStmt.compile != nil { + prepareStmt.compile.Release() + prepareStmt.compile = nil + } if prepareStmt.PrepareStmt != nil { prepareStmt.PrepareStmt.Free() } diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index aabe8f8e3aa24..99bd0abadf878 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -106,6 +106,8 @@ const ( var ( ncpu = runtime.GOMAXPROCS(0) ctxCancelError = context.Canceled.Error() + + cantCompileForPrepareErr = moerr.NewCantCompileForPrepareNoCtx() ) // NewCompile is used to new an object of compile @@ -171,13 +173,23 @@ func (c *Compile) GetMessageCenter() *process.MessageCenter { return nil } -func (c *Compile) Reset(startAt time.Time) { +func (c *Compile) Reset(proc *process.Process, startAt time.Time, fill func(*batch.Batch) error, sql string) { + c.proc = proc + c.fill = fill + c.sql = sql + c.proc.Ctx = perfcounter.WithCounterSet(c.proc.Ctx, c.counterSet) + c.ctx = c.proc.Ctx + c.proc.Ctx = context.WithValue(c.proc.Ctx, defines.EngineKey{}, c.e) c.affectRows.Store(0) for _, info := range c.anal.analInfos { info.Reset() } + for _, s := range c.scope { + s.Reset(c) + } + c.MessageBoard = c.MessageBoard.Reset() c.counterSet.Reset() @@ -185,6 +197,9 @@ func (c *Compile) Reset(startAt time.Time) { f.reset() } c.startAt = startAt + if c.proc.TxnOperator != nil { + c.proc.TxnOperator.GetWorkspace().UpdateSnapshotWriteOffset() + } } func (c *Compile) clear() { @@ -220,6 +235,7 @@ func (c *Compile) clear() { c.needLockMeta = false c.isInternal = false c.lastAllocID = 0 + c.isPrepare = false for k := range c.metaTables { delete(c.metaTables, k) @@ -647,6 +663,10 @@ func (c *Compile) IsTpQuery() bool { return c.execType == plan2.ExecTypeTP } +func (c *Compile) SetIsPrepare(isPrepare bool) { + c.isPrepare = isPrepare +} + /* func (c *Compile) printPipeline() { if c.IsTpQuery() { @@ -1026,6 +1046,10 @@ func (c *Compile) compileQuery(ctx context.Context, qry *plan.Query) ([]*Scope, sort.Slice(c.cnList, func(i, j int) bool { return c.cnList[i].Addr < c.cnList[j].Addr }) } + if c.isPrepare && c.IsTpQuery() { + return nil, cantCompileForPrepareErr + } + c.initAnalyze(qry) // deal with sink scan first. @@ -1905,6 +1929,9 @@ func calculatePartitions(start, end, n int64) [][2]int64 { } func (c *Compile) compileExternScan(ctx context.Context, n *plan.Node) ([]*Scope, error) { + if c.isPrepare { + return nil, cantCompileForPrepareErr + } ctx, span := trace.Start(ctx, "compileExternScan") defer span.End() start := time.Now() @@ -2367,7 +2394,7 @@ func (c *Compile) compileRestrict(n *plan.Node, ss []*Scope) []*Scope { } currentFirstFlag := c.anal.isFirst // for dynamic parameter, substitute param ref and const fold cast expression here to improve performance - newFilters, err := plan2.ConstandFoldList(n.FilterList, c.proc, true) + newFilters, err := plan2.ConstandFoldList(n.FilterList, c.proc, false) if err != nil { newFilters = n.FilterList } @@ -4106,6 +4133,9 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e } if c.determinExpandRanges(n) { + if c.isPrepare { + return nil, nil, nil, cantCompileForPrepareErr + } db, err = c.e.Database(ctx, n.ObjRef.SchemaName, txnOp) if err != nil { return nil, nil, nil, err diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index 90f930b87518b..85e0318800066 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -28,6 +28,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/bitmap" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/output" "github.com/matrixorigin/matrixone/pkg/sql/colexec/right" "github.com/matrixorigin/matrixone/pkg/sql/colexec/rightanti" "github.com/matrixorigin/matrixone/pkg/sql/colexec/rightsemi" @@ -98,6 +99,51 @@ func (s *Scope) release() { reuse.Free[Scope](s, nil) } +func (s *Scope) Reset(c *Compile) error { + err := s.resetForReuse(c) + if err != nil { + return err + } + for _, scope := range s.PreScopes { + err := scope.Reset(c) + if err != nil { + return err + } + } + return nil +} + +func (s *Scope) resetForReuse(c *Compile) (err error) { + if s.Proc != nil { + newctx, cancel := context.WithCancel(c.ctx) + s.Proc.SetPrepareBatch(c.proc.GetPrepareBatch()) + s.Proc.SetPrepareExprList(c.proc.GetPrepareExprList()) + s.Proc.SetPrepareParams(c.proc.GetPrepareParams()) + s.Proc.TxnClient = c.proc.TxnClient + s.Proc.TxnOperator = c.proc.TxnOperator + s.Proc.SessionInfo = c.proc.SessionInfo + s.Proc.UnixTime = c.proc.UnixTime + s.Proc.LastInsertID = c.proc.LastInsertID + s.Proc.MessageBoard = c.proc.MessageBoard + s.Proc.Ctx = newctx + s.Proc.Cancel = cancel + } + for _, ins := range s.Instructions { + if ins.Op == vm.Output { + ins.Arg.(*output.Argument).Func = c.fill + } + } + if s.DataSource != nil { + if s.DataSource.isConst { + s.DataSource.Bat = nil + } else { + s.DataSource.Rel = nil + s.DataSource.R = nil + } + } + return nil +} + func (s *Scope) initDataSource(c *Compile) (err error) { if s.DataSource == nil { return nil diff --git a/pkg/sql/compile/types.go b/pkg/sql/compile/types.go index 0ba5fa2c30319..866335d51ba00 100644 --- a/pkg/sql/compile/types.go +++ b/pkg/sql/compile/types.go @@ -287,6 +287,8 @@ type Compile struct { disableRetry bool lastAllocID int32 + + isPrepare bool } type RemoteReceivRegInfo struct { diff --git a/test/distributed/cases/zz_statement_query_type/statement_query_type_1.result b/test/distributed/cases/zz_statement_query_type/statement_query_type_1.result index 4c7eeb85347f0..09dd42b97d572 100644 --- a/test/distributed/cases/zz_statement_query_type/statement_query_type_1.result +++ b/test/distributed/cases/zz_statement_query_type/statement_query_type_1.result @@ -3,7 +3,7 @@ statement query_type sql_source_type use system Other external_sql rollback TCL external_sql deallocate prepare s1 Other external_sql -execute s1 using @a // prepare s1 from select * from test_table where col1=? ; set @a=2 Other external_sql +execute s1 using @a // select * from test_table where col1=? ; set @a=2 Other external_sql set @a=2 Other external_sql prepare s1 from select * from test_table where col1=? Other external_sql drop database db2 DDL external_sql diff --git a/test/distributed/cases/zz_statement_query_type/statement_query_type_2.result b/test/distributed/cases/zz_statement_query_type/statement_query_type_2.result index 26a949a757489..c22ce21475ef9 100644 --- a/test/distributed/cases/zz_statement_query_type/statement_query_type_2.result +++ b/test/distributed/cases/zz_statement_query_type/statement_query_type_2.result @@ -14,7 +14,7 @@ show create table test_01 Other cloud_user_sql create table test_01(a int, b varchar) DDL cloud_user_sql drop table if exists test_01 DDL cloud_user_sql deallocate prepare s1 Other cloud_user_sql -execute s1 using @a // prepare s1 from select * from test_table where col1=? ; set @a=2 Other cloud_user_sql +execute s1 using @a // select * from test_table where col1=? ; set @a=2 Other cloud_user_sql set @a=2 Other cloud_user_sql prepare s1 from select * from test_table where col1=? Other cloud_user_sql select * from unnest('{"a":1}') as f DQL cloud_user_sql diff --git a/test/distributed/cases/zz_statement_query_type/statement_query_type_3.result b/test/distributed/cases/zz_statement_query_type/statement_query_type_3.result index c4621cfa7d90d..6a84c8fe51017 100644 --- a/test/distributed/cases/zz_statement_query_type/statement_query_type_3.result +++ b/test/distributed/cases/zz_statement_query_type/statement_query_type_3.result @@ -6,7 +6,7 @@ select sleep(1), * from unnest('{"a":1}') as f DQL cloud_nonuser_sql drop table test_table DDL cloud_nonuser_sql truncate table test_table DDL cloud_nonuser_sql deallocate prepare s1 Other cloud_nonuser_sql -execute s1 using @a // prepare s1 from select * from test_table where col1=? ; set @a=2 Other cloud_nonuser_sql +execute s1 using @a // select * from test_table where col1=? ; set @a=2 Other cloud_nonuser_sql set @a=2 Other cloud_nonuser_sql prepare s1 from select * from test_table where col1=? Other cloud_nonuser_sql drop database db2 DDL cloud_nonuser_sql