summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.travis.yml17
-rw-r--r--Makefile23
-rw-r--r--docs/rabbitmq.conf.example159
-rw-r--r--priv/schema/rabbit.schema107
-rw-r--r--rabbitmq-components.mk1
-rw-r--r--src/rabbit.erl9
-rw-r--r--src/rabbit_amqqueue.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl202
-rw-r--r--src/rabbit_binding.erl73
-rw-r--r--src/rabbit_channel.erl59
-rw-r--r--src/rabbit_fifo.erl208
-rw-r--r--src/rabbit_fifo_client.erl20
-rw-r--r--src/rabbit_queue_consumers.erl75
-rw-r--r--src/rabbit_queue_location_min_masters.erl70
-rw-r--r--src/rabbit_quorum_queue.erl23
-rw-r--r--src/rabbit_sysmon_handler.erl230
-rw-r--r--src/rabbit_sysmon_minder.erl156
-rw-r--r--test/config_schema_SUITE_data/rabbit.snippets845
-rw-r--r--test/per_vhost_connection_limit_partitions_SUITE.erl19
-rw-r--r--test/quorum_queue_SUITE.erl23
-rw-r--r--test/rabbit_fifo_SUITE.erl2
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl12
-rw-r--r--test/single_active_consumer_SUITE.erl286
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl77
-rw-r--r--test/unit_queue_consumers_SUITE.erl102
25 files changed, 2427 insertions, 394 deletions
diff --git a/.travis.yml b/.travis.yml
index 9237d9632a..6ccdbab1ce 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,5 +1,6 @@
# vim:sw=2:et:
+dist: xenial
sudo: false
language: erlang
notifications:
@@ -10,15 +11,8 @@ notifications:
on_failure: always
addons:
apt:
- sources:
- - sourceline: deb https://packages.erlang-solutions.com/ubuntu trusty contrib
- key_url: https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc
packages:
- awscli
- # Use Elixir from Erlang Solutions. The provided Elixir is
- # installed with kiex but is old. We also can't use kiex to
- # install a newer one because of GitHub API rate limiting.
- - elixir=1.6.0-1
cache:
apt: true
env:
@@ -27,10 +21,10 @@ env:
- secure: L1t0CHGR4RzOXwtkpM6feRKax95rszScBLqzjstEiMPkhjTsYTlAecnNxx6lTrGMnk5hQoi4PtbhmyZOX0siHTngTogoA/Nyn8etYzicU5ZO+qmBQOYpegz51lEu70ewXgkhEHzk9DtEPxfYviH9WiILrdUVRXXgZpoXq13p1QA=
otp_release:
- - "19.3"
- - "20.3"
+ - "21.2"
before_script:
+ - elixir --version
# The checkout made by Travis is a "detached HEAD" and branches
# information is missing. Our Erlang.mk's git_rmq fetch method relies
# on it, so we need to restore it.
@@ -42,11 +36,6 @@ before_script:
git remote add upstream https://github.com/$TRAVIS_REPO_SLUG.git
git fetch upstream v3.8.x:v3.8.x || :
git fetch upstream master:master || :
- # Make sure we use Elixir from Erlang Solutions and not kiex.
- - |
- echo YES | kiex implode
- elixir --version
- elixir --version | grep -q 'Elixir 1.6.0'
script:
- make xref
diff --git a/Makefile b/Makefile
index e26f32d89f..b2bca3d50b 100644
--- a/Makefile
+++ b/Makefile
@@ -130,13 +130,15 @@ define PROJECT_ENV
{vhost_restart_strategy, continue},
%% {global, prefetch count}
{default_consumer_prefetch, {false, 0}},
- {channel_queue_cleanup_interval, 60000}
+ {channel_queue_cleanup_interval, 60000},
+ %% Default max message size is 128 MB
+ {max_message_size, 134217728}
]
endef
LOCAL_DEPS = sasl mnesia os_mon inets
BUILD_DEPS = rabbitmq_cli syslog
-DEPS = ranch lager rabbit_common ra
+DEPS = ranch lager rabbit_common ra sysmon_handler
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper
dep_syslog = git https://github.com/schlagert/syslog 3.4.5
@@ -186,13 +188,28 @@ tests:: bats
SLOW_CT_SUITES := backing_queue \
cluster_rename \
clustering_management \
+ config_schema \
dynamic_ha \
eager_sync \
health_check \
+ lazy_queue \
+ metrics \
+ msg_store \
partitions \
+ per_user_connection_tracking \
+ per_vhost_connection_limit \
+ per_vhost_msg_store \
+ per_vhost_queue_limit \
+ policy \
priority_queue \
queue_master_location \
- simple_ha
+ quorum_queue \
+ rabbit_core_metrics_gc \
+ rabbit_fifo_prop \
+ simple_ha \
+ sync_detection \
+ unit_inbroker_parallel \
+ vhost
FAST_CT_SUITES := $(filter-out $(sort $(SLOW_CT_SUITES)),$(CT_SUITES))
ct-fast: CT_SUITES = $(FAST_CT_SUITES)
diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example
index a62ed38291..b82956a267 100644
--- a/docs/rabbitmq.conf.example
+++ b/docs/rabbitmq.conf.example
@@ -470,7 +470,7 @@
## Disabling background GC may reduce latency for client operations,
## keeping it enabled may reduce median RAM usage by the binary heap
## (see https://www.erlang-solutions.com/blog/erlang-garbage-collector.html).
-##
+##
## Before trying this option, please take a look at the memory
## breakdown (http://www.rabbitmq.com/memory-use.html).
##
@@ -533,18 +533,49 @@
##
# management.http_log_dir = /path/to/access.log
-## Change the port on which the HTTP listener listens,
-## specifying an interface for the web server to bind to.
-## Also set the listener to use TLS and provide TLS options.
-##
+## HTTP listener and embedded Web server settings.
+# ## See https://rabbitmq.com/management.html for details.
+#
+# management.tcp.port = 15672
+# management.tcp.ip = 0.0.0.0
+#
+# management.tcp.shutdown_timeout = 7000
+# management.tcp.max_keepalive = 120
+# management.tcp.idle_timeout = 120
+# management.tcp.inactivity_timeout = 120
+# management.tcp.request_timeout = 120
+# management.tcp.compress = true
+
+## HTTPS listener settings.
+## See https://rabbitmq.com/management.html and https://rabbitmq.com/ssl.html for details.
+##
+# management.ssl.port = 15671
+# management.ssl.cacertfile = /path/to/ca_certificate.pem
+# management.ssl.certfile = /path/to/server_certificate.pem
+# management.ssl.keyfile = /path/to/server_key.pem
+
+## More TLS options
+# management.ssl.honor_cipher_order = true
+# management.ssl.honor_ecc_order = true
+# management.ssl.client_renegotiation = false
+# management.ssl.secure_renegotiate = true
+
+## Supported TLS versions
+# management.ssl.versions.1 = tlsv1.2
+# management.ssl.versions.2 = tlsv1.1
+
+## Cipher suites the server is allowed to use
+# management.ssl.ciphers.1 = ECDHE-ECDSA-AES256-GCM-SHA384
+# management.ssl.ciphers.2 = ECDHE-RSA-AES256-GCM-SHA384
+# management.ssl.ciphers.3 = ECDHE-ECDSA-AES256-SHA384
+# management.ssl.ciphers.4 = ECDHE-RSA-AES256-SHA384
+# management.ssl.ciphers.5 = ECDH-ECDSA-AES256-GCM-SHA384
+# management.ssl.ciphers.6 = ECDH-RSA-AES256-GCM-SHA384
+# management.ssl.ciphers.7 = ECDH-ECDSA-AES256-SHA384
+# management.ssl.ciphers.8 = ECDH-RSA-AES256-SHA384
+# management.ssl.ciphers.9 = DHE-RSA-AES256-GCM-SHA384
-# management.listener.port = 15672
-# management.listener.ip = 127.0.0.1
-# management.listener.ssl = true
-# management.listener.ssl_opts.cacertfile = /path/to/cacert.pem
-# management.listener.ssl_opts.certfile = /path/to/cert.pem
-# management.listener.ssl_opts.keyfile = /path/to/key.pem
## One of 'basic', 'detailed' or 'none'. See
## http://rabbitmq.com/management.html#fine-stats for more details.
@@ -583,13 +614,39 @@
# STOMP section
# =======================================
-## Network Configuration. The format is generally the same as for the core broker.
+## See https://rabbitmq.com/stomp.html for details.
+
+## TCP listeners.
+##
+# stomp.listeners.tcp.1 = 127.0.0.1:61613
+# stomp.listeners.tcp.2 = ::1:61613
+
+## TCP listener settings
##
-# stomp.listeners.tcp.default = 61613
+# stomp.tcp_listen_options.backlog = 2048
+# stomp.tcp_listen_options.recbuf = 131072
+# stomp.tcp_listen_options.sndbuf = 131072
+#
+# stomp.tcp_listen_options.keepalive = true
+# stomp.tcp_listen_options.nodelay = true
+#
+# stomp.tcp_listen_options.exit_on_close = true
+# stomp.tcp_listen_options.send_timeout = 120
-## Same for ssl listeners
+## Proxy protocol support
##
+# stomp.proxy_protocol = false
+
+## TLS listeners
+## See https://rabbitmq.com/stomp.html and https://rabbitmq.com/ssl.html for details.
# stomp.listeners.ssl.default = 61614
+#
+# ssl_options.cacertfile = path/to/cacert.pem
+# ssl_options.certfile = path/to/cert.pem
+# ssl_options.keyfile = path/to/key.pem
+# ssl_options.verify = verify_peer
+# ssl_options.fail_if_no_peer_cert = true
+
## Number of Erlang processes that will accept connections for the TCP
## and TLS listeners.
@@ -642,6 +699,52 @@
# MQTT section
# =======================================
+## TCP listener settings.
+##
+# mqtt.listeners.tcp.1 = 127.0.0.1:61613
+# mqtt.listeners.tcp.2 = ::1:61613
+
+## TCP listener options (as per the broker configuration).
+##
+# mqtt.tcp_listen_options.backlog = 4096
+# mqtt.tcp_listen_options.recbuf = 131072
+# mqtt.tcp_listen_options.sndbuf = 131072
+#
+# mqtt.tcp_listen_options.keepalive = true
+# mqtt.tcp_listen_options.nodelay = true
+#
+# mqtt.tcp_listen_options.exit_on_close = true
+# mqtt.tcp_listen_options.send_timeout = 120
+
+## TLS listener settings
+## ## See https://rabbitmq.com/mqtt.html and https://rabbitmq.com/ssl.html for details.
+#
+# mqtt.listeners.ssl.default = 8883
+#
+# ssl_options.cacertfile = /path/to/tls/ca_certificate_bundle.pem
+# ssl_options.certfile = /path/to/tls/server_certificate.pem
+# ssl_options.keyfile = /path/to/tls/server_key.pem
+# ssl_options.verify = verify_peer
+# ssl_options.fail_if_no_peer_cert = true
+#
+
+
+## Number of Erlang processes that will accept connections for the TCP
+## and TLS listeners.
+##
+# mqtt.num_acceptors.tcp = 10
+# mqtt.num_acceptors.ssl = 10
+
+## Whether or not to enable proxy protocol support.
+## Once enabled, clients cannot directly connect to the broker
+## anymore. They must connect through a load balancer that sends the
+## proxy protocol header to the broker at connection time.
+## This setting applies only to STOMP clients, other protocols
+## like STOMP or AMQP have their own setting to enable proxy protocol.
+## See the plugins or broker documentation for more information.
+##
+# mqtt.proxy_protocol = false
+
## Set the default user name and password used for anonymous connections (when client
## provides no credentials). Anonymous connections are highly discouraged!
##
@@ -672,34 +775,6 @@
##
# mqtt.prefetch = 10
-## TCP/SSL Configuration (as per the broker configuration).
-##
-# mqtt.listeners.tcp.default = 1883
-
-## Same for ssl listener
-##
-# mqtt.listeners.ssl.default = 1884
-
-## Number of Erlang processes that will accept connections for the TCP
-## and TLS listeners.
-##
-# mqtt.num_acceptors.tcp = 10
-# mqtt.num_acceptors.ssl = 10
-
-## TCP listener options (as per the broker configuration).
-##
-# mqtt.tcp_listen_options.backlog = 128
-# mqtt.tcp_listen_options.nodelay = true
-
-## Whether or not to enable proxy protocol support.
-## Once enabled, clients cannot directly connect to the broker
-## anymore. They must connect through a load balancer that sends the
-## proxy protocol header to the broker at connection time.
-## This setting applies only to STOMP clients, other protocols
-## like STOMP or AMQP have their own setting to enable proxy protocol.
-## See the plugins or broker documentation for more information.
-##
-# mqtt.proxy_protocol = false
## ----------------------------------------------------------------------------
## RabbitMQ AMQP 1.0 Support
diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema
index 5c6078a413..da28989c37 100644
--- a/priv/schema/rabbit.schema
+++ b/priv/schema/rabbit.schema
@@ -258,7 +258,7 @@ end}.
{translation, "rabbit.ssl_options.ciphers",
fun(Conf) ->
Settings = cuttlefish_variable:filter_by_prefix("ssl_options.ciphers", Conf),
- [V || {_, V} <- Settings]
+ lists:reverse([V || {_, V} <- Settings])
end}.
%% ===========================================================================
@@ -554,6 +554,9 @@ end}.
}.
+{mapping, "msx_message_size", "rabbit.max_message_size",
+ [{datatype, integer}, {validators, ["less_then_512MB"]}]}.
+
%% Customising Socket Options.
%%
%% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
@@ -1352,6 +1355,103 @@ end}.
{validators, ["non_zero_positive_integer"]}
]}.
+% ==========================
+% sysmon_handler section
+% ==========================
+
+%% @doc The threshold at which to warn about the number of processes
+%% that are overly busy. Processes with large heaps or that take a
+%% long time to garbage collect will count toward this threshold.
+{mapping, "sysmon_handler.thresholds.busy_processes", "sysmon_handler.process_limit", [
+ {default, 30},
+ {datatype, integer},
+ hidden
+]}.
+
+%% @doc The threshold at which to warn about the number of ports that
+%% are overly busy. Ports with full input buffers count toward this
+%% threshold.
+{mapping, "sysmon_handler.thresholds.busy_ports", "sysmon_handler.port_limit", [
+ {default, 2},
+ {datatype, integer},
+ hidden
+]}.
+
+%% @doc A process will become busy when it exceeds this amount of time
+%% doing garbage collection.
+%%
+%% NOTE: Enabling this setting can cause performance problems on
+%% multi-core systems.
+%% @see sysmon_handler.thresholds.busy_processes
+{mapping, "sysmon_handler.triggers.process.garbage_collection", "sysmon_handler.gc_ms_limit", [
+ {default, off},
+ {datatype, [{atom, off},
+ {duration, ms}]},
+ hidden
+]}.
+
+{translation, "sysmon_handler.gc_ms_limit",
+ fun(Conf) ->
+ case cuttlefish:conf_get("sysmon_handler.triggers.process.garbage_collection", Conf) of
+ off -> 0;
+ Int -> Int
+ end
+ end}.
+
+%% @doc A process will become busy when it exceeds this amount of time
+%% during a single process scheduling & execution cycle.
+{mapping, "sysmon_handler.triggers.process.long_scheduled_execution", "sysmon_handler.schedule_ms_limit", [
+ {default, off},
+ {datatype, [{atom, off},
+ {duration, ms}]},
+ hidden
+]}.
+
+{translation, "sysmon_handler.schedule_ms_limit",
+ fun(Conf) ->
+ case cuttlefish:conf_get("sysmon_handler.triggers.process.long_scheduled_execution", Conf) of
+ off -> 0;
+ Int -> Int
+ end
+ end}.
+
+%% @doc A process will become busy when its heap exceeds this size.
+%% @see sysmon_handler.thresholds.busy_processes
+{mapping, "sysmon_handler.triggers.process.heap_size", "sysmon_handler.heap_word_limit", [
+ {default, "160444000"},
+ {datatype, [bytesize, {atom, off}]},
+ hidden
+]}.
+
+{translation, "sysmon_handler.heap_word_limit",
+ fun(Conf) ->
+ case cuttlefish:conf_get("sysmon_handler.triggers.process.heap_size", Conf) of
+ off -> 0;
+ Bytes ->
+ WordSize = erlang:system_info(wordsize),
+ Bytes div WordSize
+ end
+end}.
+
+%% @doc Whether ports with full input buffers will be counted as
+%% busy. Ports can represent open files or network sockets.
+%% @see sysmon_handler.thresholds.busy_ports
+{mapping, "sysmon_handler.triggers.port", "sysmon_handler.busy_port", [
+ {default, on},
+ {datatype, flag},
+ hidden
+]}.
+
+%% @doc Whether distribution ports with full input buffers will be
+%% counted as busy. Distribution ports connect Erlang nodes within a
+%% single cluster.
+%% @see sysmon_handler.thresholds.busy_ports
+{mapping, "sysmon_handler.triggers.distribution_port", "sysmon_handler.busy_dist_port", [
+ {default, on},
+ {datatype, flag},
+ hidden
+]}.
+
% ===============================
% Validators
% ===============================
@@ -1361,6 +1461,11 @@ fun(Size) when is_integer(Size) ->
Size > 0 andalso Size < 2147483648
end}.
+{validator, "less_then_512MB", "Max message size should be less than 512MB and gre than 0",
+fun(Size) when is_integer(Size) ->
+ Size > 0 andalso Size < 536870912
+end}.
+
{validator, "less_than_1", "Flooat is not beetween 0 and 1",
fun(Float) when is_float(Float) ->
Float > 0 andalso Float < 1
diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk
index 1bec3c0942..6d43e68ac7 100644
--- a/rabbitmq-components.mk
+++ b/rabbitmq-components.mk
@@ -115,6 +115,7 @@ dep_lager = hex 3.6.5
dep_ra = git https://github.com/rabbitmq/ra.git master
dep_ranch = hex 1.7.1
dep_recon = hex 2.3.6
+dep_sysmon_handler = hex 1.0.0
RABBITMQ_COMPONENTS = amqp_client \
amqp10_common \
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 3401391b09..2a37d0ba75 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -156,6 +156,13 @@
{requires, kernel_ready},
{enables, core_initialized}]}).
+-rabbit_boot_step({rabbit_sysmon_minder,
+ [{description, "sysmon_handler supervisor"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_sysmon_minder]}},
+ {requires, kernel_ready},
+ {enables, core_initialized}]}).
+
-rabbit_boot_step({core_initialized,
[{description, "core initialized"},
{requires, kernel_ready}]}).
@@ -225,7 +232,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--define(APPS, [os_mon, mnesia, rabbit_common, ra, rabbit]).
+-define(APPS, [os_mon, mnesia, rabbit_common, ra, sysmon_handler, rabbit]).
-define(ASYNC_THREADS_WARNING_THRESHOLD, 8).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7dc343125e..ba4dd74259 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -29,7 +29,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
emit_info_all/5, list_local/1, info_local/1,
emit_info_local/4, emit_info_down/4]).
--export([list_down/1, count/1, list_names/0, list_local_names/0]).
+-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0]).
-export([list_by_type/1]).
-export([notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
@@ -437,8 +437,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
not_found -> Q1 = rabbit_policy:set(Q),
Q2 = Q1#amqqueue{state = live},
ok = store_queue(Q2),
- B = add_default_binding(Q2),
- fun () -> B(), {created, Q2} end;
+ fun () -> {created, Q2} end;
{absent, _Q, _} = R -> rabbit_misc:const(R)
end;
[ExistingQ] ->
@@ -502,15 +501,6 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1},
%% mirroring-related has changed - the policy may have changed anyway.
notify_policy_changed(Q1).
-add_default_binding(#amqqueue{name = QueueName}) ->
- ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
- RoutingKey = QueueName#resource.name,
- rabbit_binding:add(#binding{source = ExchangeName,
- destination = QueueName,
- key = RoutingKey,
- args = []},
- ?INTERNAL_USER).
-
lookup([]) -> []; %% optimisation
lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation
lookup(Names) when is_list(Names) ->
@@ -666,6 +656,7 @@ declare_args() ->
{<<"x-max-priority">>, fun check_max_priority_arg/2},
{<<"x-overflow">>, fun check_overflow/2},
{<<"x-queue-mode">>, fun check_queue_mode/2},
+ {<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2},
{<<"x-queue-type">>, fun check_queue_type/2},
{<<"x-quorum-initial-group-size">>, fun check_default_quorum_initial_group_size_arg/2}].
@@ -708,6 +699,12 @@ check_max_priority_arg({Type, Val}, Args) ->
Error -> Error
end.
+check_single_active_consumer_arg({Type, Val}, Args) ->
+ case check_bool_arg({Type, Val}, Args) of
+ ok -> ok;
+ Error -> Error
+ end.
+
check_default_quorum_initial_group_size_arg({Type, Val}, Args) ->
case check_non_neg_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
@@ -757,6 +754,8 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
list_names() -> mnesia:dirty_all_keys(rabbit_queue).
+list_names(VHost) -> [Q#amqqueue.name || Q <- list(VHost)].
+
list_local_names() ->
[ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(),
State =/= crashed, is_local_to_node(QPid, node())].
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 52925ce165..37ee8d0a15 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%% Copyright (c) 2007-2018 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_process).
@@ -36,8 +36,8 @@
-record(q, {
%% an #amqqueue record
q,
- %% none | {exclusive consumer channel PID, consumer tag}
- exclusive_consumer,
+ %% none | {exclusive consumer channel PID, consumer tag} | {single active consumer channel PID, consumer}
+ active_consumer,
%% Set to true if a queue has ever had a consumer.
%% This is used to determine when to delete auto-delete queues.
has_had_consumers,
@@ -94,7 +94,9 @@
%% example.
mirroring_policy_version = 0,
%% running | flow | idle
- status
+ status,
+ %% true | false
+ single_active_consumer_on
}).
%%----------------------------------------------------------------------------
@@ -155,15 +157,20 @@ init(Q) ->
?MODULE}.
init_state(Q) ->
- State = #q{q = Q,
- exclusive_consumer = none,
- has_had_consumers = false,
- consumers = rabbit_queue_consumers:new(),
- senders = pmon:new(delegate),
- msg_id_to_channel = gb_trees:empty(),
- status = running,
- args_policy_version = 0,
- overflow = 'drop-head'},
+ SingleActiveConsumerOn = case rabbit_misc:table_lookup(Q#amqqueue.arguments, <<"x-single-active-consumer">>) of
+ {bool, true} -> true;
+ _ -> false
+ end,
+ State = #q{q = Q,
+ active_consumer = none,
+ has_had_consumers = false,
+ consumers = rabbit_queue_consumers:new(),
+ senders = pmon:new(delegate),
+ msg_id_to_channel = gb_trees:empty(),
+ status = running,
+ args_policy_version = 0,
+ overflow = 'drop-head',
+ single_active_consumer_on = SingleActiveConsumerOn},
rabbit_event:init_stats_timer(State, #q.stats_timer).
init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
@@ -545,7 +552,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
-assert_invariant(State = #q{consumers = Consumers}) ->
+assert_invariant(#q{single_active_consumer_on = true}) ->
+ %% queue may contain messages and have available consumers with exclusive consumer
+ ok;
+assert_invariant(State = #q{consumers = Consumers, single_active_consumer_on = false}) ->
true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)).
is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
@@ -619,7 +629,8 @@ run_message_queue(ActiveConsumersChanged, State) ->
true -> maybe_notify_decorators(ActiveConsumersChanged, State);
false -> case rabbit_queue_consumers:deliver(
fun(AckRequired) -> fetch(AckRequired, State) end,
- qname(State), State#q.consumers) of
+ qname(State), State#q.consumers,
+ State#q.single_active_consumer_on, State#q.active_consumer) of
{delivered, ActiveConsumersChanged1, State1, Consumers} ->
run_message_queue(
ActiveConsumersChanged or ActiveConsumersChanged1,
@@ -645,7 +656,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
{{Message, Delivered, AckTag}, {BQS1, MTC}};
(false) -> {{Message, Delivered, undefined},
discard(Delivery, BQ, BQS, MTC)}
- end, qname(State), State#q.consumers) of
+ end, qname(State), State#q.consumers, State#q.single_active_consumer_on, State#q.active_consumer) of
{delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} ->
{delivered, maybe_notify_decorators(
ActiveConsumersChanged,
@@ -814,9 +825,10 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{consumers = Consumers,
- exclusive_consumer = Holder,
- senders = Senders}) ->
+handle_ch_down(DownPid, State = #q{consumers = Consumers,
+ active_consumer = Holder,
+ single_active_consumer_on = SingleActiveConsumerOn,
+ senders = Senders}) ->
State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
false ->
Senders;
@@ -840,12 +852,9 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
{ChAckTags, ChCTags, Consumers1} ->
QName = qname(State1),
[emit_consumer_deleted(DownPid, CTag, QName, ?INTERNAL_USER) || CTag <- ChCTags],
- Holder1 = case Holder of
- {DownPid, _} -> none;
- Other -> Other
- end,
+ Holder1 = new_single_active_consumer_after_channel_down(DownPid, Holder, SingleActiveConsumerOn, Consumers1),
State2 = State1#q{consumers = Consumers1,
- exclusive_consumer = Holder1},
+ active_consumer = Holder1},
notify_decorators(State2),
case should_auto_delete(State2) of
true ->
@@ -860,6 +869,22 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
end
end.
+new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = true, Consumers) ->
+ case CurrentSingleActiveConsumer of
+ {DownChPid, _} ->
+ case rabbit_queue_consumers:get_consumer(Consumers) of
+ undefined -> none;
+ Consumer -> Consumer
+ end;
+ false ->
+ CurrentSingleActiveConsumer
+ end;
+new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = false, _Consumers) ->
+ case CurrentSingleActiveConsumer of
+ {DownChPid, _} -> none;
+ Other -> Other
+ end.
+
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@@ -1007,14 +1032,14 @@ i(effective_policy_definition, #q{q = Q}) ->
undefined -> [];
Def -> Def
end;
-i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
- '';
-i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
+i(exclusive_consumer_pid, #q{active_consumer = {ChPid, _ConsumerTag}, single_active_consumer_on = false}) ->
ChPid;
-i(exclusive_consumer_tag, #q{exclusive_consumer = none}) ->
+i(exclusive_consumer_pid, _) ->
'';
-i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
+i(exclusive_consumer_tag, #q{active_consumer = {_ChPid, ConsumerTag}, single_active_consumer_on = false}) ->
ConsumerTag;
+i(exclusive_consumer_tag, _) ->
+ '';
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
i(messages_unacknowledged, _) ->
@@ -1213,49 +1238,81 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser},
- _From, State = #q{consumers = Consumers,
- exclusive_consumer = Holder}) ->
- case check_exclusive_access(Holder, ExclusiveConsume, State) of
- in_use -> reply({error, exclusive_consume_unavailable}, State);
- ok -> Consumers1 = rabbit_queue_consumers:add(
- ChPid, ConsumerTag, NoAck,
- LimiterPid, LimiterActive,
- PrefetchCount, Args, is_empty(State),
- ActingUser, Consumers),
- ExclusiveConsumer =
- if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> Holder
- end,
- State1 = State#q{consumers = Consumers1,
- has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
- ok = maybe_send_reply(ChPid, OkMsg),
- QName = qname(State1),
- AckRequired = not NoAck,
- rabbit_core_metrics:consumer_created(
- ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
- PrefetchCount, Args),
- emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- AckRequired, QName, PrefetchCount,
- Args, none, ActingUser),
- notify_decorators(State1),
- reply(ok, run_message_queue(State1))
+ _From, State = #q{consumers = Consumers,
+ active_consumer = Holder,
+ single_active_consumer_on = SingleActiveConsumerOn}) ->
+ ConsumerRegistration = case SingleActiveConsumerOn of
+ true ->
+ case ExclusiveConsume of
+ true ->
+ {error, reply({error, exclusive_consume_unavailable}, State)};
+ false ->
+ Consumers1 = rabbit_queue_consumers:add(
+ ChPid, ConsumerTag, NoAck,
+ LimiterPid, LimiterActive,
+ PrefetchCount, Args, is_empty(State),
+ ActingUser, Consumers),
+
+ case Holder of
+ none ->
+ NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1),
+ {state, State#q{consumers = Consumers1,
+ has_had_consumers = true,
+ active_consumer = NewConsumer}};
+ _ ->
+ {state, State#q{consumers = Consumers1,
+ has_had_consumers = true}}
+ end
+ end;
+ false ->
+ case check_exclusive_access(Holder, ExclusiveConsume, State) of
+ in_use -> {error, reply({error, exclusive_consume_unavailable}, State)};
+ ok ->
+ Consumers1 = rabbit_queue_consumers:add(
+ ChPid, ConsumerTag, NoAck,
+ LimiterPid, LimiterActive,
+ PrefetchCount, Args, is_empty(State),
+ ActingUser, Consumers),
+ ExclusiveConsumer =
+ if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> Holder
+ end,
+ {state, State#q{consumers = Consumers1,
+ has_had_consumers = true,
+ active_consumer = ExclusiveConsumer}}
+ end
+ end,
+ case ConsumerRegistration of
+ {error, Reply} ->
+ Reply;
+ {state, State1} ->
+ ok = maybe_send_reply(ChPid, OkMsg),
+ QName = qname(State1),
+ AckRequired = not NoAck,
+ rabbit_core_metrics:consumer_created(
+ ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
+ PrefetchCount, Args),
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ AckRequired, QName, PrefetchCount,
+ Args, none, ActingUser),
+ notify_decorators(State1),
+ reply(ok, run_message_queue(State1))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From,
- State = #q{consumers = Consumers,
- exclusive_consumer = Holder}) ->
+ State = #q{consumers = Consumers,
+ active_consumer = Holder,
+ single_active_consumer_on = SingleActiveConsumerOn }) ->
ok = maybe_send_reply(ChPid, OkMsg),
case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of
not_found ->
reply(ok, State);
Consumers1 ->
- Holder1 = case Holder of
- {ChPid, ConsumerTag} -> none;
- _ -> Holder
- end,
+ Holder1 = new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag,
+ Holder, SingleActiveConsumerOn, Consumers1
+ ),
State1 = State#q{consumers = Consumers1,
- exclusive_consumer = Holder1},
+ active_consumer = Holder1},
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser),
notify_decorators(State1),
case should_auto_delete(State1) of
@@ -1325,6 +1382,24 @@ handle_call(sync_mirrors, _From, State) ->
handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State).
+new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleActiveConsumer,
+ _SingleActiveConsumerIsOn = true, Consumers) ->
+ case rabbit_queue_consumers:is_same(ChPid, ConsumerTag, CurrentSingleActiveConsumer) of
+ true ->
+ case rabbit_queue_consumers:get_consumer(Consumers) of
+ undefined -> none;
+ Consumer -> Consumer
+ end;
+ false ->
+ CurrentSingleActiveConsumer
+ end;
+new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleActiveConsumer,
+ _SingleActiveConsumerIsOn = false, _Consumers) ->
+ case CurrentSingleActiveConsumer of
+ {ChPid, ConsumerTag} -> none;
+ _ -> CurrentSingleActiveConsumer
+ end.
+
handle_cast(init, State) ->
try
init_it({no_barrier, non_clean_shutdown}, none, State)
@@ -1432,7 +1507,6 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
{unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
run_message_queue(true, State1)
end);
-
handle_cast(notify_decorators, State) ->
notify_decorators(State),
noreply(State);
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index e96dfd7673..258e85ffa2 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -27,6 +27,10 @@
-export([has_for_source/1, remove_for_source/1,
remove_for_destination/2, remove_transient_for_destination/1]).
+-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath,
+ kind = exchange,
+ name = <<>>}).
+
%%----------------------------------------------------------------------------
-export_type([key/0, deletions/0]).
@@ -156,6 +160,14 @@ recover_semi_durable_route_txn(R = #route{binding = B}, X) ->
(Serial, false) -> x_callback(Serial, X, add_binding, B)
end).
+exists(#binding{source = ?DEFAULT_EXCHANGE(_),
+ destination = #resource{kind = queue, name = QName} = Queue,
+ key = QName,
+ args = []}) ->
+ case rabbit_amqqueue:lookup(Queue) of
+ {ok, _} -> true;
+ {error, not_found} -> false
+ end;
exists(Binding) ->
binding_action(
Binding, fun (_Src, _Dst, B) ->
@@ -243,9 +255,17 @@ list(VHostPath) ->
destination = VHostResource,
_ = '_'},
_ = '_'},
- [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
- Route)].
-
+ %% if there are any default exchange bindings left after an upgrade
+ %% of a pre-3.8 database, filter them out
+ AllBindings = [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)],
+ Filtered = lists:filter(fun(#binding{source = S}) ->
+ S =/= ?DEFAULT_EXCHANGE(VHostPath)
+ end, AllBindings),
+ implicit_bindings(VHostPath) ++ Filtered.
+
+list_for_source(?DEFAULT_EXCHANGE(VHostPath)) ->
+ implicit_bindings(VHostPath);
list_for_source(SrcName) ->
mnesia:async_dirty(
fun() ->
@@ -255,16 +275,43 @@ list_for_source(SrcName) ->
end).
list_for_destination(DstName) ->
- mnesia:async_dirty(
- fun() ->
- Route = #route{binding = #binding{destination = DstName,
- _ = '_'}},
- [reverse_binding(B) ||
- #reverse_route{reverse_binding = B} <-
- mnesia:match_object(rabbit_reverse_route,
- reverse_route(Route), read)]
- end).
-
+ implicit_for_destination(DstName) ++
+ mnesia:async_dirty(
+ fun() ->
+ Route = #route{binding = #binding{destination = DstName,
+ _ = '_'}},
+ [reverse_binding(B) ||
+ #reverse_route{reverse_binding = B} <-
+ mnesia:match_object(rabbit_reverse_route,
+ reverse_route(Route), read)]
+ end).
+
+implicit_bindings(VHostPath) ->
+ DstQueues = rabbit_amqqueue:list_names(VHostPath),
+ [ #binding{source = ?DEFAULT_EXCHANGE(VHostPath),
+ destination = DstQueue,
+ key = QName,
+ args = []}
+ || DstQueue = #resource{name = QName} <- DstQueues ].
+
+implicit_for_destination(DstQueue = #resource{kind = queue,
+ virtual_host = VHostPath,
+ name = QName}) ->
+ [#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
+ destination = DstQueue,
+ key = QName,
+ args = []}];
+implicit_for_destination(_) ->
+ [].
+
+list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath),
+ #resource{kind = queue,
+ virtual_host = VHostPath,
+ name = QName} = DstQueue) ->
+ [#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
+ destination = DstQueue,
+ key = QName,
+ args = []}];
list_for_source_and_destination(SrcName, DstName) ->
mnesia:async_dirty(
fun() ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d1f3b06528..14ce54c949 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -72,7 +72,7 @@
-export([get_vhost/1, get_user/1]).
%% For testing
-export([build_topic_variable_map/3]).
--export([list_queue_states/1]).
+-export([list_queue_states/1, get_max_message_size/0]).
%% Mgmt HTTP API refactor
-export([handle_method/5]).
@@ -158,7 +158,9 @@
delivery_flow,
interceptor_state,
queue_states,
- queue_cleanup_timer
+ queue_cleanup_timer,
+ %% Message content size limit
+ max_message_size
}).
-define(QUEUE, lqueue).
@@ -441,6 +443,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
_ ->
Limiter0
end,
+ MaxMessageSize = get_max_message_size(),
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -473,7 +476,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
- queue_states = #{}},
+ queue_states = #{},
+ max_message_size = MaxMessageSize},
State1 = State#ch{
interceptor_state = rabbit_channel_interceptor:init(State)},
State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
@@ -558,7 +562,6 @@ handle_cast({method, Method, Content, Flow},
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
-
try handle_method(rabbit_channel_interceptor:intercept_in(
expand_shortcuts(Method, State), Content, IState),
State) of
@@ -793,6 +796,16 @@ code_change(_OldVsn, State, _Extra) ->
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+-spec get_max_message_size() -> non_neg_integer().
+
+get_max_message_size() ->
+ case application:get_env(rabbit, max_message_size) of
+ {ok, MS} when is_integer(MS) ->
+ erlang:min(MS, ?MAX_MSG_SIZE);
+ _ ->
+ ?MAX_MSG_SIZE
+ end.
+
%%---------------------------------------------------------------------------
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
@@ -985,12 +998,19 @@ extract_topic_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct,
extract_topic_variable_map_from_amqp_params(_) ->
#{}.
-check_msg_size(Content) ->
+check_msg_size(Content, MaxMessageSize) ->
Size = rabbit_basic:maybe_gc_large_msg(Content),
- case Size > ?MAX_MSG_SIZE of
- true -> precondition_failed("message size ~B larger than max size ~B",
- [Size, ?MAX_MSG_SIZE]);
- false -> ok
+ case Size of
+ S when S > MaxMessageSize ->
+ ErrorMessage = case MaxMessageSize of
+ ?MAX_MSG_SIZE ->
+ "message size ~B is larger than max size ~B";
+ _ ->
+ "message size ~B is larger than configured max size ~B"
+ end,
+ precondition_failed(ErrorMessage,
+ [Size, MaxMessageSize]);
+ _ -> ok
end.
check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->
@@ -1164,16 +1184,17 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
- Content, State = #ch{virtual_host = VHostPath,
- tx = Tx,
- channel = ChannelNum,
- confirm_enabled = ConfirmEnabled,
- trace_state = TraceState,
- user = #user{username = Username} = User,
- conn_name = ConnName,
- delivery_flow = Flow,
- conn_pid = ConnPid}) ->
- check_msg_size(Content),
+ Content, State = #ch{virtual_host = VHostPath,
+ tx = Tx,
+ channel = ChannelNum,
+ confirm_enabled = ConfirmEnabled,
+ trace_state = TraceState,
+ user = #user{username = Username} = User,
+ conn_name = ConnName,
+ delivery_flow = Flow,
+ conn_pid = ConnPid,
+ max_message_size = MaxMessageSize}) ->
+ check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 37847fa3e5..73b117157c 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_fifo).
@@ -42,6 +42,7 @@
query_ra_indexes/1,
query_consumer_count/1,
query_consumers/1,
+ query_stat/1,
usage/1,
zero/1,
@@ -57,8 +58,7 @@
make_discard/2,
make_credit/4,
make_purge/0,
- make_update_config/1,
- make_stat/0
+ make_update_config/1
]).
-type raw_msg() :: term().
@@ -146,8 +146,7 @@
#discard{} |
#credit{} |
#purge{} |
- #update_config{} |
- #stat{}.
+ #update_config{}.
-type command() :: protocol() | ra_machine:builtin_command().
%% all the command types suppored by ra fifo
@@ -181,6 +180,8 @@
suspected_down = false :: boolean()
}).
+-type consumer() :: #consumer{}.
+
-record(enqueuer,
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
@@ -235,7 +236,12 @@
prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
PrefixMsgs :: non_neg_integer()},
msg_bytes_enqueue = 0 :: non_neg_integer(),
- msg_bytes_checkout = 0 :: non_neg_integer()
+ msg_bytes_checkout = 0 :: non_neg_integer(),
+ %% whether single active consumer is on or not for this queue
+ consumer_strategy = default :: default | single_active,
+ %% waiting consumers, one is picked active consumer is cancelled or dies
+ %% used only when single active consumer is on
+ waiting_consumers = [] :: [{consumer_id(), consumer()}]
}).
-opaque state() :: #state{}.
@@ -244,7 +250,8 @@
queue_resource := rabbit_types:r('queue'),
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
- shadow_copy_interval => non_neg_integer()}.
+ shadow_copy_interval => non_neg_integer(),
+ single_active_consumer_on => boolean()}.
-export_type([protocol/0,
delivery/0,
@@ -271,9 +278,16 @@ update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
+ ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
+ true ->
+ single_active;
+ false ->
+ default
+ end,
State#state{dead_letter_handler = DLH,
become_leader_handler = BLH,
- shadow_copy_interval = SHI}.
+ shadow_copy_interval = SHI,
+ consumer_strategy = ConsumerStrategy}.
zero(_) ->
0.
@@ -434,19 +448,6 @@ apply(#{index := RaftIdx}, #purge{},
%% reverse the effects ourselves
{State, {purge, Total},
lists:reverse([garbage_collection | Effects])};
-apply(_, #stat{}, #state{name = Name,
- messages = Messages,
- ra_indexes = Indexes,
- consumers = Cons,
- msg_bytes_enqueue = EnqueueBytes,
- msg_bytes_checkout = CheckoutBytes} = State) ->
- Metrics = {maps:size(Messages), % Ready
- num_checked_out(State), % checked out
- rabbit_fifo_index:size(Indexes), %% Total
- maps:size(Cons), % Consumers
- EnqueueBytes,
- CheckoutBytes},
- {State, {stat, Metrics}, []};
apply(_, {down, ConsumerPid, noconnection},
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
@@ -549,7 +550,8 @@ state_enter(leader, #state{consumers = Cons,
Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]),
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
- Effects = Mons ++ Nots,
+ NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
+ Effects = Mons ++ Nots ++ NodeMons,
case BLH of
undefined ->
Effects;
@@ -663,6 +665,11 @@ query_consumers(#state{consumers = Consumers}) ->
maps:get(args, Meta, []),
maps:get(username, Meta, undefined)}
end, Consumers).
+
+query_stat(#state{messages = M,
+ consumers = Consumers}) ->
+ {maps:size(M), maps:size(Consumers)}.
+
%% other
-spec usage(atom()) -> float().
@@ -708,6 +715,42 @@ num_checked_out(#state{consumers = Cons}) ->
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
+ {Effects0, #state{consumer_strategy = default} = S0}) ->
+ %% general case, single active consumer off
+ cancel_consumer0(ConsumerId, {Effects0, S0});
+cancel_consumer(ConsumerId,
+ {Effects0, #state{consumer_strategy = single_active,
+ waiting_consumers = [] } = S0}) ->
+ %% single active consumer on, no consumers are waiting
+ cancel_consumer0(ConsumerId, {Effects0, S0});
+cancel_consumer(ConsumerId,
+ {Effects0, #state{consumers = Cons0,
+ consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers0 } = State0}) ->
+ %% single active consumer on, consumers are waiting
+ case maps:take(ConsumerId, Cons0) of
+ {_CurrentActiveConsumer = #consumer{checked_out = Checked0}, _} ->
+ % The active consumer is to be removed
+ % Cancel it
+ S = return_all(State0, Checked0),
+ Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
+ % Take another one from the waiting consumers and put it in consumers
+ [{NewActiveConsumerId, NewActiveConsumer} | RemainingWaitingConsumers] = WaitingConsumers0,
+ #state{service_queue = ServiceQueue} = State0,
+ ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue),
+ State1 = State0#state{consumers = #{NewActiveConsumerId => NewActiveConsumer},
+ service_queue = ServiceQueue1,
+ waiting_consumers = RemainingWaitingConsumers},
+ {Effects, State1};
+ error ->
+ % The cancelled consumer is not the active one
+ % Just remove it from idle_consumers
+ {value, _Consumer, WaitingConsumers1} = lists:keytake(ConsumerId, 1, WaitingConsumers0),
+ % A waiting consumer isn't supposed to have any checked out messages, so nothing special to do here
+ {Effects0, State0#state{waiting_consumers = WaitingConsumers1}}
+ end.
+
+cancel_consumer0(ConsumerId,
{Effects0, #state{consumers = C0} = S0}) ->
case maps:take(ConsumerId, C0) of
{#consumer{checked_out = Checked0}, Cons} ->
@@ -718,9 +761,9 @@ cancel_consumer(ConsumerId,
{[{aux, inactive} | Effects], S#state{consumers = Cons}};
_ ->
{Effects, S#state{consumers = Cons}}
- end;
+ end;
error ->
- % already removed - do nothing
+ %% already removed: do nothing
{Effects0, S0}
end.
@@ -1103,23 +1146,42 @@ uniq_queue_in(Key, Queue) ->
end.
+update_consumer(ConsumerId, Meta, Spec,
+ #state{consumer_strategy = default} = State0) ->
+ %% general case, single active consumer off
+ update_consumer0(ConsumerId, Meta, Spec, State0);
+update_consumer(ConsumerId, Meta, Spec,
+ #state{consumers = Cons0,
+ consumer_strategy = single_active} = State0) when map_size(Cons0) == 0 ->
+ %% single active consumer on, no one is consuming yet
+ update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
- #state{consumers = Cons0,
- service_queue = ServiceQueue0} = State0) ->
+ #state{consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers0} = State0) ->
+ %% single active consumer on and one active consumer already
+ %% adding the new consumer to the waiting list
+ Consumer = #consumer{lifetime = Life, meta = Meta,
+ credit = Credit, credit_mode = Mode},
+ WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}],
+ State0#state{waiting_consumers = WaitingConsumers1}.
+
+update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
+ #state{consumers = Cons0,
+ service_queue = ServiceQueue0} = State0) ->
%% TODO: this logic may not be correct for updating a pre-existing consumer
Init = #consumer{lifetime = Life, meta = Meta,
credit = Credit, credit_mode = Mode},
Cons = maps:update_with(ConsumerId,
- fun(S) ->
- %% remove any in-flight messages from
- %% the credit update
- N = maps:size(S#consumer.checked_out),
- C = max(0, Credit - N),
- S#consumer{lifetime = Life,
- credit = C}
- end, Init, Cons0),
+ fun(S) ->
+ %% remove any in-flight messages from
+ %% the credit update
+ N = maps:size(S#consumer.checked_out),
+ C = max(0, Credit - N),
+ S#consumer{lifetime = Life,
+ credit = C}
+ end, Init, Cons0),
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
- ServiceQueue0),
+ ServiceQueue0),
State0#state{consumers = Cons, service_queue = ServiceQueue}.
@@ -1197,9 +1259,6 @@ make_purge() -> #purge{}.
make_update_config(Config) ->
#update_config{config = Config}.
--spec make_stat() -> protocol().
-make_stat() -> #stat{}.
-
add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
Bytes = message_size(Msg),
State#state{msg_bytes_enqueue = Enqueue + Bytes}.
@@ -1506,7 +1565,6 @@ cancelled_checkout_out_test() ->
{State3, {dequeue, {0, {_, first}}}, _} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
- ?debugFmt("State3 ~p", [State3]),
{_State, {dequeue, {_, {_, second}}}, _} =
apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3),
ok.
@@ -1704,8 +1762,7 @@ return_prefix_msg_count_test() ->
],
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
- {State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries),
- ?debugFmt("return_prefix_msg_count_test state ~n~p~n", [State]),
+ {_State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries),
ok.
@@ -1777,7 +1834,6 @@ run_snapshot_test(Name, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
[begin
- ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
run_snapshot_test0(Name, C)
end || C <- prefixes(Commands, 1, [])].
@@ -1790,11 +1846,8 @@ run_snapshot_test0(Name, Commands) ->
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
- ?debugFmt("running from snapshot: ~b", [SnapIdx]),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
- ?debugFmt("Name ~p~nS~p~nState~p~nn",
- [Name, S, State]),
?assertEqual(State, S)
end || {release_cursor, SnapIdx, SnapState} <- Effects],
ok.
@@ -1917,6 +1970,71 @@ down_returns_checked_out_in_order_test() ->
?assertEqual(lists:sort(Returns), Returns),
ok.
+single_active_consumer_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => true}),
+ ?assertEqual(single_active, State0#state.consumer_strategy),
+ ?assertEqual(0, map_size(State0#state.consumers)),
+
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, self()}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+
+ % the first registered consumer is the active one, the others are waiting
+ ?assertEqual(1, map_size(State1#state.consumers)),
+ ?assert(maps:is_key({<<"ctag1">>, self()}, State1#state.consumers)),
+ ?assertEqual(3, length(State1#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)),
+
+ % cancelling a waiting consumer
+ {State2, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
+ % the active consumer should still be in place
+ ?assertEqual(1, map_size(State2#state.consumers)),
+ ?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)),
+ % the cancelled consumer has been removed from waiting consumers
+ ?assertEqual(2, length(State2#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#state.waiting_consumers)),
+
+ % cancelling the active consumer
+ {State3, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
+ % the second registered consumer is now the active one
+ ?assertEqual(1, map_size(State3#state.consumers)),
+ ?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)),
+ % the new active consumer is no longer in the waiting list
+ ?assertEqual(1, length(State3#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)),
+
+ % cancelling the active consumer
+ {State4, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
+ % the last waiting consumer became the active one
+ ?assertEqual(1, map_size(State4#state.consumers)),
+ ?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)),
+ % the waiting consumer list is now empty
+ ?assertEqual(0, length(State4#state.waiting_consumers)),
+
+ % cancelling the last consumer
+ {State5, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
+ % no active consumer anymore
+ ?assertEqual(0, map_size(State5#state.consumers)),
+ % still nothing in the waiting list
+ ?assertEqual(0, length(State5#state.waiting_consumers)),
+
+ ok.
+
meta(Idx) ->
#{index => Idx, term => 1}.
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 7e21ba222a..70793d6943 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -399,22 +399,12 @@ purge(Node) ->
Err
end.
--spec stat(ra_server_id()) -> {ok, {non_neg_integer(), non_neg_integer(),
- non_neg_integer(), non_neg_integer(),
- non_neg_integer(), non_neg_integer()}}
+-spec stat(ra_server_id()) -> {ok, {non_neg_integer(), non_neg_integer()}}
| {error | timeout, term()}.
-stat(Servers) ->
- try_process_stat(Servers, rabbit_fifo:make_stat()).
-
-try_process_stat([Server | Rem], Cmd) ->
- case ra:process_command(Server, Cmd, 30000) of
- {ok, {stat, Reply}, _} ->
- {ok, Reply};
- Err when length(Rem) =:= 0 ->
- Err;
- _ ->
- try_process_stat(Rem, Cmd)
- end.
+stat(Leader) ->
+ Query = fun (State) -> rabbit_fifo:query_stat(State) end,
+ {ok, {_, Stat}, _} = ra:local_query(Leader, Query),
+ Stat.
%% @doc returns the cluster name
-spec cluster_name(state()) -> cluster_name().
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 98582c8117..e743fbce18 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -11,17 +11,18 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_queue_consumers).
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/10, remove/3, erase_ch/2,
- send_drained/0, deliver/3, record_ack/3, subtract_acks/3,
+ send_drained/0, deliver/5, record_ack/3, subtract_acks/3,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
- credit/6, utilisation/1]).
+ credit/6, utilisation/1, is_same/3, get_consumer/1, get/3,
+ consumer_tag/1]).
%%----------------------------------------------------------------------------
@@ -42,7 +43,7 @@
acktags,
consumer_count,
%% Queue of {ChPid, #consumer{}} for consumers which have
- %% been blocked for any reason
+ %% been blocked (rate/prefetch limited) for any reason
blocked_consumers,
%% The limiter itself
limiter,
@@ -57,6 +58,9 @@
use :: {'inactive',
time_micros(), time_micros(), ratio()} |
{'active', time_micros(), ratio()}}.
+-type consumer() :: #consumer{tag::rabbit_types:ctag(), ack_required::boolean(),
+ prefetch::non_neg_integer(), args::rabbit_framing:amqp_table(),
+ user::rabbit_types:username()}.
-type ch() :: pid().
-type ack() :: non_neg_integer().
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
@@ -81,7 +85,8 @@
state()}.
-spec send_drained() -> 'ok'.
-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
- rabbit_amqqueue:name(), state()) ->
+ rabbit_amqqueue:name(), state(), boolean(),
+ none | {ch(), rabbit_types:ctag()} | {ch(), consumer()}) ->
{'delivered', boolean(), T, state()} |
{'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
@@ -95,6 +100,7 @@
-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
state()) -> 'unchanged' | {'unblocked', state()}.
-spec utilisation(state()) -> ratio().
+-spec consumer_tag(consumer()) -> rabbit_types:ctag().
%%----------------------------------------------------------------------------
@@ -189,10 +195,34 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
-deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State).
-
+deliver(FetchFun, QName, State, SingleActiveConsumerIsOn, ActiveConsumer) ->
+ deliver(FetchFun, QName, false, State, SingleActiveConsumerIsOn, ActiveConsumer).
+
+deliver(_FetchFun, _QName, false, State, true, none) ->
+ {undelivered, false,
+ State#state{use = update_use(State#state.use, inactive)}};
+deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true, SingleActiveConsumer) ->
+ {ChPid, Consumer} = SingleActiveConsumer,
+ %% blocked (rate/prefetch limited) consumers are removed from the queue state, but not the exclusive_consumer field,
+ %% so we need to do this check to avoid adding the exclusive consumer to the channel record
+ %% over and over
+ case is_blocked(SingleActiveConsumer) of
+ true ->
+ {undelivered, false,
+ State#state{use = update_use(State#state.use, inactive)}};
+ false ->
+ case deliver_to_consumer(FetchFun, SingleActiveConsumer, QName) of
+ {delivered, R} ->
+ {delivered, false, R, State};
+ undelivered ->
+ {ChPid, Consumer} = SingleActiveConsumer,
+ Consumers1 = remove_consumer(ChPid, Consumer#consumer.tag, Consumers),
+ {undelivered, true,
+ State#state{consumers = Consumers1, use = update_use(State#state.use, inactive)}}
+ end
+ end;
deliver(FetchFun, QName, ConsumersChanged,
- State = #state{consumers = Consumers}) ->
+ State = #state{consumers = Consumers}, false, _SingleActiveConsumer) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
{undelivered, ConsumersChanged,
@@ -205,7 +235,7 @@ deliver(FetchFun, QName, ConsumersChanged,
Tail)}};
undelivered ->
deliver(FetchFun, QName, true,
- State#state{consumers = Tail})
+ State#state{consumers = Tail}, false, _SingleActiveConsumer)
end
end.
@@ -246,6 +276,10 @@ deliver_to_consumer(FetchFun,
unsent_message_count = Count + 1}),
R.
+is_blocked(Consumer = {ChPid, _C}) ->
+ #cr{blocked_consumers = BlockedConsumers} = lookup_ch(ChPid),
+ priority_queue:member(Consumer, BlockedConsumers).
+
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
update_ch_record(C#cr{acktags = ?QUEUE:in({AckTag, none}, ChAckTags)}),
@@ -357,6 +391,29 @@ utilisation(#state{use = {active, Since, Avg}}) ->
utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
use_avg(Active, erlang:monotonic_time(micro_seconds) - Since, Avg).
+is_same(ChPid, ConsumerTag, {ChPid, #consumer{tag = ConsumerTag}}) ->
+ true;
+is_same(_ChPid, _ConsumerTag, _Consumer) ->
+ false.
+
+get_consumer(#state{consumers = Consumers}) ->
+ case priority_queue:out_p(Consumers) of
+ {{value, Consumer, _Priority}, _Tail} -> Consumer;
+ {empty, _} -> undefined
+ end.
+
+get(ChPid, ConsumerTag, #state{consumers = Consumers}) ->
+ Consumers1 = priority_queue:filter(fun ({CP, #consumer{tag = CT}}) ->
+ (CP == ChPid) and (CT == ConsumerTag)
+ end, Consumers),
+ case priority_queue:out_p(Consumers1) of
+ {empty, _} -> undefined;
+ {{value, Consumer, _Priority}, _Tail} -> Consumer
+ end.
+
+consumer_tag(#consumer{tag = CTag}) ->
+ CTag.
+
%%----------------------------------------------------------------------------
parse_credit_args(Default, Args) ->
diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl
index c532714d41..7c386855f6 100644
--- a/src/rabbit_queue_location_min_masters.erl
+++ b/src/rabbit_queue_location_min_masters.erl
@@ -38,45 +38,33 @@ description() ->
<<"Locate queue master node from cluster node with least bound queues">>}].
queue_master_location(#amqqueue{} = Q) ->
- Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
- VHosts = rabbit_vhost:list(),
- BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []),
- {_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters),
- {ok, MinMaster}.
+ Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
+ QueueNames = rabbit_amqqueue:list_names(),
+ MastersPerNode = lists:foldl(
+ fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) ->
+ case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of
+ {ok, Master} when is_atom(Master) ->
+ case maps:is_key(Master, NodeMasters) of
+ true -> maps:update_with(Master,
+ fun(N) -> N + 1 end,
+ NodeMasters);
+ false -> NodeMasters
+ end;
+ _ -> NodeMasters
+ end
+ end,
+ maps:from_list([{N, 0} || N <- Cluster]),
+ QueueNames),
-%%---------------------------------------------------------------------------
-%% Private helper functions
-%%---------------------------------------------------------------------------
-get_min_master(Cluster, BoundQueueMasters) ->
- lists:min([ {count_masters(Node, BoundQueueMasters), Node} ||
- Node <- Cluster ]).
-
-count_masters(Node, Masters) ->
- length([ X || X <- Masters, X == Node ]).
-
-get_bound_queue_masters_per_vhost([], Acc) ->
- lists:flatten(Acc);
-get_bound_queue_masters_per_vhost([VHost|RemVHosts], Acc) ->
- BoundQueueNames =
- lists:filtermap(
- fun(#binding{destination =#resource{kind = queue,
- name = QueueName}}) ->
- {true, QueueName};
- (_) ->
- false
- end,
- rabbit_binding:list(VHost)),
- UniqQueueNames = lists:usort(BoundQueueNames),
- BoundQueueMasters = get_queue_masters(VHost, UniqQueueNames, []),
- get_bound_queue_masters_per_vhost(RemVHosts, [BoundQueueMasters|Acc]).
-
-
-get_queue_masters(_VHost, [], BoundQueueNodes) -> BoundQueueNodes;
-get_queue_masters(VHost, [QueueName | RemQueueNames], QueueMastersAcc) ->
- QueueMastersAcc0 = case rabbit_queue_master_location_misc:lookup_master(
- QueueName, VHost) of
- {ok, Master} when is_atom(Master) ->
- [Master|QueueMastersAcc];
- _ -> QueueMastersAcc
- end,
- get_queue_masters(VHost, RemQueueNames, QueueMastersAcc0).
+ {MinNode, _NMasters} = maps:fold(
+ fun(Node, NMasters, init) ->
+ {Node, NMasters};
+ (Node, NMasters, {MinNode, MinMasters}) ->
+ case NMasters < MinMasters of
+ true -> {Node, NMasters};
+ false -> {MinNode, MinMasters}
+ end
+ end,
+ init,
+ MastersPerNode),
+ {ok, MinNode}. \ No newline at end of file
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 540d130243..864977f650 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -37,7 +37,8 @@
-export([policy_changed/2]).
-export([cleanup_data_dir/0]).
--include_lib("rabbit_common/include/rabbit.hrl").
+%%-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
-type ra_server_id() :: {Name :: atom(), Node :: node()}.
@@ -150,7 +151,14 @@ ra_machine_config(Q = #amqqueue{name = QName,
#{name => Name,
queue_resource => QName,
dead_letter_handler => dlx_mfa(Q),
- become_leader_handler => {?MODULE, become_leader, [QName]}}.
+ become_leader_handler => {?MODULE, become_leader, [QName]},
+ single_active_consumer_on => single_active_consumer_on(Q)}.
+
+single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
+ case rabbit_misc:table_lookup(QArguments, <<"x-single-active-consumer">>) of
+ {bool, true} -> true;
+ _ -> false
+ end.
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
Node = node(ChPid),
@@ -424,11 +432,12 @@ infos(QName) ->
info(Q, Items) ->
[{Item, i(Item, Q)} || Item <- Items].
-stat(#amqqueue{pid = {Name, _}, quorum_nodes = Nodes}) ->
- case rabbit_fifo_client:stat([{Name, N} || N <- Nodes]) of
- {ok, {Ready, _, _, Consumers, _, _}} ->
- {ok, Ready, Consumers};
- _ ->
+stat(#amqqueue{pid = Leader}) ->
+ try
+ {Ready, Consumers} = rabbit_fifo_client:stat(Leader),
+ {ok, Ready, Consumers}
+ catch
+ _:_ ->
%% Leader is not available, cluster might be in minority
{ok, 0, 0}
end.
diff --git a/src/rabbit_sysmon_handler.erl b/src/rabbit_sysmon_handler.erl
new file mode 100644
index 0000000000..4e878f618d
--- /dev/null
+++ b/src/rabbit_sysmon_handler.erl
@@ -0,0 +1,230 @@
+%% Copyright (c) 2011 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+%% @doc A custom event handler to the `sysmon_handler' application's
+%% `system_monitor' event manager.
+%%
+%% This module attempts to discover more information about a process
+%% that generates a system_monitor event.
+
+-module(rabbit_sysmon_handler).
+
+-behaviour(gen_event).
+
+%% API
+-export([add_handler/0]).
+
+%% gen_event callbacks
+-export([init/1, handle_event/2, handle_call/2,
+ handle_info/2, terminate/2, code_change/3]).
+
+-record(state, {timer_ref :: reference()}).
+
+-define(INACTIVITY_TIMEOUT, 5000).
+
+%%%===================================================================
+%%% gen_event callbacks
+%%%===================================================================
+
+add_handler() ->
+ %% Vulnerable to race conditions (installing handler multiple
+ %% times), but risk is zero in the common OTP app startup case.
+ case lists:member(?MODULE, gen_event:which_handlers(sysmon_handler)) of
+ true ->
+ ok;
+ false ->
+ sysmon_handler_filter:add_custom_handler(?MODULE, [])
+ end.
+
+%%%===================================================================
+%%% gen_event callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Whenever a new event handler is added to an event manager,
+%% this function is called to initialize the event handler.
+%%
+%% @spec init(Args) -> {ok, State}
+%% @end
+%%--------------------------------------------------------------------
+init([]) ->
+ {ok, #state{}, hibernate}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Whenever an event manager receives an event sent using
+%% gen_event:notify/2 or gen_event:sync_notify/2, this function is
+%% called for each installed event handler to handle the event.
+%%
+%% @spec handle_event(Event, State) ->
+%% {ok, State} |
+%% {swap_handler, Args1, State1, Mod2, Args2} |
+%% remove_handler
+%% @end
+%%--------------------------------------------------------------------
+handle_event({monitor, Pid, Type, _Info},
+ State=#state{timer_ref=TimerRef}) when Pid == self() ->
+ %% Reset the inactivity timeout
+ NewTimerRef = reset_timer(TimerRef),
+ maybe_collect_garbage(Type),
+ {ok, State#state{timer_ref=NewTimerRef}};
+handle_event({monitor, PidOrPort, Type, Info}, State=#state{timer_ref=TimerRef}) ->
+ %% Reset the inactivity timeout
+ NewTimerRef = reset_timer(TimerRef),
+ {Fmt, Args} = format_pretty_proc_or_port_info(PidOrPort),
+ rabbit_log:warning("~p ~w ~w " ++ Fmt ++ " ~w", [?MODULE, Type, PidOrPort] ++ Args ++ [Info]),
+ {ok, State#state{timer_ref=NewTimerRef}};
+handle_event(Event, State=#state{timer_ref=TimerRef}) ->
+ NewTimerRef = reset_timer(TimerRef),
+ rabbit_log:warning("~p unhandled event: ~p", [?MODULE, Event]),
+ {ok, State#state{timer_ref=NewTimerRef}}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Whenever an event manager receives a request sent using
+%% gen_event:call/3,4, this function is called for the specified
+%% event handler to handle the request.
+%%
+%% @spec handle_call(Request, State) ->
+%% {ok, Reply, State} |
+%% {swap_handler, Reply, Args1, State1, Mod2, Args2} |
+%% {remove_handler, Reply}
+%% @end
+%%--------------------------------------------------------------------
+handle_call(_Call, State) ->
+ Reply = not_supported,
+ {ok, Reply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called for each installed event handler when
+%% an event manager receives any other message than an event or a
+%% synchronous request (or a system message).
+%%
+%% @spec handle_info(Info, State) ->
+%% {ok, State} |
+%% {swap_handler, Args1, State1, Mod2, Args2} |
+%% remove_handler
+%% @end
+%%--------------------------------------------------------------------
+handle_info(inactivity_timeout, State) ->
+ %% No events have arrived for the timeout period
+ %% so hibernate to free up resources.
+ {ok, State, hibernate};
+handle_info(Info, State) ->
+ rabbit_log:info("handle_info got ~p", [Info]),
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Whenever an event handler is deleted from an event manager, this
+%% function is called. It should be the opposite of Module:init/1 and
+%% do any necessary cleaning up.
+%%
+%% @spec terminate(Reason, State) -> void()
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @end
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+format_pretty_proc_or_port_info(PidOrPort) ->
+ try
+ case get_pretty_proc_or_port_info(PidOrPort) of
+ undefined ->
+ {"", []};
+ Res ->
+ Res
+ end
+ catch C:E:S ->
+ {"Pid ~w, ~W ~W at ~w\n",
+ [PidOrPort, C, 20, E, 20, S]}
+ end.
+
+get_pretty_proc_or_port_info(Pid) when is_pid(Pid) ->
+ Infos = [registered_name, initial_call, current_function, message_queue_len],
+ case process_info(Pid, Infos) of
+ undefined ->
+ undefined;
+ [] ->
+ undefined;
+ [{registered_name, RN0}, ICT1, {_, CF}, {_, MQL}] ->
+ ICT = case proc_lib:translate_initial_call(Pid) of
+ {proc_lib, init_p, 5} -> % not by proc_lib, see docs
+ ICT1;
+ ICT2 ->
+ {initial_call, ICT2}
+ end,
+ RNL = if RN0 == [] -> [];
+ true -> [{name, RN0}]
+ end,
+ {"~w", [RNL ++ [ICT, CF, {message_queue_len, MQL}]]}
+ end;
+get_pretty_proc_or_port_info(Port) when is_port(Port) ->
+ PortInfo = erlang:port_info(Port),
+ {value, {name, Name}, PortInfo2} = lists:keytake(name, 1, PortInfo),
+ QueueSize = [erlang:port_info(Port, queue_size)],
+ Connected = case proplists:get_value(connected, PortInfo2) of
+ undefined ->
+ [];
+ ConnectedPid ->
+ case proc_lib:translate_initial_call(ConnectedPid) of
+ {proc_lib, init_p, 5} -> % not by proc_lib, see docs
+ [];
+ ICT ->
+ [{initial_call, ICT}]
+ end
+ end,
+ {"name ~s ~w", [Name, lists:append([PortInfo2, QueueSize, Connected])]}.
+
+
+%% @doc If the message type is due to a large heap warning
+%% and the source is ourself, go ahead and collect garbage
+%% to avoid the death spiral.
+-spec maybe_collect_garbage(atom()) -> ok.
+maybe_collect_garbage(large_heap) ->
+ erlang:garbage_collect(),
+ ok;
+maybe_collect_garbage(_) ->
+ ok.
+
+-spec reset_timer(undefined | reference()) -> reference().
+reset_timer(undefined) ->
+ erlang:send_after(?INACTIVITY_TIMEOUT, self(), inactivity_timeout);
+reset_timer(TimerRef) ->
+ _ = erlang:cancel_timer(TimerRef),
+ reset_timer(undefined).
diff --git a/src/rabbit_sysmon_minder.erl b/src/rabbit_sysmon_minder.erl
new file mode 100644
index 0000000000..b2f5dd6316
--- /dev/null
+++ b/src/rabbit_sysmon_minder.erl
@@ -0,0 +1,156 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(rabbit_sysmon_minder).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%%
+%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
+%% @end
+%%--------------------------------------------------------------------
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%%
+%% @spec init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% @end
+%%--------------------------------------------------------------------
+init([]) ->
+ %% Add our system_monitor event handler. We do that here because
+ %% we have a process at our disposal (i.e. ourself) to receive the
+ %% notification in the very unlikely event that the
+ %% sysmon_handler has crashed and been removed from the
+ %% sysmon_handler gen_event server. (If we had a supervisor
+ %% or app-starting process add the handler, then if the handler
+ %% crashes, nobody will act on the crash notification.)
+ rabbit_sysmon_handler:add_handler(),
+ {ok, #state{}}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%%
+%% @spec handle_call(Request, From, State) ->
+%% {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%%
+%% @spec handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%%
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_info({gen_event_EXIT, rabbit_sysmon_handler, _}, State) ->
+ %% SASL will create an error message, no need for us to duplicate it.
+ %%
+ %% Our handler should never crash, but it did indeed crash. If
+ %% there's a pathological condition somewhere that's generating
+ %% lots of unforseen things that crash core's custom handler, we
+ %% could make things worse by jumping back into the exploding
+ %% volcano. Wait a little bit before jumping back. Besides, the
+ %% system_monitor data is nice but is not critical: there is no
+ %% need to make things worse if things are indeed bad, and if we
+ %% miss a few seconds of system_monitor events, the world will not
+ %% end.
+ timer:sleep(2*1000),
+ rabbit_sysmon_handler:add_handler(),
+ {noreply, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%%
+%% @spec terminate(Reason, State) -> void()
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @end
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets
index b318adaa12..a7a06aaadf 100644
--- a/test/config_schema_SUITE_data/rabbit.snippets
+++ b/test/config_schema_SUITE_data/rabbit.snippets
@@ -1,24 +1,56 @@
[{internal_auth_backend,
"auth_backends.1 = internal",
- [{rabbit,[{auth_backends,[rabbit_auth_backend_internal]}]}],
+ [{rabbit,[{auth_backends,[rabbit_auth_backend_internal]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ldap_auth_backend,
"auth_backends.1 = ldap",
- [{rabbit,[{auth_backends,[rabbit_auth_backend_ldap]}]}],
+ [{rabbit,[{auth_backends,[rabbit_auth_backend_ldap]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{multiple_auth_backends,
"auth_backends.1 = ldap
auth_backends.2 = internal",
[{rabbit,
[{auth_backends,
- [rabbit_auth_backend_ldap,rabbit_auth_backend_internal]}]}],
+ [rabbit_auth_backend_ldap,rabbit_auth_backend_internal]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{full_name_auth_backend,
"auth_backends.1 = ldap
# uses module name instead of a short alias, \"http\"
auth_backends.2 = rabbit_auth_backend_http",
[{rabbit,
- [{auth_backends,[rabbit_auth_backend_ldap,rabbit_auth_backend_http]}]}],
+ [{auth_backends,[rabbit_auth_backend_ldap,rabbit_auth_backend_http]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{third_party_auth_backend,
"auth_backends.1.authn = internal
@@ -26,14 +58,30 @@ auth_backends.2 = rabbit_auth_backend_http",
auth_backends.1.authz = rabbit_auth_backend_ip_range",
[{rabbit,
[{auth_backends,
- [{rabbit_auth_backend_internal,rabbit_auth_backend_ip_range}]}]}],
+ [{rabbit_auth_backend_internal,rabbit_auth_backend_ip_range}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{authn_authz_backend,
"auth_backends.1.authn = ldap
auth_backends.1.authz = internal",
[{rabbit,
[{auth_backends,
- [{rabbit_auth_backend_ldap,rabbit_auth_backend_internal}]}]}],
+ [{rabbit_auth_backend_ldap,rabbit_auth_backend_internal}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{authn_authz_multiple_backends,
"auth_backends.1.authn = ldap
@@ -42,13 +90,29 @@ auth_backends.2 = internal",
[{rabbit,
[{auth_backends,
[{rabbit_auth_backend_ldap,rabbit_auth_backend_internal},
- rabbit_auth_backend_internal]}]}],
+ rabbit_auth_backend_internal]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{authn_backend_only,
"auth_backends.1.authn = ldap",
[{rabbit,
[{auth_backends,
- [{rabbit_auth_backend_ldap,rabbit_auth_backend_ldap}]}]}],
+ [{rabbit_auth_backend_ldap,rabbit_auth_backend_ldap}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options,
"ssl_options.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem
@@ -62,15 +126,50 @@ ssl_options.fail_if_no_peer_cert = true",
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
{verify,verify_peer},
- {fail_if_no_peer_cert,true}]}]}],
+ {fail_if_no_peer_cert,true}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_listener,
"listeners.tcp.default = 5673",
- [{rabbit,[{tcp_listeners,[5673]}]}],[]},
+ [{rabbit,[{tcp_listeners,[5673]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
+ []},
{ssl_listener,
- "listeners.ssl = none",[{rabbit,[{ssl_listeners,[]}]}],[]},
+ "listeners.ssl = none",[{rabbit,[{ssl_listeners,[]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
+ []},
{num_acceptors,
- "num_acceptors.ssl = 1",[{rabbit,[{num_ssl_acceptors,1}]}],[]},
+ "num_acceptors.ssl = 1",[{rabbit,[{num_ssl_acceptors,1}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
+ []},
{default_user_settings,
"default_user = guest
default_pass = guest
@@ -82,7 +181,15 @@ default_permissions.write = .*",
[{default_user,<<"guest">>},
{default_pass,<<"guest">>},
{default_user_tags,[administrator]},
- {default_permissions,[<<".*">>,<<".*">>,<<".*">>]}]}],
+ {default_permissions,[<<".*">>,<<".*">>,<<".*">>]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation,
"cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
@@ -93,7 +200,15 @@ cluster_formation.node_type = disc",
[{cluster_formation,
[{peer_discovery_backend,rabbit_peer_discovery_classic_config},
{node_type,disc}]},
- {cluster_nodes,{[rabbit@hostname2,rabbit@hostname1],disc}}]}],
+ {cluster_nodes,{[rabbit@hostname2,rabbit@hostname1],disc}}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_disK,
"cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
@@ -104,88 +219,248 @@ cluster_formation.node_type = disc",
[{cluster_formation,
[{peer_discovery_backend,rabbit_peer_discovery_classic_config},
{node_type,disc}]},
- {cluster_nodes,{[rabbit@hostname2,rabbit@hostname1],disc}}]}],
+ {cluster_nodes,{[rabbit@hostname2,rabbit@hostname1],disc}}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_ram_ignored,
- "cluster_formation.node_type = ram",[],[]},
+ "cluster_formation.node_type = ram",[
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],[]},
{tcp_listen_options,
"tcp_listen_options.backlog = 128
tcp_listen_options.nodelay = true
tcp_listen_options.exit_on_close = false",
[{rabbit,
[{tcp_listen_options,
- [{backlog,128},{nodelay,true},{exit_on_close,false}]}]}],
+ [{backlog,128},{nodelay,true},{exit_on_close,false}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{vm_memory_watermark_absolute,
"vm_memory_high_watermark.absolute = 1073741824",
- [{rabbit,[{vm_memory_high_watermark,{absolute,1073741824}}]}],
+ [{rabbit,[{vm_memory_high_watermark,{absolute,1073741824}}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{vm_memory_watermark_absolute_units,
"vm_memory_high_watermark.absolute = 1024MB",
- [{rabbit,[{vm_memory_high_watermark,{absolute,"1024MB"}}]}],
+ [{rabbit,[{vm_memory_high_watermark,{absolute,"1024MB"}}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{vm_memory_watermark_paging_ratio,
"vm_memory_high_watermark_paging_ratio = 0.75
vm_memory_high_watermark.relative = 0.4",
[{rabbit,
[{vm_memory_high_watermark_paging_ratio,0.75},
- {vm_memory_high_watermark,0.4}]}],
+ {vm_memory_high_watermark,0.4}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{memory_monitor_interval, "memory_monitor_interval = 5000",
[{rabbit,
- [{memory_monitor_interval, 5000}]}],
+ [{memory_monitor_interval, 5000}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{vm_memory_calculation_strategy, "vm_memory_calculation_strategy = rss",
[{rabbit,
- [{vm_memory_calculation_strategy, rss}]}],
+ [{vm_memory_calculation_strategy, rss}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{vm_memory_calculation_strategy, "vm_memory_calculation_strategy = erlang",
[{rabbit,
- [{vm_memory_calculation_strategy, erlang}]}],
+ [{vm_memory_calculation_strategy, erlang}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{vm_memory_calculation_strategy, "vm_memory_calculation_strategy = allocated",
[{rabbit,
- [{vm_memory_calculation_strategy, allocated}]}],
+ [{vm_memory_calculation_strategy, allocated}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{vm_memory_calculation_strategy, "vm_memory_calculation_strategy = legacy",
[{rabbit,
- [{vm_memory_calculation_strategy, legacy}]}],
+ [{vm_memory_calculation_strategy, legacy}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{total_memory_available_override_value,
"total_memory_available_override_value = 1000000000",
- [{rabbit,[{total_memory_available_override_value, 1000000000}]}],
+ [{rabbit,[{total_memory_available_override_value, 1000000000}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{total_memory_available_override_value_units,
"total_memory_available_override_value = 1024MB",
- [{rabbit,[{total_memory_available_override_value, "1024MB"}]}],
+ [{rabbit,[{total_memory_available_override_value, "1024MB"}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{connection_max,
"connection_max = 999",
- [{rabbit,[{connection_max, 999}]}],
+ [{rabbit,[{connection_max, 999}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{connection_max,
"connection_max = infinity",
- [{rabbit,[{connection_max, infinity}]}],
+ [{rabbit,[{connection_max, infinity}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{channel_max,
"channel_max = 16",
- [{rabbit,[{channel_max, 16}]}],
+ [{rabbit,[{channel_max, 16}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{listeners_tcp_ip,
"listeners.tcp.1 = 192.168.1.99:5672",
- [{rabbit,[{tcp_listeners,[{"192.168.1.99",5672}]}]}],
+ [{rabbit,[{tcp_listeners,[{"192.168.1.99",5672}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{listeners_tcp_ip_multiple,
"listeners.tcp.1 = 127.0.0.1:5672
listeners.tcp.2 = ::1:5672",
- [{rabbit,[{tcp_listeners,[{"127.0.0.1",5672},{"::1",5672}]}]}],
+ [{rabbit,[{tcp_listeners,[{"127.0.0.1",5672},{"::1",5672}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{listeners_tcp_ip_all,"listeners.tcp.1 = :::5672",
- [{rabbit,[{tcp_listeners,[{"::",5672}]}]}],
+ [{rabbit,[{tcp_listeners,[{"::",5672}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{listeners_tcp_ipv6,
"listeners.tcp.1 = fe80::2acf:e9ff:fe17:f97b:5672",
- [{rabbit,[{tcp_listeners,[{"fe80::2acf:e9ff:fe17:f97b",5672}]}]}],
+ [{rabbit,[{tcp_listeners,[{"fe80::2acf:e9ff:fe17:f97b",5672}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_options_sndbuf,
"tcp_listen_options.backlog = 128
@@ -194,7 +469,15 @@ tcp_listen_options.exit_on_close = false",
tcp_listen_options.recbuf = 196608",
[{rabbit,
[{tcp_listen_options,
- [{backlog,128},{nodelay,true},{sndbuf,196608},{recbuf,196608}]}]}],
+ [{backlog,128},{nodelay,true},{sndbuf,196608},{recbuf,196608}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_listen_options_nodelay_with_kernel,
"tcp_listen_options.backlog = 4096
@@ -205,16 +488,40 @@ tcp_listen_options.exit_on_close = false",
[{kernel,
[{inet_default_connect_options,[{nodelay,true}]},
{inet_default_listen_options,[{nodelay,true}]}]},
- {rabbit,[{tcp_listen_options,[{backlog,4096},{nodelay,true}]}]}],
+ {rabbit,[{tcp_listen_options,[{backlog,4096},{nodelay,true}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_listen_options_nodelay,
"tcp_listen_options.backlog = 4096
tcp_listen_options.nodelay = true",
- [{rabbit,[{tcp_listen_options,[{backlog,4096},{nodelay,true}]}]}],
+ [{rabbit,[{tcp_listen_options,[{backlog,4096},{nodelay,true}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_handshake_timeout,
"ssl_handshake_timeout = 10000",
- [{rabbit,[{ssl_handshake_timeout,10000}]}],
+ [{rabbit,[{ssl_handshake_timeout,10000}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_partition_handling_pause_if_all_down,
"cluster_partition_handling = pause_if_all_down
@@ -227,15 +534,39 @@ tcp_listen_options.exit_on_close = false",
cluster_partition_handling.pause_if_all_down.nodes.2 = rabbit@myhost2",
[{rabbit,
[{cluster_partition_handling,
- {pause_if_all_down,[rabbit@myhost2,rabbit@myhost1],ignore}}]}],
+ {pause_if_all_down,[rabbit@myhost2,rabbit@myhost1],ignore}}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_partition_handling_autoheal,
"cluster_partition_handling = autoheal",
- [{rabbit,[{cluster_partition_handling,autoheal}]}],
+ [{rabbit,[{cluster_partition_handling,autoheal}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{password_hashing,
"password_hashing_module = rabbit_password_hashing_sha512",
- [{rabbit,[{password_hashing_module,rabbit_password_hashing_sha512}]}],
+ [{rabbit,[{password_hashing_module,rabbit_password_hashing_sha512}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_verify_peer,
"listeners.ssl.1 = 5671
@@ -251,7 +582,15 @@ tcp_listen_options.exit_on_close = false",
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
{verify,verify_peer},
- {fail_if_no_peer_cert,false}]}]}],
+ {fail_if_no_peer_cert,false}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_password,
"listeners.ssl.1 = 5671
@@ -265,7 +604,15 @@ tcp_listen_options.exit_on_close = false",
[{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"},
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
- {password,"t0p$3kRe7"}]}]}],
+ {password,"t0p$3kRe7"}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_tls_ver_old,
"listeners.ssl.1 = 5671
@@ -283,7 +630,15 @@ tcp_listen_options.exit_on_close = false",
[{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"},
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
- {versions,['tlsv1.2','tlsv1.1',tlsv1]}]}]}],
+ {versions,['tlsv1.2','tlsv1.1',tlsv1]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_tls_ver_new,
"listeners.ssl.1 = 5671
@@ -300,7 +655,15 @@ tcp_listen_options.exit_on_close = false",
[{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"},
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
- {versions,['tlsv1.2','tlsv1.1']}]}]}],
+ {versions,['tlsv1.2','tlsv1.1']}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_ciphers,
@@ -326,19 +689,27 @@ tcp_listen_options.exit_on_close = false",
{ssl_options,
[{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"},
{ciphers, [
- "DHE-RSA-AES256-GCM-SHA384",
+ "ECDHE-ECDSA-AES256-GCM-SHA384",
+ "ECDHE-RSA-AES256-GCM-SHA384",
+ "ECDHE-ECDSA-AES256-SHA384",
+ "ECDHE-RSA-AES256-SHA384",
"ECDH-ECDSA-AES256-GCM-SHA384",
- "ECDH-ECDSA-AES256-SHA384",
"ECDH-RSA-AES256-GCM-SHA384",
+ "ECDH-ECDSA-AES256-SHA384",
"ECDH-RSA-AES256-SHA384",
- "ECDHE-ECDSA-AES256-GCM-SHA384",
- "ECDHE-ECDSA-AES256-SHA384",
- "ECDHE-RSA-AES256-GCM-SHA384",
- "ECDHE-RSA-AES256-SHA384"
+ "DHE-RSA-AES256-GCM-SHA384"
]},
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
- {versions,['tlsv1.2','tlsv1.1']}]}]}],
+ {versions,['tlsv1.2','tlsv1.1']}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_allow_poodle,
@@ -357,7 +728,15 @@ tcp_listen_options.exit_on_close = false",
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
{verify,verify_peer},
- {fail_if_no_peer_cert,false}]}]}],
+ {fail_if_no_peer_cert,false}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_depth,
"listeners.ssl.1 = 5671
@@ -375,7 +754,15 @@ tcp_listen_options.exit_on_close = false",
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
{depth,2},
{verify,verify_peer},
- {fail_if_no_peer_cert,false}]}]}],
+ {fail_if_no_peer_cert,false}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_honor_cipher_order,
"listeners.ssl.1 = 5671
@@ -395,7 +782,15 @@ tcp_listen_options.exit_on_close = false",
{depth,2},
{verify,verify_peer},
{fail_if_no_peer_cert, false},
- {honor_cipher_order, true}]}]}],
+ {honor_cipher_order, true}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_honor_ecc_order,
"listeners.ssl.1 = 5671
@@ -415,29 +810,77 @@ tcp_listen_options.exit_on_close = false",
{depth,2},
{verify,verify_peer},
{fail_if_no_peer_cert, false},
- {honor_ecc_order, true}]}]}],
+ {honor_ecc_order, true}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_cert_login_from,
"ssl_cert_login_from = common_name",
- [{rabbit,[{ssl_cert_login_from,common_name}]}],
+ [{rabbit,[{ssl_cert_login_from,common_name}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_listen_options_linger_on,
"tcp_listen_options.linger.on = true
tcp_listen_options.linger.timeout = 100",
- [{rabbit,[{tcp_listen_options,[{linger,{true,100}}]}]}],
+ [{rabbit,[{tcp_listen_options,[{linger,{true,100}}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_listen_options_linger_off,
"tcp_listen_options.linger.on = false
tcp_listen_options.linger.timeout = 100",
- [{rabbit,[{tcp_listen_options,[{linger,{false,100}}]}]}],
+ [{rabbit,[{tcp_listen_options,[{linger,{false,100}}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_listen_options_linger_on_notimeout,
"tcp_listen_options.linger.on = true",
- [{rabbit,[{tcp_listen_options,[{linger,{true,0}}]}]}],
+ [{rabbit,[{tcp_listen_options,[{linger,{true,0}}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_listen_options_linger_timeout,
"tcp_listen_options.linger.timeout = 100",
- [{rabbit,[{tcp_listen_options,[{linger,{false,100}}]}]}],
+ [{rabbit,[{tcp_listen_options,[{linger,{false,100}}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_randomized_startup_delay_both_values,
@@ -445,21 +888,45 @@ tcp_listen_options.exit_on_close = false",
cluster_formation.randomized_startup_delay_range.max = 30",
[{rabbit, [{cluster_formation, [
{randomized_startup_delay_range, {10, 30}}
- ]}]}],
+ ]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_randomized_startup_delay_min_only,
"cluster_formation.randomized_startup_delay_range.min = 10",
[{rabbit, [{cluster_formation, [
{randomized_startup_delay_range, {10, 60}}
- ]}]}],
+ ]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_randomized_startup_delay_max_only,
"cluster_formation.randomized_startup_delay_range.max = 30",
[{rabbit, [{cluster_formation, [
{randomized_startup_delay_range, {5, 30}}
- ]}]}],
+ ]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_dns,
@@ -470,7 +937,15 @@ tcp_listen_options.exit_on_close = false",
[{cluster_formation,
[{peer_discovery_dns,[{hostname,<<"192.168.0.2.xip.io">>}]},
{peer_discovery_backend,rabbit_peer_discovery_dns},
- {node_type,disc}]}]}],
+ {node_type,disc}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_classic,
"cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
@@ -478,7 +953,15 @@ tcp_listen_options.exit_on_close = false",
[{rabbit,
[{cluster_formation,
[{peer_discovery_backend,rabbit_peer_discovery_classic_config},
- {node_type,disc}]}]}],
+ {node_type,disc}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_classic_ram,
"cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
@@ -486,19 +969,43 @@ tcp_listen_options.exit_on_close = false",
[{rabbit,
[{cluster_formation,
[{peer_discovery_backend,rabbit_peer_discovery_classic_config},
- {node_type,ram}]}]}],
+ {node_type,ram}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{background_gc_enabled,
"background_gc_enabled = true
background_gc_target_interval = 30000",
[{rabbit,
- [{background_gc_enabled,true},{background_gc_target_interval,30000}]}],
+ [{background_gc_enabled,true},{background_gc_target_interval,30000}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{background_gc_disabled,
"background_gc_enabled = false
background_gc_target_interval = 30000",
[{rabbit,
- [{background_gc_enabled,false},{background_gc_target_interval,30000}]}],
+ [{background_gc_enabled,false},{background_gc_target_interval,30000}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{credential_validator_length,
"credential_validator.validation_backend = rabbit_credential_validator_min_password_length
@@ -507,7 +1014,15 @@ credential_validator.min_length = 10",
[{credential_validator,
[{validation_backend,
rabbit_credential_validator_min_password_length},
- {min_length,10}]}]}],
+ {min_length,10}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{credential_validator_regexp,
"credential_validator.validation_backend = rabbit_credential_validator_password_regexp
@@ -515,78 +1030,198 @@ credential_validator.regexp = ^abc\\d+",
[{rabbit,
[{credential_validator,
[{validation_backend,rabbit_credential_validator_password_regexp},
- {regexp,"^abc\\d+"}]}]}],
+ {regexp,"^abc\\d+"}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{proxy_protocol_on,
"proxy_protocol = true",
- [{rabbit,[{proxy_protocol,true}]}],[]},
+ [{rabbit,[{proxy_protocol,true}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],[]},
{proxy_protocol_off,
"proxy_protocol = false",
- [{rabbit,[{proxy_protocol,false}]}],[]},
+ [{rabbit,[{proxy_protocol,false}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],[]},
{log_debug_file,
"log.file.level = debug",
- [{rabbit,[{log, [{file, [{level, debug}]}]}]}],
+ [{rabbit,[{log, [{file, [{level, debug}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_debug_console,
"log.console = true
log.console.level = debug",
- [{rabbit,[{log, [{console, [{enabled, true}, {level, debug}]}]}]}],
+ [{rabbit,[{log, [{console, [{enabled, true}, {level, debug}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_debug_exchange,
"log.exchange = true
log.exchange.level = debug",
- [{rabbit,[{log, [{exchange, [{enabled, true}, {level, debug}]}]}]}],
+ [{rabbit,[{log, [{exchange, [{enabled, true}, {level, debug}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_debug_syslog,
"log.syslog = true
log.syslog.level = debug",
- [{rabbit,[{log, [{syslog, [{enabled, true}, {level, debug}]}]}]}],
+ [{rabbit,[{log, [{syslog, [{enabled, true}, {level, debug}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_file_name,
"log.file = file_name",
- [{rabbit,[{log, [{file, [{file, "file_name"}]}]}]}],
+ [{rabbit,[{log, [{file, [{file, "file_name"}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_file_disabled,
"log.file = false",
- [{rabbit,[{log, [{file, [{file, false}]}]}]}],
+ [{rabbit,[{log, [{file, [{file, false}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_category_level,
"log.connection.level = debug
log.channel.level = error",
[{rabbit,[{log, [{categories, [{connection, [{level, debug}]},
- {channel, [{level, error}]}]}]}]}],
+ {channel, [{level, error}]}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_category_file,
"log.connection.file = file_name_connection
log.channel.file = file_name_channel",
[{rabbit,[{log, [{categories, [{connection, [{file, "file_name_connection"}]},
- {channel, [{file, "file_name_channel"}]}]}]}]}],
+ {channel, [{file, "file_name_channel"}]}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{delegate_count,
"delegate_count = 64",
[{rabbit, [
{delegate_count, 64}
- ]}],
+ ]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{kernel_net_ticktime,
"net_ticktime = 20",
[{kernel, [
{net_ticktime, 20}
- ]}],
+ ]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{kernel_inet_dist_listen_min,
"inet_dist_listen_min = 16000",
[{kernel, [
{inet_dist_listen_min, 16000}
- ]}],
+ ]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{kernel_inet_dist_listen_max,
"inet_dist_listen_max = 16100",
[{kernel, [
{inet_dist_listen_max, 16100}
- ]}],
+ ]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_syslog_settings,
@@ -602,7 +1237,15 @@ credential_validator.regexp = ^abc\\d+",
{facility, user},
{multiline_mode, true},
{dest_host, "10.10.10.10"},
- {dest_port, 123}]}
+ {dest_port, 123}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}
],
[]},
{log_syslog_tcp,
@@ -613,7 +1256,15 @@ credential_validator.regexp = ^abc\\d+",
[
{rabbit,[{log, [{syslog, [{enabled, true}]}]}]},
{syslog, [{protocol, {rfc5424, tcp}},
- {dest_host, "syslog.my-network.com"}]}
+ {dest_host, "syslog.my-network.com"}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}
],
[]},
{log_syslog_udp_default,
@@ -621,7 +1272,15 @@ credential_validator.regexp = ^abc\\d+",
log.syslog.protocol = rfc3164",
[
{rabbit,[{log, [{syslog, [{enabled, true}]}]}]},
- {syslog, [{protocol, {rfc3164, udp}}]}
+ {syslog, [{protocol, {rfc3164, udp}}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}
],
[]},
{log_syslog_tls,
@@ -638,6 +1297,14 @@ credential_validator.regexp = ^abc\\d+",
{fail_if_no_peer_cert,false},
{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"},
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
- {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}]}}]}],
+ {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}]}}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]}
].
diff --git a/test/per_vhost_connection_limit_partitions_SUITE.erl b/test/per_vhost_connection_limit_partitions_SUITE.erl
index 051f3f13b7..20adf704a0 100644
--- a/test/per_vhost_connection_limit_partitions_SUITE.erl
+++ b/test/per_vhost_connection_limit_partitions_SUITE.erl
@@ -107,7 +107,7 @@ cluster_full_partition_with_autoheal(Config) ->
Conn4 = open_unmanaged_connection(Config, B),
Conn5 = open_unmanaged_connection(Config, C),
Conn6 = open_unmanaged_connection(Config, C),
- ?assertEqual(6, count_connections_in(Config, VHost)),
+ wait_for_count_connections_in(Config, VHost, 6, 60000),
%% B drops off the network, non-reachable by either A or C
rabbit_ct_broker_helpers:block_traffic_between(A, B),
@@ -115,14 +115,14 @@ cluster_full_partition_with_autoheal(Config) ->
timer:sleep(?DELAY),
%% A and C are still connected, so 4 connections are tracked
- ?assertEqual(4, count_connections_in(Config, VHost)),
+ wait_for_count_connections_in(Config, VHost, 4, 60000),
rabbit_ct_broker_helpers:allow_traffic_between(A, B),
rabbit_ct_broker_helpers:allow_traffic_between(B, C),
timer:sleep(?DELAY),
%% during autoheal B's connections were dropped
- ?assertEqual(4, count_connections_in(Config, VHost)),
+ wait_for_count_connections_in(Config, VHost, 4, 60000),
lists:foreach(fun (Conn) ->
(catch rabbit_ct_client_helpers:close_connection(Conn))
@@ -131,11 +131,22 @@ cluster_full_partition_with_autoheal(Config) ->
passed.
-
%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------
+wait_for_count_connections_in(Config, VHost, Expected, Time) when Time =< 0 ->
+ ?assertEqual(Expected, count_connections_in(Config, VHost));
+wait_for_count_connections_in(Config, VHost, Expected, Time) ->
+ case count_connections_in(Config, VHost) of
+ Expected ->
+ ok;
+ _ ->
+ Sleep = 3000,
+ timer:sleep(Sleep),
+ wait_for_count_connections_in(Config, VHost, Expected, Time - Sleep)
+ end.
+
count_connections_in(Config, VHost) ->
count_connections_in(Config, VHost, 0).
count_connections_in(Config, VHost, NodeIndex) ->
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index bfb9eeec12..b453f5cdb3 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -345,7 +345,7 @@ start_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Test declare an existing queue
?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -361,7 +361,7 @@ start_queue(Config) ->
%% Check that the application and process are still up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
start_queue_concurrent(Config) ->
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -422,13 +422,13 @@ stop_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Delete the quorum queue
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})),
%% Check that the application and process are down
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@@ -447,7 +447,7 @@ restart_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
idempotent_recover(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@@ -526,7 +526,7 @@ restart_all_types(Config) ->
%% Check that the application and two ra nodes are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -568,7 +568,7 @@ stop_start_rabbit_app(Config) ->
%% Check that the application and two ra nodes are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -1265,7 +1265,7 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
[] == rpc:call(Server, supervisor, which_children,
- [ra_server_sup])
+ [ra_server_sup_sup])
end),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh1, 0),
@@ -1302,7 +1302,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) ->
wait_for_cleanup(Server, NCh2, 1),
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh1, 0),
@@ -1966,7 +1966,7 @@ delete_immediately_by_resource(Config) ->
%% Check that the application and process are down
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@@ -2237,7 +2237,8 @@ wait_for_cleanup(Server, Channel, Number) ->
wait_for_cleanup(Server, Channel, Number, 60).
wait_for_cleanup(Server, Channel, Number, 0) ->
- ?assertEqual(Number, length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])));
+ ?assertEqual(length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])),
+ Number);
wait_for_cleanup(Server, Channel, Number, N) ->
case length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])) of
Length when Number == Length ->
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 3263a733a9..0512e8161a 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -55,7 +55,7 @@ init_per_testcase(TestCase, Config) ->
meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end),
meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
fun (_, _) -> ok end),
- ra_server_sup:remove_all(),
+ ra_server_sup_sup:remove_all(),
ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)),
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 1af0d3b4b0..a8604b46af 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -259,7 +259,8 @@ handle_op({input_event, requeue}, #t{effects = Effs} = T) ->
_ ->
T
end;
-handle_op({input_event, Settlement}, #t{effects = Effs} = T) ->
+handle_op({input_event, Settlement}, #t{effects = Effs,
+ down = Down} = T) ->
case queue:out(Effs) of
{{value, {settle, MsgIds, CId}}, Q} ->
Cmd = case Settlement of
@@ -269,7 +270,14 @@ handle_op({input_event, Settlement}, #t{effects = Effs} = T) ->
end,
do_apply(Cmd, T#t{effects = Q});
{{value, Cmd}, Q} when element(1, Cmd) =:= enqueue ->
- do_apply(Cmd, T#t{effects = Q});
+ case maps:is_key(element(2, Cmd), Down) of
+ true ->
+ %% enqueues cannot arrive after down for the same process
+ %% drop message
+ T#t{effects = Q};
+ false ->
+ do_apply(Cmd, T#t{effects = Q})
+ end;
_ ->
T
end;
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
new file mode 100644
index 0000000000..945229e372
--- /dev/null
+++ b/test/single_active_consumer_SUITE.erl
@@ -0,0 +1,286 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(single_active_consumer_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, classic_queue}, {group, quorum_queue}
+ ].
+
+groups() ->
+ [
+ {classic_queue, [], [
+ all_messages_go_to_one_consumer,
+ fallback_to_another_consumer_when_first_one_is_cancelled,
+ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
+ amqp_exclusive_consume_fails_on_exclusive_consumer_queue
+ ]},
+ {quorum_queue, [], [
+ all_messages_go_to_one_consumer,
+ fallback_to_another_consumer_when_first_one_is_cancelled,
+ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled
+ %% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ
+ ]}
+ ].
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(classic_queue, Config) ->
+ [{single_active_consumer_queue_declare,
+ #'queue.declare'{arguments = [
+ {<<"x-single-active-consumer">>, bool, true},
+ {<<"x-queue-type">>, longstr, <<"classic">>}
+ ],
+ auto_delete = true}
+ } | Config];
+init_per_group(quorum_queue, Config) ->
+ [{single_active_consumer_queue_declare,
+ #'queue.declare'{arguments = [
+ {<<"x-single-active-consumer">>, bool, true},
+ {<<"x-queue-type">>, longstr, <<"quorum">>}
+ ],
+ durable = true, exclusive = false, auto_delete = false}
+ } | Config].
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+all_messages_go_to_one_consumer(Config) ->
+ {C, Ch} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ MessageCount = 5,
+ ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]),
+ #'basic.consume_ok'{consumer_tag = CTag1} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+ #'basic.consume_ok'{consumer_tag = CTag2} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount)],
+
+ receive
+ {consumer_done, {MessagesPerConsumer, MessageCount}} ->
+ ?assertEqual(MessageCount, MessageCount),
+ ?assertEqual(2, maps:size(MessagesPerConsumer)),
+ ?assertEqual(MessageCount, maps:get(CTag1, MessagesPerConsumer)),
+ ?assertEqual(0, maps:get(CTag2, MessagesPerConsumer))
+ after 1000 ->
+ throw(failed)
+ end,
+
+ amqp_connection:close(C),
+ ok.
+
+fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
+ {C, Ch} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ MessageCount = 10,
+ ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]),
+ #'basic.consume_ok'{consumer_tag = CTag1} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+ #'basic.consume_ok'{consumer_tag = CTag2} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+ #'basic.consume_ok'{consumer_tag = CTag3} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)],
+
+ {MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2),
+ FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, Count) -> Count > 0 end, MessagesPerConsumer1)),
+ ?assertEqual(1, length(FirstActiveConsumerInList)),
+
+ FirstActiveConsumer = lists:nth(1, FirstActiveConsumerInList),
+ #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = FirstActiveConsumer}),
+
+ {cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(),
+
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)],
+
+ {MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1),
+ SecondActiveConsumerInList = maps:keys(maps:filter(
+ fun(CTag, Count) -> Count > 0 andalso CTag /= FirstActiveConsumer end,
+ MessagesPerConsumer2)
+ ),
+ ?assertEqual(1, length(SecondActiveConsumerInList)),
+ SecondActiveConsumer = lists:nth(1, SecondActiveConsumerInList),
+
+ #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}),
+
+ amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}),
+ wait_for_messages(1),
+
+ LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))),
+
+ receive
+ {consumer_done, {MessagesPerConsumer, MessageCount}} ->
+ ?assertEqual(MessageCount, MessageCount),
+ ?assertEqual(3, maps:size(MessagesPerConsumer)),
+ ?assertEqual(MessageCount div 2, maps:get(FirstActiveConsumer, MessagesPerConsumer)),
+ ?assertEqual(MessageCount div 2 - 1, maps:get(SecondActiveConsumer, MessagesPerConsumer)),
+ ?assertEqual(1, maps:get(LastActiveConsumer, MessagesPerConsumer))
+ after 1000 ->
+ throw(failed)
+ end,
+
+ amqp_connection:close(C),
+ ok.
+
+fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config) ->
+ {C, Ch} = connection_and_channel(Config),
+ {C1, Ch1} = connection_and_channel(Config),
+ {C2, Ch2} = connection_and_channel(Config),
+ {C3, Ch3} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ MessageCount = 10,
+ Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2}]),
+ Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]),
+ Consumer3Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]),
+ #'basic.consume_ok'{consumer_tag = CTag1} =
+ amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"1">>}, Consumer1Pid),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch2, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"2">>}, Consumer2Pid),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch3, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"3">>}, Consumer3Pid),
+
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)],
+
+ {MessagesPerConsumer1, MessageCount1} = consume_results(),
+ ?assertEqual(MessageCount div 2, MessageCount1),
+ ?assertEqual(1, maps:size(MessagesPerConsumer1)),
+ ?assertEqual(MessageCount div 2, maps:get(CTag1, MessagesPerConsumer1)),
+
+ ok = amqp_channel:close(Ch1),
+
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)],
+
+ {MessagesPerConsumer2, MessageCount2} = consume_results(),
+ ?assertEqual(MessageCount div 2 - 1, MessageCount2),
+ ?assertEqual(1, maps:size(MessagesPerConsumer2)),
+
+ ok = amqp_channel:close(Ch2),
+
+ amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"poison">>}),
+
+ {MessagesPerConsumer3, MessageCount3} = consume_results(),
+ ?assertEqual(1, MessageCount3),
+ ?assertEqual(1, maps:size(MessagesPerConsumer3)),
+
+ [amqp_connection:close(Conn) || Conn <- [C1, C2, C3, C]],
+ ok.
+
+amqp_exclusive_consume_fails_on_exclusive_consumer_queue(Config) ->
+ {C, Ch} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 403, _}}, _},
+ amqp_channel:call(Ch, #'basic.consume'{queue = Q, exclusive = true})
+ ),
+ amqp_connection:close(C),
+ ok.
+
+connection_and_channel(Config) ->
+ C = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, Ch} = amqp_connection:open_channel(C),
+ {C, Ch}.
+
+queue_declare(Channel, Config) ->
+ Declare = ?config(single_active_consumer_queue_declare, Config),
+ #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, Declare),
+ Q.
+
+consume({Parent, State, 0}) ->
+ Parent ! {consumer_done, State};
+consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) ->
+ receive
+ #'basic.consume_ok'{consumer_tag = CTag} ->
+ consume({Parent, {maps:put(CTag, 0, MessagesPerConsumer), MessageCount}, CountDown});
+ {#'basic.deliver'{consumer_tag = CTag}, #amqp_msg{payload = <<"poison">>}} ->
+ Parent ! {consumer_done,
+ {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
+ MessageCount + 1}};
+ {#'basic.deliver'{consumer_tag = CTag}, _Content} ->
+ NewState = {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
+ MessageCount + 1},
+ Parent ! {message, NewState},
+ consume({Parent, NewState, CountDown - 1});
+ #'basic.cancel_ok'{consumer_tag = CTag} ->
+ Parent ! {cancel_ok, CTag},
+ consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown});
+ _ ->
+ consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown})
+ after 10000 ->
+ Parent ! {consumer_timeout, {MessagesPerConsumer, MessageCount}},
+ exit(consumer_timeout)
+ end.
+
+consume_results() ->
+ receive
+ {consumer_done, {MessagesPerConsumer, MessageCount}} ->
+ {MessagesPerConsumer, MessageCount};
+ {consumer_timeout, {MessagesPerConsumer, MessageCount}} ->
+ {MessagesPerConsumer, MessageCount};
+ _ ->
+ consume_results()
+ after 1000 ->
+ throw(failed)
+ end.
+
+wait_for_messages(ExpectedCount) ->
+ wait_for_messages(ExpectedCount, {}).
+
+wait_for_messages(0, State) ->
+ State;
+wait_for_messages(ExpectedCount, _) ->
+ receive
+ {message, {MessagesPerConsumer, MessageCount}} ->
+ wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount})
+ after 5000 ->
+ throw(message_waiting_timeout)
+ end.
+
+wait_for_cancel_ok() ->
+ receive
+ {cancel_ok, CTag} ->
+ {cancel_ok, CTag}
+ after 5000 ->
+ throw(consumer_cancel_ok_timeout)
+ end.
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index d8031ce6d7..466df684af 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -25,6 +25,7 @@
-define(TIMEOUT_LIST_OPS_PASS, 5000).
-define(TIMEOUT, 30000).
+-define(TIMEOUT_CHANNEL_EXCEPTION, 5000).
-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
@@ -60,10 +61,16 @@ groups() ->
topic_matching,
{queue_max_length, [], [
{max_length_simple, [], MaxLengthTests},
- {max_length_mirrored, [], MaxLengthTests}]}
+ {max_length_mirrored, [], MaxLengthTests}]},
+ max_message_size
]}
].
+suite() ->
+ [
+ {timetrap, {minutes, 3}}
+ ].
+
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
@@ -1299,6 +1306,74 @@ sync_mirrors(QName, Config) ->
_ -> ok
end.
+gen_binary_mb(N) ->
+ B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>,
+ << B1M || _ <- lists:seq(1, N) >>.
+
+assert_channel_alive(Ch) ->
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>},
+ #amqp_msg{payload = <<"HI">>}).
+
+assert_channel_fail_max_size(Ch, Monitor) ->
+ receive
+ {'DOWN', Monitor, process, Ch,
+ {shutdown,
+ {server_initiated_close, 406, _Error}}} ->
+ ok
+ after ?TIMEOUT_CHANNEL_EXCEPTION ->
+ error({channel_exception_expected, max_message_size})
+ end.
+
+max_message_size(Config) ->
+ Binary2M = gen_binary_mb(2),
+ Binary4M = gen_binary_mb(4),
+ Binary6M = gen_binary_mb(6),
+ Binary10M = gen_binary_mb(10),
+
+ Size2Mb = 1024 * 1024 * 2,
+ Size2Mb = byte_size(Binary2M),
+
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 3]),
+
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+
+ %% Binary is whithin the max size limit
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}),
+ %% The channel process is alive
+ assert_channel_alive(Ch),
+
+ Monitor = monitor(process, Ch),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}),
+ assert_channel_fail_max_size(Ch, Monitor),
+
+ %% increase the limit
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 8]),
+
+ {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary2M}),
+ assert_channel_alive(Ch1),
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary4M}),
+ assert_channel_alive(Ch1),
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary6M}),
+ assert_channel_alive(Ch1),
+
+ Monitor1 = monitor(process, Ch1),
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary10M}),
+ assert_channel_fail_max_size(Ch1, Monitor1),
+
+ %% increase beyond the hard limit
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 600]),
+ Val = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_channel, get_max_message_size, []),
+
+ ?assertEqual(?MAX_MSG_SIZE, Val).
+
%% ---------------------------------------------------------------------------
%% rabbitmqctl helpers.
%% ---------------------------------------------------------------------------
diff --git a/test/unit_queue_consumers_SUITE.erl b/test/unit_queue_consumers_SUITE.erl
new file mode 100644
index 0000000000..08d12e7ec5
--- /dev/null
+++ b/test/unit_queue_consumers_SUITE.erl
@@ -0,0 +1,102 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(unit_queue_consumers_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ is_same,
+ get_consumer,
+ get
+ ].
+
+is_same(_Config) ->
+ ?assertEqual(
+ true,
+ rabbit_queue_consumers:is_same(
+ self(), <<"1">>,
+ consumer(self(), <<"1">>)
+ )),
+ ?assertEqual(
+ false,
+ rabbit_queue_consumers:is_same(
+ self(), <<"1">>,
+ consumer(self(), <<"2">>)
+ )),
+ Pid = spawn(?MODULE, function_for_process, []),
+ Pid ! whatever,
+ ?assertEqual(
+ false,
+ rabbit_queue_consumers:is_same(
+ self(), <<"1">>,
+ consumer(Pid, <<"1">>)
+ )),
+ ok.
+
+get(_Config) ->
+ Pid = spawn(?MODULE, function_for_process, []),
+ Pid ! whatever,
+ State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])),
+ {Pid, {consumer, <<"2">>, _, _, _, _}} =
+ rabbit_queue_consumers:get(Pid, <<"2">>, State),
+ ?assertEqual(
+ undefined,
+ rabbit_queue_consumers:get(self(), <<"2">>, State)
+ ),
+ ?assertEqual(
+ undefined,
+ rabbit_queue_consumers:get(Pid, <<"1">>, State)
+ ),
+ ok.
+
+get_consumer(_Config) ->
+ Pid = spawn(unit_queue_consumers_SUITE, function_for_process, []),
+ Pid ! whatever,
+ State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])),
+ {_Pid, {consumer, _, _, _, _, _}} =
+ rabbit_queue_consumers:get_consumer(State),
+ ?assertEqual(
+ undefined,
+ rabbit_queue_consumers:get_consumer(state(consumers([])))
+ ),
+ ok.
+
+consumers([]) ->
+ priority_queue:new();
+consumers(Consumers) ->
+ consumers(Consumers, priority_queue:new()).
+
+consumers([H], Q) ->
+ priority_queue:in(H, Q);
+consumers([H | T], Q) ->
+ consumers(T, priority_queue:in(H, Q)).
+
+
+consumer(Pid, ConsumerTag) ->
+ {Pid, {consumer, ConsumerTag, true, 1, [], <<"guest">>}}.
+
+state(Consumers) ->
+ {state, Consumers, {}}.
+
+function_for_process() ->
+ receive
+ _ -> ok
+ end. \ No newline at end of file