diff options
| author | kjnilsson <knilsson@pivotal.io> | 2018-11-28 10:32:41 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2018-12-04 17:15:33 +0000 |
| commit | 735810edf529605143fcc9b3544e1703f55b0d90 (patch) | |
| tree | 820afa03de0da63a413f09807e1de53f254bb84f | |
| parent | 42e63f0e6f276ce88d1f674ce8895975d2ffb054 (diff) | |
| download | rabbitmq-server-git-735810edf529605143fcc9b3544e1703f55b0d90.tar.gz | |
rabbit_fifo: ensure credit is regained
When returning a $prefix_msg
| -rw-r--r-- | src/rabbit_fifo.erl | 60 |
1 files changed, 41 insertions, 19 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index ca9003dd8b..f96f59639c 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -357,7 +357,7 @@ apply(#{index := RaftIdx}, purge, Effects0, {StateAcc0, EffectsAcc0, ok}) -> MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}} <- maps:values(Checked0)], - complete(ConsumerId, MsgRaftIdxs, C, + complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C, #{}, EffectsAcc0, StateAcc0) end, {State0, Effects0, ok}, Cons0), {State, Effects, _} = @@ -699,7 +699,8 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked, end, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), - State1 = lists:foldl(fun('$prefix_msg', #state{prefix_msg_count = MsgCount} = S0) -> + State1 = lists:foldl(fun('$prefix_msg', + #state{prefix_msg_count = MsgCount} = S0) -> S0#state{prefix_msg_count = MsgCount + 1}; ({MsgNum, Msg}, S0) -> return_one(MsgNum, Msg, S0) @@ -709,14 +710,14 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked, Effects). % used to processes messages that are finished -complete(ConsumerId, MsgRaftIdxs, +complete(ConsumerId, MsgRaftIdxs, NumDiscarded, Con0, Checked, Effects0, #state{consumers = Cons0, service_queue = SQ0, ra_indexes = Indexes0} = State0) -> - %% credit_mode = simple_prefetch should automatically top-up credit as messages - %% are simple_prefetch or otherwise returned + %% credit_mode = simple_prefetch should automatically top-up credit + %% as messages are simple_prefetch or otherwise returned Con = Con0#consumer{checked_out = Checked, - credit = increase_credit(Con0, length(MsgRaftIdxs))}, + credit = increase_credit(Con0, NumDiscarded)}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), @@ -742,7 +743,10 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId, Checked = maps:without(MsgIds, Checked0), Discarded = maps:with(MsgIds, Checked0), MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], + %% need to pass the length of discarded as $prefix_msgs would be filtered + %% by the above list comprehension {State1, Effects1, _} = complete(ConsumerId, MsgRaftIdxs, + maps:size(Discarded), Con0, Checked, Effects0, State0), {State, Effects, _} = checkout(State1, Effects1), % settle metrics are incremented separately @@ -810,7 +814,10 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, returns = queue:in(MsgNum, Returns)}. return_all(State, Checked) -> - maps:fold(fun (_, {MsgNum, Msg}, S) -> + maps:fold(fun (_, '$prefix_msg', + #state{prefix_msg_count = MsgCount} = S) -> + S#state{prefix_msg_count = MsgCount + 1}; + (_, {MsgNum, Msg}, S) -> return_one(MsgNum, Msg, S) end, State, Checked). @@ -1452,17 +1459,19 @@ snapshot_recover_test() -> enq_deq_return_snapshot_recover_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, - OthPid = spawn(fun () -> ok end), - Oth = {<<"oth">>, OthPid}, + % OthPid = spawn(fun () -> ok end), + % Oth = {<<"oth">>, OthPid}, Commands = [ - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {checkout, {dequeue, unsettled}, Oth}, - {checkout, {dequeue, unsettled}, Cid}, - {settle, [0], Oth}, - {return, [0], Cid}, - {enqueue, self(), 3, three}, - purge + {enqueue, self(), 1, one}, %% to Cid + {checkout, {auto, 1, simple_prefetch}, Cid}, + % {checkout, {auto, 1, simple_prefetch}, Oth}, + {return, [0], Cid}, %% should be re-delivered to Oth + {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 1 + % {enqueue, self(), 3, three}, %% Queued: prefetch_msg_count: 2? + % {settle, [0], Oth}, + {settle, [1], Cid}, + {settle, [2], Cid} + % purge ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1477,17 +1486,30 @@ enq_check_settle_snapshot_recover_test() -> {settle, [0], Cid}, {enqueue, self(), 3, three}, {settle, [2], Cid} - ], % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), run_snapshot_test(?FUNCTION_NAME, Commands). +enq_check_settle_snapshot_purge_test() -> + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + {checkout, {auto, 2, simple_prefetch}, Cid}, + {enqueue, self(), 1, one}, + {enqueue, self(), 2, two}, + {settle, [1], Cid}, + {settle, [0], Cid}, + {enqueue, self(), 3, three}, + purge + ], + % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), + run_snapshot_test(?FUNCTION_NAME, Commands). run_snapshot_test(Name, Commands) -> %% create every incremental permuation of the commands lists %% and run the snapshot tests against that [begin - % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), + ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]), run_snapshot_test0(Name, C) end || C <- prefixes(Commands, 1, [])]. |
