diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-02-21 09:59:16 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-21 09:59:16 +0000 |
| commit | 14a09806440e06d7a7ff77551f68610d916c597a (patch) | |
| tree | c8d6d079ab74bda02db0657b6fcf9efefa0601b0 | |
| parent | 02f84e6a15ed544b777f87768466099721274923 (diff) | |
| download | rabbitmq-server-git-14a09806440e06d7a7ff77551f68610d916c597a.tar.gz | |
rabbit_fifo: formatting
Make it more readable for people that use split buffers.
| -rw-r--r-- | src/rabbit_fifo.erl | 113 |
1 files changed, 61 insertions, 52 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index ee0f65950c..af3441df86 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -486,7 +486,7 @@ apply(_, {down, ConsumerPid, noconnection}, end, Enqs0), % mark waiting consumers as suspected if necessary WaitingConsumers = update_waiting_consumer_status(Node, State0, - suspected_down), + suspected_down), Effects2 = case maps:size(Cons) of 0 -> @@ -537,16 +537,17 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0, end, Enqs0), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), {Cons1, SQ, Effects} = - maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) - when (node(P) =:= Node) and - (C#consumer.status =/= cancelled) -> - EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc), - update_or_remove_sub( - ConsumerId, C#consumer{status = up}, - CAcc, SQAcc, EAcc1); - (_, _, Acc) -> - Acc - end, {Cons0, SQ0, Monitors}, Cons0), + maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) + when (node(P) =:= Node) and + (C#consumer.status =/= cancelled) -> + EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, + true, up, EAcc), + update_or_remove_sub( + ConsumerId, C#consumer{status = up}, + CAcc, SQAcc, EAcc1); + (_, _, Acc) -> + Acc + end, {Cons0, SQ0, Monitors}, Cons0), checkout(Meta, State0#state{consumers = Cons1, enqueuers = Enqs1, service_queue = SQ, @@ -558,7 +559,8 @@ apply(Meta, #update_config{config = Conf}, State) -> consumer_active_flag_update_function(#state{consumer_strategy = default}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> - consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) + consumer_update_active_effects(State, ConsumerId, Consumer, Active, + ActivityStatus, Effects) end; consumer_active_flag_update_function(#state{consumer_strategy = single_active}) -> fun(_, _, _, _, _, Effects) -> @@ -739,47 +741,51 @@ query_consumer_count(#state{consumers = Consumers, query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers, consumer_strategy = ConsumerStrategy } = State) -> - ActiveActivityStatusFun = case ConsumerStrategy of - default -> - fun(_ConsumerId, - #consumer{status = Status}) -> - case Status of - suspected_down -> - {false, Status}; - _ -> - {true, Status} - end - end; - single_active -> - SingleActiveConsumer = query_single_active_consumer(State), - fun({Tag, Pid} = _Consumer, _) -> - case SingleActiveConsumer of - {value, {Tag, Pid}} -> - {true, single_active}; - _ -> - {false, waiting} - end - end - end, - FromConsumers = maps:fold(fun (_, #consumer{status = cancelled}, Acc) -> - Acc; - ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) -> - {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), - maps:put({Tag, Pid}, - {Pid, Tag, - maps:get(ack, Meta, undefined), - maps:get(prefetch, Meta, undefined), - Active, - ActivityStatus, - maps:get(args, Meta, []), - maps:get(username, Meta, undefined)}, - Acc) - end, #{}, Consumers), + ActiveActivityStatusFun = + case ConsumerStrategy of + default -> + fun(_ConsumerId, + #consumer{status = Status}) -> + case Status of + suspected_down -> + {false, Status}; + _ -> + {true, Status} + end + end; + single_active -> + SingleActiveConsumer = query_single_active_consumer(State), + fun({Tag, Pid} = _Consumer, _) -> + case SingleActiveConsumer of + {value, {Tag, Pid}} -> + {true, single_active}; + _ -> + {false, waiting} + end + end + end, + FromConsumers = + maps:fold(fun (_, #consumer{status = cancelled}, Acc) -> + Acc; + ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) -> + {Active, ActivityStatus} = + ActiveActivityStatusFun({Tag, Pid}, Consumer), + maps:put({Tag, Pid}, + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + Active, + ActivityStatus, + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)}, + Acc) + end, #{}, Consumers), FromWaitingConsumers = lists:foldl(fun ({_, #consumer{status = cancelled}}, Acc) -> Acc; ({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) -> - {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), + {Active, ActivityStatus} = + ActiveActivityStatusFun({Tag, Pid}, Consumer), maps:put({Tag, Pid}, {Pid, Tag, maps:get(ack, Meta, undefined), @@ -882,7 +888,8 @@ cancel_consumer(ConsumerId, {Consumer, Cons1} -> % The active consumer is to be removed % Cancel it - {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1, State0, Effects0, Reason), + {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1, + State0, Effects0, Reason), Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1), % Take another one from the waiting consumers and put it in consumers [{NewActiveConsumerId, NewActiveConsumer} @@ -892,7 +899,8 @@ cancel_consumer(ConsumerId, NewActiveConsumer, ServiceQueue), State = State1#state{consumers = maps:put(NewActiveConsumerId, - NewActiveConsumer, State1#state.consumers), + NewActiveConsumer, + State1#state.consumers), service_queue = ServiceQueue1, waiting_consumers = RemainingWaitingConsumers}, Effects = consumer_update_active_effects(State, NewActiveConsumerId, @@ -926,7 +934,8 @@ consumer_update_active_effects(#state{queue_resource = QName }, cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) -> case maps:take(ConsumerId, C0) of {Consumer, Cons1} -> - {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, Effects0, Reason), + {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, + Effects0, Reason), Effects = cancel_consumer_effects(ConsumerId, S, Effects2), case maps:size(S#state.consumers) of 0 -> |
