summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-10-04 14:43:27 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-10-04 14:43:27 +0100
commit8e06d03debb2d39449ad5e2b90599bb3b5a6e4e4 (patch)
treede49033798f4bc1bc58ae6a19086179a6affd460 /src
parent587e3cf8aa4c4308662b957b1a374a468402f70c (diff)
parent234c2cd961f00ed0c2334a219ae098929cb25cb3 (diff)
downloadrabbitmq-server-git-8e06d03debb2d39449ad5e2b90599bb3b5a6e4e4.tar.gz
Merging default into bug24455
Diffstat (limited to 'src')
-rw-r--r--src/bpqueue.erl271
-rw-r--r--src/lqueue.erl89
-rw-r--r--src/rabbit_tests.erl140
-rw-r--r--src/rabbit_variable_queue.erl419
4 files changed, 271 insertions, 648 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl
deleted file mode 100644
index 71a34262eb..0000000000
--- a/src/bpqueue.erl
+++ /dev/null
@@ -1,271 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
-%%
-
--module(bpqueue).
-
-%% Block-prefixed queue. From the perspective of the queue interface
-%% the datastructure acts like a regular queue where each value is
-%% paired with the prefix.
-%%
-%% This is implemented as a queue of queues, which is more space and
-%% time efficient, whilst supporting the normal queue interface. Each
-%% inner queue has a prefix, which does not need to be unique, and it
-%% is guaranteed that no two consecutive blocks have the same
-%% prefix. len/1 returns the flattened length of the queue and is
-%% O(1).
-
--export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, join/2,
- foldl/3, foldr/3, from_list/1, to_list/1, map_fold_filter_l/4,
- map_fold_filter_r/4]).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--export_type([bpqueue/0]).
-
--type(bpqueue() :: {non_neg_integer(), queue()}).
--type(prefix() :: any()).
--type(value() :: any()).
--type(result() :: ({'empty', bpqueue()} |
- {{'value', prefix(), value()}, bpqueue()})).
-
--spec(new/0 :: () -> bpqueue()).
--spec(is_empty/1 :: (bpqueue()) -> boolean()).
--spec(len/1 :: (bpqueue()) -> non_neg_integer()).
--spec(in/3 :: (prefix(), value(), bpqueue()) -> bpqueue()).
--spec(in_r/3 :: (prefix(), value(), bpqueue()) -> bpqueue()).
--spec(out/1 :: (bpqueue()) -> result()).
--spec(out_r/1 :: (bpqueue()) -> result()).
--spec(join/2 :: (bpqueue(), bpqueue()) -> bpqueue()).
--spec(foldl/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B).
--spec(foldr/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B).
--spec(from_list/1 :: ([{prefix(), [value()]}]) -> bpqueue()).
--spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]).
--spec(map_fold_filter_l/4 :: ((fun ((prefix()) -> boolean())),
- (fun ((value(), B) ->
- ({prefix(), value(), B} | 'stop'))),
- B,
- bpqueue()) ->
- {bpqueue(), B}).
--spec(map_fold_filter_r/4 :: ((fun ((prefix()) -> boolean())),
- (fun ((value(), B) ->
- ({prefix(), value(), B} | 'stop'))),
- B,
- bpqueue()) ->
- {bpqueue(), B}).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-new() -> {0, queue:new()}.
-
-is_empty({0, _Q}) -> true;
-is_empty(_BPQ) -> false.
-
-len({N, _Q}) -> N.
-
-in(Prefix, Value, {0, Q}) ->
- {1, queue:in({Prefix, queue:from_list([Value])}, Q)};
-in(Prefix, Value, BPQ) ->
- in1({fun queue:in/2, fun queue:out_r/1}, Prefix, Value, BPQ).
-
-in_r(Prefix, Value, BPQ = {0, _Q}) ->
- in(Prefix, Value, BPQ);
-in_r(Prefix, Value, BPQ) ->
- in1({fun queue:in_r/2, fun queue:out/1}, Prefix, Value, BPQ).
-
-in1({In, Out}, Prefix, Value, {N, Q}) ->
- {N+1, case Out(Q) of
- {{value, {Prefix, InnerQ}}, Q1} ->
- In({Prefix, In(Value, InnerQ)}, Q1);
- {{value, {_Prefix, _InnerQ}}, _Q1} ->
- In({Prefix, queue:in(Value, queue:new())}, Q)
- end}.
-
-in_q(Prefix, Queue, BPQ = {0, Q}) ->
- case queue:len(Queue) of
- 0 -> BPQ;
- N -> {N, queue:in({Prefix, Queue}, Q)}
- end;
-in_q(Prefix, Queue, BPQ) ->
- in_q1({fun queue:in/2, fun queue:out_r/1,
- fun queue:join/2},
- Prefix, Queue, BPQ).
-
-in_q_r(Prefix, Queue, BPQ = {0, _Q}) ->
- in_q(Prefix, Queue, BPQ);
-in_q_r(Prefix, Queue, BPQ) ->
- in_q1({fun queue:in_r/2, fun queue:out/1,
- fun (T, H) -> queue:join(H, T) end},
- Prefix, Queue, BPQ).
-
-in_q1({In, Out, Join}, Prefix, Queue, BPQ = {N, Q}) ->
- case queue:len(Queue) of
- 0 -> BPQ;
- M -> {N + M, case Out(Q) of
- {{value, {Prefix, InnerQ}}, Q1} ->
- In({Prefix, Join(InnerQ, Queue)}, Q1);
- {{value, {_Prefix, _InnerQ}}, _Q1} ->
- In({Prefix, Queue}, Q)
- end}
- end.
-
-out({0, _Q} = BPQ) -> {empty, BPQ};
-out(BPQ) -> out1({fun queue:in_r/2, fun queue:out/1}, BPQ).
-
-out_r({0, _Q} = BPQ) -> {empty, BPQ};
-out_r(BPQ) -> out1({fun queue:in/2, fun queue:out_r/1}, BPQ).
-
-out1({In, Out}, {N, Q}) ->
- {{value, {Prefix, InnerQ}}, Q1} = Out(Q),
- {{value, Value}, InnerQ1} = Out(InnerQ),
- Q2 = case queue:is_empty(InnerQ1) of
- true -> Q1;
- false -> In({Prefix, InnerQ1}, Q1)
- end,
- {{value, Prefix, Value}, {N-1, Q2}}.
-
-join({0, _Q}, BPQ) ->
- BPQ;
-join(BPQ, {0, _Q}) ->
- BPQ;
-join({NHead, QHead}, {NTail, QTail}) ->
- {{value, {Prefix, InnerQHead}}, QHead1} = queue:out_r(QHead),
- {NHead + NTail,
- case queue:out(QTail) of
- {{value, {Prefix, InnerQTail}}, QTail1} ->
- queue:join(
- queue:in({Prefix, queue:join(InnerQHead, InnerQTail)}, QHead1),
- QTail1);
- {{value, {_Prefix, _InnerQTail}}, _QTail1} ->
- queue:join(QHead, QTail)
- end}.
-
-foldl(_Fun, Init, {0, _Q}) -> Init;
-foldl( Fun, Init, {_N, Q}) -> fold1(fun queue:out/1, Fun, Init, Q).
-
-foldr(_Fun, Init, {0, _Q}) -> Init;
-foldr( Fun, Init, {_N, Q}) -> fold1(fun queue:out_r/1, Fun, Init, Q).
-
-fold1(Out, Fun, Init, Q) ->
- case Out(Q) of
- {empty, _Q} ->
- Init;
- {{value, {Prefix, InnerQ}}, Q1} ->
- fold1(Out, Fun, fold1(Out, Fun, Prefix, Init, InnerQ), Q1)
- end.
-
-fold1(Out, Fun, Prefix, Init, InnerQ) ->
- case Out(InnerQ) of
- {empty, _Q} ->
- Init;
- {{value, Value}, InnerQ1} ->
- fold1(Out, Fun, Prefix, Fun(Prefix, Value, Init), InnerQ1)
- end.
-
-from_list(List) ->
- {FinalPrefix, FinalInnerQ, ListOfPQs1, Len} =
- lists:foldl(
- fun ({_Prefix, []}, Acc) ->
- Acc;
- ({Prefix, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) ->
- {Prefix, queue:join(InnerQ, queue:from_list(InnerList)),
- ListOfPQs, LenAcc + length(InnerList)};
- ({Prefix1, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) ->
- {Prefix1, queue:from_list(InnerList),
- [{Prefix, InnerQ} | ListOfPQs], LenAcc + length(InnerList)}
- end, {undefined, queue:new(), [], 0}, List),
- ListOfPQs2 = [{FinalPrefix, FinalInnerQ} | ListOfPQs1],
- [{undefined, InnerQ1} | Rest] = All = lists:reverse(ListOfPQs2),
- {Len, queue:from_list(case queue:is_empty(InnerQ1) of
- true -> Rest;
- false -> All
- end)}.
-
-to_list({0, _Q}) -> [];
-to_list({_N, Q}) -> [{Prefix, queue:to_list(InnerQ)} ||
- {Prefix, InnerQ} <- queue:to_list(Q)].
-
-%% map_fold_filter_[lr](FilterFun, Fun, Init, BPQ) -> {BPQ, Init}
-%% where FilterFun(Prefix) -> boolean()
-%% Fun(Value, Init) -> {Prefix, Value, Init} | stop
-%%
-%% The filter fun allows you to skip very quickly over blocks that
-%% you're not interested in. Such blocks appear in the resulting bpq
-%% without modification. The Fun is then used both to map the value,
-%% which also allows you to change the prefix (and thus block) of the
-%% value, and also to modify the Init/Acc (just like a fold). If the
-%% Fun returns 'stop' then it is not applied to any further items.
-map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
- {BPQ, Init};
-map_fold_filter_l(PFilter, Fun, Init, {N, Q}) ->
- map_fold_filter1({fun queue:out/1, fun queue:in/2,
- fun in_q/3, fun join/2},
- N, PFilter, Fun, Init, Q, new()).
-
-map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
- {BPQ, Init};
-map_fold_filter_r(PFilter, Fun, Init, {N, Q}) ->
- map_fold_filter1({fun queue:out_r/1, fun queue:in_r/2,
- fun in_q_r/3, fun (T, H) -> join(H, T) end},
- N, PFilter, Fun, Init, Q, new()).
-
-map_fold_filter1(Funs = {Out, _In, InQ, Join}, Len, PFilter, Fun,
- Init, Q, QNew) ->
- case Out(Q) of
- {empty, _Q} ->
- {QNew, Init};
- {{value, {Prefix, InnerQ}}, Q1} ->
- case PFilter(Prefix) of
- true ->
- {Init1, QNew1, Cont} =
- map_fold_filter2(Funs, Fun, Prefix, Prefix,
- Init, InnerQ, QNew, queue:new()),
- case Cont of
- false -> {Join(QNew1, {Len - len(QNew1), Q1}), Init1};
- true -> map_fold_filter1(Funs, Len, PFilter, Fun,
- Init1, Q1, QNew1)
- end;
- false ->
- map_fold_filter1(Funs, Len, PFilter, Fun,
- Init, Q1, InQ(Prefix, InnerQ, QNew))
- end
- end.
-
-map_fold_filter2(Funs = {Out, In, InQ, _Join}, Fun, OrigPrefix, Prefix,
- Init, InnerQ, QNew, InnerQNew) ->
- case Out(InnerQ) of
- {empty, _Q} ->
- {Init, InQ(OrigPrefix, InnerQ,
- InQ(Prefix, InnerQNew, QNew)), true};
- {{value, Value}, InnerQ1} ->
- case Fun(Value, Init) of
- stop ->
- {Init, InQ(OrigPrefix, InnerQ,
- InQ(Prefix, InnerQNew, QNew)), false};
- {Prefix1, Value1, Init1} ->
- {Prefix2, QNew1, InnerQNew1} =
- case Prefix1 =:= Prefix of
- true -> {Prefix, QNew, In(Value1, InnerQNew)};
- false -> {Prefix1, InQ(Prefix, InnerQNew, QNew),
- In(Value1, queue:new())}
- end,
- map_fold_filter2(Funs, Fun, OrigPrefix, Prefix2,
- Init1, InnerQ1, QNew1, InnerQNew1)
- end
- end.
diff --git a/src/lqueue.erl b/src/lqueue.erl
new file mode 100644
index 0000000000..4a8164f6db
--- /dev/null
+++ b/src/lqueue.erl
@@ -0,0 +1,89 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(lqueue).
+
+-export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2,
+ foldl/3, foldr/3, from_list/1, to_list/1]).
+
+-define(QUEUE, queue).
+
+-ifdef(use_specs).
+
+-export_type([?MODULE/0]).
+
+-opaque(?MODULE() :: {non_neg_integer(), ?MODULE()}).
+-type(value() :: any()).
+-type(result() :: ({'empty', ?MODULE()} |
+ {{'value', value()}, ?MODULE()})).
+
+-spec(new/0 :: () -> ?MODULE()).
+-spec(is_empty/1 :: (?MODULE()) -> boolean()).
+-spec(len/1 :: (?MODULE()) -> non_neg_integer()).
+-spec(in/2 :: (value(), ?MODULE()) -> ?MODULE()).
+-spec(in_r/2 :: (value(), ?MODULE()) -> ?MODULE()).
+-spec(out/1 :: (?MODULE()) -> result()).
+-spec(out_r/1 :: (?MODULE()) -> result()).
+-spec(join/2 :: (?MODULE(), ?MODULE()) -> ?MODULE()).
+-spec(foldl/3 :: (fun ((value(), B) -> B), B, ?MODULE()) -> B).
+-spec(foldr/3 :: (fun ((value(), B) -> B), B, ?MODULE()) -> B).
+-spec(from_list/1 :: ([value()]) -> ?MODULE()).
+-spec(to_list/1 :: (?MODULE()) -> [value()]).
+
+-endif.
+
+new() -> {0, ?QUEUE:new()}.
+
+is_empty({0, _Q}) -> true;
+is_empty(_) -> false.
+
+in(V, {L, Q}) -> {L+1, ?QUEUE:in(V, Q)}.
+
+in_r(V, {L, Q}) -> {L+1, ?QUEUE:in_r(V, Q)}.
+
+out({0, _Q} = Q) ->
+ {empty, Q};
+out({L, Q}) ->
+ {Result, Q1} = ?QUEUE:out(Q),
+ {Result, {L-1, Q1}}.
+
+out_r({0, _Q} = Q) ->
+ {empty, Q};
+out_r({L, Q}) ->
+ {Result, Q1} = ?QUEUE:out_r(Q),
+ {Result, {L-1, Q1}}.
+
+join({L1, Q1}, {L2, Q2}) ->
+ {L1 + L2, ?QUEUE:join(Q1, Q2)}.
+
+to_list({_L, Q}) -> ?QUEUE:to_list(Q).
+
+from_list(L) -> {length(L), ?QUEUE:from_list(L)}.
+
+foldl(Fun, Init, Q) ->
+ case out(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> foldl(Fun, Fun(V, Init), Q1)
+ end.
+
+foldr(Fun, Init, Q) ->
+ case out_r(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> foldr(Fun, Fun(V, Init), Q1)
+ end.
+
+len({L, _Q}) ->
+ L.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 39f67ced2d..eb456e8cf9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -44,7 +44,6 @@ all_tests() ->
passed = test_file_handle_cache(),
passed = test_backing_queue(),
passed = test_priority_queue(),
- passed = test_bpqueue(),
passed = test_pg_local(),
passed = test_unfold(),
passed = test_supervisor_delayed_restart(),
@@ -262,143 +261,6 @@ test_priority_queue(Q) ->
priority_queue:to_list(Q),
priority_queue_out_all(Q)}.
-test_bpqueue() ->
- Q = bpqueue:new(),
- true = bpqueue:is_empty(Q),
- 0 = bpqueue:len(Q),
- [] = bpqueue:to_list(Q),
-
- Q1 = bpqueue_test(fun bpqueue:in/3, fun bpqueue:out/1,
- fun bpqueue:to_list/1,
- fun bpqueue:foldl/3, fun bpqueue:map_fold_filter_l/4),
- Q2 = bpqueue_test(fun bpqueue:in_r/3, fun bpqueue:out_r/1,
- fun (QR) -> lists:reverse(
- [{P, lists:reverse(L)} ||
- {P, L} <- bpqueue:to_list(QR)])
- end,
- fun bpqueue:foldr/3, fun bpqueue:map_fold_filter_r/4),
-
- [{foo, [1, 2]}, {bar, [3]}] = bpqueue:to_list(bpqueue:join(Q, Q1)),
- [{bar, [3]}, {foo, [2, 1]}] = bpqueue:to_list(bpqueue:join(Q2, Q)),
- [{foo, [1, 2]}, {bar, [3, 3]}, {foo, [2,1]}] =
- bpqueue:to_list(bpqueue:join(Q1, Q2)),
-
- [{foo, [1, 2]}, {bar, [3]}, {foo, [1, 2]}, {bar, [3]}] =
- bpqueue:to_list(bpqueue:join(Q1, Q1)),
-
- [{foo, [1, 2]}, {bar, [3]}] =
- bpqueue:to_list(
- bpqueue:from_list(
- [{x, []}, {foo, [1]}, {y, []}, {foo, [2]}, {bar, [3]}, {z, []}])),
-
- [{undefined, [a]}] = bpqueue:to_list(bpqueue:from_list([{undefined, [a]}])),
-
- {4, [a,b,c,d]} =
- bpqueue:foldl(
- fun (Prefix, Value, {Prefix, Acc}) ->
- {Prefix + 1, [Value | Acc]}
- end,
- {0, []}, bpqueue:from_list([{0,[d]}, {1,[c]}, {2,[b]}, {3,[a]}])),
-
- [{bar,3}, {foo,2}, {foo,1}] =
- bpqueue:foldr(fun (P, V, I) -> [{P,V} | I] end, [], Q2),
-
- BPQL = [{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,7]}],
- BPQ = bpqueue:from_list(BPQL),
-
- %% no effect
- {BPQL, 0} = bpqueue_mffl([none], {none, []}, BPQ),
- {BPQL, 0} = bpqueue_mffl([foo,bar], {none, [1]}, BPQ),
- {BPQL, 0} = bpqueue_mffl([bar], {none, [3]}, BPQ),
- {BPQL, 0} = bpqueue_mffr([bar], {foo, [5]}, BPQ),
-
- %% process 1 item
- {[{foo,[-1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 1} =
- bpqueue_mffl([foo,bar], {foo, [2]}, BPQ),
- {[{foo,[1,2,2]}, {bar,[-3,4,5]}, {foo,[5,6,7]}], 1} =
- bpqueue_mffl([bar], {bar, [4]}, BPQ),
- {[{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,-7]}], 1} =
- bpqueue_mffr([foo,bar], {foo, [6]}, BPQ),
- {[{foo,[1,2,2]}, {bar,[3,4]}, {baz,[-5]}, {foo,[5,6,7]}], 1} =
- bpqueue_mffr([bar], {baz, [4]}, BPQ),
-
- %% change prefix
- {[{bar,[-1,-2,-2,-3,-4,-5,-5,-6,-7]}], 9} =
- bpqueue_mffl([foo,bar], {bar, []}, BPQ),
- {[{bar,[-1,-2,-2,3,4,5]}, {foo,[5,6,7]}], 3} =
- bpqueue_mffl([foo], {bar, [5]}, BPQ),
- {[{bar,[-1,-2,-2,3,4,5,-5,-6]}, {foo,[7]}], 5} =
- bpqueue_mffl([foo], {bar, [7]}, BPQ),
- {[{foo,[1,2,2,-3,-4]}, {bar,[5]}, {foo,[5,6,7]}], 2} =
- bpqueue_mffl([bar], {foo, [5]}, BPQ),
- {[{bar,[-1,-2,-2,3,4,5,-5,-6,-7]}], 6} =
- bpqueue_mffl([foo], {bar, []}, BPQ),
- {[{foo,[1,2,2,-3,-4,-5,5,6,7]}], 3} =
- bpqueue_mffl([bar], {foo, []}, BPQ),
-
- %% edge cases
- {[{foo,[-1,-2,-2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 3} =
- bpqueue_mffl([foo], {foo, [5]}, BPQ),
- {[{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[-5,-6,-7]}], 3} =
- bpqueue_mffr([foo], {foo, [2]}, BPQ),
-
- passed.
-
-bpqueue_test(In, Out, List, Fold, MapFoldFilter) ->
- Q = bpqueue:new(),
- {empty, _Q} = Out(Q),
-
- ok = Fold(fun (Prefix, Value, ok) -> {error, Prefix, Value} end, ok, Q),
- {Q1M, 0} = MapFoldFilter(fun(_P) -> throw(explosion) end,
- fun(_V, _N) -> throw(explosion) end, 0, Q),
- [] = bpqueue:to_list(Q1M),
-
- Q1 = In(bar, 3, In(foo, 2, In(foo, 1, Q))),
- false = bpqueue:is_empty(Q1),
- 3 = bpqueue:len(Q1),
- [{foo, [1, 2]}, {bar, [3]}] = List(Q1),
-
- {{value, foo, 1}, Q3} = Out(Q1),
- {{value, foo, 2}, Q4} = Out(Q3),
- {{value, bar, 3}, _Q5} = Out(Q4),
-
- F = fun (QN) ->
- MapFoldFilter(fun (foo) -> true;
- (_) -> false
- end,
- fun (2, _Num) -> stop;
- (V, Num) -> {bar, -V, V - Num} end,
- 0, QN)
- end,
- {Q6, 0} = F(Q),
- [] = bpqueue:to_list(Q6),
- {Q7, 1} = F(Q1),
- [{bar, [-1]}, {foo, [2]}, {bar, [3]}] = List(Q7),
-
- Q1.
-
-bpqueue_mffl(FF1A, FF2A, BPQ) ->
- bpqueue_mff(fun bpqueue:map_fold_filter_l/4, FF1A, FF2A, BPQ).
-
-bpqueue_mffr(FF1A, FF2A, BPQ) ->
- bpqueue_mff(fun bpqueue:map_fold_filter_r/4, FF1A, FF2A, BPQ).
-
-bpqueue_mff(Fold, FF1A, FF2A, BPQ) ->
- FF1 = fun (Prefixes) ->
- fun (P) -> lists:member(P, Prefixes) end
- end,
- FF2 = fun ({Prefix, Stoppers}) ->
- fun (Val, Num) ->
- case lists:member(Val, Stoppers) of
- true -> stop;
- false -> {Prefix, -Val, 1 + Num}
- end
- end
- end,
- Queue_to_list = fun ({LHS, RHS}) -> {bpqueue:to_list(LHS), RHS} end,
-
- Queue_to_list(Fold(FF1(FF1A), FF2(FF2A), 0, BPQ)).
-
test_simple_n_element_queue(N) ->
Items = lists:seq(1, N),
Q = priority_queue_in_all(priority_queue:new(), Items),
@@ -2312,7 +2174,7 @@ wait_for_confirms(Unconfirmed) ->
test_variable_queue() ->
[passed = with_fresh_variable_queue(F) ||
F <- [fun test_variable_queue_dynamic_duration_change/1,
- fun test_variable_queue_partial_segments_delta_thing/1,
+ %%fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
fun test_dropwhile/1,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 94c0913dfc..45c6a17f69 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -280,8 +280,6 @@
end_seq_id %% end_seq_id is exclusive
}).
--record(merge_funs, {new, join, out, in, publish}).
-
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the IO_BATCH_SIZE sets the number of betas
%% that we must be due to write indices for before we do any work at
@@ -291,6 +289,7 @@
-define(IO_BATCH_SIZE, 64).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
+-define(QUEUE, lqueue).
-include("rabbit.hrl").
@@ -315,11 +314,11 @@
end_seq_id :: non_neg_integer() }).
-type(state() :: #vqstate {
- q1 :: queue(),
- q2 :: bpqueue:bpqueue(),
+ q1 :: ?QUEUE:?QUEUE(),
+ q2 :: ?QUEUE:?QUEUE(),
delta :: delta(),
- q3 :: bpqueue:bpqueue(),
- q4 :: queue(),
+ q3 :: ?QUEUE:?QUEUE(),
+ q4 :: ?QUEUE:?QUEUE(),
next_seq_id :: seq_id(),
pending_ack :: gb_tree(),
ram_ack_index :: gb_tree(),
@@ -475,19 +474,19 @@ purge(State = #vqstate { q4 = Q4,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
{LensByStore, IndexState1} = remove_queue_entries(
- fun rabbit_misc:queue_fold/3, Q4,
+ fun ?QUEUE:foldl/3, Q4,
orddict:new(), IndexState, MSCState),
{LensByStore1, State1 = #vqstate { q1 = Q1,
index_state = IndexState2,
msg_store_clients = MSCState1 }} =
purge_betas_and_deltas(LensByStore,
- State #vqstate { q4 = queue:new(),
+ State #vqstate { q4 = ?QUEUE:new(),
index_state = IndexState1 }),
{LensByStore2, IndexState3} = remove_queue_entries(
- fun rabbit_misc:queue_fold/3, Q1,
+ fun ?QUEUE:foldl/3, Q1,
LensByStore1, IndexState2, MSCState1),
PCount1 = PCount - find_persistent_count(LensByStore2),
- {Len, a(State1 #vqstate { q1 = queue:new(),
+ {Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
index_state = IndexState3,
len = 0,
ram_msg_count = 0,
@@ -593,11 +592,11 @@ requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta,
len = Len } = State) ->
{SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
beta_limit(Q3),
- alpha_funs(),
+ fun publish_alpha/2,
MsgPropsFun, State),
{SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
delta_limit(Delta),
- beta_funs(),
+ fun publish_beta/2,
MsgPropsFun, State1),
{Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
MsgPropsFun, State2),
@@ -696,7 +695,6 @@ needs_timeout(State) ->
false -> case reduce_memory_use(
fun (_Quota, State1) -> {0, State1} end,
fun (_Quota, State1) -> State1 end,
- fun (State1) -> State1 end,
fun (_Quota, State1) -> {0, State1} end,
State) of
{true, _State} -> idle;
@@ -725,11 +723,11 @@ status(#vqstate {
avg_ingress = AvgIngressRate },
ack_rates = #rates { avg_egress = AvgAckEgressRate,
avg_ingress = AvgAckIngressRate } }) ->
- [ {q1 , queue:len(Q1)},
- {q2 , bpqueue:len(Q2)},
+ [ {q1 , ?QUEUE:len(Q1)},
+ {q2 , ?QUEUE:len(Q2)},
{delta , Delta},
- {q3 , bpqueue:len(Q3)},
- {q4 , queue:len(Q4)},
+ {q3 , ?QUEUE:len(Q3)},
+ {q4 , ?QUEUE:len(Q4)},
{len , Len},
{pending_acks , gb_trees:size(PA)},
{target_ram_count , TargetRamCount},
@@ -758,11 +756,11 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
persistent_count = PersistentCount,
ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount }) ->
- E1 = queue:is_empty(Q1),
- E2 = bpqueue:is_empty(Q2),
+ E1 = ?QUEUE:is_empty(Q1),
+ E2 = ?QUEUE:is_empty(Q2),
ED = Delta#delta.count == 0,
- E3 = bpqueue:is_empty(Q3),
- E4 = queue:is_empty(Q4),
+ E3 = ?QUEUE:is_empty(Q3),
+ E4 = ?QUEUE:is_empty(Q4),
LZ = Len == 0,
true = E1 or not E3,
@@ -869,25 +867,26 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
false -> case gb_trees:is_defined(SeqId, PA) of
- false -> {[m(#msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = undefined,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true,
- msg_props = MsgProps
- }) | Filtered1],
- Delivers1,
- Acks1};
- true -> Acc
+ false ->
+ {?QUEUE:in_r(
+ m(#msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ msg = undefined,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true,
+ msg_props = MsgProps
+ }), Filtered1),
+ Delivers1, Acks1};
+ true ->
+ Acc
end
end
- end, {[], [], []}, List),
- {bpqueue:from_list([{true, Filtered}]),
- rabbit_queue_index:ack(Acks,
- rabbit_queue_index:deliver(Delivers, IndexState))}.
+ end, {?QUEUE:new(), [], []}, List),
+ {Filtered, rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
%% the first arg is the older delta
combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) ->
@@ -915,8 +914,18 @@ combine_deltas(#delta { start_seq_id = StartLow,
andalso ((StartLow + Count) =< EndHigh),
#delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }.
-beta_fold(Fun, Init, Q) ->
- bpqueue:foldr(fun (_Prefix, Value, Acc) -> Fun(Value, Acc) end, Init, Q).
+expand_delta(SeqId, Delta) ->
+ DeltaInc = #delta { start_seq_id = SeqId,
+ count = 1,
+ end_seq_id = SeqId + 1 },
+ case Delta of
+ ?BLANK_DELTA ->
+ DeltaInc;
+ #delta { start_seq_id = StartSeqId } when SeqId < StartSeqId ->
+ combine_deltas(DeltaInc, Delta);
+ #delta { end_seq_id = EndSeqId } when SeqId >= EndSeqId ->
+ combine_deltas(Delta, DeltaInc)
+ end.
update_rate(Now, Then, Count, {OThen, OCount}) ->
%% avg over the current period and the previous
@@ -939,11 +948,11 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
end,
Now = now(),
State = #vqstate {
- q1 = queue:new(),
- q2 = bpqueue:new(),
+ q1 = ?QUEUE:new(),
+ q2 = ?QUEUE:new(),
delta = Delta,
- q3 = bpqueue:new(),
- q4 = queue:new(),
+ q3 = ?QUEUE:new(),
+ q4 = ?QUEUE:new(),
next_seq_id = NextSeqId,
pending_ack = gb_trees:empty(),
ram_ack_index = gb_trees:empty(),
@@ -983,19 +992,19 @@ blank_rate(Timestamp, IngressLength) ->
in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
- case queue:is_empty(Q4) of
+ case ?QUEUE:is_empty(Q4) of
true -> State #vqstate {
- q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
+ q3 = ?QUEUE:in_r(MsgStatus, Q3),
ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4a) }
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
- State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }.
+ State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
queue_out(State = #vqstate { q4 = Q4 }) ->
- case queue:out(Q4) of
+ case ?QUEUE:out(Q4) of
{empty, _Q4} ->
case fetch_from_q3(State) of
{empty, _State1} = Result -> Result;
@@ -1073,15 +1082,15 @@ purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- case bpqueue:is_empty(Q3) of
+ case ?QUEUE:is_empty(Q3) of
true -> {LensByStore, State};
false -> {LensByStore1, IndexState1} =
- remove_queue_entries(fun beta_fold/3, Q3,
+ remove_queue_entries(fun ?QUEUE:foldl/3, Q3,
LensByStore, IndexState, MSCState),
purge_betas_and_deltas(LensByStore1,
maybe_deltas_to_betas(
State #vqstate {
- q3 = bpqueue:new(),
+ q3 = ?QUEUE:new(),
index_state = IndexState1 }))
end.
@@ -1132,9 +1141,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- State2 = case bpqueue:is_empty(Q3) of
- false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
- true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
+ State2 = case ?QUEUE:is_empty(Q3) of
+ false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
+ true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
@@ -1327,85 +1336,50 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
-alpha_funs() ->
- #merge_funs {
- new = fun queue:new/0,
- join = fun queue:join/2,
- out = fun queue:out/1,
- in = fun queue:in/2,
- publish = fun (#msg_status { msg = undefined } = MsgStatus, State) ->
- read_msg(MsgStatus, State);
- (MsgStatus, #vqstate {
- ram_msg_count = RamMsgCount } = State) ->
- {MsgStatus, State #vqstate {
- ram_msg_count = RamMsgCount + 1 }}
- end}.
-
-beta_funs() ->
- #merge_funs {
- new = fun bpqueue:new/0,
- join = fun bpqueue:join/2,
- out = fun (Q) ->
- case bpqueue:out(Q) of
- {{value, _IndexOnDisk, MsgStatus}, Q1} ->
- {{value, MsgStatus}, Q1};
- {empty, _Q1} = X ->
- X
- end
- end,
- in = fun (#msg_status { index_on_disk = IOD } = MsgStatus, Q) ->
- bpqueue:in(IOD, MsgStatus, Q)
- end,
- publish = fun (#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus,
- State) ->
- {#msg_status { index_on_disk = IndexOnDisk,
- msg = Msg} = MsgStatus1,
- #vqstate { ram_index_count = RamIndexCount,
- ram_msg_count = RamMsgCount } =
- State1} =
- maybe_write_to_disk(not MsgOnDisk, false,
- MsgStatus, State),
- {MsgStatus1, State1 #vqstate {
- ram_msg_count = RamMsgCount +
- one_if(Msg =/= undefined),
- ram_index_count = RamIndexCount +
- one_if(not IndexOnDisk) }}
- end}.
+publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
+ read_msg(MsgStatus, State);
+publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
+ {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
+
+publish_beta(#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, State) ->
+ {#msg_status { index_on_disk = IndexOnDisk, msg = Msg} = MsgStatus1,
+ #vqstate { ram_index_count = RamIndexCount,
+ ram_msg_count = RamMsgCount } = State1} =
+ maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State),
+ {MsgStatus1, State1 #vqstate {
+ ram_msg_count = RamMsgCount + one_if(Msg =/= undefined),
+ ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }}.
%% Rebuild queue, inserting sequence ids to maintain ordering
-queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs,
- MsgPropsFun, State) ->
- queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, MsgPropsFun, State).
+queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, MsgPropsFun, State) ->
+ queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
+ Limit, PubFun, MsgPropsFun, State).
-queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit,
- #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs,
- MsgPropsFun, State)
+queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
+ Limit, PubFun, MsgPropsFun, State)
when Limit == undefined orelse SeqId < Limit ->
- case QOut(Q) of
+ case ?QUEUE:out(Q) of
{{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
when SeqIdQ < SeqId ->
%% enqueue from the remaining queue
- queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds,
- Limit, Funs, MsgPropsFun, State);
+ queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
+ Limit, PubFun, MsgPropsFun, State);
{_, _Q1} ->
%% enqueue from the remaining list of sequence ids
{MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun,
State),
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
- QPublish(MsgStatus, State1),
- queue_merge(Rest, Q, QIn(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, Funs, MsgPropsFun, State2)
+ PubFun(MsgStatus, State1),
+ queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
+ Limit, PubFun, MsgPropsFun, State2)
end;
-queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin },
- _MsgPropsFun, State) ->
- {SeqIds, QJoin(Front, Q), MsgIds, State}.
+queue_merge(SeqIds, Q, Front, MsgIds,
+ _Limit, _PubFun, _MsgPropsFun, State) ->
+ {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
{Delta, MsgIds, State};
-delta_merge(SeqIds, #delta { start_seq_id = StartSeqId,
- count = Count,
- end_seq_id = EndSeqId} = Delta,
- MsgIds, MsgPropsFun, State) ->
+delta_merge(SeqIds, Delta, MsgIds, MsgPropsFun, State) ->
lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
{#msg_status { msg_id = MsgId,
index_on_disk = IndexOnDisk,
@@ -1415,11 +1389,7 @@ delta_merge(SeqIds, #delta { start_seq_id = StartSeqId,
{_MsgStatus, State2} =
maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk,
MsgStatus, State1),
- {Delta0 #delta {
- start_seq_id = lists:min([SeqId, StartSeqId]),
- count = Count + 1,
- end_seq_id = lists:max([SeqId + 1, EndSeqId]) },
- [MsgId | MsgIds0], State2}
+ {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
@@ -1430,13 +1400,13 @@ msg_from_pending_ack(SeqId, MsgPropsFun, State) ->
msg_props = (MsgPropsFun(MsgProps)) #message_properties {
needs_confirming = false } }, State1}.
-beta_limit(BPQ) ->
- case bpqueue:out(BPQ) of
- {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} -> SeqId;
- {empty, _BPQ} -> undefined
+beta_limit(Q) ->
+ case ?QUEUE:out(Q) of
+ {{value, #msg_status { seq_id = SeqId }}, _Q} -> SeqId;
+ {empty, _Q} -> undefined
end.
-delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
+delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%%----------------------------------------------------------------------------
@@ -1462,10 +1432,10 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun,
+reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun,
State = #vqstate {target_ram_count = infinity}) ->
{false, State};
-reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
+reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
State = #vqstate {
ram_ack_index = RamAckIndex,
ram_msg_count = RamMsgCount,
@@ -1497,13 +1467,10 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
{true, State2}
end,
- case State1 #vqstate.target_ram_count of
- 0 -> {Reduce, BetaDeltaFun(State1)};
- _ -> case chunk_size(State1 #vqstate.ram_index_count,
- permitted_ram_index_count(State1)) of
- ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)};
- _ -> {Reduce, State1}
- end
+ case chunk_size(State1 #vqstate.ram_index_count,
+ permitted_beta_count(State1)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
+ _ -> {Reduce, State1}
end.
limit_ram_acks(0, State) ->
@@ -1525,54 +1492,27 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI1 })
end.
-
reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
- fun limit_ram_index/2,
- fun push_betas_to_deltas/1,
+ fun push_betas_to_deltas/2,
fun limit_ram_acks/2,
State),
State1.
-limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3,
- index_state = IndexState,
- ram_index_count = RamIndexCount }) ->
- {Q2a, {Quota1, IndexState1}} = limit_ram_index(
- fun bpqueue:map_fold_filter_r/4,
- Q2, {Quota, IndexState}),
- %% TODO: we shouldn't be writing index entries for messages that
- %% can never end up in delta due them residing in the only segment
- %% held by q3.
- {Q3a, {Quota2, IndexState2}} = limit_ram_index(
- fun bpqueue:map_fold_filter_r/4,
- Q3, {Quota1, IndexState1}),
- State #vqstate { q2 = Q2a, q3 = Q3a,
- index_state = IndexState2,
- ram_index_count = RamIndexCount - (Quota - Quota2) }.
-
-limit_ram_index(_MapFoldFilterFun, Q, {0, IndexState}) ->
- {Q, {0, IndexState}};
-limit_ram_index(MapFoldFilterFun, Q, {Quota, IndexState}) ->
- MapFoldFilterFun(
- fun erlang:'not'/1,
- fun (MsgStatus, {0, _IndexStateN}) ->
- false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
- stop;
- (MsgStatus, {N, IndexStateN}) when N > 0 ->
- false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
- {MsgStatus1, IndexStateN1} =
- maybe_write_index_to_disk(true, MsgStatus, IndexStateN),
- {true, m(MsgStatus1), {N-1, IndexStateN1}}
- end, {Quota, IndexState}, Q).
-
-permitted_ram_index_count(#vqstate { len = 0 }) ->
+permitted_beta_count(#vqstate { len = 0 }) ->
infinity;
-permitted_ram_index_count(#vqstate { len = Len,
- q2 = Q2,
- q3 = Q3,
- delta = #delta { count = DeltaCount } }) ->
- BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3),
- BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)).
+permitted_beta_count(#vqstate { len = Len,
+ q1 = Q1,
+ q3 = Q3,
+ q4 = Q4 }) ->
+ BetaDeltaLen = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
+ Permitted = BetaDeltaLen - trunc(BetaDeltaLen * BetaDeltaLen / Len),
+ case ?QUEUE:out(Q3) of
+ {empty, _Q3} -> Permitted;
+ {{value, #msg_status { seq_id = MinSeqId }}, _Q3} ->
+ lists:max([Permitted, rabbit_queue_index:next_segment_boundary(
+ MinSeqId) - MinSeqId])
+ end.
chunk_size(Current, Permitted)
when Permitted =:= infinity orelse Permitted >= Current ->
@@ -1587,25 +1527,25 @@ fetch_from_q3(State = #vqstate {
q3 = Q3,
q4 = Q4,
ram_index_count = RamIndexCount}) ->
- case bpqueue:out(Q3) of
+ case ?QUEUE:out(Q3) of
{empty, _Q3} ->
{empty, State};
- {{value, IndexOnDisk, MsgStatus}, Q3a} ->
+ {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk }}, Q3a} ->
RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
true = RamIndexCount1 >= 0, %% ASSERTION
State1 = State #vqstate { q3 = Q3a,
ram_index_count = RamIndexCount1 },
State2 =
- case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of
+ case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
{true, true} ->
%% q3 is now empty, it wasn't before; delta is
%% still empty. So q2 must be empty, and we
%% know q4 is empty otherwise we wouldn't be
%% loading from q3. As such, we can just set
%% q4 to Q1.
- true = bpqueue:is_empty(Q2), %% ASSERTION
- true = queue:is_empty(Q4), %% ASSERTION
- State1 #vqstate { q1 = queue:new(),
+ true = ?QUEUE:is_empty(Q2), %% ASSERTION
+ true = ?QUEUE:is_empty(Q4), %% ASSERTION
+ State1 #vqstate { q1 = ?QUEUE:new(),
q4 = Q1 };
{true, false} ->
maybe_deltas_to_betas(State1);
@@ -1638,7 +1578,7 @@ maybe_deltas_to_betas(State = #vqstate {
{Q3a, IndexState2} =
betas_from_index_entries(List, TransientThreshold, PA, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },
- case bpqueue:len(Q3a) of
+ case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being
%% transient and below the threshold
@@ -1646,14 +1586,14 @@ maybe_deltas_to_betas(State = #vqstate {
State1 #vqstate {
delta = Delta #delta { start_seq_id = DeltaSeqId1 }});
Q3aLen ->
- Q3b = bpqueue:join(Q3, Q3a),
+ Q3b = ?QUEUE:join(Q3, Q3a),
case DeltaCount - Q3aLen of
0 ->
%% delta is now empty, but it wasn't before, so
%% can now join q2 onto q3
- State1 #vqstate { q2 = bpqueue:new(),
+ State1 #vqstate { q2 = ?QUEUE:new(),
delta = ?BLANK_DELTA,
- q3 = bpqueue:join(Q3b, Q2) };
+ q3 = ?QUEUE:join(Q3b, Q2) };
N when N > 0 ->
Delta1 = #delta { start_seq_id = DeltaSeqId1,
count = N,
@@ -1670,23 +1610,21 @@ push_alphas_to_betas(Quota, State) ->
maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
maybe_push_alphas_to_betas(
- fun queue:out/1,
- fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
- Q1a, State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
+ fun ?QUEUE:out/1,
+ fun (MsgStatus, Q1a,
+ State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
State1 #vqstate { q1 = Q1a,
- q3 = bpqueue:in(IndexOnDisk, MsgStatus, Q3) };
- (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
- Q1a, State1 = #vqstate { q2 = Q2 }) ->
+ q3 = ?QUEUE:in(MsgStatus, Q3) };
+ (MsgStatus, Q1a, State1 = #vqstate { q2 = Q2 }) ->
State1 #vqstate { q1 = Q1a,
- q2 = bpqueue:in(IndexOnDisk, MsgStatus, Q2) }
+ q2 = ?QUEUE:in(MsgStatus, Q2) }
end, Quota, Q1, State).
maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(
- fun queue:out_r/1,
- fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
- Q4a, State1 = #vqstate { q3 = Q3 }) ->
- State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
+ fun ?QUEUE:out_r/1,
+ fun (MsgStatus, Q4a, State1 = #vqstate { q3 = Q3 }) ->
+ State1 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3),
q4 = Q4a }
end, Quota, Q4, State).
@@ -1716,65 +1654,70 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
Consumer(MsgStatus2, Qa, State2))
end.
-push_betas_to_deltas(State = #vqstate { q2 = Q2,
+push_betas_to_deltas(Quota,
+ State = #vqstate { q2 = Q2,
delta = Delta,
q3 = Q3,
index_state = IndexState,
ram_index_count = RamIndexCount }) ->
- {Delta2, Q2a, RamIndexCount2, IndexState2} =
- push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end,
- fun bpqueue:out/1, Q2,
- RamIndexCount, IndexState),
- {Delta3, Q3a, RamIndexCount3, IndexState3} =
- push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1,
- fun bpqueue:out_r/1, Q3,
- RamIndexCount2, IndexState2),
- Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)),
+ PushState = {Quota, Delta, RamIndexCount, IndexState},
+ {Q2a, PushState1} = push_betas_to_deltas(
+ fun ?QUEUE:out/1,
+ fun (Q2MinSeqId) -> Q2MinSeqId end,
+ Q2, PushState),
+ {Q3a, PushState2} = push_betas_to_deltas(
+ fun ?QUEUE:out_r/1,
+ fun rabbit_queue_index:next_segment_boundary/1,
+ Q3, PushState1),
+ {_, Delta1, RamIndexCount1, IndexState1} = PushState2,
State #vqstate { q2 = Q2a,
- delta = Delta4,
+ delta = Delta1,
q3 = Q3a,
- index_state = IndexState3,
- ram_index_count = RamIndexCount3 }.
-
-push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) ->
- case bpqueue:out(Q) of
- {empty, _Q} ->
- {?BLANK_DELTA, Q, RamIndexCount, IndexState};
- {{value, _IndexOnDisk1, #msg_status { seq_id = MinSeqId }}, _Qa} ->
- {{value, _IndexOnDisk2, #msg_status { seq_id = MaxSeqId }}, _Qb} =
- bpqueue:out_r(Q),
+ index_state = IndexState1,
+ ram_index_count = RamIndexCount1 }.
+
+push_betas_to_deltas(_Generator, _LimitFun, Q,
+ {0, _Delta, _RamIndexCount, _IndexState} = PushState) ->
+ {Q, PushState};
+push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
+ case ?QUEUE:is_empty(Q) of
+ true ->
+ {Q, PushState};
+ false ->
+ {{value, #msg_status { seq_id = MinSeqId }}, _Qa} = ?QUEUE:out(Q),
+ {{value, #msg_status { seq_id = MaxSeqId }}, _Qb} = ?QUEUE:out_r(Q),
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
- true -> {?BLANK_DELTA, Q, RamIndexCount, IndexState};
- false -> {Len, Qc, RamIndexCount1, IndexState1} =
- push_betas_to_deltas(Generator, Limit, Q, 0,
- RamIndexCount, IndexState),
- {#delta { start_seq_id = Limit,
- count = Len,
- end_seq_id = MaxSeqId + 1 },
- Qc, RamIndexCount1, IndexState1}
+ true -> {Q, PushState};
+ false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
end
end.
-push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
+push_betas_to_deltas1(_Generator, _Limit, Q,
+ {0, _Delta, _RamIndexCount, _IndexState} = PushState) ->
+ {Q, PushState};
+push_betas_to_deltas1(Generator, Limit, Q,
+ {Quota, Delta, RamIndexCount, IndexState} = PushState) ->
case Generator(Q) of
{empty, _Q} ->
- {Count, Q, RamIndexCount, IndexState};
- {{value, _IndexOnDisk, #msg_status { seq_id = SeqId }}, _Qa}
+ {Q, PushState};
+ {{value, #msg_status { seq_id = SeqId }}, _Qa}
when SeqId < Limit ->
- {Count, Q, RamIndexCount, IndexState};
- {{value, IndexOnDisk, MsgStatus}, Qa} ->
- {RamIndexCount1, IndexState1} =
+ {Q, PushState};
+ {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk,
+ seq_id = SeqId }}, Qa} ->
+ {Quota1, RamIndexCount1, IndexState1} =
case IndexOnDisk of
- true -> {RamIndexCount, IndexState};
+ true -> {Quota, RamIndexCount, IndexState};
false -> {#msg_status { index_on_disk = true },
IndexState2} =
maybe_write_index_to_disk(true, MsgStatus,
IndexState),
- {RamIndexCount - 1, IndexState2}
+ {Quota - 1, RamIndexCount - 1, IndexState2}
end,
- push_betas_to_deltas(
- Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1)
+ Delta1 = expand_delta(SeqId, Delta),
+ push_betas_to_deltas1(Generator, Limit, Qa,
+ {Quota1, Delta1, RamIndexCount1, IndexState1})
end.
%%----------------------------------------------------------------------------