diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_access_control.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 98 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 36 |
7 files changed, 140 insertions, 72 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 7d1839bb6e..30bae25e5a 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -172,9 +172,14 @@ check_resource_access(Username, [] -> false; [#user_permission{permission = P}] -> + PermRegexp = case element(permission_index(Permission), P) of + %% <<"^$">> breaks Emacs' erlang mode + <<"">> -> <<$^, $$>>; + RE -> RE + end, case regexp:match( binary_to_list(Name), - binary_to_list(element(permission_index(Permission), P))) of + binary_to_list(PermRegexp)) of {match, _, _} -> true; nomatch -> false end diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index da8225debc..c4db3ace73 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -911,7 +911,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, fun (_X, Q) -> - rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) + try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) + catch exit:Reason -> {error, Reason} + end end) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); @@ -926,6 +928,8 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, not_found, "no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), rabbit_misc:rs(QueueName)]); + {error, #amqp_error{} = Error} -> + rabbit_misc:protocol_error(Error); ok -> return_ok(State, NoWait, ReturnMethod) end. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 95a49f8679..6e6ad06cb3 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -160,6 +160,12 @@ action(cluster, Node, ClusterNodeSs, Inform) -> [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]); +action(force_cluster, Node, ClusterNodeSs, Inform) -> + ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), + Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)", + [Node, ClusterNodes]), + rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); + action(status, Node, [], Inform) -> Inform("Status of node ~p", [Node]), case call(Node, {rabbit, status, []}) of diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 822c164d57..d91ebe9ba9 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -64,7 +64,9 @@ -type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' | 'exchange_not_found' | 'exchange_and_queue_not_found')). --type(inner_fun() :: fun((rabbit_types:exchange(), queue()) -> any())). +-type(inner_fun() :: + fun((rabbit_types:exchange(), queue()) -> + rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: @@ -427,23 +429,27 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to %% anything else - InnerFun(X, Q), - case mnesia:read({rabbit_route, B}) of - [] -> - sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:write/3), - {new, X, B}; - [_R] -> - {existing, X, B} + case InnerFun(X, Q) of + ok -> + case mnesia:read({rabbit_route, B}) of + [] -> + ok = sync_binding(B, + X#exchange.durable andalso + Q#amqqueue.durable, + fun mnesia:write/3), + {new, X, B}; + [_R] -> + {existing, X, B} + end; + {error, _} = E -> + E end end) of {new, Exchange = #exchange{ type = Type }, Binding} -> (type_to_module(Type)):add_binding(Exchange, Binding); {existing, _, _} -> ok; - Err = {error, _} -> + {error, _} = Err -> Err end. @@ -453,14 +459,23 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of - [] -> {error, binding_not_found}; - _ -> InnerFun(X, Q), - ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:delete_object/3), - {maybe_auto_delete(X), B} + [] -> + {error, binding_not_found}; + _ -> + case InnerFun(X, Q) of + ok -> + ok = + sync_binding(B, + X#exchange.durable andalso + Q#amqqueue.durable, + fun mnesia:delete_object/3), + {maybe_auto_delete(X), B}; + {error, _} = E -> + E + end end end) of - Err = {error, _} -> + {error, _} = Err -> Err; {{IsDeleted, X = #exchange{ type = Type }}, B} -> Module = type_to_module(Type), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index f04df8a042..fcc9fc7e54 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -37,7 +37,7 @@ -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, amqp_error/4, - protocol_error/3, protocol_error/4]). + protocol_error/3, protocol_error/4, protocol_error/1]). -export([not_found/1]). -export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). @@ -95,6 +95,7 @@ (rabbit_framing:amqp_exception(), string(), [any()], rabbit_framing:amqp_method_name()) -> no_return()). +-spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> no_return()). -spec(not_found/1 :: (rabbit_types:r(atom())) -> no_return()). -spec(get_config/1 :: (atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')). @@ -199,7 +200,10 @@ protocol_error(Name, ExplanationFormat, Params) -> protocol_error(Name, ExplanationFormat, Params, none). protocol_error(Name, ExplanationFormat, Params, Method) -> - exit(amqp_error(Name, ExplanationFormat, Params, Method)). + protocol_error(amqp_error(Name, ExplanationFormat, Params, Method)). + +protocol_error(#amqp_error{} = Error) -> + exit(Error). not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 5c14ba7b16..e2b6927f8c 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -33,8 +33,8 @@ -module(rabbit_mnesia). -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, - cluster/1, reset/0, force_reset/0, is_clustered/0, - empty_ram_only_tables/0]). + cluster/1, force_cluster/1, reset/0, force_reset/0, + is_clustered/0, empty_ram_only_tables/0]). -export([table_names/0]). @@ -58,6 +58,8 @@ -spec(init/0 :: () -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). -spec(cluster/1 :: ([node()]) -> 'ok'). +-spec(force_cluster/1 :: ([node()]) -> 'ok'). +-spec(cluster/2 :: ([node()], boolean()) -> 'ok'). -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). -spec(is_clustered/0 :: () -> boolean()). @@ -88,7 +90,7 @@ status() -> init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), - ok = init_db(read_cluster_nodes_config()), + ok = init_db(read_cluster_nodes_config(), true), ok = wait_for_tables(), ok. @@ -96,16 +98,22 @@ is_db_empty() -> lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, table_names()). +cluster(ClusterNodes) -> + cluster(ClusterNodes, false). +force_cluster(ClusterNodes) -> + cluster(ClusterNodes, true). + %% Alter which disk nodes this node is clustered with. This can be a %% subset of all the disk nodes in the cluster but can (and should) %% include the node itself if it is to be a disk rather than a ram -%% node. -cluster(ClusterNodes) -> +%% node. If Force is false, only connections to online nodes are +%% allowed. +cluster(ClusterNodes, Force) -> ok = ensure_mnesia_not_running(), ok = ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try - ok = init_db(ClusterNodes), + ok = init_db(ClusterNodes, Force), ok = wait_for_tables(), ok = create_cluster_nodes_config(ClusterNodes) after @@ -277,38 +285,56 @@ delete_cluster_nodes_config() -> %% Take a cluster node config and create the right kind of node - a %% standalone disk node, or disk or ram node connected to the -%% specified cluster nodes. -init_db(ClusterNodes) -> - case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of - {ok, []} -> - case mnesia:system_info(use_dir) of - true -> - case check_schema_integrity() of - ok -> - ok; - {error, Reason} -> - %% NB: we cannot use rabbit_log here since - %% it may not have been started yet - error_logger:warning_msg( - "schema integrity check failed: ~p~n" - "moving database to backup location " - "and recreating schema from scratch~n", - [Reason]), - ok = move_db(), +%% specified cluster nodes. If Force is false, don't allow +%% connections to offline nodes. +init_db(ClusterNodes, Force) -> + UClusterNodes = lists:usort(ClusterNodes), + ProperClusterNodes = UClusterNodes -- [node()], + case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of + {ok, Nodes} -> + case Force of + false -> + FailedClusterNodes = ProperClusterNodes -- Nodes, + case FailedClusterNodes of + [] -> ok; + _ -> + throw({error, {failed_to_cluster_with, + FailedClusterNodes, + "Mnesia could not connect to some nodes."}}) + end; + _ -> ok + end, + case Nodes of + [] -> + case mnesia:system_info(use_dir) of + true -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + %% NB: we cannot use rabbit_log here since + %% it may not have been started yet + error_logger:warning_msg( + "schema integrity check failed: ~p~n" + "moving database to backup location " + "and recreating schema from scratch~n", + [Reason]), + ok = move_db(), + ok = create_schema() + end; + false -> ok = create_schema() end; - false -> - ok = create_schema() - end; - {ok, [_|_]} -> - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), - ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, disc_copies), - ok = create_local_table_copies(case IsDiskNode of - true -> disc; - false -> ram - end); + [_|_] -> + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(case IsDiskNode of + true -> disc; + false -> ram + end) + end; {error, Reason} -> %% one reason we may end up here is if we try to join %% nodes together that are currently running standalone or diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f35a899018..ff7df11b3d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -759,19 +759,19 @@ test_cluster_management() -> ok = control_action(reset, []), lists:foreach(fun (Arg) -> - ok = control_action(cluster, Arg), + ok = control_action(force_cluster, Arg), ok end, ClusteringSequence), lists:foreach(fun (Arg) -> ok = control_action(reset, []), - ok = control_action(cluster, Arg), + ok = control_action(force_cluster, Arg), ok end, ClusteringSequence), ok = control_action(reset, []), lists:foreach(fun (Arg) -> - ok = control_action(cluster, Arg), + ok = control_action(force_cluster, Arg), ok = control_action(start_app, []), ok = control_action(stop_app, []), ok @@ -779,7 +779,7 @@ test_cluster_management() -> ClusteringSequence), lists:foreach(fun (Arg) -> ok = control_action(reset, []), - ok = control_action(cluster, Arg), + ok = control_action(force_cluster, Arg), ok = control_action(start_app, []), ok = control_action(stop_app, []), ok @@ -790,13 +790,13 @@ test_cluster_management() -> ok = control_action(reset, []), ok = control_action(start_app, []), ok = control_action(stop_app, []), - ok = control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(force_cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% join a non-existing cluster as a ram node ok = control_action(reset, []), - ok = control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(force_cluster, ["invalid1@invalid", + "invalid2@invalid"]), SecondaryNode = rabbit_misc:makenode("hare"), case net_adm:ping(SecondaryNode) of @@ -821,18 +821,26 @@ test_cluster_management2(SecondaryNode) -> %% join cluster as a ram node ok = control_action(reset, []), - ok = control_action(cluster, [SecondaryNodeS, "invalid1@invalid"]), + ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]), ok = control_action(start_app, []), ok = control_action(stop_app, []), %% change cluster config while remaining in same cluster - ok = control_action(cluster, ["invalid2@invalid", SecondaryNodeS]), + ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), %% join non-existing cluster as a ram node - ok = control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(force_cluster, ["invalid1@invalid", + "invalid2@invalid"]), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + + %% join empty cluster as a ram node + ok = control_action(cluster, []), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + %% turn ram node into disk node ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS, NodeS]), @@ -840,8 +848,8 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(stop_app, []), %% convert a disk node into a ram node - ok = control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(force_cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% turn a disk node into a ram node ok = control_action(reset, []), |
