summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2013-10-13 18:23:22 -0500
committerAlvaro Videla <videlalvaro@gmail.com>2013-10-13 18:23:22 -0500
commit85c61821c0ffe4a542029dc06a5268c54bbc0bde (patch)
treee6b361f8972470e62767d5ba59ddf09344cc5199
parentc3c3bcf197529825975a62bcd107e89351a22e5a (diff)
downloadrabbitmq-server-git-85c61821c0ffe4a542029dc06a5268c54bbc0bde.tar.gz
adds channel interceptor
-rw-r--r--src/rabbit_channel.erl12
-rw-r--r--src/rabbit_channel_interceptor.erl93
-rw-r--r--src/rabbit_registry.erl15
3 files changed, 112 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dc37959bc8..9be310f8f0 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -740,7 +740,17 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ OrigQueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ QueueName =
+ case rabbit_channel_interceptor:run_filter_chain(OrigQueueName,
+ rabbit_channel_interceptor:select('basic_consume')) of
+ {ok, QN} ->
+ QN;
+ {error, Reason} ->
+ rabbit_misc:protocol_error(
+ internal_error, "~s",
+ [Reason])
+ end,
check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
new file mode 100644
index 0000000000..b74e679f48
--- /dev/null
+++ b/src/rabbit_channel_interceptor.erl
@@ -0,0 +1,93 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+%% Since the AMQP methods used here are queue related,
+%% maybe we want this to be a queue_interceptor.
+
+-module(rabbit_channel_interceptor).
+
+-include("rabbit.hrl").
+
+-export([select/1, run_filter_chain/2]).
+
+%% TODO: docs
+
+-ifdef(use_specs).
+
+%% TODO: maybe we want to use rabbit_framing:amqp_method_name() instead?
+-type(intercept_method() :: 'basic_consume' |
+ 'basic_get' |
+ 'queue_declare' |
+ 'queue_bind' |
+ 'queue_delete').
+
+-type(initial_queue_name() :: rabbit_amqqueue:name()).
+-type(processed_queue_name() :: rabbit_amqqueue:name()).
+
+-callback description() -> [proplists:property()].
+
+%% TODO: maybe we want to also pass a second argument that's the amqp.method
+%% intercepted like 'basic.consume', 'queue.decalre' and so on.
+%% The interceptor might wish to modify the processed_queue_name() based on
+%% what was the initial_queue_name().
+-callback process_queue_name(initial_queue_name(), processed_queue_name()) ->
+ rabbit_types:ok_or_error2(rabbit_amqqueue:name(), any()).
+
+%% Whether the interceptor wishes to intercept the amqp method
+-callback applies_to(intercept_method()) -> boolean().
+
+-callback priority() -> non_neg_integer().
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{description, 0}, {process_queue_name, 2}, {applies_to, 1}, {priority, 0}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%% select the interceptors that apply to intercept_method().
+select(Method) ->
+ lists:sort(fun (A, B) -> A:priority() > B:priority() end,
+ [I || I <- filter(list()), I:applies_to(Method)]).
+
+run_filter_chain(QName, Interceptors) ->
+ run_filter_chain(QName, QName, Interceptors).
+
+run_filter_chain(#resource{virtual_host=VHost}, #resource{virtual_host=VHost} = NewQueName, []) ->
+ {ok, NewQueName};
+run_filter_chain(#resource{virtual_host=VHost} = QName,
+ #resource{virtual_host=VHost} = NewQueName, [I|T]) ->
+ case I:process_queue_name(QName, NewQueName) of
+ {ok, QName2} ->
+ run_filter_chain(QName, QName2, T);
+ {error, Reason} ->
+ {error, Reason}
+ end;
+run_filter_chain(#resource{virtual_host=_VHost},
+ #resource{virtual_host=_Other}, _Interceptors) ->
+ %% TODO pass along the previous interceptor name so we can log it.
+ {error, "Interceptor attempted to modify resource virtual host"}.
+
+filter(Modules) ->
+ [M || M <- Modules, code:which(M) =/= non_existing].
+
+list() -> [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)]. \ No newline at end of file
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 3014aeb734..abb71e7aed 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -126,13 +126,14 @@ sanity_check_module(ClassModule, Module) ->
true -> ok
end.
-class_module(exchange) -> rabbit_exchange_type;
-class_module(auth_mechanism) -> rabbit_auth_mechanism;
-class_module(runtime_parameter) -> rabbit_runtime_parameter;
-class_module(exchange_decorator) -> rabbit_exchange_decorator;
-class_module(queue_decorator) -> rabbit_queue_decorator;
-class_module(policy_validator) -> rabbit_policy_validator;
-class_module(ha_mode) -> rabbit_mirror_queue_mode.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism;
+class_module(runtime_parameter) -> rabbit_runtime_parameter;
+class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(queue_decorator) -> rabbit_queue_decorator;
+class_module(policy_validator) -> rabbit_policy_validator;
+class_module(ha_mode) -> rabbit_mirror_queue_mode;
+class_module(channel_interceptor) -> rabbit_channel_interceptor.
%%---------------------------------------------------------------------------