summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/file_handle_cache.erl264
-rw-r--r--src/rabbit_msg_store.erl12
-rw-r--r--src/tcp_acceptor.erl4
3 files changed, 183 insertions, 97 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 8db5a7944b..fe4bdc0392 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -130,7 +130,7 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([release_on_death/1, obtain/0]).
+-export([obtain/1]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -148,6 +148,8 @@
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
+-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)).
+
%%----------------------------------------------------------------------------
-record(file,
@@ -175,8 +177,11 @@
-record(fhc_state,
{ elders,
limit,
- count,
- obtains,
+ open_count,
+ open_pending,
+ obtain_limit,
+ obtain_count,
+ obtain_pending,
callbacks,
client_mrefs,
timer_ref
@@ -217,8 +222,7 @@
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
--spec(release_on_death/1 :: (pid()) -> 'ok').
--spec(obtain/0 :: () -> 'ok').
+-spec(obtain/1 :: (pid()) -> 'ok').
-endif.
@@ -308,7 +312,7 @@ append(Ref, Data) ->
Size1 = Size + iolist_size(Data),
Handle2 = Handle1 #handle { write_buffer = WriteBuffer1,
write_buffer_size = Size1 },
- case Limit /= infinity andalso Size1 > Limit of
+ case Limit =/= infinity andalso Size1 > Limit of
true -> {Result, Handle3} = write_buffer(Handle2),
{Result, [Handle3]};
false -> {ok, [Handle2]}
@@ -429,28 +433,19 @@ set_maximum_since_use(MaximumAge) ->
fun ({{Ref, fhc_handle},
Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
Age = timer:now_diff(Now, Then),
- case Hdl /= closed andalso Age >= MaximumAge of
- true -> {Res, Handle1} = soft_close(Handle),
- case Res of
- ok -> put({Ref, fhc_handle}, Handle1),
- false;
- _ -> put_handle(Ref, Handle1),
- Rep
- end;
+ case Hdl =/= closed andalso Age >= MaximumAge of
+ true -> soft_close(Ref, Handle) orelse Rep;
false -> Rep
end;
(_KeyValuePair, Rep) ->
Rep
- end, true, get()) of
- true -> age_tree_change(), ok;
- false -> ok
+ end, false, get()) of
+ false -> age_tree_change(), ok;
+ true -> ok
end.
-release_on_death(Pid) when is_pid(Pid) ->
- gen_server:cast(?SERVER, {release_on_death, Pid}).
-
-obtain() ->
- gen_server:call(?SERVER, obtain, infinity).
+obtain(Pid) ->
+ gen_server:call(?SERVER, {obtain, Pid}, infinity).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -523,20 +518,30 @@ put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
age_tree_update(Then, Now, Ref),
put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
-with_age_tree(Fun) ->
- put(fhc_age_tree, Fun(case get(fhc_age_tree) of
- undefined -> gb_trees:empty();
- AgeTree -> AgeTree
- end)).
+with_age_tree(Fun) -> put_age_tree(Fun(get_age_tree())).
+
+get_age_tree() ->
+ case get(fhc_age_tree) of
+ undefined -> gb_trees:empty();
+ AgeTree -> AgeTree
+ end.
+
+put_age_tree(Tree) -> put(fhc_age_tree, Tree).
age_tree_insert(Now, Ref) ->
- with_age_tree(
- fun (Tree) ->
- Tree1 = gb_trees:insert(Now, Ref, Tree),
- {Oldest, _Ref} = gb_trees:smallest(Tree1),
- gen_server:cast(?SERVER, {open, self(), Oldest}),
- Tree1
- end).
+ Tree = get_age_tree(),
+ Tree1 = gb_trees:insert(Now, Ref, Tree),
+ {Oldest, _Ref} = gb_trees:smallest(Tree1),
+ case gen_server:call(?SERVER, {open, self(), Oldest,
+ not gb_trees:is_empty(Tree)}, infinity) of
+ ok ->
+ put_age_tree(Tree1);
+ close ->
+ [soft_close(Ref1, Handle1) ||
+ {{Ref1, fhc_handle}, Handle1 = #handle { hdl = Hdl1 }} <- get(),
+ Hdl1 =/= closed],
+ age_tree_insert(Now, Ref)
+ end.
age_tree_update(Then, Now, Ref) ->
with_age_tree(
@@ -575,6 +580,8 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
new -> Mode;
reopen -> [read | Mode]
end,
+ Now = now(),
+ age_tree_insert(Now, Ref),
case file:open(Path, Mode1) of
{ok, Hdl} ->
WriteBufferSize =
@@ -583,7 +590,6 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
infinity -> infinity;
N when is_integer(N) -> N
end,
- Now = now(),
Handle = #handle { hdl = Hdl,
offset = 0,
trusted_offset = 0,
@@ -601,12 +607,21 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
{{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle),
Handle2 = Handle1 #handle { trusted_offset = Offset1 },
put({Ref, fhc_handle}, Handle2),
- age_tree_insert(Now, Ref),
{ok, Handle2};
{error, Reason} ->
+ age_tree_delete(Now),
{error, Reason}
end.
+soft_close(Ref, Handle) ->
+ {Res, Handle1} = soft_close(Handle),
+ case Res of
+ ok -> put({Ref, fhc_handle}, Handle1),
+ true;
+ _ -> put_handle(Ref, Handle1),
+ false
+ end.
+
soft_close(Handle = #handle { hdl = closed }) ->
{ok, Handle};
soft_close(Handle) ->
@@ -709,18 +724,63 @@ init([]) ->
_ ->
ulimit()
end,
- error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
- {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0,
- obtains = [], callbacks = dict:new(),
- client_mrefs = dict:new(), timer_ref = undefined }}.
-
-handle_call(obtain, From, State = #fhc_state { count = Count }) ->
- State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } =
- maybe_reduce(State #fhc_state { count = Count + 1 }),
- case Limit /= infinity andalso Count1 >= Limit of
- true -> {noreply, State1 #fhc_state { obtains = [From | Obtains],
- count = Count1 - 1 }};
- false -> {reply, ok, State1}
+ ObtainLimit = case Limit of
+ infinity -> infinity;
+ _ -> ?OBTAIN_LIMIT(Limit)
+ end,
+ error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
+ [Limit, ObtainLimit]),
+ {ok, #fhc_state { elders = dict:new(),
+ limit = Limit,
+ open_count = 0,
+ open_pending = [],
+ obtain_limit = ObtainLimit,
+ obtain_count = 0,
+ obtain_pending = [],
+ callbacks = dict:new(),
+ client_mrefs = dict:new(),
+ timer_ref = undefined }}.
+
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
+ obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders })
+ when Limit =/= infinity andalso Count >= Limit ->
+ {noreply,
+ State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending],
+ elders = dict:erase(Pid, Elders) }};
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders }) ->
+ case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of
+ {true, State1} ->
+ {noreply, State1 #fhc_state {
+ obtain_count = Count,
+ obtain_pending = [{obtain, Pid, From} | Pending],
+ elders = dict:erase(Pid, Elders) }};
+ {false, State1} ->
+ _MRef = erlang:monitor(process, Pid),
+ {reply, ok, State1}
+ end;
+
+handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
+ State = #fhc_state { open_count = Count,
+ open_pending = Pending,
+ elders = Elders }) ->
+ Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ case maybe_reduce(
+ ensure_mref(Pid, State #fhc_state { open_count = Count + 1,
+ elders = Elders1 })) of
+ {true, State1} ->
+ State2 = State1 #fhc_state { open_count = Count },
+ case CanClose of
+ true -> {reply, close, State2};
+ false -> {noreply, State2 #fhc_state {
+ open_pending = [{open, From} | Pending],
+ elders = dict:erase(Pid, Elders1) }}
+ end;
+ {false, State1} ->
+ {reply, ok, State1}
end.
handle_cast({register_callback, Pid, MFA},
@@ -729,47 +789,37 @@ handle_cast({register_callback, Pid, MFA},
Pid, State #fhc_state {
callbacks = dict:store(Pid, MFA, Callbacks) })};
-handle_cast({open, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, count = Count }) ->
- Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
- {noreply, maybe_reduce(
- ensure_mref(Pid, State #fhc_state { elders = Elders1,
- count = Count + 1 }))};
-
handle_cast({update, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders }) ->
+ #fhc_state { elders = Elders }) ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
{noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })};
handle_cast({close, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, count = Count }) ->
+ #fhc_state { elders = Elders, open_count = Count }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
- {noreply, process_obtains(
- ensure_mref(Pid, State #fhc_state { elders = Elders1,
- count = Count - 1 }))};
+ {noreply, process_pending(
+ ensure_mref(Pid, State #fhc_state { open_count = Count - 1,
+ elders = Elders1 }))};
handle_cast(check_counts, State) ->
- {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
-
-handle_cast({release_on_death, Pid}, State) ->
- _MRef = erlang:monitor(process, Pid),
- {noreply, State}.
+ {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }),
+ {noreply, State1}.
handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
- #fhc_state { count = Count, callbacks = Callbacks,
+ #fhc_state { obtain_count = Count, callbacks = Callbacks,
client_mrefs = ClientMRefs, elders = Elders }) ->
- {noreply, process_obtains(
+ {noreply, process_pending(
case dict:find(Pid, ClientMRefs) of
{ok, MRef} -> State #fhc_state {
elders = dict:erase(Pid, Elders),
client_mrefs = dict:erase(Pid, ClientMRefs),
callbacks = dict:erase(Pid, Callbacks) };
- _ -> State #fhc_state { count = Count - 1 }
+ _ -> State #fhc_state { obtain_count = Count - 1 }
end)}.
terminate(_Reason, State) ->
@@ -782,23 +832,62 @@ code_change(_OldVsn, State, _Extra) ->
%% server helpers
%%----------------------------------------------------------------------------
-process_obtains(State = #fhc_state { obtains = [] }) ->
+process_pending(State = #fhc_state { limit = infinity }) ->
State;
-process_obtains(State = #fhc_state { limit = Limit, count = Count })
- when Limit /= infinity andalso Count >= Limit ->
- State;
-process_obtains(State = #fhc_state { limit = Limit, count = Count,
- obtains = Obtains }) ->
- ObtainsLen = length(Obtains),
- ObtainableLen = lists:min([ObtainsLen, Limit - Count]),
- Take = ObtainsLen - ObtainableLen,
- {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains),
- [gen_server:reply(From, ok) || From <- ObtainableRev],
- State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }.
-
-maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders,
- callbacks = Callbacks, timer_ref = TRef })
- when Limit /= infinity andalso Count >= Limit ->
+process_pending(State) ->
+ process_obtain(process_open(State)).
+
+process_open(State = #fhc_state { limit = Limit,
+ open_pending = Pending,
+ open_count = OpenCount,
+ obtain_count = ObtainCount }) ->
+ {Pending1, Inc} =
+ process_pending(Pending, Limit - (ObtainCount + OpenCount)),
+ State #fhc_state { open_pending = Pending1,
+ open_count = OpenCount + Inc }.
+
+process_obtain(State = #fhc_state { limit = Limit,
+ obtain_pending = Pending,
+ obtain_limit = ObtainLimit,
+ obtain_count = ObtainCount,
+ open_count = OpenCount }) ->
+ Quota = lists:min([ObtainLimit - ObtainCount,
+ Limit - (ObtainCount + OpenCount)]),
+ {Pending1, Inc} = process_pending(Pending, Quota),
+ State #fhc_state { obtain_pending = Pending1,
+ obtain_count = ObtainCount + Inc }.
+
+process_pending([], _Quota) ->
+ {[], 0};
+process_pending(Pending, Quota) when Quota =< 0 ->
+ {Pending, 0};
+process_pending(Pending, Quota) ->
+ PendingLen = length(Pending),
+ SatisfiableLen = lists:min([PendingLen, Quota]),
+ Take = PendingLen - SatisfiableLen,
+ {PendingNew, SatisfiableRev} = lists:split(Take, Pending),
+ [run_pending_item(Item) || Item <- SatisfiableRev],
+ {PendingNew, SatisfiableLen}.
+
+run_pending_item({open, From}) ->
+ gen_server:reply(From, ok);
+run_pending_item({obtain, Pid, From}) ->
+ _MRef = erlang:monitor(process, Pid),
+ gen_server:reply(From, ok).
+
+maybe_reduce(State = #fhc_state { limit = Limit,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ obtain_count = ObtainCount,
+ obtain_limit = ObtainLimit,
+ obtain_pending = ObtainPending,
+ elders = Elders,
+ callbacks = Callbacks,
+ timer_ref = TRef })
+ when Limit =/= infinity andalso
+ (((OpenCount + ObtainCount) > Limit) orelse
+ (OpenPending =/= []) orelse
+ (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) ->
Now = now(),
{Pids, Sum, ClientCount} =
dict:fold(fun (_Pid, undefined, Accs) ->
@@ -818,15 +907,16 @@ maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders,
end
end, Pids)
end,
+ AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit,
case TRef of
undefined -> {ok, TRef1} = timer:apply_after(
?FILE_HANDLES_CHECK_INTERVAL,
gen_server, cast, [?SERVER, check_counts]),
- State #fhc_state { timer_ref = TRef1 };
- _ -> State
+ {AboveLimit, State #fhc_state { timer_ref = TRef1 }};
+ _ -> {AboveLimit, State}
end;
maybe_reduce(State) ->
- State.
+ {false, State}.
%% For all unices, assume ulimit exists. Further googling suggests
%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index c55380d332..4ca91aa1b5 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -998,7 +998,6 @@ safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
%% file helper functions
%%----------------------------------------------------------------------------
-
open_file(Dir, FileName, Mode) ->
file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
[{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
@@ -1084,11 +1083,8 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)).
list_sorted_file_names(Dir, Ext) ->
- sort_file_names(filelib:wildcard("*" ++ Ext, Dir)).
-
-sort_file_names(FileNames) ->
lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
- FileNames).
+ filelib:wildcard("*" ++ Ext, Dir)).
%%----------------------------------------------------------------------------
%% message cache helper functions
@@ -1184,7 +1180,7 @@ recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
{false, Error} ->
Fresh("failed to read recovery terms: ~p", [Error]);
{true, Terms} ->
- RecClientRefs = proplists:get_value(client_refs, Terms, []),
+ RecClientRefs = proplists:get_value(client_refs, Terms, []),
RecIndexModule = proplists:get_value(index_module, Terms),
case (lists:sort(ClientRefs) =:= lists:sort(RecClientRefs)
andalso IndexModule =:= RecIndexModule) of
@@ -1280,7 +1276,7 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
%% with duplicates appearing. Thus the simplest and safest thing
%% to do is to append the contents of the tmp file to its main
%% file.
- {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_MODE),
+ {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_MODE),
{ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
?READ_MODE ++ ?WRITE_MODE),
{ok, _End} = file_handle_cache:position(MainHdl, eof),
@@ -1381,7 +1377,7 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
lists:foldl(
fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case index_lookup(Guid, State) of
- StoreEntry = #msg_location { file = undefined } ->
+ #msg_location { file = undefined } = StoreEntry ->
ok = index_update(StoreEntry #msg_location {
file = File, offset = Offset,
total_size = TotalSize },
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index cc4982c9cb..11ce6fc532 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -55,6 +55,7 @@ handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(accept, State) ->
+ ok = file_handle_cache:obtain(self()),
accept(State);
handle_cast(_Msg, State) ->
@@ -83,7 +84,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
%% is drained.
gen_event:which_handlers(error_logger),
%% handle
- file_handle_cache:release_on_death(apply(M, F, A ++ [Sock]))
+ file_handle_cache:obtain(apply(M, F, A ++ [Sock]))
catch {inet_error, Reason} ->
gen_tcp:close(Sock),
error_logger:error_msg("unable to accept TCP connection: ~p~n",
@@ -111,7 +112,6 @@ code_change(_OldVsn, State, _Extra) ->
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
accept(State = #state{sock=LSock}) ->
- ok = file_handle_cache:obtain(),
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};
Error -> {stop, {cannot_accept, Error}, State}