summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl107
1 files changed, 65 insertions, 42 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 450ba1ac85..58e91f3d19 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -184,7 +184,7 @@
%% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
lifetime = once :: once | auto,
- suspected_down = false :: boolean()
+ suspected_down = false :: 'cancel' | boolean()
}).
-type consumer() :: #consumer{}.
@@ -426,10 +426,7 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
end
end;
apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
- {State, Effects} = cancel_consumer(ConsumerId, State0, []),
- % TODO: here we should really demonitor the pid but _only_ if it has no
- % other consumers or enqueuers. leaving a monitor in place isn't harmful
- % however
+ {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel),
checkout(Meta, State, Effects);
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
consumer_id = {_, Pid} = ConsumerId},
@@ -469,7 +466,8 @@ apply(_, {down, ConsumerPid, noconnection},
{Cons, State, Effects1} = maps:fold(
fun({_, P} = K,
#consumer{checked_out = Checked0} = C,
- {Co, St0, Eff}) when node(P) =:= Node ->
+ {Co, St0, Eff}) when (node(P) =:= Node) and
+ (C#consumer.suspected_down =/= cancel)->
St = return_all(St0, Checked0),
Credit = increase_credit(C, maps:size(Checked0)),
Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff),
@@ -514,7 +512,7 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
DownConsumers = maps:keys(
maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
{State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) ->
- cancel_consumer(ConsumerId, S, E)
+ cancel_consumer(ConsumerId, S, E, down)
end, {State2, Effects1}, DownConsumers),
checkout(Meta, State, Effects);
apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
@@ -538,7 +536,7 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
{Cons1, SQ, Effects} =
maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
- when node(P) =:= Node ->
+ when (node(P) =:= Node) and (C#consumer.suspected_down =/= cancel) ->
EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc),
update_or_remove_sub(
ConsumerId, C#consumer{suspected_down = false},
@@ -606,7 +604,8 @@ maybe_mark_suspect_waiting_consumers(Node,
_ ->
{ConsumerId, Consumer}
end
- end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers].
+ end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers,
+ Consumer#consumer.suspected_down =/= cancel].
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Cons,
@@ -758,18 +757,24 @@ query_consumers(#state{consumers = Consumers,
end
end
end,
- FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta} = Consumer) ->
- {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
- {Pid, Tag,
- maps:get(ack, Meta, undefined),
- maps:get(prefetch, Meta, undefined),
- Active,
- ActivityStatus,
- maps:get(args, Meta, []),
- maps:get(username, Meta, undefined)}
- end, Consumers),
+ FromConsumers = maps:fold(fun (_, #consumer{suspected_down = cancel}, Acc) ->
+ Acc;
+ ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) ->
+ {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
+ maps:put({Tag, Pid},
+ {Pid, Tag,
+ maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined),
+ Active,
+ ActivityStatus,
+ maps:get(args, Meta, []),
+ maps:get(username, Meta, undefined)},
+ Acc)
+ end, #{}, Consumers),
FromWaitingConsumers =
- lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) ->
+ lists:foldl(fun ({_, #consumer{suspected_down = cancel}}, Acc) ->
+ Acc;
+ ({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) ->
{Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
maps:put({Tag, Pid},
{Pid, Tag,
@@ -854,42 +859,42 @@ num_checked_out(#state{consumers = Cons}) ->
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
- #state{consumer_strategy = default} = State, Effects) ->
+ #state{consumer_strategy = default} = State, Effects, Reason) ->
%% general case, single active consumer off
- cancel_consumer0(ConsumerId, State, Effects);
+ cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
#state{consumer_strategy = single_active,
waiting_consumers = []} = State,
- Effects) ->
+ Effects, Reason) ->
%% single active consumer on, no consumers are waiting
- cancel_consumer0(ConsumerId, State, Effects);
+ cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
#state{consumers = Cons0,
consumer_strategy = single_active,
waiting_consumers = WaitingConsumers0} = State0,
- Effects0) ->
+ Effects0, Reason) ->
%% single active consumer on, consumers are waiting
case maps:take(ConsumerId, Cons0) of
- {#consumer{checked_out = Checked0}, _} ->
+ {Consumer, Cons1} ->
% The active consumer is to be removed
% Cancel it
- State1 = return_all(State0, Checked0),
- Effects1 = cancel_consumer_effects(ConsumerId, State1, Effects0),
+ {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1, State0, Effects0, Reason),
+ Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1),
% Take another one from the waiting consumers and put it in consumers
[{NewActiveConsumerId, NewActiveConsumer}
| RemainingWaitingConsumers] = WaitingConsumers0,
- #state{service_queue = ServiceQueue} = State0,
+ #state{service_queue = ServiceQueue} = State1,
ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId,
NewActiveConsumer,
ServiceQueue),
- State = State1#state{consumers = #{NewActiveConsumerId =>
- NewActiveConsumer},
+ State = State1#state{consumers = maps:put(NewActiveConsumerId,
+ NewActiveConsumer, State1#state.consumers),
service_queue = ServiceQueue1,
waiting_consumers = RemainingWaitingConsumers},
- Effects2 = consumer_update_active_effects(State, NewActiveConsumerId,
+ Effects = consumer_update_active_effects(State, NewActiveConsumerId,
NewActiveConsumer, true,
- single_active, Effects1),
- {State, Effects2};
+ single_active, Effects2),
+ {State, Effects};
error ->
% The cancelled consumer is not the active one
% Just remove it from idle_consumers
@@ -914,23 +919,39 @@ consumer_update_active_effects(#state{queue_resource = QName },
[QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
| Effects].
-cancel_consumer0(ConsumerId,
- #state{consumers = C0} = S0, Effects0) ->
+cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) ->
case maps:take(ConsumerId, C0) of
- {#consumer{checked_out = Checked0}, Cons} ->
- S = return_all(S0, Checked0),
- Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
- case maps:size(Cons) of
+ {Consumer, Cons1} ->
+ {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, Effects0, Reason),
+ Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
+ case maps:size(S#state.consumers) of
0 ->
- {S#state{consumers = Cons}, [{aux, inactive} | Effects]};
+ {S, [{aux, inactive} | Effects]};
_ ->
- {S#state{consumers = Cons}, Effects}
+ {S, Effects}
end;
error ->
%% already removed: do nothing
{S0, Effects0}
end.
+maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1,
+ #state{consumers = C0,
+ service_queue = SQ0} = S0, Effects0, Reason) ->
+ case Reason of
+ consumer_cancel ->
+ {Cons, SQ, Effects1} =
+ update_or_remove_sub(ConsumerId,
+ Consumer#consumer{lifetime = once,
+ credit = 0,
+ suspected_down = cancel},
+ C0, SQ0, Effects0),
+ {S0#state{consumers = Cons, service_queue = SQ}, Effects1};
+ down ->
+ S1 = return_all(S0, Checked0),
+ {S1#state{consumers = Cons1}, Effects0}
+ end.
+
apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
Bytes = message_size(RawMsg),
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
@@ -1303,6 +1324,8 @@ checkout_one(#state{service_queue = SQ0,
%% can happen when draining
%% recurse without consumer on queue
checkout_one(InitState#state{service_queue = SQ1});
+ {ok, #consumer{suspected_down = cancel}} ->
+ checkout_one(InitState#state{service_queue = SQ1});
{ok, #consumer{suspected_down = true}} ->
checkout_one(InitState#state{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,