diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-02-18 14:47:56 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-02-18 14:47:56 +0000 |
| commit | 5756f3dd99fa6ef710c539172b534d82c9b3913f (patch) | |
| tree | 46e7a8802decce1cedf6d52c7afb66b8e495398e /src | |
| parent | e46a9048be642fe8e89a46775312e92f2a71d644 (diff) | |
| download | rabbitmq-server-git-5756f3dd99fa6ef710c539172b534d82c9b3913f.tar.gz | |
Several fixes:
1. Both the msg_store and the amqqueue_process can end up with very long mailboxes. This is a problem because close messages from the FHC (file_handle_cache) can't get through: they are sent with a plain `!` and carry no priority. Therefore, add a callback registry to the FHC and equip both msg_store and amqqueue_process with high-priority casts to solve this problem (for the record, msg_store can get swamped with writes, whilst the amqqueue_process can get swamped with delivery notifications and acks).
2. The GC was missing the ability to deal with close messages from the FHC.
3. The FHC, when reopening a file, uses the same mode the file was originally opened with. If that mode is write-only, then reopening the file trashes its contents. Thus, when reopening, add read to the mode, but don't record this anywhere - the file still acts (API-wise) as if it had only been opened for writing.
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 114 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 4 |
5 files changed, 104 insertions, 58 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 6e367b03ab..520be0ce2e 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -133,10 +133,10 @@ -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([release_on_death/1, obtain/0]). +-export([release_on_death/1, obtain/0, register_callback/3]). -define(SERVER, ?MODULE). --define(RESERVED_FOR_OTHERS, 50). +-define(RESERVED_FOR_OTHERS, 100). -define(FILE_HANDLES_LIMIT_WINDOWS, 10000000). -define(FILE_HANDLES_LIMIT_OTHER, 1024). -define(FILE_HANDLES_CHECK_INTERVAL, 2000). @@ -169,7 +169,8 @@ { elders, limit, count, - obtains + obtains, + callbacks }). %%---------------------------------------------------------------------------- @@ -184,6 +185,7 @@ -type(position() :: ('bof' | 'eof' | {'bof',integer()} | {'eof',integer()} | {'cur',integer()} | integer())). +-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok'). -spec(open/3 :: (string(), [any()], [{'write_buffer', (non_neg_integer()|'infinity'|'unbuffered')}]) -> @@ -215,6 +217,10 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]). +register_callback(M, F, A) + when is_atom(M) andalso is_atom(F) andalso is_list(A) -> + gen_server:call(?SERVER, {register_callback, self(), {M, F, A}}, infinity). + open(Path, Mode, Options) -> case is_appender(Mode) of true -> @@ -241,7 +247,7 @@ open(Path, Mode, Options) -> File1 #file { reader_count = RCount1, has_writer = HasWriter1}), Ref = make_ref(), - case open1(Path1, Mode, Options, Ref, bof) of + case open1(Path1, Mode, Options, Ref, bof, new) of {ok, _Handle} -> {ok, Ref}; Error -> Error end @@ -504,7 +510,7 @@ get_or_reopen(Ref) -> {error, not_open, Ref}; #handle { hdl = closed, mode = Mode, options = Options, offset = Offset, path = Path } -> - open1(Path, Mode, Options, Ref, Offset); + open1(Path, Mode, Options, Ref, Offset, reopen); Handle -> {ok, Handle} end. 
@@ -524,8 +530,12 @@ put_handle(Ref, Handle = #handle { last_used_at = Then }) -> fun (Tree) -> gb_trees:insert(Now, Ref, gb_trees:delete(Then, Tree)) end), put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }). -open1(Path, Mode, Options, Ref, Offset) -> - case file:open(Path, Mode) of +open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> + Mode1 = case NewOrReopen of + new -> Mode; + reopen -> [read | Mode] + end, + case file:open(Path, Mode1) of {ok, Hdl} -> WriteBufferSize = case proplists:get_value(write_buffer, Options, unbuffered) of @@ -561,31 +571,36 @@ close1(Ref, Handle, SoftOrHard) -> case write_buffer(Handle) of {ok, #handle { hdl = Hdl, path = Path, is_dirty = IsDirty, is_read = IsReader, is_write = IsWriter, - last_used_at = Then } = Handle1 } -> - case Hdl of - closed -> ok; - _ -> ok = case IsDirty of - true -> file:sync(Hdl); - false -> ok - end, - ok = file:close(Hdl), - with_age_tree( - fun (Tree) -> - Tree1 = gb_trees:delete(Then, Tree), - Oldest = - case gb_trees:is_empty(Tree1) of - true -> - undefined; - false -> - {Oldest1, _Ref} = - gb_trees:smallest(Tree1), - Oldest1 - end, - gen_server:cast( - ?SERVER, {close, self(), Oldest}), - Tree1 - end) - end, + last_used_at = Then, offset = Offset } = Handle1 } -> + Handle2 = + case Hdl of + closed -> + ok; + _ -> + ok = case IsDirty of + true -> file:sync(Hdl); + false -> ok + end, + ok = file:close(Hdl), + with_age_tree( + fun (Tree) -> + Tree1 = gb_trees:delete(Then, Tree), + Oldest = + case gb_trees:is_empty(Tree1) of + true -> + undefined; + false -> + {Oldest1, _Ref} = + gb_trees:smallest(Tree1), + Oldest1 + end, + gen_server:cast( + ?SERVER, {close, self(), Oldest}), + Tree1 + end), + Handle1 #handle { trusted_offset = Offset, + is_dirty = false } + end, case SoftOrHard of hard -> #file { reader_count = RCount, has_writer = HasWriter } = File = @@ -602,7 +617,7 @@ close1(Ref, Handle, SoftOrHard) -> has_writer = HasWriter1 }) end, ok; - soft -> {ok, Handle1 #handle { hdl = closed 
}} + soft -> {ok, Handle2 #handle { hdl = closed }} end; {Error, Handle1} -> put_handle(Ref, Handle1), @@ -673,7 +688,7 @@ init([]) -> end, error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]), {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0, - obtains = [] }}. + obtains = [], callbacks = dict:new() }}. handle_call(obtain, From, State = #fhc_state { count = Count }) -> State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } = @@ -682,7 +697,12 @@ handle_call(obtain, From, State = #fhc_state { count = Count }) -> true -> {noreply, State1 #fhc_state { obtains = [From | Obtains], count = Count1 - 1 }}; false -> {reply, ok, State1} - end. + end; + +handle_call({register_callback, Pid, MFA}, _From, + State = #fhc_state { callbacks = Callbacks }) -> + {reply, ok, + State #fhc_state { callbacks = dict:store(Pid, MFA, Callbacks) }}. handle_cast({open, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders, count = Count }) -> @@ -713,9 +733,11 @@ handle_cast({release_on_death, Pid}, State) -> _MRef = erlang:monitor(process, Pid), {noreply, State}. -handle_info({'DOWN', _MRef, process, _Pid, _Reason}, - State = #fhc_state { count = Count }) -> - {noreply, process_obtains(State #fhc_state { count = Count - 1 })}. +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #fhc_state { count = Count, callbacks = Callbacks }) -> + {noreply, process_obtains( + State #fhc_state { count = Count - 1, + callbacks = dict:erase(Pid, Callbacks) })}. terminate(_Reason, State) -> State. @@ -742,7 +764,7 @@ process_obtains(State = #fhc_state { limit = Limit, count = Count, State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }. 
maybe_reduce(State = #fhc_state { limit = Limit, count = Count, - elders = Elders }) + elders = Elders, callbacks = Callbacks }) when Limit /= infinity andalso Count >= Limit -> Now = now(), {Pids, Sum, ClientCount} = @@ -755,10 +777,16 @@ maybe_reduce(State = #fhc_state { limit = Limit, count = Count, case Pids of [] -> ok; _ -> AverageAge = Sum / ClientCount, - lists:foreach(fun (Pid) -> Pid ! {?MODULE, - maximum_eldest_since_use, - AverageAge} - end, Pids) + lists:foreach( + fun (Pid) -> + case dict:find(Pid, Callbacks) of + error -> + Pid ! {?MODULE, maximum_eldest_since_use, + AverageAge}; + {ok, {M, F, A}} -> + apply(M, F, A ++ [AverageAge]) + end + end, Pids) end, {ok, _TRef} = timer:apply_after(?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c9f5b5ae72..df4ca40f20 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -34,7 +34,7 @@ -export([start/0, recover/1, find_durable_queues/0, declare/4, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, remeasure_rates/1, - set_queue_duration/2]). + set_queue_duration/2, set_maximum_since_use/2]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). @@ -122,6 +122,7 @@ -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(remeasure_rates/1 :: (pid()) -> 'ok'). -spec(set_queue_duration/2 :: (pid(), number()) -> 'ok'). +-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -374,10 +375,13 @@ internal_delete(QueueName) -> end). remeasure_rates(QPid) -> - gen_server2:pcast(QPid, 9, remeasure_rates). + gen_server2:pcast(QPid, 9, remeasure_rates). set_queue_duration(QPid, Duration) -> - gen_server2:pcast(QPid, 9, {set_queue_duration, Duration}). 
+ gen_server2:pcast(QPid, 9, {set_queue_duration, Duration}). + +set_maximum_since_use(QPid, Age) -> + gen_server2:pcast(QPid, 9, {set_maximum_since_use, Age}). on_node_down(Node) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 73c3678d29..93ebc3c550 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -102,12 +102,14 @@ start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). info_keys() -> ?INFO_KEYS. - + %%---------------------------------------------------------------------------- init(Q = #amqqueue { name = QName }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), ok = rabbit_memory_monitor:register (self(), {rabbit_amqqueue, set_queue_duration, [self()]}), VQS = rabbit_variable_queue:init(QName), @@ -907,7 +909,11 @@ handle_cast({set_queue_duration, Duration}, State = #q{variable_queue_state = VQS}) -> VQS1 = rabbit_variable_queue:set_queue_ram_duration_target( Duration, VQS), - noreply(State#q{variable_queue_state = VQS1}). + noreply(State#q{variable_queue_state = VQS1}); + +handle_cast({set_maximum_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), + noreply(State). 
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -934,10 +940,6 @@ handle_info(timeout, State = #q{variable_queue_state = VQS}) -> State#q{variable_queue_state = rabbit_variable_queue:tx_commit_from_vq(VQS)})); -handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) -> - ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State); - handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 10e325e985..81663c00e6 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -36,7 +36,7 @@ -export([start_link/3, write/2, read/2, contains/1, remove/1, release/1, sync/2, client_init/0, client_terminate/1]). --export([sync/0, gc_done/3]). %% internal +-export([sync/0, gc_done/3, set_maximum_since_use/1]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1]). @@ -98,6 +98,7 @@ -spec(release/1 :: ([msg_id()]) -> 'ok'). -spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok'). -spec(gc_done/3 :: (non_neg_integer(), file_num(), file_num()) -> 'ok'). +-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(client_init/0 :: () -> client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). @@ -305,6 +306,9 @@ sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal gc_done(Reclaimed, Source, Destination) -> gen_server2:pcast(?SERVER, 9, {gc_done, Reclaimed, Source, Destination}). +set_maximum_since_use(Age) -> + gen_server2:pcast(?SERVER, 9, {set_maximum_since_use, Age}). 
+ client_init() -> {IState, IModule, Dir} = gen_server2:call(?SERVER, new_client_state, infinity), @@ -422,6 +426,9 @@ close_all_indicated(CState) -> init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> process_flag(trap_exit, true), + ok = + file_handle_cache:register_callback(?MODULE, set_maximum_since_use, []), + ok = filelib:ensure_dir(filename:join(Dir, "nothing")), {ok, IndexModule} = application:get_env(msg_store_index_module), @@ -597,15 +604,15 @@ handle_cast({gc_done, Reclaimed, Source, Dest}, true = ets:delete(?FILE_SUMMARY_ETS_NAME, Source), noreply(run_pending( State #msstate { sum_file_size = SumFileSize - Reclaimed, - gc_active = false })). + gc_active = false })); + +handle_cast({set_maximum_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), + noreply(State). handle_info(timeout, State) -> noreply(sync(State)); -handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) -> - ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State); - handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -746,6 +753,7 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount, throw({error, {misread, [{old_state, State}, {file_num, File}, {offset, Offset}, + {msg_id, MsgId}, {read, Rest}, {proc_dict, get()} ]}}) diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 6023de0296..a64733dfc2 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -77,6 +77,10 @@ handle_cast({gc, Source, Destination}, State) -> ok = rabbit_msg_store:gc_done(Reclaimed, Source, Destination), {noreply, State, hibernate}. +handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), + {noreply, State, hibernate}; + handle_info(Info, State) -> {stop, {unhandled_info, Info}, State}. |
