Skip to content

Commit

Permalink
Optimize error handling (#45)
Browse files — browse the repository at this point in the history
  • Loading branch information
csdtowards authored Aug 6, 2024
1 parent 4a76e75 commit d357a32
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 47 deletions.
59 changes: 36 additions & 23 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,20 @@ func (b *Batcher) Start(ctx context.Context) error {
case <-ctx.Done():
return
case <-ticker.C:
if ts, err := b.HandleSingleBatch(ctx); err != nil {
if ts, blobMetadata, err := b.HandleSingleBatch(ctx); err != nil {
for _, metadata := range blobMetadata {
meta, err := b.Queue.GetBlobMetadata(ctx, metadata.GetBlobKey())
if err != nil {
b.logger.Error("[batcher] failed to get blob metadata", "key", metadata.GetBlobKey(), "err", err)
} else {
if meta.BlobStatus == disperser.Failed {
b.logger.Info("[batcher] handle encode batch reach max retries", "key", metadata.GetBlobKey())
b.EncodingStreamer.RemoveEncodedBlob(metadata)
b.Queue.RemoveBlob(ctx, metadata)
}
}
}

b.EncodingStreamer.RemoveBatchingStatus(ts)
if errors.Is(err, errNoEncodedResults) {
b.logger.Debug("[batcher] no encoded results to make a batch with")
Expand All @@ -193,7 +206,20 @@ func (b *Batcher) Start(ctx context.Context) error {
}
case <-batchTrigger.Notify:
ticker.Stop()
if ts, err := b.HandleSingleBatch(ctx); err != nil {
if ts, blobMetadata, err := b.HandleSingleBatch(ctx); err != nil {
for _, metadata := range blobMetadata {
meta, err := b.Queue.GetBlobMetadata(ctx, metadata.GetBlobKey())
if err != nil {
b.logger.Error("[batcher] failed to get blob metadata", "key", metadata.GetBlobKey(), "err", err)
} else {
if meta.BlobStatus == disperser.Failed {
b.logger.Info("[batcher] handle encode batch reach max retries", "key", metadata.GetBlobKey())
b.EncodingStreamer.RemoveEncodedBlob(metadata)
b.Queue.RemoveBlob(ctx, metadata)
}
}
}

b.EncodingStreamer.RemoveBatchingStatus(ts)
if errors.Is(err, errNoEncodedResults) {
b.logger.Debug("[batcher] no encoded results to make a batch with(Notified)")
Expand Down Expand Up @@ -267,7 +293,7 @@ func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.
return result.ErrorOrNil()
}

func (b *Batcher) HandleSingleBatch(ctx context.Context) (uint64, error) {
func (b *Batcher) HandleSingleBatch(ctx context.Context) (uint64, []*disperser.BlobMetadata, error) {
log := b.logger
// start a timer
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
Expand All @@ -279,7 +305,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) (uint64, error) {
log.Info("[batcher] Creating batch", "ts", stageTimer)
batch, ts, err := b.EncodingStreamer.CreateBatch()
if err != nil {
return ts, err
return ts, nil, err
}
log.Info("[batcher] CreateBatch took", "duration", time.Since(stageTimer), "blobNum", len(batch.EncodedBlobs))

Expand All @@ -288,7 +314,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) (uint64, error) {
headerHash, err := batch.BatchHeader.GetBatchHeaderHash()
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailBatchHeaderHash)
return ts, fmt.Errorf("HandleSingleBatch: error getting batch header hash: %w", err)
return ts, batch.BlobMetadata, fmt.Errorf("HandleSingleBatch: error getting batch header hash: %w", err)
}

proofs := make([]*merkletree.Proof, 0)
Expand All @@ -298,19 +324,19 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) (uint64, error) {
// generate inclusion proof
if blobIndex >= len(batch.BlobHeaders) {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailBatchBlobIndex)
return ts, fmt.Errorf("HandleSingleBatch: error preparing kv data: blob header at index %d not found in batch", blobIndex)
return ts, batch.BlobMetadata, fmt.Errorf("HandleSingleBatch: error preparing kv data: blob header at index %d not found in batch", blobIndex)
}
blobHeader = batch.BlobHeaders[blobIndex]

blobHeaderHash, err := blobHeader.GetBlobHeaderHash()
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailBatchBlobHeaderHash)
return ts, fmt.Errorf("HandleSingleBatch: failed to get blob header hash: %w", err)
return ts, batch.BlobMetadata, fmt.Errorf("HandleSingleBatch: failed to get blob header hash: %w", err)
}
merkleProof, err := batch.MerkleTree.GenerateProof(blobHeaderHash[:], 0)
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailBatchProof)
return ts, fmt.Errorf("HandleSingleBatch: failed to generate blob header inclusion proof: %w", err)
return ts, batch.BlobMetadata, fmt.Errorf("HandleSingleBatch: failed to generate blob header inclusion proof: %w", err)
}
proofs = append(proofs, merkleProof)
}
Expand All @@ -320,21 +346,8 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) (uint64, error) {
stageTimer = time.Now()
batch.TxHash, err = b.Dispatcher.DisperseBatch(ctx, headerHash, batch.BatchHeader, batch.EncodedBlobs, batch.BlobHeaders)
if err != nil {
for _, metadata := range batch.BlobMetadata {
meta, err := b.Queue.GetBlobMetadata(ctx, metadata.GetBlobKey())
if err != nil {
log.Error("[batcher] failed to get blob metadata", "key", metadata.GetBlobKey(), "err", err)
} else {
if meta.BlobStatus == disperser.Failed {
log.Info("[batcher] disperse batch reach max retries", "key", metadata.GetBlobKey())
b.EncodingStreamer.RemoveEncodedBlob(metadata)
b.Queue.RemoveBlob(ctx, metadata)
}
}
}

_ = b.handleFailure(ctx, batch.BlobMetadata, FailBatchSubmitRoot)
return ts, err
return ts, batch.BlobMetadata, err
}
log.Info("[batcher] DisperseBatch took", "duration", time.Since(stageTimer))

Expand All @@ -346,7 +359,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) (uint64, error) {
reties: 0,
}

