diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-02-14 10:49:25 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-14 11:40:05 +0000 |
| commit | 017545d13ace947c02fb64c239ab28bb3f10b8d2 (patch) | |
| tree | ea50a4dc584a3039c0c0f6f609626637affe66c0 /src | |
| parent | 375f743238ba75376f038544cb6199bbd637bf5b (diff) | |
| download | rabbitmq-server-git-017545d13ace947c02fb64c239ab28bb3f10b8d2.tar.gz | |
Fix channel liveness issue
when publishing to an unavailable quorum queue
If a channel publishes to a quorum queue at a time when the queue is
unavailable and no commands make it to the queue, the channel will
eventually go into flow and never exit flow as reads from socket will
never take place and the queue will never communicate with the channel.
To avoid this dead-lock the channel sets a longish timer when a qourum queue
reaches it's internal command buffer limit and cancels it when falling
below again. When the timer triggers the quorum queue client resends all it's
pending commands to ensure liveness.
[#163975460]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo_client.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 12 |
2 files changed, 52 insertions, 12 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 04918c3eb9..a1bf82367b 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -46,6 +46,7 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -define(SOFT_LIMIT, 256). +-define(TIMER_TIME, 10000). -type seq() :: non_neg_integer(). -type action() :: {send_credit_reply, Available :: non_neg_integer()} | @@ -73,15 +74,17 @@ {maybe(term()), rabbit_fifo:command()}}, consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() => #consumer{}}, - block_handler = fun() -> ok end :: fun(() -> ok), + block_handler = fun() -> ok end :: fun(() -> term()), unblock_handler = fun() -> ok end :: fun(() -> ok), + timer_state :: term(), timeout :: non_neg_integer() }). -opaque state() :: #state{}. -export_type([ - state/0 + state/0, + actions/0 ]). @@ -140,9 +143,9 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow, % by default there is no correlation id Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg), case send_command(Node, Correlation, Cmd, low, State1) of - {slow, _} = Ret when not Slow -> + {slow, State} when not Slow -> BlockFun(), - Ret; + {slow, set_timer(State)}; Any -> Any end. @@ -478,9 +481,14 @@ handle_ra_event(From, {applied, Seqs}, case maps:size(State1#state.pending) < SftLmt of true when State1#state.slow == true -> % we have exited soft limit state - % send any unsent commands - State2 = State1#state{slow = false, - unsent_commands = #{}}, + % send any unsent commands and cancel the time as + % TODO: really the timer should only be cancelled when the channel + % exits flow state (which depends on the state of all queues the + % channel is interacting with) + % but the fact the queue has just applied suggests + % it's ok to cancel here anyway + State2 = cancel_timer(State1#state{slow = false, + unsent_commands = #{}}), % build up a list of commands to issue Commands = maps:fold( fun (Cid, {Settled, Returns, Discards}, Acc) -> @@ -523,6 +531,15 @@ handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> State1 = State0#state{leader = Leader}, State = resend(Seq, State1), {internal, [], [], State}; +handle_ra_event(_, timeout, #state{servers = Servers} = State0) -> + case find_leader(Servers) of + undefined -> + %% still no leader, set the timer again + {internal, [], [], set_timer(State0)}; + Leader -> + State = resend_all_pending(State0#state{leader = Leader}), + {internal, [], [], State} + end; handle_ra_event(_Leader, {machine, eol}, _State0) -> eol. @@ -729,3 +746,24 @@ add_command(Cid, return, MsgIds, Acc) -> [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]; add_command(Cid, discard, MsgIds, Acc) -> [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]. + +set_timer(#state{servers = [Server | _]} = State) -> + Ref = erlang:send_after(?TIMER_TIME, self(), + {ra_event, Server, timeout}), + State#state{timer_state = Ref}. + +cancel_timer(#state{timer_state = undefined} = State) -> + State; +cancel_timer(#state{timer_state = Ref} = State) -> + erlang:cancel_timer(Ref, [{async, true}, {info, false}]), + State#state{timer_state = undefined}. + +find_leader([]) -> + undefined; +find_leader([Server | Servers]) -> + case ra:members(Server, 500) of + {ok, _, Leader} -> Leader; + _ -> + find_leader(Servers) + end. + diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 79c07535a4..e1c1c320f4 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -81,13 +81,15 @@ init_state({Name, _}, QName = #resource{}) -> Servers0 = [{Name, N} || N <- Nodes], Servers = [Leader | lists:delete(Leader, Servers0)], rabbit_fifo_client:init(QName, Servers, SoftLimit, - fun() -> credit_flow:block(Name), ok end, + fun() -> credit_flow:block(Name) end, fun() -> credit_flow:unblock(Name), ok end). --spec handle_event({'ra_event', amqqueue:ra_server_id(), any()}, rabbit_fifo_client:state()) -> - {internal, Correlators :: [term()], rabbit_fifo_client:actions(), rabbit_fifo_client:state()} | - {rabbit_fifo:client_msg(), rabbit_fifo_client:state()} | eol. - +-spec handle_event({'ra_event', amqqueue:ra_server_id(), any()}, + rabbit_fifo_client:state()) -> + {internal, Correlators :: [term()], rabbit_fifo_client:actions(), + rabbit_fifo_client:state()} | + {rabbit_fifo:client_msg(), rabbit_fifo_client:state()} | + eol. handle_event({ra_event, From, Evt}, QState) -> rabbit_fifo_client:handle_ra_event(From, Evt, QState). |
