diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 862 | ||||
| -rw-r--r-- | src/rabbit.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 588 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_sup.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 133 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 264 | ||||
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 293 | ||||
| -rw-r--r-- | src/supervisor2.erl | 917 | ||||
| -rw-r--r-- | src/tcp_acceptor.erl | 3 | ||||
| -rw-r--r-- | src/worker_pool_worker.erl | 12 |
11 files changed, 2826 insertions, 298 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl new file mode 100644 index 0000000000..0f648dcd2b --- /dev/null +++ b/src/file_handle_cache.erl @@ -0,0 +1,862 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(file_handle_cache). + +%% A File Handle Cache +%% +%% This extends a subset of the functionality of the Erlang file +%% module. +%% +%% Some constraints +%% 1) This supports one writer, multiple readers per file. Nothing +%% else. +%% 2) Do not open the same file from different processes. Bad things +%% may happen. +%% 3) Writes are all appends. You cannot write to the middle of a +%% file, although you can truncate and then append if you want. 
+%% 4) Although there is a write buffer, there is no read buffer. Feel +%% free to use the read_ahead mode, but beware of the interaction +%% between that buffer and the write buffer. +%% +%% Some benefits +%% 1) You do not have to remember to call sync before close +%% 2) Buffering is much more flexible than with plain file module, and +%% you can control when the buffer gets flushed out. This means that +%% you can rely on reads-after-writes working, without having to call +%% the expensive sync. +%% 3) Unnecessary calls to position and sync get optimised out. +%% 4) You can find out what your 'real' offset is, and what your +%% 'virtual' offset is (i.e. where the hdl really is, and where it +%% would be after the write buffer is written out). +%% 5) You can find out what the offset was when you last sync'd. +%% +%% There is also a server component which serves to limit the number +%% of open file handles in a "soft" way - the server will never +%% prevent a client from opening a handle, but may immediately tell it +%% to close the handle. Thus you can set the limit to zero and it will +%% still all work correctly, it is just that effectively no caching +%% will take place. The operation of limiting is as follows: +%% +%% On open and close, the client sends messages to the server +%% informing it of opens and closes. This allows the server to keep +%% track of the number of open handles. The client also keeps a +%% gb_tree which is updated on every use of a file handle, mapping the +%% time at which the file handle was last used (timestamp) to the +%% handle. Thus the smallest key in this tree maps to the file handle +%% that has not been used for the longest amount of time. This +%% smallest key is included in the messages to the server. As such, +%% the server keeps track of when the least recently used file handle +%% was used *at the point of the most recent open or close* by each +%% client. 
+%% +%% Note that this data can go very out of date, by the client using +%% the least recently used handle. +%% +%% When the limit is reached, the server calculates the average age of +%% the last reported least recently used file handle of all the +%% clients. It then tells all the clients to close any handles not +%% used for longer than this average, by invoking the callback the +%% client registered. The client should receive this message and pass +%% it into set_maximum_since_use/1. However, it is highly possible +%% this age will be greater than the ages of all the handles the +%% client knows of because the client has used its file handles in the +%% mean time. Thus at this point the client reports to the server the +%% current timestamp at which its least recently used file handle was +%% last used. The server will check two seconds later that either it +%% is back under the limit, in which case all is well again, or if +%% not, it will calculate a new average age. Its data will be much +%% more recent now, and so it is very likely that when this is +%% communicated to the clients, the clients will close file handles. +%% +%% The advantage of this scheme is that there is only communication +%% from the client to the server on open, close, and when in the +%% process of trying to reduce file handle usage. There is no +%% communication from the client to the server on normal file handle +%% operations. This scheme forms a feed-back loop - the server does +%% not care which file handles are closed, just that some are, and it +%% checks this repeatedly when over the limit. Given the guarantees of +%% now(), even if there is just one file handle open, a limit of 1, +%% and one client, it is certain that when the client calculates the +%% age of the handle, it will be greater than when the server +%% calculated it, hence it should be closed. 
+%% +%% Handles which are closed as a result of the server are put into a +%% "soft-closed" state in which the handle is closed (data flushed out +%% and sync'd first) but the state is maintained. The handle will be +%% fully reopened again as soon as needed, thus users of this library +%% do not need to worry about their handles being closed by the server +%% - reopening them when necessary is handled transparently. +%% +%% The server also supports obtain and release_on_death. obtain/0 +%% blocks until a file descriptor is available. release_on_death/1 +%% takes a pid and monitors the pid, reducing the count by 1 when the +%% pid dies. Thus the assumption is that obtain/0 is called first, and +%% when that returns, release_on_death/1 is called with the pid who +%% "owns" the file descriptor. This is, for example, used to track the +%% use of file descriptors through network sockets. + +-behaviour(gen_server). + +-export([register_callback/3]). +-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, + last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, + flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). +-export([release_on_death/1, obtain/0]). + +-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). +-define(RESERVED_FOR_OTHERS, 100). +-define(FILE_HANDLES_LIMIT_WINDOWS, 10000000). +-define(FILE_HANDLES_LIMIT_OTHER, 1024). +-define(FILE_HANDLES_CHECK_INTERVAL, 2000). + +%%---------------------------------------------------------------------------- + +-record(file, + { reader_count, + has_writer + }). + +-record(handle, + { hdl, + offset, + trusted_offset, + is_dirty, + write_buffer_size, + write_buffer_size_limit, + write_buffer, + at_eof, + path, + mode, + options, + is_write, + is_read, + last_used_at + }). + +-record(fhc_state, + { elders, + limit, + count, + obtains, + callbacks, + client_mrefs, + timer_ref + }). 
%%----------------------------------------------------------------------------
%% Specs
%%----------------------------------------------------------------------------

-ifdef(use_specs).

%% Shared result and argument types used by the public API below.
-type(ref() :: any()).
-type(error() :: {'error', any()}).
-type(ok_or_error() :: ('ok' | error())).
-type(val_or_error(T) :: ({'ok', T} | error())).
-type(position() :: ('bof' | 'eof' | non_neg_integer() |
                     {('bof' |'eof'), non_neg_integer()} | {'cur', integer()})).
-type(offset() :: non_neg_integer()).

%% Client-side file API.
-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(open/3 ::
        (string(), [any()],
         [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}]) ->
             val_or_error(ref())).
-spec(close/1 :: (ref()) -> ok_or_error()).
-spec(read/2 :: (ref(), non_neg_integer()) ->
             val_or_error([char()] | binary()) | 'eof').
-spec(append/2 :: (ref(), iodata()) -> ok_or_error()).
-spec(sync/1 :: (ref()) -> ok_or_error()).
-spec(position/2 :: (ref(), position()) -> val_or_error(offset())).
-spec(truncate/1 :: (ref()) -> ok_or_error()).
-spec(last_sync_offset/1 :: (ref()) -> val_or_error(offset())).
-spec(current_virtual_offset/1 :: (ref()) -> val_or_error(offset())).
-spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())).
-spec(flush/1 :: (ref()) -> ok_or_error()).
-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
             val_or_error(non_neg_integer())).
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).

%% Descriptor accounting for non-file users (see the module header).
-spec(release_on_death/1 :: (pid()) -> 'ok').
-spec(obtain/0 :: () -> 'ok').

-endif.

%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------

%% Starts the limiting server, locally registered as ?SERVER, with no
%% timeout on its initialisation.
start_link() ->
    ServerName = {local, ?SERVER},
    StartOpts  = [{timeout, infinity}],
    gen_server:start_link(ServerName, ?MODULE, [], StartOpts).
%% Tells the server which {M, F, A} to invoke on behalf of the calling
%% process; maybe_reduce/1 applies it with the average age appended to
%% the argument list.
register_callback(M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
    Callback = {M, F, A},
    gen_server:cast(?SERVER, {register_callback, self(), Callback}).

%% Opens Path and returns {ok, Ref}. Per-path bookkeeping (reader count
%% and writer flag) lives in the process dictionary under
%% {AbsolutePath, fhc_file}; a second writer on the same path is
%% refused with {error, writer_exists}.
open(Path, Mode, Options) ->
    AbsPath = filename:absname(Path),
    FileKey = {AbsPath, fhc_file},
    FileRec = #file { reader_count = Readers, has_writer = WriterPresent } =
        case get(FileKey) of
            undefined           -> #file { reader_count = 0,
                                           has_writer   = false };
            Existing = #file {} -> Existing
        end,
    NormMode   = append_to_write(Mode),
    WantsWrite = is_writer(NormMode),
    case WantsWrite andalso WriterPresent of
        true ->
            {error, writer_exists};
        false ->
            Ref = make_ref(),
            case open1(AbsPath, NormMode, Options, Ref, bof, new) of
                {ok, _Handle} ->
                    Readers1 = case is_reader(NormMode) of
                                   true  -> Readers + 1;
                                   false -> Readers
                               end,
                    WriterPresent1 = WriterPresent orelse WantsWrite,
                    put(FileKey,
                        FileRec #file { reader_count = Readers1,
                                        has_writer   = WriterPresent1 }),
                    {ok, Ref};
                Error ->
                    Error
            end
    end.

%% Fully closes the handle behind Ref. Closing an unknown Ref is a
%% no-op; if the close fails the handle is stored again and the error
%% is returned.
close(Ref) ->
    HandleKey = {Ref, fhc_handle},
    case erase(HandleKey) of
        undefined ->
            ok;
        Handle ->
            case hard_close(Handle) of
                ok ->
                    ok;
                {Error, StillOpen} ->
                    put_handle(Ref, StillOpen),
                    Error
            end
    end.

