diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2019-01-08 23:24:06 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2019-01-08 23:24:06 +0300 |
| commit | d21db02c75a732839ce90ad2bd587127905b975c (patch) | |
| tree | 1867fa3e0cd232f0fbf1bab5feade00344ee1394 | |
| parent | 7908688ce692ad93929bce9da19171add4772a1f (diff) | |
| parent | 1e2a202fcba50dcdce33ca363f93dd06fe78ebf9 (diff) | |
| download | rabbitmq-server-git-d21db02c75a732839ce90ad2bd587127905b975c.tar.gz | |
Merge branch 'master' into rabbitmq-server-1799-single-active-consumer-in-qq
| -rw-r--r-- | .travis.yml | 17 | ||||
| -rw-r--r-- | Makefile | 4 | ||||
| -rw-r--r-- | docs/rabbitmq.conf.example | 159 | ||||
| -rw-r--r-- | priv/schema/rabbit.schema | 10 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 73 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 58 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 44 | ||||
| -rw-r--r-- | src/rabbit_fifo_index.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_queue_location_min_masters.erl | 70 | ||||
| -rw-r--r-- | test/config_schema_SUITE_data/rabbit.snippets | 12 | ||||
| -rw-r--r-- | test/per_vhost_connection_limit_partitions_SUITE.erl | 19 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 30 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 77 |
15 files changed, 402 insertions, 195 deletions
diff --git a/.travis.yml b/.travis.yml index 9237d9632a..6ccdbab1ce 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,6 @@ # vim:sw=2:et: +dist: xenial sudo: false language: erlang notifications: @@ -10,15 +11,8 @@ notifications: on_failure: always addons: apt: - sources: - - sourceline: deb https://packages.erlang-solutions.com/ubuntu trusty contrib - key_url: https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc packages: - awscli - # Use Elixir from Erlang Solutions. The provided Elixir is - # installed with kiex but is old. We also can't use kiex to - # install a newer one because of GitHub API rate limiting. - - elixir=1.6.0-1 cache: apt: true env: @@ -27,10 +21,10 @@ env: - secure: L1t0CHGR4RzOXwtkpM6feRKax95rszScBLqzjstEiMPkhjTsYTlAecnNxx6lTrGMnk5hQoi4PtbhmyZOX0siHTngTogoA/Nyn8etYzicU5ZO+qmBQOYpegz51lEu70ewXgkhEHzk9DtEPxfYviH9WiILrdUVRXXgZpoXq13p1QA= otp_release: - - "19.3" - - "20.3" + - "21.2" before_script: + - elixir --version # The checkout made by Travis is a "detached HEAD" and branches # information is missing. Our Erlang.mk's git_rmq fetch method relies # on it, so we need to restore it. @@ -42,11 +36,6 @@ before_script: git remote add upstream https://github.com/$TRAVIS_REPO_SLUG.git git fetch upstream v3.8.x:v3.8.x || : git fetch upstream master:master || : - # Make sure we use Elixir from Erlang Solutions and not kiex. - - | - echo YES | kiex implode - elixir --version - elixir --version | grep -q 'Elixir 1.6.0' script: - make xref @@ -130,7 +130,9 @@ define PROJECT_ENV {vhost_restart_strategy, continue}, %% {global, prefetch count} {default_consumer_prefetch, {false, 0}}, - {channel_queue_cleanup_interval, 60000} + {channel_queue_cleanup_interval, 60000}, + %% Default max message size is 128 MB + {max_message_size, 134217728} ] endef diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example index a62ed38291..b82956a267 100644 --- a/docs/rabbitmq.conf.example +++ b/docs/rabbitmq.conf.example @@ -470,7 +470,7 @@ ## Disabling background GC may reduce latency for client operations, ## keeping it enabled may reduce median RAM usage by the binary heap ## (see https://www.erlang-solutions.com/blog/erlang-garbage-collector.html). -## +## ## Before trying this option, please take a look at the memory ## breakdown (http://www.rabbitmq.com/memory-use.html). ## @@ -533,18 +533,49 @@ ## # management.http_log_dir = /path/to/access.log -## Change the port on which the HTTP listener listens, -## specifying an interface for the web server to bind to. -## Also set the listener to use TLS and provide TLS options. -## +## HTTP listener and embedded Web server settings. +# ## See https://rabbitmq.com/management.html for details. +# +# management.tcp.port = 15672 +# management.tcp.ip = 0.0.0.0 +# +# management.tcp.shutdown_timeout = 7000 +# management.tcp.max_keepalive = 120 +# management.tcp.idle_timeout = 120 +# management.tcp.inactivity_timeout = 120 +# management.tcp.request_timeout = 120 +# management.tcp.compress = true + +## HTTPS listener settings. +## See https://rabbitmq.com/management.html and https://rabbitmq.com/ssl.html for details. +## +# management.ssl.port = 15671 +# management.ssl.cacertfile = /path/to/ca_certificate.pem +# management.ssl.certfile = /path/to/server_certificate.pem +# management.ssl.keyfile = /path/to/server_key.pem + +## More TLS options +# management.ssl.honor_cipher_order = true +# management.ssl.honor_ecc_order = true +# management.ssl.client_renegotiation = false +# management.ssl.secure_renegotiate = true + +## Supported TLS versions +# management.ssl.versions.1 = tlsv1.2 +# management.ssl.versions.2 = tlsv1.1 + +## Cipher suites the server is allowed to use +# management.ssl.ciphers.1 = ECDHE-ECDSA-AES256-GCM-SHA384 +# management.ssl.ciphers.2 = ECDHE-RSA-AES256-GCM-SHA384 +# management.ssl.ciphers.3 = ECDHE-ECDSA-AES256-SHA384 +# management.ssl.ciphers.4 = ECDHE-RSA-AES256-SHA384 +# management.ssl.ciphers.5 = ECDH-ECDSA-AES256-GCM-SHA384 +# management.ssl.ciphers.6 = ECDH-RSA-AES256-GCM-SHA384 +# management.ssl.ciphers.7 = ECDH-ECDSA-AES256-SHA384 +# management.ssl.ciphers.8 = ECDH-RSA-AES256-SHA384 +# management.ssl.ciphers.9 = DHE-RSA-AES256-GCM-SHA384 -# management.listener.port = 15672 -# management.listener.ip = 127.0.0.1 -# management.listener.ssl = true -# management.listener.ssl_opts.cacertfile = /path/to/cacert.pem -# management.listener.ssl_opts.certfile = /path/to/cert.pem -# management.listener.ssl_opts.keyfile = /path/to/key.pem ## One of 'basic', 'detailed' or 'none'. See ## http://rabbitmq.com/management.html#fine-stats for more details. @@ -583,13 +614,39 @@ # STOMP section # ======================================= -## Network Configuration. The format is generally the same as for the core broker. +## See https://rabbitmq.com/stomp.html for details. + +## TCP listeners. +## +# stomp.listeners.tcp.1 = 127.0.0.1:61613 +# stomp.listeners.tcp.2 = ::1:61613 + +## TCP listener settings ## -# stomp.listeners.tcp.default = 61613 +# stomp.tcp_listen_options.backlog = 2048 +# stomp.tcp_listen_options.recbuf = 131072 +# stomp.tcp_listen_options.sndbuf = 131072 +# +# stomp.tcp_listen_options.keepalive = true +# stomp.tcp_listen_options.nodelay = true +# +# stomp.tcp_listen_options.exit_on_close = true +# stomp.tcp_listen_options.send_timeout = 120 -## Same for ssl listeners +## Proxy protocol support ## +# stomp.proxy_protocol = false + +## TLS listeners +## See https://rabbitmq.com/stomp.html and https://rabbitmq.com/ssl.html for details. # stomp.listeners.ssl.default = 61614 +# +# ssl_options.cacertfile = path/to/cacert.pem +# ssl_options.certfile = path/to/cert.pem +# ssl_options.keyfile = path/to/key.pem +# ssl_options.verify = verify_peer +# ssl_options.fail_if_no_peer_cert = true + ## Number of Erlang processes that will accept connections for the TCP ## and TLS listeners. @@ -642,6 +699,52 @@ # MQTT section # ======================================= +## TCP listener settings. +## +# mqtt.listeners.tcp.1 = 127.0.0.1:61613 +# mqtt.listeners.tcp.2 = ::1:61613 + +## TCP listener options (as per the broker configuration). +## +# mqtt.tcp_listen_options.backlog = 4096 +# mqtt.tcp_listen_options.recbuf = 131072 +# mqtt.tcp_listen_options.sndbuf = 131072 +# +# mqtt.tcp_listen_options.keepalive = true +# mqtt.tcp_listen_options.nodelay = true +# +# mqtt.tcp_listen_options.exit_on_close = true +# mqtt.tcp_listen_options.send_timeout = 120 + +## TLS listener settings +## ## See https://rabbitmq.com/mqtt.html and https://rabbitmq.com/ssl.html for details. +# +# mqtt.listeners.ssl.default = 8883 +# +# ssl_options.cacertfile = /path/to/tls/ca_certificate_bundle.pem +# ssl_options.certfile = /path/to/tls/server_certificate.pem +# ssl_options.keyfile = /path/to/tls/server_key.pem +# ssl_options.verify = verify_peer +# ssl_options.fail_if_no_peer_cert = true +# + + +## Number of Erlang processes that will accept connections for the TCP +## and TLS listeners. +## +# mqtt.num_acceptors.tcp = 10 +# mqtt.num_acceptors.ssl = 10 + +## Whether or not to enable proxy protocol support. +## Once enabled, clients cannot directly connect to the broker +## anymore. They must connect through a load balancer that sends the +## proxy protocol header to the broker at connection time. +## This setting applies only to STOMP clients, other protocols +## like STOMP or AMQP have their own setting to enable proxy protocol. +## See the plugins or broker documentation for more information. +## +# mqtt.proxy_protocol = false + ## Set the default user name and password used for anonymous connections (when client ## provides no credentials). Anonymous connections are highly discouraged! ## @@ -672,34 +775,6 @@ ## # mqtt.prefetch = 10 -## TCP/SSL Configuration (as per the broker configuration). -## -# mqtt.listeners.tcp.default = 1883 - -## Same for ssl listener -## -# mqtt.listeners.ssl.default = 1884 - -## Number of Erlang processes that will accept connections for the TCP -## and TLS listeners. -## -# mqtt.num_acceptors.tcp = 10 -# mqtt.num_acceptors.ssl = 10 - -## TCP listener options (as per the broker configuration). -## -# mqtt.tcp_listen_options.backlog = 128 -# mqtt.tcp_listen_options.nodelay = true - -## Whether or not to enable proxy protocol support. -## Once enabled, clients cannot directly connect to the broker -## anymore. They must connect through a load balancer that sends the -## proxy protocol header to the broker at connection time. -## This setting applies only to STOMP clients, other protocols -## like STOMP or AMQP have their own setting to enable proxy protocol. -## See the plugins or broker documentation for more information. -## -# mqtt.proxy_protocol = false ## ---------------------------------------------------------------------------- ## RabbitMQ AMQP 1.0 Support diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index 5c6078a413..ef3dafd116 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -258,7 +258,7 @@ end}. {translation, "rabbit.ssl_options.ciphers", fun(Conf) -> Settings = cuttlefish_variable:filter_by_prefix("ssl_options.ciphers", Conf), - [V || {_, V} <- Settings] + lists:reverse([V || {_, V} <- Settings]) end}. %% =========================================================================== @@ -554,6 +554,9 @@ end}. }. +{mapping, "msx_message_size", "rabbit.max_message_size", + [{datatype, integer}, {validators, ["less_then_512MB"]}]}. + %% Customising Socket Options. %% %% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for @@ -1361,6 +1364,11 @@ fun(Size) when is_integer(Size) -> Size > 0 andalso Size < 2147483648 end}. +{validator, "less_then_512MB", "Max message size should be less than 512MB and gre than 0", +fun(Size) when is_integer(Size) -> + Size > 0 andalso Size < 536870912 +end}. + {validator, "less_than_1", "Flooat is not beetween 0 and 1", fun(Float) when is_float(Float) -> Float > 0 andalso Float < 1 diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 704ead75a7..c65ad299ed 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). --export([list_down/1, count/1, list_names/0, list_local_names/0]). +-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0]). -export([list_by_type/1]). -export([notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). @@ -437,8 +437,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> not_found -> Q1 = rabbit_policy:set(Q), Q2 = Q1#amqqueue{state = live}, ok = store_queue(Q2), - B = add_default_binding(Q2), - fun () -> B(), {created, Q2} end; + fun () -> {created, Q2} end; {absent, _Q, _} = R -> rabbit_misc:const(R) end; [ExistingQ] -> @@ -502,15 +501,6 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1}, %% mirroring-related has changed - the policy may have changed anyway. notify_policy_changed(Q1). -add_default_binding(#amqqueue{name = QueueName}) -> - ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), - RoutingKey = QueueName#resource.name, - rabbit_binding:add(#binding{source = ExchangeName, - destination = QueueName, - key = RoutingKey, - args = []}, - ?INTERNAL_USER). - lookup([]) -> []; %% optimisation lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation lookup(Names) when is_list(Names) -> @@ -764,6 +754,8 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). list_names() -> mnesia:dirty_all_keys(rabbit_queue). +list_names(VHost) -> [Q#amqqueue.name || Q <- list(VHost)]. + list_local_names() -> [ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(), State =/= crashed, is_local_to_node(QPid, node())]. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index e96dfd7673..258e85ffa2 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -27,6 +27,10 @@ -export([has_for_source/1, remove_for_source/1, remove_for_destination/2, remove_transient_for_destination/1]). +-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath, + kind = exchange, + name = <<>>}). + %%---------------------------------------------------------------------------- -export_type([key/0, deletions/0]). @@ -156,6 +160,14 @@ recover_semi_durable_route_txn(R = #route{binding = B}, X) -> (Serial, false) -> x_callback(Serial, X, add_binding, B) end). +exists(#binding{source = ?DEFAULT_EXCHANGE(_), + destination = #resource{kind = queue, name = QName} = Queue, + key = QName, + args = []}) -> + case rabbit_amqqueue:lookup(Queue) of + {ok, _} -> true; + {error, not_found} -> false + end; exists(Binding) -> binding_action( Binding, fun (_Src, _Dst, B) -> @@ -243,9 +255,17 @@ list(VHostPath) -> destination = VHostResource, _ = '_'}, _ = '_'}, - [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, - Route)]. - + %% if there are any default exchange bindings left after an upgrade + %% of a pre-3.8 database, filter them out + AllBindings = [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)], + Filtered = lists:filter(fun(#binding{source = S}) -> + S =/= ?DEFAULT_EXCHANGE(VHostPath) + end, AllBindings), + implicit_bindings(VHostPath) ++ Filtered. + +list_for_source(?DEFAULT_EXCHANGE(VHostPath)) -> + implicit_bindings(VHostPath); list_for_source(SrcName) -> mnesia:async_dirty( fun() -> @@ -255,16 +275,43 @@ list_for_source(SrcName) -> end). list_for_destination(DstName) -> - mnesia:async_dirty( - fun() -> - Route = #route{binding = #binding{destination = DstName, - _ = '_'}}, - [reverse_binding(B) || - #reverse_route{reverse_binding = B} <- - mnesia:match_object(rabbit_reverse_route, - reverse_route(Route), read)] - end). - + implicit_for_destination(DstName) ++ + mnesia:async_dirty( + fun() -> + Route = #route{binding = #binding{destination = DstName, + _ = '_'}}, + [reverse_binding(B) || + #reverse_route{reverse_binding = B} <- + mnesia:match_object(rabbit_reverse_route, + reverse_route(Route), read)] + end). + +implicit_bindings(VHostPath) -> + DstQueues = rabbit_amqqueue:list_names(VHostPath), + [ #binding{source = ?DEFAULT_EXCHANGE(VHostPath), + destination = DstQueue, + key = QName, + args = []} + || DstQueue = #resource{name = QName} <- DstQueues ]. + +implicit_for_destination(DstQueue = #resource{kind = queue, + virtual_host = VHostPath, + name = QName}) -> + [#binding{source = ?DEFAULT_EXCHANGE(VHostPath), + destination = DstQueue, + key = QName, + args = []}]; +implicit_for_destination(_) -> + []. + +list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath), + #resource{kind = queue, + virtual_host = VHostPath, + name = QName} = DstQueue) -> + [#binding{source = ?DEFAULT_EXCHANGE(VHostPath), + destination = DstQueue, + key = QName, + args = []}]; list_for_source_and_destination(SrcName, DstName) -> mnesia:async_dirty( fun() -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d1f3b06528..eeae247193 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -72,7 +72,7 @@ -export([get_vhost/1, get_user/1]). %% For testing -export([build_topic_variable_map/3]). --export([list_queue_states/1]). +-export([list_queue_states/1, get_max_message_size/0]). %% Mgmt HTTP API refactor -export([handle_method/5]). @@ -158,7 +158,9 @@ delivery_flow, interceptor_state, queue_states, - queue_cleanup_timer + queue_cleanup_timer, + %% Message content size limit + max_message_size }). -define(QUEUE, lqueue). @@ -441,6 +443,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, _ -> Limiter0 end, + MaxMessageSize = get_max_message_size(), State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -473,7 +476,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined, - queue_states = #{}}, + queue_states = #{}, + max_message_size = MaxMessageSize}, State1 = State#ch{ interceptor_state = rabbit_channel_interceptor:init(State)}, State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer), @@ -793,6 +797,16 @@ code_change(_OldVsn, State, _Extra) -> format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). +-spec get_max_message_size() -> non_neg_integer(). + +get_max_message_size() -> + case application:get_env(rabbit, max_message_size) of + {ok, MS} when is_integer(MS) -> + erlang:min(MS, ?MAX_MSG_SIZE); + _ -> + ?MAX_MSG_SIZE + end. + %%--------------------------------------------------------------------------- reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. @@ -985,12 +999,19 @@ extract_topic_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct, extract_topic_variable_map_from_amqp_params(_) -> #{}. -check_msg_size(Content) -> +check_msg_size(Content, MaxMessageSize) -> Size = rabbit_basic:maybe_gc_large_msg(Content), - case Size > ?MAX_MSG_SIZE of - true -> precondition_failed("message size ~B larger than max size ~B", - [Size, ?MAX_MSG_SIZE]); - false -> ok + case Size of + S when S > MaxMessageSize -> + ErrorMessage = case MaxMessageSize of + ?MAX_MSG_SIZE -> + "message size ~B is larger than max size ~B"; + _ -> + "message size ~B is larger than configured max size ~B" + end, + precondition_failed(ErrorMessage, + [Size, MaxMessageSize]); + _ -> ok end. check_vhost_queue_limit(#resource{name = QueueName}, VHost) -> @@ -1164,16 +1185,17 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) -> handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory}, - Content, State = #ch{virtual_host = VHostPath, - tx = Tx, - channel = ChannelNum, - confirm_enabled = ConfirmEnabled, - trace_state = TraceState, - user = #user{username = Username} = User, - conn_name = ConnName, - delivery_flow = Flow, - conn_pid = ConnPid}) -> - check_msg_size(Content), + Content, State = #ch{virtual_host = VHostPath, + tx = Tx, + channel = ChannelNum, + confirm_enabled = ConfirmEnabled, + trace_state = TraceState, + user = #user{username = Username} = User, + conn_name = ConnName, + delivery_flow = Flow, + conn_pid = ConnPid, + max_message_size = MaxMessageSize}) -> + check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 938e8f77fd..1536cd1f51 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -428,26 +428,20 @@ apply(_, #checkout{spec = Spec, meta = Meta, State1 = update_consumer(ConsumerId, Meta, Spec, State0), checkout(State1, [{monitor, process, Pid}]); apply(#{index := RaftIdx}, #purge{}, - #state{consumers = Cons0, ra_indexes = Indexes } = State0) -> - Total = rabbit_fifo_index:size(Indexes), - {State1, Effects1} = - maps:fold( - fun(ConsumerId, C = #consumer{checked_out = Checked0}, - {StateAcc0, EffectsAcc0}) -> - MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}} - <- maps:values(Checked0)], - complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C, - #{}, EffectsAcc0, StateAcc0) - end, {State0, []}, Cons0), - {State, _, Effects} = - update_smallest_raft_index( - RaftIdx, Indexes, - State1#state{ra_indexes = rabbit_fifo_index:empty(), - messages = #{}, - returns = lqueue:new(), - msg_bytes_enqueue = 0, - msg_bytes_checkout = 0, - low_msg_num = undefined}, Effects1), + #state{ra_indexes = Indexes0, + messages = Messages} = State0) -> + Total = maps:size(Messages), + Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, + Indexes0, + [I || {I, _} <- lists:sort(maps:values(Messages))]), + {State, _, Effects} = + update_smallest_raft_index(RaftIdx, Indexes0, + State0#state{ra_indexes = Indexes, + messages = #{}, + returns = lqueue:new(), + msg_bytes_enqueue = 0, + low_msg_num = undefined}, + []), %% as we're not checking out after a purge (no point) we have to %% reverse the effects ourselves {State, {purge, Total}, @@ -554,7 +548,8 @@ state_enter(leader, #state{consumers = Cons, Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]), Mons = [{monitor, process, P} || P <- Pids], Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], - Effects = Mons ++ Nots, + NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), + Effects = Mons ++ Nots ++ NodeMons, case BLH of undefined -> Effects; @@ -1940,11 +1935,12 @@ purge_with_checkout_test() -> %% assert message bytes are non zero ?assert(State2#state.msg_bytes_checkout > 0), ?assert(State2#state.msg_bytes_enqueue > 0), - {State3, {purge, 2}, _} = apply(meta(2), make_purge(), State2), - ?assertEqual(0, State3#state.msg_bytes_checkout), + {State3, {purge, 1}, _} = apply(meta(2), make_purge(), State2), + ?assert(State2#state.msg_bytes_checkout > 0), ?assertEqual(0, State3#state.msg_bytes_enqueue), + ?assertEqual(1, rabbit_fifo_index:size(State3#state.ra_indexes)), #consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers), - ?assertEqual(0, maps:size(Checked)), + ?assertEqual(1, maps:size(Checked)), ok. down_returns_checked_out_in_order_test() -> diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl index 345a99a03c..f8f414f453 100644 --- a/src/rabbit_fifo_index.erl +++ b/src/rabbit_fifo_index.erl @@ -56,10 +56,10 @@ return(Key, Value, #?MODULE{data = Data} = State) when is_integer(Key) -> State#?MODULE{data = maps:put(Key, Value, Data)}. --spec delete(integer(), state()) -> state(). +-spec delete(Index :: integer(), state()) -> state(). delete(Smallest, #?MODULE{data = Data0, - largest = Largest, - smallest = Smallest} = State) -> + largest = Largest, + smallest = Smallest} = State) -> Data = maps:remove(Smallest, Data0), case find_next(Smallest + 1, Largest, Data) of undefined -> diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl index c532714d41..7c386855f6 100644 --- a/src/rabbit_queue_location_min_masters.erl +++ b/src/rabbit_queue_location_min_masters.erl @@ -38,45 +38,33 @@ description() -> <<"Locate queue master node from cluster node with least bound queues">>}]. queue_master_location(#amqqueue{} = Q) -> - Cluster = rabbit_queue_master_location_misc:all_nodes(Q), - VHosts = rabbit_vhost:list(), - BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []), - {_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters), - {ok, MinMaster}. + Cluster = rabbit_queue_master_location_misc:all_nodes(Q), + QueueNames = rabbit_amqqueue:list_names(), + MastersPerNode = lists:foldl( + fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) -> + case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of + {ok, Master} when is_atom(Master) -> + case maps:is_key(Master, NodeMasters) of + true -> maps:update_with(Master, + fun(N) -> N + 1 end, + NodeMasters); + false -> NodeMasters + end; + _ -> NodeMasters + end + end, + maps:from_list([{N, 0} || N <- Cluster]), + QueueNames), -%%--------------------------------------------------------------------------- -%% Private helper functions -%%--------------------------------------------------------------------------- -get_min_master(Cluster, BoundQueueMasters) -> - lists:min([ {count_masters(Node, BoundQueueMasters), Node} || - Node <- Cluster ]). - -count_masters(Node, Masters) -> - length([ X || X <- Masters, X == Node ]). - -get_bound_queue_masters_per_vhost([], Acc) -> - lists:flatten(Acc); -get_bound_queue_masters_per_vhost([VHost|RemVHosts], Acc) -> - BoundQueueNames = - lists:filtermap( - fun(#binding{destination =#resource{kind = queue, - name = QueueName}}) -> - {true, QueueName}; - (_) -> - false - end, - rabbit_binding:list(VHost)), - UniqQueueNames = lists:usort(BoundQueueNames), - BoundQueueMasters = get_queue_masters(VHost, UniqQueueNames, []), - get_bound_queue_masters_per_vhost(RemVHosts, [BoundQueueMasters|Acc]). - - -get_queue_masters(_VHost, [], BoundQueueNodes) -> BoundQueueNodes; -get_queue_masters(VHost, [QueueName | RemQueueNames], QueueMastersAcc) -> - QueueMastersAcc0 = case rabbit_queue_master_location_misc:lookup_master( - QueueName, VHost) of - {ok, Master} when is_atom(Master) -> - [Master|QueueMastersAcc]; - _ -> QueueMastersAcc - end, - get_queue_masters(VHost, RemQueueNames, QueueMastersAcc0). + {MinNode, _NMasters} = maps:fold( + fun(Node, NMasters, init) -> + {Node, NMasters}; + (Node, NMasters, {MinNode, MinMasters}) -> + case NMasters < MinMasters of + true -> {Node, NMasters}; + false -> {MinNode, MinMasters} + end + end, + init, + MastersPerNode), + {ok, MinNode}.
\ No newline at end of file diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets index b318adaa12..50ca777aa8 100644 --- a/test/config_schema_SUITE_data/rabbit.snippets +++ b/test/config_schema_SUITE_data/rabbit.snippets @@ -326,15 +326,15 @@ tcp_listen_options.exit_on_close = false", {ssl_options, [{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"}, {ciphers, [ - "DHE-RSA-AES256-GCM-SHA384", + "ECDHE-ECDSA-AES256-GCM-SHA384", + "ECDHE-RSA-AES256-GCM-SHA384", + "ECDHE-ECDSA-AES256-SHA384", + "ECDHE-RSA-AES256-SHA384", "ECDH-ECDSA-AES256-GCM-SHA384", - "ECDH-ECDSA-AES256-SHA384", "ECDH-RSA-AES256-GCM-SHA384", + "ECDH-ECDSA-AES256-SHA384", "ECDH-RSA-AES256-SHA384", - "ECDHE-ECDSA-AES256-GCM-SHA384", - "ECDHE-ECDSA-AES256-SHA384", - "ECDHE-RSA-AES256-GCM-SHA384", - "ECDHE-RSA-AES256-SHA384" + "DHE-RSA-AES256-GCM-SHA384" ]}, {certfile,"test/config_schema_SUITE_data/certs/cert.pem"}, {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}, diff --git a/test/per_vhost_connection_limit_partitions_SUITE.erl b/test/per_vhost_connection_limit_partitions_SUITE.erl index 051f3f13b7..20adf704a0 100644 --- a/test/per_vhost_connection_limit_partitions_SUITE.erl +++ b/test/per_vhost_connection_limit_partitions_SUITE.erl @@ -107,7 +107,7 @@ cluster_full_partition_with_autoheal(Config) -> Conn4 = open_unmanaged_connection(Config, B), Conn5 = open_unmanaged_connection(Config, C), Conn6 = open_unmanaged_connection(Config, C), - ?assertEqual(6, count_connections_in(Config, VHost)), + wait_for_count_connections_in(Config, VHost, 6, 60000), %% B drops off the network, non-reachable by either A or C rabbit_ct_broker_helpers:block_traffic_between(A, B), @@ -115,14 +115,14 @@ cluster_full_partition_with_autoheal(Config) -> timer:sleep(?DELAY), %% A and C are still connected, so 4 connections are tracked - ?assertEqual(4, count_connections_in(Config, VHost)), + wait_for_count_connections_in(Config, VHost, 4, 60000), rabbit_ct_broker_helpers:allow_traffic_between(A, B), rabbit_ct_broker_helpers:allow_traffic_between(B, C), timer:sleep(?DELAY), %% during autoheal B's connections were dropped - ?assertEqual(4, count_connections_in(Config, VHost)), + wait_for_count_connections_in(Config, VHost, 4, 60000), lists:foreach(fun (Conn) -> (catch rabbit_ct_client_helpers:close_connection(Conn)) @@ -131,11 +131,22 @@ cluster_full_partition_with_autoheal(Config) -> passed. - %% ------------------------------------------------------------------- %% Helpers %% ------------------------------------------------------------------- +wait_for_count_connections_in(Config, VHost, Expected, Time) when Time =< 0 -> + ?assertEqual(Expected, count_connections_in(Config, VHost)); +wait_for_count_connections_in(Config, VHost, Expected, Time) -> + case count_connections_in(Config, VHost) of + Expected -> + ok; + _ -> + Sleep = 3000, + timer:sleep(Sleep), + wait_for_count_connections_in(Config, VHost, Expected, Time - Sleep) + end. + count_connections_in(Config, VHost) -> count_connections_in(Config, VHost, 0). count_connections_in(Config, VHost, NodeIndex) -> diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 6f06f3b874..c84eaf4a5e 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -340,7 +340,7 @@ start_queue(Config) -> %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Test declare an existing queue ?assertEqual({'queue.declare_ok', LQ, 0, 0}, @@ -356,7 +356,7 @@ start_queue(Config) -> %% Check that the application and process are still up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])). + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])). start_queue_concurrent(Config) -> Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -417,13 +417,13 @@ stop_queue(Config) -> %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Delete the quorum queue ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})), %% Check that the application and process are down wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) end), ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))). @@ -442,7 +442,7 @@ restart_queue(Config) -> %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])). + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])). idempotent_recover(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -521,7 +521,7 @@ restart_all_types(Config) -> %% Check that the application and two ra nodes are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Check the classic queues restarted correctly Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), {#'basic.get_ok'{}, #amqp_msg{}} = @@ -563,7 +563,7 @@ stop_start_rabbit_app(Config) -> %% Check that the application and two ra nodes are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Check the classic queues restarted correctly Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), {#'basic.get_ok'{}, #amqp_msg{}} = @@ -1263,7 +1263,7 @@ cleanup_queue_state_on_channel_after_publish(Config) -> amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), wait_until(fun() -> [] == rpc:call(Server, supervisor, which_children, - [ra_server_sup]) + [ra_server_sup_sup]) end), %% Check that all queue states have been cleaned wait_for_cleanup(Server, NCh1, 0), @@ -1300,7 +1300,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) -> wait_for_cleanup(Server, NCh2, 1), ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) end), %% Check that all queue states have been cleaned wait_for_cleanup(Server, NCh1, 0), @@ -1539,8 +1539,8 @@ purge(Config) -> _DeliveryTag = consume(Ch, QQ, false), wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 1), - {'queue.purge_ok', 2} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}), - wait_for_messages_pending_ack(Servers, RaName, 0), + {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}), + wait_for_messages_pending_ack(Servers, RaName, 1), wait_for_messages_ready(Servers, RaName, 0). sync_queue(Config) -> @@ -1964,7 +1964,7 @@ delete_immediately_by_resource(Config) -> %% Check that the application and process are down wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) end), ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))). @@ -2235,7 +2235,8 @@ wait_for_cleanup(Server, Channel, Number) -> wait_for_cleanup(Server, Channel, Number, 60). wait_for_cleanup(Server, Channel, Number, 0) -> - ?assertEqual(Number, length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel]))); + ?assertEqual(length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])), + Number); wait_for_cleanup(Server, Channel, Number, N) -> case length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])) of Length when Number == Length -> @@ -2261,7 +2262,8 @@ wait_for_messages(Servers, QName, Number, Fun, 0) -> (_) -> -1 end, Msgs), - ?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]); + ?assertEqual([Number || _ <- lists:seq(1, length(Servers))], + Totals); wait_for_messages(Servers, QName, Number, Fun, N) -> Msgs = dirty_query(Servers, QName, Fun), case lists:all(fun(M) when is_map(M) -> diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 3263a733a9..0512e8161a 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -55,7 +55,7 @@ init_per_testcase(TestCase, Config) -> meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end), meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end), - ra_server_sup:remove_all(), + ra_server_sup_sup:remove_all(), ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"), ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"), ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)), diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index d8031ce6d7..466df684af 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -25,6 +25,7 @@ -define(TIMEOUT_LIST_OPS_PASS, 5000). -define(TIMEOUT, 30000). +-define(TIMEOUT_CHANNEL_EXCEPTION, 5000). -define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). @@ -60,10 +61,16 @@ groups() -> topic_matching, {queue_max_length, [], [ {max_length_simple, [], MaxLengthTests}, - {max_length_mirrored, [], MaxLengthTests}]} + {max_length_mirrored, [], MaxLengthTests}]}, + max_message_size ]} ]. +suite() -> + [ + {timetrap, {minutes, 3}} + ]. + %% ------------------------------------------------------------------- %% Testsuite setup/teardown. %% ------------------------------------------------------------------- @@ -1299,6 +1306,74 @@ sync_mirrors(QName, Config) -> _ -> ok end. +gen_binary_mb(N) -> + B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>, + << B1M || _ <- lists:seq(1, N) >>. + +assert_channel_alive(Ch) -> + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, + #amqp_msg{payload = <<"HI">>}). + +assert_channel_fail_max_size(Ch, Monitor) -> + receive + {'DOWN', Monitor, process, Ch, + {shutdown, + {server_initiated_close, 406, _Error}}} -> + ok + after ?TIMEOUT_CHANNEL_EXCEPTION -> + error({channel_exception_expected, max_message_size}) + end. + +max_message_size(Config) -> + Binary2M = gen_binary_mb(2), + Binary4M = gen_binary_mb(4), + Binary6M = gen_binary_mb(6), + Binary10M = gen_binary_mb(10), + + Size2Mb = 1024 * 1024 * 2, + Size2Mb = byte_size(Binary2M), + + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 3]), + + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + + %% Binary is whithin the max size limit + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}), + %% The channel process is alive + assert_channel_alive(Ch), + + Monitor = monitor(process, Ch), + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}), + assert_channel_fail_max_size(Ch, Monitor), + + %% increase the limit + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 8]), + + {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary2M}), + assert_channel_alive(Ch1), + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary4M}), + assert_channel_alive(Ch1), + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary6M}), + assert_channel_alive(Ch1), + + Monitor1 = monitor(process, Ch1), + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary10M}), + assert_channel_fail_max_size(Ch1, Monitor1), + + %% increase beyond the hard limit + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 600]), + Val = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_channel, get_max_message_size, []), + + ?assertEqual(?MAX_MSG_SIZE, Val). + %% --------------------------------------------------------------------------- %% rabbitmqctl helpers. %% --------------------------------------------------------------------------- |
