Skip to content

Commit

Permalink
chore(dataobj): initial commit of value encoding
Browse files Browse the repository at this point in the history
This commit introduces the dataobj package with initial utilities for
encoding and decoding individual values within a "dataset." A dataset is
the generic representation of columnar data, with a dataset.Value being
one value in one page in one column.

This initial implementation includes two encodings:

* Plain encoding (for string values), and
* delta encoding (for signed integers).

A follow-up commit will introduce bitmap encoding for efficiently
bitpacking unsigned integers.

My initial prototype of dataobj used generics rather than the
dataset.Value wrapper. However, usage of generics made it difficult to
write utilities that operate on multiple columns. While dataset.Value
is slightly less type safe, it is significantly easier to work within
the scope of a dataset.

The encoding and decoding of values is implemented to support streaming
as much as possible: individual values can be encoded and passed
immediately to a compression block. Streaming values minimizes the
number of rows that need to be stored in memory at once on both the
write path and the read path. This contrasts with the design of
parquet-go, which primarily intends for an entire page of values to be
buffered in memory prior to encoding and compression. The streaming
approach trades off slightly slower performance for memory efficiency.
  • Loading branch information
rfratto committed Jan 6, 2025
1 parent 3b8d993 commit 0a19305
Show file tree
Hide file tree
Showing 11 changed files with 968 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/dataobj/internal/dataset/dataset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package dataset contains utilities for working with datasets. Datasets hold
// columnar data across multiple pages.
package dataset
125 changes: 125 additions & 0 deletions pkg/dataobj/internal/dataset/value.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package dataset

import (
"fmt"
"unsafe"

"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
)

// stringptr is a pointer to the first byte of a string's data. It is declared
// as its own named type so that a type switch can tell a string payload apart
// from any other value stored in an interface.
type stringptr *byte

// A Value represents a single value within a dataset. Unlike [any], Values can
// be constructed without allocations. The zero Value corresponds to nil.
type Value struct {
// The internal representation of Value is based on log/slog.Value, which is
// also designed to avoid allocations.
//
// While usage of any typically causes an allocation (due to any being a fat
// pointer), our usage avoids it:
//
// * Go will avoid allocating integer values that can be stored in a single
// byte, which applies to datasetmd.ValueType.
//
// * If any is referring to a pointer, then wrapping the pointer in an any
// does not cause an allocation. This is why we use stringptr instead of a
// string.

// A zero-length array of funcs occupies no storage but makes Value
// non-comparable: func values do not support ==, so comparing two Values
// with == fails to compile.
_ [0]func() // Disallow equality checking of two Values

// num holds the value for numeric types, or the string length for string
// types.
num uint64

// If any is of type [datasetmd.ValueType], then the value is in num as
// described above.
//
// If any is of type stringptr, then the value is of type
// [datasetmd.VALUE_TYPE_STRING] and the string value consists of the length
// in num and the pointer in any.
//
// If any is nil, the Value is the nil Value.
any any
}

// Int64Value returns a [Value] holding the signed integer v. The bits of v are
// stored in the unsigned num field and tagged with
// [datasetmd.VALUE_TYPE_INT64].
func Int64Value(v int64) Value {
	var res Value
	res.num = uint64(v)
	res.any = datasetmd.VALUE_TYPE_INT64
	return res
}

// Uint64Value returns a [Value] holding the unsigned integer v, tagged with
// [datasetmd.VALUE_TYPE_UINT64].
func Uint64Value(v uint64) Value {
	var res Value
	res.num = v
	res.any = datasetmd.VALUE_TYPE_UINT64
	return res
}

// StringValue returns a [Value] holding the string v. The string is not
// copied: the Value keeps a pointer to v's bytes together with v's length.
func StringValue(v string) Value {
	data := unsafe.StringData(v)
	return Value{
		any: stringptr(data),
		num: uint64(len(v)),
	}
}

