summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2017-05-27 00:46:01 +0300
committerMichael Klishin <mklishin@pivotal.io>2017-05-27 00:46:01 +0300
commit10fd03dbf2168aed1cb7e96da7c3603444f1e449 (patch)
treeb51a5a2def3550732a072013aa79dbd6f8e1d5d2 /src
parentf8dbf368bc358ad0ac889354f4fc2cfd1942d96f (diff)
parent8bf65e52fc2e5a6242197f8364d2ec0dd3b60145 (diff)
downloadrabbitmq-server-git-10fd03dbf2168aed1cb7e96da7c3603444f1e449.tar.gz
Merge branch 'master' into rabbitmq-common-198
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl1530
-rw-r--r--src/file_handle_cache_stats.erl66
-rw-r--r--src/rabbit_fhc_helpers.erl52
-rw-r--r--src/rabbit_memory_monitor.erl14
-rw-r--r--src/rabbit_mnesia.erl4
-rw-r--r--src/rabbit_plugins.erl45
-rw-r--r--src/rabbit_runtime_parameters.erl20
-rw-r--r--src/rabbit_upgrade_functions.erl2
-rw-r--r--src/vm_memory_monitor.erl47
9 files changed, 146 insertions, 1634 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
deleted file mode 100644
index 070b6e5263..0000000000
--- a/src/file_handle_cache.erl
+++ /dev/null
@@ -1,1530 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(file_handle_cache).
-
-%% A File Handle Cache
-%%
-%% This extends a subset of the functionality of the Erlang file
-%% module. In the below, we use "file handle" to specifically refer to
-%% file handles, and "file descriptor" to refer to descriptors which
-%% are not file handles, e.g. sockets.
-%%
-%% Some constraints
-%% 1) This supports one writer, multiple readers per file. Nothing
-%% else.
-%% 2) Do not open the same file from different processes. Bad things
-%% may happen, especially for writes.
-%% 3) Writes are all appends. You cannot write to the middle of a
-%% file, although you can truncate and then append if you want.
-%% 4) There are read and write buffers. Feel free to use the read_ahead
-%% mode, but beware of the interaction between that buffer and the write
-%% buffer.
-%%
-%% Some benefits
-%% 1) You do not have to remember to call sync before close
-%% 2) Buffering is much more flexible than with the plain file module,
-%% and you can control when the buffer gets flushed out. This means
-%% that you can rely on reads-after-writes working, without having to
-%% call the expensive sync.
-%% 3) Unnecessary calls to position and sync get optimised out.
-%% 4) You can find out what your 'real' offset is, and what your
-%% 'virtual' offset is (i.e. where the hdl really is, and where it
-%% would be after the write buffer is written out).
-%%
-%% There is also a server component which serves to limit the number
-%% of open file descriptors. This is a hard limit: the server
-%% component will ensure that clients do not have more file
-%% descriptors open than it's configured to allow.
-%%
-%% On open, the client requests permission from the server to open the
-%% required number of file handles. The server may ask the client to
-%% close other file handles that it has open, or it may queue the
-%% request and ask other clients to close file handles they have open
-%% in order to satisfy the request. Requests are always satisfied in
-%% the order they arrive, even if a latter request (for a small number
-%% of file handles) can be satisfied before an earlier request (for a
-%% larger number of file handles). On close, the client sends a
-%% message to the server. These messages allow the server to keep
-%% track of the number of open handles. The client also keeps a
-%% gb_tree which is updated on every use of a file handle, mapping the
-%% time at which the file handle was last used (timestamp) to the
-%% handle. Thus the smallest key in this tree maps to the file handle
-%% that has not been used for the longest amount of time. This
-%% smallest key is included in the messages to the server. As such,
-%% the server keeps track of when the least recently used file handle
-%% was used *at the point of the most recent open or close* by each
-%% client.
-%%
-%% Note that this data can go very out of date, by the client using
-%% the least recently used handle.
-%%
-%% When the limit is exceeded (i.e. the number of open file handles is
-%% at the limit and there are pending 'open' requests), the server
-%% calculates the average age of the last reported least recently used
-%% file handle of all the clients. It then tells all the clients to
-%% close any handles not used for longer than this average, by
-%% invoking the callback the client registered. The client should
-%% receive this message and pass it into
-%% set_maximum_since_use/1. However, it is highly possible this age
-%% will be greater than the ages of all the handles the client knows
-%% of because the client has used its file handles in the mean
-%% time. Thus at this point the client reports to the server the
-%% current timestamp at which its least recently used file handle was
-%% last used. The server will check two seconds later that either it
-%% is back under the limit, in which case all is well again, or if
-%% not, it will calculate a new average age. Its data will be much
-%% more recent now, and so it is very likely that when this is
-%% communicated to the clients, the clients will close file handles.
-%% (In extreme cases, where it's very likely that all clients have
-%% used their open handles since they last sent in an update, which
-%% would mean that the average will never cause any file handles to
-%% be closed, the server can send out an average age of 0, resulting
-%% in all available clients closing all their file handles.)
-%%
-%% Care is taken to ensure that (a) processes which are blocked
-%% waiting for file descriptors to become available are not sent
-%% requests to close file handles; and (b) given it is known how many
-%% file handles a process has open, when the average age is forced to
-%% 0, close messages are only sent to enough processes to release the
-%% correct number of file handles and the list of processes is
-%% randomly shuffled. This ensures we don't cause processes to
-%% needlessly close file handles, and ensures that we don't always
-%% make such requests of the same processes.
-%%
-%% The advantage of this scheme is that there is only communication
-%% from the client to the server on open, close, and when in the
-%% process of trying to reduce file handle usage. There is no
-%% communication from the client to the server on normal file handle
-%% operations. This scheme forms a feed-back loop - the server does
-%% not care which file handles are closed, just that some are, and it
-%% checks this repeatedly when over the limit.
-%%
-%% Handles which are closed as a result of the server are put into a
-%% "soft-closed" state in which the handle is closed (data flushed out
-%% and sync'd first) but the state is maintained. The handle will be
-%% fully reopened again as soon as needed, thus users of this library
-%% do not need to worry about their handles being closed by the server
-%% - reopening them when necessary is handled transparently.
-%%
-%% The server also supports obtain, release and transfer. obtain/{0,1}
-%% blocks until a file descriptor is available, at which point the
-%% requesting process is considered to 'own' more descriptor(s).
-%% release/{0,1} is the inverse operation and releases previously obtained
-%% descriptor(s). transfer/{1,2} transfers ownership of file descriptor(s)
-%% between processes. It is non-blocking. Obtain has a
-%% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use
-%% the entire limit, but will be evicted by obtain calls up to the
-%% point at which no more obtain calls can be satisfied by the obtains
-%% limit. Thus there will always be some capacity available for file
-%% handles. Processes that use obtain are never asked to return them,
-%% and they are not managed in any way by the server. It is simply a
-%% mechanism to ensure that processes that need file descriptors such
-%% as sockets can do so in such a way that the overall number of open
-%% file descriptors is managed.
-%%
-%% The callers of register_callback/3, obtain, and the argument of
-%% transfer are monitored, reducing the count of handles in use
-%% appropriately when the processes terminate.
-
--behaviour(gen_server2).
-
--export([register_callback/3]).
--export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2,
- truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1,
- copy/3, set_maximum_since_use/1, delete/1, clear/1,
- open_with_absolute_path/3]).
--export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2,
- set_limit/1, get_limit/0, info_keys/0, with_handle/1, with_handle/2,
- info/0, info/1, clear_read_cache/0]).
--export([ulimit/0]).
-
--export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3, prioritise_cast/3]).
-
--define(SERVER, ?MODULE).
--define(RESERVED_FOR_OTHERS, 100).
-
--define(FILE_HANDLES_LIMIT_OTHER, 1024).
--define(FILE_HANDLES_CHECK_INTERVAL, 2000).
-
--define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)).
--define(CLIENT_ETS_TABLE, file_handle_cache_client).
--define(ELDERS_ETS_TABLE, file_handle_cache_elders).
-
--include("rabbit.hrl"). % For #amqqueue record definition.
-
-%%----------------------------------------------------------------------------
-
--record(file,
- { reader_count,
- has_writer
- }).
-
--record(handle,
- { hdl,
- ref,
- offset,
- is_dirty,
- write_buffer_size,
- write_buffer_size_limit,
- write_buffer,
- read_buffer,
- read_buffer_pos,
- read_buffer_rem, %% Num of bytes from pos to end
- read_buffer_size, %% Next size of read buffer to use
- read_buffer_size_limit, %% Max size of read buffer to use
- read_buffer_usage, %% Bytes we have read from it, for tuning
- at_eof,
- path,
- mode,
- options,
- is_write,
- is_read,
- last_used_at
- }).
-
--record(fhc_state,
- { elders,
- limit,
- open_count,
- open_pending,
- obtain_limit, %%socket
- obtain_count_socket,
- obtain_count_file,
- obtain_pending_socket,
- obtain_pending_file,
- clients,
- timer_ref,
- alarm_set,
- alarm_clear
- }).
-
--record(cstate,
- { pid,
- callback,
- opened,
- obtained_socket,
- obtained_file,
- blocked,
- pending_closes
- }).
-
--record(pending,
- { kind,
- pid,
- requested,
- from
- }).
-
-%%----------------------------------------------------------------------------
-%% Specs
-%%----------------------------------------------------------------------------
-
--type ref() :: any().
--type ok_or_error() :: 'ok' | {'error', any()}.
--type val_or_error(T) :: {'ok', T} | {'error', any()}.
--type position() :: ('bof' | 'eof' | non_neg_integer() |
- {('bof' |'eof'), non_neg_integer()} |
- {'cur', integer()}).
--type offset() :: non_neg_integer().
-
--spec register_callback(atom(), atom(), [any()]) -> 'ok'.
--spec open
- (file:filename(), [any()],
- [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')} |
- {'read_buffer', (non_neg_integer() | 'unbuffered')}]) ->
- val_or_error(ref()).
--spec open_with_absolute_path
- (file:filename(), [any()],
- [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')} |
- {'read_buffer', (non_neg_integer() | 'unbuffered')}]) ->
- val_or_error(ref()).
--spec close(ref()) -> ok_or_error().
--spec read
- (ref(), non_neg_integer()) -> val_or_error([char()] | binary()) | 'eof'.
--spec append(ref(), iodata()) -> ok_or_error().
--spec sync(ref()) -> ok_or_error().
--spec position(ref(), position()) -> val_or_error(offset()).
--spec truncate(ref()) -> ok_or_error().
--spec current_virtual_offset(ref()) -> val_or_error(offset()).
--spec current_raw_offset(ref()) -> val_or_error(offset()).
--spec flush(ref()) -> ok_or_error().
--spec copy(ref(), ref(), non_neg_integer()) -> val_or_error(non_neg_integer()).
--spec delete(ref()) -> ok_or_error().
--spec clear(ref()) -> ok_or_error().
--spec set_maximum_since_use(non_neg_integer()) -> 'ok'.
--spec obtain() -> 'ok'.
--spec obtain(non_neg_integer()) -> 'ok'.
--spec release() -> 'ok'.
--spec release(non_neg_integer()) -> 'ok'.
--spec transfer(pid()) -> 'ok'.
--spec transfer(pid(), non_neg_integer()) -> 'ok'.
--spec with_handle(fun(() -> A)) -> A.
--spec with_handle(non_neg_integer(), fun(() -> A)) -> A.
--spec set_limit(non_neg_integer()) -> 'ok'.
--spec get_limit() -> non_neg_integer().
--spec info_keys() -> rabbit_types:info_keys().
--spec info() -> rabbit_types:infos().
--spec info([atom()]) -> rabbit_types:infos().
--spec ulimit() -> 'unknown' | non_neg_integer().
-
-%%----------------------------------------------------------------------------
--define(INFO_KEYS, [total_limit, total_used, sockets_limit, sockets_used]).
-
-%%----------------------------------------------------------------------------
-%% Public API
-%%----------------------------------------------------------------------------
-
-start_link() ->
- start_link(fun alarm_handler:set_alarm/1, fun alarm_handler:clear_alarm/1).
-
-start_link(AlarmSet, AlarmClear) ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [AlarmSet, AlarmClear],
- [{timeout, infinity}]).
-
-register_callback(M, F, A)
- when is_atom(M) andalso is_atom(F) andalso is_list(A) ->
- gen_server2:cast(?SERVER, {register_callback, self(), {M, F, A}}).
-
-open(Path, Mode, Options) ->
- open_with_absolute_path(filename:absname(Path), Mode, Options).
-
-open_with_absolute_path(Path, Mode, Options) ->
- File1 = #file { reader_count = RCount, has_writer = HasWriter } =
- case get({Path, fhc_file}) of
- File = #file {} -> File;
- undefined -> #file { reader_count = 0,
- has_writer = false }
- end,
- Mode1 = append_to_write(Mode),
- IsWriter = is_writer(Mode1),
- case IsWriter andalso HasWriter of
- true -> {error, writer_exists};
- false -> {ok, Ref} = new_closed_handle(Path, Mode1, Options),
- case get_or_reopen_timed([{Ref, new}]) of
- {ok, [_Handle1]} ->
- RCount1 = case is_reader(Mode1) of
- true -> RCount + 1;
- false -> RCount
- end,
- HasWriter1 = HasWriter orelse IsWriter,
- put({Path, fhc_file},
- File1 #file { reader_count = RCount1,
- has_writer = HasWriter1 }),
- {ok, Ref};
- Error ->
- erase({Ref, fhc_handle}),
- Error
- end
- end.
-
-close(Ref) ->
- case erase({Ref, fhc_handle}) of
- undefined -> ok;
- Handle -> case hard_close(Handle) of
- ok -> ok;
- {Error, Handle1} -> put_handle(Ref, Handle1),
- Error
- end
- end.
-
-read(Ref, Count) ->
- with_flushed_handles(
- [Ref], keep,
- fun ([#handle { is_read = false }]) ->
- {error, not_open_for_reading};
- ([#handle{read_buffer_size_limit = 0,
- hdl = Hdl, offset = Offset} = Handle]) ->
- %% The read buffer is disabled. This is just an
- %% optimization: the clauses below can handle this case.
- case prim_file_read(Hdl, Count) of
- {ok, Data} -> {{ok, Data},
- [Handle#handle{offset = Offset+size(Data)}]};
- eof -> {eof, [Handle #handle { at_eof = true }]};
- Error -> {Error, Handle}
- end;
- ([Handle = #handle{read_buffer = Buf,
- read_buffer_pos = BufPos,
- read_buffer_rem = BufRem,
- read_buffer_usage = BufUsg,
- offset = Offset}])
- when BufRem >= Count ->
- <<_:BufPos/binary, Res:Count/binary, _/binary>> = Buf,
- {{ok, Res}, [Handle#handle{offset = Offset + Count,
- read_buffer_pos = BufPos + Count,
- read_buffer_rem = BufRem - Count,
- read_buffer_usage = BufUsg + Count }]};
- ([Handle0]) ->
- maybe_reduce_read_cache([Ref]),
- Handle = #handle{read_buffer = Buf,
- read_buffer_pos = BufPos,
- read_buffer_rem = BufRem,
- read_buffer_size = BufSz,
- hdl = Hdl,
- offset = Offset}
- = tune_read_buffer_limit(Handle0, Count),
- WantedCount = Count - BufRem,
- case prim_file_read(Hdl, max(BufSz, WantedCount)) of
- {ok, Data} ->
- <<_:BufPos/binary, BufTl/binary>> = Buf,
- ReadCount = size(Data),
- case ReadCount < WantedCount of
- true ->
- OffSet1 = Offset + BufRem + ReadCount,
- {{ok, <<BufTl/binary, Data/binary>>},
- [reset_read_buffer(
- Handle#handle{offset = OffSet1})]};
- false ->
- <<Hd:WantedCount/binary, _/binary>> = Data,
- OffSet1 = Offset + BufRem + WantedCount,
- BufRem1 = ReadCount - WantedCount,
- {{ok, <<BufTl/binary, Hd/binary>>},
- [Handle#handle{offset = OffSet1,
- read_buffer = Data,
- read_buffer_pos = WantedCount,
- read_buffer_rem = BufRem1,
- read_buffer_usage = WantedCount}]}
- end;
- eof ->
- {eof, [Handle #handle { at_eof = true }]};
- Error ->
- {Error, [reset_read_buffer(Handle)]}
- end
- end).
-
-append(Ref, Data) ->
- with_handles(
- [Ref],
- fun ([#handle { is_write = false }]) ->
- {error, not_open_for_writing};
- ([Handle]) ->
- case maybe_seek(eof, Handle) of
- {{ok, _Offset}, #handle { hdl = Hdl, offset = Offset,
- write_buffer_size_limit = 0,
- at_eof = true } = Handle1} ->
- Offset1 = Offset + iolist_size(Data),
- {prim_file_write(Hdl, Data),
- [Handle1 #handle { is_dirty = true, offset = Offset1 }]};
- {{ok, _Offset}, #handle { write_buffer = WriteBuffer,
- write_buffer_size = Size,
- write_buffer_size_limit = Limit,
- at_eof = true } = Handle1} ->
- WriteBuffer1 = [Data | WriteBuffer],
- Size1 = Size + iolist_size(Data),
- Handle2 = Handle1 #handle { write_buffer = WriteBuffer1,
- write_buffer_size = Size1 },
- case Limit =/= infinity andalso Size1 > Limit of
- true -> {Result, Handle3} = write_buffer(Handle2),
- {Result, [Handle3]};
- false -> {ok, [Handle2]}
- end;
- {{error, _} = Error, Handle1} ->
- {Error, [Handle1]}
- end
- end).
-
-sync(Ref) ->
- with_flushed_handles(
- [Ref], keep,
- fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
- ok;
- ([Handle = #handle { hdl = Hdl,
- is_dirty = true, write_buffer = [] }]) ->
- case prim_file_sync(Hdl) of
- ok -> {ok, [Handle #handle { is_dirty = false }]};
- Error -> {Error, [Handle]}
- end
- end).
-
-needs_sync(Ref) ->
- %% This must *not* use with_handles/2; see bug 25052
- case get({Ref, fhc_handle}) of
- #handle { is_dirty = false, write_buffer = [] } -> false;
- #handle {} -> true
- end.
-
-position(Ref, NewOffset) ->
- with_flushed_handles(
- [Ref], keep,
- fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle),
- {Result, [Handle1]}
- end).
-
-truncate(Ref) ->
- with_flushed_handles(
- [Ref],
- fun ([Handle1 = #handle { hdl = Hdl }]) ->
- case prim_file:truncate(Hdl) of
- ok -> {ok, [Handle1 #handle { at_eof = true }]};
- Error -> {Error, [Handle1]}
- end
- end).
-
-current_virtual_offset(Ref) ->
- with_handles([Ref], fun ([#handle { at_eof = true, is_write = true,
- offset = Offset,
- write_buffer_size = Size }]) ->
- {ok, Offset + Size};
- ([#handle { offset = Offset }]) ->
- {ok, Offset}
- end).
-
-current_raw_offset(Ref) ->
- with_handles([Ref], fun ([Handle]) -> {ok, Handle #handle.offset} end).
-
-flush(Ref) ->
- with_flushed_handles([Ref], fun ([Handle]) -> {ok, [Handle]} end).
-
-copy(Src, Dest, Count) ->
- with_flushed_handles(
- [Src, Dest],
- fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset },
- DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
- ) ->
- case prim_file:copy(SHdl, DHdl, Count) of
- {ok, Count1} = Result1 ->
- {Result1,
- [SHandle #handle { offset = SOffset + Count1 },
- DHandle #handle { offset = DOffset + Count1,
- is_dirty = true }]};
- Error ->
- {Error, [SHandle, DHandle]}
- end;
- (_Handles) ->
- {error, incorrect_handle_modes}
- end).
-
-delete(Ref) ->
- case erase({Ref, fhc_handle}) of
- undefined ->
- ok;
- Handle = #handle { path = Path } ->
- case hard_close(Handle #handle { is_dirty = false,
- write_buffer = [] }) of
- ok -> prim_file:delete(Path);
- {Error, Handle1} -> put_handle(Ref, Handle1),
- Error
- end
- end.
-
-clear(Ref) ->
- with_handles(
- [Ref],
- fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) ->
- ok;
- ([Handle]) ->
- case maybe_seek(bof, Handle#handle{write_buffer = [],
- write_buffer_size = 0}) of
- {{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
- case prim_file:truncate(Hdl) of
- ok -> {ok, [Handle1 #handle { at_eof = true }]};
- Error -> {Error, [Handle1]}
- end;
- {{error, _} = Error, Handle1} ->
- {Error, [Handle1]}
- end
- end).
-
-set_maximum_since_use(MaximumAge) ->
- Now = erlang:monotonic_time(),
- case lists:foldl(
- fun ({{Ref, fhc_handle},
- Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
- case Hdl =/= closed andalso
- erlang:convert_time_unit(Now - Then,
- native,
- micro_seconds)
- >= MaximumAge of
- true -> soft_close(Ref, Handle) orelse Rep;
- false -> Rep
- end;
- (_KeyValuePair, Rep) ->
- Rep
- end, false, get()) of
- false -> age_tree_change(), ok;
- true -> ok
- end.
-
-obtain() -> obtain(1).
-release() -> release(1).
-transfer(Pid) -> transfer(Pid, 1).
-
-obtain(Count) -> obtain(Count, socket).
-release(Count) -> release(Count, socket).
-
-with_handle(Fun) ->
- with_handle(1, Fun).
-
-with_handle(N, Fun) ->
- ok = obtain(N, file),
- try Fun()
- after ok = release(N, file)
- end.
-
-obtain(Count, Type) when Count > 0 ->
- %% If the FHC isn't running, obtains succeed immediately.
- case whereis(?SERVER) of
- undefined -> ok;
- _ -> gen_server2:call(
- ?SERVER, {obtain, Count, Type, self()}, infinity)
- end.
-
-release(Count, Type) when Count > 0 ->
- gen_server2:cast(?SERVER, {release, Count, Type, self()}).
-
-transfer(Pid, Count) when Count > 0 ->
- gen_server2:cast(?SERVER, {transfer, Count, self(), Pid}).
-
-set_limit(Limit) ->
- gen_server2:call(?SERVER, {set_limit, Limit}, infinity).
-
-get_limit() ->
- gen_server2:call(?SERVER, get_limit, infinity).
-
-info_keys() -> ?INFO_KEYS.
-
-info() -> info(?INFO_KEYS).
-info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity).
-
-clear_read_cache() ->
- case application:get_env(rabbit, fhc_read_buffering) of
- {ok, true} ->
- gen_server2:cast(?SERVER, clear_read_cache),
- clear_vhost_read_cache(rabbit_vhost:list());
- _ -> %% undefined or {ok, false}
- ok
- end.
-
-clear_vhost_read_cache([]) ->
- ok;
-clear_vhost_read_cache([VHost | Rest]) ->
- clear_queue_read_cache(rabbit_amqqueue:list(VHost)),
- clear_vhost_read_cache(Rest).
-
-clear_queue_read_cache([]) ->
- ok;
-clear_queue_read_cache([#amqqueue{pid = MPid, slave_pids = SPids} | Rest]) ->
- %% Limit the action to the current node.
- Pids = [P || P <- [MPid | SPids], node(P) =:= node()],
- %% This function is executed in the context of the backing queue
- %% process because the read buffer is stored in the process
- %% dictionary.
- Fun = fun(_, State) ->
- _ = clear_process_read_cache(),
- State
- end,
- [rabbit_amqqueue:run_backing_queue(Pid, rabbit_variable_queue, Fun)
- || Pid <- Pids],
- clear_queue_read_cache(Rest).
-
-clear_process_read_cache() ->
- [
- begin
- Handle1 = reset_read_buffer(Handle),
- put({Ref, fhc_handle}, Handle1)
- end ||
- {{Ref, fhc_handle}, Handle} <- get(),
- size(Handle#handle.read_buffer) > 0
- ].
-
-%%----------------------------------------------------------------------------
-%% Internal functions
-%%----------------------------------------------------------------------------
-
-prim_file_read(Hdl, Size) ->
- file_handle_cache_stats:update(
- io_read, Size, fun() -> prim_file:read(Hdl, Size) end).
-
-prim_file_write(Hdl, Bytes) ->
- file_handle_cache_stats:update(
- io_write, iolist_size(Bytes), fun() -> prim_file:write(Hdl, Bytes) end).
-
-prim_file_sync(Hdl) ->
- file_handle_cache_stats:update(io_sync, fun() -> prim_file:sync(Hdl) end).
-
-prim_file_position(Hdl, NewOffset) ->
- file_handle_cache_stats:update(
- io_seek, fun() -> prim_file:position(Hdl, NewOffset) end).
-
-is_reader(Mode) -> lists:member(read, Mode).
-
-is_writer(Mode) -> lists:member(write, Mode).
-
-append_to_write(Mode) ->
- case lists:member(append, Mode) of
- true -> [write | Mode -- [append, write]];
- false -> Mode
- end.
-
-with_handles(Refs, Fun) ->
- with_handles(Refs, reset, Fun).
-
-with_handles(Refs, ReadBuffer, Fun) ->
- case get_or_reopen_timed([{Ref, reopen} || Ref <- Refs]) of
- {ok, Handles0} ->
- Handles = case ReadBuffer of
- reset -> [reset_read_buffer(H) || H <- Handles0];
- keep -> Handles0
- end,
- case Fun(Handles) of
- {Result, Handles1} when is_list(Handles1) ->
- _ = lists:zipwith(fun put_handle/2, Refs, Handles1),
- Result;
- Result ->
- Result
- end;
- Error ->
- Error
- end.
-
-with_flushed_handles(Refs, Fun) ->
- with_flushed_handles(Refs, reset, Fun).
-
-with_flushed_handles(Refs, ReadBuffer, Fun) ->
- with_handles(
- Refs, ReadBuffer,
- fun (Handles) ->
- case lists:foldl(
- fun (Handle, {ok, HandlesAcc}) ->
- {Res, Handle1} = write_buffer(Handle),
- {Res, [Handle1 | HandlesAcc]};
- (Handle, {Error, HandlesAcc}) ->
- {Error, [Handle | HandlesAcc]}
- end, {ok, []}, Handles) of
- {ok, Handles1} ->
- Fun(lists:reverse(Handles1));
- {Error, Handles1} ->
- {Error, lists:reverse(Handles1)}
- end
- end).
-
-get_or_reopen_timed(RefNewOrReopens) ->
- file_handle_cache_stats:update(
- io_file_handle_open_attempt, fun() -> get_or_reopen(RefNewOrReopens) end).
-
-get_or_reopen(RefNewOrReopens) ->
- case partition_handles(RefNewOrReopens) of
- {OpenHdls, []} ->
- {ok, [Handle || {_Ref, Handle} <- OpenHdls]};
- {OpenHdls, ClosedHdls} ->
- Oldest = oldest(get_age_tree(),
- fun () -> erlang:monotonic_time() end),
- case gen_server2:call(?SERVER, {open, self(), length(ClosedHdls),
- Oldest}, infinity) of
- ok ->
- case reopen(ClosedHdls) of
- {ok, RefHdls} -> sort_handles(RefNewOrReopens,
- OpenHdls, RefHdls, []);
- Error -> Error
- end;
- close ->
- [soft_close(Ref, Handle) ||
- {{Ref, fhc_handle}, Handle = #handle { hdl = Hdl }} <-
- get(),
- Hdl =/= closed],
- get_or_reopen(RefNewOrReopens)
- end
- end.
-
-reopen(ClosedHdls) -> reopen(ClosedHdls, get_age_tree(), []).
-
-reopen([], Tree, RefHdls) ->
- put_age_tree(Tree),
- {ok, lists:reverse(RefHdls)};
-reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
- path = Path,
- mode = Mode0,
- offset = Offset,
- last_used_at = undefined }} |
- RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) ->
- Mode = case NewOrReopen of
- new -> Mode0;
- reopen -> file_handle_cache_stats:update(io_reopen),
- [read | Mode0]
- end,
- case prim_file:open(Path, Mode) of
- {ok, Hdl} ->
- Now = erlang:monotonic_time(),
- {{ok, _Offset}, Handle1} =
- maybe_seek(Offset, reset_read_buffer(
- Handle#handle{hdl = Hdl,
- offset = 0,
- last_used_at = Now})),
- put({Ref, fhc_handle}, Handle1),
- reopen(RefNewOrReopenHdls, gb_trees:insert({Now, Ref}, true, Tree),
- [{Ref, Handle1} | RefHdls]);
- Error ->
- %% NB: none of the handles in ToOpen are in the age tree
- Oldest = oldest(Tree, fun () -> undefined end),
- [gen_server2:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen],
- put_age_tree(Tree),
- Error
- end.
-
-partition_handles(RefNewOrReopens) ->
- lists:foldr(
- fun ({Ref, NewOrReopen}, {Open, Closed}) ->
- case get({Ref, fhc_handle}) of
- #handle { hdl = closed } = Handle ->
- {Open, [{Ref, NewOrReopen, Handle} | Closed]};
- #handle {} = Handle ->
- {[{Ref, Handle} | Open], Closed}
- end
- end, {[], []}, RefNewOrReopens).
-
-sort_handles([], [], [], Acc) ->
- {ok, lists:reverse(Acc)};
-sort_handles([{Ref, _} | RefHdls], [{Ref, Handle} | RefHdlsA], RefHdlsB, Acc) ->
- sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]);
-sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) ->
- sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]).
-
-put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
- Now = erlang:monotonic_time(),
- age_tree_update(Then, Now, Ref),
- put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
-
-with_age_tree(Fun) -> put_age_tree(Fun(get_age_tree())).
-
-get_age_tree() ->
- case get(fhc_age_tree) of
- undefined -> gb_trees:empty();
- AgeTree -> AgeTree
- end.
-
-put_age_tree(Tree) -> put(fhc_age_tree, Tree).
-
-age_tree_update(Then, Now, Ref) ->
- with_age_tree(
- fun (Tree) ->
- gb_trees:insert({Now, Ref}, true,
- gb_trees:delete_any({Then, Ref}, Tree))
- end).
-
-age_tree_delete(Then, Ref) ->
- with_age_tree(
- fun (Tree) ->
- Tree1 = gb_trees:delete_any({Then, Ref}, Tree),
- Oldest = oldest(Tree1, fun () -> undefined end),
- gen_server2:cast(?SERVER, {close, self(), Oldest}),
- Tree1
- end).
-
-age_tree_change() ->
- with_age_tree(
- fun (Tree) ->
- case gb_trees:is_empty(Tree) of
- true -> Tree;
- false -> {{Oldest, _Ref}, _} = gb_trees:smallest(Tree),
- gen_server2:cast(?SERVER, {update, self(), Oldest}),
- Tree
- end
- end).
-
-oldest(Tree, DefaultFun) ->
- case gb_trees:is_empty(Tree) of
- true -> DefaultFun();
- false -> {{Oldest, _Ref}, _} = gb_trees:smallest(Tree),
- Oldest
- end.
-
-new_closed_handle(Path, Mode, Options) ->
- WriteBufferSize =
- case application:get_env(rabbit, fhc_write_buffering) of
- {ok, false} -> 0;
- {ok, true} ->
- case proplists:get_value(write_buffer, Options, unbuffered) of
- unbuffered -> 0;
- infinity -> infinity;
- N when is_integer(N) -> N
- end
- end,
- ReadBufferSize =
- case application:get_env(rabbit, fhc_read_buffering) of
- {ok, false} -> 0;
- {ok, true} ->
- case proplists:get_value(read_buffer, Options, unbuffered) of
- unbuffered -> 0;
- N2 when is_integer(N2) -> N2
- end
- end,
- Ref = make_ref(),
- put({Ref, fhc_handle}, #handle { hdl = closed,
- ref = Ref,
- offset = 0,
- is_dirty = false,
- write_buffer_size = 0,
- write_buffer_size_limit = WriteBufferSize,
- write_buffer = [],
- read_buffer = <<>>,
- read_buffer_pos = 0,
- read_buffer_rem = 0,
- read_buffer_size = ReadBufferSize,
- read_buffer_size_limit = ReadBufferSize,
- read_buffer_usage = 0,
- at_eof = false,
- path = Path,
- mode = Mode,
- options = Options,
- is_write = is_writer(Mode),
- is_read = is_reader(Mode),
- last_used_at = undefined }),
- {ok, Ref}.
-
-soft_close(Ref, Handle) ->
- {Res, Handle1} = soft_close(Handle),
- case Res of
- ok -> put({Ref, fhc_handle}, Handle1),
- true;
- _ -> put_handle(Ref, Handle1),
- false
- end.
-
-soft_close(Handle = #handle { hdl = closed }) ->
- {ok, Handle};
-soft_close(Handle) ->
- case write_buffer(Handle) of
- {ok, #handle { hdl = Hdl,
- ref = Ref,
- is_dirty = IsDirty,
- last_used_at = Then } = Handle1 } ->
- ok = case IsDirty of
- true -> prim_file_sync(Hdl);
- false -> ok
- end,
- ok = prim_file:close(Hdl),
- age_tree_delete(Then, Ref),
- {ok, Handle1 #handle { hdl = closed,
- is_dirty = false,
- last_used_at = undefined }};
- {_Error, _Handle} = Result ->
- Result
- end.
-
-hard_close(Handle) ->
- case soft_close(Handle) of
- {ok, #handle { path = Path,
- is_read = IsReader, is_write = IsWriter }} ->
- #file { reader_count = RCount, has_writer = HasWriter } = File =
- get({Path, fhc_file}),
- RCount1 = case IsReader of
- true -> RCount - 1;
- false -> RCount
- end,
- HasWriter1 = HasWriter andalso not IsWriter,
- case RCount1 =:= 0 andalso not HasWriter1 of
- true -> erase({Path, fhc_file});
- false -> put({Path, fhc_file},
- File #file { reader_count = RCount1,
- has_writer = HasWriter1 })
- end,
- ok;
- {_Error, _Handle} = Result ->
- Result
- end.
-
-maybe_seek(New, Handle = #handle{hdl = Hdl,
- offset = Old,
- read_buffer_pos = BufPos,
- read_buffer_rem = BufRem,
- at_eof = AtEoF}) ->
- {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Old, New),
- case NeedsSeek of
- true when is_number(New) andalso
- ((New >= Old andalso New =< BufRem + Old)
- orelse (New < Old andalso Old - New =< BufPos)) ->
- Diff = New - Old,
- {{ok, New}, Handle#handle{offset = New,
- at_eof = AtEoF1,
- read_buffer_pos = BufPos + Diff,
- read_buffer_rem = BufRem - Diff}};
- true ->
- case prim_file_position(Hdl, New) of
- {ok, Offset1} = Result ->
- {Result, reset_read_buffer(Handle#handle{offset = Offset1,
- at_eof = AtEoF1})};
- {error, _} = Error ->
- {Error, Handle}
- end;
- false ->
- {{ok, Old}, Handle}
- end.
-
-needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false};
-needs_seek( AtEoF, _CurOffset, {cur, 0}) -> {AtEoF, false};
-needs_seek( true, _CurOffset, eof ) -> {true , false};
-needs_seek( true, _CurOffset, {eof, 0}) -> {true , false};
-needs_seek( false, _CurOffset, eof ) -> {true , true };
-needs_seek( false, _CurOffset, {eof, 0}) -> {true , true };
-needs_seek( AtEoF, 0, bof ) -> {AtEoF, false};
-needs_seek( AtEoF, 0, {bof, 0}) -> {AtEoF, false};
-needs_seek( AtEoF, CurOffset, CurOffset) -> {AtEoF, false};
-needs_seek( true, CurOffset, {bof, DesiredOffset})
- when DesiredOffset >= CurOffset ->
- {true, true};
-needs_seek( true, _CurOffset, {cur, DesiredOffset})
- when DesiredOffset > 0 ->
- {true, true};
-needs_seek( true, CurOffset, DesiredOffset) %% same as {bof, DO}
- when is_integer(DesiredOffset) andalso DesiredOffset >= CurOffset ->
- {true, true};
-%% because we can't really track size, we could well end up at EoF and not know
-needs_seek(_AtEoF, _CurOffset, _DesiredOffset) ->
- {false, true}.
-
-write_buffer(Handle = #handle { write_buffer = [] }) ->
- {ok, Handle};
-write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
- write_buffer = WriteBuffer,
- write_buffer_size = DataSize,
- at_eof = true }) ->
- case prim_file_write(Hdl, lists:reverse(WriteBuffer)) of
- ok ->
- Offset1 = Offset + DataSize,
- {ok, Handle #handle { offset = Offset1, is_dirty = true,
- write_buffer = [], write_buffer_size = 0 }};
- {error, _} = Error ->
- {Error, Handle}
- end.
-
-reset_read_buffer(Handle) ->
- Handle#handle{read_buffer = <<>>,
- read_buffer_pos = 0,
- read_buffer_rem = 0}.
-
-%% We come into this function whenever there's been a miss while
-%% reading from the buffer - but note that when we first start with a
-%% new handle the usage will be 0. Therefore in that case don't take
-%% it as meaning the buffer was useless, we just haven't done anything
-%% yet!
-tune_read_buffer_limit(Handle = #handle{read_buffer_usage = 0}, _Count) ->
- Handle;
-%% In this head we have been using the buffer but now tried to read
-%% outside it. So how did we do? If we used less than the size of the
-%% buffer, make the new buffer the size of what we used before, but
-%% add one byte (so that next time we can distinguish between getting
-%% the buffer size exactly right and actually wanting more). If we
-%% read 100% of what we had, then double it for next time, up to the
-%% limit that was set when we were created.
-tune_read_buffer_limit(Handle = #handle{read_buffer = Buf,
- read_buffer_usage = Usg,
- read_buffer_size = Sz,
- read_buffer_size_limit = Lim}, Count) ->
- %% If the buffer is <<>> then we are in the first read after a
- %% reset, the read_buffer_usage is the total usage from before the
- %% reset. But otherwise we are in a read which read off the end of
- %% the buffer, so really the size of this read should be included
- %% in the usage.
- TotalUsg = case Buf of
- <<>> -> Usg;
- _ -> Usg + Count
- end,
- Handle#handle{read_buffer_usage = 0,
- read_buffer_size = erlang:min(case TotalUsg < Sz of
- true -> Usg + 1;
- false -> Usg * 2
- end, Lim)}.
-
-maybe_reduce_read_cache(SparedRefs) ->
- case rabbit_memory_monitor:memory_use(bytes) of
- {_, infinity} -> ok;
- {MemUse, MemLimit} when MemUse < MemLimit -> ok;
- {MemUse, MemLimit} -> reduce_read_cache(
- (MemUse - MemLimit) * 2,
- SparedRefs)
- end.
-
-reduce_read_cache(MemToFree, SparedRefs) ->
- Handles = lists:sort(
- fun({_, H1}, {_, H2}) -> H1 < H2 end,
- [{R, H} || {{R, fhc_handle}, H} <- get(),
- not lists:member(R, SparedRefs)
- andalso size(H#handle.read_buffer) > 0]),
- FreedMem = lists:foldl(
- fun
- (_, Freed) when Freed >= MemToFree ->
- Freed;
- ({Ref, #handle{read_buffer = Buf} = Handle}, Freed) ->
- Handle1 = reset_read_buffer(Handle),
- put({Ref, fhc_handle}, Handle1),
- Freed + size(Buf)
- end, 0, Handles),
- if
- FreedMem < MemToFree andalso SparedRefs =/= [] ->
- reduce_read_cache(MemToFree - FreedMem, []);
- true ->
- ok
- end.
-
-infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-
-i(total_limit, #fhc_state{limit = Limit}) -> Limit;
-i(total_used, State) -> used(State);
-i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
-i(sockets_used, #fhc_state{obtain_count_socket = Count}) -> Count;
-i(Item, _) -> throw({bad_argument, Item}).
-
-used(#fhc_state{open_count = C1,
- obtain_count_socket = C2,
- obtain_count_file = C3}) -> C1 + C2 + C3.
-
-%%----------------------------------------------------------------------------
-%% gen_server2 callbacks
-%%----------------------------------------------------------------------------
-
-init([AlarmSet, AlarmClear]) ->
- _ = file_handle_cache_stats:init(),
- Limit = case application:get_env(file_handles_high_watermark) of
- {ok, Watermark} when (is_integer(Watermark) andalso
- Watermark > 0) ->
- Watermark;
- _ ->
- case ulimit() of
- unknown -> ?FILE_HANDLES_LIMIT_OTHER;
- Lim -> lists:max([2, Lim - ?RESERVED_FOR_OTHERS])
- end
- end,
- ObtainLimit = obtain_limit(Limit),
- error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
- [Limit, ObtainLimit]),
- Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]),
- Elders = ets:new(?ELDERS_ETS_TABLE, [set, private]),
- {ok, #fhc_state { elders = Elders,
- limit = Limit,
- open_count = 0,
- open_pending = pending_new(),
- obtain_limit = ObtainLimit,
- obtain_count_file = 0,
- obtain_pending_file = pending_new(),
- obtain_count_socket = 0,
- obtain_pending_socket = pending_new(),
- clients = Clients,
- timer_ref = undefined,
- alarm_set = AlarmSet,
- alarm_clear = AlarmClear }}.
-
-prioritise_cast(Msg, _Len, _State) ->
- case Msg of
- {release, _, _, _} -> 5;
- _ -> 0
- end.
-
-handle_call({open, Pid, Requested, EldestUnusedSince}, From,
- State = #fhc_state { open_count = Count,
- open_pending = Pending,
- elders = Elders,
- clients = Clients })
- when EldestUnusedSince =/= undefined ->
- true = ets:insert(Elders, {Pid, EldestUnusedSince}),
- Item = #pending { kind = open,
- pid = Pid,
- requested = Requested,
- from = From },
- ok = track_client(Pid, Clients),
- case needs_reduce(State #fhc_state { open_count = Count + Requested }) of
- true -> case ets:lookup(Clients, Pid) of
- [#cstate { opened = 0 }] ->
- true = ets:update_element(
- Clients, Pid, {#cstate.blocked, true}),
- {noreply,
- reduce(State #fhc_state {
- open_pending = pending_in(Item, Pending) })};
- [#cstate { opened = Opened }] ->
- true = ets:update_element(
- Clients, Pid,
- {#cstate.pending_closes, Opened}),
- {reply, close, State}
- end;
- false -> {noreply, run_pending_item(Item, State)}
- end;
-
-handle_call({obtain, N, Type, Pid}, From,
- State = #fhc_state { clients = Clients }) ->
- Count = obtain_state(Type, count, State),
- Pending = obtain_state(Type, pending, State),
- ok = track_client(Pid, Clients),
- Item = #pending { kind = {obtain, Type}, pid = Pid,
- requested = N, from = From },
- Enqueue = fun () ->
- true = ets:update_element(Clients, Pid,
- {#cstate.blocked, true}),
- set_obtain_state(Type, pending,
- pending_in(Item, Pending), State)
- end,
- {noreply,
- case obtain_limit_reached(Type, State) of
- true -> Enqueue();
- false -> case needs_reduce(
- set_obtain_state(Type, count, Count + 1, State)) of
- true -> reduce(Enqueue());
- false -> adjust_alarm(
- State, run_pending_item(Item, State))
- end
- end};
-
-handle_call({set_limit, Limit}, _From, State) ->
- {reply, ok, adjust_alarm(
- State, maybe_reduce(
- process_pending(
- State #fhc_state {
- limit = Limit,
- obtain_limit = obtain_limit(Limit) })))};
-
-handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
- {reply, Limit, State};
-
-handle_call({info, Items}, _From, State) ->
- {reply, infos(Items, State), State}.
-
-handle_cast({register_callback, Pid, MFA},
- State = #fhc_state { clients = Clients }) ->
- ok = track_client(Pid, Clients),
- true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}),
- {noreply, State};
-
-handle_cast({update, Pid, EldestUnusedSince},
- State = #fhc_state { elders = Elders })
- when EldestUnusedSince =/= undefined ->
- true = ets:insert(Elders, {Pid, EldestUnusedSince}),
- %% don't call maybe_reduce from here otherwise we can create a
- %% storm of messages
- {noreply, State};
-
-handle_cast({release, N, Type, Pid}, State) ->
- State1 = process_pending(update_counts({obtain, Type}, Pid, -N, State)),
- {noreply, adjust_alarm(State, State1)};
-
-handle_cast({close, Pid, EldestUnusedSince},
- State = #fhc_state { elders = Elders, clients = Clients }) ->
- true = case EldestUnusedSince of
- undefined -> ets:delete(Elders, Pid);
- _ -> ets:insert(Elders, {Pid, EldestUnusedSince})
- end,
- ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
- {noreply, adjust_alarm(State, process_pending(
- update_counts(open, Pid, -1, State)))};
-
-handle_cast({transfer, N, FromPid, ToPid}, State) ->
- ok = track_client(ToPid, State#fhc_state.clients),
- {noreply, process_pending(
- update_counts({obtain, socket}, ToPid, +N,
- update_counts({obtain, socket}, FromPid, -N,
- State)))};
-
-handle_cast(clear_read_cache, State) ->
- _ = clear_process_read_cache(),
- {noreply, State}.
-
-handle_info(check_counts, State) ->
- {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
-
-handle_info({'DOWN', _MRef, process, Pid, _Reason},
- State = #fhc_state { elders = Elders,
- open_count = OpenCount,
- open_pending = OpenPending,
- obtain_count_file = ObtainCountF,
- obtain_count_socket = ObtainCountS,
- obtain_pending_file = ObtainPendingF,
- obtain_pending_socket = ObtainPendingS,
- clients = Clients }) ->
- [#cstate { opened = Opened,
- obtained_file = ObtainedFile,
- obtained_socket = ObtainedSocket}] =
- ets:lookup(Clients, Pid),
- true = ets:delete(Clients, Pid),
- true = ets:delete(Elders, Pid),
- Fun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
- State1 = process_pending(
- State #fhc_state {
- open_count = OpenCount - Opened,
- open_pending = filter_pending(Fun, OpenPending),
- obtain_count_file = ObtainCountF - ObtainedFile,
- obtain_count_socket = ObtainCountS - ObtainedSocket,
- obtain_pending_file = filter_pending(Fun, ObtainPendingF),
- obtain_pending_socket = filter_pending(Fun, ObtainPendingS) }),
- {noreply, adjust_alarm(State, State1)}.
-
-terminate(_Reason, State = #fhc_state { clients = Clients,
- elders = Elders }) ->
- ets:delete(Clients),
- ets:delete(Elders),
- State.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%----------------------------------------------------------------------------
-%% pending queue abstraction helpers
-%%----------------------------------------------------------------------------
-
-queue_fold(Fun, Init, Q) ->
- case queue:out(Q) of
- {empty, _Q} -> Init;
- {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
- end.
-
-filter_pending(Fun, {Count, Queue}) ->
- {Delta, Queue1} =
- queue_fold(
- fun (Item = #pending { requested = Requested }, {DeltaN, QueueN}) ->
- case Fun(Item) of
- true -> {DeltaN, queue:in(Item, QueueN)};
- false -> {DeltaN - Requested, QueueN}
- end
- end, {0, queue:new()}, Queue),
- {Count + Delta, Queue1}.
-
-pending_new() ->
- {0, queue:new()}.
-
-pending_in(Item = #pending { requested = Requested }, {Count, Queue}) ->
- {Count + Requested, queue:in(Item, Queue)}.
-
-pending_out({0, _Queue} = Pending) ->
- {empty, Pending};
-pending_out({N, Queue}) ->
- {{value, #pending { requested = Requested }} = Result, Queue1} =
- queue:out(Queue),
- {Result, {N - Requested, Queue1}}.
-
-pending_count({Count, _Queue}) ->
- Count.
-
-%%----------------------------------------------------------------------------
-%% server helpers
-%%----------------------------------------------------------------------------
-
-obtain_limit(infinity) -> infinity;
-obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of
- OLimit when OLimit < 0 -> 0;
- OLimit -> OLimit
- end.
-
-obtain_limit_reached(socket, State) -> obtain_limit_reached(State);
-obtain_limit_reached(file, State) -> needs_reduce(State).
-
-obtain_limit_reached(#fhc_state{obtain_limit = Limit,
- obtain_count_socket = Count}) ->
- Limit =/= infinity andalso Count >= Limit.
-
-obtain_state(file, count, #fhc_state{obtain_count_file = N}) -> N;
-obtain_state(socket, count, #fhc_state{obtain_count_socket = N}) -> N;
-obtain_state(file, pending, #fhc_state{obtain_pending_file = N}) -> N;
-obtain_state(socket, pending, #fhc_state{obtain_pending_socket = N}) -> N.
-
-set_obtain_state(file, count, N, S) -> S#fhc_state{obtain_count_file = N};
-set_obtain_state(socket, count, N, S) -> S#fhc_state{obtain_count_socket = N};
-set_obtain_state(file, pending, N, S) -> S#fhc_state{obtain_pending_file = N};
-set_obtain_state(socket, pending, N, S) -> S#fhc_state{obtain_pending_socket = N}.
-
-adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet,
- alarm_clear = AlarmClear }, NewState) ->
- case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of
- {false, true} -> AlarmSet({file_descriptor_limit, []});
- {true, false} -> AlarmClear(file_descriptor_limit);
- _ -> ok
- end,
- NewState.
-
-process_pending(State = #fhc_state { limit = infinity }) ->
- State;
-process_pending(State) ->
- process_open(process_obtain(socket, process_obtain(file, State))).
-
-process_open(State = #fhc_state { limit = Limit,
- open_pending = Pending}) ->
- {Pending1, State1} = process_pending(Pending, Limit - used(State), State),
- State1 #fhc_state { open_pending = Pending1 }.
-
-process_obtain(socket, State = #fhc_state { limit = Limit,
- obtain_limit = ObtainLimit,
- open_count = OpenCount,
- obtain_count_socket = ObtainCount,
- obtain_pending_socket = Pending,
- obtain_count_file = ObtainCountF}) ->
- Quota = min(ObtainLimit - ObtainCount,
- Limit - (OpenCount + ObtainCount + ObtainCountF)),
- {Pending1, State1} = process_pending(Pending, Quota, State),
- State1#fhc_state{obtain_pending_socket = Pending1};
-process_obtain(file, State = #fhc_state { limit = Limit,
- open_count = OpenCount,
- obtain_count_socket = ObtainCountS,
- obtain_count_file = ObtainCountF,
- obtain_pending_file = Pending}) ->
- Quota = Limit - (OpenCount + ObtainCountS + ObtainCountF),
- {Pending1, State1} = process_pending(Pending, Quota, State),
- State1#fhc_state{obtain_pending_file = Pending1}.
-
-process_pending(Pending, Quota, State) when Quota =< 0 ->
- {Pending, State};
-process_pending(Pending, Quota, State) ->
- case pending_out(Pending) of
- {empty, _Pending} ->
- {Pending, State};
- {{value, #pending { requested = Requested }}, _Pending1}
- when Requested > Quota ->
- {Pending, State};
- {{value, #pending { requested = Requested } = Item}, Pending1} ->
- process_pending(Pending1, Quota - Requested,
- run_pending_item(Item, State))
- end.
-
-run_pending_item(#pending { kind = Kind,
- pid = Pid,
- requested = Requested,
- from = From },
- State = #fhc_state { clients = Clients }) ->
- gen_server2:reply(From, ok),
- true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
- update_counts(Kind, Pid, Requested, State).
-
-update_counts(open, Pid, Delta,
- State = #fhc_state { open_count = OpenCount,
- clients = Clients }) ->
- ets:update_counter(Clients, Pid, {#cstate.opened, Delta}),
- State #fhc_state { open_count = OpenCount + Delta};
-update_counts({obtain, file}, Pid, Delta,
- State = #fhc_state {obtain_count_file = ObtainCountF,
- clients = Clients }) ->
- ets:update_counter(Clients, Pid, {#cstate.obtained_file, Delta}),
- State #fhc_state { obtain_count_file = ObtainCountF + Delta};
-update_counts({obtain, socket}, Pid, Delta,
- State = #fhc_state {obtain_count_socket = ObtainCountS,
- clients = Clients }) ->
- ets:update_counter(Clients, Pid, {#cstate.obtained_socket, Delta}),
- State #fhc_state { obtain_count_socket = ObtainCountS + Delta}.
-
-maybe_reduce(State) ->
- case needs_reduce(State) of
- true -> reduce(State);
- false -> State
- end.
-
-needs_reduce(#fhc_state { limit = Limit,
- open_count = OpenCount,
- open_pending = {OpenPending, _},
- obtain_limit = ObtainLimit,
- obtain_count_socket = ObtainCountS,
- obtain_count_file = ObtainCountF,
- obtain_pending_file = {ObtainPendingF, _},
- obtain_pending_socket = {ObtainPendingS, _} }) ->
- Limit =/= infinity
- andalso (((OpenCount + ObtainCountS + ObtainCountF) > Limit)
- orelse (OpenPending =/= 0)
- orelse (ObtainPendingF =/= 0)
- orelse (ObtainCountS < ObtainLimit
- andalso (ObtainPendingS =/= 0))).
-
-reduce(State = #fhc_state { open_pending = OpenPending,
- obtain_pending_file = ObtainPendingFile,
- obtain_pending_socket = ObtainPendingSocket,
- elders = Elders,
- clients = Clients,
- timer_ref = TRef }) ->
- Now = erlang:monotonic_time(),
- {CStates, Sum, ClientCount} =
- ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
- [#cstate { pending_closes = PendingCloses,
- opened = Opened,
- blocked = Blocked } = CState] =
- ets:lookup(Clients, Pid),
- TimeDiff = erlang:convert_time_unit(
- Now - Eldest, native, micro_seconds),
- case Blocked orelse PendingCloses =:= Opened of
- true -> Accs;
- false -> {[CState | CStatesAcc],
- SumAcc + TimeDiff,
- CountAcc + 1}
- end
- end, {[], 0, 0}, Elders),
- case CStates of
- [] -> ok;
- _ -> case (Sum / ClientCount) -
- (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
- AverageAge when AverageAge > 0 ->
- notify_age(CStates, AverageAge);
- _ ->
- notify_age0(Clients, CStates,
- pending_count(OpenPending) +
- pending_count(ObtainPendingFile) +
- pending_count(ObtainPendingSocket))
- end
- end,
- case TRef of
- undefined -> TRef1 = erlang:send_after(
- ?FILE_HANDLES_CHECK_INTERVAL, ?SERVER,
- check_counts),
- State #fhc_state { timer_ref = TRef1 };
- _ -> State
- end.
-
-notify_age(CStates, AverageAge) ->
- lists:foreach(
- fun (#cstate { callback = undefined }) -> ok;
- (#cstate { callback = {M, F, A} }) -> apply(M, F, A ++ [AverageAge])
- end, CStates).
-
-notify_age0(Clients, CStates, Required) ->
- case [CState || CState <- CStates, CState#cstate.callback =/= undefined] of
- [] -> ok;
- Notifications -> S = rand:uniform(length(Notifications)),
- {L1, L2} = lists:split(S, Notifications),
- notify(Clients, Required, L2 ++ L1)
- end.
-
-notify(_Clients, _Required, []) ->
- ok;
-notify(_Clients, Required, _Notifications) when Required =< 0 ->
- ok;
-notify(Clients, Required, [#cstate{ pid = Pid,
- callback = {M, F, A},
- opened = Opened } | Notifications]) ->
- apply(M, F, A ++ [0]),
- ets:update_element(Clients, Pid, {#cstate.pending_closes, Opened}),
- notify(Clients, Required - Opened, Notifications).
-
-track_client(Pid, Clients) ->
- case ets:insert_new(Clients, #cstate { pid = Pid,
- callback = undefined,
- opened = 0,
- obtained_file = 0,
- obtained_socket = 0,
- blocked = false,
- pending_closes = 0 }) of
- true -> _MRef = erlang:monitor(process, Pid),
- ok;
- false -> ok
- end.
-
-
-%% To increase the number of file descriptors: on Windows set ERL_MAX_PORTS
-%% environment variable, on Linux set `ulimit -n`.
-ulimit() ->
- case proplists:get_value(max_fds, erlang:system_info(check_io)) of
- MaxFds when is_integer(MaxFds) andalso MaxFds > 1 ->
- case os:type() of
- {win32, _OsName} ->
- %% On Windows max_fds is twice the number of open files:
- %% https://github.com/yrashk/erlang/blob/e1282325ed75e52a98d5/erts/emulator/sys/win32/sys.c#L2459-2466
- MaxFds div 2;
- _Any ->
- %% For other operating systems trust Erlang.
- MaxFds
- end;
- _ ->
- unknown
- end.
diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl
deleted file mode 100644
index 000d1c820c..0000000000
--- a/src/file_handle_cache_stats.erl
+++ /dev/null
@@ -1,66 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(file_handle_cache_stats).
-
-%% stats about read / write operations that go through the fhc.
-
--export([init/0, update/3, update/2, update/1, get/0]).
-
--define(TABLE, ?MODULE).
-
--define(COUNT,
- [io_reopen, mnesia_ram_tx, mnesia_disk_tx,
- msg_store_read, msg_store_write,
- queue_index_journal_write, queue_index_write, queue_index_read]).
--define(COUNT_TIME, [io_sync, io_seek, io_file_handle_open_attempt]).
--define(COUNT_TIME_BYTES, [io_read, io_write]).
-
-init() ->
- ets:new(?TABLE, [public, named_table]),
- [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT_TIME_BYTES,
- Counter <- [count, bytes, time]],
- [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT_TIME,
- Counter <- [count, time]],
- [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT,
- Counter <- [count]].
-
-update(Op, Bytes, Thunk) ->
- {Time, Res} = timer_tc(Thunk),
- ets:update_counter(?TABLE, {Op, count}, 1),
- ets:update_counter(?TABLE, {Op, bytes}, Bytes),
- ets:update_counter(?TABLE, {Op, time}, Time),
- Res.
-
-update(Op, Thunk) ->
- {Time, Res} = timer_tc(Thunk),
- ets:update_counter(?TABLE, {Op, count}, 1),
- ets:update_counter(?TABLE, {Op, time}, Time),
- Res.
-
-update(Op) ->
- ets:update_counter(?TABLE, {Op, count}, 1),
- ok.
-
-get() ->
- lists:sort(ets:tab2list(?TABLE)).
-
-timer_tc(Thunk) ->
- T1 = erlang:monotonic_time(),
- Res = Thunk(),
- T2 = erlang:monotonic_time(),
- Diff = erlang:convert_time_unit(T2 - T1, native, micro_seconds),
- {Diff, Res}.
diff --git a/src/rabbit_fhc_helpers.erl b/src/rabbit_fhc_helpers.erl
new file mode 100644
index 0000000000..5f19eff087
--- /dev/null
+++ b/src/rabbit_fhc_helpers.erl
@@ -0,0 +1,52 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_fhc_helpers).
+
+-export([clear_read_cache/0]).
+
+-include("rabbit.hrl"). % For #amqqueue record definition.
+
+clear_read_cache() ->
+ case application:get_env(rabbit, fhc_read_buffering) of
+ {ok, true} ->
+ file_handle_cache:clear_read_cache(),
+ clear_vhost_read_cache(rabbit_vhost:list());
+ _ -> %% undefined or {ok, false}
+ ok
+ end.
+
+clear_vhost_read_cache([]) ->
+ ok;
+clear_vhost_read_cache([VHost | Rest]) ->
+ clear_queue_read_cache(rabbit_amqqueue:list(VHost)),
+ clear_vhost_read_cache(Rest).
+
+clear_queue_read_cache([]) ->
+ ok;
+clear_queue_read_cache([#amqqueue{pid = MPid, slave_pids = SPids} | Rest]) ->
+ %% Limit the action to the current node.
+ Pids = [P || P <- [MPid | SPids], node(P) =:= node()],
+ %% This function is executed in the context of the backing queue
+ %% process because the read buffer is stored in the process
+ %% dictionary.
+ Fun = fun(_, State) ->
+ _ = file_handle_cache:clear_process_read_cache(),
+ State
+ end,
+ [rabbit_amqqueue:run_backing_queue(Pid, rabbit_variable_queue, Fun)
+ || Pid <- Pids],
+ clear_queue_read_cache(Rest).
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index de0b50ccdc..06ffb77b67 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -89,18 +89,8 @@ conserve_resources(Pid, disk, {_, Conserve, Node}) when node(Pid) =:= Node ->
conserve_resources(_Pid, _Source, _Conserve) ->
ok.
-memory_use(bytes) ->
- MemoryLimit = vm_memory_monitor:get_memory_limit(),
- {erlang:memory(total), case MemoryLimit > 0.0 of
- true -> MemoryLimit;
- false -> infinity
- end};
-memory_use(ratio) ->
- MemoryLimit = vm_memory_monitor:get_memory_limit(),
- case MemoryLimit > 0.0 of
- true -> erlang:memory(total) / MemoryLimit;
- false -> infinity
- end.
+memory_use(Type) ->
+ vm_memory_monitor:get_memory_use(Type).
%%----------------------------------------------------------------------------
%% Gen_server callbacks
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index e48eb2b91f..e49ea6dfb6 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -141,7 +141,7 @@ init_from_config() ->
{ok, _} ->
e(invalid_cluster_nodes_conf)
end,
- rabbit_log:info("All discovered existing cluster peers: ~p~n",
+ rabbit_log:info("All discovered existing cluster peers: ~s~n",
[rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]),
Peers = nodes_excl_me(DiscoveredNodes),
case Peers of
@@ -160,7 +160,7 @@ init_from_config() ->
join_discovered_peers(TryNodes, NodeType) ->
case find_reachable_peer_to_cluster_with(nodes_excl_me(TryNodes)) of
{ok, Node} ->
- rabbit_log:info("Node '~p' selected for auto-clustering~n", [Node]),
+ rabbit_log:info("Node '~s' selected for auto-clustering~n", [Node]),
{ok, {_, DiscNodes, _}} = discover_cluster0(Node),
init_db_and_upgrade(DiscNodes, NodeType, true, _Retry = true),
rabbit_connection_tracking:boot(),
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 292469da61..73488663d5 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -157,7 +157,7 @@ extract_schema(#plugin{type = dir, location = Location}, SchemaDir) ->
%% @doc Lists the plugins which are currently running.
active() ->
- InstalledPlugins = plugin_names(list(plugins_expand_dir())),
+ InstalledPlugins = plugin_names(list(plugins_dist_dir())),
[App || {App, _, _} <- rabbit_misc:which_applications(),
lists:member(App, InstalledPlugins)].
@@ -213,8 +213,7 @@ strictly_plugins(Plugins, AllPlugins) ->
end, Plugins).
strictly_plugins(Plugins) ->
- ExpandDir = plugins_expand_dir(),
- AllPlugins = list(ExpandDir),
+ AllPlugins = list(plugins_dist_dir()),
lists:filter(
fun(Name) ->
is_strictly_plugin(lists:keyfind(Name, #plugin.name, AllPlugins))
@@ -280,10 +279,8 @@ prepare_plugins(Enabled) ->
{error, E2} -> throw({error, {cannot_create_plugins_expand_dir,
[ExpandDir, E2]}})
end,
- [prepare_plugin(Plugin, ExpandDir) || Plugin <- ValidPlugins],
- [prepare_dir_plugin(PluginAppDescPath) ||
- PluginAppDescPath <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")],
+ [prepare_plugin(Plugin, ExpandDir) || Plugin <- ValidPlugins],
Wanted.
maybe_warn_about_invalid_plugins([]) ->
@@ -460,11 +457,37 @@ delete_recursively(Fn) ->
{error, {Path, E}} -> {error, {cannot_delete, Path, E}}
end.
-prepare_plugin(#plugin{type = ez, location = Location}, ExpandDir) ->
- zip:unzip(Location, [{cwd, ExpandDir}]);
-prepare_plugin(#plugin{type = dir, name = Name, location = Location},
- ExpandDir) ->
- rabbit_file:recursive_copy(Location, filename:join([ExpandDir, Name])).
+find_unzipped_app_file(ExpandDir, Files) ->
+ StripComponents = length(filename:split(ExpandDir)),
+ [ X || X <- Files,
+ [_AppName, "ebin", MaybeAppFile] <-
+ [lists:nthtail(StripComponents, filename:split(X))],
+ lists:suffix(".app", MaybeAppFile)
+ ].
+
+prepare_plugin(#plugin{type = ez, name = Name, location = Location}, ExpandDir) ->
+ case zip:unzip(Location, [{cwd, ExpandDir}]) of
+ {ok, Files} ->
+ case find_unzipped_app_file(ExpandDir, Files) of
+ [PluginAppDescPath|_] ->
+ prepare_dir_plugin(PluginAppDescPath);
+ _ ->
+ rabbit_log:error("Plugin archive '~s' doesn't contain an .app file~n", [Location]),
+ throw({app_file_missing, Name, Location})
+ end;
+ {error, Reason} ->
+ rabbit_log:error("Could not unzip plugin archive '~s': ~p~n", [Location, Reason]),
+ throw({failed_to_unzip_plugin, Name, Location, Reason})
+ end;
+prepare_plugin(#plugin{type = dir, location = Location, name = Name},
+ _ExpandDir) ->
+ case filelib:wildcard(Location ++ "/ebin/*.app") of
+ [PluginAppDescPath|_] ->
+ prepare_dir_plugin(PluginAppDescPath);
+ _ ->
+ rabbit_log:error("Plugin directory '~s' doesn't contain an .app file~n", [Location]),
+ throw({app_file_missing, Name, Location})
+ end.
plugin_info({ez, EZ}) ->
case read_app_file(EZ) of
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 82dea0bd7c..1b3cfb58c6 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -306,20 +306,32 @@ list_global() ->
end).
list_formatted(VHost) ->
- [pset(value, rabbit_json:encode(pget(value, P)), P) || P <- list(VHost)].
+ [ format_parameter(info_keys(), P) || P <- list(VHost) ].
+
+format_parameter(InfoKeys, P) ->
+ lists:foldr(fun
+ (value, Acc) ->
+ [{value, rabbit_json:encode(pget(value, P))} | Acc];
+ (Key, Acc) ->
+ case lists:keyfind(Key, 1, P) of
+ false -> Acc;
+ {Key, Val} -> [{Key, Val} | Acc]
+ end
+ end,
+ [], InfoKeys).
list_formatted(VHost, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref,
- fun(P) -> pset(value, rabbit_json:encode(pget(value, P)), P) end, list(VHost)).
+ fun(P) -> format_parameter(info_keys(), P) end, list(VHost)).
list_global_formatted() ->
- [pset(value, rabbit_json:encode(pget(value, P)), P) || P <- list_global()].
+ [ format_parameter(global_info_keys(), P) || P <- list_global() ].
list_global_formatted(Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref,
- fun(P) -> pset(value, rabbit_json:encode(pget(value, P)), P) end, list_global()).
+ fun(P) -> format_parameter(global_info_keys(), P) end, list_global()).
lookup(VHost, Component, Name) ->
case lookup0({VHost, Component, Name}, rabbit_misc:const(not_found)) of
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index b1d3e90ff8..498db6e01c 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -19,7 +19,7 @@
%% If you are tempted to add include("rabbit.hrl"). here, don't. Using record
%% defs here leads to pain later.
--compile([export_all]).
+-compile([nowarn_export_all, export_all]).
-rabbit_upgrade({remove_user_scope, mnesia, []}).
-rabbit_upgrade({hash_passwords, mnesia, []}).
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 7c44cb8e12..ba324d649e 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -35,7 +35,7 @@
-export([get_total_memory/0, get_vm_limit/0,
get_check_interval/0, set_check_interval/1,
get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1,
- get_memory_limit/0]).
+ get_memory_limit/0, get_memory_use/1]).
%% for tests
-export([parse_line_linux/1]).
@@ -73,19 +73,28 @@
-spec get_vm_memory_high_watermark() -> vm_memory_high_watermark().
-spec set_vm_memory_high_watermark(vm_memory_high_watermark()) -> 'ok'.
-spec get_memory_limit() -> non_neg_integer().
+-spec get_memory_use(bytes) -> {non_neg_integer(), float() | infinity};
+ (ratio) -> float() | infinity.
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
get_total_memory() ->
- try
- get_total_memory(os:type())
- catch _:Error ->
- rabbit_log:warning(
- "Failed to get total system memory: ~n~p~n~p~n",
- [Error, erlang:get_stacktrace()]),
- unknown
+ case application:get_env(rabbit, total_memory_available_override_value) of
+ {ok, Value} ->
+ case rabbit_resource_monitor_misc:parse_information_unit(Value) of
+ {ok, ParsedTotal} ->
+ ParsedTotal;
+ {error, parse_error} ->
+ rabbit_log:warning(
+ "The override value for the total memmory available is "
+ "not a valid value: ~p, getting total from the system.~n",
+ [Value]),
+ get_total_memory_from_os()
+ end;
+ undefined ->
+ get_total_memory_from_os()
end.
get_vm_limit() -> get_vm_limit(os:type()).
@@ -106,6 +115,19 @@ set_vm_memory_high_watermark(Fraction) ->
get_memory_limit() ->
gen_server:call(?MODULE, get_memory_limit, infinity).
+get_memory_use(bytes) ->
+ MemoryLimit = get_memory_limit(),
+ {erlang:memory(total), case MemoryLimit > 0.0 of
+ true -> MemoryLimit;
+ false -> infinity
+ end};
+get_memory_use(ratio) ->
+ MemoryLimit = get_memory_limit(),
+ case MemoryLimit > 0.0 of
+ true -> erlang:memory(total) / MemoryLimit;
+ false -> infinity
+ end.
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -164,6 +186,15 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
%% Server Internals
%%----------------------------------------------------------------------------
+get_total_memory_from_os() ->
+ try
+ get_total_memory(os:type())
+ catch _:Error ->
+ rabbit_log:warning(
+ "Failed to get total system memory: ~n~p~n~p~n",
+ [Error, erlang:get_stacktrace()]),
+ unknown
+ end.
set_mem_limits(State, MemLimit) ->
case erlang:system_info(wordsize) of