summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bpqueue.erl70
-rw-r--r--src/rabbit_tests.erl83
-rw-r--r--src/rabbit_variable_queue.erl56
3 files changed, 206 insertions, 3 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl
index 5e7471f71d..7237473f32 100644
--- a/src/bpqueue.erl
+++ b/src/bpqueue.erl
@@ -37,7 +37,8 @@
%% prefix. len/1 returns the flattened length of the queue and is O(1)
-export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, join/2,
- fold/3, from_list/1, to_list/1]).
+ fold/3, from_list/1, to_list/1, map_fold_filter_l/4,
+ map_fold_filter_r/4]).
%%----------------------------------------------------------------------------
@@ -60,6 +61,14 @@
-spec(fold/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B).
-spec(from_list/1 :: ([{prefix(), [value()]}]) -> bpqueue()).
-spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]).
+-spec(map_fold_filter_l/4 ::
+ (fun ((prefix()) -> boolean()),
+ fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) ->
+ {bpqueue(), B}).
+-spec(map_fold_filter_r/4 ::
+ (fun ((prefix()) -> boolean()),
+ fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) ->
+ {bpqueue(), B}).
-endif.
@@ -183,3 +192,62 @@ to_list({_N, Q}) ->
to_list1({Prefix, InnerQ}) ->
{Prefix, queue:to_list(InnerQ)}.
+
+%% map_fold_filter_[lr](FilterFun, Fun, Init, BPQ) -> {BPQ, Init}
+%% where FilterFun(Prefix) -> boolean()
+%% Fun(Value, Init) -> {Prefix, Value, Init}
+%%
+%% The filter fun allows you to skip very quickly over blocks that
+%% you're not interested in. Such blocks appear in the resulting bpq
+%% without modification. The Fun is then used both to map the value,
+%% which also allows you to change the prefix (and thus block) of the
+%% value, and also to modify the Init/Acc (just like a fold).
+map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
+ {BPQ, Init};
+map_fold_filter_l(PFilter, Fun, Init, {_N, Q}) ->
+ map_fold_filter_l1(PFilter, Fun, Init, Q, new()).
+
+map_fold_filter_l1(PFilter, Fun, Init, Q, QNew) ->
+ case queue:out(Q) of
+ {empty, _Q} ->
+ {QNew, Init};
+ {{value, {Prefix, InnerQ}}, Q1} ->
+ InnerList = queue:to_list(InnerQ),
+ {Init1, QNew1} =
+ case PFilter(Prefix) of
+ true ->
+ lists:foldl(
+ fun (Value, {Acc, QNew2}) ->
+ {Prefix1, Value1, Acc1} = Fun(Value, Acc),
+ {Acc1, in(Prefix1, Value1, QNew2)}
+ end, {Init, QNew}, InnerList);
+ false ->
+ {Init, join(QNew, from_list([{Prefix, InnerList}]))}
+ end,
+ map_fold_filter_l1(PFilter, Fun, Init1, Q1, QNew1)
+ end.
+
+map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
+ {BPQ, Init};
+map_fold_filter_r(PFilter, Fun, Init, {_N, Q}) ->
+ map_fold_filter_r1(PFilter, Fun, Init, Q, new()).
+
+map_fold_filter_r1(PFilter, Fun, Init, Q, QNew) ->
+ case queue:out_r(Q) of
+ {empty, _Q} ->
+ {QNew, Init};
+ {{value, {Prefix, InnerQ}}, Q1} ->
+ InnerList = queue:to_list(InnerQ),
+ {Init1, QNew1} =
+ case PFilter(Prefix) of
+ true ->
+ lists:foldr(
+ fun (Value, {Acc, QNew2}) ->
+ {Prefix1, Value1, Acc1} = Fun(Value, Acc),
+ {Acc1, in_r(Prefix1, Value1, QNew2)}
+ end, {Init, QNew}, InnerList);
+ false ->
+ {Init, join(from_list([{Prefix, InnerList}]), QNew)}
+ end,
+ map_fold_filter_r1(PFilter, Fun, Init1, Q1, QNew1)
+ end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 16332f325c..45b480177d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -55,6 +55,7 @@ all_tests() ->
passed = test_queue_index(),
passed = test_variable_queue(),
passed = test_priority_queue(),
+ passed = test_bpqueue(),
passed = test_unfold(),
passed = test_parsing(),
passed = test_topic_matching(),
@@ -181,6 +182,88 @@ test_priority_queue(Q) ->
priority_queue:to_list(Q),
priority_queue_out_all(Q)}.
+test_bpqueue() ->
+ Q = bpqueue:new(),
+ true = bpqueue:is_empty(Q),
+ 0 = bpqueue:len(Q),
+
+ Q1 = bpqueue:in(bar, 3, bpqueue:in(foo, 2, bpqueue:in(foo, 1, Q))),
+ false = bpqueue:is_empty(Q1),
+ 3 = bpqueue:len(Q1),
+ [{foo, [1, 2]}, {bar, [3]}] = bpqueue:to_list(Q1),
+
+ Q2 = bpqueue:in_r(bar, 3, bpqueue:in_r(foo, 2, bpqueue:in_r(foo, 1, Q))),
+ false = bpqueue:is_empty(Q2),
+ 3 = bpqueue:len(Q2),
+ [{bar, [3]}, {foo, [2, 1]}] = bpqueue:to_list(Q2),
+
+ {empty, _Q} = bpqueue:out(Q),
+ {{value, foo, 1}, Q3} = bpqueue:out(Q1),
+ {{value, foo, 2}, Q4} = bpqueue:out(Q3),
+ {{value, bar, 3}, _Q5} = bpqueue:out(Q4),
+
+ {empty, _Q} = bpqueue:out_r(Q),
+ {{value, foo, 1}, Q6} = bpqueue:out_r(Q2),
+ {{value, foo, 2}, Q7} = bpqueue:out_r(Q6),
+ {{value, bar, 3}, _Q8} = bpqueue:out_r(Q7),
+
+ [{foo, [1, 2]}, {bar, [3]}] = bpqueue:to_list(bpqueue:join(Q, Q1)),
+ [{bar, [3]}, {foo, [2, 1]}] = bpqueue:to_list(bpqueue:join(Q2, Q)),
+ [{foo, [1, 2]}, {bar, [3, 3]}, {foo, [2,1]}] =
+ bpqueue:to_list(bpqueue:join(Q1, Q2)),
+
+ [{foo, [1, 2]}, {bar, [3]}, {foo, [1, 2]}, {bar, [3]}] =
+ bpqueue:to_list(bpqueue:join(Q1, Q1)),
+
+ [{foo, [1, 2]}, {bar, [3]}] =
+ bpqueue:to_list(
+ bpqueue:from_list(
+ [{x, []}, {foo, [1]}, {y, []}, {foo, [2]}, {bar, [3]}, {z, []}])),
+
+ [{undefined, [a]}] = bpqueue:to_list(bpqueue:from_list([{undefined, [a]}])),
+
+ {4, [a,b,c,d]} =
+ bpqueue:fold(
+ fun (Prefix, Value, {Prefix, Acc}) ->
+ {Prefix + 1, [Value | Acc]}
+ end,
+ {0, []}, bpqueue:from_list([{0,[d]}, {1,[c]}, {2,[b]}, {3,[a]}])),
+
+ ok = bpqueue:fold(fun (Prefix, Value, ok) -> {error, Prefix, Value} end,
+ ok, Q),
+
+ [] = bpqueue:to_list(Q),
+
+ F1 = fun (Qn) ->
+ bpqueue:map_fold_filter_l(
+ fun (foo) -> true;
+ (_) -> false
+ end,
+ fun (V, Num) -> {bar, -V, V - Num} end,
+ 0, Qn)
+ end,
+
+ F2 = fun (Qn) ->
+ bpqueue:map_fold_filter_r(
+ fun (foo) -> true;
+ (_) -> false
+ end,
+ fun (V, Num) -> {bar, -V, V - Num} end,
+ 0, Qn)
+ end,
+
+ {Q9, 1} = F1(Q1), %% 2 - (1 - 0) == 1
+ [{bar, [-1, -2, 3]}] = bpqueue:to_list(Q9),
+ {Q10, -1} = F2(Q1), %% 1 - (2 - 0) == -1
+ [{bar, [-1, -2, 3]}] = bpqueue:to_list(Q10),
+
+ {Q11, 0} = F1(Q),
+ [] = bpqueue:to_list(Q11),
+ {Q12, 0} = F2(Q),
+ [] = bpqueue:to_list(Q12),
+
+ passed.
+
test_simple_n_element_queue(N) ->
Items = lists:seq(1, N),
Q = priority_queue_in_all(priority_queue:new(), Items),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8205f79f66..e821cf6b72 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -78,7 +78,8 @@
index_on_disk
}).
--define(RAM_INDEX_TARGET_RATIO, 32768).
+-define(RAM_INDEX_TARGET_RATIO, 1024).
+-define(RAM_INDEX_MAX_WORK, 32).
%%----------------------------------------------------------------------------
@@ -226,7 +227,8 @@ terminate(State = #vqstate { index_state = IndexState,
State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }.
publish(Msg, State) ->
- publish(Msg, false, false, State).
+ State1 = limit_ram_index(State),
+ publish(Msg, false, false, State1).
publish_delivered(Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
@@ -870,6 +872,56 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
%% Phase changes
%%----------------------------------------------------------------------------
+limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount,
+ target_ram_msg_count = TargetRamMsgCount })
+ when RamIndexCount > ?RAM_INDEX_TARGET_RATIO * TargetRamMsgCount ->
+ Reduction = lists:min([?RAM_INDEX_MAX_WORK,
+ RamIndexCount - (?RAM_INDEX_TARGET_RATIO *
+ TargetRamMsgCount)]),
+ io:format("~p~n", [Reduction]),
+ {Reduction1, State1} = limit_q2_ram_index(Reduction, State),
+ {_Reduction2, State2} = limit_q3_ram_index(Reduction1, State1),
+ State2;
+limit_ram_index(State) ->
+ State.
+
+limit_q2_ram_index(Reduction, State = #vqstate { q2 = Q2 })
+ when Reduction > 0 ->
+ {Q2a, Reduction1, State1} = limit_ram_index(fun bpqueue:map_fold_filter_l/4,
+ Q2, Reduction, State),
+ {Reduction1, State1 #vqstate { q2 = Q2a }};
+limit_q2_ram_index(Reduction, State) ->
+ {Reduction, State}.
+
+limit_q3_ram_index(Reduction, State = #vqstate { q3 = Q3 })
+ when Reduction > 0 ->
+ %% use the _r version so that we prioritise the msgs closest to
+ %% delta, and least soon to be delivered
+ {Q3a, Reduction1, State1} = limit_ram_index(fun bpqueue:map_fold_filter_r/4,
+ Q3, Reduction, State),
+ {Reduction1, State1 #vqstate { q3 = Q3a }};
+limit_q3_ram_index(Reduction, State) ->
+ {Reduction, State}.
+
+limit_ram_index(MapFoldFilterFun, Q, Reduction, State =
+ #vqstate { ram_index_count = RamIndexCount,
+ index_state = IndexState }) ->
+ {Qa, {Reduction1, IndexState1}} =
+ MapFoldFilterFun(
+ fun erlang:'not'/1,
+ fun (MsgStatus, {0, _IndexStateN} = Acc) ->
+ false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
+ {false, MsgStatus, Acc};
+ (MsgStatus, {N, IndexStateN}) when N > 0 ->
+ false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
+ {MsgStatus1, IndexStateN1} =
+ maybe_write_index_to_disk(true, MsgStatus, IndexStateN),
+ {true, MsgStatus1, {N-1, IndexStateN1}}
+ end, {Reduction, IndexState}, Q),
+ RamIndexCount1 = RamIndexCount - (Reduction - Reduction1),
+ {Qa, Reduction1, State #vqstate { index_state = IndexState1,
+ ram_index_count = RamIndexCount1 }}.
+
maybe_deltas_to_betas(State = #vqstate { delta = #delta { count = 0 } }) ->
State;
maybe_deltas_to_betas(