Skip to content

Commit

Permalink
fix: router lifecycle to handle router error
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach committed Jun 12, 2024
1 parent 4565b1f commit b0499f3
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 64 deletions.
87 changes: 24 additions & 63 deletions warehouse/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"os"
"slices"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -44,7 +43,6 @@ import (
"github.com/rudderlabs/rudder-server/warehouse/router"
"github.com/rudderlabs/rudder-server/warehouse/slave"
"github.com/rudderlabs/rudder-server/warehouse/source"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

type App struct {
Expand Down Expand Up @@ -396,9 +394,32 @@ func (a *App) Run(ctx context.Context) error {
g.Go(misc.WithBugsnagForWarehouse(func() error {
return a.notifier.ClearJobs(gCtx)
}))

routerManager := router.NewLifecycleManager(
a.tenantManager,
func(ctx context.Context, destType string) error {
r := router.New(
a.reporting,
destType,
a.conf,
a.logger.Child("router"),
a.statsFactory,
a.db,
a.notifier,
a.tenantManager,
a.controlPlaneClient,
a.bcManager,
a.encodingFactory,
a.triggerStore,
a.createUploadAlways,
)
return r.Start(ctx)
},
)
g.Go(misc.WithBugsnagForWarehouse(func() error {
return a.monitorDestRouters(gCtx)
return routerManager.Run(gCtx)
}))

g.Go(misc.WithBugsnagForWarehouse(func() error {
archive.CronArchiver(gCtx, archive.New(
a.conf,
Expand Down Expand Up @@ -433,63 +454,3 @@ func (a *App) Run(ctx context.Context) error {

return g.Wait()
}

// monitorDestRouters watches backend-config updates and starts a warehouse
// router for every destination type that onConfigDataEvent reports as new.
//
// It blocks until the config channel is closed and every started router has
// returned, then yields the first non-nil error returned by a router's Start.
// The context passed to the watcher and to the routers is the errgroup's
// derived context, so it is cancelled as soon as one router fails.
func (a *App) monitorDestRouters(ctx context.Context) error {
	dstToWhRouter := make(map[string]*router.Router)
	g, ctx := errgroup.WithContext(ctx)

	for configData := range a.tenantManager.WatchConfig(ctx) {
		for _, r := range a.onConfigDataEvent(configData, dstToWhRouter) {
			// Per-iteration copy: before Go 1.22 the range variable is shared
			// by all iterations, so every goroutine could start the same router.
			r := r
			g.Go(func() error { return r.Start(ctx) })
		}
	}
	return g.Wait()
}

// onConfigDataEvent scans a backend-config snapshot and creates a router for
// every warehouse destination type that has no entry in dstToWhRouter yet.
//
// Newly created routers are stored in dstToWhRouter (the map is mutated) and
// are also returned, keyed by destination type, so the caller can start them.
// Destination types not listed in warehouseutils.WarehouseDestinations are
// ignored. The returned map is empty (never nil) when nothing new was found.
func (a *App) onConfigDataEvent(
	configMap map[string]backendconfig.ConfigT,
	dstToWhRouter map[string]*router.Router,
) map[string]*router.Router {
	diffRouters := make(map[string]*router.Router)
	for _, wConfig := range configMap {
		// Named src rather than source so the imported "source" package is
		// not shadowed inside the loop.
		for _, src := range wConfig.Sources {
			for _, destination := range src.Destinations {
				destType := destination.DestinationDefinition.Name

				if !slices.Contains(warehouseutils.WarehouseDestinations, destType) {
					continue
				}

				if _, ok := dstToWhRouter[destType]; ok {
					a.logger.Debug("Enabling existing Destination: ", destType)
					continue
				}

				a.logger.Info("Starting a new Warehouse Destination Router: ", destType)

				r := router.New(
					a.reporting,
					destType,
					a.conf,
					a.logger.Child("router"),
					a.statsFactory,
					a.db,
					a.notifier,
					a.tenantManager,
					a.controlPlaneClient,
					a.bcManager,
					a.encodingFactory,
					a.triggerStore,
					a.createUploadAlways,
				)
				dstToWhRouter[destType] = r
				diffRouters[destType] = r
			}
		}
	}
	return diffRouters
}
1 change: 0 additions & 1 deletion warehouse/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,6 @@ func TestApp(t *testing.T) {
a := New(mockApp, c, logger.NOP, stats.NOP, mockBackendConfig, filemanager.New)
require.NoError(t, a.Setup(ctx))
cancel()
require.Contains(t, a.monitorDestRouters(ctx).Error(), "reset in progress: context canceled")
})
t.Run("rudder core recovery mode", func(t *testing.T) {
t.Run("stand alone", func(t *testing.T) {
Expand Down
49 changes: 49 additions & 0 deletions warehouse/router/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package router

import (
"sync"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/services/controlplane"
"github.com/rudderlabs/rudder-server/services/notifier"
"github.com/rudderlabs/rudder-server/utils/types"
"github.com/rudderlabs/rudder-server/warehouse/bcm"
"github.com/rudderlabs/rudder-server/warehouse/encoding"
"github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/multitenant"
)

// Factory holds the dependencies that are passed to New when building a
// Router, so that a caller only has to supply the destination type.
// Every field is forwarded unchanged by (*Factory).New, except logger, from
// which a "router" child logger is derived.
type Factory struct {
reporting types.Reporting
conf *config.Config
// logger is the parent logger; routers receive logger.Child("router").
logger logger.Logger
statsFactory stats.Stats
db *sqlquerywrapper.DB
notifier *notifier.Notifier
tenantManager *multitenant.Manager
controlPlaneClient *controlplane.Client
bcManager *bcm.BackendConfigManager
encodingFactory *encoding.Factory
// triggerStore is shared by pointer with every router built by this factory.
triggerStore *sync.Map
createUploadAlways createUploadAlwaysLoader
}

// New builds a Router for the given destination type using the dependencies
// stored on the factory. The router gets a "router" child of the factory's
// logger; all other dependencies are passed through as-is.
func (f *Factory) New(destType string) *Router {
	routerLogger := f.logger.Child("router")

	return New(
		f.reporting, destType, f.conf, routerLogger, f.statsFactory,
		f.db, f.notifier, f.tenantManager, f.controlPlaneClient,
		f.bcManager, f.encodingFactory, f.triggerStore, f.createUploadAlways,
	)
}
82 changes: 82 additions & 0 deletions warehouse/router/lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package router

import (
"context"
"slices"

"golang.org/x/sync/errgroup"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// configWatcher is the source of backend-config snapshots consumed by
// LifecycleManager.
type configWatcher interface {
// WatchConfig returns a receive-only channel of config snapshots.
// LifecycleManager.Run ranges over it until it is closed.
// NOTE(review): the map is presumably keyed by workspace ID (the test
// helper configGen builds it that way) — confirm against the real watcher.
WatchConfig(ctx context.Context) <-chan map[string]backendconfig.ConfigT
}

// LifecycleManager watches config updates and invokes a handler once for each
// warehouse destination type the first time that type appears in a config.
type LifecycleManager struct {
// watcher supplies the stream of config snapshots read by Run.
watcher configWatcher
// handler is called by Run, in its own goroutine, for each new destination type.
handler func(ctx context.Context, destType string) error

// destTypes is the set of destination types already returned by
// newDestTypes. It has no lock of its own; entries are never removed.
destTypes map[string]struct{}
}

// NewLifecycleManager returns a LifecycleManager that reads config snapshots
// from watcher and calls onDestType for every warehouse destination type it
// has not seen before. The set of seen destination types starts out empty.
func NewLifecycleManager(
	watcher configWatcher,
	onDestType func(ctx context.Context, destType string) error,
) *LifecycleManager {
	lm := new(LifecycleManager)
	lm.watcher = watcher
	lm.handler = onDestType
	lm.destTypes = map[string]struct{}{}
	return lm
}

// Run consumes config snapshots from the watcher and, for every warehouse
// destination type seen for the first time, runs the handler in its own
// goroutine.
//
// Run blocks until the watcher's channel is closed and all handlers have
// returned. It returns the first non-nil error returned by a handler, or nil.
// The watcher and the handlers receive the errgroup's derived context, which
// is cancelled when ctx is cancelled or as soon as any handler fails.
func (lm *LifecycleManager) Run(ctx context.Context) error {
	g, ctx := errgroup.WithContext(ctx)

	ch := lm.watcher.WatchConfig(ctx)
	g.Go(func() error {
		for configData := range ch {
			for _, d := range lm.newDestTypes(configData) {
				// Per-iteration copy: before Go 1.22 the range variable is
				// shared by all iterations, so concurrently started handlers
				// could all observe the last destination type.
				d := d
				// Starting handlers on the same group from inside this
				// goroutine is safe: the group cannot finish while it runs.
				g.Go(func() error {
					return lm.handler(ctx, d)
				})
			}
		}
		return nil
	})

	return g.Wait()
}

// newDestTypes returns the warehouse destination types found in configMap
// that have not been returned by a previous call, and records them as seen in
// lm.destTypes. Destination types not listed in
// warehouseutils.WarehouseDestinations are skipped. The result is nil when
// nothing new is found; duplicates within a single snapshot are reported once.
func (lm *LifecycleManager) newDestTypes(
	configMap map[string]backendconfig.ConfigT,
) []string {
	var added []string

	for _, workspace := range configMap {
		for _, src := range workspace.Sources {
			for _, dst := range src.Destinations {
				name := dst.DestinationDefinition.Name

				// Already handed out earlier (or earlier in this snapshot).
				if _, seen := lm.destTypes[name]; seen {
					continue
				}
				// Only warehouse destination types are managed here.
				if !slices.Contains(warehouseutils.WarehouseDestinations, name) {
					continue
				}

				lm.destTypes[name] = struct{}{}
				added = append(added, name)
			}
		}
	}

	return added
}
131 changes: 131 additions & 0 deletions warehouse/router/lifecycle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package router

import (
"context"
"fmt"
"sync"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/require"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/utils/pubsub"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// TestLifecycle exercises LifecycleManager.Run against an in-memory config
// publisher: handlers must be started once per new warehouse destination
// type, and a handler error must be returned from Run.
func TestLifecycle(t *testing.T) {
t.Run("normal lifecycle", func(t *testing.T) {
ps := &configPubSub{}
runningDestTypes := []string{}

// wg lets the test wait until the expected number of handlers has started.
wg := sync.WaitGroup{}

lm := NewLifecycleManager(ps, func(ctx context.Context, destType string) error {
// The append happens before wg.Done, so the test goroutine may read
// runningDestTypes after wg.Wait returns.
runningDestTypes = append(runningDestTypes, destType)
wg.Done()
// Keep the handler alive until Run's context is cancelled.
<-ctx.Done()
return nil
})

ctx, cancel := context.WithCancel(context.Background())

// done is buffered so the goroutine can deliver Run's result without a reader.
done := make(chan error, 1)
go func() {
defer close(done)
done <- lm.Run(ctx)
}()

// Exactly one handler is expected: the duplicate POSTGRES entry and the
// "CLOUD_DESTINATION" entry are expected not to start additional handlers.
wg.Add(1)
ps.PublishConfig(configGen(t, whutils.POSTGRES, whutils.POSTGRES, "CLOUD_DESTINATION"))
wg.Wait()
require.Equal(t, []string{whutils.POSTGRES}, runningDestTypes)

// POSTGRES is already running, so only SNOWFLAKE should start a handler.
wg.Add(1)
ps.PublishConfig(configGen(t, whutils.POSTGRES, whutils.SNOWFLAKE))
wg.Wait()
require.Equal(t, []string{whutils.POSTGRES, whutils.SNOWFLAKE}, runningDestTypes)

// Cancelling the context unblocks the handlers; Run is expected to return nil.
cancel()
err := <-done
require.NoError(t, err)
})

t.Run("error in handler", func(t *testing.T) {
ps := &configPubSub{}
wg := sync.WaitGroup{}
// triggerErr is unbuffered: the handler blocks until the test sends an error.
triggerErr := make(chan error)

lm := NewLifecycleManager(ps, func(ctx context.Context, destType string) error {
wg.Done()

return <-triggerErr
})

done := make(chan error, 1)
go func() {
defer close(done)
done <- lm.Run(context.Background())
}()

t.Log("setup a handler")
wg.Add(1)
ps.PublishConfig(configGen(t, whutils.POSTGRES))
wg.Wait()

// Make the running handler fail; Run must surface that error.
triggerErr <- fmt.Errorf("some error")

require.Equal(t, fmt.Errorf("some error"), <-done)
})
}

// configPubSub is a test double that provides WatchConfig on top of an
// in-memory publish/subscribe topic; tests push configs with PublishConfig.
type configPubSub struct {
// ps is used through its zero value; both methods use the "test_config" topic.
ps pubsub.PublishSubscriber
}

// WatchConfig subscribes to the "test_config" topic and forwards every
// published payload, asserted to be a config map, on the returned unbuffered
// channel. The returned channel is closed once the subscription channel is
// closed. A payload of any other type makes the forwarding goroutine panic.
func (cp *configPubSub) WatchConfig(ctx context.Context) <-chan map[string]backendconfig.ConfigT {
	out := make(chan map[string]backendconfig.ConfigT)
	updates := cp.ps.Subscribe(ctx, "test_config")

	go func() {
		for update := range updates {
			out <- update.Data.(map[string]backendconfig.ConfigT)
		}
		close(out)
	}()

	return out
}

// PublishConfig publishes config on the "test_config" topic, the same topic
// WatchConfig subscribes to.
func (cp *configPubSub) PublishConfig(config map[string]backendconfig.ConfigT) {
	const topic = "test_config"
	cp.ps.Publish(topic, config)
}

// configGen builds a config snapshot with a single workspace holding a single
// enabled source. That source has one enabled destination per entry in
// destType, in the given order, with the entry used as the destination
// definition name. Workspace, source and destination IDs are random UUIDs.
func configGen(t testing.TB, destType ...string) map[string]backendconfig.ConfigT {
	t.Helper()

	destinations := make([]backendconfig.DestinationT, len(destType))
	for i, name := range destType {
		destinations[i] = backendconfig.DestinationT{
			ID:                    uuid.NewString(),
			Enabled:               true,
			DestinationDefinition: backendconfig.DestinationDefinitionT{Name: name},
		}
	}

	src := backendconfig.SourceT{
		ID:           uuid.NewString(),
		Enabled:      true,
		Destinations: destinations,
	}

	workspaceID := uuid.NewString()
	return map[string]backendconfig.ConfigT{
		workspaceID: {
			WorkspaceID: workspaceID,
			Sources:     []backendconfig.SourceT{src},
		},
	}
}

0 comments on commit b0499f3

Please sign in to comment.