summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_node_monitor.erl40
-rw-r--r--src/rabbit_vm.erl125
-rw-r--r--src/term_to_binary_compat.erl13
-rw-r--r--src/vm_memory_monitor.erl13
5 files changed, 147 insertions, 46 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 26a69ddeea..138d03f051 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -624,7 +624,7 @@ decrypt_list([Value|Tail], Algo, Acc) ->
stop_apps(Apps) ->
rabbit_log:info(
- lists:flatten(["Stopping RabbitMQ applications and their dependencies in the following order: ~n",
+ lists:flatten(["Stopping RabbitMQ applications and their dependencies in the following order:~n",
[" ~p~n" || _ <- Apps]]),
lists:reverse(Apps)),
ok = app_utils:stop_applications(
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 0eadf0ff59..810df2d1fc 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -344,8 +344,8 @@ init([]) ->
Nodes = possibly_partitioned_nodes(),
startup_log(Nodes),
Monitors = lists:foldl(fun(Node, Monitors0) ->
- pmon:monitor({rabbit, Node}, Monitors0)
- end, pmon:new(), Nodes),
+ pmon:monitor({rabbit, Node}, Monitors0)
+ end, pmon:new(), Nodes),
{ok, ensure_keepalive_timer(#state{monitors = Monitors,
subscribers = pmon:new(),
partitions = [],
@@ -420,12 +420,12 @@ handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
fun () ->
case rpc:call(Node, rabbit, is_running, []) of
{badrpc, _} -> ok;
- _ ->
- rabbit_log:warning("Received a 'DOWN' message"
- " from ~p but still can"
- " communicate with it ~n",
- [Node]),
- cast(Rep, {partial_partition,
+ _ ->
+ rabbit_log:warning("Received a 'DOWN' message"
+ " from ~p but still can"
+ " communicate with it ~n",
+ [Node]),
+ cast(Rep, {partial_partition,
Node, node(), RepGUID})
end
end);
@@ -499,18 +499,18 @@ handle_cast({node_up, Node, NodeType},
rabbit_log:info("rabbit on node ~p up~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({add_node(Node, AllNodes),
- case NodeType of
- disc -> add_node(Node, DiscNodes);
- ram -> DiscNodes
- end,
- add_node(Node, RunningNodes)}),
+ case NodeType of
+ disc -> add_node(Node, DiscNodes);
+ ram -> DiscNodes
+ end,
+ add_node(Node, RunningNodes)}),
ok = handle_live_rabbit(Node),
Monitors1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
- true ->
- Monitors;
- false ->
- pmon:monitor({rabbit, Node}, Monitors)
- end,
+ true ->
+ Monitors;
+ false ->
+ pmon:monitor({rabbit, Node}, Monitors)
+ end,
{noreply, maybe_autoheal(State#state{monitors = Monitors1})};
handle_cast({joined_cluster, Node, NodeType}, State) ->
@@ -584,7 +584,7 @@ handle_info({mnesia_system_event,
State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
true -> State;
false -> State#state{
- monitors = pmon:monitor({rabbit, Node}, Monitors)}
+ monitors = pmon:monitor({rabbit, Node}, Monitors)}
end,
ok = handle_live_rabbit(Node),
Partitions1 = lists:usort([Node | Partitions]),
@@ -893,4 +893,4 @@ startup_log([]) ->
rabbit_log:info("Starting rabbit_node_monitor~n", []);
startup_log(Nodes) ->
rabbit_log:info("Starting rabbit_node_monitor, might be partitioned from ~p~n",
- [Nodes]).
+ [Nodes]).
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index b65536c0d4..17dae558b9 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -16,7 +16,7 @@
-module(rabbit_vm).
--export([memory/0, binary/0, ets_tables_memory/1]).
+-export([memory/0, total_memory/0, binary/0, ets_tables_memory/1]).
-define(MAGIC_PLUGINS, ["cowboy", "ranch", "sockjs"]).
@@ -30,7 +30,6 @@
%%----------------------------------------------------------------------------
-%% Like erlang:memory(), but with awareness of rabbit-y things
memory() ->
All = interesting_sups(),
{Sums, _Other} = sum_processes(
@@ -41,7 +40,7 @@ memory() ->
[aggregate(Names, Sums, memory, fun (X) -> X end)
|| Names <- distinguished_interesting_sups()],
- Mnesia = mnesia_memory(),
+ MnesiaETS = mnesia_memory(),
MsgIndexETS = ets_memory(msg_stores()),
MetricsETS = ets_memory([rabbit_metrics]),
MetricsProc =
@@ -53,8 +52,9 @@ memory() ->
0
end,
MgmtDbETS = ets_memory([rabbit_mgmt_storage]),
+ OsTotal = total_memory(),
- [{total, Total},
+ [{total, ErlangTotal},
{processes, Processes},
{ets, ETS},
{atom, Atom},
@@ -67,30 +67,137 @@ memory() ->
- ConnsReader - ConnsWriter - ConnsChannel - ConnsOther
- Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc,
- [{total, Total},
+ [
+ %% Connections
{connection_readers, ConnsReader},
{connection_writers, ConnsWriter},
{connection_channels, ConnsChannel},
{connection_other, ConnsOther},
+
+ %% Queues
{queue_procs, Qs},
{queue_slave_procs, QsSlave},
+
+ %% Processes
{plugins, Plugins},
{other_proc, lists:max([0, OtherProc])}, %% [1]
- {mnesia, Mnesia},
+
+ %% Metrics
{metrics, MetricsETS + MetricsProc},
{mgmt_db, MgmtDbETS + MgmtDbProc},
- {msg_index, MsgIndexETS + MsgIndexProc},
- {other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS},
+
+ %% ETS
+ {mnesia, MnesiaETS},
+ {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS},
+
+ %% Messages (mostly, some binaries are not messages)
{binary, Bin},
+ {msg_index, MsgIndexETS + MsgIndexProc},
+
+ %% System
{code, Code},
{atom, Atom},
- {other_system, System - ETS - Atom - Bin - Code}].
+ {other_system, System - ETS - Bin - Code - Atom + (OsTotal - ErlangTotal)},
+ {total, OsTotal}
+ ].
%% [1] - erlang:memory(processes) can be less than the sum of its
%% parts. Rather than display something nonsensical, just silence any
%% claims about negative memory. See
%% http://erlang.org/pipermail/erlang-questions/2012-September/069320.html
+%% Memory reported by erlang:memory(total) is not supposed to
+%% be equal to the total size of all pages mapped to the emulator,
+%% according to http://erlang.org/doc/man/erlang.html#memory-0
+%% erlang:memory(total) under-reports memory usage by around 20%
+-spec total_memory() -> Bytes :: integer().
+total_memory() ->
+ case get_memory_calculation_strategy() of
+ rss ->
+ case get_system_process_resident_memory() of
+ {ok, MemInBytes} ->
+ MemInBytes;
+ {error, Reason} ->
+ rabbit_log:debug("Unable to get system memory used. Reason: ~p."
+ " Falling back to erlang memory reporting",
+ [Reason]),
+ erlang:memory(total)
+ end;
+ erlang ->
+ erlang:memory(total)
+ end.
+
+-spec get_memory_calculation_strategy() -> rss | erlang.
+get_memory_calculation_strategy() ->
+ case application:get_env(rabbit, vm_memory_calculation_strategy, rss) of
+ erlang ->
+ erlang;
+ rss ->
+ rss;
+ UnsupportedValue ->
+ rabbit_log:warning(
+ "Unsupported value '~p' for vm_memory_calculation_strategy. "
+ "Supported values: (rss|erlang). "
+ "Defaulting to 'rss'",
+ [UnsupportedValue]
+ ),
+ rss
+ end.
+
+-spec get_system_process_resident_memory() -> {ok, Bytes :: integer()} | {error, term()}.
+get_system_process_resident_memory() ->
+ try
+ get_system_process_resident_memory(os:type())
+ catch _:Error ->
+ {error, {"Failed to get process resident memory", Error}}
+ end.
+
+get_system_process_resident_memory({unix,darwin}) ->
+ get_ps_memory();
+
+get_system_process_resident_memory({unix, linux}) ->
+ get_ps_memory();
+
+get_system_process_resident_memory({unix,freebsd}) ->
+ get_ps_memory();
+
+get_system_process_resident_memory({unix,openbsd}) ->
+ get_ps_memory();
+
+get_system_process_resident_memory({win32,_OSname}) ->
+ OsPid = os:getpid(),
+ Cmd = " tasklist /fi \"pid eq " ++ OsPid ++ "\" /fo LIST 2>&1 ",
+ CmdOutput = os:cmd(Cmd),
+ %% Memory usage is displayed in kilobytes
+ %% with comma-separated thousands
+ case re:run(CmdOutput, "Mem Usage:\\s+([0-9,]+)\\s+K", [{capture, all_but_first, list}]) of
+ {match, [Match]} ->
+ NoCommas = [ N || N <- Match, N =/= $, ],
+ {ok, list_to_integer(NoCommas) * 1024};
+ _ ->
+ {error, {unexpected_output_from_command, Cmd, CmdOutput}}
+ end;
+
+get_system_process_resident_memory({unix, sunos}) ->
+ get_ps_memory();
+
+get_system_process_resident_memory({unix, aix}) ->
+ get_ps_memory();
+
+get_system_process_resident_memory(_OsType) ->
+ {error, not_implemented_for_os}.
+
+get_ps_memory() ->
+ OsPid = os:getpid(),
+ Cmd = "ps -p " ++ OsPid ++ " -o rss=",
+ CmdOutput = os:cmd(Cmd),
+ case re:run(CmdOutput, "[0-9]+", [{capture, first, list}]) of
+ {match, [Match]} ->
+ {ok, list_to_integer(Match) * 1024};
+ _ ->
+ {error, {unexpected_output_from_command, Cmd, CmdOutput}}
+ end.
+
binary() ->
All = interesting_sups(),
{Sums, Rest} =
diff --git a/src/term_to_binary_compat.erl b/src/term_to_binary_compat.erl
index a3e1045623..13396ddacb 100644
--- a/src/term_to_binary_compat.erl
+++ b/src/term_to_binary_compat.erl
@@ -18,15 +18,8 @@
-include("rabbit.hrl").
--export([queue_name_to_binary/1]).
+-export([term_to_binary_1/1]).
-queue_name_to_binary(#resource{kind = queue} = {resource, VHost, queue, Name}) ->
- VHostBSize = byte_size(VHost),
- NameBSize = byte_size(Name),
- <<131, %% Binary format "version"
- 104, 4, %% 4-element tuple
- 100, 0, 8, "resource", %% `resource` atom
- 109, VHostBSize:32, VHost/binary, %% Vhost binary
- 100, 0, 5, "queue", %% `queue` atom
- 109, NameBSize:32, Name/binary>>. %% Name binary
+term_to_binary_1(Term) ->
+ term_to_binary(Term, [{minor_version, 1}]).
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 71e9f36c46..bc45d04d62 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -117,14 +117,14 @@ get_memory_limit() ->
get_memory_use(bytes) ->
MemoryLimit = get_memory_limit(),
- {erlang:memory(total), case MemoryLimit > 0.0 of
- true -> MemoryLimit;
- false -> infinity
- end};
+ {rabbit_vm:total_memory(), case MemoryLimit > 0.0 of
+ true -> MemoryLimit;
+ false -> infinity
+ end};
get_memory_use(ratio) ->
MemoryLimit = get_memory_limit(),
case MemoryLimit > 0.0 of
- true -> erlang:memory(total) / MemoryLimit;
+ true -> rabbit_vm:total_memory() / MemoryLimit;
false -> infinity
end.
@@ -268,7 +268,7 @@ parse_mem_limit(_) ->
internal_update(State = #state { memory_limit = MemLimit,
alarmed = Alarmed,
alarm_funs = {AlarmSet, AlarmClear} }) ->
- MemUsed = erlang:memory(total),
+ MemUsed = rabbit_vm:total_memory(),
NewAlarmed = MemUsed > MemLimit,
case {Alarmed, NewAlarmed} of
{false, true} -> emit_update_info(set, MemUsed, MemLimit),
@@ -365,6 +365,7 @@ get_total_memory({unix, aix}) ->
get_total_memory(_OsType) ->
unknown.
+
%% A line looks like "Foo bar: 123456."
parse_line_mach(Line) ->
[Name, RHS | _Rest] = string:tokens(Line, ":"),