summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-08-09 14:41:01 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2017-08-09 14:41:01 +0100
commitad9f941f95718e5cef9ee544d20a66e8c3b0b88c (patch)
tree9b7ff7b39d4c6ff93fd65b31f4296eb84028db6a /src
parente7a67da88293ebe5d8baf2f73eeb04d7d235a2dc (diff)
parent70fabf3eae712ee5e13f5131b5d8443b7147203c (diff)
downloadrabbitmq-server-git-ad9f941f95718e5cef9ee544d20a66e8c3b0b88c.tar.gz
Merge branch 'master' into rabbitmq-cli-207
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl16
-rw-r--r--src/rabbit_amqqueue.erl64
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_amqqueue_sup_sup.erl6
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_connection_tracking_handler.erl19
-rw-r--r--src/rabbit_mirror_queue_misc.erl43
-rw-r--r--src/rabbit_reader.erl59
-rw-r--r--src/rabbit_recovery_terms.erl4
-rw-r--r--src/rabbit_variable_queue.erl19
-rw-r--r--src/rabbit_vhost.erl21
-rw-r--r--src/rabbit_vhost_limit.erl7
-rw-r--r--src/rabbit_vhost_msg_store.erl6
-rw-r--r--src/rabbit_vhost_process.erl1
-rw-r--r--src/rabbit_vhost_sup_sup.erl97
-rw-r--r--src/rabbit_vhost_sup_wrapper.erl7
16 files changed, 258 insertions, 126 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index fd2f980455..0a0eb6b71a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -168,12 +168,6 @@
{requires, recovery},
{enables, routing_ready}]}).
--rabbit_boot_step({mirrored_queues,
- [{description, "adding mirrors to queues"},
- {mfa, {rabbit_mirror_queue_misc, on_node_up, []}},
- {requires, recovery},
- {enables, routing_ready}]}).
-
-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"},
{requires, core_initialized}]}).
@@ -803,6 +797,16 @@ start(normal, []) ->
warn_if_disc_io_options_dubious(),
rabbit_boot_steps:run_boot_steps(),
{ok, SupPid};
+ {error, {erlang_version_too_old,
+ {found, OTPRel, ERTSVer},
+ {required, ?OTP_MINIMUM, ?ERTS_MINIMUM}}} ->
+ Msg = "This RabbitMQ version cannot run on Erlang ~s (erts ~s): "
+ "minimum required version is ~s (erts ~s)",
+ Args = [OTPRel, ERTSVer, ?OTP_MINIMUM, ?ERTS_MINIMUM],
+ rabbit_log:error(Msg, Args),
+ %% also print to stderr to make this more visible
+ io:format(standard_error, "Error: " ++ Msg ++ "~n", Args),
+ {error, {erlang_version_too_old, rabbit_misc:format("Erlang ~s or later is required, started on ~s", [?OTP_MINIMUM, OTPRel])}};
Error ->
Error
end.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f76566ffb9..2d85a9f04b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -41,6 +41,7 @@
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
-export([pid_of/1, pid_of/2]).
+-export([mark_local_durable_queues_stopped/1]).
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -256,6 +257,15 @@ start(Qs) ->
[Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs],
ok.
+mark_local_durable_queues_stopped(VHost) ->
+ Qs = find_durable_queues(VHost),
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ [ store_queue(Q#amqqueue{ state = stopped })
+ || Q = #amqqueue{ state = State } <- Qs,
+ State =/= stopped ]
+ end).
+
find_durable_queues(VHost) ->
Node = node(),
mnesia:async_dirty(
@@ -330,11 +340,17 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
{ok, Node0} -> Node0;
{error, _} -> Node
end,
-
Node1 = rabbit_mirror_queue_misc:initial_queue_node(Q, Node1),
- gen_server2:call(
- rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare),
- {init, new}, infinity).
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node1) of
+ {ok, _} ->
+ gen_server2:call(
+ rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare),
+ {init, new}, infinity);
+ {error, Error} ->
+ rabbit_misc:protocol_error(internal_error,
+ "Cannot declare a queue '~s' on node '~s': ~255p",
+ [rabbit_misc:rs(QueueName), Node1, Error])
+ end.
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -447,13 +463,28 @@ with(Name, F, E) ->
with(Name, F, E, RetriesLeft) ->
case lookup(Name) of
- {ok, Q = #amqqueue{}} when RetriesLeft =:= 0 ->
+ {ok, Q = #amqqueue{state = live}} when RetriesLeft =:= 0 ->
%% Something bad happened to that queue, we are bailing out
%% on processing current request.
E({absent, Q, timeout});
+ {ok, Q = #amqqueue{state = stopped}} when RetriesLeft =:= 0 ->
+ %% The queue was stopped and not migrated
+ E({absent, Q, stopped});
+ %% The queue process has crashed with unknown error
{ok, Q = #amqqueue{state = crashed}} ->
E({absent, Q, crashed});
- {ok, Q = #amqqueue{pid = QPid}} ->
+ %% The queue process has been stopped by a supervisor.
+ %% In that case a synchronised slave can take over
+ %% so we should retry.
+ {ok, Q = #amqqueue{state = stopped}} ->
+ %% The queue process was stopped by the supervisor
+ rabbit_misc:with_exit_handler(
+ fun () -> retry_wait(Q, F, E, RetriesLeft) end,
+ fun () -> F(Q) end);
+ %% The queue is supposed to be active.
+ %% The master node can go away or queue can be killed
+ %% so we retry, waiting for a slave to take over.
+ {ok, Q = #amqqueue{state = live}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
%% with the QPid. F() should be written s.t. that this
@@ -461,14 +492,24 @@ with(Name, F, E, RetriesLeft) ->
%% indicates a code bug and we don't want to get stuck in
%% the retry loop.
rabbit_misc:with_exit_handler(
- fun () -> false = rabbit_mnesia:is_process_alive(QPid),
- timer:sleep(30),
- with(Name, F, E, RetriesLeft - 1)
- end, fun () -> F(Q) end);
+ fun () -> retry_wait(Q, F, E, RetriesLeft) end,
+ fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
end.
+retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, RetriesLeft) ->
+ case {QState, is_mirrored(Q)} of
+ %% We don't want to repeat an operation if
+ %% there are no slaves to migrate to
+ {stopped, false} ->
+ E({absent, Q, stopped});
+ _ ->
+ false = rabbit_mnesia:is_process_alive(QPid),
+ timer:sleep(30),
+ with(Name, F, E, RetriesLeft - 1)
+ end.
+
with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
with_or_die(Name, F) ->
@@ -655,10 +696,13 @@ is_unresponsive(#amqqueue{ pid = QPid }, Timeout) ->
end.
info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
+info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped);
info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
info(Q = #amqqueue{ state = crashed }, Items) ->
info_down(Q, Items, crashed);
+info(Q = #amqqueue{ state = stopped }, Items) ->
+ info_down(Q, Items, stopped);
info(#amqqueue{ pid = QPid }, Items) ->
case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of
{ok, Res} -> Res;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4e43104de2..678f1136c3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -265,9 +265,18 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
notify_decorators(startup, State3),
State3.
-terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+terminate(shutdown = R, State = #q{backing_queue = BQ, q = #amqqueue{ name = QName }}) ->
rabbit_core_metrics:queue_deleted(qname(State)),
- terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+ terminate_shutdown(
+ fun (BQS) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ [Q] = mnesia:read({rabbit_queue, QName}),
+ Q2 = Q#amqqueue{state = stopped},
+ rabbit_amqqueue:store_queue(Q2)
+ end),
+ BQ:terminate(R, BQS)
+ end, State);
terminate({shutdown, missing_owner} = Reason, State) ->
%% if the owner was missing then there will be no queue, so don't emit stats
terminate_shutdown(terminate_delete(false, Reason, State), State);
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index b5ef86255d..f0bcbd7c60 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -57,7 +57,7 @@ find_for_vhost(VHost) ->
-spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}.
find_for_vhost(VHost, Node) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node),
+ {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node),
case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of
[QSup] -> {ok, QSup};
Result -> {error, {queue_supervisor_not_found, Result}}
@@ -65,7 +65,7 @@ find_for_vhost(VHost, Node) ->
-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
start_for_vhost(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
supervisor2:start_child(
VHostSup,
@@ -82,7 +82,7 @@ start_for_vhost(VHost) ->
-spec stop_for_vhost(rabbit_types:vhost()) -> ok.
stop_for_vhost(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 00a6607dfb..c69a27d57c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2142,6 +2142,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
fun (not_found) -> {ok, 0};
({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username),
{ok, 0};
+ ({absent, Q, stopped}) -> rabbit_amqqueue:delete_crashed(Q, Username),
+ {ok, 0};
({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end) of
{error, in_use} ->
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index 3ae17677e0..ca13700da0 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -82,11 +82,14 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) ->
close_connections(rabbit_connection_tracking:list(VHost),
rabbit_misc:format("vhost '~s' is deleted", [VHost])),
{ok, State};
+%% Note: under normal circumstances this will be called immediately
+%% after the vhost_deleted above. Therefore we should be careful about
+%% what we log and be more defensive.
handle_event(#event{type = vhost_down, props = Details}, State) ->
VHost = pget(name, Details),
Node = pget(node, Details),
- rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'"
- " because the vhost database has stopped working",
+ rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'"
+ " because the vhost is stopping",
[VHost, Node]),
close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
rabbit_misc:format("vhost '~s' is down", [VHost])),
@@ -131,7 +134,17 @@ close_connections(Tracked, Message, Delay) ->
ok.
close_connection(#tracked_connection{pid = Pid, type = network}, Message) ->
- rabbit_networking:close_connection(Pid, Message);
+ try
+ rabbit_networking:close_connection(Pid, Message)
+ catch error:{not_a_connection, _} ->
+ %% could has been closed concurrently, or the input
+ %% is bogus. In any case, we should not terminate
+ ok;
+ _:Err ->
+ %% ignore, don't terminate
+ rabbit_log:warning("Could not close connection ~p: ~p", [Pid, Err]),
+ ok
+ end;
close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
%% Do an RPC call to the node running the direct client.
Node = node(Pid),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 59522da4a9..a6571defcb 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_misc).
-behaviour(rabbit_policy_validator).
--export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
+-export([remove_from_queue/3, on_vhost_up/1, add_mirrors/3,
report_deaths/4, store_updated_slaves/1,
initial_queue_node/2, suggested_queue_nodes/1,
is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1,
@@ -53,7 +53,6 @@
-spec remove_from_queue
(rabbit_amqqueue:name(), pid(), [pid()]) ->
{'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}.
--spec on_node_up() -> 'ok'.
-spec add_mirrors(rabbit_amqqueue:name(), [node()], 'sync' | 'async') ->
'ok'.
-spec store_updated_slaves(rabbit_types:amqqueue()) ->
@@ -167,12 +166,16 @@ slaves_to_start_on_failure(Q, DeadGMPids) ->
{_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes),
NewNodes -- OldNodes.
-on_node_up() ->
+on_vhost_up(VHost) ->
QNames =
rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:foldl(
- fun (Q = #amqqueue{name = QName,
+ fun
+ (#amqqueue{name = #resource{virtual_host = OtherVhost}},
+ QNames0) when OtherVhost =/= VHost ->
+ QNames0;
+ (Q = #amqqueue{name = QName,
pid = Pid,
slave_pids = SPids}, QNames0) ->
%% We don't want to pass in the whole
@@ -228,11 +231,21 @@ add_mirror(QName, MirrorNode, SyncMode) ->
rabbit_misc:with_exit_handler(
rabbit_misc:const(ok),
fun () ->
- SPid = rabbit_amqqueue_sup_sup:start_queue_process(
- MirrorNode, Q, slave),
- log_info(QName, "Adding mirror on node ~p: ~p~n",
- [MirrorNode, SPid]),
- rabbit_mirror_queue_slave:go(SPid, SyncMode)
+ #amqqueue{name = #resource{virtual_host = VHost}} = Q,
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of
+ {ok, _} ->
+ SPid = rabbit_amqqueue_sup_sup:start_queue_process(
+ MirrorNode, Q, slave),
+ log_info(QName, "Adding mirror on node ~p: ~p~n",
+ [MirrorNode, SPid]),
+ rabbit_mirror_queue_slave:go(SPid, SyncMode);
+ {error, Error} ->
+ log_warning(QName,
+ "Unable to start queue mirror on node '~p'. "
+ "Target virtual host is not running: ~p~n",
+ [MirrorNode, Error]),
+ ok
+ end
end);
{error, not_found} = E ->
E
@@ -249,12 +262,12 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
rabbit_misc:pid_to_string(MirrorPid),
[[$ , rabbit_misc:pid_to_string(P)] || P <- DeadPids]]).
-log_info (QName, Fmt, Args) -> log(info, QName, Fmt, Args).
-log_warning(QName, Fmt, Args) -> log(warning, QName, Fmt, Args).
-
-log(Level, QName, Fmt, Args) ->
- rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt,
- [rabbit_misc:rs(QName) | Args]).
+log_info (QName, Fmt, Args) ->
+ rabbit_log_mirroring:info("Mirrored ~s: " ++ Fmt,
+ [rabbit_misc:rs(QName) | Args]).
+log_warning(QName, Fmt, Args) ->
+ rabbit_log_mirroring:warning("Mirrored ~s: " ++ Fmt,
+ [rabbit_misc:rs(QName) | Args]).
store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
sync_slave_pids = SSPids,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 77914a00bf..6e2ed2a889 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -279,18 +279,20 @@ socket_error(Reason) when is_atom(Reason) ->
rabbit_log_connection:error("Error on AMQP connection ~p: ~s~n",
[self(), rabbit_misc:format_inet_error(Reason)]);
socket_error(Reason) ->
- Level =
- case Reason of
- {ssl_upgrade_error, closed} ->
- %% The socket was closed while upgrading to SSL.
- %% This is presumably a TCP healthcheck, so don't log
- %% it unless specified otherwise.
- debug;
- _ ->
- error
- end,
- rabbit_log:log(rabbit_log_connection, Level,
- "Error on AMQP connection ~p:~n~p~n", [self(), Reason]).
+ Fmt = "Error on AMQP connection ~p:~n~p~n",
+ Args = [self(), Reason],
+ case Reason of
+ %% The socket was closed while upgrading to SSL.
+ %% This is presumably a TCP healthcheck, so don't log
+ %% it unless specified otherwise.
+ {ssl_upgrade_error, closed} ->
+ %% Lager sinks (rabbit_log_connection)
+ %% are handled by the lager parse_transform.
+ %% Hence have to define the loglevel as a function call.
+ rabbit_log_connection:debug(Fmt, Args);
+ _ ->
+ rabbit_log_connection:error(Fmt, Args)
+ end.
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
@@ -402,31 +404,41 @@ log_connection_exception(Name, Ex) ->
log_connection_exception(Severity, Name, {heartbeat_timeout, TimeoutSec}) ->
%% Long line to avoid extra spaces and line breaks in log
- rabbit_log:log(rabbit_log_connection, Severity,
+ log_connection_exception_with_severity(Severity,
"closing AMQP connection ~p (~s):~n"
"missed heartbeats from client, timeout: ~ps~n",
[self(), Name, TimeoutSec]);
log_connection_exception(Severity, Name, {connection_closed_abruptly,
#v1{connection = #connection{user = #user{username = Username},
vhost = VHost}}}) ->
- rabbit_log:log(rabbit_log_connection, Severity, "closing AMQP connection ~p (~s, vhost: '~s', user: '~s'):~nclient unexpectedly closed TCP connection~n",
+ log_connection_exception_with_severity(Severity,
+ "closing AMQP connection ~p (~s, vhost: '~s', user: '~s'):~nclient unexpectedly closed TCP connection~n",
[self(), Name, VHost, Username]);
%% when client abruptly closes connection before connection.open/authentication/authorization
%% succeeded, don't log username and vhost as 'none'
log_connection_exception(Severity, Name, {connection_closed_abruptly, _}) ->
- rabbit_log:log(rabbit_log_connection, Severity, "closing AMQP connection ~p (~s):~nclient unexpectedly closed TCP connection~n",
+ log_connection_exception_with_severity(Severity,
+ "closing AMQP connection ~p (~s):~nclient unexpectedly closed TCP connection~n",
[self(), Name]);
%% old exception structure
log_connection_exception(Severity, Name, connection_closed_abruptly) ->
- rabbit_log:log(rabbit_log_connection, Severity,
+ log_connection_exception_with_severity(Severity,
"closing AMQP connection ~p (~s):~n"
"client unexpectedly closed TCP connection~n",
[self(), Name]);
log_connection_exception(Severity, Name, Ex) ->
- rabbit_log:log(rabbit_log_connection, Severity,
+ log_connection_exception_with_severity(Severity,
"closing AMQP connection ~p (~s):~n~p~n",
[self(), Name, Ex]).
+log_connection_exception_with_severity(Severity, Fmt, Args) ->
+ case Severity of
+ debug -> rabbit_log_connection:debug(Fmt, Args);
+ info -> rabbit_log_connection:info(Fmt, Args);
+ warning -> rabbit_log_connection:warning(Fmt, Args);
+ error -> rabbit_log_connection:warning(Fmt, Args)
+ end.
+
run({M, F, A}) ->
try apply(M, F, A)
catch {become, MFA} -> run(MFA)
@@ -475,13 +487,12 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
%%
%% The goal is to not log TCP healthchecks (a connection
%% with no data received) unless specified otherwise.
- Level = case Recv of
- closed -> debug;
- _ -> info
- end,
- rabbit_log:log(rabbit_log_connection, Level,
- "accepting AMQP connection ~p (~s)~n",
- [self(), ConnName]);
+ Fmt = "accepting AMQP connection ~p (~s)~n",
+ Args = [self(), ConnName],
+ case Recv of
+ closed -> rabbit_log_connection:debug(Fmt, Args);
+ _ -> rabbit_log_connection:info(Fmt, Args)
+ end;
_ ->
ok
end,
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index 73fc9c7449..b73f3add7c 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -48,7 +48,7 @@
%%----------------------------------------------------------------------------
start(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
{ok, _} = supervisor2:start_child(
VHostSup,
@@ -65,7 +65,7 @@ start(VHost) ->
ok.
stop(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
case supervisor:terminate_child(VHostSup, ?MODULE) of
ok -> supervisor:delete_child(VHostSup, ?MODULE);
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b4945fe3d3..4d61bb4b03 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -2766,9 +2766,15 @@ transform_store(Store, TransformFun) ->
move_messages_to_vhost_store() ->
case list_persistent_queues() of
- % [] -> ok;
- Queues -> move_messages_to_vhost_store(Queues)
- end.
+ [] ->
+ log_upgrade("No durable queues found."
+ " Skipping message store migration"),
+ ok;
+ Queues ->
+ move_messages_to_vhost_store(Queues)
+ end,
+ ok = delete_old_store(),
+ ok = rabbit_queue_index:cleanup_global_recovery_terms().
move_messages_to_vhost_store(Queues) ->
log_upgrade("Moving messages to per-vhost message store"),
@@ -2802,8 +2808,7 @@ move_messages_to_vhost_store(Queues) ->
"message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"),
log_upgrade("Message store migration finished"),
- ok = delete_old_store(OldStore),
- ok = rabbit_queue_index:cleanup_global_recovery_terms(),
+ ok = rabbit_sup:stop_child(OldStore),
[ok= rabbit_recovery_terms:close_table(VHost) || VHost <- VHosts],
ok = stop_new_store(NewMsgStore).
@@ -2936,8 +2941,8 @@ stop_new_store(NewStore) ->
NewStore),
ok.
-delete_old_store(OldStore) ->
- ok = rabbit_sup:stop_child(OldStore),
+delete_old_store() ->
+ log_upgrade("Removing the old message store data"),
rabbit_file:recursive_delete(
[filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]),
%% Delete old transient store as well
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 30557fc7be..73c05389be 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -71,11 +71,13 @@ recover(VHost) ->
ok = rabbit_binding:recover(rabbit_exchange:recover(VHost),
[QName || #amqqueue{name = QName} <- Qs]),
ok = rabbit_amqqueue:start(Qs),
+ %% Start queue mirrors.
+ ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
ok.
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, tracing, state]).
+-define(INFO_KEYS, [name, tracing, cluster_state]).
add(VHostPath, ActingUser) ->
rabbit_log:info("Adding vhost '~s'~n", [VHostPath]),
@@ -261,10 +263,19 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, VHost) -> VHost;
i(tracing, VHost) -> rabbit_trace:enabled(VHost);
-i(state, VHost) -> case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
- true -> running;
- false -> down
- end;
+i(cluster_state, VHost) ->
+ Nodes = rabbit_nodes:all_running(),
+ lists:map(fun(Node) ->
+ State = case rabbit_misc:rpc_call(Node,
+ rabbit_vhost_sup_sup, is_vhost_alive,
+ [VHost]) of
+ {badrpc, nodedown} -> nodedown;
+ true -> running;
+ false -> stopped
+ end,
+ {Node, State}
+ end,
+ Nodes);
i(Item, _) -> throw({bad_argument, Item}).
info(VHost) -> infos(?INFO_KEYS, VHost).
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
index 7b797e46b2..9d8a6795b4 100644
--- a/src/rabbit_vhost_limit.erl
+++ b/src/rabbit_vhost_limit.erl
@@ -55,7 +55,12 @@ notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits, ActingUser) ->
notify_clear(VHost, <<"vhost-limits">>, <<"limits">>, ActingUser) ->
rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>},
{user_who_performed_action, ActingUser}]),
- update_vhost(VHost, undefined).
+ %% If the function is called as a part of vhost deletion, the vhost can
+ %% be already deleted.
+ case rabbit_vhost:exists(VHost) of
+ true -> update_vhost(VHost, undefined);
+ false -> ok
+ end.
connection_limit(VirtualHost) ->
get_limit(VirtualHost, <<"max-connections">>).
diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl
index 3c633875bc..b9af37c258 100644
--- a/src/rabbit_vhost_msg_store.erl
+++ b/src/rabbit_vhost_msg_store.erl
@@ -23,7 +23,7 @@
start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
ClientRefs == undefined ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
supervisor2:start_child(VHostSup,
@@ -39,7 +39,7 @@ start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
end.
stop(VHost, Type) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
ok = supervisor2:terminate_child(VHostSup, Type),
ok = supervisor2:delete_child(VHostSup, Type);
@@ -65,7 +65,7 @@ with_vhost_store(VHost, Type, Fun) ->
end.
vhost_store_pid(VHost, Type) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost),
case supervisor2:find_child(VHostSup, Type) of
[Pid] -> Pid;
[] -> no_pid
diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl
index e3c815a727..f6e4a83daa 100644
--- a/src/rabbit_vhost_process.erl
+++ b/src/rabbit_vhost_process.erl
@@ -55,6 +55,7 @@ init([VHost]) ->
timer:send_interval(Interval, check_vhost),
{ok, VHost}
catch _:Reason ->
+ rabbit_amqqueue:mark_local_durable_queues_stopped(VHost),
rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n"
" Stacktrace ~p",
[VHost, Reason, erlang:get_stacktrace()]),
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index 1d5db93fda..19d7cf61b7 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -23,15 +23,16 @@
-export([init/1]).
-export([start_link/0, start/0]).
--export([init_vhost/1, vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
--export([delete_on_all_nodes/1]).
--export([start_on_all_nodes/1]).
-
--export([save_vhost_process/2]).
+-export([init_vhost/1,
+ start_vhost/1, start_vhost/2,
+ get_vhost_sup/1, get_vhost_sup/2,
+ save_vhost_sup/3,
+ save_vhost_process/2]).
+-export([delete_on_all_nodes/1, start_on_all_nodes/1]).
-export([is_vhost_alive/1]).
%% Internal
--export([stop_and_delete_vhost/1, start_vhost/1]).
+-export([stop_and_delete_vhost/1]).
-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}).
@@ -61,7 +62,12 @@ init([]) ->
start_on_all_nodes(VHost) ->
NodesStart = [ {Node, start_vhost(VHost, Node)}
|| Node <- rabbit_nodes:all_running() ],
- Failures = lists:filter(fun({_, {ok, _}}) -> false; (_) -> true end, NodesStart),
+ Failures = lists:filter(fun
+ ({_, {ok, _}}) -> false;
+ ({_, {error, {already_started, _}}}) -> false;
+ (_) -> true
+ end,
+ NodesStart),
case Failures of
[] -> ok;
Errors -> {error, {failed_to_start_vhost_on_nodes, Errors}}
@@ -72,7 +78,7 @@ delete_on_all_nodes(VHost) ->
ok.
stop_and_delete_vhost(VHost) ->
- case get_vhost_sup(VHost) of
+ StopResult = case lookup_vhost_sup_record(VHost) of
not_found -> ok;
#vhost_sup{wrapper_pid = WrapperPid,
vhost_sup_pid = VHostSupPid} ->
@@ -84,13 +90,15 @@ stop_and_delete_vhost(VHost) ->
[VHostSupPid, VHost]),
case supervisor2:terminate_child(?MODULE, WrapperPid) of
ok ->
- ets:delete(?MODULE, VHost),
- ok = rabbit_vhost:delete_storage(VHost);
+ true = ets:delete(?MODULE, VHost),
+ ok;
Other ->
Other
end
end
- end.
+ end,
+ ok = rabbit_vhost:delete_storage(VHost),
+ StopResult.
%% We take an optimistic approach whan stopping a remote VHost supervisor.
stop_and_delete_vhost(VHost, Node) when Node == node(self()) ->
@@ -106,10 +114,15 @@ stop_and_delete_vhost(VHost, Node) ->
{error, RpcErr}
end.
--spec init_vhost(rabbit_types:vhost()) -> ok.
+-spec init_vhost(rabbit_types:vhost()) -> ok | {error, {no_such_vhost, rabbit_types:vhsot()}}.
init_vhost(VHost) ->
case start_vhost(VHost) of
{ok, _} -> ok;
+ {error, {already_started, _}} ->
+ rabbit_log:warning(
+ "Attempting to start an already started vhost '~s'.",
+ [VHost]),
+ ok;
{error, {no_such_vhost, VHost}} ->
{error, {no_such_vhost, VHost}};
{error, Reason} ->
@@ -130,58 +143,54 @@ init_vhost(VHost) ->
end
end.
--spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}.
-vhost_sup(VHost, Node) ->
- case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of
+-type vhost_error() :: {no_such_vhost, rabbit_types:vhost()} |
+ {vhost_supervisor_not_running, rabbit_types:vhost()}.
+
+-spec get_vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, vhost_error() | term()}.
+get_vhost_sup(VHost, Node) ->
+ case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, get_vhost_sup, [VHost]) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
+ {error, Err} ->
+ {error, Err};
{badrpc, RpcErr} ->
{error, RpcErr}
end.
--spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}.
-vhost_sup(VHost) ->
- case vhost_sup_pid(VHost) of
- no_pid ->
- case start_vhost(VHost) of
- {ok, Pid} ->
- true = is_vhost_alive(VHost),
- {ok, Pid};
- {error, {no_such_vhost, VHost}} ->
- {error, {no_such_vhost, VHost}};
- Error ->
- throw(Error)
- end;
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid}
+-spec get_vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, vhost_error()}.
+get_vhost_sup(VHost) ->
+ case rabbit_vhost:exists(VHost) of
+ false ->
+ {error, {no_such_vhost, VHost}};
+ true ->
+ case vhost_sup_pid(VHost) of
+ no_pid ->
+ {error, {vhost_supervisor_not_running, VHost}};
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid}
+ end
end.
-spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}.
start_vhost(VHost, Node) ->
case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid};
- {badrpc, RpcErr} ->
- {error, RpcErr}
+ {ok, Pid} -> {ok, Pid};
+ {error, Err} -> {error, Err};
+ {badrpc, RpcErr} -> {error, RpcErr}
end.
-spec start_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
start_vhost(VHost) ->
case rabbit_vhost:exists(VHost) of
false -> {error, {no_such_vhost, VHost}};
- true ->
- case supervisor2:start_child(?MODULE, [VHost]) of
- {ok, Pid} -> {ok, Pid};
- {error, {already_started, Pid}} -> {ok, Pid};
- {error, Err} -> {error, Err}
- end
+ true -> supervisor2:start_child(?MODULE, [VHost])
end.
-spec is_vhost_alive(rabbit_types:vhost()) -> boolean().
is_vhost_alive(VHost) ->
%% A vhost is considered alive if it's supervision tree is alive and
%% saved in the ETS table
- case get_vhost_sup(VHost) of
+ case lookup_vhost_sup_record(VHost) of
#vhost_sup{wrapper_pid = WrapperPid,
vhost_sup_pid = VHostSupPid,
vhost_process_pid = VHostProcessPid}
@@ -210,8 +219,8 @@ save_vhost_process(VHost, VHostProcessPid) ->
{#vhost_sup.vhost_process_pid, VHostProcessPid}),
ok.
--spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}.
-get_vhost_sup(VHost) ->
+-spec lookup_vhost_sup_record(rabbit_types:vhost()) -> #vhost_sup{} | not_found.
+lookup_vhost_sup_record(VHost) ->
case ets:lookup(?MODULE, VHost) of
[] -> not_found;
[#vhost_sup{} = VHostSup] -> VHostSup
@@ -219,7 +228,7 @@ get_vhost_sup(VHost) ->
-spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}.
vhost_sup_pid(VHost) ->
- case get_vhost_sup(VHost) of
+ case lookup_vhost_sup_record(VHost) of
not_found ->
no_pid;
#vhost_sup{vhost_sup_pid = Pid} = VHostSup ->
diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl
index 8e23389bb9..4ae68cdd75 100644
--- a/src/rabbit_vhost_sup_wrapper.erl
+++ b/src/rabbit_vhost_sup_wrapper.erl
@@ -29,7 +29,12 @@
start_link(VHost) ->
%% Using supervisor, because supervisor2 does not stop a started child when
%% another one fails to start. Bug?
- supervisor:start_link(?MODULE, [VHost]).
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
+ {ok, Pid} ->
+ {error, {already_started, Pid}};
+ {error, _} ->
+ supervisor:start_link(?MODULE, [VHost])
+ end.
init([VHost]) ->
%% 2 restarts in 5 minutes. One per message store.