summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-22 12:43:50 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-22 12:43:50 +0100
commit659194e683096c9ae0e3758f6dd6b95cd72b2662 (patch)
tree438c7884787e1a772e6365d594295bc33ffd3618
parent0056ad665a93ab048f6dce0c065ac7664d57f551 (diff)
downloadrabbitmq-server-git-659194e683096c9ae0e3758f6dd6b95cd72b2662.tar.gz
more reformatting and refactoring.
-rw-r--r--src/rabbit_disk_queue.erl558
-rw-r--r--src/rabbit_tests.erl79
2 files changed, 328 insertions, 309 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 190c06f00b..2952ca89ec 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -107,32 +107,32 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
process_flag(trap_exit, true),
ok = filelib:ensure_dir(form_filename("nothing")),
InitName = "0" ++ ?FILE_EXTENSION,
- {ok, MsgLocation}
- = dets:open_file(?MSG_LOC_DETS_NAME,
- [{file, form_filename(atom_to_list(?MSG_LOC_DETS_NAME)
- ++ ?FILE_EXTENSION_DETS)},
- {min_no_slots, 1024*1024},
- %% man says this should be <= 32M. But it works...
- {max_no_slots, 1024*1024*1024},
- {type, set}
- ]),
- State
- = #dqstate { msg_location = MsgLocation,
- file_summary = ets:new(?FILE_SUMMARY_ETS_NAME,
- [set, private]),
- sequences = ets:new(?SEQUENCE_ETS_NAME,
- [set, private]),
- current_file_num = 0,
- current_file_name = InitName,
- current_file_handle = undefined,
- current_offset = 0,
- file_size_limit = FileSizeLimit,
- read_file_handles = {dict:new(), gb_trees:empty()},
- read_file_handles_limit = ReadFileHandlesLimit
- },
+ {ok, MsgLocation} =
+ dets:open_file(?MSG_LOC_DETS_NAME,
+ [{file, form_filename(atom_to_list(?MSG_LOC_DETS_NAME) ++
+ ?FILE_EXTENSION_DETS)},
+ {min_no_slots, 1024*1024},
+ %% man says this should be <= 32M. But it works...
+ {max_no_slots, 1024*1024*1024},
+ {type, set}
+ ]),
+ State =
+ #dqstate { msg_location = MsgLocation,
+ file_summary = ets:new(?FILE_SUMMARY_ETS_NAME,
+ [set, private]),
+ sequences = ets:new(?SEQUENCE_ETS_NAME,
+ [set, private]),
+ current_file_num = 0,
+ current_file_name = InitName,
+ current_file_handle = undefined,
+ current_offset = 0,
+ file_size_limit = FileSizeLimit,
+ read_file_handles = {dict:new(), gb_trees:empty()},
+ read_file_handles_limit = ReadFileHandlesLimit
+ },
{ok, State1 = #dqstate { current_file_name = CurrentName,
- current_offset = Offset } }
- = load_from_disk(State),
+ current_offset = Offset } } =
+ load_from_disk(State),
Path = form_filename(CurrentName),
%% read is only needed so that we can seek
{ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]),
@@ -149,8 +149,8 @@ handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
handle_call(clean_stop, _From, State) ->
State1 = #dqstate { file_summary = FileSummary,
- sequences = Sequences }
- = shutdown(State), %% tidy up file handles early
+ sequences = Sequences } =
+ shutdown(State), %% tidy up file handles early
{atomic, ok} = mnesia:clear_table(rabbit_disk_queue),
true = ets:delete(FileSummary),
true = ets:delete(Sequences),
@@ -185,8 +185,8 @@ shutdown(State = #dqstate { msg_location = MsgLocation,
}) ->
%% deliberately ignoring return codes here
dets:close(MsgLocation),
- file:delete(form_filename(atom_to_list(?MSG_LOC_DETS_NAME)
- ++ ?FILE_EXTENSION_DETS)),
+ file:delete(form_filename(atom_to_list(?MSG_LOC_DETS_NAME) ++
+ ?FILE_EXTENSION_DETS)),
if FileHdl =:= undefined -> ok;
true -> file:sync(FileHdl),
file:close(FileHdl)
@@ -210,53 +210,51 @@ base_directory() ->
%% ---- INTERNAL RAW FUNCTIONS ----
-internal_deliver(Q, State
- = #dqstate { msg_location = MsgLocation,
- sequences = Sequences,
- read_file_handles_limit = ReadFileHandlesLimit,
- read_file_handles = {ReadHdls, ReadHdlsAge}
- }) ->
+internal_deliver(Q, State =
+ #dqstate { msg_location = MsgLocation,
+ sequences = Sequences,
+ read_file_handles_limit = ReadFileHandlesLimit,
+ read_file_handles = {ReadHdls, ReadHdlsAge}
+ }) ->
case ets:lookup(Sequences, Q) of
[] -> {ok, empty, State};
[{Q, ReadSeqId, WriteSeqId}] ->
case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of
[] -> {ok, empty, State};
- [Obj
- = #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] ->
- [{MsgId, _RefCount, File, Offset, TotalSize}]
- = dets:lookup(MsgLocation, MsgId),
- {FileHdl, ReadHdls1, ReadHdlsAge1}
- = case dict:find(File, ReadHdls) of
- error ->
- {ok, Hdl} = file:open(form_filename(File),
- [read, raw, binary,
- read_ahead]),
- Now = now(),
- case dict:size(ReadHdls) < ReadFileHandlesLimit of
- true ->
- {Hdl,
- dict:store(File, {Hdl, Now}, ReadHdls),
- gb_trees:enter(Now, File, ReadHdlsAge)};
- _False ->
- {_Then, OldFile, ReadHdlsAge2}
- = gb_trees:take_smallest(ReadHdlsAge),
- {ok, {OldHdl, _Then}}
- = dict:find(OldFile, ReadHdls),
- ok = file:close(OldHdl),
- ReadHdls2 = dict:erase(OldFile, ReadHdls),
- {Hdl,
- dict:store(File, {Hdl, Now}, ReadHdls2),
- gb_trees:enter(Now, File, ReadHdlsAge2)}
- end;
- {ok, {Hdl, Then}} ->
- Now = now(),
- {Hdl, dict:store(File, {Hdl, Now}, ReadHdls),
- gb_trees:enter(Now, File,
- gb_trees:delete(Then, ReadHdlsAge))}
- end,
+ [Obj =
+ #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] ->
+ [{MsgId, _RefCount, File, Offset, TotalSize}] =
+ dets:lookup(MsgLocation, MsgId),
+ Now = now(),
+ {FileHdl, ReadHdls1, ReadHdlsAge1} =
+ case dict:find(File, ReadHdls) of
+ error ->
+ {ok, Hdl} = file:open(form_filename(File),
+ [read, raw, binary,
+ read_ahead]),
+ {ReadHdls2, ReadHdlsAge2} =
+ case dict:size(ReadHdls) < ReadFileHandlesLimit of
+ true ->
+ {ReadHdls, ReadHdlsAge};
+ _False ->
+ {_Then, OldFile, ReadHdlsAge3} =
+ gb_trees:take_smallest(ReadHdlsAge),
+ {ok, {OldHdl, _Then}} =
+ dict:find(OldFile, ReadHdls),
+ ok = file:close(OldHdl),
+ {dict:erase(OldFile, ReadHdls),
+ ReadHdlsAge3}
+ end,
+ {Hdl, dict:store(File, {Hdl, Now}, ReadHdls2),
+ gb_trees:enter(Now, File, ReadHdlsAge2)};
+ {ok, {Hdl, Then}} ->
+ {Hdl, dict:store(File, {Hdl, Now}, ReadHdls),
+ gb_trees:enter(Now, File,
+ gb_trees:delete(Then, ReadHdlsAge))}
+ end,
%% read the message
- {ok, {MsgBody, BodySize}}
- = read_message_at_offset(FileHdl, Offset, TotalSize),
+ {ok, {MsgBody, BodySize}} =
+ read_message_at_offset(FileHdl, Offset, TotalSize),
if Delivered -> ok;
true -> ok = mnesia:dirty_write(rabbit_disk_queue,
Obj #dq_msg_loc {is_delivered = true})
@@ -278,35 +276,35 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
file_summary = FileSummary,
current_file_name = CurName
}) ->
- Files
- = lists:foldl(
- fun ({MsgId, SeqId}, Files2) ->
- [{MsgId, RefCount, File, Offset, TotalSize}]
- = dets:lookup(MsgLocation, MsgId),
- Files3
- = if 1 =:= RefCount ->
- ok = dets:delete(MsgLocation, MsgId),
- [{File, ValidTotalSize, ContiguousTop, Left, Right}]
- = ets:lookup(FileSummary, File),
- ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- true = ets:insert(FileSummary,
- {File, (ValidTotalSize - TotalSize
- - ?FILE_PACKING_ADJUSTMENT),
- ContiguousTop1, Left, Right}),
- if CurName =:= File -> Files2;
- true -> sets:add_element(File, Files2)
- end;
- 1 < RefCount ->
- ok = dets:insert(MsgLocation, {MsgId, RefCount - 1,
- File, Offset, TotalSize}),
- Files2
- end,
- if MnesiaDelete ->
- ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId});
- true -> ok
- end,
- Files3
- end, sets:new(), MsgSeqIds),
+ Files =
+ lists:foldl(
+ fun ({MsgId, SeqId}, Files2) ->
+ [{MsgId, RefCount, File, Offset, TotalSize}] =
+ dets:lookup(MsgLocation, MsgId),
+ Files3 =
+ if 1 =:= RefCount ->
+ ok = dets:delete(MsgLocation, MsgId),
+ [{File, ValidTotalSize, ContiguousTop, Left, Right}] =
+ ets:lookup(FileSummary, File),
+ ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+ true = ets:insert(FileSummary,
+ {File, (ValidTotalSize - TotalSize
+ - ?FILE_PACKING_ADJUSTMENT),
+ ContiguousTop1, Left, Right}),
+ if CurName =:= File -> Files2;
+ true -> sets:add_element(File, Files2)
+ end;
+ 1 < RefCount ->
+ ok = dets:insert(MsgLocation, {MsgId, RefCount - 1,
+ File, Offset, TotalSize}),
+ Files2
+ end,
+ if MnesiaDelete ->
+ ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId});
+ true -> ok
+ end,
+ Files3
+ end, sets:new(), MsgSeqIds),
State2 = compact(Files, State),
{ok, State2}.
@@ -323,8 +321,8 @@ internal_tx_publish(MsgId, MsgBody,
{ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
true = dets:insert_new(MsgLocation, {MsgId, 1, CurName,
CurOffset, TotalSize}),
- [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}]
- = ets:lookup(FileSummary, CurName),
+ [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] =
+ ets:lookup(FileSummary, CurName),
ValidTotalSize1 = ValidTotalSize + TotalSize +
?FILE_PACKING_ADJUSTMENT,
ContiguousTop1 = if CurOffset =:= ContiguousTop ->
@@ -349,28 +347,28 @@ internal_tx_commit(Q, MsgIds,
current_file_handle = CurHdl,
current_file_name = CurName,
sequences = Sequences
- }) ->
- {ReadSeqId, InitWriteSeqId}
- = case ets:lookup(Sequences, Q) of
- [] -> {0,0};
- [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2}
- end,
- {atomic, {Sync, WriteSeqId}}
- = mnesia:transaction(
- fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(
- fun (MsgId, {Acc, NextWriteSeqId}) ->
- [{MsgId, _RefCount, File, _Offset, _TotalSize}]
- = dets:lookup(MsgLocation, MsgId),
- ok = mnesia:write(rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id
- = {Q, NextWriteSeqId},
- msg_id = MsgId,
- is_delivered = false},
- write),
- {Acc or (CurName =:= File), NextWriteSeqId + 1}
- end, {false, InitWriteSeqId}, MsgIds)
- end),
+ }) ->
+ {ReadSeqId, InitWriteSeqId} =
+ case ets:lookup(Sequences, Q) of
+ [] -> {0,0};
+ [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2}
+ end,
+ {atomic, {Sync, WriteSeqId}} =
+ mnesia:transaction(
+ fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue),
+ lists:foldl(
+ fun (MsgId, {Acc, NextWriteSeqId}) ->
+ [{MsgId, _RefCount, File, _Offset, _TotalSize}] =
+ dets:lookup(MsgLocation, MsgId),
+ ok = mnesia:write(rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id =
+ {Q, NextWriteSeqId},
+ msg_id = MsgId,
+ is_delivered = false},
+ write),
+ {Acc or (CurName =:= File), NextWriteSeqId + 1}
+ end, {false, InitWriteSeqId}, MsgIds)
+ end),
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}),
if Sync -> ok = file:sync(CurHdl);
true -> ok
@@ -378,8 +376,8 @@ internal_tx_commit(Q, MsgIds,
{ok, State}.
internal_publish(Q, MsgId, MsgBody, State) ->
- {ok, State1 = #dqstate { sequences = Sequences }}
- = internal_tx_publish(MsgId, MsgBody, State),
+ {ok, State1 = #dqstate { sequences = Sequences }} =
+ internal_tx_publish(MsgId, MsgBody, State),
WriteSeqId = case ets:lookup(Sequences, Q) of
[] -> %% previously unseen queue
true = ets:insert_new(Sequences, {Q, 0, 1}),
@@ -447,42 +445,42 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit,
case ets:lookup(FileSummary, File) of
[] -> State;
[FileObj = {File, ValidData, _ContiguousTop, Left, Right}] ->
- GoRight
- = fun() ->
- case Right of
- undefined -> State;
- _ when not(CurName =:= Right) ->
- [RightObj = {Right, RightValidData,
- _RightContiguousTop, File, RightRight}]
- = ets:lookup(FileSummary, Right),
- RightSumData = ValidData + RightValidData,
- if FileSizeLimit >= RightSumData ->
- %% here, Right will be the source and so will be deleted,
- %% File will be the destination
- State1 = combineFiles(RightObj, FileObj,
- State),
- %% this could fail if RightRight is undefined
- %% left is the 4th field
- ets:update_element(FileSummary,
- RightRight, {4, File}),
- true = ets:insert(FileSummary, {File,
- RightSumData,
- RightSumData,
- Left,
- RightRight}),
- true = ets:delete(FileSummary, Right),
- State1;
- true -> State
- end;
- _ -> State
- end
- end,
+ GoRight =
+ fun() ->
+ case Right of
+ undefined -> State;
+ _ when not(CurName =:= Right) ->
+ [RightObj = {Right, RightValidData,
+ _RightContiguousTop, File, RightRight}] =
+ ets:lookup(FileSummary, Right),
+ RightSumData = ValidData + RightValidData,
+ if FileSizeLimit >= RightSumData ->
+ %% here, Right will be the source and so will be deleted,
+ %% File will be the destination
+ State1 = combineFiles(RightObj, FileObj,
+ State),
+ %% this could fail if RightRight is undefined
+ %% left is the 4th field
+ ets:update_element(FileSummary,
+ RightRight, {4, File}),
+ true = ets:insert(FileSummary, {File,
+ RightSumData,
+ RightSumData,
+ Left,
+ RightRight}),
+ true = ets:delete(FileSummary, Right),
+ State1;
+ true -> State
+ end;
+ _ -> State
+ end
+ end,
case Left of
undefined ->
GoRight();
- _ -> [LeftObj
- = {Left, LeftValidData, _LeftContiguousTop, LeftLeft, File}]
- = ets:lookup(FileSummary, Left),
+ _ -> [LeftObj =
+ {Left, LeftValidData, _LeftContiguousTop, LeftLeft, File}] =
+ ets:lookup(FileSummary, Left),
LeftSumData = ValidData + LeftValidData,
if FileSizeLimit >= LeftSumData ->
%% here, File will be the source and so will be deleted,
@@ -502,19 +500,27 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit,
end
end.
+sortMsgLocationsByOffset(Asc, List) ->
+ Comp = if Asc -> fun(X, Y) -> X < Y end;
+ true -> fun(X, Y) -> X > Y end
+ end,
+ lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
+ Comp(OffA, OffB)
+ end, List).
+
combineFiles({Source, SourceValid, _SourceContiguousTop,
_SourceLeft, _SourceRight},
{Destination, DestinationValid, DestinationContiguousTop,
_DestinationLeft, _DestinationRight},
State1) ->
- (State = #dqstate { msg_location = MsgLocation })
- = closeFile(Source, closeFile(Destination, State1)),
- {ok, SourceHdl}
- = file:open(form_filename(Source),
- [read, write, raw, binary, delayed_write, read_ahead]),
- {ok, DestinationHdl}
- = file:open(form_filename(Destination),
- [read, write, raw, binary, delayed_write, read_ahead]),
+ (State = #dqstate { msg_location = MsgLocation }) =
+ closeFile(Source, closeFile(Destination, State1)),
+ {ok, SourceHdl} =
+ file:open(form_filename(Source),
+ [read, write, raw, binary, delayed_write, read_ahead]),
+ {ok, DestinationHdl} =
+ file:open(form_filename(Destination),
+ [read, write, raw, binary, delayed_write, read_ahead]),
ExpectedSize = SourceValid + DestinationValid,
%% if DestinationValid =:= DestinationContiguousTop then we don't need a tmp file
%% if they're not equal, then we need to write out everything past the DestinationContiguousTop to a tmp file
@@ -531,56 +537,54 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
{bof, DestinationValid});
true ->
Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl}
- = file:open(form_filename(Tmp),
- [read, write, raw, binary, delayed_write, read_ahead]),
- Worklist
- = lists:dropwhile(
- fun ({_, _, _, Offset, _})
- when Offset /= DestinationContiguousTop ->
- %% it cannot be that Offset == DestinationContiguousTop
- %% because if it was then DestinationContiguousTop would have been
- %% extended by TotalSize
- Offset < DestinationContiguousTop
- %% Given expected access patterns, I suspect that the list should be
- %% naturally sorted as we require, however, we need to enforce it anyway
- end, lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
- OffA < OffB
- end,
- dets:match_object(MsgLocation,
- {'_', '_', Destination,
- '_', '_'}))),
+ {ok, TmpHdl} =
+ file:open(form_filename(Tmp),
+ [read, write, raw, binary, delayed_write, read_ahead]),
+ Worklist =
+ lists:dropwhile(
+ fun ({_, _, _, Offset, _})
+ when Offset /= DestinationContiguousTop ->
+ %% it cannot be that Offset == DestinationContiguousTop
+ %% because if it was then DestinationContiguousTop would have been
+ %% extended by TotalSize
+ Offset < DestinationContiguousTop
+ %% Given expected access patterns, I suspect that the list should be
+ %% naturally sorted as we require, however, we need to enforce it anyway
+ end, sortMsgLocationsByOffset(true,
+ dets:match_object(MsgLocation,
+ {'_', '_',
+ Destination,
+ '_', '_'}))),
TmpSize = DestinationValid - DestinationContiguousTop,
- {TmpSize, BlockStart1, BlockEnd1}
- = lists:foldl(
- fun ({MsgId, RefCount, _Destination, Offset, TotalSize},
- {CurOffset, BlockStart, BlockEnd}) ->
- %% CurOffset is in the TmpFile.
- %% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!)
- Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
- %% this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset
- FinalOffset = DestinationContiguousTop + CurOffset,
- ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination,
- FinalOffset, TotalSize}),
-
- NextOffset = CurOffset + Size,
- if BlockStart =:= undefined ->
- %% base case, called only for the first list elem
- {NextOffset, Offset, Offset + Size};
- Offset =:= BlockEnd ->
- %% extend the current block because the next msg follows straight on
- {NextOffset, BlockStart, BlockEnd + Size};
- true ->
- %% found a gap, so actually do the work for the previous block
- BSize = BlockEnd - BlockStart,
- {ok, BlockStart}
- = file:position(DestinationHdl,
- {bof, BlockStart}),
- {ok, BSize} = file:copy(DestinationHdl,
- TmpHdl, BSize),
- {NextOffset, Offset, Offset + Size}
- end
- end, {0, undefined, undefined}, Worklist),
+ {TmpSize, BlockStart1, BlockEnd1} =
+ lists:foldl(
+ fun ({MsgId, RefCount, _Destination, Offset, TotalSize},
+ {CurOffset, BlockStart, BlockEnd}) ->
+ %% CurOffset is in the TmpFile.
+ %% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!)
+ Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ %% this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset
+ FinalOffset = DestinationContiguousTop + CurOffset,
+ ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination,
+ FinalOffset, TotalSize}),
+ NextOffset = CurOffset + Size,
+ if BlockStart =:= undefined ->
+ %% base case, called only for the first list elem
+ {NextOffset, Offset, Offset + Size};
+ Offset =:= BlockEnd ->
+ %% extend the current block because the next msg follows straight on
+ {NextOffset, BlockStart, BlockEnd + Size};
+ true ->
+ %% found a gap, so actually do the work for the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} =
+ file:position(DestinationHdl,
+ {bof, BlockStart}),
+ {ok, BSize} = file:copy(DestinationHdl,
+ TmpHdl, BSize),
+ {NextOffset, Offset, Offset + Size}
+ end
+ end, {0, undefined, undefined}, Worklist),
%% do the last remaining block
BSize1 = BlockEnd1 - BlockStart1,
{ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}),
@@ -589,56 +593,54 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
%% and MsgLocation has been updated to reflect compaction of Destination
%% so truncate Destination and copy from Tmp back to the end
{ok, 0} = file:position(TmpHdl, {bof, 0}),
- {ok, DestinationContiguousTop}
- = file:position(DestinationHdl,
- {bof, DestinationContiguousTop}),
+ {ok, DestinationContiguousTop} =
+ file:position(DestinationHdl,
+ {bof, DestinationContiguousTop}),
ok = file:truncate(DestinationHdl),
- {ok, ExpectedSize}
- = file:position(DestinationHdl,
- {bof, ExpectedSize}),
+ {ok, ExpectedSize} =
+ file:position(DestinationHdl,
+ {bof, ExpectedSize}),
ok = file:truncate(DestinationHdl),
- {ok, DestinationContiguousTop}
- = file:position(DestinationHdl,
- {bof, DestinationContiguousTop}),
+ {ok, DestinationContiguousTop} =
+ file:position(DestinationHdl,
+ {bof, DestinationContiguousTop}),
{ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
%% position in DestinationHdl should now be DestinationValid
ok = file:sync(DestinationHdl),
ok = file:close(TmpHdl),
ok = file:delete(form_filename(Tmp))
end,
- SourceWorkList
- = lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
- OffA < OffB
- end, dets:match_object(MsgLocation, {'_', '_', Source,
+ SourceWorkList =
+ sortMsgLocationsByOffset(true, dets:match_object(MsgLocation,
+ {'_', '_', Source,
'_', '_'})),
- {ExpectedSize, BlockStart2, BlockEnd2}
- = lists:foldl(fun ({MsgId, RefCount, _Source, Offset, TotalSize},
- {CurOffset, BlockStart, BlockEnd}) ->
- %% CurOffset is in the DestinationFile.
- %% Offset, BlockStart and BlockEnd are in the SourceFile
- Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
- %% update MsgLocation to reflect change of file and offset
- ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination,
- CurOffset, TotalSize}),
- NextOffset = CurOffset + Size,
- if BlockStart =:= undefined ->
- %% base case, called only for the first list elem
- {NextOffset, Offset, Offset + Size};
- Offset =:= BlockEnd ->
- %% extend the current block because the next msg follows straight on
- {NextOffset, BlockStart, BlockEnd + Size};
- true ->
- %% found a gap, so actually do the work for the previous block
- BSize = BlockEnd - BlockStart,
- {ok, BlockStart}
- = file:position(SourceHdl,
- {bof, BlockStart}),
- {ok, BSize}
- = file:copy(SourceHdl, DestinationHdl,
- BSize),
- {NextOffset, Offset, Offset + Size}
- end
- end, {DestinationValid, undefined, undefined}, SourceWorkList),
+ {ExpectedSize, BlockStart2, BlockEnd2} =
+ lists:foldl(
+ fun ({MsgId, RefCount, _Source, Offset, TotalSize},
+ {CurOffset, BlockStart, BlockEnd}) ->
+ %% CurOffset is in the DestinationFile.
+ %% Offset, BlockStart and BlockEnd are in the SourceFile
+ Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ %% update MsgLocation to reflect change of file and offset
+ ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination,
+ CurOffset, TotalSize}),
+ NextOffset = CurOffset + Size,
+ if BlockStart =:= undefined ->
+ %% base case, called only for the first list elem
+ {NextOffset, Offset, Offset + Size};
+ Offset =:= BlockEnd ->
+ %% extend the current block because the next msg follows straight on
+ {NextOffset, BlockStart, BlockEnd + Size};
+ true ->
+ %% found a gap, so actually do the work for the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} =
+ file:position(SourceHdl, {bof, BlockStart}),
+ {ok, BSize} =
+ file:copy(SourceHdl, DestinationHdl, BSize),
+ {NextOffset, Offset, Offset + Size}
+ end
+ end, {DestinationValid, undefined, undefined}, SourceWorkList),
%% do the last remaining block
BSize2 = BlockEnd2 - BlockStart2,
{ok, BlockStart2} = file:position(SourceHdl, {bof, BlockStart2}),
@@ -650,20 +652,21 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
ok = file:delete(form_filename(Source)),
State.
-closeFile(File, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge} }) ->
+closeFile(File, State = #dqstate { read_file_handles =
+ {ReadHdls, ReadHdlsAge} }) ->
case dict:find(File, ReadHdls) of
error ->
State;
{ok, {Hdl, Then}} ->
ok = file:close(Hdl),
- State #dqstate { read_file_handles
- = { dict:erase(File, ReadHdls),
- gb_trees:delete(Then, ReadHdlsAge) } }
+ State #dqstate { read_file_handles =
+ { dict:erase(File, ReadHdls),
+ gb_trees:delete(Then, ReadHdlsAge) } }
end.
delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
- [{File, ValidData, _ContiguousTop, Left, Right}]
- = ets:lookup(FileSummary, File),
+ [{File, ValidData, _ContiguousTop, Left, Right}] =
+ ets:lookup(FileSummary, File),
case ValidData of
%% we should NEVER find the current file in here
%% hence right should always be a file, not undefined
@@ -691,15 +694,15 @@ load_from_disk(State) ->
{Files, TmpFiles} = get_disk_queue_files(),
ok = recover_crashed_compactions(Files, TmpFiles),
%% There should be no more tmp files now, so go ahead and load the whole lot
- (State1 = #dqstate{ msg_location = MsgLocation })
- = load_messages(undefined, Files, State),
+ (State1 = #dqstate{ msg_location = MsgLocation }) =
+ load_messages(undefined, Files, State),
%% Finally, check there is nothing in mnesia which we haven't loaded
{atomic, true} = mnesia:transaction(
fun() ->
ok = mnesia:read_lock_table(rabbit_disk_queue),
mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) ->
- true = 1
- =:= length(dets:lookup(MsgLocation, MsgId))
+ true = 1 =:=
+ length(dets:lookup(MsgLocation, MsgId))
end,
true, rabbit_disk_queue)
end),
@@ -738,11 +741,8 @@ load_messages(Left, [], State = #dqstate { msg_location = MsgLocation }) ->
Num = list_to_integer(filename:rootname(Left)),
Offset = case dets:match_object(MsgLocation, {'_', '_', Left, '_', '_'}) of
[] -> 0;
- L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_]
- = lists:sort(fun ({_, _, _, OffA, _},
- {_, _, _, OffB, _}) ->
- OffB < OffA
- end, L),
+ L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] =
+ sortMsgLocationsByOffset(false, L),
MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
end,
State # dqstate { current_file_num = Num, current_file_name = Left,
@@ -791,8 +791,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION,
true = lists:member(NonTmpRelatedFile, Files),
%% [{MsgId, TotalSize, FileOffset}]
- {ok, UncorruptedMessagesTmp}
- = scan_file_for_valid_messages(form_filename(TmpFile)),
+ {ok, UncorruptedMessagesTmp} =
+ scan_file_for_valid_messages(form_filename(TmpFile)),
MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp),
%% all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out
lists:foreach(fun (MsgId) ->
@@ -802,8 +802,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
queue_and_seq_id = '_',
is_delivered = '_'}))
end, MsgIdsTmp),
- {ok, UncorruptedMessages}
- = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
+ {ok, UncorruptedMessages} =
+ scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
MsgIds = lists:map(GrabMsgId, UncorruptedMessages),
%% 1) It's possible that everything in the tmp file is also in the main file
%% such that the main file is (prefix ++ tmpfile). This means that compaction
@@ -865,8 +865,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
ok = file:close(TmpHdl),
ok = file:delete(TmpFile),
- {ok, MainMessages}
- = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
+ {ok, MainMessages} =
+ scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
MsgIdsMain = lists:map(GrabMsgId, MainMessages),
%% check that everything in MsgIds is in MsgIdsMain
true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end,
@@ -981,8 +981,8 @@ read_next_file_entry(FileHdl, Offset) ->
case file:position(FileHdl,
{cur, TotalSize - MsgIdBinSize}) of
{ok, ExpectedAbsPos} ->
- NextOffset = Offset + TotalSize
- + ?FILE_PACKING_ADJUSTMENT,
+ NextOffset = Offset + TotalSize +
+ ?FILE_PACKING_ADJUSTMENT,
case file:read(FileHdl, 1) of
{ok, <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} ->
{ok, {ok, binary_to_term(MsgId),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a04c6f1bfb..cce9da1a68 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -685,7 +685,8 @@ delete_log_handlers(Handlers) ->
test_disk_queue() ->
% unicode chars are supported properly from r13 onwards
io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []),
- [begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize), timer:sleep(1000) end || % 1000 milliseconds
+ [begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize),
+ timer:sleep(1000) end || % 1000 milliseconds
MsgSize <- [512, 8192, 32768, 131072],
Qs <- [[1], lists:seq(1,10)], %, lists:seq(1,100), lists:seq(1,1000)],
MsgCount <- [1024, 4096, 16384]
@@ -700,20 +701,29 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
QCount = length(Qs),
Msg = <<0:(8*MsgSizeBytes)>>,
List = lists:seq(1, MsgCount),
- {Publish, ok} = timer:tc(?MODULE, rdq_time_commands,
- [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg) || N <- List, _ <- Qs] end,
- fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end
- ]]),
- {Deliver, ok} = timer:tc(?MODULE, rdq_time_commands,
- [[fun() -> [begin SeqIds = [begin {N, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(Q), SeqId end || N <- List],
- rabbit_disk_queue:ack(Q, SeqIds),
- ok = rabbit_disk_queue:tx_commit(Q, [])
- end || Q <- Qs]
- end]]),
+ {Publish, ok} =
+ timer:tc(?MODULE, rdq_time_commands,
+ [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg)
+ || N <- List, _ <- Qs] end,
+ fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List)
+ || Q <- Qs] end
+ ]]),
+ {Deliver, ok} =
+ timer:tc(?MODULE, rdq_time_commands,
+ [[fun() -> [begin SeqIds =
+ [begin {N, Msg, MsgSizeBytes, false, SeqId} =
+ rabbit_disk_queue:deliver(Q), SeqId end
+ || N <- List],
+ rabbit_disk_queue:ack(Q, SeqIds),
+ ok = rabbit_disk_queue:tx_commit(Q, [])
+ end || Q <- Qs]
+ end]]),
io:format(" ~15.10B| ~14.10B| ~14.10B| ~14.1f| ~14.1f| ~14.6f| ~14.10f| ~14.1f| ~14.6f| ~14.10f~n",
[MsgCount, MsgSizeBytes, QCount, float(Startup),
- float(Publish), (Publish / (MsgCount * QCount)), (Publish / (MsgCount * QCount * MsgSizeBytes)),
- float(Deliver), (Deliver / (MsgCount * QCount)), (Deliver / (MsgCount * QCount * MsgSizeBytes))]),
+ float(Publish), (Publish / (MsgCount * QCount)),
+ (Publish / (MsgCount * QCount * MsgSizeBytes)),
+ float(Deliver), (Deliver / (MsgCount * QCount)),
+ (Deliver / (MsgCount * QCount * MsgSizeBytes))]),
rdq_stop().
% we know each file is going to be 1024*1024*10 bytes in size (10MB), so make sure we have
@@ -728,22 +738,30 @@ rdq_stress_gc(MsgCount) ->
rabbit_disk_queue:tx_commit(q, List),
StartChunk = round(MsgCount / 20), % 5%
AckList =
- lists:reverse(lists:foldl(fun (E, Acc) -> case Acc of
- [] -> [E];
- [F|_Fs] ->
- case E rem F of
- 0 -> Acc;
- _ -> [E|Acc]
- end
- end
- end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(StartChunk,MsgCount)])))
- ++ lists:seq(1, (StartChunk - 1)),
- MsgIdToSeqDict
- = lists:foldl(fun (_, Acc) ->
- {MsgId, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(q),
- dict:store(MsgId, SeqId, Acc)
- end, dict:new(), List),
- rabbit_disk_queue:ack(q, [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict), SeqId end || MsgId <- AckList]),
+ lists:reverse(
+ lists:foldl(
+ fun (E, Acc) ->
+ case Acc of
+ [] -> [E];
+ [F|_Fs] ->
+ case E rem F of
+ 0 -> Acc;
+ _ -> [E|Acc]
+ end
+ end
+ end, [], lists:flatten([lists:seq(N,MsgCount,N)
+ || N <- lists:seq(StartChunk,MsgCount)]))) ++
+ lists:seq(1, (StartChunk - 1)),
+ MsgIdToSeqDict =
+ lists:foldl(
+ fun (_, Acc) ->
+ {MsgId, Msg, MsgSizeBytes, false, SeqId} =
+ rabbit_disk_queue:deliver(q),
+ dict:store(MsgId, SeqId, Acc)
+ end, dict:new(), List),
+ rabbit_disk_queue:ack(q, [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict),
+ SeqId end
+ || MsgId <- AckList]),
rabbit_disk_queue:tx_commit(q, []),
rdq_stop(),
passed.
@@ -752,7 +770,8 @@ rdq_time_commands(Funcs) ->
lists:foreach(fun (F) -> F() end, Funcs).
rdq_virgin() ->
- {Micros, {ok, _}} = timer:tc(rabbit_disk_queue, start_link, [1024*1024*10, 1000]),
+ {Micros, {ok, _}} =
+ timer:tc(rabbit_disk_queue, start_link, [1024*1024*10, 1000]),
ok = rabbit_disk_queue:clean_stop(),
Micros.