Skip to content

Commit

Permalink
Merge remote-tracking branch 'mo/1.2-dev' into fix-reg-rows-incorrect
Browse files Browse the repository at this point in the history
  • Loading branch information
ouyuanning committed Jun 23, 2024
2 parents 375bae9 + fd51a24 commit da1c567
Show file tree
Hide file tree
Showing 18 changed files with 714 additions and 216 deletions.
2 changes: 2 additions & 0 deletions pkg/common/moerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ const (
ErrRetryForCNRollingRestart uint16 = 20634
ErrNewTxnInCNRollingRestart uint16 = 20635
ErrPrevCheckpointNotFinished uint16 = 20636
ErrCantDelGCChecker uint16 = 20637

// Group 7: lock service
// ErrDeadLockDetected lockservice has detected a deadlock and should abort the transaction if it receives this error
Expand Down Expand Up @@ -446,6 +447,7 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{
ErrRetryForCNRollingRestart: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "retry for CN rolling restart"},
ErrNewTxnInCNRollingRestart: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "new txn in CN rolling restart"},
ErrPrevCheckpointNotFinished: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "prev checkpoint not finished"},
ErrCantDelGCChecker: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "can't delete gc checker"},

// Group 7: lock service
ErrDeadLockDetected: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "deadlock detected"},
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/moerr/error_no_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ func NewPrevCheckpointNotFinished() *Error {
return newError(Context(), ErrPrevCheckpointNotFinished)
}

func NewCantDelGCCheckerNoCtx() *Error {
return newError(Context(), ErrCantDelGCChecker)
}

func NewNotFoundNoCtx() *Error {
return newError(Context(), ErrNotFound)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/pb/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var (
OpCode_OpGlobalCheckpoint: "GlobalCheckpoint",
OpCode_OpInterceptCommit: "InterceptCommit",
OpCode_OpCommitMerge: "CommitMerge",
OpCode_OpDiskDiskCleaner: "DiskCleaner",
}
)

Expand Down
296 changes: 150 additions & 146 deletions pkg/pb/api/api.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/sql/parsers/dialect/mysql/mysql_sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 79 additions & 0 deletions pkg/sql/plan/function/ctl/cmd_disk_cleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ctl

import (
"github.com/fagongzi/util/protoc"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/gc"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"strings"
)

func IsValidArg(parameter string, proc *process.Process) (*db.DiskCleaner, error) {
parameters := strings.Split(parameter, ".")
if len(parameters) > 3 || len(parameters) < 2 {
return nil, moerr.NewInternalError(proc.Ctx, "handleDiskCleaner: invalid argument!")
}
op := parameters[0]
switch op {
case gc.AddChecker:
case gc.RemoveChecker:
break
default:
return nil, moerr.NewInternalError(proc.Ctx, "handleDiskCleaner: invalid operation!")
}
key := parameters[1]
switch key {
case gc.CheckerKeyTTL:
case gc.CheckerKeyMinTS:
break
default:
return nil, moerr.NewInternalError(proc.Ctx, "handleDiskCleaner: invalid key!")
}
return &db.DiskCleaner{
Op: op,
Key: key,
Value: parameters[2],
}, nil
}

func handleDiskCleaner() handleFunc {
return GetTNHandlerFunc(
api.OpCode_OpDiskDiskCleaner,
func(string) ([]uint64, error) { return nil, nil },
func(tnShardID uint64, parameter string, proc *process.Process) ([]byte, error) {
// parameter like "name.freq.action.iarg.sarg"
diskcleaner, err := IsValidArg(parameter, proc)
if err != nil {
return nil, err
}
payload, err := types.Encode(diskcleaner)
if err != nil {
return nil, moerr.NewInternalError(proc.Ctx, "payload encode err")
}
return payload, nil
},
func(data []byte) (any, error) {
resp := api.TNStringResponse{
ReturnStr: string(data),
}
protoc.MustUnmarshal(&resp, data)
return resp, nil
})
}
2 changes: 2 additions & 0 deletions pkg/sql/plan/function/ctl/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
InterceptCommitMethod = "INTERCEPTCOMMIT"
MergeObjectsMethod = "MERGEOBJECTS"
DisableCKPMethod = "DISABLECKP"
DiskCleanerMethod = "DISKCLEANER"

