summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorash-lshift <ash@lshift.net>2015-07-20 11:50:41 +0100
committerash <ash@lshift.net>2015-07-20 15:36:06 +0100
commitc426b5f85a22f36611088cab388f8987d41b1fd8 (patch)
treefaf914e346d9e81de5685f50dcf4f11af6de0cf0 /src
parentcfc84df229bc014d5ebfdb9ce033d57a5d2b2dfa (diff)
downloadrabbitmq-server-git-c426b5f85a22f36611088cab388f8987d41b1fd8.tar.gz
change rabbit_channel_interceptor API...
...to allow: - all methods to be intercepted - content to be intercepted as well as the method Previously only a subset of methods were allowed due to performance concerns. Here we move work previously done at method handling time to channel initialization time. This seems to reduce the performance impact to an acceptable level.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl38
-rw-r--r--src/rabbit_channel_interceptor.erl114
2 files changed, 96 insertions, 56 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3a3c2aa6d3..1e3d516097 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -64,6 +64,7 @@
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
%% Internal
-export([list_local/0, deliver_reply_local/3]).
+-export([get_vhost/1, get_user/1]).
-record(ch, {
%% starting | running | flow | closing
@@ -140,7 +141,8 @@
%% used by "one shot RPC" (amq.
reply_consumer,
%% flow | noflow, see rabbitmq-server#114
- delivery_flow
+ delivery_flow,
+ interceptor_state
}).
@@ -183,6 +185,10 @@
-type(channel_number() :: non_neg_integer()).
+-export_type([channel/0]).
+
+-type(channel() :: #ch{}).
+
-spec(start_link/11 ::
(channel_number(), pid(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
@@ -370,12 +376,15 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
trace_state = rabbit_trace:init(VHost),
consumer_prefetch = 0,
reply_consumer = none,
- delivery_flow = Flow},
- State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
- rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
- rabbit_event:if_enabled(State1, #ch.stats_timer,
- fun() -> emit_stats(State1) end),
- {ok, State1, hibernate,
+ delivery_flow = Flow,
+ interceptor_state = undefined},
+ State1 = State#ch{
+ interceptor_state = rabbit_channel_interceptor:init(State)},
+ State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
+ rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State2)),
+ rabbit_event:if_enabled(State2, #ch.stats_timer,
+ fun() -> emit_stats(State2) end),
+ {ok, State2, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_call(Msg, _From, _Len, _State) ->
@@ -425,14 +434,15 @@ handle_call(_Request, _From, State) ->
handle_cast({method, Method, Content, Flow},
State = #ch{reader_pid = Reader,
- virtual_host = VHost}) ->
+ interceptor_state = IState}) ->
case Flow of
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
- try handle_method(rabbit_channel_interceptor:intercept_method(
- expand_shortcuts(Method, State), VHost),
- Content, State) of
+ Method1 = expand_shortcuts(Method, State),
+ {Method2, Content1} = rabbit_channel_interceptor:intercept_in(
+ Method1, Content, IState),
+ try handle_method(Method2, Content1, State) of
{reply, Reply, NewState} ->
ok = send(Reply, NewState),
noreply(NewState);
@@ -442,7 +452,7 @@ handle_cast({method, Method, Content, Flow},
{stop, normal, State}
catch
exit:Reason = #amqp_error{} ->
- MethodName = rabbit_misc:method_record_type(Method),
+ MethodName = rabbit_misc:method_record_type(Method2),
handle_exception(Reason#amqp_error{method = MethodName}, State);
_:Reason ->
{stop, {Reason, erlang:get_stacktrace()}, State}
@@ -1979,3 +1989,7 @@ erase_queue_stats(QName) ->
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
QName0 =:= QName].
+
+get_vhost(#ch{virtual_host = VHost}) -> VHost.
+
+get_user(#ch{user = User}) -> User.
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
index 25c5df8a7b..6a925b69fe 100644
--- a/src/rabbit_channel_interceptor.erl
+++ b/src/rabbit_channel_interceptor.erl
@@ -14,77 +14,103 @@
%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
%%
-%% Since the AMQP methods used here are queue related,
-%% maybe we want this to be a queue_interceptor.
-
-module(rabbit_channel_interceptor).
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([intercept_method/2]).
+-export([init/1, intercept_in/3]).
-ifdef(use_specs).
--type(intercept_method() :: rabbit_framing:amqp_method_name()).
+-type(method_name() :: rabbit_framing:amqp_method_name()).
-type(original_method() :: rabbit_framing:amqp_method_record()).
-type(processed_method() :: rabbit_framing:amqp_method_record()).
+-type(original_content() :: rabbit_types:maybe(rabbit_types:content())).
+-type(processed_content() :: rabbit_types:maybe(rabbit_types:content())).
+-type(interceptor_state() :: term()).
-callback description() -> [proplists:property()].
-
--callback intercept(original_method(), rabbit_types:vhost()) ->
- processed_method() | rabbit_misc:channel_or_connection_exit().
-
-%% Whether the interceptor wishes to intercept the amqp method
--callback applies_to(intercept_method()) -> boolean().
+%% Derive some initial state from the channel. This will be passed back
+%% as the third argument of intercept/3.
+-callback init(rabbit_channel:channel()) -> interceptor_state().
+-callback intercept(original_method(), original_content(),
+ interceptor_state()) ->
+ {processed_method(), processed_content()} |
+ rabbit_misc:channel_or_connection_exit().
+-callback applies_to() -> list(method_name()).
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [{description, 0}, {intercept, 2}, {applies_to, 1}];
+ [{description, 0}, {init, 1}, {intercept, 3}, {applies_to, 0}];
behaviour_info(_Other) ->
undefined.
-endif.
-%%----------------------------------------------------------------------------
-
-intercept_method(#'basic.publish'{} = M, _VHost) -> M;
-intercept_method(#'basic.ack'{} = M, _VHost) -> M;
-intercept_method(#'basic.nack'{} = M, _VHost) -> M;
-intercept_method(#'basic.reject'{} = M, _VHost) -> M;
-intercept_method(#'basic.credit'{} = M, _VHost) -> M;
-intercept_method(M, VHost) ->
- intercept_method(M, VHost, select(rabbit_misc:method_record_type(M))).
-
-intercept_method(M, _VHost, []) ->
- M;
-intercept_method(M, VHost, [I]) ->
- M2 = I:intercept(M, VHost),
- case validate_method(M, M2) of
- true ->
- M2;
- _ ->
- internal_error("Interceptor: ~p expected "
- "to return method: ~p but returned: ~p",
- [I, rabbit_misc:method_record_type(M),
- rabbit_misc:method_record_type(M2)])
- end;
-intercept_method(M, _VHost, Is) ->
- internal_error("More than one interceptor for method: ~p -- ~p",
- [rabbit_misc:method_record_type(M), Is]).
-
-%% select the interceptors that apply to intercept_method().
-select(Method) ->
- [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor),
- code:which(M) =/= non_existing,
- M:applies_to(Method)].
+init(Ch) ->
+ Mods = [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)],
+ check_no_overlap(Mods),
+ [{Mod, Mod:init(Ch)} || Mod <- Mods].
+
+check_no_overlap(Mods) ->
+ check_no_overlap1([sets:from_list(Mod:applies_to()) || Mod <- Mods]).
+
+%% Check no non-empty pairwise intersection in a list of sets
+check_no_overlap1(Sets) ->
+ lists:foldl(fun(Set, Union) ->
+ Is = sets:intersection(Set, Union),
+ case sets:size(Is) of
+ 0 -> ok;
+ _ ->
+ internal_error("Interceptor: more than one "
+ "module handles ~p~n", [Is])
+ end,
+ sets:union(Set, Union)
+ end,
+ sets:new(),
+ Sets),
+ ok.
+
+intercept_in(M, C, Mods) ->
+ lists:foldl(fun({Mod, ModState}, {M1, C1}) ->
+ call_module(Mod, ModState, M1, C1)
+ end,
+ {M, C},
+ Mods).
+
+call_module(Mod, St, M, C) ->
+ % this little dance is because Mod might be unloaded at any point
+ case (catch {ok, Mod:intercept(M, C, St)}) of
+ {ok, R} -> validate_response(Mod, M, C, R);
+ {'EXIT', {undef, [{Mod, intercept, _, _} | _]}} -> {M, C}
+ end.
+
+validate_response(Mod, M1, C1, R = {M2, C2}) ->
+ case {validate_method(M1, M2), validate_content(C1, C2)} of
+ {true, true} -> R;
+ {false, _} ->
+ internal_error("Interceptor: ~p expected to return "
+ "method: ~p but returned: ~p",
+ [Mod, rabbit_misc:method_record_type(M1),
+ rabbit_misc:method_record_type(M2)]);
+ {_, false} ->
+ internal_error("Interceptor: ~p expected to return "
+ "content iff content is provided but "
+ "content in = ~p; content out = ~p",
+ [Mod, C1, C2])
+ end.
validate_method(M, M2) ->
rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2).
+validate_content(none, none) -> true;
+validate_content(#content{}, #content{}) -> true;
+validate_content(_, _) -> false.
+
%% keep dialyzer happy
-spec internal_error(string(), [any()]) -> no_return().
internal_error(Format, Args) ->