diff options
| author | Vlad Alexandru Ionescu <vlad@rabbitmq.com> | 2010-09-24 19:04:45 +0100 |
|---|---|---|
| committer | Vlad Alexandru Ionescu <vlad@rabbitmq.com> | 2010-09-24 19:04:45 +0100 |
| commit | d9cf0e695bbb0cac7f3f09f8a2d0342c6be2d0e2 (patch) | |
| tree | d86aad16abe87fc3a2f5af8cdb75d4285bb6474b /src | |
| parent | bc328a8acfdf0e68ff2e1f3d23dcc7f956ad98ea (diff) | |
| parent | 146b3f12ad4c57da3285784a5725c1227ed2708b (diff) | |
| download | rabbitmq-server-git-d9cf0e695bbb0cac7f3f09f8a2d0342c6be2d0e2.tar.gz | |
merging in from default
Diffstat (limited to 'src')
29 files changed, 1194 insertions, 1086 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index aecfb09694..d2830a25ca 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -34,13 +34,15 @@ %% A File Handle Cache %% %% This extends a subset of the functionality of the Erlang file -%% module. +%% module. In the below, we use "file handle" to specifically refer to +%% file handles, and "file descriptor" to refer to descriptors which +%% are not file handles, e.g. sockets. %% %% Some constraints %% 1) This supports one writer, multiple readers per file. Nothing %% else. %% 2) Do not open the same file from different processes. Bad things -%% may happen. +%% may happen, especially for writes. %% 3) Writes are all appends. You cannot write to the middle of a %% file, although you can truncate and then append if you want. %% 4) Although there is a write buffer, there is no read buffer. Feel @@ -49,10 +51,10 @@ %% %% Some benefits %% 1) You do not have to remember to call sync before close -%% 2) Buffering is much more flexible than with plain file module, and -%% you can control when the buffer gets flushed out. This means that -%% you can rely on reads-after-writes working, without having to call -%% the expensive sync. +%% 2) Buffering is much more flexible than with the plain file module, +%% and you can control when the buffer gets flushed out. This means +%% that you can rely on reads-after-writes working, without having to +%% call the expensive sync. %% 3) Unnecessary calls to position and sync get optimised out. %% 4) You can find out what your 'real' offset is, and what your %% 'virtual' offset is (i.e. where the hdl really is, and where it @@ -60,14 +62,19 @@ %% 5) You can find out what the offset was when you last sync'd. %% %% There is also a server component which serves to limit the number -%% of open file handles in a "soft" way - the server will never -%% prevent a client from opening a handle, but may immediately tell it -%% to close the handle. Thus you can set the limit to zero and it will -%% still all work correctly, it is just that effectively no caching -%% will take place. The operation of limiting is as follows: +%% of open file descriptors. This is a hard limit: the server +%% component will ensure that clients do not have more file +%% descriptors open than it's configured to allow. %% -%% On open and close, the client sends messages to the server -%% informing it of opens and closes. This allows the server to keep +%% On open, the client requests permission from the server to open the +%% required number of file handles. The server may ask the client to +%% close other file handles that it has open, or it may queue the +%% request and ask other clients to close file handles they have open +%% in order to satisfy the request. Requests are always satisfied in +%% the order they arrive, even if a latter request (for a small number +%% of file handles) can be satisfied before an earlier request (for a +%% larger number of file handles). On close, the client sends a +%% message to the server. These messages allow the server to keep %% track of the number of open handles. The client also keeps a %% gb_tree which is updated on every use of a file handle, mapping the %% time at which the file handle was last used (timestamp) to the @@ -81,21 +88,38 @@ %% Note that this data can go very out of date, by the client using %% the least recently used handle. %% -%% When the limit is reached, the server calculates the average age of -%% the last reported least recently used file handle of all the -%% clients. It then tells all the clients to close any handles not -%% used for longer than this average, by invoking the callback the -%% client registered. The client should receive this message and pass -%% it into set_maximum_since_use/1. However, it is highly possible -%% this age will be greater than the ages of all the handles the -%% client knows of because the client has used its file handles in the -%% mean time. Thus at this point the client reports to the server the +%% When the limit is exceeded (i.e. the number of open file handles is +%% at the limit and there are pending 'open' requests), the server +%% calculates the average age of the last reported least recently used +%% file handle of all the clients. It then tells all the clients to +%% close any handles not used for longer than this average, by +%% invoking the callback the client registered. The client should +%% receive this message and pass it into +%% set_maximum_since_use/1. However, it is highly possible this age +%% will be greater than the ages of all the handles the client knows +%% of because the client has used its file handles in the mean +%% time. Thus at this point the client reports to the server the %% current timestamp at which its least recently used file handle was %% last used. The server will check two seconds later that either it %% is back under the limit, in which case all is well again, or if %% not, it will calculate a new average age. Its data will be much %% more recent now, and so it is very likely that when this is %% communicated to the clients, the clients will close file handles. +%% (In extreme cases, where it's very likely that all clients have +%% used their open handles since they last sent in an update, which +%% would mean that the average will never cause any file handles to +%% be closed, the server can send out an average age of 0, resulting +%% in all available clients closing all their file handles.) +%% +%% Care is taken to ensure that (a) processes which are blocked +%% waiting for file descriptors to become available are not sent +%% requests to close file handles; and (b) given it is known how many +%% file handles a process has open, when the average age is forced to +%% 0, close messages are only sent to enough processes to release the +%% correct number of file handles and the list of processes is +%% randomly shuffled. This ensures we don't cause processes to +%% needlessly close file handles, and ensures that we don't always +%% make such requests of the same processes. %% %% The advantage of this scheme is that there is only communication %% from the client to the server on open, close, and when in the @@ -103,11 +127,7 @@ %% communication from the client to the server on normal file handle %% operations. This scheme forms a feed-back loop - the server does %% not care which file handles are closed, just that some are, and it -%% checks this repeatedly when over the limit. Given the guarantees of -%% now(), even if there is just one file handle open, a limit of 1, -%% and one client, it is certain that when the client calculates the -%% age of the handle, it will be greater than when the server -%% calculated it, hence it should be closed. +%% checks this repeatedly when over the limit. %% %% Handles which are closed as a result of the server are put into a %% "soft-closed" state in which the handle is closed (data flushed out @@ -117,8 +137,19 @@ %% - reopening them when necessary is handled transparently. %% %% The server also supports obtain and transfer. obtain/0 blocks until -%% a file descriptor is available. transfer/1 is transfers ownership -%% of a file descriptor between processes. It is non-blocking. +%% a file descriptor is available, at which point the requesting +%% process is considered to 'own' one more descriptor. transfer/1 +%% transfers ownership of a file descriptor between processes. It is +%% non-blocking. Obtain is used to obtain permission to accept file +%% descriptors. Obtain has a lower limit, set by the ?OBTAIN_LIMIT/1 +%% macro. File handles can use the entire limit, but will be evicted +%% by obtain calls up to the point at which no more obtain calls can +%% be satisfied by the obtains limit. Thus there will always be some +%% capacity available for file handles. Processes that use obtain are +%% never asked to return them, and they are not managed in any way by +%% the server. It is simply a mechanism to ensure that processes that +%% need file descriptors such as sockets can do so in such a way that +%% the overall number of open file descriptors is managed. %% %% The callers of register_callback/3, obtain/0, and the argument of %% transfer/1 are monitored, reducing the count of handles in use diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 9fb9e2fea7..b0379b95d1 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -16,10 +16,12 @@ %% The original code could reorder messages when communicating with a %% process on a remote node that was not currently connected. %% -%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3 -%% allow callers to attach priorities to requests. Requests with -%% higher priorities are processed before requests with lower -%% priorities. The default priority is 0. +%% 4) The callback module can optionally implement prioritise_call/3, +%% prioritise_cast/2 and prioritise_info/2. These functions take +%% Message, From and State or just Message and State and return a +%% single integer representing the priority attached to the message. +%% Messages with higher priorities are processed before requests with +%% lower priorities. The default priority is 0. %% %% 5) The callback module can optionally implement %% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be @@ -64,16 +66,16 @@ %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved via the world wide web at http://www.erlang.org/. -%% +%% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. -%% +%% %% The Initial Developer of the Original Code is Ericsson Utvecklings AB. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings %% AB. All Rights Reserved.'' -%% +%% %% $Id$ %% -module(gen_server2). @@ -82,13 +84,13 @@ %%% %%% The idea behind THIS server is that the user module %%% provides (different) functions to handle different -%%% kind of inputs. +%%% kind of inputs. %%% If the Parent process terminates the Module:terminate/2 %%% function is called. %%% %%% The user module should export: %%% -%%% init(Args) +%%% init(Args) %%% ==> {ok, State} %%% {ok, State, Timeout} %%% {ok, State, Timeout, Backoff} @@ -101,21 +103,21 @@ %%% {reply, Reply, State, Timeout} %%% {noreply, State} %%% {noreply, State, Timeout} -%%% {stop, Reason, Reply, State} +%%% {stop, Reason, Reply, State} %%% Reason = normal | shutdown | Term terminate(State) is called %%% %%% handle_cast(Msg, State) %%% %%% ==> {noreply, State} %%% {noreply, State, Timeout} -%%% {stop, Reason, State} +%%% {stop, Reason, State} %%% Reason = normal | shutdown | Term terminate(State) is called %%% %%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ... %%% %%% ==> {noreply, State} %%% {noreply, State, Timeout} -%%% {stop, Reason, State} +%%% {stop, Reason, State} %%% Reason = normal | shutdown | Term, terminate(State) is called %%% %%% terminate(Reason, State) Let the user module clean up @@ -159,37 +161,41 @@ %% API -export([start/3, start/4, - start_link/3, start_link/4, - call/2, call/3, pcall/3, pcall/4, - cast/2, pcast/3, reply/2, - abcast/2, abcast/3, - multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]). + start_link/3, start_link/4, + call/2, call/3, + cast/2, reply/2, + abcast/2, abcast/3, + multi_call/2, multi_call/3, multi_call/4, + enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]). -export([behaviour_info/1]). %% System exports -export([system_continue/3, - system_terminate/4, - system_code_change/4, - format_status/2]). + system_terminate/4, + system_code_change/4, + format_status/2]). %% Internal exports -export([init_it/6, print_event/3]). -import(error_logger, [format/2]). +%% State record +-record(gs2_state, {parent, name, state, mod, time, + timeout_state, queue, debug, prioritise_call, + prioritise_cast, prioritise_info}). + %%%========================================================================= %%% Specs. These exist only to shut up dialyzer's warnings %%%========================================================================= -ifdef(use_specs). --spec(handle_common_termination/6 :: - (any(), any(), any(), atom(), any(), any()) -> no_return()). +-spec(handle_common_termination/3 :: + (any(), atom(), #gs2_state{}) -> no_return()). --spec(hibernate/7 :: - (pid(), any(), any(), atom(), any(), queue(), any()) -> no_return()). +-spec(hibernate/1 :: (#gs2_state{}) -> no_return()). -endif. @@ -238,37 +244,21 @@ start_link(Name, Mod, Args, Options) -> %% be monitored. %% If the client is trapping exits and is linked server termination %% is handled here (? Shall we do that here (or rely on timeouts) ?). -%% ----------------------------------------------------------------- +%% ----------------------------------------------------------------- call(Name, Request) -> case catch gen:call(Name, '$gen_call', Request) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, call, [Name, Request]}}) + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request]}}) end. call(Name, Request, Timeout) -> case catch gen:call(Name, '$gen_call', Request, Timeout) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) - end. - -pcall(Name, Priority, Request) -> - case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}}) - end. - -pcall(Name, Priority, Request, Timeout) -> - case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}}) + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) end. %% ----------------------------------------------------------------- @@ -277,34 +267,18 @@ pcall(Name, Priority, Request, Timeout) -> cast({global,Name}, Request) -> catch global:send(Name, cast_msg(Request)), ok; -cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> +cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> do_cast(Dest, Request); cast(Dest, Request) when is_atom(Dest) -> do_cast(Dest, Request); cast(Dest, Request) when is_pid(Dest) -> do_cast(Dest, Request). -do_cast(Dest, Request) -> +do_cast(Dest, Request) -> do_send(Dest, cast_msg(Request)), ok. - -cast_msg(Request) -> {'$gen_cast',Request}. -pcast({global,Name}, Priority, Request) -> - catch global:send(Name, cast_msg(Priority, Request)), - ok; -pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) -> - do_cast(Dest, Priority, Request); -pcast(Dest, Priority, Request) when is_atom(Dest) -> - do_cast(Dest, Priority, Request); -pcast(Dest, Priority, Request) when is_pid(Dest) -> - do_cast(Dest, Priority, Request). - -do_cast(Dest, Priority, Request) -> - do_send(Dest, cast_msg(Priority, Request)), - ok. - -cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}. +cast_msg(Request) -> {'$gen_cast',Request}. %% ----------------------------------------------------------------- %% Send a reply to the client. @@ -312,9 +286,9 @@ cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}. reply({To, Tag}, Reply) -> catch To ! {Tag, Reply}. -%% ----------------------------------------------------------------- -%% Asyncronous broadcast, returns nothing, it's just send'n prey -%%----------------------------------------------------------------- +%% ----------------------------------------------------------------- +%% Asyncronous broadcast, returns nothing, it's just send'n pray +%% ----------------------------------------------------------------- abcast(Name, Request) when is_atom(Name) -> do_abcast([node() | nodes()], Name, cast_msg(Request)). @@ -330,36 +304,36 @@ do_abcast([], _,_) -> abcast. %%% Make a call to servers at several nodes. %%% Returns: {[Replies],[BadNodes]} %%% A Timeout can be given -%%% +%%% %%% A middleman process is used in case late answers arrives after %%% the timeout. If they would be allowed to glog the callers message -%%% queue, it would probably become confused. Late answers will +%%% queue, it would probably become confused. Late answers will %%% now arrive to the terminated middleman and so be discarded. %%% ----------------------------------------------------------------- multi_call(Name, Req) when is_atom(Name) -> do_multi_call([node() | nodes()], Name, Req, infinity). -multi_call(Nodes, Name, Req) +multi_call(Nodes, Name, Req) when is_list(Nodes), is_atom(Name) -> do_multi_call(Nodes, Name, Req, infinity). multi_call(Nodes, Name, Req, infinity) -> do_multi_call(Nodes, Name, Req, infinity); -multi_call(Nodes, Name, Req, Timeout) +multi_call(Nodes, Name, Req, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> do_multi_call(Nodes, Name, Req, Timeout). %%----------------------------------------------------------------- -%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ -%% -%% Description: Makes an existing process into a gen_server. -%% The calling process will enter the gen_server receive +%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ +%% +%% Description: Makes an existing process into a gen_server. +%% The calling process will enter the gen_server receive %% loop and become a gen_server process. -%% The process *must* have been started using one of the -%% start functions in proc_lib, see proc_lib(3). -%% The user is responsible for any initialization of the +%% The process *must* have been started using one of the +%% start functions in proc_lib, see proc_lib(3). +%% The user is responsible for any initialization of the %% process, including registering a name for it. %%----------------------------------------------------------------- enter_loop(Mod, Options, State) -> @@ -386,7 +360,10 @@ enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> Debug = debug_options(Name, Options), Queue = priority_queue:new(), Backoff1 = extend_backoff(Backoff), - loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug). + loop(find_prioritisers( + #gs2_state { parent = Parent, name = Name, state = State, + mod = Mod, time = Timeout, timeout_state = Backoff1, + queue = Queue, debug = Debug })). %%%======================================================================== %%% Gen-callback functions @@ -405,39 +382,51 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> Name = name(Name0), Debug = debug_options(Name, Options), Queue = priority_queue:new(), + GS2State = find_prioritisers( + #gs2_state { parent = Parent, + name = Name, + mod = Mod, + queue = Queue, + debug = Debug }), case catch Mod:init(Args) of - {ok, State} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug); - {ok, State, Timeout} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug); - {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> + {ok, State} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { state = State, + time = infinity, + timeout_state = undefined }); + {ok, State, Timeout} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { state = State, + time = Timeout, + timeout_state = undefined }); + {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> Backoff1 = extend_backoff(Backoff), - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug); - {stop, Reason} -> - %% For consistency, we must make sure that the - %% registered name (if any) is unregistered before - %% the parent process is notified about the failure. - %% (Otherwise, the parent process could get - %% an 'already_started' error if it immediately - %% tried starting the process again.) - unregister_name(Name0), - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - ignore -> - unregister_name(Name0), - proc_lib:init_ack(Starter, ignore), - exit(normal); - {'EXIT', Reason} -> - unregister_name(Name0), - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - Else -> - Error = {bad_return_value, Else}, - proc_lib:init_ack(Starter, {error, Error}), - exit(Error) + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { state = State, + time = Timeout, + timeout_state = Backoff1 }); + {stop, Reason} -> + %% For consistency, we must make sure that the + %% registered name (if any) is unregistered before + %% the parent process is notified about the failure. + %% (Otherwise, the parent process could get + %% an 'already_started' error if it immediately + %% tried starting the process again.) + unregister_name(Name0), + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + ignore -> + unregister_name(Name0), + proc_lib:init_ack(Starter, ignore), + exit(normal); + {'EXIT', Reason} -> + unregister_name(Name0), + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + Else -> + Error = {bad_return_value, Else}, + proc_lib:init_ack(Starter, {error, Error}), + exit(Error) end. name({local,Name}) -> Name; @@ -467,23 +456,24 @@ extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- -loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> - pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug); -loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> - process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, - drain(Queue), Debug). +loop(GS2State = #gs2_state { time = hibernate, + timeout_state = undefined }) -> + pre_hibernate(GS2State); +loop(GS2State) -> + process_next_msg(drain(GS2State)). -drain(Queue) -> +drain(GS2State) -> receive - Input -> drain(in(Input, Queue)) - after 0 -> Queue + Input -> drain(in(Input, GS2State)) + after 0 -> GS2State end. -process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> +process_next_msg(GS2State = #gs2_state { time = Time, + timeout_state = TimeoutState, + queue = Queue }) -> case priority_queue:out(Queue) of {{value, Msg}, Queue1} -> - process_msg(Parent, Name, State, Mod, - Time, TimeoutState, Queue1, Debug, Msg); + process_msg(Msg, GS2State #gs2_state { queue = Queue1 }); {empty, Queue1} -> {Time1, HibOnTimeout} = case {Time, TimeoutState} of @@ -504,68 +494,64 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> Input -> %% Time could be 'hibernate' here, so *don't* call loop process_next_msg( - Parent, Name, State, Mod, Time, TimeoutState, - drain(in(Input, Queue1)), Debug) + drain(in(Input, GS2State #gs2_state { queue = Queue1 }))) after Time1 -> case HibOnTimeout of true -> pre_hibernate( - Parent, Name, State, Mod, TimeoutState, Queue1, - Debug); + GS2State #gs2_state { queue = Queue1 }); false -> - process_msg( - Parent, Name, State, Mod, Time, TimeoutState, - Queue1, Debug, timeout) + process_msg(timeout, + GS2State #gs2_state { queue = Queue1 }) end end end. -wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) -> +wake_hib(GS2State = #gs2_state { timeout_state = TS }) -> TimeoutState1 = case TS of undefined -> undefined; {SleptAt, TimeoutState} -> adjust_timeout_state(SleptAt, now(), TimeoutState) end, - post_hibernate(Parent, Name, State, Mod, TimeoutState1, - drain(Queue), Debug). + post_hibernate( + drain(GS2State #gs2_state { timeout_state = TimeoutState1 })). -hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) -> TS = case TimeoutState of undefined -> undefined; {backoff, _, _, _, _} -> {now(), TimeoutState} end, - proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, - TS, Queue, Debug]). + proc_lib:hibernate(?MODULE, wake_hib, + [GS2State #gs2_state { timeout_state = TS }]). -pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +pre_hibernate(GS2State = #gs2_state { state = State, + mod = Mod }) -> case erlang:function_exported(Mod, handle_pre_hibernate, 1) of true -> case catch Mod:handle_pre_hibernate(State) of {hibernate, NState} -> - hibernate(Parent, Name, NState, Mod, TimeoutState, Queue, - Debug); + hibernate(GS2State #gs2_state { state = NState } ); Reply -> - handle_common_termination(Reply, Name, pre_hibernate, - Mod, State, Debug) + handle_common_termination(Reply, pre_hibernate, GS2State) end; false -> - hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) + hibernate(GS2State) end. -post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +post_hibernate(GS2State = #gs2_state { state = State, + mod = Mod }) -> case erlang:function_exported(Mod, handle_post_hibernate, 1) of true -> case catch Mod:handle_post_hibernate(State) of {noreply, NState} -> - process_next_msg(Parent, Name, NState, Mod, infinity, - TimeoutState, Queue, Debug); + process_next_msg(GS2State #gs2_state { state = NState, + time = infinity }); {noreply, NState, Time} -> - process_next_msg(Parent, Name, NState, Mod, Time, - TimeoutState, Queue, Debug); + process_next_msg(GS2State #gs2_state { state = NState, + time = Time }); Reply -> - handle_common_termination(Reply, Name, post_hibernate, - Mod, State, Debug) + handle_common_termination(Reply, post_hibernate, GS2State) end; false -> %% use hibernate here, not infinity. This matches @@ -574,8 +560,7 @@ post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> %% still set to hibernate, iff that msg is the very msg %% that woke us up (or the first msg we receive after %% waking up). - process_next_msg(Parent, Name, State, Mod, hibernate, - TimeoutState, Queue, Debug) + process_next_msg(GS2State #gs2_state { time = hibernate }) end. adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, @@ -596,32 +581,40 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, CurrentTO1 = Base + Extra, {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. -in({'$gen_pcast', {Priority, Msg}}, Queue) -> - priority_queue:in({'$gen_cast', Msg}, Priority, Queue); -in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> - priority_queue:in({'$gen_call', From, Msg}, Priority, Queue); -in(Input, Queue) -> - priority_queue:in(Input, Queue). - -process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, - Debug, Msg) -> +in({'$gen_cast', Msg}, GS2State = #gs2_state { prioritise_cast = PC, + queue = Queue }) -> + GS2State #gs2_state { queue = priority_queue:in( + {'$gen_cast', Msg}, + PC(Msg, GS2State), Queue) }; +in({'$gen_call', From, Msg}, GS2State = #gs2_state { prioritise_call = PC, + queue = Queue }) -> + GS2State #gs2_state { queue = priority_queue:in( + {'$gen_call', From, Msg}, + PC(Msg, From, GS2State), Queue) }; +in(Input, GS2State = #gs2_state { prioritise_info = PI, queue = Queue }) -> + GS2State #gs2_state { queue = priority_queue:in( + Input, PI(Input, GS2State), Queue) }. + +process_msg(Msg, + GS2State = #gs2_state { parent = Parent, + name = Name, + debug = Debug }) -> case Msg of - {system, From, Req} -> - sys:handle_system_msg( + {system, From, Req} -> + sys:handle_system_msg( Req, From, Parent, ?MODULE, Debug, - [Name, State, Mod, Time, TimeoutState, Queue]); + GS2State); %% gen_server puts Hib on the end as the 7th arg, but that %% version of the function seems not to be documented so %% leaving out for now. - {'EXIT', Parent, Reason} -> - terminate(Reason, Name, Msg, Mod, State, Debug); - _Msg when Debug =:= [] -> - handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue); - _Msg -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, - Name, {in, Msg}), - handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue, - Debug1) + {'EXIT', Parent, Reason} -> + terminate(Reason, Msg, GS2State); + _Msg when Debug =:= [] -> + handle_msg(Msg, GS2State); + _Msg -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, + Name, {in, Msg}), + handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }) end. %%% --------------------------------------------------- @@ -638,35 +631,35 @@ do_multi_call(Nodes, Name, Req, Timeout) -> Tag = make_ref(), Caller = self(), Receiver = - spawn( - fun () -> - %% Middleman process. Should be unsensitive to regular - %% exit signals. The sychronization is needed in case - %% the receiver would exit before the caller started - %% the monitor. - process_flag(trap_exit, true), - Mref = erlang:monitor(process, Caller), - receive - {Caller,Tag} -> - Monitors = send_nodes(Nodes, Name, Tag, Req), - TimerId = erlang:start_timer(Timeout, self(), ok), - Result = rec_nodes(Tag, Monitors, Name, TimerId), - exit({self(),Tag,Result}); - {'DOWN',Mref,_,_,_} -> - %% Caller died before sending us the go-ahead. - %% Give up silently. - exit(normal) - end - end), + spawn( + fun () -> + %% Middleman process. Should be unsensitive to regular + %% exit signals. The sychronization is needed in case + %% the receiver would exit before the caller started + %% the monitor. + process_flag(trap_exit, true), + Mref = erlang:monitor(process, Caller), + receive + {Caller,Tag} -> + Monitors = send_nodes(Nodes, Name, Tag, Req), + TimerId = erlang:start_timer(Timeout, self(), ok), + Result = rec_nodes(Tag, Monitors, Name, TimerId), + exit({self(),Tag,Result}); + {'DOWN',Mref,_,_,_} -> + %% Caller died before sending us the go-ahead. + %% Give up silently. + exit(normal) + end + end), Mref = erlang:monitor(process, Receiver), Receiver ! {self(),Tag}, receive - {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> - Result; - {'DOWN',Mref,_,_,Reason} -> - %% The middleman code failed. Or someone did - %% exit(_, kill) on the middleman process => Reason==killed - exit(Reason) + {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> + Result; + {'DOWN',Mref,_,_,Reason} -> + %% The middleman code failed. Or someone did + %% exit(_, kill) on the middleman process => Reason==killed + exit(Reason) end. send_nodes(Nodes, Name, Tag, Req) -> @@ -681,7 +674,7 @@ send_nodes([Node|Tail], Name, Tag, Req, Monitors) send_nodes([_Node|Tail], Name, Tag, Req, Monitors) -> %% Skip non-atom Node send_nodes(Tail, Name, Tag, Req, Monitors); -send_nodes([], _Name, _Tag, _Req, Monitors) -> +send_nodes([], _Name, _Tag, _Req, Monitors) -> Monitors. %% Against old nodes: @@ -691,89 +684,89 @@ send_nodes([], _Name, _Tag, _Req, Monitors) -> %% Against contemporary nodes: %% Wait for reply, server 'DOWN', or timeout from TimerId. -rec_nodes(Tag, Nodes, Name, TimerId) -> +rec_nodes(Tag, Nodes, Name, TimerId) -> rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) -> receive - {'DOWN', R, _, _, _} -> - rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - unmonitor(R), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N,Reply}|Replies], Time, TimerId); - {timeout, TimerId, _} -> - unmonitor(R), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + {'DOWN', R, _, _, _} -> + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], Time, TimerId); + {timeout, TimerId, _} -> + unmonitor(R), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) -> %% R6 node receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N,Reply}|Replies], 2000, TimerId); - {timeout, TimerId, _} -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) + {nodedown, N} -> + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], 2000, TimerId); + {timeout, TimerId, _} -> + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) after Time -> - case rpc:call(N, erlang, whereis, [Name]) of - Pid when is_pid(Pid) -> % It exists try again. - rec_nodes(Tag, [N|Tail], Name, Badnodes, - Replies, infinity, TimerId); - _ -> % badnode - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N|Badnodes], - Replies, 2000, TimerId) - end + case rpc:call(N, erlang, whereis, [Name]) of + Pid when is_pid(Pid) -> % It exists try again. + rec_nodes(Tag, [N|Tail], Name, Badnodes, + Replies, infinity, TimerId); + _ -> % badnode + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], + Replies, 2000, TimerId) + end end; rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> case catch erlang:cancel_timer(TimerId) of - false -> % It has already sent it's message - receive - {timeout, TimerId, _} -> ok - after 0 -> - ok - end; - _ -> % Timer was cancelled, or TimerId was 'undefined' - ok + false -> % It has already sent it's message + receive + {timeout, TimerId, _} -> ok + after 0 -> + ok + end; + _ -> % Timer was cancelled, or TimerId was 'undefined' + ok end, {Replies, Badnodes}. %% Collect all replies that already have arrived rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) -> receive - {'DOWN', R, _, _, _} -> - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - unmonitor(R), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + {'DOWN', R, _, _, _} -> + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) after 0 -> - unmonitor(R), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) -> %% R6 node receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + {nodedown, N} -> + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) after 0 -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> {Replies, Badnodes}. @@ -785,28 +778,28 @@ rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> if node() =:= nonode@nohost, Node =/= nonode@nohost -> - Ref = make_ref(), - self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, - {Node, Ref}; + Ref = make_ref(), + self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, + {Node, Ref}; true -> - case catch erlang:monitor(process, {Name, Node}) of - {'EXIT', _} -> - %% Remote node is R6 - monitor_node(Node, true), - Node; - Ref when is_reference(Ref) -> - {Node, Ref} - end + case catch erlang:monitor(process, {Name, Node}) of + {'EXIT', _} -> + %% Remote node is R6 + monitor_node(Node, true), + Node; + Ref when is_reference(Ref) -> + {Node, Ref} + end end. %% Cancels a monitor started with Ref=erlang:monitor(_, _). unmonitor(Ref) when is_reference(Ref) -> erlang:demonitor(Ref), receive - {'DOWN', Ref, _, _, _} -> - true + {'DOWN', Ref, _, _, _} -> + true after 0 -> - true + true end. %%% --------------------------------------------------- @@ -818,130 +811,114 @@ dispatch({'$gen_cast', Msg}, Mod, State) -> dispatch(Info, Mod, State) -> Mod:handle_info(Info, State). -handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, TimeoutState, Queue) -> - case catch Mod:handle_call(Msg, From, State) of - {reply, Reply, NState} -> - reply(From, Reply), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {reply, Reply, NState, Time1} -> - reply(From, Reply), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); - {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); - {stop, Reason, Reply, NState} -> - {'EXIT', R} = - (catch terminate(Reason, Name, Msg, Mod, NState, [])), - reply(From, Reply), - exit(R); - Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State, - TimeoutState, Queue) - end; -handle_msg(Msg, - Parent, Name, State, Mod, TimeoutState, Queue) -> - Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, - TimeoutState, Queue). - -handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +common_reply(_Name, From, Reply, _NState, [] = _Debug) -> + reply(From, Reply), + []; +common_reply(Name, From, Reply, NState, Debug) -> + reply(Name, From, Reply, NState, Debug). + +common_debug([] = _Debug, _Func, _Info, _Event) -> + []; +common_debug(Debug, Func, Info, Event) -> + sys:handle_debug(Debug, Func, Info, Event). + +handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod, + state = State, + name = Name, + debug = Debug }) -> case catch Mod:handle_call(Msg, From, State) of - {reply, Reply, NState} -> - Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, - Debug1); - {reply, Reply, NState, Time1} -> - Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); - {noreply, NState} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, - Debug1); - {noreply, NState, Time1} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); - {stop, Reason, Reply, NState} -> - {'EXIT', R} = - (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), - reply(Name, From, Reply, NState, Debug), - exit(R); - Other -> - handle_common_reply(Other, Parent, Name, Msg, Mod, State, - TimeoutState, Queue, Debug) + {reply, Reply, NState} -> + Debug1 = common_reply(Name, From, Reply, NState, Debug), + loop(GS2State #gs2_state { state = NState, + time = infinity, + debug = Debug1 }); + {reply, Reply, NState, Time1} -> + Debug1 = common_reply(Name, From, Reply, NState, Debug), + loop(GS2State #gs2_state { state = NState, + time = Time1, + debug = Debug1}); + {noreply, NState} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state {state = NState, + time = infinity, + debug = Debug1}); + {noreply, NState, Time1} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state {state = NState, + time = Time1, + debug = Debug1}); + {stop, Reason, Reply, NState} -> + {'EXIT', R} = + (catch terminate(Reason, Msg, + GS2State #gs2_state { state = NState })), + reply(Name, From, Reply, NState, Debug), + exit(R); + Other -> + handle_common_reply(Other, Msg, GS2State) end; -handle_msg(Msg, - Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, - TimeoutState, Queue, Debug). + handle_common_reply(Reply, Msg, GS2State). -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, - TimeoutState, Queue) -> +handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name, + debug = Debug}) -> case Reply of - {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); + {noreply, NState} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state { state = NState, + time = infinity, + debug = Debug1 }); + {noreply, NState, Time1} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state { state = NState, + time = Time1, + debug = Debug1 }); _ -> - handle_common_termination(Reply, Name, Msg, Mod, State, []) + handle_common_termination(Reply, Msg, GS2State) end. -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, - Debug) -> +handle_common_termination(Reply, Msg, GS2State) -> case Reply of - {noreply, NState} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, - Debug1); - {noreply, NState, Time1} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); + {stop, Reason, NState} -> + terminate(Reason, Msg, GS2State #gs2_state { state = NState }); + {'EXIT', What} -> + terminate(What, Msg, GS2State); _ -> - handle_common_termination(Reply, Name, Msg, Mod, State, Debug) - end. - -handle_common_termination(Reply, Name, Msg, Mod, State, Debug) -> - case Reply of - {stop, Reason, NState} -> - terminate(Reason, Name, Msg, Mod, NState, Debug); - {'EXIT', What} -> - terminate(What, Name, Msg, Mod, State, Debug); - _ -> - terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug) + terminate({bad_return_value, Reply}, Msg, GS2State) end. reply(Name, {To, Tag}, Reply, State, Debug) -> reply({To, Tag}, Reply), - sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {out, Reply, To, State} ). + sys:handle_debug( + Debug, {?MODULE, print_event}, Name, {out, Reply, To, State}). %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) -> - loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug). +system_continue(Parent, Debug, GS2State) -> + loop(GS2State #gs2_state { parent = Parent, debug = Debug }). -ifdef(use_specs). -spec system_terminate(_, _, _, [_]) -> no_return(). -endif. -system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, - _TimeoutState, _Queue]) -> - terminate(Reason, Name, [], Mod, State, Debug). +system_terminate(Reason, _Parent, Debug, GS2State) -> + terminate(Reason, [], GS2State #gs2_state { debug = Debug }). -system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, - OldVsn, Extra) -> +system_code_change(GS2State = #gs2_state { mod = Mod, + state = State }, + _Module, OldVsn, Extra) -> case catch Mod:code_change(OldVsn, State, Extra) of - {ok, NewState} -> - {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]}; - Else -> + {ok, NewState} -> + NewGS2State = find_prioritisers( + GS2State #gs2_state { state = NewState }), + {ok, [NewGS2State]}; + Else -> Else end. @@ -951,18 +928,18 @@ system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, %%----------------------------------------------------------------- print_event(Dev, {in, Msg}, Name) -> case Msg of - {'$gen_call', {From, _Tag}, Call} -> - io:format(Dev, "*DBG* ~p got call ~p from ~w~n", - [Name, Call, From]); - {'$gen_cast', Cast} -> - io:format(Dev, "*DBG* ~p got cast ~p~n", - [Name, Cast]); - _ -> - io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) + {'$gen_call', {From, _Tag}, Call} -> + io:format(Dev, "*DBG* ~p got call ~p from ~w~n", + [Name, Call, From]); + {'$gen_cast', Cast} -> + io:format(Dev, "*DBG* ~p got cast ~p~n", + [Name, Cast]); + _ -> + io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) end; print_event(Dev, {out, Msg, To, State}, Name) -> - io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", - [Name, Msg, To, State]); + io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", + [Name, Msg, To, State]); print_event(Dev, {noreply, State}, Name) -> io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]); print_event(Dev, Event, Name) -> @@ -973,23 +950,26 @@ print_event(Dev, Event, Name) -> %%% Terminate the server. %%% --------------------------------------------------- -terminate(Reason, Name, Msg, Mod, State, Debug) -> +terminate(Reason, Msg, #gs2_state { name = Name, + mod = Mod, + state = State, + debug = Debug }) -> case catch Mod:terminate(Reason, State) of - {'EXIT', R} -> - error_info(R, Reason, Name, Msg, State, Debug), - exit(R); - _ -> - case Reason of - normal -> - exit(normal); - shutdown -> - exit(shutdown); - {shutdown,_}=Shutdown -> - exit(Shutdown); - _ -> - error_info(Reason, undefined, Name, Msg, State, Debug), - exit(Reason) - end + {'EXIT', R} -> + error_info(R, Reason, Name, Msg, State, Debug), + exit(R); + _ -> + case Reason of + normal -> + exit(normal); + shutdown -> + exit(shutdown); + {shutdown,_}=Shutdown -> + exit(Shutdown); + _ -> + error_info(Reason, undefined, Name, Msg, State, Debug), + exit(Reason) + end end. error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) -> @@ -1038,74 +1018,109 @@ opt(_, []) -> debug_options(Name, Opts) -> case opt(debug, Opts) of - {ok, Options} -> dbg_options(Name, Options); - _ -> dbg_options(Name, []) + {ok, Options} -> dbg_options(Name, Options); + _ -> dbg_options(Name, []) end. dbg_options(Name, []) -> - Opts = - case init:get_argument(generic_debug) of - error -> - []; - _ -> - [log, statistics] - end, + Opts = + case init:get_argument(generic_debug) of + error -> + []; + _ -> + [log, statistics] + end, dbg_opts(Name, Opts); dbg_options(Name, Opts) -> dbg_opts(Name, Opts). dbg_opts(Name, Opts) -> case catch sys:debug_options(Opts) of - {'EXIT',_} -> - format("~p: ignoring erroneous debug options - ~p~n", - [Name, Opts]), - []; - Dbg -> - Dbg + {'EXIT',_} -> + format("~p: ignoring erroneous debug options - ~p~n", + [Name, Opts]), + []; + Dbg -> + Dbg end. get_proc_name(Pid) when is_pid(Pid) -> Pid; get_proc_name({local, Name}) -> case process_info(self(), registered_name) of - {registered_name, Name} -> - Name; - {registered_name, _Name} -> - exit(process_not_registered); - [] -> - exit(process_not_registered) - end; + {registered_name, Name} -> + Name; + {registered_name, _Name} -> + exit(process_not_registered); + [] -> + exit(process_not_registered) + end; get_proc_name({global, Name}) -> case global:safe_whereis_name(Name) of - undefined -> - exit(process_not_registered_globally); - Pid when Pid =:= self() -> - Name; - _Pid -> - exit(process_not_registered_globally) + undefined -> + exit(process_not_registered_globally); + Pid when Pid =:= self() -> + Name; + _Pid -> + exit(process_not_registered_globally) end. get_parent() -> case get('$ancestors') of - [Parent | _] when is_pid(Parent)-> + [Parent | _] when is_pid(Parent)-> Parent; [Parent | _] when is_atom(Parent)-> name_to_pid(Parent); - _ -> - exit(process_was_not_started_by_proc_lib) + _ -> + exit(process_was_not_started_by_proc_lib) end. name_to_pid(Name) -> case whereis(Name) of - undefined -> - case global:safe_whereis_name(Name) of - undefined -> - exit(could_not_find_registerd_name); - Pid -> - Pid - end; - Pid -> - Pid + undefined -> + case global:safe_whereis_name(Name) of + undefined -> + exit(could_not_find_registerd_name); + Pid -> + Pid + end; + Pid -> + Pid + end. + +find_prioritisers(GS2State = #gs2_state { mod = Mod }) -> + PrioriCall = function_exported_or_default( + Mod, 'prioritise_call', 3, + fun (_Msg, _From, _State) -> 0 end), + PrioriCast = function_exported_or_default(Mod, 'prioritise_cast', 2, + fun (_Msg, _State) -> 0 end), + PrioriInfo = function_exported_or_default(Mod, 'prioritise_info', 2, + fun (_Msg, _State) -> 0 end), + GS2State #gs2_state { prioritise_call = PrioriCall, + prioritise_cast = PrioriCast, + prioritise_info = PrioriInfo }. + +function_exported_or_default(Mod, Fun, Arity, Default) -> + case erlang:function_exported(Mod, Fun, Arity) of + true -> case Arity of + 2 -> fun (Msg, GS2State = #gs2_state { state = State }) -> + case catch Mod:Fun(Msg, State) of + Res when is_integer(Res) -> + Res; + Err -> + handle_common_termination(Err, Msg, GS2State) + end + end; + 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) -> + case catch Mod:Fun(Msg, From, State) of + Res when is_integer(Res) -> + Res; + Err -> + handle_common_termination(Err, Msg, GS2State) + end + end + end; + false -> Default end. %%----------------------------------------------------------------- @@ -1115,25 +1130,23 @@ format_status(Opt, StatusData) -> [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData, NameTag = if is_pid(Name) -> - pid_to_list(Name); - is_atom(Name) -> - Name - end, + pid_to_list(Name); + is_atom(Name) -> + Name + end, Header = lists:concat(["Status for generic server ", NameTag]), Log = sys:get_debug(log, Debug, []), - Specfic = - case erlang:function_exported(Mod, format_status, 2) of - true -> - case catch Mod:format_status(Opt, [PDict, State]) of - {'EXIT', _} -> [{data, [{"State", State}]}]; - Else -> Else - end; - _ -> - [{data, [{"State", State}]}] - end, + Specfic = + case erlang:function_exported(Mod, format_status, 2) of + true -> case catch Mod:format_status(Opt, [PDict, State]) of + {'EXIT', _} -> [{data, [{"State", State}]}]; + Else -> Else + end; + _ -> [{data, [{"State", State}]}] + end, [{header, Header}, {data, [{"Status", SysState}, - {"Parent", Parent}, - {"Logged events", Log}, + {"Parent", Parent}, + {"Logged events", Log}, {"Queued messages", priority_queue:to_list(Queue)}]} | Specfic]. diff --git a/src/rabbit.erl b/src/rabbit.erl index c257497070..8c36a9f0a4 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -489,11 +489,16 @@ maybe_insert_default_data() -> insert_default_data() -> {ok, DefaultUser} = application:get_env(default_user), {ok, DefaultPass} = application:get_env(default_pass), + {ok, DefaultAdmin} = application:get_env(default_user_is_admin), {ok, DefaultVHost} = application:get_env(default_vhost), {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = application:get_env(default_permissions), ok = rabbit_access_control:add_vhost(DefaultVHost), ok = rabbit_access_control:add_user(DefaultUser, DefaultPass), + case DefaultAdmin of + true -> rabbit_access_control:set_admin(DefaultUser); + _ -> ok + end, ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost, DefaultConfigurePerm, DefaultWritePerm, diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 9cfe1ca8df..73fd6f0e51 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -35,11 +35,12 @@ -export([check_login/2, user_pass_login/2, check_vhost_access/2, check_resource_access/3]). --export([add_user/2, delete_user/1, change_password/2, list_users/0, - lookup_user/1]). --export([add_vhost/1, delete_vhost/1, list_vhosts/0]). +-export([add_user/2, delete_user/1, change_password/2, set_admin/1, + clear_admin/1, list_users/0, lookup_user/1]). +-export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]). -export([set_permissions/5, set_permissions/6, clear_permissions/2, - list_vhost_permissions/1, list_user_permissions/1]). + list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, + list_user_vhost_permissions/2]). %%---------------------------------------------------------------------------- @@ -52,6 +53,7 @@ -type(password() :: binary()). -type(regexp() :: binary()). -type(scope() :: binary()). +-type(scope_atom() :: 'client' | 'all'). -spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user() | @@ -68,26 +70,33 @@ -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). +-spec(set_admin/1 :: (username()) -> 'ok'). +-spec(clear_admin/1 :: (username()) -> 'ok'). -spec(list_users/0 :: () -> [username()]). -spec(lookup_user/1 :: (username()) -> rabbit_types:ok(rabbit_types:user()) | rabbit_types:error('not_found')). --spec(add_vhost/1 :: - (rabbit_types:vhost()) -> 'ok'). --spec(delete_vhost/1 :: - (rabbit_types:vhost()) -> 'ok'). +-spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(vhost_exists/1 :: (rabbit_types:vhost()) -> boolean()). -spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]). -spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). -spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok'). +-spec(list_permissions/0 :: + () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp(), + scope_atom()}]). -spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) - -> [{username(), regexp(), regexp(), regexp()}]). + (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp(), + scope_atom()}]). -spec(list_user_permissions/1 :: - (username()) - -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]). + (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp(), + scope_atom()}]). +-spec(list_user_vhost_permissions/2 :: + (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp(), + scope_atom()}]). -endif. @@ -142,7 +151,7 @@ internal_lookup_vhost_access(Username, VHostPath) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_user_permission, - #user_vhost{username = Username, + #user_vhost{username = Username, virtual_host = VHostPath}}) of [] -> not_found; [R] -> {ok, R} @@ -160,7 +169,6 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [VHostPath, Username]) end. -permission_index(scope) -> #permission.scope; permission_index(configure) -> #permission.configure; permission_index(write) -> #permission.write; permission_index(read) -> #permission.read. @@ -175,7 +183,7 @@ check_resource_access(Username, R = #resource{virtual_host = VHostPath, name = Name}, Permission) -> Res = case mnesia:dirty_read({rabbit_user_permission, - #user_vhost{username = Username, + #user_vhost{username = Username, virtual_host = VHostPath}}) of [] -> false; @@ -184,11 +192,12 @@ check_resource_access(Username, {<<"amq.gen",_/binary>>, #permission{scope = client}} -> true; _ -> - PermRegexp = case element(permission_index(Permission), P) of - %% <<"^$">> breaks Emacs' erlang mode - <<"">> -> <<$^, $$>>; + PermRegexp = + case element(permission_index(Permission), P) of + %% <<"^$">> breaks Emacs' erlang mode + <<"">> -> <<$^, $$>>; RE -> RE - end, + end, case re:run(Name, PermRegexp, [{capture, none}]) of match -> true; nomatch -> false @@ -208,7 +217,8 @@ add_user(Username, Password) -> [] -> ok = mnesia:write(rabbit_user, #user{username = Username, - password = Password}, + password = Password, + is_admin = false}, write); _ -> mnesia:abort({user_already_exists, Username}) @@ -238,20 +248,39 @@ delete_user(Username) -> R. change_password(Username, Password) -> - R = rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> - ok = mnesia:write(rabbit_user, - #user{username = Username, - password = Password}, - write) - end)), + R = update_user(Username, fun(User) -> + User#user{password = Password} + end), rabbit_log:info("Changed password for user ~p~n", [Username]), R. +set_admin(Username) -> + set_admin(Username, true). + +clear_admin(Username) -> + set_admin(Username, false). + +set_admin(Username, IsAdmin) -> + R = update_user(Username, fun(User) -> + User#user{is_admin = IsAdmin} + end), + rabbit_log:info("Set user admin flag for user ~p to ~p~n", + [Username, IsAdmin]), + R. + +update_user(Username, Fun) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + {ok, User} = lookup_user(Username), + ok = mnesia:write(rabbit_user, Fun(User), write) + end)). + list_users() -> - mnesia:dirty_all_keys(rabbit_user). + [{Username, IsAdmin} || + #user{username = Username, is_admin = IsAdmin} <- + mnesia:dirty_match_object(rabbit_user, #user{_ = '_'})]. lookup_user(Username) -> rabbit_misc:dirty_read({rabbit_user, Username}). @@ -301,7 +330,7 @@ delete_vhost(VHostPath) -> R. internal_delete_vhost(VHostPath) -> - lists:foreach(fun (#exchange{name=Name}) -> + lists:foreach(fun (#exchange{name = Name}) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), @@ -312,6 +341,9 @@ internal_delete_vhost(VHostPath) -> ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. +vhost_exists(VHostPath) -> + mnesia:dirty_read({rabbit_vhost, VHostPath}) /= []. + list_vhosts() -> mnesia:dirty_all_keys(rabbit_vhost). @@ -339,13 +371,13 @@ set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPer fun () -> ok = mnesia:write( rabbit_user_permission, #user_permission{user_vhost = #user_vhost{ - username = Username, + username = Username, virtual_host = VHostPath}, permission = #permission{ - scope = Scope, + scope = Scope, configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}}, + write = WritePerm, + read = ReadPerm}}, write) end)). @@ -356,10 +388,15 @@ clear_permissions(Username, VHostPath) -> Username, VHostPath, fun () -> ok = mnesia:delete({rabbit_user_permission, - #user_vhost{username = Username, + #user_vhost{username = Username, virtual_host = VHostPath}}) end)). +list_permissions() -> + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || + {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + list_permissions(match_user_vhost('_', '_'))]. + list_vhost_permissions(VHostPath) -> [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} || {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- @@ -372,15 +409,21 @@ list_user_permissions(Username) -> list_permissions(rabbit_misc:with_user( Username, match_user_vhost(Username, '_')))]. +list_user_vhost_permissions(Username, VHostPath) -> + [{ConfigurePerm, WritePerm, ReadPerm, Scope} || + {_, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + list_permissions(rabbit_misc:with_user_and_vhost( + Username, VHostPath, + match_user_vhost(Username, VHostPath)))]. + list_permissions(QueryThunk) -> [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || - #user_permission{user_vhost = #user_vhost{username = Username, + #user_permission{user_vhost = #user_vhost{username = Username, virtual_host = VHostPath}, - permission = #permission{ - scope = Scope, - configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}} <- + permission = #permission{ scope = Scope, + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}} <- %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction(QueryThunk)]. @@ -388,7 +431,7 @@ match_user_vhost(Username, VHostPath) -> fun () -> mnesia:match_object( rabbit_user_permission, #user_permission{user_vhost = #user_vhost{ - username = Username, + username = Username, virtual_host = VHostPath}, permission = '_'}, read) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8a9237805d..3e677c3809 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -249,11 +249,12 @@ start_queue_process(Q) -> Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> - Exchange = rabbit_misc:r(QueueName, exchange, <<>>), + ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_binding:add(Exchange, QueueName, RoutingKey, [], - fun (_X, _Q) -> ok end), - ok. + rabbit_binding:add(#binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = []}). lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). @@ -331,10 +332,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - delegate_pcall(QPid, 9, info, infinity). + delegate_call(QPid, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case delegate_pcall(QPid, 9, {info, Items}, infinity) of + case delegate_call(QPid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -344,7 +345,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - delegate_pcall(QPid, 9, consumers, infinity). + delegate_call(QPid, consumers, infinity). consumers_all(VHostPath) -> lists:concat( @@ -356,7 +357,7 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). emit_stats(#amqqueue{pid = QPid}) -> - delegate_pcast(QPid, 7, emit_stats). + delegate_cast(QPid, emit_stats). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -379,10 +380,10 @@ requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity). ack(QPid, Txn, MsgIds, ChPid) -> - delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). + delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}). reject(QPid, MsgIds, Requeue, ChPid) -> - delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}). + delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}). commit_all(QPids, Txn, ChPid) -> safe_delegate_call_ok( @@ -418,10 +419,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - delegate_pcast(QPid, 7, {notify_sent, ChPid}). + delegate_cast(QPid, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - delegate_pcast(QPid, 7, {unblock, ChPid}). + delegate_cast(QPid, {unblock, ChPid}). flush_all(QPids, ChPid) -> delegate:invoke_no_result( @@ -451,20 +452,19 @@ internal_delete(QueueName) -> end. maybe_run_queue_via_backing_queue(QPid, Fun) -> - gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun}, - infinity). + gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). update_ram_duration(QPid) -> - gen_server2:pcast(QPid, 8, update_ram_duration). + gen_server2:cast(QPid, update_ram_duration). set_ram_duration_target(QPid, Duration) -> - gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}). + gen_server2:cast(QPid, {set_ram_duration_target, Duration}). set_maximum_since_use(QPid, Age) -> - gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}). + gen_server2:cast(QPid, {set_maximum_since_use, Age}). maybe_expire(QPid) -> - gen_server2:pcast(QPid, 8, maybe_expire). + gen_server2:cast(QPid, maybe_expire). on_node_down(Node) -> [Hook() || @@ -504,11 +504,6 @@ safe_delegate_call_ok(F, Pids) -> delegate_call(Pid, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). -delegate_pcall(Pid, Pri, Msg, Timeout) -> - delegate:invoke(Pid, - fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). - -delegate_pcast(Pid, Pri, Msg) -> - delegate:invoke_no_result(Pid, - fun (P) -> gen_server2:pcast(P, Pri, Msg) end). +delegate_cast(Pid, Msg) -> + delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0849586294..d15a6eb3a9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -42,7 +42,8 @@ -export([start_link/1, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/3, + prioritise_cast/2]). -import(queue). -import(erlang). @@ -152,7 +153,8 @@ init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined}) -> + backing_queue = BQ, backing_queue_state = undefined, + stats_timer = StatsTimer}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; Q -> gen_server2:reply(From, {new, Q}), @@ -163,9 +165,12 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), + State1 = init_expires(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, - infos(?CREATION_EVENT_KEYS, State)), - noreply(init_expires(State#q{backing_queue_state = BQS})); + infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(StatsTimer, + fun() -> emit_stats(State1) end), + noreply(State1); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -201,7 +206,7 @@ next_state(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_rate_timer(State), State2 = ensure_stats_timer(State1), - case BQ:needs_idle_timeout(BQS)of + case BQ:needs_idle_timeout(BQS) of true -> {ensure_sync_timer(State2), 0}; false -> {stop_sync_timer(State2), hibernate} end. @@ -265,14 +270,8 @@ ensure_stats_timer(State = #q{stats_timer = StatsTimer, q = Q}) -> State#q{stats_timer = rabbit_event:ensure_stats_timer( StatsTimer, - fun() -> emit_stats(State) end, fun() -> rabbit_amqqueue:emit_stats(Q) end)}. -stop_stats_timer(State = #q{stats_timer = StatsTimer}) -> - State#q{stats_timer = rabbit_event:stop_stats_timer( - StatsTimer, - fun() -> emit_stats(State) end)}. - assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). @@ -589,6 +588,29 @@ emit_stats(State) -> %--------------------------------------------------------------------------- +prioritise_call(Msg, _From, _State) -> + case Msg of + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {maybe_run_queue_via_backing_queue, _Fun} -> 6; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + update_ram_duration -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + maybe_expire -> 8; + emit_stats -> 7; + {ack, _Txn, _MsgIds, _ChPid} -> 7; + {reject, _MsgIds, _Requeue, _ChPid} -> 7; + {notify_sent, _ChPid} -> 7; + {unblock, _ChPid} -> 7; + _ -> 0 + end. + handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); @@ -886,9 +908,12 @@ handle_cast(maybe_expire, State) -> false -> noreply(ensure_expiry_timer(State)) end; -handle_cast(emit_stats, State) -> +handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> + %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), - noreply(State). + State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, + assert_invariant(State1), + {noreply, State1}. handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> @@ -919,11 +944,14 @@ handle_info(Info, State) -> handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> {hibernate, State}; handle_pre_hibernate(State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + stats_timer = StatsTimer}) -> BQS1 = BQ:handle_pre_hibernate(BQS), %% no activity for a while == 0 egress and ingress rates DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), infinity), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - {hibernate, stop_stats_timer( - stop_rate_timer(State#q{backing_queue_state = BQS2}))}. + rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State) end), + State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), + backing_queue_state = BQS2}, + {hibernate, stop_rate_timer(State1)}. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 3569ba93b7..19150fa9f9 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -32,8 +32,9 @@ -module(rabbit_binding). -include("rabbit.hrl"). --export([recover/0, add/5, remove/5, list/1]). --export([list_for_exchange/1, list_for_queue/1]). +-export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). +-export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]). +-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_exchange/1, remove_for_exchange/1, remove_for_queue/1, remove_transient_for_queue/1]). @@ -46,37 +47,37 @@ -type(key() :: binary()). --type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' | - 'exchange_not_found' | - 'exchange_and_queue_not_found')). +-type(bind_errors() :: rabbit_types:error('queue_not_found' | + 'exchange_not_found' | + 'exchange_and_queue_not_found')). +-type(bind_res() :: 'ok' | bind_errors()). -type(inner_fun() :: fun((rabbit_types:exchange(), queue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error()))). +-type(bindings() :: [rabbit_types:binding()]). -spec(recover/0 :: () -> [rabbit_types:binding()]). --spec(add/5 :: - (rabbit_exchange:name(), rabbit_amqqueue:name(), - rabbit_router:routing_key(), rabbit_framing:amqp_table(), - inner_fun()) -> bind_res()). --spec(remove/5 :: - (rabbit_exchange:name(), rabbit_amqqueue:name(), - rabbit_router:routing_key(), rabbit_framing:amqp_table(), - inner_fun()) -> bind_res() | rabbit_types:error('binding_not_found')). --spec(list/1 :: (rabbit_types:vhost()) -> - [{rabbit_exchange:name(), rabbit_amqqueue:name(), - rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). --spec(list_for_exchange/1 :: - (rabbit_exchange:name()) -> [{rabbit_amqqueue:name(), - rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). --spec(list_for_queue/1 :: - (rabbit_amqqueue:name()) -> [{rabbit_exchange:name(), - rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). +-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). +-spec(add/1 :: (rabbit_types:binding()) -> bind_res()). +-spec(remove/1 :: (rabbit_types:binding()) -> + bind_res() | rabbit_types:error('binding_not_found')). +-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()). +-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> + bind_res() | rabbit_types:error('binding_not_found')). +-spec(list/1 :: (rabbit_types:vhost()) -> bindings()). +-spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). +-spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()). +-spec(list_for_exchange_and_queue/2 :: + (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()). +-spec(info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). +-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> + [rabbit_types:info()]). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). +-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) + -> [[rabbit_types:info()]]). -spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). --spec(remove_for_exchange/1 :: - (rabbit_exchange:name()) -> [rabbit_types:binding()]). +-spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). -spec(remove_for_queue/1 :: (rabbit_amqqueue:name()) -> fun (() -> any())). -spec(remove_transient_for_queue/1 :: @@ -86,6 +87,8 @@ %%---------------------------------------------------------------------------- +-define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]). + recover() -> rabbit_misc:table_fold( fun (Route = #route{binding = B}, Acc) -> @@ -95,9 +98,18 @@ recover() -> [B | Acc] end, [], rabbit_durable_route). -add(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> +exists(Binding) -> + binding_action( + Binding, + fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end). + +add(Binding) -> add(Binding, fun (_X, _Q) -> ok end). + +remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end). + +add(Binding, InnerFun) -> case binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, + Binding, fun (X, Q, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to @@ -105,58 +117,47 @@ add(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> case InnerFun(X, Q) of ok -> case mnesia:read({rabbit_route, B}) of - [] -> - ok = sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:write/3), - rabbit_event:notify( - binding_created, - [{exchange_name, ExchangeName}, - {queue_name, QueueName}, - {routing_key, RoutingKey}, - {arguments, Arguments}]), - {new, X, B}; - [_R] -> - {existing, X, B} + [] -> Durable = (X#exchange.durable andalso + Q#amqqueue.durable), + ok = sync_binding( + B, Durable, + fun mnesia:write/3), + {new, X, B}; + [_] -> {existing, X, B} end; {error, _} = E -> E end end) of - {new, Exchange = #exchange{ type = Type }, Binding} -> - (type_to_module(Type)):add_binding(Exchange, Binding); + {new, X = #exchange{ type = Type }, B} -> + ok = (type_to_module(Type)):add_binding(X, B), + rabbit_event:notify(binding_created, info(B)); {existing, _, _} -> ok; {error, _} = Err -> Err end. -remove(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> +remove(Binding, InnerFun) -> case binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, + Binding, fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of - [] -> - {error, binding_not_found}; - _ -> - case InnerFun(X, Q) of - ok -> - ok = - sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:delete_object/3), - rabbit_event:notify( - binding_deleted, - [{exchange_name, ExchangeName}, - {queue_name, QueueName}]), - Del = rabbit_exchange:maybe_auto_delete(X), - {{Del, X}, B}; - {error, _} = E -> - E - end + [] -> {error, binding_not_found}; + [_] -> case InnerFun(X, Q) of + ok -> + Durable = (X#exchange.durable andalso + Q#amqqueue.durable), + ok = sync_binding( + B, Durable, + fun mnesia:delete_object/3), + Deleted = + rabbit_exchange:maybe_auto_delete(X), + {{Deleted, X}, B}; + {error, _} = E -> + E + end end end) of {error, _} = Err -> @@ -164,50 +165,71 @@ remove(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> {{IsDeleted, X = #exchange{ type = Type }}, B} -> Module = type_to_module(Type), case IsDeleted of - auto_deleted -> Module:delete(X, [B]); - not_deleted -> Module:remove_bindings(X, [B]) - end + auto_deleted -> ok = Module:delete(X, [B]); + not_deleted -> ok = Module:remove_bindings(X, [B]) + end, + rabbit_event:notify(binding_deleted, info(B)), + ok end. list(VHostPath) -> - [{ExchangeName, QueueName, RoutingKey, Arguments} || - #route{binding = #binding{ - exchange_name = ExchangeName, - key = RoutingKey, - queue_name = QueueName, - args = Arguments}} - <- mnesia:dirty_match_object( - rabbit_route, - #route{binding = #binding{ - exchange_name = rabbit_misc:r(VHostPath, exchange), - _ = '_'}, - _ = '_'})]. - -list_for_exchange(ExchangeName) -> - Route = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, - [{QueueName, RoutingKey, Arguments} || - #route{binding = #binding{queue_name = QueueName, - key = RoutingKey, - args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. - -% Refactoring is left as an exercise for the reader + Route = #route{binding = #binding{ + exchange_name = rabbit_misc:r(VHostPath, exchange), + queue_name = rabbit_misc:r(VHostPath, queue), + _ = '_'}, + _ = '_'}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + +list_for_exchange(XName) -> + Route = #route{binding = #binding{exchange_name = XName, _ = '_'}}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + list_for_queue(QueueName) -> Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}}, - [{ExchangeName, RoutingKey, Arguments} || - #route{binding = #binding{exchange_name = ExchangeName, - key = RoutingKey, - args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. - -has_for_exchange(ExchangeName) -> - Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, + [reverse_binding(B) || #reverse_route{reverse_binding = B} <- + mnesia:dirty_match_object(rabbit_reverse_route, + reverse_route(Route))]. + +list_for_exchange_and_queue(XName, QueueName) -> + Route = #route{binding = #binding{exchange_name = XName, + queue_name = QueueName, + _ = '_'}}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + +info_keys() -> ?INFO_KEYS. + +map(VHostPath, F) -> + %% TODO: there is scope for optimisation here, e.g. using a + %% cursor, parallelising the function invocation + lists:map(F, list(VHostPath)). + +infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. + +i(exchange_name, #binding{exchange_name = XName}) -> XName; +i(queue_name, #binding{queue_name = QName}) -> QName; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; +i(Item, _) -> throw({bad_argument, Item}). + +info(B = #binding{}) -> infos(?INFO_KEYS, B). + +info(B = #binding{}, Items) -> infos(Items, B). + +info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). + +info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). + +has_for_exchange(XName) -> + Match = #route{binding = #binding{exchange_name = XName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match). -remove_for_exchange(ExchangeName) -> +remove_for_exchange(XName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), @@ -215,7 +237,7 @@ remove_for_exchange(ExchangeName) -> Route#route.binding end || Route <- mnesia:match_object( rabbit_route, - #route{binding = #binding{exchange_name = ExchangeName, + #route{binding = #binding{exchange_name = XName, _ = '_'}}, write)]. @@ -227,15 +249,14 @@ remove_transient_for_queue(QueueName) -> %%---------------------------------------------------------------------------- -binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> +binding_action(Binding = #binding{exchange_name = XName, + queue_name = QueueName, + args = Arguments}, Fun) -> call_with_exchange_and_queue( - ExchangeName, QueueName, + XName, QueueName, fun (X, Q) -> - Fun(X, Q, #binding{ - exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = rabbit_misc:sort_field_table(Arguments)}) + SortedArgs = rabbit_misc:sort_field_table(Arguments), + Fun(X, Q, Binding#binding{args = SortedArgs}) end). sync_binding(Binding, Durable, Fun) -> @@ -249,10 +270,10 @@ sync_binding(Binding, Durable, Fun) -> ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. -call_with_exchange_and_queue(Exchange, Queue, Fun) -> +call_with_exchange_and_queue(XName, QueueName, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun () -> case {mnesia:read({rabbit_exchange, Exchange}), - mnesia:read({rabbit_queue, Queue})} of + fun () -> case {mnesia:read({rabbit_exchange, XName}), + mnesia:read({rabbit_queue, QueueName})} of {[X], [Q]} -> Fun(X, Q); {[ ], [_]} -> {error, exchange_not_found}; {[_], [ ]} -> {error, queue_not_found}; @@ -306,16 +327,15 @@ remove_for_queue(QueueName, FwdDeleteFun) -> group_bindings_and_auto_delete([], Acc) -> Acc; group_bindings_and_auto_delete( - [B = #binding{exchange_name = ExchangeName} | Bs], Acc) -> - group_bindings_and_auto_delete(ExchangeName, Bs, [B], Acc). + [B = #binding{exchange_name = XName} | Bs], Acc) -> + group_bindings_and_auto_delete(XName, Bs, [B], Acc). group_bindings_and_auto_delete( - ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs], - Bindings, Acc) -> - group_bindings_and_auto_delete(ExchangeName, Bs, [B | Bindings], Acc); -group_bindings_and_auto_delete(ExchangeName, Removed, Bindings, Acc) -> - %% either Removed is [], or its head has a non-matching ExchangeName - [X] = mnesia:read({rabbit_exchange, ExchangeName}), + XName, [B = #binding{exchange_name = XName} | Bs], Bindings, Acc) -> + group_bindings_and_auto_delete(XName, Bs, [B | Bindings], Acc); +group_bindings_and_auto_delete(XName, Removed, Bindings, Acc) -> + %% either Removed is [], or its head has a non-matching XName + [X] = mnesia:read({rabbit_exchange, XName}), NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc], group_bindings_and_auto_delete(Removed, NewAcc). @@ -338,20 +358,20 @@ reverse_route(#route{binding = Binding}) -> reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. -reverse_binding(#reverse_binding{exchange_name = Exchange, - queue_name = Queue, +reverse_binding(#reverse_binding{exchange_name = XName, + queue_name = QueueName, key = Key, args = Args}) -> - #binding{exchange_name = Exchange, - queue_name = Queue, + #binding{exchange_name = XName, + queue_name = QueueName, key = Key, args = Args}; -reverse_binding(#binding{exchange_name = Exchange, - queue_name = Queue, +reverse_binding(#binding{exchange_name = XName, + queue_name = QueueName, key = Key, args = Args}) -> - #reverse_binding{exchange_name = Exchange, - queue_name = Queue, + #reverse_binding{exchange_name = XName, + queue_name = QueueName, key = Key, args = Args}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 60b807faa5..bde11f00e0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -41,7 +41,8 @@ -export([emit_stats/1, flush/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/3, + prioritise_cast/2]). -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, start_limiter_fun, transaction_id, tx_participants, next_tag, @@ -131,10 +132,10 @@ list() -> info_keys() -> ?INFO_KEYS. info(Pid) -> - gen_server2:pcall(Pid, 9, info, infinity). + gen_server2:call(Pid, info, infinity). info(Pid, Items) -> - case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of + case gen_server2:call(Pid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -146,7 +147,7 @@ info_all(Items) -> rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). emit_stats(Pid) -> - gen_server2:pcast(Pid, 7, emit_stats). + gen_server2:cast(Pid, emit_stats). flush(Pid) -> gen_server2:call(Pid, flush). @@ -157,6 +158,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), + StatsTimer = rabbit_event:init_stats_timer(), State = #ch{state = starting, channel = Channel, reader_pid = ReaderPid, @@ -174,11 +176,26 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, consumer_mapping = dict:new(), blocking = dict:new(), queue_collector_pid = CollectorPid, - stats_timer = rabbit_event:init_stats_timer()}, + stats_timer = StatsTimer}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), + rabbit_event:if_enabled(StatsTimer, + fun() -> internal_emit_stats(State) end), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_call(Msg, _From, _State) -> + case Msg of + info -> 9; + {info, _Items} -> 9; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + emit_stats -> 7; + _ -> 0 + end. + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -237,17 +254,22 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, end, State), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast(emit_stats, State) -> +handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> internal_emit_stats(State), - {noreply, State}. + {noreply, + State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State)}. -handle_pre_hibernate(State) -> +handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - {hibernate, stop_stats_timer(State)}. + rabbit_event:if_enabled(StatsTimer, fun () -> + internal_emit_stats(State) + end), + {hibernate, + State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -277,14 +299,8 @@ ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> ChPid = self(), State#ch{stats_timer = rabbit_event:ensure_stats_timer( StatsTimer, - fun() -> internal_emit_stats(State) end, fun() -> emit_stats(ChPid) end)}. -stop_stats_timer(State = #ch{stats_timer = StatsTimer}) -> - State#ch{stats_timer = rabbit_event:stop_stats_timer( - StatsTimer, - fun() -> internal_emit_stats(State) end)}. - return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -805,7 +821,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, routing_key = RoutingKey, nowait = NoWait, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_binding:add/5, + binding_action(fun rabbit_binding:add/2, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); @@ -813,7 +829,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_binding:remove/5, + binding_action(fun rabbit_binding:remove/2, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); @@ -893,7 +909,10 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, + case Fun(#binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = ActualRoutingKey, + args = Arguments}, fun (_X, Q) -> try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) catch exit:Reason -> {error, Reason} diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index d193880555..21c39780a5 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -53,9 +53,7 @@ start_link() -> supervisor2:start_link(?MODULE, []). start_channel(Pid, Args) -> - {ok, ChSupPid, _} = Result = supervisor2:start_child(Pid, [Args]), - link(ChSupPid), - Result. + supervisor2:start_child(Pid, [Args]). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index cca2e3d199..a3b6f369e3 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -209,6 +209,14 @@ action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> Inform("Changing password for user ~p", [Username]), call(Node, {rabbit_access_control, change_password, Args}); +action(set_admin, Node, [Username], _Opts, Inform) -> + Inform("Setting administrative status for user ~p", [Username]), + call(Node, {rabbit_access_control, set_admin, [Username]}); + +action(clear_admin, Node, [Username], _Opts, Inform) -> + Inform("Clearing administrative status for user ~p", [Username]), + call(Node, {rabbit_access_control, clear_admin, [Username]}); + action(list_users, Node, [], _Opts, Inform) -> Inform("Listing users", []), display_list(call(Node, {rabbit_access_control, list_users, []})); @@ -246,14 +254,14 @@ action(list_exchanges, Node, Args, Opts, Inform) -> [VHostArg, ArgAtoms]), ArgAtoms); -action(list_bindings, Node, _Args, Opts, Inform) -> +action(list_bindings, Node, Args, Opts, Inform) -> Inform("Listing bindings", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - InfoKeys = [exchange_name, queue_name, routing_key, args], - display_info_list( - [lists:zip(InfoKeys, tuple_to_list(X)) || - X <- rpc_call(Node, rabbit_binding, list, [VHostArg])], - InfoKeys); + ArgAtoms = default_if_empty(Args, [exchange_name, queue_name, + routing_key, arguments]), + display_info_list(rpc_call(Node, rabbit_binding, info_all, + [VHostArg, ArgAtoms]), + ArgAtoms); action(list_connections, Node, Args, _Opts, Inform) -> Inform("Listing connections", []), @@ -304,9 +312,11 @@ default_if_empty(List, Default) when is_list(List) -> end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) || - X <- InfoItemKeys]) - end, Results), + lists:foreach( + fun (Result) -> display_row( + [format_info_item(proplists:get_value(X, Result)) || + X <- InfoItemKeys]) + end, Results), ok; display_info_list(Other, _) -> Other. @@ -315,25 +325,30 @@ display_row(Row) -> io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), io:nl(). -format_info_item(Key, Items) -> - case proplists:get_value(Key, Items) of - #resource{name = Name} -> - escape(Name); - Value when Key =:= address; Key =:= peer_address andalso - is_tuple(Value) -> - inet_parse:ntoa(Value); - Value when is_pid(Value) -> - rabbit_misc:pid_to_string(Value); - Value when is_binary(Value) -> - escape(Value); - Value when is_atom(Value) -> - escape(atom_to_list(Value)); - Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _] - when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> - io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); - Value -> - io_lib:format("~w", [Value]) - end. +-define(IS_U8(X), (X >= 0 andalso X =< 255)). +-define(IS_U16(X), (X >= 0 andalso X =< 65535)). + +format_info_item(#resource{name = Name}) -> + escape(Name); +format_info_item({N1, N2, N3, N4} = Value) when + ?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) -> + inet_parse:ntoa(Value); +format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value) when + ?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4), + ?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) -> + inet_parse:ntoa(Value); +format_info_item(Value) when is_pid(Value) -> + rabbit_misc:pid_to_string(Value); +format_info_item(Value) when is_binary(Value) -> + escape(Value); +format_info_item(Value) when is_atom(Value) -> + escape(atom_to_list(Value)); +format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = + Value) when is_binary(TableEntryKey) andalso + is_atom(TableEntryType) -> + io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); +format_info_item(Value) -> + io_lib:format("~w", [Value]). display_list(L) when is_list(L) -> lists:foreach(fun (I) when is_binary(I) -> diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl index 51bd6b1f93..a9806305ef 100644 --- a/src/rabbit_dialyzer.erl +++ b/src/rabbit_dialyzer.erl @@ -61,26 +61,27 @@ add_to_plt(PltPath, FilesString) -> {init_plt, PltPath}, {output_plt, PltPath}, {files, Files}]), - print_warnings(DialyzerWarnings), + print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1), ok. dialyze_files(PltPath, ModifiedFiles) -> Files = string:tokens(ModifiedFiles, " "), DialyzerWarnings = dialyzer:run([{init_plt, PltPath}, - {files, Files}]), + {files, Files}, + {warnings, [behaviours, + race_conditions]}]), case DialyzerWarnings of - [] -> io:format("~nOk~n"), - ok; - _ -> io:format("~nFAILED with the following warnings:~n"), - print_warnings(DialyzerWarnings), - fail - end. - -print_warnings(Warnings) -> - [io:format("~s", [dialyzer:format_warning(W)]) || W <- Warnings], - io:format("~n"), + [] -> io:format("~nOk~n"); + _ -> io:format("~n~nFAILED with the following ~p warnings:~n~n", + [length(DialyzerWarnings)]), + print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1) + end, ok. +print_warnings(Warnings, FormatFun) -> + [io:format("~s~n", [FormatFun(W)]) || W <- Warnings], + io:format("~n"). + otp_apps_dependencies_paths() -> [code:lib_dir(App, ebin) || App <- [kernel, stdlib, sasl, mnesia, os_mon, ssl, eunit, tools]]. diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 0f00537a6a..2b23653112 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -34,9 +34,9 @@ -include("rabbit.hrl"). -export([start_link/0]). --export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]). --export([ensure_stats_timer_after/2, reset_stats_timer_after/1]). --export([stats_level/1]). +-export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]). +-export([reset_stats_timer/1]). +-export([stats_level/1, if_enabled/2]). -export([notify/2]). %%---------------------------------------------------------------------------- @@ -71,11 +71,11 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(init_stats_timer/0 :: () -> state()). --spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()). --spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()). --spec(ensure_stats_timer_after/2 :: (state(), timer_fun()) -> state()). --spec(reset_stats_timer_after/1 :: (state()) -> state()). +-spec(ensure_stats_timer/2 :: (state(), timer_fun()) -> state()). +-spec(stop_stats_timer/1 :: (state()) -> state()). +-spec(reset_stats_timer/1 :: (state()) -> state()). -spec(stats_level/1 :: (state()) -> level()). +-spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok'). -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). -endif. @@ -85,44 +85,61 @@ start_link() -> gen_event:start_link({local, ?MODULE}). +%% The idea is, for each stat-emitting object: +%% +%% On startup: +%% Timer = init_stats_timer() +%% notify(created event) +%% if_enabled(internal_emit_stats) - so we immediately send something +%% +%% On wakeup: +%% ensure_stats_timer(Timer, emit_stats) +%% (Note we can't emit stats immediately, the timer may have fired 1ms ago.) +%% +%% emit_stats: +%% if_enabled(internal_emit_stats) +%% reset_stats_timer(Timer) - just bookkeeping +%% +%% Pre-hibernation: +%% if_enabled(internal_emit_stats) +%% stop_stats_timer(Timer) +%% +%% internal_emit_stats: +%% notify(stats) + init_stats_timer() -> {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), #state{level = StatsLevel, timer = undefined}. -ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) -> +ensure_stats_timer(State = #state{level = none}, _Fun) -> State; -ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) -> - NowFun(), - {ok, TRef} = timer:apply_interval(?STATS_INTERVAL, - erlang, apply, [TimerFun, []]), +ensure_stats_timer(State = #state{timer = undefined}, Fun) -> + {ok, TRef} = timer:apply_after(?STATS_INTERVAL, + erlang, apply, [Fun, []]), State#state{timer = TRef}; -ensure_stats_timer(State, _NowFun, _TimerFun) -> +ensure_stats_timer(State, _Fun) -> State. -stop_stats_timer(State = #state{level = none}, _NowFun) -> +stop_stats_timer(State = #state{level = none}) -> State; -stop_stats_timer(State = #state{timer = undefined}, _NowFun) -> +stop_stats_timer(State = #state{timer = undefined}) -> State; -stop_stats_timer(State = #state{timer = TRef}, NowFun) -> +stop_stats_timer(State = #state{timer = TRef}) -> {ok, cancel} = timer:cancel(TRef), - NowFun(), State#state{timer = undefined}. -ensure_stats_timer_after(State = #state{level = none}, _TimerFun) -> - State; -ensure_stats_timer_after(State = #state{timer = undefined}, TimerFun) -> - {ok, TRef} = timer:apply_after(?STATS_INTERVAL, - erlang, apply, [TimerFun, []]), - State#state{timer = TRef}; -ensure_stats_timer_after(State, _TimerFun) -> - State. - -reset_stats_timer_after(State) -> +reset_stats_timer(State) -> State#state{timer = undefined}. stats_level(#state{level = Level}) -> Level. +if_enabled(#state{level = none}, _Fun) -> + ok; +if_enabled(_State, Fun) -> + Fun(), + ok. + notify(Type, Props) -> try %% TODO: switch to os:timestamp() when we drop support for diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 40bee25f8b..2a19d5b1c8 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -92,15 +92,15 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]). recover() -> - Exs = rabbit_misc:table_fold( - fun (Exchange, Acc) -> - ok = mnesia:write(rabbit_exchange, Exchange, write), - [Exchange | Acc] - end, [], rabbit_durable_exchange), + Xs = rabbit_misc:table_fold( + fun (X, Acc) -> + ok = mnesia:write(rabbit_exchange, X, write), + [X | Acc] + end, [], rabbit_durable_exchange), Bs = rabbit_binding:recover(), recover_with_bindings( lists:keysort(#binding.exchange_name, Bs), - lists:keysort(#exchange.name, Exs), []). + lists:keysort(#exchange.name, Xs), []). recover_with_bindings([B = #binding{exchange_name = Name} | Rest], Xs = [#exchange{name = Name} | _], @@ -112,30 +112,30 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> recover_with_bindings([], [], []) -> ok. -declare(ExchangeName, Type, Durable, AutoDelete, Args) -> - Exchange = #exchange{name = ExchangeName, - type = Type, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args}, +declare(XName, Type, Durable, AutoDelete, Args) -> + X = #exchange{name = XName, + type = Type, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args}, %% We want to upset things if it isn't ok; this is different from %% the other hooks invocations, where we tend to ignore the return %% value. TypeModule = type_to_module(Type), - ok = TypeModule:validate(Exchange), + ok = TypeModule:validate(X), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({rabbit_exchange, ExchangeName}) of + case mnesia:wread({rabbit_exchange, XName}) of [] -> - ok = mnesia:write(rabbit_exchange, Exchange, write), + ok = mnesia:write(rabbit_exchange, X, write), ok = case Durable of true -> mnesia:write(rabbit_durable_exchange, - Exchange, write); + X, write); false -> ok end, - {new, Exchange}; + {new, X}; [ExistingX] -> {existing, ExistingX} end @@ -257,20 +257,20 @@ publish(X = #exchange{type = Type}, Seen, Delivery) -> R end. -call_with_exchange(Exchange, Fun) -> +call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:read({rabbit_exchange, Exchange}) of + fun () -> case mnesia:read({rabbit_exchange, XName}) of [] -> {error, not_found}; [X] -> Fun(X) end end). -delete(ExchangeName, IfUnused) -> +delete(XName, IfUnused) -> Fun = case IfUnused of true -> fun conditional_delete/1; false -> fun unconditional_delete/1 end, - case call_with_exchange(ExchangeName, Fun) of + case call_with_exchange(XName, Fun) of {deleted, X = #exchange{type = Type}, Bs} -> (type_to_module(Type)):delete(X, Bs), ok; @@ -280,21 +280,21 @@ delete(ExchangeName, IfUnused) -> maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; -maybe_auto_delete(#exchange{auto_delete = true} = Exchange) -> - case conditional_delete(Exchange) of - {error, in_use} -> not_deleted; - {deleted, Exchange, []} -> auto_deleted +maybe_auto_delete(#exchange{auto_delete = true} = X) -> + case conditional_delete(X) of + {error, in_use} -> not_deleted; + {deleted, X, []} -> auto_deleted end. -conditional_delete(Exchange = #exchange{name = ExchangeName}) -> - case rabbit_binding:has_for_exchange(ExchangeName) of - false -> unconditional_delete(Exchange); +conditional_delete(X = #exchange{name = XName}) -> + case rabbit_binding:has_for_exchange(XName) of + false -> unconditional_delete(X); true -> {error, in_use} end. -unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> - Bindings = rabbit_binding:remove_for_exchange(ExchangeName), - ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}), - rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]), - {deleted, Exchange, Bindings}. +unconditional_delete(X = #exchange{name = XName}) -> + Bindings = rabbit_binding:remove_for_exchange(XName), + ok = mnesia:delete({rabbit_durable_exchange, XName}), + ok = mnesia:delete({rabbit_exchange, XName}), + rabbit_event:notify(exchange_deleted, [{name, XName}]), + {deleted, X, Bindings}. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 02e829ecce..0a59a175cd 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -80,7 +80,7 @@ parse_x_match(Other) -> %% Horrendous matching algorithm. Depends for its merge-like %% (linear-time) behaviour on the lists:keysort %% (rabbit_misc:sort_field_table) that publish/1 and -%% rabbit_binding:{add,remove}/5 do. +%% rabbit_binding:{add,remove}/2 do. %% %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! %% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index da7078f1ba..c323d7cef0 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2]). + handle_info/2, prioritise_call/3]). -export([start_link/2]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1]). @@ -107,7 +107,7 @@ get_limit(undefined) -> get_limit(Pid) -> rabbit_misc:with_exit_handler( fun () -> 0 end, - fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). + fun () -> gen_server2:call(Pid, get_limit, infinity) end). block(undefined) -> ok; @@ -126,6 +126,9 @@ unblock(LimiterPid) -> init([ChPid, UnackedMsgCount]) -> {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. +prioritise_call(get_limit, _From, _State) -> 9; +prioritise_call(_Msg, _From, _State) -> 0. + handle_call({can_send, _QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> {reply, false, State}; diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl deleted file mode 100644 index e0457b1e43..0000000000 --- a/src/rabbit_load.erl +++ /dev/null @@ -1,78 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_load). - --export([local_load/0, remote_loads/0, pick/0]). - --define(FUDGE_FACTOR, 0.98). --define(TIMEOUT, 100). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(load() :: {{non_neg_integer(), integer() | 'unknown'}, node()}). --spec(local_load/0 :: () -> load()). --spec(remote_loads/0 :: () -> [load()]). --spec(pick/0 :: () -> node()). - --endif. - -%%---------------------------------------------------------------------------- - -local_load() -> - LoadAvg = case whereis(cpu_sup) of - undefined -> unknown; - _ -> case cpu_sup:avg1() of - L when is_integer(L) -> L; - {error, timeout} -> unknown - end - end, - {{statistics(run_queue), LoadAvg}, node()}. - -remote_loads() -> - {ResL, _BadNodes} = - rpc:multicall(nodes(), ?MODULE, local_load, [], ?TIMEOUT), - ResL. - -pick() -> - RemoteLoads = remote_loads(), - {{RunQ, LoadAvg}, Node} = local_load(), - %% add bias towards current node; we rely on Erlang's term order - %% of SomeFloat < local_unknown < unknown. - AdjustedLoadAvg = case LoadAvg of - unknown -> local_unknown; - _ -> LoadAvg * ?FUDGE_FACTOR - end, - Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads], - {_, SelectedNode} = lists:min(Loads), - SelectedNode. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index a9c7db76f0..bbecbfe211 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -40,7 +40,7 @@ -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). %%---------------------------------------------------------------------------- @@ -98,8 +98,7 @@ }). -record(file_summary, - {file, valid_total_size, contiguous_top, left, right, file_size, - locked, readers}). + {file, valid_total_size, left, right, file_size, locked, readers}). %%---------------------------------------------------------------------------- @@ -159,8 +158,7 @@ %% {Guid, RefCount, File, Offset, TotalSize} %% By default, it's in ets, but it's also pluggable. %% FileSummary: this is an ets table which maps File to #file_summary{}: -%% {File, ValidTotalSize, ContiguousTop, Left, Right, -%% FileSize, Locked, Readers} +%% {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers} %% %% The basic idea is that messages are appended to the current file up %% until that file becomes too big (> file_size_limit). At that point, @@ -176,9 +174,7 @@ %% %% As messages are removed from files, holes appear in these %% files. The field ValidTotalSize contains the total amount of useful -%% data left in the file, whilst ContiguousTop contains the amount of -%% valid data right at the start of each file. These are needed for -%% garbage collection. +%% data left in the file. This is needed for garbage collection. %% %% When we discover that a file is now empty, we delete it. When we %% discover that it can be combined with the useful data in either its @@ -224,9 +220,7 @@ %% above B (i.e. truncate to the limit of the good contiguous region %% at the start of the file), then write C and D on top and then write %% E, F and G from the right file on top. Thus contiguous blocks of -%% good data at the bottom of files are not rewritten (yes, this is -%% the data the size of which is tracked by the ContiguousTop -%% variable. Judicious use of a mirror is required). +%% good data at the bottom of files are not rewritten. %% %% +-------+ +-------+ +-------+ %% | X | | G | | G | @@ -328,8 +322,8 @@ read(Server, Guid, %% 2. Check the cur file cache case ets:lookup(CurFileCacheEts, Guid) of [] -> - Defer = fun() -> {gen_server2:pcall( - Server, 2, {read, Guid}, infinity), + Defer = fun() -> {gen_server2:call( + Server, {read, Guid}, infinity), CState} end, case index_lookup(Guid, CState) of not_found -> Defer(); @@ -351,18 +345,18 @@ remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). release(_Server, []) -> ok; release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). -sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal +sync(Server) -> gen_server2:cast(Server, sync). %% internal gc_done(Server, Reclaimed, Source, Destination) -> - gen_server2:pcast(Server, 8, {gc_done, Reclaimed, Source, Destination}). + gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}). set_maximum_since_use(Server, Age) -> - gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}). + gen_server2:cast(Server, {set_maximum_since_use, Age}). client_init(Server, Ref) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:pcall(Server, 7, {new_client_state, Ref}, infinity), + gen_server2:call(Server, {new_client_state, Ref}, infinity), #client_msstate { file_handle_cache = dict:new(), index_state = IState, index_module = IModule, @@ -382,7 +376,7 @@ client_delete_and_terminate(CState, Server, Ref) -> ok = gen_server2:cast(Server, {client_delete, Ref}). successfully_recovered_state(Server) -> - gen_server2:pcall(Server, 7, successfully_recovered_state, infinity). + gen_server2:call(Server, successfully_recovered_state, infinity). %%---------------------------------------------------------------------------- %% Client-side-only helpers @@ -581,6 +575,22 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_call(Msg, _From, _State) -> + case Msg of + {new_client_state, _Ref} -> 7; + successfully_recovered_state -> 7; + {read, _Guid} -> 2; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + sync -> 8; + {gc_done, _Reclaimed, _Source, _Destination} -> 8; + {set_maximum_since_use, _Age} -> 8; + _ -> 0 + end. + handle_call({read, Guid}, From, State) -> State1 = read_message(Guid, From, State), noreply(State1); @@ -628,20 +638,14 @@ handle_cast({write, Guid}, offset = CurOffset, total_size = TotalSize }, State), [#file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, right = undefined, locked = false, file_size = FileSize }] = ets:lookup(FileSummaryEts, CurFile), ValidTotalSize1 = ValidTotalSize + TotalSize, - ContiguousTop1 = case CurOffset =:= ContiguousTop of - true -> ValidTotalSize1; - false -> ContiguousTop - end, true = ets:update_element( FileSummaryEts, CurFile, [{#file_summary.valid_total_size, ValidTotalSize1}, - {#file_summary.contiguous_top, ContiguousTop1}, {#file_summary.file_size, FileSize + TotalSize}]), NextOffset = CurOffset + TotalSize, noreply( @@ -902,8 +906,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts }) -> #msg_location { ref_count = RefCount, file = File, - offset = Offset, total_size = TotalSize } = - index_lookup(Guid, State), + total_size = TotalSize } = index_lookup(Guid, State), case RefCount of 1 -> %% don't remove from CUR_FILE_CACHE_ETS_NAME here because @@ -911,7 +914,6 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, %% msg. ok = remove_cache_entry(DedupCacheEts, Guid), [#file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, locked = Locked }] = ets:lookup(FileSummaryEts, File), case Locked of @@ -919,12 +921,11 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, add_to_pending_gc_completion({remove, Guid}, State); false -> ok = index_delete(Guid, State), - ContiguousTop1 = lists:min([ContiguousTop, Offset]), ValidTotalSize1 = ValidTotalSize - TotalSize, - true = ets:update_element( - FileSummaryEts, File, - [{#file_summary.valid_total_size, ValidTotalSize1}, - {#file_summary.contiguous_top, ContiguousTop1}]), + true = + ets:update_element( + FileSummaryEts, File, + [{#file_summary.valid_total_size, ValidTotalSize1}]), State1 = delete_file_if_empty(File, State), State1 #msstate { sum_valid_data = SumValid - TotalSize } end; @@ -1271,16 +1272,17 @@ scan_file_for_valid_messages(Dir, FileName) -> %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages %% produces. The list of msgs that is produced is youngest first. -find_contiguous_block_prefix(L) -> find_contiguous_block_prefix(L, 0, []). +drop_contiguous_block_prefix(L) -> drop_contiguous_block_prefix(L, 0). -find_contiguous_block_prefix([], ExpectedOffset, Guids) -> - {ExpectedOffset, Guids}; -find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail], - ExpectedOffset, Guids) -> +drop_contiguous_block_prefix([], ExpectedOffset) -> + {ExpectedOffset, []}; +drop_contiguous_block_prefix([#msg_location { offset = ExpectedOffset, + total_size = TotalSize } | Tail], + ExpectedOffset) -> ExpectedOffset1 = ExpectedOffset + TotalSize, - find_contiguous_block_prefix(Tail, ExpectedOffset1, [Guid | Guids]); -find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) -> - {ExpectedOffset, Guids}. + drop_contiguous_block_prefix(Tail, ExpectedOffset1); +drop_contiguous_block_prefix(MsgsAfterGap, ExpectedOffset) -> + {ExpectedOffset, MsgsAfterGap}. build_index(true, _StartupFunState, State = #msstate { file_summary_ets = FileSummaryEts }) -> @@ -1356,9 +1358,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, {VMAcc, VTSAcc} end end, {[], 0}, Messages), - %% foldl reverses lists, find_contiguous_block_prefix needs - %% msgs eldest first, so, ValidMessages is the right way round - {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages), {Right, FileSize1} = case Files of %% if it's the last file, we'll truncate to remove any @@ -1375,7 +1374,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, ok = gatherer:in(Gatherer, #file_summary { file = File, valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, left = Left, right = Right, file_size = FileSize1, @@ -1403,7 +1401,6 @@ maybe_roll_to_new_file( true = ets:insert_new(FileSummaryEts, #file_summary { file = NextFile, valid_total_size = 0, - contiguous_top = 0, left = CurFile, right = undefined, file_size = 0, @@ -1530,7 +1527,6 @@ gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> true = ets:update_element( FileSummaryEts, DstFile, [{#file_summary.valid_total_size, TotalValidData}, - {#file_summary.contiguous_top, TotalValidData}, {#file_summary.file_size, TotalValidData}]), SrcFileSize + DstFileSize - TotalValidData; false -> concurrent_readers @@ -1541,7 +1537,6 @@ combine_files(#file_summary { file = Source, left = Destination }, #file_summary { file = Destination, valid_total_size = DestinationValid, - contiguous_top = DestinationContiguousTop, right = Source }, State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> SourceName = filenum_to_name(Source), @@ -1557,41 +1552,32 @@ combine_files(#file_summary { file = Source, %% the DestinationContiguousTop to a tmp file then truncate, %% copy back in, and then copy over from Source %% otherwise we just truncate straight away and copy over from Source - case DestinationContiguousTop =:= DestinationValid of - true -> - ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize); - false -> - {DestinationWorkList, DestinationValid} = - find_unremoved_messages_in_file(Destination, State), - Worklist = - lists:dropwhile( - fun (#msg_location { offset = Offset }) - when Offset =/= DestinationContiguousTop -> - %% it cannot be that Offset =:= - %% DestinationContiguousTop because if it - %% was then DestinationContiguousTop would - %% have been extended by TotalSize - Offset < DestinationContiguousTop - end, DestinationWorkList), - Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ok = copy_messages( - Worklist, DestinationContiguousTop, DestinationValid, - DestinationHdl, TmpHdl, Destination, State), - TmpSize = DestinationValid - DestinationContiguousTop, - %% so now Tmp contains everything we need to salvage from - %% Destination, and index_state has been updated to - %% reflect the compaction of Destination so truncate - %% Destination and copy from Tmp back to the end - {ok, 0} = file_handle_cache:position(TmpHdl, 0), - ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize), - {ok, TmpSize} = - file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), - %% position in DestinationHdl should now be DestinationValid - ok = file_handle_cache:sync(DestinationHdl), - ok = file_handle_cache:delete(TmpHdl) + {DestinationWorkList, DestinationValid} = + find_unremoved_messages_in_file(Destination, State), + {DestinationContiguousTop, DestinationWorkListTail} = + drop_contiguous_block_prefix(DestinationWorkList), + case DestinationWorkListTail of + [] -> ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize); + _ -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE), + ok = copy_messages( + DestinationWorkListTail, DestinationContiguousTop, + DestinationValid, DestinationHdl, TmpHdl, Destination, + State), + TmpSize = DestinationValid - DestinationContiguousTop, + %% so now Tmp contains everything we need to salvage + %% from Destination, and index_state has been updated to + %% reflect the compaction of Destination so truncate + %% Destination and copy from Tmp back to the end + {ok, 0} = file_handle_cache:position(TmpHdl, 0), + ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize), + {ok, TmpSize} = + file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), + %% position in DestinationHdl should now be DestinationValid + ok = file_handle_cache:sync(DestinationHdl), + ok = file_handle_cache:delete(TmpHdl) end, {SourceWorkList, SourceValid} = find_unremoved_messages_in_file(Source, State), diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index c7948b7eb3..a7855bbf79 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -38,7 +38,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_cast/2]). -record(gcstate, {dir, @@ -81,7 +81,7 @@ stop(Server) -> gen_server2:call(Server, stop, infinity). set_maximum_since_use(Pid, Age) -> - gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}). + gen_server2:cast(Pid, {set_maximum_since_use, Age}). %%---------------------------------------------------------------------------- @@ -97,6 +97,9 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; +prioritise_cast(_Msg, _State) -> 0. + handle_call(stop, _From, State) -> {stop, normal, ok, State}. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index c7a5a60027..5cfd6a5ca1 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -310,8 +310,8 @@ kill_wait(Pid, TimeLeft, Forceful) -> is_dead(Pid) -> PidS = integer_to_list(Pid), with_os([{unix, fun () -> - Res = os:cmd("ps --no-headers --pid " ++ PidS), - Res == "" + system("kill -0 " ++ PidS + ++ " >/dev/null 2>&1") /= 0 end}, {win32, fun () -> Res = os:cmd("tasklist /nh /fi \"pid eq " ++ @@ -322,6 +322,16 @@ is_dead(Pid) -> end end}]). +% Like system(3) +system(Cmd) -> + ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", + Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), + receive {Port, {exit_status, Status}} -> Status end. + +% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" +escape_quotes(Cmd) -> + lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). + call_all_nodes(Func) -> case read_pids_file() of [] -> throw(no_nodes_running); diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 6baa4b8864..2286896b7b 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -46,7 +46,7 @@ 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' | 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend'). -type(error() :: rabbit_types:error(any())). --type(socket() :: rabbit_networking:ip_port() | rabbit_types:ssl_socket()). +-type(socket() :: port() | #ssl_socket{}). -spec(async_recv/3 :: (socket(), integer(), timeout()) -> rabbit_types:ok(any())). @@ -72,72 +72,58 @@ %%--------------------------------------------------------------------------- +-define(IS_SSL(Sock), is_record(Sock, ssl_socket)). -async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) -> +async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) -> Pid = self(), Ref = make_ref(), spawn(fun () -> Pid ! {inet_async, Sock, Ref, - ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} - end), + ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} + end), {ok, Ref}; - async_recv(Sock, Length, infinity) when is_port(Sock) -> prim_inet:async_recv(Sock, Length, -1); - async_recv(Sock, Length, Timeout) when is_port(Sock) -> prim_inet:async_recv(Sock, Length, Timeout). -close(Sock) when is_record(Sock, ssl_socket) -> +close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl); - close(Sock) when is_port(Sock) -> gen_tcp:close(Sock). - -controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) -> +controlling_process(Sock, Pid) when ?IS_SSL(Sock) -> ssl:controlling_process(Sock#ssl_socket.ssl, Pid); - controlling_process(Sock, Pid) when is_port(Sock) -> gen_tcp:controlling_process(Sock, Pid). - -getstat(Sock, Stats) when is_record(Sock, ssl_socket) -> +getstat(Sock, Stats) when ?IS_SSL(Sock) -> inet:getstat(Sock#ssl_socket.tcp, Stats); - getstat(Sock, Stats) when is_port(Sock) -> inet:getstat(Sock, Stats). - -peername(Sock) when is_record(Sock, ssl_socket) -> +peername(Sock) when ?IS_SSL(Sock) -> ssl:peername(Sock#ssl_socket.ssl); - peername(Sock) when is_port(Sock) -> inet:peername(Sock). - -port_command(Sock, Data) when is_record(Sock, ssl_socket) -> +port_command(Sock, Data) when ?IS_SSL(Sock) -> case ssl:send(Sock#ssl_socket.ssl, Data) of - ok -> - self() ! {inet_reply, Sock, ok}, - true; - {error, Reason} -> - erlang:error(Reason) + ok -> self() ! {inet_reply, Sock, ok}, + true; + {error, Reason} -> erlang:error(Reason) end; - port_command(Sock, Data) when is_port(Sock) -> erlang:port_command(Sock, Data). -send(Sock, Data) when is_record(Sock, ssl_socket) -> +send(Sock, Data) when ?IS_SSL(Sock) -> ssl:send(Sock#ssl_socket.ssl, Data); - send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data). -sockname(Sock) when is_record(Sock, ssl_socket) -> +sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl); - sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 08272afed4..6dbd54d2bc 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -46,6 +46,8 @@ -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). +-include_lib("ssl/src/ssl_record.hrl"). + -define(RABBIT_TCP_OPTS, [ binary, @@ -108,6 +110,8 @@ boot_ssl() -> {ok, SslListeners} -> ok = rabbit_misc:start_applications([crypto, public_key, ssl]), {ok, SslOptsConfig} = application:get_env(ssl_options), + % unknown_ca errors are silently ignored prior to R14B unless we + % supply this verify_fun - remove when at least R14B is required SslOpts = case proplists:get_value(verify, SslOptsConfig, verify_none) of verify_none -> SslOptsConfig; @@ -116,7 +120,26 @@ boot_ssl() -> end} | SslOptsConfig] end, - [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners], + % In R13B04 and R14A (at least), rc4 is incorrectly implemented. + CipherSuites = proplists:get_value(ciphers, + SslOpts, + ssl:cipher_suites()), + FilteredCipherSuites = + [C || C <- CipherSuites, + begin + SuiteCode = + if is_tuple(C) -> ssl_cipher:suite(C); + is_list(C) -> ssl_cipher:openssl_suite(C) + end, + SP = ssl_cipher:security_parameters( + SuiteCode, + #security_parameters{}), + SP#security_parameters.bulk_cipher_algorithm =/= ?RC4 + end], + SslOpts1 = [{ciphers, FilteredCipherSuites} + | [{K, V} || {K, V} <- SslOpts, K =/= ciphers]], + [start_ssl_listener(Host, Port, SslOpts1) + || {Host, Port} <- SslListeners], ok end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a21961b5e0..745e008349 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -339,7 +339,7 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> throw(E); {channel_exit, ChannelOrFrPid, Reason} -> mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State)); - {'EXIT', ChSupPid, Reason} -> + {'DOWN', _MRef, process, ChSupPid, Reason} -> mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State)); terminate_connection -> State; @@ -369,10 +369,8 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> end), mainloop(Deb, State); {'$gen_cast', emit_stats} -> - internal_emit_stats(State), - mainloop(Deb, State#v1{stats_timer = - rabbit_event:reset_stats_timer_after( - State#v1.stats_timer)}); + State1 = internal_emit_stats(State), + mainloop(Deb, State1); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); @@ -489,7 +487,7 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive - {'EXIT', ChSupPid, Reason} -> + {'DOWN', _MRef, process, ChSupPid, Reason} -> case channel_cleanup(ChSupPid) of undefined -> exit({abnormal_dependent_exit, ChSupPid, Reason}); @@ -690,11 +688,14 @@ refuse_connection(Sock, Exception) -> ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), throw(Exception). -ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) -> +ensure_stats_timer(State = #v1{stats_timer = StatsTimer, + connection_state = running}) -> Self = self(), - State#v1{stats_timer = rabbit_event:ensure_stats_timer_after( + State#v1{stats_timer = rabbit_event:ensure_stats_timer( StatsTimer, - fun() -> emit_stats(Self) end)}. + fun() -> emit_stats(Self) end)}; +ensure_stats_timer(State) -> + State. %%-------------------------------------------------------------------------- @@ -765,7 +766,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = Connection = #connection{ user = User, protocol = Protocol}, - sock = Sock}) -> + sock = Sock, + stats_timer = StatsTimer}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), @@ -775,6 +777,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = NewConnection}), rabbit_event:notify(connection_created, infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(StatsTimer, + fun() -> internal_emit_stats(State1) end), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), @@ -867,6 +871,7 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> rabbit_channel_sup_sup:start_channel( ChanSupSup, {Protocol, Sock, Channel, FrameMax, self(), Username, VHost, Collector}), + erlang:monitor(process, ChSupPid), put({channel, Channel}, {ch_fr_pid, ChFrPid}), put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), put({ch_fr_pid, ChFrPid}, {channel, Channel}), @@ -937,5 +942,6 @@ amqp_exception_explanation(Text, Expl) -> true -> CompleteTextBin end. -internal_emit_stats(State) -> - rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)). +internal_emit_stats(State = #v1{stats_timer = StatsTimer}) -> + rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), + State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index d339642433..931dc6341a 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -93,10 +93,10 @@ deliver_by_queue_names(Qs, Delivery) -> %% TODO: This causes a full scan for each entry with the same exchange match_bindings(Name, Match) -> Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = ExchangeName, + exchange_name = XName, queue_name = QName}} <- mnesia:table(rabbit_route), - ExchangeName == Name, + XName == Name, Match(Binding)]), lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 495976a62c..fb0faebbf3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1042,6 +1042,8 @@ test_user_management() -> {error, {user_already_exists, _}} = control_action(add_user, ["foo", "bar"]), ok = control_action(change_password, ["foo", "baz"]), + ok = control_action(set_admin, ["foo"]), + ok = control_action(clear_admin, ["foo"]), ok = control_action(list_users, []), %% vhost creation @@ -1107,7 +1109,15 @@ test_server_status() -> ok = info_action(list_exchanges, rabbit_exchange:info_keys(), true), %% list bindings - ok = control_action(list_bindings, []), + ok = info_action(list_bindings, rabbit_binding:info_keys(), true), + %% misc binding listing APIs + [_|_] = rabbit_binding:list_for_exchange( + rabbit_misc:r(<<"/">>, exchange, <<"">>)), + [_] = rabbit_binding:list_for_queue( + rabbit_misc:r(<<"/">>, queue, <<"foo">>)), + [_] = rabbit_binding:list_for_exchange_and_queue( + rabbit_misc:r(<<"/">>, exchange, <<"">>), + rabbit_misc:r(<<"/">>, queue, <<"foo">>)), %% list connections [#listener{host = H, port = P} | _] = diff --git a/src/rabbit_tracer.erl b/src/rabbit_tracer.erl deleted file mode 100644 index 484249b1df..0000000000 --- a/src/rabbit_tracer.erl +++ /dev/null @@ -1,50 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_tracer). --export([start/0]). - --import(erlang). - -start() -> - spawn(fun mainloop/0), - ok. - -mainloop() -> - erlang:trace(new, true, [all]), - mainloop1(). - -mainloop1() -> - receive - Msg -> - rabbit_log:info("TRACE: ~p~n", [Msg]) - end, - mainloop1(). diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 9dfd33bd87..0b6a15ec83 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -38,7 +38,7 @@ -export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0, delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, vhost/0, ctag/0, - amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0, + amqp_error/0, r/1, r2/2, r3/3, listener/0, binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, connection_exit/0]). @@ -107,8 +107,6 @@ kind :: Kind, name :: Name}). --type(ssl_socket() :: #ssl_socket{}). - -type(listener() :: #listener{node :: node(), protocol :: atom(), @@ -142,7 +140,8 @@ -type(user() :: #user{username :: rabbit_access_control:username(), - password :: rabbit_access_control:password()}). + password :: rabbit_access_control:password(), + is_admin :: boolean()}). -type(ok(A) :: {'ok', A}). -type(error(A) :: {'error', A}). diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 4a1c5832b3..93adfcb1b5 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -34,7 +34,9 @@ %% 4) Added an 'intrinsic' restart type. Like the transient type, this %% type means the child should only be restarted if the child exits %% abnormally. Unlike the transient type, if the child exits -%% normally, the supervisor itself also exits normally. +%% normally, the supervisor itself also exits normally. If the +%% child is a supervisor and it exits normally (i.e. with reason of +%% 'shutdown') then the child's parent also exits normally. %% %% All modifications are (C) 2010 Rabbit Technologies Ltd. %% @@ -545,6 +547,9 @@ do_restart(permanent, Reason, Child, State) -> restart(Child, State); do_restart(intrinsic, normal, Child, State) -> {shutdown, state_del_child(Child, State)}; +do_restart(intrinsic, shutdown, Child = #child{child_type = supervisor}, + State) -> + {shutdown, state_del_child(Child, State)}; do_restart(_, normal, Child, State) -> NState = state_del_child(Child, State), {ok, NState}; diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index e658f005a3..067fa9f2d8 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -296,6 +296,12 @@ get_total_memory({unix, sunos}) -> Dict = dict:from_list(lists:map(fun parse_line_sunos/1, Lines)), dict:fetch('Memory size', Dict); +get_total_memory({unix, aix}) -> + File = cmd("/usr/bin/vmstat -v"), + Lines = string:tokens(File, "\n"), + Dict = dict:from_list(lists:map(fun parse_line_aix/1, Lines)), + dict:fetch('memory pages', Dict) * 4096; + get_total_memory(_OsType) -> unknown. @@ -341,6 +347,17 @@ parse_line_sunos(Line) -> [Name] -> {list_to_atom(Name), none} end. +%% Lines look like " 12345 memory pages" +%% or " 80.1 maxpin percentage" +parse_line_aix(Line) -> + [Value | NameWords] = string:tokens(Line, " "), + Name = string:join(NameWords, " "), + {list_to_atom(Name), + case lists:member($., Value) of + true -> trunc(list_to_float(Value)); + false -> list_to_integer(Value) + end}. + freebsd_sysctl(Def) -> list_to_integer(cmd("/sbin/sysctl -n " ++ Def) -- "\n"). diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 42049d5068..f461a53946 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -38,7 +38,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_cast/2]). %%---------------------------------------------------------------------------- @@ -71,7 +71,7 @@ submit_async(Pid, Fun) -> gen_server2:cast(Pid, {submit_async, Fun}). set_maximum_since_use(Pid, Age) -> - gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}). + gen_server2:cast(Pid, {set_maximum_since_use, Age}). run({M, F, A}) -> apply(M, F, A); @@ -88,6 +88,9 @@ init([WId]) -> {ok, WId, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; +prioritise_cast(_Msg, _State) -> 0. + handle_call({submit, Fun}, From, WId) -> gen_server2:reply(From, run(Fun)), ok = worker_pool:idle(WId), |
