summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2016-09-16 16:28:35 +0100
committerDiana Corbacho <diana@rabbitmq.com>2016-09-16 16:28:35 +0100
commit4005a5a12cffea117e4bc59a64ce4ec06b93ef7e (patch)
tree2e5c6ec19b367417330289323d88cc7d91a39f69 /src
parent44d831eefb04f994fb247f562a8ce7250fe82e46 (diff)
downloadrabbitmq-server-git-4005a5a12cffea117e4bc59a64ce4ec06b93ef7e.tar.gz
Stop queue gracefully on coordinator/GM initialisation shutdown
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl24
-rw-r--r--src/rabbit_mirror_queue_master.erl66
2 files changed, 59 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 29f8e53f08..3111afca2e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1013,7 +1013,17 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
end.
handle_call({init, Recover}, From, State) ->
- init_it(Recover, From, State);
+ try
+ init_it(Recover, From, State)
+ catch
+ {coordinator_not_started, Reason} ->
+ %% The GM can shutdown before the coordinator has started up
+ %% (lost membership or missing group), thus the start_link of
+ %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process
+ %% is trapping exists. The master captures this return value and
+ %% throws the current exception.
+ {stop, Reason, State}
+ end;
handle_call(info, _From, State) ->
reply(infos(info_keys(), State), State);
@@ -1159,7 +1169,17 @@ handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State).
handle_cast(init, State) ->
- init_it({no_barrier, non_clean_shutdown}, none, State);
+ try
+ init_it({no_barrier, non_clean_shutdown}, none, State)
+ catch
+ {coordinator_not_started, Reason} ->
+ %% The GM can shutdown before the coordinator has started up
+ %% (lost membership or missing group), thus the start_link of
+ %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process
+ %% is trapping exists. The master captures this return value and
+ %% throws the current exception.
+ {stop, Reason, State}
+ end;
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index d78f6180e7..d82cdf336a 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -101,35 +101,43 @@ init(Q, Recover, AsyncCallback) ->
State.
init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
- {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q, undefined, sender_death_fun(), depth_fun()),
- GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
- Self = self(),
- ok = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue{gm_pids = GMPids}]
- = mnesia:read({rabbit_queue, QName}),
- ok = rabbit_amqqueue:store_queue(
- Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
- state = live})
- end),
- {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
- %% We need synchronous add here (i.e. do not return until the
- %% slave is running) so that when queue declaration is finished
- %% all slaves are up; we don't want to end up with unsynced slaves
- %% just by declaring a new queue. But add can't be synchronous all
- %% the time as it can be called by slaves and that's
- %% deadlock-prone.
- rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
- #state { name = QName,
- gm = GM,
- coordinator = CPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- seen_status = dict:new(),
- confirmed = [],
- known_senders = sets:new(),
- wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) }.
+ case rabbit_mirror_queue_coordinator:start_link(
+ Q, undefined, sender_death_fun(), depth_fun()) of
+ {ok, CPid} ->
+ GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ Self = self(),
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue{gm_pids = GMPids}]
+ = mnesia:read({rabbit_queue, QName}),
+ ok = rabbit_amqqueue:store_queue(
+ Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
+ state = live})
+ end),
+ {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ %% We need synchronous add here (i.e. do not return until the
+ %% slave is running) so that when queue declaration is finished
+ %% all slaves are up; we don't want to end up with unsynced slaves
+ %% just by declaring a new queue. But add can't be synchronous all
+ %% the time as it can be called by slaves and that's
+ %% deadlock-prone.
+ rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
+ #state { name = QName,
+ gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = dict:new(),
+ confirmed = [],
+ known_senders = sets:new(),
+ wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) };
+ {error, Reason} ->
+ %% The GM can shutdown before the coordinator has started up
+ %% (lost membership or missing group), thus the start_link of
+ %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process
+ % is trapping exists
+ throw({coordinator_not_started, Reason})
+ end.
stop_mirroring(State = #state { coordinator = CPid,
backing_queue = BQ,