diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2016-12-13 08:39:04 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2016-12-13 08:39:04 +0100 |
| commit | 321abd47ccc97d3de3b0bcae388ea30299fd47a6 (patch) | |
| tree | 0b5adf0ba727fb17152fc5affe3500c7068bdf5c | |
| parent | f43990d05f8c629e41e9f748d7e8f6963039905f (diff) | |
| parent | 21e8b335950b6871c71e5e3de54ea249fe4322f9 (diff) | |
| download | rabbitmq-server-git-321abd47ccc97d3de3b0bcae388ea30299fd47a6.tar.gz | |
Merge branch 'master' into rabbitmq-server-567
| -rw-r--r-- | .travis.yml | 32 | ||||
| -rw-r--r-- | Makefile | 123 | ||||
| -rw-r--r-- | rabbitmq-components.mk | 25 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 2 | ||||
| -rw-r--r-- | scripts/rabbitmq-service.bat | 4 | ||||
| -rw-r--r-- | src/delegate.erl | 269 | ||||
| -rw-r--r-- | src/delegate_sup.erl | 55 | ||||
| -rw-r--r-- | src/rabbit.app.src | 123 | ||||
| -rw-r--r-- | src/rabbit.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_log.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_metrics.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_plugins.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 15 | ||||
| -rw-r--r-- | test/metrics_SUITE.erl | 376 | ||||
| -rw-r--r-- | test/partitions_SUITE.erl | 3 | ||||
| -rw-r--r-- | test/unit_inbroker_SUITE.erl | 73 |
18 files changed, 718 insertions, 528 deletions
diff --git a/.travis.yml b/.travis.yml index e6bba5c81c..9bf4bd3396 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,34 +1,48 @@ -sudo: false -services: - - docker +# vim:sw=2:et: + +# Use a real VM so we can install all the packages we want. +sudo: required + language: erlang notifications: email: - alerts@rabbitmq.com addons: apt: + sources: + - sourceline: deb https://packages.erlang-solutions.com/ubuntu precise contrib + key_url: https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc packages: + # Use Elixir from Erlang Solutions. The provided Elixir is + # installed with kiex but is old. By using an prebuilt Debian + # package, we save the compilation time. + - elixir - xsltproc otp_release: - "18.3" - "19.0" +services: + - docker env: matrix: - GROUP=1 - GROUP=2 -# The checkout made by Travis is a "detached HEAD" and branches -# information is missing. Our Erlang.mk's git_rmq fetch method relies on -# it, so we need to restore it. -# -# We simply fetch master and, if it exists, stable branches. A branch is -# created, pointing to the detached HEAD. before_script: + # The checkout made by Travis is a "detached HEAD" and branches + # information is missing. Our Erlang.mk's git_rmq fetch method relies + # on it, so we need to restore it. + # + # We simply fetch master and, if it exists, stable branches. A branch + # is created, pointing to the detached HEAD. - | git checkout -B "${TRAVIS_TAG:-${TRAVIS_BRANCH}}" git remote add upstream https://github.com/$TRAVIS_REPO_SLUG.git git fetch upstream stable:stable || : git fetch upstream master:master || : + # Remove all kiex installations. This makes sure that the Erlang + # Solutions one is picked: it's after the kiex installations in $PATH. + - echo YES | kiex implode script: - if test "${GROUP}" = '1'; then make tests; fi @@ -1,7 +1,126 @@ PROJECT = rabbit -VERSION ?= $(call get_app_version,src/$(PROJECT).app.src) +PROJECT_DESCRIPTION = RabbitMQ +PROJECT_MOD = rabbit +PROJECT_REGISTERED = rabbit_amqqueue_sup \ + rabbit_direct_client_sup \ + rabbit_log \ + rabbit_node_monitor \ + rabbit_router + +define PROJECT_ENV +[ + {tcp_listeners, [5672]}, + {num_tcp_acceptors, 10}, + {ssl_listeners, []}, + {num_ssl_acceptors, 1}, + {ssl_options, []}, + {vm_memory_high_watermark, 0.4}, + {vm_memory_high_watermark_paging_ratio, 0.5}, + {memory_monitor_interval, 2500}, + {disk_free_limit, 50000000}, %% 50MB + {msg_store_index_module, rabbit_msg_store_ets_index}, + {backing_queue_module, rabbit_variable_queue}, + %% 0 ("no limit") would make a better default, but that + %% breaks the QPid Java client + {frame_max, 131072}, + {channel_max, 0}, + {heartbeat, 60}, + {msg_store_file_size_limit, 16777216}, + {fhc_write_buffering, true}, + {fhc_read_buffering, false}, + {queue_index_max_journal_entries, 32768}, + {queue_index_embed_msgs_below, 4096}, + {default_user, <<"guest">>}, + {default_pass, <<"guest">>}, + {default_user_tags, [administrator]}, + {default_vhost, <<"/">>}, + {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, + {loopback_users, [<<"guest">>]}, + {password_hashing_module, rabbit_password_hashing_sha256}, + {server_properties, []}, + {collect_statistics, none}, + {collect_statistics_interval, 5000}, + {mnesia_table_loading_retry_timeout, 30000}, + {mnesia_table_loading_retry_limit, 10}, + {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, + {auth_backends, [rabbit_auth_backend_internal]}, + {delegate_count, 16}, + {trace_vhosts, []}, + {log_levels, [{connection, info}]}, + {ssl_cert_login_from, distinguished_name}, + {ssl_handshake_timeout, 5000}, + {ssl_allow_poodle_attack, false}, + {handshake_timeout, 10000}, + {reverse_dns_lookups, false}, + {cluster_partition_handling, ignore}, + {cluster_keepalive_interval, 10000}, + {tcp_listen_options, [{backlog, 128}, + {nodelay, true}, + {linger, {true, 0}}, + {exit_on_close, false} + ]}, + {halt_on_upgrade_failure, true}, + {hipe_compile, false}, + %% see bug 24513 for how this list was created + {hipe_modules, + [rabbit_reader, rabbit_channel, gen_server2, rabbit_exchange, + rabbit_command_assembler, rabbit_framing_amqp_0_9_1, rabbit_basic, + rabbit_event, lists, queue, priority_queue, rabbit_router, + rabbit_trace, rabbit_misc, rabbit_binary_parser, + rabbit_exchange_type_direct, rabbit_guid, rabbit_net, + rabbit_amqqueue_process, rabbit_variable_queue, + rabbit_binary_generator, rabbit_writer, delegate, gb_sets, lqueue, + sets, orddict, rabbit_amqqueue, rabbit_limiter, gb_trees, + rabbit_queue_index, rabbit_exchange_decorator, gen, dict, ordsets, + file_handle_cache, rabbit_msg_store, array, + rabbit_msg_store_ets_index, rabbit_msg_file, + rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, + mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, + pmon, ssl_connection, tls_connection, ssl_record, tls_record, + gen_fsm, ssl]}, + {ssl_apps, [asn1, crypto, public_key, ssl]}, + %% see rabbitmq-server#114 + {mirroring_flow_control, true}, + {mirroring_sync_batch_size, 4096}, + %% see rabbitmq-server#227 and related tickets. + %% msg_store_credit_disc_bound only takes effect when + %% messages are persisted to the message store. If messages + %% are embedded on the queue index, then modifying this + %% setting has no effect because credit_flow is not used when + %% writing to the queue index. See the setting + %% queue_index_embed_msgs_below above. + {msg_store_credit_disc_bound, {2000, 500}}, + {msg_store_io_batch_size, 2048}, + %% see rabbitmq-server#143 + %% and rabbitmq-server#949 + {credit_flow_default_credit, {200, 100}}, + %% see rabbitmq-server#248 + %% and rabbitmq-server#667 + {channel_operation_timeout, 15000}, + + %% see rabbitmq-server#486 + {peer_discovery_backend, rabbit_peer_discovery_classic_config}, + %% used by rabbit_peer_discovery_classic_config + {cluster_nodes, {[], disc}}, + + {config_entry_decoder, [{cipher, aes_cbc256}, + {hash, sha512}, + {iterations, 1000}, + {passphrase, undefined} + ]}, + + %% rabbitmq-server-973 + {lazy_queue_explicit_gc_run_operation_threshold, 250}, + {background_gc_enabled, true}, + {background_gc_target_interval, 60000} + ] +endef -DEPS = ranch lager rabbit_common rabbitmq_cli +# FIXME: Remove goldrush, once rabbit_plugins.erl knows how to ignore +# indirect dependencies of rabbit. +LOCAL_DEPS = sasl mnesia os_mon xmerl goldrush jsx +BUILD_DEPS = rabbitmq_cli +DEPS = ranch lager rabbit_common TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper dep_rabbitmq_cli = git_rmq rabbitmq-cli $(current_rmq_ref) $(base_rmq_ref) rabbitmq-cli-integration diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 071385e8e7..c05a66fe13 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -5,6 +5,27 @@ ifeq ($(.DEFAULT_GOAL),) .DEFAULT_GOAL = all endif +# PROJECT_VERSION defaults to: +# 1. the version exported by rabbitmq-server-release; +# 2. the version stored in `git-revisions.txt`, if it exists; +# 3. a version based on git-describe(1), if it is a Git clone; +# 4. 0.0.0 + +PROJECT_VERSION := $(RABBITMQ_VERSION) + +ifeq ($(PROJECT_VERSION),) +PROJECT_VERSION := $(shell \ +if test -f git-revisions.txt; then \ + head -n1 git-revisions.txt | \ + awk '{print $$$(words $(PROJECT_DESCRIPTION) version);}'; \ +else \ + (git describe --dirty --abbrev=7 --tags --always --first-parent \ + 2>/dev/null || echo rabbitmq_v0_0_0) | \ + sed -e 's/^rabbitmq_v//' -e 's/^v//' -e 's/_/./g' -e 's/-/+/' \ + -e 's/-/./g'; \ +fi) +endif + # -------------------------------------------------------------------- # RabbitMQ components. # -------------------------------------------------------------------- @@ -79,9 +100,9 @@ dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(curre # all projects use the same versions. It avoids conflicts and makes it # possible to work with rabbitmq-public-umbrella. -dep_cowboy_commit = 1.0.3 +dep_cowboy_commit = 1.0.4 dep_mochiweb = git git://github.com/basho/mochiweb.git v2.9.0p2 -dep_ranch_commit = 1.2.1 +dep_ranch_commit = 1.3.0 dep_webmachine_commit = 1.10.8p2 RABBITMQ_COMPONENTS = amqp_client \ diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index b85b4348c1..33369616ef 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -106,7 +106,7 @@ else fi if [ ! -d ${RABBITMQ_SCHEMA_DIR} ]; then - mkdir "${RABBITMQ_SCHEMA_DIR}" + mkdir -p "${RABBITMQ_SCHEMA_DIR}" fi if [ ! -f "${RABBITMQ_SCHEMA_DIR}/rabbitmq.schema" ]; then diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index c9e404db46..347a5e62ae 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -269,6 +269,10 @@ set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:"=\"! -comment "Multi-protocol open source messaging broker" ^
-args "!ERLANG_SERVICE_ARGUMENTS!" > NUL
+if ERRORLEVEL 1 (
+ EXIT /B 1
+)
+
goto END
diff --git a/src/delegate.erl b/src/delegate.erl deleted file mode 100644 index 778137c1c7..0000000000 --- a/src/delegate.erl +++ /dev/null @@ -1,269 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -%% - --module(delegate). - -%% delegate is an alternative way of doing remote calls. Compared to -%% the rpc module, it reduces inter-node communication. For example, -%% if a message is routed to 1,000 queues on node A and needs to be -%% propagated to nodes B and C, it would be nice to avoid doing 2,000 -%% remote casts to queue processes. -%% -%% An important issue here is preserving order - we need to make sure -%% that messages from a certain channel to a certain queue take a -%% consistent route, to prevent them being reordered. In fact all -%% AMQP-ish things (such as queue declaration results and basic.get) -%% must take the same route as well, to ensure that clients see causal -%% ordering correctly. Therefore we have a rather generic mechanism -%% here rather than just a message-reflector. That's also why we pick -%% the delegate process to use based on a hash of the source pid. -%% -%% When a function is invoked using delegate:invoke/2, delegate:call/2 -%% or delegate:cast/2 on a group of pids, the pids are first split -%% into local and remote ones. Remote processes are then grouped by -%% node. The function is then invoked locally and on every node (using -%% gen_server2:multi/4) as many times as there are processes on that -%% node, sequentially. -%% -%% Errors returned when executing functions on remote nodes are re-raised -%% in the caller. -%% -%% RabbitMQ starts a pool of delegate processes on boot. The size of -%% the pool is configurable, the aim is to make sure we don't have too -%% few delegates and thus limit performance on many-CPU machines. - --behaviour(gen_server2). - --export([start_link/1, invoke_no_result/2, invoke/2, - monitor/2, demonitor/1, call/2, cast/2]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {node, monitors, name}). - -%%---------------------------------------------------------------------------- - --export_type([monitor_ref/0]). - --type monitor_ref() :: reference() | {atom(), pid()}. --type fun_or_mfa(A) :: fun ((pid()) -> A) | {atom(), atom(), [any()]}. - --spec start_link - (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}. --spec invoke - ( pid(), fun_or_mfa(A)) -> A; - ([pid()], fun_or_mfa(A)) -> {[{pid(), A}], [{pid(), term()}]}. --spec invoke_no_result(pid() | [pid()], fun_or_mfa(any())) -> 'ok'. --spec monitor('process', pid()) -> monitor_ref(). --spec demonitor(monitor_ref()) -> 'true'. - --spec call - ( pid(), any()) -> any(); - ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}. --spec cast(pid() | [pid()], any()) -> 'ok'. - -%%---------------------------------------------------------------------------- - --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). - -%%---------------------------------------------------------------------------- - -start_link(Num) -> - Name = delegate_name(Num), - gen_server2:start_link({local, Name}, ?MODULE, [Name], []). - -invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> - apply1(FunOrMFA, Pid); -invoke(Pid, FunOrMFA) when is_pid(Pid) -> - case invoke([Pid], FunOrMFA) of - {[{Pid, Result}], []} -> - Result; - {[], [{Pid, {Class, Reason, StackTrace}}]} -> - erlang:raise(Class, Reason, StackTrace) - end; - -invoke([], _FunOrMFA) -> %% optimisation - {[], []}; -invoke([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation - case safe_invoke(Pid, FunOrMFA) of - {ok, _, Result} -> {[{Pid, Result}], []}; - {error, _, Error} -> {[], [{Pid, Error}]} - end; -invoke(Pids, FunOrMFA) when is_list(Pids) -> - {LocalPids, Grouped} = group_pids_by_node(Pids), - %% The use of multi_call is only safe because the timeout is - %% infinity, and thus there is no process spawned in order to do - %% the sending. Thus calls can't overtake preceding calls/casts. - {Replies, BadNodes} = - case orddict:fetch_keys(Grouped) of - [] -> {[], []}; - RemoteNodes -> gen_server2:multi_call( - RemoteNodes, delegate(self(), RemoteNodes), - {invoke, FunOrMFA, Grouped}, infinity) - end, - BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || - BadNode <- BadNodes, - Pid <- orddict:fetch(BadNode, Grouped)], - ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) | - [Results || {_Node, Results} <- Replies]]), - lists:foldl( - fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad}; - ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]} - end, {[], BadPids}, ResultsNoNode). - -invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> - _ = safe_invoke(Pid, FunOrMFA), %% we don't care about any error - ok; -invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) -> - invoke_no_result([Pid], FunOrMFA); - -invoke_no_result([], _FunOrMFA) -> %% optimisation - ok; -invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation - _ = safe_invoke(Pid, FunOrMFA), %% must not die - ok; -invoke_no_result(Pids, FunOrMFA) when is_list(Pids) -> - {LocalPids, Grouped} = group_pids_by_node(Pids), - case orddict:fetch_keys(Grouped) of - [] -> ok; - RemoteNodes -> gen_server2:abcast( - RemoteNodes, delegate(self(), RemoteNodes), - {invoke, FunOrMFA, Grouped}) - end, - _ = safe_invoke(LocalPids, FunOrMFA), %% must not die - ok. - -monitor(process, Pid) when node(Pid) =:= node() -> - erlang:monitor(process, Pid); -monitor(process, Pid) -> - Name = delegate(Pid, [node(Pid)]), - gen_server2:cast(Name, {monitor, self(), Pid}), - {Name, Pid}. - -demonitor(Ref) when is_reference(Ref) -> - erlang:demonitor(Ref); -demonitor({Name, Pid}) -> - gen_server2:cast(Name, {demonitor, self(), Pid}). - -call(PidOrPids, Msg) -> - invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}). - -cast(PidOrPids, Msg) -> - invoke_no_result(PidOrPids, {gen_server2, cast, [Msg]}). - -%%---------------------------------------------------------------------------- - -group_pids_by_node(Pids) -> - LocalNode = node(), - lists:foldl( - fun (Pid, {Local, Remote}) when node(Pid) =:= LocalNode -> - {[Pid | Local], Remote}; - (Pid, {Local, Remote}) -> - {Local, - orddict:update( - node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} - end, {[], orddict:new()}, Pids). - -delegate_name(Hash) -> - list_to_atom("delegate_" ++ integer_to_list(Hash)). - -delegate(Pid, RemoteNodes) -> - case get(delegate) of - undefined -> Name = delegate_name( - erlang:phash2(Pid, - delegate_sup:count(RemoteNodes))), - put(delegate, Name), - Name; - Name -> Name - end. - -safe_invoke(Pids, FunOrMFA) when is_list(Pids) -> - [safe_invoke(Pid, FunOrMFA) || Pid <- Pids]; -safe_invoke(Pid, FunOrMFA) when is_pid(Pid) -> - try - {ok, Pid, apply1(FunOrMFA, Pid)} - catch Class:Reason -> - {error, Pid, {Class, Reason, erlang:get_stacktrace()}} - end. - -apply1({M, F, A}, Arg) -> apply(M, F, [Arg | A]); -apply1(Fun, Arg) -> Fun(Arg). - -%%---------------------------------------------------------------------------- - -init([Name]) -> - {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. - -handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) -> - {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), State, - hibernate}. - -handle_cast({monitor, MonitoringPid, Pid}, - State = #state{monitors = Monitors}) -> - Monitors1 = case dict:find(Pid, Monitors) of - {ok, {Ref, Pids}} -> - Pids1 = gb_sets:add_element(MonitoringPid, Pids), - dict:store(Pid, {Ref, Pids1}, Monitors); - error -> - Ref = erlang:monitor(process, Pid), - Pids = gb_sets:singleton(MonitoringPid), - dict:store(Pid, {Ref, Pids}, Monitors) - end, - {noreply, State#state{monitors = Monitors1}, hibernate}; - -handle_cast({demonitor, MonitoringPid, Pid}, - State = #state{monitors = Monitors}) -> - Monitors1 = case dict:find(Pid, Monitors) of - {ok, {Ref, Pids}} -> - Pids1 = gb_sets:del_element(MonitoringPid, Pids), - case gb_sets:is_empty(Pids1) of - true -> erlang:demonitor(Ref), - dict:erase(Pid, Monitors); - false -> dict:store(Pid, {Ref, Pids1}, Monitors) - end; - error -> - Monitors - end, - {noreply, State#state{monitors = Monitors1}, hibernate}; - -handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) -> - _ = safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), - {noreply, State, hibernate}. - -handle_info({'DOWN', Ref, process, Pid, Info}, - State = #state{monitors = Monitors, name = Name}) -> - {noreply, - case dict:find(Pid, Monitors) of - {ok, {Ref, Pids}} -> - Msg = {'DOWN', {Name, Pid}, process, Pid, Info}, - gb_sets:fold(fun (MonitoringPid, _) -> MonitoringPid ! Msg end, - none, Pids), - State#state{monitors = dict:erase(Pid, Monitors)}; - error -> - State - end, hibernate}; - -handle_info(_Info, State) -> - {noreply, State, hibernate}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl deleted file mode 100644 index ba0964f9dd..0000000000 --- a/src/delegate_sup.erl +++ /dev/null @@ -1,55 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -%% - --module(delegate_sup). - --behaviour(supervisor). - --export([start_link/1, count/1]). - --export([init/1]). - --define(SERVER, ?MODULE). - -%%---------------------------------------------------------------------------- - --spec start_link(integer()) -> rabbit_types:ok_pid_or_error(). --spec count([node()]) -> integer(). - -%%---------------------------------------------------------------------------- - -start_link(Count) -> - supervisor:start_link({local, ?SERVER}, ?MODULE, [Count]). - -count([]) -> - 1; -count([Node | Nodes]) -> - try - length(supervisor:which_children({?SERVER, Node})) - catch exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown -> - count(Nodes); - exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown; - R =:= nodedown -> - count(Nodes) - end. - -%%---------------------------------------------------------------------------- - -init([Count]) -> - {ok, {{one_for_one, 10, 10}, - [{Num, {delegate, start_link, [Num]}, - transient, 16#ffffffff, worker, [delegate]} || - Num <- lists:seq(0, Count - 1)]}}. diff --git a/src/rabbit.app.src b/src/rabbit.app.src deleted file mode 100644 index 5f3120b117..0000000000 --- a/src/rabbit.app.src +++ /dev/null @@ -1,123 +0,0 @@ -%% -*- erlang -*- -{application, rabbit, - [{description, "RabbitMQ"}, - {id, "RabbitMQ"}, - {vsn, "0.0.0"}, - {modules, []}, - {registered, [rabbit_amqqueue_sup, - rabbit_log, - rabbit_node_monitor, - rabbit_router, - rabbit_sup, - rabbit_direct_client_sup]}, - %% FIXME: Remove goldrush, once rabbit_plugins.erl knows how to ignore - %% indirect dependencies of rabbit. - {applications, [kernel, stdlib, sasl, mnesia, goldrush, lager, rabbit_common, ranch, os_mon, xmerl, jsx]}, -%% we also depend on crypto, public_key and ssl but they shouldn't be -%% in here as we don't actually want to start it - {mod, {rabbit, []}}, - {env, [{tcp_listeners, [5672]}, - {num_tcp_acceptors, 10}, - {ssl_listeners, []}, - {num_ssl_acceptors, 1}, - {ssl_options, []}, - {vm_memory_high_watermark, 0.4}, - {vm_memory_high_watermark_paging_ratio, 0.5}, - {memory_monitor_interval, 2500}, - {disk_free_limit, 50000000}, %% 50MB - {msg_store_index_module, rabbit_msg_store_ets_index}, - {backing_queue_module, rabbit_variable_queue}, - %% 0 ("no limit") would make a better default, but that - %% breaks the QPid Java client - {frame_max, 131072}, - {channel_max, 0}, - {heartbeat, 60}, - {msg_store_file_size_limit, 16777216}, - {fhc_write_buffering, true}, - {fhc_read_buffering, false}, - {queue_index_max_journal_entries, 32768}, - {queue_index_embed_msgs_below, 4096}, - {default_user, <<"guest">>}, - {default_pass, <<"guest">>}, - {default_user_tags, [administrator]}, - {default_vhost, <<"/">>}, - {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, - {loopback_users, [<<"guest">>]}, - {password_hashing_module, rabbit_password_hashing_sha256}, - {server_properties, []}, - {collect_statistics, none}, - {collect_statistics_interval, 5000}, - {mnesia_table_loading_retry_timeout, 30000}, - {mnesia_table_loading_retry_limit, 10}, - {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, - {auth_backends, [rabbit_auth_backend_internal]}, - {delegate_count, 16}, - {trace_vhosts, []}, - {log_levels, [{connection, info}]}, - {ssl_cert_login_from, distinguished_name}, - {ssl_handshake_timeout, 5000}, - {ssl_allow_poodle_attack, false}, - {handshake_timeout, 10000}, - {reverse_dns_lookups, false}, - {cluster_partition_handling, ignore}, - {cluster_keepalive_interval, 10000}, - {tcp_listen_options, [{backlog, 128}, - {nodelay, true}, - {linger, {true, 0}}, - {exit_on_close, false}]}, - {halt_on_upgrade_failure, true}, - {hipe_compile, false}, - %% see bug 24513 for how this list was created - {hipe_modules, - [rabbit_reader, rabbit_channel, gen_server2, rabbit_exchange, - rabbit_command_assembler, rabbit_framing_amqp_0_9_1, rabbit_basic, - rabbit_event, lists, queue, priority_queue, rabbit_router, - rabbit_trace, rabbit_misc, rabbit_binary_parser, - rabbit_exchange_type_direct, rabbit_guid, rabbit_net, - rabbit_amqqueue_process, rabbit_variable_queue, - rabbit_binary_generator, rabbit_writer, delegate, gb_sets, lqueue, - sets, orddict, rabbit_amqqueue, rabbit_limiter, gb_trees, - rabbit_queue_index, rabbit_exchange_decorator, gen, dict, ordsets, - file_handle_cache, rabbit_msg_store, array, - rabbit_msg_store_ets_index, rabbit_msg_file, - rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, - mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, - pmon, ssl_connection, tls_connection, ssl_record, tls_record, - gen_fsm, ssl]}, - {ssl_apps, [asn1, crypto, public_key, ssl]}, - %% see rabbitmq-server#114 - {mirroring_flow_control, true}, - {mirroring_sync_batch_size, 4096}, - %% see rabbitmq-server#227 and related tickets. - %% msg_store_credit_disc_bound only takes effect when - %% messages are persisted to the message store. If messages - %% are embedded on the queue index, then modifying this - %% setting has no effect because credit_flow is not used when - %% writing to the queue index. See the setting - %% queue_index_embed_msgs_below above. - {msg_store_credit_disc_bound, {2000, 500}}, - {msg_store_io_batch_size, 2048}, - %% see rabbitmq-server#143 - %% and rabbitmq-server#949 - {credit_flow_default_credit, {200, 100}}, - %% see rabbitmq-server#248 - %% and rabbitmq-server#667 - {channel_operation_timeout, 15000}, - - %% see rabbitmq-server#486 - {peer_discovery_backend, rabbit_peer_discovery_classic_config}, - %% used by rabbit_peer_discovery_classic_config - {cluster_nodes, {[], disc}}, - - {config_entry_decoder, [ - {cipher, aes_cbc256}, - {hash, sha512}, - {iterations, 1000}, - {passphrase, undefined} - ]}, - - %% rabbitmq-server-973 - {lazy_queue_explicit_gc_run_operation_threshold, 250}, - {background_gc_enabled, true}, - {background_gc_target_interval, 60000} - ]}]}. diff --git a/src/rabbit.erl b/src/rabbit.erl index ebc92150eb..e121fb3e2e 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -91,6 +91,13 @@ {requires, external_infrastructure}, {enables, kernel_ready}]}). +-rabbit_boot_step({rabbit_core_metrics, + [{description, "core metrics storage"}, + {mfa, {rabbit_sup, start_child, + [rabbit_metrics]}}, + {requires, external_infrastructure}, + {enables, kernel_ready}]}). + -rabbit_boot_step({rabbit_event, [{description, "statistics event manager"}, {mfa, {rabbit_sup, start_restartable_child, @@ -381,15 +388,16 @@ sd_open_port() -> use_stdio, out]). sd_notify_socat(Unit) -> - case sd_open_port() of - {'EXIT', Exit} -> - io:format(standard_error, "Failed to start socat ~p~n", [Exit]), - false; + try sd_open_port() of Port -> Port ! {self(), {command, sd_notify_data()}}, Result = sd_wait_activation(Port, Unit), port_close(Port), Result + catch + Class:Reason -> + io:format(standard_error, "Failed to start socat ~p:~p~n", [Class, Reason]), + false end. sd_current_unit() -> @@ -805,14 +813,25 @@ insert_default_data() -> {ok, DefaultVHost} = application:get_env(default_vhost), {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = application:get_env(default_permissions), - ok = rabbit_vhost:add(DefaultVHost), - ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass), - ok = rabbit_auth_backend_internal:set_tags(DefaultUser, DefaultTags), - ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, - DefaultVHost, - DefaultConfigurePerm, - DefaultWritePerm, - DefaultReadPerm), + + DefaultUserBin = rabbit_data_coercion:to_binary(DefaultUser), + DefaultPassBin = rabbit_data_coercion:to_binary(DefaultPass), + DefaultVHostBin = rabbit_data_coercion:to_binary(DefaultVHost), + DefaultConfigurePermBin = rabbit_data_coercion:to_binary(DefaultConfigurePerm), + DefaultWritePermBin = rabbit_data_coercion:to_binary(DefaultWritePerm), + DefaultReadPermBin = rabbit_data_coercion:to_binary(DefaultReadPerm), + + ok = rabbit_vhost:add(DefaultVHostBin), + ok = rabbit_auth_backend_internal:add_user( + DefaultUserBin, + DefaultPassBin + ), + ok = rabbit_auth_backend_internal:set_tags(DefaultUserBin,DefaultTags), + ok = rabbit_auth_backend_internal:set_permissions(DefaultUserBin, + DefaultVHostBin, + DefaultConfigurePermBin, + DefaultWritePermBin, + DefaultReadPermBin), ok. %%--------------------------------------------------------------------------- @@ -864,7 +883,7 @@ erts_version_check() -> end. print_banner() -> - {ok, Product} = application:get_key(id), + {ok, Product} = application:get_key(description), {ok, Version} = application:get_key(vsn), {LogFmt, LogLocations} = case log_locations() of [_ | Tail] = LL -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 25555156d6..8db2a167e4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -105,15 +105,16 @@ %%---------------------------------------------------------------------------- -define(STATISTICS_KEYS, - [name, + [messages_ready, + messages_unacknowledged, + messages, + reductions, + name, policy, operator_policy, effective_policy_definition, exclusive_consumer_pid, exclusive_consumer_tag, - messages_ready, - messages_unacknowledged, - messages, consumers, consumer_utilisation, memory, @@ -121,7 +122,6 @@ synchronised_slave_pids, recoverable_slaves, state, - reductions, garbage_collection ]). @@ -961,9 +961,13 @@ emit_stats(State) -> emit_stats(State, Extra) -> ExtraKs = [K || {K, _} <- Extra], - Infos = [{K, V} || {K, V} <- infos(statistics_keys(), State), - not lists:member(K, ExtraKs)], - rabbit_event:notify(queue_stats, Extra ++ Infos). + [{messages_ready, MR}, {messages_unacknowledged, MU}, {messages, M}, + {reductions, R}, {name, Name} | Infos] = All + = [{K, V} || {K, V} <- infos(statistics_keys(), State), + not lists:member(K, ExtraKs)], + rabbit_core_metrics:queue_stats(Name, Extra ++ Infos), + rabbit_core_metrics:queue_stats(Name, MR, MU, M, R), + rabbit_event:notify(queue_stats, Extra ++ All). emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, PrefetchCount, Args, Ref) -> @@ -978,6 +982,7 @@ emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Ref). emit_consumer_deleted(ChPid, ConsumerTag, QName) -> + rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), rabbit_event:notify(consumer_deleted, [{consumer_tag, ConsumerTag}, {channel, ChPid}, @@ -1109,9 +1114,14 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), + QName = qname(State1), + AckRequired = not NoAck, + rabbit_core_metrics:consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, + PrefetchCount, Args), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), - PrefetchCount, Args, none), + AckRequired, QName, PrefetchCount, + Args, none), notify_decorators(State1), reply(ok, run_message_queue(State1)) end; diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 53b0340b8a..58e6f20cb6 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -138,6 +138,7 @@ authz_socket_info_direct(Infos) -> connect1(User, VHost, Protocol, Pid, Infos) -> try rabbit_access_control:check_vhost_access(User, VHost, authz_socket_info_direct(Infos)) of ok -> ok = pg_local:join(rabbit_direct, Pid), + rabbit_core_metrics:connection_created(Pid, Infos), rabbit_event:notify(connection_created, Infos), {ok, {User, rabbit_reader:server_properties(Protocol)}} catch @@ -156,4 +157,5 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, disconnect(Pid, Infos) -> pg_local:leave(rabbit_direct, Pid), + rabbit_core_metrics:connection_closed(Pid), rabbit_event:notify(connection_closed, Infos). diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index bb5ae14c3e..22181ce8b7 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -32,9 +32,6 @@ -type category() :: atom(). --spec log(category(), lager:log_level(), string()) -> 'ok'. --spec log(category(), lager:log_level(), string(), [any()]) -> 'ok'. - -spec debug(string()) -> 'ok'. -spec debug(string(), [any()]) -> 'ok'. -spec debug(pid() | [tuple()], string(), [any()]) -> 'ok'. @@ -65,8 +62,10 @@ %%---------------------------------------------------------------------------- +-spec log(category(), lager:log_level(), string()) -> 'ok'. log(Category, Level, Fmt) -> log(Category, Level, Fmt, []). +-spec log(category(), lager:log_level(), string(), [any()]) -> 'ok'. log(Category, Level, Fmt, Args) when is_list(Args) -> Sink = case Category of default -> ?LAGER_SINK; diff --git a/src/rabbit_metrics.erl b/src/rabbit_metrics.erl new file mode 100644 index 0000000000..1ea28c2906 --- /dev/null +++ b/src/rabbit_metrics.erl @@ -0,0 +1,53 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_metrics). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). + +-spec start_link() -> rabbit_types:ok_pid_or_error(). + +%%---------------------------------------------------------------------------- +%% Starts the raw metrics storage and owns the ETS tables. +%%---------------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +init([]) -> + rabbit_core_metrics:init(), + {ok, none}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 3f65452bdf..b7ba3af732 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/zip.hrl"). --export([setup/0, active/0, read_enabled/1, list/1, list/2, dependencies/3]). +-export([setup/0, active/0, read_enabled/1, list/1, list/2, dependencies/3, running_plugins/0]). -export([ensure/1]). -export([extract_schemas/1]). -export([validate_plugins/1, format_invalid_plugins/1]). @@ -211,6 +211,13 @@ is_loadable(App) -> _ -> false end. + +%% List running plugins along with their version. +-spec running_plugins() -> [{atom(), Vsn :: string()}]. +running_plugins() -> + ActivePlugins = active(), + {ok, [{App, Vsn} || {App, _ , Vsn} <- rabbit_misc:which_applications(), lists:member(App, ActivePlugins)]}. + %%---------------------------------------------------------------------------- prepare_plugins(Enabled) -> @@ -489,7 +496,7 @@ list_free_apps([Dir|Rest]) -> compare_by_name_and_version(#plugin{name = Name, version = VersionA}, #plugin{name = Name, version = VersionB}) -> - ec_semver:lte(VersionA, VersionB); + rabbit_semver:lte(VersionA, VersionB); compare_by_name_and_version(#plugin{name = NameA}, #plugin{name = NameB}) -> NameA =< NameB. diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 7a6e290490..59c63022d8 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -41,9 +41,17 @@ memory() -> [aggregate(Names, Sums, memory, fun (X) -> X end) || Names <- distinguished_interesting_sups()], - Mnesia = mnesia_memory(), + Mnesia = mnesia_memory(), MsgIndexETS = ets_memory([msg_store_persistent_vhost, msg_store_transient_vhost]), - MgmtDbETS = ets_memory([rabbit_mgmt_event_collector]), + MetricsETS = ets_memory([rabbit_metrics]), + MetricsProc = try + [{_, M}] = process_info(whereis(rabbit_metrics), [memory]), + M + catch + error:badarg -> + 0 + end, + MgmtDbETS = ets_memory([rabbit_mgmt_storage]), [{total, Total}, {processes, Processes}, @@ -56,7 +64,7 @@ memory() -> OtherProc = Processes - ConnsReader - ConnsWriter - ConnsChannel - ConnsOther - - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc, + - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc, [{total, Total}, {connection_readers, ConnsReader}, @@ -68,6 +76,7 @@ memory() -> {plugins, Plugins}, {other_proc, lists:max([0, OtherProc])}, %% [1] {mnesia, Mnesia}, + {metrics, MetricsETS + MetricsProc}, {mgmt_db, MgmtDbETS + MgmtDbProc}, {msg_index, MsgIndexETS + MsgIndexProc}, {other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS}, diff --git a/test/metrics_SUITE.erl b/test/metrics_SUITE.erl new file mode 100644 index 0000000000..b2b0fe3560 --- /dev/null +++ b/test/metrics_SUITE.erl @@ -0,0 +1,376 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2016 Pivotal Software, Inc. All rights reserved. +%% +-module(metrics_SUITE). +-compile(export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + connection, + channel, + channel_connection_close, + channel_queue_exchange_consumer_close_connection, + channel_queue_delete_queue, + connection_metric_count_test, + channel_metric_count_test, + queue_metric_count_test, + queue_metric_count_channel_per_queue_test, + connection_metric_idemp_test, + channel_metric_idemp_test, + queue_metric_idemp_test + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +merge_app_env(Config) -> + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, [ + {collect_statistics, fine}, + {collect_statistics_interval, 500} + ]}). +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + [ fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +% NB: node_stats tests are in the management_agent repo + +connection_metric_count_test(Config) -> + rabbit_ct_proper_helpers:run_proper(fun prop_connection_metric_count/1, [Config], 25). + +channel_metric_count_test(Config) -> + rabbit_ct_proper_helpers:run_proper(fun prop_channel_metric_count/1, [Config], 25). + +queue_metric_count_test(Config) -> + rabbit_ct_proper_helpers:run_proper(fun prop_queue_metric_count/1, [Config], 5). + +queue_metric_count_channel_per_queue_test(Config) -> + rabbit_ct_proper_helpers:run_proper(fun prop_queue_metric_count_channel_per_queue/1, + [Config], 5). + +connection_metric_idemp_test(Config) -> + rabbit_ct_proper_helpers:run_proper(fun prop_connection_metric_idemp/1, [Config], 25). + +channel_metric_idemp_test(Config) -> + rabbit_ct_proper_helpers:run_proper(fun prop_channel_metric_idemp/1, [Config], 25). + +queue_metric_idemp_test(Config) -> + rabbit_ct_proper_helpers:run_proper(fun prop_queue_metric_idemp/1, [Config], 25). + +prop_connection_metric_idemp(Config) -> + ?FORALL(N, {integer(1, 25), integer(1, 25)}, + connection_metric_idemp(Config, N)). + +prop_channel_metric_idemp(Config) -> + ?FORALL(N, {integer(1, 25), integer(1, 25)}, + channel_metric_idemp(Config, N)). + +prop_queue_metric_idemp(Config) -> + ?FORALL(N, {integer(1, 25), integer(1, 25)}, + queue_metric_idemp(Config, N)). + +prop_connection_metric_count(Config) -> + ?FORALL(N, {integer(1, 25), resize(100, list(oneof([add, remove])))}, + connection_metric_count(Config, N)). + +prop_channel_metric_count(Config) -> + ?FORALL(N, {integer(1, 25), resize(100, list(oneof([add, remove])))}, + channel_metric_count(Config, N)). + +prop_queue_metric_count(Config) -> + ?FORALL(N, {integer(1, 10), resize(10, list(oneof([add, remove])))}, + queue_metric_count(Config, N)). + +prop_queue_metric_count_channel_per_queue(Config) -> + ?FORALL(N, {integer(1, 10), resize(10, list(oneof([add, remove])))}, + queue_metric_count_channel_per_queue(Config, N)). + +connection_metric_idemp(Config, {N, R}) -> + Conns = [rabbit_ct_client_helpers:open_unmanaged_connection(Config) + || _ <- lists:seq(1, N)], + Table = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_metrics)], + Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)], + % referesh stats 'R' times + [[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)], + timer:sleep(100), + TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_metrics)], + TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)], + [rabbit_ct_client_helpers:close_connection(Conn) || Conn <- Conns], + (Table2 == TableAfter2) and (Table == TableAfter) and + (N == length(Table)) and (N == length(TableAfter)). + +channel_metric_idemp(Config, {N, R}) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + [amqp_connection:open_channel(Conn) || _ <- lists:seq(1, N)], + Table = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)], + Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)], + % referesh stats 'R' times + [[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)], + timer:sleep(100), + TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)], + TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)], + rabbit_ct_client_helpers:close_connection(Conn), + (Table2 == TableAfter2) and (Table == TableAfter) and + (N == length(Table)) and (N == length(TableAfter)). + +queue_metric_idemp(Config, {N, R}) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, Chan} = amqp_connection:open_channel(Conn), + Queues = + [begin + Queue = declare_queue(Chan), + ensure_exchange_metrics_populated(Chan, Queue), + ensure_channel_queue_metrics_populated(Chan, Queue), + Queue + end || _ <- lists:seq(1, N)], + Table = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_metrics)], + Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_coarse_metrics)], + % referesh stats 'R' times + ChanTable = read_table_rpc(Config, channel_created), + [[Pid ! emit_stats || {Pid, _} <- ChanTable ] || _ <- lists:seq(1, R)], + timer:sleep(100), + TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_metrics)], + TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_coarse_metrics)], + [ delete_queue(Chan, Q) || Q <- Queues], + rabbit_ct_client_helpers:close_connection(Conn), + (Table2 == TableAfter2) and (Table == TableAfter) and + (N == length(Table)) and (N == length(TableAfter)). + +connection_metric_count(Config, Ops) -> + add_rem_counter(Config, Ops, + {fun rabbit_ct_client_helpers:open_unmanaged_connection/1, + fun rabbit_ct_client_helpers:close_connection/1}, + [ connection_created, + connection_metrics, + connection_coarse_metrics ]). + +channel_metric_count(Config, Ops) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + Result = add_rem_counter(Config, Ops, + {fun (_Config) -> + {ok, Chan} = amqp_connection:open_channel(Conn), + Chan + end, + fun amqp_channel:close/1}, + [ channel_created, + channel_metrics, + channel_process_metrics ]), + ok = rabbit_ct_client_helpers:close_connection(Conn), + Result. + +queue_metric_count(Config, Ops) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, Chan} = amqp_connection:open_channel(Conn), + AddFun = fun (_) -> + Queue = declare_queue(Chan), + ensure_exchange_metrics_populated(Chan, Queue), + ensure_channel_queue_metrics_populated(Chan, Queue), + force_channel_stats(Config), + Queue + end, + Result = add_rem_counter(Config, Ops, + {AddFun, + fun (Q) -> delete_queue(Chan, Q) end}, + [ channel_queue_metrics, + channel_queue_exchange_metrics ]), + ok = rabbit_ct_client_helpers:close_connection(Conn), + Result. + +queue_metric_count_channel_per_queue(Config, Ops) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + AddFun = fun (_) -> + {ok, Chan} = amqp_connection:open_channel(Conn), + Queue = declare_queue(Chan), + ensure_exchange_metrics_populated(Chan, Queue), + ensure_channel_queue_metrics_populated(Chan, Queue), + force_channel_stats(Config), + {Chan, Queue} + end, + Result = add_rem_counter(Config, Ops, + {AddFun, + fun ({Chan, Q}) -> delete_queue(Chan, Q) end}, + [ channel_queue_metrics, + channel_queue_exchange_metrics ]), + ok = rabbit_ct_client_helpers:close_connection(Conn), + Result. + +add_rem_counter(Config, {Initial, Ops}, {AddFun, RemFun}, Tables) -> + Things = [ AddFun(Config) || _ <- lists:seq(1, Initial) ], + % either add or remove some things + {FinalLen, Things1} = + lists:foldl(fun(add, {L, Items}) -> + {L+1, [AddFun(Config) | Items]}; + (remove, {L, [H|Tail]}) -> + RemFun(H), + {L-1, Tail}; + (_, S) -> S end, + {Initial, Things}, + Ops), + TabLens = lists:map(fun(T) -> + length(read_table_rpc(Config, T)) end, Tables), + [RemFun(Thing) || Thing <- Things1], + [FinalLen] == lists:usort(TabLens). + + +connection(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + [_] = read_table_rpc(Config, connection_created), + [_] = read_table_rpc(Config, connection_metrics), + [_] = read_table_rpc(Config, connection_coarse_metrics), + ok = rabbit_ct_client_helpers:close_connection(Conn), + [] = read_table_rpc(Config, connection_created), + [] = read_table_rpc(Config, connection_metrics), + [] = read_table_rpc(Config, connection_coarse_metrics). + +channel(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, Chan} = amqp_connection:open_channel(Conn), + [_] = read_table_rpc(Config, channel_created), + [_] = read_table_rpc(Config, channel_metrics), + [_] = read_table_rpc(Config, channel_process_metrics), + ok = amqp_channel:close(Chan), + [] = read_table_rpc(Config, channel_created), + [] = read_table_rpc(Config, channel_metrics), + [] = read_table_rpc(Config, channel_process_metrics), + ok = rabbit_ct_client_helpers:close_connection(Conn). + +channel_connection_close(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, _} = amqp_connection:open_channel(Conn), + [_] = read_table_rpc(Config, channel_created), + [_] = read_table_rpc(Config, channel_metrics), + [_] = read_table_rpc(Config, channel_process_metrics), + ok = rabbit_ct_client_helpers:close_connection(Conn), + [] = read_table_rpc(Config, channel_created), + [] = read_table_rpc(Config, channel_metrics), + [] = read_table_rpc(Config, channel_process_metrics). + +channel_queue_delete_queue(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, Chan} = amqp_connection:open_channel(Conn), + Queue = declare_queue(Chan), + ensure_exchange_metrics_populated(Chan, Queue), + ensure_channel_queue_metrics_populated(Chan, Queue), + force_channel_stats(Config), + [_] = read_table_rpc(Config, channel_queue_metrics), + [_] = read_table_rpc(Config, channel_queue_exchange_metrics), + + delete_queue(Chan, Queue), + % ensure removal of queue cleans up channel_queue metrics + [] = read_table_rpc(Config, channel_queue_exchange_metrics), + [] = read_table_rpc(Config, channel_queue_metrics), + ok = rabbit_ct_client_helpers:close_connection(Conn). + +channel_queue_exchange_consumer_close_connection(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, Chan} = amqp_connection:open_channel(Conn), + Queue = declare_queue(Chan), + ensure_exchange_metrics_populated(Chan, Queue), + force_channel_stats(Config), + + [_] = read_table_rpc(Config, channel_exchange_metrics), + [_] = read_table_rpc(Config, channel_queue_exchange_metrics), + + ensure_channel_queue_metrics_populated(Chan, Queue), + force_channel_stats(Config), + [_] = read_table_rpc(Config, channel_queue_metrics), + + Sub = #'basic.consume'{queue = Queue}, + #'basic.consume_ok'{consumer_tag = _} = + amqp_channel:call(Chan, Sub), + + [_] = read_table_rpc(Config, consumer_created), + + ok = rabbit_ct_client_helpers:close_connection(Conn), + % ensure cleanup happened + [] = read_table_rpc(Config, channel_exchange_metrics), + [] = read_table_rpc(Config, channel_queue_exchange_metrics), + [] = read_table_rpc(Config, channel_queue_metrics), + [] = read_table_rpc(Config, consumer_created). + + +%% ------------------------------------------------------------------- +%% Utilities +%% ------------------------------------------------------------------- + +declare_queue(Chan) -> + Declare = #'queue.declare'{durable = false, auto_delete = true}, + #'queue.declare_ok'{queue = Name} = amqp_channel:call(Chan, Declare), + Name. + +delete_queue(Chan, Name) -> + Delete = #'queue.delete'{queue = Name}, + #'queue.delete_ok'{} = amqp_channel:call(Chan, Delete). + +ensure_exchange_metrics_populated(Chan, RoutingKey) -> + % need to publish for exchange metrics to be populated + Publish = #'basic.publish'{routing_key = RoutingKey}, + amqp_channel:call(Chan, Publish, #amqp_msg{payload = <<"hello">>}). + +ensure_channel_queue_metrics_populated(Chan, Queue) -> + % need to get and wait for timer for channel queue metrics to be populated + Get = #'basic.get'{queue = Queue, no_ack=true}, + {#'basic.get_ok'{}, #amqp_msg{}} = amqp_channel:call(Chan, Get). + +force_channel_stats(Config) -> + [ Pid ! emit_stats || {Pid, _} <- read_table_rpc(Config, channel_created) ], + timer:sleep(100). + +read_table_rpc(Config, Table) -> + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, read_table, [Table]). + +read_table(Table) -> + ets:tab2list(Table). + diff --git a/test/partitions_SUITE.erl b/test/partitions_SUITE.erl index e00c015d02..3f0ec419c2 100644 --- a/test/partitions_SUITE.erl +++ b/test/partitions_SUITE.erl @@ -33,6 +33,9 @@ %% It's a lot, but still better than timetrap_timeout -define(AWAIT_TIMEOUT, 300000). +suite() -> + [{timetrap, {minutes, 60}}]. + all() -> [ {group, net_ticktime_1}, diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl index 569c7a88fa..aee815aee4 100644 --- a/test/unit_inbroker_SUITE.erl +++ b/test/unit_inbroker_SUITE.erl @@ -2885,10 +2885,13 @@ channel_statistics1(_Config) -> dummy_event_receiver:start(self(), [node()], [channel_stats]), %% Check stats empty - Event = test_ch_statistics_receive_event(Ch, fun (_) -> true end), - [] = proplists:get_value(channel_queue_stats, Event), - [] = proplists:get_value(channel_exchange_stats, Event), - [] = proplists:get_value(channel_queue_exchange_stats, Event), + Check1 = fun() -> + [] = ets:match(channel_queue_metrics, {Ch, QRes}), + [] = ets:match(channel_exchange_metrics, {Ch, X}), + [] = ets:match(channel_queue_exchange_metrics, + {Ch, {QRes, X}}) + end, + test_ch_metrics(Check1, ?TIMEOUT), %% Publish and get a message rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>, @@ -2897,46 +2900,44 @@ channel_statistics1(_Config) -> rabbit_channel:do(Ch, #'basic.get'{queue = QName}), %% Check the stats reflect that - Event2 = test_ch_statistics_receive_event( - Ch, - fun (E) -> - length(proplists:get_value( - channel_queue_exchange_stats, E)) > 0 - end), - [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2), - [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2), - [{{QRes,X},[{publish,1}]}] = - proplists:get_value(channel_queue_exchange_stats, Event2), + Check2 = fun() -> + [{{Ch, QRes}, 1, 0, 0, 0, 0, 0}] = ets:lookup( + channel_queue_metrics, + {Ch, QRes}), + [{{Ch, X}, 1, 0, 0}] = ets:lookup( + channel_exchange_metrics, + {Ch, X}), + [{{Ch, {QRes, X}}, 1}] = ets:lookup( + channel_queue_exchange_metrics, + {Ch, {QRes, X}}) + end, + test_ch_metrics(Check2, ?TIMEOUT), %% Check the stats remove stuff on queue deletion rabbit_channel:do(Ch, #'queue.delete'{queue = QName}), - Event3 = test_ch_statistics_receive_event( - Ch, - fun (E) -> - length(proplists:get_value( - channel_queue_exchange_stats, E)) == 0 - end), - - [] = proplists:get_value(channel_queue_stats, Event3), - [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event3), - [] = proplists:get_value(channel_queue_exchange_stats, Event3), + Check3 = fun() -> + [] = ets:lookup(channel_queue_metrics, {Ch, QRes}), + [{{Ch, X}, 1, 0, 0}] = ets:lookup( + channel_exchange_metrics, + {Ch, X}), + [] = ets:lookup(channel_queue_exchange_metrics, + {Ch, {QRes, X}}) + end, + test_ch_metrics(Check3, ?TIMEOUT), rabbit_channel:shutdown(Ch), dummy_event_receiver:stop(), passed. -test_ch_statistics_receive_event(Ch, Matcher) -> - rabbit_channel:flush(Ch), - Ch ! emit_stats, - test_ch_statistics_receive_event1(Ch, Matcher). - -test_ch_statistics_receive_event1(Ch, Matcher) -> - receive #event{type = channel_stats, props = Props} -> - case Matcher(Props) of - true -> Props; - _ -> test_ch_statistics_receive_event1(Ch, Matcher) - end - after ?TIMEOUT -> throw(failed_to_receive_event) +test_ch_metrics(Fun, Timeout) when Timeout =< 0 -> + Fun(); +test_ch_metrics(Fun, Timeout) -> + try + Fun() + catch + _:{badmatch, _} -> + timer:sleep(1000), + test_ch_metrics(Fun, Timeout - 1000) end. head_message_timestamp_statistics(Config) -> |
