summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-21 13:07:23 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-21 13:07:23 +0100
commit1c003c407e9c727189343a97a5f80d036914bb0c (patch)
tree2908314f8e781fe3aef135cc047af6de662d0d7d /src
parent021518d51c71981e7cf44cf55e2739ae0ae8709a (diff)
downloadrabbitmq-server-git-1c003c407e9c727189343a97a5f80d036914bb0c.tar.gz
Fixed a bug in the mixed_queue which could lead to messages being marked undelivered when in fact they have been delivered when converting to disk_only mode. In truth, this bug didn't exist because there is no way in which a message could end up in that form in the mixed_queue which had previously been delivered. However, that will change when the prefetcher comes in, necessitating this "bug" gets fixed.
The solution is to make tx_commit not just take a list of msg ids in the txn, but to take a list of {msgid, delivered} tuples. In this way it mirrors the disk_queue:publish function in that the delivery flag can be set explicitly. Tests adjusted. All tests pass.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl10
-rw-r--r--src/rabbit_mixed_queue.erl14
-rw-r--r--src/rabbit_queue_prefetcher.erl2
-rw-r--r--src/rabbit_tests.erl22
4 files changed, 29 insertions, 19 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index e8a63bc364..e739bfef64 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -260,8 +260,8 @@
-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
-spec(auto_ack_next_message/1 :: (queue_name()) -> 'ok').
-spec(tx_publish/1 :: (message()) -> 'ok').
--spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) ->
- 'ok').
+-spec(tx_commit/3 :: (queue_name(), [{msg_id(), bool()}],
+ [{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
-spec(requeue/2 :: (queue_name(), [{{msg_id(), seq_id()}, bool()}]) -> 'ok').
-spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok').
@@ -1046,7 +1046,7 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
last_sync_offset = SyncOffset
}) ->
NeedsSync = IsDirty andalso
- lists:any(fun (MsgId) ->
+ lists:any(fun ({MsgId, _Delivered}) ->
[{MsgId, _RefCount, File, Offset,
_TotalSize, _IsPersistent}] =
dets_ets_lookup(State, MsgId),
@@ -1070,12 +1070,12 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
ok = mnesia:write_lock_table(rabbit_disk_queue),
{ok, WriteSeqId1} =
lists:foldl(
- fun (MsgId, {ok, SeqId}) ->
+ fun ({MsgId, Delivered}, {ok, SeqId}) ->
{mnesia:write(
rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id = {Q, SeqId},
msg_id = MsgId,
- is_delivered = false
+ is_delivered = Delivered
}, write),
SeqId + 1}
end, {ok, InitWriteSeqId}, PubMsgIds),
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index fedc0e523f..afc1c8aae1 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -155,7 +155,7 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
{ok, MsgBuf};
{{value, {Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
- _IsDelivered}}, Queue1} ->
+ IsDelivered}}, Queue1} ->
case IsDurable andalso IsPersistent of
true -> %% it's already in the Q
send_messages_to_disk(
@@ -164,17 +164,18 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
false ->
Commit1 = flush_requeue_to_disk_queue
(Q, RequeueCount, Commit),
- ok = rabbit_disk_queue:tx_publish(Msg), %% TODO - this is resetting the delivered flag to false! (well, actually, in the commit, but nevertheless, it's wrong)
+ ok = rabbit_disk_queue:tx_publish(Msg),
case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of
true ->
ok = flush_messages_to_disk_queue(Q, Commit1),
send_messages_to_disk(
- IsDurable, Q, Queue1, 1, 0, [MsgId],
+ IsDurable, Q, Queue1, 1, 0,
+ [{MsgId, IsDelivered}],
inc_queue_length(Q, MsgBuf, 1));
false ->
send_messages_to_disk(
IsDurable, Q, Queue1, PublishCount + 1, 0,
- [MsgId | Commit1],
+ [{MsgId, IsDelivered} | Commit1],
inc_queue_length(Q, MsgBuf, 1))
end
end;
@@ -387,7 +388,7 @@ tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize,
memory_gain = Gain + MsgSize }}.
only_msg_ids(Pubs) ->
- lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs).
+ lists:map(fun (Msg) -> {Msg #basic_message.guid, false} end, Pubs).
tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = disk, queue = Q, length = Length,
@@ -412,7 +413,8 @@ tx_commit(Publishes, MsgsWithAcks,
{Acc, MsgBuf2}) ->
Acc1 =
case IsPersistent andalso IsDurable of
- true -> [Msg #basic_message.guid | Acc];
+ true -> [ {Msg #basic_message.guid, false}
+ | Acc];
false -> Acc
end,
{Acc1, queue:in({Msg, false}, MsgBuf2)}
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index c9bbbb8f8f..6cae54048b 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server2).
--export([start_link/0]).
+-export([start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b9777337ed..10a9873adc 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -782,11 +782,12 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
QCount = length(Qs),
Msg = <<0:(8*MsgSizeBytes)>>,
List = lists:seq(1, MsgCount),
+ CommitList = lists:zip(List, lists:duplicate(MsgCount, false)),
{Publish, ok} =
timer:tc(?MODULE, rdq_time_commands,
[[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false))
|| N <- List, _ <- Qs] end,
- fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List, [])
+ fun() -> [ok = rabbit_disk_queue:tx_commit(Q, CommitList, [])
|| Q <- Qs] end
]]),
{Deliver, ok} =
@@ -820,8 +821,9 @@ rdq_stress_gc(MsgCount) ->
MsgSizeBytes = 256*1024,
Msg = <<0:(8*MsgSizeBytes)>>, % 256KB
List = lists:seq(1, MsgCount),
+ CommitList = lists:zip(List, lists:duplicate(MsgCount, false)),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- List],
- rabbit_disk_queue:tx_commit(q, List, []),
+ rabbit_disk_queue:tx_commit(q, CommitList, []),
StartChunk = round(MsgCount / 20), % 5%
AckList =
lists:foldl(
@@ -862,8 +864,9 @@ rdq_test_startup_with_queue_gaps() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
+ CommitAll = lists:zip(All, lists:duplicate(Total, false)),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, true)) || N <- All],
- rabbit_disk_queue:tx_commit(q, All, []),
+ rabbit_disk_queue:tx_commit(q, CommitAll, []),
io:format("Publish done~n", []),
%% deliver first half
Seqs = [begin
@@ -918,8 +921,9 @@ rdq_test_redeliver() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
+ CommitAll = lists:zip(All, lists:duplicate(Total, false)),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All],
- rabbit_disk_queue:tx_commit(q, All, []),
+ rabbit_disk_queue:tx_commit(q, CommitAll, []),
io:format("Publish done~n", []),
%% deliver first half
Seqs = [begin
@@ -970,8 +974,9 @@ rdq_test_purge() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
+ CommitAll = lists:zip(All, lists:duplicate(Total, false)),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All],
- rabbit_disk_queue:tx_commit(q, All, []),
+ rabbit_disk_queue:tx_commit(q, CommitAll, []),
io:format("Publish done~n", []),
%% deliver first half
Seqs = [begin
@@ -1174,13 +1179,16 @@ rdq_test_disk_queue_modes() ->
Total = 1000,
Half1 = lists:seq(1,round(Total/2)),
Half2 = lists:seq(1 + round(Total/2), Total),
+ CommitHalf1 = lists:zip(Half1, lists:duplicate(round(Total/2), false)),
+ CommitHalf2 = lists:zip(Half2, lists:duplicate
+ (Total - round(Total/2), false)),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half1],
- ok = rabbit_disk_queue:tx_commit(q, Half1, []),
+ ok = rabbit_disk_queue:tx_commit(q, CommitHalf1, []),
io:format("Publish done~n", []),
ok = rabbit_disk_queue:to_disk_only_mode(),
io:format("To Disk Only done~n", []),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half2],
- ok = rabbit_disk_queue:tx_commit(q, Half2, []),
+ ok = rabbit_disk_queue:tx_commit(q, CommitHalf2, []),
Seqs = [begin
Remaining = Total - N,
{Message, _TSize, false, SeqId, Remaining} =