summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-09 13:58:31 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-09 13:58:31 +0100
commit2b962b08311054ee40b24093879d6a653b3ae035 (patch)
tree4de635bb116049693ee8dc8d2cb242674798a5e0 /src
parent5a634b7fd8c256b619756fc11f3f27b7d7e2b434 (diff)
downloadrabbitmq-server-git-2b962b08311054ee40b24093879d6a653b3ae035.tar.gz
altered the prefetcher to drive the msg_store directly, and wired into vq
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl108
-rw-r--r--src/rabbit_queue_prefetcher.erl110
-rw-r--r--src/rabbit_variable_queue.erl38
3 files changed, 152 insertions, 104 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 707afc3807..70f8627ebd 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -39,7 +39,7 @@
-export([sync/0]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, idle_read/1]).
-define(SERVER, ?MODULE).
@@ -231,14 +231,15 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
[Dir, MsgRefDeltaGen, MsgRefDeltaGenInit],
[{timeout, infinity}]).
-write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}).
-read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity).
-contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
-remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
-release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
-sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}).
-stop() -> gen_server2:call(?SERVER, stop, infinity).
-sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
+write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}).
+read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity).
+idle_read(MsgId) -> gen_server2:pcast(?SERVER, -1, {idle_read, MsgId, self()}).
+contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
+remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
+release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
+sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}).
+stop() -> gen_server2:call(?SERVER, stop, infinity).
+sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -291,44 +292,8 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
{ok, State1 #msstate { current_file_handle = FileHdl }}.
handle_call({read, MsgId}, _From, State) ->
- case index_lookup(MsgId, State) of
- not_found -> reply(not_found, State);
- #msg_location { ref_count = RefCount,
- file = File,
- offset = Offset,
- total_size = TotalSize } ->
- case fetch_and_increment_cache(MsgId, State) of
- not_found ->
- {{ok, {MsgId, Msg}}, State1} =
- with_read_handle_at(
- File, Offset,
- fun(Hdl) ->
- Res = case rabbit_msg_file:read(
- Hdl, TotalSize) of
- {ok, {MsgId, _}} = Obj -> Obj;
- {ok, Rest} ->
- throw({error,
- {misread,
- [{old_state, State},
- {file_num, File},
- {offset, Offset},
- {read, Rest}]}})
- end,
- {Offset + TotalSize, Res}
- end, State),
- ok = if RefCount > 1 ->
- insert_into_cache(MsgId, Msg, State1);
- true -> ok
- %% it's not in the cache and we
- %% only have one reference to the
- %% message. So don't bother
- %% putting it in the cache.
- end,
- reply({ok, Msg}, State1);
- {Msg, _RefCount} ->
- reply({ok, Msg}, State)
- end
- end;
+ {Result, State1} = internal_read_message(MsgId, State),
+ reply(Result, State1);
handle_call({contains, MsgId}, _From, State) ->
reply(case index_lookup(MsgId, State) of
@@ -416,7 +381,14 @@ handle_cast({sync, MsgIds, K},
end;
handle_cast(sync, State) ->
- noreply(sync(State)).
+ noreply(sync(State));
+
+handle_cast({idle_read, MsgId, From}, State) ->
+ {Result, State1} = internal_read_message(MsgId, State),
+ rabbit_misc:with_exit_handler(
+ fun () -> ok end,
+ fun () -> rabbit_queue_prefetcher:publish(From, Result) end),
+ noreply(State1).
handle_info(timeout, State) ->
noreply(sync(State)).
@@ -549,6 +521,46 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary }) ->
no_compact
end.
+internal_read_message(MsgId, State) ->
+ case index_lookup(MsgId, State) of
+ not_found -> {not_found, State};
+ #msg_location { ref_count = RefCount,
+ file = File,
+ offset = Offset,
+ total_size = TotalSize } ->
+ case fetch_and_increment_cache(MsgId, State) of
+ not_found ->
+ {{ok, {MsgId, Msg}}, State1} =
+ with_read_handle_at(
+ File, Offset,
+ fun(Hdl) ->
+ Res = case rabbit_msg_file:read(
+ Hdl, TotalSize) of
+ {ok, {MsgId, _}} = Obj -> Obj;
+ {ok, Rest} ->
+ throw({error,
+ {misread,
+ [{old_state, State},
+ {file_num, File},
+ {offset, Offset},
+ {read, Rest}]}})
+ end,
+ {Offset + TotalSize, Res}
+ end, State),
+ ok = if RefCount > 1 ->
+ insert_into_cache(MsgId, Msg, State1);
+ true -> ok
+ %% it's not in the cache and we
+ %% only have one reference to the
+ %% message. So don't bother
+ %% putting it in the cache.
+ end,
+ {{ok, Msg}, State1};
+ {Msg, _RefCount} ->
+ {{ok, Msg}, State}
+ end
+ end.
+
%%----------------------------------------------------------------------------
%% message cache helper functions
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index 3b1c219d5a..cad4c695de 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server2).
--export([start_link/2]).
+-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -46,12 +46,27 @@
-define(DESIRED_HIBERNATE, 10000).
-record(pstate,
- { msg_buf,
- target_count,
- queue,
+ { alphas,
+ betas,
queue_mref
}).
+-record(alpha,
+ { msg,
+ seq_id,
+ is_delivered,
+ msg_on_disk,
+ index_on_disk
+ }).
+
+-record(beta,
+ { msg_id,
+ seq_id,
+ is_persistent,
+ is_delivered,
+ index_on_disk
+ }).
+
%%----------------------------------------------------------------------------
%% Novel
%%----------------------------------------------------------------------------
@@ -182,22 +197,22 @@
-ifdef(use_specs).
--spec(start_link/2 :: (queue_name(), non_neg_integer()) ->
+-spec(start_link/1 :: (queue()) ->
({'ok', pid()} | 'ignore' | {'error', any()})).
-spec(publish/2 :: (pid(), (message()| 'empty')) -> 'ok').
--spec(drain/1 :: (pid()) -> ('empty' | {queue(), ('finished' | 'continuing')})).
--spec(drain_and_stop/1 :: (pid()) -> ('empty' | queue())).
+-spec(drain/1 :: (pid()) -> ({('finished' | 'continuing' | 'empty'), queue()})).
+-spec(drain_and_stop/1 :: (pid()) -> ({('empty' | queue()), queue()})).
-spec(stop/1 :: (pid()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
-start_link(Queue, Count) ->
- gen_server2:start_link(?MODULE, [Queue, Count, self()], []).
+start_link(Betas) ->
+ false = queue:is_empty(Betas), %% ASSERTION
+ gen_server2:start_link(?MODULE, [Betas, self()], []).
-publish(Prefetcher,
- Obj = { #basic_message {}, _IsDelivered, _AckTag, _Remaining }) ->
+publish(Prefetcher, Obj = #basic_message {}) ->
gen_server2:call(Prefetcher, {publish, Obj}, infinity);
publish(Prefetcher, empty) ->
gen_server2:call(Prefetcher, publish_empty, infinity).
@@ -213,50 +228,50 @@ stop(Prefetcher) ->
%%----------------------------------------------------------------------------
-init([Q, Count, QPid]) when Count > 0 andalso is_pid(QPid) ->
+init([Betas, QPid]) when is_pid(QPid) ->
%% link isn't enough because the signal will not appear if the
%% queue exits normally. Thus have to use monitor.
MRef = erlang:monitor(process, QPid),
- State = #pstate { msg_buf = queue:new(),
- target_count = Count,
- queue = Q,
+ State = #pstate { alphas = queue:new(),
+ betas = Betas,
queue_mref = MRef
- },
- ok = rabbit_disk_queue:prefetch(Q),
- {ok, State, infinity, {backoff, ?HIBERNATE_AFTER_MIN,
- ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+ },
+ {ok, prefetch(State), infinity, {backoff, ?HIBERNATE_AFTER_MIN,
+ ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({publish,
- {Msg = #basic_message {}, IsDelivered, AckTag, _Remaining}},
- DiskQueue, State = #pstate {
- target_count = Target, msg_buf = MsgBuf, queue = Q}) ->
+handle_call({publish, Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent }},
+ DiskQueue, State = #pstate { alphas = Alphas, betas = Betas }) ->
gen_server2:reply(DiskQueue, ok),
- Timeout = case Target of
- 1 -> hibernate;
- _ -> ok = rabbit_disk_queue:prefetch(Q),
- infinity
- end,
- MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf),
- {noreply, State #pstate { target_count = Target - 1, msg_buf = MsgBuf1 },
- Timeout};
+ {{value, #beta { msg_id = MsgId, seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ index_on_disk = IndexOnDisk}}, Betas1} = queue:out(Betas),
+ Alphas1 = queue:in(#alpha { msg = Msg, seq_id = SeqId,
+ is_delivered = IsDelivered, msg_on_disk = true,
+ index_on_disk = IndexOnDisk }, Alphas),
+ State1 = State #pstate { alphas = Alphas1, betas = Betas1 },
+ {Timeout, State2} = case queue:is_empty(Betas1) of
+ true -> {hibernate, State1};
+ false -> {infinity, prefetch(State1)}
+ end,
+ {noreply, State2, Timeout};
handle_call(publish_empty, _From, State) ->
%% Very odd. This could happen if the queue is deleted or purged
%% and the mixed queue fails to shut us down.
{reply, ok, State, hibernate};
-handle_call(drain, _From, State = #pstate { target_count = 0,
- msg_buf = MsgBuf }) ->
- Res = case queue:is_empty(MsgBuf) of
- true -> empty;
- false -> {MsgBuf, finished}
- end,
- {stop, normal, Res, State};
-handle_call(drain, _From, State = #pstate { msg_buf = MsgBuf }) ->
- {reply, {MsgBuf, continuing}, State #pstate { msg_buf = queue:new() },
- infinity};
-handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf }) ->
- Res = case queue:is_empty(MsgBuf) of
- true -> empty;
- false -> MsgBuf
+handle_call(drain, _From, State = #pstate { alphas = Alphas, betas = Betas }) ->
+ case {queue:is_empty(Betas), queue:is_empty(Alphas)} of
+ {true , _ } -> {stop, normal, {finished, Alphas}, State};
+ {false, true } -> {stop, normal, {empty, Betas}, State};
+ {false, false} -> {reply, {continuing, Alphas},
+ State #pstate { alphas = queue:new() }}
+ end;
+handle_call(drain_and_stop, _From, State = #pstate { alphas = Alphas,
+ betas = Betas }) ->
+ Res = case queue:is_empty(Alphas) of
+ true -> {empty, Betas};
+ false -> {Alphas, Betas}
end,
{stop, normal, Res, State};
handle_call(stop, _From, State) ->
@@ -276,3 +291,8 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+prefetch(State = #pstate { betas = Betas }) ->
+ {{value, #beta { msg_id = MsgId }}, _Betas1} = queue:out(Betas),
+ ok = rabbit_msg_store:idle_read(MsgId),
+ State.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index cb2e9f242b..4dbcefc89f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -32,7 +32,7 @@
-module(rabbit_variable_queue).
-export([init/1, publish/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1,
- fetch/1, len/1, is_empty/1]).
+ fetch/1, len/1, is_empty/1, maybe_start_prefetcher/1]).
%%----------------------------------------------------------------------------
@@ -171,19 +171,21 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate,
out_counter = 0 }.
fetch(State =
- #vqstate { q4 = Q4,
+ #vqstate { q3 = Q3, q4 = Q4,
out_counter = OutCount, prefetcher = Prefetcher,
index_state = IndexState, len = Len }) ->
case queue:out(Q4) of
{empty, _Q4} when Prefetcher == undefined ->
fetch_from_q3_or_gamma(State);
{empty, _Q4} ->
- Q4a =
- case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of
- empty -> Q4;
- Q4b -> Q4b
+ {Q3a, Q4a, Prefetcher1} =
+ case rabbit_queue_prefetcher:drain(Prefetcher) of
+ {empty, Betas} -> {queue:join(Betas, Q3), Q4, undefined};
+ {finished, Alphas} -> {Q3, Alphas, undefined};
+ {continuing, Alphas} -> {Q3, Alphas, Prefetcher}
end,
- fetch(State #vqstate { q4 = Q4a, prefetcher = undefined });
+ fetch(State #vqstate { q3 = Q3a, q4 = Q4a,
+ prefetcher = Prefetcher1 });
{{value,
#alpha { msg = Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
@@ -233,6 +235,24 @@ len(#vqstate { len = Len }) ->
is_empty(State) ->
0 == len(State).
+maybe_start_prefetcher(State = #vqstate { ram_msg_count = RamMsgCount,
+ target_ram_msg_count = TargetRamMsgCount,
+ q3 = Q3, prefetcher = undefined
+ }) ->
+ PrefetchCount = erlang:min(queue:len(Q3), TargetRamMsgCount - RamMsgCount),
+ if PrefetchCount =< 0 -> State;
+ true ->
+ {PrefetchQueue, Q3a} = queue:split(PrefetchCount, Q3),
+ {ok, Prefetcher} =
+ rabbit_queue_prefetcher:start_link(PrefetchQueue),
+ RamMsgCount1 = RamMsgCount + PrefetchCount,
+ maybe_load_next_segment(State #vqstate { q3 = Q3a,
+ ram_msg_count = RamMsgCount1,
+ prefetcher = Prefetcher })
+ end;
+maybe_start_prefetcher(State) ->
+ State.
+
%%----------------------------------------------------------------------------
publish(msg, Msg = #basic_message { guid = MsgId,
@@ -361,10 +381,6 @@ read_index_segment(SeqId, IndexState) ->
{List, IndexState1} -> {List, IndexState1, SeqId1}
end.
-maybe_start_prefetcher(State) ->
- %% TODO
- State.
-
reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount })
when TargetRamMsgCount >= RamMsgCount ->