diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-09 16:14:27 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-09 16:14:27 +0100 |
| commit | 569579cadf4db05fd75f980f142356d5fc0d2774 (patch) | |
| tree | 967cb816d9bf2fbc29ce027b82c964d611ce5468 /src | |
| parent | 1031b829a0e48910c4048beb41d95f339c4ce880 (diff) | |
| download | rabbitmq-server-git-569579cadf4db05fd75f980f142356d5fc0d2774.tar.gz | |
tidying of the beloved msg_store
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_queue_prefetcher.erl | 13 |
2 files changed, 29 insertions, 27 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 6e28faa090..db2735513b 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -33,13 +33,13 @@ -behaviour(gen_server2). --export([start_link/3, write/2, read/1, contains/1, remove/1, release/1, - sync/2, stop/0]). +-export([start_link/3, write/2, read/1, idle_read/2, contains/1, remove/1, + release/1, sync/2, stop/0]). -export([sync/0]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, idle_read/1]). + terminate/2, code_change/3]). -define(SERVER, ?MODULE). @@ -61,6 +61,8 @@ {'ok', pid()} | 'ignore' | {'error', any()}). -spec(write/2 :: (msg_id(), msg()) -> 'ok'). -spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found'). +-spec(idle_read/2 :: (msg_id(), fun (({'ok', msg()} | 'not_found') -> 'ok')) -> + 'ok'). -spec(contains/1 :: (msg_id()) -> boolean()). -spec(remove/1 :: ([msg_id()]) -> 'ok'). -spec(release/1 :: ([msg_id()]) -> 'ok'). @@ -231,15 +233,15 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> [Dir, MsgRefDeltaGen, MsgRefDeltaGenInit], [{timeout, infinity}]). -write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}). -read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity). -idle_read(MsgId) -> gen_server2:pcast(?SERVER, -1, {idle_read, MsgId, self()}). -contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). -remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). -release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). -sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). -stop() -> gen_server2:call(?SERVER, stop, infinity). -sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal +write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}). +read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity). +idle_read(MsgId, Fun) -> gen_server2:pcast(?SERVER, -1, {idle_read, MsgId, Fun}). +contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). +remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). +release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). +sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). +stop() -> gen_server2:call(?SERVER, stop, infinity). +sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -342,6 +344,11 @@ handle_cast({write, MsgId, Msg}, noreply(State) end; +handle_cast({idle_read, MsgId, Fun}, State) -> + {Result, State1} = internal_read_message(MsgId, State), + rabbit_misc:with_exit_handler(fun () -> ok end, fun () -> Fun(Result) end), + noreply(State1); + handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) -> noreply( compact(sets:to_list( @@ -381,17 +388,7 @@ handle_cast({sync, MsgIds, K}, end; handle_cast(sync, State) -> - noreply(sync(State)); - -handle_cast({idle_read, MsgId, From}, State) -> - {Result, State1} = case internal_read_message(MsgId, State) of - {not_found, _} = Res -> Res; - {{ok, Msg}, State2} -> {Msg, State2} - end, - rabbit_misc:with_exit_handler( - fun () -> ok end, - fun () -> rabbit_queue_prefetcher:publish(From, Result) end), - noreply(State1). + noreply(sync(State)). handle_info(timeout, State) -> noreply(sync(State)). diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index 9d1b58bad2..e3228beacd 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -48,7 +48,8 @@ -record(pstate, { alphas, betas, - queue_mref + queue_mref, + idle_read_cb }). -record(alpha, @@ -232,9 +233,13 @@ init([Betas, QPid]) when is_pid(QPid) -> %% link isn't enough because the signal will not appear if the %% queue exits normally. Thus have to use monitor. MRef = erlang:monitor(process, QPid), + Self = self(), State = #pstate { alphas = queue:new(), betas = Betas, - queue_mref = MRef + queue_mref = MRef, + idle_read_cb = fun ({ok, Msg}) -> publish(Self, Msg); + (not_found) -> publish(Self, not_found) + end }, {ok, prefetch(State), infinity, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -292,7 +297,7 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -prefetch(State = #pstate { betas = Betas }) -> +prefetch(State = #pstate { betas = Betas, idle_read_cb = CB }) -> {{value, #beta { msg_id = MsgId }}, _Betas1} = queue:out(Betas), - ok = rabbit_msg_store:idle_read(MsgId), + ok = rabbit_msg_store:idle_read(MsgId, CB), State. |
