diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2019-07-08 18:41:25 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2019-07-08 18:41:25 +0300 |
| commit | 50ac865517c84e01a122e5a8e92a1313b7114d67 (patch) | |
| tree | cebda5964958361142a6d18d01fb1958058c6304 /src | |
| parent | e151ebfe2a50a5e9418e12a6712545aa7054cc40 (diff) | |
| parent | d35182b709ed91b12b9b988ba28d1dcbf063370d (diff) | |
| download | rabbitmq-server-git-50ac865517c84e01a122e5a8e92a1313b7114d67.tar.gz | |
Merge branch 'master' into management-only-api
Diffstat (limited to 'src')
| -rw-r--r-- | src/amqqueue.erl | 83 | ||||
| -rw-r--r-- | src/amqqueue_v1.erl | 208 | ||||
| -rw-r--r-- | src/rabbit.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_access_control.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_auth_backend_internal.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_auth_mechanism_plain.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 41 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_log_tail.erl | 95 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 89 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 43 |
15 files changed, 599 insertions, 80 deletions
diff --git a/src/amqqueue.erl b/src/amqqueue.erl index dda2642c34..35e7f0c4c4 100644 --- a/src/amqqueue.erl +++ b/src/amqqueue.erl @@ -19,7 +19,9 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include("amqqueue.hrl"). --export([new/9, +-export([new/8, + new/9, + new_with_version/9, new_with_version/10, fields/0, fields/1, @@ -184,6 +186,40 @@ pid() | none, rabbit_framing:amqp_table(), rabbit_types:vhost() | undefined, + map()) -> amqqueue(). + +new(#resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options) + when (is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) -> + new(Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + ?amqqueue_v1_type). + +-spec new(rabbit_amqqueue:name(), + pid() | ra_server_id() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, map(), atom()) -> amqqueue(). @@ -238,6 +274,44 @@ new(#resource{kind = queue} = Name, pid() | none, rabbit_framing:amqp_table(), rabbit_types:vhost() | undefined, + map()) -> amqqueue(). + +new_with_version(RecordVersion, + #resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options) + when (is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) -> + new_with_version(RecordVersion, + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + ?amqqueue_v1_type). + +-spec new_with_version +(amqqueue_v1 | amqqueue_v2, + rabbit_amqqueue:name(), + pid() | ra_server_id() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, map(), atom()) -> amqqueue(). @@ -359,6 +433,8 @@ get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> get_exclusive_owner(Queue) -> amqqueue_v1:get_exclusive_owner(Queue). +% gm_pids + -spec get_gm_pids(amqqueue()) -> [{pid(), pid()} | pid()] | none. get_gm_pids(#amqqueue{gm_pids = GMPids}) -> @@ -421,7 +497,7 @@ get_pid(#amqqueue{pid = Pid}) -> Pid; get_pid(Queue) -> amqqueue_v1:get_pid(Queue). -spec set_pid -(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2(); +(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2(); (amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1(). set_pid(#amqqueue{} = Queue, Pid) -> @@ -509,7 +585,8 @@ set_slave_pids(Queue, SlavePids) -> -spec get_slave_pids_pending_shutdown(amqqueue()) -> [pid()]. -get_slave_pids_pending_shutdown(#amqqueue{slave_pids_pending_shutdown = Slaves}) -> +get_slave_pids_pending_shutdown( + #amqqueue{slave_pids_pending_shutdown = Slaves}) -> Slaves; get_slave_pids_pending_shutdown(Queue) -> amqqueue_v1:get_slave_pids_pending_shutdown(Queue). diff --git a/src/amqqueue_v1.erl b/src/amqqueue_v1.erl index 0a0073075f..9b739026c6 100644 --- a/src/amqqueue_v1.erl +++ b/src/amqqueue_v1.erl @@ -17,9 +17,12 @@ -module(amqqueue_v1). -include_lib("rabbit_common/include/resource.hrl"). +-include("amqqueue.hrl"). -export([new/8, + new/9, new_with_version/9, + new_with_version/10, fields/0, fields/1, field_vhost/0, @@ -37,7 +40,8 @@ % gm_pids get_gm_pids/1, set_gm_pids/2, - % name + get_leader/1, + % name (#resource) get_name/1, set_name/2, % operator_policy @@ -53,6 +57,9 @@ % policy_version get_policy_version/1, set_policy_version/2, + % quorum_nodes + get_quorum_nodes/1, + set_quorum_nodes/2, % recoverable_slaves get_recoverable_slaves/1, set_recoverable_slaves/2, @@ -68,14 +75,19 @@ % sync_slave_pids get_sync_slave_pids/1, set_sync_slave_pids/2, + get_type/1, get_vhost/1, is_amqqueue/1, is_auto_delete/1, is_durable/1, + is_classic/1, + is_quorum/1, pattern_match_all/0, pattern_match_on_name/1, + pattern_match_on_type/1, reset_mirroring_and_decorators/1, set_immutable/1, + qnode/1, macros/0]). -define(record_version, ?MODULE). @@ -192,6 +204,42 @@ new(#resource{kind = queue} = Name, VHost, Options). +-spec new(rabbit_amqqueue:name(), + pid() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, + map(), + ?amqqueue_v1_type) -> amqqueue(). + +new(#resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + ?amqqueue_v1_type) + when (is_pid(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) -> + new( + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options). + -spec new_with_version(amqqueue_v1, rabbit_amqqueue:name(), pid() | none, @@ -227,6 +275,45 @@ new_with_version(?record_version, vhost = VHost, options = Options}. +-spec new_with_version(amqqueue_v1, + rabbit_amqqueue:name(), + pid() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, + map(), + ?amqqueue_v1_type) -> amqqueue(). + +new_with_version(?record_version, + #resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + ?amqqueue_v1_type) + when (is_pid(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) -> + new_with_version( + ?record_version, + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options). + -spec is_amqqueue(any()) -> boolean(). is_amqqueue(#amqqueue{}) -> true; @@ -239,8 +326,7 @@ record_version_to_use() -> -spec upgrade(amqqueue()) -> amqqueue(). -upgrade(#amqqueue{} = Queue) -> - Queue. +upgrade(#amqqueue{} = Queue) -> Queue. -spec upgrade_to(amqqueue_v1, amqqueue()) -> amqqueue(). @@ -260,66 +346,120 @@ set_arguments(#amqqueue{} = Queue, Args) -> % decorators +-spec get_decorators(amqqueue()) -> [atom()] | none | undefined. + get_decorators(#amqqueue{decorators = Decorators}) -> Decorators. +-spec set_decorators(amqqueue(), [atom()] | none | undefined) -> amqqueue(). + set_decorators(#amqqueue{} = Queue, Decorators) -> Queue#amqqueue{decorators = Decorators}. +-spec get_exclusive_owner(amqqueue()) -> pid() | none. + get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> Owner. % gm_pids +-spec get_gm_pids(amqqueue()) -> [{pid(), pid()} | pid()] | none. + get_gm_pids(#amqqueue{gm_pids = GMPids}) -> GMPids. +-spec set_gm_pids(amqqueue(), [{pid(), pid()} | pid()] | none) -> amqqueue(). + set_gm_pids(#amqqueue{} = Queue, GMPids) -> Queue#amqqueue{gm_pids = GMPids}. -% name - -get_name(#amqqueue{name = Name}) -> Name. +-spec get_leader(amqqueue_v1()) -> no_return(). -set_name(#amqqueue{} = Queue, Name) -> - Queue#amqqueue{name = Name}. +get_leader(_) -> throw({unsupported, ?record_version, get_leader}). % operator_policy +-spec get_operator_policy(amqqueue()) -> binary() | none | undefined. + get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy. +-spec set_operator_policy(amqqueue(), binary() | none | undefined) -> + amqqueue(). + set_operator_policy(#amqqueue{} = Queue, OpPolicy) -> Queue#amqqueue{operator_policy = OpPolicy}. +% name + +-spec get_name(amqqueue()) -> rabbit_amqqueue:name(). + +get_name(#amqqueue{name = Name}) -> Name. + +-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue(). + +set_name(#amqqueue{} = Queue, Name) -> + Queue#amqqueue{name = Name}. + +-spec get_options(amqqueue()) -> map(). + get_options(#amqqueue{options = Options}) -> Options. % pid +-spec get_pid +(amqqueue_v1:amqqueue_v1()) -> pid() | none. + get_pid(#amqqueue{pid = Pid}) -> Pid. +-spec set_pid +(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1(). + set_pid(#amqqueue{} = Queue, Pid) -> Queue#amqqueue{pid = Pid}. % policy +-spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined. + get_policy(#amqqueue{policy = Policy}) -> Policy. +-spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue(). + set_policy(#amqqueue{} = Queue, Policy) -> Queue#amqqueue{policy = Policy}. % policy_version +-spec get_policy_version(amqqueue()) -> non_neg_integer(). + get_policy_version(#amqqueue{policy_version = PV}) -> PV. +-spec set_policy_version(amqqueue(), non_neg_integer()) -> amqqueue(). + set_policy_version(#amqqueue{} = Queue, PV) -> Queue#amqqueue{policy_version = PV}. % recoverable_slaves +-spec get_recoverable_slaves(amqqueue()) -> [atom()] | none. + get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) -> Slaves. +-spec set_recoverable_slaves(amqqueue(), [atom()] | none) -> amqqueue(). + set_recoverable_slaves(#amqqueue{} = Queue, Slaves) -> Queue#amqqueue{recoverable_slaves = Slaves}. +% quorum_nodes (new in v2) + +-spec get_quorum_nodes(amqqueue()) -> no_return(). + +get_quorum_nodes(_) -> throw({unsupported, ?record_version, get_quorum_nodes}). + +-spec set_quorum_nodes(amqqueue(), [node()]) -> no_return(). + +set_quorum_nodes(_, _) -> + throw({unsupported, ?record_version, set_quorum_nodes}). + % slave_pids get_slave_pids(#amqqueue{slave_pids = Slaves}) -> @@ -330,7 +470,8 @@ set_slave_pids(#amqqueue{} = Queue, SlavePids) -> % slave_pids_pending_shutdown -get_slave_pids_pending_shutdown(#amqqueue{slave_pids_pending_shutdown = Slaves}) -> +get_slave_pids_pending_shutdown( + #amqqueue{slave_pids_pending_shutdown = Slaves}) -> Slaves. set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) -> @@ -338,23 +479,55 @@ set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) -> % state +-spec get_state(amqqueue()) -> atom() | none. + get_state(#amqqueue{state = State}) -> State. -set_state(#amqqueue{} = Queue, State) -> Queue#amqqueue{state = State}. +-spec set_state(amqqueue(), atom() | none) -> amqqueue(). + +set_state(#amqqueue{} = Queue, State) -> + Queue#amqqueue{state = State}. % sync_slave_pids -get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) -> Pids. +-spec get_sync_slave_pids(amqqueue()) -> [pid()] | none. + +get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) -> + Pids. + +-spec set_sync_slave_pids(amqqueue(), [pid()] | none) -> amqqueue(). set_sync_slave_pids(#amqqueue{} = Queue, Pids) -> Queue#amqqueue{sync_slave_pids = Pids}. +%% New in v2. + +-spec get_type(amqqueue()) -> atom(). + +get_type(Queue) when ?is_amqqueue(Queue) -> ?amqqueue_v1_type. + +-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined. + get_vhost(#amqqueue{vhost = VHost}) -> VHost. +-spec is_auto_delete(amqqueue()) -> boolean(). + is_auto_delete(#amqqueue{auto_delete = AutoDelete}) -> AutoDelete. +-spec is_durable(amqqueue()) -> boolean(). + is_durable(#amqqueue{durable = Durable}) -> Durable. +-spec is_classic(amqqueue()) -> boolean(). + +is_classic(Queue) -> + get_type(Queue) =:= ?amqqueue_v1_type. + +-spec is_quorum(amqqueue()) -> boolean(). + +is_quorum(Queue) -> + get_type(Queue) =:= quorum. + fields() -> fields(?record_version). fields(?record_version) -> record_info(fields, amqqueue). @@ -370,6 +543,11 @@ pattern_match_all() -> #amqqueue{_ = '_'}. pattern_match_on_name(Name) -> #amqqueue{name = Name, _ = '_'}. +-spec pattern_match_on_type(atom()) -> no_return(). + +pattern_match_on_type(_) -> + throw({unsupported, ?record_version, pattern_match_on_type}). + reset_mirroring_and_decorators(#amqqueue{} = Queue) -> Queue#amqqueue{slave_pids = [], sync_slave_pids = [], @@ -386,6 +564,14 @@ set_immutable(#amqqueue{} = Queue) -> decorators = none, state = none}. +-spec qnode(amqqueue() | pid()) -> node(). + +qnode(Queue) when ?is_amqqueue(Queue) -> + QPid = get_pid(Queue), + qnode(QPid); +qnode(QPid) when is_pid(QPid) -> + node(QPid). + macros() -> io:format( "-define(is_~s(Q), is_record(Q, amqqueue, ~b)).~n~n", diff --git a/src/rabbit.erl b/src/rabbit.erl index 4a974fd682..fdf2138515 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -33,8 +33,6 @@ -export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent -export([is_booted/1, is_booted/0, is_booting/1, is_booting/0]). --deprecated([{force_event_refresh, 1, eventually}]). - -ifdef(TEST). -export([start_logger/0]). @@ -539,6 +537,12 @@ start_loaded_apps(Apps, RestartTypes) -> application:set_env(ra, logger_module, rabbit_log_ra_shim), %% use a larger segments size for queues application:set_env(ra, segment_max_entries, 32768), + case application:get_env(ra, wal_max_size_bytes) of + undefined -> + application:set_env(ra, wal_max_size_bytes, 536870912); %% 5 * 2 ^ 20 + _ -> + ok + end, ConfigEntryDecoder = case application:get_env(rabbit, config_entry_decoder) of undefined -> []; @@ -1117,17 +1121,16 @@ start_logger() -> log_locations() -> rabbit_lager:log_locations(). -%% This feature was used by the management API up-to and including -%% RabbitMQ 3.7.x. It is unused in 3.8.x and thus deprecated. We keep it -%% to support in-place upgrades to 3.8.x (i.e. mixed-version clusters). - -spec force_event_refresh(reference()) -> 'ok'. +% Note: https://www.pivotaltracker.com/story/show/166962656 +% This event is necessary for the stats timer to be initialized with +% the correct values once the management agent has started force_event_refresh(Ref) -> - rabbit_direct:force_event_refresh(Ref), - rabbit_networking:force_connection_event_refresh(Ref), - rabbit_channel:force_event_refresh(Ref), - rabbit_amqqueue:force_event_refresh(Ref). + ok = rabbit_direct:force_event_refresh(Ref), + ok = rabbit_networking:force_connection_event_refresh(Ref), + ok = rabbit_channel:force_event_refresh(Ref), + ok = rabbit_amqqueue:force_event_refresh(Ref). %%--------------------------------------------------------------------------- %% misc diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index d04f0047de..4c68fe2eab 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -21,6 +21,8 @@ -export([check_user_pass_login/2, check_user_login/2, check_user_loopback/2, check_vhost_access/4, check_resource_access/4, check_topic_access/4]). +-export([permission_cache_can_expire/1, update_state/2]). + %%---------------------------------------------------------------------------- -export_type([permission_atom/0]). @@ -217,3 +219,38 @@ check_access(Fun, Module, ErrStr, ErrArgs, ErrName) -> rabbit_log:error(FullErrStr, FullErrArgs), rabbit_misc:protocol_error(ErrName, FullErrStr, FullErrArgs) end. + +-spec update_state(User :: rabbit_types:user(), NewState :: term()) -> + {'ok', rabbit_types:auth_user()} | + {'refused', string()} | + {'error', any()}. + +update_state(User = #user{authz_backends = Backends0}, NewState) -> + %% N.B.: we use foldl/3 and prepending, so the final list of + %% backends is in reverse order from the original list. + Backends = lists:foldl( + fun({Module, Impl}, {ok, Acc}) -> + case Module:state_can_expire() of + true -> + case Module:update_state(auth_user(User, Impl), NewState) of + {ok, #auth_user{impl = Impl1}} -> + {ok, [{Module, Impl1} | Acc]}; + Else -> Else + end; + false -> + {ok, [{Module, Impl} | Acc]} + end; + (_, {error, _} = Err) -> Err; + (_, {refused, _, _} = Err) -> Err + end, {ok, []}, Backends0), + case Backends of + {ok, Pairs} -> {ok, User#user{authz_backends = lists:reverse(Pairs)}}; + Else -> Else + end. + +-spec permission_cache_can_expire(User :: rabbit_types:user()) -> boolean(). + +%% Returns true if any of the backends support credential expiration, +%% otherwise returns false. +permission_cache_can_expire(#user{authz_backends = Backends}) -> + lists:any(fun ({Module, _State}) -> Module:state_can_expire() end, Backends). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index bfacef6d49..cbe8738c5a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -52,8 +52,6 @@ -export([pid_of/1, pid_of/2]). -export([mark_local_durable_queues_stopped/1]). --deprecated([{force_event_refresh, 1, eventually}]). - %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2, @@ -1037,6 +1035,9 @@ list_local(VHostPath) -> -spec force_event_refresh(reference()) -> 'ok'. +% Note: https://www.pivotaltracker.com/story/show/166962656 +% This event is necessary for the stats timer to be initialized with +% the correct values once the management agent has started force_event_refresh(Ref) -> [gen_server2:cast(amqqueue:get_pid(Q), {force_event_refresh, Ref}) || Q <- list()], diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2185d7c95f..46db98ba5a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1615,6 +1615,9 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, run_message_queue(true, State1) end); +% Note: https://www.pivotaltracker.com/story/show/166962656 +% This event is necessary for the stats timer to be initialized with +% the correct values once the management agent has started handle_cast({force_event_refresh, Ref}, State = #q{consumers = Consumers, active_consumer = Holder}) -> diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index e16b14734b..e675ad188b 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -40,6 +40,8 @@ list_user_vhost_permissions/2, list_user_topic_permissions/1, list_vhost_topic_permissions/1, list_user_vhost_topic_permissions/2]). +-export([state_can_expire/0]). + %% for testing -export([hashing_module_for_user/1, expand_topic_permission/2]). @@ -93,6 +95,8 @@ user_login_authentication(Username, AuthProps) -> false -> exit({unknown_auth_props, Username, AuthProps}) end. +state_can_expire() -> false. + user_login_authorization(Username, _AuthProps) -> case user_login_authentication(Username, []) of {ok, #auth_user{impl = Impl, tags = Tags}} -> {ok, Impl, Tags}; @@ -123,7 +127,7 @@ check_vhost_access(#auth_user{username = Username}, VHostPath, _AuthzData) -> check_resource_access(#auth_user{username = Username}, #resource{virtual_host = VHostPath, name = Name}, - Permission, + Permission, _AuthContext) -> case mnesia:dirty_read({rabbit_user_permission, #user_vhost{username = Username, diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index cfc1a0ce18..706e5eedfa 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -31,9 +31,6 @@ %% SASL PLAIN, as used by the Qpid Java client and our clients. Also, %% apparently, by OpenAMQ. -%% TODO: reimplement this using the binary module? - that makes use of -%% BIFs to do binary matching and will thus be much faster. - description() -> [{description, <<"SASL PLAIN authentication mechanism">>}]. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f5c9e8dfce..d16929d962 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -63,15 +63,13 @@ -export([refresh_config_local/0, ready_for_close/1]). -export([refresh_interceptors/0]). -export([force_event_refresh/1]). --export([source/2]). +-export([source/2, update_user_state/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1, prioritise_call/4, prioritise_cast/3, prioritise_info/3, format_message_queue/2]). --deprecated([{force_event_refresh, 1, eventually}]). - %% Internal -export([list_local/0, emit_info_local/3, deliver_reply_local/3]). -export([get_vhost/1, get_user/1]). @@ -452,6 +450,9 @@ ready_for_close(Pid) -> -spec force_event_refresh(reference()) -> 'ok'. +% Note: https://www.pivotaltracker.com/story/show/166962656 +% This event is necessary for the stats timer to be initialized with +% the correct values once the management agent has started force_event_refresh(Ref) -> [gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()], ok. @@ -459,11 +460,21 @@ force_event_refresh(Ref) -> list_queue_states(Pid) -> gen_server2:call(Pid, list_queue_states). --spec source(pid(), any()) -> any(). +-spec source(pid(), any()) -> 'ok' | {error, channel_terminated}. source(Pid, Source) when is_pid(Pid) -> case erlang:is_process_alive(Pid) of - true -> Pid ! {channel_source, Source}; + true -> Pid ! {channel_source, Source}, + ok; + false -> {error, channel_terminated} + end. + +-spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}. + +update_user_state(Pid, UserState) when is_pid(Pid) -> + case erlang:is_process_alive(Pid) of + true -> Pid ! {update_user_state, UserState}, + ok; false -> {error, channel_terminated} end. @@ -489,6 +500,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, _ -> Limiter0 end, + %% Process dictionary is used here because permission cache already uses it. MK. + put(permission_cache_can_expire, rabbit_access_control:permission_cache_can_expire(User)), MaxMessageSize = get_max_message_size(), ConsumerTimeout = get_consumer_timeout(), State = #ch{cfg = #conf{state = starting, @@ -691,6 +704,9 @@ handle_cast({send_drained, CTagCredit}, || {ConsumerTag, CreditDrained} <- CTagCredit], noreply(State); +% Note: https://www.pivotaltracker.com/story/show/166962656 +% This event is necessary for the stats timer to be initialized with +% the correct values once the management agent has started handle_cast({force_event_refresh, Ref}, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State), Ref), @@ -838,6 +854,10 @@ handle_info({{Ref, Node}, LateAnswer}, noreply(State); handle_info(tick, State0 = #ch{queue_states = QueueStates0}) -> + case get(permission_cache_can_expire) of + true -> ok = clear_permission_cache(); + _ -> ok + end, QueueStates1 = maps:filter(fun(_, QS) -> QName = rabbit_quorum_queue:queue_name(QS), @@ -850,7 +870,10 @@ handle_info(tick, State0 = #ch{queue_states = QueueStates0}) -> Return end; handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) -> - noreply(State#ch{cfg = Cfg#conf{source = Source}}). + noreply(State#ch{cfg = Cfg#conf{source = Source}}); +handle_info({update_user_state, User}, State = #ch{cfg = Cfg}) -> + noreply(State#ch{cfg = Cfg#conf{user = User}}). + handle_pre_hibernate(State0) -> ok = clear_permission_cache(), @@ -973,7 +996,7 @@ return_queue_declare_ok(#resource{name = ActualName}, check_resource_access(User, Resource, Perm, Context) -> V = {Resource, Context, Perm}, - + Cache = case get(permission_cache) of undefined -> []; Other -> Other @@ -1051,8 +1074,8 @@ check_topic_authorisation(_, _, _, _, _, _) -> ok. check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic}, - User = #user{username = Username}, - AmqpParams, RoutingKey, Permission) -> + User = #user{username = Username}, + AmqpParams, RoutingKey, Permission) -> Resource = Name#resource{kind = topic}, VariableMap = build_topic_variable_map(AmqpParams, VHost, Username), Context = #{routing_key => RoutingKey, diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index b918d2e671..b84b4d91bb 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -132,7 +132,9 @@ extract_protocol(Infos) -> maybe_call_connection_info_module(Protocol, Creds, VHost, Pid, Infos) -> Module = rabbit_data_coercion:to_atom(string:to_lower( - "rabbit_" ++ rabbit_data_coercion:to_list(Protocol) ++ "_connection_info") + "rabbit_" ++ + lists:flatten(string:replace(rabbit_data_coercion:to_list(Protocol), " ", "_", all)) ++ + "_connection_info") ), Args = [Creds, VHost, Pid, Infos], code_server_cache:maybe_call_mfa(Module, additional_authn_params, Args, []). diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index d9566ea8f7..cfb1ac7fda 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -64,7 +64,9 @@ make_credit/4, make_purge/0, make_purge_nodes/1, - make_update_config/1 + make_update_config/1, + + from_log/2 ]). %% command records representing all the protocol actions that are supported @@ -754,6 +756,24 @@ usage(Name) when is_atom(Name) -> [{_, Use}] -> Use end. +from_log(Log, State0) -> + lists:foldl( + fun ({Idx, Term, {'$usr', Meta0, Cmd, _}}, {S0, Effs}) -> + Meta = Meta0#{index => Idx, + term => Term}, + case apply(Meta, Cmd, S0) of + {S, _, E} when is_list(E) -> + {S, Effs ++ E}; + {S, _, E} -> + {S, Effs ++ [E]}; + {S, _} -> + {S, Effs} + end; + (_, Acc) -> + Acc + end, {State0, []}, Log). + + %%% Internal messages_ready(#?MODULE{messages = M, diff --git a/src/rabbit_log_tail.erl b/src/rabbit_log_tail.erl new file mode 100644 index 0000000000..555164213b --- /dev/null +++ b/src/rabbit_log_tail.erl @@ -0,0 +1,95 @@ +-module(rabbit_log_tail). + +-export([tail_n_lines/2]). +-export([init_tail_stream/4]). + +-define(GUESS_OFFSET, 200). + +init_tail_stream(Filename, Pid, Ref, Duration) -> + RPCProc = self(), + Reader = spawn(fun() -> + link(Pid), + case file:open(Filename, [read, binary]) of + {ok, File} -> + TimeLimit = case Duration of + infinity -> infinity; + _ -> erlang:system_time(second) + Duration + end, + {ok, _} = file:position(File, eof), + RPCProc ! {Ref, opened}, + read_loop(File, Pid, Ref, TimeLimit); + {error, _} = Err -> + RPCProc ! {Ref, Err} + end + end), + receive + {Ref, opened} -> {ok, Ref}; + {Ref, {error, Err}} -> {error, Err} + after 5000 -> + exit(Reader, timeout), + {error, timeout} + end. + +read_loop(File, Pid, Ref, TimeLimit) -> + case is_integer(TimeLimit) andalso erlang:system_time(second) > TimeLimit of + true -> Pid ! {Ref, <<>>, finished}; + false -> + case file:read(File, ?GUESS_OFFSET) of + {ok, Data} -> + Pid ! {Ref, Data, confinue}, + read_loop(File, Pid, Ref, TimeLimit); + eof -> + timer:sleep(1000), + read_loop(File, Pid, Ref, TimeLimit); + {error, _} = Err -> + Pid ! {Ref, Err, finished} + end + end. + +tail_n_lines(Filename, N) -> + case file:open(Filename, [read, binary]) of + {ok, File} -> + {ok, Eof} = file:position(File, eof), + %% Eof may move. Only read up to the current one. + Result = reverse_read_n_lines(N, N, File, Eof, Eof), + file:close(File), + Result; + {error, _} = Error -> Error + end. + +reverse_read_n_lines(N, OffsetN, File, Position, Eof) -> + GuessPosition = offset(Position, OffsetN), + case read_lines_from_position(File, GuessPosition, Eof) of + {ok, Lines} -> + NLines = length(Lines), + case {NLines >= N, GuessPosition == 0} of + %% Take only N lines if there is more + {true, _} -> lists:nthtail(NLines - N, Lines); + %% Safe to assume that NLines is less then N + {_, true} -> Lines; + %% Adjust position + _ -> + reverse_read_n_lines(N, N - NLines + 1, File, GuessPosition, Eof) + end; + {error, _} = Error -> Error + end. + +read_from_position(File, GuessPosition, Eof) -> + file:pread(File, GuessPosition, max(0, Eof - GuessPosition)). + +read_lines_from_position(File, GuessPosition, Eof) -> + case read_from_position(File, GuessPosition, Eof) of + {ok, Data} -> + Lines = binary:split(Data, <<"\n">>, [global, trim]), + case {GuessPosition, Lines} of + %% If position is 0 - there are no partial lines + {0, _} -> {ok, Lines}; + %% Remove first line as it can be partial + {_, [_ | Rest]} -> {ok, Rest}; + {_, []} -> {ok, []} + end; + {error, _} = Error -> Error + end. + +offset(Base, N) -> + max(0, Base - N * ?GUESS_OFFSET).
\ No newline at end of file diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 162badc33d..bd1c26f20f 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -45,7 +45,10 @@ %% Hooks used in `rabbit_node_monitor' on_node_up/1, - on_node_down/1 + on_node_down/1, + + %% Helpers for diagnostics commands + schema_info/1 ]). %% Used internally in rpc calls @@ -755,6 +758,18 @@ running_disc_nodes() -> ordsets:from_list(RunningNodes))). %%-------------------------------------------------------------------- +%% Helpers for diagnostics commands +%%-------------------------------------------------------------------- + +schema_info(Items) -> + Tables = mnesia:system_info(tables), + [info(Table, Items) || Table <- Tables]. + +info(Table, Items) -> + All = [{name, Table} | mnesia:table_info(Table, all)], + [{Item, proplists:get_value(Item, All)} || Item <- Items]. + +%%-------------------------------------------------------------------- %% Internal helpers %%-------------------------------------------------------------------- diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 75bd9515f1..eb34b9db9f 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -32,7 +32,7 @@ -export([rpc_delete_metrics/1]). -export([format/1]). -export([open_files/1]). --export([add_member/3]). +-export([add_member/4]). -export([delete_member/3]). -export([requeue/3]). -export([policy_changed/2]). @@ -69,6 +69,7 @@ -define(RPC_TIMEOUT, 1000). -define(TICK_TIMEOUT, 5000). %% the ra server tick time -define(DELETE_TIMEOUT, 5000). +-define(ADD_MEMBER_TIMEOUT, 5000). %%---------------------------------------------------------------------------- @@ -119,23 +120,9 @@ declare(Q) when ?amqqueue_is_quorum(Q) -> NewQ1 = amqqueue:set_quorum_nodes(NewQ0, Nodes), case rabbit_amqqueue:internal_declare(NewQ1, false) of {created, NewQ} -> - RaMachine = ra_machine(NewQ), - ServerIds = [{RaName, Node} || Node <- Nodes], - ClusterName = RaName, TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT), - RaConfs = [begin - UId = ra:new_uid(ra_lib:to_binary(ClusterName)), - FName = rabbit_misc:rs(QName), - #{cluster_name => ClusterName, - id => ServerId, - uid => UId, - friendly_name => FName, - initial_members => ServerIds, - log_init_args => #{uid => UId}, - tick_timeout => TickTimeout, - machine => RaMachine} - end || ServerId <- ServerIds], - + RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout) + || ServerId <- members(NewQ)], case ra:start_cluster(RaConfs) of {ok, _, _} -> rabbit_event:notify(queue_created, @@ -323,7 +310,6 @@ reductions(Name) -> recover(Queues) -> [begin {Name, _} = amqqueue:get_pid(Q0), - Nodes = amqqueue:get_quorum_nodes(Q0), case ra:restart_server({Name, node()}) of ok -> % queue was restarted, good @@ -333,10 +319,12 @@ recover(Queues) -> Err1 == name_not_registered -> % queue was never started on this node % so needs to be started from scratch. - Machine = ra_machine(Q0), - RaNodes = [{Name, Node} || Node <- Nodes], - case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of - ok -> ok; + TickTimeout = application:get_env(rabbit, quorum_tick_interval, + ?TICK_TIMEOUT), + Conf = make_ra_conf(Q0, {Name, node()}, TickTimeout), + case ra:start_server(Conf) of + ok -> + ok; Err2 -> rabbit_log:warning("recover: quorum queue ~w could not" " be started ~w", [Name, Err2]), @@ -698,7 +686,7 @@ get_sys_status(Proc) -> end. -add_member(VHost, Name, Node) -> +add_member(VHost, Name, Node, Timeout) -> QName = #resource{virtual_host = VHost, name = Name, kind = queue}, case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> @@ -714,23 +702,25 @@ add_member(VHost, Name, Node) -> %% idempotent by design ok; false -> - add_member(Q, Node) + add_member(Q, Node, Timeout) end end; {error, not_found} = E -> E end. -add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> - {RaName, _} = ServerRef = amqqueue:get_pid(Q), +add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> + {RaName, _} = amqqueue:get_pid(Q), QName = amqqueue:get_name(Q), - QNodes = amqqueue:get_quorum_nodes(Q), %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes ServerId = {RaName, Node}, - case ra:start_server(RaName, ServerId, ra_machine(Q), - [{RaName, N} || N <- QNodes]) of + Members = members(Q), + TickTimeout = application:get_env(rabbit, quorum_tick_interval, + ?TICK_TIMEOUT), + Conf = make_ra_conf(Q, ServerId, TickTimeout), + case ra:start_server(Conf) of ok -> - case ra:add_member(ServerRef, ServerId) of + case ra:add_member(Members, ServerId, Timeout) of {ok, _, Leader} -> Fun = fun(Q1) -> Q2 = amqqueue:set_quorum_nodes( @@ -742,9 +732,11 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> fun() -> rabbit_amqqueue:update(QName, Fun) end), ok; {timeout, _} -> + _ = ra:force_delete_server(ServerId), + _ = ra:remove_member(Members, ServerId), {error, timeout}; E -> - %% TODO should we stop the ra process here? + _ = ra:force_delete_server(ServerId), E end; E -> @@ -769,16 +761,18 @@ delete_member(VHost, Name, Node) -> E end. + delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), ServerId = {RaName, Node}, - case amqqueue:get_quorum_nodes(Q) of - [Node] -> + case members(Q) of + [{_, Node}] -> + %% deleting the last member is not allowed {error, last_node}; - _ -> - case ra:leave_and_delete_server(amqqueue:get_pid(Q), ServerId) of + Members -> + case ra:leave_and_delete_server(Members, ServerId) of ok -> Fun = fun(Q1) -> amqqueue:set_quorum_nodes( @@ -827,7 +821,7 @@ grow(Node, VhostSpec, QueueSpec, Strategy) -> QName = amqqueue:get_name(Q), rabbit_log:info("~s: adding a new member (replica) on node ~w", [rabbit_misc:rs(QName), Node]), - case add_member(Q, Node) of + case add_member(Q, Node, ?ADD_MEMBER_TIMEOUT) of ok -> {QName, {ok, Size + 1}}; {error, Err} -> @@ -1142,3 +1136,26 @@ select_quorum_nodes(0, _, Selected) -> select_quorum_nodes(Size, Rest, Selected) -> S = lists:nth(rand:uniform(length(Rest)), Rest), select_quorum_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]). + +%% member with the current leader first +members(Q) when ?amqqueue_is_quorum(Q) -> + {RaName, LeaderNode} = amqqueue:get_pid(Q), + Nodes = lists:delete(LeaderNode, amqqueue:get_quorum_nodes(Q)), + [{RaName, N} || N <- [LeaderNode | Nodes]]. + +make_ra_conf(Q, ServerId, TickTimeout) -> + QName = amqqueue:get_name(Q), + RaMachine = ra_machine(Q), + [{ClusterName, _} | _] = Members = members(Q), + UId = ra:new_uid(ra_lib:to_binary(ClusterName)), + FName = rabbit_misc:rs(QName), + #{cluster_name => ClusterName, + id => ServerId, + uid => UId, + friendly_name => FName, + metrics_key => QName, + initial_members => Members, + log_init_args => #{uid => UId}, + tick_timeout => TickTimeout, + machine => RaMachine}. + diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 8f64a70b5f..39ac0ef8ac 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -66,8 +66,6 @@ -export([conserve_resources/3, server_properties/1]). --deprecated([{force_event_refresh, 2, eventually}]). - -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). @@ -220,6 +218,9 @@ info(Pid, Items) -> -spec force_event_refresh(pid(), reference()) -> 'ok'. +% Note: https://www.pivotaltracker.com/story/show/166962656 +% This event is necessary for the stats timer to be initialized with +% the correct values once the management agent has started force_event_refresh(Pid, Ref) -> gen_server:cast(Pid, {force_event_refresh, Ref}). @@ -1273,6 +1274,44 @@ handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> self() ! terminate_connection, State; +handle_method0(#'connection.update_secret'{new_secret = NewSecret, reason = Reason}, + State = #v1{connection = + #connection{protocol = Protocol, + user = User = #user{username = Username}, + log_name = ConnName} = Conn, + sock = Sock}) when ?IS_RUNNING(State) -> + rabbit_log_connection:debug( + "connection ~p (~s) of user '~s': " + "asked to update secret, reason: ~s~n", + [self(), dynamic_connection_name(ConnName), Username, Reason]), + case rabbit_access_control:update_state(User, NewSecret) of + {ok, User1} -> + %% User/auth backend state has been updated. Now we can propagate it to channels + %% asynchronously and return. All the channels have to do is to update their + %% own state. + %% + %% Any secret update errors coming from the authz backend will be handled in the other branch. + %% Therefore we optimistically do no error handling here. MK. + lists:foreach(fun(Ch) -> + rabbit_log:debug("Updating user/auth backend state for channel ~p", [Ch]), + _ = rabbit_channel:update_user_state(Ch, User1) + end, all_channels()), + ok = send_on_channel0(Sock, #'connection.update_secret_ok'{}, Protocol), + rabbit_log_connection:info( + "connection ~p (~s): " + "user '~s' updated secret, reason: ~s~n", + [self(), dynamic_connection_name(ConnName), Username, Reason]), + State#v1{connection = Conn#connection{user = User1}}; + {refused, Message} -> + rabbit_log_connection:error("Secret update was refused for user '~p': ~p", + [Username, Message]), + rabbit_misc:protocol_error(not_allowed, "New secret was refused by one of the backends", []); + {error, Message} -> + rabbit_log_connection:error("Secret update for user '~p' failed: ~p", + [Username, Message]), + rabbit_misc:protocol_error(not_allowed, + "Secret update failed", []) + end; handle_method0(_Method, State) when ?IS_STOPPING(State) -> State; handle_method0(_Method, #v1{connection_state = S}) -> |
