summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2017-01-23 09:41:59 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2017-01-23 09:41:59 +0100
commit41394d3cc9f2aa28f213dd75ebf2d64a9a7d37a6 (patch)
tree6962a11476a58c46a69ecdfb42b895d3bd4135c4
parent1ae0e83f74afec21246af3542978388442d7c248 (diff)
parent192f844d6fe0259aa0d6bb0f0eeb283a9038f031 (diff)
downloadrabbitmq-server-git-41394d3cc9f2aa28f213dd75ebf2d64a9a7d37a6.tar.gz
Merge branch 'master' into rabbitmq-server-1085
Conflicts: test/topic_permission_SUITE.erl
-rw-r--r--src/rabbit.erl11
-rw-r--r--src/rabbit_amqqueue_process.erl68
-rw-r--r--src/rabbit_binding.erl49
-rw-r--r--src/rabbit_connection_tracking.erl1
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl29
-rw-r--r--src/rabbit_exchange_parameters.erl6
-rw-r--r--src/rabbit_policy.erl71
-rw-r--r--src/rabbit_queue_consumers.erl18
-rw-r--r--src/rabbit_runtime_parameters.erl84
-rw-r--r--src/rabbit_upgrade_functions.erl38
-rw-r--r--src/rabbit_vhost.erl33
-rw-r--r--src/rabbit_vhost_limit.erl37
-rw-r--r--test/dummy_runtime_parameters.erl6
-rw-r--r--test/dynamic_ha_SUITE.erl2
-rw-r--r--test/queue_master_location_SUITE.erl3
-rw-r--r--test/topic_permission_SUITE.erl43
-rw-r--r--test/unit_inbroker_SUITE.erl26
18 files changed, 322 insertions, 205 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e121fb3e2e..588774a7a9 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -821,17 +821,20 @@ insert_default_data() ->
DefaultWritePermBin = rabbit_data_coercion:to_binary(DefaultWritePerm),
DefaultReadPermBin = rabbit_data_coercion:to_binary(DefaultReadPerm),
- ok = rabbit_vhost:add(DefaultVHostBin),
+ ok = rabbit_vhost:add(DefaultVHostBin, ?INTERNAL_USER),
ok = rabbit_auth_backend_internal:add_user(
DefaultUserBin,
- DefaultPassBin
+ DefaultPassBin,
+ ?INTERNAL_USER
),
- ok = rabbit_auth_backend_internal:set_tags(DefaultUserBin,DefaultTags),
+ ok = rabbit_auth_backend_internal:set_tags(DefaultUserBin, DefaultTags,
+ ?INTERNAL_USER),
ok = rabbit_auth_backend_internal:set_permissions(DefaultUserBin,
DefaultVHostBin,
DefaultConfigurePermBin,
DefaultWritePermBin,
- DefaultReadPermBin),
+ DefaultReadPermBin,
+ ?INTERNAL_USER),
ok.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8db2a167e4..d8b1cbc85b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -131,7 +131,8 @@
auto_delete,
arguments,
owner_pid,
- exclusive
+ exclusive,
+ user_who_performed_action
]).
-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]).
@@ -286,7 +287,9 @@ terminate(_Reason, State = #q{q = Q}) ->
terminate_delete(EmitStats, Reason,
State = #q{q = #amqqueue{name = QName},
- backing_queue = BQ}) ->
+ backing_queue = BQ,
+ status = Status}) ->
+ ActingUser = terminated_by(Status),
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
if EmitStats -> rabbit_event:if_enabled(State, #q.stats_timer,
@@ -294,11 +297,17 @@ terminate_delete(EmitStats, Reason,
true -> ok
end,
%% don't care if the internal delete doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(QName),
+ rabbit_amqqueue:internal_delete(QName, ActingUser),
BQS1
end.
-terminate_shutdown(Fun, State) ->
+terminated_by({terminated_by, ActingUser}) ->
+ ActingUser;
+terminated_by(_) ->
+ ?INTERNAL_USER.
+
+terminate_shutdown(Fun, #q{status = Status} = State) ->
+ ActingUser = terminated_by(Status),
State1 = #q{backing_queue_state = BQS, consumers = Consumers} =
lists:foldl(fun (F, S) -> F(S) end, State,
[fun stop_sync_timer/1,
@@ -310,7 +319,7 @@ terminate_shutdown(Fun, State) ->
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
notify_decorators(shutdown, State),
- [emit_consumer_deleted(Ch, CTag, QName) ||
+ [emit_consumer_deleted(Ch, CTag, QName, ActingUser) ||
{Ch, CTag, _, _, _} <-
rabbit_queue_consumers:all(Consumers)],
State1#q{backing_queue_state = Fun(BQS)}
@@ -731,7 +740,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
{ok, State1};
{ChAckTags, ChCTags, Consumers1} ->
QName = qname(State1),
- [emit_consumer_deleted(DownPid, CTag, QName) || CTag <- ChCTags],
+ [emit_consumer_deleted(DownPid, CTag, QName, ?INTERNAL_USER) || CTag <- ChCTags],
Holder1 = case Holder of
{DownPid, _} -> none;
Other -> Other
@@ -953,6 +962,8 @@ i(garbage_collection, _State) ->
i(reductions, _State) ->
{reductions, Reductions} = erlang:process_info(self(), reductions),
Reductions;
+i(user_who_performed_action, #q{q = #amqqueue{options = Opts}}) ->
+ maps:get(user, Opts, ?UNKNOWN_USER);
i(Item, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:info(Item, BQS).
@@ -970,7 +981,7 @@ emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ All).
emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName,
- PrefetchCount, Args, Ref) ->
+ PrefetchCount, Args, Ref, ActingUser) ->
rabbit_event:notify(consumer_created,
[{consumer_tag, CTag},
{exclusive, Exclusive},
@@ -978,15 +989,17 @@ emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName,
{channel, ChPid},
{queue, QName},
{prefetch_count, PrefetchCount},
- {arguments, Args}],
+ {arguments, Args},
+ {user_who_performed_action, ActingUser}],
Ref).
-emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
+emit_consumer_deleted(ChPid, ConsumerTag, QName, ActingUser) ->
rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
rabbit_event:notify(consumer_deleted,
[{consumer_tag, ConsumerTag},
{channel, ChPid},
- {queue, QName}]).
+ {queue, QName},
+ {user_who_performed_action, ActingUser}]).
%%----------------------------------------------------------------------------
@@ -996,7 +1009,7 @@ prioritise_call(Msg, _From, _Len, State) ->
{info, _Items} -> 9;
consumers -> 9;
stat -> 7;
- {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State);
+ {basic_consume, _, _, _, _, _, _, _, _, _, _} -> consumer_bias(State);
{basic_cancel, _, _, _} -> consumer_bias(State);
_ -> 0
end.
@@ -1096,7 +1109,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg},
+ PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser},
_From, State = #q{consumers = Consumers,
exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
@@ -1105,7 +1118,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
PrefetchCount, Args, is_empty(State),
- Consumers),
+ ActingUser, Consumers),
ExclusiveConsumer =
if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
@@ -1121,12 +1134,12 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
PrefetchCount, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, PrefetchCount,
- Args, none),
+ Args, none, ActingUser),
notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
-handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
+handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From,
State = #q{consumers = Consumers,
exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg),
@@ -1140,7 +1153,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end,
State1 = State#q{consumers = Consumers1,
exclusive_consumer = Holder1},
- emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)),
+ emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser),
notify_decorators(State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
@@ -1159,14 +1172,15 @@ handle_call(stat, _From, State) ->
ensure_expiry_timer(State),
reply({ok, BQ:len(BQS), rabbit_queue_consumers:count()}, State1);
-handle_call({delete, IfUnused, IfEmpty}, _From,
+handle_call({delete, IfUnused, IfEmpty, ActingUser}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
IfUnused and not(IsUnused) -> reply({error, in_use}, State);
- true -> stop({ok, BQ:len(BQS)}, State)
+ true -> stop({ok, BQ:len(BQS)},
+ State#q{status = {terminated_by, ActingUser}})
end;
handle_call(purge, _From, State = #q{backing_queue = BQ,
@@ -1321,14 +1335,16 @@ handle_cast({force_event_refresh, Ref},
QName = qname(State),
AllConsumers = rabbit_queue_consumers:all(Consumers),
case Exclusive of
- none -> [emit_consumer_created(
- Ch, CTag, false, AckRequired, QName, Prefetch,
- Args, Ref) ||
- {Ch, CTag, AckRequired, Prefetch, Args}
- <- AllConsumers];
- {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers,
- emit_consumer_created(
- Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref)
+ none ->
+ [emit_consumer_created(
+ Ch, CTag, false, AckRequired, QName, Prefetch,
+ Args, Ref, ActingUser) ||
+ {Ch, CTag, AckRequired, Prefetch, Args, ActingUser}
+ <- AllConsumers];
+ {Ch, CTag} ->
+ [{Ch, CTag, AckRequired, Prefetch, Args, ActingUser}] = AllConsumers,
+ emit_consumer_created(
+ Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref, ActingUser)
end,
noreply(rabbit_event:init_stats_timer(State, #q.stats_timer));
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 7cebd194a6..e4e8552bc2 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -17,11 +17,11 @@
-module(rabbit_binding).
-include("rabbit.hrl").
--export([recover/2, exists/1, add/1, add/2, remove/1, remove/2, list/1]).
+-export([recover/2, exists/1, add/2, add/3, remove/1, remove/3, list/1]).
-export([list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
- process_deletions/1]).
+ process_deletions/2]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]).
%% these must all be run inside a mnesia tx
-export([has_for_source/1, remove_for_source/1,
@@ -57,10 +57,10 @@
-spec recover([rabbit_exchange:name()], [rabbit_amqqueue:name()]) ->
'ok'.
-spec exists(rabbit_types:binding()) -> boolean() | bind_errors().
--spec add(rabbit_types:binding()) -> bind_res().
--spec add(rabbit_types:binding(), inner_fun()) -> bind_res().
+-spec add(rabbit_types:binding(), rabbit_types:username()) -> bind_res().
+-spec add(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res().
-spec remove(rabbit_types:binding()) -> bind_res().
--spec remove(rabbit_types:binding(), inner_fun()) -> bind_res().
+-spec remove(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res().
-spec list(rabbit_types:vhost()) -> bindings().
-spec list_for_source
(rabbit_types:binding_source()) -> bindings().
@@ -84,7 +84,7 @@
(rabbit_types:binding_destination(), boolean()) -> deletions().
-spec remove_transient_for_destination
(rabbit_types:binding_destination()) -> deletions().
--spec process_deletions(deletions()) -> rabbit_misc:thunk('ok').
+-spec process_deletions(deletions(), rabbit_types:username()) -> rabbit_misc:thunk('ok').
-spec combine_deletions(deletions(), deletions()) -> deletions().
-spec add_deletion
(rabbit_exchange:name(),
@@ -158,9 +158,9 @@ exists(Binding) ->
rabbit_misc:const(mnesia:read({rabbit_route, B}) /= [])
end, fun not_found_or_absent_errs/1).
-add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end).
+add(Binding, ActingUser) -> add(Binding, fun (_Src, _Dst) -> ok end, ActingUser).
-add(Binding, InnerFun) ->
+add(Binding, InnerFun, ActingUser) ->
binding_action(
Binding,
fun (Src, Dst, B) ->
@@ -172,7 +172,7 @@ add(Binding, InnerFun) ->
case InnerFun(Src, Dst) of
ok ->
case mnesia:read({rabbit_route, B}) of
- [] -> add(Src, Dst, B);
+ [] -> add(Src, Dst, B, ActingUser);
[_] -> fun () -> ok end
end;
{error, _} = Err ->
@@ -183,7 +183,7 @@ add(Binding, InnerFun) ->
end
end, fun not_found_or_absent_errs/1).
-add(Src, Dst, B) ->
+add(Src, Dst, B, ActingUser) ->
[SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
case (SrcDurable andalso DstDurable andalso
mnesia:read({rabbit_durable_route, B}) =/= []) of
@@ -193,14 +193,16 @@ add(Src, Dst, B) ->
Serial = rabbit_exchange:serial(Src),
fun () ->
x_callback(Serial, Src, add_binding, B),
- ok = rabbit_event:notify(binding_created, info(B))
+ ok = rabbit_event:notify(
+ binding_created,
+ info(B) ++ [{user_who_performed_action, ActingUser}])
end;
true -> rabbit_misc:const({error, binding_not_found})
end.
-remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
+remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end, ?INTERNAL_USER).
-remove(Binding, InnerFun) ->
+remove(Binding, InnerFun, ActingUser) ->
binding_action(
Binding,
fun (Src, Dst, B) ->
@@ -210,18 +212,18 @@ remove(Binding, InnerFun) ->
_ -> rabbit_misc:const({error, binding_not_found})
end;
_ -> case InnerFun(Src, Dst) of
- ok -> remove(Src, Dst, B);
+ ok -> remove(Src, Dst, B, ActingUser);
{error, _} = Err -> rabbit_misc:const(Err)
end
end
end, fun absent_errs_only/1).
-remove(Src, Dst, B) ->
+remove(Src, Dst, B, ActingUser) ->
ok = sync_route(#route{binding = B}, durable(Src), durable(Dst),
fun mnesia:delete_object/3),
Deletions = maybe_auto_delete(
B#binding.source, [B], new_deletions(), false),
- process_deletions(Deletions).
+ process_deletions(Deletions, ActingUser).
list(VHostPath) ->
VHostResource = rabbit_misc:r(VHostPath, '_'),
@@ -539,7 +541,7 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
-process_deletions(Deletions) ->
+process_deletions(Deletions, ActingUser) ->
AugmentedDeletions =
dict:map(fun (_XName, {X, deleted, Bindings}) ->
Bs = lists:flatten(Bindings),
@@ -553,16 +555,21 @@ process_deletions(Deletions) ->
fun() ->
dict:fold(fun (XName, {X, deleted, Bs, Serial}, ok) ->
ok = rabbit_event:notify(
- exchange_deleted, [{name, XName}]),
- del_notify(Bs),
+ exchange_deleted,
+ [{name, XName},
+ {user_who_performed_action, ActingUser}]),
+ del_notify(Bs, ActingUser),
x_callback(Serial, X, delete, Bs);
(_XName, {X, not_deleted, Bs, Serial}, ok) ->
- del_notify(Bs),
+ del_notify(Bs, ActingUser),
x_callback(Serial, X, remove_bindings, Bs)
end, ok, AugmentedDeletions)
end.
-del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs].
+del_notify(Bs, ActingUser) -> [rabbit_event:notify(
+ binding_deleted,
+ info(B) ++ [{user_who_performed_action, ActingUser}])
+ || B <- Bs].
x_callback(Serial, X, F, Bs) ->
ok = rabbit_exchange:callback(X, F, Serial, [X, Bs]).
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index 38684482ef..40f1d1f71d 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -323,6 +323,7 @@ tracked_connection_from_connection_state(#connection{
{node, node()},
{vhost, VHost},
{user, Username},
+ {user_who_performed_action, Username},
{connected_at, Ts},
{pid, self()},
{type, network},
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index a59afe6c43..9640897239 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -58,7 +58,7 @@ stop() ->
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, true, []),
+ topic, true, false, true, [], ?INTERNAL_USER),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 252817fba1..85f061a17b 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -18,12 +18,12 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, policy_changed/2, callback/4, declare/6,
+-export([recover/0, policy_changed/2, callback/4, declare/7,
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
update_scratch/3, update_decorators/1, immutable/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
- route/2, delete/2, validate_binding/2]).
+ route/2, delete/3, validate_binding/2]).
%% these must be run inside a mnesia tx
-export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]).
@@ -43,7 +43,7 @@
(rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
-spec declare
(name(), type(), boolean(), boolean(), boolean(),
- rabbit_framing:amqp_table())
+ rabbit_framing:amqp_table(), rabbit_types:username())
-> rabbit_types:exchange().
-spec check_type
(binary()) -> atom() | rabbit_types:connection_exit().
@@ -86,8 +86,10 @@
-spec route(rabbit_types:exchange(), rabbit_types:delivery())
-> [rabbit_amqqueue:name()].
-spec delete
- (name(), 'true') -> 'ok' | rabbit_types:error('not_found' | 'in_use');
- (name(), 'false') -> 'ok' | rabbit_types:error('not_found').
+ (name(), 'true', rabbit_types:username()) ->
+ 'ok'| rabbit_types:error('not_found' | 'in_use');
+ (name(), 'false', rabbit_types:username()) ->
+ 'ok' | rabbit_types:error('not_found').
-spec validate_binding
(rabbit_types:exchange(), rabbit_types:binding())
-> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}).
@@ -101,7 +103,7 @@
%%----------------------------------------------------------------------------
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
- policy]).
+ policy, user_who_performed_action]).
recover() ->
Xs = rabbit_misc:table_filter(
@@ -151,14 +153,15 @@ serial(#exchange{name = XName} = X) ->
(false) -> none
end.
-declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
+declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
X = rabbit_exchange_decorator:set(
rabbit_policy:set(#exchange{name = XName,
type = Type,
durable = Durable,
auto_delete = AutoDelete,
internal = Internal,
- arguments = Args})),
+ arguments = Args,
+ options = #{user => Username}})),
XT = type_to_module(Type),
%% We want to upset things if it isn't ok
ok = XT:validate(X),
@@ -342,6 +345,8 @@ i(policy, X) -> case rabbit_policy:name(X) of
none -> '';
Policy -> Policy
end;
+i(user_who_performed_action, #exchange{options = Opts}) ->
+ maps:get(user, Opts, ?UNKNOWN_USER);
i(Item, #exchange{type = Type} = X) ->
case (type_to_module(Type)):info(X, [Item]) of
[{Item, I}] -> I;
@@ -437,7 +442,7 @@ call_with_exchange(XName, Fun) ->
end
end).
-delete(XName, IfUnused) ->
+delete(XName, IfUnused, Username) ->
Fun = case IfUnused of
true -> fun conditional_delete/2;
false -> fun unconditional_delete/2
@@ -449,7 +454,7 @@ delete(XName, IfUnused) ->
%% see rabbitmq/rabbitmq-federation#7
rabbit_runtime_parameters:set(XName#resource.virtual_host,
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
- XName#resource.name, true, none),
+ XName#resource.name, true, Username),
call_with_exchange(
XName,
fun (X) ->
@@ -457,7 +462,7 @@ delete(XName, IfUnused) ->
{deleted, X, Bs, Deletions} ->
rabbit_binding:process_deletions(
rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions));
+ XName, {X, deleted, Bs}, Deletions), Username);
{error, _InUseOrNotFound} = E ->
rabbit_misc:const(E)
end
@@ -465,7 +470,7 @@ delete(XName, IfUnused) ->
after
rabbit_runtime_parameters:clear(XName#resource.virtual_host,
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
- XName#resource.name)
+ XName#resource.name, Username)
end.
validate_binding(X = #exchange{type = XType}, Binding) ->
diff --git a/src/rabbit_exchange_parameters.erl b/src/rabbit_exchange_parameters.erl
index c0ca0a985b..8af84a5935 100644
--- a/src/rabbit_exchange_parameters.erl
+++ b/src/rabbit_exchange_parameters.erl
@@ -21,7 +21,7 @@
-include("rabbit.hrl").
-export([register/0]).
--export([validate/5, notify/4, notify_clear/3]).
+-export([validate/5, notify/5, notify_clear/4]).
-import(rabbit_misc, [pget/2]).
@@ -42,8 +42,8 @@ register() ->
validate(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term, _User) ->
ok.
-notify(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term) ->
+notify(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term, _Username) ->
ok.
-notify_clear(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name) ->
+notify_clear(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Username) ->
ok.
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 437842b8dd..4ab972872e 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -43,10 +43,10 @@
-export([register/0]).
-export([invalidate/0, recover/0]).
-export([name/1, name_op/1, effective_definition/1, get/2, get_arg/3, set/1]).
--export([validate/5, notify/4, notify_clear/3]).
--export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1,
+-export([validate/5, notify/5, notify_clear/4]).
+-export([parse_set/7, set/7, delete/3, lookup/2, list/0, list/1,
list_formatted/1, list_formatted/3, info_keys/0]).
--export([parse_set_op/6, set_op/6, delete_op/2, lookup_op/2, list_op/0, list_op/1,
+-export([parse_set_op/7, set_op/7, delete_op/3, lookup_op/2, list_op/0, list_op/1,
list_formatted_op/1, list_formatted_op/3]).
-rabbit_boot_step({?MODULE,
@@ -198,38 +198,42 @@ invalid_file() ->
%%----------------------------------------------------------------------------
-parse_set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
- parse_set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo).
+parse_set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
+ parse_set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority,
+ ApplyTo, ActingUser).
-parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
- parse_set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo).
+parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
+ parse_set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo,
+ ActingUser).
-parse_set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
+parse_set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
try rabbit_data_coercion:to_integer(Priority) of
- Num -> parse_set0(Type, VHost, Name, Pattern, Definition, Num, ApplyTo)
+ Num -> parse_set0(Type, VHost, Name, Pattern, Definition, Num, ApplyTo,
+ ActingUser)
catch
error:badarg -> {error, "~p priority must be a number", [Priority]}
end.
-parse_set0(Type, VHost, Name, Pattern, Defn, Priority, ApplyTo) ->
+parse_set0(Type, VHost, Name, Pattern, Defn, Priority, ApplyTo, ActingUser) ->
case rabbit_json:try_decode(Defn) of
{ok, Term} ->
set0(Type, VHost, Name,
[{<<"pattern">>, Pattern},
{<<"definition">>, maps:to_list(Term)},
{<<"priority">>, Priority},
- {<<"apply-to">>, ApplyTo}]);
+ {<<"apply-to">>, ApplyTo}],
+ ActingUser);
error ->
{error_string, "JSON decoding error"}
end.
-set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
- set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo).
+set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
+ set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser).
-set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
- set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo).
+set(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
+ set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser).
-set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
+set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
PolicyProps = [{<<"pattern">>, Pattern},
{<<"definition">>, Definition},
{<<"priority">>, case Priority of
@@ -240,16 +244,16 @@ set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
undefined -> <<"all">>;
_ -> ApplyTo
end}],
- set0(Type, VHost, Name, PolicyProps).
+ set0(Type, VHost, Name, PolicyProps, ActingUser).
-set0(Type, VHost, Name, Term) ->
- rabbit_runtime_parameters:set_any(VHost, Type, Name, Term, none).
+set0(Type, VHost, Name, Term, ActingUser) ->
+ rabbit_runtime_parameters:set_any(VHost, Type, Name, Term, ActingUser).
-delete_op(VHost, Name) ->
- rabbit_runtime_parameters:clear_any(VHost, <<"operator_policy">>, Name).
+delete_op(VHost, Name, ActingUser) ->
+ rabbit_runtime_parameters:clear_any(VHost, <<"operator_policy">>, Name, ActingUser).
-delete(VHost, Name) ->
- rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name).
+delete(VHost, Name, ActingUser) ->
+ rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name, ActingUser).
lookup_op(VHost, Name) ->
case rabbit_runtime_parameters:lookup(VHost, <<"operator_policy">>, Name) of
@@ -322,18 +326,23 @@ validate(_VHost, <<"operator_policy">>, Name, Term, _User) ->
rabbit_parameter_validation:proplist(
Name, operator_policy_validation(), Term).
-notify(VHost, <<"policy">>, Name, Term) ->
- rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]),
+notify(VHost, <<"policy">>, Name, Term, ActingUser) ->
+ rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost},
+ {user_who_performed_action, ActingUser} | Term]),
update_policies(VHost);
-notify(VHost, <<"operator_policy">>, Name, Term) ->
- rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]),
+notify(VHost, <<"operator_policy">>, Name, Term, ActingUser) ->
+ rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost},
+ {user_who_performed_action, ActingUser} | Term]),
update_policies(VHost).
-notify_clear(VHost, <<"policy">>, Name) ->
- rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost}]),
+notify_clear(VHost, <<"policy">>, Name, ActingUser) ->
+ rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost},
+ {user_who_performed_action, ActingUser}]),
update_policies(VHost);
-notify_clear(VHost, <<"operator_policy">>, Name) ->
- rabbit_event:notify(operator_policy_cleared, [{name, Name}, {vhost, VHost}]),
+notify_clear(VHost, <<"operator_policy">>, Name, ActingUser) ->
+ rabbit_event:notify(operator_policy_cleared,
+ [{name, Name}, {vhost, VHost},
+ {user_who_performed_action, ActingUser}]),
update_policies(VHost).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index cd58de95dd..2bd3ebf75e 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -17,7 +17,7 @@
-module(rabbit_queue_consumers).
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
- unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
+ unacknowledged_message_count/0, add/10, remove/3, erase_ch/2,
send_drained/0, deliver/3, record_ack/3, subtract_acks/3,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
@@ -32,7 +32,7 @@
-record(state, {consumers, use}).
--record(consumer, {tag, ack_required, prefetch, args}).
+-record(consumer, {tag, ack_required, prefetch, args, user}).
%% These are held in our process dictionary
-record(cr, {ch_pid,
@@ -68,7 +68,8 @@
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
- non_neg_integer(), rabbit_framing:amqp_table(), boolean(), state())
+ non_neg_integer(), rabbit_framing:amqp_table(), boolean(),
+ rabbit_types:username(), state())
-> state().
-spec remove(ch(), rabbit_types:ctag(), state()) ->
'not_found' | state().
@@ -113,8 +114,8 @@ consumers(Consumers, Acc) ->
priority_queue:fold(
fun ({ChPid, Consumer}, _P, Acc1) ->
#consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch,
- args = Args} = Consumer,
- [{ChPid, CTag, Ack, Prefetch, Args} | Acc1]
+ args = Args, user = Username} = Consumer,
+ [{ChPid, CTag, Ack, Prefetch, Args, Username} | Acc1]
end, Acc, Consumers).
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
@@ -123,8 +124,8 @@ unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
- State = #state{consumers = Consumers,
- use = CUInfo}) ->
+ Username, State = #state{consumers = Consumers,
+ use = CUInfo}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
Limiter1 = case LimiterActive of
@@ -142,7 +143,8 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
Consumer = #consumer{tag = CTag,
ack_required = not NoAck,
prefetch = Prefetch,
- args = Args},
+ args = Args,
+ user = Username},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers),
use = update_use(CUInfo, active)}.
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index cee5408f0a..7a320e108b 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -51,13 +51,13 @@
-include("rabbit.hrl").
--export([parse_set/5, set/5, set_any/5, clear/3, clear_any/3, list/0, list/1,
+-export([parse_set/5, set/5, set_any/5, clear/4, clear_any/4, list/0, list/1,
list_component/1, list/2, list_formatted/1, list_formatted/3,
lookup/3, value/3, value/4, info_keys/0, clear_component/1]).
--export([parse_set_global/2, set_global/2, value_global/1, value_global/2,
+-export([parse_set_global/3, set_global/3, value_global/1, value_global/2,
list_global/0, list_global_formatted/0, list_global_formatted/2,
- lookup_global/1, global_info_keys/0, clear_global/1]).
+ lookup_global/1, global_info_keys/0, clear_global/2]).
%%----------------------------------------------------------------------------
@@ -65,15 +65,18 @@
-type ok_thunk_or_error_string() :: ok_or_error_string() | fun(() -> 'ok').
-spec parse_set(rabbit_types:vhost(), binary(), binary(), string(),
- rabbit_types:user() | 'none') -> ok_or_error_string().
+ rabbit_types:user() | rabbit_types:username() | 'none')
+ -> ok_or_error_string().
-spec set(rabbit_types:vhost(), binary(), binary(), term(),
- rabbit_types:user() | 'none') -> ok_or_error_string().
+ rabbit_types:user() | rabbit_types:username() | 'none')
+ -> ok_or_error_string().
-spec set_any(rabbit_types:vhost(), binary(), binary(), term(),
- rabbit_types:user() | 'none') -> ok_or_error_string().
--spec set_global(atom(), term()) -> 'ok'.
--spec clear(rabbit_types:vhost(), binary(), binary())
- -> ok_thunk_or_error_string().
--spec clear_any(rabbit_types:vhost(), binary(), binary())
+ rabbit_types:user() | rabbit_types:username() | 'none')
+ -> ok_or_error_string().
+-spec set_global(atom(), term(), rabbit_types:username()) -> 'ok'.
+-spec clear(rabbit_types:vhost(), binary(), binary(), rabbit_types:username())
+ -> ok_thunk_or_error_string().
+-spec clear_any(rabbit_types:vhost(), binary(), binary(), rabbit_types:username())
-> ok_thunk_or_error_string().
-spec list() -> [rabbit_types:infos()].
-spec list(rabbit_types:vhost() | '_') -> [rabbit_types:infos()].
@@ -113,19 +116,20 @@ set(_, <<"policy">>, _, _, _) ->
set(VHost, Component, Name, Term, User) ->
set_any(VHost, Component, Name, Term, User).
-parse_set_global(Name, String) ->
+parse_set_global(Name, String, ActingUser) ->
Definition = rabbit_data_coercion:to_binary(String),
case rabbit_json:try_decode(Definition) of
- {ok, Term} when is_map(Term) -> set_global(Name, maps:to_list(Term));
- {ok, Term} -> set_global(Name, Term);
+ {ok, Term} when is_map(Term) -> set_global(Name, maps:to_list(Term), ActingUser);
+ {ok, Term} -> set_global(Name, Term, ActingUser);
error -> {error_string, "JSON decoding error"}
end.
-set_global(Name, Term) ->
+set_global(Name, Term, ActingUser) ->
NameAsAtom = rabbit_data_coercion:to_atom(Name),
mnesia_update(NameAsAtom, Term),
event_notify(parameter_set, none, global, [{name, NameAsAtom},
- {value, Term}]),
+ {value, Term},
+ {user_who_performed_action, ActingUser}]),
ok.
format_error(L) ->
@@ -141,15 +145,19 @@ set_any0(VHost, Component, Name, Term, User) ->
case lookup_component(Component) of
{ok, Mod} ->
case flatten_errors(
- Mod:validate(VHost, Component, Name, Term, User)) of
+ Mod:validate(VHost, Component, Name, Term, get_user(User))) of
ok ->
case mnesia_update(VHost, Component, Name, Term) of
- {old, Term} -> ok;
- _ -> event_notify(
- parameter_set, VHost, Component,
- [{name, Name},
- {value, Term}]),
- Mod:notify(VHost, Component, Name, Term)
+ {old, Term} ->
+ ok;
+ _ ->
+ ActingUser = get_username(User),
+ event_notify(
+ parameter_set, VHost, Component,
+ [{name, Name},
+ {value, Term},
+ {user_who_performed_action, ActingUser}]),
+ Mod:notify(VHost, Component, Name, Term, ActingUser)
end,
ok;
E ->
@@ -159,6 +167,19 @@ set_any0(VHost, Component, Name, Term, User) ->
E
end.
+%% Validate only an user record as expected by the API before #rabbitmq-event-exchange-10
+get_user(#user{} = User) ->
+ User;
+get_user(_) ->
+ none.
+
+get_username(#user{username = Username}) ->
+ Username;
+get_username(none) ->
+ ?INTERNAL_USER;
+get_username(Any) ->
+ Any.
+
mnesia_update(Key, Term) ->
rabbit_misc:execute_mnesia_transaction(mnesia_update_fun(Key, Term)).
@@ -176,15 +197,17 @@ mnesia_update_fun(Key, Term) ->
Res
end.
-clear(_, <<"policy">> , _) ->
+clear(_, <<"policy">> , _, _) ->
{error_string, "policies may not be cleared using this method"};
-clear(VHost, Component, Name) ->
- clear_any(VHost, Component, Name).
+clear(VHost, Component, Name, ActingUser) ->
+ clear_any(VHost, Component, Name, ActingUser).
-clear_global(Key) ->
+clear_global(Key, ActingUser) ->
KeyAsAtom = rabbit_data_coercion:to_atom(Key),
Notify = fun() ->
- event_notify(parameter_set, none, global, [{name, KeyAsAtom}]),
+ event_notify(parameter_set, none, global,
+ [{name, KeyAsAtom},
+ {user_who_performed_action, ActingUser}]),
ok
end,
case value_global(KeyAsAtom) of
@@ -212,13 +235,14 @@ clear_component(Component) ->
ok
end.
-clear_any(VHost, Component, Name) ->
+clear_any(VHost, Component, Name, ActingUser) ->
Notify = fun () ->
case lookup_component(Component) of
{ok, Mod} -> event_notify(
parameter_cleared, VHost, Component,
- [{name, Name}]),
- Mod:notify_clear(VHost, Component, Name);
+ [{name, Name},
+ {user_who_performed_action, ActingUser}]),
+ Mod:notify_clear(VHost, Component, Name, ActingUser);
_ -> ok
end
end,
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 178db3e3e3..2116e2dfa1 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -59,6 +59,8 @@
-rabbit_upgrade({vhost_limits, mnesia, []}).
-rabbit_upgrade({queue_vhost_field, mnesia, [operator_policies]}).
-rabbit_upgrade({topic_permission, mnesia, []}).
+-rabbit_upgrade({queue_options, mnesia, [queue_vhost_field]}).
+-rabbit_upgrade({exchange_options, mnesia, [operator_policies]}).
%% -------------------------------------------------------------------
@@ -95,6 +97,8 @@
-spec vhost_limits() -> 'ok'.
-spec operator_policies() -> 'ok'.
-spec queue_vhost_field() -> 'ok'.
+-spec queue_options() -> 'ok'.
+-spec exchange_options() -> 'ok'.
%%--------------------------------------------------------------------
@@ -553,6 +557,25 @@ queue_vhost_field(Table) ->
sync_slave_pids, recoverable_slaves, policy, operator_policy,
gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost]).
+queue_options() ->
+ ok = queue_options(rabbit_queue),
+ ok = queue_options(rabbit_durable_queue),
+ ok.
+
+queue_options(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators,
+ State, PolicyVersion, SlavePidsPendingShutdown, VHost}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators,
+ State, PolicyVersion, SlavePidsPendingShutdown, VHost, #{}}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, recoverable_slaves, policy, operator_policy,
+ gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost, options]).
+
%% Prior to 3.6.0, passwords were hashed using MD5, this populates
%% existing records with said default. Users created with 3.6.0+ will
%% have internal_user.hashing_algorithm populated by the internal
@@ -571,6 +594,21 @@ topic_permission() ->
{attributes, [topic_permission_key, permission]},
{disc_copies, [node()]}]).
+exchange_options() ->
+ ok = exchange_options(rabbit_exchange),
+ ok = exchange_options(rabbit_durable_exchange).
+
+exchange_options(Table) ->
+ transform(
+ Table,
+ fun ({exchange, Name, Type, Dur, AutoDel, Internal,
+ Args, Scratches, Policy, OperatorPolicy, Decorators}) ->
+ {exchange, Name, Type, Dur, AutoDel, Internal,
+ Args, Scratches, Policy, OperatorPolicy, Decorators, #{}}
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches, policy,
+ operator_policy, decorators, options]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 6edb62425b..726b39cf28 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -20,14 +20,14 @@
%%----------------------------------------------------------------------------
--export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2,
+-export([add/2, delete/2, exists/1, list/0, with/2, assert/1, update/2,
set_limits/2, limits_of/1]).
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
-export([purge_messages/1]).
--spec add(rabbit_types:vhost()) -> 'ok'.
--spec delete(rabbit_types:vhost()) -> 'ok'.
+-spec add(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
+-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
-spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
-spec exists(rabbit_types:vhost()) -> boolean().
-spec list() -> [rabbit_types:vhost()].
@@ -46,7 +46,7 @@
-define(INFO_KEYS, [name, tracing]).
-add(VHostPath) ->
+add(VHostPath, ActingUser) ->
rabbit_log:info("Adding vhost '~s'~n", [VHostPath]),
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -62,7 +62,7 @@ add(VHostPath) ->
(ok, false) ->
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, Internal, []) ||
+ Type, true, false, Internal, [], ActingUser) ||
{Name, Type, Internal} <-
[{<<"">>, direct, false},
{<<"amq.direct">>, direct, false},
@@ -75,24 +75,26 @@ add(VHostPath) ->
{<<"amq.rabbitmq.trace">>, topic, true}]],
ok
end),
- rabbit_event:notify(vhost_created, info(VHostPath)),
+ rabbit_event:notify(vhost_created, info(VHostPath)
+ ++ [{user_who_performed_action, ActingUser}]),
R.
-delete(VHostPath) ->
+delete(VHostPath, ActingUser) ->
%% FIXME: We are forced to delete the queues and exchanges outside
%% the TX below. Queue deletion involves sending messages to the queue
%% process, which in turn results in further mnesia actions and
%% eventually the termination of that process. Exchange deletion causes
%% notifications which must be sent outside the TX
rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]),
- QDelFun = fun (Q) -> rabbit_amqqueue:delete(Q, false, false) end,
+ QDelFun = fun (Q) -> rabbit_amqqueue:delete(Q, false, false, ActingUser) end,
[assert_benign(rabbit_amqqueue:with(Name, QDelFun)) ||
#amqqueue{name = Name} <- rabbit_amqqueue:list(VHostPath)],
- [assert_benign(rabbit_exchange:delete(Name, false)) ||
+ [assert_benign(rabbit_exchange:delete(Name, false, ActingUser)) ||
#exchange{name = Name} <- rabbit_exchange:list(VHostPath)],
Funs = rabbit_misc:execute_mnesia_transaction(
- with(VHostPath, fun () -> internal_delete(VHostPath) end)),
- ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]),
+ with(VHostPath, fun () -> internal_delete(VHostPath, ActingUser) end)),
+ ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath},
+ {user_who_performed_action, ActingUser}]),
[ok = Fun() || Fun <- Funs],
ok.
@@ -117,18 +119,19 @@ assert_benign({error, {absent, Q, _}}) ->
{error, not_found} -> ok
end.
-internal_delete(VHostPath) ->
+internal_delete(VHostPath, ActingUser) ->
[ok = rabbit_auth_backend_internal:clear_permissions(
- proplists:get_value(user, Info), VHostPath)
+ proplists:get_value(user, Info), VHostPath, ActingUser)
|| Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)],
TopicPermissions = rabbit_auth_backend_internal:list_vhost_topic_permissions(VHostPath),
[ok = rabbit_auth_backend_internal:clear_topic_permissions(
proplists:get_value(user, TopicPermission), VHostPath) || TopicPermission <- TopicPermissions],
Fs1 = [rabbit_runtime_parameters:clear(VHostPath,
proplists:get_value(component, Info),
- proplists:get_value(name, Info))
+ proplists:get_value(name, Info),
+ ActingUser)
|| Info <- rabbit_runtime_parameters:list(VHostPath)],
- Fs2 = [rabbit_policy:delete(VHostPath, proplists:get_value(name, Info))
+ Fs2 = [rabbit_policy:delete(VHostPath, proplists:get_value(name, Info), ActingUser)
|| Info <- rabbit_policy:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
purge_messages(VHostPath),
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
index b79ca63642..b948a94668 100644
--- a/src/rabbit_vhost_limit.erl
+++ b/src/rabbit_vhost_limit.erl
@@ -21,10 +21,10 @@
-include("rabbit.hrl").
-export([register/0]).
--export([parse_set/2, set/2, clear/1]).
+-export([parse_set/3, set/3, clear/2]).
-export([list/0, list/1]).
--export([update_limit/3, clear_limit/2, get_limit/2]).
--export([validate/5, notify/4, notify_clear/3]).
+-export([update_limit/4, clear_limit/3, get_limit/2]).
+-export([validate/5, notify/5, notify_clear/4]).
-export([connection_limit/1, queue_limit/1,
is_over_queue_limit/1, is_over_connection_limit/1]).
@@ -45,12 +45,15 @@ validate(_VHost, <<"vhost-limits">>, Name, Term, _User) ->
rabbit_parameter_validation:proplist(
Name, vhost_limit_validation(), Term).
-notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits) ->
- rabbit_event:notify(vhost_limits_set, [{name, <<"limits">>} | Limits]),
+notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits, ActingUser) ->
+ rabbit_event:notify(vhost_limits_set, [{name, <<"limits">>},
+ {user_who_performed_action, ActingUser}
+ | Limits]),
update_vhost(VHost, Limits).
-notify_clear(VHost, <<"vhost-limits">>, <<"limits">>) ->
- rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>}]),
+notify_clear(VHost, <<"vhost-limits">>, <<"limits">>, ActingUser) ->
+ rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>},
+ {user_who_performed_action, ActingUser}]),
update_vhost(VHost, undefined).
connection_limit(VirtualHost) ->
@@ -129,38 +132,38 @@ is_over_queue_limit(VirtualHost) ->
%%----------------------------------------------------------------------------
-parse_set(VHost, Defn) ->
+parse_set(VHost, Defn, ActingUser) ->
Definition = rabbit_data_coercion:to_binary(Defn),
case rabbit_json:try_decode(Definition) of
{ok, Term} ->
- set(VHost, maps:to_list(Term));
+ set(VHost, maps:to_list(Term), ActingUser);
error ->
{error_string, "JSON decoding error"}
end.
-set(VHost, Defn) ->
+set(VHost, Defn, ActingUser) ->
rabbit_runtime_parameters:set_any(VHost, <<"vhost-limits">>,
- <<"limits">>, Defn, none).
+ <<"limits">>, Defn, ActingUser).
-clear(VHost) ->
+clear(VHost, ActingUser) ->
rabbit_runtime_parameters:clear_any(VHost, <<"vhost-limits">>,
- <<"limits">>).
+ <<"limits">>, ActingUser).
-update_limit(VHost, Name, Value) ->
+update_limit(VHost, Name, Value, ActingUser) ->
OldDef = case rabbit_runtime_parameters:list(VHost, <<"vhost-limits">>) of
[] -> [];
[Param] -> pget(value, Param, [])
end,
NewDef = [{Name, Value} | lists:keydelete(Name, 1, OldDef)],
- set(VHost, NewDef).
+ set(VHost, NewDef, ActingUser).
-clear_limit(VHost, Name) ->
+clear_limit(VHost, Name, ActingUser) ->
OldDef = case rabbit_runtime_parameters:list(VHost, <<"vhost-limits">>) of
[] -> [];
[Param] -> pget(value, Param, [])
end,
NewDef = lists:keydelete(Name, 1, OldDef),
- set(VHost, NewDef).
+ set(VHost, NewDef, ActingUser).
vhost_limit_validation() ->
[{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, optional},
diff --git a/test/dummy_runtime_parameters.erl b/test/dummy_runtime_parameters.erl
index d80ec785d0..0bf4b82d86 100644
--- a/test/dummy_runtime_parameters.erl
+++ b/test/dummy_runtime_parameters.erl
@@ -20,7 +20,7 @@
-include("rabbit.hrl").
--export([validate/5, notify/4, notify_clear/3]).
+-export([validate/5, notify/5, notify_clear/4]).
-export([register/0, unregister/0]).
-export([validate_policy/1]).
-export([register_policy_validator/0, unregister_policy_validator/0]).
@@ -43,8 +43,8 @@ validate(_, <<"test">>, <<"admin">>, _Term, User) ->
end;
validate(_, <<"test">>, _, _, _) -> {error, "meh", []}.
-notify(_, _, _, _) -> ok.
-notify_clear(_, _, _) -> ok.
+notify(_, _, _, _, _) -> ok.
+notify_clear(_, _, _, _) -> ok.
%----------------------------------------------------------------------------
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 502e3a7e86..1e32fae254 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -223,7 +223,7 @@ vhost_deletion(Config) ->
rabbit_ct_broker_helpers:set_ha_policy_all(Config),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
amqp_channel:call(ACh, #'queue.declare'{queue = <<"vhost_deletion-q">>}),
- ok = rpc:call(A, rabbit_vhost, delete, [<<"/">>]),
+ ok = rpc:call(A, rabbit_vhost, delete, [<<"/">>, <<"acting-user">>]),
ok.
promote_on_shutdown(Config) ->
diff --git a/test/queue_master_location_SUITE.erl b/test/queue_master_location_SUITE.erl
index e77f27f14b..8f9433beb4 100644
--- a/test/queue_master_location_SUITE.erl
+++ b/test/queue_master_location_SUITE.erl
@@ -245,7 +245,8 @@ unset_location_config(Config) ->
declare(Config, QueueName, Durable, AutoDelete, Args, Owner) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
{new, Queue} = rpc:call(Node, rabbit_amqqueue, declare,
- [QueueName, Durable, AutoDelete, Args, Owner]),
+ [QueueName, Durable, AutoDelete, Args, Owner,
+ <<"acting-user">>]),
Queue.
verify_min_master(Config, Q) ->
diff --git a/test/topic_permission_SUITE.erl b/test/topic_permission_SUITE.erl
index 0ae44bc69b..7b9d9f7701 100644
--- a/test/topic_permission_SUITE.erl
+++ b/test/topic_permission_SUITE.erl
@@ -70,13 +70,13 @@ topic_permission_database_access(Config) ->
topic_permission_database_access1(_Config) ->
0 = length(ets:tab2list(rabbit_topic_permission)),
- rabbit_vhost:add(<<"/">>),
- rabbit_vhost:add(<<"other-vhost">>),
- rabbit_auth_backend_internal:add_user(<<"guest">>, <<"guest">>),
- rabbit_auth_backend_internal:add_user(<<"dummy">>, <<"dummy">>),
+ rabbit_vhost:add(<<"/">>, <<"acting-user">>),
+ rabbit_vhost:add(<<"other-vhost">>, <<"acting-user">>),
+ rabbit_auth_backend_internal:add_user(<<"guest">>, <<"guest">>, <<"acting-user">>),
+ rabbit_auth_backend_internal:add_user(<<"dummy">>, <<"dummy">>, <<"acting-user">>),
rabbit_auth_backend_internal:set_topic_permissions(
- <<"guest">>, <<"/">>, <<"amq.topic">>, "^a", "^a"
+ <<"guest">>, <<"/">>, <<"amq.topic">>, "^a", "^a", <<"acting-user">>
),
1 = length(ets:tab2list(rabbit_topic_permission)),
1 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
@@ -88,7 +88,7 @@ topic_permission_database_access1(_Config) ->
1 = length(rabbit_auth_backend_internal:list_topic_permissions()),
rabbit_auth_backend_internal:set_topic_permissions(
- <<"guest">>, <<"other-vhost">>, <<"amq.topic">>, ".*", ".*"
+ <<"guest">>, <<"other-vhost">>, <<"amq.topic">>, ".*", ".*", <<"acting-user">>
),
2 = length(ets:tab2list(rabbit_topic_permission)),
2 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
@@ -100,10 +100,10 @@ topic_permission_database_access1(_Config) ->
2 = length(rabbit_auth_backend_internal:list_topic_permissions()),
rabbit_auth_backend_internal:set_topic_permissions(
- <<"guest">>, <<"/">>, <<"topic1">>, "^a", "^a"
+ <<"guest">>, <<"/">>, <<"topic1">>, "^a", "^a", <<"acting-user">>
),
rabbit_auth_backend_internal:set_topic_permissions(
- <<"guest">>, <<"/">>, <<"topic2">>, "^a", "^a"
+ <<"guest">>, <<"/">>, <<"topic2">>, "^a", "^a", <<"acting-user">>
),
4 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
@@ -111,25 +111,28 @@ topic_permission_database_access1(_Config) ->
1 = length(rabbit_auth_backend_internal:list_user_vhost_topic_permissions(<<"guest">>,<<"other-vhost">>)),
4 = length(rabbit_auth_backend_internal:list_topic_permissions()),
- rabbit_auth_backend_internal:clear_topic_permissions(<<"guest">>, <<"other-vhost">>),
+ rabbit_auth_backend_internal:clear_topic_permissions(<<"guest">>, <<"other-vhost">>,
+ <<"acting-user">>),
0 = length(rabbit_auth_backend_internal:list_vhost_topic_permissions(<<"other-vhost">>)),
3 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
- rabbit_auth_backend_internal:clear_topic_permissions(<<"guest">>, <<"/">>, <<"topic1">>),
+ rabbit_auth_backend_internal:clear_topic_permissions(<<"guest">>, <<"/">>, <<"topic1">>,
+ <<"acting-user">>),
2 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
- rabbit_auth_backend_internal:clear_topic_permissions(<<"guest">>, <<"/">>),
+ rabbit_auth_backend_internal:clear_topic_permissions(<<"guest">>, <<"/">>,
+ <<"acting-user">>),
0 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
{error, {no_such_user, _}} = (catch rabbit_auth_backend_internal:set_topic_permissions(
- <<"non-existing-user">>, <<"other-vhost">>, <<"amq.topic">>, ".*", ".*"
+ <<"non-existing-user">>, <<"other-vhost">>, <<"amq.topic">>, ".*", ".*", <<"acting-user">>
)),
{error, {no_such_vhost, _}} = (catch rabbit_auth_backend_internal:set_topic_permissions(
- <<"guest">>, <<"non-existing-vhost">>, <<"amq.topic">>, ".*", ".*"
+ <<"guest">>, <<"non-existing-vhost">>, <<"amq.topic">>, ".*", ".*", <<"acting-user">>
)),
{error, {no_such_user, _}} = (catch rabbit_auth_backend_internal:set_topic_permissions(
- <<"non-existing-user">>, <<"non-existing-vhost">>, <<"amq.topic">>, ".*", ".*"
+ <<"non-existing-user">>, <<"non-existing-vhost">>, <<"amq.topic">>, ".*", ".*", <<"acting-user">>
)),
{error, {no_such_user, _}} = (catch rabbit_auth_backend_internal:list_user_topic_permissions(
@@ -141,7 +144,7 @@ topic_permission_database_access1(_Config) ->
)),
{error, {invalid_regexp, _, _}} = (catch rabbit_auth_backend_internal:set_topic_permissions(
- <<"guest">>, <<"/">>, <<"amq.topic">>, "[", "^a"
+ <<"guest">>, <<"/">>, <<"amq.topic">>, "[", "^a", <<"acting-user">>
)),
ok.
@@ -159,11 +162,11 @@ topic_permission_checks1(_Config) ->
#vhost{virtual_host = <<"other-vhost">>},
write)
end),
- rabbit_auth_backend_internal:add_user(<<"guest">>, <<"guest">>),
- rabbit_auth_backend_internal:add_user(<<"dummy">>, <<"dummy">>),
+ rabbit_auth_backend_internal:add_user(<<"guest">>, <<"guest">>, <<"acting-user">>),
+ rabbit_auth_backend_internal:add_user(<<"dummy">>, <<"dummy">>, <<"acting-user">>),
rabbit_auth_backend_internal:set_topic_permissions(
- <<"guest">>, <<"/">>, <<"amq.topic">>, "^a", "^a"
+ <<"guest">>, <<"/">>, <<"amq.topic">>, "^a", "^a", <<"acting-user">>
),
1 = length(ets:tab2list(rabbit_topic_permission)),
1 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
@@ -172,7 +175,7 @@ topic_permission_checks1(_Config) ->
0 = length(rabbit_auth_backend_internal:list_vhost_topic_permissions(<<"other-vhost">>)),
rabbit_auth_backend_internal:set_topic_permissions(
- <<"guest">>, <<"other-vhost">>, <<"amq.topic">>, ".*", ".*"
+ <<"guest">>, <<"other-vhost">>, <<"amq.topic">>, ".*", ".*", <<"acting-user">>
),
2 = length(ets:tab2list(rabbit_topic_permission)),
2 = length(rabbit_auth_backend_internal:list_user_topic_permissions(<<"guest">>)),
@@ -215,4 +218,4 @@ topic_permission_checks1(_Config) ->
Perm,
Context
) || Perm <- Permissions],
- ok. \ No newline at end of file
+ ok.
diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl
index 91a3eb32a6..b1ebd054cf 100644
--- a/test/unit_inbroker_SUITE.erl
+++ b/test/unit_inbroker_SUITE.erl
@@ -794,7 +794,7 @@ bq_variable_queue_delete_msg_store_files_callback1(Config) ->
rabbit_amqqueue:declare(
queue_name(Config,
<<"bq_variable_queue_delete_msg_store_files_callback-q">>),
- true, false, [], none),
+ true, false, [], none, <<"acting-user">>),
Payload = <<0:8388608>>, %% 1MB
Count = 30,
publish_and_confirm(Q, Payload, Count),
@@ -811,7 +811,7 @@ bq_variable_queue_delete_msg_store_files_callback1(Config) ->
%% give the queue a second to receive the close_fds callback msg
timer:sleep(1000),
- rabbit_amqqueue:delete(Q, false, false),
+ rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>),
passed.
bq_queue_recover(Config) ->
@@ -822,7 +822,7 @@ bq_queue_recover1(Config) ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
{new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(queue_name(Config, <<"bq_queue_recover-q">>),
- true, false, [], none),
+ true, false, [], none, <<"acting-user">>),
publish_and_confirm(Q, <<>>, Count),
SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(QPid),
@@ -848,7 +848,7 @@ bq_queue_recover1(Config) ->
rabbit_variable_queue:fetch(true, VQ1),
CountMinusOne = rabbit_variable_queue:len(VQ2),
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
- ok = rabbit_amqqueue:internal_delete(QName)
+ ok = rabbit_amqqueue:internal_delete(QName, <<"acting-user">>)
end),
passed.
@@ -2166,12 +2166,12 @@ change_password1(_Config) ->
UserName = <<"test_user">>,
Password = <<"test_password">>,
case rabbit_auth_backend_internal:lookup_user(UserName) of
- {ok, _} -> rabbit_auth_backend_internal:delete_user(UserName);
+ {ok, _} -> rabbit_auth_backend_internal:delete_user(UserName, <<"acting-user">>);
_ -> ok
end,
ok = application:set_env(rabbit, password_hashing_module,
rabbit_password_hashing_md5),
- ok = rabbit_auth_backend_internal:add_user(UserName, Password),
+ ok = rabbit_auth_backend_internal:add_user(UserName, Password, <<"acting-user">>),
{ok, #auth_user{username = UserName}} =
rabbit_auth_backend_internal:user_login_authentication(
UserName, [{password, Password}]),
@@ -2182,7 +2182,8 @@ change_password1(_Config) ->
UserName, [{password, Password}]),
NewPassword = <<"test_password1">>,
- ok = rabbit_auth_backend_internal:change_password(UserName, NewPassword),
+ ok = rabbit_auth_backend_internal:change_password(UserName, NewPassword,
+ <<"acting-user">>),
{ok, #auth_user{username = UserName}} =
rabbit_auth_backend_internal:user_login_authentication(
UserName, [{password, NewPassword}]),
@@ -3004,14 +3005,14 @@ declare_on_dead_queue1(_Config, SecondaryNode) ->
fun () ->
{new, #amqqueue{name = QueueName, pid = QPid}} =
rabbit_amqqueue:declare(QueueName, false, false, [],
- none),
+ none, <<"acting-user">>),
exit(QPid, kill),
Self ! {self(), killed, QPid}
end),
receive
{Pid, killed, OldPid} ->
Q = dead_queue_loop(QueueName, OldPid),
- {ok, 0} = rabbit_amqqueue:delete(Q, false, false),
+ {ok, 0} = rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>),
passed
after ?TIMEOUT -> throw(failed_to_create_and_kill_queue)
end.
@@ -3038,9 +3039,9 @@ refresh_events1(Config, SecondaryNode) ->
{new, #amqqueue{name = QName} = Q} =
rabbit_amqqueue:declare(queue_name(Config, <<"refresh_events-q">>),
- false, false, [], none),
+ false, false, [], none, <<"acting-user">>),
expect_events(name, QName, queue_created),
- rabbit_amqqueue:delete(Q, false, false),
+ rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>),
dummy_event_receiver:stop(),
passed.
@@ -3074,7 +3075,8 @@ must_exit(Fun) ->
end.
dead_queue_loop(QueueName, OldPid) ->
- {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none),
+ {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none,
+ <<"acting-user">>),
case Q#amqqueue.pid of
OldPid -> timer:sleep(25),
dead_queue_loop(QueueName, OldPid);