diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_channel_interceptor.erl | 114 |
2 files changed, 96 insertions, 56 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3a3c2aa6d3..1e3d516097 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -64,6 +64,7 @@ prioritise_cast/3, prioritise_info/3, format_message_queue/2]). %% Internal -export([list_local/0, deliver_reply_local/3]). +-export([get_vhost/1, get_user/1]). -record(ch, { %% starting | running | flow | closing @@ -140,7 +141,8 @@ %% used by "one shot RPC" (amq. reply_consumer, %% flow | noflow, see rabbitmq-server#114 - delivery_flow + delivery_flow, + interceptor_state }). @@ -183,6 +185,10 @@ -type(channel_number() :: non_neg_integer()). +-export_type([channel/0]). + +-type(channel() :: #ch{}). + -spec(start_link/11 :: (channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), @@ -370,12 +376,15 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, trace_state = rabbit_trace:init(VHost), consumer_prefetch = 0, reply_consumer = none, - delivery_flow = Flow}, - State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), - rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), - rabbit_event:if_enabled(State1, #ch.stats_timer, - fun() -> emit_stats(State1) end), - {ok, State1, hibernate, + delivery_flow = Flow, + interceptor_state = undefined}, + State1 = State#ch{ + interceptor_state = rabbit_channel_interceptor:init(State)}, + State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer), + rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State2)), + rabbit_event:if_enabled(State2, #ch.stats_timer, + fun() -> emit_stats(State2) end), + {ok, State2, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_call(Msg, _From, _Len, _State) -> @@ -425,14 +434,15 @@ handle_call(_Request, _From, State) -> handle_cast({method, Method, Content, Flow}, State = #ch{reader_pid = Reader, - virtual_host = VHost}) -> + interceptor_state = IState}) -> case Flow of flow -> credit_flow:ack(Reader); noflow -> ok end, - try handle_method(rabbit_channel_interceptor:intercept_method( - expand_shortcuts(Method, State), VHost), - Content, State) of + Method1 = expand_shortcuts(Method, State), + {Method2, Content1} = rabbit_channel_interceptor:intercept_in( + Method1, Content, IState), + try handle_method(Method2, Content1, State) of {reply, Reply, NewState} -> ok = send(Reply, NewState), noreply(NewState); @@ -442,7 +452,7 @@ handle_cast({method, Method, Content, Flow}, {stop, normal, State} catch exit:Reason = #amqp_error{} -> - MethodName = rabbit_misc:method_record_type(Method), + MethodName = rabbit_misc:method_record_type(Method2), handle_exception(Reason#amqp_error{method = MethodName}, State); _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} @@ -1979,3 +1989,7 @@ erase_queue_stats(QName) -> [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(), QName0 =:= QName]. + +get_vhost(#ch{virtual_host = VHost}) -> VHost. + +get_user(#ch{user = User}) -> User. diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl index 25c5df8a7b..6a925b69fe 100644 --- a/src/rabbit_channel_interceptor.erl +++ b/src/rabbit_channel_interceptor.erl @@ -14,77 +14,103 @@ %% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. %% -%% Since the AMQP methods used here are queue related, -%% maybe we want this to be a queue_interceptor. - -module(rabbit_channel_interceptor). -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([intercept_method/2]). +-export([init/1, intercept_in/3]). -ifdef(use_specs). --type(intercept_method() :: rabbit_framing:amqp_method_name()). +-type(method_name() :: rabbit_framing:amqp_method_name()). -type(original_method() :: rabbit_framing:amqp_method_record()). -type(processed_method() :: rabbit_framing:amqp_method_record()). +-type(original_content() :: rabbit_types:maybe(rabbit_types:content())). +-type(processed_content() :: rabbit_types:maybe(rabbit_types:content())). +-type(interceptor_state() :: term()). -callback description() -> [proplists:property()]. - --callback intercept(original_method(), rabbit_types:vhost()) -> - processed_method() | rabbit_misc:channel_or_connection_exit(). - -%% Whether the interceptor wishes to intercept the amqp method --callback applies_to(intercept_method()) -> boolean(). +%% Derive some initial state from the channel. This will be passed back +%% as the third argument of intercept/3. +-callback init(rabbit_channel:channel()) -> interceptor_state(). +-callback intercept(original_method(), original_content(), + interceptor_state()) -> + {processed_method(), processed_content()} | + rabbit_misc:channel_or_connection_exit(). +-callback applies_to() -> list(method_name()). -else. -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{description, 0}, {intercept, 2}, {applies_to, 1}]; + [{description, 0}, {init, 1}, {intercept, 3}, {applies_to, 0}]; behaviour_info(_Other) -> undefined. -endif. -%%---------------------------------------------------------------------------- - -intercept_method(#'basic.publish'{} = M, _VHost) -> M; -intercept_method(#'basic.ack'{} = M, _VHost) -> M; -intercept_method(#'basic.nack'{} = M, _VHost) -> M; -intercept_method(#'basic.reject'{} = M, _VHost) -> M; -intercept_method(#'basic.credit'{} = M, _VHost) -> M; -intercept_method(M, VHost) -> - intercept_method(M, VHost, select(rabbit_misc:method_record_type(M))). - -intercept_method(M, _VHost, []) -> - M; -intercept_method(M, VHost, [I]) -> - M2 = I:intercept(M, VHost), - case validate_method(M, M2) of - true -> - M2; - _ -> - internal_error("Interceptor: ~p expected " - "to return method: ~p but returned: ~p", - [I, rabbit_misc:method_record_type(M), - rabbit_misc:method_record_type(M2)]) - end; -intercept_method(M, _VHost, Is) -> - internal_error("More than one interceptor for method: ~p -- ~p", - [rabbit_misc:method_record_type(M), Is]). - -%% select the interceptors that apply to intercept_method(). -select(Method) -> - [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor), - code:which(M) =/= non_existing, - M:applies_to(Method)]. +init(Ch) -> + Mods = [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)], + check_no_overlap(Mods), + [{Mod, Mod:init(Ch)} || Mod <- Mods]. + +check_no_overlap(Mods) -> + check_no_overlap1([sets:from_list(Mod:applies_to()) || Mod <- Mods]). + +%% Check no non-empty pairwise intersection in a list of sets +check_no_overlap1(Sets) -> + lists:foldl(fun(Set, Union) -> + Is = sets:intersection(Set, Union), + case sets:size(Is) of + 0 -> ok; + _ -> + internal_error("Interceptor: more than one " + "module handles ~p~n", [Is]) + end, + sets:union(Set, Union) + end, + sets:new(), + Sets), + ok. + +intercept_in(M, C, Mods) -> + lists:foldl(fun({Mod, ModState}, {M1, C1}) -> + call_module(Mod, ModState, M1, C1) + end, + {M, C}, + Mods). + +call_module(Mod, St, M, C) -> + % this little dance is because Mod might be unloaded at any point + case (catch {ok, Mod:intercept(M, C, St)}) of + {ok, R} -> validate_response(Mod, M, C, R); + {'EXIT', {undef, [{Mod, intercept, _, _} | _]}} -> {M, C} + end. + +validate_response(Mod, M1, C1, R = {M2, C2}) -> + case {validate_method(M1, M2), validate_content(C1, C2)} of + {true, true} -> R; + {false, _} -> + internal_error("Interceptor: ~p expected to return " + "method: ~p but returned: ~p", + [Mod, rabbit_misc:method_record_type(M1), + rabbit_misc:method_record_type(M2)]); + {_, false} -> + internal_error("Interceptor: ~p expected to return " + "content iff content is provided but " + "content in = ~p; content out = ~p", + [Mod, C1, C2]) + end. validate_method(M, M2) -> rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2). +validate_content(none, none) -> true; +validate_content(#content{}, #content{}) -> true; +validate_content(_, _) -> false. + %% keep dialyzer happy -spec internal_error(string(), [any()]) -> no_return(). internal_error(Format, Args) -> |