// IsNil reports whether v is the nil Value, which is what the zero Value
// represents. Values built by Int64Value, Uint64Value, or StringValue are
// never nil, even when they hold 0 or the empty string.
func (v Value) IsNil() bool {
// Every constructor stores a typed marker in v.any, so only the zero Value
// has a nil interface here.
return v.any == nil
}

// IsZero reports whether v holds the zero value of its type: 0 for numeric
// Values and the empty string for string Values. Only v.num is inspected, so
// the nil Value also reports true.
func (v Value) IsZero() bool {
// If Value is a numeric type, v.num == 0 checks if it's the zero value. For
// string types, v.num == 0 means the string is empty.
return v.num == 0
}

// Type reports which [datasetmd.ValueType] v holds. The nil Value reports
// [datasetmd.VALUE_TYPE_UNSPECIFIED]. Type panics if v's internal state holds
// anything other than a value type tag or a string pointer.
func (v Value) Type() datasetmd.ValueType {
	switch payload := v.any.(type) {
	case nil:
		// Nothing stored: this is the nil Value.
		return datasetmd.VALUE_TYPE_UNSPECIFIED
	case datasetmd.ValueType:
		// Numeric values carry their type tag directly.
		return payload
	case stringptr:
		return datasetmd.VALUE_TYPE_STRING
	default:
		panic(fmt.Sprintf("dataset.Value has unexpected type %T", payload))
	}
}

// Int64 returns the int64 held by v. It panics if v's type is not
// [datasetmd.VALUE_TYPE_INT64].
func (v Value) Int64() int64 {
	want := datasetmd.VALUE_TYPE_INT64
	if got := v.Type(); got != want {
		panic(fmt.Sprintf("dataset.Value type is %s, not %s", got, want))
	}
	return int64(v.num)
}

// Uint64 returns the uint64 held by v. It panics if v's type is not
// [datasetmd.VALUE_TYPE_UINT64].
func (v Value) Uint64() uint64 {
	want := datasetmd.VALUE_TYPE_UINT64
	if got := v.Type(); got != want {
		panic(fmt.Sprintf("dataset.Value type is %s, not %s", got, want))
	}
	return v.num
}

// String returns the string held by v. To follow Go's String method
// convention it never panics: when v does not hold a string, the result is the
// name of v's type instead, in the form "VALUE_TYPE_T".
func (v Value) String() string {
	sp, isString := v.any.(stringptr)
	if !isString {
		return v.Type().String()
	}
	// Rebuild the string from the stored data pointer and the length in num.
	return unsafe.String(sp, v.num)
}
123 changes: 123 additions & 0 deletions pkg/dataobj/internal/dataset/value_encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package dataset

import (
"fmt"

"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
)

// A valueEncoder encodes sequences of [Value], writing them to an underlying
// [streamio.Writer]. Implementations of encoding types must call
// registerValueEncoding to register themselves so that newValueEncoder can
// construct them.
type valueEncoder interface {
// ValueType returns the type of values supported by the valueEncoder.
ValueType() datasetmd.ValueType

// EncodingType returns the encoding type used by the valueEncoder.
EncodingType() datasetmd.EncodingType

// Encode encodes an individual [Value]. Encode returns an error if encoding
// fails or if value is an unsupported type.
Encode(value Value) error

// Flush encodes any buffered data and immediately writes it to the
// underlying [streamio.Writer]. Flush returns an error if encoding fails.
// Implementations that do not buffer may treat Flush as a no-op.
Flush() error

// Reset discards any state and resets the valueEncoder to write to w. This
// permits reusing a valueEncoder rather than allocating a new one.
Reset(w streamio.Writer)
}

// A valueDecoder decodes sequences of [Value] from an underlying
// [streamio.Reader]. Implementations of encoding types must call
// registerValueEncoding to register themselves so that newValueDecoder can
// construct them.
type valueDecoder interface {
// ValueType returns the type of values supported by the valueDecoder.
ValueType() datasetmd.ValueType

// EncodingType returns the encoding type used by the valueDecoder.
EncodingType() datasetmd.EncodingType

// Decode decodes an individual [Value]. Decode returns an error if decoding
// fails. The interface does not specify the returned Value when the error
// is non-nil.
Decode() (Value, error)

// Reset discards any state and resets the valueDecoder to read from r. This
// permits reusing a valueDecoder rather than allocating a new one.
Reset(r streamio.Reader)
}

