diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-11-26 15:58:02 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-11-26 15:58:02 +0000 |
| commit | b53ae6dfdcc843b61b6214428157c6699027f975 (patch) | |
| tree | fc6b7663259ad855a0c2d10a6af780cc01ce9c76 /src | |
| parent | c3d2001e5d60f43a6a909b2d9d9827608bc18fb4 (diff) | |
| download | rabbitmq-server-git-b53ae6dfdcc843b61b6214428157c6699027f975.tar.gz | |
Don't hang on shutdown
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 11 |
3 files changed, 20 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 60857e7e3d..f7bb4453f6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1162,9 +1162,14 @@ handle_call(sync_mirrors, From, 0 -> {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} = rabbit_amqqueue:lookup(Name), gen_server2:reply(From, ok), - noreply(State#q{backing_queue_state = - rabbit_mirror_queue_master:sync_mirrors( - SPids -- SSPids, Name, BQS)}); + try + noreply(State#q{backing_queue_state = + rabbit_mirror_queue_master:sync_mirrors( + SPids -- SSPids, Name, BQS)}) + catch + {time_to_shutdown, Reason} -> + {stop, Reason, State} + end; _ -> reply({error, queue_has_pending_acks}, State) end; diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 64b78fbbdb..a695d6f20d 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -152,6 +152,12 @@ sync_mirrors(SPids, Name, State = #state { gm = GM, SPidsMRefs1 = sync_foreach(SPidsMRefs, Ref, fun sync_receive_ready/3), {{_, SPidsMRefs2, _}, BQS1} = BQ:fold(fun (Msg, MsgProps, {I, SPMR, Last}) -> + receive + {'EXIT', _Pid, Reason} -> + throw({time_to_shutdown, Reason}) + after 0 -> + ok + end, SPMR1 = wait_for_credit(SPMR, Ref), [begin credit_flow:send(SPid, ?CREDIT_DISC_BOUND), diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 965ea09017..bb8def3de3 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -228,8 +228,7 @@ handle_cast({sync_start, Ref, MPid}, MRef = erlang:monitor(process, MPid), MPid ! {sync_ready, Ref, self()}, {_MsgCount, BQS1} = BQ:purge(BQS), - noreply( - sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1})); + sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1}); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -858,7 +857,7 @@ sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ, %% only thing to do here is purge.) {_MsgCount, BQS1} = BQ:purge(BQS), credit_flow:peer_down(MPid), - State#state{backing_queue_state = BQS1}; + noreply(State#state{backing_queue_state = BQS1}); {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg), sync_loop(Ref, MRef, MPid, State); @@ -867,7 +866,7 @@ sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ, erlang:demonitor(MRef), credit_flow:peer_down(MPid), %% We can only sync when there are no pending acks - set_delta(0, State); + noreply(set_delta(0, State)); {'$gen_cast', {set_maximum_since_use, Age}} -> ok = file_handle_cache:set_maximum_since_use(Age), sync_loop(Ref, MRef, MPid, State); @@ -882,5 +881,7 @@ sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ, Props = Props0#message_properties{needs_confirming = false, delivered = true}, BQS1 = BQ:publish(Msg, Props, none, BQS), - sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1}) + sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1}); + {'EXIT', _Pid, Reason} -> + {stop, Reason, State} end. |
