Skip to content

Commit

Permalink
Merge branch '1.2-dev' into 0624-add-metric-1.2
Browse files Browse the repository at this point in the history
  • Loading branch information
sukki37 authored Jun 24, 2024
2 parents eb178cf + c887a0e commit d05b276
Show file tree
Hide file tree
Showing 24 changed files with 354 additions and 84 deletions.
4 changes: 2 additions & 2 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4303,7 +4303,7 @@ func postDropSuspendAccount(
}
var nodes []string
currTenant := ses.GetTenantInfo().GetTenant()
currUser := ses.GetTenantInfo().User
currUser := ses.GetTenantInfo().GetUser()
labels := clusterservice.NewSelector().SelectByLabel(
map[string]string{"account": accountName}, clusterservice.Contain)
sysTenant := isSysTenant(currTenant)
Expand Down Expand Up @@ -9161,7 +9161,7 @@ func doRevokePrivilegeImplicitly(ctx context.Context, ses *Session, stmt tree.St
}

func doSetGlobalSystemVariable(ctx context.Context, ses *Session, varName string, varValue interface{}) (err error) {
accountId := uint64(ses.GetTenantInfo().TenantID)
accountId := uint64(ses.GetTenantInfo().GetTenantID())
accountName := ses.GetTenantName()
varName = strings.ToLower(varName)
bh := ses.GetBackgroundExec(ctx)
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/authenticate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6104,7 +6104,7 @@ func TestDoSetSecondaryRoleAll(t *testing.T) {
bh.sql2result["commit;"] = nil
bh.sql2result["rollback;"] = nil

sql := getSqlForgetUserRolesExpectPublicRole(publicRoleID, ses.GetTenantInfo().UserID)
sql := getSqlForgetUserRolesExpectPublicRole(publicRoleID, ses.GetTenantInfo().GetUserID())
mrs := newMrsForPasswordOfUser([][]interface{}{
{"6", "role5"},
})
Expand Down Expand Up @@ -6145,7 +6145,7 @@ func TestDoSetSecondaryRoleAll(t *testing.T) {
bh.sql2result["commit;"] = nil
bh.sql2result["rollback;"] = nil

sql := getSqlForgetUserRolesExpectPublicRole(publicRoleID, ses.GetTenantInfo().UserID)
sql := getSqlForgetUserRolesExpectPublicRole(publicRoleID, ses.GetTenantInfo().GetUserID())
mrs := newMrsForPasswordOfUser([][]interface{}{})
bh.sql2result[sql] = mrs

Expand Down
8 changes: 3 additions & 5 deletions pkg/frontend/back_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import (
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/plan"

"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/sql/compile"

"github.com/matrixorigin/matrixone/pkg/sql/parsers"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect/mysql"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
Expand Down Expand Up @@ -212,7 +214,7 @@ func doComQueryInBack(backSes *backSession, execCtx *ExecCtx,
getGlobalPu().QueryClient,
getGlobalPu().HAKeeperClient,
getGlobalPu().UdfService,
getGlobalAic())
getGlobalAicm())
proc.Id = backSes.getNextProcessId()
proc.Lim.Size = getGlobalPu().SV.ProcessLimitationSize
proc.Lim.BatchRows = getGlobalPu().SV.ProcessLimitationBatchRows
Expand Down Expand Up @@ -704,10 +706,6 @@ func (backSes *backSession) getNextProcessId() string {
func (backSes *backSession) cleanCache() {
}

func (backSes *backSession) GetUpstream() FeSession {
return backSes.upstream
}

func (backSes *backSession) getCNLabels() map[string]string {
return backSes.label
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func setGlobalAicm(aicm *defines.AutoIncrCacheManager) {
globalAicm.Store(aicm)
}

func getGlobalAic() *defines.AutoIncrCacheManager {
func getGlobalAicm() *defines.AutoIncrCacheManager {
if globalAicm.Load() != nil {
return globalAicm.Load().(*defines.AutoIncrCacheManager)
}
Expand Down
16 changes: 4 additions & 12 deletions pkg/frontend/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"

"github.com/matrixorigin/matrixone/pkg/logutil"

"github.com/google/uuid"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -35,7 +37,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/pb/status"
Expand Down Expand Up @@ -237,7 +238,7 @@ type Session struct {
}

func (ses *Session) InitSystemVariables(ctx context.Context) (err error) {
if ses.gSysVars, err = GSysVarsMgr.Get(ses.GetTenantInfo().TenantID, ses, ctx); err != nil {
if ses.gSysVars, err = GSysVarsMgr.Get(ses.GetTenantInfo().GetTenantID(), ses, ctx); err != nil {
return
}
ses.sesSysVars = ses.gSysVars.Clone()
Expand Down Expand Up @@ -532,7 +533,7 @@ func NewSession(connCtx context.Context, proto MysqlRrWr, mp *mpool.MPool) *Sess
getGlobalPu().QueryClient,
getGlobalPu().HAKeeperClient,
getGlobalPu().UdfService,
getGlobalAic())
getGlobalAicm())

ses.proc.Lim.Size = getGlobalPu().SV.ProcessLimitationSize
ses.proc.Lim.BatchRows = getGlobalPu().SV.ProcessLimitationBatchRows
Expand Down Expand Up @@ -983,15 +984,6 @@ func (ses *Session) GetTxnInfo() string {
return meta.DebugString()
}

func (ses *Session) GetDatabaseName() string {
return ses.GetResponser().GetStr(DBNAME)
}

func (ses *Session) SetDatabaseName(db string) {
ses.GetResponser().SetStr(DBNAME, db)
ses.GetTxnCompileCtx().SetDatabase(db)
}

func (ses *Session) DatabaseNameIsEmpty() bool {
return len(ses.GetDatabaseName()) == 0
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/frontend/show_account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/parsers"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_getSqlForAccountInfo(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ func (th *TxnHandler) createTxnOpUnsafe(execCtx *ExecCtx) error {
connectionID = execCtx.resper.GetU32(CONNID)
}
if execCtx.ses.GetTenantInfo() != nil {
accountID = execCtx.ses.GetTenantInfo().TenantID
userName = execCtx.ses.GetTenantInfo().User
accountID = execCtx.ses.GetTenantInfo().GetTenantID()
userName = execCtx.ses.GetTenantInfo().GetUser()
}
sessionInfo := execCtx.ses.GetDebugString()
opts = append(opts,
Expand Down
31 changes: 25 additions & 6 deletions pkg/lockservice/lock_table_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@ func (l *lockTableAllocator) setRestartService(serviceID string) {
zap.String("serviceID", serviceID))
return
}
getLogger().Info("set restart lock service",
zap.String("serviceID", serviceID))
logServiceStatus("set restart lock service",
serviceID,
b.getStatus())
b.setStatus(pb.Status_ServiceLockWaiting)
}

Expand All @@ -221,7 +222,8 @@ func (l *lockTableAllocator) remainTxnInService(serviceID string) int32 {
txnIDs := b.getTxnIds()
getLogger().Error("remain txn in restart service",
bytesArrayField("txnIDs", txnIDs),
zap.String("serviceID", serviceID))
zap.String("serviceID", serviceID),
zap.String("status", b.getStatus().String()))

c := len(txnIDs)
if c == 0 {
Expand All @@ -231,7 +233,8 @@ func (l *lockTableAllocator) remainTxnInService(serviceID string) int32 {
// -1 means can not get right remain txn in restart lock service
c = -1
getLogger().Error("can not get right remain txn in restart lock service",
zap.String("serviceID", serviceID))
zap.String("serviceID", serviceID),
zap.String("status", b.getStatus().String()))
}

}
Expand All @@ -255,6 +258,9 @@ func (l *lockTableAllocator) canRestartService(serviceID string) bool {
zap.String("serviceID", serviceID))
return true
}
logServiceStatus("can restart lock service",
serviceID,
b.getStatus())
return b.isStatus(pb.Status_ServiceCanRestart)
}

Expand Down Expand Up @@ -612,9 +618,16 @@ func (b *serviceBinds) isStatus(status pb.Status) bool {
return b.status == status
}

func (b *serviceBinds) getStatus() pb.Status {
b.RLock()
defer b.RUnlock()
return b.status
}

func (b *serviceBinds) setStatus(status pb.Status) {
b.Lock()
defer b.Unlock()
logStatusChange(b.status, status)
b.status = status
}

Expand Down Expand Up @@ -754,8 +767,14 @@ func (l *lockTableAllocator) handleKeepLockTableBind(
}
b := l.getServiceBinds(req.KeepLockTableBind.ServiceID)
if b.isStatus(pb.Status_ServiceLockEnable) {
writeResponse(ctx, cancel, resp, nil, cs)
return
if req.KeepLockTableBind.Status != pb.Status_ServiceLockEnable {
getLogger().Error("tn has abnormal lock service status",
zap.String("serviceID", b.serviceID),
zap.String("status", req.KeepLockTableBind.Status.String()))
} else {
writeResponse(ctx, cancel, resp, nil, cs)
return
}
}
b.setTxnIds(req.KeepLockTableBind.TxnIDs)
switch req.KeepLockTableBind.Status {
Expand Down
7 changes: 7 additions & 0 deletions pkg/lockservice/lock_table_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package lockservice

import (
"context"
"go.uber.org/zap"
"time"

"github.com/matrixorigin/matrixone/pkg/common/morpc"
Expand Down Expand Up @@ -209,6 +210,12 @@ func (k *lockTableKeeper) doKeepLockTableBind(ctx context.Context) {
if resp.KeepLockTableBind.OK {
switch resp.KeepLockTableBind.Status {
case pb.Status_ServiceLockEnable:
if !k.service.isStatus(pb.Status_ServiceLockEnable) {
getLogger().Error("tn has abnormal lock service status",
zap.String("serviceID", k.serviceID),
zap.String("status", k.service.getStatus().String()))
}
return
case pb.Status_ServiceLockWaiting:
// maybe pb.Status_ServiceUnLockSucc
if k.service.isStatus(pb.Status_ServiceLockEnable) {
Expand Down
19 changes: 17 additions & 2 deletions pkg/lockservice/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,12 @@ func logPingFailed(
}
}

func logCanLockOnService() {
func logCanLockOnService(
serviceID string) {
logger := getWithSkipLogger()
if logger.Enabled(zap.InfoLevel) {
logger.Error("if lock on service")
logger.Error("if lock on service",
zap.String("serviceID", serviceID))
}
}

Expand Down Expand Up @@ -500,6 +502,19 @@ func logStatus(
}
}

func logServiceStatus(
info string,
serviceID string,
status pb.Status) {
logger := getWithSkipLogger()
if logger.Enabled(zap.InfoLevel) {
logger.Info("service status",
zap.String("info", info),
zap.String("serviceID", serviceID),
zap.String("status", status.String()))
}
}

func logStatusChange(
from pb.Status,
to pb.Status) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/lockservice/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,11 @@ func (s *service) canLockOnServiceStatus(
if s.isStatus(pb.Status_ServiceLockEnable) {
return true
}
defer logCanLockOnService()
if opts.Sharding == pb.Sharding_ByRow {
tableID = shardingByRow(rows[0])
}
if !s.validGroupTable(opts.Group, tableID) {
logCanLockOnService(s.serviceID)
return false
}
if s.activeTxnHolder.hasActiveTxn(txnID) {
Expand Down Expand Up @@ -391,8 +391,8 @@ func (s *service) Close() error {
func (s *service) setStatus(status pb.Status) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.status = status
logStatusChange(s.mu.status, status)
s.mu.status = status
}

func (s *service) getStatus() pb.Status {
Expand Down
Loading

0 comments on commit d05b276

Please sign in to comment.