Skip to content

Commit

Permalink
enable downloading raw results (#2217)
Browse files Browse the repository at this point in the history
introduce flag to download `--raw` results instead of merged results.
After sharding was removed, we always merge and delete raw directory,
which might break something if a user submits a non-deterministic job
with concurrency > 1.

This PR also fixes a bug by failing in case multiple results with
different CIDs share the same output file names instead of silently
skipping
  • Loading branch information
wdbaruni authored Mar 21, 2023
1 parent 124824b commit f0ae6b9
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 25 deletions.
2 changes: 2 additions & 0 deletions cmd/bacalhau/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ curl -sL https://get.bacalhau.org/install.sh | bash`,

func NewIPFSDownloadFlags(settings *model.DownloaderSettings) *pflag.FlagSet {
flags := pflag.NewFlagSet("IPFS Download flags", pflag.ContinueOnError)
flags.BoolVar(&settings.Raw, "raw",
settings.Raw, "Download raw result CIDs instead of merging multiple CIDs into a single result")
flags.DurationVar(&settings.Timeout, "download-timeout-secs",
settings.Timeout, "Timeout duration for IPFS downloads.")
flags.StringVar(&settings.OutputDir, "output-dir",
Expand Down
24 changes: 14 additions & 10 deletions pkg/downloader/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,17 @@ func DownloadResults( //nolint:funlen,gocyclo
}
}

for _, cidDownloadDir := range downloadedCids {
err = moveData(ctx, resultsOutputDir, cidDownloadDir, len(downloadedCids) > 1)
if err != nil {
return err
if settings.Raw {
return nil
} else {
for _, cidDownloadDir := range downloadedCids {
err = moveData(ctx, resultsOutputDir, cidDownloadDir, len(downloadedCids) > 1)
if err != nil {
return err
}
}
return os.RemoveAll(cidParentDir)
}
return os.RemoveAll(cidParentDir)
}

func moveData(
Expand Down Expand Up @@ -130,7 +134,7 @@ func moveData(
if d.IsDir() {
err = os.MkdirAll(globalTargetPath, model.DownloadFolderPerm)
if err != nil {
return nil
return err
}
} else {
// if it's not a special file then we move it into the global dir
Expand All @@ -140,7 +144,7 @@ func moveData(
globalTargetPath,
)
if err != nil {
return nil
return err
}
}

Expand All @@ -152,7 +156,7 @@ func moveData(
globalTargetPath,
)
if err != nil {
return nil
return err
}
}
}
Expand Down Expand Up @@ -195,8 +199,8 @@ func moveFile(sourcePath, targetPath string) error {
}
// file doesn't exist
} else {
// this means there was no error and so the file exists
return nil
return fmt.Errorf(
"cannot merge results as output already exists: %s. Try --raw to download raw results instead of merging them", targetPath)
}

return os.Rename(sourcePath, targetPath)
Expand Down
183 changes: 168 additions & 15 deletions pkg/downloader/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ func (ds *DownloaderSuite) SetupTest() {
)
}

type mockResult struct {
cid string
stdout []byte
stderr []byte
exitCode []byte
outputs map[string][]byte
}

// Generate a file with random data.
func generateFile(path string) ([]byte, error) {
file, err := os.Create(path)
Expand Down Expand Up @@ -113,6 +121,25 @@ func mockOutput(ds *DownloaderSuite, setup func(string)) string {
return cid
}

func (ds *DownloaderSuite) easyMockOutput(outputNames ...string) mockResult {
dir := ds.T().TempDir()

res := &mockResult{
stdout: mockFile(ds, dir, model.DownloadFilenameStdout),
stderr: mockFile(ds, dir, model.DownloadFilenameStderr),
exitCode: mockFile(ds, dir, model.DownloadFilenameExitCode),
outputs: make(map[string][]byte),
}
for _, name := range outputNames {
res.outputs[name] = mockFile(ds, dir, "outputs", name)
}

cid, err := ds.client.Put(context.Background(), dir)
ds.NoError(err)
res.cid = cid
return *res
}

// Generates a test file at the given path filled with random data, ensuring
// that any parent directories for the file are also present.
func mockFile(ds *DownloaderSuite, path ...string) []byte {
Expand Down Expand Up @@ -154,15 +181,8 @@ func (ds *DownloaderSuite) TestNoExpectedResults() {
require.NoError(ds.T(), err)
}

func (ds *DownloaderSuite) TestFullOutput() {
var exitCode, stdout, stderr, hello, goodbye []byte
cid := mockOutput(ds, func(dir string) {
exitCode = mockFile(ds, dir, "exitCode")
stdout = mockFile(ds, dir, model.DownloadFilenameStdout)
stderr = mockFile(ds, dir, "stderr")
hello = mockFile(ds, dir, "outputs", "hello.txt")
goodbye = mockFile(ds, dir, "outputs", "goodbye.txt")
})
func (ds *DownloaderSuite) TestSingleOutput() {
res := ds.easyMockOutput("hello.txt", "goodbye.txt")

err := DownloadResults(
context.Background(),
Expand All @@ -172,20 +192,153 @@ func (ds *DownloaderSuite) TestFullOutput() {
Data: model.StorageSpec{
StorageSource: model.StorageSourceIPFS,
Name: "result-0",
CID: cid,
CID: res.cid,
},
},
},
ds.downloadProvider,
ds.downloadSettings,
)
require.NoError(ds.T(), err)

requireFile(ds, res.stdout, "stdout")
requireFile(ds, res.stderr, "stderr")
requireFile(ds, res.exitCode, "exitCode")
requireFile(ds, res.outputs["goodbye.txt"], "outputs", "goodbye.txt")
requireFile(ds, res.outputs["hello.txt"], "outputs", "hello.txt")
}

func (ds *DownloaderSuite) TestSingleRawOutput() {
res := ds.easyMockOutput("hello.txt", "goodbye.txt")

settings := ds.downloadSettings
settings.Raw = true
err := DownloadResults(
context.Background(),
[]model.PublishedResult{
{
NodeID: "testnode",
Data: model.StorageSpec{
StorageSource: model.StorageSourceIPFS,
Name: "result-0",
CID: res.cid,
},
},
},
ds.downloadProvider,
settings,
)
require.NoError(ds.T(), err)

requireFile(ds, res.stdout, model.DownloadCIDsFolderName, res.cid, "stdout")
requireFile(ds, res.stderr, model.DownloadCIDsFolderName, res.cid, "stderr")
requireFile(ds, res.exitCode, model.DownloadCIDsFolderName, res.cid, "exitCode")
requireFile(ds, res.outputs["goodbye.txt"], model.DownloadCIDsFolderName, res.cid, "outputs", "goodbye.txt")
requireFile(ds, res.outputs["hello.txt"], model.DownloadCIDsFolderName, res.cid, "outputs", "hello.txt")
}

func (ds *DownloaderSuite) TestMultiRawOutput() {
res := ds.easyMockOutput("hello.txt")
res2 := ds.easyMockOutput("goodbye.txt")

settings := ds.downloadSettings
settings.Raw = true
err := DownloadResults(
context.Background(),
[]model.PublishedResult{
{
NodeID: "testnode",
Data: model.StorageSpec{
StorageSource: model.StorageSourceIPFS,
Name: "result-1",
CID: res.cid,
},
},
{
NodeID: "testnode",
Data: model.StorageSpec{
StorageSource: model.StorageSourceIPFS,
Name: "result-2",
CID: res2.cid,
},
},
},
ds.downloadProvider,
settings,
)
require.NoError(ds.T(), err)

requireFile(ds, res.stdout, model.DownloadCIDsFolderName, res.cid, "stdout")
requireFile(ds, res.stderr, model.DownloadCIDsFolderName, res.cid, "stderr")
requireFile(ds, res.exitCode, model.DownloadCIDsFolderName, res.cid, "exitCode")
requireFile(ds, res.outputs["hello.txt"], model.DownloadCIDsFolderName, res.cid, "outputs", "hello.txt")

requireFile(ds, res2.stdout, model.DownloadCIDsFolderName, res2.cid, "stdout")
requireFile(ds, res2.stderr, model.DownloadCIDsFolderName, res2.cid, "stderr")
requireFile(ds, res2.exitCode, model.DownloadCIDsFolderName, res2.cid, "exitCode")
requireFile(ds, res2.outputs["goodbye.txt"], model.DownloadCIDsFolderName, res2.cid, "outputs", "goodbye.txt")
}

func (ds *DownloaderSuite) TestMultiMergedOutput() {
res := ds.easyMockOutput("hello.txt")
res2 := ds.easyMockOutput("goodbye.txt")

err := DownloadResults(
context.Background(),
[]model.PublishedResult{
{
NodeID: "testnode",
Data: model.StorageSpec{
StorageSource: model.StorageSourceIPFS,
Name: "result-1",
CID: res.cid,
},
},
{
NodeID: "testnode",
Data: model.StorageSpec{
StorageSource: model.StorageSourceIPFS,
Name: "result-2",
CID: res2.cid,
},
},
},
ds.downloadProvider,
ds.downloadSettings,
)
require.NoError(ds.T(), err)
requireFile(ds, res.outputs["hello.txt"], "outputs", "hello.txt")
requireFile(ds, res2.outputs["goodbye.txt"], "outputs", "goodbye.txt")
}

func (ds *DownloaderSuite) TestMultiMergeConflictingOutput() {
res := ds.easyMockOutput("same_same.txt")
res2 := ds.easyMockOutput("same_same.txt")

requireFile(ds, stdout, "stdout")
requireFile(ds, stderr, "stderr")
requireFile(ds, exitCode, "exitCode")
requireFile(ds, goodbye, "outputs", "goodbye.txt")
requireFile(ds, hello, "outputs", "hello.txt")
err := DownloadResults(
context.Background(),
[]model.PublishedResult{
{
NodeID: "testnode",
Data: model.StorageSpec{
StorageSource: model.StorageSourceIPFS,
Name: "result-1",
CID: res.cid,
},
},
{
NodeID: "testnode",
Data: model.StorageSpec{
StorageSource: model.StorageSourceIPFS,
Name: "result-2",
CID: res2.cid,
},
},
},
ds.downloadProvider,
ds.downloadSettings,
)
require.Error(ds.T(), err)
}

func (ds *DownloaderSuite) TestOutputWithNoStdFiles() {
Expand Down
1 change: 1 addition & 0 deletions pkg/model/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ type DownloaderSettings struct {
OutputDir string
IPFSSwarmAddrs string
LocalIPFS bool
Raw bool
}

0 comments on commit f0ae6b9

Please sign in to comment.