summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-08-28 16:12:26 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-08-28 16:12:26 +0200
commitf448e0bbe29a0a9d294fbeff29e20d6eee718791 (patch)
treed90b520ca308faf99094e20b7dfa75665b0a2e9d /src
parentf4d5afd53510f952d9894294babb7f94b444b1a2 (diff)
downloadrabbitmq-server-git-f448e0bbe29a0a9d294fbeff29e20d6eee718791.tar.gz
refactors QI flushing
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl60
1 files changed, 20 insertions, 40 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b37c8da91c..92c52629c9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1828,20 +1828,13 @@ reduce_memory_use(State = #vqstate {
State1
end.
-limit_ram_acks(0, State = #vqstate{ index_state = IndexState,
- target_ram_count = TargetRamCount }) ->
- IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
- TargetRamCount, IndexState),
- {0, State#vqstate{ index_state = IndexState1 }};
-limit_ram_acks(Quota, State = #vqstate { index_state = IndexState,
- target_ram_count = TargetRamCount,
- ram_pending_ack = RPA,
+limit_ram_acks(0, State) ->
+ {0, ui(State)};
+limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA }) ->
case gb_trees:is_empty(RPA) of
true ->
- IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
- TargetRamCount, IndexState),
- {Quota, State#vqstate{index_state = IndexState1}};
+ {Quota, ui(State)};
false ->
{SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
{MsgStatus1, State1} =
@@ -1979,27 +1972,17 @@ push_alphas_to_betas(Quota, State) ->
push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
State = #vqstate { ram_msg_count = RamMsgCount,
- index_state = IndexState,
target_ram_count = TargetRamCount })
when Quota =:= 0 orelse
TargetRamCount =:= infinity orelse
TargetRamCount >= RamMsgCount ->
- IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
- TargetRamCount, IndexState),
- {Quota, State#vqstate{index_state = IndexState1}};
-push_alphas_to_betas(Generator, Consumer, Quota, Q,
- State = #vqstate{
- index_state = IndexState,
- target_ram_count = TargetRamCount}) ->
+ {Quota, ui(State)};
+push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case credit_flow:blocked() of
- true -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
- TargetRamCount, IndexState),
- {Quota, State#vqstate{index_state = IndexState1}};
+ true -> {Quota, ui(State)};
false -> case Generator(Q) of
{empty, _Q} ->
- IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
- TargetRamCount, IndexState),
- {Quota, State#vqstate{index_state = IndexState1}};
+ {Quota, ui(State)};
{{value, MsgStatus}, Qa} ->
{MsgStatus1, State1} =
maybe_prepare_write_to_disk(true, false, MsgStatus,
@@ -2045,26 +2028,16 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
end.
push_betas_to_deltas1(_Generator, _Limit, Q,
- {0, Delta, State =
- #vqstate{index_state = IndexState,
- target_ram_count = TargetRamCount}}) ->
- IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
- TargetRamCount, IndexState),
- {Q, {0, Delta, State#vqstate{index_state = IndexState1}}};
+ {0, Delta, State}) ->
+ {Q, {0, Delta, ui(State)}};
push_betas_to_deltas1(Generator, Limit, Q,
- {Quota, Delta, State =
- #vqstate{index_state = IndexState,
- target_ram_count = TargetRamCount}}) ->
+ {Quota, Delta, State}) ->
case Generator(Q) of
{empty, _Q} ->
- IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
- TargetRamCount, IndexState),
- {Q, {Quota, Delta, State#vqstate{index_state = IndexState1}}};
+ {Q, {Quota, Delta, ui(State)}};
{{value, #msg_status { seq_id = SeqId }}, _Qa}
when SeqId < Limit ->
- IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
- TargetRamCount, IndexState),
- {Q, {Quota, Delta, State#vqstate{index_state = IndexState1}}};
+ {Q, {Quota, Delta, ui(State)}};
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
{#msg_status { index_on_disk = true }, State1} =
maybe_batch_write_index_to_disk(true, MsgStatus, State),
@@ -2074,6 +2047,13 @@ push_betas_to_deltas1(Generator, Limit, Q,
{Quota - 1, Delta1, State2})
end.
+%% Flushes queue index batch caches and updates queue index state.
+ui(#vqstate{index_state = IndexState,
+ target_ram_count = TargetRamCount} = State) ->
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ State#vqstate{index_state = IndexState1}.
+
%%----------------------------------------------------------------------------
%% Upgrading
%%----------------------------------------------------------------------------