Skip to content

Commit

Permalink
chore(dataobj): deduplicate info about page metadata
Browse files Browse the repository at this point in the history
This change evenly splits information about encoding, compression, and
values across column and page metadata:

* Page metadata now only contains encoding type (previously value,
  encoding, and compression)

* Column metadata only contains value and compression type (no change).

Information about the encoding is stored at the page level to allow
different pages in the same column to have different encodings; Parquet
makes use of this to allow some pages to have dictionary encoding while
others are PLAIN encoded.
  • Loading branch information
rfratto committed Jan 8, 2025
1 parent 33669ab commit 14ff134
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 22 deletions.
7 changes: 3 additions & 4 deletions pkg/dataobj/internal/dataset/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
type (
// ColumnInfo describes a column.
ColumnInfo struct {
Name string // Name of the column, if any.
Type datasetmd.ValueType // Type of values in the column.
Name string // Name of the column, if any.
Type datasetmd.ValueType // Type of values in the column.
Compression datasetmd.CompressionType // Compression used for the column.

RowsCount int // Total number of rows in the column.
CompressedSize int // Total size of all pages in the column after compression.
UncompressedSize int // Total size of all pages in the column before compression.

Compression datasetmd.CompressionType // Compression used for the column.

Statistics *datasetmd.Statistics // Optional statistics for the column.
}
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/dataobj/internal/dataset/column_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import "github.com/grafana/loki/v3/pkg/dataobj/internal/result"
func iterMemColumn(col *MemColumn) result.Seq[Value] {
return result.Iter(func(yield func(Value) bool) error {
for _, page := range col.Pages {
for result := range iterMemPage(page) {
for result := range iterMemPage(page, col.Info.Type, col.Info.Compression) {
val, err := result.Value()
if err != nil {
return err
Expand Down
12 changes: 5 additions & 7 deletions pkg/dataobj/internal/dataset/page.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ type (
CRC32 uint32 // CRC32 checksum of the page after encoding and compression.
RowCount int // RowCount is the number of rows in the page, including NULLs.

Value datasetmd.ValueType // Type of values stored in the page.
Compression datasetmd.CompressionType // Compression used for values in the page.
Encoding datasetmd.EncodingType // Encoding used for values in the page.
Stats *datasetmd.Statistics // Optional statistics for the page.
Encoding datasetmd.EncodingType // Encoding used for values in the page.
Stats *datasetmd.Statistics // Optional statistics for the page.
}
)

Expand All @@ -52,7 +50,7 @@ var checksumTable = crc32.MakeTable(crc32.Castagnoli)

// reader returns a reader for decompressed page data. Reader returns an error
// if the CRC32 fails to validate.
func (p *MemPage) reader() (presence io.Reader, values io.ReadCloser, err error) {
func (p *MemPage) reader(compression datasetmd.CompressionType) (presence io.Reader, values io.ReadCloser, err error) {
if actual := crc32.Checksum(p.Data, checksumTable); p.Info.CRC32 != actual {
return nil, nil, fmt.Errorf("invalid CRC32 checksum %x, expected %x", actual, p.Info.CRC32)
}
Expand All @@ -67,7 +65,7 @@ func (p *MemPage) reader() (presence io.Reader, values io.ReadCloser, err error)
compressedDataReader = bytes.NewReader(p.Data[n+int(bitmapSize):])
)

switch p.Info.Compression {
switch compression {
case datasetmd.COMPRESSION_TYPE_UNSPECIFIED, datasetmd.COMPRESSION_TYPE_NONE:
return bitmapReader, io.NopCloser(compressedDataReader), nil

Expand All @@ -83,7 +81,7 @@ func (p *MemPage) reader() (presence io.Reader, values io.ReadCloser, err error)
return bitmapReader, newZstdReader(zr), nil
}

panic(fmt.Sprintf("dataset.MemPage.reader: unknown compression type %q", p.Info.Compression.String()))
panic(fmt.Sprintf("dataset.MemPage.reader: unknown compression type %q", compression.String()))
}

// zstdReader implements [io.ReadCloser] for a [zstd.Decoder].
Expand Down
4 changes: 1 addition & 3 deletions pkg/dataobj/internal/dataset/page_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,7 @@ func (b *pageBuilder) Flush() (*MemPage, error) {
CRC32: checksum,
RowCount: b.rows,

Value: b.opts.Value,
Compression: b.opts.Compression,
Encoding: b.opts.Encoding,
Encoding: b.opts.Encoding,

// TODO(rfratto): At the moment we don't compute stats because they're
// not going to be valuable in every scenario: the min/max values for log
Expand Down
8 changes: 4 additions & 4 deletions pkg/dataobj/internal/dataset/page_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

func iterMemPage(p *MemPage) result.Seq[Value] {
func iterMemPage(p *MemPage, valueType datasetmd.ValueType, compressionType datasetmd.CompressionType) result.Seq[Value] {
return result.Iter(func(yield func(Value) bool) error {
presenceReader, valuesReader, err := p.reader()
presenceReader, valuesReader, err := p.reader(compressionType)
if err != nil {
return fmt.Errorf("opening page for reading: %w", err)
}
defer valuesReader.Close()

presenceDec := newBitmapDecoder(bufio.NewReader(presenceReader))
valuesDec, ok := newValueDecoder(p.Info.Value, p.Info.Encoding, bufio.NewReader(valuesReader))
valuesDec, ok := newValueDecoder(valueType, p.Info.Encoding, bufio.NewReader(valuesReader))
if !ok {
return fmt.Errorf("no decoder available for %s/%s", p.Info.Value, p.Info.Encoding)
return fmt.Errorf("no decoder available for %s/%s", valueType, p.Info.Encoding)
}

for {
Expand Down
4 changes: 1 addition & 3 deletions pkg/dataobj/internal/dataset/page_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,12 @@ func Test_pageBuilder_WriteRead(t *testing.T) {

page, err := b.Flush()
require.NoError(t, err)
require.Equal(t, datasetmd.VALUE_TYPE_STRING, page.Info.Value)

t.Log("Uncompressed size: ", page.Info.UncompressedSize)
t.Log("Compressed size: ", page.Info.CompressedSize)

var actual []string
for result := range iterMemPage(page) {
for result := range iterMemPage(page, opts.Value, opts.Compression) {
val, err := result.Value()
require.NoError(t, err)

Expand Down Expand Up @@ -76,7 +75,6 @@ func Test_pageBuilder_Fill(t *testing.T) {

page, err := buf.Flush()
require.NoError(t, err)
require.Equal(t, datasetmd.VALUE_TYPE_INT64, page.Info.Value)
require.Equal(t, page.Info.UncompressedSize, page.Info.CompressedSize)

t.Log("Uncompressed size: ", page.Info.UncompressedSize)
Expand Down

0 comments on commit 14ff134

Please sign in to comment.