summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2018-12-21 15:02:21 +0000
committerkjnilsson <knilsson@pivotal.io>2018-12-21 15:02:21 +0000
commit16728ecbc56c4a200a4d86ff99f4527b2e8dbec7 (patch)
treefb4880ae7e161ca8840b24f76825a9952b2f039e /src
parent7e347885de96e95ce85fab577861b98aa0fa7dbf (diff)
downloadrabbitmq-server-git-16728ecbc56c4a200a4d86ff99f4527b2e8dbec7.tar.gz
Quorum queue: purege should not remove checkout out messages.
As these may be returned by a consumer after the purge. [#162811681]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl41
-rw-r--r--src/rabbit_fifo_index.erl6
2 files changed, 21 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 579af38bb3..e89f4a9e22 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -413,26 +413,20 @@ apply(_, #checkout{spec = Spec, meta = Meta,
State1 = update_consumer(ConsumerId, Meta, Spec, State0),
checkout(State1, [{monitor, process, Pid}]);
apply(#{index := RaftIdx}, #purge{},
- #state{consumers = Cons0, ra_indexes = Indexes } = State0) ->
- Total = rabbit_fifo_index:size(Indexes),
- {State1, Effects1} =
- maps:fold(
- fun(ConsumerId, C = #consumer{checked_out = Checked0},
- {StateAcc0, EffectsAcc0}) ->
- MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}}
- <- maps:values(Checked0)],
- complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C,
- #{}, EffectsAcc0, StateAcc0)
- end, {State0, []}, Cons0),
- {State, _, Effects} =
- update_smallest_raft_index(
- RaftIdx, Indexes,
- State1#state{ra_indexes = rabbit_fifo_index:empty(),
- messages = #{},
- returns = lqueue:new(),
- msg_bytes_enqueue = 0,
- msg_bytes_checkout = 0,
- low_msg_num = undefined}, Effects1),
+ #state{ra_indexes = Indexes0,
+ messages = Messages} = State0) ->
+ Total = maps:size(Messages),
+ Indexes = lists:foldl(fun rabbit_fifo_index:delete/2,
+ Indexes0,
+ [I || {I, _} <- lists:sort(maps:values(Messages))]),
+ {State, _, Effects} =
+ update_smallest_raft_index(RaftIdx, Indexes0,
+ State0#state{ra_indexes = Indexes,
+ messages = #{},
+ returns = lqueue:new(),
+ msg_bytes_enqueue = 0,
+ low_msg_num = undefined},
+ []),
%% as we're not checking out after a purge (no point) we have to
%% reverse the effects ourselves
{State, {purge, Total},
@@ -1876,11 +1870,12 @@ purge_with_checkout_test() ->
%% assert message bytes are non zero
?assert(State2#state.msg_bytes_checkout > 0),
?assert(State2#state.msg_bytes_enqueue > 0),
- {State3, {purge, 2}, _} = apply(meta(2), make_purge(), State2),
- ?assertEqual(0, State3#state.msg_bytes_checkout),
+ {State3, {purge, 1}, _} = apply(meta(2), make_purge(), State2),
+ ?assert(State2#state.msg_bytes_checkout > 0),
?assertEqual(0, State3#state.msg_bytes_enqueue),
+ ?assertEqual(1, rabbit_fifo_index:size(State3#state.ra_indexes)),
#consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers),
- ?assertEqual(0, maps:size(Checked)),
+ ?assertEqual(1, maps:size(Checked)),
ok.
down_returns_checked_out_in_order_test() ->
diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl
index 345a99a03c..f8f414f453 100644
--- a/src/rabbit_fifo_index.erl
+++ b/src/rabbit_fifo_index.erl
@@ -56,10 +56,10 @@ return(Key, Value, #?MODULE{data = Data} = State)
when is_integer(Key) ->
State#?MODULE{data = maps:put(Key, Value, Data)}.
--spec delete(integer(), state()) -> state().
+-spec delete(Index :: integer(), state()) -> state().
delete(Smallest, #?MODULE{data = Data0,
- largest = Largest,
- smallest = Smallest} = State) ->
+ largest = Largest,
+ smallest = Smallest} = State) ->
Data = maps:remove(Smallest, Data0),
case find_next(Smallest + 1, Largest, Data) of
undefined ->