// registry stores known value encoders and decoders. We use a global variable
// to track implementations to allow encoding implementations to be
// self-contained in a single file.
//
// NOTE(review): the map is accessed without synchronization. This presumably
// relies on all writes happening from init functions, before any lookup runs —
// confirm that nothing registers encodings later.
var registry = map[registryKey]registryEntry{}

// registryKey identifies a registered encoding by the pair of value type and
// encoding type it handles.
type registryKey struct {
	Value    datasetmd.ValueType
	Encoding datasetmd.EncodingType
}

// registryEntry holds the constructors for the encoder and decoder registered
// under a registryKey.
type registryEntry struct {
	NewEncoder func(streamio.Writer) valueEncoder
	NewDecoder func(streamio.Reader) valueDecoder
}

// registerValueEncoding adds the newEncoder and newDecoder constructors to the
// registry under the (valueType, encodingType) pair. It panics if that pair
// has already been registered.
//
// Files implementing an encoding should call registerValueEncoding from an
// init function.
func registerValueEncoding(
	valueType datasetmd.ValueType,
	encodingType datasetmd.EncodingType,
	newEncoder func(streamio.Writer) valueEncoder,
	newDecoder func(streamio.Reader) valueDecoder,
) {
	key := registryKey{Value: valueType, Encoding: encodingType}

	// Duplicate registration is a programming error, so fail loudly.
	_, registered := registry[key]
	if registered {
		panic(fmt.Sprintf("dataset: registerValueEncoding already called for %s/%s", valueType, encodingType))
	}

	registry[key] = registryEntry{NewEncoder: newEncoder, NewDecoder: newDecoder}
}

// newValueEncoder looks up the encoding registered for valueType and
// encodingType and returns a new valueEncoder that writes to w. If nothing is
// registered for that combination, it returns nil and false.
func newValueEncoder(valueType datasetmd.ValueType, encodingType datasetmd.EncodingType, w streamio.Writer) (valueEncoder, bool) {
	if entry, ok := registry[registryKey{Value: valueType, Encoding: encodingType}]; ok {
		return entry.NewEncoder(w), true
	}
	return nil, false
}

// newValueDecoder looks up the encoding registered for valueType and
// encodingType and returns a new valueDecoder that reads from r. If nothing is
// registered for that combination, newValueDecoder returns nil and false.
func newValueDecoder(valueType datasetmd.ValueType, encodingType datasetmd.EncodingType, r streamio.Reader) (valueDecoder, bool) {
	if entry, ok := registry[registryKey{Value: valueType, Encoding: encodingType}]; ok {
		return entry.NewDecoder(r), true
	}
	return nil, false
}
110 changes: 110 additions & 0 deletions pkg/dataobj/internal/dataset/value_encoding_delta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package dataset

import (
"fmt"

"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
)

func init() {
// Register the encoding so instances of it can be dynamically created.
registerValueEncoding(
datasetmd.VALUE_TYPE_INT64,
datasetmd.ENCODING_TYPE_DELTA,
func(w streamio.Writer) valueEncoder { return newDeltaEncoder(w) },
func(r streamio.Reader) valueDecoder { return newDeltaDecoder(r) },
)
}

// deltaEncoder encodes delta-encoded int64s. Values are encoded as varint,
// with each subsequent value being the delta from the previous value. The
// first value after a Reset is encoded as its delta from 0.
type deltaEncoder struct {
// w is the destination for encoded deltas.
w streamio.Writer
// prev is the most recently encoded value; Reset sets it back to 0.
prev int64
}