%% Reads up to Count bytes after flushing the write buffer. The raw
%% offset advances by the amount read; hitting eof marks the handle as
%% being at end of file.
read(Ref, Count) ->
    Reader =
        fun ([#handle { is_read = false }]) ->
                {error, not_open_for_reading};
            ([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
                case file:read(Hdl, Count) of
                    {ok, Data} = Obj ->
                        Advanced = Offset + iolist_size(Data),
                        {Obj, [Handle #handle { offset = Advanced }]};
                    eof ->
                        {eof, [Handle #handle { at_eof = true }]};
                    Error ->
                        {Error, [Handle]}
                end
        end,
    with_flushed_handles([Ref], Reader).
%% Appends Data at the end of the file. With a write buffer limit of 0
%% the data goes straight to the file; otherwise it is prepended to the
%% (reversed) write buffer, which is flushed once its size exceeds a
%% finite limit.
append(Ref, Data) ->
    Appender =
        fun ([#handle { is_write = false }]) ->
                {error, not_open_for_writing};
            ([Handle]) ->
                case maybe_seek(eof, Handle) of
                    {{ok, _}, Direct = #handle { hdl = Hdl,
                                                 offset = Offset,
                                                 write_buffer_size_limit = 0,
                                                 at_eof = true }} ->
                        NewOffset = Offset + iolist_size(Data),
                        WriteRes  = file:write(Hdl, Data),
                        {WriteRes, [Direct #handle { is_dirty = true,
                                                     offset   = NewOffset }]};
                    {{ok, _}, Buffered = #handle {
                                write_buffer            = Buffer,
                                write_buffer_size       = Size,
                                write_buffer_size_limit = Limit,
                                at_eof                  = true }} ->
                        Buffer1 = [Data | Buffer],
                        Size1   = Size + iolist_size(Data),
                        Grown   = Buffered #handle {
                                    write_buffer      = Buffer1,
                                    write_buffer_size = Size1 },
                        case Limit of
                            infinity ->
                                {ok, [Grown]};
                            _ when Size1 > Limit ->
                                {Result, Flushed} = write_buffer(Grown),
                                {Result, [Flushed]};
                            _ ->
                                {ok, [Grown]}
                        end;
                    {{error, _} = Error, Unmoved} ->
                        {Error, [Unmoved]}
                end
        end,
    with_handles([Ref], Appender).

%% Flushes the write buffer and, if the handle is dirty, calls
%% file:sync/1. On success the trusted offset becomes the current raw
%% offset.
sync(Ref) ->
    Syncer =
        fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
                ok;
            ([Handle = #handle { hdl = Hdl, offset = Offset,
                                 is_dirty = true, write_buffer = [] }]) ->
                case file:sync(Hdl) of
                    ok ->
                        Synced = Handle #handle { trusted_offset = Offset,
                                                  is_dirty       = false },
                        {ok, [Synced]};
                    Error ->
                        {Error, [Handle]}
                end
        end,
    with_flushed_handles([Ref], Syncer).

%% Flushes the write buffer, then seeks to NewOffset (the seek itself
%% is skipped when needs_seek/3 says it is redundant).
position(Ref, NewOffset) ->
    Seeker = fun ([Handle]) ->
                     {Result, Moved} = maybe_seek(NewOffset, Handle),
                     {Result, [Moved]}
             end,
    with_flushed_handles([Ref], Seeker).

%% Truncates the file at the current raw offset. The trusted offset is
%% lowered to that offset if it was beyond it, and the handle is marked
%% as being at end of file.
truncate(Ref) ->
    Truncator =
        fun ([Handle = #handle { hdl = Hdl, offset = Offset,
                                 trusted_offset = Trusted }]) ->
                case file:truncate(Hdl) of
                    ok ->
                        Trusted1 = lists:min([Offset, Trusted]),
                        {ok, [Handle #handle { trusted_offset = Trusted1,
                                               at_eof         = true }]};
                    Error ->
                        {Error, [Handle]}
                end
        end,
    with_flushed_handles([Ref], Truncator).

%% Returns {ok, Offset} where Offset is the handle's trusted offset.
last_sync_offset(Ref) ->
    Lookup = fun ([#handle { trusted_offset = Trusted }]) ->
                     {ok, Trusted}
             end,
    with_handles([Ref], Lookup).
%% Returns {ok, Offset} where Offset is where the handle would be once
%% the write buffer has been written out: raw offset plus buffered
%% bytes for a writer at end of file, the raw offset otherwise.
current_virtual_offset(Ref) ->
    with_handles([Ref], fun ([#handle { at_eof = true, is_write = true,
                                        offset = Offset,
                                        write_buffer_size = Size }]) ->
                                {ok, Offset + Size};
                            ([#handle { offset = Offset }]) ->
                                {ok, Offset}
                        end).

%% Returns {ok, Offset} where Offset is the raw offset recorded in the
%% handle, ignoring any buffered writes.
current_raw_offset(Ref) ->
    with_handles([Ref], fun ([Handle]) -> {ok, Handle #handle.offset} end).

%% Writes out the write buffer without syncing.
flush(Ref) ->
    with_flushed_handles([Ref], fun ([Handle]) -> {ok, [Handle]} end).

%% Copies up to Count bytes from Src (must be open for reading) to Dest
%% (must be open for writing) after flushing both handles. Both raw
%% offsets advance by the number of bytes actually copied.
%%
%% The destination is marked dirty after a successful copy: the copy
%% writes to the file behind Dest, and sync/1 only calls file:sync/1
%% for handles whose is_dirty flag is set. Without the flag a sync
%% following a copy would be silently skipped.
copy(Src, Dest, Count) ->
    with_flushed_handles(
      [Src, Dest],
      fun ([SHandle = #handle { is_read  = true, hdl = SHdl, offset = SOffset },
            DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
          ) ->
              case file:copy(SHdl, DHdl, Count) of
                  {ok, Count1} = Result1 ->
                      {Result1,
                       [SHandle #handle { offset = SOffset + Count1 },
                        DHandle #handle { offset   = DOffset + Count1,
                                          is_dirty = true }]};
                  Error ->
                      {Error, [SHandle, DHandle]}
              end;
          (_Handles) ->
              {error, incorrect_handle_modes}
      end).

%% Closes the handle and deletes the file it refers to. Buffered data
%% is discarded and the dirty flag cleared first, so nothing is written
%% or synced to a file that is about to be removed. Deleting an unknown
%% Ref is a no-op.
delete(Ref) ->
    case erase({Ref, fhc_handle}) of
        undefined ->
            ok;
        Handle = #handle { path = Path } ->
            case hard_close(Handle #handle { is_dirty = false,
                                             write_buffer = [] }) of
                ok               -> file:delete(Path);
                {Error, Handle1} -> put_handle(Ref, Handle1),
                                    Error
            end
    end.

%% Empties the file: drops any buffered writes, seeks to the beginning
%% and truncates there. A handle already at offset 0, at end of file
%% and with an empty buffer is left alone.
clear(Ref) ->
    with_handles(
      [Ref],
      fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) ->
              ok;
          ([Handle]) ->
              case maybe_seek(bof, Handle #handle { write_buffer = [],
                                                    write_buffer_size = 0 }) of
                  {{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
                      case file:truncate(Hdl) of
                          ok    -> {ok, [Handle1 #handle {trusted_offset = 0,
                                                          at_eof = true }]};
                          Error -> {Error, [Handle1]}
                      end;
                  {{error, _} = Error, Handle1} ->
                      {Error, [Handle1]}
              end
      end).
%% Soft-closes every open handle of the calling process that has not
%% been used for at least MaximumAge (compared against
%% timer:now_diff/2, i.e. microseconds). If no handle was closed
%% successfully, the server is instead told the current age of the
%% least recently used handle.
set_maximum_since_use(MaximumAge) ->
    Now = now(),
    Visit = fun (Entry, NoneClosed) ->
                    fhc_close_if_stale(Entry, Now, MaximumAge, NoneClosed)
            end,
    case lists:foldl(Visit, true, get()) of
        true  -> age_tree_change(),
                 ok;
        false -> ok
    end.

%% Fold step for set_maximum_since_use/1 over process dictionary
%% entries. Returns false once a handle has been closed successfully,
%% otherwise passes the accumulator through unchanged.
fhc_close_if_stale({{Ref, fhc_handle},
                    Handle = #handle { hdl = Hdl, last_used_at = Then }},
                   Now, MaximumAge, NoneClosed) ->
    Age = timer:now_diff(Now, Then),
    case Hdl /= closed andalso Age >= MaximumAge of
        false ->
            NoneClosed;
        true ->
            case soft_close(Handle) of
                {ok, Closed} ->
                    put({Ref, fhc_handle}, Closed),
                    false;
                {_Error, StillOpen} ->
                    put_handle(Ref, StillOpen),
                    NoneClosed
            end
    end;
fhc_close_if_stale(_OtherEntry, _Now, _MaximumAge, NoneClosed) ->
    NoneClosed.

%% Asks the server to monitor Pid (see the server's handling of
%% release_on_death and 'DOWN').
release_on_death(Pid) when is_pid(Pid) ->
    Request = {release_on_death, Pid},
    gen_server:cast(?SERVER, Request).

%% Synchronous request to the server for one descriptor; waits without
%% a timeout.
obtain() ->
    gen_server:call(?SERVER, obtain, infinity).

%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------

%% True when the mode list contains the atom read.
is_reader(Mode) ->
    lists:member(read, Mode).

%% True when the mode list contains the atom write.
is_writer(Mode) ->
    lists:member(write, Mode).

%% Rewrites a mode list containing append so that it contains write
%% instead (appending is handled by this module, not by the file
%% driver). Other mode lists are returned untouched.
append_to_write(Mode) ->
    case lists:member(append, Mode) of
        false -> Mode;
        true  -> Stripped = Mode -- [append, write],
                 [write | Stripped]
    end.

%% Looks up (reopening where necessary) the handle for each Ref and
%% applies Fun to the list of handles, in the order of Refs. If Fun
%% returns {Result, Handles} with Handles a list, the handles are
%% stored back against Refs and Result is returned; any other return
%% value is passed through as is. A failed lookup is returned without
%% calling Fun.
with_handles(Refs, Fun) ->
    case fhc_fetch_handles(Refs, []) of
        {ok, Handles} ->
            case Fun(Handles) of
                {Result, Updated} when is_list(Updated) ->
                    lists:zipwith(fun put_handle/2, Refs, Updated),
                    Result;
                Other ->
                    Other
            end;
        Error ->
            Error
    end.

%% Collects the handles for Refs in order, stopping at the first
%% lookup that does not yield {ok, Handle}.
fhc_fetch_handles([], Acc) ->
    {ok, lists:reverse(Acc)};
fhc_fetch_handles([Ref | Rest], Acc) ->
    case get_or_reopen(Ref) of
        {ok, Handle} -> fhc_fetch_handles(Rest, [Handle | Acc]);
        Error        -> Error
    end.
%% Like with_handles/2, but every handle has its write buffer written
%% out before Fun sees it. If a flush fails, Fun is not called: the
%% error is returned and the handles (flushed or not) are stored back.
with_flushed_handles(Refs, Fun) ->
    with_handles(
      Refs,
      fun (Handles) ->
              case fhc_flush_each(Handles, ok, []) of
                  {ok, Flushed}       -> Fun(Flushed);
                  {Error, Unfinished} -> {Error, Unfinished}
              end
      end).

%% Flushes handles left to right while the status is ok; after the
%% first failure the remaining handles are carried over untouched.
fhc_flush_each([], Status, Acc) ->
    {Status, lists:reverse(Acc)};
fhc_flush_each([Handle | Rest], ok, Acc) ->
    {Status, Handle1} = write_buffer(Handle),
    fhc_flush_each(Rest, Status, [Handle1 | Acc]);
fhc_flush_each([Handle | Rest], Error, Acc) ->
    fhc_flush_each(Rest, Error, [Handle | Acc]).

%% Fetches the handle stored for Ref, reopening it at its recorded
%% offset if it is in the soft-closed state. Note that an unknown Ref
%% yields the three-element tuple {error, not_open, Ref}.
get_or_reopen(Ref) ->
    fhc_usable_handle(Ref, get({Ref, fhc_handle})).

fhc_usable_handle(Ref, undefined) ->
    {error, not_open, Ref};
fhc_usable_handle(Ref, #handle { hdl = closed, offset = Offset, path = Path,
                                 mode = Mode, options = Options }) ->
    open1(Path, Mode, Options, Ref, Offset, reopen);
fhc_usable_handle(_Ref, Handle) ->
    {ok, Handle}.

%% Stores Handle for Ref, stamping it with the current time and moving
%% its entry in the age tree from the old timestamp to the new one.
put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
    Now = now(),
    age_tree_update(Then, Now, Ref),
    Stamped = Handle #handle { last_used_at = Now },
    put({Ref, fhc_handle}, Stamped).

%% Applies Fun to the process's age tree (an empty gb_tree if none has
%% been stored yet) and stores the result.
with_age_tree(Fun) ->
    Current = case get(fhc_age_tree) of
                  undefined -> gb_trees:empty();
                  AgeTree   -> AgeTree
              end,
    put(fhc_age_tree, Fun(Current)).

%% Adds Ref under timestamp Now and reports the open, together with
%% the oldest timestamp in the tree, to the server.
age_tree_insert(Now, Ref) ->
    with_age_tree(
      fun (Tree) ->
              Grown = gb_trees:insert(Now, Ref, Tree),
              {Oldest, _Ref} = gb_trees:smallest(Grown),
              gen_server:cast(?SERVER, {open, self(), Oldest}),
              Grown
      end).

%% Re-keys Ref from timestamp Then to timestamp Now. Nothing is sent
%% to the server.
age_tree_update(Then, Now, Ref) ->
    with_age_tree(
      fun (Tree) ->
              Pruned = gb_trees:delete_any(Then, Tree),
              gb_trees:insert(Now, Ref, Pruned)
      end).

%% Removes the entry keyed by Then and reports the close to the server
%% along with the oldest remaining timestamp (undefined when the tree
%% is now empty).
age_tree_delete(Then) ->
    with_age_tree(
      fun (Tree) ->
              Pruned = gb_trees:delete_any(Then, Tree),
              Oldest = fhc_oldest_key(Pruned),
              gen_server:cast(?SERVER, {close, self(), Oldest}),
              Pruned
      end).

fhc_oldest_key(Tree) ->
    case gb_trees:is_empty(Tree) of
        true  -> undefined;
        false -> {Key, _Ref} = gb_trees:smallest(Tree),
                 Key
    end.

%% Reports the oldest timestamp currently in the age tree to the
%% server; does nothing when the tree is empty. The tree itself is
%% left unchanged.
age_tree_change() ->
    with_age_tree(
      fun (Tree) ->
              case gb_trees:is_empty(Tree) of
                  true ->
                      ok;
                  false ->
                      {Oldest, _Ref} = gb_trees:smallest(Tree),
                      gen_server:cast(?SERVER, {update, self(), Oldest})
              end,
              Tree
      end).
%% Opens Path and stores a fresh #handle{} for Ref, positioned at
%% Offset. On a reopen, read is added to the mode passed to
%% file:open/2; the handle still records the original Mode. The open
%% is reported to the server through age_tree_insert/2.
open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
    case file:open(Path, fhc_open_mode(NewOrReopen, Mode)) of
        {ok, Hdl} ->
            BufferLimit = fhc_buffer_limit(Options),
            Now = now(),
            Fresh = #handle { hdl                     = Hdl,
                              offset                  = 0,
                              trusted_offset          = 0,
                              is_dirty                = false,
                              write_buffer_size       = 0,
                              write_buffer_size_limit = BufferLimit,
                              write_buffer            = [],
                              at_eof                  = false,
                              path                    = Path,
                              mode                    = Mode,
                              options                 = Options,
                              is_write                = is_writer(Mode),
                              is_read                 = is_reader(Mode),
                              last_used_at            = Now },
            %% A failed seek is not expected here and crashes the caller.
            {{ok, Reached}, Positioned} = maybe_seek(Offset, Fresh),
            Ready = Positioned #handle { trusted_offset = Reached },
            put({Ref, fhc_handle}, Ready),
            age_tree_insert(Now, Ref),
            {ok, Ready};
        {error, Reason} ->
            {error, Reason}
    end.

fhc_open_mode(new,    Mode) -> Mode;
fhc_open_mode(reopen, Mode) -> [read | Mode].

%% Translates the write_buffer option (default: unbuffered) into the
%% size limit stored in the handle.
fhc_buffer_limit(Options) ->
    case proplists:get_value(write_buffer, Options, unbuffered) of
        unbuffered           -> 0;
        infinity             -> infinity;
        N when is_integer(N) -> N
    end.

%% Closes the underlying file but keeps the handle record so that it
%% can be reopened later. The write buffer is flushed and, for a dirty
%% handle, the file is synced before closing. If the flush fails the
%% file stays open and the error is returned with the handle.
soft_close(Handle = #handle { hdl = closed }) ->
    {ok, Handle};
soft_close(Handle) ->
    case write_buffer(Handle) of
        {ok, Flushed = #handle { hdl = Hdl, offset = Offset,
                                 is_dirty = IsDirty, last_used_at = Then }} ->
            ok = fhc_sync_if_dirty(IsDirty, Hdl),
            ok = file:close(Hdl),
            age_tree_delete(Then),
            {ok, Flushed #handle { hdl            = closed,
                                   trusted_offset = Offset,
                                   is_dirty       = false }};
        {_Error, _Handle} = Failure ->
            Failure
    end.

fhc_sync_if_dirty(true,  Hdl)  -> file:sync(Hdl);
fhc_sync_if_dirty(false, _Hdl) -> ok.
%% Soft-closes the handle and then updates the per-path bookkeeping:
%% the reader count drops for a reader, the writer flag is cleared for
%% a writer, and the {Path, fhc_file} entry is erased once neither
%% readers nor a writer remain. Returns ok, or {Error, Handle} if the
%% soft close failed.
hard_close(Handle) ->
    case soft_close(Handle) of
        {ok, #handle { path = Path,
                       is_read = IsReader, is_write = IsWriter }} ->
            FileKey = {Path, fhc_file},
            File = #file { reader_count = Readers,
                           has_writer   = WriterPresent } = get(FileKey),
            Readers1 = case IsReader of
                           true  -> Readers - 1;
                           false -> Readers
                       end,
            WriterPresent1 = WriterPresent andalso not IsWriter,
            case {Readers1, WriterPresent1} of
                {0, false} ->
                    erase(FileKey);
                _ ->
                    put(FileKey, File #file { reader_count = Readers1,
                                              has_writer   = WriterPresent1 })
            end,
            ok;
        {_Error, _Handle} = Failure ->
            Failure
    end.

%% Moves the handle to NewOffset, skipping the file:position/2 call
%% when needs_seek/3 reports that the handle is already there. Returns
%% {Result, Handle}; the handle is only updated when Result is
%% {ok, Offset}.
maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
                                         at_eof = AtEoF }) ->
    {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
    Outcome = case NeedsSeek of
                  true  -> file:position(Hdl, NewOffset);
                  false -> {ok, Offset}
              end,
    case Outcome of
        {ok, Reached} ->
            {Outcome, Handle #handle { offset = Reached, at_eof = AtEoF1 }};
        {error, _} ->
            {Outcome, Handle}
    end.

%% needs_seek(AtEoF, CurrentOffset, Target) -> {AtEoFAfterwards, NeedsSeek}.
%% Clause order matters: the specific "already there" cases come
%% first, then the cases that keep an at-eof handle at eof, and
%% finally the catch-all which always seeks.
needs_seek( AtEoF, _CurOffset,  cur     ) -> {AtEoF, false};
needs_seek( AtEoF, _CurOffset,  {cur, 0}) -> {AtEoF, false};
needs_seek(  true, _CurOffset,  eof     ) -> {true , false};
needs_seek(  true, _CurOffset,  {eof, 0}) -> {true , false};
needs_seek( false, _CurOffset,  eof     ) -> {true , true };
needs_seek( false, _CurOffset,  {eof, 0}) -> {true , true };
needs_seek( AtEoF,          0,  bof     ) -> {AtEoF, false};
needs_seek( AtEoF,          0,  {bof, 0}) -> {AtEoF, false};
needs_seek( AtEoF,  CurOffset, CurOffset) -> {AtEoF, false};
needs_seek(  true,  CurOffset, {bof, DesiredOffset})
  when DesiredOffset >= CurOffset ->
    {true, true};
needs_seek(  true, _CurOffset, {cur, DesiredOffset})
  when DesiredOffset > 0 ->
    {true, true};
needs_seek(  true,  CurOffset, DesiredOffset) %% same as {bof, DO}
  when is_integer(DesiredOffset) andalso DesiredOffset >= CurOffset ->
    {true, true};
%% because we can't really track size, we could well end up at EoF and not know
needs_seek(_AtEoF, _CurOffset, _DesiredOffset) ->
    {false, true}.

%% Writes the buffered data (stored newest first, hence the reverse)
%% to the file. An empty buffer is a no-op; a non-empty buffer is only
%% accepted for a handle that is at end of file.
write_buffer(Handle = #handle { write_buffer = [] }) ->
    {ok, Handle};
write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
                                write_buffer = WriteBuffer,
                                write_buffer_size = DataSize,
                                at_eof = true }) ->
    Pending = lists:reverse(WriteBuffer),
    case file:write(Hdl, Pending) of
        ok ->
            {ok, Handle #handle { offset            = Offset + DataSize,
                                  is_dirty          = true,
                                  write_buffer      = [],
                                  write_buffer_size = 0 }};
        {error, _} = Error ->
            {Error, Handle}
    end.

%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------

%% The limit comes from the file_handles_high_watermark application
%% environment value when that is a positive integer, and from
%% ulimit/0 otherwise.
init([]) ->
    Limit = case application:get_env(file_handles_high_watermark) of
                {ok, Watermark} when is_integer(Watermark), Watermark > 0 ->
                    Watermark;
                _ ->
                    ulimit()
            end,
    error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
    {ok, #fhc_state { elders       = dict:new(),
                      limit        = Limit,
                      count        = 0,
                      obtains      = [],
                      callbacks    = dict:new(),
                      client_mrefs = dict:new(),
                      timer_ref    = undefined }}.

