summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-03-12 17:05:31 +0000
committerMatthias Radestock <matthias@lshift.net>2009-03-12 17:05:31 +0000
commitc61c5144f675d9c23c7ed738d65ec57a04c5da5e (patch)
treefaf5b6e014bcb2be85d71dc521639ab505bbdadc /src
parent9125971827bee48265b84656d6691de24444e06a (diff)
downloadrabbitmq-server-git-c61c5144f675d9c23c7ed738d65ec57a04c5da5e.tar.gz
first cut at extending gen_server2 with priorities
This just changes the representation of the message queue; the API changes are still to come.
Diffstat (limited to 'src')
-rw-r--r--src/gen_server2.erl12
-rw-r--r--src/priority_queue.erl125
2 files changed, 131 insertions, 6 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 11bb66d743..fbaff765d9 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -276,7 +276,7 @@ enter_loop(Mod, Options, State, ServerName, Timeout) ->
Name = get_proc_name(ServerName),
Parent = get_parent(),
Debug = debug_options(Name, Options),
- Queue = queue:new(),
+ Queue = priority_queue:new(),
loop(Parent, Name, State, Mod, Timeout, Queue, Debug).
%%%========================================================================
@@ -294,7 +294,7 @@ init_it(Starter, self, Name, Mod, Args, Options) ->
init_it(Starter, self(), Name, Mod, Args, Options);
init_it(Starter, Parent, Name, Mod, Args, Options) ->
Debug = debug_options(Name, Options),
- Queue = queue:new(),
+ Queue = priority_queue:new(),
case catch Mod:init(Args) of
{ok, State} ->
proc_lib:init_ack(Starter, {ok, self()}),
@@ -326,9 +326,9 @@ init_it(Starter, Parent, Name, Mod, Args, Options) ->
loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
receive
Input -> loop(Parent, Name, State, Mod,
- Time, queue:in(Input, Queue), Debug)
+ Time, priority_queue:in(Input, Queue), Debug)
after 0 ->
- case queue:out(Queue) of
+ case priority_queue:out(Queue) of
{{value, Msg}, Queue1} ->
process_msg(Parent, Name, State, Mod,
Time, Queue1, Debug, Msg);
@@ -336,7 +336,7 @@ loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
receive
Input ->
loop(Parent, Name, State, Mod,
- Time, queue:in(Input, Queue1), Debug)
+ Time, priority_queue:in(Input, Queue1), Debug)
after Time ->
process_msg(Parent, Name, State, Mod,
Time, Queue1, Debug, timeout)
@@ -850,5 +850,5 @@ format_status(Opt, StatusData) ->
{data, [{"Status", SysState},
{"Parent", Parent},
{"Logged events", Log},
- {"Queued messages", queue:to_list(Queue)}]} |
+ {"Queued messages", priority_queue:to_list(Queue)}]} |
Specfic].
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
new file mode 100644
index 0000000000..f660c0397d
--- /dev/null
+++ b/src/priority_queue.erl
@@ -0,0 +1,125 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+%% Priority queues have essentially the same interface as ordinary
+%% queues, except that a) there is an in/3 that takes a priority, and
+%% b) we have only implemented the core API we need.
+%%
+%% Priorities should be integers - the higher the value the higher the
+%% priority - but we don't actually check that.
+%%
+%% in/2 inserts items with the highest priority.
+%%
+%% We optimise the case where a priority queue is being used just like
+%% an ordinary queue. When that is the case we represent the priority
+%% queue as an ordinary queue. We could just call into the 'queue'
+%% module for that, but for efficiency we implement the relevant
+%% functions directly in here, thus saving on inter-module calls and
+%% eliminating a level of boxing.
+%%
+%% When in/3 is invoked for the first time we change the
+%% representation from an ordinary queue to a gb_tree with {Priority,
+%% Counter} as the key. Counter is incremented with for every 'in',
+
+-module(priority_queue).
+
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]).
+
+new() ->
+ {queue, [], []}.
+
+is_queue({queue, R, F}) when is_list(R), is_list(F) ->
+ true;
+is_queue({pqueue, Counter, _Tree}) when is_integer(Counter), Counter >= 0 ->
+ true;
+is_queue(_) ->
+ false.
+
+is_empty({queue, [], []}) ->
+ true;
+is_empty({queue, In,Out}) when is_list(In), is_list(Out) ->
+ false;
+is_empty({pqueue, _, Tree}) ->
+ gb_trees:is_empty(Tree).
+
+len({queue, R, F}) when is_list(R), is_list(F) ->
+ length(R) + length(F);
+len({pqueue, _, Tree}) ->
+ gb_trees:size(Tree).
+
+to_list({queue, In, Out}) when is_list(In), is_list(Out) ->
+ Out ++ lists:reverse(In, []);
+to_list({pqueue, _, Tree}) ->
+ gb_trees:to_list(Tree).
+
+in(X, {queue, [_] = In, []}) ->
+ {queue, [X], In};
+in(X, {queue, In, Out}) when is_list(In), is_list(Out) ->
+ {queue, [X|In], Out};
+in(Item, Other) ->
+ in(Item, infinity, Other).
+
+in(Item, Priority, {queue, In, Out}) ->
+ in(Item, Priority, {pqueue, 0, to_tree(In, Out)});
+in(Item, Priority, {pqueue, Counter, Tree}) ->
+ {pqueue, Counter + 1, gb_trees:insert({Priority, Counter}, Item, Tree)}.
+
+out({queue, [], []} = Q) ->
+ {empty, Q};
+out({queue, [V], []}) ->
+ {{value, V}, {queue, [], []}};
+out({queue, [Y|In], []}) ->
+ [V|Out] = lists:reverse(In, []),
+ {{value, V}, {queue, [Y], Out}};
+out({queue, In, [V]}) when is_list(In) ->
+ {{value,V}, r2f(In)};
+out({queue, In,[V|Out]}) when is_list(In) ->
+ {{value, V}, {queue, In, Out}};
+out({pqueue, Counter, Tree}) ->
+ {_, Item, Tree1} = gb_trees:take_smallest(Tree),
+ {{value, Item}, case gb_trees:is_empty(Tree1) of
+ true -> {queue, queue:new()};
+ false -> {pqueue, Counter, Tree1}
+ end}.
+
+to_tree(In, Out) ->
+ lists:foldl(fun (V, {C, T}) ->
+ {C + 1, gb_trees:insert({infinity, C}, V, T)}
+ end, {0, gb_trees:empty()}, Out ++ lists:reverse(In, [])).
+
+r2f([]) ->
+ {queue, [], []};
+r2f([_] = R) ->
+ {queue, [], R};
+r2f([X,Y]) ->
+ {queue, [X], [Y]};
+r2f([X,Y|R]) ->
+ {queue, [X,Y], lists:reverse(R, [])}.