return ts, nil
return ts, nil, nil
}

func (b *Batcher) HandleSignedBatch(ctx context.Context) error {
Expand Down
45 changes: 21 additions & 24 deletions disperser/batcher/slice_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,13 @@ func (s *SliceSigner) waitBatchTxFinalized(ctx context.Context, batchInfo *SignI

if err != nil || len(dataUploadEvents) == 0 {
// batch is not confirmed
_ = s.handleFailure(ctx, batchInfo.batch.BlobMetadata, FailBatchReceipt)
s.EncodingStreamer.RemoveBatchingStatus(batchInfo.ts)
_ = s.handleFailure(ctx, batchInfo.batch.BlobMetadata, FailBatchReceipt, batchInfo.ts)
return err
}

for i := 1; i < len(dataUploadEvents); i++ {
if dataUploadEvents[i].Epoch.Cmp(dataUploadEvents[i-1].Epoch) != 0 {
_ = s.handleFailure(ctx, batchInfo.batch.BlobMetadata, FailBatchEpochMismatch)
s.EncodingStreamer.RemoveBatchingStatus(batchInfo.ts)
_ = s.handleFailure(ctx, batchInfo.batch.BlobMetadata, FailBatchEpochMismatch, batchInfo.ts)
return fmt.Errorf("epoch in one batch is mismatch: epoch %v, %v", dataUploadEvents[i].Epoch, dataUploadEvents[i-1].Epoch)
}
}
Expand All @@ -285,8 +283,7 @@ func (s *SliceSigner) waitBatchTxFinalized(ctx context.Context, batchInfo *SignI
// signInfo.reties += 1
// s.pendingBatchesToSign = append(s.pendingBatchesToSign, signInfo)
// } else {
_ = s.handleFailure(ctx, batchInfo.batch.BlobMetadata, FailGetSigners)
s.EncodingStreamer.RemoveBatchingStatus(batchInfo.ts)
_ = s.handleFailure(ctx, batchInfo.batch.BlobMetadata, FailGetSigners, batchInfo.ts)
// }

return fmt.Errorf("failed to get signers from contract: %w", err)
Expand Down Expand Up @@ -410,7 +407,7 @@ func (s *SliceSigner) getSigners(epoch *big.Int, quorumId *big.Int) (map[eth_com
return hm, nil
}

func (s *SliceSigner) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error {
func (s *SliceSigner) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason, ts uint64) error {
var result *multierror.Error
for _, metadata := range blobMetadatas {
err := s.blobStore.HandleBlobFailure(ctx, metadata, s.MaxNumRetriesPerBlob)
Expand All @@ -423,6 +420,21 @@ func (s *SliceSigner) handleFailure(ctx context.Context, blobMetadatas []*disper
}
s.metrics.UpdateBatchError(reason, len(blobMetadatas))

for _, metadata := range blobMetadatas {
meta, err := s.blobStore.GetBlobMetadata(ctx, metadata.GetBlobKey())
if err != nil {
s.logger.Error("[signer] failed to get blob metadata", "key", metadata.GetBlobKey(), "err", err)
} else {
if meta.BlobStatus == disperser.Failed {
s.logger.Info("[signer] signing blob reach max retries", "key", metadata.GetBlobKey())
s.EncodingStreamer.RemoveEncodedBlob(metadata)
s.blobStore.RemoveBlob(ctx, metadata)
}
}
}

s.EncodingStreamer.RemoveBatchingStatus(ts)

// Return the error(s)
return result.ErrorOrNil()
}
Expand Down Expand Up @@ -560,8 +572,7 @@ func (s *SliceSigner) aggregateSignature(ctx context.Context, signInfo *SignInfo
signInfo.reties += 1
s.pendingBatchesToSign = append(s.pendingBatchesToSign, signInfo)
} else {
_ = s.handleFailure(ctx, signInfo.batch.BlobMetadata, FailAggregateSignatures)
s.EncodingStreamer.RemoveBatchingStatus(signInfo.ts)
_ = s.handleFailure(ctx, signInfo.batch.BlobMetadata, FailAggregateSignatures, signInfo.ts)
}
return err
}
Expand Down Expand Up @@ -685,22 +696,8 @@ func (s *SliceSigner) aggregateSignature(ctx context.Context, signInfo *SignInfo
s.pendingBatchesToSign = append(s.pendingBatchesToSign, signInfo)
s.logger.Warn("[signer] retry signing", "retries", signInfo.reties)
} else {
_ = s.handleFailure(ctx, signInfo.batch.BlobMetadata, FailAggregateSignatures)

for _, metadata := range signInfo.batch.BlobMetadata {
meta, err := s.blobStore.GetBlobMetadata(ctx, metadata.GetBlobKey())
if err != nil {
s.logger.Error("[signer] failed to get blob metadata", "key", metadata.GetBlobKey(), "err", err)
} else {
if meta.BlobStatus == disperser.Failed {
s.logger.Info("[signer] signing blob reach max retries", "key", metadata.GetBlobKey())
s.EncodingStreamer.RemoveEncodedBlob(metadata)
s.blobStore.RemoveBlob(ctx, metadata)
}
}
}
_ = s.handleFailure(ctx, signInfo.batch.BlobMetadata, FailAggregateSignatures, signInfo.ts)

s.EncodingStreamer.RemoveBatchingStatus(signInfo.ts)
return errors.New("failed aggregate signatures")
}
}
Expand Down

0 comments on commit d357a32

Please sign in to comment.