%% obtain: tentatively counts the new descriptor and gives
%% maybe_reduce/1 the chance to ask clients to close handles. If the
%% count then reaches a finite limit the caller is queued (and the
%% tentative increment undone); otherwise it is answered immediately.
handle_call(obtain, From, State = #fhc_state { count = Count }) ->
    Bumped = maybe_reduce(State #fhc_state { count = Count + 1 }),
    #fhc_state { count = Count1, limit = Limit, obtains = Obtains } = Bumped,
    case Limit /= infinity andalso Count1 >= Limit of
        true ->
            {noreply, Bumped #fhc_state { obtains = [From | Obtains],
                                          count   = Count1 - 1 }};
        false ->
            {reply, ok, Bumped}
    end.
%% register_callback: remember the client's {M, F, A} and make sure
%% the client is monitored.
handle_cast({register_callback, Pid, MFA},
            State = #fhc_state { callbacks = Callbacks }) ->
    Callbacks1 = dict:store(Pid, MFA, Callbacks),
    {noreply, ensure_mref(Pid, State #fhc_state { callbacks = Callbacks1 })};

%% open: a client opened a handle; record its eldest timestamp, bump
%% the count and check whether handles need to be reclaimed.
handle_cast({open, Pid, EldestUnusedSince},
            State = #fhc_state { elders = Elders, count = Count }) ->
    Opened = State #fhc_state {
               elders = dict:store(Pid, EldestUnusedSince, Elders),
               count  = Count + 1 },
    {noreply, maybe_reduce(ensure_mref(Pid, Opened))};

%% update: a client reported a fresher eldest timestamp.
handle_cast({update, Pid, EldestUnusedSince},
            State = #fhc_state { elders = Elders }) ->
    Refreshed = State #fhc_state {
                  elders = dict:store(Pid, EldestUnusedSince, Elders) },
    %% don't call maybe_reduce from here otherwise we can create a
    %% storm of messages
    {noreply, ensure_mref(Pid, Refreshed)};

%% close: a client closed a handle; an undefined timestamp means it
%% has no handles left in its age tree. Waiting obtains may now
%% proceed.
handle_cast({close, Pid, EldestUnusedSince},
            State = #fhc_state { elders = Elders, count = Count }) ->
    Closed = State #fhc_state {
               elders = fhc_note_eldest(Pid, EldestUnusedSince, Elders),
               count  = Count - 1 },
    {noreply, process_obtains(ensure_mref(Pid, Closed))};

%% check_counts: sent by the timer armed in maybe_reduce/1.
handle_cast(check_counts, State) ->
    {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};

%% release_on_death: monitor the pid; the matching 'DOWN' is handled
%% in handle_info/2.
handle_cast({release_on_death, Pid}, State) ->
    _MRef = erlang:monitor(process, Pid),
    {noreply, State}.

fhc_note_eldest(Pid, undefined, Elders) -> dict:erase(Pid, Elders);
fhc_note_eldest(Pid, Eldest,    Elders) -> dict:store(Pid, Eldest, Elders).

%% A monitored process died. If the monitor reference is the one held
%% for a client, all of the client's entries are dropped; otherwise
%% the count is decremented by one. Either way queued obtains are
%% re-examined.
handle_info({'DOWN', MRef, process, Pid, _Reason},
            State = #fhc_state { count = Count, callbacks = Callbacks,
                                 client_mrefs = ClientMRefs,
                                 elders = Elders }) ->
    State1 =
        case dict:find(Pid, ClientMRefs) of
            {ok, MRef} ->
                State #fhc_state {
                  elders       = dict:erase(Pid, Elders),
                  client_mrefs = dict:erase(Pid, ClientMRefs),
                  callbacks    = dict:erase(Pid, Callbacks) };
            _ ->
                State #fhc_state { count = Count - 1 }
        end,
    {noreply, process_obtains(State1)}.

%% Nothing to clean up on termination.
terminate(_Reason, State) ->
    State.

%% No state conversion between code versions.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%%----------------------------------------------------------------------------
%% server helpers
%%----------------------------------------------------------------------------

%% Answers as many queued obtain/0 callers as the limit allows and
%% counts one descriptor for each caller answered. The obtains list
%% holds the most recently queued caller first; callers are taken from
%% its tail, i.e. the longest-waiting ones are answered first.
process_obtains(State = #fhc_state { obtains = [] }) ->
    State;
%% With no limit every waiter can be answered. handle_call/3 never
%% queues callers while the limit is infinity, so this clause is purely
%% defensive: without it the arithmetic below would fail with badarith
%% on 'infinity - Count'.
process_obtains(State = #fhc_state { limit = infinity, count = Count,
                                     obtains = Obtains }) ->
    [gen_server:reply(From, ok) || From <- Obtains],
    State #fhc_state { count = Count + length(Obtains), obtains = [] };
process_obtains(State = #fhc_state { limit = Limit, count = Count })
  when Count >= Limit ->
    State;
process_obtains(State = #fhc_state { limit = Limit, count = Count,
                                     obtains = Obtains }) ->
    ObtainsLen = length(Obtains),
    ObtainableLen = lists:min([ObtainsLen, Limit - Count]),
    Take = ObtainsLen - ObtainableLen,
    {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains),
    [gen_server:reply(From, ok) || From <- ObtainableRev],
    State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }.

%% When the count has reached a finite limit: computes the average age
%% (timer:now_diff/2, so microseconds) of the eldest timestamps
%% reported by clients, invokes the registered callback of each such
%% client with that average appended to its argument list, and arms a
%% timer (unless one is already pending) that casts check_counts to
%% this server after ?FILE_HANDLES_CHECK_INTERVAL. Clients whose
%% eldest timestamp is undefined are ignored.
maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders,
                                  callbacks = Callbacks, timer_ref = TRef })
  when Limit /= infinity andalso Count >= Limit ->
    Now = now(),
    {Pids, Sum, ClientCount} =
        dict:fold(fun (_Pid, undefined, Accs) ->
                          Accs;
                      (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) ->
                          {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest),
                           CountAcc + 1}
                  end, {[], 0, 0}, Elders),
    case Pids of
        [] -> ok;
        _  -> AverageAge = Sum / ClientCount,
              lists:foreach(
                fun (Pid) ->
                        case dict:find(Pid, Callbacks) of
                            error           -> ok;
                            {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge])
                        end
                end, Pids)
    end,
    case TRef of
        undefined -> {ok, TRef1} = timer:apply_after(
                                     ?FILE_HANDLES_CHECK_INTERVAL,
                                     gen_server, cast, [?SERVER, check_counts]),
                     State #fhc_state { timer_ref = TRef1 };
        _         -> State
    end;
maybe_reduce(State) ->
    State.

%% Googling around suggests that Windows has a limit somewhere around
%% 16M, eg
%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
%% For everything else, assume ulimit exists.
%% Further googling
%% suggests that BSDs (incl OS X), solaris and linux all agree that
%% ulimit -n is file handles
%%
%% Returns the number of file handles this module should limit itself
%% to: a fixed constant on Windows, 'infinity' when the shell reports
%% no limit, and otherwise the reported limit less
%% ?RESERVED_FOR_OTHERS (but never less than 1).
ulimit() ->
    case os:type() of
        {win32, _OsName} ->
            ?FILE_HANDLES_LIMIT_WINDOWS;
        {unix, _OsName} ->
            %% Under Linux, Solaris and FreeBSD, ulimit is a shell
            %% builtin, not a command. In OS X, it's a command.
            %% Fortunately, os:cmd invokes the cmd in a shell env, so
            %% we're safe in all cases.
            case os:cmd("ulimit -n") of
                %% The command output carries a trailing newline, so
                %% match on the prefix rather than the exact string;
                %% an exact match on "unlimited" never succeeds.
                "unlimited" ++ _ ->
                    infinity;
                String = [C|_] when $0 =< C andalso C =< $9 ->
                    Num = list_to_integer(
                            lists:takewhile(
                              fun (D) -> $0 =< D andalso D =< $9 end, String)) -
                        ?RESERVED_FOR_OTHERS,
                    lists:max([1, Num]);
                _ ->
                    %% probably a variant of
                    %% "/bin/sh: line 1: ulimit: command not found\n"
                    ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
            end;
        _ ->
            ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
    end.

%% Makes sure the server holds a monitor on client Pid, creating and
%% recording one only if none is recorded yet.
ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) ->
    case dict:find(Pid, ClientMRefs) of
        {ok, _MRef} -> State;
        error       -> MRef = erlang:monitor(process, Pid),
                       State #fhc_state {
                         client_mrefs = dict:store(Pid, MRef, ClientMRefs) }
    end.
