chore: send sample event to reporting in async destinations (#5396)
* chore: async destination reporting

* chore: removing redundant properties

* chore: review comments

---------

Co-authored-by: Sai Sankeerth <[email protected]>
sanpj2292 and Sai Sankeerth authored Jan 8, 2025
1 parent b51605a commit 688262e
Showing 3 changed files with 114 additions and 53 deletions.
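In short, the change replaces the positional arguments of getReportMetrics and setMultipleJobStatus with params structs and threads the batch's jobs list through to reporting, so each reported status can carry the job's event payload as a sample event. Below is a minimal, self-contained sketch of that lookup; the Job and JobStatus types are trimmed stand-ins for the real jobsdb structs, and emptyPayload stands in for routerutils.EmptyPayload.

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/samber/lo"
)

// Trimmed stand-ins for jobsdb.JobT and jobsdb.JobStatusT, just enough to show the lookup.
type Job struct {
	JobID        int64
	EventPayload json.RawMessage
}

type JobStatus struct {
	JobID    int64
	JobState string
}

var emptyPayload = json.RawMessage(`{}`)

// sampleEventFor mirrors the new logic in getReportMetrics: index the jobs by ID
// and use the job's event payload as the sample event, falling back to an empty
// payload when the status has no matching job in the list.
func sampleEventFor(jobs []*Job, statuses []*JobStatus) map[int64]json.RawMessage {
	jobsMap := lo.SliceToMap(jobs, func(j *Job) (int64, *Job) { return j.JobID, j })
	out := make(map[int64]json.RawMessage, len(statuses))
	for _, status := range statuses {
		sampleEvent := emptyPayload
		if job, ok := jobsMap[status.JobID]; ok {
			sampleEvent = job.EventPayload
		}
		out[status.JobID] = sampleEvent
	}
	return out
}

func main() {
	jobs := []*Job{{JobID: 1, EventPayload: json.RawMessage(`{"event":"order_completed"}`)}}
	statuses := []*JobStatus{{JobID: 1, JobState: "succeeded"}, {JobID: 2, JobState: "failed"}}
	samples := sampleEventFor(jobs, statuses)
	fmt.Println(string(samples[1])) // {"event":"order_completed"}
	fmt.Println(string(samples[2])) // {} — no matching job, so the empty payload is reported
}
```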
145 changes: 93 additions & 52 deletions router/batchrouter/handle_async.go
@@ -12,6 +12,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/samber/lo"
"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-go-kit/stats"
@@ -43,7 +44,11 @@ func (brt *Handle) getImportingJobs(ctx context.Context, destinationID string, l
}

func (brt *Handle) updateJobStatuses(ctx context.Context, destinationID string, allJobs, completedJobs []*jobsdb.JobT, statusList []*jobsdb.JobStatusT) error {
reportMetrics := brt.getReportMetrics(statusList, brt.getParamertsFromJobs(allJobs))
reportMetrics := brt.getReportMetrics(getReportMetricsParams{
StatusList: statusList,
ParametersMap: brt.getParamertsFromJobs(allJobs),
JobsList: allJobs,
})

parameterFilters := []jobsdb.ParameterFilterT{{Name: "destination_id", Value: destinationID}}
return misc.RetryWithNotify(ctx, brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
@@ -364,7 +369,13 @@ func (brt *Handle) asyncUploadWorker(ctx context.Context) {
if uploadResponse.ImportingParameters != nil && len(uploadResponse.ImportingJobIDs) > 0 {
brt.asyncDestinationStruct[destinationID].UploadInProgress = true
}
brt.setMultipleJobStatus(uploadResponse, true, brt.asyncDestinationStruct[destinationID].AttemptNums, brt.asyncDestinationStruct[destinationID].FirstAttemptedAts, brt.asyncDestinationStruct[destinationID].OriginalJobParameters)
brt.setMultipleJobStatus(setMultipleJobStatusParams{
AsyncOutput: uploadResponse,
Attempted: true,
AttemptNums: brt.asyncDestinationStruct[destinationID].AttemptNums,
FirstAttemptedAts: brt.asyncDestinationStruct[destinationID].FirstAttemptedAts,
OriginalJobParameters: brt.asyncDestinationStruct[destinationID].OriginalJobParameters,
})
brt.asyncStructCleanUp(destinationID)
}
brt.asyncDestinationStruct[destinationID].UploadMutex.Unlock()
@@ -452,7 +463,13 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) {
out.SuccessResponse = fmt.Sprintf(`{"error":"%s"`, rterror.DisabledEgress.Error()) // skipcq: GO-R4002
}

brt.setMultipleJobStatus(out, false, getAttemptNumbers(batchJobs.Jobs), getFirstAttemptAts(batchJobs.Jobs), getOriginalJobParameters(batchJobs.Jobs))
brt.setMultipleJobStatus(setMultipleJobStatusParams{
AsyncOutput: out,
AttemptNums: getAttemptNumbers(batchJobs.Jobs),
FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs),
OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs),
JobsList: batchJobs.Jobs,
})
return
}

@@ -469,7 +486,13 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) {
out.FailedReason = `Jobs flowed over the prescribed limit`
}

brt.setMultipleJobStatus(out, false, getAttemptNumbers(batchJobs.Jobs), getFirstAttemptAts(batchJobs.Jobs), getOriginalJobParameters(batchJobs.Jobs))
brt.setMultipleJobStatus(setMultipleJobStatusParams{
AsyncOutput: out,
AttemptNums: getAttemptNumbers(batchJobs.Jobs),
FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs),
OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs),
JobsList: batchJobs.Jobs,
})
return
}
}
@@ -533,7 +556,15 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) {
out.FailedReason = `Jobs flowed over the prescribed limit`
}

brt.setMultipleJobStatus(out, false, getAttemptNumbers(batchJobs.Jobs), getFirstAttemptAts(batchJobs.Jobs), getOriginalJobParameters(batchJobs.Jobs))
brt.setMultipleJobStatus(
setMultipleJobStatusParams{
AsyncOutput: out,
AttemptNums: getAttemptNumbers(batchJobs.Jobs),
FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs),
OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs),
JobsList: batchJobs.Jobs,
},
)
}

newAttemptNums := getAttemptNumbers(batchJobs.Jobs)
@@ -560,17 +591,20 @@ func (brt *Handle) createFakeJob(jobID int64, parameters stdjson.RawMessage) *jo
}
}

func (brt *Handle) getReportMetrics(statusList []*jobsdb.JobStatusT, parametersMap map[int64]stdjson.RawMessage) []*utilTypes.PUReportedMetric {
func (brt *Handle) getReportMetrics(params getReportMetricsParams) []*utilTypes.PUReportedMetric {
reportMetrics := make([]*utilTypes.PUReportedMetric, 0)
connectionDetailsMap := make(map[string]*utilTypes.ConnectionDetails)
transformedAtMap := make(map[string]string)
statusDetailsMap := make(map[string]*utilTypes.StatusDetail)
routerWorkspaceJobStatusCount := make(map[string]int)
for _, status := range statusList {
jobsMap := lo.SliceToMap(params.JobsList, func(j *jobsdb.JobT) (int64, *jobsdb.JobT) {
return j.JobID, j
})
for _, status := range params.StatusList {
var parameters routerutils.JobParameters
err := json.Unmarshal(parametersMap[status.JobID], &parameters)
err := json.Unmarshal(params.ParametersMap[status.JobID], &parameters)
if err != nil {
brt.logger.Error("Unmarshal of job parameters failed. ", string(parametersMap[status.JobID]))
brt.logger.Error("Unmarshal of job parameters failed. ", string(params.ParametersMap[status.JobID]))
}
workspaceID := status.WorkspaceId
eventName := parameters.EventName
@@ -598,6 +632,9 @@ func (brt *Handle) getReportMetrics(statusList []*jobsdb.JobStatusT, parametersM
errorCode = 0
}
sampleEvent := routerutils.EmptyPayload
if job, ok := jobsMap[status.JobID]; ok {
sampleEvent = job.EventPayload
}
sd = &utilTypes.StatusDetail{
Status: status.JobState,
StatusCode: errorCode,
@@ -647,105 +684,105 @@ func (brt *Handle) getReportMetrics(statusList []*jobsdb.JobStatusT, parametersM
return reportMetrics
}

func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, attempted bool, attemptNums map[int64]int, firstAttemptedAts map[int64]time.Time, originalJobParameters map[int64]stdjson.RawMessage) {
workspaceID := brt.GetWorkspaceIDForDestID(asyncOutput.DestinationID)
func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
workspaceID := brt.GetWorkspaceIDForDestID(params.AsyncOutput.DestinationID)
var completedJobsList []*jobsdb.JobT
var statusList []*jobsdb.JobStatusT
jobIDConnectionDetailsMap := make(map[int64]jobsdb.ConnectionDetails)
if len(asyncOutput.ImportingJobIDs) > 0 {
for _, jobId := range asyncOutput.ImportingJobIDs {
if len(params.AsyncOutput.ImportingJobIDs) > 0 {
for _, jobId := range params.AsyncOutput.ImportingJobIDs {
jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{
DestinationID: asyncOutput.DestinationID,
SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(),
DestinationID: params.AsyncOutput.DestinationID,
SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(),
}
status := jobsdb.JobStatusT{
JobID: jobId,
JobState: jobsdb.Importing.State,
AttemptNum: attemptNums[jobId] + 1,
AttemptNum: params.AttemptNums[jobId] + 1,
ExecTime: time.Now(),
RetryTime: time.Now(),
ErrorCode: "200",
ErrorResponse: routerutils.EnhanceJsonWithTime(firstAttemptedAts[jobId], "firstAttemptedAt", routerutils.EmptyPayload),
Parameters: asyncOutput.ImportingParameters,
JobParameters: originalJobParameters[jobId],
ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", routerutils.EmptyPayload),
Parameters: params.AsyncOutput.ImportingParameters,
JobParameters: params.OriginalJobParameters[jobId],
WorkspaceId: workspaceID,
}
statusList = append(statusList, &status)
}
}
if len(asyncOutput.SucceededJobIDs) > 0 {
for _, jobId := range asyncOutput.SucceededJobIDs {
if len(params.AsyncOutput.SucceededJobIDs) > 0 {
for _, jobId := range params.AsyncOutput.SucceededJobIDs {
jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{
DestinationID: asyncOutput.DestinationID,
SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(),
DestinationID: params.AsyncOutput.DestinationID,
SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(),
}
status := jobsdb.JobStatusT{
JobID: jobId,
JobState: jobsdb.Succeeded.State,
AttemptNum: attemptNums[jobId],
AttemptNum: params.AttemptNums[jobId],
ExecTime: time.Now(),
RetryTime: time.Now(),
ErrorCode: "200",
ErrorResponse: routerutils.EnhanceJsonWithTime(firstAttemptedAts[jobId], "firstAttemptedAt", stdjson.RawMessage(asyncOutput.SuccessResponse)),
ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", stdjson.RawMessage(params.AsyncOutput.SuccessResponse)),
Parameters: routerutils.EmptyPayload,
JobParameters: originalJobParameters[jobId],
JobParameters: params.OriginalJobParameters[jobId],
WorkspaceId: workspaceID,
}
statusList = append(statusList, &status)
completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, originalJobParameters[jobId]))
completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.OriginalJobParameters[jobId]))
}
}
if len(asyncOutput.FailedJobIDs) > 0 {
for _, jobId := range asyncOutput.FailedJobIDs {
if len(params.AsyncOutput.FailedJobIDs) > 0 {
for _, jobId := range params.AsyncOutput.FailedJobIDs {
jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{
DestinationID: asyncOutput.DestinationID,
SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(),
DestinationID: params.AsyncOutput.DestinationID,
SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(),
}
resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", asyncOutput.FailedReason)
resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", params.AsyncOutput.FailedReason)
status := jobsdb.JobStatusT{
JobID: jobId,
JobState: jobsdb.Failed.State,
AttemptNum: attemptNums[jobId],
AttemptNum: params.AttemptNums[jobId],
ExecTime: time.Now(),
RetryTime: time.Now(),
ErrorCode: "500",
ErrorResponse: routerutils.EnhanceJsonWithTime(firstAttemptedAts[jobId], "firstAttemptedAt", resp),
ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", resp),
Parameters: routerutils.EmptyPayload,
JobParameters: originalJobParameters[jobId],
JobParameters: params.OriginalJobParameters[jobId],
WorkspaceId: workspaceID,
}
if attempted {
status.AttemptNum = attemptNums[jobId] + 1
if params.Attempted {
status.AttemptNum = params.AttemptNums[jobId] + 1
}

if brt.retryLimitReached(&status) {
status.JobState = jobsdb.Aborted.State
completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, originalJobParameters[jobId]))
completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.OriginalJobParameters[jobId]))
}
statusList = append(statusList, &status)
}
}
if len(asyncOutput.AbortJobIDs) > 0 {
for _, jobId := range asyncOutput.AbortJobIDs {
if len(params.AsyncOutput.AbortJobIDs) > 0 {
for _, jobId := range params.AsyncOutput.AbortJobIDs {
jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{
DestinationID: asyncOutput.DestinationID,
SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(),
DestinationID: params.AsyncOutput.DestinationID,
SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(),
}
resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", asyncOutput.AbortReason)
resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", params.AsyncOutput.AbortReason)
status := jobsdb.JobStatusT{
JobID: jobId,
JobState: jobsdb.Aborted.State,
AttemptNum: attemptNums[jobId],
AttemptNum: params.AttemptNums[jobId],
ExecTime: time.Now(),
RetryTime: time.Now(),
ErrorCode: "400",
ErrorResponse: routerutils.EnhanceJsonWithTime(firstAttemptedAts[jobId], "firstAttemptedAt", stdjson.RawMessage(resp)),
ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", stdjson.RawMessage(resp)),
Parameters: routerutils.EmptyPayload,
JobParameters: originalJobParameters[jobId],
JobParameters: params.OriginalJobParameters[jobId],
WorkspaceId: workspaceID,
}
statusList = append(statusList, &status)
completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, originalJobParameters[jobId]))
completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.OriginalJobParameters[jobId]))
}
}

@@ -756,11 +793,15 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at
parameterFilters := []jobsdb.ParameterFilterT{
{
Name: "destination_id",
Value: asyncOutput.DestinationID,
Value: params.AsyncOutput.DestinationID,
},
}

reportMetrics := brt.getReportMetrics(statusList, originalJobParameters)
reportMetrics := brt.getReportMetrics(getReportMetricsParams{
StatusList: statusList,
ParametersMap: params.OriginalJobParameters,
JobsList: params.JobsList,
})

// Mark the status of the jobs
err := misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
@@ -794,12 +835,12 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at
brt.destType,
float64(len(completedJobsList)),
)
if attempted {
if params.Attempted {
var sourceID string
if len(statusList) > 0 {
sourceID = gjson.GetBytes(originalJobParameters[statusList[0].JobID], "source_id").String()
sourceID = gjson.GetBytes(params.OriginalJobParameters[statusList[0].JobID], "source_id").String()
}
brt.recordAsyncDestinationDeliveryStatus(sourceID, asyncOutput.DestinationID, statusList)
brt.recordAsyncDestinationDeliveryStatus(sourceID, params.AsyncOutput.DestinationID, statusList)
}
}

17 changes: 17 additions & 0 deletions router/batchrouter/types.go
@@ -1,10 +1,12 @@
package batchrouter

import (
stdjson "encoding/json"
"time"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
router_utils "github.com/rudderlabs/rudder-server/router/utils"
)

@@ -59,3 +61,18 @@ type BatchedJobs struct {
TimeWindow time.Time
JobState string // ENUM waiting, executing, succeeded, waiting_retry, filtered, failed, aborted, migrating, migrated, wont_migrate
}

type getReportMetricsParams struct {
StatusList []*jobsdb.JobStatusT
ParametersMap map[int64]stdjson.RawMessage
JobsList []*jobsdb.JobT
}

type setMultipleJobStatusParams struct {
AsyncOutput common.AsyncUploadOutput
Attempted bool
AttemptNums map[int64]int
FirstAttemptedAts map[int64]time.Time
OriginalJobParameters map[int64]stdjson.RawMessage
JobsList []*jobsdb.JobT
}
5 changes: 4 additions & 1 deletion router/batchrouter/worker.go
@@ -140,7 +140,10 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin
if err != nil {
panic(fmt.Errorf("storing %s jobs into ErrorDB: %w", brt.destType, err))
}
reportMetrics := brt.getReportMetrics(drainList, brt.getParamertsFromJobs(drainJobList))
reportMetrics := brt.getReportMetrics(getReportMetricsParams{
StatusList: drainList,
ParametersMap: brt.getParamertsFromJobs(drainJobList),
})
err = misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
return brt.jobsDB.WithUpdateSafeTx(ctx, func(tx jobsdb.UpdateSafeTx) error {
err := brt.jobsDB.UpdateJobStatusInTx(ctx, tx, drainList, []string{brt.destType}, parameterFilters)
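One consequence of the new optional JobsList field worth noting (an observation about the change, not part of the commit message): the drain path above builds getReportMetricsParams without JobsList, so the jobs map inside getReportMetrics is built from a nil slice and every drained status keeps routerutils.EmptyPayload as its sample event. A tiny sketch of that fallback behaviour, using a trimmed stand-in type:

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

type job struct{ JobID int64 }

func main() {
	// The drain path passes no JobsList, so the map is built from a nil slice.
	// lo.SliceToMap returns an empty map, every lookup misses, and the sample
	// event stays at the empty payload for drained jobs.
	var none []*job
	jobsMap := lo.SliceToMap(none, func(j *job) (int64, *job) { return j.JobID, j })
	_, ok := jobsMap[42]
	fmt.Println(len(jobsMap), ok) // 0 false
}
```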
