diff --git a/delivery.go b/delivery.go index 51395e6..4b94f49 100644 --- a/delivery.go +++ b/delivery.go @@ -36,34 +36,6 @@ func (delivery *redisDelivery) Header() http.Header { return delivery.header } -func newDelivery( - ctx context.Context, - payload string, - unackedKey string, - rejectedKey string, - pushKey string, - redisClient RedisClient, - errChan chan<- error, -) (*redisDelivery, error) { - rd := redisDelivery{ - ctx: ctx, - payload: payload, - unackedKey: unackedKey, - rejectedKey: rejectedKey, - pushKey: pushKey, - redisClient: redisClient, - errChan: errChan, - } - - var err error - - if rd.header, rd.clearPayload, err = ExtractHeaderAndPayload(payload); err != nil { - return nil, err - } - - return &rd, nil -} - func (delivery *redisDelivery) String() string { return fmt.Sprintf("[%s %s]", delivery.clearPayload, delivery.unackedKey) } diff --git a/queue.go b/queue.go index 198237c..57f8518 100644 --- a/queue.go +++ b/queue.go @@ -212,14 +212,13 @@ func (queue *redisQueue) consumeBatch() error { if err == ErrorNotFound { return nil } - if err != nil { return err } d, err := queue.newDelivery(payload) if err != nil { - return err + return fmt.Errorf("create new delivery: %w", err) } queue.deliveryChan <- d @@ -229,15 +228,29 @@ func (queue *redisQueue) consumeBatch() error { } func (queue *redisQueue) newDelivery(payload string) (Delivery, error) { - return newDelivery( - queue.ackCtx, - payload, - queue.unackedKey, - queue.rejectedKey, - queue.pushKey, - queue.redisClient, - queue.errChan, - ) + rd := &redisDelivery{ + ctx: queue.ackCtx, + payload: payload, + unackedKey: queue.unackedKey, + rejectedKey: queue.rejectedKey, + pushKey: queue.pushKey, + redisClient: queue.redisClient, + errChan: queue.errChan, + } + + var err error + rd.header, rd.clearPayload, err = ExtractHeaderAndPayload(payload) + if err == nil { + return rd, nil + } + + // we need to reject a delivery here to move the delivery from the unacked to the rejected list. + rejectErr := rd.Reject() + if rejectErr != nil { + return nil, fmt.Errorf("%s, reject faulty delivery: %w", err, rejectErr) + } + + return nil, err } // StopConsuming can be used to stop all consumers on this queue. It returns a diff --git a/queue_test.go b/queue_test.go index a4c3431..8abf993 100644 --- a/queue_test.go +++ b/queue_test.go @@ -513,6 +513,46 @@ func TestReturnRejected(t *testing.T) { eventuallyRejected(t, queue, 0) } +func TestRejectFaultyMessages(t *testing.T) { + redisAddr, closer := testRedis(t) + defer closer() + + connection, err := OpenConnection("faulty-conn", "tcp", redisAddr, 1, nil) + require.NoError(t, err) + queue, err := connection.OpenQueue("faulty-q") + require.NoError(t, err) + _, err = queue.PurgeReady() + require.NoError(t, err) + + for i := 0; i < 6; i++ { + // if there is no line separator after the header in the message, + // it will lead to an error and the message will be rejected + err := queue.Publish(fmt.Sprintf("%sreturn-d%d", jsonHeaderSignature, i)) + require.NoError(t, err) + } + + eventuallyReady(t, queue, 6) + eventuallyUnacked(t, queue, 0) + eventuallyRejected(t, queue, 0) + + require.NoError(t, queue.StartConsuming(10, time.Millisecond)) + eventuallyReady(t, queue, 0) + eventuallyUnacked(t, queue, 0) + eventuallyRejected(t, queue, 6) + + consumer := NewTestConsumer("faulty-cons") + consumer.AutoAck = false + _, err = queue.AddConsumer("cons", consumer) + require.NoError(t, err) + eventuallyReady(t, queue, 0) + eventuallyUnacked(t, queue, 0) + eventuallyRejected(t, queue, 6) + + require.Len(t, consumer.Deliveries(), 0) + + <-queue.StopConsuming() +} + func TestPushQueue(t *testing.T) { redisAddr, closer := testRedis(t) defer closer()