diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2017-06-14 10:18:52 +0200 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2017-06-14 10:18:52 +0200 |
| commit | 32b9c88d1eba112d721c720dd1c80c981b1e5972 (patch) | |
| tree | ef3b9902e6b4dd0a0e617c3a6675add178c85439 /src | |
| parent | b077d47e7596973b9b8ec01d48233f42a0eb097e (diff) | |
| parent | 41a396e26e3d5fea88ee3e9e375a2ef791bf1b21 (diff) | |
| download | rabbitmq-server-git-32b9c88d1eba112d721c720dd1c80c981b1e5972.tar.gz | |
Merge branch 'master' into rabbitmq-server-1229
Diffstat (limited to 'src')
| -rw-r--r-- | src/background_gc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit.erl | 79 | ||||
| -rw-r--r-- | src/rabbit_config.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_peer_discovery.erl | 55 | ||||
| -rw-r--r-- | src/rabbit_peer_discovery_classic_config.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_peer_discovery_dns.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
| -rw-r--r-- | src/term_to_binary_compat.erl | 32 | ||||
| -rw-r--r-- | src/vm_memory_monitor.erl | 21 |
11 files changed, 231 insertions, 70 deletions
diff --git a/src/background_gc.erl b/src/background_gc.erl index 2ae9d93e4e..bbac3138cf 100644 --- a/src/background_gc.erl +++ b/src/background_gc.erl @@ -74,7 +74,7 @@ interval_gc(State = #state{last_interval = LastInterval}) -> State#state{last_interval = Interval}. gc() -> - Enabled = rabbit_misc:get_env(rabbit, background_gc_enabled, true), + Enabled = rabbit_misc:get_env(rabbit, background_gc_enabled, false), case Enabled of true -> [garbage_collect(P) || P <- processes(), diff --git a/src/rabbit.erl b/src/rabbit.erl index 5ae58f57d1..138d03f051 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -19,11 +19,12 @@ -behaviour(application). -export([start/0, boot/0, stop/0, - stop_and_halt/0, await_startup/0, status/0, is_running/0, alarms/0, + stop_and_halt/0, await_startup/0, await_startup/1, + status/0, is_running/0, alarms/0, is_running/1, environment/0, rotate_logs/0, force_event_refresh/1, start_fhc/0]). -export([start/2, stop/1, prep_stop/1]). --export([start_apps/1, stop_apps/1]). +-export([start_apps/1, start_apps/2, stop_apps/1]). -export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent -ifdef(TEST). @@ -266,6 +267,8 @@ -spec boot_delegate() -> 'ok'. -spec recover() -> 'ok'. -spec start_apps([app_name()]) -> 'ok'. +-spec start_apps([app_name()], + #{app_name() => permanent|transient|temporary}) -> 'ok'. -spec stop_apps([app_name()]) -> 'ok'. %%---------------------------------------------------------------------------- @@ -327,11 +330,7 @@ broker_start() -> ToBeLoaded = Plugins ++ ?APPS, start_apps(ToBeLoaded), maybe_sd_notify(), - ok = log_broker_started(rabbit_plugins:strictly_plugins(rabbit_plugins:active())), - %% See rabbitmq/rabbitmq-server#1202 for details. - rabbit_peer_discovery:maybe_inject_randomized_delay(), - rabbit_peer_discovery:maybe_register(), - ok. + ok = log_broker_started(rabbit_plugins:strictly_plugins(rabbit_plugins:active())). %% Try to send systemd ready notification if it makes sense in the %% current environment. standard_error is used intentionally in all @@ -471,7 +470,7 @@ stop() -> undefined -> ok; _ -> rabbit_log:info("RabbitMQ hasn't finished starting yet. Waiting for startup to finish before stopping..."), - wait_for_boot_to_finish() + ok = wait_for_boot_to_finish(node()) end, rabbit_log:info("RabbitMQ is asked to stop...~n", []), Apps = ?APPS ++ rabbit_plugins:active(), @@ -506,6 +505,9 @@ stop_and_halt() -> ok. start_apps(Apps) -> + start_apps(Apps, #{}). + +start_apps(Apps, AppModes) -> app_utils:load_applications(Apps), ConfigEntryDecoder = case application:get_env(rabbit, config_entry_decoder) of @@ -545,7 +547,8 @@ start_apps(Apps) -> true -> ok %% will run during start of rabbit app end, ok = app_utils:start_applications(OrderedApps, - handle_app_error(could_not_start)). + handle_app_error(could_not_start), + AppModes). %% This function retrieves the correct IoDevice for requesting %% input. The problem with using the default IoDevice is that @@ -621,7 +624,7 @@ decrypt_list([Value|Tail], Algo, Acc) -> stop_apps(Apps) -> rabbit_log:info( - lists:flatten(["Stopping RabbitMQ applications and their dependencies in the following order: ~n", + lists:flatten(["Stopping RabbitMQ applications and their dependencies in the following order:~n", [" ~p~n" || _ <- Apps]]), lists:reverse(Apps)), ok = app_utils:stop_applications( @@ -641,32 +644,52 @@ handle_app_error(Term) -> end. await_startup() -> - case is_booting() of - true -> wait_for_boot_to_finish(); + await_startup(node()). + +await_startup(Node) -> + case is_booting(Node) of + true -> wait_for_boot_to_finish(Node); false -> - case is_running() of + case is_running(Node) of true -> ok; - false -> wait_for_boot_to_start(), - wait_for_boot_to_finish() + false -> wait_for_boot_to_start(Node), + wait_for_boot_to_finish(Node) end end. -is_booting() -> - whereis(rabbit_boot) /= undefined. +is_booting(Node) -> + case rpc:call(Node, erlang, whereis, [rabbit_boot]) of + {badrpc, _} = Err -> Err; + undefined -> false; + P when is_pid(P) -> true + end. -wait_for_boot_to_start() -> - case whereis(rabbit_boot) of - undefined -> timer:sleep(100), - wait_for_boot_to_start(); - _ -> ok +wait_for_boot_to_start(Node) -> + case is_booting(Node) of + false -> + timer:sleep(100), + wait_for_boot_to_start(Node); + {badrpc, _} = Err -> + Err; + true -> + ok end. -wait_for_boot_to_finish() -> - case whereis(rabbit_boot) of - undefined -> true = is_running(), - ok; - _ -> timer:sleep(100), - wait_for_boot_to_finish() +wait_for_boot_to_finish(Node) -> + case is_booting(Node) of + false -> + %% We don't want badrpc error to be interpreted as false, + %% so we don't call rabbit:is_running(Node) + case rpc:call(Node, rabbit, is_running, []) of + true -> ok; + false -> {error, rabbit_is_not_running}; + {badrpc, _} = Err -> Err + end; + {badrpc, _} = Err -> + Err; + true -> + timer:sleep(100), + wait_for_boot_to_finish(Node) end. status() -> diff --git a/src/rabbit_config.erl b/src/rabbit_config.erl index 9e70898c82..a3f7d19136 100644 --- a/src/rabbit_config.erl +++ b/src/rabbit_config.erl @@ -99,7 +99,6 @@ generate_config_file(ConfFiles, ConfDir, ScriptDir) -> generate_config_file(ConfFiles, ConfDir, ScriptDir, SchemaDir, Advanced) -> prepare_plugin_schemas(SchemaDir), - % SchemaFile = filename:join([ScriptDir, "rabbitmq.schema"]), Cuttlefish = filename:join([ScriptDir, "cuttlefish"]), GeneratedDir = filename:join([ConfDir, "generated"]), diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index e49ea6dfb6..a0363b69d0 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -46,6 +46,11 @@ %% Used internally in rpc calls -export([node_info/0, remove_node_if_mnesia_running/1]). +-ifdef(TEST). +-compile(export_all). +-export([init_with_lock/3]). +-endif. + -include("rabbit.hrl"). %%---------------------------------------------------------------------------- @@ -101,12 +106,13 @@ init() -> rabbit_log:info("Node database directory at ~s is empty. " "Assuming we need to join an existing cluster or initialise from scratch...~n", [dir()]), - rabbit_peer_discovery:log_configured_backend(), - init_from_config(); + rabbit_peer_discovery:log_configured_backend(), + init_with_lock(); false -> NodeType = node_type(), init_db_and_upgrade(cluster_nodes(all), NodeType, - NodeType =:= ram, _Retry = true) + NodeType =:= ram, _Retry = true), + rabbit_peer_discovery:maybe_register() end, %% We intuitively expect the global name server to be synced when %% Mnesia is up. In fact that's not guaranteed to be the case - @@ -114,13 +120,43 @@ init() -> ok = rabbit_node_monitor:global_sync(), ok. +init_with_lock() -> + {Retries, Timeout} = rabbit_peer_discovery:retry_timeout(), + init_with_lock(Retries, Timeout, fun init_from_config/0). + +init_with_lock(0, _, InitFromConfig) -> + case rabbit_peer_discovery:lock_acquisition_failure_mode() of + ignore -> + rabbit_log:warning("Cannot acquire a lock during clustering", []), + InitFromConfig(), + rabbit_peer_discovery:maybe_register(); + fail -> + exit(cannot_acquire_startup_lock) + end; +init_with_lock(Retries, Timeout, InitFromConfig) -> + case rabbit_peer_discovery:lock() of + not_supported -> + %% See rabbitmq/rabbitmq-server#1202 for details. + rabbit_peer_discovery:maybe_inject_randomized_delay(), + InitFromConfig(), + rabbit_peer_discovery:maybe_register(); + {error, _Reason} -> + timer:sleep(Timeout), + init_with_lock(Retries - 1, Timeout, InitFromConfig); + {ok, Data} -> + try + InitFromConfig(), + rabbit_peer_discovery:maybe_register() + after + rabbit_peer_discovery:unlock(Data) + end + end. + init_from_config() -> FindBadNodeNames = fun (Name, BadNames) when is_atom(Name) -> BadNames; (Name, BadNames) -> [Name | BadNames] end, - %% See rabbitmq/rabbitmq-server#1202 for details. - rabbit_peer_discovery:maybe_inject_randomized_delay(), {DiscoveredNodes, NodeType} = case rabbit_peer_discovery:discover_cluster_nodes() of {ok, {Nodes, Type} = Config} diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 0eadf0ff59..810df2d1fc 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -344,8 +344,8 @@ init([]) -> Nodes = possibly_partitioned_nodes(), startup_log(Nodes), Monitors = lists:foldl(fun(Node, Monitors0) -> - pmon:monitor({rabbit, Node}, Monitors0) - end, pmon:new(), Nodes), + pmon:monitor({rabbit, Node}, Monitors0) + end, pmon:new(), Nodes), {ok, ensure_keepalive_timer(#state{monitors = Monitors, subscribers = pmon:new(), partitions = [], @@ -420,12 +420,12 @@ handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID}, fun () -> case rpc:call(Node, rabbit, is_running, []) of {badrpc, _} -> ok; - _ -> - rabbit_log:warning("Received a 'DOWN' message" - " from ~p but still can" - " communicate with it ~n", - [Node]), - cast(Rep, {partial_partition, + _ -> + rabbit_log:warning("Received a 'DOWN' message" + " from ~p but still can" + " communicate with it ~n", + [Node]), + cast(Rep, {partial_partition, Node, node(), RepGUID}) end end); @@ -499,18 +499,18 @@ handle_cast({node_up, Node, NodeType}, rabbit_log:info("rabbit on node ~p up~n", [Node]), {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), write_cluster_status({add_node(Node, AllNodes), - case NodeType of - disc -> add_node(Node, DiscNodes); - ram -> DiscNodes - end, - add_node(Node, RunningNodes)}), + case NodeType of + disc -> add_node(Node, DiscNodes); + ram -> DiscNodes + end, + add_node(Node, RunningNodes)}), ok = handle_live_rabbit(Node), Monitors1 = case pmon:is_monitored({rabbit, Node}, Monitors) of - true -> - Monitors; - false -> - pmon:monitor({rabbit, Node}, Monitors) - end, + true -> + Monitors; + false -> + pmon:monitor({rabbit, Node}, Monitors) + end, {noreply, maybe_autoheal(State#state{monitors = Monitors1})}; handle_cast({joined_cluster, Node, NodeType}, State) -> @@ -584,7 +584,7 @@ handle_info({mnesia_system_event, State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of true -> State; false -> State#state{ - monitors = pmon:monitor({rabbit, Node}, Monitors)} + monitors = pmon:monitor({rabbit, Node}, Monitors)} end, ok = handle_live_rabbit(Node), Partitions1 = lists:usort([Node | Partitions]), @@ -893,4 +893,4 @@ startup_log([]) -> rabbit_log:info("Starting rabbit_node_monitor~n", []); startup_log(Nodes) -> rabbit_log:info("Starting rabbit_node_monitor, might be partitioned from ~p~n", - [Nodes]). + [Nodes]). diff --git a/src/rabbit_peer_discovery.erl b/src/rabbit_peer_discovery.erl index 628b9f101f..1abf2b7cc7 100644 --- a/src/rabbit_peer_discovery.erl +++ b/src/rabbit_peer_discovery.erl @@ -23,8 +23,9 @@ -export([discover_cluster_nodes/0, backend/0, node_type/0, normalize/1, format_discovered_nodes/1, log_configured_backend/0, register/0, unregister/0, maybe_register/0, maybe_unregister/0, - maybe_inject_randomized_delay/0]). --export([append_node_prefix/1, node_prefix/0]). + maybe_inject_randomized_delay/0, lock/0, unlock/1]). +-export([append_node_prefix/1, node_prefix/0, retry_timeout/0, + lock_acquisition_failure_mode/0]). -define(DEFAULT_BACKEND, rabbit_peer_discovery_classic_config). %% what node type is used by default for this node when joining @@ -60,7 +61,27 @@ node_type() -> ?DEFAULT_NODE_TYPE end. +-spec retry_timeout() -> {Retries :: integer(), Timeout :: integer()}. +retry_timeout() -> + case application:get_env(rabbit, cluster_formation) of + {ok, Proplist} -> + Retries = proplists:get_value(lock_retry_limit, Proplist, 10), + Timeout = proplists:get_value(lock_retry_timeout, Proplist, 30000), + {Retries, Timeout}; + undefined -> + {10, 30000} + end. + +-spec lock_acquisition_failure_mode() -> ignore | fail. + +lock_acquisition_failure_mode() -> + case application:get_env(rabbit, cluster_formation) of + {ok, Proplist} -> + proplists:get_value(lock_acquisition_failure_mode, Proplist, fail); + undefined -> + fail + end. -spec log_configured_backend() -> ok. @@ -139,7 +160,7 @@ inject_randomized_delay() -> true -> Min; false -> RandomVal end, - rabbit_log:info("Will wait for ~p milliseconds before proceeding with regitration...", [Effective]), + rabbit_log:info("Will wait for ~p milliseconds before proceeding with registration...", [Effective]), timer:sleep(Effective), ok end. @@ -183,6 +204,34 @@ unregister() -> ok end. +-spec lock() -> ok | {ok, Data :: term()} | not_supported | {error, Reason :: string()}. + +lock() -> + Backend = backend(), + rabbit_log:info("Will try to lock with peer discovery backend ~s", [Backend]), + case Backend:lock(node()) of + {error, Reason} = Error -> + rabbit_log:error("Failed to lock with peer discovery backend ~s: ~p", + [Backend, Reason]), + Error; + Any -> + Any + end. + +-spec unlock(Data :: term()) -> ok | {error, Reason :: string()}. + +unlock(Data) -> + Backend = backend(), + rabbit_log:info("Will try to unlock with peer discovery backend ~s", [Backend]), + case Backend:unlock(Data) of + {error, Reason} = Error -> + rabbit_log:error("Failed to unlock with peer discovery backend ~s: ~p, " + "lock data: ~p", + [Backend, Reason, Data]), + Error; + Any -> + Any + end. %% %% Implementation diff --git a/src/rabbit_peer_discovery_classic_config.erl b/src/rabbit_peer_discovery_classic_config.erl index da5fb6c971..96d47d8182 100644 --- a/src/rabbit_peer_discovery_classic_config.erl +++ b/src/rabbit_peer_discovery_classic_config.erl @@ -20,7 +20,7 @@ -include("rabbit.hrl"). -export([list_nodes/0, supports_registration/0, register/0, unregister/0, - post_registration/0]). + post_registration/0, lock/1, unlock/1]). %% %% API @@ -54,3 +54,13 @@ unregister() -> post_registration() -> ok. + +-spec lock(Node :: atom()) -> not_supported. + +lock(_Node) -> + not_supported. + +-spec unlock(Data :: term()) -> ok. + +unlock(_Data) -> + ok. diff --git a/src/rabbit_peer_discovery_dns.erl b/src/rabbit_peer_discovery_dns.erl index 5b1ab4405e..7422876c4d 100644 --- a/src/rabbit_peer_discovery_dns.erl +++ b/src/rabbit_peer_discovery_dns.erl @@ -20,7 +20,7 @@ -include("rabbit.hrl"). -export([list_nodes/0, supports_registration/0, register/0, unregister/0, - post_registration/0]). + post_registration/0, lock/1, unlock/1]). %% for tests -export([discover_nodes/2, discover_hostnames/2]). @@ -71,6 +71,15 @@ unregister() -> post_registration() -> ok. +-spec lock(Node :: atom()) -> not_supported. + +lock(_Node) -> + not_supported. + +-spec unlock(Data :: term()) -> ok. + +unlock(_Data) -> + ok. %% %% Implementation diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index a71eaf1ff4..123bfaba25 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -691,7 +691,7 @@ recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) -> DirtyCount + 2}. queue_name_to_dir_name(Name = #resource { kind = queue }) -> - <<Num:128>> = erlang:md5(term_to_binary(Name)), + <<Num:128>> = erlang:md5(term_to_binary_compat:queue_name_to_binary(Name)), rabbit_misc:format("~.36B", [Num]). queues_base_dir() -> diff --git a/src/term_to_binary_compat.erl b/src/term_to_binary_compat.erl new file mode 100644 index 0000000000..8a74bde3e0 --- /dev/null +++ b/src/term_to_binary_compat.erl @@ -0,0 +1,32 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(term_to_binary_compat). + +-include("rabbit.hrl"). + +-export([queue_name_to_binary/1]). + +queue_name_to_binary(#resource{kind = queue, virtual_host = VHost, name = Name}) -> + VHostBSize = byte_size(VHost), + NameBSize = byte_size(Name), + <<131, %% Binary format "version" + 104, 4, %% 4-element tuple + 100, 0, 8, "resource", %% `resource` atom + 109, VHostBSize:32, VHost/binary, %% Vhost binary + 100, 0, 5, "queue", %% `queue` atom + 109, NameBSize:32, Name/binary>>. %% Name binary + diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index ba324d649e..71e9f36c46 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -43,7 +43,7 @@ -define(SERVER, ?MODULE). -define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). --define(ONE_MB, 1048576). +-define(ONE_MiB, 1048576). %% For an unknown OS, we assume that we have 1GB of memory. It'll be %% wrong. Scale by vm_memory_high_watermark in configuration to get a @@ -214,9 +214,10 @@ set_mem_limits(State, MemLimit) -> memory_limit = undefined } -> error_logger:warning_msg( "Unknown total memory size for your OS ~p. " - "Assuming memory size is ~pMB.~n", + "Assuming memory size is ~p MiB (~p bytes).~n", [os:type(), - trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/?ONE_MB)]); + trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/?ONE_MiB), + ?MEMORY_SIZE_FOR_UNKNOWN_OS]); _ -> ok end, @@ -227,18 +228,20 @@ set_mem_limits(State, MemLimit) -> case get_vm_limit() of Limit when Limit < TotalMemory -> error_logger:warning_msg( - "Only ~pMB of ~pMB memory usable due to " + "Only ~p MiB (~p bytes) of ~p MiB (~p bytes) memory usable due to " "limited address space.~n" "Crashes due to memory exhaustion are possible - see~n" "http://www.rabbitmq.com/memory.html#address-space~n", - [trunc(V/?ONE_MB) || V <- [Limit, TotalMemory]]), + [trunc(Limit/?ONE_MiB), Limit, trunc(TotalMemory/?ONE_MiB), + TotalMemory]), Limit; _ -> TotalMemory end, MemLim = interpret_limit(parse_mem_limit(MemLimit), UsableMemory), - error_logger:info_msg("Memory limit set to ~pMB of ~pMB total.~n", - [trunc(MemLim/?ONE_MB), trunc(TotalMemory/?ONE_MB)]), + error_logger:info_msg("Memory limit set to ~p MiB (~p bytes) of ~p MiB (~p bytes) total.~n", + [trunc(MemLim/?ONE_MiB), MemLim, trunc(TotalMemory/?ONE_MiB), + TotalMemory]), internal_update(State #state { total_memory = TotalMemory, memory_limit = MemLim, memory_config_limit = MemLimit}). @@ -402,9 +405,9 @@ parse_line_sunos(Line) -> [Value1 | UnitsRest] = string:tokens(RHS, " "), Value2 = case UnitsRest of ["Gigabytes"] -> - list_to_integer(Value1) * ?ONE_MB * 1024; + list_to_integer(Value1) * ?ONE_MiB * 1024; ["Megabytes"] -> - list_to_integer(Value1) * ?ONE_MB; + list_to_integer(Value1) * ?ONE_MiB; ["Kilobytes"] -> list_to_integer(Value1) * 1024; _ -> |
