summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2019-01-08 23:24:06 +0300
committerMichael Klishin <michael@clojurewerkz.org>2019-01-08 23:24:06 +0300
commitd21db02c75a732839ce90ad2bd587127905b975c (patch)
tree1867fa3e0cd232f0fbf1bab5feade00344ee1394
parent7908688ce692ad93929bce9da19171add4772a1f (diff)
parent1e2a202fcba50dcdce33ca363f93dd06fe78ebf9 (diff)
downloadrabbitmq-server-git-d21db02c75a732839ce90ad2bd587127905b975c.tar.gz
Merge branch 'master' into rabbitmq-server-1799-single-active-consumer-in-qq
-rw-r--r--.travis.yml17
-rw-r--r--Makefile4
-rw-r--r--docs/rabbitmq.conf.example159
-rw-r--r--priv/schema/rabbit.schema10
-rw-r--r--src/rabbit_amqqueue.erl16
-rw-r--r--src/rabbit_binding.erl73
-rw-r--r--src/rabbit_channel.erl58
-rw-r--r--src/rabbit_fifo.erl44
-rw-r--r--src/rabbit_fifo_index.erl6
-rw-r--r--src/rabbit_queue_location_min_masters.erl70
-rw-r--r--test/config_schema_SUITE_data/rabbit.snippets12
-rw-r--r--test/per_vhost_connection_limit_partitions_SUITE.erl19
-rw-r--r--test/quorum_queue_SUITE.erl30
-rw-r--r--test/rabbit_fifo_SUITE.erl2
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl77
15 files changed, 402 insertions, 195 deletions
diff --git a/.travis.yml b/.travis.yml
index 9237d9632a..6ccdbab1ce 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,5 +1,6 @@
# vim:sw=2:et:
+dist: xenial
sudo: false
language: erlang
notifications:
@@ -10,15 +11,8 @@ notifications:
on_failure: always
addons:
apt:
- sources:
- - sourceline: deb https://packages.erlang-solutions.com/ubuntu trusty contrib
- key_url: https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc
packages:
- awscli
- # Use Elixir from Erlang Solutions. The provided Elixir is
- # installed with kiex but is old. We also can't use kiex to
- # install a newer one because of GitHub API rate limiting.
- - elixir=1.6.0-1
cache:
apt: true
env:
@@ -27,10 +21,10 @@ env:
- secure: L1t0CHGR4RzOXwtkpM6feRKax95rszScBLqzjstEiMPkhjTsYTlAecnNxx6lTrGMnk5hQoi4PtbhmyZOX0siHTngTogoA/Nyn8etYzicU5ZO+qmBQOYpegz51lEu70ewXgkhEHzk9DtEPxfYviH9WiILrdUVRXXgZpoXq13p1QA=
otp_release:
- - "19.3"
- - "20.3"
+ - "21.2"
before_script:
+ - elixir --version
# The checkout made by Travis is a "detached HEAD" and branches
# information is missing. Our Erlang.mk's git_rmq fetch method relies
# on it, so we need to restore it.
@@ -42,11 +36,6 @@ before_script:
git remote add upstream https://github.com/$TRAVIS_REPO_SLUG.git
git fetch upstream v3.8.x:v3.8.x || :
git fetch upstream master:master || :
- # Make sure we use Elixir from Erlang Solutions and not kiex.
- - |
- echo YES | kiex implode
- elixir --version
- elixir --version | grep -q 'Elixir 1.6.0'
script:
- make xref
diff --git a/Makefile b/Makefile
index e26f32d89f..094affc2df 100644
--- a/Makefile
+++ b/Makefile
@@ -130,7 +130,9 @@ define PROJECT_ENV
{vhost_restart_strategy, continue},
%% {global, prefetch count}
{default_consumer_prefetch, {false, 0}},
- {channel_queue_cleanup_interval, 60000}
+ {channel_queue_cleanup_interval, 60000},
+ %% Default max message size is 128 MB
+ {max_message_size, 134217728}
]
endef
diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example
index a62ed38291..b82956a267 100644
--- a/docs/rabbitmq.conf.example
+++ b/docs/rabbitmq.conf.example
@@ -470,7 +470,7 @@
## Disabling background GC may reduce latency for client operations,
## keeping it enabled may reduce median RAM usage by the binary heap
## (see https://www.erlang-solutions.com/blog/erlang-garbage-collector.html).
-##
+##
## Before trying this option, please take a look at the memory
## breakdown (http://www.rabbitmq.com/memory-use.html).
##
@@ -533,18 +533,49 @@
##
# management.http_log_dir = /path/to/access.log
-## Change the port on which the HTTP listener listens,
-## specifying an interface for the web server to bind to.
-## Also set the listener to use TLS and provide TLS options.
-##
+## HTTP listener and embedded Web server settings.
+# ## See https://rabbitmq.com/management.html for details.
+#
+# management.tcp.port = 15672
+# management.tcp.ip = 0.0.0.0
+#
+# management.tcp.shutdown_timeout = 7000
+# management.tcp.max_keepalive = 120
+# management.tcp.idle_timeout = 120
+# management.tcp.inactivity_timeout = 120
+# management.tcp.request_timeout = 120
+# management.tcp.compress = true
+
+## HTTPS listener settings.
+## See https://rabbitmq.com/management.html and https://rabbitmq.com/ssl.html for details.
+##
+# management.ssl.port = 15671
+# management.ssl.cacertfile = /path/to/ca_certificate.pem
+# management.ssl.certfile = /path/to/server_certificate.pem
+# management.ssl.keyfile = /path/to/server_key.pem
+
+## More TLS options
+# management.ssl.honor_cipher_order = true
+# management.ssl.honor_ecc_order = true
+# management.ssl.client_renegotiation = false
+# management.ssl.secure_renegotiate = true
+
+## Supported TLS versions
+# management.ssl.versions.1 = tlsv1.2
+# management.ssl.versions.2 = tlsv1.1
+
+## Cipher suites the server is allowed to use
+# management.ssl.ciphers.1 = ECDHE-ECDSA-AES256-GCM-SHA384
+# management.ssl.ciphers.2 = ECDHE-RSA-AES256-GCM-SHA384
+# management.ssl.ciphers.3 = ECDHE-ECDSA-AES256-SHA384
+# management.ssl.ciphers.4 = ECDHE-RSA-AES256-SHA384
+# management.ssl.ciphers.5 = ECDH-ECDSA-AES256-GCM-SHA384
+# management.ssl.ciphers.6 = ECDH-RSA-AES256-GCM-SHA384
+# management.ssl.ciphers.7 = ECDH-ECDSA-AES256-SHA384
+# management.ssl.ciphers.8 = ECDH-RSA-AES256-SHA384
+# management.ssl.ciphers.9 = DHE-RSA-AES256-GCM-SHA384
-# management.listener.port = 15672
-# management.listener.ip = 127.0.0.1
-# management.listener.ssl = true
-# management.listener.ssl_opts.cacertfile = /path/to/cacert.pem
-# management.listener.ssl_opts.certfile = /path/to/cert.pem
-# management.listener.ssl_opts.keyfile = /path/to/key.pem
## One of 'basic', 'detailed' or 'none'. See
## http://rabbitmq.com/management.html#fine-stats for more details.
@@ -583,13 +614,39 @@
# STOMP section
# =======================================
-## Network Configuration. The format is generally the same as for the core broker.
+## See https://rabbitmq.com/stomp.html for details.
+
+## TCP listeners.
+##
+# stomp.listeners.tcp.1 = 127.0.0.1:61613
+# stomp.listeners.tcp.2 = ::1:61613
+
+## TCP listener settings
##
-# stomp.listeners.tcp.default = 61613
+# stomp.tcp_listen_options.backlog = 2048
+# stomp.tcp_listen_options.recbuf = 131072
+# stomp.tcp_listen_options.sndbuf = 131072
+#
+# stomp.tcp_listen_options.keepalive = true
+# stomp.tcp_listen_options.nodelay = true
+#
+# stomp.tcp_listen_options.exit_on_close = true
+# stomp.tcp_listen_options.send_timeout = 120
-## Same for ssl listeners
+## Proxy protocol support
##
+# stomp.proxy_protocol = false
+
+## TLS listeners
+## See https://rabbitmq.com/stomp.html and https://rabbitmq.com/ssl.html for details.
# stomp.listeners.ssl.default = 61614
+#
+# ssl_options.cacertfile = path/to/cacert.pem
+# ssl_options.certfile = path/to/cert.pem
+# ssl_options.keyfile = path/to/key.pem
+# ssl_options.verify = verify_peer
+# ssl_options.fail_if_no_peer_cert = true
+
## Number of Erlang processes that will accept connections for the TCP
## and TLS listeners.
@@ -642,6 +699,52 @@
# MQTT section
# =======================================
+## TCP listener settings.
+##
+# mqtt.listeners.tcp.1 = 127.0.0.1:61613
+# mqtt.listeners.tcp.2 = ::1:61613
+
+## TCP listener options (as per the broker configuration).
+##
+# mqtt.tcp_listen_options.backlog = 4096
+# mqtt.tcp_listen_options.recbuf = 131072
+# mqtt.tcp_listen_options.sndbuf = 131072
+#
+# mqtt.tcp_listen_options.keepalive = true
+# mqtt.tcp_listen_options.nodelay = true
+#
+# mqtt.tcp_listen_options.exit_on_close = true
+# mqtt.tcp_listen_options.send_timeout = 120
+
+## TLS listener settings
+## ## See https://rabbitmq.com/mqtt.html and https://rabbitmq.com/ssl.html for details.
+#
+# mqtt.listeners.ssl.default = 8883
+#
+# ssl_options.cacertfile = /path/to/tls/ca_certificate_bundle.pem
+# ssl_options.certfile = /path/to/tls/server_certificate.pem
+# ssl_options.keyfile = /path/to/tls/server_key.pem
+# ssl_options.verify = verify_peer
+# ssl_options.fail_if_no_peer_cert = true
+#
+
+
+## Number of Erlang processes that will accept connections for the TCP
+## and TLS listeners.
+##
+# mqtt.num_acceptors.tcp = 10
+# mqtt.num_acceptors.ssl = 10
+
+## Whether or not to enable proxy protocol support.
+## Once enabled, clients cannot directly connect to the broker
+## anymore. They must connect through a load balancer that sends the
+## proxy protocol header to the broker at connection time.
+## This setting applies only to STOMP clients, other protocols
+## like STOMP or AMQP have their own setting to enable proxy protocol.
+## See the plugins or broker documentation for more information.
+##
+# mqtt.proxy_protocol = false
+
## Set the default user name and password used for anonymous connections (when client
## provides no credentials). Anonymous connections are highly discouraged!
##
@@ -672,34 +775,6 @@
##
# mqtt.prefetch = 10
-## TCP/SSL Configuration (as per the broker configuration).
-##
-# mqtt.listeners.tcp.default = 1883
-
-## Same for ssl listener
-##
-# mqtt.listeners.ssl.default = 1884
-
-## Number of Erlang processes that will accept connections for the TCP
-## and TLS listeners.
-##
-# mqtt.num_acceptors.tcp = 10
-# mqtt.num_acceptors.ssl = 10
-
-## TCP listener options (as per the broker configuration).
-##
-# mqtt.tcp_listen_options.backlog = 128
-# mqtt.tcp_listen_options.nodelay = true
-
-## Whether or not to enable proxy protocol support.
-## Once enabled, clients cannot directly connect to the broker
-## anymore. They must connect through a load balancer that sends the
-## proxy protocol header to the broker at connection time.
-## This setting applies only to STOMP clients, other protocols
-## like STOMP or AMQP have their own setting to enable proxy protocol.
-## See the plugins or broker documentation for more information.
-##
-# mqtt.proxy_protocol = false
## ----------------------------------------------------------------------------
## RabbitMQ AMQP 1.0 Support
diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema
index 5c6078a413..ef3dafd116 100644
--- a/priv/schema/rabbit.schema
+++ b/priv/schema/rabbit.schema
@@ -258,7 +258,7 @@ end}.
{translation, "rabbit.ssl_options.ciphers",
fun(Conf) ->
Settings = cuttlefish_variable:filter_by_prefix("ssl_options.ciphers", Conf),
- [V || {_, V} <- Settings]
+ lists:reverse([V || {_, V} <- Settings])
end}.
%% ===========================================================================
@@ -554,6 +554,9 @@ end}.
}.
+{mapping, "msx_message_size", "rabbit.max_message_size",
+ [{datatype, integer}, {validators, ["less_then_512MB"]}]}.
+
%% Customising Socket Options.
%%
%% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
@@ -1361,6 +1364,11 @@ fun(Size) when is_integer(Size) ->
Size > 0 andalso Size < 2147483648
end}.
+{validator, "less_then_512MB", "Max message size should be less than 512MB and gre than 0",
+fun(Size) when is_integer(Size) ->
+ Size > 0 andalso Size < 536870912
+end}.
+
{validator, "less_than_1", "Flooat is not beetween 0 and 1",
fun(Float) when is_float(Float) ->
Float > 0 andalso Float < 1
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 704ead75a7..c65ad299ed 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -29,7 +29,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
emit_info_all/5, list_local/1, info_local/1,
emit_info_local/4, emit_info_down/4]).
--export([list_down/1, count/1, list_names/0, list_local_names/0]).
+-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0]).
-export([list_by_type/1]).
-export([notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
@@ -437,8 +437,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
not_found -> Q1 = rabbit_policy:set(Q),
Q2 = Q1#amqqueue{state = live},
ok = store_queue(Q2),
- B = add_default_binding(Q2),
- fun () -> B(), {created, Q2} end;
+ fun () -> {created, Q2} end;
{absent, _Q, _} = R -> rabbit_misc:const(R)
end;
[ExistingQ] ->
@@ -502,15 +501,6 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1},
%% mirroring-related has changed - the policy may have changed anyway.
notify_policy_changed(Q1).
-add_default_binding(#amqqueue{name = QueueName}) ->
- ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
- RoutingKey = QueueName#resource.name,
- rabbit_binding:add(#binding{source = ExchangeName,
- destination = QueueName,
- key = RoutingKey,
- args = []},
- ?INTERNAL_USER).
-
lookup([]) -> []; %% optimisation
lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation
lookup(Names) when is_list(Names) ->
@@ -764,6 +754,8 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
list_names() -> mnesia:dirty_all_keys(rabbit_queue).
+list_names(VHost) -> [Q#amqqueue.name || Q <- list(VHost)].
+
list_local_names() ->
[ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(),
State =/= crashed, is_local_to_node(QPid, node())].
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index e96dfd7673..258e85ffa2 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -27,6 +27,10 @@
-export([has_for_source/1, remove_for_source/1,
remove_for_destination/2, remove_transient_for_destination/1]).
+-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath,
+ kind = exchange,
+ name = <<>>}).
+
%%----------------------------------------------------------------------------
-export_type([key/0, deletions/0]).
@@ -156,6 +160,14 @@ recover_semi_durable_route_txn(R = #route{binding = B}, X) ->
(Serial, false) -> x_callback(Serial, X, add_binding, B)
end).
+exists(#binding{source = ?DEFAULT_EXCHANGE(_),
+ destination = #resource{kind = queue, name = QName} = Queue,
+ key = QName,
+ args = []}) ->
+ case rabbit_amqqueue:lookup(Queue) of
+ {ok, _} -> true;
+ {error, not_found} -> false
+ end;
exists(Binding) ->
binding_action(
Binding, fun (_Src, _Dst, B) ->
@@ -243,9 +255,17 @@ list(VHostPath) ->
destination = VHostResource,
_ = '_'},
_ = '_'},
- [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
- Route)].
-
+ %% if there are any default exchange bindings left after an upgrade
+ %% of a pre-3.8 database, filter them out
+ AllBindings = [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)],
+ Filtered = lists:filter(fun(#binding{source = S}) ->
+ S =/= ?DEFAULT_EXCHANGE(VHostPath)
+ end, AllBindings),
+ implicit_bindings(VHostPath) ++ Filtered.
+
+list_for_source(?DEFAULT_EXCHANGE(VHostPath)) ->
+ implicit_bindings(VHostPath);
list_for_source(SrcName) ->
mnesia:async_dirty(
fun() ->
@@ -255,16 +275,43 @@ list_for_source(SrcName) ->
end).
list_for_destination(DstName) ->
- mnesia:async_dirty(
- fun() ->
- Route = #route{binding = #binding{destination = DstName,
- _ = '_'}},
- [reverse_binding(B) ||
- #reverse_route{reverse_binding = B} <-
- mnesia:match_object(rabbit_reverse_route,
- reverse_route(Route), read)]
- end).
-
+ implicit_for_destination(DstName) ++
+ mnesia:async_dirty(
+ fun() ->
+ Route = #route{binding = #binding{destination = DstName,
+ _ = '_'}},
+ [reverse_binding(B) ||
+ #reverse_route{reverse_binding = B} <-
+ mnesia:match_object(rabbit_reverse_route,
+ reverse_route(Route), read)]
+ end).
+
+implicit_bindings(VHostPath) ->
+ DstQueues = rabbit_amqqueue:list_names(VHostPath),
+ [ #binding{source = ?DEFAULT_EXCHANGE(VHostPath),
+ destination = DstQueue,
+ key = QName,
+ args = []}
+ || DstQueue = #resource{name = QName} <- DstQueues ].
+
+implicit_for_destination(DstQueue = #resource{kind = queue,
+ virtual_host = VHostPath,
+ name = QName}) ->
+ [#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
+ destination = DstQueue,
+ key = QName,
+ args = []}];
+implicit_for_destination(_) ->
+ [].
+
+list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath),
+ #resource{kind = queue,
+ virtual_host = VHostPath,
+ name = QName} = DstQueue) ->
+ [#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
+ destination = DstQueue,
+ key = QName,
+ args = []}];
list_for_source_and_destination(SrcName, DstName) ->
mnesia:async_dirty(
fun() ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d1f3b06528..eeae247193 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -72,7 +72,7 @@
-export([get_vhost/1, get_user/1]).
%% For testing
-export([build_topic_variable_map/3]).
--export([list_queue_states/1]).
+-export([list_queue_states/1, get_max_message_size/0]).
%% Mgmt HTTP API refactor
-export([handle_method/5]).
@@ -158,7 +158,9 @@
delivery_flow,
interceptor_state,
queue_states,
- queue_cleanup_timer
+ queue_cleanup_timer,
+ %% Message content size limit
+ max_message_size
}).
-define(QUEUE, lqueue).
@@ -441,6 +443,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
_ ->
Limiter0
end,
+ MaxMessageSize = get_max_message_size(),
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -473,7 +476,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
- queue_states = #{}},
+ queue_states = #{},
+ max_message_size = MaxMessageSize},
State1 = State#ch{
interceptor_state = rabbit_channel_interceptor:init(State)},
State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
@@ -793,6 +797,16 @@ code_change(_OldVsn, State, _Extra) ->
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+-spec get_max_message_size() -> non_neg_integer().
+
+get_max_message_size() ->
+ case application:get_env(rabbit, max_message_size) of
+ {ok, MS} when is_integer(MS) ->
+ erlang:min(MS, ?MAX_MSG_SIZE);
+ _ ->
+ ?MAX_MSG_SIZE
+ end.
+
%%---------------------------------------------------------------------------
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
@@ -985,12 +999,19 @@ extract_topic_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct,
extract_topic_variable_map_from_amqp_params(_) ->
#{}.
-check_msg_size(Content) ->
+check_msg_size(Content, MaxMessageSize) ->
Size = rabbit_basic:maybe_gc_large_msg(Content),
- case Size > ?MAX_MSG_SIZE of
- true -> precondition_failed("message size ~B larger than max size ~B",
- [Size, ?MAX_MSG_SIZE]);
- false -> ok
+ case Size of
+ S when S > MaxMessageSize ->
+ ErrorMessage = case MaxMessageSize of
+ ?MAX_MSG_SIZE ->
+ "message size ~B is larger than max size ~B";
+ _ ->
+ "message size ~B is larger than configured max size ~B"
+ end,
+ precondition_failed(ErrorMessage,
+ [Size, MaxMessageSize]);
+ _ -> ok
end.
check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->
@@ -1164,16 +1185,17 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
- Content, State = #ch{virtual_host = VHostPath,
- tx = Tx,
- channel = ChannelNum,
- confirm_enabled = ConfirmEnabled,
- trace_state = TraceState,
- user = #user{username = Username} = User,
- conn_name = ConnName,
- delivery_flow = Flow,
- conn_pid = ConnPid}) ->
- check_msg_size(Content),
+ Content, State = #ch{virtual_host = VHostPath,
+ tx = Tx,
+ channel = ChannelNum,
+ confirm_enabled = ConfirmEnabled,
+ trace_state = TraceState,
+ user = #user{username = Username} = User,
+ conn_name = ConnName,
+ delivery_flow = Flow,
+ conn_pid = ConnPid,
+ max_message_size = MaxMessageSize}) ->
+ check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 938e8f77fd..1536cd1f51 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -428,26 +428,20 @@ apply(_, #checkout{spec = Spec, meta = Meta,
State1 = update_consumer(ConsumerId, Meta, Spec, State0),
checkout(State1, [{monitor, process, Pid}]);
apply(#{index := RaftIdx}, #purge{},
- #state{consumers = Cons0, ra_indexes = Indexes } = State0) ->
- Total = rabbit_fifo_index:size(Indexes),
- {State1, Effects1} =
- maps:fold(
- fun(ConsumerId, C = #consumer{checked_out = Checked0},
- {StateAcc0, EffectsAcc0}) ->
- MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}}
- <- maps:values(Checked0)],
- complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C,
- #{}, EffectsAcc0, StateAcc0)
- end, {State0, []}, Cons0),
- {State, _, Effects} =
- update_smallest_raft_index(
- RaftIdx, Indexes,
- State1#state{ra_indexes = rabbit_fifo_index:empty(),
- messages = #{},
- returns = lqueue:new(),
- msg_bytes_enqueue = 0,
- msg_bytes_checkout = 0,
- low_msg_num = undefined}, Effects1),
+ #state{ra_indexes = Indexes0,
+ messages = Messages} = State0) ->
+ Total = maps:size(Messages),
+ Indexes = lists:foldl(fun rabbit_fifo_index:delete/2,
+ Indexes0,
+ [I || {I, _} <- lists:sort(maps:values(Messages))]),
+ {State, _, Effects} =
+ update_smallest_raft_index(RaftIdx, Indexes0,
+ State0#state{ra_indexes = Indexes,
+ messages = #{},
+ returns = lqueue:new(),
+ msg_bytes_enqueue = 0,
+ low_msg_num = undefined},
+ []),
%% as we're not checking out after a purge (no point) we have to
%% reverse the effects ourselves
{State, {purge, Total},
@@ -554,7 +548,8 @@ state_enter(leader, #state{consumers = Cons,
Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]),
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
- Effects = Mons ++ Nots,
+ NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
+ Effects = Mons ++ Nots ++ NodeMons,
case BLH of
undefined ->
Effects;
@@ -1940,11 +1935,12 @@ purge_with_checkout_test() ->
%% assert message bytes are non zero
?assert(State2#state.msg_bytes_checkout > 0),
?assert(State2#state.msg_bytes_enqueue > 0),
- {State3, {purge, 2}, _} = apply(meta(2), make_purge(), State2),
- ?assertEqual(0, State3#state.msg_bytes_checkout),
+ {State3, {purge, 1}, _} = apply(meta(2), make_purge(), State2),
+ ?assert(State2#state.msg_bytes_checkout > 0),
?assertEqual(0, State3#state.msg_bytes_enqueue),
+ ?assertEqual(1, rabbit_fifo_index:size(State3#state.ra_indexes)),
#consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers),
- ?assertEqual(0, maps:size(Checked)),
+ ?assertEqual(1, maps:size(Checked)),
ok.
down_returns_checked_out_in_order_test() ->
diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl
index 345a99a03c..f8f414f453 100644
--- a/src/rabbit_fifo_index.erl
+++ b/src/rabbit_fifo_index.erl
@@ -56,10 +56,10 @@ return(Key, Value, #?MODULE{data = Data} = State)
when is_integer(Key) ->
State#?MODULE{data = maps:put(Key, Value, Data)}.
--spec delete(integer(), state()) -> state().
+-spec delete(Index :: integer(), state()) -> state().
delete(Smallest, #?MODULE{data = Data0,
- largest = Largest,
- smallest = Smallest} = State) ->
+ largest = Largest,
+ smallest = Smallest} = State) ->
Data = maps:remove(Smallest, Data0),
case find_next(Smallest + 1, Largest, Data) of
undefined ->
diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl
index c532714d41..7c386855f6 100644
--- a/src/rabbit_queue_location_min_masters.erl
+++ b/src/rabbit_queue_location_min_masters.erl
@@ -38,45 +38,33 @@ description() ->
<<"Locate queue master node from cluster node with least bound queues">>}].
queue_master_location(#amqqueue{} = Q) ->
- Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
- VHosts = rabbit_vhost:list(),
- BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []),
- {_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters),
- {ok, MinMaster}.
+ Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
+ QueueNames = rabbit_amqqueue:list_names(),
+ MastersPerNode = lists:foldl(
+ fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) ->
+ case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of
+ {ok, Master} when is_atom(Master) ->
+ case maps:is_key(Master, NodeMasters) of
+ true -> maps:update_with(Master,
+ fun(N) -> N + 1 end,
+ NodeMasters);
+ false -> NodeMasters
+ end;
+ _ -> NodeMasters
+ end
+ end,
+ maps:from_list([{N, 0} || N <- Cluster]),
+ QueueNames),
-%%---------------------------------------------------------------------------
-%% Private helper functions
-%%---------------------------------------------------------------------------
-get_min_master(Cluster, BoundQueueMasters) ->
- lists:min([ {count_masters(Node, BoundQueueMasters), Node} ||
- Node <- Cluster ]).
-
-count_masters(Node, Masters) ->
- length([ X || X <- Masters, X == Node ]).
-
-get_bound_queue_masters_per_vhost([], Acc) ->
- lists:flatten(Acc);
-get_bound_queue_masters_per_vhost([VHost|RemVHosts], Acc) ->
- BoundQueueNames =
- lists:filtermap(
- fun(#binding{destination =#resource{kind = queue,
- name = QueueName}}) ->
- {true, QueueName};
- (_) ->
- false
- end,
- rabbit_binding:list(VHost)),
- UniqQueueNames = lists:usort(BoundQueueNames),
- BoundQueueMasters = get_queue_masters(VHost, UniqQueueNames, []),
- get_bound_queue_masters_per_vhost(RemVHosts, [BoundQueueMasters|Acc]).
-
-
-get_queue_masters(_VHost, [], BoundQueueNodes) -> BoundQueueNodes;
-get_queue_masters(VHost, [QueueName | RemQueueNames], QueueMastersAcc) ->
- QueueMastersAcc0 = case rabbit_queue_master_location_misc:lookup_master(
- QueueName, VHost) of
- {ok, Master} when is_atom(Master) ->
- [Master|QueueMastersAcc];
- _ -> QueueMastersAcc
- end,
- get_queue_masters(VHost, RemQueueNames, QueueMastersAcc0).
+ {MinNode, _NMasters} = maps:fold(
+ fun(Node, NMasters, init) ->
+ {Node, NMasters};
+ (Node, NMasters, {MinNode, MinMasters}) ->
+ case NMasters < MinMasters of
+ true -> {Node, NMasters};
+ false -> {MinNode, MinMasters}
+ end
+ end,
+ init,
+ MastersPerNode),
+ {ok, MinNode}. \ No newline at end of file
diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets
index b318adaa12..50ca777aa8 100644
--- a/test/config_schema_SUITE_data/rabbit.snippets
+++ b/test/config_schema_SUITE_data/rabbit.snippets
@@ -326,15 +326,15 @@ tcp_listen_options.exit_on_close = false",
{ssl_options,
[{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"},
{ciphers, [
- "DHE-RSA-AES256-GCM-SHA384",
+ "ECDHE-ECDSA-AES256-GCM-SHA384",
+ "ECDHE-RSA-AES256-GCM-SHA384",
+ "ECDHE-ECDSA-AES256-SHA384",
+ "ECDHE-RSA-AES256-SHA384",
"ECDH-ECDSA-AES256-GCM-SHA384",
- "ECDH-ECDSA-AES256-SHA384",
"ECDH-RSA-AES256-GCM-SHA384",
+ "ECDH-ECDSA-AES256-SHA384",
"ECDH-RSA-AES256-SHA384",
- "ECDHE-ECDSA-AES256-GCM-SHA384",
- "ECDHE-ECDSA-AES256-SHA384",
- "ECDHE-RSA-AES256-GCM-SHA384",
- "ECDHE-RSA-AES256-SHA384"
+ "DHE-RSA-AES256-GCM-SHA384"
]},
{certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
diff --git a/test/per_vhost_connection_limit_partitions_SUITE.erl b/test/per_vhost_connection_limit_partitions_SUITE.erl
index 051f3f13b7..20adf704a0 100644
--- a/test/per_vhost_connection_limit_partitions_SUITE.erl
+++ b/test/per_vhost_connection_limit_partitions_SUITE.erl
@@ -107,7 +107,7 @@ cluster_full_partition_with_autoheal(Config) ->
Conn4 = open_unmanaged_connection(Config, B),
Conn5 = open_unmanaged_connection(Config, C),
Conn6 = open_unmanaged_connection(Config, C),
- ?assertEqual(6, count_connections_in(Config, VHost)),
+ wait_for_count_connections_in(Config, VHost, 6, 60000),
%% B drops off the network, non-reachable by either A or C
rabbit_ct_broker_helpers:block_traffic_between(A, B),
@@ -115,14 +115,14 @@ cluster_full_partition_with_autoheal(Config) ->
timer:sleep(?DELAY),
%% A and C are still connected, so 4 connections are tracked
- ?assertEqual(4, count_connections_in(Config, VHost)),
+ wait_for_count_connections_in(Config, VHost, 4, 60000),
rabbit_ct_broker_helpers:allow_traffic_between(A, B),
rabbit_ct_broker_helpers:allow_traffic_between(B, C),
timer:sleep(?DELAY),
%% during autoheal B's connections were dropped
- ?assertEqual(4, count_connections_in(Config, VHost)),
+ wait_for_count_connections_in(Config, VHost, 4, 60000),
lists:foreach(fun (Conn) ->
(catch rabbit_ct_client_helpers:close_connection(Conn))
@@ -131,11 +131,22 @@ cluster_full_partition_with_autoheal(Config) ->
passed.
-
%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------
+wait_for_count_connections_in(Config, VHost, Expected, Time) when Time =< 0 ->
+ ?assertEqual(Expected, count_connections_in(Config, VHost));
+wait_for_count_connections_in(Config, VHost, Expected, Time) ->
+ case count_connections_in(Config, VHost) of
+ Expected ->
+ ok;
+ _ ->
+ Sleep = 3000,
+ timer:sleep(Sleep),
+ wait_for_count_connections_in(Config, VHost, Expected, Time - Sleep)
+ end.
+
count_connections_in(Config, VHost) ->
count_connections_in(Config, VHost, 0).
count_connections_in(Config, VHost, NodeIndex) ->
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 6f06f3b874..c84eaf4a5e 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -340,7 +340,7 @@ start_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Test declare an existing queue
?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -356,7 +356,7 @@ start_queue(Config) ->
%% Check that the application and process are still up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
start_queue_concurrent(Config) ->
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -417,13 +417,13 @@ stop_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Delete the quorum queue
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})),
%% Check that the application and process are down
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@@ -442,7 +442,7 @@ restart_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
idempotent_recover(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@@ -521,7 +521,7 @@ restart_all_types(Config) ->
%% Check that the application and two ra nodes are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -563,7 +563,7 @@ stop_start_rabbit_app(Config) ->
%% Check that the application and two ra nodes are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -1263,7 +1263,7 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
[] == rpc:call(Server, supervisor, which_children,
- [ra_server_sup])
+ [ra_server_sup_sup])
end),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh1, 0),
@@ -1300,7 +1300,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) ->
wait_for_cleanup(Server, NCh2, 1),
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh1, 0),
@@ -1539,8 +1539,8 @@ purge(Config) ->
_DeliveryTag = consume(Ch, QQ, false),
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 1),
- {'queue.purge_ok', 2} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}),
- wait_for_messages_pending_ack(Servers, RaName, 0),
+ {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
wait_for_messages_ready(Servers, RaName, 0).
sync_queue(Config) ->
@@ -1964,7 +1964,7 @@ delete_immediately_by_resource(Config) ->
%% Check that the application and process are down
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@@ -2235,7 +2235,8 @@ wait_for_cleanup(Server, Channel, Number) ->
wait_for_cleanup(Server, Channel, Number, 60).
wait_for_cleanup(Server, Channel, Number, 0) ->
- ?assertEqual(Number, length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])));
+ ?assertEqual(length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])),
+ Number);
wait_for_cleanup(Server, Channel, Number, N) ->
case length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])) of
Length when Number == Length ->
@@ -2261,7 +2262,8 @@ wait_for_messages(Servers, QName, Number, Fun, 0) ->
(_) ->
-1
end, Msgs),
- ?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]);
+ ?assertEqual([Number || _ <- lists:seq(1, length(Servers))],
+ Totals);
wait_for_messages(Servers, QName, Number, Fun, N) ->
Msgs = dirty_query(Servers, QName, Fun),
case lists:all(fun(M) when is_map(M) ->
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 3263a733a9..0512e8161a 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -55,7 +55,7 @@ init_per_testcase(TestCase, Config) ->
meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end),
meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
fun (_, _) -> ok end),
- ra_server_sup:remove_all(),
+ ra_server_sup_sup:remove_all(),
ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)),
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index d8031ce6d7..466df684af 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -25,6 +25,7 @@
-define(TIMEOUT_LIST_OPS_PASS, 5000).
-define(TIMEOUT, 30000).
+-define(TIMEOUT_CHANNEL_EXCEPTION, 5000).
-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
@@ -60,10 +61,16 @@ groups() ->
topic_matching,
{queue_max_length, [], [
{max_length_simple, [], MaxLengthTests},
- {max_length_mirrored, [], MaxLengthTests}]}
+ {max_length_mirrored, [], MaxLengthTests}]},
+ max_message_size
]}
].
+suite() ->
+ [
+ {timetrap, {minutes, 3}}
+ ].
+
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
@@ -1299,6 +1306,74 @@ sync_mirrors(QName, Config) ->
_ -> ok
end.
+gen_binary_mb(N) ->
+ B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>,
+ << B1M || _ <- lists:seq(1, N) >>.
+
+assert_channel_alive(Ch) ->
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>},
+ #amqp_msg{payload = <<"HI">>}).
+
+assert_channel_fail_max_size(Ch, Monitor) ->
+ receive
+ {'DOWN', Monitor, process, Ch,
+ {shutdown,
+ {server_initiated_close, 406, _Error}}} ->
+ ok
+ after ?TIMEOUT_CHANNEL_EXCEPTION ->
+ error({channel_exception_expected, max_message_size})
+ end.
+
+max_message_size(Config) ->
+ Binary2M = gen_binary_mb(2),
+ Binary4M = gen_binary_mb(4),
+ Binary6M = gen_binary_mb(6),
+ Binary10M = gen_binary_mb(10),
+
+ Size2Mb = 1024 * 1024 * 2,
+ Size2Mb = byte_size(Binary2M),
+
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 3]),
+
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+
+ %% Binary is whithin the max size limit
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}),
+ %% The channel process is alive
+ assert_channel_alive(Ch),
+
+ Monitor = monitor(process, Ch),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}),
+ assert_channel_fail_max_size(Ch, Monitor),
+
+ %% increase the limit
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 8]),
+
+ {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary2M}),
+ assert_channel_alive(Ch1),
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary4M}),
+ assert_channel_alive(Ch1),
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary6M}),
+ assert_channel_alive(Ch1),
+
+ Monitor1 = monitor(process, Ch1),
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary10M}),
+ assert_channel_fail_max_size(Ch1, Monitor1),
+
+ %% increase beyond the hard limit
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 600]),
+ Val = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_channel, get_max_message_size, []),
+
+ ?assertEqual(?MAX_MSG_SIZE, Val).
+
%% ---------------------------------------------------------------------------
%% rabbitmqctl helpers.
%% ---------------------------------------------------------------------------