Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add clickhouse-replicated dialect #809

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ Examples:
GOOSE_DRIVER=mysql GOOSE_DBSTRING="user:password@/dbname" goose status
GOOSE_DRIVER=redshift GOOSE_DBSTRING="postgres://user:[email protected]:5439/db" goose status
GOOSE_DRIVER=clickhouse GOOSE_DBSTRING="clickhouse://user:[email protected]:9440/dbname?secure=true&skip_verify=false" goose status
GOOSE_DRIVER=clickhouse-replicated GOOSE_CLICKHOUSE_CLUSTER_NAME=example GOOSE_DBSTRING="clickhouse://user:[email protected]:9440/dbname?secure=true&skip_verify=false" goose status
Options:
Expand Down
22 changes: 12 additions & 10 deletions cmd/goose/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,17 +269,19 @@ Examples:
goose tidb "user:password@/dbname?parseTime=true" status
goose mssql "sqlserver://user:password@dbname:1433?database=master" status
goose clickhouse "tcp://127.0.0.1:9000" status
goose clickhouse-replicated "tcp://127.0.0.1:9000" status
goose vertica "vertica://user:password@localhost:5433/dbname?connection_load_balance=1" status
goose ydb "grpcs://localhost:2135/local?go_query_mode=scripting&go_fake_tx=scripting&go_query_bind=declare,numeric" status
goose turso "libsql://dbname.turso.io?authToken=token" status
goose turso "libsql://dbname.turso.io?authToken=token" status
GOOSE_DRIVER=sqlite3 GOOSE_DBSTRING=./foo.db goose status
GOOSE_DRIVER=sqlite3 GOOSE_DBSTRING=./foo.db goose create init sql
GOOSE_DRIVER=postgres GOOSE_DBSTRING="user=postgres dbname=postgres sslmode=disable" goose status
GOOSE_DRIVER=mysql GOOSE_DBSTRING="user:password@/dbname" goose status
GOOSE_DRIVER=redshift GOOSE_DBSTRING="postgres://user:[email protected]:5439/db" goose status
GOOSE_DRIVER=turso GOOSE_DBSTRING="libsql://dbname.turso.io?authToken=token" goose status
GOOSE_DRIVER=clickhouse GOOSE_DBSTRING="clickhouse://user:[email protected]:9440/dbname?secure=true&skip_verify=false" goose status
GOOSE_DRIVER=clickhouse GOOSE_DBSTRING="clickhouse://user:[email protected]:9440/dbname?secure=true&skip_verify=false" goose status
GOOSE_DRIVER=clickhouse-replicated GOOSE_CLICKHOUSE_CLUSTER_NAME=example GOOSE_DBSTRING="clickhouse://user:[email protected]:9440/dbname?secure=true&skip_verify=false" goose status
Options:
`
Expand All @@ -302,23 +304,23 @@ Commands:
)

var sqlMigrationTemplate = template.Must(template.New("goose.sql-migration").Parse(`-- Thank you for giving goose a try!
--
--
-- This file was automatically created running goose init. If you're familiar with goose
-- feel free to remove/rename this file, write some SQL and goose up. Briefly,
--
--
-- Documentation can be found here: https://pressly.github.io/goose
--
-- A single goose .sql file holds both Up and Down migrations.
--
--
-- All goose .sql files are expected to have a -- +goose Up annotation.
-- The -- +goose Down annotation is optional, but recommended, and must come after the Up annotation.
--
-- The -- +goose NO TRANSACTION annotation may be added to the top of the file to run statements
--
-- The -- +goose NO TRANSACTION annotation may be added to the top of the file to run statements
-- outside a transaction. Both Up and Down migrations within this file will be run without a transaction.
--
-- More complex statements that have semicolons within them must be annotated with
--
-- More complex statements that have semicolons within them must be annotated with
-- the -- +goose StatementBegin and -- +goose StatementEnd annotations to be properly recognized.
--
--
-- Use GitHub issues for reporting bugs and requesting features, enjoy!
-- +goose Up
Expand Down
47 changes: 26 additions & 21 deletions database/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,26 @@ import (
"errors"
"fmt"

"github.com/pressly/goose/v3/internal/cfg"
"github.com/pressly/goose/v3/internal/dialect/dialectquery"
)

// Dialect is the type of database dialect.
type Dialect string

