summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Simon MacMullen <simon@rabbitmq.com> 2012-07-06 16:55:54 +0100
committer: Simon MacMullen <simon@rabbitmq.com> 2012-07-06 16:55:54 +0100
commit: b170895eaf2ec0142f2528a97c7846563aaff219 (patch)
tree: e35e4191e03d207178cf4103508836b1b21cb0dc /src
parent: 729d7f6cae44de2627a86575bf82d705bf2bc267 (diff)
download: rabbitmq-server-git-b170895eaf2ec0142f2528a97c7846563aaff219.tar.gz
Dynamic change of HA policy. Promotion from non-HA to master, and demotion from master to non-HA as appropriate. There will still be problems if the policy changes such that the master needs to change - I'm not sure we should even allow this.
Diffstat (limited to 'src')
-rw-r--r-- src/rabbit_amqqueue.erl 11
-rw-r--r-- src/rabbit_amqqueue_process.erl 25
-rw-r--r-- src/rabbit_mirror_queue_master.erl 42
-rw-r--r-- src/rabbit_mirror_queue_misc.erl 62
-rw-r--r-- src/rabbit_mirror_queue_slave.erl 4
5 files changed, 112 insertions, 32 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b32aefdb8c..be6613a0b7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -30,6 +30,8 @@
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
+%% temp
+-export([start_mirroring/1, stop_mirroring/1]).
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -214,7 +216,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
exclusive_owner = Owner,
pid = none,
slave_pids = []}),
- {Node, _MNodes} = rabbit_mirror_queue_misc:queue_nodes(Q0),
+ {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
Q1 = start_queue_process(Node, Q0),
case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
@@ -267,8 +269,8 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-policy_changed(_Q1, _Q2) ->
- ok.
+policy_changed(Q1, Q2) ->
+ rabbit_mirror_queue_misc:update_mirrors(Q1, Q2).
start_queue_process(Node, Q) ->
{ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
@@ -550,6 +552,9 @@ set_ram_duration_target(QPid, Duration) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring).
+stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring).
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 71f8aacd18..e4a61cb4df 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1199,6 +1199,31 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
ChPid, AckTags, State,
fun (State1) -> requeue_and_run(AckTags, State1) end));
+handle_call(start_mirroring, _From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ %% lookup again to get policy for init_with_existing_bq
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ true = BQ =/= rabbit_mirror_queue_master, %% assertion
+ BQ1 = rabbit_mirror_queue_master,
+ BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
+ reply(ok, State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1});
+
+handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ = rabbit_mirror_queue_master, %% assertion
+ {BQ1, BQS1} = BQ:stop_mirroring(BQS),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, qname(State)}) of
+ [] -> ok;
+ [Q] -> rabbit_amqqueue:store_queue(
+ Q#amqqueue{slave_pids = undefined})
+ end
+ end),
+ reply(ok, State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1});
+
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 5034920444..e5ca085dbc 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -27,6 +27,9 @@
-export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]).
+%% temp
+-export([init_with_existing_bq/3, stop_mirroring/1]).
+
-behaviour(rabbit_backing_queue).
-include("rabbit.hrl").
@@ -82,16 +85,17 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-init(#amqqueue { name = QName } = Q, Recover,
- AsyncCallback) ->
+init(Q, Recover, AsyncCallback) ->
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = BQ:init(Q, Recover, AsyncCallback),
+ init_with_existing_bq(Q, BQ, BQS).
+
+init_with_existing_bq(#amqqueue { name = QName } = Q, BQ, BQS) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q, undefined, sender_death_fun(), length_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
- {_MNode, MNodes} = rabbit_mirror_queue_misc:queue_nodes(Q),
- MNodes1 = MNodes -- [node()],
- [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
- {ok, BQ} = application:get_env(backing_queue_module),
- BQS = BQ:init(Q, Recover, AsyncCallback),
+ {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- SNodes],
ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
#state { gm = GM,
coordinator = CPid,
@@ -103,14 +107,24 @@ init(#amqqueue { name = QName } = Q, Recover,
ack_msg_id = dict:new(),
known_senders = sets:new() }.
+stop_mirroring(State = #state { coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ unlink(CPid),
+ stop_all_slaves(unmirroring, State),
+ {BQ, BQS}.
+
terminate({shutdown, dropped} = Reason,
- State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
%% Backing queue termination - this node has been explicitly
%% dropped. Normally, non-durable queues would be tidied up on
%% startup, but there's a possibility that we will be added back
%% in without this node being restarted. Thus we must do the full
%% blown delete_and_terminate now, but only locally: we do not
%% broadcast delete_and_terminate.
+ ok = gm:leave(GM), %% TODO presumably we need this?
State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
set_delivered = 0 };
terminate(Reason,
@@ -120,15 +134,17 @@ terminate(Reason,
%% node. Thus just let some other slave take over.
State #state { backing_queue_state = BQ:terminate(Reason, BQS) }.
-delete_and_terminate(Reason, State = #state { gm = GM,
- backing_queue = BQ,
+delete_and_terminate(Reason, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
+ stop_all_slaves(Reason, State),
+ State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
+ set_delivered = 0 }.
+
+stop_all_slaves(Reason, #state{gm = GM}) ->
Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()],
MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
- monitor_wait(MRefs),
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 }.
+ monitor_wait(MRefs).
monitor_wait([]) ->
ok;
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index a84623f62d..de507afec6 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -21,7 +21,8 @@
report_deaths/4]).
%% temp
--export([queue_nodes/1, is_mirrored/1, slave_pids/1]).
+-export([suggested_queue_nodes/1, is_mirrored/1, slave_pids/1,
+ update_mirrors/2]).
-include("rabbit.hrl").
@@ -94,8 +95,8 @@ on_node_up() ->
fun () ->
mnesia:foldl(
fun (Q = #amqqueue{name = QName}, QNames0) ->
- {_MNode, MNodes} = queue_nodes(Q),
- case lists:member(node(), MNodes) of
+ {_MNode, SNodes} = suggested_queue_nodes(Q),
+ case lists:member(node(), SNodes) of
true -> [QName | QNames0];
false -> QNames0
end
@@ -107,9 +108,9 @@ on_node_up() ->
drop_mirror(VHostPath, QueueName, MirrorNode) ->
drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
-drop_mirror(Queue, MirrorNode) ->
+drop_mirror(QName, MirrorNode) ->
if_mirrored_queue(
- Queue,
+ QName,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] ->
@@ -128,9 +129,9 @@ drop_mirror(Queue, MirrorNode) ->
add_mirror(VHostPath, QueueName, MirrorNode) ->
add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
-add_mirror(Queue, MirrorNode) ->
+add_mirror(QName, MirrorNode) ->
if_mirrored_queue(
- Queue,
+ QName,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] -> case rabbit_mirror_queue_slave_sup:start_child(
@@ -173,21 +174,35 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
%%----------------------------------------------------------------------------
-queue_nodes(Q) ->
+%% TODO this should take account of current nodes so we don't throw
+%% away mirrors or change the master needlessly
+suggested_queue_nodes(Q) ->
case [rabbit_policy:get(P, Q) || P <- [<<"ha-mode">>, <<"ha-params">>]] of
[{ok, <<"all">>}, _] ->
- {node(), rabbit_mnesia:all_clustered_nodes()};
+ {node(), rabbit_mnesia:all_clustered_nodes() -- [node()]};
[{ok, <<"nodes">>}, {ok, Nodes}] ->
case [list_to_atom(binary_to_list(Node)) || Node <- Nodes] of
[Node] -> {Node, []};
- [First | Rest] -> {First, [First | Rest]}
+ [First | Rest] -> {First, Rest}
end;
[{ok, <<"at-least">>}, {ok, Count}] ->
- {node(), lists:sublist(rabbit_mnesia:all_clustered_nodes(), Count)};
+ {node(), lists:sublist(
+ rabbit_mnesia:all_clustered_nodes(), Count) -- [node()]};
_ ->
{node(), []}
end.
+actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) ->
+ MNode = case MPid of
+ undefined -> undefined;
+ _ -> node(MPid)
+ end,
+ SNodes = case SPids of
+ undefined -> undefined;
+ _ -> [node(Pid) || Pid <- SPids]
+ end,
+ {MNode, SNodes}.
+
is_mirrored(Q) ->
case rabbit_policy:get(<<"ha-mode">>, Q) of
{ok, <<"all">>} -> true;
@@ -196,10 +211,27 @@ is_mirrored(Q) ->
_ -> false
end.
-slave_pids(Q = #amqqueue{name = Name}) ->
+slave_pids(#amqqueue{name = Name}) ->
+ {ok, Q = #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
case is_mirrored(Q) of
false -> not_mirrored;
- true -> {ok, #amqqueue{slave_pids = SPids}} =
- rabbit_amqqueue:lookup(Name),
- SPids
+ true -> SPids
+ end.
+
+update_mirrors(OldQ = #amqqueue{name = QName, pid = QPid},
+ NewQ = #amqqueue{name = QName, pid = QPid}) ->
+ case {is_mirrored(OldQ), is_mirrored(NewQ)} of
+ {false, false} -> ok;
+ {true, false} -> rabbit_amqqueue:stop_mirroring(QPid);
+ {false, true} -> rabbit_amqqueue:start_mirroring(QPid);
+ {true, true} -> {OldMNode, OldSNodes} = actual_queue_nodes(OldQ),
+ {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
+ case OldMNode of
+ NewMNode -> ok;
+ _ -> io:format("TODO: master needs to change for ~p~n", [NewQ])
+ end,
+ Add = NewSNodes -- OldSNodes,
+ Remove = OldSNodes -- NewSNodes,
+ [ok = drop_mirror(QName, SNode) || SNode <- Remove],
+ [ok = add_mirror(QName, SNode) || SNode <- Add]
end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 03fafc3e5a..8f57e69540 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -275,9 +275,11 @@ terminate(_Reason, #state { backing_queue_state = undefined }) ->
%% We've received a delete_and_terminate from gm, thus nothing to
%% do here.
ok;
-terminate({shutdown, dropped} = R, #state { backing_queue = BQ,
+terminate({shutdown, dropped} = R, #state { gm = GM,
+ backing_queue = BQ,
backing_queue_state = BQS }) ->
%% See rabbit_mirror_queue_master:terminate/2
+ ok = gm:leave(GM), %% TODO presumably we need this?
BQ:delete_and_terminate(R, BQS);
terminate(Reason, #state { q = Q,
gm = GM,