diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-07-02 17:35:48 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-07-02 17:35:48 +0100 |
| commit | a29a0b1e7e135e6b10f38900ff2426302fe8d1bd (patch) | |
| tree | 8674dcba4a82211c878799ecd26ed95e22e9b091 /src | |
| parent | 9c1c368264f6845e36e710180291202290ac4b35 (diff) | |
| download | rabbitmq-server-git-a29a0b1e7e135e6b10f38900ff2426302fe8d1bd.tar.gz | |
well if we're going to not actually pull messages off disk when going to mixed mode, we may as well do it really lazily and not bother with any communication with the disk_queue. We just have a token in the queue which indicates how many messages we are expecting to get from the disk queue. This makes disk -> mixed almost instantaneous. This also means that performance is not initially brilliant. Maybe we need some way of the queue knowing that both it and the disk_queue are idle and deciding to prefetch. Even batching could work well. It's an endless trade off between getting operations to happen quickly and being able to get good performance. Dunno what the third thing is, probably not necessary, as you can't even have both of those, let alone pick 2 from 3!
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 34 |
3 files changed, 29 insertions, 84 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 990e5917d7..dc328792a0 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -41,7 +41,7 @@ -export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/1, tx_commit/3, tx_cancel/1, requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1, - dump_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1 + delete_non_durable_queues/1, auto_ack_next_message/1 ]). -export([length/1, filesync/0, cache_info/0]). @@ -259,7 +259,6 @@ (queue_name(), [{{msg_id(), seq_id()}, {seq_id_or_next(), bool()}}]) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). --spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), bool()}]). -spec(delete_non_durable_queues/1 :: (set()) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). @@ -315,9 +314,6 @@ purge(Q) -> delete_queue(Q) -> gen_server2:cast(?SERVER, {delete_queue, Q}). -dump_queue(Q) -> - gen_server2:call(?SERVER, {dump_queue, Q}, infinity). - delete_non_durable_queues(DurableQueues) -> gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues}, infinity). @@ -463,9 +459,6 @@ handle_call(to_ram_disk_mode, _From, State) -> handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> {_ReadSeqId, _WriteSeqId, Length} = sequence_lookup(Sequences, Q), reply(Length, State); -handle_call({dump_queue, Q}, _From, State) -> - {Result, State1} = internal_dump_queue(Q, State), - reply(Result, State1); handle_call({delete_non_durable_queues, DurableQueues}, _From, State) -> {ok, State1} = internal_delete_non_durable_queues(DurableQueues, State), reply(ok, State1); @@ -1157,33 +1150,6 @@ internal_delete_queue(Q, State) -> end), {ok, State2}. -internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) -> - case ets:lookup(Sequences, Q) of - [] -> {[], State}; - [{Q, ReadSeq, WriteSeq, _Length}] -> - Objs = - mnesia:dirty_match_object( - rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = {Q, '_'}, - msg_id = '_', - is_delivered = '_', - next_seq_id = '_' - }), - {Msgs, WriteSeq} = - lists:foldl( - fun (#dq_msg_loc { queue_and_seq_id = {_, Seq}, - msg_id = MsgId, - is_delivered = Delivered, - next_seq_id = NextSeq }, - {Acc, Seq}) -> - {[{MsgId, Delivered} | Acc], NextSeq}; - (#dq_msg_loc { queue_and_seq_id = {_, Seq} }, - {[], RSeq}) when Seq < RSeq -> - {[], RSeq} - end, {[], ReadSeq}, lists:keysort(2, Objs)), - {lists:reverse(Msgs), State} - end. - internal_delete_non_durable_queues( DurableQueues, State = #dqstate { sequences = Sequences }) -> ets:foldl( diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 23696f27fc..f6ef355d9b 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -144,14 +144,20 @@ to_disk_only_mode(TxnMessages, State = ok = rabbit_disk_queue:tx_publish(Msg), {[], [ MsgId | TxPublishAcc ]} end; - ({MsgId, IsDelivered}, {RQueueAcc, TxPublishAcc}) -> + ({disk, Count}, {RQueueAcc, TxPublishAcc}) -> ok = if [] == TxPublishAcc -> ok; true -> rabbit_disk_queue:tx_commit(Q, TxPublishAcc, []) end, - {MsgId, IsDelivered, AckTag, _PersistRemaining} = - rabbit_disk_queue:phantom_deliver(Q), - {[ {AckTag, {next, IsDelivered}} | RQueueAcc ], []} + {RQueueAcc1, 0} = + rabbit_misc:unfold( + fun (0) -> false; + (N) -> + {_MsgId, IsDelivered, AckTag, _PersistRemaining} + = rabbit_disk_queue:phantom_deliver(Q), + {true, {AckTag, {next, IsDelivered}}, N - 1} + end, Count), + {RQueueAcc1 ++ RQueueAcc, []} end, {[], []}, Msgs), ok = if [] == TxPublish -> ok; true -> rabbit_disk_queue:tx_commit(Q, TxPublish, []) @@ -179,11 +185,13 @@ to_mixed_mode(TxnMessages, State = #mqstate { mode = disk, queue = Q, length = Length, is_durable = IsDurable }) -> rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]), - %% load up a new queue with everything that's on disk. - %% don't remove non-persistent messages that happen to be on disk - QList = rabbit_disk_queue:dump_queue(Q), - Length = erlang:length(QList), - MsgBuf = queue:from_list(QList), + %% load up a new queue with a token that says how many messages + %% are on disk + %% don't actually do anything to the disk + MsgBuf = case Length of + 0 -> queue:new(); + _ -> queue:from_list([{disk, Length}]) + end, %% remove txn messages from disk which are neither persistent and %% durable. This is necessary to avoid leaks. This is also pretty %% much the inverse behaviour of our own tx_cancel/2 which is why @@ -300,7 +308,7 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q, is_durable = IsDurable, length = Length }) -> {{value, Value}, MsgBuf1} = queue:out(MsgBuf), - {Msg, IsDelivered, AckTag} = + {Msg, IsDelivered, AckTag, MsgBuf2} = case Value of {Msg1 = #basic_message { guid = MsgId, is_persistent = IsPersistent }, @@ -320,10 +328,9 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q, end; false -> noack end, - {Msg1, IsDelivered1, AckTag1}; - {MsgId, IsDelivered1} -> - {Msg1 = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, + {Msg1, IsDelivered1, AckTag1, MsgBuf1}; + {disk, Rem1} -> + {Msg1 = #basic_message { is_persistent = IsPersistent }, _Size, IsDelivered1, AckTag1, _PersistRem} = rabbit_disk_queue:deliver(Q), AckTag2 = @@ -332,11 +339,15 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q, false -> rabbit_disk_queue:ack(Q, [AckTag1]), noack end, - {Msg1, IsDelivered1, AckTag2} + MsgBuf3 = case Rem1 of + 1 -> MsgBuf1; + _ -> queue:in_r({disk, Rem1 - 1}, MsgBuf1) + end, + {Msg1, IsDelivered1, AckTag2, MsgBuf3} end, Rem = Length - 1, {{Msg, IsDelivered, AckTag, Rem}, - State #mqstate { msg_buf = MsgBuf1, length = Rem }}. + State #mqstate { msg_buf = MsgBuf2, length = Rem }}. remove_noacks(MsgsWithAcks) -> {AckTags, ASize} = diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 0b70be0c9b..b56d71c8c2 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -690,11 +690,10 @@ delete_log_handlers(Handlers) -> test_disk_queue() -> rdq_stop(), rdq_virgin(), - passed = rdq_stress_gc(10000), + passed = rdq_stress_gc(1000), passed = rdq_test_startup_with_queue_gaps(), passed = rdq_test_redeliver(), passed = rdq_test_purge(), - passed = rdq_test_dump_queue(1000), passed = rdq_test_mixed_queue_modes(), passed = rdq_test_mode_conversion_mid_txn(), rdq_virgin(), @@ -938,37 +937,6 @@ rdq_test_purge() -> rdq_stop(), passed. -rdq_test_dump_queue(Total) -> - rdq_virgin(), - rdq_start(), - Msg = <<0:(8*256)>>, - All = lists:seq(1,Total), - [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All], - rabbit_disk_queue:tx_commit(q, All, []), - io:format("Publish done~n", []), - QList = [{N, false} || N <- All], - {Micros, QList} = timer:tc(rabbit_disk_queue, dump_queue, [q]), - rdq_stop(), - io:format("dump ok undelivered (~w micros)~n", [Micros]), - {Micros1, _} = timer:tc(rabbit_tests, rdq_start, []), - io:format("restarted (~w micros)~n", [Micros1]), - lists:foreach( - fun (N) -> - Remaining = Total - N, - {Message, _TSize, false, _SeqId, Remaining} = - rabbit_disk_queue:deliver(q), - ok = rdq_match_message(Message, N, Msg, 256) - end, All), - [] = rabbit_disk_queue:dump_queue(q), - rdq_stop(), - io:format("dump ok post delivery~n", []), - rdq_start(), - QList2 = [{N, true} || N <- All], - {Micros2, QList2} = timer:tc(rabbit_disk_queue, dump_queue, [q]), - io:format("dump ok post delivery + restart (~w micros)~n", [Micros2]), - rdq_stop(), - passed. - rdq_test_mixed_queue_modes() -> rdq_virgin(), rdq_start(), |
