Skip to content

Commit

Permalink
Merge branch '1.2-dev' into in_mem_reader_workaround_for_special_tabl…
Browse files Browse the repository at this point in the history
…es_to_1.2
  • Loading branch information
mergify[bot] authored Jun 25, 2024
2 parents 37ff48e + 2545ce4 commit f32f7c3
Show file tree
Hide file tree
Showing 42 changed files with 804 additions and 180 deletions.
41 changes: 39 additions & 2 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
"github.com/matrixorigin/matrixone/pkg/sql/util"
"github.com/matrixorigin/matrixone/pkg/util/metric/mometric"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/util/sysview"
"github.com/matrixorigin/matrixone/pkg/util/trace"
"github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace"
Expand Down Expand Up @@ -3261,6 +3262,10 @@ func getSubscriptionMeta(ctx context.Context, dbName string, ses FeSession, txn
}

func checkSubscriptionValidCommon(ctx context.Context, ses FeSession, subName, accName, pubName string) (subs *plan.SubscriptionMeta, err error) {
start := time.Now()
defer func() {
v2.CheckSubValidDurationHistogram.Observe(time.Since(start).Seconds())
}()
bh := ses.GetBackgroundExec(ctx)
defer bh.Close()
var (
Expand Down Expand Up @@ -4298,7 +4303,7 @@ func postDropSuspendAccount(
}
var nodes []string
currTenant := ses.GetTenantInfo().GetTenant()
currUser := ses.GetTenantInfo().User
currUser := ses.GetTenantInfo().GetUser()
labels := clusterservice.NewSelector().SelectByLabel(
map[string]string{"account": accountName}, clusterservice.Contain)
sysTenant := isSysTenant(currTenant)
Expand Down Expand Up @@ -7606,6 +7611,10 @@ func InitGeneralTenant(ctx context.Context, ses *Session, ca *createAccount) (er
if !(tenant.IsSysTenant() && tenant.IsMoAdminRole()) {
return moerr.NewInternalError(ctx, "tenant %s user %s role %s do not have the privilege to create the new account", tenant.GetTenant(), tenant.GetUser(), tenant.GetDefaultRole())
}
start := time.Now()
defer func() {
v2.TotalCreateDurationHistogram.Observe(time.Since(start).Seconds())
}()

//normalize the name
err = normalizeNameOfAccount(ctx, ca)
Expand Down Expand Up @@ -7647,6 +7656,7 @@ func InitGeneralTenant(ctx context.Context, ses *Session, ca *createAccount) (er
return rtnErr
}

start1 := time.Now()
//USE the mo_catalog
// MOVE into txn, make sure only create ONE txn.
rtnErr = bh.Exec(ctx, "use mo_catalog;")
Expand All @@ -7672,6 +7682,10 @@ func InitGeneralTenant(ctx context.Context, ses *Session, ca *createAccount) (er
}
}

v2.Step1DurationHistogram.Observe(time.Since(start1).Seconds())

start2 := time.Now()

// create some tables and databases for new account
rtnErr = bh.Exec(newTenantCtx, createMoIndexesSql)
if rtnErr != nil {
Expand Down Expand Up @@ -7707,6 +7721,8 @@ func InitGeneralTenant(ctx context.Context, ses *Session, ca *createAccount) (er
}
}

v2.Step2DurationHistogram.Observe(time.Since(start2).Seconds())

// create tables for new account
rtnErr = createTablesInMoCatalogOfGeneralTenant2(bh, ca, newTenantCtx, newTenant, getGlobalPu())
if rtnErr != nil {
Expand Down Expand Up @@ -7823,10 +7839,17 @@ func createTablesInMoCatalogOfGeneralTenant(ctx context.Context, bh BackgroundEx
}

func createTablesInMoCatalogOfGeneralTenant2(bh BackgroundExec, ca *createAccount, newTenantCtx context.Context, newTenant *TenantInfo, pu *config.ParameterUnit) error {
start := time.Now()
defer func() {
v2.CreateTablesInMoCatalogDurationHistogram.Observe(time.Since(start).Seconds())
}()
var err error
var initDataSqls []string
newTenantCtx, span := trace.Debug(newTenantCtx, "createTablesInMoCatalogOfGeneralTenant2")
defer span.End()

start1 := time.Now()

//create tables for the tenant
for _, sql := range createSqls {
//only the SYS tenant has the table mo_account
Expand All @@ -7839,6 +7862,8 @@ func createTablesInMoCatalogOfGeneralTenant2(bh BackgroundExec, ca *createAccoun
}
}

v2.ExecDDL1DurationHistogram.Observe(time.Since(start1).Seconds())

//initialize the default data of tables for the tenant
addSqlIntoSet := func(sql string) {
initDataSqls = append(initDataSqls, sql)
Expand Down Expand Up @@ -7911,6 +7936,8 @@ func createTablesInMoCatalogOfGeneralTenant2(bh BackgroundExec, ca *createAccoun
addSqlIntoSet(addInitSystemVariablesSql(uint64(newTenant.GetTenantID()), newTenant.GetTenant(), QueryResultMaxsize, pu))
addSqlIntoSet(addInitSystemVariablesSql(uint64(newTenant.GetTenantID()), newTenant.GetTenant(), QueryResultTimeout, pu))

start2 := time.Now()

//fill the mo_role, mo_user, mo_role_privs, mo_user_grant, mo_role_grant
for _, sql := range initDataSqls {
bh.ClearExecResultSet()
Expand All @@ -7919,11 +7946,17 @@ func createTablesInMoCatalogOfGeneralTenant2(bh BackgroundExec, ca *createAccoun
return err
}
}

v2.InitData1DurationHistogram.Observe(time.Since(start2).Seconds())
return nil
}

// createTablesInSystemOfGeneralTenant creates the database system and system_metrics as the external tables.
func createTablesInSystemOfGeneralTenant(ctx context.Context, bh BackgroundExec, newTenant *TenantInfo) error {
start := time.Now()
defer func() {
v2.CreateTablesInSystemDurationHistogram.Observe(time.Since(start).Seconds())
}()
ctx, span := trace.Debug(ctx, "createTablesInSystemOfGeneralTenant")
defer span.End()

Expand All @@ -7948,6 +7981,10 @@ func createTablesInSystemOfGeneralTenant(ctx context.Context, bh BackgroundExec,

// createTablesInInformationSchemaOfGeneralTenant creates the database information_schema and the views or tables.
func createTablesInInformationSchemaOfGeneralTenant(ctx context.Context, bh BackgroundExec) error {
start := time.Now()
defer func() {
v2.CreateTablesInInfoSchemaDurationHistogram.Observe(time.Since(start).Seconds())
}()
ctx, span := trace.Debug(ctx, "createTablesInInformationSchemaOfGeneralTenant")
defer span.End()
//with new tenant
Expand Down Expand Up @@ -9124,7 +9161,7 @@ func doRevokePrivilegeImplicitly(ctx context.Context, ses *Session, stmt tree.St
}

func doSetGlobalSystemVariable(ctx context.Context, ses *Session, varName string, varValue interface{}) (err error) {
accountId := uint64(ses.GetTenantInfo().TenantID)
accountId := uint64(ses.GetTenantInfo().GetTenantID())
accountName := ses.GetTenantName()
varName = strings.ToLower(varName)
bh := ses.GetBackgroundExec(ctx)
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/authenticate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6104,7 +6104,7 @@ func TestDoSetSecondaryRoleAll(t *testing.T) {
bh.sql2result["commit;"] = nil
bh.sql2result["rollback;"] = nil

sql := getSqlForgetUserRolesExpectPublicRole(publicRoleID, ses.GetTenantInfo().UserID)
sql := getSqlForgetUserRolesExpectPublicRole(publicRoleID, ses.GetTenantInfo().GetUserID())
mrs := newMrsForPasswordOfUser([][]interface{}{
{"6", "role5"},
})
Expand Down Expand Up @@ -6145,7 +6145,7 @@ func TestDoSetSecondaryRoleAll(t *testing.T) {
bh.sql2result["commit;"] = nil
bh.sql2result["rollback;"] = nil

sql := getSqlForgetUserRolesExpectPublicRole(publicRoleID, ses.GetTenantInfo().UserID)
sql := getSqlForgetUserRolesExpectPublicRole(publicRoleID, ses.GetTenantInfo().GetUserID())
mrs := newMrsForPasswordOfUser([][]interface{}{})
bh.sql2result[sql] = mrs

Expand Down
8 changes: 3 additions & 5 deletions pkg/frontend/back_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import (
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/plan"

"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/sql/compile"

"github.com/matrixorigin/matrixone/pkg/sql/parsers"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect/mysql"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
Expand Down Expand Up @@ -212,7 +214,7 @@ func doComQueryInBack(backSes *backSession, execCtx *ExecCtx,
getGlobalPu().QueryClient,
getGlobalPu().HAKeeperClient,
getGlobalPu().UdfService,
getGlobalAic())
getGlobalAicm())
proc.Id = backSes.getNextProcessId()
proc.Lim.Size = getGlobalPu().SV.ProcessLimitationSize
proc.Lim.BatchRows = getGlobalPu().SV.ProcessLimitationBatchRows
Expand Down Expand Up @@ -704,10 +706,6 @@ func (backSes *backSession) getNextProcessId() string {
func (backSes *backSession) cleanCache() {
}

func (backSes *backSession) GetUpstream() FeSession {
return backSes.upstream
}

func (backSes *backSession) getCNLabels() map[string]string {
return backSes.label
}
Expand Down
31 changes: 29 additions & 2 deletions pkg/frontend/compiler_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ func (tcc *TxnCompilerContext) getRelation(dbName string, tableName string, sub
return nil, nil, err
}

start := time.Now()
defer func() {
v2.GetRelationDurationHistogram.Observe(time.Since(start).Seconds())
}()

ses := tcc.GetSession()
txn := tcc.GetTxnHandler().GetTxn()
tempCtx := tcc.execCtx.reqCtx
Expand Down Expand Up @@ -267,6 +272,8 @@ func (tcc *TxnCompilerContext) getRelation(dbName string, tableName string, sub
tempCtx = defines.AttachAccountId(tempCtx, uint32(sysAccountID))
}

start1 := time.Now()

//open database
db, err := tcc.GetTxnHandler().GetStorage().Database(tempCtx, dbName, txn)
if err != nil {
Expand All @@ -277,12 +284,16 @@ func (tcc *TxnCompilerContext) getRelation(dbName string, tableName string, sub
return nil, nil, err
}

v2.OpenDBDurationHistogram.Observe(time.Since(start1).Seconds())

// tableNames, err := db.Relations(ctx)
// if err != nil {
// return nil, nil, err
// }
// logDebugf(ses.GetDebugString(), "dbName %v tableNames %v", dbName, tableNames)

start2 := time.Now()

//open table
table, err := db.Relation(tempCtx, tableName, nil)
if err != nil {
Expand All @@ -297,10 +308,17 @@ func (tcc *TxnCompilerContext) getRelation(dbName string, tableName string, sub
table = tmpTable
}
}

v2.OpenTableDurationHistogram.Observe(time.Since(start2).Seconds())

return tempCtx, table, nil
}

func (tcc *TxnCompilerContext) getTmpRelation(ctx context.Context, tableName string) (engine.Relation, error) {
start := time.Now()
defer func() {
v2.GetTmpTableDurationHistogram.Observe(time.Since(start).Seconds())
}()
e := tcc.execCtx.ses.GetTxnHandler().GetStorage()
txn := tcc.execCtx.ses.GetTxnHandler().GetTxn()
db, err := e.Database(ctx, defines.TEMPORARY_DBNAME, txn)
Expand All @@ -315,6 +333,10 @@ func (tcc *TxnCompilerContext) getTmpRelation(ctx context.Context, tableName str
}

func (tcc *TxnCompilerContext) ensureDatabaseIsNotEmpty(dbName string, checkSub bool, snapshot plan2.Snapshot) (string, *plan.SubscriptionMeta, error) {
start := time.Now()
defer func() {
v2.EnsureDatabaseDurationHistogram.Observe(time.Since(start).Seconds())
}()
if len(dbName) == 0 {
dbName = tcc.DefaultDatabase()
}
Expand Down Expand Up @@ -385,7 +407,9 @@ func (tcc *TxnCompilerContext) ResolveSubscriptionTableById(tableId uint64, pubm
func (tcc *TxnCompilerContext) Resolve(dbName string, tableName string, snapshot plan2.Snapshot) (*plan2.ObjectRef, *plan2.TableDef) {
start := time.Now()
defer func() {
v2.TxnStatementResolveDurationHistogram.Observe(time.Since(start).Seconds())
end := time.Since(start).Seconds()
v2.TxnStatementResolveDurationHistogram.Observe(end)
v2.TotalResolveDurationHistogram.Observe(end)
}()

// In order to be compatible with various GUI clients and BI tools, lower case db and table name if it's a mysql system table
Expand Down Expand Up @@ -854,7 +878,10 @@ func (tcc *TxnCompilerContext) GetQueryResultMeta(uuid string) ([]*plan.ColDef,
func (tcc *TxnCompilerContext) GetSubscriptionMeta(dbName string, snapshot plan2.Snapshot) (*plan.SubscriptionMeta, error) {
tempCtx := tcc.execCtx.reqCtx
txn := tcc.GetTxnHandler().GetTxn()

start := time.Now()
defer func() {
v2.GetSubMetaDurationHistogram.Observe(time.Since(start).Seconds())
}()
if plan2.IsSnapshotValid(&snapshot) && snapshot.TS.Less(txn.Txn().SnapshotTS) {
txn = txn.CloneSnapshotOp(*snapshot.TS)

Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func setGlobalAicm(aicm *defines.AutoIncrCacheManager) {
globalAicm.Store(aicm)
}

func getGlobalAic() *defines.AutoIncrCacheManager {
func getGlobalAicm() *defines.AutoIncrCacheManager {
if globalAicm.Load() != nil {
return globalAicm.Load().(*defines.AutoIncrCacheManager)
}
Expand Down
16 changes: 4 additions & 12 deletions pkg/frontend/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"

"github.com/matrixorigin/matrixone/pkg/logutil"

"github.com/google/uuid"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -35,7 +37,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/pb/status"
Expand Down Expand Up @@ -237,7 +238,7 @@ type Session struct {
}

func (ses *Session) InitSystemVariables(ctx context.Context) (err error) {
if ses.gSysVars, err = GSysVarsMgr.Get(ses.GetTenantInfo().TenantID, ses, ctx); err != nil {
if ses.gSysVars, err = GSysVarsMgr.Get(ses.GetTenantInfo().GetTenantID(), ses, ctx); err != nil {
return
}
ses.sesSysVars = ses.gSysVars.Clone()
Expand Down Expand Up @@ -532,7 +533,7 @@ func NewSession(connCtx context.Context, proto MysqlRrWr, mp *mpool.MPool) *Sess
getGlobalPu().QueryClient,
getGlobalPu().HAKeeperClient,
getGlobalPu().UdfService,
getGlobalAic())
getGlobalAicm())

ses.proc.Lim.Size = getGlobalPu().SV.ProcessLimitationSize
ses.proc.Lim.BatchRows = getGlobalPu().SV.ProcessLimitationBatchRows
Expand Down Expand Up @@ -983,15 +984,6 @@ func (ses *Session) GetTxnInfo() string {
return meta.DebugString()
}

func (ses *Session) GetDatabaseName() string {
return ses.GetResponser().GetStr(DBNAME)
}

func (ses *Session) SetDatabaseName(db string) {
ses.GetResponser().SetStr(DBNAME, db)
ses.GetTxnCompileCtx().SetDatabase(db)
}

func (ses *Session) DatabaseNameIsEmpty() bool {
return len(ses.GetDatabaseName()) == 0
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/frontend/show_account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/parsers"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_getSqlForAccountInfo(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ func (th *TxnHandler) createTxnOpUnsafe(execCtx *ExecCtx) error {
connectionID = execCtx.resper.GetU32(CONNID)
}
if execCtx.ses.GetTenantInfo() != nil {
accountID = execCtx.ses.GetTenantInfo().TenantID
userName = execCtx.ses.GetTenantInfo().User
accountID = execCtx.ses.GetTenantInfo().GetTenantID()
userName = execCtx.ses.GetTenantInfo().GetUser()
}
sessionInfo := execCtx.ses.GetDebugString()
opts = append(opts,
Expand Down
Loading

0 comments on commit f32f7c3

Please sign in to comment.