author     Matthew Sackman <matthew@rabbitmq.com>  2010-08-25 17:13:38 +0100
committer  Matthew Sackman <matthew@rabbitmq.com>  2010-08-25 17:13:38 +0100
commit     b48837f70e2f0bffabbdc092a7dd4cbdb7f6dec1 (patch)
tree       642cbc4daa8c78a11439513758d0ba4f7acf787d /src
parent     3f3514008f98a2e1298695ca646334c1c98b2baa (diff)
parent     f2e772f4a442170cece366912fda5ce153328263 (diff)
download   rabbitmq-server-git-b48837f70e2f0bffabbdc092a7dd4cbdb7f6dec1.tar.gz
Merging default into bug 15930 and minor debitrot (heartbeater)
Diffstat (limited to 'src')
-rw-r--r--  src/file_handle_cache.erl           675
-rw-r--r--  src/rabbit_amqqueue.erl               3
-rw-r--r--  src/rabbit_amqqueue_process.erl       3
-rw-r--r--  src/rabbit_exchange_type_topic.erl    2
-rw-r--r--  src/rabbit_heartbeat.erl             27
-rw-r--r--  src/rabbit_msg_store.erl              9
-rw-r--r--  src/rabbit_plugin_activator.erl       4
-rw-r--r--  src/rabbit_reader.erl                 7
-rw-r--r--  src/rabbit_tests.erl                 28
-rw-r--r--  src/tcp_acceptor.erl                  7
10 files changed, 501 insertions, 264 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index fe4bdc0392..f83fa0bc11 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -116,13 +116,13 @@
%% do not need to worry about their handles being closed by the server
%% - reopening them when necessary is handled transparently.
%%
-%% The server also supports obtain and release_on_death. obtain/0
-%% blocks until a file descriptor is available. release_on_death/1
-%% takes a pid and monitors the pid, reducing the count by 1 when the
-%% pid dies. Thus the assumption is that obtain/0 is called first, and
-%% when that returns, release_on_death/1 is called with the pid who
-%% "owns" the file descriptor. This is, for example, used to track the
-%% use of file descriptors through network sockets.
+%% The server also supports obtain and transfer. obtain/0 blocks until
+%% a file descriptor is available. transfer/1 transfers ownership
+%% of a file descriptor between processes. It is non-blocking.
+%%
+%% The callers of register_callback/3, obtain/0, and the argument of
+%% transfer/1 are monitored, reducing the count of handles in use
+%% appropriately when the processes terminate.
-behaviour(gen_server).
@@ -130,7 +130,7 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/1]).
+-export([obtain/0, transfer/1, set_limit/1, get_limit/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -148,7 +148,8 @@
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
--define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)).
+-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)).
+-define(CLIENT_ETS_TABLE, ?MODULE).
%%----------------------------------------------------------------------------
@@ -182,11 +183,26 @@
obtain_limit,
obtain_count,
obtain_pending,
- callbacks,
- client_mrefs,
+ clients,
timer_ref
}).
+-record(cstate,
+ { pid,
+ callback,
+ opened,
+ obtained,
+ blocked,
+ pending_closes
+ }).
+
+-record(pending,
+ { kind,
+ pid,
+ requested,
+ from
+ }).
+
%%----------------------------------------------------------------------------
%% Specs
%%----------------------------------------------------------------------------
@@ -222,7 +238,10 @@
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
--spec(obtain/1 :: (pid()) -> 'ok').
+-spec(obtain/0 :: () -> 'ok').
+-spec(transfer/1 :: (pid()) -> 'ok').
+-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
+-spec(get_limit/0 :: () -> non_neg_integer()).
-endif.
@@ -249,9 +268,9 @@ open(Path, Mode, Options) ->
IsWriter = is_writer(Mode1),
case IsWriter andalso HasWriter of
true -> {error, writer_exists};
- false -> Ref = make_ref(),
- case open1(Path1, Mode1, Options, Ref, bof, new) of
- {ok, _Handle} ->
+ false -> {ok, Ref} = new_closed_handle(Path1, Mode1, Options),
+ case get_or_reopen([{Ref, new}]) of
+ {ok, [_Handle1]} ->
RCount1 = case is_reader(Mode1) of
true -> RCount + 1;
false -> RCount
@@ -262,6 +281,7 @@ open(Path, Mode, Options) ->
has_writer = HasWriter1 }),
{ok, Ref};
Error ->
+ erase({Ref, fhc_handle}),
Error
end
end.
@@ -432,8 +452,8 @@ set_maximum_since_use(MaximumAge) ->
case lists:foldl(
fun ({{Ref, fhc_handle},
Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
- Age = timer:now_diff(Now, Then),
- case Hdl =/= closed andalso Age >= MaximumAge of
+ case Hdl =/= closed andalso
+ timer:now_diff(Now, Then) >= MaximumAge of
true -> soft_close(Ref, Handle) orelse Rep;
false -> Rep
end;
@@ -444,8 +464,17 @@ set_maximum_since_use(MaximumAge) ->
true -> ok
end.
-obtain(Pid) ->
- gen_server:call(?SERVER, {obtain, Pid}, infinity).
+obtain() ->
+ gen_server:call(?SERVER, {obtain, self()}, infinity).
+
+transfer(Pid) ->
+ gen_server:cast(?SERVER, {transfer, self(), Pid}).
+
+set_limit(Limit) ->
+ gen_server:call(?SERVER, {set_limit, Limit}, infinity).
+
+get_limit() ->
+ gen_server:call(?SERVER, get_limit, infinity).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -462,18 +491,9 @@ append_to_write(Mode) ->
end.
with_handles(Refs, Fun) ->
- ResHandles = lists:foldl(
- fun (Ref, {ok, HandlesAcc}) ->
- case get_or_reopen(Ref) of
- {ok, Handle} -> {ok, [Handle | HandlesAcc]};
- Error -> Error
- end;
- (_Ref, Error) ->
- Error
- end, {ok, []}, Refs),
- case ResHandles of
+ case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of
{ok, Handles} ->
- case Fun(lists:reverse(Handles)) of
+ case Fun(Handles) of
{Result, Handles1} when is_list(Handles1) ->
lists:zipwith(fun put_handle/2, Refs, Handles1),
Result;
@@ -502,17 +522,80 @@ with_flushed_handles(Refs, Fun) ->
end
end).
-get_or_reopen(Ref) ->
- case get({Ref, fhc_handle}) of
- undefined ->
- {error, not_open, Ref};
- #handle { hdl = closed, offset = Offset,
- path = Path, mode = Mode, options = Options } ->
- open1(Path, Mode, Options, Ref, Offset, reopen);
- Handle ->
- {ok, Handle}
+get_or_reopen(RefNewOrReopens) ->
+ case partition_handles(RefNewOrReopens) of
+ {OpenHdls, []} ->
+ {ok, [Handle || {_Ref, Handle} <- OpenHdls]};
+ {OpenHdls, ClosedHdls} ->
+ Oldest = oldest(get_age_tree(), fun () -> now() end),
+ case gen_server:call(?SERVER, {open, self(), length(ClosedHdls),
+ Oldest}, infinity) of
+ ok ->
+ case reopen(ClosedHdls) of
+ {ok, RefHdls} -> sort_handles(RefNewOrReopens,
+ OpenHdls, RefHdls, []);
+ Error -> Error
+ end;
+ close ->
+ [soft_close(Ref, Handle) ||
+ {{Ref, fhc_handle}, Handle = #handle { hdl = Hdl }} <-
+ get(),
+ Hdl =/= closed],
+ get_or_reopen(RefNewOrReopens)
+ end
+ end.
+
+reopen(ClosedHdls) -> reopen(ClosedHdls, get_age_tree(), []).
+
+reopen([], Tree, RefHdls) ->
+ put_age_tree(Tree),
+ {ok, lists:reverse(RefHdls)};
+reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
+ path = Path,
+ mode = Mode,
+ offset = Offset,
+ last_used_at = undefined }} |
+ RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) ->
+ case file:open(Path, case NewOrReopen of
+ new -> Mode;
+ reopen -> [read | Mode]
+ end) of
+ {ok, Hdl} ->
+ Now = now(),
+ {{ok, Offset1}, Handle1} =
+ maybe_seek(Offset, Handle #handle { hdl = Hdl,
+ offset = 0,
+ last_used_at = Now }),
+ Handle2 = Handle1 #handle { trusted_offset = Offset1 },
+ put({Ref, fhc_handle}, Handle2),
+ reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree),
+ [{Ref, Handle2} | RefHdls]);
+ Error ->
+ %% NB: none of the handles in ToOpen are in the age tree
+ Oldest = oldest(Tree, fun () -> undefined end),
+ [gen_server:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen],
+ put_age_tree(Tree),
+ Error
end.
+partition_handles(RefNewOrReopens) ->
+ lists:foldr(
+ fun ({Ref, NewOrReopen}, {Open, Closed}) ->
+ case get({Ref, fhc_handle}) of
+ #handle { hdl = closed } = Handle ->
+ {Open, [{Ref, NewOrReopen, Handle} | Closed]};
+ #handle {} = Handle ->
+ {[{Ref, Handle} | Open], Closed}
+ end
+ end, {[], []}, RefNewOrReopens).
+
+sort_handles([], [], [], Acc) ->
+ {ok, lists:reverse(Acc)};
+sort_handles([{Ref, _} | RefHdls], [{Ref, Handle} | RefHdlsA], RefHdlsB, Acc) ->
+ sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]);
+sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) ->
+ sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]).
+
put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
Now = now(),
age_tree_update(Then, Now, Ref),
@@ -528,21 +611,6 @@ get_age_tree() ->
put_age_tree(Tree) -> put(fhc_age_tree, Tree).
-age_tree_insert(Now, Ref) ->
- Tree = get_age_tree(),
- Tree1 = gb_trees:insert(Now, Ref, Tree),
- {Oldest, _Ref} = gb_trees:smallest(Tree1),
- case gen_server:call(?SERVER, {open, self(), Oldest,
- not gb_trees:is_empty(Tree)}, infinity) of
- ok ->
- put_age_tree(Tree1);
- close ->
- [soft_close(Ref1, Handle1) ||
- {{Ref1, fhc_handle}, Handle1 = #handle { hdl = Hdl1 }} <- get(),
- Hdl1 =/= closed],
- age_tree_insert(Now, Ref)
- end.
-
age_tree_update(Then, Now, Ref) ->
with_age_tree(
fun (Tree) ->
@@ -553,13 +621,7 @@ age_tree_delete(Then) ->
with_age_tree(
fun (Tree) ->
Tree1 = gb_trees:delete_any(Then, Tree),
- Oldest = case gb_trees:is_empty(Tree1) of
- true ->
- undefined;
- false ->
- {Oldest1, _Ref} = gb_trees:smallest(Tree1),
- Oldest1
- end,
+ Oldest = oldest(Tree1, fun () -> undefined end),
gen_server:cast(?SERVER, {close, self(), Oldest}),
Tree1
end).
@@ -575,44 +637,37 @@ age_tree_change() ->
Tree
end).
-open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
- Mode1 = case NewOrReopen of
- new -> Mode;
- reopen -> [read | Mode]
- end,
- Now = now(),
- age_tree_insert(Now, Ref),
- case file:open(Path, Mode1) of
- {ok, Hdl} ->
- WriteBufferSize =
- case proplists:get_value(write_buffer, Options, unbuffered) of
- unbuffered -> 0;
- infinity -> infinity;
- N when is_integer(N) -> N
- end,
- Handle = #handle { hdl = Hdl,
- offset = 0,
- trusted_offset = 0,
- is_dirty = false,
- write_buffer_size = 0,
- write_buffer_size_limit = WriteBufferSize,
- write_buffer = [],
- at_eof = false,
- path = Path,
- mode = Mode,
- options = Options,
- is_write = is_writer(Mode),
- is_read = is_reader(Mode),
- last_used_at = Now },
- {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle),
- Handle2 = Handle1 #handle { trusted_offset = Offset1 },
- put({Ref, fhc_handle}, Handle2),
- {ok, Handle2};
- {error, Reason} ->
- age_tree_delete(Now),
- {error, Reason}
+oldest(Tree, DefaultFun) ->
+ case gb_trees:is_empty(Tree) of
+ true -> DefaultFun();
+ false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
+ Oldest
end.
+new_closed_handle(Path, Mode, Options) ->
+ WriteBufferSize =
+ case proplists:get_value(write_buffer, Options, unbuffered) of
+ unbuffered -> 0;
+ infinity -> infinity;
+ N when is_integer(N) -> N
+ end,
+ Ref = make_ref(),
+ put({Ref, fhc_handle}, #handle { hdl = closed,
+ offset = 0,
+ trusted_offset = 0,
+ is_dirty = false,
+ write_buffer_size = 0,
+ write_buffer_size_limit = WriteBufferSize,
+ write_buffer = [],
+ at_eof = false,
+ path = Path,
+ mode = Mode,
+ options = Options,
+ is_write = is_writer(Mode),
+ is_read = is_reader(Mode),
+ last_used_at = undefined }),
+ {ok, Ref}.
+
soft_close(Ref, Handle) ->
{Res, Handle1} = soft_close(Handle),
case Res of
@@ -626,7 +681,9 @@ soft_close(Handle = #handle { hdl = closed }) ->
{ok, Handle};
soft_close(Handle) ->
case write_buffer(Handle) of
- {ok, #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty,
+ {ok, #handle { hdl = Hdl,
+ offset = Offset,
+ is_dirty = IsDirty,
last_used_at = Then } = Handle1 } ->
ok = case IsDirty of
true -> file:sync(Hdl);
@@ -634,8 +691,10 @@ soft_close(Handle) ->
end,
ok = file:close(Hdl),
age_tree_delete(Then),
- {ok, Handle1 #handle { hdl = closed, trusted_offset = Offset,
- is_dirty = false }};
+ {ok, Handle1 #handle { hdl = closed,
+ trusted_offset = Offset,
+ is_dirty = false,
+ last_used_at = undefined }};
{_Error, _Handle} = Result ->
Result
end.
@@ -724,127 +783,206 @@ init([]) ->
_ ->
ulimit()
end,
- ObtainLimit = case Limit of
- infinity -> infinity;
- _ -> ?OBTAIN_LIMIT(Limit)
- end,
+ ObtainLimit = obtain_limit(Limit),
error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
[Limit, ObtainLimit]),
+ Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]),
{ok, #fhc_state { elders = dict:new(),
limit = Limit,
open_count = 0,
- open_pending = [],
+ open_pending = pending_new(),
obtain_limit = ObtainLimit,
obtain_count = 0,
- obtain_pending = [],
- callbacks = dict:new(),
- client_mrefs = dict:new(),
+ obtain_pending = pending_new(),
+ clients = Clients,
timer_ref = undefined }}.
+handle_call({open, Pid, Requested, EldestUnusedSince}, From,
+ State = #fhc_state { open_count = Count,
+ open_pending = Pending,
+ elders = Elders,
+ clients = Clients })
+ when EldestUnusedSince =/= undefined ->
+ Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ Item = #pending { kind = open,
+ pid = Pid,
+ requested = Requested,
+ from = From },
+ ok = track_client(Pid, Clients),
+ State1 = State #fhc_state { elders = Elders1 },
+ case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of
+ true -> case ets:lookup(Clients, Pid) of
+ [#cstate { opened = 0 }] ->
+ true = ets:update_element(
+ Clients, Pid, {#cstate.blocked, true}),
+ {noreply,
+ reduce(State1 #fhc_state {
+ open_pending = pending_in(Item, Pending) })};
+ [#cstate { opened = Opened }] ->
+ true = ets:update_element(
+ Clients, Pid,
+ {#cstate.pending_closes, Opened}),
+ {reply, close, State1}
+ end;
+ false -> {noreply, run_pending_item(Item, State1)}
+ end;
+
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
obtain_count = Count,
obtain_pending = Pending,
- elders = Elders })
+ clients = Clients })
when Limit =/= infinity andalso Count >= Limit ->
- {noreply,
- State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending],
- elders = dict:erase(Pid, Elders) }};
+ ok = track_client(Pid, Clients),
+ true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
+ Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
+ {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }};
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
obtain_pending = Pending,
- elders = Elders }) ->
- case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of
- {true, State1} ->
- {noreply, State1 #fhc_state {
- obtain_count = Count,
- obtain_pending = [{obtain, Pid, From} | Pending],
- elders = dict:erase(Pid, Elders) }};
- {false, State1} ->
- _MRef = erlang:monitor(process, Pid),
- {reply, ok, State1}
+ clients = Clients }) ->
+ Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
+ ok = track_client(Pid, Clients),
+ case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of
+ true ->
+ true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
+ {noreply, reduce(State #fhc_state {
+ obtain_pending = pending_in(Item, Pending) })};
+ false ->
+ {noreply, run_pending_item(Item, State)}
end;
-
-handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
- State = #fhc_state { open_count = Count,
- open_pending = Pending,
- elders = Elders }) ->
- Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
- case maybe_reduce(
- ensure_mref(Pid, State #fhc_state { open_count = Count + 1,
- elders = Elders1 })) of
- {true, State1} ->
- State2 = State1 #fhc_state { open_count = Count },
- case CanClose of
- true -> {reply, close, State2};
- false -> {noreply, State2 #fhc_state {
- open_pending = [{open, From} | Pending],
- elders = dict:erase(Pid, Elders1) }}
- end;
- {false, State1} ->
- {reply, ok, State1}
- end.
+handle_call({set_limit, Limit}, _From, State) ->
+ {reply, ok, maybe_reduce(
+ process_pending(State #fhc_state {
+ limit = Limit,
+ obtain_limit = obtain_limit(Limit) }))};
+handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
+ {reply, Limit, State}.
handle_cast({register_callback, Pid, MFA},
- State = #fhc_state { callbacks = Callbacks }) ->
- {noreply, ensure_mref(
- Pid, State #fhc_state {
- callbacks = dict:store(Pid, MFA, Callbacks) })};
-
-handle_cast({update, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders }) ->
+ State = #fhc_state { clients = Clients }) ->
+ ok = track_client(Pid, Clients),
+ true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}),
+ {noreply, State};
+
+handle_cast({update, Pid, EldestUnusedSince},
+ State = #fhc_state { elders = Elders })
+ when EldestUnusedSince =/= undefined ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
- {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })};
+ {noreply, State #fhc_state { elders = Elders1 }};
-handle_cast({close, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, open_count = Count }) ->
+handle_cast({close, Pid, EldestUnusedSince},
+ State = #fhc_state { elders = Elders, clients = Clients }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
+ ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
{noreply, process_pending(
- ensure_mref(Pid, State #fhc_state { open_count = Count - 1,
- elders = Elders1 }))};
+ update_counts(open, Pid, -1,
+ State #fhc_state { elders = Elders1 }))};
-handle_cast(check_counts, State) ->
- {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }),
- {noreply, State1}.
+handle_cast({transfer, FromPid, ToPid}, State) ->
+ ok = track_client(ToPid, State#fhc_state.clients),
+ {noreply, process_pending(
+ update_counts(obtain, ToPid, +1,
+ update_counts(obtain, FromPid, -1, State)))};
-handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
- #fhc_state { obtain_count = Count, callbacks = Callbacks,
- client_mrefs = ClientMRefs, elders = Elders }) ->
+handle_cast(check_counts, State) ->
+ {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}.
+
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #fhc_state { elders = Elders,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ obtain_count = ObtainCount,
+ obtain_pending = ObtainPending,
+ clients = Clients }) ->
+ [#cstate { opened = Opened, obtained = Obtained }] =
+ ets:lookup(Clients, Pid),
+ true = ets:delete(Clients, Pid),
+ FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
{noreply, process_pending(
- case dict:find(Pid, ClientMRefs) of
- {ok, MRef} -> State #fhc_state {
- elders = dict:erase(Pid, Elders),
- client_mrefs = dict:erase(Pid, ClientMRefs),
- callbacks = dict:erase(Pid, Callbacks) };
- _ -> State #fhc_state { obtain_count = Count - 1 }
- end)}.
-
-terminate(_Reason, State) ->
+ State #fhc_state {
+ open_count = OpenCount - Opened,
+ open_pending = filter_pending(FilterFun, OpenPending),
+ obtain_count = ObtainCount - Obtained,
+ obtain_pending = filter_pending(FilterFun, ObtainPending),
+ elders = dict:erase(Pid, Elders) })}.
+
+terminate(_Reason, State = #fhc_state { clients = Clients }) ->
+ ets:delete(Clients),
State.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
+%% pending queue abstraction helpers
+%%----------------------------------------------------------------------------
+
+queue_fold(Fun, Init, Q) ->
+ case queue:out(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
+ end.
+
+filter_pending(Fun, {Count, Queue}) ->
+ {Delta, Queue1} =
+ queue_fold(fun (Item, {DeltaN, QueueN}) ->
+ case Fun(Item) of
+ true -> {DeltaN, queue:in(Item, QueueN)};
+ false -> {DeltaN - requested(Item), QueueN}
+ end
+ end, {0, queue:new()}, Queue),
+ {Count + Delta, Queue1}.
+
+pending_new() ->
+ {0, queue:new()}.
+
+pending_in(Item = #pending { requested = Requested }, {Count, Queue}) ->
+ {Count + Requested, queue:in(Item, Queue)}.
+
+pending_out({0, _Queue} = Pending) ->
+ {empty, Pending};
+pending_out({N, Queue}) ->
+ {{value, #pending { requested = Requested }} = Result, Queue1} =
+ queue:out(Queue),
+ {Result, {N - Requested, Queue1}}.
+
+pending_count({Count, _Queue}) ->
+ Count.
+
+pending_is_empty({0, _Queue}) ->
+ true;
+pending_is_empty({_N, _Queue}) ->
+ false.
+
+%%----------------------------------------------------------------------------
%% server helpers
%%----------------------------------------------------------------------------
+obtain_limit(infinity) -> infinity;
+obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of
+ OLimit when OLimit < 0 -> 0;
+ OLimit -> OLimit
+ end.
+
+requested(#pending { requested = Requested }) ->
+    Requested.
+
process_pending(State = #fhc_state { limit = infinity }) ->
State;
process_pending(State) ->
- process_obtain(process_open(State)).
+ process_open(process_obtain(State)).
process_open(State = #fhc_state { limit = Limit,
open_pending = Pending,
open_count = OpenCount,
obtain_count = ObtainCount }) ->
- {Pending1, Inc} =
- process_pending(Pending, Limit - (ObtainCount + OpenCount)),
- State #fhc_state { open_pending = Pending1,
- open_count = OpenCount + Inc }.
+ {Pending1, State1} =
+ process_pending(Pending, Limit - (ObtainCount + OpenCount), State),
+ State1 #fhc_state { open_pending = Pending1 }.
process_obtain(State = #fhc_state { limit = Limit,
obtain_pending = Pending,
@@ -853,70 +991,139 @@ process_obtain(State = #fhc_state { limit = Limit,
open_count = OpenCount }) ->
Quota = lists:min([ObtainLimit - ObtainCount,
Limit - (ObtainCount + OpenCount)]),
- {Pending1, Inc} = process_pending(Pending, Quota),
- State #fhc_state { obtain_pending = Pending1,
- obtain_count = ObtainCount + Inc }.
-
-process_pending([], _Quota) ->
- {[], 0};
-process_pending(Pending, Quota) when Quota =< 0 ->
- {Pending, 0};
-process_pending(Pending, Quota) ->
- PendingLen = length(Pending),
- SatisfiableLen = lists:min([PendingLen, Quota]),
- Take = PendingLen - SatisfiableLen,
- {PendingNew, SatisfiableRev} = lists:split(Take, Pending),
- [run_pending_item(Item) || Item <- SatisfiableRev],
- {PendingNew, SatisfiableLen}.
-
-run_pending_item({open, From}) ->
- gen_server:reply(From, ok);
-run_pending_item({obtain, Pid, From}) ->
- _MRef = erlang:monitor(process, Pid),
- gen_server:reply(From, ok).
-
-maybe_reduce(State = #fhc_state { limit = Limit,
- open_count = OpenCount,
- open_pending = OpenPending,
- obtain_count = ObtainCount,
- obtain_limit = ObtainLimit,
- obtain_pending = ObtainPending,
- elders = Elders,
- callbacks = Callbacks,
- timer_ref = TRef })
- when Limit =/= infinity andalso
- (((OpenCount + ObtainCount) > Limit) orelse
- (OpenPending =/= []) orelse
- (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) ->
+ {Pending1, State1} = process_pending(Pending, Quota, State),
+ State1 #fhc_state { obtain_pending = Pending1 }.
+
+process_pending(Pending, Quota, State) when Quota =< 0 ->
+ {Pending, State};
+process_pending(Pending, Quota, State) ->
+ case pending_out(Pending) of
+ {empty, _Pending} ->
+ {Pending, State};
+ {{value, #pending { requested = Requested }}, _Pending1}
+ when Requested > Quota ->
+ {Pending, State};
+ {{value, #pending { requested = Requested } = Item}, Pending1} ->
+ process_pending(Pending1, Quota - Requested,
+ run_pending_item(Item, State))
+ end.
+
+run_pending_item(#pending { kind = Kind,
+ pid = Pid,
+ requested = Requested,
+ from = From },
+ State = #fhc_state { clients = Clients }) ->
+ gen_server:reply(From, ok),
+ true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
+ update_counts(Kind, Pid, Requested, State).
+
+update_counts(Kind, Pid, Delta,
+ State = #fhc_state { open_count = OpenCount,
+ obtain_count = ObtainCount,
+ clients = Clients }) ->
+ {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients),
+ State #fhc_state { open_count = OpenCount + OpenDelta,
+ obtain_count = ObtainCount + ObtainDelta }.
+
+update_counts1(open, Pid, Delta, Clients) ->
+ ets:update_counter(Clients, Pid, {#cstate.opened, Delta}),
+ {Delta, 0};
+update_counts1(obtain, Pid, Delta, Clients) ->
+ ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}),
+ {0, Delta}.
+
+maybe_reduce(State) ->
+ case needs_reduce(State) of
+ true -> reduce(State);
+ false -> State
+ end.
+
+needs_reduce(#fhc_state { limit = Limit,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ obtain_count = ObtainCount,
+ obtain_limit = ObtainLimit,
+ obtain_pending = ObtainPending }) ->
+ Limit =/= infinity
+ andalso ((OpenCount + ObtainCount > Limit)
+ orelse (not pending_is_empty(OpenPending))
+ orelse (ObtainCount < ObtainLimit
+ andalso not pending_is_empty(ObtainPending))).
+
+reduce(State = #fhc_state { open_pending = OpenPending,
+ obtain_pending = ObtainPending,
+ elders = Elders,
+ clients = Clients,
+ timer_ref = TRef }) ->
Now = now(),
- {Pids, Sum, ClientCount} =
- dict:fold(fun (_Pid, undefined, Accs) ->
- Accs;
- (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) ->
- {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest),
- CountAcc + 1}
+ {CStates, Sum, ClientCount} =
+ dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
+ [#cstate { pending_closes = PendingCloses,
+ opened = Opened,
+ blocked = Blocked } = CState] =
+ ets:lookup(Clients, Pid),
+ case Blocked orelse PendingCloses =:= Opened of
+ true -> Accs;
+ false -> {[CState | CStatesAcc],
+ SumAcc + timer:now_diff(Now, Eldest),
+ CountAcc + 1}
+ end
end, {[], 0, 0}, Elders),
- case Pids of
+ case CStates of
[] -> ok;
- _ -> AverageAge = Sum / ClientCount,
- lists:foreach(
- fun (Pid) ->
- case dict:find(Pid, Callbacks) of
- error -> ok;
- {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge])
- end
- end, Pids)
+ _ -> case (Sum / ClientCount) -
+ (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
+ AverageAge when AverageAge > 0 ->
+ notify_age(CStates, AverageAge);
+ _ ->
+ notify_age0(Clients, CStates,
+ pending_count(OpenPending) +
+ pending_count(ObtainPending))
+ end
end,
- AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit,
case TRef of
undefined -> {ok, TRef1} = timer:apply_after(
?FILE_HANDLES_CHECK_INTERVAL,
gen_server, cast, [?SERVER, check_counts]),
- {AboveLimit, State #fhc_state { timer_ref = TRef1 }};
- _ -> {AboveLimit, State}
- end;
-maybe_reduce(State) ->
- {false, State}.
+ State #fhc_state { timer_ref = TRef1 };
+ _ -> State
+ end.
+
+notify_age(CStates, AverageAge) ->
+ lists:foreach(
+ fun (#cstate { callback = undefined }) -> ok;
+ (#cstate { callback = {M, F, A} }) -> apply(M, F, A ++ [AverageAge])
+ end, CStates).
+
+notify_age0(Clients, CStates, Required) ->
+ Notifications =
+ [CState || CState <- CStates, CState#cstate.callback =/= undefined],
+ {L1, L2} = lists:split(random:uniform(length(Notifications)),
+ Notifications),
+ notify(Clients, Required, L2 ++ L1).
+
+notify(_Clients, _Required, []) ->
+ ok;
+notify(_Clients, Required, _Notifications) when Required =< 0 ->
+ ok;
+notify(Clients, Required, [#cstate{ pid = Pid,
+ callback = {M, F, A},
+ opened = Opened } | Notifications]) ->
+ apply(M, F, A ++ [0]),
+ ets:update_element(Clients, Pid, {#cstate.pending_closes, Opened}),
+ notify(Clients, Required - Opened, Notifications).
+
+track_client(Pid, Clients) ->
+ case ets:insert_new(Clients, #cstate { pid = Pid,
+ callback = undefined,
+ opened = 0,
+ obtained = 0,
+ blocked = false,
+ pending_closes = 0 }) of
+ true -> _MRef = erlang:monitor(process, Pid),
+ ok;
+ false -> ok
+ end.
%% For all unices, assume ulimit exists. Further googling suggests
%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n
@@ -947,11 +1154,3 @@ ulimit() ->
_ ->
?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
end.
-
-ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) ->
- case dict:find(Pid, ClientMRefs) of
- {ok, _MRef} -> State;
- error -> MRef = erlang:monitor(process, Pid),
- State #fhc_state {
- client_mrefs = dict:store(Pid, MRef, ClientMRefs) }
- end.
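
The comment block at the top of this file describes the new contract in prose; in practice it boils down to the pattern below. This is a minimal sketch, not part of the patch: the module name and the accept_and_handoff/1 and handle/1 functions are hypothetical, and error handling is omitted.

-module(fhc_obtain_example).
-export([accept_and_handoff/1]).

%% The process that calls obtain/0 is charged for one descriptor (and
%% monitored) until it either terminates or transfers the charge away.
accept_and_handoff(Sock) ->
    ok = file_handle_cache:obtain(),            %% blocks at the obtain limit
    Worker = spawn(fun () -> handle(Sock) end),
    ok = file_handle_cache:transfer(Worker),    %% non-blocking; the descriptor
    ok.                                         %% is now accounted to Worker

handle(_Sock) ->
    receive stop -> ok end.
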
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2453280e34..0cdb4fff08 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -197,7 +197,8 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
- [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
+ [Q || Q <- Qs,
+ gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == Q].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d52660c5ac..2cab7136a6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -603,6 +603,7 @@ handle_call({init, Recover}, From,
declare(Recover, From, State);
_ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
backing_queue = BQ, backing_queue_state = undefined} = State,
+ gen_server2:reply(From, not_found),
case Recover of
true -> ok;
_ -> rabbit_log:warning(
@@ -610,7 +611,7 @@ handle_call({init, Recover}, From,
end,
BQS = BQ:init(QName, IsDurable, Recover),
%% Rely on terminate to delete the queue.
- {stop, normal, not_found, State#q{backing_queue_state = BQS}}
+ {stop, normal, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 89b2441e38..e796acf327 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -67,7 +67,7 @@ publish(#exchange{name = Name}, Delivery =
Delivery).
split_topic_key(Key) ->
- re:split(Key, "\\.", [{return, list}]).
+ string:tokens(binary_to_list(Key), ".").
topic_matches(PatternKey, RoutingKey) ->
P = split_topic_key(PatternKey),
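
One behavioural difference worth flagging with this hunk: string:tokens/2 collapses consecutive separators, whereas re:split/3 preserved the empty words between them, so keys containing ".." now split differently. A quick shell comparison (standard library behaviour, not part of the patch):

1> string:tokens(binary_to_list(<<"a..b">>), ".").
["a","b"]
2> re:split(<<"a..b">>, "\\.", [{return, list}]).
["a",[],"b"]
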
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 61ef5efb65..a9945af1d4 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -62,24 +62,22 @@ start_heartbeat_sender(_Parent, Sock, TimeoutSec) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
- {ok, proc_lib:spawn_link(
- fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
- fun () ->
- catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
- continue
- end}, {0, 0})
- end)}.
+ heartbeater(
+ {Sock, TimeoutSec * 1000 div 2, send_oct, 0,
+ fun () ->
+ catch rabbit_net:send(
+ Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ continue
+ end}).
start_heartbeat_receiver(Parent, Sock, TimeoutSec) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
- {ok, proc_lib:spawn_link(
- fun () -> heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
- fun () ->
- Parent ! timeout,
- stop
- end}, {0, 0}) end)}.
+ heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () ->
+ Parent ! timeout,
+ stop
+ end}).
pause_monitor(none) ->
ok;
@@ -95,6 +93,9 @@ resume_monitor({_Sender, Receiver}) ->
%%----------------------------------------------------------------------------
+heartbeater(Params) ->
+ {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}.
+
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
{StatVal, SameCount}) ->
Recurse = fun (V) -> heartbeater(Params, V) end,
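
To make the interval arithmetic in the two comments above concrete (illustrative numbers only, not part of the patch):

%% With TimeoutSec = 60:
%%   sender:   checks send_oct every 60 * 1000 div 2 = 30000 ms and emits a
%%             heartbeat frame whenever nothing was written in that window,
%%             so the gap between outgoing frames stays within roughly
%%             TimeoutSec instead of approaching 2 * TimeoutSec;
%%   receiver: checks recv_oct every 60000 ms and signals the parent after
%%             two consecutive checks without change, i.e. between 120 s
%%             and 180 s after the last data arrived.
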
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 5bc1f9d5e4..6576bfbbfb 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -317,7 +317,7 @@ start_link(Server, Dir, ClientRefs, StartupFunState) ->
write(Server, Guid, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- {gen_server2:cast(Server, {write, Guid, Msg}), CState}.
+ {gen_server2:cast(Server, {write, Guid}), CState}.
read(Server, Guid,
CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
@@ -362,7 +362,7 @@ set_maximum_since_use(Server, Age) ->
client_init(Server, Ref) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:call(Server, {new_client_state, Ref}, infinity),
+ gen_server2:pcall(Server, 7, {new_client_state, Ref}, infinity),
#client_msstate { file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
@@ -382,7 +382,7 @@ client_delete_and_terminate(CState, Server, Ref) ->
ok = gen_server2:call(Server, {delete_client, Ref}, infinity).
successfully_recovered_state(Server) ->
- gen_server2:call(Server, successfully_recovered_state, infinity).
+ gen_server2:pcall(Server, 7, successfully_recovered_state, infinity).
%%----------------------------------------------------------------------------
%% Client-side-only helpers
@@ -611,7 +611,7 @@ handle_call({delete_client, CRef}, _From,
reply(ok,
State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
-handle_cast({write, Guid, Msg},
+handle_cast({write, Guid},
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
sum_valid_data = SumValid,
@@ -619,6 +619,7 @@ handle_cast({write, Guid, Msg},
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
+ [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
case index_lookup(Guid, State) of
not_found ->
%% New message, lots to do
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index c9f75be030..0f5ed286b3 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -131,7 +131,9 @@ start() ->
error -> error("failed to compile boot script file ~s", [ScriptFile])
end,
io:format("~n~w plugins activated:~n", [length(PluginApps)]),
- [io:format("* ~w~n", [App]) || App <- PluginApps],
+ [io:format("* ~s-~s~n", [App, Vsn]) ||
+ App <- PluginApps,
+ {App, Vsn} <- [proplists:lookup(App, AppVersions)]],
io:nl(),
halt(),
ok.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 6efbeab610..3c32fc9e3b 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -799,9 +799,10 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- State1 = State#v1{connection_state = running,
- connection = NewConnection},
+ State1 = internal_conserve_memory(
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ State#v1{connection_state = running,
+ connection = NewConnection}),
rabbit_event:notify(
connection_created,
[{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a71e49f830..74fae2640b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -55,6 +55,8 @@ test_content_prop_roundtrip(Datum, Binary) ->
all_tests() ->
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
+ ok = file_handle_cache:set_limit(10),
+ passed = test_file_handle_cache(),
passed = test_backing_queue(),
passed = test_priority_queue(),
passed = test_bpqueue(),
@@ -1418,6 +1420,32 @@ extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) ->
test_supervisor_delayed_restart() ->
test_sup:test_supervisor_delayed_restart().
+test_file_handle_cache() ->
+ %% test copying when there is just one spare handle
+ Limit = file_handle_cache:get_limit(),
+ ok = file_handle_cache:set_limit(5), %% 1 or 2 sockets, 2 msg_stores
+ TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"),
+ ok = filelib:ensure_dir(filename:join(TmpDir, "nothing")),
+ Pid = spawn(fun () -> {ok, Hdl} = file_handle_cache:open(
+ filename:join(TmpDir, "file3"),
+ [write], []),
+ receive close -> ok end,
+ file_handle_cache:delete(Hdl)
+ end),
+ Src = filename:join(TmpDir, "file1"),
+ Dst = filename:join(TmpDir, "file2"),
+ Content = <<"foo">>,
+ ok = file:write_file(Src, Content),
+ {ok, SrcHdl} = file_handle_cache:open(Src, [read], []),
+ {ok, DstHdl} = file_handle_cache:open(Dst, [write], []),
+ Size = size(Content),
+ {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size),
+ ok = file_handle_cache:delete(SrcHdl),
+ file_handle_cache:delete(DstHdl),
+ Pid ! close,
+ ok = file_handle_cache:set_limit(Limit),
+ passed.
+
test_backing_queue() ->
case application:get_env(rabbit, backing_queue_module) of
{ok, rabbit_variable_queue} ->
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 11ce6fc532..c9809ace61 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -55,7 +55,7 @@ handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(accept, State) ->
- ok = file_handle_cache:obtain(self()),
+ ok = file_handle_cache:obtain(),
accept(State);
handle_cast(_Msg, State) ->
@@ -84,7 +84,8 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
%% is drained.
gen_event:which_handlers(error_logger),
%% handle
- file_handle_cache:obtain(apply(M, F, A ++ [Sock]))
+ file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
+ ok = file_handle_cache:obtain()
catch {inet_error, Reason} ->
gen_tcp:close(Sock),
error_logger:error_msg("unable to accept TCP connection: ~p~n",
@@ -93,11 +94,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
%% accept more
accept(State);
+
handle_info({inet_async, LSock, Ref, {error, closed}},
State=#state{sock=LSock, ref=Ref}) ->
%% It would be wrong to attempt to restart the acceptor when we
%% know this will fail.
{stop, normal, State};
+
handle_info(_Info, State) ->
{noreply, State}.
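
The acceptor hunks above are the concrete instance of the new obtain/transfer contract from file_handle_cache.erl. A reading of the ordering (assuming, as the surrounding code implies, that apply(M, F, A ++ [Sock]) returns the pid of the newly started reader):

%% 1. transfer/1 moves the descriptor charge the acceptor already holds
%%    onto the reader pid, which the server then monitors for it;
%% 2. obtain/0 re-charges the acceptor for the next socket and blocks if
%%    the obtain limit has been reached, so back-pressure on accepting
%%    new connections lands on the acceptor, not on the reader.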