diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-03-01 21:09:22 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-03-01 21:09:22 +0300 |
| commit | 27607074e9ac6bca20db423b0f62a465648a2bdc (patch) | |
| tree | e27254e341d493c62b2710ff2bd1e1245b8546f4 /src | |
| parent | 564bca6b67e7d4a9f15d343e19fe41e7ec1787c5 (diff) | |
| parent | 933c176c95be6cec082389b501787797690f51ba (diff) | |
| download | rabbitmq-server-git-27607074e9ac6bca20db423b0f62a465648a2bdc.tar.gz | |
Merge pull request #1903 from rabbitmq/single-active-noconnection
rabbit_fifo: change single active consumer on noconnection
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 248 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 7 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 6 |
3 files changed, 162 insertions, 99 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index b06d34e83a..97e5f6e901 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -94,6 +94,7 @@ #credit{} | #purge{} | #update_config{}. + -type command() :: protocol() | ra_machine:builtin_command(). %% all the command types supported by ra fifo @@ -134,7 +135,7 @@ update_config(Conf, State) -> true -> single_active; false -> - default + competing end, Cfg = State#?MODULE.cfg, State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI, @@ -194,7 +195,8 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, drain = Drain, consumer_id = ConsumerId}, #?MODULE{consumers = Cons0, - service_queue = ServiceQueue0} = State0) -> + service_queue = ServiceQueue0, + waiting_consumers = Waiting0} = State0) -> case Cons0 of #{ConsumerId := #consumer{delivery_count = DelCnt} = Con0} -> %% this can go below 0 when credit is reduced @@ -207,7 +209,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {State1, ok, Effects} = checkout(Meta, State0#?MODULE{service_queue = ServiceQueue, consumers = Cons}, []), - Response = {send_credit_reply, maps:size(State1#?MODULE.messages)}, + Response = {send_credit_reply, messages_ready(State1)}, %% by this point all checkouts for the updated credit value %% should be processed so we can evaluate the drain case Drain of @@ -231,6 +233,20 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {multi, [Response, {send_drained, [{CTag, Drained}]}]}, Effects} end; + _ when Waiting0 /= [] -> + %% there are waiting consuemrs + case lists:keytake(ConsumerId, 1, Waiting0) of + {value, {_, Con0 = #consumer{delivery_count = DelCnt}}, Waiting} -> + %% the consumer is a waiting one + %% grant the credit + C = max(0, RemoteDelCnt + NewCredit - DelCnt), + Con = Con0#consumer{credit = C}, + State = State0#?MODULE{waiting_consumers = + [{ConsumerId, Con} | Waiting]}, + {State, {send_credit_reply, messages_ready(State)}}; + false -> + {State0, ok} + end; _ -> %% credit for unknown consumer - just ignore {State0, ok} @@ -248,7 +264,8 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, {State0, {dequeue, empty}}; Ready -> State1 = update_consumer(ConsumerId, ConsumerMeta, - {once, 1, simple_prefetch}, State0), + {once, 1, simple_prefetch}, + State0), {success, _, MsgId, Msg, State2} = checkout_one(State1), case Settlement of unsettled -> @@ -257,10 +274,9 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, [{monitor, process, Pid}]}; settled -> %% immediately settle the checkout - {State, _, Effects} = apply(Meta, - make_settle(ConsumerId, - [MsgId]), - State2), + {State, _, Effects} = + apply(Meta, make_settle(ConsumerId, [MsgId]), + State2), {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects} end end; @@ -294,27 +310,64 @@ apply(#{index := RaftIdx}, #purge{}, %% reverse the effects ourselves {State, {purge, Total}, lists:reverse([garbage_collection | Effects])}; -apply(_, {down, ConsumerPid, noconnection}, +apply(_, {down, Pid, noconnection}, + #?MODULE{consumers = Cons0, + cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = Waiting0, + enqueuers = Enqs0} = State0) -> + Node = node(Pid), + %% if the pid refers to the active consumer, mark it as suspected and return + %% it to the waiting queue + {State1, Effects0} = case maps:to_list(Cons0) of + [{{_, P} = Cid, C}] when node(P) =:= Node -> + %% the consumer should be returned to waiting + %% + Effs = consumer_update_active_effects( + State0, Cid, C, false, suspected_down, []), + {State0#?MODULE{consumers = #{}, + waiting_consumers = Waiting0 ++ [{Cid, C}]}, + Effs}; + _ -> {State0, []} + end, + WaitingConsumers = update_waiting_consumer_status(Node, State1, + suspected_down), + + %% select a new consumer from the waiting queue and run a checkout + State2 = State1#?MODULE{waiting_consumers = WaitingConsumers}, + {State, Effects1} = activate_next_consumer(State2, Effects0), + + %% mark any enquers as suspected + Enqs = maps:map(fun(P, E) when node(P) =:= Node -> + E#enqueuer{status = suspected_down}; + (_, E) -> E + end, Enqs0), + Effects = [{monitor, node, Node} | Effects1], + {State#?MODULE{enqueuers = Enqs}, ok, Effects}; +apply(_, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> - Node = node(ConsumerPid), + %% A node has been disconnected. This doesn't necessarily mean that + %% any processes on this node are down, they _may_ come back so here + %% we just mark them as suspected (effectively deactivated) + %% and return all checked out messages to the main queue for delivery to any + %% live consumers + %% + %% all pids for the disconnected node will be marked as suspected not just + %% the one we got the `down' command for + Node = node(Pid), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), - % mark all consumers and enqueuers as suspected down - % and monitor the node so that we can find out the final state of the - % process at some later point + {Cons, State, Effects1} = - maps:fold(fun({_, P} = K, - #consumer{checked_out = Checked0} = C, - {Co, St0, Eff}) when (node(P) =:= Node) and - (C#consumer.status =/= cancelled)-> + maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0, + status = up} = C, + {Co, St0, Eff}) when node(P) =:= Node -> {St, Eff0} = return_all(St0, Checked0, Eff, K, C), Credit = increase_credit(C, maps:size(Checked0)), Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff0), - {maps:put(K, - C#consumer{status = suspected_down, - credit = Credit, - checked_out = #{}}, Co), + {maps:put(K, C#consumer{status = suspected_down, + credit = Credit, + checked_out = #{}}, Co), St, Eff1}; (K, C, {Co, St, Eff}) -> {maps:put(K, C, Co), St, Eff} @@ -323,10 +376,10 @@ apply(_, {down, ConsumerPid, noconnection}, E#enqueuer{status = suspected_down}; (_, E) -> E end, Enqs0), - % mark waiting consumers as suspected if necessary - WaitingConsumers = update_waiting_consumer_status(Node, State0, - suspected_down), + % Monitor the node so that we can "unsuspect" these processes when the node + % comes back, then re-issue all monitors and discover the final fate of + % these processes Effects2 = case maps:size(Cons) of 0 -> [{aux, inactive}, {monitor, node, Node}]; @@ -334,8 +387,7 @@ apply(_, {down, ConsumerPid, noconnection}, [{monitor, node, Node}] end ++ Effects1, %% TODO: should we run a checkout here? - {State#?MODULE{consumers = Cons, enqueuers = Enqs, - waiting_consumers = WaitingConsumers}, ok, Effects2}; + {State#?MODULE{consumers = Cons, enqueuers = Enqs}, ok, Effects2}; apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages @@ -367,36 +419,36 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, Monitors = [{monitor, process, P} || P <- suspected_pids_for(Node, State0)], - % un-suspect waiting consumers when necessary - WaitingConsumers = update_waiting_consumer_status(Node, State0, up), - Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{status = up}; (_, E) -> E end, Enqs0), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), - {Cons1, SQ, Effects} = - maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) - when (node(P) =:= Node) and - (C#consumer.status =/= cancelled) -> - EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, - true, up, EAcc), - update_or_remove_sub( - ConsumerId, C#consumer{status = up}, - CAcc, SQAcc, EAcc1); - (_, _, Acc) -> - Acc - end, {Cons0, SQ0, Monitors}, Cons0), - - checkout(Meta, State0#?MODULE{consumers = Cons1, enqueuers = Enqs1, - service_queue = SQ, - waiting_consumers = WaitingConsumers}, Effects); + %% mark all consumers as up + {Cons1, SQ, Effects1} = + maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) + when (node(P) =:= Node) and + (C#consumer.status =/= cancelled) -> + EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, + C, true, up, EAcc), + update_or_remove_sub(ConsumerId, + C#consumer{status = up}, CAcc, + SQAcc, EAcc1); + (_, _, Acc) -> + Acc + end, {Cons0, SQ0, Monitors}, Cons0), + Waiting = update_waiting_consumer_status(Node, State0, up), + State1 = State0#?MODULE{consumers = Cons1, enqueuers = Enqs1, + service_queue = SQ, + waiting_consumers = Waiting}, + {State, Effects} = activate_next_consumer(State1, Effects1), + checkout(Meta, State, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; apply(Meta, #update_config{config = Conf}, State) -> checkout(Meta, update_config(Conf, State), []). -consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = default}}) -> +consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) @@ -407,7 +459,7 @@ consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = sin end. handle_waiting_consumer_down(_Pid, - #?MODULE{cfg = #cfg{consumer_strategy = default}} = State) -> + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) -> {[], State}; handle_waiting_consumer_down(_Pid, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, @@ -429,27 +481,18 @@ handle_waiting_consumer_down(Pid, State = State0#?MODULE{waiting_consumers = StillUp}, {Effects, State}. -update_waiting_consumer_status(_Node, #?MODULE{cfg = #cfg{consumer_strategy = default}}, - _Status) -> - []; -update_waiting_consumer_status(_Node, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = []}, - _Status) -> - []; update_waiting_consumer_status(Node, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = WaitingConsumers}, + #?MODULE{waiting_consumers = WaitingConsumers}, Status) -> [begin - case node(P) of + case node(Pid) of Node -> {ConsumerId, Consumer#consumer{status = Status}}; _ -> {ConsumerId, Consumer} end - end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers, - Consumer#consumer.status =/= cancelled]. + end || {{_, Pid} = ConsumerId, Consumer} <- WaitingConsumers, + Consumer#consumer.status =/= cancelled]. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #?MODULE{consumers = Cons, @@ -583,7 +626,7 @@ query_consumers(#?MODULE{consumers = Consumers, cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) -> ActiveActivityStatusFun = case ConsumerStrategy of - default -> + competing -> fun(_ConsumerId, #consumer{status = Status}) -> case Status of @@ -709,8 +752,8 @@ num_checked_out(#?MODULE{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, - #?MODULE{cfg = #cfg{consumer_strategy = default}} = State, Effects, Reason) -> - %% general case, single active consumer off + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State, + Effects, Reason) -> cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, @@ -721,41 +764,23 @@ cancel_consumer(ConsumerId, cancel_consumer(ConsumerId, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = WaitingConsumers0} = State0, + waiting_consumers = Waiting0} = State0, Effects0, Reason) -> %% single active consumer on, consumers are waiting - case maps:take(ConsumerId, Cons0) of - {Consumer, Cons1} -> + case maps:is_key(ConsumerId, Cons0) of + true -> % The active consumer is to be removed - % Cancel it - {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1, - State0, Effects0, Reason), - Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1), - % Take another one from the waiting consumers and put it in consumers - [{NewActiveConsumerId, NewActiveConsumer} - | RemainingWaitingConsumers] = WaitingConsumers0, - #?MODULE{service_queue = ServiceQueue} = State1, - ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, - NewActiveConsumer, - ServiceQueue), - State = State1#?MODULE{consumers = maps:put(NewActiveConsumerId, - NewActiveConsumer, - State1#?MODULE.consumers), - service_queue = ServiceQueue1, - waiting_consumers = RemainingWaitingConsumers}, - Effects = consumer_update_active_effects(State, NewActiveConsumerId, - NewActiveConsumer, true, - single_active, Effects2), - {State, Effects}; - error -> + {State1, Effects1} = cancel_consumer0(ConsumerId, State0, + Effects0, Reason), + activate_next_consumer(State1, Effects1); + false -> % The cancelled consumer is not the active one % Just remove it from idle_consumers - WaitingConsumers = lists:keydelete(ConsumerId, 1, - WaitingConsumers0), + Waiting = lists:keydelete(ConsumerId, 1, Waiting0), Effects = cancel_consumer_effects(ConsumerId, State0, Effects0), % A waiting consumer isn't supposed to have any checked out messages, % so nothing special to do here - {State0#?MODULE{waiting_consumers = WaitingConsumers}, Effects} + {State0#?MODULE{waiting_consumers = Waiting}, Effects} end. consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, @@ -765,9 +790,7 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, Ack = maps:get(ack, Meta, undefined), Prefetch = maps:get(prefetch, Meta, undefined), Args = maps:get(args, Meta, []), - [{mod_call, - rabbit_quorum_queue, - update_consumer_handler, + [{mod_call, rabbit_quorum_queue, update_consumer_handler, [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]} | Effects]. @@ -776,6 +799,10 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> {Consumer, Cons1} -> {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, Effects0, Reason), + %% The effects are emitted before the consumer is actually removed + %% if the consumer has unacked messages. This is a bit weird but + %% in line with what classic queues do (from an external point of + %% view) Effects = cancel_consumer_effects(ConsumerId, S, Effects2), case maps:size(S#?MODULE.consumers) of 0 -> @@ -788,6 +815,38 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> {S0, Effects0} end. +activate_next_consumer(#?MODULE{consumers = Cons, + waiting_consumers = Waiting0} = State0, + Effects0) -> + case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of + Up when map_size(Up) == 0 -> + %% there are no active consumer in the consumer map + case lists:filter(fun ({_, #consumer{status = Status}}) -> + Status == up + end, Waiting0) of + [{NextConsumerId, NextConsumer} | _] -> + %% there is a potential next active consumer + Remaining = lists:keydelete(NextConsumerId, 1, Waiting0), + #?MODULE{service_queue = ServiceQueue} = State0, + ServiceQueue1 = maybe_queue_consumer(NextConsumerId, + NextConsumer, + ServiceQueue), + State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer}, + service_queue = ServiceQueue1, + waiting_consumers = Remaining}, + Effects = consumer_update_active_effects(State, NextConsumerId, + NextConsumer, true, + single_active, Effects0), + {State, Effects}; + [] -> + {State0, [{aux, inactive} | Effects0]} + end; + _ -> + {State0, Effects0} + end. + + + maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1, #?MODULE{consumers = C0, service_queue = SQ0} = S0, @@ -1296,7 +1355,7 @@ uniq_queue_in(Key, Queue) -> end. update_consumer(ConsumerId, Meta, Spec, - #?MODULE{cfg = #cfg{consumer_strategy = default}} = State0) -> + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) -> %% general case, single active consumer off update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, Spec, @@ -1331,7 +1390,6 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, end, Init, Cons0), ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons), ServiceQueue0), - State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}. maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index ebe5f3328a..968ae07739 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -88,6 +88,8 @@ -type consumer() :: #consumer{}. +-type consumer_strategy() :: competing | single_active. + -record(enqueuer, {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list @@ -104,7 +106,8 @@ max_length :: maybe(non_neg_integer()), max_bytes :: maybe(non_neg_integer()), %% whether single active consumer is on or not for this queue - consumer_strategy = default :: default | single_active, + consumer_strategy = competing :: consumer_strategy(), + %% the maximum number of unsuccessful delivery attempts permitted delivery_limit :: maybe(non_neg_integer()) }). @@ -114,7 +117,7 @@ messages = #{} :: #{msg_in_id() => indexed_msg()}, % defines the lowest message in id available in the messages map % that isn't a return - low_msg_num :: msg_in_id() | undefined, + low_msg_num :: maybe(msg_in_id()), % defines the next message in id to be added to the messages map next_msg_num = 1 :: msg_in_id(), % list of returned msg_in_ids - when checking out it picks from diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index e811bfffb3..c685785d0d 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -120,6 +120,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) -> RaMachine = ra_machine(NewQ), ServerIds = [{RaName, Node} || Node <- Nodes], ClusterName = RaName, + TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT), RaConfs = [begin UId = ra:new_uid(ra_lib:to_binary(ClusterName)), FName = rabbit_misc:rs(QName), @@ -129,7 +130,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) -> friendly_name => FName, initial_members => ServerIds, log_init_args => #{uid => UId}, - tick_timeout => ?TICK_TIMEOUT, + tick_timeout => TickTimeout, machine => RaMachine} end || ServerId <- ServerIds], @@ -190,7 +191,8 @@ update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Act QName, Prefetch, Active, ActivityStatus, Args). cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> - local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]). + local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, + [QName, ChPid, ConsumerTag]). cancel_consumer(QName, ChPid, ConsumerTag) -> catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), |