// Compile-time check that *deltaEncoder implements valueEncoder.
var _ valueEncoder = (*deltaEncoder)(nil)

// newDeltaEncoder returns a deltaEncoder, in its initial state, that writes
// encoded numbers to w.
func newDeltaEncoder(w streamio.Writer) *deltaEncoder {
	enc := new(deltaEncoder)
	enc.Reset(w)
	return enc
}

// ValueType implements [valueEncoder]. A deltaEncoder always reports
// [datasetmd.VALUE_TYPE_INT64].
func (*deltaEncoder) ValueType() datasetmd.ValueType { return datasetmd.VALUE_TYPE_INT64 }

// EncodingType implements [valueEncoder]. A deltaEncoder always reports
// [datasetmd.ENCODING_TYPE_DELTA].
func (*deltaEncoder) EncodingType() datasetmd.EncodingType { return datasetmd.ENCODING_TYPE_DELTA }

// Encode writes v to the underlying writer as the varint-encoded difference
// between v and the previously encoded value. It returns an error if v is not
// a [datasetmd.VALUE_TYPE_INT64] or if the write fails.
func (enc *deltaEncoder) Encode(v Value) error {
	if typ := v.Type(); typ != datasetmd.VALUE_TYPE_INT64 {
		return fmt.Errorf("delta: invalid value type %v", typ)
	}

	// Remember the new value before writing; the next delta is relative to it
	// whether or not the write succeeds.
	cur := v.Int64()
	last := enc.prev
	enc.prev = cur
	return streamio.WriteVarint(enc.w, cur-last)
}

// Flush implements [valueEncoder]. Encode writes every value immediately, so
// there is nothing buffered and Flush always returns nil.
func (*deltaEncoder) Flush() error { return nil }

// Reset returns the encoder to its initial state and points it at w. The
// first value encoded afterwards is written as its delta from 0.
func (enc *deltaEncoder) Reset(w streamio.Writer) {
	enc.w, enc.prev = w, 0
}

// deltaDecoder decodes delta-encoded numbers. Values are decoded as varint,
// with each subsequent value being the delta from the previous value. The
// first value after a Reset is decoded as a delta from 0.
type deltaDecoder struct {
// r is the source of encoded deltas.
r streamio.Reader
// prev is the most recently decoded value; Reset sets it back to 0.
prev int64
}

// Compile-time check that *deltaDecoder implements valueDecoder.
var _ valueDecoder = (*deltaDecoder)(nil)

// newDeltaDecoder returns a deltaDecoder, in its initial state, that reads
// encoded numbers from r.
func newDeltaDecoder(r streamio.Reader) *deltaDecoder {
	dec := new(deltaDecoder)
	dec.Reset(r)
	return dec
}

// ValueType implements [valueDecoder]. A deltaDecoder always reports
// [datasetmd.VALUE_TYPE_INT64].
func (*deltaDecoder) ValueType() datasetmd.ValueType { return datasetmd.VALUE_TYPE_INT64 }

// EncodingType implements [valueDecoder]. A deltaDecoder always reports
// [datasetmd.ENCODING_TYPE_DELTA].
func (*deltaDecoder) EncodingType() datasetmd.EncodingType { return datasetmd.ENCODING_TYPE_DELTA }

// Decode reads the next varint delta from the underlying reader, adds it to
// the previously decoded value, and returns the result. If the read fails,
// Decode returns the error together with the previously decoded value, and
// the decoder's state is left unchanged.
func (dec *deltaDecoder) Decode() (Value, error) {
	delta, err := streamio.ReadVarint(dec.r)
	if err == nil {
		// Only advance the running value when a delta was actually read.
		dec.prev += delta
	}
	return Int64Value(dec.prev), err
}

// Reset returns the decoder to its initial state and points it at r. The
// first value decoded afterwards is interpreted as a delta from 0.
func (dec *deltaDecoder) Reset(r streamio.Reader) {
	dec.r, dec.prev = r, 0
}
Loading

0 comments on commit 0a19305

Please sign in to comment.