From 2acb43696dc01e676f4abd859d9653b3cc9f2246 Mon Sep 17 00:00:00 2001 From: Lz Date: Sat, 6 Apr 2024 23:03:35 +0800 Subject: [PATCH] feat(shardid): added DHT for sharding (#30) --- CHANGELOG.md | 10 ++ context.go | 2 +- context_stmt.go | 11 +- db.go | 82 ++++++++++++++ db_test.go | 199 +++++++++++++++++++++++++++++++- query.go | 6 +- queryer_mapr_test.go | 220 +++++++++++++++++++----------------- shardid/dht.go | 110 ++++++++++++++++++ shardid/dht_test.go | 127 +++++++++++++++++++++ shardid/hash_ring.go | 90 +++++++++++++++ shardid/hash_ring_option.go | 9 ++ shardid/hash_ring_test.go | 184 ++++++++++++++++++++++++++++++ shardid/id.go | 3 + 13 files changed, 937 insertions(+), 116 deletions(-) create mode 100644 shardid/dht.go create mode 100644 shardid/dht_test.go create mode 100644 shardid/hash_ring.go create mode 100644 shardid/hash_ring_option.go create mode 100644 shardid/hash_ring_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 39adada..fdb50d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [1.4.0] - 2014-04-06 +### Added +- added DHT/HashRing in shardid (#30) +- added NewDHT/DHTAdd/DHTAdded/OnDHT on db (#30) + +## [1.3.2] - 2014-03-28 +### Added +- added module name in migration (#29) + ## [1.3.1] - 2014-03-19 ### Added - added `Duration` to support `Sacnner` and `Valuer` in sql driver (#27) diff --git a/context.go b/context.go index ad39ec9..c38133a 100644 --- a/context.go +++ b/context.go @@ -14,7 +14,7 @@ type Context struct { index int stmts map[string]*cachedStmt - stmtsMutex sync.RWMutex + stmtsMutex sync.Mutex } func (db *Context) Query(query string, args ...any) (*Rows, error) { diff --git a/context_stmt.go b/context_stmt.go index 503fb4e..0812feb 100644 --- a/context_stmt.go +++ b/context_stmt.go @@ -14,13 +14,12 @@ type cachedStmt struct { } func (db *Context) prepareStmt(ctx context.Context, query string) (*sql.Stmt, error) { - db.stmtsMutex.RLock() + db.stmtsMutex.Lock() + defer db.stmtsMutex.Unlock() s, ok := db.stmts[query] - db.stmtsMutex.RUnlock() + if ok { - s.Lock() s.lastUsed = time.Now() - s.Unlock() return s.stmt, nil } @@ -29,19 +28,17 @@ func (db *Context) prepareStmt(ctx context.Context, query string) (*sql.Stmt, er return nil, err } - db.stmtsMutex.Lock() db.stmts[query] = &cachedStmt{ stmt: stmt, lastUsed: time.Now(), } - db.stmtsMutex.Unlock() return stmt, nil } func (db *Context) closeIdleStmt() { for { - <-time.After(1 * time.Minute) + <-time.After(StmtMaxIdleTime) db.stmtsMutex.Lock() lastActive := time.Now().Add(-1 * time.Minute) diff --git a/db.go b/db.go index 2817738..232cfc6 100644 --- a/db.go +++ b/db.go @@ -2,14 +2,24 @@ package sqle import ( "database/sql" + "errors" + "sync" + "time" "github.com/yaitoo/sqle/shardid" ) +var ( + StmtMaxIdleTime = 1 * time.Minute + ErrMissingDHT = errors.New("sqle: DHT is missing") +) + type DB struct { *Context _ noCopy //nolint: unused + mu sync.RWMutex + dht *shardid.DHT dbs []*Context } @@ -34,6 +44,78 @@ func Open(dbs ...*sql.DB) *DB { return d } +// Add dynamically scale out DB with new databases +func (db *DB) Add(dbs ...*sql.DB) { + db.Lock() + defer db.Unlock() + + n := len(db.dbs) + + for i, d := range dbs { + ctx := &Context{ + DB: d, + index: n + i, + stmts: make(map[string]*cachedStmt), + } + db.dbs = append(db.dbs, ctx) + go ctx.closeIdleStmt() + } +} + +// On select database from shardid.ID func (db *DB) On(id shardid.ID) *Context { + db.mu.RLock() + defer db.mu.RUnlock() + return db.dbs[int(id.DatabaseID)] } + +// NewDHT create new DTH with databases +func (db *DB) NewDHT(dbs ...int) { + db.mu.Lock() + defer db.mu.Unlock() + + db.dht = shardid.NewDHT(dbs...) +} + +// DHTAdd add new databases into current DHT +func (db *DB) DHTAdd(dbs ...int) { + db.Lock() + defer db.Unlock() + if db.dht == nil { + return + } + db.dht.Add(dbs...) +} + +// DHTAdded get databases added on current DHT, and reload it. +func (db *DB) DHTAdded() { + db.Lock() + defer db.Unlock() + if db.dht == nil { + return + } + db.dht.Done() +} + +// OnDHT select database from DHT +func (db *DB) OnDHT(key string) (*Context, error) { + db.mu.RLock() + defer db.mu.RUnlock() + + if len(db.dbs) == 1 { + return db.dbs[0], nil + } + + if db.dht == nil { + return nil, ErrMissingDHT + } + + cur, _, err := db.dht.On(key) + + if err != nil { + return nil, err + + } + return db.dbs[cur], nil +} diff --git a/db_test.go b/db_test.go index f9a5607..b049b6a 100644 --- a/db_test.go +++ b/db_test.go @@ -34,7 +34,7 @@ func createSQLite3() *sql.DB { return db } -func TestSharding(t *testing.T) { +func TestOn(t *testing.T) { dbs := make([]*sql.DB, 0, 10) for i := 0; i < 10; i++ { @@ -82,3 +82,200 @@ func TestSharding(t *testing.T) { } } + +func TestDHT(t *testing.T) { + db := Open(createSQLite3()) + + // always work if only single server + ctx, err := db.OnDHT("") + require.Equal(t, 0, ctx.index) + require.Nil(t, err) + + // MUST NOT panic even DHT is missing + db.DHTAdd(1) + db.DHTAdded() + + db.Add(createSQLite3()) + + ctx, err = db.OnDHT("") + require.ErrorIs(t, err, ErrMissingDHT) + require.Nil(t, ctx) +} + +func TestOnDHT(t *testing.T) { + dbs := make([]*sql.DB, 0, 10) + + for i := 0; i < 10; i++ { + db3 := createSQLite3() + + db3.Exec("CREATE TABLE `dht` (`v` varchar(50), PRIMARY KEY (`v`))") // nolint: errcheck + + dbs = append(dbs, db3) + } + + db := Open(dbs...) + + // 2 dbs -> 3 dbs -> data + // -> 2439456 1149 + // 46916880 E0 0 ! + // 63694499 E1 1 + // <- 80472118 E2 2 + // <- 84017712 S2 2 + // -> 111074370 638 + // 117572950 S0 0 ! + // 134350569 S1 1 + // <- 214987260 G2 2 + // 248542498 G0 0 ! + // 265320117 G1 1 + // 316638712 M0 0 + // 333416331 M1 1 + // <- 350193950 M2 2 + // <- 351179688 K2 2 + // 384734926 K0 0 ! + // 401512545 K1 1 + // <- 484709092 O2 2 + // 518264330 O0 0 ! + // 535041949 O1 1 + // <- 2228889920 C2 2 + // 2262445158 C0 0 ! + // 2279222777 C1 1 + // 2330541372 I0 0 + // 2347318991 I1 1 + // <- 2364096610 I2 2 + // 2597703348 A0 0 ! + // 2600263204 Q0 0 + // 2614480967 A1 1 + // 2617040823 Q1 1 + // <- 2631258586 A2 2 + // <- 2633818442 Q2 2 + // -> 4113327457 150 + + db.NewDHT(1, 2) + + values := map[string]int{ + "1149": 1, + "S0": 2, + "I2": 1, + } + + for v, i := range values { + + b := New().Insert("dht"). + Set("v", v). + End() + + c, err := db.OnDHT(v) + require.NoError(t, err) + require.Equal(t, i, c.index) + + result, err := c.ExecBuilder(context.TODO(), b) + + require.NoError(t, err) + rows, err := result.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rows) + } + + for v, i := range values { + b := New().Select("dht", "v").Where("v = {v}").Param("v", v) + + ctx := db.dbs[i] + + var val string + err := ctx.QueryRowBuilder(context.TODO(), b).Scan(&val) + require.NoError(t, err) + require.Equal(t, v, val) + } +} + +func TestDHTScaling(t *testing.T) { + dbs := make([]*sql.DB, 0, 10) + + for i := 0; i < 10; i++ { + db3 := createSQLite3() + + db3.Exec("CREATE TABLE `dht` (`v` varchar(50), PRIMARY KEY (`v`))") // nolint: errcheck + + dbs = append(dbs, db3) + } + + db := Open(dbs...) + + // 2 dbs -> 3 dbs -> data + // -> 2439456 1149 + // 46916880 E0 0 ! + // 63694499 E1 1 + // <- 80472118 E2 2 + // -> 83143427 3850 + // <- 84017712 S2 2 + // -> 111074370 638 + // 117572950 S0 0 ! + // 134350569 S1 1 + // <- 214987260 G2 2 + // 248542498 G0 0 ! + // 265320117 G1 1 + // 316638712 M0 0 + // 333416331 M1 1 + // <- 350193950 M2 2 + // <- 351179688 K2 2 + // 384734926 K0 0 ! + // 401512545 K1 1 + // <- 484709092 O2 2 + // 518264330 O0 0 ! + // 535041949 O1 1 + // <- 2228889920 C2 2 + // 2262445158 C0 0 ! + // 2279222777 C1 1 + // 2330541372 I0 0 + // 2347318991 I1 1 + // <- 2364096610 I2 2 + // 2597703348 A0 0 ! + // 2600263204 Q0 0 + // 2614480967 A1 1 + // 2617040823 Q1 1 + // <- 2631258586 A2 2 + // <- 2633818442 Q2 2 + // -> 4113327457 150 + + db.NewDHT(0, 1) + + type item struct { + current int + busy bool + next int + } + + values := make(map[string]item) + + values["1149"] = item{current: 0, busy: false, next: 0} // Busy + values["E1"] = item{current: 0, busy: true, next: 2} // move from S0 to E2 + values["3850"] = item{current: 0, busy: true, next: 2} // move from S0 to S2 + values["638"] = item{current: 0, busy: false, next: 0} // keep on S0 + values["150"] = item{current: 0, busy: false, next: 0} // keep on E0 + + for v, it := range values { + ctx, err := db.OnDHT(v) + require.NoError(t, err) + require.Equal(t, it.current, ctx.index) + } + + db.DHTAdd(2) + + for v, it := range values { + ctx, err := db.OnDHT(v) + if it.busy { + require.ErrorIs(t, err, shardid.ErrItemIsBusy) + } else { + require.NoError(t, err) + require.Equal(t, it.current, ctx.index) + } + + } + + db.DHTAdded() + for v, it := range values { + ctx, err := db.OnDHT(v) + require.NoError(t, err) + require.Equal(t, it.next, ctx.index) + } +} diff --git a/query.go b/query.go index 173a6c0..4903398 100644 --- a/query.go +++ b/query.go @@ -20,14 +20,14 @@ type Query[T any] struct { } // NewQuery create a Query -func NewQuery[T any](db *DB, options ...QueryOption[T]) Query[T] { - q := Query[T]{ +func NewQuery[T any](db *DB, options ...QueryOption[T]) *Query[T] { + q := &Query[T]{ db: db, } for _, opt := range options { if opt != nil { - opt(&q) + opt(q) } } diff --git a/queryer_mapr_test.go b/queryer_mapr_test.go index 60d5d69..51e0373 100644 --- a/queryer_mapr_test.go +++ b/queryer_mapr_test.go @@ -120,18 +120,18 @@ func TestFirst(t *testing.T) { name string wanted MRUser wantErr error - query func() Query[MRUser] - first func(q Query[MRUser]) (MRUser, error) + query func() *Query[MRUser] + first func(q *Query[MRUser]) (MRUser, error) }{ { name: "1st_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db, WithQueryer[MRUser](&MapR[MRUser]{ dbs: db.dbs, })) }, wanted: MRUser{ID: 2}, - first: func(q Query[MRUser]) (MRUser, error) { + first: func(q *Query[MRUser]) (MRUser, error) { return q.First(context.Background(), New(). Select("users", "id"). Where("id = 2").End()) @@ -139,11 +139,11 @@ func TestFirst(t *testing.T) { }, { name: "3rd_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db) }, wanted: MRUser{ID: 31}, - first: func(q Query[MRUser]) (MRUser, error) { + first: func(q *Query[MRUser]) (MRUser, error) { return q.First(context.Background(), New(). Select("users", "id"). Where("id = 31").End()) @@ -151,11 +151,11 @@ func TestFirst(t *testing.T) { }, { name: "last_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db) }, wanted: MRUser{ID: 94}, - first: func(q Query[MRUser]) (MRUser, error) { + first: func(q *Query[MRUser]) (MRUser, error) { return q.First(context.Background(), New(). Select("users", "id"). Where("id = 94").End()) @@ -163,11 +163,11 @@ func TestFirst(t *testing.T) { }, { name: "month_on_1st_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db, WithMonths[MRUser](m202402, m202403)) }, wanted: MRUser{ID: 20240204}, - first: func(q Query[MRUser]) (MRUser, error) { + first: func(q *Query[MRUser]) (MRUser, error) { return q.First(context.Background(), New(). Select("users", "id"). Where("id = 20240204").End()) @@ -175,11 +175,11 @@ func TestFirst(t *testing.T) { }, { name: "month_on_6th_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db, WithMonths[MRUser](m202402, m202403)) }, wanted: MRUser{ID: 20240354}, - first: func(q Query[MRUser]) (MRUser, error) { + first: func(q *Query[MRUser]) (MRUser, error) { return q.First(context.Background(), New(). Select("users", "id"). Where("id = 20240354").End()) @@ -187,11 +187,11 @@ func TestFirst(t *testing.T) { }, { name: "month_on_last_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db, WithMonths[MRUser](m202402, m202403)) }, wanted: MRUser{ID: 20240394}, - first: func(q Query[MRUser]) (MRUser, error) { + first: func(q *Query[MRUser]) (MRUser, error) { return q.First(context.Background(), New(). Select("users", "id"). Where("id = 20240394").End()) @@ -199,11 +199,11 @@ func TestFirst(t *testing.T) { }, { name: "week_on_1st_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db, WithWeeks[MRUser](w20240201, w20240208)) }, wanted: MRUser{ID: 202400504}, - first: func(q Query[MRUser]) (MRUser, error) { + first: func(q *Query[MRUser]) (MRUser, error) { return q.First(context.Background(), New(). Select("users", "id"). Where("id = 202400504").End()) @@ -211,11 +211,11 @@ func TestFirst(t *testing.T) { }, { name: "week_on_5th_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db, WithWeeks[MRUser](w20240201, w20240208)) }, wanted: MRUser{ID: 202400654}, - first: func(q Query[MRUser]) (MRUser, error) { + first: func(q *Query[MRUser]) (MRUser, error) { return q.First(context.Background(), New(). Select("users", "id"). Where("id = 202400654").End()) @@ -223,11 +223,11 @@ func TestFirst(t *testing.T) { }, { name: "week_on_last_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db, WithWeeks[MRUser](w20240201, w20240208)) }, wanted: MRUser{ID: 202400694}, - first: func(q Query[MRUser]) (MRUser, error) { + first: func(q *Query[MRUser]) (MRUser, error) { return q.First(context.Background(), New(). Select("users", "id"). Where("id = 202400694").End()) @@ -235,11 +235,11 @@ func TestFirst(t *testing.T) { }, { name: "day_on_1st_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db, WithDays[MRUser](d20240201, d20240202)) }, wanted: MRUser{ID: 2024020104}, - first: func(q Query[MRUser]) (MRUser, error) { + first: func(q *Query[MRUser]) (MRUser, error) { return q.First(context.Background(), New(). Select("users", "id"). Where("id = 2024020104").End()) @@ -247,11 +247,11 @@ func TestFirst(t *testing.T) { }, { name: "day_on_5th_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db, WithDays[MRUser](d20240201, d20240202)) }, wanted: MRUser{ID: 2024020154}, - first: func(q Query[MRUser]) (MRUser, error) { + first: func(q *Query[MRUser]) (MRUser, error) { return q.First(context.Background(), New(). Select("users", "id"). Where("id = 2024020154").End()) @@ -259,11 +259,23 @@ func TestFirst(t *testing.T) { }, { name: "day_on_last_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db, WithDays[MRUser](d20240201, d20240202)) }, wanted: MRUser{ID: 2024020294}, - first: func(q Query[MRUser]) (MRUser, error) { + first: func(q *Query[MRUser]) (MRUser, error) { + return q.First(context.Background(), New(). + Select("users", "id"). + Where("id = 2024020294").End()) + }, + }, + { + name: "day_on_last_db_should_work", + query: func() *Query[MRUser] { + return NewQuery[MRUser](db, WithDays[MRUser](d20240201, d20240202)) + }, + wanted: MRUser{ID: 2024020294}, + first: func(q *Query[MRUser]) (MRUser, error) { return q.First(context.Background(), New(). Select("users", "id"). Where("id = 2024020294").End()) @@ -293,18 +305,18 @@ func TestCount(t *testing.T) { name string wanted int wantErr error - query func() Query[int] - count func(q Query[int]) (int, error) + query func() *Query[int] + count func(q *Query[int]) (int, error) }{ { name: "1st_db_should_work", - query: func() Query[int] { + query: func() *Query[int] { return NewQuery[int](db, WithQueryer[int](&MapR[int]{ dbs: db.dbs, })) }, wanted: 3, - count: func(q Query[int]) (int, error) { + count: func(q *Query[int]) (int, error) { return q.Count(context.Background(), New(). Select("users", "count(id)"). Where("id < 4"). @@ -313,11 +325,11 @@ func TestCount(t *testing.T) { }, { name: "3_dbs_should_work", - query: func() Query[int] { + query: func() *Query[int] { return NewQuery[int](db) }, wanted: 11, - count: func(q Query[int]) (int, error) { + count: func(q *Query[int]) (int, error) { return q.Count(context.Background(), New(). Select("users", "count(id)"). Where("id < 24").End()) @@ -325,22 +337,22 @@ func TestCount(t *testing.T) { }, { name: "all_dbs_should_work", - query: func() Query[int] { + query: func() *Query[int] { return NewQuery[int](db) }, wanted: 40, - count: func(q Query[int]) (int, error) { + count: func(q *Query[int]) (int, error) { return q.Count(context.Background(), New(). Select("users", "count(id)")) }, }, { name: "month_on_1st_db_should_work", - query: func() Query[int] { + query: func() *Query[int] { return NewQuery[int](db, WithMonths[int](m202402, m202403)) }, wanted: 7, - count: func(q Query[int]) (int, error) { + count: func(q *Query[int]) (int, error) { return q.Count(context.Background(), New(). Select("users", "count(id)"). Where("( id > 20240200 AND id < 20240204) OR ( id >= 20240300 AND id < 20240305)").End()) @@ -348,11 +360,11 @@ func TestCount(t *testing.T) { }, { name: "month_on_6th_db_should_work", - query: func() Query[int] { + query: func() *Query[int] { return NewQuery[int](db, WithMonths[int](m202402, m202403)) }, wanted: 7, - count: func(q Query[int]) (int, error) { + count: func(q *Query[int]) (int, error) { return q.Count(context.Background(), New(). Select("users", "count(id)"). Where("(id > 20240250 AND id < 20240254) OR ( id >= 20240350 AND id < 20240355)").End()) @@ -360,11 +372,11 @@ func TestCount(t *testing.T) { }, { name: "month_on_last_db_should_work", - query: func() Query[int] { + query: func() *Query[int] { return NewQuery[int](db, WithMonths[int](m202402, m202403)) }, wanted: 7, - count: func(q Query[int]) (int, error) { + count: func(q *Query[int]) (int, error) { return q.Count(context.Background(), New(). Select("users", "count(id)"). Where("(id > 20240290 AND id < 20240294) OR ( id >= 20240390 AND id < 20240395)").End()) @@ -372,11 +384,11 @@ func TestCount(t *testing.T) { }, { name: "week_on_1st_db_should_work", - query: func() Query[int] { + query: func() *Query[int] { return NewQuery[int](db, WithWeeks[int](w20240201, w20240208)) }, wanted: 6, - count: func(q Query[int]) (int, error) { + count: func(q *Query[int]) (int, error) { return q.Count(context.Background(), New(). Select("users", "count(id)"). Where("(id > 202400500 AND id < 202400504) OR ( id >= 202400600 AND id < 202400604)").End()) @@ -384,11 +396,11 @@ func TestCount(t *testing.T) { }, { name: "week_on_5th_db_should_work", - query: func() Query[int] { + query: func() *Query[int] { return NewQuery[int](db, WithWeeks[int](w20240201, w20240208)) }, wanted: 6, - count: func(q Query[int]) (int, error) { + count: func(q *Query[int]) (int, error) { return q.Count(context.Background(), New(). Select("users", "count(id)"). Where("(id > 202400550 AND id < 202400554) OR ( id >= 202400650 AND id < 202400654)").End()) @@ -396,11 +408,11 @@ func TestCount(t *testing.T) { }, { name: "week_on_last_db_should_work", - query: func() Query[int] { + query: func() *Query[int] { return NewQuery[int](db, WithWeeks[int](w20240201, w20240208)) }, wanted: 6, - count: func(q Query[int]) (int, error) { + count: func(q *Query[int]) (int, error) { return q.Count(context.Background(), New(). Select("users", "count(id)"). Where("(id > 202400590 AND id < 202400594) OR ( id >= 202400690 AND id < 202400694)").End()) @@ -408,11 +420,11 @@ func TestCount(t *testing.T) { }, { name: "day_on_1st_db_should_work", - query: func() Query[int] { + query: func() *Query[int] { return NewQuery[int](db, WithDays[int](d20240201, d20240202)) }, wanted: 6, - count: func(q Query[int]) (int, error) { + count: func(q *Query[int]) (int, error) { return q.Count(context.Background(), New(). Select("users", "count(id)"). Where("(id > 2024020100 AND id < 2024020104) OR ( id > 2024020200 AND id < 2024020204)").End()) @@ -420,11 +432,11 @@ func TestCount(t *testing.T) { }, { name: "day_on_5th_db_should_work", - query: func() Query[int] { + query: func() *Query[int] { return NewQuery[int](db, WithDays[int](d20240201, d20240202)) }, wanted: 6, - count: func(q Query[int]) (int, error) { + count: func(q *Query[int]) (int, error) { return q.Count(context.Background(), New(). Select("users", "count(id)"). Where("(id > 2024020150 AND id < 2024020154) OR ( id > 2024020250 AND id < 2024020254)").End()) @@ -432,11 +444,11 @@ func TestCount(t *testing.T) { }, { name: "day_on_last_db_should_work", - query: func() Query[int] { + query: func() *Query[int] { return NewQuery[int](db, WithDays[int](d20240201, d20240202)) }, wanted: 6, - count: func(q Query[int]) (int, error) { + count: func(q *Query[int]) (int, error) { return q.Count(context.Background(), New(). Select("users", "count(id)"). Where("(id > 2024020190 AND id < 2024020194) OR ( id > 2024020290 AND id < 2024020294)").End()) @@ -465,12 +477,12 @@ func TestQuery(t *testing.T) { name string wanted []MRUser wantErr error - query func() Query[MRUser] - queryRows func(q Query[MRUser]) ([]MRUser, error) + query func() *Query[MRUser] + queryRows func(q *Query[MRUser]) ([]MRUser, error) }{ { name: "1st_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db, WithQueryer[MRUser](&MapR[MRUser]{ dbs: db.dbs, })) @@ -479,7 +491,7 @@ func TestQuery(t *testing.T) { {ID: 1}, {ID: 2}, {ID: 3}}, - queryRows: func(q Query[MRUser]) ([]MRUser, error) { + queryRows: func(q *Query[MRUser]) ([]MRUser, error) { return q.Query(context.Background(), New(). Select("users", "id"). Where("id < 4"). @@ -490,7 +502,7 @@ func TestQuery(t *testing.T) { }, { name: "3_dbs_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db) }, wanted: []MRUser{{ID: 1}, @@ -500,7 +512,7 @@ func TestQuery(t *testing.T) { {ID: 14}, {ID: 21}, {ID: 22}, {ID: 23}, }, - queryRows: func(q Query[MRUser]) ([]MRUser, error) { + queryRows: func(q *Query[MRUser]) ([]MRUser, error) { return q.Query(context.Background(), New(). Select("users", "id"). Where("id < 24"). @@ -511,7 +523,7 @@ func TestQuery(t *testing.T) { }, { name: "all_dbs_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db) }, wanted: []MRUser{{ID: 1}, {ID: 2}, {ID: 3}, {ID: 4}, @@ -524,7 +536,7 @@ func TestQuery(t *testing.T) { {ID: 71}, {ID: 72}, {ID: 73}, {ID: 74}, {ID: 81}, {ID: 82}, {ID: 83}, {ID: 84}, {ID: 91}, {ID: 92}, {ID: 93}, {ID: 94}}, - queryRows: func(q Query[MRUser]) ([]MRUser, error) { + queryRows: func(q *Query[MRUser]) ([]MRUser, error) { return q.Query(context.Background(), New(). Select("users", "id"). SQL("ORDER BY id"), func(i, j MRUser) bool { @@ -534,14 +546,14 @@ func TestQuery(t *testing.T) { }, { name: "month_on_1st_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithMonths[MRUser](m202402, m202403)) }, wanted: []MRUser{ {ID: 20240201}, {ID: 20240202}, {ID: 20240203}, {ID: 20240301}, {ID: 20240302}, {ID: 20240303}, {ID: 20240304}, }, - queryRows: func(q Query[MRUser]) ([]MRUser, error) { + queryRows: func(q *Query[MRUser]) ([]MRUser, error) { return q.Query(context.Background(), New(). Select("users", "id"). Where("( id > 20240200 AND id < 20240204) OR ( id >= 20240300 AND id < 20240305)"). @@ -552,14 +564,14 @@ func TestQuery(t *testing.T) { }, { name: "month_on_6th_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithMonths[MRUser](m202402, m202403)) }, wanted: []MRUser{ {ID: 20240251}, {ID: 20240252}, {ID: 20240253}, {ID: 20240351}, {ID: 20240352}, {ID: 20240353}, {ID: 20240354}, }, - queryRows: func(q Query[MRUser]) ([]MRUser, error) { + queryRows: func(q *Query[MRUser]) ([]MRUser, error) { return q.Query(context.Background(), New(). Select("users", "id"). Where("(id > 20240250 AND id < 20240254) OR ( id >= 20240350 AND id < 20240355)"). @@ -570,14 +582,14 @@ func TestQuery(t *testing.T) { }, { name: "month_on_last_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithMonths[MRUser](m202402, m202403)) }, wanted: []MRUser{ {ID: 20240291}, {ID: 20240292}, {ID: 20240293}, {ID: 20240391}, {ID: 20240392}, {ID: 20240393}, {ID: 20240394}, }, - queryRows: func(q Query[MRUser]) ([]MRUser, error) { + queryRows: func(q *Query[MRUser]) ([]MRUser, error) { return q.Query(context.Background(), New(). Select("users", "id"). Where("(id > 20240290 AND id < 20240294) OR ( id >= 20240390 AND id < 20240395)"). @@ -588,14 +600,14 @@ func TestQuery(t *testing.T) { }, { name: "week_on_1st_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithWeeks[MRUser](w20240201, w20240208)) }, wanted: []MRUser{ {ID: 202400501}, {ID: 202400502}, {ID: 202400503}, {ID: 202400601}, {ID: 202400602}, {ID: 202400603}, }, - queryRows: func(q Query[MRUser]) ([]MRUser, error) { + queryRows: func(q *Query[MRUser]) ([]MRUser, error) { return q.Query(context.Background(), New(). Select("users", "id"). Where("(id > 202400500 AND id < 202400504) OR ( id >= 202400600 AND id < 202400604)"). @@ -606,14 +618,14 @@ func TestQuery(t *testing.T) { }, { name: "week_on_5th_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithWeeks[MRUser](w20240201, w20240208)) }, wanted: []MRUser{ {ID: 202400551}, {ID: 202400552}, {ID: 202400553}, {ID: 202400651}, {ID: 202400652}, {ID: 202400653}, }, - queryRows: func(q Query[MRUser]) ([]MRUser, error) { + queryRows: func(q *Query[MRUser]) ([]MRUser, error) { return q.Query(context.Background(), New(). Select("users", "id"). Where("(id > 202400550 AND id < 202400554) OR ( id >= 202400650 AND id < 202400654)"). @@ -624,14 +636,14 @@ func TestQuery(t *testing.T) { }, { name: "week_on_last_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithWeeks[MRUser](w20240201, w20240208)) }, wanted: []MRUser{ {ID: 202400591}, {ID: 202400592}, {ID: 202400593}, {ID: 202400691}, {ID: 202400692}, {ID: 202400693}, }, - queryRows: func(q Query[MRUser]) ([]MRUser, error) { + queryRows: func(q *Query[MRUser]) ([]MRUser, error) { return q.Query(context.Background(), New(). Select("users", "id"). Where("(id > 202400590 AND id < 202400594) OR ( id >= 202400690 AND id < 202400694)"). @@ -642,14 +654,14 @@ func TestQuery(t *testing.T) { }, { name: "day_on_1st_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithDays[MRUser](d20240201, d20240202)) }, wanted: []MRUser{ {ID: 2024020101}, {ID: 2024020102}, {ID: 2024020103}, {ID: 2024020201}, {ID: 2024020202}, {ID: 2024020203}, }, - queryRows: func(q Query[MRUser]) ([]MRUser, error) { + queryRows: func(q *Query[MRUser]) ([]MRUser, error) { return q.Query(context.Background(), New(). Select("users", "id"). Where("(id > 2024020100 AND id < 2024020104) OR ( id > 2024020200 AND id < 2024020204)"). @@ -660,14 +672,14 @@ func TestQuery(t *testing.T) { }, { name: "day_on_5th_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithDays[MRUser](d20240201, d20240202)) }, wanted: []MRUser{ {ID: 2024020151}, {ID: 2024020152}, {ID: 2024020153}, {ID: 2024020251}, {ID: 2024020252}, {ID: 2024020253}, }, - queryRows: func(q Query[MRUser]) ([]MRUser, error) { + queryRows: func(q *Query[MRUser]) ([]MRUser, error) { return q.Query(context.Background(), New(). Select("users", "id"). Where("(id > 2024020150 AND id < 2024020154) OR ( id > 2024020250 AND id < 2024020254)"). @@ -678,14 +690,14 @@ func TestQuery(t *testing.T) { }, { name: "day_on_last_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db, WithDays[MRUser](d20240201, d20240202)) }, wanted: []MRUser{ {ID: 2024020191}, {ID: 2024020192}, {ID: 2024020193}, {ID: 2024020291}, {ID: 2024020292}, {ID: 2024020293}, }, - queryRows: func(q Query[MRUser]) ([]MRUser, error) { + queryRows: func(q *Query[MRUser]) ([]MRUser, error) { return q.Query(context.Background(), New(). Select("users", "id"). Where("(id > 2024020190 AND id < 2024020194) OR ( id > 2024020290 AND id < 2024020294)"). @@ -719,12 +731,12 @@ func TestQueryLimit(t *testing.T) { wanted []MRUser wantErr error limit int - query func() Query[MRUser] - queryLimit func(q Query[MRUser], limit int) ([]MRUser, error) + query func() *Query[MRUser] + queryLimit func(q *Query[MRUser], limit int) ([]MRUser, error) }{ { name: "1st_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db, WithQueryer[MRUser](&MapR[MRUser]{ dbs: db.dbs, })) @@ -735,7 +747,7 @@ func TestQueryLimit(t *testing.T) { // {ID: 3}, }, limit: 2, - queryLimit: func(q Query[MRUser], limit int) ([]MRUser, error) { + queryLimit: func(q *Query[MRUser], limit int) ([]MRUser, error) { return q.QueryLimit(context.Background(), New(). Select("users", "id"). Where("id < 4"). @@ -746,7 +758,7 @@ func TestQueryLimit(t *testing.T) { }, { name: "3_dbs_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db) }, wanted: []MRUser{ @@ -766,7 +778,7 @@ func TestQueryLimit(t *testing.T) { // {ID: 23}, }, limit: 5, - queryLimit: func(q Query[MRUser], limit int) ([]MRUser, error) { + queryLimit: func(q *Query[MRUser], limit int) ([]MRUser, error) { return q.QueryLimit(context.Background(), New(). Select("users", "id"). Where("id < 24"). @@ -777,7 +789,7 @@ func TestQueryLimit(t *testing.T) { }, { name: "all_dbs_desc_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db) }, wanted: []MRUser{ @@ -797,7 +809,7 @@ func TestQueryLimit(t *testing.T) { {64}, {63}, {62}, {61}, }, limit: 16, - queryLimit: func(q Query[MRUser], limit int) ([]MRUser, error) { + queryLimit: func(q *Query[MRUser], limit int) ([]MRUser, error) { return q.QueryLimit(context.Background(), New(). Select("users", "id"). SQL("ORDER BY id DESC"), func(i, j MRUser) bool { @@ -808,7 +820,7 @@ func TestQueryLimit(t *testing.T) { }, { name: "month_on_1st_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithMonths[MRUser](m202402, m202403)) }, wanted: []MRUser{ @@ -821,7 +833,7 @@ func TestQueryLimit(t *testing.T) { // {ID: 20240304}, }, limit: 5, - queryLimit: func(q Query[MRUser], limit int) ([]MRUser, error) { + queryLimit: func(q *Query[MRUser], limit int) ([]MRUser, error) { return q.QueryLimit(context.Background(), New(). Select("users", "id"). Where("( id > 20240200 AND id < 20240204) OR ( id >= 20240300 AND id < 20240305)"). @@ -832,7 +844,7 @@ func TestQueryLimit(t *testing.T) { }, { name: "month_on_6th_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithMonths[MRUser](m202402, m202403)) }, wanted: []MRUser{ @@ -845,7 +857,7 @@ func TestQueryLimit(t *testing.T) { // {ID: 20240354}, }, limit: 6, - queryLimit: func(q Query[MRUser], limit int) ([]MRUser, error) { + queryLimit: func(q *Query[MRUser], limit int) ([]MRUser, error) { return q.QueryLimit(context.Background(), New(). Select("users", "id"). Where("(id > 20240250 AND id < 20240254) OR ( id >= 20240350 AND id < 20240355)"). @@ -856,7 +868,7 @@ func TestQueryLimit(t *testing.T) { }, { name: "month_on_last_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithMonths[MRUser](m202402, m202403)) }, wanted: []MRUser{ @@ -869,7 +881,7 @@ func TestQueryLimit(t *testing.T) { {ID: 20240394}, }, limit: 8, - queryLimit: func(q Query[MRUser], limit int) ([]MRUser, error) { + queryLimit: func(q *Query[MRUser], limit int) ([]MRUser, error) { return q.QueryLimit(context.Background(), New(). Select("users", "id"). Where("(id > 20240290 AND id < 20240294) OR ( id >= 20240390 AND id < 20240395)"). @@ -880,7 +892,7 @@ func TestQueryLimit(t *testing.T) { }, { name: "week_on_1st_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithWeeks[MRUser](w20240201, w20240208)) }, wanted: []MRUser{ @@ -892,7 +904,7 @@ func TestQueryLimit(t *testing.T) { {ID: 202400603}, }, limit: 6, - queryLimit: func(q Query[MRUser], limit int) ([]MRUser, error) { + queryLimit: func(q *Query[MRUser], limit int) ([]MRUser, error) { return q.QueryLimit(context.Background(), New(). Select("users", "id"). Where("(id > 202400500 AND id < 202400504) OR ( id >= 202400600 AND id < 202400604)"). @@ -903,7 +915,7 @@ func TestQueryLimit(t *testing.T) { }, { name: "week_on_5th_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithWeeks[MRUser](w20240201, w20240208)) }, wanted: []MRUser{ @@ -915,7 +927,7 @@ func TestQueryLimit(t *testing.T) { // {ID: 202400653}, }, limit: 5, - queryLimit: func(q Query[MRUser], limit int) ([]MRUser, error) { + queryLimit: func(q *Query[MRUser], limit int) ([]MRUser, error) { return q.QueryLimit(context.Background(), New(). Select("users", "id"). Where("(id > 202400550 AND id < 202400554) OR ( id >= 202400650 AND id < 202400654)"). @@ -926,7 +938,7 @@ func TestQueryLimit(t *testing.T) { }, { name: "week_on_last_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithWeeks[MRUser](w20240201, w20240208)) }, wanted: []MRUser{ @@ -938,7 +950,7 @@ func TestQueryLimit(t *testing.T) { {ID: 202400693}, }, limit: 8, - queryLimit: func(q Query[MRUser], limit int) ([]MRUser, error) { + queryLimit: func(q *Query[MRUser], limit int) ([]MRUser, error) { return q.QueryLimit(context.Background(), New(). Select("users", "id"). Where("(id > 202400590 AND id < 202400594) OR ( id >= 202400690 AND id < 202400694)"). @@ -949,7 +961,7 @@ func TestQueryLimit(t *testing.T) { }, { name: "day_desc_on_1st_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithDays[MRUser](d20240201, d20240202)) }, wanted: []MRUser{ @@ -961,7 +973,7 @@ func TestQueryLimit(t *testing.T) { {ID: 2024020203}, }, limit: 1, - queryLimit: func(q Query[MRUser], limit int) ([]MRUser, error) { + queryLimit: func(q *Query[MRUser], limit int) ([]MRUser, error) { return q.QueryLimit(context.Background(), New(). Select("users", "id"). Where("(id > 2024020100 AND id < 2024020104) OR ( id > 2024020200 AND id < 2024020204)"). @@ -972,7 +984,7 @@ func TestQueryLimit(t *testing.T) { }, { name: "day_on_5th_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery(db, WithDays[MRUser](d20240201, d20240202)) }, wanted: []MRUser{ @@ -984,7 +996,7 @@ func TestQueryLimit(t *testing.T) { // {ID: 2024020253}, }, limit: 4, - queryLimit: func(q Query[MRUser], limit int) ([]MRUser, error) { + queryLimit: func(q *Query[MRUser], limit int) ([]MRUser, error) { return q.QueryLimit(context.Background(), New(). Select("users", "id"). Where("(id > 2024020150 AND id < 2024020154) OR ( id > 2024020250 AND id < 2024020254)"). @@ -995,7 +1007,7 @@ func TestQueryLimit(t *testing.T) { }, { name: "day_on_last_db_should_work", - query: func() Query[MRUser] { + query: func() *Query[MRUser] { return NewQuery[MRUser](db, WithDays[MRUser](d20240201, d20240202)) }, wanted: []MRUser{ @@ -1007,7 +1019,7 @@ func TestQueryLimit(t *testing.T) { // {ID: 2024020293}, }, limit: 3, - queryLimit: func(q Query[MRUser], limit int) ([]MRUser, error) { + queryLimit: func(q *Query[MRUser], limit int) ([]MRUser, error) { return q.QueryLimit(context.Background(), New(). Select("users", "id"). Where("(id > 2024020190 AND id < 2024020194) OR ( id > 2024020290 AND id < 2024020294)"). diff --git a/shardid/dht.go b/shardid/dht.go new file mode 100644 index 0000000..5584502 --- /dev/null +++ b/shardid/dht.go @@ -0,0 +1,110 @@ +package shardid + +import ( + "errors" + "slices" + "sync" +) + +var ErrItemIsBusy = errors.New("sqle: item is busy, waiting for scaling done") + +// DHT distributed hash table +type DHT struct { + sync.RWMutex + current *HashRing + next *HashRing + + dbsCount int + dbs map[int]int + + affectedDbs []int + affectedVNodes map[uint32]bool +} + +// NewDHT create a distributed hash table between databases +func NewDHT(dbs ...int) *DHT { + m := &DHT{ + dbs: map[int]int{}, + dbsCount: len(dbs), + affectedVNodes: make(map[uint32]bool), + } + + for i, db := range dbs { + m.dbs[i] = db + } + + m.current = NewHR(m.dbsCount, WithReplicas(defaultReplicas...)) + + return m +} + +// On locate database with v from current/next HashRing, return ErrItemIsBusy if it is on affected database +func (m *DHT) On(v string) (int, int, error) { + m.RLock() + defer m.RUnlock() + + i, n := m.current.On(v) + + current := m.dbs[i] + + ok := m.affectedVNodes[n] + if ok { + n, _ := m.next.On(v) + if n == i { + return current, current, nil + } + + return current, m.dbs[n], ErrItemIsBusy + } + + return current, current, nil +} + +// Done dbs are added, then reset current/next HashRing +func (m *DHT) Done() { + m.Lock() + defer m.Unlock() + + m.affectedDbs = nil + m.affectedVNodes = make(map[uint32]bool) + m.current = m.next + m.next = nil +} + +// Add dynamically add databases, and return affected database +func (m *DHT) Add(dbs ...int) []int { + m.Lock() + defer m.Unlock() + + for i, db := range dbs { + m.dbs[m.dbsCount+i] = db + } + + m.dbsCount += len(dbs) + m.next = NewHR(m.dbsCount, WithReplicas(defaultReplicas...)) + var ( + db1 int + db2 int + ) + + affectedDbs := make(map[int]bool) + + for _, v := range m.current.vNodes { + db1 = m.current.getPreviousDB(v) + db2 = m.next.getPreviousDB(v) + + if db1 != db2 { // the node's previous db is changed, data should be checked if it should be migrated to previous db + affectedDbs[m.current.dbs[v]] = true + m.affectedVNodes[v] = true + } + } + + if len(affectedDbs) > 0 { + for k := range affectedDbs { + m.affectedDbs = append(m.affectedDbs, k) + } + slices.Sort(m.affectedDbs) + } + + return m.affectedDbs +} diff --git a/shardid/dht_test.go b/shardid/dht_test.go new file mode 100644 index 0000000..2a25472 --- /dev/null +++ b/shardid/dht_test.go @@ -0,0 +1,127 @@ +package shardid + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDHT(t *testing.T) { + + // 2 dbs -> 3 dbs -> data + // -> 2439456 1149 + // 46916880 E0 0 ! + // 63694499 E1 1 + // <- 80472118 E2 2 + // -> 83143427 3850 + // <- 84017712 S2 2 + // -> 111074370 638 + // 117572950 S0 0 ! + // 134350569 S1 1 + // <- 214987260 G2 2 + // 248542498 G0 0 ! + // 265320117 G1 1 + // 316638712 M0 0 + // 333416331 M1 1 + // <- 350193950 M2 2 + // <- 351179688 K2 2 + // 384734926 K0 0 ! + // 401512545 K1 1 + // <- 484709092 O2 2 + // 518264330 O0 0 ! + // 535041949 O1 1 + // <- 2228889920 C2 2 + // 2262445158 C0 0 ! + // 2279222777 C1 1 + // 2330541372 I0 0 + // 2347318991 I1 1 + // <- 2364096610 I2 2 + // 2597703348 A0 0 ! + // 2600263204 Q0 0 + // 2614480967 A1 1 + // 2617040823 Q1 1 + // <- 2631258586 A2 2 + // <- 2633818442 Q2 2 + // -> 4113327457 150 + + m := NewDHT(1, 2) + m.Add(3) + + vn := map[uint32]bool{ + 46916880: true, // E0 + 117572950: true, // S0 + 248542498: true, // G0 + 384734926: true, // K0 + 518264330: true, // O0 + 2262445158: true, // C0 + 2597703348: true, // A0 + } + require.Equal(t, vn, m.affectedVNodes) + require.Equal(t, []int{0}, m.affectedDbs) + + // vNode E0 is affected, but 1149 is unnecessary to move + cur, next, err := m.On("1149") + require.Equal(t, 1, cur) + require.Equal(t, 1, next) + require.NoError(t, err) // < E0! => E0! first node + + // vNode S0 is affected, and 3850 should be moved from S0 to S2 + cur, next, err = m.On("3850") + require.Equal(t, 1, cur) + require.Equal(t, 3, next) + require.ErrorIs(t, err, ErrItemIsBusy) // < S0! => S2 + + // vNode S0 is affected, but 638 is unnecessary to move + cur, next, err = m.On("638") + require.Equal(t, 1, cur) + require.Equal(t, 1, next) + require.NoError(t, err) // S0! => S0! + + // vNode E1 is not affected + cur, next, err = m.On("E0") // v equals E0, and on vNode E1 + require.Equal(t, 2, cur) + require.Equal(t, 2, next) + require.Nil(t, err) // == E1 => E1 + + // vNode S0 is affected, and E1 should be moved from S0 to E2 + cur, next, err = m.On("E1") + require.Equal(t, 1, cur) + require.Equal(t, 3, next) + require.ErrorIs(t, err, ErrItemIsBusy) // == E1 => S0! + + // vNode S1 is not affected + cur, next, err = m.On("S0") + require.Equal(t, 2, cur) + require.Equal(t, 2, next) + require.Nil(t, err) // == S0! => S1 + + // vNode C1 is not affected + cur, next, err = m.On("C0") + require.Equal(t, 2, cur) + require.Equal(t, 2, next) + require.Nil(t, err) // == C0! => C1 + + // vNode I0 is not affected + cur, next, err = m.On("C1") + require.Equal(t, 1, cur) + require.Equal(t, 1, next) + require.Nil(t, err) // == C1 => I0 + + // vNode E0 is affected, but 150 is unnecessary to move from E0 to Q2 + cur, next, err = m.On("150") + require.Equal(t, 1, cur) + require.Equal(t, 1, next) + require.Nil(t, err) // > Q1 last node => E0! + + m.Done() + + cur, next, err = m.On("E1") + require.Equal(t, 3, cur) + require.Equal(t, 3, next) + require.Nil(t, err) + + cur, next, err = m.On("150") + require.Equal(t, 1, cur) + require.Equal(t, 1, next) + require.Nil(t, err) +} diff --git a/shardid/hash_ring.go b/shardid/hash_ring.go new file mode 100644 index 0000000..21ac1f6 --- /dev/null +++ b/shardid/hash_ring.go @@ -0,0 +1,90 @@ +package shardid + +import ( + "hash/fnv" + "slices" + "strconv" +) + +var ( + defaultReplicas = []string{"A", "C", "E", "G", "I", "K", "M", "O", "Q", "S"} +) + +// HashRing implement consistent hashing for database sharding with hash key +type HashRing struct { + dbCount int + dbs map[uint32]int + + vnCount int + vNodes []uint32 + + replicas []string +} + +// NewHR create HashRing with n dbs and virtual nodes +func NewHR(n int, options ...HashRingOption) *HashRing { + r := &HashRing{ + dbCount: n, + dbs: make(map[uint32]int), + } + + for _, o := range options { + o(r) + } + + if len(r.replicas) == 0 { + r.replicas = defaultReplicas + } + + r.vnCount = n * len(r.replicas) + + for i := 0; i < n; i++ { + for _, v := range r.replicas { + k := getHash(v + strconv.Itoa(i)) + r.dbs[k] = i + r.vNodes = append(r.vNodes, k) + } + } + + slices.Sort(r.vNodes) + + return r +} + +// On locate db and vNode for data v +func (r *HashRing) On(v string) (int, uint32) { + k := getHash(v) + + var found uint32 + for i, n := range r.vNodes { + if n > k { + found = r.vNodes[i] + break + } + } + + if found == 0 { + found = r.vNodes[0] + } + + return r.dbs[found], found +} + +// getPreviousDB get previous db for node v +func (r *HashRing) getPreviousDB(v uint32) int { + i, _ := slices.BinarySearch(r.vNodes, v) + + // first node, its previous node is last one + if i == 0 { + return r.dbs[r.vNodes[r.vnCount-1]] + } + + return r.dbs[r.vNodes[i-1]] +} + +// getHash get hash for data v +func getHash(v string) uint32 { + h := fnv.New32a() + h.Write([]byte(v)) // nolint: errcheck + return h.Sum32() +} diff --git a/shardid/hash_ring_option.go b/shardid/hash_ring_option.go new file mode 100644 index 0000000..a0de085 --- /dev/null +++ b/shardid/hash_ring_option.go @@ -0,0 +1,9 @@ +package shardid + +type HashRingOption func(r *HashRing) + +func WithReplicas(nodes ...string) HashRingOption { + return func(r *HashRing) { + r.replicas = nodes + } +} diff --git a/shardid/hash_ring_test.go b/shardid/hash_ring_test.go new file mode 100644 index 0000000..f6d5850 --- /dev/null +++ b/shardid/hash_ring_test.go @@ -0,0 +1,184 @@ +package shardid + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestHR(t *testing.T) { + + // Hash i + // 46916880 E0 0 + // 63694499 E1 1 + // 80472118 E2 2 + // 84017712 S2 2 + // 97249737 E3 3 + // 100795331 S3 3 + // 114027356 E4 4 + // 117572950 S0 0 + // 134350569 S1 1 + // 181432022 G4 4 + // 184683426 S4 4 + // 214987260 G2 2 + // 231764879 G3 3 + // 248542498 G0 0 + // 265320117 G1 1 + // 316638712 M0 0 + // 333416331 M1 1 + // 350193950 M2 2 + // 351179688 K2 2 + // 366971569 M3 3 + // 367957307 K3 3 + // 383749188 M4 4 + // 384734926 K0 0 + // 401512545 K1 1 + // 451153854 O4 4 + // 451845402 K4 4 + // 484709092 O2 2 + // 501486711 O3 3 + // 518264330 O0 0 + // 535041949 O1 1 + // 2228889920 C2 2 + // 2245667539 C3 3 + // 2262445158 C0 0 + // 2263430896 I4 4 + // 2279222777 C1 1 + // 2329555634 C4 4 + // 2330541372 I0 0 + // 2347318991 I1 1 + // 2364096610 I2 2 + // 2380874229 I3 3 + // 2530592872 A4 4 + // 2533152728 Q4 4 + // 2597703348 A0 0 + // 2600263204 Q0 0 + // 2614480967 A1 1 + // 2617040823 Q1 1 + // 2631258586 A2 2 + // 2633818442 Q2 2 + // 2648036205 A3 3 + // 2650596061 Q3 3 + + // 4294967295 MaxUint32 + + tests := []struct { + name string + dbs int + data []string + wantedDbs []int + wantedNodes []uint32 + }{ + // 2439456 1149 <- + // 46916880 E0 0 <- + // 117572950 S0 0 + // 248542498 G0 0 + // 316638712 M0 0 + // 384734926 K0 0 + // 518264330 O0 0 + // 2262445158 C0 0 + // 2329555634 C4 <- + // 2330541372 I0 0 + // 2597703348 A0 0 + // 2600263204 Q0 0 <- + // 4113327457 150 <- + { + name: "single_db_should_work", + dbs: 1, + data: []string{ + "1149", // 2439456 < E0 + "E0", // == E0 + "C4", // E0 < 2329555634 < Q0 + "Q0", // == Q0 + "150", // 2631258586 > Q0 + }, + wantedDbs: []int{ + 0, + 0, + 0, + 0, + 0, + }, + wantedNodes: []uint32{ + 46916880, // E0 + 117572950, // S0 + 2330541372, // I0 + 46916880, // E0 + 46916880, // E0 + }, + }, + // 2439456 1149 <- + // 46916880 E0 0 <- + // 63694499 E1 1 + // 80472118 E2 2 + // 84017712 S2 2 + // 117572950 S0 0 + // 134350569 S1 1 + // 214987260 G2 2 + // 248542498 G0 0 + // 265320117 G1 1 + // 316638712 M0 0 + // 333416331 M1 1 + // 350193950 M2 2 + // 351179688 K2 2 + // 384734926 K0 0 + // 401512545 K1 1 + // 484709092 O2 2 + // 518264330 O0 0 + // 535041949 O1 1 + // 2228889920 C2 2 + // 2262445158 C0 0 + // 2279222777 C1 1 + // 2329555634 C4 <- + // 2330541372 I0 0 + // 2347318991 I1 1 + // 2364096610 I2 2 + // 2597703348 A0 0 + // 2600263204 Q0 0 + // 2614480967 A1 1 + // 2617040823 Q1 1 + // 2631258586 A2 2 + // 2633818442 Q2 2 <- + // 4113327457 150 <- + + { + name: "multi_dbs_should_work", + dbs: 3, + data: []string{ + "1149", // 2439456 < E0 + "E0", // == E0 + "C4", // E0 < 2329555634 < Q2 + "Q2", // == Q2 + "150", // 2631258586 > Q0 + }, + wantedDbs: []int{ + 0, + 1, + 0, + 0, + 0, + }, + wantedNodes: []uint32{ + 46916880, // E0 + 63694499, // E1 + 2330541372, // I0 + 46916880, // E0 + 46916880, // E0 + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + as := require.New(t) + hr := NewHR(test.dbs) + + for i, v := range test.data { + d, vn := hr.On(v) + as.Equal(test.wantedDbs[i], d, v) + as.Equal(test.wantedNodes[i], vn, v) + } + }) + } + +} diff --git a/shardid/id.go b/shardid/id.go index 2076de3..f960b50 100644 --- a/shardid/id.go +++ b/shardid/id.go @@ -36,6 +36,7 @@ const ( MaxTimeMillis int64 = -1 ^ (-1 << TimeMillisBits) ) +// TableRotate table rotation option type TableRotate int8 var ( @@ -45,6 +46,7 @@ var ( DailyRotate TableRotate = 3 ) +// ID shardid info type ID struct { Time time.Time Int64 int64 @@ -57,6 +59,7 @@ type ID struct { TableRotate TableRotate } +// RotateName format time parts as rotated table name suffix func (i *ID) RotateName() string { // skipcq: GO-W1029 switch i.TableRotate { case DailyRotate: