diff --git a/warehouse/encoding/parquetloader.go b/warehouse/encoding/parquetloader.go index 2a215ed245..e962127e6d 100644 --- a/warehouse/encoding/parquetloader.go +++ b/warehouse/encoding/parquetloader.go @@ -7,6 +7,7 @@ import ( "github.com/xitongsys/parquet-go/types" + "github.com/rudderlabs/rudder-server/warehouse/internal/model" warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) @@ -63,28 +64,23 @@ func (loader *parquetLoader) Write() error { func parquetValue(val interface{}, colType string) (interface{}, error) { switch colType { - case "bigint", "int": - retVal, err := getInt64(val) - return retVal, err - case "boolean": - retVal, err := getBool(val) - return retVal, err - case "float": - retVal, err := getFloat64(val) - return retVal, err - case "datetime": - retVal, err := getUnixTimestamp(val) - return retVal, err - case "string", "text": - retVal, err := getString(val) - return retVal, err + case model.BigIntDataType, model.IntDataType: + return getInt64(val) + case model.BooleanDataType: + return getBool(val) + case model.FloatDataType: + return getFloat64(val) + case model.DateTimeDataType: + return getUnixTimestamp(val) + case model.StringDataType, model.TextDataType: + return getString(val) } return nil, fmt.Errorf("unsupported type for parquet: %s", colType) } func getInt64(val interface{}) (int64, error) { if intVal, ok := val.(int); !ok { - return 0, fmt.Errorf("failed to convert %v to int64", val) + return 0, errors.New("failed to convert to int64") } else { return int64(intVal), nil } @@ -92,7 +88,7 @@ func getInt64(val interface{}) (int64, error) { func getBool(val interface{}) (bool, error) { if boolVal, ok := val.(bool); !ok { - return false, fmt.Errorf("failed to convert %v to bool", val) + return false, errors.New("failed to convert to bool") } else { return boolVal, nil } @@ -100,7 +96,7 @@ func getBool(val interface{}) (bool, error) { func getFloat64(val interface{}) (float64, error) { if float64Val, ok := val.(float64); !ok { - return 0, fmt.Errorf("failed to convert %v to float64", val) + return 0, errors.New("failed to convert to float64") } else { return float64Val, nil } @@ -109,7 +105,7 @@ func getFloat64(val interface{}) (float64, error) { func getUnixTimestamp(val interface{}) (int64, error) { tsString, ok := val.(string) if !ok { - return 0, fmt.Errorf("%v is not a valid timestamp string", val) + return 0, errors.New("not a valid timestamp string") } parsedTS, err := time.Parse(time.RFC3339, tsString) @@ -122,7 +118,7 @@ func getUnixTimestamp(val interface{}) (int64, error) { func getString(val interface{}) (string, error) { if stringVal, ok := val.(string); !ok { - return "", fmt.Errorf("failed to convert %v to string", val) + return "", errors.New("failed to convert to string") } else { return stringVal, nil } diff --git a/warehouse/integrations/azure-synapse/azure-synapse.go b/warehouse/integrations/azure-synapse/azure-synapse.go index eb8d44f181..6816a7bafa 100644 --- a/warehouse/integrations/azure-synapse/azure-synapse.go +++ b/warehouse/integrations/azure-synapse/azure-synapse.go @@ -413,7 +413,6 @@ func (as *AzureSynapse) loadDataIntoStagingTable( log.Warnw("mismatch in datatype", logfield.ColumnType, valueType, logfield.ColumnName, sortedColumnKeys[index], - logfield.ColumnValue, value, logfield.Error, err, ) finalColumnValues = append(finalColumnValues, nil) diff --git a/warehouse/integrations/mssql/mssql.go b/warehouse/integrations/mssql/mssql.go index b1ecb4d20a..4357b585b8 100644 --- a/warehouse/integrations/mssql/mssql.go +++ b/warehouse/integrations/mssql/mssql.go @@ -449,7 +449,6 @@ func (ms *MSSQL) loadDataIntoStagingTable( log.Warnw("mismatch in datatype", logfield.ColumnType, valueType, logfield.ColumnName, sortedColumnKeys[index], - logfield.ColumnValue, value, logfield.Error, err, ) finalColumnValues = append(finalColumnValues, nil) diff --git a/warehouse/logfield/logfield.go b/warehouse/logfield/logfield.go index 3bc3865db7..9c586dc579 100644 --- a/warehouse/logfield/logfield.go +++ b/warehouse/logfield/logfield.go @@ -20,7 +20,6 @@ const ( TableName = "tableName" ColumnName = "columnName" ColumnType = "columnType" - ColumnValue = "columnValue" Priority = "priority" Retried = "retried" Attempt = "attempt" diff --git a/warehouse/slave/worker.go b/warehouse/slave/worker.go index 83725cd276..8be3aca228 100644 --- a/warehouse/slave/worker.go +++ b/warehouse/slave/worker.go @@ -12,6 +12,8 @@ import ( jsoniter "github.com/json-iterator/go" + obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" + "github.com/rudderlabs/rudder-server/warehouse/bcm" "github.com/rudderlabs/rudder-server/warehouse/constraints" "github.com/rudderlabs/rudder-server/warehouse/utils/types" @@ -265,7 +267,10 @@ func (w *worker) processStagingFile(ctx context.Context, job payload) ([]uploadR ) if err := json.Unmarshal(lineBytes, &batchRouterEvent); err != nil { - jr.logger.Errorf("Failed to unmarshal JSON line to batchrouter event: %+v", batchRouterEvent) + jr.logger.Warnn("Failed to unmarshal line from staging file to BatchRouterEvent", + logger.NewIntField("stagingFileID", job.StagingFileID), + obskit.Error(err), + ) continue }