summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-19 16:49:29 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-19 16:49:29 +0100
commit3fc57e1b7fdfb948e8470b0b63d7ebe1e9c91c25 (patch)
tree7b3c28d134dbff4a0aa7178c70ffa6ef1b5a6823
parent656cbf75101146cac83dffee1ff708bc39d1498d (diff)
downloadrabbitmq-server-git-3fc57e1b7fdfb948e8470b0b63d7ebe1e9c91c25.tar.gz
Added caching layer using ets which, when a message is shared between multiple queues, eliminates the need for multiple reads, provided the /next/ copy of the message is requested before the previous copy of the message has been acked. Should reduce memory pressure.
-rw-r--r--src/rabbit_disk_queue.erl71
1 files changed, 61 insertions, 10 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index b133f538ed..a7d4e6e3ca 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -58,6 +58,7 @@
-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
+-define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").
-define(FILE_EXTENSION_DETS, ".dets").
@@ -82,7 +83,8 @@
%% since the last fsync?
file_size_limit, %% how big can our files get?
read_file_handles, %% file handles for reading (LRU)
- read_file_handles_limit %% how many file handles can we open?
+ read_file_handles_limit, %% how many file handles can we open?
+ message_cache %% ets message cache
}).
%% The components:
@@ -385,8 +387,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
current_dirty = false,
file_size_limit = FileSizeLimit,
read_file_handles = {dict:new(), gb_trees:empty()},
- read_file_handles_limit = ReadFileHandlesLimit
- },
+ read_file_handles_limit = ReadFileHandlesLimit,
+ message_cache = ets:new(?CACHE_ETS_NAME,
+ [set, private])
+ },
{ok, State1 = #dqstate { current_file_name = CurrentName,
current_offset = Offset } } =
load_from_disk(State),
@@ -684,6 +688,37 @@ msg_to_bin(Msg = #basic_message { content = Content }) ->
bin_to_msg(MsgBin) ->
binary_to_term(MsgBin).
+remove_cache_entry(MsgId, #dqstate { message_cache = Cache }) ->
+ true = ets:delete(Cache, MsgId),
+ ok.
+
+fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) ->
+ case ets:lookup(Cache, MsgId) of
+ [] ->
+ not_found;
+ [{MsgId, Message, MsgSize, _RefCount}] ->
+ NewRefCount = ets:update_counter(Cache, MsgId, {4, 1}),
+ {Message, MsgSize, NewRefCount}
+ end.
+
+decrement_cache(MsgId, #dqstate { message_cache = Cache }) ->
+ true = try case ets:update_counter(Cache, MsgId, {4, -1}) of
+ 0 -> ets:delete(Cache, MsgId);
+ _N -> true
+ end
+ catch error:badarg ->
+ %% MsgId is not in there because although it's been
+ %% delivered, it's never actually been read (think:
+ %% persistent message in mixed queue)
+ true
+ end,
+ ok.
+
+insert_into_cache(Message = #basic_message { guid = MsgId },
+ MsgSize, #dqstate { message_cache = Cache }) ->
+ true = ets:insert_new(Cache, {MsgId, Message, MsgSize, 1}),
+ ok.
+
%% ---- INTERNAL RAW FUNCTIONS ----
internal_deliver(Q, ReadMsg, FakeDeliver,
@@ -713,7 +748,7 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) ->
#dq_msg_loc {is_delivered = Delivered, msg_id = MsgId,
next_seq_id = NextReadSeqId}] =
mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
- [{MsgId, _RefCount, File, Offset, TotalSize}] =
+ [{MsgId, RefCount, File, Offset, TotalSize}] =
dets_ets_lookup(State, MsgId),
ok =
if FakeDeliver orelse Delivered -> ok;
@@ -723,12 +758,27 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) ->
end,
case ReadMsg of
true ->
- {FileHdl, State1} = get_read_handle(File, State),
- {ok, {MsgBody, BodySize}} =
- read_message_at_offset(FileHdl, Offset, TotalSize),
- Message = bin_to_msg(MsgBody),
- {ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}},
- NextReadSeqId, State1};
+ case fetch_and_increment_cache(MsgId, State) of
+ false ->
+ {FileHdl, State1} = get_read_handle(File, State),
+ {ok, {MsgBody, BodySize}} =
+ read_message_at_offset(FileHdl, Offset, TotalSize),
+ Message = bin_to_msg(MsgBody),
+ ok = case RefCount of
+ 1 ->
+ %% it's not in the cache and we only
+ %% have 1 queue with the message. So
+ %% don't bother putting it in the
+ %% queue.
+ ok;
+ _ -> insert_into_cache(Message, BodySize, State1)
+ end,
+ {ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}},
+ NextReadSeqId, State1};
+ {Message, BodySize, _RefCount} ->
+ {ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}},
+ NextReadSeqId, State}
+ end;
false ->
{ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State}
end.
@@ -758,6 +808,7 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
fun ({MsgId, SeqId}, Files1) ->
[{MsgId, RefCount, File, Offset, TotalSize}] =
dets_ets_lookup(State, MsgId),
+ ok = decrement_cache(MsgId, State),
Files2 =
case RefCount of
1 ->