diff --git a/warehouse/router/state_export_data.go b/warehouse/router/state_export_data.go index 6b13cb5dc1..eaca3678d2 100644 --- a/warehouse/router/state_export_data.go +++ b/warehouse/router/state_export_data.go @@ -2,7 +2,6 @@ package router import ( "context" - "encoding/json" "fmt" "slices" "strings" @@ -292,9 +291,6 @@ func (job *UploadJob) updateSchema(tName string) (alteredSchema bool, err error) if tableSchemaDiff.Exists { err = job.UpdateTableSchema(tName, tableSchemaDiff) if err != nil { - res, _ := json.Marshal(job.GetTableSchemaInUpload(tName)) - schema, _ := json.Marshal(job.schemaHandle.GetTableSchema(tName)) - job.logger.Errorf("Error updating schema for table %s in namespace %s of destination %s:%s, error: %v, schema: %v, schema2: %v", tName, job.warehouse.Namespace, job.warehouse.Type, job.warehouse.Destination.ID, err, string(res), string(schema)) return } @@ -309,8 +305,6 @@ func (job *UploadJob) UpdateTableSchema(tName string, tableSchemaDiff whutils.Ta if tableSchemaDiff.TableToBeCreated { err = job.whManager.CreateTable(job.ctx, tName, tableSchemaDiff.ColumnMap) if err != nil { - res, _ := json.Marshal(tableSchemaDiff) - job.logger.Errorf("Error creating table %s on namespace: %s, error: %v, tableSchemaDiff: %v", tName, job.warehouse.Namespace, err, string(res)) return err } job.stats.tablesAdded.Increment() diff --git a/warehouse/router/sync.go b/warehouse/router/sync.go index 158b2adcef..6c41fcd68f 100644 --- a/warehouse/router/sync.go +++ b/warehouse/router/sync.go @@ -24,15 +24,18 @@ func (r *Router) sync(ctx context.Context) error { r.configSubscriberLock.RLock() warehouses := append([]model.Warehouse{}, r.warehouses...) r.configSubscriberLock.RUnlock() - execTime := time.Now() - whManager, err := manager.New(r.destType, r.conf, r.logger, r.statsFactory) - if err != nil { - return fmt.Errorf("failed to create warehouse manager: %w", err) - } + execTime := r.now() + log := r.logger.Child("syncer") + for _, warehouse := range warehouses { - err := whManager.Setup(ctx, warehouse, warehouseutils.NewNoOpUploader()) + whManager, err := manager.New(r.destType, r.conf, r.logger, r.statsFactory) + if err != nil { + log.Warnn("create warehouse manager: %w", obskit.Error(err)) + continue + } + err = whManager.Setup(ctx, warehouse, warehouseutils.NewNoOpUploader()) if err != nil { - r.logger.Errorn("failed to setup WH Manager", obskit.Error(err)) + log.Warnn("failed to setup WH Manager", obskit.Error(err)) continue } sh := schema.New( @@ -43,14 +46,14 @@ func (r *Router) sync(ctx context.Context) error { r.statsFactory, ) if err := r.syncRemoteSchema(ctx, whManager, sh); err != nil { - r.logger.Errorn("failed to sync schema", obskit.Error(err)) + log.Warnn("failed to sync schema", obskit.Error(err)) } whManager.Close() } nextExecTime := execTime.Add(r.config.syncSchemaFrequency) select { case <-ctx.Done(): - r.logger.Infon("context is cancelled, stopped running schema syncer") + log.Infon("context is cancelled, stopped running schema syncer") return nil case <-time.After(time.Until(nextExecTime)): } diff --git a/warehouse/router/sync_test.go b/warehouse/router/sync_test.go index c1fa23b32c..93b14d0ff0 100644 --- a/warehouse/router/sync_test.go +++ b/warehouse/router/sync_test.go @@ -2,7 +2,6 @@ package router import ( "context" - "errors" "fmt" "reflect" "testing" @@ -51,11 +50,7 @@ func schemaKey(sourceID, destinationID, namespace string) string { } func (m *mockSyncSchemaRepo) GetLocalSchema(_ context.Context) (model.Schema, error) { - if m.getLocalSchemaerr != nil { - return model.Schema{}, m.getLocalSchemaerr - } - - return model.Schema{}, nil + return model.Schema{}, m.getLocalSchemaerr } func (m *mockSyncSchemaRepo) UpdateLocalSchemaWithWarehouse(_ context.Context, _ model.Schema) error { @@ -88,7 +83,7 @@ func TestSync_SyncRemoteSchema(t *testing.T) { } err := r.syncRemoteSchema(context.Background(), mockFetchSchema, mock) require.Error(t, err) - require.True(t, errors.Is(err, mock.getLocalSchemaerr)) + require.ErrorIs(t, err, mock.getLocalSchemaerr) }) t.Run("fetching schema from warehouse fails", func(t *testing.T) { mock := &mockSyncSchemaRepo{ @@ -100,7 +95,7 @@ func TestSync_SyncRemoteSchema(t *testing.T) { } err := r.syncRemoteSchema(context.Background(), mockFetchSchema, mock) require.Error(t, err) - require.True(t, errors.Is(err, mock.fetchSchemaErr)) + require.ErrorIs(t, err, mock.fetchSchemaErr) }) t.Run("schema has changed and updating errors", func(t *testing.T) { mock := &mockSyncSchemaRepo{ @@ -114,7 +109,7 @@ func TestSync_SyncRemoteSchema(t *testing.T) { err := r.syncRemoteSchema(context.Background(), mockFetchSchema, mock) require.Error(t, err) }) - t.Run("schema has changed and updating errors", func(t *testing.T) { + t.Run("schema has not changed and updating errors", func(t *testing.T) { mock := &mockSyncSchemaRepo{ hasChanged: false, updateLocalSchemaErr: fmt.Errorf("error updating local schema"), @@ -204,6 +199,7 @@ func TestSync_SyncRemoteSchemaIntegration(t *testing.T) { warehouses: []model.Warehouse{warehouse}, destType: warehouseutils.POSTGRES, statsFactory: stats.NOP, + now: time.Now, } setupCh := make(chan struct{})