diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-22 10:54:17 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-22 10:54:17 +0100 |
| commit | c11d8c902cc59e50d91266ab18b724af80253aa7 (patch) | |
| tree | f2a0bad0cdef22749c2aa1796eda58fad3544c99 /src | |
| parent | 65d6523be42f4ec44f83b2c542de15277eceadf5 (diff) | |
| download | rabbitmq-server-git-c11d8c902cc59e50d91266ab18b724af80253aa7.tar.gz | |
stupid bug in start up code because I still fail to remember that messages only go away after being ack'd, thus we should redeliver messages.
Also, reworked the stress gc test so that as before it really does ack messages in a non-linear order. This got quite a bit harder now we can't deliver arbitrary messages and need to build the mapping between msgid in the delivery and the seqid needed for the acks.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 11 |
2 files changed, 10 insertions, 11 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 0631a04e76..25bc17e120 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -608,17 +608,13 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> fun() -> ok = mnesia:read_lock_table(rabbit_disk_queue), mnesia:foldl( - fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId}, - is_delivered = Delivered }, true) -> - NextRead = if Delivered -> SeqId + 1; - true -> SeqId - end, + fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> NextWrite = SeqId + 1, case ets:lookup(Sequences, Q) of [] -> - true = ets:insert_new(Sequences, {Q, NextRead, NextWrite}); + true = ets:insert_new(Sequences, {Q, SeqId, NextWrite}); [Orig = {Q, Read, Write}] -> - Repl = {Q, lists:min([Read, NextRead]), + Repl = {Q, lists:min([Read, SeqId]), lists:max([Write, NextWrite])}, if Orig /= Repl -> true = ets:insert(Sequences, Repl); diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 6d973c4532..a04c6f1bfb 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -738,10 +738,13 @@ rdq_stress_gc(MsgCount) -> end end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(StartChunk,MsgCount)]))) ++ lists:seq(1, (StartChunk - 1)), - [begin {_, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(q), - rabbit_disk_queue:ack(q, [SeqId]), - rabbit_disk_queue:tx_commit(q, []) - end || N <- AckList], + MsgIdToSeqDict + = lists:foldl(fun (_, Acc) -> + {MsgId, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(q), + dict:store(MsgId, SeqId, Acc) + end, dict:new(), List), + rabbit_disk_queue:ack(q, [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict), SeqId end || MsgId <- AckList]), + rabbit_disk_queue:tx_commit(q, []), rdq_stop(), passed. |
