diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2013-01-09 11:10:56 +0000 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2013-01-09 11:10:56 +0000 |
| commit | 12b0dd78c7efe863ad1b0f6ccf1e0e8c12375746 (patch) | |
| tree | 9789399654931e478ff0b464cbcfc1ea6157183a | |
| parent | bc0a8e7f3727db383f0cc6748dcadcc6f6dea23c (diff) | |
| download | rabbitmq-server-git-12b0dd78c7efe863ad1b0f6ccf1e0e8c12375746.tar.gz | |
Update dead-lettering due to queue length limit
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2506ff915d..81db54918e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -563,10 +563,18 @@ maybe_drop_head(State = #q{max_length = MaxLen, backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:len(BQS) >= MaxLen of - true -> {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(true, BQS), - (dead_letter_fun(maxlen))([{Msg, AckTag}]), - State#q{backing_queue_state = BQS1}; - false -> State + true -> + with_dlx(State#q.dlx, + fun (X) -> + {ok, State1} = dead_letter_maxlen_msgs(X, State), + State1 + end, + fun () -> + {_, BQS1} = BQ:drop(false, BQS), + State#q{backing_queue_state = BQS1} + end); + false -> + State end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, @@ -746,6 +754,12 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> end, rejected, X, State), State1. +dead_letter_maxlen_msgs(X, State = #q{backing_queue = BQ}) -> + dead_letter_msgs(fun (DLFun, Acc, BQS1) -> + {{Msg, _, AckTag}, BQS2} = BQ:fetch(true, BQS1), + {ok, DLFun(Msg, AckTag, Acc), BQS2} + end, maxlen, X, State). + dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, publish_seqno = SeqNo0, unconfirmed = UC0, |
