summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-01-17 17:10:29 +0000
committerkjnilsson <knilsson@pivotal.io>2019-01-17 17:10:29 +0000
commit365cfd054e543efe399eae8a319fd92ec30ae35b (patch)
tree7180351c40067425bd70a20721556e7dfea973e1
parentffc233c1a2fe8b18b7ae6e9738646a4133b07067 (diff)
downloadrabbitmq-server-git-365cfd054e543efe399eae8a319fd92ec30ae35b.tar.gz
rabbit_fifo: refactoring
Light refactoring Shortening some lines for the benefit of split buffer users.
-rw-r--r--src/rabbit_fifo.erl237
1 files changed, 138 insertions, 99 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 8f34c01210..b62ff62a51 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -420,10 +420,10 @@ apply(_, #checkout{spec = {dequeue, unsettled},
{S, {dequeue, empty}}
end;
apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
- {CancelEffects, State1} = cancel_consumer(ConsumerId, {[], State0}),
+ {State, Effects} = cancel_consumer(ConsumerId, State0, []),
% TODO: here we should really demonitor the pid but _only_ if it has no
% other consumers or enqueuers.
- checkout(State1, CancelEffects);
+ checkout(State, Effects);
apply(_, #checkout{spec = Spec, meta = Meta,
consumer_id = {_, Pid} = ConsumerId},
State0) ->
@@ -475,7 +475,7 @@ apply(_, {down, ConsumerPid, noconnection},
(_, E) -> E
end, Enqs0),
% mark waiting consumers as suspected if necessary
- WaitingConsumers1 = maybe_mark_suspect_waiting_consumers(Node, State0, true),
+ WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0, true),
Effects = case maps:size(Cons) of
0 ->
@@ -484,9 +484,10 @@ apply(_, {down, ConsumerPid, noconnection},
[{monitor, node, Node}]
end,
%% TODO: should we run a checkout here?
- {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers1}, ok, Effects};
-apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ {State#state{consumers = Cons, enqueuers = Enqs,
+ waiting_consumers = WaitingConsumers}, ok, Effects};
+apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
State1 = case maps:take(Pid, Enqs0) of
@@ -497,42 +498,28 @@ apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
error ->
State0
end,
- {Effects1, State2} = maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, State1),
+ {Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
% return checked out messages to main queue
% Find the consumers for the down pid
DownConsumers = maps:keys(
maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
- {Effects2, State3} = lists:foldl(fun cancel_consumer/2, {Effects1, State2},
- DownConsumers),
- checkout(State3, Effects2);
-apply(_, {nodeup, Node}, #state{consumers = Cons0,
- enqueuers = Enqs0,
- service_queue = SQ0,
- waiting_consumers = WaitingConsumers0} = State0) ->
+ {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) ->
+ cancel_consumer(ConsumerId, S, E)
+ end, {State2, Effects1}, DownConsumers),
+ checkout(State, Effects);
+apply(_, {nodeup, Node}, #state{consumers = Cons0,
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
%% actually down or not
- Cons = maps:fold(fun({_, P}, #consumer{suspected_down = true}, Acc)
- when node(P) =:= Node ->
- [P | Acc];
- (_, _, Acc) -> Acc
- end, [], Cons0),
- Enqs = maps:fold(fun(P, #enqueuer{suspected_down = true}, Acc)
- when node(P) =:= Node ->
- [P | Acc];
- (_, _, Acc) -> Acc
- end, [], Enqs0),
- WaitingConsumers = lists:foldl(fun({{_, P}, #consumer{suspected_down = true}}, Acc)
- when node(P) =:= Node ->
- [P | Acc];
- (_, Acc) -> Acc
- end, [], WaitingConsumers0),
-
- Monitors = [{monitor, process, P} || P <- Cons ++ Enqs ++ WaitingConsumers],
+ Monitors = [{monitor, process, P}
+ || P <- suspected_pids_for(Node, State0)],
% un-suspect waiting consumers when necessary
- WaitingConsumers1 = maybe_mark_suspect_waiting_consumers(Node, State0, false),
+ WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0,
+ false),
Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = false};
@@ -549,36 +536,48 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
end, {Cons0, SQ0, Monitors}, Cons0),
% TODO: avoid list concat
checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
- service_queue = SQ, waiting_consumers = WaitingConsumers1}, Effects);
+ service_queue = SQ,
+ waiting_consumers = WaitingConsumers}, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
apply(_, #update_config{config = Conf}, State) ->
{update_config(Conf, State), ok}.
-maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = default} = State) ->
+handle_waiting_consumer_down(_Pid,
+ #state{consumer_strategy = default} = State) ->
{[], State};
-maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = single_active,
- waiting_consumers = []} = State) ->
+handle_waiting_consumer_down(_Pid,
+ #state{consumer_strategy = single_active,
+ waiting_consumers = []} = State) ->
{[], State};
-maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, #state{consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers0} = State0) ->
+handle_waiting_consumer_down(Pid,
+ #state{consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers0} = State0) ->
% get cancel effects for down waiting consumers
- DownWaitingConsumers = lists:filter(fun({{_, P}, _}) -> P =:= Pid end, WaitingConsumers0),
- Effects1 = lists:foldl(fun ({ConsumerId, _}, Effects) ->
- cancel_consumer_effects(ConsumerId, State0, Effects)
- end, [], DownWaitingConsumers),
+ Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end,
+ WaitingConsumers0),
+ Effects = lists:foldl(fun ({ConsumerId, _}, Effects) ->
+ cancel_consumer_effects(ConsumerId, State0,
+ Effects)
+ end, [], Down),
% update state to have only up waiting consumers
- WaitingConsumersStillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end, WaitingConsumers0),
- State2 = State0#state{waiting_consumers = WaitingConsumersStillUp},
- {Effects1, State2}.
+ StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end,
+ WaitingConsumers0),
+ State = State0#state{waiting_consumers = StillUp},
+ {Effects, State}.
-maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default}, _Suspected) ->
+maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default},
+ _Suspected) ->
[];
-maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = single_active,
- waiting_consumers = []}, _Suspected) ->
+maybe_mark_suspect_waiting_consumers(_Node,
+ #state{consumer_strategy = single_active,
+ waiting_consumers = []},
+ _Suspected) ->
[];
-maybe_mark_suspect_waiting_consumers(Node, #state{consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers}, Suspected) ->
+maybe_mark_suspect_waiting_consumers(Node,
+ #state{consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers},
+ Suspected) ->
[begin
case node(P) of
Node ->
@@ -617,9 +616,11 @@ state_enter(eol, #state{enqueuers = Enqs,
consumers = Custs0,
waiting_consumers = WaitingConsumers0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
- WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end, #{}, WaitingConsumers0),
+ WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end,
+ #{}, WaitingConsumers0),
AllConsumers = maps:merge(Custs, WaitingConsumers1),
- [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, AllConsumers))];
+ [{send_msg, P, eol, ra_event}
+ || P <- maps:keys(maps:merge(Enqs, AllConsumers))];
state_enter(_, _) ->
%% catch all as not handling all states
[].
@@ -708,10 +709,12 @@ query_processes(#state{enqueuers = Enqs, consumers = Cons0}) ->
query_ra_indexes(#state{ra_indexes = RaIndexes}) ->
RaIndexes.
-query_consumer_count(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) ->
+query_consumer_count(#state{consumers = Consumers,
+ waiting_consumers = WaitingConsumers}) ->
maps:size(Consumers) + length(WaitingConsumers).
-query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers} = State) ->
+query_consumers(#state{consumers = Consumers,
+ waiting_consumers = WaitingConsumers} = State) ->
SingleActiveConsumer = query_single_active_consumer(State),
IsSingleActiveConsumerFun = fun({Tag, Pid} = _ConsumerId) ->
case SingleActiveConsumer of
@@ -729,19 +732,21 @@ query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsume
maps:get(args, Meta, []),
maps:get(username, Meta, undefined)}
end, Consumers),
- FromWaitingConsumers = lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta}}, Acc) ->
- maps:put({Tag, Pid},
- {Pid, Tag,
- maps:get(ack, Meta, undefined),
- maps:get(prefetch, Meta, undefined),
- IsSingleActiveConsumerFun({Tag, Pid}),
- maps:get(args, Meta, []),
- maps:get(username, Meta, undefined)},
- Acc)
- end, #{}, WaitingConsumers),
+ FromWaitingConsumers =
+ lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta}}, Acc) ->
+ maps:put({Tag, Pid},
+ {Pid, Tag,
+ maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined),
+ IsSingleActiveConsumerFun({Tag, Pid}),
+ maps:get(args, Meta, []),
+ maps:get(username, Meta, undefined)},
+ Acc)
+ end, #{}, WaitingConsumers),
maps:merge(FromConsumers, FromWaitingConsumers).
-query_single_active_consumer(#state{consumer_strategy = single_active, consumers = Consumers}) ->
+query_single_active_consumer(#state{consumer_strategy = single_active,
+ consumers = Consumers}) ->
case maps:size(Consumers) of
1 ->
{value, lists:nth(1, maps:keys(Consumers))};
@@ -799,65 +804,79 @@ num_checked_out(#state{consumers = Cons}) ->
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
- {Effects0, #state{consumer_strategy = default} = S0}) ->
+ #state{consumer_strategy = default} = State, Effects) ->
%% general case, single active consumer off
- cancel_consumer0(ConsumerId, {Effects0, S0});
+ cancel_consumer0(ConsumerId, State, Effects);
cancel_consumer(ConsumerId,
- {Effects0, #state{consumer_strategy = single_active,
- waiting_consumers = [] } = S0}) ->
+ #state{consumer_strategy = single_active,
+ waiting_consumers = []} = State,
+ Effects) ->
%% single active consumer on, no consumers are waiting
- cancel_consumer0(ConsumerId, {Effects0, S0});
+ cancel_consumer0(ConsumerId, State, Effects);
cancel_consumer(ConsumerId,
- {Effects0, #state{consumers = Cons0,
- consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers0 } = State0}) ->
+ #state{consumers = Cons0,
+ consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers0} = State0,
+ Effects0) ->
%% single active consumer on, consumers are waiting
case maps:take(ConsumerId, Cons0) of
- {_CurrentActiveConsumer = #consumer{checked_out = Checked0}, _} ->
+ {#consumer{checked_out = Checked0}, _} ->
% The active consumer is to be removed
% Cancel it
- S = return_all(State0, Checked0),
- Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
+ State1 = return_all(State0, Checked0),
+ Effects1 = cancel_consumer_effects(ConsumerId, State1, Effects0),
% Take another one from the waiting consumers and put it in consumers
- [{NewActiveConsumerId, NewActiveConsumer} | RemainingWaitingConsumers] = WaitingConsumers0,
+ [{NewActiveConsumerId, NewActiveConsumer}
+ | RemainingWaitingConsumers] = WaitingConsumers0,
#state{service_queue = ServiceQueue} = State0,
- ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue),
- State1 = State0#state{consumers = #{NewActiveConsumerId => NewActiveConsumer},
- service_queue = ServiceQueue1,
- waiting_consumers = RemainingWaitingConsumers},
- Effects1 = consumer_promoted_to_single_active_effects(State1, NewActiveConsumerId, NewActiveConsumer, Effects),
- {Effects1, State1};
+ ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId,
+ NewActiveConsumer,
+ ServiceQueue),
+ State = State1#state{consumers = #{NewActiveConsumerId =>
+ NewActiveConsumer},
+ service_queue = ServiceQueue1,
+ waiting_consumers = RemainingWaitingConsumers},
+ add_consumer_promotion_effect(State, NewActiveConsumerId,
+ NewActiveConsumer, Effects1);
error ->
% The cancelled consumer is not the active one
% Just remove it from idle_consumers
- {value, _Consumer, WaitingConsumers1} = lists:keytake(ConsumerId, 1, WaitingConsumers0),
+ WaitingConsumers = lists:keydelete(ConsumerId, 1,
+ WaitingConsumers0),
Effects = cancel_consumer_effects(ConsumerId, State0, Effects0),
- % A waiting consumer isn't supposed to have any checked out messages, so nothing special to do here
- {Effects, State0#state{waiting_consumers = WaitingConsumers1}}
+ % A waiting consumer isn't supposed to have any checked out messages,
+ % so nothing special to do here
+ {State0#state{waiting_consumers = WaitingConsumers}, Effects}
end.
-consumer_promoted_to_single_active_effects(#state{consumer_strategy = single_active,
- queue_resource = QName },
- ConsumerId, #consumer{meta = Meta}, Effects) ->
- [{mod_call, rabbit_quorum_queue, update_consumer_handler,
- [QName, ConsumerId, false, maps:get(ack, Meta, undefined),
- maps:get(prefetch, Meta, undefined), true, maps:get(args, Meta, [])]} | Effects].
+add_consumer_promotion_effect(#state{consumer_strategy = single_active,
+ queue_resource = QName} = State,
+ ConsumerId, #consumer{meta = Meta},
+ Effects) ->
+ Ack = maps:get(ack, Meta, undefined),
+ Prefetch = maps:get(prefetch, Meta, undefined),
+ Args = maps:get(args, Meta, []),
+ {State, [{mod_call,
+ rabbit_quorum_queue,
+ update_consumer_handler,
+ [QName, ConsumerId, false, Ack, Prefetch, true, Args]}
+ | Effects]}.
cancel_consumer0(ConsumerId,
- {Effects0, #state{consumers = C0} = S0}) ->
+ #state{consumers = C0} = S0, Effects0) ->
case maps:take(ConsumerId, C0) of
{#consumer{checked_out = Checked0}, Cons} ->
S = return_all(S0, Checked0),
Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
case maps:size(Cons) of
0 ->
- {[{aux, inactive} | Effects], S#state{consumers = Cons}};
+ {S#state{consumers = Cons}, [{aux, inactive} | Effects]};
_ ->
- {Effects, S#state{consumers = Cons}}
+ {S#state{consumers = Cons}, Effects}
end;
error ->
%% already removed: do nothing
- {Effects0, S0}
+ {S0, Effects0}
end.
enqueue(RaftIdx, RawMsg, #state{messages = Messages,
@@ -895,8 +914,7 @@ enqueue_pending(From,
enqueue_pending(From, Enq, #state{enqueuers = Enqueuers0} = State) ->
State#state{enqueuers = Enqueuers0#{From => Enq}}.
-maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects,
- State0) ->
+maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) ->
% direct enqueue without tracking
{ok, enqueue(RaftIdx, RawMsg, State0), Effects};
maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
@@ -1245,7 +1263,8 @@ update_consumer(ConsumerId, Meta, Spec,
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, Spec,
#state{consumers = Cons0,
- consumer_strategy = single_active} = State0) when map_size(Cons0) == 0 ->
+ consumer_strategy = single_active} = State0)
+ when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
@@ -1381,6 +1400,26 @@ message_size(Msg) ->
%% probably only hit this for testing so ok to use erts_debug
erts_debug:size(Msg).
+suspected_pids_for(Node, #state{consumers = Cons0,
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Cons = maps:fold(fun({_, P}, #consumer{suspected_down = true}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, [], Cons0),
+ Enqs = maps:fold(fun(P, #enqueuer{suspected_down = true}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, Cons, Enqs0),
+ lists:foldl(fun({{_, P},
+ #consumer{suspected_down = true}}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, Acc) -> Acc
+ end, Enqs, WaitingConsumers0).
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").