diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 152 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 5 |
3 files changed, 98 insertions, 67 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 868eab4a5e..4eef884f72 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -53,7 +53,8 @@ -include("rabbit.hrl"). -define(WRITE_OK_SIZE_BITS, 8). --define(WRITE_OK, 255). +-define(WRITE_OK_TRANSIENT, 255). +-define(WRITE_OK_PERSISTENT, 254). -define(INTEGER_SIZE_BYTES, 8). -define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). -define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). @@ -101,8 +102,8 @@ %% The components: %% -%% MsgLocation: this is a dets table which contains: -%% {MsgId, RefCount, File, Offset, TotalSize} +%% MsgLocation: this is a (d)ets table which contains: +%% {MsgId, RefCount, File, Offset, TotalSize, IsPersistent} %% FileSummary: this is an ets table which contains: %% {File, ValidTotalSize, ContiguousTop, Left, Right} %% Sequences: this is an ets table which contains: @@ -393,7 +394,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> ?FILE_EXTENSION_DETS)}, {min_no_slots, 1024*1024}, %% man says this should be <= 32M. But it works... - {max_no_slots, 1024*1024*1024}, + {max_no_slots, 30*1024*1024}, {type, set} ]), @@ -509,8 +510,8 @@ handle_cast({ack, Q, MsgSeqIds}, State) -> handle_cast({auto_ack_next_message, Q}, State) -> {ok, State1} = internal_auto_ack(Q, State), noreply(State1); -handle_cast({tx_publish, Message = #basic_message { guid = MsgId }}, State) -> - {ok, State1} = internal_tx_publish(MsgId, Message, State), +handle_cast({tx_publish, Message}, State) -> + {ok, State1} = internal_tx_publish(Message, State), noreply(State1); handle_cast({tx_cancel, MsgIds}, State) -> {ok, State1} = internal_tx_cancel(MsgIds, State), @@ -636,8 +637,8 @@ memory_use(#dqstate { operation_mode = disk_only, (WordSize * (ets:info(FileSummary, memory) + ets:info(Cache, memory) + ets:info(Sequences, memory))) + - round(MnesiaSizeEstimate) + - round(MsgLocationSizeEstimate). + rabbit_misc:ceil(MnesiaSizeEstimate) + + rabbit_misc:ceil(MsgLocationSizeEstimate). to_disk_only_mode(State = #dqstate { operation_mode = disk_only }) -> State; @@ -872,7 +873,8 @@ insert_into_cache(Message = #basic_message { guid = MsgId }, true -> 0; false -> 1 end, - true = ets:insert_new(Cache, {MsgId, Message, MsgSize, Count}), + true = + ets:insert_new(Cache, {MsgId, Message, MsgSize, Count}), ok. %% ---- INTERNAL RAW FUNCTIONS ---- @@ -890,8 +892,9 @@ internal_deliver(Q, ReadMsg, FakeDeliver, {Q, ReadSeqId+1, WriteSeqId}), {ok, case Result of - {MsgId, Delivered, {MsgId, ReadSeqId}} -> - {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining}; + {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId}} -> + {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId}, + Remaining}; {Message, BodySize, Delivered, {MsgId, ReadSeqId}} -> {Message, BodySize, Delivered, {MsgId, ReadSeqId}, Remaining} @@ -927,7 +930,7 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) - [Obj = #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] = mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}), - [{MsgId, RefCount, File, Offset, TotalSize}] = + [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] = dets_ets_lookup(State, MsgId), ok = if FakeDeliver orelse Delivered -> ok; @@ -940,12 +943,13 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) - case fetch_and_increment_cache(MsgId, State) of not_found -> {FileHdl, State1} = get_read_handle(File, Offset, State), - {ok, {MsgBody, BodySize}} = + {ok, {MsgBody, IsPersistent, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize), - Message = bin_to_msg(MsgBody), + #basic_message { is_persistent=IsPersistent, guid=MsgId } = + Message = bin_to_msg(MsgBody), ok = if RefCount > 1 orelse ForceInCache -> - insert_into_cache(Message, BodySize, - ForceInCache, State1); + insert_into_cache + (Message, BodySize, ForceInCache, State1); true -> ok %% it's not in the cache and we only %% have 1 queue with the message. So @@ -959,13 +963,14 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) - State} end; false -> - {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State} + {ok, {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId}}, State} end. internal_auto_ack(Q, State) -> case internal_deliver(Q, false, true, State) of {ok, empty, State1} -> {ok, State1}; - {ok, {_MsgId, _Delivered, MsgSeqId, _Remaining}, State1} -> + {ok, {_MsgId, _IsPersistent, _Delivered, MsgSeqId, _Remaining}, + State1} -> remove_messages(Q, [MsgSeqId], true, State1) end. @@ -985,7 +990,7 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, Files = lists:foldl( fun ({MsgId, SeqId}, Files1) -> - [{MsgId, RefCount, File, Offset, TotalSize}] = + [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] = dets_ets_lookup(State, MsgId), Files2 = case RefCount of @@ -1007,8 +1012,8 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, _ when 1 < RefCount -> ok = decrement_cache(MsgId, State), ok = dets_ets_insert( - State, {MsgId, RefCount - 1, - File, Offset, TotalSize}), + State, {MsgId, RefCount - 1, File, Offset, + TotalSize, IsPersistent}), Files1 end, ok = case MnesiaDelete of @@ -1023,7 +1028,8 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, State1 = compact(Files, State), {ok, State1}. -internal_tx_publish(MsgId, Message, +internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, + guid = MsgId }, State = #dqstate { current_file_handle = CurHdl, current_file_name = CurName, current_offset = CurOffset, @@ -1032,10 +1038,11 @@ internal_tx_publish(MsgId, Message, case dets_ets_lookup(State, MsgId) of [] -> %% New message, lots to do - {ok, TotalSize} = - append_message(CurHdl, MsgId, msg_to_bin(Message)), - true = dets_ets_insert_new(State, {MsgId, 1, CurName, - CurOffset, TotalSize}), + {ok, TotalSize} = append_message(CurHdl, MsgId, msg_to_bin(Message), + IsPersistent), + true = dets_ets_insert_new + (State, {MsgId, 1, CurName, + CurOffset, TotalSize, IsPersistent}), [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = ets:lookup(FileSummary, CurName), ValidTotalSize1 = ValidTotalSize + TotalSize + @@ -1051,10 +1058,10 @@ internal_tx_publish(MsgId, Message, maybe_roll_to_new_file( NextOffset, State #dqstate {current_offset = NextOffset, current_dirty = true}); - [{MsgId, RefCount, File, Offset, TotalSize}] -> + [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] -> %% We already know about it, just update counter ok = dets_ets_insert(State, {MsgId, RefCount + 1, File, - Offset, TotalSize}), + Offset, TotalSize, IsPersistent}), {ok, State} end. @@ -1080,7 +1087,8 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, lists:foldl( fun (MsgId, {InCurFileAcc, SeqId}) -> [{MsgId, _RefCount, File, Offset, - _TotalSize}] = dets_ets_lookup(State, MsgId), + _TotalSize, _IsPersistent}] = + dets_ets_lookup(State, MsgId), ok = mnesia:write( rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = @@ -1109,7 +1117,7 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, internal_publish(Q, Message = #basic_message { guid = MsgId }, IsDelivered, State) -> {ok, State1 = #dqstate { sequences = Sequences }} = - internal_tx_publish(MsgId, Message, State), + internal_tx_publish(Message, State), {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId}, @@ -1363,7 +1371,7 @@ sort_msg_locations_by_offset(Asc, List) -> true -> fun erlang:'<'/2; false -> fun erlang:'>'/2 end, - lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) -> + lists:sort(fun ({_, _, _, OffA, _, _}, {_, _, _, OffB, _, _}) -> Comp(OffA, OffB) end, List). @@ -1402,7 +1410,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop, read_ahead, delayed_write]), Worklist = lists:dropwhile( - fun ({_, _, _, Offset, _}) + fun ({_, _, _, Offset, _, _}) when Offset /= DestinationContiguousTop -> %% it cannot be that Offset == %% DestinationContiguousTop because if it @@ -1416,7 +1424,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop, end, sort_msg_locations_by_offset( true, dets_ets_match_object(State, {'_', '_', Destination, - '_', '_'}))), + '_', '_', '_'}))), ok = copy_messages( Worklist, DestinationContiguousTop, DestinationValid, DestinationHdl, TmpHdl, Destination, State), @@ -1438,7 +1446,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop, sort_msg_locations_by_offset( true, dets_ets_match_object(State, {'_', '_', Source, - '_', '_'})), + '_', '_', '_'})), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, SourceHdl, DestinationHdl, Destination, State), %% tidy up @@ -1452,14 +1460,15 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, State) -> {FinalOffset, BlockStart1, BlockEnd1} = lists:foldl( - fun ({MsgId, RefCount, _Source, Offset, TotalSize}, + fun ({MsgId, RefCount, _Source, Offset, TotalSize, IsPersistent}, {CurOffset, BlockStart, BlockEnd}) -> %% CurOffset is in the DestinationFile. %% Offset, BlockStart and BlockEnd are in the SourceFile Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, %% update MsgLocationDets to reflect change of file and offset - ok = dets_ets_insert(State, {MsgId, RefCount, Destination, - CurOffset, TotalSize}), + ok = dets_ets_insert + (State, {MsgId, RefCount, Destination, + CurOffset, TotalSize, IsPersistent}), NextOffset = CurOffset + Size, if BlockStart =:= undefined -> %% base case, called only for the first list elem @@ -1643,11 +1652,13 @@ load_messages(undefined, [], State; load_messages(Left, [], State) -> Num = list_to_integer(filename:rootname(Left)), - Offset = case dets_ets_match_object(State, {'_', '_', Left, '_', '_'}) of - [] -> 0; - L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] = - sort_msg_locations_by_offset(false, L), - MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT + Offset = + case dets_ets_match_object(State, {'_', '_', Left, '_', '_', '_'}) of + [] -> 0; + L -> + [ {_MsgId, _RefCount, Left, MaxOffset, TotalSize, _IsPersistent} + | _ ] = sort_msg_locations_by_offset(false, L), + MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT end, State #dqstate { current_file_num = Num, current_file_name = Left, current_offset = Offset }; @@ -1656,7 +1667,7 @@ load_messages(Left, [File|Files], %% [{MsgId, TotalSize, FileOffset}] {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), {ValidMessagesRev, ValidTotalSize} = lists:foldl( - fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> + fun ({MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) -> case erlang:length(mnesia:dirty_index_match_object (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, @@ -1666,9 +1677,9 @@ load_messages(Left, [File|Files], msg_id)) of 0 -> {VMAcc, VTSAcc}; RefCount -> - true = - dets_ets_insert_new(State, {MsgId, RefCount, File, - Offset, TotalSize}), + true = dets_ets_insert_new + (State, {MsgId, RefCount, File, + Offset, TotalSize, IsPersistent}), {[{MsgId, TotalSize, Offset}|VMAcc], VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT } @@ -1706,20 +1717,22 @@ verify_messages_in_mnesia(MsgIds) -> msg_id)) end, MsgIds). +grab_msg_id({MsgId, _IsPersistent, _TotalSize, _FileOffset}) -> + MsgId. + recover_crashed_compactions1(Files, TmpFile) -> - GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end, NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION, true = lists:member(NonTmpRelatedFile, Files), %% [{MsgId, TotalSize, FileOffset}] {ok, UncorruptedMessagesTmp} = scan_file_for_valid_messages(form_filename(TmpFile)), - MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp), + MsgIdsTmp = lists:map(fun grab_msg_id/1, UncorruptedMessagesTmp), %% all of these messages should appear in the mnesia table, %% otherwise they wouldn't have been copied out verify_messages_in_mnesia(MsgIdsTmp), {ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), - MsgIds = lists:map(GrabMsgId, UncorruptedMessages), + MsgIds = lists:map(fun grab_msg_id/1, UncorruptedMessages), %% 1) It's possible that everything in the tmp file is also in the %% main file such that the main file is (prefix ++ %% tmpfile). This means that compaction failed immediately @@ -1788,7 +1801,7 @@ recover_crashed_compactions1(Files, TmpFile) -> {ok, MainMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), - MsgIdsMain = lists:map(GrabMsgId, MainMessages), + MsgIdsMain = lists:map(fun grab_msg_id/1, MainMessages), %% check that everything in MsgIds is in MsgIdsMain true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, MsgIds), @@ -1833,16 +1846,20 @@ get_disk_queue_files() -> %% ---- RAW READING AND WRITING OF FILES ---- -append_message(FileHdl, MsgId, MsgBody) when is_binary(MsgBody) -> +append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) -> BodySize = size(MsgBody), MsgIdBin = term_to_binary(MsgId), MsgIdBinSize = size(MsgIdBin), TotalSize = BodySize + MsgIdBinSize, + StopByte = case IsPersistent of + true -> ?WRITE_OK_PERSISTENT; + false -> ?WRITE_OK_TRANSIENT + end, case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS, MsgIdBin:MsgIdBinSize/binary, MsgBody:BodySize/binary, - ?WRITE_OK:?WRITE_OK_SIZE_BITS>>) of + StopByte:?WRITE_OK_SIZE_BITS>>) of ok -> {ok, TotalSize}; KO -> KO end. @@ -1856,9 +1873,14 @@ read_message_at_offset(FileHdl, Offset, TotalSize) -> MsgIdBinSize:?INTEGER_SIZE_BITS, Rest:TotalSizeWriteOkBytes/binary>>} -> BodySize = TotalSize - MsgIdBinSize, - <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, - ?WRITE_OK:?WRITE_OK_SIZE_BITS>> = Rest, - {ok, {MsgBody, BodySize}}; + case Rest of + <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, + ?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>> -> + {ok, {MsgBody, false, BodySize}}; + <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, + ?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> -> + {ok, {MsgBody, true, BodySize}} + end; KO -> KO end; KO -> KO @@ -1876,15 +1898,15 @@ scan_file_for_valid_messages(FileHdl, Offset, Acc) -> {ok, eof} -> {ok, Acc}; {ok, {corrupted, NextOffset}} -> scan_file_for_valid_messages(FileHdl, NextOffset, Acc); - {ok, {ok, MsgId, TotalSize, NextOffset}} -> - scan_file_for_valid_messages(FileHdl, NextOffset, - [{MsgId, TotalSize, Offset}|Acc]); + {ok, {ok, MsgId, IsPersistent, TotalSize, NextOffset}} -> + scan_file_for_valid_messages( + FileHdl, NextOffset, + [{MsgId, IsPersistent, TotalSize, Offset} | Acc]); _KO -> %% bad message, but we may still have recovered some valid messages {ok, Acc} end. - read_next_file_entry(FileHdl, Offset) -> TwoIntegers = 2 * ?INTEGER_SIZE_BYTES, case file:read(FileHdl, TwoIntegers) of @@ -1915,9 +1937,15 @@ read_next_file_entry(FileHdl, Offset) -> ?FILE_PACKING_ADJUSTMENT, case file:read(FileHdl, 1) of {ok, - <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} -> - {ok, {ok, binary_to_term(MsgId), - TotalSize, NextOffset}}; + <<?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>>} -> + {ok, + {ok, binary_to_term(MsgId), + false, TotalSize, NextOffset}}; + {ok, + <<?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>>} -> + {ok, + {ok, binary_to_term(MsgId), + true, TotalSize, NextOffset}}; {ok, _SomeOtherData} -> {ok, {corrupted, NextOffset}}; KO -> KO diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 176ddddb03..fc30834e3a 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -446,10 +446,12 @@ unfold(Fun, Acc, Init) -> false -> {Acc, Init} end. -ceil(N) when N - trunc(N) > 0 -> - 1 + trunc(N); ceil(N) -> - N. + T = trunc(N), + case N - T of + 0 -> N; + _ -> 1 + T + end. keygets(Keys, KeyList) -> lists:reverse( diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index d864d9b2f6..425d776377 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -311,7 +311,8 @@ publish_delivered(Msg = %% must call phantom_deliver otherwise the msg remains at %% the head of the queue. This is synchronous, but %% unavoidable as we need the AckTag - {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(PubQ), + {MsgId, IsPersistent, false, AckTag, 0} = + rabbit_disk_queue:phantom_deliver(PubQ), {ok, AckTag, State1}; false -> %% in this case, we don't actually care about the ack, so @@ -340,7 +341,7 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q, AckTag1 = case IsDurable andalso IsPersistent of true -> - {MsgId, IsDelivered1, AckTag2, _PersistRem} + {MsgId, IsPersistent, IsDelivered1, AckTag2, _PRem} = rabbit_disk_queue:phantom_deliver(Q), AckTag2; false -> |
