summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-22 10:54:17 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-22 10:54:17 +0100
commitc11d8c902cc59e50d91266ab18b724af80253aa7 (patch)
treef2a0bad0cdef22749c2aa1796eda58fad3544c99
parent65d6523be42f4ec44f83b2c542de15277eceadf5 (diff)
downloadrabbitmq-server-git-c11d8c902cc59e50d91266ab18b724af80253aa7.tar.gz
stupid bug in start up code because I still fail to remember that messages only go away after being ack'd, thus we should redeliver messages.
Also, reworked the stress gc test so that as before it really does ack messages in a non-linear order. This got quite a bit harder now we can't deliver arbitrary messages and need to build the mapping between msgid in the delivery and the seqid needed for the acks.
-rw-r--r--src/rabbit_disk_queue.erl10
-rw-r--r--src/rabbit_tests.erl11
2 files changed, 10 insertions, 11 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 0631a04e76..25bc17e120 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -608,17 +608,13 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
fun() ->
ok = mnesia:read_lock_table(rabbit_disk_queue),
mnesia:foldl(
- fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId},
- is_delivered = Delivered }, true) ->
- NextRead = if Delivered -> SeqId + 1;
- true -> SeqId
- end,
+ fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) ->
NextWrite = SeqId + 1,
case ets:lookup(Sequences, Q) of
[] ->
- true = ets:insert_new(Sequences, {Q, NextRead, NextWrite});
+ true = ets:insert_new(Sequences, {Q, SeqId, NextWrite});
[Orig = {Q, Read, Write}] ->
- Repl = {Q, lists:min([Read, NextRead]),
+ Repl = {Q, lists:min([Read, SeqId]),
lists:max([Write, NextWrite])},
if Orig /= Repl ->
true = ets:insert(Sequences, Repl);
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 6d973c4532..a04c6f1bfb 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -738,10 +738,13 @@ rdq_stress_gc(MsgCount) ->
end
end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(StartChunk,MsgCount)])))
++ lists:seq(1, (StartChunk - 1)),
- [begin {_, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(q),
- rabbit_disk_queue:ack(q, [SeqId]),
- rabbit_disk_queue:tx_commit(q, [])
- end || N <- AckList],
+ MsgIdToSeqDict
+ = lists:foldl(fun (_, Acc) ->
+ {MsgId, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(q),
+ dict:store(MsgId, SeqId, Acc)
+ end, dict:new(), List),
+ rabbit_disk_queue:ack(q, [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict), SeqId end || MsgId <- AckList]),
+ rabbit_disk_queue:tx_commit(q, []),
rdq_stop(),
passed.