summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-06-03 14:55:00 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-06-03 14:55:00 +0100
commit9e72ef1471822ec8cb6c7053bccb4b0cd9993a42 (patch)
tree196685a7779491c2c5d16de79122d5a8ba78efd4 /src
parent5ea455fedfea9027f4bc0e2097e3bd3fc65e2883 (diff)
downloadrabbitmq-server-git-9e72ef1471822ec8cb6c7053bccb4b0cd9993a42.tar.gz
Inform fed queues of policy change and termination, and abstract startup a bit more.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl1
-rw-r--r--src/rabbit_amqqueue_process.erl8
2 files changed, 4 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f6935f2584..4d79e0969a 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -291,6 +291,7 @@ store_queue(Q = #amqqueue{durable = false}) ->
policy_changed(Q1, Q2) ->
rabbit_mirror_queue_misc:update_mirrors(Q1, Q2),
+ rabbit_federation_queue:policy_changed(Q1, Q2),
%% Make sure we emit a stats event even if nothing
%% mirroring-related has changed - the policy may have changed anyway.
wake_up(Q1).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4c3751ba1c..5278c2901a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -190,10 +190,7 @@ declare(Recover, From, State = #q{q = Q,
recovery_barrier(Recover),
State1 = process_args(State#q{backing_queue = BQ,
backing_queue_state = BQS}),
- case Q#amqqueue.name#resource.name of
- <<"test">> -> rabbit_federation_queue:start_link(Q);
- _ -> ok
- end,
+ rabbit_federation_queue:maybe_start(Q),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -260,7 +257,7 @@ init_dlx_routing_key(RoutingKey, State) ->
init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}.
-terminate_shutdown(Fun, State) ->
+terminate_shutdown(Fun, State = #q{q = Q}) ->
State1 = #q{backing_queue_state = BQS} =
lists:foldl(fun (F, S) -> F(S) end, State,
[fun stop_sync_timer/1,
@@ -271,6 +268,7 @@ terminate_shutdown(Fun, State) ->
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
+ rabbit_federation_queue:terminate(Q),
[emit_consumer_deleted(Ch, CTag, QName)
|| {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}