diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-14 12:46:40 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-14 12:46:40 +0100 |
| commit | 202138a1d722a806891ddada37bf42df31f68ca8 (patch) | |
| tree | 2079b58e7c2079d2bfc6a6fccb4c22ba5eacd2ee | |
| parent | 6da285ab1609ba470cf3a22ef936d3290ea344d8 (diff) | |
| download | rabbitmq-server-git-202138a1d722a806891ddada37bf42df31f68ca8.tar.gz | |
Made the prefetcher only consider hibernation when it's fully drained the queue.
| -rw-r--r-- | src/rabbit_queue_prefetcher.erl | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index bab5396e44..ec969bfca6 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -203,8 +203,8 @@ init([Q, Count, QPid]) -> queue_mref = MRef }, ok = rabbit_disk_queue:prefetch(Q), - {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, - ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + {ok, State, infinity, {backoff, ?HIBERNATE_AFTER_MIN, + ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({publish, { Msg = #basic_message {}, _Size, IsDelivered, AckTag, _Remaining }}, @@ -213,14 +213,15 @@ handle_call({publish, { Msg = #basic_message {}, msg_buf = MsgBuf, buf_length = Length, queue = Q }) -> gen_server2:reply(DiskQueue, ok), - ok = case Fetched + 1 == Target of - true -> ok; - false -> rabbit_disk_queue:prefetch(Q) - end, + Timeout = case Fetched + 1 == Target of + true -> hibernate; + false -> ok = rabbit_disk_queue:prefetch(Q), + infinity + end, MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf), {noreply, State #pstate { fetched_count = Fetched + 1, buf_length = Length + 1, - msg_buf = MsgBuf1 }, hibernate}; + msg_buf = MsgBuf1 }, Timeout}; handle_call(publish_empty, _From, State) -> %% Very odd. This could happen if the queue is deleted or purged %% and the mixed queue fails to shut us down. @@ -235,7 +236,7 @@ handle_call(drain, _From, State = #pstate { fetched_count = Count, handle_call(drain, _From, State = #pstate { msg_buf = MsgBuf, buf_length = Length }) -> {reply, {MsgBuf, Length, continuing}, - State #pstate { msg_buf = queue:new(), buf_length = 0 }, hibernate}; + State #pstate { msg_buf = queue:new(), buf_length = 0 }, infinity}; handle_call(drain_and_stop, _From, State = #pstate { buf_length = 0 }) -> {stop, normal, empty, State}; handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf, |
