summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-21 15:14:59 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-21 15:14:59 +0100
commitd4ffc1ae9034745bff6ef63111ebc809fca8356f (patch)
tree1d108bd2b08cc32505c30956da62a806b4165488 /src
parent8d57331105d78a08fe5fbebd8f8c1a9f877f453f (diff)
downloadrabbitmq-server-git-d4ffc1ae9034745bff6ef63111ebc809fca8356f.tar.gz
cosmetic -> with_queue_head => queue_head
Also, time for a new optimisation! YAY! Previously, reading a message off disk meant seeking to the correct position and then reading the data. Now if the handle is already in the right position, then that seek is a waste of quite a lot of time, as it is an OS call. Now, I cache the location of the handle and so avoid seeking when possible. This has a MASSIVE effect on performance, especially in straight line cases, eg where a single prefetcher can drain a queue of disk in about one third of the time it used to take. Just looking at the code coverage from the test suite, there were just 534 seeks and 8582 cases where we found the handle in the right position already. This is a fairly small amount of code, and provides very useful benefits.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl56
1 files changed, 33 insertions, 23 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 4b8759f8f9..d19469d682 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -754,7 +754,7 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts,
operation_mode = ram_disk }, Obj) ->
ets:match_object(MsgLocationEts, Obj).
-get_read_handle(File, Offset, State =
+get_read_handle(File, Offset, TotalSize, State =
#dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
read_file_handles_limit = ReadFileHandlesLimit,
current_file_name = CurName,
@@ -766,7 +766,8 @@ get_read_handle(File, Offset, State =
true -> State
end,
Now = now(),
- {FileHdl, ReadHdls1, ReadHdlsAge1} =
+ NewOffset = Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ {FileHdl, OldOffset, ReadHdls1, ReadHdlsAge1} =
case dict:find(File, ReadHdls) of
error ->
{ok, Hdl} = file:open(form_filename(File),
@@ -774,21 +775,21 @@ get_read_handle(File, Offset, State =
read_ahead]),
case dict:size(ReadHdls) < ReadFileHandlesLimit of
true ->
- {Hdl, ReadHdls, ReadHdlsAge};
- _False ->
+ {Hdl, 0, ReadHdls, ReadHdlsAge};
+ false ->
{Then, OldFile, ReadHdlsAge2} =
gb_trees:take_smallest(ReadHdlsAge),
- {ok, {OldHdl, Then}} =
+ {ok, {OldHdl, _Offset, Then}} =
dict:find(OldFile, ReadHdls),
ok = file:close(OldHdl),
- {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2}
+ {Hdl, 0, dict:erase(OldFile, ReadHdls), ReadHdlsAge2}
end;
- {ok, {Hdl, Then}} ->
- {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
+ {ok, {Hdl, OldOffset1, Then}} ->
+ {Hdl, OldOffset1, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
end,
- ReadHdls2 = dict:store(File, {FileHdl, Now}, ReadHdls1),
+ ReadHdls2 = dict:store(File, {FileHdl, NewOffset, Now}, ReadHdls1),
ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1),
- {FileHdl,
+ {FileHdl, Offset /= OldOffset,
State1 #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3} }}.
sequence_lookup(Sequences, Q) ->
@@ -874,7 +875,7 @@ cache_is_full(#dqstate { message_cache = Cache }) ->
%% ---- INTERNAL RAW FUNCTIONS ----
internal_fetch_body(Q, MarkDelivered, Advance, State) ->
- case with_queue_head(Q, MarkDelivered, Advance, State) of
+ case queue_head(Q, MarkDelivered, Advance, State) of
E = {ok, empty, _} -> E;
{ok, AckTag, IsDelivered, StoreEntry, Remaining, State1} ->
{Message, State2} = read_stored_message(StoreEntry, State1),
@@ -882,7 +883,7 @@ internal_fetch_body(Q, MarkDelivered, Advance, State) ->
end.
internal_fetch_attributes(Q, MarkDelivered, Advance, State) ->
- case with_queue_head(Q, MarkDelivered, Advance, State) of
+ case queue_head(Q, MarkDelivered, Advance, State) of
E = {ok, empty, _} -> E;
{ok, AckTag, IsDelivered,
#message_store_entry { msg_id = MsgId, is_persistent = IsPersistent },
@@ -890,8 +891,8 @@ internal_fetch_attributes(Q, MarkDelivered, Advance, State) ->
{ok, {MsgId, IsPersistent, IsDelivered, AckTag, Remaining}, State1}
end.
-with_queue_head(Q, MarkDelivered, Advance,
- State = #dqstate { sequences = Sequences }) ->
+queue_head(Q, MarkDelivered, Advance,
+ State = #dqstate { sequences = Sequences }) ->
case sequence_lookup(Sequences, Q) of
{SeqId, SeqId} -> {ok, empty, State};
{ReadSeqId, WriteSeqId} when WriteSeqId > ReadSeqId ->
@@ -913,9 +914,10 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount,
total_size = TotalSize }, State) ->
case fetch_and_increment_cache(MsgId, State) of
not_found ->
- {FileHdl, State1} = get_read_handle(File, Offset, State),
+ {FileHdl, SeekReq, State1} =
+ get_read_handle(File, Offset, TotalSize, State),
{ok, {MsgBody, _IsPersistent, EncodedBodySize}} =
- read_message_at_offset(FileHdl, Offset, TotalSize),
+ read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq),
Message = #basic_message {} = bin_to_msg(MsgBody),
ok = if RefCount > 1 ->
insert_into_cache(Message, EncodedBodySize, State1);
@@ -1480,7 +1482,7 @@ close_file(File, State = #dqstate { read_file_handles =
case dict:find(File, ReadHdls) of
error ->
State;
- {ok, {Hdl, Then}} ->
+ {ok, {Hdl, _Offset, Then}} ->
ok = file:close(Hdl),
State #dqstate { read_file_handles =
{ dict:erase(File, ReadHdls),
@@ -1867,10 +1869,17 @@ append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
KO -> KO
end.
-read_message_at_offset(FileHdl, Offset, TotalSize) ->
+read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq) ->
TotalSizeWriteOkBytes = TotalSize + 1,
- case file:position(FileHdl, {bof, Offset}) of
- {ok, Offset} ->
+ SeekRes = case SeekReq of
+ true -> case file:position(FileHdl, {bof, Offset}) of
+ {ok, Offset} -> ok;
+ KO -> KO
+ end;
+ false -> ok
+ end,
+ case SeekRes of
+ ok ->
case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of
{ok, <<TotalSize:?INTEGER_SIZE_BITS,
MsgIdBinSize:?INTEGER_SIZE_BITS,
@@ -1884,9 +1893,9 @@ read_message_at_offset(FileHdl, Offset, TotalSize) ->
?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> ->
{ok, {MsgBody, true, BodySize}}
end;
- KO -> KO
+ KO1 -> KO1
end;
- KO -> KO
+ KO2 -> KO2
end.
scan_file_for_valid_messages(File) ->
@@ -1931,7 +1940,8 @@ read_next_file_entry(FileHdl, Offset) ->
{false, false} -> %% all good, let's continue
case file:read(FileHdl, MsgIdBinSize) of
{ok, <<MsgId:MsgIdBinSize/binary>>} ->
- ExpectedAbsPos = Offset + TwoIntegers + TotalSize,
+ ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT +
+ TotalSize - 1,
case file:position(FileHdl,
{cur, TotalSize - MsgIdBinSize}
) of