diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2016-09-21 15:26:40 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2016-09-21 15:26:40 +0300 |
| commit | 0e7e75d3e3a85965fa1848073ab82d511c4c8da5 (patch) | |
| tree | c957845e0aa8192410b3bcd7408e85057cfca15a | |
| parent | d5e11bc481154b1534c00a04d0e32a314dec7ddd (diff) | |
| parent | be9a3f7f84d3f6c680d3019360c5458f946aa462 (diff) | |
| download | rabbitmq-server-git-0e7e75d3e3a85965fa1848073ab82d511c4c8da5.tar.gz | |
Merge branch 'stable' into rabbitmq-server-944
| -rw-r--r-- | Makefile | 2 | ||||
| -rw-r--r-- | src/gm.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 66 | ||||
| -rw-r--r-- | test/gm_SUITE.erl | 2 |
5 files changed, 80 insertions, 48 deletions
@@ -4,7 +4,7 @@ VERSION ?= $(call get_app_version,src/$(PROJECT).app.src) # Release artifacts are put in $(PACKAGES_DIR). PACKAGES_DIR ?= $(abspath PACKAGES) -DEPS = ranch rabbit_common $(PLUGINS) +DEPS = ranch rabbit_common TEST_DEPS = rabbitmq_ct_helpers amqp_client meck proper define usage_xml_to_erl diff --git a/src/gm.erl b/src/gm.erl index c6d73b6cdd..74e19ee6fd 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -611,7 +611,7 @@ handle_call({add_on_right, NewMember}, _From, handle_callback_result({Result, {ok, Group}, State1}) catch lost_membership -> - {stop, normal, State} + {stop, shutdown, State} end. %% add_on_right causes a catchup to be sent immediately from the left, @@ -646,7 +646,7 @@ handle_cast({?TAG, ReqVer, Msg}, Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)) catch lost_membership -> - {stop, normal, State} + {stop, shutdown, State} end; handle_cast({broadcast, _Msg, _SizeHint}, @@ -675,16 +675,21 @@ handle_cast(join, State = #state { self = Self, module = Module, callback_args = Args, txn_executor = TxnFun }) -> - View = join_group(Self, GroupName, TxnFun), - MembersState = - case alive_view_members(View) of - [Self] -> blank_member_state(); - _ -> undefined - end, - State1 = check_neighbours(State #state { view = View, - members_state = MembersState }), - handle_callback_result( - {Module:joined(Args, get_pids(all_known_members(View))), State1}); + try + View = join_group(Self, GroupName, TxnFun), + MembersState = + case alive_view_members(View) of + [Self] -> blank_member_state(); + _ -> undefined + end, + State1 = check_neighbours(State #state { view = View, + members_state = MembersState }), + handle_callback_result( + {Module:joined(Args, get_pids(all_known_members(View))), State1}) + catch + lost_membership -> + {stop, shutdown, State} + end; handle_cast({validate_members, OldMembers}, State = #state { view = View, @@ -756,7 +761,7 @@ handle_info({'DOWN', MRef, process, _Pid, Reason}, end catch lost_membership -> - {stop, 
normal, State} + {stop, shutdown, State} end; handle_info(_, State) -> %% Discard any unexpected messages, such as late replies from neighbour_call/2 @@ -768,7 +773,6 @@ handle_info(_, State) -> terminate(Reason, #state { module = Module, callback_args = Args }) -> Module:handle_terminate(Args, Reason). - code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -871,7 +875,7 @@ handle_msg({activity, Left, Activity}, Result, fun activity_true/3, fun activity_false/3, Activity3, State2) catch lost_membership -> - {{stop, normal}, State} + {{stop, shutdown}, State} end; handle_msg({activity, _NotLeft, _Activity}, State) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 29f8e53f08..3111afca2e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1013,7 +1013,17 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> end. handle_call({init, Recover}, From, State) -> - init_it(Recover, From, State); + try + init_it(Recover, From, State) + catch + {coordinator_not_started, Reason} -> + %% The GM can shut down before the coordinator has started up + %% (lost membership or missing group), thus the start_link of + %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process + %% is trapping exits. The master captures this return value and + %% throws the current exception. + {stop, Reason, State} + end; handle_call(info, _From, State) -> reply(infos(info_keys(), State), State); @@ -1159,7 +1169,17 @@ handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State). 
handle_cast(init, State) -> - init_it({no_barrier, non_clean_shutdown}, none, State); + try + init_it({no_barrier, non_clean_shutdown}, none, State) + catch + {coordinator_not_started, Reason} -> + %% The GM can shut down before the coordinator has started up + %% (lost membership or missing group), thus the start_link of + %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process + %% is trapping exits. The master captures this return value and + %% throws the current exception. + {stop, Reason, State} + end; handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index d78f6180e7..d82cdf336a 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -101,35 +101,43 @@ init(Q, Recover, AsyncCallback) -> State. init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> - {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( - Q, undefined, sender_death_fun(), depth_fun()), - GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - Self = self(), - ok = rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q1 = #amqqueue{gm_pids = GMPids}] - = mnesia:read({rabbit_queue, QName}), - ok = rabbit_amqqueue:store_queue( - Q1#amqqueue{gm_pids = [{GM, Self} | GMPids], - state = live}) - end), - {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), - %% We need synchronous add here (i.e. do not return until the - %% slave is running) so that when queue declaration is finished - %% all slaves are up; we don't want to end up with unsynced slaves - %% just by declaring a new queue. But add can't be synchronous all - %% the time as it can be called by slaves and that's - %% deadlock-prone. 
- rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync), - #state { name = QName, - gm = GM, - coordinator = CPid, - backing_queue = BQ, - backing_queue_state = BQS, - seen_status = dict:new(), - confirmed = [], - known_senders = sets:new(), - wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) }. + case rabbit_mirror_queue_coordinator:start_link( + Q, undefined, sender_death_fun(), depth_fun()) of + {ok, CPid} -> + GM = rabbit_mirror_queue_coordinator:get_gm(CPid), + Self = self(), + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue{gm_pids = GMPids}] + = mnesia:read({rabbit_queue, QName}), + ok = rabbit_amqqueue:store_queue( + Q1#amqqueue{gm_pids = [{GM, Self} | GMPids], + state = live}) + end), + {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + %% We need synchronous add here (i.e. do not return until the + %% slave is running) so that when queue declaration is finished + %% all slaves are up; we don't want to end up with unsynced slaves + %% just by declaring a new queue. But add can't be synchronous all + %% the time as it can be called by slaves and that's + %% deadlock-prone. + rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync), + #state { name = QName, + gm = GM, + coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS, + seen_status = dict:new(), + confirmed = [], + known_senders = sets:new(), + wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) }; + {error, Reason} -> + %% The GM can shut down before the coordinator has started up + %% (lost membership or missing group), thus the start_link of + %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process + %% is trapping exits + throw({coordinator_not_started, Reason}) + end. 
stop_mirroring(State = #state { coordinator = CPid, backing_queue = BQ, diff --git a/test/gm_SUITE.erl b/test/gm_SUITE.erl index 8b07c9efad..df73d8ac27 100644 --- a/test/gm_SUITE.erl +++ b/test/gm_SUITE.erl @@ -146,7 +146,7 @@ down_in_members_change(_Config) -> end), gm:leave(Pid2), Passed = receive - {'EXIT', Pid, normal} -> + {'EXIT', Pid, shutdown} -> passed; {'EXIT', Pid, _} -> crashed |
