summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2017-08-03 16:16:09 +0300
committerMichael Klishin <michael@clojurewerkz.org>2017-08-03 16:16:09 +0300
commit454a26b359b050d62bb745d11e7bc3677aab29bf (patch)
treed4fa6f9a14794da4d78725897c1a8e577e740386 /src
parentba218a735477744f7740fb4325868276dd99ea6e (diff)
parentac687666a1cc4fb0619792d44cad5f10dd869629 (diff)
downloadrabbitmq-server-git-454a26b359b050d62bb745d11e7bc3677aab29bf.tar.gz
Merge branch 'master' into rabbitmq-management-460
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_amqqueue.erl50
-rw-r--r--src/rabbit_mirror_queue_misc.erl11
-rw-r--r--src/rabbit_vhost.erl2
-rw-r--r--src/rabbit_vhost_sup_sup.erl39
5 files changed, 70 insertions, 38 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b166e079f4..0a0eb6b71a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -168,12 +168,6 @@
{requires, recovery},
{enables, routing_ready}]}).
--rabbit_boot_step({mirrored_queues,
- [{description, "adding mirrors to queues"},
- {mfa, {rabbit_mirror_queue_misc, on_node_up, []}},
- {requires, recovery},
- {enables, routing_ready}]}).
-
-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"},
{requires, core_initialized}]}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5537634144..a5480f707e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -339,11 +339,17 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
{ok, Node0} -> Node0;
{error, _} -> Node
end,
-
Node1 = rabbit_mirror_queue_misc:initial_queue_node(Q, Node1),
- gen_server2:call(
- rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare),
- {init, new}, infinity).
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node1) of
+ {ok, _} ->
+ gen_server2:call(
+ rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare),
+ {init, new}, infinity);
+ {error, Error} ->
+ rabbit_misc:protocol_error(internal_error,
+ "Cannot declare a queue '~s' on node '~s': ~255p",
+ [rabbit_misc:rs(QueueName), Node1, Error])
+ end.
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -456,16 +462,28 @@ with(Name, F, E) ->
with(Name, F, E, RetriesLeft) ->
case lookup(Name) of
- {ok, Q = #amqqueue{}} when RetriesLeft =:= 0 ->
+ {ok, Q = #amqqueue{state = live}} when RetriesLeft =:= 0 ->
%% Something bad happened to that queue, we are bailing out
%% on processing current request.
E({absent, Q, timeout});
+ {ok, Q = #amqqueue{state = stopped}} when RetriesLeft =:= 0 ->
+ %% The queue was stopped and not migrated
+ E({absent, Q, stopped});
+ %% The queue process has crashed with unknown error
{ok, Q = #amqqueue{state = crashed}} ->
E({absent, Q, crashed});
+ %% The queue process has been stopped by a supervisor.
+ %% In that case a synchronised slave can take over
+ %% so we should retry.
{ok, Q = #amqqueue{state = stopped}} ->
%% The queue process was stopped by the supervisor
- E({absent, Q, stopped});
- {ok, Q = #amqqueue{pid = QPid}} ->
+ rabbit_misc:with_exit_handler(
+ fun () -> retry_wait(Q, F, E, RetriesLeft) end,
+ fun () -> F(Q) end);
+ %% The queue is supposed to be active.
+ %% The master node can go away or queue can be killed
+ %% so we retry, waiting for a slave to take over.
+ {ok, Q = #amqqueue{state = live}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
%% with the QPid. F() should be written s.t. that this
@@ -473,14 +491,24 @@ with(Name, F, E, RetriesLeft) ->
%% indicates a code bug and we don't want to get stuck in
%% the retry loop.
rabbit_misc:with_exit_handler(
- fun () -> false = rabbit_mnesia:is_process_alive(QPid),
- timer:sleep(30),
- with(Name, F, E, RetriesLeft - 1)
- end, fun () -> F(Q) end);
+ fun () -> retry_wait(Q, F, E, RetriesLeft) end,
+ fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
end.
+retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, RetriesLeft) ->
+ case {QState, is_mirrored(Q)} of
+ %% We don't want to repeat an operation if
+ %% there are no slaves to migrate to
+ {stopped, false} ->
+ E({absent, Q, stopped});
+ _ ->
+ false = rabbit_mnesia:is_process_alive(QPid),
+ timer:sleep(30),
+ with(Name, F, E, RetriesLeft - 1)
+ end.
+
with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
with_or_die(Name, F) ->
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index dc23095fb1..725cd7d089 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_misc).
-behaviour(rabbit_policy_validator).
--export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
+-export([remove_from_queue/3, on_vhost_up/1, add_mirrors/3,
report_deaths/4, store_updated_slaves/1,
initial_queue_node/2, suggested_queue_nodes/1,
is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1,
@@ -53,7 +53,6 @@
-spec remove_from_queue
(rabbit_amqqueue:name(), pid(), [pid()]) ->
{'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}.
--spec on_node_up() -> 'ok'.
-spec add_mirrors(rabbit_amqqueue:name(), [node()], 'sync' | 'async') ->
'ok'.
-spec store_updated_slaves(rabbit_types:amqqueue()) ->
@@ -167,12 +166,16 @@ slaves_to_start_on_failure(Q, DeadGMPids) ->
{_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes),
NewNodes -- OldNodes.
-on_node_up() ->
+on_vhost_up(VHost) ->
QNames =
rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:foldl(
- fun (Q = #amqqueue{name = QName,
+ fun
+ (#amqqueue{name = #resource{virtual_host = OtherVhost}},
+ QNames0) when OtherVhost =/= VHost ->
+ QNames0;
+ (Q = #amqqueue{name = QName,
pid = Pid,
slave_pids = SPids}, QNames0) ->
%% We don't want to pass in the whole
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 3a214e2634..73c05389be 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -71,6 +71,8 @@ recover(VHost) ->
ok = rabbit_binding:recover(rabbit_exchange:recover(VHost),
[QName || #amqqueue{name = QName} <- Qs]),
ok = rabbit_amqqueue:start(Qs),
+ %% Start queue mirrors.
+ ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
ok.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index 93c26d4e0f..19d7cf61b7 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -23,15 +23,16 @@
-export([init/1]).
-export([start_link/0, start/0]).
--export([init_vhost/1, get_vhost_sup/1, get_vhost_sup/2, save_vhost_sup/3]).
--export([delete_on_all_nodes/1]).
--export([start_on_all_nodes/1]).
-
--export([save_vhost_process/2]).
+-export([init_vhost/1,
+ start_vhost/1, start_vhost/2,
+ get_vhost_sup/1, get_vhost_sup/2,
+ save_vhost_sup/3,
+ save_vhost_process/2]).
+-export([delete_on_all_nodes/1, start_on_all_nodes/1]).
-export([is_vhost_alive/1]).
%% Internal
--export([stop_and_delete_vhost/1, start_vhost/1]).
+-export([stop_and_delete_vhost/1]).
-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}).
@@ -61,7 +62,12 @@ init([]) ->
start_on_all_nodes(VHost) ->
NodesStart = [ {Node, start_vhost(VHost, Node)}
|| Node <- rabbit_nodes:all_running() ],
- Failures = lists:filter(fun({_, {ok, _}}) -> false; (_) -> true end, NodesStart),
+ Failures = lists:filter(fun
+ ({_, {ok, _}}) -> false;
+ ({_, {error, {already_started, _}}}) -> false;
+ (_) -> true
+ end,
+ NodesStart),
case Failures of
[] -> ok;
Errors -> {error, {failed_to_start_vhost_on_nodes, Errors}}
@@ -112,6 +118,11 @@ stop_and_delete_vhost(VHost, Node) ->
init_vhost(VHost) ->
case start_vhost(VHost) of
{ok, _} -> ok;
+ {error, {already_started, _}} ->
+ rabbit_log:warning(
+ "Attempting to start an already started vhost '~s'.",
+ [VHost]),
+ ok;
{error, {no_such_vhost, VHost}} ->
{error, {no_such_vhost, VHost}};
{error, Reason} ->
@@ -163,22 +174,16 @@ get_vhost_sup(VHost) ->
-spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}.
start_vhost(VHost, Node) ->
case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid};
- {badrpc, RpcErr} ->
- {error, RpcErr}
+ {ok, Pid} -> {ok, Pid};
+ {error, Err} -> {error, Err};
+ {badrpc, RpcErr} -> {error, RpcErr}
end.
-spec start_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
start_vhost(VHost) ->
case rabbit_vhost:exists(VHost) of
false -> {error, {no_such_vhost, VHost}};
- true ->
- case supervisor2:start_child(?MODULE, [VHost]) of
- {ok, Pid} -> {ok, Pid};
- {error, {already_started, Pid}} -> {ok, Pid};
- {error, Err} -> {error, Err}
- end
+ true -> supervisor2:start_child(?MODULE, [VHost])
end.
-spec is_vhost_alive(rabbit_types:vhost()) -> boolean().