diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 49 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 439 |
2 files changed, 264 insertions, 224 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 4d00bc3a29..8b1487770d 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -42,7 +42,7 @@ tx_publish/1, tx_commit/3, tx_cancel/1, requeue/2, purge/1, delete_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1, - requeue_next_n/2, prefetch/2 + requeue_next_n/2, prefetch/2, length/1 ]). -export([filesync/0, cache_info/0]). @@ -255,6 +255,7 @@ ( 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}, non_neg_integer()})). -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). +-spec(auto_ack_next_message/1 :: (queue_name()) -> 'ok'). -spec(tx_publish/1 :: (message()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> 'ok'). @@ -262,11 +263,13 @@ -spec(requeue/2 :: (queue_name(), [{{msg_id(), seq_id()}, bool()}]) -> 'ok'). -spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). +-spec(delete_queue/1 :: (queue_name()) -> 'ok'). -spec(delete_non_durable_queues/1 :: (set()) -> 'ok'). +-spec(length/1 :: (queue_name()) -> non_neg_integer()). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). --spec(to_ram_disk_mode/0 :: () -> 'ok'). -spec(to_disk_only_mode/0 :: () -> 'ok'). +-spec(to_ram_disk_mode/0 :: () -> 'ok'). -spec(filesync/0 :: () -> 'ok'). -spec(cache_info/0 :: () -> [{atom(), term()}]). -spec(report_memory/0 :: () -> 'ok'). @@ -322,6 +325,9 @@ delete_non_durable_queues(DurableQueues) -> gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues}, infinity). +length(Q) -> + gen_server2:call(?SERVER, {length, Q}, infinity). + stop() -> gen_server2:call(?SERVER, stop, infinity). @@ -455,6 +461,9 @@ handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> handle_call({purge, Q}, _From, State) -> {ok, Count, State1} = internal_purge(Q, State), reply(Count, State1); +handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> + {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), + reply(WriteSeqId - ReadSeqId, State); handle_call(stop, _From, State) -> {stop, normal, ok, State}; %% gen_server now calls terminate handle_call(stop_vaporise, _From, State) -> @@ -1033,7 +1042,7 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, last_sync_offset = SyncOffset }) -> {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q), - WriteSeqId = InitWriteSeqId + length(PubMsgIds), + WriteSeqId = InitWriteSeqId + erlang:length(PubMsgIds), {atomic, {InCurFile, WriteSeqId, State1}} = mnesia:transaction( fun() -> @@ -1088,7 +1097,8 @@ internal_publish(Q, Message = #basic_message { guid = MsgId }, internal_tx_cancel(MsgIds, State) -> %% we don't need seq ids because we're not touching mnesia, %% because seqids were never assigned - MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)), + MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds), + undefined)), remove_messages(undefined, MsgSeqIds, false, State). internal_requeue(_Q, [], State) -> @@ -1524,7 +1534,8 @@ load_from_disk(State) -> fun (#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }, true) -> - case length(dets_ets_lookup(State1, MsgId)) of + case erlang:length + (dets_ets_lookup(State1, MsgId)) of 0 -> ok == mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write); 1 -> true @@ -1622,13 +1633,13 @@ load_messages(Left, [File|Files], {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), {ValidMessagesRev, ValidTotalSize} = lists:foldl( fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case length(mnesia:dirty_index_match_object - (rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = '_', - is_delivered = '_' - }, - msg_id)) of + case erlang:length(mnesia:dirty_index_match_object + (rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', + is_delivered = '_' + }, + msg_id)) of 0 -> {VMAcc, VTSAcc}; RefCount -> true = @@ -1662,13 +1673,13 @@ recover_crashed_compactions(Files, TmpFiles) -> verify_messages_in_mnesia(MsgIds) -> lists:foreach( fun (MsgId) -> - true = 0 < length(mnesia:dirty_index_match_object - (rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = '_', - is_delivered = '_' - }, - msg_id)) + true = 0 < erlang:length(mnesia:dirty_index_match_object + (rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', + is_delivered = '_' + }, + msg_id)) end, MsgIds). recover_crashed_compactions1(Files, TmpFile) -> diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 2ef534ff3d..a9013f3d8b 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -100,10 +100,12 @@ -endif. init(Queue, IsDurable, disk) -> - purge_non_persistent_messages( - #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue, - is_durable = IsDurable, length = 0, memory_size = 0, - memory_gain = 0, memory_loss = 0 }); + Len = rabbit_disk_queue:length(Queue), + ok = rabbit_disk_queue:delete_queue(transient_queue(Queue)), + MsgBuf = inc_queue_length(Queue, queue:new(), Len), + {ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue, + is_durable = IsDurable, length = Len, + memory_size = 0, memory_gain = 0, memory_loss = 0 }}; init(Queue, IsDurable, mixed) -> {ok, State} = init(Queue, IsDurable, disk), to_mixed_mode([], State). @@ -126,7 +128,10 @@ to_disk_only_mode(TxnMessages, State = %% message on disk. %% Note we also batch together messages on disk so that we minimise %% the calls to requeue. - ok = send_messages_to_disk(Q, MsgBuf, 0, 0, []), + TransQ = transient_queue(Q), + {ok, MsgBuf1} = + send_messages_to_disk(IsDurable, Q, TransQ, MsgBuf, 0, 0, [], + queue:new()), %% tx_publish txn messages. Some of these will have been already %% published if they really are durable and persistent which is %% why we can't just use our own tx_publish/2 function (would end @@ -139,38 +144,49 @@ to_disk_only_mode(TxnMessages, State = end end, TxnMessages), garbage_collect(), - {ok, State #mqstate { mode = disk, msg_buf = queue:new() }}. + {ok, State #mqstate { mode = disk, msg_buf = MsgBuf1 }}. -send_messages_to_disk(Q, Queue, RequeueCount, PublishCount, Commit) -> +send_messages_to_disk(IsDurable, Q, TransQ, Queue, PublishCount, RequeueCount, + Commit, MsgBuf) -> case queue:out(Queue) of {empty, Queue} -> - ok = flush_messages_to_disk_queue(Q, Commit), - [] = flush_requeue_to_disk_queue(Q, RequeueCount, []), - ok; - {{value, {Msg = #basic_message { guid = MsgId }, _IsDelivered, OnDisk}}, - Queue1} -> - case OnDisk of - true -> - ok = flush_messages_to_disk_queue(Q, Commit), + ok = flush_messages_to_disk_queue(TransQ, Commit), + [] = flush_requeue_to_disk_queue(TransQ, RequeueCount, []), + {ok, MsgBuf}; + {{value, {Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent }, + _IsDelivered}}, Queue1} -> + case IsDurable andalso IsPersistent of + true -> %% it's already in the persistent Q send_messages_to_disk( - Q, Queue1, 1 + RequeueCount, 0, []); + IsDurable, Q, TransQ, Queue1, PublishCount, RequeueCount, + Commit, inc_queue_length(Q, MsgBuf, 1)); false -> - Commit1 = - flush_requeue_to_disk_queue(Q, RequeueCount, Commit), + Commit1 = flush_requeue_to_disk_queue + (TransQ, RequeueCount, Commit), ok = rabbit_disk_queue:tx_publish(Msg), case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of true -> - ok = flush_messages_to_disk_queue(Q, Commit1), - send_messages_to_disk(Q, Queue1, 0, 1, [MsgId]); + ok = flush_messages_to_disk_queue(TransQ, Commit1), + send_messages_to_disk( + IsDurable, Q, TransQ, Queue1, 1, 0, [MsgId], + inc_queue_length(TransQ, MsgBuf, 1)); false -> - send_messages_to_disk - (Q, Queue1, 0, PublishCount + 1, - [MsgId | Commit1]) + send_messages_to_disk( + IsDurable, Q, TransQ, Queue1, PublishCount + 1, 0, + [MsgId | Commit1], + inc_queue_length(TransQ, MsgBuf, 1)) end end; - {{value, {disk, Count}}, Queue2} -> - ok = flush_messages_to_disk_queue(Q, Commit), - send_messages_to_disk(Q, Queue2, RequeueCount + Count, 0, []) + {{value, {Q, Count}}, Queue1} -> + send_messages_to_disk(IsDurable, Q, TransQ, Queue1, PublishCount, + RequeueCount, Commit, + inc_queue_length(Q, MsgBuf, Count)); + {{value, {TransQ, Count}}, Queue1} -> + ok = flush_messages_to_disk_queue(TransQ, Commit), + send_messages_to_disk(IsDurable, Q, TransQ, Queue1, 0, + RequeueCount + Count, [], + inc_queue_length(TransQ, MsgBuf, Count)) end. flush_messages_to_disk_queue(Q, Commit) -> @@ -192,17 +208,13 @@ flush_requeue_to_disk_queue(Q, RequeueCount, Commit) -> to_mixed_mode(_TxnMessages, State = #mqstate { mode = mixed }) -> {ok, State}; to_mixed_mode(TxnMessages, State = - #mqstate { mode = disk, queue = Q, length = Length, - is_durable = IsDurable }) -> + #mqstate { mode = disk, queue = Q, + is_durable = IsDurable, msg_buf = MsgBuf }) -> rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]), %% load up a new queue with a token that says how many messages - %% are on disk + %% are on disk (this is already built for us by the disk mode) %% don't actually do anything to the disk - MsgBuf = case Length of - 0 -> queue:new(); - _ -> ok = rabbit_disk_queue:prefetch(Q, Length), - queue:from_list([{disk, Length}]) - end, + ok = maybe_prefetch(MsgBuf), %% remove txn messages from disk which are neither persistent and %% durable. This is necessary to avoid leaks. This is also pretty %% much the inverse behaviour of our own tx_cancel/2 which is why @@ -219,57 +231,58 @@ to_mixed_mode(TxnMessages, State = true -> rabbit_disk_queue:tx_cancel(Cancel) end, garbage_collect(), - {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf }}. - -purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, - is_durable = IsDurable, - memory_size = 0 }) -> - %% iterate through the content on disk, ack anything which isn't - %% persistent, accumulate everything else that is persistent and - %% requeue it - {Acks, Requeue, Length, QSize} = - deliver_all_messages(Q, IsDurable, [], [], 0, 0), - ok = if Requeue == [] -> ok; - true -> - rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)) - end, - ok = if Acks == [] -> ok; - true -> rabbit_disk_queue:ack(Q, Acks) - end, - {ok, State #mqstate { length = Length, memory_size = QSize }}. - -deliver_all_messages(Q, IsDurable, Acks, Requeue, Length, QSize) -> - case rabbit_disk_queue:deliver(Q) of - empty -> {Acks, Requeue, Length, QSize}; - {Msg = #basic_message { is_persistent = IsPersistent }, - _Size, IsDelivered, AckTag, _Remaining} -> - OnDisk = IsPersistent andalso IsDurable, - {Acks1, Requeue1, Length1, QSize1} = - if OnDisk -> { Acks, - [{AckTag, IsDelivered} | Requeue], - Length + 1, QSize + size_of_message(Msg) }; - true -> { [AckTag | Acks], Requeue, Length, QSize } - end, - deliver_all_messages(Q, IsDurable, Acks1, Requeue1, Length1, QSize1) + {ok, State #mqstate { mode = mixed }}. + +transient_queue(Queue) -> + {Queue, transient}. + +inc_queue_length(_Queue, MsgBuf, 0) -> + MsgBuf; +inc_queue_length(Queue, MsgBuf, Count) -> + case queue:out_r(MsgBuf) of + {empty, MsgBuf} -> + queue:in({Queue, Count}, MsgBuf); + {{value, {Queue, Len}}, MsgBuf1} -> + queue:in({Queue, Len + Count}, MsgBuf1); + {{value, _}, _MsgBuf1} -> + queue:in({Queue, Count}, MsgBuf) end. -publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length, - memory_size = QSize, memory_gain = Gain }) -> - ok = rabbit_disk_queue:publish(Q, Msg, false), +dec_queue_length(MsgBuf) -> + {{value, {Queue, Len}}, MsgBuf1} = queue:out(MsgBuf), + MsgBuf2 = case Len of + 1 -> ok = maybe_prefetch(MsgBuf1), + MsgBuf1; + _ -> queue:in_r({Queue, Len-1}, MsgBuf1) + end, + {Queue, MsgBuf2}. + +publish(Msg = #basic_message { is_persistent = IsPersistent }, + State = #mqstate { mode = disk, queue = Q, length = Length, + is_durable = IsDurable, msg_buf = MsgBuf, + memory_size = QSize, memory_gain = Gain }) -> + Persist = IsDurable andalso IsPersistent, + PubQ = case Persist of + true -> Q; + false -> transient_queue(Q) + end, + MsgBuf1 = inc_queue_length(PubQ, MsgBuf, 1), + ok = rabbit_disk_queue:publish(PubQ, Msg, false), MsgSize = size_of_message(Msg), - {ok, State #mqstate { length = Length + 1, memory_size = QSize + MsgSize, - memory_gain = Gain + MsgSize }}; + {ok, State #mqstate { memory_gain = Gain + MsgSize, + memory_size = QSize + MsgSize, + msg_buf = MsgBuf1, length = Length + 1 }}; publish(Msg = #basic_message { is_persistent = IsPersistent }, State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable, msg_buf = MsgBuf, length = Length, memory_size = QSize, memory_gain = Gain }) -> - OnDisk = IsDurable andalso IsPersistent, - ok = if OnDisk -> - rabbit_disk_queue:publish(Q, Msg, false); - true -> ok + Persist = IsDurable andalso IsPersistent, + ok = case Persist of + true -> rabbit_disk_queue:publish(Q, Msg, false); + false -> ok end, MsgSize = size_of_message(Msg), - {ok, State #mqstate { msg_buf = queue:in({Msg, false, OnDisk}, MsgBuf), + {ok, State #mqstate { msg_buf = queue:in({Msg, false}, MsgBuf), length = Length + 1, memory_size = QSize + MsgSize, memory_gain = Gain + MsgSize }}. @@ -279,23 +292,29 @@ publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent}, State = #mqstate { mode = Mode, is_durable = IsDurable, - queue = Q, length = 0, memory_size = QSize, - memory_gain = Gain }) + queue = Q, length = 0, + memory_size = QSize, memory_gain = Gain }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> - rabbit_disk_queue:publish(Q, Msg, false), + Persist = IsDurable andalso IsPersistent, + PubQ = case Persist of + true -> Q; + false -> transient_queue(Q) + end, + rabbit_disk_queue:publish(PubQ, Msg, false), MsgSize = size_of_message(Msg), State1 = State #mqstate { memory_size = QSize + MsgSize, memory_gain = Gain + MsgSize }, - if IsDurable andalso IsPersistent -> + case Persist of + true -> %% must call phantom_deliver otherwise the msg remains at %% the head of the queue. This is synchronous, but %% unavoidable as we need the AckTag - {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q), + {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(PubQ), {ok, AckTag, State1}; - true -> + false -> %% in this case, we don't actually care about the ack, so %% auto ack it (asynchronously). - ok = rabbit_disk_queue:auto_ack_next_message(Q), + ok = rabbit_disk_queue:auto_ack_next_message(PubQ), {ok, noack, State1} end; publish_delivered(Msg, State = @@ -307,103 +326,77 @@ publish_delivered(Msg, State = deliver(State = #mqstate { length = 0 }) -> {empty, State}; -deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable, - length = Length }) -> - {Msg = #basic_message { is_persistent = IsPersistent }, - _Size, IsDelivered, AckTag, Remaining} - = rabbit_disk_queue:deliver(Q), - AckTag1 = if IsPersistent andalso IsDurable -> AckTag; - true -> ok = rabbit_disk_queue:ack(Q, [AckTag]), - noack - end, - {{Msg, IsDelivered, AckTag1, Remaining}, - State #mqstate { length = Length - 1 }}; -deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q, +deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q, is_durable = IsDurable, length = Length }) -> - {{value, Value}, MsgBuf1} - = queue:out(MsgBuf), + {{value, Value}, MsgBuf1} = queue:out(MsgBuf), {Msg, IsDelivered, AckTag, MsgBuf2} = case Value of {Msg1 = #basic_message { guid = MsgId, is_persistent = IsPersistent }, - IsDelivered1, OnDisk} -> + IsDelivered1} -> AckTag1 = - case OnDisk of + case IsDurable andalso IsPersistent of true -> - case IsPersistent andalso IsDurable of - true -> - {MsgId, IsDelivered1, AckTag2, _PersistRem} - = rabbit_disk_queue:phantom_deliver(Q), - AckTag2; - false -> - ok = rabbit_disk_queue:auto_ack_next_message - (Q), - noack - end; - false -> noack + {MsgId, IsDelivered1, AckTag2, _PersistRem} + = rabbit_disk_queue:phantom_deliver(Q), + AckTag2; + false -> + noack end, - ok = maybe_prefetch(Q, MsgBuf1), + ok = maybe_prefetch(MsgBuf1), {Msg1, IsDelivered1, AckTag1, MsgBuf1}; - {disk, Rem1} -> + _ -> + {ReadQ, MsgBuf3} = dec_queue_length(MsgBuf), {Msg1 = #basic_message { is_persistent = IsPersistent }, _Size, IsDelivered1, AckTag1, _PersistRem} - = rabbit_disk_queue:deliver(Q), + = rabbit_disk_queue:deliver(ReadQ), AckTag2 = - case IsPersistent andalso IsDurable of - true -> AckTag1; - false -> rabbit_disk_queue:ack(Q, [AckTag1]), - noack + case IsDurable andalso IsPersistent of + true -> + AckTag1; + false -> + ok = rabbit_disk_queue:ack(ReadQ, [AckTag1]), + noack end, - MsgBuf3 = case Rem1 of - 1 -> ok = maybe_prefetch(Q, MsgBuf1), - MsgBuf1; - _ -> queue:in_r({disk, Rem1 - 1}, MsgBuf1) - end, {Msg1, IsDelivered1, AckTag2, MsgBuf3} end, Rem = Length - 1, {{Msg, IsDelivered, AckTag, Rem}, State #mqstate { msg_buf = MsgBuf2, length = Rem }}. -maybe_prefetch(Q, MsgBuf) -> +maybe_prefetch(MsgBuf) -> case queue:peek(MsgBuf) of - empty -> ok; - {value, {disk, Count}} -> rabbit_disk_queue:prefetch(Q, Count); - {value, _} -> ok + empty -> + ok; + {value, {#basic_message {}, _IsDelivered}} -> + ok; + {value, {Q, Count}} -> + rabbit_disk_queue:prefetch(Q, Count) end. - remove_noacks(MsgsWithAcks) -> - {AckTags, ASize} = - lists:foldl( - fun ({Msg, noack}, {AccAckTags, AccSize}) -> - {AccAckTags, size_of_message(Msg) + AccSize}; - ({Msg, AckTag}, {AccAckTags, AccSize}) -> - {[AckTag | AccAckTags], size_of_message(Msg) + AccSize} - end, {[], 0}, MsgsWithAcks), - {AckTags, ASize}. + lists:foldl( + fun ({Msg, noack}, {AccAckTags, AccSize}) -> + {AccAckTags, size_of_message(Msg) + AccSize}; + ({Msg, AckTag}, {AccAckTags, AccSize}) -> + {[AckTag | AccAckTags], size_of_message(Msg) + AccSize} + end, {[], 0}, MsgsWithAcks). ack(MsgsWithAcks, State = #mqstate { queue = Q, memory_size = QSize, memory_loss = Loss }) -> - ASize = case remove_noacks(MsgsWithAcks) of - {[], ASize1} -> ASize1; - {AckTags, ASize1} -> rabbit_disk_queue:ack(Q, AckTags), - ASize1 + {AckTags, ASize} = remove_noacks(MsgsWithAcks), + ok = case AckTags of + [] -> ok; + _ -> rabbit_disk_queue:ack(Q, AckTags) end, State1 = State #mqstate { memory_size = QSize - ASize, memory_loss = Loss + ASize }, {ok, State1}. -tx_publish(Msg, State = #mqstate { mode = disk, memory_size = QSize, - memory_gain = Gain }) -> - ok = rabbit_disk_queue:tx_publish(Msg), - MsgSize = size_of_message(Msg), - {ok, State #mqstate { memory_size = QSize + MsgSize, - memory_gain = Gain + MsgSize }}; -tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, State = - #mqstate { mode = mixed, is_durable = IsDurable, - memory_size = QSize, memory_gain = Gain }) - when IsDurable andalso IsPersistent -> +tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, + State = #mqstate { mode = Mode, memory_size = QSize, + is_durable = IsDurable, memory_gain = Gain }) + when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> ok = rabbit_disk_queue:tx_publish(Msg), MsgSize = size_of_message(Msg), {ok, State #mqstate { memory_size = QSize + MsgSize, @@ -418,16 +411,64 @@ tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize, only_msg_ids(Pubs) -> lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs). +%% The last 2 params are accumulators. We work through the publishes, +%% sorting out our msgbuf as we go. Finally, when no more work to do, +%% we commit first transient, and the persistent msgs. This is safe +%% because in case of failure, transient messages will be lost on +%% restart anyway. +commit_to_queues(_IsDurable, _Q, _TransQ, MsgBuf, [], [], [], []) -> + MsgBuf; +commit_to_queues(_IsDurable, Q, _TransQ, MsgBuf, AckTags, [], + PersistMsgIds, []) -> + MsgIds = lists:flatten(lists:reverse(PersistMsgIds)), + ok = rabbit_disk_queue:tx_commit(Q, MsgIds, AckTags), + MsgBuf; +commit_to_queues(IsDurable, Q, TransQ, MsgBuf, AckTags, [], + PersistMsgIds, TransMsgIds) -> + MsgIds = lists:flatten(lists:reverse(TransMsgIds)), + ok = rabbit_disk_queue:tx_commit(TransQ, MsgIds, []), + commit_to_queues(IsDurable, Q, TransQ, MsgBuf, AckTags, [], + PersistMsgIds, []); +commit_to_queues(false, Q, TransQ, MsgBuf, AckTags, Publishes, [], []) -> + MsgIds = only_msg_ids(Publishes), + MsgBuf1 = inc_queue_length(TransQ, MsgBuf, erlang:length(MsgIds)), + commit_to_queues(false, Q, TransQ, MsgBuf1, AckTags, [], [], [MsgIds]); +commit_to_queues(true, Q, TransQ, MsgBuf, AckTags, Publishes = + [#basic_message { is_persistent = true } | _], + PersistAcc, TransAcc) -> + {Persist, Publishes1} = lists:splitwith(fun is_persistent/1, Publishes), + MsgIds = only_msg_ids(Persist), + MsgBuf1 = inc_queue_length(Q, MsgBuf, erlang:length(MsgIds)), + commit_to_queues(true, Q, TransQ, MsgBuf1, AckTags, Publishes1, + [MsgIds | PersistAcc], TransAcc); +commit_to_queues(true, Q, TransQ, MsgBuf, AckTags, Publishes, + PersistAcc, TransAcc) -> + %% not persistent + {Trans, Publishes1} = lists:splitwith(fun is_not_persistent/1, Publishes), + MsgIds = only_msg_ids(Trans), + MsgBuf1 = inc_queue_length(TransQ, MsgBuf, erlang:length(MsgIds)), + commit_to_queues(true, Q, TransQ, MsgBuf1, AckTags, Publishes1, + PersistAcc, [MsgIds | TransAcc]). + +is_persistent(#basic_message { is_persistent = IsPersistent }) -> + IsPersistent. + +is_not_persistent(#basic_message { is_persistent = IsPersistent }) -> + not IsPersistent. + tx_commit(Publishes, MsgsWithAcks, State = #mqstate { mode = disk, queue = Q, length = Length, - memory_size = QSize, memory_loss = Loss }) -> + memory_size = QSize, memory_loss = Loss, + is_durable = IsDurable, msg_buf = MsgBuf }) -> {RealAcks, ASize} = remove_noacks(MsgsWithAcks), - ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok; - true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), - RealAcks) - end, + MsgBuf1 = case ([] == Publishes) andalso ([] == RealAcks) of + true -> MsgBuf; + false -> commit_to_queues + (IsDurable, Q, transient_queue(Q), MsgBuf, + RealAcks, Publishes, [], []) + end, {ok, State #mqstate { length = Length + erlang:length(Publishes), - memory_size = QSize - ASize, + msg_buf = MsgBuf1, memory_size = QSize - ASize, memory_loss = Loss + ASize }}; tx_commit(Publishes, MsgsWithAcks, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, @@ -436,19 +477,17 @@ tx_commit(Publishes, MsgsWithAcks, {PersistentPubs, MsgBuf1} = lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, {Acc, MsgBuf2}) -> - OnDisk = IsPersistent andalso IsDurable, Acc1 = - if OnDisk -> - [Msg #basic_message.guid | Acc]; - true -> Acc + case IsPersistent andalso IsDurable of + true -> [Msg #basic_message.guid | Acc]; + false -> Acc end, - {Acc1, queue:in({Msg, false, OnDisk}, MsgBuf2)} + {Acc1, queue:in({Msg, false}, MsgBuf2)} end, {[], MsgBuf}, Publishes), - %% foldl reverses, so re-reverse PersistentPubs to match - %% requirements of rabbit_disk_queue (ascending SeqIds) {RealAcks, ASize} = remove_noacks(MsgsWithAcks), - ok = if ([] == PersistentPubs) andalso ([] == RealAcks) -> ok; - true -> + ok = case ([] == PersistentPubs) andalso ([] == RealAcks) of + true -> ok; + false -> rabbit_disk_queue:tx_commit( Q, lists:reverse(PersistentPubs), RealAcks) end, @@ -490,28 +529,25 @@ tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable, %% [{Msg, AckTag}] requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable, - length = Length - }) -> + length = Length, + msg_buf = MsgBuf }) -> %% here, we may have messages with no ack tags, because of the %% fact they are not persistent, but nevertheless we want to %% requeue them. This means publishing them delivered. - Requeue + TransQ = transient_queue(Q), + {MsgBuf1, PersistRQ} = lists:foldl( - fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ) + fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, + {MB, PRQ}) when IsDurable andalso IsPersistent -> - [{AckTag, true} | RQ]; - ({Msg, _AckTag}, RQ) -> - ok = case RQ == [] of - true -> ok; - false -> rabbit_disk_queue:requeue( - Q, lists:reverse(RQ)) - end, - ok = rabbit_disk_queue:publish(Q, Msg, true), - [] - end, [], MessagesWithAckTags), - ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)), - {ok, - State #mqstate { length = Length + erlang:length(MessagesWithAckTags) }}; + {inc_queue_length(Q, MB, 1), [{AckTag, true} | PRQ]}; + ({Msg, noack}, {MB, PRQ}) -> + ok = rabbit_disk_queue:publish(TransQ, Msg, true), + {inc_queue_length(TransQ, MB, 1), PRQ} + end, {MsgBuf, []}, MessagesWithAckTags), + ok = rabbit_disk_queue:requeue(Q, lists:reverse(PersistRQ)), + {ok, State #mqstate { length = Length + erlang:length(MessagesWithAckTags), + msg_buf = MsgBuf1 }}; requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, is_durable = IsDurable, @@ -521,40 +557,33 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag}, {Acc, MsgBuf2}) -> - OnDisk = IsDurable andalso IsPersistent, Acc1 = - if OnDisk -> [{AckTag, true} | Acc]; - true -> Acc + case IsDurable andalso IsPersistent of + true -> [{AckTag, true} | Acc]; + false -> Acc end, - {Acc1, queue:in({Msg, true, OnDisk}, MsgBuf2)} + {Acc1, queue:in({Msg, true}, MsgBuf2)} end, {[], MsgBuf}, MessagesWithAckTags), - ok = if [] == PersistentPubs -> ok; - true -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs)) + ok = case PersistentPubs of + [] -> ok; + _ -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs)) end, {ok, State #mqstate {msg_buf = MsgBuf1, length = Length + erlang:length(MessagesWithAckTags)}}. -purge(State = #mqstate { queue = Q, mode = disk, length = Count, - memory_loss = Loss, memory_size = QSize }) -> - Count = rabbit_disk_queue:purge(Q), - {Count, State #mqstate { length = 0, memory_size = 0, - memory_loss = Loss + QSize }}; -purge(State = #mqstate { queue = Q, mode = mixed, length = Length, +purge(State = #mqstate { queue = Q, length = Count, memory_loss = Loss, memory_size = QSize }) -> - rabbit_disk_queue:purge(Q), - {Length, - State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0, - memory_loss = Loss + QSize }}. + Len1 = rabbit_disk_queue:purge(Q), + Len2 = rabbit_disk_queue:purge(transient_queue(Q)), + true = Count >= Len1 + Len2, + {Count, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(), + memory_loss = Loss + QSize }}. -delete_queue(State = #mqstate { queue = Q, mode = disk, memory_size = QSize, - memory_loss = Loss }) -> - rabbit_disk_queue:delete_queue(Q), - {ok, State #mqstate { length = 0, memory_size = 0, - memory_loss = Loss + QSize }}; -delete_queue(State = #mqstate { queue = Q, mode = mixed, memory_size = QSize, +delete_queue(State = #mqstate { queue = Q, memory_size = QSize, memory_loss = Loss }) -> - rabbit_disk_queue:delete_queue(Q), - {ok, State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0, + ok = rabbit_disk_queue:delete_queue(Q), + ok = rabbit_disk_queue:delete_queue(transient_queue(Q)), + {ok, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(), memory_loss = Loss + QSize }}. length(#mqstate { length = Length }) -> |
