summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl51
1 files changed, 20 insertions, 31 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 96889dbdf4..763e544d7e 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -845,10 +845,9 @@ insert_into_cache(Message = #basic_message { guid = MsgId },
internal_deliver(Q, ReadMsg, FakeDeliver,
State = #dqstate { sequences = Sequences }) ->
- case ets:lookup(Sequences, Q) of
- [] -> {ok, empty, State};
- [{Q, SeqId, SeqId}] -> {ok, empty, State};
- [{Q, ReadSeqId, WriteSeqId}] when WriteSeqId >= ReadSeqId ->
+ case sequence_lookup(Sequences, Q) of
+ {SeqId, SeqId} -> {ok, empty, State};
+ {ReadSeqId, WriteSeqId} when WriteSeqId >= ReadSeqId ->
Remaining = WriteSeqId - ReadSeqId - 1,
{ok, Result, State1} =
internal_read_message(
@@ -1142,9 +1141,9 @@ requeue_next_messages(Q, N, ReadSeq, WriteSeq) ->
requeue_next_messages(Q, N - 1, ReadSeq + 1, WriteSeq + 1).
internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
- case ets:lookup(Sequences, Q) of
- [] -> {ok, 0, State};
- [{Q, ReadSeqId, WriteSeqId}] ->
+ case sequence_lookup(Sequences, Q) of
+ {SeqId, SeqId} -> {ok, 0, State};
+ {ReadSeqId, WriteSeqId} ->
{atomic, {ok, State1}} =
mnesia:transaction(
fun() ->
@@ -1518,26 +1517,17 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
mnesia:foldl(
fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) ->
NextWrite = SeqId + 1,
- true =
- case ets:lookup(Sequences, Q) of
- [] -> ets:insert_new(Sequences,
- {Q, SeqId, NextWrite, -1});
- [Orig = {Q, Read, Write, Length}] ->
- Repl = {Q, lists:min([Read, SeqId]),
- lists:max([Write, NextWrite]),
- %% Length is wrong here,
- %% but it doesn't matter
- %% because we'll pull out
- %% the gaps in
- %% remove_gaps_in_sequences
- %% in then do a straight
- %% subtraction to get the
- %% right length
- Length},
- if Orig =:= Repl -> true;
- true -> ets:insert(Sequences, Repl)
- end
- end
+ case ets:lookup(Sequences, Q) of
+ [] -> ets:insert_new(Sequences,
+ {Q, SeqId, NextWrite});
+ [Orig = {Q, Read, Write}] ->
+ Repl = {Q, lists:min([Read, SeqId]),
+ lists:max([Write, NextWrite])},
+ case Orig == Repl of
+ true -> true;
+ false -> ets:insert(Sequences, Repl)
+ end
+ end
end, true, rabbit_disk_queue)
end),
remove_gaps_in_sequences(State),
@@ -1558,12 +1548,11 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
lists:foreach(
- fun ({Q, ReadSeqId, WriteSeqId, _Length}) ->
+ fun ({Q, ReadSeqId, WriteSeqId}) ->
Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0),
ReadSeqId1 = ReadSeqId + Gap,
- true =
- ets:insert(Sequences,
- {Q, ReadSeqId1, WriteSeqId})
+ true = ets:insert(Sequences,
+ {Q, ReadSeqId1, WriteSeqId})
end, ets:match_object(Sequences, '_'))
end).