diff options
Diffstat (limited to 'src')
 src/rabbit_amqqueue.erl         |   5 +-
 src/rabbit_amqqueue_process.erl |   2 -
 src/rabbit_channel.erl          |   8 +-
 src/rabbit_msg_file.erl         |   5 +-
 src/rabbit_msg_store.erl        | 264 ++++++++++++++++++++++++++++++++++++---
 src/rabbit_msg_store_gc.erl     | 181 ++------------------------------
 src/rabbit_msg_store_misc.erl   |  74 -------------
 src/rabbit_queue_index.erl      |   1 -
 src/rabbit_variable_queue.erl   |   5 +-
9 files changed, 257 insertions, 288 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1c8cf522b9..00407824d8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -58,8 +58,7 @@ -ifdef(use_specs). --type(msg_id() :: non_neg_integer()). --type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}). +-type(get_msg_result() :: {queue_name(), pid(), msg_id(), boolean(), message()}). -type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}). -type(qlen() :: {'ok', non_neg_integer()}). -type(qfun(A) :: fun ((amqqueue()) -> A)). @@ -101,7 +100,7 @@ -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> - {'ok', non_neg_integer(), msg()} | 'empty'). + {'ok', non_neg_integer(), get_msg_result()} | 'empty'). -spec(basic_consume/8 :: (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(), boolean(), any()) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4c42b0ef84..7f43f79a6c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,8 +36,6 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). -define(SYNC_INTERVAL, 5). %% milliseconds -define(RATES_REMEASURE_INTERVAL, 5000). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c8733ed197..a8c17efbde 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -48,9 +48,6 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking}). --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). - -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(INFO_KEYS, @@ -69,8 +66,7 @@ -ifdef(use_specs). --type(msg_id() :: non_neg_integer()). --type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}). 
+-type(msg_to_deliver() :: {queue_name(), pid(), msg_id(), boolean(), message()}). -spec(start_link/5 :: (channel_number(), pid(), pid(), username(), vhost()) -> pid()). @@ -78,7 +74,7 @@ -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). +-spec(deliver/4 :: (pid(), ctag(), boolean(), msg_to_deliver()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 831b4d793a..267cb633d8 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -46,11 +46,10 @@ %%---------------------------------------------------------------------------- +-include("rabbit.hrl"). + -ifdef(use_specs). --type(io_device() :: any()). --type(msg_id() :: binary()). --type(msg() :: any()). -type(position() :: non_neg_integer()). -type(msg_size() :: non_neg_integer()). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e5de24ce1c..b2db0ea51a 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -38,16 +38,32 @@ successfully_recovered_state/1]). -export([sync/1, gc_done/4, set_maximum_since_use/2, - build_index_worker/6]). %% internal + build_index_worker/6, gc/3]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1]). +%%---------------------------------------------------------------------------- + +-include("rabbit_msg_store.hrl"). + -define(SYNC_INTERVAL, 5). %% milliseconds -define(GEOMETRIC_P, 0.3). %% parameter to geometric distribution rng -define(CLEAN_FILENAME, "clean.dot"). -define(FILE_SUMMARY_FILENAME, "file_summary.ets"). +-define(BINARY_MODE, [raw, binary]). +-define(READ_MODE, [read]). +-define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]). 
+-define(WRITE_MODE, [write]). + +-define(FILE_EXTENSION, ".rdq"). +-define(FILE_EXTENSION_TMP, ".rdt"). + +-define(FILE_SIZE_LIMIT, (16*1024*1024)). + +-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB + %%---------------------------------------------------------------------------- -record(msstate, @@ -84,14 +100,15 @@ cur_file_cache_ets }). +-record(file_summary, + {file, valid_total_size, contiguous_top, left, right, file_size, + locked, readers}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). -type(server() :: pid() | atom()). --type(msg_id() :: binary()). --type(msg() :: any()). --type(file_path() :: any()). -type(file_num() :: non_neg_integer()). -type(client_msstate() :: #client_msstate { file_handle_cache :: dict(), index_state :: any(), @@ -122,12 +139,13 @@ -spec(clean/2 :: (atom(), file_path()) -> 'ok'). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). +-spec(gc/3 :: (non_neg_integer(), non_neg_integer(), + {tid(), file_path(), atom(), any()}) -> non_neg_integer()). + -endif. %%---------------------------------------------------------------------------- --include("rabbit_msg_store.hrl"). - %% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION %% It is not recommended to set this to < 0.5 -define(GARBAGE_FRACTION, 0.5). @@ -558,8 +576,8 @@ init([Server, BaseDir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> build_index(Recovered1, Files, State), %% read is only needed so that we can seek - {ok, FileHdl} = rabbit_msg_store_misc:open_file( - Dir, rabbit_msg_store_misc:filenum_to_name(CurFile), + {ok, FileHdl} = open_file( + Dir, filenum_to_name(CurFile), [read | ?WRITE_MODE]), {ok, Offset} = file_handle_cache:position(FileHdl, Offset), ok = file_handle_cache:truncate(FileHdl), @@ -956,6 +974,10 @@ run_pending({contains, MsgId, From}, State) -> run_pending({remove, MsgId}, State) -> remove_message(MsgId, State). 
+open_file(Dir, FileName, Mode) -> + file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, + [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). + close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) -> CState #client_msstate { file_handle_cache = close_handle(Key, FHC) }; @@ -1001,8 +1023,8 @@ get_read_handle(FileNum, FHC, Dir) -> {ok, Hdl} -> {Hdl, FHC}; error -> - {ok, Hdl} = rabbit_msg_store_misc:open_file( - Dir, rabbit_msg_store_misc:filenum_to_name(FileNum), + {ok, Hdl} = open_file( + Dir, filenum_to_name(FileNum), ?READ_MODE), {Hdl, dict:store(FileNum, Hdl, FHC) } end. @@ -1036,6 +1058,35 @@ store_file_summary(Tid, Dir) -> [{extended_info, [object_count]}]), ets:delete(Tid). + +preallocate(Hdl, FileSizeLimit, FinalPos) -> + {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit), + ok = file_handle_cache:truncate(Hdl), + {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos), + ok. + +truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> + {ok, Lowpoint} = file_handle_cache:position(FileHdl, Lowpoint), + ok = file_handle_cache:truncate(FileHdl), + ok = preallocate(FileHdl, Highpoint, Lowpoint). + +form_filename(Dir, Name) -> filename:join(Dir, Name). + +filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. + +scan_file_for_valid_messages(Dir, FileName) -> + case open_file(Dir, FileName, ?READ_MODE) of + {ok, Hdl} -> + Valid = rabbit_msg_file:scan(Hdl), + %% if something really bad's happened, the close could fail, + %% but ignore + file_handle_cache:close(Hdl), + Valid; + {error, enoent} -> {ok, [], 0}; + {error, Reason} -> throw({error, + {unable_to_scan_file, FileName, Reason}}) + end. 
+ %%---------------------------------------------------------------------------- %% message cache helper functions %%---------------------------------------------------------------------------- @@ -1180,7 +1231,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> %% consist only of valid messages. Plan: Truncate the main file %% back to before any of the files in the tmp file and copy %% them over again - TmpPath = rabbit_msg_store_misc:form_filename(Dir, TmpFileName), + TmpPath = form_filename(Dir, TmpFileName), case is_sublist(MsgIdsTmp, MsgIds) of true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file %% note this also catches the case when the tmp file @@ -1212,7 +1263,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> %% are in the tmp file true = is_disjoint(MsgIds1, MsgIdsTmp), %% must open with read flag, otherwise will stomp over contents - {ok, MainHdl} = rabbit_msg_store_misc:open_file( + {ok, MainHdl} = open_file( Dir, NonTmpRelatedFileName, [read | ?WRITE_MODE]), %% Wipe out any rubbish at the end of the file. Remember %% the head of the list will be the highest entry in the @@ -1222,9 +1273,9 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> %% Extend the main file as big as necessary in a single %% move. 
If we run out of disk space, this truncate could %% fail, but we still aren't risking losing data - ok = rabbit_msg_store_misc:truncate_and_extend_file( + ok = truncate_and_extend_file( MainHdl, Top, Top + TmpSize), - {ok, TmpHdl} = rabbit_msg_store_misc:open_file( + {ok, TmpHdl} = open_file( Dir, TmpFileName, ?READ_AHEAD_MODE), {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize), ok = file_handle_cache:close(MainHdl), @@ -1248,7 +1299,7 @@ is_disjoint(SmallerL, BiggerL) -> scan_file_for_valid_messages_msg_ids(Dir, FileName) -> {ok, Messages, _FileSize} = - rabbit_msg_store_misc:scan_file_for_valid_messages(Dir, FileName), + scan_file_for_valid_messages(Dir, FileName), {ok, Messages, [MsgId || {MsgId, _TotalSize, _FileOffset} <- Messages]}. %% Takes the list in *ascending* order (i.e. eldest message @@ -1318,8 +1369,8 @@ build_index(Gatherer, Left, [File|Files], State) -> build_index_worker( Gatherer, Guid, State = #msstate { dir = Dir }, Left, File, Files) -> {ok, Messages, FileSize} = - rabbit_msg_store_misc:scan_file_for_valid_messages( - Dir, rabbit_msg_store_misc:filenum_to_name(File)), + scan_file_for_valid_messages( + Dir, filenum_to_name(File)), {ValidMessages, ValidTotalSize} = lists:foldl( fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> @@ -1357,7 +1408,7 @@ build_index_worker( ok = gatherer:finished(Gatherer, Guid). 
%%---------------------------------------------------------------------------- -%% garbage collection / compaction / aggregation +%% garbage collection / compaction / aggregation -- internal %%---------------------------------------------------------------------------- maybe_roll_to_new_file(Offset, @@ -1370,8 +1421,8 @@ maybe_roll_to_new_file(Offset, State1 = internal_sync(State), ok = file_handle_cache:close(CurHdl), NextFile = CurFile + 1, - {ok, NextHdl} = rabbit_msg_store_misc:open_file( - Dir, rabbit_msg_store_misc:filenum_to_name(NextFile), + {ok, NextHdl} = open_file( + Dir, filenum_to_name(NextFile), ?WRITE_MODE), true = ets:insert_new( FileSummaryEts, #file_summary { @@ -1478,9 +1529,178 @@ delete_file_if_empty(File, State = true = mark_handle_to_close(FileHandlesEts, File), true = ets:delete(FileSummaryEts, File), State1 = close_handle(File, State), - ok = file:delete(rabbit_msg_store_misc:form_filename( + ok = file:delete(form_filename( Dir, - rabbit_msg_store_misc:filenum_to_name(File))), + filenum_to_name(File))), State1 #msstate { sum_file_size = SumFileSize - FileSize }; _ -> State end. 
+ +%%---------------------------------------------------------------------------- +%% garbage collection / compaction / aggregation -- external +%%---------------------------------------------------------------------------- + +gc(SourceFile, DestFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> + + [SourceObj = #file_summary { + readers = SourceReaders, + valid_total_size = SourceValidData, left = DestFile, + file_size = SourceFileSize, locked = true }] = + ets:lookup(FileSummaryEts, SourceFile), + [DestObj = #file_summary { + readers = DestReaders, + valid_total_size = DestValidData, right = SourceFile, + file_size = DestFileSize, locked = true }] = + ets:lookup(FileSummaryEts, DestFile), + + case SourceReaders =:= 0 andalso DestReaders =:= 0 of + true -> + TotalValidData = DestValidData + SourceValidData, + ok = combine_files(SourceObj, DestObj, State), + %% don't update dest.right, because it could be changing + %% at the same time + true = ets:update_element( + FileSummaryEts, DestFile, + [{#file_summary.valid_total_size, TotalValidData}, + {#file_summary.contiguous_top, TotalValidData}, + {#file_summary.file_size, TotalValidData}]), + SourceFileSize + DestFileSize - TotalValidData; + false -> + timer:sleep(100), + gc(SourceFile, DestFile, State) + end. 
+ +combine_files(#file_summary { file = Source, + valid_total_size = SourceValid, + left = Destination }, + #file_summary { file = Destination, + valid_total_size = DestinationValid, + contiguous_top = DestinationContiguousTop, + right = Source }, + State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> + SourceName = filenum_to_name(Source), + DestinationName = filenum_to_name(Destination), + {ok, SourceHdl} = + open_file(Dir, SourceName, ?READ_AHEAD_MODE), + {ok, DestinationHdl} = + open_file(Dir, DestinationName, + ?READ_AHEAD_MODE ++ ?WRITE_MODE), + ExpectedSize = SourceValid + DestinationValid, + %% if DestinationValid =:= DestinationContiguousTop then we don't + %% need a tmp file + %% if they're not equal, then we need to write out everything past + %% the DestinationContiguousTop to a tmp file then truncate, + %% copy back in, and then copy over from Source + %% otherwise we just truncate straight away and copy over from Source + if DestinationContiguousTop =:= DestinationValid -> + ok = truncate_and_extend_file( + DestinationHdl, DestinationValid, ExpectedSize); + true -> + Worklist = + lists:dropwhile( + fun (#msg_location { offset = Offset }) + when Offset /= DestinationContiguousTop -> + %% it cannot be that Offset == + %% DestinationContiguousTop because if it + %% was then DestinationContiguousTop would + %% have been extended by TotalSize + Offset < DestinationContiguousTop + %% Given expected access patterns, I suspect + %% that the list should be naturally sorted + %% as we require, however, we need to + %% enforce it anyway + end, + find_unremoved_messages_in_file(Destination, State)), + Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = open_file( + Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), + ok = copy_messages( + Worklist, DestinationContiguousTop, DestinationValid, + DestinationHdl, TmpHdl, Destination, State), + TmpSize = DestinationValid - DestinationContiguousTop, + %% so now Tmp contains everything we need 
to salvage from + %% Destination, and index_state has been updated to + %% reflect the compaction of Destination so truncate + %% Destination and copy from Tmp back to the end + {ok, 0} = file_handle_cache:position(TmpHdl, 0), + ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize), + {ok, TmpSize} = + file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), + %% position in DestinationHdl should now be DestinationValid + ok = file_handle_cache:sync(DestinationHdl), + ok = file_handle_cache:close(TmpHdl), + ok = file:delete(form_filename(Dir, Tmp)) + end, + SourceWorkList = find_unremoved_messages_in_file(Source, State), + ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, + SourceHdl, DestinationHdl, Destination, State), + %% tidy up + ok = file_handle_cache:close(SourceHdl), + ok = file_handle_cache:close(DestinationHdl), + ok = file:delete(form_filename(Dir, SourceName)), + ok. + +find_unremoved_messages_in_file(File, + {_FileSummaryEts, Dir, Index, IndexState}) -> + %% Msgs here will be end-of-file at start-of-list + {ok, Messages, _FileSize} = + scan_file_for_valid_messages( + Dir, filenum_to_name(File)), + %% foldl will reverse so will end up with msgs in ascending offset order + lists:foldl( + fun ({MsgId, _TotalSize, _Offset}, Acc) -> + case Index:lookup(MsgId, IndexState) of + Entry = #msg_location { file = File } -> [ Entry | Acc ]; + _ -> Acc + end + end, [], Messages). + +copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, + Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> + {FinalOffset, BlockStart1, BlockEnd1} = + lists:foldl( + fun (#msg_location { msg_id = MsgId, offset = Offset, + total_size = TotalSize }, + {CurOffset, BlockStart, BlockEnd}) -> + %% CurOffset is in the DestinationFile. 
+ %% Offset, BlockStart and BlockEnd are in the SourceFile + %% update MsgLocation to reflect change of file and offset + ok = Index:update_fields(MsgId, + [{#msg_location.file, Destination}, + {#msg_location.offset, CurOffset}], + IndexState), + {BlockStart2, BlockEnd2} = + if BlockStart =:= undefined -> + %% base case, called only for the first list elem + {Offset, Offset + TotalSize}; + Offset =:= BlockEnd -> + %% extend the current block because the + %% next msg follows straight on + {BlockStart, BlockEnd + TotalSize}; + true -> + %% found a gap, so actually do the work + %% for the previous block + BSize = BlockEnd - BlockStart, + {ok, BlockStart} = + file_handle_cache:position(SourceHdl, + BlockStart), + {ok, BSize} = file_handle_cache:copy( + SourceHdl, DestinationHdl, BSize), + {Offset, Offset + TotalSize} + end, + {CurOffset + TotalSize, BlockStart2, BlockEnd2} + end, {InitOffset, undefined, undefined}, WorkList), + case WorkList of + [] -> + ok; + _ -> + %% do the last remaining block + BSize1 = BlockEnd1 - BlockStart1, + {ok, BlockStart1} = + file_handle_cache:position(SourceHdl, BlockStart1), + {ok, BSize1} = + file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1), + ok = file_handle_cache:sync(DestinationHdl) + end, + ok. diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 9cf11af29f..5c8e88d64e 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -46,7 +46,7 @@ file_summary_ets }). --include("rabbit_msg_store.hrl"). +-include("rabbit.hrl"). %%---------------------------------------------------------------------------- @@ -73,9 +73,12 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> handle_call(stop, _From, State) -> {stop, normal, ok, State}. 
-handle_cast({gc, Source, Destination}, State = #gcstate { parent = Parent }) -> - Reclaimed = adjust_meta_and_combine(Source, Destination, - State), +handle_cast({gc, Source, Destination}, State = + #gcstate { parent = Parent, dir = Dir, index_module = Index, + index_state = IndexState, + file_summary_ets = FileSummaryEts }) -> + Reclaimed = rabbit_msg_store:gc(Source, Destination, + {FileSummaryEts, Dir, Index, IndexState}), ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, Destination), {noreply, State, hibernate}. @@ -91,173 +94,3 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. - -%%---------------------------------------------------------------------------- - -adjust_meta_and_combine(SourceFile, DestFile, State = - #gcstate { file_summary_ets = FileSummaryEts }) -> - - [SourceObj = #file_summary { - readers = SourceReaders, - valid_total_size = SourceValidData, left = DestFile, - file_size = SourceFileSize, locked = true }] = - ets:lookup(FileSummaryEts, SourceFile), - [DestObj = #file_summary { - readers = DestReaders, - valid_total_size = DestValidData, right = SourceFile, - file_size = DestFileSize, locked = true }] = - ets:lookup(FileSummaryEts, DestFile), - - case SourceReaders =:= 0 andalso DestReaders =:= 0 of - true -> - TotalValidData = DestValidData + SourceValidData, - ok = combine_files(SourceObj, DestObj, State), - %% don't update dest.right, because it could be changing - %% at the same time - true = ets:update_element( - FileSummaryEts, DestFile, - [{#file_summary.valid_total_size, TotalValidData}, - {#file_summary.contiguous_top, TotalValidData}, - {#file_summary.file_size, TotalValidData}]), - SourceFileSize + DestFileSize - TotalValidData; - false -> - timer:sleep(100), - adjust_meta_and_combine(SourceFile, DestFile, State) - end. 
- -combine_files(#file_summary { file = Source, - valid_total_size = SourceValid, - left = Destination }, - #file_summary { file = Destination, - valid_total_size = DestinationValid, - contiguous_top = DestinationContiguousTop, - right = Source }, - State = #gcstate { dir = Dir }) -> - SourceName = rabbit_msg_store_misc:filenum_to_name(Source), - DestinationName = rabbit_msg_store_misc:filenum_to_name(Destination), - {ok, SourceHdl} = - rabbit_msg_store_misc:open_file(Dir, SourceName, ?READ_AHEAD_MODE), - {ok, DestinationHdl} = - rabbit_msg_store_misc:open_file(Dir, DestinationName, - ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ExpectedSize = SourceValid + DestinationValid, - %% if DestinationValid =:= DestinationContiguousTop then we don't - %% need a tmp file - %% if they're not equal, then we need to write out everything past - %% the DestinationContiguousTop to a tmp file then truncate, - %% copy back in, and then copy over from Source - %% otherwise we just truncate straight away and copy over from Source - if DestinationContiguousTop =:= DestinationValid -> - ok = rabbit_msg_store_misc:truncate_and_extend_file( - DestinationHdl, DestinationValid, ExpectedSize); - true -> - Worklist = - lists:dropwhile( - fun (#msg_location { offset = Offset }) - when Offset /= DestinationContiguousTop -> - %% it cannot be that Offset == - %% DestinationContiguousTop because if it - %% was then DestinationContiguousTop would - %% have been extended by TotalSize - Offset < DestinationContiguousTop - %% Given expected access patterns, I suspect - %% that the list should be naturally sorted - %% as we require, however, we need to - %% enforce it anyway - end, - find_unremoved_messages_in_file(Destination, State)), - Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = rabbit_msg_store_misc:open_file( - Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ok = copy_messages( - Worklist, DestinationContiguousTop, DestinationValid, - DestinationHdl, TmpHdl, 
Destination, State), - TmpSize = DestinationValid - DestinationContiguousTop, - %% so now Tmp contains everything we need to salvage from - %% Destination, and index_state has been updated to - %% reflect the compaction of Destination so truncate - %% Destination and copy from Tmp back to the end - {ok, 0} = file_handle_cache:position(TmpHdl, 0), - ok = rabbit_msg_store_misc:truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize), - {ok, TmpSize} = - file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), - %% position in DestinationHdl should now be DestinationValid - ok = file_handle_cache:sync(DestinationHdl), - ok = file_handle_cache:close(TmpHdl), - ok = file:delete(rabbit_msg_store_misc:form_filename(Dir, Tmp)) - end, - SourceWorkList = find_unremoved_messages_in_file(Source, State), - ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, - SourceHdl, DestinationHdl, Destination, State), - %% tidy up - ok = file_handle_cache:close(SourceHdl), - ok = file_handle_cache:close(DestinationHdl), - ok = file:delete(rabbit_msg_store_misc:form_filename(Dir, SourceName)), - ok. - -find_unremoved_messages_in_file(File, #gcstate { dir = Dir, - index_state = IndexState, - index_module = Index }) -> - %% Msgs here will be end-of-file at start-of-list - {ok, Messages, _FileSize} = - rabbit_msg_store_misc:scan_file_for_valid_messages( - Dir, rabbit_msg_store_misc:filenum_to_name(File)), - %% foldl will reverse so will end up with msgs in ascending offset order - lists:foldl( - fun ({MsgId, _TotalSize, _Offset}, Acc) -> - case Index:lookup(MsgId, IndexState) of - Entry = #msg_location { file = File } -> [ Entry | Acc ]; - _ -> Acc - end - end, [], Messages). 
- -copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, - Destination, #gcstate { index_module = Index, - index_state = IndexState }) -> - {FinalOffset, BlockStart1, BlockEnd1} = - lists:foldl( - fun (#msg_location { msg_id = MsgId, offset = Offset, - total_size = TotalSize }, - {CurOffset, BlockStart, BlockEnd}) -> - %% CurOffset is in the DestinationFile. - %% Offset, BlockStart and BlockEnd are in the SourceFile - %% update MsgLocation to reflect change of file and offset - ok = Index:update_fields(MsgId, - [{#msg_location.file, Destination}, - {#msg_location.offset, CurOffset}], - IndexState), - {BlockStart2, BlockEnd2} = - if BlockStart =:= undefined -> - %% base case, called only for the first list elem - {Offset, Offset + TotalSize}; - Offset =:= BlockEnd -> - %% extend the current block because the - %% next msg follows straight on - {BlockStart, BlockEnd + TotalSize}; - true -> - %% found a gap, so actually do the work - %% for the previous block - BSize = BlockEnd - BlockStart, - {ok, BlockStart} = - file_handle_cache:position(SourceHdl, - BlockStart), - {ok, BSize} = file_handle_cache:copy( - SourceHdl, DestinationHdl, BSize), - {Offset, Offset + TotalSize} - end, - {CurOffset + TotalSize, BlockStart2, BlockEnd2} - end, {InitOffset, undefined, undefined}, WorkList), - case WorkList of - [] -> - ok; - _ -> - %% do the last remaining block - BSize1 = BlockEnd1 - BlockStart1, - {ok, BlockStart1} = - file_handle_cache:position(SourceHdl, BlockStart1), - {ok, BSize1} = - file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1), - ok = file_handle_cache:sync(DestinationHdl) - end, - ok. 
diff --git a/src/rabbit_msg_store_misc.erl b/src/rabbit_msg_store_misc.erl deleted file mode 100644 index 3cece7da13..0000000000 --- a/src/rabbit_msg_store_misc.erl +++ /dev/null @@ -1,74 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_msg_store_misc). - --export([open_file/3, preallocate/3, truncate_and_extend_file/3, - form_filename/2, filenum_to_name/1, scan_file_for_valid_messages/2]). - --include("rabbit_msg_store.hrl"). - - -%%---------------------------------------------------------------------------- - -open_file(Dir, FileName, Mode) -> - file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, - [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). 
- -%%---------------------------------------------------------------------------- - -preallocate(Hdl, FileSizeLimit, FinalPos) -> - {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit), - ok = file_handle_cache:truncate(Hdl), - {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos), - ok. - -truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> - {ok, Lowpoint} = file_handle_cache:position(FileHdl, Lowpoint), - ok = file_handle_cache:truncate(FileHdl), - ok = preallocate(FileHdl, Highpoint, Lowpoint). - -form_filename(Dir, Name) -> filename:join(Dir, Name). - -filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. - -scan_file_for_valid_messages(Dir, FileName) -> - case open_file(Dir, FileName, ?READ_MODE) of - {ok, Hdl} -> - Valid = rabbit_msg_file:scan(Hdl), - %% if something really bad's happened, the close could fail, - %% but ignore - file_handle_cache:close(Hdl), - Valid; - {error, enoent} -> {ok, [], 0}; - {error, Reason} -> throw({error, - {unable_to_scan_file, FileName, Reason}}) - end. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index b37845d47d..f5f49cf4f4 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -186,7 +186,6 @@ path :: file_path(), num :: non_neg_integer() })). --type(msg_id() :: binary()). -type(seq_id() :: integer()). -type(seg_dict() :: {dict(), [segment()]}). -type(qistate() :: #qistate { dir :: file_path(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 297c3ef401..9bb031f3a8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -163,8 +163,6 @@ transient_threshold }). --include("rabbit.hrl"). - -record(msg_status, { msg, msg_id, @@ -190,11 +188,12 @@ %% more. -define(RAM_INDEX_BATCH_SIZE, 64). +-include("rabbit.hrl"). + %%---------------------------------------------------------------------------- -ifdef(use_specs). --type(msg_id() :: binary()). -type(bpqueue() :: any()). 
-type(seq_id() :: non_neg_integer()). -type(ack() :: {'ack_index_and_store', msg_id(), seq_id(), atom() | pid()} |
