diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-05-01 16:50:31 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-05-01 16:50:31 +0100 |
| commit | 70962d101a1421af06ce28ac262d3c82b943fdcb (patch) | |
| tree | 49dfdd37db4afdbfa0bf5e15b286c319a873391c /src | |
| parent | e787dcfdae7bc055e2ee13c70ed14b3eacba5a9e (diff) | |
| download | rabbitmq-server-git-70962d101a1421af06ce28ac262d3c82b943fdcb.tar.gz | |
Stop the whole queue if the alternative is to fail over to an unsynced slave.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 9 |
3 files changed, 25 insertions, 11 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 23718da18b..6a67a7594c 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -368,7 +368,7 @@ handle_cast(request_depth, State = #state { depth_fun = DepthFun }) -> handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }); -handle_cast({delete_and_terminate, Reason}, State) -> +handle_cast({terminate, _Delete, Reason}, State) -> {stop, Reason, State}. handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, @@ -410,7 +410,7 @@ handle_msg([CPid], _From, request_depth = Msg) -> ok = gen_server2:cast(CPid, Msg); handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> ok = gen_server2:cast(CPid, Msg); -handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> +handle_msg([CPid], _From, {terminate, _Delete, _Reason} = Msg) -> ok = gen_server2:cast(CPid, Msg), {stop, {shutdown, ring_shutdown}}; handle_msg([_CPid], _From, _Msg) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 2b16b9118d..c6e5a7d80e 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -130,7 +130,8 @@ stop_mirroring(State = #state { coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS }) -> unlink(CPid), - stop_all_slaves(shutdown, State), + %% delete = *slaves* should delete + stop_all_slaves(shutdown, delete, State), {BQ, BQS}. sync_mirrors(HandleInfo, EmitStats, @@ -170,21 +171,31 @@ terminate({shutdown, dropped} = Reason, State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}; terminate(Reason, - State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State = #state { name = QName, + backing_queue = BQ, + backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but %% shouldn't be deleted. Most likely safe shutdown of this - %% node. Thus just let some other slave take over. + %% node. + {ok, #amqqueue{sync_slave_pids = SSPids}} = rabbit_amqqueue:lookup(QName), + %% TODO there should be a policy for this + case SSPids of + [] -> %% Remove the whole queue to avoid data loss + stop_all_slaves(Reason, nodelete, State); + _ -> %% Just let some other slave take over. + ok + end, State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. delete_and_terminate(Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - stop_all_slaves(Reason, State), + stop_all_slaves(Reason, delete, State), State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}. -stop_all_slaves(Reason, #state{name = QName, gm = GM}) -> +stop_all_slaves(Reason, Delete, #state{name = QName, gm = GM}) -> {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]], - ok = gm:broadcast(GM, {delete_and_terminate, Reason}), + ok = gm:broadcast(GM, {terminate, Delete, Reason}), [receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs], %% Normally when we remove a slave another slave or master will %% notice and update Mnesia. But we just removed them all, and diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index ee889f8442..e30b941d5e 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -420,7 +420,7 @@ handle_msg([_SPid], _From, process_death) -> %% messages from the master we have yet to receive. When we get %% members_changed, then there will be no more messages. ok; -handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> +handle_msg([CPid], _From, {terminate, _Delete, _Reason} = Msg) -> ok = gen_server2:cast(CPid, {gm, Msg}), {stop, {shutdown, ring_shutdown}}; handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) -> @@ -876,10 +876,13 @@ process_instruction({depth, Depth}, backing_queue_state = BQS }) -> {ok, set_delta(Depth - BQ:depth(BQS), State)}; -process_instruction({delete_and_terminate, Reason}, +process_instruction({terminate, Delete, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQ:delete_and_terminate(Reason, BQS), + case Delete of + delete -> BQ:delete_and_terminate(Reason, BQS); + nodelete -> BQ:terminate(Reason, BQS) + end, {stop, State #state { backing_queue_state = undefined }}. msg_ids_to_acktags(MsgIds, MA) -> |
