summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-22 10:34:30 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-22 10:34:30 +0100
commit65d6523be42f4ec44f83b2c542de15277eceadf5 (patch)
treec4092d1f634ec6efb56e986fc4df8bc208dbe336
parent9ceaa816c04422119c8010b1f41a24dafdeb3eb7 (diff)
downloadrabbitmq-server-git-65d6523be42f4ec44f83b2c542de15277eceadf5.tar.gz
I don't understand why I thought I needed to adjust sequences in remove_messages.
The only case that code would be called would be from acks, and the effect would be to increment the read seqid. But this would require acking a message which hasn't been delivered, which is clearly insane. Also, fixed a bug in the tests.
-rw-r--r--src/rabbit_disk_queue.erl30
-rw-r--r--src/rabbit_tests.erl7
2 files changed, 13 insertions, 24 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 32512c2bca..0631a04e76 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -250,12 +250,11 @@ internal_ack(Q, MsgIds, State) ->
%% called from tx_cancel with MnesiaDelete = false
%% called from ack with MnesiaDelete = true
remove_messages(Q, MsgSeqIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation,
- sequences = Sequences,
file_summary = FileSummary,
current_file_name = CurName
}) ->
- {Files, MaxSeqId}
- = lists:foldl(fun ({MsgId, SeqId}, {Files2, MaxSeqId2}) ->
+ Files
+ = lists:foldl(fun ({MsgId, SeqId}, Files2) ->
[{MsgId, RefCount, File, Offset, TotalSize}]
= dets:lookup(MsgLocation, MsgId),
Files3 =
@@ -274,23 +273,12 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, State = # dqstate { msg_location = M
ok = dets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
Files2
end,
- MaxSeqId3 =
- if MnesiaDelete ->
- ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}),
- lists:max([SeqId, MaxSeqId2]);
- true ->
- MaxSeqId2
- end,
- {Files3, MaxSeqId3}
- end, {sets:new(), 0}, MsgSeqIds),
- true = if MnesiaDelete ->
- [{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q),
- if MaxSeqId > ReadSeqId ->
- true = ets:insert(Sequences, {Q, MaxSeqId, WriteSeqId});
- true -> true
- end;
- true -> true
- end,
+ if MnesiaDelete ->
+ ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId});
+ true -> ok
+ end,
+ Files3
+ end, sets:new(), MsgSeqIds),
State2 = compact(Files, State),
{ok, State2}.
@@ -369,7 +357,7 @@ internal_tx_cancel(MsgIds, State) ->
% we don't need seq ids because we're not touching mnesia, because seqids were
% never assigned
MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
- remove_messages(undefined, MsgIds, false, State).
+ remove_messages(undefined, MsgSeqIds, false, State).
%% ---- ROLLING OVER THE APPEND FILE ----
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3eab352d22..6d973c4532 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -691,7 +691,7 @@ test_disk_queue() ->
MsgCount <- [1024, 4096, 16384]
],
rdq_virgin(),
- rdq_stress_gc(10000),
+ passed = rdq_stress_gc(10000),
passed.
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
@@ -738,11 +738,12 @@ rdq_stress_gc(MsgCount) ->
end
end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(StartChunk,MsgCount)])))
++ lists:seq(1, (StartChunk - 1)),
- [begin {N, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(q),
+ [begin {_, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(q),
rabbit_disk_queue:ack(q, [SeqId]),
rabbit_disk_queue:tx_commit(q, [])
end || N <- AckList],
- rdq_stop().
+ rdq_stop(),
+ passed.
rdq_time_commands(Funcs) ->
lists:foreach(fun (F) -> F() end, Funcs).