author    Matthew Sackman <matthew@lshift.net>    2009-04-11 11:26:13 +0100
committer Matthew Sackman <matthew@lshift.net>    2009-04-11 11:26:13 +0100
commit    2ce46bd0507f5badfdc14cd6e1c86fc6451ccfcd (patch)
tree      a093208f96c7477920461b69a80cfb55bb66323f /src
parent    a0329ef53a84049cfa7a111162391bef1849dc52 (diff)
download  rabbitmq-server-git-2ce46bd0507f5badfdc14cd6e1c86fc6451ccfcd.tar.gz
Only compact after having ack'd all messages.
This should prevent unnecessary compactions (i.e. we now have the ability to scan for and delete all now-empty files before attempting any compaction).
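The shape of the change to internal_ack below is a batching pattern: collect the set of files touched while acking a list of message ids, then compact once at the end instead of after every individual ack. The following is only a minimal, self-contained sketch of that pattern; the module and helper names (ack_batch_sketch, remove_message/2, compact_file/2, filename_for/1) are hypothetical stand-ins, not rabbit_disk_queue's real API.

-module(ack_batch_sketch).
-export([ack_all/2]).

%% Ack a whole batch of message ids, accumulating the set of files whose
%% contents were invalidated, and only then run one compaction pass over
%% those files.
ack_all(MsgIds, State) ->
    {Files, State1} =
        lists:foldl(fun (MsgId, {FilesAcc, StateAcc}) ->
                            %% remove_message/2 stands in for the per-message
                            %% bookkeeping (ets/mnesia updates); it returns the
                            %% file the message lived in.
                            {File, StateAcc1} = remove_message(MsgId, StateAcc),
                            {sets:add_element(File, FilesAcc), StateAcc1}
                    end, {sets:new(), State}, MsgIds),
    %% one compaction pass over the touched files, after all acks are done
    State2 = sets:fold(fun compact_file/2, State1, Files),
    {ok, State2}.

%% hypothetical helpers standing in for the disk queue's internals
remove_message(MsgId, State) -> {filename_for(MsgId), State}.
compact_file(_File, State)   -> State.
filename_for(_MsgId)         -> "0.rdq".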
Diffstat (limited to 'src')
-rw-r--r--  src/rabbit_disk_queue.erl  70
1 file changed, 39 insertions, 31 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index dc23c9ca9a..e605828e46 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -128,9 +128,7 @@ handle_cast({publish, Q, MsgId, MsgBody}, State) ->
{ok, State1} = internal_publish(Q, MsgId, MsgBody, State),
{noreply, State1};
handle_cast({ack, Q, MsgIds}, State) ->
- {ok, State1} = lists:foldl(fun (MsgId, {ok, State2}) ->
- internal_ack(Q, MsgId, State2)
- end, {ok, State}, MsgIds),
+ {ok, State1} = internal_ack(Q, MsgIds, State),
{noreply, State1};
handle_cast({tx_publish, MsgId, MsgBody}, State) ->
{ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
@@ -199,31 +197,35 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
{ok, {MsgBody, BodySize, Delivered},
State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
-internal_ack(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
- file_summary = FileSummary
- }) ->
- [{MsgId, RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId),
- % is this the last time we need the message, in which case tidy up
- State1 =
- if 1 =:= RefCount ->
- true = ets:delete(MsgLocation, MsgId),
- {ok, FileSum = #dqfile { valid_data = ValidTotalSize,
- contiguous_prefix = ContiguousTop,
- detail = FileDetail }}
- = dict:find(File, FileSummary),
- FileDetail1 = dict:erase(Offset, FileDetail),
- ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- FileSummary1 = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
- contiguous_prefix = ContiguousTop1,
- detail = FileDetail1
- }, FileSummary),
- ok = mnesia:dirty_delete({rabbit_disk_queue, {MsgId, Q}}),
- compact(File, State # dqstate { file_summary = FileSummary1 } );
- 1 < RefCount ->
- ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
- State
- end,
- {ok, State1}.
+internal_ack(Q, MsgIds, State) ->
+ {Files, State1}
+ = lists:foldl(fun (MsgId, {Files1, State2 = #dqstate { msg_location = MsgLocation,
+ file_summary = FileSummary
+ }}) ->
+ [{MsgId, RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId),
+ % is this the last time we need the message, in which case tidy up
+ if 1 =:= RefCount ->
+ true = ets:delete(MsgLocation, MsgId),
+ {ok, FileSum = #dqfile { valid_data = ValidTotalSize,
+ contiguous_prefix = ContiguousTop,
+ detail = FileDetail }}
+ = dict:find(File, FileSummary),
+ FileDetail1 = dict:erase(Offset, FileDetail),
+ ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+ FileSummary1
+ = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
+ contiguous_prefix = ContiguousTop1,
+ detail = FileDetail1
+ }, FileSummary),
+ ok = mnesia:dirty_delete({rabbit_disk_queue, {MsgId, Q}}),
+ {sets:add_element(File, Files1), State2 # dqstate { file_summary = FileSummary1 }};
+ 1 < RefCount ->
+ ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
+ {Files1, State2}
+ end
+ end, {sets:new(), State}, MsgIds),
+ State2 = compact(Files, State1),
+ {ok, State2}.
internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation,
current_file_handle = CurHdl,
@@ -345,7 +347,7 @@ maybe_roll_to_new_file(_, State) ->
%% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ----
-compact(File, State) ->
+compact(Files, State) ->
State.
%% ---- DISK RECOVERY ----
@@ -353,12 +355,16 @@ compact(File, State) ->
load_from_disk(State) ->
% sorted so that smallest number is first. which also means eldest file (left-most) first
{Files, TmpFiles} = get_disk_queue_files(),
+ io:format("got files~n", []),
ok = recover_crashed_compactions(Files, TmpFiles),
+ io:format("crash recovery done~n", []),
% There should be no more tmp files now, so go ahead and load the whole lot
(State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State),
+ io:format("loaded messages~n", []),
% Finally, check there is nothing in mnesia which we haven't loaded
- true = lists:all(fun ({MsgId, _Q}) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end,
- mnesia:async_dirty(fun() -> mnesia:all_keys(rabbit_disk_queue) end)),
+ true = lists:foldl(fun ({MsgId, _Q}, true) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end,
+ true, mnesia:async_dirty(fun() -> mnesia:all_keys(rabbit_disk_queue) end)),
+ io:format("checked in mnesia~n", []),
{ok, State1}.
load_messages(undefined, [], State) ->
@@ -376,7 +382,9 @@ load_messages(Left, [File|Files],
file_summary = FileSummary
}) ->
% [{MsgId, TotalSize, FileOffset}]
+ io:format("scan start~n", []),
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
+ io:format("scan end~n", []),
{ValidMessagesRev, ValidTotalSize, FileDetail} = lists:foldl(
fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc, FileDetail1}) ->
case length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {MsgId, '_'}, '_'})) of