diff --git a/batch_consumer.go b/batch_consumer.go index 6185d93..e257b05 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -3,3 +3,9 @@ package rmq type BatchConsumer interface { Consume(batch Deliveries) } + +type BatchConsumerFunc func(Deliveries) + +func (batchConsumerFunc BatchConsumerFunc) Consume(batch Deliveries) { + batchConsumerFunc(batch) +} diff --git a/queue.go b/queue.go index b14b997..198237c 100644 --- a/queue.go +++ b/queue.go @@ -23,6 +23,7 @@ type Queue interface { AddConsumer(tag string, consumer Consumer) (string, error) AddConsumerFunc(tag string, consumerFunc ConsumerFunc) (string, error) AddBatchConsumer(tag string, batchSize int64, timeout time.Duration, consumer BatchConsumer) (string, error) + AddBatchConsumerFunc(tag string, batchSize int64, timeout time.Duration, batchConsumerFunc BatchConsumerFunc) (string, error) PurgeReady() (int64, error) PurgeRejected() (int64, error) ReturnUnacked(max int64) (int64, error) @@ -320,6 +321,18 @@ func (queue *redisQueue) AddBatchConsumer(tag string, batchSize int64, timeout t return name, nil } +// AddBatchConsumerFunc is similar to AddConsumerFunc, but for batches of deliveries +// timeout limits the amount of time waiting to fill an entire batch +// The timer is only started when the first message in a batch is received +func (queue *redisQueue) AddBatchConsumerFunc(tag string, batchSize int64, timeout time.Duration, batchConsumerFunc BatchConsumerFunc) (string, error) { + name, err := queue.addConsumer(tag) + if err != nil { + return "", err + } + go queue.consumerBatchConsume(batchSize, timeout, batchConsumerFunc) + return name, nil +} + func (queue *redisQueue) consumerBatchConsume(batchSize int64, timeout time.Duration, consumer BatchConsumer) { defer func() { queue.stopWg.Done() diff --git a/queue_test.go b/queue_test.go index 21021d9..a4c3431 100644 --- a/queue_test.go +++ b/queue_test.go @@ -439,6 +439,16 @@ func TestBatch(t *testing.T) { assert.NoError(t, consumer.Last()[0].Reject()) eventuallyUnacked(t, queue, 0) eventuallyRejected(t, queue, 3) + + for i := 0; i < 5; i++ { + err := queue.Publish(fmt.Sprintf("batch-d%d", i)) + assert.NoError(t, err) + } + _, err = queue.AddBatchConsumerFunc("batch-cons-func", 2, 50*time.Millisecond, func(batch Deliveries) { + errMap := batch.Ack() + assert.Empty(t, errMap) + }) + assert.NoError(t, err) } func TestReturnRejected(t *testing.T) { diff --git a/test_queue.go b/test_queue.go index 0c07d96..b4664c4 100644 --- a/test_queue.go +++ b/test_queue.go @@ -38,6 +38,9 @@ func (*TestQueue) AddConsumerFunc(string, ConsumerFunc) (string, error) { panic( func (*TestQueue) AddBatchConsumer(string, int64, time.Duration, BatchConsumer) (string, error) { panic(errorNotSupported) } +func (*TestQueue) AddBatchConsumerFunc(string, int64, time.Duration, BatchConsumerFunc) (string, error) { + panic(errorNotSupported) +} func (*TestQueue) ReturnUnacked(int64) (int64, error) { panic(errorNotSupported) } func (*TestQueue) ReturnRejected(int64) (int64, error) { panic(errorNotSupported) } func (*TestQueue) PurgeReady() (int64, error) { panic(errorNotSupported) }