Skip to content

Commit

Permalink
fix: implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhar-rudder committed Jan 9, 2025
1 parent 9ef0b40 commit d38e842
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 54 deletions.
1 change: 1 addition & 0 deletions warehouse/integrations/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ func TestIntegration(t *testing.T) {
t.Setenv("RSERVER_WAREHOUSE_BIGQUERY_ENABLE_DELETE_BY_JOBS", "true")
t.Setenv("RSERVER_WAREHOUSE_BIGQUERY_MAX_PARALLEL_LOADS", "8")
t.Setenv("RSERVER_WAREHOUSE_BIGQUERY_SLOW_QUERY_THRESHOLD", "0s")
t.Setenv("RSERVER_WAREHOUSE_SYNC_SCHEMA_FREQUENCY", "5s")

whth.BootstrapSvc(t, workspaceConfig, httpPort, jobsDBPort)

Expand Down
2 changes: 1 addition & 1 deletion warehouse/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ func (r *Router) loadReloadableConfig(whName string) {
r.config.cronTrackerRetries = r.conf.GetReloadableInt64Var(5, 1, "Warehouse.cronTrackerRetries")
r.config.uploadBufferTimeInMin = r.conf.GetReloadableDurationVar(180, time.Minute, "Warehouse.uploadBufferTimeInMin")
r.config.syncSchemaFrequency = r.conf.GetDurationVar(12, time.Hour, "Warehouse.syncSchemaFrequency")
r.config.enableSyncSchema = r.conf.GetBoolVar(true, "Warehouse.enableSyncSchema")
r.config.enableSyncSchema = r.conf.GetBoolVar(true, "Warehouse.enableSyncSchema")
}

func (r *Router) loadStats() {
Expand Down
15 changes: 8 additions & 7 deletions warehouse/router/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package router

import (
"context"
"fmt"
"time"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
Expand All @@ -18,20 +17,22 @@ func (r *Router) sync(ctx context.Context) error {
warehouses := append([]model.Warehouse{}, r.warehouses...)
r.configSubscriberLock.RUnlock()
execTime := time.Now()
whManager, err := manager.New(r.destType, r.conf, r.logger, r.statsFactory)
if err != nil {
return fmt.Errorf("failed to create warehouse manager: %w", err)
}
for _, warehouse := range warehouses {
err := whManager.Setup(ctx, warehouse, warehouseutils.NewNoOpUploader())
whManager, err := manager.New(r.destType, r.conf, r.logger, r.statsFactory)
if err != nil {
r.logger.Errorn("create warehouse manager: %w", obskit.Error(err))
continue
}
err = whManager.Setup(ctx, warehouse, warehouseutils.NewNoOpUploader())
if err != nil {
r.logger.Errorn("failed to setup WH Manager", obskit.Error(err))
continue
}
if err := schema.SyncSchema(ctx, whManager, warehouse, r.db, r.logger.Child("syncer")); err != nil {
if err := schema.FetchAndSaveSchema(ctx, whManager, warehouse, r.db, r.logger.Child("syncer")); err != nil {
r.logger.Errorn("failed to sync schema", obskit.Error(err))
continue
}
whManager.Cleanup(ctx)
}
nextExecTime := execTime.Add(r.config.syncSchemaFrequency)
select {
Expand Down
12 changes: 9 additions & 3 deletions warehouse/router/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ import (
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// mockFetchSchemaRepo is a stateless test double whose FetchSchema method
// returns an empty schema and no error. The test below passes it as the
// schema-fetching argument of SyncRemoteSchema.
type mockFetchSchemaRepo struct{}

// FetchSchema ignores its context and always yields an empty schema together
// with a nil error.
func (mockFetchSchemaRepo) FetchSchema(context.Context) (model.Schema, error) {
	emptySchema := model.Schema{}
	return emptySchema, nil
}

func TestSync_SyncRemoteSchemaIntegration(t *testing.T) {
destinationType := warehouseutils.POSTGRES
bucket := "some-bucket"
Expand Down Expand Up @@ -124,16 +130,16 @@ func TestSync_SyncRemoteSchemaIntegration(t *testing.T) {

<-setupCh
r.conf.Set("Warehouse.enableSyncSchema", true)
sh, err := schema.New(
context.Background(),
sh := schema.New(
r.db,
warehouse,
r.conf,
r.logger.Child("syncer"),
r.statsFactory,
)
require.NoError(t, err)
require.Eventually(t, func() bool {
_, err := sh.SyncRemoteSchema(ctx, &mockFetchSchemaRepo{}, 0)
require.NoError(t, err)
schema, err := sh.GetLocalSchema(ctx)
require.NoError(t, err)
return reflect.DeepEqual(schema, model.Schema{
Expand Down
7 changes: 1 addition & 6 deletions warehouse/router/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,18 +148,13 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo
logfield.UseRudderStorage, dto.Upload.UseRudderStorage,
)

schemaHandle, err := schema.New(
ctx,
schemaHandle := schema.New(
f.db,
dto.Warehouse,
f.conf,
f.logger.Child("warehouse"),
f.statsFactory,
)
if err != nil {
log.Errorw("failed to create schema handler", logfield.Error, err)
return nil
}

uj := &UploadJob{
ctx: ujCtx,
Expand Down
9 changes: 9 additions & 0 deletions warehouse/router/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,21 @@ func TestColumnCountStat(t *testing.T) {
tc := tc

t.Run(tc.name, func(t *testing.T) {
t.Parallel()
conf := config.New()
conf.Set(fmt.Sprintf("Warehouse.%s.columnCountLimit", strings.ToLower(warehouseutils.WHDestNameMap[tc.destinationType])), tc.columnCountLimit)

pool, err := dockertest.NewPool("")
require.NoError(t, err)

pgResource, err := postgres.Setup(pool, t)
require.NoError(t, err)

uploadJobFactory := &UploadJobFactory{
logger: logger.NOP,
statsFactory: statsStore,
conf: conf,
db: sqlmiddleware.New(pgResource.DB),
}
rs := redshift.New(config.New(), logger.NOP, stats.NOP)
j := uploadJobFactory.NewUploadJob(context.Background(), &model.UploadJob{
Expand Down
31 changes: 16 additions & 15 deletions warehouse/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,12 @@ type schema struct {
}

func New(
ctx context.Context,
db *sqlquerywrapper.DB,
warehouse model.Warehouse,
conf *config.Config,
logger logger.Logger,
statsFactory stats.Stats,
) (SchemaHandler, error) {
) SchemaHandler {
schemaSize := statsFactory.NewTaggedStat("warehouse_schema_size", stats.HistogramType, stats.Tags{
"module": "warehouse",
"workspaceId": warehouse.WorkspaceID,
Expand All @@ -102,15 +101,12 @@ func New(
enableIDResolution: conf.GetBool("Warehouse.enableIDResolution", false),
}
if conf.GetBoolVar(true, "Warehouse.enableSyncSchema") {
schemaHandler, err := newSchemaV2(ctx, schemaV1, warehouse, log)
if err != nil {
return nil, fmt.Errorf("creating schema handler: %w", err)
}
schemaHandler := newSchemaV2(schemaV1, warehouse, log)
schemaHandler.stats.schemaSize = schemaSize
return schemaHandler, nil
return schemaHandler
}
schemaV1.stats.schemaSize = schemaSize
return schemaV1, nil
return schemaV1
}

// ConsolidateStagingFilesUsingLocalSchema
Expand Down Expand Up @@ -293,13 +289,7 @@ func (sh *schema) updateLocalSchema(ctx context.Context, updatedSchema model.Sch
}
sh.stats.schemaSize.Observe(float64(len(updatedSchemaInBytes)))

_, err = sh.schemaRepo.Insert(ctx, &model.WHSchema{
SourceID: sh.warehouse.Source.ID,
Namespace: sh.warehouse.Namespace,
DestinationID: sh.warehouse.Destination.ID,
DestinationType: sh.warehouse.Type,
Schema: updatedSchema,
})
err = writeSchema(ctx, sh.schemaRepo, sh.warehouse, updatedSchema)
if err != nil {
return fmt.Errorf("updating local schema: %w", err)
}
Expand Down Expand Up @@ -481,3 +471,14 @@ func removeDeprecatedColumns(schema model.Schema, warehouse model.Warehouse, log
}
}
}

// writeSchema inserts a schema record for the given warehouse into schemaRepo.
//
// The record carries the warehouse's source ID, namespace, destination ID and
// destination type alongside updatedSchema. The first value returned by Insert
// is discarded; the error is handed back to the caller unwrapped.
func writeSchema(ctx context.Context, schemaRepo schemaRepo, warehouse model.Warehouse, updatedSchema model.Schema) error {
	record := model.WHSchema{
		SourceID:        warehouse.Source.ID,
		DestinationID:   warehouse.Destination.ID,
		DestinationType: warehouse.Type,
		Namespace:       warehouse.Namespace,
		Schema:          updatedSchema,
	}
	_, insertErr := schemaRepo.Insert(ctx, &record)
	return insertErr
}
53 changes: 31 additions & 22 deletions warehouse/schema/schema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ type schemaV2 struct {
stats struct {
schemaSize stats.Histogram
}
// caches the schema present in the repository
cachedSchema model.Schema
warehouse model.Warehouse
v1 *schema
log logger.Logger
schemaMu sync.RWMutex
}

func SyncSchema(ctx context.Context, fetchSchemaRepo fetchSchemaRepo, warehouse model.Warehouse, db *sqlquerywrapper.DB, log logger.Logger) error {
func FetchAndSaveSchema(ctx context.Context, fetchSchemaRepo fetchSchemaRepo, warehouse model.Warehouse, db *sqlquerywrapper.DB, log logger.Logger) error {
warehouseSchema, err := fetchSchemaRepo.FetchSchema(ctx)
if err != nil {
return fmt.Errorf("fetching schema: %w", err)
Expand All @@ -36,19 +37,31 @@ func SyncSchema(ctx context.Context, fetchSchemaRepo fetchSchemaRepo, warehouse
return writeSchema(ctx, schemaRepo, warehouse, warehouseSchema)
}

func newSchemaV2(ctx context.Context, v1 *schema, warehouse model.Warehouse, log logger.Logger) (*schemaV2, error) {
v2 := &schemaV2{
v1: v1,
warehouse: warehouse,
log: log,
func newSchemaV2(v1 *schema, warehouse model.Warehouse, log logger.Logger) *schemaV2 {
return &schemaV2{
v1: v1,
warehouse: warehouse,
log: log,
cachedSchema: model.Schema{},
}
var err error
v2.cachedSchema, err = v1.GetLocalSchema(ctx)
return v2, err
}

// SyncRemoteSchema reloads the cached schema from the schema repository.
//
// It looks up the record stored for this warehouse's source ID, destination ID
// and namespace. When that record carries a non-nil schema, the cache is
// replaced with it under the write lock; otherwise the cache is left as is.
// The fetchSchemaRepo and uploadID arguments are not used by this
// implementation, and the boolean result is always false. A lookup failure is
// returned wrapped with "getting schema for namespace".
func (sh *schemaV2) SyncRemoteSchema(ctx context.Context, fetchSchemaRepo fetchSchemaRepo, uploadID int64) (bool, error) {
	src, dst, ns := sh.warehouse.Source.ID, sh.warehouse.Destination.ID, sh.warehouse.Namespace

	stored, err := sh.v1.schemaRepo.GetForNamespace(ctx, src, dst, ns)
	if err != nil {
		return false, fmt.Errorf("getting schema for namespace: %w", err)
	}

	if stored.Schema != nil {
		sh.schemaMu.Lock()
		sh.cachedSchema = stored.Schema
		sh.schemaMu.Unlock()
	}
	return false, nil
}

Expand Down Expand Up @@ -85,7 +98,14 @@ func (sh *schemaV2) UpdateLocalSchema(ctx context.Context, updatedSchema model.S
}

// UpdateWarehouseTableSchema stores tableSchema under tableName in the cached
// schema and then writes the whole cached schema through writeSchema.
//
// Both steps run while holding the write lock. The method has no error return,
// so a failed write is only logged.
func (sh *schemaV2) UpdateWarehouseTableSchema(tableName string, tableSchema model.TableSchema) {
	sh.schemaMu.Lock()
	defer sh.schemaMu.Unlock()

	sh.cachedSchema[tableName] = tableSchema

	if err := writeSchema(context.TODO(), sh.v1.schemaRepo, sh.warehouse, sh.cachedSchema); err != nil {
		// TODO - Return error to the caller
		sh.log.Errorf("error updating warehouse schema: %v", err)
	}
}

func (sh *schemaV2) GetColumnsCountInWarehouseSchema(tableName string) int {
Expand Down Expand Up @@ -130,14 +150,3 @@ func (sh *schemaV2) FetchSchemaFromWarehouse(ctx context.Context, repo fetchSche
// no-op since local schema and warehouse schema are supposed to be in sync
return nil
}

// writeSchema inserts a schema record for the given warehouse into schemaRepo.
//
// The record carries the warehouse's source ID, namespace, destination ID and
// destination type alongside updatedSchema. The first value returned by Insert
// is discarded; the error is handed back to the caller unwrapped.
func writeSchema(ctx context.Context, schemaRepo schemaRepo, warehouse model.Warehouse, updatedSchema model.Schema) error {
	record := model.WHSchema{
		SourceID:        warehouse.Source.ID,
		DestinationID:   warehouse.Destination.ID,
		DestinationType: warehouse.Type,
		Namespace:       warehouse.Namespace,
		Schema:          updatedSchema,
	}
	_, insertErr := schemaRepo.Insert(ctx, &record)
	return insertErr
}

0 comments on commit d38e842

Please sign in to comment.