summaryrefslogtreecommitdiff
path: root/src/bpqueue.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-01-13 19:32:51 +0000
committerMatthew Sackman <matthew@lshift.net>2010-01-13 19:32:51 +0000
commit24add8edcbeddbdfc5e0998e964984ee72e7c8c6 (patch)
treef1d0abc9d21953ceb5570ceef8a0cd838509bcc6 /src/bpqueue.erl
parentb7c1a4b15f129bf920adfc59881ca9498034499f (diff)
downloadrabbitmq-server-git-24add8edcbeddbdfc5e0998e964984ee72e7c8c6.tar.gz
Mechanism to limit the number of betas which don't have their index on disk is now in. Testing showed that if the queue is long, the change in target_ram_msg_count can be large, thus driving the number of ram indices directly off that still doesn't solve the problem. Thus am driving it from publish, with a limit on the maximum amount of work that can be done. This should allow the queue to remain responsive, as it works towards its goal. However, further testing, tuning and thinking is still needed.
Diffstat (limited to 'src/bpqueue.erl')
-rw-r--r--src/bpqueue.erl70
1 files changed, 69 insertions, 1 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl
index 5e7471f71d..7237473f32 100644
--- a/src/bpqueue.erl
+++ b/src/bpqueue.erl
@@ -37,7 +37,8 @@
%% prefix. len/1 returns the flattened length of the queue and is O(1)
-export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, join/2,
- fold/3, from_list/1, to_list/1]).
+ fold/3, from_list/1, to_list/1, map_fold_filter_l/4,
+ map_fold_filter_r/4]).
%%----------------------------------------------------------------------------
@@ -60,6 +61,14 @@
-spec(fold/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B).
-spec(from_list/1 :: ([{prefix(), [value()]}]) -> bpqueue()).
-spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]).
+-spec(map_fold_filter_l/4 ::
+ (fun ((prefix()) -> boolean()),
+ fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) ->
+ {bpqueue(), B}).
+-spec(map_fold_filter_r/4 ::
+ (fun ((prefix()) -> boolean()),
+ fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) ->
+ {bpqueue(), B}).
-endif.
@@ -183,3 +192,62 @@ to_list({_N, Q}) ->
to_list1({Prefix, InnerQ}) ->
{Prefix, queue:to_list(InnerQ)}.
+
+%% map_fold_filter_[lr](FilterFun, Fun, Init, BPQ) -> {BPQ, Init}
+%% where FilterFun(Prefix) -> boolean()
+%% Fun(Value, Init) -> {Prefix, Value, Init}
+%%
+%% The filter fun allows you to skip very quickly over blocks that
+%% you're not interested in. Such blocks appear in the resulting bpq
+%% without modification. The Fun is then used both to map the value,
+%% which also allows you to change the prefix (and thus block) of the
+%% value, and also to modify the Init/Acc (just like a fold).
+map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
+ {BPQ, Init};
+map_fold_filter_l(PFilter, Fun, Init, {_N, Q}) ->
+ map_fold_filter_l1(PFilter, Fun, Init, Q, new()).
+
+map_fold_filter_l1(PFilter, Fun, Init, Q, QNew) ->
+ case queue:out(Q) of
+ {empty, _Q} ->
+ {QNew, Init};
+ {{value, {Prefix, InnerQ}}, Q1} ->
+ InnerList = queue:to_list(InnerQ),
+ {Init1, QNew1} =
+ case PFilter(Prefix) of
+ true ->
+ lists:foldl(
+ fun (Value, {Acc, QNew2}) ->
+ {Prefix1, Value1, Acc1} = Fun(Value, Acc),
+ {Acc1, in(Prefix1, Value1, QNew2)}
+ end, {Init, QNew}, InnerList);
+ false ->
+ {Init, join(QNew, from_list([{Prefix, InnerList}]))}
+ end,
+ map_fold_filter_l1(PFilter, Fun, Init1, Q1, QNew1)
+ end.
+
+map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
+ {BPQ, Init};
+map_fold_filter_r(PFilter, Fun, Init, {_N, Q}) ->
+ map_fold_filter_r1(PFilter, Fun, Init, Q, new()).
+
+map_fold_filter_r1(PFilter, Fun, Init, Q, QNew) ->
+ case queue:out_r(Q) of
+ {empty, _Q} ->
+ {QNew, Init};
+ {{value, {Prefix, InnerQ}}, Q1} ->
+ InnerList = queue:to_list(InnerQ),
+ {Init1, QNew1} =
+ case PFilter(Prefix) of
+ true ->
+ lists:foldr(
+ fun (Value, {Acc, QNew2}) ->
+ {Prefix1, Value1, Acc1} = Fun(Value, Acc),
+ {Acc1, in_r(Prefix1, Value1, QNew2)}
+ end, {Init, QNew}, InnerList);
+ false ->
+ {Init, join(from_list([{Prefix, InnerList}]), QNew)}
+ end,
+ map_fold_filter_r1(PFilter, Fun, Init1, Q1, QNew1)
+ end.