diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-02-27 11:31:09 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-27 11:31:09 +0000 |
| commit | 3b0adfda40edf59496ff9f6d994a11c27971a3f5 (patch) | |
| tree | 40c39f89a760c55d7cf8aee51a8ea0b279b4e0fe /src | |
| parent | b9873465666d143bd1fc70a828d417ce48b5b1c3 (diff) | |
| download | rabbitmq-server-git-3b0adfda40edf59496ff9f6d994a11c27971a3f5.tar.gz | |
rabbit_fifo: change single active consumer on noconnection
To ensure availability and progress when a node gets disconnected.
[#164135123]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 219 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 7 |
2 files changed, 131 insertions, 95 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index b06d34e83a..64f5b09bb4 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -94,6 +94,7 @@ #credit{} | #purge{} | #update_config{}. + -type command() :: protocol() | ra_machine:builtin_command(). %% all the command types supported by ra fifo @@ -134,7 +135,7 @@ update_config(Conf, State) -> true -> single_active; false -> - default + competing end, Cfg = State#?MODULE.cfg, State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI, @@ -248,7 +249,8 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, {State0, {dequeue, empty}}; Ready -> State1 = update_consumer(ConsumerId, ConsumerMeta, - {once, 1, simple_prefetch}, State0), + {once, 1, simple_prefetch}, + State0), {success, _, MsgId, Msg, State2} = checkout_one(State1), case Settlement of unsettled -> @@ -257,10 +259,9 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, [{monitor, process, Pid}]}; settled -> %% immediately settle the checkout - {State, _, Effects} = apply(Meta, - make_settle(ConsumerId, - [MsgId]), - State2), + {State, _, Effects} = + apply(Meta, make_settle(ConsumerId, [MsgId]), + State2), {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects} end end; @@ -294,27 +295,64 @@ apply(#{index := RaftIdx}, #purge{}, %% reverse the effects ourselves {State, {purge, Total}, lists:reverse([garbage_collection | Effects])}; -apply(_, {down, ConsumerPid, noconnection}, +apply(_, {down, Pid, noconnection}, + #?MODULE{consumers = Cons0, + cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = Waiting0, + enqueuers = Enqs0} = State0) -> + Node = node(Pid), + %% if the pid refers to the active consumer, mark it as suspected and return + %% it to the waiting queue + {State1, Effects0} = case maps:to_list(Cons0) of + [{{_, P} = Cid, C}] when node(P) =:= Node -> + %% the consumer should be returned to waiting + %% + Effs = consumer_update_active_effects( + State0, Cid, C, false, suspected_down, []), + {State0#?MODULE{consumers = #{}, + waiting_consumers = Waiting0 ++ [{Cid, C}]}, + Effs}; + _ -> {State0, []} + end, + WaitingConsumers = update_waiting_consumer_status(Node, State1, + suspected_down), + + %% select a new consumer from the waiting queue and run a checkout + State2 = State1#?MODULE{waiting_consumers = WaitingConsumers}, + {State, Effects1} = activate_next_consumer(State2, Effects0), + + %% mark any enquers as suspected + Enqs = maps:map(fun(P, E) when node(P) =:= Node -> + E#enqueuer{status = suspected_down}; + (_, E) -> E + end, Enqs0), + Effects = [{monitor, node, Node} | Effects1], + {State#?MODULE{enqueuers = Enqs}, ok, Effects}; +apply(_, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> - Node = node(ConsumerPid), + %% A node has been disconnected. This doesn't necessarily mean that + %% any processes on this node are down, they _may_ come back so here + %% we just mark them as suspected (effectively deactivated) + %% and return all checked out messages to the main queue for delivery to any + %% live consumers + %% + %% all pids for the disconnected node will be marked as suspected not just + %% the one we got the `down' command for + Node = node(Pid), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), - % mark all consumers and enqueuers as suspected down - % and monitor the node so that we can find out the final state of the - % process at some later point + {Cons, State, Effects1} = - maps:fold(fun({_, P} = K, - #consumer{checked_out = Checked0} = C, - {Co, St0, Eff}) when (node(P) =:= Node) and - (C#consumer.status =/= cancelled)-> + maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0, + status = up} = C, + {Co, St0, Eff}) when node(P) =:= Node -> {St, Eff0} = return_all(St0, Checked0, Eff, K, C), Credit = increase_credit(C, maps:size(Checked0)), Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff0), - {maps:put(K, - C#consumer{status = suspected_down, - credit = Credit, - checked_out = #{}}, Co), + {maps:put(K, C#consumer{status = suspected_down, + credit = Credit, + checked_out = #{}}, Co), St, Eff1}; (K, C, {Co, St, Eff}) -> {maps:put(K, C, Co), St, Eff} @@ -323,10 +361,10 @@ apply(_, {down, ConsumerPid, noconnection}, E#enqueuer{status = suspected_down}; (_, E) -> E end, Enqs0), - % mark waiting consumers as suspected if necessary - WaitingConsumers = update_waiting_consumer_status(Node, State0, - suspected_down), + % Monitor the node so that we can "unsuspect" these processes when the node + % comes back, then re-issue all monitors and discover the final fate of + % these processes Effects2 = case maps:size(Cons) of 0 -> [{aux, inactive}, {monitor, node, Node}]; @@ -334,8 +372,7 @@ apply(_, {down, ConsumerPid, noconnection}, [{monitor, node, Node}] end ++ Effects1, %% TODO: should we run a checkout here? - {State#?MODULE{consumers = Cons, enqueuers = Enqs, - waiting_consumers = WaitingConsumers}, ok, Effects2}; + {State#?MODULE{consumers = Cons, enqueuers = Enqs}, ok, Effects2}; apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages @@ -367,36 +404,36 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, Monitors = [{monitor, process, P} || P <- suspected_pids_for(Node, State0)], - % un-suspect waiting consumers when necessary - WaitingConsumers = update_waiting_consumer_status(Node, State0, up), - Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{status = up}; (_, E) -> E end, Enqs0), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), - {Cons1, SQ, Effects} = - maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) - when (node(P) =:= Node) and - (C#consumer.status =/= cancelled) -> - EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, - true, up, EAcc), - update_or_remove_sub( - ConsumerId, C#consumer{status = up}, - CAcc, SQAcc, EAcc1); - (_, _, Acc) -> - Acc - end, {Cons0, SQ0, Monitors}, Cons0), - - checkout(Meta, State0#?MODULE{consumers = Cons1, enqueuers = Enqs1, - service_queue = SQ, - waiting_consumers = WaitingConsumers}, Effects); + %% mark all consumers as up + {Cons1, SQ, Effects1} = + maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) + when (node(P) =:= Node) and + (C#consumer.status =/= cancelled) -> + EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, + C, true, up, EAcc), + update_or_remove_sub(ConsumerId, + C#consumer{status = up}, CAcc, + SQAcc, EAcc1); + (_, _, Acc) -> + Acc + end, {Cons0, SQ0, Monitors}, Cons0), + Waiting = update_waiting_consumer_status(Node, State0, up), + State1 = State0#?MODULE{consumers = Cons1, enqueuers = Enqs1, + service_queue = SQ, + waiting_consumers = Waiting}, + {State, Effects} = activate_next_consumer(State1, Effects1), + checkout(Meta, State, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; apply(Meta, #update_config{config = Conf}, State) -> checkout(Meta, update_config(Conf, State), []). -consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = default}}) -> +consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) @@ -407,7 +444,7 @@ consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = sin end. handle_waiting_consumer_down(_Pid, - #?MODULE{cfg = #cfg{consumer_strategy = default}} = State) -> + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) -> {[], State}; handle_waiting_consumer_down(_Pid, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, @@ -429,27 +466,18 @@ handle_waiting_consumer_down(Pid, State = State0#?MODULE{waiting_consumers = StillUp}, {Effects, State}. -update_waiting_consumer_status(_Node, #?MODULE{cfg = #cfg{consumer_strategy = default}}, - _Status) -> - []; -update_waiting_consumer_status(_Node, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = []}, - _Status) -> - []; update_waiting_consumer_status(Node, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = WaitingConsumers}, + #?MODULE{waiting_consumers = WaitingConsumers}, Status) -> [begin - case node(P) of + case node(Pid) of Node -> {ConsumerId, Consumer#consumer{status = Status}}; _ -> {ConsumerId, Consumer} end - end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers, - Consumer#consumer.status =/= cancelled]. + end || {{_, Pid} = ConsumerId, Consumer} <- WaitingConsumers, + Consumer#consumer.status =/= cancelled]. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #?MODULE{consumers = Cons, @@ -583,7 +611,7 @@ query_consumers(#?MODULE{consumers = Consumers, cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) -> ActiveActivityStatusFun = case ConsumerStrategy of - default -> + competing -> fun(_ConsumerId, #consumer{status = Status}) -> case Status of @@ -709,8 +737,8 @@ num_checked_out(#?MODULE{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, - #?MODULE{cfg = #cfg{consumer_strategy = default}} = State, Effects, Reason) -> - %% general case, single active consumer off + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State, + Effects, Reason) -> cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, @@ -721,41 +749,23 @@ cancel_consumer(ConsumerId, cancel_consumer(ConsumerId, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = WaitingConsumers0} = State0, + waiting_consumers = Waiting0} = State0, Effects0, Reason) -> %% single active consumer on, consumers are waiting - case maps:take(ConsumerId, Cons0) of - {Consumer, Cons1} -> + case maps:is_key(ConsumerId, Cons0) of + true -> % The active consumer is to be removed - % Cancel it - {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1, - State0, Effects0, Reason), - Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1), - % Take another one from the waiting consumers and put it in consumers - [{NewActiveConsumerId, NewActiveConsumer} - | RemainingWaitingConsumers] = WaitingConsumers0, - #?MODULE{service_queue = ServiceQueue} = State1, - ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, - NewActiveConsumer, - ServiceQueue), - State = State1#?MODULE{consumers = maps:put(NewActiveConsumerId, - NewActiveConsumer, - State1#?MODULE.consumers), - service_queue = ServiceQueue1, - waiting_consumers = RemainingWaitingConsumers}, - Effects = consumer_update_active_effects(State, NewActiveConsumerId, - NewActiveConsumer, true, - single_active, Effects2), - {State, Effects}; - error -> + {State1, Effects1} = cancel_consumer0(ConsumerId, State0, + Effects0, Reason), + activate_next_consumer(State1, Effects1); + false -> % The cancelled consumer is not the active one % Just remove it from idle_consumers - WaitingConsumers = lists:keydelete(ConsumerId, 1, - WaitingConsumers0), + Waiting = lists:keydelete(ConsumerId, 1, Waiting0), Effects = cancel_consumer_effects(ConsumerId, State0, Effects0), % A waiting consumer isn't supposed to have any checked out messages, % so nothing special to do here - {State0#?MODULE{waiting_consumers = WaitingConsumers}, Effects} + {State0#?MODULE{waiting_consumers = Waiting}, Effects} end. consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, @@ -765,9 +775,7 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, Ack = maps:get(ack, Meta, undefined), Prefetch = maps:get(prefetch, Meta, undefined), Args = maps:get(args, Meta, []), - [{mod_call, - rabbit_quorum_queue, - update_consumer_handler, + [{mod_call, rabbit_quorum_queue, update_consumer_handler, [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]} | Effects]. @@ -788,6 +796,32 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> {S0, Effects0} end. +activate_next_consumer(#?MODULE{consumers = Cons} = State0, Effects) + when map_size(Cons) == 1 -> + {State0, Effects}; +activate_next_consumer(#?MODULE{waiting_consumers = Waiting0} = State0, + Effects0) -> + case lists:filter(fun ({_, #consumer{status = Status}}) -> + Status == up + end, Waiting0) of + [{NextConsumerId, NextConsumer} | _] -> + Remaining = lists:keydelete(NextConsumerId, 1, Waiting0), + #?MODULE{service_queue = ServiceQueue} = State0, + ServiceQueue1 = maybe_queue_consumer(NextConsumerId, + NextConsumer, + ServiceQueue), + State = State0#?MODULE{consumers = #{NextConsumerId => NextConsumer}, + service_queue = ServiceQueue1, + waiting_consumers = Remaining}, + Effects = consumer_update_active_effects(State, NextConsumerId, + NextConsumer, true, + single_active, Effects0), + {State, Effects}; + [] -> + {State0, Effects0} + end. + + maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1, #?MODULE{consumers = C0, service_queue = SQ0} = S0, @@ -1296,7 +1330,7 @@ uniq_queue_in(Key, Queue) -> end. update_consumer(ConsumerId, Meta, Spec, - #?MODULE{cfg = #cfg{consumer_strategy = default}} = State0) -> + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) -> %% general case, single active consumer off update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, Spec, @@ -1331,7 +1365,6 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, end, Init, Cons0), ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons), ServiceQueue0), - State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}. maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index ebe5f3328a..968ae07739 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -88,6 +88,8 @@ -type consumer() :: #consumer{}. +-type consumer_strategy() :: competing | single_active. + -record(enqueuer, {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list @@ -104,7 +106,8 @@ max_length :: maybe(non_neg_integer()), max_bytes :: maybe(non_neg_integer()), %% whether single active consumer is on or not for this queue - consumer_strategy = default :: default | single_active, + consumer_strategy = competing :: consumer_strategy(), + %% the maximum number of unsuccessful delivery attempts permitted delivery_limit :: maybe(non_neg_integer()) }). @@ -114,7 +117,7 @@ messages = #{} :: #{msg_in_id() => indexed_msg()}, % defines the lowest message in id available in the messages map % that isn't a return - low_msg_num :: msg_in_id() | undefined, + low_msg_num :: maybe(msg_in_id()), % defines the next message in id to be added to the messages map next_msg_num = 1 :: msg_in_id(), % list of returned msg_in_ids - when checking out it picks from |
