diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2017-08-02 14:24:58 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2017-08-02 14:26:59 +0100 |
| commit | 7c76ca3a965d54197d51af339bbda5877c7b4974 (patch) | |
| tree | 6d2bf8f432ec12694f0beaa61fc9b0a78bb36a74 | |
| parent | c58a15e7893bae019418c486d971046e879e0385 (diff) | |
| download | rabbitmq-server-git-7c76ca3a965d54197d51af339bbda5877c7b4974.tar.gz | |
Retry the `rabbit_amqqueue:with` operation for a stopped queue.
A stopped queue can migrate to a synchronized slave if it is mirrored,
so we should retry the operation if the queue has mirrors to migrate to.
Fixes #1319
| -rw-r--r-- | src/rabbit_amqqueue.erl | 36 |
1 files changed, 29 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5537634144..4a7a271a6e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -456,16 +456,28 @@ with(Name, F, E) -> with(Name, F, E, RetriesLeft) -> case lookup(Name) of - {ok, Q = #amqqueue{}} when RetriesLeft =:= 0 -> + {ok, Q = #amqqueue{state = live}} when RetriesLeft =:= 0 -> %% Something bad happened to that queue, we are bailing out %% on processing current request. E({absent, Q, timeout}); + {ok, Q = #amqqueue{state = stopped}} when RetriesLeft =:= 0 -> + %% The queue was stopped and not migrated + E({absent, Q, stopped}); + %% The queue process has crashed with unknown error {ok, Q = #amqqueue{state = crashed}} -> E({absent, Q, crashed}); + %% The queue process has been stopped by a supervisor. + %% In that case a synchronised slave can take over + %% so we should retry. {ok, Q = #amqqueue{state = stopped}} -> %% The queue process was stopped by the supervisor - E({absent, Q, stopped}); - {ok, Q = #amqqueue{pid = QPid}} -> + rabbit_misc:with_exit_handler( + fun () -> retry_wait(Q, F, E, RetriesLeft) end, + fun () -> F(Q) end); + %% The queue is supposed to be active. + %% The master node can go away or queue can be killed + %% so we retry, waiting for a slave to take over. + {ok, Q = #amqqueue{state = live}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do %% with the QPid. F() should be written s.t. that this @@ -473,14 +485,24 @@ with(Name, F, E, RetriesLeft) -> %% indicates a code bug and we don't want to get stuck in %% the retry loop. rabbit_misc:with_exit_handler( - fun () -> false = rabbit_mnesia:is_process_alive(QPid), - timer:sleep(30), - with(Name, F, E, RetriesLeft - 1) - end, fun () -> F(Q) end); + fun () -> retry_wait(Q, F, E, RetriesLeft) end, + fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) end. 
+retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, RetriesLeft) -> + case {QState, is_mirrored(Q)} of + %% We don't want to repeat an operation if + %% there are no slaves to migrate to + {stopped, false} -> + E({absent, Q, stopped}); + _ -> + false = rabbit_mnesia:is_process_alive(QPid), + timer:sleep(30), + with(Name, F, E, RetriesLeft - 1) + end. + with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). with_or_die(Name, F) -> |
