diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-07-06 16:55:54 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-07-06 16:55:54 +0100 |
| commit | b170895eaf2ec0142f2528a97c7846563aaff219 (patch) | |
| tree | e35e4191e03d207178cf4103508836b1b21cb0dc /src | |
| parent | 729d7f6cae44de2627a86575bf82d705bf2bc267 (diff) | |
| download | rabbitmq-server-git-b170895eaf2ec0142f2528a97c7846563aaff219.tar.gz | |
Dynamic change of HA policy. Promotion from non-HA to master, and demotion from master to non-HA as appropriate. There will still be problems if the policy changes such that the master needs to change - I'm not sure we should even allow this.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 62 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 |
5 files changed, 112 insertions, 32 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b32aefdb8c..be6613a0b7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -30,6 +30,8 @@ -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). +%% temp +-export([start_mirroring/1, stop_mirroring/1]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -214,7 +216,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> exclusive_owner = Owner, pid = none, slave_pids = []}), - {Node, _MNodes} = rabbit_mirror_queue_misc:queue_nodes(Q0), + {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), Q1 = start_queue_process(Node, Q0), case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); @@ -267,8 +269,8 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -policy_changed(_Q1, _Q2) -> - ok. +policy_changed(Q1, Q2) -> + rabbit_mirror_queue_misc:update_mirrors(Q1, Q2). start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), @@ -550,6 +552,9 @@ set_ram_duration_target(QPid, Duration) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). +start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring). +stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 71f8aacd18..e4a61cb4df 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1199,6 +1199,31 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> ChPid, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end)); +handle_call(start_mirroring, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + %% lookup again to get policy for init_with_existing_bq + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + true = BQ =/= rabbit_mirror_queue_master, %% assertion + BQ1 = rabbit_mirror_queue_master, + BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), + reply(ok, State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + +handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ = rabbit_mirror_queue_master, %% assertion + {BQ1, BQS1} = BQ:stop_mirroring(BQS), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, qname(State)}) of + [] -> ok; + [Q] -> rabbit_amqqueue:store_queue( + Q#amqqueue{slave_pids = undefined}) + end + end), + reply(ok, State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 5034920444..e5ca085dbc 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -27,6 +27,9 @@ -export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]). +%% temp +-export([init_with_existing_bq/3, stop_mirroring/1]). + -behaviour(rabbit_backing_queue). -include("rabbit.hrl"). @@ -82,16 +85,17 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -init(#amqqueue { name = QName } = Q, Recover, - AsyncCallback) -> +init(Q, Recover, AsyncCallback) -> + {ok, BQ} = application:get_env(backing_queue_module), + BQS = BQ:init(Q, Recover, AsyncCallback), + init_with_existing_bq(Q, BQ, BQS). + +init_with_existing_bq(#amqqueue { name = QName } = Q, BQ, BQS) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - {_MNode, MNodes} = rabbit_mirror_queue_misc:queue_nodes(Q), - MNodes1 = MNodes -- [node()], - [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], - {ok, BQ} = application:get_env(backing_queue_module), - BQS = BQ:init(Q, Recover, AsyncCallback), + {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- SNodes], ok = gm:broadcast(GM, {length, BQ:len(BQS)}), #state { gm = GM, coordinator = CPid, @@ -103,14 +107,24 @@ init(#amqqueue { name = QName } = Q, Recover, ack_msg_id = dict:new(), known_senders = sets:new() }. +stop_mirroring(State = #state { coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS }) -> + unlink(CPid), + stop_all_slaves(unmirroring, State), + {BQ, BQS}. + terminate({shutdown, dropped} = Reason, - State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> %% Backing queue termination - this node has been explicitly %% dropped. Normally, non-durable queues would be tidied up on %% startup, but there's a possibility that we will be added back %% in without this node being restarted. Thus we must do the full %% blown delete_and_terminate now, but only locally: we do not %% broadcast delete_and_terminate. + ok = gm:leave(GM), %% TODO presumably we need this? State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), set_delivered = 0 }; terminate(Reason, @@ -120,15 +134,17 @@ terminate(Reason, %% node. Thus just let some other slave take over. State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. -delete_and_terminate(Reason, State = #state { gm = GM, - backing_queue = BQ, +delete_and_terminate(Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + stop_all_slaves(Reason, State), + State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), + set_delivered = 0 }. + +stop_all_slaves(Reason, #state{gm = GM}) -> Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()], MRefs = [erlang:monitor(process, S) || S <- Slaves], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), - monitor_wait(MRefs), - State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), - set_delivered = 0 }. + monitor_wait(MRefs). monitor_wait([]) -> ok; diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index a84623f62d..de507afec6 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -21,7 +21,8 @@ report_deaths/4]). %% temp --export([queue_nodes/1, is_mirrored/1, slave_pids/1]). +-export([suggested_queue_nodes/1, is_mirrored/1, slave_pids/1, + update_mirrors/2]). -include("rabbit.hrl"). @@ -94,8 +95,8 @@ on_node_up() -> fun () -> mnesia:foldl( fun (Q = #amqqueue{name = QName}, QNames0) -> - {_MNode, MNodes} = queue_nodes(Q), - case lists:member(node(), MNodes) of + {_MNode, SNodes} = suggested_queue_nodes(Q), + case lists:member(node(), SNodes) of true -> [QName | QNames0]; false -> QNames0 end @@ -107,9 +108,9 @@ on_node_up() -> drop_mirror(VHostPath, QueueName, MirrorNode) -> drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). -drop_mirror(Queue, MirrorNode) -> +drop_mirror(QName, MirrorNode) -> if_mirrored_queue( - Queue, + QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> @@ -128,9 +129,9 @@ drop_mirror(Queue, MirrorNode) -> add_mirror(VHostPath, QueueName, MirrorNode) -> add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). -add_mirror(Queue, MirrorNode) -> +add_mirror(QName, MirrorNode) -> if_mirrored_queue( - Queue, + QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> case rabbit_mirror_queue_slave_sup:start_child( @@ -173,21 +174,35 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> %%---------------------------------------------------------------------------- -queue_nodes(Q) -> +%% TODO this should take account of current nodes so we don't throw +%% away mirrors or change the master needlessly +suggested_queue_nodes(Q) -> case [rabbit_policy:get(P, Q) || P <- [<<"ha-mode">>, <<"ha-params">>]] of [{ok, <<"all">>}, _] -> - {node(), rabbit_mnesia:all_clustered_nodes()}; + {node(), rabbit_mnesia:all_clustered_nodes() -- [node()]}; [{ok, <<"nodes">>}, {ok, Nodes}] -> case [list_to_atom(binary_to_list(Node)) || Node <- Nodes] of [Node] -> {Node, []}; - [First | Rest] -> {First, [First | Rest]} + [First | Rest] -> {First, Rest} end; [{ok, <<"at-least">>}, {ok, Count}] -> - {node(), lists:sublist(rabbit_mnesia:all_clustered_nodes(), Count)}; + {node(), lists:sublist( + rabbit_mnesia:all_clustered_nodes(), Count) -- [node()]}; _ -> {node(), []} end. +actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) -> + MNode = case MPid of + undefined -> undefined; + _ -> node(MPid) + end, + SNodes = case SPids of + undefined -> undefined; + _ -> [node(Pid) || Pid <- SPids] + end, + {MNode, SNodes}. + is_mirrored(Q) -> case rabbit_policy:get(<<"ha-mode">>, Q) of {ok, <<"all">>} -> true; @@ -196,10 +211,27 @@ is_mirrored(Q) -> _ -> false end. -slave_pids(Q = #amqqueue{name = Name}) -> +slave_pids(#amqqueue{name = Name}) -> + {ok, Q = #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), case is_mirrored(Q) of false -> not_mirrored; - true -> {ok, #amqqueue{slave_pids = SPids}} = - rabbit_amqqueue:lookup(Name), - SPids + true -> SPids + end. + +update_mirrors(OldQ = #amqqueue{name = QName, pid = QPid}, + NewQ = #amqqueue{name = QName, pid = QPid}) -> + case {is_mirrored(OldQ), is_mirrored(NewQ)} of + {false, false} -> ok; + {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); + {false, true} -> rabbit_amqqueue:start_mirroring(QPid); + {true, true} -> {OldMNode, OldSNodes} = actual_queue_nodes(OldQ), + {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), + case OldMNode of + NewMNode -> ok; + _ -> io:format("TODO: master needs to change for ~p~n", [NewQ]) + end, + Add = NewSNodes -- OldSNodes, + Remove = OldSNodes -- NewSNodes, + [ok = drop_mirror(QName, SNode) || SNode <- Remove], + [ok = add_mirror(QName, SNode) || SNode <- Add] end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 03fafc3e5a..8f57e69540 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -275,9 +275,11 @@ terminate(_Reason, #state { backing_queue_state = undefined }) -> %% We've received a delete_and_terminate from gm, thus nothing to %% do here. ok; -terminate({shutdown, dropped} = R, #state { backing_queue = BQ, +terminate({shutdown, dropped} = R, #state { gm = GM, + backing_queue = BQ, backing_queue_state = BQS }) -> %% See rabbit_mirror_queue_master:terminate/2 + ok = gm:leave(GM), %% TODO presumably we need this? BQ:delete_and_terminate(R, BQS); terminate(Reason, #state { q = Q, gm = GM, |
