summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2018-11-30 11:30:36 +0100
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2019-02-01 11:23:16 +0100
commit5bbde6d0a3eb1790d4965d76c8699d0187b74183 (patch)
treedf1cd4f48442a8e79d45e35c255cd28851dee63d
parentd142bbc45a4d0f8482b6a98d1f16a725cdf8d8a8 (diff)
downloadrabbitmq-server-git-5bbde6d0a3eb1790d4965d76c8699d0187b74183.tar.gz
Move `-spec()` near their function
-rw-r--r--src/background_gc.erl8
-rw-r--r--src/dtree.erl40
-rw-r--r--src/gatherer.erl24
-rw-r--r--src/gm.erl27
-rw-r--r--src/lqueue.erl42
-rw-r--r--src/pg_local.erl23
-rw-r--r--src/rabbit.erl93
-rw-r--r--src/rabbit_access_control.erl34
-rw-r--r--src/rabbit_alarm.erl31
-rw-r--r--src/rabbit_amqqueue.erl367
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_amqqueue_sup.erl2
-rw-r--r--src/rabbit_amqqueue_sup_sup.erl9
-rw-r--r--src/rabbit_auth_backend_internal.erl105
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_basic.erl103
-rw-r--r--src/rabbit_binding.erl106
-rw-r--r--src/rabbit_channel.erl86
-rw-r--r--src/rabbit_channel_sup.erl4
-rw-r--r--src/rabbit_channel_sup_sup.erl7
-rw-r--r--src/rabbit_client_sup.erl12
-rw-r--r--src/rabbit_connection_helper_sup.erl10
-rw-r--r--src/rabbit_connection_sup.erl5
-rw-r--r--src/rabbit_core_metrics_gc.erl4
-rw-r--r--src/rabbit_credential_validation.erl4
-rw-r--r--src/rabbit_dead_letter.erl4
-rw-r--r--src/rabbit_direct.erl48
-rw-r--r--src/rabbit_disk_monitor.erl24
-rw-r--r--src/rabbit_epmd_monitor.erl6
-rw-r--r--src/rabbit_exchange.erl162
-rw-r--r--src/rabbit_exchange_type_headers.erl9
-rw-r--r--src/rabbit_file.erl65
-rw-r--r--src/rabbit_guid.erl23
-rw-r--r--src/rabbit_health_check.erl7
-rw-r--r--src/rabbit_limiter.erl76
-rw-r--r--src/rabbit_memory_monitor.erl20
-rw-r--r--src/rabbit_metrics.erl5
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl18
-rw-r--r--src/rabbit_mirror_queue_master.erl29
-rw-r--r--src/rabbit_mirror_queue_misc.erl50
-rw-r--r--src/rabbit_mirror_queue_sync.erl32
-rw-r--r--src/rabbit_mnesia.erl97
-rw-r--r--src/rabbit_mnesia_rename.erl5
-rw-r--r--src/rabbit_msg_file.erl16
-rw-r--r--src/rabbit_msg_store.erl71
-rw-r--r--src/rabbit_msg_store_gc.erl19
-rw-r--r--src/rabbit_networking.erl113
-rw-r--r--src/rabbit_node_monitor.erl69
-rw-r--r--src/rabbit_nodes.erl32
-rw-r--r--src/rabbit_plugins.erl39
-rw-r--r--src/rabbit_prelaunch.erl7
-rw-r--r--src/rabbit_prequeue.erl4
-rw-r--r--src/rabbit_queue_collector.erl5
-rw-r--r--src/rabbit_queue_consumers.erl98
-rw-r--r--src/rabbit_queue_index.erl78
-rw-r--r--src/rabbit_quorum_queue.erl58
-rw-r--r--src/rabbit_reader.erl49
-rw-r--r--src/rabbit_recovery_terms.erl14
-rw-r--r--src/rabbit_restartable_sup.erl2
-rw-r--r--src/rabbit_sup.erl40
-rw-r--r--src/rabbit_table.erl41
-rw-r--r--src/rabbit_trace.erl28
-rw-r--r--src/rabbit_upgrade.erl15
-rw-r--r--src/rabbit_upgrade_functions.erl112
-rw-r--r--src/rabbit_variable_queue.erl9
-rw-r--r--src/rabbit_version.erl43
-rw-r--r--src/rabbit_vhost.erl50
-rw-r--r--src/rabbit_vm.erl12
-rw-r--r--src/supervised_lifecycle.erl2
-rw-r--r--src/tcp_listener.erl2
-rw-r--r--src/tcp_listener_sup.erl2
71 files changed, 1730 insertions, 1145 deletions
diff --git a/src/background_gc.erl b/src/background_gc.erl
index 43c109ee2a..7aea28c1f4 100644
--- a/src/background_gc.erl
+++ b/src/background_gc.erl
@@ -32,14 +32,12 @@
%%----------------------------------------------------------------------------
-spec start_link() -> {'ok', pid()} | {'error', any()}.
--spec run() -> 'ok'.
--spec gc() -> 'ok'.
-
-%%----------------------------------------------------------------------------
start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [],
[{timeout, infinity}]).
+-spec run() -> 'ok'.
+
run() -> gen_server2:cast(?MODULE, run).
%%----------------------------------------------------------------------------
@@ -73,6 +71,8 @@ interval_gc(State = #state{last_interval = LastInterval}) ->
erlang:send_after(Interval, self(), run),
State#state{last_interval = Interval}.
+-spec gc() -> 'ok'.
+
gc() ->
Enabled = rabbit_misc:get_env(rabbit, background_gc_enabled, false),
case Enabled of
diff --git a/src/dtree.erl b/src/dtree.erl
index 08ddd22532..fd2188de29 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -46,24 +46,17 @@
-type val() :: any().
-type kv() :: {pk(), val()}.
--spec empty() -> ?MODULE().
--spec insert(pk(), [sk()], val(), ?MODULE()) -> ?MODULE().
--spec take([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
--spec take(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
--spec take_one(pk(), ?MODULE()) -> {[{pk(), val()}], ?MODULE()}.
--spec take_all(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
--spec drop(pk(), ?MODULE()) -> ?MODULE().
--spec is_defined(sk(), ?MODULE()) -> boolean().
--spec is_empty(?MODULE()) -> boolean().
--spec smallest(?MODULE()) -> kv().
--spec size(?MODULE()) -> non_neg_integer().
-
%%----------------------------------------------------------------------------
+-spec empty() -> ?MODULE().
+
empty() -> {gb_trees:empty(), gb_trees:empty()}.
%% Insert an entry. Fails if there already is an entry with the given
%% primary key.
+
+-spec insert(pk(), [sk()], val(), ?MODULE()) -> ?MODULE().
+
insert(PK, [], V, {P, S}) ->
%% dummy insert to force error if PK exists
_ = gb_trees:insert(PK, {gb_sets:empty(), V}, P),
@@ -84,6 +77,9 @@ insert(PK, SKs, V, {P, S}) ->
%% that were dropped as the result (i.e. due to their secondary key
%% set becoming empty). It is ok for the given primary keys and/or
%% secondary key to not exist.
+
+-spec take([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
+
take(PKs, SK, {P, S}) ->
case gb_trees:lookup(SK, S) of
none -> {[], {P, S}};
@@ -101,6 +97,9 @@ take(PKs, SK, {P, S}) ->
%% primary-key/value pairs of any entries that were dropped as the
%% result (i.e. due to their secondary key set becoming empty). It is
%% ok for the given secondary key to not exist.
+
+-spec take(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
+
take(SK, {P, S}) ->
case gb_trees:lookup(SK, S) of
none -> {[], {P, S}};
@@ -111,6 +110,9 @@ take(SK, {P, S}) ->
%% Drop an entry with the primary key and clears secondary keys for this key,
%% returning a list with a key-value pair as a result.
%% If the primary key does not exist, returns an empty list.
+
+-spec take_one(pk(), ?MODULE()) -> {[{pk(), val()}], ?MODULE()}.
+
take_one(PK, {P, S}) ->
case gb_trees:lookup(PK, P) of
{value, {SKS, Value}} ->
@@ -131,6 +133,9 @@ take_one(PK, {P, S}) ->
%% Drop all entries which contain the given secondary key, returning
%% the primary-key/value pairs of these entries. It is ok for the
%% given secondary key to not exist.
+
+-spec take_all(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
+
take_all(SK, {P, S}) ->
case gb_trees:lookup(SK, S) of
none -> {[], {P, S}};
@@ -139,6 +144,9 @@ take_all(SK, {P, S}) ->
end.
%% Drop all entries for the given primary key (which does not have to exist).
+
+-spec drop(pk(), ?MODULE()) -> ?MODULE().
+
drop(PK, {P, S}) ->
case gb_trees:lookup(PK, P) of
none -> {P, S};
@@ -146,13 +154,21 @@ drop(PK, {P, S}) ->
prune(SKS, gb_sets:singleton(PK), S)}
end.
+-spec is_defined(sk(), ?MODULE()) -> boolean().
+
is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S).
+-spec is_empty(?MODULE()) -> boolean().
+
is_empty({P, _S}) -> gb_trees:is_empty(P).
+-spec smallest(?MODULE()) -> kv().
+
smallest({P, _S}) -> {K, {_SKS, V}} = gb_trees:smallest(P),
{K, V}.
+-spec size(?MODULE()) -> non_neg_integer().
+
size({P, _S}) -> gb_trees:size(P).
%%----------------------------------------------------------------------------
diff --git a/src/gatherer.erl b/src/gatherer.erl
index a8b55892c1..1625468a52 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -39,16 +39,6 @@
%%----------------------------------------------------------------------------
--spec start_link() -> rabbit_types:ok_pid_or_error().
--spec stop(pid()) -> 'ok'.
--spec fork(pid()) -> 'ok'.
--spec finish(pid()) -> 'ok'.
--spec in(pid(), any()) -> 'ok'.
--spec sync_in(pid(), any()) -> 'ok'.
--spec out(pid()) -> {'value', any()} | 'empty'.
-
-%%----------------------------------------------------------------------------
-
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
@@ -58,24 +48,38 @@
%%----------------------------------------------------------------------------
+-spec start_link() -> rabbit_types:ok_pid_or_error().
+
start_link() ->
gen_server2:start_link(?MODULE, [], [{timeout, infinity}]).
+-spec stop(pid()) -> 'ok'.
+
stop(Pid) ->
gen_server2:call(Pid, stop, infinity).
+-spec fork(pid()) -> 'ok'.
+
fork(Pid) ->
gen_server2:call(Pid, fork, infinity).
+-spec finish(pid()) -> 'ok'.
+
finish(Pid) ->
gen_server2:cast(Pid, finish).
+-spec in(pid(), any()) -> 'ok'.
+
in(Pid, Value) ->
gen_server2:cast(Pid, {in, Value}).
+-spec sync_in(pid(), any()) -> 'ok'.
+
sync_in(Pid, Value) ->
gen_server2:call(Pid, {in, Value}, infinity).
+-spec out(pid()) -> {'value', any()} | 'empty'.
+
out(Pid) ->
gen_server2:call(Pid, out, infinity).
diff --git a/src/gm.erl b/src/gm.erl
index 427fa78f4e..02ee76cd60 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -436,16 +436,6 @@
-type group_name() :: any().
-type txn_fun() :: fun((fun(() -> any())) -> any()).
--spec create_tables() -> 'ok' | {'aborted', any()}.
--spec start_link(group_name(), atom(), any(), txn_fun()) ->
- rabbit_types:ok_pid_or_error().
--spec leave(pid()) -> 'ok'.
--spec broadcast(pid(), any()) -> 'ok'.
--spec confirmed_broadcast(pid(), any()) -> 'ok'.
--spec info(pid()) -> rabbit_types:infos().
--spec validate_members(pid(), [pid()]) -> 'ok'.
--spec forget_group(group_name()) -> 'ok'.
-
%% The joined, members_changed and handle_msg callbacks can all return
%% any of the following terms:
%%
@@ -490,6 +480,8 @@
-callback handle_terminate(Args :: term(), Reason :: term()) ->
ok | term().
+-spec create_tables() -> 'ok' | {'aborted', any()}.
+
create_tables() ->
create_tables([?TABLE]).
@@ -506,27 +498,42 @@ table_definitions() ->
{Name, Attributes} = ?TABLE,
[{Name, [?TABLE_MATCH | Attributes]}].
+-spec start_link(group_name(), atom(), any(), txn_fun()) ->
+ rabbit_types:ok_pid_or_error().
+
start_link(GroupName, Module, Args, TxnFun) ->
gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun],
[{spawn_opt, [{fullsweep_after, 0}]}]).
+-spec leave(pid()) -> 'ok'.
+
leave(Server) ->
gen_server2:cast(Server, leave).
+-spec broadcast(pid(), any()) -> 'ok'.
+
broadcast(Server, Msg) -> broadcast(Server, Msg, 0).
broadcast(Server, Msg, SizeHint) ->
gen_server2:cast(Server, {broadcast, Msg, SizeHint}).
+-spec confirmed_broadcast(pid(), any()) -> 'ok'.
+
confirmed_broadcast(Server, Msg) ->
gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
+-spec info(pid()) -> rabbit_types:infos().
+
info(Server) ->
gen_server2:call(Server, info, infinity).
+-spec validate_members(pid(), [pid()]) -> 'ok'.
+
validate_members(Server, Members) ->
gen_server2:cast(Server, {validate_members, Members}).
+-spec forget_group(group_name()) -> 'ok'.
+
forget_group(GroupName) ->
{atomic, ok} = mnesia:sync_transaction(
fun () ->
diff --git a/src/lqueue.erl b/src/lqueue.erl
index acfdbe79ef..820f9f4a2f 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -36,64 +36,76 @@
-type result(T) :: 'empty' | {'value', T}.
-spec new() -> ?MODULE(_).
--spec drop(?MODULE(T)) -> ?MODULE(T).
--spec is_empty(?MODULE(_)) -> boolean().
--spec len(?MODULE(_)) -> non_neg_integer().
--spec in(T, ?MODULE(T)) -> ?MODULE(T).
--spec in_r(value(), ?MODULE()) -> ?MODULE().
--spec out(?MODULE(T)) -> {result(T), ?MODULE()}.
--spec out_r(?MODULE(T)) -> {result(T), ?MODULE()}.
--spec join(?MODULE(A), ?MODULE(B)) -> ?MODULE(A | B).
--spec foldl(fun ((T, B) -> B), B, ?MODULE(T)) -> B.
--spec foldr(fun ((T, B) -> B), B, ?MODULE(T)) -> B.
--spec from_list([T]) -> ?MODULE(T).
--spec to_list(?MODULE(T)) -> [T].
-% -spec peek(?MODULE()) -> result().
--spec peek(?MODULE(T)) -> result(T).
--spec peek_r(?MODULE(T)) -> result(T).
new() -> {0, ?QUEUE:new()}.
+-spec drop(?MODULE(T)) -> ?MODULE(T).
+
drop({L, Q}) -> {L - 1, ?QUEUE:drop(Q)}.
+-spec is_empty(?MODULE(_)) -> boolean().
+
is_empty({0, _Q}) -> true;
is_empty(_) -> false.
+-spec in(T, ?MODULE(T)) -> ?MODULE(T).
+
in(V, {L, Q}) -> {L+1, ?QUEUE:in(V, Q)}.
+-spec in_r(value(), ?MODULE()) -> ?MODULE().
+
in_r(V, {L, Q}) -> {L+1, ?QUEUE:in_r(V, Q)}.
+-spec out(?MODULE(T)) -> {result(T), ?MODULE()}.
+
out({0, _Q} = Q) -> {empty, Q};
out({L, Q}) -> {Result, Q1} = ?QUEUE:out(Q),
{Result, {L-1, Q1}}.
+-spec out_r(?MODULE(T)) -> {result(T), ?MODULE()}.
+
out_r({0, _Q} = Q) -> {empty, Q};
out_r({L, Q}) -> {Result, Q1} = ?QUEUE:out_r(Q),
{Result, {L-1, Q1}}.
+-spec join(?MODULE(A), ?MODULE(B)) -> ?MODULE(A | B).
+
join({L1, Q1}, {L2, Q2}) -> {L1 + L2, ?QUEUE:join(Q1, Q2)}.
+-spec to_list(?MODULE(T)) -> [T].
+
to_list({_L, Q}) -> ?QUEUE:to_list(Q).
+-spec from_list([T]) -> ?MODULE(T).
+
from_list(L) -> {length(L), ?QUEUE:from_list(L)}.
+-spec foldl(fun ((T, B) -> B), B, ?MODULE(T)) -> B.
+
foldl(Fun, Init, Q) ->
case out(Q) of
{empty, _Q} -> Init;
{{value, V}, Q1} -> foldl(Fun, Fun(V, Init), Q1)
end.
+-spec foldr(fun ((T, B) -> B), B, ?MODULE(T)) -> B.
+
foldr(Fun, Init, Q) ->
case out_r(Q) of
{empty, _Q} -> Init;
{{value, V}, Q1} -> foldr(Fun, Fun(V, Init), Q1)
end.
+-spec len(?MODULE(_)) -> non_neg_integer().
+
len({L, _}) -> L.
+-spec peek(?MODULE(T)) -> result(T).
peek({ 0, _Q}) -> empty;
peek({_L, Q}) -> ?QUEUE:peek(Q).
+-spec peek_r(?MODULE(T)) -> result(T).
+
peek_r({ 0, _Q}) -> empty;
peek_r({_L, Q}) -> ?QUEUE:peek_r(Q).
diff --git a/src/pg_local.erl b/src/pg_local.erl
index 0ed7e9d85d..3f03c97182 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -44,15 +44,6 @@
-type name() :: term().
--spec start_link() -> {'ok', pid()} | {'error', any()}.
--spec start() -> {'ok', pid()} | {'error', any()}.
--spec join(name(), pid()) -> 'ok'.
--spec leave(name(), pid()) -> 'ok'.
--spec get_members(name()) -> [pid()].
--spec in_group(name(), pid()) -> boolean().
-
--spec sync() -> 'ok'.
-
%%----------------------------------------------------------------------------
-define(TABLE, pg_local_table).
@@ -61,24 +52,36 @@
%%% Exported functions
%%%
+-spec start_link() -> {'ok', pid()} | {'error', any()}.
+
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+-spec start() -> {'ok', pid()} | {'error', any()}.
+
start() ->
ensure_started().
+-spec join(name(), pid()) -> 'ok'.
+
join(Name, Pid) when is_pid(Pid) ->
_ = ensure_started(),
gen_server:cast(?MODULE, {join, Name, Pid}).
+-spec leave(name(), pid()) -> 'ok'.
+
leave(Name, Pid) when is_pid(Pid) ->
_ = ensure_started(),
gen_server:cast(?MODULE, {leave, Name, Pid}).
+-spec get_members(name()) -> [pid()].
+
get_members(Name) ->
_ = ensure_started(),
group_members(Name).
+-spec in_group(name(), pid()) -> boolean().
+
in_group(Name, Pid) ->
_ = ensure_started(),
%% The join message is a cast and thus can race, but we want to
@@ -89,6 +92,8 @@ in_group(Name, Pid) ->
member_present(Name, Pid)
end.
+-spec sync() -> 'ok'.
+
sync() ->
_ = ensure_started(),
gen_server:call(?MODULE, sync, infinity).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e06e50561d..2d16661768 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -33,6 +33,8 @@
-export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent
-export([is_booted/1, is_booted/0, is_booting/1, is_booting/0]).
+-deprecated([{force_event_refresh, 1, eventually}]).
+
-ifdef(TEST).
-export([start_logger/0]).
@@ -257,42 +259,6 @@
-type param() :: atom().
-type app_name() :: atom().
--spec start() -> 'ok'.
--spec boot() -> 'ok'.
--spec stop() -> 'ok'.
--spec stop_and_halt() -> no_return().
-
--spec status
- () -> [{pid, integer()} |
- {running_applications, [{atom(), string(), string()}]} |
- {os, {atom(), atom()}} |
- {erlang_version, string()} |
- {memory, any()}].
--spec is_running() -> boolean().
--spec is_running(node()) -> boolean().
--spec environment() -> [{param(), term()}].
--spec rotate_logs() -> rabbit_types:ok_or_error(any()).
--deprecated([{force_event_refresh, 1, eventually}]).
--spec force_event_refresh(reference()) -> 'ok'.
-
--spec log_locations() -> [log_location()].
-
--spec start('normal',[]) ->
- {'error',
- {'erlang_version_too_old',
- {'found',string(),string()},
- {'required',string(),string()}}} |
- {'ok',pid()}.
--spec stop(_) -> 'ok'.
-
--spec maybe_insert_default_data() -> 'ok'.
--spec boot_delegate() -> 'ok'.
--spec recover() -> 'ok'.
--spec start_apps([app_name()]) -> 'ok'.
--spec start_apps([app_name()],
- #{app_name() => restart_type()}) -> 'ok'.
--spec stop_apps([app_name()]) -> 'ok'.
-
%%----------------------------------------------------------------------------
ensure_application_loaded() ->
@@ -304,6 +270,8 @@ ensure_application_loaded() ->
{error, {already_loaded, rabbit}} -> ok
end.
+-spec start() -> 'ok'.
+
start() ->
start_it(fun() ->
%% We do not want to upgrade mnesia after just
@@ -317,6 +285,8 @@ start() ->
broker_start()
end).
+-spec boot() -> 'ok'.
+
boot() ->
start_it(fun() ->
ensure_config(),
@@ -494,6 +464,8 @@ start_it(StartFun) ->
Marker ! stop
end.
+-spec stop() -> 'ok'.
+
stop() ->
case whereis(rabbit_boot) of
undefined -> ok;
@@ -508,6 +480,8 @@ stop() ->
stop_apps(app_utils:app_dependency_order(Apps, true)),
rabbit_log:info("Successfully stopped RabbitMQ and its dependencies~n", []).
+-spec stop_and_halt() -> no_return().
+
stop_and_halt() ->
try
stop()
@@ -533,9 +507,14 @@ stop_and_halt() ->
end,
ok.
+-spec start_apps([app_name()]) -> 'ok'.
+
start_apps(Apps) ->
start_apps(Apps, #{}).
+-spec start_apps([app_name()],
+ #{app_name() => restart_type()}) -> 'ok'.
+
start_apps(Apps, RestartTypes) ->
app_utils:load_applications(Apps),
rabbit_feature_flags:initialize_registry(),
@@ -678,6 +657,8 @@ decrypt_list([{Key, Value}|Tail], Algo, Acc) when Key =/= encrypted ->
decrypt_list([Value|Tail], Algo, Acc) ->
decrypt_list(Tail, Algo, [decrypt(Value, Algo)|Acc]).
+-spec stop_apps([app_name()]) -> 'ok'.
+
stop_apps([]) ->
ok;
stop_apps(Apps) ->
@@ -712,16 +693,19 @@ is_booting(Node) ->
-spec await_startup() -> 'ok' | {'error', 'timeout'}.
+
await_startup() ->
await_startup(node(), false).
-spec await_startup(node() | non_neg_integer()) -> 'ok' | {'error', 'timeout'}.
+
await_startup(Node) when is_atom(Node) ->
await_startup(Node, false);
await_startup(Timeout) when is_integer(Timeout) ->
await_startup(node(), false, Timeout).
-spec await_startup(node(), boolean()) -> 'ok' | {'error', 'timeout'}.
+
await_startup(Node, PrintProgressReports) ->
case is_booting(Node) of
true -> wait_for_boot_to_finish(Node, PrintProgressReports);
@@ -734,6 +718,7 @@ await_startup(Node, PrintProgressReports) ->
end.
-spec await_startup(node(), boolean(), non_neg_integer()) -> 'ok' | {'error', 'timeout'}.
+
await_startup(Node, PrintProgressReports, Timeout) ->
case is_booting(Node) of
true -> wait_for_boot_to_finish(Node, PrintProgressReports, Timeout);
@@ -805,6 +790,13 @@ maybe_print_boot_progress(true, IterationsLeft) ->
_ -> ok
end.
+-spec status
+ () -> [{pid, integer()} |
+ {running_applications, [{atom(), string(), string()}]} |
+ {os, {atom(), atom()}} |
+ {erlang_version, string()} |
+ {memory, any()}].
+
status() ->
S1 = [{pid, list_to_integer(os:getpid())},
%% The timeout value used is twice that of gen_server:call/2.
@@ -860,8 +852,13 @@ listeners() ->
%% TODO this only determines if the rabbit application has started,
%% not if it is running, never mind plugins. It would be nice to have
%% more nuance here.
+
+-spec is_running() -> boolean().
+
is_running() -> is_running(node()).
+-spec is_running(node()) -> boolean().
+
is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit).
is_booted() -> is_booted(node()).
@@ -873,6 +870,8 @@ is_booted(Node) ->
_ -> false
end.
+-spec environment() -> [{param(), term()}].
+
environment() ->
%% The timeout value is twice that of gen_server:call/2.
[{A, environment(A)} ||
@@ -883,6 +882,8 @@ environment(App) ->
lists:keysort(1, [P || P = {K, _} <- application:get_all_env(App),
not lists:member(K, Ignore)]).
+-spec rotate_logs() -> rabbit_types:ok_or_error(any()).
+
rotate_logs() ->
rabbit_lager:fold_sinks(
fun
@@ -906,6 +907,13 @@ rotate_logs() ->
%%--------------------------------------------------------------------
+-spec start('normal',[]) ->
+ {'error',
+ {'erlang_version_too_old',
+ {'found',string(),string()},
+ {'required',string(),string()}}} |
+ {'ok',pid()}.
+
start(normal, []) ->
case erts_version_check() of
ok ->
@@ -938,6 +946,8 @@ prep_stop(State) ->
rabbit_peer_discovery:maybe_unregister(),
State.
+-spec stop(_) -> 'ok'.
+
stop(_State) ->
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
@@ -992,14 +1002,20 @@ log_boot_error_and_exit(Reason, Format, Args) ->
%%---------------------------------------------------------------------------
%% boot step functions
+-spec boot_delegate() -> 'ok'.
+
boot_delegate() ->
{ok, Count} = application:get_env(rabbit, delegate_count),
rabbit_sup:start_supervisor_child(delegate_sup, [Count]).
+-spec recover() -> 'ok'.
+
recover() ->
rabbit_policy:recover(),
rabbit_vhost:recover().
+-spec maybe_insert_default_data() -> 'ok'.
+
maybe_insert_default_data() ->
case rabbit_table:needs_default_data() of
true -> insert_default_data();
@@ -1044,12 +1060,17 @@ start_logger() ->
rabbit_lager:start_logger(),
ok.
+-spec log_locations() -> [log_location()].
+
log_locations() ->
rabbit_lager:log_locations().
%% This feature was used by the management API up-to and including
%% RabbitMQ 3.7.x. It is unused in 3.8.x and thus deprecated. We keep it
%% to support in-place upgrades to 3.8.x (i.e. mixed-version clusters).
+
+-spec force_event_refresh(reference()) -> 'ok'.
+
force_event_refresh(Ref) ->
rabbit_direct:force_event_refresh(Ref),
rabbit_networking:force_connection_event_refresh(Ref),
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 4356cb427f..44e72f7ce4 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -27,29 +27,20 @@
-type permission_atom() :: 'configure' | 'read' | 'write'.
+%%----------------------------------------------------------------------------
+
-spec check_user_pass_login
(rabbit_types:username(), rabbit_types:password()) ->
{'ok', rabbit_types:user()} |
{'refused', rabbit_types:username(), string(), [any()]}.
+
+check_user_pass_login(Username, Password) ->
+ check_user_login(Username, [{password, Password}]).
+
-spec check_user_login
(rabbit_types:username(), [{atom(), any()}]) ->
{'ok', rabbit_types:user()} |
{'refused', rabbit_types:username(), string(), [any()]}.
--spec check_user_loopback
- (rabbit_types:username(), rabbit_net:socket() | inet:ip_address()) ->
- 'ok' | 'not_allowed'.
--spec check_vhost_access
- (rabbit_types:user(), rabbit_types:vhost(),
- rabbit_net:socket() | #authz_socket_info{}) ->
- 'ok' | rabbit_types:channel_exit().
--spec check_resource_access
- (rabbit_types:user(), rabbit_types:r(atom()), permission_atom()) ->
- 'ok' | rabbit_types:channel_exit().
-
-%%----------------------------------------------------------------------------
-
-check_user_pass_login(Username, Password) ->
- check_user_login(Username, [{password, Password}]).
check_user_login(Username, AuthProps) ->
{ok, Modules} = application:get_env(rabbit, auth_backends),
@@ -122,6 +113,10 @@ auth_user(#user{username = Username, tags = Tags}, Impl) ->
tags = Tags,
impl = Impl}.
+-spec check_user_loopback
+ (rabbit_types:username(), rabbit_net:socket() | inet:ip_address()) ->
+ 'ok' | 'not_allowed'.
+
check_user_loopback(Username, SockOrAddr) ->
{ok, Users} = application:get_env(rabbit, loopback_users),
case rabbit_net:is_loopback(SockOrAddr)
@@ -130,6 +125,11 @@ check_user_loopback(Username, SockOrAddr) ->
false -> not_allowed
end.
+-spec check_vhost_access
+ (rabbit_types:user(), rabbit_types:vhost(),
+ rabbit_net:socket() | #authz_socket_info{}) ->
+ 'ok' | rabbit_types:channel_exit().
+
check_vhost_access(User = #user{username = Username,
authz_backends = Modules}, VHostPath, Sock) ->
lists:foldl(
@@ -146,6 +146,10 @@ check_vhost_access(User = #user{username = Username,
Else
end, ok, Modules).
+-spec check_resource_access
+ (rabbit_types:user(), rabbit_types:r(atom()), permission_atom()) ->
+ 'ok' | rabbit_types:channel_exit().
+
check_resource_access(User, R = #resource{kind = exchange, name = <<"">>},
Permission) ->
check_resource_access(User, R#resource{name = <<"amq.default">>},
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 789992cb3d..6f33e02fe4 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -52,21 +52,15 @@
-type resource_alarm() :: {resource_limit, resource_alarm_source(), node()}.
-type alarm() :: local_alarm() | resource_alarm().
--spec start_link() -> rabbit_types:ok_pid_or_error().
--spec start() -> 'ok'.
--spec stop() -> 'ok'.
--spec register(pid(), rabbit_types:mfargs()) -> [atom()].
--spec set_alarm({alarm(), []}) -> 'ok'.
--spec clear_alarm(alarm()) -> 'ok'.
--spec on_node_up(node()) -> 'ok'.
--spec on_node_down(node()) -> 'ok'.
--spec get_alarms() -> [{alarm(), []}].
-
%%----------------------------------------------------------------------------
+-spec start_link() -> rabbit_types:ok_pid_or_error().
+
start_link() ->
gen_event:start_link({local, ?SERVER}).
+-spec start() -> 'ok'.
+
start() ->
ok = rabbit_sup:start_restartable_child(?MODULE),
ok = gen_event:add_handler(?SERVER, ?MODULE, []),
@@ -84,21 +78,38 @@ start() ->
rabbit_disk_monitor, [DiskLimit]),
ok.
+-spec stop() -> 'ok'.
+
stop() -> ok.
%% Registers a handler that should be called on every resource alarm change.
%% Given a call rabbit_alarm:register(Pid, {M, F, A}), the handler would be
%% called like this: `apply(M, F, A ++ [Pid, Source, Alert])', where `Source'
%% has the type of resource_alarm_source() and `Alert' has the type of resource_alert().
+
+-spec register(pid(), rabbit_types:mfargs()) -> [atom()].
+
register(Pid, AlertMFA) ->
gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity).
+-spec set_alarm({alarm(), []}) -> 'ok'.
+
set_alarm(Alarm) -> gen_event:notify(?SERVER, {set_alarm, Alarm}).
+
+-spec clear_alarm(alarm()) -> 'ok'.
+
clear_alarm(Alarm) -> gen_event:notify(?SERVER, {clear_alarm, Alarm}).
+-spec get_alarms() -> [{alarm(), []}].
+
get_alarms() -> gen_event:call(?SERVER, ?MODULE, get_alarms, infinity).
+-spec on_node_up(node()) -> 'ok'.
+
on_node_up(Node) -> gen_event:notify(?SERVER, {node_up, Node}).
+
+-spec on_node_down(node()) -> 'ok'.
+
on_node_down(Node) -> gen_event:notify(?SERVER, {node_down, Node}).
remote_conserve_resources(Pid, Source, {true, _, _}) ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 118a3d0fab..621ce95d41 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -51,6 +51,8 @@
-export([pid_of/1, pid_of/2]).
-export([mark_local_durable_queues_stopped/1]).
+-deprecated([{force_event_refresh, 1, eventually}]).
+
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2,
@@ -84,150 +86,6 @@
{'absent', amqqueue:amqqueue(),absent_reason()}.
-type not_found_or_absent() ::
'not_found' | {'absent', amqqueue:amqqueue(), absent_reason()}.
--spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
--spec stop(rabbit_types:vhost()) -> 'ok'.
--spec start([amqqueue:amqqueue()]) -> 'ok'.
--spec declare
- (name(), boolean(), boolean(), rabbit_framing:amqp_table(),
- rabbit_types:maybe(pid()), rabbit_types:username()) ->
- {'new' | 'existing' | 'absent' | 'owner_died',
- amqqueue:amqqueue()} |
- {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} |
- rabbit_types:channel_exit().
--spec declare
- (name(), boolean(), boolean(), rabbit_framing:amqp_table(),
- rabbit_types:maybe(pid()), rabbit_types:username(), node()) ->
- {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
- {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} |
- {'absent', amqqueue:amqqueue(), absent_reason()} |
- rabbit_types:channel_exit().
--spec internal_declare(amqqueue:amqqueue(), boolean()) ->
- queue_or_absent() | rabbit_misc:thunk(queue_or_absent()).
--spec update
- (name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) ->
- 'not_found' | amqqueue:amqqueue().
--spec lookup
- (name()) ->
- rabbit_types:ok(amqqueue:amqqueue()) |
- rabbit_types:error('not_found');
- ([name()]) ->
- [amqqueue:amqqueue()].
--spec not_found_or_absent(name()) -> not_found_or_absent().
--spec with(name(), qfun(A)) ->
- A | rabbit_types:error(not_found_or_absent()).
--spec with(name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B.
--spec with_or_die(name(), qfun(A)) -> A | rabbit_types:channel_exit().
--spec not_found(rabbit_types:r(atom())) -> rabbit_types:channel_exit().
--spec absent(amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()) ->
- rabbit_types:channel_exit().
--spec assert_equivalence
- (amqqueue:amqqueue(), boolean(), boolean(),
- rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) ->
- 'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit().
--spec check_exclusive_access(amqqueue:amqqueue(), pid()) ->
- 'ok' | rabbit_types:channel_exit().
--spec with_exclusive_access_or_die(name(), pid(), qfun(A)) ->
- A | rabbit_types:channel_exit().
--spec list() -> [amqqueue:amqqueue()].
--spec list(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
--spec list_names() -> [rabbit_amqqueue:name()].
--spec list_down(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
--spec list_by_type(atom()) -> [amqqueue:amqqueue()].
--spec info_keys() -> rabbit_types:info_keys().
--spec info(amqqueue:amqqueue()) -> rabbit_types:infos().
--spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) ->
- rabbit_types:infos().
--spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
--spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
- [rabbit_types:infos()].
--deprecated([{force_event_refresh, 1, eventually}]).
--spec force_event_refresh(reference()) -> 'ok'.
--spec notify_policy_changed(amqqueue:amqqueue()) -> 'ok'.
--spec consumers(amqqueue:amqqueue()) ->
- [{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(),
- rabbit_framing:amqp_table()}].
--spec consumer_info_keys() -> rabbit_types:info_keys().
--spec consumers_all(rabbit_types:vhost()) ->
- [{name(), pid(), rabbit_types:ctag(), boolean(),
- non_neg_integer(), rabbit_framing:amqp_table()}].
--spec stat(amqqueue:amqqueue()) ->
- {'ok', non_neg_integer(), non_neg_integer()}.
--spec delete_immediately(qpids()) -> 'ok'.
--spec delete_exclusive(qpids(), pid()) -> 'ok'.
--spec delete
- (amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) ->
- qlen();
- (amqqueue:amqqueue(), 'true' , 'false', rabbit_types:username()) ->
- qlen() | rabbit_types:error('in_use');
- (amqqueue:amqqueue(), 'false', 'true', rabbit_types:username()) ->
- qlen() | rabbit_types:error('not_empty');
- (amqqueue:amqqueue(), 'true' , 'true', rabbit_types:username()) ->
- qlen() |
- rabbit_types:error('in_use') |
- rabbit_types:error('not_empty').
--spec delete_crashed(amqqueue:amqqueue()) -> 'ok'.
--spec delete_crashed_internal(amqqueue:amqqueue(), rabbit_types:username()) -> 'ok'.
--spec purge(amqqueue:amqqueue()) -> {ok, qlen()}.
--spec forget_all_durable(node()) -> 'ok'.
--spec deliver([amqqueue:amqqueue()], rabbit_types:delivery(), #{Name :: atom() => rabbit_fifo_client:state()} | 'untracked') ->
- {qpids(), #{Name :: atom() => rabbit_fifo_client:state()}}.
--spec deliver([amqqueue:amqqueue()], rabbit_types:delivery()) -> 'ok'.
--spec requeue(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'.
--spec ack(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'.
--spec reject(pid() | {atom(), node()}, [msg_id()], boolean(), pid(),
- #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'.
--spec notify_down_all(qpids(), pid()) -> ok_or_errors().
--spec notify_down_all(qpids(), pid(), non_neg_integer()) ->
- ok_or_errors().
--spec activate_limit_all(qpids(), pid()) -> ok_or_errors().
--spec basic_get(amqqueue:amqqueue(), pid(), boolean(), pid(), rabbit_types:ctag(),
- #{Name :: atom() => rabbit_fifo_client:state()}) ->
- {'ok', non_neg_integer(), qmsg()} | 'empty'.
--spec credit
- (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(),
- boolean(), #{Name :: atom() => rabbit_fifo_client:state()}) ->
- 'ok'.
--spec basic_consume
- (amqqueue:amqqueue(), boolean(), pid(), pid(), boolean(),
- non_neg_integer(), rabbit_types:ctag(), boolean(),
- rabbit_framing:amqp_table(), any(), rabbit_types:username(),
- #{Name :: atom() => rabbit_fifo_client:state()}) ->
- rabbit_types:ok_or_error('exclusive_consume_unavailable').
--spec basic_cancel
- (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), any(),
- rabbit_types:username(), #{Name :: atom() => rabbit_fifo_client:state()}) ->
- 'ok' | {'ok', #{Name :: atom() => rabbit_fifo_client:state()}}.
--spec notify_decorators(amqqueue:amqqueue()) -> 'ok'.
--spec resume(pid(), pid()) -> 'ok'.
--spec internal_delete(name(), rabbit_types:username()) ->
- 'ok' | rabbit_types:connection_exit() |
- fun (() ->
- 'ok' | rabbit_types:connection_exit()).
--spec run_backing_queue
- (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) ->
- 'ok'.
--spec set_ram_duration_target(pid(), number() | 'infinity') -> 'ok'.
--spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
--spec on_node_up(node()) -> 'ok'.
--spec on_node_down(node()) -> 'ok'.
--spec pseudo_queue(name(), pid()) -> amqqueue:amqqueue().
--spec pseudo_queue(name(), pid(), boolean()) -> amqqueue:amqqueue().
--spec immutable(amqqueue:amqqueue()) -> amqqueue:amqqueue().
--spec store_queue(amqqueue:amqqueue()) -> 'ok'.
--spec update_decorators(name()) -> 'ok'.
--spec policy_changed(amqqueue:amqqueue(), amqqueue:amqqueue()) ->
- 'ok'.
--spec update_mirroring(pid()) -> 'ok'.
--spec sync_mirrors(amqqueue:amqqueue() | pid()) ->
- 'ok' | rabbit_types:error('not_mirrored').
--spec cancel_sync_mirrors(amqqueue:amqqueue() | pid()) ->
- 'ok' | {'ok', 'not_syncing'}.
--spec is_replicated(amqqueue:amqqueue()) -> boolean().
-
--spec pid_of(amqqueue:amqqueue()) ->
- {'ok', pid()} | rabbit_types:error('not_found').
--spec pid_of(rabbit_types:vhost(), rabbit_misc:resource_name()) ->
- {'ok', pid()} | rabbit_types:error('not_found').
%%----------------------------------------------------------------------------
@@ -250,6 +108,8 @@ warn_file_limit() ->
ok
end.
+-spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
+
recover(VHost) ->
Classic = find_local_durable_classic_queues(VHost),
Quorum = find_local_quorum_queues(VHost),
@@ -278,11 +138,14 @@ filter_pid_per_type(QPids) ->
filter_resource_per_type(Resources) ->
Queues = [begin
- {ok, #amqqueue{pid = QPid}} = lookup(Resource),
+ {ok, Q} = lookup(Resource),
+ QPid = amqqueue:get_pid(Q),
{Resource, QPid}
end || Resource <- Resources],
lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues).
+-spec stop(rabbit_types:vhost()) -> 'ok'.
+
stop(VHost) ->
%% Classic queues
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
@@ -290,6 +153,8 @@ stop(VHost) ->
ok = BQ:stop(VHost),
rabbit_quorum_queue:stop(VHost).
+-spec start([amqqueue:amqqueue()]) -> 'ok'.
+
start(Qs) ->
{Classic, _Quorum} = filter_per_type(Qs),
%% At this point all recovered queues and their bindings are
@@ -366,6 +231,14 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->
[Pid, Error]) || {Pid, Error} <- Failures],
[Q || {_, {new, Q}} <- Results].
+-spec declare
+ (name(), boolean(), boolean(), rabbit_framing:amqp_table(),
+ rabbit_types:maybe(pid()), rabbit_types:username()) ->
+ {'new' | 'existing' | 'absent' | 'owner_died',
+ amqqueue:amqqueue()} |
+ {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} |
+ rabbit_types:channel_exit().
+
declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser, node()).
@@ -373,6 +246,15 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
%% The Node argument suggests where the queue (master if mirrored)
%% should be. Note that in some cases (e.g. with "nodes" policy in
%% effect) this might not be possible to satisfy.
+
+-spec declare
+ (name(), boolean(), boolean(), rabbit_framing:amqp_table(),
+ rabbit_types:maybe(pid()), rabbit_types:username(), node()) ->
+ {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
+ {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} |
+ {'absent', amqqueue:amqqueue(), absent_reason()} |
+ rabbit_types:channel_exit().
+
declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
Owner, ActingUser, Node) ->
ok = check_declare_arguments(QueueName, Args),
@@ -434,6 +316,9 @@ get_queue_type(Args) ->
erlang:binary_to_existing_atom(V, utf8)
end.
+-spec internal_declare(amqqueue:amqqueue(), boolean()) ->
+ queue_or_absent() | rabbit_misc:thunk(queue_or_absent()).
+
internal_declare(Q, Recover) ->
?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
do_internal_declare(Q, Recover),
@@ -466,6 +351,10 @@ do_internal_declare(Q, false) ->
end
end).
+-spec update
+ (name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) ->
+ 'not_found' | amqqueue:amqqueue().
+
update(Name, Fun) ->
case mnesia:wread({rabbit_queue, Name}) of
[Q] ->
@@ -498,6 +387,8 @@ do_ensure_rabbit_queue_record_is_initialized(Q) ->
rabbit_misc:const({ok, Q})
end).
+-spec store_queue(amqqueue:amqqueue()) -> 'ok'.
+
store_queue(Q) when ?amqqueue_is_durable(Q) ->
Q1 = amqqueue:reset_mirroring_and_decorators(Q),
ok = mnesia:write(rabbit_durable_queue, Q1, write),
@@ -508,6 +399,8 @@ store_queue(Q) when not ?amqqueue_is_durable(Q) ->
store_queue_ram(Q) ->
ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write).
+-spec update_decorators(name()) -> 'ok'.
+
update_decorators(Name) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
@@ -518,6 +411,9 @@ update_decorators(Name) ->
end
end).
+-spec policy_changed(amqqueue:amqqueue(), amqqueue:amqqueue()) ->
+ 'ok'.
+
policy_changed(Q1, Q2) ->
Decorators1 = amqqueue:get_decorators(Q1),
Decorators2 = amqqueue:get_decorators(Q2),
@@ -529,6 +425,13 @@ policy_changed(Q1, Q2) ->
%% mirroring-related has changed - the policy may have changed anyway.
notify_policy_changed(Q1).
+-spec lookup
+ (name()) ->
+ rabbit_types:ok(amqqueue:amqqueue()) |
+ rabbit_types:error('not_found');
+ ([name()]) ->
+ [amqqueue:amqqueue()].
+
lookup([]) -> []; %% optimisation
lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation
lookup(Names) when is_list(Names) ->
@@ -538,6 +441,8 @@ lookup(Names) when is_list(Names) ->
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
+-spec not_found_or_absent(name()) -> not_found_or_absent().
+
not_found_or_absent(Name) ->
%% NB: we assume that the caller has already performed a lookup on
%% rabbit_queue and not found anything
@@ -555,6 +460,8 @@ not_found_or_absent_dirty(Name) ->
{ok, Q} -> {absent, Q, nodedown}
end.
+-spec with(name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B.
+
with(Name, F, E) ->
with(Name, F, E, 2000).
@@ -620,15 +527,25 @@ retry_wait(Q, F, E, RetriesLeft) ->
with(Name, F, E, RetriesLeft - 1)
end.
+-spec with(name(), qfun(A)) ->
+ A | rabbit_types:error(not_found_or_absent()).
+
with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
+-spec with_or_die(name(), qfun(A)) -> A | rabbit_types:channel_exit().
+
with_or_die(Name, F) ->
with(Name, F, fun (not_found) -> not_found(Name);
({absent, Q, Reason}) -> absent(Q, Reason)
end).
+-spec not_found(rabbit_types:r(atom())) -> rabbit_types:channel_exit().
+
not_found(R) -> rabbit_misc:protocol_error(not_found, "no ~s", [rabbit_misc:rs(R)]).
+-spec absent(amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()) ->
+ rabbit_types:channel_exit().
+
absent(Q, AbsentReason) ->
QueueName = amqqueue:get_name(Q),
QPid = amqqueue:get_pid(Q),
@@ -660,6 +577,11 @@ priv_absent(QueueName, _QPid, _IsDurable, timeout) ->
not_found,
"failed to perform operation on ~s due to timeout", [rabbit_misc:rs(QueueName)]).
+-spec assert_equivalence
+ (amqqueue:amqqueue(), boolean(), boolean(),
+ rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) ->
+ 'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit().
+
assert_equivalence(Q, Durable1, AD1, Args1, Owner) ->
QName = amqqueue:get_name(Q),
Durable = amqqueue:is_durable(Q),
@@ -669,6 +591,9 @@ assert_equivalence(Q, Durable1, AD1, Args1, Owner) ->
assert_args_equivalence(Q, Args1),
check_exclusive_access(Q, Owner, strict).
+-spec check_exclusive_access(amqqueue:amqqueue(), pid()) ->
+ 'ok' | rabbit_types:channel_exit().
+
check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax).
check_exclusive_access(Q, Owner, _MatchType)
@@ -684,6 +609,9 @@ check_exclusive_access(Q, _ReaderPid, _MatchType) ->
"cannot obtain exclusive access to locked ~s",
[rabbit_misc:rs(QueueName)]).
+-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) ->
+ A | rabbit_types:channel_exit().
+
with_exclusive_access_or_die(Name, ReaderPid, F) ->
with_or_die(Name,
fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
@@ -817,6 +745,7 @@ check_queue_type({longstr, Val}, _Args) ->
check_queue_type({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
+-spec list() -> [amqqueue:amqqueue()].
list() ->
list_with_possible_retry(fun do_list/0).
@@ -824,6 +753,8 @@ list() ->
do_list() ->
mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()).
+-spec list_names() -> [rabbit_amqqueue:name()].
+
list_names() -> mnesia:dirty_all_keys(rabbit_queue).
list_names(VHost) -> [amqqueue:get_name(Q) || Q <- list(VHost)].
@@ -832,6 +763,8 @@ list_local_names() ->
[ amqqueue:get_name(Q) || Q <- list(),
amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())].
+-spec list_by_type(atom()) -> [amqqueue:amqqueue()].
+
list_by_type(Type) ->
{atomic, Qs} =
mnesia:sync_transaction(
@@ -853,6 +786,8 @@ is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) ->
is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) ->
Node =:= Leader.
+-spec list(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
+
list(VHostPath) ->
list(VHostPath, rabbit_queue).
@@ -902,6 +837,8 @@ list_with_possible_retry(Fun) ->
Ret
end.
+-spec list_down(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
+
list_down(VHostPath) ->
case rabbit_vhost:exists(VHostPath) of
false -> [];
@@ -937,6 +874,8 @@ list_for_count(VHost) ->
amqqueue:field_vhost())
end).
+-spec info_keys() -> rabbit_types:info_keys().
+
info_keys() -> rabbit_amqqueue_process:info_keys().
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
@@ -957,6 +896,8 @@ is_unresponsive(Q, Timeout) ->
format(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:format(Q);
format(_) -> [].
+-spec info(amqqueue:amqqueue()) -> rabbit_types:infos().
+
info(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:info(Q);
info(Q) when ?amqqueue_state_is(Q, crashed) -> info_down(Q, crashed);
info(Q) when ?amqqueue_state_is(Q, stopped) -> info_down(Q, stopped);
@@ -964,6 +905,9 @@ info(Q) ->
QPid = amqqueue:get_pid(Q),
delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
+-spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) ->
+ rabbit_types:infos().
+
info(Q, Items) when ?amqqueue_is_quorum(Q) ->
rabbit_quorum_queue:info(Q, Items);
info(Q, Items) when ?amqqueue_state_is(Q, crashed) ->
@@ -996,10 +940,15 @@ i_down(K, _Q, _DownReason) ->
false -> throw({bad_argument, K})
end.
+-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
+
info_all(VHostPath) ->
map(list(VHostPath), fun (Q) -> info(Q) end) ++
map(list_down(VHostPath), fun (Q) -> info_down(Q, down) end).
+-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
+ [rabbit_types:infos()].
+
info_all(VHostPath, Items) ->
map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++
map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end).
@@ -1038,11 +987,15 @@ list_local(VHostPath) ->
[Q || Q <- list(VHostPath),
amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())].
+-spec force_event_refresh(reference()) -> 'ok'.
+
force_event_refresh(Ref) ->
[gen_server2:cast(amqqueue:get_pid(Q),
{force_event_refresh, Ref}) || Q <- list()],
ok.
+-spec notify_policy_changed(amqqueue:amqqueue()) -> 'ok'.
+
notify_policy_changed(Q) when ?amqqueue_is_classic(Q) ->
QPid = amqqueue:get_pid(Q),
gen_server2:cast(QPid, policy_changed);
@@ -1051,6 +1004,10 @@ notify_policy_changed(Q) when ?amqqueue_is_quorum(Q) ->
QName = amqqueue:get_name(Q),
rabbit_quorum_queue:policy_changed(QName, QPid).
+-spec consumers(amqqueue:amqqueue()) ->
+ [{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(),
+ rabbit_framing:amqp_table()}].
+
consumers(Q) when ?amqqueue_is_classic(Q) ->
QPid = amqqueue:get_pid(Q),
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]});
@@ -1060,8 +1017,14 @@ consumers(Q) when ?amqqueue_is_quorum(Q) ->
fun rabbit_fifo:query_consumers/1),
maps:values(Result).
+-spec consumer_info_keys() -> rabbit_types:info_keys().
+
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
+-spec consumers_all(rabbit_types:vhost()) ->
+ [{name(), pid(), rabbit_types:ctag(), boolean(),
+ non_neg_integer(), rabbit_framing:amqp_table()}].
+
consumers_all(VHostPath) ->
ConsumerInfoKeys = consumer_info_keys(),
lists:append(
@@ -1086,20 +1049,34 @@ get_queue_consumer_info(Q, ConsumerInfoKeys) ->
AckRequired, Prefetch, Active, ActivityStatus, Args]) ||
{ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)].
+-spec stat(amqqueue:amqqueue()) ->
+ {'ok', non_neg_integer(), non_neg_integer()}.
+
stat(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:stat(Q);
stat(Q) -> delegate:invoke(amqqueue:get_pid(Q), {gen_server2, call, [stat, infinity]}).
+-spec pid_of(amqqueue:amqqueue()) ->
+ {'ok', pid()} | rabbit_types:error('not_found').
+
pid_of(Q) -> amqqueue:get_pid(Q).
+
+-spec pid_of(rabbit_types:vhost(), rabbit_misc:resource_name()) ->
+ {'ok', pid()} | rabbit_types:error('not_found').
+
pid_of(VHost, QueueName) ->
case lookup(rabbit_misc:r(VHost, queue, QueueName)) of
{ok, Q} -> pid_of(Q);
{error, not_found} = E -> E
end.
+-spec delete_exclusive(qpids(), pid()) -> 'ok'.
+
delete_exclusive(QPids, ConnId) ->
[gen_server2:cast(QPid, {delete_exclusive, ConnId}) || QPid <- QPids],
ok.
+-spec delete_immediately(qpids()) -> 'ok'.
+
delete_immediately(QPids) ->
{Classic, Quorum} = filter_pid_per_type(QPids),
[gen_server2:cast(QPid, delete_immediately) || QPid <- Classic],
@@ -1115,6 +1092,18 @@ delete_immediately_by_resource(Resources) ->
|| {Resource, QPid} <- Quorum],
ok.
+-spec delete
+ (amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) ->
+ qlen();
+ (amqqueue:amqqueue(), 'true' , 'false', rabbit_types:username()) ->
+ qlen() | rabbit_types:error('in_use');
+ (amqqueue:amqqueue(), 'false', 'true', rabbit_types:username()) ->
+ qlen() | rabbit_types:error('not_empty');
+ (amqqueue:amqqueue(), 'true' , 'true', rabbit_types:username()) ->
+ qlen() |
+ rabbit_types:error('in_use') |
+ rabbit_types:error('not_empty').
+
delete(Q,
IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
rabbit_quorum_queue:delete(Q, IfUnused, IfEmpty, ActingUser);
@@ -1175,18 +1164,24 @@ wait_for_promoted_or_stopped(Q0) ->
{error, not_found}
end.
+-spec delete_crashed(amqqueue:amqqueue()) -> 'ok'.
+
delete_crashed(Q) ->
delete_crashed(Q, ?INTERNAL_USER).
delete_crashed(Q, ActingUser) ->
ok = rpc:call(amqqueue:qnode(Q), ?MODULE, delete_crashed_internal, [Q, ActingUser]).
+-spec delete_crashed_internal(amqqueue:amqqueue(), rabbit_types:username()) -> 'ok'.
+
delete_crashed_internal(Q, ActingUser) ->
QName = amqqueue:get_name(Q),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
BQ:delete_crashed(Q),
ok = internal_delete(QName, ActingUser).
+-spec purge(amqqueue:amqqueue()) -> {ok, qlen()}.
+
purge(Q) when ?amqqueue_is_classic(Q) ->
QPid = amqqueue:get_pid(Q),
delegate:invoke(QPid, {gen_server2, call, [purge, infinity]});
@@ -1194,6 +1189,7 @@ purge(Q) when ?amqqueue_is_quorum(Q) ->
NodeId = amqqueue:get_pid(Q),
rabbit_quorum_queue:purge(NodeId).
+-spec requeue(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'.
requeue(QPid, {_, MsgIds}, ChPid, QuorumStates) when ?IS_CLASSIC(QPid) ->
ok = delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}),
@@ -1209,6 +1205,8 @@ requeue({Name, _} = QPid, {CTag, MsgIds}, _ChPid, QuorumStates)
QuorumStates
end.
+-spec ack(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'.
+
ack(QPid, {_, MsgIds}, ChPid, QueueStates) when ?IS_CLASSIC(QPid) ->
delegate:invoke_no_result(QPid, {gen_server2, cast, [{ack, MsgIds, ChPid}]}),
QueueStates;
@@ -1223,6 +1221,9 @@ ack({Name, _} = QPid, {CTag, MsgIds}, _ChPid, QuorumStates)
QuorumStates
end.
+-spec reject(pid() | {atom(), node()}, [msg_id()], boolean(), pid(),
+ #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'.
+
reject(QPid, Requeue, {_, MsgIds}, ChPid, QStates) when ?IS_CLASSIC(QPid) ->
ok = delegate:invoke_no_result(QPid, {gen_server2, cast,
[{reject, Requeue, MsgIds, ChPid}]}),
@@ -1239,9 +1240,14 @@ reject({Name, _} = QPid, Requeue, {CTag, MsgIds}, _ChPid, QuorumStates)
QuorumStates
end.
+-spec notify_down_all(qpids(), pid()) -> ok_or_errors().
+
notify_down_all(QPids, ChPid) ->
notify_down_all(QPids, ChPid, ?CHANNEL_OPERATION_TIMEOUT).
+-spec notify_down_all(qpids(), pid(), non_neg_integer()) ->
+ ok_or_errors().
+
notify_down_all(QPids, ChPid, Timeout) ->
case rpc:call(node(), delegate, invoke,
[QPids, {gen_server2, call, [{notify_down, ChPid}, infinity]}], Timeout) of
@@ -1259,11 +1265,18 @@ notify_down_all(QPids, ChPid, Timeout) ->
Error -> {error, Error}
end.
+-spec activate_limit_all(qpids(), pid()) -> ok_or_errors().
+
activate_limit_all(QRefs, ChPid) ->
QPids = [P || P <- QRefs, ?IS_CLASSIC(P)],
delegate:invoke_no_result(QPids, {gen_server2, cast,
[{activate_limit, ChPid}]}).
+-spec credit
+ (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(),
+ boolean(), #{Name :: atom() => rabbit_fifo_client:state()}) ->
+ 'ok'.
+
credit(Q, ChPid, CTag, Credit,
Drain, QStates) when ?amqqueue_is_classic(Q) ->
QPid = amqqueue:get_pid(Q),
@@ -1279,6 +1292,9 @@ credit(Q,
{ok, QState} = rabbit_quorum_queue:credit(CTag, Credit, Drain, QState0),
{ok, maps:put(Name, QState, QStates)}.
+-spec basic_get(amqqueue:amqqueue(), pid(), boolean(), pid(), rabbit_types:ctag(),
+ #{Name :: atom() => rabbit_fifo_client:state()}) ->
+ {'ok', non_neg_integer(), qmsg()} | 'empty'.
basic_get(Q, ChPid, NoAck, LimiterPid, _CTag, _)
when ?amqqueue_is_classic(Q) ->
@@ -1301,8 +1317,15 @@ basic_get(Q, _ChPid, NoAck, _LimiterPid, CTag, QStates)
[rabbit_misc:rs(QName), Reason])
end.
-basic_consume(Q, NoAck, ChPid,
- LimiterPid, LimiterActive, ConsumerPrefetchCount, ConsumerTag,
+-spec basic_consume
+ (amqqueue:amqqueue(), boolean(), pid(), pid(), boolean(),
+ non_neg_integer(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table(), any(), rabbit_types:username(),
+ #{Name :: atom() => rabbit_fifo_client:state()}) ->
+ rabbit_types:ok_or_error('exclusive_consume_unavailable').
+
+basic_consume(Q, NoAck, ChPid, LimiterPid,
+ LimiterActive, ConsumerPrefetchCount, ConsumerTag,
ExclusiveConsume, Args, OkMsg, ActingUser, QState)
when ?amqqueue_is_classic(Q) ->
QPid = amqqueue:get_pid(Q),
@@ -1340,6 +1363,11 @@ basic_consume(Q,
OkMsg, QState0),
{ok, maps:put(Name, QState, QStates)}.
+-spec basic_cancel
+ (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), any(),
+ rabbit_types:username(), #{Name :: atom() => rabbit_fifo_client:state()}) ->
+ 'ok' | {'ok', #{Name :: atom() => rabbit_fifo_client:state()}}.
+
basic_cancel(Q, ChPid, ConsumerTag, OkMsg, ActingUser,
QState)
when ?amqqueue_is_classic(Q) ->
@@ -1359,6 +1387,8 @@ basic_cancel(Q, ChPid,
{ok, QState} = rabbit_quorum_queue:basic_cancel(ConsumerTag, ChPid, OkMsg, QState0),
{ok, maps:put(Name, QState, QStates)}.
+-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'.
+
notify_decorators(Q) ->
QPid = amqqueue:get_pid(Q),
delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}).
@@ -1369,6 +1399,8 @@ notify_sent(QPid, ChPid) ->
notify_sent_queue_down(QPid) ->
rabbit_amqqueue_common:notify_sent_queue_down(QPid).
+-spec resume(pid(), pid()) -> 'ok'.
+
resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{resume, ChPid}]}).
internal_delete1(QueueName, OnlyDurable) ->
@@ -1389,6 +1421,11 @@ internal_delete1(QueueName, OnlyDurable, Reason) ->
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName, OnlyDurable).
+-spec internal_delete(name(), rabbit_types:username()) ->
+ 'ok' | rabbit_types:connection_exit() |
+ fun (() ->
+ 'ok' | rabbit_types:connection_exit()).
+
internal_delete(QueueName, ActingUser) ->
internal_delete(QueueName, ActingUser, normal).
@@ -1413,6 +1450,8 @@ internal_delete(QueueName, ActingUser, Reason) ->
end
end).
+-spec forget_all_durable(node()) -> 'ok'.
+
forget_all_durable(Node) ->
%% Note rabbit is not running so we avoid e.g. the worker pool. Also why
%% we don't invoke the return from rabbit_binding:process_deletions/1.
@@ -1474,29 +1513,48 @@ node_permits_offline_promotion(Node) ->
%%
%% [2] This is simpler; as long as it's down that's OK
+-spec run_backing_queue
+ (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) ->
+ 'ok'.
+
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
+-spec set_ram_duration_target(pid(), number() | 'infinity') -> 'ok'.
+
set_ram_duration_target(QPid, Duration) ->
gen_server2:cast(QPid, {set_ram_duration_target, Duration}).
+-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
+
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+-spec update_mirroring(pid()) -> 'ok'.
+
update_mirroring(QPid) ->
ok = delegate:invoke_no_result(QPid, {gen_server2, cast, [update_mirroring]}).
+-spec sync_mirrors(amqqueue:amqqueue() | pid()) ->
+ 'ok' | rabbit_types:error('not_mirrored').
+
sync_mirrors(Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]});
sync_mirrors(QPid) ->
delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}).
+
+-spec cancel_sync_mirrors(amqqueue:amqqueue() | pid()) ->
+ 'ok' | {'ok', 'not_syncing'}.
+
cancel_sync_mirrors(Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]});
cancel_sync_mirrors(QPid) ->
delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}).
+-spec is_replicated(amqqueue:amqqueue()) -> boolean().
+
is_replicated(Q) when ?amqqueue_is_quorum(Q) ->
true;
is_replicated(Q) ->
@@ -1508,6 +1566,8 @@ is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) ->
Pid = amqqueue:get_pid(Q),
not rabbit_mnesia:is_process_alive(Pid).
+-spec on_node_up(node()) -> 'ok'.
+
on_node_up(Node) ->
ok = rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -1548,6 +1608,8 @@ maybe_clear_recoverable_node(Node, Q) ->
ok
end.
+-spec on_node_down(node()) -> 'ok'.
+
on_node_down(Node) ->
{QueueNames, QueueDeletions} = delete_queues_on_node_down(Node),
notify_queue_binding_deletions(QueueDeletions),
@@ -1613,9 +1675,13 @@ notify_queues_deleted(QueueDeletions) ->
end,
QueueDeletions).
+-spec pseudo_queue(name(), pid()) -> amqqueue:amqqueue().
+
pseudo_queue(QueueName, Pid) ->
pseudo_queue(QueueName, Pid, false).
+-spec pseudo_queue(name(), pid(), boolean()) -> amqqueue:amqqueue().
+
pseudo_queue(QueueName, Pid, Durable) ->
amqqueue:new(QueueName,
Pid,
@@ -1628,12 +1694,19 @@ pseudo_queue(QueueName, Pid, Durable) ->
classic % Type
).
+-spec immutable(amqqueue:amqqueue()) -> amqqueue:amqqueue().
+
immutable(Q) -> amqqueue:set_immutable(Q).
+-spec deliver([amqqueue:amqqueue()], rabbit_types:delivery()) -> 'ok'.
+
deliver(Qs, Delivery) ->
deliver(Qs, Delivery, untracked),
ok.
+-spec deliver([amqqueue:amqqueue()], rabbit_types:delivery(), #{Name :: atom() => rabbit_fifo_client:state()} | 'untracked') ->
+ {qpids(), #{Name :: atom() => rabbit_fifo_client:state()}}.
+
deliver([], _Delivery, QueueState) ->
%% /dev/null optimisation
{[], [], QueueState};
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index dd05b98e81..c3ba4a5c59 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -102,14 +102,6 @@
%%----------------------------------------------------------------------------
--spec info_keys() -> rabbit_types:info_keys().
--spec init_with_backing_queue_state
- (amqqueue:amqqueue(), atom(), tuple(), any(),
- [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) ->
- #q{}.
-
-%%----------------------------------------------------------------------------
-
-define(STATISTICS_KEYS,
[messages_ready,
messages_unacknowledged,
@@ -147,6 +139,8 @@
%%----------------------------------------------------------------------------
+-spec info_keys() -> rabbit_types:info_keys().
+
info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys().
statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys().
@@ -265,6 +259,11 @@ recovery_barrier(BarrierPid) ->
{'DOWN', MRef, process, _, _} -> ok
end.
+-spec init_with_backing_queue_state
+ (amqqueue:amqqueue(), atom(), tuple(), any(),
+ [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) ->
+ #q{}.
+
init_with_backing_queue_state(Q, BQ, BQS,
RateTRef, Deliveries, Senders, MTC) ->
Owner = amqqueue:get_exclusive_owner(Q),
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 7c5a70b529..64efc01786 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -29,8 +29,6 @@
-spec start_link(amqqueue:amqqueue(), rabbit_prequeue:start_mode()) ->
{'ok', pid(), pid()}.
-%%----------------------------------------------------------------------------
-
start_link(Q, StartMode) ->
Marker = spawn_link(fun() -> receive stop -> ok end end),
ChildSpec = {rabbit_amqqueue,
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index 7e40e54aca..9e30a7f0ae 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -31,15 +31,14 @@
%%----------------------------------------------------------------------------
-spec start_link() -> rabbit_types:ok_pid_or_error().
--spec start_queue_process
- (node(), amqqueue:amqqueue(), 'declare' | 'recovery' | 'slave') ->
- pid().
-
-%%----------------------------------------------------------------------------
start_link() ->
supervisor2:start_link(?MODULE, []).
+-spec start_queue_process
+ (node(), amqqueue:amqqueue(), 'declare' | 'recovery' | 'slave') ->
+ pid().
+
start_queue_process(Node, Q, StartMode) ->
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
{ok, Sup} = find_for_vhost(VHost, Node),
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 83580e8b9c..5732ac4b30 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -47,46 +47,6 @@
-type regexp() :: binary().
--spec add_user(rabbit_types:username(), rabbit_types:password(),
- rabbit_types:username()) -> 'ok' | {'error', string()}.
--spec delete_user(rabbit_types:username(), rabbit_types:username()) -> 'ok'.
--spec lookup_user
- (rabbit_types:username()) ->
- rabbit_types:ok(rabbit_types:internal_user()) |
- rabbit_types:error('not_found').
--spec change_password
- (rabbit_types:username(), rabbit_types:password(), rabbit_types:username()) -> 'ok'.
--spec clear_password(rabbit_types:username(), rabbit_types:username()) -> 'ok'.
--spec hash_password
- (module(), rabbit_types:password()) -> rabbit_types:password_hash().
--spec change_password_hash
- (rabbit_types:username(), rabbit_types:password_hash()) -> 'ok'.
--spec set_tags(rabbit_types:username(), [atom()], rabbit_types:username()) -> 'ok'.
--spec set_permissions
- (rabbit_types:username(), rabbit_types:vhost(), regexp(), regexp(),
- regexp(), rabbit_types:username()) ->
- 'ok'.
--spec clear_permissions
- (rabbit_types:username(), rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
--spec user_info_keys() -> rabbit_types:info_keys().
--spec perms_info_keys() -> rabbit_types:info_keys().
--spec user_perms_info_keys() -> rabbit_types:info_keys().
--spec vhost_perms_info_keys() -> rabbit_types:info_keys().
--spec user_vhost_perms_info_keys() -> rabbit_types:info_keys().
--spec list_users() -> [rabbit_types:infos()].
--spec list_users(reference(), pid()) -> 'ok'.
--spec list_permissions() -> [rabbit_types:infos()].
--spec list_user_permissions
- (rabbit_types:username()) -> [rabbit_types:infos()].
--spec list_user_permissions
- (rabbit_types:username(), reference(), pid()) -> 'ok'.
--spec list_vhost_permissions
- (rabbit_types:vhost()) -> [rabbit_types:infos()].
--spec list_vhost_permissions
- (rabbit_types:vhost(), reference(), pid()) -> 'ok'.
--spec list_user_vhost_permissions
- (rabbit_types:username(), rabbit_types:vhost()) -> [rabbit_types:infos()].
-
%%----------------------------------------------------------------------------
%% Implementation of rabbit_auth_backend
@@ -238,6 +198,9 @@ validate_and_alternate_credentials(Username, Password, ActingUser, Fun) ->
{error, Err}
end.
+-spec add_user(rabbit_types:username(), rabbit_types:password(),
+ rabbit_types:username()) -> 'ok' | {'error', string()}.
+
add_user(Username, Password, ActingUser) ->
validate_and_alternate_credentials(Username, Password, ActingUser,
fun add_user_sans_validation/3).
@@ -265,6 +228,8 @@ add_user_sans_validation(Username, Password, ActingUser) ->
{user_who_performed_action, ActingUser}]),
R.
+-spec delete_user(rabbit_types:username(), rabbit_types:username()) -> 'ok'.
+
delete_user(Username, ActingUser) ->
rabbit_log:info("Deleting user '~s'~n", [Username]),
R = rabbit_misc:execute_mnesia_transaction(
@@ -291,9 +256,17 @@ delete_user(Username, ActingUser) ->
{user_who_performed_action, ActingUser}]),
R.
+-spec lookup_user
+ (rabbit_types:username()) ->
+ rabbit_types:ok(rabbit_types:internal_user()) |
+ rabbit_types:error('not_found').
+
lookup_user(Username) ->
rabbit_misc:dirty_read({rabbit_user, Username}).
+-spec change_password
+ (rabbit_types:username(), rabbit_types:password(), rabbit_types:username()) -> 'ok'.
+
change_password(Username, Password, ActingUser) ->
validate_and_alternate_credentials(Username, Password, ActingUser,
fun change_password_sans_validation/3).
@@ -310,6 +283,8 @@ change_password_sans_validation(Username, Password, ActingUser) ->
{user_who_performed_action, ActingUser}]),
R.
+-spec clear_password(rabbit_types:username(), rabbit_types:username()) -> 'ok'.
+
clear_password(Username, ActingUser) ->
rabbit_log:info("Clearing password for '~s'~n", [Username]),
R = change_password_hash(Username, <<"">>),
@@ -318,9 +293,15 @@ clear_password(Username, ActingUser) ->
{user_who_performed_action, ActingUser}]),
R.
+-spec hash_password
+ (module(), rabbit_types:password()) -> rabbit_types:password_hash().
+
hash_password(HashingMod, Cleartext) ->
rabbit_password:hash(HashingMod, Cleartext).
+-spec change_password_hash
+ (rabbit_types:username(), rabbit_types:password_hash()) -> 'ok'.
+
change_password_hash(Username, PasswordHash) ->
change_password_hash(Username, PasswordHash, rabbit_password:hashing_mod()).
@@ -332,6 +313,8 @@ change_password_hash(Username, PasswordHash, HashingAlgorithm) ->
hashing_algorithm = HashingAlgorithm }
end).
+-spec set_tags(rabbit_types:username(), [atom()], rabbit_types:username()) -> 'ok'.
+
set_tags(Username, Tags, ActingUser) ->
ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags],
rabbit_log:info("Setting user tags for user '~s' to ~p~n",
@@ -343,6 +326,11 @@ set_tags(Username, Tags, ActingUser) ->
{user_who_performed_action, ActingUser}]),
R.
+-spec set_permissions
+ (rabbit_types:username(), rabbit_types:vhost(), regexp(), regexp(),
+ regexp(), rabbit_types:username()) ->
+ 'ok'.
+
set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, ActingUser) ->
rabbit_log:info("Setting permissions for "
"'~s' in '~s' to '~s', '~s', '~s'~n",
@@ -378,6 +366,9 @@ set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, ActingU
{user_who_performed_action, ActingUser}]),
R.
+-spec clear_permissions
+ (rabbit_types:username(), rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
+
clear_permissions(Username, VHostPath, ActingUser) ->
R = rabbit_misc:execute_mnesia_transaction(
rabbit_vhost:with_user_and_vhost(
@@ -481,11 +472,24 @@ clear_topic_permissions(Username, VHostPath, Exchange, ActingUser) ->
-define(PERMS_INFO_KEYS, [configure, write, read]).
-define(USER_INFO_KEYS, [user, tags]).
+-spec user_info_keys() -> rabbit_types:info_keys().
+
user_info_keys() -> ?USER_INFO_KEYS.
+-spec perms_info_keys() -> rabbit_types:info_keys().
+
perms_info_keys() -> [user, vhost | ?PERMS_INFO_KEYS].
+
+-spec vhost_perms_info_keys() -> rabbit_types:info_keys().
+
vhost_perms_info_keys() -> [user | ?PERMS_INFO_KEYS].
+
+-spec user_perms_info_keys() -> rabbit_types:info_keys().
+
user_perms_info_keys() -> [vhost | ?PERMS_INFO_KEYS].
+
+-spec user_vhost_perms_info_keys() -> rabbit_types:info_keys().
+
user_vhost_perms_info_keys() -> ?PERMS_INFO_KEYS.
topic_perms_info_keys() -> [user, vhost, exchange, write, read].
@@ -493,16 +497,22 @@ user_topic_perms_info_keys() -> [vhost, exchange, write, read].
vhost_topic_perms_info_keys() -> [user, exchange, write, read].
user_vhost_topic_perms_info_keys() -> [exchange, write, read].
+-spec list_users() -> [rabbit_types:infos()].
+
list_users() ->
[extract_internal_user_params(U) ||
U <- mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})].
+-spec list_users(reference(), pid()) -> 'ok'.
+
list_users(Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref,
fun(U) -> extract_internal_user_params(U) end,
mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})).
+-spec list_permissions() -> [rabbit_types:infos()].
+
list_permissions() ->
list_permissions(perms_info_keys(), match_user_vhost('_', '_')).
@@ -517,28 +527,43 @@ list_permissions(Keys, QueryThunk, Ref, AggregatorPid) ->
filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)].
+-spec list_user_permissions
+ (rabbit_types:username()) -> [rabbit_types:infos()].
+
list_user_permissions(Username) ->
list_permissions(
user_perms_info_keys(),
rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))).
+-spec list_user_permissions
+ (rabbit_types:username(), reference(), pid()) -> 'ok'.
+
list_user_permissions(Username, Ref, AggregatorPid) ->
list_permissions(
user_perms_info_keys(),
rabbit_misc:with_user(Username, match_user_vhost(Username, '_')),
Ref, AggregatorPid).
+-spec list_vhost_permissions
+ (rabbit_types:vhost()) -> [rabbit_types:infos()].
+
list_vhost_permissions(VHostPath) ->
list_permissions(
vhost_perms_info_keys(),
rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))).
+-spec list_vhost_permissions
+ (rabbit_types:vhost(), reference(), pid()) -> 'ok'.
+
list_vhost_permissions(VHostPath, Ref, AggregatorPid) ->
list_permissions(
vhost_perms_info_keys(),
rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath)),
Ref, AggregatorPid).
+-spec list_user_vhost_permissions
+ (rabbit_types:username(), rabbit_types:vhost()) -> [rabbit_types:infos()].
+
list_user_vhost_permissions(Username, VHostPath) ->
list_permissions(
user_vhost_perms_info_keys(),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index abd8635720..c3e570b26f 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -52,8 +52,6 @@
-type queue_mode() :: atom().
--spec info_keys() -> rabbit_types:info_keys().
-
%% Called on startup with a vhost and a list of durable queue names on this vhost.
%% The queues aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
@@ -270,4 +268,6 @@
%% queue
-callback handle_info(term(), state()) -> state().
+-spec info_keys() -> rabbit_types:info_keys().
+
info_keys() -> ?INFO_KEYS.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 6cb91a9243..40c60ece45 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -37,61 +37,27 @@
-type exchange_input() :: rabbit_types:exchange() | rabbit_exchange:name().
-type body_input() :: binary() | [binary()].
+%%----------------------------------------------------------------------------
+
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+
-spec publish
(exchange_input(), rabbit_router:routing_key(), properties_input(),
body_input()) ->
publish_result().
--spec publish
- (exchange_input(), rabbit_router:routing_key(), boolean(),
- properties_input(), body_input()) ->
- publish_result().
--spec publish(rabbit_types:delivery()) -> publish_result().
--spec delivery
- (boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
- rabbit_types:delivery().
--spec message
- (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(),
- binary()) ->
- rabbit_types:message().
--spec message
- (rabbit_exchange:name(), rabbit_router:routing_key(),
- rabbit_types:decoded_content()) ->
- rabbit_types:ok_or_error2(rabbit_types:message(), any()).
--spec properties
- (properties_input()) -> rabbit_framing:amqp_property_record().
-
--spec prepend_table_header
- (binary(), rabbit_framing:amqp_table(), headers()) -> headers().
--spec header(header(), headers()) -> 'undefined' | any().
--spec header(header(), headers(), any()) -> 'undefined' | any().
-
--spec extract_headers(rabbit_types:content()) -> headers().
-
--spec map_headers
- (fun((headers()) -> headers()), rabbit_types:content()) ->
- rabbit_types:content().
-
--spec header_routes(undefined | rabbit_framing:amqp_table()) -> [string()].
--spec build_content
- (rabbit_framing:amqp_property_record(), binary() | [binary()]) ->
- rabbit_types:content().
--spec from_content
- (rabbit_types:content()) ->
- {rabbit_framing:amqp_property_record(), binary()}.
--spec parse_expiration
- (rabbit_framing:amqp_property_record()) ->
- rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any()).
-
-%%----------------------------------------------------------------------------
-
-%% Convenience function, for avoiding round-trips in calls across the
-%% erlang distributed network.
publish(Exchange, RoutingKeyBin, Properties, Body) ->
publish(Exchange, RoutingKeyBin, false, Properties, Body).
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
+
+-spec publish
+ (exchange_input(), rabbit_router:routing_key(), boolean(),
+ properties_input(), body_input()) ->
+ publish_result().
+
publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) ->
Message = message(XName, RKey, properties(Props), Body),
publish(X, delivery(Mandatory, false, Message, undefined));
@@ -99,6 +65,8 @@ publish(XName, RKey, Mandatory, Props, Body) ->
Message = message(XName, RKey, properties(Props), Body),
publish(delivery(Mandatory, false, Message, undefined)).
+-spec publish(rabbit_types:delivery()) -> publish_result().
+
publish(Delivery = #delivery{
message = #basic_message{exchange_name = XName}}) ->
case rabbit_exchange:lookup(XName) of
@@ -111,10 +79,18 @@ publish(X, Delivery) ->
DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),
{ok, DeliveredQPids}.
+-spec delivery
+ (boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
+ rabbit_types:delivery().
+
delivery(Mandatory, Confirm, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, confirm = Confirm, sender = self(),
message = Message, msg_seq_no = MsgSeqNo, flow = noflow}.
+-spec build_content
+ (rabbit_framing:amqp_property_record(), binary() | [binary()]) ->
+ rabbit_types:content().
+
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
build_content(Properties, [BodyBin]);
@@ -128,6 +104,10 @@ build_content(Properties, PFR) ->
protocol = none,
payload_fragments_rev = PFR}.
+-spec from_content
+ (rabbit_types:content()) ->
+ {rabbit_framing:amqp_property_record(), binary()}.
+
from_content(Content) ->
#content{class_id = ClassId,
properties = Props,
@@ -153,6 +133,11 @@ strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
headers = Headers0}})
end.
+-spec message
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ rabbit_types:decoded_content()) ->
+ rabbit_types:ok_or_error2(rabbit_types:message(), any()).
+
message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
try
{ok, #basic_message{
@@ -166,12 +151,20 @@ message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
{error, _Reason} = Error -> Error
end.
+-spec message
+ (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(),
+ binary()) ->
+ rabbit_types:message().
+
message(XName, RoutingKey, RawProperties, Body) ->
Properties = properties(RawProperties),
Content = build_content(Properties, Body),
{ok, Msg} = message(XName, RoutingKey, Content),
Msg.
+-spec properties
+ (properties_input()) -> rabbit_framing:amqp_property_record().
+
properties(P = #'P_basic'{}) ->
P;
properties(P) when is_list(P) ->
@@ -185,6 +178,9 @@ properties(P) when is_list(P) ->
end
end, #'P_basic'{}, P).
+-spec prepend_table_header
+ (binary(), rabbit_framing:amqp_table(), headers()) -> headers().
+
prepend_table_header(Name, Info, undefined) ->
prepend_table_header(Name, Info, []);
prepend_table_header(Name, Info, Headers) ->
@@ -224,6 +220,8 @@ update_invalid(Name, Value, ExistingHdr, Header) ->
NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values),
set_invalid(NewHdr, Header).
+-spec header(header(), headers()) -> 'undefined' | any().
+
header(_Header, undefined) ->
undefined;
header(_Header, []) ->
@@ -231,12 +229,16 @@ header(_Header, []) ->
header(Header, Headers) ->
header(Header, Headers, undefined).
+-spec header(header(), headers(), any()) -> 'undefined' | any().
+
header(Header, Headers, Default) ->
case lists:keysearch(Header, 1, Headers) of
false -> Default;
{value, Val} -> Val
end.
+-spec extract_headers(rabbit_types:content()) -> headers().
+
extract_headers(Content) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
@@ -247,6 +249,10 @@ extract_timestamp(Content) ->
rabbit_binary_parser:ensure_content_decoded(Content),
Timestamp.
+-spec map_headers
+ (fun((headers()) -> headers()), rabbit_types:content()) ->
+ rabbit_types:content().
+
map_headers(F, Content) ->
Content1 = rabbit_binary_parser:ensure_content_decoded(Content),
#content{properties = #'P_basic'{headers = Headers} = Props} = Content1,
@@ -270,6 +276,9 @@ is_message_persistent(#content{properties = #'P_basic'{
end.
%% Extract CC routes from headers
+
+-spec header_routes(undefined | rabbit_framing:amqp_table()) -> [string()].
+
header_routes(undefined) ->
[];
header_routes(HeadersTable) ->
@@ -281,6 +290,10 @@ header_routes(HeadersTable) ->
binary_to_list(HeaderKey), Type}})
end || HeaderKey <- ?ROUTING_HEADERS]).
+-spec parse_expiration
+ (rabbit_framing:amqp_property_record()) ->
+ rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any()).
+
parse_expiration(#'P_basic'{expiration = undefined}) ->
{ok, undefined};
parse_expiration(#'P_basic'{expiration = Expiration}) ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index c4542abc61..460e6eab99 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -62,47 +62,6 @@
%% dialyzer into objecting to everything that uses it.
-type deletions() :: dict:dict().
--spec recover([rabbit_exchange:name()], [rabbit_amqqueue:name()]) ->
- 'ok'.
--spec exists(rabbit_types:binding()) -> boolean() | bind_errors().
--spec add(rabbit_types:binding(), rabbit_types:username()) -> bind_res().
--spec add(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res().
--spec remove(rabbit_types:binding()) -> bind_res().
--spec remove(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res().
--spec list(rabbit_types:vhost()) -> bindings().
--spec list_for_source
- (rabbit_types:binding_source()) -> bindings().
--spec list_for_destination
- (rabbit_types:binding_destination()) -> bindings().
--spec list_for_source_and_destination
- (rabbit_types:binding_source(), rabbit_types:binding_destination()) ->
- bindings().
--spec info_keys() -> rabbit_types:info_keys().
--spec info(rabbit_types:binding()) -> rabbit_types:infos().
--spec info(rabbit_types:binding(), rabbit_types:info_keys()) ->
- rabbit_types:infos().
--spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
--spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
- [rabbit_types:infos()].
--spec info_all(rabbit_types:vhost(), rabbit_types:info_keys(),
- reference(), pid()) -> 'ok'.
--spec has_for_source(rabbit_types:binding_source()) -> boolean().
--spec remove_for_source(rabbit_types:binding_source()) -> bindings().
--spec remove_for_destination
- (rabbit_types:binding_destination(), boolean()) -> deletions().
--spec remove_transient_for_destination
- (rabbit_types:binding_destination()) -> deletions().
--spec process_deletions(deletions(), rabbit_types:username()) -> rabbit_misc:thunk('ok').
--spec combine_deletions(deletions(), deletions()) -> deletions().
--spec add_deletion
- (rabbit_exchange:name(),
- {'undefined' | rabbit_types:exchange(),
- 'deleted' | 'not_deleted',
- bindings()},
- deletions()) ->
- deletions().
--spec new_deletions() -> deletions().
-
%%----------------------------------------------------------------------------
-define(INFO_KEYS, [source_name, source_kind,
@@ -111,6 +70,10 @@
vhost]).
%% Global table recovery
+
+-spec recover([rabbit_exchange:name()], [rabbit_amqqueue:name()]) ->
+ 'ok'.
+
recover() ->
rabbit_misc:table_filter(
fun (Route) ->
@@ -164,6 +127,8 @@ recover_semi_durable_route_txn(R = #route{binding = B}, X) ->
(Serial, false) -> x_callback(Serial, X, add_binding, B)
end).
+-spec exists(rabbit_types:binding()) -> boolean() | bind_errors().
+
exists(#binding{source = ?DEFAULT_EXCHANGE(_),
destination = #resource{kind = queue, name = QName} = Queue,
key = QName,
@@ -178,8 +143,12 @@ exists(Binding) ->
rabbit_misc:const(mnesia:read({rabbit_route, B}) /= [])
end, fun not_found_or_absent_errs/1).
+-spec add(rabbit_types:binding(), rabbit_types:username()) -> bind_res().
+
add(Binding, ActingUser) -> add(Binding, fun (_Src, _Dst) -> ok end, ActingUser).
+-spec add(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res().
+
add(Binding, InnerFun, ActingUser) ->
binding_action(
Binding,
@@ -224,8 +193,12 @@ add(Src, Dst, B, ActingUser) ->
true -> rabbit_misc:const({error, binding_not_found})
end.
+-spec remove(rabbit_types:binding()) -> bind_res().
+
remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end, ?INTERNAL_USER).
+-spec remove(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res().
+
remove(Binding, InnerFun, ActingUser) ->
binding_action(
Binding,
@@ -269,6 +242,8 @@ remove_default_exchange_binding_rows_of(Dst = #resource{}) ->
end,
ok.
+-spec list(rabbit_types:vhost()) -> bindings().
+
list(VHostPath) ->
VHostResource = rabbit_misc:r(VHostPath, '_'),
Route = #route{binding = #binding{source = VHostResource,
@@ -284,6 +259,9 @@ list(VHostPath) ->
end, AllBindings),
implicit_bindings(VHostPath) ++ Filtered.
+-spec list_for_source
+ (rabbit_types:binding_source()) -> bindings().
+
list_for_source(?DEFAULT_EXCHANGE(VHostPath)) ->
implicit_bindings(VHostPath);
list_for_source(SrcName) ->
@@ -294,6 +272,9 @@ list_for_source(SrcName) ->
<- mnesia:match_object(rabbit_route, Route, read)]
end).
+-spec list_for_destination
+ (rabbit_types:binding_destination()) -> bindings().
+
list_for_destination(DstName) ->
implicit_for_destination(DstName) ++
mnesia:async_dirty(
@@ -324,6 +305,10 @@ implicit_for_destination(DstQueue = #resource{kind = queue,
implicit_for_destination(_) ->
[].
+-spec list_for_source_and_destination
+ (rabbit_types:binding_source(), rabbit_types:binding_destination()) ->
+ bindings().
+
list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath),
#resource{kind = queue,
virtual_host = VHostPath,
@@ -342,6 +327,8 @@ list_for_source_and_destination(SrcName, DstName) ->
Route, read)]
end).
+-spec info_keys() -> rabbit_types:info_keys().
+
info_keys() -> ?INFO_KEYS.
map(VHostPath, F) ->
@@ -360,18 +347,33 @@ i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
i(arguments, #binding{args = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
+-spec info(rabbit_types:binding()) -> rabbit_types:infos().
+
info(B = #binding{}) -> infos(?INFO_KEYS, B).
+-spec info(rabbit_types:binding(), rabbit_types:info_keys()) ->
+ rabbit_types:infos().
+
info(B = #binding{}, Items) -> infos(Items, B).
+-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
+
info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end).
+-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
+ [rabbit_types:infos()].
+
info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end).
+-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys(),
+ reference(), pid()) -> 'ok'.
+
info_all(VHostPath, Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref, fun(B) -> info(B, Items) end, list(VHostPath)).
+-spec has_for_source(rabbit_types:binding_source()) -> boolean().
+
has_for_source(SrcName) ->
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
%% we need to check for semi-durable routes (which subsumes
@@ -381,6 +383,8 @@ has_for_source(SrcName) ->
contains(rabbit_route, Match) orelse
contains(rabbit_semi_durable_route, Match).
+-spec remove_for_source(rabbit_types:binding_source()) -> bindings().
+
remove_for_source(SrcName) ->
lock_resource(SrcName),
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
@@ -389,9 +393,15 @@ remove_for_source(SrcName) ->
mnesia:dirty_match_object(rabbit_route, Match) ++
mnesia:dirty_match_object(rabbit_semi_durable_route, Match))).
+-spec remove_for_destination
+ (rabbit_types:binding_destination(), boolean()) -> deletions().
+
remove_for_destination(DstName, OnlyDurable) ->
remove_for_destination(DstName, OnlyDurable, fun remove_routes/1).
+-spec remove_transient_for_destination
+ (rabbit_types:binding_destination()) -> deletions().
+
remove_transient_for_destination(DstName) ->
remove_for_destination(DstName, false, fun remove_transient_routes/1).
@@ -597,12 +607,24 @@ anything_but( NotThis, NotThis, This) -> This;
anything_but( NotThis, This, NotThis) -> This;
anything_but(_NotThis, This, This) -> This.
+-spec new_deletions() -> deletions().
+
new_deletions() -> dict:new().
+-spec add_deletion
+ (rabbit_exchange:name(),
+ {'undefined' | rabbit_types:exchange(),
+ 'deleted' | 'not_deleted',
+ bindings()},
+ deletions()) ->
+ deletions().
+
add_deletion(XName, Entry, Deletions) ->
dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end,
Entry, Deletions).
+-spec combine_deletions(deletions(), deletions()) -> deletions().
+
combine_deletions(Deletions1, Deletions2) ->
dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end,
Deletions1, Deletions2).
@@ -612,6 +634,8 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
+-spec process_deletions(deletions(), rabbit_types:username()) -> rabbit_misc:thunk('ok').
+
process_deletions(Deletions, ActingUser) ->
AugmentedDeletions =
dict:map(fun (_XName, {X, deleted, Bindings}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3f199a4a00..ffd7c8fabc 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -67,6 +67,9 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
+
+-deprecated([{force_event_refresh, 1, eventually}]).
+
%% Internal
-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
-export([get_vhost/1, get_user/1]).
@@ -222,42 +225,13 @@
-type channel() :: #ch{}.
+%%----------------------------------------------------------------------------
+
-spec start_link
(channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
pid(), pid()) ->
rabbit_types:ok_pid_or_error().
--spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
--spec do
- (pid(), rabbit_framing:amqp_method_record(),
- rabbit_types:maybe(rabbit_types:content())) ->
- 'ok'.
--spec do_flow
- (pid(), rabbit_framing:amqp_method_record(),
- rabbit_types:maybe(rabbit_types:content())) ->
- 'ok'.
--spec flush(pid()) -> 'ok'.
--spec shutdown(pid()) -> 'ok'.
--spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
--spec deliver
- (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'.
--spec deliver_reply(binary(), rabbit_types:delivery()) -> 'ok'.
--spec deliver_reply_local(pid(), binary(), rabbit_types:delivery()) -> 'ok'.
--spec send_credit_reply(pid(), non_neg_integer()) -> 'ok'.
--spec send_drained(pid(), [{rabbit_types:ctag(), non_neg_integer()}]) -> 'ok'.
--spec list() -> [pid()].
--spec list_local() -> [pid()].
--spec info_keys() -> rabbit_types:info_keys().
--spec info(pid()) -> rabbit_types:infos().
--spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
--spec info_all() -> [rabbit_types:infos()].
--spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()].
--spec refresh_config_local() -> 'ok'.
--spec ready_for_close(pid()) -> 'ok'.
--deprecated([{force_event_refresh, 1, eventually}]).
--spec force_event_refresh(reference()) -> 'ok'.
-
-%%----------------------------------------------------------------------------
start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
VHost, Capabilities, CollectorPid, Limiter) ->
@@ -265,27 +239,50 @@ start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, CollectorPid, Limiter], []).
+-spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
+
do(Pid, Method) ->
rabbit_channel_common:do(Pid, Method).
+-spec do
+ (pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:maybe(rabbit_types:content())) ->
+ 'ok'.
+
do(Pid, Method, Content) ->
rabbit_channel_common:do(Pid, Method, Content).
+-spec do_flow
+ (pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:maybe(rabbit_types:content())) ->
+ 'ok'.
+
do_flow(Pid, Method, Content) ->
rabbit_channel_common:do_flow(Pid, Method, Content).
+-spec flush(pid()) -> 'ok'.
+
flush(Pid) ->
gen_server2:call(Pid, flush, infinity).
+-spec shutdown(pid()) -> 'ok'.
+
shutdown(Pid) ->
gen_server2:cast(Pid, terminate).
+-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
+
send_command(Pid, Msg) ->
gen_server2:cast(Pid, {command, Msg}).
+-spec deliver
+ (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'.
+
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
+-spec deliver_reply(binary(), rabbit_types:delivery()) -> 'ok'.
+
deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) ->
case decode_fast_reply_to(Rest) of
{ok, Pid, Key} ->
@@ -297,6 +294,9 @@ deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) ->
%% We want to ensure people can't use this mechanism to send a message
%% to an arbitrary process and kill it!
+
+-spec deliver_reply_local(pid(), binary(), rabbit_types:delivery()) -> 'ok'.
+
deliver_reply_local(Pid, Key, Delivery) ->
case pg_local:in_group(rabbit_channels, Pid) of
true -> gen_server2:cast(Pid, {deliver_reply, Key, Delivery});
@@ -325,21 +325,33 @@ decode_fast_reply_to(Rest) ->
_ -> error
end.
+-spec send_credit_reply(pid(), non_neg_integer()) -> 'ok'.
+
send_credit_reply(Pid, Len) ->
gen_server2:cast(Pid, {send_credit_reply, Len}).
+-spec send_drained(pid(), [{rabbit_types:ctag(), non_neg_integer()}]) -> 'ok'.
+
send_drained(Pid, CTagCredit) ->
gen_server2:cast(Pid, {send_drained, CTagCredit}).
+-spec list() -> [pid()].
+
list() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_channel, list_local, []).
+-spec list_local() -> [pid()].
+
list_local() ->
pg_local:get_members(rabbit_channels).
+-spec info_keys() -> rabbit_types:info_keys().
+
info_keys() -> ?INFO_KEYS.
+-spec info(pid()) -> rabbit_types:infos().
+
info(Pid) ->
{Timeout, Deadline} = get_operation_timeout_and_deadline(),
try
@@ -353,6 +365,8 @@ info(Pid) ->
throw(timeout)
end.
+-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
+
info(Pid, Items) ->
{Timeout, Deadline} = get_operation_timeout_and_deadline(),
try
@@ -366,9 +380,13 @@ info(Pid, Items) ->
throw(timeout)
end.
+-spec info_all() -> [rabbit_types:infos()].
+
info_all() ->
rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()).
+-spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()].
+
info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
@@ -386,6 +404,8 @@ emit_info(PidList, InfoItems, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map_with_exit_handler(
AggregatorPid, Ref, fun(C) -> info(C, InfoItems) end, PidList).
+-spec refresh_config_local() -> 'ok'.
+
refresh_config_local() ->
rabbit_misc:upmap(
fun (C) ->
@@ -414,9 +434,13 @@ refresh_interceptors() ->
list_local()),
ok.
+-spec ready_for_close(pid()) -> 'ok'.
+
ready_for_close(Pid) ->
rabbit_channel_common:ready_for_close(Pid).
+-spec force_event_refresh(reference()) -> 'ok'.
+
force_event_refresh(Ref) ->
[gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()],
ok.
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index e2e24e2f38..86a0f15650 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -47,12 +47,12 @@
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
rabbit_framing:amqp_table(), pid()}.
--spec start_link(start_link_args()) -> {'ok', pid(), {pid(), any()}}.
-
-define(FAIR_WAIT, 70000).
%%----------------------------------------------------------------------------
+-spec start_link(start_link_args()) -> {'ok', pid(), {pid(), any()}}.
+
start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
VHost, Capabilities, Collector}) ->
{ok, SupPid} = supervisor2:start_link(
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index d39e3f3e9b..b813b42e89 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -32,14 +32,13 @@
%%----------------------------------------------------------------------------
-spec start_link() -> rabbit_types:ok_pid_or_error().
--spec start_channel(pid(), rabbit_channel_sup:start_link_args()) ->
- {'ok', pid(), {pid(), any()}}.
-
-%%----------------------------------------------------------------------------
start_link() ->
supervisor2:start_link(?MODULE, []).
+-spec start_channel(pid(), rabbit_channel_sup:start_link_args()) ->
+ {'ok', pid(), {pid(), any()}}.
+
start_channel(Pid, Args) ->
supervisor2:start_child(Pid, [Args]).
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index 982c1d1e62..8b49244982 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -28,19 +28,19 @@
-spec start_link(rabbit_types:mfargs()) ->
rabbit_types:ok_pid_or_error().
--spec start_link({'local', atom()}, rabbit_types:mfargs()) ->
- rabbit_types:ok_pid_or_error().
--spec start_link_worker({'local', atom()}, rabbit_types:mfargs()) ->
- rabbit_types:ok_pid_or_error().
-
-%%----------------------------------------------------------------------------
start_link(Callback) ->
supervisor2:start_link(?MODULE, Callback).
+-spec start_link({'local', atom()}, rabbit_types:mfargs()) ->
+ rabbit_types:ok_pid_or_error().
+
start_link(SupName, Callback) ->
supervisor2:start_link(SupName, ?MODULE, Callback).
+-spec start_link_worker({'local', atom()}, rabbit_types:mfargs()) ->
+ rabbit_types:ok_pid_or_error().
+
start_link_worker(SupName, Callback) ->
supervisor2:start_link(SupName, ?MODULE, {Callback, worker}).
diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl
index 8c4ba88da2..c5a0825d83 100644
--- a/src/rabbit_connection_helper_sup.erl
+++ b/src/rabbit_connection_helper_sup.erl
@@ -38,21 +38,21 @@
%%----------------------------------------------------------------------------
-spec start_link() -> rabbit_types:ok_pid_or_error().
--spec start_channel_sup_sup(pid()) -> rabbit_types:ok_pid_or_error().
--spec start_queue_collector(pid(), rabbit_types:proc_name()) ->
- rabbit_types:ok_pid_or_error().
-
-%%----------------------------------------------------------------------------
start_link() ->
supervisor2:start_link(?MODULE, []).
+-spec start_channel_sup_sup(pid()) -> rabbit_types:ok_pid_or_error().
+
start_channel_sup_sup(SupPid) ->
supervisor2:start_child(
SupPid,
{channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}).
+-spec start_queue_collector(pid(), rabbit_types:proc_name()) ->
+ rabbit_types:ok_pid_or_error().
+
start_queue_collector(SupPid, Identity) ->
supervisor2:start_child(
SupPid,
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 6e85514d44..51661dd8b6 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -38,9 +38,6 @@
-spec start_link(any(), rabbit_net:socket(), module(), any()) ->
{'ok', pid(), pid()}.
--spec reader(pid()) -> pid().
-
-%%--------------------------------------------------------------------------
start_link(Ref, _Sock, _Transport, _Opts) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
@@ -66,6 +63,8 @@ start_link(Ref, _Sock, _Transport, _Opts) ->
intrinsic, ?WORKER_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
+-spec reader(pid()) -> pid().
+
reader(Pid) ->
hd(supervisor2:find_child(Pid, reader)).
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index d4c065e64f..586913ea2c 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -19,12 +19,12 @@
interval
}).
--spec start_link() -> rabbit_types:ok_pid_or_error().
-
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
+-spec start_link() -> rabbit_types:ok_pid_or_error().
+
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
diff --git a/src/rabbit_credential_validation.erl b/src/rabbit_credential_validation.erl
index 4b24d35f9d..ec0dea7893 100644
--- a/src/rabbit_credential_validation.erl
+++ b/src/rabbit_credential_validation.erl
@@ -27,8 +27,6 @@
-export([validate/2, backend/0]).
--spec validate(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}.
-
%% Validates a username/password pair by delegating to the effective
%% `rabbit_credential_validator`. Used by `rabbit_auth_backend_internal`.
%% Note that some validators may choose to only validate passwords.
@@ -38,6 +36,8 @@
%% * ok: provided credentials passed validation.
%% * {error, Error, Args}: provided password password failed validation.
+-spec validate(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}.
+
validate(Username, Password) ->
Backend = backend(),
Backend:validate(Username, Password).
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index 71ad8814b1..e26ea8297b 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -25,11 +25,11 @@
-type reason() :: 'expired' | 'rejected' | 'maxlen'.
+%%----------------------------------------------------------------------------
+
-spec publish(rabbit_types:message(), reason(), rabbit_types:exchange(),
'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'.
-%%----------------------------------------------------------------------------
-
publish(Msg, Reason, X, RK, QName) ->
DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName),
Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined),
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index cc2a9f1026..50e8f3d2b0 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -18,6 +18,9 @@
-export([boot/0, force_event_refresh/1, list/0, connect/5,
start_channel/9, disconnect/2]).
+
+-deprecated([{force_event_refresh, 1, eventually}]).
+
%% Internal
-export([list_local/0]).
@@ -29,40 +32,25 @@
%%----------------------------------------------------------------------------
-spec boot() -> 'ok'.
--deprecated([{force_event_refresh, 1, eventually}]).
--spec force_event_refresh(reference()) -> 'ok'.
--spec list() -> [pid()].
--spec list_local() -> [pid()].
--spec connect
- (({'none', 'none'} | {rabbit_types:username(), 'none'} |
- {rabbit_types:username(), rabbit_types:password()}),
- rabbit_types:vhost(), rabbit_types:protocol(), pid(),
- rabbit_event:event_props()) ->
- rabbit_types:ok_or_error2(
- {rabbit_types:user(), rabbit_framing:amqp_table()},
- 'broker_not_found_on_node' |
- {'auth_failure', string()} | 'access_refused').
--spec start_channel
- (rabbit_channel:channel_number(), pid(), pid(), string(),
- rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
- rabbit_framing:amqp_table(), pid()) ->
- {'ok', pid()}.
--spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'.
-
-%%----------------------------------------------------------------------------
boot() -> rabbit_sup:start_supervisor_child(
rabbit_direct_client_sup, rabbit_client_sup,
[{local, rabbit_direct_client_sup},
{rabbit_channel_sup, start_link, []}]).
+-spec force_event_refresh(reference()) -> 'ok'.
+
force_event_refresh(Ref) ->
[Pid ! {force_event_refresh, Ref} || Pid <- list()],
ok.
+-spec list_local() -> [pid()].
+
list_local() ->
pg_local:get_members(rabbit_direct).
+-spec list() -> [pid()].
+
list() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_direct, list_local, []).
@@ -82,6 +70,16 @@ auth_fun({Username, Password}, VHost, ExtraAuthProps) ->
[{password, Password}, {vhost, VHost}] ++ ExtraAuthProps)
end.
+-spec connect
+ (({'none', 'none'} | {rabbit_types:username(), 'none'} |
+ {rabbit_types:username(), rabbit_types:password()}),
+ rabbit_types:vhost(), rabbit_types:protocol(), pid(),
+ rabbit_event:event_props()) ->
+ rabbit_types:ok_or_error2(
+ {rabbit_types:user(), rabbit_framing:amqp_table()},
+ 'broker_not_found_on_node' |
+ {'auth_failure', string()} | 'access_refused').
+
connect(Creds, VHost, Protocol, Pid, Infos) ->
ExtraAuthProps = extract_extra_auth_props(Creds, VHost, Pid, Infos),
AuthFun = auth_fun(Creds, VHost, ExtraAuthProps),
@@ -200,6 +198,12 @@ connect1(User, VHost, Protocol, Pid, Infos) ->
{error, Reason}
end.
+-spec start_channel
+ (rabbit_channel:channel_number(), pid(), pid(), string(),
+ rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
+ rabbit_framing:amqp_table(), pid()) ->
+ {'ok', pid()}.
+
start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
VHost, Capabilities, Collector) ->
{ok, _, {ChannelPid, _}} =
@@ -209,6 +213,8 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
User, VHost, Capabilities, Collector}]),
{ok, ChannelPid}.
+-spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'.
+
disconnect(Pid, Infos) ->
pg_local:leave(rabbit_direct, Pid),
rabbit_core_metrics:connection_closed(Pid),
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 3ba1f35a6b..f48b0001fb 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -76,37 +76,43 @@
%%----------------------------------------------------------------------------
-type disk_free_limit() :: (integer() | string() | {'mem_relative', float() | integer()}).
--spec start_link(disk_free_limit()) -> rabbit_types:ok_pid_or_error().
--spec get_disk_free_limit() -> integer().
--spec set_disk_free_limit(disk_free_limit()) -> 'ok'.
--spec get_min_check_interval() -> integer().
--spec set_min_check_interval(integer()) -> 'ok'.
--spec get_max_check_interval() -> integer().
--spec set_max_check_interval(integer()) -> 'ok'.
--spec get_disk_free() -> (integer() | 'unknown').
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
+-spec get_disk_free_limit() -> integer().
+
get_disk_free_limit() ->
gen_server:call(?MODULE, get_disk_free_limit, infinity).
+-spec set_disk_free_limit(disk_free_limit()) -> 'ok'.
+
set_disk_free_limit(Limit) ->
gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity).
+-spec get_min_check_interval() -> integer().
+
get_min_check_interval() ->
gen_server:call(?MODULE, get_min_check_interval, infinity).
+-spec set_min_check_interval(integer()) -> 'ok'.
+
set_min_check_interval(Interval) ->
gen_server:call(?MODULE, {set_min_check_interval, Interval}, infinity).
+-spec get_max_check_interval() -> integer().
+
get_max_check_interval() ->
gen_server:call(?MODULE, get_max_check_interval, infinity).
+-spec set_max_check_interval(integer()) -> 'ok'.
+
set_max_check_interval(Interval) ->
gen_server:call(?MODULE, {set_max_check_interval, Interval}, infinity).
+-spec get_disk_free() -> (integer() | 'unknown').
+
get_disk_free() ->
gen_server:call(?MODULE, get_disk_free, infinity).
@@ -114,6 +120,8 @@ get_disk_free() ->
%% gen_server callbacks
%%----------------------------------------------------------------------------
+-spec start_link(disk_free_limit()) -> rabbit_types:ok_pid_or_error().
+
start_link(Args) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
diff --git a/src/rabbit_epmd_monitor.erl b/src/rabbit_epmd_monitor.erl
index 15c7489e99..fb13f61ea3 100644
--- a/src/rabbit_epmd_monitor.erl
+++ b/src/rabbit_epmd_monitor.erl
@@ -29,10 +29,6 @@
-define(CHECK_FREQUENCY, 60000).
%%----------------------------------------------------------------------------
-
--spec start_link() -> rabbit_types:ok_pid_or_error().
-
-%%----------------------------------------------------------------------------
%% It's possible for epmd to be killed out from underneath us. If that
%% happens, then obviously clustering and rabbitmqctl stop
%% working. This process checks up on epmd and restarts it /
@@ -48,6 +44,8 @@
%% epmd" as a shutdown or uninstall step.
%% ----------------------------------------------------------------------------
+-spec start_link() -> rabbit_types:ok_pid_or_error().
+
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 2480e7c3d0..1b262e9a48 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,77 +36,13 @@
-type type() :: atom().
-type fun_name() :: atom().
--spec recover(rabbit_types:vhost()) -> [name()].
--spec callback
- (rabbit_types:exchange(), fun_name(),
- fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'.
--spec policy_changed
- (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
--spec declare
- (name(), type(), boolean(), boolean(), boolean(),
- rabbit_framing:amqp_table(), rabbit_types:username())
- -> rabbit_types:exchange().
--spec check_type
- (binary()) -> atom() | rabbit_types:connection_exit().
--spec assert_equivalence
- (rabbit_types:exchange(), atom(), boolean(), boolean(), boolean(),
- rabbit_framing:amqp_table())
- -> 'ok' | rabbit_types:connection_exit().
--spec assert_args_equivalence
- (rabbit_types:exchange(), rabbit_framing:amqp_table())
- -> 'ok' | rabbit_types:connection_exit().
--spec lookup
- (name()) -> rabbit_types:ok(rabbit_types:exchange()) |
- rabbit_types:error('not_found').
--spec lookup_or_die
- (name()) -> rabbit_types:exchange() |
- rabbit_types:channel_exit().
--spec list() -> [rabbit_types:exchange()].
--spec list_names() -> [rabbit_exchange:name()].
--spec list(rabbit_types:vhost()) -> [rabbit_types:exchange()].
--spec lookup_scratch(name(), atom()) ->
- rabbit_types:ok(term()) |
- rabbit_types:error('not_found').
--spec update_scratch(name(), atom(), fun((any()) -> any())) -> 'ok'.
--spec update
- (name(),
- fun((rabbit_types:exchange()) -> rabbit_types:exchange()))
- -> not_found | rabbit_types:exchange().
--spec update_decorators(name()) -> 'ok'.
--spec immutable(rabbit_types:exchange()) -> rabbit_types:exchange().
--spec info_keys() -> rabbit_types:info_keys().
--spec info(rabbit_types:exchange()) -> rabbit_types:infos().
--spec info
- (rabbit_types:exchange(), rabbit_types:info_keys())
- -> rabbit_types:infos().
--spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
--spec info_all(rabbit_types:vhost(), rabbit_types:info_keys())
- -> [rabbit_types:infos()].
--spec info_all(rabbit_types:vhost(), rabbit_types:info_keys(),
- reference(), pid())
- -> 'ok'.
--spec route(rabbit_types:exchange(), rabbit_types:delivery())
- -> [rabbit_amqqueue:name()].
--spec delete
- (name(), 'true', rabbit_types:username()) ->
- 'ok'| rabbit_types:error('not_found' | 'in_use');
- (name(), 'false', rabbit_types:username()) ->
- 'ok' | rabbit_types:error('not_found').
--spec validate_binding
- (rabbit_types:exchange(), rabbit_types:binding())
- -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}).
--spec maybe_auto_delete
- (rabbit_types:exchange(), boolean())
- -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}.
--spec serial(rabbit_types:exchange()) ->
- fun((boolean()) -> 'none' | pos_integer()).
--spec peek_serial(name()) -> pos_integer() | 'undefined'.
-
%%----------------------------------------------------------------------------
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
policy, user_who_performed_action]).
+-spec recover(rabbit_types:vhost()) -> [name()].
+
recover(VHost) ->
Xs = rabbit_misc:table_filter(
fun (#exchange{name = XName}) ->
@@ -123,6 +59,10 @@ recover(VHost) ->
rabbit_durable_exchange),
[XName || #exchange{name = XName} <- Xs].
+-spec callback
+ (rabbit_types:exchange(), fun_name(),
+ fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'.
+
callback(X = #exchange{type = XType,
decorators = Decorators}, Fun, Serial0, Args) ->
Serial = if is_function(Serial0) -> Serial0;
@@ -133,6 +73,9 @@ callback(X = #exchange{type = XType,
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
+-spec policy_changed
+ (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
+
policy_changed(X = #exchange{type = XType,
decorators = Decorators},
X1 = #exchange{decorators = Decorators1}) ->
@@ -147,6 +90,9 @@ serialise_events(X = #exchange{type = Type, decorators = Decorators}) ->
rabbit_exchange_decorator:select(all, Decorators))
orelse (type_to_module(Type)):serialise_events().
+-spec serial(rabbit_types:exchange()) ->
+ fun((boolean()) -> 'none' | pos_integer()).
+
serial(#exchange{name = XName} = X) ->
Serial = case serialise_events(X) of
true -> next_serial(XName);
@@ -156,6 +102,11 @@ serial(#exchange{name = XName} = X) ->
(false) -> none
end.
+-spec declare
+ (name(), type(), boolean(), boolean(), boolean(),
+ rabbit_framing:amqp_table(), rabbit_types:username())
+ -> rabbit_types:exchange().
+
declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
X = rabbit_exchange_decorator:set(
rabbit_policy:set(#exchange{name = XName,
@@ -218,6 +169,10 @@ store_ram(X) ->
X1.
%% Used with binaries sent over the wire; the type may not exist.
+
+-spec check_type
+ (binary()) -> atom() | rabbit_types:connection_exit().
+
check_type(TypeBin) ->
case rabbit_registry:binary_to_type(TypeBin) of
{error, not_found} ->
@@ -232,6 +187,11 @@ check_type(TypeBin) ->
end
end.
+-spec assert_equivalence
+ (rabbit_types:exchange(), atom(), boolean(), boolean(), boolean(),
+ rabbit_framing:amqp_table())
+ -> 'ok' | rabbit_types:connection_exit().
+
assert_equivalence(X = #exchange{ name = XName,
durable = Durable,
auto_delete = AutoDelete,
@@ -245,6 +205,10 @@ assert_equivalence(X = #exchange{ name = XName,
AFE(Internal, ReqInternal, XName, internal),
(type_to_module(Type)):assert_args_equivalence(X, ReqArgs).
+-spec assert_args_equivalence
+ (rabbit_types:exchange(), rabbit_framing:amqp_table())
+ -> 'ok' | rabbit_types:connection_exit().
+
assert_args_equivalence(#exchange{ name = Name, arguments = Args },
RequiredArgs) ->
%% The spec says "Arguments are compared for semantic
@@ -253,21 +217,36 @@ assert_args_equivalence(#exchange{ name = Name, arguments = Args },
rabbit_misc:assert_args_equivalence(Args, RequiredArgs, Name,
[<<"alternate-exchange">>]).
+-spec lookup
+ (name()) -> rabbit_types:ok(rabbit_types:exchange()) |
+ rabbit_types:error('not_found').
+
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_exchange, Name}).
+-spec lookup_or_die
+ (name()) -> rabbit_types:exchange() |
+ rabbit_types:channel_exit().
+
lookup_or_die(Name) ->
case lookup(Name) of
{ok, X} -> X;
{error, not_found} -> rabbit_amqqueue:not_found(Name)
end.
+-spec list() -> [rabbit_types:exchange()].
+
list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}).
+-spec list_names() -> [rabbit_exchange:name()].
+
list_names() -> mnesia:dirty_all_keys(rabbit_exchange).
%% Not dirty_match_object since that would not be transactional when used in a
%% tx context
+
+-spec list(rabbit_types:vhost()) -> [rabbit_types:exchange()].
+
list(VHostPath) ->
mnesia:async_dirty(
fun () ->
@@ -277,6 +256,10 @@ list(VHostPath) ->
read)
end).
+-spec lookup_scratch(name(), atom()) ->
+ rabbit_types:ok(term()) |
+ rabbit_types:error('not_found').
+
lookup_scratch(Name, App) ->
case lookup(Name) of
{ok, #exchange{scratches = undefined}} ->
@@ -290,6 +273,8 @@ lookup_scratch(Name, App) ->
{error, not_found}
end.
+-spec update_scratch(name(), atom(), fun((any()) -> any())) -> 'ok'.
+
update_scratch(Name, App, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
@@ -310,6 +295,8 @@ update_scratch(Name, App, Fun) ->
ok
end).
+-spec update_decorators(name()) -> 'ok'.
+
update_decorators(Name) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
@@ -320,6 +307,11 @@ update_decorators(Name) ->
end
end).
+-spec update
+ (name(),
+ fun((rabbit_types:exchange()) -> rabbit_types:exchange()))
+ -> not_found | rabbit_types:exchange().
+
update(Name, Fun) ->
case mnesia:wread({rabbit_exchange, Name}) of
[X] -> X1 = Fun(X),
@@ -327,10 +319,14 @@ update(Name, Fun) ->
[] -> not_found
end.
+-spec immutable(rabbit_types:exchange()) -> rabbit_types:exchange().
+
immutable(X) -> X#exchange{scratches = none,
policy = none,
decorators = none}.
+-spec info_keys() -> rabbit_types:info_keys().
+
info_keys() -> ?INFO_KEYS.
map(VHostPath, F) ->
@@ -358,20 +354,38 @@ i(Item, #exchange{type = Type} = X) ->
[] -> throw({bad_argument, Item})
end.
+-spec info(rabbit_types:exchange()) -> rabbit_types:infos().
+
info(X = #exchange{type = Type}) ->
infos(?INFO_KEYS, X) ++ (type_to_module(Type)):info(X).
+-spec info
+ (rabbit_types:exchange(), rabbit_types:info_keys())
+ -> rabbit_types:infos().
+
info(X = #exchange{type = _Type}, Items) ->
infos(Items, X).
+-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
+
info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
+-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys())
+ -> [rabbit_types:infos()].
+
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
+-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys(),
+ reference(), pid())
+ -> 'ok'.
+
info_all(VHostPath, Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref, fun(X) -> info(X, Items) end, list(VHostPath)).
+-spec route(rabbit_types:exchange(), rabbit_types:delivery())
+ -> [rabbit_amqqueue:name()].
+
route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName,
decorators = Decorators} = X,
#delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
@@ -447,6 +461,12 @@ call_with_exchange(XName, Fun) ->
end
end).
+-spec delete
+ (name(), 'true', rabbit_types:username()) ->
+ 'ok'| rabbit_types:error('not_found' | 'in_use');
+ (name(), 'false', rabbit_types:username()) ->
+ 'ok' | rabbit_types:error('not_found').
+
delete(XName, IfUnused, Username) ->
Fun = case IfUnused of
true -> fun conditional_delete/2;
@@ -478,10 +498,18 @@ delete(XName, IfUnused, Username) ->
XName#resource.name, Username)
end.
+-spec validate_binding
+ (rabbit_types:exchange(), rabbit_types:binding())
+ -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}).
+
validate_binding(X = #exchange{type = XType}, Binding) ->
Module = type_to_module(XType),
Module:validate_binding(X, Binding).
+-spec maybe_auto_delete
+ (rabbit_types:exchange(), boolean())
+ -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}.
+
maybe_auto_delete(#exchange{auto_delete = false}, _OnlyDurable) ->
not_deleted;
maybe_auto_delete(#exchange{auto_delete = true} = X, OnlyDurable) ->
@@ -516,6 +544,8 @@ next_serial(XName) ->
#exchange_serial{name = XName, next = Serial + 1}, write),
Serial.
+-spec peek_serial(name()) -> pos_integer() | 'undefined'.
+
peek_serial(XName) -> peek_serial(XName, read).
peek_serial(XName, LockType) ->
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 4d6d49ef58..8b94bd343b 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -33,10 +33,6 @@
{requires, rabbit_registry},
{enables, kernel_ready}]}).
--spec headers_match
- (rabbit_framing:amqp_table(), rabbit_framing:amqp_table()) ->
- boolean().
-
info(_X) -> [].
info(_X, _) -> [].
@@ -84,6 +80,11 @@ parse_x_match(_) -> all. %% legacy; we didn't validate
%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
%%
+
+-spec headers_match
+ (rabbit_framing:amqp_table(), rabbit_framing:amqp_table()) ->
+ boolean().
+
headers_match(Args, Data) ->
MK = parse_x_match(rabbit_misc:table_lookup(Args, <<"x-match">>)),
headers_match(Args, Data, true, false, MK).
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 557bc6f02a..462ddb4e11 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -34,31 +34,10 @@
-type ok_or_error() :: rabbit_types:ok_or_error(any()).
--spec is_file((file:filename())) -> boolean().
--spec is_dir((file:filename())) -> boolean().
--spec file_size((file:filename())) -> non_neg_integer().
--spec ensure_dir((file:filename())) -> ok_or_error().
--spec wildcard(string(), file:filename()) -> [file:filename()].
--spec list_dir(file:filename()) ->
- rabbit_types:ok_or_error2([file:filename()], any()).
--spec read_term_file
- (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any()).
--spec write_term_file(file:filename(), [any()]) -> ok_or_error().
--spec write_file(file:filename(), iodata()) -> ok_or_error().
--spec write_file(file:filename(), iodata(), [any()]) -> ok_or_error().
--spec append_file(file:filename(), string()) -> ok_or_error().
--spec ensure_parent_dirs_exist(string()) -> 'ok'.
--spec rename(file:filename(), file:filename()) -> ok_or_error().
--spec delete([file:filename()]) -> ok_or_error().
--spec recursive_delete([file:filename()]) ->
- rabbit_types:ok_or_error({file:filename(), any()}).
--spec recursive_copy(file:filename(), file:filename()) ->
- rabbit_types:ok_or_error({file:filename(), file:filename(), any()}).
--spec lock_file(file:filename()) -> rabbit_types:ok_or_error('eexist').
--spec filename_as_a_directory(file:filename()) -> file:filename().
-
%%----------------------------------------------------------------------------
+-spec is_file((file:filename())) -> boolean().
+
is_file(File) ->
case read_file_info(File) of
{ok, #file_info{type=regular}} -> true;
@@ -66,6 +45,8 @@ is_file(File) ->
_ -> false
end.
+-spec is_dir((file:filename())) -> boolean().
+
is_dir(Dir) -> is_dir_internal(read_file_info(Dir)).
is_dir_no_handle(Dir) -> is_dir_internal(prim_file:read_file_info(Dir)).
@@ -73,12 +54,16 @@ is_dir_no_handle(Dir) -> is_dir_internal(prim_file:read_file_info(Dir)).
is_dir_internal({ok, #file_info{type=directory}}) -> true;
is_dir_internal(_) -> false.
+-spec file_size((file:filename())) -> non_neg_integer().
+
file_size(File) ->
case read_file_info(File) of
{ok, #file_info{size=Size}} -> Size;
_ -> 0
end.
+-spec ensure_dir((file:filename())) -> ok_or_error().
+
ensure_dir(File) -> with_handle(fun () -> ensure_dir_internal(File) end).
ensure_dir_internal("/") ->
@@ -91,6 +76,8 @@ ensure_dir_internal(File) ->
prim_file:make_dir(Dir)
end.
+-spec wildcard(string(), file:filename()) -> [file:filename()].
+
wildcard(Pattern, Dir) ->
case list_dir(Dir) of
{ok, Files} -> {ok, RE} = re:compile(Pattern, [anchored]),
@@ -99,11 +86,17 @@ wildcard(Pattern, Dir) ->
{error, _} -> []
end.
+-spec list_dir(file:filename()) ->
+ rabbit_types:ok_or_error2([file:filename()], any()).
+
list_dir(Dir) -> with_handle(fun () -> prim_file:list_dir(Dir) end).
read_file_info(File) ->
with_handle(fun () -> prim_file:read_file_info(File) end).
+-spec read_term_file
+ (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any()).
+
read_term_file(File) ->
try
{ok, Data} = with_handle(fun () -> prim_file:read_file(File) end),
@@ -124,12 +117,18 @@ group_tokens(Cur, []) -> [Cur];
group_tokens(Cur, [T = {dot, _} | Ts]) -> [[T | Cur] | group_tokens([], Ts)];
group_tokens(Cur, [T | Ts]) -> group_tokens([T | Cur], Ts).
+-spec write_term_file(file:filename(), [any()]) -> ok_or_error().
+
write_term_file(File, Terms) ->
write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
Term <- Terms])).
+-spec write_file(file:filename(), iodata()) -> ok_or_error().
+
write_file(Path, Data) -> write_file(Path, Data, []).
+-spec write_file(file:filename(), iodata(), [any()]) -> ok_or_error().
+
write_file(Path, Data, Modes) ->
Modes1 = [binary, write | (Modes -- [binary, write])],
case make_binary(Data) of
@@ -185,6 +184,9 @@ with_synced_copy(Path, Modes, Fun) ->
end.
%% TODO the semantics of this function are rather odd. But see bug 25021.
+
+-spec append_file(file:filename(), string()) -> ok_or_error().
+
append_file(File, Suffix) ->
case read_file_info(File) of
{ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
@@ -209,6 +211,8 @@ append_file(File, _, Suffix) ->
Error -> Error
end.
+-spec ensure_parent_dirs_exist(string()) -> 'ok'.
+
ensure_parent_dirs_exist(Filename) ->
case ensure_dir(Filename) of
ok -> ok;
@@ -216,10 +220,17 @@ ensure_parent_dirs_exist(Filename) ->
throw({error, {cannot_create_parent_dirs, Filename, Reason}})
end.
+-spec rename(file:filename(), file:filename()) -> ok_or_error().
+
rename(Old, New) -> with_handle(fun () -> prim_file:rename(Old, New) end).
+-spec delete([file:filename()]) -> ok_or_error().
+
delete(File) -> with_handle(fun () -> prim_file:delete(File) end).
+-spec recursive_delete([file:filename()]) ->
+ rabbit_types:ok_or_error({file:filename(), any()}).
+
recursive_delete(Files) ->
with_handle(
fun () -> lists:foldl(fun (Path, ok) -> recursive_delete1(Path);
@@ -262,6 +273,9 @@ is_symlink_no_handle(File) ->
_ -> false
end.
+-spec recursive_copy(file:filename(), file:filename()) ->
+ rabbit_types:ok_or_error({file:filename(), file:filename(), any()}).
+
recursive_copy(Src, Dest) ->
%% Note that this uses the 'file' module and, hence, shouldn't be
%% run on many processes at once.
@@ -293,6 +307,9 @@ recursive_copy(Src, Dest) ->
%% TODO: When we stop supporting Erlang prior to R14, this should be
%% replaced with file:open [write, exclusive]
+
+-spec lock_file(file:filename()) -> rabbit_types:ok_or_error('eexist').
+
lock_file(Path) ->
case is_file(Path) of
true -> {error, eexist};
@@ -302,6 +319,8 @@ lock_file(Path) ->
end)
end.
+-spec filename_as_a_directory(file:filename()) -> file:filename().
+
filename_as_a_directory(FileName) ->
case lists:last(FileName) of
"/" ->
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 386702e86b..6f03a1a04f 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -36,15 +36,10 @@
-type guid() :: binary().
--spec start_link() -> rabbit_types:ok_pid_or_error().
--spec filename() -> string().
--spec gen() -> guid().
--spec gen_secure() -> guid().
--spec string(guid(), any()) -> string().
--spec binary(guid(), any()) -> binary().
-
%%----------------------------------------------------------------------------
+-spec start_link() -> rabbit_types:ok_pid_or_error().
+
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE,
[update_disk_serial()], []).
@@ -52,6 +47,9 @@ start_link() ->
%% We use this to detect a (possibly rather old) Mnesia directory,
%% since it has existed since at least 1.7.0 (as far back as I cared
%% to go).
+
+-spec filename() -> string().
+
filename() ->
filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME).
@@ -108,6 +106,9 @@ advance_blocks({B1, B2, B3, B4}, I) ->
%% generate a GUID. This function should be used when performance is a
%% priority and predictability is not an issue. Otherwise use
%% gen_secure/0.
+
+-spec gen() -> guid().
+
gen() ->
%% We hash a fresh GUID with md5, split it in 4 blocks, and each
%% time we need a new guid we rotate them producing a new hash
@@ -129,6 +130,9 @@ gen() ->
%% serial store hasn't been deleted.
%%
%% If you are not concerned with predictability, gen/0 is faster.
+
+-spec gen_secure() -> guid().
+
gen_secure() ->
%% Here instead of hashing once we hash the GUID and the counter
%% each time, so that the GUID is not predictable.
@@ -143,9 +147,14 @@ gen_secure() ->
%%
%% employs base64url encoding, which is safer in more contexts than
%% plain base64.
+
+-spec string(guid(), any()) -> string().
+
string(G, Prefix) ->
Prefix ++ "-" ++ rabbit_misc:base64url(G).
+-spec binary(guid(), any()) -> binary().
+
binary(G, Prefix) ->
list_to_binary(string(G, Prefix)).
diff --git a/src/rabbit_health_check.erl b/src/rabbit_health_check.erl
index 54508bfdb0..97d8d44dc3 100644
--- a/src/rabbit_health_check.erl
+++ b/src/rabbit_health_check.erl
@@ -21,19 +21,20 @@
%% Internal API
-export([local/0]).
--spec node(node(), timeout()) -> ok | {badrpc, term()} | {error_string, string()}.
--spec local() -> ok | {error_string, string()}.
-
%%----------------------------------------------------------------------------
%% External functions
%%----------------------------------------------------------------------------
+-spec node(node(), timeout()) -> ok | {badrpc, term()} | {error_string, string()}.
+
node(Node) ->
%% same default as in CLI
node(Node, 70000).
node(Node, Timeout) ->
rabbit_misc:rpc_call(Node, rabbit_health_check, local, [], Timeout).
+-spec local() -> ok | {error_string, string()}.
+
local() ->
run_checks([list_channels, list_queues, alarms, rabbit_node_monitor]).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index b5f09a525b..9770172089 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -147,36 +147,6 @@
-type credit_mode() :: 'manual' | 'drain' | 'auto'.
--spec start_link(rabbit_types:proc_name()) ->
- rabbit_types:ok_pid_or_error().
--spec new(pid()) -> lstate().
-
--spec limit_prefetch(lstate(), non_neg_integer(), non_neg_integer()) ->
- lstate().
--spec unlimit_prefetch(lstate()) -> lstate().
--spec is_active(lstate()) -> boolean().
--spec get_prefetch_limit(lstate()) -> non_neg_integer().
--spec ack(lstate(), non_neg_integer()) -> 'ok'.
--spec pid(lstate()) -> pid().
-
--spec client(pid()) -> qstate().
--spec activate(qstate()) -> qstate().
--spec can_send(qstate(), boolean(), rabbit_types:ctag()) ->
- {'continue' | 'suspend', qstate()}.
--spec resume(qstate()) -> qstate().
--spec deactivate(qstate()) -> qstate().
--spec is_suspended(qstate()) -> boolean().
--spec is_consumer_blocked(qstate(), rabbit_types:ctag()) -> boolean().
--spec credit
- (qstate(), rabbit_types:ctag(), non_neg_integer(), credit_mode(),
- boolean()) ->
- {boolean(), qstate()}.
--spec ack_from_queue(qstate(), rabbit_types:ctag(), non_neg_integer()) ->
- {boolean(), qstate()}.
--spec drained(qstate()) ->
- {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}.
--spec forget_consumer(qstate(), rabbit_types:ctag()) -> qstate().
-
%%----------------------------------------------------------------------------
-record(lim, {prefetch_count = 0,
@@ -194,41 +164,66 @@
%% API
%%----------------------------------------------------------------------------
+-spec start_link(rabbit_types:proc_name()) ->
+ rabbit_types:ok_pid_or_error().
+
start_link(ProcName) -> gen_server2:start_link(?MODULE, [ProcName], []).
+-spec new(pid()) -> lstate().
+
new(Pid) ->
%% this a 'call' to ensure that it is invoked at most once.
ok = gen_server:call(Pid, {new, self()}, infinity),
#lstate{pid = Pid, prefetch_limited = false}.
+-spec limit_prefetch(lstate(), non_neg_integer(), non_neg_integer()) ->
+ lstate().
+
limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 ->
ok = gen_server:call(
L#lstate.pid,
{limit_prefetch, PrefetchCount, UnackedCount}, infinity),
L#lstate{prefetch_limited = true}.
+-spec unlimit_prefetch(lstate()) -> lstate().
+
unlimit_prefetch(L) ->
ok = gen_server:call(L#lstate.pid, unlimit_prefetch, infinity),
L#lstate{prefetch_limited = false}.
+-spec is_active(lstate()) -> boolean().
+
is_active(#lstate{prefetch_limited = Limited}) -> Limited.
+-spec get_prefetch_limit(lstate()) -> non_neg_integer().
+
get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0;
get_prefetch_limit(L) ->
gen_server:call(L#lstate.pid, get_prefetch_limit, infinity).
+-spec ack(lstate(), non_neg_integer()) -> 'ok'.
+
ack(#lstate{prefetch_limited = false}, _AckCount) -> ok;
ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}).
+-spec pid(lstate()) -> pid().
+
pid(#lstate{pid = Pid}) -> Pid.
+-spec client(pid()) -> qstate().
+
client(Pid) -> #qstate{pid = Pid, state = dormant, credits = gb_trees:empty()}.
+-spec activate(qstate()) -> qstate().
+
activate(L = #qstate{state = dormant}) ->
ok = gen_server:cast(L#qstate.pid, {register, self()}),
L#qstate{state = active};
activate(L) -> L.
+-spec can_send(qstate(), boolean(), rabbit_types:ctag()) ->
+ {'continue' | 'suspend', qstate()}.
+
can_send(L = #qstate{pid = Pid, state = State, credits = Credits},
AckRequired, CTag) ->
case is_consumer_blocked(L, CTag) of
@@ -246,18 +241,26 @@ safe_call(Pid, Msg, ExitValue) ->
fun () -> ExitValue end,
fun () -> gen_server2:call(Pid, Msg, infinity) end).
+-spec resume(qstate()) -> qstate().
+
resume(L = #qstate{state = suspended}) ->
L#qstate{state = active};
resume(L) -> L.
+-spec deactivate(qstate()) -> qstate().
+
deactivate(L = #qstate{state = dormant}) -> L;
deactivate(L) ->
ok = gen_server:cast(L#qstate.pid, {unregister, self()}),
L#qstate{state = dormant}.
+-spec is_suspended(qstate()) -> boolean().
+
is_suspended(#qstate{state = suspended}) -> true;
is_suspended(#qstate{}) -> false.
+-spec is_consumer_blocked(qstate(), rabbit_types:ctag()) -> boolean().
+
is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
case gb_trees:lookup(CTag, Credits) of
none -> false;
@@ -265,6 +268,11 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
{value, #credit{}} -> true
end.
+-spec credit
+ (qstate(), rabbit_types:ctag(), non_neg_integer(), credit_mode(),
+ boolean()) ->
+ {boolean(), qstate()}.
+
credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Mode, IsEmpty) ->
{Res, Cr} =
case IsEmpty andalso Mode =:= drain of
@@ -273,6 +281,9 @@ credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Mode, IsEmpty) ->
end,
{Res, Limiter#qstate{credits = enter_credit(CTag, Cr, Credits)}}.
+-spec ack_from_queue(qstate(), rabbit_types:ctag(), non_neg_integer()) ->
+ {boolean(), qstate()}.
+
ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
{Credits1, Unblocked} =
case gb_trees:lookup(CTag, Credits) of
@@ -284,6 +295,9 @@ ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
end,
{Unblocked, Limiter#qstate{credits = Credits1}}.
+-spec drained(qstate()) ->
+ {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}.
+
drained(Limiter = #qstate{credits = Credits}) ->
Drain = fun(C) -> C#credit{credit = 0, mode = manual} end,
{CTagCredits, Credits2} =
@@ -295,6 +309,8 @@ drained(Limiter = #qstate{credits = Credits}) ->
end, {[], Credits}, Credits),
{CTagCredits, Limiter#qstate{credits = Credits2}}.
+-spec forget_consumer(qstate(), rabbit_types:ctag()) -> qstate().
+
forget_consumer(Limiter = #qstate{credits = Credits}, CTag) ->
Limiter#qstate{credits = gb_trees:delete_any(CTag, Credits)}.
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 62cf87fb11..0f5c9fbe3c 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -54,31 +54,33 @@
-define(EPSILON, 0.000001). %% less than this and we clamp to 0
%%----------------------------------------------------------------------------
-
--spec start_link() -> rabbit_types:ok_pid_or_error().
--spec register(pid(), {atom(),atom(),[any()]}) -> 'ok'.
--spec deregister(pid()) -> 'ok'.
--spec report_ram_duration
- (pid(), float() | 'infinity') -> number() | 'infinity'.
--spec stop() -> 'ok'.
-
-%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
+-spec start_link() -> rabbit_types:ok_pid_or_error().
+
start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
+-spec register(pid(), {atom(),atom(),[any()]}) -> 'ok'.
+
register(Pid, MFA = {_M, _F, _A}) ->
gen_server2:call(?SERVER, {register, Pid, MFA}, infinity).
+-spec deregister(pid()) -> 'ok'.
+
deregister(Pid) ->
gen_server2:cast(?SERVER, {deregister, Pid}).
+-spec report_ram_duration
+ (pid(), float() | 'infinity') -> number() | 'infinity'.
+
report_ram_duration(Pid, QueueDuration) ->
gen_server2:call(?SERVER,
{report_ram_duration, Pid, QueueDuration}, infinity).
+-spec stop() -> 'ok'.
+
stop() ->
gen_server2:cast(?SERVER, stop).
diff --git a/src/rabbit_metrics.erl b/src/rabbit_metrics.erl
index 2a0a967e86..2d5f9c34e2 100644
--- a/src/rabbit_metrics.erl
+++ b/src/rabbit_metrics.erl
@@ -25,11 +25,12 @@
-define(SERVER, ?MODULE).
--spec start_link() -> rabbit_types:ok_pid_or_error().
-
%%----------------------------------------------------------------------------
%% Starts the raw metrics storage and owns the ETS tables.
%%----------------------------------------------------------------------------
+
+-spec start_link() -> rabbit_types:ok_pid_or_error().
+
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 9e5bddb28a..96474b0d4e 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -37,14 +37,6 @@
depth_fun
}).
--spec start_link
- (amqqueue:amqqueue(), pid() | 'undefined',
- rabbit_mirror_queue_master:death_fun(),
- rabbit_mirror_queue_master:depth_fun()) ->
- rabbit_types:ok_pid_or_error().
--spec get_gm(pid()) -> pid().
--spec ensure_monitoring(pid(), [pid()]) -> 'ok'.
-
%%----------------------------------------------------------------------------
%%
%% Mirror Queues
@@ -307,12 +299,22 @@
%%
%%----------------------------------------------------------------------------
+-spec start_link
+ (amqqueue:amqqueue(), pid() | 'undefined',
+ rabbit_mirror_queue_master:death_fun(),
+ rabbit_mirror_queue_master:depth_fun()) ->
+ rabbit_types:ok_pid_or_error().
+
start_link(Queue, GM, DeathFun, DepthFun) ->
gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, DepthFun], []).
+-spec get_gm(pid()) -> pid().
+
get_gm(CPid) ->
gen_server2:call(CPid, get_gm, infinity).
+-spec ensure_monitoring(pid(), [pid()]) -> 'ok'.
+
ensure_monitoring(CPid, Pids) ->
gen_server2:cast(CPid, {ensure_monitoring, Pids}).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index c76bc521c2..6ab9a875c2 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -62,18 +62,6 @@
confirmed :: [rabbit_guid:guid()],
known_senders :: sets:set()
}.
--spec promote_backing_queue_state
- (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()],
- map(), [pid()]) ->
- master_state().
-
--spec sender_death_fun() -> death_fun().
--spec depth_fun() -> depth_fun().
--spec init_with_existing_bq(amqqueue:amqqueue(), atom(), any()) ->
- master_state().
--spec stop_mirroring(master_state()) -> {atom(), any()}.
--spec sync_mirrors(stats_fun(), stats_fun(), master_state()) ->
- {'ok', master_state()} | {stop, any(), master_state()}.
%% For general documentation of HA design, see
%% rabbit_mirror_queue_coordinator
@@ -101,6 +89,9 @@ init(Q, Recover, AsyncCallback) ->
ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
State.
+-spec init_with_existing_bq(amqqueue:amqqueue(), atom(), any()) ->
+ master_state().
+
init_with_existing_bq(Q0, BQ, BQS) when ?is_amqqueue(Q0) ->
QName = amqqueue:get_name(Q0),
case rabbit_mirror_queue_coordinator:start_link(
@@ -146,6 +137,8 @@ init_with_existing_bq(Q0, BQ, BQS) when ?is_amqqueue(Q0) ->
throw({coordinator_not_started, Reason})
end.
+-spec stop_mirroring(master_state()) -> {atom(), any()}.
+
stop_mirroring(State = #state { coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -153,6 +146,9 @@ stop_mirroring(State = #state { coordinator = CPid,
stop_all_slaves(shutdown, State),
{BQ, BQS}.
+-spec sync_mirrors(stats_fun(), stats_fun(), master_state()) ->
+ {'ok', master_state()} | {stop, any(), master_state()}.
+
sync_mirrors(HandleInfo, EmitStats,
State = #state { name = QName,
gm = GM,
@@ -506,6 +502,11 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator,
%% Other exported functions
%% ---------------------------------------------------------------------------
+-spec promote_backing_queue_state
+ (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()],
+ map(), [pid()]) ->
+ master_state().
+
promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
Len = BQ:len(BQS1),
@@ -523,6 +524,8 @@ promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) ->
known_senders = sets:from_list(KS),
wait_timeout = WaitTimeout }.
+-spec sender_death_fun() -> death_fun().
+
sender_death_fun() ->
Self = self(),
fun (DeadPid) ->
@@ -535,6 +538,8 @@ sender_death_fun() ->
end)
end.
+-spec depth_fun() -> depth_fun().
+
depth_fun() ->
Self = self(),
fun () ->
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 6260c4319c..88aef8fcdc 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -57,29 +57,12 @@
%%----------------------------------------------------------------------------
+%% Returns {ok, NewMPid, DeadPids, ExtraNodes}
+
-spec remove_from_queue
(rabbit_amqqueue:name(), pid(), [pid()]) ->
{'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}.
--spec add_mirrors(rabbit_amqqueue:name(), [node()], 'sync' | 'async') ->
- 'ok'.
--spec store_updated_slaves(amqqueue:amqqueue()) ->
- amqqueue:amqqueue().
--spec initial_queue_node(amqqueue:amqqueue(), node()) -> node().
--spec suggested_queue_nodes(amqqueue:amqqueue()) ->
- {node(), [node()]}.
--spec is_mirrored(amqqueue:amqqueue()) -> boolean().
--spec update_mirrors
- (amqqueue:amqqueue(), amqqueue:amqqueue()) -> 'ok'.
--spec update_mirrors
- (amqqueue:amqqueue()) -> 'ok'.
--spec maybe_drop_master_after_sync(amqqueue:amqqueue()) -> 'ok'.
--spec maybe_auto_sync(amqqueue:amqqueue()) -> 'ok'.
--spec log_info(rabbit_amqqueue:name(), string(), [any()]) -> 'ok'.
--spec log_warning(rabbit_amqqueue:name(), string(), [any()]) -> 'ok'.
-
-%%----------------------------------------------------------------------------
-%% Returns {ok, NewMPid, DeadPids, ExtraNodes}
remove_from_queue(QueueName, Self, DeadGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -241,6 +224,9 @@ drop_mirror(QName, MirrorNode) ->
E
end.
+-spec add_mirrors(rabbit_amqqueue:name(), [node()], 'sync' | 'async') ->
+ 'ok'.
+
add_mirrors(QName, Nodes, SyncMode) ->
[add_mirror(QName, Node, SyncMode) || Node <- Nodes],
ok.
@@ -282,13 +268,21 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
rabbit_misc:pid_to_string(MirrorPid),
[[$ , rabbit_misc:pid_to_string(P)] || P <- DeadPids]]).
+-spec log_info(rabbit_amqqueue:name(), string(), [any()]) -> 'ok'.
+
log_info (QName, Fmt, Args) ->
rabbit_log_mirroring:info("Mirrored ~s: " ++ Fmt,
[rabbit_misc:rs(QName) | Args]).
+
+-spec log_warning(rabbit_amqqueue:name(), string(), [any()]) -> 'ok'.
+
log_warning(QName, Fmt, Args) ->
rabbit_log_mirroring:warning("Mirrored ~s: " ++ Fmt,
[rabbit_misc:rs(QName) | Args]).
+-spec store_updated_slaves(amqqueue:amqqueue()) ->
+ amqqueue:amqqueue().
+
store_updated_slaves(Q0) when ?is_amqqueue(Q0) ->
SPids = amqqueue:get_slave_pids(Q0),
SSPids = amqqueue:get_sync_slave_pids(Q0),
@@ -373,10 +367,15 @@ promote_slave([SPid | SPids]) ->
%% the one to promote is the oldest.
{SPid, SPids}.
+-spec initial_queue_node(amqqueue:amqqueue(), node()) -> node().
+
initial_queue_node(Q, DefNode) ->
{MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:all_running()),
MNode.
+-spec suggested_queue_nodes(amqqueue:amqqueue()) ->
+ {node(), [node()]}.
+
suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:all_running()).
suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All).
@@ -429,6 +428,8 @@ validate_mode(Mode) ->
{error, "~p is not a valid ha-mode value", [Mode]}
end.
+-spec is_mirrored(amqqueue:amqqueue()) -> boolean().
+
is_mirrored(Q) ->
case module(Q) of
{ok, _} -> true;
@@ -451,6 +452,8 @@ actual_queue_nodes(Q) when ?is_amqqueue(Q) ->
_ -> node(MPid)
end, Nodes(SPids), Nodes(SSPids)}.
+-spec maybe_auto_sync(amqqueue:amqqueue()) -> 'ok'.
+
maybe_auto_sync(Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
case policy(<<"ha-sync-mode">>, Q) of
@@ -496,6 +499,9 @@ default_batch_size() ->
rabbit_misc:get_env(rabbit, mirroring_sync_batch_size,
?DEFAULT_BATCH_SIZE).
+-spec update_mirrors
+ (amqqueue:amqqueue(), amqqueue:amqqueue()) -> 'ok'.
+
update_mirrors(OldQ, NewQ) when ?amqqueue_pids_are_equal(OldQ, NewQ) ->
% Note: we do want to ensure both queues have same pid
QPid = amqqueue:get_pid(OldQ),
@@ -505,6 +511,9 @@ update_mirrors(OldQ, NewQ) when ?amqqueue_pids_are_equal(OldQ, NewQ) ->
_ -> rabbit_amqqueue:update_mirroring(QPid)
end.
+-spec update_mirrors
+ (amqqueue:amqqueue()) -> 'ok'.
+
update_mirrors(Q) when ?is_amqqueue(Q) ->
QName = amqqueue:get_name(Q),
{OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
@@ -532,6 +541,9 @@ update_mirrors(Q) when ?is_amqqueue(Q) ->
%% We don't just call update_mirrors/2 here since that could decide to
%% start a slave for some other reason, and since we are the slave ATM
%% that allows complicated deadlocks.
+
+-spec maybe_drop_master_after_sync(amqqueue:amqqueue()) -> 'ok'.
+
maybe_drop_master_after_sync(Q) when ?is_amqqueue(Q) ->
QName = amqqueue:get_name(Q),
MPid = amqqueue:get_pid(Q),
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index d206650d0f..2d7ce7edb2 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -59,8 +59,19 @@
-type slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(),
bqs()}.
+%% ---------------------------------------------------------------------------
+%% Master
+
-spec master_prepare(reference(), rabbit_amqqueue:name(),
log_fun(), [pid()]) -> pid().
+
+master_prepare(Ref, QName, Log, SPids) ->
+ MPid = self(),
+ spawn_link(fun () ->
+ ?store_proc_name(QName),
+ syncer(Ref, Log, MPid, SPids)
+ end).
+
-spec master_go(pid(), reference(), log_fun(),
rabbit_mirror_queue_master:stats_fun(),
rabbit_mirror_queue_master:stats_fun(),
@@ -69,21 +80,6 @@
{'already_synced', bqs()} | {'ok', bqs()} |
{'shutdown', any(), bqs()} |
{'sync_died', any(), bqs()}.
--spec slave(non_neg_integer(), reference(), timer:tref(), pid(),
- bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) ->
- 'denied' |
- {'ok' | 'failed', slave_sync_state()} |
- {'stop', any(), slave_sync_state()}.
-
-%% ---------------------------------------------------------------------------
-%% Master
-
-master_prepare(Ref, QName, Log, SPids) ->
- MPid = self(),
- spawn_link(fun () ->
- ?store_proc_name(QName),
- syncer(Ref, Log, MPid, SPids)
- end).
master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) ->
Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
@@ -321,6 +317,12 @@ wait_for_resources(Ref, SPids) ->
%% ---------------------------------------------------------------------------
%% Slave
+-spec slave(non_neg_integer(), reference(), timer:tref(), pid(),
+ bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) ->
+ 'denied' |
+ {'ok' | 'failed', slave_sync_state()} |
+ {'stop', any(), slave_sync_state()}.
+
slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) ->
Syncer ! {sync_deny, Ref, self()},
denied;
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 3d561aea5e..ab29faf368 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -16,7 +16,8 @@
-module(rabbit_mnesia).
--export([init/0,
+-export([%% Main interface
+ init/0,
join_cluster/2,
reset/0,
force_reset/0,
@@ -25,6 +26,7 @@
forget_cluster_node/2,
force_load_next_boot/0,
+ %% Various queries to get the status of the db
status/0,
is_clustered/0,
on_running_node/1,
@@ -35,11 +37,13 @@
dir/0,
cluster_status_from_mnesia/0,
+ %% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit'
init_db_unchecked/2,
copy_db/1,
check_cluster_consistency/0,
ensure_mnesia_dir/0,
+ %% Hooks used in `rabbit_node_monitor'
on_node_up/1,
on_node_down/1
]).
@@ -61,45 +65,12 @@
-type node_type() :: disc | ram.
-type cluster_status() :: {[node()], [node()], [node()]}.
-%% Main interface
--spec init() -> 'ok'.
--spec join_cluster(node(), node_type())
- -> ok | {ok, already_member} | {error, {inconsistent_cluster, string()}}.
--spec reset() -> 'ok'.
--spec force_reset() -> 'ok'.
--spec update_cluster_nodes(node()) -> 'ok'.
--spec change_cluster_node_type(node_type()) -> 'ok'.
--spec forget_cluster_node(node(), boolean()) -> 'ok'.
--spec force_load_next_boot() -> 'ok'.
-
-%% Various queries to get the status of the db
--spec status() -> [{'nodes', [{node_type(), [node()]}]} |
- {'running_nodes', [node()]} |
- {'partitions', [{node(), [node()]}]}].
--spec is_clustered() -> boolean().
--spec on_running_node(pid()) -> boolean().
--spec is_process_alive(pid() | {atom(), node()}) -> boolean().
--spec is_registered_process_alive(atom()) -> boolean().
--spec cluster_nodes('all' | 'disc' | 'ram' | 'running') -> [node()].
--spec node_type() -> node_type().
--spec dir() -> file:filename().
--spec cluster_status_from_mnesia() -> rabbit_types:ok_or_error2(
- cluster_status(), any()).
-
-%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit'
--spec init_db_unchecked([node()], node_type()) -> 'ok'.
--spec copy_db(file:filename()) -> rabbit_types:ok_or_error(any()).
--spec check_cluster_consistency() -> 'ok'.
--spec ensure_mnesia_dir() -> 'ok'.
-
-%% Hooks used in `rabbit_node_monitor'
--spec on_node_up(node()) -> 'ok'.
--spec on_node_down(node()) -> 'ok'.
-
%%----------------------------------------------------------------------------
%% Main interface
%%----------------------------------------------------------------------------
+-spec init() -> 'ok'.
+
init() ->
ensure_mnesia_running(),
ensure_mnesia_dir(),
@@ -228,6 +199,10 @@ join_discovered_peers(TryNodes, NodeType) ->
%% Note that we make no attempt to verify that the nodes provided are
%% all in the same cluster, we simply pick the first online node and
%% we cluster to its cluster.
+
+-spec join_cluster(node(), node_type())
+ -> ok | {ok, already_member} | {error, {inconsistent_cluster, string()}}.
+
join_cluster(DiscoveryNode, NodeType) ->
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
@@ -275,11 +250,16 @@ join_cluster(DiscoveryNode, NodeType) ->
%% return node to its virgin state, where it is not member of any
%% cluster, has no cluster configuration, no local database, and no
%% persisted messages
+
+-spec reset() -> 'ok'.
+
reset() ->
ensure_mnesia_not_running(),
rabbit_log:info("Resetting Rabbit~n", []),
reset_gracefully().
+-spec force_reset() -> 'ok'.
+
force_reset() ->
ensure_mnesia_not_running(),
rabbit_log:info("Resetting Rabbit forcefully~n", []),
@@ -310,6 +290,8 @@ wipe() ->
ok = rabbit_node_monitor:reset_cluster_status(),
ok.
+-spec change_cluster_node_type(node_type()) -> 'ok'.
+
change_cluster_node_type(Type) ->
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
@@ -327,6 +309,8 @@ change_cluster_node_type(Type) ->
ok = reset(),
ok = join_cluster(Node, Type).
+-spec update_cluster_nodes(node()) -> 'ok'.
+
update_cluster_nodes(DiscoveryNode) ->
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
@@ -353,6 +337,9 @@ update_cluster_nodes(DiscoveryNode) ->
%% * This node was, at the best of our knowledge (see comment below)
%% the last or second to last after the node we're removing to go
%% down
+
+-spec forget_cluster_node(node(), boolean()) -> 'ok'.
+
forget_cluster_node(Node, RemoveWhenOffline) ->
forget_cluster_node(Node, RemoveWhenOffline, true).
@@ -407,6 +394,10 @@ remove_node_offline_node(Node) ->
%% Queries
%%----------------------------------------------------------------------------
+-spec status() -> [{'nodes', [{node_type(), [node()]}]} |
+ {'running_nodes', [node()]} |
+ {'partitions', [{node(), [node()]}]}].
+
status() ->
IfNonEmpty = fun (_, []) -> [];
(Type, Nodes) -> [{Type, Nodes}]
@@ -427,15 +418,22 @@ mnesia_partitions(Nodes) ->
is_running() -> mnesia:system_info(is_running) =:= yes.
+-spec is_clustered() -> boolean().
+
is_clustered() -> AllNodes = cluster_nodes(all),
AllNodes =/= [] andalso AllNodes =/= [node()].
+-spec on_running_node(pid()) -> boolean().
+
on_running_node(Pid) -> lists:member(node(Pid), cluster_nodes(running)).
%% This requires the process be in the same running cluster as us
%% (i.e. not partitioned or some random node).
%%
%% See also rabbit_misc:is_process_alive/1 which does not.
+
+-spec is_process_alive(pid() | {atom(), node()}) -> boolean().
+
is_process_alive(Pid) when is_pid(Pid) ->
on_running_node(Pid) andalso
rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true;
@@ -443,14 +441,22 @@ is_process_alive({Name, Node}) ->
lists:member(Node, cluster_nodes(running)) andalso
rpc:call(Node, rabbit_mnesia, is_registered_process_alive, [Name]) =:= true.
+-spec is_registered_process_alive(atom()) -> boolean().
+
is_registered_process_alive(Name) ->
is_pid(whereis(Name)).
+-spec cluster_nodes('all' | 'disc' | 'ram' | 'running') -> [node()].
+
cluster_nodes(WhichNodes) -> cluster_status(WhichNodes).
%% This function is the actual source of information, since it gets
%% the data from mnesia. Obviously it'll work only when mnesia is
%% running.
+
+-spec cluster_status_from_mnesia() -> rabbit_types:ok_or_error2(
+ cluster_status(), any()).
+
cluster_status_from_mnesia() ->
case is_running() of
false ->
@@ -505,6 +511,8 @@ node_info() ->
mnesia:system_info(protocol_version),
cluster_status_from_mnesia()}.
+-spec node_type() -> node_type().
+
node_type() ->
{_AllNodes, DiscNodes, _RunningNodes} =
rabbit_node_monitor:read_cluster_status(),
@@ -513,6 +521,8 @@ node_type() ->
false -> ram
end.
+-spec dir() -> file:filename().
+
dir() -> mnesia:system_info(directory).
%%----------------------------------------------------------------------------
@@ -552,6 +562,8 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
rabbit_node_monitor:update_cluster_status(),
ok.
+-spec init_db_unchecked([node()], node_type()) -> 'ok'.
+
init_db_unchecked(ClusterNodes, NodeType) ->
init_db(ClusterNodes, NodeType, false).
@@ -582,6 +594,8 @@ init_db_with_mnesia(ClusterNodes, NodeType,
stop_mnesia()
end.
+-spec ensure_mnesia_dir() -> 'ok'.
+
ensure_mnesia_dir() ->
MnesiaDir = dir() ++ "/",
case filelib:ensure_dir(MnesiaDir) of
@@ -629,6 +643,8 @@ ensure_schema_integrity() ->
throw({error, {schema_integrity_check_failed, Reason}})
end.
+-spec copy_db(file:filename()) -> rabbit_types:ok_or_error(any()).
+
copy_db(Destination) ->
ok = ensure_mnesia_not_running(),
rabbit_file:recursive_copy(dir(), Destination).
@@ -636,6 +652,8 @@ copy_db(Destination) ->
force_load_filename() ->
filename:join(dir(), "force_load").
+-spec force_load_next_boot() -> 'ok'.
+
force_load_next_boot() ->
rabbit_file:write_file(force_load_filename(), <<"">>).
@@ -648,6 +666,9 @@ maybe_force_load() ->
%% This does not guarantee us much, but it avoids some situations that
%% will definitely end up badly
+
+-spec check_cluster_consistency() -> 'ok'.
+
check_cluster_consistency() ->
%% We want to find 0 or 1 consistent nodes.
case lists:foldl(
@@ -717,12 +738,16 @@ remote_node_info(Node) ->
%% Hooks for `rabbit_node_monitor'
%%--------------------------------------------------------------------
+-spec on_node_up(node()) -> 'ok'.
+
on_node_up(Node) ->
case running_disc_nodes() of
[Node] -> rabbit_log:info("cluster contains disc nodes again~n");
_ -> ok
end.
+-spec on_node_down(node()) -> 'ok'.
+
on_node_down(_Node) ->
case running_disc_nodes() of
[] -> rabbit_log:info("only running disc node went down~n");
diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl
index c6648ac802..76e120f292 100644
--- a/src/rabbit_mnesia_rename.erl
+++ b/src/rabbit_mnesia_rename.erl
@@ -44,9 +44,6 @@
%%----------------------------------------------------------------------------
-spec rename(node(), [{node(), node()}]) -> 'ok'.
--spec maybe_finish([node()]) -> 'ok'.
-
-%%----------------------------------------------------------------------------
rename(Node, NodeMapList) ->
try
@@ -139,6 +136,8 @@ restore_backup(Backup) ->
stop_mnesia(),
rabbit_mnesia:force_load_next_boot().
+-spec maybe_finish([node()]) -> 'ok'.
+
maybe_finish(AllNodes) ->
case rabbit_file:read_term_file(rename_config_name()) of
{ok, [{FromNode, ToNode}]} -> finish(FromNode, ToNode, AllNodes);
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index f2496cd71f..f3ec7b78cc 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -41,15 +41,10 @@
fun (({rabbit_types:msg_id(), msg_size(), position(), binary()}, A) ->
A).
+%%----------------------------------------------------------------------------
+
-spec append(io_device(), rabbit_types:msg_id(), msg()) ->
rabbit_types:ok_or_error2(msg_size(), any()).
--spec read(io_device(), msg_size()) ->
- rabbit_types:ok_or_error2({rabbit_types:msg_id(), msg()},
- any()).
--spec scan(io_device(), file_size(), message_accumulator(A), A) ->
- {'ok', A, position()}.
-
-%%----------------------------------------------------------------------------
append(FileHdl, MsgId, MsgBody)
when is_binary(MsgId) andalso size(MsgId) =:= ?MSG_ID_SIZE_BYTES ->
@@ -65,6 +60,10 @@ append(FileHdl, MsgId, MsgBody)
KO -> KO
end.
+-spec read(io_device(), msg_size()) ->
+ rabbit_types:ok_or_error2({rabbit_types:msg_id(), msg()},
+ any()).
+
read(FileHdl, TotalSize) ->
Size = TotalSize - ?FILE_PACKING_ADJUSTMENT,
BodyBinSize = Size - ?MSG_ID_SIZE_BYTES,
@@ -77,6 +76,9 @@ read(FileHdl, TotalSize) ->
KO -> KO
end.
+-spec scan(io_device(), file_size(), message_accumulator(A), A) ->
+ {'ok', A, position()}.
+
scan(FileHdl, FileSize, Fun, Acc) when FileSize >= 0 ->
scan(FileHdl, FileSize, <<>>, 0, 0, Fun, Acc).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index bae8364614..337064ad39 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -173,33 +173,6 @@
-type maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok').
-type deletion_thunk() :: fun (() -> boolean()).
--spec start_link
- (atom(), file:filename(), [binary()] | 'undefined',
- {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error().
--spec successfully_recovered_state(server()) -> boolean().
--spec client_init(server(), client_ref(), maybe_msg_id_fun(),
- maybe_close_fds_fun()) -> client_msstate().
--spec client_terminate(client_msstate()) -> 'ok'.
--spec client_delete_and_terminate(client_msstate()) -> 'ok'.
--spec client_ref(client_msstate()) -> client_ref().
--spec close_all_indicated
- (client_msstate()) -> rabbit_types:ok(client_msstate()).
--spec write(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
--spec write_flow(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
--spec read(rabbit_types:msg_id(), client_msstate()) ->
- {rabbit_types:ok(msg()) | 'not_found', client_msstate()}.
--spec contains(rabbit_types:msg_id(), client_msstate()) -> boolean().
--spec remove([rabbit_types:msg_id()], client_msstate()) -> 'ok'.
-
--spec set_maximum_since_use(server(), non_neg_integer()) -> 'ok'.
--spec has_readers(non_neg_integer(), gc_state()) -> boolean().
--spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) ->
- deletion_thunk().
--spec delete_file(non_neg_integer(), gc_state()) -> deletion_thunk().
--spec force_recovery(file:filename(), server()) -> 'ok'.
--spec transform_dir(file:filename(), server(),
- fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok'.
-
%%----------------------------------------------------------------------------
%% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION
@@ -472,6 +445,10 @@
%% public API
%%----------------------------------------------------------------------------
+-spec start_link
+ (atom(), file:filename(), [binary()] | 'undefined',
+ {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error().
+
start_link(Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) ->
gen_server2:start_link(?MODULE,
[Type, Dir, ClientRefs, StartupFunState],
@@ -482,9 +459,14 @@ start_global_store_link(Type, Dir, ClientRefs, StartupFunState) when is_atom(Typ
[Type, Dir, ClientRefs, StartupFunState],
[{timeout, infinity}]).
+-spec successfully_recovered_state(server()) -> boolean().
+
successfully_recovered_state(Server) ->
gen_server2:call(Server, successfully_recovered_state, infinity).
+-spec client_init(server(), client_ref(), maybe_msg_id_fun(),
+ maybe_close_fds_fun()) -> client_msstate().
+
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom(Server) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
@@ -506,17 +488,25 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom
flying_ets = FlyingEts,
credit_disc_bound = CreditDiscBound }.
+-spec client_terminate(client_msstate()) -> 'ok'.
+
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
ok = server_call(CState, {client_terminate, Ref}).
+-spec client_delete_and_terminate(client_msstate()) -> 'ok'.
+
client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
ok = server_cast(CState, {client_dying, Ref}),
ok = server_cast(CState, {client_delete, Ref}).
+-spec client_ref(client_msstate()) -> client_ref().
+
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
+-spec write_flow(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
+
write_flow(MsgId, Msg,
CState = #client_msstate {
server = Server,
@@ -528,8 +518,13 @@ write_flow(MsgId, Msg,
credit_flow:send(Server, CreditDiscBound),
client_write(MsgId, Msg, flow, CState).
+-spec write(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
+
write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
+-spec read(rabbit_types:msg_id(), client_msstate()) ->
+ {rabbit_types:ok(msg()) | 'not_found', client_msstate()}.
+
read(MsgId,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
file_handle_cache_stats:update(msg_store_read),
@@ -545,12 +540,19 @@ read(MsgId,
{{ok, Msg}, CState}
end.
+-spec contains(rabbit_types:msg_id(), client_msstate()) -> boolean().
+
contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
+
+-spec remove([rabbit_types:msg_id()], client_msstate()) -> 'ok'.
+
remove([], _CState) -> ok;
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
[client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
server_cast(CState, {remove, CRef, MsgIds}).
+-spec set_maximum_since_use(server(), non_neg_integer()) -> 'ok'.
+
set_maximum_since_use(Server, Age) when is_pid(Server); is_atom(Server) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
@@ -1447,6 +1449,9 @@ safe_file_delete(File, Dir, FileHandlesEts) ->
true
end.
+-spec close_all_indicated
+ (client_msstate()) -> rabbit_types:ok(client_msstate()).
+
close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
client_ref = Ref } =
CState) ->
@@ -1965,11 +1970,16 @@ cleanup_after_file_deletion(File,
%% garbage collection / compaction / aggregation -- external
%%----------------------------------------------------------------------------
+-spec has_readers(non_neg_integer(), gc_state()) -> boolean().
+
has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
[#file_summary { locked = true, readers = Count }] =
ets:lookup(FileSummaryEts, File),
Count /= 0.
+-spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) ->
+ deletion_thunk().
+
combine_files(Source, Destination,
State = #gc_state { file_summary_ets = FileSummaryEts,
file_handles_ets = FileHandlesEts,
@@ -2046,6 +2056,8 @@ combine_files(Source, Destination,
gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
safe_file_delete_fun(Source, Dir, FileHandlesEts).
+-spec delete_file(non_neg_integer(), gc_state()) -> deletion_thunk().
+
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
file_handles_ets = FileHandlesEts,
dir = Dir,
@@ -2127,6 +2139,8 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{destination, Destination}]}
end.
+-spec force_recovery(file:filename(), server()) -> 'ok'.
+
force_recovery(BaseDir, Store) ->
Dir = filename:join(BaseDir, atom_to_list(Store)),
case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
@@ -2142,6 +2156,9 @@ foreach_file(D, Fun, Files) ->
foreach_file(D1, D2, Fun, Files) ->
[ok = Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files].
+-spec transform_dir(file:filename(), server(),
+ fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok'.
+
transform_dir(BaseDir, Store, TransformFun) ->
Dir = filename:join(BaseDir, atom_to_list(Store)),
TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index d11d110abf..4b8d95b535 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -37,31 +37,34 @@
-spec start_link(rabbit_msg_store:gc_state()) ->
rabbit_types:ok_pid_or_error().
--spec combine(pid(), rabbit_msg_store:file_num(),
- rabbit_msg_store:file_num()) -> 'ok'.
--spec delete(pid(), rabbit_msg_store:file_num()) -> 'ok'.
--spec no_readers(pid(), rabbit_msg_store:file_num()) -> 'ok'.
--spec stop(pid()) -> 'ok'.
--spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
-
-%%----------------------------------------------------------------------------
start_link(MsgStoreState) ->
gen_server2:start_link(?MODULE, [MsgStoreState],
[{timeout, infinity}]).
+-spec combine(pid(), rabbit_msg_store:file_num(),
+ rabbit_msg_store:file_num()) -> 'ok'.
+
combine(Server, Source, Destination) ->
gen_server2:cast(Server, {combine, Source, Destination}).
+-spec delete(pid(), rabbit_msg_store:file_num()) -> 'ok'.
+
delete(Server, File) ->
gen_server2:cast(Server, {delete, File}).
+-spec no_readers(pid(), rabbit_msg_store:file_num()) -> 'ok'.
+
no_readers(Server, File) ->
gen_server2:cast(Server, {no_readers, File}).
+-spec stop(pid()) -> 'ok'.
+
stop(Server) ->
gen_server2:call(Server, stop, infinity).
+-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
+
set_maximum_since_use(Pid, Age) ->
gen_server2:cast(Pid, {set_maximum_since_use, Age}).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 79de7ce180..809b999b98 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -44,6 +44,8 @@
-export([tcp_listener_started/4, tcp_listener_stopped/4]).
+-deprecated([{force_connection_event_refresh, 1, eventually}]).
+
%% Internal
-export([connections_local/0]).
@@ -68,53 +70,7 @@
-type protocol() :: atom().
-type label() :: string().
--spec start_tcp_listener(
- listener_config(), integer()) -> 'ok' | {'error', term()}.
--spec start_ssl_listener(
- listener_config(), rabbit_types:infos(), integer()) -> 'ok' | {'error', term()}.
--spec stop_tcp_listener(listener_config()) -> 'ok'.
--spec active_listeners() -> [rabbit_types:listener()].
--spec node_listeners(node()) -> [rabbit_types:listener()].
--spec register_connection(pid()) -> ok.
--spec unregister_connection(pid()) -> ok.
--spec connections() -> [rabbit_types:connection()].
--spec connections_local() -> [rabbit_types:connection()].
--spec connection_info_keys() -> rabbit_types:info_keys().
--spec connection_info(rabbit_types:connection()) -> rabbit_types:infos().
--spec connection_info(rabbit_types:connection(), rabbit_types:info_keys()) ->
- rabbit_types:infos().
--spec connection_info_all() -> [rabbit_types:infos()].
--spec connection_info_all(rabbit_types:info_keys()) ->
- [rabbit_types:infos()].
--spec close_connection(pid(), string()) -> 'ok'.
--deprecated([{force_connection_event_refresh, 1, eventually}]).
--spec force_connection_event_refresh(reference()) -> 'ok'.
-
--spec on_node_down(node()) -> 'ok'.
--spec tcp_listener_addresses(listener_config()) -> [address()].
--spec tcp_listener_spec
- (name_prefix(), address(), [gen_tcp:listen_option()], module(), module(),
- protocol(), any(), non_neg_integer(), label()) ->
- supervisor:child_spec().
--spec ensure_ssl() -> rabbit_types:infos().
--spec poodle_check(atom()) -> 'ok' | 'danger'.
-
-spec boot() -> 'ok'.
--spec tcp_listener_started
- (_, _,
- string() |
- {byte(),byte(),byte(),byte()} |
- {char(),char(),char(),char(),char(),char(),char(),char()}, _) ->
- 'ok'.
--spec tcp_listener_stopped
- (_, _,
- string() |
- {byte(),byte(),byte(),byte()} |
- {char(),char(),char(),char(),char(),char(),char(),char()},
- _) ->
- 'ok'.
-
-%%----------------------------------------------------------------------------
boot() ->
ok = record_distribution_listener(),
@@ -159,12 +115,16 @@ boot_tls(NumAcceptors) ->
ok
end.
+-spec ensure_ssl() -> rabbit_types:infos().
+
ensure_ssl() ->
{ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps),
ok = app_utils:start_applications(SslAppsConfig),
{ok, SslOptsConfig0} = application:get_env(rabbit, ssl_options),
rabbit_ssl_options:fix(SslOptsConfig0).
+-spec poodle_check(atom()) -> 'ok' | 'danger'.
+
poodle_check(Context) ->
{ok, Vsn} = application:get_key(ssl, vsn),
case rabbit_misc:version_compare(Vsn, "5.3", gte) of %% R16B01
@@ -193,6 +153,8 @@ log_poodle_fail(Context) ->
fix_ssl_options(Config) ->
rabbit_ssl_options:fix(Config).
+-spec tcp_listener_addresses(listener_config()) -> [address()].
+
tcp_listener_addresses(Port) when is_integer(Port) ->
tcp_listener_addresses_auto(Port);
tcp_listener_addresses({"auto", Port}) ->
@@ -213,6 +175,11 @@ tcp_listener_addresses_auto(Port) ->
lists:append([tcp_listener_addresses(Listener) ||
Listener <- port_to_listeners(Port)]).
+-spec tcp_listener_spec
+ (name_prefix(), address(), [gen_tcp:listen_option()], module(), module(),
+ protocol(), any(), non_neg_integer(), label()) ->
+ supervisor:child_spec().
+
tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
Transport, ProtoSup, ProtoOpts, Protocol, NumAcceptors, Label) ->
{rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
@@ -223,9 +190,15 @@ tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
NumAcceptors, Label]},
transient, infinity, supervisor, [tcp_listener_sup]}.
+-spec start_tcp_listener(
+ listener_config(), integer()) -> 'ok' | {'error', term()}.
+
start_tcp_listener(Listener, NumAcceptors) ->
start_listener(Listener, NumAcceptors, amqp, "TCP listener", tcp_opts()).
+-spec start_ssl_listener(
+ listener_config(), rabbit_types:infos(), integer()) -> 'ok' | {'error', term()}.
+
start_ssl_listener(Listener, SslOpts, NumAcceptors) ->
start_listener(Listener, NumAcceptors, 'amqp/ssl', "TLS (SSL) listener", tcp_opts() ++ SslOpts).
@@ -262,6 +235,7 @@ transport(Protocol) ->
'amqp/ssl' -> ranch_ssl
end.
+-spec stop_tcp_listener(listener_config()) -> 'ok'.
stop_tcp_listener(Listener) ->
[stop_tcp_listener0(Address) ||
@@ -273,6 +247,13 @@ stop_tcp_listener0({IPAddress, Port, _Family}) ->
ok = supervisor:terminate_child(rabbit_sup, Name),
ok = supervisor:delete_child(rabbit_sup, Name).
+-spec tcp_listener_started
+ (_, _,
+ string() |
+ {byte(),byte(),byte(),byte()} |
+ {char(),char(),char(),char(),char(),char(),char(),char()}, _) ->
+ 'ok'.
+
tcp_listener_started(Protocol, Opts, IPAddress, Port) ->
%% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1
%% We need the host so we can distinguish multiple instances of the above
@@ -286,6 +267,14 @@ tcp_listener_started(Protocol, Opts, IPAddress, Port) ->
port = Port,
opts = Opts}).
+-spec tcp_listener_stopped
+ (_, _,
+ string() |
+ {byte(),byte(),byte(),byte()} |
+ {char(),char(),char(),char(),char(),char(),char(),char()},
+ _) ->
+ 'ok'.
+
tcp_listener_stopped(Protocol, Opts, IPAddress, Port) ->
ok = mnesia:dirty_delete_object(
rabbit_listener,
@@ -301,12 +290,18 @@ record_distribution_listener() ->
{port, Port, _Version} = erl_epmd:port_please(Name, Host),
tcp_listener_started(clustering, [], {0,0,0,0,0,0,0,0}, Port).
+-spec active_listeners() -> [rabbit_types:listener()].
+
active_listeners() ->
rabbit_misc:dirty_read_all(rabbit_listener).
+-spec node_listeners(node()) -> [rabbit_types:listener()].
+
node_listeners(Node) ->
mnesia:dirty_read(rabbit_listener, Node).
+-spec on_node_down(node()) -> 'ok'.
+
on_node_down(Node) ->
case lists:member(Node, nodes()) of
false ->
@@ -318,22 +313,44 @@ on_node_down(Node) ->
"Keeping ~s listeners: the node is already back~n", [Node])
end.
+-spec register_connection(pid()) -> ok.
+
register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
+-spec unregister_connection(pid()) -> ok.
+
unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
+-spec connections() -> [rabbit_types:connection()].
+
connections() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_networking, connections_local, []).
+-spec connections_local() -> [rabbit_types:connection()].
+
connections_local() -> pg_local:get_members(rabbit_connections).
+-spec connection_info_keys() -> rabbit_types:info_keys().
+
connection_info_keys() -> rabbit_reader:info_keys().
+-spec connection_info(rabbit_types:connection()) -> rabbit_types:infos().
+
connection_info(Pid) -> rabbit_reader:info(Pid).
+
+-spec connection_info(rabbit_types:connection(), rabbit_types:info_keys()) ->
+ rabbit_types:infos().
+
connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
+-spec connection_info_all() -> [rabbit_types:infos()].
+
connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
+
+-spec connection_info_all(rabbit_types:info_keys()) ->
+ [rabbit_types:infos()].
+
connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
@@ -346,6 +363,8 @@ emit_connection_info_local(Items, Ref, AggregatorPid) ->
AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end,
connections_local()).
+-spec close_connection(pid(), string()) -> 'ok'.
+
close_connection(Pid, Explanation) ->
case lists:member(Pid, connections()) of
true ->
@@ -359,6 +378,8 @@ close_connection(Pid, Explanation) ->
ok
end.
+-spec force_connection_event_refresh(reference()) -> 'ok'.
+
force_connection_event_refresh(Ref) ->
[rabbit_reader:force_event_refresh(C, Ref) || C <- connections()],
ok.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 4a5dea6073..2d3b042fa4 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -50,37 +50,11 @@
keepalive_timer, autoheal, guid, node_guids}).
%%----------------------------------------------------------------------------
-
--spec start_link() -> rabbit_types:ok_pid_or_error().
-
--spec running_nodes_filename() -> string().
--spec cluster_status_filename() -> string().
--spec prepare_cluster_status_files() -> 'ok'.
--spec write_cluster_status(rabbit_mnesia:cluster_status()) -> 'ok'.
--spec read_cluster_status() -> rabbit_mnesia:cluster_status().
--spec update_cluster_status() -> 'ok'.
--spec reset_cluster_status() -> 'ok'.
-
--spec notify_node_up() -> 'ok'.
--spec notify_joined_cluster() -> 'ok'.
--spec notify_left_cluster(node()) -> 'ok'.
-
--spec partitions() -> [node()].
--spec partitions([node()]) -> [{node(), [node()]}].
--spec status([node()]) -> {[{node(), [node()]}], [node()]}.
--spec subscribe(pid()) -> 'ok'.
--spec pause_partition_guard() -> 'ok' | 'pausing'.
-
--spec all_rabbit_nodes_up() -> boolean().
--spec run_outside_applications(fun (() -> any()), boolean()) -> pid().
--spec ping_all() -> 'ok'.
--spec alive_nodes([node()]) -> [node()].
--spec alive_rabbit_nodes([node()]) -> [node()].
-
-%%----------------------------------------------------------------------------
%% Start
%%----------------------------------------------------------------------------
+-spec start_link() -> rabbit_types:ok_pid_or_error().
+
start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%----------------------------------------------------------------------------
@@ -97,15 +71,21 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%% the information we have will be outdated, but it cannot be
%% otherwise.
+-spec running_nodes_filename() -> string().
+
running_nodes_filename() ->
filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").
+-spec cluster_status_filename() -> string().
+
cluster_status_filename() ->
filename:join(rabbit_mnesia:dir(), "cluster_nodes.config").
quorum_filename() ->
filename:join(rabbit_mnesia:dir(), "quorum").
+-spec prepare_cluster_status_files() -> 'ok'.
+
prepare_cluster_status_files() ->
rabbit_mnesia:ensure_mnesia_dir(),
Corrupt = fun(F) -> throw({error, corrupt_cluster_status_files, F}) end,
@@ -133,6 +113,8 @@ prepare_cluster_status_files() ->
AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2),
ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}).
+-spec write_cluster_status(rabbit_mnesia:cluster_status()) -> 'ok'.
+
write_cluster_status({All, Disc, Running}) ->
ClusterStatusFN = cluster_status_filename(),
Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of
@@ -148,6 +130,8 @@ write_cluster_status({All, Disc, Running}) ->
{FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}})
end.
+-spec read_cluster_status() -> rabbit_mnesia:cluster_status().
+
read_cluster_status() ->
case {try_read_file(cluster_status_filename()),
try_read_file(running_nodes_filename())} of
@@ -157,10 +141,14 @@ read_cluster_status() ->
throw({error, {corrupt_or_missing_cluster_files, Stat, Run}})
end.
+-spec update_cluster_status() -> 'ok'.
+
update_cluster_status() ->
{ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(),
write_cluster_status(Status).
+-spec reset_cluster_status() -> 'ok'.
+
reset_cluster_status() ->
write_cluster_status({[node()], [node()], [node()]}).
@@ -168,15 +156,21 @@ reset_cluster_status() ->
%% Cluster notifications
%%----------------------------------------------------------------------------
+-spec notify_node_up() -> 'ok'.
+
notify_node_up() ->
gen_server:cast(?SERVER, notify_node_up).
+-spec notify_joined_cluster() -> 'ok'.
+
notify_joined_cluster() ->
Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
gen_server:abcast(Nodes, ?SERVER,
{joined_cluster, node(), rabbit_mnesia:node_type()}),
ok.
+-spec notify_left_cluster(node()) -> 'ok'.
+
notify_left_cluster(Node) ->
Nodes = rabbit_mnesia:cluster_nodes(running),
gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}),
@@ -186,16 +180,24 @@ notify_left_cluster(Node) ->
%% Server calls
%%----------------------------------------------------------------------------
+-spec partitions() -> [node()].
+
partitions() ->
gen_server:call(?SERVER, partitions, infinity).
+-spec partitions([node()]) -> [{node(), [node()]}].
+
partitions(Nodes) ->
{Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, ?NODE_REPLY_TIMEOUT),
Replies.
+-spec status([node()]) -> {[{node(), [node()]}], [node()]}.
+
status(Nodes) ->
gen_server:multi_call(Nodes, ?SERVER, status, infinity).
+-spec subscribe(pid()) -> 'ok'.
+
subscribe(Pid) ->
gen_server:cast(?SERVER, {subscribe, Pid}).
@@ -218,6 +220,8 @@ subscribe(Pid) ->
%% So we have channels call in here before issuing confirms, to do a
%% lightweight check that we have not entered a pausing state.
+-spec pause_partition_guard() -> 'ok' | 'pausing'.
+
pause_partition_guard() ->
case get(pause_partition_guard) of
not_pause_mode ->
@@ -888,19 +892,28 @@ all_nodes_up() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
length(alive_nodes(Nodes)) =:= length(Nodes).
+-spec all_rabbit_nodes_up() -> boolean().
+
all_rabbit_nodes_up() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).
+-spec alive_nodes([node()]) -> [node()].
+
alive_nodes() -> alive_nodes(rabbit_mnesia:cluster_nodes(all)).
alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])].
+-spec alive_rabbit_nodes([node()]) -> [node()].
+
alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)).
alive_rabbit_nodes(Nodes) ->
[N || N <- alive_nodes(Nodes), rabbit:is_running(N)].
%% This one is allowed to connect!
+
+-spec ping_all() -> 'ok'.
+
ping_all() ->
[net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)],
ok.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 3a27ce8dc9..1e4abad8c9 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -31,28 +31,20 @@
%% Specs
%%----------------------------------------------------------------------------
--spec names(string()) ->
- rabbit_types:ok_or_error2([{string(), integer()}], term()).
--spec diagnostics([node()]) -> string().
--spec cookie_hash() -> string().
--spec is_running(node(), atom()) -> boolean().
--spec is_process_running(node(), atom()) -> boolean().
--spec cluster_name() -> binary().
--spec set_cluster_name(binary(), rabbit_types:username()) -> 'ok'.
--spec all_running() -> [node()].
--spec running_count() -> integer().
-
-%%----------------------------------------------------------------------------
-
name_type() ->
case os:getenv("RABBITMQ_USE_LONGNAME") of
"true" -> longnames;
_ -> shortnames
end.
+-spec names(string()) ->
+ rabbit_types:ok_or_error2([{string(), integer()}], term()).
+
names(Hostname) ->
rabbit_nodes_common:names(Hostname).
+-spec diagnostics([node()]) -> string().
+
diagnostics(Nodes) ->
rabbit_nodes_common:diagnostics(Nodes).
@@ -62,15 +54,23 @@ make(NodeStr) ->
parts(NodeStr) ->
rabbit_nodes_common:parts(NodeStr).
+-spec cookie_hash() -> string().
+
cookie_hash() ->
rabbit_nodes_common:cookie_hash().
+-spec is_running(node(), atom()) -> boolean().
+
is_running(Node, Application) ->
rabbit_nodes_common:is_running(Node, Application).
+-spec is_process_running(node(), atom()) -> boolean().
+
is_process_running(Node, Process) ->
rabbit_nodes_common:is_process_running(Node, Process).
+-spec cluster_name() -> binary().
+
cluster_name() ->
rabbit_runtime_parameters:value_global(
cluster_name, cluster_name_default()).
@@ -80,6 +80,8 @@ cluster_name_default() ->
FQDN = rabbit_net:hostname(),
list_to_binary(atom_to_list(make({ID, FQDN}))).
+-spec set_cluster_name(binary(), rabbit_types:username()) -> 'ok'.
+
set_cluster_name(Name, Username) ->
%% Cluster name should be binary
BinaryName = rabbit_data_coercion:to_binary(Name),
@@ -88,8 +90,12 @@ set_cluster_name(Name, Username) ->
ensure_epmd() ->
rabbit_nodes_common:ensure_epmd().
+-spec all_running() -> [node()].
+
all_running() -> rabbit_mnesia:cluster_nodes(running).
+-spec running_count() -> integer().
+
running_count() -> length(all_running()).
-spec await_running_count(integer(), integer()) -> 'ok' | {'error', atom()}.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 69139b7b1a..6adce8a24e 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -31,20 +31,10 @@
-type plugin_name() :: atom().
--spec setup() -> [plugin_name()].
--spec active() -> [plugin_name()].
--spec list(string()) -> [#plugin{}].
--spec list(string(), boolean()) -> [#plugin{}].
--spec read_enabled(file:filename()) -> [plugin_name()].
--spec dependencies(boolean(), [plugin_name()], [#plugin{}]) ->
- [plugin_name()].
--spec ensure(string()) -> {'ok', [atom()], [atom()]} | {error, any()}.
--spec strictly_plugins([plugin_name()], [#plugin{}]) -> [plugin_name()].
--spec strictly_plugins([plugin_name()]) -> [plugin_name()].
--spec is_strictly_plugin(#plugin{}) -> boolean().
-
%%----------------------------------------------------------------------------
+-spec ensure(string()) -> {'ok', [atom()], [atom()]} | {error, any()}.
+
ensure(FileJustChanged) ->
case rabbit:is_running() of
true -> ensure1(FileJustChanged);
@@ -128,6 +118,9 @@ enabled_plugins() ->
end.
%% @doc Prepares the file system and installs all enabled plugins.
+
+-spec setup() -> [plugin_name()].
+
setup() ->
ExpandDir = plugins_expand_dir(),
%% Eliminate the contents of the destination directory
@@ -184,15 +177,23 @@ extract_schema(#plugin{type = dir, location = Location}, SchemaDir) ->
%% @doc Lists the plugins which are currently running.
+
+-spec active() -> [plugin_name()].
+
active() ->
InstalledPlugins = plugin_names(list(plugins_dir())),
[App || {App, _, _} <- rabbit_misc:which_applications(),
lists:member(App, InstalledPlugins)].
%% @doc Get the list of plugins which are ready to be enabled.
+
+-spec list(string()) -> [#plugin{}].
+
list(PluginsPath) ->
list(PluginsPath, false).
+-spec list(string(), boolean()) -> [#plugin{}].
+
list(PluginsPath, IncludeRequiredDeps) ->
{AllPlugins, LoadingProblems} = discover_plugins(split_path(PluginsPath)),
{UniquePlugins, DuplicateProblems} = remove_duplicate_plugins(AllPlugins),
@@ -202,6 +203,9 @@ list(PluginsPath, IncludeRequiredDeps) ->
ensure_dependencies(Plugins2).
%% @doc Read the list of enabled plugins from the supplied term file.
+
+-spec read_enabled(file:filename()) -> [plugin_name()].
+
read_enabled(PluginsFile) ->
case rabbit_file:read_term_file(PluginsFile) of
{ok, [Plugins]} -> Plugins;
@@ -216,6 +220,10 @@ read_enabled(PluginsFile) ->
%% @doc Calculate the dependency graph from <i>Sources</i>.
%% When Reverse =:= true the bottom/leaf level applications are returned in
%% the resulting list, otherwise they're skipped.
+
+-spec dependencies(boolean(), [plugin_name()], [#plugin{}]) ->
+ [plugin_name()].
+
dependencies(Reverse, Sources, AllPlugins) ->
{ok, G} = rabbit_misc:build_acyclic_graph(
fun ({App, _Deps}) -> [{App, App}] end,
@@ -231,15 +239,22 @@ dependencies(Reverse, Sources, AllPlugins) ->
OrderedDests.
%% Filter real plugins from application dependencies
+
+-spec is_strictly_plugin(#plugin{}) -> boolean().
+
is_strictly_plugin(#plugin{extra_dependencies = ExtraDeps}) ->
lists:member(rabbit, ExtraDeps).
+-spec strictly_plugins([plugin_name()], [#plugin{}]) -> [plugin_name()].
+
strictly_plugins(Plugins, AllPlugins) ->
lists:filter(
fun(Name) ->
is_strictly_plugin(lists:keyfind(Name, #plugin.name, AllPlugins))
end, Plugins).
+-spec strictly_plugins([plugin_name()]) -> [plugin_name()].
+
strictly_plugins(Plugins) ->
AllPlugins = list(plugins_dir()),
lists:filter(
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 8d13a16e0b..b0fe6d2b04 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -29,13 +29,8 @@
-define(EX_USAGE, 64).
%%----------------------------------------------------------------------------
-%% Specs
-%%----------------------------------------------------------------------------
-spec start() -> no_return().
--spec stop() -> 'ok'.
-
-%%----------------------------------------------------------------------------
start() ->
case init:get_plain_arguments() of
@@ -55,6 +50,8 @@ start() ->
rabbit_misc:quit(?SET_DIST_PORT),
ok.
+-spec stop() -> 'ok'.
+
stop() ->
ok.
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index b750482ba1..5e92013424 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -38,11 +38,11 @@
-type start_mode() :: 'declare' | 'recovery' | 'slave'.
+%%----------------------------------------------------------------------------
+
-spec start_link(amqqueue:amqqueue(), start_mode(), pid())
-> rabbit_types:ok_pid_or_error().
-%%----------------------------------------------------------------------------
-
start_link(Q, StartMode, Marker) ->
gen_server2:start_link(?MODULE, {Q, StartMode, Marker}, []).
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index e52156f60d..732415b82e 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -33,13 +33,12 @@
%%----------------------------------------------------------------------------
-spec start_link(rabbit_types:proc_name()) -> rabbit_types:ok_pid_or_error().
--spec register(pid(), pid()) -> 'ok'.
-
-%%----------------------------------------------------------------------------
start_link(ProcName) ->
gen_server:start_link(?MODULE, [ProcName], []).
+-spec register(pid(), pid()) -> 'ok'.
+
register(CollectorPid, Q) ->
gen_server:call(CollectorPid, {register, Q}, infinity).
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 7a0c0f98e3..2ede7b7b8e 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -66,57 +66,29 @@
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}.
--spec new() -> state().
--spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
--spec inactive(state()) -> boolean().
--spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(),
- non_neg_integer(), rabbit_framing:amqp_table(),
- rabbit_types:username()}].
--spec count() -> non_neg_integer().
--spec unacknowledged_message_count() -> non_neg_integer().
--spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
- non_neg_integer(), rabbit_framing:amqp_table(), boolean(),
- rabbit_types:username(), state())
- -> state().
--spec remove(ch(), rabbit_types:ctag(), state()) ->
- 'not_found' | state().
--spec erase_ch(ch(), state()) ->
- 'not_found' | {[ack()], [rabbit_types:ctag()],
- state()}.
--spec send_drained() -> 'ok'.
--spec deliver(fun ((boolean()) -> {fetch_result(), T}),
- rabbit_amqqueue:name(), state(), boolean(),
- none | {ch(), rabbit_types:ctag()} | {ch(), consumer()}) ->
- {'delivered', boolean(), T, state()} |
- {'undelivered', boolean(), state()}.
--spec record_ack(ch(), pid(), ack()) -> 'ok'.
--spec subtract_acks(ch(), [ack()], state()) ->
- 'not_found' | 'unchanged' | {'unblocked', state()}.
--spec possibly_unblock(cr_fun(), ch(), state()) ->
- 'unchanged' | {'unblocked', state()}.
--spec resume_fun() -> cr_fun().
--spec notify_sent_fun(non_neg_integer()) -> cr_fun().
--spec activate_limit_fun() -> cr_fun().
--spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
- state()) -> 'unchanged' | {'unblocked', state()}.
--spec utilisation(state()) -> ratio().
--spec get(ch(), rabbit_types:ctag(), state()) -> undefined | consumer().
--spec consumer_tag(consumer()) -> rabbit_types:ctag().
--spec get_infos(consumer()) -> term().
-
%%----------------------------------------------------------------------------
+-spec new() -> state().
+
new() -> #state{consumers = priority_queue:new(),
use = {active,
erlang:monotonic_time(micro_seconds),
1.0}}.
+-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
+
max_active_priority(#state{consumers = Consumers}) ->
priority_queue:highest(Consumers).
+-spec inactive(state()) -> boolean().
+
inactive(#state{consumers = Consumers}) ->
priority_queue:is_empty(Consumers).
+-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(),
+ non_neg_integer(), rabbit_framing:amqp_table(),
+ rabbit_types:username()}].
+
all(State) ->
all(State, none, false).
@@ -146,11 +118,20 @@ consumers(Consumers, SingleActiveConsumer, SingleActiveConsumerOn, Acc) ->
[{ChPid, CTag, Ack, Prefetch, Active, ActivityStatus, Args, Username} | Acc1]
end, Acc, Consumers).
+-spec count() -> non_neg_integer().
+
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
+-spec unacknowledged_message_count() -> non_neg_integer().
+
unacknowledged_message_count() ->
lists:sum([?QUEUE:len(C#cr.acktags) || C <- all_ch_record()]).
+-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
+ non_neg_integer(), rabbit_framing:amqp_table(), boolean(),
+ rabbit_types:username(), state())
+ -> state().
+
add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
Username, State = #state{consumers = Consumers,
use = CUInfo}) ->
@@ -176,6 +157,9 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers),
use = update_use(CUInfo, active)}.
+-spec remove(ch(), rabbit_types:ctag(), state()) ->
+ 'not_found' | state().
+
remove(ChPid, CTag, State = #state{consumers = Consumers}) ->
case lookup_ch(ChPid) of
not_found ->
@@ -196,6 +180,10 @@ remove(ChPid, CTag, State = #state{consumers = Consumers}) ->
remove_consumer(ChPid, CTag, Consumers)}
end.
+-spec erase_ch(ch(), state()) ->
+ 'not_found' | {[ack()], [rabbit_types:ctag()],
+ state()}.
+
erase_ch(ChPid, State = #state{consumers = Consumers}) ->
case lookup_ch(ChPid) of
not_found ->
@@ -211,9 +199,17 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
State#state{consumers = remove_consumers(ChPid, Consumers)}}
end.
+-spec send_drained() -> 'ok'.
+
send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
+-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
+ rabbit_amqqueue:name(), state(), boolean(),
+ none | {ch(), rabbit_types:ctag()} | {ch(), consumer()}) ->
+ {'delivered', boolean(), T, state()} |
+ {'undelivered', boolean(), state()}.
+
deliver(FetchFun, QName, State, SingleActiveConsumerIsOn, ActiveConsumer) ->
deliver(FetchFun, QName, false, State, SingleActiveConsumerIsOn, ActiveConsumer).
@@ -299,11 +295,16 @@ is_blocked(Consumer = {ChPid, _C}) ->
#cr{blocked_consumers = BlockedConsumers} = lookup_ch(ChPid),
priority_queue:member(Consumer, BlockedConsumers).
+-spec record_ack(ch(), pid(), ack()) -> 'ok'.
+
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
update_ch_record(C#cr{acktags = ?QUEUE:in({AckTag, none}, ChAckTags)}),
ok.
+-spec subtract_acks(ch(), [ack()], state()) ->
+ 'not_found' | 'unchanged' | {'unblocked', state()}.
+
subtract_acks(ChPid, AckTags, State) ->
case lookup_ch(ChPid) of
not_found ->
@@ -341,6 +342,9 @@ subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
subtract_acks([], Prefix, CTagCounts, AckQ)
end.
+-spec possibly_unblock(cr_fun(), ch(), state()) ->
+ 'unchanged' | {'unblocked', state()}.
+
possibly_unblock(Update, ChPid, State) ->
case lookup_ch(ChPid) of
not_found -> unchanged;
@@ -370,23 +374,31 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
use = update_use(Use, active)}}
end.
+-spec resume_fun() -> cr_fun().
+
resume_fun() ->
fun (C = #cr{limiter = Limiter}) ->
C#cr{limiter = rabbit_limiter:resume(Limiter)}
end.
+-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
+
notify_sent_fun(Credit) ->
fun (C = #cr{unsent_message_count = Count}) ->
C#cr{unsent_message_count = Count - Credit}
end.
+-spec activate_limit_fun() -> cr_fun().
+
activate_limit_fun() ->
fun (C = #cr{limiter = Limiter}) ->
C#cr{limiter = rabbit_limiter:activate(Limiter)}
end.
-credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
+-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
+ state()) -> 'unchanged' | {'unblocked', state()}.
+credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
case lookup_ch(ChPid) of
not_found ->
unchanged;
@@ -405,6 +417,8 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
drain_mode(true) -> drain;
drain_mode(false) -> manual.
+-spec utilisation(state()) -> ratio().
+
utilisation(#state{use = {active, Since, Avg}}) ->
use_avg(erlang:monotonic_time(micro_seconds) - Since, 0, Avg);
utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
@@ -421,6 +435,8 @@ get_consumer(#state{consumers = Consumers}) ->
{empty, _} -> undefined
end.
+-spec get(ch(), rabbit_types:ctag(), state()) -> undefined | consumer().
+
get(ChPid, ConsumerTag, #state{consumers = Consumers}) ->
Consumers1 = priority_queue:filter(fun ({CP, #consumer{tag = CT}}) ->
(CP == ChPid) and (CT == ConsumerTag)
@@ -430,10 +446,14 @@ get(ChPid, ConsumerTag, #state{consumers = Consumers}) ->
{{value, Consumer, _Priority}, _Tail} -> Consumer
end.
+-spec get_infos(consumer()) -> term().
+
get_infos(Consumer) ->
{Consumer#consumer.tag,Consumer#consumer.ack_required,
Consumer#consumer.prefetch, Consumer#consumer.args}.
+-spec consumer_tag(consumer()) -> rabbit_types:ctag().
+
consumer_tag(#consumer{tag = CTag}) ->
CTag.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 9785ae170d..e4047a9902 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -258,46 +258,20 @@
{rabbit_types:msg_id(), non_neg_integer(), A}).
-type shutdown_terms() :: [term()] | 'non_clean_shutdown'.
--spec erase(rabbit_amqqueue:name()) -> 'ok'.
--spec reset_state(qistate()) -> qistate().
--spec init(rabbit_amqqueue:name(),
- on_sync_fun(), on_sync_fun()) -> qistate().
--spec recover(rabbit_amqqueue:name(), shutdown_terms(), boolean(),
- contains_predicate(),
- on_sync_fun(), on_sync_fun()) ->
- {'undefined' | non_neg_integer(),
- 'undefined' | non_neg_integer(), qistate()}.
--spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate().
--spec delete_and_terminate(qistate()) -> qistate().
--spec publish(rabbit_types:msg_id(), seq_id(),
- rabbit_types:message_properties(), boolean(),
- non_neg_integer(), qistate()) -> qistate().
--spec deliver([seq_id()], qistate()) -> qistate().
--spec ack([seq_id()], qistate()) -> qistate().
--spec sync(qistate()) -> qistate().
--spec needs_sync(qistate()) -> 'confirms' | 'other' | 'false'.
--spec flush(qistate()) -> qistate().
--spec read(seq_id(), seq_id(), qistate()) ->
- {[{rabbit_types:msg_id(), seq_id(),
- rabbit_types:message_properties(),
- boolean(), boolean()}], qistate()}.
--spec next_segment_boundary(seq_id()) -> seq_id().
--spec bounds(qistate()) ->
- {non_neg_integer(), non_neg_integer(), qistate()}.
--spec start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}.
-
--spec add_queue_ttl() -> 'ok'.
-
-
%%----------------------------------------------------------------------------
%% public API
%%----------------------------------------------------------------------------
+-spec erase(rabbit_amqqueue:name()) -> 'ok'.
+
erase(Name) ->
#qistate { dir = Dir } = blank_state(Name),
erase_index_dir(Dir).
%% used during variable queue purge when there are no pending acks
+
+-spec reset_state(qistate()) -> qistate().
+
reset_state(#qistate{ queue_name = Name,
dir = Dir,
on_sync = OnSyncFun,
@@ -310,12 +284,21 @@ reset_state(#qistate{ queue_name = Name,
ok = erase_index_dir(Dir),
blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun).
+-spec init(rabbit_amqqueue:name(),
+ on_sync_fun(), on_sync_fun()) -> qistate().
+
init(Name, OnSyncFun, OnSyncMsgFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
State#qistate{on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun}.
+-spec recover(rabbit_amqqueue:name(), shutdown_terms(), boolean(),
+ contains_predicate(),
+ on_sync_fun(), on_sync_fun()) ->
+ {'undefined' | non_neg_integer(),
+ 'undefined' | non_neg_integer(), qistate()}.
+
recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun,
OnSyncFun, OnSyncMsgFun) ->
State = blank_state(Name),
@@ -328,12 +311,16 @@ recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun,
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.
+-spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate().
+
terminate(VHost, Terms, State = #qistate { dir = Dir }) ->
{SegmentCounts, State1} = terminate(State),
rabbit_recovery_terms:store(VHost, filename:basename(Dir),
[{segments, SegmentCounts} | Terms]),
State1.
+-spec delete_and_terminate(qistate()) -> qistate().
+
delete_and_terminate(State) ->
{_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
ok = rabbit_file:recursive_delete([Dir]),
@@ -396,6 +383,10 @@ flush_delivered_cache(State = #qistate{delivered_cache = DC}) ->
State1 = deliver(lists:reverse(DC), State),
State1#qistate{delivered_cache = []}.
+-spec publish(rabbit_types:msg_id(), seq_id(),
+ rabbit_types:message_properties(), boolean(),
+ non_neg_integer(), qistate()) -> qistate().
+
publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, State) ->
{JournalHdl, State1} =
get_journal_handle(
@@ -429,20 +420,29 @@ maybe_needs_confirming(MsgProps, MsgOrId,
{false, _} -> State
end.
+-spec deliver([seq_id()], qistate()) -> qistate().
+
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
+-spec ack([seq_id()], qistate()) -> qistate().
+
ack(SeqIds, State) ->
deliver_or_ack(ack, SeqIds, State).
%% This is called when there are outstanding confirms or when the
%% queue is idle and the journal needs syncing (see needs_sync/1).
+
+-spec sync(qistate()) -> qistate().
+
sync(State = #qistate { journal_handle = undefined }) ->
State;
sync(State = #qistate { journal_handle = JournalHdl }) ->
ok = file_handle_cache:sync(JournalHdl),
notify_sync(State).
+-spec needs_sync(qistate()) -> 'confirms' | 'other' | 'false'.
+
needs_sync(#qistate{journal_handle = undefined}) ->
false;
needs_sync(#qistate{journal_handle = JournalHdl,
@@ -456,9 +456,16 @@ needs_sync(#qistate{journal_handle = JournalHdl,
false -> confirms
end.
+-spec flush(qistate()) -> qistate().
+
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
+-spec read(seq_id(), seq_id(), qistate()) ->
+ {[{rabbit_types:msg_id(), seq_id(),
+ rabbit_types:message_properties(),
+ boolean(), boolean()}], qistate()}.
+
read(StartEnd, StartEnd, State) ->
{[], State};
read(Start, End, State = #qistate { segments = Segments,
@@ -472,10 +479,15 @@ read(Start, End, State = #qistate { segments = Segments,
end, {[], Segments}, lists:seq(StartSeg, EndSeg)),
{Messages, State #qistate { segments = Segments1 }}.
+-spec next_segment_boundary(seq_id()) -> seq_id().
+
next_segment_boundary(SeqId) ->
{Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
reconstruct_seq_id(Seg + 1, 0).
+-spec bounds(qistate()) ->
+ {non_neg_integer(), non_neg_integer(), qistate()}.
+
bounds(State = #qistate { segments = Segments }) ->
%% This is not particularly efficient, but only gets invoked on
%% queue initialisation.
@@ -498,6 +510,8 @@ bounds(State = #qistate { segments = Segments }) ->
end,
{LowSeqId, NextSeqId, State}.
+-spec start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}.
+
start(VHost, DurableQueueNames) ->
ok = rabbit_recovery_terms:start(VHost),
{DurableTerms, DurableDirectories} =
@@ -1286,6 +1300,8 @@ journal_minus_segment1({no_pub, del, ack}, undefined) ->
%% upgrade
%%----------------------------------------------------------------------------
+-spec add_queue_ttl() -> 'ok'.
+
add_queue_ttl() ->
foreach_queue_index({fun add_queue_ttl_journal/1,
fun add_queue_ttl_segment/1}).
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 45a1c12cac..b54de3a2e6 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -47,26 +47,6 @@
-type msg_id() :: non_neg_integer().
-type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(), rabbit_types:message()}.
--spec handle_event({'ra_event', ra_server_id(), any()}, rabbit_fifo_client:state()) ->
- {'internal', Correlators :: [term()], rabbit_fifo_client:state()} |
- {rabbit_fifo:client_msg(), rabbit_fifo_client:state()}.
--spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() |
- {'absent', amqqueue:amqqueue(), atom()}].
--spec stop(rabbit_types:vhost()) -> 'ok'.
--spec ack(rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) ->
- {'ok', rabbit_fifo_client:state()}.
--spec reject(Confirm :: boolean(), rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) ->
- {'ok', rabbit_fifo_client:state()}.
--spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
- {'ok', rabbit_fifo_client:state()}.
--spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'.
--spec info(amqqueue:amqqueue()) -> rabbit_types:infos().
--spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos().
--spec infos(rabbit_types:r('queue')) -> rabbit_types:infos().
--spec stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}.
--spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
--spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> rabbit_types:infos() | {error, term()}.
-
-define(STATISTICS_KEYS,
[policy,
operator_policy,
@@ -104,12 +84,17 @@ init_state({Name, _}, QName = #resource{}) ->
fun() -> credit_flow:block(Name), ok end,
fun() -> credit_flow:unblock(Name), ok end).
+-spec handle_event({'ra_event', ra_server_id(), any()}, rabbit_fifo_client:state()) ->
+ {'internal', Correlators :: [term()], rabbit_fifo_client:state()} |
+ {rabbit_fifo:client_msg(), rabbit_fifo_client:state()}.
+
handle_event({ra_event, From, Evt}, QState) ->
rabbit_fifo_client:handle_ra_event(From, Evt, QState).
-spec declare(amqqueue:amqqueue()) ->
{'new', amqqueue:amqqueue()} |
{existing, amqqueue:amqqueue()}.
+
declare(Q) when ?amqqueue_is_quorum(Q) ->
QName = amqqueue:get_name(Q),
Durable = amqqueue:is_durable(Q),
@@ -268,6 +253,9 @@ reductions(Name) ->
0
end.
+-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() |
+ {'absent', amqqueue:amqqueue(), atom()}].
+
recover(Queues) ->
[begin
{Name, _} = amqqueue:get_pid(Q0),
@@ -308,6 +296,8 @@ recover(Queues) ->
Q
end || Q0 <- Queues].
+-spec stop(rabbit_types:vhost()) -> 'ok'.
+
stop(VHost) ->
_ = [begin
Pid = amqqueue:get_pid(Q),
@@ -319,6 +309,7 @@ stop(VHost) ->
boolean(), boolean(),
rabbit_types:username()) ->
{ok, QLen :: non_neg_integer()}.
+
delete(Q,
_IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
{Name, _} = amqqueue:get_pid(Q),
@@ -389,9 +380,15 @@ delete_immediately(Resource, {_Name, _} = QPid) ->
rabbit_core_metrics:queue_deleted(Resource),
ok.
+-spec ack(rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) ->
+ {'ok', rabbit_fifo_client:state()}.
+
ack(CTag, MsgIds, QState) ->
rabbit_fifo_client:settle(quorum_ctag(CTag), MsgIds, QState).
+-spec reject(Confirm :: boolean(), rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) ->
+ {'ok', rabbit_fifo_client:state()}.
+
reject(true, CTag, MsgIds, QState) ->
rabbit_fifo_client:return(quorum_ctag(CTag), MsgIds, QState);
reject(false, CTag, MsgIds, QState) ->
@@ -478,10 +475,15 @@ basic_consume(Q, NoAck, ChPid,
ActivityStatus, Args),
{ok, QState}.
+-spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
+ {'ok', rabbit_fifo_client:state()}.
+
basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) ->
maybe_send_reply(ChPid, OkMsg),
rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), QState0).
+-spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'.
+
stateless_deliver(ServerId, Delivery) ->
ok = rabbit_fifo_client:untracked_enqueue([ServerId],
Delivery#delivery.message).
@@ -489,16 +491,21 @@ stateless_deliver(ServerId, Delivery) ->
-spec deliver(Confirm :: boolean(), rabbit_types:delivery(),
rabbit_fifo_client:state()) ->
{ok | slow, rabbit_fifo_client:state()}.
+
deliver(false, Delivery, QState0) ->
rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0);
deliver(true, Delivery, QState0) ->
rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
Delivery#delivery.message, QState0).
+-spec info(amqqueue:amqqueue()) -> rabbit_types:infos().
+
info(Q) ->
info(Q, [name, durable, auto_delete, arguments, pid, state, messages,
messages_ready, messages_unacknowledged]).
+-spec infos(rabbit_types:r('queue')) -> rabbit_types:infos().
+
infos(QName) ->
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
@@ -507,10 +514,15 @@ infos(QName) ->
[]
end.
+-spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos().
+
info(Q, Items) ->
[{Item, i(Item, Q)} || Item <- Items].
-stat(#amqqueue{pid = Leader}) ->
+-spec stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}.
+
+stat(Q) when ?is_amqqueue(Q) ->
+ Leader = amqqueue:get_pid(Q),
try
case rabbit_fifo_client:stat(Leader) of
{ok, _, _} = Stat ->
@@ -557,6 +569,8 @@ policy_changed(QName, Node) ->
{ok, Q} = rabbit_amqqueue:lookup(QName),
rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)).
+-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
+
cluster_state(Name) ->
case whereis(Name) of
undefined -> down;
@@ -567,6 +581,8 @@ cluster_state(Name) ->
end
end.
+-spec status(rabbit_types:vhost(), Name :: atom()) -> rabbit_types:infos() | {error, term()}.
+
status(Vhost, QueueName) ->
%% Handle not found queues
QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 20acec77ae..95a6b185c2 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -159,39 +159,26 @@
%%--------------------------------------------------------------------------
--spec start_link(pid(), any()) -> rabbit_types:ok(pid()).
--spec info_keys() -> rabbit_types:info_keys().
--spec info(pid()) -> rabbit_types:infos().
--spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
--spec force_event_refresh(pid(), reference()) -> 'ok'.
--spec shutdown(pid(), string()) -> 'ok'.
-type resource_alert() :: {WasAlarmSetForNode :: boolean(),
IsThereAnyAlarmsWithSameSourceInTheCluster :: boolean(),
NodeForWhichAlarmWasSetOrCleared :: node()}.
--spec conserve_resources(pid(), atom(), resource_alert()) -> 'ok'.
--spec server_properties(rabbit_types:protocol()) ->
- rabbit_framing:amqp_table().
-
-%% These specs only exists to add no_return() to keep dialyzer happy
--spec init(pid(), pid(), any()) -> no_return().
--spec start_connection(pid(), pid(), any(), rabbit_net:socket()) ->
- no_return().
-
--spec mainloop(_,[binary()], non_neg_integer(), #v1{}) -> any().
--spec system_code_change(_,_,_,_) -> {'ok',_}.
--spec system_continue(_,_,{[binary()], non_neg_integer(), #v1{}}) -> any().
--spec system_terminate(_,_,_,_) -> none().
%%--------------------------------------------------------------------------
+-spec start_link(pid(), any()) -> rabbit_types:ok(pid()).
+
start_link(HelperSup, Ref) ->
Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSup, Ref]),
{ok, Pid}.
+-spec shutdown(pid(), string()) -> 'ok'.
+
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
+-spec init(pid(), pid(), any()) -> no_return().
+
init(Parent, HelperSup, Ref) ->
?LG_PROCESS_TYPE(reader),
{ok, Sock} = rabbit_networking:handshake(Ref,
@@ -199,33 +186,52 @@ init(Parent, HelperSup, Ref) ->
Deb = sys:debug_options([]),
start_connection(Parent, HelperSup, Deb, Sock).
+-spec system_continue(_,_,{[binary()], non_neg_integer(), #v1{}}) -> any().
+
system_continue(Parent, Deb, {Buf, BufLen, State}) ->
mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}).
+-spec system_terminate(_,_,_,_) -> none().
+
system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).
+-spec system_code_change(_,_,_,_) -> {'ok',_}.
+
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
+-spec info_keys() -> rabbit_types:info_keys().
+
info_keys() -> ?INFO_KEYS.
+-spec info(pid()) -> rabbit_types:infos().
+
info(Pid) ->
gen_server:call(Pid, info, infinity).
+-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
+
info(Pid, Items) ->
case gen_server:call(Pid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
+-spec force_event_refresh(pid(), reference()) -> 'ok'.
+
force_event_refresh(Pid, Ref) ->
gen_server:cast(Pid, {force_event_refresh, Ref}).
+-spec conserve_resources(pid(), atom(), resource_alert()) -> 'ok'.
+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
Pid ! {conserve_resources, Source, Conserve},
ok.
+-spec server_properties(rabbit_types:protocol()) ->
+ rabbit_framing:amqp_table().
+
server_properties(Protocol) ->
{ok, Product} = application:get_key(rabbit, description),
{ok, Version} = application:get_key(rabbit, vsn),
@@ -303,6 +309,9 @@ socket_op(Sock, Fun) ->
exit(normal)
end.
+-spec start_connection(pid(), pid(), any(), rabbit_net:socket()) ->
+ no_return().
+
start_connection(Parent, HelperSup, Deb, Sock) ->
process_flag(trap_exit, true),
RealSocket = rabbit_net:unwrap_socket(Sock),
@@ -493,6 +502,8 @@ binlist_split(Len, L, [Acc0|Acc]) when Len < 0 ->
binlist_split(Len, [H|T], Acc) ->
binlist_split(Len - size(H), T, [H|Acc]).
+-spec mainloop(_,[binary()], non_neg_integer(), #v1{}) -> any().
+
mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
connection_state = CS,
connection = #connection{
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index ab70fa2be7..4e8ae128de 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -40,12 +40,6 @@
%%----------------------------------------------------------------------------
-spec start(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()).
--spec stop(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()).
--spec store(rabbit_types:vhost(), file:filename(), term()) -> rabbit_types:ok_or_error(term()).
--spec read(rabbit_types:vhost(), file:filename()) -> rabbit_types:ok_or_error2(term(), not_found).
--spec clear(rabbit_types:vhost()) -> 'ok'.
-
-%%----------------------------------------------------------------------------
start(VHost) ->
case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
@@ -64,6 +58,8 @@ start(VHost) ->
end,
ok.
+-spec stop(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()).
+
stop(VHost) ->
case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
@@ -79,15 +75,21 @@ stop(VHost) ->
ok
end.
+-spec store(rabbit_types:vhost(), file:filename(), term()) -> rabbit_types:ok_or_error(term()).
+
store(VHost, DirBaseName, Terms) ->
dets:insert(VHost, {DirBaseName, Terms}).
+-spec read(rabbit_types:vhost(), file:filename()) -> rabbit_types:ok_or_error2(term(), not_found).
+
read(VHost, DirBaseName) ->
case dets:lookup(VHost, DirBaseName) of
[{_, Terms}] -> {ok, Terms};
_ -> {error, not_found}
end.
+-spec clear(rabbit_types:vhost()) -> 'ok'.
+
clear(VHost) ->
try
dets:delete_all_objects(VHost)
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
index ecbd10a3d7..dbf5a24b50 100644
--- a/src/rabbit_restartable_sup.erl
+++ b/src/rabbit_restartable_sup.erl
@@ -31,8 +31,6 @@
-spec start_link(atom(), rabbit_types:mfargs(), boolean()) ->
rabbit_types:ok_pid_or_error().
-%%----------------------------------------------------------------------------
-
start_link(Name, {_M, _F, _A} = Fun, Delay) ->
supervisor2:start_link({local, Name}, ?MODULE, [Fun, Delay]).
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index a1a0e4897d..2a0ae34e3c 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -34,53 +34,63 @@
%%----------------------------------------------------------------------------
-spec start_link() -> rabbit_types:ok_pid_or_error().
--spec start_child(atom()) -> 'ok'.
--spec start_child(atom(), [any()]) -> 'ok'.
--spec start_child(atom(), atom(), [any()]) -> 'ok'.
--spec start_child(atom(), atom(), atom(), [any()]) -> 'ok'.
--spec start_supervisor_child(atom()) -> 'ok'.
--spec start_supervisor_child(atom(), [any()]) -> 'ok'.
--spec start_supervisor_child(atom(), atom(), [any()]) -> 'ok'.
--spec start_restartable_child(atom()) -> 'ok'.
--spec start_restartable_child(atom(), [any()]) -> 'ok'.
--spec start_delayed_restartable_child(atom()) -> 'ok'.
--spec start_delayed_restartable_child(atom(), [any()]) -> 'ok'.
--spec stop_child(atom()) -> rabbit_types:ok_or_error(any()).
-
-%%----------------------------------------------------------------------------
start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+-spec start_child(atom()) -> 'ok'.
+
start_child(Mod) -> start_child(Mod, []).
+-spec start_child(atom(), [any()]) -> 'ok'.
+
start_child(Mod, Args) -> start_child(Mod, Mod, Args).
+-spec start_child(atom(), atom(), [any()]) -> 'ok'.
+
start_child(ChildId, Mod, Args) ->
child_reply(supervisor:start_child(
?SERVER,
{ChildId, {Mod, start_link, Args},
transient, ?WORKER_WAIT, worker, [Mod]})).
+-spec start_child(atom(), atom(), atom(), [any()]) -> 'ok'.
+
start_child(ChildId, Mod, Fun, Args) ->
child_reply(supervisor:start_child(
?SERVER,
{ChildId, {Mod, Fun, Args},
transient, ?WORKER_WAIT, worker, [Mod]})).
+-spec start_supervisor_child(atom()) -> 'ok'.
start_supervisor_child(Mod) -> start_supervisor_child(Mod, []).
+-spec start_supervisor_child(atom(), [any()]) -> 'ok'.
+
start_supervisor_child(Mod, Args) -> start_supervisor_child(Mod, Mod, Args).
+-spec start_supervisor_child(atom(), atom(), [any()]) -> 'ok'.
+
start_supervisor_child(ChildId, Mod, Args) ->
child_reply(supervisor:start_child(
?SERVER,
{ChildId, {Mod, start_link, Args},
transient, infinity, supervisor, [Mod]})).
+-spec start_restartable_child(atom()) -> 'ok'.
+
start_restartable_child(M) -> start_restartable_child(M, [], false).
+
+-spec start_restartable_child(atom(), [any()]) -> 'ok'.
+
start_restartable_child(M, A) -> start_restartable_child(M, A, false).
+
+-spec start_delayed_restartable_child(atom()) -> 'ok'.
+
start_delayed_restartable_child(M) -> start_restartable_child(M, [], true).
+
+-spec start_delayed_restartable_child(atom(), [any()]) -> 'ok'.
+
start_delayed_restartable_child(M, A) -> start_restartable_child(M, A, true).
start_restartable_child(Mod, Args, Delay) ->
@@ -91,6 +101,8 @@ start_restartable_child(Mod, Args, Delay) ->
[Name, {Mod, start_link, Args}, Delay]},
transient, infinity, supervisor, [rabbit_restartable_sup]})).
+-spec stop_child(atom()) -> rabbit_types:ok_or_error(any()).
+
stop_child(ChildId) ->
case supervisor:terminate_child(?SERVER, ChildId) of
ok -> supervisor:delete_child(?SERVER, ChildId);
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index 272e3966b2..db16152d5b 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -27,25 +27,15 @@
-include_lib("rabbit_common/include/rabbit.hrl").
%%----------------------------------------------------------------------------
--type retry() :: boolean().
--spec create() -> 'ok'.
--spec create_local_copy('disc' | 'ram') -> 'ok'.
--spec wait_for_replicated(retry()) -> 'ok'.
--spec wait_for_replicated() -> 'ok'.
--spec wait([atom()]) -> 'ok'.
--spec retry_timeout() -> {non_neg_integer() | infinity, non_neg_integer()}.
--spec force_load() -> 'ok'.
--spec is_present() -> boolean().
--spec is_empty() -> boolean().
--spec needs_default_data() -> boolean().
--spec check_schema_integrity(retry()) -> rabbit_types:ok_or_error(any()).
--spec clear_ram_only_tables() -> 'ok'.
+-type retry() :: boolean().
%%----------------------------------------------------------------------------
%% Main interface
%%----------------------------------------------------------------------------
+-spec create() -> 'ok'.
+
create() ->
lists:foreach(fun ({Tab, TabDef}) ->
TabDef1 = proplists:delete(match, TabDef),
@@ -74,6 +64,9 @@ ensure_secondary_index(Table, Field) ->
%% tables is important: if we delete the schema first when moving to
%% RAM mnesia will loudly complain since it doesn't make much sense to
%% do that. But when moving to disc, we need to move the schema first.
+
+-spec create_local_copy('disc' | 'ram') -> 'ok'.
+
create_local_copy(disc) ->
create_local_copy(schema, disc_copies),
create_local_copies(disc);
@@ -83,13 +76,20 @@ create_local_copy(ram) ->
%% This arity only exists for backwards compatibility with certain
%% plugins. See https://github.com/rabbitmq/rabbitmq-clusterer/issues/19.
+
+-spec wait_for_replicated() -> 'ok'.
+
wait_for_replicated() ->
wait_for_replicated(false).
+-spec wait_for_replicated(retry()) -> 'ok'.
+
wait_for_replicated(Retry) ->
wait([Tab || {Tab, TabDef} <- definitions(),
not lists:member({local_content, true}, TabDef)], Retry).
+-spec wait([atom()]) -> 'ok'.
+
wait(TableNames) ->
wait(TableNames, _Retry = false).
@@ -131,17 +131,28 @@ retry_timeout(_Retry = true) ->
end,
{retry_timeout(), Retries}.
+-spec retry_timeout() -> {non_neg_integer() | infinity, non_neg_integer()}.
+
retry_timeout() ->
case application:get_env(rabbit, mnesia_table_loading_retry_timeout) of
{ok, T} -> T;
undefined -> 30000
end.
+-spec force_load() -> 'ok'.
+
force_load() -> [mnesia:force_load_table(T) || T <- names()], ok.
+-spec is_present() -> boolean().
+
is_present() -> names() -- mnesia:system_info(tables) =:= [].
+-spec is_empty() -> boolean().
+
is_empty() -> is_empty(names()).
+
+-spec needs_default_data() -> boolean().
+
needs_default_data() -> is_empty([rabbit_user, rabbit_user_permission,
rabbit_vhost]).
@@ -149,6 +160,8 @@ is_empty(Names) ->
lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
Names).
+-spec check_schema_integrity(retry()) -> rabbit_types:ok_or_error(any()).
+
check_schema_integrity(Retry) ->
Tables = mnesia:system_info(tables),
case check(fun (Tab, TabDef) ->
@@ -162,6 +175,8 @@ check_schema_integrity(Retry) ->
Other -> Other
end.
+-spec clear_ram_only_tables() -> 'ok'.
+
clear_ram_only_tables() ->
Node = node(),
lists:foreach(
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 6047eb24a3..2c85de2f3a 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -28,20 +28,10 @@
-type state() :: rabbit_types:exchange() | 'none'.
--spec init(rabbit_types:vhost()) -> state().
--spec enabled(rabbit_types:vhost()) -> boolean().
--spec tap_in(rabbit_types:basic_message(), [rabbit_amqqueue:name()],
- binary(), rabbit_channel:channel_number(),
- rabbit_types:username(), state()) -> 'ok'.
--spec tap_out(rabbit_amqqueue:qmsg(), binary(),
- rabbit_channel:channel_number(),
- rabbit_types:username(), state()) -> 'ok'.
-
--spec start(rabbit_types:vhost()) -> 'ok'.
--spec stop(rabbit_types:vhost()) -> 'ok'.
-
%%----------------------------------------------------------------------------
+-spec init(rabbit_types:vhost()) -> state().
+
init(VHost) ->
case enabled(VHost) of
false -> none;
@@ -50,10 +40,16 @@ init(VHost) ->
X
end.
+-spec enabled(rabbit_types:vhost()) -> boolean().
+
enabled(VHost) ->
{ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS),
lists:member(VHost, VHosts).
+-spec tap_in(rabbit_types:basic_message(), [rabbit_amqqueue:name()],
+ binary(), rabbit_channel:channel_number(),
+ rabbit_types:username(), state()) -> 'ok'.
+
tap_in(_Msg, _QNames, _ConnName, _ChannelNum, _Username, none) -> ok;
tap_in(Msg = #basic_message{exchange_name = #resource{name = XName,
virtual_host = VHost}},
@@ -66,6 +62,10 @@ tap_in(Msg = #basic_message{exchange_name = #resource{name = XName,
{<<"routed_queues">>, array,
[{longstr, QName#resource.name} || QName <- QNames]}]).
+-spec tap_out(rabbit_amqqueue:qmsg(), binary(),
+ rabbit_channel:channel_number(),
+ rabbit_types:username(), state()) -> 'ok'.
+
tap_out(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok;
tap_out({#resource{name = QName, virtual_host = VHost},
_QPid, _QMsgId, Redelivered, Msg},
@@ -80,10 +80,14 @@ tap_out({#resource{name = QName, virtual_host = VHost},
%%----------------------------------------------------------------------------
+-spec start(rabbit_types:vhost()) -> 'ok'.
+
start(VHost) ->
rabbit_log:info("Enabling tracing for vhost '~s'~n", [VHost]),
update_config(fun (VHosts) -> [VHost | VHosts -- [VHost]] end).
+-spec stop(rabbit_types:vhost()) -> 'ok'.
+
stop(VHost) ->
rabbit_log:info("Disabling tracing for vhost '~s'~n", [VHost]),
update_config(fun (VHosts) -> VHosts -- [VHost] end).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 07eef293d3..214a1e390b 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -27,14 +27,6 @@
%% -------------------------------------------------------------------
--spec maybe_upgrade_mnesia() -> 'ok'.
--spec maybe_upgrade_local() ->
- 'ok' |
- 'version_not_available' |
- 'starting_from_scratch'.
-
-%% -------------------------------------------------------------------
-
%% The upgrade logic is quite involved, due to the existence of
%% clusters.
%%
@@ -125,6 +117,8 @@ remove_backup() ->
ok = rabbit_file:recursive_delete([backup_dir()]),
info("upgrades: Mnesia backup removed~n", []).
+-spec maybe_upgrade_mnesia() -> 'ok'.
+
maybe_upgrade_mnesia() ->
AllNodes = rabbit_mnesia:cluster_nodes(all),
ok = rabbit_mnesia_rename:maybe_finish(AllNodes),
@@ -244,6 +238,11 @@ nodes_running(Nodes) ->
%% -------------------------------------------------------------------
+-spec maybe_upgrade_local() ->
+ 'ok' |
+ 'version_not_available' |
+ 'starting_from_scratch'.
+
maybe_upgrade_local() ->
case rabbit_version:upgrades_required(local) of
{error, version_not_available} -> version_not_available;
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 3072728d69..afbcb863aa 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -64,49 +64,12 @@
%% -------------------------------------------------------------------
--spec remove_user_scope() -> 'ok'.
--spec hash_passwords() -> 'ok'.
--spec add_ip_to_listener() -> 'ok'.
--spec add_opts_to_listener() -> 'ok'.
--spec internal_exchanges() -> 'ok'.
--spec user_to_internal_user() -> 'ok'.
--spec topic_trie() -> 'ok'.
--spec semi_durable_route() -> 'ok'.
--spec exchange_event_serial() -> 'ok'.
--spec trace_exchanges() -> 'ok'.
--spec user_admin_to_tags() -> 'ok'.
--spec ha_mirrors() -> 'ok'.
--spec gm() -> 'ok'.
--spec exchange_scratch() -> 'ok'.
--spec mirrored_supervisor() -> 'ok'.
--spec topic_trie_node() -> 'ok'.
--spec runtime_parameters() -> 'ok'.
--spec policy() -> 'ok'.
--spec sync_slave_pids() -> 'ok'.
--spec no_mirror_nodes() -> 'ok'.
--spec gm_pids() -> 'ok'.
--spec exchange_decorators() -> 'ok'.
--spec policy_apply_to() -> 'ok'.
--spec queue_decorators() -> 'ok'.
--spec internal_system_x() -> 'ok'.
--spec cluster_name() -> 'ok'.
--spec down_slave_nodes() -> 'ok'.
--spec queue_state() -> 'ok'.
--spec recoverable_slaves() -> 'ok'.
--spec user_password_hashing() -> 'ok'.
--spec vhost_limits() -> 'ok'.
--spec operator_policies() -> 'ok'.
--spec queue_vhost_field() -> 'ok'.
--spec queue_options() -> 'ok'.
--spec exchange_options() -> 'ok'.
-
--spec remove_explicit_default_exchange_bindings() -> 'ok'.
-
-%%--------------------------------------------------------------------
-
%% replaces vhost.dummy (used to avoid having a single-field record
%% which Mnesia doesn't like) with vhost.limits (which is actually
%% used)
+
+-spec vhost_limits() -> 'ok'.
+
vhost_limits() ->
transform(
rabbit_vhost,
@@ -121,6 +84,8 @@ vhost_limits() ->
%% would be messy to have to go back and fix old transforms at that
%% point.
+-spec remove_user_scope() -> 'ok'.
+
remove_user_scope() ->
transform(
rabbit_user_permission,
@@ -133,6 +98,9 @@ remove_user_scope() ->
%% only relevant to those migrating from 2.1.1.
%% all users created after in 3.6.0 or later will use SHA-256 (unless configured
%% otherwise)
+
+-spec hash_passwords() -> 'ok'.
+
hash_passwords() ->
transform(
rabbit_user,
@@ -142,6 +110,8 @@ hash_passwords() ->
end,
[username, password_hash, is_admin]).
+-spec add_ip_to_listener() -> 'ok'.
+
add_ip_to_listener() ->
transform(
rabbit_listener,
@@ -150,6 +120,8 @@ add_ip_to_listener() ->
end,
[node, protocol, host, ip_address, port]).
+-spec add_opts_to_listener() -> 'ok'.
+
add_opts_to_listener() ->
transform(
rabbit_listener,
@@ -158,6 +130,8 @@ add_opts_to_listener() ->
end,
[node, protocol, host, ip_address, port, opts]).
+-spec internal_exchanges() -> 'ok'.
+
internal_exchanges() ->
Tables = [rabbit_exchange, rabbit_durable_exchange],
AddInternalFun =
@@ -170,6 +144,8 @@ internal_exchanges() ->
|| T <- Tables ],
ok.
+-spec user_to_internal_user() -> 'ok'.
+
user_to_internal_user() ->
transform(
rabbit_user,
@@ -178,6 +154,8 @@ user_to_internal_user() ->
end,
[username, password_hash, is_admin], internal_user).
+-spec topic_trie() -> 'ok'.
+
topic_trie() ->
create(rabbit_topic_trie_edge, [{record_name, topic_trie_edge},
{attributes, [trie_edge, node_id]},
@@ -186,20 +164,28 @@ topic_trie() ->
{attributes, [trie_binding, value]},
{type, ordered_set}]).
+-spec semi_durable_route() -> 'ok'.
+
semi_durable_route() ->
create(rabbit_semi_durable_route, [{record_name, route},
{attributes, [binding, value]}]).
+-spec exchange_event_serial() -> 'ok'.
+
exchange_event_serial() ->
create(rabbit_exchange_serial, [{record_name, exchange_serial},
{attributes, [name, next]}]).
+-spec trace_exchanges() -> 'ok'.
+
trace_exchanges() ->
[declare_exchange(
rabbit_misc:r(VHost, exchange, <<"amq.rabbitmq.trace">>), topic) ||
VHost <- rabbit_vhost:list()],
ok.
+-spec user_admin_to_tags() -> 'ok'.
+
user_admin_to_tags() ->
transform(
rabbit_user,
@@ -210,6 +196,8 @@ user_admin_to_tags() ->
end,
[username, password_hash, tags], internal_user).
+-spec ha_mirrors() -> 'ok'.
+
ha_mirrors() ->
Tables = [rabbit_queue, rabbit_durable_queue],
AddMirrorPidsFun =
@@ -224,10 +212,14 @@ ha_mirrors() ->
|| T <- Tables ],
ok.
+-spec gm() -> 'ok'.
+
gm() ->
create(gm_group, [{record_name, gm_group},
{attributes, [name, version, members]}]).
+-spec exchange_scratch() -> 'ok'.
+
exchange_scratch() ->
ok = exchange_scratch(rabbit_exchange),
ok = exchange_scratch(rabbit_durable_exchange).
@@ -240,17 +232,23 @@ exchange_scratch(Table) ->
end,
[name, type, durable, auto_delete, internal, arguments, scratch]).
+-spec mirrored_supervisor() -> 'ok'.
+
mirrored_supervisor() ->
create(mirrored_sup_childspec,
[{record_name, mirrored_sup_childspec},
{attributes, [key, mirroring_pid, childspec]}]).
+-spec topic_trie_node() -> 'ok'.
+
topic_trie_node() ->
create(rabbit_topic_trie_node,
[{record_name, topic_trie_node},
{attributes, [trie_node, edge_count, binding_count]},
{type, ordered_set}]).
+-spec runtime_parameters() -> 'ok'.
+
runtime_parameters() ->
create(rabbit_runtime_parameters,
[{record_name, runtime_parameters},
@@ -274,6 +272,8 @@ exchange_scratches(Table) ->
end,
[name, type, durable, auto_delete, internal, arguments, scratches]).
+-spec policy() -> 'ok'.
+
policy() ->
ok = exchange_policy(rabbit_exchange),
ok = exchange_policy(rabbit_durable_exchange),
@@ -300,6 +300,8 @@ queue_policy(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid,
slave_pids, mirror_nodes, policy]).
+-spec sync_slave_pids() -> 'ok'.
+
sync_slave_pids() ->
Tables = [rabbit_queue, rabbit_durable_queue],
AddSyncSlavesFun =
@@ -312,6 +314,8 @@ sync_slave_pids() ->
|| T <- Tables],
ok.
+-spec no_mirror_nodes() -> 'ok'.
+
no_mirror_nodes() ->
Tables = [rabbit_queue, rabbit_durable_queue],
RemoveMirrorNodesFun =
@@ -324,6 +328,8 @@ no_mirror_nodes() ->
|| T <- Tables],
ok.
+-spec gm_pids() -> 'ok'.
+
gm_pids() ->
Tables = [rabbit_queue, rabbit_durable_queue],
AddGMPidsFun =
@@ -336,6 +342,8 @@ gm_pids() ->
|| T <- Tables],
ok.
+-spec exchange_decorators() -> 'ok'.
+
exchange_decorators() ->
ok = exchange_decorators(rabbit_exchange),
ok = exchange_decorators(rabbit_durable_exchange).
@@ -351,6 +359,8 @@ exchange_decorators(Table) ->
[name, type, durable, auto_delete, internal, arguments, scratches, policy,
decorators]).
+-spec policy_apply_to() -> 'ok'.
+
policy_apply_to() ->
transform(
rabbit_runtime_parameters,
@@ -373,6 +383,8 @@ apply_to(Def) ->
[_, _] -> <<"all">>
end.
+-spec queue_decorators() -> 'ok'.
+
queue_decorators() ->
ok = queue_decorators(rabbit_queue),
ok = queue_decorators(rabbit_durable_queue).
@@ -388,6 +400,8 @@ queue_decorators(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
sync_slave_pids, policy, gm_pids, decorators]).
+-spec internal_system_x() -> 'ok'.
+
internal_system_x() ->
transform(
rabbit_durable_exchange,
@@ -401,6 +415,8 @@ internal_system_x() ->
[name, type, durable, auto_delete, internal, arguments, scratches, policy,
decorators]).
+-spec cluster_name() -> 'ok'.
+
cluster_name() ->
{atomic, ok} = mnesia:transaction(fun cluster_name_tx/0),
ok.
@@ -427,6 +443,8 @@ cluster_name_tx() ->
[mnesia:delete(T, K, write) || K <- Ks],
ok.
+-spec down_slave_nodes() -> 'ok'.
+
down_slave_nodes() ->
ok = down_slave_nodes(rabbit_queue),
ok = down_slave_nodes(rabbit_durable_queue).
@@ -442,6 +460,8 @@ down_slave_nodes(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]).
+-spec queue_state() -> 'ok'.
+
queue_state() ->
ok = queue_state(rabbit_queue),
ok = queue_state(rabbit_durable_queue).
@@ -458,6 +478,8 @@ queue_state(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators, state]).
+-spec recoverable_slaves() -> 'ok'.
+
recoverable_slaves() ->
ok = recoverable_slaves(rabbit_queue),
ok = recoverable_slaves(rabbit_durable_queue).
@@ -505,6 +527,8 @@ slave_pids_pending_shutdown(Table) ->
sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state,
policy_version, slave_pids_pending_shutdown]).
+-spec operator_policies() -> 'ok'.
+
operator_policies() ->
ok = exchange_operator_policies(rabbit_exchange),
ok = exchange_operator_policies(rabbit_durable_exchange),
@@ -536,6 +560,7 @@ queue_operator_policies(Table) ->
sync_slave_pids, recoverable_slaves, policy, operator_policy,
gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown]).
+-spec queue_vhost_field() -> 'ok'.
queue_vhost_field() ->
ok = queue_vhost_field(rabbit_queue),
@@ -558,6 +583,8 @@ queue_vhost_field(Table) ->
sync_slave_pids, recoverable_slaves, policy, operator_policy,
gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost]).
+-spec queue_options() -> 'ok'.
+
queue_options() ->
ok = queue_options(rabbit_queue),
ok = queue_options(rabbit_durable_queue),
@@ -581,6 +608,9 @@ queue_options(Table) ->
%% existing records with said default. Users created with 3.6.0+ will
%% have internal_user.hashing_algorithm populated by the internal
%% authn backend.
+
+-spec user_password_hashing() -> 'ok'.
+
user_password_hashing() ->
transform(
rabbit_user,
@@ -595,6 +625,8 @@ topic_permission() ->
{attributes, [topic_permission_key, permission]},
{disc_copies, [node()]}]).
+-spec exchange_options() -> 'ok'.
+
exchange_options() ->
ok = exchange_options(rabbit_exchange),
ok = exchange_options(rabbit_durable_exchange).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f0ad0eb6e1..8b773f2cc2 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -428,10 +428,6 @@
io_batch_size :: pos_integer(),
mode :: 'default' | 'lazy',
memory_reduction_run_count :: non_neg_integer()}.
-%% Duplicated from rabbit_backing_queue
--spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
-
--spec multiple_routing_keys() -> 'ok'.
-define(BLANK_DELTA, #delta { start_seq_id = undefined,
count = 0,
@@ -704,6 +700,9 @@ drop(AckRequired, State) ->
{{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
end.
+%% Duplicated from rabbit_backing_queue
+-spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+
ack([], State) ->
{[], State};
%% optimisation: this head is essentially a partial evaluation of the
@@ -2792,6 +2791,8 @@ ui(#vqstate{index_state = IndexState,
%% Upgrading
%%----------------------------------------------------------------------------
+-spec multiple_routing_keys() -> 'ok'.
+
multiple_routing_keys() ->
transform_storage(
fun ({basic_message, ExchangeName, Routing_Key, Content,
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index 5b1722fdbf..a63d60058c 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -33,23 +33,6 @@
-type version() :: [atom()].
--spec recorded() -> rabbit_types:ok_or_error2(version(), any()).
--spec matches([A], [A]) -> boolean().
--spec desired() -> version().
--spec desired_for_scope(scope()) -> scope_version().
--spec record_desired() -> 'ok'.
--spec record_desired_for_scope
- (scope()) -> rabbit_types:ok_or_error(any()).
--spec upgrades_required
- (scope()) -> rabbit_types:ok_or_error2([step()], any()).
--spec check_version_consistency
- (string(), string(), string()) -> rabbit_types:ok_or_error(any()).
--spec check_version_consistency
- (string(), string(), string(), string()) ->
- rabbit_types:ok_or_error(any()).
--spec check_otp_consistency
- (string()) -> rabbit_types:ok_or_error(any()).
-
%% -------------------------------------------------------------------
-define(VERSION_FILENAME, "schema_version").
@@ -57,6 +40,8 @@
%% -------------------------------------------------------------------
+-spec recorded() -> rabbit_types:ok_or_error2(version(), any()).
+
recorded() -> case rabbit_file:read_term_file(schema_filename()) of
{ok, [V]} -> {ok, V};
{error, _} = Err -> Err
@@ -87,20 +72,34 @@ record_for_scope(Scope, ScopeVersion) ->
%% -------------------------------------------------------------------
+-spec matches([A], [A]) -> boolean().
+
matches(VerA, VerB) ->
lists:usort(VerA) =:= lists:usort(VerB).
%% -------------------------------------------------------------------
+-spec desired() -> version().
+
desired() -> [Name || Scope <- ?SCOPES, Name <- desired_for_scope(Scope)].
+-spec desired_for_scope(scope()) -> scope_version().
+
desired_for_scope(Scope) -> with_upgrade_graph(fun heads/1, Scope).
+-spec record_desired() -> 'ok'.
+
record_desired() -> record(desired()).
+-spec record_desired_for_scope
+ (scope()) -> rabbit_types:ok_or_error(any()).
+
record_desired_for_scope(Scope) ->
record_for_scope(Scope, desired_for_scope(Scope)).
+-spec upgrades_required
+ (scope()) -> rabbit_types:ok_or_error2([step()], any()).
+
upgrades_required(Scope) ->
case recorded_for_scope(Scope) of
{error, enoent} ->
@@ -208,9 +207,16 @@ schema_filename() -> filename:join(dir(), ?VERSION_FILENAME).
%% --------------------------------------------------------------------
+-spec check_version_consistency
+ (string(), string(), string()) -> rabbit_types:ok_or_error(any()).
+
check_version_consistency(This, Remote, Name) ->
check_version_consistency(This, Remote, Name, fun (A, B) -> A =:= B end).
+-spec check_version_consistency
+ (string(), string(), string(), string()) ->
+ rabbit_types:ok_or_error(any()).
+
check_version_consistency(This, Remote, Name, Comp) ->
case Comp(This, Remote) of
true -> ok;
@@ -222,5 +228,8 @@ version_error(Name, This, Remote) ->
rabbit_misc:format("~s version mismatch: local node is ~s, "
"remote node ~s", [Name, This, Remote])}}.
+-spec check_otp_consistency
+ (string()) -> rabbit_types:ok_or_error(any()).
+
check_otp_consistency(Remote) ->
check_version_consistency(rabbit_misc:otp_release(), Remote, "OTP").
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 75f96ac1eb..c5b28e7e1c 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -28,24 +28,6 @@
-export([delete_storage/1]).
-export([vhost_down/1]).
--spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
--spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
--spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
--spec exists(rabbit_types:vhost()) -> boolean().
--spec list() -> [rabbit_types:vhost()].
--spec with(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
--spec with_user_and_vhost
- (rabbit_types:username(), rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
--spec assert(rabbit_types:vhost()) -> 'ok'.
-
--spec info(rabbit_types:vhost()) -> rabbit_types:infos().
--spec info(rabbit_types:vhost(), rabbit_types:info_keys())
- -> rabbit_types:infos().
--spec info_all() -> [rabbit_types:infos()].
--spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()].
--spec info_all(rabbit_types:info_keys(), reference(), pid()) ->
- 'ok'.
-
recover() ->
%% Clear out remnants of old incarnation, in case we restarted
%% faster than other nodes handled DOWN messages from us.
@@ -83,6 +65,8 @@ recover(VHost) ->
-define(INFO_KEYS, [name, tracing, cluster_state]).
+-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
+
add(VHost, ActingUser) ->
case exists(VHost) of
true -> ok;
@@ -134,6 +118,8 @@ do_add(VHostPath, ActingUser) ->
{error, Msg}
end.
+-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
+
delete(VHostPath, ActingUser) ->
%% FIXME: We are forced to delete the queues and exchanges outside
%% the TX below. Queue deletion involves sending messages to the queue
@@ -252,12 +238,18 @@ internal_delete(VHostPath, ActingUser) ->
ok = mnesia:delete({rabbit_vhost, VHostPath}),
Fs1 ++ Fs2.
+-spec exists(rabbit_types:vhost()) -> boolean().
+
exists(VHostPath) ->
mnesia:dirty_read({rabbit_vhost, VHostPath}) /= [].
+-spec list() -> [rabbit_types:vhost()].
+
list() ->
mnesia:dirty_all_keys(rabbit_vhost).
+-spec with(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
+
with(VHostPath, Thunk) ->
fun () ->
case mnesia:read({rabbit_vhost, VHostPath}) of
@@ -268,15 +260,23 @@ with(VHostPath, Thunk) ->
end
end.
+-spec with_user_and_vhost
+ (rabbit_types:username(), rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
+
with_user_and_vhost(Username, VHostPath, Thunk) ->
rabbit_misc:with_user(Username, with(VHostPath, Thunk)).
%% Like with/2 but outside an Mnesia tx
+
+-spec assert(rabbit_types:vhost()) -> 'ok'.
+
assert(VHostPath) -> case exists(VHostPath) of
true -> ok;
false -> throw({error, {no_such_vhost, VHostPath}})
end.
+-spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
+
update(VHostPath, Fun) ->
case mnesia:read({rabbit_vhost, VHostPath}) of
[] ->
@@ -329,13 +329,27 @@ i(tracing, VHost) -> rabbit_trace:enabled(VHost);
i(cluster_state, VHost) -> vhost_cluster_state(VHost);
i(Item, _) -> throw({bad_argument, Item}).
+-spec info(rabbit_types:vhost()) -> rabbit_types:infos().
+
info(VHost) -> infos(?INFO_KEYS, VHost).
+
+-spec info(rabbit_types:vhost(), rabbit_types:info_keys())
+ -> rabbit_types:infos().
+
info(VHost, Items) -> infos(Items, VHost).
+-spec info_all() -> [rabbit_types:infos()].
+
info_all() -> info_all(?INFO_KEYS).
+
+-spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()].
+
info_all(Items) -> [info(VHost, Items) || VHost <- list()].
info_all(Ref, AggregatorPid) -> info_all(?INFO_KEYS, Ref, AggregatorPid).
+
+-spec info_all(rabbit_types:info_keys(), reference(), pid()) ->
+ 'ok'.
info_all(Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref, fun(VHost) -> info(VHost, Items) end, list()).
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 1b11017076..e78aca0a54 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -23,12 +23,6 @@
%%----------------------------------------------------------------------------
-spec memory() -> rabbit_types:infos().
--spec binary() -> rabbit_types:infos().
--spec ets_tables_memory(Owners) -> rabbit_types:infos()
- when Owners :: all | OwnerProcessName | [OwnerProcessName],
- OwnerProcessName :: atom().
-
-%%----------------------------------------------------------------------------
memory() ->
All = interesting_sups(),
@@ -115,6 +109,8 @@ memory() ->
%% claims about negative memory. See
%% http://erlang.org/pipermail/erlang-questions/2012-September/069320.html
+-spec binary() -> rabbit_types:infos().
+
binary() ->
All = interesting_sups(),
{Sums, Rest} =
@@ -153,6 +149,10 @@ mnesia_memory() ->
ets_memory(Owners) ->
lists:sum([V || {_K, V} <- ets_tables_memory(Owners)]).
+-spec ets_tables_memory(Owners) -> rabbit_types:infos()
+ when Owners :: all | OwnerProcessName | [OwnerProcessName],
+ OwnerProcessName :: atom().
+
ets_tables_memory(all) ->
[{ets:info(T, name), bytes(ets:info(T, memory))}
|| T <- ets:all(),
diff --git a/src/supervised_lifecycle.erl b/src/supervised_lifecycle.erl
index 82f6728f17..4be7922125 100644
--- a/src/supervised_lifecycle.erl
+++ b/src/supervised_lifecycle.erl
@@ -39,8 +39,6 @@
-spec start_link(atom(), rabbit_types:mfargs(), rabbit_types:mfargs()) ->
rabbit_types:ok_pid_or_error().
-%%----------------------------------------------------------------------------
-
start_link(Name, StartMFA, StopMFA) ->
gen_server:start_link({local, Name}, ?MODULE, [StartMFA, StopMFA], []).
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index d6c615e3b6..d5fb900198 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -64,8 +64,6 @@
mfargs(), mfargs(), string()) ->
rabbit_types:ok_pid_or_error().
-%%--------------------------------------------------------------------
-
start_link(IPAddress, Port,
OnStartup, OnShutdown, Label) ->
gen_server:start_link(
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 074a4e4d6a..a31a60e505 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -38,8 +38,6 @@
module(), any(), mfargs(), mfargs(), integer(), string()) ->
rabbit_types:ok_pid_or_error().
-%%----------------------------------------------------------------------------
-
start_link(IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown,
ConcurrentAcceptorCount, Label) ->
supervisor:start_link(