summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2017-06-19 22:21:05 +0300
committerMichael Klishin <mklishin@pivotal.io>2017-06-19 22:21:05 +0300
commit72415b1c9446db5ad38b7d70a45fec6942682ec0 (patch)
tree2d5ffab41df6d0f98529885f4e67cfe73e2dc725
parentbc5048b89ddda79fd12e68247bda58a5691f7f28 (diff)
parentd81736602e67eca0b83bcb72b8fce805b328a2e0 (diff)
downloadrabbitmq-server-git-72415b1c9446db5ad38b7d70a45fec6942682ec0.tar.gz
Merge branch 'stable'
Conflicts: src/rabbit_vm.erl src/term_to_binary_compat.erl
-rw-r--r--Makefile1
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_vm.erl125
-rw-r--r--src/term_to_binary_compat.erl13
-rw-r--r--src/vm_memory_monitor.erl13
-rw-r--r--test/term_to_binary_compat_prop_SUITE.erl59
6 files changed, 174 insertions, 39 deletions
diff --git a/Makefile b/Makefile
index f4174d6c69..61c8fa4b53 100644
--- a/Makefile
+++ b/Makefile
@@ -16,6 +16,7 @@ define PROJECT_ENV
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
{vm_memory_high_watermark_paging_ratio, 0.5},
+ {vm_memory_calculation_strategy, rss},
{memory_monitor_interval, 2500},
{disk_free_limit, 50000000}, %% 50MB
{msg_store_index_module, rabbit_msg_store_ets_index},
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 123bfaba25..ee172d48e5 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -691,7 +691,7 @@ recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) ->
DirtyCount + 2}.
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
- <<Num:128>> = erlang:md5(term_to_binary_compat:queue_name_to_binary(Name)),
+ <<Num:128>> = erlang:md5(term_to_binary_compat:term_to_binary_1(Name)),
rabbit_misc:format("~.36B", [Num]).
queues_base_dir() ->
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index b65536c0d4..17dae558b9 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -16,7 +16,7 @@
-module(rabbit_vm).
--export([memory/0, binary/0, ets_tables_memory/1]).
+-export([memory/0, total_memory/0, binary/0, ets_tables_memory/1]).
-define(MAGIC_PLUGINS, ["cowboy", "ranch", "sockjs"]).
@@ -30,7 +30,6 @@
%%----------------------------------------------------------------------------
-%% Like erlang:memory(), but with awareness of rabbit-y things
memory() ->
All = interesting_sups(),
{Sums, _Other} = sum_processes(
@@ -41,7 +40,7 @@ memory() ->
[aggregate(Names, Sums, memory, fun (X) -> X end)
|| Names <- distinguished_interesting_sups()],
- Mnesia = mnesia_memory(),
+ MnesiaETS = mnesia_memory(),
MsgIndexETS = ets_memory(msg_stores()),
MetricsETS = ets_memory([rabbit_metrics]),
MetricsProc =
@@ -53,8 +52,9 @@ memory() ->
0
end,
MgmtDbETS = ets_memory([rabbit_mgmt_storage]),
+ OsTotal = total_memory(),
- [{total, Total},
+ [{total, ErlangTotal},
{processes, Processes},
{ets, ETS},
{atom, Atom},
@@ -67,30 +67,137 @@ memory() ->
- ConnsReader - ConnsWriter - ConnsChannel - ConnsOther
- Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc,
- [{total, Total},
+ [
+ %% Connections
{connection_readers, ConnsReader},
{connection_writers, ConnsWriter},
{connection_channels, ConnsChannel},
{connection_other, ConnsOther},
+
+ %% Queues
{queue_procs, Qs},
{queue_slave_procs, QsSlave},
+
+ %% Processes
{plugins, Plugins},
{other_proc, lists:max([0, OtherProc])}, %% [1]
- {mnesia, Mnesia},
+
+ %% Metrics
{metrics, MetricsETS + MetricsProc},
{mgmt_db, MgmtDbETS + MgmtDbProc},
- {msg_index, MsgIndexETS + MsgIndexProc},
- {other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS},
+
+ %% ETS
+ {mnesia, MnesiaETS},
+ {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS},
+
+ %% Messages (mostly, some binaries are not messages)
{binary, Bin},
+ {msg_index, MsgIndexETS + MsgIndexProc},
+
+ %% System
{code, Code},
{atom, Atom},
- {other_system, System - ETS - Atom - Bin - Code}].
+ {other_system, System - ETS - Bin - Code - Atom + (OsTotal - ErlangTotal)},
+ {total, OsTotal}
+ ].
%% [1] - erlang:memory(processes) can be less than the sum of its
%% parts. Rather than display something nonsensical, just silence any
%% claims about negative memory. See
%% http://erlang.org/pipermail/erlang-questions/2012-September/069320.html
+%% Memory reported by erlang:memory(total) is not supposed to
+%% be equal to the total size of all pages mapped to the emulator,
+%% according to http://erlang.org/doc/man/erlang.html#memory-0
+%% erlang:memory(total) under-reports memory usage by around 20%
+-spec total_memory() -> Bytes :: integer().
+total_memory() ->
+ case get_memory_calculation_strategy() of
+ rss ->
+ case get_system_process_resident_memory() of
+ {ok, MemInBytes} ->
+ MemInBytes;
+ {error, Reason} ->
+ rabbit_log:debug("Unable to get system memory used. Reason: ~p."
+ " Falling back to erlang memory reporting",
+ [Reason]),
+ erlang:memory(total)
+ end;
+ erlang ->
+ erlang:memory(total)
+ end.
+
+-spec get_memory_calculation_strategy() -> rss | erlang.
+get_memory_calculation_strategy() ->
+ case application:get_env(rabbit, vm_memory_calculation_strategy, rss) of
+ erlang ->
+ erlang;
+ rss ->
+ rss;
+ UnsupportedValue ->
+ rabbit_log:warning(
+ "Unsupported value '~p' for vm_memory_calculation_strategy. "
+ "Supported values: (rss|erlang). "
+ "Defaulting to 'rss'",
+ [UnsupportedValue]
+ ),
+ rss
+ end.
+
+-spec get_system_process_resident_memory() -> {ok, Bytes :: integer()} | {error, term()}.
+get_system_process_resident_memory() ->
+ try
+ get_system_process_resident_memory(os:type())
+ catch _:Error ->
+ {error, {"Failed to get process resident memory", Error}}
+ end.
+
+get_system_process_resident_memory({unix,darwin}) ->
+ get_ps_memory();
+
+get_system_process_resident_memory({unix, linux}) ->
+ get_ps_memory();
+
+get_system_process_resident_memory({unix,freebsd}) ->
+ get_ps_memory();
+
+get_system_process_resident_memory({unix,openbsd}) ->
+ get_ps_memory();
+
+get_system_process_resident_memory({win32,_OSname}) ->
+ OsPid = os:getpid(),
+ Cmd = " tasklist /fi \"pid eq " ++ OsPid ++ "\" /fo LIST 2>&1 ",
+ CmdOutput = os:cmd(Cmd),
+ %% Memory usage is displayed in kilobytes
+ %% with comma-separated thousands
+ case re:run(CmdOutput, "Mem Usage:\\s+([0-9,]+)\\s+K", [{capture, all_but_first, list}]) of
+ {match, [Match]} ->
+ NoCommas = [ N || N <- Match, N =/= $, ],
+ {ok, list_to_integer(NoCommas) * 1024};
+ _ ->
+ {error, {unexpected_output_from_command, Cmd, CmdOutput}}
+ end;
+
+get_system_process_resident_memory({unix, sunos}) ->
+ get_ps_memory();
+
+get_system_process_resident_memory({unix, aix}) ->
+ get_ps_memory();
+
+get_system_process_resident_memory(_OsType) ->
+ {error, not_implemented_for_os}.
+
+get_ps_memory() ->
+ OsPid = os:getpid(),
+ Cmd = "ps -p " ++ OsPid ++ " -o rss=",
+ CmdOutput = os:cmd(Cmd),
+ case re:run(CmdOutput, "[0-9]+", [{capture, first, list}]) of
+ {match, [Match]} ->
+ {ok, list_to_integer(Match) * 1024};
+ _ ->
+ {error, {unexpected_output_from_command, Cmd, CmdOutput}}
+ end.
+
binary() ->
All = interesting_sups(),
{Sums, Rest} =
diff --git a/src/term_to_binary_compat.erl b/src/term_to_binary_compat.erl
index 8a74bde3e0..13396ddacb 100644
--- a/src/term_to_binary_compat.erl
+++ b/src/term_to_binary_compat.erl
@@ -18,15 +18,8 @@
-include("rabbit.hrl").
--export([queue_name_to_binary/1]).
+-export([term_to_binary_1/1]).
-queue_name_to_binary(#resource{kind = queue, virtual_host = VHost, name = Name}) ->
- VHostBSize = byte_size(VHost),
- NameBSize = byte_size(Name),
- <<131, %% Binary format "version"
- 104, 4, %% 4-element tuple
- 100, 0, 8, "resource", %% `resource` atom
- 109, VHostBSize:32, VHost/binary, %% Vhost binary
- 100, 0, 5, "queue", %% `queue` atom
- 109, NameBSize:32, Name/binary>>. %% Name binary
+term_to_binary_1(Term) ->
+ term_to_binary(Term, [{minor_version, 1}]).
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 71e9f36c46..bc45d04d62 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -117,14 +117,14 @@ get_memory_limit() ->
get_memory_use(bytes) ->
MemoryLimit = get_memory_limit(),
- {erlang:memory(total), case MemoryLimit > 0.0 of
- true -> MemoryLimit;
- false -> infinity
- end};
+ {rabbit_vm:total_memory(), case MemoryLimit > 0.0 of
+ true -> MemoryLimit;
+ false -> infinity
+ end};
get_memory_use(ratio) ->
MemoryLimit = get_memory_limit(),
case MemoryLimit > 0.0 of
- true -> erlang:memory(total) / MemoryLimit;
+ true -> rabbit_vm:total_memory() / MemoryLimit;
false -> infinity
end.
@@ -268,7 +268,7 @@ parse_mem_limit(_) ->
internal_update(State = #state { memory_limit = MemLimit,
alarmed = Alarmed,
alarm_funs = {AlarmSet, AlarmClear} }) ->
- MemUsed = erlang:memory(total),
+ MemUsed = rabbit_vm:total_memory(),
NewAlarmed = MemUsed > MemLimit,
case {Alarmed, NewAlarmed} of
{false, true} -> emit_update_info(set, MemUsed, MemLimit),
@@ -365,6 +365,7 @@ get_total_memory({unix, aix}) ->
get_total_memory(_OsType) ->
unknown.
+
%% A line looks like "Foo bar: 123456."
parse_line_mach(Line) ->
[Name, RHS | _Rest] = string:tokens(Line, ":"),
diff --git a/test/term_to_binary_compat_prop_SUITE.erl b/test/term_to_binary_compat_prop_SUITE.erl
index d09b23c9ea..6c8a92a29f 100644
--- a/test/term_to_binary_compat_prop_SUITE.erl
+++ b/test/term_to_binary_compat_prop_SUITE.erl
@@ -24,13 +24,11 @@
-include_lib("proper/include/proper.hrl").
all() ->
- %% The test should run on OTP < 20 (erts < 9)
- case erts_gt_8() of
- true ->
- [];
- false ->
- [queue_name_to_binary]
- end.
+ [
+ pre_3_6_11_works,
+ term_to_binary_latin_atom,
+ queue_name_to_binary
+ ].
erts_gt_8() ->
Vsn = erlang:system_info(version),
@@ -47,16 +45,51 @@ end_per_suite(Config) ->
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
+%% If this test fails - the erlang version is not supported in
+%% RabbitMQ-3.6.10 and earlier.
+pre_3_6_11_works(Config) ->
+ Fun = fun () -> prop_pre_3_6_11_works(Config) end,
+ rabbit_ct_proper_helpers:run_proper(Fun, [], 50000).
+
+prop_pre_3_6_11_works(_Config) ->
+ ?FORALL(Term, any(),
+ begin
+ Current = term_to_binary(Term),
+ Compat = term_to_binary_compat:term_to_binary_1(Term),
+ Current =:= Compat
+ end).
+
+term_to_binary_latin_atom(Config) ->
+ Fun = fun () -> prop_term_to_binary_latin_atom(Config) end,
+ rabbit_ct_proper_helpers:run_proper(Fun, [], 10000).
+
+prop_term_to_binary_latin_atom(_Config) ->
+ ?FORALL(LatinString, list(integer(0, 255)),
+ begin
+ Length = length(LatinString),
+ Atom = list_to_atom(LatinString),
+ Binary = list_to_binary(LatinString),
+ <<131,100, Length:16, Binary/binary>> =:= term_to_binary_compat:term_to_binary_1(Atom)
+ end).
+
queue_name_to_binary(Config) ->
Fun = fun () -> prop_queue_name_to_binary(Config) end,
rabbit_ct_proper_helpers:run_proper(Fun, [], 10000).
prop_queue_name_to_binary(_Config) ->
- ?FORALL({Vhost, QName}, {binary(), binary()},
+ ?FORALL({VHost, QName}, {binary(), binary()},
begin
- Resource = rabbit_misc:r(Vhost, queue, QName),
- Legacy = term_to_binary_compat:queue_name_to_binary(Resource),
- Current = term_to_binary(Resource),
- Current =:= Legacy
- end). \ No newline at end of file
+ VHostBSize = byte_size(VHost),
+ NameBSize = byte_size(QName),
+ Expected =
+ <<131, %% Binary format "version"
+ 104, 4, %% 4-element tuple
+ 100, 0, 8, "resource", %% `resource` atom
+ 109, VHostBSize:32, VHost/binary, %% Vhost binary
+ 100, 0, 5, "queue", %% `queue` atom
+ 109, NameBSize:32, QName/binary>>, %% Name binary
+ Resource = rabbit_misc:r(VHost, queue, QName),
+ Current = term_to_binary_compat:term_to_binary_1(Resource),
+ Current =:= Expected
+ end).