Skip to content

Commit

Permalink
fix enum type op in one transaction (#17135)
Browse files Browse the repository at this point in the history
fix enum type op in one transaction

Approved by: @LeftHandCold, @ouyuanning, @m-schen, @XuPeng-SH, @daviszhen, @qingxinhome, @fengttt
  • Loading branch information
YANGGMM authored Jun 26, 2024
1 parent dce34b4 commit f6ad0b1
Show file tree
Hide file tree
Showing 22 changed files with 171 additions and 29 deletions.
24 changes: 14 additions & 10 deletions pkg/container/types/enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func ParseIntToEnum(input int64) (Enum, error) {
}

// ParseEnum returns the item index for the given item name or value.
func ParseEnum(enumStr string, name string) (uint16, error) {
func ParseEnum(enumStr string, name string) (Enum, error) {
if len(enumStr) == 0 {
return 0, moerr.NewInternalErrorNoCtx("convert to MySQL enum failed: enum define is empty %v", enumStr)
}
Expand All @@ -48,46 +48,50 @@ func ParseEnum(enumStr string, name string) (uint16, error) {
}

// ParseEnumName returns the 1-based index of the item whose name matches name, ignoring case.
func ParseEnumName(enumStr string, name string) (uint16, error) {
func ParseEnumName(enumStr string, name string) (Enum, error) {
if len(enumStr) == 0 {
return 0, moerr.NewInternalErrorNoCtx("convert to MySQL enum failed: enum define is empty %v", enumStr)
}
elems := strings.Split(enumStr, ",")
return parseEnumName(elems, name)
}
func parseEnumName(elems []string, name string) (uint16, error) {
func parseEnumName(elems []string, name string) (Enum, error) {
for i, n := range elems {
if strings.EqualFold(n, name) {
return uint16(i) + 1, nil
return Enum(uint16(i) + 1), nil
}
}
return 0, moerr.NewInternalErrorNoCtx("convert to MySQL enum failed: item %s is not in enum %v", name, elems)
return Enum(1), moerr.NewInternalErrorNoCtx("convert to MySQL enum failed: item %s is not in enum %v", name, elems)
}

// ParseEnumValue returns number as the item index after checking that it lies within the enum's range [1, item count].
func ParseEnumValue(enumStr string, number uint16) (uint16, error) {
func ParseEnumValue(enumStr string, number uint16) (Enum, error) {
if len(enumStr) == 0 {
return 0, moerr.NewInternalErrorNoCtx("convert to MySQL enum failed: enum define is empty %v", enumStr)
}
elems := strings.Split(enumStr, ",")
return parseEnumValue(elems, number)
}
func parseEnumValue(elems []string, number uint16) (uint16, error) {
func parseEnumValue(elems []string, number uint16) (Enum, error) {
if number == 0 || number > uint16(len(elems)) {
return 0, moerr.NewInternalErrorNoCtx("convert to MySQL enum failed: number %d overflow enum boundary [1, %d]", number, len(elems))
}

return number, nil
return Enum(number), nil
}

// ParseEnumIndex returns the item value stored at the given 1-based index.
func ParseEnumIndex(enumStr string, index uint16) (string, error) {
func ParseEnumIndex(enumStr string, index Enum) (string, error) {
if len(enumStr) == 0 {
return "", moerr.NewInternalErrorNoCtx("parse MySQL enum failed: enum type length err %d", len(enumStr))
}
elems := strings.Split(enumStr, ",")
if index == 0 || index > uint16(len(elems)) {
if index == 0 || index > Enum(len(elems)) {
return "", moerr.NewInternalErrorNoCtx("parse MySQL enum failed: index %d overflow enum boundary [1, %d]", index, len(elems))
}
return elems[index-1], nil
}

// String renders the enum's numeric value as a base-10 decimal string.
func (e Enum) String() string {
	n := int64(e)
	return strconv.FormatInt(n, 10)
}
4 changes: 2 additions & 2 deletions pkg/container/types/enum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestParseEnum(t *testing.T) {
name string
enums []string
value string
want uint16
want Enum
}{
{
name: "test01",
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestParseEnumIndex(t *testing.T) {
cases := []struct {
name string
enums []string
index uint16
index Enum
want string
}{
{
Expand Down
12 changes: 10 additions & 2 deletions pkg/frontend/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,8 @@ func constructByte(ctx context.Context, obj FeSession, bat *batch.Batch, index i
val := vector.GetFixedAt[types.Blockid](vec, i)
writeByte = appendBytes(writeByte, []byte(val.String()), symbol[j], closeby, flag[j])
case types.T_enum:
val := vector.GetFixedAt[types.Blockid](vec, i)
writeByte = appendBytes(writeByte, []byte(val.String()), symbol[j], closeby, flag[j])
val := vector.GetFixedAt[types.Enum](vec, i).String()
writeByte = appendBytes(writeByte, []byte(val), symbol[j], closeby, flag[j])
default:
ses.Error(ctx,
"Failed to construct byte due to unsupported type",
Expand Down Expand Up @@ -725,6 +725,14 @@ func exportDataToCSVFile(oq *ExportConfig) error {
if err = formatOutputString(oq, []byte(value), symbol[i], closeby, flag[i]); err != nil {
return err
}
case defines.MYSQL_TYPE_ENUM:
value, err := oq.mrs.GetString(oq.ctx, 0, i)
if err != nil {
return err
}
if err = formatOutputString(oq, []byte(value), symbol[i], closeby, flag[i]); err != nil {
return err
}
default:
return moerr.NewInternalError(oq.ctx, "unsupported column type %d ", mysqlColumn.ColumnType())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func extractRowFromVector(ctx context.Context, ses FeSession, vec *vector.Vector
case types.T_TS:
row[i] = vector.GetFixedAt[types.TS](vec, rowIndex)
case types.T_enum:
row[i] = copyBytes(vec.GetBytesAt(rowIndex), true)
row[i] = vector.GetFixedAt[types.Enum](vec, rowIndex)
default:
ses.Error(ctx,
"Failed to extract row from vector, unsupported type",
Expand Down
13 changes: 13 additions & 0 deletions pkg/frontend/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,19 @@ func convertRowsIntoBatch(pool *mpool.MPool, cols []Column, rows [][]any) (*batc
if err != nil {
return nil, nil, err
}
case types.T_enum:
vData := make([]types.Enum, cnt)
for rowIdx, row := range rows {
if row[colIdx] == nil {
nsp.Add(uint64(rowIdx))
continue
}
vData[rowIdx] = row[colIdx].(types.Enum)
}
err := vector.AppendFixedList[types.Enum](bat.Vecs[colIdx], vData, nil, pool)
if err != nil {
return nil, nil, err
}
default:
return nil, nil, moerr.NewInternalErrorNoCtx("unsupported type %d", typ.Oid)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/frontend/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,8 @@ func Test_convertRowsIntoBatch(t *testing.T) {
row[j] = types.Datetime(0)
case defines.MYSQL_TYPE_TIMESTAMP:
row[j] = types.Timestamp(0)
case defines.MYSQL_TYPE_ENUM:
row[j] = types.Enum(1)
default:
assert.True(t, false)
}
Expand Down Expand Up @@ -1282,6 +1284,9 @@ func Test_convertRowsIntoBatch(t *testing.T) {
case types.T_timestamp:
assert.Equal(t, mrs.Data[i][j].(types.Timestamp).String2(time.UTC, 0), row[j])
continue
case types.T_enum:
assert.Equal(t, mrs.Data[i][j].(types.Enum), row[j])
continue
}
assert.Equal(t, mrs.Data[i][j], row[j])
}
Expand Down
31 changes: 28 additions & 3 deletions pkg/objectio/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,10 @@ func NewVector(n int, typ types.Type, m *mpool.MPool, random bool, Values interf
}
return NewBlockidVector(n, typ, m, random, nil)
case types.T_enum:
if vs, ok := Values.([]uint16); ok {
return NewUInt16Vector(n, typ, m, random, vs)
if vs, ok := Values.([]types.Enum); ok {
return NewEnumVector(n, typ, m, random, vs)
}
return NewUInt16Vector(n, typ, m, random, nil)
return NewEnumVector(n, typ, m, random, nil)
default:
panic(moerr.NewInternalErrorNoCtx("unsupport vector's type '%v", typ))
}
Expand Down Expand Up @@ -884,6 +884,31 @@ func NewTimeVector(n int, typ types.Type, m *mpool.MPool, random bool, vs []stri
return vec
}

// NewEnumVector builds a vector of type typ filled with enum values.
//
// When vs is non-nil its elements are appended in order and n and random are
// ignored. Otherwise n values are generated: row i gets the value i truncated
// to 16 bits, or, when random is true, rand.Int() truncated to 16 bits.
// Generated values are not checked against any enum definition.
//
// If any append fails, the partially built vector is freed against m and nil
// is returned.
func NewEnumVector(n int, typ types.Type, m *mpool.MPool, random bool, vs []types.Enum) *vector.Vector {
	vec := vector.NewVec(typ)
	if vs != nil {
		for i := range vs {
			if err := vector.AppendFixed(vec, vs[i], false, m); err != nil {
				vec.Free(m)
				return nil
			}
		}
		return vec
	}

	for i := 0; i < n; i++ {
		// Append as types.Enum rather than raw uint16 so generated rows use the
		// same element type as the caller-supplied path above. The uint16
		// conversion keeps the original truncation behaviour.
		v := types.Enum(uint16(i))
		if random {
			v = types.Enum(uint16(rand.Int()))
		}
		if err := vector.AppendFixed(vec, v, false, m); err != nil {
			vec.Free(m)
			return nil
		}
	}
	return vec
}

func NewDatetimeVector(n int, typ types.Type, m *mpool.MPool, random bool, vs []string) *vector.Vector {
vec := vector.NewVec(typ)
if vs != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/colexec/aggexec/argument.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ func newArgumentOfMultiAgg1[ret types.FixedSizeTExceptStrType](paramType types.T
return &mArg1Fixed[ret, types.Datetime]{}
case types.T_time:
return &mArg1Fixed[ret, types.Time]{}
case types.T_enum:
return &mArg1Fixed[ret, types.Enum]{}
case types.T_timestamp:
return &mArg1Fixed[ret, types.Timestamp]{}
}
Expand Down Expand Up @@ -138,6 +140,8 @@ func newArgumentOfMultiAgg2(paramType types.Type) mArg2 {
return &mArg2Fixed[types.Datetime]{}
case types.T_time:
return &mArg2Fixed[types.Time]{}
case types.T_enum:
return &mArg2Fixed[types.Enum]{}
case types.T_timestamp:
return &mArg2Fixed[types.Timestamp]{}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/colexec/aggexec/fromBytesRetFixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package aggexec

import (
"fmt"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
Expand Down Expand Up @@ -169,6 +170,10 @@ func newSingleAggFuncExec3NewVersion(
e := &singleAggFuncExecNew3[types.Uuid]{}
e.init(mg, info, impl)
return e
case types.T_enum:
e := &singleAggFuncExecNew3[types.Enum]{}
e.init(mg, info, impl)
return e
}
panic(fmt.Sprintf("unsupported result type %s for singleAggFuncExec3", info.retType))
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/colexec/aggexec/fromFixedRetBytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package aggexec

import (
"fmt"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
Expand Down Expand Up @@ -187,6 +188,11 @@ func newSingleAggFuncExec2NewVersion(
e := &singleAggFuncExecNew2[types.Uuid]{}
e.init(mg, info, impl)
return e

case types.T_enum:
e := &singleAggFuncExecNew2[types.Enum]{}
e.init(mg, info, impl)
return e
}
panic(fmt.Sprintf("unsupported parameter type %s for singleAggFuncExec2", info.argType))
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/colexec/aggexec/fromFixedRetFixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package aggexec

import (
"fmt"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
Expand Down Expand Up @@ -122,6 +123,8 @@ func newSingleAggFuncExec1NewVersion(
return newSingleAggFuncExec1NewVersionWithKnownResultType[types.Blockid](mg, info, impl)
case types.T_uuid:
return newSingleAggFuncExec1NewVersionWithKnownResultType[types.Uuid](mg, info, impl)
case types.T_enum:
return newSingleAggFuncExec1NewVersionWithKnownResultType[types.Enum](mg, info, impl)
}
panic(fmt.Sprintf("unsupported result type %s for single column agg executor1", info.retType))
}
Expand Down Expand Up @@ -238,6 +241,11 @@ func newSingleAggFuncExec1NewVersionWithKnownResultType[to types.FixedSizeTExcep
e := &singleAggFuncExecNew1[types.Uuid, to]{}
e.init(mg, info, impl)
return e

case types.T_enum:
e := &singleAggFuncExecNew1[types.Enum, to]{}
e.init(mg, info, impl)
return e
}
panic(fmt.Sprintf("unexpected parameter to Init a singleAggFuncExec1NewVersion, aggInfo: %s", info))
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/colexec/aggexec/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package aggexec

import (
"fmt"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
Expand Down Expand Up @@ -94,6 +95,10 @@ func newMultiAggFuncExecRetFixed(
e := &multiAggFuncExec1[types.Timestamp]{}
e.init(mg, info, impl)
return e
case types.T_enum:
e := &multiAggFuncExec1[types.Enum]{}
e.init(mg, info, impl)
return e
}

panic(fmt.Sprintf("unexpected parameter to Init a multiAggFuncExec, aggInfo: %s", info))
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colexec/evalExpression.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,9 @@ func GenerateConstListExpressionExecutor(proc *process.Process, exprs []*plan.Ex
defaultVal := val.Defaultval
veccol := vector.MustFixedCol[bool](vec)
veccol[i] = defaultVal
case *plan.Literal_EnumVal:
veccol := vector.MustFixedCol[types.Enum](vec)
veccol[i] = types.Enum(val.EnumVal)
default:
return nil, moerr.NewNYI(proc.Ctx, fmt.Sprintf("const expression %v", t.GetValue()))
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colexec/top/top.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ func (arg *Argument) getTopValue() ([]byte, bool) {
case types.T_decimal128:
v := vector.GetFixedAt[types.Decimal128](vec, x)
return types.EncodeDecimal128(&v), true
case types.T_enum:
v := vector.GetFixedAt[types.Enum](vec, x)
return types.EncodeEnum(&v), true
}
return nil, false
}
2 changes: 1 addition & 1 deletion pkg/sql/plan/build_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func genAsSelectCols(ctx CompilerContext, stmt *tree.Select) ([]*ColDef, error)
// enum
if e.F.Func.ObjName == moEnumCastIndexToValueFun {
// cast_index_to_value('apple,banana,orange', cast(col_name as T_uint16))
colRef := e.F.Args[1].Expr.(*plan.Expr_F).F.Args[0].Expr.(*plan.Expr_Col).Col
colRef := e.F.Args[1].Expr.(*plan.Expr_Col).Col
tblName, colName := getTblAndColName(colRef.RelPos, colRef.ColPos)
if binding, ok := bindCtx.bindingByTable[tblName]; ok {
typ = binding.types[binding.colIdByName[colName]]
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/plan/function/agg/any_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ func RegisterAnyValue2(id int64) {
nil, nil, nil,
aggAnyValueFill[types.Rowid], aggAnyValueFills[types.Rowid], aggAnyValueMerge[types.Rowid], nil)

aggexec.RegisterAggFromFixedRetFixed(
aggexec.MakeSingleColumnAggInformation(id, types.T_enum.ToType(), AnyValueReturnType, true),
nil, nil, nil,
aggAnyValueFill[types.Enum], aggAnyValueFills[types.Enum], aggAnyValueMerge[types.Enum], nil)

varLenList := []types.T{types.T_varchar, types.T_char, types.T_blob, types.T_text, types.T_binary, types.T_varbinary}
for _, t := range varLenList {
aggexec.RegisterAggFromBytesRetBytes(
Expand Down
10 changes: 9 additions & 1 deletion pkg/sql/plan/function/agg/max.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package agg

import (
"bytes"
"math"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
"math"
)

func RegisterMax2(id int64) {
Expand Down Expand Up @@ -134,6 +135,13 @@ func RegisterMax2(id int64) {
aggMaxInitResult[bool],
aggMaxOfBoolFill, aggMaxOfBoolFills, aggMaxOfBoolMerge, nil)

aggexec.RegisterAggFromFixedRetFixed(
aggexec.MakeSingleColumnAggInformation(id, types.T_enum.ToType(), MaxReturnType, true),
nil,
nil,
aggMaxInitResult[types.Enum],
aggMaxFill[types.Enum], aggMaxFills[types.Enum], aggMaxMerge[types.Enum], nil)

aggexec.RegisterAggFromFixedRetFixed(
aggexec.MakeSingleColumnAggInformation(id, types.T_uuid.ToType(), MaxReturnType, true),
nil,
Expand Down
Loading

0 comments on commit f6ad0b1

Please sign in to comment.