Skip to content

Commit

Permalink
New ConnectionPool with ErrorHandler method added.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tristan Hyams committed Aug 31, 2020
1 parent 6de146f commit 86efac9
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 17 deletions.
11 changes: 9 additions & 2 deletions v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,16 @@ require (
github.com/Workiva/go-datastructures v1.0.52
github.com/fortytw2/leaktest v1.3.0
github.com/json-iterator/go v1.1.10
github.com/klauspost/compress v1.10.10
github.com/klauspost/compress v1.10.11
github.com/kr/text v0.2.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.6.1
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
golang.org/x/sys v0.0.0-20200831180312-196b9ba8737a // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
)
26 changes: 22 additions & 4 deletions v2/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
github.com/Workiva/go-datastructures v1.0.52 h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9abU0yMQt0NI=
github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -8,12 +9,23 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/klauspost/compress v1.10.10 h1:a/y8CglcM7gLGYmlbP/stPE5sR3hbhFRUjCBfd/0B3I=
github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.10.11 h1:K9z59aO18Aywg2b/WSgBaUX99mHy2BES18Cr5lBKZHk=
github.com/klauspost/compress v1.10.11/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 h1:lNCW6THrCKBiJBpz8kbVGjC7MgdCGKwuvBgc7LoD6sw=
github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -25,14 +37,20 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200831180312-196b9ba8737a h1:i47hUS795cOydZI4AwJQCKXOr4BvxzvikwDoDtHhP2Y=
golang.org/x/sys v0.0.0-20200831180312-196b9ba8737a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
77 changes: 66 additions & 11 deletions v2/pkg/tcr/connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type ConnectionPool struct {
poolRWLock *sync.RWMutex
flaggedConnections map[uint64]bool
sleepOnErrorInterval time.Duration
errorHandler func(error)
}

// NewConnectionPool creates hosting structure for the ConnectionPool.
Expand Down Expand Up @@ -54,6 +55,37 @@ func NewConnectionPool(config *PoolConfig) (*ConnectionPool, error) {
return cp, nil
}

// NewConnectionPoolWithErrorHandler creates hosting structure for the ConnectionPool.
func NewConnectionPoolWithErrorHandler(config *PoolConfig, errorHandler func(error)) (*ConnectionPool, error) {

if config.Heartbeat == 0 || config.ConnectionTimeout == 0 {
return nil, errors.New("connectionpool heartbeat or connectiontimeout can't be 0")
}

if config.MaxConnectionCount == 0 {
return nil, errors.New("connectionpool maxconnectioncount can't be 0")
}

cp := &ConnectionPool{
Config: *config,
uri: config.URI,
heartbeatInterval: time.Duration(config.Heartbeat) * time.Second,
connectionTimeout: time.Duration(config.ConnectionTimeout) * time.Second,
connections: queue.New(int64(config.MaxConnectionCount)), // possible overflow error
channels: make(chan *ChannelHost, config.MaxCacheChannelCount),
poolRWLock: &sync.RWMutex{},
flaggedConnections: make(map[uint64]bool),
sleepOnErrorInterval: time.Duration(config.SleepOnErrorInterval) * time.Millisecond,
errorHandler: errorHandler,
}

if ok := cp.initializeConnections(); !ok {
return nil, errors.New("initialization failed during connection creation")
}

return cp, nil
}

