Skip to content

Commit

Permalink
Merge branch 'master' into chore.memory-stat-for-badger-event-sampler
Browse files Browse the repository at this point in the history
  • Loading branch information
vamsikrishnakandi authored Jan 10, 2025
2 parents b3ad4ae + f182738 commit 170c268
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 24 deletions.
36 changes: 16 additions & 20 deletions warehouse/encoding/parquetloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/xitongsys/parquet-go/types"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

Expand Down Expand Up @@ -63,44 +64,39 @@ func (loader *parquetLoader) Write() error {

func parquetValue(val interface{}, colType string) (interface{}, error) {
switch colType {
case "bigint", "int":
retVal, err := getInt64(val)
return retVal, err
case "boolean":
retVal, err := getBool(val)
return retVal, err
case "float":
retVal, err := getFloat64(val)
return retVal, err
case "datetime":
retVal, err := getUnixTimestamp(val)
return retVal, err
case "string", "text":
retVal, err := getString(val)
return retVal, err
case model.BigIntDataType, model.IntDataType:
return getInt64(val)
case model.BooleanDataType:
return getBool(val)
case model.FloatDataType:
return getFloat64(val)
case model.DateTimeDataType:
return getUnixTimestamp(val)
case model.StringDataType, model.TextDataType:
return getString(val)
}
return nil, fmt.Errorf("unsupported type for parquet: %s", colType)
}

func getInt64(val interface{}) (int64, error) {
if intVal, ok := val.(int); !ok {
return 0, fmt.Errorf("failed to convert %v to int64", val)
return 0, errors.New("failed to convert to int64")
} else {
return int64(intVal), nil
}
}

func getBool(val interface{}) (bool, error) {
if boolVal, ok := val.(bool); !ok {
return false, fmt.Errorf("failed to convert %v to bool", val)
return false, errors.New("failed to convert to bool")
} else {
return boolVal, nil
}
}

func getFloat64(val interface{}) (float64, error) {
if float64Val, ok := val.(float64); !ok {
return 0, fmt.Errorf("failed to convert %v to float64", val)
return 0, errors.New("failed to convert to float64")
} else {
return float64Val, nil
}
Expand All @@ -109,7 +105,7 @@ func getFloat64(val interface{}) (float64, error) {
func getUnixTimestamp(val interface{}) (int64, error) {
tsString, ok := val.(string)
if !ok {
return 0, fmt.Errorf("%v is not a valid timestamp string", val)
return 0, errors.New("not a valid timestamp string")
}

parsedTS, err := time.Parse(time.RFC3339, tsString)
Expand All @@ -122,7 +118,7 @@ func getUnixTimestamp(val interface{}) (int64, error) {

func getString(val interface{}) (string, error) {
if stringVal, ok := val.(string); !ok {
return "", fmt.Errorf("failed to convert %v to string", val)
return "", errors.New("failed to convert to string")
} else {
return stringVal, nil
}
Expand Down
1 change: 0 additions & 1 deletion warehouse/integrations/azure-synapse/azure-synapse.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ func (as *AzureSynapse) loadDataIntoStagingTable(
log.Warnw("mismatch in datatype",
logfield.ColumnType, valueType,
logfield.ColumnName, sortedColumnKeys[index],
logfield.ColumnValue, value,
logfield.Error, err,
)
finalColumnValues = append(finalColumnValues, nil)
Expand Down
1 change: 0 additions & 1 deletion warehouse/integrations/mssql/mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,6 @@ func (ms *MSSQL) loadDataIntoStagingTable(
log.Warnw("mismatch in datatype",
logfield.ColumnType, valueType,
logfield.ColumnName, sortedColumnKeys[index],
logfield.ColumnValue, value,
logfield.Error, err,
)
finalColumnValues = append(finalColumnValues, nil)
Expand Down
1 change: 0 additions & 1 deletion warehouse/logfield/logfield.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ const (
TableName = "tableName"
ColumnName = "columnName"
ColumnType = "columnType"
ColumnValue = "columnValue"
Priority = "priority"
Retried = "retried"
Attempt = "attempt"
Expand Down
7 changes: 6 additions & 1 deletion warehouse/slave/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

jsoniter "github.com/json-iterator/go"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

"github.com/rudderlabs/rudder-server/warehouse/bcm"
"github.com/rudderlabs/rudder-server/warehouse/constraints"
"github.com/rudderlabs/rudder-server/warehouse/utils/types"
Expand Down Expand Up @@ -265,7 +267,10 @@ func (w *worker) processStagingFile(ctx context.Context, job payload) ([]uploadR
)

if err := json.Unmarshal(lineBytes, &batchRouterEvent); err != nil {
jr.logger.Errorf("Failed to unmarshal JSON line to batchrouter event: %+v", batchRouterEvent)
jr.logger.Warnn("Failed to unmarshal line from staging file to BatchRouterEvent",
logger.NewIntField("stagingFileID", job.StagingFileID),
obskit.Error(err),
)
continue
}

Expand Down

0 comments on commit 170c268

Please sign in to comment.