summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-23 13:10:42 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-23 13:10:42 +0000
commit3e064be3eaeb72fa02fcc565f615261bc1c52942 (patch)
tree3cd9d68cfe5c555ef7d6382458af8c404ec13792 /src
parentcc835492a9bc62a411b540127df212260ff8ffa6 (diff)
downloadrabbitmq-server-git-3e064be3eaeb72fa02fcc565f615261bc1c52942.tar.gz
Rename things.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl113
1 files changed, 56 insertions, 57 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6ca9e2781f..56a3295d1c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -596,10 +596,10 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
end,
InCount1 = InCount + 1,
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = counters({1, 0}, {none, MsgStatus1},
- State2#vqstate{ next_seq_id = SeqId + 1,
- in_counter = InCount1,
- unconfirmed = UC1 }),
+ State3 = stats({1, 0}, {none, MsgStatus1},
+ State2#vqstate{ next_seq_id = SeqId + 1,
+ in_counter = InCount1,
+ unconfirmed = UC1 }),
a(reduce_memory_use(maybe_update_rates(State3))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
@@ -617,11 +617,11 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = counters({0, 1}, {none, MsgStatus1},
- State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- unconfirmed = UC1 }),
+ State3 = stats({0, 1}, {none, MsgStatus1},
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ unconfirmed = UC1 }),
{SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
discard(_MsgId, _ChPid, _Flow, State) -> State.
@@ -1195,8 +1195,8 @@ in_r(MsgStatus = #msg_status { msg = undefined },
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
MsgStatus1 = MsgStatus#msg_status{msg = Msg},
- counters(ready0, {MsgStatus, MsgStatus1},
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) })
+ stats(ready0, {MsgStatus, MsgStatus1},
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) })
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1224,8 +1224,8 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
msg_store_read(MSCState, IsPersistent, MsgId),
{Msg, State #vqstate {msg_store_clients = MSCState1}}.
-counters(Signs, Statuses, State) ->
- counters0(expand_signs(Signs), expand_statuses(Statuses), State).
+stats(Signs, Statuses, State) ->
+ stats0(expand_signs(Signs), expand_statuses(Statuses), State).
expand_signs(ready0) -> {0, 0, true};
expand_signs({A, B}) -> {A, B, false}.
@@ -1237,37 +1237,37 @@ expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}.
%% In this function at least, we are religious: the variable name
%% contains "Ready" or "Unacked" iff that is what it counts. If
%% neither is present it counts both.
-counters0({SignReady, SignUnacked, ReadyMsgPaged},
- {InRamBefore, InRamAfter, MsgStatus},
- State = #vqstate{len = ReadyCount,
- bytes = ReadyBytes,
- ram_msg_count = RamReadyCount,
- persistent_count = PersistentCount,
- unacked_bytes = UnackedBytes,
- ram_bytes = RamBytes,
- persistent_bytes = PersistentBytes}) ->
+stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged},
+ {InRamBefore, InRamAfter, MsgStatus},
+ State = #vqstate{len = ReadyCount,
+ bytes = ReadyBytes,
+ ram_msg_count = RamReadyCount,
+ persistent_count = PersistentCount,
+ unacked_bytes = UnackedBytes,
+ ram_bytes = RamBytes,
+ persistent_bytes = PersistentBytes}) ->
S = msg_size(MsgStatus),
- SignTotal = SignReady + SignUnacked,
- SignRam = case {InRamBefore, InRamAfter} of
- {false, false} -> 0;
- {false, true} -> 1;
- {true, false} -> -1;
- {true, true} -> 0
- end,
- SignRamReady = case SignReady of
- 1 -> one_if(InRamAfter);
- -1 -> -one_if(InRamBefore);
- 0 when ReadyMsgPaged -> SignRam;
- 0 -> 0
- end,
- SignPersistent = SignTotal * one_if(MsgStatus#msg_status.is_persistent),
- State#vqstate{len = ReadyCount + SignReady,
- ram_msg_count = RamReadyCount + SignRamReady,
- persistent_count = PersistentCount + SignPersistent,
- bytes = ReadyBytes + SignReady * S,
- unacked_bytes = UnackedBytes + SignUnacked * S,
- ram_bytes = RamBytes + SignRam * S,
- persistent_bytes = PersistentBytes + SignPersistent * S}.
+ DeltaTotal = DeltaReady + DeltaUnacked,
+ DeltaRam = case {InRamBefore, InRamAfter} of
+ {false, false} -> 0;
+ {false, true} -> 1;
+ {true, false} -> -1;
+ {true, true} -> 0
+ end,
+ DeltaRamReady = case DeltaReady of
+ 1 -> one_if(InRamAfter);
+ -1 -> -one_if(InRamBefore);
+ 0 when ReadyMsgPaged -> DeltaRam;
+ 0 -> 0
+ end,
+ DeltaPersistent = DeltaTotal * one_if(MsgStatus#msg_status.is_persistent),
+ State#vqstate{len = ReadyCount + DeltaReady,
+ ram_msg_count = RamReadyCount + DeltaRamReady,
+ persistent_count = PersistentCount + DeltaPersistent,
+ bytes = ReadyBytes + DeltaReady * S,
+ unacked_bytes = UnackedBytes + DeltaUnacked * S,
+ ram_bytes = RamBytes + DeltaRam * S,
+ persistent_bytes = PersistentBytes + DeltaPersistent * S}.
msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
@@ -1309,8 +1309,8 @@ remove(AckRequired, MsgStatus = #msg_status {
false -> {undefined, State}
end,
State2 = case AckRequired of
- false -> counters({-1, 0}, {MsgStatus, none}, State1);
- true -> counters({-1, 1}, {MsgStatus, MsgStatus}, State1)
+ false -> stats({-1, 0}, {MsgStatus, none}, State1);
+ true -> stats({-1, 1}, {MsgStatus, MsgStatus}, State1)
end,
{AckTag, maybe_update_rates(
State2 #vqstate {out_counter = OutCount + 1,
@@ -1483,10 +1483,10 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
end
end.
-%% First parameter = UpdateCounters
+%% First parameter = UpdateStats
remove_pending_ack(true, SeqId, State) ->
{MsgStatus, State1} = remove_pending_ack(false, SeqId, State),
- {MsgStatus, counters({0, -1}, {MsgStatus, none}, State1)};
+ {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)};
remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
disk_pending_ack = DPA,
qi_pending_ack = QPA}) ->
@@ -1601,14 +1601,14 @@ msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
{Msg, State1} = read_msg(MsgStatus, State),
MsgStatus1 = MsgStatus#msg_status { msg = Msg },
- {MsgStatus1, counters({1, -1}, {MsgStatus, MsgStatus1}, State1)};
+ {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, State1)};
publish_alpha(MsgStatus, State) ->
- {MsgStatus, counters({1, -1}, {MsgStatus, MsgStatus}, State)}.
+ {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}.
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- {MsgStatus2, counters({1, -1}, {MsgStatus, MsgStatus2}, State1)}.
+ {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
@@ -1645,7 +1645,7 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
- counters({1, -1}, {MsgStatus, none}, State2)}
+ stats({1, -1}, {MsgStatus, none}, State2)}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
@@ -1808,10 +1808,9 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
limit_ram_acks(Quota - 1,
- counters(
- {0, 0}, {MsgStatus, MsgStatus2},
- State1 #vqstate { ram_pending_ack = RPA1,
- disk_pending_ack = DPA1 }))
+ stats({0, 0}, {MsgStatus, MsgStatus2},
+ State1 #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1 }))
end.
permitted_beta_count(#vqstate { len = 0 }) ->
@@ -1952,7 +1951,7 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = counters(
+ State2 = stats(
ready0, {MsgStatus, MsgStatus2}, State1),
State3 = Consumer(MsgStatus2, Qa, State2),
push_alphas_to_betas(Generator, Consumer, Quota - 1,
@@ -2007,7 +2006,7 @@ push_betas_to_deltas1(Generator, Limit, Q,
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(true, MsgStatus, IndexState),
- State1 = counters(ready0, {MsgStatus, none}, State),
+ State1 = stats(ready0, {MsgStatus, none}, State),
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
{Quota - 1, Delta1, IndexState1, State1})