summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_access_control.erl7
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_control.erl6
-rw-r--r--src/rabbit_exchange.erl51
-rw-r--r--src/rabbit_misc.erl8
-rw-r--r--src/rabbit_mnesia.erl98
-rw-r--r--src/rabbit_tests.erl36
7 files changed, 140 insertions, 72 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 7d1839bb6e..30bae25e5a 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -172,9 +172,14 @@ check_resource_access(Username,
[] ->
false;
[#user_permission{permission = P}] ->
+ PermRegexp = case element(permission_index(Permission), P) of
+ %% <<"^$">> breaks Emacs' erlang mode
+ <<"">> -> <<$^, $$>>;
+ RE -> RE
+ end,
case regexp:match(
binary_to_list(Name),
- binary_to_list(element(permission_index(Permission), P))) of
+ binary_to_list(PermRegexp)) of
{match, _, _} -> true;
nomatch -> false
end
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index da8225debc..c4db3ace73 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -911,7 +911,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
check_read_permitted(ExchangeName, State),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
fun (_X, Q) ->
- rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
+ try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
+ catch exit:Reason -> {error, Reason}
+ end
end) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
@@ -926,6 +928,8 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
rabbit_misc:rs(QueueName)]);
+ {error, #amqp_error{} = Error} ->
+ rabbit_misc:protocol_error(Error);
ok -> return_ok(State, NoWait, ReturnMethod)
end.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 95a49f8679..6e6ad06cb3 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -160,6 +160,12 @@ action(cluster, Node, ClusterNodeSs, Inform) ->
[Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
+action(force_cluster, Node, ClusterNodeSs, Inform) ->
+ ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
+ Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
+ [Node, ClusterNodes]),
+ rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
+
action(status, Node, [], Inform) ->
Inform("Status of node ~p", [Node]),
case call(Node, {rabbit, status, []}) of
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 822c164d57..d91ebe9ba9 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -64,7 +64,9 @@
-type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' |
'exchange_not_found' |
'exchange_and_queue_not_found')).
--type(inner_fun() :: fun((rabbit_types:exchange(), queue()) -> any())).
+-type(inner_fun() ::
+ fun((rabbit_types:exchange(), queue()) ->
+ rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 ::
@@ -427,23 +429,27 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
- InnerFun(X, Q),
- case mnesia:read({rabbit_route, B}) of
- [] ->
- sync_binding(B,
- X#exchange.durable andalso
- Q#amqqueue.durable,
- fun mnesia:write/3),
- {new, X, B};
- [_R] ->
- {existing, X, B}
+ case InnerFun(X, Q) of
+ ok ->
+ case mnesia:read({rabbit_route, B}) of
+ [] ->
+ ok = sync_binding(B,
+ X#exchange.durable andalso
+ Q#amqqueue.durable,
+ fun mnesia:write/3),
+ {new, X, B};
+ [_R] ->
+ {existing, X, B}
+ end;
+ {error, _} = E ->
+ E
end
end) of
{new, Exchange = #exchange{ type = Type }, Binding} ->
(type_to_module(Type)):add_binding(Exchange, Binding);
{existing, _, _} ->
ok;
- Err = {error, _} ->
+ {error, _} = Err ->
Err
end.
@@ -453,14 +459,23 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
- [] -> {error, binding_not_found};
- _ -> InnerFun(X, Q),
- ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- {maybe_auto_delete(X), B}
+ [] ->
+ {error, binding_not_found};
+ _ ->
+ case InnerFun(X, Q) of
+ ok ->
+ ok =
+ sync_binding(B,
+ X#exchange.durable andalso
+ Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
+ {maybe_auto_delete(X), B};
+ {error, _} = E ->
+ E
+ end
end
end) of
- Err = {error, _} ->
+ {error, _} = Err ->
Err;
{{IsDeleted, X = #exchange{ type = Type }}, B} ->
Module = type_to_module(Type),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index f04df8a042..fcc9fc7e54 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -37,7 +37,7 @@
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, amqp_error/4,
- protocol_error/3, protocol_error/4]).
+ protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1]).
-export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
@@ -95,6 +95,7 @@
(rabbit_framing:amqp_exception(), string(), [any()],
rabbit_framing:amqp_method_name())
-> no_return()).
+-spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> no_return()).
-spec(not_found/1 :: (rabbit_types:r(atom())) -> no_return()).
-spec(get_config/1 ::
(atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')).
@@ -199,7 +200,10 @@ protocol_error(Name, ExplanationFormat, Params) ->
protocol_error(Name, ExplanationFormat, Params, none).
protocol_error(Name, ExplanationFormat, Params, Method) ->
- exit(amqp_error(Name, ExplanationFormat, Params, Method)).
+ protocol_error(amqp_error(Name, ExplanationFormat, Params, Method)).
+
+protocol_error(#amqp_error{} = Error) ->
+ exit(Error).
not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 5c14ba7b16..e2b6927f8c 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -33,8 +33,8 @@
-module(rabbit_mnesia).
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
- cluster/1, reset/0, force_reset/0, is_clustered/0,
- empty_ram_only_tables/0]).
+ cluster/1, force_cluster/1, reset/0, force_reset/0,
+ is_clustered/0, empty_ram_only_tables/0]).
-export([table_names/0]).
@@ -58,6 +58,8 @@
-spec(init/0 :: () -> 'ok').
-spec(is_db_empty/0 :: () -> boolean()).
-spec(cluster/1 :: ([node()]) -> 'ok').
+-spec(force_cluster/1 :: ([node()]) -> 'ok').
+-spec(cluster/2 :: ([node()], boolean()) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
-spec(is_clustered/0 :: () -> boolean()).
@@ -88,7 +90,7 @@ status() ->
init() ->
ok = ensure_mnesia_running(),
ok = ensure_mnesia_dir(),
- ok = init_db(read_cluster_nodes_config()),
+ ok = init_db(read_cluster_nodes_config(), true),
ok = wait_for_tables(),
ok.
@@ -96,16 +98,22 @@ is_db_empty() ->
lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
table_names()).
+cluster(ClusterNodes) ->
+ cluster(ClusterNodes, false).
+force_cluster(ClusterNodes) ->
+ cluster(ClusterNodes, true).
+
%% Alter which disk nodes this node is clustered with. This can be a
%% subset of all the disk nodes in the cluster but can (and should)
%% include the node itself if it is to be a disk rather than a ram
-%% node.
-cluster(ClusterNodes) ->
+%% node. If Force is false, only connections to online nodes are
+%% allowed.
+cluster(ClusterNodes, Force) ->
ok = ensure_mnesia_not_running(),
ok = ensure_mnesia_dir(),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
try
- ok = init_db(ClusterNodes),
+ ok = init_db(ClusterNodes, Force),
ok = wait_for_tables(),
ok = create_cluster_nodes_config(ClusterNodes)
after
@@ -277,38 +285,56 @@ delete_cluster_nodes_config() ->
%% Take a cluster node config and create the right kind of node - a
%% standalone disk node, or disk or ram node connected to the
-%% specified cluster nodes.
-init_db(ClusterNodes) ->
- case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of
- {ok, []} ->
- case mnesia:system_info(use_dir) of
- true ->
- case check_schema_integrity() of
- ok ->
- ok;
- {error, Reason} ->
- %% NB: we cannot use rabbit_log here since
- %% it may not have been started yet
- error_logger:warning_msg(
- "schema integrity check failed: ~p~n"
- "moving database to backup location "
- "and recreating schema from scratch~n",
- [Reason]),
- ok = move_db(),
+%% specified cluster nodes. If Force is false, don't allow
+%% connections to offline nodes.
+init_db(ClusterNodes, Force) ->
+ UClusterNodes = lists:usort(ClusterNodes),
+ ProperClusterNodes = UClusterNodes -- [node()],
+ case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
+ {ok, Nodes} ->
+ case Force of
+ false ->
+ FailedClusterNodes = ProperClusterNodes -- Nodes,
+ case FailedClusterNodes of
+ [] -> ok;
+ _ ->
+ throw({error, {failed_to_cluster_with,
+ FailedClusterNodes,
+ "Mnesia could not connect to some nodes."}})
+ end;
+ _ -> ok
+ end,
+ case Nodes of
+ [] ->
+ case mnesia:system_info(use_dir) of
+ true ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ %% NB: we cannot use rabbit_log here since
+ %% it may not have been started yet
+ error_logger:warning_msg(
+ "schema integrity check failed: ~p~n"
+ "moving database to backup location "
+ "and recreating schema from scratch~n",
+ [Reason]),
+ ok = move_db(),
+ ok = create_schema()
+ end;
+ false ->
ok = create_schema()
end;
- false ->
- ok = create_schema()
- end;
- {ok, [_|_]} ->
- IsDiskNode = ClusterNodes == [] orelse
- lists:member(node(), ClusterNodes),
- ok = wait_for_replicated_tables(),
- ok = create_local_table_copy(schema, disc_copies),
- ok = create_local_table_copies(case IsDiskNode of
- true -> disc;
- false -> ram
- end);
+ [_|_] ->
+ IsDiskNode = ClusterNodes == [] orelse
+ lists:member(node(), ClusterNodes),
+ ok = wait_for_replicated_tables(),
+ ok = create_local_table_copy(schema, disc_copies),
+ ok = create_local_table_copies(case IsDiskNode of
+ true -> disc;
+ false -> ram
+ end)
+ end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
%% nodes together that are currently running standalone or
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f35a899018..ff7df11b3d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -759,19 +759,19 @@ test_cluster_management() ->
ok = control_action(reset, []),
lists:foreach(fun (Arg) ->
- ok = control_action(cluster, Arg),
+ ok = control_action(force_cluster, Arg),
ok
end,
ClusteringSequence),
lists:foreach(fun (Arg) ->
ok = control_action(reset, []),
- ok = control_action(cluster, Arg),
+ ok = control_action(force_cluster, Arg),
ok
end,
ClusteringSequence),
ok = control_action(reset, []),
lists:foreach(fun (Arg) ->
- ok = control_action(cluster, Arg),
+ ok = control_action(force_cluster, Arg),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
ok
@@ -779,7 +779,7 @@ test_cluster_management() ->
ClusteringSequence),
lists:foreach(fun (Arg) ->
ok = control_action(reset, []),
- ok = control_action(cluster, Arg),
+ ok = control_action(force_cluster, Arg),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
ok
@@ -790,13 +790,13 @@ test_cluster_management() ->
ok = control_action(reset, []),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
- ok = control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ ok = control_action(force_cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
%% join a non-existing cluster as a ram node
ok = control_action(reset, []),
- ok = control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ ok = control_action(force_cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
SecondaryNode = rabbit_misc:makenode("hare"),
case net_adm:ping(SecondaryNode) of
@@ -821,18 +821,26 @@ test_cluster_management2(SecondaryNode) ->
%% join cluster as a ram node
ok = control_action(reset, []),
- ok = control_action(cluster, [SecondaryNodeS, "invalid1@invalid"]),
+ ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
%% change cluster config while remaining in same cluster
- ok = control_action(cluster, ["invalid2@invalid", SecondaryNodeS]),
+ ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
%% join non-existing cluster as a ram node
- ok = control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ ok = control_action(force_cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
+ ok = control_action(start_app, []),
+ ok = control_action(stop_app, []),
+
+ %% join empty cluster as a ram node
+ ok = control_action(cluster, []),
+ ok = control_action(start_app, []),
+ ok = control_action(stop_app, []),
+
%% turn ram node into disk node
ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
@@ -840,8 +848,8 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(stop_app, []),
%% convert a disk node into a ram node
- ok = control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ ok = control_action(force_cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
%% turn a disk node into a ram node
ok = control_action(reset, []),