diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2019-02-19 16:43:35 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2019-02-19 16:43:35 +0300 |
| commit | 4965daac497aa3148cc47ed8347b514a484ed3d6 (patch) | |
| tree | 90892d9552647de9c934a7ce7cb72fcf5f40d845 | |
| parent | ed13f5398197f5ff735c2c1b97519ecc0d22dd6f (diff) | |
| parent | c5df441e8130d33683082e165b3ff993f1580ee0 (diff) | |
| download | rabbitmq-server-git-4965daac497aa3148cc47ed8347b514a484ed3d6.tar.gz | |
Merge branch 'master' into pre-node-removal-tool
| -rw-r--r-- | docs/rabbitmq.conf.example | 6 | ||||
| -rw-r--r-- | priv/schema/rabbit.schema | 10 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 1 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 121 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 7 | ||||
| -rw-r--r-- | test/backing_queue_SUITE.erl | 3 | ||||
| -rw-r--r-- | test/channel_source_SUITE.erl | 142 | ||||
| -rw-r--r-- | test/config_schema_SUITE_data/rabbit.snippets | 13 |
13 files changed, 274 insertions, 116 deletions
diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example index 81958c89fd..1aa3943a86 100644 --- a/docs/rabbitmq.conf.example +++ b/docs/rabbitmq.conf.example @@ -509,10 +509,10 @@ # net_ticktime = 60 ## Inter-node communication port range. +## The parameters inet_dist_listen_min and inet_dist_listen_max +## can be configured in the classic config format only. ## Related doc guide: https://www.rabbitmq.com/networking.html#epmd-inet-dist-port-range. -## -# inet_dist_listen_min = 25672 -# inet_dist_listen_max = 25692 + ## ---------------------------------------------------------------------------- ## RabbitMQ Management Plugin diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index 2617f558f7..4cc543d99f 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -1352,16 +1352,6 @@ end}. {validators, ["non_zero_positive_integer"]} ]}. -{mapping, "inet_dist_listen_min", "kernel.inet_dist_listen_min",[ - {datatype, [integer]}, - {validators, ["non_zero_positive_integer"]} -]}. - -{mapping, "inet_dist_listen_max", "kernel.inet_dist_listen_max",[ - {datatype, [integer]}, - {validators, ["non_zero_positive_integer"]} -]}. - % ========================== % sysmon_handler section % ========================== diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index b4863057f0..4bb680cccf 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -187,6 +187,7 @@ RABBITMQ_PRELAUNCH_NODENAME="rabbitmqprelaunch${$}@localhost" NOTIFY_SOCKET= \ RABBITMQ_CONFIG_FILE=$RABBITMQ_CONFIG_FILE \ ERL_CRASH_DUMP=$ERL_CRASH_DUMP \ +RABBITMQ_CONFIG_ARG_FILE=$RABBITMQ_CONFIG_ARG_FILE \ RABBITMQ_DIST_PORT=$RABBITMQ_DIST_PORT \ ${ERL_DIR}erl -pa "$RABBITMQ_EBIN_ROOT" \ -boot "${CLEAN_BOOT_FILE}" \ diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 337786b571..5488a88836 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -108,12 +108,16 @@ warn_file_limit() -> ok end. --spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. +-spec recover(rabbit_types:vhost()) -> + {RecoveredClassic :: [amqqueue:amqqueue()], + FailedClassic :: [amqqueue:amqqueue()], + Quorum :: [amqqueue:amqqueue()]}. recover(VHost) -> - Classic = find_local_durable_classic_queues(VHost), + AllClassic = find_local_durable_classic_queues(VHost), Quorum = find_local_quorum_queues(VHost), - recover_classic_queues(VHost, Classic) ++ rabbit_quorum_queue:recover(Quorum). + {RecoveredClassic, FailedClassic} = recover_classic_queues(VHost, AllClassic), + {RecoveredClassic, FailedClassic, rabbit_quorum_queue:recover(Quorum)}. recover_classic_queues(VHost, Queues) -> {ok, BQ} = application:get_env(rabbit, backing_queue_module), @@ -124,15 +128,16 @@ recover_classic_queues(VHost, Queues) -> BQ:start(VHost, [amqqueue:get_name(Q) || Q <- Queues]), case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of {ok, _} -> - recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)); + RecoveredQs = recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)), + RecoveredNames = [amqqueue:get_name(Q) || Q <- RecoveredQs], + FailedQueues = [Q || Q <- Queues, + not lists:member(amqqueue:get_name(Q), RecoveredNames)], + {RecoveredQs, FailedQueues}; {error, Reason} -> rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]), throw({error, Reason}) end. -filter_per_type(Queues) -> - lists:partition(fun(Q) -> amqqueue:is_classic(Q) end, Queues). - filter_pid_per_type(QPids) -> lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids). @@ -156,12 +161,14 @@ stop(VHost) -> -spec start([amqqueue:amqqueue()]) -> 'ok'. start(Qs) -> - {Classic, _Quorum} = filter_per_type(Qs), %% At this point all recovered queues and their bindings are %% visible to routing, so now it is safe for them to complete %% their initialisation (which may involve interacting with other %% queues). - _ = [amqqueue:get_pid(Q) ! {self(), go} || Q <- Classic], + _ = [amqqueue:get_pid(Q) ! {self(), go} + || Q <- Qs, + %% All queues are supposed to be classic here. + amqqueue:is_classic(Q)], ok. mark_local_durable_queues_stopped(VHost) -> @@ -609,14 +616,14 @@ priv_absent(QueueName, _QPid, _IsDurable, timeout) -> rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) -> 'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit(). -assert_equivalence(Q, Durable1, AD1, Args1, Owner) -> +assert_equivalence(Q, DurableDeclare, AutoDeleteDeclare, Args1, Owner) -> QName = amqqueue:get_name(Q), - Durable = amqqueue:is_durable(Q), - AD = amqqueue:is_auto_delete(Q), - rabbit_misc:assert_field_equivalence(Durable, Durable1, QName, durable), - rabbit_misc:assert_field_equivalence(AD, AD1, QName, auto_delete), - assert_args_equivalence(Q, Args1), - check_exclusive_access(Q, Owner, strict). + DurableQ = amqqueue:is_durable(Q), + AutoDeleteQ = amqqueue:is_auto_delete(Q), + ok = check_exclusive_access(Q, Owner, strict), + ok = rabbit_misc:assert_field_equivalence(DurableQ, DurableDeclare, QName, durable), + ok = rabbit_misc:assert_field_equivalence(AutoDeleteQ, AutoDeleteDeclare, QName, auto_delete), + ok = assert_args_equivalence(Q, Args1). -spec check_exclusive_access(amqqueue:amqqueue(), pid()) -> 'ok' | rabbit_types:channel_exit(). @@ -633,7 +640,9 @@ check_exclusive_access(Q, _ReaderPid, _MatchType) -> QueueName = amqqueue:get_name(Q), rabbit_misc:protocol_error( resource_locked, - "cannot obtain exclusive access to locked ~s", + "cannot obtain exclusive access to locked ~s. It could be originally " + "declared on another connection or the exclusive property value does not " + "match that of the original declaration.", [rabbit_misc:rs(QueueName)]). -spec with_exclusive_access_or_die(name(), pid(), qfun(A)) -> diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index ab3bc6c819..05db4188ba 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -49,7 +49,6 @@ -type bind_ok_or_error() :: 'ok' | bind_errors() | rabbit_types:error( - 'binding_not_found' | {'binding_invalid', string(), [any()]}). -type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()). -type inner_fun() :: @@ -178,19 +177,15 @@ add(Src, Dst, B, ActingUser) -> lock_resource(Src), lock_resource(Dst), [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]], - case (SrcDurable andalso DstDurable andalso - mnesia:read({rabbit_durable_route, B}) =/= []) of - false -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, - fun mnesia:write/3), - x_callback(transaction, Src, add_binding, B), - Serial = rabbit_exchange:serial(Src), - fun () -> - x_callback(Serial, Src, add_binding, B), - ok = rabbit_event:notify( - binding_created, - info(B) ++ [{user_who_performed_action, ActingUser}]) - end; - true -> rabbit_misc:const({error, binding_not_found}) + ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, + fun mnesia:write/3), + x_callback(transaction, Src, add_binding, B), + Serial = rabbit_exchange:serial(Src), + fun () -> + x_callback(Serial, Src, add_binding, B), + ok = rabbit_event:notify( + binding_created, + info(B) ++ [{user_who_performed_action, ActingUser}]) end. -spec remove(rabbit_types:binding()) -> bind_res(). @@ -208,7 +203,10 @@ remove(Binding, InnerFun, ActingUser) -> case mnesia:read(rabbit_route, B, write) of [] -> case mnesia:read(rabbit_durable_route, B, write) of [] -> rabbit_misc:const(ok); - _ -> rabbit_misc:const({error, binding_not_found}) + %% We still delete the binding and run + %% all post-delete functions if there is only + %% a durable route in the database + _ -> remove(Src, Dst, B, ActingUser) end; _ -> case InnerFun(Src, Dst) of ok -> remove(Src, Dst, B, ActingUser); @@ -275,9 +273,8 @@ list_for_source(SrcName) -> -spec list_for_destination (rabbit_types:binding_destination()) -> bindings(). -list_for_destination(DstName) -> - implicit_for_destination(DstName) ++ - mnesia:async_dirty( +list_for_destination(DstName = #resource{virtual_host = VHostPath}) -> + AllBindings = mnesia:async_dirty( fun() -> Route = #route{binding = #binding{destination = DstName, _ = '_'}}, @@ -285,7 +282,11 @@ list_for_destination(DstName) -> #reverse_route{reverse_binding = B} <- mnesia:match_object(rabbit_reverse_route, reverse_route(Route), read)] - end). + end), + Filtered = lists:filter(fun(#binding{source = S}) -> + S =/= ?DEFAULT_EXCHANGE(VHostPath) + end, AllBindings), + implicit_for_destination(DstName) ++ Filtered. implicit_bindings(VHostPath) -> DstQueues = rabbit_amqqueue:list_names(VHostPath), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 036aa9a60c..8f5f159f88 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -63,6 +63,7 @@ -export([refresh_config_local/0, ready_for_close/1]). -export([refresh_interceptors/0]). -export([force_event_refresh/1]). +-export([source/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -78,7 +79,7 @@ -export([list_queue_states/1, get_max_message_size/0]). %% Mgmt HTTP API refactor --export([handle_method/5]). +-export([handle_method/6]). -record(ch, { %% starting | running | flow | closing @@ -97,6 +98,9 @@ %% same as reader's name, see #v1.name %% in rabbit_reader conn_name, + %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined + %% or any other channel creating/spawning entity + source, %% limiter pid, see rabbit_limiter limiter, %% none | {Msgs, Acks} | committing | failed | @@ -448,6 +452,14 @@ force_event_refresh(Ref) -> list_queue_states(Pid) -> gen_server2:call(Pid, list_queue_states). +-spec source(pid(), any()) -> any(). + +source(Pid, Source) when is_pid(Pid) -> + case erlang:is_process_alive(Pid) of + true -> Pid ! {channel_source, Source}; + false -> {error, channel_terminated} + end. + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, @@ -805,7 +817,10 @@ handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) -> QName = rabbit_quorum_queue:queue_name(QS), [] /= rabbit_amqqueue:lookup(QName) end, QueueStates0), - noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})). + noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})); + +handle_info({channel_source, Source}, State = #ch{}) -> + noreply(State#ch{source = Source}). handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -938,11 +953,11 @@ check_write_permitted(Resource, User) -> check_read_permitted(Resource, User) -> check_resource_access(User, Resource, read). -check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) -> - check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write). +check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> + check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write). -check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) -> - check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read). +check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> + check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read). check_user_id_header(#'P_basic'{user_id = undefined}, _) -> ok; @@ -978,14 +993,17 @@ check_internal_exchange(_) -> ok. check_topic_authorisation(Resource = #exchange{type = topic}, - User, none, RoutingKey, Permission) -> + User, none, RoutingKey, _ChSrc, Permission) -> %% Called from outside the channel by mgmt API AmqpParams = [], check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); check_topic_authorisation(Resource = #exchange{type = topic}, - User, ConnPid, RoutingKey, Permission) when is_pid(ConnPid) -> - AmqpParams = get_amqp_params(ConnPid), + User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) -> + AmqpParams = get_amqp_params(ConnPid, ChSrc), check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); +check_topic_authorisation(_, _, _, _, _, _) -> + ok. + check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic}, User = #user{username = Username}, AmqpParams, RoutingKey, Permission) -> @@ -1003,11 +1021,10 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost User, Resource, Permission, Context), CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1), put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail]) - end; -check_topic_authorisation(_, _, _, _, _) -> - ok. + end. -get_amqp_params(ConnPid) when is_pid(ConnPid) -> +get_amqp_params(_ConnPid, rabbit_reader) -> []; +get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) -> Timeout = get_operation_timeout(), get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout). @@ -1227,13 +1244,14 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, conn_name = ConnName, delivery_flow = Flow, conn_pid = ConnPid, + source = ChSrc, max_message_size = MaxMessageSize}) -> check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), check_internal_exchange(Exchange), - check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey), + check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = #content {properties = Props} = @@ -1516,76 +1534,85 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, user = User, queue_collector_pid = CollectorPid, - conn_pid = ConnPid}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + conn_pid = ConnPid, + source = ChSrc}) -> + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.delete'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, virtual_host = VHostPath, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.delete_ok'{}); handle_method(#'exchange.bind'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.bind_ok'{}); handle_method(#'exchange.unbind'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.unbind_ok'{}); handle_method(#'queue.declare'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> {ok, QueueName, MessageCount, ConsumerCount} = - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); handle_method(#'queue.delete'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, virtual_host = VHostPath, queue_collector_pid = CollectorPid, user = User}) -> {ok, PurgedMessageCount} = - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.delete_ok'{message_count = PurgedMessageCount}); handle_method(#'queue.bind'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.bind_ok'{}); handle_method(#'queue.unbind'{} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, false, #'queue.unbind_ok'{}); handle_method(#'queue.purge'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - case handle_method(Method, ConnPid, CollectorPid, + case handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User) of {ok, PurgedMessageCount} -> return_ok(State, NoWait, @@ -1811,7 +1838,7 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QRef, DQ)}. binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, - RoutingKey, Arguments, VHostPath, ConnPid, + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, #user{username = Username} = User) -> ExchangeNameBin = strip_cr_lf(SourceNameBin0), DestinationNameBin = strip_cr_lf(DestinationNameBin0), @@ -1820,14 +1847,11 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]], check_read_permitted(ExchangeName, User), - ExchangeLookup = rabbit_exchange:lookup(ExchangeName), - case ExchangeLookup of + case rabbit_exchange:lookup(ExchangeName) of {error, not_found} -> - %% no-op - ExchangeLookup; + ok; {ok, Exchange} -> - check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey), - ExchangeLookup + check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc) end, case Fun(#binding{source = ExchangeName, destination = DestinationName, @@ -2226,6 +2250,7 @@ i(user_who_performed_action, Ch) -> i(user, Ch); i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{tx = Tx}) -> Tx =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; +i(source, #ch{source = ChSrc}) -> ChSrc; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM); i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); @@ -2298,39 +2323,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:add/3, SourceNameBin, exchange, DestinationNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'exchange.unbind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:remove/3, SourceNameBin, exchange, DestinationNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:remove/3, ExchangeNameBin, queue, QueueNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'queue.bind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:add/3, ExchangeNameBin, queue, QueueNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); %% Note that all declares to these are effectively passive. If it %% exists it by definition has one consumer. handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", _/binary>> = QueueNameBin}, - _ConnPid, _CollectorPid, VHost, _User) -> + _ConnPid, _ChSrc, _CollectorPid, VHost, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin), case declare_fast_reply_to(StrippedQueueNameBin) of @@ -2344,7 +2369,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, auto_delete = AutoDelete, nowait = NoWait, arguments = Args} = Declare, - ConnPid, CollectorPid, VHostPath, + ConnPid, ChSrc, CollectorPid, VHostPath, #user{username = Username} = User) -> Owner = case ExclusiveDeclare of true -> ConnPid; @@ -2403,7 +2428,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {existing, _Q} -> %% must have been created between the stat and the %% declare. Loop around again. - handle_method(Declare, ConnPid, CollectorPid, VHostPath, + handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath, User); {absent, Q, Reason} -> rabbit_amqqueue:absent(Q, Reason); @@ -2419,7 +2444,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait, passive = true}, - ConnPid, _CollectorPid, VHostPath, _User) -> + ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin), Fun = fun (Q0) -> @@ -2433,7 +2458,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty}, - ConnPid, _CollectorPid, VHostPath, + ConnPid, _ChSrc, _CollectorPid, VHostPath, User = #user{username = Username}) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath), @@ -2462,7 +2487,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, end; handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused}, - _ConnPid, _CollectorPid, VHostPath, + _ConnPid, _ChSrc, _CollectorPid, VHostPath, User = #user{username = Username}) -> StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin), @@ -2478,7 +2503,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, ok end; handle_method(#'queue.purge'{queue = QueueNameBin}, - ConnPid, _CollectorPid, VHostPath, User) -> + ConnPid, _ChSrc, _CollectorPid, VHostPath, User) -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), check_read_permitted(QueueName, User), rabbit_amqqueue:with_exclusive_access_or_die( @@ -2491,7 +2516,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, auto_delete = AutoDelete, internal = Internal, arguments = Args}, - _ConnPid, _CollectorPid, VHostPath, + _ConnPid, _ChSrc, _CollectorPid, VHostPath, #user{username = Username} = User) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), @@ -2524,7 +2549,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, AutoDelete, Internal, Args); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = true}, - _ConnPid, _CollectorPid, VHostPath, _User) -> + _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), check_not_default_exchange(ExchangeName), _ = rabbit_exchange:lookup_or_die(ExchangeName). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 50e8f3d2b0..e43bfba90c 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -211,6 +211,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, rabbit_direct_client_sup, [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector}]), + _ = rabbit_channel:source(ChannelPid, ?MODULE), {ok, ChannelPid}. -spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 3ca607689c..63b182ef20 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -287,8 +287,7 @@ reductions(Name) -> 0 end. --spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() | - {'absent', amqqueue:amqqueue(), atom()}]. +-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()]. recover(Queues) -> [begin diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index c0cb9c57d5..6f0b0a5ea5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -924,6 +924,7 @@ create_channel(Channel, rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, Protocol, User, VHost, Capabilities, Collector}), + _ = rabbit_channel:source(ChPid, ?MODULE), MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 9180f9ca0a..1721c9b806 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -53,10 +53,11 @@ recover(VHost) -> VHostStubFile = filename:join(VHostDir, ".vhost"), ok = rabbit_file:ensure_dir(VHostStubFile), ok = file:write_file(VHostStubFile, VHost), - Qs = rabbit_amqqueue:recover(VHost), - QNames = [amqqueue:get_name(Q) || Q <- Qs], + {RecoveredClassic, FailedClassic, Quorum} = rabbit_amqqueue:recover(VHost), + AllQs = RecoveredClassic ++ FailedClassic ++ Quorum, + QNames = [amqqueue:get_name(Q) || Q <- AllQs], ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames), - ok = rabbit_amqqueue:start(Qs), + ok = rabbit_amqqueue:start(RecoveredClassic), %% Start queue mirrors. ok = rabbit_mirror_queue_misc:on_vhost_up(VHost), ok. diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl index c3f87cce59..d262e4c513 100644 --- a/test/backing_queue_SUITE.erl +++ b/test/backing_queue_SUITE.erl @@ -733,7 +733,8 @@ bq_queue_recover1(Config) -> after 10000 -> exit(timeout_waiting_for_queue_death) end, rabbit_amqqueue:stop(?VHOST), - rabbit_amqqueue:start(rabbit_amqqueue:recover(?VHOST)), + {Recovered, [], []} = rabbit_amqqueue:recover(?VHOST), + rabbit_amqqueue:start(Recovered), {ok, Limiter} = rabbit_limiter:start_link(no_id), rabbit_amqqueue:with_or_die( QName, diff --git a/test/channel_source_SUITE.erl b/test/channel_source_SUITE.erl new file mode 100644 index 0000000000..56b287e913 --- /dev/null +++ b/test/channel_source_SUITE.erl @@ -0,0 +1,142 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved. +%% + +-module(channel_source_SUITE). + +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + network_rabbit_reader_channel_source, + network_arbitrary_channel_source, + direct_channel_source, + undefined_channel_source + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Testcase} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +network_rabbit_reader_channel_source(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, network_rabbit_reader_channel_source1, [Config]). + +network_rabbit_reader_channel_source1(Config) -> + ExistingChannels = rabbit_channel:list(), + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, ClientCh} = amqp_connection:open_channel(Conn), + [ServerCh] = rabbit_channel:list() -- ExistingChannels, + [{source, rabbit_reader}] = rabbit_channel:info(ServerCh, [source]), + _ = rabbit_channel:source(ServerCh, ?MODULE), + [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), + amqp_channel:close(ClientCh), + amqp_connection:close(Conn), + {error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE), + passed. + +network_arbitrary_channel_source(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, network_arbitrary_channel_source1, [Config]). + +network_arbitrary_channel_source1(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + Writer = spawn(fun () -> rabbit_ct_broker_helpers:test_writer(self()) end), + {ok, Limiter} = rabbit_limiter:start_link(no_limiter_id), + {ok, Collector} = rabbit_queue_collector:start_link(no_collector_id), + {ok, Ch} = rabbit_channel:start_link( + 1, Conn, Writer, Conn, "", rabbit_framing_amqp_0_9_1, + rabbit_ct_broker_helpers:user(<<"guest">>), <<"/">>, [], + Collector, Limiter), + _ = rabbit_channel:source(Ch, ?MODULE), + [{amqp_params, #amqp_params_network{username = <<"guest">>, + password = <<"guest">>, host = "localhost", virtual_host = <<"/">>}}] = + rabbit_amqp_connection:amqp_params(Conn, 1000), + [{source, ?MODULE}] = rabbit_channel:info(Ch, [source]), + [exit(P, normal) || P <- [Writer, Limiter, Collector, Ch]], + amqp_connection:close(Conn), + {error, channel_terminated} = rabbit_channel:source(Ch, ?MODULE), + passed. + +direct_channel_source(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, direct_channel_source1, [Config]). + +direct_channel_source1(Config) -> + ExistingChannels = rabbit_channel:list(), + Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config), + {ok, ClientCh} = amqp_connection:open_channel(Conn), + [ServerCh] = rabbit_channel:list() -- ExistingChannels, + [{source, rabbit_direct}] = rabbit_channel:info(ServerCh, [source]), + _ = rabbit_channel:source(ServerCh, ?MODULE), + [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), + amqp_channel:close(ClientCh), + amqp_connection:close(Conn), + {error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE), + passed. + +undefined_channel_source(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, undefined_channel_source1, [Config]). + +undefined_channel_source1(_Config) -> + ExistingChannels = rabbit_channel:list(), + {_Writer, _Limiter, ServerCh} = rabbit_ct_broker_helpers:test_channel(), + [ServerCh] = rabbit_channel:list() -- ExistingChannels, + [{source, undefined}] = rabbit_channel:info(ServerCh, [source]), + _ = rabbit_channel:source(ServerCh, ?MODULE), + [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), + passed. diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets index 0cd274b757..9afddd9b1e 100644 --- a/test/config_schema_SUITE_data/rabbit.snippets +++ b/test/config_schema_SUITE_data/rabbit.snippets @@ -580,19 +580,6 @@ credential_validator.regexp = ^abc\\d+", ]}], []}, - {kernel_inet_dist_listen_min, - "inet_dist_listen_min = 16000", - [{kernel, [ - {inet_dist_listen_min, 16000} - ]}], - []}, - {kernel_inet_dist_listen_max, - "inet_dist_listen_max = 16100", - [{kernel, [ - {inet_dist_listen_max, 16100} - ]}], - []}, - {log_syslog_settings, "log.syslog = true log.syslog.identity = rabbitmq |
