summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-11-26 15:58:02 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-11-26 15:58:02 +0000
commitb53ae6dfdcc843b61b6214428157c6699027f975 (patch)
treefc6b7663259ad855a0c2d10a6af780cc01ce9c76 /src
parentc3d2001e5d60f43a6a909b2d9d9827608bc18fb4 (diff)
downloadrabbitmq-server-git-b53ae6dfdcc843b61b6214428157c6699027f975.tar.gz
Don't hang on shutdown
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_mirror_queue_slave.erl11
3 files changed, 20 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 60857e7e3d..f7bb4453f6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1162,9 +1162,14 @@ handle_call(sync_mirrors, From,
0 -> {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} =
rabbit_amqqueue:lookup(Name),
gen_server2:reply(From, ok),
- noreply(State#q{backing_queue_state =
- rabbit_mirror_queue_master:sync_mirrors(
- SPids -- SSPids, Name, BQS)});
+ try
+ noreply(State#q{backing_queue_state =
+ rabbit_mirror_queue_master:sync_mirrors(
+ SPids -- SSPids, Name, BQS)})
+ catch
+ {time_to_shutdown, Reason} ->
+ {stop, Reason, State}
+ end;
_ -> reply({error, queue_has_pending_acks}, State)
end;
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 64b78fbbdb..a695d6f20d 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -152,6 +152,12 @@ sync_mirrors(SPids, Name, State = #state { gm = GM,
SPidsMRefs1 = sync_foreach(SPidsMRefs, Ref, fun sync_receive_ready/3),
{{_, SPidsMRefs2, _}, BQS1} =
BQ:fold(fun (Msg, MsgProps, {I, SPMR, Last}) ->
+ receive
+ {'EXIT', _Pid, Reason} ->
+ throw({time_to_shutdown, Reason})
+ after 0 ->
+ ok
+ end,
SPMR1 = wait_for_credit(SPMR, Ref),
[begin
credit_flow:send(SPid, ?CREDIT_DISC_BOUND),
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 965ea09017..bb8def3de3 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -228,8 +228,7 @@ handle_cast({sync_start, Ref, MPid},
MRef = erlang:monitor(process, MPid),
MPid ! {sync_ready, Ref, self()},
{_MsgCount, BQS1} = BQ:purge(BQS),
- noreply(
- sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1}));
+ sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1});
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -858,7 +857,7 @@ sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ,
%% only thing to do here is purge.)
{_MsgCount, BQS1} = BQ:purge(BQS),
credit_flow:peer_down(MPid),
- State#state{backing_queue_state = BQS1};
+ noreply(State#state{backing_queue_state = BQS1});
{bump_credit, Msg} ->
credit_flow:handle_bump_msg(Msg),
sync_loop(Ref, MRef, MPid, State);
@@ -867,7 +866,7 @@ sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ,
erlang:demonitor(MRef),
credit_flow:peer_down(MPid),
%% We can only sync when there are no pending acks
- set_delta(0, State);
+ noreply(set_delta(0, State));
{'$gen_cast', {set_maximum_since_use, Age}} ->
ok = file_handle_cache:set_maximum_since_use(Age),
sync_loop(Ref, MRef, MPid, State);
@@ -882,5 +881,7 @@ sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ,
Props = Props0#message_properties{needs_confirming = false,
delivered = true},
BQS1 = BQ:publish(Msg, Props, none, BQS),
- sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1})
+ sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1});
+ {'EXIT', _Pid, Reason} ->
+ {stop, Reason, State}
end.