summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2018-11-30 11:59:34 +0100
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2019-02-01 11:23:16 +0100
commit7ae54450980497cdcc87939dcd68d212ea6e8dc0 (patch)
tree32cb2a123b738d76191f7292bc7f1764b55d348c /src
parent5bbde6d0a3eb1790d4965d76c8699d0187b74183 (diff)
downloadrabbitmq-server-git-7ae54450980497cdcc87939dcd68d212ea6e8dc0.tar.gz
Fix errors reported by Dialyzer
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl139
-rw-r--r--src/rabbit_mnesia.erl15
-rw-r--r--src/rabbit_node_monitor.erl12
-rw-r--r--src/rabbit_peer_discovery.erl42
-rw-r--r--src/rabbit_peer_discovery_classic_config.erl9
-rw-r--r--src/rabbit_peer_discovery_dns.erl9
-rw-r--r--src/rabbit_quorum_queue.erl27
-rw-r--r--src/rabbit_recovery_terms.erl12
-rw-r--r--src/rabbit_table.erl4
-rw-r--r--src/rabbit_upgrade.erl37
-rw-r--r--src/rabbit_version.erl5
-rw-r--r--src/rabbit_vhost.erl11
12 files changed, 189 insertions, 133 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 621ce95d41..c9c120df77 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -29,7 +29,7 @@
-export([not_found/1, absent/2]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
emit_info_all/5, list_local/1, info_local/1,
- emit_info_local/4, emit_info_down/4]).
+ emit_info_local/4, emit_info_down/4]).
-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0,
list_with_possible_retry/1]).
-export([list_by_type/1]).
@@ -81,11 +81,11 @@
-type msg_id() :: non_neg_integer().
-type ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}.
--type absent_reason() :: 'nodedown' | 'crashed'.
--type queue_or_absent() :: amqqueue:amqqueue() |
- {'absent', amqqueue:amqqueue(),absent_reason()}.
--type not_found_or_absent() ::
- 'not_found' | {'absent', amqqueue:amqqueue(), absent_reason()}.
+-type absent_reason() :: 'nodedown' | 'crashed' | stopped | timeout.
+-type queue_not_found() :: not_found.
+-type queue_absent() :: {'absent', amqqueue:amqqueue(), absent_reason()}.
+-type not_found_or_absent() :: queue_not_found() | queue_absent().
+-type quorum_states() :: #{Name :: atom() => rabbit_fifo_client:state()}.
%%----------------------------------------------------------------------------
@@ -231,13 +231,14 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->
[Pid, Error]) || {Pid, Error} <- Failures],
[Q || {_, {new, Q}} <- Results].
--spec declare
- (name(), boolean(), boolean(), rabbit_framing:amqp_table(),
- rabbit_types:maybe(pid()), rabbit_types:username()) ->
- {'new' | 'existing' | 'absent' | 'owner_died',
- amqqueue:amqqueue()} |
- {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} |
- rabbit_types:channel_exit().
+-spec declare(name(),
+ boolean(),
+ boolean(),
+ rabbit_framing:amqp_table(),
+ rabbit_types:maybe(pid()),
+ rabbit_types:username()) ->
+ {'new' | 'existing' | 'absent' | 'owner_died', amqqueue:amqqueue()} |
+ rabbit_types:channel_exit().
declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser, node()).
@@ -247,13 +248,17 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
%% should be. Note that in some cases (e.g. with "nodes" policy in
%% effect) this might not be possible to satisfy.
--spec declare
- (name(), boolean(), boolean(), rabbit_framing:amqp_table(),
- rabbit_types:maybe(pid()), rabbit_types:username(), node()) ->
- {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
- {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} |
- {'absent', amqqueue:amqqueue(), absent_reason()} |
- rabbit_types:channel_exit().
+-spec declare(name(),
+ boolean(),
+ boolean(),
+ rabbit_framing:amqp_table(),
+ rabbit_types:maybe(pid()),
+ rabbit_types:username(),
+ node()) ->
+ {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
+ {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} |
+ {'absent', amqqueue:amqqueue(), absent_reason()} |
+ rabbit_types:channel_exit().
declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
Owner, ActingUser, Node) ->
@@ -317,7 +322,7 @@ get_queue_type(Args) ->
end.
-spec internal_declare(amqqueue:amqqueue(), boolean()) ->
- queue_or_absent() | rabbit_misc:thunk(queue_or_absent()).
+ {created | existing, amqqueue:amqqueue()} | queue_absent().
internal_declare(Q, Recover) ->
?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
@@ -451,6 +456,8 @@ not_found_or_absent(Name) ->
[Q] -> {absent, Q, nodedown} %% Q exists on stopped node
end.
+-spec not_found_or_absent_dirty(name()) -> not_found_or_absent().
+
not_found_or_absent_dirty(Name) ->
%% We should read from both tables inside a tx, to get a
%% consistent view. But the chances of an inconsistency are small,
@@ -460,12 +467,15 @@ not_found_or_absent_dirty(Name) ->
{ok, Q} -> {absent, Q, nodedown}
end.
--spec with(name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B.
+-spec with(name(),
+ qfun(A),
+ fun((not_found_or_absent()) -> rabbit_types:channel_exit())) ->
+ A | rabbit_types:channel_exit().
with(Name, F, E) ->
with(Name, F, E, 2000).
-with(Name, F, E, RetriesLeft) ->
+with(#resource{} = Name, F, E, RetriesLeft) ->
case lookup(Name) of
{ok, Q} when ?amqqueue_state_is(Q, live) andalso RetriesLeft =:= 0 ->
%% Something bad happened to that queue, we are bailing out
@@ -502,6 +512,12 @@ with(Name, F, E, RetriesLeft) ->
E(not_found_or_absent_dirty(Name))
end.
+-spec retry_wait(amqqueue:amqqueue(),
+ qfun(A),
+ fun((not_found_or_absent()) -> rabbit_types:channel_exit()),
+ non_neg_integer()) ->
+ A | rabbit_types:channel_exit().
+
retry_wait(Q, F, E, RetriesLeft) ->
Name = amqqueue:get_name(Q),
QPid = amqqueue:get_pid(Q),
@@ -535,16 +551,22 @@ with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
-spec with_or_die(name(), qfun(A)) -> A | rabbit_types:channel_exit().
with_or_die(Name, F) ->
- with(Name, F, fun (not_found) -> not_found(Name);
- ({absent, Q, Reason}) -> absent(Q, Reason)
- end).
+ with(Name, F, die_fun(Name)).
+
+-spec die_fun(name()) ->
+ fun((not_found_or_absent()) -> rabbit_types:channel_exit()).
+
+die_fun(Name) ->
+ fun (not_found) -> not_found(Name);
+ ({absent, Q, Reason}) -> absent(Q, Reason)
+ end.
--spec not_found(rabbit_types:r(atom())) -> rabbit_types:channel_exit().
+-spec not_found(name()) -> rabbit_types:channel_exit().
not_found(R) -> rabbit_misc:protocol_error(not_found, "no ~s", [rabbit_misc:rs(R)]).
--spec absent(amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()) ->
- rabbit_types:channel_exit().
+-spec absent(amqqueue:amqqueue(), absent_reason()) ->
+ rabbit_types:channel_exit().
absent(Q, AbsentReason) ->
QueueName = amqqueue:get_name(Q),
@@ -552,6 +574,9 @@ absent(Q, AbsentReason) ->
IsDurable = amqqueue:is_durable(Q),
priv_absent(QueueName, QPid, IsDurable, AbsentReason).
+-spec priv_absent(name(), pid(), boolean(), absent_reason()) ->
+ rabbit_types:channel_exit().
+
priv_absent(QueueName, QPid, true, nodedown) ->
%% The assertion of durability is mainly there because we mention
%% durability in the error message. That way we will hopefully
@@ -1006,7 +1031,7 @@ notify_policy_changed(Q) when ?amqqueue_is_quorum(Q) ->
-spec consumers(amqqueue:amqqueue()) ->
[{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(),
- rabbit_framing:amqp_table()}].
+ rabbit_framing:amqp_table(), rabbit_types:username()}].
consumers(Q) when ?amqqueue_is_classic(Q) ->
QPid = amqqueue:get_pid(Q),
@@ -1189,7 +1214,11 @@ purge(Q) when ?amqqueue_is_quorum(Q) ->
NodeId = amqqueue:get_pid(Q),
rabbit_quorum_queue:purge(NodeId).
--spec requeue(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'.
+-spec requeue(pid(),
+ {rabbit_fifo:consumer_tag(), [msg_id()]},
+ pid(),
+ quorum_states()) ->
+ 'ok'.
requeue(QPid, {_, MsgIds}, ChPid, QuorumStates) when ?IS_CLASSIC(QPid) ->
ok = delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}),
@@ -1205,7 +1234,11 @@ requeue({Name, _} = QPid, {CTag, MsgIds}, _ChPid, QuorumStates)
QuorumStates
end.
--spec ack(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'.
+-spec ack(pid(),
+ {rabbit_fifo:consumer_tag(), [msg_id()]},
+ pid(),
+ quorum_states()) ->
+ quorum_states().
ack(QPid, {_, MsgIds}, ChPid, QueueStates) when ?IS_CLASSIC(QPid) ->
delegate:invoke_no_result(QPid, {gen_server2, cast, [{ack, MsgIds, ChPid}]}),
@@ -1221,8 +1254,12 @@ ack({Name, _} = QPid, {CTag, MsgIds}, _ChPid, QuorumStates)
QuorumStates
end.
--spec reject(pid() | {atom(), node()}, [msg_id()], boolean(), pid(),
- #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'.
+-spec reject(pid() | amqqueue:ra_server_id(),
+ boolean(),
+ {rabbit_fifo:consumer_tag(), [msg_id()]},
+ pid(),
+ quorum_states()) ->
+ quorum_states().
reject(QPid, Requeue, {_, MsgIds}, ChPid, QStates) when ?IS_CLASSIC(QPid) ->
ok = delegate:invoke_no_result(QPid, {gen_server2, cast,
@@ -1265,17 +1302,20 @@ notify_down_all(QPids, ChPid, Timeout) ->
Error -> {error, Error}
end.
--spec activate_limit_all(qpids(), pid()) -> ok_or_errors().
+-spec activate_limit_all(qpids(), pid()) -> ok.
activate_limit_all(QRefs, ChPid) ->
QPids = [P || P <- QRefs, ?IS_CLASSIC(P)],
delegate:invoke_no_result(QPids, {gen_server2, cast,
[{activate_limit, ChPid}]}).
--spec credit
- (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(),
- boolean(), #{Name :: atom() => rabbit_fifo_client:state()}) ->
- 'ok'.
+-spec credit(amqqueue:amqqueue(),
+ pid(),
+ rabbit_types:ctag(),
+ non_neg_integer(),
+ boolean(),
+ quorum_states()) ->
+ {'ok', quorum_states()}.
credit(Q, ChPid, CTag, Credit,
Drain, QStates) when ?amqqueue_is_classic(Q) ->
@@ -1294,7 +1334,9 @@ credit(Q,
-spec basic_get(amqqueue:amqqueue(), pid(), boolean(), pid(), rabbit_types:ctag(),
#{Name :: atom() => rabbit_fifo_client:state()}) ->
- {'ok', non_neg_integer(), qmsg()} | 'empty'.
+ {'ok', non_neg_integer(), qmsg(), quorum_states()} |
+ {'empty', quorum_states()} |
+ rabbit_types:channel_exit().
basic_get(Q, ChPid, NoAck, LimiterPid, _CTag, _)
when ?amqqueue_is_classic(Q) ->
@@ -1421,10 +1463,7 @@ internal_delete1(QueueName, OnlyDurable, Reason) ->
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName, OnlyDurable).
--spec internal_delete(name(), rabbit_types:username()) ->
- 'ok' | rabbit_types:connection_exit() |
- fun (() ->
- 'ok' | rabbit_types:connection_exit()).
+-spec internal_delete(name(), rabbit_types:username()) -> 'ok'.
internal_delete(QueueName, ActingUser) ->
internal_delete(QueueName, ActingUser, normal).
@@ -1682,7 +1721,9 @@ pseudo_queue(QueueName, Pid) ->
-spec pseudo_queue(name(), pid(), boolean()) -> amqqueue:amqqueue().
-pseudo_queue(QueueName, Pid, Durable) ->
+pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable)
+ when is_pid(Pid) andalso
+ is_boolean(Durable) ->
amqqueue:new(QueueName,
Pid,
Durable,
@@ -1704,8 +1745,12 @@ deliver(Qs, Delivery) ->
deliver(Qs, Delivery, untracked),
ok.
--spec deliver([amqqueue:amqqueue()], rabbit_types:delivery(), #{Name :: atom() => rabbit_fifo_client:state()} | 'untracked') ->
- {qpids(), #{Name :: atom() => rabbit_fifo_client:state()}}.
+-spec deliver([amqqueue:amqqueue()],
+ rabbit_types:delivery(),
+ quorum_states() | 'untracked') ->
+ {qpids(),
+ [{amqqueue:ra_server_id(), name()}],
+ quorum_states()}.
deliver([], _Delivery, QueueState) ->
%% /dev/null optimisation
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index ab29faf368..61b9e10e70 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -136,20 +136,14 @@ init_from_config() ->
{DiscoveredNodes, NodeType} =
case rabbit_peer_discovery:discover_cluster_nodes() of
{ok, {Nodes, Type} = Config}
- when is_list(Nodes) andalso (Type == disc orelse Type == disk orelse Type == ram) ->
+ when is_list(Nodes) andalso
+ (Type == disc orelse Type == disk orelse Type == ram) ->
case lists:foldr(FindBadNodeNames, [], Nodes) of
[] -> Config;
BadNames -> e({invalid_cluster_node_names, BadNames})
end;
{ok, {_, BadType}} when BadType /= disc andalso BadType /= ram ->
e({invalid_cluster_node_type, BadType});
- {ok, Nodes} when is_list(Nodes) ->
- %% The legacy syntax (a nodes list without the node
- %% type) is unsupported.
- case lists:foldr(FindBadNodeNames, [], Nodes) of
- [] -> e(cluster_node_type_mandatory);
- _ -> e(invalid_cluster_nodes_conf)
- end;
{ok, _} ->
e(invalid_cluster_nodes_conf)
end,
@@ -1025,6 +1019,8 @@ nodes_incl_me(Nodes) -> lists:usort([node()|Nodes]).
nodes_excl_me(Nodes) -> Nodes -- [node()].
+-spec e(any()) -> no_return().
+
e(Tag) -> throw({error, {Tag, error_description(Tag)}}).
error_description({invalid_cluster_node_names, BadNames}) ->
@@ -1034,9 +1030,6 @@ error_description({invalid_cluster_node_type, BadType}) ->
"In the 'cluster_nodes' configuration key, the node type is invalid "
"(expected 'disc' or 'ram'): " ++
lists:flatten(io_lib:format("~p", [BadType]));
-error_description(cluster_node_type_mandatory) ->
- "The 'cluster_nodes' configuration key must indicate the node type: "
- "either {[...], disc} or {[...], ram}";
error_description(invalid_cluster_nodes_conf) ->
"The 'cluster_nodes' configuration key is invalid, it must be of the "
"form {[Nodes], Type}, where Nodes is a list of node names and "
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 2d3b042fa4..cd46ade0e2 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -84,14 +84,13 @@ cluster_status_filename() ->
quorum_filename() ->
filename:join(rabbit_mnesia:dir(), "quorum").
--spec prepare_cluster_status_files() -> 'ok'.
+-spec prepare_cluster_status_files() -> 'ok' | no_return().
prepare_cluster_status_files() ->
rabbit_mnesia:ensure_mnesia_dir(),
- Corrupt = fun(F) -> throw({error, corrupt_cluster_status_files, F}) end,
RunningNodes1 = case try_read_file(running_nodes_filename()) of
{ok, [Nodes]} when is_list(Nodes) -> Nodes;
- {ok, Other} -> Corrupt(Other);
+ {ok, Other} -> corrupt_cluster_status_files(Other);
{error, enoent} -> []
end,
ThisNode = [node()],
@@ -105,7 +104,7 @@ prepare_cluster_status_files() ->
{ok, [AllNodes0]} when is_list(AllNodes0) ->
{legacy_cluster_nodes(AllNodes0), legacy_disc_nodes(AllNodes0)};
{ok, Files} ->
- Corrupt(Files);
+ corrupt_cluster_status_files(Files);
{error, enoent} ->
LegacyNodes = legacy_cluster_nodes([]),
{LegacyNodes, LegacyNodes}
@@ -113,6 +112,11 @@ prepare_cluster_status_files() ->
AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2),
ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}).
+-spec corrupt_cluster_status_files(any()) -> no_return().
+
+corrupt_cluster_status_files(F) ->
+ throw({error, corrupt_cluster_status_files, F}).
+
-spec write_cluster_status(rabbit_mnesia:cluster_status()) -> 'ok'.
write_cluster_status({All, Disc, Running}) ->
diff --git a/src/rabbit_peer_discovery.erl b/src/rabbit_peer_discovery.erl
index 2f4acee49e..2ed36d32ec 100644
--- a/src/rabbit_peer_discovery.erl
+++ b/src/rabbit_peer_discovery.erl
@@ -105,9 +105,15 @@ maybe_init() ->
end.
--spec discover_cluster_nodes() -> {ok, Nodes :: list()} |
- {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} |
- {error, Reason :: string()}.
+%% This module doesn't currently sanity-check the return value of
+%% `Backend:list_nodes()`. Therefore, it could return something invalid:
+%% thus the `{œk, any()} in the spec.
+%%
+%% `rabbit_mnesia:init_from_config()` does some verifications.
+
+-spec discover_cluster_nodes() ->
+ {ok, {Nodes :: [node()], NodeType :: rabbit_types:node_type()} | any()} |
+ {error, Reason :: string()}.
discover_cluster_nodes() ->
Backend = backend(),
@@ -156,10 +162,7 @@ maybe_inject_randomized_delay() ->
-spec inject_randomized_delay() -> ok.
inject_randomized_delay() ->
- {Min, Max} = case randomized_delay_range_in_ms() of
- {A, B} -> {A, B};
- [A, B] -> {A, B}
- end,
+ {Min, Max} = randomized_delay_range_in_ms(),
case {Min, Max} of
%% When the max value is set to 0, consider the delay to be disabled.
%% In addition, `rand:uniform/1` will fail with a "no function clause"
@@ -258,12 +261,15 @@ unlock(Data) ->
%% Implementation
%%
--spec normalize(Nodes :: list() |
- {Nodes :: list(), NodeType :: rabbit_types:node_type()} |
- {ok, Nodes :: list()} |
- {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} |
- {error, Reason :: string()}) -> {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} |
- {error, Reason :: string()}.
+-spec normalize(Nodes :: [node()] |
+ {Nodes :: [node()],
+ NodeType :: rabbit_types:node_type()} |
+ {ok, Nodes :: [node()]} |
+ {ok, {Nodes :: [node()],
+ NodeType :: rabbit_types:node_type()}} |
+ {error, Reason :: string()}) ->
+ {ok, {Nodes :: [node()], NodeType :: rabbit_types:node_type()}} |
+ {error, Reason :: string()}.
normalize(Nodes) when is_list(Nodes) ->
{ok, {Nodes, disc}};
@@ -296,14 +302,12 @@ node_prefix() ->
--spec append_node_prefix(Value :: binary() | list()) -> atom().
+-spec append_node_prefix(Value :: binary() | string()) -> string().
-append_node_prefix(Value) ->
+append_node_prefix(Value) when is_binary(Value) orelse is_list(Value) ->
Val = rabbit_data_coercion:to_list(Value),
Hostname = case string:tokens(Val, ?NODENAME_PART_SEPARATOR) of
- [_ExistingPrefix, Val] ->
- Val;
- [Val] ->
- Val
+ [_ExistingPrefix, HN] -> HN;
+ [HN] -> HN
end,
string:join([node_prefix(), Hostname], ?NODENAME_PART_SEPARATOR).
diff --git a/src/rabbit_peer_discovery_classic_config.erl b/src/rabbit_peer_discovery_classic_config.erl
index 6597d77da4..16b7861f5f 100644
--- a/src/rabbit_peer_discovery_classic_config.erl
+++ b/src/rabbit_peer_discovery_classic_config.erl
@@ -26,13 +26,12 @@
%% API
%%
--spec list_nodes() -> {ok, Nodes :: list()} | {error, Reason :: string()}.
+-spec list_nodes() -> {ok, {Nodes :: [node()], rabbit_types:node_type()}}.
list_nodes() ->
- case application:get_env(rabbit, cluster_nodes) of
- {_Nodes, _NodeType} = Pair -> Pair;
- Nodes when is_list(Nodes) -> {Nodes, disc};
- undefined -> {[], disc}
+ case application:get_env(rabbit, cluster_nodes, {[], disc}) of
+ {_Nodes, _NodeType} = Pair -> {ok, Pair};
+ Nodes when is_list(Nodes) -> {ok, {Nodes, disc}}
end.
-spec supports_registration() -> boolean().
diff --git a/src/rabbit_peer_discovery_dns.erl b/src/rabbit_peer_discovery_dns.erl
index ad277e08b0..031d1290b3 100644
--- a/src/rabbit_peer_discovery_dns.erl
+++ b/src/rabbit_peer_discovery_dns.erl
@@ -28,12 +28,13 @@
%% API
%%
--spec list_nodes() -> {ok, Nodes :: list()} | {error, Reason :: string()}.
+-spec list_nodes() ->
+ {ok, {Nodes :: [node()], rabbit_types:node_type()}}.
list_nodes() ->
case application:get_env(rabbit, cluster_formation) of
undefined ->
- {[], disc};
+ {ok, {[], disc}};
{ok, ClusterFormation} ->
case proplists:get_value(peer_discovery_dns, ClusterFormation) of
undefined ->
@@ -41,11 +42,11 @@ list_nodes() ->
"but final config does not contain rabbit.cluster_formation.peer_discovery_dns. "
"Cannot discover any nodes because seed hostname is not configured!",
[?MODULE]),
- {[], disc};
+ {ok, {[], disc}};
Proplist ->
Hostname = rabbit_data_coercion:to_list(proplists:get_value(hostname, Proplist)),
- {discover_nodes(Hostname, net_kernel:longnames()), rabbit_peer_discovery:node_type()}
+ {ok, {discover_nodes(Hostname, net_kernel:longnames()), rabbit_peer_discovery:node_type()}}
end
end.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index b54de3a2e6..725d8086d9 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -43,7 +43,6 @@
-include_lib("stdlib/include/qlc.hrl").
-include("amqqueue.hrl").
--type ra_server_id() :: {Name :: atom(), Node :: node()}.
-type msg_id() :: non_neg_integer().
-type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(), rabbit_types:message()}.
@@ -68,8 +67,8 @@
%%----------------------------------------------------------------------------
--spec init_state(ra_server_id(), rabbit_types:r('queue')) ->
- rabbit_fifo_client:state().
+-spec init_state(amqqueue:ra_server_id(), rabbit_amqqueue:name()) ->
+ rabbit_fifo_client:state().
init_state({Name, _}, QName = #resource{}) ->
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
%% This lookup could potentially return an {error, not_found}, but we do not
@@ -84,7 +83,7 @@ init_state({Name, _}, QName = #resource{}) ->
fun() -> credit_flow:block(Name), ok end,
fun() -> credit_flow:unblock(Name), ok end).
--spec handle_event({'ra_event', ra_server_id(), any()}, rabbit_fifo_client:state()) ->
+-spec handle_event({'ra_event', amqqueue:ra_server_id(), any()}, rabbit_fifo_client:state()) ->
{'internal', Correlators :: [term()], rabbit_fifo_client:state()} |
{rabbit_fifo:client_msg(), rabbit_fifo_client:state()}.
@@ -92,8 +91,7 @@ handle_event({ra_event, From, Evt}, QState) ->
rabbit_fifo_client:handle_ra_event(From, Evt, QState).
-spec declare(amqqueue:amqqueue()) ->
- {'new', amqqueue:amqqueue()} |
- {existing, amqqueue:amqqueue()}.
+ {new | existing, amqqueue:amqqueue()} | rabbit_types:channel_exit().
declare(Q) when ?amqqueue_is_quorum(Q) ->
QName = amqqueue:get_name(Q),
@@ -264,9 +262,9 @@ recover(Queues) ->
ok ->
% queue was restarted, good
ok;
- {error, Err}
- when Err == not_started orelse
- Err == name_not_registered ->
+ {error, Err1}
+ when Err1 == not_started orelse
+ Err1 == name_not_registered ->
% queue was never started on this node
% so needs to be started from scratch.
Machine = ra_machine(Q0),
@@ -400,7 +398,8 @@ credit(CTag, Credit, Drain, QState) ->
-spec basic_get(amqqueue:amqqueue(), NoAck :: boolean(), rabbit_types:ctag(),
rabbit_fifo_client:state()) ->
{'ok', 'empty', rabbit_fifo_client:state()} |
- {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
+ {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()} |
+ {error, timeout | term()}.
basic_get(Q, NoAck, CTag0, QState0) when ?amqqueue_is_quorum(Q) ->
QName = amqqueue:get_name(Q),
@@ -420,6 +419,8 @@ basic_get(Q, NoAck, CTag0, QState0) when ?amqqueue_is_quorum(Q) ->
IsDelivered = Count > 0,
Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0),
{ok, MsgsReady, {QName, Id, MsgId, IsDelivered, Msg}, QState};
+ {error, _} = Err ->
+ Err;
{timeout, _} ->
{error, timeout}
end.
@@ -482,7 +483,7 @@ basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) ->
maybe_send_reply(ChPid, OkMsg),
rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), QState0).
--spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'.
+-spec stateless_deliver(amqqueue:ra_server_id(), rabbit_types:delivery()) -> 'ok'.
stateless_deliver(ServerId, Delivery) ->
ok = rabbit_fifo_client:untracked_enqueue([ServerId],
@@ -581,7 +582,7 @@ cluster_state(Name) ->
end
end.
--spec status(rabbit_types:vhost(), Name :: atom()) -> rabbit_types:infos() | {error, term()}.
+-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> rabbit_types:infos() | {error, term()}.
status(Vhost, QueueName) ->
%% Handle not found queues
@@ -885,6 +886,8 @@ format(Q) when ?is_amqqueue(Q) ->
is_process_alive(Name, Node) ->
erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?TICK_TIME)).
+-spec quorum_messages(atom()) -> non_neg_integer().
+
quorum_messages(QName) ->
case ets:lookup(queue_coarse_metrics, QName) of
[{_, _, _, M, _}] ->
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index 4e8ae128de..28bd9fc23a 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -117,7 +117,7 @@ upgrade_recovery_terms() ->
[begin
File = filename:join([QueuesDir, Dir, "clean.dot"]),
case rabbit_file:read_term_file(File) of
- {ok, Terms} -> ok = store(?MODULE, Dir, Terms);
+ {ok, Terms} -> ok = store_global_table(Dir, Terms);
{error, _} -> ok
end,
file:delete(File)
@@ -134,7 +134,7 @@ dets_upgrade(Fun)->
open_global_table(),
try
ok = dets:foldl(fun ({DirBaseName, Terms}, Acc) ->
- store(?MODULE, DirBaseName, Fun(Terms)),
+ store_global_table(DirBaseName, Fun(Terms)),
Acc
end, ok, ?MODULE),
ok
@@ -160,8 +160,14 @@ close_global_table() ->
ok
end.
+store_global_table(DirBaseName, Terms) ->
+ dets:insert(?MODULE, {DirBaseName, Terms}).
+
read_global(DirBaseName) ->
- read(?MODULE, DirBaseName).
+ case dets:lookup(?MODULE, DirBaseName) of
+ [{_, Terms}] -> {ok, Terms};
+ _ -> {error, not_found}
+ end.
delete_global_table() ->
file:delete(filename:join(rabbit_mnesia:dir(), "recovery.dets")).
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index db16152d5b..9bf1d2c3f6 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -117,8 +117,6 @@ wait(TableNames, Timeout, Retries) ->
throw(Error);
{_, {error, Error}} ->
rabbit_log:warning("Error while waiting for Mnesia tables: ~p~n", [Error]),
- wait(TableNames, Timeout, Retries - 1);
- _ ->
wait(TableNames, Timeout, Retries - 1)
end.
@@ -131,7 +129,7 @@ retry_timeout(_Retry = true) ->
end,
{retry_timeout(), Retries}.
--spec retry_timeout() -> {non_neg_integer() | infinity, non_neg_integer()}.
+-spec retry_timeout() -> non_neg_integer() | infinity.
retry_timeout() ->
case application:get_env(rabbit, mnesia_table_loading_retry_timeout) of
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 214a1e390b..f452d5c92f 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -174,25 +174,34 @@ upgrade_mode(AllNodes) ->
end;
[Another|_] ->
MyVersion = rabbit_version:desired_for_scope(mnesia),
- ErrFun = fun (ClusterVersion) ->
- %% The other node(s) are running an
- %% unexpected version.
- die("Cluster upgrade needed but other nodes are "
- "running ~p~nand I want ~p",
- [ClusterVersion, MyVersion])
- end,
case rpc:call(Another, rabbit_version, desired_for_scope,
[mnesia]) of
- {badrpc, {'EXIT', {undef, _}}} -> ErrFun(unknown_old_version);
- {badrpc, Reason} -> ErrFun({unknown, Reason});
- CV -> case rabbit_version:matches(
- MyVersion, CV) of
- true -> secondary;
- false -> ErrFun(CV)
- end
+ {badrpc, {'EXIT', {undef, _}}} ->
+ die_because_cluster_upgrade_needed(unknown_old_version,
+ MyVersion);
+ {badrpc, Reason} ->
+ die_because_cluster_upgrade_needed({unknown, Reason},
+ MyVersion);
+ CV -> case rabbit_version:matches(
+ MyVersion, CV) of
+ true -> secondary;
+ false -> die_because_cluster_upgrade_needed(
+ CV, MyVersion)
+ end
end
end.
+-spec die_because_cluster_upgrade_needed(any(), any()) -> no_return().
+
+die_because_cluster_upgrade_needed(ClusterVersion, MyVersion) ->
+ %% The other node(s) are running an
+ %% unexpected version.
+ die("Cluster upgrade needed but other nodes are "
+ "running ~p~nand I want ~p",
+ [ClusterVersion, MyVersion]).
+
+-spec die(string(), list()) -> no_return().
+
die(Msg, Args) ->
%% We don't throw or exit here since that gets thrown
%% straight out into do_boot, generating an erl_crash.dump
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index a63d60058c..4a79ef9938 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -214,8 +214,9 @@ check_version_consistency(This, Remote, Name) ->
check_version_consistency(This, Remote, Name, fun (A, B) -> A =:= B end).
-spec check_version_consistency
- (string(), string(), string(), string()) ->
- rabbit_types:ok_or_error(any()).
+ (string(), string(), string(),
+ fun((string(), string()) -> boolean())) ->
+ rabbit_types:ok_or_error(any()).
check_version_consistency(This, Remote, Name, Comp) ->
case Comp(This, Remote) of
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index c5b28e7e1c..9180f9ca0a 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -108,10 +108,6 @@ do_add(VHostPath, ActingUser) ->
rabbit_event:notify(vhost_created, info(VHostPath)
++ [{user_who_performed_action, ActingUser}]),
R;
- {error, {no_such_vhost, VHostPath}} ->
- Msg = rabbit_misc:format("failed to set up vhost '~s': it was concurrently deleted!",
- [VHostPath]),
- {error, Msg};
{error, Reason} ->
Msg = rabbit_misc:format("failed to set up vhost '~s': ~p",
[VHostPath, Reason]),
@@ -215,10 +211,7 @@ assert_benign({error, {absent, Q, _}}, ActingUser) ->
%% Removing the mnesia entries here is safe. If/when the down node
%% restarts, it will clear out the on-disk storage of the queue.
QName = amqqueue:get_name(Q),
- case rabbit_amqqueue:internal_delete(QName, ActingUser) of
- ok -> ok;
- {error, not_found} -> ok
- end.
+ rabbit_amqqueue:internal_delete(QName, ActingUser).
internal_delete(VHostPath, ActingUser) ->
[ok = rabbit_auth_backend_internal:clear_permissions(
@@ -275,7 +268,7 @@ assert(VHostPath) -> case exists(VHostPath) of
false -> throw({error, {no_such_vhost, VHostPath}})
end.
--spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
+-spec update(rabbit_types:vhost(), fun((#vhost{}) -> #vhost{})) -> #vhost{}.
update(VHostPath, Fun) ->
case mnesia:read({rabbit_vhost, VHostPath}) of