From 8cce79591a2aac424d3511a57beb3ab141713419 Mon Sep 17 00:00:00 2001 From: Joel Date: Tue, 16 Jul 2024 13:59:17 +0800 Subject: [PATCH] save blob into local kv db --- disperser/apiserver/server.go | 27 +++++++++++++++++++++++---- disperser/batcher/finalizer.go | 16 +++++++++++----- disperser/disperser.go | 7 +++---- disperser/store.go | 27 ++++++++++++++++++++++++++- 4 files changed, 63 insertions(+), 14 deletions(-) diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 7846acc..ff64645 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -162,10 +162,9 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR metadata = &disperser.BlobMetadata{ BlobStatus: disperser.Finalized, ConfirmationInfo: &disperser.ConfirmationInfo{ - DataRoot: metadataFromKV.DataRoot, - Epoch: metadataFromKV.Epoch, - QuorumId: metadataFromKV.QuorumId, - ConfirmationBlockNumber: metadataFromKV.BlockNumber, + DataRoot: metadataFromKV.DataRoot, + Epoch: metadataFromKV.Epoch, + QuorumId: metadataFromKV.QuorumId, }, } } else { @@ -211,6 +210,26 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob s.logger.Info("[apiserver] received a new blob retrieval request", "blob storage root", req.StorageRoot, "blob epoch", req.Epoch, "quorum id", req.QuorumId) + metaData := disperser.BlobRetrieveMetadata{ + DataRoot: req.StorageRoot, + Epoch: req.Epoch, + QuorumId: req.QuorumId, + } + blobKey, err := metaData.Serialize() + if err != nil { + s.logger.Error("[apiserver] failed to serialize metadata") + } else { + data, err := s.kvStore.GetBlob(ctx, blobKey) + if err != nil { + s.logger.Error("[apiserver] failed to get blob for key", "blobKey", blobKey) + } else { + s.metrics.HandleSuccessfulRequest(len(data), "RetrieveBlob") + return &pb.RetrieveBlobReply{ + Data: data, + }, nil + } + } + ctxWithTimeout, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() conn, err := grpc.DialContext( diff --git a/disperser/batcher/finalizer.go b/disperser/batcher/finalizer.go index 78c1205..f4602ed 100644 --- a/disperser/batcher/finalizer.go +++ b/disperser/batcher/finalizer.go @@ -214,12 +214,12 @@ func (f *finalizer) PersistConfirmedBlobs(ctx context.Context, metadatas []*disp keys := make([][]byte, 0) values := make([][]byte, 0) + blobs := make([][]byte, 0) for _, metadata := range metadatas { retrieveMetadata := disperser.BlobRetrieveMetadata{ - DataRoot: metadata.ConfirmationInfo.DataRoot, - Epoch: metadata.ConfirmationInfo.Epoch, - QuorumId: metadata.ConfirmationInfo.QuorumId, - BlockNumber: metadata.ConfirmationInfo.ConfirmationBlockNumber, + DataRoot: metadata.ConfirmationInfo.DataRoot, + Epoch: metadata.ConfirmationInfo.Epoch, + QuorumId: metadata.ConfirmationInfo.QuorumId, } f.blobKeyCache.Add(retrieveMetadata.Hash(), retrieveMetadata.Epoch) @@ -231,9 +231,15 @@ func (f *finalizer) PersistConfirmedBlobs(ctx context.Context, metadatas []*disp key := []byte(metadata.GetBlobKey().String()) keys = append(keys, key) values = append(values, val) + + b, err := f.blobStore.GetBlobContent(ctx, metadata) + if err != nil { + return errors.WithMessage(err, "failed to get blob content") + } + blobs = append(blobs, b) } - _, err := f.kvStore.StoreMetadataBatch(ctx, keys, values) + _, err := f.kvStore.StoreMetadataBatch(ctx, keys, values, blobs) if err != nil { return errors.WithMessage(err, "failed to save retrieve metadata to kv db") } diff --git a/disperser/disperser.go b/disperser/disperser.go index 6587243..5a98330 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -95,10 +95,9 @@ func (b *BlobKeyCache) Contains(key [32]byte) bool { } type BlobRetrieveMetadata struct { - DataRoot []byte - Epoch uint64 - QuorumId uint64 - BlockNumber uint32 + DataRoot []byte + Epoch uint64 + QuorumId uint64 } func (m *BlobRetrieveMetadata) Serialize() ([]byte, error) { diff --git a/disperser/store.go b/disperser/store.go index 6a7deef..5a5aa15 100644 --- a/disperser/store.go +++ b/disperser/store.go @@ -123,6 +123,17 @@ func (s *Store) deleteNBlobs(currentTimeUnixSec int64, numBatches int) (int, err } expiredKeys = append(expiredKeys, blobHeaderKey) + + metaData, err := s.db.Get(blobHeaderKey) + if err != nil { + if errors.Is(err, leveldb.ErrNotFound) { + s.logger.Error("Cannot get metadata for key:", "metaData", metaData, "err", err) + } else { + return -1, err + } + } + + expiredKeys = append(expiredKeys, metaData) } } @@ -152,7 +163,7 @@ func (s *Store) StoreMetadata(ctx context.Context, key []byte, value []byte) err return nil } -func (s *Store) StoreMetadataBatch(ctx context.Context, blobKeys [][]byte, metadatas [][]byte) (*[][]byte, error) { +func (s *Store) StoreMetadataBatch(ctx context.Context, blobKeys [][]byte, metadatas [][]byte, blobs [][]byte) (*[][]byte, error) { keys := make([][]byte, 0) values := make([][]byte, 0) @@ -174,6 +185,9 @@ func (s *Store) StoreMetadataBatch(ctx context.Context, blobKeys [][]byte, metad if _, err := buf.Write(key); err != nil { return nil, err } + + keys = append(keys, metadatas[idx]) + values = append(values, blobs[idx]) } curr := time.Now().Unix() @@ -212,6 +226,17 @@ func (s *Store) GetMetadata(ctx context.Context, key []byte) ([]byte, error) { return data, nil } +func (s *Store) GetBlob(ctx context.Context, blobKey []byte) ([]byte, error) { + data, err := s.db.Get(blobKey) + if err != nil { + if errors.Is(err, leveldb.ErrNotFound) { + return nil, ErrKeyNotFound + } + return nil, err + } + return data, nil +} + func (s *Store) MetadataIterator(ctx context.Context) iterator.Iterator { return s.db.NewIterator(EncodeBlobHeaderKeyPrefix()) }