diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 31 |
1 files changed, 15 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 904eb6d070..589e82892c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -569,18 +569,14 @@ maybe_drop_head(State = #q{max_length = MaxLen, backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:len(BQS) >= MaxLen of - true -> - with_dlx(State#q.dlx, - fun (X) -> - {ok, State1} = dead_letter_maxlen_msgs(X, State), - State1 - end, - fun () -> - {_, BQS1} = BQ:drop(false, BQS), - State#q{backing_queue_state = BQS1} - end); - false -> - State + true -> with_dlx( + State#q.dlx, + fun (X) -> dead_letter_maxlen_msgs(X, State) end, + fun () -> + {_, BQS1} = BQ:drop(false, BQS), + State#q{backing_queue_state = BQS1} + end); + false -> State end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, @@ -771,10 +767,13 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> State1. dead_letter_maxlen_msgs(X, State = #q{backing_queue = BQ}) -> - dead_letter_msgs(fun (DLFun, Acc, BQS1) -> - {{Msg, _, AckTag}, BQS2} = BQ:fetch(true, BQS1), - {ok, DLFun(Msg, AckTag, Acc), BQS2} - end, maxlen, X, State). + {ok State1} = + dead_letter_msgs( + fun (DLFun, Acc, BQS1) -> + {{Msg, _, AckTag}, BQS2} = BQ:fetch(true, BQS1), + {ok, DLFun(Msg, AckTag, Acc), BQS2} + end, maxlen, X, State), + State1. dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, publish_seqno = SeqNo0, |
