diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-27 23:44:39 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-27 23:44:39 +0100 |
| commit | 1e4be9b8114a89fc8fb88b5eac349fc6575dacc2 (patch) | |
| tree | 40e37e9fc36d7381cc6363869b84b4347941c5b2 | |
| parent | 7e4f744b0ec187118ad44c5f22d5b47f3e2d030f (diff) | |
| download | rabbitmq-server-git-1e4be9b8114a89fc8fb88b5eac349fc6575dacc2.tar.gz | |
Lots of things:
1) disk_queue: remove preallocation - filesystems that support holes (sparse files) do it really fast, so it buys us nothing, and filesystems that don't support holes get crippled.
2) disk_queue: make sure we fsync before closing. This is crucial: POSIX is perfectly happy to close a file without flushing its data to disk.
3) disk_queue: fix a bug in recovery from a crash during compaction (*still* not tested), where we were opening the file with just write, not read+write, and hence would have stomped over existing content.
4) memory manager: track oppressed processes and only liberate them if they're reporting a memory usage that differs by at least 5% from what they last had.
5) minor cosmetics to amqqueue_process.
6) start up the guid generator earlier. This is necessary because it is needed during the recovery of queues.
I'm not happy with memory manager, but am utterly knackered and need to sleep. All tests pass.
| -rw-r--r-- | src/rabbit.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 63 | ||||
| -rw-r--r-- | src/rabbit_memory_manager.erl | 112 |
4 files changed, 96 insertions, 104 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 8962b12ea6..f665ad92af 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -148,6 +148,10 @@ start(normal, []) -> ok = start_child(rabbit_router), ok = start_child(rabbit_node_monitor) end}, + {"guid generator", + fun () -> + ok = start_child(rabbit_guid) + end}, {"disk queue", fun () -> ok = start_child(rabbit_disk_queue) @@ -162,10 +166,6 @@ start(normal, []) -> ok = rabbit_disk_queue:delete_non_durable_queues( DurableQueueNames) end}, - {"guid generator", - fun () -> - ok = start_child(rabbit_guid) - end}, {"builtin applications", fun () -> {ok, DefaultVHost} = application:get_env(default_vhost), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 916a241062..3538b6fb61 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -114,8 +114,7 @@ init(Q = #amqqueue { name = QName, durable = Durable }) -> blocked_consumers = queue:new(), memory_report_timer = undefined }, - %% first thing we must do is report_memory which will clear out - %% the 'undefined' values in gain and loss in mixed_queue state + %% first thing we must do is report_memory. {ok, start_memory_timer(State), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -721,8 +720,7 @@ handle_call({delete, IfUnused, IfEmpty}, _From, handle_call(purge, _From, State) -> {Count, MS} = rabbit_mixed_queue:purge(State #q.mixed_state), - reply({ok, Count}, - State #q { mixed_state = MS }); + reply({ok, Count}, State #q { mixed_state = MS }); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -738,10 +736,9 @@ handle_call({claim_queue, ReaderPid}, _From, %% pid... 
reply(locked, State); ok -> - reply(ok, State #q { owner = - {ReaderPid, - erlang:monitor(process, ReaderPid)} }) - + reply(ok, + State#q{ owner = {ReaderPid, erlang:monitor( + process, ReaderPid)} }) end; {ReaderPid, _MonitorRef} -> reply(ok, State); @@ -827,8 +824,8 @@ handle_cast({set_storage_mode, Mode}, State = #q { mixed_state = MS }) -> noreply(State #q { mixed_state = MS1 }). handle_info(report_memory, State) -> - %% deliberately don't call noreply/2 as we don't want to restart the timer. - %% By unsetting the timer, we force a report on the next normal message + %% deliberately don't call noreply/2/3 as we don't want to start the timer. + %% By unsetting the timer, we force a report on the next normal message. {noreply, State #q { memory_report_timer = undefined }, hibernate}; handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 18b250c55c..3e38be8e09 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -53,8 +53,8 @@ -include("rabbit.hrl"). -define(WRITE_OK_SIZE_BITS, 8). --define(WRITE_OK_TRANSIENT, 255). --define(WRITE_OK_PERSISTENT, 254). +-define(WRITE_OK_TRANSIENT, 255). +-define(WRITE_OK_PERSISTENT, 254). -define(INTEGER_SIZE_BYTES, 8). -define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). -define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). @@ -68,6 +68,7 @@ -define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs -define(BATCH_SIZE, 10000). -define(CACHE_MAX_SIZE, 10485760). +-define(WRITE_HANDLE_OPEN_MODE, [append, raw, binary, delayed_write]). -define(SERVER, ?MODULE). 
@@ -431,22 +432,14 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = load_from_disk(State), - Path = form_filename(CurrentName), - Exists = case file:read_file_info(Path) of - {error,enoent} -> false; - {ok, _} -> true - end, %% read is only needed so that we can seek - {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), - case Exists of - true -> {ok, Offset} = file:position(FileHdl, {bof, Offset}); - false -> %% new file, so preallocate - ok = preallocate(FileHdl, FileSizeLimit, Offset) - end, + {ok, FileHdl} = file:open(form_filename(CurrentName), + [read, write, raw, binary, delayed_write]), + {ok, Offset} = file:position(FileHdl, {bof, Offset}), State2 = State1 #dqstate { current_file_handle = FileHdl }, %% by reporting a memory use of 0, we guarantee the manager will - %% grant us to ram_disk mode. We have to start in ram_disk mode - %% because we can't find values for mnesia_bytes_per_record or + %% not oppress us. We have to start in ram_disk mode because we + %% can't find values for mnesia_bytes_per_record or %% ets_bytes_per_record otherwise. ok = rabbit_memory_manager:report_memory(self(), 0, false), ok = report_memory(false, State2), @@ -1231,7 +1224,6 @@ maybe_roll_to_new_file(Offset, NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, {ok, NextHdl} = file:open(form_filename(NextName), [write, raw, binary, delayed_write]), - ok = preallocate(NextHdl, FileSizeLimit, 0), true = ets:update_element(FileSummary, CurName, {5, NextName}),%% 5 is Right true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}), State2 = State1 #dqstate { current_file_name = NextName, @@ -1244,12 +1236,6 @@ maybe_roll_to_new_file(Offset, maybe_roll_to_new_file(_, State) -> {ok, State}. 
-preallocate(Hdl, FileSizeLimit, FinalPos) -> - {ok, FileSizeLimit} = file:position(Hdl, {bof, FileSizeLimit}), - ok = file:truncate(Hdl), - {ok, FinalPos} = file:position(Hdl, {bof, FinalPos}), - ok. - %% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ---- compact(FilesSet, State) -> @@ -1330,6 +1316,12 @@ sort_msg_locations_by_offset(Dir, List) -> Comp(OffA, OffB) end, List). +preallocate(Hdl, FileSizeLimit, FinalPos) -> + {ok, FileSizeLimit} = file:position(Hdl, {bof, FileSizeLimit}), + ok = file:truncate(Hdl), + {ok, FinalPos} = file:position(Hdl, {bof, FinalPos}), + ok. + truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}), ok = file:truncate(FileHdl), @@ -1339,11 +1331,11 @@ combine_files({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRight}, {Destination, DestinationValid, DestinationContiguousTop, _DestinationLeft, _DestinationRight}, - State1) -> - State = close_file(Source, close_file(Destination, State1)), + State) -> + State1 = close_file(Source, close_file(Destination, State)), {ok, SourceHdl} = file:open(form_filename(Source), - [read, write, raw, binary, read_ahead, delayed_write]), + [read, raw, binary, read_ahead]), {ok, DestinationHdl} = file:open(form_filename(Destination), [read, write, raw, binary, read_ahead, delayed_write]), @@ -1378,11 +1370,11 @@ combine_files({Source, SourceValid, _SourceContiguousTop, %% enforce it anyway end, sort_msg_locations_by_offset( asc, dets_ets_match_object( - State, #message_store_entry + State1, #message_store_entry { file = Destination, _ = '_' }))), ok = copy_messages( Worklist, DestinationContiguousTop, DestinationValid, - DestinationHdl, TmpHdl, Destination, State), + DestinationHdl, TmpHdl, Destination, State1), TmpSize = DestinationValid - DestinationContiguousTop, %% so now Tmp contains everything we need to salvage from %% Destination, and MsgLocationDets has been updated to @@ -1399,16 +1391,16 @@ combine_files({Source, 
SourceValid, _SourceContiguousTop, end, SourceWorkList = sort_msg_locations_by_offset( - asc, dets_ets_match_object(State, #message_store_entry + asc, dets_ets_match_object(State1, #message_store_entry { file = Source, _ = '_' })), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, - SourceHdl, DestinationHdl, Destination, State), + SourceHdl, DestinationHdl, Destination, State1), %% tidy up ok = file:sync(DestinationHdl), ok = file:close(SourceHdl), ok = file:close(DestinationHdl), ok = file:delete(form_filename(Source)), - State. + State1. copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, State) -> @@ -1748,7 +1740,7 @@ recover_crashed_compactions1(Files, TmpFile) -> %% note this also catches the case when the tmp file %% is empty ok = file:delete(TmpFile); - _False -> + false -> %% we're in case 4 above. Check that everything in the %% main file is a valid message in mnesia verify_messages_in_mnesia(MsgIds), @@ -1760,8 +1752,10 @@ recover_crashed_compactions1(Files, TmpFile) -> true = lists:all(fun (MsgId) -> not (lists:member(MsgId, MsgIdsTmp)) end, MsgIds), - {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile), - [write, raw, binary, delayed_write]), + %% must open with read flag, otherwise will stomp over contents + {ok, MainHdl} = + file:open(form_filename(NonTmpRelatedFile), + [read, write, raw, binary, delayed_write]), {ok, Top} = file:position(MainHdl, Top), %% wipe out any rubbish at the end of the file ok = file:truncate(MainHdl), @@ -1780,6 +1774,7 @@ recover_crashed_compactions1(Files, TmpFile) -> {ok, TmpHdl} = file:open(form_filename(TmpFile), [read, raw, binary, read_ahead]), {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), + ok = file:sync(MainHdl), ok = file:close(MainHdl), ok = file:close(TmpHdl), ok = file:delete(TmpFile), @@ -1862,7 +1857,7 @@ read_message_from_disk(FileHdl, TotalSize) -> end. 
scan_file_for_valid_messages(File) -> - case file:open(form_filename(File), [raw, binary, read]) of + case file:open(form_filename(File), [raw, binary, read, read_ahead]) of {ok, Hdl} -> Valid = scan_file_for_valid_messages(Hdl, 0, []), %% if something really bad's happened, the close could fail, but ignore diff --git a/src/rabbit_memory_manager.erl b/src/rabbit_memory_manager.erl index 44582dc424..29216d77d2 100644 --- a/src/rabbit_memory_manager.erl +++ b/src/rabbit_memory_manager.erl @@ -56,7 +56,7 @@ -endif. -record(state, { available_tokens, - liberated_processes, + processes, callbacks, tokens_per_byte, hibernate, @@ -166,7 +166,7 @@ init([]) -> true -> ?TOTAL_TOKENS / MemAvail end, {ok, #state { available_tokens = ?TOTAL_TOKENS, - liberated_processes = dict:new(), + processes = dict:new(), callbacks = dict:new(), tokens_per_byte = TPB, hibernate = queue:new(), @@ -176,64 +176,65 @@ init([]) -> handle_call(info, _From, State) -> State1 = #state { available_tokens = Avail, - liberated_processes = Libre, + processes = Procs, hibernate = Sleepy, unoppressable = Unoppressable } = free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying {reply, [{ available_tokens, Avail }, - { liberated_processes, dict:to_list(Libre) }, + { processes, dict:to_list(Procs) }, { hibernated_processes, queue:to_list(Sleepy) }, { unoppressable_processes, sets:to_list(Unoppressable) }], State1}. 
handle_cast({report_memory, Pid, Memory, Hibernating}, - State = #state { liberated_processes = Libre, + State = #state { processes = Procs, available_tokens = Avail, callbacks = Callbacks, tokens_per_byte = TPB, alarmed = Alarmed }) -> Req = rabbit_misc:ceil(TPB * Memory), LibreActivity = if Hibernating -> hibernate; - true -> active - end, + true -> active + end, {StateN = #state { hibernate = Sleepy }, ActivityNew} = - case find_process(Pid, Libre) of + case find_process(Pid, Procs) of {libre, {OAlloc, _OActivity}} -> Avail1 = Avail + OAlloc, State1 = #state { available_tokens = Avail2, - liberated_processes = Libre1 } + processes = Procs1 } = free_upto(Pid, Req, State #state { available_tokens = Avail1 }), case Req > Avail2 of true -> %% nowt we can do, oppress the process ok = set_process_mode(Callbacks, Pid, oppressed), - {State1 #state { liberated_processes = - dict:erase(Pid, Libre1) }, oppressed}; + {State1 #state { processes = + dict:store(Pid, {Req, oppressed}, + Procs1) }, oppressed}; false -> %% keep liberated {State1 #state - { liberated_processes = - dict:store(Pid, {Req, LibreActivity}, Libre1), + { processes = + dict:store(Pid, {Req, LibreActivity}, Procs1), available_tokens = Avail2 - Req }, LibreActivity} end; - oppressed -> - case Alarmed of + {oppressed, OrigReq} -> + case Alarmed orelse Hibernating orelse + (Req > OrigReq * 0.95 andalso Req < OrigReq * 1.05) of true -> {State, oppressed}; false -> State1 = #state { available_tokens = Avail1, - liberated_processes = Libre1 } = + processes = Procs1 } = free_upto(Pid, Req, State), - case Req > Avail1 orelse Hibernating of + case Req > Avail1 of true -> - %% not enough space, or no compelling - %% reason, so stay oppressed + %% not enough space, so stay oppressed {State1, oppressed}; false -> %% can liberate the process set_process_mode(Callbacks, Pid, liberated), {State1 #state { - liberated_processes = + processes = dict:store(Pid, {Req, LibreActivity}, - Libre1), + Procs1), available_tokens = 
Avail1 - Req }, LibreActivity} end @@ -266,16 +267,14 @@ handle_cast({conserve_memory, Conserve}, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state { available_tokens = Avail, - liberated_processes = Libre }) -> - State1 = case find_process(Pid, Libre) of - oppressed -> - State; - {libre, {Alloc, _Activity}} -> - State #state { available_tokens = Avail + Alloc, - liberated_processes = - dict:erase(Pid, Libre) } - end, - {noreply, State1}; + processes = Procs }) -> + State1 = State #state { processes = dict:erase(Pid, Procs) }, + {noreply, case find_process(Pid, Procs) of + {oppressed, _OrigReq} -> + State1; + {libre, {Alloc, _Activity}} -> + State1 #state { available_tokens = Avail + Alloc } + end}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info(_Info, State) -> @@ -287,22 +286,23 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -find_process(Pid, Libre) -> - case dict:find(Pid, Libre) of - {ok, Value} -> {libre, Value}; - error -> oppressed +find_process(Pid, Procs) -> + case dict:find(Pid, Procs) of + {ok, {OrigReq, oppressed}} -> {oppressed, OrigReq}; + {ok, Value = {_Alloc, _Activity}} -> {libre, Value}; + error -> {oppressed, -9999} end. set_process_mode(Callbacks, Pid, Mode) -> {Module, Function, Args} = dict:fetch(Pid, Callbacks), erlang:apply(Module, Function, Args ++ [Mode]). -tidy_and_sum_sleepy(IgnorePids, Sleepy, Libre) -> - tidy_and_sum(hibernate, Libre, fun queue:out/1, +tidy_and_sum_sleepy(IgnorePids, Sleepy, Procs) -> + tidy_and_sum(hibernate, Procs, fun queue:out/1, fun (Pid, _Alloc, Queue) -> queue:in(Pid, Queue) end, IgnorePids, Sleepy, queue:new(), 0). 
-tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, DupCheckSet, +tidy_and_sum(AtomExpected, Procs, Catamorphism, Anamorphism, DupCheckSet, CataInit, AnaInit, AllocAcc) -> case Catamorphism(CataInit) of {empty, _CataInit} -> {AnaInit, AllocAcc}; @@ -312,7 +312,7 @@ tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, DupCheckSet, true -> {DupCheckSet, AnaInit, AllocAcc}; false -> - case find_process(Pid, Libre) of + case find_process(Pid, Procs) of {libre, {Alloc, AtomExpected}} -> {sets:add_element(Pid, DupCheckSet), Anamorphism(Pid, Alloc, AnaInit), @@ -321,13 +321,13 @@ tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, DupCheckSet, {DupCheckSet, AnaInit, AllocAcc} end end, - tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, + tidy_and_sum(AtomExpected, Procs, Catamorphism, Anamorphism, DupCheckSet1, CataInit1, AnaInit1, AllocAcc1) end. -free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Libre, Req) -> +free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Procs, Req) -> free_from(Callbacks, - fun(Libre1, Sleepy1, SleepyAcc) -> + fun(Procs1, Sleepy1, SleepyAcc) -> case queue:out(Sleepy1) of {empty, _Sleepy2} -> empty; @@ -336,37 +336,37 @@ free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Libre, Req) -> true -> {skip, Sleepy2, queue:in(Pid, SleepyAcc)}; false -> {Alloc, hibernate} = - dict:fetch(Pid, Libre1), + dict:fetch(Pid, Procs1), {value, Sleepy2, Pid, Alloc} end end - end, fun queue:join/2, Libre, Sleepy, queue:new(), Req). + end, fun queue:join/2, Procs, Sleepy, queue:new(), Req). 
-free_from(Callbacks, Hylomorphism, BaseCase, Libre, CataInit, AnaInit, Req) -> - case Hylomorphism(Libre, CataInit, AnaInit) of +free_from(Callbacks, Hylomorphism, BaseCase, Procs, CataInit, AnaInit, Req) -> + case Hylomorphism(Procs, CataInit, AnaInit) of empty -> - {AnaInit, Libre, Req}; + {AnaInit, Procs, Req}; {skip, CataInit1, AnaInit1} -> - free_from(Callbacks, Hylomorphism, BaseCase, Libre, CataInit1, + free_from(Callbacks, Hylomorphism, BaseCase, Procs, CataInit1, AnaInit1, Req); {value, CataInit1, Pid, Alloc} -> - Libre1 = dict:erase(Pid, Libre), + Procs1 = dict:store(Pid, {Alloc, oppressed}, Procs), ok = set_process_mode(Callbacks, Pid, oppressed), case Req > Alloc of - true -> free_from(Callbacks, Hylomorphism, BaseCase, Libre1, + true -> free_from(Callbacks, Hylomorphism, BaseCase, Procs1, CataInit1, AnaInit, Req - Alloc); - false -> {BaseCase(CataInit1, AnaInit), Libre1, Req - Alloc} + false -> {BaseCase(CataInit1, AnaInit), Procs1, Req - Alloc} end end. free_upto(Pid, Req, State = #state { available_tokens = Avail, - liberated_processes = Libre, + processes = Procs, callbacks = Callbacks, hibernate = Sleepy, unoppressable = Unoppressable }) when Req > Avail -> Unoppressable1 = sets:add_element(Pid, Unoppressable), - {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unoppressable1, Sleepy, Libre), + {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unoppressable1, Sleepy, Procs), case Req > Avail + SleepySum of true -> %% not enough in sleepy, just return tidied state State #state { hibernate = Sleepy1 }; @@ -374,11 +374,11 @@ free_upto(Pid, Req, State = #state { available_tokens = Avail, %% ReqRem1 will be <= 0 because it's likely we'll have %% freed more than we need, thus Req - ReqRem1 is total %% freed - {Sleepy2, Libre1, ReqRem} = + {Sleepy2, Procs1, ReqRem} = free_upto_sleepy(Unoppressable1, Callbacks, - Sleepy1, Libre, Req), + Sleepy1, Procs, Req), State #state { available_tokens = Avail + (Req - ReqRem), - liberated_processes = Libre1, + processes = Procs1, 
hibernate = Sleepy2 } end; free_upto(_Pid, _Req, State) -> |
