diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-07-17 17:26:37 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-07-17 17:26:37 +0100 |
| commit | 1cf7c30120d48a2d6e4820e7f858119463e3ae51 (patch) | |
| tree | 70e0f1426845bf44872e4824c8449a1ba5b5b49c | |
| parent | 3e16b75ac21e8dabd5c8bcfe4ac11cd67a4f19c7 (diff) | |
| download | rabbitmq-server-git-1cf7c30120d48a2d6e4820e7f858119463e3ae51.tar.gz | |
ok, limits on the cache, and on prefetch.
I decided the right thing to do is to prefer older messages in the cache to younger ones, because they are more likely to be used sooner. This means we just fill the cache up and then leave it alone, which is nice and simple.
Things are pretty much OK with it now, but the whole notion of prefetch is still wrong and needs to be changed to be driven by the mixed queue, not the disk_queue. One reason: currently, if two or more queues issue prefetch requests and the first fills the cache up, then the second won't do anything. The cache is useful, but shouldn't be abused for prefetching purposes. The two things are separate.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 48 |
1 file changed, 30 insertions, 18 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 95ed8adf78..178771b8c7 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -66,6 +66,8 @@ -define(FILE_EXTENSION_DETS, ".dets"). -define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))). -define(MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds +-define(BATCH_SIZE, 10000). +-define(CACHE_MAX_SIZE, 10485760). -define(SERVER, ?MODULE). @@ -856,7 +858,7 @@ fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) -> decrement_cache(MsgId, #dqstate { message_cache = Cache }) -> true = try case ets:update_counter(Cache, MsgId, {4, -1}) of - N when N =< 0 -> ets:delete(Cache, MsgId); + N when N =< 0 -> true = ets:delete(Cache, MsgId); _N -> true end catch error:badarg -> @@ -867,15 +869,21 @@ decrement_cache(MsgId, #dqstate { message_cache = Cache }) -> end, ok. -insert_into_cache(Message = #basic_message { guid = MsgId }, - MsgSize, Forced, #dqstate { message_cache = Cache }) -> - Count = case Forced of - true -> 0; - false -> 1 - end, - true = - ets:insert_new(Cache, {MsgId, Message, MsgSize, Count}), - ok. +insert_into_cache(Message = #basic_message { guid = MsgId }, MsgSize, + Forced, State = #dqstate { message_cache = Cache }) -> + case cache_is_full(State) of + true -> ok; + false -> Count = case Forced of + true -> 0; + false -> 1 + end, + true = ets:insert_new(Cache, {MsgId, Message, + MsgSize, Count}), + ok + end. + +cache_is_full(#dqstate { message_cache = Cache }) -> + ets:info(Cache, memory) > ?CACHE_MAX_SIZE. 
%% ---- INTERNAL RAW FUNCTIONS ---- @@ -905,15 +913,19 @@ internal_prefetch(Q, Count, State = #dqstate { sequences = Sequences }) -> {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), Length = WriteSeqId - ReadSeqId, Count1 = lists:min([Length, Count]), - StateN = - lists:foldl( - fun(N, State1) -> - {ok, _MsgStuff, State2} = - internal_read_message(Q, N, true, true, true, State1), - State2 - end, State, lists:seq(ReadSeqId, ReadSeqId + Count1 - 1)), + StateN = internal_prefetch(Q, ReadSeqId + Count1 - 1, ReadSeqId, State), {ok, StateN}. +internal_prefetch(_Q, Target, Target, State) -> + State; +internal_prefetch(Q, Target, ReadSeqId, State) -> + {ok, _MsgStuff, State1} = + internal_read_message(Q, ReadSeqId, true, true, true, State), + case cache_is_full(State1) of + true -> State1; + false -> internal_prefetch(Q, Target, ReadSeqId + 1, State1) + end. + internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) -> {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId). @@ -1571,7 +1583,7 @@ load_from_disk(State) -> {State3, OldQ, MsgSeqIds, Len}) -> {State4, MsgSeqIds1, Len1} = case {OldQ == Q, MsgSeqIds} of - {true, _} when Len < 10000 -> + {true, _} when Len < ?BATCH_SIZE -> {State3, MsgSeqIds, Len}; {false, []} -> {State3, MsgSeqIds, Len}; {_, _} -> |
