summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-02-21 01:15:06 +0300
committerMichael Klishin <mklishin@pivotal.io>2019-02-21 01:15:06 +0300
commitd3eb661efd89e87bc7859512bcdbe2f95fc5667d (patch)
tree5f1ee9b2de715e03f98a76e5ac72616a849b7183
parent017545d13ace947c02fb64c239ab28bb3f10b8d2 (diff)
parent9e4095fd906da893ca08b02836ce0716bbfac39f (diff)
downloadrabbitmq-server-git-d3eb661efd89e87bc7859512bcdbe2f95fc5667d.tar.gz
Merge branch 'master' into unavailable-qq-publish-fix
-rw-r--r--docs/rabbitmq.conf.example6
-rw-r--r--priv/schema/rabbit.schema10
-rwxr-xr-xscripts/rabbitmq-env17
-rwxr-xr-xscripts/rabbitmq-server1
-rw-r--r--src/rabbit_amqqueue.erl43
-rw-r--r--src/rabbit_binding.erl39
-rw-r--r--src/rabbit_channel.erl121
-rw-r--r--src/rabbit_direct.erl1
-rw-r--r--src/rabbit_queue_index.erl8
-rw-r--r--src/rabbit_quorum_queue.erl108
-rw-r--r--src/rabbit_reader.erl1
-rw-r--r--src/rabbit_vhost.erl7
-rw-r--r--test/backing_queue_SUITE.erl3
-rw-r--r--test/channel_source_SUITE.erl142
-rw-r--r--test/config_schema_SUITE_data/rabbit.snippets13
-rw-r--r--test/quorum_queue_SUITE.erl39
-rw-r--r--test/rabbitmq_queues_cli_integration_SUITE.erl137
17 files changed, 558 insertions, 138 deletions
diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example
index 81958c89fd..1aa3943a86 100644
--- a/docs/rabbitmq.conf.example
+++ b/docs/rabbitmq.conf.example
@@ -509,10 +509,10 @@
# net_ticktime = 60
## Inter-node communication port range.
+## The parameters inet_dist_listen_min and inet_dist_listen_max
+## can be configured in the classic config format only.
## Related doc guide: https://www.rabbitmq.com/networking.html#epmd-inet-dist-port-range.
-##
-# inet_dist_listen_min = 25672
-# inet_dist_listen_max = 25692
+
## ----------------------------------------------------------------------------
## RabbitMQ Management Plugin
diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema
index 2617f558f7..4cc543d99f 100644
--- a/priv/schema/rabbit.schema
+++ b/priv/schema/rabbit.schema
@@ -1352,16 +1352,6 @@ end}.
{validators, ["non_zero_positive_integer"]}
]}.
-{mapping, "inet_dist_listen_min", "kernel.inet_dist_listen_min",[
- {datatype, [integer]},
- {validators, ["non_zero_positive_integer"]}
-]}.
-
-{mapping, "inet_dist_listen_max", "kernel.inet_dist_listen_max",[
- {datatype, [integer]},
- {validators, ["non_zero_positive_integer"]}
-]}.
-
% ==========================
% sysmon_handler section
% ==========================
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 2b06dec2cd..a70e902b78 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -330,7 +330,22 @@ if [ "${RABBITMQ_DEV_ENV}" ]; then
"$RABBITMQ_ENABLED_PLUGINS_FILE_source" != 'environment' ]; then
# We need to query the running node for the plugins directory
# and the "enabled plugins" file.
- eval $( (${RABBITMQ_SCRIPTS_DIR}/rabbitmqctl eval \
+ for arg in "$@"; do
+ case "$arg" in
+ -n)
+ next_is_node=1
+ ;;
+ *)
+ if test "$next_is_node"; then
+ # If the executed script is being passed a remote node
+ # name, use it here to query the remote node.
+ node_arg="-n $arg"
+ break
+ fi
+ ;;
+ esac
+ done
+ eval $( (${RABBITMQ_SCRIPTS_DIR}/rabbitmqctl $node_arg eval \
'{ok, F} = application:get_env(rabbit, feature_flags_file),
{ok, P} = application:get_env(rabbit, plugins_dir),
{ok, E} = application:get_env(rabbit, enabled_plugins_file),
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index b4863057f0..4bb680cccf 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -187,6 +187,7 @@ RABBITMQ_PRELAUNCH_NODENAME="rabbitmqprelaunch${$}@localhost"
NOTIFY_SOCKET= \
RABBITMQ_CONFIG_FILE=$RABBITMQ_CONFIG_FILE \
ERL_CRASH_DUMP=$ERL_CRASH_DUMP \
+RABBITMQ_CONFIG_ARG_FILE=$RABBITMQ_CONFIG_ARG_FILE \
RABBITMQ_DIST_PORT=$RABBITMQ_DIST_PORT \
${ERL_DIR}erl -pa "$RABBITMQ_EBIN_ROOT" \
-boot "${CLEAN_BOOT_FILE}" \
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 337786b571..5488a88836 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -108,12 +108,16 @@ warn_file_limit() ->
ok
end.
--spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
+-spec recover(rabbit_types:vhost()) ->
+ {RecoveredClassic :: [amqqueue:amqqueue()],
+ FailedClassic :: [amqqueue:amqqueue()],
+ Quorum :: [amqqueue:amqqueue()]}.
recover(VHost) ->
- Classic = find_local_durable_classic_queues(VHost),
+ AllClassic = find_local_durable_classic_queues(VHost),
Quorum = find_local_quorum_queues(VHost),
- recover_classic_queues(VHost, Classic) ++ rabbit_quorum_queue:recover(Quorum).
+ {RecoveredClassic, FailedClassic} = recover_classic_queues(VHost, AllClassic),
+ {RecoveredClassic, FailedClassic, rabbit_quorum_queue:recover(Quorum)}.
recover_classic_queues(VHost, Queues) ->
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
@@ -124,15 +128,16 @@ recover_classic_queues(VHost, Queues) ->
BQ:start(VHost, [amqqueue:get_name(Q) || Q <- Queues]),
case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of
{ok, _} ->
- recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms));
+ RecoveredQs = recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)),
+ RecoveredNames = [amqqueue:get_name(Q) || Q <- RecoveredQs],
+ FailedQueues = [Q || Q <- Queues,
+ not lists:member(amqqueue:get_name(Q), RecoveredNames)],
+ {RecoveredQs, FailedQueues};
{error, Reason} ->
rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]),
throw({error, Reason})
end.
-filter_per_type(Queues) ->
- lists:partition(fun(Q) -> amqqueue:is_classic(Q) end, Queues).
-
filter_pid_per_type(QPids) ->
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
@@ -156,12 +161,14 @@ stop(VHost) ->
-spec start([amqqueue:amqqueue()]) -> 'ok'.
start(Qs) ->
- {Classic, _Quorum} = filter_per_type(Qs),
%% At this point all recovered queues and their bindings are
%% visible to routing, so now it is safe for them to complete
%% their initialisation (which may involve interacting with other
%% queues).
- _ = [amqqueue:get_pid(Q) ! {self(), go} || Q <- Classic],
+ _ = [amqqueue:get_pid(Q) ! {self(), go}
+ || Q <- Qs,
+ %% All queues are supposed to be classic here.
+ amqqueue:is_classic(Q)],
ok.
mark_local_durable_queues_stopped(VHost) ->
@@ -609,14 +616,14 @@ priv_absent(QueueName, _QPid, _IsDurable, timeout) ->
rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) ->
'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit().
-assert_equivalence(Q, Durable1, AD1, Args1, Owner) ->
+assert_equivalence(Q, DurableDeclare, AutoDeleteDeclare, Args1, Owner) ->
QName = amqqueue:get_name(Q),
- Durable = amqqueue:is_durable(Q),
- AD = amqqueue:is_auto_delete(Q),
- rabbit_misc:assert_field_equivalence(Durable, Durable1, QName, durable),
- rabbit_misc:assert_field_equivalence(AD, AD1, QName, auto_delete),
- assert_args_equivalence(Q, Args1),
- check_exclusive_access(Q, Owner, strict).
+ DurableQ = amqqueue:is_durable(Q),
+ AutoDeleteQ = amqqueue:is_auto_delete(Q),
+ ok = check_exclusive_access(Q, Owner, strict),
+ ok = rabbit_misc:assert_field_equivalence(DurableQ, DurableDeclare, QName, durable),
+ ok = rabbit_misc:assert_field_equivalence(AutoDeleteQ, AutoDeleteDeclare, QName, auto_delete),
+ ok = assert_args_equivalence(Q, Args1).
-spec check_exclusive_access(amqqueue:amqqueue(), pid()) ->
'ok' | rabbit_types:channel_exit().
@@ -633,7 +640,9 @@ check_exclusive_access(Q, _ReaderPid, _MatchType) ->
QueueName = amqqueue:get_name(Q),
rabbit_misc:protocol_error(
resource_locked,
- "cannot obtain exclusive access to locked ~s",
+ "cannot obtain exclusive access to locked ~s. It could be originally "
+ "declared on another connection or the exclusive property value does not "
+ "match that of the original declaration.",
[rabbit_misc:rs(QueueName)]).
-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index ab3bc6c819..05db4188ba 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -49,7 +49,6 @@
-type bind_ok_or_error() :: 'ok' | bind_errors() |
rabbit_types:error(
- 'binding_not_found' |
{'binding_invalid', string(), [any()]}).
-type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()).
-type inner_fun() ::
@@ -178,19 +177,15 @@ add(Src, Dst, B, ActingUser) ->
lock_resource(Src),
lock_resource(Dst),
[SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
- case (SrcDurable andalso DstDurable andalso
- mnesia:read({rabbit_durable_route, B}) =/= []) of
- false -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
- fun mnesia:write/3),
- x_callback(transaction, Src, add_binding, B),
- Serial = rabbit_exchange:serial(Src),
- fun () ->
- x_callback(Serial, Src, add_binding, B),
- ok = rabbit_event:notify(
- binding_created,
- info(B) ++ [{user_who_performed_action, ActingUser}])
- end;
- true -> rabbit_misc:const({error, binding_not_found})
+ ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
+ fun mnesia:write/3),
+ x_callback(transaction, Src, add_binding, B),
+ Serial = rabbit_exchange:serial(Src),
+ fun () ->
+ x_callback(Serial, Src, add_binding, B),
+ ok = rabbit_event:notify(
+ binding_created,
+ info(B) ++ [{user_who_performed_action, ActingUser}])
end.
-spec remove(rabbit_types:binding()) -> bind_res().
@@ -208,7 +203,10 @@ remove(Binding, InnerFun, ActingUser) ->
case mnesia:read(rabbit_route, B, write) of
[] -> case mnesia:read(rabbit_durable_route, B, write) of
[] -> rabbit_misc:const(ok);
- _ -> rabbit_misc:const({error, binding_not_found})
+ %% We still delete the binding and run
+ %% all post-delete functions if there is only
+ %% a durable route in the database
+ _ -> remove(Src, Dst, B, ActingUser)
end;
_ -> case InnerFun(Src, Dst) of
ok -> remove(Src, Dst, B, ActingUser);
@@ -275,9 +273,8 @@ list_for_source(SrcName) ->
-spec list_for_destination
(rabbit_types:binding_destination()) -> bindings().
-list_for_destination(DstName) ->
- implicit_for_destination(DstName) ++
- mnesia:async_dirty(
+list_for_destination(DstName = #resource{virtual_host = VHostPath}) ->
+ AllBindings = mnesia:async_dirty(
fun() ->
Route = #route{binding = #binding{destination = DstName,
_ = '_'}},
@@ -285,7 +282,11 @@ list_for_destination(DstName) ->
#reverse_route{reverse_binding = B} <-
mnesia:match_object(rabbit_reverse_route,
reverse_route(Route), read)]
- end).
+ end),
+ Filtered = lists:filter(fun(#binding{source = S}) ->
+ S =/= ?DEFAULT_EXCHANGE(VHostPath)
+ end, AllBindings),
+ implicit_for_destination(DstName) ++ Filtered.
implicit_bindings(VHostPath) ->
DstQueues = rabbit_amqqueue:list_names(VHostPath),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 036aa9a60c..8f5f159f88 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -63,6 +63,7 @@
-export([refresh_config_local/0, ready_for_close/1]).
-export([refresh_interceptors/0]).
-export([force_event_refresh/1]).
+-export([source/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
@@ -78,7 +79,7 @@
-export([list_queue_states/1, get_max_message_size/0]).
%% Mgmt HTTP API refactor
--export([handle_method/5]).
+-export([handle_method/6]).
-record(ch, {
%% starting | running | flow | closing
@@ -97,6 +98,9 @@
%% same as reader's name, see #v1.name
%% in rabbit_reader
conn_name,
+ %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined
+ %% or any other channel creating/spawning entity
+ source,
%% limiter pid, see rabbit_limiter
limiter,
%% none | {Msgs, Acks} | committing | failed |
@@ -448,6 +452,14 @@ force_event_refresh(Ref) ->
list_queue_states(Pid) ->
gen_server2:call(Pid, list_queue_states).
+-spec source(pid(), any()) -> any().
+
+source(Pid, Source) when is_pid(Pid) ->
+ case erlang:is_process_alive(Pid) of
+ true -> Pid ! {channel_source, Source};
+ false -> {error, channel_terminated}
+ end.
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
@@ -805,7 +817,10 @@ handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) ->
QName = rabbit_quorum_queue:queue_name(QS),
[] /= rabbit_amqqueue:lookup(QName)
end, QueueStates0),
- noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})).
+ noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates}));
+
+handle_info({channel_source, Source}, State = #ch{}) ->
+ noreply(State#ch{source = Source}).
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -938,11 +953,11 @@ check_write_permitted(Resource, User) ->
check_read_permitted(Resource, User) ->
check_resource_access(User, Resource, read).
-check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write).
+check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
+ check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write).
-check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read).
+check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
+ check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read).
check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
ok;
@@ -978,14 +993,17 @@ check_internal_exchange(_) ->
ok.
check_topic_authorisation(Resource = #exchange{type = topic},
- User, none, RoutingKey, Permission) ->
+ User, none, RoutingKey, _ChSrc, Permission) ->
%% Called from outside the channel by mgmt API
AmqpParams = [],
check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
check_topic_authorisation(Resource = #exchange{type = topic},
- User, ConnPid, RoutingKey, Permission) when is_pid(ConnPid) ->
- AmqpParams = get_amqp_params(ConnPid),
+ User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) ->
+ AmqpParams = get_amqp_params(ConnPid, ChSrc),
check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
+check_topic_authorisation(_, _, _, _, _, _) ->
+ ok.
+
check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic},
User = #user{username = Username},
AmqpParams, RoutingKey, Permission) ->
@@ -1003,11 +1021,10 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
User, Resource, Permission, Context),
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail])
- end;
-check_topic_authorisation(_, _, _, _, _) ->
- ok.
+ end.
-get_amqp_params(ConnPid) when is_pid(ConnPid) ->
+get_amqp_params(_ConnPid, rabbit_reader) -> [];
+get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) ->
Timeout = get_operation_timeout(),
get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout).
@@ -1227,13 +1244,14 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
conn_name = ConnName,
delivery_flow = Flow,
conn_pid = ConnPid,
+ source = ChSrc,
max_message_size = MaxMessageSize}) ->
check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
check_internal_exchange(Exchange),
- check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
+ check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
@@ -1516,76 +1534,85 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
user = User,
queue_collector_pid = CollectorPid,
- conn_pid = ConnPid}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ conn_pid = ConnPid,
+ source = ChSrc}) ->
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.delete'{nowait = NoWait} = Method,
_, State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.delete_ok'{});
handle_method(#'exchange.bind'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.bind_ok'{});
handle_method(#'exchange.unbind'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.unbind_ok'{});
handle_method(#'queue.declare'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
{ok, QueueName, MessageCount, ConsumerCount} =
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_queue_declare_ok(QueueName, NoWait, MessageCount,
ConsumerCount, State);
handle_method(#'queue.delete'{nowait = NoWait} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}) ->
{ok, PurgedMessageCount} =
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait,
#'queue.delete_ok'{message_count = PurgedMessageCount});
handle_method(#'queue.bind'{nowait = NoWait} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'queue.bind_ok'{});
handle_method(#'queue.unbind'{} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, false, #'queue.unbind_ok'{});
handle_method(#'queue.purge'{nowait = NoWait} = Method,
_, State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- case handle_method(Method, ConnPid, CollectorPid,
+ case handle_method(Method, ConnPid, ChSrc, CollectorPid,
VHostPath, User) of
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
@@ -1811,7 +1838,7 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QRef, DQ)}.
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
- RoutingKey, Arguments, VHostPath, ConnPid,
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc,
#user{username = Username} = User) ->
ExchangeNameBin = strip_cr_lf(SourceNameBin0),
DestinationNameBin = strip_cr_lf(DestinationNameBin0),
@@ -1820,14 +1847,11 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
check_read_permitted(ExchangeName, User),
- ExchangeLookup = rabbit_exchange:lookup(ExchangeName),
- case ExchangeLookup of
+ case rabbit_exchange:lookup(ExchangeName) of
{error, not_found} ->
- %% no-op
- ExchangeLookup;
+ ok;
{ok, Exchange} ->
- check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
- ExchangeLookup
+ check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc)
end,
case Fun(#binding{source = ExchangeName,
destination = DestinationName,
@@ -2226,6 +2250,7 @@ i(user_who_performed_action, Ch) -> i(user, Ch);
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
+i(source, #ch{source = ChSrc}) -> ChSrc;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
@@ -2298,39 +2323,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'exchange.unbind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'queue.bind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
%% Note that all declares to these are effectively passive. If it
%% exists it by definition has one consumer.
handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
_/binary>> = QueueNameBin},
- _ConnPid, _CollectorPid, VHost, _User) ->
+ _ConnPid, _ChSrc, _CollectorPid, VHost, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
case declare_fast_reply_to(StrippedQueueNameBin) of
@@ -2344,7 +2369,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
auto_delete = AutoDelete,
nowait = NoWait,
arguments = Args} = Declare,
- ConnPid, CollectorPid, VHostPath,
+ ConnPid, ChSrc, CollectorPid, VHostPath,
#user{username = Username} = User) ->
Owner = case ExclusiveDeclare of
true -> ConnPid;
@@ -2403,7 +2428,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{existing, _Q} ->
%% must have been created between the stat and the
%% declare. Loop around again.
- handle_method(Declare, ConnPid, CollectorPid, VHostPath,
+ handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath,
User);
{absent, Q, Reason} ->
rabbit_amqqueue:absent(Q, Reason);
@@ -2419,7 +2444,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait,
passive = true},
- ConnPid, _CollectorPid, VHostPath, _User) ->
+ ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
Fun = fun (Q0) ->
@@ -2433,7 +2458,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty},
- ConnPid, _CollectorPid, VHostPath,
+ ConnPid, _ChSrc, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath),
@@ -2462,7 +2487,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
end;
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
if_unused = IfUnused},
- _ConnPid, _CollectorPid, VHostPath,
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin),
@@ -2478,7 +2503,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
ok
end;
handle_method(#'queue.purge'{queue = QueueNameBin},
- ConnPid, _CollectorPid, VHostPath, User) ->
+ ConnPid, _ChSrc, _CollectorPid, VHostPath, User) ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
check_read_permitted(QueueName, User),
rabbit_amqqueue:with_exclusive_access_or_die(
@@ -2491,7 +2516,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
auto_delete = AutoDelete,
internal = Internal,
arguments = Args},
- _ConnPid, _CollectorPid, VHostPath,
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath,
#user{username = Username} = User) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
@@ -2524,7 +2549,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
AutoDelete, Internal, Args);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
passive = true},
- _ConnPid, _CollectorPid, VHostPath, _User) ->
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 50e8f3d2b0..e43bfba90c 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -211,6 +211,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
rabbit_direct_client_sup,
[{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, Collector}]),
+ _ = rabbit_channel:source(ChannelPid, ?MODULE),
{ok, ChannelPid}.
-spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 61373e49c1..665eea12df 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -1472,14 +1472,18 @@ move_to_per_vhost_stores(#resource{} = QueueName) ->
OldQueueDir = filename:join([queues_base_dir(), "queues",
queue_name_to_dir_name_legacy(QueueName)]),
NewQueueDir = queue_dir(QueueName),
+ rabbit_log_upgrade:info("About to migrate queue directory '~s' to '~s'",
+ [OldQueueDir, NewQueueDir]),
case rabbit_file:is_dir(OldQueueDir) of
true ->
ok = rabbit_file:ensure_dir(NewQueueDir),
ok = rabbit_file:rename(OldQueueDir, NewQueueDir),
ok = ensure_queue_name_stub_file(NewQueueDir, QueueName);
false ->
- rabbit_log:info("Queue index directory not found for queue ~p~n",
- [QueueName])
+ Msg = "Queue index directory '~s' not found for ~s~n",
+ Args = [OldQueueDir, rabbit_misc:rs(QueueName)],
+ rabbit_log_upgrade:error(Msg, Args),
+ rabbit_log:error(Msg, Args)
end,
ok.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index e1c1c320f4..9e73981541 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -37,6 +37,8 @@
-export([requeue/3]).
-export([policy_changed/2]).
-export([cleanup_data_dir/0]).
+-export([shrink_all/1,
+ grow/4]).
%%-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit.hrl").
@@ -288,8 +290,7 @@ reductions(Name) ->
0
end.
--spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() |
- {'absent', amqqueue:amqqueue(), atom()}].
+-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()].
recover(Queues) ->
[begin
@@ -666,7 +667,7 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
ServerId = {RaName, Node},
case ra:start_server(RaName, ServerId, ra_machine(Q),
- [{RaName, N} || N <- QNodes]) of
+ [{RaName, N} || N <- QNodes]) of
ok ->
case ra:add_member(ServerRef, ServerId) of
{ok, _, Leader} ->
@@ -679,11 +680,15 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
rabbit_misc:execute_mnesia_transaction(
fun() -> rabbit_amqqueue:update(QName, Fun) end),
ok;
+ timeout ->
+ {error, timeout};
E ->
%% TODO should we stop the ra process here?
E
end;
- {error, _} = E ->
+ timeout ->
+ {error, timeout};
+ E ->
E
end.
@@ -713,20 +718,91 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
QName = amqqueue:get_name(Q),
{RaName, _} = amqqueue:get_pid(Q),
ServerId = {RaName, Node},
- case ra:leave_and_delete_server(ServerId) of
- ok ->
- Fun = fun(Q1) ->
- amqqueue:set_quorum_nodes(
- Q1,
- lists:delete(Node, amqqueue:get_quorum_nodes(Q1)))
- end,
- rabbit_misc:execute_mnesia_transaction(
- fun() -> rabbit_amqqueue:update(QName, Fun) end),
- ok;
- E ->
- E
+ case amqqueue:get_quorum_nodes(Q) of
+ [Node] ->
+ %% deleting the last member is not allowed
+ {error, last_node};
+ _ ->
+ case ra:leave_and_delete_server(ServerId) of
+ ok ->
+ Fun = fun(Q1) ->
+ amqqueue:set_quorum_nodes(
+ Q1,
+ lists:delete(Node,
+ amqqueue:get_quorum_nodes(Q1)))
+ end,
+ rabbit_misc:execute_mnesia_transaction(
+ fun() -> rabbit_amqqueue:update(QName, Fun) end),
+ ok;
+ timeout ->
+ {error, timeout};
+ E ->
+ E
+ end
end.
+-spec shrink_all(node()) ->
+ [{rabbit_amqqueue:name(),
+ {ok, pos_integer()} | {error, pos_integer(), term()}}].
+shrink_all(Node) ->
+ [begin
+ QName = amqqueue:get_name(Q),
+ rabbit_log:info("~s: removing member (replica) on node ~w",
+ [rabbit_misc:rs(QName), Node]),
+ Size = length(amqqueue:get_quorum_nodes(Q)),
+ case delete_member(Q, Node) of
+ ok ->
+ {QName, {ok, Size-1}};
+ {error, Err} ->
+ rabbit_log:warning("~s: failed to remove member (replica) on node ~w, error: ~w",
+ [rabbit_misc:rs(QName), Node, Err]),
+ {QName, {error, Size, Err}}
+ end
+ end || Q <- rabbit_amqqueue:list(),
+ amqqueue:get_type(Q) == quorum,
+ lists:member(Node, amqqueue:get_quorum_nodes(Q))].
+
+-spec grow(node(), binary(), binary(), all | even) ->
+ [{rabbit_amqqueue:name(),
+ {ok, pos_integer()} | {error, pos_integer(), term()}}].
+grow(Node, VhostSpec, QueueSpec, Strategy) ->
+ Running = rabbit_mnesia:cluster_nodes(running),
+ [begin
+ Size = length(amqqueue:get_quorum_nodes(Q)),
+ QName = amqqueue:get_name(Q),
+ rabbit_log:info("~s: adding a new member (replica) on node ~w",
+ [rabbit_misc:rs(QName), Node]),
+ case add_member(Q, Node) of
+ ok ->
+ {QName, {ok, Size + 1}};
+ {error, Err} ->
+ rabbit_log:warning(
+ "~s: failed to add member (replica) on node ~w, error: ~w",
+ [rabbit_misc:rs(QName), Node, Err]),
+ {QName, {error, Size, Err}}
+ end
+ end
+ || Q <- rabbit_amqqueue:list(),
+ amqqueue:get_type(Q) == quorum,
+ %% don't add a member if there is already one on the node
+ not lists:member(Node, amqqueue:get_quorum_nodes(Q)),
+ %% node needs to be running
+ lists:member(Node, Running),
+ matches_strategy(Strategy, amqqueue:get_quorum_nodes(Q)),
+ is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
+ is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
+
+get_resource_name(#resource{name = Name}) ->
+ Name.
+
+matches_strategy(all, _) -> true;
+matches_strategy(even, Members) ->
+ length(Members) rem 2 == 0.
+
+is_match(Subj, E) ->
+ nomatch /= re:run(Subj, E).
+
+
%%----------------------------------------------------------------------------
dlx_mfa(Q) ->
DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index c0cb9c57d5..6f0b0a5ea5 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -924,6 +924,7 @@ create_channel(Channel,
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
Protocol, User, VHost, Capabilities, Collector}),
+ _ = rabbit_channel:source(ChPid, ?MODULE),
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
put({channel, Channel}, {ChPid, AState}),
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 9180f9ca0a..1721c9b806 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -53,10 +53,11 @@ recover(VHost) ->
VHostStubFile = filename:join(VHostDir, ".vhost"),
ok = rabbit_file:ensure_dir(VHostStubFile),
ok = file:write_file(VHostStubFile, VHost),
- Qs = rabbit_amqqueue:recover(VHost),
- QNames = [amqqueue:get_name(Q) || Q <- Qs],
+ {RecoveredClassic, FailedClassic, Quorum} = rabbit_amqqueue:recover(VHost),
+ AllQs = RecoveredClassic ++ FailedClassic ++ Quorum,
+ QNames = [amqqueue:get_name(Q) || Q <- AllQs],
ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames),
- ok = rabbit_amqqueue:start(Qs),
+ ok = rabbit_amqqueue:start(RecoveredClassic),
%% Start queue mirrors.
ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
ok.
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index c3f87cce59..d262e4c513 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -733,7 +733,8 @@ bq_queue_recover1(Config) ->
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
rabbit_amqqueue:stop(?VHOST),
- rabbit_amqqueue:start(rabbit_amqqueue:recover(?VHOST)),
+ {Recovered, [], []} = rabbit_amqqueue:recover(?VHOST),
+ rabbit_amqqueue:start(Recovered),
{ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
QName,
diff --git a/test/channel_source_SUITE.erl b/test/channel_source_SUITE.erl
new file mode 100644
index 0000000000..56b287e913
--- /dev/null
+++ b/test/channel_source_SUITE.erl
@@ -0,0 +1,142 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(channel_source_SUITE).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ network_rabbit_reader_channel_source,
+ network_arbitrary_channel_source,
+ direct_channel_source,
+ undefined_channel_source
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Testcase}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+network_rabbit_reader_channel_source(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, network_rabbit_reader_channel_source1, [Config]).
+
+network_rabbit_reader_channel_source1(Config) ->
+ ExistingChannels = rabbit_channel:list(),
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, ClientCh} = amqp_connection:open_channel(Conn),
+ [ServerCh] = rabbit_channel:list() -- ExistingChannels,
+ [{source, rabbit_reader}] = rabbit_channel:info(ServerCh, [source]),
+ _ = rabbit_channel:source(ServerCh, ?MODULE),
+ [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
+ amqp_channel:close(ClientCh),
+ amqp_connection:close(Conn),
+ {error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE),
+ passed.
+
+network_arbitrary_channel_source(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, network_arbitrary_channel_source1, [Config]).
+
+network_arbitrary_channel_source1(Config) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ Writer = spawn(fun () -> rabbit_ct_broker_helpers:test_writer(self()) end),
+ {ok, Limiter} = rabbit_limiter:start_link(no_limiter_id),
+ {ok, Collector} = rabbit_queue_collector:start_link(no_collector_id),
+ {ok, Ch} = rabbit_channel:start_link(
+ 1, Conn, Writer, Conn, "", rabbit_framing_amqp_0_9_1,
+ rabbit_ct_broker_helpers:user(<<"guest">>), <<"/">>, [],
+ Collector, Limiter),
+ _ = rabbit_channel:source(Ch, ?MODULE),
+ [{amqp_params, #amqp_params_network{username = <<"guest">>,
+ password = <<"guest">>, host = "localhost", virtual_host = <<"/">>}}] =
+ rabbit_amqp_connection:amqp_params(Conn, 1000),
+ [{source, ?MODULE}] = rabbit_channel:info(Ch, [source]),
+ [exit(P, normal) || P <- [Writer, Limiter, Collector, Ch]],
+ amqp_connection:close(Conn),
+ {error, channel_terminated} = rabbit_channel:source(Ch, ?MODULE),
+ passed.
+
+direct_channel_source(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, direct_channel_source1, [Config]).
+
+direct_channel_source1(Config) ->
+ ExistingChannels = rabbit_channel:list(),
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config),
+ {ok, ClientCh} = amqp_connection:open_channel(Conn),
+ [ServerCh] = rabbit_channel:list() -- ExistingChannels,
+ [{source, rabbit_direct}] = rabbit_channel:info(ServerCh, [source]),
+ _ = rabbit_channel:source(ServerCh, ?MODULE),
+ [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
+ amqp_channel:close(ClientCh),
+ amqp_connection:close(Conn),
+ {error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE),
+ passed.
+
+undefined_channel_source(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, undefined_channel_source1, [Config]).
+
+undefined_channel_source1(_Config) ->
+ ExistingChannels = rabbit_channel:list(),
+ {_Writer, _Limiter, ServerCh} = rabbit_ct_broker_helpers:test_channel(),
+ [ServerCh] = rabbit_channel:list() -- ExistingChannels,
+ [{source, undefined}] = rabbit_channel:info(ServerCh, [source]),
+ _ = rabbit_channel:source(ServerCh, ?MODULE),
+ [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
+ passed.
diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets
index 0cd274b757..9afddd9b1e 100644
--- a/test/config_schema_SUITE_data/rabbit.snippets
+++ b/test/config_schema_SUITE_data/rabbit.snippets
@@ -580,19 +580,6 @@ credential_validator.regexp = ^abc\\d+",
]}],
[]},
- {kernel_inet_dist_listen_min,
- "inet_dist_listen_min = 16000",
- [{kernel, [
- {inet_dist_listen_min, 16000}
- ]}],
- []},
- {kernel_inet_dist_listen_max,
- "inet_dist_listen_max = 16100",
- [{kernel, [
- {inet_dist_listen_max, 16100}
- ]}],
- []},
-
{log_syslog_settings,
"log.syslog = true
log.syslog.identity = rabbitmq
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 164d37de4b..406c02de83 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -69,7 +69,8 @@ groups() ->
delete_declare,
metrics_cleanup_on_leadership_takeover,
metrics_cleanup_on_leader_crash,
- consume_in_minority
+ consume_in_minority,
+ shrink_all
]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
@@ -190,7 +191,8 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ
[{rmq_nodes_count, 3},
{rmq_nodename_suffix, Testcase},
{tcp_ports_base},
- {queue_name, Q}
+ {queue_name, Q},
+ {alt_queue_name, <<Q/binary, "_alt">>}
]),
Config3 = rabbit_ct_helpers:run_steps(
Config2,
@@ -210,7 +212,8 @@ init_per_testcase(Testcase, Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
Q = rabbit_data_coercion:to_binary(Testcase),
Config2 = rabbit_ct_helpers:set_config(Config1,
- [{queue_name, Q}
+ [{queue_name, Q},
+ {alt_queue_name, <<Q/binary, "_alt">>}
]),
rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
@@ -622,7 +625,33 @@ consume_in_minority(Config) ->
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
- no_ack = false})).
+ no_ack = false})),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
+ ok.
+
+shrink_all(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ QQ = ?config(queue_name, Config),
+ AQ = ?config(alt_queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({'queue.declare_ok', AQ, 0, 0},
+ declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ timer:sleep(500),
+ Result = rpc:call(Server0, rabbit_quorum_queue, shrink_all, [Server2]),
+ ?assertMatch([{_, {ok, 2}}, {_, {ok, 2}}], Result),
+ Result1 = rpc:call(Server0, rabbit_quorum_queue, shrink_all, [Server1]),
+ ?assertMatch([{_, {ok, 1}}, {_, {ok, 1}}], Result1),
+ Result2 = rpc:call(Server0, rabbit_quorum_queue, shrink_all, [Server0]),
+ ?assertMatch([{_, {error, 1, last_node}},
+ {_, {error, 1, last_node}}], Result2),
+ ok.
+
+
subscribe_should_fail_when_global_qos_true(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -641,7 +670,7 @@ subscribe_should_fail_when_global_qos_true(Config) ->
_ -> exit(subscribe_should_not_pass)
catch
_:_ = Err ->
- ct:pal("Err ~p", [Err])
+ ct:pal("subscribe_should_fail_when_global_qos_true caught an error: ~p", [Err])
end,
ok.
diff --git a/test/rabbitmq_queues_cli_integration_SUITE.erl b/test/rabbitmq_queues_cli_integration_SUITE.erl
new file mode 100644
index 0000000000..1601ba82fe
--- /dev/null
+++ b/test/rabbitmq_queues_cli_integration_SUITE.erl
@@ -0,0 +1,137 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2017-2019 Pivotal Software, Inc. All rights reserved.
+%%
+-module(rabbitmq_queues_cli_integration_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+groups() ->
+ [
+ {tests, [], [
+ shrink,
+ grow,
+ grow_invalid_node_filtered
+ ]}
+ ].
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(tests, Config0) ->
+ NumNodes = 3,
+ Config1 = rabbit_ct_helpers:set_config(
+ Config0, [{rmq_nodes_count, NumNodes},
+ {rmq_nodes_clustered, true}]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()
+ ).
+
+end_per_group(tests, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config0) ->
+ rabbit_ct_helpers:ensure_rabbitmq_queues_cmd(
+ rabbit_ct_helpers:testcase_started(Config0, Testcase)).
+
+end_per_testcase(Testcase, Config0) ->
+ rabbit_ct_helpers:testcase_finished(Config0, Testcase).
+
+shrink(Config) ->
+ NodeConfig = rabbit_ct_broker_helpers:get_node_config(Config, 2),
+ Nodename2 = ?config(nodename, NodeConfig),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Nodename2),
+ %% declare a quorum queue
+ QName = "shrink1",
+ #'queue.declare_ok'{} = declare_qq(Ch, QName),
+ {ok, Out1} = rabbitmq_queues(Config, 0, ["shrink", Nodename2]),
+ ?assertMatch(#{{"/", "shrink1"} := {2, ok}}, parse_result(Out1)),
+ Nodename1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+ {ok, Out2} = rabbitmq_queues(Config, 0, ["shrink", Nodename1]),
+ ?assertMatch(#{{"/", "shrink1"} := {1, ok}}, parse_result(Out2)),
+ Nodename0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ {ok, Out3} = rabbitmq_queues(Config, 0, ["shrink", Nodename0]),
+ ?assertMatch(#{{"/", "shrink1"} := {1, error}}, parse_result(Out3)),
+ ok.
+
+grow(Config) ->
+ NodeConfig = rabbit_ct_broker_helpers:get_node_config(Config, 2),
+ Nodename2 = ?config(nodename, NodeConfig),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Nodename2),
+ %% declare a quorum queue
+ QName = "grow1",
+ Args = [{<<"x-quorum-initial-group-size">>, long, 1}],
+ #'queue.declare_ok'{} = declare_qq(Ch, QName, Args),
+ Nodename0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ {ok, Out1} = rabbitmq_queues(Config, 0, ["grow", Nodename0, "all"]),
+ ?assertMatch(#{{"/", "grow1"} := {2, ok}}, parse_result(Out1)),
+ Nodename1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+ {ok, Out2} = rabbitmq_queues(Config, 0, ["grow", Nodename1, "all"]),
+ ?assertMatch(#{{"/", "grow1"} := {3, ok}}, parse_result(Out2)),
+
+ {ok, Out3} = rabbitmq_queues(Config, 0, ["grow", Nodename0, "all"]),
+ ?assertNotMatch(#{{"/", "grow1"} := _}, parse_result(Out3)),
+ ok.
+
+grow_invalid_node_filtered(Config) ->
+ NodeConfig = rabbit_ct_broker_helpers:get_node_config(Config, 2),
+ Nodename2 = ?config(nodename, NodeConfig),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Nodename2),
+ %% declare a quorum queue
+ QName = "grow-err",
+ Args = [{<<"x-quorum-initial-group-size">>, long, 1}],
+ #'queue.declare_ok'{} = declare_qq(Ch, QName, Args),
+ DummyNode = not_really_a_node@nothing,
+ {ok, Out1} = rabbitmq_queues(Config, 0, ["grow", DummyNode, "all"]),
+ ?assertNotMatch(#{{"/", "grow-err"} := _}, parse_result(Out1)),
+ ok.
+
+parse_result(S) ->
+ Lines = string:split(S, "\n", all),
+ maps:from_list(
+ [{{Vhost, QName},
+ {erlang:list_to_integer(Size), case Result of
+ "ok" -> ok;
+ _ -> error
+ end}}
+ || [Vhost, QName, Size, Result] <-
+ [string:split(L, "\t", all) || L <- Lines]]).
+
+declare_qq(Ch, Q, Args0) ->
+ Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}] ++ Args0,
+ amqp_channel:call(Ch, #'queue.declare'{queue = list_to_binary(Q),
+ durable = true,
+ auto_delete = false,
+ arguments = Args}).
+declare_qq(Ch, Q) ->
+ declare_qq(Ch, Q, []).
+
+rabbitmq_queues(Config, N, Args) ->
+ rabbit_ct_broker_helpers:rabbitmq_queues(Config, N, ["--silent" | Args]).