Skip to content

Commit

Permalink
Character set recognition and manipulation
Browse files Browse the repository at this point in the history
- Identifying textual characters sets; converting into specific type when applying dml events
- Refactored `ColumnsList`: introducing `Column` type
- Refactored `unsigned` handling, as part of `Column`
- `Column` type supports `convertArg()`: converting value of argument according to column data type
- DB URI attempts `utf8mb4,utf8,latin1` charsets in that order (first one to be recognized wins)
- Local tests filter by pattern
- Local tests append table schema on failure
- Local tests do not have postpone flag file
- Added character set local tests: `utf8`, `utf8mb4`, `latin1`
  • Loading branch information
Shlomi Noach committed Sep 7, 2016
1 parent f646021 commit 791d963
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 97 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
#

RELEASE_VERSION="1.0.17"
RELEASE_VERSION="1.0.18"

function build {
osname=$1
Expand Down
12 changes: 6 additions & 6 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (this *Applier) ExecuteThrottleQuery() (int64, error) {
// ReadMigrationMinValues returns the minimum values to be iterated on rowcopy
func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error {
log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)
query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns.Names)
query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns.Names())
if err != nil {
return err
}
Expand All @@ -322,7 +322,7 @@ func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error {
// ReadMigrationMaxValues returns the maximum values to be iterated on rowcopy
func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error {
log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)
query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns.Names)
query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns.Names())
if err != nil {
return err
}
Expand Down Expand Up @@ -363,7 +363,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
query, explodedArgs, err := sql.BuildUniqueKeyRangeEndPreparedQuery(
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
this.migrationContext.UniqueKey.Columns.Names,
this.migrationContext.UniqueKey.Columns.Names(),
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
atomic.LoadInt64(&this.migrationContext.ChunkSize),
Expand Down Expand Up @@ -402,10 +402,10 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
this.migrationContext.GetGhostTableName(),
this.migrationContext.SharedColumns.Names,
this.migrationContext.MappedSharedColumns.Names,
this.migrationContext.SharedColumns.Names(),
this.migrationContext.MappedSharedColumns.Names(),
this.migrationContext.UniqueKey.Name,
this.migrationContext.UniqueKey.Columns.Names,
this.migrationContext.UniqueKey.Columns.Names(),
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
this.migrationContext.GetIteration() == 0,
Expand Down
42 changes: 25 additions & 17 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ func (this *Inspector) InspectOriginalAndGhostTables() (err error) {
// This additional step looks at which columns are unsigned. We could have merged this within
// the `getTableColumns()` function, but it's a later patch and introduces some complexity; I feel
// comfortable in doing this as a separate step.
this.applyUnsignedColumns(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns)
this.applyUnsignedColumns(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.GhostTableColumns, this.migrationContext.MappedSharedColumns)
this.applyColumnTypes(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns)
this.applyColumnTypes(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.GhostTableColumns, this.migrationContext.MappedSharedColumns)

return nil
}
Expand Down Expand Up @@ -477,23 +477,31 @@ func (this *Inspector) getTableColumns(databaseName, tableName string) (*sql.Col
return sql.NewColumnList(columnNames), nil
}

// applyUnsignedColumns
func (this *Inspector) applyUnsignedColumns(databaseName, tableName string, columnsLists ...*sql.ColumnList) error {
query := fmt.Sprintf(`
show columns from %s.%s
`,
sql.EscapeName(databaseName),
sql.EscapeName(tableName),
)
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
columnName := rowMap.GetString("Field")
if strings.Contains(rowMap.GetString("Type"), "unsigned") {
// applyColumnTypes
func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsLists ...*sql.ColumnList) error {
query := `
select
*
from
information_schema.columns
where
table_schema=?
and table_name=?
`
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
columnName := m.GetString("COLUMN_NAME")
if strings.Contains(m.GetString("COLUMN_TYPE"), "unsigned") {
for _, columnsList := range columnsLists {
columnsList.SetUnsigned(columnName)
}
}
if charset := m.GetString("CHARACTER_SET_NAME"); charset != "" {
for _, columnsList := range columnsLists {
columnsList.SetCharset(columnName, charset)
}
}
return nil
})
}, databaseName, tableName)
return err
}

