summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-09 15:43:22 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-09 15:43:22 +0100
commit1031b829a0e48910c4048beb41d95f339c4ce880 (patch)
treeafe61c72221bab7d1d049dbf8ebe30654d13e75a /src
parenta9149a4a4ab9e99de7d2571b74647fb625d9a6bf (diff)
downloadrabbitmq-server-git-1031b829a0e48910c4048beb41d95f339c4ce880.tar.gz
Prefetcher has priority over q1.
However, we actually assume that the prefetcher does no work. The only other possibility is to assume that the prefetcher always completes, which can lead to q1 being pointlessly evicted to disk. Also, we stop the prefetcher as soon as we have to reduce our memory usage, and at that point everything should work out. So, when starting the prefetcher, we don't adjust the ram_msg_count, but as we drain or stop the prefetcher, we include the prefetched alphas in the ram_msg_count and then evict q1 to disk as necessary. Whilst this means we will have more msgs in RAM than we claim, the fact that we stop the prefetcher as soon as we have to reduce our memory usage saves us from ruin.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl5
-rw-r--r--src/rabbit_queue_prefetcher.erl4
-rw-r--r--src/rabbit_variable_queue.erl51
3 files changed, 43 insertions, 17 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b0b75249c1..6e28faa090 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -384,7 +384,10 @@ handle_cast(sync, State) ->
noreply(sync(State));
handle_cast({idle_read, MsgId, From}, State) ->
- {Result, State1} = internal_read_message(MsgId, State),
+ {Result, State1} = case internal_read_message(MsgId, State) of
+ {not_found, _} = Res -> Res;
+ {{ok, Msg}, State2} -> {Msg, State2}
+ end,
rabbit_misc:with_exit_handler(
fun () -> ok end,
fun () -> rabbit_queue_prefetcher:publish(From, Result) end),
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index cad4c695de..9d1b58bad2 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -199,7 +199,7 @@
-spec(start_link/1 :: (queue()) ->
({'ok', pid()} | 'ignore' | {'error', any()})).
--spec(publish/2 :: (pid(), (message()| 'empty')) -> 'ok').
+-spec(publish/2 :: (pid(), (message()| 'not_found')) -> 'ok').
-spec(drain/1 :: (pid()) -> ({('finished' | 'continuing' | 'empty'), queue()})).
-spec(drain_and_stop/1 :: (pid()) -> ({('empty' | queue()), queue()})).
-spec(stop/1 :: (pid()) -> 'ok').
@@ -214,7 +214,7 @@ start_link(Betas) ->
publish(Prefetcher, Obj = #basic_message {}) ->
gen_server2:call(Prefetcher, {publish, Obj}, infinity);
-publish(Prefetcher, empty) ->
+publish(Prefetcher, not_found) ->
gen_server2:call(Prefetcher, publish_empty, infinity).
drain(Prefetcher) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c197f6b028..5a76c23e7a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -171,21 +171,14 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate,
out_counter = 0 }.
fetch(State =
- #vqstate { q3 = Q3, q4 = Q4,
+ #vqstate { q4 = Q4,
out_counter = OutCount, prefetcher = Prefetcher,
index_state = IndexState, len = Len }) ->
case queue:out(Q4) of
{empty, _Q4} when Prefetcher == undefined ->
fetch_from_q3_or_gamma(State);
{empty, _Q4} ->
- {Q3a, Q4a, Prefetcher1} =
- case rabbit_queue_prefetcher:drain(Prefetcher) of
- {empty, Betas} -> {queue:join(Betas, Q3), Q4, undefined};
- {finished, Alphas} -> {Q3, Alphas, undefined};
- {continuing, Alphas} -> {Q3, Alphas, Prefetcher}
- end,
- fetch(State #vqstate { q3 = Q3a, q4 = Q4a,
- prefetcher = Prefetcher1 });
+ fetch(drain_prefetcher(drain, State));
{{value,
#alpha { msg = Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
@@ -237,17 +230,17 @@ is_empty(State) ->
maybe_start_prefetcher(State = #vqstate { ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount,
- q3 = Q3, prefetcher = undefined
+ q1 = Q1, q3 = Q3, prefetcher = undefined
}) ->
- PrefetchCount = lists:min([queue:len(Q3), TargetRamMsgCount - RamMsgCount]),
+ %% prefetched content takes priority over q1
+ AvailableSpace = (TargetRamMsgCount - RamMsgCount) + queue:len(Q1),
+ PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]),
if PrefetchCount =< 0 -> State;
true ->
{PrefetchQueue, Q3a} = queue:split(PrefetchCount, Q3),
{ok, Prefetcher} =
rabbit_queue_prefetcher:start_link(PrefetchQueue),
- RamMsgCount1 = RamMsgCount + PrefetchCount,
maybe_load_next_segment(State #vqstate { q3 = Q3a,
- ram_msg_count = RamMsgCount1,
prefetcher = Prefetcher })
end;
maybe_start_prefetcher(State) ->
@@ -381,13 +374,43 @@ read_index_segment(SeqId, IndexState) ->
{List, IndexState1} -> {List, IndexState1, SeqId1}
end.
+drain_prefetcher(_DrainOrStop, State = #vqstate { prefetcher = undefined }) ->
+ State;
+drain_prefetcher(DrainOrStop,
+ State = #vqstate { prefetcher = Prefetcher, q3 = Q3, q4 = Q4,
+ ram_msg_count = RamMsgCount }) ->
+ Fun = case DrainOrStop of
+ drain -> fun rabbit_queue_prefetcher:drain/1;
+ stop -> fun rabbit_queue_prefetcher:drain_and_stop/1
+ end,
+ {Q3a, Q4a, Prefetcher1, RamMsgCountAdj} =
+ case Fun(Prefetcher) of
+ {empty, Betas} -> %% drain or drain_and_stop
+ {queue:join(Betas, Q3), Q4, undefined, 0};
+ {finished, Alphas} -> %% just drain
+ {Q3, Alphas, undefined, queue:len(Alphas)};
+ {continuing, Alphas} -> %% just drain
+ {Q3, Alphas, Prefetcher, queue:len(Alphas)};
+ {Alphas, Betas} -> %% just drain_and_stop
+ {queue:join(Betas, Q3), queue:join(Q4, Alphas), undefined,
+ queue:len(Alphas)}
+ end,
+ maybe_push_q1_to_betas(
+ State #vqstate { prefetcher = Prefetcher1, q3 = Q3a, q4 = Q4a,
+ ram_msg_count = RamMsgCount + RamMsgCountAdj }).
+
reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount })
when TargetRamMsgCount >= RamMsgCount ->
State;
reduce_memory_use(State =
#vqstate { target_ram_msg_count = TargetRamMsgCount }) ->
- State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)),
+ %% strictly, it's not necessary to stop the prefetcher this early,
+ %% but because of its potential effect on q1 and the
+ %% ram_msg_count, it's just much simpler to stop it sooner and
+ %% relaunch when we next hibernate.
+ State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(
+ drain_prefetcher(stop, State))),
case TargetRamMsgCount of
0 -> push_betas_to_gammas(State1);
_ -> State1