diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-29 17:22:26 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-29 17:22:26 +0000 |
| commit | 3948c08b6ed09c4bd8a2bbfb8e3b40481f86b574 (patch) | |
| tree | 6c68b12bd433584416229ebe10bf2e00ccb13d5a /src | |
| parent | 2b5f12eefa2f036676c9f16c00b2075701f9ca65 (diff) | |
| download | rabbitmq-server-git-3948c08b6ed09c4bd8a2bbfb8e3b40481f86b574.tar.gz | |
Replace down_slave_nodes with a new field recoverable_slaves with different semantics: this field is intended to show all nodes that contain a slave, even if they are running. That means we don't have to worry about race conditions when getting nodes into the field at shutdown.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 88 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 55 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 14 |
4 files changed, 102 insertions, 63 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ca5eb34874..8d5971542f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -263,17 +263,17 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> ok = check_declare_arguments(QueueName, Args), Q = rabbit_queue_decorator:set( - rabbit_policy:set(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - down_slave_nodes = [], - gm_pids = [], - state = live})), + rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = [], + recoverable_slaves = [], + gm_pids = [], + state = live})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call( rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), @@ -558,12 +558,12 @@ info_down(Q, DownReason) -> info_down(Q, Items, DownReason) -> [{Item, i_down(Item, Q, DownReason)} || Item <- Items]. -i_down(name, #amqqueue{name = Name}, _) -> Name; -i_down(durable, #amqqueue{durable = Durable},_) -> Durable; -i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD; -i_down(arguments, #amqqueue{arguments = Args}, _) -> Args; -i_down(pid, #amqqueue{pid = QPid}, _) -> QPid; -i_down(down_slave_nodes, #amqqueue{down_slave_nodes = DSN}, _) -> DSN; +i_down(name, #amqqueue{name = Name}, _) -> Name; +i_down(durable, #amqqueue{durable = Dur}, _) -> Dur; +i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD; +i_down(arguments, #amqqueue{arguments = Args}, _) -> Args; +i_down(pid, #amqqueue{pid = QPid}, _) -> QPid; +i_down(recoverable_slaves, #amqqueue{recoverable_slaves = RS}, _) -> RS; i_down(state, _Q, DownReason) -> DownReason; i_down(K, _Q, _DownReason) -> case lists:member(K, rabbit_amqqueue_process:info_keys()) of @@ -718,24 +718,38 @@ forget_all_durable(Node) -> fun () -> Qs = mnesia:match_object(rabbit_durable_queue, #amqqueue{_ = '_'}, write), - [forget_node_for_queue(Q) || #amqqueue{pid = Pid} = Q <- Qs, + [forget_node_for_queue(Node, Q) || + #amqqueue{pid = Pid} = Q <- Qs, node(Pid) =:= Node], ok end), ok. -forget_node_for_queue(#amqqueue{name = Name, - down_slave_nodes = []}) -> +forget_node_for_queue(DeadNode, Q = #amqqueue{recoverable_slaves = RS}) -> + forget_node_for_queue(DeadNode, RS, Q). + +forget_node_for_queue(_DeadNode, [], #amqqueue{name = Name}) -> %% No slaves to recover from, queue is gone. %% Don't process_deletions since that just calls callbacks and we %% are not really up. internal_delete1(Name, true); -forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) -> - %% Promote a slave while down - it'll happily recover as a master - Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H), - down_slave_nodes = T}, - ok = mnesia:write(rabbit_durable_queue, Q1, write). +%% Should not happen, but let's be conservative. +forget_node_for_queue(DeadNode, [DeadNode | T], Q) -> + forget_node_for_queue(DeadNode, T, Q); + +forget_node_for_queue(DeadNode, [H|T], Q) -> + case H =/= node() andalso %% TODO not really good enough test + lists:member(H, rabbit_mnesia:cluster_nodes(running)) of + true -> + forget_node_for_queue(DeadNode, T, Q); + false -> + %% Promote a slave while down - it should recover as a + %% master. We try to take the oldest slave here for best + %% chance of recovery. + Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)}, + ok = mnesia:write(rabbit_durable_queue, Q1, write) + end. run_backing_queue(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). @@ -757,12 +771,12 @@ on_node_up(Node) -> fun () -> Qs = mnesia:match_object(rabbit_queue, #amqqueue{_ = '_'}, write), - [case lists:member(Node, DSNs) of - true -> DSNs1 = DSNs -- [Node], + [case lists:member(Node, RSs) of + true -> RSs1 = RSs -- [Node], store_queue( - Q#amqqueue{down_slave_nodes = DSNs1}); + Q#amqqueue{recoverable_slaves = RSs1}); false -> ok - end || #amqqueue{down_slave_nodes = DSNs} = Q <- Qs], + end || #amqqueue{recoverable_slaves = RSs} = Q <- Qs], ok end). @@ -801,14 +815,14 @@ pseudo_queue(QueueName, Pid) -> pid = Pid, slave_pids = []}. -immutable(Q) -> Q#amqqueue{pid = none, - slave_pids = none, - sync_slave_pids = none, - down_slave_nodes = none, - gm_pids = none, - policy = none, - decorators = none, - state = none}. +immutable(Q) -> Q#amqqueue{pid = none, + slave_pids = none, + sync_slave_pids = none, + recoverable_slaves = none, + gm_pids = none, + policy = none, + decorators = none, + state = none}. deliver([], _Delivery) -> %% /dev/null optimisation diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1b9427da1f..3f69c9af4f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -83,7 +83,7 @@ memory, slave_pids, synchronised_slave_pids, - down_slave_nodes, + recoverable_slaves, state ]). @@ -861,9 +861,9 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; -i(down_slave_nodes, #q{q = #amqqueue{name = Name, - durable = Durable}}) -> - {ok, Q = #amqqueue{down_slave_nodes = Nodes}} = +i(recoverable_slaves, #q{q = #amqqueue{name = Name, + durable = Durable}}) -> + {ok, Q = #amqqueue{recoverable_slaves = Nodes}} = rabbit_amqqueue:lookup(Name), case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of false -> ''; diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index c2095cebbe..ce63f7af1d 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -78,10 +78,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% get here. case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, - slave_pids = SPids, - gm_pids = GMPids, - down_slave_nodes = DSNs}] -> + [Q = #amqqueue { pid = QPid, + slave_pids = SPids, + gm_pids = GMPids }] -> {DeadGM, AliveGM} = lists:partition( fun ({GM, _}) -> lists:member(GM, DeadGMPids) @@ -90,9 +89,6 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> AlivePids = [Pid || {_GM, Pid} <- AliveGM], Alive = [Pid || Pid <- [QPid | SPids], lists:member(Pid, AlivePids)], - DSNs1 = [node(Pid) || - Pid <- SPids, - not lists:member(Pid, AlivePids)] ++ DSNs, {QPid1, SPids1} = promote_slave(Alive), Extra = case {{QPid, SPids}, {QPid1, SPids1}} of @@ -102,10 +98,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - Q1 = Q#amqqueue{pid = QPid1, - slave_pids = SPids1, - gm_pids = AliveGM, - down_slave_nodes = DSNs1}, + Q1 = Q#amqqueue{pid = QPid1, + slave_pids = SPids1, + gm_pids = AliveGM}, store_updated_slaves(Q1), %% If we add and remove nodes at the %% same time we might tell the old @@ -117,9 +112,8 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> _ -> %% Master has changed, and we're not it. %% [1]. - Q1 = Q#amqqueue{slave_pids = Alive, - gm_pids = AliveGM, - down_slave_nodes = DSNs1}, + Q1 = Q#amqqueue{slave_pids = Alive, + gm_pids = AliveGM}, store_updated_slaves(Q1), [] end, @@ -249,22 +243,39 @@ log(Level, QName, Fmt, Args) -> rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt, [rabbit_misc:rs(QName) | Args]). -store_updated_slaves(Q = #amqqueue{pid = MPid, - slave_pids = SPids, - sync_slave_pids = SSPids, - down_slave_nodes = DSNs}) -> +store_updated_slaves(Q = #amqqueue{slave_pids = SPids, + sync_slave_pids = SSPids, + recoverable_slaves = RS}) -> %% TODO now that we clear sync_slave_pids in rabbit_durable_queue, %% do we still need this filtering? SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)], - DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]], - Q1 = Q#amqqueue{sync_slave_pids = SSPids1, - down_slave_nodes = DSNs1, - state = live}, + Q1 = Q#amqqueue{sync_slave_pids = SSPids1, + recoverable_slaves = update_recoverable(SPids, RS), + state = live}, ok = rabbit_amqqueue:store_queue(Q1), %% Wake it up so that we emit a stats event rabbit_amqqueue:notify_policy_changed(Q1), Q1. +%% Recoverable nodes are those which we could promote if the whole +%% cluster were to suddenly stop and we then lose the master; i.e. all +%% nodes with running slaves, and all stopped nodes which had running +%% slaves when they were up. +%% +%% Therefore we aim here to add new nodes with slaves, and remove +%% running nodes without slaves, We also try to keep the order +%% constant, and similar to the live SPids field (i.e. oldest +%% first). That's not necessarily optimal if nodes spend a long time +%% down, but we don't have a good way to predict what the optimal is +%% in that case anyway, and we assume nodes will not just be down for +%% a long time without being removed. +update_recoverable(SPids, RS) -> + SNodes = [node(SPid) || SPid <- SPids], + RunningNodes = rabbit_mnesia:cluster_nodes(running), + AddNodes = SNodes -- RS, + DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave + (RS -- DelNodes) ++ AddNodes. + %%---------------------------------------------------------------------------- promote_slave([SPid | SPids]) -> diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 9f6dc21aa3..16f0b21b1b 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -50,6 +50,7 @@ -rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}). -rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}). -rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}). +-rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}). %% ------------------------------------------------------------------- @@ -82,6 +83,7 @@ -spec(cluster_name/0 :: () -> 'ok'). -spec(down_slave_nodes/0 :: () -> 'ok'). -spec(queue_state/0 :: () -> 'ok'). +-spec(recoverable_slaves/0 :: () -> 'ok'). -endif. @@ -418,6 +420,18 @@ queue_state(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators, state]). +recoverable_slaves() -> + ok = recoverable_slaves(rabbit_queue), + ok = recoverable_slaves(rabbit_durable_queue). + +recoverable_slaves(Table) -> + transform( + Table, fun (Q) -> Q end, %% Don't change shape of record + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, + state]). + + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |
