summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-27 15:56:36 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-27 15:56:36 +0100
commitd8896bb2e4c1de2098d6fae051ccb93e6ec96926 (patch)
tree0e827b5c81a8580c1bd899aa67b1538fca8f82ce
parent063305f12676de3c94c9bf5323777892be517d19 (diff)
downloadrabbitmq-server-git-d8896bb2e4c1de2098d6fae051ccb93e6ec96926.tar.gz
QA-corrections to prefetcher.
-rw-r--r--src/rabbit_mixed_queue.erl8
-rw-r--r--src/rabbit_queue_prefetcher.erl67
2 files changed, 40 insertions, 35 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index f0df7777a6..251c2046dc 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -206,8 +206,8 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q,
%% use State, not State1 as we've not dec'd length
fetch(case rabbit_queue_prefetcher:drain(Prefetcher) of
empty -> State #mqstate { prefetcher = undefined };
- {Fetched, Len, Status} ->
- MsgBuf2 = dec_queue_length(MsgBuf, Len),
+ {Fetched, Status} ->
+ MsgBuf2 = dec_queue_length(MsgBuf, queue:len(Fetched)),
State #mqstate
{ msg_buf = queue:join(Fetched, MsgBuf2),
prefetcher = case Status of
@@ -363,8 +363,8 @@ set_storage_mode(disk, TxnMessages, State =
_ ->
case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of
empty -> MsgBuf;
- {Fetched, Len} ->
- MsgBuf2 = dec_queue_length(MsgBuf, Len),
+ Fetched ->
+ MsgBuf2 = dec_queue_length(MsgBuf, queue:len(Fetched)),
queue:join(Fetched, MsgBuf2)
end
end,
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index ffa98d6967..eddb613cba 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -47,9 +47,7 @@
-record(pstate,
{ msg_buf,
- buf_length,
target_count,
- fetched_count,
queue,
queue_mref
}).
@@ -176,6 +174,17 @@
%% redelivered bit set false really are guaranteed to have not been
%% delivered already.
+-ifdef(use_specs).
+
+-spec(start_link/2 :: (queue_name(), non_neg_integer()) ->
+ ({'ok', pid()} | 'ignore' | {'error', any()})).
+-spec(publish/2 :: (pid(), message()) -> 'ok').
+-spec(drain/1 :: (pid()) -> ('empty' | {queue(), ('finished' | 'continuing')})).
+-spec(drain_and_stop/1 :: (pid()) -> ('empty' | queue())).
+-spec(stop/1 :: (pid()) -> 'ok').
+
+-endif.
+
start_link(Queue, Count) ->
gen_server2:start_link(?MODULE, [Queue, Count, self()], []).
@@ -194,14 +203,12 @@ drain_and_stop(Prefetcher) ->
stop(Prefetcher) ->
gen_server2:call(Prefetcher, stop, infinity).
-init([Q, Count, QPid]) ->
+init([Q, Count, QPid]) when Count > 0 andalso is_pid(QPid) ->
%% link isn't enough because the signal will not appear if the
%% queue exits normally. Thus have to use monitor.
MRef = erlang:monitor(process, QPid),
State = #pstate { msg_buf = queue:new(),
- buf_length = 0,
target_count = Count,
- fetched_count = 0,
queue = Q,
queue_mref = MRef
},
@@ -211,39 +218,37 @@ init([Q, Count, QPid]) ->
handle_call({publish,
{Msg = #basic_message {}, IsDelivered, AckTag, _Remaining}},
- DiskQueue,
- State = #pstate { fetched_count = Fetched, target_count = Target,
- msg_buf = MsgBuf, buf_length = Length, queue = Q
- }) ->
+ DiskQueue, State = #pstate {
+ target_count = Target, msg_buf = MsgBuf, queue = Q}) ->
gen_server2:reply(DiskQueue, ok),
- Timeout = if Fetched + 1 == Target -> hibernate;
- true -> ok = rabbit_disk_queue:prefetch(Q),
- infinity
+ Timeout = case Target of
+ 1 -> hibernate;
+ _ -> ok = rabbit_disk_queue:prefetch(Q),
+ infinity
end,
MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf),
- {noreply, State #pstate { fetched_count = Fetched + 1,
- buf_length = Length + 1,
- msg_buf = MsgBuf1 }, Timeout};
+ {noreply, State #pstate { target_count = Target - 1, msg_buf = MsgBuf1 },
+ Timeout};
handle_call(publish_empty, _From, State) ->
%% Very odd. This could happen if the queue is deleted or purged
%% and the mixed queue fails to shut us down.
{reply, ok, State, hibernate};
-handle_call(drain, _From, State = #pstate { buf_length = 0 }) ->
- {stop, normal, empty, State};
-handle_call(drain, _From, State = #pstate { fetched_count = Count,
- target_count = Count,
- msg_buf = MsgBuf,
- buf_length = Length }) ->
- {stop, normal, {MsgBuf, Length, finished}, State};
-handle_call(drain, _From, State = #pstate { msg_buf = MsgBuf,
- buf_length = Length }) ->
- {reply, {MsgBuf, Length, continuing},
- State #pstate { msg_buf = queue:new(), buf_length = 0 }, infinity};
-handle_call(drain_and_stop, _From, State = #pstate { buf_length = 0 }) ->
- {stop, normal, empty, State};
-handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf,
- buf_length = Length }) ->
- {stop, normal, {MsgBuf, Length}, State};
+handle_call(drain, _From, State = #pstate { target_count = 0,
+ msg_buf = MsgBuf }) ->
+ Res = case queue:is_empty(MsgBuf) of
+ true -> empty;
+ false -> {MsgBuf, finished}
+ end,
+ {stop, normal, Res, State};
+handle_call(drain, _From, State = #pstate { msg_buf = MsgBuf }) ->
+ {reply, {MsgBuf, continuing}, State #pstate { msg_buf = queue:new() },
+ infinity};
+handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf }) ->
+ Res = case queue:is_empty(MsgBuf) of
+ true -> empty;
+ false -> MsgBuf
+ end,
+ {stop, normal, Res, State};
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.