summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl145
-rw-r--r--src/rabbit_tests.erl78
2 files changed, 195 insertions, 28 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index c08258c846..63076eb914 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1]).
+-export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]).
-export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]).
@@ -181,21 +181,21 @@
%% the data the size of which is tracked by the ContiguousTop
%% variable. Judicious use of a mirror is required).
%%
-%% +-------+ +-------- ---------
+%% +-------+ +-------+ +-------+
%% | X | | G | | G |
-%% +-------+ +-------- ---------
+%% +-------+ +-------+ +-------+
%% | D | | X | | F |
-%% +-------+ +-------- ---------
+%% +-------+ +-------+ +-------+
%% | X | | X | | E |
-%% +-------+ +-------- ---------
+%% +-------+ +-------+ +-------+
%% | C | | F | ===> | D |
-%% +-------+ +-------- ---------
+%% +-------+ +-------+ +-------+
%% | X | | X | | C |
-%% +-------+ +-------- ---------
+%% +-------+ +-------+ +-------+
%% | B | | X | | B |
-%% +-------+ +-------- ---------
+%% +-------+ +-------+ +-------+
%% | A | | E | | A |
-%% +-------+ +-------- ---------
+%% +-------+ +-------+ +-------+
%% left right left
%%
%% From this reasoning, we do have a bound on the number of times the
@@ -231,6 +231,7 @@
-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
-spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok').
-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
+-spec(requeue/2 :: (queue_name(), [seq_id()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
-spec(to_ram_disk_mode/0 :: () -> 'ok').
@@ -262,6 +263,9 @@ tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSe
tx_cancel(MsgIds) when is_list(MsgIds) ->
gen_server:cast(?SERVER, {tx_cancel, MsgIds}).
+requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
+ gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}).
+
stop() ->
gen_server:call(?SERVER, stop, infinity).
@@ -305,7 +309,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
{type, set}
]),
MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]),
- true = ets:safe_fixtable(MsgLocationEts, true),
State =
#dqstate { msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts,
@@ -381,6 +384,9 @@ handle_cast({tx_publish, MsgId, MsgBody}, State) ->
{noreply, State1};
handle_cast({tx_cancel, MsgIds}, State) ->
{ok, State1} = internal_tx_cancel(MsgIds, State),
+ {noreply, State1};
+handle_cast({requeue, Q, MsgSeqIds}, State) ->
+ {ok, State1} = internal_requeue(Q, MsgSeqIds, State),
{noreply, State1}.
handle_info(_Info, State) ->
@@ -469,7 +475,7 @@ internal_deliver(Q, State = #dqstate { sequences = Sequences }) ->
#dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] ->
[{MsgId, _RefCount, File, Offset, TotalSize}] =
dets_ets_lookup(State, MsgId),
- {FileHdl, State1} = getReadHandle(File, State),
+ {FileHdl, State1} = get_read_handle(File, State),
%% read the message
{ok, {MsgBody, BodySize}} =
read_message_at_offset(FileHdl, Offset, TotalSize),
@@ -483,7 +489,7 @@ internal_deliver(Q, State = #dqstate { sequences = Sequences }) ->
end
end.
-getReadHandle(File, State =
+get_read_handle(File, State =
#dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
read_file_handles_limit = ReadFileHandlesLimit }) ->
Now = now(),
@@ -653,6 +659,49 @@ internal_tx_cancel(MsgIds, State) ->
MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
remove_messages(undefined, MsgSeqIds, false, State).
+internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) ->
+ %% We know that every seq_id in here is less than the ReadSeqId
+ %% you'll get if you look up this queue in Sequences (i.e. they've
+ %% already been delivered). We also know that the rows for these
+ %% messages are still in rabbit_disk_queue (i.e. they've not been
+ %% ack'd).
+
+ %% Now, it would be nice if we could adjust the sequence ids in
+ %% rabbit_disk_queue (mnesia) to create a contiguous block and
+ %% then drop the ReadSeqId for the queue by the corresponding
+ %% amount. However, this is not safe because there may be other
+ %% sequence ids which have been sent out as part of deliveries
+ %% which are not being requeued. As such, moving things about in
+ %% rabbit_disk_queue _under_ the current ReadSeqId would result in
+ %% such sequence ids referring to the wrong messages.
+
+ %% Therefore, the only solution is to take these messages, and to
+ %% reenqueue them at the top of the queue. Usefully, this only
+ %% affects the Sequences and rabbit_disk_queue structures - there
+ %% is no need to physically move the messages about on disk, so
+ %% MsgLocation and FileSummary stay put (which makes further sense
+ %% as they have no concept of sequence id anyway).
+
+ %% the Q _must_ already exist
+ [{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q),
+ {atomic, WriteSeqId2} =
+ mnesia:transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ lists:foldl(
+ fun ({MsgId, SeqId}, NextWriteSeqId) ->
+ [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] =
+ mnesia:read(rabbit_disk_queue, {Q, SeqId}, write),
+ mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc { queue_and_seq_id = {Q, NextWriteSeqId }},
+ write),
+ mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write),
+ NextWriteSeqId + 1
+ end, WriteSeqId, MsgSeqIds)
+ end),
+ true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2}),
+ {ok, State}.
+
%% ---- ROLLING OVER THE APPEND FILE ----
maybe_roll_to_new_file(Offset,
@@ -688,9 +737,9 @@ compact(FilesSet, State) ->
RemainingFiles = lists:foldl(fun (File, Acc) ->
delete_empty_files(File, Acc, State)
end, [], Files),
- lists:foldl(fun combineFile/2, State, lists:reverse(RemainingFiles)).
+ lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)).
-combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit,
+combine_file(File, State = #dqstate { file_size_limit = FileSizeLimit,
file_summary = FileSummary,
current_file_name = CurName
}) ->
@@ -711,7 +760,7 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit,
if FileSizeLimit >= RightSumData ->
%% here, Right will be the source and so will be deleted,
%% File will be the destination
- State1 = combineFiles(RightObj, FileObj,
+ State1 = combine_files(RightObj, FileObj,
State),
%% this could fail if RightRight is undefined
%% left is the 4th field
@@ -739,7 +788,7 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit,
if FileSizeLimit >= LeftSumData ->
%% here, File will be the source and so will be deleted,
%% Left will be the destination
- State1 = combineFiles(FileObj, LeftObj, State),
+ State1 = combine_files(FileObj, LeftObj, State),
%% this could fail if Right is undefined
%% left is the 4th field
ets:update_element(FileSummary, Right, {4, Left}),
@@ -754,7 +803,7 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit,
end
end.
-sortMsgLocationsByOffset(Asc, List) ->
+sort_msg_locations_by_offset(Asc, List) ->
Comp = if Asc -> fun erlang:'<'/2;
true -> fun erlang:'>'/2
end,
@@ -762,7 +811,7 @@ sortMsgLocationsByOffset(Asc, List) ->
Comp(OffA, OffB)
end, List).
-truncateAndExtendFile(FileHdl, Lowpoint, Highpoint) ->
+truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
{ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}),
ok = file:truncate(FileHdl),
{ok, Highpoint} = file:position(FileHdl, {bof, Highpoint}),
@@ -770,12 +819,12 @@ truncateAndExtendFile(FileHdl, Lowpoint, Highpoint) ->
{ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}),
ok.
-combineFiles({Source, SourceValid, _SourceContiguousTop,
+combine_files({Source, SourceValid, _SourceContiguousTop,
_SourceLeft, _SourceRight},
{Destination, DestinationValid, DestinationContiguousTop,
_DestinationLeft, _DestinationRight},
State1) ->
- State = closeFile(Source, closeFile(Destination, State1)),
+ State = close_file(Source, close_file(Destination, State1)),
{ok, SourceHdl} =
file:open(form_filename(Source),
[read, write, raw, binary, delayed_write, read_ahead]),
@@ -788,7 +837,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
%% then truncate, copy back in, and then copy over from Source
%% otherwise we just truncate straight away and copy over from Source
if DestinationContiguousTop =:= DestinationValid ->
- ok = truncateAndExtendFile(DestinationHdl,
+ ok = truncate_and_extend_file(DestinationHdl,
DestinationValid, ExpectedSize);
true ->
Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
@@ -805,7 +854,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
Offset < DestinationContiguousTop
%% Given expected access patterns, I suspect that the list should be
%% naturally sorted as we require, however, we need to enforce it anyway
- end, sortMsgLocationsByOffset(true,
+ end, sort_msg_locations_by_offset(true,
dets_ets_match_object(State,
{'_', '_',
Destination,
@@ -854,7 +903,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
%% reflect compaction of Destination so truncate
%% Destination and copy from Tmp back to the end
{ok, 0} = file:position(TmpHdl, {bof, 0}),
- ok = truncateAndExtendFile(DestinationHdl,
+ ok = truncate_and_extend_file(DestinationHdl,
DestinationContiguousTop, ExpectedSize),
{ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
%% position in DestinationHdl should now be
@@ -864,7 +913,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
ok = file:delete(form_filename(Tmp))
end,
SourceWorkList =
- sortMsgLocationsByOffset(true,
+ sort_msg_locations_by_offset(true,
dets_ets_match_object(State,
{'_', '_', Source,
'_', '_'})),
@@ -909,7 +958,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
ok = file:delete(form_filename(Source)),
State.
-closeFile(File, State = #dqstate { read_file_handles =
+close_file(File, State = #dqstate { read_file_handles =
{ReadHdls, ReadHdlsAge} }) ->
case dict:find(File, ReadHdls) of
error ->
@@ -981,8 +1030,6 @@ load_from_disk(State) ->
{ok, State2}.
extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
- %% next-seqid-to-read is the lowest seqid which has is_delivered =
- %% false
{atomic, true} = mnesia:transaction(
fun() ->
ok = mnesia:read_lock_table(rabbit_disk_queue),
@@ -1003,8 +1050,50 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
end
end, true, rabbit_disk_queue)
end),
+ remove_gaps_in_sequences(State),
State.
+remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
+ %% read the comments at internal_requeue.
+
+ %% Because we are at startup, we know that no sequence ids have
+ %% been issued (or at least, they were, but have been
+ %% forgotten). Therefore, we can nicely shuffle up and not
+ %% worry. Note that I'm choosing to shuffle up, but alternatively
+ %% we could shuffle downwards. However, I think there's greater
+ %% likelihood of gaps being at the bottom rather than the top of
+ %% the queue, so shuffling up should be the better bet.
+ {atomic, _} =
+ mnesia:transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ lists:foreach(
+ fun ({Q, ReadSeqId, WriteSeqId}) ->
+ Gap = shuffle_up(Q, WriteSeqId - 1, WriteSeqId - ReadSeqId, 0),
+ true = ets:insert(Sequences, {Q, ReadSeqId + Gap, WriteSeqId})
+ end, ets:match_object(Sequences, '_'))
+ end).
+
+shuffle_up(_Q, _SeqId, 0, Gap) ->
+ Gap;
+shuffle_up(Q, SeqId, N, 0) ->
+ %% no gaps so far so don't need to rewrite
+ case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of
+ [] -> shuffle_up(Q, SeqId - 1, N - 1, 1);
+ _ -> shuffle_up(Q, SeqId - 1, N - 1, 0)
+ end;
+shuffle_up(Q, SeqId, N, Gap) ->
+ %% have gaps, so whenever we find something, rewrite it higher up
+ case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of
+ [] -> shuffle_up(Q, SeqId - 1, N - 1, Gap + 1);
+ [Obj = #dq_msg_loc { is_delivered = true }] ->
+ mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap }},
+ write),
+ mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write),
+ shuffle_up(Q, SeqId - 1, N - 1, Gap)
+ end.
+
load_messages(undefined, [], State = #dqstate { file_summary = FileSummary,
current_file_name = CurName }) ->
true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}),
@@ -1014,7 +1103,7 @@ load_messages(Left, [], State) ->
Offset = case dets_ets_match_object(State, {'_', '_', Left, '_', '_'}) of
[] -> 0;
L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] =
- sortMsgLocationsByOffset(false, L),
+ sort_msg_locations_by_offset(false, L),
MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
end,
State #dqstate { current_file_num = Num, current_file_name = Left,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2640439e9f..5924bb38fe 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -693,6 +693,8 @@ test_disk_queue() ->
],
rdq_virgin(),
passed = rdq_stress_gc(10000),
+ passed = rdq_test_startup_with_queue_gaps(),
+ passed = rdq_test_redeliver(),
passed.
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
@@ -785,6 +787,82 @@ rdq_time_insane_startup() ->
Micros = rdq_virgin(),
io:format("...startup took ~w microseconds.~n", [Micros]).
+rdq_test_startup_with_queue_gaps() ->
+ rdq_virgin(),
+ rdq_start(),
+ Msg = <<0:(8*256)>>,
+ Total = 1000,
+ Half = round(Total/2),
+ All = lists:seq(1,Total),
+ [rabbit_disk_queue:tx_publish(N, Msg) || N <- All],
+ rabbit_disk_queue:tx_commit(q, All, []),
+ io:format("Publish done~n", []),
+ %% deliver first half
+ Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
+ || N <- lists:seq(1,Half)],
+ io:format("Deliver first half done~n", []),
+ %% ack every other message we have delivered (starting at the _first_)
+ lists:foldl(fun (SeqId2, true) ->
+ rabbit_disk_queue:ack(q, [SeqId2]),
+ false;
+ (_SeqId2, false) ->
+ true
+ end, true, Seqs),
+ rabbit_disk_queue:tx_commit(q, [], []),
+ io:format("Acked every other message delivered done~n", []),
+ rdq_stop(),
+ rdq_start(),
+ io:format("Startup (with shuffle) done~n", []),
+ %% should have shuffled up. So we should now get lists:seq(2,500,2) already delivered
+ Seqs2 = [begin {N, Msg, 256, true, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
+ || N <- lists:seq(2,Half,2)],
+ rabbit_disk_queue:tx_commit(q, [], Seqs2),
+ io:format("Reread non-acked messages done~n", []),
+ %% and now fetch the rest
+ Seqs3 = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
+ || N <- lists:seq(1 + Half,Total)],
+ rabbit_disk_queue:tx_commit(q, [], Seqs3),
+ io:format("Read second half done~n", []),
+ empty = rabbit_disk_queue:deliver(q),
+ rdq_stop(),
+ passed.
+
+rdq_test_redeliver() ->
+ rdq_virgin(),
+ rdq_start(),
+ Msg = <<0:(8*256)>>,
+ Total = 1000,
+ Half = round(Total/2),
+ All = lists:seq(1,Total),
+ [rabbit_disk_queue:tx_publish(N, Msg) || N <- All],
+ rabbit_disk_queue:tx_commit(q, All, []),
+ io:format("Publish done~n", []),
+ %% deliver first half
+ Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
+ || N <- lists:seq(1,Half)],
+ io:format("Deliver first half done~n", []),
+ %% now requeue every other message (starting at the _first_)
+ %% and ack the other ones
+ lists:foldl(fun (SeqId2, true) ->
+ rabbit_disk_queue:requeue(q, [SeqId2]),
+ false;
+ (SeqId2, false) ->
+ rabbit_disk_queue:ack(q, [SeqId2]),
+ true
+ end, true, Seqs),
+ rabbit_disk_queue:tx_commit(q, [], []),
+ io:format("Redeliver and acking done~n", []),
+ %% we should now get the 2nd half in order, followed by every-other-from-the-first-half
+ Seqs2 = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
+ || N <- lists:seq(1+Half, Total)],
+ rabbit_disk_queue:tx_commit(q, [], Seqs2),
+ Seqs3 = [begin {N, Msg, 256, true, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
+ || N <- lists:seq(1, Half, 2)],
+ rabbit_disk_queue:tx_commit(q, [], Seqs3),
+ empty = rabbit_disk_queue:deliver(q),
+ rdq_stop(),
+ passed.
+
rdq_time_commands(Funcs) ->
lists:foreach(fun (F) -> F() end, Funcs).