summaryrefslogtreecommitdiff
path: root/src/q3tree.erl
blob: a5fcd740147b6a50d7252b1288e63ad8e7ffb6e0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
%%

-module(q3tree).

%% A less general random access variation of bpqueue for message status records

-export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, least_key/1,
         join/2, join_bpqueue/2, foldr/3, from_batch/1, map_fold_filter_r/4]).

-include("rabbit.hrl").
-include("rabbit_backing_queue.hrl").

new() ->
    gb_trees:empty().

is_empty(T) ->
    gb_trees:is_empty(T).

len(T) ->
    gb_trees:size(T).

in(IndexOnDisk, MsgStatus, Tree) -> in_r(IndexOnDisk, MsgStatus, Tree).

in_r(IndexOnDisk,
     #msg_status { seq_id = SeqId, index_on_disk = IndexOnDisk } = MsgStatus,
     Tree) ->
    gb_trees:insert(SeqId, MsgStatus, Tree);
in_r(IndexOnDisk, _Msgstatus, _Tree) ->
    throw({prefix_and_msg_disagree, IndexOnDisk}).

out(Tree)   -> out1(Tree, fun gb_trees:take_smallest/1).
out_r(Tree) -> out1(Tree, fun gb_trees:take_largest/1).

out1(Tree, TakeFun) ->
   case gb_trees:is_empty(Tree) of
       true  -> {empty, Tree};
       false -> {_Key, #msg_status { index_on_disk = IndexOnDisk } = MsgStatus,
                 Tree2} = TakeFun(Tree),
                {{value, IndexOnDisk, MsgStatus}, Tree2}
   end.

least_key(Tree) ->
    {Least, _} = gb_trees:smallest(Tree),
    Least.

join(T1, T2) ->
   join1(gb_trees:iterator(T1), T2).
join1(Iter, T) ->
    case gb_trees:next(Iter) of
        none    -> T;
        {_SeqId,
         #msg_status { index_on_disk = IndexOnDisk } = MsgStatus,
         Iter1} -> join1(Iter1, in_r(IndexOnDisk, MsgStatus, T))
    end.

join_bpqueue(T, Q) ->
   bpqueue:foldr(fun (IndexOnDisk, MsgStatus, Tree) ->
                     in_r(IndexOnDisk, MsgStatus, Tree)
                 end, T, Q).

foldr(Fun, Acc, Tree) ->
   lists:foldr(Fun, Acc, gb_trees:to_list(Tree)).

from_batch({IndexOnDisk, L}) ->
    lists:foldl(fun (MsgStatus, Tree) ->
                    in_r(IndexOnDisk, MsgStatus, Tree)
                end, new(), L).

map_fold_filter_r(PFilter, Fun, Acc, Tree) ->
    map_fold_filter_r1(PFilter, Fun, Acc, Tree, new()).

map_fold_filter_r1(PFilter, Fun, Acc, TreeOld, TreeNew) ->
    case out_r(TreeOld) of
        {empty, _T} -> {TreeNew, Acc};
        {{value,
          IndexOnDisk, #msg_status{index_on_disk = IndexOnDisk} = MsgStatus},
         TreeOld1} ->
            case PFilter(IndexOnDisk) of
                false ->
                   map_fold_filter_r1(PFilter, Fun, Acc, TreeOld1,
                                      in_r(IndexOnDisk, MsgStatus, TreeNew));
                true ->
                   case Fun(MsgStatus, Acc) of
                       stop ->
                           {join(TreeOld, TreeNew), Acc};
                       {IndexOnDisk1, MsgStatus1, Acc1} ->
                           map_fold_filter_r1(PFilter, Fun, Acc1, TreeOld1,
                                              in_r(IndexOnDisk1, MsgStatus1,
                                              TreeNew))
                   end
            end
   end.