summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_backing_queue.hrl27
-rw-r--r--src/q3tree.erl107
-rw-r--r--src/rabbit_variable_queue.erl68
3 files changed, 163 insertions, 39 deletions
diff --git a/include/rabbit_backing_queue.hrl b/include/rabbit_backing_queue.hrl
new file mode 100644
index 0000000000..5947356d16
--- /dev/null
+++ b/include/rabbit_backing_queue.hrl
@@ -0,0 +1,27 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-record(msg_status,
+ { seq_id,
+ msg_id,
+ msg,
+ is_persistent,
+ is_delivered,
+ msg_on_disk,
+ index_on_disk,
+ msg_props
+ }).
+
diff --git a/src/q3tree.erl b/src/q3tree.erl
new file mode 100644
index 0000000000..a5fcd74014
--- /dev/null
+++ b/src/q3tree.erl
@@ -0,0 +1,107 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(q3tree).
+
+%% A less general random access variation of bpqueue for message status records
+
+-export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, least_key/1,
+ join/2, join_bpqueue/2, foldr/3, from_batch/1, map_fold_filter_r/4]).
+
+-include("rabbit.hrl").
+-include("rabbit_backing_queue.hrl").
+
+new() ->
+ gb_trees:empty().
+
+is_empty(T) ->
+ gb_trees:is_empty(T).
+
+len(T) ->
+ gb_trees:size(T).
+
+in(IndexOnDisk, MsgStatus, Tree) -> in_r(IndexOnDisk, MsgStatus, Tree).
+
+in_r(IndexOnDisk,
+ #msg_status { seq_id = SeqId, index_on_disk = IndexOnDisk } = MsgStatus,
+ Tree) ->
+ gb_trees:insert(SeqId, MsgStatus, Tree);
+in_r(IndexOnDisk, _Msgstatus, _Tree) ->
+ throw({prefix_and_msg_disagree, IndexOnDisk}).
+
+out(Tree) -> out1(Tree, fun gb_trees:take_smallest/1).
+out_r(Tree) -> out1(Tree, fun gb_trees:take_largest/1).
+
+out1(Tree, TakeFun) ->
+ case gb_trees:is_empty(Tree) of
+ true -> {empty, Tree};
+ false -> {_Key, #msg_status { index_on_disk = IndexOnDisk } = MsgStatus,
+ Tree2} = TakeFun(Tree),
+ {{value, IndexOnDisk, MsgStatus}, Tree2}
+ end.
+
+least_key(Tree) ->
+ {Least, _} = gb_trees:smallest(Tree),
+ Least.
+
+join(T1, T2) ->
+ join1(gb_trees:iterator(T1), T2).
+join1(Iter, T) ->
+ case gb_trees:next(Iter) of
+ none -> T;
+ {_SeqId,
+ #msg_status { index_on_disk = IndexOnDisk } = MsgStatus,
+ Iter1} -> join1(Iter1, in_r(IndexOnDisk, MsgStatus, T))
+ end.
+
+join_bpqueue(T, Q) ->
+ bpqueue:foldr(fun (IndexOnDisk, MsgStatus, Tree) ->
+ in_r(IndexOnDisk, MsgStatus, Tree)
+ end, T, Q).
+
+foldr(Fun, Acc, Tree) ->
+ lists:foldr(Fun, Acc, gb_trees:to_list(Tree)).
+
+from_batch({IndexOnDisk, L}) ->
+ lists:foldl(fun (MsgStatus, Tree) ->
+ in_r(IndexOnDisk, MsgStatus, Tree)
+ end, new(), L).
+
+map_fold_filter_r(PFilter, Fun, Acc, Tree) ->
+ map_fold_filter_r1(PFilter, Fun, Acc, Tree, new()).
+
+map_fold_filter_r1(PFilter, Fun, Acc, TreeOld, TreeNew) ->
+ case out_r(TreeOld) of
+ {empty, _T} -> {TreeNew, Acc};
+ {{value,
+ IndexOnDisk, #msg_status{index_on_disk = IndexOnDisk} = MsgStatus},
+ TreeOld1} ->
+ case PFilter(IndexOnDisk) of
+ false ->
+ map_fold_filter_r1(PFilter, Fun, Acc, TreeOld1,
+ in_r(IndexOnDisk, MsgStatus, TreeNew));
+ true ->
+ case Fun(MsgStatus, Acc) of
+ stop ->
+ {join(TreeOld, TreeNew), Acc};
+ {IndexOnDisk1, MsgStatus1, Acc1} ->
+ map_fold_filter_r1(PFilter, Fun, Acc1, TreeOld1,
+ in_r(IndexOnDisk1, MsgStatus1,
+ TreeNew))
+ end
+ end
+ end.
+
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0944788869..ecfe3cdac1 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -264,17 +264,6 @@
-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
--record(msg_status,
- { seq_id,
- msg_id,
- msg,
- is_persistent,
- is_delivered,
- msg_on_disk,
- index_on_disk,
- msg_props
- }).
-
-record(delta,
{ start_seq_id, %% start_seq_id is inclusive
count,
@@ -292,6 +281,7 @@
-define(TRANSIENT_MSG_STORE, msg_store_transient).
-include("rabbit.hrl").
+-include("rabbit_backing_queue.hrl").
%%----------------------------------------------------------------------------
@@ -317,7 +307,7 @@
q1 :: queue(),
q2 :: bpqueue:bpqueue(),
delta :: delta(),
- q3 :: bpqueue:bpqueue(),
+ q3 :: gb_tree(),
q4 :: gb_tree(),
next_seq_id :: seq_id(),
pending_ack :: dict(),
@@ -711,7 +701,7 @@ status(#vqstate {
[ {q1 , queue:len(Q1)},
{q2 , bpqueue:len(Q2)},
{delta , Delta},
- {q3 , bpqueue:len(Q3)},
+ {q3 , q3tree:len(Q3)},
{q4 , gb_trees:size(Q4)},
{len , Len},
{pending_acks , dict:size(PA)},
@@ -744,7 +734,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
E1 = queue:is_empty(Q1),
E2 = bpqueue:is_empty(Q2),
ED = Delta#delta.count == 0,
- E3 = bpqueue:is_empty(Q3),
+ E3 = q3tree:is_empty(Q3),
E4 = gb_trees:is_empty(Q4),
LZ = Len == 0,
@@ -882,7 +872,7 @@ betas_from_index_entries(List, TransientThreshold, IndexState) ->
Acks1}
end
end, {[], [], []}, List),
- {bpqueue:from_list([{true, Filtered}]),
+ {q3tree:from_batch({true, Filtered}),
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
@@ -913,7 +903,7 @@ combine_deltas(#delta { start_seq_id = StartLow,
#delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }.
beta_fold(Fun, Init, Q) ->
- bpqueue:foldr(fun (_Prefix, Value, Acc) -> Fun(Value, Acc) end, Init, Q).
+ q3tree:foldr(fun ({_SeqID, Value}, Acc) -> Fun(Value, Acc) end, Init, Q).
update_rate(Now, Then, Count, {OThen, OCount}) ->
%% avg over the current period and the previous
@@ -939,7 +929,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q1 = queue:new(),
q2 = bpqueue:new(),
delta = Delta,
- q3 = bpqueue:new(),
+ q3 = q3tree:new(),
q4 = gb_trees:empty(),
next_seq_id = NextSeqId,
pending_ack = dict:new(),
@@ -982,7 +972,7 @@ in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
case gb_trees:is_empty(Q4) of
true -> State #vqstate {
- q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
+ q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3),
ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
false -> {MsgStatus1 = #msg_status{ seq_id = SeqId},
State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State),
@@ -1072,7 +1062,7 @@ purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- case bpqueue:is_empty(Q3) of
+ case q3tree:is_empty(Q3) of
true -> {LensByStore, State};
false -> {LensByStore1, IndexState1} =
remove_queue_entries(fun beta_fold/3, Q3,
@@ -1080,7 +1070,7 @@ purge_betas_and_deltas(LensByStore,
purge_betas_and_deltas(LensByStore1,
maybe_deltas_to_betas(
State #vqstate {
- q3 = bpqueue:new(),
+ q3 = q3tree:new(),
index_state = IndexState1 }))
end.
@@ -1131,7 +1121,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- State2 = case bpqueue:is_empty(Q3) of
+ State2 = case q3tree:is_empty(Q3) of
false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
true -> State1 #vqstate { q4 = gb_trees:insert(SeqId,
m(MsgStatus1), Q4) }
@@ -1451,7 +1441,7 @@ limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3,
%% can never end up in delta due them residing in the only segment
%% held by q3.
{Q3a, {Quota2, IndexState2}} = limit_ram_index(
- fun bpqueue:map_fold_filter_r/4,
+ fun q3tree:map_fold_filter_r/4,
Q3, {Quota1, IndexState1}),
State #vqstate { q2 = Q2a, q3 = Q3a,
index_state = IndexState2,
@@ -1478,7 +1468,7 @@ permitted_ram_index_count(#vqstate { len = Len,
q2 = Q2,
q3 = Q3,
delta = #delta { count = DeltaCount } }) ->
- BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3),
+ BetaLen = bpqueue:len(Q2) + q3tree:len(Q3),
BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)).
chunk_size(Current, Permitted)
@@ -1494,7 +1484,7 @@ fetch_from_q3(State = #vqstate {
q3 = Q3,
q4 = Q4,
ram_index_count = RamIndexCount}) ->
- case bpqueue:out(Q3) of
+ case q3tree:out(Q3) of
{empty, _Q3} ->
{empty, State};
{{value, IndexOnDisk, MsgStatus}, Q3a} ->
@@ -1503,7 +1493,7 @@ fetch_from_q3(State = #vqstate {
State1 = State #vqstate { q3 = Q3a,
ram_index_count = RamIndexCount1 },
State2 =
- case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of
+ case {q3tree:is_empty(Q3a), 0 == DeltaCount} of
{true, true} ->
%% q3 is now empty, it wasn't before; delta is
%% still empty. So q2 must be empty, and we
@@ -1544,7 +1534,7 @@ maybe_deltas_to_betas(State = #vqstate {
{Q3a, IndexState2} =
betas_from_index_entries(List, TransientThreshold, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },
- case bpqueue:len(Q3a) of
+ case q3tree:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being
%% transient and below the threshold
@@ -1552,14 +1542,14 @@ maybe_deltas_to_betas(State = #vqstate {
State1 #vqstate {
delta = Delta #delta { start_seq_id = DeltaSeqId1 }});
Q3aLen ->
- Q3b = bpqueue:join(Q3, Q3a),
+ Q3b = q3tree:join(Q3, Q3a),
case DeltaCount - Q3aLen of
0 ->
%% delta is now empty, but it wasn't before, so
%% can now join q2 onto q3
State1 #vqstate { q2 = bpqueue:new(),
delta = ?BLANK_DELTA,
- q3 = bpqueue:join(Q3b, Q2) };
+ q3 = q3tree:join_bpqueue(Q3b, Q2) };
N when N > 0 ->
Delta1 = #delta { start_seq_id = DeltaSeqId1,
count = N,
@@ -1580,7 +1570,7 @@ maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
Q1a, State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
State1 #vqstate { q1 = Q1a,
- q3 = bpqueue:in(IndexOnDisk, MsgStatus, Q3) };
+ q3 = q3tree:in(IndexOnDisk, MsgStatus, Q3) };
(MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
Q1a, State1 = #vqstate { q2 = Q2 }) ->
State1 #vqstate { q1 = Q1a,
@@ -1592,7 +1582,7 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
fun q4_out_r/1,
fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
Q4a, State1 = #vqstate { q3 = Q3 }) ->
- State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
+ State1 #vqstate { q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3),
q4 = Q4a }
end, Quota, Q4, State).
@@ -1629,11 +1619,11 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2,
ram_index_count = RamIndexCount }) ->
{Delta2, Q2a, RamIndexCount2, IndexState2} =
push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end,
- fun bpqueue:out/1, Q2,
+ fun bpqueue:out/1, bpqueue, Q2,
RamIndexCount, IndexState),
{Delta3, Q3a, RamIndexCount3, IndexState3} =
push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1,
- fun bpqueue:out_r/1, Q3,
+ fun q3tree:out_r/1, q3tree, Q3,
RamIndexCount2, IndexState2),
Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)),
State #vqstate { q2 = Q2a,
@@ -1642,19 +1632,19 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2,
index_state = IndexState3,
ram_index_count = RamIndexCount3 }.
-push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) ->
- case bpqueue:out(Q) of
+push_betas_to_deltas(LimitFun, Generator, QMod, Q, RamIndexCount, IndexState) ->
+ case QMod:out(Q) of
{empty, _Q} ->
{?BLANK_DELTA, Q, RamIndexCount, IndexState};
{{value, _IndexOnDisk1, #msg_status { seq_id = MinSeqId }}, _Qa} ->
{{value, _IndexOnDisk2, #msg_status { seq_id = MaxSeqId }}, _Qb} =
- bpqueue:out_r(Q),
+ QMod:out_r(Q),
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
true -> {?BLANK_DELTA, Q, RamIndexCount, IndexState};
false -> {Len, Qc, RamIndexCount1, IndexState1} =
- push_betas_to_deltas(Generator, Limit, Q, 0,
- RamIndexCount, IndexState),
+ push_betas_to_deltas1(Generator, Limit, Q, 0,
+ RamIndexCount, IndexState),
{#delta { start_seq_id = Limit,
count = Len,
end_seq_id = MaxSeqId + 1 },
@@ -1662,7 +1652,7 @@ push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) ->
end
end.
-push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
+push_betas_to_deltas1(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
case Generator(Q) of
{empty, _Q} ->
{Count, Q, RamIndexCount, IndexState};
@@ -1679,7 +1669,7 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
IndexState),
{RamIndexCount - 1, IndexState2}
end,
- push_betas_to_deltas(
+ push_betas_to_deltas1(
Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1)
end.