diff options
| | | |
|---|---|---|
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-11 11:26:13 +0100 |
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-11 11:26:13 +0100 |
| commit | 2ce46bd0507f5badfdc14cd6e1c86fc6451ccfcd (patch) | |
| tree | a093208f96c7477920461b69a80cfb55bb66323f /src | |
| parent | a0329ef53a84049cfa7a111162391bef1849dc52 (diff) | |
| download | rabbitmq-server-git-2ce46bd0507f5badfdc14cd6e1c86fc6451ccfcd.tar.gz | |
Only compact after having ack'd all messages.
This should prevent unnecessary compactions: we now have the ability to scan for and delete all now-empty files before attempting any compaction.
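
For illustration, here is a minimal, self-contained Erlang sketch of the batching pattern this commit adopts: fold over all the message ids first, collect the set of files whose valid data shrank, and only then run a single compaction pass. The module name, the `ack_one/2` helper, and the map-based state are hypothetical stand-ins, not the real `rabbit_disk_queue` code.

```erlang
%% A minimal sketch of the commit's batching idea, NOT the real
%% rabbit_disk_queue code: ack every message first, remember which
%% files lost data in a set, and only then run a single compaction
%% pass. ack_one/2 and the map-based state are invented stand-ins.
-module(batch_ack_sketch).
-export([ack_all/2]).

ack_all(MsgIds, State) ->
    {Files, State1} =
        lists:foldl(
          fun (MsgId, {FilesAcc, StateAcc}) ->
                  {File, StateAcc1} = ack_one(MsgId, StateAcc),
                  %% a set records each touched file exactly once
                  {sets:add_element(File, FilesAcc), StateAcc1}
          end, {sets:new(), State}, MsgIds),
    %% one compaction decision per touched file, not one per message
    {ok, compact(Files, State1)}.

%% hypothetical: State maps MsgId -> File; acking forgets the message
ack_one(MsgId, State) ->
    File = maps:get(MsgId, State),
    {File, maps:remove(MsgId, State)}.

%% placeholder, mirroring the stubbed compact/2 in the patch itself
compact(_Files, State) ->
    State.
```

Called as `batch_ack_sketch:ack_all([m1, m2], #{m1 => f1, m2 => f1})`, both acks land on file `f1`, but `compact/2` still sees `f1` only once — which is exactly the saving the commit message describes.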
Diffstat (limited to 'src')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | src/rabbit_disk_queue.erl | 70 |

1 file changed, 39 insertions, 31 deletions
```diff
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index dc23c9ca9a..e605828e46 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -128,9 +128,7 @@ handle_cast({publish, Q, MsgId, MsgBody}, State) ->
     {ok, State1} = internal_publish(Q, MsgId, MsgBody, State),
     {noreply, State1};
 handle_cast({ack, Q, MsgIds}, State) ->
-    {ok, State1} = lists:foldl(fun (MsgId, {ok, State2}) ->
-                                       internal_ack(Q, MsgId, State2)
-                               end, {ok, State}, MsgIds),
+    {ok, State1} = internal_ack(Q, MsgIds, State),
     {noreply, State1};
 handle_cast({tx_publish, MsgId, MsgBody}, State) ->
     {ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
@@ -199,31 +197,35 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
     {ok, {MsgBody, BodySize, Delivered},
      State #dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
 
-internal_ack(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
-                                          file_summary = FileSummary
-                                        }) ->
-    [{MsgId, RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId),
-    % is this the last time we need the message, in which case tidy up
-    State1 =
-        if 1 =:= RefCount ->
-                true = ets:delete(MsgLocation, MsgId),
-                {ok, FileSum = #dqfile { valid_data = ValidTotalSize,
-                                         contiguous_prefix = ContiguousTop,
-                                         detail = FileDetail }}
-                    = dict:find(File, FileSummary),
-                FileDetail1 = dict:erase(Offset, FileDetail),
-                ContiguousTop1 = lists:min([ContiguousTop, Offset]),
-                FileSummary1 = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
-                                                                  contiguous_prefix = ContiguousTop1,
-                                                                  detail = FileDetail1
-                                                                }, FileSummary),
-                ok = mnesia:dirty_delete({rabbit_disk_queue, {MsgId, Q}}),
-                compact(File, State #dqstate { file_summary = FileSummary1 });
-           1 < RefCount ->
-                ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
-                State
-        end,
-    {ok, State1}.
+internal_ack(Q, MsgIds, State) ->
+    {Files, State1}
+        = lists:foldl(fun (MsgId, {Files1, State2 = #dqstate { msg_location = MsgLocation,
+                                                               file_summary = FileSummary
+                                                             }}) ->
+                              [{MsgId, RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId),
+                              % is this the last time we need the message, in which case tidy up
+                              if 1 =:= RefCount ->
+                                      true = ets:delete(MsgLocation, MsgId),
+                                      {ok, FileSum = #dqfile { valid_data = ValidTotalSize,
+                                                               contiguous_prefix = ContiguousTop,
+                                                               detail = FileDetail }}
+                                          = dict:find(File, FileSummary),
+                                      FileDetail1 = dict:erase(Offset, FileDetail),
+                                      ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+                                      FileSummary1
+                                          = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
+                                                                               contiguous_prefix = ContiguousTop1,
+                                                                               detail = FileDetail1
+                                                                             }, FileSummary),
+                                      ok = mnesia:dirty_delete({rabbit_disk_queue, {MsgId, Q}}),
+                                      {sets:add_element(File, Files1), State2 #dqstate { file_summary = FileSummary1 }};
+                                 1 < RefCount ->
+                                      ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
+                                      {Files1, State2}
+                              end
+                      end, {sets:new(), State}, MsgIds),
+    State2 = compact(Files, State1),
+    {ok, State2}.
 
 internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation,
                                                        current_file_handle = CurHdl,
@@ -345,7 +347,7 @@ maybe_roll_to_new_file(_, State) ->
 
 %% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ----
 
-compact(File, State) ->
+compact(Files, State) ->
     State.
 
 %% ---- DISK RECOVERY ----
@@ -353,12 +355,16 @@ compact(Files, State) ->
 load_from_disk(State) ->
     % sorted so that smallest number is first. which also means eldest file (left-most) first
     {Files, TmpFiles} = get_disk_queue_files(),
+    io:format("got files~n", []),
     ok = recover_crashed_compactions(Files, TmpFiles),
+    io:format("crash recovery done~n", []),
     % There should be no more tmp files now, so go ahead and load the whole lot
     (State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State),
+    io:format("loaded messages~n", []),
     % Finally, check there is nothing in mnesia which we haven't loaded
-    true = lists:all(fun ({MsgId, _Q}) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end,
-                     mnesia:async_dirty(fun() -> mnesia:all_keys(rabbit_disk_queue) end)),
+    true = lists:foldl(fun ({MsgId, _Q}, true) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end,
+                       true, mnesia:async_dirty(fun() -> mnesia:all_keys(rabbit_disk_queue) end)),
+    io:format("checked in mnesia~n", []),
     {ok, State1}.
 
 load_messages(undefined, [], State) ->
@@ -376,7 +382,9 @@ load_messages(Left, [File|Files],
                                        file_summary = FileSummary }) ->
     % [{MsgId, TotalSize, FileOffset}]
+    io:format("scan start~n", []),
     {ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
+    io:format("scan end~n", []),
     {ValidMessagesRev, ValidTotalSize, FileDetail} = lists:foldl(
         fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc, FileDetail1}) ->
             case length(mnesia:dirty_match_object(rabbit_disk_queue,
                                                   {dq_msg_loc, {MsgId, '_'}, '_'})) of
```
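
The patch leaves `compact/2` as a stub that returns `State` unchanged. Going by the commit message — scan for and delete all now-empty files before attempting any compaction — a future version might take roughly the shape sketched below. This is speculation, not the module's actual code: valid bytes are modelled as a plain `File -> Bytes` map, and every name here is invented for illustration.

```erlang
%% Speculative shape for the stubbed compact/2 above, based only on the
%% commit message: drop files the batched acks emptied entirely, and
%% only then consider combining the survivors. The File -> ValidBytes
%% map and all names are invented for illustration.
-module(compact_sketch).
-export([compact/2]).

compact(FilesSet, FileSizes) ->
    {Empty, NonEmpty} =
        lists:partition(fun (File) -> maps:get(File, FileSizes, 0) =:= 0 end,
                        sets:to_list(FilesSet)),
    %% files with no valid data left can simply be deleted, no copying needed
    FileSizes1 = maps:without(Empty, FileSizes),
    %% real compaction (combining sparse survivors) would go here;
    %% left as a no-op, mirroring the patch's own placeholder
    lists:foldl(fun (_File, Acc) -> Acc end, FileSizes1, NonEmpty).
```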
