summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-01-25 12:50:46 +0000
committerkjnilsson <knilsson@pivotal.io>2019-01-25 12:54:51 +0000
commit07bfd2bb198356967a1a01c9783ad1b818fb2d60 (patch)
tree573048cfd473ec43a06aa5a43bb6adff6ace0111
parent16f49718518ef14f72b32981a210723e7ea057aa (diff)
downloadrabbitmq-server-git-07bfd2bb198356967a1a01c9783ad1b818fb2d60.tar.gz
Ensure purge clears out returned messages
Fixes bug introduced with queue limit changes. [#161247380]
-rw-r--r--src/rabbit_fifo.erl16
1 files changed, 9 insertions, 7 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 4f67628320..c3b5e66355 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -195,7 +195,6 @@
suspected_down = false :: boolean()
}).
-
-record(state,
{name :: atom(),
queue_resource :: rabbit_types:r('queue'),
@@ -435,17 +434,22 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
checkout(Meta, State1, [{monitor, process, Pid}]);
apply(#{index := RaftIdx}, #purge{},
#state{ra_indexes = Indexes0,
+ returns = Returns,
messages = Messages} = State0) ->
- Total = maps:size(Messages),
- Indexes = lists:foldl(fun rabbit_fifo_index:delete/2,
+ Total = messages_ready(State0),
+ Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2,
Indexes0,
[I || {I, _} <- lists:sort(maps:values(Messages))]),
+ Indexes = lists:foldl(fun rabbit_fifo_index:delete/2,
+ Indexes1,
+ [I || {_, {I, _}} <- lqueue:to_list(Returns)]),
{State, _, Effects} =
update_smallest_raft_index(RaftIdx, Indexes0,
State0#state{ra_indexes = Indexes,
messages = #{},
returns = lqueue:new(),
msg_bytes_enqueue = 0,
+ prefix_msgs = {[], []},
low_msg_num = undefined},
[]),
%% as we're not checking out after a purge (no point) we have to
@@ -646,16 +650,14 @@ tick(_Ts, #state{name = Name,
-spec overview(state()) -> map().
overview(#state{consumers = Cons,
enqueuers = Enqs,
- messages = Messages,
- ra_indexes = Indexes,
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
#{type => ?MODULE,
num_consumers => maps:size(Cons),
num_checked_out => num_checked_out(State),
num_enqueuers => maps:size(Enqs),
- num_ready_messages => maps:size(Messages),
- num_messages => rabbit_fifo_index:size(Indexes),
+ num_ready_messages => messages_ready(State),
+ num_messages => messages_total(State),
enqueue_message_bytes => EnqueueBytes,
checkout_message_bytes => CheckoutBytes}.