summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <alvaro@rabbitmq.com>2013-12-16 19:06:05 +0100
committerAlvaro Videla <alvaro@rabbitmq.com>2013-12-16 19:06:05 +0100
commitc00442b82bd636a8fff0242db0963837c7faa177 (patch)
treec65c43711959313d1b523b52cdfaa266719fbb6f /src
parent5bfde2116ad9f0e5e537058eaba6bb73cf38d817 (diff)
downloadrabbitmq-server-git-c00442b82bd636a8fff0242db0963837c7faa177.tar.gz
moves the expand_* functinos to happen before handle_method
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl102
1 files changed, 67 insertions, 35 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f71d85c4af..32d39ccbb6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -272,7 +272,9 @@ handle_cast({method, Method, Content, Flow},
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
- try handle_method(Method, Content, State) of
+ %% handle MRDQ before calling handle method
+ Method2 = handle_expand_shortcuts(Method, State),
+ try handle_method(Method2, Content, State) of
{reply, Reply, NewState} ->
ok = send(Reply, NewState),
noreply(NewState);
@@ -282,7 +284,7 @@ handle_cast({method, Method, Content, Flow},
{stop, normal, State}
catch
exit:Reason = #amqp_error{} ->
- MethodName = rabbit_misc:method_record_type(Method),
+ MethodName = rabbit_misc:method_record_type(Method2),
handle_exception(Reason#amqp_error{method = MethodName}, State);
_:Reason ->
{stop, {Reason, erlang:get_stacktrace()}, State}
@@ -519,14 +521,24 @@ check_internal_exchange(#exchange{name = Name, internal = true}) ->
check_internal_exchange(_) ->
ok.
+binding_to_resource(queue, DestinationNameBin, State) ->
+ queue_bin_to_resource(DestinationNameBin, State);
+binding_to_resource(exchange, DestinationNameBin, State) ->
+ exchange_bin_to_resource(DestinationNameBin, State).
+
+queue_bin_to_resource(QueueNameBin, #ch{virtual_host = VHostPath}) ->
+ rabbit_misc:r(VHostPath, queue, QueueNameBin).
+
+exchange_bin_to_resource(ExchangeNameBin, #ch{virtual_host = VHostPath}) ->
+ rabbit_misc:r(VHostPath, queue, ExchangeNameBin).
+
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
rabbit_misc:protocol_error(
not_found, "no previously declared queue", []);
-expand_queue_name_shortcut(<<>>, #ch{virtual_host = VHostPath,
- most_recently_declared_queue = MRDQ}) ->
- rabbit_misc:r(VHostPath, queue, MRDQ);
-expand_queue_name_shortcut(QueueNameBin, #ch{virtual_host = VHostPath}) ->
- rabbit_misc:r(VHostPath, queue, QueueNameBin).
+expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) ->
+ MRDQ;
+expand_queue_name_shortcut(QueueNameBin, _) ->
+ QueueNameBin.
expand_routing_key_shortcut(<<>>, <<>>,
#ch{most_recently_declared_queue = <<>>}) ->
@@ -541,9 +553,46 @@ expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
expand_binding(queue, DestinationNameBin, RoutingKey, State) ->
{expand_queue_name_shortcut(DestinationNameBin, State),
expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State)};
-expand_binding(exchange, DestinationNameBin, RoutingKey, State) ->
- {rabbit_misc:r(State#ch.virtual_host, exchange, DestinationNameBin),
- RoutingKey}.
+expand_binding(exchange, DestinationNameBin, RoutingKey, _) ->
+ {DestinationNameBin, RoutingKey}.
+
+handle_expand_shortcuts(#'basic.get'{queue = QueueNameBin} = Method, State) ->
+ setelement(#'basic.get'.queue, Method,
+ expand_queue_name_shortcut(QueueNameBin, State));
+handle_expand_shortcuts(#'basic.consume'{queue = QueueNameBin} = Method, State) ->
+ setelement(#'basic.consume'.queue, Method,
+ expand_queue_name_shortcut(QueueNameBin, State));
+handle_expand_shortcuts(#'queue.delete'{queue = QueueNameBin} = Method, State) ->
+ setelement(#'queue.delete'.queue, Method,
+ expand_queue_name_shortcut(QueueNameBin, State));
+handle_expand_shortcuts(#'queue.purge'{queue = QueueNameBin} = Method, State) ->
+ setelement(#'queue.purge'.queue, Method,
+ expand_queue_name_shortcut(QueueNameBin, State));
+handle_expand_shortcuts(#'exchange.bind'{destination = DestinationNameBin,
+ routing_key = RoutingKey} = Method,
+ State) ->
+ {DestinationName, ActualRoutingKey} =
+ expand_binding(exchange, DestinationNameBin, RoutingKey, State),
+ Method#'exchange.bind'{destination = DestinationName,
+ routing_key = ActualRoutingKey};
+handle_expand_shortcuts(#'exchange.unbind'{destination = DestinationNameBin,
+ routing_key = RoutingKey} = Method, State) ->
+ {DestinationName, ActualRoutingKey} =
+ expand_binding(exchange, DestinationNameBin, RoutingKey, State),
+ Method#'exchange.unbind'{destination = DestinationName,
+ routing_key = ActualRoutingKey};
+handle_expand_shortcuts(#'queue.bind'{queue = QueueNameBin,
+ routing_key = RoutingKey} = Method,
+ State) ->
+ {DestinationName, ActualRoutingKey} =
+ expand_binding(queue, QueueNameBin, RoutingKey, State),
+ Method#'queue.bind'{queue = DestinationName, routing_key = ActualRoutingKey};
+handle_expand_shortcuts(#'queue.unbind'{queue = QueueNameBin,
+ routing_key = RoutingKey} = Method,
+ State) ->
+ {DestinationName, ActualRoutingKey} =
+ expand_binding(queue, QueueNameBin, RoutingKey, State),
+ Method#'queue.bind'{queue = DestinationName, routing_key = ActualRoutingKey}.
check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) ->
rabbit_misc:protocol_error(
@@ -716,8 +765,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
conn_pid = ConnPid,
limiter = Limiter,
next_tag = DeliveryTag}) ->
- OrigQN = expand_queue_name_shortcut(QueueNameBin, State),
- QueueName = intercept_method('basic_get', OrigQN),
+ QueueName = queue_bin_to_resource(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
@@ -755,8 +803,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
- OrigQN = expand_queue_name_shortcut(QueueNameBin, State),
- QueueName = intercept_method('basic_consume', OrigQN),
+ QueueName = queue_bin_to_resource(QueueNameBin, State),
check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
@@ -997,8 +1044,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
"amq.gen");
Other -> check_name('queue', Other)
end,
- OrigQN = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- QueueName = intercept_method('queue_declare', OrigQN),
+ QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with(
QueueName,
@@ -1057,8 +1103,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait},
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid}) ->
- OrigQN = rabbit_misc:r(VHostPath, queue, QueueNameBin),
- QueueName = intercept_method('queue_declare', OrigQN),
+ QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
{{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
rabbit_amqqueue:with_or_die(
QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
@@ -1071,8 +1116,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
if_empty = IfEmpty,
nowait = NoWait},
_, State = #ch{conn_pid = ConnPid}) ->
- OrigQN = expand_queue_name_shortcut(QueueNameBin, State),
- QueueName = intercept_method('queue_delete', OrigQN),
+ QueueName = queue_bin_to_resource(QueueNameBin, State),
check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with(
QueueName,
@@ -1112,8 +1156,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
_, State = #ch{conn_pid = ConnPid}) ->
- OrigQN = expand_queue_name_shortcut(QueueNameBin, State),
- QueueName = intercept_method('queue_purge', OrigQN),
+ QueueName = queue_bin_to_resource(QueueNameBin, State),
check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
@@ -1287,16 +1330,14 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid }) ->
- {OrigDName, ActualRoutingKey} =
- expand_binding(DestinationType, DestinationNameBin, RoutingKey, State),
- DestinationName = intercept_binding_method(OrigDName, DestinationType, ReturnMethod),
+ DestinationName = binding_to_resource(DestinationType, DestinationNameBin, State),
check_write_permitted(DestinationName, State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
check_read_permitted(ExchangeName, State),
case Fun(#binding{source = ExchangeName,
destination = DestinationName,
- key = ActualRoutingKey,
+ key = RoutingKey,
args = Arguments},
fun (_X, Q = #amqqueue{}) ->
try rabbit_amqqueue:check_exclusive_access(Q, ConnPid)
@@ -1687,15 +1728,6 @@ erase_queue_stats(QName) ->
{{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
QName0 =:= QName].
-intercept_binding_method(OrigDName, queue, #'queue.bind_ok'{}) ->
- intercept_method('queue_bind', OrigDName);
-
-intercept_binding_method(OrigDName, queue, #'queue.unbind_ok'{}) ->
- intercept_method('queue_unbind', OrigDName);
-
-intercept_binding_method(OrigDName, _DestinationType, _Method) ->
- OrigDName.
-
intercept_method(M, Q) ->
case rabbit_channel_interceptor:run_filter_chain(M, Q,
rabbit_channel_interceptor:select(Q, M)) of