diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-01-17 17:10:29 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-01-17 17:10:29 +0000 |
| commit | 365cfd054e543efe399eae8a319fd92ec30ae35b (patch) | |
| tree | 7180351c40067425bd70a20721556e7dfea973e1 /src | |
| parent | ffc233c1a2fe8b18b7ae6e9738646a4133b07067 (diff) | |
| download | rabbitmq-server-git-365cfd054e543efe399eae8a319fd92ec30ae35b.tar.gz | |
rabbit_fifo: refactoring
Light refactoring
Shortening some lines for the benefit of split buffer users.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 237 |
1 files changed, 138 insertions, 99 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 8f34c01210..b62ff62a51 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -420,10 +420,10 @@ apply(_, #checkout{spec = {dequeue, unsettled}, {S, {dequeue, empty}} end; apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> - {CancelEffects, State1} = cancel_consumer(ConsumerId, {[], State0}), + {State, Effects} = cancel_consumer(ConsumerId, State0, []), % TODO: here we should really demonitor the pid but _only_ if it has no % other consumers or enqueuers. - checkout(State1, CancelEffects); + checkout(State, Effects); apply(_, #checkout{spec = Spec, meta = Meta, consumer_id = {_, Pid} = ConsumerId}, State0) -> @@ -475,7 +475,7 @@ apply(_, {down, ConsumerPid, noconnection}, (_, E) -> E end, Enqs0), % mark waiting consumers as suspected if necessary - WaitingConsumers1 = maybe_mark_suspect_waiting_consumers(Node, State0, true), + WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0, true), Effects = case maps:size(Cons) of 0 -> @@ -484,9 +484,10 @@ apply(_, {down, ConsumerPid, noconnection}, [{monitor, node, Node}] end, %% TODO: should we run a checkout here? - {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers1}, ok, Effects}; -apply(_, {down, Pid, _Info}, #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + {State#state{consumers = Cons, enqueuers = Enqs, + waiting_consumers = WaitingConsumers}, ok, Effects}; +apply(_, {down, Pid, _Info}, #state{consumers = Cons0, + enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid State1 = case maps:take(Pid, Enqs0) of @@ -497,42 +498,28 @@ apply(_, {down, Pid, _Info}, #state{consumers = Cons0, error -> State0 end, - {Effects1, State2} = maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, State1), + {Effects1, State2} = handle_waiting_consumer_down(Pid, State1), % return checked out messages to main queue % Find the consumers for the down pid DownConsumers = maps:keys( maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), - {Effects2, State3} = lists:foldl(fun cancel_consumer/2, {Effects1, State2}, - DownConsumers), - checkout(State3, Effects2); -apply(_, {nodeup, Node}, #state{consumers = Cons0, - enqueuers = Enqs0, - service_queue = SQ0, - waiting_consumers = WaitingConsumers0} = State0) -> + {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) -> + cancel_consumer(ConsumerId, S, E) + end, {State2, Effects1}, DownConsumers), + checkout(State, Effects); +apply(_, {nodeup, Node}, #state{consumers = Cons0, + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're %% actually down or not - Cons = maps:fold(fun({_, P}, #consumer{suspected_down = true}, Acc) - when node(P) =:= Node -> - [P | Acc]; - (_, _, Acc) -> Acc - end, [], Cons0), - Enqs = maps:fold(fun(P, #enqueuer{suspected_down = true}, Acc) - when node(P) =:= Node -> - [P | Acc]; - (_, _, Acc) -> Acc - end, [], Enqs0), - WaitingConsumers = lists:foldl(fun({{_, P}, #consumer{suspected_down = true}}, Acc) - when node(P) =:= Node -> - [P | Acc]; - (_, Acc) -> Acc - end, [], WaitingConsumers0), - - Monitors = [{monitor, process, P} || P <- Cons ++ Enqs ++ WaitingConsumers], + Monitors = [{monitor, process, P} + || P <- suspected_pids_for(Node, State0)], % un-suspect waiting consumers when necessary - WaitingConsumers1 = maybe_mark_suspect_waiting_consumers(Node, State0, false), + WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0, + false), Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = false}; @@ -549,36 +536,48 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0, end, {Cons0, SQ0, Monitors}, Cons0), % TODO: avoid list concat checkout(State0#state{consumers = Cons1, enqueuers = Enqs1, - service_queue = SQ, waiting_consumers = WaitingConsumers1}, Effects); + service_queue = SQ, + waiting_consumers = WaitingConsumers}, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; apply(_, #update_config{config = Conf}, State) -> {update_config(Conf, State), ok}. -maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = default} = State) -> +handle_waiting_consumer_down(_Pid, + #state{consumer_strategy = default} = State) -> {[], State}; -maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = single_active, - waiting_consumers = []} = State) -> +handle_waiting_consumer_down(_Pid, + #state{consumer_strategy = single_active, + waiting_consumers = []} = State) -> {[], State}; -maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, #state{consumer_strategy = single_active, - waiting_consumers = WaitingConsumers0} = State0) -> +handle_waiting_consumer_down(Pid, + #state{consumer_strategy = single_active, + waiting_consumers = WaitingConsumers0} = State0) -> % get cancel effects for down waiting consumers - DownWaitingConsumers = lists:filter(fun({{_, P}, _}) -> P =:= Pid end, WaitingConsumers0), - Effects1 = lists:foldl(fun ({ConsumerId, _}, Effects) -> - cancel_consumer_effects(ConsumerId, State0, Effects) - end, [], DownWaitingConsumers), + Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end, + WaitingConsumers0), + Effects = lists:foldl(fun ({ConsumerId, _}, Effects) -> + cancel_consumer_effects(ConsumerId, State0, + Effects) + end, [], Down), % update state to have only up waiting consumers - WaitingConsumersStillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end, WaitingConsumers0), - State2 = State0#state{waiting_consumers = WaitingConsumersStillUp}, - {Effects1, State2}. + StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end, + WaitingConsumers0), + State = State0#state{waiting_consumers = StillUp}, + {Effects, State}. -maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default}, _Suspected) -> +maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default}, + _Suspected) -> []; -maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = single_active, - waiting_consumers = []}, _Suspected) -> +maybe_mark_suspect_waiting_consumers(_Node, + #state{consumer_strategy = single_active, + waiting_consumers = []}, + _Suspected) -> []; -maybe_mark_suspect_waiting_consumers(Node, #state{consumer_strategy = single_active, - waiting_consumers = WaitingConsumers}, Suspected) -> +maybe_mark_suspect_waiting_consumers(Node, + #state{consumer_strategy = single_active, + waiting_consumers = WaitingConsumers}, + Suspected) -> [begin case node(P) of Node -> @@ -617,9 +616,11 @@ state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0, waiting_consumers = WaitingConsumers0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), - WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end, #{}, WaitingConsumers0), + WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end, + #{}, WaitingConsumers0), AllConsumers = maps:merge(Custs, WaitingConsumers1), - [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, AllConsumers))]; + [{send_msg, P, eol, ra_event} + || P <- maps:keys(maps:merge(Enqs, AllConsumers))]; state_enter(_, _) -> %% catch all as not handling all states []. @@ -708,10 +709,12 @@ query_processes(#state{enqueuers = Enqs, consumers = Cons0}) -> query_ra_indexes(#state{ra_indexes = RaIndexes}) -> RaIndexes. -query_consumer_count(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> +query_consumer_count(#state{consumers = Consumers, + waiting_consumers = WaitingConsumers}) -> maps:size(Consumers) + length(WaitingConsumers). -query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers} = State) -> +query_consumers(#state{consumers = Consumers, + waiting_consumers = WaitingConsumers} = State) -> SingleActiveConsumer = query_single_active_consumer(State), IsSingleActiveConsumerFun = fun({Tag, Pid} = _ConsumerId) -> case SingleActiveConsumer of @@ -729,19 +732,21 @@ query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsume maps:get(args, Meta, []), maps:get(username, Meta, undefined)} end, Consumers), - FromWaitingConsumers = lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta}}, Acc) -> - maps:put({Tag, Pid}, - {Pid, Tag, - maps:get(ack, Meta, undefined), - maps:get(prefetch, Meta, undefined), - IsSingleActiveConsumerFun({Tag, Pid}), - maps:get(args, Meta, []), - maps:get(username, Meta, undefined)}, - Acc) - end, #{}, WaitingConsumers), + FromWaitingConsumers = + lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta}}, Acc) -> + maps:put({Tag, Pid}, + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + IsSingleActiveConsumerFun({Tag, Pid}), + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)}, + Acc) + end, #{}, WaitingConsumers), maps:merge(FromConsumers, FromWaitingConsumers). -query_single_active_consumer(#state{consumer_strategy = single_active, consumers = Consumers}) -> +query_single_active_consumer(#state{consumer_strategy = single_active, + consumers = Consumers}) -> case maps:size(Consumers) of 1 -> {value, lists:nth(1, maps:keys(Consumers))}; @@ -799,65 +804,79 @@ num_checked_out(#state{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, - {Effects0, #state{consumer_strategy = default} = S0}) -> + #state{consumer_strategy = default} = State, Effects) -> %% general case, single active consumer off - cancel_consumer0(ConsumerId, {Effects0, S0}); + cancel_consumer0(ConsumerId, State, Effects); cancel_consumer(ConsumerId, - {Effects0, #state{consumer_strategy = single_active, - waiting_consumers = [] } = S0}) -> + #state{consumer_strategy = single_active, + waiting_consumers = []} = State, + Effects) -> %% single active consumer on, no consumers are waiting - cancel_consumer0(ConsumerId, {Effects0, S0}); + cancel_consumer0(ConsumerId, State, Effects); cancel_consumer(ConsumerId, - {Effects0, #state{consumers = Cons0, - consumer_strategy = single_active, - waiting_consumers = WaitingConsumers0 } = State0}) -> + #state{consumers = Cons0, + consumer_strategy = single_active, + waiting_consumers = WaitingConsumers0} = State0, + Effects0) -> %% single active consumer on, consumers are waiting case maps:take(ConsumerId, Cons0) of - {_CurrentActiveConsumer = #consumer{checked_out = Checked0}, _} -> + {#consumer{checked_out = Checked0}, _} -> % The active consumer is to be removed % Cancel it - S = return_all(State0, Checked0), - Effects = cancel_consumer_effects(ConsumerId, S, Effects0), + State1 = return_all(State0, Checked0), + Effects1 = cancel_consumer_effects(ConsumerId, State1, Effects0), % Take another one from the waiting consumers and put it in consumers - [{NewActiveConsumerId, NewActiveConsumer} | RemainingWaitingConsumers] = WaitingConsumers0, + [{NewActiveConsumerId, NewActiveConsumer} + | RemainingWaitingConsumers] = WaitingConsumers0, #state{service_queue = ServiceQueue} = State0, - ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue), - State1 = State0#state{consumers = #{NewActiveConsumerId => NewActiveConsumer}, - service_queue = ServiceQueue1, - waiting_consumers = RemainingWaitingConsumers}, - Effects1 = consumer_promoted_to_single_active_effects(State1, NewActiveConsumerId, NewActiveConsumer, Effects), - {Effects1, State1}; + ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, + NewActiveConsumer, + ServiceQueue), + State = State1#state{consumers = #{NewActiveConsumerId => + NewActiveConsumer}, + service_queue = ServiceQueue1, + waiting_consumers = RemainingWaitingConsumers}, + add_consumer_promotion_effect(State, NewActiveConsumerId, + NewActiveConsumer, Effects1); error -> % The cancelled consumer is not the active one % Just remove it from idle_consumers - {value, _Consumer, WaitingConsumers1} = lists:keytake(ConsumerId, 1, WaitingConsumers0), + WaitingConsumers = lists:keydelete(ConsumerId, 1, + WaitingConsumers0), Effects = cancel_consumer_effects(ConsumerId, State0, Effects0), - % A waiting consumer isn't supposed to have any checked out messages, so nothing special to do here - {Effects, State0#state{waiting_consumers = WaitingConsumers1}} + % A waiting consumer isn't supposed to have any checked out messages, + % so nothing special to do here + {State0#state{waiting_consumers = WaitingConsumers}, Effects} end. -consumer_promoted_to_single_active_effects(#state{consumer_strategy = single_active, - queue_resource = QName }, - ConsumerId, #consumer{meta = Meta}, Effects) -> - [{mod_call, rabbit_quorum_queue, update_consumer_handler, - [QName, ConsumerId, false, maps:get(ack, Meta, undefined), - maps:get(prefetch, Meta, undefined), true, maps:get(args, Meta, [])]} | Effects]. +add_consumer_promotion_effect(#state{consumer_strategy = single_active, + queue_resource = QName} = State, + ConsumerId, #consumer{meta = Meta}, + Effects) -> + Ack = maps:get(ack, Meta, undefined), + Prefetch = maps:get(prefetch, Meta, undefined), + Args = maps:get(args, Meta, []), + {State, [{mod_call, + rabbit_quorum_queue, + update_consumer_handler, + [QName, ConsumerId, false, Ack, Prefetch, true, Args]} + | Effects]}. cancel_consumer0(ConsumerId, - {Effects0, #state{consumers = C0} = S0}) -> + #state{consumers = C0} = S0, Effects0) -> case maps:take(ConsumerId, C0) of {#consumer{checked_out = Checked0}, Cons} -> S = return_all(S0, Checked0), Effects = cancel_consumer_effects(ConsumerId, S, Effects0), case maps:size(Cons) of 0 -> - {[{aux, inactive} | Effects], S#state{consumers = Cons}}; + {S#state{consumers = Cons}, [{aux, inactive} | Effects]}; _ -> - {Effects, S#state{consumers = Cons}} + {S#state{consumers = Cons}, Effects} end; error -> %% already removed: do nothing - {Effects0, S0} + {S0, Effects0} end. enqueue(RaftIdx, RawMsg, #state{messages = Messages, @@ -895,8 +914,7 @@ enqueue_pending(From, enqueue_pending(From, Enq, #state{enqueuers = Enqueuers0} = State) -> State#state{enqueuers = Enqueuers0#{From => Enq}}. -maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, - State0) -> +maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) -> % direct enqueue without tracking {ok, enqueue(RaftIdx, RawMsg, State0), Effects}; maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, @@ -1245,7 +1263,8 @@ update_consumer(ConsumerId, Meta, Spec, update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, Spec, #state{consumers = Cons0, - consumer_strategy = single_active} = State0) when map_size(Cons0) == 0 -> + consumer_strategy = single_active} = State0) + when map_size(Cons0) == 0 -> %% single active consumer on, no one is consuming yet update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, @@ -1381,6 +1400,26 @@ message_size(Msg) -> %% probably only hit this for testing so ok to use erts_debug erts_debug:size(Msg). +suspected_pids_for(Node, #state{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> + Cons = maps:fold(fun({_, P}, #consumer{suspected_down = true}, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> Acc + end, [], Cons0), + Enqs = maps:fold(fun(P, #enqueuer{suspected_down = true}, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> Acc + end, Cons, Enqs0), + lists:foldl(fun({{_, P}, + #consumer{suspected_down = true}}, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, Acc) -> Acc + end, Enqs, WaitingConsumers0). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). |
