summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl48
-rw-r--r--src/rabbit_mixed_queue.erl261
-rw-r--r--src/rabbit_tests.erl60
3 files changed, 245 insertions, 124 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 8a018d969d..5c1f969e1e 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -38,8 +38,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([publish/3, publish_with_seq/4, deliver/1, phantom_deliver/1, ack/2,
- tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1,
+-export([publish/4, publish_with_seq/5, deliver/1, phantom_deliver/1, ack/2,
+ tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1,
requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1,
dump_queue/1, delete_non_durable_queues/1
]).
@@ -232,8 +232,8 @@
-spec(start_link/0 :: () ->
{'ok', pid()} | 'ignore' | {'error', any()}).
--spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok').
--spec(publish_with_seq/4 :: (queue_name(), msg_id(), seq_id_or_next(), binary()) -> 'ok').
+-spec(publish/4 :: (queue_name(), msg_id(), binary(), bool()) -> 'ok').
+-spec(publish_with_seq/5 :: (queue_name(), msg_id(), seq_id_or_next(), binary(), bool()) -> 'ok').
-spec(deliver/1 :: (queue_name()) ->
{'empty' | {msg_id(), binary(), non_neg_integer(),
bool(), {msg_id(), seq_id()}, non_neg_integer()}}).
@@ -267,11 +267,16 @@ start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE,
[?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []).
-publish(Q, MsgId, Msg) when is_binary(Msg) ->
- gen_server2:cast(?SERVER, {publish, Q, MsgId, Msg}).
+publish(Q, MsgId, Msg, false) when is_binary(Msg) ->
+ gen_server2:cast(?SERVER, {publish, Q, MsgId, Msg});
+publish(Q, MsgId, Msg, true) when is_binary(Msg) ->
+ gen_server2:call(?SERVER, {publish, Q, MsgId, Msg}, infinity).
-publish_with_seq(Q, MsgId, SeqId, Msg) when is_binary(Msg) ->
- gen_server2:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg}).
+publish_with_seq(Q, MsgId, SeqId, Msg, false) when is_binary(Msg) ->
+ gen_server2:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg});
+publish_with_seq(Q, MsgId, SeqId, Msg, true) when is_binary(Msg) ->
+ gen_server2:call(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg},
+ infinity).
deliver(Q) ->
gen_server2:call(?SERVER, {deliver, Q}, infinity).
@@ -285,12 +290,14 @@ ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
tx_publish(MsgId, Msg) when is_binary(Msg) ->
gen_server2:cast(?SERVER, {tx_publish, MsgId, Msg}).
-tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) ->
+tx_commit(Q, PubMsgIds, AckSeqIds)
+ when is_list(PubMsgIds) andalso is_list(AckSeqIds) ->
gen_server2:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity).
tx_commit_with_seqs(Q, PubMsgSeqIds, AckSeqIds)
when is_list(PubMsgSeqIds) andalso is_list(AckSeqIds) ->
- gen_server2:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds}, infinity).
+ gen_server2:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds},
+ infinity).
tx_cancel(MsgIds) when is_list(MsgIds) ->
gen_server2:cast(?SERVER, {tx_cancel, MsgIds}).
@@ -332,8 +339,7 @@ next_write_seq(Q) ->
gen_server2:call(?SERVER, {next_write_seq, Q}, infinity).
is_empty(Q) ->
- Length = rabbit_disk_queue:length(Q),
- Length == 0.
+ 0 == rabbit_disk_queue:length(Q).
%% ---- GEN-SERVER INTERNAL API ----
@@ -407,6 +413,14 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
end,
{ok, State1 #dqstate { current_file_handle = FileHdl }}.
+handle_call({publish, Q, MsgId, MsgBody}, _From, State) ->
+ {ok, MsgSeqId, State1} =
+ internal_publish(Q, MsgId, next, MsgBody, true, State),
+ {reply, MsgSeqId, State1};
+handle_call({publish_with_seq, Q, MsgId, SeqId, MsgBody}, _From, State) ->
+ {ok, MsgSeqId, State1} =
+ internal_publish(Q, MsgId, SeqId, MsgBody, true, State),
+ {reply, MsgSeqId, State1};
handle_call({deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, true, State),
{reply, Result, State1};
@@ -475,10 +489,10 @@ handle_call({delete_non_durable_queues, DurableQueues}, _From, State) ->
{reply, ok, State1}.
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
- {ok, State1} = internal_publish(Q, MsgId, next, MsgBody, State),
+ {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, next, MsgBody, false, State),
{noreply, State1};
handle_cast({publish_with_seq, Q, MsgId, SeqId, MsgBody}, State) ->
- {ok, State1} = internal_publish(Q, MsgId, SeqId, MsgBody, State),
+ {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, SeqId, MsgBody, false, State),
{noreply, State1};
handle_cast({ack, Q, MsgSeqIds}, State) ->
{ok, State1} = internal_ack(Q, MsgSeqIds, State),
@@ -870,7 +884,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
{ok, State2 #dqstate { current_dirty = IsDirty2 }}.
%% SeqId can be 'next'
-internal_publish(Q, MsgId, SeqId, MsgBody, State) ->
+internal_publish(Q, MsgId, SeqId, MsgBody, IsDelivered, State) ->
{ok, State1 = #dqstate { sequences = Sequences }} =
internal_tx_publish(MsgId, MsgBody, State),
{ReadSeqId, WriteSeqId, Length} =
@@ -882,9 +896,9 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) ->
#dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3},
msg_id = MsgId,
next_seq_id = WriteSeqId3Next,
- is_delivered = false}),
+ is_delivered = IsDelivered}),
true = ets:insert(Sequences, {Q, ReadSeqId3, WriteSeqId3Next, Length + 1}),
- {ok, State1}.
+ {ok, {MsgId, WriteSeqId3}, State1}.
internal_tx_cancel(MsgIds, State) ->
%% we don't need seq ids because we're not touching mnesia,
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index c14aef5c10..dae4dad150 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -50,23 +50,27 @@
).
start_link(Queue, IsDurable, disk) ->
- NextSeq = rabbit_disk_queue:next_write_seq(Queue),
- {ok, #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue,
- next_write_seq = NextSeq, is_durable = IsDurable }};
+ purge_non_persistent_messages(
+ #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue,
+ next_write_seq = 0, is_durable = IsDurable });
start_link(Queue, IsDurable, mixed) ->
{ok, State} = start_link(Queue, IsDurable, disk),
- to_mixed_mode(State #mqstate { next_write_seq = 0 }).
+ to_mixed_mode(State).
to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
- is_durable = IsDurable,
next_write_seq = NextSeq }) ->
+ %% We enqueue _everything_ here. This means that should a message
+ %% already be in the disk queue we must remove it and add it back
+ %% in. Fortunately, by using requeue, we avoid rewriting the
+ %% message on disk.
+ %% Note we also batch together messages on disk so that we minimise
+ %% the calls to requeue.
Msgs = queue:to_list(MsgBuf),
{NextSeq1, Requeue} =
lists:foldl(
- fun ({_Seq, Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- IsDelivered}, {NSeq, RQueueAcc}) ->
- if IsDurable andalso IsPersistent ->
+ fun ({_Seq, Msg = #basic_message { guid = MsgId },
+ IsDelivered, OnDisk}, {NSeq, RQueueAcc}) ->
+ if OnDisk ->
{MsgId, IsDelivered, AckTag, _PersistRemaining} =
rabbit_disk_queue:phantom_deliver(Q),
{NSeq + 1,
@@ -78,7 +82,7 @@ to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
Q, lists:reverse(RQueueAcc))
end,
ok = rabbit_disk_queue:publish_with_seq(
- Q, MsgId, NSeq, msg_to_bin(Msg)),
+ Q, MsgId, NSeq, msg_to_bin(Msg), false),
{NSeq + 1, []}
end
end, {NextSeq, []}, Msgs),
@@ -89,22 +93,52 @@ to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
{ok, State #mqstate { mode = disk, msg_buf = queue:new(),
next_write_seq = NextSeq1 }}.
-to_mixed_mode(State = #mqstate { mode = disk, msg_buf = MsgBuf, queue = Q,
- next_write_seq = NextSeq }) ->
+to_mixed_mode(State = #mqstate { mode = disk, queue = Q }) ->
+ %% load up a new queue with everything that's on disk.
+ %% don't remove non-persistent messages that happen to be on disk
QList = rabbit_disk_queue:dump_queue(Q),
{MsgBuf1, NextSeq1} =
lists:foldl(
fun ({MsgId, MsgBin, _Size, IsDelivered, _AckTag, SeqId}, {Buf, NSeq})
when SeqId >= NSeq ->
- Msg = #basic_message { guid = MsgId }
- = bin_to_msg(MsgBin),
- Buf1 = queue:in({SeqId,
- Msg #basic_message { is_persistent = true },
- IsDelivered}, Buf),
- NSeq1 = SeqId + 1,
- {Buf1, NSeq1}
- end, {MsgBuf, NextSeq}, QList),
- {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1, next_write_seq = NextSeq1 }}.
+ Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin),
+ {queue:in({SeqId, Msg, IsDelivered, true}, Buf), SeqId + 1}
+ end, {queue:new(), 0}, QList),
+ {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1,
+ next_write_seq = NextSeq1 }}.
+
+purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
+ is_durable = IsDurable }) ->
+ %% iterate through the content on disk, ack anything which isn't
+ %% persistent, accumulate everything else that is persistent and
+ %% requeue it
+ NextSeq = rabbit_disk_queue:next_write_seq(Q),
+ {Acks, Requeue, NextSeq2} =
+ deliver_all_messages(Q, IsDurable, [], [], NextSeq),
+ ok = if Requeue == [] -> ok;
+ true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
+ end,
+ ok = if Acks == [] -> ok;
+ true -> rabbit_disk_queue:ack(Q, lists:reverse(Acks))
+ end,
+ {ok, State #mqstate { next_write_seq = NextSeq2 }}.
+
+deliver_all_messages(Q, IsDurable, Acks, Requeue, NextSeq) ->
+ case rabbit_disk_queue:deliver(Q) of
+ empty -> {Acks, Requeue, NextSeq};
+ {MsgId, MsgBin, _Size, IsDelivered, AckTag, _Remaining} ->
+ #basic_message { guid = MsgId, is_persistent = IsPersistent } =
+ bin_to_msg(MsgBin),
+ OnDisk = IsPersistent andalso IsDurable,
+ {Acks2, Requeue2, NextSeq2} =
+ if OnDisk -> {Acks,
+ [{AckTag, {NextSeq, IsDelivered}} | Requeue],
+ NextSeq + 1
+ };
+ true -> {[AckTag | Acks], Requeue, NextSeq}
+ end,
+ deliver_all_messages(Q, IsDurable, Acks2, Requeue2, NextSeq2)
+ end.
msg_to_bin(Msg = #basic_message { content = Content }) ->
ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
@@ -115,48 +149,78 @@ bin_to_msg(MsgBin) ->
publish(Msg = #basic_message { guid = MsgId },
State = #mqstate { mode = disk, queue = Q }) ->
- ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)),
+ ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false),
{ok, State};
publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
next_write_seq = NextSeq, msg_buf = MsgBuf }) ->
- ok = if IsDurable andalso IsPersistent ->
- rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, msg_to_bin(Msg));
+ OnDisk = IsDurable andalso IsPersistent,
+ ok = if OnDisk ->
+ rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq,
+ msg_to_bin(Msg), false);
true -> ok
end,
{ok, State #mqstate { next_write_seq = NextSeq + 1,
- msg_buf = queue:in({NextSeq, Msg, false}, MsgBuf)
- }}.
+ msg_buf = queue:in({NextSeq, Msg, false, OnDisk},
+ MsgBuf)
+ }}.
-%% assumption here is that the queue is empty already (only called via attempt_immediate_delivery)
-publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent},
- State = #mqstate { mode = Mode, queue = Q, is_durable = IsDurable,
- next_write_seq = NextSeq })
+%% Assumption here is that the queue is empty already (only called via
+%% attempt_immediate_delivery). Also note that the seq id assigned by
+%% the disk queue could well not be the same as the NextSeq (true =
+%% NextSeq >= disk_queue_write_seq_for_queue(Q)) , but this doesn't
+%% matter because the AckTag will still be correct (AckTags for
+%% non-persistent messages don't exist). (next_write_seq is actually
+%% only used to calculate how many messages are in the queue).
+publish_delivered(Msg =
+ #basic_message { guid = MsgId, is_persistent = IsPersistent},
+ State = #mqstate { mode = Mode, is_durable = IsDurable,
+ next_write_seq = NextSeq, queue = Q })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
- ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)),
+ true = rabbit_disk_queue:is_empty(Q),
+ rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false),
+ %% must call phantom_deliver otherwise the msg remains at the head
+ %% of the queue
{MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q),
- State2 = if Mode =:= mixed -> State #mqstate { next_write_seq = NextSeq + 1 };
- true -> State
- end,
+ State2 =
+ if Mode =:= mixed -> State #mqstate { next_write_seq = NextSeq + 1 };
+ true -> State
+ end,
{ok, AckTag, State2};
-publish_delivered(_Msg, State = #mqstate { mode = mixed }) ->
+publish_delivered(_Msg, State = #mqstate { mode = mixed, msg_buf = MsgBuf }) ->
+ true = queue:is_empty(MsgBuf),
{ok, noack, State}.
-deliver(State = #mqstate { mode = disk, queue = Q }) ->
- {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining} = rabbit_disk_queue:deliver(Q),
- Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin),
- {{Msg, IsDelivered, AckTag, Remaining}, State};
-deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
- next_write_seq = NextWrite, is_durable = IsDurable }) ->
+deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable }) ->
+ case rabbit_disk_queue:deliver(Q) of
+ empty -> {empty, State};
+ {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining} ->
+ #basic_message { guid = MsgId, is_persistent = IsPersistent } =
+ Msg = bin_to_msg(MsgBin),
+ AckTag2 = if IsPersistent andalso IsDurable -> AckTag;
+ true -> ok = rabbit_disk_queue:ack(Q, [AckTag]),
+ noack
+ end,
+ {{Msg, IsDelivered, AckTag2, Remaining}, State}
+ end;
+
+deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable,
+ next_write_seq = NextWrite, msg_buf = MsgBuf }) ->
{Result, MsgBuf2} = queue:out(MsgBuf),
case Result of
empty ->
{empty, State};
- {value, {Seq, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, IsDelivered}} ->
+ {value, {Seq, Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ IsDelivered, OnDisk}} ->
AckTag =
- if IsDurable andalso IsPersistent ->
- {MsgId, IsDelivered, AckTag2, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q),
- AckTag2;
+ if OnDisk ->
+ {MsgId, IsDelivered, AckTag2, _PersistRemaining} =
+ rabbit_disk_queue:phantom_deliver(Q),
+ if IsPersistent andalso IsDurable -> AckTag2;
+ true -> ok = rabbit_disk_queue:ack(Q, [AckTag2]),
+ noack
+ end;
true -> noack
end,
{{Msg, IsDelivered, AckTag, (NextWrite - 1 - Seq)},
@@ -173,7 +237,8 @@ ack(Acks, State = #mqstate { queue = Q }) ->
{ok, State}
end.
-tx_publish(Msg = #basic_message { guid = MsgId }, State = #mqstate { mode = disk }) ->
+tx_publish(Msg = #basic_message { guid = MsgId },
+ State = #mqstate { mode = disk }) ->
ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
{ok, State};
tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
@@ -182,13 +247,18 @@ tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
{ok, State};
tx_publish(_Msg, State = #mqstate { mode = mixed }) ->
+ %% this message will reappear in the tx_commit, so ignore for now
{ok, State}.
only_msg_ids(Pubs) ->
lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs).
tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q }) ->
- ok = rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), Acks),
+ RealAcks = remove_noacks(Acks),
+ ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok;
+ true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes),
+ RealAcks)
+ end,
{ok, State};
tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
@@ -198,47 +268,69 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
{PersistentPubs, MsgBuf2, NextSeq2} =
lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
{Acc, MsgBuf3, NextSeq3}) ->
+ OnDisk = IsPersistent andalso IsDurable,
Acc2 =
- if IsPersistent ->
- [{Msg #basic_message.guid, NextSeq3} | Acc];
+ if OnDisk ->
+ [{Msg #basic_message.guid, NextSeq3}
+ | Acc];
true -> Acc
end,
- MsgBuf4 = queue:in({NextSeq3, Msg, false}, MsgBuf3),
+ MsgBuf4 = queue:in({NextSeq3, Msg, false, OnDisk},
+ MsgBuf3),
{Acc2, MsgBuf4, NextSeq3 + 1}
end, {[], MsgBuf, NextSeq}, Publishes),
%% foldl reverses, so re-reverse PersistentPubs to match
%% requirements of rabbit_disk_queue (ascending SeqIds)
- PersistentPubs2 = if IsDurable -> lists:reverse(PersistentPubs);
- true -> []
- end,
- ok = rabbit_disk_queue:tx_commit_with_seqs(Q, PersistentPubs2,
- remove_noacks(Acks)),
+ RealAcks = remove_noacks(Acks),
+ ok = if ([] == PersistentPubs) andalso ([] == RealAcks) -> ok;
+ true ->
+ rabbit_disk_queue:tx_commit_with_seqs(
+ Q, lists:reverse(PersistentPubs), RealAcks)
+ end,
{ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}.
only_persistent_msg_ids(Pubs) ->
- lists:reverse(lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
- Acc) ->
- if IsPersistent -> [Msg #basic_message.guid | Acc];
- true -> Acc
- end
- end, [], Pubs)).
+ lists:reverse(
+ lists:foldl(
+ fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) ->
+ if IsPersistent -> [Msg #basic_message.guid | Acc];
+ true -> Acc
+ end
+ end, [], Pubs)).
tx_cancel(Publishes, State = #mqstate { mode = disk }) ->
ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)),
{ok, State};
-tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable }) ->
- MsgIds = if IsDurable -> only_persistent_msg_ids(Publishes);
- true -> []
- end,
- ok = rabbit_disk_queue:tx_cancel(MsgIds),
+tx_cancel(Publishes,
+ State = #mqstate { mode = mixed, is_durable = IsDurable }) ->
+ ok =
+ if IsDurable ->
+ rabbit_disk_queue:tx_cancel(only_persistent_msg_ids(Publishes));
+ true -> ok
+ end,
{ok, State}.
-only_ack_tags(MsgWithAcks) ->
- lists:map(fun (P) -> element(2, P) end, MsgWithAcks).
-
%% [{Msg, AckTag}]
-requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q }) ->
- rabbit_disk_queue:requeue(Q, only_ack_tags(MessagesWithAckTags)),
+requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
+ is_durable = IsDurable }) ->
+ %% here, we may have messages with no ack tags, because of the
+ %% fact they are not persistent, but nevertheless we want to
+ %% requeue them. This means publishing them delivered.
+ Requeue
+ = lists:foldl(
+ fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ)
+ when IsPersistent andalso IsDurable ->
+ [AckTag | RQ];
+ ({Msg = #basic_message { guid = MsgId }, _AckTag}, RQ) ->
+ ok = if RQ == [] -> ok;
+ true -> rabbit_disk_queue:requeue(
+ Q, lists:reverse(RQ))
+ end,
+ _AckTag2 = rabbit_disk_queue:publish(
+ Q, MsgId, msg_to_bin(Msg), true),
+ []
+ end, [], MessagesWithAckTags),
+ ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)),
{ok, State};
requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
@@ -246,18 +338,21 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
is_durable = IsDurable
}) ->
{PersistentPubs, MsgBuf2, NextSeq2} =
- lists:foldl(fun ({Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, AckTag},
- {Acc, MsgBuf3, NextSeq3}) ->
- Acc2 =
- if IsDurable andalso IsPersistent ->
- {MsgId, _OldSeqId} = AckTag,
- [{AckTag, {NextSeq3, true}} | Acc];
- true -> Acc
- end,
- MsgBuf4 = queue:in({NextSeq3, Msg, true}, MsgBuf3),
- {Acc2, MsgBuf4, NextSeq3 + 1}
- end, {[], MsgBuf, NextSeq}, MessagesWithAckTags),
- ok = rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(PersistentPubs)),
+ lists:foldl(
+ fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
+ {Acc, MsgBuf3, NextSeq3}) ->
+ OnDisk = IsDurable andalso IsPersistent,
+ Acc2 =
+ if OnDisk -> [{AckTag, {NextSeq3, true}} | Acc];
+ true -> Acc
+ end,
+ MsgBuf4 = queue:in({NextSeq3, Msg, true, OnDisk}, MsgBuf3),
+ {Acc2, MsgBuf4, NextSeq3 + 1}
+ end, {[], MsgBuf, NextSeq}, MessagesWithAckTags),
+ ok = if [] == PersistentPubs -> ok;
+ true -> rabbit_disk_queue:requeue_with_seqs(
+ Q, lists:reverse(PersistentPubs))
+ end,
{ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}.
purge(State = #mqstate { queue = Q, mode = disk }) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3d173e2e45..a2a31a181a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -264,7 +264,7 @@ test_log_management() ->
%% original log files are not writable
ok = make_files_non_writable([MainLog, SaslLog]),
{error, {{cannot_rotate_main_logs, _},
- {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []),
+ {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []),
%% logging directed to tty (handlers were removed in last test)
ok = clean_logs([MainLog, SaslLog], Suffix),
@@ -283,7 +283,7 @@ test_log_management() ->
ok = application:set_env(sasl, sasl_error_logger, {file, SaslLog}),
ok = application:set_env(kernel, error_logger, {file, MainLog}),
ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog},
- {rabbit_sasl_report_file_h, SaslLog}]),
+ {rabbit_sasl_report_file_h, SaslLog}]),
passed.
test_log_management_during_startup() ->
@@ -689,6 +689,20 @@ delete_log_handlers(Handlers) ->
test_disk_queue() ->
rdq_stop(),
+ rdq_virgin(),
+ passed = rdq_stress_gc(10000),
+ passed = rdq_test_startup_with_queue_gaps(),
+ passed = rdq_test_redeliver(),
+ passed = rdq_test_purge(),
+ passed = rdq_test_dump_queue(),
+ passed = rdq_test_mixed_queue_modes(),
+ rdq_virgin(),
+ ok = control_action(stop_app, []),
+ ok = control_action(start_app, []),
+ passed.
+
+benchmark_disk_queue() ->
+ rdq_stop(),
% unicode chars are supported properly from r13 onwards
io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []),
[begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize),
@@ -698,12 +712,6 @@ test_disk_queue() ->
MsgCount <- [1024, 4096, 16384]
],
rdq_virgin(),
- passed = rdq_stress_gc(10000),
- passed = rdq_test_startup_with_queue_gaps(),
- passed = rdq_test_redeliver(),
- passed = rdq_test_purge(),
- passed = rdq_test_dump_queue(),
- rdq_virgin(),
ok = control_action(stop_app, []),
ok = control_action(start_app, []),
passed.
@@ -953,49 +961,52 @@ rdq_test_mixed_queue_modes() ->
end, MS4, lists:seq(1,10)),
30 = rabbit_mixed_queue:length(MS6),
io:format("Published a mixture of messages~n"),
- {ok, _MS7} = rabbit_mixed_queue:to_disk_only_mode(MS6),
+ {ok, MS7} = rabbit_mixed_queue:to_disk_only_mode(MS6),
+ 30 = rabbit_mixed_queue:length(MS7),
io:format("Converted to disk only mode~n"),
- rdq_stop(),
- rdq_start(),
- {ok, MS8} = rabbit_mixed_queue:start_link(q, true, mixed),
+ {ok, MS8} = rabbit_mixed_queue:to_mixed_mode(MS7),
30 = rabbit_mixed_queue:length(MS8),
- io:format("Recovered queue~n"),
+ io:format("Converted to mixed mode~n"),
MS10 =
lists:foldl(
fun (N, MS9) ->
Rem = 30 - N,
- {{#basic_message { is_persistent = true },
+ {{#basic_message { is_persistent = false },
false, _AckTag, Rem},
MS9a} = rabbit_mixed_queue:deliver(MS9),
MS9a
end, MS8, lists:seq(1,10)),
+ 20 = rabbit_mixed_queue:length(MS10),
io:format("Delivered initial non persistent messages~n"),
- {ok, _MS11} = rabbit_mixed_queue:to_disk_only_mode(MS10),
+ {ok, MS11} = rabbit_mixed_queue:to_disk_only_mode(MS10),
+ 20 = rabbit_mixed_queue:length(MS11),
io:format("Converted to disk only mode~n"),
rdq_stop(),
rdq_start(),
{ok, MS12} = rabbit_mixed_queue:start_link(q, true, mixed),
- 30 = rabbit_mixed_queue:length(MS12),
+ 10 = rabbit_mixed_queue:length(MS12),
io:format("Recovered queue~n"),
{MS14, AckTags} =
lists:foldl(
fun (N, {MS13, AcksAcc}) ->
- Rem = 30 - N,
- IsDelivered = N < 11,
+ Rem = 10 - N,
{{#basic_message { is_persistent = true },
- IsDelivered, AckTag, Rem},
+ false, AckTag, Rem},
MS13a} = rabbit_mixed_queue:deliver(MS13),
{MS13a, [AckTag | AcksAcc]}
- end, {MS2, []}, lists:seq(1,20)),
+ end, {MS12, []}, lists:seq(1,10)),
+ 0 = rabbit_mixed_queue:length(MS14),
{ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14),
- io:format("Delivered and acked initial non persistent messages~n"),
- {ok, _MS16} = rabbit_mixed_queue:to_disk_only_mode(MS15),
+ io:format("Delivered and acked all messages~n"),
+ {ok, MS16} = rabbit_mixed_queue:to_disk_only_mode(MS15),
+ 0 = rabbit_mixed_queue:length(MS16),
io:format("Converted to disk only mode~n"),
rdq_stop(),
rdq_start(),
{ok, MS17} = rabbit_mixed_queue:start_link(q, true, mixed),
- 10 = rabbit_mixed_queue:length(MS17),
+ 0 = rabbit_mixed_queue:length(MS17),
io:format("Recovered queue~n"),
+ rdq_stop(),
passed.
rdq_time_commands(Funcs) ->
@@ -1010,7 +1021,8 @@ rdq_virgin() ->
rdq_start() ->
{ok, _} = rabbit_disk_queue:start_link(),
- rabbit_disk_queue:to_ram_disk_mode().
+ ok = rabbit_disk_queue:to_ram_disk_mode(),
+ ok.
rdq_stop() ->
rabbit_disk_queue:stop(),