Skip to content

Commit

Permalink
refactor tn migration 7 (#20821)
Browse files Browse the repository at this point in the history
refactor checkpoint-related code

Approved by: @LeftHandCold
  • Loading branch information
XuPeng-SH authored Dec 19, 2024
1 parent c3db9f5 commit c9d8e9f
Show file tree
Hide file tree
Showing 12 changed files with 790 additions and 615 deletions.
9 changes: 7 additions & 2 deletions pkg/vm/engine/tae/db/checkpoint/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func NewCheckpointEntry(sid string, start, end types.TS, typ EntryType) *Checkpo
}
}

// e.start >= o.end
func (e *CheckpointEntry) AllGE(o *CheckpointEntry) bool {
return e.start.GE(&o.end)
}

func (e *CheckpointEntry) SetVersion(version uint32) {
e.Lock()
defer e.Unlock()
Expand Down Expand Up @@ -111,8 +116,8 @@ func (e *CheckpointEntry) HasOverlap(from, to types.TS) bool {
}
return true
}
func (e *CheckpointEntry) LessEq(ts types.TS) bool {
return e.end.LE(&ts)
func (e *CheckpointEntry) LessEq(ts *types.TS) bool {
return e.end.LE(ts)
}
func (e *CheckpointEntry) SetLocation(cn, tn objectio.Location) {
e.Lock()
Expand Down
288 changes: 39 additions & 249 deletions pkg/vm/engine/tae/db/checkpoint/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,43 @@ package checkpoint

import (
"context"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal"
"time"

"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
)

type RunnerWriter interface {
RemoveCheckpointMetaFile(string)
AddCheckpointMetaFile(string)
UpdateCompacted(entry *CheckpointEntry)
}

type RunnerReader interface {
String() string
GetAllIncrementalCheckpoints() []*CheckpointEntry
GetAllGlobalCheckpoints() []*CheckpointEntry
GetPenddingIncrementalCount() int
GetGlobalCheckpointCount() int
CollectCheckpointsInRange(ctx context.Context, start, end types.TS) (ckpLoc string, lastEnd types.TS, err error)
ICKPSeekLT(ts types.TS, cnt int) []*CheckpointEntry
MaxGlobalCheckpoint() *CheckpointEntry
GetLowWaterMark() types.TS
MaxLSN() uint64
GetCatalog() *catalog.Catalog
GetCheckpointMetaFiles() map[string]struct{}
RemoveCheckpointMetaFile(string)
AddCheckpointMetaFile(string)
ICKPRange(start, end *types.TS, cnt int) []*CheckpointEntry
GetCompacted() *CheckpointEntry
UpdateCompacted(entry *CheckpointEntry)
GetDriver() wal.Driver

// for test, delete in next phase
GetAllCheckpoints() []*CheckpointEntry
GetAllCheckpointsForBackup(compact *CheckpointEntry) []*CheckpointEntry

MaxGlobalCheckpoint() *CheckpointEntry
MaxIncrementalCheckpoint() *CheckpointEntry
GetDirtyCollector() logtail.Collector
}

func (r *runner) collectCheckpointMetadata(start, end types.TS, ckpLSN, truncateLSN uint64) *containers.Batch {
Expand Down Expand Up @@ -80,301 +90,81 @@ func (r *runner) collectCheckpointMetadata(start, end types.TS, ckpLSN, truncate
return bat
}
func (r *runner) GetAllIncrementalCheckpoints() []*CheckpointEntry {
r.storage.Lock()
snapshot := r.storage.incrementals.Copy()
r.storage.Unlock()
return snapshot.Items()
return r.store.GetAllIncrementalCheckpoints()
}

func (r *runner) GetAllGlobalCheckpoints() []*CheckpointEntry {
r.storage.Lock()
snapshot := r.storage.globals.Copy()
r.storage.Unlock()
return snapshot.Items()
return r.store.GetAllGlobalCheckpoints()
}

func (r *runner) MaxLSN() uint64 {
endTs := types.BuildTS(time.Now().UTC().UnixNano(), 0)
return r.source.GetMaxLSN(types.TS{}, endTs)
}

func (r *runner) MaxLSNInRange(end types.TS) uint64 {
return r.source.GetMaxLSN(types.TS{}, end)
}

func (r *runner) MaxGlobalCheckpoint() *CheckpointEntry {
r.storage.RLock()
defer r.storage.RUnlock()
global, _ := r.storage.globals.Max()
return global
return r.store.MaxGlobalCheckpoint()
}

func (r *runner) MaxIncrementalCheckpoint() *CheckpointEntry {
r.storage.RLock()
defer r.storage.RUnlock()
entry, _ := r.storage.incrementals.Max()
return entry
return r.store.MaxIncrementalCheckpoint()
}

func (r *runner) GetCatalog() *catalog.Catalog {
return r.catalog
}

func (r *runner) ICKPSeekLT(ts types.TS, cnt int) []*CheckpointEntry {
r.storage.Lock()
tree := r.storage.incrementals.Copy()
r.storage.Unlock()
it := tree.Iter()
ok := it.Seek(NewCheckpointEntry(r.rt.SID(), ts, ts, ET_Incremental))
incrementals := make([]*CheckpointEntry, 0)
if ok {
for len(incrementals) < cnt {
e := it.Item()
if !e.IsFinished() {
break
}
if e.start.LT(&ts) {
if !it.Next() {
break
}
continue
}
incrementals = append(incrementals, e)
if !it.Next() {
break
}
}
}
return incrementals
return r.store.ICKPSeekLT(ts, cnt)
}

func (r *runner) ICKPRange(start, end *types.TS, cnt int) []*CheckpointEntry {
r.storage.Lock()
tree := r.storage.incrementals.Copy()
r.storage.Unlock()
it := tree.Iter()
ok := it.Seek(NewCheckpointEntry(r.rt.SID(), *start, *start, ET_Incremental))
incrementals := make([]*CheckpointEntry, 0)
if ok {
for len(incrementals) < cnt {
e := it.Item()
if !e.IsFinished() {
break
}
if e.start.GE(start) && e.start.LT(end) {
incrementals = append(incrementals, e)
}
if !it.Next() {
break
}
}
}
return incrementals
return r.store.ICKPRange(start, end, cnt)
}

func (r *runner) GetCompacted() *CheckpointEntry {
r.storage.Lock()
defer r.storage.Unlock()
return r.storage.compacted.Load()
return r.store.GetCompacted()
}

func (r *runner) UpdateCompacted(entry *CheckpointEntry) {
r.storage.Lock()
defer r.storage.Unlock()
r.storage.compacted.Store(entry)
r.store.UpdateCompacted(entry)
}

// this API returns the min ts of all checkpoints
// the start of global checkpoint is always 0
func (r *runner) GetLowWaterMark() types.TS {
r.storage.RLock()
defer r.storage.RUnlock()
global, okG := r.storage.globals.Min()
incremental, okI := r.storage.incrementals.Min()
if !okG && !okI {
return types.TS{}
}
if !okG {
return incremental.start
}
if !okI {
return global.start
}
if global.end.LT(&incremental.start) {
return global.end
}
return incremental.start
return r.store.GetLowWaterMark()
}

func (r *runner) GetPenddingIncrementalCount() int {
entries := r.GetAllIncrementalCheckpoints()
global := r.MaxGlobalCheckpoint()

count := 0
for i := len(entries) - 1; i >= 0; i-- {
if global != nil && entries[i].end.LE(&global.end) {
break
}
if !entries[i].IsFinished() {
continue
}
count++
}
return count
return r.store.GetPenddingIncrementalCount()
}

func (r *runner) GetGlobalCheckpointCount() int {
r.storage.RLock()
defer r.storage.RUnlock()
return r.storage.globals.Len()
}

func (r *runner) getLastFinishedGlobalCheckpointLocked() *CheckpointEntry {
g, ok := r.storage.globals.Max()
if !ok {
return nil
}
if g.IsFinished() {
return g
}
it := r.storage.globals.Iter()
it.Seek(g)
defer it.Release()
if !it.Prev() {
return nil
}
return it.Item()
return r.store.GetGlobalCheckpointCount()
}

func (r *runner) GetAllCheckpoints() []*CheckpointEntry {
ckps := make([]*CheckpointEntry, 0)
var ts types.TS
r.storage.Lock()
g := r.getLastFinishedGlobalCheckpointLocked()
tree := r.storage.incrementals.Copy()
r.storage.Unlock()
if g != nil {
ts = g.GetEnd()
ckps = append(ckps, g)
}
pivot := NewCheckpointEntry(r.rt.SID(), ts.Next(), ts.Next(), ET_Incremental)
iter := tree.Iter()
defer iter.Release()
if ok := iter.Seek(pivot); ok {
for {
e := iter.Item()
if !e.IsFinished() {
break
}
ckps = append(ckps, e)
if !iter.Next() {
break
}
}
}
return ckps
return r.store.GetAllCheckpoints()
}

func (r *runner) GetAllCheckpointsForBackup(compact *CheckpointEntry) []*CheckpointEntry {
ckps := make([]*CheckpointEntry, 0)
var ts types.TS
if compact != nil {
ts = compact.GetEnd()
ckps = append(ckps, compact)
}
r.storage.Lock()
g := r.getLastFinishedGlobalCheckpointLocked()
tree := r.storage.incrementals.Copy()
r.storage.Unlock()
if g != nil {
if ts.IsEmpty() {
ts = g.GetEnd()
}
ckps = append(ckps, g)
}
pivot := NewCheckpointEntry(r.rt.SID(), ts.Next(), ts.Next(), ET_Incremental)
iter := tree.Iter()
defer iter.Release()
if ok := iter.Seek(pivot); ok {
for {
e := iter.Item()
if !e.IsFinished() {
break
}
ckps = append(ckps, e)
if !iter.Next() {
break
}
}
}
return ckps

return r.store.GetAllCheckpointsForBackup(compact)
}

func (r *runner) GCByTS(ctx context.Context, ts types.TS) error {
prev := r.gcTS.Load()
if prev == nil {
r.gcTS.Store(ts)
} else {
prevTS := prev.(types.TS)
if prevTS.LT(&ts) {
r.gcTS.Store(ts)
}
}
logutil.Debugf("GC %v", ts.ToString())
r.gcCheckpointQueue.Enqueue(struct{}{})
return nil
}

func (r *runner) getGCTS() types.TS {
prev := r.gcTS.Load()
if prev == nil {
return types.TS{}
}
return prev.(types.TS)
}

func (r *runner) getGCedTS() types.TS {
r.storage.RLock()
minGlobal, _ := r.storage.globals.Min()
minIncremental, _ := r.storage.incrementals.Min()
r.storage.RUnlock()
if minGlobal == nil {
return types.TS{}
}
if minIncremental == nil {
return minGlobal.end
}
if minIncremental.start.GE(&minGlobal.end) {
return minGlobal.end
}
return minIncremental.start
}

func (r *runner) getTSToGC() types.TS {
maxGlobal := r.MaxGlobalCheckpoint()
if maxGlobal == nil {
return types.TS{}
}
if maxGlobal.IsFinished() {
return maxGlobal.end.Prev()
}
globals := r.GetAllGlobalCheckpoints()
if len(globals) == 1 {
return types.TS{}
if _, updated := r.store.UpdateGCIntent(&ts); !updated {
// TODO
return nil
}
maxGlobal = globals[len(globals)-1]
return maxGlobal.end.Prev()
_, err := r.gcCheckpointQueue.Enqueue(struct{}{})
return err
}

func (r *runner) ExistPendingEntryToGC() bool {
_, needGC := r.getTSTOGC()
return needGC
}

func (r *runner) IsTSStale(ts types.TS) bool {
gcts := r.getGCTS()
if gcts.IsEmpty() {
return false
}
minValidTS := gcts.Physical() - r.options.globalVersionInterval.Nanoseconds()
return ts.Physical() < minValidTS
func (r *runner) GCNeeded() bool {
return r.store.GCNeeded()
}
Loading

0 comments on commit c9d8e9f

Please sign in to comment.