From 86efac98889a30dcc83546d6ebec0a49bdeeae6a Mon Sep 17 00:00:00 2001 From: Tristan Hyams Date: Mon, 31 Aug 2020 19:15:17 -0400 Subject: [PATCH] New ConnectionPool with ErrorHandler method added. --- v2/go.mod | 11 +++++- v2/go.sum | 26 ++++++++++-- v2/pkg/tcr/connectionpool.go | 77 ++++++++++++++++++++++++++++++------ v2/pkg/tcr/consumer.go | 5 +++ v2/tests/badtest.json | 17 ++++++++ v2/tests/main_pool_test.go | 21 ++++++++++ 6 files changed, 140 insertions(+), 17 deletions(-) create mode 100644 v2/tests/badtest.json diff --git a/v2/go.mod b/v2/go.mod index 631d302..1adeddb 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -6,9 +6,16 @@ require ( github.com/Workiva/go-datastructures v1.0.52 github.com/fortytw2/leaktest v1.3.0 github.com/json-iterator/go v1.1.10 - github.com/klauspost/compress v1.10.10 + github.com/klauspost/compress v1.10.11 + github.com/kr/text v0.2.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 github.com/streadway/amqp v1.0.0 github.com/stretchr/testify v1.6.1 - golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 + golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a + golang.org/x/sys v0.0.0-20200831180312-196b9ba8737a // indirect + gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect + gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect ) diff --git a/v2/go.sum b/v2/go.sum index 33c84f3..59227dc 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -1,5 +1,6 @@ github.com/Workiva/go-datastructures v1.0.52 h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9abU0yMQt0NI= github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -8,12 +9,23 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/klauspost/compress v1.10.10 h1:a/y8CglcM7gLGYmlbP/stPE5sR3hbhFRUjCBfd/0B3I= -github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.10.11 h1:K9z59aO18Aywg2b/WSgBaUX99mHy2BES18Cr5lBKZHk= +github.com/klauspost/compress v1.10.11/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 h1:lNCW6THrCKBiJBpz8kbVGjC7MgdCGKwuvBgc7LoD6sw= github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -25,14 +37,20 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM= +golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200831180312-196b9ba8737a h1:i47hUS795cOydZI4AwJQCKXOr4BvxzvikwDoDtHhP2Y= +golang.org/x/sys v0.0.0-20200831180312-196b9ba8737a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/v2/pkg/tcr/connectionpool.go b/v2/pkg/tcr/connectionpool.go index ee757ce..1e726b2 100644 --- a/v2/pkg/tcr/connectionpool.go +++ b/v2/pkg/tcr/connectionpool.go @@ -22,6 +22,7 @@ type ConnectionPool struct { poolRWLock *sync.RWMutex flaggedConnections map[uint64]bool sleepOnErrorInterval time.Duration + errorHandler func(error) } // NewConnectionPool creates hosting structure for the ConnectionPool. @@ -54,6 +55,37 @@ func NewConnectionPool(config *PoolConfig) (*ConnectionPool, error) { return cp, nil } +// NewConnectionPoolWithErrorHandler creates hosting structure for the ConnectionPool. +func NewConnectionPoolWithErrorHandler(config *PoolConfig, errorHandler func(error)) (*ConnectionPool, error) { + + if config.Heartbeat == 0 || config.ConnectionTimeout == 0 { + return nil, errors.New("connectionpool heartbeat or connectiontimeout can't be 0") + } + + if config.MaxConnectionCount == 0 { + return nil, errors.New("connectionpool maxconnectioncount can't be 0") + } + + cp := &ConnectionPool{ + Config: *config, + uri: config.URI, + heartbeatInterval: time.Duration(config.Heartbeat) * time.Second, + connectionTimeout: time.Duration(config.ConnectionTimeout) * time.Second, + connections: queue.New(int64(config.MaxConnectionCount)), // possible overflow error + channels: make(chan *ChannelHost, config.MaxCacheChannelCount), + poolRWLock: &sync.RWMutex{}, + flaggedConnections: make(map[uint64]bool), + sleepOnErrorInterval: time.Duration(config.SleepOnErrorInterval) * time.Millisecond, + errorHandler: errorHandler, + } + + if ok := cp.initializeConnections(); !ok { + return nil, errors.New("initialization failed during connection creation") + } + + return cp, nil +} + func (cp *ConnectionPool) initializeConnections() bool { cp.connectionID = 0 @@ -70,10 +102,16 @@ func (cp *ConnectionPool) initializeConnections() bool { cp.Config.TLSConfig) if err != nil { + if cp.errorHandler != nil { + cp.errorHandler(err) + } return false } if err = cp.connections.Put(connectionHost); err != nil { + if cp.errorHandler != nil { + cp.errorHandler(err) + } return false } @@ -94,6 +132,9 @@ func (cp *ConnectionPool) GetConnection() (*ConnectionHost, error) { connHost, err := cp.getConnectionFromPool() if err != nil { // errors on bad data in the queue + if cp.errorHandler != nil { + cp.errorHandler(err) + } return nil, err } @@ -216,6 +257,9 @@ func (cp *ConnectionPool) reconnectChannel(chanHost *ChannelHost) { err := chanHost.MakeChannel() // Creates a new channel and flushes internal buffers automatically. if err != nil { + if cp.errorHandler != nil { + cp.errorHandler(err) + } continue } break @@ -229,6 +273,9 @@ func (cp *ConnectionPool) createCacheChannel(id uint64) *ChannelHost { for { connHost, err := cp.GetConnection() if err != nil { + if cp.errorHandler != nil { + cp.errorHandler(err) + } if cp.sleepOnErrorInterval > 0 { time.Sleep(cp.sleepOnErrorInterval) } @@ -237,6 +284,9 @@ func (cp *ConnectionPool) createCacheChannel(id uint64) *ChannelHost { chanHost, err := NewChannelHost(connHost, id, connHost.ConnectionID, true, true) if err != nil { + if cp.errorHandler != nil { + cp.errorHandler(err) + } if cp.sleepOnErrorInterval > 0 { time.Sleep(cp.sleepOnErrorInterval) } @@ -256,17 +306,13 @@ func (cp *ConnectionPool) GetTransientChannel(ackable bool) *amqp.Channel { for { connHost, err := cp.GetConnection() if err != nil { - if cp.sleepOnErrorInterval > 0 { - time.Sleep(cp.sleepOnErrorInterval) - } + cp.handleError(err) continue } channel, err := connHost.Connection.Channel() if err != nil { - if cp.sleepOnErrorInterval > 0 { - time.Sleep(cp.sleepOnErrorInterval) - } + cp.handleError(err) cp.ReturnConnection(connHost, true) continue } @@ -276,9 +322,7 @@ func (cp *ConnectionPool) GetTransientChannel(ackable bool) *amqp.Channel { if ackable { err := channel.Confirm(false) if err != nil { - if cp.sleepOnErrorInterval > 0 { - time.Sleep(cp.sleepOnErrorInterval) - } + cp.handleError(err) continue } } @@ -314,8 +358,11 @@ func (cp *ConnectionPool) isConnectionFlagged(connectionID uint64) bool { // Shutdown closes all connections in the ConnectionPool and resets the Pool to pre-initialized state. func (cp *ConnectionPool) Shutdown() { - wg := &sync.WaitGroup{} + if cp == nil { + return + } + wg := &sync.WaitGroup{} ChannelFlushLoop: for { select { @@ -333,7 +380,6 @@ ChannelFlushLoop: break ChannelFlushLoop } } - wg.Wait() for !cp.connections.Empty() { @@ -363,3 +409,12 @@ ChannelFlushLoop: cp.flaggedConnections = make(map[uint64]bool) cp.connectionID = 0 } + +func (cp *ConnectionPool) handleError(err error) { + if cp.errorHandler != nil { + cp.errorHandler(err) + } + if cp.sleepOnErrorInterval > 0 { + time.Sleep(cp.sleepOnErrorInterval) + } +} diff --git a/v2/pkg/tcr/consumer.go b/v2/pkg/tcr/consumer.go index 1e669ed..0b08966 100644 --- a/v2/pkg/tcr/consumer.go +++ b/v2/pkg/tcr/consumer.go @@ -364,3 +364,8 @@ FlushLoop: } } } + +// Started allows you to determine if a consumer has started. +func (con *Consumer) Started() bool { + return con.started +} diff --git a/v2/tests/badtest.json b/v2/tests/badtest.json new file mode 100644 index 0000000..544d224 --- /dev/null +++ b/v2/tests/badtest.json @@ -0,0 +1,17 @@ +{ + "PoolConfig": { + "URI": "amqp://guest:guest@null:5672/", + "ConnectionName": "TurboCookedRabbit", + "SleepOnErrorInterval": 100, + "MaxCacheChannelCount": 50, + "MaxConnectionCount": 10, + "Heartbeat": 6, + "ConnectionTimeout": 10, + "TLSConfig": { + "EnableTLS": false, + "PEMCertLocation": "test/catest.pem", + "LocalCertLocation": "client/cert.ca", + "CertServerName": "hostname-in-cert" + } + } +} \ No newline at end of file diff --git a/v2/tests/main_pool_test.go b/v2/tests/main_pool_test.go index d90e25f..832d6d9 100644 --- a/v2/tests/main_pool_test.go +++ b/v2/tests/main_pool_test.go @@ -1,6 +1,7 @@ package main_test import ( + "fmt" "sync" "testing" "time" @@ -22,6 +23,26 @@ func TestCreateConnectionPoolWithZeroConnections(t *testing.T) { TestCleanup(t) } +func TestCreateConnectionPoolWithErrorHandler(t *testing.T) { + defer leaktest.Check(t)() // Fail on leaked goroutines. + + seasoning, err := tcr.ConvertJSONFileToConfig("badtest.json") + if err != nil { + return + } + + cp, err := tcr.NewConnectionPoolWithErrorHandler(seasoning.PoolConfig, errorHandler) + assert.Nil(t, cp) + assert.Error(t, err) + + cp.Shutdown() + TestCleanup(t) +} + +func errorHandler(err error) { + fmt.Println(err) +} + func TestCreateConnectionPoolAndGetConnection(t *testing.T) { defer leaktest.Check(t)() // Fail on leaked goroutines.