diff --git a/src/rabbit.erl b/src/rabbit.erl index 806f26637c..7ca5b07b4b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -54,6 +54,12 @@ [{mfa, {rabbit_mnesia, init, []}}, {enables, external_infrastructure}]}). +-rabbit_boot_step({file_handle_cache, + [{description, "file handle cache server"}, + {mfa, {rabbit_sup, start_restartable_child, + [file_handle_cache]}}, + {enables, worker_pool}]}). + -rabbit_boot_step({worker_pool, [{description, "worker pool"}, {mfa, {rabbit_sup, start_child, [worker_pool_sup]}}, @@ -92,6 +98,13 @@ {requires, kernel_ready}, {enables, core_initialized}]}). +-rabbit_boot_step({rabbit_memory_monitor, + [{description, "memory monitor"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_memory_monitor]}}, + {requires, rabbit_alarm}, + {enables, core_initialized}]}). 
+ -rabbit_boot_step({guid_generator, [{description, "guid generator"}, {mfa, {rabbit_sup, start_restartable_child, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5f045b2764..2d75b15b64 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,7 +32,10 @@ -module(rabbit_amqqueue). -export([start/0, declare/4, delete/3, purge/1]). --export([internal_declare/2, internal_delete/1]). +-export([internal_declare/2, internal_delete/1, + maybe_run_queue_via_backing_queue/2, + update_ram_duration/1, set_ram_duration_target/2, + set_maximum_since_use/2]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/2, requeue/3, ack/4]). @@ -108,6 +111,10 @@ -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). +-spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok'). +-spec(update_ram_duration/1 :: (pid()) -> 'ok'). +-spec(set_ram_duration_target/2 :: (pid(), number()) -> 'ok'). +-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -117,9 +124,8 @@ start() -> DurableQueues = find_durable_queues(), - ok = rabbit_sup:start_child( - rabbit_persister, - [[QName || #amqqueue{name = QName} <- DurableQueues]]), + {ok, BQ} = application:get_env(backing_queue_module), + ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, @@ -350,6 +356,19 @@ internal_delete(QueueName) -> ok end. +maybe_run_queue_via_backing_queue(QPid, Fun) -> + gen_server2:pcall(QPid, 7, {maybe_run_queue_via_backing_queue, Fun}, + infinity). + +update_ram_duration(QPid) -> + gen_server2:pcast(QPid, 8, update_ram_duration). 
+ +set_ram_duration_target(QPid, Duration) -> + gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}). + +set_maximum_since_use(QPid, Age) -> + gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}). + on_node_down(Node) -> [Hook() || Hook <- rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 82e3e05ef3..06712e9c3d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -35,11 +35,14 @@ -behaviour(gen_server2). --define(UNSENT_MESSAGE_LIMIT, 100). +-define(UNSENT_MESSAGE_LIMIT, 100). +-define(SYNC_INTERVAL, 5). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). -export([start_link/1, info_keys/0]). --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, + handle_info/2, handle_pre_hibernate/1]). -import(queue). -import(erlang). @@ -50,21 +53,22 @@ owner, exclusive_consumer, has_had_consumers, - next_msg_id, - message_buffer, + backing_queue, + backing_queue_state, active_consumers, - blocked_consumers}). + blocked_consumers, + sync_timer_ref, + rate_timer_ref + }). -record(consumer, {tag, ack_required}). --record(tx, {is_persistent, pending_messages, pending_acks}). - %% These are held in our process dictionary -record(cr, {consumer_count, ch_pid, limiter_pid, monitor_ref, - unacked_messages, + acktags, is_limit_active, txn, unsent_message_count}). @@ -82,7 +86,9 @@ messages_unacknowledged, messages, consumers, - memory]). + memory, + backing_queue_status + ]). %%---------------------------------------------------------------------------- @@ -94,34 +100,108 @@ info_keys() -> ?INFO_KEYS. 
init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), + process_flag(trap_exit, true), + {ok, BQ} = application:get_env(backing_queue_module), + {ok, #q{q = Q, owner = none, exclusive_consumer = none, has_had_consumers = false, - next_msg_id = 1, - message_buffer = undefined, + backing_queue = BQ, + backing_queue_state = undefined, active_consumers = queue:new(), - blocked_consumers = queue:new()}, hibernate, + blocked_consumers = queue:new(), + sync_timer_ref = undefined, + rate_timer_ref = undefined}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -terminate(_Reason, #q{message_buffer = undefined}) -> - ok; -terminate(_Reason, State) -> +terminate(shutdown, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); +terminate({shutdown, _}, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); +terminate(_Reason, State = #q{backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? - QName = qname(State), - lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end, - all_tx()), - ok = purge_message_buffer(QName, State#q.message_buffer), - ok = rabbit_amqqueue:internal_delete(QName). + State1 = terminate_shutdown(fun (BQS) -> BQ:delete_and_terminate(BQS) end, + State), + ok = rabbit_amqqueue:internal_delete(qname(State1)). code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. 
+terminate_shutdown(Fun, State) -> + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + stop_sync_timer(stop_rate_timer(State)), + case BQS of + undefined -> State; + _ -> ok = rabbit_memory_monitor:deregister(self()), + BQS1 = lists:foldl( + fun (#cr{txn = none}, BQSN) -> + BQSN; + (#cr{txn = Txn}, BQSN) -> + {_AckTags, BQSN1} = + BQ:tx_rollback(Txn, BQSN), + BQSN1 + end, BQS, all_ch_record()), + State1#q{backing_queue_state = Fun(BQS1)} + end. -noreply(NewState) -> {noreply, NewState, hibernate}. +reply(Reply, NewState) -> + assert_invariant(NewState), + {NewState1, Timeout} = next_state(NewState), + {reply, Reply, NewState1, Timeout}. + +noreply(NewState) -> + assert_invariant(NewState), + {NewState1, Timeout} = next_state(NewState), + {noreply, NewState1, Timeout}. + +next_state(State) -> + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + ensure_rate_timer(State), + case BQ:needs_sync(BQS)of + true -> {ensure_sync_timer(State1), 0}; + false -> {stop_sync_timer(State1), hibernate} + end. + +ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> + {ok, TRef} = timer:apply_after( + ?SYNC_INTERVAL, + rabbit_amqqueue, maybe_run_queue_via_backing_queue, + [self(), fun (BQS) -> BQ:sync(BQS) end]), + State#q{sync_timer_ref = TRef}; +ensure_sync_timer(State) -> + State. + +stop_sync_timer(State = #q{sync_timer_ref = undefined}) -> + State; +stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#q{sync_timer_ref = undefined}. + +ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> + {ok, TRef} = timer:apply_after( + ?RAM_DURATION_UPDATE_INTERVAL, + rabbit_amqqueue, update_ram_duration, + [self()]), + State#q{rate_timer_ref = TRef}; +ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) -> + State#q{rate_timer_ref = undefined}; +ensure_rate_timer(State) -> + State. 
+ +stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> + State; +stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> + State#q{rate_timer_ref = undefined}; +stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#q{rate_timer_ref = undefined}. + +assert_invariant(#q{active_consumers = AC, + backing_queue = BQ, backing_queue_state = BQS}) -> + true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -137,7 +217,7 @@ ch_record(ChPid) -> C = #cr{consumer_count = 0, ch_pid = ChPid, monitor_ref = MonitorRef, - unacked_messages = dict:new(), + acktags = sets:new(), is_limit_active = false, txn = none, unsent_message_count = 0}, @@ -168,29 +248,33 @@ record_current_channel_tx(ChPid, Txn) -> %% that wasn't happening already) store_ch_record((ch_record(ChPid))#cr{txn = Txn}). -deliver_immediately(Message, IsDelivered, - State = #q{q = #amqqueue{name = QName}, - active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers, - next_msg_id = NextId}) -> +deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, + State = #q{q = #amqqueue{name = QName}, + active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers}) -> case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, ActiveConsumersTail} -> C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, - unacked_messages = UAM} = ch_record(ChPid), - case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of + acktags = ChAckTags} = ch_record(ChPid), + IsMsgReady = PredFun(FunAcc, State), + case (IsMsgReady andalso + rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of true -> + {{Message, IsDelivered, AckTag}, FunAcc1, State1} = + DeliverFun(AckRequired, FunAcc, State), rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, IsDelivered, Message}), - NewUAM = case AckRequired 
of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, + {QName, self(), AckTag, IsDelivered, Message}), + ChAckTags1 = case AckRequired of + true -> sets:add_element( + AckTag, ChAckTags); + false -> ChAckTags + end, NewC = C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}, + acktags = ChAckTags1}, store_ch_record(NewC), {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of @@ -204,88 +288,85 @@ deliver_immediately(Message, IsDelivered, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - {offered, AckRequired, - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers, - next_msg_id = NextId + 1}}; - false -> + State2 = State1#q{ + active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers}, + deliver_msgs_to_consumers(Funs, FunAcc1, State2); + %% if IsMsgReady then we've hit the limiter + false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), {NewActiveConsumers, NewBlockedConsumers} = move_consumers(ChPid, ActiveConsumers, BlockedConsumers), - deliver_immediately( - Message, IsDelivered, + deliver_msgs_to_consumers( + Funs, FunAcc, State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}) - end; - {empty, _} -> - {not_offered, State} - end. - -run_message_queue(State = #q{message_buffer = MessageBuffer}) -> - run_message_queue(MessageBuffer, State). 
- -run_message_queue(MessageBuffer, State) -> - case queue:out(MessageBuffer) of - {{value, {Message, IsDelivered}}, BufferTail} -> - case deliver_immediately(Message, IsDelivered, State) of - {offered, true, NewState} -> - persist_delivery(qname(State), Message, IsDelivered), - run_message_queue(BufferTail, NewState); - {offered, false, NewState} -> - persist_auto_ack(qname(State), Message), - run_message_queue(BufferTail, NewState); - {not_offered, NewState} -> - NewState#q{message_buffer = MessageBuffer} + blocked_consumers = NewBlockedConsumers}); + false -> + %% no message was ready, so we don't need to block anyone + {FunAcc, State} end; {empty, _} -> - State#q{message_buffer = MessageBuffer} + {FunAcc, State} end. -attempt_delivery(none, _ChPid, Message, State) -> - case deliver_immediately(Message, false, State) of - {offered, false, State1} -> - {true, State1}; - {offered, true, State1} -> - persist_message(none, qname(State), Message), - persist_delivery(qname(State), Message, false), - {true, State1}; - {not_offered, State1} -> - {false, State1} - end; -attempt_delivery(Txn, ChPid, Message, State) -> - persist_message(Txn, qname(State), Message), - record_pending_message(Txn, ChPid, Message), - {true, State}. +deliver_from_queue_pred(IsEmpty, _State) -> + not IsEmpty. + +deliver_from_queue_deliver(AckRequired, false, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {{Message, IsDelivered, AckTag, Remaining}, BQS1} = + BQ:fetch(AckRequired, BQS), + {{Message, IsDelivered, AckTag}, 0 == Remaining, + State #q { backing_queue_state = BQS1 }}. + +run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Funs = {fun deliver_from_queue_pred/2, + fun deliver_from_queue_deliver/3}, + IsEmpty = BQ:is_empty(BQS), + {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), + State1. 
+ +attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> + PredFun = fun (IsEmpty, _State) -> not IsEmpty end, + DeliverFun = + fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> + {AckTag, BQS1} = + BQ:publish_delivered(AckRequired, Message, BQS), + {{Message, false, AckTag}, true, + State1#q{backing_queue_state = BQS1}} + end, + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); +attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + record_current_channel_tx(ChPid, Txn), + {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. -deliver_or_enqueue(Txn, ChPid, Message, State) -> +deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> - persist_message(Txn, qname(State), Message), - NewMB = queue:in({Message, false}, NewState#q.message_buffer), - {false, NewState#q{message_buffer = NewMB}} + %% Txn is none and no unblocked channels with consumers + BQS = BQ:publish(Message, State #q.backing_queue_state), + {false, NewState#q{backing_queue_state = BQS}} end. -deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) -> - run_message_queue(queue:join(MessageBuffer, queue:from_list(Messages)), - State). +requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> + maybe_run_queue_via_backing_queue( + fun (BQS) -> BQ:requeue(AckTags, BQS) end, State). add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). remove_consumer(ChPid, ConsumerTag, Queue) -> - %% TODO: replace this with queue:filter/2 once we move to R12 - queue:from_list(lists:filter( - fun ({CP, #consumer{tag = CT}}) -> - (CP /= ChPid) or (CT /= ConsumerTag) - end, queue:to_list(Queue))). + queue:filter(fun ({CP, #consumer{tag = CT}}) -> + (CP /= ChPid) or (CT /= ConsumerTag) + end, Queue). 
remove_consumers(ChPid, Queue) -> - %% TODO: replace this with queue:filter/2 once we move to R12 - queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(Queue))). + queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue). move_consumers(ChPid, From, To) -> {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, @@ -320,7 +401,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> not_found -> {ok, State}; #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, - unacked_messages = UAM} -> + acktags = ChAckTags} -> erlang:demonitor(MonitorRef), erase({ch, ChPid}), State1 = State#q{ @@ -334,15 +415,12 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> ChPid, State#q.blocked_consumers)}, case should_auto_delete(State1) of true -> {stop, State1}; - false -> case Txn of - none -> ok; - _ -> ok = rollback_work(Txn, qname(State1)), - erase_tx(Txn) - end, - {ok, deliver_or_enqueue_n( - [{Message, true} || - {_MsgId, Message} <- dict:to_list(UAM)], - State1)} + false -> State2 = case Txn of + none -> State1; + _ -> rollback_transaction(Txn, ChPid, + State1) + end, + {ok, requeue_and_run(sets:to_list(ChAckTags), State2)} end end. @@ -373,122 +451,30 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -persist_message(_Txn, _QName, #basic_message{is_persistent = false}) -> - ok; -persist_message(Txn, QName, Message) -> - M = Message#basic_message{ - %% don't persist any recoverable decoded properties, rebuild from properties_bin on restore - content = rabbit_binary_parser:clear_decoded_content( - Message#basic_message.content)}, - persist_work(Txn, QName, - [{publish, M, {QName, M#basic_message.guid}}]). 
- -persist_delivery(_QName, _Message, - true) -> - ok; -persist_delivery(_QName, #basic_message{is_persistent = false}, - _IsDelivered) -> - ok; -persist_delivery(QName, #basic_message{guid = Guid}, - _IsDelivered) -> - persist_work(none, QName, [{deliver, {QName, Guid}}]). - -persist_acks(Txn, QName, Messages) -> - persist_work(Txn, QName, - [{ack, {QName, Guid}} || #basic_message{ - guid = Guid, is_persistent = true} <- Messages]). - -persist_auto_ack(_QName, #basic_message{is_persistent = false}) -> - ok; -persist_auto_ack(QName, #basic_message{guid = Guid}) -> - %% auto-acks are always non-transactional - rabbit_persister:dirty_work([{ack, {QName, Guid}}]). - -persist_work(_Txn,_QName, []) -> - ok; -persist_work(none, _QName, WorkList) -> - rabbit_persister:dirty_work(WorkList); -persist_work(Txn, QName, WorkList) -> - mark_tx_persistent(Txn), - rabbit_persister:extend_transaction({Txn, QName}, WorkList). - -commit_work(Txn, QName) -> - do_if_persistent(fun rabbit_persister:commit_transaction/1, - Txn, QName). - -rollback_work(Txn, QName) -> - do_if_persistent(fun rabbit_persister:rollback_transaction/1, - Txn, QName). - -%% optimisation: don't do unnecessary work -%% it would be nice if this was handled by the persister -do_if_persistent(F, Txn, QName) -> - case is_tx_persistent(Txn) of - false -> ok; - true -> ok = F({Txn, QName}) - end. - -lookup_tx(Txn) -> - case get({txn, Txn}) of - undefined -> #tx{is_persistent = false, - pending_messages = [], - pending_acks = []}; - V -> V - end. - -store_tx(Txn, Tx) -> - put({txn, Txn}, Tx). - -erase_tx(Txn) -> - erase({txn, Txn}). - -all_tx() -> - [Txn || {{txn, Txn}, _} <- get()]. - -mark_tx_persistent(Txn) -> - Tx = lookup_tx(Txn), - store_tx(Txn, Tx#tx{is_persistent = true}). - -is_tx_persistent(Txn) -> - #tx{is_persistent = Res} = lookup_tx(Txn), - Res. 
- -record_pending_message(Txn, ChPid, Message) -> - Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), - record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). +maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> + run_message_queue(State#q{backing_queue_state = Fun(BQS)}). + +commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {AckTags, BQS1} = + BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS), + %% ChPid must be known here because of the participant management + %% by the channel. + C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), + State#q{backing_queue_state = BQS1}. + +rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), + %% Iff we removed acktags from the channel record on ack+txn then + %% we would add them back in here (would also require ChPid) + record_current_channel_tx(ChPid, none), + State#q{backing_queue_state = BQS1}. -record_pending_acks(Txn, ChPid, MsgIds) -> - Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), - record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending]}). - -process_pending(Txn, ChPid, State) -> - #tx{pending_messages = PendingMessages, pending_acks = PendingAcks} = - lookup_tx(Txn), - C = #cr{unacked_messages = UAM} = lookup_ch(ChPid), - {_Acked, Remaining} = collect_messages(lists:append(PendingAcks), UAM), - store_ch_record(C#cr{unacked_messages = Remaining}), - deliver_or_enqueue_n(lists:reverse(PendingMessages), State). - -collect_messages(MsgIds, UAM) -> - lists:mapfoldl( - fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end, - UAM, MsgIds). 
- -purge_message_buffer(QName, MessageBuffer) -> - Messages = - [[Message || {Message, _IsDelivered} <- - queue:to_list(MessageBuffer)] | - lists:map( - fun (#cr{unacked_messages = UAM}) -> - [Message || {_MsgId, Message} <- dict:to_list(UAM)] - end, - all_ch_record())], - %% the simplest, though certainly not the most obvious or - %% efficient, way to purge messages from the persister is to - %% artifically ack them. - persist_acks(none, QName, lists:append(Messages)). +subtract_acks(A, B) when is_list(B) -> + lists:foldl(fun sets:del_element/2, A, B). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -510,11 +496,10 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = none}) -> ''; i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) -> ConsumerTag; -i(messages_ready, #q{message_buffer = MessageBuffer}) -> - queue:len(MessageBuffer); +i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) -> + BQ:len(BQS); i(messages_unacknowledged, _) -> - lists:sum([dict:size(UAM) || - #cr{unacked_messages = UAM} <- all_ch_record()]); + lists:sum([sets:size(C#cr.acktags) || C <- all_ch_record()]); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, messages_unacknowledged]]); @@ -523,6 +508,8 @@ i(consumers, State) -> i(memory, _) -> {memory, M} = process_info(self(), memory), M; +i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> + BQ:status(BQS); i(Item, _) -> throw({bad_argument, Item}). 
@@ -572,13 +559,8 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) -> reply(Delivered, NewState); handle_call({commit, Txn, ChPid}, From, State) -> - ok = commit_work(Txn, qname(State)), - %% optimisation: we reply straight away so the sender can continue - gen_server2:reply(From, ok), - NewState = process_pending(Txn, ChPid, State), - erase_tx(Txn), - record_current_channel_tx(ChPid, none), - noreply(NewState); + NewState = commit_transaction(Txn, From, ChPid, State), + noreply(run_message_queue(NewState)); handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -593,26 +575,19 @@ handle_call({notify_down, ChPid}, _From, State) -> handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}, - next_msg_id = NextId, - message_buffer = MessageBuffer}) -> - case queue:out(MessageBuffer) of - {{value, {Message, IsDelivered}}, BufferTail} -> - AckRequired = not(NoAck), + backing_queue_state = BQS, backing_queue = BQ}) -> + AckRequired = not NoAck, + case BQ:fetch(AckRequired, BQS) of + {empty, BQS1} -> reply(empty, State#q{backing_queue_state = BQS1}); + {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> case AckRequired of - true -> - persist_delivery(QName, Message, IsDelivered), - C = #cr{unacked_messages = UAM} = ch_record(ChPid), - NewUAM = dict:store(NextId, Message, UAM), - store_ch_record(C#cr{unacked_messages = NewUAM}); - false -> - persist_auto_ack(QName, Message) + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + store_ch_record( + C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); + false -> ok end, - Msg = {QName, self(), NextId, IsDelivered, Message}, - reply({ok, queue:len(BufferTail), Msg}, - State#q{message_buffer = BufferTail, - next_msg_id = NextId + 1}); - {empty, _} -> - reply(empty, State) + Msg = {QName, self(), AckTag, IsDelivered, Message}, + reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1}) end; 
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, @@ -630,7 +605,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ok -> C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, - ack_required = not(NoAck)}, + ack_required = not NoAck}, store_ch_record(C#cr{consumer_count = ConsumerCount +1, limiter_pid = LimiterPid}), case ConsumerCount of @@ -692,14 +667,14 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end; handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, - message_buffer = MessageBuffer, + backing_queue = BQ, + backing_queue_state = BQS, active_consumers = ActiveConsumers}) -> - Length = queue:len(MessageBuffer), - reply({ok, Name, Length, queue:len(ActiveConsumers)}, State); + reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, - State = #q{message_buffer = MessageBuffer}) -> - IsEmpty = queue:is_empty(MessageBuffer), + State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> + IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), if IfEmpty and not(IsEmpty) -> @@ -707,13 +682,13 @@ handle_call({delete, IfUnused, IfEmpty}, _From, IfUnused and not(IsUnused) -> reply({error, in_use}, State); true -> - {stop, normal, {ok, queue:len(MessageBuffer)}, State} + {stop, normal, {ok, BQ:len(BQS)}, State} end; -handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> - ok = purge_message_buffer(qname(State), MessageBuffer), - reply({ok, queue:len(MessageBuffer)}, - State#q{message_buffer = queue:new()}); +handle_call(purge, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {Count, BQS1} = BQ:purge(BQS), + reply({ok, Count}, State#q{backing_queue_state = BQS1}); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -736,53 +711,54 @@ handle_call({claim_queue, ReaderPid}, _From, reply(ok, State); _ -> 
reply(locked, State) - end. + end; + +handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> + reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). -handle_cast({init, Recover}, State = #q{message_buffer = undefined}) -> - Messages = case Recover of - true -> rabbit_persister:queue_content(qname(State)); - false -> [] - end, - noreply(State#q{message_buffer = queue:from_list(Messages)}); +handle_cast({init, Recover}, + State = #q{q = #amqqueue{name = QName, durable = IsDurable}, + backing_queue = BQ, backing_queue_state = undefined}) -> + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable, Recover)}); handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), noreply(NewState); -handle_cast({ack, Txn, MsgIds, ChPid}, State) -> +handle_cast({ack, Txn, AckTags, ChPid}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case lookup_ch(ChPid) of not_found -> noreply(State); - C = #cr{unacked_messages = UAM} -> - {Acked, Remaining} = collect_messages(MsgIds, UAM), - persist_acks(Txn, qname(State), Acked), - case Txn of - none -> - store_ch_record(C#cr{unacked_messages = Remaining}); - _ -> - record_pending_acks(Txn, ChPid, MsgIds) - end, - noreply(State) + C = #cr{acktags = ChAckTags} -> + {C1, BQS1} = + case Txn of + none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), + {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; + _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} + end, + store_ch_record(C1), + noreply(State #q { backing_queue_state = BQS1 }) end; handle_cast({rollback, Txn, ChPid}, State) -> - ok = rollback_work(Txn, qname(State)), - erase_tx(Txn), - 
record_current_channel_tx(ChPid, none), - noreply(State); + noreply(rollback_transaction(Txn, ChPid, State)); -handle_cast({requeue, MsgIds, ChPid}, State) -> +handle_cast({requeue, AckTags, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", [ChPid]), noreply(State); - C = #cr{unacked_messages = UAM} -> - {Messages, NewUAM} = collect_messages(MsgIds, UAM), - store_ch_record(C#cr{unacked_messages = NewUAM}), - noreply(deliver_or_enqueue_n( - [{Message, true} || Message <- Messages], State)) + C = #cr{acktags = ChAckTags} -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + noreply(requeue_and_run(AckTags, State)) end; handle_cast({unblock, ChPid}, State) -> @@ -815,6 +791,24 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), + noreply(State); + +handle_cast(update_ram_duration, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + noreply(State#q{rate_timer_ref = just_measured, + backing_queue_state = BQS2}); + +handle_cast({set_ram_duration_target, Duration}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + BQS1 = BQ:set_ram_duration_target(Duration, BQS), + noreply(State#q{backing_queue_state = BQS1}); + +handle_cast({set_maximum_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), noreply(State). 
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, @@ -836,6 +830,24 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; +handle_info(timeout, State = #q{backing_queue = BQ}) -> + noreply(maybe_run_queue_via_backing_queue( + fun (BQS) -> BQ:sync(BQS) end, State)); + +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; + handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. + +handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> + {hibernate, State}; +handle_pre_hibernate(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = BQ:handle_pre_hibernate(BQS), + %% no activity for a while == 0 egress and ingress rates + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), infinity), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + {hibernate, stop_rate_timer(State#q{backing_queue_state = BQS2})}. diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index dbd65780fb..97d6cef9c2 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -31,21 +31,23 @@ -module(rabbit_amqqueue_sup). --behaviour(supervisor). +-behaviour(supervisor2). -export([start_link/0, start_child/1]). -export([init/1]). +-include("rabbit.hrl"). + -define(SERVER, ?MODULE). start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). + supervisor2:start_link({local, ?SERVER}, ?MODULE, []). start_child(Args) -> - supervisor:start_child(?SERVER, Args). + supervisor2:start_child(?SERVER, Args). init([]) -> - {ok, {{simple_one_for_one, 10, 10}, + {ok, {{simple_one_for_one_terminate, 10, 10}, [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []}, - temporary, brutal_kill, worker, [rabbit_amqqueue_process]}]}}. + temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}. 
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl new file mode 100644 index 0000000000..2dba00ad62 --- /dev/null +++ b/src/rabbit_backing_queue.erl @@ -0,0 +1,133 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_backing_queue). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + %% Called on startup with a list of durable queue names. The + %% queues aren't being started at this point, but this call + %% allows the backing queue to perform any checking necessary for + %% the consistency of those queues, or initialise any other + %% shared resources. + {start, 1}, + + %% Initialise the backing queue and its state. 
+ {init, 3}, + + %% Called on queue shutdown when queue isn't being deleted. + {terminate, 1}, + + %% Called when the queue is terminating and needs to delete all + %% its content. + {delete_and_terminate, 1}, + + %% Remove all messages in the queue, but not messages which have + %% been fetched and are pending acks. + {purge, 1}, + + %% Publish a message. + {publish, 2}, + + %% Called for messages which have already been passed straight + %% out to a client. The queue will be empty for these calls + %% (i.e. saves the round trip through the backing queue). + {publish_delivered, 3}, + + %% Produce the next message. + {fetch, 2}, + + %% Acktags supplied are for messages which can now be forgotten + %% about. + {ack, 2}, + + %% A publish, but in the context of a transaction. + {tx_publish, 3}, + + %% Acks, but in the context of a transaction. + {tx_ack, 3}, + + %% Undo anything which has been done in the context of the + %% specified transaction. + {tx_rollback, 2}, + + %% Commit a transaction. The Fun passed in must be called once + %% the messages have really been commited. This CPS permits the + %% possibility of commit coalescing. + {tx_commit, 3}, + + %% Reinsert messages into the queue which have already been + %% delivered and were pending acknowledgement. + {requeue, 2}, + + %% How long is my queue? + {len, 1}, + + %% Is my queue empty? + {is_empty, 1}, + + %% For the next three functions, the assumption is that you're + %% monitoring something like the ingress and egress rates of the + %% queue. The RAM duration is thus the length of time represented + %% by the messages held in RAM given the current rates. If you + %% want to ignore all of this stuff, then do so, and return 0 in + %% ram_duration/1. + + %% The target is to have no more messages in RAM than indicated + %% by the duration and the current queue rates. 
+ {set_ram_duration_target, 2}, + + %% Optionally recalculate the duration internally (likely to be + %% just update your internal rates), and report how many seconds + %% the messages in RAM represent given the current rates of the + %% queue. + {ram_duration, 1}, + + %% Should 'sync' be called as soon as the queue process can + %% manage (either on an empty mailbox, or when a timer fires)? + {needs_sync, 1}, + + %% Called (eventually) after needs_sync returns 'true'. Note this + %% may be called more than once for each 'true' returned from + %% needs_sync. + {sync, 1}, + + %% Called immediately before the queue hibernates. + {handle_pre_hibernate, 1}, + + %% Exists for debugging purposes, to be able to expose state via + %% rabbitmqctl list_queues backing_queue_status + {status, 1} + ]; +behaviour_info(_Other) -> + undefined. diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl new file mode 100644 index 0000000000..b4fd91560f --- /dev/null +++ b/src/rabbit_invariable_queue.erl @@ -0,0 +1,264 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. 
%%
%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
%% Contributor(s): ______________________________________.
%%

-module(rabbit_invariable_queue).

-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2,
         publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
         tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
         set_ram_duration_target/2, ram_duration/1, needs_sync/1, sync/1,
         handle_pre_hibernate/1, status/1]).

