Commit 5832349
This is an automated cherry-pick of pingcap#49021
Signed-off-by: ti-chi-bot <[email protected]>
lyzx2001 authored and ti-chi-bot committed Dec 11, 2023
1 parent b7a4d7b commit 5832349
Showing 4 changed files with 111 additions and 0 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/importer/BUILD.bazel
@@ -118,6 +118,7 @@ go_test(
"table_import_test.go",
"tidb_test.go",
],
data = glob(["testdata/**"]),
embed = [":importer"],
flaky = True,
shard_count = 50,
74 changes: 74 additions & 0 deletions br/pkg/lightning/importer/table_import.go
@@ -42,6 +42,7 @@ import (
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/lightning/web"
"github.com/pingcap/tidb/br/pkg/lightning/worker"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/pkg/errno"
tidbkv "github.com/pingcap/tidb/pkg/kv"
@@ -766,6 +767,12 @@ ChunkLoop:
break
}

if chunk.FileMeta.Type == mydump.SourceTypeParquet {
// TODO: use the compressed size of the chunk to conduct memory control
if _, err = getChunkCompressedSizeForParquet(ctx, chunk, rc.store); err != nil {
	return nil, errors.Trace(err)
}
}

restoreWorker := rc.regionWorkers.Apply()
wg.Add(1)
go func(w *worker.Worker, cr *chunkProcessor) {
@@ -1161,6 +1168,73 @@ func (tr *TableImporter) postProcess(
return true, nil
}

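// getChunkCompressedSizeForParquet opens the chunk's parquet file, reads its
// footer, and returns the total compressed size (summed over column chunks)
// of the largest row group, intended for the memory control mentioned in the
// TODO at the call site.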
func getChunkCompressedSizeForParquet(
ctx context.Context,
chunk *checkpoints.ChunkCheckpoint,
store storage.ExternalStorage,
) (int64, error) {
reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store, storage.DecompressConfig{})
if err != nil {
return 0, errors.Trace(err)
}
parser, err := mydump.NewParquetParser(ctx, store, reader, chunk.FileMeta.Path)
if err != nil {
_ = reader.Close()
return 0, errors.Trace(err)
}
//nolint: errcheck
defer parser.Close()
err = parser.Reader.ReadFooter()
if err != nil {
return 0, errors.Trace(err)
}
rowGroups := parser.Reader.Footer.GetRowGroups()
var maxRowGroupSize int64
for _, rowGroup := range rowGroups {
var rowGroupSize int64
columnChunks := rowGroup.GetColumns()
for _, columnChunk := range columnChunks {
columnChunkSize := columnChunk.MetaData.GetTotalCompressedSize()
rowGroupSize += columnChunkSize
}
maxRowGroupSize = max(maxRowGroupSize, rowGroupSize)
}
return maxRowGroupSize, nil
}
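
The TODO at the call site is left open by this commit. As a rough sketch only (not part of this change), the returned size could feed a weighted semaphore so that concurrent parquet chunks stay within a memory budget. parquetMemSem, acquireParquetMemory, and the 1 GiB figure are hypothetical, and golang.org/x/sync/semaphore would be an extra import:

var parquetMemSem = semaphore.NewWeighted(1 << 30) // hypothetical 1 GiB budget

// acquireParquetMemory reserves the chunk's largest row-group compressed size
// before processing and returns a release callback. Compressed size is only a
// proxy for the real decompression footprint, per the TODO at the call site.
func acquireParquetMemory(ctx context.Context, chunk *checkpoints.ChunkCheckpoint, store storage.ExternalStorage) (func(), error) {
	size, err := getChunkCompressedSizeForParquet(ctx, chunk, store)
	if err != nil {
		return nil, errors.Trace(err)
	}
	if err := parquetMemSem.Acquire(ctx, size); err != nil {
		return nil, errors.Trace(err)
	}
	return func() { parquetMemSem.Release(size) }, nil
}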

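// updateStatsMeta writes the imported row count into mysql.stats_meta for the
// given table inside a retried transaction, stamping the stats version with
// @@tidb_current_ts. Failures are logged as warnings rather than returned.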
func updateStatsMeta(ctx context.Context, db *sql.DB, tableID int64, count int) {
s := common.SQLWithRetry{
DB: db,
Logger: log.FromContext(ctx).With(zap.Int64("tableID", tableID)),
}
err := s.Transact(ctx, "update stats_meta", func(ctx context.Context, tx *sql.Tx) error {
rs, err := tx.ExecContext(ctx, `
update mysql.stats_meta
set modify_count = ?,
count = ?,
version = @@tidb_current_ts
where table_id = ?;
`, count, count, tableID)
if err != nil {
return errors.Trace(err)
}
affected, err := rs.RowsAffected()
if err != nil {
return errors.Trace(err)
}
if affected == 0 {
return errors.Errorf("record with table_id %d not found", tableID)
}
return nil
})
if err != nil {
s.Logger.Warn("failed to update stats_meta", zap.Error(err))
}
}
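
A usage sketch (assumed; the caller is not shown in this diff): afterImport, db, tableID, and totalRows are placeholder names for wherever post-import processing has a SQL handle and the final row count:

func afterImport(ctx context.Context, db *sql.DB, tableID int64, totalRows int) {
	// Best-effort: updateStatsMeta logs a warning on failure instead of
	// returning an error, so a stats miss never fails the import itself.
	updateStatsMeta(ctx, db, tableID, totalRows)
}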

func parseColumnPermutations(
tableInfo *model.TableInfo,
columns []string,
36 changes: 36 additions & 0 deletions br/pkg/lightning/importer/table_import_test.go
@@ -2420,3 +2420,39 @@ func TestGetDDLStatus(t *testing.T) {
require.Equal(t, model.JobStateRunning, status.state)
require.Equal(t, int64(123)+int64(456), status.rowCount)
}

func TestGetChunkCompressedSizeForParquet(t *testing.T) {
dir := "./testdata/"
fileName := "000000_0.parquet"
store, err := storage.NewLocalStorage(dir)
require.NoError(t, err)

dataFiles := make([]mydump.FileInfo, 0)
dataFiles = append(dataFiles, mydump.FileInfo{
TableName: filter.Table{Schema: "db", Name: "table"},
FileMeta: mydump.SourceFileMeta{
Path: fileName,
Type: mydump.SourceTypeParquet,
Compression: mydump.CompressionNone,
SortKey: "99",
FileSize: 192,
},
})

chunk := checkpoints.ChunkCheckpoint{
Key: checkpoints.ChunkCheckpointKey{Path: dataFiles[0].FileMeta.Path, Offset: 0},
FileMeta: dataFiles[0].FileMeta,
Chunk: mydump.Chunk{
Offset: 0,
EndOffset: 192,
PrevRowIDMax: 0,
RowIDMax: 100,
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

compressedSize, err := getChunkCompressedSizeForParquet(ctx, &chunk, store)
require.NoError(t, err)
require.Equal(t, int64(192), compressedSize)
}
Binary file added br/pkg/lightning/importer/testdata/000000_0.parquet
