diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2018-11-30 11:30:36 +0100 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2019-02-01 11:23:16 +0100 |
| commit | 5bbde6d0a3eb1790d4965d76c8699d0187b74183 (patch) | |
| tree | df1cd4f48442a8e79d45e35c255cd28851dee63d | |
| parent | d142bbc45a4d0f8482b6a98d1f16a725cdf8d8a8 (diff) | |
| download | rabbitmq-server-git-5bbde6d0a3eb1790d4965d76c8699d0187b74183.tar.gz | |
Move `-spec()` near their function
71 files changed, 1730 insertions, 1145 deletions
diff --git a/src/background_gc.erl b/src/background_gc.erl index 43c109ee2a..7aea28c1f4 100644 --- a/src/background_gc.erl +++ b/src/background_gc.erl @@ -32,14 +32,12 @@ %%---------------------------------------------------------------------------- -spec start_link() -> {'ok', pid()} | {'error', any()}. --spec run() -> 'ok'. --spec gc() -> 'ok'. - -%%---------------------------------------------------------------------------- start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [], [{timeout, infinity}]). +-spec run() -> 'ok'. + run() -> gen_server2:cast(?MODULE, run). %%---------------------------------------------------------------------------- @@ -73,6 +71,8 @@ interval_gc(State = #state{last_interval = LastInterval}) -> erlang:send_after(Interval, self(), run), State#state{last_interval = Interval}. +-spec gc() -> 'ok'. + gc() -> Enabled = rabbit_misc:get_env(rabbit, background_gc_enabled, false), case Enabled of diff --git a/src/dtree.erl b/src/dtree.erl index 08ddd22532..fd2188de29 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -46,24 +46,17 @@ -type val() :: any(). -type kv() :: {pk(), val()}. --spec empty() -> ?MODULE(). --spec insert(pk(), [sk()], val(), ?MODULE()) -> ?MODULE(). --spec take([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}. --spec take(sk(), ?MODULE()) -> {[kv()], ?MODULE()}. --spec take_one(pk(), ?MODULE()) -> {[{pk(), val()}], ?MODULE()}. --spec take_all(sk(), ?MODULE()) -> {[kv()], ?MODULE()}. --spec drop(pk(), ?MODULE()) -> ?MODULE(). --spec is_defined(sk(), ?MODULE()) -> boolean(). --spec is_empty(?MODULE()) -> boolean(). --spec smallest(?MODULE()) -> kv(). --spec size(?MODULE()) -> non_neg_integer(). - %%---------------------------------------------------------------------------- +-spec empty() -> ?MODULE(). + empty() -> {gb_trees:empty(), gb_trees:empty()}. %% Insert an entry. Fails if there already is an entry with the given %% primary key. + +-spec insert(pk(), [sk()], val(), ?MODULE()) -> ?MODULE(). + insert(PK, [], V, {P, S}) -> %% dummy insert to force error if PK exists _ = gb_trees:insert(PK, {gb_sets:empty(), V}, P), @@ -84,6 +77,9 @@ insert(PK, SKs, V, {P, S}) -> %% that were dropped as the result (i.e. due to their secondary key %% set becoming empty). It is ok for the given primary keys and/or %% secondary key to not exist. + +-spec take([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}. + take(PKs, SK, {P, S}) -> case gb_trees:lookup(SK, S) of none -> {[], {P, S}}; @@ -101,6 +97,9 @@ take(PKs, SK, {P, S}) -> %% primary-key/value pairs of any entries that were dropped as the %% result (i.e. due to their secondary key set becoming empty). It is %% ok for the given secondary key to not exist. + +-spec take(sk(), ?MODULE()) -> {[kv()], ?MODULE()}. + take(SK, {P, S}) -> case gb_trees:lookup(SK, S) of none -> {[], {P, S}}; @@ -111,6 +110,9 @@ take(SK, {P, S}) -> %% Drop an entry with the primary key and clears secondary keys for this key, %% returning a list with a key-value pair as a result. %% If the primary key does not exist, returns an empty list. + +-spec take_one(pk(), ?MODULE()) -> {[{pk(), val()}], ?MODULE()}. + take_one(PK, {P, S}) -> case gb_trees:lookup(PK, P) of {value, {SKS, Value}} -> @@ -131,6 +133,9 @@ take_one(PK, {P, S}) -> %% Drop all entries which contain the given secondary key, returning %% the primary-key/value pairs of these entries. It is ok for the %% given secondary key to not exist. + +-spec take_all(sk(), ?MODULE()) -> {[kv()], ?MODULE()}. + take_all(SK, {P, S}) -> case gb_trees:lookup(SK, S) of none -> {[], {P, S}}; @@ -139,6 +144,9 @@ take_all(SK, {P, S}) -> end. %% Drop all entries for the given primary key (which does not have to exist). + +-spec drop(pk(), ?MODULE()) -> ?MODULE(). + drop(PK, {P, S}) -> case gb_trees:lookup(PK, P) of none -> {P, S}; @@ -146,13 +154,21 @@ drop(PK, {P, S}) -> prune(SKS, gb_sets:singleton(PK), S)} end. +-spec is_defined(sk(), ?MODULE()) -> boolean(). + is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S). +-spec is_empty(?MODULE()) -> boolean(). + is_empty({P, _S}) -> gb_trees:is_empty(P). +-spec smallest(?MODULE()) -> kv(). + smallest({P, _S}) -> {K, {_SKS, V}} = gb_trees:smallest(P), {K, V}. +-spec size(?MODULE()) -> non_neg_integer(). + size({P, _S}) -> gb_trees:size(P). %%---------------------------------------------------------------------------- diff --git a/src/gatherer.erl b/src/gatherer.erl index a8b55892c1..1625468a52 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -39,16 +39,6 @@ %%---------------------------------------------------------------------------- --spec start_link() -> rabbit_types:ok_pid_or_error(). --spec stop(pid()) -> 'ok'. --spec fork(pid()) -> 'ok'. --spec finish(pid()) -> 'ok'. --spec in(pid(), any()) -> 'ok'. --spec sync_in(pid(), any()) -> 'ok'. --spec out(pid()) -> {'value', any()} | 'empty'. - -%%---------------------------------------------------------------------------- - -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). @@ -58,24 +48,38 @@ %%---------------------------------------------------------------------------- +-spec start_link() -> rabbit_types:ok_pid_or_error(). + start_link() -> gen_server2:start_link(?MODULE, [], [{timeout, infinity}]). +-spec stop(pid()) -> 'ok'. + stop(Pid) -> gen_server2:call(Pid, stop, infinity). +-spec fork(pid()) -> 'ok'. + fork(Pid) -> gen_server2:call(Pid, fork, infinity). +-spec finish(pid()) -> 'ok'. + finish(Pid) -> gen_server2:cast(Pid, finish). +-spec in(pid(), any()) -> 'ok'. + in(Pid, Value) -> gen_server2:cast(Pid, {in, Value}). +-spec sync_in(pid(), any()) -> 'ok'. + sync_in(Pid, Value) -> gen_server2:call(Pid, {in, Value}, infinity). +-spec out(pid()) -> {'value', any()} | 'empty'. + out(Pid) -> gen_server2:call(Pid, out, infinity). diff --git a/src/gm.erl b/src/gm.erl index 427fa78f4e..02ee76cd60 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -436,16 +436,6 @@ -type group_name() :: any(). -type txn_fun() :: fun((fun(() -> any())) -> any()). --spec create_tables() -> 'ok' | {'aborted', any()}. --spec start_link(group_name(), atom(), any(), txn_fun()) -> - rabbit_types:ok_pid_or_error(). --spec leave(pid()) -> 'ok'. --spec broadcast(pid(), any()) -> 'ok'. --spec confirmed_broadcast(pid(), any()) -> 'ok'. --spec info(pid()) -> rabbit_types:infos(). --spec validate_members(pid(), [pid()]) -> 'ok'. --spec forget_group(group_name()) -> 'ok'. - %% The joined, members_changed and handle_msg callbacks can all return %% any of the following terms: %% @@ -490,6 +480,8 @@ -callback handle_terminate(Args :: term(), Reason :: term()) -> ok | term(). +-spec create_tables() -> 'ok' | {'aborted', any()}. + create_tables() -> create_tables([?TABLE]). @@ -506,27 +498,42 @@ table_definitions() -> {Name, Attributes} = ?TABLE, [{Name, [?TABLE_MATCH | Attributes]}]. +-spec start_link(group_name(), atom(), any(), txn_fun()) -> + rabbit_types:ok_pid_or_error(). + start_link(GroupName, Module, Args, TxnFun) -> gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], [{spawn_opt, [{fullsweep_after, 0}]}]). +-spec leave(pid()) -> 'ok'. + leave(Server) -> gen_server2:cast(Server, leave). +-spec broadcast(pid(), any()) -> 'ok'. + broadcast(Server, Msg) -> broadcast(Server, Msg, 0). broadcast(Server, Msg, SizeHint) -> gen_server2:cast(Server, {broadcast, Msg, SizeHint}). +-spec confirmed_broadcast(pid(), any()) -> 'ok'. + confirmed_broadcast(Server, Msg) -> gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity). +-spec info(pid()) -> rabbit_types:infos(). + info(Server) -> gen_server2:call(Server, info, infinity). +-spec validate_members(pid(), [pid()]) -> 'ok'. + validate_members(Server, Members) -> gen_server2:cast(Server, {validate_members, Members}). +-spec forget_group(group_name()) -> 'ok'. + forget_group(GroupName) -> {atomic, ok} = mnesia:sync_transaction( fun () -> diff --git a/src/lqueue.erl b/src/lqueue.erl index acfdbe79ef..820f9f4a2f 100644 --- a/src/lqueue.erl +++ b/src/lqueue.erl @@ -36,64 +36,76 @@ -type result(T) :: 'empty' | {'value', T}. -spec new() -> ?MODULE(_). --spec drop(?MODULE(T)) -> ?MODULE(T). --spec is_empty(?MODULE(_)) -> boolean(). --spec len(?MODULE(_)) -> non_neg_integer(). --spec in(T, ?MODULE(T)) -> ?MODULE(T). --spec in_r(value(), ?MODULE()) -> ?MODULE(). --spec out(?MODULE(T)) -> {result(T), ?MODULE()}. --spec out_r(?MODULE(T)) -> {result(T), ?MODULE()}. --spec join(?MODULE(A), ?MODULE(B)) -> ?MODULE(A | B). --spec foldl(fun ((T, B) -> B), B, ?MODULE(T)) -> B. --spec foldr(fun ((T, B) -> B), B, ?MODULE(T)) -> B. --spec from_list([T]) -> ?MODULE(T). --spec to_list(?MODULE(T)) -> [T]. -% -spec peek(?MODULE()) -> result(). --spec peek(?MODULE(T)) -> result(T). --spec peek_r(?MODULE(T)) -> result(T). new() -> {0, ?QUEUE:new()}. +-spec drop(?MODULE(T)) -> ?MODULE(T). + drop({L, Q}) -> {L - 1, ?QUEUE:drop(Q)}. +-spec is_empty(?MODULE(_)) -> boolean(). + is_empty({0, _Q}) -> true; is_empty(_) -> false. +-spec in(T, ?MODULE(T)) -> ?MODULE(T). + in(V, {L, Q}) -> {L+1, ?QUEUE:in(V, Q)}. +-spec in_r(value(), ?MODULE()) -> ?MODULE(). + in_r(V, {L, Q}) -> {L+1, ?QUEUE:in_r(V, Q)}. +-spec out(?MODULE(T)) -> {result(T), ?MODULE()}. + out({0, _Q} = Q) -> {empty, Q}; out({L, Q}) -> {Result, Q1} = ?QUEUE:out(Q), {Result, {L-1, Q1}}. +-spec out_r(?MODULE(T)) -> {result(T), ?MODULE()}. + out_r({0, _Q} = Q) -> {empty, Q}; out_r({L, Q}) -> {Result, Q1} = ?QUEUE:out_r(Q), {Result, {L-1, Q1}}. +-spec join(?MODULE(A), ?MODULE(B)) -> ?MODULE(A | B). + join({L1, Q1}, {L2, Q2}) -> {L1 + L2, ?QUEUE:join(Q1, Q2)}. +-spec to_list(?MODULE(T)) -> [T]. + to_list({_L, Q}) -> ?QUEUE:to_list(Q). +-spec from_list([T]) -> ?MODULE(T). + from_list(L) -> {length(L), ?QUEUE:from_list(L)}. +-spec foldl(fun ((T, B) -> B), B, ?MODULE(T)) -> B. + foldl(Fun, Init, Q) -> case out(Q) of {empty, _Q} -> Init; {{value, V}, Q1} -> foldl(Fun, Fun(V, Init), Q1) end. +-spec foldr(fun ((T, B) -> B), B, ?MODULE(T)) -> B. + foldr(Fun, Init, Q) -> case out_r(Q) of {empty, _Q} -> Init; {{value, V}, Q1} -> foldr(Fun, Fun(V, Init), Q1) end. +-spec len(?MODULE(_)) -> non_neg_integer(). + len({L, _}) -> L. +-spec peek(?MODULE(T)) -> result(T). peek({ 0, _Q}) -> empty; peek({_L, Q}) -> ?QUEUE:peek(Q). +-spec peek_r(?MODULE(T)) -> result(T). + peek_r({ 0, _Q}) -> empty; peek_r({_L, Q}) -> ?QUEUE:peek_r(Q). diff --git a/src/pg_local.erl b/src/pg_local.erl index 0ed7e9d85d..3f03c97182 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -44,15 +44,6 @@ -type name() :: term(). --spec start_link() -> {'ok', pid()} | {'error', any()}. --spec start() -> {'ok', pid()} | {'error', any()}. --spec join(name(), pid()) -> 'ok'. --spec leave(name(), pid()) -> 'ok'. --spec get_members(name()) -> [pid()]. --spec in_group(name(), pid()) -> boolean(). - --spec sync() -> 'ok'. - %%---------------------------------------------------------------------------- -define(TABLE, pg_local_table). @@ -61,24 +52,36 @@ %%% Exported functions %%% +-spec start_link() -> {'ok', pid()} | {'error', any()}. + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec start() -> {'ok', pid()} | {'error', any()}. + start() -> ensure_started(). +-spec join(name(), pid()) -> 'ok'. + join(Name, Pid) when is_pid(Pid) -> _ = ensure_started(), gen_server:cast(?MODULE, {join, Name, Pid}). +-spec leave(name(), pid()) -> 'ok'. + leave(Name, Pid) when is_pid(Pid) -> _ = ensure_started(), gen_server:cast(?MODULE, {leave, Name, Pid}). +-spec get_members(name()) -> [pid()]. + get_members(Name) -> _ = ensure_started(), group_members(Name). +-spec in_group(name(), pid()) -> boolean(). + in_group(Name, Pid) -> _ = ensure_started(), %% The join message is a cast and thus can race, but we want to @@ -89,6 +92,8 @@ in_group(Name, Pid) -> member_present(Name, Pid) end. +-spec sync() -> 'ok'. + sync() -> _ = ensure_started(), gen_server:call(?MODULE, sync, infinity). diff --git a/src/rabbit.erl b/src/rabbit.erl index e06e50561d..2d16661768 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -33,6 +33,8 @@ -export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent -export([is_booted/1, is_booted/0, is_booting/1, is_booting/0]). +-deprecated([{force_event_refresh, 1, eventually}]). + -ifdef(TEST). -export([start_logger/0]). @@ -257,42 +259,6 @@ -type param() :: atom(). -type app_name() :: atom(). --spec start() -> 'ok'. --spec boot() -> 'ok'. --spec stop() -> 'ok'. --spec stop_and_halt() -> no_return(). - --spec status - () -> [{pid, integer()} | - {running_applications, [{atom(), string(), string()}]} | - {os, {atom(), atom()}} | - {erlang_version, string()} | - {memory, any()}]. --spec is_running() -> boolean(). --spec is_running(node()) -> boolean(). --spec environment() -> [{param(), term()}]. --spec rotate_logs() -> rabbit_types:ok_or_error(any()). --deprecated([{force_event_refresh, 1, eventually}]). --spec force_event_refresh(reference()) -> 'ok'. - --spec log_locations() -> [log_location()]. - --spec start('normal',[]) -> - {'error', - {'erlang_version_too_old', - {'found',string(),string()}, - {'required',string(),string()}}} | - {'ok',pid()}. --spec stop(_) -> 'ok'. - --spec maybe_insert_default_data() -> 'ok'. --spec boot_delegate() -> 'ok'. --spec recover() -> 'ok'. --spec start_apps([app_name()]) -> 'ok'. --spec start_apps([app_name()], - #{app_name() => restart_type()}) -> 'ok'. --spec stop_apps([app_name()]) -> 'ok'. - %%---------------------------------------------------------------------------- ensure_application_loaded() -> @@ -304,6 +270,8 @@ ensure_application_loaded() -> {error, {already_loaded, rabbit}} -> ok end. +-spec start() -> 'ok'. + start() -> start_it(fun() -> %% We do not want to upgrade mnesia after just @@ -317,6 +285,8 @@ start() -> broker_start() end). +-spec boot() -> 'ok'. + boot() -> start_it(fun() -> ensure_config(), @@ -494,6 +464,8 @@ start_it(StartFun) -> Marker ! stop end. +-spec stop() -> 'ok'. + stop() -> case whereis(rabbit_boot) of undefined -> ok; @@ -508,6 +480,8 @@ stop() -> stop_apps(app_utils:app_dependency_order(Apps, true)), rabbit_log:info("Successfully stopped RabbitMQ and its dependencies~n", []). +-spec stop_and_halt() -> no_return(). + stop_and_halt() -> try stop() @@ -533,9 +507,14 @@ stop_and_halt() -> end, ok. +-spec start_apps([app_name()]) -> 'ok'. + start_apps(Apps) -> start_apps(Apps, #{}). +-spec start_apps([app_name()], + #{app_name() => restart_type()}) -> 'ok'. + start_apps(Apps, RestartTypes) -> app_utils:load_applications(Apps), rabbit_feature_flags:initialize_registry(), @@ -678,6 +657,8 @@ decrypt_list([{Key, Value}|Tail], Algo, Acc) when Key =/= encrypted -> decrypt_list([Value|Tail], Algo, Acc) -> decrypt_list(Tail, Algo, [decrypt(Value, Algo)|Acc]). +-spec stop_apps([app_name()]) -> 'ok'. + stop_apps([]) -> ok; stop_apps(Apps) -> @@ -712,16 +693,19 @@ is_booting(Node) -> -spec await_startup() -> 'ok' | {'error', 'timeout'}. + await_startup() -> await_startup(node(), false). -spec await_startup(node() | non_neg_integer()) -> 'ok' | {'error', 'timeout'}. + await_startup(Node) when is_atom(Node) -> await_startup(Node, false); await_startup(Timeout) when is_integer(Timeout) -> await_startup(node(), false, Timeout). -spec await_startup(node(), boolean()) -> 'ok' | {'error', 'timeout'}. + await_startup(Node, PrintProgressReports) -> case is_booting(Node) of true -> wait_for_boot_to_finish(Node, PrintProgressReports); @@ -734,6 +718,7 @@ await_startup(Node, PrintProgressReports) -> end. -spec await_startup(node(), boolean(), non_neg_integer()) -> 'ok' | {'error', 'timeout'}. + await_startup(Node, PrintProgressReports, Timeout) -> case is_booting(Node) of true -> wait_for_boot_to_finish(Node, PrintProgressReports, Timeout); @@ -805,6 +790,13 @@ maybe_print_boot_progress(true, IterationsLeft) -> _ -> ok end. +-spec status + () -> [{pid, integer()} | + {running_applications, [{atom(), string(), string()}]} | + {os, {atom(), atom()}} | + {erlang_version, string()} | + {memory, any()}]. + status() -> S1 = [{pid, list_to_integer(os:getpid())}, %% The timeout value used is twice that of gen_server:call/2. @@ -860,8 +852,13 @@ listeners() -> %% TODO this only determines if the rabbit application has started, %% not if it is running, never mind plugins. It would be nice to have %% more nuance here. + +-spec is_running() -> boolean(). + is_running() -> is_running(node()). +-spec is_running(node()) -> boolean(). + is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit). is_booted() -> is_booted(node()). @@ -873,6 +870,8 @@ is_booted(Node) -> _ -> false end. +-spec environment() -> [{param(), term()}]. + environment() -> %% The timeout value is twice that of gen_server:call/2. [{A, environment(A)} || @@ -883,6 +882,8 @@ environment(App) -> lists:keysort(1, [P || P = {K, _} <- application:get_all_env(App), not lists:member(K, Ignore)]). +-spec rotate_logs() -> rabbit_types:ok_or_error(any()). + rotate_logs() -> rabbit_lager:fold_sinks( fun @@ -906,6 +907,13 @@ rotate_logs() -> %%-------------------------------------------------------------------- +-spec start('normal',[]) -> + {'error', + {'erlang_version_too_old', + {'found',string(),string()}, + {'required',string(),string()}}} | + {'ok',pid()}. + start(normal, []) -> case erts_version_check() of ok -> @@ -938,6 +946,8 @@ prep_stop(State) -> rabbit_peer_discovery:maybe_unregister(), State. +-spec stop(_) -> 'ok'. + stop(_State) -> ok = rabbit_alarm:stop(), ok = case rabbit_mnesia:is_clustered() of @@ -992,14 +1002,20 @@ log_boot_error_and_exit(Reason, Format, Args) -> %%--------------------------------------------------------------------------- %% boot step functions +-spec boot_delegate() -> 'ok'. + boot_delegate() -> {ok, Count} = application:get_env(rabbit, delegate_count), rabbit_sup:start_supervisor_child(delegate_sup, [Count]). +-spec recover() -> 'ok'. + recover() -> rabbit_policy:recover(), rabbit_vhost:recover(). +-spec maybe_insert_default_data() -> 'ok'. + maybe_insert_default_data() -> case rabbit_table:needs_default_data() of true -> insert_default_data(); @@ -1044,12 +1060,17 @@ start_logger() -> rabbit_lager:start_logger(), ok. +-spec log_locations() -> [log_location()]. + log_locations() -> rabbit_lager:log_locations(). %% This feature was used by the management API up-to and including %% RabbitMQ 3.7.x. It is unused in 3.8.x and thus deprecated. We keep it %% to support in-place upgrades to 3.8.x (i.e. mixed-version clusters). + +-spec force_event_refresh(reference()) -> 'ok'. + force_event_refresh(Ref) -> rabbit_direct:force_event_refresh(Ref), rabbit_networking:force_connection_event_refresh(Ref), diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 4356cb427f..44e72f7ce4 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -27,29 +27,20 @@ -type permission_atom() :: 'configure' | 'read' | 'write'. +%%---------------------------------------------------------------------------- + -spec check_user_pass_login (rabbit_types:username(), rabbit_types:password()) -> {'ok', rabbit_types:user()} | {'refused', rabbit_types:username(), string(), [any()]}. + +check_user_pass_login(Username, Password) -> + check_user_login(Username, [{password, Password}]). + -spec check_user_login (rabbit_types:username(), [{atom(), any()}]) -> {'ok', rabbit_types:user()} | {'refused', rabbit_types:username(), string(), [any()]}. --spec check_user_loopback - (rabbit_types:username(), rabbit_net:socket() | inet:ip_address()) -> - 'ok' | 'not_allowed'. --spec check_vhost_access - (rabbit_types:user(), rabbit_types:vhost(), - rabbit_net:socket() | #authz_socket_info{}) -> - 'ok' | rabbit_types:channel_exit(). --spec check_resource_access - (rabbit_types:user(), rabbit_types:r(atom()), permission_atom()) -> - 'ok' | rabbit_types:channel_exit(). - -%%---------------------------------------------------------------------------- - -check_user_pass_login(Username, Password) -> - check_user_login(Username, [{password, Password}]). check_user_login(Username, AuthProps) -> {ok, Modules} = application:get_env(rabbit, auth_backends), @@ -122,6 +113,10 @@ auth_user(#user{username = Username, tags = Tags}, Impl) -> tags = Tags, impl = Impl}. +-spec check_user_loopback + (rabbit_types:username(), rabbit_net:socket() | inet:ip_address()) -> + 'ok' | 'not_allowed'. + check_user_loopback(Username, SockOrAddr) -> {ok, Users} = application:get_env(rabbit, loopback_users), case rabbit_net:is_loopback(SockOrAddr) @@ -130,6 +125,11 @@ check_user_loopback(Username, SockOrAddr) -> false -> not_allowed end. +-spec check_vhost_access + (rabbit_types:user(), rabbit_types:vhost(), + rabbit_net:socket() | #authz_socket_info{}) -> + 'ok' | rabbit_types:channel_exit(). + check_vhost_access(User = #user{username = Username, authz_backends = Modules}, VHostPath, Sock) -> lists:foldl( @@ -146,6 +146,10 @@ check_vhost_access(User = #user{username = Username, Else end, ok, Modules). +-spec check_resource_access + (rabbit_types:user(), rabbit_types:r(atom()), permission_atom()) -> + 'ok' | rabbit_types:channel_exit(). + check_resource_access(User, R = #resource{kind = exchange, name = <<"">>}, Permission) -> check_resource_access(User, R#resource{name = <<"amq.default">>}, diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 789992cb3d..6f33e02fe4 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -52,21 +52,15 @@ -type resource_alarm() :: {resource_limit, resource_alarm_source(), node()}. -type alarm() :: local_alarm() | resource_alarm(). --spec start_link() -> rabbit_types:ok_pid_or_error(). --spec start() -> 'ok'. --spec stop() -> 'ok'. --spec register(pid(), rabbit_types:mfargs()) -> [atom()]. --spec set_alarm({alarm(), []}) -> 'ok'. --spec clear_alarm(alarm()) -> 'ok'. --spec on_node_up(node()) -> 'ok'. --spec on_node_down(node()) -> 'ok'. --spec get_alarms() -> [{alarm(), []}]. - %%---------------------------------------------------------------------------- +-spec start_link() -> rabbit_types:ok_pid_or_error(). + start_link() -> gen_event:start_link({local, ?SERVER}). +-spec start() -> 'ok'. + start() -> ok = rabbit_sup:start_restartable_child(?MODULE), ok = gen_event:add_handler(?SERVER, ?MODULE, []), @@ -84,21 +78,38 @@ start() -> rabbit_disk_monitor, [DiskLimit]), ok. +-spec stop() -> 'ok'. + stop() -> ok. %% Registers a handler that should be called on every resource alarm change. %% Given a call rabbit_alarm:register(Pid, {M, F, A}), the handler would be %% called like this: `apply(M, F, A ++ [Pid, Source, Alert])', where `Source' %% has the type of resource_alarm_source() and `Alert' has the type of resource_alert(). + +-spec register(pid(), rabbit_types:mfargs()) -> [atom()]. + register(Pid, AlertMFA) -> gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity). +-spec set_alarm({alarm(), []}) -> 'ok'. + set_alarm(Alarm) -> gen_event:notify(?SERVER, {set_alarm, Alarm}). + +-spec clear_alarm(alarm()) -> 'ok'. + clear_alarm(Alarm) -> gen_event:notify(?SERVER, {clear_alarm, Alarm}). +-spec get_alarms() -> [{alarm(), []}]. + get_alarms() -> gen_event:call(?SERVER, ?MODULE, get_alarms, infinity). +-spec on_node_up(node()) -> 'ok'. + on_node_up(Node) -> gen_event:notify(?SERVER, {node_up, Node}). + +-spec on_node_down(node()) -> 'ok'. + on_node_down(Node) -> gen_event:notify(?SERVER, {node_down, Node}). remote_conserve_resources(Pid, Source, {true, _, _}) -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 118a3d0fab..621ce95d41 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -51,6 +51,8 @@ -export([pid_of/1, pid_of/2]). -export([mark_local_durable_queues_stopped/1]). +-deprecated([{force_event_refresh, 1, eventually}]). + %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2, @@ -84,150 +86,6 @@ {'absent', amqqueue:amqqueue(),absent_reason()}. -type not_found_or_absent() :: 'not_found' | {'absent', amqqueue:amqqueue(), absent_reason()}. --spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. --spec stop(rabbit_types:vhost()) -> 'ok'. --spec start([amqqueue:amqqueue()]) -> 'ok'. --spec declare - (name(), boolean(), boolean(), rabbit_framing:amqp_table(), - rabbit_types:maybe(pid()), rabbit_types:username()) -> - {'new' | 'existing' | 'absent' | 'owner_died', - amqqueue:amqqueue()} | - {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} | - rabbit_types:channel_exit(). --spec declare - (name(), boolean(), boolean(), rabbit_framing:amqp_table(), - rabbit_types:maybe(pid()), rabbit_types:username(), node()) -> - {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | - {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} | - {'absent', amqqueue:amqqueue(), absent_reason()} | - rabbit_types:channel_exit(). --spec internal_declare(amqqueue:amqqueue(), boolean()) -> - queue_or_absent() | rabbit_misc:thunk(queue_or_absent()). --spec update - (name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) -> - 'not_found' | amqqueue:amqqueue(). --spec lookup - (name()) -> - rabbit_types:ok(amqqueue:amqqueue()) | - rabbit_types:error('not_found'); - ([name()]) -> - [amqqueue:amqqueue()]. --spec not_found_or_absent(name()) -> not_found_or_absent(). --spec with(name(), qfun(A)) -> - A | rabbit_types:error(not_found_or_absent()). --spec with(name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B. --spec with_or_die(name(), qfun(A)) -> A | rabbit_types:channel_exit(). --spec not_found(rabbit_types:r(atom())) -> rabbit_types:channel_exit(). --spec absent(amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()) -> - rabbit_types:channel_exit(). --spec assert_equivalence - (amqqueue:amqqueue(), boolean(), boolean(), - rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) -> - 'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit(). --spec check_exclusive_access(amqqueue:amqqueue(), pid()) -> - 'ok' | rabbit_types:channel_exit(). --spec with_exclusive_access_or_die(name(), pid(), qfun(A)) -> - A | rabbit_types:channel_exit(). --spec list() -> [amqqueue:amqqueue()]. --spec list(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. --spec list_names() -> [rabbit_amqqueue:name()]. --spec list_down(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. --spec list_by_type(atom()) -> [amqqueue:amqqueue()]. --spec info_keys() -> rabbit_types:info_keys(). --spec info(amqqueue:amqqueue()) -> rabbit_types:infos(). --spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) -> - rabbit_types:infos(). --spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. --spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) -> - [rabbit_types:infos()]. --deprecated([{force_event_refresh, 1, eventually}]). --spec force_event_refresh(reference()) -> 'ok'. --spec notify_policy_changed(amqqueue:amqqueue()) -> 'ok'. --spec consumers(amqqueue:amqqueue()) -> - [{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(), - rabbit_framing:amqp_table()}]. --spec consumer_info_keys() -> rabbit_types:info_keys(). --spec consumers_all(rabbit_types:vhost()) -> - [{name(), pid(), rabbit_types:ctag(), boolean(), - non_neg_integer(), rabbit_framing:amqp_table()}]. --spec stat(amqqueue:amqqueue()) -> - {'ok', non_neg_integer(), non_neg_integer()}. --spec delete_immediately(qpids()) -> 'ok'. --spec delete_exclusive(qpids(), pid()) -> 'ok'. --spec delete - (amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) -> - qlen(); - (amqqueue:amqqueue(), 'true' , 'false', rabbit_types:username()) -> - qlen() | rabbit_types:error('in_use'); - (amqqueue:amqqueue(), 'false', 'true', rabbit_types:username()) -> - qlen() | rabbit_types:error('not_empty'); - (amqqueue:amqqueue(), 'true' , 'true', rabbit_types:username()) -> - qlen() | - rabbit_types:error('in_use') | - rabbit_types:error('not_empty'). --spec delete_crashed(amqqueue:amqqueue()) -> 'ok'. --spec delete_crashed_internal(amqqueue:amqqueue(), rabbit_types:username()) -> 'ok'. --spec purge(amqqueue:amqqueue()) -> {ok, qlen()}. --spec forget_all_durable(node()) -> 'ok'. --spec deliver([amqqueue:amqqueue()], rabbit_types:delivery(), #{Name :: atom() => rabbit_fifo_client:state()} | 'untracked') -> - {qpids(), #{Name :: atom() => rabbit_fifo_client:state()}}. --spec deliver([amqqueue:amqqueue()], rabbit_types:delivery()) -> 'ok'. --spec requeue(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'. --spec ack(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'. --spec reject(pid() | {atom(), node()}, [msg_id()], boolean(), pid(), - #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'. --spec notify_down_all(qpids(), pid()) -> ok_or_errors(). --spec notify_down_all(qpids(), pid(), non_neg_integer()) -> - ok_or_errors(). --spec activate_limit_all(qpids(), pid()) -> ok_or_errors(). --spec basic_get(amqqueue:amqqueue(), pid(), boolean(), pid(), rabbit_types:ctag(), - #{Name :: atom() => rabbit_fifo_client:state()}) -> - {'ok', non_neg_integer(), qmsg()} | 'empty'. --spec credit - (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), - boolean(), #{Name :: atom() => rabbit_fifo_client:state()}) -> - 'ok'. --spec basic_consume - (amqqueue:amqqueue(), boolean(), pid(), pid(), boolean(), - non_neg_integer(), rabbit_types:ctag(), boolean(), - rabbit_framing:amqp_table(), any(), rabbit_types:username(), - #{Name :: atom() => rabbit_fifo_client:state()}) -> - rabbit_types:ok_or_error('exclusive_consume_unavailable'). --spec basic_cancel - (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), any(), - rabbit_types:username(), #{Name :: atom() => rabbit_fifo_client:state()}) -> - 'ok' | {'ok', #{Name :: atom() => rabbit_fifo_client:state()}}. --spec notify_decorators(amqqueue:amqqueue()) -> 'ok'. --spec resume(pid(), pid()) -> 'ok'. --spec internal_delete(name(), rabbit_types:username()) -> - 'ok' | rabbit_types:connection_exit() | - fun (() -> - 'ok' | rabbit_types:connection_exit()). --spec run_backing_queue - (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> - 'ok'. --spec set_ram_duration_target(pid(), number() | 'infinity') -> 'ok'. --spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'. --spec on_node_up(node()) -> 'ok'. --spec on_node_down(node()) -> 'ok'. --spec pseudo_queue(name(), pid()) -> amqqueue:amqqueue(). --spec pseudo_queue(name(), pid(), boolean()) -> amqqueue:amqqueue(). --spec immutable(amqqueue:amqqueue()) -> amqqueue:amqqueue(). --spec store_queue(amqqueue:amqqueue()) -> 'ok'. --spec update_decorators(name()) -> 'ok'. --spec policy_changed(amqqueue:amqqueue(), amqqueue:amqqueue()) -> - 'ok'. --spec update_mirroring(pid()) -> 'ok'. --spec sync_mirrors(amqqueue:amqqueue() | pid()) -> - 'ok' | rabbit_types:error('not_mirrored'). --spec cancel_sync_mirrors(amqqueue:amqqueue() | pid()) -> - 'ok' | {'ok', 'not_syncing'}. --spec is_replicated(amqqueue:amqqueue()) -> boolean(). - --spec pid_of(amqqueue:amqqueue()) -> - {'ok', pid()} | rabbit_types:error('not_found'). --spec pid_of(rabbit_types:vhost(), rabbit_misc:resource_name()) -> - {'ok', pid()} | rabbit_types:error('not_found'). %%---------------------------------------------------------------------------- @@ -250,6 +108,8 @@ warn_file_limit() -> ok end. +-spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. + recover(VHost) -> Classic = find_local_durable_classic_queues(VHost), Quorum = find_local_quorum_queues(VHost), @@ -278,11 +138,14 @@ filter_pid_per_type(QPids) -> filter_resource_per_type(Resources) -> Queues = [begin - {ok, #amqqueue{pid = QPid}} = lookup(Resource), + {ok, Q} = lookup(Resource), + QPid = amqqueue:get_pid(Q), {Resource, QPid} end || Resource <- Resources], lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues). +-spec stop(rabbit_types:vhost()) -> 'ok'. + stop(VHost) -> %% Classic queues ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost), @@ -290,6 +153,8 @@ stop(VHost) -> ok = BQ:stop(VHost), rabbit_quorum_queue:stop(VHost). +-spec start([amqqueue:amqqueue()]) -> 'ok'. + start(Qs) -> {Classic, _Quorum} = filter_per_type(Qs), %% At this point all recovered queues and their bindings are @@ -366,6 +231,14 @@ recover_durable_queues(QueuesAndRecoveryTerms) -> [Pid, Error]) || {Pid, Error} <- Failures], [Q || {_, {new, Q}} <- Results]. +-spec declare + (name(), boolean(), boolean(), rabbit_framing:amqp_table(), + rabbit_types:maybe(pid()), rabbit_types:username()) -> + {'new' | 'existing' | 'absent' | 'owner_died', + amqqueue:amqqueue()} | + {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} | + rabbit_types:channel_exit(). + declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) -> declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser, node()). @@ -373,6 +246,15 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) -> %% The Node argument suggests where the queue (master if mirrored) %% should be. Note that in some cases (e.g. with "nodes" policy in %% effect) this might not be possible to satisfy. + +-spec declare + (name(), boolean(), boolean(), rabbit_framing:amqp_table(), + rabbit_types:maybe(pid()), rabbit_types:username(), node()) -> + {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | + {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} | + {'absent', amqqueue:amqqueue(), absent_reason()} | + rabbit_types:channel_exit(). + declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args, Owner, ActingUser, Node) -> ok = check_declare_arguments(QueueName, Args), @@ -434,6 +316,9 @@ get_queue_type(Args) -> erlang:binary_to_existing_atom(V, utf8) end. +-spec internal_declare(amqqueue:amqqueue(), boolean()) -> + queue_or_absent() | rabbit_misc:thunk(queue_or_absent()). + internal_declare(Q, Recover) -> ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( do_internal_declare(Q, Recover), @@ -466,6 +351,10 @@ do_internal_declare(Q, false) -> end end). +-spec update + (name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) -> + 'not_found' | amqqueue:amqqueue(). + update(Name, Fun) -> case mnesia:wread({rabbit_queue, Name}) of [Q] -> @@ -498,6 +387,8 @@ do_ensure_rabbit_queue_record_is_initialized(Q) -> rabbit_misc:const({ok, Q}) end). +-spec store_queue(amqqueue:amqqueue()) -> 'ok'. + store_queue(Q) when ?amqqueue_is_durable(Q) -> Q1 = amqqueue:reset_mirroring_and_decorators(Q), ok = mnesia:write(rabbit_durable_queue, Q1, write), @@ -508,6 +399,8 @@ store_queue(Q) when not ?amqqueue_is_durable(Q) -> store_queue_ram(Q) -> ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write). +-spec update_decorators(name()) -> 'ok'. + update_decorators(Name) -> rabbit_misc:execute_mnesia_transaction( fun() -> @@ -518,6 +411,9 @@ update_decorators(Name) -> end end). +-spec policy_changed(amqqueue:amqqueue(), amqqueue:amqqueue()) -> + 'ok'. + policy_changed(Q1, Q2) -> Decorators1 = amqqueue:get_decorators(Q1), Decorators2 = amqqueue:get_decorators(Q2), @@ -529,6 +425,13 @@ policy_changed(Q1, Q2) -> %% mirroring-related has changed - the policy may have changed anyway. notify_policy_changed(Q1). +-spec lookup + (name()) -> + rabbit_types:ok(amqqueue:amqqueue()) | + rabbit_types:error('not_found'); + ([name()]) -> + [amqqueue:amqqueue()]. + lookup([]) -> []; %% optimisation lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation lookup(Names) when is_list(Names) -> @@ -538,6 +441,8 @@ lookup(Names) when is_list(Names) -> lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). +-spec not_found_or_absent(name()) -> not_found_or_absent(). + not_found_or_absent(Name) -> %% NB: we assume that the caller has already performed a lookup on %% rabbit_queue and not found anything @@ -555,6 +460,8 @@ not_found_or_absent_dirty(Name) -> {ok, Q} -> {absent, Q, nodedown} end. +-spec with(name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B. + with(Name, F, E) -> with(Name, F, E, 2000). @@ -620,15 +527,25 @@ retry_wait(Q, F, E, RetriesLeft) -> with(Name, F, E, RetriesLeft - 1) end. +-spec with(name(), qfun(A)) -> + A | rabbit_types:error(not_found_or_absent()). + with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). +-spec with_or_die(name(), qfun(A)) -> A | rabbit_types:channel_exit(). + with_or_die(Name, F) -> with(Name, F, fun (not_found) -> not_found(Name); ({absent, Q, Reason}) -> absent(Q, Reason) end). +-spec not_found(rabbit_types:r(atom())) -> rabbit_types:channel_exit(). + not_found(R) -> rabbit_misc:protocol_error(not_found, "no ~s", [rabbit_misc:rs(R)]). +-spec absent(amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()) -> + rabbit_types:channel_exit(). + absent(Q, AbsentReason) -> QueueName = amqqueue:get_name(Q), QPid = amqqueue:get_pid(Q), @@ -660,6 +577,11 @@ priv_absent(QueueName, _QPid, _IsDurable, timeout) -> not_found, "failed to perform operation on ~s due to timeout", [rabbit_misc:rs(QueueName)]). +-spec assert_equivalence + (amqqueue:amqqueue(), boolean(), boolean(), + rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) -> + 'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit(). + assert_equivalence(Q, Durable1, AD1, Args1, Owner) -> QName = amqqueue:get_name(Q), Durable = amqqueue:is_durable(Q), @@ -669,6 +591,9 @@ assert_equivalence(Q, Durable1, AD1, Args1, Owner) -> assert_args_equivalence(Q, Args1), check_exclusive_access(Q, Owner, strict). +-spec check_exclusive_access(amqqueue:amqqueue(), pid()) -> + 'ok' | rabbit_types:channel_exit(). + check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax). check_exclusive_access(Q, Owner, _MatchType) @@ -684,6 +609,9 @@ check_exclusive_access(Q, _ReaderPid, _MatchType) -> "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QueueName)]). +-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) -> + A | rabbit_types:channel_exit(). + with_exclusive_access_or_die(Name, ReaderPid, F) -> with_or_die(Name, fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). @@ -817,6 +745,7 @@ check_queue_type({longstr, Val}, _Args) -> check_queue_type({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. +-spec list() -> [amqqueue:amqqueue()]. list() -> list_with_possible_retry(fun do_list/0). @@ -824,6 +753,8 @@ list() -> do_list() -> mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()). +-spec list_names() -> [rabbit_amqqueue:name()]. + list_names() -> mnesia:dirty_all_keys(rabbit_queue). list_names(VHost) -> [amqqueue:get_name(Q) || Q <- list(VHost)]. @@ -832,6 +763,8 @@ list_local_names() -> [ amqqueue:get_name(Q) || Q <- list(), amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())]. +-spec list_by_type(atom()) -> [amqqueue:amqqueue()]. + list_by_type(Type) -> {atomic, Qs} = mnesia:sync_transaction( @@ -853,6 +786,8 @@ is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) -> is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) -> Node =:= Leader. +-spec list(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. + list(VHostPath) -> list(VHostPath, rabbit_queue). @@ -902,6 +837,8 @@ list_with_possible_retry(Fun) -> Ret end. +-spec list_down(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. + list_down(VHostPath) -> case rabbit_vhost:exists(VHostPath) of false -> []; @@ -937,6 +874,8 @@ list_for_count(VHost) -> amqqueue:field_vhost()) end). +-spec info_keys() -> rabbit_types:info_keys(). + info_keys() -> rabbit_amqqueue_process:info_keys(). map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). @@ -957,6 +896,8 @@ is_unresponsive(Q, Timeout) -> format(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:format(Q); format(_) -> []. +-spec info(amqqueue:amqqueue()) -> rabbit_types:infos(). + info(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:info(Q); info(Q) when ?amqqueue_state_is(Q, crashed) -> info_down(Q, crashed); info(Q) when ?amqqueue_state_is(Q, stopped) -> info_down(Q, stopped); @@ -964,6 +905,9 @@ info(Q) -> QPid = amqqueue:get_pid(Q), delegate:invoke(QPid, {gen_server2, call, [info, infinity]}). +-spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) -> + rabbit_types:infos(). + info(Q, Items) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:info(Q, Items); info(Q, Items) when ?amqqueue_state_is(Q, crashed) -> @@ -996,10 +940,15 @@ i_down(K, _Q, _DownReason) -> false -> throw({bad_argument, K}) end. +-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. + info_all(VHostPath) -> map(list(VHostPath), fun (Q) -> info(Q) end) ++ map(list_down(VHostPath), fun (Q) -> info_down(Q, down) end). +-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) -> + [rabbit_types:infos()]. + info_all(VHostPath, Items) -> map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++ map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end). @@ -1038,11 +987,15 @@ list_local(VHostPath) -> [Q || Q <- list(VHostPath), amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())]. +-spec force_event_refresh(reference()) -> 'ok'. + force_event_refresh(Ref) -> [gen_server2:cast(amqqueue:get_pid(Q), {force_event_refresh, Ref}) || Q <- list()], ok. +-spec notify_policy_changed(amqqueue:amqqueue()) -> 'ok'. + notify_policy_changed(Q) when ?amqqueue_is_classic(Q) -> QPid = amqqueue:get_pid(Q), gen_server2:cast(QPid, policy_changed); @@ -1051,6 +1004,10 @@ notify_policy_changed(Q) when ?amqqueue_is_quorum(Q) -> QName = amqqueue:get_name(Q), rabbit_quorum_queue:policy_changed(QName, QPid). +-spec consumers(amqqueue:amqqueue()) -> + [{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(), + rabbit_framing:amqp_table()}]. + consumers(Q) when ?amqqueue_is_classic(Q) -> QPid = amqqueue:get_pid(Q), delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}); @@ -1060,8 +1017,14 @@ consumers(Q) when ?amqqueue_is_quorum(Q) -> fun rabbit_fifo:query_consumers/1), maps:values(Result). +-spec consumer_info_keys() -> rabbit_types:info_keys(). + consumer_info_keys() -> ?CONSUMER_INFO_KEYS. +-spec consumers_all(rabbit_types:vhost()) -> + [{name(), pid(), rabbit_types:ctag(), boolean(), + non_neg_integer(), rabbit_framing:amqp_table()}]. + consumers_all(VHostPath) -> ConsumerInfoKeys = consumer_info_keys(), lists:append( @@ -1086,20 +1049,34 @@ get_queue_consumer_info(Q, ConsumerInfoKeys) -> AckRequired, Prefetch, Active, ActivityStatus, Args]) || {ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)]. +-spec stat(amqqueue:amqqueue()) -> + {'ok', non_neg_integer(), non_neg_integer()}. + stat(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:stat(Q); stat(Q) -> delegate:invoke(amqqueue:get_pid(Q), {gen_server2, call, [stat, infinity]}). +-spec pid_of(amqqueue:amqqueue()) -> + {'ok', pid()} | rabbit_types:error('not_found'). + pid_of(Q) -> amqqueue:get_pid(Q). + +-spec pid_of(rabbit_types:vhost(), rabbit_misc:resource_name()) -> + {'ok', pid()} | rabbit_types:error('not_found'). + pid_of(VHost, QueueName) -> case lookup(rabbit_misc:r(VHost, queue, QueueName)) of {ok, Q} -> pid_of(Q); {error, not_found} = E -> E end. +-spec delete_exclusive(qpids(), pid()) -> 'ok'. + delete_exclusive(QPids, ConnId) -> [gen_server2:cast(QPid, {delete_exclusive, ConnId}) || QPid <- QPids], ok. +-spec delete_immediately(qpids()) -> 'ok'. + delete_immediately(QPids) -> {Classic, Quorum} = filter_pid_per_type(QPids), [gen_server2:cast(QPid, delete_immediately) || QPid <- Classic], @@ -1115,6 +1092,18 @@ delete_immediately_by_resource(Resources) -> || {Resource, QPid} <- Quorum], ok. +-spec delete + (amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) -> + qlen(); + (amqqueue:amqqueue(), 'true' , 'false', rabbit_types:username()) -> + qlen() | rabbit_types:error('in_use'); + (amqqueue:amqqueue(), 'false', 'true', rabbit_types:username()) -> + qlen() | rabbit_types:error('not_empty'); + (amqqueue:amqqueue(), 'true' , 'true', rabbit_types:username()) -> + qlen() | + rabbit_types:error('in_use') | + rabbit_types:error('not_empty'). + delete(Q, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:delete(Q, IfUnused, IfEmpty, ActingUser); @@ -1175,18 +1164,24 @@ wait_for_promoted_or_stopped(Q0) -> {error, not_found} end. +-spec delete_crashed(amqqueue:amqqueue()) -> 'ok'. + delete_crashed(Q) -> delete_crashed(Q, ?INTERNAL_USER). delete_crashed(Q, ActingUser) -> ok = rpc:call(amqqueue:qnode(Q), ?MODULE, delete_crashed_internal, [Q, ActingUser]). +-spec delete_crashed_internal(amqqueue:amqqueue(), rabbit_types:username()) -> 'ok'. + delete_crashed_internal(Q, ActingUser) -> QName = amqqueue:get_name(Q), {ok, BQ} = application:get_env(rabbit, backing_queue_module), BQ:delete_crashed(Q), ok = internal_delete(QName, ActingUser). +-spec purge(amqqueue:amqqueue()) -> {ok, qlen()}. + purge(Q) when ?amqqueue_is_classic(Q) -> QPid = amqqueue:get_pid(Q), delegate:invoke(QPid, {gen_server2, call, [purge, infinity]}); @@ -1194,6 +1189,7 @@ purge(Q) when ?amqqueue_is_quorum(Q) -> NodeId = amqqueue:get_pid(Q), rabbit_quorum_queue:purge(NodeId). +-spec requeue(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'. requeue(QPid, {_, MsgIds}, ChPid, QuorumStates) when ?IS_CLASSIC(QPid) -> ok = delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}), @@ -1209,6 +1205,8 @@ requeue({Name, _} = QPid, {CTag, MsgIds}, _ChPid, QuorumStates) QuorumStates end. +-spec ack(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'. + ack(QPid, {_, MsgIds}, ChPid, QueueStates) when ?IS_CLASSIC(QPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{ack, MsgIds, ChPid}]}), QueueStates; @@ -1223,6 +1221,9 @@ ack({Name, _} = QPid, {CTag, MsgIds}, _ChPid, QuorumStates) QuorumStates end. +-spec reject(pid() | {atom(), node()}, [msg_id()], boolean(), pid(), + #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'. + reject(QPid, Requeue, {_, MsgIds}, ChPid, QStates) when ?IS_CLASSIC(QPid) -> ok = delegate:invoke_no_result(QPid, {gen_server2, cast, [{reject, Requeue, MsgIds, ChPid}]}), @@ -1239,9 +1240,14 @@ reject({Name, _} = QPid, Requeue, {CTag, MsgIds}, _ChPid, QuorumStates) QuorumStates end. +-spec notify_down_all(qpids(), pid()) -> ok_or_errors(). + notify_down_all(QPids, ChPid) -> notify_down_all(QPids, ChPid, ?CHANNEL_OPERATION_TIMEOUT). +-spec notify_down_all(qpids(), pid(), non_neg_integer()) -> + ok_or_errors(). + notify_down_all(QPids, ChPid, Timeout) -> case rpc:call(node(), delegate, invoke, [QPids, {gen_server2, call, [{notify_down, ChPid}, infinity]}], Timeout) of @@ -1259,11 +1265,18 @@ notify_down_all(QPids, ChPid, Timeout) -> Error -> {error, Error} end. +-spec activate_limit_all(qpids(), pid()) -> ok_or_errors(). + activate_limit_all(QRefs, ChPid) -> QPids = [P || P <- QRefs, ?IS_CLASSIC(P)], delegate:invoke_no_result(QPids, {gen_server2, cast, [{activate_limit, ChPid}]}). +-spec credit + (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), + boolean(), #{Name :: atom() => rabbit_fifo_client:state()}) -> + 'ok'. + credit(Q, ChPid, CTag, Credit, Drain, QStates) when ?amqqueue_is_classic(Q) -> QPid = amqqueue:get_pid(Q), @@ -1279,6 +1292,9 @@ credit(Q, {ok, QState} = rabbit_quorum_queue:credit(CTag, Credit, Drain, QState0), {ok, maps:put(Name, QState, QStates)}. +-spec basic_get(amqqueue:amqqueue(), pid(), boolean(), pid(), rabbit_types:ctag(), + #{Name :: atom() => rabbit_fifo_client:state()}) -> + {'ok', non_neg_integer(), qmsg()} | 'empty'. basic_get(Q, ChPid, NoAck, LimiterPid, _CTag, _) when ?amqqueue_is_classic(Q) -> @@ -1301,8 +1317,15 @@ basic_get(Q, _ChPid, NoAck, _LimiterPid, CTag, QStates) [rabbit_misc:rs(QName), Reason]) end. -basic_consume(Q, NoAck, ChPid, - LimiterPid, LimiterActive, ConsumerPrefetchCount, ConsumerTag, +-spec basic_consume + (amqqueue:amqqueue(), boolean(), pid(), pid(), boolean(), + non_neg_integer(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table(), any(), rabbit_types:username(), + #{Name :: atom() => rabbit_fifo_client:state()}) -> + rabbit_types:ok_or_error('exclusive_consume_unavailable'). + +basic_consume(Q, NoAck, ChPid, LimiterPid, + LimiterActive, ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser, QState) when ?amqqueue_is_classic(Q) -> QPid = amqqueue:get_pid(Q), @@ -1340,6 +1363,11 @@ basic_consume(Q, OkMsg, QState0), {ok, maps:put(Name, QState, QStates)}. +-spec basic_cancel + (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), any(), + rabbit_types:username(), #{Name :: atom() => rabbit_fifo_client:state()}) -> + 'ok' | {'ok', #{Name :: atom() => rabbit_fifo_client:state()}}. + basic_cancel(Q, ChPid, ConsumerTag, OkMsg, ActingUser, QState) when ?amqqueue_is_classic(Q) -> @@ -1359,6 +1387,8 @@ basic_cancel(Q, ChPid, {ok, QState} = rabbit_quorum_queue:basic_cancel(ConsumerTag, ChPid, OkMsg, QState0), {ok, maps:put(Name, QState, QStates)}. +-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'. + notify_decorators(Q) -> QPid = amqqueue:get_pid(Q), delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}). @@ -1369,6 +1399,8 @@ notify_sent(QPid, ChPid) -> notify_sent_queue_down(QPid) -> rabbit_amqqueue_common:notify_sent_queue_down(QPid). +-spec resume(pid(), pid()) -> 'ok'. + resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{resume, ChPid}]}). internal_delete1(QueueName, OnlyDurable) -> @@ -1389,6 +1421,11 @@ internal_delete1(QueueName, OnlyDurable, Reason) -> %% after the transaction. rabbit_binding:remove_for_destination(QueueName, OnlyDurable). +-spec internal_delete(name(), rabbit_types:username()) -> + 'ok' | rabbit_types:connection_exit() | + fun (() -> + 'ok' | rabbit_types:connection_exit()). + internal_delete(QueueName, ActingUser) -> internal_delete(QueueName, ActingUser, normal). @@ -1413,6 +1450,8 @@ internal_delete(QueueName, ActingUser, Reason) -> end end). +-spec forget_all_durable(node()) -> 'ok'. + forget_all_durable(Node) -> %% Note rabbit is not running so we avoid e.g. the worker pool. Also why %% we don't invoke the return from rabbit_binding:process_deletions/1. @@ -1474,29 +1513,48 @@ node_permits_offline_promotion(Node) -> %% %% [2] This is simpler; as long as it's down that's OK +-spec run_backing_queue + (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> + 'ok'. + run_backing_queue(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). +-spec set_ram_duration_target(pid(), number() | 'infinity') -> 'ok'. + set_ram_duration_target(QPid, Duration) -> gen_server2:cast(QPid, {set_ram_duration_target, Duration}). +-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'. + set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). +-spec update_mirroring(pid()) -> 'ok'. + update_mirroring(QPid) -> ok = delegate:invoke_no_result(QPid, {gen_server2, cast, [update_mirroring]}). +-spec sync_mirrors(amqqueue:amqqueue() | pid()) -> + 'ok' | rabbit_types:error('not_mirrored'). + sync_mirrors(Q) when ?is_amqqueue(Q) -> QPid = amqqueue:get_pid(Q), delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}); sync_mirrors(QPid) -> delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}). + +-spec cancel_sync_mirrors(amqqueue:amqqueue() | pid()) -> + 'ok' | {'ok', 'not_syncing'}. + cancel_sync_mirrors(Q) when ?is_amqqueue(Q) -> QPid = amqqueue:get_pid(Q), delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}); cancel_sync_mirrors(QPid) -> delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}). +-spec is_replicated(amqqueue:amqqueue()) -> boolean(). + is_replicated(Q) when ?amqqueue_is_quorum(Q) -> true; is_replicated(Q) -> @@ -1508,6 +1566,8 @@ is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) -> Pid = amqqueue:get_pid(Q), not rabbit_mnesia:is_process_alive(Pid). +-spec on_node_up(node()) -> 'ok'. + on_node_up(Node) -> ok = rabbit_misc:execute_mnesia_transaction( fun () -> @@ -1548,6 +1608,8 @@ maybe_clear_recoverable_node(Node, Q) -> ok end. +-spec on_node_down(node()) -> 'ok'. + on_node_down(Node) -> {QueueNames, QueueDeletions} = delete_queues_on_node_down(Node), notify_queue_binding_deletions(QueueDeletions), @@ -1613,9 +1675,13 @@ notify_queues_deleted(QueueDeletions) -> end, QueueDeletions). +-spec pseudo_queue(name(), pid()) -> amqqueue:amqqueue(). + pseudo_queue(QueueName, Pid) -> pseudo_queue(QueueName, Pid, false). +-spec pseudo_queue(name(), pid(), boolean()) -> amqqueue:amqqueue(). + pseudo_queue(QueueName, Pid, Durable) -> amqqueue:new(QueueName, Pid, @@ -1628,12 +1694,19 @@ pseudo_queue(QueueName, Pid, Durable) -> classic % Type ). +-spec immutable(amqqueue:amqqueue()) -> amqqueue:amqqueue(). + immutable(Q) -> amqqueue:set_immutable(Q). +-spec deliver([amqqueue:amqqueue()], rabbit_types:delivery()) -> 'ok'. + deliver(Qs, Delivery) -> deliver(Qs, Delivery, untracked), ok. +-spec deliver([amqqueue:amqqueue()], rabbit_types:delivery(), #{Name :: atom() => rabbit_fifo_client:state()} | 'untracked') -> + {qpids(), #{Name :: atom() => rabbit_fifo_client:state()}}. + deliver([], _Delivery, QueueState) -> %% /dev/null optimisation {[], [], QueueState}; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index dd05b98e81..c3ba4a5c59 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -102,14 +102,6 @@ %%---------------------------------------------------------------------------- --spec info_keys() -> rabbit_types:info_keys(). --spec init_with_backing_queue_state - (amqqueue:amqqueue(), atom(), tuple(), any(), - [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) -> - #q{}. - -%%---------------------------------------------------------------------------- - -define(STATISTICS_KEYS, [messages_ready, messages_unacknowledged, @@ -147,6 +139,8 @@ %%---------------------------------------------------------------------------- +-spec info_keys() -> rabbit_types:info_keys(). + info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys(). statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). @@ -265,6 +259,11 @@ recovery_barrier(BarrierPid) -> {'DOWN', MRef, process, _, _} -> ok end. +-spec init_with_backing_queue_state + (amqqueue:amqqueue(), atom(), tuple(), any(), + [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) -> + #q{}. + init_with_backing_queue_state(Q, BQ, BQS, RateTRef, Deliveries, Senders, MTC) -> Owner = amqqueue:get_exclusive_owner(Q), diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 7c5a70b529..64efc01786 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -29,8 +29,6 @@ -spec start_link(amqqueue:amqqueue(), rabbit_prequeue:start_mode()) -> {'ok', pid(), pid()}. -%%---------------------------------------------------------------------------- - start_link(Q, StartMode) -> Marker = spawn_link(fun() -> receive stop -> ok end end), ChildSpec = {rabbit_amqqueue, diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl index 7e40e54aca..9e30a7f0ae 100644 --- a/src/rabbit_amqqueue_sup_sup.erl +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -31,15 +31,14 @@ %%---------------------------------------------------------------------------- -spec start_link() -> rabbit_types:ok_pid_or_error(). --spec start_queue_process - (node(), amqqueue:amqqueue(), 'declare' | 'recovery' | 'slave') -> - pid(). - -%%---------------------------------------------------------------------------- start_link() -> supervisor2:start_link(?MODULE, []). +-spec start_queue_process + (node(), amqqueue:amqqueue(), 'declare' | 'recovery' | 'slave') -> + pid(). + start_queue_process(Node, Q, StartMode) -> #resource{virtual_host = VHost} = amqqueue:get_name(Q), {ok, Sup} = find_for_vhost(VHost, Node), diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 83580e8b9c..5732ac4b30 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -47,46 +47,6 @@ -type regexp() :: binary(). --spec add_user(rabbit_types:username(), rabbit_types:password(), - rabbit_types:username()) -> 'ok' | {'error', string()}. --spec delete_user(rabbit_types:username(), rabbit_types:username()) -> 'ok'. --spec lookup_user - (rabbit_types:username()) -> - rabbit_types:ok(rabbit_types:internal_user()) | - rabbit_types:error('not_found'). --spec change_password - (rabbit_types:username(), rabbit_types:password(), rabbit_types:username()) -> 'ok'. --spec clear_password(rabbit_types:username(), rabbit_types:username()) -> 'ok'. --spec hash_password - (module(), rabbit_types:password()) -> rabbit_types:password_hash(). --spec change_password_hash - (rabbit_types:username(), rabbit_types:password_hash()) -> 'ok'. --spec set_tags(rabbit_types:username(), [atom()], rabbit_types:username()) -> 'ok'. --spec set_permissions - (rabbit_types:username(), rabbit_types:vhost(), regexp(), regexp(), - regexp(), rabbit_types:username()) -> - 'ok'. --spec clear_permissions - (rabbit_types:username(), rabbit_types:vhost(), rabbit_types:username()) -> 'ok'. --spec user_info_keys() -> rabbit_types:info_keys(). --spec perms_info_keys() -> rabbit_types:info_keys(). --spec user_perms_info_keys() -> rabbit_types:info_keys(). --spec vhost_perms_info_keys() -> rabbit_types:info_keys(). --spec user_vhost_perms_info_keys() -> rabbit_types:info_keys(). --spec list_users() -> [rabbit_types:infos()]. --spec list_users(reference(), pid()) -> 'ok'. --spec list_permissions() -> [rabbit_types:infos()]. --spec list_user_permissions - (rabbit_types:username()) -> [rabbit_types:infos()]. --spec list_user_permissions - (rabbit_types:username(), reference(), pid()) -> 'ok'. --spec list_vhost_permissions - (rabbit_types:vhost()) -> [rabbit_types:infos()]. --spec list_vhost_permissions - (rabbit_types:vhost(), reference(), pid()) -> 'ok'. --spec list_user_vhost_permissions - (rabbit_types:username(), rabbit_types:vhost()) -> [rabbit_types:infos()]. - %%---------------------------------------------------------------------------- %% Implementation of rabbit_auth_backend @@ -238,6 +198,9 @@ validate_and_alternate_credentials(Username, Password, ActingUser, Fun) -> {error, Err} end. +-spec add_user(rabbit_types:username(), rabbit_types:password(), + rabbit_types:username()) -> 'ok' | {'error', string()}. + add_user(Username, Password, ActingUser) -> validate_and_alternate_credentials(Username, Password, ActingUser, fun add_user_sans_validation/3). @@ -265,6 +228,8 @@ add_user_sans_validation(Username, Password, ActingUser) -> {user_who_performed_action, ActingUser}]), R. +-spec delete_user(rabbit_types:username(), rabbit_types:username()) -> 'ok'. + delete_user(Username, ActingUser) -> rabbit_log:info("Deleting user '~s'~n", [Username]), R = rabbit_misc:execute_mnesia_transaction( @@ -291,9 +256,17 @@ delete_user(Username, ActingUser) -> {user_who_performed_action, ActingUser}]), R. +-spec lookup_user + (rabbit_types:username()) -> + rabbit_types:ok(rabbit_types:internal_user()) | + rabbit_types:error('not_found'). + lookup_user(Username) -> rabbit_misc:dirty_read({rabbit_user, Username}). +-spec change_password + (rabbit_types:username(), rabbit_types:password(), rabbit_types:username()) -> 'ok'. + change_password(Username, Password, ActingUser) -> validate_and_alternate_credentials(Username, Password, ActingUser, fun change_password_sans_validation/3). @@ -310,6 +283,8 @@ change_password_sans_validation(Username, Password, ActingUser) -> {user_who_performed_action, ActingUser}]), R. +-spec clear_password(rabbit_types:username(), rabbit_types:username()) -> 'ok'. + clear_password(Username, ActingUser) -> rabbit_log:info("Clearing password for '~s'~n", [Username]), R = change_password_hash(Username, <<"">>), @@ -318,9 +293,15 @@ clear_password(Username, ActingUser) -> {user_who_performed_action, ActingUser}]), R. +-spec hash_password + (module(), rabbit_types:password()) -> rabbit_types:password_hash(). + hash_password(HashingMod, Cleartext) -> rabbit_password:hash(HashingMod, Cleartext). +-spec change_password_hash + (rabbit_types:username(), rabbit_types:password_hash()) -> 'ok'. + change_password_hash(Username, PasswordHash) -> change_password_hash(Username, PasswordHash, rabbit_password:hashing_mod()). @@ -332,6 +313,8 @@ change_password_hash(Username, PasswordHash, HashingAlgorithm) -> hashing_algorithm = HashingAlgorithm } end). +-spec set_tags(rabbit_types:username(), [atom()], rabbit_types:username()) -> 'ok'. + set_tags(Username, Tags, ActingUser) -> ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags], rabbit_log:info("Setting user tags for user '~s' to ~p~n", @@ -343,6 +326,11 @@ set_tags(Username, Tags, ActingUser) -> {user_who_performed_action, ActingUser}]), R. +-spec set_permissions + (rabbit_types:username(), rabbit_types:vhost(), regexp(), regexp(), + regexp(), rabbit_types:username()) -> + 'ok'. + set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, ActingUser) -> rabbit_log:info("Setting permissions for " "'~s' in '~s' to '~s', '~s', '~s'~n", @@ -378,6 +366,9 @@ set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, ActingU {user_who_performed_action, ActingUser}]), R. +-spec clear_permissions + (rabbit_types:username(), rabbit_types:vhost(), rabbit_types:username()) -> 'ok'. + clear_permissions(Username, VHostPath, ActingUser) -> R = rabbit_misc:execute_mnesia_transaction( rabbit_vhost:with_user_and_vhost( @@ -481,11 +472,24 @@ clear_topic_permissions(Username, VHostPath, Exchange, ActingUser) -> -define(PERMS_INFO_KEYS, [configure, write, read]). -define(USER_INFO_KEYS, [user, tags]). +-spec user_info_keys() -> rabbit_types:info_keys(). + user_info_keys() -> ?USER_INFO_KEYS. +-spec perms_info_keys() -> rabbit_types:info_keys(). + perms_info_keys() -> [user, vhost | ?PERMS_INFO_KEYS]. + +-spec vhost_perms_info_keys() -> rabbit_types:info_keys(). + vhost_perms_info_keys() -> [user | ?PERMS_INFO_KEYS]. + +-spec user_perms_info_keys() -> rabbit_types:info_keys(). + user_perms_info_keys() -> [vhost | ?PERMS_INFO_KEYS]. + +-spec user_vhost_perms_info_keys() -> rabbit_types:info_keys(). + user_vhost_perms_info_keys() -> ?PERMS_INFO_KEYS. topic_perms_info_keys() -> [user, vhost, exchange, write, read]. @@ -493,16 +497,22 @@ user_topic_perms_info_keys() -> [vhost, exchange, write, read]. vhost_topic_perms_info_keys() -> [user, exchange, write, read]. user_vhost_topic_perms_info_keys() -> [exchange, write, read]. +-spec list_users() -> [rabbit_types:infos()]. + list_users() -> [extract_internal_user_params(U) || U <- mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})]. +-spec list_users(reference(), pid()) -> 'ok'. + list_users(Ref, AggregatorPid) -> rabbit_control_misc:emitting_map( AggregatorPid, Ref, fun(U) -> extract_internal_user_params(U) end, mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})). +-spec list_permissions() -> [rabbit_types:infos()]. + list_permissions() -> list_permissions(perms_info_keys(), match_user_vhost('_', '_')). @@ -517,28 +527,43 @@ list_permissions(Keys, QueryThunk, Ref, AggregatorPid) -> filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)]. +-spec list_user_permissions + (rabbit_types:username()) -> [rabbit_types:infos()]. + list_user_permissions(Username) -> list_permissions( user_perms_info_keys(), rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))). +-spec list_user_permissions + (rabbit_types:username(), reference(), pid()) -> 'ok'. + list_user_permissions(Username, Ref, AggregatorPid) -> list_permissions( user_perms_info_keys(), rabbit_misc:with_user(Username, match_user_vhost(Username, '_')), Ref, AggregatorPid). +-spec list_vhost_permissions + (rabbit_types:vhost()) -> [rabbit_types:infos()]. + list_vhost_permissions(VHostPath) -> list_permissions( vhost_perms_info_keys(), rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))). +-spec list_vhost_permissions + (rabbit_types:vhost(), reference(), pid()) -> 'ok'. + list_vhost_permissions(VHostPath, Ref, AggregatorPid) -> list_permissions( vhost_perms_info_keys(), rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath)), Ref, AggregatorPid). +-spec list_user_vhost_permissions + (rabbit_types:username(), rabbit_types:vhost()) -> [rabbit_types:infos()]. + list_user_vhost_permissions(Username, VHostPath) -> list_permissions( user_vhost_perms_info_keys(), diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index abd8635720..c3e570b26f 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -52,8 +52,6 @@ -type queue_mode() :: atom(). --spec info_keys() -> rabbit_types:info_keys(). - %% Called on startup with a vhost and a list of durable queue names on this vhost. %% The queues aren't being started at this point, but this call allows the %% backing queue to perform any checking necessary for the consistency @@ -270,4 +268,6 @@ %% queue -callback handle_info(term(), state()) -> state(). +-spec info_keys() -> rabbit_types:info_keys(). + info_keys() -> ?INFO_KEYS. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 6cb91a9243..40c60ece45 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -37,61 +37,27 @@ -type exchange_input() :: rabbit_types:exchange() | rabbit_exchange:name(). -type body_input() :: binary() | [binary()]. +%%---------------------------------------------------------------------------- + +%% Convenience function, for avoiding round-trips in calls across the +%% erlang distributed network. + -spec publish (exchange_input(), rabbit_router:routing_key(), properties_input(), body_input()) -> publish_result(). --spec publish - (exchange_input(), rabbit_router:routing_key(), boolean(), - properties_input(), body_input()) -> - publish_result(). --spec publish(rabbit_types:delivery()) -> publish_result(). --spec delivery - (boolean(), boolean(), rabbit_types:message(), undefined | integer()) -> - rabbit_types:delivery(). --spec message - (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), - binary()) -> - rabbit_types:message(). --spec message - (rabbit_exchange:name(), rabbit_router:routing_key(), - rabbit_types:decoded_content()) -> - rabbit_types:ok_or_error2(rabbit_types:message(), any()). --spec properties - (properties_input()) -> rabbit_framing:amqp_property_record(). - --spec prepend_table_header - (binary(), rabbit_framing:amqp_table(), headers()) -> headers(). --spec header(header(), headers()) -> 'undefined' | any(). --spec header(header(), headers(), any()) -> 'undefined' | any(). - --spec extract_headers(rabbit_types:content()) -> headers(). - --spec map_headers - (fun((headers()) -> headers()), rabbit_types:content()) -> - rabbit_types:content(). - --spec header_routes(undefined | rabbit_framing:amqp_table()) -> [string()]. --spec build_content - (rabbit_framing:amqp_property_record(), binary() | [binary()]) -> - rabbit_types:content(). --spec from_content - (rabbit_types:content()) -> - {rabbit_framing:amqp_property_record(), binary()}. --spec parse_expiration - (rabbit_framing:amqp_property_record()) -> - rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any()). - -%%---------------------------------------------------------------------------- - -%% Convenience function, for avoiding round-trips in calls across the -%% erlang distributed network. publish(Exchange, RoutingKeyBin, Properties, Body) -> publish(Exchange, RoutingKeyBin, false, Properties, Body). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. + +-spec publish + (exchange_input(), rabbit_router:routing_key(), boolean(), + properties_input(), body_input()) -> + publish_result(). + publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) -> Message = message(XName, RKey, properties(Props), Body), publish(X, delivery(Mandatory, false, Message, undefined)); @@ -99,6 +65,8 @@ publish(XName, RKey, Mandatory, Props, Body) -> Message = message(XName, RKey, properties(Props), Body), publish(delivery(Mandatory, false, Message, undefined)). +-spec publish(rabbit_types:delivery()) -> publish_result(). + publish(Delivery = #delivery{ message = #basic_message{exchange_name = XName}}) -> case rabbit_exchange:lookup(XName) of @@ -111,10 +79,18 @@ publish(X, Delivery) -> DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery), {ok, DeliveredQPids}. +-spec delivery + (boolean(), boolean(), rabbit_types:message(), undefined | integer()) -> + rabbit_types:delivery(). + delivery(Mandatory, Confirm, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, confirm = Confirm, sender = self(), message = Message, msg_seq_no = MsgSeqNo, flow = noflow}. +-spec build_content + (rabbit_framing:amqp_property_record(), binary() | [binary()]) -> + rabbit_types:content(). + build_content(Properties, BodyBin) when is_binary(BodyBin) -> build_content(Properties, [BodyBin]); @@ -128,6 +104,10 @@ build_content(Properties, PFR) -> protocol = none, payload_fragments_rev = PFR}. +-spec from_content + (rabbit_types:content()) -> + {rabbit_framing:amqp_property_record(), binary()}. + from_content(Content) -> #content{class_id = ClassId, properties = Props, @@ -153,6 +133,11 @@ strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} headers = Headers0}}) end. +-spec message + (rabbit_exchange:name(), rabbit_router:routing_key(), + rabbit_types:decoded_content()) -> + rabbit_types:ok_or_error2(rabbit_types:message(), any()). + message(XName, RoutingKey, #content{properties = Props} = DecodedContent) -> try {ok, #basic_message{ @@ -166,12 +151,20 @@ message(XName, RoutingKey, #content{properties = Props} = DecodedContent) -> {error, _Reason} = Error -> Error end. +-spec message + (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), + binary()) -> + rabbit_types:message(). + message(XName, RoutingKey, RawProperties, Body) -> Properties = properties(RawProperties), Content = build_content(Properties, Body), {ok, Msg} = message(XName, RoutingKey, Content), Msg. +-spec properties + (properties_input()) -> rabbit_framing:amqp_property_record(). + properties(P = #'P_basic'{}) -> P; properties(P) when is_list(P) -> @@ -185,6 +178,9 @@ properties(P) when is_list(P) -> end end, #'P_basic'{}, P). +-spec prepend_table_header + (binary(), rabbit_framing:amqp_table(), headers()) -> headers(). + prepend_table_header(Name, Info, undefined) -> prepend_table_header(Name, Info, []); prepend_table_header(Name, Info, Headers) -> @@ -224,6 +220,8 @@ update_invalid(Name, Value, ExistingHdr, Header) -> NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values), set_invalid(NewHdr, Header). +-spec header(header(), headers()) -> 'undefined' | any(). + header(_Header, undefined) -> undefined; header(_Header, []) -> @@ -231,12 +229,16 @@ header(_Header, []) -> header(Header, Headers) -> header(Header, Headers, undefined). +-spec header(header(), headers(), any()) -> 'undefined' | any(). + header(Header, Headers, Default) -> case lists:keysearch(Header, 1, Headers) of false -> Default; {value, Val} -> Val end. +-spec extract_headers(rabbit_types:content()) -> headers(). + extract_headers(Content) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), @@ -247,6 +249,10 @@ extract_timestamp(Content) -> rabbit_binary_parser:ensure_content_decoded(Content), Timestamp. +-spec map_headers + (fun((headers()) -> headers()), rabbit_types:content()) -> + rabbit_types:content(). + map_headers(F, Content) -> Content1 = rabbit_binary_parser:ensure_content_decoded(Content), #content{properties = #'P_basic'{headers = Headers} = Props} = Content1, @@ -270,6 +276,9 @@ is_message_persistent(#content{properties = #'P_basic'{ end. %% Extract CC routes from headers + +-spec header_routes(undefined | rabbit_framing:amqp_table()) -> [string()]. + header_routes(undefined) -> []; header_routes(HeadersTable) -> @@ -281,6 +290,10 @@ header_routes(HeadersTable) -> binary_to_list(HeaderKey), Type}}) end || HeaderKey <- ?ROUTING_HEADERS]). +-spec parse_expiration + (rabbit_framing:amqp_property_record()) -> + rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any()). + parse_expiration(#'P_basic'{expiration = undefined}) -> {ok, undefined}; parse_expiration(#'P_basic'{expiration = Expiration}) -> diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index c4542abc61..460e6eab99 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -62,47 +62,6 @@ %% dialyzer into objecting to everything that uses it. -type deletions() :: dict:dict(). --spec recover([rabbit_exchange:name()], [rabbit_amqqueue:name()]) -> - 'ok'. --spec exists(rabbit_types:binding()) -> boolean() | bind_errors(). --spec add(rabbit_types:binding(), rabbit_types:username()) -> bind_res(). --spec add(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res(). --spec remove(rabbit_types:binding()) -> bind_res(). --spec remove(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res(). --spec list(rabbit_types:vhost()) -> bindings(). --spec list_for_source - (rabbit_types:binding_source()) -> bindings(). --spec list_for_destination - (rabbit_types:binding_destination()) -> bindings(). --spec list_for_source_and_destination - (rabbit_types:binding_source(), rabbit_types:binding_destination()) -> - bindings(). --spec info_keys() -> rabbit_types:info_keys(). --spec info(rabbit_types:binding()) -> rabbit_types:infos(). --spec info(rabbit_types:binding(), rabbit_types:info_keys()) -> - rabbit_types:infos(). --spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. --spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) -> - [rabbit_types:infos()]. --spec info_all(rabbit_types:vhost(), rabbit_types:info_keys(), - reference(), pid()) -> 'ok'. --spec has_for_source(rabbit_types:binding_source()) -> boolean(). --spec remove_for_source(rabbit_types:binding_source()) -> bindings(). --spec remove_for_destination - (rabbit_types:binding_destination(), boolean()) -> deletions(). --spec remove_transient_for_destination - (rabbit_types:binding_destination()) -> deletions(). --spec process_deletions(deletions(), rabbit_types:username()) -> rabbit_misc:thunk('ok'). --spec combine_deletions(deletions(), deletions()) -> deletions(). --spec add_deletion - (rabbit_exchange:name(), - {'undefined' | rabbit_types:exchange(), - 'deleted' | 'not_deleted', - bindings()}, - deletions()) -> - deletions(). --spec new_deletions() -> deletions(). - %%---------------------------------------------------------------------------- -define(INFO_KEYS, [source_name, source_kind, @@ -111,6 +70,10 @@ vhost]). %% Global table recovery + +-spec recover([rabbit_exchange:name()], [rabbit_amqqueue:name()]) -> + 'ok'. + recover() -> rabbit_misc:table_filter( fun (Route) -> @@ -164,6 +127,8 @@ recover_semi_durable_route_txn(R = #route{binding = B}, X) -> (Serial, false) -> x_callback(Serial, X, add_binding, B) end). +-spec exists(rabbit_types:binding()) -> boolean() | bind_errors(). + exists(#binding{source = ?DEFAULT_EXCHANGE(_), destination = #resource{kind = queue, name = QName} = Queue, key = QName, @@ -178,8 +143,12 @@ exists(Binding) -> rabbit_misc:const(mnesia:read({rabbit_route, B}) /= []) end, fun not_found_or_absent_errs/1). +-spec add(rabbit_types:binding(), rabbit_types:username()) -> bind_res(). + add(Binding, ActingUser) -> add(Binding, fun (_Src, _Dst) -> ok end, ActingUser). +-spec add(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res(). + add(Binding, InnerFun, ActingUser) -> binding_action( Binding, @@ -224,8 +193,12 @@ add(Src, Dst, B, ActingUser) -> true -> rabbit_misc:const({error, binding_not_found}) end. +-spec remove(rabbit_types:binding()) -> bind_res(). + remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end, ?INTERNAL_USER). +-spec remove(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res(). + remove(Binding, InnerFun, ActingUser) -> binding_action( Binding, @@ -269,6 +242,8 @@ remove_default_exchange_binding_rows_of(Dst = #resource{}) -> end, ok. +-spec list(rabbit_types:vhost()) -> bindings(). + list(VHostPath) -> VHostResource = rabbit_misc:r(VHostPath, '_'), Route = #route{binding = #binding{source = VHostResource, @@ -284,6 +259,9 @@ list(VHostPath) -> end, AllBindings), implicit_bindings(VHostPath) ++ Filtered. +-spec list_for_source + (rabbit_types:binding_source()) -> bindings(). + list_for_source(?DEFAULT_EXCHANGE(VHostPath)) -> implicit_bindings(VHostPath); list_for_source(SrcName) -> @@ -294,6 +272,9 @@ list_for_source(SrcName) -> <- mnesia:match_object(rabbit_route, Route, read)] end). +-spec list_for_destination + (rabbit_types:binding_destination()) -> bindings(). + list_for_destination(DstName) -> implicit_for_destination(DstName) ++ mnesia:async_dirty( @@ -324,6 +305,10 @@ implicit_for_destination(DstQueue = #resource{kind = queue, implicit_for_destination(_) -> []. +-spec list_for_source_and_destination + (rabbit_types:binding_source(), rabbit_types:binding_destination()) -> + bindings(). + list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath), #resource{kind = queue, virtual_host = VHostPath, @@ -342,6 +327,8 @@ list_for_source_and_destination(SrcName, DstName) -> Route, read)] end). +-spec info_keys() -> rabbit_types:info_keys(). + info_keys() -> ?INFO_KEYS. map(VHostPath, F) -> @@ -360,18 +347,33 @@ i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; i(arguments, #binding{args = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). +-spec info(rabbit_types:binding()) -> rabbit_types:infos(). + info(B = #binding{}) -> infos(?INFO_KEYS, B). +-spec info(rabbit_types:binding(), rabbit_types:info_keys()) -> + rabbit_types:infos(). + info(B = #binding{}, Items) -> infos(Items, B). +-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. + info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). +-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) -> + [rabbit_types:infos()]. + info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). +-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys(), + reference(), pid()) -> 'ok'. + info_all(VHostPath, Items, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map( AggregatorPid, Ref, fun(B) -> info(B, Items) end, list(VHostPath)). +-spec has_for_source(rabbit_types:binding_source()) -> boolean(). + has_for_source(SrcName) -> Match = #route{binding = #binding{source = SrcName, _ = '_'}}, %% we need to check for semi-durable routes (which subsumes @@ -381,6 +383,8 @@ has_for_source(SrcName) -> contains(rabbit_route, Match) orelse contains(rabbit_semi_durable_route, Match). +-spec remove_for_source(rabbit_types:binding_source()) -> bindings(). + remove_for_source(SrcName) -> lock_resource(SrcName), Match = #route{binding = #binding{source = SrcName, _ = '_'}}, @@ -389,9 +393,15 @@ remove_for_source(SrcName) -> mnesia:dirty_match_object(rabbit_route, Match) ++ mnesia:dirty_match_object(rabbit_semi_durable_route, Match))). +-spec remove_for_destination + (rabbit_types:binding_destination(), boolean()) -> deletions(). + remove_for_destination(DstName, OnlyDurable) -> remove_for_destination(DstName, OnlyDurable, fun remove_routes/1). +-spec remove_transient_for_destination + (rabbit_types:binding_destination()) -> deletions(). + remove_transient_for_destination(DstName) -> remove_for_destination(DstName, false, fun remove_transient_routes/1). @@ -597,12 +607,24 @@ anything_but( NotThis, NotThis, This) -> This; anything_but( NotThis, This, NotThis) -> This; anything_but(_NotThis, This, This) -> This. +-spec new_deletions() -> deletions(). + new_deletions() -> dict:new(). +-spec add_deletion + (rabbit_exchange:name(), + {'undefined' | rabbit_types:exchange(), + 'deleted' | 'not_deleted', + bindings()}, + deletions()) -> + deletions(). + add_deletion(XName, Entry, Deletions) -> dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end, Entry, Deletions). +-spec combine_deletions(deletions(), deletions()) -> deletions(). + combine_deletions(Deletions1, Deletions2) -> dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end, Deletions1, Deletions2). @@ -612,6 +634,8 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> anything_but(not_deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. +-spec process_deletions(deletions(), rabbit_types:username()) -> rabbit_misc:thunk('ok'). + process_deletions(Deletions, ActingUser) -> AugmentedDeletions = dict:map(fun (_XName, {X, deleted, Bindings}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3f199a4a00..ffd7c8fabc 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -67,6 +67,9 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, prioritise_cast/3, prioritise_info/3, format_message_queue/2]). + +-deprecated([{force_event_refresh, 1, eventually}]). + %% Internal -export([list_local/0, emit_info_local/3, deliver_reply_local/3]). -export([get_vhost/1, get_user/1]). @@ -222,42 +225,13 @@ -type channel() :: #ch{}. +%%---------------------------------------------------------------------------- + -spec start_link (channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid(), pid()) -> rabbit_types:ok_pid_or_error(). --spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'. --spec do - (pid(), rabbit_framing:amqp_method_record(), - rabbit_types:maybe(rabbit_types:content())) -> - 'ok'. --spec do_flow - (pid(), rabbit_framing:amqp_method_record(), - rabbit_types:maybe(rabbit_types:content())) -> - 'ok'. --spec flush(pid()) -> 'ok'. --spec shutdown(pid()) -> 'ok'. --spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'. --spec deliver - (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'. --spec deliver_reply(binary(), rabbit_types:delivery()) -> 'ok'. --spec deliver_reply_local(pid(), binary(), rabbit_types:delivery()) -> 'ok'. --spec send_credit_reply(pid(), non_neg_integer()) -> 'ok'. --spec send_drained(pid(), [{rabbit_types:ctag(), non_neg_integer()}]) -> 'ok'. --spec list() -> [pid()]. --spec list_local() -> [pid()]. --spec info_keys() -> rabbit_types:info_keys(). --spec info(pid()) -> rabbit_types:infos(). --spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos(). --spec info_all() -> [rabbit_types:infos()]. --spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()]. --spec refresh_config_local() -> 'ok'. --spec ready_for_close(pid()) -> 'ok'. --deprecated([{force_event_refresh, 1, eventually}]). --spec force_event_refresh(reference()) -> 'ok'. - -%%---------------------------------------------------------------------------- start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, Limiter) -> @@ -265,27 +239,50 @@ start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, Limiter], []). +-spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'. + do(Pid, Method) -> rabbit_channel_common:do(Pid, Method). +-spec do + (pid(), rabbit_framing:amqp_method_record(), + rabbit_types:maybe(rabbit_types:content())) -> + 'ok'. + do(Pid, Method, Content) -> rabbit_channel_common:do(Pid, Method, Content). +-spec do_flow + (pid(), rabbit_framing:amqp_method_record(), + rabbit_types:maybe(rabbit_types:content())) -> + 'ok'. + do_flow(Pid, Method, Content) -> rabbit_channel_common:do_flow(Pid, Method, Content). +-spec flush(pid()) -> 'ok'. + flush(Pid) -> gen_server2:call(Pid, flush, infinity). +-spec shutdown(pid()) -> 'ok'. + shutdown(Pid) -> gen_server2:cast(Pid, terminate). +-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'. + send_command(Pid, Msg) -> gen_server2:cast(Pid, {command, Msg}). +-spec deliver + (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'. + deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). +-spec deliver_reply(binary(), rabbit_types:delivery()) -> 'ok'. + deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) -> case decode_fast_reply_to(Rest) of {ok, Pid, Key} -> @@ -297,6 +294,9 @@ deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) -> %% We want to ensure people can't use this mechanism to send a message %% to an arbitrary process and kill it! + +-spec deliver_reply_local(pid(), binary(), rabbit_types:delivery()) -> 'ok'. + deliver_reply_local(Pid, Key, Delivery) -> case pg_local:in_group(rabbit_channels, Pid) of true -> gen_server2:cast(Pid, {deliver_reply, Key, Delivery}); @@ -325,21 +325,33 @@ decode_fast_reply_to(Rest) -> _ -> error end. +-spec send_credit_reply(pid(), non_neg_integer()) -> 'ok'. + send_credit_reply(Pid, Len) -> gen_server2:cast(Pid, {send_credit_reply, Len}). +-spec send_drained(pid(), [{rabbit_types:ctag(), non_neg_integer()}]) -> 'ok'. + send_drained(Pid, CTagCredit) -> gen_server2:cast(Pid, {send_drained, CTagCredit}). +-spec list() -> [pid()]. + list() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_channel, list_local, []). +-spec list_local() -> [pid()]. + list_local() -> pg_local:get_members(rabbit_channels). +-spec info_keys() -> rabbit_types:info_keys(). + info_keys() -> ?INFO_KEYS. +-spec info(pid()) -> rabbit_types:infos(). + info(Pid) -> {Timeout, Deadline} = get_operation_timeout_and_deadline(), try @@ -353,6 +365,8 @@ info(Pid) -> throw(timeout) end. +-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos(). + info(Pid, Items) -> {Timeout, Deadline} = get_operation_timeout_and_deadline(), try @@ -366,9 +380,13 @@ info(Pid, Items) -> throw(timeout) end. +-spec info_all() -> [rabbit_types:infos()]. + info_all() -> rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()). +-spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()]. + info_all(Items) -> rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). @@ -386,6 +404,8 @@ emit_info(PidList, InfoItems, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map_with_exit_handler( AggregatorPid, Ref, fun(C) -> info(C, InfoItems) end, PidList). +-spec refresh_config_local() -> 'ok'. + refresh_config_local() -> rabbit_misc:upmap( fun (C) -> @@ -414,9 +434,13 @@ refresh_interceptors() -> list_local()), ok. +-spec ready_for_close(pid()) -> 'ok'. + ready_for_close(Pid) -> rabbit_channel_common:ready_for_close(Pid). +-spec force_event_refresh(reference()) -> 'ok'. + force_event_refresh(Ref) -> [gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()], ok. diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index e2e24e2f38..86a0f15650 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -47,12 +47,12 @@ rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()}. --spec start_link(start_link_args()) -> {'ok', pid(), {pid(), any()}}. - -define(FAIR_WAIT, 70000). %%---------------------------------------------------------------------------- +-spec start_link(start_link_args()) -> {'ok', pid(), {pid(), any()}}. + start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, VHost, Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link( diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index d39e3f3e9b..b813b42e89 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -32,14 +32,13 @@ %%---------------------------------------------------------------------------- -spec start_link() -> rabbit_types:ok_pid_or_error(). --spec start_channel(pid(), rabbit_channel_sup:start_link_args()) -> - {'ok', pid(), {pid(), any()}}. - -%%---------------------------------------------------------------------------- start_link() -> supervisor2:start_link(?MODULE, []). +-spec start_channel(pid(), rabbit_channel_sup:start_link_args()) -> + {'ok', pid(), {pid(), any()}}. + start_channel(Pid, Args) -> supervisor2:start_child(Pid, [Args]). diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index 982c1d1e62..8b49244982 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -28,19 +28,19 @@ -spec start_link(rabbit_types:mfargs()) -> rabbit_types:ok_pid_or_error(). --spec start_link({'local', atom()}, rabbit_types:mfargs()) -> - rabbit_types:ok_pid_or_error(). --spec start_link_worker({'local', atom()}, rabbit_types:mfargs()) -> - rabbit_types:ok_pid_or_error(). - -%%---------------------------------------------------------------------------- start_link(Callback) -> supervisor2:start_link(?MODULE, Callback). +-spec start_link({'local', atom()}, rabbit_types:mfargs()) -> + rabbit_types:ok_pid_or_error(). + start_link(SupName, Callback) -> supervisor2:start_link(SupName, ?MODULE, Callback). +-spec start_link_worker({'local', atom()}, rabbit_types:mfargs()) -> + rabbit_types:ok_pid_or_error(). + start_link_worker(SupName, Callback) -> supervisor2:start_link(SupName, ?MODULE, {Callback, worker}). diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl index 8c4ba88da2..c5a0825d83 100644 --- a/src/rabbit_connection_helper_sup.erl +++ b/src/rabbit_connection_helper_sup.erl @@ -38,21 +38,21 @@ %%---------------------------------------------------------------------------- -spec start_link() -> rabbit_types:ok_pid_or_error(). --spec start_channel_sup_sup(pid()) -> rabbit_types:ok_pid_or_error(). --spec start_queue_collector(pid(), rabbit_types:proc_name()) -> - rabbit_types:ok_pid_or_error(). - -%%---------------------------------------------------------------------------- start_link() -> supervisor2:start_link(?MODULE, []). +-spec start_channel_sup_sup(pid()) -> rabbit_types:ok_pid_or_error(). + start_channel_sup_sup(SupPid) -> supervisor2:start_child( SupPid, {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}). +-spec start_queue_collector(pid(), rabbit_types:proc_name()) -> + rabbit_types:ok_pid_or_error(). + start_queue_collector(SupPid, Identity) -> supervisor2:start_child( SupPid, diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 6e85514d44..51661dd8b6 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -38,9 +38,6 @@ -spec start_link(any(), rabbit_net:socket(), module(), any()) -> {'ok', pid(), pid()}. --spec reader(pid()) -> pid(). - -%%-------------------------------------------------------------------------- start_link(Ref, _Sock, _Transport, _Opts) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), @@ -66,6 +63,8 @@ start_link(Ref, _Sock, _Transport, _Opts) -> intrinsic, ?WORKER_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. +-spec reader(pid()) -> pid(). + reader(Pid) -> hd(supervisor2:find_child(Pid, reader)). diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl index d4c065e64f..586913ea2c 100644 --- a/src/rabbit_core_metrics_gc.erl +++ b/src/rabbit_core_metrics_gc.erl @@ -19,12 +19,12 @@ interval }). --spec start_link() -> rabbit_types:ok_pid_or_error(). - -export([start_link/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-spec start_link() -> rabbit_types:ok_pid_or_error(). + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). diff --git a/src/rabbit_credential_validation.erl b/src/rabbit_credential_validation.erl index 4b24d35f9d..ec0dea7893 100644 --- a/src/rabbit_credential_validation.erl +++ b/src/rabbit_credential_validation.erl @@ -27,8 +27,6 @@ -export([validate/2, backend/0]). --spec validate(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}. - %% Validates a username/password pair by delegating to the effective %% `rabbit_credential_validator`. Used by `rabbit_auth_backend_internal`. %% Note that some validators may choose to only validate passwords. @@ -38,6 +36,8 @@ %% * ok: provided credentials passed validation. %% * {error, Error, Args}: provided password password failed validation. +-spec validate(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}. + validate(Username, Password) -> Backend = backend(), Backend:validate(Username, Password). diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 71ad8814b1..e26ea8297b 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -25,11 +25,11 @@ -type reason() :: 'expired' | 'rejected' | 'maxlen'. +%%---------------------------------------------------------------------------- + -spec publish(rabbit_types:message(), reason(), rabbit_types:exchange(), 'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'. -%%---------------------------------------------------------------------------- - publish(Msg, Reason, X, RK, QName) -> DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName), Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined), diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index cc2a9f1026..50e8f3d2b0 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -18,6 +18,9 @@ -export([boot/0, force_event_refresh/1, list/0, connect/5, start_channel/9, disconnect/2]). + +-deprecated([{force_event_refresh, 1, eventually}]). + %% Internal -export([list_local/0]). @@ -29,40 +32,25 @@ %%---------------------------------------------------------------------------- -spec boot() -> 'ok'. --deprecated([{force_event_refresh, 1, eventually}]). --spec force_event_refresh(reference()) -> 'ok'. --spec list() -> [pid()]. --spec list_local() -> [pid()]. --spec connect - (({'none', 'none'} | {rabbit_types:username(), 'none'} | - {rabbit_types:username(), rabbit_types:password()}), - rabbit_types:vhost(), rabbit_types:protocol(), pid(), - rabbit_event:event_props()) -> - rabbit_types:ok_or_error2( - {rabbit_types:user(), rabbit_framing:amqp_table()}, - 'broker_not_found_on_node' | - {'auth_failure', string()} | 'access_refused'). --spec start_channel - (rabbit_channel:channel_number(), pid(), pid(), string(), - rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), - rabbit_framing:amqp_table(), pid()) -> - {'ok', pid()}. --spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'. - -%%---------------------------------------------------------------------------- boot() -> rabbit_sup:start_supervisor_child( rabbit_direct_client_sup, rabbit_client_sup, [{local, rabbit_direct_client_sup}, {rabbit_channel_sup, start_link, []}]). +-spec force_event_refresh(reference()) -> 'ok'. + force_event_refresh(Ref) -> [Pid ! {force_event_refresh, Ref} || Pid <- list()], ok. +-spec list_local() -> [pid()]. + list_local() -> pg_local:get_members(rabbit_direct). +-spec list() -> [pid()]. + list() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_direct, list_local, []). @@ -82,6 +70,16 @@ auth_fun({Username, Password}, VHost, ExtraAuthProps) -> [{password, Password}, {vhost, VHost}] ++ ExtraAuthProps) end. +-spec connect + (({'none', 'none'} | {rabbit_types:username(), 'none'} | + {rabbit_types:username(), rabbit_types:password()}), + rabbit_types:vhost(), rabbit_types:protocol(), pid(), + rabbit_event:event_props()) -> + rabbit_types:ok_or_error2( + {rabbit_types:user(), rabbit_framing:amqp_table()}, + 'broker_not_found_on_node' | + {'auth_failure', string()} | 'access_refused'). + connect(Creds, VHost, Protocol, Pid, Infos) -> ExtraAuthProps = extract_extra_auth_props(Creds, VHost, Pid, Infos), AuthFun = auth_fun(Creds, VHost, ExtraAuthProps), @@ -200,6 +198,12 @@ connect1(User, VHost, Protocol, Pid, Infos) -> {error, Reason} end. +-spec start_channel + (rabbit_channel:channel_number(), pid(), pid(), string(), + rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), + rabbit_framing:amqp_table(), pid()) -> + {'ok', pid()}. + start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector) -> {ok, _, {ChannelPid, _}} = @@ -209,6 +213,8 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, User, VHost, Capabilities, Collector}]), {ok, ChannelPid}. +-spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'. + disconnect(Pid, Infos) -> pg_local:leave(rabbit_direct, Pid), rabbit_core_metrics:connection_closed(Pid), diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index 3ba1f35a6b..f48b0001fb 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -76,37 +76,43 @@ %%---------------------------------------------------------------------------- -type disk_free_limit() :: (integer() | string() | {'mem_relative', float() | integer()}). --spec start_link(disk_free_limit()) -> rabbit_types:ok_pid_or_error(). --spec get_disk_free_limit() -> integer(). --spec set_disk_free_limit(disk_free_limit()) -> 'ok'. --spec get_min_check_interval() -> integer(). --spec set_min_check_interval(integer()) -> 'ok'. --spec get_max_check_interval() -> integer(). --spec set_max_check_interval(integer()) -> 'ok'. --spec get_disk_free() -> (integer() | 'unknown'). %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- +-spec get_disk_free_limit() -> integer(). + get_disk_free_limit() -> gen_server:call(?MODULE, get_disk_free_limit, infinity). +-spec set_disk_free_limit(disk_free_limit()) -> 'ok'. + set_disk_free_limit(Limit) -> gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity). +-spec get_min_check_interval() -> integer(). + get_min_check_interval() -> gen_server:call(?MODULE, get_min_check_interval, infinity). +-spec set_min_check_interval(integer()) -> 'ok'. + set_min_check_interval(Interval) -> gen_server:call(?MODULE, {set_min_check_interval, Interval}, infinity). +-spec get_max_check_interval() -> integer(). + get_max_check_interval() -> gen_server:call(?MODULE, get_max_check_interval, infinity). +-spec set_max_check_interval(integer()) -> 'ok'. + set_max_check_interval(Interval) -> gen_server:call(?MODULE, {set_max_check_interval, Interval}, infinity). +-spec get_disk_free() -> (integer() | 'unknown'). + get_disk_free() -> gen_server:call(?MODULE, get_disk_free, infinity). @@ -114,6 +120,8 @@ get_disk_free() -> %% gen_server callbacks %%---------------------------------------------------------------------------- +-spec start_link(disk_free_limit()) -> rabbit_types:ok_pid_or_error(). + start_link(Args) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). diff --git a/src/rabbit_epmd_monitor.erl b/src/rabbit_epmd_monitor.erl index 15c7489e99..fb13f61ea3 100644 --- a/src/rabbit_epmd_monitor.erl +++ b/src/rabbit_epmd_monitor.erl @@ -29,10 +29,6 @@ -define(CHECK_FREQUENCY, 60000). %%---------------------------------------------------------------------------- - --spec start_link() -> rabbit_types:ok_pid_or_error(). - -%%---------------------------------------------------------------------------- %% It's possible for epmd to be killed out from underneath us. If that %% happens, then obviously clustering and rabbitmqctl stop %% working. This process checks up on epmd and restarts it / @@ -48,6 +44,8 @@ %% epmd" as a shutdown or uninstall step. %% ---------------------------------------------------------------------------- +-spec start_link() -> rabbit_types:ok_pid_or_error(). + start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 2480e7c3d0..1b262e9a48 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,77 +36,13 @@ -type type() :: atom(). -type fun_name() :: atom(). --spec recover(rabbit_types:vhost()) -> [name()]. --spec callback - (rabbit_types:exchange(), fun_name(), - fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'. --spec policy_changed - (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'. --spec declare - (name(), type(), boolean(), boolean(), boolean(), - rabbit_framing:amqp_table(), rabbit_types:username()) - -> rabbit_types:exchange(). --spec check_type - (binary()) -> atom() | rabbit_types:connection_exit(). --spec assert_equivalence - (rabbit_types:exchange(), atom(), boolean(), boolean(), boolean(), - rabbit_framing:amqp_table()) - -> 'ok' | rabbit_types:connection_exit(). --spec assert_args_equivalence - (rabbit_types:exchange(), rabbit_framing:amqp_table()) - -> 'ok' | rabbit_types:connection_exit(). --spec lookup - (name()) -> rabbit_types:ok(rabbit_types:exchange()) | - rabbit_types:error('not_found'). --spec lookup_or_die - (name()) -> rabbit_types:exchange() | - rabbit_types:channel_exit(). --spec list() -> [rabbit_types:exchange()]. --spec list_names() -> [rabbit_exchange:name()]. --spec list(rabbit_types:vhost()) -> [rabbit_types:exchange()]. --spec lookup_scratch(name(), atom()) -> - rabbit_types:ok(term()) | - rabbit_types:error('not_found'). --spec update_scratch(name(), atom(), fun((any()) -> any())) -> 'ok'. --spec update - (name(), - fun((rabbit_types:exchange()) -> rabbit_types:exchange())) - -> not_found | rabbit_types:exchange(). --spec update_decorators(name()) -> 'ok'. --spec immutable(rabbit_types:exchange()) -> rabbit_types:exchange(). --spec info_keys() -> rabbit_types:info_keys(). --spec info(rabbit_types:exchange()) -> rabbit_types:infos(). --spec info - (rabbit_types:exchange(), rabbit_types:info_keys()) - -> rabbit_types:infos(). --spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. --spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) - -> [rabbit_types:infos()]. --spec info_all(rabbit_types:vhost(), rabbit_types:info_keys(), - reference(), pid()) - -> 'ok'. --spec route(rabbit_types:exchange(), rabbit_types:delivery()) - -> [rabbit_amqqueue:name()]. --spec delete - (name(), 'true', rabbit_types:username()) -> - 'ok'| rabbit_types:error('not_found' | 'in_use'); - (name(), 'false', rabbit_types:username()) -> - 'ok' | rabbit_types:error('not_found'). --spec validate_binding - (rabbit_types:exchange(), rabbit_types:binding()) - -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}). --spec maybe_auto_delete - (rabbit_types:exchange(), boolean()) - -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}. --spec serial(rabbit_types:exchange()) -> - fun((boolean()) -> 'none' | pos_integer()). --spec peek_serial(name()) -> pos_integer() | 'undefined'. - %%---------------------------------------------------------------------------- -define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments, policy, user_who_performed_action]). +-spec recover(rabbit_types:vhost()) -> [name()]. + recover(VHost) -> Xs = rabbit_misc:table_filter( fun (#exchange{name = XName}) -> @@ -123,6 +59,10 @@ recover(VHost) -> rabbit_durable_exchange), [XName || #exchange{name = XName} <- Xs]. +-spec callback + (rabbit_types:exchange(), fun_name(), + fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'. + callback(X = #exchange{type = XType, decorators = Decorators}, Fun, Serial0, Args) -> Serial = if is_function(Serial0) -> Serial0; @@ -133,6 +73,9 @@ callback(X = #exchange{type = XType, Module = type_to_module(XType), apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). +-spec policy_changed + (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'. + policy_changed(X = #exchange{type = XType, decorators = Decorators}, X1 = #exchange{decorators = Decorators1}) -> @@ -147,6 +90,9 @@ serialise_events(X = #exchange{type = Type, decorators = Decorators}) -> rabbit_exchange_decorator:select(all, Decorators)) orelse (type_to_module(Type)):serialise_events(). +-spec serial(rabbit_types:exchange()) -> + fun((boolean()) -> 'none' | pos_integer()). + serial(#exchange{name = XName} = X) -> Serial = case serialise_events(X) of true -> next_serial(XName); @@ -156,6 +102,11 @@ serial(#exchange{name = XName} = X) -> (false) -> none end. +-spec declare + (name(), type(), boolean(), boolean(), boolean(), + rabbit_framing:amqp_table(), rabbit_types:username()) + -> rabbit_types:exchange(). + declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) -> X = rabbit_exchange_decorator:set( rabbit_policy:set(#exchange{name = XName, @@ -218,6 +169,10 @@ store_ram(X) -> X1. %% Used with binaries sent over the wire; the type may not exist. + +-spec check_type + (binary()) -> atom() | rabbit_types:connection_exit(). + check_type(TypeBin) -> case rabbit_registry:binary_to_type(TypeBin) of {error, not_found} -> @@ -232,6 +187,11 @@ check_type(TypeBin) -> end end. +-spec assert_equivalence + (rabbit_types:exchange(), atom(), boolean(), boolean(), boolean(), + rabbit_framing:amqp_table()) + -> 'ok' | rabbit_types:connection_exit(). + assert_equivalence(X = #exchange{ name = XName, durable = Durable, auto_delete = AutoDelete, @@ -245,6 +205,10 @@ assert_equivalence(X = #exchange{ name = XName, AFE(Internal, ReqInternal, XName, internal), (type_to_module(Type)):assert_args_equivalence(X, ReqArgs). +-spec assert_args_equivalence + (rabbit_types:exchange(), rabbit_framing:amqp_table()) + -> 'ok' | rabbit_types:connection_exit(). + assert_args_equivalence(#exchange{ name = Name, arguments = Args }, RequiredArgs) -> %% The spec says "Arguments are compared for semantic @@ -253,21 +217,36 @@ assert_args_equivalence(#exchange{ name = Name, arguments = Args }, rabbit_misc:assert_args_equivalence(Args, RequiredArgs, Name, [<<"alternate-exchange">>]). +-spec lookup + (name()) -> rabbit_types:ok(rabbit_types:exchange()) | + rabbit_types:error('not_found'). + lookup(Name) -> rabbit_misc:dirty_read({rabbit_exchange, Name}). +-spec lookup_or_die + (name()) -> rabbit_types:exchange() | + rabbit_types:channel_exit(). + lookup_or_die(Name) -> case lookup(Name) of {ok, X} -> X; {error, not_found} -> rabbit_amqqueue:not_found(Name) end. +-spec list() -> [rabbit_types:exchange()]. + list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}). +-spec list_names() -> [rabbit_exchange:name()]. + list_names() -> mnesia:dirty_all_keys(rabbit_exchange). %% Not dirty_match_object since that would not be transactional when used in a %% tx context + +-spec list(rabbit_types:vhost()) -> [rabbit_types:exchange()]. + list(VHostPath) -> mnesia:async_dirty( fun () -> @@ -277,6 +256,10 @@ list(VHostPath) -> read) end). +-spec lookup_scratch(name(), atom()) -> + rabbit_types:ok(term()) | + rabbit_types:error('not_found'). + lookup_scratch(Name, App) -> case lookup(Name) of {ok, #exchange{scratches = undefined}} -> @@ -290,6 +273,8 @@ lookup_scratch(Name, App) -> {error, not_found} end. +-spec update_scratch(name(), atom(), fun((any()) -> any())) -> 'ok'. + update_scratch(Name, App, Fun) -> rabbit_misc:execute_mnesia_transaction( fun() -> @@ -310,6 +295,8 @@ update_scratch(Name, App, Fun) -> ok end). +-spec update_decorators(name()) -> 'ok'. + update_decorators(Name) -> rabbit_misc:execute_mnesia_transaction( fun() -> @@ -320,6 +307,11 @@ update_decorators(Name) -> end end). +-spec update + (name(), + fun((rabbit_types:exchange()) -> rabbit_types:exchange())) + -> not_found | rabbit_types:exchange(). + update(Name, Fun) -> case mnesia:wread({rabbit_exchange, Name}) of [X] -> X1 = Fun(X), @@ -327,10 +319,14 @@ update(Name, Fun) -> [] -> not_found end. +-spec immutable(rabbit_types:exchange()) -> rabbit_types:exchange(). + immutable(X) -> X#exchange{scratches = none, policy = none, decorators = none}. +-spec info_keys() -> rabbit_types:info_keys(). + info_keys() -> ?INFO_KEYS. map(VHostPath, F) -> @@ -358,20 +354,38 @@ i(Item, #exchange{type = Type} = X) -> [] -> throw({bad_argument, Item}) end. +-spec info(rabbit_types:exchange()) -> rabbit_types:infos(). + info(X = #exchange{type = Type}) -> infos(?INFO_KEYS, X) ++ (type_to_module(Type)):info(X). +-spec info + (rabbit_types:exchange(), rabbit_types:info_keys()) + -> rabbit_types:infos(). + info(X = #exchange{type = _Type}, Items) -> infos(Items, X). +-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. + info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). +-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) + -> [rabbit_types:infos()]. + info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). +-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys(), + reference(), pid()) + -> 'ok'. + info_all(VHostPath, Items, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map( AggregatorPid, Ref, fun(X) -> info(X, Items) end, list(VHostPath)). +-spec route(rabbit_types:exchange(), rabbit_types:delivery()) + -> [rabbit_amqqueue:name()]. + route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName, decorators = Decorators} = X, #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) -> @@ -447,6 +461,12 @@ call_with_exchange(XName, Fun) -> end end). +-spec delete + (name(), 'true', rabbit_types:username()) -> + 'ok'| rabbit_types:error('not_found' | 'in_use'); + (name(), 'false', rabbit_types:username()) -> + 'ok' | rabbit_types:error('not_found'). + delete(XName, IfUnused, Username) -> Fun = case IfUnused of true -> fun conditional_delete/2; @@ -478,10 +498,18 @@ delete(XName, IfUnused, Username) -> XName#resource.name, Username) end. +-spec validate_binding + (rabbit_types:exchange(), rabbit_types:binding()) + -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}). + validate_binding(X = #exchange{type = XType}, Binding) -> Module = type_to_module(XType), Module:validate_binding(X, Binding). +-spec maybe_auto_delete + (rabbit_types:exchange(), boolean()) + -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}. + maybe_auto_delete(#exchange{auto_delete = false}, _OnlyDurable) -> not_deleted; maybe_auto_delete(#exchange{auto_delete = true} = X, OnlyDurable) -> @@ -516,6 +544,8 @@ next_serial(XName) -> #exchange_serial{name = XName, next = Serial + 1}, write), Serial. +-spec peek_serial(name()) -> pos_integer() | 'undefined'. + peek_serial(XName) -> peek_serial(XName, read). peek_serial(XName, LockType) -> diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 4d6d49ef58..8b94bd343b 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -33,10 +33,6 @@ {requires, rabbit_registry}, {enables, kernel_ready}]}). --spec headers_match - (rabbit_framing:amqp_table(), rabbit_framing:amqp_table()) -> - boolean(). - info(_X) -> []. info(_X, _) -> []. @@ -84,6 +80,11 @@ parse_x_match(_) -> all. %% legacy; we didn't validate %% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! %% + +-spec headers_match + (rabbit_framing:amqp_table(), rabbit_framing:amqp_table()) -> + boolean(). + headers_match(Args, Data) -> MK = parse_x_match(rabbit_misc:table_lookup(Args, <<"x-match">>)), headers_match(Args, Data, true, false, MK). diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 557bc6f02a..462ddb4e11 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -34,31 +34,10 @@ -type ok_or_error() :: rabbit_types:ok_or_error(any()). --spec is_file((file:filename())) -> boolean(). --spec is_dir((file:filename())) -> boolean(). --spec file_size((file:filename())) -> non_neg_integer(). --spec ensure_dir((file:filename())) -> ok_or_error(). --spec wildcard(string(), file:filename()) -> [file:filename()]. --spec list_dir(file:filename()) -> - rabbit_types:ok_or_error2([file:filename()], any()). --spec read_term_file - (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any()). --spec write_term_file(file:filename(), [any()]) -> ok_or_error(). --spec write_file(file:filename(), iodata()) -> ok_or_error(). --spec write_file(file:filename(), iodata(), [any()]) -> ok_or_error(). --spec append_file(file:filename(), string()) -> ok_or_error(). --spec ensure_parent_dirs_exist(string()) -> 'ok'. --spec rename(file:filename(), file:filename()) -> ok_or_error(). --spec delete([file:filename()]) -> ok_or_error(). --spec recursive_delete([file:filename()]) -> - rabbit_types:ok_or_error({file:filename(), any()}). --spec recursive_copy(file:filename(), file:filename()) -> - rabbit_types:ok_or_error({file:filename(), file:filename(), any()}). --spec lock_file(file:filename()) -> rabbit_types:ok_or_error('eexist'). --spec filename_as_a_directory(file:filename()) -> file:filename(). - %%---------------------------------------------------------------------------- +-spec is_file((file:filename())) -> boolean(). + is_file(File) -> case read_file_info(File) of {ok, #file_info{type=regular}} -> true; @@ -66,6 +45,8 @@ is_file(File) -> _ -> false end. +-spec is_dir((file:filename())) -> boolean(). + is_dir(Dir) -> is_dir_internal(read_file_info(Dir)). is_dir_no_handle(Dir) -> is_dir_internal(prim_file:read_file_info(Dir)). @@ -73,12 +54,16 @@ is_dir_no_handle(Dir) -> is_dir_internal(prim_file:read_file_info(Dir)). is_dir_internal({ok, #file_info{type=directory}}) -> true; is_dir_internal(_) -> false. +-spec file_size((file:filename())) -> non_neg_integer(). + file_size(File) -> case read_file_info(File) of {ok, #file_info{size=Size}} -> Size; _ -> 0 end. +-spec ensure_dir((file:filename())) -> ok_or_error(). + ensure_dir(File) -> with_handle(fun () -> ensure_dir_internal(File) end). ensure_dir_internal("/") -> @@ -91,6 +76,8 @@ ensure_dir_internal(File) -> prim_file:make_dir(Dir) end. +-spec wildcard(string(), file:filename()) -> [file:filename()]. + wildcard(Pattern, Dir) -> case list_dir(Dir) of {ok, Files} -> {ok, RE} = re:compile(Pattern, [anchored]), @@ -99,11 +86,17 @@ wildcard(Pattern, Dir) -> {error, _} -> [] end. +-spec list_dir(file:filename()) -> + rabbit_types:ok_or_error2([file:filename()], any()). + list_dir(Dir) -> with_handle(fun () -> prim_file:list_dir(Dir) end). read_file_info(File) -> with_handle(fun () -> prim_file:read_file_info(File) end). +-spec read_term_file + (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any()). + read_term_file(File) -> try {ok, Data} = with_handle(fun () -> prim_file:read_file(File) end), @@ -124,12 +117,18 @@ group_tokens(Cur, []) -> [Cur]; group_tokens(Cur, [T = {dot, _} | Ts]) -> [[T | Cur] | group_tokens([], Ts)]; group_tokens(Cur, [T | Ts]) -> group_tokens([T | Cur], Ts). +-spec write_term_file(file:filename(), [any()]) -> ok_or_error(). + write_term_file(File, Terms) -> write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || Term <- Terms])). +-spec write_file(file:filename(), iodata()) -> ok_or_error(). + write_file(Path, Data) -> write_file(Path, Data, []). +-spec write_file(file:filename(), iodata(), [any()]) -> ok_or_error(). + write_file(Path, Data, Modes) -> Modes1 = [binary, write | (Modes -- [binary, write])], case make_binary(Data) of @@ -185,6 +184,9 @@ with_synced_copy(Path, Modes, Fun) -> end. %% TODO the semantics of this function are rather odd. But see bug 25021. + +-spec append_file(file:filename(), string()) -> ok_or_error(). + append_file(File, Suffix) -> case read_file_info(File) of {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); @@ -209,6 +211,8 @@ append_file(File, _, Suffix) -> Error -> Error end. +-spec ensure_parent_dirs_exist(string()) -> 'ok'. + ensure_parent_dirs_exist(Filename) -> case ensure_dir(Filename) of ok -> ok; @@ -216,10 +220,17 @@ ensure_parent_dirs_exist(Filename) -> throw({error, {cannot_create_parent_dirs, Filename, Reason}}) end. +-spec rename(file:filename(), file:filename()) -> ok_or_error(). + rename(Old, New) -> with_handle(fun () -> prim_file:rename(Old, New) end). +-spec delete([file:filename()]) -> ok_or_error(). + delete(File) -> with_handle(fun () -> prim_file:delete(File) end). +-spec recursive_delete([file:filename()]) -> + rabbit_types:ok_or_error({file:filename(), any()}). + recursive_delete(Files) -> with_handle( fun () -> lists:foldl(fun (Path, ok) -> recursive_delete1(Path); @@ -262,6 +273,9 @@ is_symlink_no_handle(File) -> _ -> false end. +-spec recursive_copy(file:filename(), file:filename()) -> + rabbit_types:ok_or_error({file:filename(), file:filename(), any()}). + recursive_copy(Src, Dest) -> %% Note that this uses the 'file' module and, hence, shouldn't be %% run on many processes at once. @@ -293,6 +307,9 @@ recursive_copy(Src, Dest) -> %% TODO: When we stop supporting Erlang prior to R14, this should be %% replaced with file:open [write, exclusive] + +-spec lock_file(file:filename()) -> rabbit_types:ok_or_error('eexist'). + lock_file(Path) -> case is_file(Path) of true -> {error, eexist}; @@ -302,6 +319,8 @@ lock_file(Path) -> end) end. +-spec filename_as_a_directory(file:filename()) -> file:filename(). + filename_as_a_directory(FileName) -> case lists:last(FileName) of "/" -> diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 386702e86b..6f03a1a04f 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -36,15 +36,10 @@ -type guid() :: binary(). --spec start_link() -> rabbit_types:ok_pid_or_error(). --spec filename() -> string(). --spec gen() -> guid(). --spec gen_secure() -> guid(). --spec string(guid(), any()) -> string(). --spec binary(guid(), any()) -> binary(). - %%---------------------------------------------------------------------------- +-spec start_link() -> rabbit_types:ok_pid_or_error(). + start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [update_disk_serial()], []). @@ -52,6 +47,9 @@ start_link() -> %% We use this to detect a (possibly rather old) Mnesia directory, %% since it has existed since at least 1.7.0 (as far back as I cared %% to go). + +-spec filename() -> string(). + filename() -> filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME). @@ -108,6 +106,9 @@ advance_blocks({B1, B2, B3, B4}, I) -> %% generate a GUID. This function should be used when performance is a %% priority and predictability is not an issue. Otherwise use %% gen_secure/0. + +-spec gen() -> guid(). + gen() -> %% We hash a fresh GUID with md5, split it in 4 blocks, and each %% time we need a new guid we rotate them producing a new hash @@ -129,6 +130,9 @@ gen() -> %% serial store hasn't been deleted. %% %% If you are not concerned with predictability, gen/0 is faster. + +-spec gen_secure() -> guid(). + gen_secure() -> %% Here instead of hashing once we hash the GUID and the counter %% each time, so that the GUID is not predictable. @@ -143,9 +147,14 @@ gen_secure() -> %% %% employs base64url encoding, which is safer in more contexts than %% plain base64. + +-spec string(guid(), any()) -> string(). + string(G, Prefix) -> Prefix ++ "-" ++ rabbit_misc:base64url(G). +-spec binary(guid(), any()) -> binary(). + binary(G, Prefix) -> list_to_binary(string(G, Prefix)). diff --git a/src/rabbit_health_check.erl b/src/rabbit_health_check.erl index 54508bfdb0..97d8d44dc3 100644 --- a/src/rabbit_health_check.erl +++ b/src/rabbit_health_check.erl @@ -21,19 +21,20 @@ %% Internal API -export([local/0]). --spec node(node(), timeout()) -> ok | {badrpc, term()} | {error_string, string()}. --spec local() -> ok | {error_string, string()}. - %%---------------------------------------------------------------------------- %% External functions %%---------------------------------------------------------------------------- +-spec node(node(), timeout()) -> ok | {badrpc, term()} | {error_string, string()}. + node(Node) -> %% same default as in CLI node(Node, 70000). node(Node, Timeout) -> rabbit_misc:rpc_call(Node, rabbit_health_check, local, [], Timeout). +-spec local() -> ok | {error_string, string()}. + local() -> run_checks([list_channels, list_queues, alarms, rabbit_node_monitor]). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index b5f09a525b..9770172089 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -147,36 +147,6 @@ -type credit_mode() :: 'manual' | 'drain' | 'auto'. --spec start_link(rabbit_types:proc_name()) -> - rabbit_types:ok_pid_or_error(). --spec new(pid()) -> lstate(). - --spec limit_prefetch(lstate(), non_neg_integer(), non_neg_integer()) -> - lstate(). --spec unlimit_prefetch(lstate()) -> lstate(). --spec is_active(lstate()) -> boolean(). --spec get_prefetch_limit(lstate()) -> non_neg_integer(). --spec ack(lstate(), non_neg_integer()) -> 'ok'. --spec pid(lstate()) -> pid(). - --spec client(pid()) -> qstate(). --spec activate(qstate()) -> qstate(). --spec can_send(qstate(), boolean(), rabbit_types:ctag()) -> - {'continue' | 'suspend', qstate()}. --spec resume(qstate()) -> qstate(). --spec deactivate(qstate()) -> qstate(). --spec is_suspended(qstate()) -> boolean(). --spec is_consumer_blocked(qstate(), rabbit_types:ctag()) -> boolean(). --spec credit - (qstate(), rabbit_types:ctag(), non_neg_integer(), credit_mode(), - boolean()) -> - {boolean(), qstate()}. --spec ack_from_queue(qstate(), rabbit_types:ctag(), non_neg_integer()) -> - {boolean(), qstate()}. --spec drained(qstate()) -> - {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}. --spec forget_consumer(qstate(), rabbit_types:ctag()) -> qstate(). - %%---------------------------------------------------------------------------- -record(lim, {prefetch_count = 0, @@ -194,41 +164,66 @@ %% API %%---------------------------------------------------------------------------- +-spec start_link(rabbit_types:proc_name()) -> + rabbit_types:ok_pid_or_error(). + start_link(ProcName) -> gen_server2:start_link(?MODULE, [ProcName], []). +-spec new(pid()) -> lstate(). + new(Pid) -> %% this a 'call' to ensure that it is invoked at most once. ok = gen_server:call(Pid, {new, self()}, infinity), #lstate{pid = Pid, prefetch_limited = false}. +-spec limit_prefetch(lstate(), non_neg_integer(), non_neg_integer()) -> + lstate(). + limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 -> ok = gen_server:call( L#lstate.pid, {limit_prefetch, PrefetchCount, UnackedCount}, infinity), L#lstate{prefetch_limited = true}. +-spec unlimit_prefetch(lstate()) -> lstate(). + unlimit_prefetch(L) -> ok = gen_server:call(L#lstate.pid, unlimit_prefetch, infinity), L#lstate{prefetch_limited = false}. +-spec is_active(lstate()) -> boolean(). + is_active(#lstate{prefetch_limited = Limited}) -> Limited. +-spec get_prefetch_limit(lstate()) -> non_neg_integer(). + get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0; get_prefetch_limit(L) -> gen_server:call(L#lstate.pid, get_prefetch_limit, infinity). +-spec ack(lstate(), non_neg_integer()) -> 'ok'. + ack(#lstate{prefetch_limited = false}, _AckCount) -> ok; ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}). +-spec pid(lstate()) -> pid(). + pid(#lstate{pid = Pid}) -> Pid. +-spec client(pid()) -> qstate(). + client(Pid) -> #qstate{pid = Pid, state = dormant, credits = gb_trees:empty()}. +-spec activate(qstate()) -> qstate(). + activate(L = #qstate{state = dormant}) -> ok = gen_server:cast(L#qstate.pid, {register, self()}), L#qstate{state = active}; activate(L) -> L. +-spec can_send(qstate(), boolean(), rabbit_types:ctag()) -> + {'continue' | 'suspend', qstate()}. + can_send(L = #qstate{pid = Pid, state = State, credits = Credits}, AckRequired, CTag) -> case is_consumer_blocked(L, CTag) of @@ -246,18 +241,26 @@ safe_call(Pid, Msg, ExitValue) -> fun () -> ExitValue end, fun () -> gen_server2:call(Pid, Msg, infinity) end). +-spec resume(qstate()) -> qstate(). + resume(L = #qstate{state = suspended}) -> L#qstate{state = active}; resume(L) -> L. +-spec deactivate(qstate()) -> qstate(). + deactivate(L = #qstate{state = dormant}) -> L; deactivate(L) -> ok = gen_server:cast(L#qstate.pid, {unregister, self()}), L#qstate{state = dormant}. +-spec is_suspended(qstate()) -> boolean(). + is_suspended(#qstate{state = suspended}) -> true; is_suspended(#qstate{}) -> false. +-spec is_consumer_blocked(qstate(), rabbit_types:ctag()) -> boolean(). + is_consumer_blocked(#qstate{credits = Credits}, CTag) -> case gb_trees:lookup(CTag, Credits) of none -> false; @@ -265,6 +268,11 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) -> {value, #credit{}} -> true end. +-spec credit + (qstate(), rabbit_types:ctag(), non_neg_integer(), credit_mode(), + boolean()) -> + {boolean(), qstate()}. + credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Mode, IsEmpty) -> {Res, Cr} = case IsEmpty andalso Mode =:= drain of @@ -273,6 +281,9 @@ credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Mode, IsEmpty) -> end, {Res, Limiter#qstate{credits = enter_credit(CTag, Cr, Credits)}}. +-spec ack_from_queue(qstate(), rabbit_types:ctag(), non_neg_integer()) -> + {boolean(), qstate()}. + ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) -> {Credits1, Unblocked} = case gb_trees:lookup(CTag, Credits) of @@ -284,6 +295,9 @@ ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) -> end, {Unblocked, Limiter#qstate{credits = Credits1}}. +-spec drained(qstate()) -> + {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}. + drained(Limiter = #qstate{credits = Credits}) -> Drain = fun(C) -> C#credit{credit = 0, mode = manual} end, {CTagCredits, Credits2} = @@ -295,6 +309,8 @@ drained(Limiter = #qstate{credits = Credits}) -> end, {[], Credits}, Credits), {CTagCredits, Limiter#qstate{credits = Credits2}}. +-spec forget_consumer(qstate(), rabbit_types:ctag()) -> qstate(). + forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> Limiter#qstate{credits = gb_trees:delete_any(CTag, Credits)}. diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 62cf87fb11..0f5c9fbe3c 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -54,31 +54,33 @@ -define(EPSILON, 0.000001). %% less than this and we clamp to 0 %%---------------------------------------------------------------------------- - --spec start_link() -> rabbit_types:ok_pid_or_error(). --spec register(pid(), {atom(),atom(),[any()]}) -> 'ok'. --spec deregister(pid()) -> 'ok'. --spec report_ram_duration - (pid(), float() | 'infinity') -> number() | 'infinity'. --spec stop() -> 'ok'. - -%%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- +-spec start_link() -> rabbit_types:ok_pid_or_error(). + start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec register(pid(), {atom(),atom(),[any()]}) -> 'ok'. + register(Pid, MFA = {_M, _F, _A}) -> gen_server2:call(?SERVER, {register, Pid, MFA}, infinity). +-spec deregister(pid()) -> 'ok'. + deregister(Pid) -> gen_server2:cast(?SERVER, {deregister, Pid}). +-spec report_ram_duration + (pid(), float() | 'infinity') -> number() | 'infinity'. + report_ram_duration(Pid, QueueDuration) -> gen_server2:call(?SERVER, {report_ram_duration, Pid, QueueDuration}, infinity). +-spec stop() -> 'ok'. + stop() -> gen_server2:cast(?SERVER, stop). diff --git a/src/rabbit_metrics.erl b/src/rabbit_metrics.erl index 2a0a967e86..2d5f9c34e2 100644 --- a/src/rabbit_metrics.erl +++ b/src/rabbit_metrics.erl @@ -25,11 +25,12 @@ -define(SERVER, ?MODULE). --spec start_link() -> rabbit_types:ok_pid_or_error(). - %%---------------------------------------------------------------------------- %% Starts the raw metrics storage and owns the ETS tables. %%---------------------------------------------------------------------------- + +-spec start_link() -> rabbit_types:ok_pid_or_error(). + start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 9e5bddb28a..96474b0d4e 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -37,14 +37,6 @@ depth_fun }). --spec start_link - (amqqueue:amqqueue(), pid() | 'undefined', - rabbit_mirror_queue_master:death_fun(), - rabbit_mirror_queue_master:depth_fun()) -> - rabbit_types:ok_pid_or_error(). --spec get_gm(pid()) -> pid(). --spec ensure_monitoring(pid(), [pid()]) -> 'ok'. - %%---------------------------------------------------------------------------- %% %% Mirror Queues @@ -307,12 +299,22 @@ %% %%---------------------------------------------------------------------------- +-spec start_link + (amqqueue:amqqueue(), pid() | 'undefined', + rabbit_mirror_queue_master:death_fun(), + rabbit_mirror_queue_master:depth_fun()) -> + rabbit_types:ok_pid_or_error(). + start_link(Queue, GM, DeathFun, DepthFun) -> gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, DepthFun], []). +-spec get_gm(pid()) -> pid(). + get_gm(CPid) -> gen_server2:call(CPid, get_gm, infinity). +-spec ensure_monitoring(pid(), [pid()]) -> 'ok'. + ensure_monitoring(CPid, Pids) -> gen_server2:cast(CPid, {ensure_monitoring, Pids}). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index c76bc521c2..6ab9a875c2 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -62,18 +62,6 @@ confirmed :: [rabbit_guid:guid()], known_senders :: sets:set() }. --spec promote_backing_queue_state - (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()], - map(), [pid()]) -> - master_state(). - --spec sender_death_fun() -> death_fun(). --spec depth_fun() -> depth_fun(). --spec init_with_existing_bq(amqqueue:amqqueue(), atom(), any()) -> - master_state(). --spec stop_mirroring(master_state()) -> {atom(), any()}. --spec sync_mirrors(stats_fun(), stats_fun(), master_state()) -> - {'ok', master_state()} | {stop, any(), master_state()}. %% For general documentation of HA design, see %% rabbit_mirror_queue_coordinator @@ -101,6 +89,9 @@ init(Q, Recover, AsyncCallback) -> ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), State. +-spec init_with_existing_bq(amqqueue:amqqueue(), atom(), any()) -> + master_state(). + init_with_existing_bq(Q0, BQ, BQS) when ?is_amqqueue(Q0) -> QName = amqqueue:get_name(Q0), case rabbit_mirror_queue_coordinator:start_link( @@ -146,6 +137,8 @@ init_with_existing_bq(Q0, BQ, BQS) when ?is_amqqueue(Q0) -> throw({coordinator_not_started, Reason}) end. +-spec stop_mirroring(master_state()) -> {atom(), any()}. + stop_mirroring(State = #state { coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS }) -> @@ -153,6 +146,9 @@ stop_mirroring(State = #state { coordinator = CPid, stop_all_slaves(shutdown, State), {BQ, BQS}. +-spec sync_mirrors(stats_fun(), stats_fun(), master_state()) -> + {'ok', master_state()} | {stop, any(), master_state()}. + sync_mirrors(HandleInfo, EmitStats, State = #state { name = QName, gm = GM, @@ -506,6 +502,11 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator, %% Other exported functions %% --------------------------------------------------------------------------- +-spec promote_backing_queue_state + (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()], + map(), [pid()]) -> + master_state(). + promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) -> {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), Len = BQ:len(BQS1), @@ -523,6 +524,8 @@ promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) -> known_senders = sets:from_list(KS), wait_timeout = WaitTimeout }. +-spec sender_death_fun() -> death_fun(). + sender_death_fun() -> Self = self(), fun (DeadPid) -> @@ -535,6 +538,8 @@ sender_death_fun() -> end) end. +-spec depth_fun() -> depth_fun(). + depth_fun() -> Self = self(), fun () -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 6260c4319c..88aef8fcdc 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -57,29 +57,12 @@ %%---------------------------------------------------------------------------- +%% Returns {ok, NewMPid, DeadPids, ExtraNodes} + -spec remove_from_queue (rabbit_amqqueue:name(), pid(), [pid()]) -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}. --spec add_mirrors(rabbit_amqqueue:name(), [node()], 'sync' | 'async') -> - 'ok'. --spec store_updated_slaves(amqqueue:amqqueue()) -> - amqqueue:amqqueue(). --spec initial_queue_node(amqqueue:amqqueue(), node()) -> node(). --spec suggested_queue_nodes(amqqueue:amqqueue()) -> - {node(), [node()]}. --spec is_mirrored(amqqueue:amqqueue()) -> boolean(). --spec update_mirrors - (amqqueue:amqqueue(), amqqueue:amqqueue()) -> 'ok'. --spec update_mirrors - (amqqueue:amqqueue()) -> 'ok'. --spec maybe_drop_master_after_sync(amqqueue:amqqueue()) -> 'ok'. --spec maybe_auto_sync(amqqueue:amqqueue()) -> 'ok'. --spec log_info(rabbit_amqqueue:name(), string(), [any()]) -> 'ok'. --spec log_warning(rabbit_amqqueue:name(), string(), [any()]) -> 'ok'. - -%%---------------------------------------------------------------------------- -%% Returns {ok, NewMPid, DeadPids, ExtraNodes} remove_from_queue(QueueName, Self, DeadGMPids) -> rabbit_misc:execute_mnesia_transaction( fun () -> @@ -241,6 +224,9 @@ drop_mirror(QName, MirrorNode) -> E end. +-spec add_mirrors(rabbit_amqqueue:name(), [node()], 'sync' | 'async') -> + 'ok'. + add_mirrors(QName, Nodes, SyncMode) -> [add_mirror(QName, Node, SyncMode) || Node <- Nodes], ok. @@ -282,13 +268,21 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> rabbit_misc:pid_to_string(MirrorPid), [[$ , rabbit_misc:pid_to_string(P)] || P <- DeadPids]]). +-spec log_info(rabbit_amqqueue:name(), string(), [any()]) -> 'ok'. + log_info (QName, Fmt, Args) -> rabbit_log_mirroring:info("Mirrored ~s: " ++ Fmt, [rabbit_misc:rs(QName) | Args]). + +-spec log_warning(rabbit_amqqueue:name(), string(), [any()]) -> 'ok'. + log_warning(QName, Fmt, Args) -> rabbit_log_mirroring:warning("Mirrored ~s: " ++ Fmt, [rabbit_misc:rs(QName) | Args]). +-spec store_updated_slaves(amqqueue:amqqueue()) -> + amqqueue:amqqueue(). + store_updated_slaves(Q0) when ?is_amqqueue(Q0) -> SPids = amqqueue:get_slave_pids(Q0), SSPids = amqqueue:get_sync_slave_pids(Q0), @@ -373,10 +367,15 @@ promote_slave([SPid | SPids]) -> %% the one to promote is the oldest. {SPid, SPids}. +-spec initial_queue_node(amqqueue:amqqueue(), node()) -> node(). + initial_queue_node(Q, DefNode) -> {MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:all_running()), MNode. +-spec suggested_queue_nodes(amqqueue:amqqueue()) -> + {node(), [node()]}. + suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:all_running()). suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All). @@ -429,6 +428,8 @@ validate_mode(Mode) -> {error, "~p is not a valid ha-mode value", [Mode]} end. +-spec is_mirrored(amqqueue:amqqueue()) -> boolean(). + is_mirrored(Q) -> case module(Q) of {ok, _} -> true; @@ -451,6 +452,8 @@ actual_queue_nodes(Q) when ?is_amqqueue(Q) -> _ -> node(MPid) end, Nodes(SPids), Nodes(SSPids)}. +-spec maybe_auto_sync(amqqueue:amqqueue()) -> 'ok'. + maybe_auto_sync(Q) when ?is_amqqueue(Q) -> QPid = amqqueue:get_pid(Q), case policy(<<"ha-sync-mode">>, Q) of @@ -496,6 +499,9 @@ default_batch_size() -> rabbit_misc:get_env(rabbit, mirroring_sync_batch_size, ?DEFAULT_BATCH_SIZE). +-spec update_mirrors + (amqqueue:amqqueue(), amqqueue:amqqueue()) -> 'ok'. + update_mirrors(OldQ, NewQ) when ?amqqueue_pids_are_equal(OldQ, NewQ) -> % Note: we do want to ensure both queues have same pid QPid = amqqueue:get_pid(OldQ), @@ -505,6 +511,9 @@ update_mirrors(OldQ, NewQ) when ?amqqueue_pids_are_equal(OldQ, NewQ) -> _ -> rabbit_amqqueue:update_mirroring(QPid) end. +-spec update_mirrors + (amqqueue:amqqueue()) -> 'ok'. + update_mirrors(Q) when ?is_amqqueue(Q) -> QName = amqqueue:get_name(Q), {OldMNode, OldSNodes, _} = actual_queue_nodes(Q), @@ -532,6 +541,9 @@ update_mirrors(Q) when ?is_amqqueue(Q) -> %% We don't just call update_mirrors/2 here since that could decide to %% start a slave for some other reason, and since we are the slave ATM %% that allows complicated deadlocks. + +-spec maybe_drop_master_after_sync(amqqueue:amqqueue()) -> 'ok'. + maybe_drop_master_after_sync(Q) when ?is_amqqueue(Q) -> QName = amqqueue:get_name(Q), MPid = amqqueue:get_pid(Q), diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index d206650d0f..2d7ce7edb2 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -59,8 +59,19 @@ -type slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(), bqs()}. +%% --------------------------------------------------------------------------- +%% Master + -spec master_prepare(reference(), rabbit_amqqueue:name(), log_fun(), [pid()]) -> pid(). + +master_prepare(Ref, QName, Log, SPids) -> + MPid = self(), + spawn_link(fun () -> + ?store_proc_name(QName), + syncer(Ref, Log, MPid, SPids) + end). + -spec master_go(pid(), reference(), log_fun(), rabbit_mirror_queue_master:stats_fun(), rabbit_mirror_queue_master:stats_fun(), @@ -69,21 +80,6 @@ {'already_synced', bqs()} | {'ok', bqs()} | {'shutdown', any(), bqs()} | {'sync_died', any(), bqs()}. --spec slave(non_neg_integer(), reference(), timer:tref(), pid(), - bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) -> - 'denied' | - {'ok' | 'failed', slave_sync_state()} | - {'stop', any(), slave_sync_state()}. - -%% --------------------------------------------------------------------------- -%% Master - -master_prepare(Ref, QName, Log, SPids) -> - MPid = self(), - spawn_link(fun () -> - ?store_proc_name(QName), - syncer(Ref, Log, MPid, SPids) - end). master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) -> Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()}, @@ -321,6 +317,12 @@ wait_for_resources(Ref, SPids) -> %% --------------------------------------------------------------------------- %% Slave +-spec slave(non_neg_integer(), reference(), timer:tref(), pid(), + bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) -> + 'denied' | + {'ok' | 'failed', slave_sync_state()} | + {'stop', any(), slave_sync_state()}. + slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) -> Syncer ! {sync_deny, Ref, self()}, denied; diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 3d561aea5e..ab29faf368 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -16,7 +16,8 @@ -module(rabbit_mnesia). --export([init/0, +-export([%% Main interface + init/0, join_cluster/2, reset/0, force_reset/0, @@ -25,6 +26,7 @@ forget_cluster_node/2, force_load_next_boot/0, + %% Various queries to get the status of the db status/0, is_clustered/0, on_running_node/1, @@ -35,11 +37,13 @@ dir/0, cluster_status_from_mnesia/0, + %% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit' init_db_unchecked/2, copy_db/1, check_cluster_consistency/0, ensure_mnesia_dir/0, + %% Hooks used in `rabbit_node_monitor' on_node_up/1, on_node_down/1 ]). @@ -61,45 +65,12 @@ -type node_type() :: disc | ram. -type cluster_status() :: {[node()], [node()], [node()]}. -%% Main interface --spec init() -> 'ok'. --spec join_cluster(node(), node_type()) - -> ok | {ok, already_member} | {error, {inconsistent_cluster, string()}}. --spec reset() -> 'ok'. --spec force_reset() -> 'ok'. --spec update_cluster_nodes(node()) -> 'ok'. --spec change_cluster_node_type(node_type()) -> 'ok'. --spec forget_cluster_node(node(), boolean()) -> 'ok'. --spec force_load_next_boot() -> 'ok'. - -%% Various queries to get the status of the db --spec status() -> [{'nodes', [{node_type(), [node()]}]} | - {'running_nodes', [node()]} | - {'partitions', [{node(), [node()]}]}]. --spec is_clustered() -> boolean(). --spec on_running_node(pid()) -> boolean(). --spec is_process_alive(pid() | {atom(), node()}) -> boolean(). --spec is_registered_process_alive(atom()) -> boolean(). --spec cluster_nodes('all' | 'disc' | 'ram' | 'running') -> [node()]. --spec node_type() -> node_type(). --spec dir() -> file:filename(). --spec cluster_status_from_mnesia() -> rabbit_types:ok_or_error2( - cluster_status(), any()). - -%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit' --spec init_db_unchecked([node()], node_type()) -> 'ok'. --spec copy_db(file:filename()) -> rabbit_types:ok_or_error(any()). --spec check_cluster_consistency() -> 'ok'. --spec ensure_mnesia_dir() -> 'ok'. - -%% Hooks used in `rabbit_node_monitor' --spec on_node_up(node()) -> 'ok'. --spec on_node_down(node()) -> 'ok'. - %%---------------------------------------------------------------------------- %% Main interface %%---------------------------------------------------------------------------- +-spec init() -> 'ok'. + init() -> ensure_mnesia_running(), ensure_mnesia_dir(), @@ -228,6 +199,10 @@ join_discovered_peers(TryNodes, NodeType) -> %% Note that we make no attempt to verify that the nodes provided are %% all in the same cluster, we simply pick the first online node and %% we cluster to its cluster. + +-spec join_cluster(node(), node_type()) + -> ok | {ok, already_member} | {error, {inconsistent_cluster, string()}}. + join_cluster(DiscoveryNode, NodeType) -> ensure_mnesia_not_running(), ensure_mnesia_dir(), @@ -275,11 +250,16 @@ join_cluster(DiscoveryNode, NodeType) -> %% return node to its virgin state, where it is not member of any %% cluster, has no cluster configuration, no local database, and no %% persisted messages + +-spec reset() -> 'ok'. + reset() -> ensure_mnesia_not_running(), rabbit_log:info("Resetting Rabbit~n", []), reset_gracefully(). +-spec force_reset() -> 'ok'. + force_reset() -> ensure_mnesia_not_running(), rabbit_log:info("Resetting Rabbit forcefully~n", []), @@ -310,6 +290,8 @@ wipe() -> ok = rabbit_node_monitor:reset_cluster_status(), ok. +-spec change_cluster_node_type(node_type()) -> 'ok'. + change_cluster_node_type(Type) -> ensure_mnesia_not_running(), ensure_mnesia_dir(), @@ -327,6 +309,8 @@ change_cluster_node_type(Type) -> ok = reset(), ok = join_cluster(Node, Type). +-spec update_cluster_nodes(node()) -> 'ok'. + update_cluster_nodes(DiscoveryNode) -> ensure_mnesia_not_running(), ensure_mnesia_dir(), @@ -353,6 +337,9 @@ update_cluster_nodes(DiscoveryNode) -> %% * This node was, at the best of our knowledge (see comment below) %% the last or second to last after the node we're removing to go %% down + +-spec forget_cluster_node(node(), boolean()) -> 'ok'. + forget_cluster_node(Node, RemoveWhenOffline) -> forget_cluster_node(Node, RemoveWhenOffline, true). @@ -407,6 +394,10 @@ remove_node_offline_node(Node) -> %% Queries %%---------------------------------------------------------------------------- +-spec status() -> [{'nodes', [{node_type(), [node()]}]} | + {'running_nodes', [node()]} | + {'partitions', [{node(), [node()]}]}]. + status() -> IfNonEmpty = fun (_, []) -> []; (Type, Nodes) -> [{Type, Nodes}] @@ -427,15 +418,22 @@ mnesia_partitions(Nodes) -> is_running() -> mnesia:system_info(is_running) =:= yes. +-spec is_clustered() -> boolean(). + is_clustered() -> AllNodes = cluster_nodes(all), AllNodes =/= [] andalso AllNodes =/= [node()]. +-spec on_running_node(pid()) -> boolean(). + on_running_node(Pid) -> lists:member(node(Pid), cluster_nodes(running)). %% This requires the process be in the same running cluster as us %% (i.e. not partitioned or some random node). %% %% See also rabbit_misc:is_process_alive/1 which does not. + +-spec is_process_alive(pid() | {atom(), node()}) -> boolean(). + is_process_alive(Pid) when is_pid(Pid) -> on_running_node(Pid) andalso rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true; @@ -443,14 +441,22 @@ is_process_alive({Name, Node}) -> lists:member(Node, cluster_nodes(running)) andalso rpc:call(Node, rabbit_mnesia, is_registered_process_alive, [Name]) =:= true. +-spec is_registered_process_alive(atom()) -> boolean(). + is_registered_process_alive(Name) -> is_pid(whereis(Name)). +-spec cluster_nodes('all' | 'disc' | 'ram' | 'running') -> [node()]. + cluster_nodes(WhichNodes) -> cluster_status(WhichNodes). %% This function is the actual source of information, since it gets %% the data from mnesia. Obviously it'll work only when mnesia is %% running. + +-spec cluster_status_from_mnesia() -> rabbit_types:ok_or_error2( + cluster_status(), any()). + cluster_status_from_mnesia() -> case is_running() of false -> @@ -505,6 +511,8 @@ node_info() -> mnesia:system_info(protocol_version), cluster_status_from_mnesia()}. +-spec node_type() -> node_type(). + node_type() -> {_AllNodes, DiscNodes, _RunningNodes} = rabbit_node_monitor:read_cluster_status(), @@ -513,6 +521,8 @@ node_type() -> false -> ram end. +-spec dir() -> file:filename(). + dir() -> mnesia:system_info(directory). %%---------------------------------------------------------------------------- @@ -552,6 +562,8 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> rabbit_node_monitor:update_cluster_status(), ok. +-spec init_db_unchecked([node()], node_type()) -> 'ok'. + init_db_unchecked(ClusterNodes, NodeType) -> init_db(ClusterNodes, NodeType, false). @@ -582,6 +594,8 @@ init_db_with_mnesia(ClusterNodes, NodeType, stop_mnesia() end. +-spec ensure_mnesia_dir() -> 'ok'. + ensure_mnesia_dir() -> MnesiaDir = dir() ++ "/", case filelib:ensure_dir(MnesiaDir) of @@ -629,6 +643,8 @@ ensure_schema_integrity() -> throw({error, {schema_integrity_check_failed, Reason}}) end. +-spec copy_db(file:filename()) -> rabbit_types:ok_or_error(any()). + copy_db(Destination) -> ok = ensure_mnesia_not_running(), rabbit_file:recursive_copy(dir(), Destination). @@ -636,6 +652,8 @@ copy_db(Destination) -> force_load_filename() -> filename:join(dir(), "force_load"). +-spec force_load_next_boot() -> 'ok'. + force_load_next_boot() -> rabbit_file:write_file(force_load_filename(), <<"">>). @@ -648,6 +666,9 @@ maybe_force_load() -> %% This does not guarantee us much, but it avoids some situations that %% will definitely end up badly + +-spec check_cluster_consistency() -> 'ok'. + check_cluster_consistency() -> %% We want to find 0 or 1 consistent nodes. case lists:foldl( @@ -717,12 +738,16 @@ remote_node_info(Node) -> %% Hooks for `rabbit_node_monitor' %%-------------------------------------------------------------------- +-spec on_node_up(node()) -> 'ok'. + on_node_up(Node) -> case running_disc_nodes() of [Node] -> rabbit_log:info("cluster contains disc nodes again~n"); _ -> ok end. +-spec on_node_down(node()) -> 'ok'. + on_node_down(_Node) -> case running_disc_nodes() of [] -> rabbit_log:info("only running disc node went down~n"); diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl index c6648ac802..76e120f292 100644 --- a/src/rabbit_mnesia_rename.erl +++ b/src/rabbit_mnesia_rename.erl @@ -44,9 +44,6 @@ %%---------------------------------------------------------------------------- -spec rename(node(), [{node(), node()}]) -> 'ok'. --spec maybe_finish([node()]) -> 'ok'. - -%%---------------------------------------------------------------------------- rename(Node, NodeMapList) -> try @@ -139,6 +136,8 @@ restore_backup(Backup) -> stop_mnesia(), rabbit_mnesia:force_load_next_boot(). +-spec maybe_finish([node()]) -> 'ok'. + maybe_finish(AllNodes) -> case rabbit_file:read_term_file(rename_config_name()) of {ok, [{FromNode, ToNode}]} -> finish(FromNode, ToNode, AllNodes); diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index f2496cd71f..f3ec7b78cc 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -41,15 +41,10 @@ fun (({rabbit_types:msg_id(), msg_size(), position(), binary()}, A) -> A). +%%---------------------------------------------------------------------------- + -spec append(io_device(), rabbit_types:msg_id(), msg()) -> rabbit_types:ok_or_error2(msg_size(), any()). --spec read(io_device(), msg_size()) -> - rabbit_types:ok_or_error2({rabbit_types:msg_id(), msg()}, - any()). --spec scan(io_device(), file_size(), message_accumulator(A), A) -> - {'ok', A, position()}. - -%%---------------------------------------------------------------------------- append(FileHdl, MsgId, MsgBody) when is_binary(MsgId) andalso size(MsgId) =:= ?MSG_ID_SIZE_BYTES -> @@ -65,6 +60,10 @@ append(FileHdl, MsgId, MsgBody) KO -> KO end. +-spec read(io_device(), msg_size()) -> + rabbit_types:ok_or_error2({rabbit_types:msg_id(), msg()}, + any()). + read(FileHdl, TotalSize) -> Size = TotalSize - ?FILE_PACKING_ADJUSTMENT, BodyBinSize = Size - ?MSG_ID_SIZE_BYTES, @@ -77,6 +76,9 @@ read(FileHdl, TotalSize) -> KO -> KO end. +-spec scan(io_device(), file_size(), message_accumulator(A), A) -> + {'ok', A, position()}. + scan(FileHdl, FileSize, Fun, Acc) when FileSize >= 0 -> scan(FileHdl, FileSize, <<>>, 0, 0, Fun, Acc). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index bae8364614..337064ad39 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -173,33 +173,6 @@ -type maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok'). -type deletion_thunk() :: fun (() -> boolean()). --spec start_link - (atom(), file:filename(), [binary()] | 'undefined', - {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error(). --spec successfully_recovered_state(server()) -> boolean(). --spec client_init(server(), client_ref(), maybe_msg_id_fun(), - maybe_close_fds_fun()) -> client_msstate(). --spec client_terminate(client_msstate()) -> 'ok'. --spec client_delete_and_terminate(client_msstate()) -> 'ok'. --spec client_ref(client_msstate()) -> client_ref(). --spec close_all_indicated - (client_msstate()) -> rabbit_types:ok(client_msstate()). --spec write(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'. --spec write_flow(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'. --spec read(rabbit_types:msg_id(), client_msstate()) -> - {rabbit_types:ok(msg()) | 'not_found', client_msstate()}. --spec contains(rabbit_types:msg_id(), client_msstate()) -> boolean(). --spec remove([rabbit_types:msg_id()], client_msstate()) -> 'ok'. - --spec set_maximum_since_use(server(), non_neg_integer()) -> 'ok'. --spec has_readers(non_neg_integer(), gc_state()) -> boolean(). --spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) -> - deletion_thunk(). --spec delete_file(non_neg_integer(), gc_state()) -> deletion_thunk(). --spec force_recovery(file:filename(), server()) -> 'ok'. --spec transform_dir(file:filename(), server(), - fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok'. - %%---------------------------------------------------------------------------- %% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION @@ -472,6 +445,10 @@ %% public API %%---------------------------------------------------------------------------- +-spec start_link + (atom(), file:filename(), [binary()] | 'undefined', + {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error(). + start_link(Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) -> gen_server2:start_link(?MODULE, [Type, Dir, ClientRefs, StartupFunState], @@ -482,9 +459,14 @@ start_global_store_link(Type, Dir, ClientRefs, StartupFunState) when is_atom(Typ [Type, Dir, ClientRefs, StartupFunState], [{timeout, infinity}]). +-spec successfully_recovered_state(server()) -> boolean(). + successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). +-spec client_init(server(), client_ref(), maybe_msg_id_fun(), + maybe_close_fds_fun()) -> client_msstate(). + client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom(Server) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} = @@ -506,17 +488,25 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom flying_ets = FlyingEts, credit_disc_bound = CreditDiscBound }. +-spec client_terminate(client_msstate()) -> 'ok'. + client_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), ok = server_call(CState, {client_terminate, Ref}). +-spec client_delete_and_terminate(client_msstate()) -> 'ok'. + client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), ok = server_cast(CState, {client_dying, Ref}), ok = server_cast(CState, {client_delete, Ref}). +-spec client_ref(client_msstate()) -> client_ref(). + client_ref(#client_msstate { client_ref = Ref }) -> Ref. +-spec write_flow(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'. + write_flow(MsgId, Msg, CState = #client_msstate { server = Server, @@ -528,8 +518,13 @@ write_flow(MsgId, Msg, credit_flow:send(Server, CreditDiscBound), client_write(MsgId, Msg, flow, CState). +-spec write(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'. + write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState). +-spec read(rabbit_types:msg_id(), client_msstate()) -> + {rabbit_types:ok(msg()) | 'not_found', client_msstate()}. + read(MsgId, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> file_handle_cache_stats:update(msg_store_read), @@ -545,12 +540,19 @@ read(MsgId, {{ok, Msg}, CState} end. +-spec contains(rabbit_types:msg_id(), client_msstate()) -> boolean(). + contains(MsgId, CState) -> server_call(CState, {contains, MsgId}). + +-spec remove([rabbit_types:msg_id()], client_msstate()) -> 'ok'. + remove([], _CState) -> ok; remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds], server_cast(CState, {remove, CRef, MsgIds}). +-spec set_maximum_since_use(server(), non_neg_integer()) -> 'ok'. + set_maximum_since_use(Server, Age) when is_pid(Server); is_atom(Server) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). @@ -1447,6 +1449,9 @@ safe_file_delete(File, Dir, FileHandlesEts) -> true end. +-spec close_all_indicated + (client_msstate()) -> rabbit_types:ok(client_msstate()). + close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts, client_ref = Ref } = CState) -> @@ -1965,11 +1970,16 @@ cleanup_after_file_deletion(File, %% garbage collection / compaction / aggregation -- external %%---------------------------------------------------------------------------- +-spec has_readers(non_neg_integer(), gc_state()) -> boolean(). + has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) -> [#file_summary { locked = true, readers = Count }] = ets:lookup(FileSummaryEts, File), Count /= 0. +-spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) -> + deletion_thunk(). + combine_files(Source, Destination, State = #gc_state { file_summary_ets = FileSummaryEts, file_handles_ets = FileHandlesEts, @@ -2046,6 +2056,8 @@ combine_files(Source, Destination, gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}), safe_file_delete_fun(Source, Dir, FileHandlesEts). +-spec delete_file(non_neg_integer(), gc_state()) -> deletion_thunk(). + delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, file_handles_ets = FileHandlesEts, dir = Dir, @@ -2127,6 +2139,8 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, {destination, Destination}]} end. +-spec force_recovery(file:filename(), server()) -> 'ok'. + force_recovery(BaseDir, Store) -> Dir = filename:join(BaseDir, atom_to_list(Store)), case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of @@ -2142,6 +2156,9 @@ foreach_file(D, Fun, Files) -> foreach_file(D1, D2, Fun, Files) -> [ok = Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files]. +-spec transform_dir(file:filename(), server(), + fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok'. + transform_dir(BaseDir, Store, TransformFun) -> Dir = filename:join(BaseDir, atom_to_list(Store)), TmpDir = filename:join(Dir, ?TRANSFORM_TMP), diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index d11d110abf..4b8d95b535 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -37,31 +37,34 @@ -spec start_link(rabbit_msg_store:gc_state()) -> rabbit_types:ok_pid_or_error(). --spec combine(pid(), rabbit_msg_store:file_num(), - rabbit_msg_store:file_num()) -> 'ok'. --spec delete(pid(), rabbit_msg_store:file_num()) -> 'ok'. --spec no_readers(pid(), rabbit_msg_store:file_num()) -> 'ok'. --spec stop(pid()) -> 'ok'. --spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'. - -%%---------------------------------------------------------------------------- start_link(MsgStoreState) -> gen_server2:start_link(?MODULE, [MsgStoreState], [{timeout, infinity}]). +-spec combine(pid(), rabbit_msg_store:file_num(), + rabbit_msg_store:file_num()) -> 'ok'. + combine(Server, Source, Destination) -> gen_server2:cast(Server, {combine, Source, Destination}). +-spec delete(pid(), rabbit_msg_store:file_num()) -> 'ok'. + delete(Server, File) -> gen_server2:cast(Server, {delete, File}). +-spec no_readers(pid(), rabbit_msg_store:file_num()) -> 'ok'. + no_readers(Server, File) -> gen_server2:cast(Server, {no_readers, File}). +-spec stop(pid()) -> 'ok'. + stop(Server) -> gen_server2:call(Server, stop, infinity). +-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'. + set_maximum_since_use(Pid, Age) -> gen_server2:cast(Pid, {set_maximum_since_use, Age}). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 79de7ce180..809b999b98 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -44,6 +44,8 @@ -export([tcp_listener_started/4, tcp_listener_stopped/4]). +-deprecated([{force_connection_event_refresh, 1, eventually}]). + %% Internal -export([connections_local/0]). @@ -68,53 +70,7 @@ -type protocol() :: atom(). -type label() :: string(). --spec start_tcp_listener( - listener_config(), integer()) -> 'ok' | {'error', term()}. --spec start_ssl_listener( - listener_config(), rabbit_types:infos(), integer()) -> 'ok' | {'error', term()}. --spec stop_tcp_listener(listener_config()) -> 'ok'. --spec active_listeners() -> [rabbit_types:listener()]. --spec node_listeners(node()) -> [rabbit_types:listener()]. --spec register_connection(pid()) -> ok. --spec unregister_connection(pid()) -> ok. --spec connections() -> [rabbit_types:connection()]. --spec connections_local() -> [rabbit_types:connection()]. --spec connection_info_keys() -> rabbit_types:info_keys(). --spec connection_info(rabbit_types:connection()) -> rabbit_types:infos(). --spec connection_info(rabbit_types:connection(), rabbit_types:info_keys()) -> - rabbit_types:infos(). --spec connection_info_all() -> [rabbit_types:infos()]. --spec connection_info_all(rabbit_types:info_keys()) -> - [rabbit_types:infos()]. --spec close_connection(pid(), string()) -> 'ok'. --deprecated([{force_connection_event_refresh, 1, eventually}]). --spec force_connection_event_refresh(reference()) -> 'ok'. - --spec on_node_down(node()) -> 'ok'. --spec tcp_listener_addresses(listener_config()) -> [address()]. --spec tcp_listener_spec - (name_prefix(), address(), [gen_tcp:listen_option()], module(), module(), - protocol(), any(), non_neg_integer(), label()) -> - supervisor:child_spec(). --spec ensure_ssl() -> rabbit_types:infos(). --spec poodle_check(atom()) -> 'ok' | 'danger'. - -spec boot() -> 'ok'. --spec tcp_listener_started - (_, _, - string() | - {byte(),byte(),byte(),byte()} | - {char(),char(),char(),char(),char(),char(),char(),char()}, _) -> - 'ok'. --spec tcp_listener_stopped - (_, _, - string() | - {byte(),byte(),byte(),byte()} | - {char(),char(),char(),char(),char(),char(),char(),char()}, - _) -> - 'ok'. - -%%---------------------------------------------------------------------------- boot() -> ok = record_distribution_listener(), @@ -159,12 +115,16 @@ boot_tls(NumAcceptors) -> ok end. +-spec ensure_ssl() -> rabbit_types:infos(). + ensure_ssl() -> {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps), ok = app_utils:start_applications(SslAppsConfig), {ok, SslOptsConfig0} = application:get_env(rabbit, ssl_options), rabbit_ssl_options:fix(SslOptsConfig0). +-spec poodle_check(atom()) -> 'ok' | 'danger'. + poodle_check(Context) -> {ok, Vsn} = application:get_key(ssl, vsn), case rabbit_misc:version_compare(Vsn, "5.3", gte) of %% R16B01 @@ -193,6 +153,8 @@ log_poodle_fail(Context) -> fix_ssl_options(Config) -> rabbit_ssl_options:fix(Config). +-spec tcp_listener_addresses(listener_config()) -> [address()]. + tcp_listener_addresses(Port) when is_integer(Port) -> tcp_listener_addresses_auto(Port); tcp_listener_addresses({"auto", Port}) -> @@ -213,6 +175,11 @@ tcp_listener_addresses_auto(Port) -> lists:append([tcp_listener_addresses(Listener) || Listener <- port_to_listeners(Port)]). +-spec tcp_listener_spec + (name_prefix(), address(), [gen_tcp:listen_option()], module(), module(), + protocol(), any(), non_neg_integer(), label()) -> + supervisor:child_spec(). + tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts, Transport, ProtoSup, ProtoOpts, Protocol, NumAcceptors, Label) -> {rabbit_misc:tcp_name(NamePrefix, IPAddress, Port), @@ -223,9 +190,15 @@ tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts, NumAcceptors, Label]}, transient, infinity, supervisor, [tcp_listener_sup]}. +-spec start_tcp_listener( + listener_config(), integer()) -> 'ok' | {'error', term()}. + start_tcp_listener(Listener, NumAcceptors) -> start_listener(Listener, NumAcceptors, amqp, "TCP listener", tcp_opts()). +-spec start_ssl_listener( + listener_config(), rabbit_types:infos(), integer()) -> 'ok' | {'error', term()}. + start_ssl_listener(Listener, SslOpts, NumAcceptors) -> start_listener(Listener, NumAcceptors, 'amqp/ssl', "TLS (SSL) listener", tcp_opts() ++ SslOpts). @@ -262,6 +235,7 @@ transport(Protocol) -> 'amqp/ssl' -> ranch_ssl end. +-spec stop_tcp_listener(listener_config()) -> 'ok'. stop_tcp_listener(Listener) -> [stop_tcp_listener0(Address) || @@ -273,6 +247,13 @@ stop_tcp_listener0({IPAddress, Port, _Family}) -> ok = supervisor:terminate_child(rabbit_sup, Name), ok = supervisor:delete_child(rabbit_sup, Name). +-spec tcp_listener_started + (_, _, + string() | + {byte(),byte(),byte(),byte()} | + {char(),char(),char(),char(),char(),char(),char(),char()}, _) -> + 'ok'. + tcp_listener_started(Protocol, Opts, IPAddress, Port) -> %% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1 %% We need the host so we can distinguish multiple instances of the above @@ -286,6 +267,14 @@ tcp_listener_started(Protocol, Opts, IPAddress, Port) -> port = Port, opts = Opts}). +-spec tcp_listener_stopped + (_, _, + string() | + {byte(),byte(),byte(),byte()} | + {char(),char(),char(),char(),char(),char(),char(),char()}, + _) -> + 'ok'. + tcp_listener_stopped(Protocol, Opts, IPAddress, Port) -> ok = mnesia:dirty_delete_object( rabbit_listener, @@ -301,12 +290,18 @@ record_distribution_listener() -> {port, Port, _Version} = erl_epmd:port_please(Name, Host), tcp_listener_started(clustering, [], {0,0,0,0,0,0,0,0}, Port). +-spec active_listeners() -> [rabbit_types:listener()]. + active_listeners() -> rabbit_misc:dirty_read_all(rabbit_listener). +-spec node_listeners(node()) -> [rabbit_types:listener()]. + node_listeners(Node) -> mnesia:dirty_read(rabbit_listener, Node). +-spec on_node_down(node()) -> 'ok'. + on_node_down(Node) -> case lists:member(Node, nodes()) of false -> @@ -318,22 +313,44 @@ on_node_down(Node) -> "Keeping ~s listeners: the node is already back~n", [Node]) end. +-spec register_connection(pid()) -> ok. + register_connection(Pid) -> pg_local:join(rabbit_connections, Pid). +-spec unregister_connection(pid()) -> ok. + unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid). +-spec connections() -> [rabbit_types:connection()]. + connections() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_networking, connections_local, []). +-spec connections_local() -> [rabbit_types:connection()]. + connections_local() -> pg_local:get_members(rabbit_connections). +-spec connection_info_keys() -> rabbit_types:info_keys(). + connection_info_keys() -> rabbit_reader:info_keys(). +-spec connection_info(rabbit_types:connection()) -> rabbit_types:infos(). + connection_info(Pid) -> rabbit_reader:info(Pid). + +-spec connection_info(rabbit_types:connection(), rabbit_types:info_keys()) -> + rabbit_types:infos(). + connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items). +-spec connection_info_all() -> [rabbit_types:infos()]. + connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end). + +-spec connection_info_all(rabbit_types:info_keys()) -> + [rabbit_types:infos()]. + connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end). emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> @@ -346,6 +363,8 @@ emit_connection_info_local(Items, Ref, AggregatorPid) -> AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end, connections_local()). +-spec close_connection(pid(), string()) -> 'ok'. + close_connection(Pid, Explanation) -> case lists:member(Pid, connections()) of true -> @@ -359,6 +378,8 @@ close_connection(Pid, Explanation) -> ok end. +-spec force_connection_event_refresh(reference()) -> 'ok'. + force_connection_event_refresh(Ref) -> [rabbit_reader:force_event_refresh(C, Ref) || C <- connections()], ok. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 4a5dea6073..2d3b042fa4 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -50,37 +50,11 @@ keepalive_timer, autoheal, guid, node_guids}). %%---------------------------------------------------------------------------- - --spec start_link() -> rabbit_types:ok_pid_or_error(). - --spec running_nodes_filename() -> string(). --spec cluster_status_filename() -> string(). --spec prepare_cluster_status_files() -> 'ok'. --spec write_cluster_status(rabbit_mnesia:cluster_status()) -> 'ok'. --spec read_cluster_status() -> rabbit_mnesia:cluster_status(). --spec update_cluster_status() -> 'ok'. --spec reset_cluster_status() -> 'ok'. - --spec notify_node_up() -> 'ok'. --spec notify_joined_cluster() -> 'ok'. --spec notify_left_cluster(node()) -> 'ok'. - --spec partitions() -> [node()]. --spec partitions([node()]) -> [{node(), [node()]}]. --spec status([node()]) -> {[{node(), [node()]}], [node()]}. --spec subscribe(pid()) -> 'ok'. --spec pause_partition_guard() -> 'ok' | 'pausing'. - --spec all_rabbit_nodes_up() -> boolean(). --spec run_outside_applications(fun (() -> any()), boolean()) -> pid(). --spec ping_all() -> 'ok'. --spec alive_nodes([node()]) -> [node()]. --spec alive_rabbit_nodes([node()]) -> [node()]. - -%%---------------------------------------------------------------------------- %% Start %%---------------------------------------------------------------------------- +-spec start_link() -> rabbit_types:ok_pid_or_error(). + start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %%---------------------------------------------------------------------------- @@ -97,15 +71,21 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %% the information we have will be outdated, but it cannot be %% otherwise. +-spec running_nodes_filename() -> string(). + running_nodes_filename() -> filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown"). +-spec cluster_status_filename() -> string(). + cluster_status_filename() -> filename:join(rabbit_mnesia:dir(), "cluster_nodes.config"). quorum_filename() -> filename:join(rabbit_mnesia:dir(), "quorum"). +-spec prepare_cluster_status_files() -> 'ok'. + prepare_cluster_status_files() -> rabbit_mnesia:ensure_mnesia_dir(), Corrupt = fun(F) -> throw({error, corrupt_cluster_status_files, F}) end, @@ -133,6 +113,8 @@ prepare_cluster_status_files() -> AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2), ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}). +-spec write_cluster_status(rabbit_mnesia:cluster_status()) -> 'ok'. + write_cluster_status({All, Disc, Running}) -> ClusterStatusFN = cluster_status_filename(), Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of @@ -148,6 +130,8 @@ write_cluster_status({All, Disc, Running}) -> {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}}) end. +-spec read_cluster_status() -> rabbit_mnesia:cluster_status(). + read_cluster_status() -> case {try_read_file(cluster_status_filename()), try_read_file(running_nodes_filename())} of @@ -157,10 +141,14 @@ read_cluster_status() -> throw({error, {corrupt_or_missing_cluster_files, Stat, Run}}) end. +-spec update_cluster_status() -> 'ok'. + update_cluster_status() -> {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(), write_cluster_status(Status). +-spec reset_cluster_status() -> 'ok'. + reset_cluster_status() -> write_cluster_status({[node()], [node()], [node()]}). @@ -168,15 +156,21 @@ reset_cluster_status() -> %% Cluster notifications %%---------------------------------------------------------------------------- +-spec notify_node_up() -> 'ok'. + notify_node_up() -> gen_server:cast(?SERVER, notify_node_up). +-spec notify_joined_cluster() -> 'ok'. + notify_joined_cluster() -> Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], gen_server:abcast(Nodes, ?SERVER, {joined_cluster, node(), rabbit_mnesia:node_type()}), ok. +-spec notify_left_cluster(node()) -> 'ok'. + notify_left_cluster(Node) -> Nodes = rabbit_mnesia:cluster_nodes(running), gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}), @@ -186,16 +180,24 @@ notify_left_cluster(Node) -> %% Server calls %%---------------------------------------------------------------------------- +-spec partitions() -> [node()]. + partitions() -> gen_server:call(?SERVER, partitions, infinity). +-spec partitions([node()]) -> [{node(), [node()]}]. + partitions(Nodes) -> {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, ?NODE_REPLY_TIMEOUT), Replies. +-spec status([node()]) -> {[{node(), [node()]}], [node()]}. + status(Nodes) -> gen_server:multi_call(Nodes, ?SERVER, status, infinity). +-spec subscribe(pid()) -> 'ok'. + subscribe(Pid) -> gen_server:cast(?SERVER, {subscribe, Pid}). @@ -218,6 +220,8 @@ subscribe(Pid) -> %% So we have channels call in here before issuing confirms, to do a %% lightweight check that we have not entered a pausing state. +-spec pause_partition_guard() -> 'ok' | 'pausing'. + pause_partition_guard() -> case get(pause_partition_guard) of not_pause_mode -> @@ -888,19 +892,28 @@ all_nodes_up() -> Nodes = rabbit_mnesia:cluster_nodes(all), length(alive_nodes(Nodes)) =:= length(Nodes). +-spec all_rabbit_nodes_up() -> boolean(). + all_rabbit_nodes_up() -> Nodes = rabbit_mnesia:cluster_nodes(all), length(alive_rabbit_nodes(Nodes)) =:= length(Nodes). +-spec alive_nodes([node()]) -> [node()]. + alive_nodes() -> alive_nodes(rabbit_mnesia:cluster_nodes(all)). alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])]. +-spec alive_rabbit_nodes([node()]) -> [node()]. + alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)). alive_rabbit_nodes(Nodes) -> [N || N <- alive_nodes(Nodes), rabbit:is_running(N)]. %% This one is allowed to connect! + +-spec ping_all() -> 'ok'. + ping_all() -> [net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)], ok. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 3a27ce8dc9..1e4abad8c9 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -31,28 +31,20 @@ %% Specs %%---------------------------------------------------------------------------- --spec names(string()) -> - rabbit_types:ok_or_error2([{string(), integer()}], term()). --spec diagnostics([node()]) -> string(). --spec cookie_hash() -> string(). --spec is_running(node(), atom()) -> boolean(). --spec is_process_running(node(), atom()) -> boolean(). --spec cluster_name() -> binary(). --spec set_cluster_name(binary(), rabbit_types:username()) -> 'ok'. --spec all_running() -> [node()]. --spec running_count() -> integer(). - -%%---------------------------------------------------------------------------- - name_type() -> case os:getenv("RABBITMQ_USE_LONGNAME") of "true" -> longnames; _ -> shortnames end. +-spec names(string()) -> + rabbit_types:ok_or_error2([{string(), integer()}], term()). + names(Hostname) -> rabbit_nodes_common:names(Hostname). +-spec diagnostics([node()]) -> string(). + diagnostics(Nodes) -> rabbit_nodes_common:diagnostics(Nodes). @@ -62,15 +54,23 @@ make(NodeStr) -> parts(NodeStr) -> rabbit_nodes_common:parts(NodeStr). +-spec cookie_hash() -> string(). + cookie_hash() -> rabbit_nodes_common:cookie_hash(). +-spec is_running(node(), atom()) -> boolean(). + is_running(Node, Application) -> rabbit_nodes_common:is_running(Node, Application). +-spec is_process_running(node(), atom()) -> boolean(). + is_process_running(Node, Process) -> rabbit_nodes_common:is_process_running(Node, Process). +-spec cluster_name() -> binary(). + cluster_name() -> rabbit_runtime_parameters:value_global( cluster_name, cluster_name_default()). @@ -80,6 +80,8 @@ cluster_name_default() -> FQDN = rabbit_net:hostname(), list_to_binary(atom_to_list(make({ID, FQDN}))). +-spec set_cluster_name(binary(), rabbit_types:username()) -> 'ok'. + set_cluster_name(Name, Username) -> %% Cluster name should be binary BinaryName = rabbit_data_coercion:to_binary(Name), @@ -88,8 +90,12 @@ set_cluster_name(Name, Username) -> ensure_epmd() -> rabbit_nodes_common:ensure_epmd(). +-spec all_running() -> [node()]. + all_running() -> rabbit_mnesia:cluster_nodes(running). +-spec running_count() -> integer(). + running_count() -> length(all_running()). -spec await_running_count(integer(), integer()) -> 'ok' | {'error', atom()}. diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 69139b7b1a..6adce8a24e 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -31,20 +31,10 @@ -type plugin_name() :: atom(). --spec setup() -> [plugin_name()]. --spec active() -> [plugin_name()]. --spec list(string()) -> [#plugin{}]. --spec list(string(), boolean()) -> [#plugin{}]. --spec read_enabled(file:filename()) -> [plugin_name()]. --spec dependencies(boolean(), [plugin_name()], [#plugin{}]) -> - [plugin_name()]. --spec ensure(string()) -> {'ok', [atom()], [atom()]} | {error, any()}. --spec strictly_plugins([plugin_name()], [#plugin{}]) -> [plugin_name()]. --spec strictly_plugins([plugin_name()]) -> [plugin_name()]. --spec is_strictly_plugin(#plugin{}) -> boolean(). - %%---------------------------------------------------------------------------- +-spec ensure(string()) -> {'ok', [atom()], [atom()]} | {error, any()}. + ensure(FileJustChanged) -> case rabbit:is_running() of true -> ensure1(FileJustChanged); @@ -128,6 +118,9 @@ enabled_plugins() -> end. %% @doc Prepares the file system and installs all enabled plugins. + +-spec setup() -> [plugin_name()]. + setup() -> ExpandDir = plugins_expand_dir(), %% Eliminate the contents of the destination directory @@ -184,15 +177,23 @@ extract_schema(#plugin{type = dir, location = Location}, SchemaDir) -> %% @doc Lists the plugins which are currently running. + +-spec active() -> [plugin_name()]. + active() -> InstalledPlugins = plugin_names(list(plugins_dir())), [App || {App, _, _} <- rabbit_misc:which_applications(), lists:member(App, InstalledPlugins)]. %% @doc Get the list of plugins which are ready to be enabled. + +-spec list(string()) -> [#plugin{}]. + list(PluginsPath) -> list(PluginsPath, false). +-spec list(string(), boolean()) -> [#plugin{}]. + list(PluginsPath, IncludeRequiredDeps) -> {AllPlugins, LoadingProblems} = discover_plugins(split_path(PluginsPath)), {UniquePlugins, DuplicateProblems} = remove_duplicate_plugins(AllPlugins), @@ -202,6 +203,9 @@ list(PluginsPath, IncludeRequiredDeps) -> ensure_dependencies(Plugins2). %% @doc Read the list of enabled plugins from the supplied term file. + +-spec read_enabled(file:filename()) -> [plugin_name()]. + read_enabled(PluginsFile) -> case rabbit_file:read_term_file(PluginsFile) of {ok, [Plugins]} -> Plugins; @@ -216,6 +220,10 @@ read_enabled(PluginsFile) -> %% @doc Calculate the dependency graph from <i>Sources</i>. %% When Reverse =:= true the bottom/leaf level applications are returned in %% the resulting list, otherwise they're skipped. + +-spec dependencies(boolean(), [plugin_name()], [#plugin{}]) -> + [plugin_name()]. + dependencies(Reverse, Sources, AllPlugins) -> {ok, G} = rabbit_misc:build_acyclic_graph( fun ({App, _Deps}) -> [{App, App}] end, @@ -231,15 +239,22 @@ dependencies(Reverse, Sources, AllPlugins) -> OrderedDests. %% Filter real plugins from application dependencies + +-spec is_strictly_plugin(#plugin{}) -> boolean(). + is_strictly_plugin(#plugin{extra_dependencies = ExtraDeps}) -> lists:member(rabbit, ExtraDeps). +-spec strictly_plugins([plugin_name()], [#plugin{}]) -> [plugin_name()]. + strictly_plugins(Plugins, AllPlugins) -> lists:filter( fun(Name) -> is_strictly_plugin(lists:keyfind(Name, #plugin.name, AllPlugins)) end, Plugins). +-spec strictly_plugins([plugin_name()]) -> [plugin_name()]. + strictly_plugins(Plugins) -> AllPlugins = list(plugins_dir()), lists:filter( diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 8d13a16e0b..b0fe6d2b04 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -29,13 +29,8 @@ -define(EX_USAGE, 64). %%---------------------------------------------------------------------------- -%% Specs -%%---------------------------------------------------------------------------- -spec start() -> no_return(). --spec stop() -> 'ok'. - -%%---------------------------------------------------------------------------- start() -> case init:get_plain_arguments() of @@ -55,6 +50,8 @@ start() -> rabbit_misc:quit(?SET_DIST_PORT), ok. +-spec stop() -> 'ok'. + stop() -> ok. diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index b750482ba1..5e92013424 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -38,11 +38,11 @@ -type start_mode() :: 'declare' | 'recovery' | 'slave'. +%%---------------------------------------------------------------------------- + -spec start_link(amqqueue:amqqueue(), start_mode(), pid()) -> rabbit_types:ok_pid_or_error(). -%%---------------------------------------------------------------------------- - start_link(Q, StartMode, Marker) -> gen_server2:start_link(?MODULE, {Q, StartMode, Marker}, []). diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index e52156f60d..732415b82e 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -33,13 +33,12 @@ %%---------------------------------------------------------------------------- -spec start_link(rabbit_types:proc_name()) -> rabbit_types:ok_pid_or_error(). --spec register(pid(), pid()) -> 'ok'. - -%%---------------------------------------------------------------------------- start_link(ProcName) -> gen_server:start_link(?MODULE, [ProcName], []). +-spec register(pid(), pid()) -> 'ok'. + register(CollectorPid, Q) -> gen_server:call(CollectorPid, {register, Q}, infinity). diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 7a0c0f98e3..2ede7b7b8e 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -66,57 +66,29 @@ -type cr_fun() :: fun ((#cr{}) -> #cr{}). -type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}. --spec new() -> state(). --spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. --spec inactive(state()) -> boolean(). --spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(), - non_neg_integer(), rabbit_framing:amqp_table(), - rabbit_types:username()}]. --spec count() -> non_neg_integer(). --spec unacknowledged_message_count() -> non_neg_integer(). --spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), - non_neg_integer(), rabbit_framing:amqp_table(), boolean(), - rabbit_types:username(), state()) - -> state(). --spec remove(ch(), rabbit_types:ctag(), state()) -> - 'not_found' | state(). --spec erase_ch(ch(), state()) -> - 'not_found' | {[ack()], [rabbit_types:ctag()], - state()}. --spec send_drained() -> 'ok'. --spec deliver(fun ((boolean()) -> {fetch_result(), T}), - rabbit_amqqueue:name(), state(), boolean(), - none | {ch(), rabbit_types:ctag()} | {ch(), consumer()}) -> - {'delivered', boolean(), T, state()} | - {'undelivered', boolean(), state()}. --spec record_ack(ch(), pid(), ack()) -> 'ok'. --spec subtract_acks(ch(), [ack()], state()) -> - 'not_found' | 'unchanged' | {'unblocked', state()}. --spec possibly_unblock(cr_fun(), ch(), state()) -> - 'unchanged' | {'unblocked', state()}. --spec resume_fun() -> cr_fun(). --spec notify_sent_fun(non_neg_integer()) -> cr_fun(). --spec activate_limit_fun() -> cr_fun(). --spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(), - state()) -> 'unchanged' | {'unblocked', state()}. --spec utilisation(state()) -> ratio(). --spec get(ch(), rabbit_types:ctag(), state()) -> undefined | consumer(). --spec consumer_tag(consumer()) -> rabbit_types:ctag(). --spec get_infos(consumer()) -> term(). - %%---------------------------------------------------------------------------- +-spec new() -> state(). + new() -> #state{consumers = priority_queue:new(), use = {active, erlang:monotonic_time(micro_seconds), 1.0}}. +-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. + max_active_priority(#state{consumers = Consumers}) -> priority_queue:highest(Consumers). +-spec inactive(state()) -> boolean(). + inactive(#state{consumers = Consumers}) -> priority_queue:is_empty(Consumers). +-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(), + non_neg_integer(), rabbit_framing:amqp_table(), + rabbit_types:username()}]. + all(State) -> all(State, none, false). @@ -146,11 +118,20 @@ consumers(Consumers, SingleActiveConsumer, SingleActiveConsumerOn, Acc) -> [{ChPid, CTag, Ack, Prefetch, Active, ActivityStatus, Args, Username} | Acc1] end, Acc, Consumers). +-spec count() -> non_neg_integer(). + count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). +-spec unacknowledged_message_count() -> non_neg_integer(). + unacknowledged_message_count() -> lists:sum([?QUEUE:len(C#cr.acktags) || C <- all_ch_record()]). +-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), + non_neg_integer(), rabbit_framing:amqp_table(), boolean(), + rabbit_types:username(), state()) + -> state(). + add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty, Username, State = #state{consumers = Consumers, use = CUInfo}) -> @@ -176,6 +157,9 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers), use = update_use(CUInfo, active)}. +-spec remove(ch(), rabbit_types:ctag(), state()) -> + 'not_found' | state(). + remove(ChPid, CTag, State = #state{consumers = Consumers}) -> case lookup_ch(ChPid) of not_found -> @@ -196,6 +180,10 @@ remove(ChPid, CTag, State = #state{consumers = Consumers}) -> remove_consumer(ChPid, CTag, Consumers)} end. +-spec erase_ch(ch(), state()) -> + 'not_found' | {[ack()], [rabbit_types:ctag()], + state()}. + erase_ch(ChPid, State = #state{consumers = Consumers}) -> case lookup_ch(ChPid) of not_found -> @@ -211,9 +199,17 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> State#state{consumers = remove_consumers(ChPid, Consumers)}} end. +-spec send_drained() -> 'ok'. + send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. +-spec deliver(fun ((boolean()) -> {fetch_result(), T}), + rabbit_amqqueue:name(), state(), boolean(), + none | {ch(), rabbit_types:ctag()} | {ch(), consumer()}) -> + {'delivered', boolean(), T, state()} | + {'undelivered', boolean(), state()}. + deliver(FetchFun, QName, State, SingleActiveConsumerIsOn, ActiveConsumer) -> deliver(FetchFun, QName, false, State, SingleActiveConsumerIsOn, ActiveConsumer). @@ -299,11 +295,16 @@ is_blocked(Consumer = {ChPid, _C}) -> #cr{blocked_consumers = BlockedConsumers} = lookup_ch(ChPid), priority_queue:member(Consumer, BlockedConsumers). +-spec record_ack(ch(), pid(), ack()) -> 'ok'. + record_ack(ChPid, LimiterPid, AckTag) -> C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), update_ch_record(C#cr{acktags = ?QUEUE:in({AckTag, none}, ChAckTags)}), ok. +-spec subtract_acks(ch(), [ack()], state()) -> + 'not_found' | 'unchanged' | {'unblocked', state()}. + subtract_acks(ChPid, AckTags, State) -> case lookup_ch(ChPid) of not_found -> @@ -341,6 +342,9 @@ subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) -> subtract_acks([], Prefix, CTagCounts, AckQ) end. +-spec possibly_unblock(cr_fun(), ch(), state()) -> + 'unchanged' | {'unblocked', state()}. + possibly_unblock(Update, ChPid, State) -> case lookup_ch(ChPid) of not_found -> unchanged; @@ -370,23 +374,31 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, use = update_use(Use, active)}} end. +-spec resume_fun() -> cr_fun(). + resume_fun() -> fun (C = #cr{limiter = Limiter}) -> C#cr{limiter = rabbit_limiter:resume(Limiter)} end. +-spec notify_sent_fun(non_neg_integer()) -> cr_fun(). + notify_sent_fun(Credit) -> fun (C = #cr{unsent_message_count = Count}) -> C#cr{unsent_message_count = Count - Credit} end. +-spec activate_limit_fun() -> cr_fun(). + activate_limit_fun() -> fun (C = #cr{limiter = Limiter}) -> C#cr{limiter = rabbit_limiter:activate(Limiter)} end. -credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> +-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(), + state()) -> 'unchanged' | {'unblocked', state()}. +credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> case lookup_ch(ChPid) of not_found -> unchanged; @@ -405,6 +417,8 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> drain_mode(true) -> drain; drain_mode(false) -> manual. +-spec utilisation(state()) -> ratio(). + utilisation(#state{use = {active, Since, Avg}}) -> use_avg(erlang:monotonic_time(micro_seconds) - Since, 0, Avg); utilisation(#state{use = {inactive, Since, Active, Avg}}) -> @@ -421,6 +435,8 @@ get_consumer(#state{consumers = Consumers}) -> {empty, _} -> undefined end. +-spec get(ch(), rabbit_types:ctag(), state()) -> undefined | consumer(). + get(ChPid, ConsumerTag, #state{consumers = Consumers}) -> Consumers1 = priority_queue:filter(fun ({CP, #consumer{tag = CT}}) -> (CP == ChPid) and (CT == ConsumerTag) @@ -430,10 +446,14 @@ get(ChPid, ConsumerTag, #state{consumers = Consumers}) -> {{value, Consumer, _Priority}, _Tail} -> Consumer end. +-spec get_infos(consumer()) -> term(). + get_infos(Consumer) -> {Consumer#consumer.tag,Consumer#consumer.ack_required, Consumer#consumer.prefetch, Consumer#consumer.args}. +-spec consumer_tag(consumer()) -> rabbit_types:ctag(). + consumer_tag(#consumer{tag = CTag}) -> CTag. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 9785ae170d..e4047a9902 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -258,46 +258,20 @@ {rabbit_types:msg_id(), non_neg_integer(), A}). -type shutdown_terms() :: [term()] | 'non_clean_shutdown'. --spec erase(rabbit_amqqueue:name()) -> 'ok'. --spec reset_state(qistate()) -> qistate(). --spec init(rabbit_amqqueue:name(), - on_sync_fun(), on_sync_fun()) -> qistate(). --spec recover(rabbit_amqqueue:name(), shutdown_terms(), boolean(), - contains_predicate(), - on_sync_fun(), on_sync_fun()) -> - {'undefined' | non_neg_integer(), - 'undefined' | non_neg_integer(), qistate()}. --spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate(). --spec delete_and_terminate(qistate()) -> qistate(). --spec publish(rabbit_types:msg_id(), seq_id(), - rabbit_types:message_properties(), boolean(), - non_neg_integer(), qistate()) -> qistate(). --spec deliver([seq_id()], qistate()) -> qistate(). --spec ack([seq_id()], qistate()) -> qistate(). --spec sync(qistate()) -> qistate(). --spec needs_sync(qistate()) -> 'confirms' | 'other' | 'false'. --spec flush(qistate()) -> qistate(). --spec read(seq_id(), seq_id(), qistate()) -> - {[{rabbit_types:msg_id(), seq_id(), - rabbit_types:message_properties(), - boolean(), boolean()}], qistate()}. --spec next_segment_boundary(seq_id()) -> seq_id(). --spec bounds(qistate()) -> - {non_neg_integer(), non_neg_integer(), qistate()}. --spec start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}. - --spec add_queue_ttl() -> 'ok'. - - %%---------------------------------------------------------------------------- %% public API %%---------------------------------------------------------------------------- +-spec erase(rabbit_amqqueue:name()) -> 'ok'. + erase(Name) -> #qistate { dir = Dir } = blank_state(Name), erase_index_dir(Dir). %% used during variable queue purge when there are no pending acks + +-spec reset_state(qistate()) -> qistate(). + reset_state(#qistate{ queue_name = Name, dir = Dir, on_sync = OnSyncFun, @@ -310,12 +284,21 @@ reset_state(#qistate{ queue_name = Name, ok = erase_index_dir(Dir), blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun). +-spec init(rabbit_amqqueue:name(), + on_sync_fun(), on_sync_fun()) -> qistate(). + init(Name, OnSyncFun, OnSyncMsgFun) -> State = #qistate { dir = Dir } = blank_state(Name), false = rabbit_file:is_file(Dir), %% is_file == is file or dir State#qistate{on_sync = OnSyncFun, on_sync_msg = OnSyncMsgFun}. +-spec recover(rabbit_amqqueue:name(), shutdown_terms(), boolean(), + contains_predicate(), + on_sync_fun(), on_sync_fun()) -> + {'undefined' | non_neg_integer(), + 'undefined' | non_neg_integer(), qistate()}. + recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun, OnSyncMsgFun) -> State = blank_state(Name), @@ -328,12 +311,16 @@ recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end. +-spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate(). + terminate(VHost, Terms, State = #qistate { dir = Dir }) -> {SegmentCounts, State1} = terminate(State), rabbit_recovery_terms:store(VHost, filename:basename(Dir), [{segments, SegmentCounts} | Terms]), State1. +-spec delete_and_terminate(qistate()) -> qistate(). + delete_and_terminate(State) -> {_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), ok = rabbit_file:recursive_delete([Dir]), @@ -396,6 +383,10 @@ flush_delivered_cache(State = #qistate{delivered_cache = DC}) -> State1 = deliver(lists:reverse(DC), State), State1#qistate{delivered_cache = []}. +-spec publish(rabbit_types:msg_id(), seq_id(), + rabbit_types:message_properties(), boolean(), + non_neg_integer(), qistate()) -> qistate(). + publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, State) -> {JournalHdl, State1} = get_journal_handle( @@ -429,20 +420,29 @@ maybe_needs_confirming(MsgProps, MsgOrId, {false, _} -> State end. +-spec deliver([seq_id()], qistate()) -> qistate(). + deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). +-spec ack([seq_id()], qistate()) -> qistate(). + ack(SeqIds, State) -> deliver_or_ack(ack, SeqIds, State). %% This is called when there are outstanding confirms or when the %% queue is idle and the journal needs syncing (see needs_sync/1). + +-spec sync(qistate()) -> qistate(). + sync(State = #qistate { journal_handle = undefined }) -> State; sync(State = #qistate { journal_handle = JournalHdl }) -> ok = file_handle_cache:sync(JournalHdl), notify_sync(State). +-spec needs_sync(qistate()) -> 'confirms' | 'other' | 'false'. + needs_sync(#qistate{journal_handle = undefined}) -> false; needs_sync(#qistate{journal_handle = JournalHdl, @@ -456,9 +456,16 @@ needs_sync(#qistate{journal_handle = JournalHdl, false -> confirms end. +-spec flush(qistate()) -> qistate(). + flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). +-spec read(seq_id(), seq_id(), qistate()) -> + {[{rabbit_types:msg_id(), seq_id(), + rabbit_types:message_properties(), + boolean(), boolean()}], qistate()}. + read(StartEnd, StartEnd, State) -> {[], State}; read(Start, End, State = #qistate { segments = Segments, @@ -472,10 +479,15 @@ read(Start, End, State = #qistate { segments = Segments, end, {[], Segments}, lists:seq(StartSeg, EndSeg)), {Messages, State #qistate { segments = Segments1 }}. +-spec next_segment_boundary(seq_id()) -> seq_id(). + next_segment_boundary(SeqId) -> {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), reconstruct_seq_id(Seg + 1, 0). +-spec bounds(qistate()) -> + {non_neg_integer(), non_neg_integer(), qistate()}. + bounds(State = #qistate { segments = Segments }) -> %% This is not particularly efficient, but only gets invoked on %% queue initialisation. @@ -498,6 +510,8 @@ bounds(State = #qistate { segments = Segments }) -> end, {LowSeqId, NextSeqId, State}. +-spec start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}. + start(VHost, DurableQueueNames) -> ok = rabbit_recovery_terms:start(VHost), {DurableTerms, DurableDirectories} = @@ -1286,6 +1300,8 @@ journal_minus_segment1({no_pub, del, ack}, undefined) -> %% upgrade %%---------------------------------------------------------------------------- +-spec add_queue_ttl() -> 'ok'. + add_queue_ttl() -> foreach_queue_index({fun add_queue_ttl_journal/1, fun add_queue_ttl_segment/1}). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 45a1c12cac..b54de3a2e6 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -47,26 +47,6 @@ -type msg_id() :: non_neg_integer(). -type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(), rabbit_types:message()}. --spec handle_event({'ra_event', ra_server_id(), any()}, rabbit_fifo_client:state()) -> - {'internal', Correlators :: [term()], rabbit_fifo_client:state()} | - {rabbit_fifo:client_msg(), rabbit_fifo_client:state()}. --spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() | - {'absent', amqqueue:amqqueue(), atom()}]. --spec stop(rabbit_types:vhost()) -> 'ok'. --spec ack(rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) -> - {'ok', rabbit_fifo_client:state()}. --spec reject(Confirm :: boolean(), rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) -> - {'ok', rabbit_fifo_client:state()}. --spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) -> - {'ok', rabbit_fifo_client:state()}. --spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'. --spec info(amqqueue:amqqueue()) -> rabbit_types:infos(). --spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos(). --spec infos(rabbit_types:r('queue')) -> rabbit_types:infos(). --spec stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}. --spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'. --spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> rabbit_types:infos() | {error, term()}. - -define(STATISTICS_KEYS, [policy, operator_policy, @@ -104,12 +84,17 @@ init_state({Name, _}, QName = #resource{}) -> fun() -> credit_flow:block(Name), ok end, fun() -> credit_flow:unblock(Name), ok end). +-spec handle_event({'ra_event', ra_server_id(), any()}, rabbit_fifo_client:state()) -> + {'internal', Correlators :: [term()], rabbit_fifo_client:state()} | + {rabbit_fifo:client_msg(), rabbit_fifo_client:state()}. + handle_event({ra_event, From, Evt}, QState) -> rabbit_fifo_client:handle_ra_event(From, Evt, QState). -spec declare(amqqueue:amqqueue()) -> {'new', amqqueue:amqqueue()} | {existing, amqqueue:amqqueue()}. + declare(Q) when ?amqqueue_is_quorum(Q) -> QName = amqqueue:get_name(Q), Durable = amqqueue:is_durable(Q), @@ -268,6 +253,9 @@ reductions(Name) -> 0 end. +-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() | + {'absent', amqqueue:amqqueue(), atom()}]. + recover(Queues) -> [begin {Name, _} = amqqueue:get_pid(Q0), @@ -308,6 +296,8 @@ recover(Queues) -> Q end || Q0 <- Queues]. +-spec stop(rabbit_types:vhost()) -> 'ok'. + stop(VHost) -> _ = [begin Pid = amqqueue:get_pid(Q), @@ -319,6 +309,7 @@ stop(VHost) -> boolean(), boolean(), rabbit_types:username()) -> {ok, QLen :: non_neg_integer()}. + delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> {Name, _} = amqqueue:get_pid(Q), @@ -389,9 +380,15 @@ delete_immediately(Resource, {_Name, _} = QPid) -> rabbit_core_metrics:queue_deleted(Resource), ok. +-spec ack(rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) -> + {'ok', rabbit_fifo_client:state()}. + ack(CTag, MsgIds, QState) -> rabbit_fifo_client:settle(quorum_ctag(CTag), MsgIds, QState). +-spec reject(Confirm :: boolean(), rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) -> + {'ok', rabbit_fifo_client:state()}. + reject(true, CTag, MsgIds, QState) -> rabbit_fifo_client:return(quorum_ctag(CTag), MsgIds, QState); reject(false, CTag, MsgIds, QState) -> @@ -478,10 +475,15 @@ basic_consume(Q, NoAck, ChPid, ActivityStatus, Args), {ok, QState}. +-spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) -> + {'ok', rabbit_fifo_client:state()}. + basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) -> maybe_send_reply(ChPid, OkMsg), rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), QState0). +-spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'. + stateless_deliver(ServerId, Delivery) -> ok = rabbit_fifo_client:untracked_enqueue([ServerId], Delivery#delivery.message). @@ -489,16 +491,21 @@ stateless_deliver(ServerId, Delivery) -> -spec deliver(Confirm :: boolean(), rabbit_types:delivery(), rabbit_fifo_client:state()) -> {ok | slow, rabbit_fifo_client:state()}. + deliver(false, Delivery, QState0) -> rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0); deliver(true, Delivery, QState0) -> rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no, Delivery#delivery.message, QState0). +-spec info(amqqueue:amqqueue()) -> rabbit_types:infos(). + info(Q) -> info(Q, [name, durable, auto_delete, arguments, pid, state, messages, messages_ready, messages_unacknowledged]). +-spec infos(rabbit_types:r('queue')) -> rabbit_types:infos(). + infos(QName) -> case rabbit_amqqueue:lookup(QName) of {ok, Q} -> @@ -507,10 +514,15 @@ infos(QName) -> [] end. +-spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos(). + info(Q, Items) -> [{Item, i(Item, Q)} || Item <- Items]. -stat(#amqqueue{pid = Leader}) -> +-spec stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}. + +stat(Q) when ?is_amqqueue(Q) -> + Leader = amqqueue:get_pid(Q), try case rabbit_fifo_client:stat(Leader) of {ok, _, _} = Stat -> @@ -557,6 +569,8 @@ policy_changed(QName, Node) -> {ok, Q} = rabbit_amqqueue:lookup(QName), rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)). +-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'. + cluster_state(Name) -> case whereis(Name) of undefined -> down; @@ -567,6 +581,8 @@ cluster_state(Name) -> end end. +-spec status(rabbit_types:vhost(), Name :: atom()) -> rabbit_types:infos() | {error, term()}. + status(Vhost, QueueName) -> %% Handle not found queues QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue}, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 20acec77ae..95a6b185c2 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -159,39 +159,26 @@ %%-------------------------------------------------------------------------- --spec start_link(pid(), any()) -> rabbit_types:ok(pid()). --spec info_keys() -> rabbit_types:info_keys(). --spec info(pid()) -> rabbit_types:infos(). --spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos(). --spec force_event_refresh(pid(), reference()) -> 'ok'. --spec shutdown(pid(), string()) -> 'ok'. -type resource_alert() :: {WasAlarmSetForNode :: boolean(), IsThereAnyAlarmsWithSameSourceInTheCluster :: boolean(), NodeForWhichAlarmWasSetOrCleared :: node()}. --spec conserve_resources(pid(), atom(), resource_alert()) -> 'ok'. --spec server_properties(rabbit_types:protocol()) -> - rabbit_framing:amqp_table(). - -%% These specs only exists to add no_return() to keep dialyzer happy --spec init(pid(), pid(), any()) -> no_return(). --spec start_connection(pid(), pid(), any(), rabbit_net:socket()) -> - no_return(). - --spec mainloop(_,[binary()], non_neg_integer(), #v1{}) -> any(). --spec system_code_change(_,_,_,_) -> {'ok',_}. --spec system_continue(_,_,{[binary()], non_neg_integer(), #v1{}}) -> any(). --spec system_terminate(_,_,_,_) -> none(). %%-------------------------------------------------------------------------- +-spec start_link(pid(), any()) -> rabbit_types:ok(pid()). + start_link(HelperSup, Ref) -> Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSup, Ref]), {ok, Pid}. +-spec shutdown(pid(), string()) -> 'ok'. + shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). +-spec init(pid(), pid(), any()) -> no_return(). + init(Parent, HelperSup, Ref) -> ?LG_PROCESS_TYPE(reader), {ok, Sock} = rabbit_networking:handshake(Ref, @@ -199,33 +186,52 @@ init(Parent, HelperSup, Ref) -> Deb = sys:debug_options([]), start_connection(Parent, HelperSup, Deb, Sock). +-spec system_continue(_,_,{[binary()], non_neg_integer(), #v1{}}) -> any(). + system_continue(Parent, Deb, {Buf, BufLen, State}) -> mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}). +-spec system_terminate(_,_,_,_) -> none(). + system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). +-spec system_code_change(_,_,_,_) -> {'ok',_}. + system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. +-spec info_keys() -> rabbit_types:info_keys(). + info_keys() -> ?INFO_KEYS. +-spec info(pid()) -> rabbit_types:infos(). + info(Pid) -> gen_server:call(Pid, info, infinity). +-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos(). + info(Pid, Items) -> case gen_server:call(Pid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. +-spec force_event_refresh(pid(), reference()) -> 'ok'. + force_event_refresh(Pid, Ref) -> gen_server:cast(Pid, {force_event_refresh, Ref}). +-spec conserve_resources(pid(), atom(), resource_alert()) -> 'ok'. + conserve_resources(Pid, Source, {_, Conserve, _}) -> Pid ! {conserve_resources, Source, Conserve}, ok. +-spec server_properties(rabbit_types:protocol()) -> + rabbit_framing:amqp_table(). + server_properties(Protocol) -> {ok, Product} = application:get_key(rabbit, description), {ok, Version} = application:get_key(rabbit, vsn), @@ -303,6 +309,9 @@ socket_op(Sock, Fun) -> exit(normal) end. +-spec start_connection(pid(), pid(), any(), rabbit_net:socket()) -> + no_return(). + start_connection(Parent, HelperSup, Deb, Sock) -> process_flag(trap_exit, true), RealSocket = rabbit_net:unwrap_socket(Sock), @@ -493,6 +502,8 @@ binlist_split(Len, L, [Acc0|Acc]) when Len < 0 -> binlist_split(Len, [H|T], Acc) -> binlist_split(Len - size(H), T, [H|Acc]). +-spec mainloop(_,[binary()], non_neg_integer(), #v1{}) -> any(). + mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock, connection_state = CS, connection = #connection{ diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index ab70fa2be7..4e8ae128de 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -40,12 +40,6 @@ %%---------------------------------------------------------------------------- -spec start(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()). --spec stop(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()). --spec store(rabbit_types:vhost(), file:filename(), term()) -> rabbit_types:ok_or_error(term()). --spec read(rabbit_types:vhost(), file:filename()) -> rabbit_types:ok_or_error2(term(), not_found). --spec clear(rabbit_types:vhost()) -> 'ok'. - -%%---------------------------------------------------------------------------- start(VHost) -> case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of @@ -64,6 +58,8 @@ start(VHost) -> end, ok. +-spec stop(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()). + stop(VHost) -> case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of {ok, VHostSup} -> @@ -79,15 +75,21 @@ stop(VHost) -> ok end. +-spec store(rabbit_types:vhost(), file:filename(), term()) -> rabbit_types:ok_or_error(term()). + store(VHost, DirBaseName, Terms) -> dets:insert(VHost, {DirBaseName, Terms}). +-spec read(rabbit_types:vhost(), file:filename()) -> rabbit_types:ok_or_error2(term(), not_found). + read(VHost, DirBaseName) -> case dets:lookup(VHost, DirBaseName) of [{_, Terms}] -> {ok, Terms}; _ -> {error, not_found} end. +-spec clear(rabbit_types:vhost()) -> 'ok'. + clear(VHost) -> try dets:delete_all_objects(VHost) diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl index ecbd10a3d7..dbf5a24b50 100644 --- a/src/rabbit_restartable_sup.erl +++ b/src/rabbit_restartable_sup.erl @@ -31,8 +31,6 @@ -spec start_link(atom(), rabbit_types:mfargs(), boolean()) -> rabbit_types:ok_pid_or_error(). -%%---------------------------------------------------------------------------- - start_link(Name, {_M, _F, _A} = Fun, Delay) -> supervisor2:start_link({local, Name}, ?MODULE, [Fun, Delay]). diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index a1a0e4897d..2a0ae34e3c 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -34,53 +34,63 @@ %%---------------------------------------------------------------------------- -spec start_link() -> rabbit_types:ok_pid_or_error(). --spec start_child(atom()) -> 'ok'. --spec start_child(atom(), [any()]) -> 'ok'. --spec start_child(atom(), atom(), [any()]) -> 'ok'. --spec start_child(atom(), atom(), atom(), [any()]) -> 'ok'. --spec start_supervisor_child(atom()) -> 'ok'. --spec start_supervisor_child(atom(), [any()]) -> 'ok'. --spec start_supervisor_child(atom(), atom(), [any()]) -> 'ok'. --spec start_restartable_child(atom()) -> 'ok'. --spec start_restartable_child(atom(), [any()]) -> 'ok'. --spec start_delayed_restartable_child(atom()) -> 'ok'. --spec start_delayed_restartable_child(atom(), [any()]) -> 'ok'. --spec stop_child(atom()) -> rabbit_types:ok_or_error(any()). - -%%---------------------------------------------------------------------------- start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). +-spec start_child(atom()) -> 'ok'. + start_child(Mod) -> start_child(Mod, []). +-spec start_child(atom(), [any()]) -> 'ok'. + start_child(Mod, Args) -> start_child(Mod, Mod, Args). +-spec start_child(atom(), atom(), [any()]) -> 'ok'. + start_child(ChildId, Mod, Args) -> child_reply(supervisor:start_child( ?SERVER, {ChildId, {Mod, start_link, Args}, transient, ?WORKER_WAIT, worker, [Mod]})). +-spec start_child(atom(), atom(), atom(), [any()]) -> 'ok'. + start_child(ChildId, Mod, Fun, Args) -> child_reply(supervisor:start_child( ?SERVER, {ChildId, {Mod, Fun, Args}, transient, ?WORKER_WAIT, worker, [Mod]})). +-spec start_supervisor_child(atom()) -> 'ok'. start_supervisor_child(Mod) -> start_supervisor_child(Mod, []). +-spec start_supervisor_child(atom(), [any()]) -> 'ok'. + start_supervisor_child(Mod, Args) -> start_supervisor_child(Mod, Mod, Args). +-spec start_supervisor_child(atom(), atom(), [any()]) -> 'ok'. + start_supervisor_child(ChildId, Mod, Args) -> child_reply(supervisor:start_child( ?SERVER, {ChildId, {Mod, start_link, Args}, transient, infinity, supervisor, [Mod]})). +-spec start_restartable_child(atom()) -> 'ok'. + start_restartable_child(M) -> start_restartable_child(M, [], false). + +-spec start_restartable_child(atom(), [any()]) -> 'ok'. + start_restartable_child(M, A) -> start_restartable_child(M, A, false). + +-spec start_delayed_restartable_child(atom()) -> 'ok'. + start_delayed_restartable_child(M) -> start_restartable_child(M, [], true). + +-spec start_delayed_restartable_child(atom(), [any()]) -> 'ok'. + start_delayed_restartable_child(M, A) -> start_restartable_child(M, A, true). start_restartable_child(Mod, Args, Delay) -> @@ -91,6 +101,8 @@ start_restartable_child(Mod, Args, Delay) -> [Name, {Mod, start_link, Args}, Delay]}, transient, infinity, supervisor, [rabbit_restartable_sup]})). +-spec stop_child(atom()) -> rabbit_types:ok_or_error(any()). + stop_child(ChildId) -> case supervisor:terminate_child(?SERVER, ChildId) of ok -> supervisor:delete_child(?SERVER, ChildId); diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index 272e3966b2..db16152d5b 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -27,25 +27,15 @@ -include_lib("rabbit_common/include/rabbit.hrl"). %%---------------------------------------------------------------------------- --type retry() :: boolean(). --spec create() -> 'ok'. --spec create_local_copy('disc' | 'ram') -> 'ok'. --spec wait_for_replicated(retry()) -> 'ok'. --spec wait_for_replicated() -> 'ok'. --spec wait([atom()]) -> 'ok'. --spec retry_timeout() -> {non_neg_integer() | infinity, non_neg_integer()}. --spec force_load() -> 'ok'. --spec is_present() -> boolean(). --spec is_empty() -> boolean(). --spec needs_default_data() -> boolean(). --spec check_schema_integrity(retry()) -> rabbit_types:ok_or_error(any()). --spec clear_ram_only_tables() -> 'ok'. +-type retry() :: boolean(). %%---------------------------------------------------------------------------- %% Main interface %%---------------------------------------------------------------------------- +-spec create() -> 'ok'. + create() -> lists:foreach(fun ({Tab, TabDef}) -> TabDef1 = proplists:delete(match, TabDef), @@ -74,6 +64,9 @@ ensure_secondary_index(Table, Field) -> %% tables is important: if we delete the schema first when moving to %% RAM mnesia will loudly complain since it doesn't make much sense to %% do that. But when moving to disc, we need to move the schema first. + +-spec create_local_copy('disc' | 'ram') -> 'ok'. + create_local_copy(disc) -> create_local_copy(schema, disc_copies), create_local_copies(disc); @@ -83,13 +76,20 @@ create_local_copy(ram) -> %% This arity only exists for backwards compatibility with certain %% plugins. See https://github.com/rabbitmq/rabbitmq-clusterer/issues/19. + +-spec wait_for_replicated() -> 'ok'. + wait_for_replicated() -> wait_for_replicated(false). +-spec wait_for_replicated(retry()) -> 'ok'. + wait_for_replicated(Retry) -> wait([Tab || {Tab, TabDef} <- definitions(), not lists:member({local_content, true}, TabDef)], Retry). +-spec wait([atom()]) -> 'ok'. + wait(TableNames) -> wait(TableNames, _Retry = false). @@ -131,17 +131,28 @@ retry_timeout(_Retry = true) -> end, {retry_timeout(), Retries}. +-spec retry_timeout() -> {non_neg_integer() | infinity, non_neg_integer()}. + retry_timeout() -> case application:get_env(rabbit, mnesia_table_loading_retry_timeout) of {ok, T} -> T; undefined -> 30000 end. +-spec force_load() -> 'ok'. + force_load() -> [mnesia:force_load_table(T) || T <- names()], ok. +-spec is_present() -> boolean(). + is_present() -> names() -- mnesia:system_info(tables) =:= []. +-spec is_empty() -> boolean(). + is_empty() -> is_empty(names()). + +-spec needs_default_data() -> boolean(). + needs_default_data() -> is_empty([rabbit_user, rabbit_user_permission, rabbit_vhost]). @@ -149,6 +160,8 @@ is_empty(Names) -> lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, Names). +-spec check_schema_integrity(retry()) -> rabbit_types:ok_or_error(any()). + check_schema_integrity(Retry) -> Tables = mnesia:system_info(tables), case check(fun (Tab, TabDef) -> @@ -162,6 +175,8 @@ check_schema_integrity(Retry) -> Other -> Other end. +-spec clear_ram_only_tables() -> 'ok'. + clear_ram_only_tables() -> Node = node(), lists:foreach( diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index 6047eb24a3..2c85de2f3a 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -28,20 +28,10 @@ -type state() :: rabbit_types:exchange() | 'none'. --spec init(rabbit_types:vhost()) -> state(). --spec enabled(rabbit_types:vhost()) -> boolean(). --spec tap_in(rabbit_types:basic_message(), [rabbit_amqqueue:name()], - binary(), rabbit_channel:channel_number(), - rabbit_types:username(), state()) -> 'ok'. --spec tap_out(rabbit_amqqueue:qmsg(), binary(), - rabbit_channel:channel_number(), - rabbit_types:username(), state()) -> 'ok'. - --spec start(rabbit_types:vhost()) -> 'ok'. --spec stop(rabbit_types:vhost()) -> 'ok'. - %%---------------------------------------------------------------------------- +-spec init(rabbit_types:vhost()) -> state(). + init(VHost) -> case enabled(VHost) of false -> none; @@ -50,10 +40,16 @@ init(VHost) -> X end. +-spec enabled(rabbit_types:vhost()) -> boolean(). + enabled(VHost) -> {ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS), lists:member(VHost, VHosts). +-spec tap_in(rabbit_types:basic_message(), [rabbit_amqqueue:name()], + binary(), rabbit_channel:channel_number(), + rabbit_types:username(), state()) -> 'ok'. + tap_in(_Msg, _QNames, _ConnName, _ChannelNum, _Username, none) -> ok; tap_in(Msg = #basic_message{exchange_name = #resource{name = XName, virtual_host = VHost}}, @@ -66,6 +62,10 @@ tap_in(Msg = #basic_message{exchange_name = #resource{name = XName, {<<"routed_queues">>, array, [{longstr, QName#resource.name} || QName <- QNames]}]). +-spec tap_out(rabbit_amqqueue:qmsg(), binary(), + rabbit_channel:channel_number(), + rabbit_types:username(), state()) -> 'ok'. + tap_out(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok; tap_out({#resource{name = QName, virtual_host = VHost}, _QPid, _QMsgId, Redelivered, Msg}, @@ -80,10 +80,14 @@ tap_out({#resource{name = QName, virtual_host = VHost}, %%---------------------------------------------------------------------------- +-spec start(rabbit_types:vhost()) -> 'ok'. + start(VHost) -> rabbit_log:info("Enabling tracing for vhost '~s'~n", [VHost]), update_config(fun (VHosts) -> [VHost | VHosts -- [VHost]] end). +-spec stop(rabbit_types:vhost()) -> 'ok'. + stop(VHost) -> rabbit_log:info("Disabling tracing for vhost '~s'~n", [VHost]), update_config(fun (VHosts) -> VHosts -- [VHost] end). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 07eef293d3..214a1e390b 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -27,14 +27,6 @@ %% ------------------------------------------------------------------- --spec maybe_upgrade_mnesia() -> 'ok'. --spec maybe_upgrade_local() -> - 'ok' | - 'version_not_available' | - 'starting_from_scratch'. - -%% ------------------------------------------------------------------- - %% The upgrade logic is quite involved, due to the existence of %% clusters. %% @@ -125,6 +117,8 @@ remove_backup() -> ok = rabbit_file:recursive_delete([backup_dir()]), info("upgrades: Mnesia backup removed~n", []). +-spec maybe_upgrade_mnesia() -> 'ok'. + maybe_upgrade_mnesia() -> AllNodes = rabbit_mnesia:cluster_nodes(all), ok = rabbit_mnesia_rename:maybe_finish(AllNodes), @@ -244,6 +238,11 @@ nodes_running(Nodes) -> %% ------------------------------------------------------------------- +-spec maybe_upgrade_local() -> + 'ok' | + 'version_not_available' | + 'starting_from_scratch'. + maybe_upgrade_local() -> case rabbit_version:upgrades_required(local) of {error, version_not_available} -> version_not_available; diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 3072728d69..afbcb863aa 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -64,49 +64,12 @@ %% ------------------------------------------------------------------- --spec remove_user_scope() -> 'ok'. --spec hash_passwords() -> 'ok'. --spec add_ip_to_listener() -> 'ok'. --spec add_opts_to_listener() -> 'ok'. --spec internal_exchanges() -> 'ok'. --spec user_to_internal_user() -> 'ok'. --spec topic_trie() -> 'ok'. --spec semi_durable_route() -> 'ok'. --spec exchange_event_serial() -> 'ok'. --spec trace_exchanges() -> 'ok'. --spec user_admin_to_tags() -> 'ok'. --spec ha_mirrors() -> 'ok'. --spec gm() -> 'ok'. --spec exchange_scratch() -> 'ok'. --spec mirrored_supervisor() -> 'ok'. --spec topic_trie_node() -> 'ok'. --spec runtime_parameters() -> 'ok'. --spec policy() -> 'ok'. --spec sync_slave_pids() -> 'ok'. --spec no_mirror_nodes() -> 'ok'. --spec gm_pids() -> 'ok'. --spec exchange_decorators() -> 'ok'. --spec policy_apply_to() -> 'ok'. --spec queue_decorators() -> 'ok'. --spec internal_system_x() -> 'ok'. --spec cluster_name() -> 'ok'. --spec down_slave_nodes() -> 'ok'. --spec queue_state() -> 'ok'. --spec recoverable_slaves() -> 'ok'. --spec user_password_hashing() -> 'ok'. --spec vhost_limits() -> 'ok'. --spec operator_policies() -> 'ok'. --spec queue_vhost_field() -> 'ok'. --spec queue_options() -> 'ok'. --spec exchange_options() -> 'ok'. - --spec remove_explicit_default_exchange_bindings() -> 'ok'. - -%%-------------------------------------------------------------------- - %% replaces vhost.dummy (used to avoid having a single-field record %% which Mnesia doesn't like) with vhost.limits (which is actually %% used) + +-spec vhost_limits() -> 'ok'. + vhost_limits() -> transform( rabbit_vhost, @@ -121,6 +84,8 @@ vhost_limits() -> %% would be messy to have to go back and fix old transforms at that %% point. +-spec remove_user_scope() -> 'ok'. + remove_user_scope() -> transform( rabbit_user_permission, @@ -133,6 +98,9 @@ remove_user_scope() -> %% only relevant to those migrating from 2.1.1. %% all users created after in 3.6.0 or later will use SHA-256 (unless configured %% otherwise) + +-spec hash_passwords() -> 'ok'. + hash_passwords() -> transform( rabbit_user, @@ -142,6 +110,8 @@ hash_passwords() -> end, [username, password_hash, is_admin]). +-spec add_ip_to_listener() -> 'ok'. + add_ip_to_listener() -> transform( rabbit_listener, @@ -150,6 +120,8 @@ add_ip_to_listener() -> end, [node, protocol, host, ip_address, port]). +-spec add_opts_to_listener() -> 'ok'. + add_opts_to_listener() -> transform( rabbit_listener, @@ -158,6 +130,8 @@ add_opts_to_listener() -> end, [node, protocol, host, ip_address, port, opts]). +-spec internal_exchanges() -> 'ok'. + internal_exchanges() -> Tables = [rabbit_exchange, rabbit_durable_exchange], AddInternalFun = @@ -170,6 +144,8 @@ internal_exchanges() -> || T <- Tables ], ok. +-spec user_to_internal_user() -> 'ok'. + user_to_internal_user() -> transform( rabbit_user, @@ -178,6 +154,8 @@ user_to_internal_user() -> end, [username, password_hash, is_admin], internal_user). +-spec topic_trie() -> 'ok'. + topic_trie() -> create(rabbit_topic_trie_edge, [{record_name, topic_trie_edge}, {attributes, [trie_edge, node_id]}, @@ -186,20 +164,28 @@ topic_trie() -> {attributes, [trie_binding, value]}, {type, ordered_set}]). +-spec semi_durable_route() -> 'ok'. + semi_durable_route() -> create(rabbit_semi_durable_route, [{record_name, route}, {attributes, [binding, value]}]). +-spec exchange_event_serial() -> 'ok'. + exchange_event_serial() -> create(rabbit_exchange_serial, [{record_name, exchange_serial}, {attributes, [name, next]}]). +-spec trace_exchanges() -> 'ok'. + trace_exchanges() -> [declare_exchange( rabbit_misc:r(VHost, exchange, <<"amq.rabbitmq.trace">>), topic) || VHost <- rabbit_vhost:list()], ok. +-spec user_admin_to_tags() -> 'ok'. + user_admin_to_tags() -> transform( rabbit_user, @@ -210,6 +196,8 @@ user_admin_to_tags() -> end, [username, password_hash, tags], internal_user). +-spec ha_mirrors() -> 'ok'. + ha_mirrors() -> Tables = [rabbit_queue, rabbit_durable_queue], AddMirrorPidsFun = @@ -224,10 +212,14 @@ ha_mirrors() -> || T <- Tables ], ok. +-spec gm() -> 'ok'. + gm() -> create(gm_group, [{record_name, gm_group}, {attributes, [name, version, members]}]). +-spec exchange_scratch() -> 'ok'. + exchange_scratch() -> ok = exchange_scratch(rabbit_exchange), ok = exchange_scratch(rabbit_durable_exchange). @@ -240,17 +232,23 @@ exchange_scratch(Table) -> end, [name, type, durable, auto_delete, internal, arguments, scratch]). +-spec mirrored_supervisor() -> 'ok'. + mirrored_supervisor() -> create(mirrored_sup_childspec, [{record_name, mirrored_sup_childspec}, {attributes, [key, mirroring_pid, childspec]}]). +-spec topic_trie_node() -> 'ok'. + topic_trie_node() -> create(rabbit_topic_trie_node, [{record_name, topic_trie_node}, {attributes, [trie_node, edge_count, binding_count]}, {type, ordered_set}]). +-spec runtime_parameters() -> 'ok'. + runtime_parameters() -> create(rabbit_runtime_parameters, [{record_name, runtime_parameters}, @@ -274,6 +272,8 @@ exchange_scratches(Table) -> end, [name, type, durable, auto_delete, internal, arguments, scratches]). +-spec policy() -> 'ok'. + policy() -> ok = exchange_policy(rabbit_exchange), ok = exchange_policy(rabbit_durable_exchange), @@ -300,6 +300,8 @@ queue_policy(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, mirror_nodes, policy]). +-spec sync_slave_pids() -> 'ok'. + sync_slave_pids() -> Tables = [rabbit_queue, rabbit_durable_queue], AddSyncSlavesFun = @@ -312,6 +314,8 @@ sync_slave_pids() -> || T <- Tables], ok. +-spec no_mirror_nodes() -> 'ok'. + no_mirror_nodes() -> Tables = [rabbit_queue, rabbit_durable_queue], RemoveMirrorNodesFun = @@ -324,6 +328,8 @@ no_mirror_nodes() -> || T <- Tables], ok. +-spec gm_pids() -> 'ok'. + gm_pids() -> Tables = [rabbit_queue, rabbit_durable_queue], AddGMPidsFun = @@ -336,6 +342,8 @@ gm_pids() -> || T <- Tables], ok. +-spec exchange_decorators() -> 'ok'. + exchange_decorators() -> ok = exchange_decorators(rabbit_exchange), ok = exchange_decorators(rabbit_durable_exchange). @@ -351,6 +359,8 @@ exchange_decorators(Table) -> [name, type, durable, auto_delete, internal, arguments, scratches, policy, decorators]). +-spec policy_apply_to() -> 'ok'. + policy_apply_to() -> transform( rabbit_runtime_parameters, @@ -373,6 +383,8 @@ apply_to(Def) -> [_, _] -> <<"all">> end. +-spec queue_decorators() -> 'ok'. + queue_decorators() -> ok = queue_decorators(rabbit_queue), ok = queue_decorators(rabbit_durable_queue). @@ -388,6 +400,8 @@ queue_decorators(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, sync_slave_pids, policy, gm_pids, decorators]). +-spec internal_system_x() -> 'ok'. + internal_system_x() -> transform( rabbit_durable_exchange, @@ -401,6 +415,8 @@ internal_system_x() -> [name, type, durable, auto_delete, internal, arguments, scratches, policy, decorators]). +-spec cluster_name() -> 'ok'. + cluster_name() -> {atomic, ok} = mnesia:transaction(fun cluster_name_tx/0), ok. @@ -427,6 +443,8 @@ cluster_name_tx() -> [mnesia:delete(T, K, write) || K <- Ks], ok. +-spec down_slave_nodes() -> 'ok'. + down_slave_nodes() -> ok = down_slave_nodes(rabbit_queue), ok = down_slave_nodes(rabbit_durable_queue). @@ -442,6 +460,8 @@ down_slave_nodes(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]). +-spec queue_state() -> 'ok'. + queue_state() -> ok = queue_state(rabbit_queue), ok = queue_state(rabbit_durable_queue). @@ -458,6 +478,8 @@ queue_state(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators, state]). +-spec recoverable_slaves() -> 'ok'. + recoverable_slaves() -> ok = recoverable_slaves(rabbit_queue), ok = recoverable_slaves(rabbit_durable_queue). @@ -505,6 +527,8 @@ slave_pids_pending_shutdown(Table) -> sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown]). +-spec operator_policies() -> 'ok'. + operator_policies() -> ok = exchange_operator_policies(rabbit_exchange), ok = exchange_operator_policies(rabbit_durable_exchange), @@ -536,6 +560,7 @@ queue_operator_policies(Table) -> sync_slave_pids, recoverable_slaves, policy, operator_policy, gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown]). +-spec queue_vhost_field() -> 'ok'. queue_vhost_field() -> ok = queue_vhost_field(rabbit_queue), @@ -558,6 +583,8 @@ queue_vhost_field(Table) -> sync_slave_pids, recoverable_slaves, policy, operator_policy, gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost]). +-spec queue_options() -> 'ok'. + queue_options() -> ok = queue_options(rabbit_queue), ok = queue_options(rabbit_durable_queue), @@ -581,6 +608,9 @@ queue_options(Table) -> %% existing records with said default. Users created with 3.6.0+ will %% have internal_user.hashing_algorithm populated by the internal %% authn backend. + +-spec user_password_hashing() -> 'ok'. + user_password_hashing() -> transform( rabbit_user, @@ -595,6 +625,8 @@ topic_permission() -> {attributes, [topic_permission_key, permission]}, {disc_copies, [node()]}]). +-spec exchange_options() -> 'ok'. + exchange_options() -> ok = exchange_options(rabbit_exchange), ok = exchange_options(rabbit_durable_exchange). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f0ad0eb6e1..8b773f2cc2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -428,10 +428,6 @@ io_batch_size :: pos_integer(), mode :: 'default' | 'lazy', memory_reduction_run_count :: non_neg_integer()}. -%% Duplicated from rabbit_backing_queue --spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. - --spec multiple_routing_keys() -> 'ok'. -define(BLANK_DELTA, #delta { start_seq_id = undefined, count = 0, @@ -704,6 +700,9 @@ drop(AckRequired, State) -> {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)} end. +%% Duplicated from rabbit_backing_queue +-spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. + ack([], State) -> {[], State}; %% optimisation: this head is essentially a partial evaluation of the @@ -2792,6 +2791,8 @@ ui(#vqstate{index_state = IndexState, %% Upgrading %%---------------------------------------------------------------------------- +-spec multiple_routing_keys() -> 'ok'. + multiple_routing_keys() -> transform_storage( fun ({basic_message, ExchangeName, Routing_Key, Content, diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index 5b1722fdbf..a63d60058c 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -33,23 +33,6 @@ -type version() :: [atom()]. --spec recorded() -> rabbit_types:ok_or_error2(version(), any()). --spec matches([A], [A]) -> boolean(). --spec desired() -> version(). --spec desired_for_scope(scope()) -> scope_version(). --spec record_desired() -> 'ok'. --spec record_desired_for_scope - (scope()) -> rabbit_types:ok_or_error(any()). --spec upgrades_required - (scope()) -> rabbit_types:ok_or_error2([step()], any()). --spec check_version_consistency - (string(), string(), string()) -> rabbit_types:ok_or_error(any()). --spec check_version_consistency - (string(), string(), string(), string()) -> - rabbit_types:ok_or_error(any()). --spec check_otp_consistency - (string()) -> rabbit_types:ok_or_error(any()). - %% ------------------------------------------------------------------- -define(VERSION_FILENAME, "schema_version"). @@ -57,6 +40,8 @@ %% ------------------------------------------------------------------- +-spec recorded() -> rabbit_types:ok_or_error2(version(), any()). + recorded() -> case rabbit_file:read_term_file(schema_filename()) of {ok, [V]} -> {ok, V}; {error, _} = Err -> Err @@ -87,20 +72,34 @@ record_for_scope(Scope, ScopeVersion) -> %% ------------------------------------------------------------------- +-spec matches([A], [A]) -> boolean(). + matches(VerA, VerB) -> lists:usort(VerA) =:= lists:usort(VerB). %% ------------------------------------------------------------------- +-spec desired() -> version(). + desired() -> [Name || Scope <- ?SCOPES, Name <- desired_for_scope(Scope)]. +-spec desired_for_scope(scope()) -> scope_version(). + desired_for_scope(Scope) -> with_upgrade_graph(fun heads/1, Scope). +-spec record_desired() -> 'ok'. + record_desired() -> record(desired()). +-spec record_desired_for_scope + (scope()) -> rabbit_types:ok_or_error(any()). + record_desired_for_scope(Scope) -> record_for_scope(Scope, desired_for_scope(Scope)). +-spec upgrades_required + (scope()) -> rabbit_types:ok_or_error2([step()], any()). + upgrades_required(Scope) -> case recorded_for_scope(Scope) of {error, enoent} -> @@ -208,9 +207,16 @@ schema_filename() -> filename:join(dir(), ?VERSION_FILENAME). %% -------------------------------------------------------------------- +-spec check_version_consistency + (string(), string(), string()) -> rabbit_types:ok_or_error(any()). + check_version_consistency(This, Remote, Name) -> check_version_consistency(This, Remote, Name, fun (A, B) -> A =:= B end). +-spec check_version_consistency + (string(), string(), string(), string()) -> + rabbit_types:ok_or_error(any()). + check_version_consistency(This, Remote, Name, Comp) -> case Comp(This, Remote) of true -> ok; @@ -222,5 +228,8 @@ version_error(Name, This, Remote) -> rabbit_misc:format("~s version mismatch: local node is ~s, " "remote node ~s", [Name, This, Remote])}}. +-spec check_otp_consistency + (string()) -> rabbit_types:ok_or_error(any()). + check_otp_consistency(Remote) -> check_version_consistency(rabbit_misc:otp_release(), Remote, "OTP"). diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 75f96ac1eb..c5b28e7e1c 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -28,24 +28,6 @@ -export([delete_storage/1]). -export([vhost_down/1]). --spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). --spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). --spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. --spec exists(rabbit_types:vhost()) -> boolean(). --spec list() -> [rabbit_types:vhost()]. --spec with(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. --spec with_user_and_vhost - (rabbit_types:username(), rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. --spec assert(rabbit_types:vhost()) -> 'ok'. - --spec info(rabbit_types:vhost()) -> rabbit_types:infos(). --spec info(rabbit_types:vhost(), rabbit_types:info_keys()) - -> rabbit_types:infos(). --spec info_all() -> [rabbit_types:infos()]. --spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()]. --spec info_all(rabbit_types:info_keys(), reference(), pid()) -> - 'ok'. - recover() -> %% Clear out remnants of old incarnation, in case we restarted %% faster than other nodes handled DOWN messages from us. @@ -83,6 +65,8 @@ recover(VHost) -> -define(INFO_KEYS, [name, tracing, cluster_state]). +-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). + add(VHost, ActingUser) -> case exists(VHost) of true -> ok; @@ -134,6 +118,8 @@ do_add(VHostPath, ActingUser) -> {error, Msg} end. +-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). + delete(VHostPath, ActingUser) -> %% FIXME: We are forced to delete the queues and exchanges outside %% the TX below. Queue deletion involves sending messages to the queue @@ -252,12 +238,18 @@ internal_delete(VHostPath, ActingUser) -> ok = mnesia:delete({rabbit_vhost, VHostPath}), Fs1 ++ Fs2. +-spec exists(rabbit_types:vhost()) -> boolean(). + exists(VHostPath) -> mnesia:dirty_read({rabbit_vhost, VHostPath}) /= []. +-spec list() -> [rabbit_types:vhost()]. + list() -> mnesia:dirty_all_keys(rabbit_vhost). +-spec with(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. + with(VHostPath, Thunk) -> fun () -> case mnesia:read({rabbit_vhost, VHostPath}) of @@ -268,15 +260,23 @@ with(VHostPath, Thunk) -> end end. +-spec with_user_and_vhost + (rabbit_types:username(), rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. + with_user_and_vhost(Username, VHostPath, Thunk) -> rabbit_misc:with_user(Username, with(VHostPath, Thunk)). %% Like with/2 but outside an Mnesia tx + +-spec assert(rabbit_types:vhost()) -> 'ok'. + assert(VHostPath) -> case exists(VHostPath) of true -> ok; false -> throw({error, {no_such_vhost, VHostPath}}) end. +-spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. + update(VHostPath, Fun) -> case mnesia:read({rabbit_vhost, VHostPath}) of [] -> @@ -329,13 +329,27 @@ i(tracing, VHost) -> rabbit_trace:enabled(VHost); i(cluster_state, VHost) -> vhost_cluster_state(VHost); i(Item, _) -> throw({bad_argument, Item}). +-spec info(rabbit_types:vhost()) -> rabbit_types:infos(). + info(VHost) -> infos(?INFO_KEYS, VHost). + +-spec info(rabbit_types:vhost(), rabbit_types:info_keys()) + -> rabbit_types:infos(). + info(VHost, Items) -> infos(Items, VHost). +-spec info_all() -> [rabbit_types:infos()]. + info_all() -> info_all(?INFO_KEYS). + +-spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()]. + info_all(Items) -> [info(VHost, Items) || VHost <- list()]. info_all(Ref, AggregatorPid) -> info_all(?INFO_KEYS, Ref, AggregatorPid). + +-spec info_all(rabbit_types:info_keys(), reference(), pid()) -> + 'ok'. info_all(Items, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map( AggregatorPid, Ref, fun(VHost) -> info(VHost, Items) end, list()). diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 1b11017076..e78aca0a54 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -23,12 +23,6 @@ %%---------------------------------------------------------------------------- -spec memory() -> rabbit_types:infos(). --spec binary() -> rabbit_types:infos(). --spec ets_tables_memory(Owners) -> rabbit_types:infos() - when Owners :: all | OwnerProcessName | [OwnerProcessName], - OwnerProcessName :: atom(). - -%%---------------------------------------------------------------------------- memory() -> All = interesting_sups(), @@ -115,6 +109,8 @@ memory() -> %% claims about negative memory. See %% http://erlang.org/pipermail/erlang-questions/2012-September/069320.html +-spec binary() -> rabbit_types:infos(). + binary() -> All = interesting_sups(), {Sums, Rest} = @@ -153,6 +149,10 @@ mnesia_memory() -> ets_memory(Owners) -> lists:sum([V || {_K, V} <- ets_tables_memory(Owners)]). +-spec ets_tables_memory(Owners) -> rabbit_types:infos() + when Owners :: all | OwnerProcessName | [OwnerProcessName], + OwnerProcessName :: atom(). + ets_tables_memory(all) -> [{ets:info(T, name), bytes(ets:info(T, memory))} || T <- ets:all(), diff --git a/src/supervised_lifecycle.erl b/src/supervised_lifecycle.erl index 82f6728f17..4be7922125 100644 --- a/src/supervised_lifecycle.erl +++ b/src/supervised_lifecycle.erl @@ -39,8 +39,6 @@ -spec start_link(atom(), rabbit_types:mfargs(), rabbit_types:mfargs()) -> rabbit_types:ok_pid_or_error(). -%%---------------------------------------------------------------------------- - start_link(Name, StartMFA, StopMFA) -> gen_server:start_link({local, Name}, ?MODULE, [StartMFA, StopMFA], []). diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index d6c615e3b6..d5fb900198 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -64,8 +64,6 @@ mfargs(), mfargs(), string()) -> rabbit_types:ok_pid_or_error(). -%%-------------------------------------------------------------------- - start_link(IPAddress, Port, OnStartup, OnShutdown, Label) -> gen_server:start_link( diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 074a4e4d6a..a31a60e505 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -38,8 +38,6 @@ module(), any(), mfargs(), mfargs(), integer(), string()) -> rabbit_types:ok_pid_or_error(). -%%---------------------------------------------------------------------------- - start_link(IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown, ConcurrentAcceptorCount, Label) -> supervisor:start_link( |
