diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 112 |
1 files changed, 66 insertions, 46 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 134762d7fb..961d1fe639 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -113,6 +113,9 @@ -record(message_store_entry, {msg_id, ref_count, file, offset, total_size, is_persistent}). +-record(file_summary_entry, + {file, valid_total_size, contiguous_top, left, right}). + %% The components: %% %% MsgLocation: this is a (d)ets table which contains: @@ -430,8 +433,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> #dqstate { msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts, operation_mode = Mode, - file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, - [set, private]), + file_summary = ets:new( + ?FILE_SUMMARY_ETS_NAME, + [set, private, {keypos, 2}]), sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]), current_file_num = 0, @@ -693,7 +697,6 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only, rabbit_log:info("Converting disk queue to ram disk mode~n", []), {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_copies), - ok = mnesia:dirty_delete(rabbit_disk_queue, ?BPR_KEY), true = ets:from_dets(MsgLocationEts, MsgLocationDets), ok = dets:delete_all_objects(MsgLocationDets), garbage_collect(), @@ -1024,12 +1027,14 @@ remove_message(MsgId, Files, 1 -> ok = dets_ets_delete(State, MsgId), ok = remove_cache_entry(MsgId, State), - [{File, ValidTotalSize, ContiguousTop, Left, Right}] = + [FSEntry = #file_summary_entry { valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop }] = ets:lookup(FileSummary, File), ContiguousTop1 = lists:min([ContiguousTop, Offset]), - true = - ets:insert(FileSummary, {File, ValidTotalSize - TotalSize, - ContiguousTop1, Left, Right}), + ValidTotalSize1 = ValidTotalSize - TotalSize, + true = ets:insert(FileSummary, FSEntry #file_summary_entry { + valid_total_size = ValidTotalSize1, + contiguous_top = ContiguousTop1 }), if CurName =:= File -> Files; true -> sets:add_element(File, Files) end; @@ -1058,7 +1063,9 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, { msg_id = MsgId, ref_count = 1, file = CurName, offset = CurOffset, total_size = TotalSize, is_persistent = IsPersistent }), - [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = + [FSEntry = #file_summary_entry { valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop, + right = undefined }] = ets:lookup(FileSummary, CurName), ValidTotalSize1 = ValidTotalSize + TotalSize, ContiguousTop1 = if CurOffset =:= ContiguousTop -> @@ -1066,8 +1073,9 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, ValidTotalSize1; true -> ContiguousTop end, - true = ets:insert(FileSummary, {CurName, ValidTotalSize1, - ContiguousTop1, Left, undefined}), + true = ets:insert(FileSummary, FSEntry #file_summary_entry { + valid_total_size = ValidTotalSize1, + contiguous_top = ContiguousTop1 }), NextOffset = CurOffset + TotalSize, maybe_roll_to_new_file( NextOffset, State #dqstate {current_offset = NextOffset, @@ -1282,8 +1290,12 @@ maybe_roll_to_new_file(Offset, NextNum = CurNum + 1, NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, {ok, NextHdl} = open_file(NextName, ?WRITE_MODE), - true = ets:update_element(FileSummary, CurName, {5, NextName}),%% 5 is Right - true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}), + true = ets:update_element(FileSummary, CurName, + {#file_summary_entry.right, NextName}), + true = ets:insert_new( + FileSummary, #file_summary_entry { + file = NextName, valid_total_size = 0, contiguous_top = 0, + left = CurName, right = undefined }), State2 = State1 #dqstate { current_file_name = NextName, current_file_handle = NextHdl, current_file_num = NextNum, @@ -1316,16 +1328,15 @@ combine_file(File, State = #dqstate { file_summary = FileSummary, %% been deleted within the current GC run case ets:lookup(FileSummary, File) of [] -> State; - [FileObj = {File, _ValidData, _ContiguousTop, Left, Right}] -> + [FSEntry = #file_summary_entry { left = Left, right = Right }] -> GoRight = fun() -> case Right of undefined -> State; _ when not (CurName == Right) -> - [RightObj] = ets:lookup(FileSummary, Right), - {_, State1} = - adjust_meta_and_combine(FileObj, RightObj, - State), + [FSRight] = ets:lookup(FileSummary, Right), + {_, State1} = adjust_meta_and_combine( + FSEntry, FSRight, State), State1; _ -> State end @@ -1333,8 +1344,8 @@ combine_file(File, State = #dqstate { file_summary = FileSummary, case Left of undefined -> GoRight(); - _ -> [LeftObj] = ets:lookup(FileSummary, Left), - case adjust_meta_and_combine(LeftObj, FileObj, State) of + _ -> [FSLeft] = ets:lookup(FileSummary, Left), + case adjust_meta_and_combine(FSLeft, FSEntry, State) of {true, State1} -> State1; {false, State} -> GoRight() end @@ -1342,21 +1353,23 @@ combine_file(File, State = #dqstate { file_summary = FileSummary, end. adjust_meta_and_combine( - LeftObj = {LeftFile, LeftValidData, _LeftContigTop, LeftLeft, RightFile}, - RightObj = {RightFile, RightValidData, _RightContigTop, LeftFile, RightRight}, + LeftObj = #file_summary_entry { + file = LeftFile, valid_total_size = LeftValidData, right = RightFile }, + RightObj = #file_summary_entry { + file = RightFile, valid_total_size = RightValidData, left = LeftFile, + right = RightRight }, State = #dqstate { file_size_limit = FileSizeLimit, - file_summary = FileSummary - }) -> + file_summary = FileSummary }) -> TotalValidData = LeftValidData + RightValidData, if FileSizeLimit >= TotalValidData -> State1 = combine_files(RightObj, LeftObj, State), %% this could fail if RightRight is undefined - %% left is the 4th field - ets:update_element(FileSummary, RightRight, {4, LeftFile}), - true = ets:insert(FileSummary, {LeftFile, - TotalValidData, TotalValidData, - LeftLeft, - RightRight}), + ets:update_element(FileSummary, RightRight, + {#file_summary_entry.left, LeftFile}), + true = ets:insert(FileSummary, LeftObj #file_summary_entry { + valid_total_size = TotalValidData, + contiguous_top = TotalValidData, + right = RightRight }), true = ets:delete(FileSummary, RightFile), {true, State1}; true -> {false, State} @@ -1383,11 +1396,14 @@ truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> ok = file:truncate(FileHdl), ok = preallocate(FileHdl, Highpoint, Lowpoint). -combine_files({Source, SourceValid, _SourceContiguousTop, - _SourceLeft, _SourceRight}, - {Destination, DestinationValid, DestinationContiguousTop, - _DestinationLeft, _DestinationRight}, - State) -> +combine_files(#file_summary_entry { file = Source, + valid_total_size = SourceValid, + left = Destination }, + #file_summary_entry { file = Destination, + valid_total_size = DestinationValid, + contiguous_top = DestinationContiguousTop, + right = Source }, + State) -> State1 = close_file(Source, close_file(Destination, State)), {ok, SourceHdl} = open_file(Source, ?READ_MODE), {ok, DestinationHdl} = open_file(Destination, ?READ_MODE ++ ?WRITE_MODE), @@ -1494,23 +1510,25 @@ close_file(File, State = #dqstate { read_file_handle_cache = HC }) -> State #dqstate { read_file_handle_cache = HC1 }. delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> - [{File, ValidData, _ContiguousTop, Left, Right}] = + [#file_summary_entry { valid_total_size = ValidData, + left = Left, right = Right }] = ets:lookup(FileSummary, File), case ValidData of %% we should NEVER find the current file in here hence right %% should always be a file, not undefined 0 -> case {Left, Right} of - {undefined, _} when not (is_atom(Right)) -> - %% the eldest file is empty. YAY! - %% left is the 4th field - true = - ets:update_element(FileSummary, Right, {4, undefined}); + {undefined, _} when not is_atom(Right) -> + %% the eldest file is empty. + true = ets:update_element( + FileSummary, Right, + {#file_summary_entry.left, undefined}); {_, _} when not (is_atom(Right)) -> - %% left is the 4th field - true = ets:update_element(FileSummary, Right, {4, Left}), - %% right is the 5th field - true = ets:update_element(FileSummary, Left, {5, Right}) + true = ets:update_element(FileSummary, Right, + {#file_summary_entry.left, Left}), + true = + ets:update_element(FileSummary, Left, + {#file_summary_entry.right, Right}) end, true = ets:delete(FileSummary, File), ok = file:delete(form_filename(File)), @@ -1786,8 +1804,10 @@ load_messages(Left, [File|Files], [] -> undefined; [F|_] -> F end, - true = ets:insert_new(FileSummary, - {File, ValidTotalSize, ContiguousTop, Left, Right}), + true = ets:insert_new(FileSummary, #file_summary_entry { + file = File, valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop, left = Left, + right = Right }), load_messages(File, Files, State). recover_crashed_compactions(Files, TmpFiles) -> |
