diff --git a/warehouse/app.go b/warehouse/app.go index 9fc61a2b71..e78c0e7ac8 100644 --- a/warehouse/app.go +++ b/warehouse/app.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "os" - "slices" "sync" "sync/atomic" "time" @@ -44,7 +43,6 @@ import ( "github.com/rudderlabs/rudder-server/warehouse/router" "github.com/rudderlabs/rudder-server/warehouse/slave" "github.com/rudderlabs/rudder-server/warehouse/source" - warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) type App struct { @@ -396,9 +394,32 @@ func (a *App) Run(ctx context.Context) error { g.Go(misc.WithBugsnagForWarehouse(func() error { return a.notifier.ClearJobs(gCtx) })) + + routerManager := router.NewLifecycleManager( + a.tenantManager, + func(ctx context.Context, destType string) error { + r := router.New( + a.reporting, + destType, + a.conf, + a.logger.Child("router"), + a.statsFactory, + a.db, + a.notifier, + a.tenantManager, + a.controlPlaneClient, + a.bcManager, + a.encodingFactory, + a.triggerStore, + a.createUploadAlways, + ) + return r.Start(ctx) + }, + ) g.Go(misc.WithBugsnagForWarehouse(func() error { - return a.monitorDestRouters(gCtx) + return routerManager.Run(gCtx) })) + g.Go(misc.WithBugsnagForWarehouse(func() error { archive.CronArchiver(gCtx, archive.New( a.conf, @@ -433,63 +454,3 @@ func (a *App) Run(ctx context.Context) error { return g.Wait() } - -// Gets the config from config backend and extracts enabled write keys -func (a *App) monitorDestRouters(ctx context.Context) error { - dstToWhRouter := make(map[string]*router.Router) - g, ctx := errgroup.WithContext(ctx) - ch := a.tenantManager.WatchConfig(ctx) - for configData := range ch { - diffRouters := a.onConfigDataEvent(configData, dstToWhRouter) - for _, r := range diffRouters { - g.Go(func() error { return r.Start(ctx) }) - } - } - return g.Wait() -} - -func (a *App) onConfigDataEvent( - configMap map[string]backendconfig.ConfigT, - dstToWhRouter map[string]*router.Router, -) map[string]*router.Router { - enabledDestinations := make(map[string]bool) - diffRouters := make(map[string]*router.Router) - for _, wConfig := range configMap { - for _, source := range wConfig.Sources { - for _, destination := range source.Destinations { - enabledDestinations[destination.DestinationDefinition.Name] = true - - if !slices.Contains(warehouseutils.WarehouseDestinations, destination.DestinationDefinition.Name) { - continue - } - - _, ok := dstToWhRouter[destination.DestinationDefinition.Name] - if ok { - a.logger.Debug("Enabling existing Destination: ", destination.DestinationDefinition.Name) - continue - } - - a.logger.Info("Starting a new Warehouse Destination Router: ", destination.DestinationDefinition.Name) - - r := router.New( - a.reporting, - destination.DestinationDefinition.Name, - a.conf, - a.logger.Child("router"), - a.statsFactory, - a.db, - a.notifier, - a.tenantManager, - a.controlPlaneClient, - a.bcManager, - a.encodingFactory, - a.triggerStore, - a.createUploadAlways, - ) - dstToWhRouter[destination.DestinationDefinition.Name] = r - diffRouters[destination.DestinationDefinition.Name] = r - } - } - } - return diffRouters -} diff --git a/warehouse/app_test.go b/warehouse/app_test.go index d180ef7bf6..7ef1975dfd 100644 --- a/warehouse/app_test.go +++ b/warehouse/app_test.go @@ -365,7 +365,6 @@ func TestApp(t *testing.T) { a := New(mockApp, c, logger.NOP, stats.NOP, mockBackendConfig, filemanager.New) require.NoError(t, a.Setup(ctx)) cancel() - require.Contains(t, a.monitorDestRouters(ctx).Error(), "reset in progress: context canceled") }) t.Run("rudder core recovery mode", func(t *testing.T) { t.Run("stand alone", func(t *testing.T) { diff --git a/warehouse/router/factory.go b/warehouse/router/factory.go new file mode 100644 index 0000000000..0ef95d2b06 --- /dev/null +++ b/warehouse/router/factory.go @@ -0,0 +1,49 @@ +package router + +import ( + "sync" + + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-server/services/controlplane" + "github.com/rudderlabs/rudder-server/services/notifier" + "github.com/rudderlabs/rudder-server/utils/types" + "github.com/rudderlabs/rudder-server/warehouse/bcm" + "github.com/rudderlabs/rudder-server/warehouse/encoding" + "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" + "github.com/rudderlabs/rudder-server/warehouse/multitenant" +) + +type Factory struct { + reporting types.Reporting + conf *config.Config + logger logger.Logger + statsFactory stats.Stats + db *sqlquerywrapper.DB + notifier *notifier.Notifier + tenantManager *multitenant.Manager + controlPlaneClient *controlplane.Client + bcManager *bcm.BackendConfigManager + encodingFactory *encoding.Factory + triggerStore *sync.Map + createUploadAlways createUploadAlwaysLoader +} + +func (f *Factory) New(destType string) *Router { + return New( + f.reporting, + destType, + f.conf, + f.logger.Child("router"), + f.statsFactory, + f.db, + f.notifier, + f.tenantManager, + f.controlPlaneClient, + f.bcManager, + f.encodingFactory, + f.triggerStore, + f.createUploadAlways, + ) +} diff --git a/warehouse/router/lifecycle.go b/warehouse/router/lifecycle.go new file mode 100644 index 0000000000..aae710006b --- /dev/null +++ b/warehouse/router/lifecycle.go @@ -0,0 +1,82 @@ +package router + +import ( + "context" + "slices" + + "golang.org/x/sync/errgroup" + + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" +) + +type configWatcher interface { + WatchConfig(ctx context.Context) <-chan map[string]backendconfig.ConfigT +} + +type LifecycleManager struct { + watcher configWatcher + handler func(ctx context.Context, destType string) error + + destTypes map[string]struct{} +} + +func NewLifecycleManager( + watcher configWatcher, + onDestType func(ctx context.Context, destType string) error, +) *LifecycleManager { + return &LifecycleManager{ + watcher: watcher, + handler: onDestType, + destTypes: make(map[string]struct{}), + } +} + +func (lm *LifecycleManager) Run(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + + ch := lm.watcher.WatchConfig(ctx) + g.Go(func() error { + for configData := range ch { + destTypes := lm.newDestTypes(configData) + for _, d := range destTypes { + g.Go(func() error { + return lm.handler(ctx, d) + }) + } + } + return nil + }) + + return g.Wait() +} + +func (lm *LifecycleManager) newDestTypes( + configMap map[string]backendconfig.ConfigT, +) []string { + var newDestTypes []string + + for _, wConfig := range configMap { + for _, source := range wConfig.Sources { + for _, destination := range source.Destinations { + + destType := destination.DestinationDefinition.Name + + if !slices.Contains(warehouseutils.WarehouseDestinations, destType) { + continue + } + + _, ok := lm.destTypes[destType] + if ok { + continue + } + + lm.destTypes[destType] = struct{}{} + + newDestTypes = append(newDestTypes, destType) + } + } + } + + return newDestTypes +} diff --git a/warehouse/router/lifecycle_test.go b/warehouse/router/lifecycle_test.go new file mode 100644 index 0000000000..4d0174740c --- /dev/null +++ b/warehouse/router/lifecycle_test.go @@ -0,0 +1,131 @@ +package router + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + "github.com/rudderlabs/rudder-server/utils/pubsub" + whutils "github.com/rudderlabs/rudder-server/warehouse/utils" +) + +func TestLifecycle(t *testing.T) { + t.Run("normal lifecycle", func(t *testing.T) { + ps := &configPubSub{} + runningDestTypes := []string{} + + wg := sync.WaitGroup{} + + lm := NewLifecycleManager(ps, func(ctx context.Context, destType string) error { + runningDestTypes = append(runningDestTypes, destType) + wg.Done() + <-ctx.Done() + return nil + }) + + ctx, cancel := context.WithCancel(context.Background()) + + done := make(chan error, 1) + go func() { + defer close(done) + done <- lm.Run(ctx) + }() + + wg.Add(1) + ps.PublishConfig(configGen(t, whutils.POSTGRES, whutils.POSTGRES, "CLOUD_DESTINATION")) + wg.Wait() + require.Equal(t, []string{whutils.POSTGRES}, runningDestTypes) + + wg.Add(1) + ps.PublishConfig(configGen(t, whutils.POSTGRES, whutils.SNOWFLAKE)) + wg.Wait() + require.Equal(t, []string{whutils.POSTGRES, whutils.SNOWFLAKE}, runningDestTypes) + + cancel() + err := <-done + require.NoError(t, err) + }) + + t.Run("error in handler", func(t *testing.T) { + ps := &configPubSub{} + wg := sync.WaitGroup{} + triggerErr := make(chan error) + + lm := NewLifecycleManager(ps, func(ctx context.Context, destType string) error { + wg.Done() + + return <-triggerErr + }) + + done := make(chan error, 1) + go func() { + defer close(done) + done <- lm.Run(context.Background()) + }() + + t.Log("setup a handler") + wg.Add(1) + ps.PublishConfig(configGen(t, whutils.POSTGRES)) + wg.Wait() + + triggerErr <- fmt.Errorf("some error") + + require.Equal(t, fmt.Errorf("some error"), <-done) + }) +} + +type configPubSub struct { + ps pubsub.PublishSubscriber +} + +func (cp *configPubSub) WatchConfig(ctx context.Context) <-chan map[string]backendconfig.ConfigT { + chIn := cp.ps.Subscribe(ctx, "test_config") + chOut := make(chan map[string]backendconfig.ConfigT) + go func() { + for data := range chIn { + input := data.Data.(map[string]backendconfig.ConfigT) + chOut <- input + } + close(chOut) + }() + + return chOut +} + +func (cp *configPubSub) PublishConfig(config map[string]backendconfig.ConfigT) { + cp.ps.Publish("test_config", config) +} + +func configGen(t testing.TB, destType ...string) map[string]backendconfig.ConfigT { + t.Helper() + + dsts := []backendconfig.DestinationT{} + for _, dt := range destType { + dsts = append(dsts, backendconfig.DestinationT{ + ID: uuid.NewString(), + Enabled: true, + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Name: dt, + }, + }) + } + + wID := uuid.NewString() + return map[string]backendconfig.ConfigT{ + wID: { + WorkspaceID: wID, + Sources: []backendconfig.SourceT{ + { + ID: uuid.NewString(), + Enabled: true, + Destinations: dsts, + }, + }, + }, + } +}