diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2017-02-11 01:17:51 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2017-02-11 01:17:51 +0300 |
| commit | f9ee1ec509b390b166d2d35b685ef2be45a1ad81 (patch) | |
| tree | da67f1217845824217bf148206aa4d4d74c597d8 | |
| parent | 43e54b28b16e58aba8f4279998ca41671db0acb6 (diff) | |
| parent | b374710802cf482500b3ccb10b3a974e09581ab5 (diff) | |
| download | rabbitmq-server-git-f9ee1ec509b390b166d2d35b685ef2be45a1ad81.tar.gz | |
Merge branch 'stable'
| -rw-r--r-- | src/rabbit_variable_queue.erl | 133 |
1 files changed, 83 insertions, 50 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 5581143e69..c42b4856f2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -292,6 +292,7 @@ unacked_bytes, persistent_count, %% w unacked persistent_bytes, %% w unacked + delta_transient_bytes, %% target_ram_count, ram_msg_count, %% w/o unacked @@ -339,6 +340,7 @@ -record(delta, { start_seq_id, %% start_seq_id is inclusive count, + transient, end_seq_id %% end_seq_id is exclusive }). @@ -430,9 +432,11 @@ -define(BLANK_DELTA, #delta { start_seq_id = undefined, count = 0, + transient = 0, end_seq_id = undefined }). -define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z, count = 0, + transient = 0, end_seq_id = Z }). -define(MICROS_PER_SECOND, 1000000.0). @@ -933,6 +937,8 @@ info(messages_ram, State) -> info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> PersistentCount; +info(messages_paged_out, #vqstate{delta = #delta{transient = Count}}) -> + Count; info(message_bytes, #vqstate{bytes = Bytes, unacked_bytes = UBytes}) -> Bytes + UBytes; @@ -944,6 +950,8 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> RamBytes; info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> PersistentBytes; +info(message_bytes_paged_out, #vqstate{delta_transient_bytes = PagedOutBytes}) -> + PagedOutBytes; info(head_message_timestamp, #vqstate{ q3 = Q3, q4 = Q4, @@ -1303,14 +1311,14 @@ maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) -> - {Filtered, Delivers, Acks, RamReadyCount, RamBytes} = + {Filtered, Delivers, Acks, RamReadyCount, RamBytes, TransientCount, TransientBytes} = lists:foldr( fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, - {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) -> + {Filtered1, Delivers1, Acks1, RRC, RB, TC, TB} = Acc) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), - [SeqId | Acks1], RRC, RB}; + [SeqId | Acks1], RRC, RB, TC, TB}; false -> MsgStatus = m(beta_msg_status(M)), HaveMsg = msg_in_ram(MsgStatus), Size = msg_size(MsgStatus), @@ -1318,12 +1326,15 @@ betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) -> false -> {?QUEUE:in_r(MsgStatus, Filtered1), Delivers1, Acks1, RRC + one_if(HaveMsg), - RB + one_if(HaveMsg) * Size}; + RB + one_if(HaveMsg) * Size, + TC + one_if(not IsPersistent), + TB + one_if(not IsPersistent) * Size}; true -> Acc %% [0] end end - end, {?QUEUE:new(), [], [], 0, 0}, List), - {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}. + end, {?QUEUE:new(), [], [], 0, 0, 0, 0}, List), + {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State), + TransientCount, TransientBytes}. %% [0] We don't increase RamBytes here, even though it pertains to %% unacked messages too, since if HaveMsg then the message must have %% been stored in the QI, thus the message must have been in @@ -1336,18 +1347,28 @@ is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA, gb_trees:is_defined(SeqId, DPA) orelse gb_trees:is_defined(SeqId, QPA)). -expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) -> - d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }); +expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X), IsPersistent) -> + d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1, + transient = one_if(not IsPersistent)}); expand_delta(SeqId, #delta { start_seq_id = StartSeqId, - count = Count } = Delta) + count = Count, + transient = Transient } = Delta, + IsPersistent ) when SeqId < StartSeqId -> - d(Delta #delta { start_seq_id = SeqId, count = Count + 1 }); + d(Delta #delta { start_seq_id = SeqId, count = Count + 1, + transient = Transient + one_if(not IsPersistent)}); expand_delta(SeqId, #delta { count = Count, - end_seq_id = EndSeqId } = Delta) + end_seq_id = EndSeqId, + transient = Transient } = Delta, + IsPersistent) when SeqId >= EndSeqId -> - d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 }); -expand_delta(_SeqId, #delta { count = Count } = Delta) -> - d(Delta #delta { count = Count + 1 }). + d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1, + transient = Transient + one_if(not IsPersistent)}); +expand_delta(_SeqId, #delta { count = Count, + transient = Transient } = Delta, + IsPersistent ) -> + d(Delta #delta { count = Count + 1, + transient = Transient + one_if(not IsPersistent) }). %%---------------------------------------------------------------------------- %% Internal major helpers for Public API @@ -1369,6 +1390,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, true -> ?BLANK_DELTA; false -> d(#delta { start_seq_id = LowSeqId, count = DeltaCount1, + transient = 0, end_seq_id = NextSeqId }) end, Now = erlang:monotonic_time(), @@ -1397,6 +1419,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, persistent_count = DeltaCount1, bytes = DeltaBytes1, persistent_bytes = DeltaBytes1, + delta_transient_bytes = 0, target_ram_count = infinity, ram_msg_count = 0, @@ -1436,22 +1459,22 @@ in_r(MsgStatus = #msg_status { msg = undefined }, false -> {Msg, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), MsgStatus1 = MsgStatus#msg_status{msg = Msg}, - stats(ready0, {MsgStatus, MsgStatus1}, + stats(ready0, {MsgStatus, MsgStatus1}, 0, State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }) end; in_r(MsgStatus, State = #vqstate { mode = default, q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }; %% lazy queues -in_r(MsgStatus = #msg_status { seq_id = SeqId }, +in_r(MsgStatus = #msg_status { seq_id = SeqId, is_persistent = IsPersistent }, State = #vqstate { mode = lazy, q3 = Q3, delta = Delta}) -> case ?QUEUE:is_empty(Q3) of true -> {_MsgStatus1, State1} = maybe_write_to_disk(true, true, MsgStatus, State), - State2 = stats(ready0, {MsgStatus, none}, State1), - Delta1 = expand_delta(SeqId, Delta), - State2 #vqstate{ delta = Delta1 }; + State2 = stats(ready0, {MsgStatus, none}, 1, State1), + Delta1 = expand_delta(SeqId, Delta, IsPersistent), + State2 #vqstate{ delta = Delta1}; false -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) } end. @@ -1487,8 +1510,8 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState, {Msg, State #vqstate {msg_store_clients = MSCState1, disk_read_count = Count + 1}}. -stats(Signs, Statuses, State) -> - stats0(expand_signs(Signs), expand_statuses(Statuses), State). +stats(Signs, Statuses, DeltaPaged, State) -> + stats0(expand_signs(Signs), expand_statuses(Statuses), DeltaPaged, State). expand_signs(ready0) -> {0, 0, true}; expand_signs(lazy_pub) -> {1, 0, true}; @@ -1503,13 +1526,14 @@ expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}. %% contains "Ready" or "Unacked" iff that is what it counts. If %% neither is present it counts both. stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged}, - {InRamBefore, InRamAfter, MsgStatus}, + {InRamBefore, InRamAfter, MsgStatus}, DeltaPaged, State = #vqstate{len = ReadyCount, bytes = ReadyBytes, ram_msg_count = RamReadyCount, persistent_count = PersistentCount, unacked_bytes = UnackedBytes, ram_bytes = RamBytes, + delta_transient_bytes = DeltaBytes, persistent_bytes = PersistentBytes}) -> S = msg_size(MsgStatus), DeltaTotal = DeltaReady + DeltaUnacked, @@ -1532,7 +1556,8 @@ stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged}, bytes = ReadyBytes + DeltaReady * S, unacked_bytes = UnackedBytes + DeltaUnacked * S, ram_bytes = RamBytes + DeltaRam * S, - persistent_bytes = PersistentBytes + DeltaPersistent * S}. + persistent_bytes = PersistentBytes + DeltaPersistent * S, + delta_transient_bytes = DeltaBytes + DeltaPaged * one_if(not MsgStatus#msg_status.is_persistent) * S}. msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. @@ -1554,7 +1579,7 @@ remove(true, MsgStatus = #msg_status { MsgStatus #msg_status { is_delivered = true }, State), - State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, State1), + State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State1), {SeqId, maybe_update_rates( State2 #vqstate {out_counter = OutCount + 1, @@ -1590,7 +1615,7 @@ remove(false, MsgStatus = #msg_status { false -> IndexState1 end, - State1 = stats({-1, 0}, {MsgStatus, none}, State), + State1 = stats({-1, 0}, {MsgStatus, none}, 0, State), {undefined, maybe_update_rates( State1 #vqstate {out_counter = OutCount + 1, @@ -1674,7 +1699,7 @@ process_queue_entries1( is_delivered = true }, State1), {cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), Fun(Msg, SeqId, FetchAcc), - stats({-1, 1}, {MsgStatus, MsgStatus}, State2)}. + stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State2)}. collect_by_predicate(Pred, QAcc, State) -> case queue_out(State) of @@ -1776,7 +1801,7 @@ remove_queue_entries1( end, cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks), - stats({-1, 0}, {MsgStatus, none}, State)}. + stats({-1, 0}, {MsgStatus, none}, 0, State)}. process_delivers_and_acks_fun(deliver_and_ack) -> fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) -> @@ -1813,7 +1838,7 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, end, InCount1 = InCount + 1, UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - stats({1, 0}, {none, MsgStatus1}, + stats({1, 0}, {none, MsgStatus1}, 0, State2#vqstate{ next_seq_id = SeqId + 1, in_counter = InCount1, unconfirmed = UC1 }); @@ -1826,17 +1851,17 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, in_counter = InCount, durable = IsDurable, unconfirmed = UC, - delta = Delta }) -> + delta = Delta}) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), - Delta1 = expand_delta(SeqId, Delta), + Delta1 = expand_delta(SeqId, Delta, IsPersistent), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - stats(lazy_pub, {lazy, m(MsgStatus1)}, + stats(lazy_pub, {lazy, m(MsgStatus1)}, 1, State1#vqstate{ delta = Delta1, next_seq_id = SeqId + 1, in_counter = InCount + 1, - unconfirmed = UC1 }). + unconfirmed = UC1}). batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) -> {ChPid, Flow, publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, @@ -1859,7 +1884,7 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = stats({0, 1}, {none, MsgStatus1}, + State3 = stats({0, 1}, {none, MsgStatus1}, 0, State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, @@ -1882,7 +1907,7 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = stats({0, 1}, {none, MsgStatus1}, + State3 = stats({0, 1}, {none, MsgStatus1}, 0, State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, @@ -2070,7 +2095,7 @@ remove_pending_ack(true, SeqId, State) -> {none, _} -> {none, State}; {MsgStatus, State1} -> - {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)} + {MsgStatus, stats({0, -1}, {MsgStatus, none}, 0, State1)} end; remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, disk_pending_ack = DPA, @@ -2215,14 +2240,14 @@ msgs_and_indices_written_to_disk(Callback, MsgIdSet) -> publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> {Msg, State1} = read_msg(MsgStatus, State), MsgStatus1 = MsgStatus#msg_status { msg = Msg }, - {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, State1)}; + {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, 0, State1)}; publish_alpha(MsgStatus, State) -> - {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}. + {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, 0, State)}. publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_prepare_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}. + {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, 0, State1)}. %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> @@ -2261,11 +2286,12 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> case msg_from_pending_ack(SeqId, State0) of {none, _} -> Acc; - {#msg_status { msg_id = MsgId } = MsgStatus, State1} -> + {#msg_status { msg_id = MsgId, + is_persistent = IsPersistent } = MsgStatus, State1} -> {_MsgStatus, State2} = maybe_prepare_write_to_disk(true, true, MsgStatus, State1), - {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - stats({1, -1}, {MsgStatus, none}, State2)} + {expand_delta(SeqId, Delta0, IsPersistent), [MsgId | MsgIds0], + stats({1, -1}, {MsgStatus, none}, 1, State2)} end end, {Delta, MsgIds, State}, SeqIds). @@ -2467,7 +2493,7 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, MsgStatus2 = m(trim_msg_status(MsgStatus1)), DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), limit_ram_acks(Quota - 1, - stats({0, 0}, {MsgStatus, MsgStatus2}, + stats({0, 0}, {MsgStatus, MsgStatus2}, 0, State1 #vqstate { ram_pending_ack = RPA1, disk_pending_ack = DPA1 })) end. @@ -2556,16 +2582,18 @@ maybe_deltas_to_betas(DelsAndAcksFun, ram_msg_count = RamMsgCount, ram_bytes = RamBytes, disk_read_count = DiskReadCount, + delta_transient_bytes = DeltaTransientBytes, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, + transient = Transient, end_seq_id = DeltaSeqIdEnd } = Delta, DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), - {Q3a, RamCountsInc, RamBytesInc, State1} = + {Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} = betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State #vqstate { index_state = IndexState1 }), @@ -2588,13 +2616,16 @@ maybe_deltas_to_betas(DelsAndAcksFun, %% can now join q2 onto q3 State2 #vqstate { q2 = ?QUEUE:new(), delta = ?BLANK_DELTA, - q3 = ?QUEUE:join(Q3b, Q2) }; + q3 = ?QUEUE:join(Q3b, Q2), + delta_transient_bytes = 0}; N when N > 0 -> Delta1 = d(#delta { start_seq_id = DeltaSeqId1, count = N, + transient = Transient - TransientCount, end_seq_id = DeltaSeqIdEnd }), State2 #vqstate { delta = Delta1, - q3 = Q3b } + q3 = Q3b, + delta_transient_bytes = DeltaTransientBytes - TransientBytes } end end. @@ -2603,7 +2634,8 @@ push_alphas_to_betas(Quota, State) -> push_alphas_to_betas( fun ?QUEUE:out/1, fun (MsgStatus, Q1a, - State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> + State0 = #vqstate { q3 = Q3, delta = #delta { count = 0, + transient = 0 } }) -> State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) }; (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) -> State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) } @@ -2639,7 +2671,7 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), State2 = stats( - ready0, {MsgStatus, MsgStatus2}, State1), + ready0, {MsgStatus, MsgStatus2}, 0, State1), State3 = Consumer(MsgStatus2, Qa, State2), push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, State3) @@ -2702,10 +2734,11 @@ push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) -> when SeqId < Limit -> {Q, {Quota, Delta, ui(State)}}; {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> - {#msg_status { index_on_disk = true }, State1} = + {#msg_status { index_on_disk = true, + is_persistent = IsPersistent }, State1} = maybe_batch_write_index_to_disk(true, MsgStatus, State), - State2 = stats(ready0, {MsgStatus, none}, State1), - Delta1 = expand_delta(SeqId, Delta), + State2 = stats(ready0, {MsgStatus, none}, 1, State1), + Delta1 = expand_delta(SeqId, Delta, IsPersistent), push_betas_to_deltas1(Generator, Limit, Qa, {Quota - 1, Delta1, State2}) end. |
