diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-09 17:01:13 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-09 17:01:13 +0100 |
| commit | d397c015d90be9f08054259b7b87ffb6184c77a8 (patch) | |
| tree | c44da777a5ecc470a7cd402470627e98b6b2a607 /src | |
| parent | d4490aaa6a3944b0cd0b852a9a8fe1237c56d0a9 (diff) | |
| download | rabbitmq-server-git-d397c015d90be9f08054259b7b87ffb6184c77a8.tar.gz | |
cosmetics
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_queue_prefetcher.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 9 |
3 files changed, 31 insertions, 23 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index db2735513b..2b4bb1f2df 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/3, write/2, read/1, idle_read/2, contains/1, remove/1, +-export([start_link/3, write/2, read/1, peruse/2, contains/1, remove/1, release/1, sync/2, stop/0]). -export([sync/0]). %% internal @@ -61,7 +61,7 @@ {'ok', pid()} | 'ignore' | {'error', any()}). -spec(write/2 :: (msg_id(), msg()) -> 'ok'). -spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found'). --spec(idle_read/2 :: (msg_id(), fun (({'ok', msg()} | 'not_found') -> 'ok')) -> +-spec(peruse/2 :: (msg_id(), fun (({'ok', msg()} | 'not_found') -> 'ok')) -> 'ok'). -spec(contains/1 :: (msg_id()) -> boolean()). -spec(remove/1 :: ([msg_id()]) -> 'ok'). @@ -233,15 +233,15 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> [Dir, MsgRefDeltaGen, MsgRefDeltaGenInit], [{timeout, infinity}]). -write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}). -read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity). -idle_read(MsgId, Fun) -> gen_server2:pcast(?SERVER, -1, {idle_read, MsgId, Fun}). -contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). -remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). -release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). -sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). -stop() -> gen_server2:call(?SERVER, stop, infinity). -sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal +write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}). +read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity). +peruse(MsgId, Fun) -> gen_server2:pcast(?SERVER, -1, {peruse, MsgId, Fun}). +contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). +remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). +release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). +sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). +stop() -> gen_server2:call(?SERVER, stop, infinity). +sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -344,9 +344,9 @@ handle_cast({write, MsgId, Msg}, noreply(State) end; -handle_cast({idle_read, MsgId, Fun}, State) -> +handle_cast({peruse, MsgId, Fun}, State) -> {Result, State1} = internal_read_message(MsgId, State), - rabbit_misc:with_exit_handler(fun () -> ok end, fun () -> Fun(Result) end), + Fun(Result), noreply(State1); handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) -> diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index e3228beacd..fd407c9dd1 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -49,7 +49,7 @@ { alphas, betas, queue_mref, - idle_read_cb + peruse_cb }). -record(alpha, @@ -234,12 +234,19 @@ init([Betas, QPid]) when is_pid(QPid) -> %% queue exits normally. Thus have to use monitor. MRef = erlang:monitor(process, QPid), Self = self(), + CB = fun (Result) -> + rabbit_misc:with_exit_handler( + fun () -> ok end, + fun () -> case Result of + {ok, Msg} -> publish(Self, Msg); + not_found -> publish(Self, not_found) + end + end) + end, State = #pstate { alphas = queue:new(), betas = Betas, queue_mref = MRef, - idle_read_cb = fun ({ok, Msg}) -> publish(Self, Msg); - (not_found) -> publish(Self, not_found) - end + peruse_cb = CB }, {ok, prefetch(State), infinity, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -297,7 +304,7 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -prefetch(State = #pstate { betas = Betas, idle_read_cb = CB }) -> +prefetch(State = #pstate { betas = Betas, peruse_cb = CB }) -> {{value, #beta { msg_id = MsgId }}, _Betas1} = queue:out(Betas), - ok = rabbit_msg_store:idle_read(MsgId, CB), + ok = rabbit_msg_store:peruse(MsgId, CB), State. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 5cf0893973..ae9ca375d9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -228,10 +228,11 @@ len(#vqstate { len = Len }) -> is_empty(State) -> 0 == len(State). -maybe_start_prefetcher(State = #vqstate { ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount, - q1 = Q1, q3 = Q3, prefetcher = undefined - }) -> +maybe_start_prefetcher(State = #vqstate { + ram_msg_count = RamMsgCount, + target_ram_msg_count = TargetRamMsgCount, + q1 = Q1, q3 = Q3, prefetcher = undefined + }) -> %% prefetched content takes priority over q1 AvailableSpace = (TargetRamMsgCount - RamMsgCount) + queue:len(Q1), PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]), |
