diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2019-02-21 01:15:06 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2019-02-21 01:15:06 +0300 |
| commit | d3eb661efd89e87bc7859512bcdbe2f95fc5667d (patch) | |
| tree | 5f1ee9b2de715e03f98a76e5ac72616a849b7183 /src | |
| parent | 017545d13ace947c02fb64c239ab28bb3f10b8d2 (diff) | |
| parent | 9e4095fd906da893ca08b02836ce0716bbfac39f (diff) | |
| download | rabbitmq-server-git-d3eb661efd89e87bc7859512bcdbe2f95fc5667d.tar.gz | |
Merge branch 'master' into unavailable-qq-publish-fix
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 121 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 108 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 7 |
8 files changed, 223 insertions, 105 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 337786b571..5488a88836 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -108,12 +108,16 @@ warn_file_limit() -> ok end. --spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. +-spec recover(rabbit_types:vhost()) -> + {RecoveredClassic :: [amqqueue:amqqueue()], + FailedClassic :: [amqqueue:amqqueue()], + Quorum :: [amqqueue:amqqueue()]}. recover(VHost) -> - Classic = find_local_durable_classic_queues(VHost), + AllClassic = find_local_durable_classic_queues(VHost), Quorum = find_local_quorum_queues(VHost), - recover_classic_queues(VHost, Classic) ++ rabbit_quorum_queue:recover(Quorum). + {RecoveredClassic, FailedClassic} = recover_classic_queues(VHost, AllClassic), + {RecoveredClassic, FailedClassic, rabbit_quorum_queue:recover(Quorum)}. recover_classic_queues(VHost, Queues) -> {ok, BQ} = application:get_env(rabbit, backing_queue_module), @@ -124,15 +128,16 @@ recover_classic_queues(VHost, Queues) -> BQ:start(VHost, [amqqueue:get_name(Q) || Q <- Queues]), case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of {ok, _} -> - recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)); + RecoveredQs = recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)), + RecoveredNames = [amqqueue:get_name(Q) || Q <- RecoveredQs], + FailedQueues = [Q || Q <- Queues, + not lists:member(amqqueue:get_name(Q), RecoveredNames)], + {RecoveredQs, FailedQueues}; {error, Reason} -> rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]), throw({error, Reason}) end. -filter_per_type(Queues) -> - lists:partition(fun(Q) -> amqqueue:is_classic(Q) end, Queues). - filter_pid_per_type(QPids) -> lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids). @@ -156,12 +161,14 @@ stop(VHost) -> -spec start([amqqueue:amqqueue()]) -> 'ok'. start(Qs) -> - {Classic, _Quorum} = filter_per_type(Qs), %% At this point all recovered queues and their bindings are %% visible to routing, so now it is safe for them to complete %% their initialisation (which may involve interacting with other %% queues). - _ = [amqqueue:get_pid(Q) ! {self(), go} || Q <- Classic], + _ = [amqqueue:get_pid(Q) ! {self(), go} + || Q <- Qs, + %% All queues are supposed to be classic here. + amqqueue:is_classic(Q)], ok. mark_local_durable_queues_stopped(VHost) -> @@ -609,14 +616,14 @@ priv_absent(QueueName, _QPid, _IsDurable, timeout) -> rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) -> 'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit(). -assert_equivalence(Q, Durable1, AD1, Args1, Owner) -> +assert_equivalence(Q, DurableDeclare, AutoDeleteDeclare, Args1, Owner) -> QName = amqqueue:get_name(Q), - Durable = amqqueue:is_durable(Q), - AD = amqqueue:is_auto_delete(Q), - rabbit_misc:assert_field_equivalence(Durable, Durable1, QName, durable), - rabbit_misc:assert_field_equivalence(AD, AD1, QName, auto_delete), - assert_args_equivalence(Q, Args1), - check_exclusive_access(Q, Owner, strict). + DurableQ = amqqueue:is_durable(Q), + AutoDeleteQ = amqqueue:is_auto_delete(Q), + ok = check_exclusive_access(Q, Owner, strict), + ok = rabbit_misc:assert_field_equivalence(DurableQ, DurableDeclare, QName, durable), + ok = rabbit_misc:assert_field_equivalence(AutoDeleteQ, AutoDeleteDeclare, QName, auto_delete), + ok = assert_args_equivalence(Q, Args1). -spec check_exclusive_access(amqqueue:amqqueue(), pid()) -> 'ok' | rabbit_types:channel_exit(). @@ -633,7 +640,9 @@ check_exclusive_access(Q, _ReaderPid, _MatchType) -> QueueName = amqqueue:get_name(Q), rabbit_misc:protocol_error( resource_locked, - "cannot obtain exclusive access to locked ~s", + "cannot obtain exclusive access to locked ~s. It could be originally " + "declared on another connection or the exclusive property value does not " + "match that of the original declaration.", [rabbit_misc:rs(QueueName)]). -spec with_exclusive_access_or_die(name(), pid(), qfun(A)) -> diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index ab3bc6c819..05db4188ba 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -49,7 +49,6 @@ -type bind_ok_or_error() :: 'ok' | bind_errors() | rabbit_types:error( - 'binding_not_found' | {'binding_invalid', string(), [any()]}). -type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()). -type inner_fun() :: @@ -178,19 +177,15 @@ add(Src, Dst, B, ActingUser) -> lock_resource(Src), lock_resource(Dst), [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]], - case (SrcDurable andalso DstDurable andalso - mnesia:read({rabbit_durable_route, B}) =/= []) of - false -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, - fun mnesia:write/3), - x_callback(transaction, Src, add_binding, B), - Serial = rabbit_exchange:serial(Src), - fun () -> - x_callback(Serial, Src, add_binding, B), - ok = rabbit_event:notify( - binding_created, - info(B) ++ [{user_who_performed_action, ActingUser}]) - end; - true -> rabbit_misc:const({error, binding_not_found}) + ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, + fun mnesia:write/3), + x_callback(transaction, Src, add_binding, B), + Serial = rabbit_exchange:serial(Src), + fun () -> + x_callback(Serial, Src, add_binding, B), + ok = rabbit_event:notify( + binding_created, + info(B) ++ [{user_who_performed_action, ActingUser}]) end. -spec remove(rabbit_types:binding()) -> bind_res(). @@ -208,7 +203,10 @@ remove(Binding, InnerFun, ActingUser) -> case mnesia:read(rabbit_route, B, write) of [] -> case mnesia:read(rabbit_durable_route, B, write) of [] -> rabbit_misc:const(ok); - _ -> rabbit_misc:const({error, binding_not_found}) + %% We still delete the binding and run + %% all post-delete functions if there is only + %% a durable route in the database + _ -> remove(Src, Dst, B, ActingUser) end; _ -> case InnerFun(Src, Dst) of ok -> remove(Src, Dst, B, ActingUser); @@ -275,9 +273,8 @@ list_for_source(SrcName) -> -spec list_for_destination (rabbit_types:binding_destination()) -> bindings(). -list_for_destination(DstName) -> - implicit_for_destination(DstName) ++ - mnesia:async_dirty( +list_for_destination(DstName = #resource{virtual_host = VHostPath}) -> + AllBindings = mnesia:async_dirty( fun() -> Route = #route{binding = #binding{destination = DstName, _ = '_'}}, @@ -285,7 +282,11 @@ list_for_destination(DstName) -> #reverse_route{reverse_binding = B} <- mnesia:match_object(rabbit_reverse_route, reverse_route(Route), read)] - end). + end), + Filtered = lists:filter(fun(#binding{source = S}) -> + S =/= ?DEFAULT_EXCHANGE(VHostPath) + end, AllBindings), + implicit_for_destination(DstName) ++ Filtered. implicit_bindings(VHostPath) -> DstQueues = rabbit_amqqueue:list_names(VHostPath), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 036aa9a60c..8f5f159f88 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -63,6 +63,7 @@ -export([refresh_config_local/0, ready_for_close/1]). -export([refresh_interceptors/0]). -export([force_event_refresh/1]). +-export([source/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -78,7 +79,7 @@ -export([list_queue_states/1, get_max_message_size/0]). %% Mgmt HTTP API refactor --export([handle_method/5]). +-export([handle_method/6]). -record(ch, { %% starting | running | flow | closing @@ -97,6 +98,9 @@ %% same as reader's name, see #v1.name %% in rabbit_reader conn_name, + %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined + %% or any other channel creating/spawning entity + source, %% limiter pid, see rabbit_limiter limiter, %% none | {Msgs, Acks} | committing | failed | @@ -448,6 +452,14 @@ force_event_refresh(Ref) -> list_queue_states(Pid) -> gen_server2:call(Pid, list_queue_states). +-spec source(pid(), any()) -> any(). + +source(Pid, Source) when is_pid(Pid) -> + case erlang:is_process_alive(Pid) of + true -> Pid ! {channel_source, Source}; + false -> {error, channel_terminated} + end. + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, @@ -805,7 +817,10 @@ handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) -> QName = rabbit_quorum_queue:queue_name(QS), [] /= rabbit_amqqueue:lookup(QName) end, QueueStates0), - noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})). + noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})); + +handle_info({channel_source, Source}, State = #ch{}) -> + noreply(State#ch{source = Source}). handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -938,11 +953,11 @@ check_write_permitted(Resource, User) -> check_read_permitted(Resource, User) -> check_resource_access(User, Resource, read). -check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) -> - check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write). +check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> + check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write). -check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) -> - check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read). +check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> + check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read). check_user_id_header(#'P_basic'{user_id = undefined}, _) -> ok; @@ -978,14 +993,17 @@ check_internal_exchange(_) -> ok. check_topic_authorisation(Resource = #exchange{type = topic}, - User, none, RoutingKey, Permission) -> + User, none, RoutingKey, _ChSrc, Permission) -> %% Called from outside the channel by mgmt API AmqpParams = [], check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); check_topic_authorisation(Resource = #exchange{type = topic}, - User, ConnPid, RoutingKey, Permission) when is_pid(ConnPid) -> - AmqpParams = get_amqp_params(ConnPid), + User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) -> + AmqpParams = get_amqp_params(ConnPid, ChSrc), check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); +check_topic_authorisation(_, _, _, _, _, _) -> + ok. + check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic}, User = #user{username = Username}, AmqpParams, RoutingKey, Permission) -> @@ -1003,11 +1021,10 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost User, Resource, Permission, Context), CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1), put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail]) - end; -check_topic_authorisation(_, _, _, _, _) -> - ok. + end. -get_amqp_params(ConnPid) when is_pid(ConnPid) -> +get_amqp_params(_ConnPid, rabbit_reader) -> []; +get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) -> Timeout = get_operation_timeout(), get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout). @@ -1227,13 +1244,14 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, conn_name = ConnName, delivery_flow = Flow, conn_pid = ConnPid, + source = ChSrc, max_message_size = MaxMessageSize}) -> check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), check_internal_exchange(Exchange), - check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey), + check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = #content {properties = Props} = @@ -1516,76 +1534,85 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, user = User, queue_collector_pid = CollectorPid, - conn_pid = ConnPid}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + conn_pid = ConnPid, + source = ChSrc}) -> + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.delete'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, virtual_host = VHostPath, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.delete_ok'{}); handle_method(#'exchange.bind'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.bind_ok'{}); handle_method(#'exchange.unbind'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.unbind_ok'{}); handle_method(#'queue.declare'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> {ok, QueueName, MessageCount, ConsumerCount} = - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); handle_method(#'queue.delete'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, virtual_host = VHostPath, queue_collector_pid = CollectorPid, user = User}) -> {ok, PurgedMessageCount} = - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.delete_ok'{message_count = PurgedMessageCount}); handle_method(#'queue.bind'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.bind_ok'{}); handle_method(#'queue.unbind'{} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, false, #'queue.unbind_ok'{}); handle_method(#'queue.purge'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - case handle_method(Method, ConnPid, CollectorPid, + case handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User) of {ok, PurgedMessageCount} -> return_ok(State, NoWait, @@ -1811,7 +1838,7 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QRef, DQ)}. binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, - RoutingKey, Arguments, VHostPath, ConnPid, + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, #user{username = Username} = User) -> ExchangeNameBin = strip_cr_lf(SourceNameBin0), DestinationNameBin = strip_cr_lf(DestinationNameBin0), @@ -1820,14 +1847,11 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]], check_read_permitted(ExchangeName, User), - ExchangeLookup = rabbit_exchange:lookup(ExchangeName), - case ExchangeLookup of + case rabbit_exchange:lookup(ExchangeName) of {error, not_found} -> - %% no-op - ExchangeLookup; + ok; {ok, Exchange} -> - check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey), - ExchangeLookup + check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc) end, case Fun(#binding{source = ExchangeName, destination = DestinationName, @@ -2226,6 +2250,7 @@ i(user_who_performed_action, Ch) -> i(user, Ch); i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{tx = Tx}) -> Tx =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; +i(source, #ch{source = ChSrc}) -> ChSrc; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM); i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); @@ -2298,39 +2323,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:add/3, SourceNameBin, exchange, DestinationNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'exchange.unbind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:remove/3, SourceNameBin, exchange, DestinationNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:remove/3, ExchangeNameBin, queue, QueueNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'queue.bind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:add/3, ExchangeNameBin, queue, QueueNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); %% Note that all declares to these are effectively passive. If it %% exists it by definition has one consumer. handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", _/binary>> = QueueNameBin}, - _ConnPid, _CollectorPid, VHost, _User) -> + _ConnPid, _ChSrc, _CollectorPid, VHost, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin), case declare_fast_reply_to(StrippedQueueNameBin) of @@ -2344,7 +2369,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, auto_delete = AutoDelete, nowait = NoWait, arguments = Args} = Declare, - ConnPid, CollectorPid, VHostPath, + ConnPid, ChSrc, CollectorPid, VHostPath, #user{username = Username} = User) -> Owner = case ExclusiveDeclare of true -> ConnPid; @@ -2403,7 +2428,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {existing, _Q} -> %% must have been created between the stat and the %% declare. Loop around again. - handle_method(Declare, ConnPid, CollectorPid, VHostPath, + handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath, User); {absent, Q, Reason} -> rabbit_amqqueue:absent(Q, Reason); @@ -2419,7 +2444,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait, passive = true}, - ConnPid, _CollectorPid, VHostPath, _User) -> + ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin), Fun = fun (Q0) -> @@ -2433,7 +2458,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty}, - ConnPid, _CollectorPid, VHostPath, + ConnPid, _ChSrc, _CollectorPid, VHostPath, User = #user{username = Username}) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath), @@ -2462,7 +2487,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, end; handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused}, - _ConnPid, _CollectorPid, VHostPath, + _ConnPid, _ChSrc, _CollectorPid, VHostPath, User = #user{username = Username}) -> StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin), @@ -2478,7 +2503,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, ok end; handle_method(#'queue.purge'{queue = QueueNameBin}, - ConnPid, _CollectorPid, VHostPath, User) -> + ConnPid, _ChSrc, _CollectorPid, VHostPath, User) -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), check_read_permitted(QueueName, User), rabbit_amqqueue:with_exclusive_access_or_die( @@ -2491,7 +2516,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, auto_delete = AutoDelete, internal = Internal, arguments = Args}, - _ConnPid, _CollectorPid, VHostPath, + _ConnPid, _ChSrc, _CollectorPid, VHostPath, #user{username = Username} = User) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), @@ -2524,7 +2549,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, AutoDelete, Internal, Args); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = true}, - _ConnPid, _CollectorPid, VHostPath, _User) -> + _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), check_not_default_exchange(ExchangeName), _ = rabbit_exchange:lookup_or_die(ExchangeName). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 50e8f3d2b0..e43bfba90c 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -211,6 +211,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, rabbit_direct_client_sup, [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector}]), + _ = rabbit_channel:source(ChannelPid, ?MODULE), {ok, ChannelPid}. -spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 61373e49c1..665eea12df 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -1472,14 +1472,18 @@ move_to_per_vhost_stores(#resource{} = QueueName) -> OldQueueDir = filename:join([queues_base_dir(), "queues", queue_name_to_dir_name_legacy(QueueName)]), NewQueueDir = queue_dir(QueueName), + rabbit_log_upgrade:info("About to migrate queue directory '~s' to '~s'", + [OldQueueDir, NewQueueDir]), case rabbit_file:is_dir(OldQueueDir) of true -> ok = rabbit_file:ensure_dir(NewQueueDir), ok = rabbit_file:rename(OldQueueDir, NewQueueDir), ok = ensure_queue_name_stub_file(NewQueueDir, QueueName); false -> - rabbit_log:info("Queue index directory not found for queue ~p~n", - [QueueName]) + Msg = "Queue index directory '~s' not found for ~s~n", + Args = [OldQueueDir, rabbit_misc:rs(QueueName)], + rabbit_log_upgrade:error(Msg, Args), + rabbit_log:error(Msg, Args) end, ok. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index e1c1c320f4..9e73981541 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -37,6 +37,8 @@ -export([requeue/3]). -export([policy_changed/2]). -export([cleanup_data_dir/0]). +-export([shrink_all/1, + grow/4]). %%-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit.hrl"). @@ -288,8 +290,7 @@ reductions(Name) -> 0 end. --spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() | - {'absent', amqqueue:amqqueue(), atom()}]. +-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()]. recover(Queues) -> [begin @@ -666,7 +667,7 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes ServerId = {RaName, Node}, case ra:start_server(RaName, ServerId, ra_machine(Q), - [{RaName, N} || N <- QNodes]) of + [{RaName, N} || N <- QNodes]) of ok -> case ra:add_member(ServerRef, ServerId) of {ok, _, Leader} -> @@ -679,11 +680,15 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> rabbit_misc:execute_mnesia_transaction( fun() -> rabbit_amqqueue:update(QName, Fun) end), ok; + timeout -> + {error, timeout}; E -> %% TODO should we stop the ra process here? E end; - {error, _} = E -> + timeout -> + {error, timeout}; + E -> E end. @@ -713,20 +718,91 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), ServerId = {RaName, Node}, - case ra:leave_and_delete_server(ServerId) of - ok -> - Fun = fun(Q1) -> - amqqueue:set_quorum_nodes( - Q1, - lists:delete(Node, amqqueue:get_quorum_nodes(Q1))) - end, - rabbit_misc:execute_mnesia_transaction( - fun() -> rabbit_amqqueue:update(QName, Fun) end), - ok; - E -> - E + case amqqueue:get_quorum_nodes(Q) of + [Node] -> + %% deleting the last member is not allowed + {error, last_node}; + _ -> + case ra:leave_and_delete_server(ServerId) of + ok -> + Fun = fun(Q1) -> + amqqueue:set_quorum_nodes( + Q1, + lists:delete(Node, + amqqueue:get_quorum_nodes(Q1))) + end, + rabbit_misc:execute_mnesia_transaction( + fun() -> rabbit_amqqueue:update(QName, Fun) end), + ok; + timeout -> + {error, timeout}; + E -> + E + end end. +-spec shrink_all(node()) -> + [{rabbit_amqqueue:name(), + {ok, pos_integer()} | {error, pos_integer(), term()}}]. +shrink_all(Node) -> + [begin + QName = amqqueue:get_name(Q), + rabbit_log:info("~s: removing member (replica) on node ~w", + [rabbit_misc:rs(QName), Node]), + Size = length(amqqueue:get_quorum_nodes(Q)), + case delete_member(Q, Node) of + ok -> + {QName, {ok, Size-1}}; + {error, Err} -> + rabbit_log:warning("~s: failed to remove member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end + end || Q <- rabbit_amqqueue:list(), + amqqueue:get_type(Q) == quorum, + lists:member(Node, amqqueue:get_quorum_nodes(Q))]. + +-spec grow(node(), binary(), binary(), all | even) -> + [{rabbit_amqqueue:name(), + {ok, pos_integer()} | {error, pos_integer(), term()}}]. +grow(Node, VhostSpec, QueueSpec, Strategy) -> + Running = rabbit_mnesia:cluster_nodes(running), + [begin + Size = length(amqqueue:get_quorum_nodes(Q)), + QName = amqqueue:get_name(Q), + rabbit_log:info("~s: adding a new member (replica) on node ~w", + [rabbit_misc:rs(QName), Node]), + case add_member(Q, Node) of + ok -> + {QName, {ok, Size + 1}}; + {error, Err} -> + rabbit_log:warning( + "~s: failed to add member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end + end + || Q <- rabbit_amqqueue:list(), + amqqueue:get_type(Q) == quorum, + %% don't add a member if there is already one on the node + not lists:member(Node, amqqueue:get_quorum_nodes(Q)), + %% node needs to be running + lists:member(Node, Running), + matches_strategy(Strategy, amqqueue:get_quorum_nodes(Q)), + is_match(amqqueue:get_vhost(Q), VhostSpec) andalso + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. + +get_resource_name(#resource{name = Name}) -> + Name. + +matches_strategy(all, _) -> true; +matches_strategy(even, Members) -> + length(Members) rem 2 == 0. + +is_match(Subj, E) -> + nomatch /= re:run(Subj, E). + + %%---------------------------------------------------------------------------- dlx_mfa(Q) -> DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index c0cb9c57d5..6f0b0a5ea5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -924,6 +924,7 @@ create_channel(Channel, rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, Protocol, User, VHost, Capabilities, Collector}), + _ = rabbit_channel:source(ChPid, ?MODULE), MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 9180f9ca0a..1721c9b806 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -53,10 +53,11 @@ recover(VHost) -> VHostStubFile = filename:join(VHostDir, ".vhost"), ok = rabbit_file:ensure_dir(VHostStubFile), ok = file:write_file(VHostStubFile, VHost), - Qs = rabbit_amqqueue:recover(VHost), - QNames = [amqqueue:get_name(Q) || Q <- Qs], + {RecoveredClassic, FailedClassic, Quorum} = rabbit_amqqueue:recover(VHost), + AllQs = RecoveredClassic ++ FailedClassic ++ Quorum, + QNames = [amqqueue:get_name(Q) || Q <- AllQs], ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames), - ok = rabbit_amqqueue:start(Qs), + ok = rabbit_amqqueue:start(RecoveredClassic), %% Start queue mirrors. ok = rabbit_mirror_queue_misc:on_vhost_up(VHost), ok. |
