diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 102 |
1 files changed, 67 insertions, 35 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f71d85c4af..32d39ccbb6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -272,7 +272,9 @@ handle_cast({method, Method, Content, Flow}, flow -> credit_flow:ack(Reader); noflow -> ok end, - try handle_method(Method, Content, State) of + %% handle MRDQ before calling handle method + Method2 = handle_expand_shortcuts(Method, State), + try handle_method(Method2, Content, State) of {reply, Reply, NewState} -> ok = send(Reply, NewState), noreply(NewState); @@ -282,7 +284,7 @@ handle_cast({method, Method, Content, Flow}, {stop, normal, State} catch exit:Reason = #amqp_error{} -> - MethodName = rabbit_misc:method_record_type(Method), + MethodName = rabbit_misc:method_record_type(Method2), handle_exception(Reason#amqp_error{method = MethodName}, State); _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} @@ -519,14 +521,24 @@ check_internal_exchange(#exchange{name = Name, internal = true}) -> check_internal_exchange(_) -> ok. +binding_to_resource(queue, DestinationNameBin, State) -> + queue_bin_to_resource(DestinationNameBin, State); +binding_to_resource(exchange, DestinationNameBin, State) -> + exchange_bin_to_resource(DestinationNameBin, State). + +queue_bin_to_resource(QueueNameBin, #ch{virtual_host = VHostPath}) -> + rabbit_misc:r(VHostPath, queue, QueueNameBin). + +exchange_bin_to_resource(ExchangeNameBin, #ch{virtual_host = VHostPath}) -> + rabbit_misc:r(VHostPath, queue, ExchangeNameBin). + expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); -expand_queue_name_shortcut(<<>>, #ch{virtual_host = VHostPath, - most_recently_declared_queue = MRDQ}) -> - rabbit_misc:r(VHostPath, queue, MRDQ); -expand_queue_name_shortcut(QueueNameBin, #ch{virtual_host = VHostPath}) -> - rabbit_misc:r(VHostPath, queue, QueueNameBin). +expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) -> + MRDQ; +expand_queue_name_shortcut(QueueNameBin, _) -> + QueueNameBin. expand_routing_key_shortcut(<<>>, <<>>, #ch{most_recently_declared_queue = <<>>}) -> @@ -541,9 +553,46 @@ expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> expand_binding(queue, DestinationNameBin, RoutingKey, State) -> {expand_queue_name_shortcut(DestinationNameBin, State), expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State)}; -expand_binding(exchange, DestinationNameBin, RoutingKey, State) -> - {rabbit_misc:r(State#ch.virtual_host, exchange, DestinationNameBin), - RoutingKey}. +expand_binding(exchange, DestinationNameBin, RoutingKey, _) -> + {DestinationNameBin, RoutingKey}. + +handle_expand_shortcuts(#'basic.get'{queue = QueueNameBin} = Method, State) -> + setelement(#'basic.get'.queue, Method, + expand_queue_name_shortcut(QueueNameBin, State)); +handle_expand_shortcuts(#'basic.consume'{queue = QueueNameBin} = Method, State) -> + setelement(#'basic.consume'.queue, Method, + expand_queue_name_shortcut(QueueNameBin, State)); +handle_expand_shortcuts(#'queue.delete'{queue = QueueNameBin} = Method, State) -> + setelement(#'queue.delete'.queue, Method, + expand_queue_name_shortcut(QueueNameBin, State)); +handle_expand_shortcuts(#'queue.purge'{queue = QueueNameBin} = Method, State) -> + setelement(#'queue.purge'.queue, Method, + expand_queue_name_shortcut(QueueNameBin, State)); +handle_expand_shortcuts(#'exchange.bind'{destination = DestinationNameBin, + routing_key = RoutingKey} = Method, + State) -> + {DestinationName, ActualRoutingKey} = + expand_binding(exchange, DestinationNameBin, RoutingKey, State), + Method#'exchange.bind'{destination = DestinationName, + routing_key = ActualRoutingKey}; +handle_expand_shortcuts(#'exchange.unbind'{destination = DestinationNameBin, + routing_key = RoutingKey} = Method, State) -> + {DestinationName, ActualRoutingKey} = + expand_binding(exchange, DestinationNameBin, RoutingKey, State), + Method#'exchange.unbind'{destination = DestinationName, + routing_key = ActualRoutingKey}; +handle_expand_shortcuts(#'queue.bind'{queue = QueueNameBin, + routing_key = RoutingKey} = Method, + State) -> + {DestinationName, ActualRoutingKey} = + expand_binding(queue, QueueNameBin, RoutingKey, State), + Method#'queue.bind'{queue = DestinationName, routing_key = ActualRoutingKey}; +handle_expand_shortcuts(#'queue.unbind'{queue = QueueNameBin, + routing_key = RoutingKey} = Method, + State) -> + {DestinationName, ActualRoutingKey} = + expand_binding(queue, QueueNameBin, RoutingKey, State), + Method#'queue.bind'{queue = DestinationName, routing_key = ActualRoutingKey}. check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) -> rabbit_misc:protocol_error( @@ -716,8 +765,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, conn_pid = ConnPid, limiter = Limiter, next_tag = DeliveryTag}) -> - OrigQN = expand_queue_name_shortcut(QueueNameBin, State), - QueueName = intercept_method('basic_get', OrigQN), + QueueName = queue_bin_to_resource(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, @@ -755,8 +803,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> - OrigQN = expand_queue_name_shortcut(QueueNameBin, State), - QueueName = intercept_method('basic_consume', OrigQN), + QueueName = queue_bin_to_resource(QueueNameBin, State), check_read_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of @@ -997,8 +1044,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, "amq.gen"); Other -> check_name('queue', Other) end, - OrigQN = rabbit_misc:r(VHostPath, queue, ActualNameBin), - QueueName = intercept_method('queue_declare', OrigQN), + QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), check_configure_permitted(QueueName, State), case rabbit_amqqueue:with( QueueName, @@ -1057,8 +1103,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid}) -> - OrigQN = rabbit_misc:r(VHostPath, queue, QueueNameBin), - QueueName = intercept_method('queue_declare', OrigQN), + QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), @@ -1071,8 +1116,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, if_empty = IfEmpty, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid}) -> - OrigQN = expand_queue_name_shortcut(QueueNameBin, State), - QueueName = intercept_method('queue_delete', OrigQN), + QueueName = queue_bin_to_resource(QueueNameBin, State), check_configure_permitted(QueueName, State), case rabbit_amqqueue:with( QueueName, @@ -1112,8 +1156,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid}) -> - OrigQN = expand_queue_name_shortcut(QueueNameBin, State), - QueueName = intercept_method('queue_purge', OrigQN), + QueueName = queue_bin_to_resource(QueueNameBin, State), check_read_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, @@ -1287,16 +1330,14 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid }) -> - {OrigDName, ActualRoutingKey} = - expand_binding(DestinationType, DestinationNameBin, RoutingKey, State), - DestinationName = intercept_binding_method(OrigDName, DestinationType, ReturnMethod), + DestinationName = binding_to_resource(DestinationType, DestinationNameBin, State), check_write_permitted(DestinationName, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]], check_read_permitted(ExchangeName, State), case Fun(#binding{source = ExchangeName, destination = DestinationName, - key = ActualRoutingKey, + key = RoutingKey, args = Arguments}, fun (_X, Q = #amqqueue{}) -> try rabbit_amqqueue:check_exclusive_access(Q, ConnPid) @@ -1687,15 +1728,6 @@ erase_queue_stats(QName) -> {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(), QName0 =:= QName]. -intercept_binding_method(OrigDName, queue, #'queue.bind_ok'{}) -> - intercept_method('queue_bind', OrigDName); - -intercept_binding_method(OrigDName, queue, #'queue.unbind_ok'{}) -> - intercept_method('queue_unbind', OrigDName); - -intercept_binding_method(OrigDName, _DestinationType, _Method) -> - OrigDName. - intercept_method(M, Q) -> case rabbit_channel_interceptor:run_filter_chain(M, Q, rabbit_channel_interceptor:select(Q, M)) of |
