diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-11 16:21:05 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-11 16:21:05 +0000 |
| commit | 1721a5ebc1160fae927d72be21278eba2d3c28db (patch) | |
| tree | a7c7f98d3394f16a22068259b631bc70767f0940 | |
| parent | 6785ac3db7bcb5e926310a54e3273be1530a756d (diff) | |
| download | rabbitmq-server-git-1721a5ebc1160fae927d72be21278eba2d3c28db.tar.gz | |
Make new mirrors magically occur when set to [] and a new node in the cluster appears. Fix various other bits and pieces
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave_sup.erl | 6 |
3 files changed, 42 insertions, 18 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index c5a2e88aa8..25a1e4b86b 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -53,7 +53,7 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -init(#amqqueue { arguments = Args } = Q, Recover) -> +init(#amqqueue { arguments = Args, name = QName } = Q, Recover) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), {_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>), @@ -62,7 +62,7 @@ init(#amqqueue { arguments = Args } = Q, Recover) -> _ -> [list_to_atom(binary_to_list(Node)) || {longstr, Node} <- Nodes] end, - [rabbit_mirror_queue_misc:add_slave(Q, Node) || Node <- Nodes1], + [rabbit_mirror_queue_misc:add_slave(QName, Node) || Node <- Nodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover), #state { gm = GM, @@ -120,11 +120,11 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, %% Must use confirmed_broadcast here in order to guarantee that %% all slaves are forced to interpret this publish_delivered at %% the same point, especially if we die and a slave is promoted. - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), ok = gm:confirmed_broadcast( GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}), - BQS1 = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), - State #state { backing_queue_state = BQS1 }. + {AckTag, BQS1} = + BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), + {AckTag, State #state { backing_queue_state = BQS1 }}. dropwhile(Fun, State = #state { gm = GM, backing_queue = BQ, @@ -247,20 +247,22 @@ invoke(Mod, Fun, State = #state { backing_queue = BQ, case dict:find(MsgId, SSN) of error -> {[MsgId | MsgIdsN], SSN}; - {ok, published} -> - %% It was published when we were a slave, - %% and we were promoted before we saw the - %% publish from the channel. We still - %% haven't seen the channel publish, and - %% consequently we need to filter out the - %% confirm here. We will issue the confirm - %% when we see the publish from the - %% channel. - {MsgIdsN, dict:store(MsgId, confirmed, SSN)} + {ok, published} -> + %% It was published when we were a slave, + %% and we were promoted before we saw the + %% publish from the channel. We still + %% haven't seen the channel publish, and + %% consequently we need to filter out the + %% confirm here. We will issue the confirm + %% when we see the publish from the channel. + {MsgIdsN, dict:store(MsgId, confirmed, SSN)}; + {ok, confirmed} -> + %% Well, confirms are racy by definition. + {[MsgId | MsgIdsN], SSN} end - end, {[], SS}, MsgIds), + end, {[], SS}, MsgIds), {MsgIds1, State #state { backing_queue_state = BQS1, - seen_status = SS1 }}. + seen_status = SS1 }}. validate_message(Message = #basic_message { id = MsgId }, State = #state { seen_status = SS, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 23d7c39835..51c2a28a4b 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -16,7 +16,7 @@ -module(rabbit_mirror_queue_misc). --export([remove_from_queue/2, add_slave/2, add_slave/3]). +-export([remove_from_queue/2, add_slave/2, add_slave/3, on_node_up/0]). -include("rabbit.hrl"). @@ -74,3 +74,19 @@ add_slave(Queue, MirrorNode) -> end end end). + +on_node_up() -> + Qs = + rabbit_misc:execute_mnesia_transaction( + fun () -> + mnesia:foldl( + fun (#amqqueue{ arguments = Args, name = QName }, QsN) -> + case rabbit_misc:table_lookup( + Args, <<"x-mirror">>) of + {_Type, []} -> [QName | QsN]; + _ -> QsN + end + end, [], rabbit_queue) + end), + [add_slave(Q, node()) || Q <- Qs], + ok. diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl index 80c0520c08..2fb3be5196 100644 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -22,6 +22,12 @@ {requires, queue_sup_queue_recovery}, {enables, routing_ready}]}). +-rabbit_boot_step({mirrored_queues, + [{description, "adding mirrors to queues"}, + {mfa, {rabbit_mirror_queue_misc, on_node_up, []}}, + {requires, mirror_queue_slave_sup}, + {enables, routing_ready}]}). + -behaviour(supervisor2). -export([start/0, start_link/0, start_child/2]). |
