diff --git a/pkg/cmd/ctlstore/main.go b/pkg/cmd/ctlstore/main.go index 88add84..cb98a33 100644 --- a/pkg/cmd/ctlstore/main.go +++ b/pkg/cmd/ctlstore/main.go @@ -54,29 +54,29 @@ type sidecarConfig struct { } type reflectorCliConfig struct { - LDBPath string `conf:"ldb-path" help:"Path to LDB file" validate:"nonzero"` - ChangelogPath string `conf:"changelog-path" help:"Path to changelog file"` - ChangelogSize int `conf:"changelog-size" help:"Maximum size of the changelog file"` - UpstreamDriver string `conf:"upstream-driver" help:"Upstream driver name (e.g. sqlite3)" validate:"nonzero"` - UpstreamDSN string `conf:"upstream-dsn" help:"Upstream DSN (e.g. path to file if sqlite3)" validate:"nonzero"` - UpstreamLedgerTable string `conf:"upstream-ledger-table" help:"Table on the upstream to look for statement ledger"` - UpstreamShardingFamily string `conf:"upstream-sharding-family" help:"Sharding family(s) reflector is targeting"` - UpstreamShardingTable string `conf:"upstream-sharding-table" help:"Sharding tables(s) reflector is targeting"` - BootstrapURL string `conf:"bootstrap-url" help:"Bootstraps LDB from an S3 URL"` - BootstrapRegion string `conf:"bootstrap-region" help:"If specified, indicates which region in which the S3 bucket lives"` - PollInterval time.Duration `conf:"poll-interval" help:"How often to pull the upstream" validate:"nonzero"` - PollJitterCoefficient float64 `conf:"poll-jitter-coefficient" help:"Coefficient for poll jittering"` - PollTimeout time.Duration `conf:"poll-timeout" help:"How long to poll from the source before canceling"` - QueryBlockSize int `conf:"query-block-size" help:"Number of ledger entries to get at once"` - Debug bool `conf:"debug" help:"Turns on debug logging"` - LedgerHealth ledgerHealthConfig `conf:"ledger-latency" help:"Configure ledger latency behavior"` - Dogstatsd dogstatsdConfig `conf:"dogstatsd" help:"dogstatsd Configuration"` - MetricsBind string `conf:"metrics-bind" help:"address to serve Prometheus metircs"` - WALPollInterval time.Duration `conf:"wal-poll-interval" help:"How often to pull the sqlite's wal size and status. 0 indicates disabled monitoring'"` - WALCheckpointThresholdSize int `conf:"wal-checkpoint-threshold-size" help:"Performs a checkpoint after the WAL file exceeds this size in bytes"` - WALCheckpointType ldbwriter.CheckpointType `conf:"wal-checkpoint-type" help:"what type of checkpoint to manually perform once the wal size is exceeded"` - BusyTimeoutMS int `conf:"busy-timeout-ms" help:"Set a busy timeout on the connection string for sqlite in milliseconds"` - MultiReflector multiReflectorConfig `conf:"multi-reflector" help:"Configuration for running multiple reflectors at once"` + LDBPath string `conf:"ldb-path" help:"Path to LDB file" validate:"nonzero"` + ChangelogPath string `conf:"changelog-path" help:"Path to changelog file"` + ChangelogSize int `conf:"changelog-size" help:"Maximum size of the changelog file"` + UpstreamDriver string `conf:"upstream-driver" help:"Upstream driver name (e.g. sqlite3)" validate:"nonzero"` + UpstreamDSN string `conf:"upstream-dsn" help:"Upstream DSN (e.g. path to file if sqlite3)" validate:"nonzero"` + UpstreamLedgerTable string `conf:"upstream-ledger-table" help:"Table on the upstream to look for statement ledger"` + UpstreamShardingWholeFamily string `conf:"upstream-sharding-full-family" help:"Whole sharding family(s) reflector is targeting"` + UpstreamShardingFamilyTable string `conf:"upstream-sharding-family-table" help:"Fully-qualified sharding tables(s) reflector is targeting"` + BootstrapURL string `conf:"bootstrap-url" help:"Bootstraps LDB from an S3 URL"` + BootstrapRegion string `conf:"bootstrap-region" help:"If specified, indicates which region in which the S3 bucket lives"` + PollInterval time.Duration `conf:"poll-interval" help:"How often to pull the upstream" validate:"nonzero"` + PollJitterCoefficient float64 `conf:"poll-jitter-coefficient" help:"Coefficient for poll jittering"` + PollTimeout time.Duration `conf:"poll-timeout" help:"How long to poll from the source before canceling"` + QueryBlockSize int `conf:"query-block-size" help:"Number of ledger entries to get at once"` + Debug bool `conf:"debug" help:"Turns on debug logging"` + LedgerHealth ledgerHealthConfig `conf:"ledger-latency" help:"Configure ledger latency behavior"` + Dogstatsd dogstatsdConfig `conf:"dogstatsd" help:"dogstatsd Configuration"` + MetricsBind string `conf:"metrics-bind" help:"address to serve Prometheus metircs"` + WALPollInterval time.Duration `conf:"wal-poll-interval" help:"How often to pull the sqlite's wal size and status. 0 indicates disabled monitoring'"` + WALCheckpointThresholdSize int `conf:"wal-checkpoint-threshold-size" help:"Performs a checkpoint after the WAL file exceeds this size in bytes"` + WALCheckpointType ldbwriter.CheckpointType `conf:"wal-checkpoint-type" help:"what type of checkpoint to manually perform once the wal size is exceeded"` + BusyTimeoutMS int `conf:"busy-timeout-ms" help:"Set a busy timeout on the connection string for sqlite in milliseconds"` + MultiReflector multiReflectorConfig `conf:"multi-reflector" help:"Configuration for running multiple reflectors at once"` } type multiReflectorConfig struct { @@ -575,20 +575,20 @@ func multiReflector(ctx context.Context, args []string) { func defaultReflectorCLIConfig(isSupervisor bool) reflectorCliConfig { config := reflectorCliConfig{ - LDBPath: "", - ChangelogPath: "", - ChangelogSize: 1 * 1024 * 1024, - UpstreamDriver: "", - UpstreamDSN: "", - UpstreamLedgerTable: "ctlstore_dml_ledger", - UpstreamShardingFamily: "flagon2,cob", - UpstreamShardingTable: "flagon2___flags,cob___kvs", - BootstrapURL: "", - PollInterval: 1 * time.Second, - PollJitterCoefficient: 0.25, - QueryBlockSize: 100, - Dogstatsd: defaultDogstatsdConfig(), - PollTimeout: 5 * time.Second, + LDBPath: "", + ChangelogPath: "", + ChangelogSize: 1 * 1024 * 1024, + UpstreamDriver: "", + UpstreamDSN: "", + UpstreamLedgerTable: "ctlstore_dml_ledger", + UpstreamShardingWholeFamily: "flagon2,cob", + UpstreamShardingFamilyTable: "flagon2___flags,cob___kvs", + BootstrapURL: "", + PollInterval: 1 * time.Second, + PollJitterCoefficient: 0.25, + QueryBlockSize: 100, + Dogstatsd: defaultDogstatsdConfig(), + PollTimeout: 5 * time.Second, LedgerHealth: ledgerHealthConfig{ Disable: false, MaxHealthyLatency: time.Minute, @@ -658,8 +658,8 @@ func newReflector(cliCfg reflectorCliConfig, isSupervisor bool, i int) (*reflect Driver: cliCfg.UpstreamDriver, DSN: cliCfg.UpstreamDSN, LedgerTable: cliCfg.UpstreamLedgerTable, - ShardingFamily: cliCfg.UpstreamShardingFamily, - ShardingTable: cliCfg.UpstreamShardingTable, + ShardingFamily: cliCfg.UpstreamShardingWholeFamily, + ShardingTable: cliCfg.UpstreamShardingFamilyTable, PollInterval: cliCfg.PollInterval, PollJitterCoefficient: cliCfg.PollJitterCoefficient, QueryBlockSize: cliCfg.QueryBlockSize, diff --git a/pkg/reflector/dml_source.go b/pkg/reflector/dml_source.go index 309bfde..1b4307e 100644 --- a/pkg/reflector/dml_source.go +++ b/pkg/reflector/dml_source.go @@ -130,22 +130,34 @@ func (source *sqlDmlSource) Next(ctx context.Context) (statement schema.DMLState // Helper function to generate the SQL query func generateSQLQuery(ledgerTableName, shardingFamily, shardingTable string, blocksize int) string { + baseQuery := "SELECT seq, leader_ts, statement, family_name, table_name FROM $1 WHERE " + whereClause := "seq > ?" + limitClause := " ORDER BY seq LIMIT $4" + if shardingFamily != "" { - familiesStr := prepareString(shardingFamily) - tablesStr := prepareString(shardingTable) - return sqlgen.SqlSprintf("SELECT seq, leader_ts, statement, family_name, table_name FROM $1 WHERE seq > ? AND family_name IN $2 AND CONCAT(family_name,'___',table_name) IN $3 ORDER BY seq LIMIT $4", - ledgerTableName, - familiesStr, - tablesStr, - fmt.Sprintf("%d", blocksize)) - } else { - return sqlgen.SqlSprintf("SELECT seq, leader_ts, statement, family_name, table_name FROM $1 WHERE seq > ? ORDER BY seq LIMIT $2", - ledgerTableName, - fmt.Sprintf("%d", blocksize)) + whereClause += fmt.Sprintf(" AND family_name IN $2") + } + + if shardingTable != "" { + whereClause += fmt.Sprintf(" AND CONCAT(family_name,'___',table_name) IN $3") + } + + if shardingFamily != "" && shardingTable != "" { + whereClause = fmt.Sprintf("seq > ? AND (family_name IN $2 OR CONCAT(family_name,'___',table_name) IN $3)") } + + return sqlgen.SqlSprintf(baseQuery+whereClause+limitClause, + ledgerTableName, + prepareString(shardingFamily), + prepareString(shardingTable), + fmt.Sprintf("%d", blocksize)) } // Helper function to prepare the family string for SQL query func prepareString(str string) string { + if str == "" { + return "" + } + return "(\"" + strings.ReplaceAll(str, ",", "\", \"") + "\")" } diff --git a/pkg/reflector/dml_source_test.go b/pkg/reflector/dml_source_test.go index 20ffd8f..422b400 100644 --- a/pkg/reflector/dml_source_test.go +++ b/pkg/reflector/dml_source_test.go @@ -138,79 +138,73 @@ func TestSqlDmlSourceWithSharding(t *testing.T) { }{ {statement: foobar, family: "foo", table: "bar"}, {statement: foobar1, family: "foo", table: "bar1"}, - {statement: foo1bar1, family: "foo1", table: "bar1"}, {statement: foo1bar, family: "foo1", table: "bar"}, + {statement: foo1bar1, family: "foo1", table: "bar1"}, } testCases := []struct { - name string - shardingFamily string - shardingTable string - stContains []string - stNotContains []string - seqModContains []int64 - seqModNotContains []int64 - expectedErr error + name string + shardingFamily string + shardingTable string + stContains []string + seqModContains []int64 + expectedErr error }{ { - name: "Single family single table", - shardingFamily: "foo", - shardingTable: "foo___bar", - stContains: []string{foobar}, - stNotContains: []string{foobar1, foo1bar1, foo1bar}, - seqModContains: []int64{0}, - seqModNotContains: []int64{1, 2, 3}, - expectedErr: nil, + name: "Single whole family", + shardingFamily: "foo", + shardingTable: "", + stContains: []string{foobar, foobar1}, + seqModContains: []int64{0, 1}, + expectedErr: nil, }, { - name: "Single family multiple tables", - shardingFamily: "foo", - shardingTable: "foo___bar,foo___bar1", - stContains: []string{foobar, foobar1}, - stNotContains: []string{foo1bar1, foo1bar}, - seqModContains: []int64{0, 1}, - seqModNotContains: []int64{2, 3}, - expectedErr: nil, + name: "Single qualified table", + shardingFamily: "", + shardingTable: "foo___bar", + stContains: []string{foobar}, + seqModContains: []int64{0}, + expectedErr: nil, }, { - name: "Multiple families multiple tables", - shardingFamily: "foo,foo1", - shardingTable: "foo___bar,foo1___bar1", - stContains: []string{foobar, foo1bar1}, - stNotContains: []string{foo1bar, foobar1}, - seqModContains: []int64{0, 2}, - seqModNotContains: []int64{1, 3}, - expectedErr: nil, + name: "Multiple whole families", + shardingFamily: "foo,foo1", + shardingTable: "", + stContains: []string{foobar, foobar1, foo1bar, foo1bar1}, + seqModContains: []int64{0, 1, 2, 3}, + expectedErr: nil, }, { - name: "All families all tables", - shardingFamily: "foo,foo1", - shardingTable: "foo___bar,foo___bar1,foo1___bar1,foo1___bar", - stContains: []string{foobar, foobar1, foo1bar1, foo1bar}, - stNotContains: []string{}, - seqModContains: []int64{0, 1, 2, 3}, - seqModNotContains: []int64{}, - expectedErr: nil, + name: "Multiple qualified tables", + shardingFamily: "", + shardingTable: "foo___bar,foo___bar1,foo1___bar,foo1___bar1", + stContains: []string{foobar, foobar1, foo1bar, foo1bar1}, + seqModContains: []int64{0, 1, 2, 3}, + expectedErr: nil, }, { - name: "No family no table", - shardingFamily: "", - shardingTable: "", - stContains: []string{foobar, foobar1, foo1bar1, foo1bar}, - stNotContains: []string{}, - seqModContains: []int64{0, 1, 2, 3}, - seqModNotContains: []int64{}, - expectedErr: nil, + name: "Whole family and qualified table", + shardingFamily: "foo", + shardingTable: "foo1___bar1", + stContains: []string{foobar, foobar1, foo1bar1}, + seqModContains: []int64{0, 1, 3}, + expectedErr: nil, }, { - name: "Single family no table", - shardingFamily: "foo", - shardingTable: "", - stContains: []string{}, - stNotContains: []string{foobar, foobar1, foo1bar1, foo1bar}, - seqModContains: []int64{}, - seqModNotContains: []int64{0, 1, 2, 3}, - expectedErr: nil, + name: "Whole family override qualified table", + shardingFamily: "foo", + shardingTable: "foo___bar", + stContains: []string{foobar, foobar1}, + seqModContains: []int64{0, 1}, + expectedErr: nil, + }, + { + name: "No sharding", + shardingFamily: "", + shardingTable: "", + stContains: []string{foobar, foobar1, foo1bar1, foo1bar}, + seqModContains: []int64{0, 1, 2, 3}, + expectedErr: nil, }, } @@ -245,11 +239,10 @@ func TestSqlDmlSourceWithSharding(t *testing.T) { st, err := src.Next(ctx) require.NoError(t, err) require.Contains(t, tt.stContains, st.Statement) - require.NotContains(t, tt.stNotContains, st.Statement) require.True(t, st.Sequence.Int() > lastSeq) lastSeq = st.Sequence.Int() - require.Contains(t, tt.seqModContains, (lastSeq-2)%int64(len(statements))) - require.NotContains(t, tt.seqModNotContains, (lastSeq-2)%int64(len(statements))) + mod := (lastSeq - 2) % int64(len(statements)) + require.Contains(t, tt.seqModContains, mod) } _, err = src.Next(ctx) @@ -297,6 +290,11 @@ func TestPrepareString(t *testing.T) { input string expected string }{ + { + name: "Empty string", + input: "", + expected: "", + }, { name: "Single family", input: "family1", @@ -307,11 +305,6 @@ func TestPrepareString(t *testing.T) { input: "family1,family2,family3", expected: "(\"family1\", \"family2\", \"family3\")", }, - { - name: "No families", - input: "", - expected: "(\"\")", - }, { name: "Sharding table", input: "foo___bar",