summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-15 18:03:21 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-15 18:03:21 +0100
commit6467e9a1c93c5b88cee3b214f8e9fc89b83743cb (patch)
tree58338ae20174e781390b5d49a345431aa9d90deb
parent407314953d34d39f38178b99797adf45d31b2f53 (diff)
downloadrabbitmq-server-git-6467e9a1c93c5b88cee3b214f8e9fc89b83743cb.tar.gz
Substantial changes to mixed_queue.
Previously, persistent and non-persistent messages went into the same queue on disk. The advantage of this is that you don't need to track which queue you're currently reading from and for how many messages. However, the downside to this is that on queue recovery you need to iterate through the entire queue and delete all non-persistent messages. This takes a huge amount of time. So now this is changed. Each amqqueue is now two on disk queues. One for persistent messages and one for non-persistent messages. Thus queue recovery is now trivial - just delete the non-persistent queue. However, we now _always_ use the erlang queue in mixed_queue to track (in disk mode) how many of each queue we need to read (i.e. run-length encoding). This, in the worst case (alternating persistent and non-persistent) is per-message cost. It's possible we need some sort of disk-based queue (AGH!). Not sure. Provided the queue only contains one sort of message, it degenerates to a simple single counter. All tests pass. However, there is a bug, which is that on recovery, the size of the queue (RAM cost) is not known. As such, the reporting of the queue to the queue_mode manager on queue recovery is incorrect (it starts of 0, and can go -ve). I've not decided how to fix this yet, because I do not want to have to iterate through all the messages to get the queue size out!
-rw-r--r--src/rabbit_disk_queue.erl49
-rw-r--r--src/rabbit_mixed_queue.erl439
2 files changed, 264 insertions, 224 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 4d00bc3a29..8b1487770d 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -42,7 +42,7 @@
tx_publish/1, tx_commit/3, tx_cancel/1,
requeue/2, purge/1, delete_queue/1,
delete_non_durable_queues/1, auto_ack_next_message/1,
- requeue_next_n/2, prefetch/2
+ requeue_next_n/2, prefetch/2, length/1
]).
-export([filesync/0, cache_info/0]).
@@ -255,6 +255,7 @@
( 'empty' | {msg_id(), bool(), {msg_id(), seq_id()},
non_neg_integer()})).
-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
+-spec(auto_ack_next_message/1 :: (queue_name()) -> 'ok').
-spec(tx_publish/1 :: (message()) -> 'ok').
-spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) ->
'ok').
@@ -262,11 +263,13 @@
-spec(requeue/2 :: (queue_name(), [{{msg_id(), seq_id()}, bool()}]) -> 'ok').
-spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok').
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
+-spec(delete_queue/1 :: (queue_name()) -> 'ok').
-spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
+-spec(length/1 :: (queue_name()) -> non_neg_integer()).
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
--spec(to_ram_disk_mode/0 :: () -> 'ok').
-spec(to_disk_only_mode/0 :: () -> 'ok').
+-spec(to_ram_disk_mode/0 :: () -> 'ok').
-spec(filesync/0 :: () -> 'ok').
-spec(cache_info/0 :: () -> [{atom(), term()}]).
-spec(report_memory/0 :: () -> 'ok').
@@ -322,6 +325,9 @@ delete_non_durable_queues(DurableQueues) ->
gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues},
infinity).
+length(Q) ->
+ gen_server2:call(?SERVER, {length, Q}, infinity).
+
stop() ->
gen_server2:call(?SERVER, stop, infinity).
@@ -455,6 +461,9 @@ handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
handle_call({purge, Q}, _From, State) ->
{ok, Count, State1} = internal_purge(Q, State),
reply(Count, State1);
+handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
+ {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
+ reply(WriteSeqId - ReadSeqId, State);
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
handle_call(stop_vaporise, _From, State) ->
@@ -1033,7 +1042,7 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
last_sync_offset = SyncOffset
}) ->
{InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q),
- WriteSeqId = InitWriteSeqId + length(PubMsgIds),
+ WriteSeqId = InitWriteSeqId + erlang:length(PubMsgIds),
{atomic, {InCurFile, WriteSeqId, State1}} =
mnesia:transaction(
fun() ->
@@ -1088,7 +1097,8 @@ internal_publish(Q, Message = #basic_message { guid = MsgId },
internal_tx_cancel(MsgIds, State) ->
%% we don't need seq ids because we're not touching mnesia,
%% because seqids were never assigned
- MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
+ MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds),
+ undefined)),
remove_messages(undefined, MsgSeqIds, false, State).
internal_requeue(_Q, [], State) ->
@@ -1524,7 +1534,8 @@ load_from_disk(State) ->
fun (#dq_msg_loc { msg_id = MsgId,
queue_and_seq_id = {Q, SeqId} },
true) ->
- case length(dets_ets_lookup(State1, MsgId)) of
+ case erlang:length
+ (dets_ets_lookup(State1, MsgId)) of
0 -> ok == mnesia:delete(rabbit_disk_queue,
{Q, SeqId}, write);
1 -> true
@@ -1622,13 +1633,13 @@ load_messages(Left, [File|Files],
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case length(mnesia:dirty_index_match_object
- (rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = '_',
- is_delivered = '_'
- },
- msg_id)) of
+ case erlang:length(mnesia:dirty_index_match_object
+ (rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
+ is_delivered = '_'
+ },
+ msg_id)) of
0 -> {VMAcc, VTSAcc};
RefCount ->
true =
@@ -1662,13 +1673,13 @@ recover_crashed_compactions(Files, TmpFiles) ->
verify_messages_in_mnesia(MsgIds) ->
lists:foreach(
fun (MsgId) ->
- true = 0 < length(mnesia:dirty_index_match_object
- (rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = '_',
- is_delivered = '_'
- },
- msg_id))
+ true = 0 < erlang:length(mnesia:dirty_index_match_object
+ (rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
+ is_delivered = '_'
+ },
+ msg_id))
end, MsgIds).
recover_crashed_compactions1(Files, TmpFile) ->
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 2ef534ff3d..a9013f3d8b 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -100,10 +100,12 @@
-endif.
init(Queue, IsDurable, disk) ->
- purge_non_persistent_messages(
- #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue,
- is_durable = IsDurable, length = 0, memory_size = 0,
- memory_gain = 0, memory_loss = 0 });
+ Len = rabbit_disk_queue:length(Queue),
+ ok = rabbit_disk_queue:delete_queue(transient_queue(Queue)),
+ MsgBuf = inc_queue_length(Queue, queue:new(), Len),
+ {ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue,
+ is_durable = IsDurable, length = Len,
+ memory_size = 0, memory_gain = 0, memory_loss = 0 }};
init(Queue, IsDurable, mixed) ->
{ok, State} = init(Queue, IsDurable, disk),
to_mixed_mode([], State).
@@ -126,7 +128,10 @@ to_disk_only_mode(TxnMessages, State =
%% message on disk.
%% Note we also batch together messages on disk so that we minimise
%% the calls to requeue.
- ok = send_messages_to_disk(Q, MsgBuf, 0, 0, []),
+ TransQ = transient_queue(Q),
+ {ok, MsgBuf1} =
+ send_messages_to_disk(IsDurable, Q, TransQ, MsgBuf, 0, 0, [],
+ queue:new()),
%% tx_publish txn messages. Some of these will have been already
%% published if they really are durable and persistent which is
%% why we can't just use our own tx_publish/2 function (would end
@@ -139,38 +144,49 @@ to_disk_only_mode(TxnMessages, State =
end
end, TxnMessages),
garbage_collect(),
- {ok, State #mqstate { mode = disk, msg_buf = queue:new() }}.
+ {ok, State #mqstate { mode = disk, msg_buf = MsgBuf1 }}.
-send_messages_to_disk(Q, Queue, RequeueCount, PublishCount, Commit) ->
+send_messages_to_disk(IsDurable, Q, TransQ, Queue, PublishCount, RequeueCount,
+ Commit, MsgBuf) ->
case queue:out(Queue) of
{empty, Queue} ->
- ok = flush_messages_to_disk_queue(Q, Commit),
- [] = flush_requeue_to_disk_queue(Q, RequeueCount, []),
- ok;
- {{value, {Msg = #basic_message { guid = MsgId }, _IsDelivered, OnDisk}},
- Queue1} ->
- case OnDisk of
- true ->
- ok = flush_messages_to_disk_queue(Q, Commit),
+ ok = flush_messages_to_disk_queue(TransQ, Commit),
+ [] = flush_requeue_to_disk_queue(TransQ, RequeueCount, []),
+ {ok, MsgBuf};
+ {{value, {Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ _IsDelivered}}, Queue1} ->
+ case IsDurable andalso IsPersistent of
+ true -> %% it's already in the persistent Q
send_messages_to_disk(
- Q, Queue1, 1 + RequeueCount, 0, []);
+ IsDurable, Q, TransQ, Queue1, PublishCount, RequeueCount,
+ Commit, inc_queue_length(Q, MsgBuf, 1));
false ->
- Commit1 =
- flush_requeue_to_disk_queue(Q, RequeueCount, Commit),
+ Commit1 = flush_requeue_to_disk_queue
+ (TransQ, RequeueCount, Commit),
ok = rabbit_disk_queue:tx_publish(Msg),
case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of
true ->
- ok = flush_messages_to_disk_queue(Q, Commit1),
- send_messages_to_disk(Q, Queue1, 0, 1, [MsgId]);
+ ok = flush_messages_to_disk_queue(TransQ, Commit1),
+ send_messages_to_disk(
+ IsDurable, Q, TransQ, Queue1, 1, 0, [MsgId],
+ inc_queue_length(TransQ, MsgBuf, 1));
false ->
- send_messages_to_disk
- (Q, Queue1, 0, PublishCount + 1,
- [MsgId | Commit1])
+ send_messages_to_disk(
+ IsDurable, Q, TransQ, Queue1, PublishCount + 1, 0,
+ [MsgId | Commit1],
+ inc_queue_length(TransQ, MsgBuf, 1))
end
end;
- {{value, {disk, Count}}, Queue2} ->
- ok = flush_messages_to_disk_queue(Q, Commit),
- send_messages_to_disk(Q, Queue2, RequeueCount + Count, 0, [])
+ {{value, {Q, Count}}, Queue1} ->
+ send_messages_to_disk(IsDurable, Q, TransQ, Queue1, PublishCount,
+ RequeueCount, Commit,
+ inc_queue_length(Q, MsgBuf, Count));
+ {{value, {TransQ, Count}}, Queue1} ->
+ ok = flush_messages_to_disk_queue(TransQ, Commit),
+ send_messages_to_disk(IsDurable, Q, TransQ, Queue1, 0,
+ RequeueCount + Count, [],
+ inc_queue_length(TransQ, MsgBuf, Count))
end.
flush_messages_to_disk_queue(Q, Commit) ->
@@ -192,17 +208,13 @@ flush_requeue_to_disk_queue(Q, RequeueCount, Commit) ->
to_mixed_mode(_TxnMessages, State = #mqstate { mode = mixed }) ->
{ok, State};
to_mixed_mode(TxnMessages, State =
- #mqstate { mode = disk, queue = Q, length = Length,
- is_durable = IsDurable }) ->
+ #mqstate { mode = disk, queue = Q,
+ is_durable = IsDurable, msg_buf = MsgBuf }) ->
rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]),
%% load up a new queue with a token that says how many messages
- %% are on disk
+ %% are on disk (this is already built for us by the disk mode)
%% don't actually do anything to the disk
- MsgBuf = case Length of
- 0 -> queue:new();
- _ -> ok = rabbit_disk_queue:prefetch(Q, Length),
- queue:from_list([{disk, Length}])
- end,
+ ok = maybe_prefetch(MsgBuf),
%% remove txn messages from disk which are neither persistent and
%% durable. This is necessary to avoid leaks. This is also pretty
%% much the inverse behaviour of our own tx_cancel/2 which is why
@@ -219,57 +231,58 @@ to_mixed_mode(TxnMessages, State =
true -> rabbit_disk_queue:tx_cancel(Cancel)
end,
garbage_collect(),
- {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf }}.
-
-purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
- is_durable = IsDurable,
- memory_size = 0 }) ->
- %% iterate through the content on disk, ack anything which isn't
- %% persistent, accumulate everything else that is persistent and
- %% requeue it
- {Acks, Requeue, Length, QSize} =
- deliver_all_messages(Q, IsDurable, [], [], 0, 0),
- ok = if Requeue == [] -> ok;
- true ->
- rabbit_disk_queue:requeue(Q, lists:reverse(Requeue))
- end,
- ok = if Acks == [] -> ok;
- true -> rabbit_disk_queue:ack(Q, Acks)
- end,
- {ok, State #mqstate { length = Length, memory_size = QSize }}.
-
-deliver_all_messages(Q, IsDurable, Acks, Requeue, Length, QSize) ->
- case rabbit_disk_queue:deliver(Q) of
- empty -> {Acks, Requeue, Length, QSize};
- {Msg = #basic_message { is_persistent = IsPersistent },
- _Size, IsDelivered, AckTag, _Remaining} ->
- OnDisk = IsPersistent andalso IsDurable,
- {Acks1, Requeue1, Length1, QSize1} =
- if OnDisk -> { Acks,
- [{AckTag, IsDelivered} | Requeue],
- Length + 1, QSize + size_of_message(Msg) };
- true -> { [AckTag | Acks], Requeue, Length, QSize }
- end,
- deliver_all_messages(Q, IsDurable, Acks1, Requeue1, Length1, QSize1)
+ {ok, State #mqstate { mode = mixed }}.
+
+transient_queue(Queue) ->
+ {Queue, transient}.
+
+inc_queue_length(_Queue, MsgBuf, 0) ->
+ MsgBuf;
+inc_queue_length(Queue, MsgBuf, Count) ->
+ case queue:out_r(MsgBuf) of
+ {empty, MsgBuf} ->
+ queue:in({Queue, Count}, MsgBuf);
+ {{value, {Queue, Len}}, MsgBuf1} ->
+ queue:in({Queue, Len + Count}, MsgBuf1);
+ {{value, _}, _MsgBuf1} ->
+ queue:in({Queue, Count}, MsgBuf)
end.
-publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length,
- memory_size = QSize, memory_gain = Gain }) ->
- ok = rabbit_disk_queue:publish(Q, Msg, false),
+dec_queue_length(MsgBuf) ->
+ {{value, {Queue, Len}}, MsgBuf1} = queue:out(MsgBuf),
+ MsgBuf2 = case Len of
+ 1 -> ok = maybe_prefetch(MsgBuf1),
+ MsgBuf1;
+ _ -> queue:in_r({Queue, Len-1}, MsgBuf1)
+ end,
+ {Queue, MsgBuf2}.
+
+publish(Msg = #basic_message { is_persistent = IsPersistent },
+ State = #mqstate { mode = disk, queue = Q, length = Length,
+ is_durable = IsDurable, msg_buf = MsgBuf,
+ memory_size = QSize, memory_gain = Gain }) ->
+ Persist = IsDurable andalso IsPersistent,
+ PubQ = case Persist of
+ true -> Q;
+ false -> transient_queue(Q)
+ end,
+ MsgBuf1 = inc_queue_length(PubQ, MsgBuf, 1),
+ ok = rabbit_disk_queue:publish(PubQ, Msg, false),
MsgSize = size_of_message(Msg),
- {ok, State #mqstate { length = Length + 1, memory_size = QSize + MsgSize,
- memory_gain = Gain + MsgSize }};
+ {ok, State #mqstate { memory_gain = Gain + MsgSize,
+ memory_size = QSize + MsgSize,
+ msg_buf = MsgBuf1, length = Length + 1 }};
publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
#mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
msg_buf = MsgBuf, length = Length, memory_size = QSize,
memory_gain = Gain }) ->
- OnDisk = IsDurable andalso IsPersistent,
- ok = if OnDisk ->
- rabbit_disk_queue:publish(Q, Msg, false);
- true -> ok
+ Persist = IsDurable andalso IsPersistent,
+ ok = case Persist of
+ true -> rabbit_disk_queue:publish(Q, Msg, false);
+ false -> ok
end,
MsgSize = size_of_message(Msg),
- {ok, State #mqstate { msg_buf = queue:in({Msg, false, OnDisk}, MsgBuf),
+ {ok, State #mqstate { msg_buf = queue:in({Msg, false}, MsgBuf),
length = Length + 1, memory_size = QSize + MsgSize,
memory_gain = Gain + MsgSize }}.
@@ -279,23 +292,29 @@ publish_delivered(Msg =
#basic_message { guid = MsgId, is_persistent = IsPersistent},
State =
#mqstate { mode = Mode, is_durable = IsDurable,
- queue = Q, length = 0, memory_size = QSize,
- memory_gain = Gain })
+ queue = Q, length = 0,
+ memory_size = QSize, memory_gain = Gain })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
- rabbit_disk_queue:publish(Q, Msg, false),
+ Persist = IsDurable andalso IsPersistent,
+ PubQ = case Persist of
+ true -> Q;
+ false -> transient_queue(Q)
+ end,
+ rabbit_disk_queue:publish(PubQ, Msg, false),
MsgSize = size_of_message(Msg),
State1 = State #mqstate { memory_size = QSize + MsgSize,
memory_gain = Gain + MsgSize },
- if IsDurable andalso IsPersistent ->
+ case Persist of
+ true ->
%% must call phantom_deliver otherwise the msg remains at
%% the head of the queue. This is synchronous, but
%% unavoidable as we need the AckTag
- {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q),
+ {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(PubQ),
{ok, AckTag, State1};
- true ->
+ false ->
%% in this case, we don't actually care about the ack, so
%% auto ack it (asynchronously).
- ok = rabbit_disk_queue:auto_ack_next_message(Q),
+ ok = rabbit_disk_queue:auto_ack_next_message(PubQ),
{ok, noack, State1}
end;
publish_delivered(Msg, State =
@@ -307,103 +326,77 @@ publish_delivered(Msg, State =
deliver(State = #mqstate { length = 0 }) ->
{empty, State};
-deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable,
- length = Length }) ->
- {Msg = #basic_message { is_persistent = IsPersistent },
- _Size, IsDelivered, AckTag, Remaining}
- = rabbit_disk_queue:deliver(Q),
- AckTag1 = if IsPersistent andalso IsDurable -> AckTag;
- true -> ok = rabbit_disk_queue:ack(Q, [AckTag]),
- noack
- end,
- {{Msg, IsDelivered, AckTag1, Remaining},
- State #mqstate { length = Length - 1 }};
-deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q,
+deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
is_durable = IsDurable, length = Length }) ->
- {{value, Value}, MsgBuf1}
- = queue:out(MsgBuf),
+ {{value, Value}, MsgBuf1} = queue:out(MsgBuf),
{Msg, IsDelivered, AckTag, MsgBuf2} =
case Value of
{Msg1 = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
- IsDelivered1, OnDisk} ->
+ IsDelivered1} ->
AckTag1 =
- case OnDisk of
+ case IsDurable andalso IsPersistent of
true ->
- case IsPersistent andalso IsDurable of
- true ->
- {MsgId, IsDelivered1, AckTag2, _PersistRem}
- = rabbit_disk_queue:phantom_deliver(Q),
- AckTag2;
- false ->
- ok = rabbit_disk_queue:auto_ack_next_message
- (Q),
- noack
- end;
- false -> noack
+ {MsgId, IsDelivered1, AckTag2, _PersistRem}
+ = rabbit_disk_queue:phantom_deliver(Q),
+ AckTag2;
+ false ->
+ noack
end,
- ok = maybe_prefetch(Q, MsgBuf1),
+ ok = maybe_prefetch(MsgBuf1),
{Msg1, IsDelivered1, AckTag1, MsgBuf1};
- {disk, Rem1} ->
+ _ ->
+ {ReadQ, MsgBuf3} = dec_queue_length(MsgBuf),
{Msg1 = #basic_message { is_persistent = IsPersistent },
_Size, IsDelivered1, AckTag1, _PersistRem}
- = rabbit_disk_queue:deliver(Q),
+ = rabbit_disk_queue:deliver(ReadQ),
AckTag2 =
- case IsPersistent andalso IsDurable of
- true -> AckTag1;
- false -> rabbit_disk_queue:ack(Q, [AckTag1]),
- noack
+ case IsDurable andalso IsPersistent of
+ true ->
+ AckTag1;
+ false ->
+ ok = rabbit_disk_queue:ack(ReadQ, [AckTag1]),
+ noack
end,
- MsgBuf3 = case Rem1 of
- 1 -> ok = maybe_prefetch(Q, MsgBuf1),
- MsgBuf1;
- _ -> queue:in_r({disk, Rem1 - 1}, MsgBuf1)
- end,
{Msg1, IsDelivered1, AckTag2, MsgBuf3}
end,
Rem = Length - 1,
{{Msg, IsDelivered, AckTag, Rem},
State #mqstate { msg_buf = MsgBuf2, length = Rem }}.
-maybe_prefetch(Q, MsgBuf) ->
+maybe_prefetch(MsgBuf) ->
case queue:peek(MsgBuf) of
- empty -> ok;
- {value, {disk, Count}} -> rabbit_disk_queue:prefetch(Q, Count);
- {value, _} -> ok
+ empty ->
+ ok;
+ {value, {#basic_message {}, _IsDelivered}} ->
+ ok;
+ {value, {Q, Count}} ->
+ rabbit_disk_queue:prefetch(Q, Count)
end.
-
remove_noacks(MsgsWithAcks) ->
- {AckTags, ASize} =
- lists:foldl(
- fun ({Msg, noack}, {AccAckTags, AccSize}) ->
- {AccAckTags, size_of_message(Msg) + AccSize};
- ({Msg, AckTag}, {AccAckTags, AccSize}) ->
- {[AckTag | AccAckTags], size_of_message(Msg) + AccSize}
- end, {[], 0}, MsgsWithAcks),
- {AckTags, ASize}.
+ lists:foldl(
+ fun ({Msg, noack}, {AccAckTags, AccSize}) ->
+ {AccAckTags, size_of_message(Msg) + AccSize};
+ ({Msg, AckTag}, {AccAckTags, AccSize}) ->
+ {[AckTag | AccAckTags], size_of_message(Msg) + AccSize}
+ end, {[], 0}, MsgsWithAcks).
ack(MsgsWithAcks, State = #mqstate { queue = Q, memory_size = QSize,
memory_loss = Loss }) ->
- ASize = case remove_noacks(MsgsWithAcks) of
- {[], ASize1} -> ASize1;
- {AckTags, ASize1} -> rabbit_disk_queue:ack(Q, AckTags),
- ASize1
+ {AckTags, ASize} = remove_noacks(MsgsWithAcks),
+ ok = case AckTags of
+ [] -> ok;
+ _ -> rabbit_disk_queue:ack(Q, AckTags)
end,
State1 = State #mqstate { memory_size = QSize - ASize,
memory_loss = Loss + ASize },
{ok, State1}.
-tx_publish(Msg, State = #mqstate { mode = disk, memory_size = QSize,
- memory_gain = Gain }) ->
- ok = rabbit_disk_queue:tx_publish(Msg),
- MsgSize = size_of_message(Msg),
- {ok, State #mqstate { memory_size = QSize + MsgSize,
- memory_gain = Gain + MsgSize }};
-tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
- #mqstate { mode = mixed, is_durable = IsDurable,
- memory_size = QSize, memory_gain = Gain })
- when IsDurable andalso IsPersistent ->
+tx_publish(Msg = #basic_message { is_persistent = IsPersistent },
+ State = #mqstate { mode = Mode, memory_size = QSize,
+ is_durable = IsDurable, memory_gain = Gain })
+ when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
ok = rabbit_disk_queue:tx_publish(Msg),
MsgSize = size_of_message(Msg),
{ok, State #mqstate { memory_size = QSize + MsgSize,
@@ -418,16 +411,64 @@ tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize,
only_msg_ids(Pubs) ->
lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs).
+%% The last 2 params are accumulators. We work through the publishes,
+%% sorting out our msgbuf as we go. Finally, when no more work to do,
+%% we commit first transient, and the persistent msgs. This is safe
+%% because in case of failure, transient messages will be lost on
+%% restart anyway.
+commit_to_queues(_IsDurable, _Q, _TransQ, MsgBuf, [], [], [], []) ->
+ MsgBuf;
+commit_to_queues(_IsDurable, Q, _TransQ, MsgBuf, AckTags, [],
+ PersistMsgIds, []) ->
+ MsgIds = lists:flatten(lists:reverse(PersistMsgIds)),
+ ok = rabbit_disk_queue:tx_commit(Q, MsgIds, AckTags),
+ MsgBuf;
+commit_to_queues(IsDurable, Q, TransQ, MsgBuf, AckTags, [],
+ PersistMsgIds, TransMsgIds) ->
+ MsgIds = lists:flatten(lists:reverse(TransMsgIds)),
+ ok = rabbit_disk_queue:tx_commit(TransQ, MsgIds, []),
+ commit_to_queues(IsDurable, Q, TransQ, MsgBuf, AckTags, [],
+ PersistMsgIds, []);
+commit_to_queues(false, Q, TransQ, MsgBuf, AckTags, Publishes, [], []) ->
+ MsgIds = only_msg_ids(Publishes),
+ MsgBuf1 = inc_queue_length(TransQ, MsgBuf, erlang:length(MsgIds)),
+ commit_to_queues(false, Q, TransQ, MsgBuf1, AckTags, [], [], [MsgIds]);
+commit_to_queues(true, Q, TransQ, MsgBuf, AckTags, Publishes =
+ [#basic_message { is_persistent = true } | _],
+ PersistAcc, TransAcc) ->
+ {Persist, Publishes1} = lists:splitwith(fun is_persistent/1, Publishes),
+ MsgIds = only_msg_ids(Persist),
+ MsgBuf1 = inc_queue_length(Q, MsgBuf, erlang:length(MsgIds)),
+ commit_to_queues(true, Q, TransQ, MsgBuf1, AckTags, Publishes1,
+ [MsgIds | PersistAcc], TransAcc);
+commit_to_queues(true, Q, TransQ, MsgBuf, AckTags, Publishes,
+ PersistAcc, TransAcc) ->
+ %% not persistent
+ {Trans, Publishes1} = lists:splitwith(fun is_not_persistent/1, Publishes),
+ MsgIds = only_msg_ids(Trans),
+ MsgBuf1 = inc_queue_length(TransQ, MsgBuf, erlang:length(MsgIds)),
+ commit_to_queues(true, Q, TransQ, MsgBuf1, AckTags, Publishes1,
+ PersistAcc, [MsgIds | TransAcc]).
+
+is_persistent(#basic_message { is_persistent = IsPersistent }) ->
+ IsPersistent.
+
+is_not_persistent(#basic_message { is_persistent = IsPersistent }) ->
+ not IsPersistent.
+
tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = disk, queue = Q, length = Length,
- memory_size = QSize, memory_loss = Loss }) ->
+ memory_size = QSize, memory_loss = Loss,
+ is_durable = IsDurable, msg_buf = MsgBuf }) ->
{RealAcks, ASize} = remove_noacks(MsgsWithAcks),
- ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok;
- true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes),
- RealAcks)
- end,
+ MsgBuf1 = case ([] == Publishes) andalso ([] == RealAcks) of
+ true -> MsgBuf;
+ false -> commit_to_queues
+ (IsDurable, Q, transient_queue(Q), MsgBuf,
+ RealAcks, Publishes, [], [])
+ end,
{ok, State #mqstate { length = Length + erlang:length(Publishes),
- memory_size = QSize - ASize,
+ msg_buf = MsgBuf1, memory_size = QSize - ASize,
memory_loss = Loss + ASize }};
tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
@@ -436,19 +477,17 @@ tx_commit(Publishes, MsgsWithAcks,
{PersistentPubs, MsgBuf1} =
lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
{Acc, MsgBuf2}) ->
- OnDisk = IsPersistent andalso IsDurable,
Acc1 =
- if OnDisk ->
- [Msg #basic_message.guid | Acc];
- true -> Acc
+ case IsPersistent andalso IsDurable of
+ true -> [Msg #basic_message.guid | Acc];
+ false -> Acc
end,
- {Acc1, queue:in({Msg, false, OnDisk}, MsgBuf2)}
+ {Acc1, queue:in({Msg, false}, MsgBuf2)}
end, {[], MsgBuf}, Publishes),
- %% foldl reverses, so re-reverse PersistentPubs to match
- %% requirements of rabbit_disk_queue (ascending SeqIds)
{RealAcks, ASize} = remove_noacks(MsgsWithAcks),
- ok = if ([] == PersistentPubs) andalso ([] == RealAcks) -> ok;
- true ->
+ ok = case ([] == PersistentPubs) andalso ([] == RealAcks) of
+ true -> ok;
+ false ->
rabbit_disk_queue:tx_commit(
Q, lists:reverse(PersistentPubs), RealAcks)
end,
@@ -490,28 +529,25 @@ tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable,
%% [{Msg, AckTag}]
requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
is_durable = IsDurable,
- length = Length
- }) ->
+ length = Length,
+ msg_buf = MsgBuf }) ->
%% here, we may have messages with no ack tags, because of the
%% fact they are not persistent, but nevertheless we want to
%% requeue them. This means publishing them delivered.
- Requeue
+ TransQ = transient_queue(Q),
+ {MsgBuf1, PersistRQ}
= lists:foldl(
- fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ)
+ fun ({#basic_message { is_persistent = IsPersistent }, AckTag},
+ {MB, PRQ})
when IsDurable andalso IsPersistent ->
- [{AckTag, true} | RQ];
- ({Msg, _AckTag}, RQ) ->
- ok = case RQ == [] of
- true -> ok;
- false -> rabbit_disk_queue:requeue(
- Q, lists:reverse(RQ))
- end,
- ok = rabbit_disk_queue:publish(Q, Msg, true),
- []
- end, [], MessagesWithAckTags),
- ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)),
- {ok,
- State #mqstate { length = Length + erlang:length(MessagesWithAckTags) }};
+ {inc_queue_length(Q, MB, 1), [{AckTag, true} | PRQ]};
+ ({Msg, noack}, {MB, PRQ}) ->
+ ok = rabbit_disk_queue:publish(TransQ, Msg, true),
+ {inc_queue_length(TransQ, MB, 1), PRQ}
+ end, {MsgBuf, []}, MessagesWithAckTags),
+ ok = rabbit_disk_queue:requeue(Q, lists:reverse(PersistRQ)),
+ {ok, State #mqstate { length = Length + erlang:length(MessagesWithAckTags),
+ msg_buf = MsgBuf1 }};
requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
is_durable = IsDurable,
@@ -521,40 +557,33 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
{Acc, MsgBuf2}) ->
- OnDisk = IsDurable andalso IsPersistent,
Acc1 =
- if OnDisk -> [{AckTag, true} | Acc];
- true -> Acc
+ case IsDurable andalso IsPersistent of
+ true -> [{AckTag, true} | Acc];
+ false -> Acc
end,
- {Acc1, queue:in({Msg, true, OnDisk}, MsgBuf2)}
+ {Acc1, queue:in({Msg, true}, MsgBuf2)}
end, {[], MsgBuf}, MessagesWithAckTags),
- ok = if [] == PersistentPubs -> ok;
- true -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs))
+ ok = case PersistentPubs of
+ [] -> ok;
+ _ -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs))
end,
{ok, State #mqstate {msg_buf = MsgBuf1,
length = Length + erlang:length(MessagesWithAckTags)}}.
-purge(State = #mqstate { queue = Q, mode = disk, length = Count,
- memory_loss = Loss, memory_size = QSize }) ->
- Count = rabbit_disk_queue:purge(Q),
- {Count, State #mqstate { length = 0, memory_size = 0,
- memory_loss = Loss + QSize }};
-purge(State = #mqstate { queue = Q, mode = mixed, length = Length,
+purge(State = #mqstate { queue = Q, length = Count,
memory_loss = Loss, memory_size = QSize }) ->
- rabbit_disk_queue:purge(Q),
- {Length,
- State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0,
- memory_loss = Loss + QSize }}.
+ Len1 = rabbit_disk_queue:purge(Q),
+ Len2 = rabbit_disk_queue:purge(transient_queue(Q)),
+ true = Count >= Len1 + Len2,
+ {Count, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(),
+ memory_loss = Loss + QSize }}.
-delete_queue(State = #mqstate { queue = Q, mode = disk, memory_size = QSize,
- memory_loss = Loss }) ->
- rabbit_disk_queue:delete_queue(Q),
- {ok, State #mqstate { length = 0, memory_size = 0,
- memory_loss = Loss + QSize }};
-delete_queue(State = #mqstate { queue = Q, mode = mixed, memory_size = QSize,
+delete_queue(State = #mqstate { queue = Q, memory_size = QSize,
memory_loss = Loss }) ->
- rabbit_disk_queue:delete_queue(Q),
- {ok, State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0,
+ ok = rabbit_disk_queue:delete_queue(Q),
+ ok = rabbit_disk_queue:delete_queue(transient_queue(Q)),
+ {ok, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(),
memory_loss = Loss + QSize }}.
length(#mqstate { length = Length }) ->