diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-07-21 19:30:49 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-07-21 19:30:49 +0100 |
| commit | 734b8e60fdf05fc541c61fed025cd35a3c07311f (patch) | |
| tree | 73ad2322c7f1f97a148a04e479913fb5a3324250 | |
| parent | 21d96ff7298e027d954bdd0058d0adaffebfa36a (diff) | |
| download | rabbitmq-server-git-734b8e60fdf05fc541c61fed025cd35a3c07311f.tar.gz | |
Prefetcher appears to be done and working well. None of the tests exercise it though because I decided to only start it up when in mixed mode and when the amqqueue_process starts to hibernate (otherwise, we start it up too soon, it doesn't make much progress and then we just have to shut it down anyway). However, other manual tests definitely exercise it and it seems to be very effective. Certainly can't make it crash now.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 233 | ||||
| -rw-r--r-- | src/rabbit_queue_prefetcher.erl | 75 |
4 files changed, 262 insertions, 97 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0597215fb6..ab96feff1a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -857,8 +857,10 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); -handle_info(timeout, State) -> - State1 = stop_memory_timer(report_memory(true, State)), +handle_info(timeout, State = #q { mixed_state = MS }) -> + MS1 = rabbit_mixed_queue:maybe_prefetch(MS), + State1 = + stop_memory_timer(report_memory(true, State #q { mixed_state = MS1 })), %% don't call noreply/1 as that'll restart the memory_report_timer {noreply, State1, hibernate}; diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index e739bfef64..05ba3a6c62 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -42,7 +42,8 @@ tx_publish/1, tx_commit/3, tx_cancel/1, requeue/2, purge/1, delete_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1, - requeue_next_n/2, length/1, foldl/3 + requeue_next_n/2, length/1, foldl/3, prefetch/1, + set_delivered_and_advance/2 ]). -export([filesync/0, cache_info/0]). @@ -257,12 +258,15 @@ -spec(phantom_deliver/1 :: (queue_name()) -> ( 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}, non_neg_integer()})). +-spec(prefetch/1 :: (queue_name()) -> 'ok'). -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(auto_ack_next_message/1 :: (queue_name()) -> 'ok'). -spec(tx_publish/1 :: (message()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [{msg_id(), bool()}], [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). +-spec(set_delivered_and_advance/2 :: + (queue_name(), {msg_id(), seq_id()}) -> 'ok'). -spec(requeue/2 :: (queue_name(), [{{msg_id(), seq_id()}, bool()}]) -> 'ok'). -spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). @@ -298,6 +302,9 @@ deliver(Q) -> phantom_deliver(Q) -> gen_server2:call(?SERVER, {phantom_deliver, Q}, infinity). +prefetch(Q) -> + gen_server2:pcast(?SERVER, -1, {prefetch, Q, self()}). + ack(Q, MsgSeqIds) when is_list(MsgSeqIds) -> gen_server2:cast(?SERVER, {ack, Q, MsgSeqIds}). @@ -314,6 +321,9 @@ tx_commit(Q, PubMsgIds, AckSeqIds) tx_cancel(MsgIds) when is_list(MsgIds) -> gen_server2:cast(?SERVER, {tx_cancel, MsgIds}). +set_delivered_and_advance(Q, MsgSeqId) -> + gen_server2:cast(?SERVER, {set_delivered_and_advance, Q, MsgSeqId}). + requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) -> gen_server2:cast(?SERVER, {requeue, Q, MsgSeqIds}). @@ -454,10 +464,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> {ok, State2, {binary, ?HIBERNATE_AFTER_MIN}}. handle_call({deliver, Q}, _From, State) -> - {ok, Result, State1} = internal_deliver(Q, true, State), + {ok, Result, State1} = internal_deliver(Q, true, false, true, State), reply(Result, State1); handle_call({phantom_deliver, Q}, _From, State) -> - {ok, Result, State1} = internal_deliver(Q, false, State), + {ok, Result, State1} = internal_deliver(Q, false, false, true, State), reply(Result, State1); handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> State1 = @@ -531,8 +541,20 @@ handle_cast(report_memory, State) -> %% call noreply1/2, not noreply/1/2, as we don't want to restart the %% memory_report_timer %% by unsetting the timer, we force a report on the next normal message - noreply1(State #dqstate { memory_report_timer = undefined }). - + noreply1(State #dqstate { memory_report_timer = undefined }); +handle_cast({prefetch, Q, From}, State) -> + {ok, Result, State1} = internal_deliver(Q, true, true, false, State), + ok = rabbit_queue_prefetcher:publish(From, Result), + noreply(State1); +handle_cast({set_delivered_and_advance, Q, MsgSeqId}, State) -> + State2 = + case internal_deliver(Q, false, false, true, State) of + {ok, empty, State1} -> State1; + {ok, {_MsgId, _IsPersistent, _Delivered, MsgSeqId, _Rem}, State1} -> + State1 + end, + noreply(State2). + handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info(timeout, State = #dqstate { commit_timer_ref = undefined }) -> @@ -866,7 +888,7 @@ cache_is_full(#dqstate { message_cache = Cache }) -> %% ---- INTERNAL RAW FUNCTIONS ---- -internal_deliver(Q, ReadMsg, +internal_deliver(Q, ReadMsg, FakeDeliver, Advance, State = #dqstate { sequences = Sequences }) -> case sequence_lookup(Sequences, Q) of {SeqId, SeqId} -> {ok, empty, State}; @@ -874,9 +896,12 @@ internal_deliver(Q, ReadMsg, Remaining = WriteSeqId - ReadSeqId - 1, {ok, Result, State1} = internal_read_message( - Q, ReadSeqId, ReadMsg, false, false, State), - true = ets:insert(Sequences, - {Q, ReadSeqId+1, WriteSeqId}), + Q, ReadSeqId, ReadMsg, FakeDeliver, false, State), + true = case Advance of + true -> ets:insert(Sequences, + {Q, ReadSeqId+1, WriteSeqId}); + false -> true + end, {ok, case Result of {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId}} -> @@ -941,7 +966,7 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) - end. internal_auto_ack(Q, State) -> - case internal_deliver(Q, false, State) of + case internal_deliver(Q, false, false, true, State) of {ok, empty, State1} -> {ok, State1}; {ok, {_MsgId, _IsPersistent, _Delivered, MsgSeqId, _Remaining}, State1} -> diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index d2d3c19f75..ac7495fea9 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -37,7 +37,7 @@ -export([publish/2, publish_delivered/2, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, - length/1, is_empty/1, delete_queue/1]). + length/1, is_empty/1, delete_queue/1, maybe_prefetch/1]). -export([to_disk_only_mode/2, to_mixed_mode/2, estimate_queue_memory/1, reset_counters/1, info/1]). @@ -49,7 +49,8 @@ length, memory_size, memory_gain, - memory_loss + memory_loss, + prefetcher } ). @@ -63,9 +64,10 @@ queue :: queue_name(), is_durable :: bool(), length :: non_neg_integer(), - memory_size :: non_neg_integer(), - memory_gain :: non_neg_integer(), - memory_loss :: non_neg_integer() + memory_size :: (non_neg_integer() | 'undefined'), + memory_gain :: (non_neg_integer() | 'undefined'), + memory_loss :: (non_neg_integer() | 'undefined'), + prefetcher :: (pid() | 'undefined') }). -type(acktag() :: ( 'noack' | { non_neg_integer(), non_neg_integer() })). -type(okmqs() :: {'ok', mqstate()}). @@ -110,7 +112,7 @@ init(Queue, IsDurable) -> {ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue, is_durable = IsDurable, length = Len, memory_size = Size, memory_gain = undefined, - memory_loss = undefined }}. + memory_loss = undefined, prefetcher = undefined }}. size_of_message( #basic_message { content = #content { payload_fragments_rev = Payload }}) -> @@ -122,16 +124,30 @@ to_disk_only_mode(_TxnMessages, State = #mqstate { mode = disk }) -> {ok, State}; to_disk_only_mode(TxnMessages, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, - is_durable = IsDurable }) -> + is_durable = IsDurable, prefetcher = Prefetcher + }) -> rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]), + State1 = State #mqstate { mode = disk }, + {MsgBuf1, State2} = + case Prefetcher of + undefined -> {MsgBuf, State1}; + _ -> + case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of + empty -> {MsgBuf, State1}; + {Fetched, Len} -> + State3 = #mqstate { msg_buf = MsgBuf2 } = + dec_queue_length(Len, State1), + {queue:join(Fetched, MsgBuf2), State3} + end + end, %% We enqueue _everything_ here. This means that should a message %% already be in the disk queue we must remove it and add it back %% in. Fortunately, by using requeue, we avoid rewriting the %% message on disk. %% Note we also batch together messages on disk so that we minimise %% the calls to requeue. - {ok, MsgBuf1} = - send_messages_to_disk(IsDurable, Q, MsgBuf, 0, 0, [], queue:new()), + {ok, MsgBuf3} = + send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], queue:new()), %% tx_publish txn messages. Some of these will have been already %% published if they really are durable and persistent which is %% why we can't just use our own tx_publish/2 function (would end @@ -144,7 +160,7 @@ to_disk_only_mode(TxnMessages, State = end end, TxnMessages), garbage_collect(), - {ok, State #mqstate { mode = disk, msg_buf = MsgBuf1 }}. + {ok, State2 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }}. send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, Commit, MsgBuf) -> @@ -179,6 +195,23 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, inc_queue_length(Q, MsgBuf, 1)) end end; + {{value, {Msg = #basic_message { guid = MsgId }, IsDelivered, _AckTag}}, + Queue1} -> + %% these have come via the prefetcher, so are no longer in + %% the disk queue so they need to be republished + Commit1 = flush_requeue_to_disk_queue(Q, RequeueCount, Commit), + ok = rabbit_disk_queue:tx_publish(Msg), + case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of + true -> + ok = flush_messages_to_disk_queue(Q, Commit1), + send_messages_to_disk(IsDurable, Q, Queue1, 1, 0, + [{MsgId, IsDelivered}], + inc_queue_length(Q, MsgBuf, 1)); + false -> + send_messages_to_disk(IsDurable, Q, Queue1, PublishCount+1, + 0, [{MsgId, IsDelivered} | Commit1], + inc_queue_length(Q, MsgBuf, 1)) + end; {{value, {Q, Count}}, Queue1} -> send_messages_to_disk(IsDurable, Q, Queue1, PublishCount, RequeueCount + Count, Commit, @@ -203,15 +236,16 @@ flush_requeue_to_disk_queue(Q, RequeueCount, Commit) -> to_mixed_mode(_TxnMessages, State = #mqstate { mode = mixed }) -> {ok, State}; -to_mixed_mode(TxnMessages, State = - #mqstate { mode = disk, queue = Q, - is_durable = IsDurable, msg_buf = MsgBuf }) -> +to_mixed_mode(TxnMessages, State = #mqstate { mode = disk, queue = Q, + is_durable = IsDurable }) -> rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]), - %% load up a new queue with a token that says how many messages - %% are on disk (this is already built for us by the disk mode) - %% don't actually do anything to the disk - ok = maybe_prefetch(mixed, MsgBuf), - %% remove txn messages from disk which are neither persistent and + %% The queue has a token just saying how many msgs are on disk + %% (this is already built for us when in disk mode). + %% Don't actually do anything to the disk + %% Don't start prefetcher just yet because the queue maybe busy - + %% wait for hibernate timeout in the amqqueue_process. + + %% Remove txn messages from disk which are neither persistent and %% durable. This is necessary to avoid leaks. This is also pretty %% much the inverse behaviour of our own tx_cancel/2 which is why %% we're not using it. @@ -219,8 +253,8 @@ to_mixed_mode(TxnMessages, State = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) -> case IsDurable andalso IsPersistent of - true -> Acc; - _ -> [Msg #basic_message.guid | Acc] + true -> Acc; + false -> [Msg #basic_message.guid | Acc] end end, [], TxnMessages), ok = if Cancel == [] -> ok; @@ -229,26 +263,43 @@ to_mixed_mode(TxnMessages, State = garbage_collect(), {ok, State #mqstate { mode = mixed }}. -inc_queue_length(_Queue, MsgBuf, 0) -> +inc_queue_length(_Q, MsgBuf, 0) -> MsgBuf; -inc_queue_length(Queue, MsgBuf, Count) -> +inc_queue_length(Q, MsgBuf, Count) -> case queue:out_r(MsgBuf) of {empty, MsgBuf} -> - queue:in({Queue, Count}, MsgBuf); - {{value, {Queue, Len}}, MsgBuf1} -> - queue:in({Queue, Len + Count}, MsgBuf1); + queue:in({Q, Count}, MsgBuf); + {{value, {Q, Len}}, MsgBuf1} -> + queue:in({Q, Len + Count}, MsgBuf1); {{value, _}, _MsgBuf1} -> - queue:in({Queue, Count}, MsgBuf) + queue:in({Q, Count}, MsgBuf) end. -dec_queue_length(Mode, MsgBuf) -> - {{value, {Queue, Len}}, MsgBuf1} = queue:out(MsgBuf), - MsgBuf2 = case Len of - 1 -> ok = maybe_prefetch(Mode, MsgBuf1), - MsgBuf1; - _ -> queue:in_r({Queue, Len-1}, MsgBuf1) - end, - {Queue, MsgBuf2}. +dec_queue_length(Count, State = #mqstate { queue = Q, msg_buf = MsgBuf }) -> + case queue:out(MsgBuf) of + {{value, {Q, Len}}, MsgBuf1} -> + case Len of + Count -> + maybe_prefetch(State #mqstate { msg_buf = MsgBuf1 }); + _ when Len > Count -> + State #mqstate { msg_buf = queue:in_r({Q, Len-Count}, + MsgBuf1)} + end; + _ -> State + end. + +maybe_prefetch(State = #mqstate { prefetcher = undefined, + mode = mixed, + msg_buf = MsgBuf, + queue = Q }) -> + case queue:peek(MsgBuf) of + {value, {Q, Count}} -> {ok, Prefetcher} = + rabbit_queue_prefetcher:start_link(Q, Count), + State #mqstate { prefetcher = Prefetcher }; + _ -> State + end; +maybe_prefetch(State) -> + State. publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length, msg_buf = MsgBuf, memory_size = QSize, @@ -312,46 +363,67 @@ deliver(State = #mqstate { length = 0 }) -> {empty, State}; deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q, is_durable = IsDurable, length = Length, - mode = Mode }) -> + prefetcher = Prefetcher }) -> {{value, Value}, MsgBuf1} = queue:out(MsgBuf), - {Msg, IsDelivered, AckTag, MsgBuf2} = - case Value of - {Msg1 = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - IsDelivered1} -> - AckTag1 = - case IsDurable andalso IsPersistent of - true -> - {MsgId, IsPersistent, IsDelivered1, AckTag2, _PRem} - = rabbit_disk_queue:phantom_deliver(Q), - AckTag2; - false -> - noack - end, - ok = maybe_prefetch(Mode, MsgBuf1), - {Msg1, IsDelivered1, AckTag1, MsgBuf1}; - _ -> - {Q, MsgBuf3} = dec_queue_length(Mode, MsgBuf), - {Msg1 = #basic_message { is_persistent = IsPersistent }, - _Size, IsDelivered1, AckTag1, _PersistRem} - = rabbit_disk_queue:deliver(Q), - AckTag2 = - case IsDurable andalso IsPersistent of - true -> - AckTag1; - false -> - ok = rabbit_disk_queue:ack(Q, [AckTag1]), - noack - end, - {Msg1, IsDelivered1, AckTag2, MsgBuf3} - end, Rem = Length - 1, - {{Msg, IsDelivered, AckTag, Rem}, - State #mqstate { msg_buf = MsgBuf2, length = Rem }}. - -maybe_prefetch(_, _) -> - %% disable just for the time being - ok. + State1 = State #mqstate { length = Rem }, + case Value of + {Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, + IsDelivered} -> + AckTag = + case IsDurable andalso IsPersistent of + true -> + {MsgId, IsPersistent, IsDelivered, AckTag1, _PRem} + = rabbit_disk_queue:phantom_deliver(Q), + AckTag1; + false -> + noack + end, + State2 = maybe_prefetch(State1 #mqstate { msg_buf = MsgBuf1 }), + {{Msg, IsDelivered, AckTag, Rem}, State2}; + {Msg = #basic_message { is_persistent = IsPersistent }, + IsDelivered, AckTag} -> + %% message has come via the prefetcher, thus it's been + %% delivered. If it's not persistent+durable, we should + %% ack it now + AckTag1 = + case IsDurable andalso IsPersistent of + true -> + AckTag; + false -> + ok = rabbit_disk_queue:ack(Q, [AckTag]), + noack + end, + {{Msg, IsDelivered, AckTag1, Rem}, + State1 #mqstate { msg_buf = MsgBuf1 }}; + _ when Prefetcher == undefined -> + State2 = dec_queue_length(1, State1), + {Msg = #basic_message { is_persistent = IsPersistent }, + _Size, IsDelivered, AckTag, _PersistRem} + = rabbit_disk_queue:deliver(Q), + AckTag1 = + case IsDurable andalso IsPersistent of + true -> + AckTag; + false -> + ok = rabbit_disk_queue:ack(Q, [AckTag]), + noack + end, + {{Msg, IsDelivered, AckTag1, Rem}, State2}; + _ -> + case rabbit_queue_prefetcher:drain(Prefetcher) of + empty -> deliver(State #mqstate { prefetcher = undefined }); + {Fetched, Len, Status} -> + State2 = #mqstate { msg_buf = MsgBuf2 } = + dec_queue_length(Len, State), + deliver(State2 #mqstate + { msg_buf = queue:join(Fetched, MsgBuf2), + prefetcher = case Status of + finished -> undefined; + _ -> Prefetcher + end }) + end + end. remove_noacks(MsgsWithAcks) -> lists:foldl( @@ -489,8 +561,7 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, is_durable = IsDurable, - length = Length - }) -> + length = Length }) -> {PersistentPubs, MsgBuf1} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag}, @@ -515,14 +586,24 @@ purge(State = #mqstate { queue = Q, mode = disk, length = Count, {Count, State #mqstate { length = 0, memory_size = 0, memory_loss = Loss + QSize }}; purge(State = #mqstate { queue = Q, mode = mixed, length = Length, - memory_loss = Loss, memory_size = QSize }) -> + memory_loss = Loss, memory_size = QSize, + prefetcher = Prefetcher }) -> + case Prefetcher of + undefined -> ok; + _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher) + end, rabbit_disk_queue:purge(Q), {Length, State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0, memory_loss = Loss + QSize }}. delete_queue(State = #mqstate { queue = Q, memory_size = QSize, - memory_loss = Loss }) -> + memory_loss = Loss, prefetcher = Prefetcher + }) -> + case Prefetcher of + undefined -> ok; + _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher) + end, ok = rabbit_disk_queue:delete_queue(Q), {ok, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(), memory_loss = Loss + QSize }}. diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index 6cae54048b..dfd444b259 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -38,6 +38,10 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([publish/2, drain/1, drain_and_stop/1]). + +-include("rabbit.hrl"). + -define(HIBERNATE_AFTER_MIN, 1000). -record(pstate, @@ -45,7 +49,8 @@ buf_length, target_count, fetched_count, - queue + queue, + queue_mref }). %% The design of the prefetcher is based on the following: @@ -178,25 +183,77 @@ %% mixed_queue when it wants to drain the prefetcher. start_link(Queue, Count) -> - gen_server2:start_link(?MODULE, [Queue, Count], []). + gen_server2:start_link(?MODULE, [Queue, Count, self()], []). + +publish(Prefetcher, Obj = { #basic_message {}, _Size, _IsDelivered, + _AckTag, _Remaining }) -> + gen_server2:cast(Prefetcher, {publish, Obj}); +publish(Prefetcher, empty) -> + gen_server2:cast(Prefetcher, publish_empty). + +drain(Prefetcher) -> + gen_server2:call(Prefetcher, drain, infinity). + +drain_and_stop(Prefetcher) -> + gen_server2:call(Prefetcher, drain_and_stop, infinity). -init([Q, Count]) -> +init([Q, Count, QPid]) -> + %% link isn't enough because the signal will not appear if the + %% queue exits normally. Thus have to use monitor. + MRef = erlang:monitor(process, QPid), State = #pstate { msg_buf = queue:new(), buf_length = 0, target_count = Count, fetched_count = 0, - queue = Q + queue = Q, + queue_mref = MRef }, + ok = rabbit_disk_queue:prefetch(Q), {ok, State, {binary, ?HIBERNATE_AFTER_MIN}}. -handle_call(_Msg, _From, State) -> - {reply, confused, State}. +handle_call(drain, _From, State = #pstate { buf_length = 0 }) -> + {stop, normal, empty, State}; +handle_call(drain, _From, State = #pstate { fetched_count = Count, + target_count = Count, + msg_buf = MsgBuf, + buf_length = Length }) -> + {stop, normal, {MsgBuf, Length, finished}, State}; +handle_call(drain, _From, State = #pstate { msg_buf = MsgBuf, + buf_length = Length }) -> + {reply, {MsgBuf, Length, continuing}, + State #pstate { msg_buf = queue:new(), buf_length = 0 }}; +handle_call(drain_and_stop, _From, State = #pstate { buf_length = 0 }) -> + {stop, normal, empty, State}; +handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf, + buf_length = Length }) -> + {stop, normal, {MsgBuf, Length}, State}. -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(publish_empty, State) -> + %% Very odd. This could happen if the queue is deleted or purged + %% and the mixed queue fails to shut us down. + {noreply, State}; +handle_cast({publish, { Msg = #basic_message {}, + _Size, IsDelivered, AckTag, _Remaining }}, + State = #pstate { fetched_count = Fetched, target_count = Target, + msg_buf = MsgBuf, buf_length = Length, queue = Q + }) -> + ok = rabbit_disk_queue:set_delivered_and_advance(Q, AckTag), + ok = case Fetched + 1 == Target of + true -> ok; + false -> rabbit_disk_queue:prefetch(Q) + end, + MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf), + {noreply, State #pstate { fetched_count = Fetched + 1, + buf_length = Length + 1, + msg_buf = MsgBuf1 }}. handle_info(timeout, State) -> - {noreply, State, hibernate}. + {noreply, State, hibernate}; +handle_info({'DOWN', MRef, process, _Pid, _Reason}, + State = #pstate { queue_mref = MRef }) -> + %% this is the amqqueue_process going down, so we should go down + %% too + {stop, normal, State}. terminate(_Reason, _State) -> ok. |
