summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl43
1 files changed, 32 insertions, 11 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index da9772516c..4ed2bb743a 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -472,7 +472,7 @@ apply(_, {down, ConsumerPid, noconnection},
#consumer{checked_out = Checked0} = C,
{Co, St0, Eff}) when (node(P) =:= Node) and
(C#consumer.status =/= cancelled)->
- {St, Eff0} = return_all(St0, Checked0, Eff),
+ {St, Eff0} = return_all(St0, Checked0, Eff, K, C),
Credit = increase_credit(C, maps:size(Checked0)),
Eff1 = ConsumerUpdateActiveFun(St, K, C, false,
suspected_down, Eff0),
@@ -965,7 +965,7 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1
C0, SQ0, Effects0),
{S0#state{consumers = Cons, service_queue = SQ}, Effects1};
down ->
- {S1, Effects1} = return_all(S0, Checked0, Effects0),
+ {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId, Consumer),
{S1#state{consumers = Cons1}, Effects1}
end.
@@ -1089,9 +1089,9 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
{Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
{State1, Effects2} = lists:foldl(fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
- return_one(0, Msg, S0, E0);
+ return_one(0, Msg, S0, E0, ConsumerId, Con);
({MsgNum, Msg}, {S0, E0}) ->
- return_one(MsgNum, Msg, S0, E0)
+ return_one(MsgNum, Msg, S0, E0, ConsumerId, Con)
end, {State0, Effects1}, MsgNumMsgs),
checkout(Meta, State1#state{consumers = Cons,
service_queue = SQ},
@@ -1201,19 +1201,21 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
end.
return_one(0, {'$prefix_msg', _} = Msg,
- #state{returns = Returns} = State0, Effects) ->
+ #state{returns = Returns} = State0, Effects, _ConsumerId, _Con) ->
{add_bytes_return(Msg,
State0#state{returns = lqueue:in(Msg, Returns)}), Effects};
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
#state{returns = Returns,
- delivery_limit = DeliveryLimit} = State0, Effects0) ->
+ delivery_limit = DeliveryLimit} = State0, Effects0, ConsumerId, Con) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
1, Header0),
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
Effects = dead_letter_effects(rejected, maps:put(none, {MsgNum, {RaftId, {Header, RawMsg}}}, #{}), State0, Effects0),
- {add_bytes_settle(RawMsg, State0), Effects};
+ Checked = maps:without([MsgNum], Con#consumer.checked_out),
+ {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, Effects, State0),
+ {add_bytes_settle(RawMsg, State1), Effects1};
_ ->
Msg = {RaftId, {Header, RawMsg}},
%% this should not affect the release cursor in any way
@@ -1221,14 +1223,14 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0}
end.
-return_all(State0, Checked0, Effects0) ->
+return_all(State0, Checked0, Effects0, ConsumerId, Consumer) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) ->
- return_one(0, Msg, S, E);
+ return_one(0, Msg, S, E, ConsumerId, Consumer);
({_, {MsgNum, Msg}}, {S, E}) ->
- return_one(MsgNum, Msg, S, E)
+ return_one(MsgNum, Msg, S, E, ConsumerId, Consumer)
end, {State0, Effects0}, Checked).
%% checkout new messages to consumers
@@ -1868,6 +1870,26 @@ return_checked_out_test() ->
apply(meta(3), make_return(Cid, [MsgId]), State1),
ok.
+return_checked_out_limit_test() ->
+ Cid = {<<"cid">>, self()},
+ Init = init(#{name => test,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(test, utf8)),
+ release_cursor_interval => 0,
+ delivery_limit => 1}),
+ {State0, [_, _]} = enq(1, 1, first, Init),
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
+ {aux, active} | _ ]} = check_auto(Cid, 2, State0),
+ % returning immediately checks out the same message again
+ {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, ra_event},
+ {aux, active}]} =
+ apply(meta(3), make_return(Cid, [MsgId]), State1),
+ {#state{ra_indexes = RaIdxs}, ok, []} =
+ apply(meta(4), make_return(Cid, [MsgId2]), State2),
+ ?assertEqual(0, rabbit_fifo_index:size(RaIdxs)),
+ ok.
+
return_auto_checked_out_test() ->
Cid = {<<"cid">>, self()},
{State00, [_, _]} = enq(1, 1, first, test_init(test)),
@@ -1886,7 +1908,6 @@ return_auto_checked_out_test() ->
Effects),
ok.
-
cancelled_checkout_out_test() ->
Cid = {<<"cid">>, self()},
{State00, [_, _]} = enq(1, 1, first, test_init(test)),