summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl20
-rw-r--r--src/rabbit_access_control.erl14
-rw-r--r--src/rabbit_amqqueue.erl56
-rw-r--r--src/rabbit_channel.erl27
-rw-r--r--src/rabbit_control.erl7
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl14
-rw-r--r--src/rabbit_misc.erl15
-rw-r--r--src/rabbit_mnesia.erl4
9 files changed, 57 insertions, 102 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 2cd04d0ae1..9ab6b1a68c 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -98,7 +98,7 @@ manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
end
end, [], Apps),
ok.
-
+
start_applications(Apps) ->
manage_applications(fun lists:foldl/3,
fun application:start/1,
@@ -128,9 +128,9 @@ start(normal, []) ->
io:format("starting ~-20s ...", [Msg]),
Thunk(),
io:format("done~n");
- ({Msg, M, F, A}) ->
+ ({Msg, M, F, A}) ->
io:format("starting ~-20s ...", [Msg]),
- apply(M, F, A),
+ apply(M, F, A),
io:format("done~n")
end,
[{"database",
@@ -154,8 +154,8 @@ start(normal, []) ->
ok = rabbit_amqqueue:recover()
end},
{"persister",
- fun () ->
- ok = start_child(rabbit_persister)
+ fun () ->
+ ok = start_child(rabbit_persister)
end},
{"builtin applications",
fun () ->
@@ -213,12 +213,8 @@ insert_default_data() ->
{ok, DefaultPass} = application:get_env(default_pass),
{ok, DefaultVHost} = application:get_env(default_vhost),
ok = rabbit_access_control:add_vhost(DefaultVHost),
- ok = insert_default_user(DefaultUser, DefaultPass,DefaultVHost),
- ok.
-
-insert_default_user(Username, Password, VHostPath) ->
- ok = rabbit_access_control:add_user(Username, Password),
- ok = rabbit_access_control:map_user_vhost(Username, VHostPath),
+ ok = rabbit_access_control:add_user(DefaultUser, DefaultPass),
+ ok = rabbit_access_control:map_user_vhost(DefaultUser, DefaultVHost),
ok.
start_builtin_amq_applications() ->
@@ -257,7 +253,7 @@ error_log_location() ->
end.
sasl_log_location() ->
- case application:get_env(sasl, sasl_error_logger) of
+ case application:get_env(sasl, sasl_error_logger) of
{ok, {file, File}} -> File;
{ok, false} -> undefined;
{ok, tty} -> tty;
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index a53ea30795..4342e15b3b 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -82,7 +82,7 @@ check_login(<<"AMQPLAIN">>, Response) ->
[LoginTable])
end;
-check_login(Mechanism, _Response) ->
+check_login(Mechanism, _Response) ->
rabbit_misc:protocol_error(
access_refused, "unsupported authentication mechanism '~s'",
[Mechanism]).
@@ -173,10 +173,14 @@ add_vhost(VHostPath) ->
case mnesia:read({vhost, VHostPath}) of
[] ->
ok = mnesia:write(#vhost{virtual_host = VHostPath}),
- #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"">>), direct, true, false, []),
- #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"amq.direct">>), direct, true, false, []),
- #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"amq.topic">>), topic, true, false, []),
- #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"amq.fanout">>), fanout, true, false, []),
+ [rabbit_exchange:declare(
+ rabbit_misc:r(VHostPath, exchange, Name),
+ Type, true, false, []) ||
+ {Name,Type} <-
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.fanout">>, fanout}]],
ok;
[_] ->
mnesia:abort({vhost_already_exists, VHostPath})
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d7284998cd..1038810345 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -55,7 +55,7 @@
{'error', 'queue_not_found' | 'exchange_not_found'}).
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
--spec(declare/4 :: (name(), bool(), bool(), amqp_table()) ->
+-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
amqqueue()).
-spec(add_binding/4 ::
(queue_name(), exchange_name(), routing_key(), amqp_table()) ->
@@ -129,15 +129,15 @@ recover_durable_queues() ->
ok
end).
-declare(Resource = #resource{}, Durable, AutoDelete, Args) ->
- Q = start_queue_process(#amqqueue{name = Resource,
+declare(QueueName, Durable, AutoDelete, Args) ->
+ Q = start_queue_process(#amqqueue{name = QueueName,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
pid = none}),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({amqqueue, Resource}) of
+ case mnesia:wread({amqqueue, QueueName}) of
[] -> ok = recover_queue(Q),
Q;
[ExistingQ] -> ExistingQ
@@ -164,47 +164,7 @@ recover_queue(Q) ->
ok = store_queue(Q),
%ok = recover_bindings(Q),
ok.
-
-% default_binding_spec(#resource{virtual_host = VHostPath, name = Name}) ->
-% #binding{exchange_name = <<"">>,
-% key = Name,
-% queue_name = Name}.
- % #binding_spec{exchange_name = rabbit_misc:r(VHostPath,exchange,<<"">>),
- % routing_key = Name,
- % arguments = []}.
-
-% recover_bindings(Q = #amqqueue{name = QueueName}) ->
-% io:format("Q was ~p~n",[Q]),
-% ok = rabbit_exchange:add_binding(default_binding_spec(QueueName)).
- % lists:foreach(fun (B) ->
- % ok = rabbit_exchange:add_binding(B, Q)
- % end, Specs),
- % ok.
-
-modify_bindings(Queue = #resource{}, X = #resource{}, RoutingKey, Arguments,
- SpecPresentFun, SpecAbsentFun) ->
- exit(modify_bindings).
- % rabbit_misc:execute_mnesia_transaction(
- % fun () ->
- % case mnesia:wread({amqqueue, Queue}) of
- % [Q = #amqqueue{binding_specs = Specs0}] ->
- % Spec = #binding_spec{exchange_name = X,
- % routing_key = RoutingKey,
- % arguments = Arguments},
- % case (case lists:member(Spec, Specs0) of
- % true -> SpecPresentFun;
- % false -> SpecAbsentFun
- % end)(Q, Spec) of
- % {ok, #amqqueue{binding_specs = Specs}} ->
- % {ok, length(Specs)};
- % {error, not_found} ->
- % {error, exchange_not_found};
- % Other -> Other
- % end;
- % [] -> {error, queue_not_found}
- % end
- % end).
-
+
update_bindings(Q = #amqqueue{}, Spec,
UpdateSpecFun, UpdateExchangeFun) ->
exit(update_bindings).
@@ -337,7 +297,7 @@ internal_delete(QueueName) ->
case mnesia:wread({amqqueue, QueueName}) of
[] -> {error, not_found};
[Q] ->
- ok = delete_temp(Q),
+ ok = delete_queue(Q),
ok = mnesia:delete({durable_queues, QueueName}),
ok
end
@@ -362,8 +322,8 @@ on_node_down(Node) ->
node(Pid) == Node]))
end).
-pseudo_queue(NameBin, Pid) ->
- #amqqueue{name = NameBin,
+pseudo_queue(QueueName, Pid) ->
+ #amqqueue{name = QueueName,
durable = false,
auto_delete = false,
arguments = [],
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 73ea552fd6..070760a98a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -153,7 +153,8 @@ ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
return_queue_declare_ok(State, NoWait, Q) ->
- NewState = State#ch{most_recently_declared_queue = Q#amqqueue.name},
+ NewState = State#ch{most_recently_declared_queue =
+ (Q#amqqueue.name)#resource.name},
case NoWait of
true -> {noreply, NewState};
false ->
@@ -161,8 +162,7 @@ return_queue_declare_ok(State, NoWait, Q) ->
rabbit_misc:with_exit_handler(
fun () -> {ok, Q#amqqueue.name, 0, 0} end,
fun () -> rabbit_amqqueue:stat(Q) end),
- QueueName = ActualName#resource.name,
- Reply = #'queue.declare_ok'{queue = QueueName,
+ Reply = #'queue.declare_ok'{queue = ActualName#resource.name,
message_count = MessageCount,
consumer_count = ConsumerCount},
{reply, Reply, NewState}
@@ -171,7 +171,9 @@ return_queue_declare_ok(State, NoWait, Q) ->
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_allowed, "no previously declared queue", []);
-expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = MRDQ }) -> MRDQ;
+expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath,
+ most_recently_declared_queue = MRDQ }) ->
+ rabbit_misc:r(VHostPath, queue, MRDQ);
expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) ->
rabbit_misc:r(VHostPath, queue, QueueNameBin).
@@ -227,7 +229,8 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:shutdown(WriterPid),
stop;
-handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State};
+handle_method(#'access.request'{},_, State) ->
+ {reply, #'access.request_ok'{ticket = 1}, State};
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
@@ -336,7 +339,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
ConsumerMapping)}};
{error, queue_owned_by_another_connection} ->
%% The spec is silent on which exception to use
- %% here. This seems reasonable?
+ %% here. This seems reasonable?
%% FIXME: check this
rabbit_misc:protocol_error(
@@ -450,7 +453,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
arguments = Args},
_, State = #ch{ virtual_host = VHostPath }) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
- %% FIXME: clarify spec as per declare wrt differing realms
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
X = case rabbit_exchange:lookup(ExchangeName) of
{ok, FoundX} -> FoundX;
@@ -520,7 +522,6 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
end,
Q
end,
- %% FIXME: clarify spec as per declare wrt differing realms
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
Finish) of
@@ -530,10 +531,9 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
<<>> -> rabbit_misc:binstring_guid("amq.gen");
Other -> check_name('queue', Other)
end,
- Finish(rabbit_amqqueue:declare(rabbit_misc:r(VHostPath, queue, ActualNameBin),
- Durable,
- AutoDelete,
- Args));
+ QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
+ Finish(rabbit_amqqueue:declare(QueueName,
+ Durable, AutoDelete, Args));
Other -> Other
end,
return_queue_declare_ok(State, NoWait, Q);
@@ -550,7 +550,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty,
nowait = NoWait
- },_, State) ->
+ },
+ _, State) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
case rabbit_amqqueue:with_or_die(
QueueName,
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 12040725f9..eb24b78270 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -88,13 +88,6 @@ Available commands:
list_user_vhosts <UserName>
list_vhost_users <VHostPath>
- set_permissions <UserName> <VHostPath> <RealmName> [<Permission> ...]
- Permissions management. The available permissions are 'passive',
- 'active', 'write' and 'read', corresponding to the permissions
- referred to in AMQP's \"access.request\" message, or 'all' as an
- abbreviation for all defined permission flags.
- list_permissions <UserName> <VHostPath>
-
<node> should be the name of the master node of the RabbitMQ cluster. It
defaults to the node named \"rabbit\" on the local host. On a host named
\"server.example.com\", the master node will usually be rabbit@server (unless
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index e4ce3aa3df..9220d7b46f 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -34,7 +34,7 @@
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
- rabbit_misc:r(DefaultVHost,exchange,?LOG_EXCH_NAME),
+ rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
topic, true, false, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 89580a0e35..9d8831887b 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -50,7 +50,7 @@
not_found() | {'error', 'unroutable' | 'not_delivered'}).
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 :: (name(), exchange_type(), bool(), bool(),
+-spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(),
amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
-spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
@@ -91,15 +91,15 @@ recover_durable_exchanges() ->
end, ok, durable_exchanges)
end).
-declare(Resource = #resource{}, Type, Durable, AutoDelete, Args) ->
- Exchange = #exchange{name = Resource,
+declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
+ Exchange = #exchange{name = ExchangeName,
type = Type,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args},
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({exchange, Resource}) of
+ case mnesia:wread({exchange, ExchangeName}) of
[] -> ok = mnesia:write(Exchange),
if Durable ->
ok = mnesia:write(
@@ -132,12 +132,12 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
lookup(Name) ->
rabbit_misc:dirty_read({exchange, Name}).
-lookup_or_die(Resource = #resource{name = Name}) ->
- case lookup(Resource) of
+lookup_or_die(Name) ->
+ case lookup(Name) of
{ok, X} -> X;
{error, not_found} ->
rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(#resource{virtual_host = <<"/">>, kind = exchange, name = Name})])
+ not_found, "no ~s", [rabbit_misc:rs(Name)])
end.
list_vhost_exchanges(VHostPath) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 6be301e3d5..78de81570d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -66,6 +66,7 @@
-spec(get_config/2 :: (atom(), A) -> A).
-spec(set_config/2 :: (atom(), any()) -> 'ok').
-spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()).
+-spec(r/3 :: (vhost(), K, name()) -> r(K) when is_subtype(K, atom())).
-spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(),
kind :: K,
name :: '_'}
@@ -73,16 +74,16 @@
-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> 'ok' | {'error', any()}).
-spec(report_cover/0 :: () -> 'ok').
--spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
--spec(with_user/2 :: (username(), thunk(A)) -> A).
+-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
+-spec(with_user/2 :: (username(), thunk(A)) -> A).
-spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
--spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
+-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
--spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok').
+-spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok').
-spec(localnode/1 :: (atom()) -> node()).
--spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
+-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
-spec(intersperse/2 :: (A, [A]) -> [A]).
--spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
+-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(guid/0 :: () -> guid()).
-spec(string_guid/1 :: (any()) -> string()).
@@ -213,7 +214,7 @@ with_user(Username, Thunk) ->
with_vhost(VHostPath, Thunk) ->
fun () ->
case mnesia:read({vhost, VHostPath}) of
- [] ->
+ [] ->
mnesia:abort({no_such_vhost, VHostPath});
[_V] ->
Thunk()
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index f7c497f741..3e031c3b31 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -241,7 +241,6 @@ init_db(ClusterNodes) ->
ClusterNodes}})
end;
{ok, [_|_]} ->
- ok = ensure_schema_integrity(),
ok = wait_for_tables(),
ok = create_local_table_copies(
case IsDiskNode of
@@ -324,7 +323,8 @@ create_local_table_copy(Tab, Type) ->
end,
ok.
-wait_for_tables() ->
+wait_for_tables() ->
+ ok = ensure_schema_integrity(),
case mnesia:wait_for_tables(table_names(), 30000) of
ok -> ok;
{timeout, BadTabs} ->