From 5effc3e60f73261e0da649f803924fdb09d7c16d Mon Sep 17 00:00:00 2001 From: Tristan Hyams Date: Thu, 18 Nov 2021 22:54:06 +0000 Subject: [PATCH] PublishWithError Support * A few Publish methods were added to bypass Async Receipts and return errors directly. * Supports CorrelationID and Type now in the Letter.Envelope. * Golang 1.17 --- v2/go.mod | 20 ++-- v2/go.sum | 26 ++--- v2/pkg/tcr/letter.go | 18 +-- v2/pkg/tcr/publisher.go | 251 +++++++++++++++++++++++++++++++++------- 4 files changed, 247 insertions(+), 68 deletions(-) diff --git a/v2/go.mod b/v2/go.mod index 09766ae..615ce83 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -1,20 +1,26 @@ module github.com/houseofcat/turbocookedrabbit/v2 -go 1.16 +go 1.17 require ( github.com/Workiva/go-datastructures v1.0.53 github.com/fortytw2/leaktest v1.3.0 github.com/google/uuid v1.3.0 - github.com/json-iterator/go v1.1.11 - github.com/klauspost/compress v1.13.4 - github.com/kr/text v0.2.0 // indirect - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/json-iterator/go v1.1.12 + github.com/klauspost/compress v1.13.6 github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc github.com/streadway/amqp v1.0.0 github.com/stretchr/testify v1.7.0 - golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 + golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/v2/go.sum b/v2/go.sum index 9f65d65..049b757 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -6,15 +6,13 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= -github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ= -github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= -github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -24,9 +22,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= -github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc h1:Ak86L+yDSOzKFa7WM5bf5itSOo1e3Xh8bm5YCMUXIjQ= github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= @@ -44,24 +41,27 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 h1:HWj/xjIHfjYU5nVXpTM0s39J9CbLn7Cc5a7IC5rwsMQ= -golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 h1:/pEO3GD/ABYAjuakUS6xSEmmlyVS4kxBNkeA9tLJiTI= +golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1 h1:kwrAHlwJ0DUBZwQ238v+Uod/3eZ8B2K5rYsUHBQvzmI= +golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= diff --git a/v2/pkg/tcr/letter.go b/v2/pkg/tcr/letter.go index 1983bbd..30ea572 100644 --- a/v2/pkg/tcr/letter.go +++ b/v2/pkg/tcr/letter.go @@ -15,14 +15,16 @@ type Letter struct { // Envelope contains all the address details of where a letter is going. type Envelope struct { - Exchange string - RoutingKey string - ContentType string - Mandatory bool - Immediate bool - Headers amqp.Table - DeliveryMode uint8 - Priority uint8 + Exchange string + RoutingKey string + ContentType string + CorrelationID string + Type string + Mandatory bool + Immediate bool + Headers amqp.Table + DeliveryMode uint8 + Priority uint8 } // WrappedBody is to go inside a Letter struct with indications of the body of data being modified (ex., compressed). diff --git a/v2/pkg/tcr/publisher.go b/v2/pkg/tcr/publisher.go index ac54abc..6b8b046 100644 --- a/v2/pkg/tcr/publisher.go +++ b/v2/pkg/tcr/publisher.go @@ -74,6 +74,7 @@ func NewPublisher( // Publish sends a single message to the address on the letter using a cached ChannelHost. // Subscribe to PublishReceipts to see success and errors. +// // For proper resilience (at least once delivery guarantee over shaky network) use PublishWithConfirmation func (pub *Publisher) Publish(letter *Letter, skipReceipt bool) { @@ -85,14 +86,16 @@ func (pub *Publisher) Publish(letter *Letter, skipReceipt bool) { letter.Envelope.Mandatory, letter.Envelope.Immediate, amqp.Publishing{ - ContentType: letter.Envelope.ContentType, - Body: letter.Body, - Headers: letter.Envelope.Headers, - DeliveryMode: letter.Envelope.DeliveryMode, - Priority: letter.Envelope.Priority, - MessageId: letter.LetterID.String(), - Timestamp: time.Now().UTC(), - AppId: pub.Config.PoolConfig.ApplicationName, + ContentType: letter.Envelope.ContentType, + Body: letter.Body, + Headers: letter.Envelope.Headers, + DeliveryMode: letter.Envelope.DeliveryMode, + Priority: letter.Envelope.Priority, + MessageId: letter.LetterID.String(), + CorrelationId: letter.Envelope.CorrelationID, + Type: letter.Envelope.Type, + Timestamp: time.Now().UTC(), + AppId: pub.Config.PoolConfig.ApplicationName, }, ) @@ -103,6 +106,40 @@ func (pub *Publisher) Publish(letter *Letter, skipReceipt bool) { pub.ConnectionPool.ReturnChannel(chanHost, err != nil) } +// PublishWithError sends a single message to the address on the letter using a cached ChannelHost. +// +// For proper resilience (at least once delivery guarantee over shaky network) use PublishWithConfirmation +func (pub *Publisher) PublishWithError(letter *Letter, skipReceipt bool) error { + + chanHost := pub.ConnectionPool.GetChannelFromPool() + + err := chanHost.Channel.Publish( + letter.Envelope.Exchange, + letter.Envelope.RoutingKey, + letter.Envelope.Mandatory, + letter.Envelope.Immediate, + amqp.Publishing{ + ContentType: letter.Envelope.ContentType, + Body: letter.Body, + Headers: letter.Envelope.Headers, + DeliveryMode: letter.Envelope.DeliveryMode, + Priority: letter.Envelope.Priority, + MessageId: letter.LetterID.String(), + CorrelationId: letter.Envelope.CorrelationID, + Type: letter.Envelope.Type, + Timestamp: time.Now().UTC(), + AppId: pub.Config.PoolConfig.ApplicationName, + }, + ) + + if !skipReceipt { + pub.publishReceipt(letter, err) + } + + pub.ConnectionPool.ReturnChannel(chanHost, err != nil) + return err +} + // PublishWithTransient sends a single message to the address on the letter using a transient (new) RabbitMQ channel. // Subscribe to PublishReceipts to see success and errors. // For proper resilience (at least once delivery guarantee over shaky network) use PublishWithConfirmation @@ -122,19 +159,22 @@ func (pub *Publisher) PublishWithTransient(letter *Letter) error { letter.Envelope.Mandatory, letter.Envelope.Immediate, amqp.Publishing{ - ContentType: letter.Envelope.ContentType, - Body: letter.Body, - Headers: letter.Envelope.Headers, - DeliveryMode: letter.Envelope.DeliveryMode, - Priority: letter.Envelope.Priority, - MessageId: letter.LetterID.String(), - Timestamp: time.Now().UTC(), - AppId: pub.Config.PoolConfig.ApplicationName, + ContentType: letter.Envelope.ContentType, + Body: letter.Body, + Headers: letter.Envelope.Headers, + DeliveryMode: letter.Envelope.DeliveryMode, + Priority: letter.Envelope.Priority, + MessageId: letter.LetterID.String(), + CorrelationId: letter.Envelope.CorrelationID, + Type: letter.Envelope.Type, + Timestamp: time.Now().UTC(), + AppId: pub.Config.PoolConfig.ApplicationName, }, ) } // PublishWithConfirmation sends a single message to the address on the letter with confirmation capabilities. +// // This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. // A timeout failure drops the letter back in the PublishReceipts. // A confirmation failure keeps trying to publish (at least until timeout failure occurs.) @@ -157,14 +197,16 @@ func (pub *Publisher) PublishWithConfirmation(letter *Letter, timeout time.Durat letter.Envelope.Mandatory, letter.Envelope.Immediate, amqp.Publishing{ - ContentType: letter.Envelope.ContentType, - Body: letter.Body, - Headers: letter.Envelope.Headers, - DeliveryMode: letter.Envelope.DeliveryMode, - Priority: letter.Envelope.Priority, - MessageId: letter.LetterID.String(), - Timestamp: time.Now().UTC(), - AppId: pub.Config.PoolConfig.ApplicationName, + ContentType: letter.Envelope.ContentType, + Body: letter.Body, + Headers: letter.Envelope.Headers, + DeliveryMode: letter.Envelope.DeliveryMode, + Priority: letter.Envelope.Priority, + MessageId: letter.LetterID.String(), + CorrelationId: letter.Envelope.CorrelationID, + Type: letter.Envelope.Type, + Timestamp: time.Now().UTC(), + AppId: pub.Config.PoolConfig.ApplicationName, }, ) if err != nil { @@ -199,6 +241,72 @@ func (pub *Publisher) PublishWithConfirmation(letter *Letter, timeout time.Durat } } +// PublishWithConfirmationError sends a single message to the address on the letter with confirmation capabilities. +// +// This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. +// A timeout failure drops the letter back in the PublishReceipts. +// A confirmation failure keeps trying to publish (at least until timeout failure occurs.) +func (pub *Publisher) PublishWithConfirmationError(letter *Letter, timeout time.Duration) error { + + if timeout == 0 { + timeout = pub.publishTimeOutDuration + } + + for { + // Has to use an Ackable channel for Publish Confirmations. + chanHost := pub.ConnectionPool.GetChannelFromPool() + chanHost.FlushConfirms() // Flush all previous publish confirmations + + Publish: + timeoutAfter := time.After(timeout) // timeoutAfter resets everytime we try to publish. + err := chanHost.Channel.Publish( + letter.Envelope.Exchange, + letter.Envelope.RoutingKey, + letter.Envelope.Mandatory, + letter.Envelope.Immediate, + amqp.Publishing{ + ContentType: letter.Envelope.ContentType, + Body: letter.Body, + Headers: letter.Envelope.Headers, + DeliveryMode: letter.Envelope.DeliveryMode, + Priority: letter.Envelope.Priority, + MessageId: letter.LetterID.String(), + CorrelationId: letter.Envelope.CorrelationID, + Type: letter.Envelope.Type, + Timestamp: time.Now().UTC(), + AppId: pub.Config.PoolConfig.ApplicationName, + }, + ) + if err != nil { + pub.ConnectionPool.ReturnChannel(chanHost, true) + continue // Take it again! From the top! + } + + // Wait for very next confirmation on this channel, which should be our confirmation. + for { + select { + case <-timeoutAfter: + pub.ConnectionPool.ReturnChannel(chanHost, false) // not a channel error + return fmt.Errorf("publish confirmation for LetterID: %s wasn't received in a timely manner - recommend retry/requeue", letter.LetterID.String()) + + case confirmation := <-chanHost.Confirmations: + + if !confirmation.Ack { + goto Publish //nack has occurred, republish + } + + // Happy Path, publish was received by server and we didn't timeout client side. + pub.ConnectionPool.ReturnChannel(chanHost, false) + return nil + + default: + + time.Sleep(time.Duration(time.Millisecond * 1)) // limits CPU spin up + } + } + } +} + // PublishWithConfirmationContext sends a single message to the address on the letter with confirmation capabilities. // This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. // A timeout failure drops the letter back in the PublishReceipts. @@ -217,14 +325,16 @@ func (pub *Publisher) PublishWithConfirmationContext(ctx context.Context, letter letter.Envelope.Mandatory, letter.Envelope.Immediate, amqp.Publishing{ - ContentType: letter.Envelope.ContentType, - Body: letter.Body, - Headers: letter.Envelope.Headers, - DeliveryMode: letter.Envelope.DeliveryMode, - Priority: letter.Envelope.Priority, - MessageId: letter.LetterID.String(), - Timestamp: time.Now().UTC(), - AppId: pub.Config.PoolConfig.ApplicationName, + ContentType: letter.Envelope.ContentType, + Body: letter.Body, + Headers: letter.Envelope.Headers, + DeliveryMode: letter.Envelope.DeliveryMode, + Priority: letter.Envelope.Priority, + MessageId: letter.LetterID.String(), + CorrelationId: letter.Envelope.CorrelationID, + Type: letter.Envelope.Type, + Timestamp: time.Now().UTC(), + AppId: pub.Config.PoolConfig.ApplicationName, }, ) if err != nil { @@ -259,6 +369,65 @@ func (pub *Publisher) PublishWithConfirmationContext(ctx context.Context, letter } } +// PublishWithConfirmationContextError sends a single message to the address on the letter with confirmation capabilities. +// This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. +// A timeout failure drops the letter back in the PublishReceipts. +// A confirmation failure keeps trying to publish (at least until timeout failure occurs.) +func (pub *Publisher) PublishWithConfirmationContextError(ctx context.Context, letter *Letter) error { + + for { + // Has to use an Ackable channel for Publish Confirmations. + chanHost := pub.ConnectionPool.GetChannelFromPool() + chanHost.FlushConfirms() // Flush all previous publish confirmations + + Publish: + err := chanHost.Channel.Publish( + letter.Envelope.Exchange, + letter.Envelope.RoutingKey, + letter.Envelope.Mandatory, + letter.Envelope.Immediate, + amqp.Publishing{ + ContentType: letter.Envelope.ContentType, + Body: letter.Body, + Headers: letter.Envelope.Headers, + DeliveryMode: letter.Envelope.DeliveryMode, + Priority: letter.Envelope.Priority, + MessageId: letter.LetterID.String(), + CorrelationId: letter.Envelope.CorrelationID, + Type: letter.Envelope.Type, + Timestamp: time.Now().UTC(), + AppId: pub.Config.PoolConfig.ApplicationName, + }, + ) + if err != nil { + pub.ConnectionPool.ReturnChannel(chanHost, true) + continue // Take it again! From the top! + } + + // Wait for very next confirmation on this channel, which should be our confirmation. + for { + select { + case <-ctx.Done(): + pub.ConnectionPool.ReturnChannel(chanHost, false) // not a channel error + return fmt.Errorf("publish confirmation for LetterID: %s wasn't received before context expired - recommend retry/requeue", letter.LetterID.String()) + + case confirmation := <-chanHost.Confirmations: + + if !confirmation.Ack { + goto Publish //nack has occurred, republish + } + + pub.ConnectionPool.ReturnChannel(chanHost, false) + return nil + + default: + + time.Sleep(time.Duration(time.Millisecond * 1)) // limits CPU spin up + } + } + } +} + // PublishWithConfirmationTransient sends a single message to the address on the letter with confirmation capabilities on transient Channels. // This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority. // A timeout failure drops the letter back in the PublishReceipts. When combined with QueueLetter, it automatically @@ -284,14 +453,16 @@ func (pub *Publisher) PublishWithConfirmationTransient(letter *Letter, timeout t letter.Envelope.Mandatory, letter.Envelope.Immediate, amqp.Publishing{ - ContentType: letter.Envelope.ContentType, - Body: letter.Body, - Headers: letter.Envelope.Headers, - DeliveryMode: letter.Envelope.DeliveryMode, - Priority: letter.Envelope.Priority, - MessageId: letter.LetterID.String(), - Timestamp: time.Now().UTC(), - AppId: pub.Config.PoolConfig.ApplicationName, + ContentType: letter.Envelope.ContentType, + Body: letter.Body, + Headers: letter.Envelope.Headers, + DeliveryMode: letter.Envelope.DeliveryMode, + Priority: letter.Envelope.Priority, + MessageId: letter.LetterID.String(), + CorrelationId: letter.Envelope.CorrelationID, + Type: letter.Envelope.Type, + Timestamp: time.Now().UTC(), + AppId: pub.Config.PoolConfig.ApplicationName, }, ) if err != nil {