diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-01-25 12:50:46 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-01-25 12:54:51 +0000 |
| commit | 07bfd2bb198356967a1a01c9783ad1b818fb2d60 (patch) | |
| tree | 573048cfd473ec43a06aa5a43bb6adff6ace0111 | |
| parent | 16f49718518ef14f72b32981a210723e7ea057aa (diff) | |
| download | rabbitmq-server-git-07bfd2bb198356967a1a01c9783ad1b818fb2d60.tar.gz | |
Ensure purge clears out returned messages
Fixes bug introduced with queue limit changes.
[#161247380]
| -rw-r--r-- | src/rabbit_fifo.erl | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 4f67628320..c3b5e66355 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -195,7 +195,6 @@ suspected_down = false :: boolean() }). - -record(state, {name :: atom(), queue_resource :: rabbit_types:r('queue'), @@ -435,17 +434,22 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, checkout(Meta, State1, [{monitor, process, Pid}]); apply(#{index := RaftIdx}, #purge{}, #state{ra_indexes = Indexes0, + returns = Returns, messages = Messages} = State0) -> - Total = maps:size(Messages), - Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, + Total = messages_ready(State0), + Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, [I || {I, _} <- lists:sort(maps:values(Messages))]), + Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, + Indexes1, + [I || {_, {I, _}} <- lqueue:to_list(Returns)]), {State, _, Effects} = update_smallest_raft_index(RaftIdx, Indexes0, State0#state{ra_indexes = Indexes, messages = #{}, returns = lqueue:new(), msg_bytes_enqueue = 0, + prefix_msgs = {[], []}, low_msg_num = undefined}, []), %% as we're not checking out after a purge (no point) we have to @@ -646,16 +650,14 @@ tick(_Ts, #state{name = Name, -spec overview(state()) -> map(). overview(#state{consumers = Cons, enqueuers = Enqs, - messages = Messages, - ra_indexes = Indexes, msg_bytes_enqueue = EnqueueBytes, msg_bytes_checkout = CheckoutBytes} = State) -> #{type => ?MODULE, num_consumers => maps:size(Cons), num_checked_out => num_checked_out(State), num_enqueuers => maps:size(Enqs), - num_ready_messages => maps:size(Messages), - num_messages => rabbit_fifo_index:size(Indexes), + num_ready_messages => messages_ready(State), + num_messages => messages_total(State), enqueue_message_bytes => EnqueueBytes, checkout_message_bytes => CheckoutBytes}. |
