diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 145 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 78 |
2 files changed, 195 insertions, 28 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index c08258c846..63076eb914 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1]). +-export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]). -export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]). @@ -181,21 +181,21 @@ %% the data the size of which is tracked by the ContiguousTop %% variable. Judicious use of a mirror is required). %% -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% | X | | G | | G | -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% | D | | X | | F | -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% | X | | X | | E | -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% | C | | F | ===> | D | -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% | X | | X | | C | -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% | B | | X | | B | -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% | A | | E | | A | -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% left right left %% %% From this reasoning, we do have a bound on the number of times the @@ -231,6 +231,7 @@ -spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). +-spec(requeue/2 :: (queue_name(), [seq_id()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). -spec(to_ram_disk_mode/0 :: () -> 'ok'). @@ -262,6 +263,9 @@ tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSe tx_cancel(MsgIds) when is_list(MsgIds) -> gen_server:cast(?SERVER, {tx_cancel, MsgIds}). +requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) -> + gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}). + stop() -> gen_server:call(?SERVER, stop, infinity). @@ -305,7 +309,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> {type, set} ]), MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]), - true = ets:safe_fixtable(MsgLocationEts, true), State = #dqstate { msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts, @@ -381,6 +384,9 @@ handle_cast({tx_publish, MsgId, MsgBody}, State) -> {noreply, State1}; handle_cast({tx_cancel, MsgIds}, State) -> {ok, State1} = internal_tx_cancel(MsgIds, State), + {noreply, State1}; +handle_cast({requeue, Q, MsgSeqIds}, State) -> + {ok, State1} = internal_requeue(Q, MsgSeqIds, State), {noreply, State1}. handle_info(_Info, State) -> @@ -469,7 +475,7 @@ internal_deliver(Q, State = #dqstate { sequences = Sequences }) -> #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] -> [{MsgId, _RefCount, File, Offset, TotalSize}] = dets_ets_lookup(State, MsgId), - {FileHdl, State1} = getReadHandle(File, State), + {FileHdl, State1} = get_read_handle(File, State), %% read the message {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize), @@ -483,7 +489,7 @@ internal_deliver(Q, State = #dqstate { sequences = Sequences }) -> end end. -getReadHandle(File, State = +get_read_handle(File, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, read_file_handles_limit = ReadFileHandlesLimit }) -> Now = now(), @@ -653,6 +659,49 @@ internal_tx_cancel(MsgIds, State) -> MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)), remove_messages(undefined, MsgSeqIds, false, State). +internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> + %% We know that every seq_id in here is less than the ReadSeqId + %% you'll get if you look up this queue in Sequences (i.e. they've + %% already been delivered). We also know that the rows for these + %% messages are still in rabbit_disk_queue (i.e. they've not been + %% ack'd). + + %% Now, it would be nice if we could adjust the sequence ids in + %% rabbit_disk_queue (mnesia) to create a contiguous block and + %% then drop the ReadSeqId for the queue by the corresponding + %% amount. However, this is not safe because there may be other + %% sequence ids which have been sent out as part of deliveries + %% which are not being requeued. As such, moving things about in + %% rabbit_disk_queue _under_ the current ReadSeqId would result in + %% such sequence ids referring to the wrong messages. + + %% Therefore, the only solution is to take these messages, and to + %% reenqueue them at the top of the queue. Usefully, this only + %% affects the Sequences and rabbit_disk_queue structures - there + %% is no need to physically move the messages about on disk, so + %% MsgLocation and FileSummary stay put (which makes further sense + %% as they have no concept of sequence id anyway). + + %% the Q _must_ already exist + [{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q), + {atomic, WriteSeqId2} = + mnesia:transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foldl( + fun ({MsgId, SeqId}, NextWriteSeqId) -> + [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] = + mnesia:read(rabbit_disk_queue, {Q, SeqId}, write), + mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc { queue_and_seq_id = {Q, NextWriteSeqId }}, + write), + mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write), + NextWriteSeqId + 1 + end, WriteSeqId, MsgSeqIds) + end), + true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2}), + {ok, State}. + %% ---- ROLLING OVER THE APPEND FILE ---- maybe_roll_to_new_file(Offset, @@ -688,9 +737,9 @@ compact(FilesSet, State) -> RemainingFiles = lists:foldl(fun (File, Acc) -> delete_empty_files(File, Acc, State) end, [], Files), - lists:foldl(fun combineFile/2, State, lists:reverse(RemainingFiles)). + lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)). -combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit, +combine_file(File, State = #dqstate { file_size_limit = FileSizeLimit, file_summary = FileSummary, current_file_name = CurName }) -> @@ -711,7 +760,7 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit, if FileSizeLimit >= RightSumData -> %% here, Right will be the source and so will be deleted, %% File will be the destination - State1 = combineFiles(RightObj, FileObj, + State1 = combine_files(RightObj, FileObj, State), %% this could fail if RightRight is undefined %% left is the 4th field @@ -739,7 +788,7 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit, if FileSizeLimit >= LeftSumData -> %% here, File will be the source and so will be deleted, %% Left will be the destination - State1 = combineFiles(FileObj, LeftObj, State), + State1 = combine_files(FileObj, LeftObj, State), %% this could fail if Right is undefined %% left is the 4th field ets:update_element(FileSummary, Right, {4, Left}), @@ -754,7 +803,7 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit, end end. -sortMsgLocationsByOffset(Asc, List) -> +sort_msg_locations_by_offset(Asc, List) -> Comp = if Asc -> fun erlang:'<'/2; true -> fun erlang:'>'/2 end, @@ -762,7 +811,7 @@ sortMsgLocationsByOffset(Asc, List) -> Comp(OffA, OffB) end, List). -truncateAndExtendFile(FileHdl, Lowpoint, Highpoint) -> +truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}), ok = file:truncate(FileHdl), {ok, Highpoint} = file:position(FileHdl, {bof, Highpoint}), @@ -770,12 +819,12 @@ truncateAndExtendFile(FileHdl, Lowpoint, Highpoint) -> {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}), ok. -combineFiles({Source, SourceValid, _SourceContiguousTop, +combine_files({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRight}, {Destination, DestinationValid, DestinationContiguousTop, _DestinationLeft, _DestinationRight}, State1) -> - State = closeFile(Source, closeFile(Destination, State1)), + State = close_file(Source, close_file(Destination, State1)), {ok, SourceHdl} = file:open(form_filename(Source), [read, write, raw, binary, delayed_write, read_ahead]), @@ -788,7 +837,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, %% then truncate, copy back in, and then copy over from Source %% otherwise we just truncate straight away and copy over from Source if DestinationContiguousTop =:= DestinationValid -> - ok = truncateAndExtendFile(DestinationHdl, + ok = truncate_and_extend_file(DestinationHdl, DestinationValid, ExpectedSize); true -> Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, @@ -805,7 +854,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, Offset < DestinationContiguousTop %% Given expected access patterns, I suspect that the list should be %% naturally sorted as we require, however, we need to enforce it anyway - end, sortMsgLocationsByOffset(true, + end, sort_msg_locations_by_offset(true, dets_ets_match_object(State, {'_', '_', Destination, @@ -854,7 +903,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, %% reflect compaction of Destination so truncate %% Destination and copy from Tmp back to the end {ok, 0} = file:position(TmpHdl, {bof, 0}), - ok = truncateAndExtendFile(DestinationHdl, + ok = truncate_and_extend_file(DestinationHdl, DestinationContiguousTop, ExpectedSize), {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), %% position in DestinationHdl should now be @@ -864,7 +913,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, ok = file:delete(form_filename(Tmp)) end, SourceWorkList = - sortMsgLocationsByOffset(true, + sort_msg_locations_by_offset(true, dets_ets_match_object(State, {'_', '_', Source, '_', '_'})), @@ -909,7 +958,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, ok = file:delete(form_filename(Source)), State. -closeFile(File, State = #dqstate { read_file_handles = +close_file(File, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge} }) -> case dict:find(File, ReadHdls) of error -> @@ -981,8 +1030,6 @@ load_from_disk(State) -> {ok, State2}. extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> - %% next-seqid-to-read is the lowest seqid which has is_delivered = - %% false {atomic, true} = mnesia:transaction( fun() -> ok = mnesia:read_lock_table(rabbit_disk_queue), @@ -1003,8 +1050,50 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> end end, true, rabbit_disk_queue) end), + remove_gaps_in_sequences(State), State. +remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> + %% read the comments at internal_requeue. + + %% Because we are at startup, we know that no sequence ids have + %% been issued (or at least, they were, but have been + %% forgotten). Therefore, we can nicely shuffle up and not + %% worry. Note that I'm choosing to shuffle up, but alternatively + %% we could shuffle downwards. However, I think there's greater + %% likelihood of gaps being at the bottom rather than the top of + %% the queue, so shuffling up should be the better bet. + {atomic, _} = + mnesia:transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foreach( + fun ({Q, ReadSeqId, WriteSeqId}) -> + Gap = shuffle_up(Q, WriteSeqId - 1, WriteSeqId - ReadSeqId, 0), + true = ets:insert(Sequences, {Q, ReadSeqId + Gap, WriteSeqId}) + end, ets:match_object(Sequences, '_')) + end). + +shuffle_up(_Q, _SeqId, 0, Gap) -> + Gap; +shuffle_up(Q, SeqId, N, 0) -> + %% no gaps so far so don't need to rewrite + case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of + [] -> shuffle_up(Q, SeqId - 1, N - 1, 1); + _ -> shuffle_up(Q, SeqId - 1, N - 1, 0) + end; +shuffle_up(Q, SeqId, N, Gap) -> + %% have gaps, so whenever we find something, rewrite it higher up + case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of + [] -> shuffle_up(Q, SeqId - 1, N - 1, Gap + 1); + [Obj = #dq_msg_loc { is_delivered = true }] -> + mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap }}, + write), + mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write), + shuffle_up(Q, SeqId - 1, N - 1, Gap) + end. + load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, current_file_name = CurName }) -> true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}), @@ -1014,7 +1103,7 @@ load_messages(Left, [], State) -> Offset = case dets_ets_match_object(State, {'_', '_', Left, '_', '_'}) of [] -> 0; L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] = - sortMsgLocationsByOffset(false, L), + sort_msg_locations_by_offset(false, L), MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT end, State #dqstate { current_file_num = Num, current_file_name = Left, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2640439e9f..5924bb38fe 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -693,6 +693,8 @@ test_disk_queue() -> ], rdq_virgin(), passed = rdq_stress_gc(10000), + passed = rdq_test_startup_with_queue_gaps(), + passed = rdq_test_redeliver(), passed. rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> @@ -785,6 +787,82 @@ rdq_time_insane_startup() -> Micros = rdq_virgin(), io:format("...startup took ~w microseconds.~n", [Micros]). +rdq_test_startup_with_queue_gaps() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + Half = round(Total/2), + All = lists:seq(1,Total), + [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + rabbit_disk_queue:tx_commit(q, All, []), + io:format("Publish done~n", []), + %% deliver first half + Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end + || N <- lists:seq(1,Half)], + io:format("Deliver first half done~n", []), + %% ack every other message we have delivered (starting at the _first_) + lists:foldl(fun (SeqId2, true) -> + rabbit_disk_queue:ack(q, [SeqId2]), + false; + (_SeqId2, false) -> + true + end, true, Seqs), + rabbit_disk_queue:tx_commit(q, [], []), + io:format("Acked every other message delivered done~n", []), + rdq_stop(), + rdq_start(), + io:format("Startup (with shuffle) done~n", []), + %% should have shuffled up. So we should now get lists:seq(2,500,2) already delivered + Seqs2 = [begin {N, Msg, 256, true, SeqId} = rabbit_disk_queue:deliver(q), SeqId end + || N <- lists:seq(2,Half,2)], + rabbit_disk_queue:tx_commit(q, [], Seqs2), + io:format("Reread non-acked messages done~n", []), + %% and now fetch the rest + Seqs3 = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end + || N <- lists:seq(1 + Half,Total)], + rabbit_disk_queue:tx_commit(q, [], Seqs3), + io:format("Read second half done~n", []), + empty = rabbit_disk_queue:deliver(q), + rdq_stop(), + passed. + +rdq_test_redeliver() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + Half = round(Total/2), + All = lists:seq(1,Total), + [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + rabbit_disk_queue:tx_commit(q, All, []), + io:format("Publish done~n", []), + %% deliver first half + Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end + || N <- lists:seq(1,Half)], + io:format("Deliver first half done~n", []), + %% now requeue every other message (starting at the _first_) + %% and ack the other ones + lists:foldl(fun (SeqId2, true) -> + rabbit_disk_queue:requeue(q, [SeqId2]), + false; + (SeqId2, false) -> + rabbit_disk_queue:ack(q, [SeqId2]), + true + end, true, Seqs), + rabbit_disk_queue:tx_commit(q, [], []), + io:format("Redeliver and acking done~n", []), + %% we should now get the 2nd half in order, followed by every-other-from-the-first-half + Seqs2 = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end + || N <- lists:seq(1+Half, Total)], + rabbit_disk_queue:tx_commit(q, [], Seqs2), + Seqs3 = [begin {N, Msg, 256, true, SeqId} = rabbit_disk_queue:deliver(q), SeqId end + || N <- lists:seq(1, Half, 2)], + rabbit_disk_queue:tx_commit(q, [], Seqs3), + empty = rabbit_disk_queue:deliver(q), + rdq_stop(), + passed. + rdq_time_commands(Funcs) -> lists:foreach(fun (F) -> F() end, Funcs). |
