summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl113
1 files changed, 61 insertions, 52 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index ee0f65950c..af3441df86 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -486,7 +486,7 @@ apply(_, {down, ConsumerPid, noconnection},
end, Enqs0),
% mark waiting consumers as suspected if necessary
WaitingConsumers = update_waiting_consumer_status(Node, State0,
- suspected_down),
+ suspected_down),
Effects2 = case maps:size(Cons) of
0 ->
@@ -537,16 +537,17 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
end, Enqs0),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
{Cons1, SQ, Effects} =
- maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
- when (node(P) =:= Node) and
- (C#consumer.status =/= cancelled) ->
- EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc),
- update_or_remove_sub(
- ConsumerId, C#consumer{status = up},
- CAcc, SQAcc, EAcc1);
- (_, _, Acc) ->
- Acc
- end, {Cons0, SQ0, Monitors}, Cons0),
+ maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
+ when (node(P) =:= Node) and
+ (C#consumer.status =/= cancelled) ->
+ EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C,
+ true, up, EAcc),
+ update_or_remove_sub(
+ ConsumerId, C#consumer{status = up},
+ CAcc, SQAcc, EAcc1);
+ (_, _, Acc) ->
+ Acc
+ end, {Cons0, SQ0, Monitors}, Cons0),
checkout(Meta, State0#state{consumers = Cons1, enqueuers = Enqs1,
service_queue = SQ,
@@ -558,7 +559,8 @@ apply(Meta, #update_config{config = Conf}, State) ->
consumer_active_flag_update_function(#state{consumer_strategy = default}) ->
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
- consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects)
+ consumer_update_active_effects(State, ConsumerId, Consumer, Active,
+ ActivityStatus, Effects)
end;
consumer_active_flag_update_function(#state{consumer_strategy = single_active}) ->
fun(_, _, _, _, _, Effects) ->
@@ -739,47 +741,51 @@ query_consumer_count(#state{consumers = Consumers,
query_consumers(#state{consumers = Consumers,
waiting_consumers = WaitingConsumers,
consumer_strategy = ConsumerStrategy } = State) ->
- ActiveActivityStatusFun = case ConsumerStrategy of
- default ->
- fun(_ConsumerId,
- #consumer{status = Status}) ->
- case Status of
- suspected_down ->
- {false, Status};
- _ ->
- {true, Status}
- end
- end;
- single_active ->
- SingleActiveConsumer = query_single_active_consumer(State),
- fun({Tag, Pid} = _Consumer, _) ->
- case SingleActiveConsumer of
- {value, {Tag, Pid}} ->
- {true, single_active};
- _ ->
- {false, waiting}
- end
- end
- end,
- FromConsumers = maps:fold(fun (_, #consumer{status = cancelled}, Acc) ->
- Acc;
- ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) ->
- {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
- maps:put({Tag, Pid},
- {Pid, Tag,
- maps:get(ack, Meta, undefined),
- maps:get(prefetch, Meta, undefined),
- Active,
- ActivityStatus,
- maps:get(args, Meta, []),
- maps:get(username, Meta, undefined)},
- Acc)
- end, #{}, Consumers),
+ ActiveActivityStatusFun =
+ case ConsumerStrategy of
+ default ->
+ fun(_ConsumerId,
+ #consumer{status = Status}) ->
+ case Status of
+ suspected_down ->
+ {false, Status};
+ _ ->
+ {true, Status}
+ end
+ end;
+ single_active ->
+ SingleActiveConsumer = query_single_active_consumer(State),
+ fun({Tag, Pid} = _Consumer, _) ->
+ case SingleActiveConsumer of
+ {value, {Tag, Pid}} ->
+ {true, single_active};
+ _ ->
+ {false, waiting}
+ end
+ end
+ end,
+ FromConsumers =
+ maps:fold(fun (_, #consumer{status = cancelled}, Acc) ->
+ Acc;
+ ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) ->
+ {Active, ActivityStatus} =
+ ActiveActivityStatusFun({Tag, Pid}, Consumer),
+ maps:put({Tag, Pid},
+ {Pid, Tag,
+ maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined),
+ Active,
+ ActivityStatus,
+ maps:get(args, Meta, []),
+ maps:get(username, Meta, undefined)},
+ Acc)
+ end, #{}, Consumers),
FromWaitingConsumers =
lists:foldl(fun ({_, #consumer{status = cancelled}}, Acc) ->
Acc;
({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) ->
- {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
+ {Active, ActivityStatus} =
+ ActiveActivityStatusFun({Tag, Pid}, Consumer),
maps:put({Tag, Pid},
{Pid, Tag,
maps:get(ack, Meta, undefined),
@@ -882,7 +888,8 @@ cancel_consumer(ConsumerId,
{Consumer, Cons1} ->
% The active consumer is to be removed
% Cancel it
- {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1, State0, Effects0, Reason),
+ {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1,
+ State0, Effects0, Reason),
Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1),
% Take another one from the waiting consumers and put it in consumers
[{NewActiveConsumerId, NewActiveConsumer}
@@ -892,7 +899,8 @@ cancel_consumer(ConsumerId,
NewActiveConsumer,
ServiceQueue),
State = State1#state{consumers = maps:put(NewActiveConsumerId,
- NewActiveConsumer, State1#state.consumers),
+ NewActiveConsumer,
+ State1#state.consumers),
service_queue = ServiceQueue1,
waiting_consumers = RemainingWaitingConsumers},
Effects = consumer_update_active_effects(State, NewActiveConsumerId,
@@ -926,7 +934,8 @@ consumer_update_active_effects(#state{queue_resource = QName },
cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) ->
case maps:take(ConsumerId, C0) of
{Consumer, Cons1} ->
- {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, Effects0, Reason),
+ {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0,
+ Effects0, Reason),
Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
case maps:size(S#state.consumers) of
0 ->