diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-25 17:13:38 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-25 17:13:38 +0100 |
| commit | b48837f70e2f0bffabbdc092a7dd4cbdb7f6dec1 (patch) | |
| tree | 642cbc4daa8c78a11439513758d0ba4f7acf787d /src | |
| parent | 3f3514008f98a2e1298695ca646334c1c98b2baa (diff) | |
| parent | f2e772f4a442170cece366912fda5ce153328263 (diff) | |
| download | rabbitmq-server-git-b48837f70e2f0bffabbdc092a7dd4cbdb7f6dec1.tar.gz | |
Merging default into bug 15930 and minor debitrot (heartbeater)
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 675 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_heartbeat.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_plugin_activator.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 28 | ||||
| -rw-r--r-- | src/tcp_acceptor.erl | 7 |
10 files changed, 501 insertions, 264 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index fe4bdc0392..f83fa0bc11 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -116,13 +116,13 @@ %% do not need to worry about their handles being closed by the server %% - reopening them when necessary is handled transparently. %% -%% The server also supports obtain and release_on_death. obtain/0 -%% blocks until a file descriptor is available. release_on_death/1 -%% takes a pid and monitors the pid, reducing the count by 1 when the -%% pid dies. Thus the assumption is that obtain/0 is called first, and -%% when that returns, release_on_death/1 is called with the pid who -%% "owns" the file descriptor. This is, for example, used to track the -%% use of file descriptors through network sockets. +%% The server also supports obtain and transfer. obtain/0 blocks until +%% a file descriptor is available. transfer/1 is transfers ownership +%% of a file descriptor between processes. It is non-blocking. +%% +%% The callers of register_callback/3, obtain/0, and the argument of +%% transfer/1 are monitored, reducing the count of handles in use +%% appropriately when the processes terminate. -behaviour(gen_server). @@ -130,7 +130,7 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/1]). +-export([obtain/0, transfer/1, set_limit/1, get_limit/0]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -148,7 +148,8 @@ -define(FILE_HANDLES_LIMIT_OTHER, 1024). -define(FILE_HANDLES_CHECK_INTERVAL, 2000). --define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)). +-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)). +-define(CLIENT_ETS_TABLE, ?MODULE). %%---------------------------------------------------------------------------- @@ -182,11 +183,26 @@ obtain_limit, obtain_count, obtain_pending, - callbacks, - client_mrefs, + clients, timer_ref }). +-record(cstate, + { pid, + callback, + opened, + obtained, + blocked, + pending_closes + }). + +-record(pending, + { kind, + pid, + requested, + from + }). + %%---------------------------------------------------------------------------- %% Specs %%---------------------------------------------------------------------------- @@ -222,7 +238,10 @@ -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(delete/1 :: (ref()) -> ok_or_error()). -spec(clear/1 :: (ref()) -> ok_or_error()). --spec(obtain/1 :: (pid()) -> 'ok'). +-spec(obtain/0 :: () -> 'ok'). +-spec(transfer/1 :: (pid()) -> 'ok'). +-spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). +-spec(get_limit/0 :: () -> non_neg_integer()). -endif. @@ -249,9 +268,9 @@ open(Path, Mode, Options) -> IsWriter = is_writer(Mode1), case IsWriter andalso HasWriter of true -> {error, writer_exists}; - false -> Ref = make_ref(), - case open1(Path1, Mode1, Options, Ref, bof, new) of - {ok, _Handle} -> + false -> {ok, Ref} = new_closed_handle(Path1, Mode1, Options), + case get_or_reopen([{Ref, new}]) of + {ok, [_Handle1]} -> RCount1 = case is_reader(Mode1) of true -> RCount + 1; false -> RCount @@ -262,6 +281,7 @@ open(Path, Mode, Options) -> has_writer = HasWriter1 }), {ok, Ref}; Error -> + erase({Ref, fhc_handle}), Error end end. @@ -432,8 +452,8 @@ set_maximum_since_use(MaximumAge) -> case lists:foldl( fun ({{Ref, fhc_handle}, Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) -> - Age = timer:now_diff(Now, Then), - case Hdl =/= closed andalso Age >= MaximumAge of + case Hdl =/= closed andalso + timer:now_diff(Now, Then) >= MaximumAge of true -> soft_close(Ref, Handle) orelse Rep; false -> Rep end; @@ -444,8 +464,17 @@ set_maximum_since_use(MaximumAge) -> true -> ok end. -obtain(Pid) -> - gen_server:call(?SERVER, {obtain, Pid}, infinity). +obtain() -> + gen_server:call(?SERVER, {obtain, self()}, infinity). + +transfer(Pid) -> + gen_server:cast(?SERVER, {transfer, self(), Pid}). + +set_limit(Limit) -> + gen_server:call(?SERVER, {set_limit, Limit}, infinity). + +get_limit() -> + gen_server:call(?SERVER, get_limit, infinity). %%---------------------------------------------------------------------------- %% Internal functions @@ -462,18 +491,9 @@ append_to_write(Mode) -> end. with_handles(Refs, Fun) -> - ResHandles = lists:foldl( - fun (Ref, {ok, HandlesAcc}) -> - case get_or_reopen(Ref) of - {ok, Handle} -> {ok, [Handle | HandlesAcc]}; - Error -> Error - end; - (_Ref, Error) -> - Error - end, {ok, []}, Refs), - case ResHandles of + case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of {ok, Handles} -> - case Fun(lists:reverse(Handles)) of + case Fun(Handles) of {Result, Handles1} when is_list(Handles1) -> lists:zipwith(fun put_handle/2, Refs, Handles1), Result; @@ -502,17 +522,80 @@ with_flushed_handles(Refs, Fun) -> end end). -get_or_reopen(Ref) -> - case get({Ref, fhc_handle}) of - undefined -> - {error, not_open, Ref}; - #handle { hdl = closed, offset = Offset, - path = Path, mode = Mode, options = Options } -> - open1(Path, Mode, Options, Ref, Offset, reopen); - Handle -> - {ok, Handle} +get_or_reopen(RefNewOrReopens) -> + case partition_handles(RefNewOrReopens) of + {OpenHdls, []} -> + {ok, [Handle || {_Ref, Handle} <- OpenHdls]}; + {OpenHdls, ClosedHdls} -> + Oldest = oldest(get_age_tree(), fun () -> now() end), + case gen_server:call(?SERVER, {open, self(), length(ClosedHdls), + Oldest}, infinity) of + ok -> + case reopen(ClosedHdls) of + {ok, RefHdls} -> sort_handles(RefNewOrReopens, + OpenHdls, RefHdls, []); + Error -> Error + end; + close -> + [soft_close(Ref, Handle) || + {{Ref, fhc_handle}, Handle = #handle { hdl = Hdl }} <- + get(), + Hdl =/= closed], + get_or_reopen(RefNewOrReopens) + end + end. + +reopen(ClosedHdls) -> reopen(ClosedHdls, get_age_tree(), []). + +reopen([], Tree, RefHdls) -> + put_age_tree(Tree), + {ok, lists:reverse(RefHdls)}; +reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, + path = Path, + mode = Mode, + offset = Offset, + last_used_at = undefined }} | + RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) -> + case file:open(Path, case NewOrReopen of + new -> Mode; + reopen -> [read | Mode] + end) of + {ok, Hdl} -> + Now = now(), + {{ok, Offset1}, Handle1} = + maybe_seek(Offset, Handle #handle { hdl = Hdl, + offset = 0, + last_used_at = Now }), + Handle2 = Handle1 #handle { trusted_offset = Offset1 }, + put({Ref, fhc_handle}, Handle2), + reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree), + [{Ref, Handle2} | RefHdls]); + Error -> + %% NB: none of the handles in ToOpen are in the age tree + Oldest = oldest(Tree, fun () -> undefined end), + [gen_server:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen], + put_age_tree(Tree), + Error end. +partition_handles(RefNewOrReopens) -> + lists:foldr( + fun ({Ref, NewOrReopen}, {Open, Closed}) -> + case get({Ref, fhc_handle}) of + #handle { hdl = closed } = Handle -> + {Open, [{Ref, NewOrReopen, Handle} | Closed]}; + #handle {} = Handle -> + {[{Ref, Handle} | Open], Closed} + end + end, {[], []}, RefNewOrReopens). + +sort_handles([], [], [], Acc) -> + {ok, lists:reverse(Acc)}; +sort_handles([{Ref, _} | RefHdls], [{Ref, Handle} | RefHdlsA], RefHdlsB, Acc) -> + sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]); +sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) -> + sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]). + put_handle(Ref, Handle = #handle { last_used_at = Then }) -> Now = now(), age_tree_update(Then, Now, Ref), @@ -528,21 +611,6 @@ get_age_tree() -> put_age_tree(Tree) -> put(fhc_age_tree, Tree). -age_tree_insert(Now, Ref) -> - Tree = get_age_tree(), - Tree1 = gb_trees:insert(Now, Ref, Tree), - {Oldest, _Ref} = gb_trees:smallest(Tree1), - case gen_server:call(?SERVER, {open, self(), Oldest, - not gb_trees:is_empty(Tree)}, infinity) of - ok -> - put_age_tree(Tree1); - close -> - [soft_close(Ref1, Handle1) || - {{Ref1, fhc_handle}, Handle1 = #handle { hdl = Hdl1 }} <- get(), - Hdl1 =/= closed], - age_tree_insert(Now, Ref) - end. - age_tree_update(Then, Now, Ref) -> with_age_tree( fun (Tree) -> @@ -553,13 +621,7 @@ age_tree_delete(Then) -> with_age_tree( fun (Tree) -> Tree1 = gb_trees:delete_any(Then, Tree), - Oldest = case gb_trees:is_empty(Tree1) of - true -> - undefined; - false -> - {Oldest1, _Ref} = gb_trees:smallest(Tree1), - Oldest1 - end, + Oldest = oldest(Tree1, fun () -> undefined end), gen_server:cast(?SERVER, {close, self(), Oldest}), Tree1 end). @@ -575,44 +637,37 @@ age_tree_change() -> Tree end). -open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> - Mode1 = case NewOrReopen of - new -> Mode; - reopen -> [read | Mode] - end, - Now = now(), - age_tree_insert(Now, Ref), - case file:open(Path, Mode1) of - {ok, Hdl} -> - WriteBufferSize = - case proplists:get_value(write_buffer, Options, unbuffered) of - unbuffered -> 0; - infinity -> infinity; - N when is_integer(N) -> N - end, - Handle = #handle { hdl = Hdl, - offset = 0, - trusted_offset = 0, - is_dirty = false, - write_buffer_size = 0, - write_buffer_size_limit = WriteBufferSize, - write_buffer = [], - at_eof = false, - path = Path, - mode = Mode, - options = Options, - is_write = is_writer(Mode), - is_read = is_reader(Mode), - last_used_at = Now }, - {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle), - Handle2 = Handle1 #handle { trusted_offset = Offset1 }, - put({Ref, fhc_handle}, Handle2), - {ok, Handle2}; - {error, Reason} -> - age_tree_delete(Now), - {error, Reason} +oldest(Tree, DefaultFun) -> + case gb_trees:is_empty(Tree) of + true -> DefaultFun(); + false -> {Oldest, _Ref} = gb_trees:smallest(Tree), + Oldest end. +new_closed_handle(Path, Mode, Options) -> + WriteBufferSize = + case proplists:get_value(write_buffer, Options, unbuffered) of + unbuffered -> 0; + infinity -> infinity; + N when is_integer(N) -> N + end, + Ref = make_ref(), + put({Ref, fhc_handle}, #handle { hdl = closed, + offset = 0, + trusted_offset = 0, + is_dirty = false, + write_buffer_size = 0, + write_buffer_size_limit = WriteBufferSize, + write_buffer = [], + at_eof = false, + path = Path, + mode = Mode, + options = Options, + is_write = is_writer(Mode), + is_read = is_reader(Mode), + last_used_at = undefined }), + {ok, Ref}. + soft_close(Ref, Handle) -> {Res, Handle1} = soft_close(Handle), case Res of @@ -626,7 +681,9 @@ soft_close(Handle = #handle { hdl = closed }) -> {ok, Handle}; soft_close(Handle) -> case write_buffer(Handle) of - {ok, #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty, + {ok, #handle { hdl = Hdl, + offset = Offset, + is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of true -> file:sync(Hdl); @@ -634,8 +691,10 @@ soft_close(Handle) -> end, ok = file:close(Hdl), age_tree_delete(Then), - {ok, Handle1 #handle { hdl = closed, trusted_offset = Offset, - is_dirty = false }}; + {ok, Handle1 #handle { hdl = closed, + trusted_offset = Offset, + is_dirty = false, + last_used_at = undefined }}; {_Error, _Handle} = Result -> Result end. @@ -724,127 +783,206 @@ init([]) -> _ -> ulimit() end, - ObtainLimit = case Limit of - infinity -> infinity; - _ -> ?OBTAIN_LIMIT(Limit) - end, + ObtainLimit = obtain_limit(Limit), error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", [Limit, ObtainLimit]), + Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]), {ok, #fhc_state { elders = dict:new(), limit = Limit, open_count = 0, - open_pending = [], + open_pending = pending_new(), obtain_limit = ObtainLimit, obtain_count = 0, - obtain_pending = [], - callbacks = dict:new(), - client_mrefs = dict:new(), + obtain_pending = pending_new(), + clients = Clients, timer_ref = undefined }}. +handle_call({open, Pid, Requested, EldestUnusedSince}, From, + State = #fhc_state { open_count = Count, + open_pending = Pending, + elders = Elders, + clients = Clients }) + when EldestUnusedSince =/= undefined -> + Elders1 = dict:store(Pid, EldestUnusedSince, Elders), + Item = #pending { kind = open, + pid = Pid, + requested = Requested, + from = From }, + ok = track_client(Pid, Clients), + State1 = State #fhc_state { elders = Elders1 }, + case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of + true -> case ets:lookup(Clients, Pid) of + [#cstate { opened = 0 }] -> + true = ets:update_element( + Clients, Pid, {#cstate.blocked, true}), + {noreply, + reduce(State1 #fhc_state { + open_pending = pending_in(Item, Pending) })}; + [#cstate { opened = Opened }] -> + true = ets:update_element( + Clients, Pid, + {#cstate.pending_closes, Opened}), + {reply, close, State1} + end; + false -> {noreply, run_pending_item(Item, State1)} + end; + handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, obtain_count = Count, obtain_pending = Pending, - elders = Elders }) + clients = Clients }) when Limit =/= infinity andalso Count >= Limit -> - {noreply, - State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending], - elders = dict:erase(Pid, Elders) }}; + ok = track_client(Pid, Clients), + true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }}; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, - elders = Elders }) -> - case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of - {true, State1} -> - {noreply, State1 #fhc_state { - obtain_count = Count, - obtain_pending = [{obtain, Pid, From} | Pending], - elders = dict:erase(Pid, Elders) }}; - {false, State1} -> - _MRef = erlang:monitor(process, Pid), - {reply, ok, State1} + clients = Clients }) -> + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + ok = track_client(Pid, Clients), + case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of + true -> + true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), + {noreply, reduce(State #fhc_state { + obtain_pending = pending_in(Item, Pending) })}; + false -> + {noreply, run_pending_item(Item, State)} end; - -handle_call({open, Pid, EldestUnusedSince, CanClose}, From, - State = #fhc_state { open_count = Count, - open_pending = Pending, - elders = Elders }) -> - Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - case maybe_reduce( - ensure_mref(Pid, State #fhc_state { open_count = Count + 1, - elders = Elders1 })) of - {true, State1} -> - State2 = State1 #fhc_state { open_count = Count }, - case CanClose of - true -> {reply, close, State2}; - false -> {noreply, State2 #fhc_state { - open_pending = [{open, From} | Pending], - elders = dict:erase(Pid, Elders1) }} - end; - {false, State1} -> - {reply, ok, State1} - end. +handle_call({set_limit, Limit}, _From, State) -> + {reply, ok, maybe_reduce( + process_pending(State #fhc_state { + limit = Limit, + obtain_limit = obtain_limit(Limit) }))}; +handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) -> + {reply, Limit, State}. handle_cast({register_callback, Pid, MFA}, - State = #fhc_state { callbacks = Callbacks }) -> - {noreply, ensure_mref( - Pid, State #fhc_state { - callbacks = dict:store(Pid, MFA, Callbacks) })}; - -handle_cast({update, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders }) -> + State = #fhc_state { clients = Clients }) -> + ok = track_client(Pid, Clients), + true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}), + {noreply, State}; + +handle_cast({update, Pid, EldestUnusedSince}, + State = #fhc_state { elders = Elders }) + when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), %% don't call maybe_reduce from here otherwise we can create a %% storm of messages - {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })}; + {noreply, State #fhc_state { elders = Elders1 }}; -handle_cast({close, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, open_count = Count }) -> +handle_cast({close, Pid, EldestUnusedSince}, + State = #fhc_state { elders = Elders, clients = Clients }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, + ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}), {noreply, process_pending( - ensure_mref(Pid, State #fhc_state { open_count = Count - 1, - elders = Elders1 }))}; + update_counts(open, Pid, -1, + State #fhc_state { elders = Elders1 }))}; -handle_cast(check_counts, State) -> - {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }), - {noreply, State1}. +handle_cast({transfer, FromPid, ToPid}, State) -> + ok = track_client(ToPid, State#fhc_state.clients), + {noreply, process_pending( + update_counts(obtain, ToPid, +1, + update_counts(obtain, FromPid, -1, State)))}; -handle_info({'DOWN', MRef, process, Pid, _Reason}, State = - #fhc_state { obtain_count = Count, callbacks = Callbacks, - client_mrefs = ClientMRefs, elders = Elders }) -> +handle_cast(check_counts, State) -> + {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}. + +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #fhc_state { elders = Elders, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_pending = ObtainPending, + clients = Clients }) -> + [#cstate { opened = Opened, obtained = Obtained }] = + ets:lookup(Clients, Pid), + true = ets:delete(Clients, Pid), + FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, {noreply, process_pending( - case dict:find(Pid, ClientMRefs) of - {ok, MRef} -> State #fhc_state { - elders = dict:erase(Pid, Elders), - client_mrefs = dict:erase(Pid, ClientMRefs), - callbacks = dict:erase(Pid, Callbacks) }; - _ -> State #fhc_state { obtain_count = Count - 1 } - end)}. - -terminate(_Reason, State) -> + State #fhc_state { + open_count = OpenCount - Opened, + open_pending = filter_pending(FilterFun, OpenPending), + obtain_count = ObtainCount - Obtained, + obtain_pending = filter_pending(FilterFun, ObtainPending), + elders = dict:erase(Pid, Elders) })}. + +terminate(_Reason, State = #fhc_state { clients = Clients }) -> + ets:delete(Clients), State. code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- +%% pending queue abstraction helpers +%%---------------------------------------------------------------------------- + +queue_fold(Fun, Init, Q) -> + case queue:out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) + end. + +filter_pending(Fun, {Count, Queue}) -> + {Delta, Queue1} = + queue_fold(fun (Item, {DeltaN, QueueN}) -> + case Fun(Item) of + true -> {DeltaN, queue:in(Item, QueueN)}; + false -> {DeltaN - requested(Item), QueueN} + end + end, {0, queue:new()}, Queue), + {Count + Delta, Queue1}. + +pending_new() -> + {0, queue:new()}. + +pending_in(Item = #pending { requested = Requested }, {Count, Queue}) -> + {Count + Requested, queue:in(Item, Queue)}. + +pending_out({0, _Queue} = Pending) -> + {empty, Pending}; +pending_out({N, Queue}) -> + {{value, #pending { requested = Requested }} = Result, Queue1} = + queue:out(Queue), + {Result, {N - Requested, Queue1}}. + +pending_count({Count, _Queue}) -> + Count. + +pending_is_empty({0, _Queue}) -> + true; +pending_is_empty({_N, _Queue}) -> + false. + +%%---------------------------------------------------------------------------- %% server helpers %%---------------------------------------------------------------------------- +obtain_limit(infinity) -> infinity; +obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of + OLimit when OLimit < 0 -> 0; + OLimit -> OLimit + end. + +requested({_Kind, _Pid, Requested, _From}) -> + Requested. + process_pending(State = #fhc_state { limit = infinity }) -> State; process_pending(State) -> - process_obtain(process_open(State)). + process_open(process_obtain(State)). process_open(State = #fhc_state { limit = Limit, open_pending = Pending, open_count = OpenCount, obtain_count = ObtainCount }) -> - {Pending1, Inc} = - process_pending(Pending, Limit - (ObtainCount + OpenCount)), - State #fhc_state { open_pending = Pending1, - open_count = OpenCount + Inc }. + {Pending1, State1} = + process_pending(Pending, Limit - (ObtainCount + OpenCount), State), + State1 #fhc_state { open_pending = Pending1 }. process_obtain(State = #fhc_state { limit = Limit, obtain_pending = Pending, @@ -853,70 +991,139 @@ process_obtain(State = #fhc_state { limit = Limit, open_count = OpenCount }) -> Quota = lists:min([ObtainLimit - ObtainCount, Limit - (ObtainCount + OpenCount)]), - {Pending1, Inc} = process_pending(Pending, Quota), - State #fhc_state { obtain_pending = Pending1, - obtain_count = ObtainCount + Inc }. - -process_pending([], _Quota) -> - {[], 0}; -process_pending(Pending, Quota) when Quota =< 0 -> - {Pending, 0}; -process_pending(Pending, Quota) -> - PendingLen = length(Pending), - SatisfiableLen = lists:min([PendingLen, Quota]), - Take = PendingLen - SatisfiableLen, - {PendingNew, SatisfiableRev} = lists:split(Take, Pending), - [run_pending_item(Item) || Item <- SatisfiableRev], - {PendingNew, SatisfiableLen}. - -run_pending_item({open, From}) -> - gen_server:reply(From, ok); -run_pending_item({obtain, Pid, From}) -> - _MRef = erlang:monitor(process, Pid), - gen_server:reply(From, ok). - -maybe_reduce(State = #fhc_state { limit = Limit, - open_count = OpenCount, - open_pending = OpenPending, - obtain_count = ObtainCount, - obtain_limit = ObtainLimit, - obtain_pending = ObtainPending, - elders = Elders, - callbacks = Callbacks, - timer_ref = TRef }) - when Limit =/= infinity andalso - (((OpenCount + ObtainCount) > Limit) orelse - (OpenPending =/= []) orelse - (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) -> + {Pending1, State1} = process_pending(Pending, Quota, State), + State1 #fhc_state { obtain_pending = Pending1 }. + +process_pending(Pending, Quota, State) when Quota =< 0 -> + {Pending, State}; +process_pending(Pending, Quota, State) -> + case pending_out(Pending) of + {empty, _Pending} -> + {Pending, State}; + {{value, #pending { requested = Requested }}, _Pending1} + when Requested > Quota -> + {Pending, State}; + {{value, #pending { requested = Requested } = Item}, Pending1} -> + process_pending(Pending1, Quota - Requested, + run_pending_item(Item, State)) + end. + +run_pending_item(#pending { kind = Kind, + pid = Pid, + requested = Requested, + from = From }, + State = #fhc_state { clients = Clients }) -> + gen_server:reply(From, ok), + true = ets:update_element(Clients, Pid, {#cstate.blocked, false}), + update_counts(Kind, Pid, Requested, State). + +update_counts(Kind, Pid, Delta, + State = #fhc_state { open_count = OpenCount, + obtain_count = ObtainCount, + clients = Clients }) -> + {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients), + State #fhc_state { open_count = OpenCount + OpenDelta, + obtain_count = ObtainCount + ObtainDelta }. + +update_counts1(open, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.opened, Delta}), + {Delta, 0}; +update_counts1(obtain, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}), + {0, Delta}. + +maybe_reduce(State) -> + case needs_reduce(State) of + true -> reduce(State); + false -> State + end. + +needs_reduce(#fhc_state { limit = Limit, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_limit = ObtainLimit, + obtain_pending = ObtainPending }) -> + Limit =/= infinity + andalso ((OpenCount + ObtainCount > Limit) + orelse (not pending_is_empty(OpenPending)) + orelse (ObtainCount < ObtainLimit + andalso not pending_is_empty(ObtainPending))). + +reduce(State = #fhc_state { open_pending = OpenPending, + obtain_pending = ObtainPending, + elders = Elders, + clients = Clients, + timer_ref = TRef }) -> Now = now(), - {Pids, Sum, ClientCount} = - dict:fold(fun (_Pid, undefined, Accs) -> - Accs; - (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) -> - {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest), - CountAcc + 1} + {CStates, Sum, ClientCount} = + dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) -> + [#cstate { pending_closes = PendingCloses, + opened = Opened, + blocked = Blocked } = CState] = + ets:lookup(Clients, Pid), + case Blocked orelse PendingCloses =:= Opened of + true -> Accs; + false -> {[CState | CStatesAcc], + SumAcc + timer:now_diff(Now, Eldest), + CountAcc + 1} + end end, {[], 0, 0}, Elders), - case Pids of + case CStates of [] -> ok; - _ -> AverageAge = Sum / ClientCount, - lists:foreach( - fun (Pid) -> - case dict:find(Pid, Callbacks) of - error -> ok; - {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge]) - end - end, Pids) + _ -> case (Sum / ClientCount) - + (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of + AverageAge when AverageAge > 0 -> + notify_age(CStates, AverageAge); + _ -> + notify_age0(Clients, CStates, + pending_count(OpenPending) + + pending_count(ObtainPending)) + end end, - AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit, case TRef of undefined -> {ok, TRef1} = timer:apply_after( ?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), - {AboveLimit, State #fhc_state { timer_ref = TRef1 }}; - _ -> {AboveLimit, State} - end; -maybe_reduce(State) -> - {false, State}. + State #fhc_state { timer_ref = TRef1 }; + _ -> State + end. + +notify_age(CStates, AverageAge) -> + lists:foreach( + fun (#cstate { callback = undefined }) -> ok; + (#cstate { callback = {M, F, A} }) -> apply(M, F, A ++ [AverageAge]) + end, CStates). + +notify_age0(Clients, CStates, Required) -> + Notifications = + [CState || CState <- CStates, CState#cstate.callback =/= undefined], + {L1, L2} = lists:split(random:uniform(length(Notifications)), + Notifications), + notify(Clients, Required, L2 ++ L1). + +notify(_Clients, _Required, []) -> + ok; +notify(_Clients, Required, _Notifications) when Required =< 0 -> + ok; +notify(Clients, Required, [#cstate{ pid = Pid, + callback = {M, F, A}, + opened = Opened } | Notifications]) -> + apply(M, F, A ++ [0]), + ets:update_element(Clients, Pid, {#cstate.pending_closes, Opened}), + notify(Clients, Required - Opened, Notifications). + +track_client(Pid, Clients) -> + case ets:insert_new(Clients, #cstate { pid = Pid, + callback = undefined, + opened = 0, + obtained = 0, + blocked = false, + pending_closes = 0 }) of + true -> _MRef = erlang:monitor(process, Pid), + ok; + false -> ok + end. %% For all unices, assume ulimit exists. Further googling suggests %% that BSDs (incl OS X), solaris and linux all agree that ulimit -n @@ -947,11 +1154,3 @@ ulimit() -> _ -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. - -ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) -> - case dict:find(Pid, ClientMRefs) of - {ok, _MRef} -> State; - error -> MRef = erlang:monitor(process, Pid), - State #fhc_state { - client_mrefs = dict:store(Pid, MRef, ClientMRefs) } - end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2453280e34..0cdb4fff08 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -197,7 +197,8 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], - [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. + [Q || Q <- Qs, + gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == Q]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d52660c5ac..2cab7136a6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -603,6 +603,7 @@ handle_call({init, Recover}, From, declare(Recover, From, State); _ -> #q{q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined} = State, + gen_server2:reply(From, not_found), case Recover of true -> ok; _ -> rabbit_log:warning( @@ -610,7 +611,7 @@ handle_call({init, Recover}, From, end, BQS = BQ:init(QName, IsDurable, Recover), %% Rely on terminate to delete the queue. - {stop, normal, not_found, State#q{backing_queue_state = BQS}} + {stop, normal, State#q{backing_queue_state = BQS}} end; handle_call(info, _From, State) -> diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 89b2441e38..e796acf327 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -67,7 +67,7 @@ publish(#exchange{name = Name}, Delivery = Delivery). split_topic_key(Key) -> - re:split(Key, "\\.", [{return, list}]). + string:tokens(binary_to_list(Key), "."). topic_matches(PatternKey, RoutingKey) -> P = split_topic_key(PatternKey), diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 61ef5efb65..a9945af1d4 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -62,24 +62,22 @@ start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. - {ok, proc_lib:spawn_link( - fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, - fun () -> - catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), - continue - end}, {0, 0}) - end)}. + heartbeater( + {Sock, TimeoutSec * 1000 div 2, send_oct, 0, + fun () -> + catch rabbit_net:send( + Sock, rabbit_binary_generator:build_heartbeat_frame()), + continue + end}). start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. - {ok, proc_lib:spawn_link( - fun () -> heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, - fun () -> - Parent ! timeout, - stop - end}, {0, 0}) end)}. + heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> + Parent ! timeout, + stop + end}). pause_monitor(none) -> ok; @@ -95,6 +93,9 @@ resume_monitor({_Sender, Receiver}) -> %%---------------------------------------------------------------------------- +heartbeater(Params) -> + {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. + heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, {StatVal, SameCount}) -> Recurse = fun (V) -> heartbeater(Params, V) end, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 5bc1f9d5e4..6576bfbbfb 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -317,7 +317,7 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> write(Server, Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, Guid, Msg}), CState}. + {gen_server2:cast(Server, {write, Guid}), CState}. read(Server, Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, @@ -362,7 +362,7 @@ set_maximum_since_use(Server, Age) -> client_init(Server, Ref) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref}, infinity), + gen_server2:pcall(Server, 7, {new_client_state, Ref}, infinity), #client_msstate { file_handle_cache = dict:new(), index_state = IState, index_module = IModule, @@ -382,7 +382,7 @@ client_delete_and_terminate(CState, Server, Ref) -> ok = gen_server2:call(Server, {delete_client, Ref}, infinity). successfully_recovered_state(Server) -> - gen_server2:call(Server, successfully_recovered_state, infinity). + gen_server2:pcall(Server, 7, successfully_recovered_state, infinity). %%---------------------------------------------------------------------------- %% Client-side-only helpers @@ -611,7 +611,7 @@ handle_call({delete_client, CRef}, _From, reply(ok, State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). -handle_cast({write, Guid, Msg}, +handle_cast({write, Guid}, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, sum_valid_data = SumValid, @@ -619,6 +619,7 @@ handle_cast({write, Guid, Msg}, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), + [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), case index_lookup(Guid, State) of not_found -> %% New message, lots to do diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index c9f75be030..0f5ed286b3 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -131,7 +131,9 @@ start() -> error -> error("failed to compile boot script file ~s", [ScriptFile]) end, io:format("~n~w plugins activated:~n", [length(PluginApps)]), - [io:format("* ~w~n", [App]) || App <- PluginApps], + [io:format("* ~s-~s~n", [App, Vsn]) || + App <- PluginApps, + {App, Vsn} <- [proplists:lookup(App, AppVersions)]], io:nl(), halt(), ok. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 6efbeab610..3c32fc9e3b 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -799,9 +799,10 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - State1 = State#v1{connection_state = running, - connection = NewConnection}, + State1 = internal_conserve_memory( + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + State#v1{connection_state = running, + connection = NewConnection}), rabbit_event:notify( connection_created, [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a71e49f830..74fae2640b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -55,6 +55,8 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> application:set_env(rabbit, file_handles_high_watermark, 10, infinity), + ok = file_handle_cache:set_limit(10), + passed = test_file_handle_cache(), passed = test_backing_queue(), passed = test_priority_queue(), passed = test_bpqueue(), @@ -1418,6 +1420,32 @@ extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) -> test_supervisor_delayed_restart() -> test_sup:test_supervisor_delayed_restart(). +test_file_handle_cache() -> + %% test copying when there is just one spare handle + Limit = file_handle_cache:get_limit(), + ok = file_handle_cache:set_limit(5), %% 1 or 2 sockets, 2 msg_stores + TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"), + ok = filelib:ensure_dir(filename:join(TmpDir, "nothing")), + Pid = spawn(fun () -> {ok, Hdl} = file_handle_cache:open( + filename:join(TmpDir, "file3"), + [write], []), + receive close -> ok end, + file_handle_cache:delete(Hdl) + end), + Src = filename:join(TmpDir, "file1"), + Dst = filename:join(TmpDir, "file2"), + Content = <<"foo">>, + ok = file:write_file(Src, Content), + {ok, SrcHdl} = file_handle_cache:open(Src, [read], []), + {ok, DstHdl} = file_handle_cache:open(Dst, [write], []), + Size = size(Content), + {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size), + ok = file_handle_cache:delete(SrcHdl), + file_handle_cache:delete(DstHdl), + Pid ! close, + ok = file_handle_cache:set_limit(Limit), + passed. + test_backing_queue() -> case application:get_env(rabbit, backing_queue_module) of {ok, rabbit_variable_queue} -> diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 11ce6fc532..c9809ace61 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -55,7 +55,7 @@ handle_call(_Request, _From, State) -> {noreply, State}. handle_cast(accept, State) -> - ok = file_handle_cache:obtain(self()), + ok = file_handle_cache:obtain(), accept(State); handle_cast(_Msg, State) -> @@ -84,7 +84,8 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, %% is drained. gen_event:which_handlers(error_logger), %% handle - file_handle_cache:obtain(apply(M, F, A ++ [Sock])) + file_handle_cache:transfer(apply(M, F, A ++ [Sock])), + ok = file_handle_cache:obtain() catch {inet_error, Reason} -> gen_tcp:close(Sock), error_logger:error_msg("unable to accept TCP connection: ~p~n", @@ -93,11 +94,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, %% accept more accept(State); + handle_info({inet_async, LSock, Ref, {error, closed}}, State=#state{sock=LSock, ref=Ref}) -> %% It would be wrong to attempt to restart the acceptor when we %% know this will fail. {stop, normal, State}; + handle_info(_Info, State) -> {noreply, State}. |
