diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-12 23:59:02 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-12 23:59:02 +0100 |
| commit | 4a6d328ff71764207186b355500d02b7a0d626e8 (patch) | |
| tree | c560e25144f67d41b8c1383708f84b7fc95234e3 /src | |
| parent | 8f72c82f50fc75dc2f7388f65e30ccba76578cc1 (diff) | |
| download | rabbitmq-server-git-4a6d328ff71764207186b355500d02b7a0d626e8.tar.gz | |
initial work on compacter.
If you ack messages in exactly the same order as they arrived in, then files will be deleted correctly.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 70 |
1 files changed, 38 insertions, 32 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index ebc1488ef9..869ed841c9 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -58,8 +58,6 @@ -define(SERVER, ?MODULE). --record(dqfile, {valid_data, contiguous_prefix, left, right}). - -record(dqstate, {msg_location, file_summary, file_detail, @@ -106,9 +104,8 @@ clean_stop() -> init([FileSizeLimit, ReadFileHandlesLimit]) -> process_flag(trap_exit, true), InitName = "0" ++ ?FILE_EXTENSION, - FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]), State = #dqstate { msg_location = ets:new(?MSG_LOC_ETS_NAME, [set, private]), - file_summary = FileSummary, + file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]), file_detail = ets:new(?FILE_DETAIL_ETS_NAME, [ordered_set, private]), current_file_num = 0, current_file_name = InitName, @@ -236,7 +233,8 @@ internal_ack(Q, MsgIds, State) -> %% Q is only needed if MnesiaDelete = true remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation, file_summary = FileSummary, - file_detail = FileDetail + file_detail = FileDetail, + current_file_name = CurName }) -> Files = lists:foldl(fun (MsgId, Files2) -> @@ -244,21 +242,21 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL = ets:lookup(MsgLocation, MsgId), if 1 =:= RefCount -> true = ets:delete(MsgLocation, MsgId), - [{File, FileSum = #dqfile { valid_data = ValidTotalSize, - contiguous_prefix = ContiguousTop }}] + [{File, ValidTotalSize, ContiguousTop, Left, Right}] = ets:lookup(FileSummary, File), true = ets:delete(FileDetail, {File, Offset}), ContiguousTop1 = lists:min([ContiguousTop, Offset]), true = ets:insert(FileSummary, - {File, FileSum #dqfile { valid_data = (ValidTotalSize - - TotalSize - ?FILE_PACKING_ADJUSTMENT), - contiguous_prefix = ContiguousTop1}}), + {File, (ValidTotalSize - TotalSize - ?FILE_PACKING_ADJUSTMENT), + ContiguousTop1, Left, Right}), if MnesiaDelete -> ok = mnesia:dirty_delete(rabbit_disk_queue, {MsgId, Q}); true -> ok end, - sets:add_element(File, Files2); + if CurName =:= File -> Files2; + true -> sets:add_element(File, Files2) + end; 1 < RefCount -> ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), Files2 @@ -279,9 +277,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio % New message, lots to do {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody), true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, CurOffset, TotalSize}), - [{CurName, FileSum = #dqfile { valid_data = ValidTotalSize, - contiguous_prefix = ContiguousTop, - right = undefined }}] + [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = ets:lookup(FileSummary, CurName), true = ets:insert_new(FileDetail, {{CurName, CurOffset}, TotalSize}), ValidTotalSize1 = ValidTotalSize + TotalSize + ?FILE_PACKING_ADJUSTMENT, @@ -289,8 +285,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio ValidTotalSize; % can't be any holes in this file true -> ContiguousTop end, - true = ets:insert(FileSummary, {CurName, FileSum #dqfile { valid_data = ValidTotalSize1, - contiguous_prefix = ContiguousTop1 }}), + true = ets:insert(FileSummary, {CurName, ValidTotalSize1, ContiguousTop1, Left, undefined}), maybe_roll_to_new_file(CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT, State # dqstate {current_offset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT}); [{MsgId, RefCount, File, Offset, TotalSize}] -> @@ -343,12 +338,8 @@ maybe_roll_to_new_file(Offset, State = #dqstate { file_size_limit = FileSizeLimi NextNum = CurNum + 1, NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, {ok, NextHdl} = file:open(form_filename(NextName), [write, raw, binary, delayed_write]), - [{CurName, FileSum = #dqfile {right = undefined}}] = ets:lookup(FileSummary, CurName), - true = ets:insert(FileSummary, {CurName, FileSum #dqfile {right = NextName}}), - true = ets:insert_new(FileSummary, {NextName, #dqfile { valid_data = 0, - contiguous_prefix = 0, - left = CurName, - right = undefined }}), + true = ets:update_element(FileSummary, CurName, {5, NextName}), % 5 is Right + true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}), {ok, State # dqstate { current_file_name = NextName, current_file_handle = NextHdl, current_file_num = NextNum, @@ -359,9 +350,31 @@ maybe_roll_to_new_file(_, State) -> %% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ---- -compact(_FilesSet, State) -> +compact(FilesSet, State) -> + % smallest number, hence eldest, hence left-most, first + Files = lists:sort(sets:to_list(FilesSet)), + % foldl reverses, so now youngest/right-most first + RemainingFiles = lists:foldl(fun(File, Acc) -> delete_empty_files(File, Acc, State) end, [], Files), State. +delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> + [{File, ValidData, _ContiguousTop, Left, Right}] = ets:lookup(FileSummary, File), + case ValidData of + % we should NEVER find the current file in here - hence right should always be a file, not undefined + 0 -> case {Left, Right} of + {undefined, _} when not(is_atom(Right)) -> + % the eldest file is empty. YAY! + true = ets:update_element(FileSummary, Right, {4, undefined}); % left is the 4th field + {_, _} when not(is_atom(Right)) -> + true = ets:update_element(FileSummary, Right, {4, Left}), % left is the 4th field + true = ets:update_element(FileSummary, Left, {5, Right}) % right is the 5th field + end, + true = ets:delete(FileSummary, File), + ok = file:delete(form_filename(File)), + Acc; + _ -> [File|Acc] + end. + %% ---- DISK RECOVERY ---- load_from_disk(State) -> @@ -377,10 +390,7 @@ load_from_disk(State) -> load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, current_file_name = CurName }) -> - true = ets:insert_new(FileSummary, {CurName, #dqfile { valid_data = 0, - contiguous_prefix = 0, - left = undefined, - right = undefined}}), + true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}), State; load_messages(Left, [], State = #dqstate { file_detail = FileDetail }) -> Num = list_to_integer(filename:rootname(Left)), @@ -419,11 +429,7 @@ load_messages(Left, [File|Files], [] -> undefined; [F|_] -> F end, - true = ets:insert_new(FileSummary, {File, #dqfile { valid_data = ValidTotalSize, - contiguous_prefix = ContiguousTop, - left = Left, - right = Right - }}), + true = ets:insert_new(FileSummary, {File, ValidTotalSize, ContiguousTop, Left, Right}), load_messages(File, Files, State). %% ---- DISK RECOVERY OF FAILED COMPACTION ---- |
