diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 63 | ||||
| -rw-r--r-- | src/rabbit_memory_manager.erl | 112 |
4 files changed, 96 insertions, 104 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 8962b12ea6..f665ad92af 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -148,6 +148,10 @@ start(normal, []) -> ok = start_child(rabbit_router), ok = start_child(rabbit_node_monitor) end}, + {"guid generator", + fun () -> + ok = start_child(rabbit_guid) + end}, {"disk queue", fun () -> ok = start_child(rabbit_disk_queue) @@ -162,10 +166,6 @@ start(normal, []) -> ok = rabbit_disk_queue:delete_non_durable_queues( DurableQueueNames) end}, - {"guid generator", - fun () -> - ok = start_child(rabbit_guid) - end}, {"builtin applications", fun () -> {ok, DefaultVHost} = application:get_env(default_vhost), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 916a241062..3538b6fb61 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -114,8 +114,7 @@ init(Q = #amqqueue { name = QName, durable = Durable }) -> blocked_consumers = queue:new(), memory_report_timer = undefined }, - %% first thing we must do is report_memory which will clear out - %% the 'undefined' values in gain and loss in mixed_queue state + %% first thing we must do is report_memory. {ok, start_memory_timer(State), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -721,8 +720,7 @@ handle_call({delete, IfUnused, IfEmpty}, _From, handle_call(purge, _From, State) -> {Count, MS} = rabbit_mixed_queue:purge(State #q.mixed_state), - reply({ok, Count}, - State #q { mixed_state = MS }); + reply({ok, Count}, State #q { mixed_state = MS }); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -738,10 +736,9 @@ handle_call({claim_queue, ReaderPid}, _From, %% pid... reply(locked, State); ok -> - reply(ok, State #q { owner = - {ReaderPid, - erlang:monitor(process, ReaderPid)} }) - + reply(ok, + State#q{ owner = {ReaderPid, erlang:monitor( + process, ReaderPid)} }) end; {ReaderPid, _MonitorRef} -> reply(ok, State); @@ -827,8 +824,8 @@ handle_cast({set_storage_mode, Mode}, State = #q { mixed_state = MS }) -> noreply(State #q { mixed_state = MS1 }). handle_info(report_memory, State) -> - %% deliberately don't call noreply/2 as we don't want to restart the timer. - %% By unsetting the timer, we force a report on the next normal message + %% deliberately don't call noreply/2/3 as we don't want to start the timer. + %% By unsetting the timer, we force a report on the next normal message. {noreply, State #q { memory_report_timer = undefined }, hibernate}; handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 18b250c55c..3e38be8e09 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -53,8 +53,8 @@ -include("rabbit.hrl"). -define(WRITE_OK_SIZE_BITS, 8). --define(WRITE_OK_TRANSIENT, 255). --define(WRITE_OK_PERSISTENT, 254). +-define(WRITE_OK_TRANSIENT, 255). +-define(WRITE_OK_PERSISTENT, 254). -define(INTEGER_SIZE_BYTES, 8). -define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). -define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). @@ -68,6 +68,7 @@ -define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs -define(BATCH_SIZE, 10000). -define(CACHE_MAX_SIZE, 10485760). +-define(WRITE_HANDLE_OPEN_MODE, [append, raw, binary, delayed_write]). -define(SERVER, ?MODULE). @@ -431,22 +432,14 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = load_from_disk(State), - Path = form_filename(CurrentName), - Exists = case file:read_file_info(Path) of - {error,enoent} -> false; - {ok, _} -> true - end, %% read is only needed so that we can seek - {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), - case Exists of - true -> {ok, Offset} = file:position(FileHdl, {bof, Offset}); - false -> %% new file, so preallocate - ok = preallocate(FileHdl, FileSizeLimit, Offset) - end, + {ok, FileHdl} = file:open(form_filename(CurrentName), + [read, write, raw, binary, delayed_write]), + {ok, Offset} = file:position(FileHdl, {bof, Offset}), State2 = State1 #dqstate { current_file_handle = FileHdl }, %% by reporting a memory use of 0, we guarantee the manager will - %% grant us to ram_disk mode. We have to start in ram_disk mode - %% because we can't find values for mnesia_bytes_per_record or + %% not oppress us. We have to start in ram_disk mode because we + %% can't find values for mnesia_bytes_per_record or %% ets_bytes_per_record otherwise. ok = rabbit_memory_manager:report_memory(self(), 0, false), ok = report_memory(false, State2), @@ -1231,7 +1224,6 @@ maybe_roll_to_new_file(Offset, NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, {ok, NextHdl} = file:open(form_filename(NextName), [write, raw, binary, delayed_write]), - ok = preallocate(NextHdl, FileSizeLimit, 0), true = ets:update_element(FileSummary, CurName, {5, NextName}),%% 5 is Right true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}), State2 = State1 #dqstate { current_file_name = NextName, @@ -1244,12 +1236,6 @@ maybe_roll_to_new_file(Offset, maybe_roll_to_new_file(_, State) -> {ok, State}. -preallocate(Hdl, FileSizeLimit, FinalPos) -> - {ok, FileSizeLimit} = file:position(Hdl, {bof, FileSizeLimit}), - ok = file:truncate(Hdl), - {ok, FinalPos} = file:position(Hdl, {bof, FinalPos}), - ok. - %% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ---- compact(FilesSet, State) -> @@ -1330,6 +1316,12 @@ sort_msg_locations_by_offset(Dir, List) -> Comp(OffA, OffB) end, List). +preallocate(Hdl, FileSizeLimit, FinalPos) -> + {ok, FileSizeLimit} = file:position(Hdl, {bof, FileSizeLimit}), + ok = file:truncate(Hdl), + {ok, FinalPos} = file:position(Hdl, {bof, FinalPos}), + ok. + truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}), ok = file:truncate(FileHdl), @@ -1339,11 +1331,11 @@ combine_files({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRight}, {Destination, DestinationValid, DestinationContiguousTop, _DestinationLeft, _DestinationRight}, - State1) -> - State = close_file(Source, close_file(Destination, State1)), + State) -> + State1 = close_file(Source, close_file(Destination, State)), {ok, SourceHdl} = file:open(form_filename(Source), - [read, write, raw, binary, read_ahead, delayed_write]), + [read, raw, binary, read_ahead]), {ok, DestinationHdl} = file:open(form_filename(Destination), [read, write, raw, binary, read_ahead, delayed_write]), @@ -1378,11 +1370,11 @@ combine_files({Source, SourceValid, _SourceContiguousTop, %% enforce it anyway end, sort_msg_locations_by_offset( asc, dets_ets_match_object( - State, #message_store_entry + State1, #message_store_entry { file = Destination, _ = '_' }))), ok = copy_messages( Worklist, DestinationContiguousTop, DestinationValid, - DestinationHdl, TmpHdl, Destination, State), + DestinationHdl, TmpHdl, Destination, State1), TmpSize = DestinationValid - DestinationContiguousTop, %% so now Tmp contains everything we need to salvage from %% Destination, and MsgLocationDets has been updated to @@ -1399,16 +1391,16 @@ combine_files({Source, SourceValid, _SourceContiguousTop, end, SourceWorkList = sort_msg_locations_by_offset( - asc, dets_ets_match_object(State, #message_store_entry + asc, dets_ets_match_object(State1, #message_store_entry { file = Source, _ = '_' })), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, - SourceHdl, DestinationHdl, Destination, State), + SourceHdl, DestinationHdl, Destination, State1), %% tidy up ok = file:sync(DestinationHdl), ok = file:close(SourceHdl), ok = file:close(DestinationHdl), ok = file:delete(form_filename(Source)), - State. + State1. copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, State) -> @@ -1748,7 +1740,7 @@ recover_crashed_compactions1(Files, TmpFile) -> %% note this also catches the case when the tmp file %% is empty ok = file:delete(TmpFile); - _False -> + false -> %% we're in case 4 above. Check that everything in the %% main file is a valid message in mnesia verify_messages_in_mnesia(MsgIds), @@ -1760,8 +1752,10 @@ recover_crashed_compactions1(Files, TmpFile) -> true = lists:all(fun (MsgId) -> not (lists:member(MsgId, MsgIdsTmp)) end, MsgIds), - {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile), - [write, raw, binary, delayed_write]), + %% must open with read flag, otherwise will stomp over contents + {ok, MainHdl} = + file:open(form_filename(NonTmpRelatedFile), + [read, write, raw, binary, delayed_write]), {ok, Top} = file:position(MainHdl, Top), %% wipe out any rubbish at the end of the file ok = file:truncate(MainHdl), @@ -1780,6 +1774,7 @@ recover_crashed_compactions1(Files, TmpFile) -> {ok, TmpHdl} = file:open(form_filename(TmpFile), [read, raw, binary, read_ahead]), {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), + ok = file:sync(MainHdl), ok = file:close(MainHdl), ok = file:close(TmpHdl), ok = file:delete(TmpFile), @@ -1862,7 +1857,7 @@ read_message_from_disk(FileHdl, TotalSize) -> end. scan_file_for_valid_messages(File) -> - case file:open(form_filename(File), [raw, binary, read]) of + case file:open(form_filename(File), [raw, binary, read, read_ahead]) of {ok, Hdl} -> Valid = scan_file_for_valid_messages(Hdl, 0, []), %% if something really bad's happened, the close could fail, but ignore diff --git a/src/rabbit_memory_manager.erl b/src/rabbit_memory_manager.erl index 44582dc424..29216d77d2 100644 --- a/src/rabbit_memory_manager.erl +++ b/src/rabbit_memory_manager.erl @@ -56,7 +56,7 @@ -endif. -record(state, { available_tokens, - liberated_processes, + processes, callbacks, tokens_per_byte, hibernate, @@ -166,7 +166,7 @@ init([]) -> true -> ?TOTAL_TOKENS / MemAvail end, {ok, #state { available_tokens = ?TOTAL_TOKENS, - liberated_processes = dict:new(), + processes = dict:new(), callbacks = dict:new(), tokens_per_byte = TPB, hibernate = queue:new(), @@ -176,64 +176,65 @@ init([]) -> handle_call(info, _From, State) -> State1 = #state { available_tokens = Avail, - liberated_processes = Libre, + processes = Procs, hibernate = Sleepy, unoppressable = Unoppressable } = free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying {reply, [{ available_tokens, Avail }, - { liberated_processes, dict:to_list(Libre) }, + { processes, dict:to_list(Procs) }, { hibernated_processes, queue:to_list(Sleepy) }, { unoppressable_processes, sets:to_list(Unoppressable) }], State1}. handle_cast({report_memory, Pid, Memory, Hibernating}, - State = #state { liberated_processes = Libre, + State = #state { processes = Procs, available_tokens = Avail, callbacks = Callbacks, tokens_per_byte = TPB, alarmed = Alarmed }) -> Req = rabbit_misc:ceil(TPB * Memory), LibreActivity = if Hibernating -> hibernate; - true -> active - end, + true -> active + end, {StateN = #state { hibernate = Sleepy }, ActivityNew} = - case find_process(Pid, Libre) of + case find_process(Pid, Procs) of {libre, {OAlloc, _OActivity}} -> Avail1 = Avail + OAlloc, State1 = #state { available_tokens = Avail2, - liberated_processes = Libre1 } + processes = Procs1 } = free_upto(Pid, Req, State #state { available_tokens = Avail1 }), case Req > Avail2 of true -> %% nowt we can do, oppress the process ok = set_process_mode(Callbacks, Pid, oppressed), - {State1 #state { liberated_processes = - dict:erase(Pid, Libre1) }, oppressed}; + {State1 #state { processes = + dict:store(Pid, {Req, oppressed}, + Procs1) }, oppressed}; false -> %% keep liberated {State1 #state - { liberated_processes = - dict:store(Pid, {Req, LibreActivity}, Libre1), + { processes = + dict:store(Pid, {Req, LibreActivity}, Procs1), available_tokens = Avail2 - Req }, LibreActivity} end; - oppressed -> - case Alarmed of + {oppressed, OrigReq} -> + case Alarmed orelse Hibernating orelse + (Req > OrigReq * 0.95 andalso Req < OrigReq * 1.05) of true -> {State, oppressed}; false -> State1 = #state { available_tokens = Avail1, - liberated_processes = Libre1 } = + processes = Procs1 } = free_upto(Pid, Req, State), - case Req > Avail1 orelse Hibernating of + case Req > Avail1 of true -> - %% not enough space, or no compelling - %% reason, so stay oppressed + %% not enough space, so stay oppressed {State1, oppressed}; false -> %% can liberate the process set_process_mode(Callbacks, Pid, liberated), {State1 #state { - liberated_processes = + processes = dict:store(Pid, {Req, LibreActivity}, - Libre1), + Procs1), available_tokens = Avail1 - Req }, LibreActivity} end @@ -266,16 +267,14 @@ handle_cast({conserve_memory, Conserve}, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state { available_tokens = Avail, - liberated_processes = Libre }) -> - State1 = case find_process(Pid, Libre) of - oppressed -> - State; - {libre, {Alloc, _Activity}} -> - State #state { available_tokens = Avail + Alloc, - liberated_processes = - dict:erase(Pid, Libre) } - end, - {noreply, State1}; + processes = Procs }) -> + State1 = State #state { processes = dict:erase(Pid, Procs) }, + {noreply, case find_process(Pid, Procs) of + {oppressed, _OrigReq} -> + State1; + {libre, {Alloc, _Activity}} -> + State1 #state { available_tokens = Avail + Alloc } + end}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info(_Info, State) -> @@ -287,22 +286,23 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -find_process(Pid, Libre) -> - case dict:find(Pid, Libre) of - {ok, Value} -> {libre, Value}; - error -> oppressed +find_process(Pid, Procs) -> + case dict:find(Pid, Procs) of + {ok, {OrigReq, oppressed}} -> {oppressed, OrigReq}; + {ok, Value = {_Alloc, _Activity}} -> {libre, Value}; + error -> {oppressed, -9999} end. set_process_mode(Callbacks, Pid, Mode) -> {Module, Function, Args} = dict:fetch(Pid, Callbacks), erlang:apply(Module, Function, Args ++ [Mode]). -tidy_and_sum_sleepy(IgnorePids, Sleepy, Libre) -> - tidy_and_sum(hibernate, Libre, fun queue:out/1, +tidy_and_sum_sleepy(IgnorePids, Sleepy, Procs) -> + tidy_and_sum(hibernate, Procs, fun queue:out/1, fun (Pid, _Alloc, Queue) -> queue:in(Pid, Queue) end, IgnorePids, Sleepy, queue:new(), 0). -tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, DupCheckSet, +tidy_and_sum(AtomExpected, Procs, Catamorphism, Anamorphism, DupCheckSet, CataInit, AnaInit, AllocAcc) -> case Catamorphism(CataInit) of {empty, _CataInit} -> {AnaInit, AllocAcc}; @@ -312,7 +312,7 @@ tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, DupCheckSet, true -> {DupCheckSet, AnaInit, AllocAcc}; false -> - case find_process(Pid, Libre) of + case find_process(Pid, Procs) of {libre, {Alloc, AtomExpected}} -> {sets:add_element(Pid, DupCheckSet), Anamorphism(Pid, Alloc, AnaInit), @@ -321,13 +321,13 @@ tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, DupCheckSet, {DupCheckSet, AnaInit, AllocAcc} end end, - tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, + tidy_and_sum(AtomExpected, Procs, Catamorphism, Anamorphism, DupCheckSet1, CataInit1, AnaInit1, AllocAcc1) end. -free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Libre, Req) -> +free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Procs, Req) -> free_from(Callbacks, - fun(Libre1, Sleepy1, SleepyAcc) -> + fun(Procs1, Sleepy1, SleepyAcc) -> case queue:out(Sleepy1) of {empty, _Sleepy2} -> empty; @@ -336,37 +336,37 @@ free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Libre, Req) -> true -> {skip, Sleepy2, queue:in(Pid, SleepyAcc)}; false -> {Alloc, hibernate} = - dict:fetch(Pid, Libre1), + dict:fetch(Pid, Procs1), {value, Sleepy2, Pid, Alloc} end end - end, fun queue:join/2, Libre, Sleepy, queue:new(), Req). + end, fun queue:join/2, Procs, Sleepy, queue:new(), Req). -free_from(Callbacks, Hylomorphism, BaseCase, Libre, CataInit, AnaInit, Req) -> - case Hylomorphism(Libre, CataInit, AnaInit) of +free_from(Callbacks, Hylomorphism, BaseCase, Procs, CataInit, AnaInit, Req) -> + case Hylomorphism(Procs, CataInit, AnaInit) of empty -> - {AnaInit, Libre, Req}; + {AnaInit, Procs, Req}; {skip, CataInit1, AnaInit1} -> - free_from(Callbacks, Hylomorphism, BaseCase, Libre, CataInit1, + free_from(Callbacks, Hylomorphism, BaseCase, Procs, CataInit1, AnaInit1, Req); {value, CataInit1, Pid, Alloc} -> - Libre1 = dict:erase(Pid, Libre), + Procs1 = dict:store(Pid, {Alloc, oppressed}, Procs), ok = set_process_mode(Callbacks, Pid, oppressed), case Req > Alloc of - true -> free_from(Callbacks, Hylomorphism, BaseCase, Libre1, + true -> free_from(Callbacks, Hylomorphism, BaseCase, Procs1, CataInit1, AnaInit, Req - Alloc); - false -> {BaseCase(CataInit1, AnaInit), Libre1, Req - Alloc} + false -> {BaseCase(CataInit1, AnaInit), Procs1, Req - Alloc} end end. free_upto(Pid, Req, State = #state { available_tokens = Avail, - liberated_processes = Libre, + processes = Procs, callbacks = Callbacks, hibernate = Sleepy, unoppressable = Unoppressable }) when Req > Avail -> Unoppressable1 = sets:add_element(Pid, Unoppressable), - {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unoppressable1, Sleepy, Libre), + {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unoppressable1, Sleepy, Procs), case Req > Avail + SleepySum of true -> %% not enough in sleepy, just return tidied state State #state { hibernate = Sleepy1 }; @@ -374,11 +374,11 @@ free_upto(Pid, Req, State = #state { available_tokens = Avail, %% ReqRem1 will be <= 0 because it's likely we'll have %% freed more than we need, thus Req - ReqRem1 is total %% freed - {Sleepy2, Libre1, ReqRem} = + {Sleepy2, Procs1, ReqRem} = free_upto_sleepy(Unoppressable1, Callbacks, - Sleepy1, Libre, Req), + Sleepy1, Procs, Req), State #state { available_tokens = Avail + (Req - ReqRem), - liberated_processes = Libre1, + processes = Procs1, hibernate = Sleepy2 } end; free_upto(_Pid, _Req, State) -> |
