summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-05-01 16:50:31 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-05-01 16:50:31 +0100
commit70962d101a1421af06ce28ac262d3c82b943fdcb (patch)
tree49dfdd37db4afdbfa0bf5e15b286c319a873391c /src
parente787dcfdae7bc055e2ee13c70ed14b3eacba5a9e (diff)
downloadrabbitmq-server-git-70962d101a1421af06ce28ac262d3c82b943fdcb.tar.gz
Stop the whole queue if the alternative is to fail over to an unsynced slave.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl4
-rw-r--r--src/rabbit_mirror_queue_master.erl23
-rw-r--r--src/rabbit_mirror_queue_slave.erl9
3 files changed, 25 insertions, 11 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 23718da18b..6a67a7594c 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -368,7 +368,7 @@ handle_cast(request_depth, State = #state { depth_fun = DepthFun }) ->
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) });
-handle_cast({delete_and_terminate, Reason}, State) ->
+handle_cast({terminate, _Delete, Reason}, State) ->
{stop, Reason, State}.
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
@@ -410,7 +410,7 @@ handle_msg([CPid], _From, request_depth = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
-handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
+handle_msg([CPid], _From, {terminate, _Delete, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, Msg),
{stop, {shutdown, ring_shutdown}};
handle_msg([_CPid], _From, _Msg) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 2b16b9118d..c6e5a7d80e 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -130,7 +130,8 @@ stop_mirroring(State = #state { coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
unlink(CPid),
- stop_all_slaves(shutdown, State),
+ %% delete = *slaves* should delete
+ stop_all_slaves(shutdown, delete, State),
{BQ, BQS}.
sync_mirrors(HandleInfo, EmitStats,
@@ -170,21 +171,31 @@ terminate({shutdown, dropped} = Reason,
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};
terminate(Reason,
- State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ State = #state { name = QName,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
%% shouldn't be deleted. Most likely safe shutdown of this
- %% node. Thus just let some other slave take over.
+ %% node.
+ {ok, #amqqueue{sync_slave_pids = SSPids}} = rabbit_amqqueue:lookup(QName),
+ %% TODO there should be a policy for this
+ case SSPids of
+ [] -> %% Remove the whole queue to avoid data loss
+ stop_all_slaves(Reason, nodelete, State);
+ _ -> %% Just let some other slave take over.
+ ok
+ end,
State #state { backing_queue_state = BQ:terminate(Reason, BQS) }.
delete_and_terminate(Reason, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- stop_all_slaves(Reason, State),
+ stop_all_slaves(Reason, delete, State),
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
-stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
+stop_all_slaves(Reason, Delete, #state{name = QName, gm = GM}) ->
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]],
- ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+ ok = gm:broadcast(GM, {terminate, Delete, Reason}),
[receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs],
%% Normally when we remove a slave another slave or master will
%% notice and update Mnesia. But we just removed them all, and
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index ee889f8442..e30b941d5e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -420,7 +420,7 @@ handle_msg([_SPid], _From, process_death) ->
%% messages from the master we have yet to receive. When we get
%% members_changed, then there will be no more messages.
ok;
-handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
+handle_msg([CPid], _From, {terminate, _Delete, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, {gm, Msg}),
{stop, {shutdown, ring_shutdown}};
handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) ->
@@ -876,10 +876,13 @@ process_instruction({depth, Depth},
backing_queue_state = BQS }) ->
{ok, set_delta(Depth - BQ:depth(BQS), State)};
-process_instruction({delete_and_terminate, Reason},
+process_instruction({terminate, Delete, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- BQ:delete_and_terminate(Reason, BQS),
+ case Delete of
+ delete -> BQ:delete_and_terminate(Reason, BQS);
+ nodelete -> BQ:terminate(Reason, BQS)
+ end,
{stop, State #state { backing_queue_state = undefined }}.
msg_ids_to_acktags(MsgIds, MA) ->