summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_channel.erl8
-rw-r--r--src/rabbit_msg_file.erl5
-rw-r--r--src/rabbit_msg_store.erl264
-rw-r--r--src/rabbit_msg_store_gc.erl181
-rw-r--r--src/rabbit_msg_store_misc.erl74
-rw-r--r--src/rabbit_queue_index.erl1
-rw-r--r--src/rabbit_variable_queue.erl5
9 files changed, 257 insertions, 288 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1c8cf522b9..00407824d8 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -58,8 +58,7 @@
-ifdef(use_specs).
--type(msg_id() :: non_neg_integer()).
--type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}).
+-type(get_msg_result() :: {queue_name(), pid(), msg_id(), boolean(), message()}).
-type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}).
-type(qlen() :: {'ok', non_neg_integer()}).
-type(qfun(A) :: fun ((amqqueue()) -> A)).
@@ -101,7 +100,7 @@
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
- {'ok', non_neg_integer(), msg()} | 'empty').
+ {'ok', non_neg_integer(), get_msg_result()} | 'empty').
-spec(basic_consume/8 ::
(amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(),
boolean(), any()) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4c42b0ef84..7f43f79a6c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -36,8 +36,6 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RATES_REMEASURE_INTERVAL, 5000).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c8733ed197..a8c17efbde 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -48,9 +48,6 @@
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking}).
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(INFO_KEYS,
@@ -69,8 +66,7 @@
-ifdef(use_specs).
--type(msg_id() :: non_neg_integer()).
--type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}).
+-type(msg_to_deliver() :: {queue_name(), pid(), msg_id(), boolean(), message()}).
-spec(start_link/5 ::
(channel_number(), pid(), pid(), username(), vhost()) -> pid()).
@@ -78,7 +74,7 @@
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
--spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
+-spec(deliver/4 :: (pid(), ctag(), boolean(), msg_to_deliver()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 831b4d793a..267cb633d8 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -46,11 +46,10 @@
%%----------------------------------------------------------------------------
+-include("rabbit.hrl").
+
-ifdef(use_specs).
--type(io_device() :: any()).
--type(msg_id() :: binary()).
--type(msg() :: any()).
-type(position() :: non_neg_integer()).
-type(msg_size() :: non_neg_integer()).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e5de24ce1c..b2db0ea51a 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -38,16 +38,32 @@
successfully_recovered_state/1]).
-export([sync/1, gc_done/4, set_maximum_since_use/2,
- build_index_worker/6]). %% internal
+ build_index_worker/6, gc/3]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, handle_pre_hibernate/1]).
+%%----------------------------------------------------------------------------
+
+-include("rabbit_msg_store.hrl").
+
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(GEOMETRIC_P, 0.3). %% parameter to geometric distribution rng
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
+-define(BINARY_MODE, [raw, binary]).
+-define(READ_MODE, [read]).
+-define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]).
+-define(WRITE_MODE, [write]).
+
+-define(FILE_EXTENSION, ".rdq").
+-define(FILE_EXTENSION_TMP, ".rdt").
+
+-define(FILE_SIZE_LIMIT, (16*1024*1024)).
+
+-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
+
%%----------------------------------------------------------------------------
-record(msstate,
@@ -84,14 +100,15 @@
cur_file_cache_ets
}).
+-record(file_summary,
+ {file, valid_total_size, contiguous_top, left, right, file_size,
+ locked, readers}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-type(server() :: pid() | atom()).
--type(msg_id() :: binary()).
--type(msg() :: any()).
--type(file_path() :: any()).
-type(file_num() :: non_neg_integer()).
-type(client_msstate() :: #client_msstate { file_handle_cache :: dict(),
index_state :: any(),
@@ -122,12 +139,13 @@
-spec(clean/2 :: (atom(), file_path()) -> 'ok').
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
+-spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
+ {tid(), file_path(), atom(), any()}) -> non_neg_integer()).
+
-endif.
%%----------------------------------------------------------------------------
--include("rabbit_msg_store.hrl").
-
%% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION
%% It is not recommended to set this to < 0.5
-define(GARBAGE_FRACTION, 0.5).
@@ -558,8 +576,8 @@ init([Server, BaseDir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
build_index(Recovered1, Files, State),
%% read is only needed so that we can seek
- {ok, FileHdl} = rabbit_msg_store_misc:open_file(
- Dir, rabbit_msg_store_misc:filenum_to_name(CurFile),
+ {ok, FileHdl} = open_file(
+ Dir, filenum_to_name(CurFile),
[read | ?WRITE_MODE]),
{ok, Offset} = file_handle_cache:position(FileHdl, Offset),
ok = file_handle_cache:truncate(FileHdl),
@@ -956,6 +974,10 @@ run_pending({contains, MsgId, From}, State) ->
run_pending({remove, MsgId}, State) ->
remove_message(MsgId, State).
+open_file(Dir, FileName, Mode) ->
+ file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
+ [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
+
close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) ->
CState #client_msstate { file_handle_cache = close_handle(Key, FHC) };
@@ -1001,8 +1023,8 @@ get_read_handle(FileNum, FHC, Dir) ->
{ok, Hdl} ->
{Hdl, FHC};
error ->
- {ok, Hdl} = rabbit_msg_store_misc:open_file(
- Dir, rabbit_msg_store_misc:filenum_to_name(FileNum),
+ {ok, Hdl} = open_file(
+ Dir, filenum_to_name(FileNum),
?READ_MODE),
{Hdl, dict:store(FileNum, Hdl, FHC) }
end.
@@ -1036,6 +1058,35 @@ store_file_summary(Tid, Dir) ->
[{extended_info, [object_count]}]),
ets:delete(Tid).
+
+preallocate(Hdl, FileSizeLimit, FinalPos) ->
+ {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
+ ok = file_handle_cache:truncate(Hdl),
+ {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos),
+ ok.
+
+truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
+ {ok, Lowpoint} = file_handle_cache:position(FileHdl, Lowpoint),
+ ok = file_handle_cache:truncate(FileHdl),
+ ok = preallocate(FileHdl, Highpoint, Lowpoint).
+
+form_filename(Dir, Name) -> filename:join(Dir, Name).
+
+filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
+
+scan_file_for_valid_messages(Dir, FileName) ->
+ case open_file(Dir, FileName, ?READ_MODE) of
+ {ok, Hdl} ->
+ Valid = rabbit_msg_file:scan(Hdl),
+ %% if something really bad's happened, the close could fail,
+ %% but ignore
+ file_handle_cache:close(Hdl),
+ Valid;
+ {error, enoent} -> {ok, [], 0};
+ {error, Reason} -> throw({error,
+ {unable_to_scan_file, FileName, Reason}})
+ end.
+
%%----------------------------------------------------------------------------
%% message cache helper functions
%%----------------------------------------------------------------------------
@@ -1180,7 +1231,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% consist only of valid messages. Plan: Truncate the main file
%% back to before any of the files in the tmp file and copy
%% them over again
- TmpPath = rabbit_msg_store_misc:form_filename(Dir, TmpFileName),
+ TmpPath = form_filename(Dir, TmpFileName),
case is_sublist(MsgIdsTmp, MsgIds) of
true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
%% note this also catches the case when the tmp file
@@ -1212,7 +1263,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% are in the tmp file
true = is_disjoint(MsgIds1, MsgIdsTmp),
%% must open with read flag, otherwise will stomp over contents
- {ok, MainHdl} = rabbit_msg_store_misc:open_file(
+ {ok, MainHdl} = open_file(
Dir, NonTmpRelatedFileName, [read | ?WRITE_MODE]),
%% Wipe out any rubbish at the end of the file. Remember
%% the head of the list will be the highest entry in the
@@ -1222,9 +1273,9 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% Extend the main file as big as necessary in a single
%% move. If we run out of disk space, this truncate could
%% fail, but we still aren't risking losing data
- ok = rabbit_msg_store_misc:truncate_and_extend_file(
+ ok = truncate_and_extend_file(
MainHdl, Top, Top + TmpSize),
- {ok, TmpHdl} = rabbit_msg_store_misc:open_file(
+ {ok, TmpHdl} = open_file(
Dir, TmpFileName, ?READ_AHEAD_MODE),
{ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize),
ok = file_handle_cache:close(MainHdl),
@@ -1248,7 +1299,7 @@ is_disjoint(SmallerL, BiggerL) ->
scan_file_for_valid_messages_msg_ids(Dir, FileName) ->
{ok, Messages, _FileSize} =
- rabbit_msg_store_misc:scan_file_for_valid_messages(Dir, FileName),
+ scan_file_for_valid_messages(Dir, FileName),
{ok, Messages, [MsgId || {MsgId, _TotalSize, _FileOffset} <- Messages]}.
%% Takes the list in *ascending* order (i.e. eldest message
@@ -1318,8 +1369,8 @@ build_index(Gatherer, Left, [File|Files], State) ->
build_index_worker(
Gatherer, Guid, State = #msstate { dir = Dir }, Left, File, Files) ->
{ok, Messages, FileSize} =
- rabbit_msg_store_misc:scan_file_for_valid_messages(
- Dir, rabbit_msg_store_misc:filenum_to_name(File)),
+ scan_file_for_valid_messages(
+ Dir, filenum_to_name(File)),
{ValidMessages, ValidTotalSize} =
lists:foldl(
fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
@@ -1357,7 +1408,7 @@ build_index_worker(
ok = gatherer:finished(Gatherer, Guid).
%%----------------------------------------------------------------------------
-%% garbage collection / compaction / aggregation
+%% garbage collection / compaction / aggregation -- internal
%%----------------------------------------------------------------------------
maybe_roll_to_new_file(Offset,
@@ -1370,8 +1421,8 @@ maybe_roll_to_new_file(Offset,
State1 = internal_sync(State),
ok = file_handle_cache:close(CurHdl),
NextFile = CurFile + 1,
- {ok, NextHdl} = rabbit_msg_store_misc:open_file(
- Dir, rabbit_msg_store_misc:filenum_to_name(NextFile),
+ {ok, NextHdl} = open_file(
+ Dir, filenum_to_name(NextFile),
?WRITE_MODE),
true = ets:insert_new(
FileSummaryEts, #file_summary {
@@ -1478,9 +1529,178 @@ delete_file_if_empty(File, State =
true = mark_handle_to_close(FileHandlesEts, File),
true = ets:delete(FileSummaryEts, File),
State1 = close_handle(File, State),
- ok = file:delete(rabbit_msg_store_misc:form_filename(
+ ok = file:delete(form_filename(
Dir,
- rabbit_msg_store_misc:filenum_to_name(File))),
+ filenum_to_name(File))),
State1 #msstate { sum_file_size = SumFileSize - FileSize };
_ -> State
end.
+
+%%----------------------------------------------------------------------------
+%% garbage collection / compaction / aggregation -- external
+%%----------------------------------------------------------------------------
+
+gc(SourceFile, DestFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
+
+ [SourceObj = #file_summary {
+ readers = SourceReaders,
+ valid_total_size = SourceValidData, left = DestFile,
+ file_size = SourceFileSize, locked = true }] =
+ ets:lookup(FileSummaryEts, SourceFile),
+ [DestObj = #file_summary {
+ readers = DestReaders,
+ valid_total_size = DestValidData, right = SourceFile,
+ file_size = DestFileSize, locked = true }] =
+ ets:lookup(FileSummaryEts, DestFile),
+
+ case SourceReaders =:= 0 andalso DestReaders =:= 0 of
+ true ->
+ TotalValidData = DestValidData + SourceValidData,
+ ok = combine_files(SourceObj, DestObj, State),
+ %% don't update dest.right, because it could be changing
+ %% at the same time
+ true = ets:update_element(
+ FileSummaryEts, DestFile,
+ [{#file_summary.valid_total_size, TotalValidData},
+ {#file_summary.contiguous_top, TotalValidData},
+ {#file_summary.file_size, TotalValidData}]),
+ SourceFileSize + DestFileSize - TotalValidData;
+ false ->
+ timer:sleep(100),
+ gc(SourceFile, DestFile, State)
+ end.
+
+combine_files(#file_summary { file = Source,
+ valid_total_size = SourceValid,
+ left = Destination },
+ #file_summary { file = Destination,
+ valid_total_size = DestinationValid,
+ contiguous_top = DestinationContiguousTop,
+ right = Source },
+ State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
+ SourceName = filenum_to_name(Source),
+ DestinationName = filenum_to_name(Destination),
+ {ok, SourceHdl} =
+ open_file(Dir, SourceName, ?READ_AHEAD_MODE),
+ {ok, DestinationHdl} =
+ open_file(Dir, DestinationName,
+ ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+ ExpectedSize = SourceValid + DestinationValid,
+ %% if DestinationValid =:= DestinationContiguousTop then we don't
+ %% need a tmp file
+ %% if they're not equal, then we need to write out everything past
+ %% the DestinationContiguousTop to a tmp file then truncate,
+ %% copy back in, and then copy over from Source
+ %% otherwise we just truncate straight away and copy over from Source
+ if DestinationContiguousTop =:= DestinationValid ->
+ ok = truncate_and_extend_file(
+ DestinationHdl, DestinationValid, ExpectedSize);
+ true ->
+ Worklist =
+ lists:dropwhile(
+ fun (#msg_location { offset = Offset })
+ when Offset /= DestinationContiguousTop ->
+ %% it cannot be that Offset ==
+ %% DestinationContiguousTop because if it
+ %% was then DestinationContiguousTop would
+ %% have been extended by TotalSize
+ Offset < DestinationContiguousTop
+ %% Given expected access patterns, I suspect
+ %% that the list should be naturally sorted
+ %% as we require, however, we need to
+ %% enforce it anyway
+ end,
+ find_unremoved_messages_in_file(Destination, State)),
+ Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} = open_file(
+ Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+ ok = copy_messages(
+ Worklist, DestinationContiguousTop, DestinationValid,
+ DestinationHdl, TmpHdl, Destination, State),
+ TmpSize = DestinationValid - DestinationContiguousTop,
+ %% so now Tmp contains everything we need to salvage from
+ %% Destination, and index_state has been updated to
+ %% reflect the compaction of Destination so truncate
+ %% Destination and copy from Tmp back to the end
+ {ok, 0} = file_handle_cache:position(TmpHdl, 0),
+ ok = truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize),
+ {ok, TmpSize} =
+ file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
+ %% position in DestinationHdl should now be DestinationValid
+ ok = file_handle_cache:sync(DestinationHdl),
+ ok = file_handle_cache:close(TmpHdl),
+ ok = file:delete(form_filename(Dir, Tmp))
+ end,
+ SourceWorkList = find_unremoved_messages_in_file(Source, State),
+ ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
+ SourceHdl, DestinationHdl, Destination, State),
+ %% tidy up
+ ok = file_handle_cache:close(SourceHdl),
+ ok = file_handle_cache:close(DestinationHdl),
+ ok = file:delete(form_filename(Dir, SourceName)),
+ ok.
+
+find_unremoved_messages_in_file(File,
+ {_FileSummaryEts, Dir, Index, IndexState}) ->
+ %% Msgs here will be end-of-file at start-of-list
+ {ok, Messages, _FileSize} =
+ scan_file_for_valid_messages(
+ Dir, filenum_to_name(File)),
+ %% foldl will reverse so will end up with msgs in ascending offset order
+ lists:foldl(
+ fun ({MsgId, _TotalSize, _Offset}, Acc) ->
+ case Index:lookup(MsgId, IndexState) of
+ Entry = #msg_location { file = File } -> [ Entry | Acc ];
+ _ -> Acc
+ end
+ end, [], Messages).
+
+copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
+ Destination, {_FileSummaryEts, _Dir, Index, IndexState}) ->
+ {FinalOffset, BlockStart1, BlockEnd1} =
+ lists:foldl(
+ fun (#msg_location { msg_id = MsgId, offset = Offset,
+ total_size = TotalSize },
+ {CurOffset, BlockStart, BlockEnd}) ->
+ %% CurOffset is in the DestinationFile.
+ %% Offset, BlockStart and BlockEnd are in the SourceFile
+ %% update MsgLocation to reflect change of file and offset
+ ok = Index:update_fields(MsgId,
+ [{#msg_location.file, Destination},
+ {#msg_location.offset, CurOffset}],
+ IndexState),
+ {BlockStart2, BlockEnd2} =
+ if BlockStart =:= undefined ->
+ %% base case, called only for the first list elem
+ {Offset, Offset + TotalSize};
+ Offset =:= BlockEnd ->
+ %% extend the current block because the
+ %% next msg follows straight on
+ {BlockStart, BlockEnd + TotalSize};
+ true ->
+ %% found a gap, so actually do the work
+ %% for the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} =
+ file_handle_cache:position(SourceHdl,
+ BlockStart),
+ {ok, BSize} = file_handle_cache:copy(
+ SourceHdl, DestinationHdl, BSize),
+ {Offset, Offset + TotalSize}
+ end,
+ {CurOffset + TotalSize, BlockStart2, BlockEnd2}
+ end, {InitOffset, undefined, undefined}, WorkList),
+ case WorkList of
+ [] ->
+ ok;
+ _ ->
+ %% do the last remaining block
+ BSize1 = BlockEnd1 - BlockStart1,
+ {ok, BlockStart1} =
+ file_handle_cache:position(SourceHdl, BlockStart1),
+ {ok, BSize1} =
+ file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
+ ok = file_handle_cache:sync(DestinationHdl)
+ end,
+ ok.
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 9cf11af29f..5c8e88d64e 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -46,7 +46,7 @@
file_summary_ets
}).
--include("rabbit_msg_store.hrl").
+-include("rabbit.hrl").
%%----------------------------------------------------------------------------
@@ -73,9 +73,12 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
-handle_cast({gc, Source, Destination}, State = #gcstate { parent = Parent }) ->
- Reclaimed = adjust_meta_and_combine(Source, Destination,
- State),
+handle_cast({gc, Source, Destination}, State =
+ #gcstate { parent = Parent, dir = Dir, index_module = Index,
+ index_state = IndexState,
+ file_summary_ets = FileSummaryEts }) ->
+ Reclaimed = rabbit_msg_store:gc(Source, Destination,
+ {FileSummaryEts, Dir, Index, IndexState}),
ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, Destination),
{noreply, State, hibernate}.
@@ -91,173 +94,3 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
-%%----------------------------------------------------------------------------
-
-adjust_meta_and_combine(SourceFile, DestFile, State =
- #gcstate { file_summary_ets = FileSummaryEts }) ->
-
- [SourceObj = #file_summary {
- readers = SourceReaders,
- valid_total_size = SourceValidData, left = DestFile,
- file_size = SourceFileSize, locked = true }] =
- ets:lookup(FileSummaryEts, SourceFile),
- [DestObj = #file_summary {
- readers = DestReaders,
- valid_total_size = DestValidData, right = SourceFile,
- file_size = DestFileSize, locked = true }] =
- ets:lookup(FileSummaryEts, DestFile),
-
- case SourceReaders =:= 0 andalso DestReaders =:= 0 of
- true ->
- TotalValidData = DestValidData + SourceValidData,
- ok = combine_files(SourceObj, DestObj, State),
- %% don't update dest.right, because it could be changing
- %% at the same time
- true = ets:update_element(
- FileSummaryEts, DestFile,
- [{#file_summary.valid_total_size, TotalValidData},
- {#file_summary.contiguous_top, TotalValidData},
- {#file_summary.file_size, TotalValidData}]),
- SourceFileSize + DestFileSize - TotalValidData;
- false ->
- timer:sleep(100),
- adjust_meta_and_combine(SourceFile, DestFile, State)
- end.
-
-combine_files(#file_summary { file = Source,
- valid_total_size = SourceValid,
- left = Destination },
- #file_summary { file = Destination,
- valid_total_size = DestinationValid,
- contiguous_top = DestinationContiguousTop,
- right = Source },
- State = #gcstate { dir = Dir }) ->
- SourceName = rabbit_msg_store_misc:filenum_to_name(Source),
- DestinationName = rabbit_msg_store_misc:filenum_to_name(Destination),
- {ok, SourceHdl} =
- rabbit_msg_store_misc:open_file(Dir, SourceName, ?READ_AHEAD_MODE),
- {ok, DestinationHdl} =
- rabbit_msg_store_misc:open_file(Dir, DestinationName,
- ?READ_AHEAD_MODE ++ ?WRITE_MODE),
- ExpectedSize = SourceValid + DestinationValid,
- %% if DestinationValid =:= DestinationContiguousTop then we don't
- %% need a tmp file
- %% if they're not equal, then we need to write out everything past
- %% the DestinationContiguousTop to a tmp file then truncate,
- %% copy back in, and then copy over from Source
- %% otherwise we just truncate straight away and copy over from Source
- if DestinationContiguousTop =:= DestinationValid ->
- ok = rabbit_msg_store_misc:truncate_and_extend_file(
- DestinationHdl, DestinationValid, ExpectedSize);
- true ->
- Worklist =
- lists:dropwhile(
- fun (#msg_location { offset = Offset })
- when Offset /= DestinationContiguousTop ->
- %% it cannot be that Offset ==
- %% DestinationContiguousTop because if it
- %% was then DestinationContiguousTop would
- %% have been extended by TotalSize
- Offset < DestinationContiguousTop
- %% Given expected access patterns, I suspect
- %% that the list should be naturally sorted
- %% as we require, however, we need to
- %% enforce it anyway
- end,
- find_unremoved_messages_in_file(Destination, State)),
- Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} = rabbit_msg_store_misc:open_file(
- Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
- ok = copy_messages(
- Worklist, DestinationContiguousTop, DestinationValid,
- DestinationHdl, TmpHdl, Destination, State),
- TmpSize = DestinationValid - DestinationContiguousTop,
- %% so now Tmp contains everything we need to salvage from
- %% Destination, and index_state has been updated to
- %% reflect the compaction of Destination so truncate
- %% Destination and copy from Tmp back to the end
- {ok, 0} = file_handle_cache:position(TmpHdl, 0),
- ok = rabbit_msg_store_misc:truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize),
- {ok, TmpSize} =
- file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
- %% position in DestinationHdl should now be DestinationValid
- ok = file_handle_cache:sync(DestinationHdl),
- ok = file_handle_cache:close(TmpHdl),
- ok = file:delete(rabbit_msg_store_misc:form_filename(Dir, Tmp))
- end,
- SourceWorkList = find_unremoved_messages_in_file(Source, State),
- ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
- SourceHdl, DestinationHdl, Destination, State),
- %% tidy up
- ok = file_handle_cache:close(SourceHdl),
- ok = file_handle_cache:close(DestinationHdl),
- ok = file:delete(rabbit_msg_store_misc:form_filename(Dir, SourceName)),
- ok.
-
-find_unremoved_messages_in_file(File, #gcstate { dir = Dir,
- index_state = IndexState,
- index_module = Index }) ->
- %% Msgs here will be end-of-file at start-of-list
- {ok, Messages, _FileSize} =
- rabbit_msg_store_misc:scan_file_for_valid_messages(
- Dir, rabbit_msg_store_misc:filenum_to_name(File)),
- %% foldl will reverse so will end up with msgs in ascending offset order
- lists:foldl(
- fun ({MsgId, _TotalSize, _Offset}, Acc) ->
- case Index:lookup(MsgId, IndexState) of
- Entry = #msg_location { file = File } -> [ Entry | Acc ];
- _ -> Acc
- end
- end, [], Messages).
-
-copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
- Destination, #gcstate { index_module = Index,
- index_state = IndexState }) ->
- {FinalOffset, BlockStart1, BlockEnd1} =
- lists:foldl(
- fun (#msg_location { msg_id = MsgId, offset = Offset,
- total_size = TotalSize },
- {CurOffset, BlockStart, BlockEnd}) ->
- %% CurOffset is in the DestinationFile.
- %% Offset, BlockStart and BlockEnd are in the SourceFile
- %% update MsgLocation to reflect change of file and offset
- ok = Index:update_fields(MsgId,
- [{#msg_location.file, Destination},
- {#msg_location.offset, CurOffset}],
- IndexState),
- {BlockStart2, BlockEnd2} =
- if BlockStart =:= undefined ->
- %% base case, called only for the first list elem
- {Offset, Offset + TotalSize};
- Offset =:= BlockEnd ->
- %% extend the current block because the
- %% next msg follows straight on
- {BlockStart, BlockEnd + TotalSize};
- true ->
- %% found a gap, so actually do the work
- %% for the previous block
- BSize = BlockEnd - BlockStart,
- {ok, BlockStart} =
- file_handle_cache:position(SourceHdl,
- BlockStart),
- {ok, BSize} = file_handle_cache:copy(
- SourceHdl, DestinationHdl, BSize),
- {Offset, Offset + TotalSize}
- end,
- {CurOffset + TotalSize, BlockStart2, BlockEnd2}
- end, {InitOffset, undefined, undefined}, WorkList),
- case WorkList of
- [] ->
- ok;
- _ ->
- %% do the last remaining block
- BSize1 = BlockEnd1 - BlockStart1,
- {ok, BlockStart1} =
- file_handle_cache:position(SourceHdl, BlockStart1),
- {ok, BSize1} =
- file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
- ok = file_handle_cache:sync(DestinationHdl)
- end,
- ok.
diff --git a/src/rabbit_msg_store_misc.erl b/src/rabbit_msg_store_misc.erl
deleted file mode 100644
index 3cece7da13..0000000000
--- a/src/rabbit_msg_store_misc.erl
+++ /dev/null
@@ -1,74 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_msg_store_misc).
-
--export([open_file/3, preallocate/3, truncate_and_extend_file/3,
- form_filename/2, filenum_to_name/1, scan_file_for_valid_messages/2]).
-
--include("rabbit_msg_store.hrl").
-
-
-%%----------------------------------------------------------------------------
-
-open_file(Dir, FileName, Mode) ->
- file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
- [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
-
-%%----------------------------------------------------------------------------
-
-preallocate(Hdl, FileSizeLimit, FinalPos) ->
- {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
- ok = file_handle_cache:truncate(Hdl),
- {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos),
- ok.
-
-truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
- {ok, Lowpoint} = file_handle_cache:position(FileHdl, Lowpoint),
- ok = file_handle_cache:truncate(FileHdl),
- ok = preallocate(FileHdl, Highpoint, Lowpoint).
-
-form_filename(Dir, Name) -> filename:join(Dir, Name).
-
-filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
-
-scan_file_for_valid_messages(Dir, FileName) ->
- case open_file(Dir, FileName, ?READ_MODE) of
- {ok, Hdl} ->
- Valid = rabbit_msg_file:scan(Hdl),
- %% if something really bad's happened, the close could fail,
- %% but ignore
- file_handle_cache:close(Hdl),
- Valid;
- {error, enoent} -> {ok, [], 0};
- {error, Reason} -> throw({error,
- {unable_to_scan_file, FileName, Reason}})
- end.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index b37845d47d..f5f49cf4f4 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -186,7 +186,6 @@
path :: file_path(),
num :: non_neg_integer()
})).
--type(msg_id() :: binary()).
-type(seq_id() :: integer()).
-type(seg_dict() :: {dict(), [segment()]}).
-type(qistate() :: #qistate { dir :: file_path(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 297c3ef401..9bb031f3a8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -163,8 +163,6 @@
transient_threshold
}).
--include("rabbit.hrl").
-
-record(msg_status,
{ msg,
msg_id,
@@ -190,11 +188,12 @@
%% more.
-define(RAM_INDEX_BATCH_SIZE, 64).
+-include("rabbit.hrl").
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--type(msg_id() :: binary()).
-type(bpqueue() :: any()).
-type(seq_id() :: non_neg_integer()).
-type(ack() :: {'ack_index_and_store', msg_id(), seq_id(), atom() | pid()}