summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl24
-rw-r--r--src/rabbit_channel_interceptor.erl43
2 files changed, 48 insertions, 19 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9be310f8f0..e6b68c2488 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -740,17 +740,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
- OrigQueueName = expand_queue_name_shortcut(QueueNameBin, State),
- QueueName =
- case rabbit_channel_interceptor:run_filter_chain(OrigQueueName,
- rabbit_channel_interceptor:select('basic_consume')) of
- {ok, QN} ->
- QN;
- {error, Reason} ->
- rabbit_misc:protocol_error(
- internal_error, "~s",
- [Reason])
- end,
+ OrigQN = expand_queue_name_shortcut(QueueNameBin, State),
+ QueueName = intercept_method('basic_consume', OrigQN),
check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
@@ -1672,3 +1663,14 @@ erase_queue_stats(QName) ->
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
QName0 =:= QName].
+
+intercept_method(M, Q) ->
+ case rabbit_channel_interceptor:run_filter_chain(Q,
+ rabbit_channel_interceptor:select(Q, M)) of
+ {ok, QN} ->
+ QN;
+ {error, Reason} ->
+ rabbit_misc:protocol_error(
+ internal_error, "~s",
+ [Reason])
+ end. \ No newline at end of file
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
index b74e679f48..a1da7bafda 100644
--- a/src/rabbit_channel_interceptor.erl
+++ b/src/rabbit_channel_interceptor.erl
@@ -21,7 +21,9 @@
-include("rabbit.hrl").
--export([select/1, run_filter_chain/2]).
+-export([select/2, run_filter_chain/2]).
+
+-define(DEFAULT_PRIORITY, 0).
%% TODO: docs
@@ -49,14 +51,15 @@
%% Whether the interceptor wishes to intercept the amqp method
-callback applies_to(intercept_method()) -> boolean().
--callback priority() -> non_neg_integer().
+-callback priority_param() -> binary().
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [{description, 0}, {process_queue_name, 2}, {applies_to, 1}, {priority, 0}];
+ [{description, 0}, {process_queue_name, 2}, {applies_to, 1},
+ {priority_param, 0}];
behaviour_info(_Other) ->
undefined.
@@ -65,10 +68,16 @@ behaviour_info(_Other) ->
%%----------------------------------------------------------------------------
%% select the interceptors that apply to intercept_method().
-select(Method) ->
- lists:sort(fun (A, B) -> A:priority() > B:priority() end,
- [I || I <- filter(list()), I:applies_to(Method)]).
-
+select(#resource{virtual_host=VHost}, Method) ->
+ lists:sort(fun (A, B) ->
+ get_priority(A, Method, VHost) > get_priority(B, Method, VHost)
+ end, [I || I <- filter(list()), I:applies_to(Method)]).
+
+%% We have a chain of filters because one interceptor might want to modify the queue name,
+%% while another might want just to stop the filter chain and prevent the user from
+%% declaring certain queue names, or deleteign certain queue names.
+%% By providing priorities to each interceptor, then the user can decide the order in which
+%% the filters are applied.
run_filter_chain(QName, Interceptors) ->
run_filter_chain(QName, QName, Interceptors).
@@ -90,4 +99,22 @@ run_filter_chain(#resource{virtual_host=_VHost},
filter(Modules) ->
[M || M <- Modules, code:which(M) =/= non_existing].
-list() -> [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)]. \ No newline at end of file
+list() -> [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)].
+
+%% Every implementation of rabbit_channel_interceptor should also expect a
+%% runtime parameter for each intercept_method().
+%% The rabbit_channel_interceptor needs to return the Component name to find
+%% those runtime parameters. The parameters will be the priorities on which
+%% the interceptors will apply to the specified intercept_method(), this the
+%% user can decide how to compose interceptors provided by plugins.
+get_priority(I, Method, VHost) ->
+ Component = I:priority_param(),
+ Name = a2b(Method),
+ case rabbit_runtime_parameters:value(
+ VHost, Component, Name) of
+ not_found -> ?DEFAULT_PRIORITY;
+ Value -> Value
+ end.
+
+%% since this code is proliferating everywhere, we might want to add it to rabbit_misc
+a2b(A) -> list_to_binary(atom_to_list(A)). \ No newline at end of file