diff options
| -rw-r--r-- | src/file_handle_cache.erl | 264 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 12 | ||||
| -rw-r--r-- | src/tcp_acceptor.erl | 4 |
3 files changed, 183 insertions, 97 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 8db5a7944b..fe4bdc0392 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -130,7 +130,7 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([release_on_death/1, obtain/0]). +-export([obtain/1]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -148,6 +148,8 @@ -define(FILE_HANDLES_LIMIT_OTHER, 1024). -define(FILE_HANDLES_CHECK_INTERVAL, 2000). +-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)). + %%---------------------------------------------------------------------------- -record(file, @@ -175,8 +177,11 @@ -record(fhc_state, { elders, limit, - count, - obtains, + open_count, + open_pending, + obtain_limit, + obtain_count, + obtain_pending, callbacks, client_mrefs, timer_ref @@ -217,8 +222,7 @@ -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(delete/1 :: (ref()) -> ok_or_error()). -spec(clear/1 :: (ref()) -> ok_or_error()). --spec(release_on_death/1 :: (pid()) -> 'ok'). --spec(obtain/0 :: () -> 'ok'). +-spec(obtain/1 :: (pid()) -> 'ok'). -endif. @@ -308,7 +312,7 @@ append(Ref, Data) -> Size1 = Size + iolist_size(Data), Handle2 = Handle1 #handle { write_buffer = WriteBuffer1, write_buffer_size = Size1 }, - case Limit /= infinity andalso Size1 > Limit of + case Limit =/= infinity andalso Size1 > Limit of true -> {Result, Handle3} = write_buffer(Handle2), {Result, [Handle3]}; false -> {ok, [Handle2]} @@ -429,28 +433,19 @@ set_maximum_since_use(MaximumAge) -> fun ({{Ref, fhc_handle}, Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) -> Age = timer:now_diff(Now, Then), - case Hdl /= closed andalso Age >= MaximumAge of - true -> {Res, Handle1} = soft_close(Handle), - case Res of - ok -> put({Ref, fhc_handle}, Handle1), - false; - _ -> put_handle(Ref, Handle1), - Rep - end; + case Hdl =/= closed andalso Age >= MaximumAge of + true -> soft_close(Ref, Handle) orelse Rep; false -> Rep end; (_KeyValuePair, Rep) -> Rep - end, true, get()) of - true -> age_tree_change(), ok; - false -> ok + end, false, get()) of + false -> age_tree_change(), ok; + true -> ok end. -release_on_death(Pid) when is_pid(Pid) -> - gen_server:cast(?SERVER, {release_on_death, Pid}). - -obtain() -> - gen_server:call(?SERVER, obtain, infinity). +obtain(Pid) -> + gen_server:call(?SERVER, {obtain, Pid}, infinity). %%---------------------------------------------------------------------------- %% Internal functions @@ -523,20 +518,30 @@ put_handle(Ref, Handle = #handle { last_used_at = Then }) -> age_tree_update(Then, Now, Ref), put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }). -with_age_tree(Fun) -> - put(fhc_age_tree, Fun(case get(fhc_age_tree) of - undefined -> gb_trees:empty(); - AgeTree -> AgeTree - end)). +with_age_tree(Fun) -> put_age_tree(Fun(get_age_tree())). + +get_age_tree() -> + case get(fhc_age_tree) of + undefined -> gb_trees:empty(); + AgeTree -> AgeTree + end. + +put_age_tree(Tree) -> put(fhc_age_tree, Tree). age_tree_insert(Now, Ref) -> - with_age_tree( - fun (Tree) -> - Tree1 = gb_trees:insert(Now, Ref, Tree), - {Oldest, _Ref} = gb_trees:smallest(Tree1), - gen_server:cast(?SERVER, {open, self(), Oldest}), - Tree1 - end). + Tree = get_age_tree(), + Tree1 = gb_trees:insert(Now, Ref, Tree), + {Oldest, _Ref} = gb_trees:smallest(Tree1), + case gen_server:call(?SERVER, {open, self(), Oldest, + not gb_trees:is_empty(Tree)}, infinity) of + ok -> + put_age_tree(Tree1); + close -> + [soft_close(Ref1, Handle1) || + {{Ref1, fhc_handle}, Handle1 = #handle { hdl = Hdl1 }} <- get(), + Hdl1 =/= closed], + age_tree_insert(Now, Ref) + end. age_tree_update(Then, Now, Ref) -> with_age_tree( @@ -575,6 +580,8 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> new -> Mode; reopen -> [read | Mode] end, + Now = now(), + age_tree_insert(Now, Ref), case file:open(Path, Mode1) of {ok, Hdl} -> WriteBufferSize = @@ -583,7 +590,6 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> infinity -> infinity; N when is_integer(N) -> N end, - Now = now(), Handle = #handle { hdl = Hdl, offset = 0, trusted_offset = 0, @@ -601,12 +607,21 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle), Handle2 = Handle1 #handle { trusted_offset = Offset1 }, put({Ref, fhc_handle}, Handle2), - age_tree_insert(Now, Ref), {ok, Handle2}; {error, Reason} -> + age_tree_delete(Now), {error, Reason} end. +soft_close(Ref, Handle) -> + {Res, Handle1} = soft_close(Handle), + case Res of + ok -> put({Ref, fhc_handle}, Handle1), + true; + _ -> put_handle(Ref, Handle1), + false + end. + soft_close(Handle = #handle { hdl = closed }) -> {ok, Handle}; soft_close(Handle) -> @@ -709,18 +724,63 @@ init([]) -> _ -> ulimit() end, - error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]), - {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0, - obtains = [], callbacks = dict:new(), - client_mrefs = dict:new(), timer_ref = undefined }}. - -handle_call(obtain, From, State = #fhc_state { count = Count }) -> - State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } = - maybe_reduce(State #fhc_state { count = Count + 1 }), - case Limit /= infinity andalso Count1 >= Limit of - true -> {noreply, State1 #fhc_state { obtains = [From | Obtains], - count = Count1 - 1 }}; - false -> {reply, ok, State1} + ObtainLimit = case Limit of + infinity -> infinity; + _ -> ?OBTAIN_LIMIT(Limit) + end, + error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", + [Limit, ObtainLimit]), + {ok, #fhc_state { elders = dict:new(), + limit = Limit, + open_count = 0, + open_pending = [], + obtain_limit = ObtainLimit, + obtain_count = 0, + obtain_pending = [], + callbacks = dict:new(), + client_mrefs = dict:new(), + timer_ref = undefined }}. + +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, + obtain_count = Count, + obtain_pending = Pending, + elders = Elders }) + when Limit =/= infinity andalso Count >= Limit -> + {noreply, + State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending], + elders = dict:erase(Pid, Elders) }}; +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, + obtain_pending = Pending, + elders = Elders }) -> + case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of + {true, State1} -> + {noreply, State1 #fhc_state { + obtain_count = Count, + obtain_pending = [{obtain, Pid, From} | Pending], + elders = dict:erase(Pid, Elders) }}; + {false, State1} -> + _MRef = erlang:monitor(process, Pid), + {reply, ok, State1} + end; + +handle_call({open, Pid, EldestUnusedSince, CanClose}, From, + State = #fhc_state { open_count = Count, + open_pending = Pending, + elders = Elders }) -> + Elders1 = dict:store(Pid, EldestUnusedSince, Elders), + case maybe_reduce( + ensure_mref(Pid, State #fhc_state { open_count = Count + 1, + elders = Elders1 })) of + {true, State1} -> + State2 = State1 #fhc_state { open_count = Count }, + case CanClose of + true -> {reply, close, State2}; + false -> {noreply, State2 #fhc_state { + open_pending = [{open, From} | Pending], + elders = dict:erase(Pid, Elders1) }} + end; + {false, State1} -> + {reply, ok, State1} end. handle_cast({register_callback, Pid, MFA}, @@ -729,47 +789,37 @@ handle_cast({register_callback, Pid, MFA}, Pid, State #fhc_state { callbacks = dict:store(Pid, MFA, Callbacks) })}; -handle_cast({open, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, count = Count }) -> - Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - {noreply, maybe_reduce( - ensure_mref(Pid, State #fhc_state { elders = Elders1, - count = Count + 1 }))}; - handle_cast({update, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders }) -> + #fhc_state { elders = Elders }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), %% don't call maybe_reduce from here otherwise we can create a %% storm of messages {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })}; handle_cast({close, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, count = Count }) -> + #fhc_state { elders = Elders, open_count = Count }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - {noreply, process_obtains( - ensure_mref(Pid, State #fhc_state { elders = Elders1, - count = Count - 1 }))}; + {noreply, process_pending( + ensure_mref(Pid, State #fhc_state { open_count = Count - 1, + elders = Elders1 }))}; handle_cast(check_counts, State) -> - {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; - -handle_cast({release_on_death, Pid}, State) -> - _MRef = erlang:monitor(process, Pid), - {noreply, State}. + {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }), + {noreply, State1}. handle_info({'DOWN', MRef, process, Pid, _Reason}, State = - #fhc_state { count = Count, callbacks = Callbacks, + #fhc_state { obtain_count = Count, callbacks = Callbacks, client_mrefs = ClientMRefs, elders = Elders }) -> - {noreply, process_obtains( + {noreply, process_pending( case dict:find(Pid, ClientMRefs) of {ok, MRef} -> State #fhc_state { elders = dict:erase(Pid, Elders), client_mrefs = dict:erase(Pid, ClientMRefs), callbacks = dict:erase(Pid, Callbacks) }; - _ -> State #fhc_state { count = Count - 1 } + _ -> State #fhc_state { obtain_count = Count - 1 } end)}. terminate(_Reason, State) -> @@ -782,23 +832,62 @@ code_change(_OldVsn, State, _Extra) -> %% server helpers %%---------------------------------------------------------------------------- -process_obtains(State = #fhc_state { obtains = [] }) -> +process_pending(State = #fhc_state { limit = infinity }) -> State; -process_obtains(State = #fhc_state { limit = Limit, count = Count }) - when Limit /= infinity andalso Count >= Limit -> - State; -process_obtains(State = #fhc_state { limit = Limit, count = Count, - obtains = Obtains }) -> - ObtainsLen = length(Obtains), - ObtainableLen = lists:min([ObtainsLen, Limit - Count]), - Take = ObtainsLen - ObtainableLen, - {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains), - [gen_server:reply(From, ok) || From <- ObtainableRev], - State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }. - -maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders, - callbacks = Callbacks, timer_ref = TRef }) - when Limit /= infinity andalso Count >= Limit -> +process_pending(State) -> + process_obtain(process_open(State)). + +process_open(State = #fhc_state { limit = Limit, + open_pending = Pending, + open_count = OpenCount, + obtain_count = ObtainCount }) -> + {Pending1, Inc} = + process_pending(Pending, Limit - (ObtainCount + OpenCount)), + State #fhc_state { open_pending = Pending1, + open_count = OpenCount + Inc }. + +process_obtain(State = #fhc_state { limit = Limit, + obtain_pending = Pending, + obtain_limit = ObtainLimit, + obtain_count = ObtainCount, + open_count = OpenCount }) -> + Quota = lists:min([ObtainLimit - ObtainCount, + Limit - (ObtainCount + OpenCount)]), + {Pending1, Inc} = process_pending(Pending, Quota), + State #fhc_state { obtain_pending = Pending1, + obtain_count = ObtainCount + Inc }. + +process_pending([], _Quota) -> + {[], 0}; +process_pending(Pending, Quota) when Quota =< 0 -> + {Pending, 0}; +process_pending(Pending, Quota) -> + PendingLen = length(Pending), + SatisfiableLen = lists:min([PendingLen, Quota]), + Take = PendingLen - SatisfiableLen, + {PendingNew, SatisfiableRev} = lists:split(Take, Pending), + [run_pending_item(Item) || Item <- SatisfiableRev], + {PendingNew, SatisfiableLen}. + +run_pending_item({open, From}) -> + gen_server:reply(From, ok); +run_pending_item({obtain, Pid, From}) -> + _MRef = erlang:monitor(process, Pid), + gen_server:reply(From, ok). + +maybe_reduce(State = #fhc_state { limit = Limit, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_limit = ObtainLimit, + obtain_pending = ObtainPending, + elders = Elders, + callbacks = Callbacks, + timer_ref = TRef }) + when Limit =/= infinity andalso + (((OpenCount + ObtainCount) > Limit) orelse + (OpenPending =/= []) orelse + (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) -> Now = now(), {Pids, Sum, ClientCount} = dict:fold(fun (_Pid, undefined, Accs) -> @@ -818,15 +907,16 @@ maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders, end end, Pids) end, + AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit, case TRef of undefined -> {ok, TRef1} = timer:apply_after( ?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), - State #fhc_state { timer_ref = TRef1 }; - _ -> State + {AboveLimit, State #fhc_state { timer_ref = TRef1 }}; + _ -> {AboveLimit, State} end; maybe_reduce(State) -> - State. + {false, State}. %% For all unices, assume ulimit exists. Further googling suggests %% that BSDs (incl OS X), solaris and linux all agree that ulimit -n diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index c55380d332..4ca91aa1b5 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -998,7 +998,6 @@ safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> %% file helper functions %%---------------------------------------------------------------------------- - open_file(Dir, FileName, Mode) -> file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). @@ -1084,11 +1083,8 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)). list_sorted_file_names(Dir, Ext) -> - sort_file_names(filelib:wildcard("*" ++ Ext, Dir)). - -sort_file_names(FileNames) -> lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end, - FileNames). + filelib:wildcard("*" ++ Ext, Dir)). %%---------------------------------------------------------------------------- %% message cache helper functions @@ -1184,7 +1180,7 @@ recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) -> {false, Error} -> Fresh("failed to read recovery terms: ~p", [Error]); {true, Terms} -> - RecClientRefs = proplists:get_value(client_refs, Terms, []), + RecClientRefs = proplists:get_value(client_refs, Terms, []), RecIndexModule = proplists:get_value(index_module, Terms), case (lists:sort(ClientRefs) =:= lists:sort(RecClientRefs) andalso IndexModule =:= RecIndexModule) of @@ -1280,7 +1276,7 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) -> %% with duplicates appearing. Thus the simplest and safest thing %% to do is to append the contents of the tmp file to its main %% file. - {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_MODE), + {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_MODE), {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, ?READ_MODE ++ ?WRITE_MODE), {ok, _End} = file_handle_cache:position(MainHdl, eof), @@ -1381,7 +1377,7 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, lists:foldl( fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) -> case index_lookup(Guid, State) of - StoreEntry = #msg_location { file = undefined } -> + #msg_location { file = undefined } = StoreEntry -> ok = index_update(StoreEntry #msg_location { file = File, offset = Offset, total_size = TotalSize }, diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index cc4982c9cb..11ce6fc532 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -55,6 +55,7 @@ handle_call(_Request, _From, State) -> {noreply, State}. handle_cast(accept, State) -> + ok = file_handle_cache:obtain(self()), accept(State); handle_cast(_Msg, State) -> @@ -83,7 +84,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, %% is drained. gen_event:which_handlers(error_logger), %% handle - file_handle_cache:release_on_death(apply(M, F, A ++ [Sock])) + file_handle_cache:obtain(apply(M, F, A ++ [Sock])) catch {inet_error, Reason} -> gen_tcp:close(Sock), error_logger:error_msg("unable to accept TCP connection: ~p~n", @@ -111,7 +112,6 @@ code_change(_OldVsn, State, _Extra) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). accept(State = #state{sock=LSock}) -> - ok = file_handle_cache:obtain(), case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {noreply, State#state{ref=Ref}}; Error -> {stop, {cannot_accept, Error}, State} |
