summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-12 23:59:02 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-12 23:59:02 +0100
commit4a6d328ff71764207186b355500d02b7a0d626e8 (patch)
treec560e25144f67d41b8c1383708f84b7fc95234e3 /src
parent8f72c82f50fc75dc2f7388f65e30ccba76578cc1 (diff)
downloadrabbitmq-server-git-4a6d328ff71764207186b355500d02b7a0d626e8.tar.gz
initial work on compacter.
If you ack messages in exactly the same order as they arrived in, then files will be deleted correctly.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl70
1 files changed, 38 insertions, 32 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index ebc1488ef9..869ed841c9 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -58,8 +58,6 @@
-define(SERVER, ?MODULE).
--record(dqfile, {valid_data, contiguous_prefix, left, right}).
-
-record(dqstate, {msg_location,
file_summary,
file_detail,
@@ -106,9 +104,8 @@ clean_stop() ->
init([FileSizeLimit, ReadFileHandlesLimit]) ->
process_flag(trap_exit, true),
InitName = "0" ++ ?FILE_EXTENSION,
- FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]),
State = #dqstate { msg_location = ets:new(?MSG_LOC_ETS_NAME, [set, private]),
- file_summary = FileSummary,
+ file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]),
file_detail = ets:new(?FILE_DETAIL_ETS_NAME, [ordered_set, private]),
current_file_num = 0,
current_file_name = InitName,
@@ -236,7 +233,8 @@ internal_ack(Q, MsgIds, State) ->
%% Q is only needed if MnesiaDelete = true
remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation,
file_summary = FileSummary,
- file_detail = FileDetail
+ file_detail = FileDetail,
+ current_file_name = CurName
}) ->
Files
= lists:foldl(fun (MsgId, Files2) ->
@@ -244,21 +242,21 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL
= ets:lookup(MsgLocation, MsgId),
if 1 =:= RefCount ->
true = ets:delete(MsgLocation, MsgId),
- [{File, FileSum = #dqfile { valid_data = ValidTotalSize,
- contiguous_prefix = ContiguousTop }}]
+ [{File, ValidTotalSize, ContiguousTop, Left, Right}]
= ets:lookup(FileSummary, File),
true = ets:delete(FileDetail, {File, Offset}),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
true = ets:insert(FileSummary,
- {File, FileSum #dqfile { valid_data = (ValidTotalSize -
- TotalSize - ?FILE_PACKING_ADJUSTMENT),
- contiguous_prefix = ContiguousTop1}}),
+ {File, (ValidTotalSize - TotalSize - ?FILE_PACKING_ADJUSTMENT),
+ ContiguousTop1, Left, Right}),
if MnesiaDelete ->
ok = mnesia:dirty_delete(rabbit_disk_queue, {MsgId, Q});
true ->
ok
end,
- sets:add_element(File, Files2);
+ if CurName =:= File -> Files2;
+ true -> sets:add_element(File, Files2)
+ end;
1 < RefCount ->
ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
Files2
@@ -279,9 +277,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
% New message, lots to do
{ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, CurOffset, TotalSize}),
- [{CurName, FileSum = #dqfile { valid_data = ValidTotalSize,
- contiguous_prefix = ContiguousTop,
- right = undefined }}]
+ [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}]
= ets:lookup(FileSummary, CurName),
true = ets:insert_new(FileDetail, {{CurName, CurOffset}, TotalSize}),
ValidTotalSize1 = ValidTotalSize + TotalSize + ?FILE_PACKING_ADJUSTMENT,
@@ -289,8 +285,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
ValidTotalSize; % can't be any holes in this file
true -> ContiguousTop
end,
- true = ets:insert(FileSummary, {CurName, FileSum #dqfile { valid_data = ValidTotalSize1,
- contiguous_prefix = ContiguousTop1 }}),
+ true = ets:insert(FileSummary, {CurName, ValidTotalSize1, ContiguousTop1, Left, undefined}),
maybe_roll_to_new_file(CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
State # dqstate {current_offset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT});
[{MsgId, RefCount, File, Offset, TotalSize}] ->
@@ -343,12 +338,8 @@ maybe_roll_to_new_file(Offset, State = #dqstate { file_size_limit = FileSizeLimi
NextNum = CurNum + 1,
NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION,
{ok, NextHdl} = file:open(form_filename(NextName), [write, raw, binary, delayed_write]),
- [{CurName, FileSum = #dqfile {right = undefined}}] = ets:lookup(FileSummary, CurName),
- true = ets:insert(FileSummary, {CurName, FileSum #dqfile {right = NextName}}),
- true = ets:insert_new(FileSummary, {NextName, #dqfile { valid_data = 0,
- contiguous_prefix = 0,
- left = CurName,
- right = undefined }}),
+ true = ets:update_element(FileSummary, CurName, {5, NextName}), % 5 is Right
+ true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}),
{ok, State # dqstate { current_file_name = NextName,
current_file_handle = NextHdl,
current_file_num = NextNum,
@@ -359,9 +350,31 @@ maybe_roll_to_new_file(_, State) ->
%% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ----
-compact(_FilesSet, State) ->
+compact(FilesSet, State) ->
+ % smallest number, hence eldest, hence left-most, first
+ Files = lists:sort(sets:to_list(FilesSet)),
+ % foldl reverses, so now youngest/right-most first
+ RemainingFiles = lists:foldl(fun(File, Acc) -> delete_empty_files(File, Acc, State) end, [], Files),
State.
+delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
+ [{File, ValidData, _ContiguousTop, Left, Right}] = ets:lookup(FileSummary, File),
+ case ValidData of
+ % we should NEVER find the current file in here - hence right should always be a file, not undefined
+ 0 -> case {Left, Right} of
+ {undefined, _} when not(is_atom(Right)) ->
+ % the eldest file is empty. YAY!
+ true = ets:update_element(FileSummary, Right, {4, undefined}); % left is the 4th field
+ {_, _} when not(is_atom(Right)) ->
+ true = ets:update_element(FileSummary, Right, {4, Left}), % left is the 4th field
+ true = ets:update_element(FileSummary, Left, {5, Right}) % right is the 5th field
+ end,
+ true = ets:delete(FileSummary, File),
+ ok = file:delete(form_filename(File)),
+ Acc;
+ _ -> [File|Acc]
+ end.
+
%% ---- DISK RECOVERY ----
load_from_disk(State) ->
@@ -377,10 +390,7 @@ load_from_disk(State) ->
load_messages(undefined, [], State = #dqstate { file_summary = FileSummary,
current_file_name = CurName }) ->
- true = ets:insert_new(FileSummary, {CurName, #dqfile { valid_data = 0,
- contiguous_prefix = 0,
- left = undefined,
- right = undefined}}),
+ true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}),
State;
load_messages(Left, [], State = #dqstate { file_detail = FileDetail }) ->
Num = list_to_integer(filename:rootname(Left)),
@@ -419,11 +429,7 @@ load_messages(Left, [File|Files],
[] -> undefined;
[F|_] -> F
end,
- true = ets:insert_new(FileSummary, {File, #dqfile { valid_data = ValidTotalSize,
- contiguous_prefix = ContiguousTop,
- left = Left,
- right = Right
- }}),
+ true = ets:insert_new(FileSummary, {File, ValidTotalSize, ContiguousTop, Left, Right}),
load_messages(File, Files, State).
%% ---- DISK RECOVERY OF FAILED COMPACTION ----