diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-01-10 08:59:41 +0000 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-01-10 08:59:41 +0000 |
| commit | f1f2cac56844224ea6eaeedc911ede90d53f1f1c (patch) | |
| tree | 91e70a0a8b2b905caeab961d4f36ebfea2f7f3d3 /test | |
| parent | 0a254da651d6ff34b5ef35daf14cb31e0dafda88 (diff) | |
| parent | 7a4b4eb3ba1a66cb67d66ce8d09df5f02ac54f9b (diff) | |
| download | rabbitmq-server-git-f1f2cac56844224ea6eaeedc911ede90d53f1f1c.tar.gz | |
Merge remote-tracking branch 'origin/master' into qq-testing
Diffstat (limited to 'test')
| -rw-r--r-- | test/config_schema_SUITE_data/rabbit.snippets | 845 | ||||
| -rw-r--r-- | test/per_vhost_connection_limit_partitions_SUITE.erl | 19 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 23 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 12 | ||||
| -rw-r--r-- | test/single_active_consumer_SUITE.erl | 286 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 77 | ||||
| -rw-r--r-- | test/unit_queue_consumers_SUITE.erl | 102 |
8 files changed, 1258 insertions, 108 deletions
diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets index b318adaa12..a7a06aaadf 100644 --- a/test/config_schema_SUITE_data/rabbit.snippets +++ b/test/config_schema_SUITE_data/rabbit.snippets @@ -1,24 +1,56 @@ [{internal_auth_backend, "auth_backends.1 = internal", - [{rabbit,[{auth_backends,[rabbit_auth_backend_internal]}]}], + [{rabbit,[{auth_backends,[rabbit_auth_backend_internal]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {ldap_auth_backend, "auth_backends.1 = ldap", - [{rabbit,[{auth_backends,[rabbit_auth_backend_ldap]}]}], + [{rabbit,[{auth_backends,[rabbit_auth_backend_ldap]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {multiple_auth_backends, "auth_backends.1 = ldap auth_backends.2 = internal", [{rabbit, [{auth_backends, - [rabbit_auth_backend_ldap,rabbit_auth_backend_internal]}]}], + [rabbit_auth_backend_ldap,rabbit_auth_backend_internal]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {full_name_auth_backend, "auth_backends.1 = ldap # uses module name instead of a short alias, \"http\" auth_backends.2 = rabbit_auth_backend_http", [{rabbit, - [{auth_backends,[rabbit_auth_backend_ldap,rabbit_auth_backend_http]}]}], + [{auth_backends,[rabbit_auth_backend_ldap,rabbit_auth_backend_http]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {third_party_auth_backend, "auth_backends.1.authn = internal @@ -26,14 +58,30 @@ auth_backends.2 = rabbit_auth_backend_http", auth_backends.1.authz = rabbit_auth_backend_ip_range", [{rabbit, [{auth_backends, - [{rabbit_auth_backend_internal,rabbit_auth_backend_ip_range}]}]}], + [{rabbit_auth_backend_internal,rabbit_auth_backend_ip_range}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {authn_authz_backend, "auth_backends.1.authn = ldap auth_backends.1.authz = internal", [{rabbit, [{auth_backends, - [{rabbit_auth_backend_ldap,rabbit_auth_backend_internal}]}]}], + [{rabbit_auth_backend_ldap,rabbit_auth_backend_internal}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {authn_authz_multiple_backends, "auth_backends.1.authn = ldap @@ -42,13 +90,29 @@ auth_backends.2 = internal", [{rabbit, [{auth_backends, [{rabbit_auth_backend_ldap,rabbit_auth_backend_internal}, - rabbit_auth_backend_internal]}]}], + rabbit_auth_backend_internal]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {authn_backend_only, "auth_backends.1.authn = ldap", [{rabbit, [{auth_backends, - [{rabbit_auth_backend_ldap,rabbit_auth_backend_ldap}]}]}], + [{rabbit_auth_backend_ldap,rabbit_auth_backend_ldap}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {ssl_options, "ssl_options.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem @@ -62,15 +126,50 @@ ssl_options.fail_if_no_peer_cert = true", {certfile,"test/config_schema_SUITE_data/certs/cert.pem"}, {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}, {verify,verify_peer}, - {fail_if_no_peer_cert,true}]}]}], + {fail_if_no_peer_cert,true}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {tcp_listener, "listeners.tcp.default = 5673", - [{rabbit,[{tcp_listeners,[5673]}]}],[]}, + [{rabbit,[{tcp_listeners,[5673]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], + []}, {ssl_listener, - "listeners.ssl = none",[{rabbit,[{ssl_listeners,[]}]}],[]}, + "listeners.ssl = none",[{rabbit,[{ssl_listeners,[]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], + []}, {num_acceptors, - "num_acceptors.ssl = 1",[{rabbit,[{num_ssl_acceptors,1}]}],[]}, + "num_acceptors.ssl = 1",[{rabbit,[{num_ssl_acceptors,1}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], + []}, {default_user_settings, "default_user = guest default_pass = guest @@ -82,7 +181,15 @@ default_permissions.write = .*", [{default_user,<<"guest">>}, {default_pass,<<"guest">>}, {default_user_tags,[administrator]}, - {default_permissions,[<<".*">>,<<".*">>,<<".*">>]}]}], + {default_permissions,[<<".*">>,<<".*">>,<<".*">>]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {cluster_formation, "cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config @@ -93,7 +200,15 @@ cluster_formation.node_type = disc", [{cluster_formation, [{peer_discovery_backend,rabbit_peer_discovery_classic_config}, {node_type,disc}]}, - {cluster_nodes,{[rabbit@hostname2,rabbit@hostname1],disc}}]}], + {cluster_nodes,{[rabbit@hostname2,rabbit@hostname1],disc}}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {cluster_formation_disK, "cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config @@ -104,88 +219,248 @@ cluster_formation.node_type = disc", [{cluster_formation, [{peer_discovery_backend,rabbit_peer_discovery_classic_config}, {node_type,disc}]}, - {cluster_nodes,{[rabbit@hostname2,rabbit@hostname1],disc}}]}], + {cluster_nodes,{[rabbit@hostname2,rabbit@hostname1],disc}}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {cluster_formation_ram_ignored, - "cluster_formation.node_type = ram",[],[]}, + "cluster_formation.node_type = ram",[ + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}],[]}, {tcp_listen_options, "tcp_listen_options.backlog = 128 tcp_listen_options.nodelay = true tcp_listen_options.exit_on_close = false", [{rabbit, [{tcp_listen_options, - [{backlog,128},{nodelay,true},{exit_on_close,false}]}]}], + [{backlog,128},{nodelay,true},{exit_on_close,false}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {vm_memory_watermark_absolute, "vm_memory_high_watermark.absolute = 1073741824", - [{rabbit,[{vm_memory_high_watermark,{absolute,1073741824}}]}], + [{rabbit,[{vm_memory_high_watermark,{absolute,1073741824}}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {vm_memory_watermark_absolute_units, "vm_memory_high_watermark.absolute = 1024MB", - [{rabbit,[{vm_memory_high_watermark,{absolute,"1024MB"}}]}], + [{rabbit,[{vm_memory_high_watermark,{absolute,"1024MB"}}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {vm_memory_watermark_paging_ratio, "vm_memory_high_watermark_paging_ratio = 0.75 vm_memory_high_watermark.relative = 0.4", [{rabbit, [{vm_memory_high_watermark_paging_ratio,0.75}, - {vm_memory_high_watermark,0.4}]}], + {vm_memory_high_watermark,0.4}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {memory_monitor_interval, "memory_monitor_interval = 5000", [{rabbit, - [{memory_monitor_interval, 5000}]}], + [{memory_monitor_interval, 5000}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {vm_memory_calculation_strategy, "vm_memory_calculation_strategy = rss", [{rabbit, - [{vm_memory_calculation_strategy, rss}]}], + [{vm_memory_calculation_strategy, rss}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {vm_memory_calculation_strategy, "vm_memory_calculation_strategy = erlang", [{rabbit, - [{vm_memory_calculation_strategy, erlang}]}], + [{vm_memory_calculation_strategy, erlang}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {vm_memory_calculation_strategy, "vm_memory_calculation_strategy = allocated", [{rabbit, - [{vm_memory_calculation_strategy, allocated}]}], + [{vm_memory_calculation_strategy, allocated}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {vm_memory_calculation_strategy, "vm_memory_calculation_strategy = legacy", [{rabbit, - [{vm_memory_calculation_strategy, legacy}]}], + [{vm_memory_calculation_strategy, legacy}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {total_memory_available_override_value, "total_memory_available_override_value = 1000000000", - [{rabbit,[{total_memory_available_override_value, 1000000000}]}], + [{rabbit,[{total_memory_available_override_value, 1000000000}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {total_memory_available_override_value_units, "total_memory_available_override_value = 1024MB", - [{rabbit,[{total_memory_available_override_value, "1024MB"}]}], + [{rabbit,[{total_memory_available_override_value, "1024MB"}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {connection_max, "connection_max = 999", - [{rabbit,[{connection_max, 999}]}], + [{rabbit,[{connection_max, 999}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {connection_max, "connection_max = infinity", - [{rabbit,[{connection_max, infinity}]}], + [{rabbit,[{connection_max, infinity}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {channel_max, "channel_max = 16", - [{rabbit,[{channel_max, 16}]}], + [{rabbit,[{channel_max, 16}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {listeners_tcp_ip, "listeners.tcp.1 = 192.168.1.99:5672", - [{rabbit,[{tcp_listeners,[{"192.168.1.99",5672}]}]}], + [{rabbit,[{tcp_listeners,[{"192.168.1.99",5672}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {listeners_tcp_ip_multiple, "listeners.tcp.1 = 127.0.0.1:5672 listeners.tcp.2 = ::1:5672", - [{rabbit,[{tcp_listeners,[{"127.0.0.1",5672},{"::1",5672}]}]}], + [{rabbit,[{tcp_listeners,[{"127.0.0.1",5672},{"::1",5672}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {listeners_tcp_ip_all,"listeners.tcp.1 = :::5672", - [{rabbit,[{tcp_listeners,[{"::",5672}]}]}], + [{rabbit,[{tcp_listeners,[{"::",5672}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {listeners_tcp_ipv6, "listeners.tcp.1 = fe80::2acf:e9ff:fe17:f97b:5672", - [{rabbit,[{tcp_listeners,[{"fe80::2acf:e9ff:fe17:f97b",5672}]}]}], + [{rabbit,[{tcp_listeners,[{"fe80::2acf:e9ff:fe17:f97b",5672}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {tcp_options_sndbuf, "tcp_listen_options.backlog = 128 @@ -194,7 +469,15 @@ tcp_listen_options.exit_on_close = false", tcp_listen_options.recbuf = 196608", [{rabbit, [{tcp_listen_options, - [{backlog,128},{nodelay,true},{sndbuf,196608},{recbuf,196608}]}]}], + [{backlog,128},{nodelay,true},{sndbuf,196608},{recbuf,196608}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {tcp_listen_options_nodelay_with_kernel, "tcp_listen_options.backlog = 4096 @@ -205,16 +488,40 @@ tcp_listen_options.exit_on_close = false", [{kernel, [{inet_default_connect_options,[{nodelay,true}]}, {inet_default_listen_options,[{nodelay,true}]}]}, - {rabbit,[{tcp_listen_options,[{backlog,4096},{nodelay,true}]}]}], + {rabbit,[{tcp_listen_options,[{backlog,4096},{nodelay,true}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {tcp_listen_options_nodelay, "tcp_listen_options.backlog = 4096 tcp_listen_options.nodelay = true", - [{rabbit,[{tcp_listen_options,[{backlog,4096},{nodelay,true}]}]}], + [{rabbit,[{tcp_listen_options,[{backlog,4096},{nodelay,true}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {ssl_handshake_timeout, "ssl_handshake_timeout = 10000", - [{rabbit,[{ssl_handshake_timeout,10000}]}], + [{rabbit,[{ssl_handshake_timeout,10000}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {cluster_partition_handling_pause_if_all_down, "cluster_partition_handling = pause_if_all_down @@ -227,15 +534,39 @@ tcp_listen_options.exit_on_close = false", cluster_partition_handling.pause_if_all_down.nodes.2 = rabbit@myhost2", [{rabbit, [{cluster_partition_handling, - {pause_if_all_down,[rabbit@myhost2,rabbit@myhost1],ignore}}]}], + {pause_if_all_down,[rabbit@myhost2,rabbit@myhost1],ignore}}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {cluster_partition_handling_autoheal, "cluster_partition_handling = autoheal", - [{rabbit,[{cluster_partition_handling,autoheal}]}], + [{rabbit,[{cluster_partition_handling,autoheal}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {password_hashing, "password_hashing_module = rabbit_password_hashing_sha512", - [{rabbit,[{password_hashing_module,rabbit_password_hashing_sha512}]}], + [{rabbit,[{password_hashing_module,rabbit_password_hashing_sha512}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {ssl_options_verify_peer, "listeners.ssl.1 = 5671 @@ -251,7 +582,15 @@ tcp_listen_options.exit_on_close = false", {certfile,"test/config_schema_SUITE_data/certs/cert.pem"}, {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}, {verify,verify_peer}, - {fail_if_no_peer_cert,false}]}]}], + {fail_if_no_peer_cert,false}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {ssl_options_password, "listeners.ssl.1 = 5671 @@ -265,7 +604,15 @@ tcp_listen_options.exit_on_close = false", [{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"}, {certfile,"test/config_schema_SUITE_data/certs/cert.pem"}, {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}, - {password,"t0p$3kRe7"}]}]}], + {password,"t0p$3kRe7"}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {ssl_options_tls_ver_old, "listeners.ssl.1 = 5671 @@ -283,7 +630,15 @@ tcp_listen_options.exit_on_close = false", [{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"}, {certfile,"test/config_schema_SUITE_data/certs/cert.pem"}, {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}, - {versions,['tlsv1.2','tlsv1.1',tlsv1]}]}]}], + {versions,['tlsv1.2','tlsv1.1',tlsv1]}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {ssl_options_tls_ver_new, "listeners.ssl.1 = 5671 @@ -300,7 +655,15 @@ tcp_listen_options.exit_on_close = false", [{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"}, {certfile,"test/config_schema_SUITE_data/certs/cert.pem"}, {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}, - {versions,['tlsv1.2','tlsv1.1']}]}]}], + {versions,['tlsv1.2','tlsv1.1']}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {ssl_options_ciphers, @@ -326,19 +689,27 @@ tcp_listen_options.exit_on_close = false", {ssl_options, [{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"}, {ciphers, [ - "DHE-RSA-AES256-GCM-SHA384", + "ECDHE-ECDSA-AES256-GCM-SHA384", + "ECDHE-RSA-AES256-GCM-SHA384", + "ECDHE-ECDSA-AES256-SHA384", + "ECDHE-RSA-AES256-SHA384", "ECDH-ECDSA-AES256-GCM-SHA384", - "ECDH-ECDSA-AES256-SHA384", "ECDH-RSA-AES256-GCM-SHA384", + "ECDH-ECDSA-AES256-SHA384", "ECDH-RSA-AES256-SHA384", - "ECDHE-ECDSA-AES256-GCM-SHA384", - "ECDHE-ECDSA-AES256-SHA384", - "ECDHE-RSA-AES256-GCM-SHA384", - "ECDHE-RSA-AES256-SHA384" + "DHE-RSA-AES256-GCM-SHA384" ]}, {certfile,"test/config_schema_SUITE_data/certs/cert.pem"}, {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}, - {versions,['tlsv1.2','tlsv1.1']}]}]}], + {versions,['tlsv1.2','tlsv1.1']}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {ssl_options_allow_poodle, @@ -357,7 +728,15 @@ tcp_listen_options.exit_on_close = false", {certfile,"test/config_schema_SUITE_data/certs/cert.pem"}, {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}, {verify,verify_peer}, - {fail_if_no_peer_cert,false}]}]}], + {fail_if_no_peer_cert,false}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {ssl_options_depth, "listeners.ssl.1 = 5671 @@ -375,7 +754,15 @@ tcp_listen_options.exit_on_close = false", {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}, {depth,2}, {verify,verify_peer}, - {fail_if_no_peer_cert,false}]}]}], + {fail_if_no_peer_cert,false}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {ssl_options_honor_cipher_order, "listeners.ssl.1 = 5671 @@ -395,7 +782,15 @@ tcp_listen_options.exit_on_close = false", {depth,2}, {verify,verify_peer}, {fail_if_no_peer_cert, false}, - {honor_cipher_order, true}]}]}], + {honor_cipher_order, true}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {ssl_options_honor_ecc_order, "listeners.ssl.1 = 5671 @@ -415,29 +810,77 @@ tcp_listen_options.exit_on_close = false", {depth,2}, {verify,verify_peer}, {fail_if_no_peer_cert, false}, - {honor_ecc_order, true}]}]}], + {honor_ecc_order, true}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {ssl_cert_login_from, "ssl_cert_login_from = common_name", - [{rabbit,[{ssl_cert_login_from,common_name}]}], + [{rabbit,[{ssl_cert_login_from,common_name}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {tcp_listen_options_linger_on, "tcp_listen_options.linger.on = true tcp_listen_options.linger.timeout = 100", - [{rabbit,[{tcp_listen_options,[{linger,{true,100}}]}]}], + [{rabbit,[{tcp_listen_options,[{linger,{true,100}}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {tcp_listen_options_linger_off, "tcp_listen_options.linger.on = false tcp_listen_options.linger.timeout = 100", - [{rabbit,[{tcp_listen_options,[{linger,{false,100}}]}]}], + [{rabbit,[{tcp_listen_options,[{linger,{false,100}}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {tcp_listen_options_linger_on_notimeout, "tcp_listen_options.linger.on = true", - [{rabbit,[{tcp_listen_options,[{linger,{true,0}}]}]}], + [{rabbit,[{tcp_listen_options,[{linger,{true,0}}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {tcp_listen_options_linger_timeout, "tcp_listen_options.linger.timeout = 100", - [{rabbit,[{tcp_listen_options,[{linger,{false,100}}]}]}], + [{rabbit,[{tcp_listen_options,[{linger,{false,100}}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {cluster_formation_randomized_startup_delay_both_values, @@ -445,21 +888,45 @@ tcp_listen_options.exit_on_close = false", cluster_formation.randomized_startup_delay_range.max = 30", [{rabbit, [{cluster_formation, [ {randomized_startup_delay_range, {10, 30}} - ]}]}], + ]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {cluster_formation_randomized_startup_delay_min_only, "cluster_formation.randomized_startup_delay_range.min = 10", [{rabbit, [{cluster_formation, [ {randomized_startup_delay_range, {10, 60}} - ]}]}], + ]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {cluster_formation_randomized_startup_delay_max_only, "cluster_formation.randomized_startup_delay_range.max = 30", [{rabbit, [{cluster_formation, [ {randomized_startup_delay_range, {5, 30}} - ]}]}], + ]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {cluster_formation_dns, @@ -470,7 +937,15 @@ tcp_listen_options.exit_on_close = false", [{cluster_formation, [{peer_discovery_dns,[{hostname,<<"192.168.0.2.xip.io">>}]}, {peer_discovery_backend,rabbit_peer_discovery_dns}, - {node_type,disc}]}]}], + {node_type,disc}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {cluster_formation_classic, "cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config @@ -478,7 +953,15 @@ tcp_listen_options.exit_on_close = false", [{rabbit, [{cluster_formation, [{peer_discovery_backend,rabbit_peer_discovery_classic_config}, - {node_type,disc}]}]}], + {node_type,disc}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {cluster_formation_classic_ram, "cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config @@ -486,19 +969,43 @@ tcp_listen_options.exit_on_close = false", [{rabbit, [{cluster_formation, [{peer_discovery_backend,rabbit_peer_discovery_classic_config}, - {node_type,ram}]}]}], + {node_type,ram}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {background_gc_enabled, "background_gc_enabled = true background_gc_target_interval = 30000", [{rabbit, - [{background_gc_enabled,true},{background_gc_target_interval,30000}]}], + [{background_gc_enabled,true},{background_gc_target_interval,30000}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {background_gc_disabled, "background_gc_enabled = false background_gc_target_interval = 30000", [{rabbit, - [{background_gc_enabled,false},{background_gc_target_interval,30000}]}], + [{background_gc_enabled,false},{background_gc_target_interval,30000}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {credential_validator_length, "credential_validator.validation_backend = rabbit_credential_validator_min_password_length @@ -507,7 +1014,15 @@ credential_validator.min_length = 10", [{credential_validator, [{validation_backend, rabbit_credential_validator_min_password_length}, - {min_length,10}]}]}], + {min_length,10}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {credential_validator_regexp, "credential_validator.validation_backend = rabbit_credential_validator_password_regexp @@ -515,78 +1030,198 @@ credential_validator.regexp = ^abc\\d+", [{rabbit, [{credential_validator, [{validation_backend,rabbit_credential_validator_password_regexp}, - {regexp,"^abc\\d+"}]}]}], + {regexp,"^abc\\d+"}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {proxy_protocol_on, "proxy_protocol = true", - [{rabbit,[{proxy_protocol,true}]}],[]}, + [{rabbit,[{proxy_protocol,true}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}],[]}, {proxy_protocol_off, "proxy_protocol = false", - [{rabbit,[{proxy_protocol,false}]}],[]}, + [{rabbit,[{proxy_protocol,false}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}],[]}, {log_debug_file, "log.file.level = debug", - [{rabbit,[{log, [{file, [{level, debug}]}]}]}], + [{rabbit,[{log, [{file, [{level, debug}]}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {log_debug_console, "log.console = true log.console.level = debug", - [{rabbit,[{log, [{console, [{enabled, true}, {level, debug}]}]}]}], + [{rabbit,[{log, [{console, [{enabled, true}, {level, debug}]}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {log_debug_exchange, "log.exchange = true log.exchange.level = debug", - [{rabbit,[{log, [{exchange, [{enabled, true}, {level, debug}]}]}]}], + [{rabbit,[{log, [{exchange, [{enabled, true}, {level, debug}]}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {log_debug_syslog, "log.syslog = true log.syslog.level = debug", - [{rabbit,[{log, [{syslog, [{enabled, true}, {level, debug}]}]}]}], + [{rabbit,[{log, [{syslog, [{enabled, true}, {level, debug}]}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {log_file_name, "log.file = file_name", - [{rabbit,[{log, [{file, [{file, "file_name"}]}]}]}], + [{rabbit,[{log, [{file, [{file, "file_name"}]}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {log_file_disabled, "log.file = false", - [{rabbit,[{log, [{file, [{file, false}]}]}]}], + [{rabbit,[{log, [{file, [{file, false}]}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {log_category_level, "log.connection.level = debug log.channel.level = error", [{rabbit,[{log, [{categories, [{connection, [{level, debug}]}, - {channel, [{level, error}]}]}]}]}], + {channel, [{level, error}]}]}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {log_category_file, "log.connection.file = file_name_connection log.channel.file = file_name_channel", [{rabbit,[{log, [{categories, [{connection, [{file, "file_name_connection"}]}, - {channel, [{file, "file_name_channel"}]}]}]}]}], + {channel, [{file, "file_name_channel"}]}]}]}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {delegate_count, "delegate_count = 64", [{rabbit, [ {delegate_count, 64} - ]}], + ]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {kernel_net_ticktime, "net_ticktime = 20", [{kernel, [ {net_ticktime, 20} - ]}], + ]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {kernel_inet_dist_listen_min, "inet_dist_listen_min = 16000", [{kernel, [ {inet_dist_listen_min, 16000} - ]}], + ]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {kernel_inet_dist_listen_max, "inet_dist_listen_max = 16100", [{kernel, [ {inet_dist_listen_max, 16100} - ]}], + ]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []}, {log_syslog_settings, @@ -602,7 +1237,15 @@ credential_validator.regexp = ^abc\\d+", {facility, user}, {multiline_mode, true}, {dest_host, "10.10.10.10"}, - {dest_port, 123}]} + {dest_port, 123}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]} ], []}, {log_syslog_tcp, @@ -613,7 +1256,15 @@ credential_validator.regexp = ^abc\\d+", [ {rabbit,[{log, [{syslog, [{enabled, true}]}]}]}, {syslog, [{protocol, {rfc5424, tcp}}, - {dest_host, "syslog.my-network.com"}]} + {dest_host, "syslog.my-network.com"}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]} ], []}, {log_syslog_udp_default, @@ -621,7 +1272,15 @@ credential_validator.regexp = ^abc\\d+", log.syslog.protocol = rfc3164", [ {rabbit,[{log, [{syslog, [{enabled, true}]}]}]}, - {syslog, [{protocol, {rfc3164, udp}}]} + {syslog, [{protocol, {rfc3164, udp}}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]} ], []}, {log_syslog_tls, @@ -638,6 +1297,14 @@ credential_validator.regexp = ^abc\\d+", {fail_if_no_peer_cert,false}, {cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"}, {certfile,"test/config_schema_SUITE_data/certs/cert.pem"}, - {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}]}}]}], + {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}]}}]}, + {sysmon_handler, + [{busy_dist_port,true}, + {busy_port,true}, + {gc_ms_limit,0}, + {heap_word_limit,20055500}, + {port_limit,2}, + {process_limit,30}, + {schedule_ms_limit,0}]}], []} ]. diff --git a/test/per_vhost_connection_limit_partitions_SUITE.erl b/test/per_vhost_connection_limit_partitions_SUITE.erl index 051f3f13b7..20adf704a0 100644 --- a/test/per_vhost_connection_limit_partitions_SUITE.erl +++ b/test/per_vhost_connection_limit_partitions_SUITE.erl @@ -107,7 +107,7 @@ cluster_full_partition_with_autoheal(Config) -> Conn4 = open_unmanaged_connection(Config, B), Conn5 = open_unmanaged_connection(Config, C), Conn6 = open_unmanaged_connection(Config, C), - ?assertEqual(6, count_connections_in(Config, VHost)), + wait_for_count_connections_in(Config, VHost, 6, 60000), %% B drops off the network, non-reachable by either A or C rabbit_ct_broker_helpers:block_traffic_between(A, B), @@ -115,14 +115,14 @@ cluster_full_partition_with_autoheal(Config) -> timer:sleep(?DELAY), %% A and C are still connected, so 4 connections are tracked - ?assertEqual(4, count_connections_in(Config, VHost)), + wait_for_count_connections_in(Config, VHost, 4, 60000), rabbit_ct_broker_helpers:allow_traffic_between(A, B), rabbit_ct_broker_helpers:allow_traffic_between(B, C), timer:sleep(?DELAY), %% during autoheal B's connections were dropped - ?assertEqual(4, count_connections_in(Config, VHost)), + wait_for_count_connections_in(Config, VHost, 4, 60000), lists:foreach(fun (Conn) -> (catch rabbit_ct_client_helpers:close_connection(Conn)) @@ -131,11 +131,22 @@ cluster_full_partition_with_autoheal(Config) -> passed. - %% ------------------------------------------------------------------- %% Helpers %% ------------------------------------------------------------------- +wait_for_count_connections_in(Config, VHost, Expected, Time) when Time =< 0 -> + ?assertEqual(Expected, count_connections_in(Config, VHost)); +wait_for_count_connections_in(Config, VHost, Expected, Time) -> + case count_connections_in(Config, VHost) of + Expected -> + ok; + _ -> + Sleep = 3000, + timer:sleep(Sleep), + wait_for_count_connections_in(Config, VHost, Expected, Time - Sleep) + end. + count_connections_in(Config, VHost) -> count_connections_in(Config, VHost, 0). count_connections_in(Config, VHost, NodeIndex) -> diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index bfb9eeec12..b453f5cdb3 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -345,7 +345,7 @@ start_queue(Config) -> %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Test declare an existing queue ?assertEqual({'queue.declare_ok', LQ, 0, 0}, @@ -361,7 +361,7 @@ start_queue(Config) -> %% Check that the application and process are still up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])). + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])). start_queue_concurrent(Config) -> Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -422,13 +422,13 @@ stop_queue(Config) -> %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Delete the quorum queue ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})), %% Check that the application and process are down wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) end), ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))). @@ -447,7 +447,7 @@ restart_queue(Config) -> %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])). + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])). idempotent_recover(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -526,7 +526,7 @@ restart_all_types(Config) -> %% Check that the application and two ra nodes are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Check the classic queues restarted correctly Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), {#'basic.get_ok'{}, #amqp_msg{}} = @@ -568,7 +568,7 @@ stop_start_rabbit_app(Config) -> %% Check that the application and two ra nodes are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Check the classic queues restarted correctly Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), {#'basic.get_ok'{}, #amqp_msg{}} = @@ -1265,7 +1265,7 @@ cleanup_queue_state_on_channel_after_publish(Config) -> amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), wait_until(fun() -> [] == rpc:call(Server, supervisor, which_children, - [ra_server_sup]) + [ra_server_sup_sup]) end), %% Check that all queue states have been cleaned wait_for_cleanup(Server, NCh1, 0), @@ -1302,7 +1302,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) -> wait_for_cleanup(Server, NCh2, 1), ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) end), %% Check that all queue states have been cleaned wait_for_cleanup(Server, NCh1, 0), @@ -1966,7 +1966,7 @@ delete_immediately_by_resource(Config) -> %% Check that the application and process are down wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) end), ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))). @@ -2237,7 +2237,8 @@ wait_for_cleanup(Server, Channel, Number) -> wait_for_cleanup(Server, Channel, Number, 60). wait_for_cleanup(Server, Channel, Number, 0) -> - ?assertEqual(Number, length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel]))); + ?assertEqual(length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])), + Number); wait_for_cleanup(Server, Channel, Number, N) -> case length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])) of Length when Number == Length -> diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 3263a733a9..0512e8161a 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -55,7 +55,7 @@ init_per_testcase(TestCase, Config) -> meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end), meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end), - ra_server_sup:remove_all(), + ra_server_sup_sup:remove_all(), ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"), ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"), ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)), diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 1af0d3b4b0..a8604b46af 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -259,7 +259,8 @@ handle_op({input_event, requeue}, #t{effects = Effs} = T) -> _ -> T end; -handle_op({input_event, Settlement}, #t{effects = Effs} = T) -> +handle_op({input_event, Settlement}, #t{effects = Effs, + down = Down} = T) -> case queue:out(Effs) of {{value, {settle, MsgIds, CId}}, Q} -> Cmd = case Settlement of @@ -269,7 +270,14 @@ handle_op({input_event, Settlement}, #t{effects = Effs} = T) -> end, do_apply(Cmd, T#t{effects = Q}); {{value, Cmd}, Q} when element(1, Cmd) =:= enqueue -> - do_apply(Cmd, T#t{effects = Q}); + case maps:is_key(element(2, Cmd), Down) of + true -> + %% enqueues cannot arrive after down for the same process + %% drop message + T#t{effects = Q}; + false -> + do_apply(Cmd, T#t{effects = Q}) + end; _ -> T end; diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl new file mode 100644 index 0000000000..945229e372 --- /dev/null +++ b/test/single_active_consumer_SUITE.erl @@ -0,0 +1,286 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% + +-module(single_active_consumer_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +all() -> + [ + {group, classic_queue}, {group, quorum_queue} + ]. + +groups() -> + [ + {classic_queue, [], [ + all_messages_go_to_one_consumer, + fallback_to_another_consumer_when_first_one_is_cancelled, + fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled, + amqp_exclusive_consume_fails_on_exclusive_consumer_queue + ]}, + {quorum_queue, [], [ + all_messages_go_to_one_consumer, + fallback_to_another_consumer_when_first_one_is_cancelled, + fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled + %% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ + ]} + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(classic_queue, Config) -> + [{single_active_consumer_queue_declare, + #'queue.declare'{arguments = [ + {<<"x-single-active-consumer">>, bool, true}, + {<<"x-queue-type">>, longstr, <<"classic">>} + ], + auto_delete = true} + } | Config]; +init_per_group(quorum_queue, Config) -> + [{single_active_consumer_queue_declare, + #'queue.declare'{arguments = [ + {<<"x-single-active-consumer">>, bool, true}, + {<<"x-queue-type">>, longstr, <<"quorum">>} + ], + durable = true, exclusive = false, auto_delete = false} + } | Config]. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +all_messages_go_to_one_consumer(Config) -> + {C, Ch} = connection_and_channel(Config), + Q = queue_declare(Ch, Config), + MessageCount = 5, + ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]), + #'basic.consume_ok'{consumer_tag = CTag1} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + #'basic.consume_ok'{consumer_tag = CTag2} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + + Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount)], + + receive + {consumer_done, {MessagesPerConsumer, MessageCount}} -> + ?assertEqual(MessageCount, MessageCount), + ?assertEqual(2, maps:size(MessagesPerConsumer)), + ?assertEqual(MessageCount, maps:get(CTag1, MessagesPerConsumer)), + ?assertEqual(0, maps:get(CTag2, MessagesPerConsumer)) + after 1000 -> + throw(failed) + end, + + amqp_connection:close(C), + ok. + +fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> + {C, Ch} = connection_and_channel(Config), + Q = queue_declare(Ch, Config), + MessageCount = 10, + ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]), + #'basic.consume_ok'{consumer_tag = CTag1} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + #'basic.consume_ok'{consumer_tag = CTag2} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + #'basic.consume_ok'{consumer_tag = CTag3} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + + Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)], + + {MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2), + FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, Count) -> Count > 0 end, MessagesPerConsumer1)), + ?assertEqual(1, length(FirstActiveConsumerInList)), + + FirstActiveConsumer = lists:nth(1, FirstActiveConsumerInList), + #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = FirstActiveConsumer}), + + {cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(), + + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)], + + {MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1), + SecondActiveConsumerInList = maps:keys(maps:filter( + fun(CTag, Count) -> Count > 0 andalso CTag /= FirstActiveConsumer end, + MessagesPerConsumer2) + ), + ?assertEqual(1, length(SecondActiveConsumerInList)), + SecondActiveConsumer = lists:nth(1, SecondActiveConsumerInList), + + #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}), + + amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}), + wait_for_messages(1), + + LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))), + + receive + {consumer_done, {MessagesPerConsumer, MessageCount}} -> + ?assertEqual(MessageCount, MessageCount), + ?assertEqual(3, maps:size(MessagesPerConsumer)), + ?assertEqual(MessageCount div 2, maps:get(FirstActiveConsumer, MessagesPerConsumer)), + ?assertEqual(MessageCount div 2 - 1, maps:get(SecondActiveConsumer, MessagesPerConsumer)), + ?assertEqual(1, maps:get(LastActiveConsumer, MessagesPerConsumer)) + after 1000 -> + throw(failed) + end, + + amqp_connection:close(C), + ok. + +fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config) -> + {C, Ch} = connection_and_channel(Config), + {C1, Ch1} = connection_and_channel(Config), + {C2, Ch2} = connection_and_channel(Config), + {C3, Ch3} = connection_and_channel(Config), + Q = queue_declare(Ch, Config), + MessageCount = 10, + Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2}]), + Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]), + Consumer3Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]), + #'basic.consume_ok'{consumer_tag = CTag1} = + amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"1">>}, Consumer1Pid), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch2, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"2">>}, Consumer2Pid), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch3, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"3">>}, Consumer3Pid), + + Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)], + + {MessagesPerConsumer1, MessageCount1} = consume_results(), + ?assertEqual(MessageCount div 2, MessageCount1), + ?assertEqual(1, maps:size(MessagesPerConsumer1)), + ?assertEqual(MessageCount div 2, maps:get(CTag1, MessagesPerConsumer1)), + + ok = amqp_channel:close(Ch1), + + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)], + + {MessagesPerConsumer2, MessageCount2} = consume_results(), + ?assertEqual(MessageCount div 2 - 1, MessageCount2), + ?assertEqual(1, maps:size(MessagesPerConsumer2)), + + ok = amqp_channel:close(Ch2), + + amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"poison">>}), + + {MessagesPerConsumer3, MessageCount3} = consume_results(), + ?assertEqual(1, MessageCount3), + ?assertEqual(1, maps:size(MessagesPerConsumer3)), + + [amqp_connection:close(Conn) || Conn <- [C1, C2, C3, C]], + ok. + +amqp_exclusive_consume_fails_on_exclusive_consumer_queue(Config) -> + {C, Ch} = connection_and_channel(Config), + Q = queue_declare(Ch, Config), + ?assertExit( + {{shutdown, {server_initiated_close, 403, _}}, _}, + amqp_channel:call(Ch, #'basic.consume'{queue = Q, exclusive = true}) + ), + amqp_connection:close(C), + ok. + +connection_and_channel(Config) -> + C = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, Ch} = amqp_connection:open_channel(C), + {C, Ch}. + +queue_declare(Channel, Config) -> + Declare = ?config(single_active_consumer_queue_declare, Config), + #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, Declare), + Q. + +consume({Parent, State, 0}) -> + Parent ! {consumer_done, State}; +consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) -> + receive + #'basic.consume_ok'{consumer_tag = CTag} -> + consume({Parent, {maps:put(CTag, 0, MessagesPerConsumer), MessageCount}, CountDown}); + {#'basic.deliver'{consumer_tag = CTag}, #amqp_msg{payload = <<"poison">>}} -> + Parent ! {consumer_done, + {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer), + MessageCount + 1}}; + {#'basic.deliver'{consumer_tag = CTag}, _Content} -> + NewState = {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer), + MessageCount + 1}, + Parent ! {message, NewState}, + consume({Parent, NewState, CountDown - 1}); + #'basic.cancel_ok'{consumer_tag = CTag} -> + Parent ! {cancel_ok, CTag}, + consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}); + _ -> + consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) + after 10000 -> + Parent ! {consumer_timeout, {MessagesPerConsumer, MessageCount}}, + exit(consumer_timeout) + end. + +consume_results() -> + receive + {consumer_done, {MessagesPerConsumer, MessageCount}} -> + {MessagesPerConsumer, MessageCount}; + {consumer_timeout, {MessagesPerConsumer, MessageCount}} -> + {MessagesPerConsumer, MessageCount}; + _ -> + consume_results() + after 1000 -> + throw(failed) + end. + +wait_for_messages(ExpectedCount) -> + wait_for_messages(ExpectedCount, {}). + +wait_for_messages(0, State) -> + State; +wait_for_messages(ExpectedCount, _) -> + receive + {message, {MessagesPerConsumer, MessageCount}} -> + wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount}) + after 5000 -> + throw(message_waiting_timeout) + end. + +wait_for_cancel_ok() -> + receive + {cancel_ok, CTag} -> + {cancel_ok, CTag} + after 5000 -> + throw(consumer_cancel_ok_timeout) + end. diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index d8031ce6d7..466df684af 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -25,6 +25,7 @@ -define(TIMEOUT_LIST_OPS_PASS, 5000). -define(TIMEOUT, 30000). +-define(TIMEOUT_CHANNEL_EXCEPTION, 5000). -define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). @@ -60,10 +61,16 @@ groups() -> topic_matching, {queue_max_length, [], [ {max_length_simple, [], MaxLengthTests}, - {max_length_mirrored, [], MaxLengthTests}]} + {max_length_mirrored, [], MaxLengthTests}]}, + max_message_size ]} ]. +suite() -> + [ + {timetrap, {minutes, 3}} + ]. + %% ------------------------------------------------------------------- %% Testsuite setup/teardown. %% ------------------------------------------------------------------- @@ -1299,6 +1306,74 @@ sync_mirrors(QName, Config) -> _ -> ok end. +gen_binary_mb(N) -> + B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>, + << B1M || _ <- lists:seq(1, N) >>. + +assert_channel_alive(Ch) -> + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, + #amqp_msg{payload = <<"HI">>}). + +assert_channel_fail_max_size(Ch, Monitor) -> + receive + {'DOWN', Monitor, process, Ch, + {shutdown, + {server_initiated_close, 406, _Error}}} -> + ok + after ?TIMEOUT_CHANNEL_EXCEPTION -> + error({channel_exception_expected, max_message_size}) + end. + +max_message_size(Config) -> + Binary2M = gen_binary_mb(2), + Binary4M = gen_binary_mb(4), + Binary6M = gen_binary_mb(6), + Binary10M = gen_binary_mb(10), + + Size2Mb = 1024 * 1024 * 2, + Size2Mb = byte_size(Binary2M), + + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 3]), + + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + + %% Binary is whithin the max size limit + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}), + %% The channel process is alive + assert_channel_alive(Ch), + + Monitor = monitor(process, Ch), + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}), + assert_channel_fail_max_size(Ch, Monitor), + + %% increase the limit + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 8]), + + {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary2M}), + assert_channel_alive(Ch1), + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary4M}), + assert_channel_alive(Ch1), + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary6M}), + assert_channel_alive(Ch1), + + Monitor1 = monitor(process, Ch1), + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary10M}), + assert_channel_fail_max_size(Ch1, Monitor1), + + %% increase beyond the hard limit + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 600]), + Val = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_channel, get_max_message_size, []), + + ?assertEqual(?MAX_MSG_SIZE, Val). + %% --------------------------------------------------------------------------- %% rabbitmqctl helpers. %% --------------------------------------------------------------------------- diff --git a/test/unit_queue_consumers_SUITE.erl b/test/unit_queue_consumers_SUITE.erl new file mode 100644 index 0000000000..08d12e7ec5 --- /dev/null +++ b/test/unit_queue_consumers_SUITE.erl @@ -0,0 +1,102 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% + +-module(unit_queue_consumers_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +all() -> + [ + is_same, + get_consumer, + get + ]. + +is_same(_Config) -> + ?assertEqual( + true, + rabbit_queue_consumers:is_same( + self(), <<"1">>, + consumer(self(), <<"1">>) + )), + ?assertEqual( + false, + rabbit_queue_consumers:is_same( + self(), <<"1">>, + consumer(self(), <<"2">>) + )), + Pid = spawn(?MODULE, function_for_process, []), + Pid ! whatever, + ?assertEqual( + false, + rabbit_queue_consumers:is_same( + self(), <<"1">>, + consumer(Pid, <<"1">>) + )), + ok. + +get(_Config) -> + Pid = spawn(?MODULE, function_for_process, []), + Pid ! whatever, + State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])), + {Pid, {consumer, <<"2">>, _, _, _, _}} = + rabbit_queue_consumers:get(Pid, <<"2">>, State), + ?assertEqual( + undefined, + rabbit_queue_consumers:get(self(), <<"2">>, State) + ), + ?assertEqual( + undefined, + rabbit_queue_consumers:get(Pid, <<"1">>, State) + ), + ok. + +get_consumer(_Config) -> + Pid = spawn(unit_queue_consumers_SUITE, function_for_process, []), + Pid ! whatever, + State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])), + {_Pid, {consumer, _, _, _, _, _}} = + rabbit_queue_consumers:get_consumer(State), + ?assertEqual( + undefined, + rabbit_queue_consumers:get_consumer(state(consumers([]))) + ), + ok. + +consumers([]) -> + priority_queue:new(); +consumers(Consumers) -> + consumers(Consumers, priority_queue:new()). + +consumers([H], Q) -> + priority_queue:in(H, Q); +consumers([H | T], Q) -> + consumers(T, priority_queue:in(H, Q)). + + +consumer(Pid, ConsumerTag) -> + {Pid, {consumer, ConsumerTag, true, 1, [], <<"guest">>}}. + +state(Consumers) -> + {state, Consumers, {}}. + +function_for_process() -> + receive + _ -> ok + end.
\ No newline at end of file |
