diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-23 13:10:42 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-23 13:10:42 +0000 |
| commit | 3e064be3eaeb72fa02fcc565f615261bc1c52942 (patch) | |
| tree | 3cd9d68cfe5c555ef7d6382458af8c404ec13792 /src | |
| parent | cc835492a9bc62a411b540127df212260ff8ffa6 (diff) | |
| download | rabbitmq-server-git-3e064be3eaeb72fa02fcc565f615261bc1c52942.tar.gz | |
Rename things.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 113 |
1 files changed, 56 insertions, 57 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6ca9e2781f..56a3295d1c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -596,10 +596,10 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, end, InCount1 = InCount + 1, UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = counters({1, 0}, {none, MsgStatus1}, - State2#vqstate{ next_seq_id = SeqId + 1, - in_counter = InCount1, - unconfirmed = UC1 }), + State3 = stats({1, 0}, {none, MsgStatus1}, + State2#vqstate{ next_seq_id = SeqId + 1, + in_counter = InCount1, + unconfirmed = UC1 }), a(reduce_memory_use(maybe_update_rates(State3))). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, @@ -617,11 +617,11 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = counters({0, 1}, {none, MsgStatus1}, - State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - unconfirmed = UC1 }), + State3 = stats({0, 1}, {none, MsgStatus1}, + State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + unconfirmed = UC1 }), {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}. discard(_MsgId, _ChPid, _Flow, State) -> State. @@ -1195,8 +1195,8 @@ in_r(MsgStatus = #msg_status { msg = undefined }, false -> {Msg, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), MsgStatus1 = MsgStatus#msg_status{msg = Msg}, - counters(ready0, {MsgStatus, MsgStatus1}, - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }) + stats(ready0, {MsgStatus, MsgStatus1}, + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }) end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. @@ -1224,8 +1224,8 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> msg_store_read(MSCState, IsPersistent, MsgId), {Msg, State #vqstate {msg_store_clients = MSCState1}}. -counters(Signs, Statuses, State) -> - counters0(expand_signs(Signs), expand_statuses(Statuses), State). +stats(Signs, Statuses, State) -> + stats0(expand_signs(Signs), expand_statuses(Statuses), State). expand_signs(ready0) -> {0, 0, true}; expand_signs({A, B}) -> {A, B, false}. @@ -1237,37 +1237,37 @@ expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}. %% In this function at least, we are religious: the variable name %% contains "Ready" or "Unacked" iff that is what it counts. If %% neither is present it counts both. -counters0({SignReady, SignUnacked, ReadyMsgPaged}, - {InRamBefore, InRamAfter, MsgStatus}, - State = #vqstate{len = ReadyCount, - bytes = ReadyBytes, - ram_msg_count = RamReadyCount, - persistent_count = PersistentCount, - unacked_bytes = UnackedBytes, - ram_bytes = RamBytes, - persistent_bytes = PersistentBytes}) -> +stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged}, + {InRamBefore, InRamAfter, MsgStatus}, + State = #vqstate{len = ReadyCount, + bytes = ReadyBytes, + ram_msg_count = RamReadyCount, + persistent_count = PersistentCount, + unacked_bytes = UnackedBytes, + ram_bytes = RamBytes, + persistent_bytes = PersistentBytes}) -> S = msg_size(MsgStatus), - SignTotal = SignReady + SignUnacked, - SignRam = case {InRamBefore, InRamAfter} of - {false, false} -> 0; - {false, true} -> 1; - {true, false} -> -1; - {true, true} -> 0 - end, - SignRamReady = case SignReady of - 1 -> one_if(InRamAfter); - -1 -> -one_if(InRamBefore); - 0 when ReadyMsgPaged -> SignRam; - 0 -> 0 - end, - SignPersistent = SignTotal * one_if(MsgStatus#msg_status.is_persistent), - State#vqstate{len = ReadyCount + SignReady, - ram_msg_count = RamReadyCount + SignRamReady, - persistent_count = PersistentCount + SignPersistent, - bytes = ReadyBytes + SignReady * S, - unacked_bytes = UnackedBytes + SignUnacked * S, - ram_bytes = RamBytes + SignRam * S, - persistent_bytes = PersistentBytes + SignPersistent * S}. + DeltaTotal = DeltaReady + DeltaUnacked, + DeltaRam = case {InRamBefore, InRamAfter} of + {false, false} -> 0; + {false, true} -> 1; + {true, false} -> -1; + {true, true} -> 0 + end, + DeltaRamReady = case DeltaReady of + 1 -> one_if(InRamAfter); + -1 -> -one_if(InRamBefore); + 0 when ReadyMsgPaged -> DeltaRam; + 0 -> 0 + end, + DeltaPersistent = DeltaTotal * one_if(MsgStatus#msg_status.is_persistent), + State#vqstate{len = ReadyCount + DeltaReady, + ram_msg_count = RamReadyCount + DeltaRamReady, + persistent_count = PersistentCount + DeltaPersistent, + bytes = ReadyBytes + DeltaReady * S, + unacked_bytes = UnackedBytes + DeltaUnacked * S, + ram_bytes = RamBytes + DeltaRam * S, + persistent_bytes = PersistentBytes + DeltaPersistent * S}. msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. @@ -1309,8 +1309,8 @@ remove(AckRequired, MsgStatus = #msg_status { false -> {undefined, State} end, State2 = case AckRequired of - false -> counters({-1, 0}, {MsgStatus, none}, State1); - true -> counters({-1, 1}, {MsgStatus, MsgStatus}, State1) + false -> stats({-1, 0}, {MsgStatus, none}, State1); + true -> stats({-1, 1}, {MsgStatus, MsgStatus}, State1) end, {AckTag, maybe_update_rates( State2 #vqstate {out_counter = OutCount + 1, @@ -1483,10 +1483,10 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, end end. -%% First parameter = UpdateCounters +%% First parameter = UpdateStats remove_pending_ack(true, SeqId, State) -> {MsgStatus, State1} = remove_pending_ack(false, SeqId, State), - {MsgStatus, counters({0, -1}, {MsgStatus, none}, State1)}; + {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)}; remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, disk_pending_ack = DPA, qi_pending_ack = QPA}) -> @@ -1601,14 +1601,14 @@ msgs_and_indices_written_to_disk(Callback, MsgIdSet) -> publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> {Msg, State1} = read_msg(MsgStatus, State), MsgStatus1 = MsgStatus#msg_status { msg = Msg }, - {MsgStatus1, counters({1, -1}, {MsgStatus, MsgStatus1}, State1)}; + {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, State1)}; publish_alpha(MsgStatus, State) -> - {MsgStatus, counters({1, -1}, {MsgStatus, MsgStatus}, State)}. + {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}. publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - {MsgStatus2, counters({1, -1}, {MsgStatus, MsgStatus2}, State1)}. + {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}. %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> @@ -1645,7 +1645,7 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - counters({1, -1}, {MsgStatus, none}, State2)} + stats({1, -1}, {MsgStatus, none}, State2)} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 @@ -1808,10 +1808,9 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, MsgStatus2 = m(trim_msg_status(MsgStatus1)), DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), limit_ram_acks(Quota - 1, - counters( - {0, 0}, {MsgStatus, MsgStatus2}, - State1 #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1 })) + stats({0, 0}, {MsgStatus, MsgStatus2}, + State1 #vqstate { ram_pending_ack = RPA1, + disk_pending_ack = DPA1 })) end. permitted_beta_count(#vqstate { len = 0 }) -> @@ -1952,7 +1951,7 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = counters( + State2 = stats( ready0, {MsgStatus, MsgStatus2}, State1), State3 = Consumer(MsgStatus2, Qa, State2), push_alphas_to_betas(Generator, Consumer, Quota - 1, @@ -2007,7 +2006,7 @@ push_betas_to_deltas1(Generator, Limit, Q, {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> {#msg_status { index_on_disk = true }, IndexState1} = maybe_write_index_to_disk(true, MsgStatus, IndexState), - State1 = counters(ready0, {MsgStatus, none}, State), + State1 = stats(ready0, {MsgStatus, none}, State), Delta1 = expand_delta(SeqId, Delta), push_betas_to_deltas1(Generator, Limit, Qa, {Quota - 1, Delta1, IndexState1, State1}) |