func (cp *ConnectionPool) initializeConnections() bool {

cp.connectionID = 0
Expand All @@ -70,10 +102,16 @@ func (cp *ConnectionPool) initializeConnections() bool {
cp.Config.TLSConfig)

if err != nil {
if cp.errorHandler != nil {
cp.errorHandler(err)
}
return false
}

if err = cp.connections.Put(connectionHost); err != nil {
if cp.errorHandler != nil {
cp.errorHandler(err)
}
return false
}

Expand All @@ -94,6 +132,9 @@ func (cp *ConnectionPool) GetConnection() (*ConnectionHost, error) {

connHost, err := cp.getConnectionFromPool()
if err != nil { // errors on bad data in the queue
if cp.errorHandler != nil {
cp.errorHandler(err)
}
return nil, err
}

Expand Down Expand Up @@ -216,6 +257,9 @@ func (cp *ConnectionPool) reconnectChannel(chanHost *ChannelHost) {

err := chanHost.MakeChannel() // Creates a new channel and flushes internal buffers automatically.
if err != nil {
if cp.errorHandler != nil {
cp.errorHandler(err)
}
continue
}
break
Expand All @@ -229,6 +273,9 @@ func (cp *ConnectionPool) createCacheChannel(id uint64) *ChannelHost {
for {
connHost, err := cp.GetConnection()
if err != nil {
if cp.errorHandler != nil {
cp.errorHandler(err)
}
if cp.sleepOnErrorInterval > 0 {
time.Sleep(cp.sleepOnErrorInterval)
}
Expand All @@ -237,6 +284,9 @@ func (cp *ConnectionPool) createCacheChannel(id uint64) *ChannelHost {

chanHost, err := NewChannelHost(connHost, id, connHost.ConnectionID, true, true)
if err != nil {
if cp.errorHandler != nil {
cp.errorHandler(err)
}
if cp.sleepOnErrorInterval > 0 {
time.Sleep(cp.sleepOnErrorInterval)
}
Expand All @@ -256,17 +306,13 @@ func (cp *ConnectionPool) GetTransientChannel(ackable bool) *amqp.Channel {
for {
connHost, err := cp.GetConnection()
if err != nil {
if cp.sleepOnErrorInterval > 0 {
time.Sleep(cp.sleepOnErrorInterval)
}
cp.handleError(err)
continue
}

channel, err := connHost.Connection.Channel()
if err != nil {
if cp.sleepOnErrorInterval > 0 {
time.Sleep(cp.sleepOnErrorInterval)
}
cp.handleError(err)
cp.ReturnConnection(connHost, true)
continue
}
Expand All @@ -276,9 +322,7 @@ func (cp *ConnectionPool) GetTransientChannel(ackable bool) *amqp.Channel {
if ackable {
err := channel.Confirm(false)
if err != nil {
if cp.sleepOnErrorInterval > 0 {
time.Sleep(cp.sleepOnErrorInterval)
}
cp.handleError(err)
continue
}
}
Expand Down Expand Up @@ -314,8 +358,11 @@ func (cp *ConnectionPool) isConnectionFlagged(connectionID uint64) bool {
// Shutdown closes all connections in the ConnectionPool and resets the Pool to pre-initialized state.
func (cp *ConnectionPool) Shutdown() {

wg := &sync.WaitGroup{}
if cp == nil {
return
}

wg := &sync.WaitGroup{}
ChannelFlushLoop:
for {
select {
Expand All @@ -333,7 +380,6 @@ ChannelFlushLoop:
break ChannelFlushLoop
}
}

wg.Wait()

for !cp.connections.Empty() {
Expand Down Expand Up @@ -363,3 +409,12 @@ ChannelFlushLoop:
cp.flaggedConnections = make(map[uint64]bool)
cp.connectionID = 0
}

func (cp *ConnectionPool) handleError(err error) {
if cp.errorHandler != nil {
cp.errorHandler(err)
}
if cp.sleepOnErrorInterval > 0 {
time.Sleep(cp.sleepOnErrorInterval)
}
}
5 changes: 5 additions & 0 deletions v2/pkg/tcr/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,8 @@ FlushLoop:
}
}
}

// Started allows you to determine if a consumer has started.
func (con *Consumer) Started() bool {
return con.started
}
17 changes: 17 additions & 0 deletions v2/tests/badtest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"PoolConfig": {
"URI": "amqp://guest:guest@null:5672/",
"ConnectionName": "TurboCookedRabbit",
"SleepOnErrorInterval": 100,
"MaxCacheChannelCount": 50,
"MaxConnectionCount": 10,
"Heartbeat": 6,
"ConnectionTimeout": 10,
"TLSConfig": {
"EnableTLS": false,
"PEMCertLocation": "test/catest.pem",
"LocalCertLocation": "client/cert.ca",
"CertServerName": "hostname-in-cert"
}
}
}
21 changes: 21 additions & 0 deletions v2/tests/main_pool_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main_test

import (
"fmt"
"sync"
"testing"
"time"
Expand All @@ -22,6 +23,26 @@ func TestCreateConnectionPoolWithZeroConnections(t *testing.T) {
TestCleanup(t)
}

func TestCreateConnectionPoolWithErrorHandler(t *testing.T) {
defer leaktest.Check(t)() // Fail on leaked goroutines.

seasoning, err := tcr.ConvertJSONFileToConfig("badtest.json")
if err != nil {
return
}

cp, err := tcr.NewConnectionPoolWithErrorHandler(seasoning.PoolConfig, errorHandler)
assert.Nil(t, cp)
assert.Error(t, err)

cp.Shutdown()
TestCleanup(t)
}

func errorHandler(err error) {
fmt.Println(err)
}

func TestCreateConnectionPoolAndGetConnection(t *testing.T) {
defer leaktest.Check(t)() // Fail on leaked goroutines.

Expand Down

0 comments on commit 86efac9

Please sign in to comment.