summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-09 17:01:13 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-09 17:01:13 +0100
commitd397c015d90be9f08054259b7b87ffb6184c77a8 (patch)
treec44da777a5ecc470a7cd402470627e98b6b2a607 /src
parentd4490aaa6a3944b0cd0b852a9a8fe1237c56d0a9 (diff)
downloadrabbitmq-server-git-d397c015d90be9f08054259b7b87ffb6184c77a8.tar.gz
cosmetics
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl26
-rw-r--r--src/rabbit_queue_prefetcher.erl19
-rw-r--r--src/rabbit_variable_queue.erl9
3 files changed, 31 insertions, 23 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index db2735513b..2b4bb1f2df 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server2).
--export([start_link/3, write/2, read/1, idle_read/2, contains/1, remove/1,
+-export([start_link/3, write/2, read/1, peruse/2, contains/1, remove/1,
release/1, sync/2, stop/0]).
-export([sync/0]). %% internal
@@ -61,7 +61,7 @@
{'ok', pid()} | 'ignore' | {'error', any()}).
-spec(write/2 :: (msg_id(), msg()) -> 'ok').
-spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found').
--spec(idle_read/2 :: (msg_id(), fun (({'ok', msg()} | 'not_found') -> 'ok')) ->
+-spec(peruse/2 :: (msg_id(), fun (({'ok', msg()} | 'not_found') -> 'ok')) ->
'ok').
-spec(contains/1 :: (msg_id()) -> boolean()).
-spec(remove/1 :: ([msg_id()]) -> 'ok').
@@ -233,15 +233,15 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
[Dir, MsgRefDeltaGen, MsgRefDeltaGenInit],
[{timeout, infinity}]).
-write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}).
-read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity).
-idle_read(MsgId, Fun) -> gen_server2:pcast(?SERVER, -1, {idle_read, MsgId, Fun}).
-contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
-remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
-release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
-sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}).
-stop() -> gen_server2:call(?SERVER, stop, infinity).
-sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
+write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}).
+read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity).
+peruse(MsgId, Fun) -> gen_server2:pcast(?SERVER, -1, {peruse, MsgId, Fun}).
+contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
+remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
+release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
+sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}).
+stop() -> gen_server2:call(?SERVER, stop, infinity).
+sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -344,9 +344,9 @@ handle_cast({write, MsgId, Msg},
noreply(State)
end;
-handle_cast({idle_read, MsgId, Fun}, State) ->
+handle_cast({peruse, MsgId, Fun}, State) ->
{Result, State1} = internal_read_message(MsgId, State),
- rabbit_misc:with_exit_handler(fun () -> ok end, fun () -> Fun(Result) end),
+ Fun(Result),
noreply(State1);
handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) ->
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index e3228beacd..fd407c9dd1 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -49,7 +49,7 @@
{ alphas,
betas,
queue_mref,
- idle_read_cb
+ peruse_cb
}).
-record(alpha,
@@ -234,12 +234,19 @@ init([Betas, QPid]) when is_pid(QPid) ->
%% queue exits normally. Thus have to use monitor.
MRef = erlang:monitor(process, QPid),
Self = self(),
+ CB = fun (Result) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> ok end,
+ fun () -> case Result of
+ {ok, Msg} -> publish(Self, Msg);
+ not_found -> publish(Self, not_found)
+ end
+ end)
+ end,
State = #pstate { alphas = queue:new(),
betas = Betas,
queue_mref = MRef,
- idle_read_cb = fun ({ok, Msg}) -> publish(Self, Msg);
- (not_found) -> publish(Self, not_found)
- end
+ peruse_cb = CB
},
{ok, prefetch(State), infinity, {backoff, ?HIBERNATE_AFTER_MIN,
?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -297,7 +304,7 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-prefetch(State = #pstate { betas = Betas, idle_read_cb = CB }) ->
+prefetch(State = #pstate { betas = Betas, peruse_cb = CB }) ->
{{value, #beta { msg_id = MsgId }}, _Betas1} = queue:out(Betas),
- ok = rabbit_msg_store:idle_read(MsgId, CB),
+ ok = rabbit_msg_store:peruse(MsgId, CB),
State.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 5cf0893973..ae9ca375d9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -228,10 +228,11 @@ len(#vqstate { len = Len }) ->
is_empty(State) ->
0 == len(State).
-maybe_start_prefetcher(State = #vqstate { ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount,
- q1 = Q1, q3 = Q3, prefetcher = undefined
- }) ->
+maybe_start_prefetcher(State = #vqstate {
+ ram_msg_count = RamMsgCount,
+ target_ram_msg_count = TargetRamMsgCount,
+ q1 = Q1, q3 = Q3, prefetcher = undefined
+ }) ->
%% prefetched content takes priority over q1
AvailableSpace = (TargetRamMsgCount - RamMsgCount) + queue:len(Q1),
PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]),