summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/priority_queue.erl2
-rw-r--r--src/rabbit.erl12
-rw-r--r--src/rabbit_amqqueue.erl29
-rw-r--r--src/rabbit_channel.erl13
-rw-r--r--src/rabbit_control.erl13
-rw-r--r--src/rabbit_exchange.erl63
-rw-r--r--src/rabbit_misc.erl22
-rw-r--r--src/rabbit_mnesia.erl13
-rw-r--r--src/rabbit_multi.erl11
-rw-r--r--src/rabbit_persister.erl2
-rw-r--r--src/rabbit_tests.erl6
11 files changed, 125 insertions, 61 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 88ad0c182d..732757c41c 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -111,7 +111,7 @@ in(X, 0, {queue, [_] = In, []}) ->
{queue, [X], In};
in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) ->
{queue, [X|In], Out};
-in(X, Priority, Q = {queue, [], []}) ->
+in(X, Priority, _Q = {queue, [], []}) ->
in(X, Priority, {pqueue, []});
in(X, Priority, Q = {queue, _, _}) ->
in(X, Priority, {pqueue, [{0, Q}]});
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 3a15e6b07c..c0d09547c5 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -216,10 +216,14 @@ print_banner() ->
[Product, Version,
?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
- io:format("Logging to ~p~nSASL logging to ~p~n~n",
- [log_location(kernel), log_location(sasl)]).
-
-
+ Settings = [{"node", node()},
+ {"log", log_location(kernel)},
+ {"sasl log", log_location(sasl)},
+ {"database dir", rabbit_mnesia:dir()}],
+ DescrLen = lists:max([length(K) || {K, _V} <- Settings]),
+ Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
+ lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
+ io:nl().
start_child(Mod) ->
{ok,_} = supervisor:start_child(rabbit_sup,
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 06bb18f5d3..0316788fe1 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -201,9 +201,7 @@ with(Name, F, E) ->
with(Name, F) ->
with(Name, F, fun () -> {error, not_found} end).
with_or_die(Name, F) ->
- with(Name, F, fun () -> rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(Name)])
- end).
+ with(Name, F, fun () -> rabbit_misc:not_found(Name) end).
list(VHostPath) ->
mnesia:dirty_match_object(
@@ -305,28 +303,29 @@ internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> {error, not_found};
- [Q] ->
- ok = delete_queue(Q),
+ [] -> {error, not_found};
+ [_] ->
+ ok = rabbit_exchange:delete_queue_bindings(QueueName),
+ ok = mnesia:delete({rabbit_queue, QueueName}),
ok = mnesia:delete({rabbit_durable_queue, QueueName}),
ok
end
end).
-delete_queue(#amqqueue{name = QueueName}) ->
- ok = rabbit_exchange:delete_bindings_for_queue(QueueName),
- ok = mnesia:delete({rabbit_queue, QueueName}),
- ok.
-
on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
qlc:fold(
- fun (Q, Acc) -> ok = delete_queue(Q), Acc end,
+ fun (QueueName, Acc) ->
+ ok = rabbit_exchange:delete_transient_queue_bindings(
+ QueueName),
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ Acc
+ end,
ok,
- qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
+ qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node]))
end).
pseudo_queue(QueueName, Pid) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index aeb15bd1b9..d14a01bee9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -576,8 +576,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
+ rabbit_misc:not_found(ExchangeName);
{error, in_use} ->
die_precondition_failed(
"~s in use", [rabbit_misc:rs(ExchangeName)]);
@@ -742,12 +741,14 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
- {error, queue_not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
{error, exchange_not_found} ->
+ rabbit_misc:not_found(ExchangeName);
+ {error, queue_not_found} ->
+ rabbit_misc:not_found(QueueName);
+ {error, exchange_and_queue_not_found} ->
rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
+ not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName),
+ rabbit_misc:rs(QueueName)]);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index e6717d689f..6649899ade 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -38,6 +38,19 @@
-define(RPC_TIMEOUT, 30000).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start/0 :: () -> no_return()).
+-spec(stop/0 :: () -> 'ok').
+-spec(action/4 :: (atom(), erlang_node(), [string()],
+ fun ((string(), [any()]) -> 'ok')) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start() ->
FullCommand = init:get_plain_arguments(),
#params{quiet = Quiet, node = Node, command = Command, args = Args} =
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 9b3bbb1851..e0f76d895c 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -40,7 +40,7 @@
route/3]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
--export([delete_bindings_for_queue/1]).
+-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
%% EXTENDED API
@@ -59,8 +59,10 @@
-type(publish_res() :: {'ok', [pid()]} |
not_found() | {'error', 'unroutable' | 'not_delivered'}).
--type(bind_res() :: 'ok' |
- {'error', 'queue_not_found' | 'exchange_not_found'}).
+-type(bind_res() :: 'ok' | {'error',
+ 'queue_not_found' |
+ 'exchange_not_found' |
+ 'exchange_and_queue_not_found'}).
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(),
amqp_table()) -> exchange()).
@@ -86,7 +88,8 @@
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
--spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok').
+-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
+-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
@@ -161,9 +164,7 @@ lookup(Name) ->
lookup_or_die(Name) ->
case lookup(Name) of
{ok, X} -> X;
- {error, not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(Name)])
+ {error, not_found} -> rabbit_misc:not_found(Name)
end.
list(VHostPath) ->
@@ -295,7 +296,7 @@ lookup_qpids(Queues) ->
%% refactored to its own module, especially seeing as unbind will have
%% to be implemented for 0.91 ?
-delete_bindings_for_exchange(ExchangeName) ->
+delete_exchange_bindings(ExchangeName) ->
[begin
ok = mnesia:delete_object(rabbit_reverse_route,
reverse_route(Route), write),
@@ -307,10 +308,16 @@ delete_bindings_for_exchange(ExchangeName) ->
write)],
ok.
-delete_bindings_for_queue(QueueName) ->
+delete_queue_bindings(QueueName) ->
+ delete_queue_bindings(QueueName, fun delete_forward_routes/1).
+
+delete_transient_queue_bindings(QueueName) ->
+ delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
+
+delete_queue_bindings(QueueName, FwdDeleteFun) ->
Exchanges = exchanges_for_queue(QueueName),
[begin
- ok = delete_forward_routes(reverse_route(Route)),
+ ok = FwdDeleteFun(reverse_route(Route)),
ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
end || Route <- mnesia:match_object(
rabbit_reverse_route,
@@ -328,6 +335,9 @@ delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
ok = mnesia:delete_object(rabbit_durable_route, Route, write).
+delete_transient_forward_routes(Route) ->
+ ok = mnesia:delete_object(rabbit_route, Route, write).
+
exchanges_for_queue(QueueName) ->
MatchHead = reverse_route(
#route{binding = #binding{exchange_name = '$1',
@@ -337,16 +347,13 @@ exchanges_for_queue(QueueName) ->
sets:from_list(
mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
-has_bindings(ExchangeName) ->
- MatchHead = #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
+contains(Table, MatchHead) ->
try
- continue(mnesia:select(rabbit_route, [{MatchHead, [], ['$_']}],
- 1, read))
+ continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
catch exit:{aborted, {badarg, _}} ->
%% work around OTP-7025, which was fixed in R12B-1, by
%% falling back on a less efficient method
- case mnesia:match_object(rabbit_route, MatchHead, read) of
+ case mnesia:match_object(Table, MatchHead, read) of
[] -> false;
[_|_] -> true
end
@@ -359,18 +366,20 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
call_with_exchange(Exchange, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun() -> case mnesia:read({rabbit_exchange, Exchange}) of
- [] -> {error, exchange_not_found};
+ [] -> {error, not_found};
[X] -> Fun(X)
end
end).
call_with_exchange_and_queue(Exchange, Queue, Fun) ->
- call_with_exchange(
- Exchange,
- fun(X) -> case mnesia:read({rabbit_queue, Queue}) of
- [] -> {error, queue_not_found};
- [Q] -> Fun(X, Q)
- end
+ rabbit_misc:execute_mnesia_transaction(
+ fun() -> case {mnesia:read({rabbit_exchange, Exchange}),
+ mnesia:read({rabbit_queue, Queue})} of
+ {[X], [Q]} -> Fun(X, Q);
+ {[ ], [_]} -> {error, exchange_not_found};
+ {[_], [ ]} -> {error, queue_not_found};
+ {[ ], [ ]} -> {error, exchange_and_queue_not_found}
+ end
end).
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
@@ -554,13 +563,17 @@ maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
ok.
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- case has_bindings(ExchangeName) of
+ Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
+ %% we need to check for durable routes here too in case a bunch of
+ %% routes to durable queues have been removed temporarily as a
+ %% result of a node failure
+ case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of
false -> unconditional_delete(Exchange);
true -> {error, in_use}
end.
unconditional_delete(#exchange{name = ExchangeName}) ->
- ok = delete_bindings_for_exchange(ExchangeName),
+ ok = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
ok = mnesia:delete({rabbit_exchange, ExchangeName}).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 2f329aa91c..49faba293b 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -36,6 +36,7 @@
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, protocol_error/3, protocol_error/4]).
+-export([not_found/1]).
-export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
-export([r/3, r/2, rs/1]).
@@ -73,6 +74,7 @@
(atom() | amqp_error(), string(), [any()]) -> no_return()).
-spec(protocol_error/4 ::
(atom() | amqp_error(), string(), [any()], atom()) -> no_return()).
+-spec(not_found/1 :: (r(atom())) -> no_return()).
-spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()).
-spec(get_config/2 :: (atom(), A) -> A).
-spec(set_config/2 :: (atom(), any()) -> 'ok').
@@ -107,7 +109,7 @@
-spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}).
-spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
--spec(format_stderr/2 :: (string(), [any()]) -> 'true').
+-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> [B]).
@@ -141,6 +143,8 @@ protocol_error(Error, Explanation, Params, Method) ->
CompleteExplanation = lists:flatten(io_lib:format(Explanation, Params)),
exit({amqp, Error, CompleteExplanation, Method}).
+not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
+
get_config(Key) ->
case dirty_read({rabbit_config, Key}) of
{ok, {rabbit_config, Key, V}} -> {ok, V};
@@ -377,9 +381,19 @@ ensure_parent_dirs_exist(Filename) ->
end.
format_stderr(Fmt, Args) ->
- Port = open_port({fd, 0, 2}, [out]),
- port_command(Port, io_lib:format(Fmt, Args)),
- port_close(Port).
+ case os:type() of
+ {unix, _} ->
+ Port = open_port({fd, 0, 2}, [out]),
+ port_command(Port, io_lib:format(Fmt, Args)),
+ port_close(Port);
+ {win32, _} ->
+ %% stderr on Windows is buffered and I can't figure out a
+ %% way to trigger a fflush(stderr) in Erlang. So rather
+ %% than risk losing output we write to stdout instead,
+ %% which appears to be unbuffered.
+ io:format(Fmt, Args)
+ end,
+ ok.
manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
Iterate(fun (App, Acc) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index d2b2b15c8a..cddcab643e 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -31,7 +31,7 @@
-module(rabbit_mnesia).
--export([ensure_mnesia_dir/0, status/0, init/0, is_db_empty/0,
+-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
cluster/1, reset/0, force_reset/0]).
-export([table_names/0]).
@@ -47,6 +47,7 @@
-ifdef(use_specs).
-spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]).
+-spec(dir/0 :: () -> string()).
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
-spec(is_db_empty/0 :: () -> bool()).
@@ -155,8 +156,10 @@ table_definitions() ->
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
+dir() -> mnesia:system_info(directory).
+
ensure_mnesia_dir() ->
- MnesiaDir = mnesia:system_info(directory) ++ "/",
+ MnesiaDir = dir() ++ "/",
case filelib:ensure_dir(MnesiaDir) of
{error, Reason} ->
throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
@@ -192,7 +195,7 @@ check_schema_integrity() ->
%% it doesn't.
cluster_nodes_config_filename() ->
- mnesia:system_info(directory) ++ "/cluster_nodes.config".
+ dir() ++ "/cluster_nodes.config".
create_cluster_nodes_config(ClusterNodes) ->
FileName = cluster_nodes_config_filename(),
@@ -308,7 +311,7 @@ create_schema() ->
move_db() ->
mnesia:stop(),
- MnesiaDir = filename:dirname(mnesia:system_info(directory) ++ "/"),
+ MnesiaDir = filename:dirname(dir() ++ "/"),
{{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(),
BackupDir = lists:flatten(
io_lib:format("~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w",
@@ -425,7 +428,7 @@ reset(Force) ->
ok = delete_cluster_nodes_config(),
%% remove persistet messages and any other garbage we find
lists:foreach(fun file:delete/1,
- filelib:wildcard(mnesia:system_info(directory) ++ "/*")),
+ filelib:wildcard(dir() ++ "/*")),
ok.
leave_cluster([], _) -> ok;
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 5e8edd53a1..d91975359a 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -36,6 +36,17 @@
-define(RPC_SLEEP, 500).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start/0 :: () -> no_return()).
+-spec(stop/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start() ->
RpcTimeout =
case init:get_argument(maxwait) of
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index f4fa45993a..d0d60ddf3d 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -259,7 +259,7 @@ log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs},
pending_logs = [Message | Logs]}.
base_filename() ->
- mnesia:system_info(directory) ++ "/rabbit_persister.LOG".
+ rabbit_mnesia:dir() ++ "/rabbit_persister.LOG".
take_snapshot(LogHandle, OldFileName, Snapshot) ->
ok = disk_log:sync(LogHandle),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 552e4ed959..597b0a76e6 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -490,7 +490,13 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(stop_app, []),
{error, {no_running_cluster_nodes, _, _}} =
control_action(reset, []),
+
+ %% leave system clustered, with the secondary node as a ram node
ok = control_action(force_reset, []),
+ ok = control_action(start_app, []),
+ ok = control_action(force_reset, SecondaryNode, []),
+ ok = control_action(cluster, SecondaryNode, [NodeS]),
+ ok = control_action(start_app, SecondaryNode, []),
passed.