From 9e5e6188392a044e906f4549af082bdb669ee89d Mon Sep 17 00:00:00 2001 From: khalilsarwari Date: Mon, 14 Mar 2022 03:55:38 -0700 Subject: [PATCH] add batch consumer func --- batch_consumer.go | 6 ++++++ queue.go | 13 +++++++++++++ queue_test.go | 10 ++++++++++ test_queue.go | 3 +++ 4 files changed, 32 insertions(+) diff --git a/batch_consumer.go b/batch_consumer.go index 6185d93..e257b05 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -3,3 +3,9 @@ package rmq type BatchConsumer interface { Consume(batch Deliveries) } + +type BatchConsumerFunc func(Deliveries) + +func (batchConsumerFunc BatchConsumerFunc) Consume(batch Deliveries) { + batchConsumerFunc(batch) +} diff --git a/queue.go b/queue.go index a2fc416..8265a6e 100644 --- a/queue.go +++ b/queue.go @@ -22,6 +22,7 @@ type Queue interface { AddConsumer(tag string, consumer Consumer) (string, error) AddConsumerFunc(tag string, consumerFunc ConsumerFunc) (string, error) AddBatchConsumer(tag string, batchSize int64, timeout time.Duration, consumer BatchConsumer) (string, error) + AddBatchConsumerFunc(tag string, batchSize int64, timeout time.Duration, batchConsumerFunc BatchConsumerFunc) (string, error) PurgeReady() (int64, error) PurgeRejected() (int64, error) ReturnUnacked(max int64) (int64, error) @@ -317,6 +318,18 @@ func (queue *redisQueue) AddBatchConsumer(tag string, batchSize int64, timeout t return name, nil } +// AddBatchConsumerFunc is similar to AddConsumerFunc, but for batches of deliveries +// timeout limits the amount of time waiting to fill an entire batch +// The timer is only started when the first message in a batch is received +func (queue *redisQueue) AddBatchConsumerFunc(tag string, batchSize int64, timeout time.Duration, batchConsumerFunc BatchConsumerFunc) (string, error) { + name, err := queue.addConsumer(tag) + if err != nil { + return "", err + } + go queue.consumerBatchConsume(batchSize, timeout, batchConsumerFunc) + return name, nil +} + func (queue *redisQueue) consumerBatchConsume(batchSize int64, timeout time.Duration, consumer BatchConsumer) { defer func() { queue.stopWg.Done() diff --git a/queue_test.go b/queue_test.go index 8f13143..8c64555 100644 --- a/queue_test.go +++ b/queue_test.go @@ -412,6 +412,16 @@ func TestBatch(t *testing.T) { assert.NoError(t, consumer.LastBatch[0].Reject()) eventuallyUnacked(t, queue, 0) eventuallyRejected(t, queue, 3) + + for i := 0; i < 5; i++ { + err := queue.Publish(fmt.Sprintf("batch-d%d", i)) + assert.NoError(t, err) + } + _, err = queue.AddBatchConsumerFunc("batch-cons-func", 2, 50*time.Millisecond, func(batch Deliveries) { + errMap := batch.Ack() + assert.Empty(t, errMap) + }) + assert.NoError(t, err) } func TestReturnRejected(t *testing.T) { diff --git a/test_queue.go b/test_queue.go index 60d6d93..9022184 100644 --- a/test_queue.go +++ b/test_queue.go @@ -38,6 +38,9 @@ func (*TestQueue) AddConsumerFunc(string, ConsumerFunc) (string, error) { panic( func (*TestQueue) AddBatchConsumer(string, int64, time.Duration, BatchConsumer) (string, error) { panic(errorNotSupported) } +func (*TestQueue) AddBatchConsumerFunc(string, int64, time.Duration, BatchConsumerFunc) (string, error) { + panic(errorNotSupported) +} func (*TestQueue) ReturnUnacked(int64) (int64, error) { panic(errorNotSupported) } func (*TestQueue) ReturnRejected(int64) (int64, error) { panic(errorNotSupported) } func (*TestQueue) PurgeReady() (int64, error) { panic(errorNotSupported) }