-export([start/1]).

-behaviour(rabbit_backing_queue).

-include("rabbit.hrl").

-record(iv_state, { queue, qname, len, pending_ack }).
-record(tx, { pending_messages, pending_acks, is_persistent }).

-ifdef(use_specs).

-type(ack() :: guid() | 'blank_ack').
-type(state() :: #iv_state { queue       :: queue(),
                             qname       :: queue_name(),
                             len         :: non_neg_integer(),
                             pending_ack :: dict()
                           }).
-include("rabbit_backing_queue_spec.hrl").

-endif.

%% Start rabbit_persister under rabbit_sup, handing it the durable
%% queues.
start(DurableQueues) ->
    ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]).

%% Build the initial state. Content is only read back from the
%% persister when the queue is durable and recovery was requested;
%% otherwise the queue starts empty.
init(QName, IsDurable, Recover) ->
    Content = case IsDurable andalso Recover of
                  true  -> rabbit_persister:queue_content(QName);
                  false -> []
              end,
    Q = queue:from_list(Content),
    #iv_state { queue       = Q,
                qname       = QName,
                len         = queue:len(Q),
                pending_ack = dict:new() }.

%% Drop all in-memory content; nothing is sent to the persister.
terminate(State) ->
    State #iv_state { queue       = queue:new(),
                      len         = 0,
                      pending_ack = dict:new() }.

%% Ack everything that is pending, purge the queue, then terminate.
delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) ->
    ok = persist_acks(none, QName, dict:fetch_keys(PA), PA),
    {_PLen, Purged} = purge(State),
    terminate(Purged).

%% Empty the queue and return {PreviousLength, NewState}. Persistent
%% messages are recorded as delivered and then acked with the
%% persister; non-persistent ones are simply dropped.
purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
    %% We do not purge messages pending acks.
    Collect =
        fun ({#basic_message { is_persistent = false }, _IsDelivered}, Acc) ->
                Acc;
            ({Msg = #basic_message { guid = Guid }, IsDelivered},
             {Guids, Msgs}) ->
                ok = persist_delivery(QName, Msg, IsDelivered),
                {[Guid | Guids], dict:store(Guid, Msg, Msgs)}
        end,
    {AckTags, PA} = rabbit_misc:queue_fold(Collect, {[], dict:new()}, Q),
    ok = persist_acks(none, QName, AckTags, PA),
    {Len, State #iv_state { len = 0, queue = queue:new() }}.

%% Persist the message (if persistent) and append it, undelivered, to
%% the tail of the queue.
publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) ->
    ok = persist_message(none, QName, Msg),
    Q1 = queue:in({Msg, false}, Q),
    State #iv_state { queue = Q1, len = Len + 1 }.

%% A message already handed to a consumer. Without an ack requirement
%% nothing is remembered; with one, the message is persisted, marked
%% delivered and stored as pending ack. The second clause only
%% matches while the queue is empty (len = 0).
publish_delivered(false, _Msg, State) ->
    {blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid },
                  State = #iv_state { qname = QName, len = 0,
                                      pending_ack = PA }) ->
    ok = persist_message(none, QName, Msg),
    ok = persist_delivery(QName, Msg, false),
    PA1 = dict:store(Guid, Msg, PA),
    {Guid, State #iv_state { pending_ack = PA1 }}.

%% Take the message at the head of the queue. Returns {empty, State}
%% for an empty queue, otherwise
%% {{Msg, IsDelivered, AckTag, RemainingLen}, State1}.
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
    {empty, State};
fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len,
                                       pending_ack = PA }) ->
    {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Rest} =
        queue:out(Q),
    ok = persist_delivery(QName, Msg, IsDelivered),
    WithMsg = dict:store(Guid, Msg, PA),
    {AckTag, NewPA} =
        case AckRequired of
            true  -> {Guid, WithMsg};
            %% No ack will follow: ack with the persister right away
            %% and keep the pending-ack dict as it was.
            false -> ok = persist_acks(none, QName, [Guid], WithMsg),
                     {blank_ack, PA}
        end,
    Remaining = Len - 1,
    {{Msg, IsDelivered, AckTag, Remaining},
     State #iv_state { queue = Rest, len = Remaining, pending_ack = NewPA }}.

%% Record the acks with the persister, then forget the messages.
ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
    ok = persist_acks(none, QName, AckTags, PA),
    State #iv_state { pending_ack = remove_acks(AckTags, PA) }.

%% Add a publish to transaction Txn. The message is remembered in the
%% per-process transaction record and handed to the persister.
tx_publish(Txn, Msg, State = #iv_state { qname = QName }) ->
    #tx { pending_messages = Pending } = Tx = lookup_tx(Txn),
    store_tx(Txn, Tx #tx { pending_messages = [Msg | Pending] }),
    ok = persist_message(Txn, QName, Msg),
    State.

%% Add a batch of acks to transaction Txn. Batches are kept as a list
%% of lists and flattened on commit/rollback.
tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
    #tx { pending_acks = Batches } = Tx = lookup_tx(Txn),
    store_tx(Txn, Tx #tx { pending_acks = [AckTags | Batches] }),
    ok = persist_acks(Txn, QName, AckTags, PA),
    State.

%% Abandon transaction Txn. Returns the ack tags that had been
%% accumulated, together with the unchanged state.
tx_rollback(Txn, State = #iv_state { qname = QName }) ->
    #tx { pending_acks = Batches } = lookup_tx(Txn),
    ok = do_if_persistent(fun rabbit_persister:rollback_transaction/1,
                          Txn, QName),
    erase_tx(Txn),
    {lists:flatten(Batches), State}.

%% Commit transaction Txn: commit with the persister if needed, call
%% Fun, drop the acked messages and enqueue the published ones in
%% publication order.
tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA,
                                        queue = Q, len = Len }) ->
    #tx { pending_acks = Batches, pending_messages = PubsRev } =
        lookup_tx(Txn),
    ok = do_if_persistent(fun rabbit_persister:commit_transaction/1,
                          Txn, QName),
    erase_tx(Txn),
    Fun(),
    AckTags = lists:flatten(Batches),
    PA1 = remove_acks(AckTags, PA),
    Enqueue = fun (Msg, {QN, LenN}) ->
                      {queue:in({Msg, false}, QN), LenN + 1}
              end,
    %% PubsRev is newest-first; reverse it to enqueue oldest-first.
    {Q1, Len1} = lists:foldl(Enqueue, {Q, Len}, lists:reverse(PubsRev)),
    {AckTags, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}.

%% Put messages that were pending ack back on the tail of the queue,
%% flagged as already delivered.
requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q,
                                     len = Len }) ->
    %% We don't need to touch the persister here - the persister will
    %% already have these messages published and delivered as
    %% necessary. The complication is that the persister's seq_id will
    %% now be wrong, given the position of these messages in our queue
    %% here. However, the persister's seq_id is only used for sorting
    %% on startup, and requeue is silent as to where the requeued
    %% messages should appear, thus the persister is permitted to sort
    %% based on seq_id, even though it'll likely give a different
    %% order to the last known state of our queue, prior to shutdown.
    Reinsert = fun (Guid, {QN, LenN}) ->
                       {ok, Msg = #basic_message {}} = dict:find(Guid, PA),
                       {queue:in({Msg, true}, QN), LenN + 1}
               end,
    {Q1, Len1} = lists:foldl(Reinsert, {Q, Len}, AckTags),
    State #iv_state { pending_ack = remove_acks(AckTags, PA),
                      queue       = Q1,
                      len         = Len1 }.

%% Number of messages currently in the queue.
len(#iv_state { len = Len }) -> Len.

%% True when the queue holds no messages.
is_empty(State) -> 0 == len(State).

%% RAM duration targets are ignored by this backing queue.
set_ram_duration_target(_DurationTarget, State) -> State.

%% Always reports a RAM duration of 0.
ram_duration(State) -> {0, State}.

%% This backing queue never asks for a sync.
needs_sync(_State) -> false.

%% Nothing to sync.
sync(State) -> State.

%% Nothing to do before hibernating.
handle_pre_hibernate(State) -> State.

%% No status information is exposed.
status(_State) -> [].

%%----------------------------------------------------------------------------

%% Erase every ack tag from the pending-ack dict.
remove_acks(AckTags, PA) ->
    lists:foldl(fun (AckTag, Acc) -> dict:erase(AckTag, Acc) end,
                PA, AckTags).

%%----------------------------------------------------------------------------

%% Transaction records live in the process dictionary under
%% {txn, Txn}; an unknown transaction yields a fresh, empty record.
lookup_tx(Txn) ->
    case get({txn, Txn}) of
        undefined -> #tx { pending_messages = [],
                           pending_acks     = [],
                           is_persistent    = false };
        Tx        -> Tx
    end.

store_tx(Txn, Tx) ->
    put({txn, Txn}, Tx).

erase_tx(Txn) ->
    erase({txn, Txn}).

%% Flag the transaction as having work recorded with the persister.
mark_tx_persistent(Txn) ->
    Tx = lookup_tx(Txn),
    store_tx(Txn, Tx #tx { is_persistent = true }).

is_tx_persistent(Txn) ->
    (lookup_tx(Txn)) #tx.is_persistent.

%% Apply F to {Txn, QName} only when the transaction has persistent
%% work; F must return ok.
do_if_persistent(F, Txn, QName) ->
    ok = case is_tx_persistent(Txn) of
             true  -> F({Txn, QName});
             false -> ok
         end.

%%----------------------------------------------------------------------------

%% Hand a publish to the persister; non-persistent messages are
%% ignored.
persist_message(_Txn, _QName, #basic_message { is_persistent = false }) ->
    ok;
persist_message(Txn, QName, Msg) ->
    %% don't persist any recoverable decoded properties,
    %% rebuild from properties_bin on restore
    Content = rabbit_binary_parser:clear_decoded_content(
                Msg #basic_message.content),
    Msg1 = Msg #basic_message { content = Content },
    persist_work(Txn, QName,
                 [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]).

%% Record a first delivery with the persister. Non-persistent
%% messages and messages already flagged as delivered need no work.
persist_delivery(_QName, #basic_message { is_persistent = false },
                 _IsDelivered) ->
    ok;
persist_delivery(_QName, _Message, true) ->
    ok;
persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) ->
    persist_work(none, QName, [{deliver, {QName, Guid}}]).

%% Record acks with the persister for those tags whose message (looked
%% up in PA) is persistent. Every tag must be present in PA.
persist_acks(Txn, QName, AckTags, PA) ->
    Work = [{ack, {QName, Guid}} || Guid <- AckTags,
                                    is_persistent_ack(Guid, PA)],
    persist_work(Txn, QName, Work).

is_persistent_ack(Guid, PA) ->
    {ok, Msg} = dict:find(Guid, PA),
    Msg #basic_message.is_persistent.

%% Send a work list to the persister: nothing for an empty list,
%% dirty work outside a transaction, otherwise extend the transaction
%% (and remember that it now has persistent work).
persist_work(_Txn,_QName, []) ->
    ok;
persist_work(none, _QName, WorkList) ->
    rabbit_persister:dirty_work(WorkList);
persist_work(Txn, QName, WorkList) ->
    mark_tx_persistent(Txn),
    rabbit_persister:extend_transaction({Txn, QName}, WorkList).
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
new file mode 100644
index 0000000000..91e97ffe49
--- /dev/null
+++ b/src/rabbit_memory_monitor.erl
@@ -0,0 +1,293 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developers of the Original Code are LShift Ltd,
%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
%% Contributor(s): ______________________________________.
%%


%% This module handles the node-wide memory statistics.
%% It receives statistics from all queues, counts the desired
%% queue length (in seconds), and sends this information back to
%% queues.

-module(rabbit_memory_monitor).

-behaviour(gen_server2).

-export([start_link/0, update/0, register/2, deregister/1,
         report_ram_duration/2, stop/0]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(process, {pid, reported, sent, callback, monitor}).

-record(state, {timer,                %% 'internal_update' timer
                queue_durations,      %% ets #process
                queue_duration_sum,   %% sum of all queue_durations
                queue_duration_count, %% number of elements in sum
                memory_limit,         %% how much memory we intend to use
                desired_duration      %% the desired queue duration
               }).

-define(SERVER, ?MODULE).
-define(DEFAULT_UPDATE_INTERVAL, 2500).
-define(TABLE_NAME, ?MODULE).

%% Because we have a feedback loop here, we need to ensure that we
%% have some space for when the queues don't quite respond as fast as
%% we would like, or when there is buffering going on in other parts
%% of the system. In short, we aim to stay some distance away from
%% when the memory alarms will go off, which cause channel.flow.
%% Note that all other Thresholds are relative to this scaling.
-define(MEMORY_LIMIT_SCALING, 0.4).

-define(LIMIT_THRESHOLD, 0.5). %% don't limit queues when mem use is < this

%% If all queues are pushed to disk (duration 0), then the sum of
%% their reported lengths will be 0. If memory then becomes available,
%% unless we manually intervene, the sum will remain 0, and the queues
%% will never get a non-zero duration.  Thus when the mem use is <
%% SUM_INC_THRESHOLD, increase the sum artificially by SUM_INC_AMOUNT.
-define(SUM_INC_THRESHOLD, 0.95).
-define(SUM_INC_AMOUNT, 1.0).

%% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use.
-define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824).

-define(EPSILON, 0.000001). %% less than this and we clamp to 0

%%----------------------------------------------------------------------------

-ifdef(use_specs).

-spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}).
-spec(update/0 :: () -> 'ok').
-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok').
-spec(deregister/1 :: (pid()) -> 'ok').
-spec(report_ram_duration/2 :: (pid(), float() | 'infinity') -> number()).
-spec(stop/0 :: () -> 'ok').

-endif.

%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------

%% Start the monitor as a locally registered gen_server2.
start_link() ->
    gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).

