Skip to content

Commit

Permalink
fix: fix the get object status from bundle service as it latency too …
Browse files Browse the repository at this point in the history
…high
  • Loading branch information
alexgao001 committed Aug 7, 2024
1 parent b77228d commit 4c34af9
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 17 deletions.
3 changes: 2 additions & 1 deletion external/cmn/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ type QuotaInfo struct {
}

type ObjectInfo struct {
Checksums []string `json:"checksums"`
Checksums []string `json:"checksums"`
ObjectStatus string `json:"object_status"`
}

type GetObjectInfoResponse struct {
Expand Down
6 changes: 3 additions & 3 deletions restapi/handlers/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func HandleGetBlobSidecars() func(params blob.GetBlobSidecarsByBlockNumParams) m
}
sidecars, err = service.BlobSvc.GetBlobSidecarsByRoot(hex.EncodeToString(root), indicesInx)
if err != nil {
return blob.NewGetBlobSidecarsByBlockNumInternalServerError().WithPayload(service.InternalErrorWithError(err))
return blob.NewGetBlobSidecarsByBlockNumInternalServerError().WithPayload(service.InternalError())
}
} else {
slot, err := util.StringToUint64(blockID)
Expand All @@ -54,7 +54,7 @@ func HandleGetBlobSidecars() func(params blob.GetBlobSidecarsByBlockNumParams) m
}
sidecars, err = service.BlobSvc.GetBlobSidecarsByBlockNumOrSlot(slot, indicesInx)
if err != nil {
return blob.NewGetBlobSidecarsByBlockNumInternalServerError().WithPayload(service.InternalErrorWithError(err))
return blob.NewGetBlobSidecarsByBlockNumInternalServerError().WithPayload(service.InternalError())
}
}
payload := models.GetBlobSideCarsResponse{
Expand Down Expand Up @@ -99,7 +99,7 @@ func HandleGetBSCBlobSidecars() func(params blob.GetBSCBlobSidecarsByBlockNumPar
}
sidecars, err := service.BlobSvc.GetBlobSidecarsByBlockNumOrSlot(blockNum, nil)
if err != nil {
return blob.NewGetBlobSidecarsByBlockNumInternalServerError().WithPayload(service.InternalErrorWithError(err))
return blob.NewGetBlobSidecarsByBlockNumInternalServerError().WithPayload(service.InternalError())
}
// group sidecars by tx hash
bscTxSidecars := make(map[string]*models.BSCBlobTxSidecar)
Expand Down
4 changes: 2 additions & 2 deletions service/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ func (e Err) Error() string {
return fmt.Sprintf("%d: %s", e.Code, e.Message)
}

func InternalErrorWithError(err error) *models.Error {
func InternalError() *models.Error {
return &models.Error{
Code: 500,
Message: err.Error(),
Message: "internal error",
}
}

Expand Down
6 changes: 2 additions & 4 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,15 @@ func (s *BlobSyncer) StartLoop() {
syncTicker := time.NewTicker(LoopSleepTime)
for range syncTicker.C {
if err = s.sync(); err != nil {
logging.Logger.Error(err)
continue
logging.Logger.Errorf("failed to sync, err=%s", err.Error())
}
}
}()
go func() {
verifyTicket := time.NewTicker(LoopSleepTime)
for range verifyTicket.C {
if err := s.verify(); err != nil {
logging.Logger.Error(err)
continue
logging.Logger.Errorf("failed to verify, err=%s", err.Error())
}
}
}()
Expand Down
28 changes: 21 additions & 7 deletions syncer/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
//
// a new bundle should be re-uploaded.
func (s *BlobSyncer) verify() error {

// get the earliest unverified block
verifyBlock, err := s.blobDao.GetEarliestUnverifiedBlock()
if err != nil {
Expand All @@ -64,7 +65,6 @@ func (s *BlobSyncer) verify() error {
if err != nil {
return err
}

verifyBlockID := verifyBlock.Slot
// validate the bundle info at the start slot of a bundle
if verifyBlockID == bundleStartBlockID || !s.DetailedIntegrityCheckEnabled() {
Expand Down Expand Up @@ -92,13 +92,26 @@ func (s *BlobSyncer) verify() error {
}
return nil
}

// the bundle is not sealed yet
if bundleInfo.Status == BundleStatusFinalized || bundleInfo.Status == BundleStatusCreatedOnChain {
if bundle.CreatedTime > 0 && time.Now().Unix()-bundle.CreatedTime > s.config.GetReUploadBundleThresh() {
logging.Logger.Infof("the bundle %s is not sealed and exceed the re-upload threshold %d ", bundleName, s.config.GetReUploadBundleThresh())
return s.reUploadBundle(bundleName)
// get the object meta from chain
objectMeta, err := s.chainClient.GetObjectMeta(context.Background(), s.getBucketName(), bundleName)
if err != nil {
time.Sleep(VerifyPauseTime)
logging.Logger.Errorf("failed to get object meta from chain, bundleName=%s", bundleName)
return err
}
// check the object info from chain to make sure it is not be sealed
// if it is not be sealed, re-upload it
if objectMeta.ObjectStatus != "OBJECT_STATUS_SEALED" {
if bundle.CreatedTime > 0 && time.Now().Unix()-bundle.CreatedTime > s.config.GetReUploadBundleThresh() {
logging.Logger.Infof("the bundle %s is not sealed and exceed the re-upload threshold %d ", bundleName, s.config.GetReUploadBundleThresh())
return s.reUploadBundle(bundleName)
}
logging.Logger.Info("the bundle is not sealed yet, bundleName=%s, status = %d", bundleName, bundleInfo.Status)
return nil
}
return nil
}
}

Expand Down Expand Up @@ -256,7 +269,7 @@ func (s *BlobSyncer) verifyBlobsAtBlock(blockID uint64, sidecars []*types.Genera
// get blob from bundle service
blobFromBundle, err := s.bundleClient.GetObject(s.getBucketName(), bundleName, types.GetBlobName(blockID, i))
if err != nil {
if err == cmn.ErrorBundleObjectNotExist {
if errors.Is(err, cmn.ErrorBundleObjectNotExist) {
logging.Logger.Errorf("the bundle object not found in bundle service, object=%s", types.GetBlobName(blockID, i))
return ErrVerificationFailed
}
Expand Down Expand Up @@ -310,8 +323,9 @@ func (s *BlobSyncer) reUploadBundle(bundleName string) error {
if err := s.blobDao.UpdateBundleStatus(bundleName, db.Deprecated); err != nil {
return err
}
parts := strings.Split(bundleName, "_")
newBundleName := parts[0] + "_" + parts[1] + "_" + parts[2] + "_calibrated_" + util.Int64ToString(time.Now().Unix())

newBundleName := bundleName + "_calibrated_" + util.Int64ToString(time.Now().Unix())
startBlockID, endBlockID, err := types.ParseBundleName(bundleName)
if err != nil {
return err
Expand Down

0 comments on commit 4c34af9

Please sign in to comment.