summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-09 16:14:27 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-09 16:14:27 +0100
commit569579cadf4db05fd75f980f142356d5fc0d2774 (patch)
tree967cb816d9bf2fbc29ce027b82c964d611ce5468 /src
parent1031b829a0e48910c4048beb41d95f339c4ce880 (diff)
downloadrabbitmq-server-git-569579cadf4db05fd75f980f142356d5fc0d2774.tar.gz
tidying of the beloved msg_store
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl43
-rw-r--r--src/rabbit_queue_prefetcher.erl13
2 files changed, 29 insertions, 27 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 6e28faa090..db2735513b 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -33,13 +33,13 @@
-behaviour(gen_server2).
--export([start_link/3, write/2, read/1, contains/1, remove/1, release/1,
- sync/2, stop/0]).
+-export([start_link/3, write/2, read/1, idle_read/2, contains/1, remove/1,
+ release/1, sync/2, stop/0]).
-export([sync/0]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, idle_read/1]).
+ terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
@@ -61,6 +61,8 @@
{'ok', pid()} | 'ignore' | {'error', any()}).
-spec(write/2 :: (msg_id(), msg()) -> 'ok').
-spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found').
+-spec(idle_read/2 :: (msg_id(), fun (({'ok', msg()} | 'not_found') -> 'ok')) ->
+ 'ok').
-spec(contains/1 :: (msg_id()) -> boolean()).
-spec(remove/1 :: ([msg_id()]) -> 'ok').
-spec(release/1 :: ([msg_id()]) -> 'ok').
@@ -231,15 +233,15 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
[Dir, MsgRefDeltaGen, MsgRefDeltaGenInit],
[{timeout, infinity}]).
-write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}).
-read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity).
-idle_read(MsgId) -> gen_server2:pcast(?SERVER, -1, {idle_read, MsgId, self()}).
-contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
-remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
-release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
-sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}).
-stop() -> gen_server2:call(?SERVER, stop, infinity).
-sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
+write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}).
+read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity).
+idle_read(MsgId, Fun) -> gen_server2:pcast(?SERVER, -1, {idle_read, MsgId, Fun}).
+contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
+remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
+release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
+sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}).
+stop() -> gen_server2:call(?SERVER, stop, infinity).
+sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -342,6 +344,11 @@ handle_cast({write, MsgId, Msg},
noreply(State)
end;
+handle_cast({idle_read, MsgId, Fun}, State) ->
+ {Result, State1} = internal_read_message(MsgId, State),
+ rabbit_misc:with_exit_handler(fun () -> ok end, fun () -> Fun(Result) end),
+ noreply(State1);
+
handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) ->
noreply(
compact(sets:to_list(
@@ -381,17 +388,7 @@ handle_cast({sync, MsgIds, K},
end;
handle_cast(sync, State) ->
- noreply(sync(State));
-
-handle_cast({idle_read, MsgId, From}, State) ->
- {Result, State1} = case internal_read_message(MsgId, State) of
- {not_found, _} = Res -> Res;
- {{ok, Msg}, State2} -> {Msg, State2}
- end,
- rabbit_misc:with_exit_handler(
- fun () -> ok end,
- fun () -> rabbit_queue_prefetcher:publish(From, Result) end),
- noreply(State1).
+ noreply(sync(State)).
handle_info(timeout, State) ->
noreply(sync(State)).
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index 9d1b58bad2..e3228beacd 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -48,7 +48,8 @@
-record(pstate,
{ alphas,
betas,
- queue_mref
+ queue_mref,
+ idle_read_cb
}).
-record(alpha,
@@ -232,9 +233,13 @@ init([Betas, QPid]) when is_pid(QPid) ->
%% link isn't enough because the signal will not appear if the
%% queue exits normally. Thus have to use monitor.
MRef = erlang:monitor(process, QPid),
+ Self = self(),
State = #pstate { alphas = queue:new(),
betas = Betas,
- queue_mref = MRef
+ queue_mref = MRef,
+ idle_read_cb = fun ({ok, Msg}) -> publish(Self, Msg);
+ (not_found) -> publish(Self, not_found)
+ end
},
{ok, prefetch(State), infinity, {backoff, ?HIBERNATE_AFTER_MIN,
?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -292,7 +297,7 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-prefetch(State = #pstate { betas = Betas }) ->
+prefetch(State = #pstate { betas = Betas, idle_read_cb = CB }) ->
{{value, #beta { msg_id = MsgId }}, _Betas1} = queue:out(Betas),
- ok = rabbit_msg_store:idle_read(MsgId),
+ ok = rabbit_msg_store:idle_read(MsgId, CB),
State.