summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Michael Klishin <michael@novemberain.com> 2019-02-21 02:43:09 +0300
committer GitHub <noreply@github.com> 2019-02-21 02:43:09 +0300
commit 02f84e6a15ed544b777f87768466099721274923 (patch)
tree 5f1ee9b2de715e03f98a76e5ac72616a849b7183
parent 9e4095fd906da893ca08b02836ce0716bbfac39f (diff)
parent d3eb661efd89e87bc7859512bcdbe2f95fc5667d (diff)
download rabbitmq-server-git-02f84e6a15ed544b777f87768466099721274923.tar.gz
Merge pull request #1883 from rabbitmq/unavailable-qq-publish-fix
Fix channel liveness issue when publishing to an unavailable quorum queue
-rw-r--r-- src/rabbit_fifo_client.erl 52
-rw-r--r-- src/rabbit_quorum_queue.erl 12
-rw-r--r-- test/quorum_queue_SUITE.erl 28
3 files changed, 80 insertions, 12 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 04918c3eb9..a1bf82367b 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -46,6 +46,7 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-define(SOFT_LIMIT, 256).
+-define(TIMER_TIME, 10000).
-type seq() :: non_neg_integer().
-type action() :: {send_credit_reply, Available :: non_neg_integer()} |
@@ -73,15 +74,17 @@
{maybe(term()), rabbit_fifo:command()}},
consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
#consumer{}},
- block_handler = fun() -> ok end :: fun(() -> ok),
+ block_handler = fun() -> ok end :: fun(() -> term()),
unblock_handler = fun() -> ok end :: fun(() -> ok),
+ timer_state :: term(),
timeout :: non_neg_integer()
}).
-opaque state() :: #state{}.
-export_type([
- state/0
+ state/0,
+ actions/0
]).
@@ -140,9 +143,9 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow,
% by default there is no correlation id
Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg),
case send_command(Node, Correlation, Cmd, low, State1) of
- {slow, _} = Ret when not Slow ->
+ {slow, State} when not Slow ->
BlockFun(),
- Ret;
+ {slow, set_timer(State)};
Any ->
Any
end.
@@ -478,9 +481,14 @@ handle_ra_event(From, {applied, Seqs},
case maps:size(State1#state.pending) < SftLmt of
true when State1#state.slow == true ->
% we have exited soft limit state
- % send any unsent commands
- State2 = State1#state{slow = false,
- unsent_commands = #{}},
+ % send any unsent commands and cancel the timer.
+ % TODO: really the timer should only be cancelled when the channel
+ % exits flow state (which depends on the state of all queues the
+ % channel is interacting with)
+ % but the fact the queue has just applied suggests
+ % it's ok to cancel here anyway
+ State2 = cancel_timer(State1#state{slow = false,
+ unsent_commands = #{}}),
% build up a list of commands to issue
Commands = maps:fold(
fun (Cid, {Settled, Returns, Discards}, Acc) ->
@@ -523,6 +531,15 @@ handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
State1 = State0#state{leader = Leader},
State = resend(Seq, State1),
{internal, [], [], State};
+handle_ra_event(_, timeout, #state{servers = Servers} = State0) ->
+ case find_leader(Servers) of
+ undefined ->
+ %% still no leader, set the timer again
+ {internal, [], [], set_timer(State0)};
+ Leader ->
+ State = resend_all_pending(State0#state{leader = Leader}),
+ {internal, [], [], State}
+ end;
handle_ra_event(_Leader, {machine, eol}, _State0) ->
eol.
@@ -729,3 +746,24 @@ add_command(Cid, return, MsgIds, Acc) ->
[rabbit_fifo:make_settle(Cid, MsgIds) | Acc];
add_command(Cid, discard, MsgIds, Acc) ->
[rabbit_fifo:make_settle(Cid, MsgIds) | Acc].
+
+set_timer(#state{servers = [Server | _]} = State) ->
+ Ref = erlang:send_after(?TIMER_TIME, self(),
+ {ra_event, Server, timeout}),
+ State#state{timer_state = Ref}.
+
+cancel_timer(#state{timer_state = undefined} = State) ->
+ State;
+cancel_timer(#state{timer_state = Ref} = State) ->
+ erlang:cancel_timer(Ref, [{async, true}, {info, false}]),
+ State#state{timer_state = undefined}.
+
+find_leader([]) ->
+ undefined;
+find_leader([Server | Servers]) ->
+ case ra:members(Server, 500) of
+ {ok, _, Leader} -> Leader;
+ _ ->
+ find_leader(Servers)
+ end.
+
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 0ef6d88086..9e73981541 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -83,13 +83,15 @@ init_state({Name, _}, QName = #resource{}) ->
Servers0 = [{Name, N} || N <- Nodes],
Servers = [Leader | lists:delete(Leader, Servers0)],
rabbit_fifo_client:init(QName, Servers, SoftLimit,
- fun() -> credit_flow:block(Name), ok end,
+ fun() -> credit_flow:block(Name) end,
fun() -> credit_flow:unblock(Name), ok end).
--spec handle_event({'ra_event', amqqueue:ra_server_id(), any()}, rabbit_fifo_client:state()) ->
- {internal, Correlators :: [term()], rabbit_fifo_client:actions(), rabbit_fifo_client:state()} |
- {rabbit_fifo:client_msg(), rabbit_fifo_client:state()} | eol.
-
+-spec handle_event({'ra_event', amqqueue:ra_server_id(), any()},
+ rabbit_fifo_client:state()) ->
+ {internal, Correlators :: [term()], rabbit_fifo_client:actions(),
+ rabbit_fifo_client:state()} |
+ {rabbit_fifo:client_msg(), rabbit_fifo_client:state()} |
+ eol.
handle_event({ra_event, From, Evt}, QState) ->
rabbit_fifo_client:handle_ra_event(From, Evt, QState).
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index e5d631dfcf..406c02de83 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -61,6 +61,7 @@ groups() ->
{cluster_size_3, [], [
declare_during_node_down,
simple_confirm_availability_on_leader_change,
+ publishing_to_unavailable_queue,
confirm_availability_on_leader_change,
recover_from_single_failure,
recover_from_multiple_failures,
@@ -913,6 +914,33 @@ recover_from_multiple_failures(Config) ->
wait_for_messages_ready(Servers, RaName, 6),
wait_for_messages_pending_ack(Servers, RaName, 0).
+publishing_to_unavailable_queue(Config) ->
+ [Server, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ TCh = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(TCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish_many(Ch, QQ, 300),
+ timer:sleep(1000),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+ %% check we get at least one ack
+ ok = receive
+ #'basic.ack'{} -> ok;
+ #'basic.nack'{} -> fail
+ after 30000 ->
+ exit(confirm_timeout)
+ end,
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
+ ok.
+
leadership_takeover(Config) ->
%% Kill nodes in succession forcing the takeover of leadership, and all messages that
%% are in the queue.