diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 51 |
1 files changed, 20 insertions, 31 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 96889dbdf4..763e544d7e 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -845,10 +845,9 @@ insert_into_cache(Message = #basic_message { guid = MsgId }, internal_deliver(Q, ReadMsg, FakeDeliver, State = #dqstate { sequences = Sequences }) -> - case ets:lookup(Sequences, Q) of - [] -> {ok, empty, State}; - [{Q, SeqId, SeqId}] -> {ok, empty, State}; - [{Q, ReadSeqId, WriteSeqId}] when WriteSeqId >= ReadSeqId -> + case sequence_lookup(Sequences, Q) of + {SeqId, SeqId} -> {ok, empty, State}; + {ReadSeqId, WriteSeqId} when WriteSeqId >= ReadSeqId -> Remaining = WriteSeqId - ReadSeqId - 1, {ok, Result, State1} = internal_read_message( @@ -1142,9 +1141,9 @@ requeue_next_messages(Q, N, ReadSeq, WriteSeq) -> requeue_next_messages(Q, N - 1, ReadSeq + 1, WriteSeq + 1). internal_purge(Q, State = #dqstate { sequences = Sequences }) -> - case ets:lookup(Sequences, Q) of - [] -> {ok, 0, State}; - [{Q, ReadSeqId, WriteSeqId}] -> + case sequence_lookup(Sequences, Q) of + {SeqId, SeqId} -> {ok, 0, State}; + {ReadSeqId, WriteSeqId} -> {atomic, {ok, State1}} = mnesia:transaction( fun() -> @@ -1518,26 +1517,17 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> mnesia:foldl( fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> NextWrite = SeqId + 1, - true = - case ets:lookup(Sequences, Q) of - [] -> ets:insert_new(Sequences, - {Q, SeqId, NextWrite, -1}); - [Orig = {Q, Read, Write, Length}] -> - Repl = {Q, lists:min([Read, SeqId]), - lists:max([Write, NextWrite]), - %% Length is wrong here, - %% but it doesn't matter - %% because we'll pull out - %% the gaps in - %% remove_gaps_in_sequences - %% in then do a straight - %% subtraction to get the - %% right length - Length}, - if Orig =:= Repl -> true; - true -> ets:insert(Sequences, Repl) - end - end + case ets:lookup(Sequences, Q) of + [] -> ets:insert_new(Sequences, + {Q, SeqId, NextWrite}); + [Orig = {Q, Read, Write}] -> + Repl = {Q, lists:min([Read, SeqId]), + lists:max([Write, NextWrite])}, + case Orig == Repl of + true -> true; + false -> ets:insert(Sequences, Repl) + end + end end, true, rabbit_disk_queue) end), remove_gaps_in_sequences(State), @@ -1558,12 +1548,11 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), lists:foreach( - fun ({Q, ReadSeqId, WriteSeqId, _Length}) -> + fun ({Q, ReadSeqId, WriteSeqId}) -> Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), ReadSeqId1 = ReadSeqId + Gap, - true = - ets:insert(Sequences, - {Q, ReadSeqId1, WriteSeqId}) + true = ets:insert(Sequences, + {Q, ReadSeqId1, WriteSeqId}) end, ets:match_object(Sequences, '_')) end). |
