summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2018-11-28 10:32:41 +0000
committerkjnilsson <knilsson@pivotal.io>2018-12-04 17:15:33 +0000
commit735810edf529605143fcc9b3544e1703f55b0d90 (patch)
tree820afa03de0da63a413f09807e1de53f254bb84f
parent42e63f0e6f276ce88d1f674ce8895975d2ffb054 (diff)
downloadrabbitmq-server-git-735810edf529605143fcc9b3544e1703f55b0d90.tar.gz
rabbit_fifo: ensure credit is regained
When returning a $prefix_msg
-rw-r--r--src/rabbit_fifo.erl60
1 files changed, 41 insertions, 19 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index ca9003dd8b..f96f59639c 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -357,7 +357,7 @@ apply(#{index := RaftIdx}, purge, Effects0,
{StateAcc0, EffectsAcc0, ok}) ->
MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}}
<- maps:values(Checked0)],
- complete(ConsumerId, MsgRaftIdxs, C,
+ complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C,
#{}, EffectsAcc0, StateAcc0)
end, {State0, Effects0, ok}, Cons0),
{State, Effects, _} =
@@ -699,7 +699,8 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
end,
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- State1 = lists:foldl(fun('$prefix_msg', #state{prefix_msg_count = MsgCount} = S0) ->
+ State1 = lists:foldl(fun('$prefix_msg',
+ #state{prefix_msg_count = MsgCount} = S0) ->
S0#state{prefix_msg_count = MsgCount + 1};
({MsgNum, Msg}, S0) ->
return_one(MsgNum, Msg, S0)
@@ -709,14 +710,14 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
Effects).
% used to processes messages that are finished
-complete(ConsumerId, MsgRaftIdxs,
+complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
Con0, Checked, Effects0,
#state{consumers = Cons0, service_queue = SQ0,
ra_indexes = Indexes0} = State0) ->
- %% credit_mode = simple_prefetch should automatically top-up credit as messages
- %% are simple_prefetch or otherwise returned
+ %% credit_mode = simple_prefetch should automatically top-up credit
+ %% as messages are simple_prefetch or otherwise returned
Con = Con0#consumer{checked_out = Checked,
- credit = increase_credit(Con0, length(MsgRaftIdxs))},
+ credit = increase_credit(Con0, NumDiscarded)},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs),
@@ -742,7 +743,10 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
Checked = maps:without(MsgIds, Checked0),
Discarded = maps:with(MsgIds, Checked0),
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
+ %% need to pass the length of discarded as $prefix_msgs would be filtered
+ %% by the above list comprehension
{State1, Effects1, _} = complete(ConsumerId, MsgRaftIdxs,
+ maps:size(Discarded),
Con0, Checked, Effects0, State0),
{State, Effects, _} = checkout(State1, Effects1),
% settle metrics are incremented separately
@@ -810,7 +814,10 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
returns = queue:in(MsgNum, Returns)}.
return_all(State, Checked) ->
- maps:fold(fun (_, {MsgNum, Msg}, S) ->
+ maps:fold(fun (_, '$prefix_msg',
+ #state{prefix_msg_count = MsgCount} = S) ->
+ S#state{prefix_msg_count = MsgCount + 1};
+ (_, {MsgNum, Msg}, S) ->
return_one(MsgNum, Msg, S)
end, State, Checked).
@@ -1452,17 +1459,19 @@ snapshot_recover_test() ->
enq_deq_return_snapshot_recover_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
- OthPid = spawn(fun () -> ok end),
- Oth = {<<"oth">>, OthPid},
+ % OthPid = spawn(fun () -> ok end),
+ % Oth = {<<"oth">>, OthPid},
Commands = [
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {checkout, {dequeue, unsettled}, Oth},
- {checkout, {dequeue, unsettled}, Cid},
- {settle, [0], Oth},
- {return, [0], Cid},
- {enqueue, self(), 3, three},
- purge
+ {enqueue, self(), 1, one}, %% to Cid
+ {checkout, {auto, 1, simple_prefetch}, Cid},
+ % {checkout, {auto, 1, simple_prefetch}, Oth},
+ {return, [0], Cid}, %% should be re-delivered to Oth
+ {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 1
+ % {enqueue, self(), 3, three}, %% Queued: prefetch_msg_count: 2?
+ % {settle, [0], Oth},
+ {settle, [1], Cid},
+ {settle, [2], Cid}
+ % purge
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1477,17 +1486,30 @@ enq_check_settle_snapshot_recover_test() ->
{settle, [0], Cid},
{enqueue, self(), 3, three},
{settle, [2], Cid}
-
],
% ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
run_snapshot_test(?FUNCTION_NAME, Commands).
+enq_check_settle_snapshot_purge_test() ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {checkout, {auto, 2, simple_prefetch}, Cid},
+ {enqueue, self(), 1, one},
+ {enqueue, self(), 2, two},
+ {settle, [1], Cid},
+ {settle, [0], Cid},
+ {enqueue, self(), 3, three},
+ purge
+ ],
+ % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
+ run_snapshot_test(?FUNCTION_NAME, Commands).
run_snapshot_test(Name, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
[begin
- % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
+ ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
run_snapshot_test0(Name, C)
end || C <- prefixes(Commands, 1, [])].