diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-19 16:49:29 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-19 16:49:29 +0100 |
| commit | 3fc57e1b7fdfb948e8470b0b63d7ebe1e9c91c25 (patch) | |
| tree | 7b3c28d134dbff4a0aa7178c70ffa6ef1b5a6823 | |
| parent | 656cbf75101146cac83dffee1ff708bc39d1498d (diff) | |
| download | rabbitmq-server-git-3fc57e1b7fdfb948e8470b0b63d7ebe1e9c91c25.tar.gz | |
Added caching layer using ets which, when a message is shared between multiple queues, eliminates the need for multiple reads, provided the /next/ copy of the message is requested before the previous copy of the message has been acked. Should reduce memory pressure.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 71 |
1 files changed, 61 insertions, 10 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index b133f538ed..a7d4e6e3ca 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -58,6 +58,7 @@ -define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). -define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). -define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). +-define(CACHE_ETS_NAME, rabbit_disk_queue_cache). -define(FILE_EXTENSION, ".rdq"). -define(FILE_EXTENSION_TMP, ".rdt"). -define(FILE_EXTENSION_DETS, ".dets"). @@ -82,7 +83,8 @@ %% since the last fsync? file_size_limit, %% how big can our files get? read_file_handles, %% file handles for reading (LRU) - read_file_handles_limit %% how many file handles can we open? + read_file_handles_limit, %% how many file handles can we open? + message_cache %% ets message cache }). %% The components: @@ -385,8 +387,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> current_dirty = false, file_size_limit = FileSizeLimit, read_file_handles = {dict:new(), gb_trees:empty()}, - read_file_handles_limit = ReadFileHandlesLimit - }, + read_file_handles_limit = ReadFileHandlesLimit, + message_cache = ets:new(?CACHE_ETS_NAME, + [set, private]) + }, {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = load_from_disk(State), @@ -684,6 +688,37 @@ msg_to_bin(Msg = #basic_message { content = Content }) -> bin_to_msg(MsgBin) -> binary_to_term(MsgBin). +remove_cache_entry(MsgId, #dqstate { message_cache = Cache }) -> + true = ets:delete(Cache, MsgId), + ok. + +fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) -> + case ets:lookup(Cache, MsgId) of + [] -> + not_found; + [{MsgId, Message, MsgSize, _RefCount}] -> + NewRefCount = ets:update_counter(Cache, MsgId, {4, 1}), + {Message, MsgSize, NewRefCount} + end. + +decrement_cache(MsgId, #dqstate { message_cache = Cache }) -> + true = try case ets:update_counter(Cache, MsgId, {4, -1}) of + 0 -> ets:delete(Cache, MsgId); + _N -> true + end + catch error:badarg -> + %% MsgId is not in there because although it's been + %% delivered, it's never actually been read (think: + %% persistent message in mixed queue) + true + end, + ok. + +insert_into_cache(Message = #basic_message { guid = MsgId }, + MsgSize, #dqstate { message_cache = Cache }) -> + true = ets:insert_new(Cache, {MsgId, Message, MsgSize, 1}), + ok. + %% ---- INTERNAL RAW FUNCTIONS ---- internal_deliver(Q, ReadMsg, FakeDeliver, @@ -713,7 +748,7 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) -> #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId, next_seq_id = NextReadSeqId}] = mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}), - [{MsgId, _RefCount, File, Offset, TotalSize}] = + [{MsgId, RefCount, File, Offset, TotalSize}] = dets_ets_lookup(State, MsgId), ok = if FakeDeliver orelse Delivered -> ok; @@ -723,12 +758,27 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) -> end, case ReadMsg of true -> - {FileHdl, State1} = get_read_handle(File, State), - {ok, {MsgBody, BodySize}} = - read_message_at_offset(FileHdl, Offset, TotalSize), - Message = bin_to_msg(MsgBody), - {ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}}, - NextReadSeqId, State1}; + case fetch_and_increment_cache(MsgId, State) of + false -> + {FileHdl, State1} = get_read_handle(File, State), + {ok, {MsgBody, BodySize}} = + read_message_at_offset(FileHdl, Offset, TotalSize), + Message = bin_to_msg(MsgBody), + ok = case RefCount of + 1 -> + %% it's not in the cache and we only + %% have 1 queue with the message. So + %% don't bother putting it in the + %% queue. + ok; + _ -> insert_into_cache(Message, BodySize, State1) + end, + {ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}}, + NextReadSeqId, State1}; + {Message, BodySize, _RefCount} -> + {ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}}, + NextReadSeqId, State} + end; false -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State} end. @@ -758,6 +808,7 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, fun ({MsgId, SeqId}, Files1) -> [{MsgId, RefCount, File, Offset, TotalSize}] = dets_ets_lookup(State, MsgId), + ok = decrement_cache(MsgId, State), Files2 = case RefCount of 1 -> |
