From 040e3bfdba9fd3fe179e88fa5f6543e0b0ac237d Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 4 Sep 2024 16:06:35 +0200 Subject: [PATCH 1/5] Use franz-go fork for response parsing Signed-off-by: Dimitar Dimitrov --- go.mod | 2 + go.sum | 4 +- pkg/storage/ingest/reader.go | 50 +++++-- .../twmb/franz-go/pkg/kgo/client.go | 6 +- .../twmb/franz-go/pkg/kgo/compression.go | 2 + .../twmb/franz-go/pkg/kgo/source.go | 136 ++++++++++++------ vendor/modules.txt | 3 +- 7 files changed, 139 insertions(+), 64 deletions(-) diff --git a/go.mod b/go.mod index 6bf45cf4dc2..46162273ee6 100644 --- a/go.mod +++ b/go.mod @@ -304,3 +304,5 @@ replace github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc // Replacing prometheus/alertmanager with our fork. replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240625192351-66ec17e3aa45 + +replace github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240808160155-396d6f9c1eba diff --git a/go.sum b/go.sum index 65b173c87fa..a33bf4e9406 100644 --- a/go.sum +++ b/go.sum @@ -240,6 +240,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/digitalocean/godo v1.119.0 h1:dmFNQwSIAcH3z+FVovHLkazKDC2uA8oOlGvg5+H4vRw= github.com/digitalocean/godo v1.119.0/go.mod h1:WQVH83OHUy6gC4gXpEVQKtxTd4L5oCp+5OialidkPLY= +github.com/dimitarvdimitrov/franz-go v0.0.0-20240808160155-396d6f9c1eba h1:0qAbNCVz1hNcJyXjlBtBfSM6NIlOHhz6emcbwrN27k4= +github.com/dimitarvdimitrov/franz-go v0.0.0-20240808160155-396d6f9c1eba/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0= @@ -930,8 +932,6 @@ github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0h github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= -github.com/twmb/franz-go v1.17.1 h1:0LwPsbbJeJ9R91DPUHSEd4su82WJWcTY1Zzbgbg4CeQ= -github.com/twmb/franz-go v1.17.1/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8XMQBEC+60= github.com/twmb/franz-go/pkg/kadm v1.13.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0= github.com/twmb/franz-go/pkg/kfake v0.0.0-20240730205842-6b61d17383b1 h1:Vo6jYkEuHJugUPS52xmcu1LJgDcC4AXUMVwo8Ct0kfQ= diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 0358fde925c..8ddd341b7e7 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -890,7 +890,7 @@ func (r *concurrentFetchers) pollFetches(ctx context.Context) (result kgo.Fetche r.metrics.fetchedBytes.Add(float64(len(record.Value))) // TODO dimitarvdimitrov maybe use the same metric name as franz-go, but make sure we're not conflicting with the actual client; perhaps disable metrics there and just use our own if record.Offset <= r.lastReturnedRecord { trimUntil++ - return // don't finish the traces multiple times + spanlogger.FromContext(record.Context, 
r.logger).DebugLog("msg", "skipping record because it has already been returned", "offset", record.Offset) } r.tracer.OnFetchRecordUnbuffered(record, true) }) @@ -926,6 +926,20 @@ func (r *concurrentFetchers) fetchSingle(ctx context.Context, fw fetchWant, logg return newEmptyFetchResult(errUnknownPartitionLeader) } + req := r.buildFetchRequest(fw, leaderEpoch) + + resp, err := req.RequestWith(ctx, r.client.Broker(int(leaderID))) + if err != nil { + if errors.Is(err, context.Canceled) { + return newEmptyFetchResult(nil) + } + return newEmptyFetchResult(fmt.Errorf("fetching from kafka: %w", err)) + } + + return r.parseFetchResponse(fw.startOffset, resp) +} + +func (r *concurrentFetchers) buildFetchRequest(fw fetchWant, leaderEpoch int32) kmsg.FetchRequest { req := kmsg.NewFetchRequest() req.MinBytes = 1 req.Version = 13 @@ -944,22 +958,34 @@ func (r *concurrentFetchers) fetchSingle(ctx context.Context, fw fetchWant, logg reqTopic.Partitions = append(reqTopic.Partitions, reqPartition) req.Topics = append(req.Topics, reqTopic) + return req +} - resp, err := req.RequestWith(ctx, r.client.Broker(int(leaderID))) - if err != nil { - if errors.Is(err, context.Canceled) { - return newEmptyFetchResult(nil) - } - return newEmptyFetchResult(fmt.Errorf("fetching from kafka: %w", err)) - } +func (r *concurrentFetchers) parseFetchResponse(startOffset int64, resp *kmsg.FetchResponse) fetchResult { rawPartitionResp := resp.Topics[0].Partitions[0] // Here we ignore resp.ErrorCode. That error code was added for support for KIP-227 and is only set if we're using fetch sessions. We don't use fetch sessions. // We also ignore rawPartitionResp.PreferredReadReplica to keep the code simpler. We don't provide any rack in the FetchRequest, so the broker _probably_ doesn't have a recommended replica for us. - // TODO dimitarvdimitrov make this conditional on the kafka backend - for WS we use uncompressed bytes (sumRecordLengths), for kafka we use the size of the response (rawPartitionResp.RecordBatches) + parseOptions := kgo.ProcessFetchPartitionOptions{ + KeepControlRecords: false, + Offset: startOffset, + IsolationLevel: kgo.ReadUncommitted(), // we don't produce in transactions, but leaving this here so it's explicit. + Topic: r.topicName, + Partition: r.partitionID, + } + // TODO dimitarvdimitrov revisit metrics r.metrics.fetchesCompressedBytes.Add(float64(len(rawPartitionResp.RecordBatches))) // This doesn't include overhead in the response, but that should be small. - partition := processRespPartition(&rawPartitionResp, r.topicName) - partition.EachRecord(r.tracer.OnFetchRecordBuffered) // TODO dimitarvdimitrov we might end up buffering the same record multiple times - what happens then? 
-	return fetchResult{partition, sumRecordLengths(partition.Records)}
+
+	observeMetrics := func(m kgo.FetchBatchMetrics) {
+		brokerMeta := kgo.BrokerMetadata{} // leave it empty because kprom doesn't use it, and we don't exactly have all the metadata
+		r.metrics.kprom.OnFetchBatchRead(brokerMeta, r.topicName, r.partitionID, m)
+	}
+	partition, _ := kgo.ProcessRespPartition(parseOptions, &rawPartitionResp, observeMetrics)
+	partition.EachRecord(r.tracer.OnFetchRecordBuffered)
+
+	return fetchResult{
+		FetchPartition: partition,
+		fetchedBytes:   sumRecordLengths(partition.Records),
+	}
 }
 
 func sumRecordLengths(records []*kgo.Record) (sum int) {
diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/client.go b/vendor/github.com/twmb/franz-go/pkg/kgo/client.go
index 775a22e6ee2..197c39336da 100644
--- a/vendor/github.com/twmb/franz-go/pkg/kgo/client.go
+++ b/vendor/github.com/twmb/franz-go/pkg/kgo/client.go
@@ -78,8 +78,7 @@ type Client struct {
 	producer producer
 	consumer consumer
 
-	compressor   *compressor
-	decompressor *decompressor
+	compressor *compressor
 
 	coordinatorsMu sync.Mutex
 	coordinators   map[coordinatorKey]*coordinatorLoad
@@ -482,8 +481,7 @@ func NewClient(opts ...Opt) (*Client, error) {
 		bufPool: newBufPool(),
 		prsPool: newPrsPool(),
 
-		compressor:   compressor,
-		decompressor: newDecompressor(),
+		compressor: compressor,
 
 		coordinators: make(map[coordinatorKey]*coordinatorLoad),
 
diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go b/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go
index fe8ad645bbd..81d9d8a7e3b 100644
--- a/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go
+++ b/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go
@@ -235,6 +235,8 @@ type decompressor struct {
 	unzstdPool sync.Pool
 }
 
+var defaultDecompressor = newDecompressor()
+
 func newDecompressor() *decompressor {
 	d := &decompressor{
 		ungzPool: sync.Pool{
diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/source.go b/vendor/github.com/twmb/franz-go/pkg/kgo/source.go
index 0c475d14a94..85586a0a412 100644
--- a/vendor/github.com/twmb/franz-go/pkg/kgo/source.go
+++ b/vendor/github.com/twmb/franz-go/pkg/kgo/source.go
@@ -92,6 +92,26 @@ func (s *source) removeCursor(rm *cursor) {
 	}
 }
 
+type ProcessFetchPartitionOptions struct {
+	// KeepControlRecords sets the parser to keep control messages and return
+	// them with fetches, overriding the default that discards them.
+	//
+	// Generally, control messages are not useful. It is the same as kgo.KeepControlRecords().
+	KeepControlRecords bool
+
+	// Offset is the minimum offset for which we'll parse records. Records with lower offsets will not be parsed or returned.
+	Offset int64
+
+	// IsolationLevel controls whether or not to return uncommitted records. See kgo.IsolationLevel.
+	IsolationLevel IsolationLevel
+
+	// Topic is used to populate the Topic field of each Record.
+	Topic string
+
+	// Partition is used to populate the Partition field of each Record.
+	Partition int32
+}
+
 // cursor is where we are consuming from for an individual partition.
 type cursor struct {
 	topic     string
@@ -1068,7 +1088,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 				continue
 			}
 
-			fp := partOffset.processRespPartition(br, rp, s.cl.decompressor, s.cl.cfg.hooks)
+			fp := partOffset.processRespPartition(br, rp, s.cl.cfg.hooks)
 			if fp.Err != nil {
 				if moving := kmove.maybeAddFetchPartition(resp, rp, partOffset.from); moving {
 					strip(topic, partition, fp.Err)
@@ -1245,7 +1265,41 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 
 // processRespPartition processes all records in all potentially compressed
 // batches (or message sets).
-func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor, hooks hooks) FetchPartition {
+func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, hooks hooks) (fp FetchPartition) {
+	if rp.ErrorCode == 0 {
+		o.hwm = rp.HighWatermark
+	}
+	opts := ProcessFetchPartitionOptions{
+		KeepControlRecords: br.cl.cfg.keepControl,
+		Offset:             o.offset,
+		IsolationLevel:     IsolationLevel{br.cl.cfg.isolationLevel},
+		Topic:              o.from.topic,
+		Partition:          o.from.partition,
+	}
+	observeMetrics := func(m FetchBatchMetrics) {
+		hooks.each(func(h Hook) {
+			if h, ok := h.(HookFetchBatchRead); ok {
+				h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m)
+			}
+		})
+	}
+	fp, o.offset = ProcessRespPartition(opts, rp, observeMetrics)
+	if len(fp.Records) > 0 {
+		lastRecord := fp.Records[len(fp.Records)-1]
+		// We adjust the offset separately because it may be larger than the offset of the last record for compacted partitions.
+		o.lastConsumedEpoch = lastRecord.LeaderEpoch
+		o.lastConsumedTime = lastRecord.Timestamp
+	}
+
+	return fp
+}
+
+// ProcessRespPartition processes all records in all potentially compressed batches (or message sets).
+// ProcessRespPartition returns the FetchPartition and the last offset of the records processed. observeMetrics can be nil.
+// This is useful when issuing manual Fetch requests for records.
+// In case of a compacted partition, the last offset may be larger than the offset of the last record.
+// If the partition response is truncated and the partition was compacted, then the last offset is the offset of the last record.
+func ProcessRespPartition(o ProcessFetchPartitionOptions, rp *kmsg.FetchResponseTopicPartition, observeMetrics func(FetchBatchMetrics)) (FetchPartition, int64) { fp := FetchPartition{ Partition: rp.Partition, Err: kerr.ErrorForCode(rp.ErrorCode), @@ -1253,12 +1307,9 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon LastStableOffset: rp.LastStableOffset, LogStartOffset: rp.LogStartOffset, } - if rp.ErrorCode == 0 { - o.hwm = rp.HighWatermark - } var aborter aborter - if br.cl.cfg.isolationLevel == 1 { + if o.IsolationLevel.level == 1 { aborter = buildAborter(rp) } @@ -1349,10 +1400,10 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon default: fp.Err = fmt.Errorf("unknown magic %d; message offset is %d and length is %d, skipping and setting to next offset", magic, offset, length) - if next := offset + 1; next > o.offset { - o.offset = next + if next := offset + 1; next > o.Offset { + o.Offset = next } - return fp + return fp, o.Offset } if !check() { @@ -1367,30 +1418,27 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon case *kmsg.MessageV0: m.CompressedBytes = int(length) // for message sets, we include the message set overhead in length m.CompressionType = uint8(t.Attributes) & 0b0000_0111 - m.NumRecords, m.UncompressedBytes = o.processV0OuterMessage(&fp, t, decompressor) + m.NumRecords, m.UncompressedBytes = processV0OuterMessage(&o, &fp, t, defaultDecompressor) case *kmsg.MessageV1: m.CompressedBytes = int(length) m.CompressionType = uint8(t.Attributes) & 0b0000_0111 - m.NumRecords, m.UncompressedBytes = o.processV1OuterMessage(&fp, t, decompressor) + m.NumRecords, m.UncompressedBytes = processV1OuterMessage(&o, &fp, t, defaultDecompressor) case *kmsg.RecordBatch: m.CompressedBytes = len(t.Records) // for record batches, we only track the record batch length m.CompressionType = uint8(t.Attributes) & 0b0000_0111 - m.NumRecords, m.UncompressedBytes = o.processRecordBatch(&fp, t, aborter, decompressor) + m.NumRecords, m.UncompressedBytes = processRecordBatch(&o, &fp, t, aborter, defaultDecompressor) } if m.UncompressedBytes == 0 { m.UncompressedBytes = m.CompressedBytes } - hooks.each(func(h Hook) { - if h, ok := h.(HookFetchBatchRead); ok { - h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m) - } - }) + if observeMetrics != nil { + observeMetrics(m) + } } - - return fp + return fp, o.Offset } type aborter map[int64][]int64 @@ -1453,7 +1501,8 @@ func readRawRecords(n int, in []byte) []kmsg.Record { return rs } -func (o *cursorOffsetNext) processRecordBatch( +func processRecordBatch( + o *ProcessFetchPartitionOptions, fp *FetchPartition, batch *kmsg.RecordBatch, aborter aborter, @@ -1464,7 +1513,7 @@ func (o *cursorOffsetNext) processRecordBatch( return 0, 0 } lastOffset := batch.FirstOffset + int64(batch.LastOffsetDelta) - if lastOffset < o.offset { + if lastOffset < o.Offset { // If the last offset in this batch is less than what we asked // for, we got a batch that we entirely do not need. We can // avoid all work (although we should not get this batch). @@ -1496,15 +1545,15 @@ func (o *cursorOffsetNext) processRecordBatch( // either advance offsets or will set to nextAskOffset. 
nextAskOffset := lastOffset + 1 defer func() { - if numRecords == len(krecords) && o.offset < nextAskOffset { - o.offset = nextAskOffset + if numRecords == len(krecords) && o.Offset < nextAskOffset { + o.Offset = nextAskOffset } }() abortBatch := aborter.shouldAbortBatch(batch) for i := range krecords { record := recordToRecord( - o.from.topic, + o.Topic, fp.Partition, batch, &krecords[i], @@ -1528,14 +1577,10 @@ func (o *cursorOffsetNext) processRecordBatch( // this easy, but if not, we decompress and process each inner message as // either v0 or v1. We only expect the inner message to be v1, but technically // a crazy pipeline could have v0 anywhere. -func (o *cursorOffsetNext) processV1OuterMessage( - fp *FetchPartition, - message *kmsg.MessageV1, - decompressor *decompressor, -) (int, int) { +func processV1OuterMessage(o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV1, decompressor *decompressor) (int, int) { compression := byte(message.Attributes & 0x0003) if compression == 0 { - o.processV1Message(fp, message) + processV1Message(o, fp, message) return 1, 0 } @@ -1606,13 +1651,13 @@ out: case *kmsg.MessageV0: innerMessage.Offset = firstOffset + int64(i) innerMessage.Attributes |= int8(compression) - if !o.processV0Message(fp, innerMessage) { + if !processV0Message(o, fp, innerMessage) { return i, uncompressedBytes } case *kmsg.MessageV1: innerMessage.Offset = firstOffset + int64(i) innerMessage.Attributes |= int8(compression) - if !o.processV1Message(fp, innerMessage) { + if !processV1Message(o, fp, innerMessage) { return i, uncompressedBytes } } @@ -1620,7 +1665,8 @@ out: return len(innerMessages), uncompressedBytes } -func (o *cursorOffsetNext) processV1Message( +func processV1Message( + o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV1, ) bool { @@ -1632,21 +1678,22 @@ func (o *cursorOffsetNext) processV1Message( fp.Err = fmt.Errorf("unknown attributes on message %d", message.Attributes) return false } - record := v1MessageToRecord(o.from.topic, fp.Partition, message) + record := v1MessageToRecord(o.Topic, fp.Partition, message) o.maybeKeepRecord(fp, record, false) return true } // Processes an outer v0 message. We expect inner messages to be entirely v0 as // well, so this only tries v0 always. 
-func (o *cursorOffsetNext) processV0OuterMessage( +func processV0OuterMessage( + o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV0, decompressor *decompressor, ) (int, int) { compression := byte(message.Attributes & 0x0003) if compression == 0 { - o.processV0Message(fp, message) + processV0Message(o, fp, message) return 1, 0 // uncompressed bytes is 0; set to compressed bytes on return } @@ -1689,14 +1736,15 @@ func (o *cursorOffsetNext) processV0OuterMessage( innerMessage := &innerMessages[i] innerMessage.Attributes |= int8(compression) innerMessage.Offset = firstOffset + int64(i) - if !o.processV0Message(fp, innerMessage) { + if !processV0Message(o, fp, innerMessage) { return i, uncompressedBytes } } return len(innerMessages), uncompressedBytes } -func (o *cursorOffsetNext) processV0Message( +func processV0Message( + o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV0, ) bool { @@ -1708,7 +1756,7 @@ func (o *cursorOffsetNext) processV0Message( fp.Err = fmt.Errorf("unknown attributes on message %d", message.Attributes) return false } - record := v0MessageToRecord(o.from.topic, fp.Partition, message) + record := v0MessageToRecord(o.Topic, fp.Partition, message) o.maybeKeepRecord(fp, record, false) return true } @@ -1717,8 +1765,8 @@ func (o *cursorOffsetNext) processV0Message( // // If the record is being aborted or the record is a control record and the // client does not want to keep control records, this does not keep the record. -func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) { - if record.Offset < o.offset { +func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) { + if record.Offset < o.Offset { // We asked for offset 5, but that was in the middle of a // batch; we got offsets 0 thru 4 that we need to skip. return @@ -1726,7 +1774,7 @@ func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, a // We only keep control records if specifically requested. if record.Attrs.IsControl() { - abort = !o.from.keepControl + abort = !o.KeepControlRecords } if !abort { fp.Records = append(fp.Records, record) @@ -1734,9 +1782,7 @@ func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, a // The record offset may be much larger than our expected offset if the // topic is compacted. 
- o.offset = record.Offset + 1 - o.lastConsumedEpoch = record.LeaderEpoch - o.lastConsumedTime = record.Timestamp + o.Offset = record.Offset + 1 } /////////////////////////////// diff --git a/vendor/modules.txt b/vendor/modules.txt index bdcad4f094b..078e336570d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1141,7 +1141,7 @@ github.com/tklauser/go-sysconf # github.com/tklauser/numcpus v0.6.1 ## explicit; go 1.13 github.com/tklauser/numcpus -# github.com/twmb/franz-go v1.17.1 +# github.com/twmb/franz-go v1.17.1 => github.com/dimitarvdimitrov/franz-go v0.0.0-20240808160155-396d6f9c1eba ## explicit; go 1.21 github.com/twmb/franz-go/pkg/kbin github.com/twmb/franz-go/pkg/kerr @@ -1656,3 +1656,4 @@ sigs.k8s.io/yaml/goyaml.v3 # github.com/opentracing-contrib/go-stdlib => github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 # github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc v0.0.0-20231024023642-e9298576254f # github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240625192351-66ec17e3aa45 +# github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240808160155-396d6f9c1eba From db77cfba8d667b7211eaaa2248a38d381f422a5b Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 4 Sep 2024 16:11:10 +0200 Subject: [PATCH 2/5] Remove duplicate metrics Signed-off-by: Dimitar Dimitrov --- pkg/storage/ingest/reader.go | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 8ddd341b7e7..9840d6f51cd 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -887,7 +887,6 @@ func (r *concurrentFetchers) pollFetches(ctx context.Context) (result kgo.Fetche r.metrics.fetchWaitDuration.Observe(time.Since(waitStartTime).Seconds()) trimUntil := 0 f.EachRecord(func(record *kgo.Record) { - r.metrics.fetchedBytes.Add(float64(len(record.Value))) // TODO dimitarvdimitrov maybe use the same metric name as franz-go, but make sure we're not conflicting with the actual client; perhaps disable metrics there and just use our own if record.Offset <= r.lastReturnedRecord { trimUntil++ spanlogger.FromContext(record.Context, r.logger).DebugLog("msg", "skipping record because it has already been returned", "offset", record.Offset) @@ -962,7 +961,6 @@ func (r *concurrentFetchers) buildFetchRequest(fw fetchWant, leaderEpoch int32) } func (r *concurrentFetchers) parseFetchResponse(startOffset int64, resp *kmsg.FetchResponse) fetchResult { - rawPartitionResp := resp.Topics[0].Partitions[0] // Here we ignore resp.ErrorCode. That error code was added for support for KIP-227 and is only set if we're using fetch sessions. We don't use fetch sessions. // We also ignore rawPartitionResp.PreferredReadReplica to keep the code simpler. We don't provide any rack in the FetchRequest, so the broker _probably_ doesn't have a recommended replica for us. parseOptions := kgo.ProcessFetchPartitionOptions{ @@ -972,14 +970,13 @@ func (r *concurrentFetchers) parseFetchResponse(startOffset int64, resp *kmsg.Fe Topic: r.topicName, Partition: r.partitionID, } - // TODO dimitarvdimitrov revisit metrics - r.metrics.fetchesCompressedBytes.Add(float64(len(rawPartitionResp.RecordBatches))) // This doesn't include overhead in the response, but that should be small. 
observeMetrics := func(m kgo.FetchBatchMetrics) { brokerMeta := kgo.BrokerMetadata{} // leave it empty because kprom doesn't use it, and we don't exactly have all the metadata r.metrics.kprom.OnFetchBatchRead(brokerMeta, r.topicName, r.partitionID, m) } - partition, _ := kgo.ProcessRespPartition(parseOptions, &rawPartitionResp, observeMetrics) + // TODO dimitarvdimitrov assert the topic partitions we got + partition, _ := kgo.ProcessRespPartition(parseOptions, &resp.Topics[0].Partitions[0], observeMetrics) partition.EachRecord(r.tracer.OnFetchRecordBuffered) return fetchResult{ @@ -1677,8 +1674,6 @@ type readerMetrics struct { recordsPerFetch prometheus.Histogram fetchesErrors prometheus.Counter fetchesTotal prometheus.Counter - fetchedBytes prometheus.Counter - fetchesCompressedBytes prometheus.Counter fetchWaitDuration prometheus.Histogram fetchedDiscardedRecords prometheus.Counter strongConsistencyInstrumentation *StrongReadConsistencyInstrumentation[struct{}] @@ -1730,14 +1725,6 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetric Help: "How long a consumer spent waiting for a batch of records from the Kafka client. If fetching is faster than processing, then this will be close to 0.", NativeHistogramBucketFactor: 1.1, }), - fetchedBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_ingest_storage_reader_fetched_bytes_total", - Help: "Total number of record bytes fetched from Kafka by the consumer.", - }), - fetchesCompressedBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_ingest_storage_reader_fetches_compressed_bytes_total", - Help: "Total number of compressed bytes fetched from Kafka by the consumer.", - }), fetchedDiscardedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "cortex_ingest_storage_reader_fetched_discarded_records_total", Help: "Total number of records discarded by the consumer because they were already consumed.", From 6aead4226108dbcf2fa73c0b1c6ef20c40ab93ec Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 4 Sep 2024 16:48:12 +0200 Subject: [PATCH 3/5] Assert on the responses we get from kafka Signed-off-by: Dimitar Dimitrov --- pkg/storage/ingest/reader.go | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 9840d6f51cd..152d6e7d385 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -963,6 +963,15 @@ func (r *concurrentFetchers) buildFetchRequest(fw fetchWant, leaderEpoch int32) func (r *concurrentFetchers) parseFetchResponse(startOffset int64, resp *kmsg.FetchResponse) fetchResult { // Here we ignore resp.ErrorCode. That error code was added for support for KIP-227 and is only set if we're using fetch sessions. We don't use fetch sessions. // We also ignore rawPartitionResp.PreferredReadReplica to keep the code simpler. We don't provide any rack in the FetchRequest, so the broker _probably_ doesn't have a recommended replica for us. + + // Sanity check for the response we get. + // If we get something we didn't expect, maybe we're sending the wrong request or there's a bug in the kafka implementation. + // Even in case of errors we get the topic partition. 
+ err := assertResponseContainsPartition(resp, r.topicID, r.partitionID) + if err != nil { + return newEmptyFetchResult(err) + } + parseOptions := kgo.ProcessFetchPartitionOptions{ KeepControlRecords: false, Offset: startOffset, @@ -975,7 +984,7 @@ func (r *concurrentFetchers) parseFetchResponse(startOffset int64, resp *kmsg.Fe brokerMeta := kgo.BrokerMetadata{} // leave it empty because kprom doesn't use it, and we don't exactly have all the metadata r.metrics.kprom.OnFetchBatchRead(brokerMeta, r.topicName, r.partitionID, m) } - // TODO dimitarvdimitrov assert the topic partitions we got + partition, _ := kgo.ProcessRespPartition(parseOptions, &resp.Topics[0].Partitions[0], observeMetrics) partition.EachRecord(r.tracer.OnFetchRecordBuffered) @@ -985,6 +994,24 @@ func (r *concurrentFetchers) parseFetchResponse(startOffset int64, resp *kmsg.Fe } } +func assertResponseContainsPartition(resp *kmsg.FetchResponse, topicID kadm.TopicID, partitionID int32) error { + if topics := resp.Topics; len(topics) < 1 || topics[0].TopicID != topicID { + receivedTopicID := kadm.TopicID{} + if len(topics) > 0 { + receivedTopicID = topics[0].TopicID + } + return fmt.Errorf("didn't find expected topic %s in fetch response; received topic %s", topicID, receivedTopicID) + } + if partitions := resp.Topics[0].Partitions; len(partitions) < 1 || partitions[0].Partition != partitionID { + receivedPartitionID := int32(-1) + if len(partitions) > 0 { + receivedPartitionID = partitions[0].Partition + } + return fmt.Errorf("didn't find expected partition %d in fetch response; received partition %d", partitionID, receivedPartitionID) + } + return nil +} + func sumRecordLengths(records []*kgo.Record) (sum int) { for _, r := range records { sum += len(r.Value) From 9c7170f82419c4a83f7b5e00e889675a1cae647e Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 4 Sep 2024 16:50:39 +0200 Subject: [PATCH 4/5] Remove copied code Signed-off-by: Dimitar Dimitrov --- pkg/storage/ingest/reader.go | 300 ----------------------------------- 1 file changed, 300 deletions(-) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 152d6e7d385..f163545e623 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -3,13 +3,9 @@ package ingest import ( - "bytes" "container/list" "context" - "encoding/binary" "fmt" - "hash/crc32" - "io" "math" "strconv" "sync" @@ -20,13 +16,10 @@ import ( "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/services" - "github.com/klauspost/compress/s2" - "github.com/pierrec/lz4/v4" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/twmb/franz-go/pkg/kadm" - "github.com/twmb/franz-go/pkg/kbin" "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" @@ -1272,299 +1265,6 @@ func logCompletedFetch(logger log.Logger, f fetchResult, fetchStartTime time.Tim ) } -type readerFrom interface { - ReadFrom([]byte) error -} - -var crc32c = crc32.MakeTable(crc32.Castagnoli) // record crc's use Castagnoli table; for consuming/producing - -// processRespPartition processes all records in all potentially compressed -// batches (or message sets). 
-func processRespPartition(rp *kmsg.FetchResponseTopicPartition, topic string) kgo.FetchPartition { - fp := kgo.FetchPartition{ - Partition: rp.Partition, - Err: kerr.ErrorForCode(rp.ErrorCode), - HighWatermark: rp.HighWatermark, - LastStableOffset: rp.LastStableOffset, - LogStartOffset: rp.LogStartOffset, - } - - // A response could contain any of message v0, message v1, or record - // batches, and this is solely dictated by the magic byte (not the - // fetch response version). The magic byte is located at byte 17. - // - // 1 thru 8: int64 offset / first offset - // 9 thru 12: int32 length - // 13 thru 16: crc (magic 0 or 1), or partition leader epoch (magic 2) - // 17: magic - // - // We decode and validate similarly for messages and record batches, so - // we "abstract" away the high level stuff into a check function just - // below, and then switch based on the magic for how to process. - var ( - in = rp.RecordBatches - - r readerFrom - kind string - length int32 - lengthField *int32 - crcField *int32 - crcTable *crc32.Table - crcAt int - - check = func() bool { - // If we call into check, we know we have a valid - // length, so we should be at least able to parse our - // top level struct and validate the length and CRC. - if err := r.ReadFrom(in[:length]); err != nil { - fp.Err = fmt.Errorf("unable to read %s, not enough data", kind) - return false - } - if length := int32(len(in[12:length])); length != *lengthField { - fp.Err = fmt.Errorf("encoded length %d does not match read length %d", *lengthField, length) - return false - } - // We have already validated that the slice is at least - // 17 bytes, but our CRC may be later (i.e. RecordBatch - // starts at byte 21). Ensure there is at least space - // for a CRC. - if len(in) < crcAt { - fp.Err = fmt.Errorf("length %d is too short to allow for a crc", len(in)) - return false - } - if crcCalc := int32(crc32.Checksum(in[crcAt:length], crcTable)); crcCalc != *crcField { - fp.Err = fmt.Errorf("encoded crc %x does not match calculated crc %x", *crcField, crcCalc) - return false - } - return true - } - ) - - for len(in) > 17 && fp.Err == nil { - offset := int64(binary.BigEndian.Uint64(in)) - length = int32(binary.BigEndian.Uint32(in[8:])) - length += 12 // for the int64 offset we skipped and int32 length field itself - if len(in) < int(length) { - break - } - - switch magic := in[16]; magic { - case 0: - m := new(kmsg.MessageV0) - kind = "message v0" - lengthField = &m.MessageSize - crcField = &m.CRC - crcTable = crc32.IEEETable - crcAt = 16 - r = m - case 1: - m := new(kmsg.MessageV1) - kind = "message v1" - lengthField = &m.MessageSize - crcField = &m.CRC - crcTable = crc32.IEEETable - crcAt = 16 - r = m - case 2: - rb := new(kmsg.RecordBatch) - kind = "record batch" - lengthField = &rb.Length - crcField = &rb.CRC - crcTable = crc32c - crcAt = 21 - r = rb - - default: - fp.Err = fmt.Errorf("unknown magic %d; message offset is %d and length is %d, skipping and setting to next offset", magic, offset, length) - return fp - } - - if !check() { - break - } - - in = in[length:] - - switch t := r.(type) { - case *kmsg.MessageV0: - panic("unknown message type") - case *kmsg.MessageV1: - panic("unknown message type") - case *kmsg.RecordBatch: - _, _ = processRecordBatch(topic, &fp, t) - } - - } - - return fp -} - -func processRecordBatch( - topic string, - fp *kgo.FetchPartition, - batch *kmsg.RecordBatch, -) (int, int) { - if batch.Magic != 2 { - fp.Err = fmt.Errorf("unknown batch magic %d", batch.Magic) - return 0, 0 - } - - rawRecords := 
batch.Records - if compression := byte(batch.Attributes & 0x0007); compression != 0 { - var err error - if rawRecords, err = decompress(rawRecords, compression); err != nil { - return 0, 0 // truncated batch - } - } - - uncompressedBytes := len(rawRecords) - - numRecords := int(batch.NumRecords) - krecords := readRawRecords(numRecords, rawRecords) - - // KAFKA-5443: compacted topics preserve the last offset in a batch, - // even if the last record is removed, meaning that using offsets from - // records alone may not get us to the next offset we need to ask for. - // - // We only perform this logic if we did not consume a truncated batch. - // If we consume a truncated batch, then what was truncated could have - // been an offset we are interested in consuming. Even if our fetch did - // not advance this partition at all, we will eventually fetch from the - // partition and not have a truncated response, at which point we will - // either advance offsets or will set to nextAskOffset. - - for i := range krecords { - record := recordToRecord( - topic, - fp.Partition, - batch, - &krecords[i], - ) - fp.Records = append(fp.Records, record) - } - - return len(krecords), uncompressedBytes -} - -// recordToRecord converts a kmsg.RecordBatch's Record to a kgo Record. -func recordToRecord( - topic string, - partition int32, - batch *kmsg.RecordBatch, - record *kmsg.Record, -) *kgo.Record { - h := make([]kgo.RecordHeader, 0, len(record.Headers)) - for _, kv := range record.Headers { - h = append(h, kgo.RecordHeader{ - Key: kv.Key, - Value: kv.Value, - }) - } - - r := &kgo.Record{ - Key: record.Key, - Value: record.Value, - Headers: h, - Topic: topic, - Partition: partition, - // Attrs: kgo.RecordAttrs{uint8(batch.Attributes)}, - ProducerID: batch.ProducerID, - ProducerEpoch: batch.ProducerEpoch, - LeaderEpoch: batch.PartitionLeaderEpoch, - Offset: batch.FirstOffset + int64(record.OffsetDelta), - } - if r.Attrs.TimestampType() == 0 { - r.Timestamp = timeFromMillis(batch.FirstTimestamp + record.TimestampDelta64) - } else { - r.Timestamp = timeFromMillis(batch.MaxTimestamp) - } - return r -} - -func timeFromMillis(millis int64) time.Time { - return time.Unix(0, millis*1e6) -} - -// readRawRecords reads n records from in and returns them, returning early if -// there were partial records. 
-func readRawRecords(n int, in []byte) []kmsg.Record { - rs := make([]kmsg.Record, n) - for i := 0; i < n; i++ { - length, used := kbin.Varint(in) - total := used + int(length) - if used == 0 || length < 0 || len(in) < total { - return rs[:i] - } - if err := (&rs[i]).ReadFrom(in[:total]); err != nil { - return rs[:i] - } - in = in[total:] - } - return rs -} - -type codecType int8 - -const ( - codecNone codecType = iota - codecGzip // TODO dimitarvdimitrov add support - codecSnappy - codecLZ4 - codecZstd // TODO dimitarvdimitrov add support -) - -func decompress(src []byte, codec byte) ([]byte, error) { - switch codecType(codec) { - case codecNone: - return src, nil - case codecSnappy: - if len(src) > 16 && bytes.HasPrefix(src, xerialPfx) { - return xerialDecode(src) - } - return s2.Decode(nil, src) - case codecLZ4: - unlz4 := lz4.NewReader(nil) // TODO dimitarvdimitrov this is pooled in franz-go now, consider exposing the funcs there - unlz4.Reset(bytes.NewReader(src)) - out := new(bytes.Buffer) - if _, err := io.Copy(out, unlz4); err != nil { - return nil, err - } - return out.Bytes(), nil - default: - return nil, errors.New("unknown compression codec") - } -} - -var xerialPfx = []byte{130, 83, 78, 65, 80, 80, 89, 0} - -var errMalformedXerial = errors.New("malformed xerial framing") - -func xerialDecode(src []byte) ([]byte, error) { - // bytes 0-8: xerial header - // bytes 8-16: xerial version - // everything after: uint32 chunk size, snappy chunk - // we come into this function knowing src is at least 16 - src = src[16:] - var dst, chunk []byte - var err error - for len(src) > 0 { - if len(src) < 4 { - return nil, errMalformedXerial - } - size := int32(binary.BigEndian.Uint32(src)) - src = src[4:] - if size < 0 || len(src) < int(size) { - return nil, errMalformedXerial - } - if chunk, err = s2.Decode(chunk[:cap(chunk)], src[:size]); err != nil { - return nil, err - } - src = src[size:] - dst = append(dst, chunk...) - } - return dst, nil -} - type partitionCommitter struct { services.Service From 941b8418f58bf6b407b216397fb35c62466afc1e Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 4 Sep 2024 16:58:56 +0200 Subject: [PATCH 5/5] Change commit SHA Signed-off-by: Dimitar Dimitrov --- go.mod | 5 +++-- go.sum | 4 ++-- vendor/modules.txt | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 46162273ee6..0c58f3b22ad 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,6 @@ require ( github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/okzk/sdnotify v0.0.0-20240725214427-1c1fdd37c5ac - github.com/pierrec/lz4/v4 v4.1.21 github.com/prometheus/procfs v0.15.1 github.com/shirou/gopsutil/v4 v4.24.7 github.com/thanos-io/objstore v0.0.0-20240722162417-19b0c0f0ffd8 @@ -120,6 +119,7 @@ require ( github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/mitchellh/copystructure v1.0.0 // indirect github.com/mitchellh/reflectwalk v1.0.0 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pires/go-proxyproto v0.7.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/ryanuber/go-glob v1.0.0 // indirect @@ -305,4 +305,5 @@ replace github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc // Replacing prometheus/alertmanager with our fork. 
replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240625192351-66ec17e3aa45 -replace github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240808160155-396d6f9c1eba +// Replacing with a fork commit based on v1.17.1 with https://github.com/twmb/franz-go/pull/803 cherry-picked. +replace github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 diff --git a/go.sum b/go.sum index a33bf4e9406..88098f222e8 100644 --- a/go.sum +++ b/go.sum @@ -240,8 +240,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/digitalocean/godo v1.119.0 h1:dmFNQwSIAcH3z+FVovHLkazKDC2uA8oOlGvg5+H4vRw= github.com/digitalocean/godo v1.119.0/go.mod h1:WQVH83OHUy6gC4gXpEVQKtxTd4L5oCp+5OialidkPLY= -github.com/dimitarvdimitrov/franz-go v0.0.0-20240808160155-396d6f9c1eba h1:0qAbNCVz1hNcJyXjlBtBfSM6NIlOHhz6emcbwrN27k4= -github.com/dimitarvdimitrov/franz-go v0.0.0-20240808160155-396d6f9c1eba/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= +github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 h1:jszPVGeTr25QTJ/jWiT7eXnabc4R4itChxUVFSCLjRQ= +github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0= diff --git a/vendor/modules.txt b/vendor/modules.txt index 078e336570d..1f5222394df 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1141,7 +1141,7 @@ github.com/tklauser/go-sysconf # github.com/tklauser/numcpus v0.6.1 ## explicit; go 1.13 github.com/tklauser/numcpus -# github.com/twmb/franz-go v1.17.1 => github.com/dimitarvdimitrov/franz-go v0.0.0-20240808160155-396d6f9c1eba +# github.com/twmb/franz-go v1.17.1 => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 ## explicit; go 1.21 github.com/twmb/franz-go/pkg/kbin github.com/twmb/franz-go/pkg/kerr @@ -1656,4 +1656,4 @@ sigs.k8s.io/yaml/goyaml.v3 # github.com/opentracing-contrib/go-stdlib => github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 # github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc v0.0.0-20231024023642-e9298576254f # github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240625192351-66ec17e3aa45 -# github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240808160155-396d6f9c1eba +# github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9
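Note on the new API surface: PATCH 1/5 exports ProcessRespPartition and ProcessFetchPartitionOptions from the fork so that reader.go can parse raw fetch responses itself. A minimal sketch of how a caller can drive the exported parser (assuming the fork above; the parsePartition helper and the printing are illustrative, and the response is assumed to have been fetched for a single topic/partition as in fetchSingle):

package ingestsketch

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

// parsePartition turns the raw partition data of a FetchResponse into kgo records.
func parsePartition(resp *kmsg.FetchResponse, topic string, partition int32, startOffset int64) (kgo.FetchPartition, int64) {
	opts := kgo.ProcessFetchPartitionOptions{
		KeepControlRecords: false,                 // discard transaction markers, as parseFetchResponse does
		Offset:             startOffset,           // records below this offset are not returned
		IsolationLevel:     kgo.ReadUncommitted(), // same isolation level as parseFetchResponse
		Topic:              topic,                 // stamped onto each returned record
		Partition:          partition,
	}
	// The metrics callback may be nil; this one only reports batch sizes.
	observe := func(m kgo.FetchBatchMetrics) {
		fmt.Printf("batch: %d records, %d compressed / %d uncompressed bytes\n",
			m.NumRecords, m.CompressedBytes, m.UncompressedBytes)
	}
	return kgo.ProcessRespPartition(opts, &resp.Topics[0].Partitions[0], observe)
}

parseFetchResponse in this series discards the second return value and tracks offsets through fetchWant instead; per the doc comment on ProcessRespPartition, that offset can be larger than the last record's offset on compacted partitions, so a caller resuming consumption should prefer it over the last record's offset.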