From 21f0d13ffb64ff82fc147b46ca72b8e9c672b0ea Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Tue, 17 Dec 2024 10:27:59 +0530 Subject: [PATCH 1/6] fix: processing pickup race condition (#5374) --- warehouse/router/router.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/warehouse/router/router.go b/warehouse/router/router.go index 7c02155477..b2e66d249d 100644 --- a/warehouse/router/router.go +++ b/warehouse/router/router.go @@ -74,6 +74,8 @@ type Router struct { inProgressMap map[workerIdentifierMapKey][]jobID inProgressMapLock sync.RWMutex + processingMu sync.Mutex + scheduledTimesCache map[string][]int scheduledTimesCacheLock sync.RWMutex @@ -371,18 +373,22 @@ loop: continue } + r.processingMu.Lock() inProgressNamespaces := r.getInProgressNamespaces() r.logger.Debugf(`Current inProgress namespace identifiers for %s: %v`, r.destType, inProgressNamespaces) uploadJobsToProcess, err := r.uploadsToProcess(ctx, availableWorkers, inProgressNamespaces) if err != nil && ctx.Err() == nil { r.logger.Errorn("Error getting uploads to process", logger.NewErrorField(err)) + r.processingMu.Unlock() return err } - for _, uploadJob := range uploadJobsToProcess { r.setDestInProgress(uploadJob.warehouse, uploadJob.upload.ID) + } + r.processingMu.Unlock() + for _, uploadJob := range uploadJobsToProcess { workerName := r.workerIdentifier(uploadJob.warehouse) r.workerChannelMapLock.RLock() @@ -596,6 +602,9 @@ func (r *Router) handlePriorityForWaitingUploads(ctx context.Context, warehouse return defaultUploadPriority, nil } + r.processingMu.Lock() + defer r.processingMu.Unlock() + // If it is present do nothing else delete it if _, inProgress := r.isUploadJobInProgress(warehouse, latestInfo.ID); !inProgress { if err := r.uploadRepo.DeleteWaiting(ctx, latestInfo.ID); err != nil { From ab867a5ec9abdfe660e1e2ef8d19d7cfc34b3146 Mon Sep 17 00:00:00 2001 From: devops-github-rudderstack <88187154+devops-github-rudderstack@users.noreply.github.com> Date: Tue, 17 Dec 2024 10:51:31 +0530 Subject: [PATCH 2/6] chore: release 1.39.1 (#5376) --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa1cc7fc5e..d7ee5b9a15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [1.39.1](https://github.com/rudderlabs/rudder-server/compare/v1.39.0...v1.39.1) (2024-12-17) + + +### Bug Fixes + +* processing pickup race condition ([#5374](https://github.com/rudderlabs/rudder-server/issues/5374)) ([21f0d13](https://github.com/rudderlabs/rudder-server/commit/21f0d13ffb64ff82fc147b46ca72b8e9c672b0ea)) + ## [1.39.0](https://github.com/rudderlabs/rudder-server/compare/v1.38.0...v1.39.0) (2024-12-10) From a0ef0f15ca456fc459233bf70c324cc60cfca7f0 Mon Sep 17 00:00:00 2001 From: Mihir Gandhi Date: Wed, 18 Dec 2024 18:12:55 +0530 Subject: [PATCH 3/6] chore: add error msg in the logs when gw req fails (#5369) * chore: add error msg in the logs when gw req fails * addressed comments * addressed comments * update go version and schema version * update go version in docker file * downgrade swagger-editor-validate * update action to apidom-validate --- .github/workflows/verify.yml | 2 +- Dockerfile | 2 +- gateway/handle.go | 31 ++++++++++++++++++++++++--- go.mod | 6 +++--- go.sum | 8 +++---- suppression-backup-service/Dockerfile | 2 +- 6 files changed, 38 insertions(+), 13 deletions(-) diff --git a/.github/workflows/verify.yml b/.github/workflows/verify.yml index 92e92bec2a..ac8d7ff81b 100644 --- a/.github/workflows/verify.yml +++ b/.github/workflows/verify.yml @@ -51,7 +51,7 @@ jobs: if: ${{ failure() }} run: echo 'Not formatted files. Ensure you have run `make fmt` and committed the files locally.' - name: Validate OpenAPI definition - uses: char0n/swagger-editor-validate@v1 + uses: char0n/apidom-validate@v1 with: definition-file: gateway/openapi.yaml diff --git a/Dockerfile b/Dockerfile index a38c0b66ea..2db21beee9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,7 +2,7 @@ # syntax=docker/dockerfile:1 # GO_VERSION is updated automatically to match go.mod, see Makefile -ARG GO_VERSION=1.23.3 +ARG GO_VERSION=1.23.4 ARG ALPINE_VERSION=3.20 FROM golang:${GO_VERSION}-alpine${ALPINE_VERSION} AS builder ARG VERSION diff --git a/gateway/handle.go b/gateway/handle.go index 402ae15bd3..abacefee13 100644 --- a/gateway/handle.go +++ b/gateway/handle.go @@ -709,11 +709,11 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { errorMessage = err.Error() status = response.GetErrorStatusCode(errorMessage) responseBody = response.GetStatus(errorMessage) - gw.logger.Infon("response", + gw.logger.Errorn("response", logger.NewStringField("ip", kithttputil.GetRequestIP(r)), logger.NewStringField("path", r.URL.Path), logger.NewIntField("status", int64(status)), - logger.NewStringField("body", responseBody), + obskit.Error(err), ) gw.logger.Debugn("response", logger.NewStringField("ip", kithttputil.GetRequestIP(r)), @@ -763,6 +763,8 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt if err != nil { stat.RequestFailed(response.InvalidJSON) stat.Report(gw.stats) + gw.logger.Errorn("invalid json in request", + obskit.Error(err)) return nil, errors.New(response.InvalidJSON) } gw.requestSizeStat.Observe(float64(len(body))) @@ -770,6 +772,7 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt if len(messages) == 0 { stat.RequestFailed(response.NotRudderEvent) stat.Report(gw.stats) + gw.logger.Errorn("no messages in request") return nil, errors.New(response.NotRudderEvent) } @@ -779,7 +782,10 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt stat := gwstats.SourceStat{ReqType: reqType} err := gw.streamMsgValidator(&msg) if err != nil { - gw.logger.Errorn("invalid message in request", logger.NewErrorField(err)) + loggerFields := msg.Properties.LoggerFields() + loggerFields = append(loggerFields, obskit.Error(err)) + gw.logger.Errorn("invalid message in request", + loggerFields...) stat.RequestEventsFailed(1, response.InvalidStreamMessage) stat.Report(gw.stats) return nil, errors.New(response.InvalidStreamMessage) @@ -793,6 +799,9 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt if err != nil { stat.RequestEventsFailed(1, response.NotRudderEvent) stat.Report(gw.stats) + loggerFields := msg.Properties.LoggerFields() + loggerFields = append(loggerFields, obskit.Error(err)) + gw.logger.Errorn("failed to set type in message", loggerFields...) return nil, errors.New(response.NotRudderEvent) } } @@ -806,6 +815,8 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt if err != nil { stat.RequestFailed(response.NotRudderEvent) stat.Report(gw.stats) + gw.logger.Errorn("failed to set messageID in message", + obskit.Error(err)) return nil, errors.New(response.NotRudderEvent) } } @@ -813,12 +824,18 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt if err != nil { stat.RequestFailed(response.NotRudderEvent) stat.Report(gw.stats) + gw.logger.Errorn("failed to get rudderId", + obskit.Error(err)) return nil, errors.New(response.NotRudderEvent) } msg.Payload, err = sjson.SetBytes(msg.Payload, "rudderId", rudderId.String()) if err != nil { stat.RequestFailed(response.NotRudderEvent) stat.Report(gw.stats) + loggerFields := msg.Properties.LoggerFields() + loggerFields = append(loggerFields, obskit.Error(err)) + gw.logger.Errorn("failed to set rudderId in message", + loggerFields...) return nil, errors.New(response.NotRudderEvent) } writeKey, ok := gw.getWriteKeyFromSourceID(msg.Properties.SourceID) @@ -877,6 +894,8 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt err = fmt.Errorf("filling receivedAt: %w", err) stat.RequestEventsFailed(1, err.Error()) stat.Report(gw.stats) + gw.logger.Errorn("failed to fill receivedAt in message", + obskit.Error(err)) return nil, fmt.Errorf("filling receivedAt: %w", err) } msg.Payload, err = fillRequestIP(msg.Payload, msg.Properties.RequestIP) @@ -884,6 +903,8 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt err = fmt.Errorf("filling request_ip: %w", err) stat.RequestEventsFailed(1, err.Error()) stat.Report(gw.stats) + gw.logger.Errorn("failed to fill request_ip in message", + obskit.Error(err)) return nil, fmt.Errorf("filling request_ip: %w", err) } @@ -899,6 +920,10 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt err = fmt.Errorf("marshalling event batch: %w", err) stat.RequestEventsFailed(1, err.Error()) stat.Report(gw.stats) + loggerFields := msg.Properties.LoggerFields() + loggerFields = append(loggerFields, obskit.Error(err)) + gw.logger.Errorn("failed to marshal event batch", + loggerFields...) return nil, fmt.Errorf("marshalling event batch: %w", err) } jobUUID := uuid.New() diff --git a/go.mod b/go.mod index ff3e77c14c..db74ebe06a 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/rudderlabs/rudder-server -go 1.23.3 +go 1.23.4 // Addressing snyk vulnerabilities in indirect dependencies // When upgrading a dependency, please make sure that @@ -78,7 +78,7 @@ require ( github.com/rudderlabs/compose-test v0.1.3 github.com/rudderlabs/rudder-go-kit v0.45.0 github.com/rudderlabs/rudder-observability-kit v0.0.3 - github.com/rudderlabs/rudder-schemas v0.5.3 + github.com/rudderlabs/rudder-schemas v0.5.4 github.com/rudderlabs/rudder-transformer/go v0.0.0-20240910055720-f77d2ab4125a github.com/rudderlabs/sql-tunnels v0.1.7 github.com/rudderlabs/sqlconnect-go v1.13.0 @@ -214,7 +214,7 @@ require ( github.com/go-ole/go-ole v1.3.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/go-playground/validator/v10 v10.22.1 // indirect + github.com/go-playground/validator/v10 v10.23.0 // indirect github.com/go-sql-driver/mysql v1.8.1 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/go-viper/mapstructure/v2 v2.1.0 // indirect diff --git a/go.sum b/go.sum index 9d8f12ec86..08fc077a34 100644 --- a/go.sum +++ b/go.sum @@ -587,8 +587,8 @@ github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/Nu github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= -github.com/go-playground/validator/v10 v10.22.1 h1:40JcKH+bBNGFczGuoBYgX4I6m/i27HYW8P9FDk5PbgA= -github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/go-playground/validator/v10 v10.23.0 h1:/PwmTwZhS0dPkav3cdK9kV1FsAmrL8sThn8IHr/sO+o= +github.com/go-playground/validator/v10 v10.23.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= github.com/go-redis/redis v6.15.8+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= @@ -1175,8 +1175,8 @@ github.com/rudderlabs/rudder-go-kit v0.45.0 h1:y8ModVsl2rAdqnqv/di82cddkaEBT1qZv github.com/rudderlabs/rudder-go-kit v0.45.0/go.mod h1:NrHCi0KSzHSMFXQu0t2kgJcE4ClAKklVXfb2glADvQ4= github.com/rudderlabs/rudder-observability-kit v0.0.3 h1:vZtuZRkGX+6rjaeKtxxFE2YYP6QlmAcVcgecTOjvz+Q= github.com/rudderlabs/rudder-observability-kit v0.0.3/go.mod h1:6UjAh3H6rkE0fFLh7z8ZGQEQbKtUkRfhWOf/OUhfqW8= -github.com/rudderlabs/rudder-schemas v0.5.3 h1:IWWjAo2TzsjwHNhS2EAr1+0MjvA8BoTpJvB2o/GFwNU= -github.com/rudderlabs/rudder-schemas v0.5.3/go.mod h1:iUpjG/Zb+ioZcNLvXNYXSKQ2LpPlsIDBfxfCDH9ue/E= +github.com/rudderlabs/rudder-schemas v0.5.4 h1:QzI6vIC38W0jGJu6E0vdwh4IAtSKx8ziqff+JL0xmIE= +github.com/rudderlabs/rudder-schemas v0.5.4/go.mod h1:oypQtew1d2jBGJdFMT4zf1XOvXgGEcNPtRFy6vRDAv8= github.com/rudderlabs/rudder-transformer/go v0.0.0-20240910055720-f77d2ab4125a h1:OZcvpApxEYNkB9UNXrKDUBufQ24Lsr2Cs0pw70tzXBw= github.com/rudderlabs/rudder-transformer/go v0.0.0-20240910055720-f77d2ab4125a/go.mod h1:3NGitPz4pYRRZ6Xt09S+8hb0tHK/9pZcKJ3OgOTaSmE= github.com/rudderlabs/sql-tunnels v0.1.7 h1:wDCRl6zY4M5gfWazf7XkSTGQS3yjBzUiUgEMBIfHNDA= diff --git a/suppression-backup-service/Dockerfile b/suppression-backup-service/Dockerfile index f10a80d4e2..adabf15d31 100644 --- a/suppression-backup-service/Dockerfile +++ b/suppression-backup-service/Dockerfile @@ -1,7 +1,7 @@ # syntax=docker/dockerfile:1 # GO_VERSION is updated automatically to match go.mod, see Makefile -ARG GO_VERSION=1.23.3 +ARG GO_VERSION=1.23.4 ARG ALPINE_VERSION=3.20 FROM golang:${GO_VERSION}-alpine${ALPINE_VERSION} AS builder RUN mkdir /app From abd2cdba2fb4e2b5e5f779540dff38cbeab899e1 Mon Sep 17 00:00:00 2001 From: devops-github-rudderstack <88187154+devops-github-rudderstack@users.noreply.github.com> Date: Wed, 18 Dec 2024 18:59:23 +0530 Subject: [PATCH 4/6] chore: release 1.39.2 (#5382) --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7ee5b9a15..d462b71a70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [1.39.2](https://github.com/rudderlabs/rudder-server/compare/v1.39.1...v1.39.2) (2024-12-18) + + +### Miscellaneous + +* add error msg in the logs when gw req fails ([#5369](https://github.com/rudderlabs/rudder-server/issues/5369)) ([a0ef0f1](https://github.com/rudderlabs/rudder-server/commit/a0ef0f15ca456fc459233bf70c324cc60cfca7f0)) + ## [1.39.1](https://github.com/rudderlabs/rudder-server/compare/v1.39.0...v1.39.1) (2024-12-17) From 74056f8211b3ed5b4c3d0485753401371bc53751 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Mon, 23 Dec 2024 15:04:26 +0530 Subject: [PATCH 5/6] fix: replay tracking plan bug (#5389) --- processor/trackingplan.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/processor/trackingplan.go b/processor/trackingplan.go index 33fdaea873..a085517b6f 100644 --- a/processor/trackingplan.go +++ b/processor/trackingplan.go @@ -89,6 +89,10 @@ func (proc *Handle) validateEvents(groupedEventsBySourceId map[SourceIDT][]trans continue } + transformerEvent := eventList[0] + destination := &transformerEvent.Destination + commonMetaData := makeCommonMetadataFromTransformerEvent(&transformerEvent) + validationStart := time.Now() response := proc.transformer.Validate(context.TODO(), eventList, proc.config.userTransformBatchSize.Load()) validationStat.tpValidationTime.Since(validationStart) @@ -103,15 +107,9 @@ func (proc *Handle) validateEvents(groupedEventsBySourceId map[SourceIDT][]trans } enhanceWithViolation(response, eventList[0].Metadata.TrackingPlanID, eventList[0].Metadata.TrackingPlanVersion) - - transformerEvent := eventList[0] - destination := &transformerEvent.Destination - sourceID := transformerEvent.Metadata.SourceID - commonMetaData := makeCommonMetadataFromTransformerEvent(&transformerEvent) - // Set trackingPlanEnabledMap for the sourceID to true. // This is being used to distinguish the flows in reporting service - trackingPlanEnabledMap[SourceIDT(sourceID)] = true + trackingPlanEnabledMap[sourceId] = true var successMetrics []*types.PUReportedMetric eventsToTransform, successMetrics, _, _ := proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, backendconfig.Connection{}, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR) // Note: Sending false for usertransformation enabled is safe because this stage is before user transformation. @@ -146,15 +144,16 @@ func (proc *Handle) validateEvents(groupedEventsBySourceId map[SourceIDT][]trans func makeCommonMetadataFromTransformerEvent(transformerEvent *transformer.TransformerEvent) *transformer.Metadata { metadata := transformerEvent.Metadata commonMetaData := transformer.Metadata{ - SourceID: metadata.SourceID, - SourceName: metadata.SourceName, - SourceType: metadata.SourceType, - SourceCategory: metadata.SourceCategory, - WorkspaceID: metadata.WorkspaceID, - Namespace: config.GetKubeNamespace(), - InstanceID: misc.GetInstanceID(), - DestinationID: metadata.DestinationID, - DestinationType: metadata.DestinationType, + SourceID: metadata.SourceID, + SourceName: metadata.SourceName, + SourceType: metadata.SourceType, + SourceCategory: metadata.SourceCategory, + WorkspaceID: metadata.WorkspaceID, + Namespace: config.GetKubeNamespace(), + InstanceID: misc.GetInstanceID(), + DestinationID: metadata.DestinationID, + DestinationType: metadata.DestinationType, + OriginalSourceID: metadata.OriginalSourceID, } return &commonMetaData } From adce18ffa50615be0bbcb8822ae3f9cb846c39e6 Mon Sep 17 00:00:00 2001 From: devops-github-rudderstack <88187154+devops-github-rudderstack@users.noreply.github.com> Date: Mon, 23 Dec 2024 16:02:17 +0530 Subject: [PATCH 6/6] chore: release 1.39.3 (#5391) --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d462b71a70..6e1c8633ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [1.39.3](https://github.com/rudderlabs/rudder-server/compare/v1.39.2...v1.39.3) (2024-12-23) + + +### Bug Fixes + +* replay tracking plan bug ([#5389](https://github.com/rudderlabs/rudder-server/issues/5389)) ([74056f8](https://github.com/rudderlabs/rudder-server/commit/74056f8211b3ed5b4c3d0485753401371bc53751)) + ## [1.39.2](https://github.com/rudderlabs/rudder-server/compare/v1.39.1...v1.39.2) (2024-12-18)