diff options
| author | Marek Majkowski <majek@lshift.net> | 2009-08-14 12:37:26 +0100 |
|---|---|---|
| committer | Marek Majkowski <majek@lshift.net> | 2009-08-14 12:37:26 +0100 |
| commit | 37d0c8d21a00bb23860a7688f8774507029f177e (patch) | |
| tree | 987aae7965242c5b40d8da67d8e9280012644110 /src | |
| parent | 3744e9042d23445ca5742316efc32f1bd95cf5da (diff) | |
| parent | 345d4d0ceb837d6a69a91b26d63349fc83837152 (diff) | |
| download | rabbitmq-server-git-37d0c8d21a00bb23860a7688f8774507029f177e.tar.gz | |
Merging default to bug20463
Diffstat (limited to 'src')
| -rw-r--r-- | src/gen_server2.erl | 364 | ||||
| -rw-r--r-- | src/rabbit.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_hooks.erl | 73 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_plugin_activator.erl | 198 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 58 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 36 |
12 files changed, 721 insertions, 110 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index ba8becfca9..36fb4fa8c3 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -1,4 +1,4 @@ -%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP +%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP %% distribution, with the following modifications: %% %% 1) the module name is gen_server2 @@ -21,6 +21,42 @@ %% higher priorities are processed before requests with lower %% priorities. The default priority is 0. %% +%% 5) The callback module can optionally implement +%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be +%% called immediately prior to and post hibernation, respectively. If +%% handle_pre_hibernate returns {hibernate, NewState} then the process +%% will hibernate. If the module does not implement +%% handle_pre_hibernate/1 then the default action is to hibernate. +%% +%% 6) init can return a 4th arg, {backoff, InitialTimeout, +%% MinimumTimeout, DesiredHibernatePeriod} (all in +%% milliseconds). Then, on all callbacks which can return a timeout +%% (including init), timeout can be 'hibernate'. When this is the +%% case, the current timeout value will be used (initially, the +%% InitialTimeout supplied from init). After this timeout has +%% occurred, hibernation will occur as normal. Upon awaking, a new +%% current timeout value will be calculated. +%% +%% The purpose is that the gen_server2 takes care of adjusting the +%% current timeout value such that the process will increase the +%% timeout value repeatedly if it is unable to sleep for the +%% DesiredHibernatePeriod. If it is able to sleep for the +%% DesiredHibernatePeriod it will decrease the current timeout down to +%% the MinimumTimeout, so that the process is put to sleep sooner (and +%% hopefully stays asleep for longer). In short, should a process +%% using this receive a burst of messages, it should not hibernate +%% between those messages, but as the messages become less frequent, +%% the process will not only hibernate, it will do so sooner after +%% each message. +%% +%% When using this backoff mechanism, normal timeout values (i.e. not +%% 'hibernate') can still be used, and if they are used then the +%% handle_info(timeout, State) will be called as normal. In this case, +%% returning 'hibernate' from handle_info(timeout, State) will not +%% hibernate the process immediately, as it would if backoff wasn't +%% being used. Instead it'll wait for the current timeout as described +%% above. + %% All modifications are (C) 2009 LShift Ltd. %% ``The contents of this file are subject to the Erlang Public License, @@ -55,6 +91,7 @@ %%% init(Args) %%% ==> {ok, State} %%% {ok, State, Timeout} +%%% {ok, State, Timeout, Backoff} %%% ignore %%% {stop, Reason} %%% @@ -86,6 +123,17 @@ %%% %%% ==> ok %%% +%%% handle_pre_hibernate(State) +%%% +%%% ==> {hibernate, State} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term, terminate(State) is called +%%% +%%% handle_post_hibernate(State) +%%% +%%% ==> {noreply, State} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term, terminate(State) is called %%% %%% The work flow (of the server) can be described as follows: %%% @@ -116,7 +164,7 @@ cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5]). + enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]). -export([behaviour_info/1]). @@ -290,7 +338,7 @@ multi_call(Nodes, Name, Req, Timeout) %%----------------------------------------------------------------- -%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_ +%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ %% %% Description: Makes an existing process into a gen_server. %% The calling process will enter the gen_server receive @@ -301,20 +349,30 @@ multi_call(Nodes, Name, Req, Timeout) %% process, including registering a name for it. %%----------------------------------------------------------------- enter_loop(Mod, Options, State) -> - enter_loop(Mod, Options, State, self(), infinity). + enter_loop(Mod, Options, State, self(), infinity, undefined). + +enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) -> + enter_loop(Mod, Options, State, self(), infinity, Backoff); enter_loop(Mod, Options, State, ServerName = {_, _}) -> - enter_loop(Mod, Options, State, ServerName, infinity); + enter_loop(Mod, Options, State, ServerName, infinity, undefined); enter_loop(Mod, Options, State, Timeout) -> - enter_loop(Mod, Options, State, self(), Timeout). + enter_loop(Mod, Options, State, self(), Timeout, undefined). + +enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) -> + enter_loop(Mod, Options, State, ServerName, infinity, Backoff); enter_loop(Mod, Options, State, ServerName, Timeout) -> + enter_loop(Mod, Options, State, ServerName, Timeout, undefined). + +enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> Name = get_proc_name(ServerName), Parent = get_parent(), Debug = debug_options(Name, Options), Queue = priority_queue:new(), - loop(Parent, Name, State, Mod, Timeout, Queue, Debug). + Backoff1 = extend_backoff(Backoff), + loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug). %%%======================================================================== %%% Gen-callback functions @@ -329,23 +387,37 @@ enter_loop(Mod, Options, State, ServerName, Timeout) -> %%% --------------------------------------------------- init_it(Starter, self, Name, Mod, Args, Options) -> init_it(Starter, self(), Name, Mod, Args, Options); -init_it(Starter, Parent, Name, Mod, Args, Options) -> +init_it(Starter, Parent, Name0, Mod, Args, Options) -> + Name = name(Name0), Debug = debug_options(Name, Options), Queue = priority_queue:new(), case catch Mod:init(Args) of {ok, State} -> proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, infinity, Queue, Debug); + loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug); {ok, State, Timeout} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, Queue, Debug); + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug); + {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> + Backoff1 = extend_backoff(Backoff), + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug); {stop, Reason} -> + %% For consistency, we must make sure that the + %% registered name (if any) is unregistered before + %% the parent process is notified about the failure. + %% (Otherwise, the parent process could get + %% an 'already_started' error if it immediately + %% tried starting the process again.) + unregister_name(Name0), proc_lib:init_ack(Starter, {error, Reason}), exit(Reason); ignore -> + unregister_name(Name0), proc_lib:init_ack(Starter, ignore), exit(normal); {'EXIT', Reason} -> + unregister_name(Name0), proc_lib:init_ack(Starter, {error, Reason}), exit(Reason); Else -> @@ -354,33 +426,159 @@ init_it(Starter, Parent, Name, Mod, Args, Options) -> exit(Error) end. +name({local,Name}) -> Name; +name({global,Name}) -> Name; +%% name(Pid) when is_pid(Pid) -> Pid; +%% when R11 goes away, drop the line beneath and uncomment the line above +name(Name) -> Name. + +unregister_name({local,Name}) -> + _ = (catch unregister(Name)); +unregister_name({global,Name}) -> + _ = global:unregister_name(Name); +unregister_name(Pid) when is_pid(Pid) -> + Pid. + +extend_backoff(undefined) -> + undefined; +extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> + {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}. + %%%======================================================================== %%% Internal functions %%%======================================================================== %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- -loop(Parent, Name, State, Mod, Time, Queue, Debug) -> +loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> + pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug); +loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> + process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, + drain(Queue), Debug). + +drain(Queue) -> receive - Input -> loop(Parent, Name, State, Mod, - Time, in(Input, Queue), Debug) - after 0 -> - case priority_queue:out(Queue) of - {{value, Msg}, Queue1} -> - process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, Msg); - {empty, Queue1} -> - receive - Input -> - loop(Parent, Name, State, Mod, - Time, in(Input, Queue1), Debug) - after Time -> - process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, timeout) + Input -> drain(in(Input, Queue)) + after 0 -> Queue + end. + +process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> + case priority_queue:out(Queue) of + {{value, Msg}, Queue1} -> + process_msg(Parent, Name, State, Mod, + Time, TimeoutState, Queue1, Debug, Msg); + {empty, Queue1} -> + {Time1, HibOnTimeout} + = case {Time, TimeoutState} of + {hibernate, {backoff, Current, _Min, _Desired, _RSt}} -> + {Current, true}; + {hibernate, _} -> + %% wake_hib/7 will set Time to hibernate. If + %% we were woken and didn't receive a msg + %% then we will get here and need a sensible + %% value for Time1, otherwise we crash. + %% R13B1 always waits infinitely when waking + %% from hibernation, so that's what we do + %% here too. + {infinity, false}; + _ -> {Time, false} + end, + receive + Input -> + %% Time could be 'hibernate' here, so *don't* call loop + process_next_msg( + Parent, Name, State, Mod, Time, TimeoutState, + drain(in(Input, Queue1)), Debug) + after Time1 -> + case HibOnTimeout of + true -> + pre_hibernate( + Parent, Name, State, Mod, TimeoutState, Queue1, + Debug); + false -> + process_msg( + Parent, Name, State, Mod, Time, TimeoutState, + Queue1, Debug, timeout) end end end. +wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) -> + TimeoutState1 = case TS of + undefined -> + undefined; + {SleptAt, TimeoutState} -> + adjust_timeout_state(SleptAt, now(), TimeoutState) + end, + post_hibernate(Parent, Name, State, Mod, TimeoutState1, + drain(Queue), Debug). + +hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + TS = case TimeoutState of + undefined -> undefined; + {backoff, _, _, _, _} -> {now(), TimeoutState} + end, + proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, + TS, Queue, Debug]). + +pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + case erlang:function_exported(Mod, handle_pre_hibernate, 1) of + true -> + case catch Mod:handle_pre_hibernate(State) of + {hibernate, NState} -> + hibernate(Parent, Name, NState, Mod, TimeoutState, Queue, + Debug); + Reply -> + handle_common_termination(Reply, Name, pre_hibernate, + Mod, State, Debug) + end; + false -> + hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) + end. + +post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + case erlang:function_exported(Mod, handle_post_hibernate, 1) of + true -> + case catch Mod:handle_post_hibernate(State) of + {noreply, NState} -> + process_next_msg(Parent, Name, NState, Mod, infinity, + TimeoutState, Queue, Debug); + {noreply, NState, Time} -> + process_next_msg(Parent, Name, NState, Mod, Time, + TimeoutState, Queue, Debug); + Reply -> + handle_common_termination(Reply, Name, post_hibernate, + Mod, State, Debug) + end; + false -> + %% use hibernate here, not infinity. This matches + %% R13B. The key is that we should be able to get through + %% to process_msg calling sys:handle_system_msg with Time + %% still set to hibernate, iff that msg is the very msg + %% that woke us up (or the first msg we receive after + %% waking up). + process_next_msg(Parent, Name, State, Mod, hibernate, + TimeoutState, Queue, Debug) + end. + +adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, + DesiredHibPeriod, RandomState}) -> + NapLengthMicros = timer:now_diff(AwokeAt, SleptAt), + CurrentMicros = CurrentTO * 1000, + MinimumMicros = MinimumTO * 1000, + DesiredHibMicros = DesiredHibPeriod * 1000, + GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros, + Base = + %% If enough time has passed between the last two messages then we + %% should consider sleeping sooner. Otherwise stay awake longer. + case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of + true -> lists:max([MinimumTO, CurrentTO div 2]); + false -> CurrentTO + end, + {Extra, RandomState1} = random:uniform_s(Base, RandomState), + CurrentTO1 = Base + Extra, + {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. + in({'$gen_pcast', {Priority, Msg}}, Queue) -> priority_queue:in({'$gen_cast', Msg}, Priority, Queue); in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> @@ -388,19 +586,25 @@ in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> in(Input, Queue) -> priority_queue:in(Input, Queue). -process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) -> +process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, + Debug, Msg) -> case Msg of {system, From, Req} -> - sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, - [Name, State, Mod, Time, Queue]); + sys:handle_system_msg + (Req, From, Parent, ?MODULE, Debug, + [Name, State, Mod, Time, TimeoutState, Queue]); + %% gen_server puts Hib on the end as the 7th arg, but that + %% version of the function seems not to be documented so + %% leaving out for now. {'EXIT', Parent, Reason} -> terminate(Reason, Name, Msg, Mod, State, Debug); _Msg when Debug =:= [] -> - handle_msg(Msg, Parent, Name, State, Mod, Time, Queue); + handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue); _Msg -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {in, Msg}), - handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1) + handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue, + Debug1) end. %%% --------------------------------------------------- @@ -598,87 +802,95 @@ dispatch(Info, Mod, State) -> Mod:handle_info(Info, State). handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, _Time, Queue) -> + Parent, Name, State, Mod, TimeoutState, Queue) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {reply, Reply, NState, Time1} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, Time1, Queue, []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, Queue, []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, [])), reply(From, Reply), exit(R); - Other -> handle_common_reply(Other, - Parent, Name, Msg, Mod, State, Queue) + Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State, + TimeoutState, Queue) end; handle_msg(Msg, - Parent, Name, State, Mod, _Time, Queue) -> + Parent, Name, State, Mod, TimeoutState, Queue) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue). + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, + TimeoutState, Queue). handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, _Time, Queue, Debug) -> + Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + Debug1); {reply, Reply, NState, Time1} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + Debug1); {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), reply(Name, From, Reply, NState, Debug), exit(R); Other -> - handle_common_reply(Other, - Parent, Name, Msg, Mod, State, Queue, Debug) + handle_common_reply(Other, Parent, Name, Msg, Mod, State, + TimeoutState, Queue, Debug) end; handle_msg(Msg, - Parent, Name, State, Mod, _Time, Queue, Debug) -> + Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, - Parent, Name, Msg, Mod, State, Queue, Debug). + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, + TimeoutState, Queue, Debug). -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) -> +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, + TimeoutState, Queue) -> case Reply of {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, Queue, []); - {stop, Reason, NState} -> - terminate(Reason, Name, Msg, Mod, NState, []); - {'EXIT', What} -> - terminate(What, Name, Msg, Mod, State, []); - _ -> - terminate({bad_return_value, Reply}, Name, Msg, Mod, State, []) + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); + _ -> + handle_common_termination(Reply, Name, Msg, Mod, State, []) end. -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) -> +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, + Debug) -> case Reply of {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + Debug1); {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); + _ -> + handle_common_termination(Reply, Name, Msg, Mod, State, Debug) + end. + +handle_common_termination(Reply, Name, Msg, Mod, State, Debug) -> + case Reply of {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, Debug); {'EXIT', What} -> @@ -696,16 +908,24 @@ reply(Name, {To, Tag}, Reply, State, Debug) -> %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) -> - loop(Parent, Name, State, Mod, Time, Queue, Debug). +system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) -> + loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug). -system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) -> +-ifdef(use_specs). +-spec system_terminate(_, _, _, [_]) -> no_return(). +-endif. + +system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, + _TimeoutState, _Queue]) -> terminate(Reason, Name, [], Mod, State, Debug). -system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) -> +system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, + OldVsn, Extra) -> case catch Mod:code_change(OldVsn, State, Extra) of - {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]}; - Else -> Else + {ok, NewState} -> + {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]}; + Else -> + Else end. %%----------------------------------------------------------------- @@ -747,6 +967,8 @@ terminate(Reason, Name, Msg, Mod, State, Debug) -> exit(normal); shutdown -> exit(shutdown); + {shutdown,_}=Shutdown -> + exit(Shutdown); _ -> error_info(Reason, Name, Msg, State, Debug), exit(Reason) @@ -871,8 +1093,8 @@ name_to_pid(Name) -> %% Status information %%----------------------------------------------------------------- format_status(Opt, StatusData) -> - [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] = - StatusData, + [PDict, SysState, Parent, Debug, + [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData, NameTag = if is_pid(Name) -> pid_to_list(Name); is_atom(Name) -> diff --git a/src/rabbit.erl b/src/rabbit.erl index 196212eaee..b0d62b5ab8 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -133,6 +133,7 @@ start(normal, []) -> {"core processes", fun () -> ok = start_child(rabbit_log), + ok = rabbit_hooks:start(), ok = rabbit_amqqueue:start(), @@ -207,8 +208,21 @@ log_location(Type) -> print_banner() -> {ok, Product} = application:get_key(id), {ok, Version} = application:get_key(vsn), - io:format("~s ~s (AMQP ~p-~p)~n~s~n~s~n~n", - [Product, Version, + ProductLen = string:len(Product), + io:format("~n" + "+---+ +---+~n" + "| | | |~n" + "| | | |~n" + "| | | |~n" + "| +---+ +-------+~n" + "| |~n" + "| ~s +---+ |~n" + "| | | |~n" + "| ~s +---+ |~n" + "| |~n" + "+-------------------+~n" + "AMQP ~p-~p~n~s~n~s~n~n", + [Product, string:right([$v|Version], ProductLen), ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), Settings = [{"node", node()}, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 198e2782b4..4903c2c57f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -51,8 +51,6 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). --define(CALL_TIMEOUT, 5000). - %%---------------------------------------------------------------------------- -ifdef(use_specs). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cf0ef44f5c..fe2e8509f7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,7 +36,8 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(HIBERNATE_AFTER, 1000). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). -export([start_link/1]). @@ -101,7 +102,8 @@ init(Q) -> next_msg_id = 1, message_buffer = queue:new(), active_consumers = queue:new(), - blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}. + blocked_consumers = queue:new()}, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -116,9 +118,9 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}. +reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. -noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. +noreply(NewState) -> {noreply, NewState, hibernate}. lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -813,11 +815,6 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); -handle_info(timeout, State) -> - %% TODO: Once we drop support for R11B-5, we can change this to - %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); - handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 2dc619c1fb..4033aaafda 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -35,6 +35,7 @@ -export([publish/1, message/4, properties/1, delivery/4]). -export([publish/4, publish/7]). +-export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -53,6 +54,8 @@ -spec(publish/7 :: (exchange_name(), routing_key(), bool(), bool(), maybe(txn()), properties_input(), binary()) -> publish_result()). +-spec(build_content/2 :: (amqp_properties(), binary()) -> content()). +-spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}). -endif. @@ -72,16 +75,26 @@ delivery(Mandatory, Immediate, Txn, Message) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, sender = self(), message = Message}. +build_content(Properties, BodyBin) -> + {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + #content{class_id = ClassId, + properties = Properties, + properties_bin = none, + payload_fragments_rev = [BodyBin]}. + +from_content(Content) -> + #content{class_id = ClassId, + properties = Props, + payload_fragments_rev = FragmentsRev} = + rabbit_binary_parser:ensure_content_decoded(Content), + {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + {Props, list_to_binary(lists:reverse(FragmentsRev))}. + message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> Properties = properties(RawProperties), - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), - Content = #content{class_id = ClassId, - properties = Properties, - properties_bin = none, - payload_fragments_rev = [BodyBin]}, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKeyBin, - content = Content, + content = build_content(Properties, BodyBin), persistent_key = none}. properties(P = #'P_basic'{}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3089bb6293..58b9423460 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -89,7 +89,7 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). conserve_memory(Pid, Conserve) -> - gen_server2:cast(Pid, {conserve_memory, Conserve}). + gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}). %%--------------------------------------------------------------------------- @@ -162,9 +162,7 @@ handle_info({'EXIT', _Pid, Reason}, State) -> handle_info(timeout, State) -> ok = clear_permission_cache(), - %% TODO: Once we drop support for R11B-5, we can change this to - %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]). + {noreply, State, hibernate}. terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, state = terminating}) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6649899ade..37e4d18993 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -36,7 +36,7 @@ -record(params, {quiet, node, command, args}). --define(RPC_TIMEOUT, 30000). +-define(RPC_TIMEOUT, infinity). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_hooks.erl b/src/rabbit_hooks.erl new file mode 100644 index 0000000000..b3d271c28d --- /dev/null +++ b/src/rabbit_hooks.erl @@ -0,0 +1,73 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_hooks). + +-export([start/0]). +-export([subscribe/3, unsubscribe/2, trigger/2, notify_remote/5]). + +-define(TableName, rabbit_hooks). + +-ifdef(use_specs). + +-spec(start/0 :: () -> 'ok'). +-spec(subscribe/3 :: (atom(), atom(), {atom(), atom(), list()}) -> 'ok'). +-spec(unsubscribe/2 :: (atom(), atom()) -> 'ok'). +-spec(trigger/2 :: (atom(), list()) -> 'ok'). +-spec(notify_remote/5 :: (atom(), atom(), list(), pid(), list()) -> 'ok'). + +-endif. + +start() -> + ets:new(?TableName, [bag, public, named_table]), + ok. + +subscribe(Hook, HandlerName, Handler) -> + ets:insert(?TableName, {Hook, HandlerName, Handler}), + ok. + +unsubscribe(Hook, HandlerName) -> + ets:match_delete(?TableName, {Hook, HandlerName, '_'}), + ok. + +trigger(Hook, Args) -> + Hooks = ets:lookup(?TableName, Hook), + [case catch apply(M, F, [Hook, Name, Args | A]) of + {'EXIT', Reason} -> + rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p", + [Name, Hook, Reason]); + _ -> ok + end || {_, Name, {M, F, A}} <- Hooks], + ok. + +notify_remote(Hook, HandlerName, Args, Pid, PidArgs) -> + Pid ! {rabbitmq_hook, [Hook, HandlerName, Args | PidArgs]}, + ok. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 58c8f0d33d..abf4c7ccfa 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -40,7 +40,8 @@ -export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). -export([r/3, r/2, r_arg/4, rs/1]). --export([enable_cover/1, report_cover/0]). +-export([enable_cover/0, report_cover/0]). +-export([enable_cover/1, report_cover/1]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). @@ -87,8 +88,10 @@ -spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) -> undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). --spec(enable_cover/1 :: () -> 'ok' | {'error', any()}). +-spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). -spec(report_cover/0 :: () -> 'ok'). +-spec(enable_cover/1 :: (string()) -> 'ok' | {'error', any()}). +-spec(report_cover/1 :: (string()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). @@ -187,21 +190,28 @@ rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> lists:flatten(io_lib:format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath])). +enable_cover() -> + enable_cover("."). - -enable_cover(Dirs) -> - Results = lists:map(fun cover:compile_beam_directory/1, Dirs), - case lists:filter(fun(X) -> X /= [] end, Results) of - [{error, Reason} | _] -> {error, Reason}; +enable_cover([Root]) when is_atom(Root) -> + enable_cover(atom_to_list(Root)); +enable_cover(Root) -> + case cover:compile_beam_directory(filename:join(Root, "ebin")) of + {error,Reason} -> {error,Reason}; _ -> ok end. report_cover() -> - Dir = "cover/", - ok = filelib:ensure_dir(Dir), + report_cover("."). + +report_cover([Root]) when is_atom(Root) -> + report_cover(atom_to_list(Root)); +report_cover(Root) -> + Dir = filename:join(Root, "cover"), + ok = filelib:ensure_dir(filename:join(Dir,"junk")), lists:foreach(fun(F) -> file:delete(F) end, - filelib:wildcard(Dir ++ "*.html")), - {ok, SummaryFile} = file:open(Dir ++ "summary.txt", [write]), + filelib:wildcard(filename:join(Dir, "*.html"))), + {ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]), {CT, NCT} = lists:foldl( fun(M,{CovTot, NotCovTot}) -> @@ -210,7 +220,7 @@ report_cover() -> Cov, NotCov, M), {ok,_} = cover:analyze_to_file( M, - Dir ++ atom_to_list(M) ++ ".html", + filename:join(Dir, atom_to_list(M) ++ ".html"), [html]), {CovTot+Cov, NotCovTot+NotCov} end, diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl new file mode 100644 index 0000000000..71278bfb2a --- /dev/null +++ b/src/rabbit_plugin_activator.erl @@ -0,0 +1,198 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_plugin_activator). + +-export([start/0, stop/0]). + +-define(DefaultPluginDir, "plugins"). +-define(DefaultUnpackedPluginDir, "priv/plugins"). +-define(DefaultRabbitEBin, "ebin"). +-define(BaseApps, [rabbit]). + +%%---------------------------------------------------------------------------- + +start() -> + %% Ensure Rabbit is loaded so we can access it's environment + application:load(rabbit), + + %% Determine our various directories + PluginDir = get_env(plugins_dir, ?DefaultPluginDir), + UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir), + RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin), + + %% Unpack any .ez plugins + unpack_ez_plugins(PluginDir, UnpackedPluginDir), + + %% Build a list of required apps based on the fixed set, and any plugins + RequiredApps = ?BaseApps ++ + find_plugins(PluginDir) ++ + find_plugins(UnpackedPluginDir), + + %% Build the entire set of dependencies - this will load the + %% applications along the way + AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of + {unknown_app, {App, Err}} -> + io:format("ERROR: Failed to load application " ++ + "~s: ~p~n", [App, Err]), + halt(1); + AppList -> + AppList + end, + AppVersions = [determine_version(App) || App <- AllApps], + {value, {rabbit, RabbitVersion}} = lists:keysearch(rabbit, 1, AppVersions), + + %% Build the overall release descriptor + RDesc = {release, + {"rabbit", RabbitVersion}, + {erts, erlang:system_info(version)}, + AppVersions}, + + %% Write it out to ebin/rabbit.rel + file:write_file(RabbitEBin ++ "/rabbit.rel", + io_lib:format("~p.~n", [RDesc])), + + %% Compile the script + case systools:make_script(RabbitEBin ++ "/rabbit", [local, silent]) of + {ok, Module, Warnings} -> + %% This gets lots of spurious no-source warnings when we + %% have .ez files, so we want to supress them to prevent + %% hiding real issues. + WarningStr = Module:format_warning( + [W || W <- Warnings, + case W of + {warning, {source_not_found, _}} -> false; + _ -> true + end]), + case length(WarningStr) of + 0 -> ok; + _ -> io:format("~s", [WarningStr]) + end, + ok; + {error, Module, Error} -> + io:format("Boot file generation failed: ~s~n", + [Module:format_error(Error)]), + halt(1) + end, + halt(), + ok. + +stop() -> + ok. + +get_env(Key, Default) -> + case application:get_env(rabbit, Key) of + {ok, V} -> V; + _ -> Default + end. + +determine_version(App) -> + application:load(App), + {ok, Vsn} = application:get_key(App, vsn), + {App, Vsn}. + +assert_dir(Dir) -> + case filelib:is_dir(Dir) of + true -> ok; + false -> + ok = filelib:ensure_dir(Dir), + ok = file:make_dir(Dir) + end. +delete_dir(Dir) -> + case filelib:is_dir(Dir) of + true -> + case file:list_dir(Dir) of + {ok, Files} -> + [case Dir ++ "/" ++ F of + Fn -> + case filelib:is_dir(Fn) and not(is_symlink(Fn)) of + true -> delete_dir(Fn); + false -> file:delete(Fn) + end + end || F <- Files] + end, + ok = file:del_dir(Dir); + false -> + ok + end. +is_symlink(Name) -> + case file:read_link(Name) of + {ok, _} -> true; + _ -> false + end. + +unpack_ez_plugins(PluginSrcDir, PluginDestDir) -> + %% Eliminate the contents of the destination directory + delete_dir(PluginDestDir), + + assert_dir(PluginDestDir), + [unpack_ez_plugin(PluginName, PluginDestDir) || + PluginName <- filelib:wildcard(PluginSrcDir ++ "/*.ez")]. + +unpack_ez_plugin(PluginFn, PluginDestDir) -> + zip:unzip(PluginFn, [{cwd, PluginDestDir}]), + ok. + +find_plugins(PluginDir) -> + [prepare_dir_plugin(PluginName) || + PluginName <- filelib:wildcard(PluginDir ++ "/*/ebin/*.app")]. + +prepare_dir_plugin(PluginAppDescFn) -> + %% Add the plugin ebin directory to the load path + PluginEBinDirN = filename:dirname(PluginAppDescFn), + code:add_path(PluginEBinDirN), + + %% We want the second-last token + NameTokens = string:tokens(PluginAppDescFn,"/."), + PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens), + list_to_atom(PluginNameString). + +expand_dependencies(Pending) -> + expand_dependencies(sets:new(), Pending). +expand_dependencies(Current, []) -> + Current; +expand_dependencies(Current, [Next|Rest]) -> + case sets:is_element(Next, Current) of + true -> + expand_dependencies(Current, Rest); + false -> + case application:load(Next) of + ok -> + ok; + {error, {already_loaded, _}} -> + ok; + X -> + throw({unknown_app, {Next, X}}) + end, + {ok, Required} = application:get_key(Next, applications), + Unique = [A || A <- Required, not(sets:is_element(A, Current))], + expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique) + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 01757509ec..e5100ccd16 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -33,6 +33,9 @@ -export([all_tests/0, test_parsing/0]). +%% Exported so the hook mechanism can call back +-export([handle_hook/3, bad_handle_hook/3, extra_arg_hook/5]). + -import(lists). -include("rabbit.hrl"). @@ -54,6 +57,7 @@ all_tests() -> passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), + passed = test_hooks(), passed. test_priority_queue() -> @@ -601,6 +605,52 @@ test_server_status() -> passed. +test_hooks() -> + %% Firing of hooks calls all hooks in an isolated manner + rabbit_hooks:subscribe(test_hook, test, {rabbit_tests, handle_hook, []}), + rabbit_hooks:subscribe(test_hook, test2, {rabbit_tests, handle_hook, []}), + rabbit_hooks:subscribe(test_hook2, test2, {rabbit_tests, handle_hook, []}), + rabbit_hooks:trigger(test_hook, [arg1, arg2]), + [arg1, arg2] = get(test_hook_test_fired), + [arg1, arg2] = get(test_hook_test2_fired), + undefined = get(test_hook2_test2_fired), + + %% Hook Deletion works + put(test_hook_test_fired, undefined), + put(test_hook_test2_fired, undefined), + rabbit_hooks:unsubscribe(test_hook, test), + rabbit_hooks:trigger(test_hook, [arg3, arg4]), + undefined = get(test_hook_test_fired), + [arg3, arg4] = get(test_hook_test2_fired), + undefined = get(test_hook2_test2_fired), + + %% Catches exceptions from bad hooks + rabbit_hooks:subscribe(test_hook3, test, {rabbit_tests, bad_handle_hook, []}), + ok = rabbit_hooks:trigger(test_hook3, []), + + %% Passing extra arguments to hooks + rabbit_hooks:subscribe(arg_hook, test, {rabbit_tests, extra_arg_hook, [1, 3]}), + rabbit_hooks:trigger(arg_hook, [arg1, arg2]), + {[arg1, arg2], 1, 3} = get(arg_hook_test_fired), + + %% Invoking Pids + Remote = fun() -> + receive + {rabbitmq_hook,[remote_test,test,[],Target]} -> + Target ! invoked + end + end, + P = spawn(Remote), + rabbit_hooks:subscribe(remote_test, test, {rabbit_hooks, notify_remote, [P, [self()]]}), + rabbit_hooks:trigger(remote_test, []), + receive + invoked -> ok + after 100 -> + io:format("Remote hook not invoked"), + throw(timeout) + end, + passed. + %--------------------------------------------------------------------- control_action(Command, Args) -> control_action(Command, node(), Args). @@ -684,3 +734,11 @@ delete_log_handlers(Handlers) -> [[] = error_logger:delete_report_handler(Handler) || Handler <- Handlers], ok. + +handle_hook(HookName, Handler, Args) -> + A = atom_to_list(HookName) ++ "_" ++ atom_to_list(Handler) ++ "_fired", + put(list_to_atom(A), Args). +bad_handle_hook(_, _, _) -> + bad:bad(). +extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) -> + handle_hook(Hookname, Handler, {Args, Extra1, Extra2}). diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 9cf9f8aef9..e338ddfe9d 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,9 +33,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/3, shutdown/1, mainloop/1]). --export([send_command/2, send_command/3, - send_command_and_notify/5]). +-export([start/3, start_link/3, shutdown/1, mainloop/1]). +-export([send_command/2, send_command/3, send_command_and_signal_back/3, + send_command_and_signal_back/4, send_command_and_notify/5]). -export([internal_send_command/3, internal_send_command/5]). -import(gen_tcp). @@ -49,8 +49,12 @@ -ifdef(use_specs). -spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). +-spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok'). +-spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok'). +-spec(send_command_and_signal_back/4 :: + (pid(), amqp_method(), content(), pid()) -> 'ok'). -spec(send_command_and_notify/5 :: (pid(), pid(), pid(), amqp_method(), content()) -> 'ok'). -spec(internal_send_command/3 :: @@ -68,6 +72,11 @@ start(Sock, Channel, FrameMax) -> channel = Channel, frame_max = FrameMax}]). +start_link(Sock, Channel, FrameMax) -> + spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}]). + mainloop(State) -> receive Message -> ?MODULE:mainloop(handle_message(Message, State)) @@ -86,6 +95,19 @@ handle_message({send_command, MethodRecord, Content}, ok = internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax), State; +handle_message({send_command_and_signal_back, MethodRecord, Parent}, + State = #wstate{sock = Sock, channel = Channel}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord), + Parent ! rabbit_writer_send_command_signal, + State; +handle_message({send_command_and_signal_back, MethodRecord, Content, Parent}, + State = #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, + Content, FrameMax), + Parent ! rabbit_writer_send_command_signal, + State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State = #wstate{sock = Sock, channel = Channel, @@ -113,6 +135,14 @@ send_command(W, MethodRecord, Content) -> W ! {send_command, MethodRecord, Content}, ok. +send_command_and_signal_back(W, MethodRecord, Parent) -> + W ! {send_command_and_signal_back, MethodRecord, Parent}, + ok. + +send_command_and_signal_back(W, MethodRecord, Content, Parent) -> + W ! {send_command_and_signal_back, MethodRecord, Content, Parent}, + ok. + send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. |
