diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 107 |
1 files changed, 65 insertions, 42 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 450ba1ac85..58e91f3d19 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -184,7 +184,7 @@ %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}' credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data lifetime = once :: once | auto, - suspected_down = false :: boolean() + suspected_down = false :: 'cancel' | boolean() }). -type consumer() :: #consumer{}. @@ -426,10 +426,7 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, end end; apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> - {State, Effects} = cancel_consumer(ConsumerId, State0, []), - % TODO: here we should really demonitor the pid but _only_ if it has no - % other consumers or enqueuers. leaving a monitor in place isn't harmful - % however + {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel), checkout(Meta, State, Effects); apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, @@ -469,7 +466,8 @@ apply(_, {down, ConsumerPid, noconnection}, {Cons, State, Effects1} = maps:fold( fun({_, P} = K, #consumer{checked_out = Checked0} = C, - {Co, St0, Eff}) when node(P) =:= Node -> + {Co, St0, Eff}) when (node(P) =:= Node) and + (C#consumer.suspected_down =/= cancel)-> St = return_all(St0, Checked0), Credit = increase_credit(C, maps:size(Checked0)), Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff), @@ -514,7 +512,7 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0, DownConsumers = maps:keys( maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) -> - cancel_consumer(ConsumerId, S, E) + cancel_consumer(ConsumerId, S, E, down) end, {State2, Effects1}, DownConsumers), checkout(Meta, State, Effects); apply(Meta, {nodeup, Node}, #state{consumers = Cons0, @@ -538,7 +536,7 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0, ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), {Cons1, SQ, Effects} = maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) - when node(P) =:= Node -> + when (node(P) =:= Node) and (C#consumer.suspected_down =/= cancel) -> EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc), update_or_remove_sub( ConsumerId, C#consumer{suspected_down = false}, @@ -606,7 +604,8 @@ maybe_mark_suspect_waiting_consumers(Node, _ -> {ConsumerId, Consumer} end - end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers]. + end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers, + Consumer#consumer.suspected_down =/= cancel]. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Cons, @@ -758,18 +757,24 @@ query_consumers(#state{consumers = Consumers, end end end, - FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta} = Consumer) -> - {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), - {Pid, Tag, - maps:get(ack, Meta, undefined), - maps:get(prefetch, Meta, undefined), - Active, - ActivityStatus, - maps:get(args, Meta, []), - maps:get(username, Meta, undefined)} - end, Consumers), + FromConsumers = maps:fold(fun (_, #consumer{suspected_down = cancel}, Acc) -> + Acc; + ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) -> + {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), + maps:put({Tag, Pid}, + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + Active, + ActivityStatus, + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)}, + Acc) + end, #{}, Consumers), FromWaitingConsumers = - lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) -> + lists:foldl(fun ({_, #consumer{suspected_down = cancel}}, Acc) -> + Acc; + ({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) -> {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), maps:put({Tag, Pid}, {Pid, Tag, @@ -854,42 +859,42 @@ num_checked_out(#state{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, - #state{consumer_strategy = default} = State, Effects) -> + #state{consumer_strategy = default} = State, Effects, Reason) -> %% general case, single active consumer off - cancel_consumer0(ConsumerId, State, Effects); + cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #state{consumer_strategy = single_active, waiting_consumers = []} = State, - Effects) -> + Effects, Reason) -> %% single active consumer on, no consumers are waiting - cancel_consumer0(ConsumerId, State, Effects); + cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #state{consumers = Cons0, consumer_strategy = single_active, waiting_consumers = WaitingConsumers0} = State0, - Effects0) -> + Effects0, Reason) -> %% single active consumer on, consumers are waiting case maps:take(ConsumerId, Cons0) of - {#consumer{checked_out = Checked0}, _} -> + {Consumer, Cons1} -> % The active consumer is to be removed % Cancel it - State1 = return_all(State0, Checked0), - Effects1 = cancel_consumer_effects(ConsumerId, State1, Effects0), + {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1, State0, Effects0, Reason), + Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1), % Take another one from the waiting consumers and put it in consumers [{NewActiveConsumerId, NewActiveConsumer} | RemainingWaitingConsumers] = WaitingConsumers0, - #state{service_queue = ServiceQueue} = State0, + #state{service_queue = ServiceQueue} = State1, ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue), - State = State1#state{consumers = #{NewActiveConsumerId => - NewActiveConsumer}, + State = State1#state{consumers = maps:put(NewActiveConsumerId, + NewActiveConsumer, State1#state.consumers), service_queue = ServiceQueue1, waiting_consumers = RemainingWaitingConsumers}, - Effects2 = consumer_update_active_effects(State, NewActiveConsumerId, + Effects = consumer_update_active_effects(State, NewActiveConsumerId, NewActiveConsumer, true, - single_active, Effects1), - {State, Effects2}; + single_active, Effects2), + {State, Effects}; error -> % The cancelled consumer is not the active one % Just remove it from idle_consumers @@ -914,23 +919,39 @@ consumer_update_active_effects(#state{queue_resource = QName }, [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]} | Effects]. -cancel_consumer0(ConsumerId, - #state{consumers = C0} = S0, Effects0) -> +cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) -> case maps:take(ConsumerId, C0) of - {#consumer{checked_out = Checked0}, Cons} -> - S = return_all(S0, Checked0), - Effects = cancel_consumer_effects(ConsumerId, S, Effects0), - case maps:size(Cons) of + {Consumer, Cons1} -> + {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, Effects0, Reason), + Effects = cancel_consumer_effects(ConsumerId, S, Effects2), + case maps:size(S#state.consumers) of 0 -> - {S#state{consumers = Cons}, [{aux, inactive} | Effects]}; + {S, [{aux, inactive} | Effects]}; _ -> - {S#state{consumers = Cons}, Effects} + {S, Effects} end; error -> %% already removed: do nothing {S0, Effects0} end. +maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1, + #state{consumers = C0, + service_queue = SQ0} = S0, Effects0, Reason) -> + case Reason of + consumer_cancel -> + {Cons, SQ, Effects1} = + update_or_remove_sub(ConsumerId, + Consumer#consumer{lifetime = once, + credit = 0, + suspected_down = cancel}, + C0, SQ0, Effects0), + {S0#state{consumers = Cons, service_queue = SQ}, Effects1}; + down -> + S1 = return_all(S0, Checked0), + {S1#state{consumers = Cons1}, Effects0} + end. + apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> Bytes = message_size(RawMsg), case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of @@ -1303,6 +1324,8 @@ checkout_one(#state{service_queue = SQ0, %% can happen when draining %% recurse without consumer on queue checkout_one(InitState#state{service_queue = SQ1}); + {ok, #consumer{suspected_down = cancel}} -> + checkout_one(InitState#state{service_queue = SQ1}); {ok, #consumer{suspected_down = true}} -> checkout_one(InitState#state{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, |
