diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 48 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 261 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 60 |
3 files changed, 245 insertions, 124 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 8a018d969d..5c1f969e1e 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -38,8 +38,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([publish/3, publish_with_seq/4, deliver/1, phantom_deliver/1, ack/2, - tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1, +-export([publish/4, publish_with_seq/5, deliver/1, phantom_deliver/1, ack/2, + tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1, requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1, dump_queue/1, delete_non_durable_queues/1 ]). @@ -232,8 +232,8 @@ -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok'). --spec(publish_with_seq/4 :: (queue_name(), msg_id(), seq_id_or_next(), binary()) -> 'ok'). +-spec(publish/4 :: (queue_name(), msg_id(), binary(), bool()) -> 'ok'). +-spec(publish_with_seq/5 :: (queue_name(), msg_id(), seq_id_or_next(), binary(), bool()) -> 'ok'). -spec(deliver/1 :: (queue_name()) -> {'empty' | {msg_id(), binary(), non_neg_integer(), bool(), {msg_id(), seq_id()}, non_neg_integer()}}). @@ -267,11 +267,16 @@ start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []). -publish(Q, MsgId, Msg) when is_binary(Msg) -> - gen_server2:cast(?SERVER, {publish, Q, MsgId, Msg}). +publish(Q, MsgId, Msg, false) when is_binary(Msg) -> + gen_server2:cast(?SERVER, {publish, Q, MsgId, Msg}); +publish(Q, MsgId, Msg, true) when is_binary(Msg) -> + gen_server2:call(?SERVER, {publish, Q, MsgId, Msg}, infinity). -publish_with_seq(Q, MsgId, SeqId, Msg) when is_binary(Msg) -> - gen_server2:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg}). +publish_with_seq(Q, MsgId, SeqId, Msg, false) when is_binary(Msg) -> + gen_server2:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg}); +publish_with_seq(Q, MsgId, SeqId, Msg, true) when is_binary(Msg) -> + gen_server2:call(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg}, + infinity). deliver(Q) -> gen_server2:call(?SERVER, {deliver, Q}, infinity). @@ -285,12 +290,14 @@ ack(Q, MsgSeqIds) when is_list(MsgSeqIds) -> tx_publish(MsgId, Msg) when is_binary(Msg) -> gen_server2:cast(?SERVER, {tx_publish, MsgId, Msg}). -tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) -> +tx_commit(Q, PubMsgIds, AckSeqIds) + when is_list(PubMsgIds) andalso is_list(AckSeqIds) -> gen_server2:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity). tx_commit_with_seqs(Q, PubMsgSeqIds, AckSeqIds) when is_list(PubMsgSeqIds) andalso is_list(AckSeqIds) -> - gen_server2:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds}, infinity). + gen_server2:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds}, + infinity). tx_cancel(MsgIds) when is_list(MsgIds) -> gen_server2:cast(?SERVER, {tx_cancel, MsgIds}). @@ -332,8 +339,7 @@ next_write_seq(Q) -> gen_server2:call(?SERVER, {next_write_seq, Q}, infinity). is_empty(Q) -> - Length = rabbit_disk_queue:length(Q), - Length == 0. + 0 == rabbit_disk_queue:length(Q). %% ---- GEN-SERVER INTERNAL API ---- @@ -407,6 +413,14 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> end, {ok, State1 #dqstate { current_file_handle = FileHdl }}. +handle_call({publish, Q, MsgId, MsgBody}, _From, State) -> + {ok, MsgSeqId, State1} = + internal_publish(Q, MsgId, next, MsgBody, true, State), + {reply, MsgSeqId, State1}; +handle_call({publish_with_seq, Q, MsgId, SeqId, MsgBody}, _From, State) -> + {ok, MsgSeqId, State1} = + internal_publish(Q, MsgId, SeqId, MsgBody, true, State), + {reply, MsgSeqId, State1}; handle_call({deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, true, State), {reply, Result, State1}; @@ -475,10 +489,10 @@ handle_call({delete_non_durable_queues, DurableQueues}, _From, State) -> {reply, ok, State1}. handle_cast({publish, Q, MsgId, MsgBody}, State) -> - {ok, State1} = internal_publish(Q, MsgId, next, MsgBody, State), + {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, next, MsgBody, false, State), {noreply, State1}; handle_cast({publish_with_seq, Q, MsgId, SeqId, MsgBody}, State) -> - {ok, State1} = internal_publish(Q, MsgId, SeqId, MsgBody, State), + {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, SeqId, MsgBody, false, State), {noreply, State1}; handle_cast({ack, Q, MsgSeqIds}, State) -> {ok, State1} = internal_ack(Q, MsgSeqIds, State), @@ -870,7 +884,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, {ok, State2 #dqstate { current_dirty = IsDirty2 }}. %% SeqId can be 'next' -internal_publish(Q, MsgId, SeqId, MsgBody, State) -> +internal_publish(Q, MsgId, SeqId, MsgBody, IsDelivered, State) -> {ok, State1 = #dqstate { sequences = Sequences }} = internal_tx_publish(MsgId, MsgBody, State), {ReadSeqId, WriteSeqId, Length} = @@ -882,9 +896,9 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) -> #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3}, msg_id = MsgId, next_seq_id = WriteSeqId3Next, - is_delivered = false}), + is_delivered = IsDelivered}), true = ets:insert(Sequences, {Q, ReadSeqId3, WriteSeqId3Next, Length + 1}), - {ok, State1}. + {ok, {MsgId, WriteSeqId3}, State1}. internal_tx_cancel(MsgIds, State) -> %% we don't need seq ids because we're not touching mnesia, diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index c14aef5c10..dae4dad150 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -50,23 +50,27 @@ ). start_link(Queue, IsDurable, disk) -> - NextSeq = rabbit_disk_queue:next_write_seq(Queue), - {ok, #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue, - next_write_seq = NextSeq, is_durable = IsDurable }}; + purge_non_persistent_messages( + #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue, + next_write_seq = 0, is_durable = IsDurable }); start_link(Queue, IsDurable, mixed) -> {ok, State} = start_link(Queue, IsDurable, disk), - to_mixed_mode(State #mqstate { next_write_seq = 0 }). + to_mixed_mode(State). to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, - is_durable = IsDurable, next_write_seq = NextSeq }) -> + %% We enqueue _everything_ here. This means that should a message + %% already be in the disk queue we must remove it and add it back + %% in. Fortunately, by using requeue, we avoid rewriting the + %% message on disk. + %% Note we also batch together messages on disk so that we minimise + %% the calls to requeue. Msgs = queue:to_list(MsgBuf), {NextSeq1, Requeue} = lists:foldl( - fun ({_Seq, Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - IsDelivered}, {NSeq, RQueueAcc}) -> - if IsDurable andalso IsPersistent -> + fun ({_Seq, Msg = #basic_message { guid = MsgId }, + IsDelivered, OnDisk}, {NSeq, RQueueAcc}) -> + if OnDisk -> {MsgId, IsDelivered, AckTag, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q), {NSeq + 1, @@ -78,7 +82,7 @@ to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, Q, lists:reverse(RQueueAcc)) end, ok = rabbit_disk_queue:publish_with_seq( - Q, MsgId, NSeq, msg_to_bin(Msg)), + Q, MsgId, NSeq, msg_to_bin(Msg), false), {NSeq + 1, []} end end, {NextSeq, []}, Msgs), @@ -89,22 +93,52 @@ to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, {ok, State #mqstate { mode = disk, msg_buf = queue:new(), next_write_seq = NextSeq1 }}. -to_mixed_mode(State = #mqstate { mode = disk, msg_buf = MsgBuf, queue = Q, - next_write_seq = NextSeq }) -> +to_mixed_mode(State = #mqstate { mode = disk, queue = Q }) -> + %% load up a new queue with everything that's on disk. + %% don't remove non-persistent messages that happen to be on disk QList = rabbit_disk_queue:dump_queue(Q), {MsgBuf1, NextSeq1} = lists:foldl( fun ({MsgId, MsgBin, _Size, IsDelivered, _AckTag, SeqId}, {Buf, NSeq}) when SeqId >= NSeq -> - Msg = #basic_message { guid = MsgId } - = bin_to_msg(MsgBin), - Buf1 = queue:in({SeqId, - Msg #basic_message { is_persistent = true }, - IsDelivered}, Buf), - NSeq1 = SeqId + 1, - {Buf1, NSeq1} - end, {MsgBuf, NextSeq}, QList), - {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1, next_write_seq = NextSeq1 }}. + Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin), + {queue:in({SeqId, Msg, IsDelivered, true}, Buf), SeqId + 1} + end, {queue:new(), 0}, QList), + {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1, + next_write_seq = NextSeq1 }}. + +purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, + is_durable = IsDurable }) -> + %% iterate through the content on disk, ack anything which isn't + %% persistent, accumulate everything else that is persistent and + %% requeue it + NextSeq = rabbit_disk_queue:next_write_seq(Q), + {Acks, Requeue, NextSeq2} = + deliver_all_messages(Q, IsDurable, [], [], NextSeq), + ok = if Requeue == [] -> ok; + true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) + end, + ok = if Acks == [] -> ok; + true -> rabbit_disk_queue:ack(Q, lists:reverse(Acks)) + end, + {ok, State #mqstate { next_write_seq = NextSeq2 }}. + +deliver_all_messages(Q, IsDurable, Acks, Requeue, NextSeq) -> + case rabbit_disk_queue:deliver(Q) of + empty -> {Acks, Requeue, NextSeq}; + {MsgId, MsgBin, _Size, IsDelivered, AckTag, _Remaining} -> + #basic_message { guid = MsgId, is_persistent = IsPersistent } = + bin_to_msg(MsgBin), + OnDisk = IsPersistent andalso IsDurable, + {Acks2, Requeue2, NextSeq2} = + if OnDisk -> {Acks, + [{AckTag, {NextSeq, IsDelivered}} | Requeue], + NextSeq + 1 + }; + true -> {[AckTag | Acks], Requeue, NextSeq} + end, + deliver_all_messages(Q, IsDurable, Acks2, Requeue2, NextSeq2) + end. msg_to_bin(Msg = #basic_message { content = Content }) -> ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), @@ -115,48 +149,78 @@ bin_to_msg(MsgBin) -> publish(Msg = #basic_message { guid = MsgId }, State = #mqstate { mode = disk, queue = Q }) -> - ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)), + ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false), {ok, State}; publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable, next_write_seq = NextSeq, msg_buf = MsgBuf }) -> - ok = if IsDurable andalso IsPersistent -> - rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, msg_to_bin(Msg)); + OnDisk = IsDurable andalso IsPersistent, + ok = if OnDisk -> + rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, + msg_to_bin(Msg), false); true -> ok end, {ok, State #mqstate { next_write_seq = NextSeq + 1, - msg_buf = queue:in({NextSeq, Msg, false}, MsgBuf) - }}. + msg_buf = queue:in({NextSeq, Msg, false, OnDisk}, + MsgBuf) + }}. -%% assumption here is that the queue is empty already (only called via attempt_immediate_delivery) -publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent}, - State = #mqstate { mode = Mode, queue = Q, is_durable = IsDurable, - next_write_seq = NextSeq }) +%% Assumption here is that the queue is empty already (only called via +%% attempt_immediate_delivery). Also note that the seq id assigned by +%% the disk queue could well not be the same as the NextSeq (true = +%% NextSeq >= disk_queue_write_seq_for_queue(Q)) , but this doesn't +%% matter because the AckTag will still be correct (AckTags for +%% non-persistent messages don't exist). (next_write_seq is actually +%% only used to calculate how many messages are in the queue). +publish_delivered(Msg = + #basic_message { guid = MsgId, is_persistent = IsPersistent}, + State = #mqstate { mode = Mode, is_durable = IsDurable, + next_write_seq = NextSeq, queue = Q }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> - ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)), + true = rabbit_disk_queue:is_empty(Q), + rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false), + %% must call phantom_deliver otherwise the msg remains at the head + %% of the queue {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q), - State2 = if Mode =:= mixed -> State #mqstate { next_write_seq = NextSeq + 1 }; - true -> State - end, + State2 = + if Mode =:= mixed -> State #mqstate { next_write_seq = NextSeq + 1 }; + true -> State + end, {ok, AckTag, State2}; -publish_delivered(_Msg, State = #mqstate { mode = mixed }) -> +publish_delivered(_Msg, State = #mqstate { mode = mixed, msg_buf = MsgBuf }) -> + true = queue:is_empty(MsgBuf), {ok, noack, State}. -deliver(State = #mqstate { mode = disk, queue = Q }) -> - {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining} = rabbit_disk_queue:deliver(Q), - Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin), - {{Msg, IsDelivered, AckTag, Remaining}, State}; -deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, - next_write_seq = NextWrite, is_durable = IsDurable }) -> +deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable }) -> + case rabbit_disk_queue:deliver(Q) of + empty -> {empty, State}; + {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining} -> + #basic_message { guid = MsgId, is_persistent = IsPersistent } = + Msg = bin_to_msg(MsgBin), + AckTag2 = if IsPersistent andalso IsDurable -> AckTag; + true -> ok = rabbit_disk_queue:ack(Q, [AckTag]), + noack + end, + {{Msg, IsDelivered, AckTag2, Remaining}, State} + end; + +deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable, + next_write_seq = NextWrite, msg_buf = MsgBuf }) -> {Result, MsgBuf2} = queue:out(MsgBuf), case Result of empty -> {empty, State}; - {value, {Seq, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, IsDelivered}} -> + {value, {Seq, Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent }, + IsDelivered, OnDisk}} -> AckTag = - if IsDurable andalso IsPersistent -> - {MsgId, IsDelivered, AckTag2, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q), - AckTag2; + if OnDisk -> + {MsgId, IsDelivered, AckTag2, _PersistRemaining} = + rabbit_disk_queue:phantom_deliver(Q), + if IsPersistent andalso IsDurable -> AckTag2; + true -> ok = rabbit_disk_queue:ack(Q, [AckTag2]), + noack + end; true -> noack end, {{Msg, IsDelivered, AckTag, (NextWrite - 1 - Seq)}, @@ -173,7 +237,8 @@ ack(Acks, State = #mqstate { queue = Q }) -> {ok, State} end. -tx_publish(Msg = #basic_message { guid = MsgId }, State = #mqstate { mode = disk }) -> +tx_publish(Msg = #basic_message { guid = MsgId }, + State = #mqstate { mode = disk }) -> ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)), {ok, State}; tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, @@ -182,13 +247,18 @@ tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)), {ok, State}; tx_publish(_Msg, State = #mqstate { mode = mixed }) -> + %% this message will reappear in the tx_commit, so ignore for now {ok, State}. only_msg_ids(Pubs) -> lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs). tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q }) -> - ok = rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), Acks), + RealAcks = remove_noacks(Acks), + ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok; + true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), + RealAcks) + end, {ok, State}; tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, @@ -198,47 +268,69 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, {PersistentPubs, MsgBuf2, NextSeq2} = lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, {Acc, MsgBuf3, NextSeq3}) -> + OnDisk = IsPersistent andalso IsDurable, Acc2 = - if IsPersistent -> - [{Msg #basic_message.guid, NextSeq3} | Acc]; + if OnDisk -> + [{Msg #basic_message.guid, NextSeq3} + | Acc]; true -> Acc end, - MsgBuf4 = queue:in({NextSeq3, Msg, false}, MsgBuf3), + MsgBuf4 = queue:in({NextSeq3, Msg, false, OnDisk}, + MsgBuf3), {Acc2, MsgBuf4, NextSeq3 + 1} end, {[], MsgBuf, NextSeq}, Publishes), %% foldl reverses, so re-reverse PersistentPubs to match %% requirements of rabbit_disk_queue (ascending SeqIds) - PersistentPubs2 = if IsDurable -> lists:reverse(PersistentPubs); - true -> [] - end, - ok = rabbit_disk_queue:tx_commit_with_seqs(Q, PersistentPubs2, - remove_noacks(Acks)), + RealAcks = remove_noacks(Acks), + ok = if ([] == PersistentPubs) andalso ([] == RealAcks) -> ok; + true -> + rabbit_disk_queue:tx_commit_with_seqs( + Q, lists:reverse(PersistentPubs), RealAcks) + end, {ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}. only_persistent_msg_ids(Pubs) -> - lists:reverse(lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, - Acc) -> - if IsPersistent -> [Msg #basic_message.guid | Acc]; - true -> Acc - end - end, [], Pubs)). + lists:reverse( + lists:foldl( + fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) -> + if IsPersistent -> [Msg #basic_message.guid | Acc]; + true -> Acc + end + end, [], Pubs)). tx_cancel(Publishes, State = #mqstate { mode = disk }) -> ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)), {ok, State}; -tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable }) -> - MsgIds = if IsDurable -> only_persistent_msg_ids(Publishes); - true -> [] - end, - ok = rabbit_disk_queue:tx_cancel(MsgIds), +tx_cancel(Publishes, + State = #mqstate { mode = mixed, is_durable = IsDurable }) -> + ok = + if IsDurable -> + rabbit_disk_queue:tx_cancel(only_persistent_msg_ids(Publishes)); + true -> ok + end, {ok, State}. -only_ack_tags(MsgWithAcks) -> - lists:map(fun (P) -> element(2, P) end, MsgWithAcks). - %% [{Msg, AckTag}] -requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q }) -> - rabbit_disk_queue:requeue(Q, only_ack_tags(MessagesWithAckTags)), +requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, + is_durable = IsDurable }) -> + %% here, we may have messages with no ack tags, because of the + %% fact they are not persistent, but nevertheless we want to + %% requeue them. This means publishing them delivered. + Requeue + = lists:foldl( + fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ) + when IsPersistent andalso IsDurable -> + [AckTag | RQ]; + ({Msg = #basic_message { guid = MsgId }, _AckTag}, RQ) -> + ok = if RQ == [] -> ok; + true -> rabbit_disk_queue:requeue( + Q, lists:reverse(RQ)) + end, + _AckTag2 = rabbit_disk_queue:publish( + Q, MsgId, msg_to_bin(Msg), true), + [] + end, [], MessagesWithAckTags), + ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)), {ok, State}; requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, @@ -246,18 +338,21 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable }) -> {PersistentPubs, MsgBuf2, NextSeq2} = - lists:foldl(fun ({Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, AckTag}, - {Acc, MsgBuf3, NextSeq3}) -> - Acc2 = - if IsDurable andalso IsPersistent -> - {MsgId, _OldSeqId} = AckTag, - [{AckTag, {NextSeq3, true}} | Acc]; - true -> Acc - end, - MsgBuf4 = queue:in({NextSeq3, Msg, true}, MsgBuf3), - {Acc2, MsgBuf4, NextSeq3 + 1} - end, {[], MsgBuf, NextSeq}, MessagesWithAckTags), - ok = rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(PersistentPubs)), + lists:foldl( + fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag}, + {Acc, MsgBuf3, NextSeq3}) -> + OnDisk = IsDurable andalso IsPersistent, + Acc2 = + if OnDisk -> [{AckTag, {NextSeq3, true}} | Acc]; + true -> Acc + end, + MsgBuf4 = queue:in({NextSeq3, Msg, true, OnDisk}, MsgBuf3), + {Acc2, MsgBuf4, NextSeq3 + 1} + end, {[], MsgBuf, NextSeq}, MessagesWithAckTags), + ok = if [] == PersistentPubs -> ok; + true -> rabbit_disk_queue:requeue_with_seqs( + Q, lists:reverse(PersistentPubs)) + end, {ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}. purge(State = #mqstate { queue = Q, mode = disk }) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3d173e2e45..a2a31a181a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -264,7 +264,7 @@ test_log_management() -> %% original log files are not writable ok = make_files_non_writable([MainLog, SaslLog]), {error, {{cannot_rotate_main_logs, _}, - {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []), + {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []), %% logging directed to tty (handlers were removed in last test) ok = clean_logs([MainLog, SaslLog], Suffix), @@ -283,7 +283,7 @@ test_log_management() -> ok = application:set_env(sasl, sasl_error_logger, {file, SaslLog}), ok = application:set_env(kernel, error_logger, {file, MainLog}), ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog}, - {rabbit_sasl_report_file_h, SaslLog}]), + {rabbit_sasl_report_file_h, SaslLog}]), passed. test_log_management_during_startup() -> @@ -689,6 +689,20 @@ delete_log_handlers(Handlers) -> test_disk_queue() -> rdq_stop(), + rdq_virgin(), + passed = rdq_stress_gc(10000), + passed = rdq_test_startup_with_queue_gaps(), + passed = rdq_test_redeliver(), + passed = rdq_test_purge(), + passed = rdq_test_dump_queue(), + passed = rdq_test_mixed_queue_modes(), + rdq_virgin(), + ok = control_action(stop_app, []), + ok = control_action(start_app, []), + passed. + +benchmark_disk_queue() -> + rdq_stop(), % unicode chars are supported properly from r13 onwards io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []), [begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize), @@ -698,12 +712,6 @@ test_disk_queue() -> MsgCount <- [1024, 4096, 16384] ], rdq_virgin(), - passed = rdq_stress_gc(10000), - passed = rdq_test_startup_with_queue_gaps(), - passed = rdq_test_redeliver(), - passed = rdq_test_purge(), - passed = rdq_test_dump_queue(), - rdq_virgin(), ok = control_action(stop_app, []), ok = control_action(start_app, []), passed. @@ -953,49 +961,52 @@ rdq_test_mixed_queue_modes() -> end, MS4, lists:seq(1,10)), 30 = rabbit_mixed_queue:length(MS6), io:format("Published a mixture of messages~n"), - {ok, _MS7} = rabbit_mixed_queue:to_disk_only_mode(MS6), + {ok, MS7} = rabbit_mixed_queue:to_disk_only_mode(MS6), + 30 = rabbit_mixed_queue:length(MS7), io:format("Converted to disk only mode~n"), - rdq_stop(), - rdq_start(), - {ok, MS8} = rabbit_mixed_queue:start_link(q, true, mixed), + {ok, MS8} = rabbit_mixed_queue:to_mixed_mode(MS7), 30 = rabbit_mixed_queue:length(MS8), - io:format("Recovered queue~n"), + io:format("Converted to mixed mode~n"), MS10 = lists:foldl( fun (N, MS9) -> Rem = 30 - N, - {{#basic_message { is_persistent = true }, + {{#basic_message { is_persistent = false }, false, _AckTag, Rem}, MS9a} = rabbit_mixed_queue:deliver(MS9), MS9a end, MS8, lists:seq(1,10)), + 20 = rabbit_mixed_queue:length(MS10), io:format("Delivered initial non persistent messages~n"), - {ok, _MS11} = rabbit_mixed_queue:to_disk_only_mode(MS10), + {ok, MS11} = rabbit_mixed_queue:to_disk_only_mode(MS10), + 20 = rabbit_mixed_queue:length(MS11), io:format("Converted to disk only mode~n"), rdq_stop(), rdq_start(), {ok, MS12} = rabbit_mixed_queue:start_link(q, true, mixed), - 30 = rabbit_mixed_queue:length(MS12), + 10 = rabbit_mixed_queue:length(MS12), io:format("Recovered queue~n"), {MS14, AckTags} = lists:foldl( fun (N, {MS13, AcksAcc}) -> - Rem = 30 - N, - IsDelivered = N < 11, + Rem = 10 - N, {{#basic_message { is_persistent = true }, - IsDelivered, AckTag, Rem}, + false, AckTag, Rem}, MS13a} = rabbit_mixed_queue:deliver(MS13), {MS13a, [AckTag | AcksAcc]} - end, {MS2, []}, lists:seq(1,20)), + end, {MS12, []}, lists:seq(1,10)), + 0 = rabbit_mixed_queue:length(MS14), {ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14), - io:format("Delivered and acked initial non persistent messages~n"), - {ok, _MS16} = rabbit_mixed_queue:to_disk_only_mode(MS15), + io:format("Delivered and acked all messages~n"), + {ok, MS16} = rabbit_mixed_queue:to_disk_only_mode(MS15), + 0 = rabbit_mixed_queue:length(MS16), io:format("Converted to disk only mode~n"), rdq_stop(), rdq_start(), {ok, MS17} = rabbit_mixed_queue:start_link(q, true, mixed), - 10 = rabbit_mixed_queue:length(MS17), + 0 = rabbit_mixed_queue:length(MS17), io:format("Recovered queue~n"), + rdq_stop(), passed. rdq_time_commands(Funcs) -> @@ -1010,7 +1021,8 @@ rdq_virgin() -> rdq_start() -> {ok, _} = rabbit_disk_queue:start_link(), - rabbit_disk_queue:to_ram_disk_mode(). + ok = rabbit_disk_queue:to_ram_disk_mode(), + ok. rdq_stop() -> rabbit_disk_queue:stop(), |
