diff --git a/cmd/indexer.go b/cmd/indexer.go index f2187f8..3cdc828 100644 --- a/cmd/indexer.go +++ b/cmd/indexer.go @@ -5,16 +5,18 @@ import ( "github.com/0glabs/0g-storage-client/common/util" "github.com/0glabs/0g-storage-client/indexer" + "github.com/0glabs/0g-storage-client/indexer/gateway" "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) var ( indexerArgs struct { - endpoint string - nodes indexer.NodeManagerConfig - locations indexer.IPLocationConfig - locationCache indexer.FileLocationCacheConfig + endpoint string + nodes indexer.NodeManagerConfig + locations indexer.IPLocationConfig + locationCache indexer.FileLocationCacheConfig + maxDownloadFileSize uint64 } indexerCmd = &cobra.Command{ @@ -25,7 +27,7 @@ var ( ) func init() { - indexerCmd.Flags().StringVar(&indexerArgs.endpoint, "endpoint", ":12345", "Indexer RPC endpoint") + indexerCmd.Flags().StringVar(&indexerArgs.endpoint, "endpoint", ":12345", "Indexer service endpoint") indexerCmd.Flags().StringSliceVar(&indexerArgs.nodes.TrustedNodes, "trusted", nil, "Trusted storage node URLs that separated by comma") indexerCmd.Flags().StringVar(&indexerArgs.nodes.DiscoveryNode, "node", "", "Storage node to discover peers in P2P network") @@ -41,6 +43,8 @@ func init() { indexerCmd.Flags().DurationVar(&indexerArgs.locationCache.Expiry, "file-location-cache-expiry", 24*time.Hour, "Validity period of location information") indexerCmd.Flags().IntVar(&indexerArgs.locationCache.CacheSize, "file-location-cache-size", 100000, "size of file location cache") + indexerCmd.Flags().Uint64Var(&indexerArgs.maxDownloadFileSize, "max-download-file-size", 100*1024*1024, "Maximum file size in bytes to download") + indexerCmd.MarkFlagsOneRequired("trusted", "node") rootCmd.AddCommand(indexerCmd) @@ -71,7 +75,12 @@ func startIndexer(*cobra.Command, []string) { "discover": len(indexerArgs.nodes.DiscoveryNode) > 0, }).Info("Starting indexer service ...") - util.MustServeRPC(indexerArgs.endpoint, map[string]interface{}{ - api.Namespace: api, + gateway.MustServeWithRPC(gateway.Config{ + Endpoint: indexerArgs.endpoint, + Nodes: indexerArgs.nodes.TrustedNodes, + MaxDownloadFileSize: indexerArgs.maxDownloadFileSize, + RPCHandler: util.MustNewRPCHandler(map[string]interface{}{ + api.Namespace: api, + }), }) } diff --git a/common/util/rpc.go b/common/util/rpc.go index 162423a..0ba2874 100644 --- a/common/util/rpc.go +++ b/common/util/rpc.go @@ -1,7 +1,6 @@ package util import ( - "net" "net/http" "github.com/ethereum/go-ethereum/node" @@ -9,8 +8,8 @@ import ( "github.com/sirupsen/logrus" ) -// MustServeRPC starts RPC service until shutdown. -func MustServeRPC(endpoint string, apis map[string]interface{}) { +// MustNewRPCHandler creates a http.Handler for the specified RPC apis. +func MustNewRPCHandler(apis map[string]interface{}) http.Handler { handler := rpc.NewServer() for namespace, impl := range apis { @@ -19,16 +18,22 @@ func MustServeRPC(endpoint string, apis map[string]interface{}) { } } - httpServer := http.Server{ - // "github.com/ethereum/go-ethereum/node" - Handler: node.NewHTTPHandlerStack(handler, []string{"*"}, []string{"*"}, []byte{}), - // Handler: handler, - } + // enable cors + return node.NewHTTPHandlerStack(handler, []string{"*"}, []string{"*"}, []byte{}) +} - listener, err := net.Listen("tcp", endpoint) - if err != nil { - logrus.WithError(err).WithField("endpoint", endpoint).Fatal("Failed to listen to endpoint") +// MustServe starts a HTTP service util shutdown. +func MustServe(endpoint string, handler http.Handler) { + server := http.Server{ + Addr: endpoint, + Handler: handler, } - httpServer.Serve(listener) + server.ListenAndServe() +} + +// MustServeRPC starts RPC service until shutdown. +func MustServeRPC(endpoint string, apis map[string]interface{}) { + rpcHandler := MustNewRPCHandler(apis) + MustServe(endpoint, rpcHandler) } diff --git a/indexer/gateway/download.go b/indexer/gateway/download.go new file mode 100644 index 0000000..a36c57e --- /dev/null +++ b/indexer/gateway/download.go @@ -0,0 +1,82 @@ +package gateway + +import ( + "context" + "fmt" + "net/http" + "os" + "path" + + "github.com/0glabs/0g-storage-client/node" + "github.com/0glabs/0g-storage-client/transfer" + "github.com/ethereum/go-ethereum/common" + "github.com/gin-gonic/gin" +) + +var clients []*node.ZgsClient +var maxDownloadFileSize uint64 + +func downloadFile(c *gin.Context) { + var input struct { + Name string `form:"name" json:"name"` + Root string `form:"root" json:"root"` + TxSeq uint64 `form:"txSeq" json:"txSeq"` + } + + if err := c.ShouldBind(&input); err != nil { + c.JSON(http.StatusBadRequest, fmt.Sprintf("Failed to bind input parameters, %v", err.Error())) + return + } + + var fileInfo *node.FileInfo + var err error + + if len(input.Root) == 0 { + if fileInfo, err = clients[0].GetFileInfoByTxSeq(context.Background(), input.TxSeq); err != nil { + c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to get file info by tx seq, %v", err.Error())) + return + } + } else { + if fileInfo, err = clients[0].GetFileInfo(context.Background(), common.HexToHash(input.Root)); err != nil { + c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to get file info by root, %v", err.Error())) + return + } + } + + if fileInfo == nil { + c.JSON(http.StatusNotFound, "File not found") + return + } + + if !fileInfo.Finalized { + c.JSON(http.StatusBadRequest, "File not finalized yet") + return + } + + if fileInfo.Tx.Size > maxDownloadFileSize { + errMsg := fmt.Sprintf("Requested file size too large, actual = %v, max = %v", fileInfo.Tx.Size, maxDownloadFileSize) + c.JSON(http.StatusBadRequest, errMsg) + return + } + + downloader, err := transfer.NewDownloader(clients) + if err != nil { + c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to create downloader, %v", err.Error())) + return + } + + root := fileInfo.Tx.DataMerkleRoot.Hex() + tmpfile := path.Join(os.TempDir(), fmt.Sprintf("zgs_indexer_download_%v", root)) + defer os.Remove(tmpfile) + + if err = downloader.Download(context.Background(), fileInfo.Tx.DataMerkleRoot.Hex(), tmpfile, true); err != nil { + c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to download file, %v", err.Error())) + return + } + + if len(input.Name) == 0 { + c.FileAttachment(tmpfile, root) + } else { + c.FileAttachment(tmpfile, input.Name) + } +} diff --git a/indexer/gateway/server.go b/indexer/gateway/server.go new file mode 100644 index 0000000..e68dd4a --- /dev/null +++ b/indexer/gateway/server.go @@ -0,0 +1,63 @@ +package gateway + +import ( + "net/http" + + "github.com/0glabs/0g-storage-client/common/util" + "github.com/0glabs/0g-storage-client/node" + "github.com/gin-contrib/cors" + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" +) + +type Config struct { + Endpoint string + + Nodes []string // storage nodes to download files + MaxDownloadFileSize uint64 + + RPCHandler http.Handler // enable to provide both RPC and REST API service +} + +func MustServeWithRPC(config Config) { + if len(config.Nodes) == 0 { + logrus.Fatal("Nodes not specified to start HTTP server") + } + + // init global variables + clients = node.MustNewZgsClients(config.Nodes) + maxDownloadFileSize = config.MaxDownloadFileSize + + // init router + router := newRouter() + if config.RPCHandler != nil { + router.POST("/", gin.WrapH(config.RPCHandler)) + } + + util.MustServe(config.Endpoint, router) +} + +func newRouter() *gin.Engine { + router := gin.New() + + // middlewares + router.Use(gin.Recovery()) + if logrus.IsLevelEnabled(logrus.DebugLevel) { + router.Use(gin.Logger()) + } + router.Use(middlewareCors()) + + // handlers + router.GET("/file", downloadFile) + + return router +} + +func middlewareCors() gin.HandlerFunc { + conf := cors.DefaultConfig() + conf.AllowMethods = append(conf.AllowMethods, "OPTIONS") + conf.AllowHeaders = append(conf.AllowHeaders, "*") + conf.AllowAllOrigins = true + + return cors.New(conf) +}