diff options
| -rw-r--r-- | src/rabbit_control.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 81 |
5 files changed, 77 insertions, 65 deletions
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index e2c050f5d8..604b1bfac0 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -232,6 +232,12 @@ action(list_queues, Node, Args, Opts, Inform) -> [VHostArg, ArgAtoms]), ArgAtoms); +action(add_queue_mirror, Node, [Queue, MirrorNode], Opts, Inform) -> + Inform("Adding mirror of queue ~p on node ~p~n", [Queue, MirrorNode]), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + rpc_call(Node, rabbit_mirror_queue_misc, add_slave, + [VHostArg, list_to_binary(Queue), list_to_atom(MirrorNode)]); + action(list_exchanges, Node, Args, Opts, Inform) -> Inform("Listing exchanges", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index bd77c976e7..5fd07e6015 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -16,7 +16,7 @@ -module(rabbit_mirror_queue_coordinator). --export([start_link/2, add_slave/2, get_gm/1]). +-export([start_link/2, get_gm/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -38,9 +38,6 @@ start_link(Queue, GM) -> gen_server2:start_link(?MODULE, [Queue, GM], []). -add_slave(CPid, SlaveNode) -> - gen_server2:call(CPid, {add_slave, SlaveNode}, infinity). - get_gm(CPid) -> gen_server2:call(CPid, get_gm, infinity). @@ -67,21 +64,7 @@ init([#amqqueue { name = QueueName } = Q, GM]) -> {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call(get_gm, _From, State = #state { gm = GM }) -> - reply(GM, State); - -handle_call({add_slave, Node}, _From, State = #state { q = Q }) -> - Nodes = nodes(), - case lists:member(Node, Nodes) of - true -> - Result = rabbit_mirror_queue_slave_sup:start_child(Node, [Q]), - rabbit_log:info("Adding slave node for ~s: ~p~n", - [rabbit_misc:rs(Q #amqqueue.name), Result]); - false -> - rabbit_log:info( - "Ignoring request to add slave on node ~p for ~s~n", - [Node, rabbit_misc:rs(Q #amqqueue.name)]) - end, - reply(ok, State). + reply(GM, State). handle_cast({gm_deaths, Deaths}, State = #state { q = #amqqueue { name = QueueName } }) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 54c718b1c5..c5a2e88aa8 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -62,7 +62,7 @@ init(#amqqueue { arguments = Args } = Q, Recover) -> _ -> [list_to_atom(binary_to_list(Node)) || {longstr, Node} <- Nodes] end, - [rabbit_mirror_queue_coordinator:add_slave(CPid, Node) || Node <- Nodes1], + [rabbit_mirror_queue_misc:add_slave(Q, Node) || Node <- Nodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover), #state { gm = GM, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 090cb81203..23d7c39835 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -16,7 +16,7 @@ -module(rabbit_mirror_queue_misc). --export([remove_from_queue/2]). +-export([remove_from_queue/2, add_slave/2, add_slave/3]). -include("rabbit.hrl"). @@ -44,3 +44,33 @@ remove_from_queue(QueueName, DeadPids) -> end end end). + +add_slave(VHostPath, QueueName, MirrorNode) -> + add_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). + +add_slave(Queue, MirrorNode) -> + rabbit_amqqueue:with( + Queue, + fun (#amqqueue { arguments = Args, name = Name, + pid = QPid, mirror_pids = MPids } = Q) -> + case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of + undefined -> + ok; + _ -> + case [MirrorNode || Pid <- [QPid | MPids], + node(Pid) =:= MirrorNode] of + [] -> + Result = + rabbit_mirror_queue_slave_sup:start_child( + MirrorNode, [Q]), + rabbit_log:info("Adding slave node for ~s: ~p~n", + [rabbit_misc:rs(Name), Result]), + case Result of + {ok, _Pid} -> ok; + _ -> Result + end; + [_] -> + {error, queue_already_mirrored_on_node} + end + end + end). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index d7f864564e..064dc3293b 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -76,46 +76,38 @@ init([#amqqueue { name = QueueName } = Q]) -> end, Self = self(), Node = node(), - case rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q1 = #amqqueue { pid = QPid, mirror_pids = MPids }] = - mnesia:read({rabbit_queue, QueueName}), - case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> - MPids1 = MPids ++ [Self], - mnesia:write(rabbit_queue, - Q1 #amqqueue { mirror_pids = MPids1 }, - write), - {ok, QPid}; - _ -> - {error, node_already_present} - end - end) of - {ok, MPid} -> - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), - ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, set_ram_duration_target, - [self()]}), - {ok, BQ} = application:get_env(backing_queue_module), - BQS = BQ:init(Q, false), - {ok, #state { q = Q, - gm = GM, - master_node = node(MPid), - backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = undefined, - sync_timer_ref = undefined, - - sender_queues = dict:new(), - msg_id_ack = dict:new(), - msg_id_status = dict:new() - }, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, - ?DESIRED_HIBERNATE}}; - {error, Error} -> - {stop, Error} - end. + {ok, MPid} = + rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue { pid = QPid, mirror_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + %% ASSERTION + [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node], + MPids1 = MPids ++ [Self], + mnesia:write(rabbit_queue, + Q1 #amqqueue { mirror_pids = MPids1 }, + write), + {ok, QPid} + end), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + {ok, BQ} = application:get_env(backing_queue_module), + BQS = BQ:init(Q, false), + {ok, #state { q = Q, + gm = GM, + master_node = node(MPid), + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + msg_id_status = dict:new() + }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> %% Synchronous, "immediate" delivery mode @@ -578,7 +570,7 @@ process_instruction( State1 = State #state { sender_queues = SQ1, msg_id_status = MS2 }, - %% we probably want to work in BQ:validate_message here + {ok, case Deliver of false -> @@ -649,10 +641,11 @@ process_instruction({requeue, MsgPropsFun, MsgIds}, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }; false -> - %% the only thing we can safely do is nuke out our BQ - %% and MA + %% The only thing we can safely do is nuke out our BQ + %% and MA. The interaction between this and confirms + %% doesn't really bear thinking about... {_Count, BQS1} = BQ:purge(BQS), - {MsgIds, BQS2} = ack_all(BQ, MA, BQS1), + {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1), State #state { msg_id_ack = dict:new(), backing_queue_state = BQS2 } end}; |
