diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 50 |
1 files changed, 39 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5537634144..a5480f707e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -339,11 +339,17 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args, {ok, Node0} -> Node0; {error, _} -> Node end, - Node1 = rabbit_mirror_queue_misc:initial_queue_node(Q, Node1), - gen_server2:call( - rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare), - {init, new}, infinity). + case rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node1) of + {ok, _} -> + gen_server2:call( + rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare), + {init, new}, infinity); + {error, Error} -> + rabbit_misc:protocol_error(internal_error, + "Cannot declare a queue '~s' on node '~s': ~255p", + [rabbit_misc:rs(QueueName), Node1, Error]) + end. internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -456,16 +462,28 @@ with(Name, F, E) -> with(Name, F, E, RetriesLeft) -> case lookup(Name) of - {ok, Q = #amqqueue{}} when RetriesLeft =:= 0 -> + {ok, Q = #amqqueue{state = live}} when RetriesLeft =:= 0 -> %% Something bad happened to that queue, we are bailing out %% on processing current request. E({absent, Q, timeout}); + {ok, Q = #amqqueue{state = stopped}} when RetriesLeft =:= 0 -> + %% The queue was stopped and not migrated + E({absent, Q, stopped}); + %% The queue process has crashed with unknown error {ok, Q = #amqqueue{state = crashed}} -> E({absent, Q, crashed}); + %% The queue process has been stopped by a supervisor. + %% In that case a synchronised slave can take over + %% so we should retry. {ok, Q = #amqqueue{state = stopped}} -> %% The queue process was stopped by the supervisor - E({absent, Q, stopped}); - {ok, Q = #amqqueue{pid = QPid}} -> + rabbit_misc:with_exit_handler( + fun () -> retry_wait(Q, F, E, RetriesLeft) end, + fun () -> F(Q) end); + %% The queue is supposed to be active. + %% The master node can go away or queue can be killed + %% so we retry, waiting for a slave to take over. + {ok, Q = #amqqueue{state = live}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do %% with the QPid. F() should be written s.t. that this @@ -473,14 +491,24 @@ with(Name, F, E, RetriesLeft) -> %% indicates a code bug and we don't want to get stuck in %% the retry loop. rabbit_misc:with_exit_handler( - fun () -> false = rabbit_mnesia:is_process_alive(QPid), - timer:sleep(30), - with(Name, F, E, RetriesLeft - 1) - end, fun () -> F(Q) end); + fun () -> retry_wait(Q, F, E, RetriesLeft) end, + fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) end. +retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, RetriesLeft) -> + case {QState, is_mirrored(Q)} of + %% We don't want to repeat an operation if + %% there are no slaves to migrate to + {stopped, false} -> + E({absent, Q, stopped}); + _ -> + false = rabbit_mnesia:is_process_alive(QPid), + timer:sleep(30), + with(Name, F, E, RetriesLeft - 1) + end. + with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). with_or_die(Name, F) -> |
