summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-22 16:12:58 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-22 16:12:58 +0100
commit36e2dee055754e7aa3bd56660e312385172ef903 (patch)
tree7882014d3467d8ab4c52c4f7e75d01d5fc4263de /src
parent044eff474a1f3f85da2add310d99d02ab82eb3a8 (diff)
downloadrabbitmq-server-git-36e2dee055754e7aa3bd56660e312385172ef903.tar.gz
added specs. ensured mnesia is local
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl112
-rw-r--r--src/rabbit_mnesia.erl1
2 files changed, 78 insertions, 35 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index ba24dd9319..7ee02f99e8 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -206,6 +206,27 @@
%% alternating full files and files with only one tiny message in
%% them).
+%% ---- SPECS ----
+
+-ifdef(use_specs).
+
+-type(seq_id() :: non_neg_integer()).
+
+-spec(start_link/2 :: (non_neg_integer(), non_neg_integer()) ->
+ {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok').
+-spec(deliver/1 :: (queue_name()) ->
+ {'empty' | {msg_id(), binary(), non_neg_integer(),
+ bool(), {msg_id(), seq_id()}}}).
+-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
+-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
+-spec(tx_commit/2 :: (queue_name(), [msg_id()]) -> 'ok').
+-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
+-spec(stop/0 :: () -> 'ok').
+-spec(stop_and_obliterate/0 :: () -> 'ok').
+
+-endif.
+
%% ---- PUBLIC API ----
start_link(FileSizeLimit, ReadFileHandlesLimit) ->
@@ -234,11 +255,13 @@ stop() ->
gen_server:call(?SERVER, stop, infinity).
stop_and_obliterate() ->
- gen_server:call(?SERVER, clean_stop, infinity).
+ gen_server:call(?SERVER, stop_vaporise, infinity).
%% ---- GEN-SERVER INTERNAL API ----
init([FileSizeLimit, ReadFileHandlesLimit]) ->
+ %% gen_server does not trap by default. Without this, terminate/2
+ %% won't be called
process_flag(trap_exit, true),
ok = filelib:ensure_dir(form_filename("nothing")),
InitName = "0" ++ ?FILE_EXTENSION,
@@ -282,7 +305,7 @@ handle_call({tx_commit, Q, MsgIds}, _From, State) ->
{reply, ok, State1};
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
-handle_call(clean_stop, _From, State) ->
+handle_call(stop_vaporise, _From, State) ->
State1 = #dqstate { file_summary = FileSummary,
sequences = Sequences } =
shutdown(State), %% tidy up file handles early
@@ -573,8 +596,8 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit,
file_summary = FileSummary,
current_file_name = CurName
}) ->
- %% the file we're looking at may no longer exist as it may have been deleted
- %% within the current GC run
+ %% the file we're looking at may no longer exist as it may have
+ %% been deleted within the current GC run
case ets:lookup(FileSummary, File) of
[] -> State;
[FileObj = {File, ValidData, _ContiguousTop, Left, Right}] ->
@@ -696,19 +719,24 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
%% CurOffset is in the TmpFile.
%% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!)
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
- %% this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset
+ %% this message is going to end up back in
+ %% Destination, at DestinationContiguousTop
+ %% + CurOffset
FinalOffset = DestinationContiguousTop + CurOffset,
ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination,
FinalOffset, TotalSize}),
NextOffset = CurOffset + Size,
if BlockStart =:= undefined ->
- %% base case, called only for the first list elem
+ %% base case, called only for the
+ %% first list elem
{NextOffset, Offset, Offset + Size};
Offset =:= BlockEnd ->
- %% extend the current block because the next msg follows straight on
+ %% extend the current block because
+ %% the next msg follows straight on
{NextOffset, BlockStart, BlockEnd + Size};
true ->
- %% found a gap, so actually do the work for the previous block
+ %% found a gap, so actually do the
+ %% work for the previous block
BSize = BlockEnd - BlockStart,
{ok, BlockStart} =
file:position(DestinationHdl,
@@ -722,9 +750,10 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
BSize1 = BlockEnd1 - BlockStart1,
{ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}),
{ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1),
- %% so now Tmp contains everything we need to salvage from Destination,
- %% and MsgLocation has been updated to reflect compaction of Destination
- %% so truncate Destination and copy from Tmp back to the end
+ %% so now Tmp contains everything we need to salvage from
+ %% Destination, and MsgLocation has been updated to
+ %% reflect compaction of Destination so truncate
+ %% Destination and copy from Tmp back to the end
{ok, 0} = file:position(TmpHdl, {bof, 0}),
{ok, DestinationContiguousTop} =
file:position(DestinationHdl,
@@ -738,7 +767,8 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
file:position(DestinationHdl,
{bof, DestinationContiguousTop}),
{ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
- %% position in DestinationHdl should now be DestinationValid
+ %% position in DestinationHdl should now be
+ %% DestinationValid
ok = file:sync(DestinationHdl),
ok = file:close(TmpHdl),
ok = file:delete(form_filename(Tmp))
@@ -759,13 +789,16 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
CurOffset, TotalSize}),
NextOffset = CurOffset + Size,
if BlockStart =:= undefined ->
- %% base case, called only for the first list elem
+ %% base case, called only for the first list
+ %% elem
{NextOffset, Offset, Offset + Size};
Offset =:= BlockEnd ->
- %% extend the current block because the next msg follows straight on
+ %% extend the current block because the next
+ %% msg follows straight on
{NextOffset, BlockStart, BlockEnd + Size};
true ->
- %% found a gap, so actually do the work for the previous block
+ %% found a gap, so actually do the work for
+ %% the previous block
BSize = BlockEnd - BlockStart,
{ok, BlockStart} =
file:position(SourceHdl, {bof, BlockStart}),
@@ -801,8 +834,8 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
[{File, ValidData, _ContiguousTop, Left, Right}] =
ets:lookup(FileSummary, File),
case ValidData of
- %% we should NEVER find the current file in here
- %% hence right should always be a file, not undefined
+ %% we should NEVER find the current file in here hence right
+ %% should always be a file, not undefined
0 -> case {Left, Right} of
{undefined, _} when not(is_atom(Right)) ->
%% the eldest file is empty. YAY!
@@ -823,13 +856,16 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
%% ---- DISK RECOVERY ----
load_from_disk(State) ->
- %% sorted so that smallest number is first. which also means eldest file (left-most) first
+ %% sorted so that smallest number is first. which also means
+ %% eldest file (left-most) first
{Files, TmpFiles} = get_disk_queue_files(),
ok = recover_crashed_compactions(Files, TmpFiles),
- %% There should be no more tmp files now, so go ahead and load the whole lot
+ %% There should be no more tmp files now, so go ahead and load the
+ %% whole lot
(State1 = #dqstate{ msg_location = MsgLocation }) =
load_messages(undefined, Files, State),
- %% Finally, check there is nothing in mnesia which we haven't loaded
+ %% Finally, check there is nothing in mnesia which we haven't
+ %% loaded
{atomic, true} = mnesia:transaction(
fun() ->
ok = mnesia:read_lock_table(rabbit_disk_queue),
@@ -843,7 +879,8 @@ load_from_disk(State) ->
{ok, State2}.
extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
- %% next-seqid-to-read is the lowest seqid which has is_delivered = false
+ %% next-seqid-to-read is the lowest seqid which has is_delivered =
+ %% false
{atomic, true} = mnesia:transaction(
fun() ->
ok = mnesia:read_lock_table(rabbit_disk_queue),
@@ -902,8 +939,8 @@ load_messages(Left, [File|Files],
}
end
end, {[], 0}, Messages),
- %% foldl reverses lists and find_contiguous_block_prefix needs elems in the same order
- %% as from scan_file_for_valid_messages
+ %% foldl reverses lists and find_contiguous_block_prefix needs
+ %% elems in the same order as from scan_file_for_valid_messages
{ContiguousTop, _} = find_contiguous_block_prefix(lists:reverse(ValidMessagesRev)),
Right = case Files of
[] -> undefined;
@@ -927,7 +964,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
{ok, UncorruptedMessagesTmp} =
scan_file_for_valid_messages(form_filename(TmpFile)),
MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp),
- %% all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out
+ %% all of these messages should appear in the mnesia table,
+ %% otherwise they wouldn't have been copied out
lists:foreach(fun (MsgId) ->
true = 0 < length(mnesia:dirty_match_object
(rabbit_disk_queue,
@@ -959,7 +997,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% them over again
case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of
true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
- %% note this also catches the case when the tmp file is empty
+ %% note this also catches the case when the tmp file
+ %% is empty
ok = file:delete(TmpFile);
_False ->
%% we're in case 4 above.
@@ -973,7 +1012,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
end, MsgIds),
%% The main file should be contiguous
{Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
- %% we should have that none of the messages in the prefix are in the tmp file
+ %% we should have that none of the messages in the prefix
+ %% are in the tmp file
true = lists:all(fun (MsgId) -> not(lists:member(MsgId, MsgIdsTmp)) end,
MsgIds),
@@ -981,16 +1021,18 @@ recover_crashed_compactions1(Files, TmpFile) ->
[write, raw, binary, delayed_write]),
{ok, Top} = file:position(MainHdl, Top),
ok = file:truncate(MainHdl), %% wipe out any rubbish at the end of the file
- %% there really could be rubbish at the end of the file - we could have failed after the
- %% extending truncate.
- %% Remember the head of the list will be the highest entry in the file
+ %% there really could be rubbish at the end of the file -
+ %% we could have failed after the extending truncate.
+ %% Remember the head of the list will be the highest entry
+ %% in the file
[{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT,
ExpectedAbsPos = Top + TmpSize,
{ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}),
- ok = file:truncate(MainHdl), %% and now extend the main file as big as necessary in a single move
- %% if we run out of disk space, this truncate could fail, but we still
- %% aren't risking losing data
+ %% and now extend the main file as big as necessary in a
+ %% single move if we run out of disk space, this truncate
+ %% could fail, but we still aren't risking losing data
+ ok = file:truncate(MainHdl),
{ok, TmpHdl} = file:open(form_filename(TmpFile),
[read, raw, binary, read_ahead]),
{ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize),
@@ -1010,9 +1052,9 @@ recover_crashed_compactions1(Files, TmpFile) ->
end,
ok.
-%% this assumes that the messages are ordered such that the highest address is at
-%% the head of the list.
-%% this matches what scan_file_for_valid_messages produces
+%% this assumes that the messages are ordered such that the highest
+%% address is at the head of the list. This matches what
+%% scan_file_for_valid_messages produces
find_contiguous_block_prefix([]) -> {0, []};
find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail]) ->
case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index b3c4a9267e..d2b2b15c8a 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -147,6 +147,7 @@ table_definitions() ->
{rabbit_disk_queue,
[{record_name, dq_msg_loc},
{type, set},
+ {local_content, true},
{attributes, record_info(fields, dq_msg_loc)},
{disc_only_copies, [node()]}]}
].