diff options
| author | Michael Klishin <michael@novemberain.com> | 2017-07-10 14:55:04 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-07-10 14:55:04 +0300 |
| commit | a451a4b1158682d5931d59f00dfdfc411eadaef4 (patch) | |
| tree | 871bd7919bdde79df258b2589861c30c273f2e55 | |
| parent | 99b3b0a290dc32c29887ea6b7ed880ce68bfd4e4 (diff) | |
| parent | caac1a65deb143a8750821174bfbb8a66a45d7c7 (diff) | |
| download | rabbitmq-server-git-a451a4b1158682d5931d59f00dfdfc411eadaef4.tar.gz | |
Merge pull request #1286 from rabbitmq/rabbitmq-management-421
Extract common parts of handle_method to be used directly from HTTP API
| -rw-r--r-- | src/rabbit_channel.erl | 607 |
1 files changed, 338 insertions, 269 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 378c1e583b..00a6607dfb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -70,6 +70,9 @@ %% For testing -export([build_topic_variable_map/3]). +%% Mgmt HTTP API refactor +-export([handle_method/5]). + -record(ch, { %% starting | running | flow | closing state, @@ -741,20 +744,20 @@ clear_permission_cache() -> erase(permission_cache), erase(topic_permission_cache), ok. -check_configure_permitted(Resource, #ch{user = User}) -> +check_configure_permitted(Resource, User) -> check_resource_access(User, Resource, configure). -check_write_permitted(Resource, #ch{user = User}) -> +check_write_permitted(Resource, User) -> check_resource_access(User, Resource, write). -check_read_permitted(Resource, #ch{user = User}) -> +check_read_permitted(Resource, User) -> check_resource_access(User, Resource, read). -check_write_permitted_on_topic(Resource, Channel, RoutingKey) -> - check_topic_authorisation(Resource, Channel, RoutingKey, write). +check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) -> + check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write). -check_read_permitted_on_topic(Resource, Channel, RoutingKey) -> - check_topic_authorisation(Resource, Channel, RoutingKey, read). +check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) -> + check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read). check_user_id_header(#'P_basic'{user_id = undefined}, _) -> ok; @@ -790,12 +793,19 @@ check_internal_exchange(_) -> ok. check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic}, - #ch{user = User = #user{username = Username}, conn_pid = ConnPid}, + User = #user{username = Username}, + ConnPid, RoutingKey, Permission) -> Resource = Name#resource{kind = topic}, - Timeout = get_operation_timeout(), - AmqpParams = rabbit_amqp_connection:amqp_params(ConnPid, Timeout), + Timeout = get_operation_timeout(), + AmqpParams = case ConnPid of + none -> + %% Called from outside the channel by mgmt API + []; + _ -> + rabbit_amqp_connection:amqp_params(ConnPid, Timeout) + end, VariableMap = build_topic_variable_map(AmqpParams, VHost, Username), Context = #{routing_key => RoutingKey, variable_map => VariableMap @@ -811,7 +821,7 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1), put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail]) end; -check_topic_authorisation(_, _, _, _) -> +check_topic_authorisation(_, _, _, _, _) -> ok. build_topic_variable_map(AmqpParams, VHost, Username) -> @@ -844,10 +854,10 @@ check_vhost_queue_limit(#resource{name = QueueName}, VHost) -> end. -qbin_to_resource(QueueNameBin, State) -> - name_to_resource(queue, QueueNameBin, State). +qbin_to_resource(QueueNameBin, VHostPath) -> + name_to_resource(queue, QueueNameBin, VHostPath). -name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) -> +name_to_resource(Type, NameBin, VHostPath) -> rabbit_misc:r(VHostPath, Type, NameBin). expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> @@ -1002,15 +1012,16 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, channel = ChannelNum, confirm_enabled = ConfirmEnabled, trace_state = TraceState, - user = #user{username = Username}, + user = #user{username = Username} = User, conn_name = ConnName, - delivery_flow = Flow}) -> + delivery_flow = Flow, + conn_pid = ConnPid}) -> check_msg_size(Content), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_write_permitted(ExchangeName, State), + check_write_permitted(ExchangeName, User), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), check_internal_exchange(Exchange), - check_write_permitted_on_topic(Exchange, State, RoutingKey), + check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = #content {properties = Props} = @@ -1063,9 +1074,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{writer_pid = WriterPid, conn_pid = ConnPid, limiter = Limiter, - next_tag = DeliveryTag}) -> - QueueName = qbin_to_resource(QueueNameBin, State), - check_read_permitted(QueueName, State), + next_tag = DeliveryTag, + user = User, + virtual_host = VHostPath}) -> + QueueName = qbin_to_resource(QueueNameBin, VHostPath), + check_read_permitted(QueueName, User), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get( @@ -1148,11 +1161,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin, nowait = NoWait, arguments = Args}, _, State = #ch{consumer_prefetch = ConsumerPrefetch, - consumer_mapping = ConsumerMapping}) -> + consumer_mapping = ConsumerMapping, + user = User, + virtual_host = VHostPath}) -> case maps:find(ConsumerTag, ConsumerMapping) of error -> - QueueName = qbin_to_resource(QueueNameBin, State), - check_read_permitted(QueueName, State), + QueueName = qbin_to_resource(QueueNameBin, VHostPath), + check_read_permitted(QueueName, User), ActualConsumerTag = case ConsumerTag of <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), @@ -1273,251 +1288,81 @@ handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, _, State) -> reject(DeliveryTag, Requeue, false, State); -handle_method(#'exchange.declare'{exchange = ExchangeNameBin, - type = TypeNameBin, - passive = false, - durable = Durable, - auto_delete = AutoDelete, - internal = Internal, - nowait = NoWait, - arguments = Args}, +handle_method(#'exchange.declare'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, - user = #user{username = Username}}) -> - CheckedType = rabbit_exchange:check_type(TypeNameBin), - ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), - check_not_default_exchange(ExchangeName), - check_configure_permitted(ExchangeName, State), - X = case rabbit_exchange:lookup(ExchangeName) of - {ok, FoundX} -> FoundX; - {error, not_found} -> - check_name('exchange', strip_cr_lf(ExchangeNameBin)), - AeKey = <<"alternate-exchange">>, - case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of - undefined -> ok; - {error, {invalid_type, Type}} -> - precondition_failed( - "invalid type '~s' for arg '~s' in ~s", - [Type, AeKey, rabbit_misc:rs(ExchangeName)]); - AName -> check_read_permitted(ExchangeName, State), - check_write_permitted(AName, State), - ok - end, - rabbit_exchange:declare(ExchangeName, - CheckedType, - Durable, - AutoDelete, - Internal, - Args, - Username) - end, - ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, - AutoDelete, Internal, Args), - return_ok(State, NoWait, #'exchange.declare_ok'{}); - -handle_method(#'exchange.declare'{exchange = ExchangeNameBin, - passive = true, - nowait = NoWait}, - _, State = #ch{virtual_host = VHostPath}) -> - ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), - check_not_default_exchange(ExchangeName), - _ = rabbit_exchange:lookup_or_die(ExchangeName), + user = User, + queue_collector_pid = CollectorPid, + conn_pid = ConnPid}) -> + handle_method(Method, ConnPid, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.declare_ok'{}); -handle_method(#'exchange.delete'{exchange = ExchangeNameBin, - if_unused = IfUnused, - nowait = NoWait}, - _, State = #ch{virtual_host = VHostPath, - user = #user{username = Username}}) -> - StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin), - ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin), - check_not_default_exchange(ExchangeName), - check_exchange_deletion(ExchangeName), - check_configure_permitted(ExchangeName, State), - case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of - {error, not_found} -> - return_ok(State, NoWait, #'exchange.delete_ok'{}); - {error, in_use} -> - precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]); - ok -> - return_ok(State, NoWait, #'exchange.delete_ok'{}) - end; - -handle_method(#'exchange.bind'{destination = DestinationNameBin, - source = SourceNameBin, - routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> - binding_action(fun rabbit_binding:add/3, - strip_cr_lf(SourceNameBin), exchange, strip_cr_lf(DestinationNameBin), RoutingKey, - Arguments, #'exchange.bind_ok'{}, NoWait, State); - -handle_method(#'exchange.unbind'{destination = DestinationNameBin, - source = SourceNameBin, - routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> - binding_action(fun rabbit_binding:remove/3, - strip_cr_lf(SourceNameBin), exchange, strip_cr_lf(DestinationNameBin), RoutingKey, - Arguments, #'exchange.unbind_ok'{}, NoWait, State); - -%% Note that all declares to these are effectively passive. If it -%% exists it by definition has one consumer. -handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", - _/binary>> = QueueNameBin, - nowait = NoWait}, _, - State = #ch{virtual_host = VHost}) -> - StrippedQueueNameBin = strip_cr_lf(QueueNameBin), - QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin), - case declare_fast_reply_to(StrippedQueueNameBin) of - exists -> return_queue_declare_ok(QueueName, NoWait, 0, 1, State); - not_found -> rabbit_misc:not_found(QueueName) - end; +handle_method(#'exchange.delete'{nowait = NoWait} = Method, + _, State = #ch{conn_pid = ConnPid, + virtual_host = VHostPath, + queue_collector_pid = CollectorPid, + user = User}) -> + handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + return_ok(State, NoWait, #'exchange.delete_ok'{}); -handle_method(#'queue.declare'{queue = QueueNameBin, - passive = false, - durable = DurableDeclare, - exclusive = ExclusiveDeclare, - auto_delete = AutoDelete, - nowait = NoWait, - arguments = Args} = Declare, +handle_method(#'exchange.bind'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, queue_collector_pid = CollectorPid, - user = #user{username = Username}}) -> - Owner = case ExclusiveDeclare of - true -> ConnPid; - false -> none - end, - StrippedQueueNameBin = strip_cr_lf(QueueNameBin), - Durable = DurableDeclare andalso not ExclusiveDeclare, - ActualNameBin = case StrippedQueueNameBin of - <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), - "amq.gen"); - Other -> check_name('queue', Other) - end, - QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - check_configure_permitted(QueueName, State), - case rabbit_amqqueue:with( - QueueName, - fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( - Q, Durable, AutoDelete, Args, Owner), - maybe_stat(NoWait, Q) - end) of - {ok, MessageCount, ConsumerCount} -> - return_queue_declare_ok(QueueName, NoWait, MessageCount, - ConsumerCount, State); - {error, not_found} -> - %% enforce the limit for newly declared queues only - check_vhost_queue_limit(QueueName, VHostPath), - DlxKey = <<"x-dead-letter-exchange">>, - case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of - undefined -> - ok; - {error, {invalid_type, Type}} -> - precondition_failed( - "invalid type '~s' for arg '~s' in ~s", - [Type, DlxKey, rabbit_misc:rs(QueueName)]); - DLX -> - check_read_permitted(QueueName, State), - check_write_permitted(DLX, State), - ok - end, - case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, - Args, Owner, Username) of - {new, #amqqueue{pid = QPid}} -> - %% We need to notify the reader within the channel - %% process so that we can be sure there are no - %% outstanding exclusive queues being declared as - %% the connection shuts down. - ok = case Owner of - none -> ok; - _ -> rabbit_queue_collector:register( - CollectorPid, QPid) - end, - return_queue_declare_ok(QueueName, NoWait, 0, 0, State); - {existing, _Q} -> - %% must have been created between the stat and the - %% declare. Loop around again. - handle_method(Declare, none, State); - {absent, Q, Reason} -> - rabbit_misc:absent(Q, Reason); - {owner_died, _Q} -> - %% Presumably our own days are numbered since the - %% connection has died. Pretend the queue exists though, - %% just so nothing fails. - return_queue_declare_ok(QueueName, NoWait, 0, 0, State) - end; - {error, {absent, Q, Reason}} -> - rabbit_misc:absent(Q, Reason) - end; + user = User}) -> + handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + return_ok(State, NoWait, #'exchange.bind_ok'{}); -handle_method(#'queue.declare'{queue = QueueNameBin, - passive = true, - nowait = NoWait}, - _, State = #ch{virtual_host = VHostPath, - conn_pid = ConnPid}) -> - StrippedQueueNameBin = strip_cr_lf(QueueNameBin), - QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin), - {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = - rabbit_amqqueue:with_or_die( - QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end), - ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid), - return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, - State); +handle_method(#'exchange.unbind'{nowait = NoWait} = Method, + _, State = #ch{virtual_host = VHostPath, + conn_pid = ConnPid, + queue_collector_pid = CollectorPid, + user = User}) -> + handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + return_ok(State, NoWait, #'exchange.unbind_ok'{}); -handle_method(#'queue.delete'{queue = QueueNameBin, - if_unused = IfUnused, - if_empty = IfEmpty, - nowait = NoWait}, +handle_method(#'queue.declare'{nowait = NoWait} = Method, + _, State = #ch{virtual_host = VHostPath, + conn_pid = ConnPid, + queue_collector_pid = CollectorPid, + user = User}) -> + {ok, QueueName, MessageCount, ConsumerCount} = + handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + return_queue_declare_ok(QueueName, NoWait, MessageCount, + ConsumerCount, State); + +handle_method(#'queue.delete'{nowait = NoWait} = Method, _, + State = #ch{conn_pid = ConnPid, + virtual_host = VHostPath, + queue_collector_pid = CollectorPid, + user = User}) -> + {ok, PurgedMessageCount} = handle_method(Method, ConnPid, CollectorPid, + VHostPath, User), + return_ok(State, NoWait, + #'queue.delete_ok'{message_count = PurgedMessageCount}); + +handle_method(#'queue.bind'{nowait = NoWait} = Method, _, + State = #ch{conn_pid = ConnPid, + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}) -> + handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + return_ok(State, NoWait, #'queue.bind_ok'{}); + +handle_method(#'queue.unbind'{} = Method, _, + State = #ch{conn_pid = ConnPid, + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}) -> + handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + return_ok(State, false, #'queue.unbind_ok'{}); + +handle_method(#'queue.purge'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, - user = #user{username = Username}}) -> - StrippedQueueNameBin = strip_cr_lf(QueueNameBin), - QueueName = qbin_to_resource(StrippedQueueNameBin, State), - check_configure_permitted(QueueName, State), - case rabbit_amqqueue:with( - QueueName, - fun (Q) -> - rabbit_amqqueue:check_exclusive_access(Q, ConnPid), - rabbit_amqqueue:delete(Q, IfUnused, IfEmpty, Username) - end, - fun (not_found) -> {ok, 0}; - ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username), - {ok, 0}; - ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) - end) of - {error, in_use} -> - precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]); - {error, not_empty} -> - precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]); - {ok, PurgedMessageCount} -> - return_ok(State, NoWait, - #'queue.delete_ok'{message_count = PurgedMessageCount}) - end; - -handle_method(#'queue.bind'{queue = QueueNameBin, - exchange = ExchangeNameBin, - routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> - binding_action(fun rabbit_binding:add/3, - strip_cr_lf(ExchangeNameBin), queue, strip_cr_lf(QueueNameBin), RoutingKey, Arguments, - #'queue.bind_ok'{}, NoWait, State); - -handle_method(#'queue.unbind'{queue = QueueNameBin, - exchange = ExchangeNameBin, - routing_key = RoutingKey, - arguments = Arguments}, _, State) -> - binding_action(fun rabbit_binding:remove/3, - strip_cr_lf(ExchangeNameBin), queue, strip_cr_lf(QueueNameBin), RoutingKey, Arguments, - #'queue.unbind_ok'{}, false, State); - -handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, - _, State = #ch{conn_pid = ConnPid}) -> - QueueName = qbin_to_resource(QueueNameBin, State), - check_read_permitted(QueueName, State), - {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ConnPid, - fun (Q) -> rabbit_amqqueue:purge(Q) end), + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}) -> + {ok, PurgedMessageCount} = handle_method(Method, ConnPid, CollectorPid, + VHostPath, User), return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); @@ -1713,23 +1558,23 @@ queue_down_consumer_action(CTag, CMap) -> handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. -binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, - RoutingKey, Arguments, ReturnMethod, NoWait, - State = #ch{virtual_host = VHostPath, - conn_pid = ConnPid, - user = #user{username = Username}}) -> - DestinationName = name_to_resource(DestinationType, DestinationNameBin, State), - check_write_permitted(DestinationName, State), +binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, + RoutingKey, Arguments, VHostPath, ConnPid, + #user{username = Username} = User) -> + ExchangeNameBin = strip_cr_lf(SourceNameBin0), + DestinationNameBin = strip_cr_lf(DestinationNameBin0), + DestinationName = name_to_resource(DestinationType, DestinationNameBin, VHostPath), + check_write_permitted(DestinationName, User), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]], - check_read_permitted(ExchangeName, State), + check_read_permitted(ExchangeName, User), ExchangeLookup = rabbit_exchange:lookup(ExchangeName), case ExchangeLookup of {error, not_found} -> %% no-op ExchangeLookup; {ok, Exchange} -> - check_read_permitted_on_topic(Exchange, State, RoutingKey), + check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey), ExchangeLookup end, case Fun(#binding{source = ExchangeName, @@ -1757,7 +1602,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, rabbit_misc:protocol_error(precondition_failed, Fmt, Args); {error, #amqp_error{} = Error} -> rabbit_misc:protocol_error(Error); - ok -> return_ok(State, NoWait, ReturnMethod) + ok -> + ok end. basic_return(#basic_message{exchange_name = ExchangeName, @@ -2148,3 +1994,226 @@ put_operation_timeout() -> get_operation_timeout() -> get(channel_operation_timeout). + +%% Refactored and exported to allow direct calls from the HTTP API, +%% avoiding the usage of AMQP 0-9-1 from the management. +handle_method(#'exchange.bind'{destination = DestinationNameBin, + source = SourceNameBin, + routing_key = RoutingKey, + arguments = Arguments}, + ConnPid, _CollectorId, VHostPath, User) -> + binding_action(fun rabbit_binding:add/3, + SourceNameBin, exchange, DestinationNameBin, + RoutingKey, Arguments, VHostPath, ConnPid, User); +handle_method(#'exchange.unbind'{destination = DestinationNameBin, + source = SourceNameBin, + routing_key = RoutingKey, + arguments = Arguments}, + ConnPid, _CollectorId, VHostPath, User) -> + binding_action(fun rabbit_binding:remove/3, + SourceNameBin, exchange, DestinationNameBin, + RoutingKey, Arguments, VHostPath, ConnPid, User); +handle_method(#'queue.unbind'{queue = QueueNameBin, + exchange = ExchangeNameBin, + routing_key = RoutingKey, + arguments = Arguments}, + ConnPid, _CollectorId, VHostPath, User) -> + binding_action(fun rabbit_binding:remove/3, + ExchangeNameBin, queue, QueueNameBin, + RoutingKey, Arguments, VHostPath, ConnPid, User); +handle_method(#'queue.bind'{queue = QueueNameBin, + exchange = ExchangeNameBin, + routing_key = RoutingKey, + arguments = Arguments}, + ConnPid, _CollectorId, VHostPath, User) -> + binding_action(fun rabbit_binding:add/3, + ExchangeNameBin, queue, QueueNameBin, + RoutingKey, Arguments, VHostPath, ConnPid, User); +%% Note that all declares to these are effectively passive. If it +%% exists it by definition has one consumer. +handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", + _/binary>> = QueueNameBin}, + _ConnPid, _CollectorPid, VHost, _User) -> + StrippedQueueNameBin = strip_cr_lf(QueueNameBin), + QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin), + case declare_fast_reply_to(StrippedQueueNameBin) of + exists -> {ok, QueueName, 0, 1}; + not_found -> rabbit_misc:not_found(QueueName) + end; +handle_method(#'queue.declare'{queue = QueueNameBin, + passive = false, + durable = DurableDeclare, + exclusive = ExclusiveDeclare, + auto_delete = AutoDelete, + nowait = NoWait, + arguments = Args} = Declare, + ConnPid, CollectorPid, VHostPath, #user{username = Username} = User) -> + Owner = case ExclusiveDeclare of + true -> ConnPid; + false -> none + end, + StrippedQueueNameBin = strip_cr_lf(QueueNameBin), + Durable = DurableDeclare andalso not ExclusiveDeclare, + ActualNameBin = case StrippedQueueNameBin of + <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), + "amq.gen"); + Other -> check_name('queue', Other) + end, + QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), + check_configure_permitted(QueueName, User), + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( + Q, Durable, AutoDelete, Args, Owner), + maybe_stat(NoWait, Q) + end) of + {ok, MessageCount, ConsumerCount} -> + {ok, QueueName, MessageCount, ConsumerCount}; + {error, not_found} -> + %% enforce the limit for newly declared queues only + check_vhost_queue_limit(QueueName, VHostPath), + DlxKey = <<"x-dead-letter-exchange">>, + case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of + undefined -> + ok; + {error, {invalid_type, Type}} -> + precondition_failed( + "invalid type '~s' for arg '~s' in ~s", + [Type, DlxKey, rabbit_misc:rs(QueueName)]); + DLX -> + check_read_permitted(QueueName, User), + check_write_permitted(DLX, User), + ok + end, + case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner, Username) of + {new, #amqqueue{pid = QPid}} -> + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as + %% the connection shuts down. + ok = case {Owner, CollectorPid} of + {none, _} -> ok; + {_, none} -> ok; %% Supports call from mgmt API + _ -> rabbit_queue_collector:register( + CollectorPid, QPid) + end, + {ok, QueueName, 0, 0}; + {existing, _Q} -> + %% must have been created between the stat and the + %% declare. Loop around again. + handle_method(Declare, ConnPid, CollectorPid, VHostPath, User); + {absent, Q, Reason} -> + rabbit_misc:absent(Q, Reason); + {owner_died, _Q} -> + %% Presumably our own days are numbered since the + %% connection has died. Pretend the queue exists though, + %% just so nothing fails. + {ok, QueueName, 0, 0} + end; + {error, {absent, Q, Reason}} -> + rabbit_misc:absent(Q, Reason) + end; +handle_method(#'queue.declare'{queue = QueueNameBin, + nowait = NoWait, + passive = true}, + ConnPid, _CollectorPid, VHostPath, _User) -> + StrippedQueueNameBin = strip_cr_lf(QueueNameBin), + QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin), + {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = + rabbit_amqqueue:with_or_die( + QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end), + ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid), + {ok, QueueName, MessageCount, ConsumerCount}; +handle_method(#'queue.delete'{queue = QueueNameBin, + if_unused = IfUnused, + if_empty = IfEmpty}, + ConnPid, _CollectorPid, VHostPath, User = #user{username = Username}) -> + StrippedQueueNameBin = strip_cr_lf(QueueNameBin), + QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath), + + check_configure_permitted(QueueName, User), + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> + rabbit_amqqueue:check_exclusive_access(Q, ConnPid), + rabbit_amqqueue:delete(Q, IfUnused, IfEmpty, Username) + end, + fun (not_found) -> {ok, 0}; + ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username), + {ok, 0}; + ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) + end) of + {error, in_use} -> + precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]); + {error, not_empty} -> + precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]); + {ok, _Count} = OK -> + OK + end; +handle_method(#'exchange.delete'{exchange = ExchangeNameBin, + if_unused = IfUnused}, + _ConnPid, _CollectorPid, VHostPath, User = #user{username = Username}) -> + StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin), + ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin), + check_not_default_exchange(ExchangeName), + check_exchange_deletion(ExchangeName), + check_configure_permitted(ExchangeName, User), + case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of + {error, not_found} -> + ok; + {error, in_use} -> + precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]); + ok -> + ok + end; +handle_method(#'queue.purge'{queue = QueueNameBin}, + ConnPid, _CollectorPid, VHostPath, User) -> + QueueName = qbin_to_resource(QueueNameBin, VHostPath), + check_read_permitted(QueueName, User), + rabbit_amqqueue:with_exclusive_access_or_die( + QueueName, ConnPid, + fun (Q) -> rabbit_amqqueue:purge(Q) end); +handle_method(#'exchange.declare'{exchange = ExchangeNameBin, + type = TypeNameBin, + passive = false, + durable = Durable, + auto_delete = AutoDelete, + internal = Internal, + arguments = Args}, + _ConnPid, _CollectorPid, VHostPath, #user{username = Username} = User) -> + CheckedType = rabbit_exchange:check_type(TypeNameBin), + ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), + check_not_default_exchange(ExchangeName), + check_configure_permitted(ExchangeName, User), + X = case rabbit_exchange:lookup(ExchangeName) of + {ok, FoundX} -> FoundX; + {error, not_found} -> + check_name('exchange', strip_cr_lf(ExchangeNameBin)), + AeKey = <<"alternate-exchange">>, + case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of + undefined -> ok; + {error, {invalid_type, Type}} -> + precondition_failed( + "invalid type '~s' for arg '~s' in ~s", + [Type, AeKey, rabbit_misc:rs(ExchangeName)]); + AName -> check_read_permitted(ExchangeName, User), + check_write_permitted(AName, User), + ok + end, + rabbit_exchange:declare(ExchangeName, + CheckedType, + Durable, + AutoDelete, + Internal, + Args, + Username) + end, + ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, + AutoDelete, Internal, Args); +handle_method(#'exchange.declare'{exchange = ExchangeNameBin, + passive = true}, + _ConnPid, _CollectorPid, VHostPath, _User) -> + ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), + check_not_default_exchange(ExchangeName), + _ = rabbit_exchange:lookup_or_die(ExchangeName). |
