diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 1432 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 123 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 126 |
3 files changed, 845 insertions, 836 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 37c91a855b..8c602b5310 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -196,7 +196,7 @@ %% +-------+ +-------+ +-------+ %% | B | | X | | B | %% +-------+ +-------+ +-------+ -%% | A | | E | | A | +%% | A | | E | | A | %% +-------+ +-------+ +-------+ %% left right left %% @@ -224,19 +224,19 @@ -type(seq_id() :: non_neg_integer()). -spec(start_link/1 :: (non_neg_integer()) -> - {'ok', pid()} | 'ignore' | {'error', any()}). + {'ok', pid()} | 'ignore' | {'error', any()}). -spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok'). -spec(publish_with_seq/4 :: (queue_name(), msg_id(), seq_id(), binary()) -> 'ok'). -spec(deliver/1 :: (queue_name()) -> - {'empty' | {msg_id(), binary(), non_neg_integer(), - bool(), {msg_id(), seq_id()}}}). + {'empty' | {msg_id(), binary(), non_neg_integer(), + bool(), {msg_id(), seq_id()}}}). -spec(phantom_deliver/1 :: (queue_name()) -> - { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}). + { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}). -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id()}], - [{msg_id(), seq_id()}]) -> 'ok'). + [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). -spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id()}]) -> 'ok'). @@ -252,7 +252,7 @@ start_link(FileSizeLimit) -> gen_server:start_link({local, ?SERVER}, ?MODULE, - [FileSizeLimit, ?MAX_READ_FILE_HANDLES], []). + [FileSizeLimit, ?MAX_READ_FILE_HANDLES], []). publish(Q, MsgId, Msg) when is_binary(Msg) -> gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}). @@ -317,57 +317,57 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> process_flag(trap_exit, true), Node = node(), ok = - case mnesia:change_table_copy_type(rabbit_disk_queue, Node, disc_only_copies) of - {atomic, ok} -> ok; - {aborted, {already_exists, rabbit_disk_queue, Node, disc_only_copies}} -> ok; - E -> E - end, + case mnesia:change_table_copy_type(rabbit_disk_queue, Node, disc_only_copies) of + {atomic, ok} -> ok; + {aborted, {already_exists, rabbit_disk_queue, Node, disc_only_copies}} -> ok; + E -> E + end, ok = filelib:ensure_dir(form_filename("nothing")), InitName = "0" ++ ?FILE_EXTENSION, {ok, MsgLocationDets} = - dets:open_file(?MSG_LOC_NAME, - [{file, form_filename(atom_to_list(?MSG_LOC_NAME) ++ - ?FILE_EXTENSION_DETS)}, - {min_no_slots, 1024*1024}, - %% man says this should be <= 32M. But it works... - {max_no_slots, 1024*1024*1024}, - {type, set} - ]), + dets:open_file(?MSG_LOC_NAME, + [{file, form_filename(atom_to_list(?MSG_LOC_NAME) ++ + ?FILE_EXTENSION_DETS)}, + {min_no_slots, 1024*1024}, + %% man says this should be <= 32M. But it works... + {max_no_slots, 1024*1024*1024}, + {type, set} + ]), %% it would be better to have this as private, but dets:from_ets/2 %% seems to blow up if it is set private MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]), State = - #dqstate { msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts, - operation_mode = disk_only, - file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, - [set, private]), - sequences = ets:new(?SEQUENCE_ETS_NAME, - [set, private]), - current_file_num = 0, - current_file_name = InitName, - current_file_handle = undefined, - current_offset = 0, - file_size_limit = FileSizeLimit, - read_file_handles = {dict:new(), gb_trees:empty()}, - read_file_handles_limit = ReadFileHandlesLimit - }, + #dqstate { msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts, + operation_mode = disk_only, + file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, + [set, private]), + sequences = ets:new(?SEQUENCE_ETS_NAME, + [set, private]), + current_file_num = 0, + current_file_name = InitName, + current_file_handle = undefined, + current_offset = 0, + file_size_limit = FileSizeLimit, + read_file_handles = {dict:new(), gb_trees:empty()}, + read_file_handles_limit = ReadFileHandlesLimit + }, {ok, State1 = #dqstate { current_file_name = CurrentName, - current_offset = Offset } } = - load_from_disk(State), + current_offset = Offset } } = + load_from_disk(State), Path = form_filename(CurrentName), Exists = case file:read_file_info(Path) of - {error,enoent} -> false; - {ok, _} -> true - end, + {error,enoent} -> false; + {ok, _} -> true + end, %% read is only needed so that we can seek {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), ok = if Exists -> ok; - true -> %% new file, so preallocate - {ok, FileSizeLimit} = file:position(FileHdl, {bof, FileSizeLimit}), - file:truncate(FileHdl) - end, + true -> %% new file, so preallocate + {ok, FileSizeLimit} = file:position(FileHdl, {bof, FileSizeLimit}), + file:truncate(FileHdl) + end, {ok, Offset} = file:position(FileHdl, {bof, Offset}), {ok, State1 #dqstate { current_file_handle = FileHdl }}. @@ -391,21 +391,21 @@ handle_call(stop, _From, State) -> {stop, normal, ok, State}; %% gen_server now calls terminate handle_call(stop_vaporise, _From, State) -> State1 = #dqstate { file_summary = FileSummary, - sequences = Sequences } = - shutdown(State), %% tidy up file handles early + sequences = Sequences } = + shutdown(State), %% tidy up file handles early {atomic, ok} = mnesia:clear_table(rabbit_disk_queue), true = ets:delete(FileSummary), true = ets:delete(Sequences), lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), {stop, normal, ok, State1 #dqstate { current_file_handle = undefined, - read_file_handles = {dict:new(), gb_trees:empty()}}}; + read_file_handles = {dict:new(), gb_trees:empty()}}}; %% gen_server now calls terminate, which then calls shutdown handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = disk_only }) -> {reply, ok, State}; handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = ram_disk, - msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts }) -> + msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts }) -> {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_only_copies), ok = dets:from_ets(MsgLocationDets, MsgLocationEts), true = ets:delete_all_objects(MsgLocationEts), @@ -413,8 +413,8 @@ handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = ram_di handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = ram_disk }) -> {reply, ok, State}; handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = disk_only, - msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts }) -> + msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts }) -> {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_copies), true = ets:from_dets(MsgLocationEts, MsgLocationDets), ok = dets:delete_all_objects(MsgLocationDets), @@ -450,24 +450,24 @@ terminate(_Reason, State) -> shutdown(State). shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts, - current_file_handle = FileHdl, - read_file_handles = {ReadHdls, _ReadHdlsAge} - }) -> + msg_location_ets = MsgLocationEts, + current_file_handle = FileHdl, + read_file_handles = {ReadHdls, _ReadHdlsAge} + }) -> %% deliberately ignoring return codes here dets:close(MsgLocationDets), file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++ - ?FILE_EXTENSION_DETS)), + ?FILE_EXTENSION_DETS)), true = ets:delete_all_objects(MsgLocationEts), if FileHdl =:= undefined -> ok; true -> file:sync(FileHdl), - file:close(FileHdl) + file:close(FileHdl) end, dict:fold(fun (_File, Hdl, _Acc) -> - file:close(Hdl) - end, ok, ReadHdls), + file:close(Hdl) + end, ok, ReadHdls), State #dqstate { current_file_handle = undefined, - read_file_handles = {dict:new(), gb_trees:empty()}}. + read_file_handles = {dict:new(), gb_trees:empty()}}. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -481,97 +481,97 @@ base_directory() -> filename:join(mnesia:system_info(directory), "rabbit_disk_queue/"). dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, - Key) -> + Key) -> dets:lookup(MsgLocationDets, Key); dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, - Key) -> + Key) -> ets:lookup(MsgLocationEts, Key). dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, - Key) -> + Key) -> ok = dets:delete(MsgLocationDets, Key); dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, - Key) -> + Key) -> true = ets:delete(MsgLocationEts, Key), ok. dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, - Obj) -> + Obj) -> ok = dets:insert(MsgLocationDets, Obj); dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, - Obj) -> + Obj) -> true = ets:insert(MsgLocationEts, Obj), ok. dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, - Obj) -> + Obj) -> true = dets:insert_new(MsgLocationDets, Obj); dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, - Obj) -> + Obj) -> true = ets:insert_new(MsgLocationEts, Obj). dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, - Obj) -> + Obj) -> dets:match_object(MsgLocationDets, Obj); dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, - Obj) -> + Obj) -> ets:match_object(MsgLocationEts, Obj). %% ---- INTERNAL RAW FUNCTIONS ---- internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of - [] -> {ok, empty, State}; - [{Q, ReadSeqId, WriteSeqId}] -> - case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of - [] -> {ok, empty, State}; - [Obj = - #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId, - next_seq_id = ReadSeqId2}] -> - [{MsgId, _RefCount, File, Offset, TotalSize}] = - dets_ets_lookup(State, MsgId), - true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId}), - ok = - if Delivered -> ok; - true -> - mnesia:dirty_write(rabbit_disk_queue, - Obj #dq_msg_loc {is_delivered = true}) - end, - if ReadMsg -> - {FileHdl, State1} = get_read_handle(File, State), - {ok, {MsgBody, BodySize}} = - read_message_at_offset(FileHdl, Offset, TotalSize), - {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, - State1}; - true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State} - end - end + [] -> {ok, empty, State}; + [{Q, ReadSeqId, WriteSeqId}] -> + case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of + [] -> {ok, empty, State}; + [Obj = + #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId, + next_seq_id = ReadSeqId2}] -> + [{MsgId, _RefCount, File, Offset, TotalSize}] = + dets_ets_lookup(State, MsgId), + true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId}), + ok = + if Delivered -> ok; + true -> + mnesia:dirty_write(rabbit_disk_queue, + Obj #dq_msg_loc {is_delivered = true}) + end, + if ReadMsg -> + {FileHdl, State1} = get_read_handle(File, State), + {ok, {MsgBody, BodySize}} = + read_message_at_offset(FileHdl, Offset, TotalSize), + {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, + State1}; + true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State} + end + end end. get_read_handle(File, State = - #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, - read_file_handles_limit = ReadFileHandlesLimit }) -> + #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, + read_file_handles_limit = ReadFileHandlesLimit }) -> Now = now(), {FileHdl, ReadHdls1, ReadHdlsAge1} = - case dict:find(File, ReadHdls) of - error -> - {ok, Hdl} = file:open(form_filename(File), - [read, raw, binary, - read_ahead]), - case dict:size(ReadHdls) < ReadFileHandlesLimit of - true -> - {Hdl, ReadHdls, ReadHdlsAge}; - _False -> - {Then, OldFile, ReadHdlsAge2} = - gb_trees:take_smallest(ReadHdlsAge), - {ok, {OldHdl, Then}} = - dict:find(OldFile, ReadHdls), - ok = file:close(OldHdl), - {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2} - end; - {ok, {Hdl, Then}} -> - {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)} - end, + case dict:find(File, ReadHdls) of + error -> + {ok, Hdl} = file:open(form_filename(File), + [read, raw, binary, + read_ahead]), + case dict:size(ReadHdls) < ReadFileHandlesLimit of + true -> + {Hdl, ReadHdls, ReadHdlsAge}; + _False -> + {Then, OldFile, ReadHdlsAge2} = + gb_trees:take_smallest(ReadHdlsAge), + {ok, {OldHdl, Then}} = + dict:find(OldFile, ReadHdls), + ok = file:close(OldHdl), + {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2} + end; + {ok, {Hdl, Then}} -> + {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)} + end, ReadHdls3 = dict:store(File, {FileHdl, Now}, ReadHdls1), ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1), {FileHdl, State #dqstate {read_file_handles = {ReadHdls3, ReadHdlsAge3}}}. @@ -585,74 +585,74 @@ internal_ack(Q, MsgSeqIds, State) -> %% called from ack with MnesiaDelete = true %% called from purge with MnesiaDelete = txn remove_messages(Q, MsgSeqIds, MnesiaDelete, - State = #dqstate { file_summary = FileSummary, - current_file_name = CurName - }) -> + State = #dqstate { file_summary = FileSummary, + current_file_name = CurName + }) -> Files = - lists:foldl( - fun ({MsgId, SeqId}, Files2) -> - [{MsgId, RefCount, File, Offset, TotalSize}] = - dets_ets_lookup(State, MsgId), - Files3 = - if 1 =:= RefCount -> - ok = dets_ets_delete(State, MsgId), - [{File, ValidTotalSize, ContiguousTop, Left, Right}] = - ets:lookup(FileSummary, File), - ContiguousTop1 = lists:min([ContiguousTop, Offset]), - true = ets:insert(FileSummary, - {File, (ValidTotalSize - TotalSize - - ?FILE_PACKING_ADJUSTMENT), - ContiguousTop1, Left, Right}), - if CurName =:= File -> Files2; - true -> sets:add_element(File, Files2) - end; - 1 < RefCount -> - ok = dets_ets_insert(State, {MsgId, RefCount - 1, - File, Offset, TotalSize}), - Files2 - end, - ok = if MnesiaDelete -> - mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}); - MnesiaDelete =:= txn -> - mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write); - true -> ok - end, - Files3 - end, sets:new(), MsgSeqIds), + lists:foldl( + fun ({MsgId, SeqId}, Files2) -> + [{MsgId, RefCount, File, Offset, TotalSize}] = + dets_ets_lookup(State, MsgId), + Files3 = + if 1 =:= RefCount -> + ok = dets_ets_delete(State, MsgId), + [{File, ValidTotalSize, ContiguousTop, Left, Right}] = + ets:lookup(FileSummary, File), + ContiguousTop1 = lists:min([ContiguousTop, Offset]), + true = ets:insert(FileSummary, + {File, (ValidTotalSize - TotalSize + - ?FILE_PACKING_ADJUSTMENT), + ContiguousTop1, Left, Right}), + if CurName =:= File -> Files2; + true -> sets:add_element(File, Files2) + end; + 1 < RefCount -> + ok = dets_ets_insert(State, {MsgId, RefCount - 1, + File, Offset, TotalSize}), + Files2 + end, + ok = if MnesiaDelete -> + mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}); + MnesiaDelete =:= txn -> + mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write); + true -> ok + end, + Files3 + end, sets:new(), MsgSeqIds), State2 = compact(Files, State), {ok, State2}. internal_tx_publish(MsgId, MsgBody, - State = #dqstate { current_file_handle = CurHdl, - current_file_name = CurName, - current_offset = CurOffset, - file_summary = FileSummary - }) -> + State = #dqstate { current_file_handle = CurHdl, + current_file_name = CurName, + current_offset = CurOffset, + file_summary = FileSummary + }) -> case dets_ets_lookup(State, MsgId) of - [] -> - %% New message, lots to do - {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody), - true = dets_ets_insert_new(State, {MsgId, 1, CurName, - CurOffset, TotalSize}), - [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = - ets:lookup(FileSummary, CurName), - ValidTotalSize1 = ValidTotalSize + TotalSize + - ?FILE_PACKING_ADJUSTMENT, - ContiguousTop1 = if CurOffset =:= ContiguousTop -> - %% can't be any holes in this file - ValidTotalSize1; - true -> ContiguousTop - end, - true = ets:insert(FileSummary, {CurName, ValidTotalSize1, - ContiguousTop1, Left, undefined}), - NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT, - maybe_roll_to_new_file(NextOffset, - State #dqstate {current_offset = NextOffset}); - [{MsgId, RefCount, File, Offset, TotalSize}] -> - %% We already know about it, just update counter - ok = dets_ets_insert(State, {MsgId, RefCount + 1, File, - Offset, TotalSize}), - {ok, State} + [] -> + %% New message, lots to do + {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody), + true = dets_ets_insert_new(State, {MsgId, 1, CurName, + CurOffset, TotalSize}), + [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = + ets:lookup(FileSummary, CurName), + ValidTotalSize1 = ValidTotalSize + TotalSize + + ?FILE_PACKING_ADJUSTMENT, + ContiguousTop1 = if CurOffset =:= ContiguousTop -> + %% can't be any holes in this file + ValidTotalSize1; + true -> ContiguousTop + end, + true = ets:insert(FileSummary, {CurName, ValidTotalSize1, + ContiguousTop1, Left, undefined}), + NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + maybe_roll_to_new_file(NextOffset, + State #dqstate {current_offset = NextOffset}); + [{MsgId, RefCount, File, Offset, TotalSize}] -> + %% We already know about it, just update counter + ok = dets_ets_insert(State, {MsgId, RefCount + 1, File, + Offset, TotalSize}), + {ok, State} end. adjust_last_msg_seq_id(_Q, ExpectedSeqId, next) -> @@ -664,87 +664,87 @@ adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId) -> adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId) when SuppliedSeqId > ExpectedSeqId -> [Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}), ok = mnesia:dirty_write(rabbit_disk_queue, - Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }), + Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }), SuppliedSeqId. %% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next)) internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, - State = #dqstate { current_file_handle = CurHdl, - current_file_name = CurName, - sequences = Sequences - }) -> + State = #dqstate { current_file_handle = CurHdl, + current_file_name = CurName, + sequences = Sequences + }) -> {PubList, PubAcc, ReadSeqId} = - case PubMsgSeqIds of - [] -> {[], undefined, undefined}; - [_|PubMsgSeqIdsTail] -> - {InitReadSeqId, InitWriteSeqId} = - case ets:lookup(Sequences, Q) of - [] -> {0,0}; - [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2} - end, - { lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])), - InitWriteSeqId, InitReadSeqId} - end, + case PubMsgSeqIds of + [] -> {[], undefined, undefined}; + [_|PubMsgSeqIdsTail] -> + {InitReadSeqId, InitWriteSeqId} = + case ets:lookup(Sequences, Q) of + [] -> {0,0}; + [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2} + end, + { lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])), + InitWriteSeqId, InitReadSeqId} + end, {atomic, {Sync, WriteSeqId, State2}} = - mnesia:transaction( - fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - %% must deal with publishes first, if we didn't - %% then we could end up acking a message before - %% it's been published, which is clearly - %% nonsense. I.e. in commit, do not do things in an - %% order which _could_not_ have happened. - {Sync2, WriteSeqId3} = - lists:foldl( - fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}}, - {Acc, ExpectedSeqId}) -> - [{MsgId, _RefCount, File, _Offset, _TotalSize}] = - dets_ets_lookup(State, MsgId), - SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId), - NextSeqId2 = if NextSeqId =:= next -> SeqId2 + 1; - true -> NextSeqId - end, - true = NextSeqId2 > SeqId2, - ok = mnesia:write(rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = - {Q, SeqId2}, - msg_id = MsgId, - is_delivered = false, - next_seq_id = NextSeqId2 - }, - write), - {Acc or (CurName =:= File), NextSeqId2} - end, {false, PubAcc}, PubList), - - {ok, State3} = remove_messages(Q, AckSeqIds, txn, State), - {Sync2, WriteSeqId3, State3} - end), + mnesia:transaction( + fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), + %% must deal with publishes first, if we didn't + %% then we could end up acking a message before + %% it's been published, which is clearly + %% nonsense. I.e. in commit, do not do things in an + %% order which _could_not_ have happened. + {Sync2, WriteSeqId3} = + lists:foldl( + fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}}, + {Acc, ExpectedSeqId}) -> + [{MsgId, _RefCount, File, _Offset, _TotalSize}] = + dets_ets_lookup(State, MsgId), + SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId), + NextSeqId2 = if NextSeqId =:= next -> SeqId2 + 1; + true -> NextSeqId + end, + true = NextSeqId2 > SeqId2, + ok = mnesia:write(rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = + {Q, SeqId2}, + msg_id = MsgId, + is_delivered = false, + next_seq_id = NextSeqId2 + }, + write), + {Acc or (CurName =:= File), NextSeqId2} + end, {false, PubAcc}, PubList), + + {ok, State3} = remove_messages(Q, AckSeqIds, txn, State), + {Sync2, WriteSeqId3, State3} + end), true = if PubList =:= [] -> true; - true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}) - end, + true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}) + end, ok = if Sync -> file:sync(CurHdl); - true -> ok - end, + true -> ok + end, {ok, State2}. %% SeqId can be 'next' internal_publish(Q, MsgId, SeqId, MsgBody, State) -> {ok, State1 = #dqstate { sequences = Sequences }} = - internal_tx_publish(MsgId, MsgBody, State), + internal_tx_publish(MsgId, MsgBody, State), {ReadSeqId, WriteSeqId} = - case ets:lookup(Sequences, Q) of - [] -> %% previously unseen queue - {0, 0}; - [{Q, ReadSeqId2, WriteSeqId2}] -> - {ReadSeqId2, WriteSeqId2} - end, + case ets:lookup(Sequences, Q) of + [] -> %% previously unseen queue + {0, 0}; + [{Q, ReadSeqId2, WriteSeqId2}] -> + {ReadSeqId2, WriteSeqId2} + end, WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId), WriteSeqId3Next = WriteSeqId3 + 1, true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId3Next}), ok = mnesia:dirty_write(rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3}, - msg_id = MsgId, - next_seq_id = WriteSeqId3Next, - is_delivered = false}), + #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3}, + msg_id = MsgId, + next_seq_id = WriteSeqId3Next, + is_delivered = false}), {ok, State1}. internal_tx_cancel(MsgIds, State) -> @@ -756,7 +756,7 @@ internal_tx_cancel(MsgIds, State) -> internal_requeue(_Q, [], State) -> {ok, State}; internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail], - State = #dqstate { sequences = Sequences }) -> + State = #dqstate { sequences = Sequences }) -> %% We know that every seq_id in here is less than the ReadSeqId %% you'll get if you look up this queue in Sequences (i.e. they've %% already been delivered). We also know that the rows for these @@ -783,78 +783,78 @@ internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail], [{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q), MsgSeqIdsZipped = lists:zip(MsgSeqIds, MsgSeqIdsTail ++ [{next, next}]), {atomic, WriteSeqId2} = - mnesia:transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foldl( - fun ({{{MsgId, SeqIdOrig}, SeqIdTo}, - {_NextMsgSeqId, NextSeqIdTo}}, - ExpectedSeqIdTo) -> - SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo), - NextSeqIdTo2 = if NextSeqIdTo =:= next -> SeqIdTo2 + 1; - true -> NextSeqIdTo - end, - true = NextSeqIdTo2 > SeqIdTo2, - [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] = - mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write), - mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqIdTo2}, - next_seq_id = NextSeqIdTo2 - }, - write), - mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write), - NextSeqIdTo2 - end, WriteSeqId, MsgSeqIdsZipped) - end), + mnesia:transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foldl( + fun ({{{MsgId, SeqIdOrig}, SeqIdTo}, + {_NextMsgSeqId, NextSeqIdTo}}, + ExpectedSeqIdTo) -> + SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo), + NextSeqIdTo2 = if NextSeqIdTo =:= next -> SeqIdTo2 + 1; + true -> NextSeqIdTo + end, + true = NextSeqIdTo2 > SeqIdTo2, + [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] = + mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write), + mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqIdTo2}, + next_seq_id = NextSeqIdTo2 + }, + write), + mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write), + NextSeqIdTo2 + end, WriteSeqId, MsgSeqIdsZipped) + end), true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2}), {ok, State}. internal_purge(Q, State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of - [] -> {ok, 0, State}; - [{Q, ReadSeqId, WriteSeqId}] -> - {atomic, {ok, State2}} = - mnesia:transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - MsgSeqIds = lists:foldl( - fun (SeqId, Acc) -> - [#dq_msg_loc { is_delivered = false, msg_id = MsgId }] = - mnesia:read(rabbit_disk_queue, {Q, SeqId}, write), - [{MsgId, SeqId} | Acc] - end, [], lists:seq(ReadSeqId, WriteSeqId - 1)), - remove_messages(Q, MsgSeqIds, txn, State) - end), - true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}), - {ok, WriteSeqId - ReadSeqId, State2} + [] -> {ok, 0, State}; + [{Q, ReadSeqId, WriteSeqId}] -> + {atomic, {ok, State2}} = + mnesia:transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + MsgSeqIds = lists:foldl( + fun (SeqId, Acc) -> + [#dq_msg_loc { is_delivered = false, msg_id = MsgId }] = + mnesia:read(rabbit_disk_queue, {Q, SeqId}, write), + [{MsgId, SeqId} | Acc] + end, [], lists:seq(ReadSeqId, WriteSeqId - 1)), + remove_messages(Q, MsgSeqIds, txn, State) + end), + true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}), + {ok, WriteSeqId - ReadSeqId, State2} end. %% ---- ROLLING OVER THE APPEND FILE ---- maybe_roll_to_new_file(Offset, - State = #dqstate { file_size_limit = FileSizeLimit, - current_file_name = CurName, - current_file_handle = CurHdl, - current_file_num = CurNum, - file_summary = FileSummary - } - ) when Offset >= FileSizeLimit -> + State = #dqstate { file_size_limit = FileSizeLimit, + current_file_name = CurName, + current_file_handle = CurHdl, + current_file_num = CurNum, + file_summary = FileSummary + } + ) when Offset >= FileSizeLimit -> ok = file:sync(CurHdl), ok = file:close(CurHdl), NextNum = CurNum + 1, NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, {ok, NextHdl} = file:open(form_filename(NextName), - [write, raw, binary, delayed_write]), + [write, raw, binary, delayed_write]), {ok, FileSizeLimit} = file:position(NextHdl, {bof, FileSizeLimit}), ok = file:truncate(NextHdl), {ok, 0} = file:position(NextHdl, {bof, 0}), true = ets:update_element(FileSummary, CurName, {5, NextName}), %% 5 is Right true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}), State1 = State #dqstate { current_file_name = NextName, - current_file_handle = NextHdl, - current_file_num = NextNum, - current_offset = 0 - }, + current_file_handle = NextHdl, + current_file_num = NextNum, + current_offset = 0 + }, {ok, compact(sets:from_list([CurName]), State1)}; maybe_roll_to_new_file(_, State) -> {ok, State}. @@ -866,81 +866,81 @@ compact(FilesSet, State) -> Files = lists:sort(sets:to_list(FilesSet)), %% foldl reverses, so now youngest/right-most first RemainingFiles = lists:foldl(fun (File, Acc) -> - delete_empty_files(File, Acc, State) - end, [], Files), + delete_empty_files(File, Acc, State) + end, [], Files), lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)). combine_file(File, State = #dqstate { file_size_limit = FileSizeLimit, - file_summary = FileSummary, - current_file_name = CurName - }) -> + file_summary = FileSummary, + current_file_name = CurName + }) -> %% the file we're looking at may no longer exist as it may have %% been deleted within the current GC run case ets:lookup(FileSummary, File) of - [] -> State; - [FileObj = {File, ValidData, _ContiguousTop, Left, Right}] -> - GoRight = - fun() -> - case Right of - undefined -> State; - _ when not(CurName =:= Right) -> - [RightObj = {Right, RightValidData, - _RightContiguousTop, File, RightRight}] = - ets:lookup(FileSummary, Right), - RightSumData = ValidData + RightValidData, - if FileSizeLimit >= RightSumData -> - %% here, Right will be the source and so will be deleted, - %% File will be the destination - State1 = combine_files(RightObj, FileObj, - State), - %% this could fail if RightRight is undefined - %% left is the 4th field - ets:update_element(FileSummary, - RightRight, {4, File}), - true = ets:insert(FileSummary, {File, - RightSumData, - RightSumData, - Left, - RightRight}), - true = ets:delete(FileSummary, Right), - State1; - true -> State - end; - _ -> State - end - end, - case Left of - undefined -> - GoRight(); - _ -> [LeftObj = - {Left, LeftValidData, _LeftContiguousTop, LeftLeft, File}] = - ets:lookup(FileSummary, Left), - LeftSumData = ValidData + LeftValidData, - if FileSizeLimit >= LeftSumData -> - %% here, File will be the source and so will be deleted, - %% Left will be the destination - State1 = combine_files(FileObj, LeftObj, State), - %% this could fail if Right is undefined - %% left is the 4th field - ets:update_element(FileSummary, Right, {4, Left}), - true = ets:insert(FileSummary, {Left, LeftSumData, - LeftSumData, - LeftLeft, Right}), - true = ets:delete(FileSummary, File), - State1; - true -> - GoRight() - end - end + [] -> State; + [FileObj = {File, ValidData, _ContiguousTop, Left, Right}] -> + GoRight = + fun() -> + case Right of + undefined -> State; + _ when not(CurName =:= Right) -> + [RightObj = {Right, RightValidData, + _RightContiguousTop, File, RightRight}] = + ets:lookup(FileSummary, Right), + RightSumData = ValidData + RightValidData, + if FileSizeLimit >= RightSumData -> + %% here, Right will be the source and so will be deleted, + %% File will be the destination + State1 = combine_files(RightObj, FileObj, + State), + %% this could fail if RightRight is undefined + %% left is the 4th field + ets:update_element(FileSummary, + RightRight, {4, File}), + true = ets:insert(FileSummary, {File, + RightSumData, + RightSumData, + Left, + RightRight}), + true = ets:delete(FileSummary, Right), + State1; + true -> State + end; + _ -> State + end + end, + case Left of + undefined -> + GoRight(); + _ -> [LeftObj = + {Left, LeftValidData, _LeftContiguousTop, LeftLeft, File}] = + ets:lookup(FileSummary, Left), + LeftSumData = ValidData + LeftValidData, + if FileSizeLimit >= LeftSumData -> + %% here, File will be the source and so will be deleted, + %% Left will be the destination + State1 = combine_files(FileObj, LeftObj, State), + %% this could fail if Right is undefined + %% left is the 4th field + ets:update_element(FileSummary, Right, {4, Left}), + true = ets:insert(FileSummary, {Left, LeftSumData, + LeftSumData, + LeftLeft, Right}), + true = ets:delete(FileSummary, File), + State1; + true -> + GoRight() + end + end end. sort_msg_locations_by_offset(Asc, List) -> Comp = if Asc -> fun erlang:'<'/2; - true -> fun erlang:'>'/2 - end, + true -> fun erlang:'>'/2 + end, lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) -> - Comp(OffA, OffB) - end, List). + Comp(OffA, OffB) + end, List). truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}), @@ -951,133 +951,133 @@ truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> ok. combine_files({Source, SourceValid, _SourceContiguousTop, - _SourceLeft, _SourceRight}, - {Destination, DestinationValid, DestinationContiguousTop, - _DestinationLeft, _DestinationRight}, - State1) -> + _SourceLeft, _SourceRight}, + {Destination, DestinationValid, DestinationContiguousTop, + _DestinationLeft, _DestinationRight}, + State1) -> State = close_file(Source, close_file(Destination, State1)), {ok, SourceHdl} = - file:open(form_filename(Source), - [read, write, raw, binary, delayed_write, read_ahead]), + file:open(form_filename(Source), + [read, write, raw, binary, delayed_write, read_ahead]), {ok, DestinationHdl} = - file:open(form_filename(Destination), - [read, write, raw, binary, delayed_write, read_ahead]), + file:open(form_filename(Destination), + [read, write, raw, binary, delayed_write, read_ahead]), ExpectedSize = SourceValid + DestinationValid, %% if DestinationValid =:= DestinationContiguousTop then we don't need a tmp file %% if they're not equal, then we need to write out everything past the DestinationContiguousTop to a tmp file %% then truncate, copy back in, and then copy over from Source %% otherwise we just truncate straight away and copy over from Source if DestinationContiguousTop =:= DestinationValid -> - ok = truncate_and_extend_file(DestinationHdl, - DestinationValid, ExpectedSize); + ok = truncate_and_extend_file(DestinationHdl, + DestinationValid, ExpectedSize); true -> - Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = - file:open(form_filename(Tmp), - [read, write, raw, binary, delayed_write, read_ahead]), - Worklist = - lists:dropwhile( - fun ({_, _, _, Offset, _}) - when Offset /= DestinationContiguousTop -> - %% it cannot be that Offset == DestinationContiguousTop - %% because if it was then DestinationContiguousTop would have been - %% extended by TotalSize - Offset < DestinationContiguousTop - %% Given expected access patterns, I suspect that the list should be - %% naturally sorted as we require, however, we need to enforce it anyway - end, sort_msg_locations_by_offset(true, - dets_ets_match_object(State, - {'_', '_', - Destination, - '_', '_'}))), - TmpSize = DestinationValid - DestinationContiguousTop, - {TmpSize, BlockStart1, BlockEnd1} = - lists:foldl( - fun ({MsgId, RefCount, _Destination, Offset, TotalSize}, - {CurOffset, BlockStart, BlockEnd}) -> - %% CurOffset is in the TmpFile. - %% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!) - Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, - %% this message is going to end up back in - %% Destination, at DestinationContiguousTop - %% + CurOffset - FinalOffset = DestinationContiguousTop + CurOffset, - ok = dets_ets_insert(State, {MsgId, RefCount, Destination, - FinalOffset, TotalSize}), - NextOffset = CurOffset + Size, - if BlockStart =:= undefined -> - %% base case, called only for the - %% first list elem - {NextOffset, Offset, Offset + Size}; - Offset =:= BlockEnd -> - %% extend the current block because - %% the next msg follows straight on - {NextOffset, BlockStart, BlockEnd + Size}; - true -> - %% found a gap, so actually do the - %% work for the previous block - BSize = BlockEnd - BlockStart, - {ok, BlockStart} = - file:position(DestinationHdl, - {bof, BlockStart}), - {ok, BSize} = file:copy(DestinationHdl, - TmpHdl, BSize), - {NextOffset, Offset, Offset + Size} - end - end, {0, undefined, undefined}, Worklist), - %% do the last remaining block - BSize1 = BlockEnd1 - BlockStart1, - {ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}), - {ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1), - %% so now Tmp contains everything we need to salvage from - %% Destination, and MsgLocationDets has been updated to - %% reflect compaction of Destination so truncate - %% Destination and copy from Tmp back to the end - {ok, 0} = file:position(TmpHdl, {bof, 0}), - ok = truncate_and_extend_file(DestinationHdl, - DestinationContiguousTop, ExpectedSize), - {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), - %% position in DestinationHdl should now be - %% DestinationValid - ok = file:sync(DestinationHdl), - ok = file:close(TmpHdl), - ok = file:delete(form_filename(Tmp)) + Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = + file:open(form_filename(Tmp), + [read, write, raw, binary, delayed_write, read_ahead]), + Worklist = + lists:dropwhile( + fun ({_, _, _, Offset, _}) + when Offset /= DestinationContiguousTop -> + %% it cannot be that Offset == DestinationContiguousTop + %% because if it was then DestinationContiguousTop would have been + %% extended by TotalSize + Offset < DestinationContiguousTop + %% Given expected access patterns, I suspect that the list should be + %% naturally sorted as we require, however, we need to enforce it anyway + end, sort_msg_locations_by_offset(true, + dets_ets_match_object(State, + {'_', '_', + Destination, + '_', '_'}))), + TmpSize = DestinationValid - DestinationContiguousTop, + {TmpSize, BlockStart1, BlockEnd1} = + lists:foldl( + fun ({MsgId, RefCount, _Destination, Offset, TotalSize}, + {CurOffset, BlockStart, BlockEnd}) -> + %% CurOffset is in the TmpFile. + %% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!) + Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, + %% this message is going to end up back in + %% Destination, at DestinationContiguousTop + %% + CurOffset + FinalOffset = DestinationContiguousTop + CurOffset, + ok = dets_ets_insert(State, {MsgId, RefCount, Destination, + FinalOffset, TotalSize}), + NextOffset = CurOffset + Size, + if BlockStart =:= undefined -> + %% base case, called only for the + %% first list elem + {NextOffset, Offset, Offset + Size}; + Offset =:= BlockEnd -> + %% extend the current block because + %% the next msg follows straight on + {NextOffset, BlockStart, BlockEnd + Size}; + true -> + %% found a gap, so actually do the + %% work for the previous block + BSize = BlockEnd - BlockStart, + {ok, BlockStart} = + file:position(DestinationHdl, + {bof, BlockStart}), + {ok, BSize} = file:copy(DestinationHdl, + TmpHdl, BSize), + {NextOffset, Offset, Offset + Size} + end + end, {0, undefined, undefined}, Worklist), + %% do the last remaining block + BSize1 = BlockEnd1 - BlockStart1, + {ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}), + {ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1), + %% so now Tmp contains everything we need to salvage from + %% Destination, and MsgLocationDets has been updated to + %% reflect compaction of Destination so truncate + %% Destination and copy from Tmp back to the end + {ok, 0} = file:position(TmpHdl, {bof, 0}), + ok = truncate_and_extend_file(DestinationHdl, + DestinationContiguousTop, ExpectedSize), + {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), + %% position in DestinationHdl should now be + %% DestinationValid + ok = file:sync(DestinationHdl), + ok = file:close(TmpHdl), + ok = file:delete(form_filename(Tmp)) end, SourceWorkList = - sort_msg_locations_by_offset(true, - dets_ets_match_object(State, - {'_', '_', Source, - '_', '_'})), + sort_msg_locations_by_offset(true, + dets_ets_match_object(State, + {'_', '_', Source, + '_', '_'})), {ExpectedSize, BlockStart2, BlockEnd2} = - lists:foldl( - fun ({MsgId, RefCount, _Source, Offset, TotalSize}, - {CurOffset, BlockStart, BlockEnd}) -> - %% CurOffset is in the DestinationFile. - %% Offset, BlockStart and BlockEnd are in the SourceFile - Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, - %% update MsgLocationDets to reflect change of file and offset - ok = dets_ets_insert(State, {MsgId, RefCount, Destination, - CurOffset, TotalSize}), - NextOffset = CurOffset + Size, - if BlockStart =:= undefined -> - %% base case, called only for the first list - %% elem - {NextOffset, Offset, Offset + Size}; - Offset =:= BlockEnd -> - %% extend the current block because the next - %% msg follows straight on - {NextOffset, BlockStart, BlockEnd + Size}; - true -> - %% found a gap, so actually do the work for - %% the previous block - BSize = BlockEnd - BlockStart, - {ok, BlockStart} = - file:position(SourceHdl, {bof, BlockStart}), - {ok, BSize} = - file:copy(SourceHdl, DestinationHdl, BSize), - {NextOffset, Offset, Offset + Size} - end - end, {DestinationValid, undefined, undefined}, SourceWorkList), + lists:foldl( + fun ({MsgId, RefCount, _Source, Offset, TotalSize}, + {CurOffset, BlockStart, BlockEnd}) -> + %% CurOffset is in the DestinationFile. + %% Offset, BlockStart and BlockEnd are in the SourceFile + Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, + %% update MsgLocationDets to reflect change of file and offset + ok = dets_ets_insert(State, {MsgId, RefCount, Destination, + CurOffset, TotalSize}), + NextOffset = CurOffset + Size, + if BlockStart =:= undefined -> + %% base case, called only for the first list + %% elem + {NextOffset, Offset, Offset + Size}; + Offset =:= BlockEnd -> + %% extend the current block because the next + %% msg follows straight on + {NextOffset, BlockStart, BlockEnd + Size}; + true -> + %% found a gap, so actually do the work for + %% the previous block + BSize = BlockEnd - BlockStart, + {ok, BlockStart} = + file:position(SourceHdl, {bof, BlockStart}), + {ok, BSize} = + file:copy(SourceHdl, DestinationHdl, BSize), + {NextOffset, Offset, Offset + Size} + end + end, {DestinationValid, undefined, undefined}, SourceWorkList), %% do the last remaining block BSize2 = BlockEnd2 - BlockStart2, {ok, BlockStart2} = file:position(SourceHdl, {bof, BlockStart2}), @@ -1090,38 +1090,38 @@ combine_files({Source, SourceValid, _SourceContiguousTop, State. close_file(File, State = #dqstate { read_file_handles = - {ReadHdls, ReadHdlsAge} }) -> + {ReadHdls, ReadHdlsAge} }) -> case dict:find(File, ReadHdls) of - error -> - State; - {ok, {Hdl, Then}} -> - ok = file:close(Hdl), - State #dqstate { read_file_handles = - { dict:erase(File, ReadHdls), - gb_trees:delete(Then, ReadHdlsAge) } } + error -> + State; + {ok, {Hdl, Then}} -> + ok = file:close(Hdl), + State #dqstate { read_file_handles = + { dict:erase(File, ReadHdls), + gb_trees:delete(Then, ReadHdlsAge) } } end. delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> [{File, ValidData, _ContiguousTop, Left, Right}] = - ets:lookup(FileSummary, File), + ets:lookup(FileSummary, File), case ValidData of - %% we should NEVER find the current file in here hence right + %% we should NEVER find the current file in here hence right %% should always be a file, not undefined - 0 -> case {Left, Right} of - {undefined, _} when not(is_atom(Right)) -> - %% the eldest file is empty. YAY! - %% left is the 4th field - true = ets:update_element(FileSummary, Right, {4, undefined}); - {_, _} when not(is_atom(Right)) -> - %% left is the 4th field - true = ets:update_element(FileSummary, Right, {4, Left}), - %% right is the 5th field - true = ets:update_element(FileSummary, Left, {5, Right}) - end, - true = ets:delete(FileSummary, File), - ok = file:delete(form_filename(File)), - Acc; - _ -> [File|Acc] + 0 -> case {Left, Right} of + {undefined, _} when not(is_atom(Right)) -> + %% the eldest file is empty. YAY! + %% left is the 4th field + true = ets:update_element(FileSummary, Right, {4, undefined}); + {_, _} when not(is_atom(Right)) -> + %% left is the 4th field + true = ets:update_element(FileSummary, Right, {4, Left}), + %% right is the 5th field + true = ets:update_element(FileSummary, Left, {5, Right}) + end, + true = ets:delete(FileSummary, File), + ok = file:delete(form_filename(File)), + Acc; + _ -> [File|Acc] end. %% ---- DISK RECOVERY ---- @@ -1130,10 +1130,10 @@ load_from_disk(State) -> %% sorted so that smallest number is first. which also means %% eldest file (left-most) first ok = case mnesia:add_table_index(rabbit_disk_queue, msg_id) of - {atomic, ok} -> ok; - {aborted,{already_exists,rabbit_disk_queue,_}} -> ok; - E -> E - end, + {atomic, ok} -> ok; + {aborted,{already_exists,rabbit_disk_queue,_}} -> ok; + E -> E + end, {Files, TmpFiles} = get_disk_queue_files(), ok = recover_crashed_compactions(Files, TmpFiles), %% There should be no more tmp files now, so go ahead and load the @@ -1142,44 +1142,44 @@ load_from_disk(State) -> %% Finally, check there is nothing in mnesia which we haven't %% loaded {atomic, true} = mnesia:transaction( - fun() -> - ok = mnesia:read_lock_table(rabbit_disk_queue), - mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) -> - true = 1 =:= - length(dets_ets_lookup(State1, MsgId)) - end, - true, rabbit_disk_queue) - end), + fun() -> + ok = mnesia:read_lock_table(rabbit_disk_queue), + mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) -> + true = 1 =:= + length(dets_ets_lookup(State1, MsgId)) + end, + true, rabbit_disk_queue) + end), State2 = extract_sequence_numbers(State1), ok = case mnesia:del_table_index(rabbit_disk_queue, msg_id) of - {atomic, ok} -> ok; - %% hmm, something weird must be going on, but it's - %% probably not the end of the world - {aborted,{no_exists,rabbit_disk_queue,_}} -> ok; - E2 -> E2 - end, + {atomic, ok} -> ok; + %% hmm, something weird must be going on, but it's + %% probably not the end of the world + {aborted,{no_exists,rabbit_disk_queue,_}} -> ok; + E2 -> E2 + end, {ok, State2}. extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> {atomic, true} = mnesia:transaction( fun() -> - ok = mnesia:read_lock_table(rabbit_disk_queue), - mnesia:foldl( - fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> - NextWrite = SeqId + 1, - case ets:lookup(Sequences, Q) of - [] -> - true = ets:insert_new(Sequences, - {Q, SeqId, NextWrite}); - [Orig = {Q, Read, Write}] -> - Repl = {Q, lists:min([Read, SeqId]), - lists:max([Write, NextWrite])}, - if Orig /= Repl -> - true = ets:insert(Sequences, Repl); - true -> true - end - end - end, true, rabbit_disk_queue) + ok = mnesia:read_lock_table(rabbit_disk_queue), + mnesia:foldl( + fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> + NextWrite = SeqId + 1, + case ets:lookup(Sequences, Q) of + [] -> + true = ets:insert_new(Sequences, + {Q, SeqId, NextWrite}); + [Orig = {Q, Read, Write}] -> + Repl = {Q, lists:min([Read, SeqId]), + lists:max([Write, NextWrite])}, + if Orig /= Repl -> + true = ets:insert(Sequences, Repl); + true -> true + end + end + end, true, rabbit_disk_queue) end), remove_gaps_in_sequences(State), State. @@ -1195,79 +1195,79 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> %% likelihood of gaps being at the bottom rather than the top of %% the queue, so shuffling up should be the better bet. {atomic, _} = - mnesia:transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foreach( - fun ({Q, ReadSeqId, WriteSeqId}) -> - Gap = shuffle_up(Q, ReadSeqId - 1, WriteSeqId - 1, 0), - true = ets:insert(Sequences, {Q, ReadSeqId + Gap, WriteSeqId}) - end, ets:match_object(Sequences, '_')) - end). + mnesia:transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foreach( + fun ({Q, ReadSeqId, WriteSeqId}) -> + Gap = shuffle_up(Q, ReadSeqId - 1, WriteSeqId - 1, 0), + true = ets:insert(Sequences, {Q, ReadSeqId + Gap, WriteSeqId}) + end, ets:match_object(Sequences, '_')) + end). shuffle_up(_Q, SeqId, SeqId, Gap) -> Gap; shuffle_up(Q, BaseSeqId, SeqId, Gap) -> GapInc = - case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of - [] -> 1; - [Obj] -> - if Gap =:= 0 -> ok; - true -> mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap }, - next_seq_id = SeqId + Gap + 1 - }, - write), - mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write) - end, - 0 - end, + case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of + [] -> 1; + [Obj] -> + if Gap =:= 0 -> ok; + true -> mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap }, + next_seq_id = SeqId + Gap + 1 + }, + write), + mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write) + end, + 0 + end, shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc). load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, - current_file_name = CurName }) -> + current_file_name = CurName }) -> true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}), State; load_messages(Left, [], State) -> Num = list_to_integer(filename:rootname(Left)), Offset = case dets_ets_match_object(State, {'_', '_', Left, '_', '_'}) of - [] -> 0; - L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] = - sort_msg_locations_by_offset(false, L), - MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT - end, + [] -> 0; + L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] = + sort_msg_locations_by_offset(false, L), + MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT + end, State #dqstate { current_file_num = Num, current_file_name = Left, - current_offset = Offset }; + current_offset = Offset }; load_messages(Left, [File|Files], - State = #dqstate { file_summary = FileSummary }) -> + State = #dqstate { file_summary = FileSummary }) -> %% [{MsgId, TotalSize, FileOffset}] {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), {ValidMessagesRev, ValidTotalSize} = lists:foldl( - fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case length(mnesia:dirty_index_match_object - (rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = '_', - is_delivered = '_', - next_seq_id = '_' - }, - msg_id)) of - 0 -> {VMAcc, VTSAcc}; - RefCount -> - true = dets_ets_insert_new(State, {MsgId, RefCount, File, - Offset, TotalSize}), - {[{MsgId, TotalSize, Offset}|VMAcc], - VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT - } - end - end, {[], 0}, Messages), + fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> + case length(mnesia:dirty_index_match_object + (rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', + is_delivered = '_', + next_seq_id = '_' + }, + msg_id)) of + 0 -> {VMAcc, VTSAcc}; + RefCount -> + true = dets_ets_insert_new(State, {MsgId, RefCount, File, + Offset, TotalSize}), + {[{MsgId, TotalSize, Offset}|VMAcc], + VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT + } + end + end, {[], 0}, Messages), %% foldl reverses lists and find_contiguous_block_prefix needs %% elems in the same order as from scan_file_for_valid_messages {ContiguousTop, _} = find_contiguous_block_prefix(lists:reverse(ValidMessagesRev)), Right = case Files of - [] -> undefined; - [F|_] -> F - end, + [] -> undefined; + [F|_] -> F + end, true = ets:insert_new(FileSummary, {File, ValidTotalSize, ContiguousTop, Left, Right}), load_messages(File, Files, State). @@ -1275,7 +1275,7 @@ load_messages(Left, [File|Files], recover_crashed_compactions(Files, TmpFiles) -> lists:foreach(fun (TmpFile) -> ok = recover_crashed_compactions1(Files, TmpFile) end, - TmpFiles), + TmpFiles), ok. recover_crashed_compactions1(Files, TmpFile) -> @@ -1284,22 +1284,22 @@ recover_crashed_compactions1(Files, TmpFile) -> true = lists:member(NonTmpRelatedFile, Files), %% [{MsgId, TotalSize, FileOffset}] {ok, UncorruptedMessagesTmp} = - scan_file_for_valid_messages(form_filename(TmpFile)), + scan_file_for_valid_messages(form_filename(TmpFile)), MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp), %% all of these messages should appear in the mnesia table, %% otherwise they wouldn't have been copied out lists:foreach(fun (MsgId) -> - true = 0 < length(mnesia:dirty_index_match_object - (rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = '_', - is_delivered = '_', - next_seq_id = '_' - }, - msg_id)) - end, MsgIdsTmp), + true = 0 < length(mnesia:dirty_index_match_object + (rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', + is_delivered = '_', + next_seq_id = '_' + }, + msg_id)) + end, MsgIdsTmp), {ok, UncorruptedMessages} = - scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), + scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), MsgIds = lists:map(GrabMsgId, UncorruptedMessages), %% 1) It's possible that everything in the tmp file is also in the main file %% such that the main file is (prefix ++ tmpfile). This means that compaction @@ -1321,62 +1321,62 @@ recover_crashed_compactions1(Files, TmpFile) -> %% Plan: Truncate the main file back to before any of the files in the tmp file and copy %% them over again case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of - true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file - %% note this also catches the case when the tmp file - %% is empty - ok = file:delete(TmpFile); - _False -> - %% we're in case 4 above. - %% check that everything in the main file is a valid message in mnesia - lists:foreach(fun (MsgId) -> - true = 0 < length(mnesia:dirty_index_match_object - (rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = '_', - is_delivered = '_', - next_seq_id = '_' - }, - msg_id)) - end, MsgIds), - %% The main file should be contiguous - {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages), - %% we should have that none of the messages in the prefix - %% are in the tmp file - true = lists:all(fun (MsgId) -> not(lists:member(MsgId, MsgIdsTmp)) end, - MsgIds), - - {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile), - [write, raw, binary, delayed_write]), - {ok, Top} = file:position(MainHdl, Top), - ok = file:truncate(MainHdl), %% wipe out any rubbish at the end of the file - %% there really could be rubbish at the end of the file - - %% we could have failed after the extending truncate. - %% Remember the head of the list will be the highest entry - %% in the file - [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, - TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT, - ExpectedAbsPos = Top + TmpSize, - {ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}), - %% and now extend the main file as big as necessary in a - %% single move if we run out of disk space, this truncate - %% could fail, but we still aren't risking losing data - ok = file:truncate(MainHdl), - {ok, TmpHdl} = file:open(form_filename(TmpFile), - [read, raw, binary, read_ahead]), - {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), - ok = file:close(MainHdl), - ok = file:close(TmpHdl), - ok = file:delete(TmpFile), - - {ok, MainMessages} = - scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), - MsgIdsMain = lists:map(GrabMsgId, MainMessages), - %% check that everything in MsgIds is in MsgIdsMain - true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, - MsgIds), - %% check that everything in MsgIdsTmp is in MsgIdsMain - true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, - MsgIdsTmp) + true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file + %% note this also catches the case when the tmp file + %% is empty + ok = file:delete(TmpFile); + _False -> + %% we're in case 4 above. + %% check that everything in the main file is a valid message in mnesia + lists:foreach(fun (MsgId) -> + true = 0 < length(mnesia:dirty_index_match_object + (rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', + is_delivered = '_', + next_seq_id = '_' + }, + msg_id)) + end, MsgIds), + %% The main file should be contiguous + {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages), + %% we should have that none of the messages in the prefix + %% are in the tmp file + true = lists:all(fun (MsgId) -> not(lists:member(MsgId, MsgIdsTmp)) end, + MsgIds), + + {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile), + [write, raw, binary, delayed_write]), + {ok, Top} = file:position(MainHdl, Top), + ok = file:truncate(MainHdl), %% wipe out any rubbish at the end of the file + %% there really could be rubbish at the end of the file - + %% we could have failed after the extending truncate. + %% Remember the head of the list will be the highest entry + %% in the file + [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, + TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT, + ExpectedAbsPos = Top + TmpSize, + {ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}), + %% and now extend the main file as big as necessary in a + %% single move if we run out of disk space, this truncate + %% could fail, but we still aren't risking losing data + ok = file:truncate(MainHdl), + {ok, TmpHdl} = file:open(form_filename(TmpFile), + [read, raw, binary, read_ahead]), + {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), + ok = file:close(MainHdl), + ok = file:close(TmpHdl), + ok = file:delete(TmpFile), + + {ok, MainMessages} = + scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), + MsgIdsMain = lists:map(GrabMsgId, MainMessages), + %% check that everything in MsgIds is in MsgIdsMain + true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, + MsgIds), + %% check that everything in MsgIdsTmp is in MsgIdsMain + true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, + MsgIdsTmp) end, ok. @@ -1386,16 +1386,16 @@ recover_crashed_compactions1(Files, TmpFile) -> find_contiguous_block_prefix([]) -> {0, []}; find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail]) -> case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of - {ok, Acc} -> {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, - lists:reverse(Acc)}; - Res -> Res + {ok, Acc} -> {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + lists:reverse(Acc)}; + Res -> Res end. find_contiguous_block_prefix([], 0, Acc) -> {ok, Acc}; find_contiguous_block_prefix([], _N, _Acc) -> {0, []}; find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail], - ExpectedOffset, Acc) + ExpectedOffset, Acc) when ExpectedOffset =:= Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT -> find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]); find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) -> @@ -1421,29 +1421,29 @@ append_message(FileHdl, MsgId, MsgBody) when is_binary(MsgBody) -> MsgIdBinSize = size(MsgIdBin), TotalSize = BodySize + MsgIdBinSize, case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS, - MsgIdBinSize:?INTEGER_SIZE_BITS, - MsgIdBin:MsgIdBinSize/binary, - MsgBody:BodySize/binary, - ?WRITE_OK:?WRITE_OK_SIZE_BITS>>) of - ok -> {ok, TotalSize}; - KO -> KO + MsgIdBinSize:?INTEGER_SIZE_BITS, + MsgIdBin:MsgIdBinSize/binary, + MsgBody:BodySize/binary, + ?WRITE_OK:?WRITE_OK_SIZE_BITS>>) of + ok -> {ok, TotalSize}; + KO -> KO end. read_message_at_offset(FileHdl, Offset, TotalSize) -> TotalSizeWriteOkBytes = TotalSize + 1, case file:position(FileHdl, {bof, Offset}) of - {ok, Offset} -> - case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of - {ok, <<TotalSize:?INTEGER_SIZE_BITS, - MsgIdBinSize:?INTEGER_SIZE_BITS, - Rest:TotalSizeWriteOkBytes/binary>>} -> - BodySize = TotalSize - MsgIdBinSize, - <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, - ?WRITE_OK:?WRITE_OK_SIZE_BITS>> = Rest, - {ok, {MsgBody, BodySize}}; - KO -> KO - end; - KO -> KO + {ok, Offset} -> + case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of + {ok, <<TotalSize:?INTEGER_SIZE_BITS, + MsgIdBinSize:?INTEGER_SIZE_BITS, + Rest:TotalSizeWriteOkBytes/binary>>} -> + BodySize = TotalSize - MsgIdBinSize, + <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, + ?WRITE_OK:?WRITE_OK_SIZE_BITS>> = Rest, + {ok, {MsgBody, BodySize}}; + KO -> KO + end; + KO -> KO end. scan_file_for_valid_messages(File) -> @@ -1454,53 +1454,53 @@ scan_file_for_valid_messages(File) -> scan_file_for_valid_messages(FileHdl, Offset, Acc) -> case read_next_file_entry(FileHdl, Offset) of - {ok, eof} -> {ok, Acc}; - {ok, {corrupted, NextOffset}} -> - scan_file_for_valid_messages(FileHdl, NextOffset, Acc); - {ok, {ok, MsgId, TotalSize, NextOffset}} -> - scan_file_for_valid_messages(FileHdl, NextOffset, - [{MsgId, TotalSize, Offset}|Acc]); - _KO -> {ok, Acc} %% bad message, but we may still have recovered some valid messages + {ok, eof} -> {ok, Acc}; + {ok, {corrupted, NextOffset}} -> + scan_file_for_valid_messages(FileHdl, NextOffset, Acc); + {ok, {ok, MsgId, TotalSize, NextOffset}} -> + scan_file_for_valid_messages(FileHdl, NextOffset, + [{MsgId, TotalSize, Offset}|Acc]); + _KO -> {ok, Acc} %% bad message, but we may still have recovered some valid messages end. - + read_next_file_entry(FileHdl, Offset) -> TwoIntegers = 2 * ?INTEGER_SIZE_BYTES, case file:read(FileHdl, TwoIntegers) of - {ok, <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> - case {TotalSize =:= 0, MsgIdBinSize =:= 0} of - {true, _} -> {ok, eof}; %% Nothing we can do other than stop - {false, true} -> %% current message corrupted, try skipping past it - ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize, - case file:position(FileHdl, {cur, TotalSize + 1}) of - {ok, ExpectedAbsPos} -> {ok, {corrupted, ExpectedAbsPos}}; - {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up - KO -> KO - end; - {false, false} -> %% all good, let's continue - case file:read(FileHdl, MsgIdBinSize) of - {ok, <<MsgId:MsgIdBinSize/binary>>} -> - ExpectedAbsPos = Offset + TwoIntegers + TotalSize, - case file:position(FileHdl, - {cur, TotalSize - MsgIdBinSize}) of - {ok, ExpectedAbsPos} -> - NextOffset = Offset + TotalSize + - ?FILE_PACKING_ADJUSTMENT, - case file:read(FileHdl, 1) of - {ok, <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} -> - {ok, {ok, binary_to_term(MsgId), - TotalSize, NextOffset}}; - {ok, _SomeOtherData} -> - {ok, {corrupted, NextOffset}}; - KO -> KO - end; - {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up - KO -> KO - end; - eof -> {ok, eof}; - KO -> KO - end - end; - eof -> {ok, eof}; - KO -> KO + {ok, <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> + case {TotalSize =:= 0, MsgIdBinSize =:= 0} of + {true, _} -> {ok, eof}; %% Nothing we can do other than stop + {false, true} -> %% current message corrupted, try skipping past it + ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize, + case file:position(FileHdl, {cur, TotalSize + 1}) of + {ok, ExpectedAbsPos} -> {ok, {corrupted, ExpectedAbsPos}}; + {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up + KO -> KO + end; + {false, false} -> %% all good, let's continue + case file:read(FileHdl, MsgIdBinSize) of + {ok, <<MsgId:MsgIdBinSize/binary>>} -> + ExpectedAbsPos = Offset + TwoIntegers + TotalSize, + case file:position(FileHdl, + {cur, TotalSize - MsgIdBinSize}) of + {ok, ExpectedAbsPos} -> + NextOffset = Offset + TotalSize + + ?FILE_PACKING_ADJUSTMENT, + case file:read(FileHdl, 1) of + {ok, <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} -> + {ok, {ok, binary_to_term(MsgId), + TotalSize, NextOffset}}; + {ok, _SomeOtherData} -> + {ok, {corrupted, NextOffset}}; + KO -> KO + end; + {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up + KO -> KO + end; + eof -> {ok, eof}; + KO -> KO + end + end; + eof -> {ok, eof}; + KO -> KO end. diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index c7c76eb230..4749e1dac4 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -31,57 +31,66 @@ -module(rabbit_mixed_queue). +-export([start_link/2]). + -export([publish/4, deliver/1, ack/2, - tx_publish/4, tx_commit/3, tx_cancel/2, - requeue/2, purge/1]). + tx_publish/4, tx_commit/3, tx_cancel/2, + requeue/2, purge/1]). -record(mqstate, { mode, - msg_buf, - next_write_seq, - queue - } + msg_buf, + next_write_seq, + queue + } ). +-define(FILE_SIZE_LIMIT, (100*1024*1024)). + +start_link(Queue, Mode) when Mode =:= disk orelse Mode =:= mixed -> + rabbit_disk_queue:start_link(?FILE_SIZE_LIMIT), + rabbit_disk_queue:to_ram_disk_mode(), %% TODO, CHANGE ME + {ok, #mqstate { mode = Mode, msg_buf = queue:new(), next_write_seq = 0, queue = Queue }}. + publish(MsgId, Msg, _IsPersistent, State = #mqstate { mode = disk, queue = Q }) -> ok = rabbit_disk_queue:publish(Q, MsgId, Msg), {ok, State}; publish(MsgId, Msg, IsPersistent, - State = #mqstate { queue = Q, mode = mixed, - next_write_seq = NextSeq, msg_buf = MsgBuf }) -> + State = #mqstate { queue = Q, mode = mixed, + next_write_seq = NextSeq, msg_buf = MsgBuf }) -> if IsPersistent -> - ok = rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, Msg); + ok = rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, Msg); true -> ok end, {ok, State #mqstate { next_write_seq = NextSeq + 1, - msg_buf = queue:in({NextSeq, {MsgId, Msg, IsPersistent}}, - MsgBuf) - }}. + msg_buf = queue:in({NextSeq, {MsgId, Msg, IsPersistent}}, + MsgBuf) + }}. deliver(State = #mqstate { mode = disk, queue = Q }) -> {rabbit_disk_queue:deliver(Q), State}; deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf }) -> {Result, MsgBuf2} = queue:out(MsgBuf), case Result of - empty -> - {empty, State}; - {value, {_Seq, {MsgId, Msg, IsPersistent}}} -> - {IsDelivered, Ack} = - if IsPersistent -> - {MsgId, IsDelivered2, Ack2} = rabbit_disk_queue:phantom_deliver(Q), - {IsDelivered2, Ack2}; - true -> {false, noack} - end, - {{MsgId, Msg, size(Msg), IsDelivered, Ack}, - State #mqstate { msg_buf = MsgBuf2 }} + empty -> + {empty, State}; + {value, {_Seq, {MsgId, Msg, IsPersistent}}} -> + {IsDelivered, Ack} = + if IsPersistent -> + {MsgId, IsDelivered2, Ack2} = rabbit_disk_queue:phantom_deliver(Q), + {IsDelivered2, Ack2}; + true -> {false, noack} + end, + {{MsgId, Msg, size(Msg), IsDelivered, Ack}, + State #mqstate { msg_buf = MsgBuf2 }} end. remove_noacks(Acks) -> lists:filter(fun (A) -> A /= noack end, Acks). -ack(Acks, State = #mqstate { queue = Q }) -> +ack(Acks, State = #mqstate { queue = Q }) -> ok = rabbit_disk_queue:ack(Q, remove_noacks(Acks)), {ok, State}. - + tx_publish(MsgId, Msg, _IsPersistent, State = #mqstate { mode = disk }) -> ok = rabbit_disk_queue:tx_publish(MsgId, Msg), {ok, State}; @@ -98,32 +107,32 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q }) -> ok = rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), Acks), {ok, State}; tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, - msg_buf = MsgBuf, - next_write_seq = NextSeq - }) -> + msg_buf = MsgBuf, + next_write_seq = NextSeq + }) -> {PersistentPubs, MsgBuf2, NextSeq2} = - lists:foldl(fun ({MsgId, Msg, IsPersistent}, {Acc, MsgBuf3, NextSeq3}) -> - Acc2 = - if IsPersistent -> - [{MsgId, NextSeq3} | Acc]; - true -> Acc - end, - MsgBuf4 = queue:in({NextSeq3, {MsgId, Msg, IsPersistent}}, - MsgBuf3), - {Acc2, MsgBuf4, NextSeq3 + 1} - end, {[], MsgBuf, NextSeq}, Publishes), + lists:foldl(fun ({MsgId, Msg, IsPersistent}, {Acc, MsgBuf3, NextSeq3}) -> + Acc2 = + if IsPersistent -> + [{MsgId, NextSeq3} | Acc]; + true -> Acc + end, + MsgBuf4 = queue:in({NextSeq3, {MsgId, Msg, IsPersistent}}, + MsgBuf3), + {Acc2, MsgBuf4, NextSeq3 + 1} + end, {[], MsgBuf, NextSeq}, Publishes), %% foldl reverses, so re-reverse PersistentPubs to match %% requirements of rabbit_disk_queue (ascending SeqIds) ok = rabbit_disk_queue:tx_commit_with_seqs(Q, lists:reverse(PersistentPubs), - remove_noacks(Acks)), + remove_noacks(Acks)), {ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}. only_persistent_msg_ids(Pubs) -> lists:reverse(lists:foldl(fun ({MsgId, _, IsPersistent}, Acc) -> - if IsPersistent -> [MsgId | Acc]; - true -> Acc - end - end, [], Pubs)). + if IsPersistent -> [MsgId | Acc]; + true -> Acc + end + end, [], Pubs)). tx_cancel(Publishes, State = #mqstate { mode = disk }) -> ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)), @@ -139,21 +148,21 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q }) -> rabbit_disk_queue:requeue(Q, only_ack_tags(MessagesWithAckTags)), {ok, State}; requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, - msg_buf = MsgBuf, - next_write_seq = NextSeq - }) -> + msg_buf = MsgBuf, + next_write_seq = NextSeq + }) -> {PersistentPubs, MsgBuf2, NextSeq2} = - lists:foldl(fun ({{MsgId, Msg, IsPersistent}, AckTag}, {Acc, MsgBuf3, NextSeq3}) -> - Acc2 = - if IsPersistent -> - {MsgId, _OldSeqId} = AckTag, - [{AckTag, NextSeq3} | Acc]; - true -> Acc - end, - MsgBuf4 = queue:in({NextSeq3, {MsgId, Msg, IsPersistent}}, - MsgBuf3), - {Acc2, MsgBuf4, NextSeq3 + 1} - end, {[], MsgBuf, NextSeq}, MessagesWithAckTags), + lists:foldl(fun ({{MsgId, Msg, IsPersistent}, AckTag}, {Acc, MsgBuf3, NextSeq3}) -> + Acc2 = + if IsPersistent -> + {MsgId, _OldSeqId} = AckTag, + [{AckTag, NextSeq3} | Acc]; + true -> Acc + end, + MsgBuf4 = queue:in({NextSeq3, {MsgId, Msg, IsPersistent}}, + MsgBuf3), + {Acc2, MsgBuf4, NextSeq3 + 1} + end, {[], MsgBuf, NextSeq}, MessagesWithAckTags), ok = rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(PersistentPubs)), {ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 14461abb13..552e4ed959 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -686,10 +686,10 @@ test_disk_queue() -> % unicode chars are supported properly from r13 onwards io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []), [begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize), - timer:sleep(1000) end || % 1000 milliseconds - MsgSize <- [512, 8192, 32768, 131072], - Qs <- [[1], lists:seq(1,10)], %, lists:seq(1,100), lists:seq(1,1000)], - MsgCount <- [1024, 4096, 16384] + timer:sleep(1000) end || % 1000 milliseconds + MsgSize <- [512, 8192, 32768, 131072], + Qs <- [[1], lists:seq(1,10)], %, lists:seq(1,100), lists:seq(1,1000)], + MsgCount <- [1024, 4096, 16384] ], rdq_virgin(), passed = rdq_stress_gc(10000), @@ -706,27 +706,27 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> Msg = <<0:(8*MsgSizeBytes)>>, List = lists:seq(1, MsgCount), {Publish, ok} = - timer:tc(?MODULE, rdq_time_commands, - [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg) - || N <- List, _ <- Qs] end, - fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List, []) - || Q <- Qs] end - ]]), + timer:tc(?MODULE, rdq_time_commands, + [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg) + || N <- List, _ <- Qs] end, + fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List, []) + || Q <- Qs] end + ]]), {Deliver, ok} = - timer:tc(?MODULE, rdq_time_commands, - [[fun() -> [begin SeqIds = - [begin {N, Msg, MsgSizeBytes, false, SeqId} = - rabbit_disk_queue:deliver(Q), SeqId end - || N <- List], - ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds) - end || Q <- Qs] - end]]), + timer:tc(?MODULE, rdq_time_commands, + [[fun() -> [begin SeqIds = + [begin {N, Msg, MsgSizeBytes, false, SeqId} = + rabbit_disk_queue:deliver(Q), SeqId end + || N <- List], + ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds) + end || Q <- Qs] + end]]), io:format(" ~15.10B| ~14.10B| ~14.10B| ~14.1f| ~14.1f| ~14.6f| ~14.10f| ~14.1f| ~14.6f| ~14.10f~n", - [MsgCount, MsgSizeBytes, QCount, float(Startup), - float(Publish), (Publish / (MsgCount * QCount)), - (Publish / (MsgCount * QCount * MsgSizeBytes)), - float(Deliver), (Deliver / (MsgCount * QCount)), - (Deliver / (MsgCount * QCount * MsgSizeBytes))]), + [MsgCount, MsgSizeBytes, QCount, float(Startup), + float(Publish), (Publish / (MsgCount * QCount)), + (Publish / (MsgCount * QCount * MsgSizeBytes)), + float(Deliver), (Deliver / (MsgCount * QCount)), + (Deliver / (MsgCount * QCount * MsgSizeBytes))]), rdq_stop(). % we know each file is going to be 1024*1024*10 bytes in size (10MB), so make sure we have @@ -741,30 +741,30 @@ rdq_stress_gc(MsgCount) -> rabbit_disk_queue:tx_commit(q, List, []), StartChunk = round(MsgCount / 20), % 5% AckList = - lists:reverse( - lists:foldl( - fun (E, Acc) -> - case Acc of - [] -> [E]; - [F|_Fs] -> - case E rem F of - 0 -> Acc; - _ -> [E|Acc] - end - end - end, [], lists:flatten([lists:seq(N,MsgCount,N) - || N <- lists:seq(StartChunk,MsgCount)]))) ++ - lists:seq(1, (StartChunk - 1)), + lists:reverse( + lists:foldl( + fun (E, Acc) -> + case Acc of + [] -> [E]; + [F|_Fs] -> + case E rem F of + 0 -> Acc; + _ -> [E|Acc] + end + end + end, [], lists:flatten([lists:seq(N,MsgCount,N) + || N <- lists:seq(StartChunk,MsgCount)]))) ++ + lists:seq(1, (StartChunk - 1)), MsgIdToSeqDict = - lists:foldl( - fun (_, Acc) -> - {MsgId, Msg, MsgSizeBytes, false, SeqId} = - rabbit_disk_queue:deliver(q), - dict:store(MsgId, SeqId, Acc) - end, dict:new(), List), + lists:foldl( + fun (_, Acc) -> + {MsgId, Msg, MsgSizeBytes, false, SeqId} = + rabbit_disk_queue:deliver(q), + dict:store(MsgId, SeqId, Acc) + end, dict:new(), List), %% we really do want to ack each of this individually [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict), - rabbit_disk_queue:ack(q, [SeqId]) end + rabbit_disk_queue:ack(q, [SeqId]) end || MsgId <- AckList], rabbit_disk_queue:tx_commit(q, [], []), rdq_stop(), @@ -800,15 +800,15 @@ rdq_test_startup_with_queue_gaps() -> io:format("Publish done~n", []), %% deliver first half Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end - || N <- lists:seq(1,Half)], + || N <- lists:seq(1,Half)], io:format("Deliver first half done~n", []), %% ack every other message we have delivered (starting at the _first_) lists:foldl(fun (SeqId2, true) -> - rabbit_disk_queue:ack(q, [SeqId2]), - false; - (_SeqId2, false) -> - true - end, true, Seqs), + rabbit_disk_queue:ack(q, [SeqId2]), + false; + (_SeqId2, false) -> + true + end, true, Seqs), rabbit_disk_queue:tx_commit(q, [], []), io:format("Acked every other message delivered done~n", []), rdq_stop(), @@ -816,12 +816,12 @@ rdq_test_startup_with_queue_gaps() -> io:format("Startup (with shuffle) done~n", []), %% should have shuffled up. So we should now get lists:seq(2,500,2) already delivered Seqs2 = [begin {N, Msg, 256, true, SeqId} = rabbit_disk_queue:deliver(q), SeqId end - || N <- lists:seq(2,Half,2)], + || N <- lists:seq(2,Half,2)], rabbit_disk_queue:tx_commit(q, [], Seqs2), io:format("Reread non-acked messages done~n", []), %% and now fetch the rest Seqs3 = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end - || N <- lists:seq(1 + Half,Total)], + || N <- lists:seq(1 + Half,Total)], rabbit_disk_queue:tx_commit(q, [], Seqs3), io:format("Read second half done~n", []), empty = rabbit_disk_queue:deliver(q), @@ -840,25 +840,25 @@ rdq_test_redeliver() -> io:format("Publish done~n", []), %% deliver first half Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end - || N <- lists:seq(1,Half)], + || N <- lists:seq(1,Half)], io:format("Deliver first half done~n", []), %% now requeue every other message (starting at the _first_) %% and ack the other ones lists:foldl(fun (SeqId2, true) -> - rabbit_disk_queue:requeue(q, [SeqId2]), - false; - (SeqId2, false) -> - rabbit_disk_queue:ack(q, [SeqId2]), - true - end, true, Seqs), + rabbit_disk_queue:requeue(q, [SeqId2]), + false; + (SeqId2, false) -> + rabbit_disk_queue:ack(q, [SeqId2]), + true + end, true, Seqs), rabbit_disk_queue:tx_commit(q, [], []), io:format("Redeliver and acking done~n", []), %% we should now get the 2nd half in order, followed by every-other-from-the-first-half Seqs2 = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end - || N <- lists:seq(1+Half, Total)], + || N <- lists:seq(1+Half, Total)], rabbit_disk_queue:tx_commit(q, [], Seqs2), Seqs3 = [begin {N, Msg, 256, true, SeqId} = rabbit_disk_queue:deliver(q), SeqId end - || N <- lists:seq(1, Half, 2)], + || N <- lists:seq(1, Half, 2)], rabbit_disk_queue:tx_commit(q, [], Seqs3), empty = rabbit_disk_queue:deliver(q), rdq_stop(), @@ -876,7 +876,7 @@ rdq_test_purge() -> io:format("Publish done~n", []), %% deliver first half Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end - || N <- lists:seq(1,Half)], + || N <- lists:seq(1,Half)], io:format("Deliver first half done~n", []), rabbit_disk_queue:purge(q), io:format("Purge done~n", []), @@ -891,7 +891,7 @@ rdq_time_commands(Funcs) -> rdq_virgin() -> {Micros, {ok, _}} = - timer:tc(rabbit_disk_queue, start_link, [1024*1024]), + timer:tc(rabbit_disk_queue, start_link, [1024*1024]), ok = rabbit_disk_queue:stop_and_obliterate(), timer:sleep(1000), Micros. |
