diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-14 15:52:13 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-14 15:52:13 +0100 |
| commit | ede34cf5cfd141842babcd43f0c635614b2698a0 (patch) | |
| tree | 9654a4b76d31aa1e37634ed224deb18d3eac6e77 | |
| parent | 885575462f77893da94faccdaa8c490758d5bde2 (diff) | |
| download | rabbitmq-server-git-ede34cf5cfd141842babcd43f0c635614b2698a0.tar.gz | |
Some cosmetic changes (erlang style function names) and improvements to documentation.
Also, implemented requeue, and corrected a bug in startup which would have led to crashes if acks had appeared non-contiguously prior to shutdown.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 145 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 78 |
2 files changed, 195 insertions, 28 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index c08258c846..63076eb914 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1]). +-export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]). -export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]). @@ -181,21 +181,21 @@ %% the data the size of which is tracked by the ContiguousTop %% variable. Judicious use of a mirror is required). %% -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% | X | | G | | G | -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% | D | | X | | F | -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% | X | | X | | E | -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% | C | | F | ===> | D | -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% | X | | X | | C | -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% | B | | X | | B | -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% | A | | E | | A | -%% +-------+ +-------- --------- +%% +-------+ +-------+ +-------+ %% left right left %% %% From this reasoning, we do have a bound on the number of times the @@ -231,6 +231,7 @@ -spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). +-spec(requeue/2 :: (queue_name(), [seq_id()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). -spec(to_ram_disk_mode/0 :: () -> 'ok'). 
@@ -262,6 +263,9 @@ tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSe tx_cancel(MsgIds) when is_list(MsgIds) -> gen_server:cast(?SERVER, {tx_cancel, MsgIds}). +requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) -> + gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}). + stop() -> gen_server:call(?SERVER, stop, infinity). @@ -305,7 +309,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> {type, set} ]), MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]), - true = ets:safe_fixtable(MsgLocationEts, true), State = #dqstate { msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts, @@ -381,6 +384,9 @@ handle_cast({tx_publish, MsgId, MsgBody}, State) -> {noreply, State1}; handle_cast({tx_cancel, MsgIds}, State) -> {ok, State1} = internal_tx_cancel(MsgIds, State), + {noreply, State1}; +handle_cast({requeue, Q, MsgSeqIds}, State) -> + {ok, State1} = internal_requeue(Q, MsgSeqIds, State), {noreply, State1}. handle_info(_Info, State) -> @@ -469,7 +475,7 @@ internal_deliver(Q, State = #dqstate { sequences = Sequences }) -> #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] -> [{MsgId, _RefCount, File, Offset, TotalSize}] = dets_ets_lookup(State, MsgId), - {FileHdl, State1} = getReadHandle(File, State), + {FileHdl, State1} = get_read_handle(File, State), %% read the message {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize), @@ -483,7 +489,7 @@ internal_deliver(Q, State = #dqstate { sequences = Sequences }) -> end end. -getReadHandle(File, State = +get_read_handle(File, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, read_file_handles_limit = ReadFileHandlesLimit }) -> Now = now(), @@ -653,6 +659,49 @@ internal_tx_cancel(MsgIds, State) -> MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)), remove_messages(undefined, MsgSeqIds, false, State). 
+internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> + %% We know that every seq_id in here is less than the ReadSeqId + %% you'll get if you look up this queue in Sequences (i.e. they've + %% already been delivered). We also know that the rows for these + %% messages are still in rabbit_disk_queue (i.e. they've not been + %% ack'd). + + %% Now, it would be nice if we could adjust the sequence ids in + %% rabbit_disk_queue (mnesia) to create a contiguous block and + %% then drop the ReadSeqId for the queue by the corresponding + %% amount. However, this is not safe because there may be other + %% sequence ids which have been sent out as part of deliveries + %% which are not being requeued. As such, moving things about in + %% rabbit_disk_queue _under_ the current ReadSeqId would result in + %% such sequence ids referring to the wrong messages. + + %% Therefore, the only solution is to take these messages, and to + %% reenqueue them at the top of the queue. Usefully, this only + %% affects the Sequences and rabbit_disk_queue structures - there + %% is no need to physically move the messages about on disk, so + %% MsgLocation and FileSummary stay put (which makes further sense + %% as they have no concept of sequence id anyway). + + %% the Q _must_ already exist + [{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q), + {atomic, WriteSeqId2} = + mnesia:transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foldl( + fun ({MsgId, SeqId}, NextWriteSeqId) -> + [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] = + mnesia:read(rabbit_disk_queue, {Q, SeqId}, write), + mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc { queue_and_seq_id = {Q, NextWriteSeqId }}, + write), + mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write), + NextWriteSeqId + 1 + end, WriteSeqId, MsgSeqIds) + end), + true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2}), + {ok, State}. 
+ %% ---- ROLLING OVER THE APPEND FILE ---- maybe_roll_to_new_file(Offset, @@ -688,9 +737,9 @@ compact(FilesSet, State) -> RemainingFiles = lists:foldl(fun (File, Acc) -> delete_empty_files(File, Acc, State) end, [], Files), - lists:foldl(fun combineFile/2, State, lists:reverse(RemainingFiles)). + lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)). -combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit, +combine_file(File, State = #dqstate { file_size_limit = FileSizeLimit, file_summary = FileSummary, current_file_name = CurName }) -> @@ -711,7 +760,7 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit, if FileSizeLimit >= RightSumData -> %% here, Right will be the source and so will be deleted, %% File will be the destination - State1 = combineFiles(RightObj, FileObj, + State1 = combine_files(RightObj, FileObj, State), %% this could fail if RightRight is undefined %% left is the 4th field @@ -739,7 +788,7 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit, if FileSizeLimit >= LeftSumData -> %% here, File will be the source and so will be deleted, %% Left will be the destination - State1 = combineFiles(FileObj, LeftObj, State), + State1 = combine_files(FileObj, LeftObj, State), %% this could fail if Right is undefined %% left is the 4th field ets:update_element(FileSummary, Right, {4, Left}), @@ -754,7 +803,7 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit, end end. -sortMsgLocationsByOffset(Asc, List) -> +sort_msg_locations_by_offset(Asc, List) -> Comp = if Asc -> fun erlang:'<'/2; true -> fun erlang:'>'/2 end, @@ -762,7 +811,7 @@ sortMsgLocationsByOffset(Asc, List) -> Comp(OffA, OffB) end, List). 
-truncateAndExtendFile(FileHdl, Lowpoint, Highpoint) -> +truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}), ok = file:truncate(FileHdl), {ok, Highpoint} = file:position(FileHdl, {bof, Highpoint}), @@ -770,12 +819,12 @@ truncateAndExtendFile(FileHdl, Lowpoint, Highpoint) -> {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}), ok. -combineFiles({Source, SourceValid, _SourceContiguousTop, +combine_files({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRight}, {Destination, DestinationValid, DestinationContiguousTop, _DestinationLeft, _DestinationRight}, State1) -> - State = closeFile(Source, closeFile(Destination, State1)), + State = close_file(Source, close_file(Destination, State1)), {ok, SourceHdl} = file:open(form_filename(Source), [read, write, raw, binary, delayed_write, read_ahead]), @@ -788,7 +837,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, %% then truncate, copy back in, and then copy over from Source %% otherwise we just truncate straight away and copy over from Source if DestinationContiguousTop =:= DestinationValid -> - ok = truncateAndExtendFile(DestinationHdl, + ok = truncate_and_extend_file(DestinationHdl, DestinationValid, ExpectedSize); true -> Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, @@ -805,7 +854,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, Offset < DestinationContiguousTop %% Given expected access patterns, I suspect that the list should be %% naturally sorted as we require, however, we need to enforce it anyway - end, sortMsgLocationsByOffset(true, + end, sort_msg_locations_by_offset(true, dets_ets_match_object(State, {'_', '_', Destination, @@ -854,7 +903,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, %% reflect compaction of Destination so truncate %% Destination and copy from Tmp back to the end {ok, 0} = file:position(TmpHdl, {bof, 0}), - ok = truncateAndExtendFile(DestinationHdl, + ok = 
truncate_and_extend_file(DestinationHdl, DestinationContiguousTop, ExpectedSize), {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), %% position in DestinationHdl should now be @@ -864,7 +913,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, ok = file:delete(form_filename(Tmp)) end, SourceWorkList = - sortMsgLocationsByOffset(true, + sort_msg_locations_by_offset(true, dets_ets_match_object(State, {'_', '_', Source, '_', '_'})), @@ -909,7 +958,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, ok = file:delete(form_filename(Source)), State. -closeFile(File, State = #dqstate { read_file_handles = +close_file(File, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge} }) -> case dict:find(File, ReadHdls) of error -> @@ -981,8 +1030,6 @@ load_from_disk(State) -> {ok, State2}. extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> - %% next-seqid-to-read is the lowest seqid which has is_delivered = - %% false {atomic, true} = mnesia:transaction( fun() -> ok = mnesia:read_lock_table(rabbit_disk_queue), @@ -1003,8 +1050,50 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> end end, true, rabbit_disk_queue) end), + remove_gaps_in_sequences(State), State. +remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> + %% read the comments at internal_requeue. + + %% Because we are at startup, we know that no sequence ids have + %% been issued (or at least, they were, but have been + %% forgotten). Therefore, we can nicely shuffle up and not + %% worry. Note that I'm choosing to shuffle up, but alternatively + %% we could shuffle downwards. However, I think there's greater + %% likelihood of gaps being at the bottom rather than the top of + %% the queue, so shuffling up should be the better bet. 
+ {atomic, _} = + mnesia:transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foreach( + fun ({Q, ReadSeqId, WriteSeqId}) -> + Gap = shuffle_up(Q, WriteSeqId - 1, WriteSeqId - ReadSeqId, 0), + true = ets:insert(Sequences, {Q, ReadSeqId + Gap, WriteSeqId}) + end, ets:match_object(Sequences, '_')) + end). + +shuffle_up(_Q, _SeqId, 0, Gap) -> + Gap; +shuffle_up(Q, SeqId, N, 0) -> + %% no gaps so far so don't need to rewrite + case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of + [] -> shuffle_up(Q, SeqId - 1, N - 1, 1); + _ -> shuffle_up(Q, SeqId - 1, N - 1, 0) + end; +shuffle_up(Q, SeqId, N, Gap) -> + %% have gaps, so whenever we find something, rewrite it higher up + case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of + [] -> shuffle_up(Q, SeqId - 1, N - 1, Gap + 1); + [Obj = #dq_msg_loc { is_delivered = true }] -> + mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap }}, + write), + mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write), + shuffle_up(Q, SeqId - 1, N - 1, Gap) + end. + load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, current_file_name = CurName }) -> true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}), @@ -1014,7 +1103,7 @@ load_messages(Left, [], State) -> Offset = case dets_ets_match_object(State, {'_', '_', Left, '_', '_'}) of [] -> 0; L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] = - sortMsgLocationsByOffset(false, L), + sort_msg_locations_by_offset(false, L), MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT end, State #dqstate { current_file_num = Num, current_file_name = Left, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2640439e9f..5924bb38fe 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -693,6 +693,8 @@ test_disk_queue() -> ], rdq_virgin(), passed = rdq_stress_gc(10000), + passed = rdq_test_startup_with_queue_gaps(), + passed = rdq_test_redeliver(), passed. 
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> @@ -785,6 +787,82 @@ rdq_time_insane_startup() -> Micros = rdq_virgin(), io:format("...startup took ~w microseconds.~n", [Micros]). +rdq_test_startup_with_queue_gaps() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + Half = round(Total/2), + All = lists:seq(1,Total), + [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + rabbit_disk_queue:tx_commit(q, All, []), + io:format("Publish done~n", []), + %% deliver first half + Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end + || N <- lists:seq(1,Half)], + io:format("Deliver first half done~n", []), + %% ack every other message we have delivered (starting at the _first_) + lists:foldl(fun (SeqId2, true) -> + rabbit_disk_queue:ack(q, [SeqId2]), + false; + (_SeqId2, false) -> + true + end, true, Seqs), + rabbit_disk_queue:tx_commit(q, [], []), + io:format("Acked every other message delivered done~n", []), + rdq_stop(), + rdq_start(), + io:format("Startup (with shuffle) done~n", []), + %% should have shuffled up. So we should now get lists:seq(2,500,2) already delivered + Seqs2 = [begin {N, Msg, 256, true, SeqId} = rabbit_disk_queue:deliver(q), SeqId end + || N <- lists:seq(2,Half,2)], + rabbit_disk_queue:tx_commit(q, [], Seqs2), + io:format("Reread non-acked messages done~n", []), + %% and now fetch the rest + Seqs3 = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end + || N <- lists:seq(1 + Half,Total)], + rabbit_disk_queue:tx_commit(q, [], Seqs3), + io:format("Read second half done~n", []), + empty = rabbit_disk_queue:deliver(q), + rdq_stop(), + passed. 
+ +rdq_test_redeliver() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + Half = round(Total/2), + All = lists:seq(1,Total), + [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + rabbit_disk_queue:tx_commit(q, All, []), + io:format("Publish done~n", []), + %% deliver first half + Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end + || N <- lists:seq(1,Half)], + io:format("Deliver first half done~n", []), + %% now requeue every other message (starting at the _first_) + %% and ack the other ones + lists:foldl(fun (SeqId2, true) -> + rabbit_disk_queue:requeue(q, [SeqId2]), + false; + (SeqId2, false) -> + rabbit_disk_queue:ack(q, [SeqId2]), + true + end, true, Seqs), + rabbit_disk_queue:tx_commit(q, [], []), + io:format("Redeliver and acking done~n", []), + %% we should now get the 2nd half in order, followed by every-other-from-the-first-half + Seqs2 = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end + || N <- lists:seq(1+Half, Total)], + rabbit_disk_queue:tx_commit(q, [], Seqs2), + Seqs3 = [begin {N, Msg, 256, true, SeqId} = rabbit_disk_queue:deliver(q), SeqId end + || N <- lists:seq(1, Half, 2)], + rabbit_disk_queue:tx_commit(q, [], Seqs3), + empty = rabbit_disk_queue:deliver(q), + rdq_stop(), + passed. + rdq_time_commands(Funcs) -> lists:foreach(fun (F) -> F() end, Funcs). |
