summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-14 12:46:40 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-14 12:46:40 +0100
commit202138a1d722a806891ddada37bf42df31f68ca8 (patch)
tree2079b58e7c2079d2bfc6a6fccb4c22ba5eacd2ee
parent6da285ab1609ba470cf3a22ef936d3290ea344d8 (diff)
downloadrabbitmq-server-git-202138a1d722a806891ddada37bf42df31f68ca8.tar.gz
Made the prefetcher only consider hibernation when it's fully drained the queue.
-rw-r--r--src/rabbit_queue_prefetcher.erl17
1 files changed, 9 insertions, 8 deletions
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index bab5396e44..ec969bfca6 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -203,8 +203,8 @@ init([Q, Count, QPid]) ->
queue_mref = MRef
},
ok = rabbit_disk_queue:prefetch(Q),
- {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN,
- ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+ {ok, State, infinity, {backoff, ?HIBERNATE_AFTER_MIN,
+ ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({publish, { Msg = #basic_message {},
_Size, IsDelivered, AckTag, _Remaining }},
@@ -213,14 +213,15 @@ handle_call({publish, { Msg = #basic_message {},
msg_buf = MsgBuf, buf_length = Length, queue = Q
}) ->
gen_server2:reply(DiskQueue, ok),
- ok = case Fetched + 1 == Target of
- true -> ok;
- false -> rabbit_disk_queue:prefetch(Q)
- end,
+ Timeout = case Fetched + 1 == Target of
+ true -> hibernate;
+ false -> ok = rabbit_disk_queue:prefetch(Q),
+ infinity
+ end,
MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf),
{noreply, State #pstate { fetched_count = Fetched + 1,
buf_length = Length + 1,
- msg_buf = MsgBuf1 }, hibernate};
+ msg_buf = MsgBuf1 }, Timeout};
handle_call(publish_empty, _From, State) ->
%% Very odd. This could happen if the queue is deleted or purged
%% and the mixed queue fails to shut us down.
@@ -235,7 +236,7 @@ handle_call(drain, _From, State = #pstate { fetched_count = Count,
handle_call(drain, _From, State = #pstate { msg_buf = MsgBuf,
buf_length = Length }) ->
{reply, {MsgBuf, Length, continuing},
- State #pstate { msg_buf = queue:new(), buf_length = 0 }, hibernate};
+ State #pstate { msg_buf = queue:new(), buf_length = 0 }, infinity};
handle_call(drain_and_stop, _From, State = #pstate { buf_length = 0 }) ->
{stop, normal, empty, State};
handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf,