diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 118 |
1 files changed, 54 insertions, 64 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 0313ebd715..5ab684e602 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -49,6 +49,7 @@ -define(INTEGER_SIZE_BITS, 8 * ?INTEGER_SIZE_BYTES). -define(MSG_LOC_ETS_NAME, rabbit_disk_queue_msg_location). -define(FILE_DETAIL_ETS_NAME, rabbit_disk_queue_file_detail). +-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). -define(FILE_EXTENSION, ".rdq"). -define(FILE_EXTENSION_TMP, ".rdt"). -define(FILE_PACKING_ADJUSTMENT, 1 + (2* (?INTEGER_SIZE_BYTES))). @@ -97,12 +98,13 @@ tx_cancel(MsgIds) when is_list(MsgIds) -> init([FileSizeLimit, ReadFileHandlesLimit]) -> process_flag(trap_exit, true), InitName = "0" ++ (?FILE_EXTENSION), + FileSummary = ets:new((?FILE_SUMMARY_ETS_NAME), [set, private]), + true = ets:insert(FileSummary, {InitName, #dqfile { valid_data = 0, + contiguous_prefix = 0, + left = undefined, + right = undefined}}), State = #dqstate { msg_location = ets:new((?MSG_LOC_ETS_NAME), [set, private]), - file_summary = dict:store(InitName, (#dqfile { valid_data = 0, - contiguous_prefix = 0, - left = undefined, - right = undefined}), - dict:new()), + file_summary = FileSummary, file_detail = ets:new((?FILE_DETAIL_ETS_NAME), [ordered_set, private]), current_file_num = 0, current_file_name = InitName, @@ -145,8 +147,8 @@ handle_info(_Info, State) -> terminate(_Reason, #dqstate { current_file_handle = FileHdl, read_file_handles = {ReadHdls, _ReadHdlsAge} }) -> - ok = file:sync(FileHdl), - ok = file:close(FileHdl), + file:sync(FileHdl), + file:close(FileHdl), dict:fold(fun (_File, Hdl, _Acc) -> file:close(Hdl) end, ok, ReadHdls). @@ -203,29 +205,28 @@ internal_ack(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, file_summary = FileSummary, file_detail = FileDetail }) -> - {Files, FileSummary1} - = lists:foldl(fun (MsgId, {Files2, FileSummary2}) -> + Files + = lists:foldl(fun (MsgId, Files2) -> [{MsgId, RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId), % is this the last time we need the message, in which case tidy up if 1 =:= RefCount -> true = ets:delete(MsgLocation, MsgId), - {ok, FileSum = #dqfile { valid_data = ValidTotalSize, - contiguous_prefix = ContiguousTop }} - = dict:find(File, FileSummary2), + [{File, FileSum = #dqfile { valid_data = ValidTotalSize, + contiguous_prefix = ContiguousTop }}] + = ets:lookup(FileSummary, File), true = ets:delete(FileDetail, {File, Offset}), ContiguousTop1 = lists:min([ContiguousTop, Offset]), - FileSummary3 - = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)), - contiguous_prefix = ContiguousTop1 - }, FileSummary2), + true = ets:insert(FileSummary, + {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)), + contiguous_prefix = ContiguousTop1}}), ok = mnesia:dirty_delete({rabbit_disk_queue, {MsgId, Q}}), - {sets:add_element(File, Files2), FileSummary3}; + sets:add_element(File, Files2); 1 < RefCount -> - ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), - {Files2, FileSummary2} + true = ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), + Files2 end - end, {sets:new(), FileSummary}, MsgIds), - State2 = compact(Files, State # dqstate { file_summary = FileSummary1 }), + end, sets:new(), MsgIds), + State2 = compact(Files, State), {ok, State2}. internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation, @@ -240,23 +241,20 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio % New message, lots to do {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody), true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, Offset, TotalSize}), - {ok, FileSum = #dqfile { valid_data = ValidTotalSize, - contiguous_prefix = ContiguousTop, - right = undefined }} - = dict:find(CurName, FileSummary), + [{CurName, FileSum = #dqfile { valid_data = ValidTotalSize, + contiguous_prefix = ContiguousTop, + right = undefined }}] + = ets:lookup(FileSummary, CurName), true = ets:insert_new(FileDetail, {{CurName, Offset}, TotalSize}), ValidTotalSize1 = ValidTotalSize + TotalSize + (?FILE_PACKING_ADJUSTMENT), ContiguousTop1 = if Offset =:= ContiguousTop -> ValidTotalSize; % can't be any holes in this file true -> ContiguousTop end, - FileSummary1 = dict:store(CurName, FileSum #dqfile { valid_data = ValidTotalSize1, - contiguous_prefix = ContiguousTop1 }, - FileSummary), + true = ets:insert(FileSummary, {CurName, FileSum #dqfile { valid_data = ValidTotalSize1, + contiguous_prefix = ContiguousTop1 }}), maybe_roll_to_new_file(Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT), - State # dqstate { file_summary = FileSummary1, - current_offset = Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT) - }); + State # dqstate {current_offset = Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT)}); [{MsgId, RefCount, File, Offset, TotalSize}] -> % We already know about it, just update counter true = ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}), @@ -287,33 +285,30 @@ internal_publish(Q, MsgId, MsgBody, State) -> ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { queue_and_msg_id = {MsgId, Q}, is_delivered = false}), {ok, State1}. - internal_tx_cancel(MsgIds, State = #dqstate { msg_location = MsgLocation, file_summary = FileSummary, file_detail = FileDetail }) -> - {Files, FileSummary1} = - lists:foldl(fun (MsgId, {Files2, FileSummary2}) -> + Files = + lists:foldl(fun (MsgId, Files2) -> [{MsgId, RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId), if 1 =:= RefCount -> true = ets:delete(MsgLocation, MsgId), - {ok, FileSum = #dqfile { valid_data = ValidTotalSize, - contiguous_prefix = ContiguousTop }} - = dict:find(File, FileSummary2), + [{File, FileSum = #dqfile { valid_data = ValidTotalSize, + contiguous_prefix = ContiguousTop }}] + = ets:lookup(FileSummary, File), true = ets:delete(FileDetail, {File, Offset}), ContiguousTop1 = lists:min([ContiguousTop, Offset]), - FileSummary3 - = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)), - contiguous_prefix = ContiguousTop1 - }, FileSummary2), - {sets:add_element(File, Files2), FileSummary3}; + true = ets:insert(FileSummary, {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)), + contiguous_prefix = ContiguousTop1}}), + sets:add_element(File, Files2); 1 < RefCount -> ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), - {Files2, FileSummary2} + Files2 end - end, {sets:new(), FileSummary}, MsgIds), - State2 = compact(Files, State # dqstate { file_summary = FileSummary1 }), + end, sets:new(), MsgIds), + State2 = compact(Files, State), {ok, State2}. %% ---- ROLLING OVER THE APPEND FILE ---- @@ -330,19 +325,17 @@ maybe_roll_to_new_file(Offset, State = #dqstate { file_size_limit = FileSizeLimi NextNum = CurNum + 1, NextName = integer_to_list(NextNum) ++ (?FILE_EXTENSION), {ok, NextHdl} = file:open(form_filename(NextName), [write, raw, binary]), - {ok, FileSum = #dqfile {right = undefined}} = dict:find(CurName, FileSummary), - FileSummary1 = dict:store(CurName, FileSum #dqfile {right = NextName}, FileSummary), + [{CurName, FileSum = #dqfile {right = undefined}}] = ets:lookup(FileSummary, CurName), + true = ets:insert(FileSummary, {CurName, FileSum #dqfile {right = NextName}}), + true = ets:insert_new(FileSummary, {NextName, #dqfile { valid_data = 0, + contiguous_prefix = 0, + left = CurName, + right = undefined }}), {ok, State # dqstate { current_file_name = NextName, current_file_handle = NextHdl, current_file_num = NextNum, - current_offset = 0, - file_summary = dict:store(NextName, #dqfile { valid_data = 0, - contiguous_prefix = 0, - left = CurName, - right = undefined }, - FileSummary1) - } - }; + current_offset = 0 + }} maybe_roll_to_new_file(_, State) -> {ok, State}. @@ -401,15 +394,12 @@ load_messages(Left, [File|Files], [] -> undefined; [F|_] -> F end, - State1 = State # dqstate { file_summary = - dict:store(File, #dqfile { valid_data = ValidTotalSize, - contiguous_prefix = ContiguousTop, - left = Left, - right = Right - }, - FileSummary) - }, - load_messages(File, Files, State1). + true = ets:insert_new(FileSummary, {File, #dqfile { valid_data = ValidTotalSize, + contiguous_prefix = ContiguousTop, + left = Left, + right = Right + }}), + load_messages(File, Files, State). %% ---- DISK RECOVERY OF FAILED COMPACTION ---- |
