diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 753 |
1 files changed, 398 insertions, 355 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index b7eca499d6..a8773af61c 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -344,9 +344,11 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> process_flag(trap_exit, true), Node = node(), ok = - case mnesia:change_table_copy_type(rabbit_disk_queue, Node, disc_only_copies) of + case mnesia:change_table_copy_type(rabbit_disk_queue, Node, + disc_only_copies) of {atomic, ok} -> ok; - {aborted, {already_exists, rabbit_disk_queue, Node, disc_only_copies}} -> ok; + {aborted, {already_exists, rabbit_disk_queue, Node, + disc_only_copies}} -> ok; E -> E end, ok = filelib:ensure_dir(form_filename("nothing")), @@ -393,12 +395,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> end, %% read is only needed so that we can seek {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), - ok = if Exists -> ok; - true -> %% new file, so preallocate - {ok, FileSizeLimit} = file:position(FileHdl, {bof, FileSizeLimit}), - file:truncate(FileHdl) - end, - {ok, Offset} = file:position(FileHdl, {bof, Offset}), + if Exists -> {ok, Offset} = file:position(FileHdl, {bof, Offset}); + true -> %% new file, so preallocate + ok = preallocate(FileHdl, FileSizeLimit, Offset) + end, {ok, State1 #dqstate { current_file_handle = FileHdl }}. handle_call({deliver, Q}, _From, State) -> @@ -408,7 +408,7 @@ handle_call({phantom_deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, false, State), {reply, Result, State1}; handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) -> - PubMsgSeqIds = lists:zip(PubMsgIds, lists:duplicate(erlang:length(PubMsgIds), next)), + PubMsgSeqIds = zip_with_tail(PubMsgIds, {duplicate, next}), {ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, State), {reply, ok, State1}; handle_call({tx_commit_with_seqs, Q, PubSeqMsgIds, AckSeqIds}, _From, State) -> @@ -431,29 +431,33 @@ handle_call(stop_vaporise, _From, State) -> State1 #dqstate { current_file_handle = undefined, read_file_handles = {dict:new(), gb_trees:empty()}}}; %% gen_server now calls terminate, which then calls shutdown -handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = disk_only }) -> +handle_call(to_disk_only_mode, _From, + State = #dqstate { operation_mode = disk_only }) -> {reply, ok, State}; -handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = ram_disk, - msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts }) -> - {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_only_copies), +handle_call(to_disk_only_mode, _From, + State = #dqstate { operation_mode = ram_disk, + msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts }) -> + {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), + disc_only_copies), ok = dets:from_ets(MsgLocationDets, MsgLocationEts), true = ets:delete_all_objects(MsgLocationEts), {reply, ok, State #dqstate { operation_mode = disk_only }}; -handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = ram_disk }) -> +handle_call(to_ram_disk_mode, _From, + State = #dqstate { operation_mode = ram_disk }) -> {reply, ok, State}; -handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = disk_only, - msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts }) -> - {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_copies), +handle_call(to_ram_disk_mode, _From, + State = #dqstate { operation_mode = disk_only, + msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts }) -> + {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), + disc_copies), true = ets:from_dets(MsgLocationEts, MsgLocationDets), ok = dets:delete_all_objects(MsgLocationDets), {reply, ok, State #dqstate { operation_mode = ram_disk }}; handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> - case ets:lookup(Sequences, Q) of - [] -> {reply, 0, State}; - [{Q, _ReadSeqId, _WriteSeqId, Length}] -> {reply, Length, State} - end; + {_ReadSeqId, _WriteSeqId, Length} = sequence_lookup(Sequences, Q), + {reply, Length, State}; handle_call({dump_queue, Q}, _From, State) -> {Result, State1} = internal_dump_queue(Q, State), {reply, Result, State1}; @@ -477,7 +481,7 @@ handle_cast({tx_cancel, MsgIds}, State) -> {ok, State1} = internal_tx_cancel(MsgIds, State), {noreply, State1}; handle_cast({requeue, Q, MsgSeqIds}, State) -> - MsgSeqSeqIds = lists:zip(MsgSeqIds, lists:duplicate(erlang:length(MsgSeqIds), next)), + MsgSeqSeqIds = zip_with_tail(MsgSeqIds, {duplicate, next}), {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State), {noreply, State1}; handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) -> @@ -524,40 +528,57 @@ form_filename(Name) -> base_directory() -> filename:join(mnesia:system_info(directory), "rabbit_disk_queue/"). -dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, +zip_with_tail(List1, List2) when length(List1) =:= length(List2) -> + lists:zip(List1, List2); +zip_with_tail(List = [_|Tail], {last, E}) -> + zip_with_tail(List, Tail ++ [E]); +zip_with_tail(List, {duplicate, E}) -> + zip_with_tail(List, lists:duplicate(erlang:length(List), E)). + +dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Key) -> dets:lookup(MsgLocationDets, Key); -dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, +dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Key) -> ets:lookup(MsgLocationEts, Key). -dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, +dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Key) -> ok = dets:delete(MsgLocationDets, Key); -dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, +dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Key) -> true = ets:delete(MsgLocationEts, Key), ok. -dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, +dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Obj) -> ok = dets:insert(MsgLocationDets, Obj); -dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, +dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Obj) -> true = ets:insert(MsgLocationEts, Obj), ok. -dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, +dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Obj) -> true = dets:insert_new(MsgLocationDets, Obj); -dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, +dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Obj) -> true = ets:insert_new(MsgLocationEts, Obj). -dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, +dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Obj) -> dets:match_object(MsgLocationDets, Obj); -dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, +dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Obj) -> ets:match_object(MsgLocationEts, Obj). @@ -612,18 +633,28 @@ adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId, _Mode) -> SuppliedSeqId; adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId, _Mode) -> ExpectedSeqId; -adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, dirty) when SuppliedSeqId > ExpectedSeqId -> +adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, dirty) + when SuppliedSeqId > ExpectedSeqId -> [Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}), ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }), SuppliedSeqId; -adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, Lock) when SuppliedSeqId > ExpectedSeqId -> +adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, Lock) + when SuppliedSeqId > ExpectedSeqId -> [Obj] = mnesia:read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}, Lock), ok = mnesia:write(rabbit_disk_queue, Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }, Lock), SuppliedSeqId. +sequence_lookup(Sequences, Q) -> + case ets:lookup(Sequences, Q) of + [] -> + {0, 0, 0}; + [{Q, ReadSeqId, WriteSeqId, Length}] -> + {ReadSeqId, WriteSeqId, Length} + end. + %% ---- INTERNAL RAW FUNCTIONS ---- internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> @@ -632,14 +663,18 @@ internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> [{Q, SeqId, SeqId, 0}] -> {ok, empty, State}; [{Q, ReadSeqId, WriteSeqId, Length}] when Length > 0 -> Remaining = Length - 1, - {ok, Result, NextReadSeqId, State1} = internal_read_message(Q, ReadSeqId, false, ReadMsg, State), - true = ets:insert(Sequences, {Q, NextReadSeqId, WriteSeqId, Remaining}), - {ok, case Result of - {MsgId, Delivered, {MsgId, ReadSeqId}} -> - {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining}; - {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}} -> - {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}, Remaining} - end, State1} + {ok, Result, NextReadSeqId, State1} = + internal_read_message(Q, ReadSeqId, false, ReadMsg, State), + true = ets:insert(Sequences, + {Q, NextReadSeqId, WriteSeqId, Remaining}), + {ok, + case Result of + {MsgId, Delivered, {MsgId, ReadSeqId}} -> + {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining}; + {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}} -> + {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}, + Remaining} + end, State1} end. internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) -> @@ -661,7 +696,8 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) -> read_message_at_offset(FileHdl, Offset, TotalSize), {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State1}; - true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State} + true -> + {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State} end. internal_ack(Q, MsgSeqIds, State) -> @@ -685,25 +721,30 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, Files3 = if 1 =:= RefCount -> ok = dets_ets_delete(State, MsgId), - [{File, ValidTotalSize, ContiguousTop, Left, Right}] = - ets:lookup(FileSummary, File), - ContiguousTop1 = lists:min([ContiguousTop, Offset]), - true = ets:insert(FileSummary, - {File, (ValidTotalSize - TotalSize - - ?FILE_PACKING_ADJUSTMENT), + [{File, ValidTotalSize, ContiguousTop, + Left, Right}] = ets:lookup(FileSummary, File), + ContiguousTop1 = + lists:min([ContiguousTop, Offset]), + true = + ets:insert(FileSummary, + {File, (ValidTotalSize-TotalSize- + ?FILE_PACKING_ADJUSTMENT), ContiguousTop1, Left, Right}), if CurName =:= File -> Files2; true -> sets:add_element(File, Files2) end; 1 < RefCount -> - ok = dets_ets_insert(State, {MsgId, RefCount - 1, - File, Offset, TotalSize}), + ok = dets_ets_insert( + State, {MsgId, RefCount - 1, + File, Offset, TotalSize}), Files2 end, ok = if MnesiaDelete -> - mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}); + mnesia:dirty_delete(rabbit_disk_queue, + {Q, SeqId}); MnesiaDelete =:= txn -> - mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write); + mnesia:delete(rabbit_disk_queue, + {Q, SeqId}, write); true -> ok end, Files3 @@ -735,8 +776,8 @@ internal_tx_publish(MsgId, MsgBody, true = ets:insert(FileSummary, {CurName, ValidTotalSize1, ContiguousTop1, Left, undefined}), NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT, - maybe_roll_to_new_file(NextOffset, - State #dqstate {current_offset = NextOffset}); + maybe_roll_to_new_file( + NextOffset, State #dqstate {current_offset = NextOffset}); [{MsgId, RefCount, File, Offset, TotalSize}] -> %% We already know about it, just update counter ok = dets_ets_insert(State, {MsgId, RefCount + 1, File, @@ -753,49 +794,50 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, {PubList, PubAcc, ReadSeqId, Length} = case PubMsgSeqIds of [] -> {[], undefined, undefined, undefined}; - [{_, FirstSeqIdTo}|PubMsgSeqIdsTail] -> + [{_, FirstSeqIdTo}|_] -> {InitReadSeqId, InitWriteSeqId, InitLength} = - case ets:lookup(Sequences, Q) of - [] -> {0,0,0}; - [{Q, ReadSeqId2, WriteSeqId2, Length2}] -> - {ReadSeqId2, WriteSeqId2, Length2} - end, - InitReadSeqId2 = determine_next_read_id(InitReadSeqId, InitWriteSeqId, FirstSeqIdTo), - { lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])), + sequence_lookup(Sequences, Q), + InitReadSeqId2 = determine_next_read_id( + InitReadSeqId, InitWriteSeqId, FirstSeqIdTo), + { zip_with_tail(PubMsgSeqIds, {last, {next, next}}), InitWriteSeqId, InitReadSeqId2, InitLength} end, {atomic, {Sync, WriteSeqId, State2}} = mnesia:transaction( - fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - %% must deal with publishes first, if we didn't - %% then we could end up acking a message before - %% it's been published, which is clearly - %% nonsense. I.e. in commit, do not do things in an - %% order which _could_not_ have happened. - {Sync2, WriteSeqId3} = - lists:foldl( - fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}}, - {Acc, ExpectedSeqId}) -> - [{MsgId, _RefCount, File, _Offset, _TotalSize}] = - dets_ets_lookup(State, MsgId), - SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId, write), + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + %% must deal with publishes first, if we didn't + %% then we could end up acking a message before + %% it's been published, which is clearly + %% nonsense. I.e. in commit, do not do things in an + %% order which _could_not_ have happened. + {Sync2, WriteSeqId3} = + lists:foldl( + fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}}, + {Acc, ExpectedSeqId}) -> + [{MsgId, _RefCount, File, _Offset, + _TotalSize}] = dets_ets_lookup(State, MsgId), + SeqId2 = adjust_last_msg_seq_id( + Q, ExpectedSeqId, SeqId, write), NextSeqId2 = find_next_seq_id(SeqId2, NextSeqId), - ok = mnesia:write(rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = - {Q, SeqId2}, - msg_id = MsgId, - is_delivered = false, - next_seq_id = NextSeqId2 - }, - write), - {Acc or (CurName =:= File), NextSeqId2} + ok = mnesia:write( + rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = + {Q, SeqId2}, + msg_id = MsgId, + is_delivered = false, + next_seq_id = NextSeqId2 + }, + write), + {Acc orelse (CurName =:= File), NextSeqId2} end, {false, PubAcc}, PubList), {ok, State3} = remove_messages(Q, AckSeqIds, txn, State), {Sync2, WriteSeqId3, State3} end), true = if PubList =:= [] -> true; - true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId, Length + erlang:length(PubList)}) + true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId, + Length + erlang:length(PubList)}) end, ok = if Sync -> file:sync(CurHdl); true -> ok @@ -807,12 +849,7 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) -> {ok, State1 = #dqstate { sequences = Sequences }} = internal_tx_publish(MsgId, MsgBody, State), {ReadSeqId, WriteSeqId, Length} = - case ets:lookup(Sequences, Q) of - [] -> %% previously unseen queue - {0, 0, 0}; - [{Q, ReadSeqId2, WriteSeqId2, Length2}] -> - {ReadSeqId2, WriteSeqId2, Length2} - end, + sequence_lookup(Sequences, Q), ReadSeqId3 = determine_next_read_id(ReadSeqId, WriteSeqId, SeqId), WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId, dirty), WriteSeqId3Next = WriteSeqId3 + 1, @@ -827,12 +864,12 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) -> internal_tx_cancel(MsgIds, State) -> %% we don't need seq ids because we're not touching mnesia, %% because seqids were never assigned - MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds), undefined)), + MsgSeqIds = zip_with_tail(MsgIds, {duplicate, undefined}), remove_messages(undefined, MsgSeqIds, false, State). internal_requeue(_Q, [], State) -> {ok, State}; -internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|MsgSeqIdsTail], +internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|_], State = #dqstate { sequences = Sequences }) -> %% We know that every seq_id in here is less than the ReadSeqId %% you'll get if you look up this queue in Sequences (i.e. they've @@ -856,34 +893,39 @@ internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|MsgSeqIdsTail], %% MsgLocation and FileSummary stay put (which makes further sense %% as they have no concept of sequence id anyway). - %% the Q _must_ already exist - [{Q, ReadSeqId, WriteSeqId, Length}] = ets:lookup(Sequences, Q), + {ReadSeqId, WriteSeqId, Length} = sequence_lookup(Sequences, Q), ReadSeqId2 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo), - MsgSeqIdsZipped = lists:zip(MsgSeqIds, MsgSeqIdsTail ++ [{next, next}]), - {atomic, WriteSeqId2} = + MsgSeqIdsZipped = zip_with_tail(MsgSeqIds, {last, {next, next}}), + {atomic, {WriteSeqId2, Q}} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foldl( - fun ({{{MsgId, SeqIdOrig}, SeqIdTo}, - {_NextMsgSeqId, NextSeqIdTo}}, - ExpectedSeqIdTo) -> - SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write), - NextSeqIdTo2 = find_next_seq_id(SeqIdTo2, NextSeqIdTo), - [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] = - mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write), - mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqIdTo2}, - next_seq_id = NextSeqIdTo2 - }, - write), - mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write), - NextSeqIdTo2 - end, WriteSeqId, MsgSeqIdsZipped) + lists:foldl(fun requeue_message/2, {WriteSeqId, Q}, + MsgSeqIdsZipped) end), - true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId2, Length + erlang:length(MsgSeqIds)}), + true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId2, + Length + erlang:length(MsgSeqIds)}), {ok, State}. +requeue_message({{{MsgId, SeqIdOrig}, SeqIdTo}, + {_NextMsgSeqId, NextSeqIdTo}}, + {ExpectedSeqIdTo, Q}) -> + SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write), + NextSeqIdTo2 = find_next_seq_id(SeqIdTo2, NextSeqIdTo), + [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId, + next_seq_id = NextSeqIdOrig }] = + mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write), + if SeqIdTo2 == SeqIdOrig andalso NextSeqIdTo2 == NextSeqIdOrig -> ok; + true -> + ok = mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc {queue_and_seq_id = {Q, SeqIdTo2}, + next_seq_id = NextSeqIdTo2 + }, + write), + ok = mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write) + end, + {NextSeqIdTo2, Q}. + internal_purge(Q, State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of [] -> {ok, 0, State}; @@ -898,7 +940,8 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> (SeqId) -> [#dq_msg_loc { msg_id = MsgId, next_seq_id = NextSeqId } - ] = mnesia:read(rabbit_disk_queue, {Q, SeqId}, write), + ] = mnesia:read(rabbit_disk_queue, + {Q, SeqId}, write), {true, {MsgId, SeqId}, NextSeqId} end, ReadSeqId), remove_messages(Q, MsgSeqIds, txn, State) @@ -908,23 +951,27 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> end. internal_delete_queue(Q, State) -> - {ok, _Count, State1 = #dqstate { sequences = Sequences }} = internal_purge(Q, State), + {ok, _Count, State1 = #dqstate { sequences = Sequences }} = + internal_purge(Q, State), %% remove everything undelivered true = ets:delete(Sequences, Q), {atomic, {ok, State2}} = mnesia:transaction( - fun() -> + fun() -> %% now remove everything already delivered ok = mnesia:write_lock_table(rabbit_disk_queue), Objs = - mnesia:match_object(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, '_'}, - msg_id = '_', - is_delivered = '_', - next_seq_id = '_' - }, write), + mnesia:match_object( + rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = {Q, '_'}, + msg_id = '_', + is_delivered = '_', + next_seq_id = '_' + }, + write), MsgSeqIds = lists:map( - fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, msg_id = MsgId }) -> - {MsgId, SeqId} - end, Objs), + fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, + msg_id = MsgId }) -> + {MsgId, SeqId} end, Objs), remove_messages(Q, MsgSeqIds, txn, State1) end), {ok, State2}. @@ -938,26 +985,25 @@ internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) -> fun ({SeqId, _State1}) when SeqId == WriteSeq -> false; ({SeqId, State1}) -> - {ok, {MsgId, Msg, Size, Delivered, {MsgId, SeqId}}, NextReadSeqId, State2} = - internal_read_message(Q, SeqId, true, true, State1), - {true, {MsgId, Msg, Size, Delivered, SeqId}, {NextReadSeqId, State2}} + {ok, {MsgId, Msg, Size, Delivered, {MsgId, SeqId}}, + NextReadSeqId, State2} = + internal_read_message(Q, SeqId, true, true, + State1), + {true, {MsgId, Msg, Size, Delivered, SeqId}, + {NextReadSeqId, State2}} end, {ReadSeq, State}), {lists:reverse(QList), State3} end. -internal_delete_non_durable_queues(DurableQueues, State = #dqstate { sequences = Sequences }) -> - State3 = - ets:foldl( - fun ({Q, _Read, _Write, _Length}, State1) -> - case sets:is_element(Q, DurableQueues) of - true -> - State1; - false -> - {ok, State2} = internal_delete_queue(Q, State1), - State2 - end - end, State, Sequences), - {ok, State3}. +internal_delete_non_durable_queues( + DurableQueues, State = #dqstate { sequences = Sequences }) -> + ets:foldl( + fun ({Q, _Read, _Write, _Length}, {ok, State1}) -> + case sets:is_element(Q, DurableQueues) of + true -> {ok, State1}; + false -> internal_delete_queue(Q, State1) + end + end, {ok, State}, Sequences). %% ---- ROLLING OVER THE APPEND FILE ---- @@ -975,10 +1021,8 @@ maybe_roll_to_new_file(Offset, NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, {ok, NextHdl} = file:open(form_filename(NextName), [write, raw, binary, delayed_write]), - {ok, FileSizeLimit} = file:position(NextHdl, {bof, FileSizeLimit}), - ok = file:truncate(NextHdl), - {ok, 0} = file:position(NextHdl, {bof, 0}), - true = ets:update_element(FileSummary, CurName, {5, NextName}), %% 5 is Right + ok = preallocate(NextHdl, FileSizeLimit, 0), + true = ets:update_element(FileSummary, CurName, {5, NextName}),%% 5 is Right true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}), State1 = State #dqstate { current_file_name = NextName, current_file_handle = NextHdl, @@ -989,6 +1033,12 @@ maybe_roll_to_new_file(Offset, maybe_roll_to_new_file(_, State) -> {ok, State}. +preallocate(Hdl, FileSizeLimit, FinalPos) -> + {ok, FileSizeLimit} = file:position(Hdl, {bof, FileSizeLimit}), + ok = file:truncate(Hdl), + {ok, FinalPos} = file:position(Hdl, {bof, FinalPos}), + ok. + %% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ---- compact(FilesSet, State) -> @@ -1000,70 +1050,59 @@ compact(FilesSet, State) -> end, [], Files), lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)). -combine_file(File, State = #dqstate { file_size_limit = FileSizeLimit, - file_summary = FileSummary, - current_file_name = CurName - }) -> +combine_file(File, State = #dqstate { file_summary = FileSummary, + current_file_name = CurName + }) -> %% the file we're looking at may no longer exist as it may have %% been deleted within the current GC run case ets:lookup(FileSummary, File) of [] -> State; - [FileObj = {File, ValidData, _ContiguousTop, Left, Right}] -> + [FileObj = {File, _ValidData, _ContiguousTop, Left, Right}] -> GoRight = fun() -> case Right of undefined -> State; - _ when not(CurName =:= Right) -> - [RightObj = {Right, RightValidData, - _RightContiguousTop, File, RightRight}] = - ets:lookup(FileSummary, Right), - RightSumData = ValidData + RightValidData, - if FileSizeLimit >= RightSumData -> - %% here, Right will be the source and so will be deleted, - %% File will be the destination - State1 = combine_files(RightObj, FileObj, - State), - %% this could fail if RightRight is undefined - %% left is the 4th field - ets:update_element(FileSummary, - RightRight, {4, File}), - true = ets:insert(FileSummary, {File, - RightSumData, - RightSumData, - Left, - RightRight}), - true = ets:delete(FileSummary, Right), - State1; - true -> State - end; + _ when not (CurName == Right) -> + [RightObj] = ets:lookup(FileSummary, Right), + {_, State1} = + adjust_meta_and_combine(FileObj, RightObj, + State), + State1; _ -> State end end, case Left of undefined -> GoRight(); - _ -> [LeftObj = - {Left, LeftValidData, _LeftContiguousTop, LeftLeft, File}] = - ets:lookup(FileSummary, Left), - LeftSumData = ValidData + LeftValidData, - if FileSizeLimit >= LeftSumData -> - %% here, File will be the source and so will be deleted, - %% Left will be the destination - State1 = combine_files(FileObj, LeftObj, State), - %% this could fail if Right is undefined - %% left is the 4th field - ets:update_element(FileSummary, Right, {4, Left}), - true = ets:insert(FileSummary, {Left, LeftSumData, - LeftSumData, - LeftLeft, Right}), - true = ets:delete(FileSummary, File), - State1; - true -> - GoRight() + _ -> [LeftObj] = ets:lookup(FileSummary, Left), + case adjust_meta_and_combine(LeftObj, FileObj, State) of + {true, State1} -> State1; + {false, State} -> GoRight() end end end. +adjust_meta_and_combine( + LeftObj = {LeftFile, LeftValidData, _LeftContigTop, LeftLeft, RightFile}, + RightObj = {RightFile, RightValidData, _RightContigTop, LeftFile, RightRight}, + State = #dqstate { file_size_limit = FileSizeLimit, + file_summary = FileSummary + }) -> + TotalValidData = LeftValidData + RightValidData, + if FileSizeLimit >= TotalValidData -> + State1 = combine_files(RightObj, LeftObj, State), + %% this could fail if RightRight is undefined + %% left is the 4th field + ets:update_element(FileSummary, RightRight, {4, LeftFile}), + true = ets:insert(FileSummary, {LeftFile, + TotalValidData, TotalValidData, + LeftLeft, + RightRight}), + true = ets:delete(FileSummary, RightFile), + {true, State1}; + true -> {false, State} + end. + sort_msg_locations_by_offset(Asc, List) -> Comp = if Asc -> fun erlang:'<'/2; true -> fun erlang:'>'/2 @@ -1093,92 +1132,72 @@ combine_files({Source, SourceValid, _SourceContiguousTop, file:open(form_filename(Destination), [read, write, raw, binary, read_ahead, delayed_write]), ExpectedSize = SourceValid + DestinationValid, - %% if DestinationValid =:= DestinationContiguousTop then we don't need a tmp file - %% if they're not equal, then we need to write out everything past the DestinationContiguousTop to a tmp file - %% then truncate, copy back in, and then copy over from Source + %% if DestinationValid =:= DestinationContiguousTop then we don't + %% need a tmp file + %% if they're not equal, then we need to write out everything past + %% the DestinationContiguousTop to a tmp file then truncate, + %% copy back in, and then copy over from Source %% otherwise we just truncate straight away and copy over from Source if DestinationContiguousTop =:= DestinationValid -> ok = truncate_and_extend_file(DestinationHdl, - DestinationValid, ExpectedSize); + DestinationValid, ExpectedSize); true -> Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, {ok, TmpHdl} = file:open(form_filename(Tmp), - [read, write, raw, binary, read_ahead, delayed_write]), + [read, write, raw, binary, + read_ahead, delayed_write]), Worklist = lists:dropwhile( fun ({_, _, _, Offset, _}) when Offset /= DestinationContiguousTop -> - %% it cannot be that Offset == DestinationContiguousTop - %% because if it was then DestinationContiguousTop would have been - %% extended by TotalSize + %% it cannot be that Offset == + %% DestinationContiguousTop because if it + %% was then DestinationContiguousTop would + %% have been extended by TotalSize Offset < DestinationContiguousTop - %% Given expected access patterns, I suspect that the list should be - %% naturally sorted as we require, however, we need to enforce it anyway - end, sort_msg_locations_by_offset(true, - dets_ets_match_object(State, - {'_', '_', - Destination, - '_', '_'}))), + %% Given expected access patterns, I suspect + %% that the list should be naturally sorted + %% as we require, however, we need to + %% enforce it anyway + end, sort_msg_locations_by_offset( + true, dets_ets_match_object(State, + {'_', '_', Destination, + '_', '_'}))), + ok = copy_messages( + Worklist, DestinationContiguousTop, DestinationValid, + DestinationHdl, TmpHdl, Destination, State), TmpSize = DestinationValid - DestinationContiguousTop, - {TmpSize, BlockStart1, BlockEnd1} = - lists:foldl( - fun ({MsgId, RefCount, _Destination, Offset, TotalSize}, - {CurOffset, BlockStart, BlockEnd}) -> - %% CurOffset is in the TmpFile. - %% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!) - Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, - %% this message is going to end up back in - %% Destination, at DestinationContiguousTop - %% + CurOffset - FinalOffset = DestinationContiguousTop + CurOffset, - ok = dets_ets_insert(State, {MsgId, RefCount, Destination, - FinalOffset, TotalSize}), - NextOffset = CurOffset + Size, - if BlockStart =:= undefined -> - %% base case, called only for the - %% first list elem - {NextOffset, Offset, Offset + Size}; - Offset =:= BlockEnd -> - %% extend the current block because - %% the next msg follows straight on - {NextOffset, BlockStart, BlockEnd + Size}; - true -> - %% found a gap, so actually do the - %% work for the previous block - BSize = BlockEnd - BlockStart, - {ok, BlockStart} = - file:position(DestinationHdl, - {bof, BlockStart}), - {ok, BSize} = file:copy(DestinationHdl, - TmpHdl, BSize), - {NextOffset, Offset, Offset + Size} - end - end, {0, undefined, undefined}, Worklist), - %% do the last remaining block - BSize1 = BlockEnd1 - BlockStart1, - {ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}), - {ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1), %% so now Tmp contains everything we need to salvage from %% Destination, and MsgLocationDets has been updated to %% reflect compaction of Destination so truncate %% Destination and copy from Tmp back to the end {ok, 0} = file:position(TmpHdl, {bof, 0}), - ok = truncate_and_extend_file(DestinationHdl, - DestinationContiguousTop, ExpectedSize), + ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize), {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), - %% position in DestinationHdl should now be - %% DestinationValid + %% position in DestinationHdl should now be DestinationValid ok = file:sync(DestinationHdl), ok = file:close(TmpHdl), ok = file:delete(form_filename(Tmp)) end, SourceWorkList = - sort_msg_locations_by_offset(true, - dets_ets_match_object(State, - {'_', '_', Source, - '_', '_'})), - {ExpectedSize, BlockStart2, BlockEnd2} = + sort_msg_locations_by_offset( + true, dets_ets_match_object(State, + {'_', '_', Source, + '_', '_'})), + ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, + SourceHdl, DestinationHdl, Destination, State), + %% tidy up + ok = file:sync(DestinationHdl), + ok = file:close(SourceHdl), + ok = file:close(DestinationHdl), + ok = file:delete(form_filename(Source)), + State. + +copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, + Destination, State) -> + {FinalOffset, BlockStart2, BlockEnd2} = lists:foldl( fun ({MsgId, RefCount, _Source, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) -> @@ -1190,8 +1209,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop, CurOffset, TotalSize}), NextOffset = CurOffset + Size, if BlockStart =:= undefined -> - %% base case, called only for the first list - %% elem + %% base case, called only for the first list elem {NextOffset, Offset, Offset + Size}; Offset =:= BlockEnd -> %% extend the current block because the next @@ -1207,17 +1225,12 @@ combine_files({Source, SourceValid, _SourceContiguousTop, file:copy(SourceHdl, DestinationHdl, BSize), {NextOffset, Offset, Offset + Size} end - end, {DestinationValid, undefined, undefined}, SourceWorkList), + end, {InitOffset, undefined, undefined}, WorkList), %% do the last remaining block BSize2 = BlockEnd2 - BlockStart2, {ok, BlockStart2} = file:position(SourceHdl, {bof, BlockStart2}), {ok, BSize2} = file:copy(SourceHdl, DestinationHdl, BSize2), - %% tidy up - ok = file:sync(DestinationHdl), - ok = file:close(SourceHdl), - ok = file:close(DestinationHdl), - ok = file:delete(form_filename(Source)), - State. + ok. close_file(File, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge} }) -> @@ -1237,20 +1250,22 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> case ValidData of %% we should NEVER find the current file in here hence right %% should always be a file, not undefined - 0 -> case {Left, Right} of - {undefined, _} when not(is_atom(Right)) -> - %% the eldest file is empty. YAY! - %% left is the 4th field - true = ets:update_element(FileSummary, Right, {4, undefined}); - {_, _} when not(is_atom(Right)) -> - %% left is the 4th field - true = ets:update_element(FileSummary, Right, {4, Left}), - %% right is the 5th field - true = ets:update_element(FileSummary, Left, {5, Right}) - end, - true = ets:delete(FileSummary, File), - ok = file:delete(form_filename(File)), - Acc; + 0 -> + case {Left, Right} of + {undefined, _} when not (is_atom(Right)) -> + %% the eldest file is empty. YAY! + %% left is the 4th field + true = + ets:update_element(FileSummary, Right, {4, undefined}); + {_, _} when not (is_atom(Right)) -> + %% left is the 4th field + true = ets:update_element(FileSummary, Right, {4, Left}), + %% right is the 5th field + true = ets:update_element(FileSummary, Left, {5, Right}) + end, + true = ets:delete(FileSummary, File), + ok = file:delete(form_filename(File)), + Acc; _ -> [File|Acc] end. @@ -1268,7 +1283,7 @@ del_index() -> {atomic, ok} -> ok; %% hmm, something weird must be going on, but it's probably %% not the end of the world - {aborted,{no_exists,rabbit_disk_queue,_}} -> ok; + {aborted, {no_exists, rabbit_disk_queue,_}} -> ok; E2 -> E2 end. @@ -1286,13 +1301,18 @@ load_from_disk(State) -> {atomic, true} = mnesia:transaction( fun() -> ok = mnesia:read_lock_table(rabbit_disk_queue), - mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }, true) -> - case erlang:length(dets_ets_lookup(State1, MsgId)) of - 0 -> ok == mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write); - 1 -> true - end - end, - true, rabbit_disk_queue) + mnesia:foldl( + fun (#dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = {Q, SeqId} }, + true) -> + case erlang:length(dets_ets_lookup( + State1, MsgId)) of + 0 -> ok == mnesia:delete(rabbit_disk_queue, + {Q, SeqId}, write); + 1 -> true + end + end, + true, rabbit_disk_queue) end), State2 = extract_sequence_numbers(State1), ok = del_index(), @@ -1306,9 +1326,9 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> NextWrite = SeqId + 1, case ets:lookup(Sequences, Q) of - [] -> - true = ets:insert_new(Sequences, - {Q, SeqId, NextWrite, -1}); + [] -> true = + ets:insert_new(Sequences, + {Q, SeqId, NextWrite, -1}); [Orig = {Q, Read, Write, Length}] -> Repl = {Q, lists:min([Read, SeqId]), %% Length is wrong here, but @@ -1345,10 +1365,12 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> ok = mnesia:write_lock_table(rabbit_disk_queue), lists:foreach( fun ({Q, ReadSeqId, WriteSeqId, _Length}) -> - Gap = shuffle_up(Q, ReadSeqId - 1, WriteSeqId - 1, 0), + Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), ReadSeqId2 = ReadSeqId + Gap, Length = WriteSeqId - ReadSeqId2, - true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId, Length}) + true = + ets:insert(Sequences, + {Q, ReadSeqId2, WriteSeqId, Length}) end, ets:match_object(Sequences, '_')) end). @@ -1361,8 +1383,9 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) -> [Obj] -> if Gap =:= 0 -> ok; true -> mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap }, - next_seq_id = SeqId + Gap + 1 + Obj #dq_msg_loc { + queue_and_seq_id = {Q, SeqId + Gap }, + next_seq_id = SeqId + Gap + 1 }, write), mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write) @@ -1371,8 +1394,9 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) -> end, shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc). -load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, - current_file_name = CurName }) -> +load_messages(undefined, [], + State = #dqstate { file_summary = FileSummary, + current_file_name = CurName }) -> true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}), State; load_messages(Left, [], State) -> @@ -1401,8 +1425,9 @@ load_messages(Left, [File|Files], msg_id)) of 0 -> {VMAcc, VTSAcc}; RefCount -> - true = dets_ets_insert_new(State, {MsgId, RefCount, File, - Offset, TotalSize}), + true = + dets_ets_insert_new(State, {MsgId, RefCount, File, + Offset, TotalSize}), {[{MsgId, TotalSize, Offset}|VMAcc], VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT } @@ -1410,21 +1435,37 @@ load_messages(Left, [File|Files], end, {[], 0}, Messages), %% foldl reverses lists and find_contiguous_block_prefix needs %% elems in the same order as from scan_file_for_valid_messages - {ContiguousTop, _} = find_contiguous_block_prefix(lists:reverse(ValidMessagesRev)), + {ContiguousTop, _} = find_contiguous_block_prefix( + lists:reverse(ValidMessagesRev)), Right = case Files of [] -> undefined; [F|_] -> F end, - true = ets:insert_new(FileSummary, {File, ValidTotalSize, ContiguousTop, Left, Right}), + true = ets:insert_new(FileSummary, + {File, ValidTotalSize, ContiguousTop, Left, Right}), load_messages(File, Files, State). %% ---- DISK RECOVERY OF FAILED COMPACTION ---- recover_crashed_compactions(Files, TmpFiles) -> - lists:foreach(fun (TmpFile) -> ok = recover_crashed_compactions1(Files, TmpFile) end, + lists:foreach(fun (TmpFile) -> + ok = recover_crashed_compactions1(Files, TmpFile) end, TmpFiles), ok. +verify_messages_in_mnesia(MsgIds) -> + lists:foreach( + fun (MsgId) -> + true = 0 < erlang:length(mnesia:dirty_index_match_object + (rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', + is_delivered = '_', + next_seq_id = '_' + }, + msg_id)) + end, MsgIds). + recover_crashed_compactions1(Files, TmpFile) -> GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end, NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION, @@ -1435,37 +1476,35 @@ recover_crashed_compactions1(Files, TmpFile) -> MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp), %% all of these messages should appear in the mnesia table, %% otherwise they wouldn't have been copied out - lists:foreach(fun (MsgId) -> - true = 0 < erlang:length(mnesia:dirty_index_match_object - (rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = '_', - is_delivered = '_', - next_seq_id = '_' - }, - msg_id)) - end, MsgIdsTmp), + verify_messages_in_mnesia(MsgIdsTmp), {ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), MsgIds = lists:map(GrabMsgId, UncorruptedMessages), - %% 1) It's possible that everything in the tmp file is also in the main file - %% such that the main file is (prefix ++ tmpfile). This means that compaction - %% failed immediately prior to the final step of deleting the tmp file. - %% Plan: just delete the tmp file - %% 2) It's possible that everything in the tmp file is also in the main file - %% but with holes throughout (or just somthing like main = (prefix ++ hole ++ tmpfile)). - %% This means that compaction wrote out the tmp file successfully and then failed. - %% Plan: just delete the tmp file and allow the compaction to eventually be triggered later - %% 3) It's possible that everything in the tmp file is also in the main file - %% but such that the main file does not end with tmp file (and there are valid messages - %% in the suffix; main = (prefix ++ tmpfile[with extra holes?] ++ suffix)). - %% This means that compaction failed as we were writing out the tmp file. - %% Plan: just delete the tmp file and allow the compaction to eventually be triggered later - %% 4) It's possible that there are messages in the tmp file which are not in the main file. - %% This means that writing out the tmp file succeeded, but then we failed as we - %% were copying them back over to the main file, after truncating the main file. - %% As the main file has already been truncated, it should consist only of valid messages - %% Plan: Truncate the main file back to before any of the files in the tmp file and copy + %% 1) It's possible that everything in the tmp file is also in the + %% main file such that the main file is (prefix ++ + %% tmpfile). This means that compaction failed immediately + %% prior to the final step of deleting the tmp file. Plan: just + %% delete the tmp file + %% 2) It's possible that everything in the tmp file is also in the + %% main file but with holes throughout (or just somthing like + %% main = (prefix ++ hole ++ tmpfile)). This means that + %% compaction wrote out the tmp file successfully and then + %% failed. Plan: just delete the tmp file and allow the + %% compaction to eventually be triggered later + %% 3) It's possible that everything in the tmp file is also in the + %% main file but such that the main file does not end with tmp + %% file (and there are valid messages in the suffix; main = + %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This + %% means that compaction failed as we were writing out the tmp + %% file. Plan: just delete the tmp file and allow the + %% compaction to eventually be triggered later + %% 4) It's possible that there are messages in the tmp file which + %% are not in the main file. This means that writing out the + %% tmp file succeeded, but then we failed as we were copying + %% them back over to the main file, after truncating the main + %% file. As the main file has already been truncated, it should + %% consist only of valid messages. Plan: Truncate the main file + %% back to before any of the files in the tmp file and copy %% them over again case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file @@ -1473,29 +1512,21 @@ recover_crashed_compactions1(Files, TmpFile) -> %% is empty ok = file:delete(TmpFile); _False -> - %% we're in case 4 above. - %% check that everything in the main file is a valid message in mnesia - lists:foreach(fun (MsgId) -> - true = 0 < erlang:length(mnesia:dirty_index_match_object - (rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = '_', - is_delivered = '_', - next_seq_id = '_' - }, - msg_id)) - end, MsgIds), + %% we're in case 4 above. Check that everything in the + %% main file is a valid message in mnesia + verify_messages_in_mnesia(MsgIds), %% The main file should be contiguous {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages), %% we should have that none of the messages in the prefix %% are in the tmp file - true = lists:all(fun (MsgId) -> not(lists:member(MsgId, MsgIdsTmp)) end, - MsgIds), - + true = lists:all(fun (MsgId) -> + not (lists:member(MsgId, MsgIdsTmp)) + end, MsgIds), {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile), [write, raw, binary, delayed_write]), {ok, Top} = file:position(MainHdl, Top), - ok = file:truncate(MainHdl), %% wipe out any rubbish at the end of the file + %% wipe out any rubbish at the end of the file + ok = file:truncate(MainHdl), %% there really could be rubbish at the end of the file - %% we could have failed after the extending truncate. %% Remember the head of the list will be the highest entry @@ -1596,7 +1627,8 @@ read_message_at_offset(FileHdl, Offset, TotalSize) -> scan_file_for_valid_messages(File) -> {ok, Hdl} = file:open(File, [raw, binary, read]), Valid = scan_file_for_valid_messages(Hdl, 0, []), - _ = file:close(Hdl), %% if something really bad's happened, the close could fail, but ignore + %% if something really bad's happened, the close could fail, but ignore + file:close(Hdl), Valid. scan_file_for_valid_messages(FileHdl, Offset, Acc) -> @@ -1607,21 +1639,28 @@ scan_file_for_valid_messages(FileHdl, Offset, Acc) -> {ok, {ok, MsgId, TotalSize, NextOffset}} -> scan_file_for_valid_messages(FileHdl, NextOffset, [{MsgId, TotalSize, Offset}|Acc]); - _KO -> {ok, Acc} %% bad message, but we may still have recovered some valid messages + _KO -> + %% bad message, but we may still have recovered some valid messages + {ok, Acc} end. read_next_file_entry(FileHdl, Offset) -> TwoIntegers = 2 * ?INTEGER_SIZE_BYTES, case file:read(FileHdl, TwoIntegers) of - {ok, <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> + {ok, + <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> case {TotalSize =:= 0, MsgIdBinSize =:= 0} of {true, _} -> {ok, eof}; %% Nothing we can do other than stop - {false, true} -> %% current message corrupted, try skipping past it - ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize, + {false, true} -> + %% current message corrupted, try skipping past it + ExpectedAbsPos = + Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize, case file:position(FileHdl, {cur, TotalSize + 1}) of - {ok, ExpectedAbsPos} -> {ok, {corrupted, ExpectedAbsPos}}; - {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up + {ok, ExpectedAbsPos} -> + {ok, {corrupted, ExpectedAbsPos}}; + {ok, _SomeOtherPos} -> + {ok, eof}; %% seek failed, so give up KO -> KO end; {false, false} -> %% all good, let's continue @@ -1629,19 +1668,23 @@ read_next_file_entry(FileHdl, Offset) -> {ok, <<MsgId:MsgIdBinSize/binary>>} -> ExpectedAbsPos = Offset + TwoIntegers + TotalSize, case file:position(FileHdl, - {cur, TotalSize - MsgIdBinSize}) of + {cur, TotalSize - MsgIdBinSize} + ) of {ok, ExpectedAbsPos} -> NextOffset = Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, case file:read(FileHdl, 1) of - {ok, <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} -> + {ok, + <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} -> {ok, {ok, binary_to_term(MsgId), TotalSize, NextOffset}}; {ok, _SomeOtherData} -> {ok, {corrupted, NextOffset}}; KO -> KO end; - {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up + {ok, _SomeOtherPos} -> + %% seek failed, so give up + {ok, eof}; KO -> KO end; eof -> {ok, eof}; |
