diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2016-09-16 16:28:35 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2016-09-16 16:28:35 +0100 |
| commit | 4005a5a12cffea117e4bc59a64ce4ec06b93ef7e (patch) | |
| tree | 2e5c6ec19b367417330289323d88cc7d91a39f69 /src | |
| parent | 44d831eefb04f994fb247f562a8ce7250fe82e46 (diff) | |
| download | rabbitmq-server-git-4005a5a12cffea117e4bc59a64ce4ec06b93ef7e.tar.gz | |
Stop queue gracefully on coordinator/GM initialisation shutdown
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 66 |
2 files changed, 59 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 29f8e53f08..3111afca2e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1013,7 +1013,17 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> end. handle_call({init, Recover}, From, State) -> - init_it(Recover, From, State); + try + init_it(Recover, From, State) + catch + {coordinator_not_started, Reason} -> + %% The GM can shutdown before the coordinator has started up + %% (lost membership or missing group), thus the start_link of + %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process + %% is trapping exits. The master captures this return value and + %% throws the current exception. + {stop, Reason, State} + end; handle_call(info, _From, State) -> reply(infos(info_keys(), State), State); @@ -1159,7 +1169,17 @@ handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State). handle_cast(init, State) -> - init_it({no_barrier, non_clean_shutdown}, none, State); + try + init_it({no_barrier, non_clean_shutdown}, none, State) + catch + {coordinator_not_started, Reason} -> + %% The GM can shutdown before the coordinator has started up + %% (lost membership or missing group), thus the start_link of + %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process + %% is trapping exits. The master captures this return value and + %% throws the current exception. + {stop, Reason, State} + end; handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index d78f6180e7..d82cdf336a 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -101,35 +101,43 @@ init(Q, Recover, AsyncCallback) -> State. 
init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> - {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( - Q, undefined, sender_death_fun(), depth_fun()), - GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - Self = self(), - ok = rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q1 = #amqqueue{gm_pids = GMPids}] - = mnesia:read({rabbit_queue, QName}), - ok = rabbit_amqqueue:store_queue( - Q1#amqqueue{gm_pids = [{GM, Self} | GMPids], - state = live}) - end), - {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), - %% We need synchronous add here (i.e. do not return until the - %% slave is running) so that when queue declaration is finished - %% all slaves are up; we don't want to end up with unsynced slaves - %% just by declaring a new queue. But add can't be synchronous all - %% the time as it can be called by slaves and that's - %% deadlock-prone. - rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync), - #state { name = QName, - gm = GM, - coordinator = CPid, - backing_queue = BQ, - backing_queue_state = BQS, - seen_status = dict:new(), - confirmed = [], - known_senders = sets:new(), - wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) }. + case rabbit_mirror_queue_coordinator:start_link( + Q, undefined, sender_death_fun(), depth_fun()) of + {ok, CPid} -> + GM = rabbit_mirror_queue_coordinator:get_gm(CPid), + Self = self(), + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue{gm_pids = GMPids}] + = mnesia:read({rabbit_queue, QName}), + ok = rabbit_amqqueue:store_queue( + Q1#amqqueue{gm_pids = [{GM, Self} | GMPids], + state = live}) + end), + {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + %% We need synchronous add here (i.e. do not return until the + %% slave is running) so that when queue declaration is finished + %% all slaves are up; we don't want to end up with unsynced slaves + %% just by declaring a new queue. 
But add can't be synchronous all + %% the time as it can be called by slaves and that's + %% deadlock-prone. + rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync), + #state { name = QName, + gm = GM, + coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS, + seen_status = dict:new(), + confirmed = [], + known_senders = sets:new(), + wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) }; + {error, Reason} -> + %% The GM can shutdown before the coordinator has started up + %% (lost membership or missing group), thus the start_link of + %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process + % is trapping exits + throw({coordinator_not_started, Reason}) + end. stop_mirroring(State = #state { coordinator = CPid, backing_queue = BQ, |
