Skip to content

Commit

Permalink
add batch consumer func
Browse files Browse the repository at this point in the history
  • Loading branch information
khalilsarwari committed Mar 14, 2022
1 parent 2c37533 commit 9e5e618
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 0 deletions.
6 changes: 6 additions & 0 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ package rmq
type BatchConsumer interface {
Consume(batch Deliveries)
}

type BatchConsumerFunc func(Deliveries)

func (batchConsumerFunc BatchConsumerFunc) Consume(batch Deliveries) {
batchConsumerFunc(batch)
}
13 changes: 13 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Queue interface {
AddConsumer(tag string, consumer Consumer) (string, error)
AddConsumerFunc(tag string, consumerFunc ConsumerFunc) (string, error)
AddBatchConsumer(tag string, batchSize int64, timeout time.Duration, consumer BatchConsumer) (string, error)
AddBatchConsumerFunc(tag string, batchSize int64, timeout time.Duration, batchConsumerFunc BatchConsumerFunc) (string, error)
PurgeReady() (int64, error)
PurgeRejected() (int64, error)
ReturnUnacked(max int64) (int64, error)
Expand Down Expand Up @@ -317,6 +318,18 @@ func (queue *redisQueue) AddBatchConsumer(tag string, batchSize int64, timeout t
return name, nil
}

// AddBatchConsumerFunc is similar to AddConsumerFunc, but for batches of deliveries
// timeout limits the amount of time waiting to fill an entire batch
// The timer is only started when the first message in a batch is received
func (queue *redisQueue) AddBatchConsumerFunc(tag string, batchSize int64, timeout time.Duration, batchConsumerFunc BatchConsumerFunc) (string, error) {
name, err := queue.addConsumer(tag)
if err != nil {
return "", err
}
go queue.consumerBatchConsume(batchSize, timeout, batchConsumerFunc)
return name, nil
}

func (queue *redisQueue) consumerBatchConsume(batchSize int64, timeout time.Duration, consumer BatchConsumer) {
defer func() {
queue.stopWg.Done()
Expand Down
10 changes: 10 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,16 @@ func TestBatch(t *testing.T) {
assert.NoError(t, consumer.LastBatch[0].Reject())
eventuallyUnacked(t, queue, 0)
eventuallyRejected(t, queue, 3)

for i := 0; i < 5; i++ {
err := queue.Publish(fmt.Sprintf("batch-d%d", i))
assert.NoError(t, err)
}
_, err = queue.AddBatchConsumerFunc("batch-cons-func", 2, 50*time.Millisecond, func(batch Deliveries) {
errMap := batch.Ack()
assert.Empty(t, errMap)
})
assert.NoError(t, err)
}

func TestReturnRejected(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions test_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ func (*TestQueue) AddConsumerFunc(string, ConsumerFunc) (string, error) { panic(
func (*TestQueue) AddBatchConsumer(string, int64, time.Duration, BatchConsumer) (string, error) {
panic(errorNotSupported)
}
func (*TestQueue) AddBatchConsumerFunc(string, int64, time.Duration, BatchConsumerFunc) (string, error) {
panic(errorNotSupported)
}
func (*TestQueue) ReturnUnacked(int64) (int64, error) { panic(errorNotSupported) }
func (*TestQueue) ReturnRejected(int64) (int64, error) { panic(errorNotSupported) }
func (*TestQueue) PurgeReady() (int64, error) { panic(errorNotSupported) }
Expand Down

0 comments on commit 9e5e618

Please sign in to comment.