const (
DialectClickHouse Dialect = "clickhouse"
DialectMSSQL Dialect = "mssql"
DialectMySQL Dialect = "mysql"
DialectPostgres Dialect = "postgres"
DialectRedshift Dialect = "redshift"
DialectSQLite3 Dialect = "sqlite3"
DialectTiDB Dialect = "tidb"
DialectTurso Dialect = "turso"
DialectVertica Dialect = "vertica"
DialectYdB Dialect = "ydb"
DialectStarrocks Dialect = "starrocks"
DialectClickHouse Dialect = "clickhouse"
DialectClickHouseReplicated Dialect = "clickhouse-replicated"
DialectMSSQL Dialect = "mssql"
DialectMySQL Dialect = "mysql"
DialectPostgres Dialect = "postgres"
DialectRedshift Dialect = "redshift"
DialectSQLite3 Dialect = "sqlite3"
DialectTiDB Dialect = "tidb"
DialectTurso Dialect = "turso"
DialectVertica Dialect = "vertica"
DialectYdB Dialect = "ydb"
DialectStarrocks Dialect = "starrocks"
)

// NewStore returns a new [Store] implementation for the given dialect.
Expand All @@ -36,16 +38,19 @@ func NewStore(dialect Dialect, tablename string) (Store, error) {
}
lookup := map[Dialect]dialectquery.Querier{
DialectClickHouse: &dialectquery.Clickhouse{},
DialectMSSQL: &dialectquery.Sqlserver{},
DialectMySQL: &dialectquery.Mysql{},
DialectPostgres: &dialectquery.Postgres{},
DialectRedshift: &dialectquery.Redshift{},
DialectSQLite3: &dialectquery.Sqlite3{},
DialectTiDB: &dialectquery.Tidb{},
DialectVertica: &dialectquery.Vertica{},
DialectYdB: &dialectquery.Ydb{},
DialectTurso: &dialectquery.Turso{},
DialectStarrocks: &dialectquery.Starrocks{},
DialectClickHouseReplicated: &dialectquery.ClickhouseReplicated{
ClusterName: cfg.GOOSECLICKHOUSECLUSTERNAME,
},
DialectMSSQL: &dialectquery.Sqlserver{},
DialectMySQL: &dialectquery.Mysql{},
DialectPostgres: &dialectquery.Postgres{},
DialectRedshift: &dialectquery.Redshift{},
DialectSQLite3: &dialectquery.Sqlite3{},
DialectTiDB: &dialectquery.Tidb{},
DialectVertica: &dialectquery.Vertica{},
DialectYdB: &dialectquery.Ydb{},
DialectTurso: &dialectquery.Turso{},
DialectStarrocks: &dialectquery.Starrocks{},
}
querier, ok := lookup[dialect]
if !ok {
Expand Down
14 changes: 14 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ func OpenDBWithDriver(driver string, dbstring string) (*sql.DB, error) {
switch driver {
case "postgres", "pgx", "sqlite3", "sqlite", "mysql", "sqlserver", "clickhouse", "vertica", "azuresql", "ydb", "libsql", "starrocks":
return sql.Open(driver, dbstring)
case "clickhouse-replicated":
db, err := sql.Open("clickhouse", dbstring)
if err != nil {
return nil, fmt.Errorf("open db: %w", err)
}
_, err = db.Exec("SET insert_quorum=2")
if err != nil {
return nil, fmt.Errorf("SET insert_quorum %w", err)
}
_, err = db.Exec("SET select_sequential_consistency=1")
if err != nil {
return nil, fmt.Errorf("SET select_sequential_consistency %w", err)
}
return db, nil
default:
return nil, fmt.Errorf("unsupported driver %s", driver)
}
Expand Down
2 changes: 2 additions & 0 deletions dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func SetDialect(s string) error {
d = dialect.Tidb
case "clickhouse":
d = dialect.Clickhouse
case "clickhouse-replicated":
d = dialect.ClickhouseReplicated
case "vertica":
d = dialect.Vertica
case "ydb":
Expand Down
8 changes: 5 additions & 3 deletions internal/cfg/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package cfg
import "os"

var (
GOOSEDRIVER = envOr("GOOSE_DRIVER", "")
GOOSEDBSTRING = envOr("GOOSE_DBSTRING", "")
GOOSEMIGRATIONDIR = envOr("GOOSE_MIGRATION_DIR", DefaultMigrationDir)
GOOSECLICKHOUSECLUSTERNAME = envOr("GOOSE_CLICKHOUSE_CLUSTER_NAME", "{cluster}")
GOOSEDRIVER = envOr("GOOSE_DRIVER", "")
GOOSEDBSTRING = envOr("GOOSE_DBSTRING", "")
GOOSEMIGRATIONDIR = envOr("GOOSE_MIGRATION_DIR", DefaultMigrationDir)
// https://no-color.org/
GOOSENOCOLOR = envOr("NO_COLOR", "false")
)
Expand All @@ -22,6 +23,7 @@ type EnvVar struct {

func List() []EnvVar {
return []EnvVar{
{Name: "GOOSE_CLICKHOUSER_CLUSTER_NAME", Value: GOOSECLICKHOUSECLUSTERNAME},
{Name: "GOOSE_DRIVER", Value: GOOSEDRIVER},
{Name: "GOOSE_DBSTRING", Value: GOOSEDBSTRING},
{Name: "GOOSE_MIGRATION_DIR", Value: GOOSEMIGRATIONDIR},
Expand Down
43 changes: 43 additions & 0 deletions internal/dialect/dialectquery/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,46 @@ func (c *Clickhouse) GetLatestVersion(tableName string) string {
q := `SELECT max(version_id) FROM %s`
return fmt.Sprintf(q, tableName)
}

type ClickhouseReplicated struct {
ClusterName string
}

var _ Querier = (*ClickhouseReplicated)(nil)

func (c *ClickhouseReplicated) CreateTable(tableName string) string {
q := `CREATE TABLE IF NOT EXISTS %s ON CLUSTER '%s' (
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime default now()
)
ENGINE = ReplicatedMergeTree()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be awesome to be able to pass keeper path here as well, like

ENGINE = ReplicatedMergeTree('/clickhouse/tables/common/migrations', '{replica}')

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an endless number of ways to configure clickhouse, which means goose will need to know about all these and expose knobs for them.

I think the most practical solution would be for projects to implement the Store interface, e.g.,

#520 (comment)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe an alternative would be to expose configuration options via a .yaml file. I think what I'm getting at is it's not sustainable to maintain dozens of custom clickhouse dialects.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally agree! The Store interface looks much better

ORDER BY (date)`
return fmt.Sprintf(q, tableName, c.ClusterName)
}

func (c *ClickhouseReplicated) InsertVersion(tableName string) string {
q := `INSERT INTO %s (version_id, is_applied) VALUES ($1, $2)`
return fmt.Sprintf(q, tableName)
}

func (c *ClickhouseReplicated) DeleteVersion(tableName string) string {
q := `ALTER TABLE %s DELETE WHERE version_id = $1 SETTINGS mutations_sync = 2`
return fmt.Sprintf(q, tableName)
}

func (c *ClickhouseReplicated) GetMigrationByVersion(tableName string) string {
q := `SELECT tstamp, is_applied FROM %s WHERE version_id = $1 ORDER BY tstamp DESC LIMIT 1`
return fmt.Sprintf(q, tableName)
}

func (c *ClickhouseReplicated) ListMigrations(tableName string) string {
q := `SELECT version_id, is_applied FROM %s ORDER BY version_id DESC`
return fmt.Sprintf(q, tableName)
}

func (c *ClickhouseReplicated) GetLatestVersion(tableName string) string {
q := `SELECT max(version_id) FROM %s`
return fmt.Sprintf(q, tableName)
}
23 changes: 12 additions & 11 deletions internal/dialect/dialects.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ package dialect
type Dialect string

const (
Postgres Dialect = "postgres"
Mysql Dialect = "mysql"
Sqlite3 Dialect = "sqlite3"
Sqlserver Dialect = "sqlserver"
Redshift Dialect = "redshift"
Tidb Dialect = "tidb"
Clickhouse Dialect = "clickhouse"
Vertica Dialect = "vertica"
Ydb Dialect = "ydb"
Turso Dialect = "turso"
Starrocks Dialect = "starrocks"
Postgres Dialect = "postgres"
Mysql Dialect = "mysql"
Sqlite3 Dialect = "sqlite3"
Sqlserver Dialect = "sqlserver"
Redshift Dialect = "redshift"
Tidb Dialect = "tidb"
Clickhouse Dialect = "clickhouse"
ClickhouseReplicated Dialect = "clickhouse-replicated"
Vertica Dialect = "vertica"
Ydb Dialect = "ydb"
Turso Dialect = "turso"
Starrocks Dialect = "starrocks"
)
5 changes: 5 additions & 0 deletions internal/dialect/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"github.com/pressly/goose/v3/internal/cfg"
"github.com/pressly/goose/v3/internal/dialect/dialectquery"
)

Expand Down Expand Up @@ -63,6 +64,10 @@ func NewStore(d Dialect) (Store, error) {
querier = &dialectquery.Tidb{}
case Clickhouse:
querier = &dialectquery.Clickhouse{}
case ClickhouseReplicated:
querier = &dialectquery.ClickhouseReplicated{
ClusterName: cfg.GOOSECLICKHOUSECLUSTERNAME,
}
case Vertica:
querier = &dialectquery.Vertica{}
case Ydb:
Expand Down
21 changes: 21 additions & 0 deletions internal/testing/integration/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,27 @@ func TestClickhouseRemote(t *testing.T) {
require.Equal(t, 265, count)
}

func TestClickhouseReplicated(t *testing.T) {
t.Parallel()

db0, db1, cleanup, err := testdb.NewClickHouseReplicated(testdb.WithDebug(false))
require.NoError(t, err)
t.Cleanup(cleanup)

testDatabase(t, database.DialectClickHouseReplicated, db0, "testdata/migrations/clickhouse-replicated")

rows, err := db1.Query(`SELECT count(*) FROM clickstream`)
require.NoError(t, err)
var result int
for rows.Next() {
err = rows.Scan(&result)
require.NoError(t, err)
}
require.Equal(t, result, 3)
require.NoError(t, rows.Close())
require.NoError(t, rows.Err())
}

func TestMySQL(t *testing.T) {
t.Parallel()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
-- +goose Up
CREATE TABLE IF NOT EXISTS trips ON CLUSTER '{cluster}'
(
`trip_id` UInt32,
`vendor_id` Enum8('1' = 1, '2' = 2, '3' = 3, '4' = 4, 'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10, 'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14, '' = 15),
`pickup_date` Date,
`pickup_datetime` DateTime,
`dropoff_date` Date,
`dropoff_datetime` DateTime,
`store_and_fwd_flag` UInt8,
`rate_code_id` UInt8,
`pickup_longitude` Float64,
`pickup_latitude` Float64,
`dropoff_longitude` Float64,
`dropoff_latitude` Float64,
`passenger_count` UInt8,
`trip_distance` Float64,
`fare_amount` Float32,
`extra` Float32,
`mta_tax` Float32,
`tip_amount` Float32,
`tolls_amount` Float32,
`ehail_fee` Float32,
`improvement_surcharge` Float32,
`total_amount` Float32,
`payment_type` Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
`trip_type` UInt8,
`pickup` FixedString(25),
`dropoff` FixedString(25),
`cab_type` Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
`pickup_nyct2010_gid` Int8,
`pickup_ctlabel` Float32,
`pickup_borocode` Int8,
`pickup_ct2010` String,
`pickup_boroct2010` FixedString(7),
`pickup_cdeligibil` String,
`pickup_ntacode` FixedString(4),
`pickup_ntaname` String,
`pickup_puma` UInt16,
`dropoff_nyct2010_gid` UInt8,
`dropoff_ctlabel` Float32,
`dropoff_borocode` UInt8,
`dropoff_ct2010` String,
`dropoff_boroct2010` FixedString(7),
`dropoff_cdeligibil` String,
`dropoff_ntacode` FixedString(4),
`dropoff_ntaname` String,
`dropoff_puma` UInt16
)
ENGINE = ReplicatedMergeTree()
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime;

-- +goose Down
DROP TABLE IF EXISTS trips ON CLUSTER '{cluster}' SYNC;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- +goose Up
CREATE TABLE IF NOT EXISTS clickstream ON CLUSTER '{cluster}' (
customer_id String,
time_stamp Date,
click_event_type String,
country_code FixedString(2),
source_id UInt64
)
ENGINE = ReplicatedMergeTree()
ORDER BY (time_stamp);

-- +goose Down
DROP TABLE IF EXISTS clickstream ON CLUSTER '{cluster}' SYNC;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- +goose Up
INSERT INTO clickstream VALUES ('customer1', '2021-10-02', 'add_to_cart', 'US', 568239 );

INSERT INTO clickstream (customer_id, time_stamp, click_event_type) VALUES ('customer2', '2021-10-30', 'remove_from_cart' );

INSERT INTO clickstream (* EXCEPT(country_code)) VALUES ('customer3', '2021-11-07', 'checkout', 307493 );

-- +goose Down
Loading