diff options
| author | kjnilsson <knilsson@pivotal.io> | 2018-12-21 15:02:21 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2018-12-21 15:02:21 +0000 |
| commit | 16728ecbc56c4a200a4d86ff99f4527b2e8dbec7 (patch) | |
| tree | fb4880ae7e161ca8840b24f76825a9952b2f039e /src | |
| parent | 7e347885de96e95ce85fab577861b98aa0fa7dbf (diff) | |
| download | rabbitmq-server-git-16728ecbc56c4a200a4d86ff99f4527b2e8dbec7.tar.gz | |
Quorum queue: purege should not remove checkout out messages.
As these may be returned by a consumer after the purge.
[#162811681]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 41 | ||||
| -rw-r--r-- | src/rabbit_fifo_index.erl | 6 |
2 files changed, 21 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 579af38bb3..e89f4a9e22 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -413,26 +413,20 @@ apply(_, #checkout{spec = Spec, meta = Meta, State1 = update_consumer(ConsumerId, Meta, Spec, State0), checkout(State1, [{monitor, process, Pid}]); apply(#{index := RaftIdx}, #purge{}, - #state{consumers = Cons0, ra_indexes = Indexes } = State0) -> - Total = rabbit_fifo_index:size(Indexes), - {State1, Effects1} = - maps:fold( - fun(ConsumerId, C = #consumer{checked_out = Checked0}, - {StateAcc0, EffectsAcc0}) -> - MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}} - <- maps:values(Checked0)], - complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C, - #{}, EffectsAcc0, StateAcc0) - end, {State0, []}, Cons0), - {State, _, Effects} = - update_smallest_raft_index( - RaftIdx, Indexes, - State1#state{ra_indexes = rabbit_fifo_index:empty(), - messages = #{}, - returns = lqueue:new(), - msg_bytes_enqueue = 0, - msg_bytes_checkout = 0, - low_msg_num = undefined}, Effects1), + #state{ra_indexes = Indexes0, + messages = Messages} = State0) -> + Total = maps:size(Messages), + Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, + Indexes0, + [I || {I, _} <- lists:sort(maps:values(Messages))]), + {State, _, Effects} = + update_smallest_raft_index(RaftIdx, Indexes0, + State0#state{ra_indexes = Indexes, + messages = #{}, + returns = lqueue:new(), + msg_bytes_enqueue = 0, + low_msg_num = undefined}, + []), %% as we're not checking out after a purge (no point) we have to %% reverse the effects ourselves {State, {purge, Total}, @@ -1876,11 +1870,12 @@ purge_with_checkout_test() -> %% assert message bytes are non zero ?assert(State2#state.msg_bytes_checkout > 0), ?assert(State2#state.msg_bytes_enqueue > 0), - {State3, {purge, 2}, _} = apply(meta(2), make_purge(), State2), - ?assertEqual(0, State3#state.msg_bytes_checkout), + {State3, {purge, 1}, _} = apply(meta(2), make_purge(), State2), + ?assert(State2#state.msg_bytes_checkout > 0), ?assertEqual(0, State3#state.msg_bytes_enqueue), + ?assertEqual(1, rabbit_fifo_index:size(State3#state.ra_indexes)), #consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers), - ?assertEqual(0, maps:size(Checked)), + ?assertEqual(1, maps:size(Checked)), ok. down_returns_checked_out_in_order_test() -> diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl index 345a99a03c..f8f414f453 100644 --- a/src/rabbit_fifo_index.erl +++ b/src/rabbit_fifo_index.erl @@ -56,10 +56,10 @@ return(Key, Value, #?MODULE{data = Data} = State) when is_integer(Key) -> State#?MODULE{data = maps:put(Key, Value, Data)}. --spec delete(integer(), state()) -> state(). +-spec delete(Index :: integer(), state()) -> state(). delete(Smallest, #?MODULE{data = Data0, - largest = Largest, - smallest = Smallest} = State) -> + largest = Largest, + smallest = Smallest} = State) -> Data = maps:remove(Smallest, Data0), case find_next(Smallest + 1, Largest, Data) of undefined -> |
