summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl188
1 files changed, 89 insertions, 99 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index be3404c91f..f1a4bb3f02 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -584,9 +584,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
IsDelivered, _ChPid, _Flow,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
- len = Len,
in_counter = InCount,
- persistent_count = PCount,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
@@ -597,15 +595,11 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
end,
InCount1 = InCount + 1,
- PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = upd_bytes(
- 1, 0, MsgStatus1,
- inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount1,
- persistent_count = PCount1,
- unconfirmed = UC1 })),
+ State3 = counters({1, 0}, {none, MsgStatus1},
+ State2#vqstate{ next_seq_id = SeqId + 1,
+ in_counter = InCount1,
+ unconfirmed = UC1 }),
a(reduce_memory_use(maybe_update_rates(State3))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
@@ -616,21 +610,18 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
State = #vqstate { next_seq_id = SeqId,
out_counter = OutCount,
in_counter = InCount,
- persistent_count = PCount,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
- PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = upd_bytes(0, 1, MsgStatus,
- State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- unconfirmed = UC1 }),
+ State3 = counters({0, 1}, {none, MsgStatus1},
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ unconfirmed = UC1 }),
{SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
discard(_MsgId, _ChPid, _Flow, State) -> State.
@@ -1203,10 +1194,9 @@ in_r(MsgStatus = #msg_status { msg = undefined },
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- upd_ram_bytes_count(
- 1, MsgStatus,
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
- msg = Msg }, Q4a) })
+ MsgStatus1 = MsgStatus#msg_status{msg = Msg},
+ counters(ready0, {MsgStatus, MsgStatus1},
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) })
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1234,31 +1224,62 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
msg_store_read(MSCState, IsPersistent, MsgId),
{Msg, State #vqstate {msg_store_clients = MSCState1}}.
-inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
- State#vqstate{ram_msg_count = RamMsgCount + 1}.
-
-upd_bytes(SignReady, SignUnacked,
- MsgStatus = #msg_status{msg = undefined}, State) ->
- upd_bytes0(SignReady, SignUnacked, MsgStatus, State);
-upd_bytes(SignReady, SignUnacked, MsgStatus = #msg_status{msg = _}, State) ->
- upd_ram_bytes(SignReady + SignUnacked, MsgStatus,
- upd_bytes0(SignReady, SignUnacked, MsgStatus, State)).
-
-upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP},
- State = #vqstate{bytes = Bytes,
- unacked_bytes = UBytes,
- persistent_bytes = PBytes}) ->
+counters(Signs, Statuses, State) ->
+ counters0(expand_signs(Signs), expand_statuses(Statuses), State).
+
+expand_signs(ready0) -> {0, 0, ready};
+expand_signs(unacked0) -> {0, 0, unacked};
+expand_signs({A, B}) -> {A, B, no_hint}.
+
+expand_statuses({none, A}) -> {false, msg_in_ram(A), A};
+expand_statuses({B, none}) -> {msg_in_ram(B), false, B};
+expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}.
+
+%% In this function at least, we are religious: the variable name
+%% contains "Ready" or "Unacked" iff that is what it counts. If
+%% neither is present it counts both.
+counters0({SignReady, SignUnacked, RamReadyHint},
+ {InRamBefore, InRamAfter, MsgStatus},
+ State = #vqstate{len = ReadyCount,
+ bytes = ReadyBytes,
+ ram_msg_count = RamReadyCount,
+ persistent_count = PersistentCount,
+ unacked_bytes = UnackedBytes,
+ ram_bytes = RamBytes,
+ persistent_bytes = PersistentBytes}) ->
S = msg_size(MsgStatus),
SignTotal = SignReady + SignUnacked,
- State#vqstate{bytes = Bytes + SignReady * S,
- unacked_bytes = UBytes + SignUnacked * S,
- persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}.
-
-upd_ram_bytes_count(Sign, MsgStatus, State = #vqstate{ram_msg_count = Count}) ->
- upd_ram_bytes(Sign, MsgStatus, State#vqstate{ram_msg_count = Count + Sign}).
-
-upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
- State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}.
+ %% TODO there has got to be a simpler way!
+ SignRamReady =
+ case {SignReady, SignUnacked} of
+ {0, 0} -> case {InRamBefore, InRamAfter, RamReadyHint} of
+ {false, false, _} -> 0;
+ {true, true, _} -> 0;
+ {false, true, unacked} -> 0;
+ {true, false, unacked} -> 0;
+ {false, true, ready} -> 1;
+ {true, false, ready} -> -1;
+ {false, true, no_hint} -> -1;
+ {true, false, no_hint} -> 1
+ end;
+ {1, _} -> one_if(InRamAfter);
+ {-1, _} -> -one_if(InRamBefore);
+ {0, _} -> 0
+ end,
+ SignRam = case {InRamBefore, InRamAfter} of
+ {false, false} -> 0;
+ {false, true} -> 1;
+ {true, false} -> -1;
+ {true, true} -> 0
+ end,
+ SignPersistent = SignTotal * one_if(MsgStatus#msg_status.is_persistent),
+ State#vqstate{len = ReadyCount + SignReady,
+ ram_msg_count = RamReadyCount + SignRamReady,
+ persistent_count = PersistentCount + SignPersistent,
+ bytes = ReadyBytes + SignReady * S,
+ unacked_bytes = UnackedBytes + SignUnacked * S,
+ ram_bytes = RamBytes + SignRam * S,
+ persistent_bytes = PersistentBytes + SignPersistent * S}.
msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
@@ -1271,12 +1292,9 @@ remove(AckRequired, MsgStatus = #msg_status {
is_delivered = IsDelivered,
msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk },
- State = #vqstate {ram_msg_count = RamMsgCount,
- out_counter = OutCount,
+ State = #vqstate {out_counter = OutCount,
index_state = IndexState,
- msg_store_clients = MSCState,
- len = Len,
- persistent_count = PCount}) ->
+ msg_store_clients = MSCState}) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -1302,19 +1320,13 @@ remove(AckRequired, MsgStatus = #msg_status {
{SeqId, StateN};
false -> {undefined, State}
end,
-
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- RamMsgCount1 = RamMsgCount - one_if(msg_in_ram(MsgStatus)),
State2 = case AckRequired of
- false -> upd_bytes(-1, 0, MsgStatus, State1);
- true -> upd_bytes(-1, 1, MsgStatus, State1)
+ false -> counters({-1, 0}, {MsgStatus, none}, State1);
+ true -> counters({-1, 1}, {MsgStatus, MsgStatus}, State1)
end,
{AckTag, maybe_update_rates(
- State2 #vqstate {ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len - 1,
- persistent_count = PCount1})}.
+ State2 #vqstate {out_counter = OutCount + 1,
+ index_state = IndexState2})}.
purge_betas_and_deltas(Stats,
State = #vqstate { q3 = Q3,
@@ -1483,13 +1495,10 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
end
end.
-%% First parameter = UpdatePersistentCount
+%% First parameter = UpdateCounters
remove_pending_ack(true, SeqId, State) ->
- {MsgStatus, State1 = #vqstate { persistent_count = PCount }} =
- remove_pending_ack(false, SeqId, State),
- PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent),
- {MsgStatus, upd_bytes(0, -1, MsgStatus,
- State1 # vqstate{ persistent_count = PCount1 })};
+ {MsgStatus, State1} = remove_pending_ack(false, SeqId, State),
+ {MsgStatus, counters({0, -1}, {MsgStatus, none}, State1)};
remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
disk_pending_ack = DPA,
qi_pending_ack = QPA}) ->
@@ -1603,24 +1612,15 @@ msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
{Msg, State1} = read_msg(MsgStatus, State),
- {MsgStatus#msg_status { msg = Msg },
- upd_ram_bytes_count(1, MsgStatus, State1)}; %% [1]
+ MsgStatus1 = MsgStatus#msg_status { msg = Msg },
+ {MsgStatus1, counters({1, -1}, {MsgStatus, MsgStatus1}, State1)};
publish_alpha(MsgStatus, State) ->
- {MsgStatus, inc_ram_msg_count(State)}.
-%% [1] We increase the ram_bytes here because we paged the message in
-%% to requeue it, not purely because we requeued it. Hence in the
-%% second head it's already accounted for as already in memory. OTOH
-%% ram_msg_count does not include unacked messages, so it needs
-%% incrementing in both heads.
+ {MsgStatus, counters({1, -1}, {MsgStatus, MsgStatus}, State)}.
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- {MsgStatus2, case {msg_in_ram(MsgStatus1), msg_in_ram(MsgStatus2)} of
- {true, false} -> upd_ram_bytes(-1, MsgStatus, State1);
- {_, true} -> inc_ram_msg_count(State1);
- _ -> State1
- end}.
+ {MsgStatus2, counters({1, -1}, {MsgStatus, MsgStatus2}, State1)}.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
@@ -1642,7 +1642,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
PubFun(MsgStatus, State1),
queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, upd_bytes(1, -1, MsgStatus, State2))
+ Limit, PubFun, State2)
end;
queue_merge(SeqIds, Q, Front, MsgIds,
_Limit, _PubFun, State) ->
@@ -1656,13 +1656,8 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
- State3 =
- case msg_in_ram(MsgStatus) of
- false -> State2;
- true -> upd_ram_bytes(-1, MsgStatus, State2)
- end,
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
- upd_bytes(1, -1, MsgStatus, State3)}
+ counters({1, -1}, {MsgStatus, none}, State2)}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
@@ -1822,10 +1817,11 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
{SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
- DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
limit_ram_acks(Quota - 1,
- upd_ram_bytes(
- -1, MsgStatus1,
+ counters(
+ unacked0, {MsgStatus, MsgStatus2},
State1 #vqstate { ram_pending_ack = RPA1,
disk_pending_ack = DPA1 }))
end.
@@ -1968,11 +1964,8 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = case msg_in_ram(MsgStatus2) of
- false -> upd_ram_bytes_count(
- -1, MsgStatus2, State1);
- true -> State1
- end,
+ State2 = counters(
+ ready0, {MsgStatus, MsgStatus2}, State1),
State3 = Consumer(MsgStatus2, Qa, State2),
push_alphas_to_betas(Generator, Consumer, Quota - 1,
Qa, State3)
@@ -2024,12 +2017,9 @@ push_betas_to_deltas1(Generator, Limit, Q,
when SeqId < Limit ->
{Q, PushState};
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
- {#msg_status { index_on_disk = true }, IndexState1} =
+ {#msg_status { index_on_disk = true } = MsgStatus1, IndexState1} =
maybe_write_index_to_disk(true, MsgStatus, IndexState),
- State1 = case msg_in_ram(MsgStatus) of
- false -> State;
- true -> upd_ram_bytes_count(-1, MsgStatus, State)
- end,
+ State1 = counters(ready0, {MsgStatus, none}, State),
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
{Quota - 1, Delta1, IndexState1, State1})