diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-22 16:12:58 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-22 16:12:58 +0100 |
| commit | 36e2dee055754e7aa3bd56660e312385172ef903 (patch) | |
| tree | 7882014d3467d8ab4c52c4f7e75d01d5fc4263de /src | |
| parent | 044eff474a1f3f85da2add310d99d02ab82eb3a8 (diff) | |
| download | rabbitmq-server-git-36e2dee055754e7aa3bd56660e312385172ef903.tar.gz | |
added specs. ensured mnesia is local
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 112 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 1 |
2 files changed, 78 insertions, 35 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index ba24dd9319..7ee02f99e8 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -206,6 +206,27 @@ %% alternating full files and files with only one tiny message in %% them). +%% ---- SPECS ---- + +-ifdef(use_specs). + +-type(seq_id() :: non_neg_integer()). + +-spec(start_link/2 :: (non_neg_integer(), non_neg_integer()) -> + {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok'). +-spec(deliver/1 :: (queue_name()) -> + {'empty' | {msg_id(), binary(), non_neg_integer(), + bool(), {msg_id(), seq_id()}}}). +-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). +-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). +-spec(tx_commit/2 :: (queue_name(), [msg_id()]) -> 'ok'). +-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). +-spec(stop/0 :: () -> 'ok'). +-spec(stop_and_obliterate/0 :: () -> 'ok'). + +-endif. + %% ---- PUBLIC API ---- start_link(FileSizeLimit, ReadFileHandlesLimit) -> @@ -234,11 +255,13 @@ stop() -> gen_server:call(?SERVER, stop, infinity). stop_and_obliterate() -> - gen_server:call(?SERVER, clean_stop, infinity). + gen_server:call(?SERVER, stop_vaporise, infinity). %% ---- GEN-SERVER INTERNAL API ---- init([FileSizeLimit, ReadFileHandlesLimit]) -> + %% gen_server does not trap by default. Without this, terminate/2 + %% won't be called process_flag(trap_exit, true), ok = filelib:ensure_dir(form_filename("nothing")), InitName = "0" ++ ?FILE_EXTENSION, @@ -282,7 +305,7 @@ handle_call({tx_commit, Q, MsgIds}, _From, State) -> {reply, ok, State1}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; %% gen_server now calls terminate -handle_call(clean_stop, _From, State) -> +handle_call(stop_vaporise, _From, State) -> State1 = #dqstate { file_summary = FileSummary, sequences = Sequences } = shutdown(State), %% tidy up file handles early @@ -573,8 +596,8 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit, file_summary = FileSummary, current_file_name = CurName }) -> - %% the file we're looking at may no longer exist as it may have been deleted - %% within the current GC run + %% the file we're looking at may no longer exist as it may have + %% been deleted within the current GC run case ets:lookup(FileSummary, File) of [] -> State; [FileObj = {File, ValidData, _ContiguousTop, Left, Right}] -> @@ -696,19 +719,24 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, %% CurOffset is in the TmpFile. %% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!) Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, - %% this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset + %% this message is going to end up back in + %% Destination, at DestinationContiguousTop + %% + CurOffset FinalOffset = DestinationContiguousTop + CurOffset, ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination, FinalOffset, TotalSize}), NextOffset = CurOffset + Size, if BlockStart =:= undefined -> - %% base case, called only for the first list elem + %% base case, called only for the + %% first list elem {NextOffset, Offset, Offset + Size}; Offset =:= BlockEnd -> - %% extend the current block because the next msg follows straight on + %% extend the current block because + %% the next msg follows straight on {NextOffset, BlockStart, BlockEnd + Size}; true -> - %% found a gap, so actually do the work for the previous block + %% found a gap, so actually do the + %% work for the previous block BSize = BlockEnd - BlockStart, {ok, BlockStart} = file:position(DestinationHdl, @@ -722,9 +750,10 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, BSize1 = BlockEnd1 - BlockStart1, {ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}), {ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1), - %% so now Tmp contains everything we need to salvage from Destination, - %% and MsgLocation has been updated to reflect compaction of Destination - %% so truncate Destination and copy from Tmp back to the end + %% so now Tmp contains everything we need to salvage from + %% Destination, and MsgLocation has been updated to + %% reflect compaction of Destination so truncate + %% Destination and copy from Tmp back to the end {ok, 0} = file:position(TmpHdl, {bof, 0}), {ok, DestinationContiguousTop} = file:position(DestinationHdl, @@ -738,7 +767,8 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, file:position(DestinationHdl, {bof, DestinationContiguousTop}), {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), - %% position in DestinationHdl should now be DestinationValid + %% position in DestinationHdl should now be + %% DestinationValid ok = file:sync(DestinationHdl), ok = file:close(TmpHdl), ok = file:delete(form_filename(Tmp)) @@ -759,13 +789,16 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, CurOffset, TotalSize}), NextOffset = CurOffset + Size, if BlockStart =:= undefined -> - %% base case, called only for the first list elem + %% base case, called only for the first list + %% elem {NextOffset, Offset, Offset + Size}; Offset =:= BlockEnd -> - %% extend the current block because the next msg follows straight on + %% extend the current block because the next + %% msg follows straight on {NextOffset, BlockStart, BlockEnd + Size}; true -> - %% found a gap, so actually do the work for the previous block + %% found a gap, so actually do the work for + %% the previous block BSize = BlockEnd - BlockStart, {ok, BlockStart} = file:position(SourceHdl, {bof, BlockStart}), @@ -801,8 +834,8 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> [{File, ValidData, _ContiguousTop, Left, Right}] = ets:lookup(FileSummary, File), case ValidData of - %% we should NEVER find the current file in here - %% hence right should always be a file, not undefined + %% we should NEVER find the current file in here hence right + %% should always be a file, not undefined 0 -> case {Left, Right} of {undefined, _} when not(is_atom(Right)) -> %% the eldest file is empty. YAY! @@ -823,13 +856,16 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> %% ---- DISK RECOVERY ---- load_from_disk(State) -> - %% sorted so that smallest number is first. which also means eldest file (left-most) first + %% sorted so that smallest number is first. which also means + %% eldest file (left-most) first {Files, TmpFiles} = get_disk_queue_files(), ok = recover_crashed_compactions(Files, TmpFiles), - %% There should be no more tmp files now, so go ahead and load the whole lot + %% There should be no more tmp files now, so go ahead and load the + %% whole lot (State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State), - %% Finally, check there is nothing in mnesia which we haven't loaded + %% Finally, check there is nothing in mnesia which we haven't + %% loaded {atomic, true} = mnesia:transaction( fun() -> ok = mnesia:read_lock_table(rabbit_disk_queue), @@ -843,7 +879,8 @@ load_from_disk(State) -> {ok, State2}. extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> - %% next-seqid-to-read is the lowest seqid which has is_delivered = false + %% next-seqid-to-read is the lowest seqid which has is_delivered = + %% false {atomic, true} = mnesia:transaction( fun() -> ok = mnesia:read_lock_table(rabbit_disk_queue), @@ -902,8 +939,8 @@ load_messages(Left, [File|Files], } end end, {[], 0}, Messages), - %% foldl reverses lists and find_contiguous_block_prefix needs elems in the same order - %% as from scan_file_for_valid_messages + %% foldl reverses lists and find_contiguous_block_prefix needs + %% elems in the same order as from scan_file_for_valid_messages {ContiguousTop, _} = find_contiguous_block_prefix(lists:reverse(ValidMessagesRev)), Right = case Files of [] -> undefined; @@ -927,7 +964,8 @@ recover_crashed_compactions1(Files, TmpFile) -> {ok, UncorruptedMessagesTmp} = scan_file_for_valid_messages(form_filename(TmpFile)), MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp), - %% all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out + %% all of these messages should appear in the mnesia table, + %% otherwise they wouldn't have been copied out lists:foreach(fun (MsgId) -> true = 0 < length(mnesia:dirty_match_object (rabbit_disk_queue, @@ -959,7 +997,8 @@ recover_crashed_compactions1(Files, TmpFile) -> %% them over again case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file - %% note this also catches the case when the tmp file is empty + %% note this also catches the case when the tmp file + %% is empty ok = file:delete(TmpFile); _False -> %% we're in case 4 above. @@ -973,7 +1012,8 @@ recover_crashed_compactions1(Files, TmpFile) -> end, MsgIds), %% The main file should be contiguous {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages), - %% we should have that none of the messages in the prefix are in the tmp file + %% we should have that none of the messages in the prefix + %% are in the tmp file true = lists:all(fun (MsgId) -> not(lists:member(MsgId, MsgIdsTmp)) end, MsgIds), @@ -981,16 +1021,18 @@ recover_crashed_compactions1(Files, TmpFile) -> [write, raw, binary, delayed_write]), {ok, Top} = file:position(MainHdl, Top), ok = file:truncate(MainHdl), %% wipe out any rubbish at the end of the file - %% there really could be rubbish at the end of the file - we could have failed after the - %% extending truncate. - %% Remember the head of the list will be the highest entry in the file + %% there really could be rubbish at the end of the file - + %% we could have failed after the extending truncate. + %% Remember the head of the list will be the highest entry + %% in the file [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT, ExpectedAbsPos = Top + TmpSize, {ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}), - ok = file:truncate(MainHdl), %% and now extend the main file as big as necessary in a single move - %% if we run out of disk space, this truncate could fail, but we still - %% aren't risking losing data + %% and now extend the main file as big as necessary in a + %% single move if we run out of disk space, this truncate + %% could fail, but we still aren't risking losing data + ok = file:truncate(MainHdl), {ok, TmpHdl} = file:open(form_filename(TmpFile), [read, raw, binary, read_ahead]), {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), @@ -1010,9 +1052,9 @@ recover_crashed_compactions1(Files, TmpFile) -> end, ok. -%% this assumes that the messages are ordered such that the highest address is at -%% the head of the list. -%% this matches what scan_file_for_valid_messages produces +%% this assumes that the messages are ordered such that the highest +%% address is at the head of the list. This matches what +%% scan_file_for_valid_messages produces find_contiguous_block_prefix([]) -> {0, []}; find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail]) -> case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index b3c4a9267e..d2b2b15c8a 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -147,6 +147,7 @@ table_definitions() -> {rabbit_disk_queue, [{record_name, dq_msg_loc}, {type, set}, + {local_content, true}, {attributes, record_info(fields, dq_msg_loc)}, {disc_only_copies, [node()]}]} ]. |
