diff --git a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go index fcf7415f97..31292d2c65 100644 --- a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go +++ b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go @@ -406,10 +406,10 @@ func (m *Manager) abortJobs(asyncDest *common.AsyncDestinationStruct, abortReaso func (m *Manager) failedJobs(asyncDest *common.AsyncDestinationStruct, failedReason string) common.AsyncUploadOutput { m.stats.jobs.failed.Count(len(asyncDest.ImportingJobIDs)) return common.AsyncUploadOutput{ - ImportingJobIDs: asyncDest.ImportingJobIDs, - ImportingCount: len(asyncDest.ImportingJobIDs), - FailedReason: failedReason, - DestinationID: asyncDest.Destination.ID, + FailedJobIDs: asyncDest.ImportingJobIDs, + FailedCount: len(asyncDest.ImportingJobIDs), + FailedReason: failedReason, + DestinationID: asyncDest.Destination.ID, } } diff --git a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go index de2ab45246..688c977702 100644 --- a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go +++ b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go @@ -405,7 +405,7 @@ func TestSnowpipeStreaming(t *testing.T) { require.Equal(t, false, sm.authzBackoff.isInBackoff()) }) - t.Run("Upload with discards table authorization error should not abort the job", func(t *testing.T) { + t.Run("Upload with discards table authorization error should mark the job as failed", func(t *testing.T) { statsStore, err := memstats.New() require.NoError(t, err) @@ -415,9 +415,6 @@ func TestSnowpipeStreaming(t *testing.T) { "RUDDER_DISCARDS": func() (*model.ChannelResponse, error) { return &model.ChannelResponse{Code: internalapi.ErrSchemaDoesNotExistOrNotAuthorized}, nil }, - "USERS": func() (*model.ChannelResponse, error) { - return &model.ChannelResponse{Code: internalapi.ErrSchemaDoesNotExistOrNotAuthorized}, nil - }, }, } sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) { @@ -431,9 +428,8 @@ func TestSnowpipeStreaming(t *testing.T) { Destination: destination, FileName: "testdata/successful_user_records.txt", }) - require.Equal(t, 0, output.FailedCount) + require.Equal(t, 1, output.FailedCount) require.Equal(t, 0, output.AbortCount) - require.Equal(t, 1, output.ImportingCount) require.NotEmpty(t, output.FailedReason) require.Empty(t, output.AbortReason) require.Equal(t, true, sm.authzBackoff.isInBackoff())