Skip to content

Commit

Permalink
Faster pool implementation based on Redis client.
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed May 10, 2015
1 parent 6fd505f commit 019b5fd
Show file tree
Hide file tree
Showing 11 changed files with 428 additions and 309 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go:

install:
- go get gopkg.in/bufio.v1
- go get gopkg.in/bsm/ratelimit.v1
- go get github.com/golang/glog
- go get github.com/go-sql-driver/mysql
- go get github.com/lib/pq
Expand Down
9 changes: 4 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type conn struct {
elem *list.Element
}

func newConnFunc(opt *Options) func() (*conn, error) {
func newConnDialer(opt *Options) func() (*conn, error) {
return func() (*conn, error) {
netcn, err := dial(opt)
if err != nil {
Expand All @@ -51,6 +51,9 @@ func newConnFunc(opt *Options) func() (*conn, error) {
if err := cn.Startup(); err != nil {
return nil, err
}
if err := setParams(cn, opt.Params); err != nil {
return nil, err
}
return cn, nil
}
}
Expand All @@ -60,10 +63,6 @@ func (cn *conn) GenId() string {
return strconv.FormatInt(cn._id, 10)
}

func (cn *conn) IsIdle(dur time.Duration) bool {
return time.Since(cn.usedAt) > dur
}

func (cn *conn) SetReadTimeout(dur time.Duration) {
if dur == 0 {
cn.cn.SetReadDeadline(zeroTime)
Expand Down
28 changes: 18 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type Options struct {
// Params specify connection run-time configuration parameters.
Params map[string]interface{}

PoolSize int
PoolSize int
PoolTimeout time.Duration

DialTimeout time.Duration
ReadTimeout time.Duration
Expand Down Expand Up @@ -89,6 +90,13 @@ func (opt *Options) getPoolSize() int {
return opt.PoolSize
}

func (opt *Options) getPoolTimeout() time.Duration {
if opt == nil || opt.PoolTimeout == 0 {
return 3 * time.Second
}
return opt.PoolTimeout
}

func (opt *Options) getDialTimeout() time.Duration {
if opt.DialTimeout == 0 {
return 5 * time.Second
Expand All @@ -113,8 +121,14 @@ func (opt *Options) getSSL() bool {

func Connect(opt *Options) *DB {
return &DB{
opt: opt,
pool: newConnPool(opt),
opt: opt,
pool: newConnPool(&connPoolOptions{
Dialer: newConnDialer(opt),
PoolSize: opt.getPoolSize(),
PoolTimeout: opt.getPoolTimeout(),
IdleTimeout: opt.getIdleTimeout(),
IdleCheckFrequency: opt.getIdleCheckFrequency(),
}),
}
}

Expand All @@ -129,17 +143,11 @@ func (db *DB) Close() error {
}

func (db *DB) conn() (*conn, error) {
cn, isNew, err := db.pool.Get()
cn, err := db.pool.Get()
if err != nil {
return nil, err
}

if isNew {
if err := setParams(cn, db.opt.Params); err != nil {
return nil, err
}
}

cn.SetReadTimeout(db.opt.ReadTimeout)
cn.SetWriteTimeout(db.opt.WriteTimeout)
return cn, nil
Expand Down
2 changes: 2 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ func ExampleListener() {

wait := make(chan struct{})
go func() {
wait <- struct{}{}
channel, payload, err := ln.Receive()
fmt.Printf("%s %q %v", channel, payload, err)
wait <- struct{}{}
}()

<-wait
db.Exec("NOTIFY mychan, ?", "hello world")
<-wait

Expand Down
70 changes: 52 additions & 18 deletions listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,58 @@ func (t *ListenerTest) TearDownTest(c *C) {
}

func (t *ListenerTest) TestListenNotify(c *C) {
wait := make(chan struct{}, 1)
go func() {
wait <- struct{}{}
channel, payload, err := t.ln.Receive()
c.Assert(err, IsNil)
c.Assert(channel, Equals, "test_channel")
c.Assert(payload, Equals, "")
wait <- struct{}{}
}()

select {
case <-wait:
// ok
case <-time.After(3 * time.Second):
c.Fatal("timeout")
}

_, err := t.db.Exec("NOTIFY test_channel")
c.Assert(err, IsNil)

channel, payload, err := t.ln.Receive()
c.Assert(err, IsNil)
c.Assert(channel, Equals, "test_channel")
c.Assert(payload, Equals, "")
select {
case <-wait:
// ok
case <-time.After(3 * time.Second):
c.Fatal("timeout")
}
}

func (t *ListenerTest) TestCloseAbortsListener(c *C) {
done := make(chan struct{})
wait := make(chan struct{}, 1)
go func() {
wait <- struct{}{}
_, _, err := t.ln.Receive()
c.Assert(err.Error(), Equals, "read tcp 127.0.0.1:5432: use of closed network connection")
close(done)
c.Assert(err, ErrorMatches, `^(.*use of closed network connection|EOF)$`)
wait <- struct{}{}
}()

select {
case <-done:
c.Fail()
case <-time.After(1 * time.Second):
case <-wait:
// ok
case <-time.After(3 * time.Second):
c.Fatal("timeout")
}

c.Assert(t.ln.Close(), IsNil)
<-done

select {
case <-wait:
// ok
case <-time.After(3 * time.Second):
c.Fatal("timeout")
}
}

func (t *ListenerTest) TestListenTimeout(c *C) {
Expand All @@ -74,7 +100,7 @@ func (t *ListenerTest) TestReconnectOnListenError(c *C) {
c.Assert(cn.Close(), IsNil)

err := t.ln.Listen("test_channel2")
c.Assert(err.Error(), Equals, "use of closed network connection")
c.Assert(err, ErrorMatches, `^(.*use of closed network connection|EOF)$`)

err = t.ln.Listen("test_channel2")
c.Assert(err, IsNil)
Expand All @@ -86,25 +112,33 @@ func (t *ListenerTest) TestReconnectOnReceiveError(c *C) {
c.Assert(cn.Close(), IsNil)

_, _, err := t.ln.ReceiveTimeout(time.Second)
c.Assert(err.Error(), Equals, "use of closed network connection")
c.Assert(err, ErrorMatches, `^(.*use of closed network connection|EOF)$`)

_, _, err = t.ln.ReceiveTimeout(time.Second)
c.Assert(err.(net.Error).Timeout(), Equals, true)

done := make(chan struct{})
wait := make(chan struct{}, 1)
go func() {
wait <- struct{}{}
_, _, err := t.ln.Receive()
c.Assert(err, IsNil)
close(done)
wait <- struct{}{}
}()

select {
case <-wait:
// ok
case <-time.After(3 * time.Second):
c.Fatal("timeout")
}

_, err = t.db.Exec("NOTIFY test_channel")
c.Assert(err, IsNil)

select {
case <-done:
case <-wait:
// ok
case <-time.After(1 * time.Second):
c.Fail()
case <-time.After(3 * time.Second):
c.Fatal("timeout")
}
}
6 changes: 4 additions & 2 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ func TestUnixSocket(t *testing.T) {
User: "postgres",
Database: "test",
})
_, err := db.Exec("SELECT 1")
defer db.Close()

_, err := db.Exec("SELECT 'test_unix_socket'")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -65,7 +67,7 @@ func (t *DBTest) TestQueryOneErrMultiRows(c *C) {
}

func (t *DBTest) TestExecOne(c *C) {
res, err := t.db.ExecOne("SELECT 1")
res, err := t.db.ExecOne("SELECT 'test_exec_one'")
c.Assert(err, IsNil)
c.Assert(res.Affected(), Equals, 1)
}
Expand Down
Loading

0 comments on commit 019b5fd

Please sign in to comment.