diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-08-28 16:12:26 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-08-28 16:12:26 +0200 |
| commit | f448e0bbe29a0a9d294fbeff29e20d6eee718791 (patch) | |
| tree | d90b520ca308faf99094e20b7dfa75665b0a2e9d /src | |
| parent | f4d5afd53510f952d9894294babb7f94b444b1a2 (diff) | |
| download | rabbitmq-server-git-f448e0bbe29a0a9d294fbeff29e20d6eee718791.tar.gz | |
refactors QI flushing
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 60 |
1 files changed, 20 insertions, 40 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b37c8da91c..92c52629c9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1828,20 +1828,13 @@ reduce_memory_use(State = #vqstate { State1 end. -limit_ram_acks(0, State = #vqstate{ index_state = IndexState, - target_ram_count = TargetRamCount }) -> - IndexState1 = rabbit_queue_index:flush_pre_publish_cache( - TargetRamCount, IndexState), - {0, State#vqstate{ index_state = IndexState1 }}; -limit_ram_acks(Quota, State = #vqstate { index_state = IndexState, - target_ram_count = TargetRamCount, - ram_pending_ack = RPA, +limit_ram_acks(0, State) -> + {0, ui(State)}; +limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> case gb_trees:is_empty(RPA) of true -> - IndexState1 = rabbit_queue_index:flush_pre_publish_cache( - TargetRamCount, IndexState), - {Quota, State#vqstate{index_state = IndexState1}}; + {Quota, ui(State)}; false -> {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA), {MsgStatus1, State1} = @@ -1979,27 +1972,17 @@ push_alphas_to_betas(Quota, State) -> push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, State = #vqstate { ram_msg_count = RamMsgCount, - index_state = IndexState, target_ram_count = TargetRamCount }) when Quota =:= 0 orelse TargetRamCount =:= infinity orelse TargetRamCount >= RamMsgCount -> - IndexState1 = rabbit_queue_index:flush_pre_publish_cache( - TargetRamCount, IndexState), - {Quota, State#vqstate{index_state = IndexState1}}; -push_alphas_to_betas(Generator, Consumer, Quota, Q, - State = #vqstate{ - index_state = IndexState, - target_ram_count = TargetRamCount}) -> + {Quota, ui(State)}; +push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case credit_flow:blocked() of - true -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache( - TargetRamCount, IndexState), - {Quota, State#vqstate{index_state = IndexState1}}; + true -> {Quota, ui(State)}; false -> case Generator(Q) of {empty, _Q} -> - IndexState1 = rabbit_queue_index:flush_pre_publish_cache( - TargetRamCount, IndexState), - {Quota, State#vqstate{index_state = IndexState1}}; + {Quota, ui(State)}; {{value, MsgStatus}, Qa} -> {MsgStatus1, State1} = maybe_prepare_write_to_disk(true, false, MsgStatus, @@ -2045,26 +2028,16 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> end. push_betas_to_deltas1(_Generator, _Limit, Q, - {0, Delta, State = - #vqstate{index_state = IndexState, - target_ram_count = TargetRamCount}}) -> - IndexState1 = rabbit_queue_index:flush_pre_publish_cache( - TargetRamCount, IndexState), - {Q, {0, Delta, State#vqstate{index_state = IndexState1}}}; + {0, Delta, State}) -> + {Q, {0, Delta, ui(State)}}; push_betas_to_deltas1(Generator, Limit, Q, - {Quota, Delta, State = - #vqstate{index_state = IndexState, - target_ram_count = TargetRamCount}}) -> + {Quota, Delta, State}) -> case Generator(Q) of {empty, _Q} -> - IndexState1 = rabbit_queue_index:flush_pre_publish_cache( - TargetRamCount, IndexState), - {Q, {Quota, Delta, State#vqstate{index_state = IndexState1}}}; + {Q, {Quota, Delta, ui(State)}}; {{value, #msg_status { seq_id = SeqId }}, _Qa} when SeqId < Limit -> - IndexState1 = rabbit_queue_index:flush_pre_publish_cache( - TargetRamCount, IndexState), - {Q, {Quota, Delta, State#vqstate{index_state = IndexState1}}}; + {Q, {Quota, Delta, ui(State)}}; {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> {#msg_status { index_on_disk = true }, State1} = maybe_batch_write_index_to_disk(true, MsgStatus, State), @@ -2074,6 +2047,13 @@ push_betas_to_deltas1(Generator, Limit, Q, {Quota - 1, Delta1, State2}) end. +%% Flushes queue index batch caches and updates queue index state. +ui(#vqstate{index_state = IndexState, + target_ram_count = TargetRamCount} = State) -> + IndexState1 = rabbit_queue_index:flush_pre_publish_cache( + TargetRamCount, IndexState), + State#vqstate{index_state = IndexState1}. + %%---------------------------------------------------------------------------- %% Upgrading %%---------------------------------------------------------------------------- |
