diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-07-09 11:00:48 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-07-09 11:00:48 +0100 |
| commit | 638246ba369774db9ad268dc6a8bb68d3b7b8a59 (patch) | |
| tree | e5303322835f1b557d5aaf78ecdc3e11b9549219 | |
| parent | cb8e40836c51fa9a731360f60dedc69abc6a120c (diff) | |
| download | rabbitmq-server-git-638246ba369774db9ad268dc6a8bb68d3b7b8a59.tar.gz | |
Fixes from removing the non-contiguous sequences support from the disk queue that I failed to spot last night, but apparently came to me during my dreams. I have no idea how the tests managed to pass last night...
| -rw-r--r-- | src/rabbit_disk_queue.erl | 51 |
1 files changed, 20 insertions, 31 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 96889dbdf4..763e544d7e 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -845,10 +845,9 @@ insert_into_cache(Message = #basic_message { guid = MsgId }, internal_deliver(Q, ReadMsg, FakeDeliver, State = #dqstate { sequences = Sequences }) -> - case ets:lookup(Sequences, Q) of - [] -> {ok, empty, State}; - [{Q, SeqId, SeqId}] -> {ok, empty, State}; - [{Q, ReadSeqId, WriteSeqId}] when WriteSeqId >= ReadSeqId -> + case sequence_lookup(Sequences, Q) of + {SeqId, SeqId} -> {ok, empty, State}; + {ReadSeqId, WriteSeqId} when WriteSeqId >= ReadSeqId -> Remaining = WriteSeqId - ReadSeqId - 1, {ok, Result, State1} = internal_read_message( @@ -1142,9 +1141,9 @@ requeue_next_messages(Q, N, ReadSeq, WriteSeq) -> requeue_next_messages(Q, N - 1, ReadSeq + 1, WriteSeq + 1). internal_purge(Q, State = #dqstate { sequences = Sequences }) -> - case ets:lookup(Sequences, Q) of - [] -> {ok, 0, State}; - [{Q, ReadSeqId, WriteSeqId}] -> + case sequence_lookup(Sequences, Q) of + {SeqId, SeqId} -> {ok, 0, State}; + {ReadSeqId, WriteSeqId} -> {atomic, {ok, State1}} = mnesia:transaction( fun() -> @@ -1518,26 +1517,17 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> mnesia:foldl( fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> NextWrite = SeqId + 1, - true = - case ets:lookup(Sequences, Q) of - [] -> ets:insert_new(Sequences, - {Q, SeqId, NextWrite, -1}); - [Orig = {Q, Read, Write, Length}] -> - Repl = {Q, lists:min([Read, SeqId]), - lists:max([Write, NextWrite]), - %% Length is wrong here, - %% but it doesn't matter - %% because we'll pull out - %% the gaps in - %% remove_gaps_in_sequences - %% in then do a straight - %% subtraction to get the - %% right length - Length}, - if Orig =:= Repl -> true; - true -> ets:insert(Sequences, Repl) - end - end + case ets:lookup(Sequences, Q) of + [] -> ets:insert_new(Sequences, + {Q, SeqId, NextWrite}); + [Orig = {Q, Read, Write}] -> + Repl = {Q, lists:min([Read, SeqId]), + lists:max([Write, NextWrite])}, + case Orig == Repl of + true -> true; + false -> ets:insert(Sequences, Repl) + end + end end, true, rabbit_disk_queue) end), remove_gaps_in_sequences(State), @@ -1558,12 +1548,11 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), lists:foreach( - fun ({Q, ReadSeqId, WriteSeqId, _Length}) -> + fun ({Q, ReadSeqId, WriteSeqId}) -> Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), ReadSeqId1 = ReadSeqId + Gap, - true = - ets:insert(Sequences, - {Q, ReadSeqId1, WriteSeqId}) + true = ets:insert(Sequences, + {Q, ReadSeqId1, WriteSeqId}) end, ets:match_object(Sequences, '_')) end). |