%% Ask the monitor (asynchronously) to recompute the desired duration.
update() ->
    gen_server2:cast(?SERVER, update).

%% Register Pid with a callback MFA; blocks until the monitor replies.
register(Pid, MFA = {_M, _F, _A}) ->
    gen_server2:call(?SERVER, {register, Pid, MFA}, infinity).

%% Remove Pid from the monitor (asynchronous).
deregister(Pid) ->
    gen_server2:cast(?SERVER, {deregister, Pid}).

%% Report Pid's current RAM duration; the reply is the duration the
%% monitor currently desires.
report_ram_duration(Pid, QueueDuration) ->
    Request = {report_ram_duration, Pid, QueueDuration},
    gen_server2:call(?SERVER, Request, infinity).

%% Ask the monitor to stop (asynchronous).
stop() ->
    gen_server2:cast(?SERVER, stop).

%%----------------------------------------------------------------------------
%% Gen_server callbacks
%%----------------------------------------------------------------------------

%% Work out the scaled memory limit, start the periodic update timer,
%% create the private ets table of registered processes and run a
%% first internal update.
init([]) ->
    %% vm_memory_monitor may not be running; fall back to a fixed size.
    VMLimit = try
                  vm_memory_monitor:get_memory_limit()
              catch
                  exit:{noproc, _} -> ?MEMORY_SIZE_FOR_DISABLED_VMM
              end,
    MemoryLimit = trunc(?MEMORY_LIMIT_SCALING * VMLimit),

    {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
                                      ?SERVER, update, []),

    Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]),

    InitialState = #state { timer                = TRef,
                            queue_durations      = Ets,
                            queue_duration_sum   = 0.0,
                            queue_duration_count = 0,
                            memory_limit         = MemoryLimit,
                            desired_duration     = infinity },
    {ok, internal_update(InitialState)}.

%% report_ram_duration: reply at once with the current desired
%% duration, then fold the newly reported duration into the running
%% sum/count. The reporting Pid must already be registered.
handle_call({report_ram_duration, Pid, QueueDuration}, From,
            State = #state { queue_duration_sum   = Sum,
                             queue_duration_count = Count,
                             queue_durations      = Durations,
                             desired_duration     = SendDuration }) ->

    [Proc = #process { reported = PrevQueueDuration }] =
        ets:lookup(Durations, Pid),

    %% Reply before doing the bookkeeping so the caller is not held up.
    gen_server2:reply(From, SendDuration),

    {Sum1, Count1} =
        adjust_totals(PrevQueueDuration, QueueDuration, Sum, Count),
    true = ets:insert(Durations, Proc #process { reported = QueueDuration,
                                                 sent     = SendDuration }),
    {noreply, State #state { queue_duration_sum   = zero_clamp(Sum1),
                             queue_duration_count = Count1 }};

%% register: monitor Pid and store it with 'infinity' as both its
%% reported and sent duration.
handle_call({register, Pid, MFA}, _From,
            State = #state { queue_durations = Durations }) ->
    MRef = erlang:monitor(process, Pid),
    Entry = #process { pid      = Pid,
                       reported = infinity,
                       sent     = infinity,
                       callback = MFA,
                       monitor  = MRef },
    true = ets:insert(Durations, Entry),
    {reply, ok, State};

%% Any other request is ignored; no reply is sent.
handle_call(_Request, _From, State) ->
    {noreply, State}.

%% Replace a process's previous reported duration by its new one in
%% the running {Sum, Count}. 'infinity' reports are not counted.
adjust_totals(infinity, infinity, Sum, Count) ->
    {Sum, Count};
adjust_totals(infinity, New, Sum, Count) ->
    {Sum + New, Count + 1};
adjust_totals(Prev, infinity, Sum, Count) ->
    {Sum - Prev, Count - 1};
adjust_totals(Prev, New, Sum, Count) ->
    {Sum - Prev + New, Count}.

%% update: recompute the desired duration.
handle_cast(update, State) ->
    {noreply, internal_update(State)};

%% deregister: forget Pid and drop our monitor on it.
handle_cast({deregister, Pid}, State) ->
    {noreply, internal_deregister(Pid, true, State)};

handle_cast(stop, State) ->
    {stop, normal, State};

handle_cast(_Request, State) ->
    {noreply, State}.

%% A monitored process went down: forget it. The monitor has already
%% fired, so there is nothing to demonitor.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
    {noreply, internal_deregister(Pid, false, State)};

handle_info(_Info, State) ->
    {noreply, State}.

%% Cancel the periodic update timer.
terminate(_Reason, #state { timer = TRef }) ->
    timer:cancel(TRef),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------

%% Values below ?EPSILON are clamped to 0.0.
zero_clamp(Sum) when Sum < ?EPSILON -> 0.0;
zero_clamp(Sum)                     -> Sum.

%% Remove Pid from the table and take its last reported duration out
%% of the running sum/count. Unknown pids leave the state untouched.
%% Demonitor says whether our monitor on Pid still has to be removed.
internal_deregister(Pid, Demonitor,
                    State = #state { queue_duration_sum   = Sum,
                                     queue_duration_count = Count,
                                     queue_durations      = Durations }) ->
    case ets:lookup(Durations, Pid) of
        [#process { reported = PrevQueueDuration, monitor = MRef }] ->
            true = case Demonitor of
                       true  -> erlang:demonitor(MRef);
                       false -> true
                   end,
            {Sum1, Count1} =
                case PrevQueueDuration of
                    %% 'infinity' reports were never counted.
                    infinity -> {Sum, Count};
                    _        -> {zero_clamp(Sum - PrevQueueDuration),
                                 Count - 1}
                end,
            true = ets:delete(Durations, Pid),
            State #state { queue_duration_sum   = Sum1,
                           queue_duration_count = Count1 };
        [] ->
            State
    end.

%% Recompute the desired queue duration from current memory use and
%% store it in the state. Registered processes are only called back
%% immediately when the desired duration has decreased, and then only
%% those whose reported or previously sent duration exceeds the new
%% value.
internal_update(State = #state { memory_limit         = Limit,
                                 queue_durations      = Durations,
                                 desired_duration     = DesiredDurationAvg,
                                 queue_duration_sum   = Sum,
                                 queue_duration_count = Count }) ->
    DesiredDurationAvg1 = compute_desired_duration(Limit, Sum, Count),
    State1 = State #state { desired_duration = DesiredDurationAvg1 },

    %% only inform queues immediately if the desired duration has
    %% decreased
    case DesiredDurationAvg1 == infinity orelse
        (DesiredDurationAvg /= infinity andalso
         DesiredDurationAvg1 >= DesiredDurationAvg) of
        true ->
            ok;
        false ->
            true =
                ets:foldl(
                  fun (Proc = #process { reported = QueueDuration,
                                         sent = PrevSendDuration,
                                         callback = {M, F, A} }, true) ->
                          case (case {QueueDuration, PrevSendDuration} of
                                    {infinity, infinity} ->
                                        true;
                                    {infinity, D} ->
                                        DesiredDurationAvg1 < D;
                                    {D, infinity} ->
                                        DesiredDurationAvg1 < D;
                                    {D1, D2} ->
                                        DesiredDurationAvg1 <
                                            lists:min([D1,D2])
                                end) of
                              true ->
                                  ok = erlang:apply(
                                         M, F, A ++ [DesiredDurationAvg1]),
                                  ets:insert(
                                    Durations,
                                    Proc #process {sent = DesiredDurationAvg1});
                              false ->
                                  true
                          end
                  end, true, Durations)
    end,
    State1.

%% Desired average duration for the given memory limit and the running
%% sum/count of reported durations.
%%
%% A non-positive limit (e.g. a tiny configured limit truncated to 0
%% after scaling) would make the ratio computation divide by zero and
%% crash the server, including from init/1. It is treated as "no
%% memory available", i.e. a desired duration of 0.0.
compute_desired_duration(Limit, _Sum, _Count) when Limit =< 0 ->
    0.0;
compute_desired_duration(Limit, Sum, Count) ->
    MemoryRatio = erlang:memory(total) / Limit,
    case MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 of
        true ->
            infinity;
        false ->
            Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of
                       true  -> Sum + ?SUM_INC_AMOUNT;
                       false -> Sum
                   end,
            (Sum1 / Count) / MemoryRatio
    end.
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
new file mode 100644
index 0000000000..5575351269
--- /dev/null
+++ b/src/supervisor2.erl
@@ -0,0 +1,917 @@
%% This file is a copy of supervisor.erl from the R13B-3 Erlang/OTP
%% distribution, with the following modifications:
%%
%% 1) the module name is supervisor2
%%
%% 2) there is a new strategy called
%%    simple_one_for_one_terminate. This is exactly the same as for
%%    simple_one_for_one, except that children *are* explicitly
%%    terminated as per the shutdown component of the child_spec.
%%
%% All modifications are (C) 2010 LShift Ltd.
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
-module(supervisor2).

-behaviour(gen_server).

%% External exports
-export([start_link/2,start_link/3,
         start_child/2, restart_child/2,
         delete_child/2, terminate_child/2,
         which_children/1,
         check_childspecs/1]).

-export([behaviour_info/1]).

%% Internal exports
-export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]).
-export([handle_cast/2]).

-define(DICT, dict).

-record(state, {name,
                strategy,
                children = [],
                dynamics = ?DICT:new(),
                intensity,
                period,
                restarts = [],
                module,
                args}).

-record(child, {pid = undefined, % pid is undefined when child is not running
                name,
                mfa,
                restart_type,
                shutdown,
                child_type,
                modules = []}).

