diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-08-27 18:39:28 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-08-27 18:39:28 +0200 |
| commit | 7b841803f39a005671a7f603b95ec99570d086de (patch) | |
| tree | de1c2e5c6a9eecd9fee1bc6afa88630ed6791127 /src | |
| parent | 38d58c4c1f4e8090aa6ca42b101f6312ca16e467 (diff) | |
| download | rabbitmq-server-git-7b841803f39a005671a7f603b95ec99570d086de.tar.gz | |
improves push_alphas_to_betas paging performance
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 48 |
1 files changed, 30 insertions, 18 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ddd75c7ed9..682b9e0eb2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1974,29 +1974,41 @@ push_alphas_to_betas(Quota, State) -> {Quota2, State2}. push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, - State = #vqstate { ram_msg_count = RamMsgCount, + State = #vqstate { index_state = IndexState, + ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount }) when Quota =:= 0 orelse TargetRamCount =:= infinity orelse TargetRamCount >= RamMsgCount -> - {Quota, State}; -push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> + IndexState1 = rabbit_queue_index:flush_pre_publish_cache( + TargetRamCount, IndexState), + {Quota, State#vqstate{index_state = IndexState1}}; +push_alphas_to_betas(Generator, Consumer, Quota, Q, + State = #vqstate{ + index_state = IndexState, + target_ram_count = TargetRamCount}) -> case credit_flow:blocked() of - true -> {Quota, State}; - false -> case Generator(Q) of - {empty, _Q} -> - {Quota, State}; - {{value, MsgStatus}, Qa} -> - %% TODO add QI batching here - {MsgStatus1, State1} = - maybe_write_to_disk(true, false, MsgStatus, State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = stats( - ready0, {MsgStatus, MsgStatus2}, State1), - State3 = Consumer(MsgStatus2, Qa, State2), - push_alphas_to_betas(Generator, Consumer, Quota - 1, - Qa, State3) - end + true -> + IndexState1 = rabbit_queue_index:flush_pre_publish_cache( + TargetRamCount, IndexState), + {Quota, State#vqstate{index_state = IndexState1}}; + false -> + case Generator(Q) of + {empty, _Q} -> + IndexState1 = rabbit_queue_index:flush_pre_publish_cache( + TargetRamCount, IndexState), + {Quota, State#vqstate{index_state = IndexState1}}; + {{value, MsgStatus}, Qa} -> + {MsgStatus1, State1} = + maybe_prepare_write_to_disk(true, false, MsgStatus, + State), + MsgStatus2 = m(trim_msg_status(MsgStatus1)), + State2 = stats( + ready0, {MsgStatus, MsgStatus2}, State1), + State3 = Consumer(MsgStatus2, Qa, State2), + push_alphas_to_betas(Generator, Consumer, Quota - 1, + Qa, State3) + end end. push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, |
