summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/rabbit_fifo_client.erl49
-rw-r--r--test/dead_lettering_SUITE.erl2
-rw-r--r--test/dynamic_qq_SUITE.erl14
-rw-r--r--test/publisher_confirms_parallel_SUITE.erl10
5 files changed, 43 insertions, 37 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 74d400950e..1755b4b8e2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1871,8 +1871,9 @@ handle_publishing_queue_down(QPid, Reason,
record_rejects(RejectMXs, State1)
end
end;
-handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
- error(quorum_queues_should_never_be_monitored).
+handle_publishing_queue_down(QPid, _Reason, State) when ?IS_QUORUM(QPid) ->
+ %% this should never happen after the queue type refactoring in 3.9
+ State.
handle_consuming_queue_down_or_eol(QRef,
State = #ch{queue_consumers = QCons,
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 83207d7bf9..9a6cd32a7b 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -144,29 +144,32 @@ enqueue(Correlation, Msg,
cfg = #cfg{}} = State0) ->
%% it is the first enqueue, check the version
{_, Node} = Server = pick_server(State0),
- State =
- case rpc:call(Node, rabbit_fifo, version, []) of
- 0 ->
- %% the leader is running the old version
- %% so we can't initialize the enqueuer session safely
- State0#state{queue_status = go};
- 1 ->
- %% were running the new version on the leader do sync initialisation
- %% of enqueuer session
- Reg = rabbit_fifo:make_register_enqueuer(self()),
- case ra:process_command(Server, Reg) of
- {ok, reject_publish, _} ->
- State0#state{queue_status = reject_publish};
- {ok, ok, _} ->
- State0#state{queue_status = go};
- Err ->
- exit(Err)
- end;
- {badrpc, nodedown} ->
- rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]),
- State0#state{queue_status = go}
- end,
- enqueue(Correlation, Msg, State);
+ case rpc:call(Node, rabbit_fifo, version, []) of
+ 0 ->
+ %% the leader is running the old version
+ %% so we can't initialize the enqueuer session safely
+ %% fall back on old behavour
+ enqueue(Correlation, Msg, State0#state{queue_status = go});
+ 1 ->
+ %% were running the new version on the leader do sync initialisation
+ %% of enqueuer session
+ Reg = rabbit_fifo:make_register_enqueuer(self()),
+ case ra:process_command(Server, Reg) of
+ {ok, reject_publish, _} ->
+ {reject_publish, State0#state{queue_status = reject_publish}};
+ {ok, ok, _} ->
+ enqueue(Correlation, Msg, State0#state{queue_status = go});
+ {timeout, _} ->
+ %% if we timeout it is probably better to reject
+ %% the message than being uncertain
+ {reject_publish, State0};
+ Err ->
+ exit(Err)
+ end;
+ {badrpc, nodedown} ->
+ rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]),
+ State0#state{queue_status = go}
+ end;
enqueue(_Correlation, _Msg,
#state{queue_status = reject_publish,
cfg = #cfg{}} = State) ->
diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl
index d1196e79fc..87b5566c57 100644
--- a/test/dead_lettering_SUITE.erl
+++ b/test/dead_lettering_SUITE.erl
@@ -106,7 +106,7 @@ init_per_group(mirrored_queue, Config) ->
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
- ClusterSize = 2,
+ ClusterSize = 3,
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl
index 0376b2b838..c952ee822b 100644
--- a/test/dynamic_qq_SUITE.erl
+++ b/test/dynamic_qq_SUITE.erl
@@ -24,15 +24,13 @@ all() ->
groups() ->
[
{clustered, [], [
- {cluster_size_2, [], [
+ {cluster_size_3, [], [
+ recover_follower_after_standalone_restart,
vhost_deletion,
force_delete_if_no_consensus,
takeover_on_failure,
takeover_on_shutdown,
quorum_unaffected_after_vhost_failure
- ]},
- {cluster_size_3, [], [
- recover_follower_after_standalone_restart
]}
]}
].
@@ -108,7 +106,7 @@ vhost_deletion(Config) ->
ok.
force_delete_if_no_consensus(Config) ->
- [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
QName = ?config(queue_name, Config),
Args = ?config(queue_args, Config),
@@ -119,6 +117,7 @@ force_delete_if_no_consensus(Config) ->
rabbit_ct_client_helpers:publish(ACh, QName, 10),
ok = rabbit_ct_broker_helpers:restart_node(Config, B),
ok = rabbit_ct_broker_helpers:stop_node(Config, A),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, C),
BCh = rabbit_ct_client_helpers:open_channel(Config, B),
?assertMatch(
@@ -140,7 +139,7 @@ takeover_on_shutdown(Config) ->
takeover_on(Config, stop_node).
takeover_on(Config, Fun) ->
- [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
QName = ?config(queue_name, Config),
@@ -152,6 +151,7 @@ takeover_on(Config, Fun) ->
rabbit_ct_client_helpers:publish(ACh, QName, 10),
ok = rabbit_ct_broker_helpers:restart_node(Config, B),
+ ok = rabbit_ct_broker_helpers:Fun(Config, C),
ok = rabbit_ct_broker_helpers:Fun(Config, A),
BCh = rabbit_ct_client_helpers:open_channel(Config, B),
@@ -170,7 +170,7 @@ takeover_on(Config, Fun) ->
ok.
quorum_unaffected_after_vhost_failure(Config) ->
- [A, B] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [A, B, _] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Servers = lists:sort(Servers0),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
diff --git a/test/publisher_confirms_parallel_SUITE.erl b/test/publisher_confirms_parallel_SUITE.erl
index 410dabb08a..c31527f0ba 100644
--- a/test/publisher_confirms_parallel_SUITE.erl
+++ b/test/publisher_confirms_parallel_SUITE.erl
@@ -82,7 +82,7 @@ init_per_group(mirrored_queue, Config) ->
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
- ClusterSize = 2,
+ ClusterSize = 3,
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
@@ -302,21 +302,23 @@ confirm_nack1(Config) ->
%% The closest to a nack behaviour that we can get on quorum queues is not answering while
%% the cluster is in minority. Once the cluster recovers, a 'basic.ack' will be issued.
confirm_minority(Config) ->
- [_A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [_A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = ?config(queue_name, Config),
declare_queue(Ch, Config, QName),
ok = rabbit_ct_broker_helpers:stop_node(Config, B),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, C),
amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, QName, [<<"msg1">>]),
receive
- #'basic.nack'{} -> throw(unexpected_nack);
+ #'basic.nack'{} -> ok;
#'basic.ack'{} -> throw(unexpected_ack)
- after 30000 ->
+ after 120000 ->
ok
end,
ok = rabbit_ct_broker_helpers:start_node(Config, B),
+ publish(Ch, QName, [<<"msg2">>]),
receive
#'basic.nack'{} -> throw(unexpected_nack);
#'basic.ack'{} -> ok