summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-01-10 11:46:30 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-01-10 11:46:30 +0000
commit21068eb16a23bc31cb8c20aba7a2744cfaf3f722 (patch)
tree67467456dd2d27bf741751f666d5c529e5221ca6 /src
parente14e75414167d0b2ee988da81d016d7fb2989951 (diff)
parent58a159e006bb20002eaebc217edfabf4efe8524c (diff)
downloadrabbitmq-server-git-21068eb16a23bc31cb8c20aba7a2744cfaf3f722.tar.gz
Merging default into bug23329
Diffstat (limited to 'src')
-rw-r--r--src/delegate.erl209
-rw-r--r--src/delegate_sup.erl13
-rw-r--r--src/rabbit.erl26
-rw-r--r--src/rabbit_access_control.erl466
-rw-r--r--src/rabbit_amqqueue.erl21
-rw-r--r--src/rabbit_amqqueue_process.erl110
-rw-r--r--src/rabbit_auth_backend.erl76
-rw-r--r--src/rabbit_auth_backend_internal.erl347
-rw-r--r--src/rabbit_auth_mechanism.erl57
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl70
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl74
-rw-r--r--src/rabbit_auth_mechanism_external.erl107
-rw-r--r--src/rabbit_auth_mechanism_plain.erl66
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_binary_generator.erl2
-rw-r--r--src/rabbit_binary_parser.erl2
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl224
-rw-r--r--src/rabbit_channel_sup.erl6
-rw-r--r--src/rabbit_control.erl76
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl35
-rw-r--r--src/rabbit_exchange_type_direct.erl6
-rw-r--r--src/rabbit_exchange_type_fanout.erl6
-rw-r--r--src/rabbit_exchange_type_headers.erl6
-rw-r--r--src/rabbit_exchange_type_topic.erl6
-rw-r--r--src/rabbit_log.erl3
-rw-r--r--src/rabbit_misc.erl45
-rw-r--r--src/rabbit_mnesia.erl32
-rw-r--r--src/rabbit_msg_store.erl13
-rw-r--r--src/rabbit_networking.erl3
-rw-r--r--src/rabbit_prelaunch.erl (renamed from src/rabbit_plugin_activator.erl)43
-rw-r--r--src/rabbit_reader.erl136
-rw-r--r--src/rabbit_registry.erl (renamed from src/rabbit_exchange_type_registry.erl)44
-rw-r--r--src/rabbit_ssl.erl25
-rw-r--r--src/rabbit_tests.erl63
-rw-r--r--src/rabbit_types.erl21
-rw-r--r--src/rabbit_upgrade_functions.erl31
-rw-r--r--src/rabbit_variable_queue.erl139
-rw-r--r--src/rabbit_vhost.erl122
-rw-r--r--src/rabbit_writer.erl2
41 files changed, 1764 insertions, 975 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 11abe73b0d..10054e57b6 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -31,11 +31,9 @@
-module(delegate).
--define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2).
-
-behaviour(gen_server2).
--export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]).
+-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -44,13 +42,16 @@
-ifdef(use_specs).
--spec(start_link/2 ::
- (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 ::
+ (non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
--spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A).
+-spec(invoke/2 ::
+ ( pid(), fun ((pid()) -> A)) -> A;
+ ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
+ [{pid(), term()}]}).
--spec(process_count/0 :: () -> non_neg_integer()).
+-spec(delegate_count/0 :: () -> non_neg_integer()).
-endif.
@@ -61,157 +62,113 @@
%%----------------------------------------------------------------------------
-start_link(Prefix, Hash) ->
- gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []).
+start_link(Num) ->
+ gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
+invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ Fun(Pid);
invoke(Pid, Fun) when is_pid(Pid) ->
- [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun),
- case Res of
- {ok, Result, _} ->
+ case invoke([Pid], Fun) of
+ {[{Pid, Result}], []} ->
Result;
- {error, {Class, Reason, StackTrace}, _} ->
+ {[], [{Pid, {Class, Reason, StackTrace}}]} ->
erlang:raise(Class, Reason, StackTrace)
end;
invoke(Pids, Fun) when is_list(Pids) ->
- lists:foldl(
- fun ({Status, Result, Pid}, {Good, Bad}) ->
- case Status of
- ok -> {[{Pid, Result}|Good], Bad};
- error -> {Good, [{Pid, Result}|Bad]}
- end
+ {LocalPids, Grouped} = group_pids_by_node(Pids),
+ %% The use of multi_call is only safe because the timeout is
+ %% infinity, and thus there is no process spawned in order to do
+ %% the sending. Thus calls can't overtake preceding calls/casts.
+ {Replies, BadNodes} =
+ case orddict:fetch_keys(Grouped) of
+ [] -> {[], []};
+ RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(),
+ {invoke, Fun, Grouped},
+ infinity)
end,
- {[], []},
- invoke_per_node(split_delegate_per_node(Pids), Fun)).
+ BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
+ BadNode <- BadNodes,
+ Pid <- orddict:fetch(BadNode, Grouped)],
+ ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) |
+ [Results || {_Node, Results} <- Replies]]),
+ lists:foldl(
+ fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad};
+ ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]}
+ end, {[], BadPids}, ResultsNoNode).
-invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun),
+invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ safe_invoke(Pid, Fun), %% we don't care about any error
ok;
+invoke_no_result(Pid, Fun) when is_pid(Pid) ->
+ invoke_no_result([Pid], Fun);
invoke_no_result(Pids, Fun) when is_list(Pids) ->
- invoke_no_result_per_node(split_delegate_per_node(Pids), Fun),
+ {LocalPids, Grouped} = group_pids_by_node(Pids),
+ case orddict:fetch_keys(Grouped) of
+ [] -> ok;
+ RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(),
+ {invoke, Fun, Grouped})
+ end,
+ safe_invoke(LocalPids, Fun), %% must not die
ok.
%%----------------------------------------------------------------------------
-internal_call(Node, Thunk) when is_atom(Node) ->
- gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity).
-
-internal_cast(Node, Thunk) when is_atom(Node) ->
- gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
-
-split_delegate_per_node(Pids) ->
+group_pids_by_node(Pids) ->
LocalNode = node(),
- {Local, Remote} =
- lists:foldl(
- fun (Pid, {L, D}) ->
- Node = node(Pid),
- case Node of
- LocalNode -> {[Pid|L], D};
- _ -> {L, orddict:append(Node, Pid, D)}
- end
- end,
- {[], orddict:new()}, Pids),
- {Local, orddict:to_list(Remote)}.
-
-invoke_per_node(NodePids, Fun) ->
- lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)).
-
-invoke_no_result_per_node(NodePids, Fun) ->
- delegate_per_node(NodePids, Fun, fun internal_cast/2),
- ok.
-
-delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) ->
- %% In the case where DelegateFun is internal_cast, the safe_invoke
- %% is not actually async! However, in practice Fun will always be
- %% something that does a gen_server:cast or similar, so I don't
- %% think it's a problem unless someone misuses this
- %% function. Making this *actually* async would be painful as we
- %% can't spawn at this point or we break effect ordering.
- [safe_invoke(LocalPids, Fun)|
- delegate_per_remote_node(NodePids, Fun, DelegateFun)].
-
-delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
- Self = self(),
- %% Note that this is unsafe if the Fun requires reentrancy to the
- %% local_server. I.e. if self() == local_server(Node) then we'll
- %% block forever.
- [gen_server2:cast(
- local_server(Node),
- {thunk, fun () ->
- Self ! {result,
- DelegateFun(
- Node, fun () -> safe_invoke(Pids, Fun) end)}
- end}) || {Node, Pids} <- NodePids],
- [receive {result, Result} -> Result end || _ <- NodePids].
-
-local_server(Node) ->
- case get({delegate_local_server_name, Node}) of
- undefined ->
- Name = server(outgoing,
- erlang:phash2({self(), Node}, process_count())),
- put({delegate_local_server_name, Node}, Name),
- Name;
- Name -> Name
- end.
-
-remote_server(Node) ->
- case get({delegate_remote_server_name, Node}) of
- undefined ->
- case rpc:call(Node, delegate, process_count, []) of
- {badrpc, _} ->
- %% Have to return something, if we're just casting
- %% then we don't want to blow up
- server(incoming, 1);
- Count ->
- Name = server(incoming,
- erlang:phash2({self(), Node}, Count)),
- put({delegate_remote_server_name, Node}, Name),
- Name
- end;
- Name -> Name
+ lists:foldl(
+ fun (Pid, {Local, Remote}) when node(Pid) =:= LocalNode ->
+ {[Pid | Local], Remote};
+ (Pid, {Local, Remote}) ->
+ {Local,
+ orddict:update(
+ node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
+ end, {[], orddict:new()}, Pids).
+
+delegate_count() ->
+ {ok, Count} = application:get_env(rabbit, delegate_count),
+ Count.
+
+delegate_name(Hash) ->
+ list_to_atom("delegate_" ++ integer_to_list(Hash)).
+
+delegate() ->
+ case get(delegate) of
+ undefined -> Name = delegate_name(
+ erlang:phash2(self(), delegate_count())),
+ put(delegate, Name),
+ Name;
+ Name -> Name
end.
-server(Prefix, Hash) ->
- list_to_atom("delegate_" ++
- atom_to_list(Prefix) ++ "_" ++
- integer_to_list(Hash)).
-
safe_invoke(Pids, Fun) when is_list(Pids) ->
[safe_invoke(Pid, Fun) || Pid <- Pids];
safe_invoke(Pid, Fun) when is_pid(Pid) ->
try
- {ok, Fun(Pid), Pid}
- catch
- Class:Reason ->
- {error, {Class, Reason, erlang:get_stacktrace()}, Pid}
+ {ok, Pid, Fun(Pid)}
+ catch Class:Reason ->
+ {error, Pid, {Class, Reason, erlang:get_stacktrace()}}
end.
-process_count() ->
- ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers).
-
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
init([]) ->
- {ok, no_state, hibernate,
+ {ok, node(), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-%% We don't need a catch here; we always go via safe_invoke. A catch here would
-%% be the wrong thing anyway since the Thunk can throw multiple errors.
-handle_call({thunk, Thunk}, _From, State) ->
- {reply, Thunk(), State, hibernate}.
+handle_call({invoke, Fun, Grouped}, _From, Node) ->
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}.
-handle_cast({thunk, Thunk}, State) ->
- Thunk(),
- {noreply, State, hibernate}.
+handle_cast({invoke, Fun, Grouped}, Node) ->
+ safe_invoke(orddict:fetch(Node, Grouped), Fun),
+ {noreply, Node, hibernate}.
-handle_info(_Info, State) ->
- {noreply, State, hibernate}.
+handle_info(_Info, Node) ->
+ {noreply, Node, hibernate}.
terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
+code_change(_OldVsn, Node, _Extra) ->
+ {ok, Node}.
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 544546f1b1..d2af72af20 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -55,11 +55,8 @@ start_link() ->
%%----------------------------------------------------------------------------
init(_Args) ->
- {ok, {{one_for_one, 10, 10}, specs(incoming) ++ specs(outgoing)}}.
-
-specs(Prefix) ->
- [{{Prefix, Hash}, {delegate, start_link, [Prefix, Hash]},
- transient, 16#ffffffff, worker, [delegate]} ||
- Hash <- lists:seq(0, delegate:process_count() - 1)].
-
-%%----------------------------------------------------------------------------
+ DCount = delegate:delegate_count(),
+ {ok, {{one_for_one, 10, 10},
+ [{Num, {delegate, start_link, [Num]},
+ transient, 16#ffffffff, worker, [delegate]} ||
+ Num <- lists:seq(0, DCount - 1)]}}.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index a619ef362d..954e289bd4 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -69,10 +69,10 @@
-rabbit_boot_step({external_infrastructure,
[{description, "external infrastructure ready"}]}).
--rabbit_boot_step({rabbit_exchange_type_registry,
- [{description, "exchange type registry"},
+-rabbit_boot_step({rabbit_registry,
+ [{description, "plugin registry"},
{mfa, {rabbit_sup, start_child,
- [rabbit_exchange_type_registry]}},
+ [rabbit_registry]}},
{requires, external_infrastructure},
{enables, kernel_ready}]}).
@@ -170,12 +170,6 @@
%%---------------------------------------------------------------------------
--import(application).
--import(mnesia).
--import(lists).
--import(inet).
--import(gen_tcp).
-
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
@@ -464,16 +458,16 @@ insert_default_data() ->
{ok, DefaultVHost} = application:get_env(default_vhost),
{ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} =
application:get_env(default_permissions),
- ok = rabbit_access_control:add_vhost(DefaultVHost),
- ok = rabbit_access_control:add_user(DefaultUser, DefaultPass),
+ ok = rabbit_vhost:add(DefaultVHost),
+ ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass),
case DefaultAdmin of
- true -> rabbit_access_control:set_admin(DefaultUser);
+ true -> rabbit_auth_backend_internal:set_admin(DefaultUser);
_ -> ok
end,
- ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost,
- DefaultConfigurePerm,
- DefaultWritePerm,
- DefaultReadPerm),
+ ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, DefaultVHost,
+ DefaultConfigurePerm,
+ DefaultWritePerm,
+ DefaultReadPerm),
ok.
rotate_logs(File, Suffix, Handler) ->
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index bc5880130d..02a654420d 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -30,419 +30,123 @@
%%
-module(rabbit_access_control).
--include_lib("stdlib/include/qlc.hrl").
+
-include("rabbit.hrl").
--export([check_login/2, user_pass_login/2, check_user_pass_login/2,
- check_vhost_access/2, check_resource_access/3]).
--export([add_user/2, delete_user/1, change_password/2, set_admin/1,
- clear_admin/1, list_users/0, lookup_user/1]).
--export([change_password_hash/2, hash_password/1]).
--export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]).
--export([set_permissions/5, clear_permissions/2,
- list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
- list_user_vhost_permissions/2]).
+-export([user_pass_login/2, check_user_pass_login/2, check_user_login/2,
+ check_vhost_access/2, check_resource_access/3, list_vhosts/2]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([username/0, password/0, password_hash/0]).
+-export_type([permission_atom/0, vhost_permission_atom/0]).
-type(permission_atom() :: 'configure' | 'read' | 'write').
--type(username() :: binary()).
--type(password() :: binary()).
--type(password_hash() :: binary()).
--type(regexp() :: binary()).
--spec(check_login/2 ::
- (binary(), binary()) -> rabbit_types:user() |
- rabbit_types:channel_exit()).
+-type(vhost_permission_atom() :: 'read' | 'write').
+
-spec(user_pass_login/2 ::
- (username(), password())
+ (rabbit_types:username(), rabbit_types:password())
-> rabbit_types:user() | rabbit_types:channel_exit()).
-spec(check_user_pass_login/2 ::
- (username(), password())
- -> {'ok', rabbit_types:user()} | 'refused').
+ (rabbit_types:username(), rabbit_types:password())
+ -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
-spec(check_vhost_access/2 ::
(rabbit_types:user(), rabbit_types:vhost())
-> 'ok' | rabbit_types:channel_exit()).
-spec(check_resource_access/3 ::
- (username(), rabbit_types:r(atom()), permission_atom())
+ (rabbit_types:user(), rabbit_types:r(atom()), permission_atom())
-> 'ok' | rabbit_types:channel_exit()).
--spec(add_user/2 :: (username(), password()) -> 'ok').
--spec(delete_user/1 :: (username()) -> 'ok').
--spec(change_password/2 :: (username(), password()) -> 'ok').
--spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok').
--spec(hash_password/1 :: (password()) -> password_hash()).
--spec(set_admin/1 :: (username()) -> 'ok').
--spec(clear_admin/1 :: (username()) -> 'ok').
--spec(list_users/0 :: () -> [{username(), boolean()}]).
--spec(lookup_user/1 ::
- (username()) -> rabbit_types:ok(rabbit_types:user())
- | rabbit_types:error('not_found')).
--spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
--spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
--spec(vhost_exists/1 :: (rabbit_types:vhost()) -> boolean()).
--spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]).
--spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(),
- regexp(), regexp()) -> 'ok').
--spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok').
--spec(list_permissions/0 ::
- () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp()}]).
--spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp()}]).
--spec(list_user_permissions/1 ::
- (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]).
--spec(list_user_vhost_permissions/2 ::
- (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp()}]).
+-spec(list_vhosts/2 :: (rabbit_types:user(), vhost_permission_atom())
+ -> [rabbit_types:vhost()]).
-endif.
%%----------------------------------------------------------------------------
-%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
-%% apparently, by OpenAMQ.
-check_login(<<"PLAIN">>, Response) ->
- [User, Pass] = [list_to_binary(T) ||
- T <- string:tokens(binary_to_list(Response), [0])],
- user_pass_login(User, Pass);
-%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually
-%% defines this as PLAIN, but in 0-9 that definition is gone, instead
-%% referring generically to "SASL security mechanism", i.e. the above.
-check_login(<<"AMQPLAIN">>, Response) ->
- LoginTable = rabbit_binary_parser:parse_table(Response),
- case {lists:keysearch(<<"LOGIN">>, 1, LoginTable),
- lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of
- {{value, {_, longstr, User}},
- {value, {_, longstr, Pass}}} ->
- user_pass_login(User, Pass);
- _ ->
- %% Is this an information leak?
- rabbit_misc:protocol_error(
- access_refused,
- "AMQPPLAIN auth info ~w is missing LOGIN or PASSWORD field",
- [LoginTable])
- end;
-
-check_login(Mechanism, _Response) ->
- rabbit_misc:protocol_error(
- access_refused, "unsupported authentication mechanism '~s'",
- [Mechanism]).
-
user_pass_login(User, Pass) ->
?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]),
case check_user_pass_login(User, Pass) of
- refused ->
+ {refused, Msg, Args} ->
rabbit_misc:protocol_error(
- access_refused, "login refused for user '~s'", [User]);
+ access_refused, "login refused: ~s", [io_lib:format(Msg, Args)]);
{ok, U} ->
U
end.
-check_user_pass_login(User, Pass) ->
- case lookup_user(User) of
- {ok, U} ->
- case check_password(Pass, U#user.password_hash) of
- true -> {ok, U};
- _ -> refused
- end;
- {error, not_found} ->
- refused
- end.
-
-internal_lookup_vhost_access(Username, VHostPath) ->
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_user_permission,
- #user_vhost{username = Username,
- virtual_host = VHostPath}}) of
- [] -> not_found;
- [R] -> {ok, R}
- end
- end).
-
-check_vhost_access(#user{username = Username}, VHostPath) ->
+check_user_pass_login(Username, Password) ->
+ check_user_login(Username, [{password, Password}]).
+
+check_user_login(Username, AuthProps) ->
+ {ok, Modules} = application:get_env(rabbit, auth_backends),
+ lists:foldl(
+ fun(Module, {refused, _, _}) ->
+ case Module:check_user_login(Username, AuthProps) of
+ {error, E} ->
+ {refused, "~s failed authenticating ~s: ~p~n",
+ [Module, Username, E]};
+ Else ->
+ Else
+ end;
+ (_, {ok, User}) ->
+ {ok, User}
+ end, {refused, "No modules checked '~s'", [Username]}, Modules).
+
+check_vhost_access(User = #user{ username = Username,
+ auth_backend = Module }, VHostPath) ->
?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]),
- case internal_lookup_vhost_access(Username, VHostPath) of
- {ok, _R} ->
- ok;
- not_found ->
- rabbit_misc:protocol_error(
- access_refused, "access to vhost '~s' refused for user '~s'",
- [VHostPath, Username])
- end.
-
-permission_index(configure) -> #permission.configure;
-permission_index(write) -> #permission.write;
-permission_index(read) -> #permission.read.
-
-check_resource_access(Username,
- R = #resource{kind = exchange, name = <<"">>},
+ check_access(
+ fun() ->
+ rabbit_vhost:exists(VHostPath) andalso
+ Module:check_vhost_access(User, VHostPath, write)
+ end,
+ "~s failed checking vhost access to ~s for ~s: ~p~n",
+ [Module, VHostPath, Username],
+ "access to vhost '~s' refused for user '~s'",
+ [VHostPath, Username]).
+
+check_resource_access(User, R = #resource{kind = exchange, name = <<"">>},
Permission) ->
- check_resource_access(Username,
- R#resource{name = <<"amq.default">>},
+ check_resource_access(User, R#resource{name = <<"amq.default">>},
Permission);
-check_resource_access(Username,
- R = #resource{virtual_host = VHostPath, name = Name},
- Permission) ->
- Res = case mnesia:dirty_read({rabbit_user_permission,
- #user_vhost{username = Username,
- virtual_host = VHostPath}}) of
- [] ->
- false;
- [#user_permission{permission = P}] ->
- PermRegexp =
- case element(permission_index(Permission), P) of
- %% <<"^$">> breaks Emacs' erlang mode
- <<"">> -> <<$^, $$>>;
- RE -> RE
- end,
- case re:run(Name, PermRegexp, [{capture, none}]) of
- match -> true;
- nomatch -> false
- end
- end,
- if Res -> ok;
- true -> rabbit_misc:protocol_error(
- access_refused, "access to ~s refused for user '~s'",
- [rabbit_misc:rs(R), Username])
- end.
-
-add_user(Username, Password) ->
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_user, Username}) of
- [] ->
- ok = mnesia:write(rabbit_user,
- #user{username = Username,
- password_hash =
- hash_password(Password),
- is_admin = false},
- write);
- _ ->
- mnesia:abort({user_already_exists, Username})
- end
- end),
- rabbit_log:info("Created user ~p~n", [Username]),
- R.
-
-delete_user(Username) ->
- R = rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () ->
- ok = mnesia:delete({rabbit_user, Username}),
- [ok = mnesia:delete_object(
- rabbit_user_permission, R, write) ||
- R <- mnesia:match_object(
- rabbit_user_permission,
- #user_permission{user_vhost = #user_vhost{
- username = Username,
- virtual_host = '_'},
- permission = '_'},
- write)],
- ok
- end)),
- rabbit_log:info("Deleted user ~p~n", [Username]),
- R.
-
-change_password(Username, Password) ->
- change_password_hash(Username, hash_password(Password)).
-
-change_password_hash(Username, PasswordHash) ->
- R = update_user(Username, fun(User) ->
- User#user{ password_hash = PasswordHash }
- end),
- rabbit_log:info("Changed password for user ~p~n", [Username]),
- R.
-
-hash_password(Cleartext) ->
- Salt = make_salt(),
- Hash = salted_md5(Salt, Cleartext),
- <<Salt/binary, Hash/binary>>.
-
-check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) ->
- Hash =:= salted_md5(Salt, Cleartext).
-
-make_salt() ->
- {A1,A2,A3} = now(),
- random:seed(A1, A2, A3),
- Salt = random:uniform(16#ffffffff),
- <<Salt:32>>.
-
-salted_md5(Salt, Cleartext) ->
- Salted = <<Salt/binary, Cleartext/binary>>,
- erlang:md5(Salted).
-
-set_admin(Username) ->
- set_admin(Username, true).
-
-clear_admin(Username) ->
- set_admin(Username, false).
-
-set_admin(Username, IsAdmin) ->
- R = update_user(Username, fun(User) ->
- User#user{is_admin = IsAdmin}
- end),
- rabbit_log:info("Set user admin flag for user ~p to ~p~n",
- [Username, IsAdmin]),
- R.
-
-update_user(Username, Fun) ->
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () ->
- {ok, User} = lookup_user(Username),
- ok = mnesia:write(rabbit_user, Fun(User), write)
- end)).
-
-list_users() ->
- [{Username, IsAdmin} ||
- #user{username = Username, is_admin = IsAdmin} <-
- mnesia:dirty_match_object(rabbit_user, #user{_ = '_'})].
-
-lookup_user(Username) ->
- rabbit_misc:dirty_read({rabbit_user, Username}).
-
-add_vhost(VHostPath) ->
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_vhost, VHostPath}) of
- [] ->
- ok = mnesia:write(rabbit_vhost,
- #vhost{virtual_host = VHostPath},
- write),
- [rabbit_exchange:declare(
- rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout}]],
- ok;
- [_] ->
- mnesia:abort({vhost_already_exists, VHostPath})
- end
- end),
- rabbit_log:info("Added vhost ~p~n", [VHostPath]),
- R.
-
-delete_vhost(VHostPath) ->
- %%FIXME: We are forced to delete the queues outside the TX below
- %%because queue deletion involves sending messages to the queue
- %%process, which in turn results in further mnesia actions and
- %%eventually the termination of that process.
- lists:foreach(fun (Q) ->
- {ok,_} = rabbit_amqqueue:delete(Q, false, false)
- end,
- rabbit_amqqueue:list(VHostPath)),
- R = rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_vhost(
- VHostPath,
- fun () ->
- ok = internal_delete_vhost(VHostPath)
- end)),
- rabbit_log:info("Deleted vhost ~p~n", [VHostPath]),
- R.
-
-internal_delete_vhost(VHostPath) ->
- lists:foreach(fun (#exchange{name = Name}) ->
- ok = rabbit_exchange:delete(Name, false)
- end,
- rabbit_exchange:list(VHostPath)),
- lists:foreach(fun ({Username, _, _, _}) ->
- ok = clear_permissions(Username, VHostPath)
- end,
- list_vhost_permissions(VHostPath)),
- ok = mnesia:delete({rabbit_vhost, VHostPath}),
- ok.
-
-vhost_exists(VHostPath) ->
- mnesia:dirty_read({rabbit_vhost, VHostPath}) /= [].
-
-list_vhosts() ->
- mnesia:dirty_all_keys(rabbit_vhost).
-
-validate_regexp(RegexpBin) ->
- Regexp = binary_to_list(RegexpBin),
- case re:compile(Regexp) of
- {ok, _} -> ok;
- {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}})
+check_resource_access(User = #user{username = Username, auth_backend = Module},
+ Resource, Permission) ->
+ check_access(
+ fun() -> Module:check_resource_access(User, Resource, Permission) end,
+ "~s failed checking resource access to ~p for ~s: ~p~n",
+ [Module, Resource, Username],
+ "access to ~s refused for user '~s'",
+ [rabbit_misc:rs(Resource), Username]).
+
+check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) ->
+ Allow = case Fun() of
+ {error, _} = E ->
+ rabbit_log:error(ErrStr, ErrArgs ++ [E]),
+ false;
+ Else ->
+ Else
+ end,
+ case Allow of
+ true ->
+ ok;
+ false ->
+ rabbit_misc:protocol_error(access_refused, RefStr, RefArgs)
end.
-set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
- lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user_and_vhost(
- Username, VHostPath,
- fun () -> ok = mnesia:write(
- rabbit_user_permission,
- #user_permission{user_vhost = #user_vhost{
- username = Username,
- virtual_host = VHostPath},
- permission = #permission{
- configure = ConfigurePerm,
- write = WritePerm,
- read = ReadPerm}},
- write)
- end)).
-
-
-clear_permissions(Username, VHostPath) ->
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user_and_vhost(
- Username, VHostPath,
- fun () ->
- ok = mnesia:delete({rabbit_user_permission,
- #user_vhost{username = Username,
- virtual_host = VHostPath}})
- end)).
-
-list_permissions() ->
- [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
- {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
- list_permissions(match_user_vhost('_', '_'))].
-
-list_vhost_permissions(VHostPath) ->
- [{Username, ConfigurePerm, WritePerm, ReadPerm} ||
- {Username, _, ConfigurePerm, WritePerm, ReadPerm} <-
- list_permissions(rabbit_misc:with_vhost(
- VHostPath, match_user_vhost('_', VHostPath)))].
-
-list_user_permissions(Username) ->
- [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
- {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
- list_permissions(rabbit_misc:with_user(
- Username, match_user_vhost(Username, '_')))].
-
-list_user_vhost_permissions(Username, VHostPath) ->
- [{ConfigurePerm, WritePerm, ReadPerm} ||
- {_, _, ConfigurePerm, WritePerm, ReadPerm} <-
- list_permissions(rabbit_misc:with_user_and_vhost(
- Username, VHostPath,
- match_user_vhost(Username, VHostPath)))].
-
-list_permissions(QueryThunk) ->
- [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
- #user_permission{user_vhost = #user_vhost{username = Username,
- virtual_host = VHostPath},
- permission = #permission{ configure = ConfigurePerm,
- write = WritePerm,
- read = ReadPerm}} <-
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(QueryThunk)].
-
-match_user_vhost(Username, VHostPath) ->
- fun () -> mnesia:match_object(
- rabbit_user_permission,
- #user_permission{user_vhost = #user_vhost{
- username = Username,
- virtual_host = VHostPath},
- permission = '_'},
- read)
- end.
+%% Permission = write -> log in
+%% Permission = read -> learn of the existence of (only relevant for
+%% management plugin)
+list_vhosts(User = #user{username = Username, auth_backend = Module},
+ Permission) ->
+ lists:filter(
+ fun(VHost) ->
+ case Module:check_vhost_access(User, VHost, Permission) of
+ {error, _} = E ->
+ rabbit_log:warning("~w failed checking vhost access "
+ "to ~s for ~s: ~p~n",
+ [Module, VHost, Username, E]),
+ false;
+ Else ->
+ Else
+ end
+ end, rabbit_vhost:list()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 70d8f2dda7..35ed1c94d6 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -49,11 +49,6 @@
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--import(mnesia).
--import(gen_server2).
--import(lists).
--import(queue).
-
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -157,9 +152,9 @@
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit()).
-spec(maybe_run_queue_via_backing_queue/2 ::
- (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok').
+ (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(maybe_run_queue_via_backing_queue_async/2 ::
- (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok').
+ (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -508,19 +503,17 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid}.
safe_delegate_call_ok(F, Pids) ->
- {_, Bad} = delegate:invoke(Pids,
- fun (Pid) ->
+ case delegate:invoke(Pids, fun (Pid) ->
rabbit_misc:with_exit_handler(
fun () -> ok end,
fun () -> F(Pid) end)
- end),
- case Bad of
- [] -> ok;
- _ -> {error, Bad}
+ end) of
+ {_, []} -> ok;
+ {_, Bad} -> {error, Bad}
end.
delegate_call(Pid, Msg, Timeout) ->
delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
delegate_cast(Pid, Msg) ->
- delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
+ delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 25859c22f9..e6e3989fc4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -48,10 +48,6 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
--import(queue).
--import(erlang).
--import(lists).
-
% Queue's state
-record(q, {q,
exclusive_consumer,
@@ -203,6 +199,8 @@ terminate_shutdown(Fun, State) ->
BQ:tx_rollback(Txn, BQSN),
BQSN1
end, BQS, all_ch_record()),
+ [emit_consumer_deleted(Ch, CTag)
+ || {Ch, CTag, _} <- consumers(State1)],
rabbit_event:notify(queue_deleted, [{pid, self()}]),
State1#q{backing_queue_state = Fun(BQS1)}
end.
@@ -230,7 +228,7 @@ ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL,
rabbit_amqqueue, maybe_run_queue_via_backing_queue,
- [self(), fun (BQS) -> BQ:idle_timeout(BQS) end]),
+ [self(), fun (BQS) -> {[], BQ:idle_timeout(BQS)} end]),
State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
State.
@@ -376,12 +374,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
- {State2, ChAckTags1} =
+ ChAckTags1 =
case AckRequired of
- true -> {State1,
- sets:add_element(AckTag, ChAckTags)};
- false -> {confirm_message(Message, State1),
- ChAckTags}
+ true -> sets:add_element(AckTag, ChAckTags);
+ false -> ChAckTags
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
@@ -398,10 +394,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- State3 = State2#q{
+ State2 = State1#q{
active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers},
- deliver_msgs_to_consumers(Funs, FunAcc1, State3);
+ deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
true = maybe_store_ch_record(C#cr{is_limit_active = true}),
@@ -434,15 +430,11 @@ confirm_messages(Guids, State) ->
confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) ->
case dict:find(Guid, GTC) of
- {ok, {_ , undefined}} -> ok;
{ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
_ -> ok
end,
State#q{guid_to_channel = dict:erase(Guid, GTC)}.
-confirm_message(#basic_message{guid = Guid}, State) ->
- confirm_message_by_guid(Guid, State).
-
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
State;
record_confirm_message(#delivery{sender = ChPid,
@@ -457,11 +449,6 @@ record_confirm_message(#delivery{sender = ChPid,
record_confirm_message(_Delivery, State) ->
State.
-ack_by_acktags(AckTags, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {AckdGuids, BQS1} = BQ:ack(AckTags, BQS),
- confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}).
-
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3},
@@ -524,7 +511,7 @@ deliver_or_enqueue(Delivery, State) ->
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
maybe_run_queue_via_backing_queue(
fun (BQS) ->
- BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)
+ {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)}
end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
@@ -540,12 +527,19 @@ remove_consumer(ChPid, ConsumerTag, Queue) ->
end, Queue).
remove_consumers(ChPid, Queue) ->
- queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue).
+ {Kept, Removed} = split_by_channel(ChPid, Queue),
+ [emit_consumer_deleted(Ch, CTag) ||
+ {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)],
+ Kept.
move_consumers(ChPid, From, To) ->
+ {Kept, Removed} = split_by_channel(ChPid, From),
+ {Kept, queue:join(To, Removed)}.
+
+split_by_channel(ChPid, Queue) ->
{Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(From)),
- {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}.
+ queue:to_list(Queue)),
+ {queue:from_list(Kept), queue:from_list(Removed)}.
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -621,12 +615,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- {BQS2, State1} =
- case Fun(BQS) of
- {{confirm, Guids}, BQS1} -> {BQS1, confirm_messages(Guids, State)};
- BQS1 -> {BQS1, State}
- end,
- run_message_queue(State1#q{backing_queue_state = BQS2}).
+ {Guids, BQS1} = Fun(BQS),
+ run_message_queue(
+ confirm_messages(Guids, State#q{backing_queue_state = BQS1})).
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
@@ -728,12 +719,34 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
+consumers(#q{active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers}) ->
+ rabbit_misc:queue_fold(
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)).
+
emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
+emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+ rabbit_event:notify(consumer_created,
+ [{consumer_tag, ConsumerTag},
+ {exclusive, Exclusive},
+ {ack_required, AckRequired},
+ {channel, ChPid},
+ {queue, self()}]).
+
+emit_consumer_deleted(ChPid, ConsumerTag) ->
+ rabbit_event:notify(consumer_deleted,
+ [{consumer_tag, ConsumerTag},
+ {channel, ChPid},
+ {queue, self()}]).
+
%---------------------------------------------------------------------------
prioritise_call(Msg, _From, _State) ->
@@ -796,16 +809,10 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(consumers, _From,
- State = #q{active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
- reply(rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}, Acc) ->
- [{ChPid, ConsumerTag, AckRequired} | Acc]
- end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
+handle_call(consumers, _From, State) ->
+ reply(consumers(State), State);
-handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
+handle_call({deliver_immediately, Delivery},
_From, State) ->
%% Synchronous, "immediate" delivery mode
%%
@@ -822,10 +829,7 @@ handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
%%
{Delivered, State1} =
attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
- reply(Delivered, case Delivered of
- true -> State1;
- false -> confirm_message(Message, State1)
- end);
+ reply(Delivered, State1);
handle_call({deliver, Delivery}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
@@ -863,7 +867,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
sets:add_element(AckTag,
ChAckTags)}),
State2;
- false -> confirm_message(Message, State2)
+ false -> State2
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
reply({ok, Remaining, Msg}, State3)
@@ -906,6 +910,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ChPid, Consumer,
State1#q.active_consumers)})
end,
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ not NoAck),
reply(ok, State2)
end;
@@ -924,6 +930,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
C1#cr{limiter_pid = undefined};
_ -> C1
end),
+ emit_consumer_deleted(ChPid, ConsumerTag),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =
State#q{exclusive_consumer = cancel_holder(ChPid,
@@ -998,8 +1005,8 @@ handle_cast({ack, Txn, AckTags, ChPid},
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- NewState = ack_by_acktags(AckTags, State),
- {NewC, NewState};
+ BQS1 = BQ:ack(AckTags, BQS),
+ {NewC, State#q{backing_queue_state = BQS1}};
_ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
{C#cr{txn = Txn},
State#q{backing_queue_state = BQS1}}
@@ -1008,7 +1015,9 @@ handle_cast({ack, Txn, AckTags, ChPid},
noreply(State1)
end;
-handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
+handle_cast({reject, AckTags, Requeue, ChPid},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
@@ -1017,7 +1026,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> ack_by_acktags(AckTags, State)
+ false -> BQS1 = BQ:ack(AckTags, BQS),
+ State#q{backing_queue_state = BQS1}
end)
end;
@@ -1111,7 +1121,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State = #q{backing_queue = BQ}) ->
noreply(maybe_run_queue_via_backing_queue(
- fun (BQS) -> BQ:idle_timeout(BQS) end, State));
+ fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl
new file mode 100644
index 0000000000..0dc8e61b73
--- /dev/null
+++ b/src/rabbit_auth_backend.erl
@@ -0,0 +1,76 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_backend).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ %% A description proplist as with auth mechanisms,
+ %% exchanges. Currently unused.
+ {description, 0},
+
+ %% Check a user can log in, given a username and a proplist of
+ %% authentication information (e.g. [{password, Password}]).
+ %%
+ %% Possible responses:
+ %% {ok, User}
+ %% Authentication succeeded, and here's the user record.
+ %% {error, Error}
+ %% Something went wrong. Log and die.
+ %% {refused, Msg, Args}
+ %% Client failed authentication. Log and die.
+ {check_user_login, 2},
+
+ %% Given #user, vhost path and permission, can a user access a vhost?
+ %% Permission is read - learn of the existence of (only relevant for
+ %% management plugin)
+ %% or write - log in
+ %%
+ %% Possible responses:
+ %% true
+ %% false
+ %% {error, Error}
+ %% Something went wrong. Log and die.
+ {check_vhost_access, 3},
+
+ %% Given #user, resource and permission, can a user access a resource?
+ %%
+ %% Possible responses:
+ %% true
+ %% false
+ %% {error, Error}
+ %% Something went wrong. Log and die.
+ {check_resource_access, 3}
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
new file mode 100644
index 0000000000..79910b95aa
--- /dev/null
+++ b/src/rabbit_auth_backend_internal.erl
@@ -0,0 +1,347 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_backend_internal).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_backend).
+
+-export([description/0]).
+-export([check_user_login/2, check_vhost_access/3, check_resource_access/3]).
+
+-export([add_user/2, delete_user/1, change_password/2, set_admin/1,
+ clear_admin/1, list_users/0, lookup_user/1, clear_password/1]).
+-export([make_salt/0, check_password/2, change_password_hash/2,
+ hash_password/1]).
+-export([set_permissions/5, clear_permissions/2,
+ list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
+ list_user_vhost_permissions/2]).
+
+-include("rabbit_auth_backend_spec.hrl").
+
+-ifdef(use_specs).
+
+-type(regexp() :: binary()).
+
+-spec(add_user/2 :: (rabbit_types:username(), rabbit_types:password()) -> 'ok').
+-spec(delete_user/1 :: (rabbit_types:username()) -> 'ok').
+-spec(change_password/2 :: (rabbit_types:username(), rabbit_types:password())
+ -> 'ok').
+-spec(clear_password/1 :: (rabbit_types:username()) -> 'ok').
+-spec(make_salt/0 :: () -> binary()).
+-spec(check_password/2 :: (rabbit_types:password(),
+ rabbit_types:password_hash()) -> boolean()).
+-spec(change_password_hash/2 :: (rabbit_types:username(),
+ rabbit_types:password_hash()) -> 'ok').
+-spec(hash_password/1 :: (rabbit_types:password())
+ -> rabbit_types:password_hash()).
+-spec(set_admin/1 :: (rabbit_types:username()) -> 'ok').
+-spec(clear_admin/1 :: (rabbit_types:username()) -> 'ok').
+-spec(list_users/0 :: () -> [{rabbit_types:username(), boolean()}]).
+-spec(lookup_user/1 :: (rabbit_types:username())
+ -> rabbit_types:ok(rabbit_types:internal_user())
+ | rabbit_types:error('not_found')).
+-spec(set_permissions/5 ::(rabbit_types:username(), rabbit_types:vhost(),
+ regexp(), regexp(), regexp()) -> 'ok').
+-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost())
+ -> 'ok').
+-spec(list_permissions/0 ::
+ () -> [{rabbit_types:username(), rabbit_types:vhost(),
+ regexp(), regexp(), regexp()}]).
+-spec(list_vhost_permissions/1 ::
+ (rabbit_types:vhost()) -> [{rabbit_types:username(),
+ regexp(), regexp(), regexp()}]).
+-spec(list_user_permissions/1 ::
+ (rabbit_types:username()) -> [{rabbit_types:vhost(),
+ regexp(), regexp(), regexp()}]).
+-spec(list_user_vhost_permissions/2 ::
+ (rabbit_types:username(), rabbit_types:vhost())
+ -> [{regexp(), regexp(), regexp()}]).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%% Implementation of rabbit_auth_backend
+
+description() ->
+ [{name, <<"Internal">>},
+ {description, <<"Internal user / password database">>}].
+
+check_user_login(Username, []) ->
+ internal_check_user_login(Username, fun() -> true end);
+check_user_login(Username, [{password, Password}]) ->
+ internal_check_user_login(
+ Username,
+ fun(#internal_user{password_hash = Hash}) ->
+ check_password(Password, Hash)
+ end);
+check_user_login(Username, AuthProps) ->
+ exit({unknown_auth_props, Username, AuthProps}).
+
+internal_check_user_login(Username, Fun) ->
+ Refused = {refused, "user '~s' - invalid credentials", [Username]},
+ case lookup_user(Username) of
+ {ok, User = #internal_user{is_admin = IsAdmin}} ->
+ case Fun(User) of
+ true -> {ok, #user{username = Username,
+ is_admin = IsAdmin,
+ auth_backend = ?MODULE,
+ impl = User}};
+ _ -> Refused
+ end;
+ {error, not_found} ->
+ Refused
+ end.
+
+check_vhost_access(#user{is_admin = true}, _VHostPath, read) ->
+ true;
+
+check_vhost_access(#user{username = Username}, VHostPath, _) ->
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
+ [] -> false;
+ [_R] -> true
+ end
+ end).
+
+check_resource_access(#user{username = Username},
+ #resource{virtual_host = VHostPath, name = Name},
+ Permission) ->
+ case mnesia:dirty_read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
+ [] ->
+ false;
+ [#user_permission{permission = P}] ->
+ PermRegexp =
+ case element(permission_index(Permission), P) of
+ %% <<"^$">> breaks Emacs' erlang mode
+ <<"">> -> <<$^, $$>>;
+ RE -> RE
+ end,
+ case re:run(Name, PermRegexp, [{capture, none}]) of
+ match -> true;
+ nomatch -> false
+ end
+ end.
+
+permission_index(configure) -> #permission.configure;
+permission_index(write) -> #permission.write;
+permission_index(read) -> #permission.read.
+
+%%----------------------------------------------------------------------------
+%% Manipulation of the user database
+
+add_user(Username, Password) ->
+ R = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_user, Username}) of
+ [] ->
+ ok = mnesia:write(
+ rabbit_user,
+ #internal_user{username = Username,
+ password_hash =
+ hash_password(Password),
+ is_admin = false},
+ write);
+ _ ->
+ mnesia:abort({user_already_exists, Username})
+ end
+ end),
+ rabbit_log:info("Created user ~p~n", [Username]),
+ R.
+
+delete_user(Username) ->
+ R = rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ ok = mnesia:delete({rabbit_user, Username}),
+ [ok = mnesia:delete_object(
+ rabbit_user_permission, R, write) ||
+ R <- mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = '_'},
+ permission = '_'},
+ write)],
+ ok
+ end)),
+ rabbit_log:info("Deleted user ~p~n", [Username]),
+ R.
+
+change_password(Username, Password) ->
+ change_password_hash(Username, hash_password(Password)).
+
+clear_password(Username) ->
+ change_password_hash(Username, <<"">>).
+
+change_password_hash(Username, PasswordHash) ->
+ R = update_user(Username, fun(User) ->
+ User#internal_user{
+ password_hash = PasswordHash }
+ end),
+ rabbit_log:info("Changed password for user ~p~n", [Username]),
+ R.
+
+hash_password(Cleartext) ->
+ Salt = make_salt(),
+ Hash = salted_md5(Salt, Cleartext),
+ <<Salt/binary, Hash/binary>>.
+
+check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) ->
+ Hash =:= salted_md5(Salt, Cleartext).
+
+make_salt() ->
+ {A1,A2,A3} = now(),
+ random:seed(A1, A2, A3),
+ Salt = random:uniform(16#ffffffff),
+ <<Salt:32>>.
+
+salted_md5(Salt, Cleartext) ->
+ Salted = <<Salt/binary, Cleartext/binary>>,
+ erlang:md5(Salted).
+
+set_admin(Username) ->
+ set_admin(Username, true).
+
+clear_admin(Username) ->
+ set_admin(Username, false).
+
+set_admin(Username, IsAdmin) ->
+ R = update_user(Username, fun(User) ->
+ User#internal_user{is_admin = IsAdmin}
+ end),
+ rabbit_log:info("Set user admin flag for user ~p to ~p~n",
+ [Username, IsAdmin]),
+ R.
+
+update_user(Username, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ {ok, User} = lookup_user(Username),
+ ok = mnesia:write(rabbit_user, Fun(User), write)
+ end)).
+
+list_users() ->
+ [{Username, IsAdmin} ||
+ #internal_user{username = Username, is_admin = IsAdmin} <-
+ mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})].
+
+lookup_user(Username) ->
+ rabbit_misc:dirty_read({rabbit_user, Username}).
+
+validate_regexp(RegexpBin) ->
+ Regexp = binary_to_list(RegexpBin),
+ case re:compile(Regexp) of
+ {ok, _} -> ok;
+ {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}})
+ end.
+
+set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
+ lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ fun () -> ok = mnesia:write(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{
+ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}},
+ write)
+ end)).
+
+
+clear_permissions(Username, VHostPath) ->
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ fun () ->
+ ok = mnesia:delete({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}})
+ end)).
+
+list_permissions() ->
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(match_user_vhost('_', '_'))].
+
+list_vhost_permissions(VHostPath) ->
+ [{Username, ConfigurePerm, WritePerm, ReadPerm} ||
+ {Username, _, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_vhost:with(
+ VHostPath, match_user_vhost('_', VHostPath)))].
+
+list_user_permissions(Username) ->
+ [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_misc:with_user(
+ Username, match_user_vhost(Username, '_')))].
+
+list_user_vhost_permissions(Username, VHostPath) ->
+ [{ConfigurePerm, WritePerm, ReadPerm} ||
+ {_, _, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ match_user_vhost(Username, VHostPath)))].
+
+list_permissions(QueryThunk) ->
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ #user_permission{user_vhost = #user_vhost{username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}} <-
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(QueryThunk)].
+
+match_user_vhost(Username, VHostPath) ->
+ fun () -> mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = '_'},
+ read)
+ end.
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
new file mode 100644
index 0000000000..ce1b16acd6
--- /dev/null
+++ b/src/rabbit_auth_mechanism.erl
@@ -0,0 +1,57 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ %% A description.
+ {description, 0},
+
+ %% Called before authentication starts. Should create a state
+ %% object to be passed through all the stages of authentication.
+ {init, 1},
+
+ %% Handle a stage of authentication. Possible responses:
+ %% {ok, User}
+ %% Authentication succeeded, and here's the user record.
+ %% {challenge, Challenge, NextState}
+ %% Another round is needed. Here's the state I want next time.
+ %% {protocol_error, Msg, Args}
+ %% Client got the protocol wrong. Log and die.
+ %% {refused, Msg, Args}
+ %% Client failed authentication. Log and die.
+ {handle_response, 2}
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
new file mode 100644
index 0000000000..5d51d9047c
--- /dev/null
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -0,0 +1,70 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_amqplain).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism amqplain"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"AMQPLAIN">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually
+%% defines this as PLAIN, but in 0-9 that definition is gone, instead
+%% referring generically to "SASL security mechanism", i.e. the above.
+
+description() ->
+ [{name, <<"AMQPLAIN">>},
+ {description, <<"QPid AMQPLAIN mechanism">>}].
+
+init(_Sock) ->
+ [].
+
+handle_response(Response, _State) ->
+ LoginTable = rabbit_binary_parser:parse_table(Response),
+ case {lists:keysearch(<<"LOGIN">>, 1, LoginTable),
+ lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of
+ {{value, {_, longstr, User}},
+ {value, {_, longstr, Pass}}} ->
+ rabbit_access_control:check_user_pass_login(User, Pass);
+ _ ->
+ {protocol_error,
+ "AMQPLAIN auth info ~w is missing LOGIN or PASSWORD field",
+ [LoginTable]}
+ end.
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
new file mode 100644
index 0000000000..6766592809
--- /dev/null
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -0,0 +1,74 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_cr_demo).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism cr-demo"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"RABBIT-CR-DEMO">>,
+ ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+-record(state, {username = undefined}).
+
+%% Provides equivalent security to PLAIN but demos use of Connection.Secure(Ok)
+%% START-OK: Username
+%% SECURE: "Please tell me your password"
+%% SECURE-OK: "My password is ~s", [Password]
+
+description() ->
+ [{name, <<"RABBIT-CR-DEMO">>},
+ {description, <<"RabbitMQ Demo challenge-response authentication "
+ "mechanism">>}].
+
+init(_Sock) ->
+ #state{}.
+
+handle_response(Response, State = #state{username = undefined}) ->
+ {challenge, <<"Please tell me your password">>,
+ State#state{username = Response}};
+
+handle_response(Response, #state{username = Username}) ->
+ case Response of
+ <<"My password is ", Password/binary>> ->
+ rabbit_access_control:check_user_pass_login(Username, Password);
+ _ ->
+ {protocol_error, "Invalid response '~s'", [Response]}
+ end.
diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl
new file mode 100644
index 0000000000..1c4e5c150f
--- /dev/null
+++ b/src/rabbit_auth_mechanism_external.erl
@@ -0,0 +1,107 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_external).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-include_lib("public_key/include/public_key.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism external"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"EXTERNAL">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+-record(state, {username = undefined}).
+
+%% SASL EXTERNAL. SASL says EXTERNAL means "use credentials
+%% established by means external to the mechanism". We define that to
+%% mean the peer certificate's subject's CN.
+
+description() ->
+ [{name, <<"EXTERNAL">>},
+ {description, <<"SASL EXTERNAL authentication mechanism">>}].
+
+init(Sock) ->
+ Username = case rabbit_net:peercert(Sock) of
+ {ok, C} ->
+ CN = case rabbit_ssl:peer_cert_subject_item(
+ C, ?'id-at-commonName') of
+ not_found -> {refused, "no CN found", []};
+ CN0 -> list_to_binary(CN0)
+ end,
+ case config_sane() of
+ true -> CN;
+ false -> {refused, "configuration unsafe", []}
+ end;
+ {error, no_peercert} ->
+ {refused, "no peer certificate", []};
+ nossl ->
+ {refused, "not SSL connection", []}
+ end,
+ #state{username = Username}.
+
+handle_response(_Response, #state{username = Username}) ->
+ case Username of
+ {refused, _, _} = E ->
+ E;
+ _ ->
+ case rabbit_access_control:check_user_login(Username, []) of
+ {ok, User} ->
+ {ok, User};
+ {error, not_found} ->
+ %% This is not an information leak as we have to
+ %% have validated a client cert to get this far.
+ {refused, "user '~s' not found", [Username]}
+ end
+ end.
+
+%%--------------------------------------------------------------------------
+
+config_sane() ->
+ {ok, Opts} = application:get_env(ssl_options),
+ case {proplists:get_value(fail_if_no_peer_cert, Opts),
+ proplists:get_value(verify, Opts)} of
+ {true, verify_peer} ->
+ true;
+ {F, V} ->
+ rabbit_log:warning("EXTERNAL mechanism disabled, "
+ "fail_if_no_peer_cert=~p; "
+ "verify=~p~n", [F, V]),
+ false
+ end.
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
new file mode 100644
index 0000000000..e5f8f3e6a1
--- /dev/null
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -0,0 +1,66 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_plain).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism plain"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"PLAIN">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
+%% apparently, by OpenAMQ.
+
+description() ->
+ [{name, <<"PLAIN">>},
+ {description, <<"SASL PLAIN authentication mechanism">>}].
+
+init(_Sock) ->
+ [].
+
+handle_response(Response, _State) ->
+ %% The '%%"' at the end of the next line is for Emacs
+ case re:run(Response, "^\\0([^\\0]*)\\0([^\\0]*)$",%%"
+ [{capture, all_but_first, binary}]) of
+ {match, [User, Pass]} ->
+ rabbit_access_control:check_user_pass_login(User, Pass);
+ _ ->
+ {protocol_error, "response ~p invalid", [Response]}
+ end.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 352e76fd0c..8603d8d702 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -77,7 +77,7 @@ behaviour_info(callbacks) ->
{fetch, 2},
%% Acktags supplied are for messages which can now be forgotten
- %% about.
+ %% about. Must return 1 guid per Ack, in the same order as Acks.
{ack, 2},
%% A publish, but in the context of a transaction.
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index b2997ae259..a5297a707c 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -49,8 +49,6 @@
-export([ensure_content_encoded/2, clear_encoded_content/1]).
-export([map_exception/3]).
--import(lists).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index ebf063f031..4b4358b4c7 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -36,8 +36,6 @@
-export([parse_table/1, parse_properties/2]).
-export([ensure_content_decoded/1, clear_decoded_content/1]).
--import(lists).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 668fb9bb25..ccadf5af45 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -305,7 +305,7 @@ table_for_resource(#resource{kind = queue}) -> rabbit_queue.
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ {ok, Module} = rabbit_registry:lookup_module(exchange, T),
Module.
contains(Table, MatchHead) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0c8ad00ae3..2067e306cf 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,10 +35,10 @@
-behaviour(gen_server2).
--export([start_link/7, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2]).
+-export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
+-export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]).
+-export([emit_stats/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -47,7 +47,7 @@
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
- username, virtual_host, most_recently_declared_queue,
+ user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
confirm_enabled, publish_seqno, confirm_multiple, confirm_tref,
held_confirms, unconfirmed, queues_for_msg}).
@@ -72,7 +72,7 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
--define(FLUSH_MULTIPLE_ACKS_INTERVAL, 1000).
+-define(FLUSH_CONFIRMS_INTERVAL, 1000).
%%----------------------------------------------------------------------------
@@ -83,19 +83,22 @@
-type(channel_number() :: non_neg_integer()).
-spec(start_link/7 ::
- (channel_number(), pid(), pid(), rabbit_access_control:username(),
+ (channel_number(), pid(), pid(), rabbit_types:user(),
rabbit_types:vhost(), pid(),
fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
+-spec(flush/1 :: (pid()) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
+-spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
+-spec(flush_confirms/1 :: (pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
@@ -103,16 +106,14 @@
-spec(info_all/0 :: () -> [rabbit_types:infos()]).
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(emit_stats/1 :: (pid()) -> 'ok').
--spec(flush_multiple_acks/1 :: (pid()) -> 'ok').
--spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
+start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
StartLimiterFun) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, Username,
+ gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User,
VHost, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
@@ -121,6 +122,9 @@ do(Pid, Method) ->
do(Pid, Method, Content) ->
gen_server2:cast(Pid, {method, Method, Content}).
+flush(Pid) ->
+ gen_server2:call(Pid, flush).
+
shutdown(Pid) ->
gen_server2:cast(Pid, terminate).
@@ -133,6 +137,12 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
+confirm(Pid, MsgSeqNo) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
+
+flush_confirms(Pid) ->
+ gen_server2:cast(Pid, flush_confirms).
+
list() ->
pg_local:get_members(rabbit_channels).
@@ -156,18 +166,9 @@ info_all(Items) ->
emit_stats(Pid) ->
gen_server2:cast(Pid, emit_stats).
-flush(Pid) ->
- gen_server2:call(Pid, flush).
-
-flush_multiple_acks(Pid) ->
- gen_server2:cast(Pid, flush_multiple_acks).
-
-confirm(Pid, MsgSeqNo) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
-
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
+init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
@@ -183,7 +184,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
next_tag = 1,
uncommitted_ack_q = queue:new(),
unacked_message_q = queue:new(),
- username = Username,
+ user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
@@ -215,6 +216,9 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
+handle_call(flush, _From, State) ->
+ reply(ok, State);
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -224,9 +228,6 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(flush, _From, State) ->
- reply(ok, State);
-
handle_call(_Request, _From, State) ->
noreply(State).
@@ -260,14 +261,24 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
noreply(State);
-handle_cast({deliver, ConsumerTag, AckRequired, Msg},
+handle_cast({deliver, ConsumerTag, AckRequired,
+ Msg = {_QName, QPid, _MsgId, Redelivered,
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = Content}}},
State = #ch{writer_pid = WriterPid,
next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired,
ack_record(DeliveryTag, ConsumerTag, Msg),
State),
- ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
- {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg,
+
+ M = #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
+
maybe_incr_stats([{QPid, 1}],
case AckRequired of
true -> deliver;
@@ -281,11 +292,11 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
hibernate};
-handle_cast(flush_multiple_acks, State) ->
- {noreply, flush_multiple(State)};
+handle_cast(flush_confirms, State) ->
+ {noreply, internal_flush_confirms(State)};
handle_cast({confirm, MsgSeqNo, From}, State) ->
- {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}.
+ {noreply, confirm(MsgSeqNo, From, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{queues_for_msg = QFM}) ->
@@ -293,7 +304,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
Qs = sets:del_element(QPid, QPids),
case sets:size(Qs) of
- 0 -> send_or_enqueue_ack(Msg, QPid, State0);
+ 0 -> confirm(Msg, QPid, State0);
_ -> State0#ch{queues_for_msg =
dict:store(Msg, Qs, QFM0)}
end
@@ -303,7 +314,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- State1 = flush_multiple(State),
+ State1 = internal_flush_confirms(State),
rabbit_event:if_enabled(StatsTimer,
fun () ->
internal_emit_stats(
@@ -360,7 +371,7 @@ return_queue_declare_ok(#resource{name = ActualName},
message_count = MessageCount,
consumer_count = ConsumerCount}).
-check_resource_access(Username, Resource, Perm) ->
+check_resource_access(User, Resource, Perm) ->
V = {Resource, Perm},
Cache = case get(permission_cache) of
undefined -> [];
@@ -370,7 +381,7 @@ check_resource_access(Username, Resource, Perm) ->
case lists:member(V, Cache) of
true -> lists:delete(V, Cache);
false -> ok = rabbit_access_control:check_resource_access(
- Username, Resource, Perm),
+ User, Resource, Perm),
lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1)
end,
put(permission_cache, [V | CacheTail]),
@@ -380,14 +391,32 @@ clear_permission_cache() ->
erase(permission_cache),
ok.
-check_configure_permitted(Resource, #ch{username = Username}) ->
- check_resource_access(Username, Resource, configure).
+check_configure_permitted(Resource, #ch{user = User}) ->
+ check_resource_access(User, Resource, configure).
-check_write_permitted(Resource, #ch{username = Username}) ->
- check_resource_access(Username, Resource, write).
+check_write_permitted(Resource, #ch{user = User}) ->
+ check_resource_access(User, Resource, write).
-check_read_permitted(Resource, #ch{username = Username}) ->
- check_resource_access(Username, Resource, read).
+check_read_permitted(Resource, #ch{user = User}) ->
+ check_resource_access(User, Resource, read).
+
+check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
+ ok;
+check_user_id_header(#'P_basic'{user_id = Username},
+ #ch{user = #user{username = Username}}) ->
+ ok;
+check_user_id_header(#'P_basic'{user_id = Claimed},
+ #ch{user = #user{username = Actual}}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "user_id property set to '~s' but "
+ "authenticated user was '~s'", [Claimed, Actual]).
+
+check_internal_exchange(#exchange{name = Name, internal = true}) ->
+ rabbit_misc:protocol_error(access_refused,
+ "cannot publish to internal ~s",
+ [rabbit_misc:rs(Name)]);
+check_internal_exchange(_) ->
+ ok.
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
rabbit_misc:protocol_error(
@@ -455,11 +484,11 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-send_or_enqueue_ack(undefined, _QPid, State) ->
+confirm(undefined, _QPid, State) ->
State;
-send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
+confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
State;
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
+confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
do_if_unconfirmed(MsgSeqNo, QPid,
fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(
@@ -467,7 +496,7 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
delivery_tag = MSN}),
State1
end, State);
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
+confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
do_if_unconfirmed(MsgSeqNo, QPid,
fun(MSN, State1 = #ch{held_confirms = As}) ->
start_confirm_timer(
@@ -529,9 +558,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
+ check_internal_exchange(Exchange),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
+ check_user_id_header(DecodedContent#content.properties, State),
IsPersistent = is_message_persistent(DecodedContent),
{MsgSeqNo, State1}
= case ConfirmEnabled of
@@ -761,7 +792,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
passive = false,
durable = Durable,
auto_delete = AutoDelete,
- internal = false,
+ internal = Internal,
nowait = NoWait,
arguments = Args},
_, State = #ch{virtual_host = VHostPath}) ->
@@ -784,10 +815,11 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
CheckedType,
Durable,
AutoDelete,
+ Internal,
Args)
end,
ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
- AutoDelete, Args),
+ AutoDelete, Internal, Args),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
@@ -1221,12 +1253,12 @@ is_message_persistent(Content) ->
process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
+ confirm(MsgSeqNo, undefined, State);
process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
+ confirm(MsgSeqNo, undefined, State);
process_routing_result(routed, [], MsgSeqNo, _, State) ->
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
+ confirm(MsgSeqNo, undefined, State);
process_routing_result(routed, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, MsgSeqNo, _,
@@ -1240,21 +1272,44 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
- {_QName, QPid, _MsgId, Redelivered,
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = Content}}) ->
- M = #'basic.deliver'{consumer_tag = ConsumerTag,
- delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey},
- ok = case Notify of
- true -> rabbit_writer:send_command_and_notify(
- WriterPid, QPid, self(), M, Content);
- false -> rabbit_writer:send_command(WriterPid, M, Content)
- end.
+start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
+ {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL,
+ ?MODULE, flush_confirms, [self()]),
+ State#ch{confirm_tref = TRef};
+start_confirm_timer(State) ->
+ State.
+
+stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->
+ State;
+stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State#ch{confirm_tref = undefined}.
+
+internal_flush_confirms(State = #ch{writer_pid = WriterPid,
+ held_confirms = Cs}) ->
+ case gb_sets:is_empty(Cs) of
+ true -> State#ch{confirm_tref = undefined};
+ false -> [First | Rest] = gb_sets:to_list(Cs),
+ {Mult, Inds} = find_consecutive_sequence(First, Rest),
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = Mult, multiple = true}),
+ ok = lists:foldl(
+ fun(T, ok) -> rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = T})
+ end, ok, Inds),
+ State#ch{held_confirms = gb_sets:new(),
+ confirm_tref = undefined}
+ end.
+
+%% Find longest sequence of consecutive numbers at the beginning.
+find_consecutive_sequence(Last, []) ->
+ {Last, []};
+find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) ->
+ find_consecutive_sequence(N, Ns);
+find_consecutive_sequence(Last, Ns) ->
+ {Last, Ns}.
terminate(State) ->
stop_confirm_timer(State),
@@ -1266,7 +1321,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _) -> self();
i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid;
i(number, #ch{channel = Channel}) -> Channel;
-i(user, #ch{username = Username}) -> Username;
+i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
@@ -1343,42 +1398,3 @@ erase_queue_stats(QPid) ->
erase({queue_stats, QPid}),
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
-
-start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL,
- ?MODULE, flush_multiple_acks, [self()]),
- State#ch{confirm_tref = TRef};
-start_confirm_timer(State) ->
- State.
-
-stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- State;
-stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#ch{confirm_tref = undefined}.
-
-flush_multiple(State = #ch{writer_pid = WriterPid,
- held_confirms = Cs}) ->
- case gb_sets:is_empty(Cs) of
- true -> State#ch{confirm_tref = undefined};
- false -> [First | Rest] = gb_sets:to_list(Cs),
- {Mult, Inds} = find_consecutive_sequence(First, Rest),
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = Mult, multiple = true}),
- ok = lists:foldl(
- fun(T, ok) -> rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = T})
- end, ok, Inds),
- State#ch{held_confirms = gb_sets:new(),
- confirm_tref = undefined}
- end.
-
-%% Find longest sequence of consecutive numbers at the beginning.
-find_consecutive_sequence(Last, []) ->
- {Last, []};
-find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) ->
- find_consecutive_sequence(N, Ns);
-find_consecutive_sequence(Last, Ns) ->
- {Last, Ns}.
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 02199a6516..a36253a053 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -48,7 +48,7 @@
-type(start_link_args() ::
{rabbit_types:protocol(), rabbit_net:socket(),
rabbit_channel:channel_number(), non_neg_integer(), pid(),
- rabbit_access_control:username(), rabbit_types:vhost(), pid()}).
+ rabbit_types:user(), rabbit_types:vhost(), pid()}).
-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}).
@@ -56,7 +56,7 @@
%%----------------------------------------------------------------------------
-start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
+start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
@@ -69,7 +69,7 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ReaderPid, WriterPid, Username, VHost,
+ [Channel, ReaderPid, WriterPid, User, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, FramingChannelPid} =
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 72b77b1f07..8a3275bcd5 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -32,7 +32,7 @@
-module(rabbit_control).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5]).
+-export([start/0, stop/0, action/5, diagnostics/1]).
-define(RPC_TIMEOUT, infinity).
@@ -50,6 +50,7 @@
(atom(), node(), [string()], [{string(), any()}],
fun ((string(), [any()]) -> 'ok'))
-> 'ok').
+-spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]).
-spec(usage/0 :: () -> no_return()).
-endif.
@@ -116,24 +117,28 @@ fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
print_badrpc_diagnostics(Node) ->
- fmt_stderr("diagnostics:", []),
+ [fmt_stderr(Fmt, Args) || {Fmt, Args} <- diagnostics(Node)].
+
+diagnostics(Node) ->
{_NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- case net_adm:names(NodeHost) of
- {error, EpmdReason} ->
- fmt_stderr("- unable to connect to epmd on ~s: ~w",
- [NodeHost, EpmdReason]);
- {ok, NamePorts} ->
- fmt_stderr("- nodes and their ports on ~s: ~p",
- [NodeHost, [{list_to_atom(Name), Port} ||
- {Name, Port} <- NamePorts]])
- end,
- fmt_stderr("- current node: ~w", [node()]),
- case init:get_argument(home) of
- {ok, [[Home]]} -> fmt_stderr("- current node home dir: ~s", [Home]);
- Other -> fmt_stderr("- no current node home dir: ~p", [Other])
- end,
- fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]),
- ok.
+ [
+ {"diagnostics:", []},
+ case net_adm:names(NodeHost) of
+ {error, EpmdReason} ->
+ {"- unable to connect to epmd on ~s: ~w",
+ [NodeHost, EpmdReason]};
+ {ok, NamePorts} ->
+ {"- nodes and their ports on ~s: ~p",
+ [NodeHost, [{list_to_atom(Name), Port} ||
+ {Name, Port} <- NamePorts]]}
+ end,
+ {"- current node: ~w", [node()]},
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]};
+ Other -> {"- no current node home dir: ~p", [Other]}
+ end,
+ {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}
+ ].
stop() ->
ok.
@@ -196,44 +201,48 @@ action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
action(add_user, Node, Args = [Username, _Password], _Opts, Inform) ->
Inform("Creating user ~p", [Username]),
- call(Node, {rabbit_access_control, add_user, Args});
+ call(Node, {rabbit_auth_backend_internal, add_user, Args});
action(delete_user, Node, Args = [_Username], _Opts, Inform) ->
Inform("Deleting user ~p", Args),
- call(Node, {rabbit_access_control, delete_user, Args});
+ call(Node, {rabbit_auth_backend_internal, delete_user, Args});
action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
Inform("Changing password for user ~p", [Username]),
- call(Node, {rabbit_access_control, change_password, Args});
+ call(Node, {rabbit_auth_backend_internal, change_password, Args});
+
+action(clear_password, Node, Args = [Username], _Opts, Inform) ->
+ Inform("Clearing password for user ~p", [Username]),
+ call(Node, {rabbit_auth_backend_internal, clear_password, Args});
action(set_admin, Node, [Username], _Opts, Inform) ->
Inform("Setting administrative status for user ~p", [Username]),
- call(Node, {rabbit_access_control, set_admin, [Username]});
+ call(Node, {rabbit_auth_backend_internal, set_admin, [Username]});
action(clear_admin, Node, [Username], _Opts, Inform) ->
Inform("Clearing administrative status for user ~p", [Username]),
- call(Node, {rabbit_access_control, clear_admin, [Username]});
+ call(Node, {rabbit_auth_backend_internal, clear_admin, [Username]});
action(list_users, Node, [], _Opts, Inform) ->
Inform("Listing users", []),
- display_list(call(Node, {rabbit_access_control, list_users, []}));
+ display_list(call(Node, {rabbit_auth_backend_internal, list_users, []}));
action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Creating vhost ~p", Args),
- call(Node, {rabbit_access_control, add_vhost, Args});
+ call(Node, {rabbit_vhost, add, Args});
action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Deleting vhost ~p", Args),
- call(Node, {rabbit_access_control, delete_vhost, Args});
+ call(Node, {rabbit_vhost, delete, Args});
action(list_vhosts, Node, [], _Opts, Inform) ->
Inform("Listing vhosts", []),
- display_list(call(Node, {rabbit_access_control, list_vhosts, []}));
+ display_list(call(Node, {rabbit_vhost, list, []}));
action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) ->
Inform("Listing permissions for user ~p", Args),
- display_list(call(Node, {rabbit_access_control, list_user_permissions,
- Args}));
+ display_list(call(Node, {rabbit_auth_backend_internal,
+ list_user_permissions, Args}));
action(list_queues, Node, Args, Opts, Inform) ->
Inform("Listing queues", []),
@@ -287,19 +296,20 @@ action(list_consumers, Node, _Args, Opts, Inform) ->
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
- call(Node, {rabbit_access_control, set_permissions,
+ call(Node, {rabbit_auth_backend_internal, set_permissions,
[Username, VHost, CPerm, WPerm, RPerm]});
action(clear_permissions, Node, [Username], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]),
- call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]});
+ call(Node, {rabbit_auth_backend_internal, clear_permissions,
+ [Username, VHost]});
action(list_permissions, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost ~p", [VHost]),
- display_list(call(Node, {rabbit_access_control, list_vhost_permissions,
- [VHost]})).
+ display_list(call(Node, {rabbit_auth_backend_internal,
+ list_vhost_permissions, [VHost]})).
default_if_empty(List, Default) when is_list(List) ->
if List == [] ->
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 42861f8603..dd009c8397 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -49,7 +49,7 @@ boot() ->
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, []),
+ topic, true, false, false, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 00e479a261..a95cf0b199 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -33,11 +33,11 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0,
+-export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0,
info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
%% this must be run inside a mnesia tx
-export([maybe_auto_delete/1]).
--export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]).
+-export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]).
%%----------------------------------------------------------------------------
@@ -49,13 +49,14 @@
-type(type() :: atom()).
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 ::
- (name(), type(), boolean(), boolean(), rabbit_framing:amqp_table())
+-spec(declare/6 ::
+ (name(), type(), boolean(), boolean(), boolean(),
+ rabbit_framing:amqp_table())
-> rabbit_types:exchange()).
-spec(check_type/1 ::
(binary()) -> atom() | rabbit_types:connection_exit()).
--spec(assert_equivalence/5 ::
- (rabbit_types:exchange(), atom(), boolean(), boolean(),
+-spec(assert_equivalence/6 ::
+ (rabbit_types:exchange(), atom(), boolean(), boolean(), boolean(),
rabbit_framing:amqp_table())
-> 'ok' | rabbit_types:connection_exit()).
-spec(assert_args_equivalence/2 ::
@@ -90,7 +91,7 @@
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, arguments]).
+-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
recover() ->
Xs = rabbit_misc:table_fold(
@@ -113,11 +114,12 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
recover_with_bindings([], [], []) ->
ok.
-declare(XName, Type, Durable, AutoDelete, Args) ->
+declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
X = #exchange{name = XName,
type = Type,
durable = Durable,
auto_delete = AutoDelete,
+ internal = Internal,
arguments = Args},
%% We want to upset things if it isn't ok; this is different from
%% the other hooks invocations, where we tend to ignore the return
@@ -150,17 +152,17 @@ declare(XName, Type, Durable, AutoDelete, Args) ->
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ {ok, Module} = rabbit_registry:lookup_module(exchange, T),
Module.
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
- case rabbit_exchange_type_registry:binary_to_type(TypeBin) of
+ case rabbit_registry:binary_to_type(TypeBin) of
{error, not_found} ->
rabbit_misc:protocol_error(
command_invalid, "unknown exchange type '~s'", [TypeBin]);
T ->
- case rabbit_exchange_type_registry:lookup_module(T) of
+ case rabbit_registry:lookup_module(exchange, T) of
{error, not_found} -> rabbit_misc:protocol_error(
command_invalid,
"invalid exchange type '~s'", [T]);
@@ -170,14 +172,16 @@ check_type(TypeBin) ->
assert_equivalence(X = #exchange{ durable = Durable,
auto_delete = AutoDelete,
+ internal = Internal,
type = Type},
- Type, Durable, AutoDelete, RequiredArgs) ->
+ Type, Durable, AutoDelete, Internal, RequiredArgs) ->
(type_to_module(Type)):assert_args_equivalence(X, RequiredArgs);
-assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
- _Args) ->
+assert_equivalence(#exchange{ name = Name },
+ _Type, _Durable, _Internal, _AutoDelete, _Args) ->
rabbit_misc:protocol_error(
precondition_failed,
- "cannot redeclare ~s with different type, durable or autodelete value",
+ "cannot redeclare ~s with different type, durable, "
+ "internal or autodelete value",
[rabbit_misc:rs(Name)]).
assert_args_equivalence(#exchange{ name = Name, arguments = Args },
@@ -215,6 +219,7 @@ i(name, #exchange{name = Name}) -> Name;
i(type, #exchange{type = Type}) -> Type;
i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
+i(internal, #exchange{internal = Internal}) -> Internal;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index d934a49709..d49d019989 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type direct"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"direct">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"direct">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
description() ->
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 77ca9686c2..e7f754644c 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type fanout"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"fanout">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"fanout">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
description() ->
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index ec9e7ba468..caf141fe57 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -42,9 +42,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type headers"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"headers">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"headers">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
-ifdef(use_specs).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index d3ecdd4dd2..4485185814 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type topic"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"topic">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"topic">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
-export([topic_matches/2]).
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 863f77e7eb..a1a8364c63 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -41,9 +41,6 @@
-export([debug/1, debug/2, message/4, info/1, info/2,
warning/1, warning/2, error/1, error/2]).
--import(io).
--import(error_logger).
-
-define(SERVER, ?MODULE).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 1a05b7298a..15ba787ad2 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,7 +46,7 @@
-export([enable_cover/1, report_cover/1]).
-export([start_cover/1]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
--export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
+-export([with_user/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
@@ -68,16 +68,11 @@
-export([now_ms/0]).
-export([lock_file/1]).
--import(mnesia).
--import(lists).
--import(cover).
--import(disk_log).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([resource_name/0]).
+-export_type([resource_name/0, thunk/1]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
@@ -142,10 +137,9 @@
(atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(with_user/2 :: (rabbit_access_control:username(), thunk(A)) -> A).
--spec(with_vhost/2 :: (rabbit_types:vhost(), thunk(A)) -> A).
+-spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 ::
- (rabbit_access_control:username(), rabbit_types:vhost(), thunk(A))
+ (rabbit_types:username(), rabbit_types:vhost(), thunk(A))
-> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
@@ -349,8 +343,8 @@ throw_on_error(E, Thunk) ->
with_exit_handler(Handler, Thunk) ->
try
Thunk()
- catch
- exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown ->
+ catch exit:{R, _} when R =:= noproc; R =:= nodedown;
+ R =:= normal; R =:= shutdown ->
Handler()
end.
@@ -371,19 +365,8 @@ with_user(Username, Thunk) ->
end
end.
-with_vhost(VHostPath, Thunk) ->
- fun () ->
- case mnesia:read({rabbit_vhost, VHostPath}) of
- [] ->
- mnesia:abort({no_such_vhost, VHostPath});
- [_V] ->
- Thunk()
- end
- end.
-
with_user_and_vhost(Username, VHostPath, Thunk) ->
- with_user(Username, with_vhost(VHostPath, Thunk)).
-
+ with_user(Username, rabbit_vhost:with(VHostPath, Thunk)).
execute_mnesia_transaction(TxFun) ->
%% Making this a sync_transaction allows us to use dirty_read
@@ -594,19 +577,19 @@ sort_field_table(Arguments) ->
pid_to_string(Pid) when is_pid(Pid) ->
%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and
%% 8.7)
- <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>>
+ <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
= term_to_binary(Pid),
Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
- lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
+ lists:flatten(io_lib:format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser])).
%% inverse of above
string_to_pid(Str) ->
Err = {error, {invalid_pid_syntax, Str}},
%% The \ before the trailing $ is only there to keep emacs
%% font-lock from getting confused.
- case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$",
+ case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$",
[{capture,all_but_first,list}]) of
- {match, [NodeStr, IdStr, SerStr]} ->
+ {match, [NodeStr, CreStr, IdStr, SerStr]} ->
%% the NodeStr atom might be quoted, so we have to parse
%% it rather than doing a simple list_to_atom
NodeAtom = case erl_scan:string(NodeStr) of
@@ -614,9 +597,9 @@ string_to_pid(Str) ->
{error, _, _} -> throw(Err)
end,
<<131,NodeEnc/binary>> = term_to_binary(NodeAtom),
- Id = list_to_integer(IdStr),
- Ser = list_to_integer(SerStr),
- binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
+ [Cre, Id, Ser] = lists:map(fun list_to_integer/1,
+ [CreStr, IdStr, SerStr]),
+ binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>);
nomatch ->
throw(Err)
end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a62e7a6f47..38cc82a656 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -34,7 +34,8 @@
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
cluster/1, force_cluster/1, reset/0, force_reset/0,
- is_clustered/0, empty_ram_only_tables/0, copy_db/1]).
+ is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
+ empty_ram_only_tables/0, copy_db/1]).
-export([table_names/0]).
@@ -63,6 +64,8 @@
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
-spec(is_clustered/0 :: () -> boolean()).
+-spec(running_clustered_nodes/0 :: () -> [node()]).
+-spec(all_clustered_nodes/0 :: () -> [node()]).
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
@@ -81,12 +84,12 @@ status() ->
Nodes = nodes_of_type(CopyType),
Nodes =/= []
end];
- no -> case mnesia:system_info(db_nodes) of
+ no -> case all_clustered_nodes() of
[] -> [];
Nodes -> [{unknown, Nodes}]
end
end},
- {running_nodes, mnesia:system_info(running_db_nodes)}].
+ {running_nodes, running_clustered_nodes()}].
init() ->
ok = ensure_mnesia_running(),
@@ -127,9 +130,15 @@ reset() -> reset(false).
force_reset() -> reset(true).
is_clustered() ->
- RunningNodes = mnesia:system_info(running_db_nodes),
+ RunningNodes = running_clustered_nodes(),
[node()] /= RunningNodes andalso [] /= RunningNodes.
+all_clustered_nodes() ->
+ mnesia:system_info(db_nodes).
+
+running_clustered_nodes() ->
+ mnesia:system_info(running_db_nodes).
+
empty_ram_only_tables() ->
Node = node(),
lists:foreach(
@@ -154,10 +163,10 @@ nodes_of_type(Type) ->
table_definitions() ->
[{rabbit_user,
- [{record_name, user},
- {attributes, record_info(fields, user)},
+ [{record_name, internal_user},
+ {attributes, record_info(fields, internal_user)},
{disc_copies, [node()]},
- {match, #user{_='_'}}]},
+ {match, #internal_user{_='_'}}]},
{rabbit_user_permission,
[{record_name, user_permission},
{attributes, record_info(fields, user_permission)},
@@ -372,8 +381,7 @@ init_db(ClusterNodes, Force) ->
end;
true -> ok
end,
- case {Nodes, mnesia:system_info(use_dir),
- mnesia:system_info(db_nodes)} of
+ case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of
{[], true, [_]} ->
%% True single disc node, attempt upgrade
ok = wait_for_tables(),
@@ -388,7 +396,7 @@ init_db(ClusterNodes, Force) ->
ensure_version_ok(rabbit_upgrade:read_version()),
ensure_schema_ok();
{[], false, _} ->
- %% First RAM node in cluster, start from scratch
+ %% Nothing there at all, start from scratch
ok = create_schema();
{[AnotherNode|_], _, _} ->
%% Subsequent node in cluster, catch up
@@ -566,8 +574,8 @@ reset(Force) ->
{Nodes, RunningNodes} =
try
ok = init(),
- {mnesia:system_info(db_nodes) -- [Node],
- mnesia:system_info(running_db_nodes) -- [Node]}
+ {all_clustered_nodes() -- [Node],
+ running_clustered_nodes() -- [Node]}
after
mnesia:stop()
end,
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index f532a9118b..1d679215fb 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -715,7 +715,7 @@ handle_cast({write, CRef, Guid},
{ok, _} -> dict:update(CRef, fun(Guids) ->
gb_sets:add(Guid, Guids)
end,
- gb_sets:empty(), CTG);
+ gb_sets:singleton(Guid), CTG);
error -> CTG
end,
State1 = State #msstate { cref_to_guids = CTG1 },
@@ -762,8 +762,8 @@ handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
fun (Guid, State2) -> remove_message(Guid, CRef, State2) end,
State, Guids),
- State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1),
- noreply(maybe_compact(State2));
+ State3 = client_confirm(CRef, gb_sets:from_list(Guids), removed, State1),
+ noreply(maybe_compact(State3));
handle_cast({release, Guids}, State =
#msstate { dedup_cache_ets = DedupCacheEts }) ->
@@ -900,7 +900,8 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
true -> file_handle_cache:sync(CurHdl)
end,
lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs],
+ [client_confirm(CRef, Guids, written, State1)
+ || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
@@ -1094,11 +1095,11 @@ orddict_store(Key, Val, Dict) ->
false = orddict:is_key(Key, Dict),
orddict:store(Key, Val, Dict).
-client_confirm(CRef, Guids,
+client_confirm(CRef, Guids, ActionTaken,
State = #msstate { client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
case dict:find(CRef, CODC) of
- {ok, Fun} -> Fun(Guids),
+ {ok, Fun} -> Fun(Guids, ActionTaken),
CTG1 = case dict:find(CRef, CTG) of
{ok, Gs} ->
Guids1 = gb_sets:difference(Gs, Guids),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 1c542ffe7b..d5a9d73c84 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -246,8 +246,9 @@ start_ssl_client(SslOpts, Sock) ->
connections() ->
[rabbit_connection_sup:reader(ConnSup) ||
+ Node <- rabbit_mnesia:running_clustered_nodes(),
{_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup)].
+ <- supervisor:which_children({rabbit_tcp_client_sup, Node})].
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_prelaunch.erl
index 072f297e69..8ae45abda3 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_prelaunch.erl
@@ -29,11 +29,12 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_plugin_activator).
+-module(rabbit_prelaunch).
-export([start/0, stop/0]).
-define(BaseApps, [rabbit]).
+-define(ERROR_CODE, 1).
%%----------------------------------------------------------------------------
%% Specs
@@ -52,7 +53,7 @@ start() ->
io:format("Activating RabbitMQ plugins ...~n"),
%% Determine our various directories
- [PluginDir, UnpackedPluginDir] = init:get_plain_arguments(),
+ [PluginDir, UnpackedPluginDir, NodeStr] = init:get_plain_arguments(),
RootName = UnpackedPluginDir ++ "/rabbit",
%% Unpack any .ez plugins
@@ -130,7 +131,10 @@ start() ->
[io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)])
|| App <- PluginApps],
io:nl(),
- halt(),
+
+ ok = duplicate_node_check(NodeStr),
+
+ terminate(0),
ok.
stop() ->
@@ -251,6 +255,37 @@ process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) ->
process_entry(Entry) ->
[Entry].
+%% Check whether a node with the same name is already running
+duplicate_node_check([]) ->
+ %% Ignore running node while installing windows service
+ ok;
+duplicate_node_check(NodeStr) ->
+ Node = rabbit_misc:makenode(NodeStr),
+ {NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
+ case net_adm:names(NodeHost) of
+ {ok, NamePorts} ->
+ case proplists:is_defined(NodeName, NamePorts) of
+ true -> io:format("node with name ~p "
+ "already running on ~p~n",
+ [NodeName, NodeHost]),
+ [io:format(Fmt ++ "~n", Args) ||
+ {Fmt, Args} <- rabbit_control:diagnostics(Node)],
+ terminate(?ERROR_CODE);
+ false -> ok
+ end;
+ {error, EpmdReason} -> terminate("unexpected epmd error: ~p~n",
+ [EpmdReason])
+ end.
+
terminate(Fmt, Args) ->
io:format("ERROR: " ++ Fmt ++ "~n", Args),
- halt(1).
+ terminate(?ERROR_CODE).
+
+terminate(Status) ->
+ case os:type() of
+ {unix, _} -> halt(Status);
+ {win32, _} -> init:stop(Status),
+ receive
+ after infinity -> ok
+ end
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 4dd150a26f..e87ff87976 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -56,14 +56,15 @@
-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun}).
+ channel_sup_sup_pid, start_heartbeat_fun, auth_mechanism,
+ auth_state}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
- peer_cert_validity,
+ peer_cert_validity, auth_mechanism,
protocol, user, vhost, timeout, frame_max,
client_properties]).
@@ -294,7 +295,9 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
stats_timer =
rabbit_event:init_stats_timer(),
channel_sup_sup_pid = ChannelSupSupPid,
- start_heartbeat_fun = StartHeartbeatFun
+ start_heartbeat_fun = StartHeartbeatFun,
+ auth_mechanism = none,
+ auth_state = none
},
handshake, 8))
catch
@@ -681,11 +684,12 @@ handle_input(Callback, Data, _State) ->
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Protocol,
State = #v1{sock = Sock, connection = Connection}) ->
- Start = #'connection.start'{ version_major = ProtocolMajor,
- version_minor = ProtocolMinor,
- server_properties = server_properties(),
- mechanisms = <<"PLAIN AMQPLAIN">>,
- locales = <<"en_US">> },
+ Start = #'connection.start'{
+ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(),
+ mechanisms = auth_mechanisms_binary(),
+ locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
switch_callback(State#v1{connection = Connection#connection{
timeout_sec = ?NORMAL_TIMEOUT,
@@ -710,42 +714,45 @@ ensure_stats_timer(State) ->
handle_method0(MethodName, FieldsBin,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- try
- handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
- State)
- catch exit:Reason ->
- CompleteReason = case Reason of
- #amqp_error{method = none} ->
- Reason#amqp_error{method = MethodName};
- OtherReason -> OtherReason
- end,
+ HandleException =
+ fun(R) ->
case ?IS_RUNNING(State) of
- true -> send_exception(State, 0, CompleteReason);
+ true -> send_exception(State, 0, R);
%% We don't trust the client at this point - force
%% them to wait for a bit so they can't DOS us with
%% repeated failed logins etc.
false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, State#v1.connection_state,
- CompleteReason})
+ throw({channel0_error, State#v1.connection_state, R})
end
+ end,
+ try
+ handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
+ State)
+ catch exit:#amqp_error{method = none} = Reason ->
+ HandleException(Reason#amqp_error{method = MethodName});
+ Type:Reason ->
+ HandleException({Type, Reason, MethodName, erlang:get_stacktrace()})
end.
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response,
client_properties = ClientProperties},
- State = #v1{connection_state = starting,
- connection = Connection =
- #connection{protocol = Protocol},
- sock = Sock}) ->
- User = rabbit_access_control:check_login(Mechanism, Response),
- Tune = #'connection.tune'{channel_max = 0,
- frame_max = ?FRAME_MAX,
- heartbeat = 0},
- ok = send_on_channel0(Sock, Tune, Protocol),
- State#v1{connection_state = tuning,
- connection = Connection#connection{
- user = User,
- client_properties = ClientProperties}};
+ State0 = #v1{connection_state = starting,
+ connection = Connection,
+ sock = Sock}) ->
+ AuthMechanism = auth_mechanism_to_module(Mechanism),
+ State = State0#v1{auth_mechanism = AuthMechanism,
+ auth_state = AuthMechanism:init(Sock),
+ connection_state = securing,
+ connection =
+ Connection#connection{
+ client_properties = ClientProperties}},
+ auth_phase(Response, State);
+
+handle_method0(#'connection.secure_ok'{response = Response},
+ State = #v1{connection_state = securing}) ->
+ auth_phase(Response, State);
+
handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
@@ -827,6 +834,61 @@ handle_method0(_Method, #v1{connection_state = S}) ->
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
+auth_mechanism_to_module(TypeBin) ->
+ case rabbit_registry:binary_to_type(TypeBin) of
+ {error, not_found} ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unknown authentication mechanism '~s'",
+ [TypeBin]);
+ T ->
+ case {lists:member(T, auth_mechanisms()),
+ rabbit_registry:lookup_module(auth_mechanism, T)} of
+ {true, {ok, Module}} ->
+ Module;
+ _ ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "invalid authentication mechanism '~s'", [T])
+ end
+ end.
+
+auth_mechanisms() ->
+ {ok, Configured} = application:get_env(auth_mechanisms),
+ [Name || {Name, _Module} <- rabbit_registry:lookup_all(auth_mechanism),
+ lists:member(Name, Configured)].
+
+auth_mechanisms_binary() ->
+ list_to_binary(
+ string:join(
+ [atom_to_list(A) || A <- auth_mechanisms()], " ")).
+
+auth_phase(Response,
+ State = #v1{auth_mechanism = AuthMechanism,
+ auth_state = AuthState,
+ connection = Connection =
+ #connection{protocol = Protocol},
+ sock = Sock}) ->
+ case AuthMechanism:handle_response(Response, AuthState) of
+ {refused, Msg, Args} ->
+ rabbit_misc:protocol_error(
+ access_refused, "~s login refused: ~s",
+ [proplists:get_value(name, AuthMechanism:description()),
+ io_lib:format(Msg, Args)]);
+ {protocol_error, Msg, Args} ->
+ rabbit_misc:protocol_error(syntax_error, Msg, Args);
+ {challenge, Challenge, AuthState1} ->
+ Secure = #'connection.secure'{challenge = Challenge},
+ ok = send_on_channel0(Sock, Secure, Protocol),
+ State#v1{auth_state = AuthState1};
+ {ok, User} ->
+ Tune = #'connection.tune'{channel_max = 0,
+ frame_max = ?FRAME_MAX,
+ heartbeat = 0},
+ ok = send_on_channel0(Sock, Tune, Protocol),
+ State#v1{connection_state = tuning,
+ connection = Connection#connection{user = User}}
+ end.
+
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -864,6 +926,10 @@ i(protocol, #v1{connection = #connection{protocol = none}}) ->
none;
i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
Protocol:version();
+i(auth_mechanism, #v1{auth_mechanism = none}) ->
+ none;
+i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
+ proplists:get_value(name, Mechanism:description());
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
@@ -903,12 +969,12 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
channel_sup_sup_pid = ChanSupSup,
connection = #connection{protocol = Protocol,
frame_max = FrameMax,
- user = #user{username = Username},
+ user = User,
vhost = VHost}} = State,
{ok, ChSupPid, ChFrPid} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {Protocol, Sock, Channel, FrameMax,
- self(), Username, VHost, Collector}),
+ self(), User, VHost, Collector}),
erlang:monitor(process, ChSupPid),
put({channel, Channel}, {ch_fr_pid, ChFrPid}),
put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}),
diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_registry.erl
index f15275b538..7a3fcb51e5 100644
--- a/src/rabbit_exchange_type_registry.erl
+++ b/src/rabbit_registry.erl
@@ -29,7 +29,7 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_exchange_type_registry).
+-module(rabbit_registry).
-behaviour(gen_server).
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([register/2, binary_to_type/1, lookup_module/1]).
+-export([register/3, binary_to_type/1, lookup_module/2, lookup_all/1]).
-define(SERVER, ?MODULE).
-define(ETS_NAME, ?MODULE).
@@ -46,11 +46,12 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(register/2 :: (binary(), atom()) -> 'ok').
+-spec(register/3 :: (atom(), binary(), atom()) -> 'ok').
-spec(binary_to_type/1 ::
(binary()) -> atom() | rabbit_types:error('not_found')).
--spec(lookup_module/1 ::
- (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')).
+-spec(lookup_module/2 ::
+ (atom(), atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')).
+-spec(lookup_all/1 :: (atom()) -> [{atom(), atom()}]).
-endif.
@@ -61,8 +62,8 @@ start_link() ->
%%---------------------------------------------------------------------------
-register(TypeName, ModuleName) ->
- gen_server:call(?SERVER, {register, TypeName, ModuleName}).
+register(Class, TypeName, ModuleName) ->
+ gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}).
%% This is used with user-supplied arguments (e.g., on exchange
%% declare), so we restrict it to existing atoms only. This means it
@@ -74,47 +75,54 @@ binary_to_type(TypeBin) when is_binary(TypeBin) ->
TypeAtom -> TypeAtom
end.
-lookup_module(T) when is_atom(T) ->
- case ets:lookup(?ETS_NAME, T) of
+lookup_module(Class, T) when is_atom(T) ->
+ case ets:lookup(?ETS_NAME, {Class, T}) of
[{_, Module}] ->
{ok, Module};
[] ->
{error, not_found}
end.
+lookup_all(Class) ->
+ [{K, V} || [K, V] <- ets:match(?ETS_NAME, {{Class, '$1'}, '$2'})].
+
%%---------------------------------------------------------------------------
internal_binary_to_type(TypeBin) when is_binary(TypeBin) ->
list_to_atom(binary_to_list(TypeBin)).
-internal_register(TypeName, ModuleName)
- when is_binary(TypeName), is_atom(ModuleName) ->
- ok = sanity_check_module(ModuleName),
+internal_register(Class, TypeName, ModuleName)
+ when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) ->
+ ok = sanity_check_module(class_module(Class), ModuleName),
true = ets:insert(?ETS_NAME,
- {internal_binary_to_type(TypeName), ModuleName}),
+ {{Class, internal_binary_to_type(TypeName)}, ModuleName}),
ok.
-sanity_check_module(Module) ->
- case catch lists:member(rabbit_exchange_type,
+sanity_check_module(ClassModule, Module) ->
+ case catch lists:member(ClassModule,
lists:flatten(
[Bs || {Attr, Bs} <-
Module:module_info(attributes),
Attr =:= behavior orelse
Attr =:= behaviour])) of
{'EXIT', {undef, _}} -> {error, not_module};
- false -> {error, not_exchange_type};
+ false -> {error, {not_type, ClassModule}};
true -> ok
end.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism.
+
%%---------------------------------------------------------------------------
init([]) ->
?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]),
{ok, none}.
-handle_call({register, TypeName, ModuleName}, _From, State) ->
- ok = internal_register(TypeName, ModuleName),
+handle_call({register, Class, TypeName, ModuleName}, _From, State) ->
+ ok = internal_register(Class, TypeName, ModuleName),
{reply, ok, State};
+
handle_call(Request, _From, State) ->
{stop, {unhandled_call, Request}, State}.
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index 1d8ce23b81..a4da23e21f 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -36,6 +36,7 @@
-include_lib("public_key/include/public_key.hrl").
-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
+-export([peer_cert_subject_item/2]).
%%--------------------------------------------------------------------------
@@ -45,9 +46,11 @@
-type(certificate() :: binary()).
--spec(peer_cert_issuer/1 :: (certificate()) -> string()).
--spec(peer_cert_subject/1 :: (certificate()) -> string()).
--spec(peer_cert_validity/1 :: (certificate()) -> string()).
+-spec(peer_cert_issuer/1 :: (certificate()) -> string()).
+-spec(peer_cert_subject/1 :: (certificate()) -> string()).
+-spec(peer_cert_validity/1 :: (certificate()) -> string()).
+-spec(peer_cert_subject_item/2 ::
+ (certificate(), tuple()) -> string() | 'not_found').
-endif.
@@ -71,6 +74,14 @@ peer_cert_subject(Cert) ->
format_rdn_sequence(Subject)
end, Cert).
+%% Return a part of the certificate's subject.
+peer_cert_subject_item(Cert, Type) ->
+ cert_info(fun(#'OTPCertificate' {
+ tbsCertificate = #'OTPTBSCertificate' {
+ subject = Subject }}) ->
+ find_by_type(Type, Subject)
+ end, Cert).
+
%% Return a string describing the certificate's validity.
peer_cert_validity(Cert) ->
cert_info(fun(#'OTPCertificate' {
@@ -89,6 +100,14 @@ cert_info(F, Cert) ->
DecCert -> DecCert %%R14B onwards
end).
+find_by_type(Type, {rdnSequence, RDNs}) ->
+ case [V || #'AttributeTypeAndValue'{type = T, value = V}
+ <- lists:flatten(RDNs),
+ T == Type] of
+ [{printableString, S}] -> S;
+ [] -> not_found
+ end.
+
%%--------------------------------------------------------------------------
%% Formatting functions
%%--------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8de5adbe5d..d913092cce 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -35,8 +35,6 @@
-export([all_tests/0, test_parsing/0]).
--import(lists).
-
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
-include_lib("kernel/include/file.hrl").
@@ -98,6 +96,22 @@ run_cluster_dependent_tests(SecondaryNode) ->
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
+ %% we now run the tests remotely, so that code coverage on the
+ %% local node picks up more of the delegate
+ Node = node(),
+ Self = self(),
+ Remote = spawn(SecondaryNode,
+ fun () -> A = test_delegates_async(Node),
+ B = test_delegates_sync(Node),
+ Self ! {self(), {A, B}}
+ end),
+ receive
+ {Remote, Result} ->
+ Result = {passed, passed}
+ after 2000 ->
+ throw(timeout)
+ end,
+
passed.
test_priority_queue() ->
@@ -1016,7 +1030,7 @@ test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
{ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- <<"user">>, <<"/">>, self(),
+ user(<<"user">>), <<"/">>, self(),
fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
@@ -1076,7 +1090,7 @@ test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
{ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
- <<"guest">>, <<"/">>, self(),
+ user(<<"guest">>), <<"/">>, self(),
fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
@@ -1084,6 +1098,13 @@ test_spawn(Receiver) ->
end,
{Writer, Ch}.
+user(Username) ->
+ #user{username = Username,
+ is_admin = true,
+ auth_backend = rabbit_auth_backend_internal,
+ impl = #internal_user{username = Username,
+ is_admin = true}}.
+
test_statistics_receiver(Pid) ->
receive
shutdown ->
@@ -1249,15 +1270,26 @@ test_delegates_sync(SecondaryNode) ->
true = lists:all(fun ({_, response}) -> true end, GoodRes),
GoodResPids = [Pid || {Pid, _} <- GoodRes],
- Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids),
- Good = ordsets:from_list(GoodResPids),
+ Good = lists:usort(LocalGoodPids ++ RemoteGoodPids),
+ Good = lists:usort(GoodResPids),
{[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender),
true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes),
BadResPids = [Pid || {Pid, _} <- BadRes],
- Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids),
- Bad = ordsets:from_list(BadResPids),
+ Bad = lists:usort(LocalBadPids ++ RemoteBadPids),
+ Bad = lists:usort(BadResPids),
+
+ MagicalPids = [rabbit_misc:string_to_pid(Str) ||
+ Str <- ["<nonode@nohost.0.1.0>", "<nonode@nohost.0.2.0>"]],
+ {[], BadNodes} = delegate:invoke(MagicalPids, Sender),
+ true = lists:all(
+ fun ({_, {exit, {nodedown, nonode@nohost}, _Stack}}) -> true end,
+ BadNodes),
+ BadNodesPids = [Pid || {Pid, _} <- BadNodes],
+
+ Magical = lists:usort(MagicalPids),
+ Magical = lists:usort(BadNodesPids),
passed.
@@ -1858,7 +1890,7 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
VQ = rabbit_variable_queue:init(test_queue(), true, false,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -1960,7 +1992,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
+ VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -1970,7 +2002,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
+ VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2004,7 +2036,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2034,7 +2066,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2051,7 +2083,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2082,7 +2114,7 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = rabbit_variable_queue:init(QName, true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
@@ -2144,3 +2176,4 @@ test_configurable_server_properties() ->
passed.
nop(_) -> ok.
+nop(_, _) -> ok.
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 548014be3e..70d18d7acf 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -42,8 +42,9 @@
vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0,
binding/0, binding_source/0, binding_destination/0,
amqqueue/0, exchange/0,
- connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1,
- ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
+ connection/0, protocol/0, user/0, internal_user/0,
+ username/0, password/0, password_hash/0, ok/1, error/1,
+ ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
connection_exit/0]).
-type(channel_exit() :: no_return()).
@@ -151,9 +152,19 @@
-type(protocol() :: rabbit_framing:protocol()).
-type(user() ::
- #user{username :: rabbit_access_control:username(),
- password_hash :: rabbit_access_control:password_hash(),
- is_admin :: boolean()}).
+ #user{username :: username(),
+ is_admin :: boolean(),
+ auth_backend :: atom(),
+ impl :: any()}).
+
+-type(internal_user() ::
+ #internal_user{username :: username(),
+ password_hash :: password_hash(),
+ is_admin :: boolean()}).
+
+-type(username() :: binary()).
+-type(password() :: binary()).
+-type(password_hash() :: binary()).
-type(ok(A) :: {'ok', A}).
-type(error(A) :: {'error', A}).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 1c56d51dd8..b5ff2b12be 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -27,6 +27,8 @@
-rabbit_upgrade({remove_user_scope, []}).
-rabbit_upgrade({hash_passwords, []}).
-rabbit_upgrade({add_ip_to_listener, []}).
+-rabbit_upgrade({internal_exchanges, []}).
+-rabbit_upgrade({user_to_internal_user, [hash_passwords]}).
%% -------------------------------------------------------------------
@@ -35,6 +37,8 @@
-spec(remove_user_scope/0 :: () -> 'ok').
-spec(hash_passwords/0 :: () -> 'ok').
-spec(add_ip_to_listener/0 :: () -> 'ok').
+-spec(internal_exchanges/0 :: () -> 'ok').
+-spec(user_to_internal_user/0 :: () -> 'ok').
-endif.
@@ -58,7 +62,7 @@ hash_passwords() ->
mnesia(
rabbit_user,
fun ({user, Username, Password, IsAdmin}) ->
- Hash = rabbit_access_control:hash_password(Password),
+ Hash = rabbit_auth_backend_internal:hash_password(Password),
{user, Username, Hash, IsAdmin}
end,
[username, password_hash, is_admin]).
@@ -71,8 +75,33 @@ add_ip_to_listener() ->
end,
[node, protocol, host, ip_address, port]).
+internal_exchanges() ->
+ Tables = [rabbit_exchange, rabbit_durable_exchange],
+ AddInternalFun =
+ fun ({exchange, Name, Type, Durable, AutoDelete, Args}) ->
+ {exchange, Name, Type, Durable, AutoDelete, false, Args}
+ end,
+ [ ok = mnesia(T,
+ AddInternalFun,
+ [name, type, durable, auto_delete, internal, arguments])
+ || T <- Tables ],
+ ok.
+
+user_to_internal_user() ->
+ mnesia(
+ rabbit_user,
+ fun({user, Username, PasswordHash, IsAdmin}) ->
+ {internal_user, Username, PasswordHash, IsAdmin}
+ end,
+ [username, password_hash, is_admin], internal_user).
+
%%--------------------------------------------------------------------
mnesia(TableName, Fun, FieldList) ->
{atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList),
ok.
+
+mnesia(TableName, Fun, FieldList, NewRecordName) ->
+ {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList,
+ NewRecordName),
+ ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0db5116559..19d7c5761a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -323,7 +323,7 @@
timestamp :: timestamp() }).
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
- count :: non_neg_integer (),
+ count :: non_neg_integer(),
end_seq_id :: non_neg_integer() }).
-type(sync() :: #sync { acks_persistent :: [[seq_id()]],
@@ -412,7 +412,9 @@ stop_msg_store() ->
init(QueueName, IsDurable, Recover) ->
Self = self(),
init(QueueName, IsDurable, Recover,
- fun (Guids) -> msgs_written_to_disk(Self, Guids) end,
+ fun (Guids, ActionTaken) ->
+ msgs_written_to_disk(Self, Guids, ActionTaken)
+ end,
fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
@@ -519,7 +521,9 @@ publish(Msg, MsgProps, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
+publish_delivered(false, #basic_message { guid = Guid },
+ _MsgProps, State = #vqstate { len = 0 }) ->
+ blind_confirm(self(), gb_sets:singleton(Guid)),
{blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
@@ -654,15 +658,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
persistent_count = PCount1 })}.
ack(AckTags, State) ->
- {Guids, State1} =
- ack(fun msg_store_remove/3,
- fun ({_IsPersistent, Guid, _MsgProps}, State1) ->
- remove_confirms(gb_sets:singleton(Guid), State1);
- (#msg_status{msg = #basic_message{guid = Guid}}, State1) ->
- remove_confirms(gb_sets:singleton(Guid), State1)
- end,
- AckTags, State),
- {Guids, a(State1)}.
+ a(ack(fun msg_store_remove/3,
+ fun (_, State0) -> State0 end,
+ AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
State = #vqstate { durable = IsDurable,
@@ -712,23 +710,22 @@ tx_commit(Txn, Fun, MsgPropsFun,
end)}.
requeue(AckTags, MsgPropsFun, State) ->
- {_Guids, State1} =
- ack(fun msg_store_release/3,
- fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
- {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
- true, false, State1),
- State2;
- ({IsPersistent, Guid, MsgProps}, State1) ->
- #vqstate { msg_store_clients = MSCState } = State1,
- {{ok, Msg = #basic_message{}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, Guid),
- State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps),
- true, true, State2),
- State3
- end,
- AckTags, State),
- a(reduce_memory_use(State1)).
+ a(reduce_memory_use(
+ ack(fun msg_store_release/3,
+ fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
+ {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
+ true, false, State1),
+ State2;
+ ({IsPersistent, Guid, MsgProps}, State1) ->
+ #vqstate { msg_store_clients = MSCState } = State1,
+ {{ok, Msg = #basic_message{}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, Guid),
+ State2 = State1 #vqstate { msg_store_clients = MSCState1 },
+ {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps),
+ true, true, State2),
+ State3
+ end,
+ AckTags, State))).
len(#vqstate { len = Len }) -> Len.
@@ -1096,9 +1093,9 @@ blank_rate(Timestamp, IngressLength) ->
msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, fun (StateN) -> tx_commit_post_msg_store(
- true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)
+ Self, fun (StateN) -> {[], tx_commit_post_msg_store(
+ true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)}
end)
end,
fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
@@ -1160,7 +1157,6 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
durable = IsDurable }) ->
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
- {_Guids, NewState} = ack(Acks, State),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
@@ -1172,7 +1168,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
{SeqId, State3} =
publish(Msg, MsgProps, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, NewState}, Pubs),
+ end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
@@ -1311,10 +1307,8 @@ record_pending_ack(#msg_status { seq_id = SeqId,
ack_in_counter = AckInCount}) ->
{AckEntry, RAI1} =
case MsgOnDisk of
- true ->
- {{IsPersistent, Guid, MsgProps}, RAI};
- false ->
- {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
+ true -> {{IsPersistent, Guid, MsgProps}, RAI};
+ false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
end,
PA1 = dict:store(SeqId, AckEntry, PA),
State #vqstate { pending_ack = PA1,
@@ -1325,8 +1319,8 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
- {[], orddict:new()}, PA),
+ {PersistentSeqIds, GuidsByStore} =
+ dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
case KeepPersistent of
@@ -1336,18 +1330,17 @@ remove_pending_ack(KeepPersistent,
Guids),
State1
end;
- false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = orddict:fold(
- fun (IsPersistent, Guids, ok) ->
- msg_store_remove(MSCState, IsPersistent, Guids)
- end, ok, GuidsByStore),
+ false -> IndexState1 =
+ rabbit_queue_index:ack(PersistentSeqIds, IndexState),
+ [ok = msg_store_remove(MSCState, IsPersistent, Guids)
+ || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
State1 #vqstate { index_state = IndexState1 }
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- {[], State};
+ State;
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{SeqIds, GuidsByStore},
+ {{PersistentSeqIds, GuidsByStore},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1361,27 +1354,27 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
pending_ack = dict:erase(SeqId, PA),
ram_ack_index =
gb_trees:delete_any(SeqId, RAI)})}
- end, {{[], orddict:new()}, State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- AckdGuids = lists:concat(
- orddict:fold(
- fun (IsPersistent, Guids, Gs) ->
- MsgStoreFun(MSCState, IsPersistent, Guids),
- [Guids | Gs]
- end, [], GuidsByStore)),
+ end, {accumulate_ack_init(), State}, AckTags),
+ IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState),
+ [ok = MsgStoreFun(MSCState, IsPersistent, Guids)
+ || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- {AckdGuids, State1 #vqstate { index_state = IndexState1,
+ State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }}.
+ ack_out_counter = AckOutCount + length(AckTags) }.
+
+accumulate_ack_init() -> {[], orddict:new()}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false }, Acc) ->
- Acc;
-accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) ->
- {cons_if(IsPersistent, SeqId, SeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}.
+ index_on_disk = false },
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, GuidsByStore};
+accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
+ rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1401,13 +1394,19 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
unconfirmed = gb_sets:difference(UC, GuidSet) }.
msgs_confirmed(GuidSet, State) ->
- {{confirm, gb_sets:to_list(GuidSet)}, remove_confirms(GuidSet, State)}.
+ {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
+
+blind_confirm(QPid, GuidSet) ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ QPid, fun (State) -> msgs_confirmed(GuidSet, State) end).
-msgs_written_to_disk(QPid, GuidSet) ->
+msgs_written_to_disk(QPid, GuidSet, removed) ->
+ blind_confirm(QPid, GuidSet);
+msgs_written_to_disk(QPid, GuidSet, written) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun(State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+ QPid, fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
State #vqstate {
msgs_on_disk =
@@ -1417,9 +1416,9 @@ msgs_written_to_disk(QPid, GuidSet) ->
msg_indices_written_to_disk(QPid, GuidSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun(State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+ QPid, fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
State #vqstate {
msg_indices_on_disk =
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
new file mode 100644
index 0000000000..f939a3feb6
--- /dev/null
+++ b/src/rabbit_vhost.erl
@@ -0,0 +1,122 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_vhost).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-export([add/1, delete/1, exists/1, list/0, with/2]).
+
+-ifdef(use_specs).
+
+-spec(add/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(delete/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(exists/1 :: (rabbit_types:vhost()) -> boolean()).
+-spec(list/0 :: () -> [rabbit_types:vhost()]).
+-spec(with/2 :: (rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+add(VHostPath) ->
+ R = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_vhost, VHostPath}) of
+ [] ->
+ ok = mnesia:write(rabbit_vhost,
+ #vhost{virtual_host = VHostPath},
+ write),
+ [rabbit_exchange:declare(
+ rabbit_misc:r(VHostPath, exchange, Name),
+ Type, true, false, false, []) ||
+ {Name,Type} <-
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.fanout">>, fanout}]],
+ ok;
+ [_] ->
+ mnesia:abort({vhost_already_exists, VHostPath})
+ end
+ end),
+ rabbit_log:info("Added vhost ~p~n", [VHostPath]),
+ R.
+
+delete(VHostPath) ->
+ %%FIXME: We are forced to delete the queues outside the TX below
+ %%because queue deletion involves sending messages to the queue
+ %%process, which in turn results in further mnesia actions and
+ %%eventually the termination of that process.
+ lists:foreach(fun (Q) ->
+ {ok,_} = rabbit_amqqueue:delete(Q, false, false)
+ end,
+ rabbit_amqqueue:list(VHostPath)),
+ R = rabbit_misc:execute_mnesia_transaction(
+ with(VHostPath, fun () ->
+ ok = internal_delete(VHostPath)
+ end)),
+ rabbit_log:info("Deleted vhost ~p~n", [VHostPath]),
+ R.
+
+internal_delete(VHostPath) ->
+ lists:foreach(fun (#exchange{name = Name}) ->
+ ok = rabbit_exchange:delete(Name, false)
+ end,
+ rabbit_exchange:list(VHostPath)),
+ lists:foreach(
+ fun ({Username, _, _, _}) ->
+ ok = rabbit_auth_backend_internal:clear_permissions(Username,
+ VHostPath)
+ end,
+ rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)),
+ ok = mnesia:delete({rabbit_vhost, VHostPath}),
+ ok.
+
+exists(VHostPath) ->
+ mnesia:dirty_read({rabbit_vhost, VHostPath}) /= [].
+
+list() ->
+ mnesia:dirty_all_keys(rabbit_vhost).
+
+with(VHostPath, Thunk) ->
+ fun () ->
+ case mnesia:read({rabbit_vhost, VHostPath}) of
+ [] ->
+ mnesia:abort({no_such_vhost, VHostPath});
+ [_V] ->
+ Thunk()
+ end
+ end.
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 0159609d9a..068ac18612 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -39,8 +39,6 @@
send_command_and_notify/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
--import(gen_tcp).
-
-record(wstate, {sock, channel, frame_max, protocol}).
-define(HIBERNATE_AFTER, 5000).