diff --git a/pkg/dataobj/internal/dataset/dataset.go b/pkg/dataobj/internal/dataset/dataset.go new file mode 100644 index 0000000000000..6c1cbd2402bb8 --- /dev/null +++ b/pkg/dataobj/internal/dataset/dataset.go @@ -0,0 +1,3 @@ +// Package dataset contains utilities for working with datasets. Datasets hold +// columnar data across multiple pages. +package dataset diff --git a/pkg/dataobj/internal/dataset/value.go b/pkg/dataobj/internal/dataset/value.go new file mode 100644 index 0000000000000..fec2da8f70c24 --- /dev/null +++ b/pkg/dataobj/internal/dataset/value.go @@ -0,0 +1,125 @@ +package dataset + +import ( + "fmt" + "unsafe" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" +) + +// Helper types +type ( + stringptr *byte +) + +// A Value represents a single value within a dataset. Unlike [any], Values can +// be constructed without allocations. The zero Value corresponds to nil. +type Value struct { + // The internal representation of Value is based on log/slog.Value, which is + // also designed to avoid allocations. + // + // While usage of any typically causes an allocation (due to any being a fat + // pointer), our usage avoids it: + // + // * Go will avoid allocating integer values that can be stored in a single + // byte, which applies to datasetmd.ValueType. + // + // * If any is referring to a pointer, then wrapping the poitner in an any + // does not cause an allocation. This is why we use stringptr instead of a + // string. + + _ [0]func() // Disallow equality checking of two Values + + // num holds the value for numeric types, or the string length for string + // types. + num uint64 + + // If any is of type [datasetmd.ValueType], then the value is in num as + // described above. + // + // If any is of type stringptr, then the value is of type + // [datasetmd.VALUE_TYPE_STRING] and the string value consists of the length + // in num and the pointer in any. + any any +} + +// Int64Value rerturns a [Value] for an int64. +func Int64Value(v int64) Value { + return Value{ + num: uint64(v), + any: datasetmd.VALUE_TYPE_INT64, + } +} + +// Uint64Value returns a [Value] for a uint64. +func Uint64Value(v uint64) Value { + return Value{ + num: v, + any: datasetmd.VALUE_TYPE_UINT64, + } +} + +// StringValue returns a [Value] for a string. +func StringValue(v string) Value { + return Value{ + num: uint64(len(v)), + any: (stringptr)(unsafe.StringData(v)), + } +} + +// IsNil returns whether v is nil. +func (v Value) IsNil() bool { + return v.any == nil +} + +// IsZero reports whether v is the zero value. +func (v Value) IsZero() bool { + // If Value is a numeric type, v.num == 0 checks if it's the zero value. For + // string types, v.num == 0 means the string is empty. + return v.num == 0 +} + +// Type returns the [datasetmd.ValueType] of v. If v is nil, Type returns +// [datasetmd.VALUE_TYPE_UNSPECIFIED]. +func (v Value) Type() datasetmd.ValueType { + if v.IsNil() { + return datasetmd.VALUE_TYPE_UNSPECIFIED + } + + switch v := v.any.(type) { + case datasetmd.ValueType: + return v + case stringptr: + return datasetmd.VALUE_TYPE_STRING + default: + panic(fmt.Sprintf("dataset.Value has unexpected type %T", v)) + } +} + +// Int64 returns v's value as an int64. It panics if v is not a +// [datasetmd.VALUE_TYPE_INT64]. +func (v Value) Int64() int64 { + if expect, actual := datasetmd.VALUE_TYPE_INT64, v.Type(); expect != actual { + panic(fmt.Sprintf("dataset.Value type is %s, not %s", actual, expect)) + } + return int64(v.num) +} + +// Uint64 returns v's value as a uint64. It panics if v is not a +// [datasetmd.VALUE_TYPE_UINT64]. +func (v Value) Uint64() uint64 { + if expect, actual := datasetmd.VALUE_TYPE_UINT64, v.Type(); expect != actual { + panic(fmt.Sprintf("dataset.Value type is %s, not %s", actual, expect)) + } + return v.num +} + +// String returns v's value as a string. Because of Go's String method +// convention, if v is not a string, String returns a string of the form +// "VALUE_TYPE_T", where T is the underlying type of v. +func (v Value) String() string { + if sp, ok := v.any.(stringptr); ok { + return unsafe.String(sp, v.num) + } + return v.Type().String() +} diff --git a/pkg/dataobj/internal/dataset/value_encoding.go b/pkg/dataobj/internal/dataset/value_encoding.go new file mode 100644 index 0000000000000..b861a0142f29d --- /dev/null +++ b/pkg/dataobj/internal/dataset/value_encoding.go @@ -0,0 +1,123 @@ +package dataset + +import ( + "fmt" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" +) + +// A valueEncoder encodes sequences of [Value], writing them to an underlying +// [streamio.Writer]. Implementations of encoding types must call +// registerValueEncoding to register themselves. +type valueEncoder interface { + // ValueType returns the type of values supported by the valueEncoder. + ValueType() datasetmd.ValueType + + // EncodingType returns the encoding type used by the valueEncoder. + EncodingType() datasetmd.EncodingType + + // Encode encodes an individual [Value]. Encode returns an error if encoding + // fails or if value is an unsupported type. + Encode(value Value) error + + // Flush encodes any buffered data and immediately writes it to the + // underlying [streamio.Writer]. Flush returns an error if encoding fails. + Flush() error + + // Reset discards any state and resets the valueEncoder to write to w. This + // permits reusing a valueEncoder rather than allocating a new one. + Reset(w streamio.Writer) +} + +// A valueDecoder decodes sequences of [Value] from an underlying +// [streamio.Reader]. Implementations of encoding types must call +// registerValueEncoding to register themselves. +type valueDecoder interface { + // ValueType returns the type of values supported by the valueDecoder. + ValueType() datasetmd.ValueType + + // EncodingType returns the encoding type used by the valueDecoder. + EncodingType() datasetmd.EncodingType + + // Decode decodes an individual [Value]. Decode returns an error if decoding + // fails. + Decode() (Value, error) + + // Reset discards any state and resets the valueDecoder to read from r. This + // permits reusing a valueDecoder rather than allocating a new one. + Reset(r streamio.Reader) +} + +// registry stores known value encoders and decoders. We use a global variable +// to track implementations to allow encoding implementations to be +// self-contained in a single file. +var registry = map[registryKey]registryEntry{} + +type ( + registryKey struct { + Value datasetmd.ValueType + Encoding datasetmd.EncodingType + } + + registryEntry struct { + NewEncoder func(streamio.Writer) valueEncoder + NewDecoder func(streamio.Reader) valueDecoder + } +) + +// registerValueEncoding registers a [valueEncoder] and [valueDecoder] for a +// specified valueType and encodingType tuple. If another encoding has been +// registered for the same tuple, registerValueEncoding panics. +// +// registerValueEncoding should be called in an init method of files +// implementing encodings. +func registerValueEncoding( + valueType datasetmd.ValueType, + encodingType datasetmd.EncodingType, + newEncoder func(streamio.Writer) valueEncoder, + newDecoder func(streamio.Reader) valueDecoder, +) { + key := registryKey{ + Value: valueType, + Encoding: encodingType, + } + if _, exist := registry[key]; exist { + panic(fmt.Sprintf("dataset: registerValueEncoding already called for %s/%s", valueType, encodingType)) + } + + registry[key] = registryEntry{ + NewEncoder: newEncoder, + NewDecoder: newDecoder, + } +} + +// newValueEncoder creates a new valueEncoder for the specified valueType and +// encodingType. If no encoding is registered for the specified combination of +// valueType and encodingType, newValueEncoder returns nil and false. +func newValueEncoder(valueType datasetmd.ValueType, encodingType datasetmd.EncodingType, w streamio.Writer) (valueEncoder, bool) { + key := registryKey{ + Value: valueType, + Encoding: encodingType, + } + entry, exist := registry[key] + if !exist { + return nil, false + } + return entry.NewEncoder(w), true +} + +// newValueDecoder creates a new valueDecoder for the specified valueType and +// encodingType. If no encoding is registered for the specified combination of +// valueType and encodingType, vewValueDecoder returns nil and false. +func newValueDecoder(valueType datasetmd.ValueType, encodingType datasetmd.EncodingType, r streamio.Reader) (valueDecoder, bool) { + key := registryKey{ + Value: valueType, + Encoding: encodingType, + } + entry, exist := registry[key] + if !exist { + return nil, false + } + return entry.NewDecoder(r), true +} diff --git a/pkg/dataobj/internal/dataset/value_encoding_delta.go b/pkg/dataobj/internal/dataset/value_encoding_delta.go new file mode 100644 index 0000000000000..2028d2bbe6703 --- /dev/null +++ b/pkg/dataobj/internal/dataset/value_encoding_delta.go @@ -0,0 +1,110 @@ +package dataset + +import ( + "fmt" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" +) + +func init() { + // Register the encoding so instances of it can be dynamically created. + registerValueEncoding( + datasetmd.VALUE_TYPE_INT64, + datasetmd.ENCODING_TYPE_DELTA, + func(w streamio.Writer) valueEncoder { return newDeltaEncoder(w) }, + func(r streamio.Reader) valueDecoder { return newDeltaDecoder(r) }, + ) +} + +// deltaEncoder encodes delta-encoded int64s. Values are encoded as varint, +// with each subsequent value being the delta from the previous value. +type deltaEncoder struct { + w streamio.Writer + prev int64 +} + +var _ valueEncoder = (*deltaEncoder)(nil) + +// newDeltaEncoder creates a deltaEncoder that writes encoded numbers to w. +func newDeltaEncoder(w streamio.Writer) *deltaEncoder { + var enc deltaEncoder + enc.Reset(w) + return &enc +} + +// ValueType returns [datasetmd.VALUE_TYPE_INT64]. +func (enc *deltaEncoder) ValueType() datasetmd.ValueType { + return datasetmd.VALUE_TYPE_INT64 +} + +// EncodingType returns [datasetmd.ENCODING_TYPE_DELTA]. +func (enc *deltaEncoder) EncodingType() datasetmd.EncodingType { + return datasetmd.ENCODING_TYPE_DELTA +} + +// Encode encodes a new value. +func (enc *deltaEncoder) Encode(v Value) error { + if v.Type() != datasetmd.VALUE_TYPE_INT64 { + return fmt.Errorf("delta: invalid value type %v", v.Type()) + } + iv := v.Int64() + + delta := iv - enc.prev + enc.prev = iv + return streamio.WriteVarint(enc.w, delta) +} + +// Flush implements [valueEncoder]. It is a no-op for deltaEncoder. +func (enc *deltaEncoder) Flush() error { + return nil +} + +// Reset resets the encoder to its initial state. +func (enc *deltaEncoder) Reset(w streamio.Writer) { + enc.prev = 0 + enc.w = w +} + +// deltaDecoder decodes delta-encoded numbers. Values are decoded as varint, +// with each subsequent value being the delta from the previous value. +type deltaDecoder struct { + r streamio.Reader + prev int64 +} + +var _ valueDecoder = (*deltaDecoder)(nil) + +// newDeltaDecoder creates a deltaDecoder that reads encoded numbers from r. +func newDeltaDecoder(r streamio.Reader) *deltaDecoder { + var dec deltaDecoder + dec.Reset(r) + return &dec +} + +// ValueType returns [datasetmd.VALUE_TYPE_INT64]. +func (dec *deltaDecoder) ValueType() datasetmd.ValueType { + return datasetmd.VALUE_TYPE_INT64 +} + +// Type returns [datasetmd.ENCODING_TYPE_DELTA]. +func (dec *deltaDecoder) EncodingType() datasetmd.EncodingType { + return datasetmd.ENCODING_TYPE_DELTA +} + +// Decode decodes the next value. +func (dec *deltaDecoder) Decode() (Value, error) { + delta, err := streamio.ReadVarint(dec.r) + if err != nil { + return Int64Value(dec.prev), err + } + + dec.prev += delta + return Int64Value(dec.prev), nil +} + +// Reset resets the deltaDecoder to its initial state. +func (dec *deltaDecoder) Reset(r streamio.Reader) { + dec.prev = 0 + dec.r = r +} diff --git a/pkg/dataobj/internal/dataset/value_encoding_delta_test.go b/pkg/dataobj/internal/dataset/value_encoding_delta_test.go new file mode 100644 index 0000000000000..0837dbfe26bf5 --- /dev/null +++ b/pkg/dataobj/internal/dataset/value_encoding_delta_test.go @@ -0,0 +1,174 @@ +package dataset + +import ( + "bytes" + "math" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" +) + +func Test_delta(t *testing.T) { + numbers := []int64{ + 1234, + 543, + 2345, + 1432, + } + + var buf bytes.Buffer + + var ( + enc = newDeltaEncoder(&buf) + dec = newDeltaDecoder(&buf) + ) + + for _, num := range numbers { + require.NoError(t, enc.Encode(Int64Value(num))) + } + + var actual []int64 + for range len(numbers) { + v, err := dec.Decode() + require.NoError(t, err) + actual = append(actual, v.Int64()) + } + + require.Equal(t, numbers, actual) +} + +func Fuzz_delta(f *testing.F) { + f.Add(int64(775972800), 10) + f.Add(int64(758350800), 25) + + f.Fuzz(func(t *testing.T, seed int64, count int) { + if count <= 0 { + t.Skip() + } + + rnd := rand.New(rand.NewSource(seed)) + + var buf bytes.Buffer + + var ( + enc = newDeltaEncoder(&buf) + dec = newDeltaDecoder(&buf) + ) + + var numbers []int64 + for i := 0; i < count; i++ { + v := rnd.Int63() + numbers = append(numbers, v) + require.NoError(t, enc.Encode(Int64Value(v))) + } + + var actual []int64 + for i := 0; i < count; i++ { + v, err := dec.Decode() + require.NoError(t, err) + actual = append(actual, v.Int64()) + } + + require.Equal(t, numbers, actual) + }) +} + +func Benchmark_deltaEncoder_Encode(b *testing.B) { + b.Run("Sequential", func(b *testing.B) { + enc := newDeltaEncoder(streamio.Discard) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = enc.Encode(Int64Value(int64(i))) + } + }) + + b.Run("Largest delta", func(b *testing.B) { + enc := newDeltaEncoder(streamio.Discard) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if i%2 == 0 { + _ = enc.Encode(Int64Value(0)) + } else { + _ = enc.Encode(Int64Value(math.MaxInt64)) + } + } + }) + + b.Run("Random", func(b *testing.B) { + rnd := rand.New(rand.NewSource(0)) + enc := newDeltaEncoder(streamio.Discard) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _ = enc.Encode(Int64Value(rnd.Int63())) + } + }) +} + +func Benchmark_deltaDecoder_Decode(b *testing.B) { + b.Run("Sequential", func(b *testing.B) { + var buf bytes.Buffer + + var ( + enc = newDeltaEncoder(&buf) + dec = newDeltaDecoder(&buf) + ) + + for i := 0; i < b.N; i++ { + _ = enc.Encode(Int64Value(int64(i))) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = dec.Decode() + } + }) + + b.Run("Largest delta", func(b *testing.B) { + var buf bytes.Buffer + + var ( + enc = newDeltaEncoder(&buf) + dec = newDeltaDecoder(&buf) + ) + + for i := 0; i < b.N; i++ { + if i%2 == 0 { + _ = enc.Encode(Int64Value(0)) + } else { + _ = enc.Encode(Int64Value(math.MaxInt64)) + } + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = dec.Decode() + } + }) + + b.Run("Random", func(b *testing.B) { + rnd := rand.New(rand.NewSource(0)) + + var buf bytes.Buffer + + var ( + enc = newDeltaEncoder(&buf) + dec = newDeltaDecoder(&buf) + ) + + for i := 0; i < b.N; i++ { + _ = enc.Encode(Int64Value(rnd.Int63())) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = dec.Decode() + } + }) +} diff --git a/pkg/dataobj/internal/dataset/value_encoding_plain.go b/pkg/dataobj/internal/dataset/value_encoding_plain.go new file mode 100644 index 0000000000000..bd9692025ba57 --- /dev/null +++ b/pkg/dataobj/internal/dataset/value_encoding_plain.go @@ -0,0 +1,114 @@ +package dataset + +import ( + "encoding/binary" + "fmt" + "unsafe" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" +) + +func init() { + // Register the encoding so instances of it can be dynamically created. + registerValueEncoding( + datasetmd.VALUE_TYPE_STRING, + datasetmd.ENCODING_TYPE_PLAIN, + func(w streamio.Writer) valueEncoder { return newPlainEncoder(w) }, + func(r streamio.Reader) valueDecoder { return newPlainDecoder(r) }, + ) +} + +// A plainEncoder encodes string values to an [streamio.Writer]. +type plainEncoder struct { + w streamio.Writer +} + +var _ valueEncoder = (*plainEncoder)(nil) + +// newPlainEncoder creates a plainEncoder that writes encoded strings to w. +func newPlainEncoder(w streamio.Writer) *plainEncoder { + return &plainEncoder{w: w} +} + +// ValueType returns [datasetmd.VALUE_TYPE_STRING]. +func (enc *plainEncoder) ValueType() datasetmd.ValueType { + return datasetmd.VALUE_TYPE_STRING +} + +// EncodingType returns [datasetmd.ENCODING_TYPE_PLAIN]. +func (enc *plainEncoder) EncodingType() datasetmd.EncodingType { + return datasetmd.ENCODING_TYPE_PLAIN +} + +// Encode encodes an individual string value. +func (enc *plainEncoder) Encode(v Value) error { + if v.Type() != datasetmd.VALUE_TYPE_STRING { + return fmt.Errorf("plain: invalid value type %v", v.Type()) + } + sv := v.String() + + if err := streamio.WriteUvarint(enc.w, uint64(len(sv))); err != nil { + return err + } + + // This saves a few allocations by avoiding a copy of the string. + // Implementations of io.Writer are not supposed to modifiy the slice passed + // to Write, so this is generally safe. + _, err := enc.w.Write(unsafe.Slice(unsafe.StringData(sv), len(sv))) + return err +} + +// Flush implements [valueEncoder]. It is a no-op for plainEncoder. +func (enc *plainEncoder) Flush() error { + return nil +} + +// Reset implements [valueEncoder]. It resets the encoder to write to w. +func (enc *plainEncoder) Reset(w streamio.Writer) { + enc.w = w +} + +// plainDecoder decodes strings from an [streamio.Reader]. +type plainDecoder struct { + r streamio.Reader +} + +var _ valueDecoder = (*plainDecoder)(nil) + +// newPlainDecoder creates a plainDecoder that reads encoded strings from r. +func newPlainDecoder(r streamio.Reader) *plainDecoder { + return &plainDecoder{r: r} +} + +// ValueType returns [datasetmd.VALUE_TYPE_STRING]. +func (dec *plainDecoder) ValueType() datasetmd.ValueType { + return datasetmd.VALUE_TYPE_STRING +} + +// EncodingType returns [datasetmd.ENCODING_TYPE_PLAIN]. +func (dec *plainDecoder) EncodingType() datasetmd.EncodingType { + return datasetmd.ENCODING_TYPE_PLAIN +} + +// Decode decodes a string. +func (dec *plainDecoder) Decode() (Value, error) { + sz, err := binary.ReadUvarint(dec.r) + if err != nil { + return StringValue(""), err + } + + dst := make([]byte, int(sz)) + if _, err := dec.r.Read(dst); err != nil { + return StringValue(""), err + } else if len(dst) != int(sz) { + return StringValue(""), fmt.Errorf("short read; expected %d bytes, got %d", sz, len(dst)) + } + + return StringValue(string(dst)), nil +} + +// Reset implements [valueDecoder]. It resets the decoder to read from r. +func (dec *plainDecoder) Reset(r streamio.Reader) { + dec.r = r +} diff --git a/pkg/dataobj/internal/dataset/value_encoding_plain_test.go b/pkg/dataobj/internal/dataset/value_encoding_plain_test.go new file mode 100644 index 0000000000000..67819cf6fd956 --- /dev/null +++ b/pkg/dataobj/internal/dataset/value_encoding_plain_test.go @@ -0,0 +1,83 @@ +package dataset + +import ( + "bytes" + "errors" + "io" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" +) + +var testStrings = []string{ + "hello", + "world", + "foo", + "bar", + "baz", +} + +func Test_plainEncoder(t *testing.T) { + var buf bytes.Buffer + + var ( + enc = newPlainEncoder(&buf) + dec = newPlainDecoder(&buf) + ) + + for _, v := range testStrings { + require.NoError(t, enc.Encode(StringValue(v))) + } + + var out []string + + for { + str, err := dec.Decode() + if errors.Is(err, io.EOF) { + break + } else if err != nil { + t.Fatal(err) + } + out = append(out, str.String()) + } + + require.Equal(t, testStrings, out) +} + +func Benchmark_plainEncoder_Append(b *testing.B) { + enc := newPlainEncoder(streamio.Discard) + + for i := 0; i < b.N; i++ { + for _, v := range testStrings { + _ = enc.Encode(StringValue(v)) + } + } +} + +func Benchmark_plainDecoder_Decode(b *testing.B) { + buf := bytes.NewBuffer(make([]byte, 0, 1024)) // Large enough to avoid reallocations. + + var ( + enc = newPlainEncoder(buf) + dec = newPlainDecoder(buf) + ) + + for _, v := range testStrings { + require.NoError(b, enc.Encode(StringValue(v))) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + for { + var err error + _, err = dec.Decode() + if errors.Is(err, io.EOF) { + break + } else if err != nil { + b.Fatal(err) + } + } + } +} diff --git a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go new file mode 100644 index 0000000000000..a395f51a24713 --- /dev/null +++ b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go @@ -0,0 +1,131 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto + +package datasetmd + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + math "math" + strconv "strconv" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +// ValueType represents the valid types that values within a column can have. +type ValueType int32 + +const ( + // Invalid value type. + VALUE_TYPE_UNSPECIFIED ValueType = 0 + // VALUE_TYPE_INT64 is a column containing 64-bit integer values. + VALUE_TYPE_INT64 ValueType = 1 + // VALUE_TYPE_UINT64 is a column containing 64-bit unsigned integer values. + VALUE_TYPE_UINT64 ValueType = 2 + // VALUE_TYPE_STRING is a column containing string values. + VALUE_TYPE_STRING ValueType = 3 +) + +var ValueType_name = map[int32]string{ + 0: "VALUE_TYPE_UNSPECIFIED", + 1: "VALUE_TYPE_INT64", + 2: "VALUE_TYPE_UINT64", + 3: "VALUE_TYPE_STRING", +} + +var ValueType_value = map[string]int32{ + "VALUE_TYPE_UNSPECIFIED": 0, + "VALUE_TYPE_INT64": 1, + "VALUE_TYPE_UINT64": 2, + "VALUE_TYPE_STRING": 3, +} + +func (ValueType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_7ab9d5b21b743868, []int{0} +} + +// EncodingType represents the valid types that a sequence of values which a +// column can be encoded with. +type EncodingType int32 + +const ( + // Invalid encoding type. + ENCODING_TYPE_UNSPECIFIED EncodingType = 0 + // Plain encoding; data is stored as-is. + ENCODING_TYPE_PLAIN EncodingType = 1 + // Delta encoding. The first value within the page is stored as-is, and + // subsequent values are stored as the delta from the previous value. + ENCODING_TYPE_DELTA EncodingType = 2 +) + +var EncodingType_name = map[int32]string{ + 0: "ENCODING_TYPE_UNSPECIFIED", + 1: "ENCODING_TYPE_PLAIN", + 2: "ENCODING_TYPE_DELTA", +} + +var EncodingType_value = map[string]int32{ + "ENCODING_TYPE_UNSPECIFIED": 0, + "ENCODING_TYPE_PLAIN": 1, + "ENCODING_TYPE_DELTA": 2, +} + +func (EncodingType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_7ab9d5b21b743868, []int{1} +} + +func init() { + proto.RegisterEnum("dataobj.metadata.dataset.v1.ValueType", ValueType_name, ValueType_value) + proto.RegisterEnum("dataobj.metadata.dataset.v1.EncodingType", EncodingType_name, EncodingType_value) +} + +func init() { + proto.RegisterFile("pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto", fileDescriptor_7ab9d5b21b743868) +} + +var fileDescriptor_7ab9d5b21b743868 = []byte{ + // 296 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x2f, 0xc8, 0x4e, 0xd7, + 0x4f, 0x49, 0x2c, 0x49, 0xcc, 0x4f, 0xca, 0xd2, 0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, + 0xd1, 0xcf, 0x4d, 0x2d, 0x49, 0x04, 0x09, 0x82, 0x65, 0x8a, 0x53, 0x4b, 0x72, 0x53, 0x10, 0x2c, + 0xbd, 0x82, 0xa2, 0xfc, 0x92, 0x7c, 0x21, 0x69, 0xa8, 0x26, 0x3d, 0x98, 0x5a, 0x3d, 0xa8, 0x0a, + 0xbd, 0x32, 0x43, 0xad, 0x6c, 0x2e, 0xce, 0xb0, 0xc4, 0x9c, 0xd2, 0xd4, 0x90, 0xca, 0x82, 0x54, + 0x21, 0x29, 0x2e, 0xb1, 0x30, 0x47, 0x9f, 0x50, 0xd7, 0xf8, 0x90, 0xc8, 0x00, 0xd7, 0xf8, 0x50, + 0xbf, 0xe0, 0x00, 0x57, 0x67, 0x4f, 0x37, 0x4f, 0x57, 0x17, 0x01, 0x06, 0x21, 0x11, 0x2e, 0x01, + 0x24, 0x39, 0x4f, 0xbf, 0x10, 0x33, 0x13, 0x01, 0x46, 0x21, 0x51, 0x2e, 0x41, 0x64, 0x1d, 0x10, + 0x61, 0x26, 0x34, 0xe1, 0xe0, 0x90, 0x20, 0x4f, 0x3f, 0x77, 0x01, 0x66, 0xad, 0x78, 0x2e, 0x1e, + 0xd7, 0xbc, 0xe4, 0xfc, 0x94, 0xcc, 0xbc, 0x74, 0xb0, 0x7d, 0xb2, 0x5c, 0x92, 0xae, 0x7e, 0xce, + 0xfe, 0x2e, 0x9e, 0x7e, 0xee, 0xd8, 0xac, 0x14, 0xe7, 0x12, 0x46, 0x95, 0x0e, 0xf0, 0x71, 0xf4, + 0xf4, 0x13, 0x60, 0xc4, 0x94, 0x70, 0x71, 0xf5, 0x09, 0x71, 0x14, 0x60, 0x72, 0xaa, 0xb8, 0xf0, + 0x50, 0x8e, 0xe1, 0xc6, 0x43, 0x39, 0x86, 0x0f, 0x0f, 0xe5, 0x18, 0x1b, 0x1e, 0xc9, 0x31, 0xae, + 0x78, 0x24, 0xc7, 0x78, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, + 0xbe, 0x78, 0x24, 0xc7, 0xf0, 0xe1, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17, 0x1e, 0xcb, + 0x31, 0xdc, 0x78, 0x2c, 0xc7, 0x10, 0xe5, 0x94, 0x9e, 0x59, 0x92, 0x51, 0x9a, 0xa4, 0x97, 0x9c, + 0x9f, 0xab, 0x9f, 0x5e, 0x94, 0x98, 0x96, 0x98, 0x97, 0xa8, 0x9f, 0x93, 0x9f, 0x9d, 0xa9, 0x5f, + 0x66, 0xac, 0x4f, 0x64, 0xa0, 0x27, 0xb1, 0x81, 0xc3, 0xda, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, + 0x03, 0x3b, 0xd9, 0x58, 0xa6, 0x01, 0x00, 0x00, +} + +func (x ValueType) String() string { + s, ok := ValueType_name[int32(x)] + if ok { + return s + } + return strconv.Itoa(int(x)) +} +func (x EncodingType) String() string { + s, ok := EncodingType_name[int32(x)] + if ok { + return s + } + return strconv.Itoa(int(x)) +} diff --git a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto new file mode 100644 index 0000000000000..eeb300d03da49 --- /dev/null +++ b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto @@ -0,0 +1,35 @@ +// datasetmd.proto holds metadata types for storing columnar data. +syntax = "proto3"; + +package dataobj.metadata.dataset.v1; + +option go_package = "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"; + +// ValueType represents the valid types that values within a column can have. +enum ValueType { + // Invalid value type. + VALUE_TYPE_UNSPECIFIED = 0; + + // VALUE_TYPE_INT64 is a column containing 64-bit integer values. + VALUE_TYPE_INT64 = 1; + + // VALUE_TYPE_UINT64 is a column containing 64-bit unsigned integer values. + VALUE_TYPE_UINT64 = 2; + + // VALUE_TYPE_STRING is a column containing string values. + VALUE_TYPE_STRING = 3; +} + +// EncodingType represents the valid types that a sequence of values which a +// column can be encoded with. +enum EncodingType { + // Invalid encoding type. + ENCODING_TYPE_UNSPECIFIED = 0; + + // Plain encoding; data is stored as-is. + ENCODING_TYPE_PLAIN = 1; + + // Delta encoding. The first value within the page is stored as-is, and + // subsequent values are stored as the delta from the previous value. + ENCODING_TYPE_DELTA = 2; +} diff --git a/pkg/dataobj/internal/streamio/streamio.go b/pkg/dataobj/internal/streamio/streamio.go new file mode 100644 index 0000000000000..46ec96abca7e6 --- /dev/null +++ b/pkg/dataobj/internal/streamio/streamio.go @@ -0,0 +1,25 @@ +// Package streamio defines interfaces shared by other packages for streaming +// binary data. +package streamio + +import "io" + +// Reader is an interface that combines an [io.Reader] and an [io.ByteReader]. +type Reader interface { + io.Reader + io.ByteReader +} + +// Writer is an interface that combines an [io.Writer] and an [io.ByteWriter]. +type Writer interface { + io.Writer + io.ByteWriter +} + +// Discard is a [Writer] for which all calls succeed without doing anything. +var Discard Writer = discard{} + +type discard struct{} + +func (discard) Write(p []byte) (int, error) { return len(p), nil } +func (discard) WriteByte(_ byte) error { return nil } diff --git a/pkg/dataobj/internal/streamio/varint.go b/pkg/dataobj/internal/streamio/varint.go new file mode 100644 index 0000000000000..e8420605e8c45 --- /dev/null +++ b/pkg/dataobj/internal/streamio/varint.go @@ -0,0 +1,45 @@ +package streamio + +import ( + "encoding/binary" + "io" +) + +// [binary] does not have an implementation to write varints directly +// to a ByteWriter, and requires appending to a buffer. To allow dataobj +// encoders to stream values, we provide equivalent implementations of +// [binary.AppendUvarint] and [binary.AppendVarint] which accept a ByteWriter. + +// WriteVarint writes an encoded signed integer to w. +func WriteVarint(w io.ByteWriter, x int64) error { + // Like [binary.AppendVarint], we use zig-zag encoding so small negative + // values require fewer bytes. + // + // https://protobuf.dev/programming-guides/encoding/#signed-ints + ux := uint64(x) << 1 + if x < 0 { + ux = ^ux + } + return WriteUvarint(w, ux) +} + +// WriteUvarint writes an encoded unsigned integer to w. +func WriteUvarint(w io.ByteWriter, x uint64) error { + for x >= 0x80 { + if err := w.WriteByte(byte(x) | 0x80); err != nil { + return err + } + x >>= 7 + } + return w.WriteByte(byte(x)) +} + +// ReadVarint is a convenience wrapper around [binary.ReadVarint]. +func ReadVarint(r io.ByteReader) (int64, error) { + return binary.ReadVarint(r) +} + +// ReadUvarint is a convenience wrapper around [binary.ReadUvarint]. +func ReadUvarint(r io.ByteReader) (uint64, error) { + return binary.ReadUvarint(r) +}