summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-07-20 16:39:56 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-07-20 16:39:56 +0200
commit556a81d5227bce8fcd1488afda6ada124f3fbd0d (patch)
treefaf914e346d9e81de5685f50dcf4f11af6de0cf0
parentcfc84df229bc014d5ebfdb9ce033d57a5d2b2dfa (diff)
parentc426b5f85a22f36611088cab388f8987d41b1fd8 (diff)
downloadrabbitmq-server-git-556a81d5227bce8fcd1488afda6ada124f3fbd0d.tar.gz
Merge pull request #214 from lshift/interceptors
change rabbit_channel_interceptor API
-rw-r--r--src/rabbit_channel.erl38
-rw-r--r--src/rabbit_channel_interceptor.erl114
2 files changed, 96 insertions, 56 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3a3c2aa6d3..1e3d516097 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -64,6 +64,7 @@
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
%% Internal
-export([list_local/0, deliver_reply_local/3]).
+-export([get_vhost/1, get_user/1]).
-record(ch, {
%% starting | running | flow | closing
@@ -140,7 +141,8 @@
%% used by "one shot RPC" (amq.
reply_consumer,
%% flow | noflow, see rabbitmq-server#114
- delivery_flow
+ delivery_flow,
+ interceptor_state
}).
@@ -183,6 +185,10 @@
-type(channel_number() :: non_neg_integer()).
+-export_type([channel/0]).
+
+-type(channel() :: #ch{}).
+
-spec(start_link/11 ::
(channel_number(), pid(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
@@ -370,12 +376,15 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
trace_state = rabbit_trace:init(VHost),
consumer_prefetch = 0,
reply_consumer = none,
- delivery_flow = Flow},
- State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
- rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
- rabbit_event:if_enabled(State1, #ch.stats_timer,
- fun() -> emit_stats(State1) end),
- {ok, State1, hibernate,
+ delivery_flow = Flow,
+ interceptor_state = undefined},
+ State1 = State#ch{
+ interceptor_state = rabbit_channel_interceptor:init(State)},
+ State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
+ rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State2)),
+ rabbit_event:if_enabled(State2, #ch.stats_timer,
+ fun() -> emit_stats(State2) end),
+ {ok, State2, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_call(Msg, _From, _Len, _State) ->
@@ -425,14 +434,15 @@ handle_call(_Request, _From, State) ->
handle_cast({method, Method, Content, Flow},
State = #ch{reader_pid = Reader,
- virtual_host = VHost}) ->
+ interceptor_state = IState}) ->
case Flow of
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
- try handle_method(rabbit_channel_interceptor:intercept_method(
- expand_shortcuts(Method, State), VHost),
- Content, State) of
+ Method1 = expand_shortcuts(Method, State),
+ {Method2, Content1} = rabbit_channel_interceptor:intercept_in(
+ Method1, Content, IState),
+ try handle_method(Method2, Content1, State) of
{reply, Reply, NewState} ->
ok = send(Reply, NewState),
noreply(NewState);
@@ -442,7 +452,7 @@ handle_cast({method, Method, Content, Flow},
{stop, normal, State}
catch
exit:Reason = #amqp_error{} ->
- MethodName = rabbit_misc:method_record_type(Method),
+ MethodName = rabbit_misc:method_record_type(Method2),
handle_exception(Reason#amqp_error{method = MethodName}, State);
_:Reason ->
{stop, {Reason, erlang:get_stacktrace()}, State}
@@ -1979,3 +1989,7 @@ erase_queue_stats(QName) ->
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
QName0 =:= QName].
+
+get_vhost(#ch{virtual_host = VHost}) -> VHost.
+
+get_user(#ch{user = User}) -> User.
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
index 25c5df8a7b..6a925b69fe 100644
--- a/src/rabbit_channel_interceptor.erl
+++ b/src/rabbit_channel_interceptor.erl
@@ -14,77 +14,103 @@
%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
%%
-%% Since the AMQP methods used here are queue related,
-%% maybe we want this to be a queue_interceptor.
-
-module(rabbit_channel_interceptor).
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([intercept_method/2]).
+-export([init/1, intercept_in/3]).
-ifdef(use_specs).
--type(intercept_method() :: rabbit_framing:amqp_method_name()).
+-type(method_name() :: rabbit_framing:amqp_method_name()).
-type(original_method() :: rabbit_framing:amqp_method_record()).
-type(processed_method() :: rabbit_framing:amqp_method_record()).
+-type(original_content() :: rabbit_types:maybe(rabbit_types:content())).
+-type(processed_content() :: rabbit_types:maybe(rabbit_types:content())).
+-type(interceptor_state() :: term()).
-callback description() -> [proplists:property()].
-
--callback intercept(original_method(), rabbit_types:vhost()) ->
- processed_method() | rabbit_misc:channel_or_connection_exit().
-
-%% Whether the interceptor wishes to intercept the amqp method
--callback applies_to(intercept_method()) -> boolean().
+%% Derive some initial state from the channel. This will be passed back
+%% as the third argument of intercept/3.
+-callback init(rabbit_channel:channel()) -> interceptor_state().
+-callback intercept(original_method(), original_content(),
+ interceptor_state()) ->
+ {processed_method(), processed_content()} |
+ rabbit_misc:channel_or_connection_exit().
+-callback applies_to() -> list(method_name()).
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [{description, 0}, {intercept, 2}, {applies_to, 1}];
+ [{description, 0}, {init, 1}, {intercept, 3}, {applies_to, 0}];
behaviour_info(_Other) ->
undefined.
-endif.
-%%----------------------------------------------------------------------------
-
-intercept_method(#'basic.publish'{} = M, _VHost) -> M;
-intercept_method(#'basic.ack'{} = M, _VHost) -> M;
-intercept_method(#'basic.nack'{} = M, _VHost) -> M;
-intercept_method(#'basic.reject'{} = M, _VHost) -> M;
-intercept_method(#'basic.credit'{} = M, _VHost) -> M;
-intercept_method(M, VHost) ->
- intercept_method(M, VHost, select(rabbit_misc:method_record_type(M))).
-
-intercept_method(M, _VHost, []) ->
- M;
-intercept_method(M, VHost, [I]) ->
- M2 = I:intercept(M, VHost),
- case validate_method(M, M2) of
- true ->
- M2;
- _ ->
- internal_error("Interceptor: ~p expected "
- "to return method: ~p but returned: ~p",
- [I, rabbit_misc:method_record_type(M),
- rabbit_misc:method_record_type(M2)])
- end;
-intercept_method(M, _VHost, Is) ->
- internal_error("More than one interceptor for method: ~p -- ~p",
- [rabbit_misc:method_record_type(M), Is]).
-
-%% select the interceptors that apply to intercept_method().
-select(Method) ->
- [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor),
- code:which(M) =/= non_existing,
- M:applies_to(Method)].
+init(Ch) ->
+ Mods = [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)],
+ check_no_overlap(Mods),
+ [{Mod, Mod:init(Ch)} || Mod <- Mods].
+
+check_no_overlap(Mods) ->
+ check_no_overlap1([sets:from_list(Mod:applies_to()) || Mod <- Mods]).
+
+%% Check no non-empty pairwise intersection in a list of sets
+check_no_overlap1(Sets) ->
+ lists:foldl(fun(Set, Union) ->
+ Is = sets:intersection(Set, Union),
+ case sets:size(Is) of
+ 0 -> ok;
+ _ ->
+ internal_error("Interceptor: more than one "
+ "module handles ~p~n", [Is])
+ end,
+ sets:union(Set, Union)
+ end,
+ sets:new(),
+ Sets),
+ ok.
+
+intercept_in(M, C, Mods) ->
+ lists:foldl(fun({Mod, ModState}, {M1, C1}) ->
+ call_module(Mod, ModState, M1, C1)
+ end,
+ {M, C},
+ Mods).
+
+call_module(Mod, St, M, C) ->
+ % this little dance is because Mod might be unloaded at any point
+ case (catch {ok, Mod:intercept(M, C, St)}) of
+ {ok, R} -> validate_response(Mod, M, C, R);
+ {'EXIT', {undef, [{Mod, intercept, _, _} | _]}} -> {M, C}
+ end.
+
+validate_response(Mod, M1, C1, R = {M2, C2}) ->
+ case {validate_method(M1, M2), validate_content(C1, C2)} of
+ {true, true} -> R;
+ {false, _} ->
+ internal_error("Interceptor: ~p expected to return "
+ "method: ~p but returned: ~p",
+ [Mod, rabbit_misc:method_record_type(M1),
+ rabbit_misc:method_record_type(M2)]);
+ {_, false} ->
+ internal_error("Interceptor: ~p expected to return "
+ "content iff content is provided but "
+ "content in = ~p; content out = ~p",
+ [Mod, C1, C2])
+ end.
validate_method(M, M2) ->
rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2).
+validate_content(none, none) -> true;
+validate_content(#content{}, #content{}) -> true;
+validate_content(_, _) -> false.
+
%% keep dialyzer happy
-spec internal_error(string(), [any()]) -> no_return().
internal_error(Format, Args) ->