diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 43 |
1 files changed, 32 insertions, 11 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index da9772516c..4ed2bb743a 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -472,7 +472,7 @@ apply(_, {down, ConsumerPid, noconnection}, #consumer{checked_out = Checked0} = C, {Co, St0, Eff}) when (node(P) =:= Node) and (C#consumer.status =/= cancelled)-> - {St, Eff0} = return_all(St0, Checked0, Eff), + {St, Eff0} = return_all(St0, Checked0, Eff, K, C), Credit = increase_credit(C, maps:size(Checked0)), Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff0), @@ -965,7 +965,7 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1 C0, SQ0, Effects0), {S0#state{consumers = Cons, service_queue = SQ}, Effects1}; down -> - {S1, Effects1} = return_all(S0, Checked0, Effects0), + {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId, Consumer), {S1#state{consumers = Cons1}, Effects1} end. @@ -1089,9 +1089,9 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), {State1, Effects2} = lists:foldl(fun({'$prefix_msg', _} = Msg, {S0, E0}) -> - return_one(0, Msg, S0, E0); + return_one(0, Msg, S0, E0, ConsumerId, Con); ({MsgNum, Msg}, {S0, E0}) -> - return_one(MsgNum, Msg, S0, E0) + return_one(MsgNum, Msg, S0, E0, ConsumerId, Con) end, {State0, Effects1}, MsgNumMsgs), checkout(Meta, State1#state{consumers = Cons, service_queue = SQ}, @@ -1201,19 +1201,21 @@ find_next_cursor(Smallest, Cursors0, Potential) -> end. return_one(0, {'$prefix_msg', _} = Msg, - #state{returns = Returns} = State0, Effects) -> + #state{returns = Returns} = State0, Effects, _ConsumerId, _Con) -> {add_bytes_return(Msg, State0#state{returns = lqueue:in(Msg, Returns)}), Effects}; return_one(MsgNum, {RaftId, {Header0, RawMsg}}, #state{returns = Returns, - delivery_limit = DeliveryLimit} = State0, Effects0) -> + delivery_limit = DeliveryLimit} = State0, Effects0, ConsumerId, Con) -> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> Effects = dead_letter_effects(rejected, maps:put(none, {MsgNum, {RaftId, {Header, RawMsg}}}, #{}), State0, Effects0), - {add_bytes_settle(RawMsg, State0), Effects}; + Checked = maps:without([MsgNum], Con#consumer.checked_out), + {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, Effects, State0), + {add_bytes_settle(RawMsg, State1), Effects1}; _ -> Msg = {RaftId, {Header, RawMsg}}, %% this should not affect the release cursor in any way @@ -1221,14 +1223,14 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0} end. -return_all(State0, Checked0, Effects0) -> +return_all(State0, Checked0, Effects0, ConsumerId, Consumer) -> %% need to sort the list so that we return messages in the order %% they were checked out Checked = lists:sort(maps:to_list(Checked0)), lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) -> - return_one(0, Msg, S, E); + return_one(0, Msg, S, E, ConsumerId, Consumer); ({_, {MsgNum, Msg}}, {S, E}) -> - return_one(MsgNum, Msg, S, E) + return_one(MsgNum, Msg, S, E, ConsumerId, Consumer) end, {State0, Effects0}, Checked). %% checkout new messages to consumers @@ -1868,6 +1870,26 @@ return_checked_out_test() -> apply(meta(3), make_return(Cid, [MsgId]), State1), ok. +return_checked_out_limit_test() -> + Cid = {<<"cid">>, self()}, + Init = init(#{name => test, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(test, utf8)), + release_cursor_interval => 0, + delivery_limit => 1}), + {State0, [_, _]} = enq(1, 1, first, Init), + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event}, + {aux, active} | _ ]} = check_auto(Cid, 2, State0), + % returning immediately checks out the same message again + {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, ra_event}, + {aux, active}]} = + apply(meta(3), make_return(Cid, [MsgId]), State1), + {#state{ra_indexes = RaIdxs}, ok, []} = + apply(meta(4), make_return(Cid, [MsgId2]), State2), + ?assertEqual(0, rabbit_fifo_index:size(RaIdxs)), + ok. + return_auto_checked_out_test() -> Cid = {<<"cid">>, self()}, {State00, [_, _]} = enq(1, 1, first, test_init(test)), @@ -1886,7 +1908,6 @@ return_auto_checked_out_test() -> Effects), ok. - cancelled_checkout_out_test() -> Cid = {<<"cid">>, self()}, {State00, [_, _]} = enq(1, 1, first, test_init(test)), |