Expand Down Expand Up @@ -583,7 +591,7 @@ func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [
// the ALTER is on the name itself...
for _, originalUniqueKey := range originalUniqueKeys {
for _, ghostUniqueKey := range ghostUniqueKeys {
if originalUniqueKey.Columns.Equals(&ghostUniqueKey.Columns) {
if originalUniqueKey.Columns.EqualsByNames(&ghostUniqueKey.Columns) {
uniqueKeys = append(uniqueKeys, originalUniqueKey)
}
}
Expand All @@ -594,11 +602,11 @@ func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [
// getSharedColumns returns the intersection of two lists of columns in same order as the first list
func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.ColumnList, columnRenameMap map[string]string) (*sql.ColumnList, *sql.ColumnList) {
columnsInGhost := make(map[string]bool)
for _, ghostColumn := range ghostColumns.Names {
for _, ghostColumn := range ghostColumns.Names() {
columnsInGhost[ghostColumn] = true
}
sharedColumnNames := []string{}
for _, originalColumn := range originalColumns.Names {
for _, originalColumn := range originalColumns.Names() {
if columnsInGhost[originalColumn] || columnsInGhost[columnRenameMap[originalColumn]] {
sharedColumnNames = append(sharedColumnNames, originalColumn)
}
Expand Down
8 changes: 4 additions & 4 deletions go/mysql/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func (this *ConnectionConfig) Equals(other *ConnectionConfig) bool {
}

func (this *ConnectionConfig) GetDBUri(databaseName string) string {
var ip = net.ParseIP(this.Key.Hostname)
hostname := this.Key.Hostname
var ip = net.ParseIP(hostname)
if (ip != nil) && (ip.To4() == nil) {
// Wrap IPv6 literals in square brackets
return fmt.Sprintf("%s:%s@tcp([%s]:%d)/%s", this.User, this.Password, this.Key.Hostname, this.Key.Port, databaseName)
} else {
return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", this.User, this.Password, this.Key.Hostname, this.Key.Port, databaseName)
hostname = fmt.Sprintf("[%s]", hostname)
}
return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4,utf8,latin1", this.User, this.Password, hostname, this.Key.Port, databaseName)
}
57 changes: 17 additions & 40 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,6 @@ func EscapeName(name string) string {
return fmt.Sprintf("`%s`", name)
}

func fixArgType(arg interface{}, isUnsigned bool) interface{} {
if !isUnsigned {
return arg
}
// unsigned
if i, ok := arg.(int8); ok {
return uint8(i)
}
if i, ok := arg.(int16); ok {
return uint16(i)
}
if i, ok := arg.(int32); ok {
return uint32(i)
}
if i, ok := arg.(int64); ok {
return strconv.FormatUint(uint64(i), 10)
}
if i, ok := arg.(int); ok {
return uint(i)
}
return arg
}

func buildPreparedValues(length int) []string {
values := make([]string, length, length)
for i := 0; i < length; i++ {
Expand Down Expand Up @@ -330,14 +307,14 @@ func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKey
if uniqueKeyColumns.Len() == 0 {
return result, uniqueKeyArgs, fmt.Errorf("No unique key columns found in BuildDMLDeleteQuery")
}
for _, column := range uniqueKeyColumns.Names {
tableOrdinal := tableColumns.Ordinals[column]
arg := fixArgType(args[tableOrdinal], uniqueKeyColumns.IsUnsigned(column))
for _, column := range uniqueKeyColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(args[tableOrdinal])
uniqueKeyArgs = append(uniqueKeyArgs, arg)
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)
equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names)
equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names())
if err != nil {
return result, uniqueKeyArgs, err
}
Expand Down Expand Up @@ -367,13 +344,13 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)

for _, column := range mappedSharedColumns.Names {
tableOrdinal := tableColumns.Ordinals[column]
arg := fixArgType(args[tableOrdinal], mappedSharedColumns.IsUnsigned(column))
for _, column := range mappedSharedColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(args[tableOrdinal])
sharedArgs = append(sharedArgs, arg)
}

mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names)
mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names())
for i := range mappedSharedColumnNames {
mappedSharedColumnNames[i] = EscapeName(mappedSharedColumnNames[i])
}
Expand Down Expand Up @@ -415,26 +392,26 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)

for i, column := range sharedColumns.Names {
mappedColumn := mappedSharedColumns.Names[i]
tableOrdinal := tableColumns.Ordinals[column]
arg := fixArgType(valueArgs[tableOrdinal], mappedSharedColumns.IsUnsigned(mappedColumn))
for i, column := range sharedColumns.Columns() {
mappedColumn := mappedSharedColumns.Columns()[i]
tableOrdinal := tableColumns.Ordinals[column.Name]
arg := mappedColumn.convertArg(valueArgs[tableOrdinal])
sharedArgs = append(sharedArgs, arg)
}

for _, column := range uniqueKeyColumns.Names {
tableOrdinal := tableColumns.Ordinals[column]
arg := fixArgType(whereArgs[tableOrdinal], uniqueKeyColumns.IsUnsigned(column))
for _, column := range uniqueKeyColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(whereArgs[tableOrdinal])
uniqueKeyArgs = append(uniqueKeyArgs, arg)
}

mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names)
mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names())
for i := range mappedSharedColumnNames {
mappedSharedColumnNames[i] = EscapeName(mappedSharedColumnNames[i])
}
setClause, err := BuildSetPreparedClause(mappedSharedColumnNames)

equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names)
equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names())
result = fmt.Sprintf(`
update /* gh-ost %s.%s */
%s.%s
Expand Down
Loading

0 comments on commit 791d963

Please sign in to comment.