summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2017-07-10 14:55:04 +0300
committerGitHub <noreply@github.com>2017-07-10 14:55:04 +0300
commita451a4b1158682d5931d59f00dfdfc411eadaef4 (patch)
tree871bd7919bdde79df258b2589861c30c273f2e55 /src
parent99b3b0a290dc32c29887ea6b7ed880ce68bfd4e4 (diff)
parentcaac1a65deb143a8750821174bfbb8a66a45d7c7 (diff)
downloadrabbitmq-server-git-a451a4b1158682d5931d59f00dfdfc411eadaef4.tar.gz
Merge pull request #1286 from rabbitmq/rabbitmq-management-421
Extract common parts of handle_method to be used directly from HTTP API
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl607
1 files changed, 338 insertions, 269 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 378c1e583b..00a6607dfb 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -70,6 +70,9 @@
%% For testing
-export([build_topic_variable_map/3]).
+%% Mgmt HTTP API refactor
+-export([handle_method/5]).
+
-record(ch, {
%% starting | running | flow | closing
state,
@@ -741,20 +744,20 @@ clear_permission_cache() -> erase(permission_cache),
erase(topic_permission_cache),
ok.
-check_configure_permitted(Resource, #ch{user = User}) ->
+check_configure_permitted(Resource, User) ->
check_resource_access(User, Resource, configure).
-check_write_permitted(Resource, #ch{user = User}) ->
+check_write_permitted(Resource, User) ->
check_resource_access(User, Resource, write).
-check_read_permitted(Resource, #ch{user = User}) ->
+check_read_permitted(Resource, User) ->
check_resource_access(User, Resource, read).
-check_write_permitted_on_topic(Resource, Channel, RoutingKey) ->
- check_topic_authorisation(Resource, Channel, RoutingKey, write).
+check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
+ check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write).
-check_read_permitted_on_topic(Resource, Channel, RoutingKey) ->
- check_topic_authorisation(Resource, Channel, RoutingKey, read).
+check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
+ check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read).
check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
ok;
@@ -790,12 +793,19 @@ check_internal_exchange(_) ->
ok.
check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic},
- #ch{user = User = #user{username = Username}, conn_pid = ConnPid},
+ User = #user{username = Username},
+ ConnPid,
RoutingKey,
Permission) ->
Resource = Name#resource{kind = topic},
- Timeout = get_operation_timeout(),
- AmqpParams = rabbit_amqp_connection:amqp_params(ConnPid, Timeout),
+ Timeout = get_operation_timeout(),
+ AmqpParams = case ConnPid of
+ none ->
+ %% Called from outside the channel by mgmt API
+ [];
+ _ ->
+ rabbit_amqp_connection:amqp_params(ConnPid, Timeout)
+ end,
VariableMap = build_topic_variable_map(AmqpParams, VHost, Username),
Context = #{routing_key => RoutingKey,
variable_map => VariableMap
@@ -811,7 +821,7 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail])
end;
-check_topic_authorisation(_, _, _, _) ->
+check_topic_authorisation(_, _, _, _, _) ->
ok.
build_topic_variable_map(AmqpParams, VHost, Username) ->
@@ -844,10 +854,10 @@ check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->
end.
-qbin_to_resource(QueueNameBin, State) ->
- name_to_resource(queue, QueueNameBin, State).
+qbin_to_resource(QueueNameBin, VHostPath) ->
+ name_to_resource(queue, QueueNameBin, VHostPath).
-name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) ->
+name_to_resource(Type, NameBin, VHostPath) ->
rabbit_misc:r(VHostPath, Type, NameBin).
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
@@ -1002,15 +1012,16 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
channel = ChannelNum,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState,
- user = #user{username = Username},
+ user = #user{username = Username} = User,
conn_name = ConnName,
- delivery_flow = Flow}) ->
+ delivery_flow = Flow,
+ conn_pid = ConnPid}) ->
check_msg_size(Content),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_write_permitted(ExchangeName, State),
+ check_write_permitted(ExchangeName, User),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
check_internal_exchange(Exchange),
- check_write_permitted_on_topic(Exchange, State, RoutingKey),
+ check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
@@ -1063,9 +1074,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
_, State = #ch{writer_pid = WriterPid,
conn_pid = ConnPid,
limiter = Limiter,
- next_tag = DeliveryTag}) ->
- QueueName = qbin_to_resource(QueueNameBin, State),
- check_read_permitted(QueueName, State),
+ next_tag = DeliveryTag,
+ user = User,
+ virtual_host = VHostPath}) ->
+ QueueName = qbin_to_resource(QueueNameBin, VHostPath),
+ check_read_permitted(QueueName, User),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(
@@ -1148,11 +1161,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
nowait = NoWait,
arguments = Args},
_, State = #ch{consumer_prefetch = ConsumerPrefetch,
- consumer_mapping = ConsumerMapping}) ->
+ consumer_mapping = ConsumerMapping,
+ user = User,
+ virtual_host = VHostPath}) ->
case maps:find(ConsumerTag, ConsumerMapping) of
error ->
- QueueName = qbin_to_resource(QueueNameBin, State),
- check_read_permitted(QueueName, State),
+ QueueName = qbin_to_resource(QueueNameBin, VHostPath),
+ check_read_permitted(QueueName, User),
ActualConsumerTag =
case ConsumerTag of
<<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
@@ -1273,251 +1288,81 @@ handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue},
_, State) ->
reject(DeliveryTag, Requeue, false, State);
-handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
- type = TypeNameBin,
- passive = false,
- durable = Durable,
- auto_delete = AutoDelete,
- internal = Internal,
- nowait = NoWait,
- arguments = Args},
+handle_method(#'exchange.declare'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
- user = #user{username = Username}}) ->
- CheckedType = rabbit_exchange:check_type(TypeNameBin),
- ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
- check_not_default_exchange(ExchangeName),
- check_configure_permitted(ExchangeName, State),
- X = case rabbit_exchange:lookup(ExchangeName) of
- {ok, FoundX} -> FoundX;
- {error, not_found} ->
- check_name('exchange', strip_cr_lf(ExchangeNameBin)),
- AeKey = <<"alternate-exchange">>,
- case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of
- undefined -> ok;
- {error, {invalid_type, Type}} ->
- precondition_failed(
- "invalid type '~s' for arg '~s' in ~s",
- [Type, AeKey, rabbit_misc:rs(ExchangeName)]);
- AName -> check_read_permitted(ExchangeName, State),
- check_write_permitted(AName, State),
- ok
- end,
- rabbit_exchange:declare(ExchangeName,
- CheckedType,
- Durable,
- AutoDelete,
- Internal,
- Args,
- Username)
- end,
- ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
- AutoDelete, Internal, Args),
- return_ok(State, NoWait, #'exchange.declare_ok'{});
-
-handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
- passive = true,
- nowait = NoWait},
- _, State = #ch{virtual_host = VHostPath}) ->
- ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
- check_not_default_exchange(ExchangeName),
- _ = rabbit_exchange:lookup_or_die(ExchangeName),
+ user = User,
+ queue_collector_pid = CollectorPid,
+ conn_pid = ConnPid}) ->
+ handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.declare_ok'{});
-handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
- if_unused = IfUnused,
- nowait = NoWait},
- _, State = #ch{virtual_host = VHostPath,
- user = #user{username = Username}}) ->
- StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin),
- ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin),
- check_not_default_exchange(ExchangeName),
- check_exchange_deletion(ExchangeName),
- check_configure_permitted(ExchangeName, State),
- case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of
- {error, not_found} ->
- return_ok(State, NoWait, #'exchange.delete_ok'{});
- {error, in_use} ->
- precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]);
- ok ->
- return_ok(State, NoWait, #'exchange.delete_ok'{})
- end;
-
-handle_method(#'exchange.bind'{destination = DestinationNameBin,
- source = SourceNameBin,
- routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_binding:add/3,
- strip_cr_lf(SourceNameBin), exchange, strip_cr_lf(DestinationNameBin), RoutingKey,
- Arguments, #'exchange.bind_ok'{}, NoWait, State);
-
-handle_method(#'exchange.unbind'{destination = DestinationNameBin,
- source = SourceNameBin,
- routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_binding:remove/3,
- strip_cr_lf(SourceNameBin), exchange, strip_cr_lf(DestinationNameBin), RoutingKey,
- Arguments, #'exchange.unbind_ok'{}, NoWait, State);
-
-%% Note that all declares to these are effectively passive. If it
-%% exists it by definition has one consumer.
-handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
- _/binary>> = QueueNameBin,
- nowait = NoWait}, _,
- State = #ch{virtual_host = VHost}) ->
- StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
- QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
- case declare_fast_reply_to(StrippedQueueNameBin) of
- exists -> return_queue_declare_ok(QueueName, NoWait, 0, 1, State);
- not_found -> rabbit_misc:not_found(QueueName)
- end;
+handle_method(#'exchange.delete'{nowait = NoWait} = Method,
+ _, State = #ch{conn_pid = ConnPid,
+ virtual_host = VHostPath,
+ queue_collector_pid = CollectorPid,
+ user = User}) ->
+ handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ return_ok(State, NoWait, #'exchange.delete_ok'{});
-handle_method(#'queue.declare'{queue = QueueNameBin,
- passive = false,
- durable = DurableDeclare,
- exclusive = ExclusiveDeclare,
- auto_delete = AutoDelete,
- nowait = NoWait,
- arguments = Args} = Declare,
+handle_method(#'exchange.bind'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
queue_collector_pid = CollectorPid,
- user = #user{username = Username}}) ->
- Owner = case ExclusiveDeclare of
- true -> ConnPid;
- false -> none
- end,
- StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
- Durable = DurableDeclare andalso not ExclusiveDeclare,
- ActualNameBin = case StrippedQueueNameBin of
- <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
- "amq.gen");
- Other -> check_name('queue', Other)
- end,
- QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
- Q, Durable, AutoDelete, Args, Owner),
- maybe_stat(NoWait, Q)
- end) of
- {ok, MessageCount, ConsumerCount} ->
- return_queue_declare_ok(QueueName, NoWait, MessageCount,
- ConsumerCount, State);
- {error, not_found} ->
- %% enforce the limit for newly declared queues only
- check_vhost_queue_limit(QueueName, VHostPath),
- DlxKey = <<"x-dead-letter-exchange">>,
- case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of
- undefined ->
- ok;
- {error, {invalid_type, Type}} ->
- precondition_failed(
- "invalid type '~s' for arg '~s' in ~s",
- [Type, DlxKey, rabbit_misc:rs(QueueName)]);
- DLX ->
- check_read_permitted(QueueName, State),
- check_write_permitted(DLX, State),
- ok
- end,
- case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
- Args, Owner, Username) of
- {new, #amqqueue{pid = QPid}} ->
- %% We need to notify the reader within the channel
- %% process so that we can be sure there are no
- %% outstanding exclusive queues being declared as
- %% the connection shuts down.
- ok = case Owner of
- none -> ok;
- _ -> rabbit_queue_collector:register(
- CollectorPid, QPid)
- end,
- return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
- {existing, _Q} ->
- %% must have been created between the stat and the
- %% declare. Loop around again.
- handle_method(Declare, none, State);
- {absent, Q, Reason} ->
- rabbit_misc:absent(Q, Reason);
- {owner_died, _Q} ->
- %% Presumably our own days are numbered since the
- %% connection has died. Pretend the queue exists though,
- %% just so nothing fails.
- return_queue_declare_ok(QueueName, NoWait, 0, 0, State)
- end;
- {error, {absent, Q, Reason}} ->
- rabbit_misc:absent(Q, Reason)
- end;
+ user = User}) ->
+ handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ return_ok(State, NoWait, #'exchange.bind_ok'{});
-handle_method(#'queue.declare'{queue = QueueNameBin,
- passive = true,
- nowait = NoWait},
- _, State = #ch{virtual_host = VHostPath,
- conn_pid = ConnPid}) ->
- StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
- QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
- {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
- rabbit_amqqueue:with_or_die(
- QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end),
- ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
- return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
- State);
+handle_method(#'exchange.unbind'{nowait = NoWait} = Method,
+ _, State = #ch{virtual_host = VHostPath,
+ conn_pid = ConnPid,
+ queue_collector_pid = CollectorPid,
+ user = User}) ->
+ handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ return_ok(State, NoWait, #'exchange.unbind_ok'{});
-handle_method(#'queue.delete'{queue = QueueNameBin,
- if_unused = IfUnused,
- if_empty = IfEmpty,
- nowait = NoWait},
+handle_method(#'queue.declare'{nowait = NoWait} = Method,
+ _, State = #ch{virtual_host = VHostPath,
+ conn_pid = ConnPid,
+ queue_collector_pid = CollectorPid,
+ user = User}) ->
+ {ok, QueueName, MessageCount, ConsumerCount} =
+ handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ return_queue_declare_ok(QueueName, NoWait, MessageCount,
+ ConsumerCount, State);
+
+handle_method(#'queue.delete'{nowait = NoWait} = Method, _,
+ State = #ch{conn_pid = ConnPid,
+ virtual_host = VHostPath,
+ queue_collector_pid = CollectorPid,
+ user = User}) ->
+ {ok, PurgedMessageCount} = handle_method(Method, ConnPid, CollectorPid,
+ VHostPath, User),
+ return_ok(State, NoWait,
+ #'queue.delete_ok'{message_count = PurgedMessageCount});
+
+handle_method(#'queue.bind'{nowait = NoWait} = Method, _,
+ State = #ch{conn_pid = ConnPid,
+ user = User,
+ queue_collector_pid = CollectorPid,
+ virtual_host = VHostPath}) ->
+ handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ return_ok(State, NoWait, #'queue.bind_ok'{});
+
+handle_method(#'queue.unbind'{} = Method, _,
+ State = #ch{conn_pid = ConnPid,
+ user = User,
+ queue_collector_pid = CollectorPid,
+ virtual_host = VHostPath}) ->
+ handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ return_ok(State, false, #'queue.unbind_ok'{});
+
+handle_method(#'queue.purge'{nowait = NoWait} = Method,
_, State = #ch{conn_pid = ConnPid,
- user = #user{username = Username}}) ->
- StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
- QueueName = qbin_to_resource(StrippedQueueNameBin, State),
- check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) ->
- rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
- rabbit_amqqueue:delete(Q, IfUnused, IfEmpty, Username)
- end,
- fun (not_found) -> {ok, 0};
- ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username),
- {ok, 0};
- ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
- end) of
- {error, in_use} ->
- precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
- {error, not_empty} ->
- precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]);
- {ok, PurgedMessageCount} ->
- return_ok(State, NoWait,
- #'queue.delete_ok'{message_count = PurgedMessageCount})
- end;
-
-handle_method(#'queue.bind'{queue = QueueNameBin,
- exchange = ExchangeNameBin,
- routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_binding:add/3,
- strip_cr_lf(ExchangeNameBin), queue, strip_cr_lf(QueueNameBin), RoutingKey, Arguments,
- #'queue.bind_ok'{}, NoWait, State);
-
-handle_method(#'queue.unbind'{queue = QueueNameBin,
- exchange = ExchangeNameBin,
- routing_key = RoutingKey,
- arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_binding:remove/3,
- strip_cr_lf(ExchangeNameBin), queue, strip_cr_lf(QueueNameBin), RoutingKey, Arguments,
- #'queue.unbind_ok'{}, false, State);
-
-handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait},
- _, State = #ch{conn_pid = ConnPid}) ->
- QueueName = qbin_to_resource(QueueNameBin, State),
- check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ConnPid,
- fun (Q) -> rabbit_amqqueue:purge(Q) end),
+ user = User,
+ queue_collector_pid = CollectorPid,
+ virtual_host = VHostPath}) ->
+ {ok, PurgedMessageCount} = handle_method(Method, ConnPid, CollectorPid,
+ VHostPath, User),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
@@ -1713,23 +1558,23 @@ queue_down_consumer_action(CTag, CMap) ->
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
-binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
- RoutingKey, Arguments, ReturnMethod, NoWait,
- State = #ch{virtual_host = VHostPath,
- conn_pid = ConnPid,
- user = #user{username = Username}}) ->
- DestinationName = name_to_resource(DestinationType, DestinationNameBin, State),
- check_write_permitted(DestinationName, State),
+binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
+ RoutingKey, Arguments, VHostPath, ConnPid,
+ #user{username = Username} = User) ->
+ ExchangeNameBin = strip_cr_lf(SourceNameBin0),
+ DestinationNameBin = strip_cr_lf(DestinationNameBin0),
+ DestinationName = name_to_resource(DestinationType, DestinationNameBin, VHostPath),
+ check_write_permitted(DestinationName, User),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
- check_read_permitted(ExchangeName, State),
+ check_read_permitted(ExchangeName, User),
ExchangeLookup = rabbit_exchange:lookup(ExchangeName),
case ExchangeLookup of
{error, not_found} ->
%% no-op
ExchangeLookup;
{ok, Exchange} ->
- check_read_permitted_on_topic(Exchange, State, RoutingKey),
+ check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
ExchangeLookup
end,
case Fun(#binding{source = ExchangeName,
@@ -1757,7 +1602,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
rabbit_misc:protocol_error(precondition_failed, Fmt, Args);
{error, #amqp_error{} = Error} ->
rabbit_misc:protocol_error(Error);
- ok -> return_ok(State, NoWait, ReturnMethod)
+ ok ->
+ ok
end.
basic_return(#basic_message{exchange_name = ExchangeName,
@@ -2148,3 +1994,226 @@ put_operation_timeout() ->
get_operation_timeout() ->
get(channel_operation_timeout).
+
+%% Refactored and exported to allow direct calls from the HTTP API,
+%% avoiding the usage of AMQP 0-9-1 from the management.
+handle_method(#'exchange.bind'{destination = DestinationNameBin,
+ source = SourceNameBin,
+ routing_key = RoutingKey,
+ arguments = Arguments},
+ ConnPid, _CollectorId, VHostPath, User) ->
+ binding_action(fun rabbit_binding:add/3,
+ SourceNameBin, exchange, DestinationNameBin,
+ RoutingKey, Arguments, VHostPath, ConnPid, User);
+handle_method(#'exchange.unbind'{destination = DestinationNameBin,
+ source = SourceNameBin,
+ routing_key = RoutingKey,
+ arguments = Arguments},
+ ConnPid, _CollectorId, VHostPath, User) ->
+ binding_action(fun rabbit_binding:remove/3,
+ SourceNameBin, exchange, DestinationNameBin,
+ RoutingKey, Arguments, VHostPath, ConnPid, User);
+handle_method(#'queue.unbind'{queue = QueueNameBin,
+ exchange = ExchangeNameBin,
+ routing_key = RoutingKey,
+ arguments = Arguments},
+ ConnPid, _CollectorId, VHostPath, User) ->
+ binding_action(fun rabbit_binding:remove/3,
+ ExchangeNameBin, queue, QueueNameBin,
+ RoutingKey, Arguments, VHostPath, ConnPid, User);
+handle_method(#'queue.bind'{queue = QueueNameBin,
+ exchange = ExchangeNameBin,
+ routing_key = RoutingKey,
+ arguments = Arguments},
+ ConnPid, _CollectorId, VHostPath, User) ->
+ binding_action(fun rabbit_binding:add/3,
+ ExchangeNameBin, queue, QueueNameBin,
+ RoutingKey, Arguments, VHostPath, ConnPid, User);
+%% Note that all declares to these are effectively passive. If it
+%% exists it by definition has one consumer.
+handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
+ _/binary>> = QueueNameBin},
+ _ConnPid, _CollectorPid, VHost, _User) ->
+ StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
+ QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
+ case declare_fast_reply_to(StrippedQueueNameBin) of
+ exists -> {ok, QueueName, 0, 1};
+ not_found -> rabbit_misc:not_found(QueueName)
+ end;
+handle_method(#'queue.declare'{queue = QueueNameBin,
+ passive = false,
+ durable = DurableDeclare,
+ exclusive = ExclusiveDeclare,
+ auto_delete = AutoDelete,
+ nowait = NoWait,
+ arguments = Args} = Declare,
+ ConnPid, CollectorPid, VHostPath, #user{username = Username} = User) ->
+ Owner = case ExclusiveDeclare of
+ true -> ConnPid;
+ false -> none
+ end,
+ StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
+ Durable = DurableDeclare andalso not ExclusiveDeclare,
+ ActualNameBin = case StrippedQueueNameBin of
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.gen");
+ Other -> check_name('queue', Other)
+ end,
+ QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
+ check_configure_permitted(QueueName, User),
+ case rabbit_amqqueue:with(
+ QueueName,
+ fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
+ Q, Durable, AutoDelete, Args, Owner),
+ maybe_stat(NoWait, Q)
+ end) of
+ {ok, MessageCount, ConsumerCount} ->
+ {ok, QueueName, MessageCount, ConsumerCount};
+ {error, not_found} ->
+ %% enforce the limit for newly declared queues only
+ check_vhost_queue_limit(QueueName, VHostPath),
+ DlxKey = <<"x-dead-letter-exchange">>,
+ case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of
+ undefined ->
+ ok;
+ {error, {invalid_type, Type}} ->
+ precondition_failed(
+ "invalid type '~s' for arg '~s' in ~s",
+ [Type, DlxKey, rabbit_misc:rs(QueueName)]);
+ DLX ->
+ check_read_permitted(QueueName, User),
+ check_write_permitted(DLX, User),
+ ok
+ end,
+ case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner, Username) of
+ {new, #amqqueue{pid = QPid}} ->
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as
+ %% the connection shuts down.
+ ok = case {Owner, CollectorPid} of
+ {none, _} -> ok;
+ {_, none} -> ok; %% Supports call from mgmt API
+ _ -> rabbit_queue_collector:register(
+ CollectorPid, QPid)
+ end,
+ {ok, QueueName, 0, 0};
+ {existing, _Q} ->
+ %% must have been created between the stat and the
+ %% declare. Loop around again.
+ handle_method(Declare, ConnPid, CollectorPid, VHostPath, User);
+ {absent, Q, Reason} ->
+ rabbit_misc:absent(Q, Reason);
+ {owner_died, _Q} ->
+ %% Presumably our own days are numbered since the
+ %% connection has died. Pretend the queue exists though,
+ %% just so nothing fails.
+ {ok, QueueName, 0, 0}
+ end;
+ {error, {absent, Q, Reason}} ->
+ rabbit_misc:absent(Q, Reason)
+ end;
+handle_method(#'queue.declare'{queue = QueueNameBin,
+ nowait = NoWait,
+ passive = true},
+ ConnPid, _CollectorPid, VHostPath, _User) ->
+ StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
+ QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
+ {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
+ rabbit_amqqueue:with_or_die(
+ QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end),
+ ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
+ {ok, QueueName, MessageCount, ConsumerCount};
+handle_method(#'queue.delete'{queue = QueueNameBin,
+ if_unused = IfUnused,
+ if_empty = IfEmpty},
+ ConnPid, _CollectorPid, VHostPath, User = #user{username = Username}) ->
+ StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
+ QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath),
+
+ check_configure_permitted(QueueName, User),
+ case rabbit_amqqueue:with(
+ QueueName,
+ fun (Q) ->
+ rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
+ rabbit_amqqueue:delete(Q, IfUnused, IfEmpty, Username)
+ end,
+ fun (not_found) -> {ok, 0};
+ ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username),
+ {ok, 0};
+ ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
+ end) of
+ {error, in_use} ->
+ precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
+ {error, not_empty} ->
+ precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]);
+ {ok, _Count} = OK ->
+ OK
+ end;
+handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
+ if_unused = IfUnused},
+ _ConnPid, _CollectorPid, VHostPath, User = #user{username = Username}) ->
+ StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin),
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin),
+ check_not_default_exchange(ExchangeName),
+ check_exchange_deletion(ExchangeName),
+ check_configure_permitted(ExchangeName, User),
+ case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of
+ {error, not_found} ->
+ ok;
+ {error, in_use} ->
+ precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]);
+ ok ->
+ ok
+ end;
+handle_method(#'queue.purge'{queue = QueueNameBin},
+ ConnPid, _CollectorPid, VHostPath, User) ->
+ QueueName = qbin_to_resource(QueueNameBin, VHostPath),
+ check_read_permitted(QueueName, User),
+ rabbit_amqqueue:with_exclusive_access_or_die(
+ QueueName, ConnPid,
+ fun (Q) -> rabbit_amqqueue:purge(Q) end);
+handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
+ type = TypeNameBin,
+ passive = false,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ internal = Internal,
+ arguments = Args},
+ _ConnPid, _CollectorPid, VHostPath, #user{username = Username} = User) ->
+ CheckedType = rabbit_exchange:check_type(TypeNameBin),
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
+ check_not_default_exchange(ExchangeName),
+ check_configure_permitted(ExchangeName, User),
+ X = case rabbit_exchange:lookup(ExchangeName) of
+ {ok, FoundX} -> FoundX;
+ {error, not_found} ->
+ check_name('exchange', strip_cr_lf(ExchangeNameBin)),
+ AeKey = <<"alternate-exchange">>,
+ case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of
+ undefined -> ok;
+ {error, {invalid_type, Type}} ->
+ precondition_failed(
+ "invalid type '~s' for arg '~s' in ~s",
+ [Type, AeKey, rabbit_misc:rs(ExchangeName)]);
+ AName -> check_read_permitted(ExchangeName, User),
+ check_write_permitted(AName, User),
+ ok
+ end,
+ rabbit_exchange:declare(ExchangeName,
+ CheckedType,
+ Durable,
+ AutoDelete,
+ Internal,
+ Args,
+ Username)
+ end,
+ ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
+ AutoDelete, Internal, Args);
+handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
+ passive = true},
+ _ConnPid, _CollectorPid, VHostPath, _User) ->
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
+ check_not_default_exchange(ExchangeName),
+ _ = rabbit_exchange:lookup_or_die(ExchangeName).