diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2017-08-09 14:41:01 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2017-08-09 14:41:01 +0100 |
| commit | ad9f941f95718e5cef9ee544d20a66e8c3b0b88c (patch) | |
| tree | 9b7ff7b39d4c6ff93fd65b31f4296eb84028db6a /src | |
| parent | e7a67da88293ebe5d8baf2f73eeb04d7d235a2dc (diff) | |
| parent | 70fabf3eae712ee5e13f5131b5d8443b7147203c (diff) | |
| download | rabbitmq-server-git-ad9f941f95718e5cef9ee544d20a66e8c3b0b88c.tar.gz | |
Merge branch 'master' into rabbitmq-cli-207
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 64 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_sup_sup.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking_handler.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 59 | ||||
| -rw-r--r-- | src/rabbit_recovery_terms.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_vhost_limit.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_vhost_msg_store.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_vhost_process.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_sup.erl | 97 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_wrapper.erl | 7 |
16 files changed, 258 insertions, 126 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index fd2f980455..0a0eb6b71a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -168,12 +168,6 @@ {requires, recovery}, {enables, routing_ready}]}). --rabbit_boot_step({mirrored_queues, - [{description, "adding mirrors to queues"}, - {mfa, {rabbit_mirror_queue_misc, on_node_up, []}}, - {requires, recovery}, - {enables, routing_ready}]}). - -rabbit_boot_step({routing_ready, [{description, "message delivery logic ready"}, {requires, core_initialized}]}). @@ -803,6 +797,16 @@ start(normal, []) -> warn_if_disc_io_options_dubious(), rabbit_boot_steps:run_boot_steps(), {ok, SupPid}; + {error, {erlang_version_too_old, + {found, OTPRel, ERTSVer}, + {required, ?OTP_MINIMUM, ?ERTS_MINIMUM}}} -> + Msg = "This RabbitMQ version cannot run on Erlang ~s (erts ~s): " + "minimum required version is ~s (erts ~s)", + Args = [OTPRel, ERTSVer, ?OTP_MINIMUM, ?ERTS_MINIMUM], + rabbit_log:error(Msg, Args), + %% also print to stderr to make this more visible + io:format(standard_error, "Error: " ++ Msg ++ "~n", Args), + {error, {erlang_version_too_old, rabbit_misc:format("Erlang ~s or later is required, started on ~s", [?OTP_MINIMUM, OTPRel])}}; Error -> Error end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f76566ffb9..2d85a9f04b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -41,6 +41,7 @@ -export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]). -export([pid_of/1, pid_of/2]). +-export([mark_local_durable_queues_stopped/1]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -256,6 +257,15 @@ start(Qs) -> [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs], ok. +mark_local_durable_queues_stopped(VHost) -> + Qs = find_durable_queues(VHost), + rabbit_misc:execute_mnesia_transaction( + fun() -> + [ store_queue(Q#amqqueue{ state = stopped }) + || Q = #amqqueue{ state = State } <- Qs, + State =/= stopped ] + end). + find_durable_queues(VHost) -> Node = node(), mnesia:async_dirty( @@ -330,11 +340,17 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args, {ok, Node0} -> Node0; {error, _} -> Node end, - Node1 = rabbit_mirror_queue_misc:initial_queue_node(Q, Node1), - gen_server2:call( - rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare), - {init, new}, infinity). + case rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node1) of + {ok, _} -> + gen_server2:call( + rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare), + {init, new}, infinity); + {error, Error} -> + rabbit_misc:protocol_error(internal_error, + "Cannot declare a queue '~s' on node '~s': ~255p", + [rabbit_misc:rs(QueueName), Node1, Error]) + end. internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -447,13 +463,28 @@ with(Name, F, E) -> with(Name, F, E, RetriesLeft) -> case lookup(Name) of - {ok, Q = #amqqueue{}} when RetriesLeft =:= 0 -> + {ok, Q = #amqqueue{state = live}} when RetriesLeft =:= 0 -> %% Something bad happened to that queue, we are bailing out %% on processing current request. E({absent, Q, timeout}); + {ok, Q = #amqqueue{state = stopped}} when RetriesLeft =:= 0 -> + %% The queue was stopped and not migrated + E({absent, Q, stopped}); + %% The queue process has crashed with unknown error {ok, Q = #amqqueue{state = crashed}} -> E({absent, Q, crashed}); - {ok, Q = #amqqueue{pid = QPid}} -> + %% The queue process has been stopped by a supervisor. + %% In that case a synchronised slave can take over + %% so we should retry. + {ok, Q = #amqqueue{state = stopped}} -> + %% The queue process was stopped by the supervisor + rabbit_misc:with_exit_handler( + fun () -> retry_wait(Q, F, E, RetriesLeft) end, + fun () -> F(Q) end); + %% The queue is supposed to be active. + %% The master node can go away or queue can be killed + %% so we retry, waiting for a slave to take over. + {ok, Q = #amqqueue{state = live}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do %% with the QPid. F() should be written s.t. that this @@ -461,14 +492,24 @@ with(Name, F, E, RetriesLeft) -> %% indicates a code bug and we don't want to get stuck in %% the retry loop. rabbit_misc:with_exit_handler( - fun () -> false = rabbit_mnesia:is_process_alive(QPid), - timer:sleep(30), - with(Name, F, E, RetriesLeft - 1) - end, fun () -> F(Q) end); + fun () -> retry_wait(Q, F, E, RetriesLeft) end, + fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) end. +retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, RetriesLeft) -> + case {QState, is_mirrored(Q)} of + %% We don't want to repeat an operation if + %% there are no slaves to migrate to + {stopped, false} -> + E({absent, Q, stopped}); + _ -> + false = rabbit_mnesia:is_process_alive(QPid), + timer:sleep(30), + with(Name, F, E, RetriesLeft - 1) + end. + with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). with_or_die(Name, F) -> @@ -655,10 +696,13 @@ is_unresponsive(#amqqueue{ pid = QPid }, Timeout) -> end. info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed); +info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped); info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}). info(Q = #amqqueue{ state = crashed }, Items) -> info_down(Q, Items, crashed); +info(Q = #amqqueue{ state = stopped }, Items) -> + info_down(Q, Items, stopped); info(#amqqueue{ pid = QPid }, Items) -> case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of {ok, Res} -> Res; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4e43104de2..678f1136c3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -265,9 +265,18 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, notify_decorators(startup, State3), State3. -terminate(shutdown = R, State = #q{backing_queue = BQ}) -> +terminate(shutdown = R, State = #q{backing_queue = BQ, q = #amqqueue{ name = QName }}) -> rabbit_core_metrics:queue_deleted(qname(State)), - terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); + terminate_shutdown( + fun (BQS) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + [Q] = mnesia:read({rabbit_queue, QName}), + Q2 = Q#amqqueue{state = stopped}, + rabbit_amqqueue:store_queue(Q2) + end), + BQ:terminate(R, BQS) + end, State); terminate({shutdown, missing_owner} = Reason, State) -> %% if the owner was missing then there will be no queue, so don't emit stats terminate_shutdown(terminate_delete(false, Reason, State), State); diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl index b5ef86255d..f0bcbd7c60 100644 --- a/src/rabbit_amqqueue_sup_sup.erl +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -57,7 +57,7 @@ find_for_vhost(VHost) -> -spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}. find_for_vhost(VHost, Node) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node), + {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node), case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of [QSup] -> {ok, QSup}; Result -> {error, {queue_supervisor_not_found, Result}} @@ -65,7 +65,7 @@ find_for_vhost(VHost, Node) -> -spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. start_for_vhost(VHost) -> - case rabbit_vhost_sup_sup:vhost_sup(VHost) of + case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of {ok, VHostSup} -> supervisor2:start_child( VHostSup, @@ -82,7 +82,7 @@ start_for_vhost(VHost) -> -spec stop_for_vhost(rabbit_types:vhost()) -> ok. stop_for_vhost(VHost) -> - case rabbit_vhost_sup_sup:vhost_sup(VHost) of + case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of {ok, VHostSup} -> ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup), ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 00a6607dfb..c69a27d57c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -2142,6 +2142,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin, fun (not_found) -> {ok, 0}; ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username), {ok, 0}; + ({absent, Q, stopped}) -> rabbit_amqqueue:delete_crashed(Q, Username), + {ok, 0}; ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) end) of {error, in_use} -> diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl index 3ae17677e0..ca13700da0 100644 --- a/src/rabbit_connection_tracking_handler.erl +++ b/src/rabbit_connection_tracking_handler.erl @@ -82,11 +82,14 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) -> close_connections(rabbit_connection_tracking:list(VHost), rabbit_misc:format("vhost '~s' is deleted", [VHost])), {ok, State}; +%% Note: under normal circumstances this will be called immediately +%% after the vhost_deleted above. Therefore we should be careful about +%% what we log and be more defensive. handle_event(#event{type = vhost_down, props = Details}, State) -> VHost = pget(name, Details), Node = pget(node, Details), - rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'" - " because the vhost database has stopped working", + rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'" + " because the vhost is stopping", [VHost, Node]), close_connections(rabbit_connection_tracking:list_on_node(Node, VHost), rabbit_misc:format("vhost '~s' is down", [VHost])), @@ -131,7 +134,17 @@ close_connections(Tracked, Message, Delay) -> ok. close_connection(#tracked_connection{pid = Pid, type = network}, Message) -> - rabbit_networking:close_connection(Pid, Message); + try + rabbit_networking:close_connection(Pid, Message) + catch error:{not_a_connection, _} -> + %% could has been closed concurrently, or the input + %% is bogus. In any case, we should not terminate + ok; + _:Err -> + %% ignore, don't terminate + rabbit_log:warning("Could not close connection ~p: ~p", [Pid, Err]), + ok + end; close_connection(#tracked_connection{pid = Pid, type = direct}, Message) -> %% Do an RPC call to the node running the direct client. Node = node(Pid), diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 59522da4a9..a6571defcb 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -17,7 +17,7 @@ -module(rabbit_mirror_queue_misc). -behaviour(rabbit_policy_validator). --export([remove_from_queue/3, on_node_up/0, add_mirrors/3, +-export([remove_from_queue/3, on_vhost_up/1, add_mirrors/3, report_deaths/4, store_updated_slaves/1, initial_queue_node/2, suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1, @@ -53,7 +53,6 @@ -spec remove_from_queue (rabbit_amqqueue:name(), pid(), [pid()]) -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}. --spec on_node_up() -> 'ok'. -spec add_mirrors(rabbit_amqqueue:name(), [node()], 'sync' | 'async') -> 'ok'. -spec store_updated_slaves(rabbit_types:amqqueue()) -> @@ -167,12 +166,16 @@ slaves_to_start_on_failure(Q, DeadGMPids) -> {_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes), NewNodes -- OldNodes. -on_node_up() -> +on_vhost_up(VHost) -> QNames = rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:foldl( - fun (Q = #amqqueue{name = QName, + fun + (#amqqueue{name = #resource{virtual_host = OtherVhost}}, + QNames0) when OtherVhost =/= VHost -> + QNames0; + (Q = #amqqueue{name = QName, pid = Pid, slave_pids = SPids}, QNames0) -> %% We don't want to pass in the whole @@ -228,11 +231,21 @@ add_mirror(QName, MirrorNode, SyncMode) -> rabbit_misc:with_exit_handler( rabbit_misc:const(ok), fun () -> - SPid = rabbit_amqqueue_sup_sup:start_queue_process( - MirrorNode, Q, slave), - log_info(QName, "Adding mirror on node ~p: ~p~n", - [MirrorNode, SPid]), - rabbit_mirror_queue_slave:go(SPid, SyncMode) + #amqqueue{name = #resource{virtual_host = VHost}} = Q, + case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of + {ok, _} -> + SPid = rabbit_amqqueue_sup_sup:start_queue_process( + MirrorNode, Q, slave), + log_info(QName, "Adding mirror on node ~p: ~p~n", + [MirrorNode, SPid]), + rabbit_mirror_queue_slave:go(SPid, SyncMode); + {error, Error} -> + log_warning(QName, + "Unable to start queue mirror on node '~p'. " + "Target virtual host is not running: ~p~n", + [MirrorNode, Error]), + ok + end end); {error, not_found} = E -> E @@ -249,12 +262,12 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> rabbit_misc:pid_to_string(MirrorPid), [[$ , rabbit_misc:pid_to_string(P)] || P <- DeadPids]]). -log_info (QName, Fmt, Args) -> log(info, QName, Fmt, Args). -log_warning(QName, Fmt, Args) -> log(warning, QName, Fmt, Args). - -log(Level, QName, Fmt, Args) -> - rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt, - [rabbit_misc:rs(QName) | Args]). +log_info (QName, Fmt, Args) -> + rabbit_log_mirroring:info("Mirrored ~s: " ++ Fmt, + [rabbit_misc:rs(QName) | Args]). +log_warning(QName, Fmt, Args) -> + rabbit_log_mirroring:warning("Mirrored ~s: " ++ Fmt, + [rabbit_misc:rs(QName) | Args]). store_updated_slaves(Q = #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 77914a00bf..6e2ed2a889 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -279,18 +279,20 @@ socket_error(Reason) when is_atom(Reason) -> rabbit_log_connection:error("Error on AMQP connection ~p: ~s~n", [self(), rabbit_misc:format_inet_error(Reason)]); socket_error(Reason) -> - Level = - case Reason of - {ssl_upgrade_error, closed} -> - %% The socket was closed while upgrading to SSL. - %% This is presumably a TCP healthcheck, so don't log - %% it unless specified otherwise. - debug; - _ -> - error - end, - rabbit_log:log(rabbit_log_connection, Level, - "Error on AMQP connection ~p:~n~p~n", [self(), Reason]). + Fmt = "Error on AMQP connection ~p:~n~p~n", + Args = [self(), Reason], + case Reason of + %% The socket was closed while upgrading to SSL. + %% This is presumably a TCP healthcheck, so don't log + %% it unless specified otherwise. + {ssl_upgrade_error, closed} -> + %% Lager sinks (rabbit_log_connection) + %% are handled by the lager parse_transform. + %% Hence have to define the loglevel as a function call. + rabbit_log_connection:debug(Fmt, Args); + _ -> + rabbit_log_connection:error(Fmt, Args) + end. inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). @@ -402,31 +404,41 @@ log_connection_exception(Name, Ex) -> log_connection_exception(Severity, Name, {heartbeat_timeout, TimeoutSec}) -> %% Long line to avoid extra spaces and line breaks in log - rabbit_log:log(rabbit_log_connection, Severity, + log_connection_exception_with_severity(Severity, "closing AMQP connection ~p (~s):~n" "missed heartbeats from client, timeout: ~ps~n", [self(), Name, TimeoutSec]); log_connection_exception(Severity, Name, {connection_closed_abruptly, #v1{connection = #connection{user = #user{username = Username}, vhost = VHost}}}) -> - rabbit_log:log(rabbit_log_connection, Severity, "closing AMQP connection ~p (~s, vhost: '~s', user: '~s'):~nclient unexpectedly closed TCP connection~n", + log_connection_exception_with_severity(Severity, + "closing AMQP connection ~p (~s, vhost: '~s', user: '~s'):~nclient unexpectedly closed TCP connection~n", [self(), Name, VHost, Username]); %% when client abruptly closes connection before connection.open/authentication/authorization %% succeeded, don't log username and vhost as 'none' log_connection_exception(Severity, Name, {connection_closed_abruptly, _}) -> - rabbit_log:log(rabbit_log_connection, Severity, "closing AMQP connection ~p (~s):~nclient unexpectedly closed TCP connection~n", + log_connection_exception_with_severity(Severity, + "closing AMQP connection ~p (~s):~nclient unexpectedly closed TCP connection~n", [self(), Name]); %% old exception structure log_connection_exception(Severity, Name, connection_closed_abruptly) -> - rabbit_log:log(rabbit_log_connection, Severity, + log_connection_exception_with_severity(Severity, "closing AMQP connection ~p (~s):~n" "client unexpectedly closed TCP connection~n", [self(), Name]); log_connection_exception(Severity, Name, Ex) -> - rabbit_log:log(rabbit_log_connection, Severity, + log_connection_exception_with_severity(Severity, "closing AMQP connection ~p (~s):~n~p~n", [self(), Name, Ex]). +log_connection_exception_with_severity(Severity, Fmt, Args) -> + case Severity of + debug -> rabbit_log_connection:debug(Fmt, Args); + info -> rabbit_log_connection:info(Fmt, Args); + warning -> rabbit_log_connection:warning(Fmt, Args); + error -> rabbit_log_connection:warning(Fmt, Args) + end. + run({M, F, A}) -> try apply(M, F, A) catch {become, MFA} -> run(MFA) @@ -475,13 +487,12 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock, %% %% The goal is to not log TCP healthchecks (a connection %% with no data received) unless specified otherwise. - Level = case Recv of - closed -> debug; - _ -> info - end, - rabbit_log:log(rabbit_log_connection, Level, - "accepting AMQP connection ~p (~s)~n", - [self(), ConnName]); + Fmt = "accepting AMQP connection ~p (~s)~n", + Args = [self(), ConnName], + case Recv of + closed -> rabbit_log_connection:debug(Fmt, Args); + _ -> rabbit_log_connection:info(Fmt, Args) + end; _ -> ok end, diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index 73fc9c7449..b73f3add7c 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -48,7 +48,7 @@ %%---------------------------------------------------------------------------- start(VHost) -> - case rabbit_vhost_sup_sup:vhost_sup(VHost) of + case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of {ok, VHostSup} -> {ok, _} = supervisor2:start_child( VHostSup, @@ -65,7 +65,7 @@ start(VHost) -> ok. stop(VHost) -> - case rabbit_vhost_sup_sup:vhost_sup(VHost) of + case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of {ok, VHostSup} -> case supervisor:terminate_child(VHostSup, ?MODULE) of ok -> supervisor:delete_child(VHostSup, ?MODULE); diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b4945fe3d3..4d61bb4b03 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -2766,9 +2766,15 @@ transform_store(Store, TransformFun) -> move_messages_to_vhost_store() -> case list_persistent_queues() of - % [] -> ok; - Queues -> move_messages_to_vhost_store(Queues) - end. + [] -> + log_upgrade("No durable queues found." + " Skipping message store migration"), + ok; + Queues -> + move_messages_to_vhost_store(Queues) + end, + ok = delete_old_store(), + ok = rabbit_queue_index:cleanup_global_recovery_terms(). move_messages_to_vhost_store(Queues) -> log_upgrade("Moving messages to per-vhost message store"), @@ -2802,8 +2808,7 @@ move_messages_to_vhost_store(Queues) -> "message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"), log_upgrade("Message store migration finished"), - ok = delete_old_store(OldStore), - ok = rabbit_queue_index:cleanup_global_recovery_terms(), + ok = rabbit_sup:stop_child(OldStore), [ok= rabbit_recovery_terms:close_table(VHost) || VHost <- VHosts], ok = stop_new_store(NewMsgStore). @@ -2936,8 +2941,8 @@ stop_new_store(NewStore) -> NewStore), ok. -delete_old_store(OldStore) -> - ok = rabbit_sup:stop_child(OldStore), +delete_old_store() -> + log_upgrade("Removing the old message store data"), rabbit_file:recursive_delete( [filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]), %% Delete old transient store as well diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 30557fc7be..73c05389be 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -71,11 +71,13 @@ recover(VHost) -> ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), [QName || #amqqueue{name = QName} <- Qs]), ok = rabbit_amqqueue:start(Qs), + %% Start queue mirrors. + ok = rabbit_mirror_queue_misc:on_vhost_up(VHost), ok. %%---------------------------------------------------------------------------- --define(INFO_KEYS, [name, tracing, state]). +-define(INFO_KEYS, [name, tracing, cluster_state]). add(VHostPath, ActingUser) -> rabbit_log:info("Adding vhost '~s'~n", [VHostPath]), @@ -261,10 +263,19 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. i(name, VHost) -> VHost; i(tracing, VHost) -> rabbit_trace:enabled(VHost); -i(state, VHost) -> case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of - true -> running; - false -> down - end; +i(cluster_state, VHost) -> + Nodes = rabbit_nodes:all_running(), + lists:map(fun(Node) -> + State = case rabbit_misc:rpc_call(Node, + rabbit_vhost_sup_sup, is_vhost_alive, + [VHost]) of + {badrpc, nodedown} -> nodedown; + true -> running; + false -> stopped + end, + {Node, State} + end, + Nodes); i(Item, _) -> throw({bad_argument, Item}). info(VHost) -> infos(?INFO_KEYS, VHost). diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl index 7b797e46b2..9d8a6795b4 100644 --- a/src/rabbit_vhost_limit.erl +++ b/src/rabbit_vhost_limit.erl @@ -55,7 +55,12 @@ notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits, ActingUser) -> notify_clear(VHost, <<"vhost-limits">>, <<"limits">>, ActingUser) -> rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>}, {user_who_performed_action, ActingUser}]), - update_vhost(VHost, undefined). + %% If the function is called as a part of vhost deletion, the vhost can + %% be already deleted. + case rabbit_vhost:exists(VHost) of + true -> update_vhost(VHost, undefined); + false -> ok + end. connection_limit(VirtualHost) -> get_limit(VirtualHost, <<"max-connections">>). diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl index 3c633875bc..b9af37c258 100644 --- a/src/rabbit_vhost_msg_store.erl +++ b/src/rabbit_vhost_msg_store.erl @@ -23,7 +23,7 @@ start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs); ClientRefs == undefined -> - case rabbit_vhost_sup_sup:vhost_sup(VHost) of + case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of {ok, VHostSup} -> VHostDir = rabbit_vhost:msg_store_dir_path(VHost), supervisor2:start_child(VHostSup, @@ -39,7 +39,7 @@ start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs); end. stop(VHost, Type) -> - case rabbit_vhost_sup_sup:vhost_sup(VHost) of + case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of {ok, VHostSup} -> ok = supervisor2:terminate_child(VHostSup, Type), ok = supervisor2:delete_child(VHostSup, Type); @@ -65,7 +65,7 @@ with_vhost_store(VHost, Type, Fun) -> end. vhost_store_pid(VHost, Type) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost), case supervisor2:find_child(VHostSup, Type) of [Pid] -> Pid; [] -> no_pid diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl index e3c815a727..f6e4a83daa 100644 --- a/src/rabbit_vhost_process.erl +++ b/src/rabbit_vhost_process.erl @@ -55,6 +55,7 @@ init([VHost]) -> timer:send_interval(Interval, check_vhost), {ok, VHost} catch _:Reason -> + rabbit_amqqueue:mark_local_durable_queues_stopped(VHost), rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n" " Stacktrace ~p", [VHost, Reason, erlang:get_stacktrace()]), diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl index 1d5db93fda..19d7cf61b7 100644 --- a/src/rabbit_vhost_sup_sup.erl +++ b/src/rabbit_vhost_sup_sup.erl @@ -23,15 +23,16 @@ -export([init/1]). -export([start_link/0, start/0]). --export([init_vhost/1, vhost_sup/1, vhost_sup/2, save_vhost_sup/3]). --export([delete_on_all_nodes/1]). --export([start_on_all_nodes/1]). - --export([save_vhost_process/2]). +-export([init_vhost/1, + start_vhost/1, start_vhost/2, + get_vhost_sup/1, get_vhost_sup/2, + save_vhost_sup/3, + save_vhost_process/2]). +-export([delete_on_all_nodes/1, start_on_all_nodes/1]). -export([is_vhost_alive/1]). %% Internal --export([stop_and_delete_vhost/1, start_vhost/1]). +-export([stop_and_delete_vhost/1]). -record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}). @@ -61,7 +62,12 @@ init([]) -> start_on_all_nodes(VHost) -> NodesStart = [ {Node, start_vhost(VHost, Node)} || Node <- rabbit_nodes:all_running() ], - Failures = lists:filter(fun({_, {ok, _}}) -> false; (_) -> true end, NodesStart), + Failures = lists:filter(fun + ({_, {ok, _}}) -> false; + ({_, {error, {already_started, _}}}) -> false; + (_) -> true + end, + NodesStart), case Failures of [] -> ok; Errors -> {error, {failed_to_start_vhost_on_nodes, Errors}} @@ -72,7 +78,7 @@ delete_on_all_nodes(VHost) -> ok. stop_and_delete_vhost(VHost) -> - case get_vhost_sup(VHost) of + StopResult = case lookup_vhost_sup_record(VHost) of not_found -> ok; #vhost_sup{wrapper_pid = WrapperPid, vhost_sup_pid = VHostSupPid} -> @@ -84,13 +90,15 @@ stop_and_delete_vhost(VHost) -> [VHostSupPid, VHost]), case supervisor2:terminate_child(?MODULE, WrapperPid) of ok -> - ets:delete(?MODULE, VHost), - ok = rabbit_vhost:delete_storage(VHost); + true = ets:delete(?MODULE, VHost), + ok; Other -> Other end end - end. + end, + ok = rabbit_vhost:delete_storage(VHost), + StopResult. %% We take an optimistic approach whan stopping a remote VHost supervisor. stop_and_delete_vhost(VHost, Node) when Node == node(self()) -> @@ -106,10 +114,15 @@ stop_and_delete_vhost(VHost, Node) -> {error, RpcErr} end. --spec init_vhost(rabbit_types:vhost()) -> ok. +-spec init_vhost(rabbit_types:vhost()) -> ok | {error, {no_such_vhost, rabbit_types:vhsot()}}. init_vhost(VHost) -> case start_vhost(VHost) of {ok, _} -> ok; + {error, {already_started, _}} -> + rabbit_log:warning( + "Attempting to start an already started vhost '~s'.", + [VHost]), + ok; {error, {no_such_vhost, VHost}} -> {error, {no_such_vhost, VHost}}; {error, Reason} -> @@ -130,58 +143,54 @@ init_vhost(VHost) -> end end. --spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}. -vhost_sup(VHost, Node) -> - case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of +-type vhost_error() :: {no_such_vhost, rabbit_types:vhost()} | + {vhost_supervisor_not_running, rabbit_types:vhost()}. + +-spec get_vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, vhost_error() | term()}. +get_vhost_sup(VHost, Node) -> + case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, get_vhost_sup, [VHost]) of {ok, Pid} when is_pid(Pid) -> {ok, Pid}; + {error, Err} -> + {error, Err}; {badrpc, RpcErr} -> {error, RpcErr} end. --spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}. -vhost_sup(VHost) -> - case vhost_sup_pid(VHost) of - no_pid -> - case start_vhost(VHost) of - {ok, Pid} -> - true = is_vhost_alive(VHost), - {ok, Pid}; - {error, {no_such_vhost, VHost}} -> - {error, {no_such_vhost, VHost}}; - Error -> - throw(Error) - end; - {ok, Pid} when is_pid(Pid) -> - {ok, Pid} +-spec get_vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, vhost_error()}. +get_vhost_sup(VHost) -> + case rabbit_vhost:exists(VHost) of + false -> + {error, {no_such_vhost, VHost}}; + true -> + case vhost_sup_pid(VHost) of + no_pid -> + {error, {vhost_supervisor_not_running, VHost}}; + {ok, Pid} when is_pid(Pid) -> + {ok, Pid} + end end. -spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}. start_vhost(VHost, Node) -> case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of - {ok, Pid} when is_pid(Pid) -> - {ok, Pid}; - {badrpc, RpcErr} -> - {error, RpcErr} + {ok, Pid} -> {ok, Pid}; + {error, Err} -> {error, Err}; + {badrpc, RpcErr} -> {error, RpcErr} end. -spec start_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. start_vhost(VHost) -> case rabbit_vhost:exists(VHost) of false -> {error, {no_such_vhost, VHost}}; - true -> - case supervisor2:start_child(?MODULE, [VHost]) of - {ok, Pid} -> {ok, Pid}; - {error, {already_started, Pid}} -> {ok, Pid}; - {error, Err} -> {error, Err} - end + true -> supervisor2:start_child(?MODULE, [VHost]) end. -spec is_vhost_alive(rabbit_types:vhost()) -> boolean(). is_vhost_alive(VHost) -> %% A vhost is considered alive if it's supervision tree is alive and %% saved in the ETS table - case get_vhost_sup(VHost) of + case lookup_vhost_sup_record(VHost) of #vhost_sup{wrapper_pid = WrapperPid, vhost_sup_pid = VHostSupPid, vhost_process_pid = VHostProcessPid} @@ -210,8 +219,8 @@ save_vhost_process(VHost, VHostProcessPid) -> {#vhost_sup.vhost_process_pid, VHostProcessPid}), ok. --spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}. -get_vhost_sup(VHost) -> +-spec lookup_vhost_sup_record(rabbit_types:vhost()) -> #vhost_sup{} | not_found. +lookup_vhost_sup_record(VHost) -> case ets:lookup(?MODULE, VHost) of [] -> not_found; [#vhost_sup{} = VHostSup] -> VHostSup @@ -219,7 +228,7 @@ get_vhost_sup(VHost) -> -spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}. vhost_sup_pid(VHost) -> - case get_vhost_sup(VHost) of + case lookup_vhost_sup_record(VHost) of not_found -> no_pid; #vhost_sup{vhost_sup_pid = Pid} = VHostSup -> diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl index 8e23389bb9..4ae68cdd75 100644 --- a/src/rabbit_vhost_sup_wrapper.erl +++ b/src/rabbit_vhost_sup_wrapper.erl @@ -29,7 +29,12 @@ start_link(VHost) -> %% Using supervisor, because supervisor2 does not stop a started child when %% another one fails to start. Bug? - supervisor:start_link(?MODULE, [VHost]). + case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of + {ok, Pid} -> + {error, {already_started, Pid}}; + {error, _} -> + supervisor:start_link(?MODULE, [VHost]) + end. init([VHost]) -> %% 2 restarts in 5 minutes. One per message store. |
