diff options
Diffstat (limited to 'src')
-rw-r--r--  src/rabbit_msg_store.erl        |  5
-rw-r--r--  src/rabbit_queue_prefetcher.erl |  4
-rw-r--r--  src/rabbit_variable_queue.erl   | 51
3 files changed, 43 insertions, 17 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b0b75249c1..6e28faa090 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -384,7 +384,10 @@ handle_cast(sync, State) -> noreply(sync(State)); handle_cast({idle_read, MsgId, From}, State) -> - {Result, State1} = internal_read_message(MsgId, State), + {Result, State1} = case internal_read_message(MsgId, State) of + {not_found, _} = Res -> Res; + {{ok, Msg}, State2} -> {Msg, State2} + end, rabbit_misc:with_exit_handler( fun () -> ok end, fun () -> rabbit_queue_prefetcher:publish(From, Result) end), diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index cad4c695de..9d1b58bad2 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -199,7 +199,7 @@ -spec(start_link/1 :: (queue()) -> ({'ok', pid()} | 'ignore' | {'error', any()})). --spec(publish/2 :: (pid(), (message()| 'empty')) -> 'ok'). +-spec(publish/2 :: (pid(), (message()| 'not_found')) -> 'ok'). -spec(drain/1 :: (pid()) -> ({('finished' | 'continuing' | 'empty'), queue()})). -spec(drain_and_stop/1 :: (pid()) -> ({('empty' | queue()), queue()})). -spec(stop/1 :: (pid()) -> 'ok'). @@ -214,7 +214,7 @@ start_link(Betas) -> publish(Prefetcher, Obj = #basic_message {}) -> gen_server2:call(Prefetcher, {publish, Obj}, infinity); -publish(Prefetcher, empty) -> +publish(Prefetcher, not_found) -> gen_server2:call(Prefetcher, publish_empty, infinity). drain(Prefetcher) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c197f6b028..5a76c23e7a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -171,21 +171,14 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate, out_counter = 0 }. 
fetch(State = - #vqstate { q3 = Q3, q4 = Q4, + #vqstate { q4 = Q4, out_counter = OutCount, prefetcher = Prefetcher, index_state = IndexState, len = Len }) -> case queue:out(Q4) of {empty, _Q4} when Prefetcher == undefined -> fetch_from_q3_or_gamma(State); {empty, _Q4} -> - {Q3a, Q4a, Prefetcher1} = - case rabbit_queue_prefetcher:drain(Prefetcher) of - {empty, Betas} -> {queue:join(Betas, Q3), Q4, undefined}; - {finished, Alphas} -> {Q3, Alphas, undefined}; - {continuing, Alphas} -> {Q3, Alphas, Prefetcher} - end, - fetch(State #vqstate { q3 = Q3a, q4 = Q4a, - prefetcher = Prefetcher1 }); + fetch(drain_prefetcher(drain, State)); {{value, #alpha { msg = Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, @@ -237,17 +230,17 @@ is_empty(State) -> maybe_start_prefetcher(State = #vqstate { ram_msg_count = RamMsgCount, target_ram_msg_count = TargetRamMsgCount, - q3 = Q3, prefetcher = undefined + q1 = Q1, q3 = Q3, prefetcher = undefined }) -> - PrefetchCount = lists:min([queue:len(Q3), TargetRamMsgCount - RamMsgCount]), + %% prefetched content takes priority over q1 + AvailableSpace = (TargetRamMsgCount - RamMsgCount) + queue:len(Q1), + PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]), if PrefetchCount =< 0 -> State; true -> {PrefetchQueue, Q3a} = queue:split(PrefetchCount, Q3), {ok, Prefetcher} = rabbit_queue_prefetcher:start_link(PrefetchQueue), - RamMsgCount1 = RamMsgCount + PrefetchCount, maybe_load_next_segment(State #vqstate { q3 = Q3a, - ram_msg_count = RamMsgCount1, prefetcher = Prefetcher }) end; maybe_start_prefetcher(State) -> @@ -381,13 +374,43 @@ read_index_segment(SeqId, IndexState) -> {List, IndexState1} -> {List, IndexState1, SeqId1} end. 
+drain_prefetcher(_DrainOrStop, State = #vqstate { prefetcher = undefined }) -> + State; +drain_prefetcher(DrainOrStop, + State = #vqstate { prefetcher = Prefetcher, q3 = Q3, q4 = Q4, + ram_msg_count = RamMsgCount }) -> + Fun = case DrainOrStop of + drain -> fun rabbit_queue_prefetcher:drain/1; + stop -> fun rabbit_queue_prefetcher:drain_and_stop/1 + end, + {Q3a, Q4a, Prefetcher1, RamMsgCountAdj} = + case Fun(Prefetcher) of + {empty, Betas} -> %% drain or drain_and_stop + {queue:join(Betas, Q3), Q4, undefined, 0}; + {finished, Alphas} -> %% just drain + {Q3, Alphas, undefined, queue:len(Alphas)}; + {continuing, Alphas} -> %% just drain + {Q3, Alphas, Prefetcher, queue:len(Alphas)}; + {Alphas, Betas} -> %% just drain_and_stop + {queue:join(Betas, Q3), queue:join(Q4, Alphas), undefined, + queue:len(Alphas)} + end, + maybe_push_q1_to_betas( + State #vqstate { prefetcher = Prefetcher1, q3 = Q3a, q4 = Q4a, + ram_msg_count = RamMsgCount + RamMsgCountAdj }). + reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount, target_ram_msg_count = TargetRamMsgCount }) when TargetRamMsgCount >= RamMsgCount -> State; reduce_memory_use(State = #vqstate { target_ram_msg_count = TargetRamMsgCount }) -> - State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)), + %% strictly, it's not necessary to stop the prefetcher this early, + %% but because of its potential effect on q1 and the + %% ram_msg_count, it's just much simpler to stop it sooner and + %% relaunch when we next hibernate. + State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas( + drain_prefetcher(stop, State))), case TargetRamMsgCount of 0 -> push_betas_to_gammas(State1); _ -> State1 |
