summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl17
-rw-r--r--src/rabbit_disk_queue.erl63
-rw-r--r--src/rabbit_memory_manager.erl112
4 files changed, 96 insertions, 104 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 8962b12ea6..f665ad92af 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -148,6 +148,10 @@ start(normal, []) ->
ok = start_child(rabbit_router),
ok = start_child(rabbit_node_monitor)
end},
+ {"guid generator",
+ fun () ->
+ ok = start_child(rabbit_guid)
+ end},
{"disk queue",
fun () ->
ok = start_child(rabbit_disk_queue)
@@ -162,10 +166,6 @@ start(normal, []) ->
ok = rabbit_disk_queue:delete_non_durable_queues(
DurableQueueNames)
end},
- {"guid generator",
- fun () ->
- ok = start_child(rabbit_guid)
- end},
{"builtin applications",
fun () ->
{ok, DefaultVHost} = application:get_env(default_vhost),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 916a241062..3538b6fb61 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -114,8 +114,7 @@ init(Q = #amqqueue { name = QName, durable = Durable }) ->
blocked_consumers = queue:new(),
memory_report_timer = undefined
},
- %% first thing we must do is report_memory which will clear out
- %% the 'undefined' values in gain and loss in mixed_queue state
+ %% first thing we must do is report_memory.
{ok, start_memory_timer(State), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -721,8 +720,7 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
handle_call(purge, _From, State) ->
{Count, MS} = rabbit_mixed_queue:purge(State #q.mixed_state),
- reply({ok, Count},
- State #q { mixed_state = MS });
+ reply({ok, Count}, State #q { mixed_state = MS });
handle_call({claim_queue, ReaderPid}, _From,
State = #q{owner = Owner, exclusive_consumer = Holder}) ->
@@ -738,10 +736,9 @@ handle_call({claim_queue, ReaderPid}, _From,
%% pid...
reply(locked, State);
ok ->
- reply(ok, State #q { owner =
- {ReaderPid,
- erlang:monitor(process, ReaderPid)} })
-
+ reply(ok,
+ State#q{ owner = {ReaderPid, erlang:monitor(
+ process, ReaderPid)} })
end;
{ReaderPid, _MonitorRef} ->
reply(ok, State);
@@ -827,8 +824,8 @@ handle_cast({set_storage_mode, Mode}, State = #q { mixed_state = MS }) ->
noreply(State #q { mixed_state = MS1 }).
handle_info(report_memory, State) ->
- %% deliberately don't call noreply/2 as we don't want to restart the timer.
- %% By unsetting the timer, we force a report on the next normal message
+ %% deliberately don't call noreply/2/3 as we don't want to start the timer.
+ %% By unsetting the timer, we force a report on the next normal message.
{noreply, State #q { memory_report_timer = undefined }, hibernate};
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 18b250c55c..3e38be8e09 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -53,8 +53,8 @@
-include("rabbit.hrl").
-define(WRITE_OK_SIZE_BITS, 8).
--define(WRITE_OK_TRANSIENT, 255).
--define(WRITE_OK_PERSISTENT, 254).
+-define(WRITE_OK_TRANSIENT, 255).
+-define(WRITE_OK_PERSISTENT, 254).
-define(INTEGER_SIZE_BYTES, 8).
-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
@@ -68,6 +68,7 @@
-define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs
-define(BATCH_SIZE, 10000).
-define(CACHE_MAX_SIZE, 10485760).
+-define(WRITE_HANDLE_OPEN_MODE, [append, raw, binary, delayed_write]).
-define(SERVER, ?MODULE).
@@ -431,22 +432,14 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
{ok, State1 = #dqstate { current_file_name = CurrentName,
current_offset = Offset } } =
load_from_disk(State),
- Path = form_filename(CurrentName),
- Exists = case file:read_file_info(Path) of
- {error,enoent} -> false;
- {ok, _} -> true
- end,
%% read is only needed so that we can seek
- {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]),
- case Exists of
- true -> {ok, Offset} = file:position(FileHdl, {bof, Offset});
- false -> %% new file, so preallocate
- ok = preallocate(FileHdl, FileSizeLimit, Offset)
- end,
+ {ok, FileHdl} = file:open(form_filename(CurrentName),
+ [read, write, raw, binary, delayed_write]),
+ {ok, Offset} = file:position(FileHdl, {bof, Offset}),
State2 = State1 #dqstate { current_file_handle = FileHdl },
%% by reporting a memory use of 0, we guarantee the manager will
- %% grant us to ram_disk mode. We have to start in ram_disk mode
- %% because we can't find values for mnesia_bytes_per_record or
+ %% not oppress us. We have to start in ram_disk mode because we
+ %% can't find values for mnesia_bytes_per_record or
%% ets_bytes_per_record otherwise.
ok = rabbit_memory_manager:report_memory(self(), 0, false),
ok = report_memory(false, State2),
@@ -1231,7 +1224,6 @@ maybe_roll_to_new_file(Offset,
NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION,
{ok, NextHdl} = file:open(form_filename(NextName),
[write, raw, binary, delayed_write]),
- ok = preallocate(NextHdl, FileSizeLimit, 0),
true = ets:update_element(FileSummary, CurName, {5, NextName}),%% 5 is Right
true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}),
State2 = State1 #dqstate { current_file_name = NextName,
@@ -1244,12 +1236,6 @@ maybe_roll_to_new_file(Offset,
maybe_roll_to_new_file(_, State) ->
{ok, State}.
-preallocate(Hdl, FileSizeLimit, FinalPos) ->
- {ok, FileSizeLimit} = file:position(Hdl, {bof, FileSizeLimit}),
- ok = file:truncate(Hdl),
- {ok, FinalPos} = file:position(Hdl, {bof, FinalPos}),
- ok.
-
%% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ----
compact(FilesSet, State) ->
@@ -1330,6 +1316,12 @@ sort_msg_locations_by_offset(Dir, List) ->
Comp(OffA, OffB)
end, List).
+preallocate(Hdl, FileSizeLimit, FinalPos) ->
+ {ok, FileSizeLimit} = file:position(Hdl, {bof, FileSizeLimit}),
+ ok = file:truncate(Hdl),
+ {ok, FinalPos} = file:position(Hdl, {bof, FinalPos}),
+ ok.
+
truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
{ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}),
ok = file:truncate(FileHdl),
@@ -1339,11 +1331,11 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
_SourceLeft, _SourceRight},
{Destination, DestinationValid, DestinationContiguousTop,
_DestinationLeft, _DestinationRight},
- State1) ->
- State = close_file(Source, close_file(Destination, State1)),
+ State) ->
+ State1 = close_file(Source, close_file(Destination, State)),
{ok, SourceHdl} =
file:open(form_filename(Source),
- [read, write, raw, binary, read_ahead, delayed_write]),
+ [read, raw, binary, read_ahead]),
{ok, DestinationHdl} =
file:open(form_filename(Destination),
[read, write, raw, binary, read_ahead, delayed_write]),
@@ -1378,11 +1370,11 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
%% enforce it anyway
end, sort_msg_locations_by_offset(
asc, dets_ets_match_object(
- State, #message_store_entry
+ State1, #message_store_entry
{ file = Destination, _ = '_' }))),
ok = copy_messages(
Worklist, DestinationContiguousTop, DestinationValid,
- DestinationHdl, TmpHdl, Destination, State),
+ DestinationHdl, TmpHdl, Destination, State1),
TmpSize = DestinationValid - DestinationContiguousTop,
%% so now Tmp contains everything we need to salvage from
%% Destination, and MsgLocationDets has been updated to
@@ -1399,16 +1391,16 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
end,
SourceWorkList =
sort_msg_locations_by_offset(
- asc, dets_ets_match_object(State, #message_store_entry
+ asc, dets_ets_match_object(State1, #message_store_entry
{ file = Source, _ = '_' })),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
- SourceHdl, DestinationHdl, Destination, State),
+ SourceHdl, DestinationHdl, Destination, State1),
%% tidy up
ok = file:sync(DestinationHdl),
ok = file:close(SourceHdl),
ok = file:close(DestinationHdl),
ok = file:delete(form_filename(Source)),
- State.
+ State1.
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, State) ->
@@ -1748,7 +1740,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% note this also catches the case when the tmp file
%% is empty
ok = file:delete(TmpFile);
- _False ->
+ false ->
%% we're in case 4 above. Check that everything in the
%% main file is a valid message in mnesia
verify_messages_in_mnesia(MsgIds),
@@ -1760,8 +1752,10 @@ recover_crashed_compactions1(Files, TmpFile) ->
true = lists:all(fun (MsgId) ->
not (lists:member(MsgId, MsgIdsTmp))
end, MsgIds),
- {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile),
- [write, raw, binary, delayed_write]),
+ %% must open with read flag, otherwise will stomp over contents
+ {ok, MainHdl} =
+ file:open(form_filename(NonTmpRelatedFile),
+ [read, write, raw, binary, delayed_write]),
{ok, Top} = file:position(MainHdl, Top),
%% wipe out any rubbish at the end of the file
ok = file:truncate(MainHdl),
@@ -1780,6 +1774,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
{ok, TmpHdl} = file:open(form_filename(TmpFile),
[read, raw, binary, read_ahead]),
{ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize),
+ ok = file:sync(MainHdl),
ok = file:close(MainHdl),
ok = file:close(TmpHdl),
ok = file:delete(TmpFile),
@@ -1862,7 +1857,7 @@ read_message_from_disk(FileHdl, TotalSize) ->
end.
scan_file_for_valid_messages(File) ->
- case file:open(form_filename(File), [raw, binary, read]) of
+ case file:open(form_filename(File), [raw, binary, read, read_ahead]) of
{ok, Hdl} ->
Valid = scan_file_for_valid_messages(Hdl, 0, []),
%% if something really bad's happened, the close could fail, but ignore
diff --git a/src/rabbit_memory_manager.erl b/src/rabbit_memory_manager.erl
index 44582dc424..29216d77d2 100644
--- a/src/rabbit_memory_manager.erl
+++ b/src/rabbit_memory_manager.erl
@@ -56,7 +56,7 @@
-endif.
-record(state, { available_tokens,
- liberated_processes,
+ processes,
callbacks,
tokens_per_byte,
hibernate,
@@ -166,7 +166,7 @@ init([]) ->
true -> ?TOTAL_TOKENS / MemAvail
end,
{ok, #state { available_tokens = ?TOTAL_TOKENS,
- liberated_processes = dict:new(),
+ processes = dict:new(),
callbacks = dict:new(),
tokens_per_byte = TPB,
hibernate = queue:new(),
@@ -176,64 +176,65 @@ init([]) ->
handle_call(info, _From, State) ->
State1 = #state { available_tokens = Avail,
- liberated_processes = Libre,
+ processes = Procs,
hibernate = Sleepy,
unoppressable = Unoppressable } =
free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying
{reply, [{ available_tokens, Avail },
- { liberated_processes, dict:to_list(Libre) },
+ { processes, dict:to_list(Procs) },
{ hibernated_processes, queue:to_list(Sleepy) },
{ unoppressable_processes, sets:to_list(Unoppressable) }], State1}.
handle_cast({report_memory, Pid, Memory, Hibernating},
- State = #state { liberated_processes = Libre,
+ State = #state { processes = Procs,
available_tokens = Avail,
callbacks = Callbacks,
tokens_per_byte = TPB,
alarmed = Alarmed }) ->
Req = rabbit_misc:ceil(TPB * Memory),
LibreActivity = if Hibernating -> hibernate;
- true -> active
- end,
+ true -> active
+ end,
{StateN = #state { hibernate = Sleepy }, ActivityNew} =
- case find_process(Pid, Libre) of
+ case find_process(Pid, Procs) of
{libre, {OAlloc, _OActivity}} ->
Avail1 = Avail + OAlloc,
State1 = #state { available_tokens = Avail2,
- liberated_processes = Libre1 }
+ processes = Procs1 }
= free_upto(Pid, Req,
State #state { available_tokens = Avail1 }),
case Req > Avail2 of
true -> %% nowt we can do, oppress the process
ok = set_process_mode(Callbacks, Pid, oppressed),
- {State1 #state { liberated_processes =
- dict:erase(Pid, Libre1) }, oppressed};
+ {State1 #state { processes =
+ dict:store(Pid, {Req, oppressed},
+ Procs1) }, oppressed};
false -> %% keep liberated
{State1 #state
- { liberated_processes =
- dict:store(Pid, {Req, LibreActivity}, Libre1),
+ { processes =
+ dict:store(Pid, {Req, LibreActivity}, Procs1),
available_tokens = Avail2 - Req },
LibreActivity}
end;
- oppressed ->
- case Alarmed of
+ {oppressed, OrigReq} ->
+ case Alarmed orelse Hibernating orelse
+ (Req > OrigReq * 0.95 andalso Req < OrigReq * 1.05) of
true ->
{State, oppressed};
false ->
State1 = #state { available_tokens = Avail1,
- liberated_processes = Libre1 } =
+ processes = Procs1 } =
free_upto(Pid, Req, State),
- case Req > Avail1 orelse Hibernating of
+ case Req > Avail1 of
true ->
- %% not enough space, or no compelling
- %% reason, so stay oppressed
+ %% not enough space, so stay oppressed
{State1, oppressed};
false -> %% can liberate the process
set_process_mode(Callbacks, Pid, liberated),
{State1 #state {
- liberated_processes =
+ processes =
dict:store(Pid, {Req, LibreActivity},
- Libre1),
+ Procs1),
available_tokens = Avail1 - Req },
LibreActivity}
end
@@ -266,16 +267,14 @@ handle_cast({conserve_memory, Conserve}, State) ->
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state { available_tokens = Avail,
- liberated_processes = Libre }) ->
- State1 = case find_process(Pid, Libre) of
- oppressed ->
- State;
- {libre, {Alloc, _Activity}} ->
- State #state { available_tokens = Avail + Alloc,
- liberated_processes =
- dict:erase(Pid, Libre) }
- end,
- {noreply, State1};
+ processes = Procs }) ->
+ State1 = State #state { processes = dict:erase(Pid, Procs) },
+ {noreply, case find_process(Pid, Procs) of
+ {oppressed, _OrigReq} ->
+ State1;
+ {libre, {Alloc, _Activity}} ->
+ State1 #state { available_tokens = Avail + Alloc }
+ end};
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info(_Info, State) ->
@@ -287,22 +286,23 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-find_process(Pid, Libre) ->
- case dict:find(Pid, Libre) of
- {ok, Value} -> {libre, Value};
- error -> oppressed
+find_process(Pid, Procs) ->
+ case dict:find(Pid, Procs) of
+ {ok, {OrigReq, oppressed}} -> {oppressed, OrigReq};
+ {ok, Value = {_Alloc, _Activity}} -> {libre, Value};
+ error -> {oppressed, -9999}
end.
set_process_mode(Callbacks, Pid, Mode) ->
{Module, Function, Args} = dict:fetch(Pid, Callbacks),
erlang:apply(Module, Function, Args ++ [Mode]).
-tidy_and_sum_sleepy(IgnorePids, Sleepy, Libre) ->
- tidy_and_sum(hibernate, Libre, fun queue:out/1,
+tidy_and_sum_sleepy(IgnorePids, Sleepy, Procs) ->
+ tidy_and_sum(hibernate, Procs, fun queue:out/1,
fun (Pid, _Alloc, Queue) -> queue:in(Pid, Queue) end,
IgnorePids, Sleepy, queue:new(), 0).
-tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, DupCheckSet,
+tidy_and_sum(AtomExpected, Procs, Catamorphism, Anamorphism, DupCheckSet,
CataInit, AnaInit, AllocAcc) ->
case Catamorphism(CataInit) of
{empty, _CataInit} -> {AnaInit, AllocAcc};
@@ -312,7 +312,7 @@ tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, DupCheckSet,
true ->
{DupCheckSet, AnaInit, AllocAcc};
false ->
- case find_process(Pid, Libre) of
+ case find_process(Pid, Procs) of
{libre, {Alloc, AtomExpected}} ->
{sets:add_element(Pid, DupCheckSet),
Anamorphism(Pid, Alloc, AnaInit),
@@ -321,13 +321,13 @@ tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, DupCheckSet,
{DupCheckSet, AnaInit, AllocAcc}
end
end,
- tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism,
+ tidy_and_sum(AtomExpected, Procs, Catamorphism, Anamorphism,
DupCheckSet1, CataInit1, AnaInit1, AllocAcc1)
end.
-free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Libre, Req) ->
+free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Procs, Req) ->
free_from(Callbacks,
- fun(Libre1, Sleepy1, SleepyAcc) ->
+ fun(Procs1, Sleepy1, SleepyAcc) ->
case queue:out(Sleepy1) of
{empty, _Sleepy2} ->
empty;
@@ -336,37 +336,37 @@ free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Libre, Req) ->
true -> {skip, Sleepy2,
queue:in(Pid, SleepyAcc)};
false -> {Alloc, hibernate} =
- dict:fetch(Pid, Libre1),
+ dict:fetch(Pid, Procs1),
{value, Sleepy2, Pid, Alloc}
end
end
- end, fun queue:join/2, Libre, Sleepy, queue:new(), Req).
+ end, fun queue:join/2, Procs, Sleepy, queue:new(), Req).
-free_from(Callbacks, Hylomorphism, BaseCase, Libre, CataInit, AnaInit, Req) ->
- case Hylomorphism(Libre, CataInit, AnaInit) of
+free_from(Callbacks, Hylomorphism, BaseCase, Procs, CataInit, AnaInit, Req) ->
+ case Hylomorphism(Procs, CataInit, AnaInit) of
empty ->
- {AnaInit, Libre, Req};
+ {AnaInit, Procs, Req};
{skip, CataInit1, AnaInit1} ->
- free_from(Callbacks, Hylomorphism, BaseCase, Libre, CataInit1,
+ free_from(Callbacks, Hylomorphism, BaseCase, Procs, CataInit1,
AnaInit1, Req);
{value, CataInit1, Pid, Alloc} ->
- Libre1 = dict:erase(Pid, Libre),
+ Procs1 = dict:store(Pid, {Alloc, oppressed}, Procs),
ok = set_process_mode(Callbacks, Pid, oppressed),
case Req > Alloc of
- true -> free_from(Callbacks, Hylomorphism, BaseCase, Libre1,
+ true -> free_from(Callbacks, Hylomorphism, BaseCase, Procs1,
CataInit1, AnaInit, Req - Alloc);
- false -> {BaseCase(CataInit1, AnaInit), Libre1, Req - Alloc}
+ false -> {BaseCase(CataInit1, AnaInit), Procs1, Req - Alloc}
end
end.
free_upto(Pid, Req, State = #state { available_tokens = Avail,
- liberated_processes = Libre,
+ processes = Procs,
callbacks = Callbacks,
hibernate = Sleepy,
unoppressable = Unoppressable })
when Req > Avail ->
Unoppressable1 = sets:add_element(Pid, Unoppressable),
- {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unoppressable1, Sleepy, Libre),
+ {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unoppressable1, Sleepy, Procs),
case Req > Avail + SleepySum of
true -> %% not enough in sleepy, just return tidied state
State #state { hibernate = Sleepy1 };
@@ -374,11 +374,11 @@ free_upto(Pid, Req, State = #state { available_tokens = Avail,
%% ReqRem1 will be <= 0 because it's likely we'll have
%% freed more than we need, thus Req - ReqRem1 is total
%% freed
- {Sleepy2, Libre1, ReqRem} =
+ {Sleepy2, Procs1, ReqRem} =
free_upto_sleepy(Unoppressable1, Callbacks,
- Sleepy1, Libre, Req),
+ Sleepy1, Procs, Req),
State #state { available_tokens = Avail + (Req - ReqRem),
- liberated_processes = Libre1,
+ processes = Procs1,
hibernate = Sleepy2 }
end;
free_upto(_Pid, _Req, State) ->