summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2017-02-11 01:17:51 +0300
committerMichael Klishin <michael@clojurewerkz.org>2017-02-11 01:17:51 +0300
commitf9ee1ec509b390b166d2d35b685ef2be45a1ad81 (patch)
treeda67f1217845824217bf148206aa4d4d74c597d8
parent43e54b28b16e58aba8f4279998ca41671db0acb6 (diff)
parentb374710802cf482500b3ccb10b3a974e09581ab5 (diff)
downloadrabbitmq-server-git-f9ee1ec509b390b166d2d35b685ef2be45a1ad81.tar.gz
Merge branch 'stable'
-rw-r--r--src/rabbit_variable_queue.erl133
1 files changed, 83 insertions, 50 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 5581143e69..c42b4856f2 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -292,6 +292,7 @@
unacked_bytes,
persistent_count, %% w unacked
persistent_bytes, %% w unacked
+ delta_transient_bytes, %%
target_ram_count,
ram_msg_count, %% w/o unacked
@@ -339,6 +340,7 @@
-record(delta,
{ start_seq_id, %% start_seq_id is inclusive
count,
+ transient,
end_seq_id %% end_seq_id is exclusive
}).
@@ -430,9 +432,11 @@
-define(BLANK_DELTA, #delta { start_seq_id = undefined,
count = 0,
+ transient = 0,
end_seq_id = undefined }).
-define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z,
count = 0,
+ transient = 0,
end_seq_id = Z }).
-define(MICROS_PER_SECOND, 1000000.0).
@@ -933,6 +937,8 @@ info(messages_ram, State) ->
info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
PersistentCount;
+info(messages_paged_out, #vqstate{delta = #delta{transient = Count}}) ->
+ Count;
info(message_bytes, #vqstate{bytes = Bytes,
unacked_bytes = UBytes}) ->
Bytes + UBytes;
@@ -944,6 +950,8 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
RamBytes;
info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
PersistentBytes;
+info(message_bytes_paged_out, #vqstate{delta_transient_bytes = PagedOutBytes}) ->
+ PagedOutBytes;
info(head_message_timestamp, #vqstate{
q3 = Q3,
q4 = Q4,
@@ -1303,14 +1311,14 @@ maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) ->
- {Filtered, Delivers, Acks, RamReadyCount, RamBytes} =
+ {Filtered, Delivers, Acks, RamReadyCount, RamBytes, TransientCount, TransientBytes} =
lists:foldr(
fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
- {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) ->
+ {Filtered1, Delivers1, Acks1, RRC, RB, TC, TB} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
- [SeqId | Acks1], RRC, RB};
+ [SeqId | Acks1], RRC, RB, TC, TB};
false -> MsgStatus = m(beta_msg_status(M)),
HaveMsg = msg_in_ram(MsgStatus),
Size = msg_size(MsgStatus),
@@ -1318,12 +1326,15 @@ betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) ->
false -> {?QUEUE:in_r(MsgStatus, Filtered1),
Delivers1, Acks1,
RRC + one_if(HaveMsg),
- RB + one_if(HaveMsg) * Size};
+ RB + one_if(HaveMsg) * Size,
+ TC + one_if(not IsPersistent),
+ TB + one_if(not IsPersistent) * Size};
true -> Acc %% [0]
end
end
- end, {?QUEUE:new(), [], [], 0, 0}, List),
- {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}.
+ end, {?QUEUE:new(), [], [], 0, 0, 0, 0}, List),
+ {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State),
+ TransientCount, TransientBytes}.
%% [0] We don't increase RamBytes here, even though it pertains to
%% unacked messages too, since if HaveMsg then the message must have
%% been stored in the QI, thus the message must have been in
@@ -1336,18 +1347,28 @@ is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA,
gb_trees:is_defined(SeqId, DPA) orelse
gb_trees:is_defined(SeqId, QPA)).
-expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
- d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 });
+expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X), IsPersistent) ->
+ d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1,
+ transient = one_if(not IsPersistent)});
expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
- count = Count } = Delta)
+ count = Count,
+ transient = Transient } = Delta,
+ IsPersistent )
when SeqId < StartSeqId ->
- d(Delta #delta { start_seq_id = SeqId, count = Count + 1 });
+ d(Delta #delta { start_seq_id = SeqId, count = Count + 1,
+ transient = Transient + one_if(not IsPersistent)});
expand_delta(SeqId, #delta { count = Count,
- end_seq_id = EndSeqId } = Delta)
+ end_seq_id = EndSeqId,
+ transient = Transient } = Delta,
+ IsPersistent)
when SeqId >= EndSeqId ->
- d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 });
-expand_delta(_SeqId, #delta { count = Count } = Delta) ->
- d(Delta #delta { count = Count + 1 }).
+ d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1,
+ transient = Transient + one_if(not IsPersistent)});
+expand_delta(_SeqId, #delta { count = Count,
+ transient = Transient } = Delta,
+ IsPersistent ) ->
+ d(Delta #delta { count = Count + 1,
+ transient = Transient + one_if(not IsPersistent) }).
%%----------------------------------------------------------------------------
%% Internal major helpers for Public API
@@ -1369,6 +1390,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
true -> ?BLANK_DELTA;
false -> d(#delta { start_seq_id = LowSeqId,
count = DeltaCount1,
+ transient = 0,
end_seq_id = NextSeqId })
end,
Now = erlang:monotonic_time(),
@@ -1397,6 +1419,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
persistent_count = DeltaCount1,
bytes = DeltaBytes1,
persistent_bytes = DeltaBytes1,
+ delta_transient_bytes = 0,
target_ram_count = infinity,
ram_msg_count = 0,
@@ -1436,22 +1459,22 @@ in_r(MsgStatus = #msg_status { msg = undefined },
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
MsgStatus1 = MsgStatus#msg_status{msg = Msg},
- stats(ready0, {MsgStatus, MsgStatus1},
+ stats(ready0, {MsgStatus, MsgStatus1}, 0,
State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) })
end;
in_r(MsgStatus,
State = #vqstate { mode = default, q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) };
%% lazy queues
-in_r(MsgStatus = #msg_status { seq_id = SeqId },
+in_r(MsgStatus = #msg_status { seq_id = SeqId, is_persistent = IsPersistent },
State = #vqstate { mode = lazy, q3 = Q3, delta = Delta}) ->
case ?QUEUE:is_empty(Q3) of
true ->
{_MsgStatus1, State1} =
maybe_write_to_disk(true, true, MsgStatus, State),
- State2 = stats(ready0, {MsgStatus, none}, State1),
- Delta1 = expand_delta(SeqId, Delta),
- State2 #vqstate{ delta = Delta1 };
+ State2 = stats(ready0, {MsgStatus, none}, 1, State1),
+ Delta1 = expand_delta(SeqId, Delta, IsPersistent),
+ State2 #vqstate{ delta = Delta1};
false ->
State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }
end.
@@ -1487,8 +1510,8 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState,
{Msg, State #vqstate {msg_store_clients = MSCState1,
disk_read_count = Count + 1}}.
-stats(Signs, Statuses, State) ->
- stats0(expand_signs(Signs), expand_statuses(Statuses), State).
+stats(Signs, Statuses, DeltaPaged, State) ->
+ stats0(expand_signs(Signs), expand_statuses(Statuses), DeltaPaged, State).
expand_signs(ready0) -> {0, 0, true};
expand_signs(lazy_pub) -> {1, 0, true};
@@ -1503,13 +1526,14 @@ expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}.
%% contains "Ready" or "Unacked" iff that is what it counts. If
%% neither is present it counts both.
stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged},
- {InRamBefore, InRamAfter, MsgStatus},
+ {InRamBefore, InRamAfter, MsgStatus}, DeltaPaged,
State = #vqstate{len = ReadyCount,
bytes = ReadyBytes,
ram_msg_count = RamReadyCount,
persistent_count = PersistentCount,
unacked_bytes = UnackedBytes,
ram_bytes = RamBytes,
+ delta_transient_bytes = DeltaBytes,
persistent_bytes = PersistentBytes}) ->
S = msg_size(MsgStatus),
DeltaTotal = DeltaReady + DeltaUnacked,
@@ -1532,7 +1556,8 @@ stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged},
bytes = ReadyBytes + DeltaReady * S,
unacked_bytes = UnackedBytes + DeltaUnacked * S,
ram_bytes = RamBytes + DeltaRam * S,
- persistent_bytes = PersistentBytes + DeltaPersistent * S}.
+ persistent_bytes = PersistentBytes + DeltaPersistent * S,
+ delta_transient_bytes = DeltaBytes + DeltaPaged * one_if(not MsgStatus#msg_status.is_persistent) * S}.
msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
@@ -1554,7 +1579,7 @@ remove(true, MsgStatus = #msg_status {
MsgStatus #msg_status {
is_delivered = true }, State),
- State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, State1),
+ State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State1),
{SeqId, maybe_update_rates(
State2 #vqstate {out_counter = OutCount + 1,
@@ -1590,7 +1615,7 @@ remove(false, MsgStatus = #msg_status {
false -> IndexState1
end,
- State1 = stats({-1, 0}, {MsgStatus, none}, State),
+ State1 = stats({-1, 0}, {MsgStatus, none}, 0, State),
{undefined, maybe_update_rates(
State1 #vqstate {out_counter = OutCount + 1,
@@ -1674,7 +1699,7 @@ process_queue_entries1(
is_delivered = true }, State1),
{cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
Fun(Msg, SeqId, FetchAcc),
- stats({-1, 1}, {MsgStatus, MsgStatus}, State2)}.
+ stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State2)}.
collect_by_predicate(Pred, QAcc, State) ->
case queue_out(State) of
@@ -1776,7 +1801,7 @@ remove_queue_entries1(
end,
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks),
- stats({-1, 0}, {MsgStatus, none}, State)}.
+ stats({-1, 0}, {MsgStatus, none}, 0, State)}.
process_delivers_and_acks_fun(deliver_and_ack) ->
fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) ->
@@ -1813,7 +1838,7 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
end,
InCount1 = InCount + 1,
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- stats({1, 0}, {none, MsgStatus1},
+ stats({1, 0}, {none, MsgStatus1}, 0,
State2#vqstate{ next_seq_id = SeqId + 1,
in_counter = InCount1,
unconfirmed = UC1 });
@@ -1826,17 +1851,17 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
in_counter = InCount,
durable = IsDurable,
unconfirmed = UC,
- delta = Delta }) ->
+ delta = Delta}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
{MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
- Delta1 = expand_delta(SeqId, Delta),
+ Delta1 = expand_delta(SeqId, Delta, IsPersistent),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- stats(lazy_pub, {lazy, m(MsgStatus1)},
+ stats(lazy_pub, {lazy, m(MsgStatus1)}, 1,
State1#vqstate{ delta = Delta1,
next_seq_id = SeqId + 1,
in_counter = InCount + 1,
- unconfirmed = UC1 }).
+ unconfirmed = UC1}).
batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) ->
{ChPid, Flow, publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
@@ -1859,7 +1884,7 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
{MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = stats({0, 1}, {none, MsgStatus1},
+ State3 = stats({0, 1}, {none, MsgStatus1}, 0,
State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
@@ -1882,7 +1907,7 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
{MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = stats({0, 1}, {none, MsgStatus1},
+ State3 = stats({0, 1}, {none, MsgStatus1}, 0,
State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
@@ -2070,7 +2095,7 @@ remove_pending_ack(true, SeqId, State) ->
{none, _} ->
{none, State};
{MsgStatus, State1} ->
- {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)}
+ {MsgStatus, stats({0, -1}, {MsgStatus, none}, 0, State1)}
end;
remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
disk_pending_ack = DPA,
@@ -2215,14 +2240,14 @@ msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
{Msg, State1} = read_msg(MsgStatus, State),
MsgStatus1 = MsgStatus#msg_status { msg = Msg },
- {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, State1)};
+ {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, 0, State1)};
publish_alpha(MsgStatus, State) ->
- {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}.
+ {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, 0, State)}.
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_prepare_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}.
+ {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, 0, State1)}.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
@@ -2261,11 +2286,12 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
case msg_from_pending_ack(SeqId, State0) of
{none, _} ->
Acc;
- {#msg_status { msg_id = MsgId } = MsgStatus, State1} ->
+ {#msg_status { msg_id = MsgId,
+ is_persistent = IsPersistent } = MsgStatus, State1} ->
{_MsgStatus, State2} =
maybe_prepare_write_to_disk(true, true, MsgStatus, State1),
- {expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
- stats({1, -1}, {MsgStatus, none}, State2)}
+ {expand_delta(SeqId, Delta0, IsPersistent), [MsgId | MsgIds0],
+ stats({1, -1}, {MsgStatus, none}, 1, State2)}
end
end, {Delta, MsgIds, State}, SeqIds).
@@ -2467,7 +2493,7 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
limit_ram_acks(Quota - 1,
- stats({0, 0}, {MsgStatus, MsgStatus2},
+ stats({0, 0}, {MsgStatus, MsgStatus2}, 0,
State1 #vqstate { ram_pending_ack = RPA1,
disk_pending_ack = DPA1 }))
end.
@@ -2556,16 +2582,18 @@ maybe_deltas_to_betas(DelsAndAcksFun,
ram_msg_count = RamMsgCount,
ram_bytes = RamBytes,
disk_read_count = DiskReadCount,
+ delta_transient_bytes = DeltaTransientBytes,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
+ transient = Transient,
end_seq_id = DeltaSeqIdEnd } = Delta,
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
DeltaSeqIdEnd]),
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
- {Q3a, RamCountsInc, RamBytesInc, State1} =
+ {Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} =
betas_from_index_entries(List, TransientThreshold,
DelsAndAcksFun,
State #vqstate { index_state = IndexState1 }),
@@ -2588,13 +2616,16 @@ maybe_deltas_to_betas(DelsAndAcksFun,
%% can now join q2 onto q3
State2 #vqstate { q2 = ?QUEUE:new(),
delta = ?BLANK_DELTA,
- q3 = ?QUEUE:join(Q3b, Q2) };
+ q3 = ?QUEUE:join(Q3b, Q2),
+ delta_transient_bytes = 0};
N when N > 0 ->
Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
count = N,
+ transient = Transient - TransientCount,
end_seq_id = DeltaSeqIdEnd }),
State2 #vqstate { delta = Delta1,
- q3 = Q3b }
+ q3 = Q3b,
+ delta_transient_bytes = DeltaTransientBytes - TransientBytes }
end
end.
@@ -2603,7 +2634,8 @@ push_alphas_to_betas(Quota, State) ->
push_alphas_to_betas(
fun ?QUEUE:out/1,
fun (MsgStatus, Q1a,
- State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
+ State0 = #vqstate { q3 = Q3, delta = #delta { count = 0,
+ transient = 0 } }) ->
State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
(MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) ->
State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
@@ -2639,7 +2671,7 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
State2 = stats(
- ready0, {MsgStatus, MsgStatus2}, State1),
+ ready0, {MsgStatus, MsgStatus2}, 0, State1),
State3 = Consumer(MsgStatus2, Qa, State2),
push_alphas_to_betas(Generator, Consumer, Quota - 1,
Qa, State3)
@@ -2702,10 +2734,11 @@ push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) ->
when SeqId < Limit ->
{Q, {Quota, Delta, ui(State)}};
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
- {#msg_status { index_on_disk = true }, State1} =
+ {#msg_status { index_on_disk = true,
+ is_persistent = IsPersistent }, State1} =
maybe_batch_write_index_to_disk(true, MsgStatus, State),
- State2 = stats(ready0, {MsgStatus, none}, State1),
- Delta1 = expand_delta(SeqId, Delta),
+ State2 = stats(ready0, {MsgStatus, none}, 1, State1),
+ Delta1 = expand_delta(SeqId, Delta, IsPersistent),
push_betas_to_deltas1(Generator, Limit, Qa,
{Quota - 1, Delta1, State2})
end.