summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl5
-rw-r--r--src/rabbit_queue_prefetcher.erl4
-rw-r--r--src/rabbit_variable_queue.erl51
3 files changed, 43 insertions, 17 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b0b75249c1..6e28faa090 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -384,7 +384,10 @@ handle_cast(sync, State) ->
noreply(sync(State));
handle_cast({idle_read, MsgId, From}, State) ->
- {Result, State1} = internal_read_message(MsgId, State),
+ {Result, State1} = case internal_read_message(MsgId, State) of
+ {not_found, _} = Res -> Res;
+ {{ok, Msg}, State2} -> {Msg, State2}
+ end,
rabbit_misc:with_exit_handler(
fun () -> ok end,
fun () -> rabbit_queue_prefetcher:publish(From, Result) end),
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index cad4c695de..9d1b58bad2 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -199,7 +199,7 @@
-spec(start_link/1 :: (queue()) ->
({'ok', pid()} | 'ignore' | {'error', any()})).
--spec(publish/2 :: (pid(), (message()| 'empty')) -> 'ok').
+-spec(publish/2 :: (pid(), (message()| 'not_found')) -> 'ok').
-spec(drain/1 :: (pid()) -> ({('finished' | 'continuing' | 'empty'), queue()})).
-spec(drain_and_stop/1 :: (pid()) -> ({('empty' | queue()), queue()})).
-spec(stop/1 :: (pid()) -> 'ok').
@@ -214,7 +214,7 @@ start_link(Betas) ->
publish(Prefetcher, Obj = #basic_message {}) ->
gen_server2:call(Prefetcher, {publish, Obj}, infinity);
-publish(Prefetcher, empty) ->
+publish(Prefetcher, not_found) ->
gen_server2:call(Prefetcher, publish_empty, infinity).
drain(Prefetcher) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c197f6b028..5a76c23e7a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -171,21 +171,14 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate,
out_counter = 0 }.
fetch(State =
- #vqstate { q3 = Q3, q4 = Q4,
+ #vqstate { q4 = Q4,
out_counter = OutCount, prefetcher = Prefetcher,
index_state = IndexState, len = Len }) ->
case queue:out(Q4) of
{empty, _Q4} when Prefetcher == undefined ->
fetch_from_q3_or_gamma(State);
{empty, _Q4} ->
- {Q3a, Q4a, Prefetcher1} =
- case rabbit_queue_prefetcher:drain(Prefetcher) of
- {empty, Betas} -> {queue:join(Betas, Q3), Q4, undefined};
- {finished, Alphas} -> {Q3, Alphas, undefined};
- {continuing, Alphas} -> {Q3, Alphas, Prefetcher}
- end,
- fetch(State #vqstate { q3 = Q3a, q4 = Q4a,
- prefetcher = Prefetcher1 });
+ fetch(drain_prefetcher(drain, State));
{{value,
#alpha { msg = Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
@@ -237,17 +230,17 @@ is_empty(State) ->
maybe_start_prefetcher(State = #vqstate { ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount,
- q3 = Q3, prefetcher = undefined
+ q1 = Q1, q3 = Q3, prefetcher = undefined
}) ->
- PrefetchCount = lists:min([queue:len(Q3), TargetRamMsgCount - RamMsgCount]),
+ %% prefetched content takes priority over q1
+ AvailableSpace = (TargetRamMsgCount - RamMsgCount) + queue:len(Q1),
+ PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]),
if PrefetchCount =< 0 -> State;
true ->
{PrefetchQueue, Q3a} = queue:split(PrefetchCount, Q3),
{ok, Prefetcher} =
rabbit_queue_prefetcher:start_link(PrefetchQueue),
- RamMsgCount1 = RamMsgCount + PrefetchCount,
maybe_load_next_segment(State #vqstate { q3 = Q3a,
- ram_msg_count = RamMsgCount1,
prefetcher = Prefetcher })
end;
maybe_start_prefetcher(State) ->
@@ -381,13 +374,43 @@ read_index_segment(SeqId, IndexState) ->
{List, IndexState1} -> {List, IndexState1, SeqId1}
end.
+drain_prefetcher(_DrainOrStop, State = #vqstate { prefetcher = undefined }) ->
+ State;
+drain_prefetcher(DrainOrStop,
+ State = #vqstate { prefetcher = Prefetcher, q3 = Q3, q4 = Q4,
+ ram_msg_count = RamMsgCount }) ->
+ Fun = case DrainOrStop of
+ drain -> fun rabbit_queue_prefetcher:drain/1;
+ stop -> fun rabbit_queue_prefetcher:drain_and_stop/1
+ end,
+ {Q3a, Q4a, Prefetcher1, RamMsgCountAdj} =
+ case Fun(Prefetcher) of
+ {empty, Betas} -> %% drain or drain_and_stop
+ {queue:join(Betas, Q3), Q4, undefined, 0};
+ {finished, Alphas} -> %% just drain
+ {Q3, Alphas, undefined, queue:len(Alphas)};
+ {continuing, Alphas} -> %% just drain
+ {Q3, Alphas, Prefetcher, queue:len(Alphas)};
+ {Alphas, Betas} -> %% just drain_and_stop
+ {queue:join(Betas, Q3), queue:join(Q4, Alphas), undefined,
+ queue:len(Alphas)}
+ end,
+ maybe_push_q1_to_betas(
+ State #vqstate { prefetcher = Prefetcher1, q3 = Q3a, q4 = Q4a,
+ ram_msg_count = RamMsgCount + RamMsgCountAdj }).
+
reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount })
when TargetRamMsgCount >= RamMsgCount ->
State;
reduce_memory_use(State =
#vqstate { target_ram_msg_count = TargetRamMsgCount }) ->
- State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)),
+ %% strictly, it's not necessary to stop the prefetcher this early,
+ %% but because of its potential effect on q1 and the
+ %% ram_msg_count, it's just much simpler to stop it sooner and
+ %% relaunch when we next hibernate.
+ State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(
+ drain_prefetcher(stop, State))),
case TargetRamMsgCount of
0 -> push_betas_to_gammas(State1);
_ -> State1