diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 |
2 files changed, 4 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f6935f2584..4d79e0969a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -291,6 +291,7 @@ store_queue(Q = #amqqueue{durable = false}) -> policy_changed(Q1, Q2) -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2), + rabbit_federation_queue:policy_changed(Q1, Q2), %% Make sure we emit a stats event even if nothing %% mirroring-related has changed - the policy may have changed anyway. wake_up(Q1). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4c3751ba1c..5278c2901a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -190,10 +190,7 @@ declare(Recover, From, State = #q{q = Q, recovery_barrier(Recover), State1 = process_args(State#q{backing_queue = BQ, backing_queue_state = BQS}), - case Q#amqqueue.name#resource.name of - <<"test">> -> rabbit_federation_queue:start_link(Q); - _ -> ok - end, + rabbit_federation_queue:maybe_start(Q), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -260,7 +257,7 @@ init_dlx_routing_key(RoutingKey, State) -> init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}. -terminate_shutdown(Fun, State) -> +terminate_shutdown(Fun, State = #q{q = Q}) -> State1 = #q{backing_queue_state = BQS} = lists:foldl(fun (F, S) -> F(S) end, State, [fun stop_sync_timer/1, @@ -271,6 +268,7 @@ terminate_shutdown(Fun, State) -> undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), QName = qname(State), + rabbit_federation_queue:terminate(Q), [emit_consumer_deleted(Ch, CTag, QName) || {Ch, CTag, _} <- consumers(State1)], State1#q{backing_queue_state = Fun(BQS)} |
