summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2016-12-01 14:03:08 +0300
committerGitHub <noreply@github.com>2016-12-01 14:03:08 +0300
commitad04a73e8ba3157e8b737f1521b1c4946c1d30e6 (patch)
tree7687add0c1fbc11c773f5bffe3a350f874db6eba /src
parentc2160877085bf4755cac3a2338695597c136523f (diff)
parent0a5d5689af7ca86f32477b87f7b01538c3d1a24d (diff)
downloadrabbitmq-server-git-ad04a73e8ba3157e8b737f1521b1c4946c1d30e6.tar.gz
Merge pull request #1034 from rabbitmq/rabbitmq-management-236
Make statistics collection distributed
Diffstat (limited to 'src')
-rw-r--r--src/delegate.erl269
-rw-r--r--src/delegate_sup.erl55
-rw-r--r--src/rabbit.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl30
-rw-r--r--src/rabbit_diagnostics.erl1
-rw-r--r--src/rabbit_direct.erl2
-rw-r--r--src/rabbit_log.erl25
-rw-r--r--src/rabbit_metrics.erl53
-rw-r--r--src/rabbit_upgrade_functions.erl10
-rw-r--r--src/rabbit_vm.erl20
10 files changed, 117 insertions, 355 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
deleted file mode 100644
index 778137c1c7..0000000000
--- a/src/delegate.erl
+++ /dev/null
@@ -1,269 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(delegate).
-
-%% delegate is an alternative way of doing remote calls. Compared to
-%% the rpc module, it reduces inter-node communication. For example,
-%% if a message is routed to 1,000 queues on node A and needs to be
-%% propagated to nodes B and C, it would be nice to avoid doing 2,000
-%% remote casts to queue processes.
-%%
-%% An important issue here is preserving order - we need to make sure
-%% that messages from a certain channel to a certain queue take a
-%% consistent route, to prevent them being reordered. In fact all
-%% AMQP-ish things (such as queue declaration results and basic.get)
-%% must take the same route as well, to ensure that clients see causal
-%% ordering correctly. Therefore we have a rather generic mechanism
-%% here rather than just a message-reflector. That's also why we pick
-%% the delegate process to use based on a hash of the source pid.
-%%
-%% When a function is invoked using delegate:invoke/2, delegate:call/2
-%% or delegate:cast/2 on a group of pids, the pids are first split
-%% into local and remote ones. Remote processes are then grouped by
-%% node. The function is then invoked locally and on every node (using
-%% gen_server2:multi/4) as many times as there are processes on that
-%% node, sequentially.
-%%
-%% Errors returned when executing functions on remote nodes are re-raised
-%% in the caller.
-%%
-%% RabbitMQ starts a pool of delegate processes on boot. The size of
-%% the pool is configurable, the aim is to make sure we don't have too
-%% few delegates and thus limit performance on many-CPU machines.
-
--behaviour(gen_server2).
-
--export([start_link/1, invoke_no_result/2, invoke/2,
- monitor/2, demonitor/1, call/2, cast/2]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--record(state, {node, monitors, name}).
-
-%%----------------------------------------------------------------------------
-
--export_type([monitor_ref/0]).
-
--type monitor_ref() :: reference() | {atom(), pid()}.
--type fun_or_mfa(A) :: fun ((pid()) -> A) | {atom(), atom(), [any()]}.
-
--spec start_link
- (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}.
--spec invoke
- ( pid(), fun_or_mfa(A)) -> A;
- ([pid()], fun_or_mfa(A)) -> {[{pid(), A}], [{pid(), term()}]}.
--spec invoke_no_result(pid() | [pid()], fun_or_mfa(any())) -> 'ok'.
--spec monitor('process', pid()) -> monitor_ref().
--spec demonitor(monitor_ref()) -> 'true'.
-
--spec call
- ( pid(), any()) -> any();
- ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}.
--spec cast(pid() | [pid()], any()) -> 'ok'.
-
-%%----------------------------------------------------------------------------
-
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-
-%%----------------------------------------------------------------------------
-
-start_link(Num) ->
- Name = delegate_name(Num),
- gen_server2:start_link({local, Name}, ?MODULE, [Name], []).
-
-invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
- apply1(FunOrMFA, Pid);
-invoke(Pid, FunOrMFA) when is_pid(Pid) ->
- case invoke([Pid], FunOrMFA) of
- {[{Pid, Result}], []} ->
- Result;
- {[], [{Pid, {Class, Reason, StackTrace}}]} ->
- erlang:raise(Class, Reason, StackTrace)
- end;
-
-invoke([], _FunOrMFA) -> %% optimisation
- {[], []};
-invoke([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation
- case safe_invoke(Pid, FunOrMFA) of
- {ok, _, Result} -> {[{Pid, Result}], []};
- {error, _, Error} -> {[], [{Pid, Error}]}
- end;
-invoke(Pids, FunOrMFA) when is_list(Pids) ->
- {LocalPids, Grouped} = group_pids_by_node(Pids),
- %% The use of multi_call is only safe because the timeout is
- %% infinity, and thus there is no process spawned in order to do
- %% the sending. Thus calls can't overtake preceding calls/casts.
- {Replies, BadNodes} =
- case orddict:fetch_keys(Grouped) of
- [] -> {[], []};
- RemoteNodes -> gen_server2:multi_call(
- RemoteNodes, delegate(self(), RemoteNodes),
- {invoke, FunOrMFA, Grouped}, infinity)
- end,
- BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
- BadNode <- BadNodes,
- Pid <- orddict:fetch(BadNode, Grouped)],
- ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) |
- [Results || {_Node, Results} <- Replies]]),
- lists:foldl(
- fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad};
- ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]}
- end, {[], BadPids}, ResultsNoNode).
-
-invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
- _ = safe_invoke(Pid, FunOrMFA), %% we don't care about any error
- ok;
-invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) ->
- invoke_no_result([Pid], FunOrMFA);
-
-invoke_no_result([], _FunOrMFA) -> %% optimisation
- ok;
-invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation
- _ = safe_invoke(Pid, FunOrMFA), %% must not die
- ok;
-invoke_no_result(Pids, FunOrMFA) when is_list(Pids) ->
- {LocalPids, Grouped} = group_pids_by_node(Pids),
- case orddict:fetch_keys(Grouped) of
- [] -> ok;
- RemoteNodes -> gen_server2:abcast(
- RemoteNodes, delegate(self(), RemoteNodes),
- {invoke, FunOrMFA, Grouped})
- end,
- _ = safe_invoke(LocalPids, FunOrMFA), %% must not die
- ok.
-
-monitor(process, Pid) when node(Pid) =:= node() ->
- erlang:monitor(process, Pid);
-monitor(process, Pid) ->
- Name = delegate(Pid, [node(Pid)]),
- gen_server2:cast(Name, {monitor, self(), Pid}),
- {Name, Pid}.
-
-demonitor(Ref) when is_reference(Ref) ->
- erlang:demonitor(Ref);
-demonitor({Name, Pid}) ->
- gen_server2:cast(Name, {demonitor, self(), Pid}).
-
-call(PidOrPids, Msg) ->
- invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}).
-
-cast(PidOrPids, Msg) ->
- invoke_no_result(PidOrPids, {gen_server2, cast, [Msg]}).
-
-%%----------------------------------------------------------------------------
-
-group_pids_by_node(Pids) ->
- LocalNode = node(),
- lists:foldl(
- fun (Pid, {Local, Remote}) when node(Pid) =:= LocalNode ->
- {[Pid | Local], Remote};
- (Pid, {Local, Remote}) ->
- {Local,
- orddict:update(
- node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
- end, {[], orddict:new()}, Pids).
-
-delegate_name(Hash) ->
- list_to_atom("delegate_" ++ integer_to_list(Hash)).
-
-delegate(Pid, RemoteNodes) ->
- case get(delegate) of
- undefined -> Name = delegate_name(
- erlang:phash2(Pid,
- delegate_sup:count(RemoteNodes))),
- put(delegate, Name),
- Name;
- Name -> Name
- end.
-
-safe_invoke(Pids, FunOrMFA) when is_list(Pids) ->
- [safe_invoke(Pid, FunOrMFA) || Pid <- Pids];
-safe_invoke(Pid, FunOrMFA) when is_pid(Pid) ->
- try
- {ok, Pid, apply1(FunOrMFA, Pid)}
- catch Class:Reason ->
- {error, Pid, {Class, Reason, erlang:get_stacktrace()}}
- end.
-
-apply1({M, F, A}, Arg) -> apply(M, F, [Arg | A]);
-apply1(Fun, Arg) -> Fun(Arg).
-
-%%----------------------------------------------------------------------------
-
-init([Name]) ->
- {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-
-handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) ->
- {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), State,
- hibernate}.
-
-handle_cast({monitor, MonitoringPid, Pid},
- State = #state{monitors = Monitors}) ->
- Monitors1 = case dict:find(Pid, Monitors) of
- {ok, {Ref, Pids}} ->
- Pids1 = gb_sets:add_element(MonitoringPid, Pids),
- dict:store(Pid, {Ref, Pids1}, Monitors);
- error ->
- Ref = erlang:monitor(process, Pid),
- Pids = gb_sets:singleton(MonitoringPid),
- dict:store(Pid, {Ref, Pids}, Monitors)
- end,
- {noreply, State#state{monitors = Monitors1}, hibernate};
-
-handle_cast({demonitor, MonitoringPid, Pid},
- State = #state{monitors = Monitors}) ->
- Monitors1 = case dict:find(Pid, Monitors) of
- {ok, {Ref, Pids}} ->
- Pids1 = gb_sets:del_element(MonitoringPid, Pids),
- case gb_sets:is_empty(Pids1) of
- true -> erlang:demonitor(Ref),
- dict:erase(Pid, Monitors);
- false -> dict:store(Pid, {Ref, Pids1}, Monitors)
- end;
- error ->
- Monitors
- end,
- {noreply, State#state{monitors = Monitors1}, hibernate};
-
-handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) ->
- _ = safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA),
- {noreply, State, hibernate}.
-
-handle_info({'DOWN', Ref, process, Pid, Info},
- State = #state{monitors = Monitors, name = Name}) ->
- {noreply,
- case dict:find(Pid, Monitors) of
- {ok, {Ref, Pids}} ->
- Msg = {'DOWN', {Name, Pid}, process, Pid, Info},
- gb_sets:fold(fun (MonitoringPid, _) -> MonitoringPid ! Msg end,
- none, Pids),
- State#state{monitors = dict:erase(Pid, Monitors)};
- error ->
- State
- end, hibernate};
-
-handle_info(_Info, State) ->
- {noreply, State, hibernate}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
deleted file mode 100644
index ba0964f9dd..0000000000
--- a/src/delegate_sup.erl
+++ /dev/null
@@ -1,55 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(delegate_sup).
-
--behaviour(supervisor).
-
--export([start_link/1, count/1]).
-
--export([init/1]).
-
--define(SERVER, ?MODULE).
-
-%%----------------------------------------------------------------------------
-
--spec start_link(integer()) -> rabbit_types:ok_pid_or_error().
--spec count([node()]) -> integer().
-
-%%----------------------------------------------------------------------------
-
-start_link(Count) ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, [Count]).
-
-count([]) ->
- 1;
-count([Node | Nodes]) ->
- try
- length(supervisor:which_children({?SERVER, Node}))
- catch exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
- count(Nodes);
- exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown;
- R =:= nodedown ->
- count(Nodes)
- end.
-
-%%----------------------------------------------------------------------------
-
-init([Count]) ->
- {ok, {{one_for_one, 10, 10},
- [{Num, {delegate, start_link, [Num]},
- transient, 16#ffffffff, worker, [delegate]} ||
- Num <- lists:seq(0, Count - 1)]}}.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index cfb92eb843..ddc41bee0c 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -85,6 +85,13 @@
{requires, external_infrastructure},
{enables, kernel_ready}]}).
+-rabbit_boot_step({rabbit_core_metrics,
+ [{description, "core metrics storage"},
+ {mfa, {rabbit_sup, start_child,
+ [rabbit_metrics]}},
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
+
-rabbit_boot_step({rabbit_event,
[{description, "statistics event manager"},
{mfa, {rabbit_sup, start_restartable_child,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index bfa868c651..3736a51f59 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -101,13 +101,14 @@
%%----------------------------------------------------------------------------
-define(STATISTICS_KEYS,
- [name,
+ [messages_ready,
+ messages_unacknowledged,
+ messages,
+ reductions,
+ name,
policy,
exclusive_consumer_pid,
exclusive_consumer_tag,
- messages_ready,
- messages_unacknowledged,
- messages,
consumers,
consumer_utilisation,
memory,
@@ -115,7 +116,6 @@
synchronised_slave_pids,
recoverable_slaves,
state,
- reductions,
garbage_collection
]).
@@ -938,9 +938,13 @@ emit_stats(State) ->
emit_stats(State, Extra) ->
ExtraKs = [K || {K, _} <- Extra],
- Infos = [{K, V} || {K, V} <- infos(statistics_keys(), State),
- not lists:member(K, ExtraKs)],
- rabbit_event:notify(queue_stats, Extra ++ Infos).
+ [{messages_ready, MR}, {messages_unacknowledged, MU}, {messages, M},
+ {reductions, R}, {name, Name} | Infos] = All
+ = [{K, V} || {K, V} <- infos(statistics_keys(), State),
+ not lists:member(K, ExtraKs)],
+ rabbit_core_metrics:queue_stats(Name, Extra ++ Infos),
+ rabbit_core_metrics:queue_stats(Name, MR, MU, M, R),
+ rabbit_event:notify(queue_stats, Extra ++ All).
emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName,
PrefetchCount, Args, Ref) ->
@@ -955,6 +959,7 @@ emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName,
Ref).
emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
+ rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
rabbit_event:notify(consumer_deleted,
[{consumer_tag, ConsumerTag},
{channel, ChPid},
@@ -1085,9 +1090,14 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
+ QName = qname(State1),
+ AckRequired = not NoAck,
+ rabbit_core_metrics:consumer_created(
+ ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
+ PrefetchCount, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1),
- PrefetchCount, Args, none),
+ AckRequired, QName, PrefetchCount,
+ Args, none),
notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
diff --git a/src/rabbit_diagnostics.erl b/src/rabbit_diagnostics.erl
index d28bb9ffd7..e5df1d5baf 100644
--- a/src/rabbit_diagnostics.erl
+++ b/src/rabbit_diagnostics.erl
@@ -64,7 +64,6 @@ maybe_stuck_stacktrace({prim_inet, accept0, _}) -> false;
maybe_stuck_stacktrace({prim_inet, recv0, _}) -> false;
maybe_stuck_stacktrace({rabbit_heartbeat, heartbeater, _}) -> false;
maybe_stuck_stacktrace({rabbit_net, recv, _}) -> false;
-maybe_stuck_stacktrace({mochiweb_http, request, _}) -> false;
maybe_stuck_stacktrace({group, _, _}) -> false;
maybe_stuck_stacktrace({shell, _, _}) -> false;
maybe_stuck_stacktrace({io, _, _}) -> false;
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 061105c150..8e429b3ab8 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -109,6 +109,7 @@ authz_socket_info_direct(Infos) ->
connect1(User, VHost, Protocol, Pid, Infos) ->
try rabbit_access_control:check_vhost_access(User, VHost, authz_socket_info_direct(Infos)) of
ok -> ok = pg_local:join(rabbit_direct, Pid),
+ rabbit_core_metrics:connection_created(Pid, Infos),
rabbit_event:notify(connection_created, Infos),
{ok, {User, rabbit_reader:server_properties(Protocol)}}
catch
@@ -127,4 +128,5 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
disconnect(Pid, Infos) ->
pg_local:leave(rabbit_direct, Pid),
+ rabbit_core_metrics:connection_closed(Pid),
rabbit_event:notify(connection_closed, Infos).
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 337fb23f84..652e28aa90 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -27,24 +27,12 @@
-type category() :: atom().
-type level() :: 'debug' | 'info' | 'warning' | 'error'.
--spec log(category(), level(), string()) -> 'ok'.
--spec log(category(), level(), string(), [any()]) -> 'ok'.
-
--spec debug(string()) -> 'ok'.
--spec debug(string(), [any()]) -> 'ok'.
--spec info(string()) -> 'ok'.
--spec info(string(), [any()]) -> 'ok'.
--spec warning(string()) -> 'ok'.
--spec warning(string(), [any()]) -> 'ok'.
--spec error(string()) -> 'ok'.
--spec error(string(), [any()]) -> 'ok'.
-
--spec with_local_io(fun (() -> A)) -> A.
-
%%----------------------------------------------------------------------------
+-spec log(category(), level(), string()) -> 'ok'.
log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
+-spec log(category(), level(), string(), [any()]) -> 'ok'.
log(Category, Level, Fmt, Args) when is_list(Args) ->
case level(Level) =< catlevel(Category) of
false -> ok;
@@ -57,13 +45,21 @@ log(Category, Level, Fmt, Args) when is_list(Args) ->
with_local_io(fun () -> F(Fmt, Args) end)
end.
+-spec debug(string()) -> 'ok'.
debug(Fmt) -> log(default, debug, Fmt).
+-spec debug(string(), [any()]) -> 'ok'.
debug(Fmt, Args) -> log(default, debug, Fmt, Args).
+-spec info(string()) -> 'ok'.
info(Fmt) -> log(default, info, Fmt).
+-spec info(string(), [any()]) -> 'ok'.
info(Fmt, Args) -> log(default, info, Fmt, Args).
+-spec warning(string()) -> 'ok'.
warning(Fmt) -> log(default, warning, Fmt).
+-spec warning(string(), [any()]) -> 'ok'.
warning(Fmt, Args) -> log(default, warning, Fmt, Args).
+-spec error(string()) -> 'ok'.
error(Fmt) -> log(default, error, Fmt).
+-spec error(string(), [any()]) -> 'ok'.
error(Fmt, Args) -> log(default, error, Fmt, Args).
catlevel(Category) ->
@@ -87,6 +83,7 @@ level(none) -> 0.
%% Execute Fun using the IO system of the local node (i.e. the node on
%% which the code is executing). Since this is invoked for every log
%% message, we try to avoid unnecessarily churning group_leader/1.
+-spec with_local_io(fun (() -> A)) -> A.
with_local_io(Fun) ->
GL = group_leader(),
Node = node(),
diff --git a/src/rabbit_metrics.erl b/src/rabbit_metrics.erl
new file mode 100644
index 0000000000..1ea28c2906
--- /dev/null
+++ b/src/rabbit_metrics.erl
@@ -0,0 +1,53 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_metrics).
+
+-behaviour(gen_server).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-spec start_link() -> rabbit_types:ok_pid_or_error().
+
+%%----------------------------------------------------------------------------
+%% Starts the raw metrics storage and owns the ETS tables.
+%%----------------------------------------------------------------------------
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+init([]) ->
+ rabbit_core_metrics:init(),
+ {ok, none}.
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(_Msg, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 8609a0e424..1b647f9c05 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -24,6 +24,7 @@
-rabbit_upgrade({remove_user_scope, mnesia, []}).
-rabbit_upgrade({hash_passwords, mnesia, []}).
-rabbit_upgrade({add_ip_to_listener, mnesia, []}).
+-rabbit_upgrade({add_opts_to_listener, mnesia, [add_ip_to_listener]}).
-rabbit_upgrade({internal_exchanges, mnesia, []}).
-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}).
-rabbit_upgrade({topic_trie, mnesia, []}).
@@ -59,6 +60,7 @@
-spec remove_user_scope() -> 'ok'.
-spec hash_passwords() -> 'ok'.
-spec add_ip_to_listener() -> 'ok'.
+-spec add_opts_to_listener() -> 'ok'.
-spec internal_exchanges() -> 'ok'.
-spec user_to_internal_user() -> 'ok'.
-spec topic_trie() -> 'ok'.
@@ -123,6 +125,14 @@ add_ip_to_listener() ->
end,
[node, protocol, host, ip_address, port]).
+add_opts_to_listener() ->
+ transform(
+ rabbit_listener,
+ fun ({listener, Node, Protocol, Host, IP, Port}) ->
+ {listener, Node, Protocol, Host, IP, Port, []}
+ end,
+ [node, protocol, host, ip_address, port, opts]).
+
internal_exchanges() ->
Tables = [rabbit_exchange, rabbit_durable_exchange],
AddInternalFun =
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 9c8732bb6b..49293816e4 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -18,8 +18,7 @@
-export([memory/0, binary/0, ets_tables_memory/1]).
--define(MAGIC_PLUGINS, ["mochiweb", "webmachine", "cowboy", "sockjs",
- "rfc4627_jsonrpc"]).
+-define(MAGIC_PLUGINS, ["cowboy", "sockjs", "rfc4627_jsonrpc"]).
%%----------------------------------------------------------------------------
@@ -42,9 +41,17 @@ memory() ->
[aggregate(Names, Sums, memory, fun (X) -> X end)
|| Names <- distinguished_interesting_sups()],
- Mnesia = mnesia_memory(),
- MsgIndexETS = ets_memory([msg_store_persistent, msg_store_transient]),
- MgmtDbETS = ets_memory([rabbit_mgmt_event_collector]),
+ Mnesia = mnesia_memory(),
+ MsgIndexETS = ets_memory([msg_store_persistent, msg_store_transient]),
+ MetricsETS = ets_memory([rabbit_metrics]),
+ MetricsProc = try
+ [{_, M}] = process_info(whereis(rabbit_metrics), [memory]),
+ M
+ catch
+ error:badarg ->
+ 0
+ end,
+ MgmtDbETS = ets_memory([rabbit_mgmt_storage]),
[{total, Total},
{processes, Processes},
@@ -57,7 +64,7 @@ memory() ->
OtherProc = Processes
- ConnsReader - ConnsWriter - ConnsChannel - ConnsOther
- - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc,
+ - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc,
[{total, Total},
{connection_readers, ConnsReader},
@@ -69,6 +76,7 @@ memory() ->
{plugins, Plugins},
{other_proc, lists:max([0, OtherProc])}, %% [1]
{mnesia, Mnesia},
+ {metrics, MetricsETS + MetricsProc},
{mgmt_db, MgmtDbETS + MgmtDbProc},
{msg_index, MsgIndexETS + MsgIndexProc},
{other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS},