summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/rabbit_amqqueue.erl 88
-rw-r--r-- src/rabbit_amqqueue_process.erl 8
-rw-r--r-- src/rabbit_mirror_queue_misc.erl 55
-rw-r--r-- src/rabbit_upgrade_functions.erl 14
4 files changed, 102 insertions, 63 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ca5eb34874..8d5971542f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -263,17 +263,17 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
ok = check_declare_arguments(QueueName, Args),
Q = rabbit_queue_decorator:set(
- rabbit_policy:set(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- slave_pids = [],
- sync_slave_pids = [],
- down_slave_nodes = [],
- gm_pids = [],
- state = live})),
+ rabbit_policy:set(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ sync_slave_pids = [],
+ recoverable_slaves = [],
+ gm_pids = [],
+ state = live})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
gen_server2:call(
rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare),
@@ -558,12 +558,12 @@ info_down(Q, DownReason) ->
info_down(Q, Items, DownReason) ->
[{Item, i_down(Item, Q, DownReason)} || Item <- Items].
-i_down(name, #amqqueue{name = Name}, _) -> Name;
-i_down(durable, #amqqueue{durable = Durable},_) -> Durable;
-i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD;
-i_down(arguments, #amqqueue{arguments = Args}, _) -> Args;
-i_down(pid, #amqqueue{pid = QPid}, _) -> QPid;
-i_down(down_slave_nodes, #amqqueue{down_slave_nodes = DSN}, _) -> DSN;
+i_down(name, #amqqueue{name = Name}, _) -> Name;
+i_down(durable, #amqqueue{durable = Dur}, _) -> Dur;
+i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD;
+i_down(arguments, #amqqueue{arguments = Args}, _) -> Args;
+i_down(pid, #amqqueue{pid = QPid}, _) -> QPid;
+i_down(recoverable_slaves, #amqqueue{recoverable_slaves = RS}, _) -> RS;
i_down(state, _Q, DownReason) -> DownReason;
i_down(K, _Q, _DownReason) ->
case lists:member(K, rabbit_amqqueue_process:info_keys()) of
@@ -718,24 +718,38 @@ forget_all_durable(Node) ->
fun () ->
Qs = mnesia:match_object(rabbit_durable_queue,
#amqqueue{_ = '_'}, write),
- [forget_node_for_queue(Q) || #amqqueue{pid = Pid} = Q <- Qs,
+ [forget_node_for_queue(Node, Q) ||
+ #amqqueue{pid = Pid} = Q <- Qs,
node(Pid) =:= Node],
ok
end),
ok.
-forget_node_for_queue(#amqqueue{name = Name,
- down_slave_nodes = []}) ->
+forget_node_for_queue(DeadNode, Q = #amqqueue{recoverable_slaves = RS}) ->
+ forget_node_for_queue(DeadNode, RS, Q).
+
+forget_node_for_queue(_DeadNode, [], #amqqueue{name = Name}) ->
%% No slaves to recover from, queue is gone.
%% Don't process_deletions since that just calls callbacks and we
%% are not really up.
internal_delete1(Name, true);
-forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) ->
- %% Promote a slave while down - it'll happily recover as a master
- Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H),
- down_slave_nodes = T},
- ok = mnesia:write(rabbit_durable_queue, Q1, write).
+%% Should not happen, but let's be conservative.
+forget_node_for_queue(DeadNode, [DeadNode | T], Q) ->
+ forget_node_for_queue(DeadNode, T, Q);
+
+forget_node_for_queue(DeadNode, [H|T], Q) ->
+ case H =/= node() andalso %% TODO not really good enough test
+ lists:member(H, rabbit_mnesia:cluster_nodes(running)) of
+ true ->
+ forget_node_for_queue(DeadNode, T, Q);
+ false ->
+ %% Promote a slave while down - it should recover as a
+ %% master. We try to take the oldest slave here for best
+ %% chance of recovery.
+ Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)},
+ ok = mnesia:write(rabbit_durable_queue, Q1, write)
+ end.
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
@@ -757,12 +771,12 @@ on_node_up(Node) ->
fun () ->
Qs = mnesia:match_object(rabbit_queue,
#amqqueue{_ = '_'}, write),
- [case lists:member(Node, DSNs) of
- true -> DSNs1 = DSNs -- [Node],
+ [case lists:member(Node, RSs) of
+ true -> RSs1 = RSs -- [Node],
store_queue(
- Q#amqqueue{down_slave_nodes = DSNs1});
+ Q#amqqueue{recoverable_slaves = RSs1});
false -> ok
- end || #amqqueue{down_slave_nodes = DSNs} = Q <- Qs],
+ end || #amqqueue{recoverable_slaves = RSs} = Q <- Qs],
ok
end).
@@ -801,14 +815,14 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid,
slave_pids = []}.
-immutable(Q) -> Q#amqqueue{pid = none,
- slave_pids = none,
- sync_slave_pids = none,
- down_slave_nodes = none,
- gm_pids = none,
- policy = none,
- decorators = none,
- state = none}.
+immutable(Q) -> Q#amqqueue{pid = none,
+ slave_pids = none,
+ sync_slave_pids = none,
+ recoverable_slaves = none,
+ gm_pids = none,
+ policy = none,
+ decorators = none,
+ state = none}.
deliver([], _Delivery) ->
%% /dev/null optimisation
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1b9427da1f..3f69c9af4f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -83,7 +83,7 @@
memory,
slave_pids,
synchronised_slave_pids,
- down_slave_nodes,
+ recoverable_slaves,
state
]).
@@ -861,9 +861,9 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
-i(down_slave_nodes, #q{q = #amqqueue{name = Name,
- durable = Durable}}) ->
- {ok, Q = #amqqueue{down_slave_nodes = Nodes}} =
+i(recoverable_slaves, #q{q = #amqqueue{name = Name,
+ durable = Durable}}) ->
+ {ok, Q = #amqqueue{recoverable_slaves = Nodes}} =
rabbit_amqqueue:lookup(Name),
case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of
false -> '';
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index c2095cebbe..ce63f7af1d 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -78,10 +78,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% get here.
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
- slave_pids = SPids,
- gm_pids = GMPids,
- down_slave_nodes = DSNs}] ->
+ [Q = #amqqueue { pid = QPid,
+ slave_pids = SPids,
+ gm_pids = GMPids }] ->
{DeadGM, AliveGM} = lists:partition(
fun ({GM, _}) ->
lists:member(GM, DeadGMPids)
@@ -90,9 +89,6 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
AlivePids = [Pid || {_GM, Pid} <- AliveGM],
Alive = [Pid || Pid <- [QPid | SPids],
lists:member(Pid, AlivePids)],
- DSNs1 = [node(Pid) ||
- Pid <- SPids,
- not lists:member(Pid, AlivePids)] ++ DSNs,
{QPid1, SPids1} = promote_slave(Alive),
Extra =
case {{QPid, SPids}, {QPid1, SPids1}} of
@@ -102,10 +98,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- Q1 = Q#amqqueue{pid = QPid1,
- slave_pids = SPids1,
- gm_pids = AliveGM,
- down_slave_nodes = DSNs1},
+ Q1 = Q#amqqueue{pid = QPid1,
+ slave_pids = SPids1,
+ gm_pids = AliveGM},
store_updated_slaves(Q1),
%% If we add and remove nodes at the
%% same time we might tell the old
@@ -117,9 +112,8 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
_ ->
%% Master has changed, and we're not it.
%% [1].
- Q1 = Q#amqqueue{slave_pids = Alive,
- gm_pids = AliveGM,
- down_slave_nodes = DSNs1},
+ Q1 = Q#amqqueue{slave_pids = Alive,
+ gm_pids = AliveGM},
store_updated_slaves(Q1),
[]
end,
@@ -249,22 +243,39 @@ log(Level, QName, Fmt, Args) ->
rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt,
[rabbit_misc:rs(QName) | Args]).
-store_updated_slaves(Q = #amqqueue{pid = MPid,
- slave_pids = SPids,
- sync_slave_pids = SSPids,
- down_slave_nodes = DSNs}) ->
+store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
+ sync_slave_pids = SSPids,
+ recoverable_slaves = RS}) ->
%% TODO now that we clear sync_slave_pids in rabbit_durable_queue,
%% do we still need this filtering?
SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)],
- DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]],
- Q1 = Q#amqqueue{sync_slave_pids = SSPids1,
- down_slave_nodes = DSNs1,
- state = live},
+ Q1 = Q#amqqueue{sync_slave_pids = SSPids1,
+ recoverable_slaves = update_recoverable(SPids, RS),
+ state = live},
ok = rabbit_amqqueue:store_queue(Q1),
%% Wake it up so that we emit a stats event
rabbit_amqqueue:notify_policy_changed(Q1),
Q1.
+%% Recoverable nodes are those which we could promote if the whole
+%% cluster were to suddenly stop and we then lose the master; i.e. all
+%% nodes with running slaves, and all stopped nodes which had running
+%% slaves when they were up.
+%%
+%% Therefore we aim here to add new nodes with slaves, and remove
+%% running nodes without slaves. We also try to keep the order
+%% constant, and similar to the live SPids field (i.e. oldest
+%% first). That's not necessarily optimal if nodes spend a long time
+%% down, but we don't have a good way to predict what the optimal is
+%% in that case anyway, and we assume nodes will not just be down for
+%% a long time without being removed.
+update_recoverable(SPids, RS) ->
+ SNodes = [node(SPid) || SPid <- SPids],
+ RunningNodes = rabbit_mnesia:cluster_nodes(running),
+ AddNodes = SNodes -- RS,
+ DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
+ (RS -- DelNodes) ++ AddNodes.
+
%%----------------------------------------------------------------------------
promote_slave([SPid | SPids]) ->
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 9f6dc21aa3..16f0b21b1b 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -50,6 +50,7 @@
-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}).
-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}).
+-rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}).
%% -------------------------------------------------------------------
@@ -82,6 +83,7 @@
-spec(cluster_name/0 :: () -> 'ok').
-spec(down_slave_nodes/0 :: () -> 'ok').
-spec(queue_state/0 :: () -> 'ok').
+-spec(recoverable_slaves/0 :: () -> 'ok').
-endif.
@@ -418,6 +420,18 @@ queue_state(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators, state]).
+recoverable_slaves() ->
+ ok = recoverable_slaves(rabbit_queue),
+ ok = recoverable_slaves(rabbit_durable_queue).
+
+recoverable_slaves(Table) ->
+ transform(
+ Table, fun (Q) -> Q end, %% Don't change shape of record
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators,
+ state]).
+
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->