summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2017-08-01 03:44:22 +0300
committerMichael Klishin <michael@clojurewerkz.org>2017-08-01 03:44:22 +0300
commit1342b04a177207c66d562e4d33610efcd3eac216 (patch)
treecd584cbdce16720b82fd3cfb6ebc225d3d323cf4 /src
parent7bca93c7f931fbff8862077ea07b710004a4f12a (diff)
parentc58a15e7893bae019418c486d971046e879e0385 (diff)
downloadrabbitmq-server-git-1342b04a177207c66d562e4d33610efcd3eac216.tar.gz
Merge branch 'master' into rabbitmq-server-1314
Conflicts: test/dynamic_ha_SUITE.erl
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl16
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_amqqueue_sup_sup.erl6
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_connection_tracking_handler.erl19
-rw-r--r--src/rabbit_mirror_queue_misc.erl20
-rw-r--r--src/rabbit_recovery_terms.erl4
-rw-r--r--src/rabbit_vhost_limit.erl7
-rw-r--r--src/rabbit_vhost_msg_store.erl6
-rw-r--r--src/rabbit_vhost_process.erl1
-rw-r--r--src/rabbit_vhost_sup_sup.erl60
-rw-r--r--src/rabbit_vhost_sup_wrapper.erl7
12 files changed, 113 insertions, 48 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ff57593374..5537634144 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,6 +40,7 @@
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]).
-export([pid_of/1, pid_of/2]).
+-export([mark_local_durable_queues_stopped/1]).
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -255,6 +256,15 @@ start(Qs) ->
[Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs],
ok.
+mark_local_durable_queues_stopped(VHost) ->
+ Qs = find_durable_queues(VHost),
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ [ store_queue(Q#amqqueue{ state = stopped })
+ || Q = #amqqueue{ state = State } <- Qs,
+ State =/= stopped ]
+ end).
+
find_durable_queues(VHost) ->
Node = node(),
mnesia:async_dirty(
@@ -452,6 +462,9 @@ with(Name, F, E, RetriesLeft) ->
E({absent, Q, timeout});
{ok, Q = #amqqueue{state = crashed}} ->
E({absent, Q, crashed});
+ {ok, Q = #amqqueue{state = stopped}} ->
+ %% The queue process was stopped by the supervisor
+ E({absent, Q, stopped});
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
@@ -642,10 +655,13 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
+info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped);
info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
info(Q = #amqqueue{ state = crashed }, Items) ->
info_down(Q, Items, crashed);
+info(Q = #amqqueue{ state = stopped }, Items) ->
+ info_down(Q, Items, stopped);
info(#amqqueue{ pid = QPid }, Items) ->
case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of
{ok, Res} -> Res;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4e43104de2..678f1136c3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -265,9 +265,18 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
notify_decorators(startup, State3),
State3.
-terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+terminate(shutdown = R, State = #q{backing_queue = BQ, q = #amqqueue{ name = QName }}) ->
rabbit_core_metrics:queue_deleted(qname(State)),
- terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+ terminate_shutdown(
+ fun (BQS) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ [Q] = mnesia:read({rabbit_queue, QName}),
+ Q2 = Q#amqqueue{state = stopped},
+ rabbit_amqqueue:store_queue(Q2)
+ end),
+ BQ:terminate(R, BQS)
+ end, State);
terminate({shutdown, missing_owner} = Reason, State) ->
%% if the owner was missing then there will be no queue, so don't emit stats
terminate_shutdown(terminate_delete(false, Reason, State), State);
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index b5ef86255d..f0bcbd7c60 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -57,7 +57,7 @@ find_for_vhost(VHost) ->
-spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}.
find_for_vhost(VHost, Node) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node),
+ {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node),
case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of
[QSup] -> {ok, QSup};
Result -> {error, {queue_supervisor_not_found, Result}}
@@ -65,7 +65,7 @@ find_for_vhost(VHost, Node) ->
-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
start_for_vhost(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
supervisor2:start_child(
VHostSup,
@@ -82,7 +82,7 @@ start_for_vhost(VHost) ->
-spec stop_for_vhost(rabbit_types:vhost()) -> ok.
stop_for_vhost(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 00a6607dfb..c69a27d57c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2142,6 +2142,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
fun (not_found) -> {ok, 0};
({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username),
{ok, 0};
+ ({absent, Q, stopped}) -> rabbit_amqqueue:delete_crashed(Q, Username),
+ {ok, 0};
({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end) of
{error, in_use} ->
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index 3ae17677e0..ca13700da0 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -82,11 +82,14 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) ->
close_connections(rabbit_connection_tracking:list(VHost),
rabbit_misc:format("vhost '~s' is deleted", [VHost])),
{ok, State};
+%% Note: under normal circumstances this will be called immediately
+%% after the vhost_deleted above. Therefore we should be careful about
+%% what we log and be more defensive.
handle_event(#event{type = vhost_down, props = Details}, State) ->
VHost = pget(name, Details),
Node = pget(node, Details),
- rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'"
- " because the vhost database has stopped working",
+ rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'"
+ " because the vhost is stopping",
[VHost, Node]),
close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
rabbit_misc:format("vhost '~s' is down", [VHost])),
@@ -131,7 +134,17 @@ close_connections(Tracked, Message, Delay) ->
ok.
close_connection(#tracked_connection{pid = Pid, type = network}, Message) ->
- rabbit_networking:close_connection(Pid, Message);
+ try
+ rabbit_networking:close_connection(Pid, Message)
+ catch error:{not_a_connection, _} ->
+ %% could has been closed concurrently, or the input
+ %% is bogus. In any case, we should not terminate
+ ok;
+ _:Err ->
+ %% ignore, don't terminate
+ rabbit_log:warning("Could not close connection ~p: ~p", [Pid, Err]),
+ ok
+ end;
close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
%% Do an RPC call to the node running the direct client.
Node = node(Pid),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index dab98c740e..725cd7d089 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -231,11 +231,21 @@ add_mirror(QName, MirrorNode, SyncMode) ->
rabbit_misc:with_exit_handler(
rabbit_misc:const(ok),
fun () ->
- SPid = rabbit_amqqueue_sup_sup:start_queue_process(
- MirrorNode, Q, slave),
- log_info(QName, "Adding mirror on node ~p: ~p~n",
- [MirrorNode, SPid]),
- rabbit_mirror_queue_slave:go(SPid, SyncMode)
+ #amqqueue{name = #resource{virtual_host = VHost}} = Q,
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of
+ {ok, _} ->
+ SPid = rabbit_amqqueue_sup_sup:start_queue_process(
+ MirrorNode, Q, slave),
+ log_info(QName, "Adding mirror on node ~p: ~p~n",
+ [MirrorNode, SPid]),
+ rabbit_mirror_queue_slave:go(SPid, SyncMode);
+ {error, Error} ->
+ log_warning(QName,
+ "Unable to start queue mirror on node '~p'. "
+ "Target virtual host is not running: ~p~n",
+ [MirrorNode, Error]),
+ ok
+ end
end);
{error, not_found} = E ->
E
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index 73fc9c7449..b73f3add7c 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -48,7 +48,7 @@
%%----------------------------------------------------------------------------
start(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
{ok, _} = supervisor2:start_child(
VHostSup,
@@ -65,7 +65,7 @@ start(VHost) ->
ok.
stop(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
case supervisor:terminate_child(VHostSup, ?MODULE) of
ok -> supervisor:delete_child(VHostSup, ?MODULE);
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
index 7b797e46b2..9d8a6795b4 100644
--- a/src/rabbit_vhost_limit.erl
+++ b/src/rabbit_vhost_limit.erl
@@ -55,7 +55,12 @@ notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits, ActingUser) ->
notify_clear(VHost, <<"vhost-limits">>, <<"limits">>, ActingUser) ->
rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>},
{user_who_performed_action, ActingUser}]),
- update_vhost(VHost, undefined).
+ %% If the function is called as a part of vhost deletion, the vhost can
+ %% be already deleted.
+ case rabbit_vhost:exists(VHost) of
+ true -> update_vhost(VHost, undefined);
+ false -> ok
+ end.
connection_limit(VirtualHost) ->
get_limit(VirtualHost, <<"max-connections">>).
diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl
index 3c633875bc..b9af37c258 100644
--- a/src/rabbit_vhost_msg_store.erl
+++ b/src/rabbit_vhost_msg_store.erl
@@ -23,7 +23,7 @@
start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
ClientRefs == undefined ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
supervisor2:start_child(VHostSup,
@@ -39,7 +39,7 @@ start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
end.
stop(VHost, Type) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
ok = supervisor2:terminate_child(VHostSup, Type),
ok = supervisor2:delete_child(VHostSup, Type);
@@ -65,7 +65,7 @@ with_vhost_store(VHost, Type, Fun) ->
end.
vhost_store_pid(VHost, Type) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost),
case supervisor2:find_child(VHostSup, Type) of
[Pid] -> Pid;
[] -> no_pid
diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl
index e3c815a727..f6e4a83daa 100644
--- a/src/rabbit_vhost_process.erl
+++ b/src/rabbit_vhost_process.erl
@@ -55,6 +55,7 @@ init([VHost]) ->
timer:send_interval(Interval, check_vhost),
{ok, VHost}
catch _:Reason ->
+ rabbit_amqqueue:mark_local_durable_queues_stopped(VHost),
rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n"
" Stacktrace ~p",
[VHost, Reason, erlang:get_stacktrace()]),
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index 1d5db93fda..93c26d4e0f 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -23,7 +23,7 @@
-export([init/1]).
-export([start_link/0, start/0]).
--export([init_vhost/1, vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
+-export([init_vhost/1, get_vhost_sup/1, get_vhost_sup/2, save_vhost_sup/3]).
-export([delete_on_all_nodes/1]).
-export([start_on_all_nodes/1]).
@@ -72,7 +72,7 @@ delete_on_all_nodes(VHost) ->
ok.
stop_and_delete_vhost(VHost) ->
- case get_vhost_sup(VHost) of
+ StopResult = case lookup_vhost_sup_record(VHost) of
not_found -> ok;
#vhost_sup{wrapper_pid = WrapperPid,
vhost_sup_pid = VHostSupPid} ->
@@ -84,13 +84,15 @@ stop_and_delete_vhost(VHost) ->
[VHostSupPid, VHost]),
case supervisor2:terminate_child(?MODULE, WrapperPid) of
ok ->
- ets:delete(?MODULE, VHost),
- ok = rabbit_vhost:delete_storage(VHost);
+ true = ets:delete(?MODULE, VHost),
+ ok;
Other ->
Other
end
end
- end.
+ end,
+ ok = rabbit_vhost:delete_storage(VHost),
+ StopResult.
%% We take an optimistic approach whan stopping a remote VHost supervisor.
stop_and_delete_vhost(VHost, Node) when Node == node(self()) ->
@@ -106,7 +108,7 @@ stop_and_delete_vhost(VHost, Node) ->
{error, RpcErr}
end.
--spec init_vhost(rabbit_types:vhost()) -> ok.
+-spec init_vhost(rabbit_types:vhost()) -> ok | {error, {no_such_vhost, rabbit_types:vhsot()}}.
init_vhost(VHost) ->
case start_vhost(VHost) of
{ok, _} -> ok;
@@ -130,30 +132,32 @@ init_vhost(VHost) ->
end
end.
--spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}.
-vhost_sup(VHost, Node) ->
- case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of
+-type vhost_error() :: {no_such_vhost, rabbit_types:vhost()} |
+ {vhost_supervisor_not_running, rabbit_types:vhost()}.
+
+-spec get_vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, vhost_error() | term()}.
+get_vhost_sup(VHost, Node) ->
+ case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, get_vhost_sup, [VHost]) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
+ {error, Err} ->
+ {error, Err};
{badrpc, RpcErr} ->
{error, RpcErr}
end.
--spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}.
-vhost_sup(VHost) ->
- case vhost_sup_pid(VHost) of
- no_pid ->
- case start_vhost(VHost) of
- {ok, Pid} ->
- true = is_vhost_alive(VHost),
- {ok, Pid};
- {error, {no_such_vhost, VHost}} ->
- {error, {no_such_vhost, VHost}};
- Error ->
- throw(Error)
- end;
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid}
+-spec get_vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, vhost_error()}.
+get_vhost_sup(VHost) ->
+ case rabbit_vhost:exists(VHost) of
+ false ->
+ {error, {no_such_vhost, VHost}};
+ true ->
+ case vhost_sup_pid(VHost) of
+ no_pid ->
+ {error, {vhost_supervisor_not_running, VHost}};
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid}
+ end
end.
-spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}.
@@ -181,7 +185,7 @@ start_vhost(VHost) ->
is_vhost_alive(VHost) ->
%% A vhost is considered alive if it's supervision tree is alive and
%% saved in the ETS table
- case get_vhost_sup(VHost) of
+ case lookup_vhost_sup_record(VHost) of
#vhost_sup{wrapper_pid = WrapperPid,
vhost_sup_pid = VHostSupPid,
vhost_process_pid = VHostProcessPid}
@@ -210,8 +214,8 @@ save_vhost_process(VHost, VHostProcessPid) ->
{#vhost_sup.vhost_process_pid, VHostProcessPid}),
ok.
--spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}.
-get_vhost_sup(VHost) ->
+-spec lookup_vhost_sup_record(rabbit_types:vhost()) -> #vhost_sup{} | not_found.
+lookup_vhost_sup_record(VHost) ->
case ets:lookup(?MODULE, VHost) of
[] -> not_found;
[#vhost_sup{} = VHostSup] -> VHostSup
@@ -219,7 +223,7 @@ get_vhost_sup(VHost) ->
-spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}.
vhost_sup_pid(VHost) ->
- case get_vhost_sup(VHost) of
+ case lookup_vhost_sup_record(VHost) of
not_found ->
no_pid;
#vhost_sup{vhost_sup_pid = Pid} = VHostSup ->
diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl
index 8e23389bb9..4ae68cdd75 100644
--- a/src/rabbit_vhost_sup_wrapper.erl
+++ b/src/rabbit_vhost_sup_wrapper.erl
@@ -29,7 +29,12 @@
start_link(VHost) ->
%% Using supervisor, because supervisor2 does not stop a started child when
%% another one fails to start. Bug?
- supervisor:start_link(?MODULE, [VHost]).
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
+ {ok, Pid} ->
+ {error, {already_started, Pid}};
+ {error, _} ->
+ supervisor:start_link(?MODULE, [VHost])
+ end.
init([VHost]) ->
%% 2 restarts in 5 minutes. One per message store.