-define(is_simple(State), State#state.strategy =:= simple_one_for_one orelse
        State#state.strategy =:= simple_one_for_one_terminate).
-define(is_terminate_simple(State),
        State#state.strategy =:= simple_one_for_one_terminate).

behaviour_info(callbacks) ->
    [{init,1}];
behaviour_info(_Other) ->
    undefined.

%%% ---------------------------------------------------
%%% This is a general process supervisor built upon gen_server.erl.
%%% Servers/processes should/could also be built using gen_server.erl.
%%% SupName = {local, atom()} | {global, atom()}.
%%% ---------------------------------------------------
start_link(Mod, Args) ->
    InitArg = {self, Mod, Args},
    gen_server:start_link(?MODULE, InitArg, []).

start_link(SupName, Mod, Args) ->
    InitArg = {SupName, Mod, Args},
    gen_server:start_link(SupName, ?MODULE, InitArg, []).

%%% ---------------------------------------------------
%%% Interface functions.
%%% ---------------------------------------------------
start_child(Supervisor, ChildSpec) ->
    call(Supervisor, {start_child, ChildSpec}).

restart_child(Supervisor, Name) ->
    call(Supervisor, {restart_child, Name}).

delete_child(Supervisor, Name) ->
    call(Supervisor, {delete_child, Name}).

%%-----------------------------------------------------------------
%% Func: terminate_child/2
%% Returns: ok | {error, Reason}
%%          Note that the child is *always* terminated in some
%%          way (maybe killed).
%%-----------------------------------------------------------------
terminate_child(Supervisor, Name) ->
    call(Supervisor, {terminate_child, Name}).

which_children(Supervisor) ->
    call(Supervisor, which_children).

%% All interface functions are synchronous calls without a timeout.
call(Supervisor, Req) ->
    gen_server:call(Supervisor, Req, infinity).

%% Validate a list of child specifications without starting anything.
check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
    case check_startspec(ChildSpecs) of
        {ok, _} -> ok;
        Error   -> {error, Error}
    end;
check_childspecs(X) ->
    {error, {badarg, X}}.

%%% ---------------------------------------------------
%%%
%%% Initialize the supervisor.
%%%
%%% ---------------------------------------------------
init({SupName, Mod, Args}) ->
    process_flag(trap_exit, true),
    case Mod:init(Args) of
        {ok, {SupFlags, StartSpec}} ->
            init_with_state(init_state(SupName, SupFlags, Mod, Args),
                            StartSpec);
        ignore ->
            ignore;
        Error ->
            {stop, {bad_return, {Mod, init, Error}}}
    end.

%% Continue initialisation once the supervisor flags have been
%% checked: the simple strategies only record the child template, the
%% others start their children.
init_with_state({ok, State}, StartSpec) when ?is_simple(State) ->
    init_dynamic(State, StartSpec);
init_with_state({ok, State}, StartSpec) ->
    init_children(State, StartSpec);
init_with_state(Error, _StartSpec) ->
    {stop, {supervisor_data, Error}}.

%% Check the start specification and start every child. If any child
%% fails to start, the children are terminated again and the
%% supervisor stops with reason shutdown.
init_children(State, StartSpec) ->
    SupName = State#state.name,
    case check_startspec(StartSpec) of
        {ok, Children} ->
            case start_children(Children, SupName) of
                {ok, Started} ->
                    {ok, State#state{children = Started}};
                {error, NotStarted} ->
                    terminate_children(NotStarted, SupName),
                    {stop, shutdown}
            end;
        Error ->
            {stop, {start_spec, Error}}
    end.

%% The simple strategies take exactly one child specification, which
%% is stored as the template for dynamically started children.
init_dynamic(State, [StartSpec]) ->
    case check_startspec([StartSpec]) of
        {ok, Children} -> {ok, State#state{children = Children}};
        Error          -> {stop, {start_spec, Error}}
    end;
init_dynamic(_State, StartSpec) ->
    {stop, {bad_start_spec, StartSpec}}.

%%-----------------------------------------------------------------
%% Func: start_children/2
%% Args: Children = [#child] in start order
%%       SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
%% Purpose: Start all children.  The new list contains #child's
%%          with pids.
%% Returns: {ok, NChildren} | {error, NChildren}
%%          NChildren = [#child] in termination order (reversed
%%                        start order)
%%-----------------------------------------------------------------
start_children(Children, SupName) ->
    start_children(Children, [], SupName).

%% Started accumulates the children handled so far, newest first.
start_children([], Started, _SupName) ->
    {ok, Started};
start_children([Child | Remaining], Started, SupName) ->
    case do_start_child(SupName, Child) of
        {ok, Pid} ->
            start_children(Remaining,
                           [Child#child{pid = Pid} | Started], SupName);
        {ok, Pid, _Extra} ->
            start_children(Remaining,
                           [Child#child{pid = Pid} | Started], SupName);
        {error, Reason} ->
            report_error(start_error, Reason, Child, SupName),
            %% Unstarted children first (reversed), then the failed
            %% one, then those already running.
            {error, lists:reverse(Remaining) ++ [Child | Started]}
    end.

%% Start a child from its stored MFA and report progress on success.
%% 'ignore' is mapped to {ok, undefined}; anything that is not a
%% success or 'ignore' becomes {error, What}.
do_start_child(SupName, Child) ->
    #child{mfa = {M, F, A}} = Child,
    case catch apply(M, F, A) of
        {ok, Pid} when is_pid(Pid) ->
            report_progress(Child#child{pid = Pid}, SupName),
            {ok, Pid};
        {ok, Pid, Extra} when is_pid(Pid) ->
            report_progress(Child#child{pid = Pid}, SupName),
            {ok, Pid, Extra};
        ignore ->
            {ok, undefined};
        {error, What} ->
            {error, What};
        What ->
            {error, What}
    end.

%% As do_start_child/2 but for an explicit M, F, A and without a
%% progress report.
do_start_child_i(M, F, A) ->
    case catch apply(M, F, A) of
        {ok, Pid} when is_pid(Pid) ->
            {ok, Pid};
        {ok, Pid, Extra} when is_pid(Pid) ->
            {ok, Pid, Extra};
        ignore ->
            {ok, undefined};
        {error, Error} ->
            {error, Error};
        What ->
            {error, What}
    end.


%%% ---------------------------------------------------
%%%
%%% Callback functions.
%%%
%%% ---------------------------------------------------

%% Simple strategies: the extra arguments are appended to the template
%% child's arguments and the new pid is recorded in 'dynamics'.
handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
    #child{mfa = {M, F, A}} = hd(State#state.children),
    Args = A ++ EArgs,
    case do_start_child_i(M, F, Args) of
        {ok, Pid} ->
            {reply, {ok, Pid}, store_dynamic(Pid, Args, State)};
        {ok, Pid, Extra} ->
            {reply, {ok, Pid, Extra}, store_dynamic(Pid, Args, State)};
        What ->
            {reply, What, State}
    end;

%%% The requests terminate_child, delete_child and restart_child are
%%% invalid for simple_one_for_one and simple_one_for_one_terminate
%%% supervisors.
handle_call({_Req, _Data}, _From, State) when ?is_simple(State) ->
    {reply, {error, State#state.strategy}, State};

handle_call({start_child, ChildSpec}, _From, State) ->
    case check_childspec(ChildSpec) of
        {ok, Child} ->
            {Resp, NState} = handle_start_child(Child, State),
            {reply, Resp, NState};
        What ->
            {reply, {error, What}, State}
    end;

%% Only a known child that is not currently running can be restarted.
handle_call({restart_child, Name}, _From, State) ->
    case get_child(Name, State) of
        {value, Child} when Child#child.pid =:= undefined ->
            case do_start_child(State#state.name, Child) of
                {ok, Pid} ->
                    {reply, {ok, Pid},
                     replace_child(Child#child{pid = Pid}, State)};
                {ok, Pid, Extra} ->
                    {reply, {ok, Pid, Extra},
                     replace_child(Child#child{pid = Pid}, State)};
                Error ->
                    {reply, Error, State}
            end;
        {value, _} ->
            {reply, {error, running}, State};
        _ ->
            {reply, {error, not_found}, State}
    end;

%% Only a known child that is not currently running can be deleted.
handle_call({delete_child, Name}, _From, State) ->
    case get_child(Name, State) of
        {value, Child} when Child#child.pid =:= undefined ->
            {reply, ok, remove_child(Child, State)};
        {value, _} ->
            {reply, {error, running}, State};
        _ ->
            {reply, {error, not_found}, State}
    end;

handle_call({terminate_child, Name}, _From, State) ->
    case get_child(Name, State) of
        {value, Child} ->
            Stopped = do_terminate(Child, State#state.name),
            {reply, ok, replace_child(Stopped, State)};
        _ ->
            {reply, {error, not_found}, State}
    end;

%% Simple strategies: one entry per dynamic pid, with the name
%% reported as 'undefined'.
handle_call(which_children, _From, State) when ?is_simple(State) ->
    [#child{child_type = CT, modules = Mods}] = State#state.children,
    Reply = [{undefined, Pid, CT, Mods}
             || {Pid, _} <- ?DICT:to_list(State#state.dynamics)],
    {reply, Reply, State};

handle_call(which_children, _From, State) ->
    Describe = fun(#child{pid = Pid, name = Name,
                          child_type = ChildType, modules = Mods}) ->
                       {Name, Pid, ChildType, Mods}
               end,
    {reply, lists:map(Describe, State#state.children), State}.

%% Remember a dynamically started child together with its arguments.
store_dynamic(Pid, Args, State) ->
    State#state{dynamics = ?DICT:store(Pid, Args, State#state.dynamics)}.


%%% Hopefully cause a function-clause as there is no API function
%%% that utilizes cast.
handle_cast(null, State) ->
    error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
                           []),

    {noreply, State}.

%%
%% Take care of terminated children.
%%
handle_info({'EXIT', Pid, Reason}, State) ->
    case restart_child(Pid, Reason, State) of
        {ok, NState}       -> {noreply, NState};
        {shutdown, NState} -> {stop, shutdown, NState}
    end;

handle_info(Msg, State) ->
    error_logger:error_msg("Supervisor received unexpected message: ~p~n",
                           [Msg]),
    {noreply, State}.
%%
%% Terminate this server.
%%
%% With simple_one_for_one_terminate the dynamic children are shut
%% down using the template child; otherwise the child list is
%% terminated.
terminate(_Reason, State) when ?is_terminate_simple(State) ->
    Template = hd(State#state.children),
    terminate_simple_children(Template, State#state.dynamics,
                              State#state.name),
    ok;
terminate(_Reason, State) ->
    terminate_children(State#state.children, State#state.name),
    ok.

%%
%% Change code for the supervisor.
%% Call the new call-back module and fetch the new start specification.
%% Combine the new spec. with the old. If the new start spec. is
%% not valid the code change will not succeed.
%% Use the old Args as argument to Module:init/1.
%% NOTE: This requires that the init function of the call-back module
%%       does not have any side effects.
%%
code_change(_, State, _) ->
    case (State#state.module):init(State#state.args) of
        {ok, {SupFlags, StartSpec}} ->
            apply_new_flags(State, SupFlags, StartSpec);
        ignore ->
            {ok, State};
        Error ->
            Error
    end.

%% Validate the new supervisor flags and, if they are acceptable,
%% install them and merge in the new child specifications.
apply_new_flags(State, SupFlags, StartSpec) ->
    case catch check_flags(SupFlags) of
        ok ->
            {Strategy, MaxIntensity, Period} = SupFlags,
            NState = State#state{strategy  = Strategy,
                                 intensity = MaxIntensity,
                                 period    = Period},
            update_childspec(NState, StartSpec);
        Error ->
            {error, Error}
    end.

check_flags({Strategy, MaxIntensity, Period}) ->
    validStrategy(Strategy),
    validIntensity(MaxIntensity),
    validPeriod(Period),
    ok;
check_flags(What) ->
    {bad_flags, What}.

%% Install a new start specification. The simple strategies accept
%% exactly one child; the others merge the new specs with the
%% existing children.
update_childspec(State, StartSpec) when ?is_simple(State) ->
    case check_startspec(StartSpec) of
        {ok, [Child]} -> {ok, State#state{children = [Child]}};
        Error         -> {error, Error}
    end;

update_childspec(State, StartSpec) ->
    case check_startspec(StartSpec) of
        {ok, Children} ->
            OldC = State#state.children, % In reverse start order !
            {ok, State#state{children = update_childspec1(OldC, Children, [])}};
        Error ->
            {error, Error}
    end.

%% Walk the old children: those named in the new spec pass their pid
%% on to it, the others are kept as they are.
update_childspec1([], Children, KeepOld) ->
    % Return them in (kept) reverse start order.
    lists:reverse(Children ++ KeepOld);
update_childspec1([Child | OldC], Children, KeepOld) ->
    case update_chsp(Child, Children) of
        {ok, NewChildren} ->
            update_childspec1(OldC, NewChildren, KeepOld);
        false ->
            update_childspec1(OldC, Children, [Child | KeepOld])
    end.

%% Copy OldCh's pid into the new child of the same name. Returns
%% false when the list comes back unchanged.
update_chsp(OldCh, Children) ->
    case [inherit_pid(OldCh, Ch) || Ch <- Children] of
        Children -> false;  % OldCh not found in new spec.
        NewC     -> {ok, NewC}
    end.

inherit_pid(OldCh, Ch) when OldCh#child.name =:= Ch#child.name ->
    Ch#child{pid = OldCh#child.pid};
inherit_pid(_OldCh, Ch) ->
    Ch.

%%% ---------------------------------------------------
%%% Start a new child.
%%% ---------------------------------------------------

handle_start_child(Child, State) ->
    case get_child(Child#child.name, State) of
        false ->
            case do_start_child(State#state.name, Child) of
                {ok, Pid} ->
                    {{ok, Pid}, add_started(Child, Pid, State)};
                {ok, Pid, Extra} ->
                    {{ok, Pid, Extra}, add_started(Child, Pid, State)};
                {error, What} ->
                    {{error, {What, Child}}, State}
            end;
        {value, OldChild} when OldChild#child.pid =/= undefined ->
            {{error, {already_started, OldChild#child.pid}}, State};
        {value, _OldChild} ->
            {{error, already_present}, State}
    end.

%% Put a freshly started child at the head of the child list.
add_started(Child, Pid, State) ->
    State#state{children = [Child#child{pid = Pid} | State#state.children]}.

%%% ---------------------------------------------------
%%% Restart. A process has terminated.
%%% Returns: {ok, #state} | {shutdown, #state}
%%% ---------------------------------------------------

%% Simple strategies: rebuild the child from the template plus the
%% arguments stored for this pid. Unknown pids are ignored.
restart_child(Pid, Reason, State) when ?is_simple(State) ->
    case ?DICT:find(Pid, State#state.dynamics) of
        {ok, Args} ->
            [Template] = State#state.children,
            {M, F, _} = Template#child.mfa,
            Child = Template#child{pid = Pid, mfa = {M, F, Args}},
            do_restart(Template#child.restart_type, Reason, Child, State);
        error ->
            {ok, State}
    end;
restart_child(Pid, Reason, State) ->
    case lists:keysearch(Pid, #child.pid, State#state.children) of
        {value, Child} ->
            do_restart(Child#child.restart_type, Reason, Child, State);
        _ ->
            {ok, State}
    end.

%% Clause order matters: permanent children are always restarted;
%% otherwise a normal or shutdown exit just removes the child;
%% transient children are restarted on any other reason and temporary
%% ones never are.
do_restart(permanent, Reason, Child, State) ->
    report_error(child_terminated, Reason, Child, State#state.name),
    restart(Child, State);
do_restart(_, normal, Child, State) ->
    {ok, state_del_child(Child, State)};
do_restart(_, shutdown, Child, State) ->
    {ok, state_del_child(Child, State)};
do_restart(transient, Reason, Child, State) ->
    report_error(child_terminated, Reason, Child, State#state.name),
    restart(Child, State);
do_restart(temporary, Reason, Child, State) ->
    report_error(child_terminated, Reason, Child, State#state.name),
    {ok, state_del_child(Child, State)}.

%% Count the restart; give up with {shutdown, State} once add_restart
%% says to terminate.
restart(Child, State) ->
    case add_restart(State) of
        {ok, NState} ->
            restart(NState#state.strategy, Child, NState);
        {terminate, NState} ->
            report_error(shutdown, reached_max_restart_intensity,
                         Child, State#state.name),
            {shutdown, remove_child(Child, NState)}
    end.

%% restart/3: carry out the restart of Child according to the
%% supervisor strategy given as first argument.
%% Every branch returns {ok, #state} when the (re)start succeeds; when
%% it fails the branch calls restart/2 again.
restart(Strategy, Child, State)
  when Strategy =:= simple_one_for_one orelse
       Strategy =:= simple_one_for_one_terminate ->
    #child{mfa = {M, F, A}} = Child,
    %% Drop the entry of the old pid; the new pid is stored below with
    %% the same argument list A.
    Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics),
    case do_start_child_i(M, F, A) of
        {ok, Pid} ->
            NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)},
            {ok, NState};
        {ok, Pid, _Extra} ->
            NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)},
            {ok, NState};
        {error, Error} ->
            report_error(start_error, Error, Child, State#state.name),
            %% Note: the retry is made with the unmodified State, i.e.
            %% without the erase performed above.
            restart(Child, State)
    end;
restart(one_for_one, Child, State) ->
    %% Only the terminated child is started again.
    case do_start_child(State#state.name, Child) of
        {ok, Pid} ->
            NState = replace_child(Child#child{pid = Pid}, State),
            {ok, NState};
        {ok, Pid, _Extra} ->
            NState = replace_child(Child#child{pid = Pid}, State),
            {ok, NState};
        {error, Reason} ->
            report_error(start_error, Reason, Child, State#state.name),
            restart(Child, State)
    end;
restart(rest_for_one, Child, State) ->
    %% Split the children list at the terminated child, terminate the
    %% first part (which includes the child itself) and start it again.
    {ChAfter, ChBefore} = split_child(Child#child.pid, State#state.children),
    ChAfter2 = terminate_children(ChAfter, State#state.name),
    case start_children(ChAfter2, State#state.name) of
        {ok, ChAfter3} ->
            {ok, State#state{children = ChAfter3 ++ ChBefore}};
        {error, ChAfter3} ->
            restart(Child, State#state{children = ChAfter3 ++ ChBefore})
    end;
restart(one_for_all, Child, State) ->
    %% Mark the terminated child as not running, terminate all children
    %% and start all of them again.
    Children1 = del_child(Child#child.pid, State#state.children),
    Children2 = terminate_children(Children1, State#state.name),
    case start_children(Children2, State#state.name) of
        {ok, NChs} ->
            {ok, State#state{children = NChs}};
        {error, NChs} ->
            restart(Child, State#state{children = NChs})
    end.

%%-----------------------------------------------------------------
%% Func: terminate_children/2
%% Args: Children = [#child] in termination order
%%       SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
%% Returns: NChildren = [#child] in
%%          startup order (reversed termination order)
%%-----------------------------------------------------------------
terminate_children(Children, SupName) ->
    terminate_children(Children, SupName, []).

%% Terminate the children one by one; prepending to Res reverses the
%% list, which yields the startup order documented above.
terminate_children([Child | Children], SupName, Res) ->
    NChild = do_terminate(Child, SupName),
    terminate_children(Children, SupName, [NChild | Res]);
terminate_children([], _SupName, Res) ->
    Res.

%% Terminate every pid stored in Dynamics, using Child as the template
%% that supplies the shutdown setting. Always returns ok.
%% The dictionary is accessed through the ?DICT macro, like every other
%% access to the dynamics in this module, so that the implementation
%% behind ?DICT can be exchanged in one place.
terminate_simple_children(Child, Dynamics, SupName) ->
    ?DICT:fold(fun (Pid, _Args, _Any) ->
                       do_terminate(Child#child{pid = Pid}, SupName)
               end, ok, Dynamics),
    ok.

%% Shut a single child down (if it has a pid) and return the child with
%% its pid reset to undefined. An unexpected exit reason is reported but
%% does not change the result.
do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
    case shutdown(Child#child.pid,
                  Child#child.shutdown) of
        ok ->
            Child#child{pid = undefined};
        {error, OtherReason} ->
            report_error(shutdown_error, OtherReason, Child, SupName),
            Child#child{pid = undefined}
    end;
do_terminate(Child, _SupName) ->
    Child.

%%-----------------------------------------------------------------
%% Shutdowns a child. We must check the EXIT value
%% of the child, because it might have died with another reason than
%% the wanted. In that case we want to report the error. We put a
%% monitor on the child and check for the 'DOWN' message instead of
%% checking for the 'EXIT' message, because if we check the 'EXIT'
%% message a "naughty" child, who does unlink(Sup), could hang the
%% supervisor.
%% Returns: ok | {error, OtherReason}  (this should be reported)
%%-----------------------------------------------------------------
%% brutal_kill: send an untrappable kill and wait (without timeout)
%% for the 'DOWN' message. Exit reason killed is the expected outcome.
shutdown(Pid, brutal_kill) ->

    case monitor_child(Pid) of
        ok ->
            exit(Pid, kill),
            receive
                {'DOWN', _MRef, process, Pid, killed} ->
                    ok;
                {'DOWN', _MRef, process, Pid, OtherReason} ->
                    {error, OtherReason}
            end;
        {error, Reason} ->
            {error, Reason}
    end;

%% Any other shutdown value is used as the timeout of the receive
%% below: ask the child to shut down and kill it when no 'DOWN'
%% message arrived within Time. A child that had to be killed is
%% always reported as {error, Reason}.
shutdown(Pid, Time) ->

    case monitor_child(Pid) of
        ok ->
            exit(Pid, shutdown), %% Try to shutdown gracefully
            receive
                {'DOWN', _MRef, process, Pid, shutdown} ->
                    ok;
                {'DOWN', _MRef, process, Pid, OtherReason} ->
                    {error, OtherReason}
            after Time ->
                    exit(Pid, kill),  %% Force termination.
                    receive
                        {'DOWN', _MRef, process, Pid, OtherReason} ->
                            {error, OtherReason}
                    end
            end;
        {error, Reason} ->
            {error, Reason}
    end.

%% Help function to shutdown/2: switches from the link to the monitor
%% approach.
%% Returns: ok | {error, Reason} when an 'EXIT' message from the child
%% was already waiting in the mailbox.
monitor_child(Pid) ->

    %% Do the monitor operation first so that if the child dies
    %% before the monitoring is done causing a 'DOWN'-message with
    %% reason noproc, we will get the real reason in the 'EXIT'-message
    %% unless a naughty child has already done unlink...
    erlang:monitor(process, Pid),
    unlink(Pid),

    receive
        %% If the child dies before the unlink we must empty
        %% the mail-box of the 'EXIT'-message and the 'DOWN'-message.
        {'EXIT', Pid, Reason} ->
            receive
                {'DOWN', _, process, Pid, _} ->
                    {error, Reason}
            end
    after 0 ->
            %% If a naughty child did unlink and the child dies before
            %% monitor the result will be that shutdown/2 receives a
            %% 'DOWN'-message with reason noproc.
            %% If the child should die after the unlink there
            %% will be a 'DOWN'-message with a correct reason
            %% that will be handled in shutdown/2.
            ok
    end.


%%-----------------------------------------------------------------
%% Child/State manipulating functions.
%%-----------------------------------------------------------------
%% Forget a terminated child: erase its pid from the dynamics for the
%% simple strategies, otherwise reset the pid of the named child.
state_del_child(#child{pid = Pid}, State) when ?is_simple(State) ->
    State#state{dynamics = ?DICT:erase(Pid, State#state.dynamics)};
state_del_child(Child, State) ->
    State#state{children = del_child(Child#child.name,
                                     State#state.children)}.

%% Reset the pid of the first child whose name or pid equals Key.
del_child(Key, [Ch | Rest])
  when Ch#child.name =:= Key; Ch#child.pid =:= Key ->
    [Ch#child{pid = undefined} | Rest];
del_child(Key, [Ch | Rest]) ->
    [Ch | del_child(Key, Rest)];
del_child(_Key, []) ->
    [].

%% Chs = [S4, S3, Ch, S1, S0]
%% Ret: {[S4, S3, Ch], [S1, S0]}
%% The matching child (by name or pid) ends the first list and has its
%% pid reset to undefined.
split_child(Key, Chs) ->
    split_child(Key, Chs, []).

split_child(Key, [Ch | Rest], Seen)
  when Ch#child.name =:= Key; Ch#child.pid =:= Key ->
    {lists:reverse([Ch#child{pid = undefined} | Seen]), Rest};
split_child(Key, [Ch | Rest], Seen) ->
    split_child(Key, Rest, [Ch | Seen]);
split_child(_Key, [], Seen) ->
    {lists:reverse(Seen), []}.

%% Look a child up by name. Returns {value, #child} | false.
get_child(Name, State) ->
    Children = State#state.children,
    lists:keysearch(Name, #child.name, Children).

%% Put Child in the place of the equally named child in the state.
replace_child(Child, State) ->
    State#state{children = do_replace_child(Child, State#state.children)}.

do_replace_child(New, [Old | Rest]) when Old#child.name =:= New#child.name ->
    [New | Rest];
do_replace_child(New, [Old | Rest]) ->
    [Old | do_replace_child(New, Rest)].

%% Delete the child with the same name as Child from the state.
remove_child(Child, State) ->
    Name = Child#child.name,
    State#state{children = lists:keydelete(Name, #child.name,
                                           State#state.children)}.

%%-----------------------------------------------------------------
%% Func: init_state/4
%% Args: SupName = {local, atom()} | {global, atom()} | self
%%       Type = {Strategy, MaxIntensity, Period}
%%         Strategy = one_for_one | one_for_all | simple_one_for_one |
%%                    simple_one_for_one_terminate | rest_for_one
%%         MaxIntensity = integer()
%%         Period = integer()
%%       Mod :== atom()
%%       Args :== term()
%% Purpose: Check that Type is of correct type (!)
%% Returns: {ok, #state} | Error
%%-----------------------------------------------------------------
init_state(SupName, Type, Mod, Args) ->
    %% The validators signal a bad value with throw/1; catch turns that
    %% into the returned Error term.
    catch init_state1(SupName, Type, Mod, Args).

init_state1(SupName, {Strategy, MaxIntensity, Period}, Mod, Args) ->
    validStrategy(Strategy),
    validIntensity(MaxIntensity),
    validPeriod(Period),
    Name = supname(SupName, Mod),
    {ok, #state{name = Name,
                strategy = Strategy,
                intensity = MaxIntensity,
                period = Period,
                module = Mod,
                args = Args}};
init_state1(_SupName, Type, _Mod, _Args) ->
    {invalid_type, Type}.

%% Each validator returns true or throws {invalid_..., Value}.
validStrategy(Strategy) ->
    Known = [simple_one_for_one_terminate, simple_one_for_one,
             one_for_one, one_for_all, rest_for_one],
    case lists:member(Strategy, Known) of
        true  -> true;
        false -> throw({invalid_strategy, Strategy})
    end.

validIntensity(Max) ->
    if
        is_integer(Max), Max >= 0 -> true;
        true -> throw({invalid_intensity, Max})
    end.

validPeriod(Period) ->
    if
        is_integer(Period), Period > 0 -> true;
        true -> throw({invalid_period, Period})
    end.

%% The atom self stands for the calling process; it is replaced by
%% {self(), Mod}. Any other name is returned unchanged.
supname(SupName, Mod) ->
    case SupName of
        self -> {self(), Mod};
        _    -> SupName
    end.

%%% ------------------------------------------------------
%%% Check that the children start specification is valid.
%%% Shall be a six (6) tuple
%%%    {Name, Func, RestartType, Shutdown, ChildType, Modules}
%%% where Name is an atom
%%%       Func is {Mod, Fun, Args} == {atom, atom, list}
%%%       RestartType is permanent | temporary | transient
%%%       Shutdown = integer() | infinity | brutal_kill
%%%       ChildType = supervisor | worker
%%%       Modules = [atom()] | dynamic
%%% Returns: {ok, [#child]} | Error
%%% ------------------------------------------------------

check_startspec(Children) ->
    check_startspec(Children, []).

%% Acc collects the checked children in reverse order; the first
%% invalid spec or duplicated name ends the walk with that error.
check_startspec([Spec | Rest], Acc) ->
    case check_childspec(Spec) of
        {ok, Child} ->
            Name = Child#child.name,
            case lists:keymember(Name, #child.name, Acc) of
                false -> check_startspec(Rest, [Child | Acc]);
                true  -> {duplicate_child_name, Name}
            end;
        Invalid ->
            Invalid
    end;
check_startspec([], Acc) ->
    {ok, lists:reverse(Acc)}.

%% Anything that is not a six tuple is rejected right away; the term
%% thrown by a failing validator becomes the return value.
check_childspec(Spec) ->
    case Spec of
        {Name, Func, RestartType, Shutdown, ChildType, Mods} ->
            catch check_childspec(Name, Func, RestartType, Shutdown,
                                  ChildType, Mods);
        _ ->
            {invalid_child_spec, Spec}
    end.

check_childspec(Name, Func, RestartType, Shutdown, ChildType, Mods) ->
    validName(Name),
    validFunc(Func),
    validRestartType(RestartType),
    validChildType(ChildType),
    validShutdown(Shutdown, ChildType),
    validMods(Mods),
    Child = #child{name = Name,
                   mfa = Func,
                   restart_type = RestartType,
                   shutdown = Shutdown,
                   child_type = ChildType,
                   modules = Mods},
    {ok, Child}.

validChildType(Type) ->
    case Type of
        supervisor -> true;
        worker     -> true;
        _          -> throw({invalid_child_type, Type})
    end.

%% Every term is accepted as a child name.
validName(_Name) -> true.

validFunc(Func) ->
    case Func of
        {M, F, A} when is_atom(M), is_atom(F), is_list(A) -> true;
        _ -> throw({invalid_mfa, Func})
    end.

validRestartType(RestartType) ->
    case lists:member(RestartType, [permanent, temporary, transient]) of
        true  -> true;
        false -> throw({invalid_restart_type, RestartType})
    end.

%% A positive integer or brutal_kill is accepted for every child type,
%% infinity only for supervisors.
validShutdown(Shutdown, ChildType) ->
    case {Shutdown, ChildType} of
        {N, _} when is_integer(N), N > 0 -> true;
        {infinity, supervisor}           -> true;
        {brutal_kill, _}                 -> true;
        _ -> throw({invalid_shutdown, Shutdown})
    end.

%% Modules is either the atom dynamic or a list of atoms.
validMods(dynamic) ->
    true;
validMods(Mods) when is_list(Mods) ->
    EnsureAtom = fun(Mod) when is_atom(Mod) -> ok;
                    (Mod) -> throw({invalid_module, Mod})
                 end,
    lists:foreach(EnsureAtom, Mods);
validMods(Mods) ->
    throw({invalid_modules, Mods}).

%%% ------------------------------------------------------
%%% Add a new restart and calculate if the max restart
%%% intensity has been reached (in that case the supervisor
%%% shall terminate).
%%% All restarts occurred inside the period amount of seconds
%%% are kept in the #state.restarts list.
%%% Returns: {ok, State'} | {terminate, State'}
%%% ------------------------------------------------------

add_restart(State) ->
    Now = erlang:now(),
    Recent = add_restart([Now | State#state.restarts], Now,
                         State#state.period),
    NState = State#state{restarts = Recent},
    case length(Recent) =< State#state.intensity of
        true  -> {ok, NState};
        false -> {terminate, NState}
    end.

%% Keep the leading restarts that lie inside the period; everything
%% from the first restart outside the period on is dropped.
add_restart(Restarts, Now, Period) ->
    lists:takewhile(fun(R) -> inPeriod(R, Now, Period) end, Restarts).

%% true when at most Period seconds lie between Time and Now.
inPeriod(Time, Now, Period) ->
    difference(Time, Now) =< Period.

%%
%% Time = {MegaSecs, Secs, MicroSecs} (NOTE: MicroSecs is ignored)
%% Calculate the time elapsed in seconds between two timestamps.
%% If MegaSecs is equal just subtract Secs.
%% Else calculate the Mega difference and add the Secs difference,
%% note that Secs difference can be negative, e.g.
%%      {827, 999999, 676} diff {828, 1, 653753} == > 2 secs.
%%
difference({TimeM, TimeS, _}, {CurM, CurS, _}) ->
    Secs = CurS - TimeS,
    if
        CurM > TimeM -> ((CurM - TimeM) * 1000000) + Secs;
        true         -> Secs
    end.

%%% ------------------------------------------------------
%%% Error and progress reporting.
%%% ------------------------------------------------------

%% Log a supervisor_report describing what went wrong with Child.
report_error(Error, Reason, Child, SupName) ->
    Offender = extract_child(Child),
    error_logger:error_report(supervisor_report,
                              [{supervisor, SupName},
                               {errorContext, Error},
                               {reason, Reason},
                               {offender, Offender}]).


%% Property list with the fields of a child that go into reports.
extract_child(Child) ->
    Pid         = Child#child.pid,
    Name        = Child#child.name,
    MFA         = Child#child.mfa,
    RestartType = Child#child.restart_type,
    Shutdown    = Child#child.shutdown,
    ChildType   = Child#child.child_type,
    [{pid, Pid},
     {name, Name},
     {mfa, MFA},
     {restart_type, RestartType},
     {shutdown, Shutdown},
     {child_type, ChildType}].

%% Log a progress report for a started child.
report_progress(Child, SupName) ->
    Started = extract_child(Child),
    error_logger:info_report(progress,
                             [{supervisor, SupName},
                              {started, Started}]).
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 68efc27f97..3b23daa5c1 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -76,7 +76,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, [inet_parse:ntoa(Address), Port, inet_parse:ntoa(PeerAddress), PeerPort]), %% handle - apply(M, F, A ++ [Sock]) + file_handle_cache:release_on_death(apply(M, F, A ++ [Sock])) catch {inet_error, Reason} -> gen_tcp:close(Sock), error_logger:error_msg("unable to accept TCP connection: ~p~n", @@ -104,6 +104,7 @@ code_change(_OldVsn, State, _Extra) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
accept(State = #state{sock=LSock}) -> + ok = file_handle_cache:obtain(), case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {noreply, State#state{ref=Ref}}; Error -> {stop, {cannot_accept, Error}, State} diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 9ef8c636a3..57901fd5cf 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -35,6 +35,8 @@ -export([start_link/1, submit/2, submit_async/2, run/1]). +-export([set_maximum_since_use/2]). + -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -48,6 +50,7 @@ (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). -spec(run/1 :: (fun (() -> A)) -> A; ({atom(), atom(), [any()]}) -> any()). +-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -endif. @@ -67,6 +70,9 @@ submit(Pid, Fun) -> submit_async(Pid, Fun) -> gen_server2:cast(Pid, {submit_async, Fun}). +set_maximum_since_use(Pid, Age) -> + gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}). + run({M, F, A}) -> apply(M, F, A); run(Fun) -> @@ -75,6 +81,8 @@ run(Fun) -> %%---------------------------------------------------------------------------- init([WId]) -> + ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, + [self()]), ok = worker_pool:idle(WId), put(worker_pool_worker, true), {ok, WId, hibernate, @@ -93,6 +101,10 @@ handle_cast({submit_async, Fun}, WId) -> ok = worker_pool:idle(WId), {noreply, WId, hibernate}; +handle_cast({set_maximum_since_use, Age}, WId) -> + ok = file_handle_cache:set_maximum_since_use(Age), + {noreply, WId, hibernate}; + handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. |
