summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gen_server2.erl62
-rw-r--r--src/priority_queue.erl153
-rw-r--r--src/rabbit_access_control.erl2
-rw-r--r--src/rabbit_alarm.erl3
-rw-r--r--src/rabbit_amqqueue.erl92
-rw-r--r--src/rabbit_channel.erl8
-rw-r--r--src/rabbit_control.erl13
-rw-r--r--src/rabbit_exchange.erl88
-rw-r--r--src/rabbit_guid.erl3
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_misc.erl35
-rw-r--r--src/rabbit_multi.erl11
-rw-r--r--src/rabbit_persister.erl54
-rw-r--r--src/rabbit_reader.erl4
-rw-r--r--src/rabbit_router.erl3
-rw-r--r--src/rabbit_tests.erl63
16 files changed, 471 insertions, 125 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 11bb66d743..ba8becfca9 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -16,6 +16,11 @@
%% The original code could reorder messages when communicating with a
%% process on a remote node that was not currently connected.
%%
+%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3
+%% allow callers to attach priorities to requests. Requests with
+%% higher priorities are processed before requests with lower
+%% priorities. The default priority is 0.
+%%
%% All modifications are (C) 2009 LShift Ltd.
%% ``The contents of this file are subject to the Erlang Public License,
@@ -107,8 +112,8 @@
%% API
-export([start/3, start/4,
start_link/3, start_link/4,
- call/2, call/3,
- cast/2, reply/2,
+ call/2, call/3, pcall/3, pcall/4,
+ cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
enter_loop/3, enter_loop/4, enter_loop/5]).
@@ -188,6 +193,22 @@ call(Name, Request, Timeout) ->
exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
end.
+pcall(Name, Priority, Request) ->
+ case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}})
+ end.
+
+pcall(Name, Priority, Request, Timeout) ->
+ case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}})
+ end.
+
%% -----------------------------------------------------------------
%% Make a cast to a generic server.
%% -----------------------------------------------------------------
@@ -207,6 +228,22 @@ do_cast(Dest, Request) ->
cast_msg(Request) -> {'$gen_cast',Request}.
+pcast({global,Name}, Priority, Request) ->
+ catch global:send(Name, cast_msg(Priority, Request)),
+ ok;
+pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) ->
+ do_cast(Dest, Priority, Request);
+pcast(Dest, Priority, Request) when is_atom(Dest) ->
+ do_cast(Dest, Priority, Request);
+pcast(Dest, Priority, Request) when is_pid(Dest) ->
+ do_cast(Dest, Priority, Request).
+
+do_cast(Dest, Priority, Request) ->
+ do_send(Dest, cast_msg(Priority, Request)),
+ ok.
+
+cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}.
+
%% -----------------------------------------------------------------
%% Send a reply to the client.
%% -----------------------------------------------------------------
@@ -276,7 +313,7 @@ enter_loop(Mod, Options, State, ServerName, Timeout) ->
Name = get_proc_name(ServerName),
Parent = get_parent(),
Debug = debug_options(Name, Options),
- Queue = queue:new(),
+ Queue = priority_queue:new(),
loop(Parent, Name, State, Mod, Timeout, Queue, Debug).
%%%========================================================================
@@ -294,7 +331,7 @@ init_it(Starter, self, Name, Mod, Args, Options) ->
init_it(Starter, self(), Name, Mod, Args, Options);
init_it(Starter, Parent, Name, Mod, Args, Options) ->
Debug = debug_options(Name, Options),
- Queue = queue:new(),
+ Queue = priority_queue:new(),
case catch Mod:init(Args) of
{ok, State} ->
proc_lib:init_ack(Starter, {ok, self()}),
@@ -326,9 +363,9 @@ init_it(Starter, Parent, Name, Mod, Args, Options) ->
loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
receive
Input -> loop(Parent, Name, State, Mod,
- Time, queue:in(Input, Queue), Debug)
+ Time, in(Input, Queue), Debug)
after 0 ->
- case queue:out(Queue) of
+ case priority_queue:out(Queue) of
{{value, Msg}, Queue1} ->
process_msg(Parent, Name, State, Mod,
Time, Queue1, Debug, Msg);
@@ -336,14 +373,21 @@ loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
receive
Input ->
loop(Parent, Name, State, Mod,
- Time, queue:in(Input, Queue1), Debug)
+ Time, in(Input, Queue1), Debug)
after Time ->
process_msg(Parent, Name, State, Mod,
Time, Queue1, Debug, timeout)
end
end
end.
-
+
+in({'$gen_pcast', {Priority, Msg}}, Queue) ->
+ priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
+in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
+ priority_queue:in({'$gen_call', From, Msg}, Priority, Queue);
+in(Input, Queue) ->
+ priority_queue:in(Input, Queue).
+
process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) ->
case Msg of
{system, From, Req} ->
@@ -850,5 +894,5 @@ format_status(Opt, StatusData) ->
{data, [{"Status", SysState},
{"Parent", Parent},
{"Logged events", Log},
- {"Queued messages", queue:to_list(Queue)}]} |
+ {"Queued messages", priority_queue:to_list(Queue)}]} |
Specfic].
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
new file mode 100644
index 0000000000..732757c41c
--- /dev/null
+++ b/src/priority_queue.erl
@@ -0,0 +1,153 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+%% Priority queues have essentially the same interface as ordinary
+%% queues, except that a) there is an in/3 that takes a priority, and
+%% b) we have only implemented the core API we need.
+%%
+%% Priorities should be integers - the higher the value the higher the
+%% priority - but we don't actually check that.
+%%
+%% in/2 inserts items with priority 0.
+%%
+%% We optimise the case where a priority queue is being used just like
+%% an ordinary queue. When that is the case we represent the priority
+%% queue as an ordinary queue. We could just call into the 'queue'
+%% module for that, but for efficiency we implement the relevant
+%% functions directly in here, thus saving on inter-module calls and
+%% eliminating a level of boxing.
+%%
+%% When the queue contains items with non-zero priorities, it is
+%% represented as a sorted kv list with the inverted Priority as the
+%% key and an ordinary queue as the value. Here again we use our own
+%% ordinary queue implemention for efficiency, often making recursive
+%% calls into the same function knowing that ordinary queues represent
+%% a base case.
+
+
+-module(priority_queue).
+
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(priority() :: integer()).
+-type(squeue() :: {queue, [any()], [any()]}).
+-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
+
+-spec(new/0 :: () -> pqueue()).
+-spec(is_queue/1 :: (any()) -> bool()).
+-spec(is_empty/1 :: (pqueue()) -> bool()).
+-spec(len/1 :: (pqueue()) -> non_neg_integer()).
+-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
+-spec(in/2 :: (any(), pqueue()) -> pqueue()).
+-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
+-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+new() ->
+ {queue, [], []}.
+
+is_queue({queue, R, F}) when is_list(R), is_list(F) ->
+ true;
+is_queue({pqueue, Queues}) when is_list(Queues) ->
+ lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end,
+ Queues);
+is_queue(_) ->
+ false.
+
+is_empty({queue, [], []}) ->
+ true;
+is_empty(_) ->
+ false.
+
+len({queue, R, F}) when is_list(R), is_list(F) ->
+ length(R) + length(F);
+len({pqueue, Queues}) ->
+ lists:sum([len(Q) || {_, Q} <- Queues]).
+
+to_list({queue, In, Out}) when is_list(In), is_list(Out) ->
+ [{0, V} || V <- Out ++ lists:reverse(In, [])];
+to_list({pqueue, Queues}) ->
+ [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)].
+
+in(Item, Q) ->
+ in(Item, 0, Q).
+
+in(X, 0, {queue, [_] = In, []}) ->
+ {queue, [X], In};
+in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) ->
+ {queue, [X|In], Out};
+in(X, Priority, _Q = {queue, [], []}) ->
+ in(X, Priority, {pqueue, []});
+in(X, Priority, Q = {queue, _, _}) ->
+ in(X, Priority, {pqueue, [{0, Q}]});
+in(X, Priority, {pqueue, Queues}) ->
+ P = -Priority,
+ {pqueue, case lists:keysearch(P, 1, Queues) of
+ {value, {_, Q}} ->
+ lists:keyreplace(P, 1, Queues, {P, in(X, Q)});
+ false ->
+ lists:keysort(1, [{P, {queue, [X], []}} | Queues])
+ end}.
+
+out({queue, [], []} = Q) ->
+ {empty, Q};
+out({queue, [V], []}) ->
+ {{value, V}, {queue, [], []}};
+out({queue, [Y|In], []}) ->
+ [V|Out] = lists:reverse(In, []),
+ {{value, V}, {queue, [Y], Out}};
+out({queue, In, [V]}) when is_list(In) ->
+ {{value,V}, r2f(In)};
+out({queue, In,[V|Out]}) when is_list(In) ->
+ {{value, V}, {queue, In, Out}};
+out({pqueue, [{P, Q} | Queues]}) ->
+ {R, Q1} = out(Q),
+ NewQ = case is_empty(Q1) of
+ true -> case Queues of
+ [] -> {queue, [], []};
+ [{0, OnlyQ}] -> OnlyQ;
+ [_|_] -> {pqueue, Queues}
+ end;
+ false -> {pqueue, [{P, Q1} | Queues]}
+ end,
+ {R, NewQ}.
+
+r2f([]) -> {queue, [], []};
+r2f([_] = R) -> {queue, [], R};
+r2f([X,Y]) -> {queue, [X], [Y]};
+r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}.
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index da0ab9cf7a..54348d9a1c 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -192,7 +192,7 @@ delete_user(Username) ->
fun () ->
ok = mnesia:delete({rabbit_user, Username}),
[ok = mnesia:delete_object(
- rabbit_user_permissions, R, write) ||
+ rabbit_user_permission, R, write) ||
R <- mnesia:match_object(
rabbit_user_permission,
#user_permission{user_vhost = #user_vhost{
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 875624ba55..21999f16c3 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -78,7 +78,8 @@ stop() ->
register(Pid, HighMemMFA) ->
ok = gen_event:call(alarm_handler, ?MODULE,
- {register, Pid, HighMemMFA}).
+ {register, Pid, HighMemMFA},
+ infinity).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3018582f94..eb076e94d6 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -124,19 +124,32 @@ recover() ->
recover_durable_queues() ->
Node = node(),
- %% TODO: use dirty ops instead
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ lists:foreach(
+ fun (RecoveredQ) ->
+ Q = start_queue_process(RecoveredQ),
+ %% We need to catch the case where a client connected to
+ %% another node has deleted the queue (and possibly
+ %% re-created it).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(
+ rabbit_durable_queue, RecoveredQ, read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
+ end) of
+ true -> ok;
+ false -> exit(Q#amqqueue.pid, shutdown)
+ end
+ end,
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
<- mnesia:table(rabbit_durable_queue),
- node(Pid) == Node]))
- end),
- Queues = lists:map(fun start_queue_process/1, R),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun store_queue/1, Queues),
- ok
- end).
+ node(Pid) == Node]))
+ end)),
+ ok.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -200,10 +213,10 @@ list(VHostPath) ->
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- gen_server2:call(QPid, info).
+ gen_server2:pcall(QPid, 9, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case gen_server2:call(QPid, {info, Items}) of
+ case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -212,20 +225,20 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
-stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat).
+stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
stat_all() ->
lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- gen_server2:call(QPid, {delete, IfUnused, IfEmpty}).
+ gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity).
-purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge).
+purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
deliver(_IsMandatory, true, Txn, Message, QPid) ->
- gen_server2:call(QPid, {deliver_immediately, Txn, Message});
+ gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity);
deliver(true, _IsImmediate, Txn, Message, QPid) ->
- gen_server2:call(QPid, {deliver, Txn, Message}),
+ gen_server2:call(QPid, {deliver, Txn, Message}, infinity),
true;
deliver(false, _IsImmediate, Txn, Message, QPid) ->
gen_server2:cast(QPid, {deliver, Txn, Message}),
@@ -241,10 +254,9 @@ ack(QPid, Txn, MsgIds, ChPid) ->
gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn) ->
- Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, Timeout) end,
+ fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end,
QPids).
rollback_all(QPids, Txn) ->
@@ -254,12 +266,11 @@ rollback_all(QPids, Txn) ->
QPids).
notify_down_all(QPids, ChPid) ->
- Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
%% we don't care if the queue process has terminated in the
%% meantime
fun (_) -> ok end,
- fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, Timeout) end,
+ fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
QPids).
limit_all(QPids, ChPid, LimiterPid) ->
@@ -269,18 +280,20 @@ limit_all(QPids, ChPid, LimiterPid) ->
QPids).
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- gen_server2:call(QPid, {claim_queue, ReaderPid}).
+ gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- gen_server2:call(QPid, {basic_get, ChPid, NoAck}).
+ gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
- LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
+ infinity).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+ ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
+ infinity).
notify_sent(QPid, ChPid) ->
gen_server2:cast(QPid, {notify_sent, ChPid}).
@@ -292,28 +305,29 @@ internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> {error, not_found};
- [Q] ->
- ok = delete_queue(Q),
+ [] -> {error, not_found};
+ [_] ->
+ ok = rabbit_exchange:delete_queue_bindings(QueueName),
+ ok = mnesia:delete({rabbit_queue, QueueName}),
ok = mnesia:delete({rabbit_durable_queue, QueueName}),
ok
end
end).
-delete_queue(#amqqueue{name = QueueName}) ->
- ok = rabbit_exchange:delete_bindings_for_queue(QueueName),
- ok = mnesia:delete({rabbit_queue, QueueName}),
- ok.
-
on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
qlc:fold(
- fun (Q, Acc) -> ok = delete_queue(Q), Acc end,
+ fun (QueueName, Acc) ->
+ ok = rabbit_exchange:delete_transient_queue_bindings(
+ QueueName),
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ Acc
+ end,
ok,
- qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
+ qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node]))
end).
pseudo_queue(QueueName, Pid) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7574cd673a..b2716ec478 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -745,12 +745,16 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
+ {error, exchange_not_found} ->
+ rabbit_misc:protocol_error(
+ not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
{error, queue_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
- {error, exchange_not_found} ->
+ {error, exchange_and_queue_not_found} ->
rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
+ not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName),
+ rabbit_misc:rs(QueueName)]);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index e6717d689f..6649899ade 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -38,6 +38,19 @@
-define(RPC_TIMEOUT, 30000).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start/0 :: () -> no_return()).
+-spec(stop/0 :: () -> 'ok').
+-spec(action/4 :: (atom(), erlang_node(), [string()],
+ fun ((string(), [any()]) -> 'ok')) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start() ->
FullCommand = init:get_plain_arguments(),
#params{quiet = Quiet, node = Node, command = Command, args = Args} =
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 19efd9fc22..fc89cfca51 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -40,7 +40,7 @@
route/3]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
--export([delete_bindings_for_queue/1]).
+-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
%% EXTENDED API
@@ -59,8 +59,10 @@
-type(publish_res() :: {'ok', [pid()]} |
not_found() | {'error', 'unroutable' | 'not_delivered'}).
--type(bind_res() :: 'ok' |
- {'error', 'queue_not_found' | 'exchange_not_found'}).
+-type(bind_res() :: 'ok' | {'error',
+ 'queue_not_found' |
+ 'exchange_not_found' |
+ 'exchange_and_queue_not_found'}).
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(),
amqp_table()) -> exchange()).
@@ -86,7 +88,8 @@
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
--spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok').
+-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
+-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
@@ -103,24 +106,17 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
recover() ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- mnesia:foldl(
- fun (Exchange, Acc) ->
- ok = mnesia:write(rabbit_exchange, Exchange, write),
- Acc
- end, ok, rabbit_durable_exchange),
- mnesia:foldl(
- fun (Route, Acc) ->
- {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(rabbit_route,
- Route, write),
- ok = mnesia:write(rabbit_reverse_route,
- ReverseRoute, write),
- Acc
- end, ok, rabbit_durable_route),
- ok
- end).
+ ok = rabbit_misc:table_foreach(
+ fun(Exchange) -> ok = mnesia:write(rabbit_exchange,
+ Exchange, write)
+ end, rabbit_durable_exchange),
+ ok = rabbit_misc:table_foreach(
+ fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(rabbit_route,
+ Route, write),
+ ok = mnesia:write(rabbit_reverse_route,
+ ReverseRoute, write)
+ end, rabbit_durable_route).
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -300,7 +296,7 @@ lookup_qpids(Queues) ->
%% refactored to its own module, especially seeing as unbind will have
%% to be implemented for 0.91 ?
-delete_bindings_for_exchange(ExchangeName) ->
+delete_exchange_bindings(ExchangeName) ->
[begin
ok = mnesia:delete_object(rabbit_reverse_route,
reverse_route(Route), write),
@@ -312,10 +308,16 @@ delete_bindings_for_exchange(ExchangeName) ->
write)],
ok.
-delete_bindings_for_queue(QueueName) ->
+delete_queue_bindings(QueueName) ->
+ delete_queue_bindings(QueueName, fun delete_forward_routes/1).
+
+delete_transient_queue_bindings(QueueName) ->
+ delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
+
+delete_queue_bindings(QueueName, FwdDeleteFun) ->
Exchanges = exchanges_for_queue(QueueName),
[begin
- ok = delete_forward_routes(reverse_route(Route)),
+ ok = FwdDeleteFun(reverse_route(Route)),
ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
end || Route <- mnesia:match_object(
rabbit_reverse_route,
@@ -333,6 +335,9 @@ delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
ok = mnesia:delete_object(rabbit_durable_route, Route, write).
+delete_transient_forward_routes(Route) ->
+ ok = mnesia:delete_object(rabbit_route, Route, write).
+
exchanges_for_queue(QueueName) ->
MatchHead = reverse_route(
#route{binding = #binding{exchange_name = '$1',
@@ -342,16 +347,13 @@ exchanges_for_queue(QueueName) ->
sets:from_list(
mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
-has_bindings(ExchangeName) ->
- MatchHead = #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
+contains(Table, MatchHead) ->
try
- continue(mnesia:select(rabbit_route, [{MatchHead, [], ['$_']}],
- 1, read))
+ continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
catch exit:{aborted, {badarg, _}} ->
%% work around OTP-7025, which was fixed in R12B-1, by
%% falling back on a less efficient method
- case mnesia:match_object(rabbit_route, MatchHead, read) of
+ case mnesia:match_object(Table, MatchHead, read) of
[] -> false;
[_|_] -> true
end
@@ -364,18 +366,20 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
call_with_exchange(Exchange, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun() -> case mnesia:read({rabbit_exchange, Exchange}) of
- [] -> {error, exchange_not_found};
+ [] -> {error, not_found};
[X] -> Fun(X)
end
end).
call_with_exchange_and_queue(Exchange, Queue, Fun) ->
- call_with_exchange(
- Exchange,
- fun(X) -> case mnesia:read({rabbit_queue, Queue}) of
- [] -> {error, queue_not_found};
- [Q] -> Fun(X, Q)
- end
+ rabbit_misc:execute_mnesia_transaction(
+ fun() -> case {mnesia:read({rabbit_exchange, Exchange}),
+ mnesia:read({rabbit_queue, Queue})} of
+ {[X], [Q]} -> Fun(X, Q);
+ {[ ], [_]} -> {error, exchange_not_found};
+ {[_], [ ]} -> {error, queue_not_found};
+ {[ ], [ ]} -> {error, exchange_and_queue_not_found}
+ end
end).
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
@@ -559,13 +563,17 @@ maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
ok.
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- case has_bindings(ExchangeName) of
+ Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
+ %% we need to check for durable routes here too in case a bunch of
+ %% routes to durable queues have been removed temporarily as a
+ %% result of a node failure
+ case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of
false -> unconditional_delete(Exchange);
true -> {error, in_use}
end.
unconditional_delete(#exchange{name = ExchangeName}) ->
- ok = delete_bindings_for_exchange(ExchangeName),
+ ok = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
ok = mnesia:delete({rabbit_exchange, ExchangeName}).
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 51c1665bbb..2be005034e 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -82,7 +82,8 @@ guid() ->
%% and time. We combine that with a process-local counter to give
%% us a GUID that is monotonically increasing per process.
G = case get(guid) of
- undefined -> {{gen_server:call(?SERVER, serial), self()}, 0};
+ undefined -> {{gen_server:call(?SERVER, serial, infinity), self()},
+ 0};
{S, I} -> {S, I+1}
end,
put(guid, G),
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 532be26d8e..3f9b6ebb9b 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -90,7 +90,7 @@ can_send(undefined, _QPid) ->
can_send(LimiterPid, QPid) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () -> gen_server2:call(LimiterPid, {can_send, QPid}) end).
+ fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end).
%% Let the limiter know that the channel has received some acks from a
%% consumer
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 5d176f8fac..eced0b3cbe 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,6 +46,7 @@
-export([ensure_ok/2]).
-export([localnode/1, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
+-export([table_foreach/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
@@ -98,13 +99,14 @@
-spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
+-spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok').
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
-spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}).
-spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
--spec(format_stderr/2 :: (string(), [any()]) -> 'true').
+-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
@@ -298,6 +300,21 @@ map_in_order(F, L) ->
lists:reverse(
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
+%% For each entry in a table, execute a function in a transaction.
+%% This is often far more efficient than wrapping a tx around the lot.
+%%
+%% We ignore entries that have been modified or removed.
+table_foreach(F, TableName) ->
+ lists:foreach(
+ fun (E) -> execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(TableName, E, read) of
+ [] -> ok;
+ _ -> F(E)
+ end
+ end)
+ end, dirty_read_all(TableName)),
+ ok.
+
dirty_read_all(TableName) ->
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).
@@ -358,9 +375,19 @@ ensure_parent_dirs_exist(Filename) ->
end.
format_stderr(Fmt, Args) ->
- Port = open_port({fd, 0, 2}, [out]),
- port_command(Port, io_lib:format(Fmt, Args)),
- port_close(Port).
+ case os:type() of
+ {unix, _} ->
+ Port = open_port({fd, 0, 2}, [out]),
+ port_command(Port, io_lib:format(Fmt, Args)),
+ port_close(Port);
+ {win32, _} ->
+ %% stderr on Windows is buffered and I can't figure out a
+ %% way to trigger a fflush(stderr) in Erlang. So rather
+ %% than risk losing output we write to stdout instead,
+ %% which appears to be unbuffered.
+ io:format(Fmt, Args)
+ end,
+ ok.
manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
Iterate(fun (App, Acc) ->
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 5e8edd53a1..d91975359a 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -36,6 +36,17 @@
-define(RPC_SLEEP, 500).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start/0 :: () -> no_return()).
+-spec(stop/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start() ->
RpcTimeout =
case init:get_argument(maxwait) of
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 94033a4f3d..f4fa45993a 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -49,6 +49,8 @@
-define(LOG_BUNDLE_DELAY, 5).
-define(COMPLETE_BUNDLE_DELAY, 2).
+-define(HIBERNATE_AFTER, 10000).
+
-define(MAX_WRAP_ENTRIES, 500).
-define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}).
@@ -93,7 +95,7 @@ start_link() ->
transaction(MessageList) ->
?LOGDEBUG("transaction ~p~n", [MessageList]),
TxnKey = rabbit_guid:guid(),
- gen_server:call(?SERVER, {transaction, TxnKey, MessageList}).
+ gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity).
extend_transaction(TxnKey, MessageList) ->
?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]),
@@ -105,17 +107,17 @@ dirty_work(MessageList) ->
commit_transaction(TxnKey) ->
?LOGDEBUG("commit_transaction ~p~n", [TxnKey]),
- gen_server:call(?SERVER, {commit_transaction, TxnKey}).
+ gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity).
rollback_transaction(TxnKey) ->
?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]),
gen_server:cast(?SERVER, {rollback_transaction, TxnKey}).
force_snapshot() ->
- gen_server:call(?SERVER, force_snapshot).
+ gen_server:call(?SERVER, force_snapshot, infinity).
serial() ->
- gen_server:call(?SERVER, serial).
+ gen_server:call(?SERVER, serial, infinity).
%%--------------------------------------------------------------------
@@ -164,10 +166,8 @@ handle_call({transaction, Key, MessageList}, From, State) ->
do_noreply(internal_commit(From, Key, NewState));
handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
-handle_call(force_snapshot, _From, State = #pstate{log_handle = LH,
- snapshot = Snapshot}) ->
- ok = take_snapshot(LH, Snapshot),
- do_reply(ok, State);
+handle_call(force_snapshot, _From, State) ->
+ do_reply(ok, flush(true, State));
handle_call(serial, _From,
State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
do_reply(Serial, State);
@@ -183,8 +183,13 @@ handle_cast({extend_transaction, TxnKey, MessageList}, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
+handle_info(timeout, State = #pstate{deadline = infinity}) ->
+ State1 = flush(true, State),
+ %% TODO: Once we drop support for R11B-5, we can change this to
+ %% {noreply, State1, hibernate};
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]);
handle_info(timeout, State) ->
- {noreply, flush(State)};
+ do_noreply(flush(State));
handle_info(_Info, State) ->
{noreply, State}.
@@ -275,12 +280,13 @@ take_snapshot_and_save_old(LogHandle, Snapshot) ->
rabbit_log:info("Saving persister log in ~p~n", [OldFileName]),
ok = take_snapshot(LogHandle, OldFileName, Snapshot).
-maybe_take_snapshot(State = #pstate{entry_count = EntryCount, log_handle = LH,
- snapshot = Snapshot})
- when EntryCount >= ?MAX_WRAP_ENTRIES ->
+maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount,
+ log_handle = LH,
+ snapshot = Snapshot})
+ when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES ->
ok = take_snapshot(LH, Snapshot),
State#pstate{entry_count = 0};
-maybe_take_snapshot(State) ->
+maybe_take_snapshot(_Force, State) ->
State.
later_ms(DeltaMilliSec) ->
@@ -298,7 +304,7 @@ compute_deadline(_TimerDelay, ExistingDeadline) ->
ExistingDeadline.
compute_timeout(infinity) ->
- infinity;
+ ?HIBERNATE_AFTER;
compute_timeout(Deadline) ->
DeltaMilliSec = time_diff(Deadline, now()) * 1000.0,
if
@@ -314,18 +320,18 @@ do_noreply(State = #pstate{deadline = Deadline}) ->
do_reply(Reply, State = #pstate{deadline = Deadline}) ->
{reply, Reply, State, compute_timeout(Deadline)}.
-flush(State = #pstate{pending_logs = PendingLogs,
- pending_replies = Waiting,
- log_handle = LogHandle}) ->
- State1 = if
- PendingLogs /= [] ->
+flush(State) -> flush(false, State).
+
+flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
+ pending_replies = Waiting,
+ log_handle = LogHandle}) ->
+ State1 = if PendingLogs /= [] ->
disk_log:alog(LogHandle, lists:reverse(PendingLogs)),
- maybe_take_snapshot(
- State#pstate{
- entry_count = State#pstate.entry_count + 1});
- true ->
+ State#pstate{entry_count = State#pstate.entry_count + 1};
+ true ->
State
end,
+ State2 = maybe_take_snapshot(ForceSnapshot, State1),
if Waiting /= [] ->
ok = disk_log:sync(LogHandle),
lists:foreach(fun (From) -> gen_server:reply(From, ok) end,
@@ -333,7 +339,7 @@ flush(State = #pstate{pending_logs = PendingLogs,
true ->
ok
end,
- State1#pstate{deadline = infinity,
+ State2#pstate{deadline = infinity,
pending_logs = [],
pending_replies = []}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index dbb9358314..ba6d6e6a42 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -161,10 +161,10 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
info(Pid) ->
- gen_server:call(Pid, info).
+ gen_server:call(Pid, info, infinity).
info(Pid, Items) ->
- case gen_server:call(Pid, {info, Items}) of
+ case gen_server:call(Pid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index ff42ea0460..0b06a063a7 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -112,7 +112,8 @@ deliver_per_node(NodeQPids, Mandatory, Immediate,
fun ({Node, QPids}) ->
try gen_server2:call(
{?SERVER, Node},
- {deliver, QPids, Mandatory, Immediate, Txn, Message})
+ {deliver, QPids, Mandatory, Immediate, Txn, Message},
+ infinity)
catch
_Class:_Reason ->
%% TODO: figure out what to log (and do!) here
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 6312e8e364..8f0a3a8973 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -45,6 +45,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
all_tests() ->
+ passed = test_priority_queue(),
passed = test_parsing(),
passed = test_topic_matching(),
passed = test_log_management(),
@@ -55,6 +56,62 @@ all_tests() ->
passed = test_server_status(),
passed.
+test_priority_queue() ->
+
+ false = priority_queue:is_queue(not_a_queue),
+
+ %% empty Q
+ Q = priority_queue:new(),
+ {true, true, 0, [], []} = test_priority_queue(Q),
+
+ %% 1-4 element no-priority Q
+ true = lists:all(fun (X) -> X =:= passed end,
+ lists:map(fun test_simple_n_element_queue/1,
+ lists:seq(1, 4))),
+
+ %% 1-element priority Q
+ Q1 = priority_queue:in(foo, 1, priority_queue:new()),
+ {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1),
+
+ %% 2-element same-priority Q
+ Q2 = priority_queue:in(bar, 1, Q1),
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
+ test_priority_queue(Q2),
+
+ %% 2-element different-priority Q
+ Q3 = priority_queue:in(bar, 2, Q1),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q3),
+
+ %% 1-element negative priority Q
+ Q4 = priority_queue:in(foo, -1, priority_queue:new()),
+ {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4),
+
+ passed.
+
+priority_queue_in_all(Q, L) ->
+ lists:foldl(fun (X, Acc) -> priority_queue:in(X, Acc) end, Q, L).
+
+priority_queue_out_all(Q) ->
+ case priority_queue:out(Q) of
+ {empty, _} -> [];
+ {{value, V}, Q1} -> [V | priority_queue_out_all(Q1)]
+ end.
+
+test_priority_queue(Q) ->
+ {priority_queue:is_queue(Q),
+ priority_queue:is_empty(Q),
+ priority_queue:len(Q),
+ priority_queue:to_list(Q),
+ priority_queue_out_all(Q)}.
+
+test_simple_n_element_queue(N) ->
+ Items = lists:seq(1, N),
+ Q = priority_queue_in_all(priority_queue:new(), Items),
+ ToListRes = [{0, X} || X <- Items],
+ {true, false, N, ToListRes, Items} = test_priority_queue(Q),
+ passed.
+
test_parsing() ->
passed = test_content_properties(),
passed.
@@ -430,7 +487,13 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(stop_app, []),
{error, {no_running_cluster_nodes, _, _}} =
control_action(reset, []),
+
+ %% leave system clustered, with the secondary node as a ram node
ok = control_action(force_reset, []),
+ ok = control_action(start_app, []),
+ ok = control_action(force_reset, SecondaryNode, []),
+ ok = control_action(cluster, SecondaryNode, [NodeS]),
+ ok = control_action(start_app, SecondaryNode, []),
passed.