diff options
author | Diana Corbacho <diana@rabbitmq.com> | 2019-01-10 09:57:04 +0000 |
---|---|---|
committer | Diana Corbacho <diana@rabbitmq.com> | 2019-01-10 09:57:04 +0000 |
commit | e4b2d7cfefed5881e185b87157a226b3f08557a6 (patch) | |
tree | 0dbe04ed8e36ad2224abc3313792a5e288aed286 | |
parent | 73abcfa99f6453bbcf00320d3c7377c0a7fc7663 (diff) | |
parent | b204aaac6e787a1b28ceaa60ba10e07ab2635a72 (diff) | |
download | rabbitmq-server-git-qq-queue-length-limit.tar.gz |
Merge remote-tracking branch 'origin/master' into qq-queue-length-limit
-rw-r--r-- | .travis.yml | 17 | ||||
-rw-r--r-- | Makefile | 19 | ||||
-rw-r--r-- | docs/rabbitmq.conf.example | 159 | ||||
-rw-r--r-- | priv/schema/rabbit.schema | 156 | ||||
-rw-r--r-- | rabbitmq-components.mk | 1 | ||||
-rw-r--r-- | src/rabbit.erl | 35 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 23 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 202 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 73 | ||||
-rw-r--r-- | src/rabbit_fifo.erl | 180 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 75 | ||||
-rw-r--r-- | src/rabbit_queue_location_min_masters.erl | 70 | ||||
-rw-r--r-- | src/rabbit_quorum_queue.erl | 12 | ||||
-rw-r--r-- | src/rabbit_sysmon_handler.erl | 235 | ||||
-rw-r--r-- | src/rabbit_sysmon_minder.erl | 156 | ||||
-rw-r--r-- | test/config_schema_SUITE_data/rabbit.snippets | 4 | ||||
-rw-r--r-- | test/per_vhost_connection_limit_partitions_SUITE.erl | 19 | ||||
-rw-r--r-- | test/quorum_queue_SUITE.erl | 26 | ||||
-rw-r--r-- | test/rabbit_fifo_SUITE.erl | 2 | ||||
-rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 12 | ||||
-rw-r--r-- | test/single_active_consumer_SUITE.erl | 286 | ||||
-rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 2 | ||||
-rw-r--r-- | test/unit_queue_consumers_SUITE.erl | 102 |
23 files changed, 1619 insertions, 247 deletions
diff --git a/.travis.yml b/.travis.yml index 9237d9632a..6ccdbab1ce 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,6 @@ # vim:sw=2:et: +dist: xenial sudo: false language: erlang notifications: @@ -10,15 +11,8 @@ notifications: on_failure: always addons: apt: - sources: - - sourceline: deb https://packages.erlang-solutions.com/ubuntu trusty contrib - key_url: https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc packages: - awscli - # Use Elixir from Erlang Solutions. The provided Elixir is - # installed with kiex but is old. We also can't use kiex to - # install a newer one because of GitHub API rate limiting. - - elixir=1.6.0-1 cache: apt: true env: @@ -27,10 +21,10 @@ env: - secure: L1t0CHGR4RzOXwtkpM6feRKax95rszScBLqzjstEiMPkhjTsYTlAecnNxx6lTrGMnk5hQoi4PtbhmyZOX0siHTngTogoA/Nyn8etYzicU5ZO+qmBQOYpegz51lEu70ewXgkhEHzk9DtEPxfYviH9WiILrdUVRXXgZpoXq13p1QA= otp_release: - - "19.3" - - "20.3" + - "21.2" before_script: + - elixir --version # The checkout made by Travis is a "detached HEAD" and branches # information is missing. Our Erlang.mk's git_rmq fetch method relies # on it, so we need to restore it. @@ -42,11 +36,6 @@ before_script: git remote add upstream https://github.com/$TRAVIS_REPO_SLUG.git git fetch upstream v3.8.x:v3.8.x || : git fetch upstream master:master || : - # Make sure we use Elixir from Erlang Solutions and not kiex. 
- - | - echo YES | kiex implode - elixir --version - elixir --version | grep -q 'Elixir 1.6.0' script: - make xref @@ -138,7 +138,7 @@ endef LOCAL_DEPS = sasl mnesia os_mon inets BUILD_DEPS = rabbitmq_cli syslog -DEPS = ranch lager rabbit_common ra +DEPS = ranch lager rabbit_common ra sysmon_handler TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper dep_syslog = git https://github.com/schlagert/syslog 3.4.5 @@ -188,13 +188,28 @@ tests:: bats SLOW_CT_SUITES := backing_queue \ cluster_rename \ clustering_management \ + config_schema \ dynamic_ha \ eager_sync \ health_check \ + lazy_queue \ + metrics \ + msg_store \ partitions \ + per_user_connection_tracking \ + per_vhost_connection_limit \ + per_vhost_msg_store \ + per_vhost_queue_limit \ + policy \ priority_queue \ queue_master_location \ - simple_ha + quorum_queue \ + rabbit_core_metrics_gc \ + rabbit_fifo_prop \ + simple_ha \ + sync_detection \ + unit_inbroker_parallel \ + vhost FAST_CT_SUITES := $(filter-out $(sort $(SLOW_CT_SUITES)),$(CT_SUITES)) ct-fast: CT_SUITES = $(FAST_CT_SUITES) diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example index a62ed38291..b82956a267 100644 --- a/docs/rabbitmq.conf.example +++ b/docs/rabbitmq.conf.example @@ -470,7 +470,7 @@ ## Disabling background GC may reduce latency for client operations, ## keeping it enabled may reduce median RAM usage by the binary heap ## (see https://www.erlang-solutions.com/blog/erlang-garbage-collector.html). -## +## ## Before trying this option, please take a look at the memory ## breakdown (http://www.rabbitmq.com/memory-use.html). ## @@ -533,18 +533,49 @@ ## # management.http_log_dir = /path/to/access.log -## Change the port on which the HTTP listener listens, -## specifying an interface for the web server to bind to. -## Also set the listener to use TLS and provide TLS options. -## +## HTTP listener and embedded Web server settings. +# ## See https://rabbitmq.com/management.html for details. 
+# +# management.tcp.port = 15672 +# management.tcp.ip = 0.0.0.0 +# +# management.tcp.shutdown_timeout = 7000 +# management.tcp.max_keepalive = 120 +# management.tcp.idle_timeout = 120 +# management.tcp.inactivity_timeout = 120 +# management.tcp.request_timeout = 120 +# management.tcp.compress = true + +## HTTPS listener settings. +## See https://rabbitmq.com/management.html and https://rabbitmq.com/ssl.html for details. +## +# management.ssl.port = 15671 +# management.ssl.cacertfile = /path/to/ca_certificate.pem +# management.ssl.certfile = /path/to/server_certificate.pem +# management.ssl.keyfile = /path/to/server_key.pem + +## More TLS options +# management.ssl.honor_cipher_order = true +# management.ssl.honor_ecc_order = true +# management.ssl.client_renegotiation = false +# management.ssl.secure_renegotiate = true + +## Supported TLS versions +# management.ssl.versions.1 = tlsv1.2 +# management.ssl.versions.2 = tlsv1.1 + +## Cipher suites the server is allowed to use +# management.ssl.ciphers.1 = ECDHE-ECDSA-AES256-GCM-SHA384 +# management.ssl.ciphers.2 = ECDHE-RSA-AES256-GCM-SHA384 +# management.ssl.ciphers.3 = ECDHE-ECDSA-AES256-SHA384 +# management.ssl.ciphers.4 = ECDHE-RSA-AES256-SHA384 +# management.ssl.ciphers.5 = ECDH-ECDSA-AES256-GCM-SHA384 +# management.ssl.ciphers.6 = ECDH-RSA-AES256-GCM-SHA384 +# management.ssl.ciphers.7 = ECDH-ECDSA-AES256-SHA384 +# management.ssl.ciphers.8 = ECDH-RSA-AES256-SHA384 +# management.ssl.ciphers.9 = DHE-RSA-AES256-GCM-SHA384 -# management.listener.port = 15672 -# management.listener.ip = 127.0.0.1 -# management.listener.ssl = true -# management.listener.ssl_opts.cacertfile = /path/to/cacert.pem -# management.listener.ssl_opts.certfile = /path/to/cert.pem -# management.listener.ssl_opts.keyfile = /path/to/key.pem ## One of 'basic', 'detailed' or 'none'. See ## http://rabbitmq.com/management.html#fine-stats for more details. 
@@ -583,13 +614,39 @@ # STOMP section # ======================================= -## Network Configuration. The format is generally the same as for the core broker. +## See https://rabbitmq.com/stomp.html for details. + +## TCP listeners. +## +# stomp.listeners.tcp.1 = 127.0.0.1:61613 +# stomp.listeners.tcp.2 = ::1:61613 + +## TCP listener settings ## -# stomp.listeners.tcp.default = 61613 +# stomp.tcp_listen_options.backlog = 2048 +# stomp.tcp_listen_options.recbuf = 131072 +# stomp.tcp_listen_options.sndbuf = 131072 +# +# stomp.tcp_listen_options.keepalive = true +# stomp.tcp_listen_options.nodelay = true +# +# stomp.tcp_listen_options.exit_on_close = true +# stomp.tcp_listen_options.send_timeout = 120 -## Same for ssl listeners +## Proxy protocol support ## +# stomp.proxy_protocol = false + +## TLS listeners +## See https://rabbitmq.com/stomp.html and https://rabbitmq.com/ssl.html for details. # stomp.listeners.ssl.default = 61614 +# +# ssl_options.cacertfile = path/to/cacert.pem +# ssl_options.certfile = path/to/cert.pem +# ssl_options.keyfile = path/to/key.pem +# ssl_options.verify = verify_peer +# ssl_options.fail_if_no_peer_cert = true + ## Number of Erlang processes that will accept connections for the TCP ## and TLS listeners. @@ -642,6 +699,52 @@ # MQTT section # ======================================= +## TCP listener settings. +## +# mqtt.listeners.tcp.1 = 127.0.0.1:1883 +# mqtt.listeners.tcp.2 = ::1:1883 + +## TCP listener options (as per the broker configuration). +## +# mqtt.tcp_listen_options.backlog = 4096 +# mqtt.tcp_listen_options.recbuf = 131072 +# mqtt.tcp_listen_options.sndbuf = 131072 +# +# mqtt.tcp_listen_options.keepalive = true +# mqtt.tcp_listen_options.nodelay = true +# +# mqtt.tcp_listen_options.exit_on_close = true +# mqtt.tcp_listen_options.send_timeout = 120 + +## TLS listener settings +## See https://rabbitmq.com/mqtt.html and https://rabbitmq.com/ssl.html for details. 
+# +# mqtt.listeners.ssl.default = 8883 +# +# ssl_options.cacertfile = /path/to/tls/ca_certificate_bundle.pem +# ssl_options.certfile = /path/to/tls/server_certificate.pem +# ssl_options.keyfile = /path/to/tls/server_key.pem +# ssl_options.verify = verify_peer +# ssl_options.fail_if_no_peer_cert = true +# + + +## Number of Erlang processes that will accept connections for the TCP +## and TLS listeners. +## +# mqtt.num_acceptors.tcp = 10 +# mqtt.num_acceptors.ssl = 10 + +## Whether or not to enable proxy protocol support. +## Once enabled, clients cannot directly connect to the broker +## anymore. They must connect through a load balancer that sends the +## proxy protocol header to the broker at connection time. +## This setting applies only to MQTT clients, other protocols +## like STOMP or AMQP have their own setting to enable proxy protocol. +## See the plugins or broker documentation for more information. +## +# mqtt.proxy_protocol = false + ## Set the default user name and password used for anonymous connections (when client ## provides no credentials). Anonymous connections are highly discouraged! ## @@ -672,34 +775,6 @@ ## # mqtt.prefetch = 10 -## TCP/SSL Configuration (as per the broker configuration). -## -# mqtt.listeners.tcp.default = 1883 - -## Same for ssl listener -## -# mqtt.listeners.ssl.default = 1884 - -## Number of Erlang processes that will accept connections for the TCP -## and TLS listeners. -## -# mqtt.num_acceptors.tcp = 10 -# mqtt.num_acceptors.ssl = 10 - -## TCP listener options (as per the broker configuration). -## -# mqtt.tcp_listen_options.backlog = 128 -# mqtt.tcp_listen_options.nodelay = true - -## Whether or not to enable proxy protocol support. -## Once enabled, clients cannot directly connect to the broker -## anymore. They must connect through a load balancer that sends the -## proxy protocol header to the broker at connection time. 
-## This setting applies only to STOMP clients, other protocols -## like STOMP or AMQP have their own setting to enable proxy protocol. -## See the plugins or broker documentation for more information. -## -# mqtt.proxy_protocol = false ## ---------------------------------------------------------------------------- ## RabbitMQ AMQP 1.0 Support diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index ef3dafd116..9bccb9a89e 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -554,7 +554,7 @@ end}. }. -{mapping, "msx_message_size", "rabbit.max_message_size", +{mapping, "max_message_size", "rabbit.max_message_size", [{datatype, integer}, {validators, ["less_then_512MB"]}]}. %% Customising Socket Options. @@ -1355,6 +1355,160 @@ end}. {validators, ["non_zero_positive_integer"]} ]}. +% ========================== +% sysmon_handler section +% ========================== + +%% @doc The threshold at which to warn about the number of processes +%% that are overly busy. Processes with large heaps or that take a +%% long time to garbage collect will count toward this threshold. +{mapping, "sysmon_handler.thresholds.busy_processes", "sysmon_handler.process_limit", [ + {datatype, integer}, + hidden +]}. + +{translation, "sysmon_handler.process_limit", + fun(Conf) -> + case cuttlefish:conf_get("sysmon_handler.thresholds.busy_processes", Conf, undefined) of + undefined -> + cuttlefish:unset(); + Int when is_integer(Int) -> + Int; + _ -> + cuttlefish:invalid("should be a non-negative integer") + end + end +}. + +%% @doc The threshold at which to warn about the number of ports that +%% are overly busy. Ports with full input buffers count toward this +%% threshold. +{mapping, "sysmon_handler.thresholds.busy_ports", "sysmon_handler.port_limit", [ + {datatype, integer}, + hidden +]}. 
+ +{translation, "sysmon_handler.port_limit", + fun(Conf) -> + case cuttlefish:conf_get("sysmon_handler.thresholds.busy_ports", Conf, undefined) of + undefined -> + cuttlefish:unset(); + Int when is_integer(Int) -> + Int; + _ -> + cuttlefish:invalid("should be a non-negative integer") + end + end +}. + +%% @doc A process will become busy when it exceeds this amount of time +%% doing garbage collection. +%% @see sysmon_handler.thresholds.busy_processes +{mapping, "sysmon_handler.triggers.process.garbage_collection", "sysmon_handler.gc_ms_limit", [ + {datatype, [{atom, off}, + {duration, ms}]}, + hidden +]}. + +{translation, "sysmon_handler.gc_ms_limit", + fun(Conf) -> + case cuttlefish:conf_get("sysmon_handler.triggers.process.garbage_collection", Conf, undefined) of + undefined -> + cuttlefish:unset(); + off -> + 0; + Int when is_integer(Int) -> + Int; + _ -> + cuttlefish:invalid("should be a non-negative integer") + end + end +}. + +%% @doc A process will become busy when it exceeds this amount of time +%% during a single process scheduling & execution cycle. +{mapping, "sysmon_handler.triggers.process.long_scheduled_execution", "sysmon_handler.schedule_ms_limit", [ + {datatype, [{atom, off}, + {duration, ms}]}, + hidden +]}. + +{translation, "sysmon_handler.schedule_ms_limit", + fun(Conf) -> + case cuttlefish:conf_get("sysmon_handler.triggers.process.long_scheduled_execution", Conf, undefined) of + undefined -> + cuttlefish:unset(); + off -> + 0; + Int when is_integer(Int) -> + Int; + _ -> + cuttlefish:invalid("should be a non-negative integer") + end + end +}. + +%% @doc A process will become busy when its heap exceeds this size. +%% @see sysmon_handler.thresholds.busy_processes +{mapping, "sysmon_handler.triggers.process.heap_size", "sysmon_handler.heap_word_limit", [ + {datatype, [{atom, off}, + bytesize]}, + hidden +]}. 
+ +{translation, "sysmon_handler.heap_word_limit", + fun(Conf) -> + case cuttlefish:conf_get("sysmon_handler.triggers.process.heap_size", Conf, undefined) of + undefined -> + cuttlefish:unset(); + off -> + 0; + Bytes when is_integer(Bytes) -> + WordSize = erlang:system_info(wordsize), + Bytes div WordSize; + _ -> + cuttlefish:invalid("should be a non-negative integer") + end + end +}. + +%% @doc Whether ports with full input buffers will be counted as +%% busy. Ports can represent open files or network sockets. +%% @see sysmon_handler.thresholds.busy_ports +{mapping, "sysmon_handler.triggers.port", "sysmon_handler.busy_port", [ + {datatype, flag}, + hidden +]}. + +{translation, "sysmon_handler.busy_port", + fun(Conf) -> + case cuttlefish:conf_get("sysmon_handler.triggers.port", Conf, undefined) of + undefined -> + cuttlefish:unset(); + Val -> Val + end + end +}. + +%% @doc Whether distribution ports with full input buffers will be +%% counted as busy. Distribution ports connect Erlang nodes within a +%% single cluster. +%% @see sysmon_handler.thresholds.busy_ports +{mapping, "sysmon_handler.triggers.distribution_port", "sysmon_handler.busy_dist_port", [ + {datatype, flag}, + hidden +]}. + +{translation, "sysmon_handler.busy_dist_port", + fun(Conf) -> + case cuttlefish:conf_get("sysmon_handler.triggers.distribution_port", Conf, undefined) of + undefined -> + cuttlefish:unset(); + Val -> Val + end + end +}. 
+ % =============================== % Validators % =============================== diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 1bec3c0942..0cc6bd544a 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -115,6 +115,7 @@ dep_lager = hex 3.6.5 dep_ra = git https://github.com/rabbitmq/ra.git master dep_ranch = hex 1.7.1 dep_recon = hex 2.3.6 +dep_sysmon_handler = hex 1.1.0 RABBITMQ_COMPONENTS = amqp_client \ amqp10_common \ diff --git a/src/rabbit.erl b/src/rabbit.erl index 3401391b09..980b629a19 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -156,6 +156,13 @@ {requires, kernel_ready}, {enables, core_initialized}]}). +-rabbit_boot_step({rabbit_sysmon_minder, + [{description, "sysmon_handler supervisor"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_sysmon_minder]}}, + {requires, kernel_ready}, + {enables, core_initialized}]}). + -rabbit_boot_step({core_initialized, [{description, "core initialized"}, {requires, kernel_ready}]}). @@ -225,7 +232,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --define(APPS, [os_mon, mnesia, rabbit_common, ra, rabbit]). +-define(APPS, [os_mon, mnesia, rabbit_common, ra, sysmon_handler, rabbit]). -define(ASYNC_THREADS_WARNING_THRESHOLD, 8). @@ -516,6 +523,7 @@ start_apps(Apps) -> start_apps(Apps, RestartTypes) -> app_utils:load_applications(Apps), + ensure_sysmon_handler_app_config(), ConfigEntryDecoder = case application:get_env(rabbit, config_entry_decoder) of undefined -> []; @@ -546,7 +554,6 @@ start_apps(Apps, RestartTypes) -> PassPhrase }, decrypt_config(Apps, Algo), - OrderedApps = app_utils:app_dependency_order(Apps, false), case lists:member(rabbit, Apps) of false -> rabbit_boot_steps:run_boot_steps(Apps); %% plugin activation @@ -556,6 +563,30 @@ start_apps(Apps, RestartTypes) -> handle_app_error(could_not_start), RestartTypes). 
+%% rabbitmq/rabbitmq-server#952 +%% This function is to be called after configuration has been optionally generated +%% and the sysmon_handler application loaded, but not started. It will ensure that +%% sane defaults are used for configuration settings that haven't been set by the +%% user +ensure_sysmon_handler_app_config() -> + Defaults = [ + {process_limit, 100}, + {port_limit, 100}, + {gc_ms_limit, 0}, + {schedule_ms_limit, 0}, + {heap_word_limit, 10485760}, + {busy_port, false}, + {busy_dist_port, true} + ], + lists:foreach(fun({K, V}) -> + case application:get_env(sysmon_handler, K) of + undefined -> + application:set_env(sysmon_handler, K, V); + _ -> + ok + end + end, Defaults). + %% This function retrieves the correct IoDevice for requesting %% input. The problem with using the default IoDevice is that %% the Erlang shell prevents us from getting the input. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d938bece8c..c65ad299ed 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). --export([list_down/1, count/1, list_names/0, list_local_names/0]). +-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0]). -export([list_by_type/1]). -export([notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). 
@@ -437,8 +437,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> not_found -> Q1 = rabbit_policy:set(Q), Q2 = Q1#amqqueue{state = live}, ok = store_queue(Q2), - B = add_default_binding(Q2), - fun () -> B(), {created, Q2} end; + fun () -> {created, Q2} end; {absent, _Q, _} = R -> rabbit_misc:const(R) end; [ExistingQ] -> @@ -502,15 +501,6 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1}, %% mirroring-related has changed - the policy may have changed anyway. notify_policy_changed(Q1). -add_default_binding(#amqqueue{name = QueueName}) -> - ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), - RoutingKey = QueueName#resource.name, - rabbit_binding:add(#binding{source = ExchangeName, - destination = QueueName, - key = RoutingKey, - args = []}, - ?INTERNAL_USER). - lookup([]) -> []; %% optimisation lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation lookup(Names) when is_list(Names) -> @@ -666,6 +656,7 @@ declare_args() -> {<<"x-max-priority">>, fun check_max_priority_arg/2}, {<<"x-overflow">>, fun check_overflow/2}, {<<"x-queue-mode">>, fun check_queue_mode/2}, + {<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2}, {<<"x-queue-type">>, fun check_queue_type/2}, {<<"x-quorum-initial-group-size">>, fun check_default_quorum_initial_group_size_arg/2}]. @@ -708,6 +699,12 @@ check_max_priority_arg({Type, Val}, Args) -> Error -> Error end. +check_single_active_consumer_arg({Type, Val}, Args) -> + case check_bool_arg({Type, Val}, Args) of + ok -> ok; + Error -> Error + end. + check_default_quorum_initial_group_size_arg({Type, Val}, Args) -> case check_non_neg_int_arg({Type, Val}, Args) of ok when Val == 0 -> {error, {value_zero, Val}}; @@ -757,6 +754,8 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). list_names() -> mnesia:dirty_all_keys(rabbit_queue). +list_names(VHost) -> [Q#amqqueue.name || Q <- list(VHost)]. 
+ list_local_names() -> [ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(), State =/= crashed, is_local_to_node(QPid, node())]. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 52925ce165..37ee8d0a15 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% Copyright (c) 2007-2018 Pivotal Software, Inc. All rights reserved. %% -module(rabbit_amqqueue_process). @@ -36,8 +36,8 @@ -record(q, { %% an #amqqueue record q, - %% none | {exclusive consumer channel PID, consumer tag} - exclusive_consumer, + %% none | {exclusive consumer channel PID, consumer tag} | {single active consumer channel PID, consumer} + active_consumer, %% Set to true if a queue has ever had a consumer. %% This is used to determine when to delete auto-delete queues. has_had_consumers, @@ -94,7 +94,9 @@ %% example. mirroring_policy_version = 0, %% running | flow | idle - status + status, + %% true | false + single_active_consumer_on }). %%---------------------------------------------------------------------------- @@ -155,15 +157,20 @@ init(Q) -> ?MODULE}. 
init_state(Q) -> - State = #q{q = Q, - exclusive_consumer = none, - has_had_consumers = false, - consumers = rabbit_queue_consumers:new(), - senders = pmon:new(delegate), - msg_id_to_channel = gb_trees:empty(), - status = running, - args_policy_version = 0, - overflow = 'drop-head'}, + SingleActiveConsumerOn = case rabbit_misc:table_lookup(Q#amqqueue.arguments, <<"x-single-active-consumer">>) of + {bool, true} -> true; + _ -> false + end, + State = #q{q = Q, + active_consumer = none, + has_had_consumers = false, + consumers = rabbit_queue_consumers:new(), + senders = pmon:new(delegate), + msg_id_to_channel = gb_trees:empty(), + status = running, + args_policy_version = 0, + overflow = 'drop-head', + single_active_consumer_on = SingleActiveConsumerOn}, rabbit_event:init_stats_timer(State, #q.stats_timer). init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> @@ -545,7 +552,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref). ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). -assert_invariant(State = #q{consumers = Consumers}) -> +assert_invariant(#q{single_active_consumer_on = true}) -> + %% queue may contain messages and have available consumers with exclusive consumer + ok; +assert_invariant(State = #q{consumers = Consumers, single_active_consumer_on = false}) -> true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)). is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). 
@@ -619,7 +629,8 @@ run_message_queue(ActiveConsumersChanged, State) -> true -> maybe_notify_decorators(ActiveConsumersChanged, State); false -> case rabbit_queue_consumers:deliver( fun(AckRequired) -> fetch(AckRequired, State) end, - qname(State), State#q.consumers) of + qname(State), State#q.consumers, + State#q.single_active_consumer_on, State#q.active_consumer) of {delivered, ActiveConsumersChanged1, State1, Consumers} -> run_message_queue( ActiveConsumersChanged or ActiveConsumersChanged1, @@ -645,7 +656,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, {{Message, Delivered, AckTag}, {BQS1, MTC}}; (false) -> {{Message, Delivered, undefined}, discard(Delivery, BQ, BQS, MTC)} - end, qname(State), State#q.consumers) of + end, qname(State), State#q.consumers, State#q.single_active_consumer_on, State#q.active_consumer) of {delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} -> {delivered, maybe_notify_decorators( ActiveConsumersChanged, @@ -814,9 +825,10 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). 
-handle_ch_down(DownPid, State = #q{consumers = Consumers, - exclusive_consumer = Holder, - senders = Senders}) -> +handle_ch_down(DownPid, State = #q{consumers = Consumers, + active_consumer = Holder, + single_active_consumer_on = SingleActiveConsumerOn, + senders = Senders}) -> State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of false -> Senders; @@ -840,12 +852,9 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, {ChAckTags, ChCTags, Consumers1} -> QName = qname(State1), [emit_consumer_deleted(DownPid, CTag, QName, ?INTERNAL_USER) || CTag <- ChCTags], - Holder1 = case Holder of - {DownPid, _} -> none; - Other -> Other - end, + Holder1 = new_single_active_consumer_after_channel_down(DownPid, Holder, SingleActiveConsumerOn, Consumers1), State2 = State1#q{consumers = Consumers1, - exclusive_consumer = Holder1}, + active_consumer = Holder1}, notify_decorators(State2), case should_auto_delete(State2) of true -> @@ -860,6 +869,22 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, end end. +new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = true, Consumers) -> + case CurrentSingleActiveConsumer of + {DownChPid, _} -> + case rabbit_queue_consumers:get_consumer(Consumers) of + undefined -> none; + Consumer -> Consumer + end; + false -> + CurrentSingleActiveConsumer + end; +new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = false, _Consumers) -> + case CurrentSingleActiveConsumer of + {DownChPid, _} -> none; + Other -> Other + end. 
+ check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; check_exclusive_access(none, false, _State) -> @@ -1007,14 +1032,14 @@ i(effective_policy_definition, #q{q = Q}) -> undefined -> []; Def -> Def end; -i(exclusive_consumer_pid, #q{exclusive_consumer = none}) -> - ''; -i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) -> +i(exclusive_consumer_pid, #q{active_consumer = {ChPid, _ConsumerTag}, single_active_consumer_on = false}) -> ChPid; -i(exclusive_consumer_tag, #q{exclusive_consumer = none}) -> +i(exclusive_consumer_pid, _) -> ''; -i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) -> +i(exclusive_consumer_tag, #q{active_consumer = {_ChPid, ConsumerTag}, single_active_consumer_on = false}) -> ConsumerTag; +i(exclusive_consumer_tag, _) -> + ''; i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:len(BQS); i(messages_unacknowledged, _) -> @@ -1213,49 +1238,81 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser}, - _From, State = #q{consumers = Consumers, - exclusive_consumer = Holder}) -> - case check_exclusive_access(Holder, ExclusiveConsume, State) of - in_use -> reply({error, exclusive_consume_unavailable}, State); - ok -> Consumers1 = rabbit_queue_consumers:add( - ChPid, ConsumerTag, NoAck, - LimiterPid, LimiterActive, - PrefetchCount, Args, is_empty(State), - ActingUser, Consumers), - ExclusiveConsumer = - if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> Holder - end, - State1 = State#q{consumers = Consumers1, - has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer}, - ok = maybe_send_reply(ChPid, OkMsg), - QName = qname(State1), - AckRequired = not NoAck, - rabbit_core_metrics:consumer_created( - ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, - PrefetchCount, Args), - 
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, PrefetchCount, - Args, none, ActingUser), - notify_decorators(State1), - reply(ok, run_message_queue(State1)) + _From, State = #q{consumers = Consumers, + active_consumer = Holder, + single_active_consumer_on = SingleActiveConsumerOn}) -> + ConsumerRegistration = case SingleActiveConsumerOn of + true -> + case ExclusiveConsume of + true -> + {error, reply({error, exclusive_consume_unavailable}, State)}; + false -> + Consumers1 = rabbit_queue_consumers:add( + ChPid, ConsumerTag, NoAck, + LimiterPid, LimiterActive, + PrefetchCount, Args, is_empty(State), + ActingUser, Consumers), + + case Holder of + none -> + NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1), + {state, State#q{consumers = Consumers1, + has_had_consumers = true, + active_consumer = NewConsumer}}; + _ -> + {state, State#q{consumers = Consumers1, + has_had_consumers = true}} + end + end; + false -> + case check_exclusive_access(Holder, ExclusiveConsume, State) of + in_use -> {error, reply({error, exclusive_consume_unavailable}, State)}; + ok -> + Consumers1 = rabbit_queue_consumers:add( + ChPid, ConsumerTag, NoAck, + LimiterPid, LimiterActive, + PrefetchCount, Args, is_empty(State), + ActingUser, Consumers), + ExclusiveConsumer = + if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> Holder + end, + {state, State#q{consumers = Consumers1, + has_had_consumers = true, + active_consumer = ExclusiveConsumer}} + end + end, + case ConsumerRegistration of + {error, Reply} -> + Reply; + {state, State1} -> + ok = maybe_send_reply(ChPid, OkMsg), + QName = qname(State1), + AckRequired = not NoAck, + rabbit_core_metrics:consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, + PrefetchCount, Args), + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, PrefetchCount, + Args, none, ActingUser), + notify_decorators(State1), + reply(ok, 
run_message_queue(State1)) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From, - State = #q{consumers = Consumers, - exclusive_consumer = Holder}) -> + State = #q{consumers = Consumers, + active_consumer = Holder, + single_active_consumer_on = SingleActiveConsumerOn }) -> ok = maybe_send_reply(ChPid, OkMsg), case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of not_found -> reply(ok, State); Consumers1 -> - Holder1 = case Holder of - {ChPid, ConsumerTag} -> none; - _ -> Holder - end, + Holder1 = new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, + Holder, SingleActiveConsumerOn, Consumers1 + ), State1 = State#q{consumers = Consumers1, - exclusive_consumer = Holder1}, + active_consumer = Holder1}, emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser), notify_decorators(State1), case should_auto_delete(State1) of @@ -1325,6 +1382,24 @@ handle_call(sync_mirrors, _From, State) -> handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State). +new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleActiveConsumer, + _SingleActiveConsumerIsOn = true, Consumers) -> + case rabbit_queue_consumers:is_same(ChPid, ConsumerTag, CurrentSingleActiveConsumer) of + true -> + case rabbit_queue_consumers:get_consumer(Consumers) of + undefined -> none; + Consumer -> Consumer + end; + false -> + CurrentSingleActiveConsumer + end; +new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleActiveConsumer, + _SingleActiveConsumerIsOn = false, _Consumers) -> + case CurrentSingleActiveConsumer of + {ChPid, ConsumerTag} -> none; + _ -> CurrentSingleActiveConsumer + end. 
+ handle_cast(init, State) -> try init_it({no_barrier, non_clean_shutdown}, none, State) @@ -1432,7 +1507,6 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, run_message_queue(true, State1) end); - handle_cast(notify_decorators, State) -> notify_decorators(State), noreply(State); diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index e96dfd7673..258e85ffa2 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -27,6 +27,10 @@ -export([has_for_source/1, remove_for_source/1, remove_for_destination/2, remove_transient_for_destination/1]). +-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath, + kind = exchange, + name = <<>>}). + %%---------------------------------------------------------------------------- -export_type([key/0, deletions/0]). @@ -156,6 +160,14 @@ recover_semi_durable_route_txn(R = #route{binding = B}, X) -> (Serial, false) -> x_callback(Serial, X, add_binding, B) end). +exists(#binding{source = ?DEFAULT_EXCHANGE(_), + destination = #resource{kind = queue, name = QName} = Queue, + key = QName, + args = []}) -> + case rabbit_amqqueue:lookup(Queue) of + {ok, _} -> true; + {error, not_found} -> false + end; exists(Binding) -> binding_action( Binding, fun (_Src, _Dst, B) -> @@ -243,9 +255,17 @@ list(VHostPath) -> destination = VHostResource, _ = '_'}, _ = '_'}, - [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, - Route)]. - + %% if there are any default exchange bindings left after an upgrade + %% of a pre-3.8 database, filter them out + AllBindings = [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)], + Filtered = lists:filter(fun(#binding{source = S}) -> + S =/= ?DEFAULT_EXCHANGE(VHostPath) + end, AllBindings), + implicit_bindings(VHostPath) ++ Filtered. 
+ +list_for_source(?DEFAULT_EXCHANGE(VHostPath)) -> + implicit_bindings(VHostPath); list_for_source(SrcName) -> mnesia:async_dirty( fun() -> @@ -255,16 +275,43 @@ list_for_source(SrcName) -> end). list_for_destination(DstName) -> - mnesia:async_dirty( - fun() -> - Route = #route{binding = #binding{destination = DstName, - _ = '_'}}, - [reverse_binding(B) || - #reverse_route{reverse_binding = B} <- - mnesia:match_object(rabbit_reverse_route, - reverse_route(Route), read)] - end). - + implicit_for_destination(DstName) ++ + mnesia:async_dirty( + fun() -> + Route = #route{binding = #binding{destination = DstName, + _ = '_'}}, + [reverse_binding(B) || + #reverse_route{reverse_binding = B} <- + mnesia:match_object(rabbit_reverse_route, + reverse_route(Route), read)] + end). + +implicit_bindings(VHostPath) -> + DstQueues = rabbit_amqqueue:list_names(VHostPath), + [ #binding{source = ?DEFAULT_EXCHANGE(VHostPath), + destination = DstQueue, + key = QName, + args = []} + || DstQueue = #resource{name = QName} <- DstQueues ]. + +implicit_for_destination(DstQueue = #resource{kind = queue, + virtual_host = VHostPath, + name = QName}) -> + [#binding{source = ?DEFAULT_EXCHANGE(VHostPath), + destination = DstQueue, + key = QName, + args = []}]; +implicit_for_destination(_) -> + []. + +list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath), + #resource{kind = queue, + virtual_host = VHostPath, + name = QName} = DstQueue) -> + [#binding{source = ?DEFAULT_EXCHANGE(VHostPath), + destination = DstQueue, + key = QName, + args = []}]; list_for_source_and_destination(SrcName, DstName) -> mnesia:async_dirty( fun() -> diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 1ae0fd6bd7..4ed6a6055b 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. 
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved. %% -module(rabbit_fifo). @@ -184,6 +184,8 @@ suspected_down = false :: boolean() }). +-type consumer() :: #consumer{}. + -record(enqueuer, {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list @@ -241,7 +243,12 @@ msg_bytes_checkout = 0 :: non_neg_integer(), max_length :: maybe(non_neg_integer()), max_bytes :: maybe(non_neg_integer()), - overflow :: 'drop-head' | 'reject-publish' + overflow :: 'drop-head' | 'reject-publish', + %% whether single active consumer is on or not for this queue + consumer_strategy = default :: default | single_active, + %% waiting consumers, one is picked active consumer is cancelled or dies + %% used only when single active consumer is on + waiting_consumers = [] :: [{consumer_id(), consumer()}] }). -opaque state() :: #state{}. @@ -252,7 +259,8 @@ become_leader_handler => applied_mfa(), shadow_copy_interval => non_neg_integer(), max_length => non_neg_integer(), - max_bytes => non_neg_integer()}. + max_bytes => non_neg_integer(), + single_active_consumer_on => boolean()}. -export_type([protocol/0, delivery/0, @@ -282,12 +290,19 @@ update_config(Conf, State) -> MaxLength = maps:get(max_length, Conf, undefined), MaxBytes = maps:get(max_bytes, Conf, undefined), Overflow = maps:get(overflow, Conf, undefined), + ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of + true -> + single_active; + false -> + default + end, State#state{dead_letter_handler = DLH, become_leader_handler = BLH, shadow_copy_interval = SHI, max_length = MaxLength, max_bytes = MaxBytes, - overflow = init_overflow(Overflow)}. + overflow = init_overflow(Overflow), + consumer_strategy = ConsumerStrategy}. 
init_overflow(undefined) -> 'drop-head'; @@ -547,7 +562,8 @@ state_enter(leader, #state{consumers = Cons, Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]), Mons = [{monitor, process, P} || P <- Pids], Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], - Effects = Mons ++ Nots, + NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), + Effects = Mons ++ Nots ++ NodeMons, case BLH of undefined -> Effects; @@ -735,6 +751,42 @@ num_checked_out(#state{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, + {Effects0, #state{consumer_strategy = default} = S0}) -> + %% general case, single active consumer off + cancel_consumer0(ConsumerId, {Effects0, S0}); +cancel_consumer(ConsumerId, + {Effects0, #state{consumer_strategy = single_active, + waiting_consumers = [] } = S0}) -> + %% single active consumer on, no consumers are waiting + cancel_consumer0(ConsumerId, {Effects0, S0}); +cancel_consumer(ConsumerId, + {Effects0, #state{consumers = Cons0, + consumer_strategy = single_active, + waiting_consumers = WaitingConsumers0 } = State0}) -> + %% single active consumer on, consumers are waiting + case maps:take(ConsumerId, Cons0) of + {_CurrentActiveConsumer = #consumer{checked_out = Checked0}, _} -> + % The active consumer is to be removed + % Cancel it + S = return_all(State0, Checked0), + Effects = cancel_consumer_effects(ConsumerId, S, Effects0), + % Take another one from the waiting consumers and put it in consumers + [{NewActiveConsumerId, NewActiveConsumer} | RemainingWaitingConsumers] = WaitingConsumers0, + #state{service_queue = ServiceQueue} = State0, + ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue), + State1 = State0#state{consumers = #{NewActiveConsumerId => NewActiveConsumer}, + service_queue = ServiceQueue1, + waiting_consumers = RemainingWaitingConsumers}, + {Effects, State1}; + error -> + % The cancelled consumer is not the active one + % Just remove 
it from idle_consumers + {value, _Consumer, WaitingConsumers1} = lists:keytake(ConsumerId, 1, WaitingConsumers0), + % A waiting consumer isn't supposed to have any checked out messages, so nothing special to do here + {Effects0, State0#state{waiting_consumers = WaitingConsumers1}} + end. + +cancel_consumer0(ConsumerId, {Effects0, #state{consumers = C0} = S0}) -> case maps:take(ConsumerId, C0) of {#consumer{checked_out = Checked0}, Cons} -> @@ -745,9 +797,9 @@ cancel_consumer(ConsumerId, {[{aux, inactive} | Effects], S#state{consumers = Cons}}; _ -> {Effects, S#state{consumers = Cons}} - end; + end; error -> - % already removed - do nothing + %% already removed: do nothing {Effects0, S0} end. @@ -1168,23 +1220,42 @@ uniq_queue_in(Key, Queue) -> end. +update_consumer(ConsumerId, Meta, Spec, + #state{consumer_strategy = default} = State0) -> + %% general case, single active consumer off + update_consumer0(ConsumerId, Meta, Spec, State0); +update_consumer(ConsumerId, Meta, Spec, + #state{consumers = Cons0, + consumer_strategy = single_active} = State0) when map_size(Cons0) == 0 -> + %% single active consumer on, no one is consuming yet + update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, - #state{consumers = Cons0, - service_queue = ServiceQueue0} = State0) -> + #state{consumer_strategy = single_active, + waiting_consumers = WaitingConsumers0} = State0) -> + %% single active consumer on and one active consumer already + %% adding the new consumer to the waiting list + Consumer = #consumer{lifetime = Life, meta = Meta, + credit = Credit, credit_mode = Mode}, + WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}], + State0#state{waiting_consumers = WaitingConsumers1}. 
+ +update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, + #state{consumers = Cons0, + service_queue = ServiceQueue0} = State0) -> %% TODO: this logic may not be correct for updating a pre-existing consumer Init = #consumer{lifetime = Life, meta = Meta, credit = Credit, credit_mode = Mode}, Cons = maps:update_with(ConsumerId, - fun(S) -> - %% remove any in-flight messages from - %% the credit update - N = maps:size(S#consumer.checked_out), - C = max(0, Credit - N), - S#consumer{lifetime = Life, - credit = C} - end, Init, Cons0), + fun(S) -> + %% remove any in-flight messages from + %% the credit update + N = maps:size(S#consumer.checked_out), + C = max(0, Credit - N), + S#consumer{lifetime = Life, + credit = C} + end, Init, Cons0), ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons), - ServiceQueue0), + ServiceQueue0), State0#state{consumers = Cons, service_queue = ServiceQueue}. @@ -1600,7 +1671,6 @@ cancelled_checkout_out_test() -> {State3, {dequeue, {0, {_, first}}}, _} = apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2), - ?debugFmt("State3 ~p", [State3]), {_State, {dequeue, {_, {_, second}}}, _} = apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3), ok. @@ -1798,8 +1868,7 @@ return_prefix_msg_count_test() -> ], Indexes = lists:seq(1, length(Commands)), Entries = lists:zip(Indexes, Commands), - {State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries), - ?debugFmt("return_prefix_msg_count_test state ~n~p~n", [State]), + {_State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries), ok. @@ -1871,7 +1940,6 @@ run_snapshot_test(Name, Commands) -> %% create every incremental permuation of the commands lists %% and run the snapshot tests against that [begin - ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]), run_snapshot_test0(Name, C) end || C <- prefixes(Commands, 1, [])]. 
@@ -1884,11 +1952,8 @@ run_snapshot_test0(Name, Commands) -> Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; (_) -> false end, Entries), - ?debugFmt("running from snapshot: ~b", [SnapIdx]), {S, _} = run_log(SnapState, Filtered), % assert log can be restored from any release cursor index - ?debugFmt("Name ~p~nS~p~nState~p~nn", - [Name, S, State]), ?assertEqual(State, S) end || {release_cursor, SnapIdx, SnapState} <- Effects], ok. @@ -2011,6 +2076,71 @@ down_returns_checked_out_in_order_test() -> ?assertEqual(lists:sort(Returns), Returns), ok. +single_active_consumer_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + ?assertEqual(single_active, State0#state.consumer_strategy), + ?assertEqual(0, map_size(State0#state.consumers)), + + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, self()}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + + % the first registered consumer is the active one, the others are waiting + ?assertEqual(1, map_size(State1#state.consumers)), + ?assert(maps:is_key({<<"ctag1">>, self()}, State1#state.consumers)), + ?assertEqual(3, length(State1#state.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#state.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#state.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)), + + % cancelling a waiting consumer + {State2, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1), + % the active consumer should still be in place + ?assertEqual(1, 
map_size(State2#state.consumers)), + ?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)), + % the cancelled consumer has been removed from waiting consumers + ?assertEqual(2, length(State2#state.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#state.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#state.waiting_consumers)), + + % cancelling the active consumer + {State3, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2), + % the second registered consumer is now the active one + ?assertEqual(1, map_size(State3#state.consumers)), + ?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)), + % the new active consumer is no longer in the waiting list + ?assertEqual(1, length(State3#state.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)), + + % cancelling the active consumer + {State4, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3), + % the last waiting consumer became the active one + ?assertEqual(1, map_size(State4#state.consumers)), + ?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)), + % the waiting consumer list is now empty + ?assertEqual(0, length(State4#state.waiting_consumers)), + + % cancelling the last consumer + {State5, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4), + % no active consumer anymore + ?assertEqual(0, map_size(State5#state.consumers)), + % still nothing in the waiting list + ?assertEqual(0, length(State5#state.waiting_consumers)), + + ok. + meta(Idx) -> #{index => Idx, term => 1}. diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 98582c8117..e743fbce18 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -11,17 +11,18 @@ %% The Original Code is RabbitMQ. 
%% %% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved. %% -module(rabbit_queue_consumers). -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, unacknowledged_message_count/0, add/10, remove/3, erase_ch/2, - send_drained/0, deliver/3, record_ack/3, subtract_acks/3, + send_drained/0, deliver/5, record_ack/3, subtract_acks/3, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, - credit/6, utilisation/1]). + credit/6, utilisation/1, is_same/3, get_consumer/1, get/3, + consumer_tag/1]). %%---------------------------------------------------------------------------- @@ -42,7 +43,7 @@ acktags, consumer_count, %% Queue of {ChPid, #consumer{}} for consumers which have - %% been blocked for any reason + %% been blocked (rate/prefetch limited) for any reason blocked_consumers, %% The limiter itself limiter, @@ -57,6 +58,9 @@ use :: {'inactive', time_micros(), time_micros(), ratio()} | {'active', time_micros(), ratio()}}. +-type consumer() :: #consumer{tag::rabbit_types:ctag(), ack_required::boolean(), + prefetch::non_neg_integer(), args::rabbit_framing:amqp_table(), + user::rabbit_types:username()}. -type ch() :: pid(). -type ack() :: non_neg_integer(). -type cr_fun() :: fun ((#cr{}) -> #cr{}). @@ -81,7 +85,8 @@ state()}. -spec send_drained() -> 'ok'. -spec deliver(fun ((boolean()) -> {fetch_result(), T}), - rabbit_amqqueue:name(), state()) -> + rabbit_amqqueue:name(), state(), boolean(), + none | {ch(), rabbit_types:ctag()} | {ch(), consumer()}) -> {'delivered', boolean(), T, state()} | {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. @@ -95,6 +100,7 @@ -spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(), state()) -> 'unchanged' | {'unblocked', state()}. -spec utilisation(state()) -> ratio(). 
+-spec consumer_tag(consumer()) -> rabbit_types:ctag(). %%---------------------------------------------------------------------------- @@ -189,10 +195,34 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. -deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State). - +deliver(FetchFun, QName, State, SingleActiveConsumerIsOn, ActiveConsumer) -> + deliver(FetchFun, QName, false, State, SingleActiveConsumerIsOn, ActiveConsumer). + +deliver(_FetchFun, _QName, false, State, true, none) -> + {undelivered, false, + State#state{use = update_use(State#state.use, inactive)}}; +deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true, SingleActiveConsumer) -> + {ChPid, Consumer} = SingleActiveConsumer, + %% blocked (rate/prefetch limited) consumers are removed from the queue state, but not the exclusive_consumer field, + %% so we need to do this check to avoid adding the exclusive consumer to the channel record + %% over and over + case is_blocked(SingleActiveConsumer) of + true -> + {undelivered, false, + State#state{use = update_use(State#state.use, inactive)}}; + false -> + case deliver_to_consumer(FetchFun, SingleActiveConsumer, QName) of + {delivered, R} -> + {delivered, false, R, State}; + undelivered -> + {ChPid, Consumer} = SingleActiveConsumer, + Consumers1 = remove_consumer(ChPid, Consumer#consumer.tag, Consumers), + {undelivered, true, + State#state{consumers = Consumers1, use = update_use(State#state.use, inactive)}} + end + end; deliver(FetchFun, QName, ConsumersChanged, - State = #state{consumers = Consumers}) -> + State = #state{consumers = Consumers}, false, _SingleActiveConsumer) -> case priority_queue:out_p(Consumers) of {empty, _} -> {undelivered, ConsumersChanged, @@ -205,7 +235,7 @@ deliver(FetchFun, QName, ConsumersChanged, Tail)}}; undelivered -> deliver(FetchFun, QName, true, - State#state{consumers = Tail}) + 
State#state{consumers = Tail}, false, _SingleActiveConsumer) end end. @@ -246,6 +276,10 @@ deliver_to_consumer(FetchFun, unsent_message_count = Count + 1}), R. +is_blocked(Consumer = {ChPid, _C}) -> + #cr{blocked_consumers = BlockedConsumers} = lookup_ch(ChPid), + priority_queue:member(Consumer, BlockedConsumers). + record_ack(ChPid, LimiterPid, AckTag) -> C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), update_ch_record(C#cr{acktags = ?QUEUE:in({AckTag, none}, ChAckTags)}), @@ -357,6 +391,29 @@ utilisation(#state{use = {active, Since, Avg}}) -> utilisation(#state{use = {inactive, Since, Active, Avg}}) -> use_avg(Active, erlang:monotonic_time(micro_seconds) - Since, Avg). +is_same(ChPid, ConsumerTag, {ChPid, #consumer{tag = ConsumerTag}}) -> + true; +is_same(_ChPid, _ConsumerTag, _Consumer) -> + false. + +get_consumer(#state{consumers = Consumers}) -> + case priority_queue:out_p(Consumers) of + {{value, Consumer, _Priority}, _Tail} -> Consumer; + {empty, _} -> undefined + end. + +get(ChPid, ConsumerTag, #state{consumers = Consumers}) -> + Consumers1 = priority_queue:filter(fun ({CP, #consumer{tag = CT}}) -> + (CP == ChPid) and (CT == ConsumerTag) + end, Consumers), + case priority_queue:out_p(Consumers1) of + {empty, _} -> undefined; + {{value, Consumer, _Priority}, _Tail} -> Consumer + end. + +consumer_tag(#consumer{tag = CTag}) -> + CTag. + %%---------------------------------------------------------------------------- parse_credit_args(Default, Args) -> diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl index c532714d41..7c386855f6 100644 --- a/src/rabbit_queue_location_min_masters.erl +++ b/src/rabbit_queue_location_min_masters.erl @@ -38,45 +38,33 @@ description() -> <<"Locate queue master node from cluster node with least bound queues">>}]. 
queue_master_location(#amqqueue{} = Q) -> - Cluster = rabbit_queue_master_location_misc:all_nodes(Q), - VHosts = rabbit_vhost:list(), - BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []), - {_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters), - {ok, MinMaster}. + Cluster = rabbit_queue_master_location_misc:all_nodes(Q), + QueueNames = rabbit_amqqueue:list_names(), + MastersPerNode = lists:foldl( + fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) -> + case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of + {ok, Master} when is_atom(Master) -> + case maps:is_key(Master, NodeMasters) of + true -> maps:update_with(Master, + fun(N) -> N + 1 end, + NodeMasters); + false -> NodeMasters + end; + _ -> NodeMasters + end + end, + maps:from_list([{N, 0} || N <- Cluster]), + QueueNames), -%%--------------------------------------------------------------------------- -%% Private helper functions -%%--------------------------------------------------------------------------- -get_min_master(Cluster, BoundQueueMasters) -> - lists:min([ {count_masters(Node, BoundQueueMasters), Node} || - Node <- Cluster ]). - -count_masters(Node, Masters) -> - length([ X || X <- Masters, X == Node ]). - -get_bound_queue_masters_per_vhost([], Acc) -> - lists:flatten(Acc); -get_bound_queue_masters_per_vhost([VHost|RemVHosts], Acc) -> - BoundQueueNames = - lists:filtermap( - fun(#binding{destination =#resource{kind = queue, - name = QueueName}}) -> - {true, QueueName}; - (_) -> - false - end, - rabbit_binding:list(VHost)), - UniqQueueNames = lists:usort(BoundQueueNames), - BoundQueueMasters = get_queue_masters(VHost, UniqQueueNames, []), - get_bound_queue_masters_per_vhost(RemVHosts, [BoundQueueMasters|Acc]). 
- - -get_queue_masters(_VHost, [], BoundQueueNodes) -> BoundQueueNodes; -get_queue_masters(VHost, [QueueName | RemQueueNames], QueueMastersAcc) -> - QueueMastersAcc0 = case rabbit_queue_master_location_misc:lookup_master( - QueueName, VHost) of - {ok, Master} when is_atom(Master) -> - [Master|QueueMastersAcc]; - _ -> QueueMastersAcc - end, - get_queue_masters(VHost, RemQueueNames, QueueMastersAcc0). + {MinNode, _NMasters} = maps:fold( + fun(Node, NMasters, init) -> + {Node, NMasters}; + (Node, NMasters, {MinNode, MinMasters}) -> + case NMasters < MinMasters of + true -> {Node, NMasters}; + false -> {MinNode, MinMasters} + end + end, + init, + MastersPerNode), + {ok, MinNode}.
\ No newline at end of file diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index f1be871872..58d715dce7 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -37,7 +37,8 @@ -export([policy_changed/2]). -export([cleanup_data_dir/0]). --include_lib("rabbit_common/include/rabbit.hrl"). +%%-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). -type ra_server_id() :: {Name :: atom(), Node :: node()}. @@ -156,7 +157,14 @@ ra_machine_config(Q = #amqqueue{name = QName, become_leader_handler => {?MODULE, become_leader, [QName]}, max_length => MaxLength, max_bytes => MaxBytes, - overflow => Overflow}. + overflow => Overflow, + single_active_consumer_on => single_active_consumer_on(Q)}. + +single_active_consumer_on(#amqqueue{arguments = QArguments}) -> + case rabbit_misc:table_lookup(QArguments, <<"x-single-active-consumer">>) of + {bool, true} -> true; + _ -> false + end. cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> Node = node(ChPid), diff --git a/src/rabbit_sysmon_handler.erl b/src/rabbit_sysmon_handler.erl new file mode 100644 index 0000000000..9b778bcfc3 --- /dev/null +++ b/src/rabbit_sysmon_handler.erl @@ -0,0 +1,235 @@ +%% Copyright (c) 2011 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. 
+ +%% @doc A custom event handler to the `sysmon_handler' application's +%% `system_monitor' event manager. +%% +%% This module attempts to discover more information about a process +%% that generates a system_monitor event. + +-module(rabbit_sysmon_handler). + +-behaviour(gen_event). + +%% API +-export([add_handler/0]). + +%% gen_event callbacks +-export([init/1, handle_event/2, handle_call/2, + handle_info/2, terminate/2, code_change/3]). + +-record(state, {timer_ref :: reference()}). + +-define(INACTIVITY_TIMEOUT, 5000). + +%%%=================================================================== +%%% gen_event callbacks +%%%=================================================================== + +add_handler() -> + %% Vulnerable to race conditions (installing handler multiple + %% times), but risk is zero in the common OTP app startup case. + case lists:member(?MODULE, gen_event:which_handlers(sysmon_handler)) of + true -> + ok; + false -> + sysmon_handler_filter:add_custom_handler(?MODULE, []) + end. + +%%%=================================================================== +%%% gen_event callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever a new event handler is added to an event manager, +%% this function is called to initialize the event handler. +%% +%% @spec init(Args) -> {ok, State} +%% @end +%%-------------------------------------------------------------------- +init([]) -> + {ok, #state{}, hibernate}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event manager receives an event sent using +%% gen_event:notify/2 or gen_event:sync_notify/2, this function is +%% called for each installed event handler to handle the event. 
+%% +%% @spec handle_event(Event, State) -> +%% {ok, State} | +%% {swap_handler, Args1, State1, Mod2, Args2} | +%% remove_handler +%% @end +%%-------------------------------------------------------------------- +handle_event({monitor, Pid, Type, _Info}, + State=#state{timer_ref=TimerRef}) when Pid == self() -> + %% Reset the inactivity timeout + NewTimerRef = reset_timer(TimerRef), + maybe_collect_garbage(Type), + {ok, State#state{timer_ref=NewTimerRef}}; +handle_event({monitor, PidOrPort, Type, Info}, State=#state{timer_ref=TimerRef}) -> + %% Reset the inactivity timeout + NewTimerRef = reset_timer(TimerRef), + {Fmt, Args} = format_pretty_proc_or_port_info(PidOrPort), + rabbit_log:warning("~p ~w ~w " ++ Fmt ++ " ~w", [?MODULE, Type, PidOrPort] ++ Args ++ [Info]), + {ok, State#state{timer_ref=NewTimerRef}}; +handle_event({suppressed, Type, Info}, State=#state{timer_ref=TimerRef}) -> + %% Reset the inactivity timeout + NewTimerRef = reset_timer(TimerRef), + rabbit_log:debug("~p encountered a suppressed event of type ~w: ~w", [?MODULE, Type, Info]), + {ok, State#state{timer_ref=NewTimerRef}}; +handle_event(Event, State=#state{timer_ref=TimerRef}) -> + NewTimerRef = reset_timer(TimerRef), + rabbit_log:warning("~p unhandled event: ~p", [?MODULE, Event]), + {ok, State#state{timer_ref=NewTimerRef}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event manager receives a request sent using +%% gen_event:call/3,4, this function is called for the specified +%% event handler to handle the request. +%% +%% @spec handle_call(Request, State) -> +%% {ok, Reply, State} | +%% {swap_handler, Reply, Args1, State1, Mod2, Args2} | +%% {remove_handler, Reply} +%% @end +%%-------------------------------------------------------------------- +handle_call(_Call, State) -> + Reply = not_supported, + {ok, Reply, State}. 
+ +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called for each installed event handler when +%% an event manager receives any other message than an event or a +%% synchronous request (or a system message). +%% +%% @spec handle_info(Info, State) -> +%% {ok, State} | +%% {swap_handler, Args1, State1, Mod2, Args2} | +%% remove_handler +%% @end +%%-------------------------------------------------------------------- +handle_info(inactivity_timeout, State) -> + %% No events have arrived for the timeout period + %% so hibernate to free up resources. + {ok, State, hibernate}; +handle_info(Info, State) -> + rabbit_log:info("handle_info got ~p", [Info]), + {ok, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event handler is deleted from an event manager, this +%% function is called. It should be the opposite of Module:init/1 and +%% do any necessary cleaning up. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +format_pretty_proc_or_port_info(PidOrPort) -> + try + case get_pretty_proc_or_port_info(PidOrPort) of + undefined -> + {"", []}; + Res -> + Res + end + catch C:E:S -> + {"Pid ~w, ~W ~W at ~w\n", + [PidOrPort, C, 20, E, 20, S]} + end. 
+ +get_pretty_proc_or_port_info(Pid) when is_pid(Pid) -> + Infos = [registered_name, initial_call, current_function, message_queue_len], + case process_info(Pid, Infos) of + undefined -> + undefined; + [] -> + undefined; + [{registered_name, RN0}, ICT1, {_, CF}, {_, MQL}] -> + ICT = case proc_lib:translate_initial_call(Pid) of + {proc_lib, init_p, 5} -> % not by proc_lib, see docs + ICT1; + ICT2 -> + {initial_call, ICT2} + end, + RNL = if RN0 == [] -> []; + true -> [{name, RN0}] + end, + {"~w", [RNL ++ [ICT, CF, {message_queue_len, MQL}]]} + end; +get_pretty_proc_or_port_info(Port) when is_port(Port) -> + PortInfo = erlang:port_info(Port), + {value, {name, Name}, PortInfo2} = lists:keytake(name, 1, PortInfo), + QueueSize = [erlang:port_info(Port, queue_size)], + Connected = case proplists:get_value(connected, PortInfo2) of + undefined -> + []; + ConnectedPid -> + case proc_lib:translate_initial_call(ConnectedPid) of + {proc_lib, init_p, 5} -> % not by proc_lib, see docs + []; + ICT -> + [{initial_call, ICT}] + end + end, + {"name ~s ~w", [Name, lists:append([PortInfo2, QueueSize, Connected])]}. + + +%% @doc If the message type is due to a large heap warning +%% and the source is ourself, go ahead and collect garbage +%% to avoid the death spiral. +-spec maybe_collect_garbage(atom()) -> ok. +maybe_collect_garbage(large_heap) -> + erlang:garbage_collect(), + ok; +maybe_collect_garbage(_) -> + ok. + +-spec reset_timer(undefined | reference()) -> reference(). +reset_timer(undefined) -> + erlang:send_after(?INACTIVITY_TIMEOUT, self(), inactivity_timeout); +reset_timer(TimerRef) -> + _ = erlang:cancel_timer(TimerRef), + reset_timer(undefined). diff --git a/src/rabbit_sysmon_minder.erl b/src/rabbit_sysmon_minder.erl new file mode 100644 index 0000000000..b2f5dd6316 --- /dev/null +++ b/src/rabbit_sysmon_minder.erl @@ -0,0 +1,156 @@ +%% ------------------------------------------------------------------- +%% Copyright (c) 2007-2010 Basho Technologies, Inc. 
All Rights Reserved. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(rabbit_sysmon_minder). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). 
+ +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +init([]) -> + %% Add our system_monitor event handler. We do that here because + %% we have a process at our disposal (i.e. ourself) to receive the + %% notification in the very unlikely event that the + %% sysmon_handler has crashed and been removed from the + %% sysmon_handler gen_event server. (If we had a supervisor + %% or app-starting process add the handler, then if the handler + %% crashes, nobody will act on the crash notification.) + rabbit_sysmon_handler:add_handler(), + {ok, #state{}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @spec handle_call(Request, From, State) -> +%% {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_cast(_Msg, State) -> + {noreply, State}. 
+ +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_info({gen_event_EXIT, rabbit_sysmon_handler, _}, State) -> + %% SASL will create an error message, no need for us to duplicate it. + %% + %% Our handler should never crash, but it did indeed crash. If + %% there's a pathological condition somewhere that's generating + %% lots of unforseen things that crash core's custom handler, we + %% could make things worse by jumping back into the exploding + %% volcano. Wait a little bit before jumping back. Besides, the + %% system_monitor data is nice but is not critical: there is no + %% need to make things worse if things are indeed bad, and if we + %% miss a few seconds of system_monitor events, the world will not + %% end. + timer:sleep(2*1000), + rabbit_sysmon_handler:add_handler(), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. 
+ +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets index 50ca777aa8..0cd274b757 100644 --- a/test/config_schema_SUITE_data/rabbit.snippets +++ b/test/config_schema_SUITE_data/rabbit.snippets @@ -171,6 +171,10 @@ tcp_listen_options.exit_on_close = false", "channel_max = 16", [{rabbit,[{channel_max, 16}]}], []}, + {max_message_size, + "max_message_size = 131072", + [{rabbit, [{max_message_size, 131072}]}], + []}, {listeners_tcp_ip, "listeners.tcp.1 = 192.168.1.99:5672", [{rabbit,[{tcp_listeners,[{"192.168.1.99",5672}]}]}], diff --git a/test/per_vhost_connection_limit_partitions_SUITE.erl b/test/per_vhost_connection_limit_partitions_SUITE.erl index 051f3f13b7..20adf704a0 100644 --- a/test/per_vhost_connection_limit_partitions_SUITE.erl +++ b/test/per_vhost_connection_limit_partitions_SUITE.erl @@ -107,7 +107,7 @@ cluster_full_partition_with_autoheal(Config) -> Conn4 = open_unmanaged_connection(Config, B), Conn5 = open_unmanaged_connection(Config, C), Conn6 = open_unmanaged_connection(Config, C), - ?assertEqual(6, count_connections_in(Config, VHost)), + wait_for_count_connections_in(Config, VHost, 6, 60000), %% B drops off the network, non-reachable by either A or C rabbit_ct_broker_helpers:block_traffic_between(A, B), @@ -115,14 +115,14 @@ cluster_full_partition_with_autoheal(Config) -> timer:sleep(?DELAY), %% A and C are still connected, so 4 connections are tracked - ?assertEqual(4, count_connections_in(Config, VHost)), + wait_for_count_connections_in(Config, VHost, 4, 60000), rabbit_ct_broker_helpers:allow_traffic_between(A, B), 
rabbit_ct_broker_helpers:allow_traffic_between(B, C), timer:sleep(?DELAY), %% during autoheal B's connections were dropped - ?assertEqual(4, count_connections_in(Config, VHost)), + wait_for_count_connections_in(Config, VHost, 4, 60000), lists:foreach(fun (Conn) -> (catch rabbit_ct_client_helpers:close_connection(Conn)) @@ -131,11 +131,22 @@ cluster_full_partition_with_autoheal(Config) -> passed. - %% ------------------------------------------------------------------- %% Helpers %% ------------------------------------------------------------------- +wait_for_count_connections_in(Config, VHost, Expected, Time) when Time =< 0 -> + ?assertEqual(Expected, count_connections_in(Config, VHost)); +wait_for_count_connections_in(Config, VHost, Expected, Time) -> + case count_connections_in(Config, VHost) of + Expected -> + ok; + _ -> + Sleep = 3000, + timer:sleep(Sleep), + wait_for_count_connections_in(Config, VHost, Expected, Time - Sleep) + end. + count_connections_in(Config, VHost) -> count_connections_in(Config, VHost, 0). count_connections_in(Config, VHost, NodeIndex) -> diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index ecf4abe58d..0f4ea6cadb 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -338,7 +338,7 @@ start_queue(Config) -> %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Test declare an existing queue ?assertEqual({'queue.declare_ok', LQ, 0, 0}, @@ -354,7 +354,7 @@ start_queue(Config) -> %% Check that the application and process are still up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])). 
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])). start_queue_concurrent(Config) -> Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -415,13 +415,13 @@ stop_queue(Config) -> %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Delete the quorum queue ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})), %% Check that the application and process are down wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) end), ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))). @@ -440,7 +440,7 @@ restart_queue(Config) -> %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])). + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])). 
idempotent_recover(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -519,7 +519,7 @@ restart_all_types(Config) -> %% Check that the application and two ra nodes are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Check the classic queues restarted correctly Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), {#'basic.get_ok'{}, #amqp_msg{}} = @@ -561,7 +561,7 @@ stop_start_rabbit_app(Config) -> %% Check that the application and two ra nodes are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Check the classic queues restarted correctly Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), {#'basic.get_ok'{}, #amqp_msg{}} = @@ -1261,7 +1261,7 @@ cleanup_queue_state_on_channel_after_publish(Config) -> amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), wait_until(fun() -> [] == rpc:call(Server, supervisor, which_children, - [ra_server_sup]) + [ra_server_sup_sup]) end), %% Check that all queue states have been cleaned wait_for_cleanup(Server, NCh1, 0), @@ -1298,7 +1298,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) -> wait_for_cleanup(Server, NCh2, 1), ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) end), %% Check that all queue states have been cleaned wait_for_cleanup(Server, NCh1, 0), @@ -1962,7 +1962,7 @@ 
delete_immediately_by_resource(Config) -> %% Check that the application and process are down wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) end), ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))). @@ -2353,7 +2353,8 @@ wait_for_cleanup(Server, Channel, Number) -> wait_for_cleanup(Server, Channel, Number, 60). wait_for_cleanup(Server, Channel, Number, 0) -> - ?assertEqual(Number, length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel]))); + ?assertEqual(length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])), + Number); wait_for_cleanup(Server, Channel, Number, N) -> case length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])) of Length when Number == Length -> @@ -2379,7 +2380,8 @@ wait_for_messages(Servers, QName, Number, Fun, 0) -> (_) -> -1 end, Msgs), - ?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]); + ?assertEqual([Number || _ <- lists:seq(1, length(Servers))], + Totals); wait_for_messages(Servers, QName, Number, Fun, N) -> Msgs = dirty_query(Servers, QName, Fun), case lists:all(fun(M) when is_map(M) -> diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 3263a733a9..0512e8161a 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -55,7 +55,7 @@ init_per_testcase(TestCase, Config) -> meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end), meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end), - ra_server_sup:remove_all(), + ra_server_sup_sup:remove_all(), ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"), ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"), ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)), diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 
1af0d3b4b0..a8604b46af 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -259,7 +259,8 @@ handle_op({input_event, requeue}, #t{effects = Effs} = T) -> _ -> T end; -handle_op({input_event, Settlement}, #t{effects = Effs} = T) -> +handle_op({input_event, Settlement}, #t{effects = Effs, + down = Down} = T) -> case queue:out(Effs) of {{value, {settle, MsgIds, CId}}, Q} -> Cmd = case Settlement of @@ -269,7 +270,14 @@ handle_op({input_event, Settlement}, #t{effects = Effs} = T) -> end, do_apply(Cmd, T#t{effects = Q}); {{value, Cmd}, Q} when element(1, Cmd) =:= enqueue -> - do_apply(Cmd, T#t{effects = Q}); + case maps:is_key(element(2, Cmd), Down) of + true -> + %% enqueues cannot arrive after down for the same process + %% drop message + T#t{effects = Q}; + false -> + do_apply(Cmd, T#t{effects = Q}) + end; _ -> T end; diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl new file mode 100644 index 0000000000..945229e372 --- /dev/null +++ b/test/single_active_consumer_SUITE.erl @@ -0,0 +1,286 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% + +-module(single_active_consumer_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). 
+ +all() -> + [ + {group, classic_queue}, {group, quorum_queue} + ]. + +groups() -> + [ + {classic_queue, [], [ + all_messages_go_to_one_consumer, + fallback_to_another_consumer_when_first_one_is_cancelled, + fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled, + amqp_exclusive_consume_fails_on_exclusive_consumer_queue + ]}, + {quorum_queue, [], [ + all_messages_go_to_one_consumer, + fallback_to_another_consumer_when_first_one_is_cancelled, + fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled + %% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ + ]} + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(classic_queue, Config) -> + [{single_active_consumer_queue_declare, + #'queue.declare'{arguments = [ + {<<"x-single-active-consumer">>, bool, true}, + {<<"x-queue-type">>, longstr, <<"classic">>} + ], + auto_delete = true} + } | Config]; +init_per_group(quorum_queue, Config) -> + [{single_active_consumer_queue_declare, + #'queue.declare'{arguments = [ + {<<"x-single-active-consumer">>, bool, true}, + {<<"x-queue-type">>, longstr, <<"quorum">>} + ], + durable = true, exclusive = false, auto_delete = false} + } | Config]. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). 
+ +all_messages_go_to_one_consumer(Config) -> + {C, Ch} = connection_and_channel(Config), + Q = queue_declare(Ch, Config), + MessageCount = 5, + ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]), + #'basic.consume_ok'{consumer_tag = CTag1} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + #'basic.consume_ok'{consumer_tag = CTag2} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + + Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount)], + + receive + {consumer_done, {MessagesPerConsumer, MessageCount}} -> + ?assertEqual(MessageCount, MessageCount), + ?assertEqual(2, maps:size(MessagesPerConsumer)), + ?assertEqual(MessageCount, maps:get(CTag1, MessagesPerConsumer)), + ?assertEqual(0, maps:get(CTag2, MessagesPerConsumer)) + after 1000 -> + throw(failed) + end, + + amqp_connection:close(C), + ok. 
+ +fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> + {C, Ch} = connection_and_channel(Config), + Q = queue_declare(Ch, Config), + MessageCount = 10, + ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]), + #'basic.consume_ok'{consumer_tag = CTag1} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + #'basic.consume_ok'{consumer_tag = CTag2} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + #'basic.consume_ok'{consumer_tag = CTag3} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + + Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)], + + {MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2), + FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, Count) -> Count > 0 end, MessagesPerConsumer1)), + ?assertEqual(1, length(FirstActiveConsumerInList)), + + FirstActiveConsumer = lists:nth(1, FirstActiveConsumerInList), + #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = FirstActiveConsumer}), + + {cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(), + + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)], + + {MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1), + SecondActiveConsumerInList = maps:keys(maps:filter( + fun(CTag, Count) -> Count > 0 andalso CTag /= FirstActiveConsumer end, + MessagesPerConsumer2) + ), + ?assertEqual(1, length(SecondActiveConsumerInList)), + SecondActiveConsumer = lists:nth(1, SecondActiveConsumerInList), + + #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}), + + amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}), + wait_for_messages(1), + + 
LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))), + + receive + {consumer_done, {MessagesPerConsumer, MessageCount}} -> + ?assertEqual(MessageCount, MessageCount), + ?assertEqual(3, maps:size(MessagesPerConsumer)), + ?assertEqual(MessageCount div 2, maps:get(FirstActiveConsumer, MessagesPerConsumer)), + ?assertEqual(MessageCount div 2 - 1, maps:get(SecondActiveConsumer, MessagesPerConsumer)), + ?assertEqual(1, maps:get(LastActiveConsumer, MessagesPerConsumer)) + after 1000 -> + throw(failed) + end, + + amqp_connection:close(C), + ok. + +fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config) -> + {C, Ch} = connection_and_channel(Config), + {C1, Ch1} = connection_and_channel(Config), + {C2, Ch2} = connection_and_channel(Config), + {C3, Ch3} = connection_and_channel(Config), + Q = queue_declare(Ch, Config), + MessageCount = 10, + Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2}]), + Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]), + Consumer3Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]), + #'basic.consume_ok'{consumer_tag = CTag1} = + amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"1">>}, Consumer1Pid), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch2, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"2">>}, Consumer2Pid), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch3, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"3">>}, Consumer3Pid), + + Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)], + + {MessagesPerConsumer1, MessageCount1} = consume_results(), + ?assertEqual(MessageCount div 2, MessageCount1), + ?assertEqual(1, 
maps:size(MessagesPerConsumer1)), + ?assertEqual(MessageCount div 2, maps:get(CTag1, MessagesPerConsumer1)), + + ok = amqp_channel:close(Ch1), + + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)], + + {MessagesPerConsumer2, MessageCount2} = consume_results(), + ?assertEqual(MessageCount div 2 - 1, MessageCount2), + ?assertEqual(1, maps:size(MessagesPerConsumer2)), + + ok = amqp_channel:close(Ch2), + + amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"poison">>}), + + {MessagesPerConsumer3, MessageCount3} = consume_results(), + ?assertEqual(1, MessageCount3), + ?assertEqual(1, maps:size(MessagesPerConsumer3)), + + [amqp_connection:close(Conn) || Conn <- [C1, C2, C3, C]], + ok. + +amqp_exclusive_consume_fails_on_exclusive_consumer_queue(Config) -> + {C, Ch} = connection_and_channel(Config), + Q = queue_declare(Ch, Config), + ?assertExit( + {{shutdown, {server_initiated_close, 403, _}}, _}, + amqp_channel:call(Ch, #'basic.consume'{queue = Q, exclusive = true}) + ), + amqp_connection:close(C), + ok. + +connection_and_channel(Config) -> + C = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, Ch} = amqp_connection:open_channel(C), + {C, Ch}. + +queue_declare(Channel, Config) -> + Declare = ?config(single_active_consumer_queue_declare, Config), + #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, Declare), + Q. + +consume({Parent, State, 0}) -> + Parent ! {consumer_done, State}; +consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) -> + receive + #'basic.consume_ok'{consumer_tag = CTag} -> + consume({Parent, {maps:put(CTag, 0, MessagesPerConsumer), MessageCount}, CountDown}); + {#'basic.deliver'{consumer_tag = CTag}, #amqp_msg{payload = <<"poison">>}} -> + Parent ! 
{consumer_done, + {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer), + MessageCount + 1}}; + {#'basic.deliver'{consumer_tag = CTag}, _Content} -> + NewState = {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer), + MessageCount + 1}, + Parent ! {message, NewState}, + consume({Parent, NewState, CountDown - 1}); + #'basic.cancel_ok'{consumer_tag = CTag} -> + Parent ! {cancel_ok, CTag}, + consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}); + _ -> + consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) + after 10000 -> + Parent ! {consumer_timeout, {MessagesPerConsumer, MessageCount}}, + exit(consumer_timeout) + end. + +consume_results() -> + receive + {consumer_done, {MessagesPerConsumer, MessageCount}} -> + {MessagesPerConsumer, MessageCount}; + {consumer_timeout, {MessagesPerConsumer, MessageCount}} -> + {MessagesPerConsumer, MessageCount}; + _ -> + consume_results() + after 1000 -> + throw(failed) + end. + +wait_for_messages(ExpectedCount) -> + wait_for_messages(ExpectedCount, {}). + +wait_for_messages(0, State) -> + State; +wait_for_messages(ExpectedCount, _) -> + receive + {message, {MessagesPerConsumer, MessageCount}} -> + wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount}) + after 5000 -> + throw(message_waiting_timeout) + end. + +wait_for_cancel_ok() -> + receive + {cancel_ok, CTag} -> + {cancel_ok, CTag} + after 5000 -> + throw(consumer_cancel_ok_timeout) + end. diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index c64374a73e..0b8477fbd4 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -77,7 +77,7 @@ groups() -> suite() -> [ - {timetrap, {seconds, 60}} + {timetrap, {minutes, 3}} ]. 
%% ------------------------------------------------------------------- diff --git a/test/unit_queue_consumers_SUITE.erl b/test/unit_queue_consumers_SUITE.erl new file mode 100644 index 0000000000..08d12e7ec5 --- /dev/null +++ b/test/unit_queue_consumers_SUITE.erl @@ -0,0 +1,102 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% + +-module(unit_queue_consumers_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +all() -> + [ + is_same, + get_consumer, + get + ]. + +is_same(_Config) -> + ?assertEqual( + true, + rabbit_queue_consumers:is_same( + self(), <<"1">>, + consumer(self(), <<"1">>) + )), + ?assertEqual( + false, + rabbit_queue_consumers:is_same( + self(), <<"1">>, + consumer(self(), <<"2">>) + )), + Pid = spawn(?MODULE, function_for_process, []), + Pid ! whatever, + ?assertEqual( + false, + rabbit_queue_consumers:is_same( + self(), <<"1">>, + consumer(Pid, <<"1">>) + )), + ok. + +get(_Config) -> + Pid = spawn(?MODULE, function_for_process, []), + Pid ! 
whatever, + State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])), + {Pid, {consumer, <<"2">>, _, _, _, _}} = + rabbit_queue_consumers:get(Pid, <<"2">>, State), + ?assertEqual( + undefined, + rabbit_queue_consumers:get(self(), <<"2">>, State) + ), + ?assertEqual( + undefined, + rabbit_queue_consumers:get(Pid, <<"1">>, State) + ), + ok. + +get_consumer(_Config) -> + Pid = spawn(unit_queue_consumers_SUITE, function_for_process, []), + Pid ! whatever, + State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])), + {_Pid, {consumer, _, _, _, _, _}} = + rabbit_queue_consumers:get_consumer(State), + ?assertEqual( + undefined, + rabbit_queue_consumers:get_consumer(state(consumers([]))) + ), + ok. + +consumers([]) -> + priority_queue:new(); +consumers(Consumers) -> + consumers(Consumers, priority_queue:new()). + +consumers([H], Q) -> + priority_queue:in(H, Q); +consumers([H | T], Q) -> + consumers(T, priority_queue:in(H, Q)). + + +consumer(Pid, ConsumerTag) -> + {Pid, {consumer, ConsumerTag, true, 1, [], <<"guest">>}}. + +state(Consumers) -> + {state, Consumers, {}}. + +function_for_process() -> + receive + _ -> ok + end.
\ No newline at end of file |