diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-03-12 17:05:31 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-03-12 17:05:31 +0000 |
| commit | c61c5144f675d9c23c7ed738d65ec57a04c5da5e (patch) | |
| tree | faf5b6e014bcb2be85d71dc521639ab505bbdadc /src | |
| parent | 9125971827bee48265b84656d6691de24444e06a (diff) | |
| download | rabbitmq-server-git-c61c5144f675d9c23c7ed738d65ec57a04c5da5e.tar.gz | |
first cut at extending gen_server2 with priorities
This just changes the representation of the message queue; the API
changes are still to come.
Diffstat (limited to 'src')
| -rw-r--r-- | src/gen_server2.erl | 12 | ||||
| -rw-r--r-- | src/priority_queue.erl | 125 |
2 files changed, 131 insertions, 6 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 11bb66d743..fbaff765d9 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -276,7 +276,7 @@ enter_loop(Mod, Options, State, ServerName, Timeout) -> Name = get_proc_name(ServerName), Parent = get_parent(), Debug = debug_options(Name, Options), - Queue = queue:new(), + Queue = priority_queue:new(), loop(Parent, Name, State, Mod, Timeout, Queue, Debug). %%%======================================================================== @@ -294,7 +294,7 @@ init_it(Starter, self, Name, Mod, Args, Options) -> init_it(Starter, self(), Name, Mod, Args, Options); init_it(Starter, Parent, Name, Mod, Args, Options) -> Debug = debug_options(Name, Options), - Queue = queue:new(), + Queue = priority_queue:new(), case catch Mod:init(Args) of {ok, State} -> proc_lib:init_ack(Starter, {ok, self()}), @@ -326,9 +326,9 @@ init_it(Starter, Parent, Name, Mod, Args, Options) -> loop(Parent, Name, State, Mod, Time, Queue, Debug) -> receive Input -> loop(Parent, Name, State, Mod, - Time, queue:in(Input, Queue), Debug) + Time, priority_queue:in(Input, Queue), Debug) after 0 -> - case queue:out(Queue) of + case priority_queue:out(Queue) of {{value, Msg}, Queue1} -> process_msg(Parent, Name, State, Mod, Time, Queue1, Debug, Msg); @@ -336,7 +336,7 @@ loop(Parent, Name, State, Mod, Time, Queue, Debug) -> receive Input -> loop(Parent, Name, State, Mod, - Time, queue:in(Input, Queue1), Debug) + Time, priority_queue:in(Input, Queue1), Debug) after Time -> process_msg(Parent, Name, State, Mod, Time, Queue1, Debug, timeout) @@ -850,5 +850,5 @@ format_status(Opt, StatusData) -> {data, [{"Status", SysState}, {"Parent", Parent}, {"Logged events", Log}, - {"Queued messages", queue:to_list(Queue)}]} | + {"Queued messages", priority_queue:to_list(Queue)}]} | Specfic]. diff --git a/src/priority_queue.erl b/src/priority_queue.erl new file mode 100644 index 0000000000..f660c0397d --- /dev/null +++ b/src/priority_queue.erl @@ -0,0 +1,125 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +%% Priority queues have essentially the same interface as ordinary +%% queues, except that a) there is an in/3 that takes a priority, and +%% b) we have only implemented the core API we need. +%% +%% Priorities should be integers - the higher the value the higher the +%% priority - but we don't actually check that. +%% +%% in/2 inserts items with the highest priority. +%% +%% We optimise the case where a priority queue is being used just like +%% an ordinary queue. When that is the case we represent the priority +%% queue as an ordinary queue. We could just call into the 'queue' +%% module for that, but for efficiency we implement the relevant +%% functions directly in here, thus saving on inter-module calls and +%% eliminating a level of boxing. +%% +%% When in/3 is invoked for the first time we change the +%% representation from an ordinary queue to a gb_tree with {Priority, +%% Counter} as the key. Counter is incremented with for every 'in', + +-module(priority_queue). + +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]). + +new() -> + {queue, [], []}. + +is_queue({queue, R, F}) when is_list(R), is_list(F) -> + true; +is_queue({pqueue, Counter, _Tree}) when is_integer(Counter), Counter >= 0 -> + true; +is_queue(_) -> + false. + +is_empty({queue, [], []}) -> + true; +is_empty({queue, In,Out}) when is_list(In), is_list(Out) -> + false; +is_empty({pqueue, _, Tree}) -> + gb_trees:is_empty(Tree). + +len({queue, R, F}) when is_list(R), is_list(F) -> + length(R) + length(F); +len({pqueue, _, Tree}) -> + gb_trees:size(Tree). + +to_list({queue, In, Out}) when is_list(In), is_list(Out) -> + Out ++ lists:reverse(In, []); +to_list({pqueue, _, Tree}) -> + gb_trees:to_list(Tree). + +in(X, {queue, [_] = In, []}) -> + {queue, [X], In}; +in(X, {queue, In, Out}) when is_list(In), is_list(Out) -> + {queue, [X|In], Out}; +in(Item, Other) -> + in(Item, infinity, Other). + +in(Item, Priority, {queue, In, Out}) -> + in(Item, Priority, {pqueue, 0, to_tree(In, Out)}); +in(Item, Priority, {pqueue, Counter, Tree}) -> + {pqueue, Counter + 1, gb_trees:insert({Priority, Counter}, Item, Tree)}. + +out({queue, [], []} = Q) -> + {empty, Q}; +out({queue, [V], []}) -> + {{value, V}, {queue, [], []}}; +out({queue, [Y|In], []}) -> + [V|Out] = lists:reverse(In, []), + {{value, V}, {queue, [Y], Out}}; +out({queue, In, [V]}) when is_list(In) -> + {{value,V}, r2f(In)}; +out({queue, In,[V|Out]}) when is_list(In) -> + {{value, V}, {queue, In, Out}}; +out({pqueue, Counter, Tree}) -> + {_, Item, Tree1} = gb_trees:take_smallest(Tree), + {{value, Item}, case gb_trees:is_empty(Tree1) of + true -> {queue, queue:new()}; + false -> {pqueue, Counter, Tree1} + end}. + +to_tree(In, Out) -> + lists:foldl(fun (V, {C, T}) -> + {C + 1, gb_trees:insert({infinity, C}, V, T)} + end, {0, gb_trees:empty()}, Out ++ lists:reverse(In, [])). + +r2f([]) -> + {queue, [], []}; +r2f([_] = R) -> + {queue, [], R}; +r2f([X,Y]) -> + {queue, [X], [Y]}; +r2f([X,Y|R]) -> + {queue, [X,Y], lists:reverse(R, [])}. |
