summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-02-18 14:47:56 +0000
committerMatthew Sackman <matthew@lshift.net>2010-02-18 14:47:56 +0000
commit5756f3dd99fa6ef710c539172b534d82c9b3913f (patch)
tree46e7a8802decce1cedf6d52c7afb66b8e495398e /src
parente46a9048be642fe8e89a46775312e92f2a71d644 (diff)
downloadrabbitmq-server-git-5756f3dd99fa6ef710c539172b534d82c9b3913f.tar.gz
Several fixes:
1. Both the msg_store and the amqqueue_process can have their mailboxes get very long. In this case, it's a problem because close messages from the FHC can't get through (Plain !, and no priority). Therefore, add callbacks registry to FHC and equip both msg_store and amqqueue_process with high priority casts to solve this problem (ftr, msg_store can get swamped with writes, whilst the amqqueue_process can get swamped with delivery notifications and acks). 2. The GC was missing the ability to deal with close msgs from the FHC 3. The FHC, when reopening a file, uses the same mode as the file was originally opened with. If that mode is just write, then when the file is reopened, its contents get trashed. Thus when reopening, add in read to the mode, but don't record this anywhere - the file still acts (API wise) as if it was only opened writable.
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl114
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_amqqueue_process.erl14
-rw-r--r--src/rabbit_msg_store.erl20
-rw-r--r--src/rabbit_msg_store_gc.erl4
5 files changed, 104 insertions, 58 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 6e367b03ab..520be0ce2e 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -133,10 +133,10 @@
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([release_on_death/1, obtain/0]).
+-export([release_on_death/1, obtain/0, register_callback/3]).
-define(SERVER, ?MODULE).
--define(RESERVED_FOR_OTHERS, 50).
+-define(RESERVED_FOR_OTHERS, 100).
-define(FILE_HANDLES_LIMIT_WINDOWS, 10000000).
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
@@ -169,7 +169,8 @@
{ elders,
limit,
count,
- obtains
+ obtains,
+ callbacks
}).
%%----------------------------------------------------------------------------
@@ -184,6 +185,7 @@
-type(position() :: ('bof' | 'eof' | {'bof',integer()} | {'eof',integer()}
| {'cur',integer()} | integer())).
+-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(open/3 ::
(string(), [any()],
[{'write_buffer', (non_neg_integer()|'infinity'|'unbuffered')}]) ->
@@ -215,6 +217,10 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
+register_callback(M, F, A)
+ when is_atom(M) andalso is_atom(F) andalso is_list(A) ->
+ gen_server:call(?SERVER, {register_callback, self(), {M, F, A}}, infinity).
+
open(Path, Mode, Options) ->
case is_appender(Mode) of
true ->
@@ -241,7 +247,7 @@ open(Path, Mode, Options) ->
File1 #file { reader_count = RCount1,
has_writer = HasWriter1}),
Ref = make_ref(),
- case open1(Path1, Mode, Options, Ref, bof) of
+ case open1(Path1, Mode, Options, Ref, bof, new) of
{ok, _Handle} -> {ok, Ref};
Error -> Error
end
@@ -504,7 +510,7 @@ get_or_reopen(Ref) ->
{error, not_open, Ref};
#handle { hdl = closed, mode = Mode, options = Options,
offset = Offset, path = Path } ->
- open1(Path, Mode, Options, Ref, Offset);
+ open1(Path, Mode, Options, Ref, Offset, reopen);
Handle ->
{ok, Handle}
end.
@@ -524,8 +530,12 @@ put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
fun (Tree) -> gb_trees:insert(Now, Ref, gb_trees:delete(Then, Tree)) end),
put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
-open1(Path, Mode, Options, Ref, Offset) ->
- case file:open(Path, Mode) of
+open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
+ Mode1 = case NewOrReopen of
+ new -> Mode;
+ reopen -> [read | Mode]
+ end,
+ case file:open(Path, Mode1) of
{ok, Hdl} ->
WriteBufferSize =
case proplists:get_value(write_buffer, Options, unbuffered) of
@@ -561,31 +571,36 @@ close1(Ref, Handle, SoftOrHard) ->
case write_buffer(Handle) of
{ok, #handle { hdl = Hdl, path = Path, is_dirty = IsDirty,
is_read = IsReader, is_write = IsWriter,
- last_used_at = Then } = Handle1 } ->
- case Hdl of
- closed -> ok;
- _ -> ok = case IsDirty of
- true -> file:sync(Hdl);
- false -> ok
- end,
- ok = file:close(Hdl),
- with_age_tree(
- fun (Tree) ->
- Tree1 = gb_trees:delete(Then, Tree),
- Oldest =
- case gb_trees:is_empty(Tree1) of
- true ->
- undefined;
- false ->
- {Oldest1, _Ref} =
- gb_trees:smallest(Tree1),
- Oldest1
- end,
- gen_server:cast(
- ?SERVER, {close, self(), Oldest}),
- Tree1
- end)
- end,
+ last_used_at = Then, offset = Offset } = Handle1 } ->
+ Handle2 =
+ case Hdl of
+ closed ->
+ ok;
+ _ ->
+ ok = case IsDirty of
+ true -> file:sync(Hdl);
+ false -> ok
+ end,
+ ok = file:close(Hdl),
+ with_age_tree(
+ fun (Tree) ->
+ Tree1 = gb_trees:delete(Then, Tree),
+ Oldest =
+ case gb_trees:is_empty(Tree1) of
+ true ->
+ undefined;
+ false ->
+ {Oldest1, _Ref} =
+ gb_trees:smallest(Tree1),
+ Oldest1
+ end,
+ gen_server:cast(
+ ?SERVER, {close, self(), Oldest}),
+ Tree1
+ end),
+ Handle1 #handle { trusted_offset = Offset,
+ is_dirty = false }
+ end,
case SoftOrHard of
hard -> #file { reader_count = RCount,
has_writer = HasWriter } = File =
@@ -602,7 +617,7 @@ close1(Ref, Handle, SoftOrHard) ->
has_writer = HasWriter1 })
end,
ok;
- soft -> {ok, Handle1 #handle { hdl = closed }}
+ soft -> {ok, Handle2 #handle { hdl = closed }}
end;
{Error, Handle1} ->
put_handle(Ref, Handle1),
@@ -673,7 +688,7 @@ init([]) ->
end,
error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
{ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0,
- obtains = [] }}.
+ obtains = [], callbacks = dict:new() }}.
handle_call(obtain, From, State = #fhc_state { count = Count }) ->
State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } =
@@ -682,7 +697,12 @@ handle_call(obtain, From, State = #fhc_state { count = Count }) ->
true -> {noreply, State1 #fhc_state { obtains = [From | Obtains],
count = Count1 - 1 }};
false -> {reply, ok, State1}
- end.
+ end;
+
+handle_call({register_callback, Pid, MFA}, _From,
+ State = #fhc_state { callbacks = Callbacks }) ->
+ {reply, ok,
+ State #fhc_state { callbacks = dict:store(Pid, MFA, Callbacks) }}.
handle_cast({open, Pid, EldestUnusedSince}, State =
#fhc_state { elders = Elders, count = Count }) ->
@@ -713,9 +733,11 @@ handle_cast({release_on_death, Pid}, State) ->
_MRef = erlang:monitor(process, Pid),
{noreply, State}.
-handle_info({'DOWN', _MRef, process, _Pid, _Reason},
- State = #fhc_state { count = Count }) ->
- {noreply, process_obtains(State #fhc_state { count = Count - 1 })}.
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #fhc_state { count = Count, callbacks = Callbacks }) ->
+ {noreply, process_obtains(
+ State #fhc_state { count = Count - 1,
+ callbacks = dict:erase(Pid, Callbacks) })}.
terminate(_Reason, State) ->
State.
@@ -742,7 +764,7 @@ process_obtains(State = #fhc_state { limit = Limit, count = Count,
State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }.
maybe_reduce(State = #fhc_state { limit = Limit, count = Count,
- elders = Elders })
+ elders = Elders, callbacks = Callbacks })
when Limit /= infinity andalso Count >= Limit ->
Now = now(),
{Pids, Sum, ClientCount} =
@@ -755,10 +777,16 @@ maybe_reduce(State = #fhc_state { limit = Limit, count = Count,
case Pids of
[] -> ok;
_ -> AverageAge = Sum / ClientCount,
- lists:foreach(fun (Pid) -> Pid ! {?MODULE,
- maximum_eldest_since_use,
- AverageAge}
- end, Pids)
+ lists:foreach(
+ fun (Pid) ->
+ case dict:find(Pid, Callbacks) of
+ error ->
+ Pid ! {?MODULE, maximum_eldest_since_use,
+ AverageAge};
+ {ok, {M, F, A}} ->
+ apply(M, F, A ++ [AverageAge])
+ end
+ end, Pids)
end,
{ok, _TRef} = timer:apply_after(?FILE_HANDLES_CHECK_INTERVAL, gen_server,
cast, [?SERVER, check_counts]),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c9f5b5ae72..df4ca40f20 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -34,7 +34,7 @@
-export([start/0, recover/1, find_durable_queues/0, declare/4, delete/3,
purge/1]).
-export([internal_declare/2, internal_delete/1, remeasure_rates/1,
- set_queue_duration/2]).
+ set_queue_duration/2, set_maximum_since_use/2]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
@@ -122,6 +122,7 @@
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(remeasure_rates/1 :: (pid()) -> 'ok').
-spec(set_queue_duration/2 :: (pid(), number()) -> 'ok').
+-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -374,10 +375,13 @@ internal_delete(QueueName) ->
end).
remeasure_rates(QPid) ->
- gen_server2:pcast(QPid, 9, remeasure_rates).
+ gen_server2:pcast(QPid, 9, remeasure_rates).
set_queue_duration(QPid, Duration) ->
- gen_server2:pcast(QPid, 9, {set_queue_duration, Duration}).
+ gen_server2:pcast(QPid, 9, {set_queue_duration, Duration}).
+
+set_maximum_since_use(QPid, Age) ->
+ gen_server2:pcast(QPid, 9, {set_maximum_since_use, Age}).
on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 73c3678d29..93ebc3c550 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -102,12 +102,14 @@
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
info_keys() -> ?INFO_KEYS.
-
+
%%----------------------------------------------------------------------------
init(Q = #amqqueue { name = QName }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
ok = rabbit_memory_monitor:register
(self(), {rabbit_amqqueue, set_queue_duration, [self()]}),
VQS = rabbit_variable_queue:init(QName),
@@ -907,7 +909,11 @@ handle_cast({set_queue_duration, Duration},
State = #q{variable_queue_state = VQS}) ->
VQS1 = rabbit_variable_queue:set_queue_ram_duration_target(
Duration, VQS),
- noreply(State#q{variable_queue_state = VQS1}).
+ noreply(State#q{variable_queue_state = VQS1});
+
+handle_cast({set_maximum_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ noreply(State).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -934,10 +940,6 @@ handle_info(timeout, State = #q{variable_queue_state = VQS}) ->
State#q{variable_queue_state =
rabbit_variable_queue:tx_commit_from_vq(VQS)}));
-handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) ->
- ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State);
-
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 10e325e985..81663c00e6 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -36,7 +36,7 @@
-export([start_link/3, write/2, read/2, contains/1, remove/1, release/1,
sync/2, client_init/0, client_terminate/1]).
--export([sync/0, gc_done/3]). %% internal
+-export([sync/0, gc_done/3, set_maximum_since_use/1]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, handle_pre_hibernate/1]).
@@ -98,6 +98,7 @@
-spec(release/1 :: ([msg_id()]) -> 'ok').
-spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok').
-spec(gc_done/3 :: (non_neg_integer(), file_num(), file_num()) -> 'ok').
+-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(client_init/0 :: () -> client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
@@ -305,6 +306,9 @@ sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
gc_done(Reclaimed, Source, Destination) ->
gen_server2:pcast(?SERVER, 9, {gc_done, Reclaimed, Source, Destination}).
+set_maximum_since_use(Age) ->
+ gen_server2:pcast(?SERVER, 9, {set_maximum_since_use, Age}).
+
client_init() ->
{IState, IModule, Dir} =
gen_server2:call(?SERVER, new_client_state, infinity),
@@ -422,6 +426,9 @@ close_all_indicated(CState) ->
init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
process_flag(trap_exit, true),
+ ok =
+ file_handle_cache:register_callback(?MODULE, set_maximum_since_use, []),
+
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
{ok, IndexModule} = application:get_env(msg_store_index_module),
@@ -597,15 +604,15 @@ handle_cast({gc_done, Reclaimed, Source, Dest},
true = ets:delete(?FILE_SUMMARY_ETS_NAME, Source),
noreply(run_pending(
State #msstate { sum_file_size = SumFileSize - Reclaimed,
- gc_active = false })).
+ gc_active = false }));
+
+handle_cast({set_maximum_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ noreply(State).
handle_info(timeout, State) ->
noreply(sync(State));
-handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) ->
- ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State);
-
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -746,6 +753,7 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
throw({error, {misread, [{old_state, State},
{file_num, File},
{offset, Offset},
+ {msg_id, MsgId},
{read, Rest},
{proc_dict, get()}
]}})
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 6023de0296..a64733dfc2 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -77,6 +77,10 @@ handle_cast({gc, Source, Destination}, State) ->
ok = rabbit_msg_store:gc_done(Reclaimed, Source, Destination),
{noreply, State, hibernate}.
+handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ {noreply, State, hibernate};
+
handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.