summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bpqueue.erl164
-rw-r--r--src/rabbit_tests.erl14
-rw-r--r--src/rabbit_variable_queue.erl89
3 files changed, 192 insertions, 75 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl
index 7237473f32..a556ec2342 100644
--- a/src/bpqueue.erl
+++ b/src/bpqueue.erl
@@ -63,12 +63,12 @@
-spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]).
-spec(map_fold_filter_l/4 ::
(fun ((prefix()) -> boolean()),
- fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) ->
- {bpqueue(), B}).
+ fun ((value(), B) -> ({prefix(), value(), B} | 'stop')), B,
+ bpqueue()) -> {bpqueue(), B}).
-spec(map_fold_filter_r/4 ::
(fun ((prefix()) -> boolean()),
- fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) ->
- {bpqueue(), B}).
+ fun ((value(), B) -> ({prefix(), value(), B} | 'stop')), B,
+ bpqueue()) -> {bpqueue(), B}).
-endif.
@@ -107,6 +107,40 @@ in_r(Prefix, Value, {N, Q}) ->
queue:in_r({Prefix, queue:in(Value, queue:new())}, Q)
end}.
+in_q(Prefix, Queue, BPQ = {0, Q}) ->
+ case queue:len(Queue) of
+ 0 -> BPQ;
+ N -> {N, queue:in({Prefix, Queue}, Q)}
+ end;
+in_q(Prefix, Queue, BPQ = {N, Q}) ->
+ case queue:len(Queue) of
+ 0 -> BPQ;
+ M -> {N + M,
+ case queue:out_r(Q) of
+ {{value, {Prefix, InnerQ}}, Q1} ->
+ queue:in({Prefix, queue:join(InnerQ, Queue)}, Q1);
+ {{value, {_Prefix, _InnerQ}}, _Q1} ->
+ queue:in({Prefix, Queue}, Q)
+ end}
+ end.
+
+in_q_r(Prefix, Queue, BPQ = {0, Q}) ->
+ case queue:len(Queue) of
+ 0 -> BPQ;
+ N -> {N, queue:in({Prefix, Queue}, Q)}
+ end;
+in_q_r(Prefix, Queue, BPQ = {N, Q}) ->
+ case queue:len(Queue) of
+ 0 -> BPQ;
+ M -> {N + M,
+ case queue:out(Q) of
+ {{value, {Prefix, InnerQ}}, Q1} ->
+ queue:in_r({Prefix, queue:join(Queue, InnerQ)}, Q1);
+ {{value, {_Prefix, _InnerQ}}, _Q1} ->
+ queue:in_r({Prefix, Queue}, Q)
+ end}
+ end.
+
out({0, _Q} = BPQ) ->
{empty, BPQ};
out({N, Q}) ->
@@ -195,7 +229,7 @@ to_list1({Prefix, InnerQ}) ->
%% map_fold_filter_[lr](FilterFun, Fun, Init, BPQ) -> {BPQ, Init}
%% where FilterFun(Prefix) -> boolean()
-%% Fun(Value, Init) -> {Prefix, Value, Init}
+%% Fun(Value, Init) -> {Prefix, Value, Init} | stop
%%
%% The filter fun allows you to skip very quickly over blocks that
%% you're not interested in. Such blocks appear in the resulting bpq
@@ -204,50 +238,106 @@ to_list1({Prefix, InnerQ}) ->
%% value, and also to modify the Init/Acc (just like a fold).
map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
{BPQ, Init};
-map_fold_filter_l(PFilter, Fun, Init, {_N, Q}) ->
- map_fold_filter_l1(PFilter, Fun, Init, Q, new()).
+map_fold_filter_l(PFilter, Fun, Init, {N, Q}) ->
+ map_fold_filter_l1(N, PFilter, Fun, Init, Q, new()).
-map_fold_filter_l1(PFilter, Fun, Init, Q, QNew) ->
+map_fold_filter_l1(Len, PFilter, Fun, Init, Q, QNew) ->
case queue:out(Q) of
{empty, _Q} ->
{QNew, Init};
{{value, {Prefix, InnerQ}}, Q1} ->
- InnerList = queue:to_list(InnerQ),
- {Init1, QNew1} =
- case PFilter(Prefix) of
- true ->
- lists:foldl(
- fun (Value, {Acc, QNew2}) ->
- {Prefix1, Value1, Acc1} = Fun(Value, Acc),
- {Acc1, in(Prefix1, Value1, QNew2)}
- end, {Init, QNew}, InnerList);
- false ->
- {Init, join(QNew, from_list([{Prefix, InnerList}]))}
- end,
- map_fold_filter_l1(PFilter, Fun, Init1, Q1, QNew1)
+ case PFilter(Prefix) of
+ true ->
+ {Init1, QNew1, Cont} =
+ map_fold_filter_l2(
+ Fun, Prefix, Prefix, Init, InnerQ, QNew, queue:new()),
+ case Cont of
+ false ->
+ {join(QNew1, {Len - len(QNew1), Q1}), Init1};
+ true ->
+ map_fold_filter_l1(
+ Len, PFilter, Fun, Init1, Q1, QNew1)
+ end;
+ false ->
+ map_fold_filter_l1(
+ Len, PFilter, Fun, Init, Q1, in_q(Prefix, InnerQ, QNew))
+ end
+ end.
+
+map_fold_filter_l2(Fun, OrigPrefix, Prefix, Init, InnerQ, QNew, InnerQNew) ->
+ case queue:out(InnerQ) of
+ {empty, _Q} ->
+ {Init, in_q(OrigPrefix, InnerQ,
+ in_q(Prefix, InnerQNew, QNew)), true};
+ {{value, Value}, InnerQ1} ->
+ case Fun(Value, Init) of
+ stop ->
+ {Init, in_q(OrigPrefix, InnerQ,
+ in_q(Prefix, InnerQNew, QNew)), false};
+ {Prefix1, Value1, Init1} ->
+ case Prefix1 =:= Prefix of
+ true ->
+ map_fold_filter_l2(
+ Fun, OrigPrefix, Prefix, Init1, InnerQ1, QNew,
+ queue:in(Value1, InnerQNew));
+ false ->
+ map_fold_filter_l2(
+ Fun, OrigPrefix, Prefix1, Init1, InnerQ1,
+ in_q(Prefix, InnerQNew, QNew),
+ queue:in(Value1, queue:new()))
+ end
+ end
end.
map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
{BPQ, Init};
-map_fold_filter_r(PFilter, Fun, Init, {_N, Q}) ->
- map_fold_filter_r1(PFilter, Fun, Init, Q, new()).
+map_fold_filter_r(PFilter, Fun, Init, {N, Q}) ->
+ map_fold_filter_r1(N, PFilter, Fun, Init, Q, new()).
-map_fold_filter_r1(PFilter, Fun, Init, Q, QNew) ->
+map_fold_filter_r1(Len, PFilter, Fun, Init, Q, QNew) ->
case queue:out_r(Q) of
{empty, _Q} ->
{QNew, Init};
{{value, {Prefix, InnerQ}}, Q1} ->
- InnerList = queue:to_list(InnerQ),
- {Init1, QNew1} =
- case PFilter(Prefix) of
- true ->
- lists:foldr(
- fun (Value, {Acc, QNew2}) ->
- {Prefix1, Value1, Acc1} = Fun(Value, Acc),
- {Acc1, in_r(Prefix1, Value1, QNew2)}
- end, {Init, QNew}, InnerList);
- false ->
- {Init, join(from_list([{Prefix, InnerList}]), QNew)}
- end,
- map_fold_filter_r1(PFilter, Fun, Init1, Q1, QNew1)
+ case PFilter(Prefix) of
+ true ->
+ {Init1, QNew1, Cont} =
+ map_fold_filter_r2(
+ Fun, Prefix, Prefix, Init, InnerQ, QNew, queue:new()),
+ case Cont of
+ false ->
+ {join({Len - len(QNew1), Q1}, QNew1), Init1};
+ true ->
+ map_fold_filter_r1(
+ Len, PFilter, Fun, Init1, Q1, QNew1)
+ end;
+ false ->
+ map_fold_filter_r1(
+ Len, PFilter, Fun, Init, Q1, in_q_r(Prefix, InnerQ, QNew))
+ end
+ end.
+
+map_fold_filter_r2(Fun, OrigPrefix, Prefix, Init, InnerQ, QNew, InnerQNew) ->
+ case queue:out_r(InnerQ) of
+ {empty, _Q} ->
+ {Init, in_q_r(OrigPrefix, InnerQ,
+ in_q_r(Prefix, InnerQNew, QNew)), true};
+ {{value, Value}, InnerQ1} ->
+ case Fun(Value, Init) of
+ stop ->
+ {Init, in_q_r(OrigPrefix, InnerQ,
+ in_q_r(Prefix, InnerQNew, QNew)), false};
+ {Prefix1, Value1, Init1} ->
+ case Prefix1 =:= Prefix of
+ true ->
+ map_fold_filter_r2(
+ Fun, OrigPrefix, Prefix, Init1, InnerQ1, QNew,
+ queue:in_r(Value1, InnerQNew));
+ false ->
+ map_fold_filter_r2(
+ Fun, OrigPrefix, Prefix1, Init1, InnerQ1,
+ in_q_r(Prefix, InnerQNew, QNew),
+ queue:in(Value1, queue:new()))
+ end
+ end
end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 45b480177d..291f4cb0e8 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -239,7 +239,8 @@ test_bpqueue() ->
fun (foo) -> true;
(_) -> false
end,
- fun (V, Num) -> {bar, -V, V - Num} end,
+ fun (2, _Num) -> stop;
+ (V, Num) -> {bar, -V, V - Num} end,
0, Qn)
end,
@@ -248,14 +249,15 @@ test_bpqueue() ->
fun (foo) -> true;
(_) -> false
end,
- fun (V, Num) -> {bar, -V, V - Num} end,
+ fun (2, _Num) -> stop;
+ (V, Num) -> {bar, -V, V - Num} end,
0, Qn)
end,
- {Q9, 1} = F1(Q1), %% 2 - (1 - 0) == 1
- [{bar, [-1, -2, 3]}] = bpqueue:to_list(Q9),
- {Q10, -1} = F2(Q1), %% 1 - (2 - 0) == -1
- [{bar, [-1, -2, 3]}] = bpqueue:to_list(Q10),
+ {Q9, 1} = F1(Q1),
+ [{bar, [-1]}, {foo, [2]}, {bar, [3]}] = bpqueue:to_list(Q9),
+ {Q10, 0} = F2(Q1),
+ [{foo, [1, 2]}, {bar, [3]}] = bpqueue:to_list(Q10),
{Q11, 0} = F1(Q),
[] = bpqueue:to_list(Q11),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6c5efbd5f8..a62a90ceae 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -78,8 +78,20 @@
index_on_disk
}).
--define(RAM_INDEX_TARGET_RATIO, 256).
--define(RAM_INDEX_MAX_WORK, 32).
+%% If there are N msgs in the q, and M of them are betas, then it is
+%% required that RAM_INDEX_BETA_RATIO * (M/N) * M of those have their
+%% index on disk. Eg if RAM_INDEX_BETA_RATIO is 1.0, and there are 36
+%% msgs in the queue, of which 12 are betas, then 4 of those betas
+%% must have their index on disk.
+-define(RAM_INDEX_BETA_RATIO, 0.8).
+%% When we discover, on publish, that we should write some indices to
+%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of
+%% betas that we must be due to write indices for before we do any
+%% work at all. This is both a minimum and a maximum - we don't write
+%% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't
+%% write more - we can always come back on the next publish to do
+%% more.
+-define(RAM_INDEX_BATCH_SIZE, 1024).
%%----------------------------------------------------------------------------
@@ -577,6 +589,26 @@ beta_fold_no_index_on_disk(Fun, Init, Q) ->
Fun(Value, Acc)
end, Init, Q).
+permitted_ram_index_count(#vqstate { len = 0 }) ->
+ undefined;
+permitted_ram_index_count(#vqstate { len = Len, q2 = Q2, q3 = Q3 }) ->
+ case bpqueue:len(Q2) + bpqueue:len(Q3) of
+ 0 ->
+ undefined;
+ BetaLength ->
+ %% the fraction of the queue that are betas
+ BetaFrac = BetaLength / Len,
+ BetaLength - trunc(BetaFrac * BetaLength * ?RAM_INDEX_BETA_RATIO)
+ end.
+
+
+should_force_index_to_disk(State =
+ #vqstate { ram_index_count = RamIndexCount }) ->
+ case permitted_ram_index_count(State) of
+ undefined -> false;
+ Permitted -> RamIndexCount >= Permitted
+ end.
+
%%----------------------------------------------------------------------------
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
@@ -771,17 +803,10 @@ publish(msg, MsgStatus, State = #vqstate { index_state = IndexState,
publish(index, MsgStatus, State =
#vqstate { index_state = IndexState, q1 = Q1,
- ram_index_count = RamIndexCount,
- target_ram_msg_count = TargetRamMsgCount }) ->
+ ram_index_count = RamIndexCount }) ->
MsgStatus1 = #msg_status { msg_on_disk = true } =
maybe_write_msg_to_disk(true, MsgStatus),
- ForceIndex = case TargetRamMsgCount of
- undefined ->
- false;
- _ ->
- RamIndexCount >= (?RAM_INDEX_TARGET_RATIO *
- TargetRamMsgCount)
- end,
+ ForceIndex = should_force_index_to_disk(State),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
RamIndexCount1 = case MsgStatus2 #msg_status.index_on_disk of
@@ -872,17 +897,24 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
%% Phase changes
%%----------------------------------------------------------------------------
-limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount,
- target_ram_msg_count = TargetRamMsgCount })
- when RamIndexCount > ?RAM_INDEX_TARGET_RATIO * TargetRamMsgCount ->
- Reduction = lists:min([?RAM_INDEX_MAX_WORK,
- RamIndexCount - (?RAM_INDEX_TARGET_RATIO *
- TargetRamMsgCount)]),
- {Reduction1, State1} = limit_q2_ram_index(Reduction, State),
- {_Reduction2, State2} = limit_q3_ram_index(Reduction1, State1),
- State2;
-limit_ram_index(State) ->
- State.
+limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) ->
+ case permitted_ram_index_count(State) of
+ undefined ->
+ State;
+ Permitted when RamIndexCount > Permitted ->
+ Reduction = lists:min([RamIndexCount - Permitted,
+ ?RAM_INDEX_BATCH_SIZE]),
+ case Reduction < ?RAM_INDEX_BATCH_SIZE of
+ true ->
+ State;
+ false ->
+ {Reduction1, State1} = limit_q2_ram_index(Reduction, State),
+ {_Red2, State2} = limit_q3_ram_index(Reduction1, State1),
+ State2
+ end;
+ _ ->
+ State
+ end.
limit_q2_ram_index(Reduction, State = #vqstate { q2 = Q2 })
when Reduction > 0 ->
@@ -908,9 +940,9 @@ limit_ram_index(MapFoldFilterFun, Q, Reduction, State =
{Qa, {Reduction1, IndexState1}} =
MapFoldFilterFun(
fun erlang:'not'/1,
- fun (MsgStatus, {0, _IndexStateN} = Acc) ->
+ fun (MsgStatus, {0, _IndexStateN}) ->
false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
- {false, MsgStatus, Acc};
+ stop;
(MsgStatus, {N, IndexStateN}) when N > 0 ->
false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
{MsgStatus1, IndexStateN1} =
@@ -986,19 +1018,12 @@ maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State =
maybe_push_alphas_to_betas(
Generator, Consumer, Q, State =
#vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount,
- target_ram_msg_count = TargetRamMsgCount,
index_state = IndexState }) ->
case Generator(Q) of
{empty, _Q} -> State;
{{value, MsgStatus}, Qa} ->
MsgStatus1 = maybe_write_msg_to_disk(true, MsgStatus),
- ForceIndex = case TargetRamMsgCount of
- undefined ->
- false;
- _ ->
- RamIndexCount >= (?RAM_INDEX_TARGET_RATIO *
- TargetRamMsgCount)
- end,
+ ForceIndex = should_force_index_to_disk(State),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
RamIndexCount1 = case MsgStatus2 #msg_status.index_on_disk of