diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-23 14:45:09 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-23 14:45:09 +0000 |
| commit | bacb167aa7e69900ee812ace37efc7d6045005c7 (patch) | |
| tree | be0d1a32ff08ff9828b46d570636b7f80870e5cf | |
| parent | 358ccf68180357b4471ce06f87828b9f4d89a214 (diff) | |
| parent | 6c6c29cffe250fce54f878db6ddc93fd1e21b15d (diff) | |
| download | rabbitmq-server-git-bacb167aa7e69900ee812ace37efc7d6045005c7.tar.gz | |
Merge bug26499
| -rw-r--r-- | src/rabbit_variable_queue.erl | 266 |
1 files changed, 107 insertions, 159 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index be3404c91f..acdd4c7521 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -544,38 +544,19 @@ delete_and_terminate(_Reason, State) -> delete_crashed(#amqqueue{name = QName}) -> ok = rabbit_queue_index:erase(QName). -purge(State = #vqstate { q4 = Q4, - index_state = IndexState, - msg_store_clients = MSCState, - len = Len }) -> +purge(State = #vqstate { q4 = Q4, + len = Len }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - Stats = {0, 0, 0}, - {Stats1, IndexState1} = - remove_queue_entries(Q4, Stats, IndexState, MSCState), - - {Stats2, State1 = #vqstate { q1 = Q1, - index_state = IndexState2, - msg_store_clients = MSCState1, - ram_bytes = RamBytes, - persistent_count = PCount, - persistent_bytes = PBytes }} = - purge_betas_and_deltas( - Stats1, State #vqstate { q4 = ?QUEUE:new(), - index_state = IndexState1 }), - - {{RamBytesDec, PCountDec, PBytesDec}, IndexState3} = - remove_queue_entries(Q1, Stats2, IndexState2, MSCState1), - - {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), - index_state = IndexState3, - len = 0, - bytes = 0, - ram_msg_count = 0, - ram_bytes = RamBytes - RamBytesDec, - persistent_count = PCount - PCountDec, - persistent_bytes = PBytes - PBytesDec })}. + State1 = remove_queue_entries(Q4, State), + + State2 = #vqstate { q1 = Q1 } = + purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }), + + State3 = remove_queue_entries(Q1, State2), + + {Len, a(State3 #vqstate { q1 = ?QUEUE:new() })}. purge_acks(State) -> a(purge_pending_ack(false, State)). @@ -584,9 +565,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, IsDelivered, _ChPid, _Flow, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, - len = Len, in_counter = InCount, - persistent_count = PCount, durable = IsDurable, unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, @@ -597,15 +576,11 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } end, InCount1 = InCount + 1, - PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = upd_bytes( - 1, 0, MsgStatus1, - inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount1, - persistent_count = PCount1, - unconfirmed = UC1 })), + State3 = stats({1, 0}, {none, MsgStatus1}, + State2#vqstate{ next_seq_id = SeqId + 1, + in_counter = InCount1, + unconfirmed = UC1 }), a(reduce_memory_use(maybe_update_rates(State3))). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, @@ -616,21 +591,18 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, State = #vqstate { next_seq_id = SeqId, out_counter = OutCount, in_counter = InCount, - persistent_count = PCount, durable = IsDurable, unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), - PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = upd_bytes(0, 1, MsgStatus, - State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - unconfirmed = UC1 }), + State3 = stats({0, 1}, {none, MsgStatus1}, + State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + unconfirmed = UC1 }), {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}. discard(_MsgId, _ChPid, _Flow, State) -> State. @@ -1203,10 +1175,9 @@ in_r(MsgStatus = #msg_status { msg = undefined }, true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - upd_ram_bytes_count( - 1, MsgStatus, - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { - msg = Msg }, Q4a) }) + MsgStatus1 = MsgStatus#msg_status{msg = Msg}, + stats(ready0, {MsgStatus, MsgStatus1}, + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }) end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. @@ -1234,31 +1205,50 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> msg_store_read(MSCState, IsPersistent, MsgId), {Msg, State #vqstate {msg_store_clients = MSCState1}}. -inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> - State#vqstate{ram_msg_count = RamMsgCount + 1}. - -upd_bytes(SignReady, SignUnacked, - MsgStatus = #msg_status{msg = undefined}, State) -> - upd_bytes0(SignReady, SignUnacked, MsgStatus, State); -upd_bytes(SignReady, SignUnacked, MsgStatus = #msg_status{msg = _}, State) -> - upd_ram_bytes(SignReady + SignUnacked, MsgStatus, - upd_bytes0(SignReady, SignUnacked, MsgStatus, State)). - -upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP}, - State = #vqstate{bytes = Bytes, - unacked_bytes = UBytes, - persistent_bytes = PBytes}) -> +stats(Signs, Statuses, State) -> + stats0(expand_signs(Signs), expand_statuses(Statuses), State). + +expand_signs(ready0) -> {0, 0, true}; +expand_signs({A, B}) -> {A, B, false}. + +expand_statuses({none, A}) -> {false, msg_in_ram(A), A}; +expand_statuses({B, none}) -> {msg_in_ram(B), false, B}; +expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}. + +%% In this function at least, we are religious: the variable name +%% contains "Ready" or "Unacked" iff that is what it counts. If +%% neither is present it counts both. +stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged}, + {InRamBefore, InRamAfter, MsgStatus}, + State = #vqstate{len = ReadyCount, + bytes = ReadyBytes, + ram_msg_count = RamReadyCount, + persistent_count = PersistentCount, + unacked_bytes = UnackedBytes, + ram_bytes = RamBytes, + persistent_bytes = PersistentBytes}) -> S = msg_size(MsgStatus), - SignTotal = SignReady + SignUnacked, - State#vqstate{bytes = Bytes + SignReady * S, - unacked_bytes = UBytes + SignUnacked * S, - persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}. - -upd_ram_bytes_count(Sign, MsgStatus, State = #vqstate{ram_msg_count = Count}) -> - upd_ram_bytes(Sign, MsgStatus, State#vqstate{ram_msg_count = Count + Sign}). - -upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) -> - State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}. + DeltaTotal = DeltaReady + DeltaUnacked, + DeltaRam = case {InRamBefore, InRamAfter} of + {false, false} -> 0; + {false, true} -> 1; + {true, false} -> -1; + {true, true} -> 0 + end, + DeltaRamReady = case DeltaReady of + 1 -> one_if(InRamAfter); + -1 -> -one_if(InRamBefore); + 0 when ReadyMsgPaged -> DeltaRam; + 0 -> 0 + end, + DeltaPersistent = DeltaTotal * one_if(MsgStatus#msg_status.is_persistent), + State#vqstate{len = ReadyCount + DeltaReady, + ram_msg_count = RamReadyCount + DeltaRamReady, + persistent_count = PersistentCount + DeltaPersistent, + bytes = ReadyBytes + DeltaReady * S, + unacked_bytes = UnackedBytes + DeltaUnacked * S, + ram_bytes = RamBytes + DeltaRam * S, + persistent_bytes = PersistentBytes + DeltaPersistent * S}. msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. @@ -1271,12 +1261,9 @@ remove(AckRequired, MsgStatus = #msg_status { is_delivered = IsDelivered, msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }, - State = #vqstate {ram_msg_count = RamMsgCount, - out_counter = OutCount, + State = #vqstate {out_counter = OutCount, index_state = IndexState, - msg_store_clients = MSCState, - len = Len, - persistent_count = PCount}) -> + msg_store_clients = MSCState}) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -1302,62 +1289,46 @@ remove(AckRequired, MsgStatus = #msg_status { {SeqId, StateN}; false -> {undefined, State} end, - - PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - RamMsgCount1 = RamMsgCount - one_if(msg_in_ram(MsgStatus)), State2 = case AckRequired of - false -> upd_bytes(-1, 0, MsgStatus, State1); - true -> upd_bytes(-1, 1, MsgStatus, State1) + false -> stats({-1, 0}, {MsgStatus, none}, State1); + true -> stats({-1, 1}, {MsgStatus, MsgStatus}, State1) end, {AckTag, maybe_update_rates( - State2 #vqstate {ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len - 1, - persistent_count = PCount1})}. - -purge_betas_and_deltas(Stats, - State = #vqstate { q3 = Q3, - index_state = IndexState, - msg_store_clients = MSCState }) -> + State2 #vqstate {out_counter = OutCount + 1, + index_state = IndexState2})}. + +purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) -> case ?QUEUE:is_empty(Q3) of - true -> {Stats, State}; - false -> {Stats1, IndexState1} = remove_queue_entries( - Q3, Stats, IndexState, MSCState), - purge_betas_and_deltas(Stats1, - maybe_deltas_to_betas( - State #vqstate { - q3 = ?QUEUE:new(), - index_state = IndexState1 })) + true -> State; + false -> State1 = remove_queue_entries(Q3, State), + purge_betas_and_deltas(maybe_deltas_to_betas( + State1#vqstate{q3 = ?QUEUE:new()})) end. -remove_queue_entries(Q, {RamBytesDec, PCountDec, PBytesDec}, - IndexState, MSCState) -> - {MsgIdsByStore, RamBytesDec1, PCountDec1, PBytesDec1, Delivers, Acks} = +remove_queue_entries(Q, State = #vqstate{index_state = IndexState, + msg_store_clients = MSCState}) -> + {MsgIdsByStore, Delivers, Acks, State1} = ?QUEUE:foldl(fun remove_queue_entries1/2, - {orddict:new(), RamBytesDec, PCountDec, PBytesDec, [], []}, Q), + {orddict:new(), [], [], State}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {{RamBytesDec1, PCountDec1, PBytesDec1}, - rabbit_queue_index:ack(Acks, - rabbit_queue_index:deliver(Delivers, IndexState))}. + IndexState1 = rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState)), + State1#vqstate{index_state = IndexState1}. remove_queue_entries1( - #msg_status { msg_id = MsgId, seq_id = SeqId, - is_delivered = IsDelivered, msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk, is_persistent = IsPersistent, - msg_props = #message_properties { size = Size } } = MsgStatus, - {MsgIdsByStore, RamBytesDec, PCountDec, PBytesDec, Delivers, Acks}) -> + #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk, + is_persistent = IsPersistent} = MsgStatus, + {MsgIdsByStore, Delivers, Acks, State}) -> {case MsgInStore of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, - RamBytesDec + Size * one_if(msg_in_ram(MsgStatus)), - PCountDec + one_if(IsPersistent), - PBytesDec + Size * one_if(IsPersistent), cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), - cons_if(IndexOnDisk, SeqId, Acks)}. + cons_if(IndexOnDisk, SeqId, Acks), + stats({-1, 0}, {MsgStatus, none}, State)}. %%---------------------------------------------------------------------------- %% Internal gubbins for publishing @@ -1483,13 +1454,10 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, end end. -%% First parameter = UpdatePersistentCount +%% First parameter = UpdateStats remove_pending_ack(true, SeqId, State) -> - {MsgStatus, State1 = #vqstate { persistent_count = PCount }} = - remove_pending_ack(false, SeqId, State), - PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent), - {MsgStatus, upd_bytes(0, -1, MsgStatus, - State1 # vqstate{ persistent_count = PCount1 })}; + {MsgStatus, State1} = remove_pending_ack(false, SeqId, State), + {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)}; remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, disk_pending_ack = DPA, qi_pending_ack = QPA}) -> @@ -1603,24 +1571,15 @@ msgs_and_indices_written_to_disk(Callback, MsgIdSet) -> publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> {Msg, State1} = read_msg(MsgStatus, State), - {MsgStatus#msg_status { msg = Msg }, - upd_ram_bytes_count(1, MsgStatus, State1)}; %% [1] + MsgStatus1 = MsgStatus#msg_status { msg = Msg }, + {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, State1)}; publish_alpha(MsgStatus, State) -> - {MsgStatus, inc_ram_msg_count(State)}. -%% [1] We increase the ram_bytes here because we paged the message in -%% to requeue it, not purely because we requeued it. Hence in the -%% second head it's already accounted for as already in memory. OTOH -%% ram_msg_count does not include unacked messages, so it needs -%% incrementing in both heads. + {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}. publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - {MsgStatus2, case {msg_in_ram(MsgStatus1), msg_in_ram(MsgStatus2)} of - {true, false} -> upd_ram_bytes(-1, MsgStatus, State1); - {_, true} -> inc_ram_msg_count(State1); - _ -> State1 - end}. + {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}. %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> @@ -1642,7 +1601,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = PubFun(MsgStatus, State1), queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, upd_bytes(1, -1, MsgStatus, State2)) + Limit, PubFun, State2) end; queue_merge(SeqIds, Q, Front, MsgIds, _Limit, _PubFun, State) -> @@ -1656,13 +1615,8 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), - State3 = - case msg_in_ram(MsgStatus) of - false -> State2; - true -> upd_ram_bytes(-1, MsgStatus, State2) - end, {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - upd_bytes(1, -1, MsgStatus, State3)} + stats({1, -1}, {MsgStatus, none}, State2)} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 @@ -1822,12 +1776,12 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA), {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA), + MsgStatus2 = m(trim_msg_status(MsgStatus1)), + DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), limit_ram_acks(Quota - 1, - upd_ram_bytes( - -1, MsgStatus1, - State1 #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1 })) + stats({0, 0}, {MsgStatus, MsgStatus2}, + State1 #vqstate { ram_pending_ack = RPA1, + disk_pending_ack = DPA1 })) end. permitted_beta_count(#vqstate { len = 0 }) -> @@ -1968,11 +1922,8 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = case msg_in_ram(MsgStatus2) of - false -> upd_ram_bytes_count( - -1, MsgStatus2, State1); - true -> State1 - end, + State2 = stats( + ready0, {MsgStatus, MsgStatus2}, State1), State3 = Consumer(MsgStatus2, Qa, State2), push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, State3) @@ -2026,10 +1977,7 @@ push_betas_to_deltas1(Generator, Limit, Q, {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> {#msg_status { index_on_disk = true }, IndexState1} = maybe_write_index_to_disk(true, MsgStatus, IndexState), - State1 = case msg_in_ram(MsgStatus) of - false -> State; - true -> upd_ram_bytes_count(-1, MsgStatus, State) - end, + State1 = stats(ready0, {MsgStatus, none}, State), Delta1 = expand_delta(SeqId, Delta), push_betas_to_deltas1(Generator, Limit, Qa, {Quota - 1, Delta1, IndexState1, State1}) |
