diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 57 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 189 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 20 |
3 files changed, 114 insertions, 152 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 4eef884f72..95ed8adf78 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -1560,25 +1560,46 @@ load_from_disk(State) -> State1 = load_messages(undefined, Files, State), %% Finally, check there is nothing in mnesia which we haven't %% loaded - {atomic, true} = mnesia:transaction( - fun() -> - ok = mnesia:read_lock_table(rabbit_disk_queue), - mnesia:foldl( - fun (#dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = {Q, SeqId} }, - true) -> - case erlang:length - (dets_ets_lookup(State1, MsgId)) of - 0 -> ok == mnesia:delete(rabbit_disk_queue, - {Q, SeqId}, write); - 1 -> true - end - end, - true, rabbit_disk_queue) - end), - State2 = extract_sequence_numbers(State1), + {atomic, State2} = + mnesia:transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + {State6, FinalQ, MsgSeqIds2, _Len} = + mnesia:foldl( + fun (#dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = {Q, SeqId} }, + {State3, OldQ, MsgSeqIds, Len}) -> + {State4, MsgSeqIds1, Len1} = + case {OldQ == Q, MsgSeqIds} of + {true, _} when Len < 10000 -> + {State3, MsgSeqIds, Len}; + {false, []} -> {State3, MsgSeqIds, Len}; + {_, _} -> + {ok, State5} = + remove_messages(Q, MsgSeqIds, + txn, State3), + {State5, [], 0} + end, + case dets_ets_lookup(State4, MsgId) of + [] -> ok = mnesia:delete(rabbit_disk_queue, + {Q, SeqId}, write), + {State4, Q, MsgSeqIds1, Len1}; + [{MsgId, _RefCount, _File, _Offset, + _TotalSize, true}] -> + {State4, Q, MsgSeqIds1, Len1}; + [{MsgId, _RefCount, _File, _Offset, + _TotalSize, false}] -> + {State4, Q, + [{MsgId, SeqId} | MsgSeqIds1], Len1+1} + end + end, {State1, undefined, [], 0}, rabbit_disk_queue), + {ok, State7} = + remove_messages(FinalQ, MsgSeqIds2, txn, State6), + State7 + end), + State8 = extract_sequence_numbers(State2), ok = del_index(), - {ok, State2}. + {ok, State8}. extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> {atomic, true} = mnesia:transaction( diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 425d776377..3b86596b6a 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -101,10 +101,10 @@ init(Queue, IsDurable) -> Len = rabbit_disk_queue:length(Queue), - ok = rabbit_disk_queue:delete_queue(transient_queue(Queue)), MsgBuf = inc_queue_length(Queue, queue:new(), Len), Size = rabbit_disk_queue:foldl( - fun ({Msg, _Size, _IsDelivered, _AckTag}, Acc) -> + fun ({Msg = #basic_message { is_persistent = true }, + _Size, _IsDelivered, _AckTag}, Acc) -> Acc + size_of_message(Msg) end, 0, Queue), {ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue, @@ -130,10 +130,8 @@ to_disk_only_mode(TxnMessages, State = %% message on disk. %% Note we also batch together messages on disk so that we minimise %% the calls to requeue. - TransQ = transient_queue(Q), {ok, MsgBuf1} = - send_messages_to_disk(IsDurable, Q, TransQ, MsgBuf, 0, 0, [], - queue:new()), + send_messages_to_disk(IsDurable, Q, MsgBuf, 0, 0, [], queue:new()), %% tx_publish txn messages. Some of these will have been already %% published if they really are durable and persistent which is %% why we can't just use our own tx_publish/2 function (would end @@ -148,47 +146,42 @@ to_disk_only_mode(TxnMessages, State = garbage_collect(), {ok, State #mqstate { mode = disk, msg_buf = MsgBuf1 }}. -send_messages_to_disk(IsDurable, Q, TransQ, Queue, PublishCount, RequeueCount, +send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, Commit, MsgBuf) -> case queue:out(Queue) of {empty, Queue} -> - ok = flush_messages_to_disk_queue(TransQ, Commit), - [] = flush_requeue_to_disk_queue(TransQ, RequeueCount, []), + ok = flush_messages_to_disk_queue(Q, Commit), + [] = flush_requeue_to_disk_queue(Q, RequeueCount, []), {ok, MsgBuf}; {{value, {Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, _IsDelivered}}, Queue1} -> case IsDurable andalso IsPersistent of - true -> %% it's already in the persistent Q + true -> %% it's already in the Q send_messages_to_disk( - IsDurable, Q, TransQ, Queue1, PublishCount, RequeueCount, + IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, inc_queue_length(Q, MsgBuf, 1)); false -> Commit1 = flush_requeue_to_disk_queue - (TransQ, RequeueCount, Commit), + (Q, RequeueCount, Commit), ok = rabbit_disk_queue:tx_publish(Msg), case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of true -> - ok = flush_messages_to_disk_queue(TransQ, Commit1), + ok = flush_messages_to_disk_queue(Q, Commit1), send_messages_to_disk( - IsDurable, Q, TransQ, Queue1, 1, 0, [MsgId], - inc_queue_length(TransQ, MsgBuf, 1)); + IsDurable, Q, Queue1, 1, 0, [MsgId], + inc_queue_length(Q, MsgBuf, 1)); false -> send_messages_to_disk( - IsDurable, Q, TransQ, Queue1, PublishCount + 1, 0, + IsDurable, Q, Queue1, PublishCount + 1, 0, [MsgId | Commit1], - inc_queue_length(TransQ, MsgBuf, 1)) + inc_queue_length(Q, MsgBuf, 1)) end end; {{value, {Q, Count}}, Queue1} -> - send_messages_to_disk(IsDurable, Q, TransQ, Queue1, PublishCount, + send_messages_to_disk(IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, - inc_queue_length(Q, MsgBuf, Count)); - {{value, {TransQ, Count}}, Queue1} -> - ok = flush_messages_to_disk_queue(TransQ, Commit), - send_messages_to_disk(IsDurable, Q, TransQ, Queue1, 0, - RequeueCount + Count, [], - inc_queue_length(TransQ, MsgBuf, Count)) + inc_queue_length(Q, MsgBuf, Count)) end. flush_messages_to_disk_queue(Q, Commit) -> @@ -235,9 +228,6 @@ to_mixed_mode(TxnMessages, State = garbage_collect(), {ok, State #mqstate { mode = mixed }}. -transient_queue(Queue) -> - {Queue, transient}. - inc_queue_length(_Queue, MsgBuf, 0) -> MsgBuf; inc_queue_length(Queue, MsgBuf, Count) -> @@ -259,17 +249,11 @@ dec_queue_length(Mode, MsgBuf) -> end, {Queue, MsgBuf2}. -publish(Msg = #basic_message { is_persistent = IsPersistent }, - State = #mqstate { mode = disk, queue = Q, length = Length, - is_durable = IsDurable, msg_buf = MsgBuf, - memory_size = QSize, memory_gain = Gain }) -> - Persist = IsDurable andalso IsPersistent, - PubQ = case Persist of - true -> Q; - false -> transient_queue(Q) - end, - MsgBuf1 = inc_queue_length(PubQ, MsgBuf, 1), - ok = rabbit_disk_queue:publish(PubQ, Msg, false), +publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length, + msg_buf = MsgBuf, memory_size = QSize, + memory_gain = Gain }) -> + MsgBuf1 = inc_queue_length(Q, MsgBuf, 1), + ok = rabbit_disk_queue:publish(Q, Msg, false), MsgSize = size_of_message(Msg), {ok, State #mqstate { memory_gain = Gain + MsgSize, memory_size = QSize + MsgSize, @@ -298,11 +282,7 @@ publish_delivered(Msg = memory_size = QSize, memory_gain = Gain }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> Persist = IsDurable andalso IsPersistent, - PubQ = case Persist of - true -> Q; - false -> transient_queue(Q) - end, - rabbit_disk_queue:publish(PubQ, Msg, false), + rabbit_disk_queue:publish(Q, Msg, false), MsgSize = size_of_message(Msg), State1 = State #mqstate { memory_size = QSize + MsgSize, memory_gain = Gain + MsgSize }, @@ -312,12 +292,12 @@ publish_delivered(Msg = %% the head of the queue. This is synchronous, but %% unavoidable as we need the AckTag {MsgId, IsPersistent, false, AckTag, 0} = - rabbit_disk_queue:phantom_deliver(PubQ), + rabbit_disk_queue:phantom_deliver(Q), {ok, AckTag, State1}; false -> %% in this case, we don't actually care about the ack, so %% auto ack it (asynchronously). - ok = rabbit_disk_queue:auto_ack_next_message(PubQ), + ok = rabbit_disk_queue:auto_ack_next_message(Q), {ok, noack, State1} end; publish_delivered(Msg, State = @@ -350,16 +330,16 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q, ok = maybe_prefetch(Mode, MsgBuf1), {Msg1, IsDelivered1, AckTag1, MsgBuf1}; _ -> - {ReadQ, MsgBuf3} = dec_queue_length(Mode, MsgBuf), + {Q, MsgBuf3} = dec_queue_length(Mode, MsgBuf), {Msg1 = #basic_message { is_persistent = IsPersistent }, _Size, IsDelivered1, AckTag1, _PersistRem} - = rabbit_disk_queue:deliver(ReadQ), + = rabbit_disk_queue:deliver(Q), AckTag2 = case IsDurable andalso IsPersistent of true -> AckTag1; false -> - ok = rabbit_disk_queue:ack(ReadQ, [AckTag1]), + ok = rabbit_disk_queue:ack(Q, [AckTag1]), noack end, {Msg1, IsDelivered1, AckTag2, MsgBuf3} @@ -368,7 +348,7 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q, {{Msg, IsDelivered, AckTag, Rem}, State #mqstate { msg_buf = MsgBuf2, length = Rem }}. -maybe_prefetch(disk, MsgBuf) -> +maybe_prefetch(disk, _MsgBuf) -> ok; maybe_prefetch(mixed, MsgBuf) -> case queue:peek(MsgBuf) of @@ -417,64 +397,19 @@ tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize, only_msg_ids(Pubs) -> lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs). -%% The last 2 params are accumulators. We work through the publishes, -%% sorting out our msgbuf as we go. Finally, when no more work to do, -%% we commit first transient, and the persistent msgs. This is safe -%% because in case of failure, transient messages will be lost on -%% restart anyway. -commit_to_queues(_IsDurable, _Q, _TransQ, MsgBuf, [], [], [], []) -> - MsgBuf; -commit_to_queues(_IsDurable, Q, _TransQ, MsgBuf, AckTags, [], - PersistMsgIds, []) -> - MsgIds = lists:flatten(lists:reverse(PersistMsgIds)), - ok = rabbit_disk_queue:tx_commit(Q, MsgIds, AckTags), - MsgBuf; -commit_to_queues(IsDurable, Q, TransQ, MsgBuf, AckTags, [], - PersistMsgIds, TransMsgIds) -> - MsgIds = lists:flatten(lists:reverse(TransMsgIds)), - ok = rabbit_disk_queue:tx_commit(TransQ, MsgIds, []), - commit_to_queues(IsDurable, Q, TransQ, MsgBuf, AckTags, [], - PersistMsgIds, []); -commit_to_queues(false, Q, TransQ, MsgBuf, AckTags, Publishes, [], []) -> - MsgIds = only_msg_ids(Publishes), - MsgBuf1 = inc_queue_length(TransQ, MsgBuf, erlang:length(MsgIds)), - commit_to_queues(false, Q, TransQ, MsgBuf1, AckTags, [], [], [MsgIds]); -commit_to_queues(true, Q, TransQ, MsgBuf, AckTags, Publishes = - [#basic_message { is_persistent = true } | _], - PersistAcc, TransAcc) -> - {Persist, Publishes1} = lists:splitwith(fun is_persistent/1, Publishes), - MsgIds = only_msg_ids(Persist), - MsgBuf1 = inc_queue_length(Q, MsgBuf, erlang:length(MsgIds)), - commit_to_queues(true, Q, TransQ, MsgBuf1, AckTags, Publishes1, - [MsgIds | PersistAcc], TransAcc); -commit_to_queues(true, Q, TransQ, MsgBuf, AckTags, Publishes, - PersistAcc, TransAcc) -> - %% not persistent - {Trans, Publishes1} = lists:splitwith(fun is_not_persistent/1, Publishes), - MsgIds = only_msg_ids(Trans), - MsgBuf1 = inc_queue_length(TransQ, MsgBuf, erlang:length(MsgIds)), - commit_to_queues(true, Q, TransQ, MsgBuf1, AckTags, Publishes1, - PersistAcc, [MsgIds | TransAcc]). - -is_persistent(#basic_message { is_persistent = IsPersistent }) -> - IsPersistent. - -is_not_persistent(#basic_message { is_persistent = IsPersistent }) -> - not IsPersistent. - tx_commit(Publishes, MsgsWithAcks, State = #mqstate { mode = disk, queue = Q, length = Length, memory_size = QSize, memory_loss = Loss, - is_durable = IsDurable, msg_buf = MsgBuf }) -> + msg_buf = MsgBuf }) -> {RealAcks, ASize} = remove_noacks(MsgsWithAcks), - MsgBuf1 = case ([] == Publishes) andalso ([] == RealAcks) of - true -> MsgBuf; - false -> commit_to_queues - (IsDurable, Q, transient_queue(Q), MsgBuf, - RealAcks, Publishes, [], []) - end, - {ok, State #mqstate { length = Length + erlang:length(Publishes), - msg_buf = MsgBuf1, memory_size = QSize - ASize, + ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok; + true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), + RealAcks) + end, + Len = erlang:length(Publishes), + {ok, State #mqstate { length = Length + Len, + msg_buf = inc_queue_length(Q, MsgBuf, Len), + memory_size = QSize - ASize, memory_loss = Loss + ASize }}; tx_commit(Publishes, MsgsWithAcks, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, @@ -493,9 +428,8 @@ tx_commit(Publishes, MsgsWithAcks, {RealAcks, ASize} = remove_noacks(MsgsWithAcks), ok = case ([] == PersistentPubs) andalso ([] == RealAcks) of true -> ok; - false -> - rabbit_disk_queue:tx_commit( - Q, lists:reverse(PersistentPubs), RealAcks) + false -> rabbit_disk_queue:tx_commit( + Q, lists:reverse(PersistentPubs), RealAcks) end, {ok, State #mqstate { msg_buf = MsgBuf1, memory_size = QSize - ASize, length = Length + erlang:length(Publishes), @@ -540,20 +474,24 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, %% here, we may have messages with no ack tags, because of the %% fact they are not persistent, but nevertheless we want to %% requeue them. This means publishing them delivered. - TransQ = transient_queue(Q), - {MsgBuf1, PersistRQ} + Requeue = lists:foldl( - fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, - {MB, PRQ}) + fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ) when IsDurable andalso IsPersistent -> - {inc_queue_length(Q, MB, 1), [{AckTag, true} | PRQ]}; - ({Msg, noack}, {MB, PRQ}) -> - ok = rabbit_disk_queue:publish(TransQ, Msg, true), - {inc_queue_length(TransQ, MB, 1), PRQ} - end, {MsgBuf, []}, MessagesWithAckTags), - ok = rabbit_disk_queue:requeue(Q, lists:reverse(PersistRQ)), - {ok, State #mqstate { length = Length + erlang:length(MessagesWithAckTags), - msg_buf = MsgBuf1 }}; + [{AckTag, true} | RQ]; + ({Msg, noack}, RQ) -> + ok = case RQ == [] of + true -> ok; + false -> rabbit_disk_queue:requeue( + Q, lists:reverse(RQ)) + end, + ok = rabbit_disk_queue:publish(Q, Msg, true), + [] + end, [], MessagesWithAckTags), + ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)), + Len = erlang:length(MessagesWithAckTags), + {ok, State #mqstate { length = Length + Len, + msg_buf = inc_queue_length(Q, MsgBuf, Len) }}; requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, is_durable = IsDurable, @@ -577,18 +515,21 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, {ok, State #mqstate {msg_buf = MsgBuf1, length = Length + erlang:length(MessagesWithAckTags)}}. -purge(State = #mqstate { queue = Q, length = Count, +purge(State = #mqstate { queue = Q, mode = disk, length = Count, + memory_loss = Loss, memory_size = QSize }) -> + Count = rabbit_disk_queue:purge(Q), + {Count, State #mqstate { length = 0, memory_size = 0, + memory_loss = Loss + QSize }}; +purge(State = #mqstate { queue = Q, mode = mixed, length = Length, memory_loss = Loss, memory_size = QSize }) -> - Len1 = rabbit_disk_queue:purge(Q), - Len2 = rabbit_disk_queue:purge(transient_queue(Q)), - true = Count >= Len1 + Len2, - {Count, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(), - memory_loss = Loss + QSize }}. + rabbit_disk_queue:purge(Q), + {Length, + State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0, + memory_loss = Loss + QSize }}. delete_queue(State = #mqstate { queue = Q, memory_size = QSize, memory_loss = Loss }) -> ok = rabbit_disk_queue:delete_queue(Q), - ok = rabbit_disk_queue:delete_queue(transient_queue(Q)), {ok, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(), memory_loss = Loss + QSize }}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 58a9d0cddd..b9777337ed 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -767,8 +767,8 @@ benchmark_disk_queue() -> ok = control_action(start_app, []), passed. -rdq_message(MsgId, MsgBody) -> - rabbit_basic:message(x, <<>>, [], MsgBody, MsgId). +rdq_message(MsgId, MsgBody, IsPersistent) -> + rabbit_basic:message(x, <<>>, [], MsgBody, MsgId, IsPersistent). rdq_match_message( #basic_message { guid = MsgId, content = @@ -784,7 +784,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> List = lists:seq(1, MsgCount), {Publish, ok} = timer:tc(?MODULE, rdq_time_commands, - [[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) + [[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- List, _ <- Qs] end, fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List, []) || Q <- Qs] end @@ -820,7 +820,7 @@ rdq_stress_gc(MsgCount) -> MsgSizeBytes = 256*1024, Msg = <<0:(8*MsgSizeBytes)>>, % 256KB List = lists:seq(1, MsgCount), - [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- List], + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- List], rabbit_disk_queue:tx_commit(q, List, []), StartChunk = round(MsgCount / 20), % 5% AckList = @@ -862,7 +862,7 @@ rdq_test_startup_with_queue_gaps() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All], + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, true)) || N <- All], rabbit_disk_queue:tx_commit(q, All, []), io:format("Publish done~n", []), %% deliver first half @@ -918,7 +918,7 @@ rdq_test_redeliver() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All], + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All], rabbit_disk_queue:tx_commit(q, All, []), io:format("Publish done~n", []), %% deliver first half @@ -970,7 +970,7 @@ rdq_test_purge() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All], + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All], rabbit_disk_queue:tx_commit(q, All, []), io:format("Publish done~n", []), %% deliver first half @@ -1161,7 +1161,7 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCanc rabbit_mixed_queue:ack(AckTags, MS8) end, 0 = rabbit_mixed_queue:length(MS9), - Msg = rdq_message(0, <<0:256>>), + Msg = rdq_message(0, <<0:256>>, false), {ok, AckTag, MS10} = rabbit_mixed_queue:publish_delivered(Msg, MS9), {ok,MS11} = rabbit_mixed_queue:ack([{Msg, AckTag}], MS10), 0 = rabbit_mixed_queue:length(MS11), @@ -1174,12 +1174,12 @@ rdq_test_disk_queue_modes() -> Total = 1000, Half1 = lists:seq(1,round(Total/2)), Half2 = lists:seq(1 + round(Total/2), Total), - [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- Half1], + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half1], ok = rabbit_disk_queue:tx_commit(q, Half1, []), io:format("Publish done~n", []), ok = rabbit_disk_queue:to_disk_only_mode(), io:format("To Disk Only done~n", []), - [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- Half2], + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half2], ok = rabbit_disk_queue:tx_commit(q, Half2, []), Seqs = [begin Remaining = Total - N, |
