summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2016-09-20 16:51:02 +0300
committerMichael Klishin <mklishin@pivotal.io>2016-09-20 16:51:02 +0300
commitbe9a3f7f84d3f6c680d3019360c5458f946aa462 (patch)
tree06b9eb2ceb1d2dae73de40fa4f0f7ce5dea83d68
parent1f8fba1df31b2b01f7c6404fc251b22727f3d125 (diff)
parentb0cb2c4a23c81e0e405723e6988bf0d1235eca80 (diff)
downloadrabbitmq-server-git-be9a3f7f84d3f6c680d3019360c5458f946aa462.tar.gz
Merge branch 'rabbitmq-server-953' into stable
-rw-r--r--src/gm.erl34
-rw-r--r--src/rabbit_amqqueue_process.erl24
-rw-r--r--src/rabbit_mirror_queue_master.erl66
-rw-r--r--test/gm_SUITE.erl2
4 files changed, 79 insertions, 47 deletions
diff --git a/src/gm.erl b/src/gm.erl
index c6d73b6cdd..74e19ee6fd 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -611,7 +611,7 @@ handle_call({add_on_right, NewMember}, _From,
handle_callback_result({Result, {ok, Group}, State1})
catch
lost_membership ->
- {stop, normal, State}
+ {stop, shutdown, State}
end.
%% add_on_right causes a catchup to be sent immediately from the left,
@@ -646,7 +646,7 @@ handle_cast({?TAG, ReqVer, Msg},
Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1))
catch
lost_membership ->
- {stop, normal, State}
+ {stop, shutdown, State}
end;
handle_cast({broadcast, _Msg, _SizeHint},
@@ -675,16 +675,21 @@ handle_cast(join, State = #state { self = Self,
module = Module,
callback_args = Args,
txn_executor = TxnFun }) ->
- View = join_group(Self, GroupName, TxnFun),
- MembersState =
- case alive_view_members(View) of
- [Self] -> blank_member_state();
- _ -> undefined
- end,
- State1 = check_neighbours(State #state { view = View,
- members_state = MembersState }),
- handle_callback_result(
- {Module:joined(Args, get_pids(all_known_members(View))), State1});
+ try
+ View = join_group(Self, GroupName, TxnFun),
+ MembersState =
+ case alive_view_members(View) of
+ [Self] -> blank_member_state();
+ _ -> undefined
+ end,
+ State1 = check_neighbours(State #state { view = View,
+ members_state = MembersState }),
+ handle_callback_result(
+ {Module:joined(Args, get_pids(all_known_members(View))), State1})
+ catch
+ lost_membership ->
+ {stop, shutdown, State}
+ end;
handle_cast({validate_members, OldMembers},
State = #state { view = View,
@@ -756,7 +761,7 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
end
catch
lost_membership ->
- {stop, normal, State}
+ {stop, shutdown, State}
end;
handle_info(_, State) ->
%% Discard any unexpected messages, such as late replies from neighbour_call/2
@@ -768,7 +773,6 @@ handle_info(_, State) ->
terminate(Reason, #state { module = Module, callback_args = Args }) ->
Module:handle_terminate(Args, Reason).
-
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -871,7 +875,7 @@ handle_msg({activity, Left, Activity},
Result, fun activity_true/3, fun activity_false/3, Activity3, State2)
catch
lost_membership ->
- {{stop, normal}, State}
+ {{stop, shutdown}, State}
end;
handle_msg({activity, _NotLeft, _Activity}, State) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 29f8e53f08..3111afca2e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1013,7 +1013,17 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
end.
handle_call({init, Recover}, From, State) ->
- init_it(Recover, From, State);
+ try
+ init_it(Recover, From, State)
+ catch
+ {coordinator_not_started, Reason} ->
+ %% The GM can shutdown before the coordinator has started up
+ %% (lost membership or missing group), thus the start_link of
+ %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process
+ %% is trapping exists. The master captures this return value and
+ %% throws the current exception.
+ {stop, Reason, State}
+ end;
handle_call(info, _From, State) ->
reply(infos(info_keys(), State), State);
@@ -1159,7 +1169,17 @@ handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State).
handle_cast(init, State) ->
- init_it({no_barrier, non_clean_shutdown}, none, State);
+ try
+ init_it({no_barrier, non_clean_shutdown}, none, State)
+ catch
+ {coordinator_not_started, Reason} ->
+ %% The GM can shutdown before the coordinator has started up
+ %% (lost membership or missing group), thus the start_link of
+ %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process
+ %% is trapping exists. The master captures this return value and
+ %% throws the current exception.
+ {stop, Reason, State}
+ end;
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index d78f6180e7..d82cdf336a 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -101,35 +101,43 @@ init(Q, Recover, AsyncCallback) ->
State.
init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
- {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q, undefined, sender_death_fun(), depth_fun()),
- GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
- Self = self(),
- ok = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue{gm_pids = GMPids}]
- = mnesia:read({rabbit_queue, QName}),
- ok = rabbit_amqqueue:store_queue(
- Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
- state = live})
- end),
- {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
- %% We need synchronous add here (i.e. do not return until the
- %% slave is running) so that when queue declaration is finished
- %% all slaves are up; we don't want to end up with unsynced slaves
- %% just by declaring a new queue. But add can't be synchronous all
- %% the time as it can be called by slaves and that's
- %% deadlock-prone.
- rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
- #state { name = QName,
- gm = GM,
- coordinator = CPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- seen_status = dict:new(),
- confirmed = [],
- known_senders = sets:new(),
- wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) }.
+ case rabbit_mirror_queue_coordinator:start_link(
+ Q, undefined, sender_death_fun(), depth_fun()) of
+ {ok, CPid} ->
+ GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ Self = self(),
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue{gm_pids = GMPids}]
+ = mnesia:read({rabbit_queue, QName}),
+ ok = rabbit_amqqueue:store_queue(
+ Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
+ state = live})
+ end),
+ {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ %% We need synchronous add here (i.e. do not return until the
+ %% slave is running) so that when queue declaration is finished
+ %% all slaves are up; we don't want to end up with unsynced slaves
+ %% just by declaring a new queue. But add can't be synchronous all
+ %% the time as it can be called by slaves and that's
+ %% deadlock-prone.
+ rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
+ #state { name = QName,
+ gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = dict:new(),
+ confirmed = [],
+ known_senders = sets:new(),
+ wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) };
+ {error, Reason} ->
+ %% The GM can shutdown before the coordinator has started up
+ %% (lost membership or missing group), thus the start_link of
+ %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process
+ % is trapping exists
+ throw({coordinator_not_started, Reason})
+ end.
stop_mirroring(State = #state { coordinator = CPid,
backing_queue = BQ,
diff --git a/test/gm_SUITE.erl b/test/gm_SUITE.erl
index 8b07c9efad..df73d8ac27 100644
--- a/test/gm_SUITE.erl
+++ b/test/gm_SUITE.erl
@@ -146,7 +146,7 @@ down_in_members_change(_Config) ->
end),
gm:leave(Pid2),
Passed = receive
- {'EXIT', Pid, normal} ->
+ {'EXIT', Pid, shutdown} ->
passed;
{'EXIT', Pid, _} ->
crashed