summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl36
1 files changed, 29 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5537634144..4a7a271a6e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -456,16 +456,28 @@ with(Name, F, E) ->
with(Name, F, E, RetriesLeft) ->
case lookup(Name) of
- {ok, Q = #amqqueue{}} when RetriesLeft =:= 0 ->
+ {ok, Q = #amqqueue{state = live}} when RetriesLeft =:= 0 ->
%% Something bad happened to that queue, we are bailing out
%% on processing current request.
E({absent, Q, timeout});
+ {ok, Q = #amqqueue{state = stopped}} when RetriesLeft =:= 0 ->
+ %% The queue was stopped and not migrated
+ E({absent, Q, stopped});
+ %% The queue process has crashed with unknown error
{ok, Q = #amqqueue{state = crashed}} ->
E({absent, Q, crashed});
+ %% The queue process has been stopped by a supervisor.
+ %% In that case a synchronised slave can take over
+ %% so we should retry.
{ok, Q = #amqqueue{state = stopped}} ->
%% The queue process was stopped by the supervisor
- E({absent, Q, stopped});
- {ok, Q = #amqqueue{pid = QPid}} ->
+ rabbit_misc:with_exit_handler(
+ fun () -> retry_wait(Q, F, E, RetriesLeft) end,
+ fun () -> F(Q) end);
+ %% The queue is supposed to be active.
+ %% The master node can go away or queue can be killed
+ %% so we retry, waiting for a slave to take over.
+ {ok, Q = #amqqueue{state = live}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
%% with the QPid. F() should be written s.t. that this
@@ -473,14 +485,24 @@ with(Name, F, E, RetriesLeft) ->
%% indicates a code bug and we don't want to get stuck in
%% the retry loop.
rabbit_misc:with_exit_handler(
- fun () -> false = rabbit_mnesia:is_process_alive(QPid),
- timer:sleep(30),
- with(Name, F, E, RetriesLeft - 1)
- end, fun () -> F(Q) end);
+ fun () -> retry_wait(Q, F, E, RetriesLeft) end,
+ fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
end.
+retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, RetriesLeft) ->
+ case {QState, is_mirrored(Q)} of
+ %% We don't want to repeat an operation if
+ %% there are no slaves to migrate to
+ {stopped, false} ->
+ E({absent, Q, stopped});
+ _ ->
+ false = rabbit_mnesia:is_process_alive(QPid),
+ timer:sleep(30),
+ with(Name, F, E, RetriesLeft - 1)
+ end.
+
with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
with_or_die(Name, F) ->