summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-11 12:15:47 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-11 12:15:47 +0100
commit4a6c6aac0e92ab80c5159689358b223150a48b01 (patch)
treeb90f191144562af84fee3a6406c288d778648149 /src
parent2ce46bd0507f5badfdc14cd6e1c86fc6451ccfcd (diff)
downloadrabbitmq-server-git-4a6c6aac0e92ab80c5159689358b223150a48b01.tar.gz
switched back to ets for file detail. Also remember to call compact from tx_cancel.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl124
1 files changed, 60 insertions, 64 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index e605828e46..0313ebd715 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -48,16 +48,18 @@
-define(INTEGER_SIZE_BYTES, 8).
-define(INTEGER_SIZE_BITS, 8 * ?INTEGER_SIZE_BYTES).
-define(MSG_LOC_ETS_NAME, rabbit_disk_queue_msg_location).
+-define(FILE_DETAIL_ETS_NAME, rabbit_disk_queue_file_detail).
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").
-define(FILE_PACKING_ADJUSTMENT, 1 + (2* (?INTEGER_SIZE_BYTES))).
-define(SERVER, ?MODULE).
--record(dqfile, {valid_data, contiguous_prefix, left, right, detail}).
+-record(dqfile, {valid_data, contiguous_prefix, left, right}).
-record(dqstate, {msg_location,
file_summary,
+ file_detail,
current_file_num,
current_file_name,
current_file_handle,
@@ -99,9 +101,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
file_summary = dict:store(InitName, (#dqfile { valid_data = 0,
contiguous_prefix = 0,
left = undefined,
- right = undefined,
- detail = dict:new()}),
+ right = undefined}),
dict:new()),
+ file_detail = ets:new((?FILE_DETAIL_ETS_NAME), [ordered_set, private]),
current_file_num = 0,
current_file_name = InitName,
current_file_handle = undefined,
@@ -197,41 +199,41 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
{ok, {MsgBody, BodySize, Delivered},
State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
-internal_ack(Q, MsgIds, State) ->
- {Files, State1}
- = lists:foldl(fun (MsgId, {Files1, State2 = #dqstate { msg_location = MsgLocation,
- file_summary = FileSummary
- }}) ->
+internal_ack(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
+ file_summary = FileSummary,
+ file_detail = FileDetail
+ }) ->
+ {Files, FileSummary1}
+ = lists:foldl(fun (MsgId, {Files2, FileSummary2}) ->
[{MsgId, RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId),
% is this the last time we need the message, in which case tidy up
if 1 =:= RefCount ->
true = ets:delete(MsgLocation, MsgId),
{ok, FileSum = #dqfile { valid_data = ValidTotalSize,
- contiguous_prefix = ContiguousTop,
- detail = FileDetail }}
- = dict:find(File, FileSummary),
- FileDetail1 = dict:erase(Offset, FileDetail),
+ contiguous_prefix = ContiguousTop }}
+ = dict:find(File, FileSummary2),
+ true = ets:delete(FileDetail, {File, Offset}),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- FileSummary1
+ FileSummary3
= dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
- contiguous_prefix = ContiguousTop1,
- detail = FileDetail1
- }, FileSummary),
+ contiguous_prefix = ContiguousTop1
+ }, FileSummary2),
ok = mnesia:dirty_delete({rabbit_disk_queue, {MsgId, Q}}),
- {sets:add_element(File, Files1), State2 # dqstate { file_summary = FileSummary1 }};
+ {sets:add_element(File, Files2), FileSummary3};
1 < RefCount ->
ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
- {Files1, State2}
+ {Files2, FileSummary2}
end
- end, {sets:new(), State}, MsgIds),
- State2 = compact(Files, State1),
+ end, {sets:new(), FileSummary}, MsgIds),
+ State2 = compact(Files, State # dqstate { file_summary = FileSummary1 }),
{ok, State2}.
internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation,
current_file_handle = CurHdl,
current_file_name = CurName,
current_offset = Offset,
- file_summary = FileSummary
+ file_summary = FileSummary,
+ file_detail = FileDetail
}) ->
case ets:lookup(MsgLocation, MsgId) of
[] ->
@@ -240,18 +242,16 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, Offset, TotalSize}),
{ok, FileSum = #dqfile { valid_data = ValidTotalSize,
contiguous_prefix = ContiguousTop,
- right = undefined,
- detail = FileDetail }}
+ right = undefined }}
= dict:find(CurName, FileSummary),
- FileDetail1 = dict:store(Offset, TotalSize, FileDetail),
+ true = ets:insert_new(FileDetail, {{CurName, Offset}, TotalSize}),
ValidTotalSize1 = ValidTotalSize + TotalSize + (?FILE_PACKING_ADJUSTMENT),
ContiguousTop1 = if Offset =:= ContiguousTop ->
ValidTotalSize; % can't be any holes in this file
true -> ContiguousTop
end,
FileSummary1 = dict:store(CurName, FileSum #dqfile { valid_data = ValidTotalSize1,
- contiguous_prefix = ContiguousTop1,
- detail = FileDetail1},
+ contiguous_prefix = ContiguousTop1 },
FileSummary),
maybe_roll_to_new_file(Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT),
State # dqstate { file_summary = FileSummary1,
@@ -289,30 +289,32 @@ internal_publish(Q, MsgId, MsgBody, State) ->
internal_tx_cancel(MsgIds, State = #dqstate { msg_location = MsgLocation,
- file_summary = FileSummary
+ file_summary = FileSummary,
+ file_detail = FileDetail
}) ->
- FileSummary1 =
- lists:foldl(fun (MsgId, FileSummary2) ->
+ {Files, FileSummary1} =
+ lists:foldl(fun (MsgId, {Files2, FileSummary2}) ->
[{MsgId, RefCount, File, Offset, TotalSize}]
= ets:lookup(MsgLocation, MsgId),
if 1 =:= RefCount ->
true = ets:delete(MsgLocation, MsgId),
{ok, FileSum = #dqfile { valid_data = ValidTotalSize,
- contiguous_prefix = ContiguousTop,
- detail = FileDetail }}
+ contiguous_prefix = ContiguousTop }}
= dict:find(File, FileSummary2),
- FileDetail1 = dict:erase(Offset, FileDetail),
+ true = ets:delete(FileDetail, {File, Offset}),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
- contiguous_prefix = ContiguousTop1,
- detail = FileDetail1
- }, FileSummary2);
+ FileSummary3
+ = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
+ contiguous_prefix = ContiguousTop1
+ }, FileSummary2),
+ {sets:add_element(File, Files2), FileSummary3};
1 < RefCount ->
ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
- FileSummary2
+ {Files2, FileSummary2}
end
- end, FileSummary, MsgIds),
- {ok, State #dqstate { file_summary = FileSummary1 }}.
+ end, {sets:new(), FileSummary}, MsgIds),
+ State2 = compact(Files, State # dqstate { file_summary = FileSummary1 }),
+ {ok, State2}.
%% ---- ROLLING OVER THE APPEND FILE ----
@@ -337,8 +339,7 @@ maybe_roll_to_new_file(Offset, State = #dqstate { file_size_limit = FileSizeLimi
file_summary = dict:store(NextName, #dqfile { valid_data = 0,
contiguous_prefix = 0,
left = CurName,
- right = undefined,
- detail = dict:new()},
+ right = undefined },
FileSummary1)
}
};
@@ -347,7 +348,7 @@ maybe_roll_to_new_file(_, State) ->
%% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ----
-compact(Files, State) ->
+compact(_FilesSet, State) ->
State.
%% ---- DISK RECOVERY ----
@@ -355,48 +356,44 @@ compact(Files, State) ->
load_from_disk(State) ->
% sorted so that smallest number is first. which also means eldest file (left-most) first
{Files, TmpFiles} = get_disk_queue_files(),
- io:format("got files~n", []),
ok = recover_crashed_compactions(Files, TmpFiles),
- io:format("crash recovery done~n", []),
% There should be no more tmp files now, so go ahead and load the whole lot
(State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State),
- io:format("loaded messages~n", []),
% Finally, check there is nothing in mnesia which we haven't loaded
true = lists:foldl(fun ({MsgId, _Q}, true) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end,
true, mnesia:async_dirty(fun() -> mnesia:all_keys(rabbit_disk_queue) end)),
- io:format("checked in mnesia~n", []),
{ok, State1}.
load_messages(undefined, [], State) ->
State;
-load_messages(Left, [], State = #dqstate { file_summary = Summary }) ->
+load_messages(Left, [], State = #dqstate { file_detail = FileDetail }) ->
Num = list_to_integer(filename:rootname(Left)),
- {ok, #dqfile { detail = FileDetail }} = dict:find(Left, Summary),
- Offset = dict:fold(fun (Offset1, TotalSize, Acc) ->
- Acc1 = Offset1 + TotalSize + (?FILE_PACKING_ADJUSTMENT),
- lists:max([Acc, Acc1])
- end, 0, FileDetail),
- State # dqstate { current_file_num = Num, current_file_name = Left, current_offset = Offset };
+ Offset = case ets:match_object(FileDetail, {{Left, '_'}, '_'}) of
+ [] -> 0;
+ L -> {{Left, Offset1}, TotalSize} = lists:last(L),
+ Offset1 + TotalSize + (?FILE_PACKING_ADJUSTMENT)
+ end,
+ State # dqstate { current_file_num = Num, current_file_name = Left,
+ current_offset = Offset };
load_messages(Left, [File|Files],
State = #dqstate { msg_location = MsgLocation,
- file_summary = FileSummary
+ file_summary = FileSummary,
+ file_detail = FileDetail
}) ->
% [{MsgId, TotalSize, FileOffset}]
- io:format("scan start~n", []),
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
- io:format("scan end~n", []),
- {ValidMessagesRev, ValidTotalSize, FileDetail} = lists:foldl(
- fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc, FileDetail1}) ->
+ {ValidMessagesRev, ValidTotalSize} = lists:foldl(
+ fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {MsgId, '_'}, '_'})) of
- 0 -> {VMAcc, VTSAcc, FileDetail1};
+ 0 -> {VMAcc, VTSAcc};
RefCount ->
true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}),
+ true = ets:insert_new(FileDetail, {{File, Offset}, TotalSize}),
{[{MsgId, TotalSize, Offset}|VMAcc],
- VTSAcc + TotalSize + (?FILE_PACKING_ADJUSTMENT),
- dict:store(Offset, TotalSize, FileDetail1)
+ VTSAcc + TotalSize + (?FILE_PACKING_ADJUSTMENT)
}
end
- end, {[], 0, dict:new()}, Messages),
+ end, {[], 0}, Messages),
% foldl reverses lists and find_contiguous_block_prefix needs elems in the same order
% as from scan_file_for_valid_messages
{ContiguousTop, _} = find_contiguous_block_prefix(lists:reverse(ValidMessagesRev)),
@@ -408,8 +405,7 @@ load_messages(Left, [File|Files],
dict:store(File, #dqfile { valid_data = ValidTotalSize,
contiguous_prefix = ContiguousTop,
left = Left,
- right = Right,
- detail = FileDetail
+ right = Right
},
FileSummary)
},