diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-11 12:15:47 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-11 12:15:47 +0100 |
| commit | 4a6c6aac0e92ab80c5159689358b223150a48b01 (patch) | |
| tree | b90f191144562af84fee3a6406c288d778648149 /src | |
| parent | 2ce46bd0507f5badfdc14cd6e1c86fc6451ccfcd (diff) | |
| download | rabbitmq-server-git-4a6c6aac0e92ab80c5159689358b223150a48b01.tar.gz | |
switched back to ets for file detail. Also remember to call compact from tx_cancel.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 124 |
1 files changed, 60 insertions, 64 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index e605828e46..0313ebd715 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -48,16 +48,18 @@ -define(INTEGER_SIZE_BYTES, 8). -define(INTEGER_SIZE_BITS, 8 * ?INTEGER_SIZE_BYTES). -define(MSG_LOC_ETS_NAME, rabbit_disk_queue_msg_location). +-define(FILE_DETAIL_ETS_NAME, rabbit_disk_queue_file_detail). -define(FILE_EXTENSION, ".rdq"). -define(FILE_EXTENSION_TMP, ".rdt"). -define(FILE_PACKING_ADJUSTMENT, 1 + (2* (?INTEGER_SIZE_BYTES))). -define(SERVER, ?MODULE). --record(dqfile, {valid_data, contiguous_prefix, left, right, detail}). +-record(dqfile, {valid_data, contiguous_prefix, left, right}). -record(dqstate, {msg_location, file_summary, + file_detail, current_file_num, current_file_name, current_file_handle, @@ -99,9 +101,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> file_summary = dict:store(InitName, (#dqfile { valid_data = 0, contiguous_prefix = 0, left = undefined, - right = undefined, - detail = dict:new()}), + right = undefined}), dict:new()), + file_detail = ets:new((?FILE_DETAIL_ETS_NAME), [ordered_set, private]), current_file_num = 0, current_file_name = InitName, current_file_handle = undefined, @@ -197,41 +199,41 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation, {ok, {MsgBody, BodySize, Delivered}, State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}. -internal_ack(Q, MsgIds, State) -> - {Files, State1} - = lists:foldl(fun (MsgId, {Files1, State2 = #dqstate { msg_location = MsgLocation, - file_summary = FileSummary - }}) -> +internal_ack(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, + file_summary = FileSummary, + file_detail = FileDetail + }) -> + {Files, FileSummary1} + = lists:foldl(fun (MsgId, {Files2, FileSummary2}) -> [{MsgId, RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId), % is this the last time we need the message, in which case tidy up if 1 =:= RefCount -> true = ets:delete(MsgLocation, MsgId), {ok, FileSum = #dqfile { valid_data = ValidTotalSize, - contiguous_prefix = ContiguousTop, - detail = FileDetail }} - = dict:find(File, FileSummary), - FileDetail1 = dict:erase(Offset, FileDetail), + contiguous_prefix = ContiguousTop }} + = dict:find(File, FileSummary2), + true = ets:delete(FileDetail, {File, Offset}), ContiguousTop1 = lists:min([ContiguousTop, Offset]), - FileSummary1 + FileSummary3 = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)), - contiguous_prefix = ContiguousTop1, - detail = FileDetail1 - }, FileSummary), + contiguous_prefix = ContiguousTop1 + }, FileSummary2), ok = mnesia:dirty_delete({rabbit_disk_queue, {MsgId, Q}}), - {sets:add_element(File, Files1), State2 # dqstate { file_summary = FileSummary1 }}; + {sets:add_element(File, Files2), FileSummary3}; 1 < RefCount -> ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), - {Files1, State2} + {Files2, FileSummary2} end - end, {sets:new(), State}, MsgIds), - State2 = compact(Files, State1), + end, {sets:new(), FileSummary}, MsgIds), + State2 = compact(Files, State # dqstate { file_summary = FileSummary1 }), {ok, State2}. internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation, current_file_handle = CurHdl, current_file_name = CurName, current_offset = Offset, - file_summary = FileSummary + file_summary = FileSummary, + file_detail = FileDetail }) -> case ets:lookup(MsgLocation, MsgId) of [] -> @@ -240,18 +242,16 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, Offset, TotalSize}), {ok, FileSum = #dqfile { valid_data = ValidTotalSize, contiguous_prefix = ContiguousTop, - right = undefined, - detail = FileDetail }} + right = undefined }} = dict:find(CurName, FileSummary), - FileDetail1 = dict:store(Offset, TotalSize, FileDetail), + true = ets:insert_new(FileDetail, {{CurName, Offset}, TotalSize}), ValidTotalSize1 = ValidTotalSize + TotalSize + (?FILE_PACKING_ADJUSTMENT), ContiguousTop1 = if Offset =:= ContiguousTop -> ValidTotalSize; % can't be any holes in this file true -> ContiguousTop end, FileSummary1 = dict:store(CurName, FileSum #dqfile { valid_data = ValidTotalSize1, - contiguous_prefix = ContiguousTop1, - detail = FileDetail1}, + contiguous_prefix = ContiguousTop1 }, FileSummary), maybe_roll_to_new_file(Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT), State # dqstate { file_summary = FileSummary1, @@ -289,30 +289,32 @@ internal_publish(Q, MsgId, MsgBody, State) -> internal_tx_cancel(MsgIds, State = #dqstate { msg_location = MsgLocation, - file_summary = FileSummary + file_summary = FileSummary, + file_detail = FileDetail }) -> - FileSummary1 = - lists:foldl(fun (MsgId, FileSummary2) -> + {Files, FileSummary1} = + lists:foldl(fun (MsgId, {Files2, FileSummary2}) -> [{MsgId, RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId), if 1 =:= RefCount -> true = ets:delete(MsgLocation, MsgId), {ok, FileSum = #dqfile { valid_data = ValidTotalSize, - contiguous_prefix = ContiguousTop, - detail = FileDetail }} + contiguous_prefix = ContiguousTop }} = dict:find(File, FileSummary2), - FileDetail1 = dict:erase(Offset, FileDetail), + true = ets:delete(FileDetail, {File, Offset}), ContiguousTop1 = lists:min([ContiguousTop, Offset]), - dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)), - contiguous_prefix = ContiguousTop1, - detail = FileDetail1 - }, FileSummary2); + FileSummary3 + = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)), + contiguous_prefix = ContiguousTop1 + }, FileSummary2), + {sets:add_element(File, Files2), FileSummary3}; 1 < RefCount -> ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), - FileSummary2 + {Files2, FileSummary2} end - end, FileSummary, MsgIds), - {ok, State #dqstate { file_summary = FileSummary1 }}. + end, {sets:new(), FileSummary}, MsgIds), + State2 = compact(Files, State # dqstate { file_summary = FileSummary1 }), + {ok, State2}. %% ---- ROLLING OVER THE APPEND FILE ---- @@ -337,8 +339,7 @@ maybe_roll_to_new_file(Offset, State = #dqstate { file_size_limit = FileSizeLimi file_summary = dict:store(NextName, #dqfile { valid_data = 0, contiguous_prefix = 0, left = CurName, - right = undefined, - detail = dict:new()}, + right = undefined }, FileSummary1) } }; @@ -347,7 +348,7 @@ maybe_roll_to_new_file(_, State) -> %% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ---- -compact(Files, State) -> +compact(_FilesSet, State) -> State. %% ---- DISK RECOVERY ---- @@ -355,48 +356,44 @@ compact(Files, State) -> load_from_disk(State) -> % sorted so that smallest number is first. which also means eldest file (left-most) first {Files, TmpFiles} = get_disk_queue_files(), - io:format("got files~n", []), ok = recover_crashed_compactions(Files, TmpFiles), - io:format("crash recovery done~n", []), % There should be no more tmp files now, so go ahead and load the whole lot (State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State), - io:format("loaded messages~n", []), % Finally, check there is nothing in mnesia which we haven't loaded true = lists:foldl(fun ({MsgId, _Q}, true) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end, true, mnesia:async_dirty(fun() -> mnesia:all_keys(rabbit_disk_queue) end)), - io:format("checked in mnesia~n", []), {ok, State1}. load_messages(undefined, [], State) -> State; -load_messages(Left, [], State = #dqstate { file_summary = Summary }) -> +load_messages(Left, [], State = #dqstate { file_detail = FileDetail }) -> Num = list_to_integer(filename:rootname(Left)), - {ok, #dqfile { detail = FileDetail }} = dict:find(Left, Summary), - Offset = dict:fold(fun (Offset1, TotalSize, Acc) -> - Acc1 = Offset1 + TotalSize + (?FILE_PACKING_ADJUSTMENT), - lists:max([Acc, Acc1]) - end, 0, FileDetail), - State # dqstate { current_file_num = Num, current_file_name = Left, current_offset = Offset }; + Offset = case ets:match_object(FileDetail, {{Left, '_'}, '_'}) of + [] -> 0; + L -> {{Left, Offset1}, TotalSize} = lists:last(L), + Offset1 + TotalSize + (?FILE_PACKING_ADJUSTMENT) + end, + State # dqstate { current_file_num = Num, current_file_name = Left, + current_offset = Offset }; load_messages(Left, [File|Files], State = #dqstate { msg_location = MsgLocation, - file_summary = FileSummary + file_summary = FileSummary, + file_detail = FileDetail }) -> % [{MsgId, TotalSize, FileOffset}] - io:format("scan start~n", []), {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), - io:format("scan end~n", []), - {ValidMessagesRev, ValidTotalSize, FileDetail} = lists:foldl( - fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc, FileDetail1}) -> + {ValidMessagesRev, ValidTotalSize} = lists:foldl( + fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> case length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {MsgId, '_'}, '_'})) of - 0 -> {VMAcc, VTSAcc, FileDetail1}; + 0 -> {VMAcc, VTSAcc}; RefCount -> true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}), + true = ets:insert_new(FileDetail, {{File, Offset}, TotalSize}), {[{MsgId, TotalSize, Offset}|VMAcc], - VTSAcc + TotalSize + (?FILE_PACKING_ADJUSTMENT), - dict:store(Offset, TotalSize, FileDetail1) + VTSAcc + TotalSize + (?FILE_PACKING_ADJUSTMENT) } end - end, {[], 0, dict:new()}, Messages), + end, {[], 0}, Messages), % foldl reverses lists and find_contiguous_block_prefix needs elems in the same order % as from scan_file_for_valid_messages {ContiguousTop, _} = find_contiguous_block_prefix(lists:reverse(ValidMessagesRev)), @@ -408,8 +405,7 @@ load_messages(Left, [File|Files], dict:store(File, #dqfile { valid_data = ValidTotalSize, contiguous_prefix = ContiguousTop, left = Left, - right = Right, - detail = FileDetail + right = Right }, FileSummary) }, |
