diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2017-02-10 10:56:23 +0000 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2017-02-10 17:21:17 +0000 |
| commit | ff8a0fc45bc0302ccf50f154be4e6d6829a8ffaa (patch) | |
| tree | 014d1a9d9af2bd82d9dccd7b489f06a413a49ffd | |
| parent | 58dd55b00851c98c004bb1a3ed7666a5c1a492de (diff) | |
| download | rabbitmq-server-git-ff8a0fc45bc0302ccf50f154be4e6d6829a8ffaa.tar.gz | |
Calculate memory size of pagged messages
| -rw-r--r-- | src/rabbit_variable_queue.erl | 133 |
1 files changed, 83 insertions, 50 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index dd92256146..58f0230420 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -283,6 +283,7 @@ unacked_bytes, persistent_count, %% w unacked persistent_bytes, %% w unacked + delta_transient_bytes, %% target_ram_count, ram_msg_count, %% w/o unacked @@ -330,6 +331,7 @@ -record(delta, { start_seq_id, %% start_seq_id is inclusive count, + transient, end_seq_id %% end_seq_id is exclusive }). @@ -415,9 +417,11 @@ -define(BLANK_DELTA, #delta { start_seq_id = undefined, count = 0, + transient = 0, end_seq_id = undefined }). -define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z, count = 0, + transient = 0, end_seq_id = Z }). -define(MICROS_PER_SECOND, 1000000.0). @@ -875,6 +879,8 @@ info(messages_ram, State) -> info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> PersistentCount; +info(messages_paged_out, #vqstate{delta = #delta{transient = Count}}) -> + Count; info(message_bytes, #vqstate{bytes = Bytes, unacked_bytes = UBytes}) -> Bytes + UBytes; @@ -886,6 +892,8 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> RamBytes; info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> PersistentBytes; +info(message_bytes_paged_out, #vqstate{delta_transient_bytes = PagedOutBytes}) -> + PagedOutBytes; info(head_message_timestamp, #vqstate{ q3 = Q3, q4 = Q4, @@ -1242,14 +1250,14 @@ maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) -> - {Filtered, Delivers, Acks, RamReadyCount, RamBytes} = + {Filtered, Delivers, Acks, RamReadyCount, RamBytes, TransientCount, TransientBytes} = lists:foldr( fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, - {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) -> + {Filtered1, Delivers1, Acks1, RRC, RB, TC, TB} = Acc) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), - [SeqId | Acks1], RRC, RB}; + [SeqId | Acks1], RRC, RB, TC, TB}; false -> MsgStatus = m(beta_msg_status(M)), HaveMsg = msg_in_ram(MsgStatus), Size = msg_size(MsgStatus), @@ -1257,12 +1265,15 @@ betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) -> false -> {?QUEUE:in_r(MsgStatus, Filtered1), Delivers1, Acks1, RRC + one_if(HaveMsg), - RB + one_if(HaveMsg) * Size}; + RB + one_if(HaveMsg) * Size, + TC + one_if(not IsPersistent), + TB + one_if(not IsPersistent) * Size}; true -> Acc %% [0] end end - end, {?QUEUE:new(), [], [], 0, 0}, List), - {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}. + end, {?QUEUE:new(), [], [], 0, 0, 0, 0}, List), + {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State), + TransientCount, TransientBytes}. %% [0] We don't increase RamBytes here, even though it pertains to %% unacked messages too, since if HaveMsg then the message must have %% been stored in the QI, thus the message must have been in @@ -1275,18 +1286,28 @@ is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA, gb_trees:is_defined(SeqId, DPA) orelse gb_trees:is_defined(SeqId, QPA)). -expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) -> - d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }); +expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X), IsPersistent) -> + d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1, + transient = one_if(not IsPersistent)}); expand_delta(SeqId, #delta { start_seq_id = StartSeqId, - count = Count } = Delta) + count = Count, + transient = Transient } = Delta, + IsPersistent ) when SeqId < StartSeqId -> - d(Delta #delta { start_seq_id = SeqId, count = Count + 1 }); + d(Delta #delta { start_seq_id = SeqId, count = Count + 1, + transient = Transient + one_if(not IsPersistent)}); expand_delta(SeqId, #delta { count = Count, - end_seq_id = EndSeqId } = Delta) + end_seq_id = EndSeqId, + transient = Transient } = Delta, + IsPersistent) when SeqId >= EndSeqId -> - d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 }); -expand_delta(_SeqId, #delta { count = Count } = Delta) -> - d(Delta #delta { count = Count + 1 }). + d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1, + transient = Transient + one_if(not IsPersistent)}); +expand_delta(_SeqId, #delta { count = Count, + transient = Transient } = Delta, + IsPersistent ) -> + d(Delta #delta { count = Count + 1, + transient = Transient + one_if(not IsPersistent) }). %%---------------------------------------------------------------------------- %% Internal major helpers for Public API @@ -1308,6 +1329,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, true -> ?BLANK_DELTA; false -> d(#delta { start_seq_id = LowSeqId, count = DeltaCount1, + transient = 0, end_seq_id = NextSeqId }) end, Now = time_compat:monotonic_time(), @@ -1336,6 +1358,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, persistent_count = DeltaCount1, bytes = DeltaBytes1, persistent_bytes = DeltaBytes1, + delta_transient_bytes = 0, target_ram_count = infinity, ram_msg_count = 0, @@ -1375,22 +1398,22 @@ in_r(MsgStatus = #msg_status { msg = undefined }, false -> {Msg, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), MsgStatus1 = MsgStatus#msg_status{msg = Msg}, - stats(ready0, {MsgStatus, MsgStatus1}, + stats(ready0, {MsgStatus, MsgStatus1}, 0, State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }) end; in_r(MsgStatus, State = #vqstate { mode = default, q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }; %% lazy queues -in_r(MsgStatus = #msg_status { seq_id = SeqId }, +in_r(MsgStatus = #msg_status { seq_id = SeqId, is_persistent = IsPersistent }, State = #vqstate { mode = lazy, q3 = Q3, delta = Delta}) -> case ?QUEUE:is_empty(Q3) of true -> {_MsgStatus1, State1} = maybe_write_to_disk(true, true, MsgStatus, State), - State2 = stats(ready0, {MsgStatus, none}, State1), - Delta1 = expand_delta(SeqId, Delta), - State2 #vqstate{ delta = Delta1 }; + State2 = stats(ready0, {MsgStatus, none}, 1, State1), + Delta1 = expand_delta(SeqId, Delta, IsPersistent), + State2 #vqstate{ delta = Delta1}; false -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) } end. @@ -1426,8 +1449,8 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState, {Msg, State #vqstate {msg_store_clients = MSCState1, disk_read_count = Count + 1}}. -stats(Signs, Statuses, State) -> - stats0(expand_signs(Signs), expand_statuses(Statuses), State). +stats(Signs, Statuses, DeltaPaged, State) -> + stats0(expand_signs(Signs), expand_statuses(Statuses), DeltaPaged, State). expand_signs(ready0) -> {0, 0, true}; expand_signs(lazy_pub) -> {1, 0, true}; @@ -1442,13 +1465,14 @@ expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}. %% contains "Ready" or "Unacked" iff that is what it counts. If %% neither is present it counts both. stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged}, - {InRamBefore, InRamAfter, MsgStatus}, + {InRamBefore, InRamAfter, MsgStatus}, DeltaPaged, State = #vqstate{len = ReadyCount, bytes = ReadyBytes, ram_msg_count = RamReadyCount, persistent_count = PersistentCount, unacked_bytes = UnackedBytes, ram_bytes = RamBytes, + delta_transient_bytes = DeltaBytes, persistent_bytes = PersistentBytes}) -> S = msg_size(MsgStatus), DeltaTotal = DeltaReady + DeltaUnacked, @@ -1471,7 +1495,8 @@ stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged}, bytes = ReadyBytes + DeltaReady * S, unacked_bytes = UnackedBytes + DeltaUnacked * S, ram_bytes = RamBytes + DeltaRam * S, - persistent_bytes = PersistentBytes + DeltaPersistent * S}. + persistent_bytes = PersistentBytes + DeltaPersistent * S, + delta_transient_bytes = DeltaBytes + DeltaPaged * one_if(not MsgStatus#msg_status.is_persistent) * S}. msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. @@ -1493,7 +1518,7 @@ remove(true, MsgStatus = #msg_status { MsgStatus #msg_status { is_delivered = true }, State), - State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, State1), + State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State1), {SeqId, maybe_update_rates( State2 #vqstate {out_counter = OutCount + 1, @@ -1529,7 +1554,7 @@ remove(false, MsgStatus = #msg_status { false -> IndexState1 end, - State1 = stats({-1, 0}, {MsgStatus, none}, State), + State1 = stats({-1, 0}, {MsgStatus, none}, 0, State), {undefined, maybe_update_rates( State1 #vqstate {out_counter = OutCount + 1, @@ -1613,7 +1638,7 @@ process_queue_entries1( is_delivered = true }, State1), {cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), Fun(Msg, SeqId, FetchAcc), - stats({-1, 1}, {MsgStatus, MsgStatus}, State2)}. + stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State2)}. collect_by_predicate(Pred, QAcc, State) -> case queue_out(State) of @@ -1715,7 +1740,7 @@ remove_queue_entries1( end, cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks), - stats({-1, 0}, {MsgStatus, none}, State)}. + stats({-1, 0}, {MsgStatus, none}, 0, State)}. process_delivers_and_acks_fun(deliver_and_ack) -> fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) -> @@ -1752,7 +1777,7 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, end, InCount1 = InCount + 1, UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - stats({1, 0}, {none, MsgStatus1}, + stats({1, 0}, {none, MsgStatus1}, 0, State2#vqstate{ next_seq_id = SeqId + 1, in_counter = InCount1, unconfirmed = UC1 }); @@ -1765,17 +1790,17 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, in_counter = InCount, durable = IsDurable, unconfirmed = UC, - delta = Delta }) -> + delta = Delta}) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), - Delta1 = expand_delta(SeqId, Delta), + Delta1 = expand_delta(SeqId, Delta, IsPersistent), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - stats(lazy_pub, {lazy, m(MsgStatus1)}, + stats(lazy_pub, {lazy, m(MsgStatus1)}, 1, State1#vqstate{ delta = Delta1, next_seq_id = SeqId + 1, in_counter = InCount + 1, - unconfirmed = UC1 }). + unconfirmed = UC1}). batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) -> {ChPid, Flow, publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, @@ -1798,7 +1823,7 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = stats({0, 1}, {none, MsgStatus1}, + State3 = stats({0, 1}, {none, MsgStatus1}, 0, State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, @@ -1821,7 +1846,7 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = stats({0, 1}, {none, MsgStatus1}, + State3 = stats({0, 1}, {none, MsgStatus1}, 0, State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, @@ -2009,7 +2034,7 @@ remove_pending_ack(true, SeqId, State) -> {none, _} -> {none, State}; {MsgStatus, State1} -> - {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)} + {MsgStatus, stats({0, -1}, {MsgStatus, none}, 0, State1)} end; remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, disk_pending_ack = DPA, @@ -2154,14 +2179,14 @@ msgs_and_indices_written_to_disk(Callback, MsgIdSet) -> publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> {Msg, State1} = read_msg(MsgStatus, State), MsgStatus1 = MsgStatus#msg_status { msg = Msg }, - {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, State1)}; + {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, 0, State1)}; publish_alpha(MsgStatus, State) -> - {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}. + {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, 0, State)}. publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_prepare_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}. + {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, 0, State1)}. %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> @@ -2200,11 +2225,12 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> case msg_from_pending_ack(SeqId, State0) of {none, _} -> Acc; - {#msg_status { msg_id = MsgId } = MsgStatus, State1} -> + {#msg_status { msg_id = MsgId, + is_persistent = IsPersistent } = MsgStatus, State1} -> {_MsgStatus, State2} = maybe_prepare_write_to_disk(true, true, MsgStatus, State1), - {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - stats({1, -1}, {MsgStatus, none}, State2)} + {expand_delta(SeqId, Delta0, IsPersistent), [MsgId | MsgIds0], + stats({1, -1}, {MsgStatus, none}, 1, State2)} end end, {Delta, MsgIds, State}, SeqIds). @@ -2406,7 +2432,7 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, MsgStatus2 = m(trim_msg_status(MsgStatus1)), DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), limit_ram_acks(Quota - 1, - stats({0, 0}, {MsgStatus, MsgStatus2}, + stats({0, 0}, {MsgStatus, MsgStatus2}, 0, State1 #vqstate { ram_pending_ack = RPA1, disk_pending_ack = DPA1 })) end. @@ -2495,16 +2521,18 @@ maybe_deltas_to_betas(DelsAndAcksFun, ram_msg_count = RamMsgCount, ram_bytes = RamBytes, disk_read_count = DiskReadCount, + delta_transient_bytes = DeltaTransientBytes, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, + transient = Transient, end_seq_id = DeltaSeqIdEnd } = Delta, DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), - {Q3a, RamCountsInc, RamBytesInc, State1} = + {Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} = betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State #vqstate { index_state = IndexState1 }), @@ -2527,13 +2555,16 @@ maybe_deltas_to_betas(DelsAndAcksFun, %% can now join q2 onto q3 State2 #vqstate { q2 = ?QUEUE:new(), delta = ?BLANK_DELTA, - q3 = ?QUEUE:join(Q3b, Q2) }; + q3 = ?QUEUE:join(Q3b, Q2), + delta_transient_bytes = 0}; N when N > 0 -> Delta1 = d(#delta { start_seq_id = DeltaSeqId1, count = N, + transient = Transient - TransientCount, end_seq_id = DeltaSeqIdEnd }), State2 #vqstate { delta = Delta1, - q3 = Q3b } + q3 = Q3b, + delta_transient_bytes = DeltaTransientBytes - TransientBytes } end end. @@ -2542,7 +2573,8 @@ push_alphas_to_betas(Quota, State) -> push_alphas_to_betas( fun ?QUEUE:out/1, fun (MsgStatus, Q1a, - State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> + State0 = #vqstate { q3 = Q3, delta = #delta { count = 0, + transient = 0 } }) -> State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) }; (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) -> State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) } @@ -2578,7 +2610,7 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), State2 = stats( - ready0, {MsgStatus, MsgStatus2}, State1), + ready0, {MsgStatus, MsgStatus2}, 0, State1), State3 = Consumer(MsgStatus2, Qa, State2), push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, State3) @@ -2641,10 +2673,11 @@ push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) -> when SeqId < Limit -> {Q, {Quota, Delta, ui(State)}}; {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> - {#msg_status { index_on_disk = true }, State1} = + {#msg_status { index_on_disk = true, + is_persistent = IsPersistent }, State1} = maybe_batch_write_index_to_disk(true, MsgStatus, State), - State2 = stats(ready0, {MsgStatus, none}, State1), - Delta1 = expand_delta(SeqId, Delta), + State2 = stats(ready0, {MsgStatus, none}, 1, State1), + Delta1 = expand_delta(SeqId, Delta, IsPersistent), push_betas_to_deltas1(Generator, Limit, Qa, {Quota - 1, Delta1, State2}) end. |
