diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_access_control.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 56 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 4 |
9 files changed, 57 insertions, 102 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 2cd04d0ae1..9ab6b1a68c 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -98,7 +98,7 @@ manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> end end, [], Apps), ok. - + start_applications(Apps) -> manage_applications(fun lists:foldl/3, fun application:start/1, @@ -128,9 +128,9 @@ start(normal, []) -> io:format("starting ~-20s ...", [Msg]), Thunk(), io:format("done~n"); - ({Msg, M, F, A}) -> + ({Msg, M, F, A}) -> io:format("starting ~-20s ...", [Msg]), - apply(M, F, A), + apply(M, F, A), io:format("done~n") end, [{"database", @@ -154,8 +154,8 @@ start(normal, []) -> ok = rabbit_amqqueue:recover() end}, {"persister", - fun () -> - ok = start_child(rabbit_persister) + fun () -> + ok = start_child(rabbit_persister) end}, {"builtin applications", fun () -> @@ -213,12 +213,8 @@ insert_default_data() -> {ok, DefaultPass} = application:get_env(default_pass), {ok, DefaultVHost} = application:get_env(default_vhost), ok = rabbit_access_control:add_vhost(DefaultVHost), - ok = insert_default_user(DefaultUser, DefaultPass,DefaultVHost), - ok. - -insert_default_user(Username, Password, VHostPath) -> - ok = rabbit_access_control:add_user(Username, Password), - ok = rabbit_access_control:map_user_vhost(Username, VHostPath), + ok = rabbit_access_control:add_user(DefaultUser, DefaultPass), + ok = rabbit_access_control:map_user_vhost(DefaultUser, DefaultVHost), ok. start_builtin_amq_applications() -> @@ -257,7 +253,7 @@ error_log_location() -> end. sasl_log_location() -> - case application:get_env(sasl, sasl_error_logger) of + case application:get_env(sasl, sasl_error_logger) of {ok, {file, File}} -> File; {ok, false} -> undefined; {ok, tty} -> tty; diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index a53ea30795..4342e15b3b 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -82,7 +82,7 @@ check_login(<<"AMQPLAIN">>, Response) -> [LoginTable]) end; -check_login(Mechanism, _Response) -> +check_login(Mechanism, _Response) -> rabbit_misc:protocol_error( access_refused, "unsupported authentication mechanism '~s'", [Mechanism]). @@ -173,10 +173,14 @@ add_vhost(VHostPath) -> case mnesia:read({vhost, VHostPath}) of [] -> ok = mnesia:write(#vhost{virtual_host = VHostPath}), - #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"">>), direct, true, false, []), - #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"amq.direct">>), direct, true, false, []), - #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"amq.topic">>), topic, true, false, []), - #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"amq.fanout">>), fanout, true, false, []), + [rabbit_exchange:declare( + rabbit_misc:r(VHostPath, exchange, Name), + Type, true, false, []) || + {Name,Type} <- + [{<<"">>, direct}, + {<<"amq.direct">>, direct}, + {<<"amq.topic">>, topic}, + {<<"amq.fanout">>, fanout}]], ok; [_] -> mnesia:abort({vhost_already_exists, VHostPath}) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d7284998cd..1038810345 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -55,7 +55,7 @@ {'error', 'queue_not_found' | 'exchange_not_found'}). -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). --spec(declare/4 :: (name(), bool(), bool(), amqp_table()) -> +-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> amqqueue()). -spec(add_binding/4 :: (queue_name(), exchange_name(), routing_key(), amqp_table()) -> @@ -129,15 +129,15 @@ recover_durable_queues() -> ok end). -declare(Resource = #resource{}, Durable, AutoDelete, Args) -> - Q = start_queue_process(#amqqueue{name = Resource, +declare(QueueName, Durable, AutoDelete, Args) -> + Q = start_queue_process(#amqqueue{name = QueueName, durable = Durable, auto_delete = AutoDelete, arguments = Args, pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({amqqueue, Resource}) of + case mnesia:wread({amqqueue, QueueName}) of [] -> ok = recover_queue(Q), Q; [ExistingQ] -> ExistingQ @@ -164,47 +164,7 @@ recover_queue(Q) -> ok = store_queue(Q), %ok = recover_bindings(Q), ok. - -% default_binding_spec(#resource{virtual_host = VHostPath, name = Name}) -> -% #binding{exchange_name = <<"">>, -% key = Name, -% queue_name = Name}. - % #binding_spec{exchange_name = rabbit_misc:r(VHostPath,exchange,<<"">>), - % routing_key = Name, - % arguments = []}. - -% recover_bindings(Q = #amqqueue{name = QueueName}) -> -% io:format("Q was ~p~n",[Q]), -% ok = rabbit_exchange:add_binding(default_binding_spec(QueueName)). - % lists:foreach(fun (B) -> - % ok = rabbit_exchange:add_binding(B, Q) - % end, Specs), - % ok. - -modify_bindings(Queue = #resource{}, X = #resource{}, RoutingKey, Arguments, - SpecPresentFun, SpecAbsentFun) -> - exit(modify_bindings). - % rabbit_misc:execute_mnesia_transaction( - % fun () -> - % case mnesia:wread({amqqueue, Queue}) of - % [Q = #amqqueue{binding_specs = Specs0}] -> - % Spec = #binding_spec{exchange_name = X, - % routing_key = RoutingKey, - % arguments = Arguments}, - % case (case lists:member(Spec, Specs0) of - % true -> SpecPresentFun; - % false -> SpecAbsentFun - % end)(Q, Spec) of - % {ok, #amqqueue{binding_specs = Specs}} -> - % {ok, length(Specs)}; - % {error, not_found} -> - % {error, exchange_not_found}; - % Other -> Other - % end; - % [] -> {error, queue_not_found} - % end - % end). - + update_bindings(Q = #amqqueue{}, Spec, UpdateSpecFun, UpdateExchangeFun) -> exit(update_bindings). @@ -337,7 +297,7 @@ internal_delete(QueueName) -> case mnesia:wread({amqqueue, QueueName}) of [] -> {error, not_found}; [Q] -> - ok = delete_temp(Q), + ok = delete_queue(Q), ok = mnesia:delete({durable_queues, QueueName}), ok end @@ -362,8 +322,8 @@ on_node_down(Node) -> node(Pid) == Node])) end). -pseudo_queue(NameBin, Pid) -> - #amqqueue{name = NameBin, +pseudo_queue(QueueName, Pid) -> + #amqqueue{name = QueueName, durable = false, auto_delete = false, arguments = [], diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 73ea552fd6..070760a98a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -153,7 +153,8 @@ ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. return_queue_declare_ok(State, NoWait, Q) -> - NewState = State#ch{most_recently_declared_queue = Q#amqqueue.name}, + NewState = State#ch{most_recently_declared_queue = + (Q#amqqueue.name)#resource.name}, case NoWait of true -> {noreply, NewState}; false -> @@ -161,8 +162,7 @@ return_queue_declare_ok(State, NoWait, Q) -> rabbit_misc:with_exit_handler( fun () -> {ok, Q#amqqueue.name, 0, 0} end, fun () -> rabbit_amqqueue:stat(Q) end), - QueueName = ActualName#resource.name, - Reply = #'queue.declare_ok'{queue = QueueName, + Reply = #'queue.declare_ok'{queue = ActualName#resource.name, message_count = MessageCount, consumer_count = ConsumerCount}, {reply, Reply, NewState} @@ -171,7 +171,9 @@ return_queue_declare_ok(State, NoWait, Q) -> expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( not_allowed, "no previously declared queue", []); -expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = MRDQ }) -> MRDQ; +expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath, + most_recently_declared_queue = MRDQ }) -> + rabbit_misc:r(VHostPath, queue, MRDQ); expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) -> rabbit_misc:r(VHostPath, queue, QueueNameBin). @@ -227,7 +229,8 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:shutdown(WriterPid), stop; -handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; +handle_method(#'access.request'{},_, State) -> + {reply, #'access.request_ok'{ticket = 1}, State}; handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, @@ -336,7 +339,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, ConsumerMapping)}}; {error, queue_owned_by_another_connection} -> %% The spec is silent on which exception to use - %% here. This seems reasonable? + %% here. This seems reasonable? %% FIXME: check this rabbit_misc:protocol_error( @@ -450,7 +453,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, arguments = Args}, _, State = #ch{ virtual_host = VHostPath }) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), - %% FIXME: clarify spec as per declare wrt differing realms ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> FoundX; @@ -520,7 +522,6 @@ handle_method(#'queue.declare'{queue = QueueNameBin, end, Q end, - %% FIXME: clarify spec as per declare wrt differing realms Q = case rabbit_amqqueue:with( rabbit_misc:r(VHostPath, queue, QueueNameBin), Finish) of @@ -530,10 +531,9 @@ handle_method(#'queue.declare'{queue = QueueNameBin, <<>> -> rabbit_misc:binstring_guid("amq.gen"); Other -> check_name('queue', Other) end, - Finish(rabbit_amqqueue:declare(rabbit_misc:r(VHostPath, queue, ActualNameBin), - Durable, - AutoDelete, - Args)); + QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), + Finish(rabbit_amqqueue:declare(QueueName, + Durable, AutoDelete, Args)); Other -> Other end, return_queue_declare_ok(State, NoWait, Q); @@ -550,7 +550,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty, nowait = NoWait - },_, State) -> + }, + _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), case rabbit_amqqueue:with_or_die( QueueName, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 12040725f9..eb24b78270 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -88,13 +88,6 @@ Available commands: list_user_vhosts <UserName> list_vhost_users <VHostPath> - set_permissions <UserName> <VHostPath> <RealmName> [<Permission> ...] - Permissions management. The available permissions are 'passive', - 'active', 'write' and 'read', corresponding to the permissions - referred to in AMQP's \"access.request\" message, or 'all' as an - abbreviation for all defined permission flags. - list_permissions <UserName> <VHostPath> - <node> should be the name of the master node of the RabbitMQ cluster. It defaults to the node named \"rabbit\" on the local host. On a host named \"server.example.com\", the master node will usually be rabbit@server (unless diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index e4ce3aa3df..9220d7b46f 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -34,7 +34,7 @@ init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( - rabbit_misc:r(DefaultVHost,exchange,?LOG_EXCH_NAME), + rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), topic, true, false, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 89580a0e35..9d8831887b 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -50,7 +50,7 @@ not_found() | {'error', 'unroutable' | 'not_delivered'}). -spec(recover/0 :: () -> 'ok'). --spec(declare/5 :: (name(), exchange_type(), bool(), bool(), +-spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(), amqp_table()) -> exchange()). -spec(check_type/1 :: (binary()) -> atom()). -spec(assert_type/2 :: (exchange(), atom()) -> 'ok'). @@ -91,15 +91,15 @@ recover_durable_exchanges() -> end, ok, durable_exchanges) end). -declare(Resource = #resource{}, Type, Durable, AutoDelete, Args) -> - Exchange = #exchange{name = Resource, +declare(ExchangeName, Type, Durable, AutoDelete, Args) -> + Exchange = #exchange{name = ExchangeName, type = Type, durable = Durable, auto_delete = AutoDelete, arguments = Args}, rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({exchange, Resource}) of + case mnesia:wread({exchange, ExchangeName}) of [] -> ok = mnesia:write(Exchange), if Durable -> ok = mnesia:write( @@ -132,12 +132,12 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> lookup(Name) -> rabbit_misc:dirty_read({exchange, Name}). -lookup_or_die(Resource = #resource{name = Name}) -> - case lookup(Resource) of +lookup_or_die(Name) -> + case lookup(Name) of {ok, X} -> X; {error, not_found} -> rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(#resource{virtual_host = <<"/">>, kind = exchange, name = Name})]) + not_found, "no ~s", [rabbit_misc:rs(Name)]) end. list_vhost_exchanges(VHostPath) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 6be301e3d5..78de81570d 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -66,6 +66,7 @@ -spec(get_config/2 :: (atom(), A) -> A). -spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()). +-spec(r/3 :: (vhost(), K, name()) -> r(K) when is_subtype(K, atom())). -spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(), kind :: K, name :: '_'} @@ -73,16 +74,16 @@ -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). -spec(report_cover/0 :: () -> 'ok'). --spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). --spec(with_user/2 :: (username(), thunk(A)) -> A). +-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). +-spec(with_user/2 :: (username(), thunk(A)) -> A). -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). --spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). +-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). --spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok'). +-spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok'). -spec(localnode/1 :: (atom()) -> node()). --spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). +-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). -spec(intersperse/2 :: (A, [A]) -> [A]). --spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). +-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(guid/0 :: () -> guid()). -spec(string_guid/1 :: (any()) -> string()). @@ -213,7 +214,7 @@ with_user(Username, Thunk) -> with_vhost(VHostPath, Thunk) -> fun () -> case mnesia:read({vhost, VHostPath}) of - [] -> + [] -> mnesia:abort({no_such_vhost, VHostPath}); [_V] -> Thunk() diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index f7c497f741..3e031c3b31 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -241,7 +241,6 @@ init_db(ClusterNodes) -> ClusterNodes}}) end; {ok, [_|_]} -> - ok = ensure_schema_integrity(), ok = wait_for_tables(), ok = create_local_table_copies( case IsDiskNode of @@ -324,7 +323,8 @@ create_local_table_copy(Tab, Type) -> end, ok. -wait_for_tables() -> +wait_for_tables() -> + ok = ensure_schema_integrity(), case mnesia:wait_for_tables(table_names(), 30000) of ok -> ok; {timeout, BadTabs} -> |
