diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-09 13:58:31 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-09 13:58:31 +0100 |
| commit | 2b962b08311054ee40b24093879d6a653b3ae035 (patch) | |
| tree | 4de635bb116049693ee8dc8d2cb242674798a5e0 /src | |
| parent | 5a634b7fd8c256b619756fc11f3f27b7d7e2b434 (diff) | |
| download | rabbitmq-server-git-2b962b08311054ee40b24093879d6a653b3ae035.tar.gz | |
altered the prefetcher to drive the msg_store directly, and wired into vq
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 108 | ||||
| -rw-r--r-- | src/rabbit_queue_prefetcher.erl | 110 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 38 |
3 files changed, 152 insertions, 104 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 707afc3807..70f8627ebd 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -39,7 +39,7 @@ -export([sync/0]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, idle_read/1]). -define(SERVER, ?MODULE). @@ -231,14 +231,15 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> [Dir, MsgRefDeltaGen, MsgRefDeltaGenInit], [{timeout, infinity}]). -write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}). -read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity). -contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). -remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). -release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). -sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). -stop() -> gen_server2:call(?SERVER, stop, infinity). -sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal +write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}). +read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity). +idle_read(MsgId) -> gen_server2:pcast(?SERVER, -1, {idle_read, MsgId, self()}). +contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). +remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). +release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). +sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). +stop() -> gen_server2:call(?SERVER, stop, infinity). +sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -291,44 +292,8 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> {ok, State1 #msstate { current_file_handle = FileHdl }}. handle_call({read, MsgId}, _From, State) -> - case index_lookup(MsgId, State) of - not_found -> reply(not_found, State); - #msg_location { ref_count = RefCount, - file = File, - offset = Offset, - total_size = TotalSize } -> - case fetch_and_increment_cache(MsgId, State) of - not_found -> - {{ok, {MsgId, Msg}}, State1} = - with_read_handle_at( - File, Offset, - fun(Hdl) -> - Res = case rabbit_msg_file:read( - Hdl, TotalSize) of - {ok, {MsgId, _}} = Obj -> Obj; - {ok, Rest} -> - throw({error, - {misread, - [{old_state, State}, - {file_num, File}, - {offset, Offset}, - {read, Rest}]}}) - end, - {Offset + TotalSize, Res} - end, State), - ok = if RefCount > 1 -> - insert_into_cache(MsgId, Msg, State1); - true -> ok - %% it's not in the cache and we - %% only have one reference to the - %% message. So don't bother - %% putting it in the cache. - end, - reply({ok, Msg}, State1); - {Msg, _RefCount} -> - reply({ok, Msg}, State) - end - end; + {Result, State1} = internal_read_message(MsgId, State), + reply(Result, State1); handle_call({contains, MsgId}, _From, State) -> reply(case index_lookup(MsgId, State) of @@ -416,7 +381,14 @@ handle_cast({sync, MsgIds, K}, end; handle_cast(sync, State) -> - noreply(sync(State)). + noreply(sync(State)); + +handle_cast({idle_read, MsgId, From}, State) -> + {Result, State1} = internal_read_message(MsgId, State), + rabbit_misc:with_exit_handler( + fun () -> ok end, + fun () -> rabbit_queue_prefetcher:publish(From, Result) end), + noreply(State1). handle_info(timeout, State) -> noreply(sync(State)). @@ -549,6 +521,46 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary }) -> no_compact end. +internal_read_message(MsgId, State) -> + case index_lookup(MsgId, State) of + not_found -> {not_found, State}; + #msg_location { ref_count = RefCount, + file = File, + offset = Offset, + total_size = TotalSize } -> + case fetch_and_increment_cache(MsgId, State) of + not_found -> + {{ok, {MsgId, Msg}}, State1} = + with_read_handle_at( + File, Offset, + fun(Hdl) -> + Res = case rabbit_msg_file:read( + Hdl, TotalSize) of + {ok, {MsgId, _}} = Obj -> Obj; + {ok, Rest} -> + throw({error, + {misread, + [{old_state, State}, + {file_num, File}, + {offset, Offset}, + {read, Rest}]}}) + end, + {Offset + TotalSize, Res} + end, State), + ok = if RefCount > 1 -> + insert_into_cache(MsgId, Msg, State1); + true -> ok + %% it's not in the cache and we + %% only have one reference to the + %% message. So don't bother + %% putting it in the cache. + end, + {{ok, Msg}, State1}; + {Msg, _RefCount} -> + {{ok, Msg}, State} + end + end. + %%---------------------------------------------------------------------------- %% message cache helper functions %%---------------------------------------------------------------------------- diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index 3b1c219d5a..cad4c695de 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/2]). +-export([start_link/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -46,12 +46,27 @@ -define(DESIRED_HIBERNATE, 10000). -record(pstate, - { msg_buf, - target_count, - queue, + { alphas, + betas, queue_mref }). +-record(alpha, + { msg, + seq_id, + is_delivered, + msg_on_disk, + index_on_disk + }). + +-record(beta, + { msg_id, + seq_id, + is_persistent, + is_delivered, + index_on_disk + }). + %%---------------------------------------------------------------------------- %% Novel %%---------------------------------------------------------------------------- @@ -182,22 +197,22 @@ -ifdef(use_specs). --spec(start_link/2 :: (queue_name(), non_neg_integer()) -> +-spec(start_link/1 :: (queue()) -> ({'ok', pid()} | 'ignore' | {'error', any()})). -spec(publish/2 :: (pid(), (message()| 'empty')) -> 'ok'). --spec(drain/1 :: (pid()) -> ('empty' | {queue(), ('finished' | 'continuing')})). --spec(drain_and_stop/1 :: (pid()) -> ('empty' | queue())). +-spec(drain/1 :: (pid()) -> ({('finished' | 'continuing' | 'empty'), queue()})). +-spec(drain_and_stop/1 :: (pid()) -> ({('empty' | queue()), queue()})). -spec(stop/1 :: (pid()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start_link(Queue, Count) -> - gen_server2:start_link(?MODULE, [Queue, Count, self()], []). +start_link(Betas) -> + false = queue:is_empty(Betas), %% ASSERTION + gen_server2:start_link(?MODULE, [Betas, self()], []). -publish(Prefetcher, - Obj = { #basic_message {}, _IsDelivered, _AckTag, _Remaining }) -> +publish(Prefetcher, Obj = #basic_message {}) -> gen_server2:call(Prefetcher, {publish, Obj}, infinity); publish(Prefetcher, empty) -> gen_server2:call(Prefetcher, publish_empty, infinity). @@ -213,50 +228,50 @@ stop(Prefetcher) -> %%---------------------------------------------------------------------------- -init([Q, Count, QPid]) when Count > 0 andalso is_pid(QPid) -> +init([Betas, QPid]) when is_pid(QPid) -> %% link isn't enough because the signal will not appear if the %% queue exits normally. Thus have to use monitor. MRef = erlang:monitor(process, QPid), - State = #pstate { msg_buf = queue:new(), - target_count = Count, - queue = Q, + State = #pstate { alphas = queue:new(), + betas = Betas, queue_mref = MRef - }, - ok = rabbit_disk_queue:prefetch(Q), - {ok, State, infinity, {backoff, ?HIBERNATE_AFTER_MIN, - ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + }, + {ok, prefetch(State), infinity, {backoff, ?HIBERNATE_AFTER_MIN, + ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({publish, - {Msg = #basic_message {}, IsDelivered, AckTag, _Remaining}}, - DiskQueue, State = #pstate { - target_count = Target, msg_buf = MsgBuf, queue = Q}) -> +handle_call({publish, Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent }}, + DiskQueue, State = #pstate { alphas = Alphas, betas = Betas }) -> gen_server2:reply(DiskQueue, ok), - Timeout = case Target of - 1 -> hibernate; - _ -> ok = rabbit_disk_queue:prefetch(Q), - infinity - end, - MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf), - {noreply, State #pstate { target_count = Target - 1, msg_buf = MsgBuf1 }, - Timeout}; + {{value, #beta { msg_id = MsgId, seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + index_on_disk = IndexOnDisk}}, Betas1} = queue:out(Betas), + Alphas1 = queue:in(#alpha { msg = Msg, seq_id = SeqId, + is_delivered = IsDelivered, msg_on_disk = true, + index_on_disk = IndexOnDisk }, Alphas), + State1 = State #pstate { alphas = Alphas1, betas = Betas1 }, + {Timeout, State2} = case queue:is_empty(Betas1) of + true -> {hibernate, State1}; + false -> {infinity, prefetch(State1)} + end, + {noreply, State2, Timeout}; handle_call(publish_empty, _From, State) -> %% Very odd. This could happen if the queue is deleted or purged %% and the mixed queue fails to shut us down. {reply, ok, State, hibernate}; -handle_call(drain, _From, State = #pstate { target_count = 0, - msg_buf = MsgBuf }) -> - Res = case queue:is_empty(MsgBuf) of - true -> empty; - false -> {MsgBuf, finished} - end, - {stop, normal, Res, State}; -handle_call(drain, _From, State = #pstate { msg_buf = MsgBuf }) -> - {reply, {MsgBuf, continuing}, State #pstate { msg_buf = queue:new() }, - infinity}; -handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf }) -> - Res = case queue:is_empty(MsgBuf) of - true -> empty; - false -> MsgBuf +handle_call(drain, _From, State = #pstate { alphas = Alphas, betas = Betas }) -> + case {queue:is_empty(Betas), queue:is_empty(Alphas)} of + {true , _ } -> {stop, normal, {finished, Alphas}, State}; + {false, true } -> {stop, normal, {empty, Betas}, State}; + {false, false} -> {reply, {continuing, Alphas}, + State #pstate { alphas = queue:new() }} + end; +handle_call(drain_and_stop, _From, State = #pstate { alphas = Alphas, + betas = Betas }) -> + Res = case queue:is_empty(Alphas) of + true -> {empty, Betas}; + false -> {Alphas, Betas} end, {stop, normal, Res, State}; handle_call(stop, _From, State) -> @@ -276,3 +291,8 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +prefetch(State = #pstate { betas = Betas }) -> + {{value, #beta { msg_id = MsgId }}, _Betas1} = queue:out(Betas), + ok = rabbit_msg_store:idle_read(MsgId), + State. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cb2e9f242b..4dbcefc89f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,7 +32,7 @@ -module(rabbit_variable_queue). -export([init/1, publish/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1, - fetch/1, len/1, is_empty/1]). + fetch/1, len/1, is_empty/1, maybe_start_prefetcher/1]). %%---------------------------------------------------------------------------- @@ -171,19 +171,21 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate, out_counter = 0 }. fetch(State = - #vqstate { q4 = Q4, + #vqstate { q3 = Q3, q4 = Q4, out_counter = OutCount, prefetcher = Prefetcher, index_state = IndexState, len = Len }) -> case queue:out(Q4) of {empty, _Q4} when Prefetcher == undefined -> fetch_from_q3_or_gamma(State); {empty, _Q4} -> - Q4a = - case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of - empty -> Q4; - Q4b -> Q4b + {Q3a, Q4a, Prefetcher1} = + case rabbit_queue_prefetcher:drain(Prefetcher) of + {empty, Betas} -> {queue:join(Betas, Q3), Q4, undefined}; + {finished, Alphas} -> {Q3, Alphas, undefined}; + {continuing, Alphas} -> {Q3, Alphas, Prefetcher} end, - fetch(State #vqstate { q4 = Q4a, prefetcher = undefined }); + fetch(State #vqstate { q3 = Q3a, q4 = Q4a, + prefetcher = Prefetcher1 }); {{value, #alpha { msg = Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, @@ -233,6 +235,24 @@ len(#vqstate { len = Len }) -> is_empty(State) -> 0 == len(State). +maybe_start_prefetcher(State = #vqstate { ram_msg_count = RamMsgCount, + target_ram_msg_count = TargetRamMsgCount, + q3 = Q3, prefetcher = undefined + }) -> + PrefetchCount = erlang:min(queue:len(Q3), TargetRamMsgCount - RamMsgCount), + if PrefetchCount =< 0 -> State; + true -> + {PrefetchQueue, Q3a} = queue:split(PrefetchCount, Q3), + {ok, Prefetcher} = + rabbit_queue_prefetcher:start_link(PrefetchQueue), + RamMsgCount1 = RamMsgCount + PrefetchCount, + maybe_load_next_segment(State #vqstate { q3 = Q3a, + ram_msg_count = RamMsgCount1, + prefetcher = Prefetcher }) + end; +maybe_start_prefetcher(State) -> + State. + %%---------------------------------------------------------------------------- publish(msg, Msg = #basic_message { guid = MsgId, @@ -361,10 +381,6 @@ read_index_segment(SeqId, IndexState) -> {List, IndexState1} -> {List, IndexState1, SeqId1} end. -maybe_start_prefetcher(State) -> - %% TODO - State. - reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount, target_ram_msg_count = TargetRamMsgCount }) when TargetRamMsgCount >= RamMsgCount -> |
