summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-02-27 11:31:09 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-27 11:31:09 +0000
commit3b0adfda40edf59496ff9f6d994a11c27971a3f5 (patch)
tree40c39f89a760c55d7cf8aee51a8ea0b279b4e0fe /src
parentb9873465666d143bd1fc70a828d417ce48b5b1c3 (diff)
downloadrabbitmq-server-git-3b0adfda40edf59496ff9f6d994a11c27971a3f5.tar.gz
rabbit_fifo: change single active consumer on noconnection
To ensure availability and progress when a node gets disconnected. [#164135123]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl219
-rw-r--r--src/rabbit_fifo.hrl7
2 files changed, 131 insertions, 95 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index b06d34e83a..64f5b09bb4 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -94,6 +94,7 @@
#credit{} |
#purge{} |
#update_config{}.
+
-type command() :: protocol() | ra_machine:builtin_command().
%% all the command types supported by ra fifo
@@ -134,7 +135,7 @@ update_config(Conf, State) ->
true ->
single_active;
false ->
- default
+ competing
end,
Cfg = State#?MODULE.cfg,
State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI,
@@ -248,7 +249,8 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
{State0, {dequeue, empty}};
Ready ->
State1 = update_consumer(ConsumerId, ConsumerMeta,
- {once, 1, simple_prefetch}, State0),
+ {once, 1, simple_prefetch},
+ State0),
{success, _, MsgId, Msg, State2} = checkout_one(State1),
case Settlement of
unsettled ->
@@ -257,10 +259,9 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
[{monitor, process, Pid}]};
settled ->
%% immediately settle the checkout
- {State, _, Effects} = apply(Meta,
- make_settle(ConsumerId,
- [MsgId]),
- State2),
+ {State, _, Effects} =
+ apply(Meta, make_settle(ConsumerId, [MsgId]),
+ State2),
{State, {dequeue, {MsgId, Msg}, Ready-1}, Effects}
end
end;
@@ -294,27 +295,64 @@ apply(#{index := RaftIdx}, #purge{},
%% reverse the effects ourselves
{State, {purge, Total},
lists:reverse([garbage_collection | Effects])};
-apply(_, {down, ConsumerPid, noconnection},
+apply(_, {down, Pid, noconnection},
+ #?MODULE{consumers = Cons0,
+ cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = Waiting0,
+ enqueuers = Enqs0} = State0) ->
+ Node = node(Pid),
+ %% if the pid refers to the active consumer, mark it as suspected and return
+ %% it to the waiting queue
+ {State1, Effects0} = case maps:to_list(Cons0) of
+ [{{_, P} = Cid, C}] when node(P) =:= Node ->
+ %% the consumer should be returned to waiting
+ %%
+ Effs = consumer_update_active_effects(
+ State0, Cid, C, false, suspected_down, []),
+ {State0#?MODULE{consumers = #{},
+ waiting_consumers = Waiting0 ++ [{Cid, C}]},
+ Effs};
+ _ -> {State0, []}
+ end,
+ WaitingConsumers = update_waiting_consumer_status(Node, State1,
+ suspected_down),
+
+ %% select a new consumer from the waiting queue and run a checkout
+ State2 = State1#?MODULE{waiting_consumers = WaitingConsumers},
+ {State, Effects1} = activate_next_consumer(State2, Effects0),
+
+ %% mark any enquers as suspected
+ Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{status = suspected_down};
+ (_, E) -> E
+ end, Enqs0),
+ Effects = [{monitor, node, Node} | Effects1],
+ {State#?MODULE{enqueuers = Enqs}, ok, Effects};
+apply(_, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
- Node = node(ConsumerPid),
+ %% A node has been disconnected. This doesn't necessarily mean that
+ %% any processes on this node are down, they _may_ come back so here
+ %% we just mark them as suspected (effectively deactivated)
+ %% and return all checked out messages to the main queue for delivery to any
+ %% live consumers
+ %%
+ %% all pids for the disconnected node will be marked as suspected not just
+ %% the one we got the `down' command for
+ Node = node(Pid),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
- % mark all consumers and enqueuers as suspected down
- % and monitor the node so that we can find out the final state of the
- % process at some later point
+
{Cons, State, Effects1} =
- maps:fold(fun({_, P} = K,
- #consumer{checked_out = Checked0} = C,
- {Co, St0, Eff}) when (node(P) =:= Node) and
- (C#consumer.status =/= cancelled)->
+ maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0,
+ status = up} = C,
+ {Co, St0, Eff}) when node(P) =:= Node ->
{St, Eff0} = return_all(St0, Checked0, Eff, K, C),
Credit = increase_credit(C, maps:size(Checked0)),
Eff1 = ConsumerUpdateActiveFun(St, K, C, false,
suspected_down, Eff0),
- {maps:put(K,
- C#consumer{status = suspected_down,
- credit = Credit,
- checked_out = #{}}, Co),
+ {maps:put(K, C#consumer{status = suspected_down,
+ credit = Credit,
+ checked_out = #{}}, Co),
St, Eff1};
(K, C, {Co, St, Eff}) ->
{maps:put(K, C, Co), St, Eff}
@@ -323,10 +361,10 @@ apply(_, {down, ConsumerPid, noconnection},
E#enqueuer{status = suspected_down};
(_, E) -> E
end, Enqs0),
- % mark waiting consumers as suspected if necessary
- WaitingConsumers = update_waiting_consumer_status(Node, State0,
- suspected_down),
+ % Monitor the node so that we can "unsuspect" these processes when the node
+ % comes back, then re-issue all monitors and discover the final fate of
+ % these processes
Effects2 = case maps:size(Cons) of
0 ->
[{aux, inactive}, {monitor, node, Node}];
@@ -334,8 +372,7 @@ apply(_, {down, ConsumerPid, noconnection},
[{monitor, node, Node}]
end ++ Effects1,
%% TODO: should we run a checkout here?
- {State#?MODULE{consumers = Cons, enqueuers = Enqs,
- waiting_consumers = WaitingConsumers}, ok, Effects2};
+ {State#?MODULE{consumers = Cons, enqueuers = Enqs}, ok, Effects2};
apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
@@ -367,36 +404,36 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
Monitors = [{monitor, process, P}
|| P <- suspected_pids_for(Node, State0)],
- % un-suspect waiting consumers when necessary
- WaitingConsumers = update_waiting_consumer_status(Node, State0, up),
-
Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{status = up};
(_, E) -> E
end, Enqs0),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
- {Cons1, SQ, Effects} =
- maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
- when (node(P) =:= Node) and
- (C#consumer.status =/= cancelled) ->
- EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C,
- true, up, EAcc),
- update_or_remove_sub(
- ConsumerId, C#consumer{status = up},
- CAcc, SQAcc, EAcc1);
- (_, _, Acc) ->
- Acc
- end, {Cons0, SQ0, Monitors}, Cons0),
-
- checkout(Meta, State0#?MODULE{consumers = Cons1, enqueuers = Enqs1,
- service_queue = SQ,
- waiting_consumers = WaitingConsumers}, Effects);
+ %% mark all consumers as up
+ {Cons1, SQ, Effects1} =
+ maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
+ when (node(P) =:= Node) and
+ (C#consumer.status =/= cancelled) ->
+ EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId,
+ C, true, up, EAcc),
+ update_or_remove_sub(ConsumerId,
+ C#consumer{status = up}, CAcc,
+ SQAcc, EAcc1);
+ (_, _, Acc) ->
+ Acc
+ end, {Cons0, SQ0, Monitors}, Cons0),
+ Waiting = update_waiting_consumer_status(Node, State0, up),
+ State1 = State0#?MODULE{consumers = Cons1, enqueuers = Enqs1,
+ service_queue = SQ,
+ waiting_consumers = Waiting},
+ {State, Effects} = activate_next_consumer(State1, Effects1),
+ checkout(Meta, State, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
apply(Meta, #update_config{config = Conf}, State) ->
checkout(Meta, update_config(Conf, State), []).
-consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = default}}) ->
+consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) ->
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
consumer_update_active_effects(State, ConsumerId, Consumer, Active,
ActivityStatus, Effects)
@@ -407,7 +444,7 @@ consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = sin
end.
handle_waiting_consumer_down(_Pid,
- #?MODULE{cfg = #cfg{consumer_strategy = default}} = State) ->
+ #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) ->
{[], State};
handle_waiting_consumer_down(_Pid,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
@@ -429,27 +466,18 @@ handle_waiting_consumer_down(Pid,
State = State0#?MODULE{waiting_consumers = StillUp},
{Effects, State}.
-update_waiting_consumer_status(_Node, #?MODULE{cfg = #cfg{consumer_strategy = default}},
- _Status) ->
- [];
-update_waiting_consumer_status(_Node,
- #?MODULE{cfg = #cfg{consumer_strategy = single_active},
- waiting_consumers = []},
- _Status) ->
- [];
update_waiting_consumer_status(Node,
- #?MODULE{cfg = #cfg{consumer_strategy = single_active},
- waiting_consumers = WaitingConsumers},
+ #?MODULE{waiting_consumers = WaitingConsumers},
Status) ->
[begin
- case node(P) of
+ case node(Pid) of
Node ->
{ConsumerId, Consumer#consumer{status = Status}};
_ ->
{ConsumerId, Consumer}
end
- end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers,
- Consumer#consumer.status =/= cancelled].
+ end || {{_, Pid} = ConsumerId, Consumer} <- WaitingConsumers,
+ Consumer#consumer.status =/= cancelled].
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #?MODULE{consumers = Cons,
@@ -583,7 +611,7 @@ query_consumers(#?MODULE{consumers = Consumers,
cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) ->
ActiveActivityStatusFun =
case ConsumerStrategy of
- default ->
+ competing ->
fun(_ConsumerId,
#consumer{status = Status}) ->
case Status of
@@ -709,8 +737,8 @@ num_checked_out(#?MODULE{consumers = Cons}) ->
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
- #?MODULE{cfg = #cfg{consumer_strategy = default}} = State, Effects, Reason) ->
- %% general case, single active consumer off
+ #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State,
+ Effects, Reason) ->
cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
@@ -721,41 +749,23 @@ cancel_consumer(ConsumerId,
cancel_consumer(ConsumerId,
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
- waiting_consumers = WaitingConsumers0} = State0,
+ waiting_consumers = Waiting0} = State0,
Effects0, Reason) ->
%% single active consumer on, consumers are waiting
- case maps:take(ConsumerId, Cons0) of
- {Consumer, Cons1} ->
+ case maps:is_key(ConsumerId, Cons0) of
+ true ->
% The active consumer is to be removed
- % Cancel it
- {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1,
- State0, Effects0, Reason),
- Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1),
- % Take another one from the waiting consumers and put it in consumers
- [{NewActiveConsumerId, NewActiveConsumer}
- | RemainingWaitingConsumers] = WaitingConsumers0,
- #?MODULE{service_queue = ServiceQueue} = State1,
- ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId,
- NewActiveConsumer,
- ServiceQueue),
- State = State1#?MODULE{consumers = maps:put(NewActiveConsumerId,
- NewActiveConsumer,
- State1#?MODULE.consumers),
- service_queue = ServiceQueue1,
- waiting_consumers = RemainingWaitingConsumers},
- Effects = consumer_update_active_effects(State, NewActiveConsumerId,
- NewActiveConsumer, true,
- single_active, Effects2),
- {State, Effects};
- error ->
+ {State1, Effects1} = cancel_consumer0(ConsumerId, State0,
+ Effects0, Reason),
+ activate_next_consumer(State1, Effects1);
+ false ->
% The cancelled consumer is not the active one
% Just remove it from idle_consumers
- WaitingConsumers = lists:keydelete(ConsumerId, 1,
- WaitingConsumers0),
+ Waiting = lists:keydelete(ConsumerId, 1, Waiting0),
Effects = cancel_consumer_effects(ConsumerId, State0, Effects0),
% A waiting consumer isn't supposed to have any checked out messages,
% so nothing special to do here
- {State0#?MODULE{waiting_consumers = WaitingConsumers}, Effects}
+ {State0#?MODULE{waiting_consumers = Waiting}, Effects}
end.
consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
@@ -765,9 +775,7 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
Ack = maps:get(ack, Meta, undefined),
Prefetch = maps:get(prefetch, Meta, undefined),
Args = maps:get(args, Meta, []),
- [{mod_call,
- rabbit_quorum_queue,
- update_consumer_handler,
+ [{mod_call, rabbit_quorum_queue, update_consumer_handler,
[QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
| Effects].
@@ -788,6 +796,32 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
{S0, Effects0}
end.
+activate_next_consumer(#?MODULE{consumers = Cons} = State0, Effects)
+ when map_size(Cons) == 1 ->
+ {State0, Effects};
+activate_next_consumer(#?MODULE{waiting_consumers = Waiting0} = State0,
+ Effects0) ->
+ case lists:filter(fun ({_, #consumer{status = Status}}) ->
+ Status == up
+ end, Waiting0) of
+ [{NextConsumerId, NextConsumer} | _] ->
+ Remaining = lists:keydelete(NextConsumerId, 1, Waiting0),
+ #?MODULE{service_queue = ServiceQueue} = State0,
+ ServiceQueue1 = maybe_queue_consumer(NextConsumerId,
+ NextConsumer,
+ ServiceQueue),
+ State = State0#?MODULE{consumers = #{NextConsumerId => NextConsumer},
+ service_queue = ServiceQueue1,
+ waiting_consumers = Remaining},
+ Effects = consumer_update_active_effects(State, NextConsumerId,
+ NextConsumer, true,
+ single_active, Effects0),
+ {State, Effects};
+ [] ->
+ {State0, Effects0}
+ end.
+
+
maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer,
Cons1, #?MODULE{consumers = C0,
service_queue = SQ0} = S0,
@@ -1296,7 +1330,7 @@ uniq_queue_in(Key, Queue) ->
end.
update_consumer(ConsumerId, Meta, Spec,
- #?MODULE{cfg = #cfg{consumer_strategy = default}} = State0) ->
+ #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) ->
%% general case, single active consumer off
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, Spec,
@@ -1331,7 +1365,6 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
end, Init, Cons0),
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
ServiceQueue0),
-
State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}.
maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index ebe5f3328a..968ae07739 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -88,6 +88,8 @@
-type consumer() :: #consumer{}.
+-type consumer_strategy() :: competing | single_active.
+
-record(enqueuer,
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
@@ -104,7 +106,8 @@
max_length :: maybe(non_neg_integer()),
max_bytes :: maybe(non_neg_integer()),
%% whether single active consumer is on or not for this queue
- consumer_strategy = default :: default | single_active,
+ consumer_strategy = competing :: consumer_strategy(),
+ %% the maximum number of unsuccessful delivery attempts permitted
delivery_limit :: maybe(non_neg_integer())
}).
@@ -114,7 +117,7 @@
messages = #{} :: #{msg_in_id() => indexed_msg()},
% defines the lowest message in id available in the messages map
% that isn't a return
- low_msg_num :: msg_in_id() | undefined,
+ low_msg_num :: maybe(msg_in_id()),
% defines the next message in id to be added to the messages map
next_msg_num = 1 :: msg_in_id(),
% list of returned msg_in_ids - when checking out it picks from