Skip to content

Commit

Permalink
chore: review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Jan 8, 2025
1 parent 30d0f23 commit f27fb84
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 24 deletions.
6 changes: 0 additions & 6 deletions warehouse/router/state_export_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package router

import (
"context"
"encoding/json"
"fmt"
"slices"
"strings"
Expand Down Expand Up @@ -292,9 +291,6 @@ func (job *UploadJob) updateSchema(tName string) (alteredSchema bool, err error)
if tableSchemaDiff.Exists {
err = job.UpdateTableSchema(tName, tableSchemaDiff)
if err != nil {
res, _ := json.Marshal(job.GetTableSchemaInUpload(tName))
schema, _ := json.Marshal(job.schemaHandle.GetTableSchema(tName))
job.logger.Errorf("Error updating schema for table %s in namespace %s of destination %s:%s, error: %v, schema: %v, schema2: %v", tName, job.warehouse.Namespace, job.warehouse.Type, job.warehouse.Destination.ID, err, string(res), string(schema))
return
}

Expand All @@ -309,8 +305,6 @@ func (job *UploadJob) UpdateTableSchema(tName string, tableSchemaDiff whutils.Ta
if tableSchemaDiff.TableToBeCreated {
err = job.whManager.CreateTable(job.ctx, tName, tableSchemaDiff.ColumnMap)
if err != nil {
res, _ := json.Marshal(tableSchemaDiff)
job.logger.Errorf("Error creating table %s on namespace: %s, error: %v, tableSchemaDiff: %v", tName, job.warehouse.Namespace, err, string(res))
return err
}
job.stats.tablesAdded.Increment()
Expand Down
21 changes: 12 additions & 9 deletions warehouse/router/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@ func (r *Router) sync(ctx context.Context) error {
r.configSubscriberLock.RLock()
warehouses := append([]model.Warehouse{}, r.warehouses...)
r.configSubscriberLock.RUnlock()
execTime := time.Now()
whManager, err := manager.New(r.destType, r.conf, r.logger, r.statsFactory)
if err != nil {
return fmt.Errorf("failed to create warehouse manager: %w", err)
}
execTime := r.now()
log := r.logger.Child("syncer")

for _, warehouse := range warehouses {
err := whManager.Setup(ctx, warehouse, warehouseutils.NewNoOpUploader())
whManager, err := manager.New(r.destType, r.conf, r.logger, r.statsFactory)
if err != nil {
log.Warnn("create warehouse manager: %w", obskit.Error(err))
continue

Check warning on line 34 in warehouse/router/sync.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/sync.go#L33-L34

Added lines #L33 - L34 were not covered by tests
}
err = whManager.Setup(ctx, warehouse, warehouseutils.NewNoOpUploader())
if err != nil {
r.logger.Errorn("failed to setup WH Manager", obskit.Error(err))
log.Warnn("failed to setup WH Manager", obskit.Error(err))
continue

Check warning on line 39 in warehouse/router/sync.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/sync.go#L38-L39

Added lines #L38 - L39 were not covered by tests
}
sh := schema.New(
Expand All @@ -43,14 +46,14 @@ func (r *Router) sync(ctx context.Context) error {
r.statsFactory,
)
if err := r.syncRemoteSchema(ctx, whManager, sh); err != nil {
r.logger.Errorn("failed to sync schema", obskit.Error(err))
log.Warnn("failed to sync schema", obskit.Error(err))
}
whManager.Close()
}
nextExecTime := execTime.Add(r.config.syncSchemaFrequency)
select {
case <-ctx.Done():
r.logger.Infon("context is cancelled, stopped running schema syncer")
log.Infon("context is cancelled, stopped running schema syncer")
return nil
case <-time.After(time.Until(nextExecTime)):
}
Expand Down
14 changes: 5 additions & 9 deletions warehouse/router/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package router

import (
"context"
"errors"
"fmt"
"reflect"
"testing"
Expand Down Expand Up @@ -51,11 +50,7 @@ func schemaKey(sourceID, destinationID, namespace string) string {
}

func (m *mockSyncSchemaRepo) GetLocalSchema(_ context.Context) (model.Schema, error) {
if m.getLocalSchemaerr != nil {
return model.Schema{}, m.getLocalSchemaerr
}

return model.Schema{}, nil
return model.Schema{}, m.getLocalSchemaerr
}

func (m *mockSyncSchemaRepo) UpdateLocalSchemaWithWarehouse(_ context.Context, _ model.Schema) error {
Expand Down Expand Up @@ -88,7 +83,7 @@ func TestSync_SyncRemoteSchema(t *testing.T) {
}
err := r.syncRemoteSchema(context.Background(), mockFetchSchema, mock)
require.Error(t, err)
require.True(t, errors.Is(err, mock.getLocalSchemaerr))
require.ErrorIs(t, err, mock.getLocalSchemaerr)
})
t.Run("fetching schema from warehouse fails", func(t *testing.T) {
mock := &mockSyncSchemaRepo{
Expand All @@ -100,7 +95,7 @@ func TestSync_SyncRemoteSchema(t *testing.T) {
}
err := r.syncRemoteSchema(context.Background(), mockFetchSchema, mock)
require.Error(t, err)
require.True(t, errors.Is(err, mock.fetchSchemaErr))
require.ErrorIs(t, err, mock.fetchSchemaErr)
})
t.Run("schema has changed and updating errors", func(t *testing.T) {
mock := &mockSyncSchemaRepo{
Expand All @@ -114,7 +109,7 @@ func TestSync_SyncRemoteSchema(t *testing.T) {
err := r.syncRemoteSchema(context.Background(), mockFetchSchema, mock)
require.Error(t, err)
})
t.Run("schema has changed and updating errors", func(t *testing.T) {
t.Run("schema has not changed and updating errors", func(t *testing.T) {
mock := &mockSyncSchemaRepo{
hasChanged: false,
updateLocalSchemaErr: fmt.Errorf("error updating local schema"),
Expand Down Expand Up @@ -204,6 +199,7 @@ func TestSync_SyncRemoteSchemaIntegration(t *testing.T) {
warehouses: []model.Warehouse{warehouse},
destType: warehouseutils.POSTGRES,
statsFactory: stats.NOP,
now: time.Now,
}

setupCh := make(chan struct{})
Expand Down

0 comments on commit f27fb84

Please sign in to comment.