summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_control.erl6
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl21
-rw-r--r--src/rabbit_mirror_queue_master.erl2
-rw-r--r--src/rabbit_mirror_queue_misc.erl32
-rw-r--r--src/rabbit_mirror_queue_slave.erl81
5 files changed, 77 insertions, 65 deletions
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index e2c050f5d8..604b1bfac0 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -232,6 +232,12 @@ action(list_queues, Node, Args, Opts, Inform) ->
[VHostArg, ArgAtoms]),
ArgAtoms);
+action(add_queue_mirror, Node, [Queue, MirrorNode], Opts, Inform) ->
+ Inform("Adding mirror of queue ~p on node ~p~n", [Queue, MirrorNode]),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ rpc_call(Node, rabbit_mirror_queue_misc, add_slave,
+ [VHostArg, list_to_binary(Queue), list_to_atom(MirrorNode)]);
+
action(list_exchanges, Node, Args, Opts, Inform) ->
Inform("Listing exchanges", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index bd77c976e7..5fd07e6015 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -16,7 +16,7 @@
-module(rabbit_mirror_queue_coordinator).
--export([start_link/2, add_slave/2, get_gm/1]).
+-export([start_link/2, get_gm/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@@ -38,9 +38,6 @@
start_link(Queue, GM) ->
gen_server2:start_link(?MODULE, [Queue, GM], []).
-add_slave(CPid, SlaveNode) ->
- gen_server2:call(CPid, {add_slave, SlaveNode}, infinity).
-
get_gm(CPid) ->
gen_server2:call(CPid, get_gm, infinity).
@@ -67,21 +64,7 @@ init([#amqqueue { name = QueueName } = Q, GM]) ->
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call(get_gm, _From, State = #state { gm = GM }) ->
- reply(GM, State);
-
-handle_call({add_slave, Node}, _From, State = #state { q = Q }) ->
- Nodes = nodes(),
- case lists:member(Node, Nodes) of
- true ->
- Result = rabbit_mirror_queue_slave_sup:start_child(Node, [Q]),
- rabbit_log:info("Adding slave node for ~s: ~p~n",
- [rabbit_misc:rs(Q #amqqueue.name), Result]);
- false ->
- rabbit_log:info(
- "Ignoring request to add slave on node ~p for ~s~n",
- [Node, rabbit_misc:rs(Q #amqqueue.name)])
- end,
- reply(ok, State).
+ reply(GM, State).
handle_cast({gm_deaths, Deaths},
State = #state { q = #amqqueue { name = QueueName } }) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 54c718b1c5..c5a2e88aa8 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -62,7 +62,7 @@ init(#amqqueue { arguments = Args } = Q, Recover) ->
_ -> [list_to_atom(binary_to_list(Node)) ||
{longstr, Node} <- Nodes]
end,
- [rabbit_mirror_queue_coordinator:add_slave(CPid, Node) || Node <- Nodes1],
+ [rabbit_mirror_queue_misc:add_slave(Q, Node) || Node <- Nodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover),
#state { gm = GM,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 090cb81203..23d7c39835 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -16,7 +16,7 @@
-module(rabbit_mirror_queue_misc).
--export([remove_from_queue/2]).
+-export([remove_from_queue/2, add_slave/2, add_slave/3]).
-include("rabbit.hrl").
@@ -44,3 +44,33 @@ remove_from_queue(QueueName, DeadPids) ->
end
end
end).
+
+add_slave(VHostPath, QueueName, MirrorNode) ->
+ add_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+
+add_slave(Queue, MirrorNode) ->
+ rabbit_amqqueue:with(
+ Queue,
+ fun (#amqqueue { arguments = Args, name = Name,
+ pid = QPid, mirror_pids = MPids } = Q) ->
+ case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of
+ undefined ->
+ ok;
+ _ ->
+ case [MirrorNode || Pid <- [QPid | MPids],
+ node(Pid) =:= MirrorNode] of
+ [] ->
+ Result =
+ rabbit_mirror_queue_slave_sup:start_child(
+ MirrorNode, [Q]),
+ rabbit_log:info("Adding slave node for ~s: ~p~n",
+ [rabbit_misc:rs(Name), Result]),
+ case Result of
+ {ok, _Pid} -> ok;
+ _ -> Result
+ end;
+ [_] ->
+ {error, queue_already_mirrored_on_node}
+ end
+ end
+ end).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index d7f864564e..064dc3293b 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -76,46 +76,38 @@ init([#amqqueue { name = QueueName } = Q]) ->
end,
Self = self(),
Node = node(),
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue { pid = QPid, mirror_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] ->
- MPids1 = MPids ++ [Self],
- mnesia:write(rabbit_queue,
- Q1 #amqqueue { mirror_pids = MPids1 },
- write),
- {ok, QPid};
- _ ->
- {error, node_already_present}
- end
- end) of
- {ok, MPid} ->
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [self()]),
- ok = rabbit_memory_monitor:register(
- self(), {rabbit_amqqueue, set_ram_duration_target,
- [self()]}),
- {ok, BQ} = application:get_env(backing_queue_module),
- BQS = BQ:init(Q, false),
- {ok, #state { q = Q,
- gm = GM,
- master_node = node(MPid),
- backing_queue = BQ,
- backing_queue_state = BQS,
- rate_timer_ref = undefined,
- sync_timer_ref = undefined,
-
- sender_queues = dict:new(),
- msg_id_ack = dict:new(),
- msg_id_status = dict:new()
- }, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
- ?DESIRED_HIBERNATE}};
- {error, Error} ->
- {stop, Error}
- end.
+ {ok, MPid} =
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue { pid = QPid, mirror_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ %% ASSERTION
+ [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node],
+ MPids1 = MPids ++ [Self],
+ mnesia:write(rabbit_queue,
+ Q1 #amqqueue { mirror_pids = MPids1 },
+ write),
+ {ok, QPid}
+ end),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = BQ:init(Q, false),
+ {ok, #state { q = Q,
+ gm = GM,
+ master_node = node(MPid),
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = undefined,
+ sync_timer_ref = undefined,
+
+ sender_queues = dict:new(),
+ msg_id_ack = dict:new(),
+ msg_id_status = dict:new()
+ }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
%% Synchronous, "immediate" delivery mode
@@ -578,7 +570,7 @@ process_instruction(
State1 = State #state { sender_queues = SQ1,
msg_id_status = MS2 },
- %% we probably want to work in BQ:validate_message here
+
{ok,
case Deliver of
false ->
@@ -649,10 +641,11 @@ process_instruction({requeue, MsgPropsFun, MsgIds},
State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 };
false ->
- %% the only thing we can safely do is nuke out our BQ
- %% and MA
+ %% The only thing we can safely do is nuke out our BQ
+ %% and MA. The interaction between this and confirms
+ %% doesn't really bear thinking about...
{_Count, BQS1} = BQ:purge(BQS),
- {MsgIds, BQS2} = ack_all(BQ, MA, BQS1),
+ {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1),
State #state { msg_id_ack = dict:new(),
backing_queue_state = BQS2 }
end};