diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-27 15:56:36 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-27 15:56:36 +0100 |
| commit | d8896bb2e4c1de2098d6fae051ccb93e6ec96926 (patch) | |
| tree | 0e827b5c81a8580c1bd899aa67b1538fca8f82ce | |
| parent | 063305f12676de3c94c9bf5323777892be517d19 (diff) | |
| download | rabbitmq-server-git-d8896bb2e4c1de2098d6fae051ccb93e6ec96926.tar.gz | |
QA-corrections to prefetcher.
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_queue_prefetcher.erl | 67 |
2 files changed, 40 insertions, 35 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index f0df7777a6..251c2046dc 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -206,8 +206,8 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q, %% use State, not State1 as we've not dec'd length fetch(case rabbit_queue_prefetcher:drain(Prefetcher) of empty -> State #mqstate { prefetcher = undefined }; - {Fetched, Len, Status} -> - MsgBuf2 = dec_queue_length(MsgBuf, Len), + {Fetched, Status} -> + MsgBuf2 = dec_queue_length(MsgBuf, queue:len(Fetched)), State #mqstate { msg_buf = queue:join(Fetched, MsgBuf2), prefetcher = case Status of @@ -363,8 +363,8 @@ set_storage_mode(disk, TxnMessages, State = _ -> case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of empty -> MsgBuf; - {Fetched, Len} -> - MsgBuf2 = dec_queue_length(MsgBuf, Len), + Fetched -> + MsgBuf2 = dec_queue_length(MsgBuf, queue:len(Fetched)), queue:join(Fetched, MsgBuf2) end end, diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index ffa98d6967..eddb613cba 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -47,9 +47,7 @@ -record(pstate, { msg_buf, - buf_length, target_count, - fetched_count, queue, queue_mref }). @@ -176,6 +174,17 @@ %% redelivered bit set false really are guaranteed to have not been %% delivered already. +-ifdef(use_specs). + +-spec(start_link/2 :: (queue_name(), non_neg_integer()) -> + ({'ok', pid()} | 'ignore' | {'error', any()})). +-spec(publish/2 :: (pid(), message()) -> 'ok'). +-spec(drain/1 :: (pid()) -> ('empty' | {queue(), ('finished' | 'continuing')})). +-spec(drain_and_stop/1 :: (pid()) -> ('empty' | queue())). +-spec(stop/1 :: (pid()) -> 'ok'). + +-endif. + start_link(Queue, Count) -> gen_server2:start_link(?MODULE, [Queue, Count, self()], []). @@ -194,14 +203,12 @@ drain_and_stop(Prefetcher) -> stop(Prefetcher) -> gen_server2:call(Prefetcher, stop, infinity). -init([Q, Count, QPid]) -> +init([Q, Count, QPid]) when Count > 0 andalso is_pid(QPid) -> %% link isn't enough because the signal will not appear if the %% queue exits normally. Thus have to use monitor. MRef = erlang:monitor(process, QPid), State = #pstate { msg_buf = queue:new(), - buf_length = 0, target_count = Count, - fetched_count = 0, queue = Q, queue_mref = MRef }, @@ -211,39 +218,37 @@ init([Q, Count, QPid]) -> handle_call({publish, {Msg = #basic_message {}, IsDelivered, AckTag, _Remaining}}, - DiskQueue, - State = #pstate { fetched_count = Fetched, target_count = Target, - msg_buf = MsgBuf, buf_length = Length, queue = Q - }) -> + DiskQueue, State = #pstate { + target_count = Target, msg_buf = MsgBuf, queue = Q}) -> gen_server2:reply(DiskQueue, ok), - Timeout = if Fetched + 1 == Target -> hibernate; - true -> ok = rabbit_disk_queue:prefetch(Q), - infinity + Timeout = case Target of + 1 -> hibernate; + _ -> ok = rabbit_disk_queue:prefetch(Q), + infinity end, MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf), - {noreply, State #pstate { fetched_count = Fetched + 1, - buf_length = Length + 1, - msg_buf = MsgBuf1 }, Timeout}; + {noreply, State #pstate { target_count = Target - 1, msg_buf = MsgBuf1 }, + Timeout}; handle_call(publish_empty, _From, State) -> %% Very odd. This could happen if the queue is deleted or purged %% and the mixed queue fails to shut us down. {reply, ok, State, hibernate}; -handle_call(drain, _From, State = #pstate { buf_length = 0 }) -> - {stop, normal, empty, State}; -handle_call(drain, _From, State = #pstate { fetched_count = Count, - target_count = Count, - msg_buf = MsgBuf, - buf_length = Length }) -> - {stop, normal, {MsgBuf, Length, finished}, State}; -handle_call(drain, _From, State = #pstate { msg_buf = MsgBuf, - buf_length = Length }) -> - {reply, {MsgBuf, Length, continuing}, - State #pstate { msg_buf = queue:new(), buf_length = 0 }, infinity}; -handle_call(drain_and_stop, _From, State = #pstate { buf_length = 0 }) -> - {stop, normal, empty, State}; -handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf, - buf_length = Length }) -> - {stop, normal, {MsgBuf, Length}, State}; +handle_call(drain, _From, State = #pstate { target_count = 0, + msg_buf = MsgBuf }) -> + Res = case queue:is_empty(MsgBuf) of + true -> empty; + false -> {MsgBuf, finished} + end, + {stop, normal, Res, State}; +handle_call(drain, _From, State = #pstate { msg_buf = MsgBuf }) -> + {reply, {MsgBuf, continuing}, State #pstate { msg_buf = queue:new() }, + infinity}; +handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf }) -> + Res = case queue:is_empty(MsgBuf) of + true -> empty; + false -> MsgBuf + end, + {stop, normal, Res, State}; handle_call(stop, _From, State) -> {stop, normal, ok, State}. |
