diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go
index 0701f3c..9539211 100644
--- a/disperser/batcher/batcher.go
+++ b/disperser/batcher/batcher.go
@@ -183,7 +183,20 @@ func (b *Batcher) Start(ctx context.Context) error {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
-			if ts, err := b.HandleSingleBatch(ctx); err != nil {
+			if ts, blobMetadata, err := b.HandleSingleBatch(ctx); err != nil {
+				for _, metadata := range blobMetadata {
+					meta, err := b.Queue.GetBlobMetadata(ctx, metadata.GetBlobKey())
+					if err != nil {
+						b.logger.Error("[batcher] failed to get blob metadata", "key", metadata.GetBlobKey(), "err", err)
+					} else {
+						if meta.BlobStatus == disperser.Failed {
+							b.logger.Info("[batcher] handle encode batch reach max retries", "key", metadata.GetBlobKey())
+							b.EncodingStreamer.RemoveEncodedBlob(metadata)
+							b.Queue.RemoveBlob(ctx, metadata)
+						}
+					}
+				}
+				b.EncodingStreamer.RemoveBatchingStatus(ts)
 				if errors.Is(err, errNoEncodedResults) {
 					b.logger.Debug("[batcher] no encoded results to make a batch with")
@@ -193,7 +206,20 @@
 			}
 		case <-batchTrigger.Notify:
 			ticker.Stop()
-			if ts, err := b.HandleSingleBatch(ctx); err != nil {
+			if ts, blobMetadata, err := b.HandleSingleBatch(ctx); err != nil {
+				for _, metadata := range blobMetadata {
+					meta, err := b.Queue.GetBlobMetadata(ctx, metadata.GetBlobKey())
+					if err != nil {
+						b.logger.Error("[batcher] failed to get blob metadata", "key", metadata.GetBlobKey(), "err", err)
+					} else {
+						if meta.BlobStatus == disperser.Failed {
+							b.logger.Info("[batcher] handle encode batch reach max retries", "key", metadata.GetBlobKey())
+							b.EncodingStreamer.RemoveEncodedBlob(metadata)
+							b.Queue.RemoveBlob(ctx, metadata)
+						}
+					}
+				}
+				b.EncodingStreamer.RemoveBatchingStatus(ts)
 				if errors.Is(err, errNoEncodedResults) {
 					b.logger.Debug("[batcher] no encoded results to make a batch with(Notified)")
@@ -267,7 +293,7 @@ func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.
 	return result.ErrorOrNil()
 }
 
-func (b *Batcher) HandleSingleBatch(ctx context.Context) (uint64, error) {
+func (b *Batcher) HandleSingleBatch(ctx context.Context) (uint64, []*disperser.BlobMetadata, error) {
 	log := b.logger
 	// start a timer
 	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
@@ -279,7 +305,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) (uint64, error) {
 	log.Info("[batcher] Creating batch", "ts", stageTimer)
 	batch, ts, err := b.EncodingStreamer.CreateBatch()
 	if err != nil {
-		return ts, err
+		return ts, nil, err
 	}
 	log.Info("[batcher] CreateBatch took", "duration", time.Since(stageTimer), "blobNum", len(batch.EncodedBlobs))
@@ -288,7 +314,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) (uint64, error) {
 	headerHash, err := batch.BatchHeader.GetBatchHeaderHash()
 	if err != nil {
 		_ = b.handleFailure(ctx, batch.BlobMetadata, FailBatchHeaderHash)
-		return ts, fmt.Errorf("HandleSingleBatch: error getting batch header hash: %w", err)
+		return ts, batch.BlobMetadata, fmt.Errorf("HandleSingleBatch: error getting batch header hash: %w", err)
 	}
 
 	proofs := make([]*merkletree.Proof, 0)
@@ -298,19 +324,19 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) (uint64, error) {
 		// generate inclusion proof
 		if blobIndex >= len(batch.BlobHeaders) {
 			_ = b.handleFailure(ctx, batch.BlobMetadata, FailBatchBlobIndex)
-			return ts, fmt.Errorf("HandleSingleBatch: error preparing kv data: blob header at index %d not found in batch", blobIndex)
+			return ts, batch.BlobMetadata, fmt.Errorf("HandleSingleBatch: error preparing kv data: blob header at index %d not found in batch", blobIndex)
 		}
 		blobHeader = batch.BlobHeaders[blobIndex]
 		blobHeaderHash, err := blobHeader.GetBlobHeaderHash()
 		if err != nil {
 			_ = b.handleFailure(ctx, batch.BlobMetadata, FailBatchBlobHeaderHash)
-			return ts, fmt.Errorf("HandleSingleBatch: failed to get blob header hash: %w", err)
+			return ts, batch.BlobMetadata, fmt.Errorf("HandleSingleBatch: failed to get blob header hash: %w", err)
 		}
 		merkleProof, err := batch.MerkleTree.GenerateProof(blobHeaderHash[:], 0)
 		if err != nil {
 			_ = b.handleFailure(ctx, batch.BlobMetadata, FailBatchProof)
-			return ts, fmt.Errorf("HandleSingleBatch: failed to generate blob header inclusion proof: %w", err)
+			return ts, batch.BlobMetadata, fmt.Errorf("HandleSingleBatch: failed to generate blob header inclusion proof: %w", err)
 		}
 		proofs = append(proofs, merkleProof)
 	}
@@ -320,21 +346,11 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) (uint64, error) {
 	stageTimer = time.Now()
 	batch.TxHash, err = b.Dispatcher.DisperseBatch(ctx, headerHash, batch.BatchHeader, batch.EncodedBlobs, batch.BlobHeaders)
 	if err != nil {
-		for _, metadata := range batch.BlobMetadata {
-			meta, err := b.Queue.GetBlobMetadata(ctx, metadata.GetBlobKey())
-			if err != nil {
-				log.Error("[batcher] failed to get blob metadata", "key", metadata.GetBlobKey(), "err", err)
-			} else {
-				if meta.BlobStatus == disperser.Failed {
-					log.Info("[batcher] disperse batch reach max retries", "key", metadata.GetBlobKey())
-					b.EncodingStreamer.RemoveEncodedBlob(metadata)
-					b.Queue.RemoveBlob(ctx, metadata)
-				}
-			}
-		}
-		_ = b.handleFailure(ctx, batch.BlobMetadata, FailBatchSubmitRoot)
-		return ts, err
+		// Keep the retry/failure accounting: without this call blobs never
+		// reach disperser.Failed, so the cleanup moved into Start() would
+		// never trigger for dispatch failures and the batch retries forever.
+		_ = b.handleFailure(ctx, batch.BlobMetadata, FailBatchSubmitRoot)
+		return ts, batch.BlobMetadata, err
 	}
 	log.Info("[batcher] DisperseBatch took", "duration", time.Since(stageTimer))
@@ -346,7 +362,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) (uint64, error) {
 		reties: 0,
 	}
 
-	return ts, nil
+	return ts, nil, nil
 }
 
 func (b *Batcher) HandleSignedBatch(ctx context.Context) error {
diff --git a/disperser/batcher/slice_signer.go b/disperser/batcher/slice_signer.go
index 5e3ef36..fd2bcc1 100644
--- a/disperser/batcher/slice_signer.go
+++ b/disperser/batcher/slice_signer.go
@@ -261,15 +261,13 @@ func (s *SliceSigner) waitBatchTxFinalized(ctx context.Context, batchInfo *SignI
 	if err != nil || len(dataUploadEvents) == 0 {
 		// batch is not confirmed
-		_ = s.handleFailure(ctx, batchInfo.batch.BlobMetadata, FailBatchReceipt)
-		s.EncodingStreamer.RemoveBatchingStatus(batchInfo.ts)
+		_ = s.handleFailure(ctx, batchInfo.batch.BlobMetadata, FailBatchReceipt, batchInfo.ts)
 		return err
 	}
 
 	for i := 1; i < len(dataUploadEvents); i++ {
 		if dataUploadEvents[i].Epoch.Cmp(dataUploadEvents[i-1].Epoch) != 0 {
-			_ = s.handleFailure(ctx, batchInfo.batch.BlobMetadata, FailBatchEpochMismatch)
-			s.EncodingStreamer.RemoveBatchingStatus(batchInfo.ts)
+			_ = s.handleFailure(ctx, batchInfo.batch.BlobMetadata, FailBatchEpochMismatch, batchInfo.ts)
 			return fmt.Errorf("epoch in one batch is mismatch: epoch %v, %v", dataUploadEvents[i].Epoch, dataUploadEvents[i-1].Epoch)
 		}
 	}
@@ -285,8 +283,7 @@
 	// 	signInfo.reties += 1
 	// 	s.pendingBatchesToSign = append(s.pendingBatchesToSign, signInfo)
 	// } else {
-	_ = s.handleFailure(ctx, batchInfo.batch.BlobMetadata, FailGetSigners)
-	s.EncodingStreamer.RemoveBatchingStatus(batchInfo.ts)
+	_ = s.handleFailure(ctx, batchInfo.batch.BlobMetadata, FailGetSigners, batchInfo.ts)
 	// }
 
 	return fmt.Errorf("failed to get signers from contract: %w", err)
@@ -410,7 +407,7 @@ func (s *SliceSigner) getSigners(epoch *big.Int, quorumId *big.Int) (map[eth_com
 	return hm, nil
 }
 
-func (s *SliceSigner) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error {
+func (s *SliceSigner) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason, ts uint64) error {
 	var result *multierror.Error
 	for _, metadata := range blobMetadatas {
 		err := s.blobStore.HandleBlobFailure(ctx, metadata, s.MaxNumRetriesPerBlob)
@@ -423,6 +420,21 @@ func (s *SliceSigner) handleFailure(ctx context.Context, blobMetadatas []*disper
 	}
 	s.metrics.UpdateBatchError(reason, len(blobMetadatas))
 
+	for _, metadata := range blobMetadatas {
+		meta, err := s.blobStore.GetBlobMetadata(ctx, metadata.GetBlobKey())
+		if err != nil {
+			s.logger.Error("[signer] failed to get blob metadata", "key", metadata.GetBlobKey(), "err", err)
+		} else {
+			if meta.BlobStatus == disperser.Failed {
+				s.logger.Info("[signer] signing blob reach max retries", "key", metadata.GetBlobKey())
+				s.EncodingStreamer.RemoveEncodedBlob(metadata)
+				s.blobStore.RemoveBlob(ctx, metadata)
+			}
+		}
+	}
+
+	s.EncodingStreamer.RemoveBatchingStatus(ts)
+
 	// Return the error(s)
 	return result.ErrorOrNil()
 }
@@ -560,8 +572,7 @@ func (s *SliceSigner) aggregateSignature(ctx context.Context, signInfo *SignInfo
 		signInfo.reties += 1
 		s.pendingBatchesToSign = append(s.pendingBatchesToSign, signInfo)
 	} else {
-		_ = s.handleFailure(ctx, signInfo.batch.BlobMetadata, FailAggregateSignatures)
-		s.EncodingStreamer.RemoveBatchingStatus(signInfo.ts)
+		_ = s.handleFailure(ctx, signInfo.batch.BlobMetadata, FailAggregateSignatures, signInfo.ts)
 	}
 	return err
 }
@@ -685,22 +696,8 @@ func (s *SliceSigner) aggregateSignature(ctx context.Context, signInfo *SignInfo
 			s.pendingBatchesToSign = append(s.pendingBatchesToSign, signInfo)
 			s.logger.Warn("[signer] retry signing", "retries", signInfo.reties)
 		} else {
-			_ = s.handleFailure(ctx, signInfo.batch.BlobMetadata, FailAggregateSignatures)
-
-			for _, metadata := range signInfo.batch.BlobMetadata {
-				meta, err := s.blobStore.GetBlobMetadata(ctx, metadata.GetBlobKey())
-				if err != nil {
-					s.logger.Error("[signer] failed to get blob metadata", "key", metadata.GetBlobKey(), "err", err)
-				} else {
-					if meta.BlobStatus == disperser.Failed {
-						s.logger.Info("[signer] signing blob reach max retries", "key", metadata.GetBlobKey())
-						s.EncodingStreamer.RemoveEncodedBlob(metadata)
-						s.blobStore.RemoveBlob(ctx, metadata)
-					}
-				}
-			}
+			_ = s.handleFailure(ctx, signInfo.batch.BlobMetadata, FailAggregateSignatures, signInfo.ts)
 
-			s.EncodingStreamer.RemoveBatchingStatus(signInfo.ts)
 			return errors.New("failed aggregate signatures")
 		}
 	}