Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance indexer to provide REST API to download files #41

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions cmd/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import (

"github.com/0glabs/0g-storage-client/common/util"
"github.com/0glabs/0g-storage-client/indexer"
"github.com/0glabs/0g-storage-client/indexer/gateway"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var (
indexerArgs struct {
endpoint string
nodes indexer.NodeManagerConfig
locations indexer.IPLocationConfig
locationCache indexer.FileLocationCacheConfig
endpoint string
nodes indexer.NodeManagerConfig
locations indexer.IPLocationConfig
locationCache indexer.FileLocationCacheConfig
maxDownloadFileSize uint64
}

indexerCmd = &cobra.Command{
Expand All @@ -25,7 +27,7 @@ var (
)

func init() {
indexerCmd.Flags().StringVar(&indexerArgs.endpoint, "endpoint", ":12345", "Indexer RPC endpoint")
indexerCmd.Flags().StringVar(&indexerArgs.endpoint, "endpoint", ":12345", "Indexer service endpoint")

indexerCmd.Flags().StringSliceVar(&indexerArgs.nodes.TrustedNodes, "trusted", nil, "Trusted storage node URLs that separated by comma")
indexerCmd.Flags().StringVar(&indexerArgs.nodes.DiscoveryNode, "node", "", "Storage node to discover peers in P2P network")
Expand All @@ -41,6 +43,8 @@ func init() {
indexerCmd.Flags().DurationVar(&indexerArgs.locationCache.Expiry, "file-location-cache-expiry", 24*time.Hour, "Validity period of location information")
indexerCmd.Flags().IntVar(&indexerArgs.locationCache.CacheSize, "file-location-cache-size", 100000, "size of file location cache")

indexerCmd.Flags().Uint64Var(&indexerArgs.maxDownloadFileSize, "max-download-file-size", 100*1024*1024, "Maximum file size in bytes to download")

indexerCmd.MarkFlagsOneRequired("trusted", "node")

rootCmd.AddCommand(indexerCmd)
Expand Down Expand Up @@ -71,7 +75,12 @@ func startIndexer(*cobra.Command, []string) {
"discover": len(indexerArgs.nodes.DiscoveryNode) > 0,
}).Info("Starting indexer service ...")

util.MustServeRPC(indexerArgs.endpoint, map[string]interface{}{
api.Namespace: api,
gateway.MustServeWithRPC(gateway.Config{
Endpoint: indexerArgs.endpoint,
Nodes: indexerArgs.nodes.TrustedNodes,
MaxDownloadFileSize: indexerArgs.maxDownloadFileSize,
RPCHandler: util.MustNewRPCHandler(map[string]interface{}{
0g-peterzhb marked this conversation as resolved.
Show resolved Hide resolved
api.Namespace: api,
}),
})
}
29 changes: 17 additions & 12 deletions common/util/rpc.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package util

import (
"net"
"net/http"

"github.com/ethereum/go-ethereum/node"
"github.com/openweb3/go-rpc-provider"
"github.com/sirupsen/logrus"
)

// MustServeRPC starts RPC service until shutdown.
func MustServeRPC(endpoint string, apis map[string]interface{}) {
// MustNewRPCHandler creates a http.Handler for the specified RPC apis.
func MustNewRPCHandler(apis map[string]interface{}) http.Handler {
handler := rpc.NewServer()

for namespace, impl := range apis {
Expand All @@ -19,16 +18,22 @@ func MustServeRPC(endpoint string, apis map[string]interface{}) {
}
}

httpServer := http.Server{
// "github.com/ethereum/go-ethereum/node"
Handler: node.NewHTTPHandlerStack(handler, []string{"*"}, []string{"*"}, []byte{}),
// Handler: handler,
}
// enable cors
return node.NewHTTPHandlerStack(handler, []string{"*"}, []string{"*"}, []byte{})
}

listener, err := net.Listen("tcp", endpoint)
if err != nil {
logrus.WithError(err).WithField("endpoint", endpoint).Fatal("Failed to listen to endpoint")
// MustServe starts a HTTP service util shutdown.
func MustServe(endpoint string, handler http.Handler) {
server := http.Server{
Addr: endpoint,
Handler: handler,
}

httpServer.Serve(listener)
server.ListenAndServe()
}

// MustServeRPC starts RPC service until shutdown.
func MustServeRPC(endpoint string, apis map[string]interface{}) {
rpcHandler := MustNewRPCHandler(apis)
MustServe(endpoint, rpcHandler)
}
82 changes: 82 additions & 0 deletions indexer/gateway/download.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package gateway

import (
"context"
"fmt"
"net/http"
"os"
"path"

"github.com/0glabs/0g-storage-client/node"
"github.com/0glabs/0g-storage-client/transfer"
"github.com/ethereum/go-ethereum/common"
"github.com/gin-gonic/gin"
)

var clients []*node.ZgsClient
var maxDownloadFileSize uint64

func downloadFile(c *gin.Context) {
var input struct {
Name string `form:"name" json:"name"`
Root string `form:"root" json:"root"`
TxSeq uint64 `form:"txSeq" json:"txSeq"`
}

if err := c.ShouldBind(&input); err != nil {
c.JSON(http.StatusBadRequest, fmt.Sprintf("Failed to bind input parameters, %v", err.Error()))
return
}

var fileInfo *node.FileInfo
var err error

if len(input.Root) == 0 {
if fileInfo, err = clients[0].GetFileInfoByTxSeq(context.Background(), input.TxSeq); err != nil {
c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to get file info by tx seq, %v", err.Error()))
return
}
} else {
if fileInfo, err = clients[0].GetFileInfo(context.Background(), common.HexToHash(input.Root)); err != nil {
c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to get file info by root, %v", err.Error()))
return
}
}

if fileInfo == nil {
c.JSON(http.StatusNotFound, "File not found")
return
}

if !fileInfo.Finalized {
c.JSON(http.StatusBadRequest, "File not finalized yet")
return
}

if fileInfo.Tx.Size > maxDownloadFileSize {
errMsg := fmt.Sprintf("Requested file size too large, actual = %v, max = %v", fileInfo.Tx.Size, maxDownloadFileSize)
c.JSON(http.StatusBadRequest, errMsg)
return
}

downloader, err := transfer.NewDownloader(clients)
if err != nil {
c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to create downloader, %v", err.Error()))
return
}

root := fileInfo.Tx.DataMerkleRoot.Hex()
tmpfile := path.Join(os.TempDir(), fmt.Sprintf("zgs_indexer_download_%v", root))
defer os.Remove(tmpfile)

if err = downloader.Download(context.Background(), fileInfo.Tx.DataMerkleRoot.Hex(), tmpfile, true); err != nil {
c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to download file, %v", err.Error()))
return
}

if len(input.Name) == 0 {
c.FileAttachment(tmpfile, root)
} else {
c.FileAttachment(tmpfile, input.Name)
}
}
63 changes: 63 additions & 0 deletions indexer/gateway/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package gateway

import (
"net/http"

"github.com/0glabs/0g-storage-client/common/util"
"github.com/0glabs/0g-storage-client/node"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
)

type Config struct {
Endpoint string

Nodes []string // storage nodes to download files
MaxDownloadFileSize uint64

RPCHandler http.Handler // enable to provide both RPC and REST API service
}

func MustServeWithRPC(config Config) {
if len(config.Nodes) == 0 {
logrus.Fatal("Nodes not specified to start HTTP server")
}

// init global variables
clients = node.MustNewZgsClients(config.Nodes)
maxDownloadFileSize = config.MaxDownloadFileSize

// init router
router := newRouter()
if config.RPCHandler != nil {
router.POST("/", gin.WrapH(config.RPCHandler))
}

util.MustServe(config.Endpoint, router)
}

func newRouter() *gin.Engine {
router := gin.New()

// middlewares
router.Use(gin.Recovery())
if logrus.IsLevelEnabled(logrus.DebugLevel) {
router.Use(gin.Logger())
}
router.Use(middlewareCors())

// handlers
router.GET("/file", downloadFile)

return router
}

func middlewareCors() gin.HandlerFunc {
conf := cors.DefaultConfig()
conf.AllowMethods = append(conf.AllowMethods, "OPTIONS")
conf.AllowHeaders = append(conf.AllowHeaders, "*")
conf.AllowAllOrigins = true

return cors.New(conf)
}
Loading