summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl49
-rw-r--r--src/rabbit_mixed_queue.erl439
2 files changed, 264 insertions, 224 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 4d00bc3a29..8b1487770d 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -42,7 +42,7 @@
tx_publish/1, tx_commit/3, tx_cancel/1,
requeue/2, purge/1, delete_queue/1,
delete_non_durable_queues/1, auto_ack_next_message/1,
- requeue_next_n/2, prefetch/2
+ requeue_next_n/2, prefetch/2, length/1
]).
-export([filesync/0, cache_info/0]).
@@ -255,6 +255,7 @@
( 'empty' | {msg_id(), bool(), {msg_id(), seq_id()},
non_neg_integer()})).
-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
+-spec(auto_ack_next_message/1 :: (queue_name()) -> 'ok').
-spec(tx_publish/1 :: (message()) -> 'ok').
-spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) ->
'ok').
@@ -262,11 +263,13 @@
-spec(requeue/2 :: (queue_name(), [{{msg_id(), seq_id()}, bool()}]) -> 'ok').
-spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok').
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
+-spec(delete_queue/1 :: (queue_name()) -> 'ok').
-spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
+-spec(length/1 :: (queue_name()) -> non_neg_integer()).
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
--spec(to_ram_disk_mode/0 :: () -> 'ok').
-spec(to_disk_only_mode/0 :: () -> 'ok').
+-spec(to_ram_disk_mode/0 :: () -> 'ok').
-spec(filesync/0 :: () -> 'ok').
-spec(cache_info/0 :: () -> [{atom(), term()}]).
-spec(report_memory/0 :: () -> 'ok').
@@ -322,6 +325,9 @@ delete_non_durable_queues(DurableQueues) ->
gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues},
infinity).
+length(Q) ->
+ gen_server2:call(?SERVER, {length, Q}, infinity).
+
stop() ->
gen_server2:call(?SERVER, stop, infinity).
@@ -455,6 +461,9 @@ handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
handle_call({purge, Q}, _From, State) ->
{ok, Count, State1} = internal_purge(Q, State),
reply(Count, State1);
+handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
+ {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
+ reply(WriteSeqId - ReadSeqId, State);
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
handle_call(stop_vaporise, _From, State) ->
@@ -1033,7 +1042,7 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
last_sync_offset = SyncOffset
}) ->
{InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q),
- WriteSeqId = InitWriteSeqId + length(PubMsgIds),
+ WriteSeqId = InitWriteSeqId + erlang:length(PubMsgIds),
{atomic, {InCurFile, WriteSeqId, State1}} =
mnesia:transaction(
fun() ->
@@ -1088,7 +1097,8 @@ internal_publish(Q, Message = #basic_message { guid = MsgId },
internal_tx_cancel(MsgIds, State) ->
%% we don't need seq ids because we're not touching mnesia,
%% because seqids were never assigned
- MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
+ MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds),
+ undefined)),
remove_messages(undefined, MsgSeqIds, false, State).
internal_requeue(_Q, [], State) ->
@@ -1524,7 +1534,8 @@ load_from_disk(State) ->
fun (#dq_msg_loc { msg_id = MsgId,
queue_and_seq_id = {Q, SeqId} },
true) ->
- case length(dets_ets_lookup(State1, MsgId)) of
+ case erlang:length
+ (dets_ets_lookup(State1, MsgId)) of
0 -> ok == mnesia:delete(rabbit_disk_queue,
{Q, SeqId}, write);
1 -> true
@@ -1622,13 +1633,13 @@ load_messages(Left, [File|Files],
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case length(mnesia:dirty_index_match_object
- (rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = '_',
- is_delivered = '_'
- },
- msg_id)) of
+ case erlang:length(mnesia:dirty_index_match_object
+ (rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
+ is_delivered = '_'
+ },
+ msg_id)) of
0 -> {VMAcc, VTSAcc};
RefCount ->
true =
@@ -1662,13 +1673,13 @@ recover_crashed_compactions(Files, TmpFiles) ->
verify_messages_in_mnesia(MsgIds) ->
lists:foreach(
fun (MsgId) ->
- true = 0 < length(mnesia:dirty_index_match_object
- (rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = '_',
- is_delivered = '_'
- },
- msg_id))
+ true = 0 < erlang:length(mnesia:dirty_index_match_object
+ (rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
+ is_delivered = '_'
+ },
+ msg_id))
end, MsgIds).
recover_crashed_compactions1(Files, TmpFile) ->
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 2ef534ff3d..a9013f3d8b 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -100,10 +100,12 @@
-endif.
init(Queue, IsDurable, disk) ->
- purge_non_persistent_messages(
- #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue,
- is_durable = IsDurable, length = 0, memory_size = 0,
- memory_gain = 0, memory_loss = 0 });
+ Len = rabbit_disk_queue:length(Queue),
+ ok = rabbit_disk_queue:delete_queue(transient_queue(Queue)),
+ MsgBuf = inc_queue_length(Queue, queue:new(), Len),
+ {ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue,
+ is_durable = IsDurable, length = Len,
+ memory_size = 0, memory_gain = 0, memory_loss = 0 }};
init(Queue, IsDurable, mixed) ->
{ok, State} = init(Queue, IsDurable, disk),
to_mixed_mode([], State).
@@ -126,7 +128,10 @@ to_disk_only_mode(TxnMessages, State =
%% message on disk.
%% Note we also batch together messages on disk so that we minimise
%% the calls to requeue.
- ok = send_messages_to_disk(Q, MsgBuf, 0, 0, []),
+ TransQ = transient_queue(Q),
+ {ok, MsgBuf1} =
+ send_messages_to_disk(IsDurable, Q, TransQ, MsgBuf, 0, 0, [],
+ queue:new()),
%% tx_publish txn messages. Some of these will have been already
%% published if they really are durable and persistent which is
%% why we can't just use our own tx_publish/2 function (would end
@@ -139,38 +144,49 @@ to_disk_only_mode(TxnMessages, State =
end
end, TxnMessages),
garbage_collect(),
- {ok, State #mqstate { mode = disk, msg_buf = queue:new() }}.
+ {ok, State #mqstate { mode = disk, msg_buf = MsgBuf1 }}.
-send_messages_to_disk(Q, Queue, RequeueCount, PublishCount, Commit) ->
+send_messages_to_disk(IsDurable, Q, TransQ, Queue, PublishCount, RequeueCount,
+ Commit, MsgBuf) ->
case queue:out(Queue) of
{empty, Queue} ->
- ok = flush_messages_to_disk_queue(Q, Commit),
- [] = flush_requeue_to_disk_queue(Q, RequeueCount, []),
- ok;
- {{value, {Msg = #basic_message { guid = MsgId }, _IsDelivered, OnDisk}},
- Queue1} ->
- case OnDisk of
- true ->
- ok = flush_messages_to_disk_queue(Q, Commit),
+ ok = flush_messages_to_disk_queue(TransQ, Commit),
+ [] = flush_requeue_to_disk_queue(TransQ, RequeueCount, []),
+ {ok, MsgBuf};
+ {{value, {Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ _IsDelivered}}, Queue1} ->
+ case IsDurable andalso IsPersistent of
+ true -> %% it's already in the persistent Q
send_messages_to_disk(
- Q, Queue1, 1 + RequeueCount, 0, []);
+ IsDurable, Q, TransQ, Queue1, PublishCount, RequeueCount,
+ Commit, inc_queue_length(Q, MsgBuf, 1));
false ->
- Commit1 =
- flush_requeue_to_disk_queue(Q, RequeueCount, Commit),
+ Commit1 = flush_requeue_to_disk_queue
+ (TransQ, RequeueCount, Commit),
ok = rabbit_disk_queue:tx_publish(Msg),
case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of
true ->
- ok = flush_messages_to_disk_queue(Q, Commit1),
- send_messages_to_disk(Q, Queue1, 0, 1, [MsgId]);
+ ok = flush_messages_to_disk_queue(TransQ, Commit1),
+ send_messages_to_disk(
+ IsDurable, Q, TransQ, Queue1, 1, 0, [MsgId],
+ inc_queue_length(TransQ, MsgBuf, 1));
false ->
- send_messages_to_disk
- (Q, Queue1, 0, PublishCount + 1,
- [MsgId | Commit1])
+ send_messages_to_disk(
+ IsDurable, Q, TransQ, Queue1, PublishCount + 1, 0,
+ [MsgId | Commit1],
+ inc_queue_length(TransQ, MsgBuf, 1))
end
end;
- {{value, {disk, Count}}, Queue2} ->
- ok = flush_messages_to_disk_queue(Q, Commit),
- send_messages_to_disk(Q, Queue2, RequeueCount + Count, 0, [])
+ {{value, {Q, Count}}, Queue1} ->
+ send_messages_to_disk(IsDurable, Q, TransQ, Queue1, PublishCount,
+ RequeueCount, Commit,
+ inc_queue_length(Q, MsgBuf, Count));
+ {{value, {TransQ, Count}}, Queue1} ->
+ ok = flush_messages_to_disk_queue(TransQ, Commit),
+ send_messages_to_disk(IsDurable, Q, TransQ, Queue1, 0,
+ RequeueCount + Count, [],
+ inc_queue_length(TransQ, MsgBuf, Count))
end.
flush_messages_to_disk_queue(Q, Commit) ->
@@ -192,17 +208,13 @@ flush_requeue_to_disk_queue(Q, RequeueCount, Commit) ->
to_mixed_mode(_TxnMessages, State = #mqstate { mode = mixed }) ->
{ok, State};
to_mixed_mode(TxnMessages, State =
- #mqstate { mode = disk, queue = Q, length = Length,
- is_durable = IsDurable }) ->
+ #mqstate { mode = disk, queue = Q,
+ is_durable = IsDurable, msg_buf = MsgBuf }) ->
rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]),
%% load up a new queue with a token that says how many messages
- %% are on disk
+ %% are on disk (this is already built for us by the disk mode)
%% don't actually do anything to the disk
- MsgBuf = case Length of
- 0 -> queue:new();
- _ -> ok = rabbit_disk_queue:prefetch(Q, Length),
- queue:from_list([{disk, Length}])
- end,
+ ok = maybe_prefetch(MsgBuf),
%% remove txn messages from disk which are neither persistent and
%% durable. This is necessary to avoid leaks. This is also pretty
%% much the inverse behaviour of our own tx_cancel/2 which is why
@@ -219,57 +231,58 @@ to_mixed_mode(TxnMessages, State =
true -> rabbit_disk_queue:tx_cancel(Cancel)
end,
garbage_collect(),
- {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf }}.
-
-purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
- is_durable = IsDurable,
- memory_size = 0 }) ->
- %% iterate through the content on disk, ack anything which isn't
- %% persistent, accumulate everything else that is persistent and
- %% requeue it
- {Acks, Requeue, Length, QSize} =
- deliver_all_messages(Q, IsDurable, [], [], 0, 0),
- ok = if Requeue == [] -> ok;
- true ->
- rabbit_disk_queue:requeue(Q, lists:reverse(Requeue))
- end,
- ok = if Acks == [] -> ok;
- true -> rabbit_disk_queue:ack(Q, Acks)
- end,
- {ok, State #mqstate { length = Length, memory_size = QSize }}.
-
-deliver_all_messages(Q, IsDurable, Acks, Requeue, Length, QSize) ->
- case rabbit_disk_queue:deliver(Q) of
- empty -> {Acks, Requeue, Length, QSize};
- {Msg = #basic_message { is_persistent = IsPersistent },
- _Size, IsDelivered, AckTag, _Remaining} ->
- OnDisk = IsPersistent andalso IsDurable,
- {Acks1, Requeue1, Length1, QSize1} =
- if OnDisk -> { Acks,
- [{AckTag, IsDelivered} | Requeue],
- Length + 1, QSize + size_of_message(Msg) };
- true -> { [AckTag | Acks], Requeue, Length, QSize }
- end,
- deliver_all_messages(Q, IsDurable, Acks1, Requeue1, Length1, QSize1)
+ {ok, State #mqstate { mode = mixed }}.
+
+transient_queue(Queue) ->
+ {Queue, transient}.
+
+inc_queue_length(_Queue, MsgBuf, 0) ->
+ MsgBuf;
+inc_queue_length(Queue, MsgBuf, Count) ->
+ case queue:out_r(MsgBuf) of
+ {empty, MsgBuf} ->
+ queue:in({Queue, Count}, MsgBuf);
+ {{value, {Queue, Len}}, MsgBuf1} ->
+ queue:in({Queue, Len + Count}, MsgBuf1);
+ {{value, _}, _MsgBuf1} ->
+ queue:in({Queue, Count}, MsgBuf)
end.
-publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length,
- memory_size = QSize, memory_gain = Gain }) ->
- ok = rabbit_disk_queue:publish(Q, Msg, false),
+dec_queue_length(MsgBuf) ->
+ {{value, {Queue, Len}}, MsgBuf1} = queue:out(MsgBuf),
+ MsgBuf2 = case Len of
+ 1 -> ok = maybe_prefetch(MsgBuf1),
+ MsgBuf1;
+ _ -> queue:in_r({Queue, Len-1}, MsgBuf1)
+ end,
+ {Queue, MsgBuf2}.
+
+publish(Msg = #basic_message { is_persistent = IsPersistent },
+ State = #mqstate { mode = disk, queue = Q, length = Length,
+ is_durable = IsDurable, msg_buf = MsgBuf,
+ memory_size = QSize, memory_gain = Gain }) ->
+ Persist = IsDurable andalso IsPersistent,
+ PubQ = case Persist of
+ true -> Q;
+ false -> transient_queue(Q)
+ end,
+ MsgBuf1 = inc_queue_length(PubQ, MsgBuf, 1),
+ ok = rabbit_disk_queue:publish(PubQ, Msg, false),
MsgSize = size_of_message(Msg),
- {ok, State #mqstate { length = Length + 1, memory_size = QSize + MsgSize,
- memory_gain = Gain + MsgSize }};
+ {ok, State #mqstate { memory_gain = Gain + MsgSize,
+ memory_size = QSize + MsgSize,
+ msg_buf = MsgBuf1, length = Length + 1 }};
publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
#mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
msg_buf = MsgBuf, length = Length, memory_size = QSize,
memory_gain = Gain }) ->
- OnDisk = IsDurable andalso IsPersistent,
- ok = if OnDisk ->
- rabbit_disk_queue:publish(Q, Msg, false);
- true -> ok
+ Persist = IsDurable andalso IsPersistent,
+ ok = case Persist of
+ true -> rabbit_disk_queue:publish(Q, Msg, false);
+ false -> ok
end,
MsgSize = size_of_message(Msg),
- {ok, State #mqstate { msg_buf = queue:in({Msg, false, OnDisk}, MsgBuf),
+ {ok, State #mqstate { msg_buf = queue:in({Msg, false}, MsgBuf),
length = Length + 1, memory_size = QSize + MsgSize,
memory_gain = Gain + MsgSize }}.
@@ -279,23 +292,29 @@ publish_delivered(Msg =
#basic_message { guid = MsgId, is_persistent = IsPersistent},
State =
#mqstate { mode = Mode, is_durable = IsDurable,
- queue = Q, length = 0, memory_size = QSize,
- memory_gain = Gain })
+ queue = Q, length = 0,
+ memory_size = QSize, memory_gain = Gain })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
- rabbit_disk_queue:publish(Q, Msg, false),
+ Persist = IsDurable andalso IsPersistent,
+ PubQ = case Persist of
+ true -> Q;
+ false -> transient_queue(Q)
+ end,
+ rabbit_disk_queue:publish(PubQ, Msg, false),
MsgSize = size_of_message(Msg),
State1 = State #mqstate { memory_size = QSize + MsgSize,
memory_gain = Gain + MsgSize },
- if IsDurable andalso IsPersistent ->
+ case Persist of
+ true ->
%% must call phantom_deliver otherwise the msg remains at
%% the head of the queue. This is synchronous, but
%% unavoidable as we need the AckTag
- {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q),
+ {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(PubQ),
{ok, AckTag, State1};
- true ->
+ false ->
%% in this case, we don't actually care about the ack, so
%% auto ack it (asynchronously).
- ok = rabbit_disk_queue:auto_ack_next_message(Q),
+ ok = rabbit_disk_queue:auto_ack_next_message(PubQ),
{ok, noack, State1}
end;
publish_delivered(Msg, State =
@@ -307,103 +326,77 @@ publish_delivered(Msg, State =
deliver(State = #mqstate { length = 0 }) ->
{empty, State};
-deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable,
- length = Length }) ->
- {Msg = #basic_message { is_persistent = IsPersistent },
- _Size, IsDelivered, AckTag, Remaining}
- = rabbit_disk_queue:deliver(Q),
- AckTag1 = if IsPersistent andalso IsDurable -> AckTag;
- true -> ok = rabbit_disk_queue:ack(Q, [AckTag]),
- noack
- end,
- {{Msg, IsDelivered, AckTag1, Remaining},
- State #mqstate { length = Length - 1 }};
-deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q,
+deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
is_durable = IsDurable, length = Length }) ->
- {{value, Value}, MsgBuf1}
- = queue:out(MsgBuf),
+ {{value, Value}, MsgBuf1} = queue:out(MsgBuf),
{Msg, IsDelivered, AckTag, MsgBuf2} =
case Value of
{Msg1 = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
- IsDelivered1, OnDisk} ->
+ IsDelivered1} ->
AckTag1 =
- case OnDisk of
+ case IsDurable andalso IsPersistent of
true ->
- case IsPersistent andalso IsDurable of
- true ->
- {MsgId, IsDelivered1, AckTag2, _PersistRem}
- = rabbit_disk_queue:phantom_deliver(Q),
- AckTag2;
- false ->
- ok = rabbit_disk_queue:auto_ack_next_message
- (Q),
- noack
- end;
- false -> noack
+ {MsgId, IsDelivered1, AckTag2, _PersistRem}
+ = rabbit_disk_queue:phantom_deliver(Q),
+ AckTag2;
+ false ->
+ noack
end,
- ok = maybe_prefetch(Q, MsgBuf1),
+ ok = maybe_prefetch(MsgBuf1),
{Msg1, IsDelivered1, AckTag1, MsgBuf1};
- {disk, Rem1} ->
+ _ ->
+ {ReadQ, MsgBuf3} = dec_queue_length(MsgBuf),
{Msg1 = #basic_message { is_persistent = IsPersistent },
_Size, IsDelivered1, AckTag1, _PersistRem}
- = rabbit_disk_queue:deliver(Q),
+ = rabbit_disk_queue:deliver(ReadQ),
AckTag2 =
- case IsPersistent andalso IsDurable of
- true -> AckTag1;
- false -> rabbit_disk_queue:ack(Q, [AckTag1]),
- noack
+ case IsDurable andalso IsPersistent of
+ true ->
+ AckTag1;
+ false ->
+ ok = rabbit_disk_queue:ack(ReadQ, [AckTag1]),
+ noack
end,
- MsgBuf3 = case Rem1 of
- 1 -> ok = maybe_prefetch(Q, MsgBuf1),
- MsgBuf1;
- _ -> queue:in_r({disk, Rem1 - 1}, MsgBuf1)
- end,
{Msg1, IsDelivered1, AckTag2, MsgBuf3}
end,
Rem = Length - 1,
{{Msg, IsDelivered, AckTag, Rem},
State #mqstate { msg_buf = MsgBuf2, length = Rem }}.
-maybe_prefetch(Q, MsgBuf) ->
+maybe_prefetch(MsgBuf) ->
case queue:peek(MsgBuf) of
- empty -> ok;
- {value, {disk, Count}} -> rabbit_disk_queue:prefetch(Q, Count);
- {value, _} -> ok
+ empty ->
+ ok;
+ {value, {#basic_message {}, _IsDelivered}} ->
+ ok;
+ {value, {Q, Count}} ->
+ rabbit_disk_queue:prefetch(Q, Count)
end.
-
remove_noacks(MsgsWithAcks) ->
- {AckTags, ASize} =
- lists:foldl(
- fun ({Msg, noack}, {AccAckTags, AccSize}) ->
- {AccAckTags, size_of_message(Msg) + AccSize};
- ({Msg, AckTag}, {AccAckTags, AccSize}) ->
- {[AckTag | AccAckTags], size_of_message(Msg) + AccSize}
- end, {[], 0}, MsgsWithAcks),
- {AckTags, ASize}.
+ lists:foldl(
+ fun ({Msg, noack}, {AccAckTags, AccSize}) ->
+ {AccAckTags, size_of_message(Msg) + AccSize};
+ ({Msg, AckTag}, {AccAckTags, AccSize}) ->
+ {[AckTag | AccAckTags], size_of_message(Msg) + AccSize}
+ end, {[], 0}, MsgsWithAcks).
ack(MsgsWithAcks, State = #mqstate { queue = Q, memory_size = QSize,
memory_loss = Loss }) ->
- ASize = case remove_noacks(MsgsWithAcks) of
- {[], ASize1} -> ASize1;
- {AckTags, ASize1} -> rabbit_disk_queue:ack(Q, AckTags),
- ASize1
+ {AckTags, ASize} = remove_noacks(MsgsWithAcks),
+ ok = case AckTags of
+ [] -> ok;
+ _ -> rabbit_disk_queue:ack(Q, AckTags)
end,
State1 = State #mqstate { memory_size = QSize - ASize,
memory_loss = Loss + ASize },
{ok, State1}.
-tx_publish(Msg, State = #mqstate { mode = disk, memory_size = QSize,
- memory_gain = Gain }) ->
- ok = rabbit_disk_queue:tx_publish(Msg),
- MsgSize = size_of_message(Msg),
- {ok, State #mqstate { memory_size = QSize + MsgSize,
- memory_gain = Gain + MsgSize }};
-tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
- #mqstate { mode = mixed, is_durable = IsDurable,
- memory_size = QSize, memory_gain = Gain })
- when IsDurable andalso IsPersistent ->
+tx_publish(Msg = #basic_message { is_persistent = IsPersistent },
+ State = #mqstate { mode = Mode, memory_size = QSize,
+ is_durable = IsDurable, memory_gain = Gain })
+ when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
ok = rabbit_disk_queue:tx_publish(Msg),
MsgSize = size_of_message(Msg),
{ok, State #mqstate { memory_size = QSize + MsgSize,
@@ -418,16 +411,64 @@ tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize,
only_msg_ids(Pubs) ->
lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs).
+%% The last 2 params are accumulators. We work through the publishes,
+%% sorting out our msgbuf as we go. Finally, when no more work to do,
+%% we commit first transient, and the persistent msgs. This is safe
+%% because in case of failure, transient messages will be lost on
+%% restart anyway.
+commit_to_queues(_IsDurable, _Q, _TransQ, MsgBuf, [], [], [], []) ->
+ MsgBuf;
+commit_to_queues(_IsDurable, Q, _TransQ, MsgBuf, AckTags, [],
+ PersistMsgIds, []) ->
+ MsgIds = lists:flatten(lists:reverse(PersistMsgIds)),
+ ok = rabbit_disk_queue:tx_commit(Q, MsgIds, AckTags),
+ MsgBuf;
+commit_to_queues(IsDurable, Q, TransQ, MsgBuf, AckTags, [],
+ PersistMsgIds, TransMsgIds) ->
+ MsgIds = lists:flatten(lists:reverse(TransMsgIds)),
+ ok = rabbit_disk_queue:tx_commit(TransQ, MsgIds, []),
+ commit_to_queues(IsDurable, Q, TransQ, MsgBuf, AckTags, [],
+ PersistMsgIds, []);
+commit_to_queues(false, Q, TransQ, MsgBuf, AckTags, Publishes, [], []) ->
+ MsgIds = only_msg_ids(Publishes),
+ MsgBuf1 = inc_queue_length(TransQ, MsgBuf, erlang:length(MsgIds)),
+ commit_to_queues(false, Q, TransQ, MsgBuf1, AckTags, [], [], [MsgIds]);
+commit_to_queues(true, Q, TransQ, MsgBuf, AckTags, Publishes =
+ [#basic_message { is_persistent = true } | _],
+ PersistAcc, TransAcc) ->
+ {Persist, Publishes1} = lists:splitwith(fun is_persistent/1, Publishes),
+ MsgIds = only_msg_ids(Persist),
+ MsgBuf1 = inc_queue_length(Q, MsgBuf, erlang:length(MsgIds)),
+ commit_to_queues(true, Q, TransQ, MsgBuf1, AckTags, Publishes1,
+ [MsgIds | PersistAcc], TransAcc);
+commit_to_queues(true, Q, TransQ, MsgBuf, AckTags, Publishes,
+ PersistAcc, TransAcc) ->
+ %% not persistent
+ {Trans, Publishes1} = lists:splitwith(fun is_not_persistent/1, Publishes),
+ MsgIds = only_msg_ids(Trans),
+ MsgBuf1 = inc_queue_length(TransQ, MsgBuf, erlang:length(MsgIds)),
+ commit_to_queues(true, Q, TransQ, MsgBuf1, AckTags, Publishes1,
+ PersistAcc, [MsgIds | TransAcc]).
+
+is_persistent(#basic_message { is_persistent = IsPersistent }) ->
+ IsPersistent.
+
+is_not_persistent(#basic_message { is_persistent = IsPersistent }) ->
+ not IsPersistent.
+
tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = disk, queue = Q, length = Length,
- memory_size = QSize, memory_loss = Loss }) ->
+ memory_size = QSize, memory_loss = Loss,
+ is_durable = IsDurable, msg_buf = MsgBuf }) ->
{RealAcks, ASize} = remove_noacks(MsgsWithAcks),
- ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok;
- true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes),
- RealAcks)
- end,
+ MsgBuf1 = case ([] == Publishes) andalso ([] == RealAcks) of
+ true -> MsgBuf;
+ false -> commit_to_queues
+ (IsDurable, Q, transient_queue(Q), MsgBuf,
+ RealAcks, Publishes, [], [])
+ end,
{ok, State #mqstate { length = Length + erlang:length(Publishes),
- memory_size = QSize - ASize,
+ msg_buf = MsgBuf1, memory_size = QSize - ASize,
memory_loss = Loss + ASize }};
tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
@@ -436,19 +477,17 @@ tx_commit(Publishes, MsgsWithAcks,
{PersistentPubs, MsgBuf1} =
lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
{Acc, MsgBuf2}) ->
- OnDisk = IsPersistent andalso IsDurable,
Acc1 =
- if OnDisk ->
- [Msg #basic_message.guid | Acc];
- true -> Acc
+ case IsPersistent andalso IsDurable of
+ true -> [Msg #basic_message.guid | Acc];
+ false -> Acc
end,
- {Acc1, queue:in({Msg, false, OnDisk}, MsgBuf2)}
+ {Acc1, queue:in({Msg, false}, MsgBuf2)}
end, {[], MsgBuf}, Publishes),
- %% foldl reverses, so re-reverse PersistentPubs to match
- %% requirements of rabbit_disk_queue (ascending SeqIds)
{RealAcks, ASize} = remove_noacks(MsgsWithAcks),
- ok = if ([] == PersistentPubs) andalso ([] == RealAcks) -> ok;
- true ->
+ ok = case ([] == PersistentPubs) andalso ([] == RealAcks) of
+ true -> ok;
+ false ->
rabbit_disk_queue:tx_commit(
Q, lists:reverse(PersistentPubs), RealAcks)
end,
@@ -490,28 +529,25 @@ tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable,
%% [{Msg, AckTag}]
requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
is_durable = IsDurable,
- length = Length
- }) ->
+ length = Length,
+ msg_buf = MsgBuf }) ->
%% here, we may have messages with no ack tags, because of the
%% fact they are not persistent, but nevertheless we want to
%% requeue them. This means publishing them delivered.
- Requeue
+ TransQ = transient_queue(Q),
+ {MsgBuf1, PersistRQ}
= lists:foldl(
- fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ)
+ fun ({#basic_message { is_persistent = IsPersistent }, AckTag},
+ {MB, PRQ})
when IsDurable andalso IsPersistent ->
- [{AckTag, true} | RQ];
- ({Msg, _AckTag}, RQ) ->
- ok = case RQ == [] of
- true -> ok;
- false -> rabbit_disk_queue:requeue(
- Q, lists:reverse(RQ))
- end,
- ok = rabbit_disk_queue:publish(Q, Msg, true),
- []
- end, [], MessagesWithAckTags),
- ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)),
- {ok,
- State #mqstate { length = Length + erlang:length(MessagesWithAckTags) }};
+ {inc_queue_length(Q, MB, 1), [{AckTag, true} | PRQ]};
+ ({Msg, noack}, {MB, PRQ}) ->
+ ok = rabbit_disk_queue:publish(TransQ, Msg, true),
+ {inc_queue_length(TransQ, MB, 1), PRQ}
+ end, {MsgBuf, []}, MessagesWithAckTags),
+ ok = rabbit_disk_queue:requeue(Q, lists:reverse(PersistRQ)),
+ {ok, State #mqstate { length = Length + erlang:length(MessagesWithAckTags),
+ msg_buf = MsgBuf1 }};
requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
is_durable = IsDurable,
@@ -521,40 +557,33 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
{Acc, MsgBuf2}) ->
- OnDisk = IsDurable andalso IsPersistent,
Acc1 =
- if OnDisk -> [{AckTag, true} | Acc];
- true -> Acc
+ case IsDurable andalso IsPersistent of
+ true -> [{AckTag, true} | Acc];
+ false -> Acc
end,
- {Acc1, queue:in({Msg, true, OnDisk}, MsgBuf2)}
+ {Acc1, queue:in({Msg, true}, MsgBuf2)}
end, {[], MsgBuf}, MessagesWithAckTags),
- ok = if [] == PersistentPubs -> ok;
- true -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs))
+ ok = case PersistentPubs of
+ [] -> ok;
+ _ -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs))
end,
{ok, State #mqstate {msg_buf = MsgBuf1,
length = Length + erlang:length(MessagesWithAckTags)}}.
-purge(State = #mqstate { queue = Q, mode = disk, length = Count,
- memory_loss = Loss, memory_size = QSize }) ->
- Count = rabbit_disk_queue:purge(Q),
- {Count, State #mqstate { length = 0, memory_size = 0,
- memory_loss = Loss + QSize }};
-purge(State = #mqstate { queue = Q, mode = mixed, length = Length,
+purge(State = #mqstate { queue = Q, length = Count,
memory_loss = Loss, memory_size = QSize }) ->
- rabbit_disk_queue:purge(Q),
- {Length,
- State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0,
- memory_loss = Loss + QSize }}.
+ Len1 = rabbit_disk_queue:purge(Q),
+ Len2 = rabbit_disk_queue:purge(transient_queue(Q)),
+ true = Count >= Len1 + Len2,
+ {Count, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(),
+ memory_loss = Loss + QSize }}.
-delete_queue(State = #mqstate { queue = Q, mode = disk, memory_size = QSize,
- memory_loss = Loss }) ->
- rabbit_disk_queue:delete_queue(Q),
- {ok, State #mqstate { length = 0, memory_size = 0,
- memory_loss = Loss + QSize }};
-delete_queue(State = #mqstate { queue = Q, mode = mixed, memory_size = QSize,
+delete_queue(State = #mqstate { queue = Q, memory_size = QSize,
memory_loss = Loss }) ->
- rabbit_disk_queue:delete_queue(Q),
- {ok, State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0,
+ ok = rabbit_disk_queue:delete_queue(Q),
+ ok = rabbit_disk_queue:delete_queue(transient_queue(Q)),
+ {ok, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(),
memory_loss = Loss + QSize }}.
length(#mqstate { length = Length }) ->