Skip to content

Commit

Permalink
Enhance indexer to provide REST API to download files (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
boqiu authored Aug 13, 2024
1 parent a5b9576 commit e96480e
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 19 deletions.
23 changes: 16 additions & 7 deletions cmd/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import (

"github.com/0glabs/0g-storage-client/common/util"
"github.com/0glabs/0g-storage-client/indexer"
"github.com/0glabs/0g-storage-client/indexer/gateway"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var (
indexerArgs struct {
endpoint string
nodes indexer.NodeManagerConfig
locations indexer.IPLocationConfig
locationCache indexer.FileLocationCacheConfig
endpoint string
nodes indexer.NodeManagerConfig
locations indexer.IPLocationConfig
locationCache indexer.FileLocationCacheConfig
maxDownloadFileSize uint64
}

indexerCmd = &cobra.Command{
Expand All @@ -25,7 +27,7 @@ var (
)

func init() {
indexerCmd.Flags().StringVar(&indexerArgs.endpoint, "endpoint", ":12345", "Indexer RPC endpoint")
indexerCmd.Flags().StringVar(&indexerArgs.endpoint, "endpoint", ":12345", "Indexer service endpoint")

indexerCmd.Flags().StringSliceVar(&indexerArgs.nodes.TrustedNodes, "trusted", nil, "Trusted storage node URLs that separated by comma")
indexerCmd.Flags().StringVar(&indexerArgs.nodes.DiscoveryNode, "node", "", "Storage node to discover peers in P2P network")
Expand All @@ -41,6 +43,8 @@ func init() {
indexerCmd.Flags().DurationVar(&indexerArgs.locationCache.Expiry, "file-location-cache-expiry", 24*time.Hour, "Validity period of location information")
indexerCmd.Flags().IntVar(&indexerArgs.locationCache.CacheSize, "file-location-cache-size", 100000, "size of file location cache")

indexerCmd.Flags().Uint64Var(&indexerArgs.maxDownloadFileSize, "max-download-file-size", 100*1024*1024, "Maximum file size in bytes to download")

indexerCmd.MarkFlagsOneRequired("trusted", "node")

rootCmd.AddCommand(indexerCmd)
Expand Down Expand Up @@ -71,7 +75,12 @@ func startIndexer(*cobra.Command, []string) {
"discover": len(indexerArgs.nodes.DiscoveryNode) > 0,
}).Info("Starting indexer service ...")

util.MustServeRPC(indexerArgs.endpoint, map[string]interface{}{
api.Namespace: api,
gateway.MustServeWithRPC(gateway.Config{
Endpoint: indexerArgs.endpoint,
Nodes: indexerArgs.nodes.TrustedNodes,
MaxDownloadFileSize: indexerArgs.maxDownloadFileSize,
RPCHandler: util.MustNewRPCHandler(map[string]interface{}{
api.Namespace: api,
}),
})
}
29 changes: 17 additions & 12 deletions common/util/rpc.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package util

import (
"net"
"net/http"

"github.com/ethereum/go-ethereum/node"
"github.com/openweb3/go-rpc-provider"
"github.com/sirupsen/logrus"
)

// MustServeRPC starts RPC service until shutdown.
func MustServeRPC(endpoint string, apis map[string]interface{}) {
// MustNewRPCHandler creates a http.Handler for the specified RPC apis.
func MustNewRPCHandler(apis map[string]interface{}) http.Handler {
handler := rpc.NewServer()

for namespace, impl := range apis {
Expand All @@ -19,16 +18,22 @@ func MustServeRPC(endpoint string, apis map[string]interface{}) {
}
}

httpServer := http.Server{
// "github.com/ethereum/go-ethereum/node"
Handler: node.NewHTTPHandlerStack(handler, []string{"*"}, []string{"*"}, []byte{}),
// Handler: handler,
}
// enable cors
return node.NewHTTPHandlerStack(handler, []string{"*"}, []string{"*"}, []byte{})
}

listener, err := net.Listen("tcp", endpoint)
if err != nil {
logrus.WithError(err).WithField("endpoint", endpoint).Fatal("Failed to listen to endpoint")
// MustServe starts a HTTP service util shutdown.
func MustServe(endpoint string, handler http.Handler) {
server := http.Server{
Addr: endpoint,
Handler: handler,
}

httpServer.Serve(listener)
server.ListenAndServe()
}

// MustServeRPC starts RPC service until shutdown.
func MustServeRPC(endpoint string, apis map[string]interface{}) {
rpcHandler := MustNewRPCHandler(apis)
MustServe(endpoint, rpcHandler)
}
82 changes: 82 additions & 0 deletions indexer/gateway/download.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package gateway

import (
"context"
"fmt"
"net/http"
"os"
"path"

"github.com/0glabs/0g-storage-client/node"
"github.com/0glabs/0g-storage-client/transfer"
"github.com/ethereum/go-ethereum/common"
"github.com/gin-gonic/gin"
)

var clients []*node.ZgsClient
var maxDownloadFileSize uint64

func downloadFile(c *gin.Context) {
var input struct {
Name string `form:"name" json:"name"`
Root string `form:"root" json:"root"`
TxSeq uint64 `form:"txSeq" json:"txSeq"`
}

if err := c.ShouldBind(&input); err != nil {
c.JSON(http.StatusBadRequest, fmt.Sprintf("Failed to bind input parameters, %v", err.Error()))
return
}

var fileInfo *node.FileInfo
var err error

if len(input.Root) == 0 {
if fileInfo, err = clients[0].GetFileInfoByTxSeq(context.Background(), input.TxSeq); err != nil {
c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to get file info by tx seq, %v", err.Error()))
return
}
} else {
if fileInfo, err = clients[0].GetFileInfo(context.Background(), common.HexToHash(input.Root)); err != nil {
c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to get file info by root, %v", err.Error()))
return
}
}

if fileInfo == nil {
c.JSON(http.StatusNotFound, "File not found")
return
}

if !fileInfo.Finalized {
c.JSON(http.StatusBadRequest, "File not finalized yet")
return
}

if fileInfo.Tx.Size > maxDownloadFileSize {
errMsg := fmt.Sprintf("Requested file size too large, actual = %v, max = %v", fileInfo.Tx.Size, maxDownloadFileSize)
c.JSON(http.StatusBadRequest, errMsg)
return
}

downloader, err := transfer.NewDownloader(clients)
if err != nil {
c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to create downloader, %v", err.Error()))
return
}

root := fileInfo.Tx.DataMerkleRoot.Hex()
tmpfile := path.Join(os.TempDir(), fmt.Sprintf("zgs_indexer_download_%v", root))
defer os.Remove(tmpfile)

if err = downloader.Download(context.Background(), fileInfo.Tx.DataMerkleRoot.Hex(), tmpfile, true); err != nil {
c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to download file, %v", err.Error()))
return
}

if len(input.Name) == 0 {
c.FileAttachment(tmpfile, root)
} else {
c.FileAttachment(tmpfile, input.Name)
}
}
63 changes: 63 additions & 0 deletions indexer/gateway/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package gateway

import (
"net/http"

"github.com/0glabs/0g-storage-client/common/util"
"github.com/0glabs/0g-storage-client/node"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
)

type Config struct {
Endpoint string

Nodes []string // storage nodes to download files
MaxDownloadFileSize uint64

RPCHandler http.Handler // enable to provide both RPC and REST API service
}

func MustServeWithRPC(config Config) {
if len(config.Nodes) == 0 {
logrus.Fatal("Nodes not specified to start HTTP server")
}

// init global variables
clients = node.MustNewZgsClients(config.Nodes)
maxDownloadFileSize = config.MaxDownloadFileSize

// init router
router := newRouter()
if config.RPCHandler != nil {
router.POST("/", gin.WrapH(config.RPCHandler))
}

util.MustServe(config.Endpoint, router)
}

func newRouter() *gin.Engine {
router := gin.New()

// middlewares
router.Use(gin.Recovery())
if logrus.IsLevelEnabled(logrus.DebugLevel) {
router.Use(gin.Logger())
}
router.Use(middlewareCors())

// handlers
router.GET("/file", downloadFile)

return router
}

func middlewareCors() gin.HandlerFunc {
conf := cors.DefaultConfig()
conf.AllowMethods = append(conf.AllowMethods, "OPTIONS")
conf.AllowHeaders = append(conf.AllowHeaders, "*")
conf.AllowAllOrigins = true

return cors.New(conf)
}

0 comments on commit e96480e

Please sign in to comment.