Skip to content

Commit

Permalink
Merge branch '2.0-dev' into test2
Browse files Browse the repository at this point in the history
  • Loading branch information
badboynt1 authored Jan 10, 2025
2 parents 7359659 + d364247 commit b31ec61
Show file tree
Hide file tree
Showing 157 changed files with 6,295 additions and 1,958 deletions.
2 changes: 1 addition & 1 deletion pkg/bootstrap/custom_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *service) UpgradeOneTenant(ctx context.Context, tenantID int32) error {
return err
}
if latestVersion.Version != currentCN.Version {
panic("BUG: current cn's version(" +
s.logger.Fatal("BUG: current cn's version(" +
currentCN.Version +
") must equal cluster latest version(" +
latestVersion.Version +
Expand Down
10 changes: 6 additions & 4 deletions pkg/bootstrap/service_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ func (s *service) doCheckUpgrade(ctx context.Context) error {
// cluster is upgrading to v1, only v1's cn can start up.
if !v.IsReady() {
if v.Version != final.Version {
panic(fmt.Sprintf("cannot upgrade to version %s, because version %s is in upgrading",
s.logger.Fatal(fmt.Sprintf("cannot upgrade to version %s, because version %s is in upgrading",
final.Version,
v.Version))
} else if v.VersionOffset != final.VersionOffset {
panic(fmt.Sprintf("cannot upgrade to version %s with versionOffset[%d], because version %s with versionOffset[%d] is in upgrading",
s.logger.Fatal(fmt.Sprintf("cannot upgrade to version %s with versionOffset[%d], because version %s with versionOffset[%d] is in upgrading",
final.Version,
final.VersionOffset,
v.Version,
Expand All @@ -183,7 +183,7 @@ func (s *service) doCheckUpgrade(ctx context.Context) error {

// cluster is running at v1, cannot startup a old version to join cluster.
if v.IsReady() && versions.Compare(final.Version, v.Version) < 0 {
panic(fmt.Sprintf("cannot startup a old version %s to join cluster, current version is %s",
s.logger.Fatal(fmt.Sprintf("cannot startup a old version %s to join cluster, current version is %s",
final.Version,
v.Version))
}
Expand All @@ -193,6 +193,8 @@ func (s *service) doCheckUpgrade(ctx context.Context) error {
// 2: add upgrades from latest version to final version
checker := func() (bool, error) {
if v.Version == final.Version && v.VersionOffset >= final.VersionOffset {
// if the schema version has reached finalVersion, mark finalVersion Completed and return
s.upgrade.finalVersionCompleted.Store(true)
return true, nil
}

Expand Down Expand Up @@ -396,7 +398,7 @@ func (s *service) performUpgrade(
zap.String("final", final.Version))
return false, nil
default:
panic(fmt.Sprintf("BUG: invalid state %d", state))
s.logger.Fatal(fmt.Sprintf("BUG: invalid state %d", state))
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/bootstrap/service_upgrade_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *service) MaybeUpgradeTenant(
return err
}
if latestVersion.Version != currentCN.Version {
panic("BUG: current cn's version(" +
s.logger.Fatal("BUG: current cn's version(" +
currentCN.Version +
") must equal cluster latest version(" +
latestVersion.Version +
Expand Down
5 changes: 3 additions & 2 deletions pkg/bootstrap/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (s *service) getFinalVersionHandle() VersionHandle {
// Get the original version of the upgrade framework
func (s *service) getFounderVersionHandle() VersionHandle {
if len(s.handles) == 0 {
panic("Waring: no upgrade version handles available, please check the code")
s.logger.Fatal("Waring: no upgrade version handles available, please check the code")
}
return s.handles[0]
}
Expand All @@ -66,5 +66,6 @@ func (s *service) getVersionHandle(version string) VersionHandle {
return h
}
}
panic("missing upgrade handle for version: " + version)
s.logger.Fatal("missing upgrade handle for version: " + version)
return nil
}
4 changes: 2 additions & 2 deletions pkg/bootstrap/versions/upgrade_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func CheckTableColumn(txn executor.TxnExecutor,
}

// CheckViewDefinition Check if the view exists, if so, return true and return the view definition
func CheckViewDefinition(txn executor.TxnExecutor, accountId uint32, schema string, viewName string) (bool, string, error) {
var CheckViewDefinition = func(txn executor.TxnExecutor, accountId uint32, schema string, viewName string) (bool, string, error) {
sql := fmt.Sprintf("SELECT tbl.rel_createsql AS `VIEW_DEFINITION` FROM mo_catalog.mo_tables tbl LEFT JOIN mo_catalog.mo_user usr ON tbl.creator = usr.user_id WHERE tbl.relkind = 'v' AND tbl.reldatabase = '%s' AND tbl.relname = '%s'", schema, viewName)
if accountId == catalog.System_Account {
sql = fmt.Sprintf("SELECT tbl.rel_createsql AS `VIEW_DEFINITION` FROM mo_catalog.mo_tables tbl LEFT JOIN mo_catalog.mo_user usr ON tbl.creator = usr.user_id WHERE tbl.relkind = 'v' AND account_id = 0 AND tbl.reldatabase = '%s' AND tbl.relname = '%s'", schema, viewName)
Expand All @@ -309,7 +309,7 @@ func CheckViewDefinition(txn executor.TxnExecutor, accountId uint32, schema stri
})

if loaded && n > 1 {
panic("BUG: Duplicate column names in table")
getLogger(txn.Txn().TxnOptions().CN).Fatal("BUG: Duplicate column names in table")
}
return loaded, view_def, nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/bootstrap/versions/upgrade_tenant_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func GetTenantCreateVersionForUpdate(
return true
})
if version == "" {
panic(fmt.Sprintf("BUG: missing tenant: %d", tenantID))
getLogger(txn.Txn().TxnOptions().CN).Fatal(fmt.Sprintf("BUG: missing tenant: %d", tenantID))
}
return version, nil
}
Expand All @@ -153,7 +153,7 @@ func UpgradeTenantVersion(
}
defer res.Close()
if res.AffectedRows != 1 {
panic(fmt.Sprintf("BUG: update tenant: %d failed with AffectedRows %d",
getLogger(txn.Txn().TxnOptions().CN).Fatal(fmt.Sprintf("BUG: update tenant: %d failed with AffectedRows %d",
tenantID, res.AffectedRows))
}
return nil
Expand All @@ -178,7 +178,7 @@ func GetTenantVersion(
return true
})
if version == "" {
panic(fmt.Sprintf("BUG: missing tenant: %d", tenantID))
getLogger(txn.Txn().TxnOptions().CN).Fatal(fmt.Sprintf("BUG: missing tenant: %d", tenantID))
}
return version, nil
}
60 changes: 60 additions & 0 deletions pkg/bootstrap/versions/v2_0_2/cluster_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@ package v2_0_2
import (
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/frontend"
"github.com/matrixorigin/matrixone/pkg/util/executor"
)

var clusterUpgEntries = []versions.UpgradeEntry{
upg_mo_cdc_watermark,
upg_mo_pubs_add_account_name,
upg_mo_subs_add_sub_account_name,
upg_mo_subs_add_pub_account_id,
upg_mo_account_lock,
}

var upg_mo_cdc_watermark = versions.UpgradeEntry{
Expand All @@ -37,3 +42,58 @@ var upg_mo_cdc_watermark = versions.UpgradeEntry{
return !colInfo.IsExits, nil
},
}

var upg_mo_pubs_add_account_name = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_PUBS,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_pubs add column account_name varchar(300) after account_id",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_PUBS, "account_name")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
PostSql: "UPDATE mo_catalog.mo_pubs t1 INNER JOIN mo_catalog.mo_account t2 ON t1.account_id = t2.account_id SET t1.account_name = t2.account_name",
}

var upg_mo_subs_add_sub_account_name = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_SUBS,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_subs add column sub_account_name VARCHAR(300) NOT NULL after sub_account_id",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_SUBS, "sub_account_name")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
PostSql: "UPDATE mo_catalog.mo_subs t1 INNER JOIN mo_catalog.mo_account t2 ON t1.sub_account_id = t2.account_id SET t1.sub_account_name = t2.account_name",
}

var upg_mo_subs_add_pub_account_id = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_SUBS,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_subs add column pub_account_id INT NOT NULL after sub_time",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_SUBS, "pub_account_id")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
PostSql: "UPDATE mo_catalog.mo_subs t1 INNER JOIN mo_catalog.mo_account t2 ON t1.pub_account_name = t2.account_name SET t1.pub_account_id = t2.account_id",
}

var upg_mo_account_lock = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_ACCOUNT_LOCK,
UpgType: versions.CREATE_NEW_TABLE,
UpgSql: frontend.MoCatalogMoAccountLockDDL,
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
return versions.CheckTableDefinition(txn, accountId, catalog.MO_CATALOG, catalog.MO_ACCOUNT_LOCK)
},
}
47 changes: 46 additions & 1 deletion pkg/bootstrap/versions/v2_0_2/tenant_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,52 @@
package v2_0_2

import (
"fmt"

"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
"github.com/matrixorigin/matrixone/pkg/util/executor"
"github.com/matrixorigin/matrixone/pkg/util/sysview"
)

var tenantUpgEntries = []versions.UpgradeEntry{}
var tenantUpgEntries = []versions.UpgradeEntry{
upg_information_schema_tables,
upg_information_schema_columns,
}

var upg_information_schema_tables = versions.UpgradeEntry{
Schema: sysview.InformationDBConst,
TableName: "TABLES",
UpgType: versions.MODIFY_VIEW,
UpgSql: sysview.InformationSchemaTablesDDL,
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
exists, viewDef, err := versions.CheckViewDefinition(txn, accountId, sysview.InformationDBConst, "TABLES")
if err != nil {
return false, err
}

if exists && viewDef == sysview.InformationSchemaTablesDDL {
return true, nil
}
return false, nil
},
PreSql: fmt.Sprintf("DROP VIEW IF EXISTS %s.%s;", sysview.InformationDBConst, "TABLES"),
}

var upg_information_schema_columns = versions.UpgradeEntry{
Schema: sysview.InformationDBConst,
TableName: "COLUMNS",
UpgType: versions.MODIFY_VIEW,
UpgSql: sysview.InformationSchemaColumnsDDL,
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
exists, viewDef, err := versions.CheckViewDefinition(txn, accountId, sysview.InformationDBConst, "COLUMNS")
if err != nil {
return false, err
}

if exists && viewDef == sysview.InformationSchemaColumnsDDL {
return true, nil
}
return false, nil
},
PreSql: fmt.Sprintf("DROP VIEW IF EXISTS %s.%s;", sysview.InformationDBConst, "COLUMNS"),
}
2 changes: 1 addition & 1 deletion pkg/bootstrap/versions/v2_0_2/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
Version: "2.0.2",
MinUpgradeVersion: "2.0.1",
UpgradeCluster: versions.Yes,
UpgradeTenant: versions.No,
UpgradeTenant: versions.Yes,
VersionOffset: uint32(len(clusterUpgEntries) + len(tenantUpgEntries)),
},
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/bootstrap/versions/v2_0_2/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/golang/mock/gomock"
"github.com/prashantv/gostub"
"github.com/stretchr/testify/assert"

"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
Expand Down Expand Up @@ -197,3 +198,32 @@ func Test_UpgEntry(t *testing.T) {
},
)
}

func Test_upgrade_view(t *testing.T) {
stubs := gostub.Stub(&versions.CheckViewDefinition, func(txn executor.TxnExecutor, accountId uint32, schema string, viewName string) (bool, string, error) {
return true, "", moerr.NewInternalErrorNoCtx("return error")
})
defer stubs.Reset()
_, err := upg_information_schema_tables.CheckFunc(nil, 0)
assert.Error(t, err)
_, err = upg_information_schema_columns.CheckFunc(nil, 0)
assert.Error(t, err)
}

func Test_upgrade_view2(t *testing.T) {
stubs := gostub.Stub(&versions.CheckViewDefinition, func(txn executor.TxnExecutor, accountId uint32, schema string, viewName string) (bool, string, error) {
return true, upg_information_schema_tables.UpgSql, nil
})
defer stubs.Reset()
_, err := upg_information_schema_tables.CheckFunc(nil, 0)
assert.NoError(t, err)
}

func Test_upgrade_view3(t *testing.T) {
stubs := gostub.Stub(&versions.CheckViewDefinition, func(txn executor.TxnExecutor, accountId uint32, schema string, viewName string) (bool, string, error) {
return true, upg_information_schema_columns.UpgSql, nil
})
defer stubs.Reset()
_, err := upg_information_schema_columns.CheckFunc(nil, 0)
assert.NoError(t, err)
}
4 changes: 2 additions & 2 deletions pkg/bootstrap/versions/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func MustGetLatestReadyVersion(
return true
})
if version == "" {
panic("missing latest ready version")
getLogger(txn.Txn().TxnOptions().CN).Fatal("missing latest ready version")
}
return version, nil
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func GetVersionState(
return true
})
if loaded && n > 1 {
panic("BUG: missing version " + version)
getLogger(txn.Txn().TxnOptions().CN).Fatal("BUG: missing version " + version)
}
return state, loaded, nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/bootstrap/versions/version_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func GetUpgradeVersions(
return nil, err
}
if len(values) == 0 && mustHave {
panic("BUG: missing version upgrade")
getLogger(txn.Txn().TxnOptions().CN).Fatal("BUG: missing version upgrade")
}
return values, nil
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func GetUpgradingTenantVersion(txn executor.TxnExecutor) (VersionUpgrade, bool,
return VersionUpgrade{}, false, nil
}
if len(values) != 1 {
panic("BUG: invalid version upgrade")
getLogger(txn.Txn().TxnOptions().CN).Fatal("BUG: invalid version upgrade")
}
return values[0], true, nil
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func GetUpgradeVersionForUpdateByID(
return VersionUpgrade{}, err
}
if len(values) != 1 {
panic(fmt.Sprintf("BUG: can not get version upgrade by primary key: %d", id))
getLogger(txn.Txn().TxnOptions().CN).Fatal(fmt.Sprintf("BUG: can not get version upgrade by primary key: %d", id))
}
return values[0], nil
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ const (
MO_DATA_KEY = "mo_data_key"

MO_TABLE_STATS = "mo_table_stats_alpha"

MO_ACCOUNT_LOCK = "__mo_account_lock"
)

func IsSystemTable(id uint64) bool {
Expand Down
Loading

0 comments on commit b31ec61

Please sign in to comment.