From 42a7c4506823ee5b96c9ada12cca79ca95541355 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 <51114574+jiangxinmeng1@users.noreply.github.com> Date: Sat, 14 Sep 2024 20:50:13 +0800 Subject: [PATCH] fix range checkpoint (#18799) 1. fix range checkpoint 2. add log Approved by: @XuPeng-SH, @sukki37 --- .../logstore/driver/logservicedriver/info.go | 20 +++++++------- .../driver/logservicedriver/truncate.go | 27 ++++++++++++++++--- .../engine/tae/logstore/store/checkpoint.go | 21 +++++++++++++++ 3 files changed, 55 insertions(+), 13 deletions(-) diff --git a/pkg/vm/engine/tae/logstore/driver/logservicedriver/info.go b/pkg/vm/engine/tae/logstore/driver/logservicedriver/info.go index 2443d70596a93..97e939d280d33 100644 --- a/pkg/vm/engine/tae/logstore/driver/logservicedriver/info.go +++ b/pkg/vm/engine/tae/logstore/driver/logservicedriver/info.go @@ -31,15 +31,14 @@ var ErrDriverLsnNotFound = moerr.NewInternalErrorNoCtx("driver info: driver lsn var ErrRetryTimeOut = moerr.NewInternalErrorNoCtx("driver info: retry time out") type driverInfo struct { - addr map[uint64]*common.ClosedIntervals //logservicelsn-driverlsn TODO drop on truncate - validLsn *roaring64.Bitmap - addrMu sync.RWMutex - driverLsn uint64 // - syncing uint64 - synced uint64 - syncedMu sync.RWMutex - driverLsnMu sync.RWMutex - + addr map[uint64]*common.ClosedIntervals //logservicelsn-driverlsn TODO drop on truncate + validLsn *roaring64.Bitmap + addrMu sync.RWMutex + driverLsn uint64 // + syncing uint64 + synced uint64 + syncedMu sync.RWMutex + driverLsnMu sync.RWMutex truncating atomic.Uint64 // truncatedLogserviceLsn uint64 // @@ -96,7 +95,7 @@ func (info *driverInfo) onReplayRecordEntry(lsn uint64, driverLsns *common.Close func (info *driverInfo) getNextValidLogserviceLsn(lsn uint64) uint64 { info.addrMu.Lock() defer info.addrMu.Unlock() - if info.validLsn.GetCardinality() == 0 { + if info.validLsn.IsEmpty() { return 0 } max := info.validLsn.Maximum() @@ -209,6 +208,7 @@ func (info *driverInfo) gcAddr(logserviceLsn uint64) { lsnToDelete = append(lsnToDelete, serviceLsn) } } + info.validLsn.RemoveRange(0, logserviceLsn) for _, lsn := range lsnToDelete { delete(info.addr, lsn) } diff --git a/pkg/vm/engine/tae/logstore/driver/logservicedriver/truncate.go b/pkg/vm/engine/tae/logstore/driver/logservicedriver/truncate.go index a947cbfda9067..cc789f972af7c 100644 --- a/pkg/vm/engine/tae/logstore/driver/logservicedriver/truncate.go +++ b/pkg/vm/engine/tae/logstore/driver/logservicedriver/truncate.go @@ -16,14 +16,17 @@ package logservicedriver import ( "context" + "time" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/logutil" + "go.uber.org/zap" // "time" ) // driver lsn -> entry lsn func (d *LogServiceDriver) Truncate(lsn uint64) error { + logutil.Info("TRACE-WAL-TRUNCATE", zap.Uint64(" driver start truncate", lsn)) if lsn > d.truncating.Load() { d.truncating.Store(lsn) } @@ -44,19 +47,33 @@ func (d *LogServiceDriver) onTruncate(items ...any) { } func (d *LogServiceDriver) doTruncate() { + t0 := time.Now() target := d.truncating.Load() lastServiceLsn := d.truncatedLogserviceLsn lsn := lastServiceLsn //TODO use valid lsn next := d.getNextValidLogserviceLsn(lsn) + loopCount := 0 for d.isToTruncate(next, target) { + loopCount++ lsn = next next = d.getNextValidLogserviceLsn(lsn) if next <= lsn { break } } + d.addrMu.RLock() + min := d.validLsn.Minimum() + max := d.validLsn.Maximum() + d.addrMu.RUnlock() + logutil.Info("TRACE-WAL-TRUNCATE-Get LogService lsn", + zap.Int("loop count", loopCount), + zap.Uint64("driver lsn", target), + zap.Uint64("min", min), + zap.Uint64("max", max), + zap.String("duration", time.Since(t0).String())) if lsn == lastServiceLsn { + logutil.Info("LogService Driver: retrun because logservice is small") return } d.truncateLogservice(lsn) @@ -65,7 +82,8 @@ func (d *LogServiceDriver) doTruncate() { } func (d *LogServiceDriver) truncateLogservice(lsn uint64) { - logutil.Infof("LogService Driver: Start Truncate %d", lsn) + logutil.Info("TRACE-WAL-TRUNCATE-Start Truncate", zap.Uint64("lsn", lsn)) + t0 := time.Now() client, err := d.clientPool.Get() if err == ErrClientPoolClosed { return @@ -101,7 +119,10 @@ func (d *LogServiceDriver) truncateLogservice(lsn uint64) { panic(err) } } - logutil.Infof("LogService Driver: Truncate %d successfully", lsn) + logutil.Info("TRACE-WAL-TRUNCATE-Truncate successfully", + zap.Uint64("lsn", lsn), + zap.String("duration", + time.Since(t0).String())) } func (d *LogServiceDriver) getLogserviceTruncate() (lsn uint64) { client, err := d.clientPool.Get() @@ -127,6 +148,6 @@ func (d *LogServiceDriver) getLogserviceTruncate() (lsn uint64) { panic(err) } } - logutil.Infof("Logservice Driver: Get Truncate %d", lsn) + logutil.Infof("TRACE-WAL-TRUNCATE-Get Truncate %d", lsn) return } diff --git a/pkg/vm/engine/tae/logstore/store/checkpoint.go b/pkg/vm/engine/tae/logstore/store/checkpoint.go index ccf17b46689d6..803fcfedd5cd9 100644 --- a/pkg/vm/engine/tae/logstore/store/checkpoint.go +++ b/pkg/vm/engine/tae/logstore/store/checkpoint.go @@ -15,13 +15,18 @@ package store import ( + "time" + + "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" driverEntry "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver/entry" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/entry" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm" + "go.uber.org/zap" ) func (w *StoreImpl) RangeCheckpoint(gid uint32, start, end uint64) (ckpEntry entry.Entry, err error) { + logutil.Info("TRACE-WAL-TRUNCATE-RangeCheckpoint", zap.Uint32("group", gid), zap.Uint64("lsn", end)) ckpEntry = w.makeRangeCheckpointEntry(gid, start, end) drentry, _, err := w.doAppend(GroupCKP, ckpEntry) if err == sm.ErrClose { @@ -58,6 +63,9 @@ func (w *StoreImpl) onLogCKPInfoQueue(items ...any) { if err != nil { panic(err) } + logutil.Info("TRACE-WAL-TRUNCATE-CKP-Entry", + zap.Uint32("group", e.Info.Checkpoints[0].Group), + zap.Uint64("lsn", e.Info.Checkpoints[0].Ranges.GetMax())) w.logCheckpointInfo(e.Info) } w.onCheckpoint() @@ -69,6 +77,7 @@ func (w *StoreImpl) onCheckpoint() { } func (w *StoreImpl) ckpCkp() { + t0 := time.Now() e := w.makeInternalCheckpointEntry() driverEntry, _, err := w.doAppend(GroupInternal, e) if err == sm.ErrClose { @@ -77,6 +86,8 @@ func (w *StoreImpl) ckpCkp() { if err != nil { panic(err) } + logutil.Info("TRACE-WAL-TRUNCATE-Internal-Entry", + zap.String("duration", time.Since(t0).String())) w.truncatingQueue.Enqueue(driverEntry) err = e.WaitDone() if err != nil { @@ -86,6 +97,7 @@ func (w *StoreImpl) ckpCkp() { } func (w *StoreImpl) onTruncatingQueue(items ...any) { + t0 := time.Now() for _, item := range items { e := item.(*driverEntry.Entry) err := e.WaitDone() @@ -94,7 +106,14 @@ func (w *StoreImpl) onTruncatingQueue(items ...any) { } w.logCheckpointInfo(e.Info) } + tTruncateEntry := time.Since(t0) + t0 = time.Now() gid, driverLsn := w.getDriverCheckpointed() + tGetDriverEntry := time.Since(t0) + logutil.Info("TRACE-WAL-TRUNCATE", + zap.String("wait truncating entry takes", tTruncateEntry.String()), + zap.String("get driver lsn takes", tGetDriverEntry.String()), + zap.Uint64("driver lsn", driverLsn)) if gid == 0 { return } @@ -113,7 +132,9 @@ func (w *StoreImpl) onTruncateQueue(items ...any) { lsn = w.driverCheckpointing.Load() err = w.driver.Truncate(lsn) } + t := time.Now() w.gcWalDriverLsnMap(lsn) + logutil.Info("TRACE-WAL-TRUNCATE-GC-Store", zap.String("duration", time.Since(t).String())) w.driverCheckpointed = lsn } }