summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl118
1 file changed, 54 insertions(+), 64 deletions(-)
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 0313ebd715..5ab684e602 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -49,6 +49,7 @@
-define(INTEGER_SIZE_BITS, 8 * ?INTEGER_SIZE_BYTES).
-define(MSG_LOC_ETS_NAME, rabbit_disk_queue_msg_location).
-define(FILE_DETAIL_ETS_NAME, rabbit_disk_queue_file_detail).
+-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").
-define(FILE_PACKING_ADJUSTMENT, 1 + (2* (?INTEGER_SIZE_BYTES))).
@@ -97,12 +98,13 @@ tx_cancel(MsgIds) when is_list(MsgIds) ->
init([FileSizeLimit, ReadFileHandlesLimit]) ->
process_flag(trap_exit, true),
InitName = "0" ++ (?FILE_EXTENSION),
+ FileSummary = ets:new((?FILE_SUMMARY_ETS_NAME), [set, private]),
+ true = ets:insert(FileSummary, {InitName, #dqfile { valid_data = 0,
+ contiguous_prefix = 0,
+ left = undefined,
+ right = undefined}}),
State = #dqstate { msg_location = ets:new((?MSG_LOC_ETS_NAME), [set, private]),
- file_summary = dict:store(InitName, (#dqfile { valid_data = 0,
- contiguous_prefix = 0,
- left = undefined,
- right = undefined}),
- dict:new()),
+ file_summary = FileSummary,
file_detail = ets:new((?FILE_DETAIL_ETS_NAME), [ordered_set, private]),
current_file_num = 0,
current_file_name = InitName,
@@ -145,8 +147,8 @@ handle_info(_Info, State) ->
terminate(_Reason, #dqstate { current_file_handle = FileHdl,
read_file_handles = {ReadHdls, _ReadHdlsAge}
}) ->
- ok = file:sync(FileHdl),
- ok = file:close(FileHdl),
+ file:sync(FileHdl),
+ file:close(FileHdl),
dict:fold(fun (_File, Hdl, _Acc) ->
file:close(Hdl)
end, ok, ReadHdls).
@@ -203,29 +205,28 @@ internal_ack(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
file_summary = FileSummary,
file_detail = FileDetail
}) ->
- {Files, FileSummary1}
- = lists:foldl(fun (MsgId, {Files2, FileSummary2}) ->
+ Files
+ = lists:foldl(fun (MsgId, Files2) ->
[{MsgId, RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId),
% is this the last time we need the message, in which case tidy up
if 1 =:= RefCount ->
true = ets:delete(MsgLocation, MsgId),
- {ok, FileSum = #dqfile { valid_data = ValidTotalSize,
- contiguous_prefix = ContiguousTop }}
- = dict:find(File, FileSummary2),
+ [{File, FileSum = #dqfile { valid_data = ValidTotalSize,
+ contiguous_prefix = ContiguousTop }}]
+ = ets:lookup(FileSummary, File),
true = ets:delete(FileDetail, {File, Offset}),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- FileSummary3
- = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
- contiguous_prefix = ContiguousTop1
- }, FileSummary2),
+ true = ets:insert(FileSummary,
+ {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
+ contiguous_prefix = ContiguousTop1}}),
ok = mnesia:dirty_delete({rabbit_disk_queue, {MsgId, Q}}),
- {sets:add_element(File, Files2), FileSummary3};
+ sets:add_element(File, Files2);
1 < RefCount ->
- ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
- {Files2, FileSummary2}
+ true = ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
+ Files2
end
- end, {sets:new(), FileSummary}, MsgIds),
- State2 = compact(Files, State # dqstate { file_summary = FileSummary1 }),
+ end, sets:new(), MsgIds),
+ State2 = compact(Files, State),
{ok, State2}.
internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation,
@@ -240,23 +241,20 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
% New message, lots to do
{ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, Offset, TotalSize}),
- {ok, FileSum = #dqfile { valid_data = ValidTotalSize,
- contiguous_prefix = ContiguousTop,
- right = undefined }}
- = dict:find(CurName, FileSummary),
+ [{CurName, FileSum = #dqfile { valid_data = ValidTotalSize,
+ contiguous_prefix = ContiguousTop,
+ right = undefined }}]
+ = ets:lookup(FileSummary, CurName),
true = ets:insert_new(FileDetail, {{CurName, Offset}, TotalSize}),
ValidTotalSize1 = ValidTotalSize + TotalSize + (?FILE_PACKING_ADJUSTMENT),
ContiguousTop1 = if Offset =:= ContiguousTop ->
ValidTotalSize; % can't be any holes in this file
true -> ContiguousTop
end,
- FileSummary1 = dict:store(CurName, FileSum #dqfile { valid_data = ValidTotalSize1,
- contiguous_prefix = ContiguousTop1 },
- FileSummary),
+ true = ets:insert(FileSummary, {CurName, FileSum #dqfile { valid_data = ValidTotalSize1,
+ contiguous_prefix = ContiguousTop1 }}),
maybe_roll_to_new_file(Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT),
- State # dqstate { file_summary = FileSummary1,
- current_offset = Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT)
- });
+ State # dqstate {current_offset = Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT)});
[{MsgId, RefCount, File, Offset, TotalSize}] ->
% We already know about it, just update counter
true = ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}),
@@ -287,33 +285,30 @@ internal_publish(Q, MsgId, MsgBody, State) ->
ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { queue_and_msg_id = {MsgId, Q}, is_delivered = false}),
{ok, State1}.
-
internal_tx_cancel(MsgIds, State = #dqstate { msg_location = MsgLocation,
file_summary = FileSummary,
file_detail = FileDetail
}) ->
- {Files, FileSummary1} =
- lists:foldl(fun (MsgId, {Files2, FileSummary2}) ->
+ Files =
+ lists:foldl(fun (MsgId, Files2) ->
[{MsgId, RefCount, File, Offset, TotalSize}]
= ets:lookup(MsgLocation, MsgId),
if 1 =:= RefCount ->
true = ets:delete(MsgLocation, MsgId),
- {ok, FileSum = #dqfile { valid_data = ValidTotalSize,
- contiguous_prefix = ContiguousTop }}
- = dict:find(File, FileSummary2),
+ [{File, FileSum = #dqfile { valid_data = ValidTotalSize,
+ contiguous_prefix = ContiguousTop }}]
+ = ets:lookup(FileSummary, File),
true = ets:delete(FileDetail, {File, Offset}),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- FileSummary3
- = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
- contiguous_prefix = ContiguousTop1
- }, FileSummary2),
- {sets:add_element(File, Files2), FileSummary3};
+ true = ets:insert(FileSummary, {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
+ contiguous_prefix = ContiguousTop1}}),
+ sets:add_element(File, Files2);
1 < RefCount ->
ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
- {Files2, FileSummary2}
+ Files2
end
- end, {sets:new(), FileSummary}, MsgIds),
- State2 = compact(Files, State # dqstate { file_summary = FileSummary1 }),
+ end, sets:new(), MsgIds),
+ State2 = compact(Files, State),
{ok, State2}.
%% ---- ROLLING OVER THE APPEND FILE ----
@@ -330,19 +325,17 @@ maybe_roll_to_new_file(Offset, State = #dqstate { file_size_limit = FileSizeLimi
NextNum = CurNum + 1,
NextName = integer_to_list(NextNum) ++ (?FILE_EXTENSION),
{ok, NextHdl} = file:open(form_filename(NextName), [write, raw, binary]),
- {ok, FileSum = #dqfile {right = undefined}} = dict:find(CurName, FileSummary),
- FileSummary1 = dict:store(CurName, FileSum #dqfile {right = NextName}, FileSummary),
+ [{CurName, FileSum = #dqfile {right = undefined}}] = ets:lookup(FileSummary, CurName),
+ true = ets:insert(FileSummary, {CurName, FileSum #dqfile {right = NextName}}),
+ true = ets:insert_new(FileSummary, {NextName, #dqfile { valid_data = 0,
+ contiguous_prefix = 0,
+ left = CurName,
+ right = undefined }}),
{ok, State # dqstate { current_file_name = NextName,
current_file_handle = NextHdl,
current_file_num = NextNum,
- current_offset = 0,
- file_summary = dict:store(NextName, #dqfile { valid_data = 0,
- contiguous_prefix = 0,
- left = CurName,
- right = undefined },
- FileSummary1)
- }
- };
                           current_offset = 0
                         }};
maybe_roll_to_new_file(_, State) ->
{ok, State}.
@@ -401,15 +394,12 @@ load_messages(Left, [File|Files],
[] -> undefined;
[F|_] -> F
end,
- State1 = State # dqstate { file_summary =
- dict:store(File, #dqfile { valid_data = ValidTotalSize,
- contiguous_prefix = ContiguousTop,
- left = Left,
- right = Right
- },
- FileSummary)
- },
- load_messages(File, Files, State1).
+ true = ets:insert_new(FileSummary, {File, #dqfile { valid_data = ValidTotalSize,
+ contiguous_prefix = ContiguousTop,
+ left = Left,
+ right = Right
+ }}),
+ load_messages(File, Files, State).
%% ---- DISK RECOVERY OF FAILED COMPACTION ----