summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl36
1 files changed, 17 insertions, 19 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 63076eb914..e3b47e89dc 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -1069,30 +1069,28 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
lists:foreach(
fun ({Q, ReadSeqId, WriteSeqId}) ->
- Gap = shuffle_up(Q, WriteSeqId - 1, WriteSeqId - ReadSeqId, 0),
+ Gap = shuffle_up(Q, ReadSeqId - 1, WriteSeqId - 1, 0),
true = ets:insert(Sequences, {Q, ReadSeqId + Gap, WriteSeqId})
end, ets:match_object(Sequences, '_'))
end).
-shuffle_up(_Q, _SeqId, 0, Gap) ->
+shuffle_up(_Q, SeqId, SeqId, Gap) ->
Gap;
-shuffle_up(Q, SeqId, N, 0) ->
- %% no gaps so far so don't need to rewrite
- case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of
- [] -> shuffle_up(Q, SeqId - 1, N - 1, 1);
- _ -> shuffle_up(Q, SeqId - 1, N - 1, 0)
- end;
-shuffle_up(Q, SeqId, N, Gap) ->
- %% have gaps, so whenever we find something, rewrite it higher up
- case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of
- [] -> shuffle_up(Q, SeqId - 1, N - 1, Gap + 1);
- [Obj = #dq_msg_loc { is_delivered = true }] ->
- mnesia:write(rabbit_disk_queue,
- Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap }},
- write),
- mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write),
- shuffle_up(Q, SeqId - 1, N - 1, Gap)
- end.
+shuffle_up(Q, BaseSeqId, SeqId, Gap) ->
+ GapInc =
+ case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of
+ [] -> 1;
+ [Obj = #dq_msg_loc { is_delivered = IsDelivered }] when IsDelivered
+ orelse (Gap =:= 0) ->
+ if Gap =:= 0 -> ok;
+ true -> mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap }},
+ write),
+ mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write)
+ end,
+ 0
+ end,
+ shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc).
load_messages(undefined, [], State = #dqstate { file_summary = FileSummary,
current_file_name = CurName }) ->