summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-11 16:21:05 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-11 16:21:05 +0000
commit1721a5ebc1160fae927d72be21278eba2d3c28db (patch)
treea7c7f98d3394f16a22068259b631bc70767f0940
parent6785ac3db7bcb5e926310a54e3273be1530a756d (diff)
downloadrabbitmq-server-git-1721a5ebc1160fae927d72be21278eba2d3c28db.tar.gz
Make new mirrors magically occur when set to [] and a new node in the cluster appears. Fix various other bits and pieces
-rw-r--r--src/rabbit_mirror_queue_master.erl36
-rw-r--r--src/rabbit_mirror_queue_misc.erl18
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl6
3 files changed, 42 insertions, 18 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index c5a2e88aa8..25a1e4b86b 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -53,7 +53,7 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-init(#amqqueue { arguments = Args } = Q, Recover) ->
+init(#amqqueue { arguments = Args, name = QName } = Q, Recover) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
{_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>),
@@ -62,7 +62,7 @@ init(#amqqueue { arguments = Args } = Q, Recover) ->
_ -> [list_to_atom(binary_to_list(Node)) ||
{longstr, Node} <- Nodes]
end,
- [rabbit_mirror_queue_misc:add_slave(Q, Node) || Node <- Nodes1],
+ [rabbit_mirror_queue_misc:add_slave(QName, Node) || Node <- Nodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover),
#state { gm = GM,
@@ -120,11 +120,11 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
%% Must use confirmed_broadcast here in order to guarantee that
%% all slaves are forced to interpret this publish_delivered at
%% the same point, especially if we die and a slave is promoted.
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
ok = gm:confirmed_broadcast(
GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}),
- BQS1 = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
- State #state { backing_queue_state = BQS1 }.
+ {AckTag, BQS1} =
+ BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
+ {AckTag, State #state { backing_queue_state = BQS1 }}.
dropwhile(Fun, State = #state { gm = GM,
backing_queue = BQ,
@@ -247,20 +247,22 @@ invoke(Mod, Fun, State = #state { backing_queue = BQ,
case dict:find(MsgId, SSN) of
error ->
{[MsgId | MsgIdsN], SSN};
- {ok, published} ->
- %% It was published when we were a slave,
- %% and we were promoted before we saw the
- %% publish from the channel. We still
- %% haven't seen the channel publish, and
- %% consequently we need to filter out the
- %% confirm here. We will issue the confirm
- %% when we see the publish from the
- %% channel.
- {MsgIdsN, dict:store(MsgId, confirmed, SSN)}
+ {ok, published} ->
+ %% It was published when we were a slave,
+ %% and we were promoted before we saw the
+ %% publish from the channel. We still
+ %% haven't seen the channel publish, and
+ %% consequently we need to filter out the
+ %% confirm here. We will issue the confirm
+ %% when we see the publish from the channel.
+ {MsgIdsN, dict:store(MsgId, confirmed, SSN)};
+ {ok, confirmed} ->
+ %% Well, confirms are racy by definition.
+ {[MsgId | MsgIdsN], SSN}
end
- end, {[], SS}, MsgIds),
+ end, {[], SS}, MsgIds),
{MsgIds1, State #state { backing_queue_state = BQS1,
- seen_status = SS1 }}.
+ seen_status = SS1 }}.
validate_message(Message = #basic_message { id = MsgId },
State = #state { seen_status = SS,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 23d7c39835..51c2a28a4b 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -16,7 +16,7 @@
-module(rabbit_mirror_queue_misc).
--export([remove_from_queue/2, add_slave/2, add_slave/3]).
+-export([remove_from_queue/2, add_slave/2, add_slave/3, on_node_up/0]).
-include("rabbit.hrl").
@@ -74,3 +74,19 @@ add_slave(Queue, MirrorNode) ->
end
end
end).
+
+on_node_up() ->
+ Qs =
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ mnesia:foldl(
+ fun (#amqqueue{ arguments = Args, name = QName }, QsN) ->
+ case rabbit_misc:table_lookup(
+ Args, <<"x-mirror">>) of
+ {_Type, []} -> [QName | QsN];
+ _ -> QsN
+ end
+ end, [], rabbit_queue)
+ end),
+ [add_slave(Q, node()) || Q <- Qs],
+ ok.
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index 80c0520c08..2fb3be5196 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -22,6 +22,12 @@
{requires, queue_sup_queue_recovery},
{enables, routing_ready}]}).
+-rabbit_boot_step({mirrored_queues,
+ [{description, "adding mirrors to queues"},
+ {mfa, {rabbit_mirror_queue_misc, on_node_up, []}},
+ {requires, mirror_queue_slave_sup},
+ {enables, routing_ready}]}).
+
-behaviour(supervisor2).
-export([start/0, start_link/0, start_child/2]).