diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 248 |
1 files changed, 127 insertions, 121 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index e43e94a385..fb6f9ce770 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -184,11 +184,9 @@ apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, #?MODULE{consumers = Cons0} = State) -> case Cons0 of - #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} -> - Checked = maps:without(MsgIds, Checked0), + #{ConsumerId := #consumer{checked_out = Checked0}} -> Returned = maps:with(MsgIds, Checked0), - MsgNumMsgs = maps:values(Returned), - return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, [], State); + return(Meta, ConsumerId, Returned, [], State); _ -> {State, ok} end; @@ -310,7 +308,7 @@ apply(#{index := RaftIdx}, #purge{}, %% reverse the effects ourselves {State, {purge, Total}, lists:reverse([garbage_collection | Effects])}; -apply(_, {down, Pid, noconnection}, +apply(Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = Waiting0, @@ -318,17 +316,30 @@ apply(_, {down, Pid, noconnection}, Node = node(Pid), %% if the pid refers to the active consumer, mark it as suspected and return %% it to the waiting queue - {State1, Effects0} = case maps:to_list(Cons0) of - [{{_, P} = Cid, C}] when node(P) =:= Node -> - %% the consumer should be returned to waiting - %% - Effs = consumer_update_active_effects( - State0, Cid, C, false, suspected_down, []), - {State0#?MODULE{consumers = #{}, - waiting_consumers = Waiting0 ++ [{Cid, C}]}, - Effs}; - _ -> {State0, []} - end, + {State1, Effects0} = + case maps:to_list(Cons0) of + [{{_, P} = Cid, C0}] when node(P) =:= Node -> + %% the consumer should be returned to waiting + %% and checked out messages should be returned + Effs = consumer_update_active_effects( + State0, Cid, C0, false, suspected_down, []), + Checked = C0#consumer.checked_out, + Credit = increase_credit(C0, maps:size(Checked)), + {St, Effs1} = return_all(State0, Effs, + Cid, C0#consumer{credit = Credit}), + %% if the consumer was cancelled there is a chance it got + %% removed when returning hence we need to be defensive here + Waiting = case St#?MODULE.consumers of + #{Cid := C} -> + Waiting0 ++ [{Cid, C}]; + _ -> + Waiting0 + end, + {St#?MODULE{consumers = #{}, + waiting_consumers = Waiting}, + Effs1}; + _ -> {State0, []} + end, WaitingConsumers = update_waiting_consumer_status(Node, State1, suspected_down), @@ -342,8 +353,8 @@ apply(_, {down, Pid, noconnection}, (_, E) -> E end, Enqs0), Effects = [{monitor, node, Node} | Effects1], - {State#?MODULE{enqueuers = Enqs}, ok, Effects}; -apply(_, {down, Pid, noconnection}, + checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); +apply(Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> %% A node has been disconnected. This doesn't necessarily mean that @@ -355,23 +366,22 @@ apply(_, {down, Pid, noconnection}, %% all pids for the disconnected node will be marked as suspected not just %% the one we got the `down' command for Node = node(Pid), - ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), - {Cons, State, Effects1} = - maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0, - status = up} = C, - {Co, St0, Eff}) when node(P) =:= Node -> - {St, Eff0} = return_all(St0, Checked0, Eff, K, C), - Credit = increase_credit(C, maps:size(Checked0)), - Eff1 = ConsumerUpdateActiveFun(St, K, C, false, - suspected_down, Eff0), - {maps:put(K, C#consumer{status = suspected_down, - credit = Credit, - checked_out = #{}}, Co), - St, Eff1}; - (K, C, {Co, St, Eff}) -> - {maps:put(K, C, Co), St, Eff} - end, {#{}, State0, []}, Cons0), + {State, Effects1} = + maps:fold( + fun({_, P} = Cid, #consumer{checked_out = Checked0, + status = up} = C0, + {St0, Eff}) when node(P) =:= Node -> + Credit = increase_credit(C0, map_size(Checked0)), + C = C0#consumer{status = suspected_down, + credit = Credit}, + {St, Eff0} = return_all(St0, Eff, Cid, C), + Eff1 = consumer_update_active_effects(St, Cid, C, false, + suspected_down, Eff0), + {St, Eff1}; + (_, _, {St, Eff}) -> + {St, Eff} + end, {State0, []}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{status = suspected_down}; (_, E) -> E @@ -380,14 +390,14 @@ apply(_, {down, Pid, noconnection}, % Monitor the node so that we can "unsuspect" these processes when the node % comes back, then re-issue all monitors and discover the final fate of % these processes - Effects2 = case maps:size(Cons) of - 0 -> - [{aux, inactive}, {monitor, node, Node}]; - _ -> - [{monitor, node, Node}] - end ++ Effects1, + Effects = case maps:size(State#?MODULE.consumers) of + 0 -> + [{aux, inactive}, {monitor, node, Node}]; + _ -> + [{monitor, node, Node}] + end ++ Effects1, %% TODO: should we run a checkout here? - {State#?MODULE{consumers = Cons, enqueuers = Enqs}, ok, Effects2}; + checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages @@ -795,9 +805,9 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, | Effects]. cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> - case maps:take(ConsumerId, C0) of - {Consumer, Cons1} -> - {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, + case C0 of + #{ConsumerId := Consumer} -> + {S, Effects2} = maybe_return_all(ConsumerId, Consumer, S0, Effects0, Reason), %% The effects are emitted before the consumer is actually removed %% if the consumer has unacked messages. This is a bit weird but @@ -810,7 +820,7 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> _ -> {S, Effects} end; - error -> + _ -> %% already removed: do nothing {S0, Effects0} end. @@ -847,9 +857,9 @@ activate_next_consumer(#?MODULE{consumers = Cons, -maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, - Cons1, #?MODULE{consumers = C0, - service_queue = SQ0} = S0, +maybe_return_all(ConsumerId, Consumer, + #?MODULE{consumers = C0, + service_queue = SQ0} = S0, Effects0, Reason) -> case Reason of consumer_cancel -> @@ -859,11 +869,12 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, credit = 0, status = cancelled}, C0, SQ0, Effects0), - {S0#?MODULE{consumers = Cons, service_queue = SQ}, Effects1}; + {S0#?MODULE{consumers = Cons, + service_queue = SQ}, Effects1}; down -> - {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId, - Consumer), - {S1#?MODULE{consumers = Cons1}, Effects1} + {S1, Effects1} = return_all(S0, Effects0, ConsumerId, Consumer), + {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers)}, + Effects1} end. apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> @@ -980,38 +991,43 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, snd(T) -> element(2, T). -return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, - Effects0, #?MODULE{consumers = Cons0, service_queue = SQ0} = State0) -> - Con = Con0#consumer{checked_out = Checked, - credit = increase_credit(Con0, length(MsgNumMsgs))}, - {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0, - SQ0, Effects0), - {State1, Effects2} = lists:foldl( - fun({'$prefix_msg', _} = Msg, {S0, E0}) -> - return_one(0, Msg, S0, E0, - ConsumerId, Con); - ({MsgNum, Msg}, {S0, E0}) -> - return_one(MsgNum, Msg, S0, E0, - ConsumerId, Con) - end, {State0, Effects1}, MsgNumMsgs), - checkout(Meta, State1#?MODULE{consumers = Cons, - service_queue = SQ}, - Effects2). +return(Meta, ConsumerId, Returned, + Effects0, #?MODULE{service_queue = SQ0} = State0) -> + {State1, Effects1} = maps:fold( + fun(MsgId, {'$prefix_msg', _} = Msg, {S0, E0}) -> + return_one(MsgId, 0, Msg, S0, E0, ConsumerId); + (MsgId, {MsgNum, Msg}, {S0, E0}) -> + return_one(MsgId, MsgNum, Msg, S0, E0, + ConsumerId) + end, {State0, Effects0}, Returned), + #{ConsumerId := Con0} = Cons0 = State1#?MODULE.consumers, + Con = Con0#consumer{credit = increase_credit(Con0, map_size(Returned))}, + {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con, Cons0, + SQ0, Effects1), + State = State1#?MODULE{consumers = Cons, + service_queue = SQ}, + checkout(Meta, State, Effects2). % used to processes messages that are finished -complete(ConsumerId, MsgRaftIdxs, NumDiscarded, - Con0, Checked, Effects0, +complete(ConsumerId, Discarded, + #consumer{checked_out = Checked} = Con0, Effects0, #?MODULE{consumers = Cons0, service_queue = SQ0, ra_indexes = Indexes0} = State0) -> + MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], %% credit_mode = simple_prefetch should automatically top-up credit %% as messages are simple_prefetch or otherwise returned - Con = Con0#consumer{checked_out = Checked, - credit = increase_credit(Con0, NumDiscarded)}, + Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked), + credit = increase_credit(Con0, maps:size(Discarded))}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), - {State0#?MODULE{consumers = Cons, + State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) -> + add_bytes_settle(RawMsg, Acc); + ({'$prefix_msg', _} = M, Acc) -> + add_bytes_settle(M, Acc) + end, State0, maps:values(Discarded)), + {State1#?MODULE{consumers = Cons, ra_indexes = Indexes, service_queue = SQ}, Effects}. @@ -1030,19 +1046,9 @@ increase_credit(#consumer{credit = Current}, Credit) -> complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, #consumer{checked_out = Checked0} = Con0, Effects0, State0) -> - Checked = maps:without(MsgIds, Checked0), Discarded = maps:with(MsgIds, Checked0), - MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], - State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) -> - add_bytes_settle(RawMsg, Acc); - ({'$prefix_msg', _} = M, Acc) -> - add_bytes_settle(M, Acc) - end, State0, maps:values(Discarded)), - %% need to pass the length of discarded as $prefix_msgs would be filtered - %% by the above list comprehension - {State2, Effects1} = complete(ConsumerId, MsgRaftIdxs, - maps:size(Discarded), - Con0, Checked, Effects0, State1), + {State2, Effects1} = complete(ConsumerId, Discarded, Con0, + Effects0, State0), {State, ok, Effects} = checkout(Meta, State2, Effects1), % settle metrics are incremented separately update_smallest_raft_index(IncomingRaftIdx, State, Effects). @@ -1102,69 +1108,70 @@ find_next_cursor(Smallest, Cursors0, Potential) -> {Potential, Cursors0} end. -return_one(0, {'$prefix_msg', Header0}, +return_one(MsgId, 0, {'$prefix_msg', Header0}, #?MODULE{returns = Returns, + consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, - Effects0, ConsumerId, Con) -> - Header = maps:update_with(delivery_count, - fun (C) -> C+1 end, + Effects0, ConsumerId) -> + #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), + Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), Msg = {'$prefix_msg', Header}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> - Checked = Con#consumer.checked_out, - {State1, Effects} = complete(ConsumerId, [], 1, Con, Checked, - Effects0, State0), - {add_bytes_settle(Msg, State1), Effects}; + complete(ConsumerId, #{MsgId => Msg}, Con0, Effects0, State0); _ -> %% this should not affect the release cursor in any way - {add_bytes_return(Msg, - State0#?MODULE{returns = lqueue:in(Msg, Returns)}), + Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, + {add_bytes_return( + Msg, + State0#?MODULE{consumers = Consumers#{ConsumerId => Con}, + returns = lqueue:in(Msg, Returns)}), Effects0} end; -return_one(MsgNum, {RaftId, {Header0, RawMsg}}, +return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, #?MODULE{returns = Returns, + consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, - Effects0, ConsumerId, Con) -> - Header = maps:update_with(delivery_count, - fun (C) -> C+1 end, + Effects0, ConsumerId) -> + #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), + Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), Msg = {RaftId, {Header, RawMsg}}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> DlMsg = {MsgNum, Msg}, - Effects = dead_letter_effects(delivery_limit, - #{none => DlMsg}, + Effects = dead_letter_effects(delivery_limit, #{none => DlMsg}, State0, Effects0), - Checked = Con#consumer.checked_out, - {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, - Effects, State0), - {add_bytes_settle(RawMsg, State1), Effects1}; + complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0); _ -> + Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, %% this should not affect the release cursor in any way - {add_bytes_return(RawMsg, - State0#?MODULE{returns = - lqueue:in({MsgNum, Msg}, Returns)}), + {add_bytes_return( + RawMsg, + State0#?MODULE{consumers = Consumers#{ConsumerId => Con}, + returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0} end. -return_all(State0, Checked0, Effects0, ConsumerId, Consumer) -> +return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, + #consumer{checked_out = Checked0} = Con) -> %% need to sort the list so that we return messages in the order %% they were checked out Checked = lists:sort(maps:to_list(Checked0)), - lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) -> - return_one(0, Msg, S, E, ConsumerId, Consumer); - ({_, {MsgNum, Msg}}, {S, E}) -> - return_one(MsgNum, Msg, S, E, ConsumerId, Consumer) - end, {State0, Effects0}, Checked). + State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}}, + lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) -> + return_one(MsgId, 0, Msg, S, E, ConsumerId); + ({MsgId, {MsgNum, Msg}}, {S, E}) -> + return_one(MsgId, MsgNum, Msg, S, E, ConsumerId) + end, {State, Effects0}, Checked). %% checkout new messages to consumers %% reverses the effects list checkout(#{index := Index}, State0, Effects0) -> {State1, _Result, Effects1} = checkout0(checkout_one(State0), Effects0, #{}), - case evaluate_limit(State0#?MODULE.ra_indexes, false, - State1, Effects1) of + case evaluate_limit(false, State1, Effects1) of {State, true, Effects} -> update_smallest_raft_index(Index, State, Effects); {State, false, Effects} -> @@ -1187,17 +1194,16 @@ checkout0({Activity, State0}, Effects0, Acc) -> end, {State0, ok, lists:reverse(Effects1)}. -evaluate_limit(_OldIndexes, Result, +evaluate_limit(Result, #?MODULE{cfg = #cfg{max_length = undefined, max_bytes = undefined}} = State, Effects) -> {State, Result, Effects}; -evaluate_limit(OldIndexes, Result, - State0, Effects0) -> +evaluate_limit(Result, State0, Effects0) -> case is_over_limit(State0) of true -> {State, Effects} = drop_head(State0, Effects0), - evaluate_limit(OldIndexes, true, State, Effects); + evaluate_limit(true, State, Effects); false -> {State0, Result, Effects0} end. |
