Skip to content

Commit

Permalink
feat(shardid): added DHT for sharding (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
cnlangzi authored Apr 6, 2024
1 parent 1acbdae commit 2acb436
Show file tree
Hide file tree
Showing 13 changed files with 937 additions and 116 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [1.4.0] - 2014-04-06
### Added
- added DHT/HashRing in shardid (#30)
- added NewDHT/DHTAdd/DHTAdded/OnDHT on db (#30)

## [1.3.2] - 2014-03-28
### Added
- added module name in migration (#29)

## [1.3.1] - 2014-03-19
### Added
- added `Duration` to support `Sacnner` and `Valuer` in sql driver (#27)
Expand Down
2 changes: 1 addition & 1 deletion context.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Context struct {

index int
stmts map[string]*cachedStmt
stmtsMutex sync.RWMutex
stmtsMutex sync.Mutex
}

func (db *Context) Query(query string, args ...any) (*Rows, error) {
Expand Down
11 changes: 4 additions & 7 deletions context_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ type cachedStmt struct {
}

func (db *Context) prepareStmt(ctx context.Context, query string) (*sql.Stmt, error) {
db.stmtsMutex.RLock()
db.stmtsMutex.Lock()
defer db.stmtsMutex.Unlock()
s, ok := db.stmts[query]
db.stmtsMutex.RUnlock()

if ok {
s.Lock()
s.lastUsed = time.Now()
s.Unlock()
return s.stmt, nil
}

Expand All @@ -29,19 +28,17 @@ func (db *Context) prepareStmt(ctx context.Context, query string) (*sql.Stmt, er
return nil, err
}

db.stmtsMutex.Lock()
db.stmts[query] = &cachedStmt{
stmt: stmt,
lastUsed: time.Now(),
}
db.stmtsMutex.Unlock()

return stmt, nil
}

func (db *Context) closeIdleStmt() {
for {
<-time.After(1 * time.Minute)
<-time.After(StmtMaxIdleTime)

db.stmtsMutex.Lock()
lastActive := time.Now().Add(-1 * time.Minute)
Expand Down
82 changes: 82 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,24 @@ package sqle

import (
"database/sql"
"errors"
"sync"
"time"

"github.com/yaitoo/sqle/shardid"
)

var (
StmtMaxIdleTime = 1 * time.Minute
ErrMissingDHT = errors.New("sqle: DHT is missing")
)

type DB struct {
*Context
_ noCopy //nolint: unused

mu sync.RWMutex
dht *shardid.DHT
dbs []*Context
}

Expand All @@ -34,6 +44,78 @@ func Open(dbs ...*sql.DB) *DB {
return d
}

// Add dynamically scale out DB with new databases
func (db *DB) Add(dbs ...*sql.DB) {
db.Lock()
defer db.Unlock()

n := len(db.dbs)

for i, d := range dbs {
ctx := &Context{
DB: d,
index: n + i,
stmts: make(map[string]*cachedStmt),
}
db.dbs = append(db.dbs, ctx)
go ctx.closeIdleStmt()
}
}

// On select database from shardid.ID
func (db *DB) On(id shardid.ID) *Context {
db.mu.RLock()
defer db.mu.RUnlock()

return db.dbs[int(id.DatabaseID)]
}

// NewDHT create new DTH with databases
func (db *DB) NewDHT(dbs ...int) {
db.mu.Lock()
defer db.mu.Unlock()

db.dht = shardid.NewDHT(dbs...)
}

// DHTAdd add new databases into current DHT
func (db *DB) DHTAdd(dbs ...int) {
db.Lock()
defer db.Unlock()
if db.dht == nil {
return
}
db.dht.Add(dbs...)
}

// DHTAdded get databases added on current DHT, and reload it.
func (db *DB) DHTAdded() {
db.Lock()
defer db.Unlock()
if db.dht == nil {
return
}
db.dht.Done()
}

// OnDHT select database from DHT
func (db *DB) OnDHT(key string) (*Context, error) {
db.mu.RLock()
defer db.mu.RUnlock()

if len(db.dbs) == 1 {
return db.dbs[0], nil
}

if db.dht == nil {
return nil, ErrMissingDHT
}

cur, _, err := db.dht.On(key)

if err != nil {
return nil, err

}
return db.dbs[cur], nil
}
199 changes: 198 additions & 1 deletion db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func createSQLite3() *sql.DB {
return db
}

func TestSharding(t *testing.T) {
func TestOn(t *testing.T) {
dbs := make([]*sql.DB, 0, 10)

for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -82,3 +82,200 @@ func TestSharding(t *testing.T) {
}

}

func TestDHT(t *testing.T) {
db := Open(createSQLite3())

// always work if only single server
ctx, err := db.OnDHT("")
require.Equal(t, 0, ctx.index)
require.Nil(t, err)

// MUST NOT panic even DHT is missing
db.DHTAdd(1)
db.DHTAdded()

db.Add(createSQLite3())

ctx, err = db.OnDHT("")
require.ErrorIs(t, err, ErrMissingDHT)
require.Nil(t, ctx)
}

func TestOnDHT(t *testing.T) {
dbs := make([]*sql.DB, 0, 10)

for i := 0; i < 10; i++ {
db3 := createSQLite3()

db3.Exec("CREATE TABLE `dht` (`v` varchar(50), PRIMARY KEY (`v`))") // nolint: errcheck

dbs = append(dbs, db3)
}

db := Open(dbs...)

// 2 dbs -> 3 dbs -> data
// -> 2439456 1149
// 46916880 E0 0 !
// 63694499 E1 1
// <- 80472118 E2 2
// <- 84017712 S2 2
// -> 111074370 638
// 117572950 S0 0 !
// 134350569 S1 1
// <- 214987260 G2 2
// 248542498 G0 0 !
// 265320117 G1 1
// 316638712 M0 0
// 333416331 M1 1
// <- 350193950 M2 2
// <- 351179688 K2 2
// 384734926 K0 0 !
// 401512545 K1 1
// <- 484709092 O2 2
// 518264330 O0 0 !
// 535041949 O1 1
// <- 2228889920 C2 2
// 2262445158 C0 0 !
// 2279222777 C1 1
// 2330541372 I0 0
// 2347318991 I1 1
// <- 2364096610 I2 2
// 2597703348 A0 0 !
// 2600263204 Q0 0
// 2614480967 A1 1
// 2617040823 Q1 1
// <- 2631258586 A2 2
// <- 2633818442 Q2 2
// -> 4113327457 150

db.NewDHT(1, 2)

values := map[string]int{
"1149": 1,
"S0": 2,
"I2": 1,
}

for v, i := range values {

b := New().Insert("dht").
Set("v", v).
End()

c, err := db.OnDHT(v)
require.NoError(t, err)
require.Equal(t, i, c.index)

result, err := c.ExecBuilder(context.TODO(), b)

require.NoError(t, err)
rows, err := result.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rows)
}

for v, i := range values {
b := New().Select("dht", "v").Where("v = {v}").Param("v", v)

ctx := db.dbs[i]

var val string
err := ctx.QueryRowBuilder(context.TODO(), b).Scan(&val)
require.NoError(t, err)
require.Equal(t, v, val)
}
}

func TestDHTScaling(t *testing.T) {
dbs := make([]*sql.DB, 0, 10)

for i := 0; i < 10; i++ {
db3 := createSQLite3()

db3.Exec("CREATE TABLE `dht` (`v` varchar(50), PRIMARY KEY (`v`))") // nolint: errcheck

dbs = append(dbs, db3)
}

db := Open(dbs...)

// 2 dbs -> 3 dbs -> data
// -> 2439456 1149
// 46916880 E0 0 !
// 63694499 E1 1
// <- 80472118 E2 2
// -> 83143427 3850
// <- 84017712 S2 2
// -> 111074370 638
// 117572950 S0 0 !
// 134350569 S1 1
// <- 214987260 G2 2
// 248542498 G0 0 !
// 265320117 G1 1
// 316638712 M0 0
// 333416331 M1 1
// <- 350193950 M2 2
// <- 351179688 K2 2
// 384734926 K0 0 !
// 401512545 K1 1
// <- 484709092 O2 2
// 518264330 O0 0 !
// 535041949 O1 1
// <- 2228889920 C2 2
// 2262445158 C0 0 !
// 2279222777 C1 1
// 2330541372 I0 0
// 2347318991 I1 1
// <- 2364096610 I2 2
// 2597703348 A0 0 !
// 2600263204 Q0 0
// 2614480967 A1 1
// 2617040823 Q1 1
// <- 2631258586 A2 2
// <- 2633818442 Q2 2
// -> 4113327457 150

db.NewDHT(0, 1)

type item struct {
current int
busy bool
next int
}

values := make(map[string]item)

values["1149"] = item{current: 0, busy: false, next: 0} // Busy
values["E1"] = item{current: 0, busy: true, next: 2} // move from S0 to E2
values["3850"] = item{current: 0, busy: true, next: 2} // move from S0 to S2
values["638"] = item{current: 0, busy: false, next: 0} // keep on S0
values["150"] = item{current: 0, busy: false, next: 0} // keep on E0

for v, it := range values {
ctx, err := db.OnDHT(v)
require.NoError(t, err)
require.Equal(t, it.current, ctx.index)
}

db.DHTAdd(2)

for v, it := range values {
ctx, err := db.OnDHT(v)
if it.busy {
require.ErrorIs(t, err, shardid.ErrItemIsBusy)
} else {
require.NoError(t, err)
require.Equal(t, it.current, ctx.index)
}

}

db.DHTAdded()
for v, it := range values {
ctx, err := db.OnDHT(v)
require.NoError(t, err)
require.Equal(t, it.next, ctx.index)
}
}
6 changes: 3 additions & 3 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ type Query[T any] struct {
}

// NewQuery create a Query
func NewQuery[T any](db *DB, options ...QueryOption[T]) Query[T] {
q := Query[T]{
func NewQuery[T any](db *DB, options ...QueryOption[T]) *Query[T] {
q := &Query[T]{
db: db,
}

for _, opt := range options {
if opt != nil {
opt(&q)
opt(q)
}
}

Expand Down
Loading

0 comments on commit 2acb436

Please sign in to comment.