summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.travis.yml32
-rw-r--r--Makefile123
-rw-r--r--rabbitmq-components.mk25
-rwxr-xr-xscripts/rabbitmq-server2
-rw-r--r--scripts/rabbitmq-service.bat4
-rw-r--r--src/delegate.erl269
-rw-r--r--src/delegate_sup.erl55
-rw-r--r--src/rabbit.app.src123
-rw-r--r--src/rabbit.erl45
-rw-r--r--src/rabbit_amqqueue_process.erl30
-rw-r--r--src/rabbit_direct.erl2
-rw-r--r--src/rabbit_log.erl5
-rw-r--r--src/rabbit_metrics.erl53
-rw-r--r--src/rabbit_plugins.erl11
-rw-r--r--src/rabbit_vm.erl15
-rw-r--r--test/metrics_SUITE.erl376
-rw-r--r--test/partitions_SUITE.erl3
-rw-r--r--test/unit_inbroker_SUITE.erl73
18 files changed, 718 insertions, 528 deletions
diff --git a/.travis.yml b/.travis.yml
index e6bba5c81c..9bf4bd3396 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,34 +1,48 @@
-sudo: false
-services:
- - docker
+# vim:sw=2:et:
+
+# Use a real VM so we can install all the packages we want.
+sudo: required
+
language: erlang
notifications:
email:
- alerts@rabbitmq.com
addons:
apt:
+ sources:
+ - sourceline: deb https://packages.erlang-solutions.com/ubuntu precise contrib
+ key_url: https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc
packages:
+ # Use Elixir from Erlang Solutions. The provided Elixir is
+ # installed with kiex but is old. By using a prebuilt Debian
+ # package, we save the compilation time.
+ - elixir
- xsltproc
otp_release:
- "18.3"
- "19.0"
+services:
+ - docker
env:
matrix:
- GROUP=1
- GROUP=2
-# The checkout made by Travis is a "detached HEAD" and branches
-# information is missing. Our Erlang.mk's git_rmq fetch method relies on
-# it, so we need to restore it.
-#
-# We simply fetch master and, if it exists, stable branches. A branch is
-# created, pointing to the detached HEAD.
before_script:
+ # The checkout made by Travis is a "detached HEAD" and branches
+ # information is missing. Our Erlang.mk's git_rmq fetch method relies
+ # on it, so we need to restore it.
+ #
+ # We simply fetch master and, if it exists, stable branches. A branch
+ # is created, pointing to the detached HEAD.
- |
git checkout -B "${TRAVIS_TAG:-${TRAVIS_BRANCH}}"
git remote add upstream https://github.com/$TRAVIS_REPO_SLUG.git
git fetch upstream stable:stable || :
git fetch upstream master:master || :
+ # Remove all kiex installations. This makes sure that the Erlang
+ # Solutions one is picked: it's after the kiex installations in $PATH.
+ - echo YES | kiex implode
script:
- if test "${GROUP}" = '1'; then make tests; fi
diff --git a/Makefile b/Makefile
index 8c2ad5518a..3c11b54020 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,126 @@
PROJECT = rabbit
-VERSION ?= $(call get_app_version,src/$(PROJECT).app.src)
+PROJECT_DESCRIPTION = RabbitMQ
+PROJECT_MOD = rabbit
+PROJECT_REGISTERED = rabbit_amqqueue_sup \
+ rabbit_direct_client_sup \
+ rabbit_log \
+ rabbit_node_monitor \
+ rabbit_router
+
+define PROJECT_ENV
+[
+ {tcp_listeners, [5672]},
+ {num_tcp_acceptors, 10},
+ {ssl_listeners, []},
+ {num_ssl_acceptors, 1},
+ {ssl_options, []},
+ {vm_memory_high_watermark, 0.4},
+ {vm_memory_high_watermark_paging_ratio, 0.5},
+ {memory_monitor_interval, 2500},
+ {disk_free_limit, 50000000}, %% 50MB
+ {msg_store_index_module, rabbit_msg_store_ets_index},
+ {backing_queue_module, rabbit_variable_queue},
+ %% 0 ("no limit") would make a better default, but that
+ %% breaks the QPid Java client
+ {frame_max, 131072},
+ {channel_max, 0},
+ {heartbeat, 60},
+ {msg_store_file_size_limit, 16777216},
+ {fhc_write_buffering, true},
+ {fhc_read_buffering, false},
+ {queue_index_max_journal_entries, 32768},
+ {queue_index_embed_msgs_below, 4096},
+ {default_user, <<"guest">>},
+ {default_pass, <<"guest">>},
+ {default_user_tags, [administrator]},
+ {default_vhost, <<"/">>},
+ {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
+ {loopback_users, [<<"guest">>]},
+ {password_hashing_module, rabbit_password_hashing_sha256},
+ {server_properties, []},
+ {collect_statistics, none},
+ {collect_statistics_interval, 5000},
+ {mnesia_table_loading_retry_timeout, 30000},
+ {mnesia_table_loading_retry_limit, 10},
+ {auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
+ {auth_backends, [rabbit_auth_backend_internal]},
+ {delegate_count, 16},
+ {trace_vhosts, []},
+ {log_levels, [{connection, info}]},
+ {ssl_cert_login_from, distinguished_name},
+ {ssl_handshake_timeout, 5000},
+ {ssl_allow_poodle_attack, false},
+ {handshake_timeout, 10000},
+ {reverse_dns_lookups, false},
+ {cluster_partition_handling, ignore},
+ {cluster_keepalive_interval, 10000},
+ {tcp_listen_options, [{backlog, 128},
+ {nodelay, true},
+ {linger, {true, 0}},
+ {exit_on_close, false}
+ ]},
+ {halt_on_upgrade_failure, true},
+ {hipe_compile, false},
+ %% see bug 24513 for how this list was created
+ {hipe_modules,
+ [rabbit_reader, rabbit_channel, gen_server2, rabbit_exchange,
+ rabbit_command_assembler, rabbit_framing_amqp_0_9_1, rabbit_basic,
+ rabbit_event, lists, queue, priority_queue, rabbit_router,
+ rabbit_trace, rabbit_misc, rabbit_binary_parser,
+ rabbit_exchange_type_direct, rabbit_guid, rabbit_net,
+ rabbit_amqqueue_process, rabbit_variable_queue,
+ rabbit_binary_generator, rabbit_writer, delegate, gb_sets, lqueue,
+ sets, orddict, rabbit_amqqueue, rabbit_limiter, gb_trees,
+ rabbit_queue_index, rabbit_exchange_decorator, gen, dict, ordsets,
+ file_handle_cache, rabbit_msg_store, array,
+ rabbit_msg_store_ets_index, rabbit_msg_file,
+ rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow,
+ pmon, ssl_connection, tls_connection, ssl_record, tls_record,
+ gen_fsm, ssl]},
+ {ssl_apps, [asn1, crypto, public_key, ssl]},
+ %% see rabbitmq-server#114
+ {mirroring_flow_control, true},
+ {mirroring_sync_batch_size, 4096},
+ %% see rabbitmq-server#227 and related tickets.
+ %% msg_store_credit_disc_bound only takes effect when
+ %% messages are persisted to the message store. If messages
+ %% are embedded on the queue index, then modifying this
+ %% setting has no effect because credit_flow is not used when
+ %% writing to the queue index. See the setting
+ %% queue_index_embed_msgs_below above.
+ {msg_store_credit_disc_bound, {2000, 500}},
+ {msg_store_io_batch_size, 2048},
+ %% see rabbitmq-server#143
+ %% and rabbitmq-server#949
+ {credit_flow_default_credit, {200, 100}},
+ %% see rabbitmq-server#248
+ %% and rabbitmq-server#667
+ {channel_operation_timeout, 15000},
+
+ %% see rabbitmq-server#486
+ {peer_discovery_backend, rabbit_peer_discovery_classic_config},
+ %% used by rabbit_peer_discovery_classic_config
+ {cluster_nodes, {[], disc}},
+
+ {config_entry_decoder, [{cipher, aes_cbc256},
+ {hash, sha512},
+ {iterations, 1000},
+ {passphrase, undefined}
+ ]},
+
+ %% rabbitmq-server-973
+ {lazy_queue_explicit_gc_run_operation_threshold, 250},
+ {background_gc_enabled, true},
+ {background_gc_target_interval, 60000}
+ ]
+endef
-DEPS = ranch lager rabbit_common rabbitmq_cli
+# FIXME: Remove goldrush, once rabbit_plugins.erl knows how to ignore
+# indirect dependencies of rabbit.
+LOCAL_DEPS = sasl mnesia os_mon xmerl goldrush jsx
+BUILD_DEPS = rabbitmq_cli
+DEPS = ranch lager rabbit_common
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper
dep_rabbitmq_cli = git_rmq rabbitmq-cli $(current_rmq_ref) $(base_rmq_ref) rabbitmq-cli-integration
diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk
index 071385e8e7..c05a66fe13 100644
--- a/rabbitmq-components.mk
+++ b/rabbitmq-components.mk
@@ -5,6 +5,27 @@ ifeq ($(.DEFAULT_GOAL),)
.DEFAULT_GOAL = all
endif
+# PROJECT_VERSION defaults to:
+# 1. the version exported by rabbitmq-server-release;
+# 2. the version stored in `git-revisions.txt`, if it exists;
+# 3. a version based on git-describe(1), if it is a Git clone;
+# 4. 0.0.0
+
+PROJECT_VERSION := $(RABBITMQ_VERSION)
+
+ifeq ($(PROJECT_VERSION),)
+PROJECT_VERSION := $(shell \
+if test -f git-revisions.txt; then \
+ head -n1 git-revisions.txt | \
+ awk '{print $$$(words $(PROJECT_DESCRIPTION) version);}'; \
+else \
+ (git describe --dirty --abbrev=7 --tags --always --first-parent \
+ 2>/dev/null || echo rabbitmq_v0_0_0) | \
+ sed -e 's/^rabbitmq_v//' -e 's/^v//' -e 's/_/./g' -e 's/-/+/' \
+ -e 's/-/./g'; \
+fi)
+endif
+
# --------------------------------------------------------------------
# RabbitMQ components.
# --------------------------------------------------------------------
@@ -79,9 +100,9 @@ dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(curre
# all projects use the same versions. It avoids conflicts and makes it
# possible to work with rabbitmq-public-umbrella.
-dep_cowboy_commit = 1.0.3
+dep_cowboy_commit = 1.0.4
dep_mochiweb = git git://github.com/basho/mochiweb.git v2.9.0p2
-dep_ranch_commit = 1.2.1
+dep_ranch_commit = 1.3.0
dep_webmachine_commit = 1.10.8p2
RABBITMQ_COMPONENTS = amqp_client \
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index b85b4348c1..33369616ef 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -106,7 +106,7 @@ else
fi
if [ ! -d ${RABBITMQ_SCHEMA_DIR} ]; then
- mkdir "${RABBITMQ_SCHEMA_DIR}"
+ mkdir -p "${RABBITMQ_SCHEMA_DIR}"
fi
if [ ! -f "${RABBITMQ_SCHEMA_DIR}/rabbitmq.schema" ]; then
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index c9e404db46..347a5e62ae 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -269,6 +269,10 @@ set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:"=\"!
-comment "Multi-protocol open source messaging broker" ^
-args "!ERLANG_SERVICE_ARGUMENTS!" > NUL
+if ERRORLEVEL 1 (
+ EXIT /B 1
+)
+
goto END
diff --git a/src/delegate.erl b/src/delegate.erl
deleted file mode 100644
index 778137c1c7..0000000000
--- a/src/delegate.erl
+++ /dev/null
@@ -1,269 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(delegate).
-
-%% delegate is an alternative way of doing remote calls. Compared to
-%% the rpc module, it reduces inter-node communication. For example,
-%% if a message is routed to 1,000 queues on node A and needs to be
-%% propagated to nodes B and C, it would be nice to avoid doing 2,000
-%% remote casts to queue processes.
-%%
-%% An important issue here is preserving order - we need to make sure
-%% that messages from a certain channel to a certain queue take a
-%% consistent route, to prevent them being reordered. In fact all
-%% AMQP-ish things (such as queue declaration results and basic.get)
-%% must take the same route as well, to ensure that clients see causal
-%% ordering correctly. Therefore we have a rather generic mechanism
-%% here rather than just a message-reflector. That's also why we pick
-%% the delegate process to use based on a hash of the source pid.
-%%
-%% When a function is invoked using delegate:invoke/2, delegate:call/2
-%% or delegate:cast/2 on a group of pids, the pids are first split
-%% into local and remote ones. Remote processes are then grouped by
-%% node. The function is then invoked locally and on every node (using
-%% gen_server2:multi/4) as many times as there are processes on that
-%% node, sequentially.
-%%
-%% Errors returned when executing functions on remote nodes are re-raised
-%% in the caller.
-%%
-%% RabbitMQ starts a pool of delegate processes on boot. The size of
-%% the pool is configurable, the aim is to make sure we don't have too
-%% few delegates and thus limit performance on many-CPU machines.
-
--behaviour(gen_server2).
-
--export([start_link/1, invoke_no_result/2, invoke/2,
- monitor/2, demonitor/1, call/2, cast/2]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--record(state, {node, monitors, name}).
-
-%%----------------------------------------------------------------------------
-
--export_type([monitor_ref/0]).
-
--type monitor_ref() :: reference() | {atom(), pid()}.
--type fun_or_mfa(A) :: fun ((pid()) -> A) | {atom(), atom(), [any()]}.
-
--spec start_link
- (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}.
--spec invoke
- ( pid(), fun_or_mfa(A)) -> A;
- ([pid()], fun_or_mfa(A)) -> {[{pid(), A}], [{pid(), term()}]}.
--spec invoke_no_result(pid() | [pid()], fun_or_mfa(any())) -> 'ok'.
--spec monitor('process', pid()) -> monitor_ref().
--spec demonitor(monitor_ref()) -> 'true'.
-
--spec call
- ( pid(), any()) -> any();
- ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}.
--spec cast(pid() | [pid()], any()) -> 'ok'.
-
-%%----------------------------------------------------------------------------
-
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-
-%%----------------------------------------------------------------------------
-
-start_link(Num) ->
- Name = delegate_name(Num),
- gen_server2:start_link({local, Name}, ?MODULE, [Name], []).
-
-invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
- apply1(FunOrMFA, Pid);
-invoke(Pid, FunOrMFA) when is_pid(Pid) ->
- case invoke([Pid], FunOrMFA) of
- {[{Pid, Result}], []} ->
- Result;
- {[], [{Pid, {Class, Reason, StackTrace}}]} ->
- erlang:raise(Class, Reason, StackTrace)
- end;
-
-invoke([], _FunOrMFA) -> %% optimisation
- {[], []};
-invoke([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation
- case safe_invoke(Pid, FunOrMFA) of
- {ok, _, Result} -> {[{Pid, Result}], []};
- {error, _, Error} -> {[], [{Pid, Error}]}
- end;
-invoke(Pids, FunOrMFA) when is_list(Pids) ->
- {LocalPids, Grouped} = group_pids_by_node(Pids),
- %% The use of multi_call is only safe because the timeout is
- %% infinity, and thus there is no process spawned in order to do
- %% the sending. Thus calls can't overtake preceding calls/casts.
- {Replies, BadNodes} =
- case orddict:fetch_keys(Grouped) of
- [] -> {[], []};
- RemoteNodes -> gen_server2:multi_call(
- RemoteNodes, delegate(self(), RemoteNodes),
- {invoke, FunOrMFA, Grouped}, infinity)
- end,
- BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
- BadNode <- BadNodes,
- Pid <- orddict:fetch(BadNode, Grouped)],
- ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) |
- [Results || {_Node, Results} <- Replies]]),
- lists:foldl(
- fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad};
- ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]}
- end, {[], BadPids}, ResultsNoNode).
-
-invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
- _ = safe_invoke(Pid, FunOrMFA), %% we don't care about any error
- ok;
-invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) ->
- invoke_no_result([Pid], FunOrMFA);
-
-invoke_no_result([], _FunOrMFA) -> %% optimisation
- ok;
-invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation
- _ = safe_invoke(Pid, FunOrMFA), %% must not die
- ok;
-invoke_no_result(Pids, FunOrMFA) when is_list(Pids) ->
- {LocalPids, Grouped} = group_pids_by_node(Pids),
- case orddict:fetch_keys(Grouped) of
- [] -> ok;
- RemoteNodes -> gen_server2:abcast(
- RemoteNodes, delegate(self(), RemoteNodes),
- {invoke, FunOrMFA, Grouped})
- end,
- _ = safe_invoke(LocalPids, FunOrMFA), %% must not die
- ok.
-
-monitor(process, Pid) when node(Pid) =:= node() ->
- erlang:monitor(process, Pid);
-monitor(process, Pid) ->
- Name = delegate(Pid, [node(Pid)]),
- gen_server2:cast(Name, {monitor, self(), Pid}),
- {Name, Pid}.
-
-demonitor(Ref) when is_reference(Ref) ->
- erlang:demonitor(Ref);
-demonitor({Name, Pid}) ->
- gen_server2:cast(Name, {demonitor, self(), Pid}).
-
-call(PidOrPids, Msg) ->
- invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}).
-
-cast(PidOrPids, Msg) ->
- invoke_no_result(PidOrPids, {gen_server2, cast, [Msg]}).
-
-%%----------------------------------------------------------------------------
-
-group_pids_by_node(Pids) ->
- LocalNode = node(),
- lists:foldl(
- fun (Pid, {Local, Remote}) when node(Pid) =:= LocalNode ->
- {[Pid | Local], Remote};
- (Pid, {Local, Remote}) ->
- {Local,
- orddict:update(
- node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
- end, {[], orddict:new()}, Pids).
-
-delegate_name(Hash) ->
- list_to_atom("delegate_" ++ integer_to_list(Hash)).
-
-delegate(Pid, RemoteNodes) ->
- case get(delegate) of
- undefined -> Name = delegate_name(
- erlang:phash2(Pid,
- delegate_sup:count(RemoteNodes))),
- put(delegate, Name),
- Name;
- Name -> Name
- end.
-
-safe_invoke(Pids, FunOrMFA) when is_list(Pids) ->
- [safe_invoke(Pid, FunOrMFA) || Pid <- Pids];
-safe_invoke(Pid, FunOrMFA) when is_pid(Pid) ->
- try
- {ok, Pid, apply1(FunOrMFA, Pid)}
- catch Class:Reason ->
- {error, Pid, {Class, Reason, erlang:get_stacktrace()}}
- end.
-
-apply1({M, F, A}, Arg) -> apply(M, F, [Arg | A]);
-apply1(Fun, Arg) -> Fun(Arg).
-
-%%----------------------------------------------------------------------------
-
-init([Name]) ->
- {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-
-handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) ->
- {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), State,
- hibernate}.
-
-handle_cast({monitor, MonitoringPid, Pid},
- State = #state{monitors = Monitors}) ->
- Monitors1 = case dict:find(Pid, Monitors) of
- {ok, {Ref, Pids}} ->
- Pids1 = gb_sets:add_element(MonitoringPid, Pids),
- dict:store(Pid, {Ref, Pids1}, Monitors);
- error ->
- Ref = erlang:monitor(process, Pid),
- Pids = gb_sets:singleton(MonitoringPid),
- dict:store(Pid, {Ref, Pids}, Monitors)
- end,
- {noreply, State#state{monitors = Monitors1}, hibernate};
-
-handle_cast({demonitor, MonitoringPid, Pid},
- State = #state{monitors = Monitors}) ->
- Monitors1 = case dict:find(Pid, Monitors) of
- {ok, {Ref, Pids}} ->
- Pids1 = gb_sets:del_element(MonitoringPid, Pids),
- case gb_sets:is_empty(Pids1) of
- true -> erlang:demonitor(Ref),
- dict:erase(Pid, Monitors);
- false -> dict:store(Pid, {Ref, Pids1}, Monitors)
- end;
- error ->
- Monitors
- end,
- {noreply, State#state{monitors = Monitors1}, hibernate};
-
-handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) ->
- _ = safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA),
- {noreply, State, hibernate}.
-
-handle_info({'DOWN', Ref, process, Pid, Info},
- State = #state{monitors = Monitors, name = Name}) ->
- {noreply,
- case dict:find(Pid, Monitors) of
- {ok, {Ref, Pids}} ->
- Msg = {'DOWN', {Name, Pid}, process, Pid, Info},
- gb_sets:fold(fun (MonitoringPid, _) -> MonitoringPid ! Msg end,
- none, Pids),
- State#state{monitors = dict:erase(Pid, Monitors)};
- error ->
- State
- end, hibernate};
-
-handle_info(_Info, State) ->
- {noreply, State, hibernate}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
deleted file mode 100644
index ba0964f9dd..0000000000
--- a/src/delegate_sup.erl
+++ /dev/null
@@ -1,55 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(delegate_sup).
-
--behaviour(supervisor).
-
--export([start_link/1, count/1]).
-
--export([init/1]).
-
--define(SERVER, ?MODULE).
-
-%%----------------------------------------------------------------------------
-
--spec start_link(integer()) -> rabbit_types:ok_pid_or_error().
--spec count([node()]) -> integer().
-
-%%----------------------------------------------------------------------------
-
-start_link(Count) ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, [Count]).
-
-count([]) ->
- 1;
-count([Node | Nodes]) ->
- try
- length(supervisor:which_children({?SERVER, Node}))
- catch exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
- count(Nodes);
- exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown;
- R =:= nodedown ->
- count(Nodes)
- end.
-
-%%----------------------------------------------------------------------------
-
-init([Count]) ->
- {ok, {{one_for_one, 10, 10},
- [{Num, {delegate, start_link, [Num]},
- transient, 16#ffffffff, worker, [delegate]} ||
- Num <- lists:seq(0, Count - 1)]}}.
diff --git a/src/rabbit.app.src b/src/rabbit.app.src
deleted file mode 100644
index 5f3120b117..0000000000
--- a/src/rabbit.app.src
+++ /dev/null
@@ -1,123 +0,0 @@
-%% -*- erlang -*-
-{application, rabbit,
- [{description, "RabbitMQ"},
- {id, "RabbitMQ"},
- {vsn, "0.0.0"},
- {modules, []},
- {registered, [rabbit_amqqueue_sup,
- rabbit_log,
- rabbit_node_monitor,
- rabbit_router,
- rabbit_sup,
- rabbit_direct_client_sup]},
- %% FIXME: Remove goldrush, once rabbit_plugins.erl knows how to ignore
- %% indirect dependencies of rabbit.
- {applications, [kernel, stdlib, sasl, mnesia, goldrush, lager, rabbit_common, ranch, os_mon, xmerl, jsx]},
-%% we also depend on crypto, public_key and ssl but they shouldn't be
-%% in here as we don't actually want to start it
- {mod, {rabbit, []}},
- {env, [{tcp_listeners, [5672]},
- {num_tcp_acceptors, 10},
- {ssl_listeners, []},
- {num_ssl_acceptors, 1},
- {ssl_options, []},
- {vm_memory_high_watermark, 0.4},
- {vm_memory_high_watermark_paging_ratio, 0.5},
- {memory_monitor_interval, 2500},
- {disk_free_limit, 50000000}, %% 50MB
- {msg_store_index_module, rabbit_msg_store_ets_index},
- {backing_queue_module, rabbit_variable_queue},
- %% 0 ("no limit") would make a better default, but that
- %% breaks the QPid Java client
- {frame_max, 131072},
- {channel_max, 0},
- {heartbeat, 60},
- {msg_store_file_size_limit, 16777216},
- {fhc_write_buffering, true},
- {fhc_read_buffering, false},
- {queue_index_max_journal_entries, 32768},
- {queue_index_embed_msgs_below, 4096},
- {default_user, <<"guest">>},
- {default_pass, <<"guest">>},
- {default_user_tags, [administrator]},
- {default_vhost, <<"/">>},
- {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
- {loopback_users, [<<"guest">>]},
- {password_hashing_module, rabbit_password_hashing_sha256},
- {server_properties, []},
- {collect_statistics, none},
- {collect_statistics_interval, 5000},
- {mnesia_table_loading_retry_timeout, 30000},
- {mnesia_table_loading_retry_limit, 10},
- {auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
- {auth_backends, [rabbit_auth_backend_internal]},
- {delegate_count, 16},
- {trace_vhosts, []},
- {log_levels, [{connection, info}]},
- {ssl_cert_login_from, distinguished_name},
- {ssl_handshake_timeout, 5000},
- {ssl_allow_poodle_attack, false},
- {handshake_timeout, 10000},
- {reverse_dns_lookups, false},
- {cluster_partition_handling, ignore},
- {cluster_keepalive_interval, 10000},
- {tcp_listen_options, [{backlog, 128},
- {nodelay, true},
- {linger, {true, 0}},
- {exit_on_close, false}]},
- {halt_on_upgrade_failure, true},
- {hipe_compile, false},
- %% see bug 24513 for how this list was created
- {hipe_modules,
- [rabbit_reader, rabbit_channel, gen_server2, rabbit_exchange,
- rabbit_command_assembler, rabbit_framing_amqp_0_9_1, rabbit_basic,
- rabbit_event, lists, queue, priority_queue, rabbit_router,
- rabbit_trace, rabbit_misc, rabbit_binary_parser,
- rabbit_exchange_type_direct, rabbit_guid, rabbit_net,
- rabbit_amqqueue_process, rabbit_variable_queue,
- rabbit_binary_generator, rabbit_writer, delegate, gb_sets, lqueue,
- sets, orddict, rabbit_amqqueue, rabbit_limiter, gb_trees,
- rabbit_queue_index, rabbit_exchange_decorator, gen, dict, ordsets,
- file_handle_cache, rabbit_msg_store, array,
- rabbit_msg_store_ets_index, rabbit_msg_file,
- rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow,
- pmon, ssl_connection, tls_connection, ssl_record, tls_record,
- gen_fsm, ssl]},
- {ssl_apps, [asn1, crypto, public_key, ssl]},
- %% see rabbitmq-server#114
- {mirroring_flow_control, true},
- {mirroring_sync_batch_size, 4096},
- %% see rabbitmq-server#227 and related tickets.
- %% msg_store_credit_disc_bound only takes effect when
- %% messages are persisted to the message store. If messages
- %% are embedded on the queue index, then modifying this
- %% setting has no effect because credit_flow is not used when
- %% writing to the queue index. See the setting
- %% queue_index_embed_msgs_below above.
- {msg_store_credit_disc_bound, {2000, 500}},
- {msg_store_io_batch_size, 2048},
- %% see rabbitmq-server#143
- %% and rabbitmq-server#949
- {credit_flow_default_credit, {200, 100}},
- %% see rabbitmq-server#248
- %% and rabbitmq-server#667
- {channel_operation_timeout, 15000},
-
- %% see rabbitmq-server#486
- {peer_discovery_backend, rabbit_peer_discovery_classic_config},
- %% used by rabbit_peer_discovery_classic_config
- {cluster_nodes, {[], disc}},
-
- {config_entry_decoder, [
- {cipher, aes_cbc256},
- {hash, sha512},
- {iterations, 1000},
- {passphrase, undefined}
- ]},
-
- %% rabbitmq-server-973
- {lazy_queue_explicit_gc_run_operation_threshold, 250},
- {background_gc_enabled, true},
- {background_gc_target_interval, 60000}
- ]}]}.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ebc92150eb..e121fb3e2e 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -91,6 +91,13 @@
{requires, external_infrastructure},
{enables, kernel_ready}]}).
+-rabbit_boot_step({rabbit_core_metrics,
+ [{description, "core metrics storage"},
+ {mfa, {rabbit_sup, start_child,
+ [rabbit_metrics]}},
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
+
-rabbit_boot_step({rabbit_event,
[{description, "statistics event manager"},
{mfa, {rabbit_sup, start_restartable_child,
@@ -381,15 +388,16 @@ sd_open_port() ->
use_stdio, out]).
sd_notify_socat(Unit) ->
- case sd_open_port() of
- {'EXIT', Exit} ->
- io:format(standard_error, "Failed to start socat ~p~n", [Exit]),
- false;
+ try sd_open_port() of
Port ->
Port ! {self(), {command, sd_notify_data()}},
Result = sd_wait_activation(Port, Unit),
port_close(Port),
Result
+ catch
+ Class:Reason ->
+ io:format(standard_error, "Failed to start socat ~p:~p~n", [Class, Reason]),
+ false
end.
sd_current_unit() ->
@@ -805,14 +813,25 @@ insert_default_data() ->
{ok, DefaultVHost} = application:get_env(default_vhost),
{ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} =
application:get_env(default_permissions),
- ok = rabbit_vhost:add(DefaultVHost),
- ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass),
- ok = rabbit_auth_backend_internal:set_tags(DefaultUser, DefaultTags),
- ok = rabbit_auth_backend_internal:set_permissions(DefaultUser,
- DefaultVHost,
- DefaultConfigurePerm,
- DefaultWritePerm,
- DefaultReadPerm),
+
+ DefaultUserBin = rabbit_data_coercion:to_binary(DefaultUser),
+ DefaultPassBin = rabbit_data_coercion:to_binary(DefaultPass),
+ DefaultVHostBin = rabbit_data_coercion:to_binary(DefaultVHost),
+ DefaultConfigurePermBin = rabbit_data_coercion:to_binary(DefaultConfigurePerm),
+ DefaultWritePermBin = rabbit_data_coercion:to_binary(DefaultWritePerm),
+ DefaultReadPermBin = rabbit_data_coercion:to_binary(DefaultReadPerm),
+
+ ok = rabbit_vhost:add(DefaultVHostBin),
+ ok = rabbit_auth_backend_internal:add_user(
+ DefaultUserBin,
+ DefaultPassBin
+ ),
+ ok = rabbit_auth_backend_internal:set_tags(DefaultUserBin,DefaultTags),
+ ok = rabbit_auth_backend_internal:set_permissions(DefaultUserBin,
+ DefaultVHostBin,
+ DefaultConfigurePermBin,
+ DefaultWritePermBin,
+ DefaultReadPermBin),
ok.
%%---------------------------------------------------------------------------
@@ -864,7 +883,7 @@ erts_version_check() ->
end.
print_banner() ->
- {ok, Product} = application:get_key(id),
+ {ok, Product} = application:get_key(description),
{ok, Version} = application:get_key(vsn),
{LogFmt, LogLocations} = case log_locations() of
[_ | Tail] = LL ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 25555156d6..8db2a167e4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -105,15 +105,16 @@
%%----------------------------------------------------------------------------
-define(STATISTICS_KEYS,
- [name,
+ [messages_ready,
+ messages_unacknowledged,
+ messages,
+ reductions,
+ name,
policy,
operator_policy,
effective_policy_definition,
exclusive_consumer_pid,
exclusive_consumer_tag,
- messages_ready,
- messages_unacknowledged,
- messages,
consumers,
consumer_utilisation,
memory,
@@ -121,7 +122,6 @@
synchronised_slave_pids,
recoverable_slaves,
state,
- reductions,
garbage_collection
]).
@@ -961,9 +961,13 @@ emit_stats(State) ->
emit_stats(State, Extra) ->
ExtraKs = [K || {K, _} <- Extra],
- Infos = [{K, V} || {K, V} <- infos(statistics_keys(), State),
- not lists:member(K, ExtraKs)],
- rabbit_event:notify(queue_stats, Extra ++ Infos).
+ [{messages_ready, MR}, {messages_unacknowledged, MU}, {messages, M},
+ {reductions, R}, {name, Name} | Infos] = All
+ = [{K, V} || {K, V} <- infos(statistics_keys(), State),
+ not lists:member(K, ExtraKs)],
+ rabbit_core_metrics:queue_stats(Name, Extra ++ Infos),
+ rabbit_core_metrics:queue_stats(Name, MR, MU, M, R),
+ rabbit_event:notify(queue_stats, Extra ++ All).
emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName,
PrefetchCount, Args, Ref) ->
@@ -978,6 +982,7 @@ emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName,
Ref).
emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
+ rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
rabbit_event:notify(consumer_deleted,
[{consumer_tag, ConsumerTag},
{channel, ChPid},
@@ -1109,9 +1114,14 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
+ QName = qname(State1),
+ AckRequired = not NoAck,
+ rabbit_core_metrics:consumer_created(
+ ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
+ PrefetchCount, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1),
- PrefetchCount, Args, none),
+ AckRequired, QName, PrefetchCount,
+ Args, none),
notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 53b0340b8a..58e6f20cb6 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -138,6 +138,7 @@ authz_socket_info_direct(Infos) ->
connect1(User, VHost, Protocol, Pid, Infos) ->
try rabbit_access_control:check_vhost_access(User, VHost, authz_socket_info_direct(Infos)) of
ok -> ok = pg_local:join(rabbit_direct, Pid),
+ rabbit_core_metrics:connection_created(Pid, Infos),
rabbit_event:notify(connection_created, Infos),
{ok, {User, rabbit_reader:server_properties(Protocol)}}
catch
@@ -156,4 +157,5 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
disconnect(Pid, Infos) ->
pg_local:leave(rabbit_direct, Pid),
+ rabbit_core_metrics:connection_closed(Pid),
rabbit_event:notify(connection_closed, Infos).
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index bb5ae14c3e..22181ce8b7 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -32,9 +32,6 @@
-type category() :: atom().
--spec log(category(), lager:log_level(), string()) -> 'ok'.
--spec log(category(), lager:log_level(), string(), [any()]) -> 'ok'.
-
-spec debug(string()) -> 'ok'.
-spec debug(string(), [any()]) -> 'ok'.
-spec debug(pid() | [tuple()], string(), [any()]) -> 'ok'.
@@ -65,8 +62,10 @@
%%----------------------------------------------------------------------------
+-spec log(category(), lager:log_level(), string()) -> 'ok'.
log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
+-spec log(category(), lager:log_level(), string(), [any()]) -> 'ok'.
log(Category, Level, Fmt, Args) when is_list(Args) ->
Sink = case Category of
default -> ?LAGER_SINK;
diff --git a/src/rabbit_metrics.erl b/src/rabbit_metrics.erl
new file mode 100644
index 0000000000..1ea28c2906
--- /dev/null
+++ b/src/rabbit_metrics.erl
@@ -0,0 +1,53 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_metrics).
+
+-behaviour(gen_server).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-spec start_link() -> rabbit_types:ok_pid_or_error().
+
+%%----------------------------------------------------------------------------
+%% Starts the raw metrics storage and owns the ETS tables.
+%%----------------------------------------------------------------------------
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+init([]) ->
+ rabbit_core_metrics:init(),
+ {ok, none}.
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(_Msg, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 3f65452bdf..b7ba3af732 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/zip.hrl").
--export([setup/0, active/0, read_enabled/1, list/1, list/2, dependencies/3]).
+-export([setup/0, active/0, read_enabled/1, list/1, list/2, dependencies/3, running_plugins/0]).
-export([ensure/1]).
-export([extract_schemas/1]).
-export([validate_plugins/1, format_invalid_plugins/1]).
@@ -211,6 +211,13 @@ is_loadable(App) ->
_ -> false
end.
+
+%% List running plugins along with their version.
+-spec running_plugins() -> {ok, [{atom(), Vsn :: string()}]}.
+running_plugins() ->
+ ActivePlugins = active(),
+ {ok, [{App, Vsn} || {App, _ , Vsn} <- rabbit_misc:which_applications(), lists:member(App, ActivePlugins)]}.
+
%%----------------------------------------------------------------------------
prepare_plugins(Enabled) ->
@@ -489,7 +496,7 @@ list_free_apps([Dir|Rest]) ->
compare_by_name_and_version(#plugin{name = Name, version = VersionA},
#plugin{name = Name, version = VersionB}) ->
- ec_semver:lte(VersionA, VersionB);
+ rabbit_semver:lte(VersionA, VersionB);
compare_by_name_and_version(#plugin{name = NameA},
#plugin{name = NameB}) ->
NameA =< NameB.
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 7a6e290490..59c63022d8 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -41,9 +41,17 @@ memory() ->
[aggregate(Names, Sums, memory, fun (X) -> X end)
|| Names <- distinguished_interesting_sups()],
- Mnesia = mnesia_memory(),
+ Mnesia = mnesia_memory(),
MsgIndexETS = ets_memory([msg_store_persistent_vhost, msg_store_transient_vhost]),
- MgmtDbETS = ets_memory([rabbit_mgmt_event_collector]),
+ MetricsETS = ets_memory([rabbit_metrics]),
+ MetricsProc = try
+ [{_, M}] = process_info(whereis(rabbit_metrics), [memory]),
+ M
+ catch
+ error:badarg ->
+ 0
+ end,
+ MgmtDbETS = ets_memory([rabbit_mgmt_storage]),
[{total, Total},
{processes, Processes},
@@ -56,7 +64,7 @@ memory() ->
OtherProc = Processes
- ConnsReader - ConnsWriter - ConnsChannel - ConnsOther
- - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc,
+ - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc,
[{total, Total},
{connection_readers, ConnsReader},
@@ -68,6 +76,7 @@ memory() ->
{plugins, Plugins},
{other_proc, lists:max([0, OtherProc])}, %% [1]
{mnesia, Mnesia},
+ {metrics, MetricsETS + MetricsProc},
{mgmt_db, MgmtDbETS + MgmtDbProc},
{msg_index, MsgIndexETS + MsgIndexProc},
{other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS},
diff --git a/test/metrics_SUITE.erl b/test/metrics_SUITE.erl
new file mode 100644
index 0000000000..b2b0fe3560
--- /dev/null
+++ b/test/metrics_SUITE.erl
@@ -0,0 +1,376 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2016 Pivotal Software, Inc. All rights reserved.
+%%
+-module(metrics_SUITE).
+-compile(export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ connection,
+ channel,
+ channel_connection_close,
+ channel_queue_exchange_consumer_close_connection,
+ channel_queue_delete_queue,
+ connection_metric_count_test,
+ channel_metric_count_test,
+ queue_metric_count_test,
+ queue_metric_count_channel_per_queue_test,
+ connection_metric_idemp_test,
+ channel_metric_idemp_test,
+ queue_metric_idemp_test
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+merge_app_env(Config) ->
+ rabbit_ct_helpers:merge_app_env(Config,
+ {rabbit, [
+ {collect_statistics, fine},
+ {collect_statistics_interval, 500}
+ ]}).
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ [ fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+% NB: node_stats tests are in the management_agent repo
+
+connection_metric_count_test(Config) ->
+ rabbit_ct_proper_helpers:run_proper(fun prop_connection_metric_count/1, [Config], 25).
+
+channel_metric_count_test(Config) ->
+ rabbit_ct_proper_helpers:run_proper(fun prop_channel_metric_count/1, [Config], 25).
+
+queue_metric_count_test(Config) ->
+ rabbit_ct_proper_helpers:run_proper(fun prop_queue_metric_count/1, [Config], 5).
+
+queue_metric_count_channel_per_queue_test(Config) ->
+ rabbit_ct_proper_helpers:run_proper(fun prop_queue_metric_count_channel_per_queue/1,
+ [Config], 5).
+
+connection_metric_idemp_test(Config) ->
+ rabbit_ct_proper_helpers:run_proper(fun prop_connection_metric_idemp/1, [Config], 25).
+
+channel_metric_idemp_test(Config) ->
+ rabbit_ct_proper_helpers:run_proper(fun prop_channel_metric_idemp/1, [Config], 25).
+
+queue_metric_idemp_test(Config) ->
+ rabbit_ct_proper_helpers:run_proper(fun prop_queue_metric_idemp/1, [Config], 25).
+
+prop_connection_metric_idemp(Config) ->
+ ?FORALL(N, {integer(1, 25), integer(1, 25)},
+ connection_metric_idemp(Config, N)).
+
+prop_channel_metric_idemp(Config) ->
+ ?FORALL(N, {integer(1, 25), integer(1, 25)},
+ channel_metric_idemp(Config, N)).
+
+prop_queue_metric_idemp(Config) ->
+ ?FORALL(N, {integer(1, 25), integer(1, 25)},
+ queue_metric_idemp(Config, N)).
+
+prop_connection_metric_count(Config) ->
+ ?FORALL(N, {integer(1, 25), resize(100, list(oneof([add, remove])))},
+ connection_metric_count(Config, N)).
+
+prop_channel_metric_count(Config) ->
+ ?FORALL(N, {integer(1, 25), resize(100, list(oneof([add, remove])))},
+ channel_metric_count(Config, N)).
+
+prop_queue_metric_count(Config) ->
+ ?FORALL(N, {integer(1, 10), resize(10, list(oneof([add, remove])))},
+ queue_metric_count(Config, N)).
+
+prop_queue_metric_count_channel_per_queue(Config) ->
+ ?FORALL(N, {integer(1, 10), resize(10, list(oneof([add, remove])))},
+ queue_metric_count_channel_per_queue(Config, N)).
+
+connection_metric_idemp(Config, {N, R}) ->
+ Conns = [rabbit_ct_client_helpers:open_unmanaged_connection(Config)
+ || _ <- lists:seq(1, N)],
+ Table = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_metrics)],
+ Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)],
+ % refresh stats 'R' times
+ [[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)],
+ timer:sleep(100),
+ TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_metrics)],
+ TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)],
+ [rabbit_ct_client_helpers:close_connection(Conn) || Conn <- Conns],
+ (Table2 == TableAfter2) and (Table == TableAfter) and
+ (N == length(Table)) and (N == length(TableAfter)).
+
+channel_metric_idemp(Config, {N, R}) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ [amqp_connection:open_channel(Conn) || _ <- lists:seq(1, N)],
+ Table = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)],
+ Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)],
+ % refresh stats 'R' times
+ [[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)],
+ timer:sleep(100),
+ TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)],
+ TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)],
+ rabbit_ct_client_helpers:close_connection(Conn),
+ (Table2 == TableAfter2) and (Table == TableAfter) and
+ (N == length(Table)) and (N == length(TableAfter)).
+
+queue_metric_idemp(Config, {N, R}) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ Queues =
+ [begin
+ Queue = declare_queue(Chan),
+ ensure_exchange_metrics_populated(Chan, Queue),
+ ensure_channel_queue_metrics_populated(Chan, Queue),
+ Queue
+ end || _ <- lists:seq(1, N)],
+ Table = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_metrics)],
+ Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_coarse_metrics)],
+ % refresh stats 'R' times
+ ChanTable = read_table_rpc(Config, channel_created),
+ [[Pid ! emit_stats || {Pid, _} <- ChanTable ] || _ <- lists:seq(1, R)],
+ timer:sleep(100),
+ TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_metrics)],
+ TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_coarse_metrics)],
+ [ delete_queue(Chan, Q) || Q <- Queues],
+ rabbit_ct_client_helpers:close_connection(Conn),
+ (Table2 == TableAfter2) and (Table == TableAfter) and
+ (N == length(Table)) and (N == length(TableAfter)).
+
+connection_metric_count(Config, Ops) ->
+ add_rem_counter(Config, Ops,
+ {fun rabbit_ct_client_helpers:open_unmanaged_connection/1,
+ fun rabbit_ct_client_helpers:close_connection/1},
+ [ connection_created,
+ connection_metrics,
+ connection_coarse_metrics ]).
+
+channel_metric_count(Config, Ops) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ Result = add_rem_counter(Config, Ops,
+ {fun (_Config) ->
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ Chan
+ end,
+ fun amqp_channel:close/1},
+ [ channel_created,
+ channel_metrics,
+ channel_process_metrics ]),
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ Result.
+
+queue_metric_count(Config, Ops) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ AddFun = fun (_) ->
+ Queue = declare_queue(Chan),
+ ensure_exchange_metrics_populated(Chan, Queue),
+ ensure_channel_queue_metrics_populated(Chan, Queue),
+ force_channel_stats(Config),
+ Queue
+ end,
+ Result = add_rem_counter(Config, Ops,
+ {AddFun,
+ fun (Q) -> delete_queue(Chan, Q) end},
+ [ channel_queue_metrics,
+ channel_queue_exchange_metrics ]),
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ Result.
+
+queue_metric_count_channel_per_queue(Config, Ops) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ AddFun = fun (_) ->
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ Queue = declare_queue(Chan),
+ ensure_exchange_metrics_populated(Chan, Queue),
+ ensure_channel_queue_metrics_populated(Chan, Queue),
+ force_channel_stats(Config),
+ {Chan, Queue}
+ end,
+ Result = add_rem_counter(Config, Ops,
+ {AddFun,
+ fun ({Chan, Q}) -> delete_queue(Chan, Q) end},
+ [ channel_queue_metrics,
+ channel_queue_exchange_metrics ]),
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ Result.
+
+add_rem_counter(Config, {Initial, Ops}, {AddFun, RemFun}, Tables) ->
+ Things = [ AddFun(Config) || _ <- lists:seq(1, Initial) ],
+ % either add or remove some things
+ {FinalLen, Things1} =
+ lists:foldl(fun(add, {L, Items}) ->
+ {L+1, [AddFun(Config) | Items]};
+ (remove, {L, [H|Tail]}) ->
+ RemFun(H),
+ {L-1, Tail};
+ (_, S) -> S end,
+ {Initial, Things},
+ Ops),
+ TabLens = lists:map(fun(T) ->
+ length(read_table_rpc(Config, T)) end, Tables),
+ [RemFun(Thing) || Thing <- Things1],
+ [FinalLen] == lists:usort(TabLens).
+
+
+connection(Config) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ [_] = read_table_rpc(Config, connection_created),
+ [_] = read_table_rpc(Config, connection_metrics),
+ [_] = read_table_rpc(Config, connection_coarse_metrics),
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ [] = read_table_rpc(Config, connection_created),
+ [] = read_table_rpc(Config, connection_metrics),
+ [] = read_table_rpc(Config, connection_coarse_metrics).
+
+channel(Config) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ [_] = read_table_rpc(Config, channel_created),
+ [_] = read_table_rpc(Config, channel_metrics),
+ [_] = read_table_rpc(Config, channel_process_metrics),
+ ok = amqp_channel:close(Chan),
+ [] = read_table_rpc(Config, channel_created),
+ [] = read_table_rpc(Config, channel_metrics),
+ [] = read_table_rpc(Config, channel_process_metrics),
+ ok = rabbit_ct_client_helpers:close_connection(Conn).
+
+channel_connection_close(Config) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, _} = amqp_connection:open_channel(Conn),
+ [_] = read_table_rpc(Config, channel_created),
+ [_] = read_table_rpc(Config, channel_metrics),
+ [_] = read_table_rpc(Config, channel_process_metrics),
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ [] = read_table_rpc(Config, channel_created),
+ [] = read_table_rpc(Config, channel_metrics),
+ [] = read_table_rpc(Config, channel_process_metrics).
+
+channel_queue_delete_queue(Config) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ Queue = declare_queue(Chan),
+ ensure_exchange_metrics_populated(Chan, Queue),
+ ensure_channel_queue_metrics_populated(Chan, Queue),
+ force_channel_stats(Config),
+ [_] = read_table_rpc(Config, channel_queue_metrics),
+ [_] = read_table_rpc(Config, channel_queue_exchange_metrics),
+
+ delete_queue(Chan, Queue),
+ % ensure removal of queue cleans up channel_queue metrics
+ [] = read_table_rpc(Config, channel_queue_exchange_metrics),
+ [] = read_table_rpc(Config, channel_queue_metrics),
+ ok = rabbit_ct_client_helpers:close_connection(Conn).
+
+channel_queue_exchange_consumer_close_connection(Config) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ Queue = declare_queue(Chan),
+ ensure_exchange_metrics_populated(Chan, Queue),
+ force_channel_stats(Config),
+
+ [_] = read_table_rpc(Config, channel_exchange_metrics),
+ [_] = read_table_rpc(Config, channel_queue_exchange_metrics),
+
+ ensure_channel_queue_metrics_populated(Chan, Queue),
+ force_channel_stats(Config),
+ [_] = read_table_rpc(Config, channel_queue_metrics),
+
+ Sub = #'basic.consume'{queue = Queue},
+ #'basic.consume_ok'{consumer_tag = _} =
+ amqp_channel:call(Chan, Sub),
+
+ [_] = read_table_rpc(Config, consumer_created),
+
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ % ensure cleanup happened
+ [] = read_table_rpc(Config, channel_exchange_metrics),
+ [] = read_table_rpc(Config, channel_queue_exchange_metrics),
+ [] = read_table_rpc(Config, channel_queue_metrics),
+ [] = read_table_rpc(Config, consumer_created).
+
+
+%% -------------------------------------------------------------------
+%% Utilities
+%% -------------------------------------------------------------------
+
+declare_queue(Chan) ->
+ Declare = #'queue.declare'{durable = false, auto_delete = true},
+ #'queue.declare_ok'{queue = Name} = amqp_channel:call(Chan, Declare),
+ Name.
+
+delete_queue(Chan, Name) ->
+ Delete = #'queue.delete'{queue = Name},
+ #'queue.delete_ok'{} = amqp_channel:call(Chan, Delete).
+
+ensure_exchange_metrics_populated(Chan, RoutingKey) ->
+ % need to publish for exchange metrics to be populated
+ Publish = #'basic.publish'{routing_key = RoutingKey},
+ amqp_channel:call(Chan, Publish, #amqp_msg{payload = <<"hello">>}).
+
+ensure_channel_queue_metrics_populated(Chan, Queue) ->
+ % need to get and wait for timer for channel queue metrics to be populated
+ Get = #'basic.get'{queue = Queue, no_ack=true},
+ {#'basic.get_ok'{}, #amqp_msg{}} = amqp_channel:call(Chan, Get).
+
+force_channel_stats(Config) ->
+ [ Pid ! emit_stats || {Pid, _} <- read_table_rpc(Config, channel_created) ],
+ timer:sleep(100).
+
+read_table_rpc(Config, Table) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, read_table, [Table]).
+
+read_table(Table) ->
+ ets:tab2list(Table).
+
diff --git a/test/partitions_SUITE.erl b/test/partitions_SUITE.erl
index e00c015d02..3f0ec419c2 100644
--- a/test/partitions_SUITE.erl
+++ b/test/partitions_SUITE.erl
@@ -33,6 +33,9 @@
%% It's a lot, but still better than timetrap_timeout
-define(AWAIT_TIMEOUT, 300000).
+suite() ->
+ [{timetrap, {minutes, 60}}].
+
all() ->
[
{group, net_ticktime_1},
diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl
index 569c7a88fa..aee815aee4 100644
--- a/test/unit_inbroker_SUITE.erl
+++ b/test/unit_inbroker_SUITE.erl
@@ -2885,10 +2885,13 @@ channel_statistics1(_Config) ->
dummy_event_receiver:start(self(), [node()], [channel_stats]),
%% Check stats empty
- Event = test_ch_statistics_receive_event(Ch, fun (_) -> true end),
- [] = proplists:get_value(channel_queue_stats, Event),
- [] = proplists:get_value(channel_exchange_stats, Event),
- [] = proplists:get_value(channel_queue_exchange_stats, Event),
+ Check1 = fun() ->
+ [] = ets:match(channel_queue_metrics, {Ch, QRes}),
+ [] = ets:match(channel_exchange_metrics, {Ch, X}),
+ [] = ets:match(channel_queue_exchange_metrics,
+ {Ch, {QRes, X}})
+ end,
+ test_ch_metrics(Check1, ?TIMEOUT),
%% Publish and get a message
rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>,
@@ -2897,46 +2900,44 @@ channel_statistics1(_Config) ->
rabbit_channel:do(Ch, #'basic.get'{queue = QName}),
%% Check the stats reflect that
- Event2 = test_ch_statistics_receive_event(
- Ch,
- fun (E) ->
- length(proplists:get_value(
- channel_queue_exchange_stats, E)) > 0
- end),
- [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
- [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2),
- [{{QRes,X},[{publish,1}]}] =
- proplists:get_value(channel_queue_exchange_stats, Event2),
+ Check2 = fun() ->
+ [{{Ch, QRes}, 1, 0, 0, 0, 0, 0}] = ets:lookup(
+ channel_queue_metrics,
+ {Ch, QRes}),
+ [{{Ch, X}, 1, 0, 0}] = ets:lookup(
+ channel_exchange_metrics,
+ {Ch, X}),
+ [{{Ch, {QRes, X}}, 1}] = ets:lookup(
+ channel_queue_exchange_metrics,
+ {Ch, {QRes, X}})
+ end,
+ test_ch_metrics(Check2, ?TIMEOUT),
%% Check the stats remove stuff on queue deletion
rabbit_channel:do(Ch, #'queue.delete'{queue = QName}),
- Event3 = test_ch_statistics_receive_event(
- Ch,
- fun (E) ->
- length(proplists:get_value(
- channel_queue_exchange_stats, E)) == 0
- end),
-
- [] = proplists:get_value(channel_queue_stats, Event3),
- [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event3),
- [] = proplists:get_value(channel_queue_exchange_stats, Event3),
+ Check3 = fun() ->
+ [] = ets:lookup(channel_queue_metrics, {Ch, QRes}),
+ [{{Ch, X}, 1, 0, 0}] = ets:lookup(
+ channel_exchange_metrics,
+ {Ch, X}),
+ [] = ets:lookup(channel_queue_exchange_metrics,
+ {Ch, {QRes, X}})
+ end,
+ test_ch_metrics(Check3, ?TIMEOUT),
rabbit_channel:shutdown(Ch),
dummy_event_receiver:stop(),
passed.
-test_ch_statistics_receive_event(Ch, Matcher) ->
- rabbit_channel:flush(Ch),
- Ch ! emit_stats,
- test_ch_statistics_receive_event1(Ch, Matcher).
-
-test_ch_statistics_receive_event1(Ch, Matcher) ->
- receive #event{type = channel_stats, props = Props} ->
- case Matcher(Props) of
- true -> Props;
- _ -> test_ch_statistics_receive_event1(Ch, Matcher)
- end
- after ?TIMEOUT -> throw(failed_to_receive_event)
+test_ch_metrics(Fun, Timeout) when Timeout =< 0 ->
+ Fun();
+test_ch_metrics(Fun, Timeout) ->
+ try
+ Fun()
+ catch
+ _:{badmatch, _} ->
+ timer:sleep(1000),
+ test_ch_metrics(Fun, Timeout - 1000)
end.
head_message_timestamp_statistics(Config) ->