diff --git a/cmd/bacalhau/utils.go b/cmd/bacalhau/utils.go
index 0a0df25519..0d9e76df63 100644
--- a/cmd/bacalhau/utils.go
+++ b/cmd/bacalhau/utils.go
@@ -162,6 +162,8 @@ curl -sL https://get.bacalhau.org/install.sh | bash`,
 
 func NewIPFSDownloadFlags(settings *model.DownloaderSettings) *pflag.FlagSet {
 	flags := pflag.NewFlagSet("IPFS Download flags", pflag.ContinueOnError)
+	flags.BoolVar(&settings.Raw, "raw",
+		settings.Raw, "Download raw result CIDs instead of merging multiple CIDs into a single result")
 	flags.DurationVar(&settings.Timeout, "download-timeout-secs",
 		settings.Timeout, "Timeout duration for IPFS downloads.")
 	flags.StringVar(&settings.OutputDir, "output-dir",
diff --git a/pkg/downloader/download.go b/pkg/downloader/download.go
index 5dfe7510f5..4012e6b8cc 100644
--- a/pkg/downloader/download.go
+++ b/pkg/downloader/download.go
@@ -85,13 +85,19 @@ func DownloadResults( //nolint:funlen,gocyclo
 		}
 	}
 
+	// Raw mode skips the merge step below and keeps cidParentDir in place,
+	// so the downloaded CIDs stay exactly where they were fetched.
+	if settings.Raw {
+		return nil
+	}
+
 	for _, cidDownloadDir := range downloadedCids {
 		err = moveData(ctx, resultsOutputDir, cidDownloadDir, len(downloadedCids) > 1)
 		if err != nil {
 			return err
 		}
 	}
 	return os.RemoveAll(cidParentDir)
 }
 
 func moveData(
@@ -130,7 +136,7 @@ func moveData(
 			if d.IsDir() {
 				err = os.MkdirAll(globalTargetPath, model.DownloadFolderPerm)
 				if err != nil {
-					return nil
+					return err
 				}
 			} else {
 				// if it's not a special file then we move it into the global dir
@@ -140,7 +146,7 @@ func moveData(
 					globalTargetPath,
 				)
 				if err != nil {
-					return nil
+					return err
 				}
 			}
 
@@ -152,7 +158,7 @@ func moveData(
 					globalTargetPath,
 				)
 				if err != nil {
-					return nil
+					return err
 				}
 			}
 		}
@@ -195,8 +201,9 @@ func moveFile(sourcePath, targetPath string) error {
 		}
 		// file doesn't exist
 	} else {
 		// this means there was no error and so the file exists
-		return nil
+		return fmt.Errorf(
+			"cannot merge results as output already exists: %s. Try --raw to download raw results instead of merging them", targetPath)
 	}
 
 	return os.Rename(sourcePath, targetPath)
diff --git a/pkg/downloader/download_test.go b/pkg/downloader/download_test.go
index 4642546de2..70ba74f125 100644
--- a/pkg/downloader/download_test.go
+++ b/pkg/downloader/download_test.go
@@ -74,6 +74,16 @@ func (ds *DownloaderSuite) SetupTest() {
 	)
 }
 
