diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3ff73fe014..c8a6b1da93 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,12 @@
 # Changelog
 
+## [1.40.1](https://github.com/rudderlabs/rudder-server/compare/v1.40.0...v1.40.1) (2025-01-08)
+
+
+### Bug Fixes
+
+* warehouse router tracker ([#5407](https://github.com/rudderlabs/rudder-server/issues/5407)) ([8e314b6](https://github.com/rudderlabs/rudder-server/commit/8e314b6160c37283fbbe889f1dbad86f994943b1))
+
 ## [1.40.0](https://github.com/rudderlabs/rudder-server/compare/v1.39.0...v1.40.0) (2025-01-06)
diff --git a/warehouse/internal/model/warehouse.go b/warehouse/internal/model/warehouse.go
index 3cfe020c0e..10e7c07377 100644
--- a/warehouse/internal/model/warehouse.go
+++ b/warehouse/internal/model/warehouse.go
@@ -62,3 +62,7 @@ func (w *Warehouse) GetPreferAppendSetting() bool {
     }
     return value
 }
+
+func (w *Warehouse) IsEnabled() bool {
+    return w.Source.Enabled && w.Destination.Enabled
+}
diff --git a/warehouse/router/identities.go b/warehouse/router/identities.go
index 51edc96eee..aa6862c6fc 100644
--- a/warehouse/router/identities.go
+++ b/warehouse/router/identities.go
@@ -18,7 +18,6 @@ import (
 
     "github.com/rudderlabs/rudder-server/rruntime"
     "github.com/rudderlabs/rudder-server/utils/misc"
-    "github.com/rudderlabs/rudder-server/utils/timeutil"
     warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
 )
 
@@ -309,7 +308,7 @@ func (r *Router) initPrePopulateDestIdentitiesUpload(warehouse model.Warehouse)
         RETURNING id
     `, warehouseutils.WarehouseUploadsTable)
 
-    now := timeutil.Now()
+    now := r.now()
     row := r.db.QueryRow(
         sqlStatement,
         warehouse.Source.ID,
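Both `identities.go` and `router.go` now read the clock exclusively through the injectable `r.now` field (seeded with `timeutil.Now` in the constructor), which is what lets the rewritten tests pin time. A minimal, self-contained sketch of the pattern — the type and method names here are illustrative, not from this patch:

```go
package main

import (
	"fmt"
	"time"
)

// clocked shows the injectable-clock pattern behind Router.now: production
// code reads time only through the struct field, so tests can substitute a
// fixed clock instead of racing against the wall clock.
type clocked struct {
	now func() time.Time // defaults to time.Now in production; pinned in tests
}

func (c *clocked) isStale(createdAt time.Time, ttl time.Duration) bool {
	return c.now().Sub(createdAt) > ttl
}

func main() {
	fixed := time.Date(2023, 1, 1, 6, 19, 0, 0, time.UTC)
	c := &clocked{now: func() time.Time { return fixed }} // deterministic
	fmt.Println(c.isStale(fixed.Add(-2*time.Hour), time.Hour)) // true
}
```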
diff --git a/warehouse/router/router.go b/warehouse/router/router.go
index b2e66d249d..15b54a067d 100644
--- a/warehouse/router/router.go
+++ b/warehouse/router/router.go
@@ -105,16 +105,17 @@ type Router struct {
         stagingFilesBatchSize   config.ValueLoader[int]
         warehouseSyncFreqIgnore config.ValueLoader[bool]
         cronTrackerRetries      config.ValueLoader[int64]
+        uploadBufferTimeInMin   config.ValueLoader[time.Duration]
     }
 
     stats struct {
-        processingPendingJobsStat      stats.Measurement
-        processingAvailableWorkersStat stats.Measurement
-        processingPickupLagStat        stats.Measurement
-        processingPickupWaitTimeStat   stats.Measurement
-
-        schedulerWarehouseLengthStat     stats.Measurement
-        schedulerTotalSchedulingTimeStat stats.Measurement
+        processingPendingJobsStat        stats.Gauge
+        processingAvailableWorkersStat   stats.Gauge
+        processingPickupLagStat          stats.Timer
+        processingPickupWaitTimeStat     stats.Timer
+        schedulerWarehouseLengthStat     stats.Gauge
+        schedulerTotalSchedulingTimeStat stats.Timer
+        cronTrackerExecTimestamp         stats.Gauge
     }
 }
 
@@ -150,7 +151,7 @@ func New(
     r.tenantManager = tenantManager
     r.bcManager = bcManager
     r.destType = destType
-    r.now = time.Now
+    r.now = timeutil.Now
     r.triggerStore = triggerStore
     r.createJobMarkerMap = make(map[string]time.Time)
     r.createUploadAlways = createUploadAlways
@@ -200,7 +201,7 @@ func (r *Router) Start(ctx context.Context) error {
         return nil
     }))
     g.Go(crash.NotifyWarehouse(func() error {
-        return r.CronTracker(gCtx)
+        return r.cronTracker(gCtx)
     }))
     return g.Wait()
 }
@@ -616,7 +617,7 @@ func (r *Router) handlePriorityForWaitingUploads(ctx context.Context, warehouse
 
 func (r *Router) uploadStartAfterTime() time.Time {
     if r.config.enableJitterForSyncs.Load() {
-        return timeutil.Now().Add(time.Duration(rand.Intn(15)) * time.Second)
+        return r.now().Add(time.Duration(rand.Intn(15)) * time.Second)
     }
     return r.now()
 }
@@ -711,14 +712,32 @@ func (r *Router) loadReloadableConfig(whName string) {
     r.config.enableJitterForSyncs = r.conf.GetReloadableBoolVar(false, "Warehouse.enableJitterForSyncs")
     r.config.warehouseSyncFreqIgnore = r.conf.GetReloadableBoolVar(false, "Warehouse.warehouseSyncFreqIgnore")
     r.config.cronTrackerRetries = r.conf.GetReloadableInt64Var(5, 1, "Warehouse.cronTrackerRetries")
+    r.config.uploadBufferTimeInMin = r.conf.GetReloadableDurationVar(180, time.Minute, "Warehouse.uploadBufferTimeInMin")
 }
 
 func (r *Router) loadStats() {
-    tags := stats.Tags{"destType": r.destType}
+    tags := stats.Tags{"module": moduleName, "destType": r.destType}
     r.stats.processingPendingJobsStat = r.statsFactory.NewTaggedStat("wh_processing_pending_jobs", stats.GaugeType, tags)
     r.stats.processingAvailableWorkersStat = r.statsFactory.NewTaggedStat("wh_processing_available_workers", stats.GaugeType, tags)
     r.stats.processingPickupLagStat = r.statsFactory.NewTaggedStat("wh_processing_pickup_lag", stats.TimerType, tags)
     r.stats.processingPickupWaitTimeStat = r.statsFactory.NewTaggedStat("wh_processing_pickup_wait_time", stats.TimerType, tags)
-    r.stats.schedulerWarehouseLengthStat = r.statsFactory.NewTaggedStat("wh_scheduler.warehouse_length", stats.GaugeType, tags)
-    r.stats.schedulerTotalSchedulingTimeStat = r.statsFactory.NewTaggedStat("wh_scheduler.total_scheduling_time", stats.TimerType, tags)
+    r.stats.schedulerWarehouseLengthStat = r.statsFactory.NewTaggedStat("wh_scheduler_warehouse_length", stats.GaugeType, tags)
+    r.stats.schedulerTotalSchedulingTimeStat = r.statsFactory.NewTaggedStat("wh_scheduler_total_scheduling_time", stats.TimerType, tags)
+    r.stats.cronTrackerExecTimestamp = r.statsFactory.NewTaggedStat("warehouse_cron_tracker_timestamp_seconds", stats.GaugeType, tags)
+}
+
+func (r *Router) copyWarehouses() []model.Warehouse {
+    r.configSubscriberLock.RLock()
+    defer r.configSubscriberLock.RUnlock()
+
+    warehouses := make([]model.Warehouse, len(r.warehouses))
+    copy(warehouses, r.warehouses)
+    return warehouses
+}
+
+func (r *Router) getNowSQL() string {
+    if r.nowSQL != "" {
+        return r.nowSQL
+    }
+    return "NOW()"
+}
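`copyWarehouses` narrows the critical section: the read lock is held only while copying the slice, and the cron loop then iterates the snapshot without blocking concurrent config updates. A standalone sketch of the same copy-under-RLock pattern (names are illustrative, not the patch's types):

```go
package main

import (
	"fmt"
	"sync"
)

// registry demonstrates the snapshot-under-RLock pattern behind
// Router.copyWarehouses: copy while holding the read lock, release it,
// then iterate the copy at leisure.
type registry struct {
	mu    sync.RWMutex
	items []string
}

func (r *registry) snapshot() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, len(r.items))
	copy(out, r.items)
	return out
}

func main() {
	r := &registry{items: []string{"wh-1", "wh-2"}}
	for _, it := range r.snapshot() { // safe even if items changes mid-loop
		fmt.Println(it)
	}
}
```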
"github.com/rudderlabs/rudder-server/warehouse/internal/model" "github.com/rudderlabs/rudder-server/warehouse/internal/repo" @@ -235,7 +236,7 @@ func TestRouter_CanCreateUpload(t *testing.T) { Identifier: "test_identifier_upload_frequency_exceeded", } - now := time.Now() + now := timeutil.Now() r := Router{} r.conf = config.New() @@ -260,7 +261,7 @@ func TestRouter_CanCreateUpload(t *testing.T) { Identifier: "test_identifier_upload_frequency_exceeded", } - now := time.Now() + now := timeutil.Now() r := Router{} r.conf = config.New() @@ -316,7 +317,7 @@ func TestRouter_CanCreateUpload(t *testing.T) { }, } - now := time.Now() + now := timeutil.Now() r := Router{} r.conf = config.New() @@ -344,7 +345,7 @@ func TestRouter_CanCreateUpload(t *testing.T) { }, } - now := time.Now() + now := timeutil.Now() r := Router{} r.conf = config.New() @@ -453,7 +454,7 @@ func TestRouter_CanCreateUpload(t *testing.T) { return tc.now } - r.updateCreateJobMarker(w, time.Now()) + r.updateCreateJobMarker(w, now) err := r.canCreateUpload(context.Background(), w) if tc.wantErr != nil { diff --git a/warehouse/router/testdata/sql/seed_tracker_test.sql b/warehouse/router/testdata/sql/seed_tracker_test.sql deleted file mode 100644 index 5e41a5ed1a..0000000000 --- a/warehouse/router/testdata/sql/seed_tracker_test.sql +++ /dev/null @@ -1,23 +0,0 @@ -BEGIN; -INSERT INTO wh_staging_files (id, location, schema, source_id, destination_id, status, total_events, first_event_at, - last_event_at, created_at, updated_at, metadata) -VALUES (1, 'a.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), - '2022-12-06 15:23:37.100685', NOW(), '{}'), - (2, 'a.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), - '2022-12-06 15:24:37.100685', NOW(), '{}'), - (3, 'a.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), - '2022-12-06 15:25:37.100685', NOW(), '{}'), - (4, 'a.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), - '2022-12-06 15:26:37.100685', NOW(), '{}'), - (5, 'a.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), - '2022-12-06 15:27:37.100685', NOW(), '{}'), - (6, 'a.json.gz', '{}', 'test-sourceID', 'test-destinationID-1', 'succeeded', 1, NOW(), NOW(), - '2022-12-06 15:27:37.100685', NOW(), '{}'); -INSERT INTO wh_uploads(id, source_id, namespace, destination_id, destination_type, start_staging_file_id, - end_staging_file_id, start_load_file_id, end_load_file_id, status, schema, error, first_event_at, - last_event_at, last_exec_at, timings, created_at, updated_at, metadata, - in_progress, workspace_id) -VALUES (1, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0, 0, 0, 'exported_data', '{}', - '{}', NULL, NULL, NULL, NULL, '2022-12-06 15:30:00', '2022-12-06 15:45:00', '{}', TRUE, - 'test-workspaceID'); -COMMIT; diff --git a/warehouse/router/tracker.go b/warehouse/router/tracker.go index 9b00dd06fc..2902785a88 100644 --- a/warehouse/router/tracker.go +++ b/warehouse/router/tracker.go @@ -10,196 +10,170 @@ import ( "github.com/cenkalti/backoff/v4" - "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/stats" - obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" "github.com/rudderlabs/rudder-server/utils/misc" - "github.com/rudderlabs/rudder-server/utils/timeutil" "github.com/rudderlabs/rudder-server/warehouse/internal/model" - warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" + 
whutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) -// CronTracker Track the status of the staging file whether it has reached the terminal state or not for every warehouse +// cronTracker Track the status of the staging file whether it has reached the terminal state or not for every warehouse // we pick the staging file which is oldest within the range NOW() - 2 * syncFrequency and NOW() - 3 * syncFrequency -func (r *Router) CronTracker(ctx context.Context) error { - cronTrackerExecTimestamp := r.statsFactory.NewTaggedStat("warehouse_cron_tracker_timestamp_seconds", stats.GaugeType, stats.Tags{ - "module": moduleName, - "destType": r.destType, - }) +// and checks if the corresponding upload has reached the terminal state or not. +// If the upload has not reached the terminal state, then we send a gauge metric with value 1 else 0 +func (r *Router) cronTracker(ctx context.Context) error { for { - - execTime := time.Now() - cronTrackerExecTimestamp.Gauge(execTime.Unix()) - - r.configSubscriberLock.RLock() - warehouses := append([]model.Warehouse{}, r.warehouses...) - r.configSubscriberLock.RUnlock() - - for _, warehouse := range warehouses { - b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(r.config.cronTrackerRetries.Load())), ctx) - err := backoff.Retry(func() error { - return r.Track(ctx, &warehouse, r.conf) - }, b) - if err != nil { - r.logger.Errorn( - "cron tracker failed for", - obskit.SourceID(warehouse.Source.ID), - obskit.DestinationID(warehouse.Destination.ID), - obskit.Error(err), - ) - break + execTime := r.now() + r.stats.cronTrackerExecTimestamp.Gauge(execTime.Unix()) + + for _, warehouse := range r.copyWarehouses() { + if err := r.retryTrackSync(ctx, &warehouse); err != nil { + if ctx.Err() != nil { + return nil //nolint:nilerr + } + return fmt.Errorf("cron tracker: %w", err) } } nextExecTime := execTime.Add(r.config.uploadStatusTrackFrequency) select { case <-ctx.Done(): - r.logger.Infon("context is cancelled, stopped running tracking") + r.logger.Infon("Context cancelled. Exiting cron tracker") return nil case <-time.After(time.Until(nextExecTime)): } } } -// Track tracks the status of the warehouse uploads for the corresponding cases: -// 1. Staging files is not picked. -// 2. 
diff --git a/warehouse/router/tracker.go b/warehouse/router/tracker.go
index 9b00dd06fc..2902785a88 100644
--- a/warehouse/router/tracker.go
+++ b/warehouse/router/tracker.go
@@ -10,196 +10,170 @@ import (
 
     "github.com/cenkalti/backoff/v4"
 
-    "github.com/rudderlabs/rudder-go-kit/config"
     "github.com/rudderlabs/rudder-go-kit/stats"
-    obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
 
     "github.com/rudderlabs/rudder-server/utils/misc"
-    "github.com/rudderlabs/rudder-server/utils/timeutil"
     "github.com/rudderlabs/rudder-server/warehouse/internal/model"
-    warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
+    whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
 )
 
-// CronTracker Track the status of the staging file whether it has reached the terminal state or not for every warehouse
+// cronTracker tracks, for every warehouse, whether staging files have reached the terminal state.
 // we pick the staging file which is oldest within the range NOW() - 2 * syncFrequency and NOW() - 3 * syncFrequency
-func (r *Router) CronTracker(ctx context.Context) error {
-    cronTrackerExecTimestamp := r.statsFactory.NewTaggedStat("warehouse_cron_tracker_timestamp_seconds", stats.GaugeType, stats.Tags{
-        "module":   moduleName,
-        "destType": r.destType,
-    })
+// and checks if the corresponding upload has reached the terminal state or not.
+// If the upload has not reached the terminal state, we send a gauge metric with value 1, else 0.
+func (r *Router) cronTracker(ctx context.Context) error {
     for {
-
-        execTime := time.Now()
-        cronTrackerExecTimestamp.Gauge(execTime.Unix())
-
-        r.configSubscriberLock.RLock()
-        warehouses := append([]model.Warehouse{}, r.warehouses...)
-        r.configSubscriberLock.RUnlock()
-
-        for _, warehouse := range warehouses {
-            b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(r.config.cronTrackerRetries.Load())), ctx)
-            err := backoff.Retry(func() error {
-                return r.Track(ctx, &warehouse, r.conf)
-            }, b)
-            if err != nil {
-                r.logger.Errorn(
-                    "cron tracker failed for",
-                    obskit.SourceID(warehouse.Source.ID),
-                    obskit.DestinationID(warehouse.Destination.ID),
-                    obskit.Error(err),
-                )
-                break
+        execTime := r.now()
+        r.stats.cronTrackerExecTimestamp.Gauge(execTime.Unix())
+
+        for _, warehouse := range r.copyWarehouses() {
+            if err := r.retryTrackSync(ctx, &warehouse); err != nil {
+                if ctx.Err() != nil {
+                    return nil //nolint:nilerr
+                }
+                return fmt.Errorf("cron tracker: %w", err)
             }
         }
 
        nextExecTime := execTime.Add(r.config.uploadStatusTrackFrequency)
        select {
        case <-ctx.Done():
-            r.logger.Infon("context is cancelled, stopped running tracking")
+            r.logger.Infon("Context cancelled. Exiting cron tracker")
            return nil
        case <-time.After(time.Until(nextExecTime)):
        }
    }
 }
 
-// Track tracks the status of the warehouse uploads for the corresponding cases:
-// 1. Staging files is not picked.
-// 2. Upload job is struck
-func (r *Router) Track(
-    ctx context.Context,
-    warehouse *model.Warehouse,
-    config *config.Config,
-) error {
-    var (
-        createdAt         sql.NullTime
-        exists            bool
-        syncFrequency     = "1440"
-        now               = timeutil.Now
-        nowSQL            = "NOW()"
-        failedStatusRegex = "%_failed"
-        timeWindow        = config.GetDuration("Warehouse.uploadBufferTimeInMin", 180, time.Minute)
-        source            = warehouse.Source
-        destination       = warehouse.Destination
-    )
-
-    if r.nowSQL != "" {
-        nowSQL = r.nowSQL
-    }
-    if r.now != nil {
-        now = r.now
+func (r *Router) retryTrackSync(ctx context.Context, warehouse *model.Warehouse) error {
+    o := func() error {
+        return r.trackSync(ctx, warehouse)
     }
+    b := backoff.WithContext(
+        backoff.WithMaxRetries(
+            backoff.NewExponentialBackOff(),
+            uint64(r.config.cronTrackerRetries.Load()),
+        ),
+        ctx,
+    )
+    return backoff.Retry(o, b)
+}
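`retryTrackSync` composes `cenkalti/backoff/v4` the same way the old inline code did, but in one named place: exponential backoff, capped by the reloadable `cronTrackerRetries` count, cancellable through the context. A runnable sketch of the same composition (the operation and retry limit are illustrative):

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/cenkalti/backoff/v4"
)

// retry mirrors retryTrackSync's shape: exponential backoff capped at
// maxRetries attempts, abandoned early if the context is cancelled.
func retry(ctx context.Context, op func() error, maxRetries uint64) error {
	b := backoff.WithContext(
		backoff.WithMaxRetries(backoff.NewExponentialBackOff(), maxRetries),
		ctx,
	)
	return backoff.Retry(op, b)
}

func main() {
	attempts := 0
	err := retry(context.Background(), func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure") // retried with backoff
		}
		return nil
	}, 5)
	fmt.Println(err, attempts) // <nil> 3
}
```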
 
func (r *Router) trackSync(ctx context.Context, warehouse *model.Warehouse) error {
+    if !warehouse.IsEnabled() || r.isWithinExcludeWindow(warehouse) {
        return nil
    }
 
-    trackUploadMissingStat := r.statsFactory.NewTaggedStat("warehouse_track_upload_missing", stats.GaugeType, stats.Tags{
-        "workspaceId": warehouse.WorkspaceID,
-        "module":      moduleName,
-        "destType":    r.destType,
-        "sourceId":    source.ID,
-        "destinationId": destination.ID,
-        "warehouseID": misc.GetTagName(
-            destination.ID,
-            source.Name,
-            destination.Name,
-            misc.TailTruncateStr(source.ID, 6)),
-    })
-    trackUploadMissingStat.Gauge(0)
-
-    if !source.Enabled || !destination.Enabled {
-        return nil
-    }
-
-    excludeWindow := warehouse.GetMapDestinationConfig(model.ExcludeWindowSetting)
-    excludeWindowStartTime, excludeWindowEndTime := excludeWindowStartEndTimes(excludeWindow)
-    if checkCurrentTimeExistsInExcludeWindow(now(), excludeWindowStartTime, excludeWindowEndTime) {
+    createdAt, err := r.getOldestStagingFile(ctx, warehouse)
+    if err != nil {
+        return err
+    }
+    if createdAt.IsZero() {
        return nil
    }
 
-    if sf := warehouse.GetStringDestinationConfig(r.conf, model.SyncFrequencySetting); sf != "" {
-        syncFrequency = sf
-    }
-    if value, err := strconv.Atoi(syncFrequency); err == nil {
-        timeWindow += time.Duration(value) * time.Minute
+    exists, err := r.checkUploadStatus(ctx, warehouse, createdAt)
+    if err != nil {
+        return err
    }
+    r.recordUploadMissingMetric(warehouse, exists)
+    return nil
+}
+
+func (r *Router) isWithinExcludeWindow(warehouse *model.Warehouse) bool {
+    excludeWindow := warehouse.GetMapDestinationConfig(model.ExcludeWindowSetting)
+    startTime, endTime := excludeWindowStartEndTimes(excludeWindow)
+    return checkCurrentTimeExistsInExcludeWindow(r.now(), startTime, endTime)
+}
+
+func (r *Router) getOldestStagingFile(ctx context.Context, warehouse *model.Warehouse) (time.Time, error) {
+    nowSQL := r.getNowSQL()
+    timeWindow := r.calculateTimeWindow(warehouse)
 
     query := fmt.Sprintf(`
-        SELECT
-          created_at
-        FROM
-          %[1]s
-        WHERE
-          source_id = $1 AND
-          destination_id = $2 AND
-          created_at > %[2]s - $3 * INTERVAL '1 MIN' AND
-          created_at < %[2]s - $4 * INTERVAL '1 MIN'
-        ORDER BY
-          id DESC
-        LIMIT
-          1;
-        `,
-        warehouseutils.WarehouseStagingFilesTable,
+        SELECT created_at
+        FROM `+whutils.WarehouseStagingFilesTable+`
+        WHERE source_id = $1
+          AND destination_id = $2
+          AND created_at > %[1]s - $3 * INTERVAL '1 MIN'
+          AND created_at < %[1]s - $4 * INTERVAL '1 MIN'
+        ORDER BY id DESC
+        LIMIT 1;`,
        nowSQL,
    )
-    queryArgs := []interface{}{
-        source.ID,
-        destination.ID,
+    queryArgs := []any{
+        warehouse.Source.ID,
+        warehouse.Destination.ID,
        2 * timeWindow / time.Minute,
        timeWindow / time.Minute,
    }
 
+    var createdAt sql.NullTime
    err := r.db.QueryRowContext(ctx, query, queryArgs...).Scan(&createdAt)
-    if errors.Is(err, sql.ErrNoRows) {
-        return nil
+    if err != nil && errors.Is(err, sql.ErrNoRows) {
+        return time.Time{}, nil
    }
    if err != nil {
-        return fmt.Errorf("fetching last upload time for source: %s and destination: %s: %w", source.ID, destination.ID, err)
+        return time.Time{}, fmt.Errorf("fetching oldest staging file for source %s and destination %s: %w",
+            warehouse.Source.ID, warehouse.Destination.ID, err)
    }
-
    if !createdAt.Valid {
-        return fmt.Errorf("invalid last upload time for source: %s and destination: %s", source.ID, destination.ID)
+        return time.Time{}, fmt.Errorf("invalid created_at time for source %s and destination %s",
+            warehouse.Source.ID, warehouse.Destination.ID)
    }
+    return createdAt.Time, nil
+}
 
-    query = `
-        SELECT
-          EXISTS (
-            SELECT
-              1
-            FROM
-              ` + warehouseutils.WarehouseUploadsTable + `
-            WHERE
-              source_id = $1 AND
-              destination_id = $2 AND
-              (
-                status = $3
-                OR status = $4
-                OR status LIKE $5
-              ) AND
-              updated_at > $6
-          );
-    `
-    queryArgs = []interface{}{
-        source.ID,
-        destination.ID,
+func (r *Router) calculateTimeWindow(warehouse *model.Warehouse) time.Duration {
+    timeWindow := r.config.uploadBufferTimeInMin.Load()
+    syncFrequency := warehouse.GetStringDestinationConfig(r.conf, model.SyncFrequencySetting)
+    if syncFrequency != "" {
+        if value, err := strconv.Atoi(syncFrequency); err == nil {
+            timeWindow += time.Duration(value) * time.Minute
+        }
+    }
+    return timeWindow
+}
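`calculateTimeWindow` adds the destination's `syncFrequency` (minutes, as a string) to the `uploadBufferTimeInMin` buffer, and `getOldestStagingFile` binds `2*timeWindow` and `timeWindow` (in minutes) as `$3`/`$4`, scanning staging files created between `NOW() - 2*timeWindow` and `NOW() - timeWindow`. A small sketch of that arithmetic using the patch's defaults (180m buffer, `"30"` frequency — the concrete values are illustrative):

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// window reproduces calculateTimeWindow's arithmetic: a fixed upload buffer
// plus the destination's syncFrequency in minutes. With 180m + 30m the
// tracker inspects staging files created between NOW()-7h and NOW()-3h30m.
func window(buffer time.Duration, syncFrequency string) time.Duration {
	if v, err := strconv.Atoi(syncFrequency); err == nil {
		buffer += time.Duration(v) * time.Minute
	}
	return buffer
}

func main() {
	tw := window(180*time.Minute, "30")
	// The values bound to $3 and $4 in getOldestStagingFile's query.
	fmt.Println(int64(2*tw/time.Minute), int64(tw/time.Minute)) // 420 210
}
```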
 
+func (r *Router) checkUploadStatus(ctx context.Context, warehouse *model.Warehouse, createdAt time.Time) (bool, error) {
+    query := `
+        SELECT EXISTS (
+            SELECT 1
+            FROM ` + whutils.WarehouseUploadsTable + `
+            WHERE source_id = $1 AND destination_id = $2 AND
+                (status = $3 OR status = $4 OR status LIKE $5) AND
+                updated_at > $6
+        );`
+    queryArgs := []any{
+        warehouse.Source.ID,
+        warehouse.Destination.ID,
        model.ExportedData,
        model.Aborted,
-        failedStatusRegex,
-        createdAt.Time.Format(misc.RFC3339Milli),
+        "%_failed",
+        createdAt.Format(misc.RFC3339Milli),
    }
 
-    err = r.db.QueryRowContext(ctx, query, queryArgs...).Scan(&exists)
+    var exists bool
+    err := r.db.QueryRowContext(ctx, query, queryArgs...).Scan(&exists)
    if err != nil && !errors.Is(err, sql.ErrNoRows) {
-        return fmt.Errorf("fetching last upload status for source: %s and destination: %s: %w", source.ID, destination.ID, err)
+        return false, fmt.Errorf("checking upload status for source %s and destination %s: %w",
+            warehouse.Source.ID, warehouse.Destination.ID, err)
    }
+    return exists, nil
+}
 
-    if !exists {
-        r.logger.Warnn("pending staging files not picked",
-            obskit.SourceID(source.ID),
-            obskit.SourceType(source.SourceDefinition.Name),
-            obskit.DestinationID(destination.ID),
-            obskit.DestinationType(destination.DestinationDefinition.Name),
-            obskit.WorkspaceID(warehouse.WorkspaceID),
-        )
-
-        trackUploadMissingStat.Gauge(1)
+func (r *Router) recordUploadMissingMetric(warehouse *model.Warehouse, exists bool) {
+    metric := r.statsFactory.NewTaggedStat("warehouse_track_upload_missing", stats.GaugeType, stats.Tags{
+        "module":        moduleName,
+        "workspaceId":   warehouse.WorkspaceID,
+        "destType":      r.destType,
+        "sourceId":      warehouse.Source.ID,
+        "destinationId": warehouse.Destination.ID,
+    })
+    if exists {
+        metric.Gauge(0)
+    } else {
+        metric.Gauge(1)
    }
-
-    return nil
 }
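`recordUploadMissingMetric` emits a 0/1 gauge: 1 means the oldest in-window staging file has no terminal upload yet, 0 means it does. A minimal round-trip through `memstats`, the in-memory store the tests below use (tag values are illustrative):

```go
package main

import (
	"fmt"

	"github.com/rudderlabs/rudder-go-kit/stats"
	"github.com/rudderlabs/rudder-go-kit/stats/memstats"
)

// Emits the 0/1 "upload missing" gauge and reads it back, the same way the
// rewritten tests assert on it via statsStore.Get(...).LastValue().
func main() {
	store, err := memstats.New()
	if err != nil {
		panic(err)
	}
	tags := stats.Tags{"module": "warehouse", "destType": "POSTGRES"}

	gauge := store.NewTaggedStat("warehouse_track_upload_missing", stats.GaugeType, tags)
	gauge.Gauge(1) // 1 = upload missing; the alertable condition

	fmt.Println(store.Get("warehouse_track_upload_missing", tags).LastValue()) // 1
}
```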
diff --git a/warehouse/router/tracker_test.go b/warehouse/router/tracker_test.go
index 5de2942076..ea7b68e38a 100644
--- a/warehouse/router/tracker_test.go
+++ b/warehouse/router/tracker_test.go
@@ -2,230 +2,379 @@ package router
 
 import (
     "context"
-    "errors"
-    "os"
+    "fmt"
     "testing"
     "time"
 
     "github.com/ory/dockertest/v3"
     "github.com/stretchr/testify/require"
-    "go.uber.org/mock/gomock"
 
     "github.com/rudderlabs/rudder-go-kit/config"
     "github.com/rudderlabs/rudder-go-kit/logger"
-    "github.com/rudderlabs/rudder-go-kit/logger/mock_logger"
     "github.com/rudderlabs/rudder-go-kit/stats"
     "github.com/rudderlabs/rudder-go-kit/stats/memstats"
     "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
 
     backendconfig "github.com/rudderlabs/rudder-server/backend-config"
     migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
-    "github.com/rudderlabs/rudder-server/utils/misc"
-    "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
+    "github.com/rudderlabs/rudder-server/utils/timeutil"
+    sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
     "github.com/rudderlabs/rudder-server/warehouse/internal/model"
-    warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
+    "github.com/rudderlabs/rudder-server/warehouse/internal/repo"
+    whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
 )
 
-func TestRouter_Track(t *testing.T) {
-    var (
-        workspaceID = "test-workspaceID"
-        sourceID    = "test-sourceID"
-        sourceName  = "test-sourceName"
-        destID      = "test-destinationID"
-        destName    = "test-destinationName"
-        destType    = warehouseutils.POSTGRES
-    )
-
-    testcases := []struct {
-        name             string
-        destID           string
-        destDisabled     bool
-        wantErr          error
-        missing          bool
-        NowSQL           string
-        exclusionWindow  map[string]any
-        uploadBufferTime string
-    }{
-        {
-            name:   "unknown destination",
-            destID: "unknown-destination",
-        },
-        {
-            name:         "disabled destination",
-            destID:       destID,
-            destDisabled: true,
-        },
-        {
-            name:    "successful upload exists",
-            destID:  destID,
-            missing: false,
-        },
-        {
-            name:             "successful upload exists with upload buffer time",
-            destID:           destID,
-            missing:          false,
-            uploadBufferTime: "0m",
-        },
-        {
-            name:    "exclusion window",
-            destID:  destID,
-            missing: false,
-            exclusionWindow: map[string]any{
-                "excludeWindowStartTime": "05:09",
-                "excludeWindowEndTime":   "09:07",
-            },
-        },
-        {
-            name:    "no successful upload exists",
-            destID:  "test-destinationID-1",
-            missing: true,
-        },
-        {
-            name:    "throw error while fetching last upload time",
-            destID:  destID,
-            missing: false,
-            NowSQL:  "ABC",
-            wantErr: errors.New("fetching last upload time for source: test-sourceID and destination: test-destinationID: pq: column \"abc\" does not exist"),
-        },
-    }
-
-    for _, tc := range testcases {
-        tc := tc
-
-        t.Run(tc.name, func(t *testing.T) {
-            t.Parallel()
-
-            pool, err := dockertest.NewPool("")
-            require.NoError(t, err)
-
-            pgResource, err := postgres.Setup(pool, t)
-            require.NoError(t, err)
-
-            t.Log("db:", pgResource.DBDsn)
-
-            err = (&migrator.Migrator{
-                Handle:          pgResource.DB,
-                MigrationsTable: "wh_schema_migrations",
-            }).Migrate("warehouse")
-            require.NoError(t, err)
-
-            sqlStatement, err := os.ReadFile("testdata/sql/seed_tracker_test.sql")
-            require.NoError(t, err)
-
-            _, err = pgResource.DB.Exec(string(sqlStatement))
-            require.NoError(t, err)
-
-            statsStore, err := memstats.New()
-            require.NoError(t, err)
-
-            ctx := context.Background()
-            nowSQL := "'2022-12-06 15:40:00'::timestamp"
-
-            now, err := time.Parse(misc.RFC3339Milli, "2022-12-06T06:19:00.169Z")
-            require.NoError(t, err)
-
-            conf := config.New()
-            if tc.uploadBufferTime != "" {
-                conf.Set("Warehouse.uploadBufferTimeInMin", tc.uploadBufferTime)
-            } else {
-                conf.Set("Warehouse.uploadBufferTimeInMin", 0)
-            }
-
-            warehouse := model.Warehouse{
-                WorkspaceID: workspaceID,
-                Source: backendconfig.SourceT{
-                    ID:      sourceID,
-                    Name:    sourceName,
-                    Enabled: true,
-                },
-                Destination: backendconfig.DestinationT{
-                    ID:      tc.destID,
-                    Name:    destName,
-                    Enabled: !tc.destDisabled,
-                    Config: map[string]any{
-                        "syncFrequency": "10",
-                        "excludeWindow": tc.exclusionWindow,
-                    },
-                },
-            }
-
-            if tc.NowSQL != "" {
-                nowSQL = tc.NowSQL
-            }
-
-            handle := Router{
-                conf:     config.New(),
-                destType: destType,
-                now: func() time.Time {
-                    return now
-                },
-                nowSQL:       nowSQL,
-                statsFactory: statsStore,
-                db:           sqlquerywrapper.New(pgResource.DB),
-                logger:       logger.NOP,
-            }
-
-            err = handle.Track(ctx, &warehouse, conf)
-            if tc.wantErr != nil {
-                require.EqualError(t, err, tc.wantErr.Error())
-                return
-            }
-            require.NoError(t, err)
-
-            m := statsStore.Get("warehouse_track_upload_missing", stats.Tags{
-                "module":        moduleName,
-                "workspaceId":   warehouse.WorkspaceID,
-                "destType":      handle.destType,
-                "sourceId":      warehouse.Source.ID,
-                "destinationId": warehouse.Destination.ID,
-                "warehouseID": misc.GetTagName(
-                    warehouse.Destination.ID,
-                    warehouse.Source.Name,
-                    warehouse.Destination.Name,
-                    misc.TailTruncateStr(warehouse.Source.ID, 6)),
-            })
-
-            if tc.missing {
-                require.EqualValues(t, m.LastValue(), 1)
-            } else {
-                require.EqualValues(t, m.LastValue(), 0)
-            }
-        })
-    }
-}
-
-func TestRouter_CronTracker(t *testing.T) {
-    t.Run("context cancelled", func(t *testing.T) {
-        t.Parallel()
-
-        ctx, cancel := context.WithCancel(context.Background())
-        cancel()
-
-        mockCtrl := gomock.NewController(t)
-        mockLogger := mock_logger.NewMockLogger(mockCtrl)
-
-        statsStore, err := memstats.New()
-        require.NoError(t, err)
-
-        r := Router{
-            logger:       mockLogger,
-            statsFactory: statsStore,
-            destType:     warehouseutils.POSTGRES,
-        }
-
-        mockLogger.EXPECT().Infon("context is cancelled, stopped running tracking").Times(1)
-
-        executionTime := time.Now().Unix()
-        err = r.CronTracker(ctx)
-        require.NoError(t, err)
-
-        m := statsStore.GetByName("warehouse_cron_tracker_timestamp_seconds")
-        require.Equal(t, len(m), 1)
-        require.Equal(t, m[0].Name, "warehouse_cron_tracker_timestamp_seconds")
-        require.Equal(t, m[0].Tags, stats.Tags{
-            "module":   moduleName,
-            "destType": warehouseutils.POSTGRES,
-        })
-        require.GreaterOrEqual(t, m[0].Value, float64(executionTime))
-    })
-}
+func TestRouter_CronTrack(t *testing.T) {
+    t.Run("source / destination disabled", func(t *testing.T) {
+        ctx := context.Background()
+
+        statsStore, err := memstats.New()
+        require.NoError(t, err)
+
+        warehouse := model.Warehouse{
+            WorkspaceID: "test-workspaceID",
+            Source: backendconfig.SourceT{
+                ID:      "test-sourceID",
+                Name:    "test-sourceName",
+                Enabled: false,
+            },
+            Destination: backendconfig.DestinationT{
+                ID:      "test-destinationID",
+                Name:    "test-destinationName",
+                Enabled: false,
+                Config: map[string]any{
+                    "syncFrequency": "30",
+                },
+            },
+        }
+
+        r := Router{
+            conf:         config.New(),
+            destType:     whutils.POSTGRES,
+            now:          timeutil.Now,
+            statsFactory: statsStore,
+            logger:       logger.NOP,
+        }
+
+        require.NoError(t, r.trackSync(ctx, &warehouse))
+        require.Nil(t, statsStore.Get("warehouse_track_upload_missing", stats.Tags{
+            "module":        moduleName,
+            "workspaceId":   warehouse.WorkspaceID,
+            "destType":      r.destType,
+            "sourceId":      warehouse.Source.ID,
+            "destinationId": warehouse.Destination.ID,
+        }))
+    })
+    t.Run("exclusion window", func(t *testing.T) {
+        ctx := context.Background()
+
+        statsStore, err := memstats.New()
+        require.NoError(t, err)
+
+        warehouse := model.Warehouse{
+            WorkspaceID: "test-workspaceID",
+            Source: backendconfig.SourceT{
+                ID:      "test-sourceID",
+                Name:    "test-sourceName",
+                Enabled: true,
+            },
+            Destination: backendconfig.DestinationT{
+                ID:      "test-destinationID",
+                Name:    "test-destinationName",
+                Enabled: true,
+                Config: map[string]any{
+                    "syncFrequency": "30",
+                    "excludeWindow": map[string]any{
+                        "excludeWindowStartTime": "05:09",
+                        "excludeWindowEndTime":   "09:07",
+                    },
+                },
+            },
+        }
+
+        r := Router{
+            conf:     config.New(),
+            destType: whutils.POSTGRES,
+            now: func() time.Time {
+                return time.Date(2023, 1, 1, 6, 19, 0, 0, time.UTC)
+            },
+            statsFactory: statsStore,
+            logger:       logger.NOP,
+        }
+
+        require.NoError(t, r.trackSync(ctx, &warehouse))
+        require.Nil(t, statsStore.Get("warehouse_track_upload_missing", stats.Tags{
+            "module":        moduleName,
+            "workspaceId":   warehouse.WorkspaceID,
+            "destType":      r.destType,
+            "sourceId":      warehouse.Source.ID,
+            "destinationId": warehouse.Destination.ID,
+        }))
+    })
+    t.Run("no staging files present", func(t *testing.T) {
+        db, ctx := setupDB(t), context.Background()
+
+        statsStore, err := memstats.New()
+        require.NoError(t, err)
+
+        warehouse := model.Warehouse{
+            WorkspaceID: "test-workspaceID",
+            Source: backendconfig.SourceT{
+                ID:      "test-sourceID",
+                Name:    "test-sourceName",
+                Enabled: true,
+            },
+            Destination: backendconfig.DestinationT{
+                ID:      "test-destinationID",
+                Name:    "test-destinationName",
+                Enabled: true,
+                Config: map[string]any{
+                    "syncFrequency": "30",
+                },
+            },
+        }
+
+        r := Router{
+            conf:         config.New(),
+            destType:     whutils.POSTGRES,
+            now:          timeutil.Now,
+            statsFactory: statsStore,
+            db:           db,
+            logger:       logger.NOP,
+        }
+        r.config.uploadBufferTimeInMin = config.SingleValueLoader(30 * time.Minute)
+
+        require.NoError(t, r.trackSync(ctx, &warehouse))
+        require.Nil(t, statsStore.Get("warehouse_track_upload_missing", stats.Tags{
+            "module":        moduleName,
+            "workspaceId":   warehouse.WorkspaceID,
+            "destType":      r.destType,
+            "sourceId":      warehouse.Source.ID,
+            "destinationId": warehouse.Destination.ID,
+        }))
+    })
+    t.Run("staging files without missing uploads", func(t *testing.T) {
+        testCases := []struct {
+            name   string
+            status string
+        }{
+            {name: "ExportedData", status: model.ExportedData},
+            {name: "Aborted", status: model.Aborted},
+            {name: "Failed", status: model.ExportingDataFailed},
+        }
+
+        for _, tc := range testCases {
+            t.Run(tc.name, func(t *testing.T) {
+                db, ctx := setupDB(t), context.Background()
+
+                now := time.Date(2023, 1, 1, 6, 19, 0, 0, time.UTC)
+                nowSQL := fmt.Sprintf("'%s'::timestamp", now.Format(time.DateTime))
+
+                repoStaging := repo.NewStagingFiles(db, repo.WithNow(func() time.Time {
+                    return now.Add(-time.Hour*1 - time.Minute*30)
+                }))
+                repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
+                    return now.Add(-time.Hour * 1)
+                }))
+
+                stagingID, err := repoStaging.Insert(ctx, &model.StagingFileWithSchema{
+                    StagingFile: model.StagingFile{
+                        WorkspaceID:   "test-workspaceID",
+                        SourceID:      "test-sourceID",
+                        DestinationID: "test-destinationID",
+                    },
+                })
+                require.NoError(t, err)
+
+                _, err = repoUpload.CreateWithStagingFiles(ctx, model.Upload{
+                    WorkspaceID:     "test-workspaceID",
+                    Namespace:       "namespace",
+                    SourceID:        "test-sourceID",
+                    DestinationID:   "test-destinationID",
+                    DestinationType: whutils.POSTGRES,
+                    Status:          tc.status,
+                }, []*model.StagingFile{{
+                    ID:            stagingID,
+                    WorkspaceID:   "test-workspaceID",
+                    SourceID:      "test-sourceID",
+                    DestinationID: "test-destinationID",
+                }})
+                require.NoError(t, err)
+
+                statsStore, err := memstats.New()
+                require.NoError(t, err)
+
+                warehouse := model.Warehouse{
+                    WorkspaceID: "test-workspaceID",
+                    Source: backendconfig.SourceT{
+                        ID:      "test-sourceID",
+                        Name:    "test-sourceName",
+                        Enabled: true,
+                    },
+                    Destination: backendconfig.DestinationT{
+                        ID:      "test-destinationID",
+                        Name:    whutils.POSTGRES,
+                        Enabled: true,
+                        Config: map[string]any{
+                            "syncFrequency": "30",
+                        },
+                    },
+                }
+
+                r := Router{
+                    conf:     config.New(),
+                    destType: whutils.POSTGRES,
+                    now: func() time.Time {
+                        return now
+                    },
+                    nowSQL:       nowSQL,
+                    statsFactory: statsStore,
+                    db:           db,
+                    logger:       logger.NOP,
+                }
+                r.config.uploadBufferTimeInMin = config.SingleValueLoader(30 * time.Minute)
+
+                require.NoError(t, r.trackSync(ctx, &warehouse))
+
+                uploadMissingStat := statsStore.Get("warehouse_track_upload_missing", stats.Tags{
+                    "module":        moduleName,
+                    "workspaceId":   warehouse.WorkspaceID,
+                    "destType":      r.destType,
+                    "sourceId":      warehouse.Source.ID,
+                    "destinationId": warehouse.Destination.ID,
+                })
+                require.NotNil(t, uploadMissingStat)
+                require.EqualValues(t, 0, uploadMissingStat.LastValue())
+            })
+        }
+    })
+    t.Run("staging files with missing uploads", func(t *testing.T) {
+        db, ctx := setupDB(t), context.Background()
+
+        now := time.Date(2023, 1, 1, 6, 19, 0, 0, time.UTC)
+        nowSQL := fmt.Sprintf("'%s'::timestamp", now.Format(time.DateTime))
+
+        repoStaging := repo.NewStagingFiles(db, repo.WithNow(func() time.Time {
+            return now.Add(-time.Hour*1 - time.Minute*30)
+        }))
+
+        _, err := repoStaging.Insert(ctx, &model.StagingFileWithSchema{
+            StagingFile: model.StagingFile{
+                WorkspaceID:   "test-workspaceID",
+                SourceID:      "test-sourceID",
+                DestinationID: "test-destinationID",
+            },
+        })
+        require.NoError(t, err)
+
+        statsStore, err := memstats.New()
+        require.NoError(t, err)
+
+        warehouse := model.Warehouse{
+            WorkspaceID: "test-workspaceID",
+            Source: backendconfig.SourceT{
+                ID:      "test-sourceID",
+                Name:    "test-sourceName",
+                Enabled: true,
+            },
+            Destination: backendconfig.DestinationT{
+                ID:      "test-destinationID",
+                Name:    whutils.POSTGRES,
+                Enabled: true,
+                Config: map[string]any{
+                    "syncFrequency": "30",
+                },
+            },
+        }
+
+        r := Router{
+            conf:     config.New(),
+            destType: whutils.POSTGRES,
+            now: func() time.Time {
+                return now
+            },
+            nowSQL:       nowSQL,
+            statsFactory: statsStore,
+            db:           db,
+            logger:       logger.NOP,
+        }
+        r.config.uploadBufferTimeInMin = config.SingleValueLoader(30 * time.Minute)
+
+        require.NoError(t, r.trackSync(ctx, &warehouse))
+
+        uploadMissingStat := statsStore.Get("warehouse_track_upload_missing", stats.Tags{
+            "module":        moduleName,
+            "workspaceId":   warehouse.WorkspaceID,
+            "destType":      r.destType,
+            "sourceId":      warehouse.Source.ID,
+            "destinationId": warehouse.Destination.ID,
+        })
+        require.NotNil(t, uploadMissingStat)
+        require.EqualValues(t, 1, uploadMissingStat.LastValue())
+    })
+    t.Run("context cancelled", func(t *testing.T) {
+        db, ctx := setupDB(t), context.Background()
+
+        ctx, cancel := context.WithCancel(ctx)
+        cancel()
+
+        statsStore, err := memstats.New()
+        require.NoError(t, err)
+
+        warehouse := model.Warehouse{
+            WorkspaceID: "test-workspaceID",
+            Source: backendconfig.SourceT{
+                ID:      "test-sourceID",
+                Name:    "test-sourceName",
+                Enabled: true,
+            },
+            Destination: backendconfig.DestinationT{
+                ID:      "test-destinationID",
+                Name:    "test-destinationName",
+                Enabled: true,
+                Config: map[string]any{
+                    "syncFrequency": "30",
+                },
+            },
+        }
+
+        r := Router{
+            conf:         config.New(),
+            destType:     whutils.POSTGRES,
+            now:          time.Now,
+            statsFactory: statsStore,
+            db:           db,
+            logger:       logger.NOP,
+            warehouses:   []model.Warehouse{warehouse},
+        }
+        r.config.uploadBufferTimeInMin = config.SingleValueLoader(30 * time.Minute)
+        r.config.cronTrackerRetries = config.SingleValueLoader(int64(5))
+        r.stats.cronTrackerExecTimestamp = statsStore.NewTaggedStat("warehouse_cron_tracker_timestamp_seconds", stats.GaugeType, stats.Tags{"module": moduleName, "destType": r.destType})
+
+        require.NoError(t, r.cronTracker(ctx))
+        require.Nil(t, statsStore.Get("warehouse_track_upload_missing", stats.Tags{
+            "module":        moduleName,
+            "workspaceId":   warehouse.WorkspaceID,
+            "destType":      r.destType,
+            "sourceId":      warehouse.Source.ID,
+            "destinationId": warehouse.Destination.ID,
+        }))
+    })
+}
+
+func setupDB(t testing.TB) *sqlmiddleware.DB {
+    t.Helper()
+
+    pool, err := dockertest.NewPool("")
+    require.NoError(t, err)
+
+    pgResource, err := postgres.Setup(pool, t)
+    require.NoError(t, err)
+
+    require.NoError(t, (&migrator.Migrator{
+        Handle:          pgResource.DB,
+        MigrationsTable: "wh_schema_migrations",
+    }).Migrate("warehouse"))
+
+    return sqlmiddleware.New(pgResource.DB)
+}