Skip to content

Commit

Permalink
fix: update failed jobs payload
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhar-rudder committed Jan 2, 2025
1 parent e268f73 commit e6a2122
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,10 @@ func (m *Manager) abortJobs(asyncDest *common.AsyncDestinationStruct, abortReaso
func (m *Manager) failedJobs(asyncDest *common.AsyncDestinationStruct, failedReason string) common.AsyncUploadOutput {
m.stats.jobs.failed.Count(len(asyncDest.ImportingJobIDs))
return common.AsyncUploadOutput{
ImportingJobIDs: asyncDest.ImportingJobIDs,
ImportingCount: len(asyncDest.ImportingJobIDs),
FailedReason: failedReason,
DestinationID: asyncDest.Destination.ID,
FailedJobIDs: asyncDest.ImportingJobIDs,
FailedCount: len(asyncDest.ImportingJobIDs),
FailedReason: failedReason,
DestinationID: asyncDest.Destination.ID,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func TestSnowpipeStreaming(t *testing.T) {
require.Equal(t, false, sm.authzBackoff.isInBackoff())
})

t.Run("Upload with discards table authorization error should not abort the job", func(t *testing.T) {
t.Run("Upload with discards table authorization error should mark the job as failed", func(t *testing.T) {
statsStore, err := memstats.New()
require.NoError(t, err)

Expand All @@ -415,9 +415,6 @@ func TestSnowpipeStreaming(t *testing.T) {
"RUDDER_DISCARDS": func() (*model.ChannelResponse, error) {
return &model.ChannelResponse{Code: internalapi.ErrSchemaDoesNotExistOrNotAuthorized}, nil
},
"USERS": func() (*model.ChannelResponse, error) {
return &model.ChannelResponse{Code: internalapi.ErrSchemaDoesNotExistOrNotAuthorized}, nil
},
},
}
sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
Expand All @@ -431,9 +428,8 @@ func TestSnowpipeStreaming(t *testing.T) {
Destination: destination,
FileName: "testdata/successful_user_records.txt",
})
require.Equal(t, 0, output.FailedCount)
require.Equal(t, 1, output.FailedCount)
require.Equal(t, 0, output.AbortCount)
require.Equal(t, 1, output.ImportingCount)
require.NotEmpty(t, output.FailedReason)
require.Empty(t, output.AbortReason)
require.Equal(t, true, sm.authzBackoff.isInBackoff())
Expand Down

0 comments on commit e6a2122

Please sign in to comment.