diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_queue_prefetcher.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 22 |
4 files changed, 29 insertions, 19 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index e8a63bc364..e739bfef64 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -260,8 +260,8 @@ -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(auto_ack_next_message/1 :: (queue_name()) -> 'ok'). -spec(tx_publish/1 :: (message()) -> 'ok'). --spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> - 'ok'). +-spec(tx_commit/3 :: (queue_name(), [{msg_id(), bool()}], + [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). -spec(requeue/2 :: (queue_name(), [{{msg_id(), seq_id()}, bool()}]) -> 'ok'). -spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok'). @@ -1046,7 +1046,7 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, last_sync_offset = SyncOffset }) -> NeedsSync = IsDirty andalso - lists:any(fun (MsgId) -> + lists:any(fun ({MsgId, _Delivered}) -> [{MsgId, _RefCount, File, Offset, _TotalSize, _IsPersistent}] = dets_ets_lookup(State, MsgId), @@ -1070,12 +1070,12 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From}, ok = mnesia:write_lock_table(rabbit_disk_queue), {ok, WriteSeqId1} = lists:foldl( - fun (MsgId, {ok, SeqId}) -> + fun ({MsgId, Delivered}, {ok, SeqId}) -> {mnesia:write( rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, SeqId}, msg_id = MsgId, - is_delivered = false + is_delivered = Delivered }, write), SeqId + 1} end, {ok, InitWriteSeqId}, PubMsgIds), diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index fedc0e523f..afc1c8aae1 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -155,7 +155,7 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, {ok, MsgBuf}; {{value, {Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, - _IsDelivered}}, Queue1} -> + IsDelivered}}, Queue1} -> case IsDurable andalso IsPersistent of true -> %% it's already in the Q send_messages_to_disk( @@ -164,17 +164,18 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, false -> Commit1 = flush_requeue_to_disk_queue (Q, RequeueCount, Commit), - ok = rabbit_disk_queue:tx_publish(Msg), %% TODO - this is resetting the delivered flag to false! (well, actually, in the commit, but nevertheless, it's wrong) + ok = rabbit_disk_queue:tx_publish(Msg), case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of true -> ok = flush_messages_to_disk_queue(Q, Commit1), send_messages_to_disk( - IsDurable, Q, Queue1, 1, 0, [MsgId], + IsDurable, Q, Queue1, 1, 0, + [{MsgId, IsDelivered}], inc_queue_length(Q, MsgBuf, 1)); false -> send_messages_to_disk( IsDurable, Q, Queue1, PublishCount + 1, 0, - [MsgId | Commit1], + [{MsgId, IsDelivered} | Commit1], inc_queue_length(Q, MsgBuf, 1)) end end; @@ -387,7 +388,7 @@ tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize, memory_gain = Gain + MsgSize }}. only_msg_ids(Pubs) -> - lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs). + lists:map(fun (Msg) -> {Msg #basic_message.guid, false} end, Pubs). tx_commit(Publishes, MsgsWithAcks, State = #mqstate { mode = disk, queue = Q, length = Length, @@ -412,7 +413,8 @@ tx_commit(Publishes, MsgsWithAcks, {Acc, MsgBuf2}) -> Acc1 = case IsPersistent andalso IsDurable of - true -> [Msg #basic_message.guid | Acc]; + true -> [ {Msg #basic_message.guid, false} + | Acc]; false -> Acc end, {Acc1, queue:in({Msg, false}, MsgBuf2)} diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index c9bbbb8f8f..6cae54048b 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/0]). +-export([start_link/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b9777337ed..10a9873adc 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -782,11 +782,12 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> QCount = length(Qs), Msg = <<0:(8*MsgSizeBytes)>>, List = lists:seq(1, MsgCount), + CommitList = lists:zip(List, lists:duplicate(MsgCount, false)), {Publish, ok} = timer:tc(?MODULE, rdq_time_commands, [[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- List, _ <- Qs] end, - fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List, []) + fun() -> [ok = rabbit_disk_queue:tx_commit(Q, CommitList, []) || Q <- Qs] end ]]), {Deliver, ok} = @@ -820,8 +821,9 @@ rdq_stress_gc(MsgCount) -> MsgSizeBytes = 256*1024, Msg = <<0:(8*MsgSizeBytes)>>, % 256KB List = lists:seq(1, MsgCount), + CommitList = lists:zip(List, lists:duplicate(MsgCount, false)), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- List], - rabbit_disk_queue:tx_commit(q, List, []), + rabbit_disk_queue:tx_commit(q, CommitList, []), StartChunk = round(MsgCount / 20), % 5% AckList = lists:foldl( @@ -862,8 +864,9 @@ rdq_test_startup_with_queue_gaps() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), + CommitAll = lists:zip(All, lists:duplicate(Total, false)), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, true)) || N <- All], - rabbit_disk_queue:tx_commit(q, All, []), + rabbit_disk_queue:tx_commit(q, CommitAll, []), io:format("Publish done~n", []), %% deliver first half Seqs = [begin @@ -918,8 +921,9 @@ rdq_test_redeliver() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), + CommitAll = lists:zip(All, lists:duplicate(Total, false)), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All], - rabbit_disk_queue:tx_commit(q, All, []), + rabbit_disk_queue:tx_commit(q, CommitAll, []), io:format("Publish done~n", []), %% deliver first half Seqs = [begin @@ -970,8 +974,9 @@ rdq_test_purge() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), + CommitAll = lists:zip(All, lists:duplicate(Total, false)), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All], - rabbit_disk_queue:tx_commit(q, All, []), + rabbit_disk_queue:tx_commit(q, CommitAll, []), io:format("Publish done~n", []), %% deliver first half Seqs = [begin @@ -1174,13 +1179,16 @@ rdq_test_disk_queue_modes() -> Total = 1000, Half1 = lists:seq(1,round(Total/2)), Half2 = lists:seq(1 + round(Total/2), Total), + CommitHalf1 = lists:zip(Half1, lists:duplicate(round(Total/2), false)), + CommitHalf2 = lists:zip(Half2, lists:duplicate + (Total - round(Total/2), false)), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half1], - ok = rabbit_disk_queue:tx_commit(q, Half1, []), + ok = rabbit_disk_queue:tx_commit(q, CommitHalf1, []), io:format("Publish done~n", []), ok = rabbit_disk_queue:to_disk_only_mode(), io:format("To Disk Only done~n", []), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half2], - ok = rabbit_disk_queue:tx_commit(q, Half2, []), + ok = rabbit_disk_queue:tx_commit(q, CommitHalf2, []), Seqs = [begin Remaining = Total - N, {Message, _TSize, false, SeqId, Remaining} = |
