Skip to content

Commit

Permalink
save blob into local kv db
Browse files Browse the repository at this point in the history
  • Loading branch information
csdtowards committed Jul 16, 2024
1 parent 5616d1a commit 8cce795
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 14 deletions.
27 changes: 23 additions & 4 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,9 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR
metadata = &disperser.BlobMetadata{
BlobStatus: disperser.Finalized,
ConfirmationInfo: &disperser.ConfirmationInfo{
DataRoot: metadataFromKV.DataRoot,
Epoch: metadataFromKV.Epoch,
QuorumId: metadataFromKV.QuorumId,
ConfirmationBlockNumber: metadataFromKV.BlockNumber,
DataRoot: metadataFromKV.DataRoot,
Epoch: metadataFromKV.Epoch,
QuorumId: metadataFromKV.QuorumId,
},
}
} else {
Expand Down Expand Up @@ -211,6 +210,26 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob

s.logger.Info("[apiserver] received a new blob retrieval request", "blob storage root", req.StorageRoot, "blob epoch", req.Epoch, "quorum id", req.QuorumId)

metaData := disperser.BlobRetrieveMetadata{
DataRoot: req.StorageRoot,
Epoch: req.Epoch,
QuorumId: req.QuorumId,
}
blobKey, err := metaData.Serialize()
if err != nil {
s.logger.Error("[apiserver] failed to serialize metadata")
} else {
data, err := s.kvStore.GetBlob(ctx, blobKey)
if err != nil {
s.logger.Error("[apiserver] failed to get blob for key", "blobKey", blobKey)
} else {
s.metrics.HandleSuccessfulRequest(len(data), "RetrieveBlob")
return &pb.RetrieveBlobReply{
Data: data,
}, nil
}
}

ctxWithTimeout, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
conn, err := grpc.DialContext(
Expand Down
16 changes: 11 additions & 5 deletions disperser/batcher/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,12 @@ func (f *finalizer) PersistConfirmedBlobs(ctx context.Context, metadatas []*disp

keys := make([][]byte, 0)
values := make([][]byte, 0)
blobs := make([][]byte, 0)
for _, metadata := range metadatas {
retrieveMetadata := disperser.BlobRetrieveMetadata{
DataRoot: metadata.ConfirmationInfo.DataRoot,
Epoch: metadata.ConfirmationInfo.Epoch,
QuorumId: metadata.ConfirmationInfo.QuorumId,
BlockNumber: metadata.ConfirmationInfo.ConfirmationBlockNumber,
DataRoot: metadata.ConfirmationInfo.DataRoot,
Epoch: metadata.ConfirmationInfo.Epoch,
QuorumId: metadata.ConfirmationInfo.QuorumId,
}

f.blobKeyCache.Add(retrieveMetadata.Hash(), retrieveMetadata.Epoch)
Expand All @@ -231,9 +231,15 @@ func (f *finalizer) PersistConfirmedBlobs(ctx context.Context, metadatas []*disp
key := []byte(metadata.GetBlobKey().String())
keys = append(keys, key)
values = append(values, val)

b, err := f.blobStore.GetBlobContent(ctx, metadata)
if err != nil {
return errors.WithMessage(err, "failed to get blob content")
}
blobs = append(blobs, b)
}

_, err := f.kvStore.StoreMetadataBatch(ctx, keys, values)
_, err := f.kvStore.StoreMetadataBatch(ctx, keys, values, blobs)
if err != nil {
return errors.WithMessage(err, "failed to save retrieve metadata to kv db")
}
Expand Down
7 changes: 3 additions & 4 deletions disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,9 @@ func (b *BlobKeyCache) Contains(key [32]byte) bool {
}

type BlobRetrieveMetadata struct {
DataRoot []byte
Epoch uint64
QuorumId uint64
BlockNumber uint32
DataRoot []byte
Epoch uint64
QuorumId uint64
}

func (m *BlobRetrieveMetadata) Serialize() ([]byte, error) {
Expand Down
27 changes: 26 additions & 1 deletion disperser/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,17 @@ func (s *Store) deleteNBlobs(currentTimeUnixSec int64, numBatches int) (int, err
}

expiredKeys = append(expiredKeys, blobHeaderKey)

metaData, err := s.db.Get(blobHeaderKey)
if err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
s.logger.Error("Cannot get metadata for key:", "metaData", metaData, "err", err)
} else {
return -1, err
}
}

expiredKeys = append(expiredKeys, metaData)
}
}

Expand Down Expand Up @@ -152,7 +163,7 @@ func (s *Store) StoreMetadata(ctx context.Context, key []byte, value []byte) err
return nil
}

func (s *Store) StoreMetadataBatch(ctx context.Context, blobKeys [][]byte, metadatas [][]byte) (*[][]byte, error) {
func (s *Store) StoreMetadataBatch(ctx context.Context, blobKeys [][]byte, metadatas [][]byte, blobs [][]byte) (*[][]byte, error) {
keys := make([][]byte, 0)
values := make([][]byte, 0)

Expand All @@ -174,6 +185,9 @@ func (s *Store) StoreMetadataBatch(ctx context.Context, blobKeys [][]byte, metad
if _, err := buf.Write(key); err != nil {
return nil, err
}

keys = append(keys, metadatas[idx])
values = append(values, blobs[idx])
}

curr := time.Now().Unix()
Expand Down Expand Up @@ -212,6 +226,17 @@ func (s *Store) GetMetadata(ctx context.Context, key []byte) ([]byte, error) {
return data, nil
}

func (s *Store) GetBlob(ctx context.Context, blobKey []byte) ([]byte, error) {
data, err := s.db.Get(blobKey)
if err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
return nil, ErrKeyNotFound
}
return nil, err
}
return data, nil
}

func (s *Store) MetadataIterator(ctx context.Context) iterator.Iterator {
return s.db.NewIterator(EncodeBlobHeaderKeyPrefix())
}
Expand Down

0 comments on commit 8cce795

Please sign in to comment.