summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-08-28 14:50:05 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-08-28 16:10:22 +0200
commitf4d5afd53510f952d9894294babb7f94b444b1a2 (patch)
treebef7751e5b4c7b1111380782523885a85ec2921b
parent57d5d093da421534a00906cda729bdcc74adfef6 (diff)
downloadrabbitmq-server-git-f4d5afd53510f952d9894294babb7f94b444b1a2.tar.gz
removes stats optimization from reduce_memory_use
-rw-r--r--src/rabbit_variable_queue.erl138
1 files changed, 43 insertions, 95 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ee38f58e65..b37c8da91c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1828,39 +1828,30 @@ reduce_memory_use(State = #vqstate {
State1
end.
-limit_ram_acks(Quota, State = #vqstate{ram_bytes = CurrRamBytes}) ->
- limit_ram_acks(Quota, CurrRamBytes, State).
-
-limit_ram_acks(0, CurrRamBytes,
- State = #vqstate{index_state = IndexState,
- target_ram_count = TargetRamCount}) ->
+limit_ram_acks(0, State = #vqstate{ index_state = IndexState,
+ target_ram_count = TargetRamCount }) ->
IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
TargetRamCount, IndexState),
- {0, State#vqstate{index_state = IndexState1,
- ram_bytes = CurrRamBytes}};
-limit_ram_acks(Quota, CurrRamBytes,
- State = #vqstate {
- index_state = IndexState,
- target_ram_count = TargetRamCount,
- ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
+ {0, State#vqstate{ index_state = IndexState1 }};
+limit_ram_acks(Quota, State = #vqstate { index_state = IndexState,
+ target_ram_count = TargetRamCount,
+ ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
case gb_trees:is_empty(RPA) of
true ->
IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
TargetRamCount, IndexState),
- {Quota, State#vqstate{index_state = IndexState1,
- ram_bytes = CurrRamBytes}};
+ {Quota, State#vqstate{index_state = IndexState1}};
false ->
{SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
{MsgStatus1, State1} =
maybe_prepare_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
- DeltaRam = delta_ram(msg_in_ram(MsgStatus), msg_in_ram(MsgStatus2)),
limit_ram_acks(Quota - 1,
- CurrRamBytes + DeltaRam * msg_size(MsgStatus),
- State1 #vqstate { ram_pending_ack = RPA1,
- disk_pending_ack = DPA1 })
+ stats({0, 0}, {MsgStatus, MsgStatus2},
+ State1 #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1 }))
end.
permitted_beta_count(#vqstate { len = 0 }) ->
@@ -1986,59 +1977,40 @@ push_alphas_to_betas(Quota, State) ->
end, Quota1, State1 #vqstate.q4, State1),
{Quota2, State2}.
-push_alphas_to_betas(Generator, Consumer, Quota, Q,
- State = #vqstate{ram_msg_count = CurrRamReadyCount,
- ram_bytes = CurrRamBytes}) ->
- push_alphas_to_betas1(Generator, Consumer, Quota, Q,
- CurrRamReadyCount, CurrRamBytes,
- State).
-
-push_alphas_to_betas1(_Generator, _Consumer, Quota, _Q,
- CurrRamReadyCount, CurrRamBytes,
- State = #vqstate { index_state = IndexState,
+push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
+ State = #vqstate { ram_msg_count = RamMsgCount,
+ index_state = IndexState,
target_ram_count = TargetRamCount })
when Quota =:= 0 orelse
TargetRamCount =:= infinity orelse
- TargetRamCount >= CurrRamReadyCount ->
+ TargetRamCount >= RamMsgCount ->
IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
TargetRamCount, IndexState),
- {Quota, State#vqstate{index_state = IndexState1,
- ram_msg_count = CurrRamReadyCount,
- ram_bytes = CurrRamBytes}};
-push_alphas_to_betas1(Generator, Consumer, Quota, Q,
- CurrRamReadyCount, CurrRamBytes,
+ {Quota, State#vqstate{index_state = IndexState1}};
+push_alphas_to_betas(Generator, Consumer, Quota, Q,
State = #vqstate{
index_state = IndexState,
target_ram_count = TargetRamCount}) ->
case credit_flow:blocked() of
- true ->
- IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
- TargetRamCount, IndexState),
- {Quota, State#vqstate{index_state = IndexState1,
- ram_msg_count = CurrRamReadyCount,
- ram_bytes = CurrRamBytes}};
- false ->
- case Generator(Q) of
- {empty, _Q} ->
- IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
- TargetRamCount, IndexState),
- {Quota, State#vqstate{index_state = IndexState1,
- ram_msg_count = CurrRamReadyCount,
- ram_bytes = CurrRamBytes}};
- {{value, MsgStatus}, Qa} ->
- {MsgStatus1, State1} =
- maybe_prepare_write_to_disk(true, false, MsgStatus,
- State),
- MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- DeltaRam = delta_ram(msg_in_ram(MsgStatus),
- msg_in_ram(MsgStatus2)),
- DeltaRamReady = DeltaRam,
- State2 = Consumer(MsgStatus2, Qa, State1),
- push_alphas_to_betas1(Generator, Consumer, Quota - 1, Qa,
- CurrRamReadyCount + DeltaRamReady,
- CurrRamBytes + DeltaRam * msg_size(MsgStatus),
- State2)
- end
+ true -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ {Quota, State#vqstate{index_state = IndexState1}};
+ false -> case Generator(Q) of
+ {empty, _Q} ->
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ {Quota, State#vqstate{index_state = IndexState1}};
+ {{value, MsgStatus}, Qa} ->
+ {MsgStatus1, State1} =
+ maybe_prepare_write_to_disk(true, false, MsgStatus,
+ State),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ State2 = stats(
+ ready0, {MsgStatus, MsgStatus2}, State1),
+ State3 = Consumer(MsgStatus2, Qa, State2),
+ push_alphas_to_betas(Generator, Consumer, Quota - 1,
+ Qa, State3)
+ end
end.
push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
@@ -2058,11 +2030,7 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
delta = Delta1,
q3 = Q3a }.
-push_betas_to_deltas(Generator, LimitFun, Q,
- {_Quota, _Delta,
- #vqstate{
- ram_msg_count = CurrRamReadyCount,
- ram_bytes = CurrRamBytes}} = PushState) ->
+push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
case ?QUEUE:is_empty(Q) of
true ->
{Q, PushState};
@@ -2072,24 +2040,18 @@ push_betas_to_deltas(Generator, LimitFun, Q,
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
true -> {Q, PushState};
- false -> push_betas_to_deltas1(Generator, Limit, Q,
- CurrRamReadyCount, CurrRamBytes,
- PushState)
+ false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
end
end.
push_betas_to_deltas1(_Generator, _Limit, Q,
- CurrRamReadyCount, CurrRamBytes,
{0, Delta, State =
#vqstate{index_state = IndexState,
target_ram_count = TargetRamCount}}) ->
IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
TargetRamCount, IndexState),
- {Q, {0, Delta, State#vqstate{index_state = IndexState1,
- ram_msg_count = CurrRamReadyCount,
- ram_bytes = CurrRamBytes}}};
+ {Q, {0, Delta, State#vqstate{index_state = IndexState1}}};
push_betas_to_deltas1(Generator, Limit, Q,
- CurrRamReadyCount, CurrRamBytes,
{Quota, Delta, State =
#vqstate{index_state = IndexState,
target_ram_count = TargetRamCount}}) ->
@@ -2097,35 +2059,21 @@ push_betas_to_deltas1(Generator, Limit, Q,
{empty, _Q} ->
IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
TargetRamCount, IndexState),
- {Q, {Quota, Delta, State#vqstate{index_state = IndexState1,
- ram_msg_count = CurrRamReadyCount,
- ram_bytes = CurrRamBytes}}};
+ {Q, {Quota, Delta, State#vqstate{index_state = IndexState1}}};
{{value, #msg_status { seq_id = SeqId }}, _Qa}
when SeqId < Limit ->
IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
TargetRamCount, IndexState),
- {Q, {Quota, Delta, State#vqstate{index_state = IndexState1,
- ram_msg_count = CurrRamReadyCount,
- ram_bytes = CurrRamBytes}}};
+ {Q, {Quota, Delta, State#vqstate{index_state = IndexState1}}};
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
{#msg_status { index_on_disk = true }, State1} =
maybe_batch_write_index_to_disk(true, MsgStatus, State),
- {Size, DeltaRam} = size_and_delta_ram(MsgStatus),
+ State2 = stats(ready0, {MsgStatus, none}, State1),
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
- CurrRamReadyCount + DeltaRam,
- CurrRamBytes + DeltaRam * Size,
- {Quota - 1, Delta1, State1})
+ {Quota - 1, Delta1, State2})
end.
-%% Optimised version for paging only, based on stats/3 being called
-%% like this: stats(ready0, {MsgStatus, none}, State1).
-size_and_delta_ram(#msg_status{msg_props = #message_properties{size = Size},
- msg = undefined}) ->
- {Size, 0};
-size_and_delta_ram(#msg_status{msg_props = #message_properties{size = Size}}) ->
- {Size, -1}.
-
%%----------------------------------------------------------------------------
%% Upgrading
%%----------------------------------------------------------------------------