diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2018-11-30 11:59:34 +0100 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2019-02-01 11:23:16 +0100 |
| commit | 7ae54450980497cdcc87939dcd68d212ea6e8dc0 (patch) | |
| tree | 32cb2a123b738d76191f7292bc7f1764b55d348c | |
| parent | 5bbde6d0a3eb1790d4965d76c8699d0187b74183 (diff) | |
| download | rabbitmq-server-git-7ae54450980497cdcc87939dcd68d212ea6e8dc0.tar.gz | |
Fix errors reported by Dialyzer
| -rw-r--r-- | Makefile | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 139 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_peer_discovery.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_peer_discovery_classic_config.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_peer_discovery_dns.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_recovery_terms.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_table.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_version.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 11 |
13 files changed, 190 insertions, 134 deletions
@@ -136,7 +136,7 @@ define PROJECT_ENV ] endef -LOCAL_DEPS = sasl mnesia os_mon inets compiler syntax_tools +LOCAL_DEPS = sasl mnesia os_mon inets compiler public_key crypto ssl syntax_tools BUILD_DEPS = rabbitmq_cli syslog DEPS = ranch lager rabbit_common ra sysmon_handler TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 621ce95d41..c9c120df77 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,7 @@ -export([not_found/1, absent/2]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, emit_info_all/5, list_local/1, info_local/1, - emit_info_local/4, emit_info_down/4]). + emit_info_local/4, emit_info_down/4]). -export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0, list_with_possible_retry/1]). -export([list_by_type/1]). @@ -81,11 +81,11 @@ -type msg_id() :: non_neg_integer(). -type ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}. --type absent_reason() :: 'nodedown' | 'crashed'. --type queue_or_absent() :: amqqueue:amqqueue() | - {'absent', amqqueue:amqqueue(),absent_reason()}. --type not_found_or_absent() :: - 'not_found' | {'absent', amqqueue:amqqueue(), absent_reason()}. +-type absent_reason() :: 'nodedown' | 'crashed' | stopped | timeout. +-type queue_not_found() :: not_found. +-type queue_absent() :: {'absent', amqqueue:amqqueue(), absent_reason()}. +-type not_found_or_absent() :: queue_not_found() | queue_absent(). +-type quorum_states() :: #{Name :: atom() => rabbit_fifo_client:state()}. %%---------------------------------------------------------------------------- @@ -231,13 +231,14 @@ recover_durable_queues(QueuesAndRecoveryTerms) -> [Pid, Error]) || {Pid, Error} <- Failures], [Q || {_, {new, Q}} <- Results]. --spec declare - (name(), boolean(), boolean(), rabbit_framing:amqp_table(), - rabbit_types:maybe(pid()), rabbit_types:username()) -> - {'new' | 'existing' | 'absent' | 'owner_died', - amqqueue:amqqueue()} | - {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} | - rabbit_types:channel_exit(). +-spec declare(name(), + boolean(), + boolean(), + rabbit_framing:amqp_table(), + rabbit_types:maybe(pid()), + rabbit_types:username()) -> + {'new' | 'existing' | 'absent' | 'owner_died', amqqueue:amqqueue()} | + rabbit_types:channel_exit(). declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) -> declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser, node()). @@ -247,13 +248,17 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) -> %% should be. Note that in some cases (e.g. with "nodes" policy in %% effect) this might not be possible to satisfy. --spec declare - (name(), boolean(), boolean(), rabbit_framing:amqp_table(), - rabbit_types:maybe(pid()), rabbit_types:username(), node()) -> - {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | - {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} | - {'absent', amqqueue:amqqueue(), absent_reason()} | - rabbit_types:channel_exit(). +-spec declare(name(), + boolean(), + boolean(), + rabbit_framing:amqp_table(), + rabbit_types:maybe(pid()), + rabbit_types:username(), + node()) -> + {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | + {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} | + {'absent', amqqueue:amqqueue(), absent_reason()} | + rabbit_types:channel_exit(). declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args, Owner, ActingUser, Node) -> @@ -317,7 +322,7 @@ get_queue_type(Args) -> end. -spec internal_declare(amqqueue:amqqueue(), boolean()) -> - queue_or_absent() | rabbit_misc:thunk(queue_or_absent()). + {created | existing, amqqueue:amqqueue()} | queue_absent(). internal_declare(Q, Recover) -> ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( @@ -451,6 +456,8 @@ not_found_or_absent(Name) -> [Q] -> {absent, Q, nodedown} %% Q exists on stopped node end. +-spec not_found_or_absent_dirty(name()) -> not_found_or_absent(). + not_found_or_absent_dirty(Name) -> %% We should read from both tables inside a tx, to get a %% consistent view. But the chances of an inconsistency are small, @@ -460,12 +467,15 @@ not_found_or_absent_dirty(Name) -> {ok, Q} -> {absent, Q, nodedown} end. --spec with(name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B. +-spec with(name(), + qfun(A), + fun((not_found_or_absent()) -> rabbit_types:channel_exit())) -> + A | rabbit_types:channel_exit(). with(Name, F, E) -> with(Name, F, E, 2000). -with(Name, F, E, RetriesLeft) -> +with(#resource{} = Name, F, E, RetriesLeft) -> case lookup(Name) of {ok, Q} when ?amqqueue_state_is(Q, live) andalso RetriesLeft =:= 0 -> %% Something bad happened to that queue, we are bailing out @@ -502,6 +512,12 @@ with(Name, F, E, RetriesLeft) -> E(not_found_or_absent_dirty(Name)) end. +-spec retry_wait(amqqueue:amqqueue(), + qfun(A), + fun((not_found_or_absent()) -> rabbit_types:channel_exit()), + non_neg_integer()) -> + A | rabbit_types:channel_exit(). + retry_wait(Q, F, E, RetriesLeft) -> Name = amqqueue:get_name(Q), QPid = amqqueue:get_pid(Q), @@ -535,16 +551,22 @@ with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). -spec with_or_die(name(), qfun(A)) -> A | rabbit_types:channel_exit(). with_or_die(Name, F) -> - with(Name, F, fun (not_found) -> not_found(Name); - ({absent, Q, Reason}) -> absent(Q, Reason) - end). + with(Name, F, die_fun(Name)). + +-spec die_fun(name()) -> + fun((not_found_or_absent()) -> rabbit_types:channel_exit()). + +die_fun(Name) -> + fun (not_found) -> not_found(Name); + ({absent, Q, Reason}) -> absent(Q, Reason) + end. --spec not_found(rabbit_types:r(atom())) -> rabbit_types:channel_exit(). +-spec not_found(name()) -> rabbit_types:channel_exit(). not_found(R) -> rabbit_misc:protocol_error(not_found, "no ~s", [rabbit_misc:rs(R)]). --spec absent(amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()) -> - rabbit_types:channel_exit(). +-spec absent(amqqueue:amqqueue(), absent_reason()) -> + rabbit_types:channel_exit(). absent(Q, AbsentReason) -> QueueName = amqqueue:get_name(Q), @@ -552,6 +574,9 @@ absent(Q, AbsentReason) -> IsDurable = amqqueue:is_durable(Q), priv_absent(QueueName, QPid, IsDurable, AbsentReason). +-spec priv_absent(name(), pid(), boolean(), absent_reason()) -> + rabbit_types:channel_exit(). + priv_absent(QueueName, QPid, true, nodedown) -> %% The assertion of durability is mainly there because we mention %% durability in the error message. That way we will hopefully @@ -1006,7 +1031,7 @@ notify_policy_changed(Q) when ?amqqueue_is_quorum(Q) -> -spec consumers(amqqueue:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(), - rabbit_framing:amqp_table()}]. + rabbit_framing:amqp_table(), rabbit_types:username()}]. consumers(Q) when ?amqqueue_is_classic(Q) -> QPid = amqqueue:get_pid(Q), @@ -1189,7 +1214,11 @@ purge(Q) when ?amqqueue_is_quorum(Q) -> NodeId = amqqueue:get_pid(Q), rabbit_quorum_queue:purge(NodeId). --spec requeue(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'. +-spec requeue(pid(), + {rabbit_fifo:consumer_tag(), [msg_id()]}, + pid(), + quorum_states()) -> + 'ok'. requeue(QPid, {_, MsgIds}, ChPid, QuorumStates) when ?IS_CLASSIC(QPid) -> ok = delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}), @@ -1205,7 +1234,11 @@ requeue({Name, _} = QPid, {CTag, MsgIds}, _ChPid, QuorumStates) QuorumStates end. --spec ack(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'. +-spec ack(pid(), + {rabbit_fifo:consumer_tag(), [msg_id()]}, + pid(), + quorum_states()) -> + quorum_states(). ack(QPid, {_, MsgIds}, ChPid, QueueStates) when ?IS_CLASSIC(QPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{ack, MsgIds, ChPid}]}), @@ -1221,8 +1254,12 @@ ack({Name, _} = QPid, {CTag, MsgIds}, _ChPid, QuorumStates) QuorumStates end. --spec reject(pid() | {atom(), node()}, [msg_id()], boolean(), pid(), - #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'. +-spec reject(pid() | amqqueue:ra_server_id(), + boolean(), + {rabbit_fifo:consumer_tag(), [msg_id()]}, + pid(), + quorum_states()) -> + quorum_states(). reject(QPid, Requeue, {_, MsgIds}, ChPid, QStates) when ?IS_CLASSIC(QPid) -> ok = delegate:invoke_no_result(QPid, {gen_server2, cast, @@ -1265,17 +1302,20 @@ notify_down_all(QPids, ChPid, Timeout) -> Error -> {error, Error} end. --spec activate_limit_all(qpids(), pid()) -> ok_or_errors(). +-spec activate_limit_all(qpids(), pid()) -> ok. activate_limit_all(QRefs, ChPid) -> QPids = [P || P <- QRefs, ?IS_CLASSIC(P)], delegate:invoke_no_result(QPids, {gen_server2, cast, [{activate_limit, ChPid}]}). --spec credit - (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), - boolean(), #{Name :: atom() => rabbit_fifo_client:state()}) -> - 'ok'. +-spec credit(amqqueue:amqqueue(), + pid(), + rabbit_types:ctag(), + non_neg_integer(), + boolean(), + quorum_states()) -> + {'ok', quorum_states()}. credit(Q, ChPid, CTag, Credit, Drain, QStates) when ?amqqueue_is_classic(Q) -> @@ -1294,7 +1334,9 @@ credit(Q, -spec basic_get(amqqueue:amqqueue(), pid(), boolean(), pid(), rabbit_types:ctag(), #{Name :: atom() => rabbit_fifo_client:state()}) -> - {'ok', non_neg_integer(), qmsg()} | 'empty'. + {'ok', non_neg_integer(), qmsg(), quorum_states()} | + {'empty', quorum_states()} | + rabbit_types:channel_exit(). basic_get(Q, ChPid, NoAck, LimiterPid, _CTag, _) when ?amqqueue_is_classic(Q) -> @@ -1421,10 +1463,7 @@ internal_delete1(QueueName, OnlyDurable, Reason) -> %% after the transaction. rabbit_binding:remove_for_destination(QueueName, OnlyDurable). --spec internal_delete(name(), rabbit_types:username()) -> - 'ok' | rabbit_types:connection_exit() | - fun (() -> - 'ok' | rabbit_types:connection_exit()). +-spec internal_delete(name(), rabbit_types:username()) -> 'ok'. internal_delete(QueueName, ActingUser) -> internal_delete(QueueName, ActingUser, normal). @@ -1682,7 +1721,9 @@ pseudo_queue(QueueName, Pid) -> -spec pseudo_queue(name(), pid(), boolean()) -> amqqueue:amqqueue(). -pseudo_queue(QueueName, Pid, Durable) -> +pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable) + when is_pid(Pid) andalso + is_boolean(Durable) -> amqqueue:new(QueueName, Pid, Durable, @@ -1704,8 +1745,12 @@ deliver(Qs, Delivery) -> deliver(Qs, Delivery, untracked), ok. --spec deliver([amqqueue:amqqueue()], rabbit_types:delivery(), #{Name :: atom() => rabbit_fifo_client:state()} | 'untracked') -> - {qpids(), #{Name :: atom() => rabbit_fifo_client:state()}}. +-spec deliver([amqqueue:amqqueue()], + rabbit_types:delivery(), + quorum_states() | 'untracked') -> + {qpids(), + [{amqqueue:ra_server_id(), name()}], + quorum_states()}. deliver([], _Delivery, QueueState) -> %% /dev/null optimisation diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index ab29faf368..61b9e10e70 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -136,20 +136,14 @@ init_from_config() -> {DiscoveredNodes, NodeType} = case rabbit_peer_discovery:discover_cluster_nodes() of {ok, {Nodes, Type} = Config} - when is_list(Nodes) andalso (Type == disc orelse Type == disk orelse Type == ram) -> + when is_list(Nodes) andalso + (Type == disc orelse Type == disk orelse Type == ram) -> case lists:foldr(FindBadNodeNames, [], Nodes) of [] -> Config; BadNames -> e({invalid_cluster_node_names, BadNames}) end; {ok, {_, BadType}} when BadType /= disc andalso BadType /= ram -> e({invalid_cluster_node_type, BadType}); - {ok, Nodes} when is_list(Nodes) -> - %% The legacy syntax (a nodes list without the node - %% type) is unsupported. - case lists:foldr(FindBadNodeNames, [], Nodes) of - [] -> e(cluster_node_type_mandatory); - _ -> e(invalid_cluster_nodes_conf) - end; {ok, _} -> e(invalid_cluster_nodes_conf) end, @@ -1025,6 +1019,8 @@ nodes_incl_me(Nodes) -> lists:usort([node()|Nodes]). nodes_excl_me(Nodes) -> Nodes -- [node()]. +-spec e(any()) -> no_return(). + e(Tag) -> throw({error, {Tag, error_description(Tag)}}). error_description({invalid_cluster_node_names, BadNames}) -> @@ -1034,9 +1030,6 @@ error_description({invalid_cluster_node_type, BadType}) -> "In the 'cluster_nodes' configuration key, the node type is invalid " "(expected 'disc' or 'ram'): " ++ lists:flatten(io_lib:format("~p", [BadType])); -error_description(cluster_node_type_mandatory) -> - "The 'cluster_nodes' configuration key must indicate the node type: " - "either {[...], disc} or {[...], ram}"; error_description(invalid_cluster_nodes_conf) -> "The 'cluster_nodes' configuration key is invalid, it must be of the " "form {[Nodes], Type}, where Nodes is a list of node names and " diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 2d3b042fa4..cd46ade0e2 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -84,14 +84,13 @@ cluster_status_filename() -> quorum_filename() -> filename:join(rabbit_mnesia:dir(), "quorum"). --spec prepare_cluster_status_files() -> 'ok'. +-spec prepare_cluster_status_files() -> 'ok' | no_return(). prepare_cluster_status_files() -> rabbit_mnesia:ensure_mnesia_dir(), - Corrupt = fun(F) -> throw({error, corrupt_cluster_status_files, F}) end, RunningNodes1 = case try_read_file(running_nodes_filename()) of {ok, [Nodes]} when is_list(Nodes) -> Nodes; - {ok, Other} -> Corrupt(Other); + {ok, Other} -> corrupt_cluster_status_files(Other); {error, enoent} -> [] end, ThisNode = [node()], @@ -105,7 +104,7 @@ prepare_cluster_status_files() -> {ok, [AllNodes0]} when is_list(AllNodes0) -> {legacy_cluster_nodes(AllNodes0), legacy_disc_nodes(AllNodes0)}; {ok, Files} -> - Corrupt(Files); + corrupt_cluster_status_files(Files); {error, enoent} -> LegacyNodes = legacy_cluster_nodes([]), {LegacyNodes, LegacyNodes} @@ -113,6 +112,11 @@ prepare_cluster_status_files() -> AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2), ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}). +-spec corrupt_cluster_status_files(any()) -> no_return(). + +corrupt_cluster_status_files(F) -> + throw({error, corrupt_cluster_status_files, F}). + -spec write_cluster_status(rabbit_mnesia:cluster_status()) -> 'ok'. write_cluster_status({All, Disc, Running}) -> diff --git a/src/rabbit_peer_discovery.erl b/src/rabbit_peer_discovery.erl index 2f4acee49e..2ed36d32ec 100644 --- a/src/rabbit_peer_discovery.erl +++ b/src/rabbit_peer_discovery.erl @@ -105,9 +105,15 @@ maybe_init() -> end. --spec discover_cluster_nodes() -> {ok, Nodes :: list()} | - {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} | - {error, Reason :: string()}. +%% This module doesn't currently sanity-check the return value of +%% `Backend:list_nodes()`. Therefore, it could return something invalid: +%% thus the `{œk, any()} in the spec. +%% +%% `rabbit_mnesia:init_from_config()` does some verifications. + +-spec discover_cluster_nodes() -> + {ok, {Nodes :: [node()], NodeType :: rabbit_types:node_type()} | any()} | + {error, Reason :: string()}. discover_cluster_nodes() -> Backend = backend(), @@ -156,10 +162,7 @@ maybe_inject_randomized_delay() -> -spec inject_randomized_delay() -> ok. inject_randomized_delay() -> - {Min, Max} = case randomized_delay_range_in_ms() of - {A, B} -> {A, B}; - [A, B] -> {A, B} - end, + {Min, Max} = randomized_delay_range_in_ms(), case {Min, Max} of %% When the max value is set to 0, consider the delay to be disabled. %% In addition, `rand:uniform/1` will fail with a "no function clause" @@ -258,12 +261,15 @@ unlock(Data) -> %% Implementation %% --spec normalize(Nodes :: list() | - {Nodes :: list(), NodeType :: rabbit_types:node_type()} | - {ok, Nodes :: list()} | - {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} | - {error, Reason :: string()}) -> {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} | - {error, Reason :: string()}. +-spec normalize(Nodes :: [node()] | + {Nodes :: [node()], + NodeType :: rabbit_types:node_type()} | + {ok, Nodes :: [node()]} | + {ok, {Nodes :: [node()], + NodeType :: rabbit_types:node_type()}} | + {error, Reason :: string()}) -> + {ok, {Nodes :: [node()], NodeType :: rabbit_types:node_type()}} | + {error, Reason :: string()}. normalize(Nodes) when is_list(Nodes) -> {ok, {Nodes, disc}}; @@ -296,14 +302,12 @@ node_prefix() -> --spec append_node_prefix(Value :: binary() | list()) -> atom(). +-spec append_node_prefix(Value :: binary() | string()) -> string(). -append_node_prefix(Value) -> +append_node_prefix(Value) when is_binary(Value) orelse is_list(Value) -> Val = rabbit_data_coercion:to_list(Value), Hostname = case string:tokens(Val, ?NODENAME_PART_SEPARATOR) of - [_ExistingPrefix, Val] -> - Val; - [Val] -> - Val + [_ExistingPrefix, HN] -> HN; + [HN] -> HN end, string:join([node_prefix(), Hostname], ?NODENAME_PART_SEPARATOR). diff --git a/src/rabbit_peer_discovery_classic_config.erl b/src/rabbit_peer_discovery_classic_config.erl index 6597d77da4..16b7861f5f 100644 --- a/src/rabbit_peer_discovery_classic_config.erl +++ b/src/rabbit_peer_discovery_classic_config.erl @@ -26,13 +26,12 @@ %% API %% --spec list_nodes() -> {ok, Nodes :: list()} | {error, Reason :: string()}. +-spec list_nodes() -> {ok, {Nodes :: [node()], rabbit_types:node_type()}}. list_nodes() -> - case application:get_env(rabbit, cluster_nodes) of - {_Nodes, _NodeType} = Pair -> Pair; - Nodes when is_list(Nodes) -> {Nodes, disc}; - undefined -> {[], disc} + case application:get_env(rabbit, cluster_nodes, {[], disc}) of + {_Nodes, _NodeType} = Pair -> {ok, Pair}; + Nodes when is_list(Nodes) -> {ok, {Nodes, disc}} end. -spec supports_registration() -> boolean(). diff --git a/src/rabbit_peer_discovery_dns.erl b/src/rabbit_peer_discovery_dns.erl index ad277e08b0..031d1290b3 100644 --- a/src/rabbit_peer_discovery_dns.erl +++ b/src/rabbit_peer_discovery_dns.erl @@ -28,12 +28,13 @@ %% API %% --spec list_nodes() -> {ok, Nodes :: list()} | {error, Reason :: string()}. +-spec list_nodes() -> + {ok, {Nodes :: [node()], rabbit_types:node_type()}}. list_nodes() -> case application:get_env(rabbit, cluster_formation) of undefined -> - {[], disc}; + {ok, {[], disc}}; {ok, ClusterFormation} -> case proplists:get_value(peer_discovery_dns, ClusterFormation) of undefined -> @@ -41,11 +42,11 @@ list_nodes() -> "but final config does not contain rabbit.cluster_formation.peer_discovery_dns. " "Cannot discover any nodes because seed hostname is not configured!", [?MODULE]), - {[], disc}; + {ok, {[], disc}}; Proplist -> Hostname = rabbit_data_coercion:to_list(proplists:get_value(hostname, Proplist)), - {discover_nodes(Hostname, net_kernel:longnames()), rabbit_peer_discovery:node_type()} + {ok, {discover_nodes(Hostname, net_kernel:longnames()), rabbit_peer_discovery:node_type()}} end end. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index b54de3a2e6..725d8086d9 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -43,7 +43,6 @@ -include_lib("stdlib/include/qlc.hrl"). -include("amqqueue.hrl"). --type ra_server_id() :: {Name :: atom(), Node :: node()}. -type msg_id() :: non_neg_integer(). -type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(), rabbit_types:message()}. @@ -68,8 +67,8 @@ %%---------------------------------------------------------------------------- --spec init_state(ra_server_id(), rabbit_types:r('queue')) -> - rabbit_fifo_client:state(). +-spec init_state(amqqueue:ra_server_id(), rabbit_amqqueue:name()) -> + rabbit_fifo_client:state(). init_state({Name, _}, QName = #resource{}) -> {ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit), %% This lookup could potentially return an {error, not_found}, but we do not @@ -84,7 +83,7 @@ init_state({Name, _}, QName = #resource{}) -> fun() -> credit_flow:block(Name), ok end, fun() -> credit_flow:unblock(Name), ok end). --spec handle_event({'ra_event', ra_server_id(), any()}, rabbit_fifo_client:state()) -> +-spec handle_event({'ra_event', amqqueue:ra_server_id(), any()}, rabbit_fifo_client:state()) -> {'internal', Correlators :: [term()], rabbit_fifo_client:state()} | {rabbit_fifo:client_msg(), rabbit_fifo_client:state()}. @@ -92,8 +91,7 @@ handle_event({ra_event, From, Evt}, QState) -> rabbit_fifo_client:handle_ra_event(From, Evt, QState). -spec declare(amqqueue:amqqueue()) -> - {'new', amqqueue:amqqueue()} | - {existing, amqqueue:amqqueue()}. + {new | existing, amqqueue:amqqueue()} | rabbit_types:channel_exit(). declare(Q) when ?amqqueue_is_quorum(Q) -> QName = amqqueue:get_name(Q), @@ -264,9 +262,9 @@ recover(Queues) -> ok -> % queue was restarted, good ok; - {error, Err} - when Err == not_started orelse - Err == name_not_registered -> + {error, Err1} + when Err1 == not_started orelse + Err1 == name_not_registered -> % queue was never started on this node % so needs to be started from scratch. Machine = ra_machine(Q0), @@ -400,7 +398,8 @@ credit(CTag, Credit, Drain, QState) -> -spec basic_get(amqqueue:amqqueue(), NoAck :: boolean(), rabbit_types:ctag(), rabbit_fifo_client:state()) -> {'ok', 'empty', rabbit_fifo_client:state()} | - {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}. + {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()} | + {error, timeout | term()}. basic_get(Q, NoAck, CTag0, QState0) when ?amqqueue_is_quorum(Q) -> QName = amqqueue:get_name(Q), @@ -420,6 +419,8 @@ basic_get(Q, NoAck, CTag0, QState0) when ?amqqueue_is_quorum(Q) -> IsDelivered = Count > 0, Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0), {ok, MsgsReady, {QName, Id, MsgId, IsDelivered, Msg}, QState}; + {error, _} = Err -> + Err; {timeout, _} -> {error, timeout} end. @@ -482,7 +483,7 @@ basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) -> maybe_send_reply(ChPid, OkMsg), rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), QState0). --spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'. +-spec stateless_deliver(amqqueue:ra_server_id(), rabbit_types:delivery()) -> 'ok'. stateless_deliver(ServerId, Delivery) -> ok = rabbit_fifo_client:untracked_enqueue([ServerId], @@ -581,7 +582,7 @@ cluster_state(Name) -> end end. --spec status(rabbit_types:vhost(), Name :: atom()) -> rabbit_types:infos() | {error, term()}. +-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> rabbit_types:infos() | {error, term()}. status(Vhost, QueueName) -> %% Handle not found queues @@ -885,6 +886,8 @@ format(Q) when ?is_amqqueue(Q) -> is_process_alive(Name, Node) -> erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?TICK_TIME)). +-spec quorum_messages(atom()) -> non_neg_integer(). + quorum_messages(QName) -> case ets:lookup(queue_coarse_metrics, QName) of [{_, _, _, M, _}] -> diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index 4e8ae128de..28bd9fc23a 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -117,7 +117,7 @@ upgrade_recovery_terms() -> [begin File = filename:join([QueuesDir, Dir, "clean.dot"]), case rabbit_file:read_term_file(File) of - {ok, Terms} -> ok = store(?MODULE, Dir, Terms); + {ok, Terms} -> ok = store_global_table(Dir, Terms); {error, _} -> ok end, file:delete(File) @@ -134,7 +134,7 @@ dets_upgrade(Fun)-> open_global_table(), try ok = dets:foldl(fun ({DirBaseName, Terms}, Acc) -> - store(?MODULE, DirBaseName, Fun(Terms)), + store_global_table(DirBaseName, Fun(Terms)), Acc end, ok, ?MODULE), ok @@ -160,8 +160,14 @@ close_global_table() -> ok end. +store_global_table(DirBaseName, Terms) -> + dets:insert(?MODULE, {DirBaseName, Terms}). + read_global(DirBaseName) -> - read(?MODULE, DirBaseName). + case dets:lookup(?MODULE, DirBaseName) of + [{_, Terms}] -> {ok, Terms}; + _ -> {error, not_found} + end. delete_global_table() -> file:delete(filename:join(rabbit_mnesia:dir(), "recovery.dets")). diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index db16152d5b..9bf1d2c3f6 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -117,8 +117,6 @@ wait(TableNames, Timeout, Retries) -> throw(Error); {_, {error, Error}} -> rabbit_log:warning("Error while waiting for Mnesia tables: ~p~n", [Error]), - wait(TableNames, Timeout, Retries - 1); - _ -> wait(TableNames, Timeout, Retries - 1) end. @@ -131,7 +129,7 @@ retry_timeout(_Retry = true) -> end, {retry_timeout(), Retries}. --spec retry_timeout() -> {non_neg_integer() | infinity, non_neg_integer()}. +-spec retry_timeout() -> non_neg_integer() | infinity. retry_timeout() -> case application:get_env(rabbit, mnesia_table_loading_retry_timeout) of diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 214a1e390b..f452d5c92f 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -174,25 +174,34 @@ upgrade_mode(AllNodes) -> end; [Another|_] -> MyVersion = rabbit_version:desired_for_scope(mnesia), - ErrFun = fun (ClusterVersion) -> - %% The other node(s) are running an - %% unexpected version. - die("Cluster upgrade needed but other nodes are " - "running ~p~nand I want ~p", - [ClusterVersion, MyVersion]) - end, case rpc:call(Another, rabbit_version, desired_for_scope, [mnesia]) of - {badrpc, {'EXIT', {undef, _}}} -> ErrFun(unknown_old_version); - {badrpc, Reason} -> ErrFun({unknown, Reason}); - CV -> case rabbit_version:matches( - MyVersion, CV) of - true -> secondary; - false -> ErrFun(CV) - end + {badrpc, {'EXIT', {undef, _}}} -> + die_because_cluster_upgrade_needed(unknown_old_version, + MyVersion); + {badrpc, Reason} -> + die_because_cluster_upgrade_needed({unknown, Reason}, + MyVersion); + CV -> case rabbit_version:matches( + MyVersion, CV) of + true -> secondary; + false -> die_because_cluster_upgrade_needed( + CV, MyVersion) + end end end. +-spec die_because_cluster_upgrade_needed(any(), any()) -> no_return(). + +die_because_cluster_upgrade_needed(ClusterVersion, MyVersion) -> + %% The other node(s) are running an + %% unexpected version. + die("Cluster upgrade needed but other nodes are " + "running ~p~nand I want ~p", + [ClusterVersion, MyVersion]). + +-spec die(string(), list()) -> no_return(). + die(Msg, Args) -> %% We don't throw or exit here since that gets thrown %% straight out into do_boot, generating an erl_crash.dump diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index a63d60058c..4a79ef9938 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -214,8 +214,9 @@ check_version_consistency(This, Remote, Name) -> check_version_consistency(This, Remote, Name, fun (A, B) -> A =:= B end). -spec check_version_consistency - (string(), string(), string(), string()) -> - rabbit_types:ok_or_error(any()). + (string(), string(), string(), + fun((string(), string()) -> boolean())) -> + rabbit_types:ok_or_error(any()). check_version_consistency(This, Remote, Name, Comp) -> case Comp(This, Remote) of diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index c5b28e7e1c..9180f9ca0a 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -108,10 +108,6 @@ do_add(VHostPath, ActingUser) -> rabbit_event:notify(vhost_created, info(VHostPath) ++ [{user_who_performed_action, ActingUser}]), R; - {error, {no_such_vhost, VHostPath}} -> - Msg = rabbit_misc:format("failed to set up vhost '~s': it was concurrently deleted!", - [VHostPath]), - {error, Msg}; {error, Reason} -> Msg = rabbit_misc:format("failed to set up vhost '~s': ~p", [VHostPath, Reason]), @@ -215,10 +211,7 @@ assert_benign({error, {absent, Q, _}}, ActingUser) -> %% Removing the mnesia entries here is safe. If/when the down node %% restarts, it will clear out the on-disk storage of the queue. QName = amqqueue:get_name(Q), - case rabbit_amqqueue:internal_delete(QName, ActingUser) of - ok -> ok; - {error, not_found} -> ok - end. + rabbit_amqqueue:internal_delete(QName, ActingUser). internal_delete(VHostPath, ActingUser) -> [ok = rabbit_auth_backend_internal:clear_permissions( @@ -275,7 +268,7 @@ assert(VHostPath) -> case exists(VHostPath) of false -> throw({error, {no_such_vhost, VHostPath}}) end. --spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. +-spec update(rabbit_types:vhost(), fun((#vhost{}) -> #vhost{})) -> #vhost{}. update(VHostPath, Fun) -> case mnesia:read({rabbit_vhost, VHostPath}) of |
