diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 48 |
1 files changed, 30 insertions, 18 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 95ed8adf78..178771b8c7 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -66,6 +66,8 @@ -define(FILE_EXTENSION_DETS, ".dets"). -define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))). -define(MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds +-define(BATCH_SIZE, 10000). +-define(CACHE_MAX_SIZE, 10485760). -define(SERVER, ?MODULE). @@ -856,7 +858,7 @@ fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) -> decrement_cache(MsgId, #dqstate { message_cache = Cache }) -> true = try case ets:update_counter(Cache, MsgId, {4, -1}) of - N when N =< 0 -> ets:delete(Cache, MsgId); + N when N =< 0 -> true = ets:delete(Cache, MsgId); _N -> true end catch error:badarg -> @@ -867,15 +869,21 @@ decrement_cache(MsgId, #dqstate { message_cache = Cache }) -> end, ok. -insert_into_cache(Message = #basic_message { guid = MsgId }, - MsgSize, Forced, #dqstate { message_cache = Cache }) -> - Count = case Forced of - true -> 0; - false -> 1 - end, - true = - ets:insert_new(Cache, {MsgId, Message, MsgSize, Count}), - ok. +insert_into_cache(Message = #basic_message { guid = MsgId }, MsgSize, + Forced, State = #dqstate { message_cache = Cache }) -> + case cache_is_full(State) of + true -> ok; + false -> Count = case Forced of + true -> 0; + false -> 1 + end, + true = ets:insert_new(Cache, {MsgId, Message, + MsgSize, Count}), + ok + end. + +cache_is_full(#dqstate { message_cache = Cache }) -> + ets:info(Cache, memory) > ?CACHE_MAX_SIZE. %% ---- INTERNAL RAW FUNCTIONS ---- @@ -905,15 +913,19 @@ internal_prefetch(Q, Count, State = #dqstate { sequences = Sequences }) -> {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), Length = WriteSeqId - ReadSeqId, Count1 = lists:min([Length, Count]), - StateN = - lists:foldl( - fun(N, State1) -> - {ok, _MsgStuff, State2} = - internal_read_message(Q, N, true, true, true, State1), - State2 - end, State, lists:seq(ReadSeqId, ReadSeqId + Count1 - 1)), + StateN = internal_prefetch(Q, ReadSeqId + Count1 - 1, ReadSeqId, State), {ok, StateN}. +internal_prefetch(_Q, Target, Target, State) -> + State; +internal_prefetch(Q, Target, ReadSeqId, State) -> + {ok, _MsgStuff, State1} = + internal_read_message(Q, ReadSeqId, true, true, true, State), + case cache_is_full(State1) of + true -> State1; + false -> internal_prefetch(Q, Target, ReadSeqId + 1, State1) + end. + internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) -> {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId). @@ -1571,7 +1583,7 @@ load_from_disk(State) -> {State3, OldQ, MsgSeqIds, Len}) -> {State4, MsgSeqIds1, Len1} = case {OldQ == Q, MsgSeqIds} of - {true, _} when Len < 10000 -> + {true, _} when Len < ?BATCH_SIZE -> {State3, MsgSeqIds, Len}; {false, []} -> {State3, MsgSeqIds, Len}; {_, _} -> |
