diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_channel_interceptor.erl | 43 |
2 files changed, 48 insertions, 19 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9be310f8f0..e6b68c2488 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -740,17 +740,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> - OrigQueueName = expand_queue_name_shortcut(QueueNameBin, State), - QueueName = - case rabbit_channel_interceptor:run_filter_chain(OrigQueueName, - rabbit_channel_interceptor:select('basic_consume')) of - {ok, QN} -> - QN; - {error, Reason} -> - rabbit_misc:protocol_error( - internal_error, "~s", - [Reason]) - end, + OrigQN = expand_queue_name_shortcut(QueueNameBin, State), + QueueName = intercept_method('basic_consume', OrigQN), check_read_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of @@ -1672,3 +1663,14 @@ erase_queue_stats(QName) -> [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(), QName0 =:= QName]. + +intercept_method(M, Q) -> + case rabbit_channel_interceptor:run_filter_chain(Q, + rabbit_channel_interceptor:select(Q, M)) of + {ok, QN} -> + QN; + {error, Reason} -> + rabbit_misc:protocol_error( + internal_error, "~s", + [Reason]) + end.
\ No newline at end of file diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl index b74e679f48..a1da7bafda 100644 --- a/src/rabbit_channel_interceptor.erl +++ b/src/rabbit_channel_interceptor.erl @@ -21,7 +21,9 @@ -include("rabbit.hrl"). --export([select/1, run_filter_chain/2]). +-export([select/2, run_filter_chain/2]). + +-define(DEFAULT_PRIORITY, 0). %% TODO: docs @@ -49,14 +51,15 @@ %% Whether the interceptor wishes to intercept the amqp method -callback applies_to(intercept_method()) -> boolean(). --callback priority() -> non_neg_integer(). +-callback priority_param() -> binary(). -else. -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{description, 0}, {process_queue_name, 2}, {applies_to, 1}, {priority, 0}]; + [{description, 0}, {process_queue_name, 2}, {applies_to, 1}, + {priority_param, 0}]; behaviour_info(_Other) -> undefined. @@ -65,10 +68,16 @@ behaviour_info(_Other) -> %%---------------------------------------------------------------------------- %% select the interceptors that apply to intercept_method(). -select(Method) -> - lists:sort(fun (A, B) -> A:priority() > B:priority() end, - [I || I <- filter(list()), I:applies_to(Method)]). - +select(#resource{virtual_host=VHost}, Method) -> + lists:sort(fun (A, B) -> + get_priority(A, Method, VHost) > get_priority(B, Method, VHost) + end, [I || I <- filter(list()), I:applies_to(Method)]). + +%% We have a chain of filters because one interceptor might want to modify the queue name, +%% while another might want just to stop the filter chain and prevent the user from +%% declaring certain queue names, or deleteign certain queue names. +%% By providing priorities to each interceptor, then the user can decide the order in which +%% the filters are applied. run_filter_chain(QName, Interceptors) -> run_filter_chain(QName, QName, Interceptors). @@ -90,4 +99,22 @@ run_filter_chain(#resource{virtual_host=_VHost}, filter(Modules) -> [M || M <- Modules, code:which(M) =/= non_existing]. -list() -> [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)].
\ No newline at end of file +list() -> [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)]. + +%% Every implementation of rabbit_channel_interceptor should also expect a +%% runtime parameter for each intercept_method(). +%% The rabbit_channel_interceptor needs to return the Component name to find +%% those runtime parameters. The parameters will be the priorities on which +%% the interceptors will apply to the specified intercept_method(), this the +%% user can decide how to compose interceptors provided by plugins. +get_priority(I, Method, VHost) -> + Component = I:priority_param(), + Name = a2b(Method), + case rabbit_runtime_parameters:value( + VHost, Component, Name) of + not_found -> ?DEFAULT_PRIORITY; + Value -> Value + end. + +%% since this code is proliferating everywhere, we might want to add it to rabbit_misc +a2b(A) -> list_to_binary(atom_to_list(A)).
\ No newline at end of file |
