author    Matthew Sackman <matthew@lshift.net>    2009-04-11 01:05:09 +0100
committer Matthew Sackman <matthew@lshift.net>    2009-04-11 01:05:09 +0100
commit    b8b9896cf79bc62bc0e11da85e23922e024342de (patch)
tree      3d9c51efa8676087c16ae5c6ab023c74ddf80568 /src
parent    1882d139375274368a3ab2bffe3695c2d3f5c9bc (diff)
download  rabbitmq-server-git-b8b9896cf79bc62bc0e11da85e23922e024342de.tar.gz
Mainly removing the unnecessary file:position in append.

This wasn't needed, as I can track the position myself, and profiling showed that it is expensive (which I'd been somewhat aware of before). Having removed it, things are quite a bit faster, though still CPU bound with smaller messages. The next step is to convert the dicts in the state into ets tables.
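
(The gist of the change, as a rough sketch rather than the exact diff below — the names match the module, but the surrounding code is simplified:)

    %% Before: ask the OS where we are on every publish
    {ok, Offset} = file:position(CurHdl, cur),
    {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),

    %% After: carry the offset in the gen_server state and advance it by
    %% the bytes just written (payload plus per-message framing overhead)
    {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
    NextOffset = Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT),
    State1 = State #dqstate { current_offset = NextOffset },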
Diffstat (limited to 'src')
-rw-r--r--  src/rabbit_disk_queue.erl | 69
1 file changed, 39 insertions(+), 30 deletions(-)
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 391c9b715c..8affd7aa96 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -50,6 +50,7 @@
-define(MSG_LOC_ETS_NAME, rabbit_disk_queue_msg_location).
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").
+-define(FILE_PACKING_ADJUSTMENT, 1 + (2* (?INTEGER_SIZE_BYTES))).
-define(SERVER, ?MODULE).
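
(For context, not part of the diff: the new macro captures the per-message framing overhead used by append_message and read_next_file_entry below. A sketch of the implied on-disk layout — assuming an ?INTEGER_SIZE_BITS macro matching ?INTEGER_SIZE_BYTES, and that the two size fields precede the payload, neither of which is shown in this excerpt:)

    %% <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS,
    %%   MsgIdAndBody:TotalSize/binary,
    %%   Status:?WRITE_OK_SIZE_BITS>>
    %%
    %% i.e. each message occupies TotalSize + 2 size integers + 1 status
    %% byte = TotalSize + (?FILE_PACKING_ADJUSTMENT) bytes on disk.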
@@ -60,6 +61,7 @@
current_file_num,
current_file_name,
current_file_handle,
+ current_offset,
file_size_limit,
read_file_handles,
read_file_handles_limit
@@ -103,14 +105,16 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
current_file_num = 0,
current_file_name = InitName,
current_file_handle = undefined,
+ current_offset = 0,
file_size_limit = FileSizeLimit,
read_file_handles = {dict:new(), gb_trees:empty()},
read_file_handles_limit = ReadFileHandlesLimit
},
- {ok, State1 = #dqstate { current_file_name = CurrentName } } = load_from_disk(State),
+ {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = load_from_disk(State),
Path = form_filename(CurrentName),
ok = filelib:ensure_dir(Path),
- {ok, FileHdl} = file:open(Path, [append, raw, binary]),
+ {ok, FileHdl} = file:open(Path, [read, write, raw, binary]), %% read only needed so that we can seek
+ {ok, Offset} = file:position(FileHdl, {bof, Offset}),
{ok, State1 # dqstate { current_file_handle = FileHdl }}.
handle_call({deliver, Q, MsgId}, _From, State) ->
@@ -159,21 +163,13 @@ form_filename(Name) ->
base_directory() ->
filename:join(mnesia:system_info(directory), "rabbit_disk_queue/").
-file_packing_adjustment_bytes() ->
- 1 + (2* (?INTEGER_SIZE_BYTES)).
-
%% ---- INTERNAL RAW FUNCTIONS ----
internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
- %current_file_handle = CurHdl,
- %current_file_name = CurName,
read_file_handles_limit = ReadFileHandlesLimit,
read_file_handles = {ReadHdls, ReadHdlsAge}
}) ->
[{MsgId, _RefCount, File, Offset, _TotalSize}] = ets:lookup(MsgLocation, MsgId),
- %if CurName =:= File -> ok = file:sync(CurHdl); % don't think this is necessary. Within a process you should always have a consistent view of a file
- % true -> ok
- %end,
% so this next bit implements an LRU for file handles. But it's a bit insane, and smells
% of premature optimisation. So I might remove it and dump it overboard
{FileHdl, ReadHdls1, ReadHdlsAge1}
@@ -198,7 +194,9 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
% read the message
{ok, {MsgBody, BodySize, _TotalSize}} = read_message_at_offset(FileHdl, Offset),
[#dq_msg_loc {queue_and_msg_id = {MsgId, Q}, is_delivered = Delivered}] = mnesia:dirty_read(rabbit_disk_queue, {MsgId, Q}),
- ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc {queue_and_msg_id = {MsgId, Q}, is_delivered = true}),
+ if Delivered -> ok;
+ true -> ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc {queue_and_msg_id = {MsgId, Q}, is_delivered = true})
+ end,
{ok, {MsgBody, BodySize, Delivered},
State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
@@ -216,7 +214,7 @@ internal_ack(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
= dict:find(File, FileSummary),
FileDetail1 = dict:erase(Offset, FileDetail),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- FileSummary2 = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - file_packing_adjustment_bytes()),
+ FileSummary2 = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
contiguous_prefix = ContiguousTop1,
detail = FileDetail1
}, FileSummary),
@@ -232,12 +230,12 @@ internal_ack(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation,
current_file_handle = CurHdl,
current_file_name = CurName,
+ current_offset = Offset,
file_summary = FileSummary
}) ->
case ets:lookup(MsgLocation, MsgId) of
[] ->
% New message, lots to do
- {ok, Offset} = file:position(CurHdl, cur),
{ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, Offset, TotalSize}),
{ok, FileSum = #dqfile { valid_data = ValidTotalSize,
@@ -246,7 +244,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
detail = FileDetail }}
= dict:find(CurName, FileSummary),
FileDetail1 = dict:store(Offset, TotalSize, FileDetail),
- ValidTotalSize1 = ValidTotalSize + TotalSize + file_packing_adjustment_bytes(),
+ ValidTotalSize1 = ValidTotalSize + TotalSize + (?FILE_PACKING_ADJUSTMENT),
ContiguousTop1 = if Offset =:= ContiguousTop ->
ValidTotalSize; % can't be any holes in this file
true -> ContiguousTop
@@ -255,8 +253,10 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
contiguous_prefix = ContiguousTop1,
detail = FileDetail1},
FileSummary),
- maybe_roll_to_new_file(Offset + TotalSize + file_packing_adjustment_bytes(),
- State # dqstate { file_summary = FileSummary1 });
+ maybe_roll_to_new_file(Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT),
+ State # dqstate { file_summary = FileSummary1,
+ current_offset = Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT)
+ });
[{MsgId, RefCount, File, Offset, TotalSize}] ->
% We already know about it, just update counter
true = ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}),
@@ -303,7 +303,7 @@ internal_tx_cancel(MsgIds, State = #dqstate { msg_location = MsgLocation,
= dict:find(File, FileSummary2),
FileDetail1 = dict:erase(Offset, FileDetail),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - file_packing_adjustment_bytes()),
+ dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
contiguous_prefix = ContiguousTop1,
detail = FileDetail1
}, FileSummary2);
@@ -333,6 +333,7 @@ maybe_roll_to_new_file(Offset, State = #dqstate { file_size_limit = FileSizeLimi
{ok, State # dqstate { current_file_name = NextName,
current_file_handle = NextHdl,
current_file_num = NextNum,
+ current_offset = 0,
file_summary = dict:store(NextName, #dqfile { valid_data = 0,
contiguous_prefix = 0,
left = CurName,
@@ -364,9 +365,14 @@ load_from_disk(State) ->
load_messages(undefined, [], State) ->
State;
-load_messages(Left, [], State) ->
+load_messages(Left, [], State = #dqstate { file_summary = Summary }) ->
Num = list_to_integer(filename:rootname(Left)),
- State # dqstate { current_file_num = Num, current_file_name = Left };
+ {ok, #dqfile { detail = FileDetail }} = dict:find(Left, Summary),
+ Offset = dict:fold(fun (Offset1, TotalSize, Acc) ->
+ Acc1 = Offset1 + TotalSize + (?FILE_PACKING_ADJUSTMENT),
+ lists:max([Acc, Acc1])
+ end, 0, FileDetail),
+ State # dqstate { current_file_num = Num, current_file_name = Left, current_offset = Offset };
load_messages(Left, [File|Files],
State = #dqstate { msg_location = MsgLocation,
file_summary = FileSummary
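
(A toy check of the fold above, with hypothetical offsets and sizes, and pretending the packing adjustment is 9 bytes purely for illustration:)

    FileDetail = dict:from_list([{0, 100}, {109, 50}]),
    Offset = dict:fold(fun (Offset1, TotalSize, Acc) ->
                           lists:max([Acc, Offset1 + TotalSize + 9])
                       end, 0, FileDetail).
    %% Offset =:= 168: the end of the last message in the file,
    %% i.e. where the next append should start.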
@@ -380,7 +386,7 @@ load_messages(Left, [File|Files],
RefCount ->
true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}),
{[{MsgId, TotalSize, Offset}|VMAcc],
- VTSAcc + TotalSize + file_packing_adjustment_bytes(),
+ VTSAcc + TotalSize + (?FILE_PACKING_ADJUSTMENT),
dict:store(Offset, TotalSize, FileDetail1)
}
end
@@ -405,9 +411,12 @@ load_messages(Left, [File|Files],
%% ---- DISK RECOVERY OF FAILED COMPACTION ----
-recover_crashed_compactions(_Files, []) ->
- ok;
-recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
+recover_crashed_compactions(Files, TmpFiles) ->
+ lists:foreach(fun (TmpFile) -> ok = recover_crashed_compactions1(Files, TmpFile) end,
+ TmpFiles),
+ ok.
+
+recover_crashed_compactions1(Files, TmpFile) ->
GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end,
NonTmpRelatedFile = filename:rootname(TmpFile) ++ (?FILE_EXTENSION),
true = lists:member(NonTmpRelatedFile, Files),
@@ -461,7 +470,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
% extending truncate.
% Remember the head of the list will be the highest entry in the file
[{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
- TmpSize = TmpTopOffset + TmpTopTotalSize + file_packing_adjustment_bytes(),
+ TmpSize = TmpTopOffset + TmpTopTotalSize + (?FILE_PACKING_ADJUSTMENT),
ExpectedAbsPos = Top + TmpSize,
{ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}),
ok = file:truncate(MainHdl), % and now extend the main file as big as necessary in a single move
@@ -480,7 +489,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
% check that everything in MsgIdsTmp is in MsgIdsMain
true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, MsgIdsTmp)
end,
- recover_crashed_compactions(Files, TmpFiles).
+ ok.
% this assumes that the messages are ordered such that the highest address is at
% the head of the list.
@@ -488,7 +497,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
find_contiguous_block_prefix([]) -> {0, []};
find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail]) ->
case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of
- {ok, Acc} -> {Offset + TotalSize + file_packing_adjustment_bytes(), lists:reverse(Acc)};
+ {ok, Acc} -> {Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT), lists:reverse(Acc)};
Res -> Res
end.
find_contiguous_block_prefix([], 0, Acc) ->
@@ -496,7 +505,7 @@ find_contiguous_block_prefix([], 0, Acc) ->
find_contiguous_block_prefix([], _N, _Acc) ->
{0, []};
find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail], ExpectedOffset, Acc)
- when ExpectedOffset =:= Offset + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)) -> %% Can't use file_packing_adjustment_bytes()
+ when ExpectedOffset =:= Offset + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)) -> %% Can't use (?FILE_PACKING_ADJUSTMENT)
find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]);
find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) ->
find_contiguous_block_prefix(List).
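
(An aside on the guard above: Erlang guards cannot call user-defined functions, which is why the old file_packing_adjustment_bytes() was unusable here — but a macro expands before compilation, so a constant-arithmetic macro is in fact legal in a guard. A sketch, not what this commit does:)

    find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail], ExpectedOffset, Acc)
      when ExpectedOffset =:= Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT) ->
        find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]);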
@@ -573,7 +582,7 @@ read_next_file_entry(FileHdl, Offset) ->
case {TotalSize =:= 0, MsgIdBinSize =:= 0} of
{true, _} -> {ok, eof}; %% Nothing we can do other than stop
{false, true} -> %% current message corrupted, try skipping past it
- ExpectedAbsPos = Offset + file_packing_adjustment_bytes() + TotalSize,
+ ExpectedAbsPos = Offset + (?FILE_PACKING_ADJUSTMENT) + TotalSize,
case file:position(FileHdl, {cur, TotalSize + 1}) of
{ok, ExpectedAbsPos} -> {ok, {corrupted, ExpectedAbsPos}};
{ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up
@@ -588,9 +597,9 @@ read_next_file_entry(FileHdl, Offset) ->
case file:read(FileHdl, 1) of
{ok, <<(?WRITE_OK):(?WRITE_OK_SIZE_BITS)>>} ->
{ok, {ok, binary_to_term(MsgId), TotalSize,
- Offset + file_packing_adjustment_bytes() + TotalSize}};
+ Offset + (?FILE_PACKING_ADJUSTMENT) + TotalSize}};
{ok, _SomeOtherData} ->
- {ok, {corrupted, Offset + file_packing_adjustment_bytes() + TotalSize}};
+ {ok, {corrupted, Offset + (?FILE_PACKING_ADJUSTMENT) + TotalSize}};
KO -> KO
end;
{ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up