summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-01-10 08:59:41 +0000
committerDiana Corbacho <diana@rabbitmq.com>2019-01-10 08:59:41 +0000
commitf1f2cac56844224ea6eaeedc911ede90d53f1f1c (patch)
tree91e70a0a8b2b905caeab961d4f36ebfea2f7f3d3 /test
parent0a254da651d6ff34b5ef35daf14cb31e0dafda88 (diff)
parent7a4b4eb3ba1a66cb67d66ce8d09df5f02ac54f9b (diff)
downloadrabbitmq-server-git-f1f2cac56844224ea6eaeedc911ede90d53f1f1c.tar.gz
Merge remote-tracking branch 'origin/master' into qq-testing
Diffstat (limited to 'test')
-rw-r--r--test/config_schema_SUITE_data/rabbit.snippets845
-rw-r--r--test/per_vhost_connection_limit_partitions_SUITE.erl19
-rw-r--r--test/quorum_queue_SUITE.erl23
-rw-r--r--test/rabbit_fifo_SUITE.erl2
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl12
-rw-r--r--test/single_active_consumer_SUITE.erl286
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl77
-rw-r--r--test/unit_queue_consumers_SUITE.erl102
8 files changed, 1258 insertions, 108 deletions
diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets
index b318adaa12..a7a06aaadf 100644
--- a/test/config_schema_SUITE_data/rabbit.snippets
+++ b/test/config_schema_SUITE_data/rabbit.snippets
@@ -1,24 +1,56 @@
[{internal_auth_backend,
"auth_backends.1 = internal",
- [{rabbit,[{auth_backends,[rabbit_auth_backend_internal]}]}],
+ [{rabbit,[{auth_backends,[rabbit_auth_backend_internal]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ldap_auth_backend,
"auth_backends.1 = ldap",
- [{rabbit,[{auth_backends,[rabbit_auth_backend_ldap]}]}],
+ [{rabbit,[{auth_backends,[rabbit_auth_backend_ldap]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{multiple_auth_backends,
"auth_backends.1 = ldap
auth_backends.2 = internal",
[{rabbit,
[{auth_backends,
- [rabbit_auth_backend_ldap,rabbit_auth_backend_internal]}]}],
+ [rabbit_auth_backend_ldap,rabbit_auth_backend_internal]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{full_name_auth_backend,
"auth_backends.1 = ldap
# uses module name instead of a short alias, \"http\"
auth_backends.2 = rabbit_auth_backend_http",
[{rabbit,
- [{auth_backends,[rabbit_auth_backend_ldap,rabbit_auth_backend_http]}]}],
+ [{auth_backends,[rabbit_auth_backend_ldap,rabbit_auth_backend_http]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{third_party_auth_backend,
"auth_backends.1.authn = internal
@@ -26,14 +58,30 @@ auth_backends.2 = rabbit_auth_backend_http",
auth_backends.1.authz = rabbit_auth_backend_ip_range",
[{rabbit,
[{auth_backends,
- [{rabbit_auth_backend_internal,rabbit_auth_backend_ip_range}]}]}],
+ [{rabbit_auth_backend_internal,rabbit_auth_backend_ip_range}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{authn_authz_backend,
"auth_backends.1.authn = ldap
auth_backends.1.authz = internal",
[{rabbit,
[{auth_backends,
- [{rabbit_auth_backend_ldap,rabbit_auth_backend_internal}]}]}],
+ [{rabbit_auth_backend_ldap,rabbit_auth_backend_internal}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{authn_authz_multiple_backends,
"auth_backends.1.authn = ldap
@@ -42,13 +90,29 @@ auth_backends.2 = internal",
[{rabbit,
[{auth_backends,
[{rabbit_auth_backend_ldap,rabbit_auth_backend_internal},
- rabbit_auth_backend_internal]}]}],
+ rabbit_auth_backend_internal]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{authn_backend_only,
"auth_backends.1.authn = ldap",
[{rabbit,
[{auth_backends,
- [{rabbit_auth_backend_ldap,rabbit_auth_backend_ldap}]}]}],
+ [{rabbit_auth_backend_ldap,rabbit_auth_backend_ldap}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options,
"ssl_options.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem
@@ -62,15 +126,50 @@ ssl_options.fail_if_no_peer_cert = true",
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
{verify,verify_peer},
- {fail_if_no_peer_cert,true}]}]}],
+ {fail_if_no_peer_cert,true}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_listener,
"listeners.tcp.default = 5673",
- [{rabbit,[{tcp_listeners,[5673]}]}],[]},
+ [{rabbit,[{tcp_listeners,[5673]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
+ []},
{ssl_listener,
- "listeners.ssl = none",[{rabbit,[{ssl_listeners,[]}]}],[]},
+ "listeners.ssl = none",[{rabbit,[{ssl_listeners,[]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
+ []},
{num_acceptors,
- "num_acceptors.ssl = 1",[{rabbit,[{num_ssl_acceptors,1}]}],[]},
+ "num_acceptors.ssl = 1",[{rabbit,[{num_ssl_acceptors,1}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
+ []},
{default_user_settings,
"default_user = guest
default_pass = guest
@@ -82,7 +181,15 @@ default_permissions.write = .*",
[{default_user,<<"guest">>},
{default_pass,<<"guest">>},
{default_user_tags,[administrator]},
- {default_permissions,[<<".*">>,<<".*">>,<<".*">>]}]}],
+ {default_permissions,[<<".*">>,<<".*">>,<<".*">>]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation,
"cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
@@ -93,7 +200,15 @@ cluster_formation.node_type = disc",
[{cluster_formation,
[{peer_discovery_backend,rabbit_peer_discovery_classic_config},
{node_type,disc}]},
- {cluster_nodes,{[rabbit@hostname2,rabbit@hostname1],disc}}]}],
+ {cluster_nodes,{[rabbit@hostname2,rabbit@hostname1],disc}}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_disK,
"cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
@@ -104,88 +219,248 @@ cluster_formation.node_type = disc",
[{cluster_formation,
[{peer_discovery_backend,rabbit_peer_discovery_classic_config},
{node_type,disc}]},
- {cluster_nodes,{[rabbit@hostname2,rabbit@hostname1],disc}}]}],
+ {cluster_nodes,{[rabbit@hostname2,rabbit@hostname1],disc}}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_ram_ignored,
- "cluster_formation.node_type = ram",[],[]},
+ "cluster_formation.node_type = ram",[
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],[]},
{tcp_listen_options,
"tcp_listen_options.backlog = 128
tcp_listen_options.nodelay = true
tcp_listen_options.exit_on_close = false",
[{rabbit,
[{tcp_listen_options,
- [{backlog,128},{nodelay,true},{exit_on_close,false}]}]}],
+ [{backlog,128},{nodelay,true},{exit_on_close,false}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{vm_memory_watermark_absolute,
"vm_memory_high_watermark.absolute = 1073741824",
- [{rabbit,[{vm_memory_high_watermark,{absolute,1073741824}}]}],
+ [{rabbit,[{vm_memory_high_watermark,{absolute,1073741824}}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{vm_memory_watermark_absolute_units,
"vm_memory_high_watermark.absolute = 1024MB",
- [{rabbit,[{vm_memory_high_watermark,{absolute,"1024MB"}}]}],
+ [{rabbit,[{vm_memory_high_watermark,{absolute,"1024MB"}}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{vm_memory_watermark_paging_ratio,
"vm_memory_high_watermark_paging_ratio = 0.75
vm_memory_high_watermark.relative = 0.4",
[{rabbit,
[{vm_memory_high_watermark_paging_ratio,0.75},
- {vm_memory_high_watermark,0.4}]}],
+ {vm_memory_high_watermark,0.4}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{memory_monitor_interval, "memory_monitor_interval = 5000",
[{rabbit,
- [{memory_monitor_interval, 5000}]}],
+ [{memory_monitor_interval, 5000}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{vm_memory_calculation_strategy, "vm_memory_calculation_strategy = rss",
[{rabbit,
- [{vm_memory_calculation_strategy, rss}]}],
+ [{vm_memory_calculation_strategy, rss}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{vm_memory_calculation_strategy, "vm_memory_calculation_strategy = erlang",
[{rabbit,
- [{vm_memory_calculation_strategy, erlang}]}],
+ [{vm_memory_calculation_strategy, erlang}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{vm_memory_calculation_strategy, "vm_memory_calculation_strategy = allocated",
[{rabbit,
- [{vm_memory_calculation_strategy, allocated}]}],
+ [{vm_memory_calculation_strategy, allocated}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{vm_memory_calculation_strategy, "vm_memory_calculation_strategy = legacy",
[{rabbit,
- [{vm_memory_calculation_strategy, legacy}]}],
+ [{vm_memory_calculation_strategy, legacy}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{total_memory_available_override_value,
"total_memory_available_override_value = 1000000000",
- [{rabbit,[{total_memory_available_override_value, 1000000000}]}],
+ [{rabbit,[{total_memory_available_override_value, 1000000000}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{total_memory_available_override_value_units,
"total_memory_available_override_value = 1024MB",
- [{rabbit,[{total_memory_available_override_value, "1024MB"}]}],
+ [{rabbit,[{total_memory_available_override_value, "1024MB"}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{connection_max,
"connection_max = 999",
- [{rabbit,[{connection_max, 999}]}],
+ [{rabbit,[{connection_max, 999}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{connection_max,
"connection_max = infinity",
- [{rabbit,[{connection_max, infinity}]}],
+ [{rabbit,[{connection_max, infinity}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{channel_max,
"channel_max = 16",
- [{rabbit,[{channel_max, 16}]}],
+ [{rabbit,[{channel_max, 16}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{listeners_tcp_ip,
"listeners.tcp.1 = 192.168.1.99:5672",
- [{rabbit,[{tcp_listeners,[{"192.168.1.99",5672}]}]}],
+ [{rabbit,[{tcp_listeners,[{"192.168.1.99",5672}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{listeners_tcp_ip_multiple,
"listeners.tcp.1 = 127.0.0.1:5672
listeners.tcp.2 = ::1:5672",
- [{rabbit,[{tcp_listeners,[{"127.0.0.1",5672},{"::1",5672}]}]}],
+ [{rabbit,[{tcp_listeners,[{"127.0.0.1",5672},{"::1",5672}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{listeners_tcp_ip_all,"listeners.tcp.1 = :::5672",
- [{rabbit,[{tcp_listeners,[{"::",5672}]}]}],
+ [{rabbit,[{tcp_listeners,[{"::",5672}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{listeners_tcp_ipv6,
"listeners.tcp.1 = fe80::2acf:e9ff:fe17:f97b:5672",
- [{rabbit,[{tcp_listeners,[{"fe80::2acf:e9ff:fe17:f97b",5672}]}]}],
+ [{rabbit,[{tcp_listeners,[{"fe80::2acf:e9ff:fe17:f97b",5672}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_options_sndbuf,
"tcp_listen_options.backlog = 128
@@ -194,7 +469,15 @@ tcp_listen_options.exit_on_close = false",
tcp_listen_options.recbuf = 196608",
[{rabbit,
[{tcp_listen_options,
- [{backlog,128},{nodelay,true},{sndbuf,196608},{recbuf,196608}]}]}],
+ [{backlog,128},{nodelay,true},{sndbuf,196608},{recbuf,196608}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_listen_options_nodelay_with_kernel,
"tcp_listen_options.backlog = 4096
@@ -205,16 +488,40 @@ tcp_listen_options.exit_on_close = false",
[{kernel,
[{inet_default_connect_options,[{nodelay,true}]},
{inet_default_listen_options,[{nodelay,true}]}]},
- {rabbit,[{tcp_listen_options,[{backlog,4096},{nodelay,true}]}]}],
+ {rabbit,[{tcp_listen_options,[{backlog,4096},{nodelay,true}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_listen_options_nodelay,
"tcp_listen_options.backlog = 4096
tcp_listen_options.nodelay = true",
- [{rabbit,[{tcp_listen_options,[{backlog,4096},{nodelay,true}]}]}],
+ [{rabbit,[{tcp_listen_options,[{backlog,4096},{nodelay,true}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_handshake_timeout,
"ssl_handshake_timeout = 10000",
- [{rabbit,[{ssl_handshake_timeout,10000}]}],
+ [{rabbit,[{ssl_handshake_timeout,10000}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_partition_handling_pause_if_all_down,
"cluster_partition_handling = pause_if_all_down
@@ -227,15 +534,39 @@ tcp_listen_options.exit_on_close = false",
cluster_partition_handling.pause_if_all_down.nodes.2 = rabbit@myhost2",
[{rabbit,
[{cluster_partition_handling,
- {pause_if_all_down,[rabbit@myhost2,rabbit@myhost1],ignore}}]}],
+ {pause_if_all_down,[rabbit@myhost2,rabbit@myhost1],ignore}}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_partition_handling_autoheal,
"cluster_partition_handling = autoheal",
- [{rabbit,[{cluster_partition_handling,autoheal}]}],
+ [{rabbit,[{cluster_partition_handling,autoheal}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{password_hashing,
"password_hashing_module = rabbit_password_hashing_sha512",
- [{rabbit,[{password_hashing_module,rabbit_password_hashing_sha512}]}],
+ [{rabbit,[{password_hashing_module,rabbit_password_hashing_sha512}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_verify_peer,
"listeners.ssl.1 = 5671
@@ -251,7 +582,15 @@ tcp_listen_options.exit_on_close = false",
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
{verify,verify_peer},
- {fail_if_no_peer_cert,false}]}]}],
+ {fail_if_no_peer_cert,false}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_password,
"listeners.ssl.1 = 5671
@@ -265,7 +604,15 @@ tcp_listen_options.exit_on_close = false",
[{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"},
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
- {password,"t0p$3kRe7"}]}]}],
+ {password,"t0p$3kRe7"}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_tls_ver_old,
"listeners.ssl.1 = 5671
@@ -283,7 +630,15 @@ tcp_listen_options.exit_on_close = false",
[{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"},
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
- {versions,['tlsv1.2','tlsv1.1',tlsv1]}]}]}],
+ {versions,['tlsv1.2','tlsv1.1',tlsv1]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_tls_ver_new,
"listeners.ssl.1 = 5671
@@ -300,7 +655,15 @@ tcp_listen_options.exit_on_close = false",
[{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"},
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
- {versions,['tlsv1.2','tlsv1.1']}]}]}],
+ {versions,['tlsv1.2','tlsv1.1']}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_ciphers,
@@ -326,19 +689,27 @@ tcp_listen_options.exit_on_close = false",
{ssl_options,
[{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"},
{ciphers, [
- "DHE-RSA-AES256-GCM-SHA384",
+ "ECDHE-ECDSA-AES256-GCM-SHA384",
+ "ECDHE-RSA-AES256-GCM-SHA384",
+ "ECDHE-ECDSA-AES256-SHA384",
+ "ECDHE-RSA-AES256-SHA384",
"ECDH-ECDSA-AES256-GCM-SHA384",
- "ECDH-ECDSA-AES256-SHA384",
"ECDH-RSA-AES256-GCM-SHA384",
+ "ECDH-ECDSA-AES256-SHA384",
"ECDH-RSA-AES256-SHA384",
- "ECDHE-ECDSA-AES256-GCM-SHA384",
- "ECDHE-ECDSA-AES256-SHA384",
- "ECDHE-RSA-AES256-GCM-SHA384",
- "ECDHE-RSA-AES256-SHA384"
+ "DHE-RSA-AES256-GCM-SHA384"
]},
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
- {versions,['tlsv1.2','tlsv1.1']}]}]}],
+ {versions,['tlsv1.2','tlsv1.1']}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_allow_poodle,
@@ -357,7 +728,15 @@ tcp_listen_options.exit_on_close = false",
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
{verify,verify_peer},
- {fail_if_no_peer_cert,false}]}]}],
+ {fail_if_no_peer_cert,false}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_depth,
"listeners.ssl.1 = 5671
@@ -375,7 +754,15 @@ tcp_listen_options.exit_on_close = false",
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
{depth,2},
{verify,verify_peer},
- {fail_if_no_peer_cert,false}]}]}],
+ {fail_if_no_peer_cert,false}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_honor_cipher_order,
"listeners.ssl.1 = 5671
@@ -395,7 +782,15 @@ tcp_listen_options.exit_on_close = false",
{depth,2},
{verify,verify_peer},
{fail_if_no_peer_cert, false},
- {honor_cipher_order, true}]}]}],
+ {honor_cipher_order, true}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_options_honor_ecc_order,
"listeners.ssl.1 = 5671
@@ -415,29 +810,77 @@ tcp_listen_options.exit_on_close = false",
{depth,2},
{verify,verify_peer},
{fail_if_no_peer_cert, false},
- {honor_ecc_order, true}]}]}],
+ {honor_ecc_order, true}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{ssl_cert_login_from,
"ssl_cert_login_from = common_name",
- [{rabbit,[{ssl_cert_login_from,common_name}]}],
+ [{rabbit,[{ssl_cert_login_from,common_name}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_listen_options_linger_on,
"tcp_listen_options.linger.on = true
tcp_listen_options.linger.timeout = 100",
- [{rabbit,[{tcp_listen_options,[{linger,{true,100}}]}]}],
+ [{rabbit,[{tcp_listen_options,[{linger,{true,100}}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_listen_options_linger_off,
"tcp_listen_options.linger.on = false
tcp_listen_options.linger.timeout = 100",
- [{rabbit,[{tcp_listen_options,[{linger,{false,100}}]}]}],
+ [{rabbit,[{tcp_listen_options,[{linger,{false,100}}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_listen_options_linger_on_notimeout,
"tcp_listen_options.linger.on = true",
- [{rabbit,[{tcp_listen_options,[{linger,{true,0}}]}]}],
+ [{rabbit,[{tcp_listen_options,[{linger,{true,0}}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{tcp_listen_options_linger_timeout,
"tcp_listen_options.linger.timeout = 100",
- [{rabbit,[{tcp_listen_options,[{linger,{false,100}}]}]}],
+ [{rabbit,[{tcp_listen_options,[{linger,{false,100}}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_randomized_startup_delay_both_values,
@@ -445,21 +888,45 @@ tcp_listen_options.exit_on_close = false",
cluster_formation.randomized_startup_delay_range.max = 30",
[{rabbit, [{cluster_formation, [
{randomized_startup_delay_range, {10, 30}}
- ]}]}],
+ ]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_randomized_startup_delay_min_only,
"cluster_formation.randomized_startup_delay_range.min = 10",
[{rabbit, [{cluster_formation, [
{randomized_startup_delay_range, {10, 60}}
- ]}]}],
+ ]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_randomized_startup_delay_max_only,
"cluster_formation.randomized_startup_delay_range.max = 30",
[{rabbit, [{cluster_formation, [
{randomized_startup_delay_range, {5, 30}}
- ]}]}],
+ ]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_dns,
@@ -470,7 +937,15 @@ tcp_listen_options.exit_on_close = false",
[{cluster_formation,
[{peer_discovery_dns,[{hostname,<<"192.168.0.2.xip.io">>}]},
{peer_discovery_backend,rabbit_peer_discovery_dns},
- {node_type,disc}]}]}],
+ {node_type,disc}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_classic,
"cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
@@ -478,7 +953,15 @@ tcp_listen_options.exit_on_close = false",
[{rabbit,
[{cluster_formation,
[{peer_discovery_backend,rabbit_peer_discovery_classic_config},
- {node_type,disc}]}]}],
+ {node_type,disc}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{cluster_formation_classic_ram,
"cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
@@ -486,19 +969,43 @@ tcp_listen_options.exit_on_close = false",
[{rabbit,
[{cluster_formation,
[{peer_discovery_backend,rabbit_peer_discovery_classic_config},
- {node_type,ram}]}]}],
+ {node_type,ram}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{background_gc_enabled,
"background_gc_enabled = true
background_gc_target_interval = 30000",
[{rabbit,
- [{background_gc_enabled,true},{background_gc_target_interval,30000}]}],
+ [{background_gc_enabled,true},{background_gc_target_interval,30000}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{background_gc_disabled,
"background_gc_enabled = false
background_gc_target_interval = 30000",
[{rabbit,
- [{background_gc_enabled,false},{background_gc_target_interval,30000}]}],
+ [{background_gc_enabled,false},{background_gc_target_interval,30000}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{credential_validator_length,
"credential_validator.validation_backend = rabbit_credential_validator_min_password_length
@@ -507,7 +1014,15 @@ credential_validator.min_length = 10",
[{credential_validator,
[{validation_backend,
rabbit_credential_validator_min_password_length},
- {min_length,10}]}]}],
+ {min_length,10}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{credential_validator_regexp,
"credential_validator.validation_backend = rabbit_credential_validator_password_regexp
@@ -515,78 +1030,198 @@ credential_validator.regexp = ^abc\\d+",
[{rabbit,
[{credential_validator,
[{validation_backend,rabbit_credential_validator_password_regexp},
- {regexp,"^abc\\d+"}]}]}],
+ {regexp,"^abc\\d+"}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{proxy_protocol_on,
"proxy_protocol = true",
- [{rabbit,[{proxy_protocol,true}]}],[]},
+ [{rabbit,[{proxy_protocol,true}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],[]},
{proxy_protocol_off,
"proxy_protocol = false",
- [{rabbit,[{proxy_protocol,false}]}],[]},
+ [{rabbit,[{proxy_protocol,false}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],[]},
{log_debug_file,
"log.file.level = debug",
- [{rabbit,[{log, [{file, [{level, debug}]}]}]}],
+ [{rabbit,[{log, [{file, [{level, debug}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_debug_console,
"log.console = true
log.console.level = debug",
- [{rabbit,[{log, [{console, [{enabled, true}, {level, debug}]}]}]}],
+ [{rabbit,[{log, [{console, [{enabled, true}, {level, debug}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_debug_exchange,
"log.exchange = true
log.exchange.level = debug",
- [{rabbit,[{log, [{exchange, [{enabled, true}, {level, debug}]}]}]}],
+ [{rabbit,[{log, [{exchange, [{enabled, true}, {level, debug}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_debug_syslog,
"log.syslog = true
log.syslog.level = debug",
- [{rabbit,[{log, [{syslog, [{enabled, true}, {level, debug}]}]}]}],
+ [{rabbit,[{log, [{syslog, [{enabled, true}, {level, debug}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_file_name,
"log.file = file_name",
- [{rabbit,[{log, [{file, [{file, "file_name"}]}]}]}],
+ [{rabbit,[{log, [{file, [{file, "file_name"}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_file_disabled,
"log.file = false",
- [{rabbit,[{log, [{file, [{file, false}]}]}]}],
+ [{rabbit,[{log, [{file, [{file, false}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_category_level,
"log.connection.level = debug
log.channel.level = error",
[{rabbit,[{log, [{categories, [{connection, [{level, debug}]},
- {channel, [{level, error}]}]}]}]}],
+ {channel, [{level, error}]}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_category_file,
"log.connection.file = file_name_connection
log.channel.file = file_name_channel",
[{rabbit,[{log, [{categories, [{connection, [{file, "file_name_connection"}]},
- {channel, [{file, "file_name_channel"}]}]}]}]}],
+ {channel, [{file, "file_name_channel"}]}]}]}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{delegate_count,
"delegate_count = 64",
[{rabbit, [
{delegate_count, 64}
- ]}],
+ ]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{kernel_net_ticktime,
"net_ticktime = 20",
[{kernel, [
{net_ticktime, 20}
- ]}],
+ ]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{kernel_inet_dist_listen_min,
"inet_dist_listen_min = 16000",
[{kernel, [
{inet_dist_listen_min, 16000}
- ]}],
+ ]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{kernel_inet_dist_listen_max,
"inet_dist_listen_max = 16100",
[{kernel, [
{inet_dist_listen_max, 16100}
- ]}],
+ ]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]},
{log_syslog_settings,
@@ -602,7 +1237,15 @@ credential_validator.regexp = ^abc\\d+",
{facility, user},
{multiline_mode, true},
{dest_host, "10.10.10.10"},
- {dest_port, 123}]}
+ {dest_port, 123}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}
],
[]},
{log_syslog_tcp,
@@ -613,7 +1256,15 @@ credential_validator.regexp = ^abc\\d+",
[
{rabbit,[{log, [{syslog, [{enabled, true}]}]}]},
{syslog, [{protocol, {rfc5424, tcp}},
- {dest_host, "syslog.my-network.com"}]}
+ {dest_host, "syslog.my-network.com"}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}
],
[]},
{log_syslog_udp_default,
@@ -621,7 +1272,15 @@ credential_validator.regexp = ^abc\\d+",
log.syslog.protocol = rfc3164",
[
{rabbit,[{log, [{syslog, [{enabled, true}]}]}]},
- {syslog, [{protocol, {rfc3164, udp}}]}
+ {syslog, [{protocol, {rfc3164, udp}}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}
],
[]},
{log_syslog_tls,
@@ -638,6 +1297,14 @@ credential_validator.regexp = ^abc\\d+",
{fail_if_no_peer_cert,false},
{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"},
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
- {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}]}}]}],
+ {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}]}}]},
+ {sysmon_handler,
+ [{busy_dist_port,true},
+ {busy_port,true},
+ {gc_ms_limit,0},
+ {heap_word_limit,20055500},
+ {port_limit,2},
+ {process_limit,30},
+ {schedule_ms_limit,0}]}],
[]}
].
diff --git a/test/per_vhost_connection_limit_partitions_SUITE.erl b/test/per_vhost_connection_limit_partitions_SUITE.erl
index 051f3f13b7..20adf704a0 100644
--- a/test/per_vhost_connection_limit_partitions_SUITE.erl
+++ b/test/per_vhost_connection_limit_partitions_SUITE.erl
@@ -107,7 +107,7 @@ cluster_full_partition_with_autoheal(Config) ->
Conn4 = open_unmanaged_connection(Config, B),
Conn5 = open_unmanaged_connection(Config, C),
Conn6 = open_unmanaged_connection(Config, C),
- ?assertEqual(6, count_connections_in(Config, VHost)),
+ wait_for_count_connections_in(Config, VHost, 6, 60000),
%% B drops off the network, non-reachable by either A or C
rabbit_ct_broker_helpers:block_traffic_between(A, B),
@@ -115,14 +115,14 @@ cluster_full_partition_with_autoheal(Config) ->
timer:sleep(?DELAY),
%% A and C are still connected, so 4 connections are tracked
- ?assertEqual(4, count_connections_in(Config, VHost)),
+ wait_for_count_connections_in(Config, VHost, 4, 60000),
rabbit_ct_broker_helpers:allow_traffic_between(A, B),
rabbit_ct_broker_helpers:allow_traffic_between(B, C),
timer:sleep(?DELAY),
%% during autoheal B's connections were dropped
- ?assertEqual(4, count_connections_in(Config, VHost)),
+ wait_for_count_connections_in(Config, VHost, 4, 60000),
lists:foreach(fun (Conn) ->
(catch rabbit_ct_client_helpers:close_connection(Conn))
@@ -131,11 +131,22 @@ cluster_full_partition_with_autoheal(Config) ->
passed.
-
%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------
+wait_for_count_connections_in(Config, VHost, Expected, Time) when Time =< 0 ->
+ ?assertEqual(Expected, count_connections_in(Config, VHost));
+wait_for_count_connections_in(Config, VHost, Expected, Time) ->
+ case count_connections_in(Config, VHost) of
+ Expected ->
+ ok;
+ _ ->
+ Sleep = 3000,
+ timer:sleep(Sleep),
+ wait_for_count_connections_in(Config, VHost, Expected, Time - Sleep)
+ end.
+
count_connections_in(Config, VHost) ->
count_connections_in(Config, VHost, 0).
count_connections_in(Config, VHost, NodeIndex) ->
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index bfb9eeec12..b453f5cdb3 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -345,7 +345,7 @@ start_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Test declare an existing queue
?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -361,7 +361,7 @@ start_queue(Config) ->
%% Check that the application and process are still up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
start_queue_concurrent(Config) ->
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -422,13 +422,13 @@ stop_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Delete the quorum queue
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})),
%% Check that the application and process are down
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@@ -447,7 +447,7 @@ restart_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
idempotent_recover(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@@ -526,7 +526,7 @@ restart_all_types(Config) ->
%% Check that the application and two ra nodes are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -568,7 +568,7 @@ stop_start_rabbit_app(Config) ->
%% Check that the application and two ra nodes are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -1265,7 +1265,7 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
[] == rpc:call(Server, supervisor, which_children,
- [ra_server_sup])
+ [ra_server_sup_sup])
end),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh1, 0),
@@ -1302,7 +1302,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) ->
wait_for_cleanup(Server, NCh2, 1),
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh1, 0),
@@ -1966,7 +1966,7 @@ delete_immediately_by_resource(Config) ->
%% Check that the application and process are down
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@@ -2237,7 +2237,8 @@ wait_for_cleanup(Server, Channel, Number) ->
wait_for_cleanup(Server, Channel, Number, 60).
wait_for_cleanup(Server, Channel, Number, 0) ->
- ?assertEqual(Number, length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])));
+ ?assertEqual(length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])),
+ Number);
wait_for_cleanup(Server, Channel, Number, N) ->
case length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])) of
Length when Number == Length ->
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 3263a733a9..0512e8161a 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -55,7 +55,7 @@ init_per_testcase(TestCase, Config) ->
meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end),
meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
fun (_, _) -> ok end),
- ra_server_sup:remove_all(),
+ ra_server_sup_sup:remove_all(),
ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)),
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 1af0d3b4b0..a8604b46af 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -259,7 +259,8 @@ handle_op({input_event, requeue}, #t{effects = Effs} = T) ->
_ ->
T
end;
-handle_op({input_event, Settlement}, #t{effects = Effs} = T) ->
+handle_op({input_event, Settlement}, #t{effects = Effs,
+ down = Down} = T) ->
case queue:out(Effs) of
{{value, {settle, MsgIds, CId}}, Q} ->
Cmd = case Settlement of
@@ -269,7 +270,14 @@ handle_op({input_event, Settlement}, #t{effects = Effs} = T) ->
end,
do_apply(Cmd, T#t{effects = Q});
{{value, Cmd}, Q} when element(1, Cmd) =:= enqueue ->
- do_apply(Cmd, T#t{effects = Q});
+ case maps:is_key(element(2, Cmd), Down) of
+ true ->
+ %% enqueues cannot arrive after down for the same process
+ %% drop message
+ T#t{effects = Q};
+ false ->
+ do_apply(Cmd, T#t{effects = Q})
+ end;
_ ->
T
end;
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
new file mode 100644
index 0000000000..945229e372
--- /dev/null
+++ b/test/single_active_consumer_SUITE.erl
@@ -0,0 +1,286 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(single_active_consumer_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, classic_queue}, {group, quorum_queue}
+ ].
+
+groups() ->
+ [
+ {classic_queue, [], [
+ all_messages_go_to_one_consumer,
+ fallback_to_another_consumer_when_first_one_is_cancelled,
+ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
+ amqp_exclusive_consume_fails_on_exclusive_consumer_queue
+ ]},
+ {quorum_queue, [], [
+ all_messages_go_to_one_consumer,
+ fallback_to_another_consumer_when_first_one_is_cancelled,
+ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled
+ %% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ
+ ]}
+ ].
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(classic_queue, Config) ->
+ [{single_active_consumer_queue_declare,
+ #'queue.declare'{arguments = [
+ {<<"x-single-active-consumer">>, bool, true},
+ {<<"x-queue-type">>, longstr, <<"classic">>}
+ ],
+ auto_delete = true}
+ } | Config];
+init_per_group(quorum_queue, Config) ->
+ [{single_active_consumer_queue_declare,
+ #'queue.declare'{arguments = [
+ {<<"x-single-active-consumer">>, bool, true},
+ {<<"x-queue-type">>, longstr, <<"quorum">>}
+ ],
+ durable = true, exclusive = false, auto_delete = false}
+ } | Config].
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+all_messages_go_to_one_consumer(Config) ->
+ {C, Ch} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ MessageCount = 5,
+ ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]),
+ #'basic.consume_ok'{consumer_tag = CTag1} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+ #'basic.consume_ok'{consumer_tag = CTag2} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount)],
+
+ receive
+ {consumer_done, {MessagesPerConsumer, MessageCount}} ->
+ ?assertEqual(MessageCount, MessageCount),
+ ?assertEqual(2, maps:size(MessagesPerConsumer)),
+ ?assertEqual(MessageCount, maps:get(CTag1, MessagesPerConsumer)),
+ ?assertEqual(0, maps:get(CTag2, MessagesPerConsumer))
+ after 1000 ->
+ throw(failed)
+ end,
+
+ amqp_connection:close(C),
+ ok.
+
+fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
+ {C, Ch} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ MessageCount = 10,
+ ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]),
+ #'basic.consume_ok'{consumer_tag = CTag1} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+ #'basic.consume_ok'{consumer_tag = CTag2} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+ #'basic.consume_ok'{consumer_tag = CTag3} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)],
+
+ {MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2),
+ FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, Count) -> Count > 0 end, MessagesPerConsumer1)),
+ ?assertEqual(1, length(FirstActiveConsumerInList)),
+
+ FirstActiveConsumer = lists:nth(1, FirstActiveConsumerInList),
+ #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = FirstActiveConsumer}),
+
+ {cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(),
+
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)],
+
+ {MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1),
+ SecondActiveConsumerInList = maps:keys(maps:filter(
+ fun(CTag, Count) -> Count > 0 andalso CTag /= FirstActiveConsumer end,
+ MessagesPerConsumer2)
+ ),
+ ?assertEqual(1, length(SecondActiveConsumerInList)),
+ SecondActiveConsumer = lists:nth(1, SecondActiveConsumerInList),
+
+ #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}),
+
+ amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}),
+ wait_for_messages(1),
+
+ LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))),
+
+ receive
+ {consumer_done, {MessagesPerConsumer, MessageCount}} ->
+ ?assertEqual(MessageCount, MessageCount),
+ ?assertEqual(3, maps:size(MessagesPerConsumer)),
+ ?assertEqual(MessageCount div 2, maps:get(FirstActiveConsumer, MessagesPerConsumer)),
+ ?assertEqual(MessageCount div 2 - 1, maps:get(SecondActiveConsumer, MessagesPerConsumer)),
+ ?assertEqual(1, maps:get(LastActiveConsumer, MessagesPerConsumer))
+ after 1000 ->
+ throw(failed)
+ end,
+
+ amqp_connection:close(C),
+ ok.
+
+fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config) ->
+ {C, Ch} = connection_and_channel(Config),
+ {C1, Ch1} = connection_and_channel(Config),
+ {C2, Ch2} = connection_and_channel(Config),
+ {C3, Ch3} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ MessageCount = 10,
+ Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2}]),
+ Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]),
+ Consumer3Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]),
+ #'basic.consume_ok'{consumer_tag = CTag1} =
+ amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"1">>}, Consumer1Pid),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch2, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"2">>}, Consumer2Pid),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch3, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"3">>}, Consumer3Pid),
+
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)],
+
+ {MessagesPerConsumer1, MessageCount1} = consume_results(),
+ ?assertEqual(MessageCount div 2, MessageCount1),
+ ?assertEqual(1, maps:size(MessagesPerConsumer1)),
+ ?assertEqual(MessageCount div 2, maps:get(CTag1, MessagesPerConsumer1)),
+
+ ok = amqp_channel:close(Ch1),
+
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)],
+
+ {MessagesPerConsumer2, MessageCount2} = consume_results(),
+ ?assertEqual(MessageCount div 2 - 1, MessageCount2),
+ ?assertEqual(1, maps:size(MessagesPerConsumer2)),
+
+ ok = amqp_channel:close(Ch2),
+
+ amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"poison">>}),
+
+ {MessagesPerConsumer3, MessageCount3} = consume_results(),
+ ?assertEqual(1, MessageCount3),
+ ?assertEqual(1, maps:size(MessagesPerConsumer3)),
+
+ [amqp_connection:close(Conn) || Conn <- [C1, C2, C3, C]],
+ ok.
+
+amqp_exclusive_consume_fails_on_exclusive_consumer_queue(Config) ->
+ {C, Ch} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 403, _}}, _},
+ amqp_channel:call(Ch, #'basic.consume'{queue = Q, exclusive = true})
+ ),
+ amqp_connection:close(C),
+ ok.
+
+connection_and_channel(Config) ->
+ C = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, Ch} = amqp_connection:open_channel(C),
+ {C, Ch}.
+
+queue_declare(Channel, Config) ->
+ Declare = ?config(single_active_consumer_queue_declare, Config),
+ #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, Declare),
+ Q.
+
+consume({Parent, State, 0}) ->
+ Parent ! {consumer_done, State};
+consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) ->
+ receive
+ #'basic.consume_ok'{consumer_tag = CTag} ->
+ consume({Parent, {maps:put(CTag, 0, MessagesPerConsumer), MessageCount}, CountDown});
+ {#'basic.deliver'{consumer_tag = CTag}, #amqp_msg{payload = <<"poison">>}} ->
+ Parent ! {consumer_done,
+ {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
+ MessageCount + 1}};
+ {#'basic.deliver'{consumer_tag = CTag}, _Content} ->
+ NewState = {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
+ MessageCount + 1},
+ Parent ! {message, NewState},
+ consume({Parent, NewState, CountDown - 1});
+ #'basic.cancel_ok'{consumer_tag = CTag} ->
+ Parent ! {cancel_ok, CTag},
+ consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown});
+ _ ->
+ consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown})
+ after 10000 ->
+ Parent ! {consumer_timeout, {MessagesPerConsumer, MessageCount}},
+ exit(consumer_timeout)
+ end.
+
+consume_results() ->
+ receive
+ {consumer_done, {MessagesPerConsumer, MessageCount}} ->
+ {MessagesPerConsumer, MessageCount};
+ {consumer_timeout, {MessagesPerConsumer, MessageCount}} ->
+ {MessagesPerConsumer, MessageCount};
+ _ ->
+ consume_results()
+ after 1000 ->
+ throw(failed)
+ end.
+
+wait_for_messages(ExpectedCount) ->
+ wait_for_messages(ExpectedCount, {}).
+
+wait_for_messages(0, State) ->
+ State;
+wait_for_messages(ExpectedCount, _) ->
+ receive
+ {message, {MessagesPerConsumer, MessageCount}} ->
+ wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount})
+ after 5000 ->
+ throw(message_waiting_timeout)
+ end.
+
+wait_for_cancel_ok() ->
+ receive
+ {cancel_ok, CTag} ->
+ {cancel_ok, CTag}
+ after 5000 ->
+ throw(consumer_cancel_ok_timeout)
+ end.
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index d8031ce6d7..466df684af 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -25,6 +25,7 @@
-define(TIMEOUT_LIST_OPS_PASS, 5000).
-define(TIMEOUT, 30000).
+-define(TIMEOUT_CHANNEL_EXCEPTION, 5000).
-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
@@ -60,10 +61,16 @@ groups() ->
topic_matching,
{queue_max_length, [], [
{max_length_simple, [], MaxLengthTests},
- {max_length_mirrored, [], MaxLengthTests}]}
+ {max_length_mirrored, [], MaxLengthTests}]},
+ max_message_size
]}
].
+suite() ->
+ [
+ {timetrap, {minutes, 3}}
+ ].
+
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
@@ -1299,6 +1306,74 @@ sync_mirrors(QName, Config) ->
_ -> ok
end.
+gen_binary_mb(N) ->
+ B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>,
+ << B1M || _ <- lists:seq(1, N) >>.
+
+assert_channel_alive(Ch) ->
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>},
+ #amqp_msg{payload = <<"HI">>}).
+
+assert_channel_fail_max_size(Ch, Monitor) ->
+ receive
+ {'DOWN', Monitor, process, Ch,
+ {shutdown,
+ {server_initiated_close, 406, _Error}}} ->
+ ok
+ after ?TIMEOUT_CHANNEL_EXCEPTION ->
+ error({channel_exception_expected, max_message_size})
+ end.
+
+max_message_size(Config) ->
+ Binary2M = gen_binary_mb(2),
+ Binary4M = gen_binary_mb(4),
+ Binary6M = gen_binary_mb(6),
+ Binary10M = gen_binary_mb(10),
+
+ Size2Mb = 1024 * 1024 * 2,
+ Size2Mb = byte_size(Binary2M),
+
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 3]),
+
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+
+ %% Binary is whithin the max size limit
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}),
+ %% The channel process is alive
+ assert_channel_alive(Ch),
+
+ Monitor = monitor(process, Ch),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}),
+ assert_channel_fail_max_size(Ch, Monitor),
+
+ %% increase the limit
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 8]),
+
+ {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary2M}),
+ assert_channel_alive(Ch1),
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary4M}),
+ assert_channel_alive(Ch1),
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary6M}),
+ assert_channel_alive(Ch1),
+
+ Monitor1 = monitor(process, Ch1),
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary10M}),
+ assert_channel_fail_max_size(Ch1, Monitor1),
+
+ %% increase beyond the hard limit
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 600]),
+ Val = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_channel, get_max_message_size, []),
+
+ ?assertEqual(?MAX_MSG_SIZE, Val).
+
%% ---------------------------------------------------------------------------
%% rabbitmqctl helpers.
%% ---------------------------------------------------------------------------
diff --git a/test/unit_queue_consumers_SUITE.erl b/test/unit_queue_consumers_SUITE.erl
new file mode 100644
index 0000000000..08d12e7ec5
--- /dev/null
+++ b/test/unit_queue_consumers_SUITE.erl
@@ -0,0 +1,102 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(unit_queue_consumers_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ is_same,
+ get_consumer,
+ get
+ ].
+
+is_same(_Config) ->
+ ?assertEqual(
+ true,
+ rabbit_queue_consumers:is_same(
+ self(), <<"1">>,
+ consumer(self(), <<"1">>)
+ )),
+ ?assertEqual(
+ false,
+ rabbit_queue_consumers:is_same(
+ self(), <<"1">>,
+ consumer(self(), <<"2">>)
+ )),
+ Pid = spawn(?MODULE, function_for_process, []),
+ Pid ! whatever,
+ ?assertEqual(
+ false,
+ rabbit_queue_consumers:is_same(
+ self(), <<"1">>,
+ consumer(Pid, <<"1">>)
+ )),
+ ok.
+
+get(_Config) ->
+ Pid = spawn(?MODULE, function_for_process, []),
+ Pid ! whatever,
+ State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])),
+ {Pid, {consumer, <<"2">>, _, _, _, _}} =
+ rabbit_queue_consumers:get(Pid, <<"2">>, State),
+ ?assertEqual(
+ undefined,
+ rabbit_queue_consumers:get(self(), <<"2">>, State)
+ ),
+ ?assertEqual(
+ undefined,
+ rabbit_queue_consumers:get(Pid, <<"1">>, State)
+ ),
+ ok.
+
+get_consumer(_Config) ->
+ Pid = spawn(unit_queue_consumers_SUITE, function_for_process, []),
+ Pid ! whatever,
+ State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])),
+ {_Pid, {consumer, _, _, _, _, _}} =
+ rabbit_queue_consumers:get_consumer(State),
+ ?assertEqual(
+ undefined,
+ rabbit_queue_consumers:get_consumer(state(consumers([])))
+ ),
+ ok.
+
+consumers([]) ->
+ priority_queue:new();
+consumers(Consumers) ->
+ consumers(Consumers, priority_queue:new()).
+
+consumers([H], Q) ->
+ priority_queue:in(H, Q);
+consumers([H | T], Q) ->
+ consumers(T, priority_queue:in(H, Q)).
+
+
+consumer(Pid, ConsumerTag) ->
+ {Pid, {consumer, ConsumerTag, true, 1, [], <<"guest">>}}.
+
+state(Consumers) ->
+ {state, Consumers, {}}.
+
+function_for_process() ->
+ receive
+ _ -> ok
+ end. \ No newline at end of file