summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-23 14:45:09 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-23 14:45:09 +0000
commitbacb167aa7e69900ee812ace37efc7d6045005c7 (patch)
treebe0d1a32ff08ff9828b46d570636b7f80870e5cf
parent358ccf68180357b4471ce06f87828b9f4d89a214 (diff)
parent6c6c29cffe250fce54f878db6ddc93fd1e21b15d (diff)
downloadrabbitmq-server-git-bacb167aa7e69900ee812ace37efc7d6045005c7.tar.gz
Merge bug26499
-rw-r--r--src/rabbit_variable_queue.erl266
1 files changed, 107 insertions, 159 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index be3404c91f..acdd4c7521 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -544,38 +544,19 @@ delete_and_terminate(_Reason, State) ->
delete_crashed(#amqqueue{name = QName}) ->
ok = rabbit_queue_index:erase(QName).
-purge(State = #vqstate { q4 = Q4,
- index_state = IndexState,
- msg_store_clients = MSCState,
- len = Len }) ->
+purge(State = #vqstate { q4 = Q4,
+ len = Len }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- Stats = {0, 0, 0},
- {Stats1, IndexState1} =
- remove_queue_entries(Q4, Stats, IndexState, MSCState),
-
- {Stats2, State1 = #vqstate { q1 = Q1,
- index_state = IndexState2,
- msg_store_clients = MSCState1,
- ram_bytes = RamBytes,
- persistent_count = PCount,
- persistent_bytes = PBytes }} =
- purge_betas_and_deltas(
- Stats1, State #vqstate { q4 = ?QUEUE:new(),
- index_state = IndexState1 }),
-
- {{RamBytesDec, PCountDec, PBytesDec}, IndexState3} =
- remove_queue_entries(Q1, Stats2, IndexState2, MSCState1),
-
- {Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
- index_state = IndexState3,
- len = 0,
- bytes = 0,
- ram_msg_count = 0,
- ram_bytes = RamBytes - RamBytesDec,
- persistent_count = PCount - PCountDec,
- persistent_bytes = PBytes - PBytesDec })}.
+ State1 = remove_queue_entries(Q4, State),
+
+ State2 = #vqstate { q1 = Q1 } =
+ purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }),
+
+ State3 = remove_queue_entries(Q1, State2),
+
+ {Len, a(State3 #vqstate { q1 = ?QUEUE:new() })}.
purge_acks(State) -> a(purge_pending_ack(false, State)).
@@ -584,9 +565,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
IsDelivered, _ChPid, _Flow,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
- len = Len,
in_counter = InCount,
- persistent_count = PCount,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
@@ -597,15 +576,11 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
end,
InCount1 = InCount + 1,
- PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = upd_bytes(
- 1, 0, MsgStatus1,
- inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount1,
- persistent_count = PCount1,
- unconfirmed = UC1 })),
+ State3 = stats({1, 0}, {none, MsgStatus1},
+ State2#vqstate{ next_seq_id = SeqId + 1,
+ in_counter = InCount1,
+ unconfirmed = UC1 }),
a(reduce_memory_use(maybe_update_rates(State3))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
@@ -616,21 +591,18 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
State = #vqstate { next_seq_id = SeqId,
out_counter = OutCount,
in_counter = InCount,
- persistent_count = PCount,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
- PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = upd_bytes(0, 1, MsgStatus,
- State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- unconfirmed = UC1 }),
+ State3 = stats({0, 1}, {none, MsgStatus1},
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ unconfirmed = UC1 }),
{SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
discard(_MsgId, _ChPid, _Flow, State) -> State.
@@ -1203,10 +1175,9 @@ in_r(MsgStatus = #msg_status { msg = undefined },
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- upd_ram_bytes_count(
- 1, MsgStatus,
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
- msg = Msg }, Q4a) })
+ MsgStatus1 = MsgStatus#msg_status{msg = Msg},
+ stats(ready0, {MsgStatus, MsgStatus1},
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) })
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1234,31 +1205,50 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
msg_store_read(MSCState, IsPersistent, MsgId),
{Msg, State #vqstate {msg_store_clients = MSCState1}}.
-inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
- State#vqstate{ram_msg_count = RamMsgCount + 1}.
-
-upd_bytes(SignReady, SignUnacked,
- MsgStatus = #msg_status{msg = undefined}, State) ->
- upd_bytes0(SignReady, SignUnacked, MsgStatus, State);
-upd_bytes(SignReady, SignUnacked, MsgStatus = #msg_status{msg = _}, State) ->
- upd_ram_bytes(SignReady + SignUnacked, MsgStatus,
- upd_bytes0(SignReady, SignUnacked, MsgStatus, State)).
-
-upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP},
- State = #vqstate{bytes = Bytes,
- unacked_bytes = UBytes,
- persistent_bytes = PBytes}) ->
+stats(Signs, Statuses, State) ->
+ stats0(expand_signs(Signs), expand_statuses(Statuses), State).
+
+expand_signs(ready0) -> {0, 0, true};
+expand_signs({A, B}) -> {A, B, false}.
+
+expand_statuses({none, A}) -> {false, msg_in_ram(A), A};
+expand_statuses({B, none}) -> {msg_in_ram(B), false, B};
+expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}.
+
+%% In this function at least, we are religious: the variable name
+%% contains "Ready" or "Unacked" iff that is what it counts. If
+%% neither is present it counts both.
+stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged},
+ {InRamBefore, InRamAfter, MsgStatus},
+ State = #vqstate{len = ReadyCount,
+ bytes = ReadyBytes,
+ ram_msg_count = RamReadyCount,
+ persistent_count = PersistentCount,
+ unacked_bytes = UnackedBytes,
+ ram_bytes = RamBytes,
+ persistent_bytes = PersistentBytes}) ->
S = msg_size(MsgStatus),
- SignTotal = SignReady + SignUnacked,
- State#vqstate{bytes = Bytes + SignReady * S,
- unacked_bytes = UBytes + SignUnacked * S,
- persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}.
-
-upd_ram_bytes_count(Sign, MsgStatus, State = #vqstate{ram_msg_count = Count}) ->
- upd_ram_bytes(Sign, MsgStatus, State#vqstate{ram_msg_count = Count + Sign}).
-
-upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
- State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}.
+ DeltaTotal = DeltaReady + DeltaUnacked,
+ DeltaRam = case {InRamBefore, InRamAfter} of
+ {false, false} -> 0;
+ {false, true} -> 1;
+ {true, false} -> -1;
+ {true, true} -> 0
+ end,
+ DeltaRamReady = case DeltaReady of
+ 1 -> one_if(InRamAfter);
+ -1 -> -one_if(InRamBefore);
+ 0 when ReadyMsgPaged -> DeltaRam;
+ 0 -> 0
+ end,
+ DeltaPersistent = DeltaTotal * one_if(MsgStatus#msg_status.is_persistent),
+ State#vqstate{len = ReadyCount + DeltaReady,
+ ram_msg_count = RamReadyCount + DeltaRamReady,
+ persistent_count = PersistentCount + DeltaPersistent,
+ bytes = ReadyBytes + DeltaReady * S,
+ unacked_bytes = UnackedBytes + DeltaUnacked * S,
+ ram_bytes = RamBytes + DeltaRam * S,
+ persistent_bytes = PersistentBytes + DeltaPersistent * S}.
msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
@@ -1271,12 +1261,9 @@ remove(AckRequired, MsgStatus = #msg_status {
is_delivered = IsDelivered,
msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk },
- State = #vqstate {ram_msg_count = RamMsgCount,
- out_counter = OutCount,
+ State = #vqstate {out_counter = OutCount,
index_state = IndexState,
- msg_store_clients = MSCState,
- len = Len,
- persistent_count = PCount}) ->
+ msg_store_clients = MSCState}) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -1302,62 +1289,46 @@ remove(AckRequired, MsgStatus = #msg_status {
{SeqId, StateN};
false -> {undefined, State}
end,
-
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- RamMsgCount1 = RamMsgCount - one_if(msg_in_ram(MsgStatus)),
State2 = case AckRequired of
- false -> upd_bytes(-1, 0, MsgStatus, State1);
- true -> upd_bytes(-1, 1, MsgStatus, State1)
+ false -> stats({-1, 0}, {MsgStatus, none}, State1);
+ true -> stats({-1, 1}, {MsgStatus, MsgStatus}, State1)
end,
{AckTag, maybe_update_rates(
- State2 #vqstate {ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len - 1,
- persistent_count = PCount1})}.
-
-purge_betas_and_deltas(Stats,
- State = #vqstate { q3 = Q3,
- index_state = IndexState,
- msg_store_clients = MSCState }) ->
+ State2 #vqstate {out_counter = OutCount + 1,
+ index_state = IndexState2})}.
+
+purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) ->
case ?QUEUE:is_empty(Q3) of
- true -> {Stats, State};
- false -> {Stats1, IndexState1} = remove_queue_entries(
- Q3, Stats, IndexState, MSCState),
- purge_betas_and_deltas(Stats1,
- maybe_deltas_to_betas(
- State #vqstate {
- q3 = ?QUEUE:new(),
- index_state = IndexState1 }))
+ true -> State;
+ false -> State1 = remove_queue_entries(Q3, State),
+ purge_betas_and_deltas(maybe_deltas_to_betas(
+ State1#vqstate{q3 = ?QUEUE:new()}))
end.
-remove_queue_entries(Q, {RamBytesDec, PCountDec, PBytesDec},
- IndexState, MSCState) ->
- {MsgIdsByStore, RamBytesDec1, PCountDec1, PBytesDec1, Delivers, Acks} =
+remove_queue_entries(Q, State = #vqstate{index_state = IndexState,
+ msg_store_clients = MSCState}) ->
+ {MsgIdsByStore, Delivers, Acks, State1} =
?QUEUE:foldl(fun remove_queue_entries1/2,
- {orddict:new(), RamBytesDec, PCountDec, PBytesDec, [], []}, Q),
+ {orddict:new(), [], [], State}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {{RamBytesDec1, PCountDec1, PBytesDec1},
- rabbit_queue_index:ack(Acks,
- rabbit_queue_index:deliver(Delivers, IndexState))}.
+ IndexState1 = rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState)),
+ State1#vqstate{index_state = IndexState1}.
remove_queue_entries1(
- #msg_status { msg_id = MsgId, seq_id = SeqId,
- is_delivered = IsDelivered, msg_in_store = MsgInStore,
- index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
- msg_props = #message_properties { size = Size } } = MsgStatus,
- {MsgIdsByStore, RamBytesDec, PCountDec, PBytesDec, Delivers, Acks}) ->
+ #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
+ msg_in_store = MsgInStore, index_on_disk = IndexOnDisk,
+ is_persistent = IsPersistent} = MsgStatus,
+ {MsgIdsByStore, Delivers, Acks, State}) ->
{case MsgInStore of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
- RamBytesDec + Size * one_if(msg_in_ram(MsgStatus)),
- PCountDec + one_if(IsPersistent),
- PBytesDec + Size * one_if(IsPersistent),
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
- cons_if(IndexOnDisk, SeqId, Acks)}.
+ cons_if(IndexOnDisk, SeqId, Acks),
+ stats({-1, 0}, {MsgStatus, none}, State)}.
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
@@ -1483,13 +1454,10 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
end
end.
-%% First parameter = UpdatePersistentCount
+%% First parameter = UpdateStats
remove_pending_ack(true, SeqId, State) ->
- {MsgStatus, State1 = #vqstate { persistent_count = PCount }} =
- remove_pending_ack(false, SeqId, State),
- PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent),
- {MsgStatus, upd_bytes(0, -1, MsgStatus,
- State1 # vqstate{ persistent_count = PCount1 })};
+ {MsgStatus, State1} = remove_pending_ack(false, SeqId, State),
+ {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)};
remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
disk_pending_ack = DPA,
qi_pending_ack = QPA}) ->
@@ -1603,24 +1571,15 @@ msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
{Msg, State1} = read_msg(MsgStatus, State),
- {MsgStatus#msg_status { msg = Msg },
- upd_ram_bytes_count(1, MsgStatus, State1)}; %% [1]
+ MsgStatus1 = MsgStatus#msg_status { msg = Msg },
+ {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, State1)};
publish_alpha(MsgStatus, State) ->
- {MsgStatus, inc_ram_msg_count(State)}.
-%% [1] We increase the ram_bytes here because we paged the message in
-%% to requeue it, not purely because we requeued it. Hence in the
-%% second head it's already accounted for as already in memory. OTOH
-%% ram_msg_count does not include unacked messages, so it needs
-%% incrementing in both heads.
+ {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}.
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- {MsgStatus2, case {msg_in_ram(MsgStatus1), msg_in_ram(MsgStatus2)} of
- {true, false} -> upd_ram_bytes(-1, MsgStatus, State1);
- {_, true} -> inc_ram_msg_count(State1);
- _ -> State1
- end}.
+ {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
@@ -1642,7 +1601,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
PubFun(MsgStatus, State1),
queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, upd_bytes(1, -1, MsgStatus, State2))
+ Limit, PubFun, State2)
end;
queue_merge(SeqIds, Q, Front, MsgIds,
_Limit, _PubFun, State) ->
@@ -1656,13 +1615,8 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
- State3 =
- case msg_in_ram(MsgStatus) of
- false -> State2;
- true -> upd_ram_bytes(-1, MsgStatus, State2)
- end,
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
- upd_bytes(1, -1, MsgStatus, State3)}
+ stats({1, -1}, {MsgStatus, none}, State2)}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
@@ -1822,12 +1776,12 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
{SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
- DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
limit_ram_acks(Quota - 1,
- upd_ram_bytes(
- -1, MsgStatus1,
- State1 #vqstate { ram_pending_ack = RPA1,
- disk_pending_ack = DPA1 }))
+ stats({0, 0}, {MsgStatus, MsgStatus2},
+ State1 #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1 }))
end.
permitted_beta_count(#vqstate { len = 0 }) ->
@@ -1968,11 +1922,8 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = case msg_in_ram(MsgStatus2) of
- false -> upd_ram_bytes_count(
- -1, MsgStatus2, State1);
- true -> State1
- end,
+ State2 = stats(
+ ready0, {MsgStatus, MsgStatus2}, State1),
State3 = Consumer(MsgStatus2, Qa, State2),
push_alphas_to_betas(Generator, Consumer, Quota - 1,
Qa, State3)
@@ -2026,10 +1977,7 @@ push_betas_to_deltas1(Generator, Limit, Q,
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(true, MsgStatus, IndexState),
- State1 = case msg_in_ram(MsgStatus) of
- false -> State;
- true -> upd_ram_bytes_count(-1, MsgStatus, State)
- end,
+ State1 = stats(ready0, {MsgStatus, none}, State),
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
{Quota - 1, Delta1, IndexState1, State1})