GetProtocolVersionMethod = "GETPROTOCOLVERSION"
SetProtocolVersionMethod = "SETPROTOCOLVERSION"
Expand Down Expand Up @@ -87,6 +88,7 @@ var (
InterceptCommitMethod: handleInterceptCommit(),
MergeObjectsMethod: handleMerge(),
DisableCKPMethod: handleDisableCheckpoint(),
DiskCleanerMethod: handleDiskCleaner(),

GetProtocolVersionMethod: handleGetProtocolVersion,
SetProtocolVersionMethod: handleSetProtocolVersion,
Expand Down
12 changes: 12 additions & 0 deletions pkg/txn/storage/tae/storage_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ func (s *taeStorage) Debug(ctx context.Context,
})
}
return resp.Read()
case uint32(api.OpCode_OpDiskDiskCleaner):
_, err := handleRead(ctx, txnMeta, data, s.taeHandler.HandleDiskCleaner)
if err != nil {
resp := protoc.MustMarshal(&api.TNStringResponse{
ReturnStr: "Failed!" + err.Error(),
})
return resp, err
}
resp := protoc.MustMarshal(&api.TNStringResponse{
ReturnStr: "OK",
})
return resp, nil
default:
return nil, moerr.NewNotSupportedNoCtx("TAEStorage not support ctl method %d", opCode)
}
Expand Down
21 changes: 18 additions & 3 deletions pkg/vm/engine/tae/db/gc/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type checkpointCleaner struct {
// checker is to check whether the checkpoint can be consumed
checker struct {
sync.RWMutex
extras []func(item any) bool
extras map[string]func(item any) bool
}

// delWorker is a worker that deletes s3‘s objects or local
Expand Down Expand Up @@ -118,6 +118,7 @@ func NewCheckpointCleaner(
cleaner.snapshotMeta = logtail.NewSnapshotMeta()
cleaner.option.enableGC = true
cleaner.mPool = common.DebugAllocator
cleaner.checker.extras = make(map[string]func(item any) bool)
return cleaner
}

Expand Down Expand Up @@ -911,10 +912,24 @@ func (c *checkpointCleaner) checkExtras(item any) bool {
return true
}

func (c *checkpointCleaner) AddChecker(checker func(item any) bool) {
// AddChecker add&update a checker to the cleaner,return the number of checkers
// key is the unique identifier of the checker
func (c *checkpointCleaner) AddChecker(checker func(item any) bool, key string) int {
c.checker.Lock()
defer c.checker.Unlock()
c.checker.extras = append(c.checker.extras, checker)
c.checker.extras[key] = checker
return len(c.checker.extras)
}

// RemoveChecker remove a checker from the cleaner,return true if the checker is removed successfully
func (c *checkpointCleaner) RemoveChecker(key string) error {
c.checker.Lock()
defer c.checker.Unlock()
if len(c.checker.extras) == 1 {
return moerr.NewCantDelGCCheckerNoCtx()
}
delete(c.checker.extras, key)
return nil
}

func (c *checkpointCleaner) createNewInput(
Expand Down
13 changes: 12 additions & 1 deletion pkg/vm/engine/tae/db/gc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ const (
GCAttrVersion = "version"
)

const (
AddChecker = "add_checker"
RemoveChecker = "remove_checker"
)

const (
CheckerKeyTTL = "ttl"
CheckerKeyMinTS = "min_ts"
)

var (
BlockSchemaAttr = []string{
GCAttrObjectName,
Expand Down Expand Up @@ -143,7 +153,8 @@ type Cleaner interface {
Replay() error
Process()
TryGC() error
AddChecker(checker func(item any) bool)
AddChecker(checker func(item any) bool, key string) int
RemoveChecker(key string) error
GetMaxConsumed() *checkpoint.CheckpointEntry
Stop()
// for test
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/db/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, e
ts := types.BuildTS(time.Now().UTC().UnixNano()-int64(opts.GCCfg.GCTTL), 0)
endTS := checkpoint.GetEnd()
return !endTS.GreaterEq(&ts)
})
}, gc2.CheckerKeyTTL)
db.DiskCleaner = gc2.NewDiskCleaner(cleaner)
db.DiskCleaner.Start()
// Init gc manager at last
Expand Down
14 changes: 14 additions & 0 deletions pkg/vm/engine/tae/db/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ func (m *FlushTable) UnmarshalBinary(data []byte) error {
return m.Unmarshal(data)
}

type DiskCleaner struct {
Op string
Key string
Value string
}

func (m *DiskCleaner) MarshalBinary() ([]byte, error) {
return m.Marshal()
}

func (m *DiskCleaner) UnmarshalBinary(data []byte) error {
return m.Unmarshal(data)
}

type Checkpoint struct {
FlushDuration time.Duration
Enable bool
Expand Down
Loading

0 comments on commit da1c567

Please sign in to comment.