diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-30 16:24:50 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-30 16:24:50 +0100 |
| commit | d66a200813e0483d4a171f3815a16436371fac92 (patch) | |
| tree | ab92e8830728713ee9dee1b6bb78e89ef29cb571 /src | |
| parent | 675e13c1ade6c1a13d6215ab52f5c9095e09547f (diff) | |
| download | rabbitmq-server-git-d66a200813e0483d4a171f3815a16436371fac92.tar.gz | |
changed disk -> mixed mode so that messages stay on disk and don't get read. This means the conversion is much faster than it was which is a good thing, at the cost of slower initial delivery.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 69 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 13 |
3 files changed, 71 insertions, 50 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index cf8ddba095..990e5917d7 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -259,9 +259,7 @@ (queue_name(), [{{msg_id(), seq_id()}, {seq_id_or_next(), bool()}}]) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). --spec(dump_queue/1 :: (queue_name()) -> - [{msg_id(), binary(), non_neg_integer(), bool(), - {msg_id(), seq_id()}, seq_id()}]). +-spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), bool()}]). -spec(delete_non_durable_queues/1 :: (set()) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). @@ -1163,20 +1161,27 @@ internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of [] -> {[], State}; [{Q, ReadSeq, WriteSeq, _Length}] -> - {QList, {WriteSeq, State3}} = - rabbit_misc:unfold( - fun ({SeqId, _State1}) when SeqId == WriteSeq -> - false; - ({SeqId, State1}) -> - {ok, {Message, Size, Delivered, {MsgId, SeqId}}, - NextReadSeqId, State2} = - internal_read_message(Q, SeqId, true, true, - State1), - {true, - {Message, Size, Delivered, {MsgId, SeqId}, SeqId}, - {NextReadSeqId, State2}} - end, {ReadSeq, State}), - {lists:reverse(QList), State3} + Objs = + mnesia:dirty_match_object( + rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = {Q, '_'}, + msg_id = '_', + is_delivered = '_', + next_seq_id = '_' + }), + {Msgs, WriteSeq} = + lists:foldl( + fun (#dq_msg_loc { queue_and_seq_id = {_, Seq}, + msg_id = MsgId, + is_delivered = Delivered, + next_seq_id = NextSeq }, + {Acc, Seq}) -> + {[{MsgId, Delivered} | Acc], NextSeq}; + (#dq_msg_loc { queue_and_seq_id = {_, Seq} }, + {[], RSeq}) when Seq < RSeq -> + {[], RSeq} + end, {[], ReadSeq}, lists:keysort(2, Objs)), + {lists:reverse(Msgs), State} end. internal_delete_non_durable_queues( diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index d171cf186f..3c60d25fbc 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -129,11 +129,12 @@ to_disk_only_mode(TxnMessages, State = lists:foldl( fun ({Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk}, RQueueAcc) -> - if OnDisk -> + case OnDisk of + true -> {MsgId, IsDelivered, AckTag, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q), [ {AckTag, {next, IsDelivered}} | RQueueAcc ]; - true -> + false -> ok = if [] == RQueueAcc -> ok; true -> rabbit_disk_queue:requeue_with_seqs( @@ -142,7 +143,11 @@ to_disk_only_mode(TxnMessages, State = ok = rabbit_disk_queue:publish( Q, Msg, false), [] - end + end; + ({MsgId, IsDelivered}, RQueueAcc) -> + {MsgId, IsDelivered, AckTag, _PersistRemaining} = + rabbit_disk_queue:phantom_deliver(Q), + [ {AckTag, {next, IsDelivered}} | RQueueAcc ] end, [], Msgs), ok = if [] == Requeue -> ok; true -> @@ -170,12 +175,8 @@ to_mixed_mode(TxnMessages, State = %% load up a new queue with everything that's on disk. %% don't remove non-persistent messages that happen to be on disk QList = rabbit_disk_queue:dump_queue(Q), - {MsgBuf1, Length} = - lists:foldl( - fun ({Msg, _Size, IsDelivered, _AckTag, _SeqId}, - {Buf, L}) -> - {queue:in({Msg, IsDelivered, true}, Buf), L+1} - end, {queue:new(), 0}, QList), + Length = erlang:length(QList), + MsgBuf = queue:from_list(QList), %% remove txn messages from disk which are neither persistent and %% durable. This is necessary to avoid leaks. This is also pretty %% much the inverse behaviour of our own tx_cancel/2 which is why @@ -189,7 +190,7 @@ to_mixed_mode(TxnMessages, State = end end, [], TxnMessages), ok = rabbit_disk_queue:tx_cancel(Cancel), - {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}. + {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf }}. purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable, @@ -290,21 +291,41 @@ deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable, State #mqstate { length = Length - 1 }}; deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q, is_durable = IsDurable, length = Length }) -> - {{value, {Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - IsDelivered, OnDisk}}, MsgBuf1} + {{value, Value}, MsgBuf1} = queue:out(MsgBuf), - AckTag = - if OnDisk -> - if IsPersistent andalso IsDurable -> - {MsgId, IsDelivered, AckTag1, _PersistRem} = - rabbit_disk_queue:phantom_deliver(Q), - AckTag1; - true -> - ok = rabbit_disk_queue:auto_ack_next_message(Q), - noack - end; - true -> noack + {Msg, IsDelivered, AckTag} = + case Value of + {Msg1 = #basic_message { guid = MsgId, + is_persistent = IsPersistent }, + IsDelivered1, OnDisk} -> + AckTag1 = + case OnDisk of + true -> + case IsPersistent andalso IsDurable of + true -> + {MsgId, IsDelivered1, AckTag2, _PersistRem} + = rabbit_disk_queue:phantom_deliver(Q), + AckTag2; + false -> + ok = rabbit_disk_queue:auto_ack_next_message + (Q), + noack + end; + false -> noack + end, + {Msg1, IsDelivered1, AckTag1}; + {MsgId, IsDelivered1} -> + {Msg1 = #basic_message { guid = MsgId, + is_persistent = IsPersistent }, + _Size, IsDelivered1, AckTag1, _PersistRem} + = rabbit_disk_queue:deliver(Q), + AckTag2 = + case IsPersistent andalso IsDurable of + true -> AckTag1; + false -> rabbit_disk_queue:ack(Q, [AckTag1]), + noack + end, + {Msg1, IsDelivered1, AckTag2} end, Rem = Length - 1, {{Msg, IsDelivered, AckTag, Rem}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f5447fe39e..b80bdab201 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -948,14 +948,12 @@ rdq_test_dump_queue(Total) -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All], rabbit_disk_queue:tx_commit(q, All, []), io:format("Publish done~n", []), - QList = [begin Message = rdq_message(N, Msg), - Size = size(term_to_binary(Message)), - {Message, Size, false, {N, (N-1)}, (N-1)} - end || N <- All], + QList = [{N, false} || N <- All], {Micros, QList} = timer:tc(rabbit_disk_queue, dump_queue, [q]), rdq_stop(), io:format("dump ok undelivered (~w micros)~n", [Micros]), - rdq_start(), + {Micros1, _} = timer:tc(rabbit_tests, rdq_start, []), + io:format("restarted (~w micros)~n", [Micros1]), lists:foreach( fun (N) -> Remaining = Total - N, @@ -967,10 +965,7 @@ rdq_test_dump_queue(Total) -> rdq_stop(), io:format("dump ok post delivery~n", []), rdq_start(), - QList2 = [begin Message = rdq_message(N, Msg), - Size = size(term_to_binary(Message)), - {Message, Size, true, {N, (N-1)}, (N-1)} - end || N <- All], + QList2 = [{N, true} || N <- All], {Micros2, QList2} = timer:tc(rabbit_disk_queue, dump_queue, [q]), io:format("dump ok post delivery + restart (~w micros)~n", [Micros2]), rdq_stop(), |
