diff --git a/pkg/vm/engine/tae/tables/jobs/flushTableTail.go b/pkg/vm/engine/tae/tables/jobs/flushTableTail.go index 3af370271680c..a1f482912b192 100644 --- a/pkg/vm/engine/tae/tables/jobs/flushTableTail.go +++ b/pkg/vm/engine/tae/tables/jobs/flushTableTail.go @@ -288,12 +288,12 @@ func (task *flushTableTailTask) Execute(ctx context.Context) (err error) { inst := time.Now() snapshotSubtasks, err := task.flushAObjsForSnapshot(ctx) statFlushAobj := time.Since(inst) - if err != nil { - return - } defer func() { releaseFlushObjTasks(snapshotSubtasks, err) }() + if err != nil { + return + } ///////////////////// //// phase seperator @@ -621,11 +621,6 @@ func (task *flushTableTailTask) mergeAObjs(ctx context.Context) (err error) { // flushAObjsForSnapshot schedule io task to flush aobjects for snapshot read. this function will not release any data in io task func (task *flushTableTailTask) flushAObjsForSnapshot(ctx context.Context) (subtasks []*flushObjTask, err error) { - defer func() { - if err != nil { - releaseFlushObjTasks(subtasks, err) - } - }() subtasks = make([]*flushObjTask, len(task.aObjMetas)) // fire flush task for i, obj := range task.aObjMetas { @@ -647,13 +642,16 @@ func (task *flushTableTailTask) flushAObjsForSnapshot(ctx context.Context) (subt if deletes, _, err = objData.CollectDeleteInRange( ctx, types.TS{}, task.txn.GetStartTS(), true, common.MergeAllocator, ); err != nil { + data.Close() return } if deletes != nil { // make sure every batch in deltaloc object is sorted by rowid - _, err := mergesort.SortBlockColumns(deletes.Vecs, 0, task.rt.VectorPool.Transient) + _, err = mergesort.SortBlockColumns(deletes.Vecs, 0, task.rt.VectorPool.Transient) if err != nil { - return nil, err + data.Close() + deletes.Close() + return } }