diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 36 |
1 files changed, 29 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5537634144..4a7a271a6e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -456,16 +456,28 @@ with(Name, F, E) -> with(Name, F, E, RetriesLeft) -> case lookup(Name) of - {ok, Q = #amqqueue{}} when RetriesLeft =:= 0 -> + {ok, Q = #amqqueue{state = live}} when RetriesLeft =:= 0 -> %% Something bad happened to that queue, we are bailing out %% on processing current request. E({absent, Q, timeout}); + {ok, Q = #amqqueue{state = stopped}} when RetriesLeft =:= 0 -> + %% The queue was stopped and not migrated + E({absent, Q, stopped}); + %% The queue process has crashed with unknown error {ok, Q = #amqqueue{state = crashed}} -> E({absent, Q, crashed}); + %% The queue process has been stopped by a supervisor. + %% In that case a synchronised slave can take over + %% so we should retry. {ok, Q = #amqqueue{state = stopped}} -> %% The queue process was stopped by the supervisor - E({absent, Q, stopped}); - {ok, Q = #amqqueue{pid = QPid}} -> + rabbit_misc:with_exit_handler( + fun () -> retry_wait(Q, F, E, RetriesLeft) end, + fun () -> F(Q) end); + %% The queue is supposed to be active. + %% The master node can go away or queue can be killed + %% so we retry, waiting for a slave to take over. + {ok, Q = #amqqueue{state = live}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do %% with the QPid. F() should be written s.t. that this @@ -473,14 +485,24 @@ with(Name, F, E, RetriesLeft) -> %% indicates a code bug and we don't want to get stuck in %% the retry loop. rabbit_misc:with_exit_handler( - fun () -> false = rabbit_mnesia:is_process_alive(QPid), - timer:sleep(30), - with(Name, F, E, RetriesLeft - 1) - end, fun () -> F(Q) end); + fun () -> retry_wait(Q, F, E, RetriesLeft) end, + fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) end. +retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, RetriesLeft) -> + case {QState, is_mirrored(Q)} of + %% We don't want to repeat an operation if + %% there are no slaves to migrate to + {stopped, false} -> + E({absent, Q, stopped}); + _ -> + false = rabbit_mnesia:is_process_alive(QPid), + timer:sleep(30), + with(Name, F, E, RetriesLeft - 1) + end. + with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). with_or_die(Name, F) -> |
