summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-02-14 10:49:25 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-14 11:40:05 +0000
commit017545d13ace947c02fb64c239ab28bb3f10b8d2 (patch)
treeea50a4dc584a3039c0c0f6f609626637affe66c0
parent375f743238ba75376f038544cb6199bbd637bf5b (diff)
downloadrabbitmq-server-git-017545d13ace947c02fb64c239ab28bb3f10b8d2.tar.gz
Fix channel liveness issue
when publishing to an unavailable quorum queue If a channel publishes to a quorum queue at a time when the queue is unavailable and no commands make it to the queue, the channel will eventually go into flow and never exit flow as reads from the socket will never take place and the queue will never communicate with the channel. To avoid this deadlock the channel sets a longish timer when a quorum queue reaches its internal command buffer limit and cancels it when falling below again. When the timer triggers the quorum queue client resends all its pending commands to ensure liveness. [#163975460]
-rw-r--r--src/rabbit_fifo_client.erl52
-rw-r--r--src/rabbit_quorum_queue.erl12
-rw-r--r--test/quorum_queue_SUITE.erl28
3 files changed, 80 insertions, 12 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 04918c3eb9..a1bf82367b 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -46,6 +46,7 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-define(SOFT_LIMIT, 256).
+-define(TIMER_TIME, 10000).
-type seq() :: non_neg_integer().
-type action() :: {send_credit_reply, Available :: non_neg_integer()} |
@@ -73,15 +74,17 @@
{maybe(term()), rabbit_fifo:command()}},
consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
#consumer{}},
- block_handler = fun() -> ok end :: fun(() -> ok),
+ block_handler = fun() -> ok end :: fun(() -> term()),
unblock_handler = fun() -> ok end :: fun(() -> ok),
+ timer_state :: term(),
timeout :: non_neg_integer()
}).
-opaque state() :: #state{}.
-export_type([
- state/0
+ state/0,
+ actions/0
]).
@@ -140,9 +143,9 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow,
% by default there is no correlation id
Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg),
case send_command(Node, Correlation, Cmd, low, State1) of
- {slow, _} = Ret when not Slow ->
+ {slow, State} when not Slow ->
BlockFun(),
- Ret;
+ {slow, set_timer(State)};
Any ->
Any
end.
@@ -478,9 +481,14 @@ handle_ra_event(From, {applied, Seqs},
case maps:size(State1#state.pending) < SftLmt of
true when State1#state.slow == true ->
% we have exited soft limit state
- % send any unsent commands
- State2 = State1#state{slow = false,
- unsent_commands = #{}},
+ % send any unsent commands and cancel the timer
+ % TODO: really the timer should only be cancelled when the channel
+ % exits flow state (which depends on the state of all queues the
+ % channel is interacting with)
+ % but the fact the queue has just applied suggests
+ % it's ok to cancel here anyway
+ State2 = cancel_timer(State1#state{slow = false,
+ unsent_commands = #{}}),
% build up a list of commands to issue
Commands = maps:fold(
fun (Cid, {Settled, Returns, Discards}, Acc) ->
@@ -523,6 +531,15 @@ handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
State1 = State0#state{leader = Leader},
State = resend(Seq, State1),
{internal, [], [], State};
+handle_ra_event(_, timeout, #state{servers = Servers} = State0) ->
+ case find_leader(Servers) of
+ undefined ->
+ %% still no leader, set the timer again
+ {internal, [], [], set_timer(State0)};
+ Leader ->
+ State = resend_all_pending(State0#state{leader = Leader}),
+ {internal, [], [], State}
+ end;
handle_ra_event(_Leader, {machine, eol}, _State0) ->
eol.
@@ -729,3 +746,24 @@ add_command(Cid, return, MsgIds, Acc) ->
[rabbit_fifo:make_settle(Cid, MsgIds) | Acc];
add_command(Cid, discard, MsgIds, Acc) ->
[rabbit_fifo:make_settle(Cid, MsgIds) | Acc].
+
+set_timer(#state{servers = [Server | _]} = State) ->
+ Ref = erlang:send_after(?TIMER_TIME, self(),
+ {ra_event, Server, timeout}),
+ State#state{timer_state = Ref}.
+
+cancel_timer(#state{timer_state = undefined} = State) ->
+ State;
+cancel_timer(#state{timer_state = Ref} = State) ->
+ erlang:cancel_timer(Ref, [{async, true}, {info, false}]),
+ State#state{timer_state = undefined}.
+
+find_leader([]) ->
+ undefined;
+find_leader([Server | Servers]) ->
+ case ra:members(Server, 500) of
+ {ok, _, Leader} -> Leader;
+ _ ->
+ find_leader(Servers)
+ end.
+
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 79c07535a4..e1c1c320f4 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -81,13 +81,15 @@ init_state({Name, _}, QName = #resource{}) ->
Servers0 = [{Name, N} || N <- Nodes],
Servers = [Leader | lists:delete(Leader, Servers0)],
rabbit_fifo_client:init(QName, Servers, SoftLimit,
- fun() -> credit_flow:block(Name), ok end,
+ fun() -> credit_flow:block(Name) end,
fun() -> credit_flow:unblock(Name), ok end).
--spec handle_event({'ra_event', amqqueue:ra_server_id(), any()}, rabbit_fifo_client:state()) ->
- {internal, Correlators :: [term()], rabbit_fifo_client:actions(), rabbit_fifo_client:state()} |
- {rabbit_fifo:client_msg(), rabbit_fifo_client:state()} | eol.
-
+-spec handle_event({'ra_event', amqqueue:ra_server_id(), any()},
+ rabbit_fifo_client:state()) ->
+ {internal, Correlators :: [term()], rabbit_fifo_client:actions(),
+ rabbit_fifo_client:state()} |
+ {rabbit_fifo:client_msg(), rabbit_fifo_client:state()} |
+ eol.
handle_event({ra_event, From, Evt}, QState) ->
rabbit_fifo_client:handle_ra_event(From, Evt, QState).
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 43a353d0ea..164d37de4b 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -61,6 +61,7 @@ groups() ->
{cluster_size_3, [], [
declare_during_node_down,
simple_confirm_availability_on_leader_change,
+ publishing_to_unavailable_queue,
confirm_availability_on_leader_change,
recover_from_single_failure,
recover_from_multiple_failures,
@@ -884,6 +885,33 @@ recover_from_multiple_failures(Config) ->
wait_for_messages_ready(Servers, RaName, 6),
wait_for_messages_pending_ack(Servers, RaName, 0).
+publishing_to_unavailable_queue(Config) ->
+ [Server, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ TCh = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(TCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish_many(Ch, QQ, 300),
+ timer:sleep(1000),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+ %% check we get at least one ack
+ ok = receive
+ #'basic.ack'{} -> ok;
+ #'basic.nack'{} -> fail
+ after 30000 ->
+ exit(confirm_timeout)
+ end,
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
+ ok.
+
leadership_takeover(Config) ->
%% Kill nodes in succession forcing the takeover of leadership, and all messages that
%% are in the queue.