summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl48
1 files changed, 30 insertions, 18 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 95ed8adf78..178771b8c7 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -66,6 +66,8 @@
-define(FILE_EXTENSION_DETS, ".dets").
-define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))).
-define(MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds
+-define(BATCH_SIZE, 10000).
+-define(CACHE_MAX_SIZE, 10485760).
-define(SERVER, ?MODULE).
@@ -856,7 +858,7 @@ fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) ->
decrement_cache(MsgId, #dqstate { message_cache = Cache }) ->
true = try case ets:update_counter(Cache, MsgId, {4, -1}) of
- N when N =< 0 -> ets:delete(Cache, MsgId);
+ N when N =< 0 -> true = ets:delete(Cache, MsgId);
_N -> true
end
catch error:badarg ->
@@ -867,15 +869,21 @@ decrement_cache(MsgId, #dqstate { message_cache = Cache }) ->
end,
ok.
-insert_into_cache(Message = #basic_message { guid = MsgId },
- MsgSize, Forced, #dqstate { message_cache = Cache }) ->
- Count = case Forced of
- true -> 0;
- false -> 1
- end,
- true =
- ets:insert_new(Cache, {MsgId, Message, MsgSize, Count}),
- ok.
+insert_into_cache(Message = #basic_message { guid = MsgId }, MsgSize,
+ Forced, State = #dqstate { message_cache = Cache }) ->
+ case cache_is_full(State) of
+ true -> ok;
+ false -> Count = case Forced of
+ true -> 0;
+ false -> 1
+ end,
+ true = ets:insert_new(Cache, {MsgId, Message,
+ MsgSize, Count}),
+ ok
+ end.
+
+cache_is_full(#dqstate { message_cache = Cache }) ->
+ ets:info(Cache, memory) > ?CACHE_MAX_SIZE.
%% ---- INTERNAL RAW FUNCTIONS ----
@@ -905,15 +913,19 @@ internal_prefetch(Q, Count, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
Length = WriteSeqId - ReadSeqId,
Count1 = lists:min([Length, Count]),
- StateN =
- lists:foldl(
- fun(N, State1) ->
- {ok, _MsgStuff, State2} =
- internal_read_message(Q, N, true, true, true, State1),
- State2
- end, State, lists:seq(ReadSeqId, ReadSeqId + Count1 - 1)),
+ StateN = internal_prefetch(Q, ReadSeqId + Count1 - 1, ReadSeqId, State),
{ok, StateN}.
+internal_prefetch(_Q, Target, Target, State) ->
+ State;
+internal_prefetch(Q, Target, ReadSeqId, State) ->
+ {ok, _MsgStuff, State1} =
+ internal_read_message(Q, ReadSeqId, true, true, true, State),
+ case cache_is_full(State1) of
+ true -> State1;
+ false -> internal_prefetch(Q, Target, ReadSeqId + 1, State1)
+ end.
+
internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId).
@@ -1571,7 +1583,7 @@ load_from_disk(State) ->
{State3, OldQ, MsgSeqIds, Len}) ->
{State4, MsgSeqIds1, Len1} =
case {OldQ == Q, MsgSeqIds} of
- {true, _} when Len < 10000 ->
+ {true, _} when Len < ?BATCH_SIZE ->
{State3, MsgSeqIds, Len};
{false, []} -> {State3, MsgSeqIds, Len};
{_, _} ->