diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2016-12-01 14:12:57 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2016-12-01 14:12:57 +0300 |
| commit | 445f44a4db371b16c35f31667a3175d30697ff84 (patch) | |
| tree | 575155b0268336a6e0aafba493ea84dc0ae31b7b /src | |
| parent | 8401de2f41fad32a887a648265faf9b32542b0f1 (diff) | |
| parent | ad04a73e8ba3157e8b737f1521b1c4946c1d30e6 (diff) | |
| download | rabbitmq-server-git-445f44a4db371b16c35f31667a3175d30697ff84.tar.gz | |
Merge branch 'stable'
Conflicts:
src/rabbit_log.erl
src/rabbit_vm.erl
Diffstat (limited to 'src')
| -rw-r--r-- | src/delegate.erl | 269 | ||||
| -rw-r--r-- | src/delegate_sup.erl | 55 | ||||
| -rw-r--r-- | src/rabbit.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_log.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_metrics.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 17 |
8 files changed, 97 insertions, 341 deletions
diff --git a/src/delegate.erl b/src/delegate.erl deleted file mode 100644 index 778137c1c7..0000000000 --- a/src/delegate.erl +++ /dev/null @@ -1,269 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -%% - --module(delegate). - -%% delegate is an alternative way of doing remote calls. Compared to -%% the rpc module, it reduces inter-node communication. For example, -%% if a message is routed to 1,000 queues on node A and needs to be -%% propagated to nodes B and C, it would be nice to avoid doing 2,000 -%% remote casts to queue processes. -%% -%% An important issue here is preserving order - we need to make sure -%% that messages from a certain channel to a certain queue take a -%% consistent route, to prevent them being reordered. In fact all -%% AMQP-ish things (such as queue declaration results and basic.get) -%% must take the same route as well, to ensure that clients see causal -%% ordering correctly. Therefore we have a rather generic mechanism -%% here rather than just a message-reflector. That's also why we pick -%% the delegate process to use based on a hash of the source pid. -%% -%% When a function is invoked using delegate:invoke/2, delegate:call/2 -%% or delegate:cast/2 on a group of pids, the pids are first split -%% into local and remote ones. Remote processes are then grouped by -%% node. The function is then invoked locally and on every node (using -%% gen_server2:multi/4) as many times as there are processes on that -%% node, sequentially. -%% -%% Errors returned when executing functions on remote nodes are re-raised -%% in the caller. -%% -%% RabbitMQ starts a pool of delegate processes on boot. The size of -%% the pool is configurable, the aim is to make sure we don't have too -%% few delegates and thus limit performance on many-CPU machines. - --behaviour(gen_server2). - --export([start_link/1, invoke_no_result/2, invoke/2, - monitor/2, demonitor/1, call/2, cast/2]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {node, monitors, name}). - -%%---------------------------------------------------------------------------- - --export_type([monitor_ref/0]). - --type monitor_ref() :: reference() | {atom(), pid()}. --type fun_or_mfa(A) :: fun ((pid()) -> A) | {atom(), atom(), [any()]}. - --spec start_link - (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}. --spec invoke - ( pid(), fun_or_mfa(A)) -> A; - ([pid()], fun_or_mfa(A)) -> {[{pid(), A}], [{pid(), term()}]}. --spec invoke_no_result(pid() | [pid()], fun_or_mfa(any())) -> 'ok'. --spec monitor('process', pid()) -> monitor_ref(). --spec demonitor(monitor_ref()) -> 'true'. - --spec call - ( pid(), any()) -> any(); - ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}. --spec cast(pid() | [pid()], any()) -> 'ok'. - -%%---------------------------------------------------------------------------- - --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). - -%%---------------------------------------------------------------------------- - -start_link(Num) -> - Name = delegate_name(Num), - gen_server2:start_link({local, Name}, ?MODULE, [Name], []). - -invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> - apply1(FunOrMFA, Pid); -invoke(Pid, FunOrMFA) when is_pid(Pid) -> - case invoke([Pid], FunOrMFA) of - {[{Pid, Result}], []} -> - Result; - {[], [{Pid, {Class, Reason, StackTrace}}]} -> - erlang:raise(Class, Reason, StackTrace) - end; - -invoke([], _FunOrMFA) -> %% optimisation - {[], []}; -invoke([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation - case safe_invoke(Pid, FunOrMFA) of - {ok, _, Result} -> {[{Pid, Result}], []}; - {error, _, Error} -> {[], [{Pid, Error}]} - end; -invoke(Pids, FunOrMFA) when is_list(Pids) -> - {LocalPids, Grouped} = group_pids_by_node(Pids), - %% The use of multi_call is only safe because the timeout is - %% infinity, and thus there is no process spawned in order to do - %% the sending. Thus calls can't overtake preceding calls/casts. - {Replies, BadNodes} = - case orddict:fetch_keys(Grouped) of - [] -> {[], []}; - RemoteNodes -> gen_server2:multi_call( - RemoteNodes, delegate(self(), RemoteNodes), - {invoke, FunOrMFA, Grouped}, infinity) - end, - BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || - BadNode <- BadNodes, - Pid <- orddict:fetch(BadNode, Grouped)], - ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) | - [Results || {_Node, Results} <- Replies]]), - lists:foldl( - fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad}; - ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]} - end, {[], BadPids}, ResultsNoNode). - -invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> - _ = safe_invoke(Pid, FunOrMFA), %% we don't care about any error - ok; -invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) -> - invoke_no_result([Pid], FunOrMFA); - -invoke_no_result([], _FunOrMFA) -> %% optimisation - ok; -invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation - _ = safe_invoke(Pid, FunOrMFA), %% must not die - ok; -invoke_no_result(Pids, FunOrMFA) when is_list(Pids) -> - {LocalPids, Grouped} = group_pids_by_node(Pids), - case orddict:fetch_keys(Grouped) of - [] -> ok; - RemoteNodes -> gen_server2:abcast( - RemoteNodes, delegate(self(), RemoteNodes), - {invoke, FunOrMFA, Grouped}) - end, - _ = safe_invoke(LocalPids, FunOrMFA), %% must not die - ok. - -monitor(process, Pid) when node(Pid) =:= node() -> - erlang:monitor(process, Pid); -monitor(process, Pid) -> - Name = delegate(Pid, [node(Pid)]), - gen_server2:cast(Name, {monitor, self(), Pid}), - {Name, Pid}. - -demonitor(Ref) when is_reference(Ref) -> - erlang:demonitor(Ref); -demonitor({Name, Pid}) -> - gen_server2:cast(Name, {demonitor, self(), Pid}). - -call(PidOrPids, Msg) -> - invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}). - -cast(PidOrPids, Msg) -> - invoke_no_result(PidOrPids, {gen_server2, cast, [Msg]}). - -%%---------------------------------------------------------------------------- - -group_pids_by_node(Pids) -> - LocalNode = node(), - lists:foldl( - fun (Pid, {Local, Remote}) when node(Pid) =:= LocalNode -> - {[Pid | Local], Remote}; - (Pid, {Local, Remote}) -> - {Local, - orddict:update( - node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} - end, {[], orddict:new()}, Pids). - -delegate_name(Hash) -> - list_to_atom("delegate_" ++ integer_to_list(Hash)). - -delegate(Pid, RemoteNodes) -> - case get(delegate) of - undefined -> Name = delegate_name( - erlang:phash2(Pid, - delegate_sup:count(RemoteNodes))), - put(delegate, Name), - Name; - Name -> Name - end. - -safe_invoke(Pids, FunOrMFA) when is_list(Pids) -> - [safe_invoke(Pid, FunOrMFA) || Pid <- Pids]; -safe_invoke(Pid, FunOrMFA) when is_pid(Pid) -> - try - {ok, Pid, apply1(FunOrMFA, Pid)} - catch Class:Reason -> - {error, Pid, {Class, Reason, erlang:get_stacktrace()}} - end. - -apply1({M, F, A}, Arg) -> apply(M, F, [Arg | A]); -apply1(Fun, Arg) -> Fun(Arg). - -%%---------------------------------------------------------------------------- - -init([Name]) -> - {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. - -handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) -> - {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), State, - hibernate}. - -handle_cast({monitor, MonitoringPid, Pid}, - State = #state{monitors = Monitors}) -> - Monitors1 = case dict:find(Pid, Monitors) of - {ok, {Ref, Pids}} -> - Pids1 = gb_sets:add_element(MonitoringPid, Pids), - dict:store(Pid, {Ref, Pids1}, Monitors); - error -> - Ref = erlang:monitor(process, Pid), - Pids = gb_sets:singleton(MonitoringPid), - dict:store(Pid, {Ref, Pids}, Monitors) - end, - {noreply, State#state{monitors = Monitors1}, hibernate}; - -handle_cast({demonitor, MonitoringPid, Pid}, - State = #state{monitors = Monitors}) -> - Monitors1 = case dict:find(Pid, Monitors) of - {ok, {Ref, Pids}} -> - Pids1 = gb_sets:del_element(MonitoringPid, Pids), - case gb_sets:is_empty(Pids1) of - true -> erlang:demonitor(Ref), - dict:erase(Pid, Monitors); - false -> dict:store(Pid, {Ref, Pids1}, Monitors) - end; - error -> - Monitors - end, - {noreply, State#state{monitors = Monitors1}, hibernate}; - -handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) -> - _ = safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), - {noreply, State, hibernate}. - -handle_info({'DOWN', Ref, process, Pid, Info}, - State = #state{monitors = Monitors, name = Name}) -> - {noreply, - case dict:find(Pid, Monitors) of - {ok, {Ref, Pids}} -> - Msg = {'DOWN', {Name, Pid}, process, Pid, Info}, - gb_sets:fold(fun (MonitoringPid, _) -> MonitoringPid ! Msg end, - none, Pids), - State#state{monitors = dict:erase(Pid, Monitors)}; - error -> - State - end, hibernate}; - -handle_info(_Info, State) -> - {noreply, State, hibernate}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl deleted file mode 100644 index ba0964f9dd..0000000000 --- a/src/delegate_sup.erl +++ /dev/null @@ -1,55 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -%% - --module(delegate_sup). - --behaviour(supervisor). - --export([start_link/1, count/1]). - --export([init/1]). - --define(SERVER, ?MODULE). - -%%---------------------------------------------------------------------------- - --spec start_link(integer()) -> rabbit_types:ok_pid_or_error(). --spec count([node()]) -> integer(). - -%%---------------------------------------------------------------------------- - -start_link(Count) -> - supervisor:start_link({local, ?SERVER}, ?MODULE, [Count]). - -count([]) -> - 1; -count([Node | Nodes]) -> - try - length(supervisor:which_children({?SERVER, Node})) - catch exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown -> - count(Nodes); - exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown; - R =:= nodedown -> - count(Nodes) - end. - -%%---------------------------------------------------------------------------- - -init([Count]) -> - {ok, {{one_for_one, 10, 10}, - [{Num, {delegate, start_link, [Num]}, - transient, 16#ffffffff, worker, [delegate]} || - Num <- lists:seq(0, Count - 1)]}}. diff --git a/src/rabbit.erl b/src/rabbit.erl index 6221cf0b44..9e0a211c17 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -91,6 +91,13 @@ {requires, external_infrastructure}, {enables, kernel_ready}]}). +-rabbit_boot_step({rabbit_core_metrics, + [{description, "core metrics storage"}, + {mfa, {rabbit_sup, start_child, + [rabbit_metrics]}}, + {requires, external_infrastructure}, + {enables, kernel_ready}]}). + -rabbit_boot_step({rabbit_event, [{description, "statistics event manager"}, {mfa, {rabbit_sup, start_restartable_child, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 25555156d6..8db2a167e4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -105,15 +105,16 @@ %%---------------------------------------------------------------------------- -define(STATISTICS_KEYS, - [name, + [messages_ready, + messages_unacknowledged, + messages, + reductions, + name, policy, operator_policy, effective_policy_definition, exclusive_consumer_pid, exclusive_consumer_tag, - messages_ready, - messages_unacknowledged, - messages, consumers, consumer_utilisation, memory, @@ -121,7 +122,6 @@ synchronised_slave_pids, recoverable_slaves, state, - reductions, garbage_collection ]). @@ -961,9 +961,13 @@ emit_stats(State) -> emit_stats(State, Extra) -> ExtraKs = [K || {K, _} <- Extra], - Infos = [{K, V} || {K, V} <- infos(statistics_keys(), State), - not lists:member(K, ExtraKs)], - rabbit_event:notify(queue_stats, Extra ++ Infos). + [{messages_ready, MR}, {messages_unacknowledged, MU}, {messages, M}, + {reductions, R}, {name, Name} | Infos] = All + = [{K, V} || {K, V} <- infos(statistics_keys(), State), + not lists:member(K, ExtraKs)], + rabbit_core_metrics:queue_stats(Name, Extra ++ Infos), + rabbit_core_metrics:queue_stats(Name, MR, MU, M, R), + rabbit_event:notify(queue_stats, Extra ++ All). emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, PrefetchCount, Args, Ref) -> @@ -978,6 +982,7 @@ emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Ref). emit_consumer_deleted(ChPid, ConsumerTag, QName) -> + rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), rabbit_event:notify(consumer_deleted, [{consumer_tag, ConsumerTag}, {channel, ChPid}, @@ -1109,9 +1114,14 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), + QName = qname(State1), + AckRequired = not NoAck, + rabbit_core_metrics:consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, + PrefetchCount, Args), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), - PrefetchCount, Args, none), + AckRequired, QName, PrefetchCount, + Args, none), notify_decorators(State1), reply(ok, run_message_queue(State1)) end; diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 53b0340b8a..58e6f20cb6 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -138,6 +138,7 @@ authz_socket_info_direct(Infos) -> connect1(User, VHost, Protocol, Pid, Infos) -> try rabbit_access_control:check_vhost_access(User, VHost, authz_socket_info_direct(Infos)) of ok -> ok = pg_local:join(rabbit_direct, Pid), + rabbit_core_metrics:connection_created(Pid, Infos), rabbit_event:notify(connection_created, Infos), {ok, {User, rabbit_reader:server_properties(Protocol)}} catch @@ -156,4 +157,5 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, disconnect(Pid, Infos) -> pg_local:leave(rabbit_direct, Pid), + rabbit_core_metrics:connection_closed(Pid), rabbit_event:notify(connection_closed, Infos). diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index f60cf6c0c2..be5f0146b6 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -32,9 +32,6 @@ -type category() :: atom(). --spec log(category(), lager:log_level(), string()) -> 'ok'. --spec log(category(), lager:log_level(), string(), [any()]) -> 'ok'. - -spec debug(string()) -> 'ok'. -spec debug(string(), [any()]) -> 'ok'. -spec debug(pid() | [tuple()], string(), [any()]) -> 'ok'. @@ -65,8 +62,10 @@ %%---------------------------------------------------------------------------- +-spec log(category(), lager:log_level(), string()) -> 'ok'. log(Category, Level, Fmt) -> log(Category, Level, Fmt, []). +-spec log(category(), lager:log_level(), string(), [any()]) -> 'ok'. log(Category, Level, Fmt, Args) when is_list(Args) -> Sink = case Category of default -> ?LAGER_SINK; diff --git a/src/rabbit_metrics.erl b/src/rabbit_metrics.erl new file mode 100644 index 0000000000..1ea28c2906 --- /dev/null +++ b/src/rabbit_metrics.erl @@ -0,0 +1,53 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_metrics). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). + +-spec start_link() -> rabbit_types:ok_pid_or_error(). + +%%---------------------------------------------------------------------------- +%% Starts the raw metrics storage and owns the ETS tables. +%%---------------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +init([]) -> + rabbit_core_metrics:init(), + {ok, none}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 1aba5538a1..eae7119007 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -41,9 +41,17 @@ memory() -> [aggregate(Names, Sums, memory, fun (X) -> X end) || Names <- distinguished_interesting_sups()], - Mnesia = mnesia_memory(), - MsgIndexETS = ets_memory([msg_store_persistent, msg_store_transient]), - MgmtDbETS = ets_memory([rabbit_mgmt_event_collector]), + Mnesia = mnesia_memory(), + MsgIndexETS = ets_memory([msg_store_persistent, msg_store_transient]), + MetricsETS = ets_memory([rabbit_metrics]), + MetricsProc = try + [{_, M}] = process_info(whereis(rabbit_metrics), [memory]), + M + catch + error:badarg -> + 0 + end, + MgmtDbETS = ets_memory([rabbit_mgmt_storage]), [{total, Total}, {processes, Processes}, @@ -56,7 +64,7 @@ memory() -> OtherProc = Processes - ConnsReader - ConnsWriter - ConnsChannel - ConnsOther - - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc, + - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc, [{total, Total}, {connection_readers, ConnsReader}, @@ -68,6 +76,7 @@ memory() -> {plugins, Plugins}, {other_proc, lists:max([0, OtherProc])}, %% [1] {mnesia, Mnesia}, + {metrics, MetricsETS + MetricsProc}, {mgmt_db, MgmtDbETS + MgmtDbProc}, {msg_index, MsgIndexETS + MsgIndexProc}, {other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS}, |
