diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2013-10-13 18:23:22 -0500 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2013-10-13 18:23:22 -0500 |
| commit | 85c61821c0ffe4a542029dc06a5268c54bbc0bde (patch) | |
| tree | e6b361f8972470e62767d5ba59ddf09344cc5199 /src | |
| parent | c3c3bcf197529825975a62bcd107e89351a22e5a (diff) | |
| download | rabbitmq-server-git-85c61821c0ffe4a542029dc06a5268c54bbc0bde.tar.gz | |
adds channel interceptor
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_channel_interceptor.erl | 93 | ||||
| -rw-r--r-- | src/rabbit_registry.erl | 15 |
3 files changed, 112 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dc37959bc8..9be310f8f0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -740,7 +740,17 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> - QueueName = expand_queue_name_shortcut(QueueNameBin, State), + OrigQueueName = expand_queue_name_shortcut(QueueNameBin, State), + QueueName = + case rabbit_channel_interceptor:run_filter_chain(OrigQueueName, + rabbit_channel_interceptor:select('basic_consume')) of + {ok, QN} -> + QN; + {error, Reason} -> + rabbit_misc:protocol_error( + internal_error, "~s", + [Reason]) + end, check_read_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl new file mode 100644 index 0000000000..b74e679f48 --- /dev/null +++ b/src/rabbit_channel_interceptor.erl @@ -0,0 +1,93 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +%% Since the AMQP methods used here are queue related, +%% maybe we want this to be a queue_interceptor. + +-module(rabbit_channel_interceptor). + +-include("rabbit.hrl"). + +-export([select/1, run_filter_chain/2]). + +%% TODO: docs + +-ifdef(use_specs). + +%% TODO: maybe we want to use rabbit_framing:amqp_method_name() instead? +-type(intercept_method() :: 'basic_consume' | + 'basic_get' | + 'queue_declare' | + 'queue_bind' | + 'queue_delete'). + +-type(initial_queue_name() :: rabbit_amqqueue:name()). +-type(processed_queue_name() :: rabbit_amqqueue:name()). + +-callback description() -> [proplists:property()]. + +%% TODO: maybe we want to also pass a second argument that's the amqp.method +%% intercepted like 'basic.consume', 'queue.decalre' and so on. +%% The interceptor might wish to modify the processed_queue_name() based on +%% what was the initial_queue_name(). +-callback process_queue_name(initial_queue_name(), processed_queue_name()) -> + rabbit_types:ok_or_error2(rabbit_amqqueue:name(), any()). + +%% Whether the interceptor wishes to intercept the amqp method +-callback applies_to(intercept_method()) -> boolean(). + +-callback priority() -> non_neg_integer(). + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{description, 0}, {process_queue_name, 2}, {applies_to, 1}, {priority, 0}]; +behaviour_info(_Other) -> + undefined. + +-endif. + +%%---------------------------------------------------------------------------- + +%% select the interceptors that apply to intercept_method(). +select(Method) -> + lists:sort(fun (A, B) -> A:priority() > B:priority() end, + [I || I <- filter(list()), I:applies_to(Method)]). + +run_filter_chain(QName, Interceptors) -> + run_filter_chain(QName, QName, Interceptors). + +run_filter_chain(#resource{virtual_host=VHost}, #resource{virtual_host=VHost} = NewQueName, []) -> + {ok, NewQueName}; +run_filter_chain(#resource{virtual_host=VHost} = QName, + #resource{virtual_host=VHost} = NewQueName, [I|T]) -> + case I:process_queue_name(QName, NewQueName) of + {ok, QName2} -> + run_filter_chain(QName, QName2, T); + {error, Reason} -> + {error, Reason} + end; +run_filter_chain(#resource{virtual_host=_VHost}, + #resource{virtual_host=_Other}, _Interceptors) -> + %% TODO pass along the previous interceptor name so we can log it. + {error, "Interceptor attempted to modify resource virtual host"}. + +filter(Modules) -> + [M || M <- Modules, code:which(M) =/= non_existing]. + +list() -> [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)].
\ No newline at end of file diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 3014aeb734..abb71e7aed 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -126,13 +126,14 @@ sanity_check_module(ClassModule, Module) -> true -> ok end. -class_module(exchange) -> rabbit_exchange_type; -class_module(auth_mechanism) -> rabbit_auth_mechanism; -class_module(runtime_parameter) -> rabbit_runtime_parameter; -class_module(exchange_decorator) -> rabbit_exchange_decorator; -class_module(queue_decorator) -> rabbit_queue_decorator; -class_module(policy_validator) -> rabbit_policy_validator; -class_module(ha_mode) -> rabbit_mirror_queue_mode. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism; +class_module(runtime_parameter) -> rabbit_runtime_parameter; +class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(queue_decorator) -> rabbit_queue_decorator; +class_module(policy_validator) -> rabbit_policy_validator; +class_module(ha_mode) -> rabbit_mirror_queue_mode; +class_module(channel_interceptor) -> rabbit_channel_interceptor. %%--------------------------------------------------------------------------- |
