diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 34 |
3 files changed, 29 insertions, 84 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 990e5917d7..dc328792a0 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -41,7 +41,7 @@ -export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/1, tx_commit/3, tx_cancel/1, requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1, - dump_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1 + delete_non_durable_queues/1, auto_ack_next_message/1 ]). -export([length/1, filesync/0, cache_info/0]). @@ -259,7 +259,6 @@ (queue_name(), [{{msg_id(), seq_id()}, {seq_id_or_next(), bool()}}]) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). --spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), bool()}]). -spec(delete_non_durable_queues/1 :: (set()) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). @@ -315,9 +314,6 @@ purge(Q) -> delete_queue(Q) -> gen_server2:cast(?SERVER, {delete_queue, Q}). -dump_queue(Q) -> - gen_server2:call(?SERVER, {dump_queue, Q}, infinity). - delete_non_durable_queues(DurableQueues) -> gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues}, infinity). @@ -463,9 +459,6 @@ handle_call(to_ram_disk_mode, _From, State) -> handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> {_ReadSeqId, _WriteSeqId, Length} = sequence_lookup(Sequences, Q), reply(Length, State); -handle_call({dump_queue, Q}, _From, State) -> - {Result, State1} = internal_dump_queue(Q, State), - reply(Result, State1); handle_call({delete_non_durable_queues, DurableQueues}, _From, State) -> {ok, State1} = internal_delete_non_durable_queues(DurableQueues, State), reply(ok, State1); @@ -1157,33 +1150,6 @@ internal_delete_queue(Q, State) -> end), {ok, State2}. -internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) -> - case ets:lookup(Sequences, Q) of - [] -> {[], State}; - [{Q, ReadSeq, WriteSeq, _Length}] -> - Objs = - mnesia:dirty_match_object( - rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = {Q, '_'}, - msg_id = '_', - is_delivered = '_', - next_seq_id = '_' - }), - {Msgs, WriteSeq} = - lists:foldl( - fun (#dq_msg_loc { queue_and_seq_id = {_, Seq}, - msg_id = MsgId, - is_delivered = Delivered, - next_seq_id = NextSeq }, - {Acc, Seq}) -> - {[{MsgId, Delivered} | Acc], NextSeq}; - (#dq_msg_loc { queue_and_seq_id = {_, Seq} }, - {[], RSeq}) when Seq < RSeq -> - {[], RSeq} - end, {[], ReadSeq}, lists:keysort(2, Objs)), - {lists:reverse(Msgs), State} - end. - internal_delete_non_durable_queues( DurableQueues, State = #dqstate { sequences = Sequences }) -> ets:foldl( diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 23696f27fc..f6ef355d9b 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -144,14 +144,20 @@ to_disk_only_mode(TxnMessages, State = ok = rabbit_disk_queue:tx_publish(Msg), {[], [ MsgId | TxPublishAcc ]} end; - ({MsgId, IsDelivered}, {RQueueAcc, TxPublishAcc}) -> + ({disk, Count}, {RQueueAcc, TxPublishAcc}) -> ok = if [] == TxPublishAcc -> ok; true -> rabbit_disk_queue:tx_commit(Q, TxPublishAcc, []) end, - {MsgId, IsDelivered, AckTag, _PersistRemaining} = - rabbit_disk_queue:phantom_deliver(Q), - {[ {AckTag, {next, IsDelivered}} | RQueueAcc ], []} + {RQueueAcc1, 0} = + rabbit_misc:unfold( + fun (0) -> false; + (N) -> + {_MsgId, IsDelivered, AckTag, _PersistRemaining} + = rabbit_disk_queue:phantom_deliver(Q), + {true, {AckTag, {next, IsDelivered}}, N - 1} + end, Count), + {RQueueAcc1 ++ RQueueAcc, []} end, {[], []}, Msgs), ok = if [] == TxPublish -> ok; true -> rabbit_disk_queue:tx_commit(Q, TxPublish, []) @@ -179,11 +185,13 @@ to_mixed_mode(TxnMessages, State = #mqstate { mode = disk, queue = Q, length = Length, is_durable = IsDurable }) -> rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]), - %% load up a new queue with everything that's on disk. - %% don't remove non-persistent messages that happen to be on disk - QList = rabbit_disk_queue:dump_queue(Q), - Length = erlang:length(QList), - MsgBuf = queue:from_list(QList), + %% load up a new queue with a token that says how many messages + %% are on disk + %% don't actually do anything to the disk + MsgBuf = case Length of + 0 -> queue:new(); + _ -> queue:from_list([{disk, Length}]) + end, %% remove txn messages from disk which are neither persistent and %% durable. This is necessary to avoid leaks. This is also pretty %% much the inverse behaviour of our own tx_cancel/2 which is why @@ -300,7 +308,7 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q, is_durable = IsDurable, length = Length }) -> {{value, Value}, MsgBuf1} = queue:out(MsgBuf), - {Msg, IsDelivered, AckTag} = + {Msg, IsDelivered, AckTag, MsgBuf2} = case Value of {Msg1 = #basic_message { guid = MsgId, is_persistent = IsPersistent }, @@ -320,10 +328,9 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q, end; false -> noack end, - {Msg1, IsDelivered1, AckTag1}; - {MsgId, IsDelivered1} -> - {Msg1 = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, + {Msg1, IsDelivered1, AckTag1, MsgBuf1}; + {disk, Rem1} -> + {Msg1 = #basic_message { is_persistent = IsPersistent }, _Size, IsDelivered1, AckTag1, _PersistRem} = rabbit_disk_queue:deliver(Q), AckTag2 = @@ -332,11 +339,15 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q, false -> rabbit_disk_queue:ack(Q, [AckTag1]), noack end, - {Msg1, IsDelivered1, AckTag2} + MsgBuf3 = case Rem1 of + 1 -> MsgBuf1; + _ -> queue:in_r({disk, Rem1 - 1}, MsgBuf1) + end, + {Msg1, IsDelivered1, AckTag2, MsgBuf3} end, Rem = Length - 1, {{Msg, IsDelivered, AckTag, Rem}, - State #mqstate { msg_buf = MsgBuf1, length = Rem }}. + State #mqstate { msg_buf = MsgBuf2, length = Rem }}. remove_noacks(MsgsWithAcks) -> {AckTags, ASize} = diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 0b70be0c9b..b56d71c8c2 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -690,11 +690,10 @@ delete_log_handlers(Handlers) -> test_disk_queue() -> rdq_stop(), rdq_virgin(), - passed = rdq_stress_gc(10000), + passed = rdq_stress_gc(1000), passed = rdq_test_startup_with_queue_gaps(), passed = rdq_test_redeliver(), passed = rdq_test_purge(), - passed = rdq_test_dump_queue(1000), passed = rdq_test_mixed_queue_modes(), passed = rdq_test_mode_conversion_mid_txn(), rdq_virgin(), @@ -938,37 +937,6 @@ rdq_test_purge() -> rdq_stop(), passed. -rdq_test_dump_queue(Total) -> - rdq_virgin(), - rdq_start(), - Msg = <<0:(8*256)>>, - All = lists:seq(1,Total), - [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All], - rabbit_disk_queue:tx_commit(q, All, []), - io:format("Publish done~n", []), - QList = [{N, false} || N <- All], - {Micros, QList} = timer:tc(rabbit_disk_queue, dump_queue, [q]), - rdq_stop(), - io:format("dump ok undelivered (~w micros)~n", [Micros]), - {Micros1, _} = timer:tc(rabbit_tests, rdq_start, []), - io:format("restarted (~w micros)~n", [Micros1]), - lists:foreach( - fun (N) -> - Remaining = Total - N, - {Message, _TSize, false, _SeqId, Remaining} = - rabbit_disk_queue:deliver(q), - ok = rdq_match_message(Message, N, Msg, 256) - end, All), - [] = rabbit_disk_queue:dump_queue(q), - rdq_stop(), - io:format("dump ok post delivery~n", []), - rdq_start(), - QList2 = [{N, true} || N <- All], - {Micros2, QList2} = timer:tc(rabbit_disk_queue, dump_queue, [q]), - io:format("dump ok post delivery + restart (~w micros)~n", [Micros2]), - rdq_stop(), - passed. - rdq_test_mixed_queue_modes() -> rdq_virgin(), rdq_start(), |