+// mockResult bundles the CID of a mocked result directory with the bytes
+// that mockFile returned for each file placed in it.
+type mockResult struct {
+	cid      string
+	stdout   []byte
+	stderr   []byte
+	exitCode []byte
+	outputs  map[string][]byte
+}
+
 // Generate a file with random data.
 func generateFile(path string) ([]byte, error) {
 	file, err := os.Create(path)
@@ -113,6 +123,29 @@ func mockOutput(ds *DownloaderSuite, setup func(string)) string {
 	return cid
 }
 
+// easyMockOutput fills a temporary directory with the standard stdout, stderr
+// and exit code files plus one file under outputs/ for each name in
+// outputNames, stores the directory via ds.client.Put and returns the
+// resulting CID together with the bytes mockFile returned for every file.
+func (ds *DownloaderSuite) easyMockOutput(outputNames ...string) mockResult {
+	dir := ds.T().TempDir()
+
+	res := mockResult{
+		stdout:   mockFile(ds, dir, model.DownloadFilenameStdout),
+		stderr:   mockFile(ds, dir, model.DownloadFilenameStderr),
+		exitCode: mockFile(ds, dir, model.DownloadFilenameExitCode),
+		outputs:  make(map[string][]byte, len(outputNames)),
+	}
+	for _, name := range outputNames {
+		res.outputs[name] = mockFile(ds, dir, "outputs", name)
+	}
+
+	cid, err := ds.client.Put(context.Background(), dir)
+	require.NoError(ds.T(), err)
+	res.cid = cid
+	return res
+}
+
 // Generates a test file at the given path filled with random data, ensuring
 // that any parent directories for the file are also present.
 func mockFile(ds *DownloaderSuite, path ...string) []byte {
@@ -154,15 +187,10 @@ func (ds *DownloaderSuite) TestNoExpectedResults() {
 	require.NoError(ds.T(), err)
 }
 
-func (ds *DownloaderSuite) TestFullOutput() {
-	var exitCode, stdout, stderr, hello, goodbye []byte
-	cid := mockOutput(ds, func(dir string) {
-		exitCode = mockFile(ds, dir, "exitCode")
-		stdout = mockFile(ds, dir, model.DownloadFilenameStdout)
-		stderr = mockFile(ds, dir, "stderr")
-		hello = mockFile(ds, dir, "outputs", "hello.txt")
-		goodbye = mockFile(ds, dir, "outputs", "goodbye.txt")
-	})
+// TestSingleOutput downloads a single result with the default settings and
+// expects its files without any per-CID path prefix.
+func (ds *DownloaderSuite) TestSingleOutput() {
+	res := ds.easyMockOutput("hello.txt", "goodbye.txt")
 
 	err := DownloadResults(
 		context.Background(),
@@ -172,7 +200,120 @@ func (ds *DownloaderSuite) TestFullOutput() {
 				Data: model.StorageSpec{
 					StorageSource: model.StorageSourceIPFS,
 					Name:          "result-0",
-					CID:           cid,
+					CID:           res.cid,
+				},
+			},
+		},
+		ds.downloadProvider,
+		ds.downloadSettings,
+	)
+	require.NoError(ds.T(), err)
+
+	requireFile(ds, res.stdout, "stdout")
+	requireFile(ds, res.stderr, "stderr")
+	requireFile(ds, res.exitCode, "exitCode")
+	requireFile(ds, res.outputs["goodbye.txt"], "outputs", "goodbye.txt")
+	requireFile(ds, res.outputs["hello.txt"], "outputs", "hello.txt")
+}
+
+// TestSingleRawOutput downloads a single result with Raw enabled and expects
+// its files to stay under the CID folder for that result.
+func (ds *DownloaderSuite) TestSingleRawOutput() {
+	res := ds.easyMockOutput("hello.txt", "goodbye.txt")
+
+	settings := ds.downloadSettings
+	settings.Raw = true
+	err := DownloadResults(
+		context.Background(),
+		[]model.PublishedResult{
+			{
+				NodeID: "testnode",
+				Data: model.StorageSpec{
+					StorageSource: model.StorageSourceIPFS,
+					Name:          "result-0",
+					CID:           res.cid,
+				},
+			},
+		},
+		ds.downloadProvider,
+		settings,
+	)
+	require.NoError(ds.T(), err)
+
+	requireFile(ds, res.stdout, model.DownloadCIDsFolderName, res.cid, "stdout")
+	requireFile(ds, res.stderr, model.DownloadCIDsFolderName, res.cid, "stderr")
+	requireFile(ds, res.exitCode, model.DownloadCIDsFolderName, res.cid, "exitCode")
+	requireFile(ds, res.outputs["goodbye.txt"], model.DownloadCIDsFolderName, res.cid, "outputs", "goodbye.txt")
+	requireFile(ds, res.outputs["hello.txt"], model.DownloadCIDsFolderName, res.cid, "outputs", "hello.txt")
+}
+
+// TestMultiRawOutput downloads two results with Raw enabled and expects each
+// one to stay under its own CID folder.
+func (ds *DownloaderSuite) TestMultiRawOutput() {
+	res := ds.easyMockOutput("hello.txt")
+	res2 := ds.easyMockOutput("goodbye.txt")
+
+	settings := ds.downloadSettings
+	settings.Raw = true
+	err := DownloadResults(
+		context.Background(),
+		[]model.PublishedResult{
+			{
+				NodeID: "testnode",
+				Data: model.StorageSpec{
+					StorageSource: model.StorageSourceIPFS,
+					Name:          "result-1",
+					CID:           res.cid,
+				},
+			},
+			{
+				NodeID: "testnode",
+				Data: model.StorageSpec{
+					StorageSource: model.StorageSourceIPFS,
+					Name:          "result-2",
+					CID:           res2.cid,
+				},
+			},
+		},
+		ds.downloadProvider,
+		settings,
+	)
+	require.NoError(ds.T(), err)
+
+	requireFile(ds, res.stdout, model.DownloadCIDsFolderName, res.cid, "stdout")
+	requireFile(ds, res.stderr, model.DownloadCIDsFolderName, res.cid, "stderr")
+	requireFile(ds, res.exitCode, model.DownloadCIDsFolderName, res.cid, "exitCode")
+	requireFile(ds, res.outputs["hello.txt"], model.DownloadCIDsFolderName, res.cid, "outputs", "hello.txt")
+
+	requireFile(ds, res2.stdout, model.DownloadCIDsFolderName, res2.cid, "stdout")
+	requireFile(ds, res2.stderr, model.DownloadCIDsFolderName, res2.cid, "stderr")
+	requireFile(ds, res2.exitCode, model.DownloadCIDsFolderName, res2.cid, "exitCode")
+	requireFile(ds, res2.outputs["goodbye.txt"], model.DownloadCIDsFolderName, res2.cid, "outputs", "goodbye.txt")
+}
+
+// TestMultiMergedOutput downloads two results with differently named outputs
+// and expects both output files to end up in the same outputs folder.
+func (ds *DownloaderSuite) TestMultiMergedOutput() {
+	res := ds.easyMockOutput("hello.txt")
+	res2 := ds.easyMockOutput("goodbye.txt")
+
+	err := DownloadResults(
+		context.Background(),
+		[]model.PublishedResult{
+			{
+				NodeID: "testnode",
+				Data: model.StorageSpec{
+					StorageSource: model.StorageSourceIPFS,
+					Name:          "result-1",
+					CID:           res.cid,
+				},
+			},
+			{
+				NodeID: "testnode",
+				Data: model.StorageSpec{
+					StorageSource: model.StorageSourceIPFS,
+					Name:          "result-2",
+					CID:           res2.cid,
 				},
 			},
 		},
@@ -180,12 +321,40 @@ func (ds *DownloaderSuite) TestFullOutput() {
 		ds.downloadSettings,
 	)
 	require.NoError(ds.T(), err)
+	requireFile(ds, res.outputs["hello.txt"], "outputs", "hello.txt")
+	requireFile(ds, res2.outputs["goodbye.txt"], "outputs", "goodbye.txt")
+}
+
+// TestMultiMergeConflictingOutput downloads two results that both contain
+// outputs/same_same.txt and expects the merge to fail with an error.
+func (ds *DownloaderSuite) TestMultiMergeConflictingOutput() {
+	res := ds.easyMockOutput("same_same.txt")
+	res2 := ds.easyMockOutput("same_same.txt")
 
-	requireFile(ds, stdout, "stdout")
-	requireFile(ds, stderr, "stderr")
-	requireFile(ds, exitCode, "exitCode")
-	requireFile(ds, goodbye, "outputs", "goodbye.txt")
-	requireFile(ds, hello, "outputs", "hello.txt")
+	err := DownloadResults(
+		context.Background(),
+		[]model.PublishedResult{
+			{
+				NodeID: "testnode",
+				Data: model.StorageSpec{
+					StorageSource: model.StorageSourceIPFS,
+					Name:          "result-1",
+					CID:           res.cid,
+				},
+			},
+			{
+				NodeID: "testnode",
+				Data: model.StorageSpec{
+					StorageSource: model.StorageSourceIPFS,
+					Name:          "result-2",
+					CID:           res2.cid,
+				},
+			},
+		},
+		ds.downloadProvider,
+		ds.downloadSettings,
+	)
+	require.Error(ds.T(), err)
 }
 
 func (ds *DownloaderSuite) TestOutputWithNoStdFiles() {
diff --git a/pkg/model/downloader.go b/pkg/model/downloader.go
index 916a7261ad..c99eb801f4 100644
--- a/pkg/model/downloader.go
+++ b/pkg/model/downloader.go
@@ -17,4 +17,5 @@ type DownloaderSettings struct {
 	OutputDir      string
 	IPFSSwarmAddrs string
 	LocalIPFS      bool
+	Raw            bool // download raw result CIDs instead of merging them into a single result
 }