Skip to content

Commit

Permalink
Merge pull request #115 from netvyne/batch_consumer_func
Browse files Browse the repository at this point in the history
Add batch consumer func
  • Loading branch information
wellle authored Mar 17, 2023
2 parents 028d635 + 9e5e618 commit 6e53e76
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 0 deletions.
6 changes: 6 additions & 0 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ package rmq
type BatchConsumer interface {
Consume(batch Deliveries)
}

type BatchConsumerFunc func(Deliveries)

func (batchConsumerFunc BatchConsumerFunc) Consume(batch Deliveries) {
batchConsumerFunc(batch)
}
13 changes: 13 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Queue interface {
AddConsumer(tag string, consumer Consumer) (string, error)
AddConsumerFunc(tag string, consumerFunc ConsumerFunc) (string, error)
AddBatchConsumer(tag string, batchSize int64, timeout time.Duration, consumer BatchConsumer) (string, error)
AddBatchConsumerFunc(tag string, batchSize int64, timeout time.Duration, batchConsumerFunc BatchConsumerFunc) (string, error)
PurgeReady() (int64, error)
PurgeRejected() (int64, error)
ReturnUnacked(max int64) (int64, error)
Expand Down Expand Up @@ -320,6 +321,18 @@ func (queue *redisQueue) AddBatchConsumer(tag string, batchSize int64, timeout t
return name, nil
}

// AddBatchConsumerFunc is similar to AddConsumerFunc, but for batches of deliveries
// timeout limits the amount of time waiting to fill an entire batch
// The timer is only started when the first message in a batch is received
func (queue *redisQueue) AddBatchConsumerFunc(tag string, batchSize int64, timeout time.Duration, batchConsumerFunc BatchConsumerFunc) (string, error) {
name, err := queue.addConsumer(tag)
if err != nil {
return "", err
}
go queue.consumerBatchConsume(batchSize, timeout, batchConsumerFunc)
return name, nil
}

func (queue *redisQueue) consumerBatchConsume(batchSize int64, timeout time.Duration, consumer BatchConsumer) {
defer func() {
queue.stopWg.Done()
Expand Down
10 changes: 10 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,16 @@ func TestBatch(t *testing.T) {
assert.NoError(t, consumer.Last()[0].Reject())
eventuallyUnacked(t, queue, 0)
eventuallyRejected(t, queue, 3)

for i := 0; i < 5; i++ {
err := queue.Publish(fmt.Sprintf("batch-d%d", i))
assert.NoError(t, err)
}
_, err = queue.AddBatchConsumerFunc("batch-cons-func", 2, 50*time.Millisecond, func(batch Deliveries) {
errMap := batch.Ack()
assert.Empty(t, errMap)
})
assert.NoError(t, err)
}

func TestReturnRejected(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions test_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ func (*TestQueue) AddConsumerFunc(string, ConsumerFunc) (string, error) { panic(
func (*TestQueue) AddBatchConsumer(string, int64, time.Duration, BatchConsumer) (string, error) {
panic(errorNotSupported)
}
func (*TestQueue) AddBatchConsumerFunc(string, int64, time.Duration, BatchConsumerFunc) (string, error) {
panic(errorNotSupported)
}
func (*TestQueue) ReturnUnacked(int64) (int64, error) { panic(errorNotSupported) }
func (*TestQueue) ReturnRejected(int64) (int64, error) { panic(errorNotSupported) }
func (*TestQueue) PurgeReady() (int64, error) { panic(errorNotSupported) }
Expand Down

0 comments on commit 6e53e76

Please sign in to comment.