diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-02-21 02:43:09 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-02-21 02:43:09 +0300 |
| commit | 02f84e6a15ed544b777f87768466099721274923 (patch) | |
| tree | 5f1ee9b2de715e03f98a76e5ac72616a849b7183 /src | |
| parent | 9e4095fd906da893ca08b02836ce0716bbfac39f (diff) | |
| parent | d3eb661efd89e87bc7859512bcdbe2f95fc5667d (diff) | |
| download | rabbitmq-server-git-02f84e6a15ed544b777f87768466099721274923.tar.gz | |
Merge pull request #1883 from rabbitmq/unavailable-qq-publish-fix
Fix channel liveness issue when publishing to an unavailable quorum queue
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo_client.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 12 |
2 files changed, 52 insertions, 12 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 04918c3eb9..a1bf82367b 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -46,6 +46,7 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -define(SOFT_LIMIT, 256). +-define(TIMER_TIME, 10000). -type seq() :: non_neg_integer(). -type action() :: {send_credit_reply, Available :: non_neg_integer()} | @@ -73,15 +74,17 @@ {maybe(term()), rabbit_fifo:command()}}, consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() => #consumer{}}, - block_handler = fun() -> ok end :: fun(() -> ok), + block_handler = fun() -> ok end :: fun(() -> term()), unblock_handler = fun() -> ok end :: fun(() -> ok), + timer_state :: term(), timeout :: non_neg_integer() }). -opaque state() :: #state{}. -export_type([ - state/0 + state/0, + actions/0 ]). @@ -140,9 +143,9 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow, % by default there is no correlation id Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg), case send_command(Node, Correlation, Cmd, low, State1) of - {slow, _} = Ret when not Slow -> + {slow, State} when not Slow -> BlockFun(), - Ret; + {slow, set_timer(State)}; Any -> Any end. @@ -478,9 +481,14 @@ handle_ra_event(From, {applied, Seqs}, case maps:size(State1#state.pending) < SftLmt of true when State1#state.slow == true -> % we have exited soft limit state - % send any unsent commands - State2 = State1#state{slow = false, - unsent_commands = #{}}, + % send any unsent commands and cancel the time as + % TODO: really the timer should only be cancelled when the channel + % exits flow state (which depends on the state of all queues the + % channel is interacting with) + % but the fact the queue has just applied suggests + % it's ok to cancel here anyway + State2 = cancel_timer(State1#state{slow = false, + unsent_commands = #{}}), % build up a list of commands to issue Commands = maps:fold( fun (Cid, {Settled, Returns, Discards}, Acc) -> @@ -523,6 +531,15 @@ handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> State1 = State0#state{leader = Leader}, State = resend(Seq, State1), {internal, [], [], State}; +handle_ra_event(_, timeout, #state{servers = Servers} = State0) -> + case find_leader(Servers) of + undefined -> + %% still no leader, set the timer again + {internal, [], [], set_timer(State0)}; + Leader -> + State = resend_all_pending(State0#state{leader = Leader}), + {internal, [], [], State} + end; handle_ra_event(_Leader, {machine, eol}, _State0) -> eol. @@ -729,3 +746,24 @@ add_command(Cid, return, MsgIds, Acc) -> [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]; add_command(Cid, discard, MsgIds, Acc) -> [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]. + +set_timer(#state{servers = [Server | _]} = State) -> + Ref = erlang:send_after(?TIMER_TIME, self(), + {ra_event, Server, timeout}), + State#state{timer_state = Ref}. + +cancel_timer(#state{timer_state = undefined} = State) -> + State; +cancel_timer(#state{timer_state = Ref} = State) -> + erlang:cancel_timer(Ref, [{async, true}, {info, false}]), + State#state{timer_state = undefined}. + +find_leader([]) -> + undefined; +find_leader([Server | Servers]) -> + case ra:members(Server, 500) of + {ok, _, Leader} -> Leader; + _ -> + find_leader(Servers) + end. + diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 0ef6d88086..9e73981541 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -83,13 +83,15 @@ init_state({Name, _}, QName = #resource{}) -> Servers0 = [{Name, N} || N <- Nodes], Servers = [Leader | lists:delete(Leader, Servers0)], rabbit_fifo_client:init(QName, Servers, SoftLimit, - fun() -> credit_flow:block(Name), ok end, + fun() -> credit_flow:block(Name) end, fun() -> credit_flow:unblock(Name), ok end). --spec handle_event({'ra_event', amqqueue:ra_server_id(), any()}, rabbit_fifo_client:state()) -> - {internal, Correlators :: [term()], rabbit_fifo_client:actions(), rabbit_fifo_client:state()} | - {rabbit_fifo:client_msg(), rabbit_fifo_client:state()} | eol. - +-spec handle_event({'ra_event', amqqueue:ra_server_id(), any()}, + rabbit_fifo_client:state()) -> + {internal, Correlators :: [term()], rabbit_fifo_client:actions(), + rabbit_fifo_client:state()} | + {rabbit_fifo:client_msg(), rabbit_fifo_client:state()} | + eol. handle_event({ra_event, From, Evt}, QState) -> rabbit_fifo_client:handle_ra_event(From, Evt, QState). |
