diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-02-21 02:43:09 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-02-21 02:43:09 +0300 |
| commit | 02f84e6a15ed544b777f87768466099721274923 (patch) | |
| tree | 5f1ee9b2de715e03f98a76e5ac72616a849b7183 | |
| parent | 9e4095fd906da893ca08b02836ce0716bbfac39f (diff) | |
| parent | d3eb661efd89e87bc7859512bcdbe2f95fc5667d (diff) | |
| download | rabbitmq-server-git-02f84e6a15ed544b777f87768466099721274923.tar.gz | |
Merge pull request #1883 from rabbitmq/unavailable-qq-publish-fix
Fix channel liveness issue when publishing to an unavailable quorum queue
| -rw-r--r-- | src/rabbit_fifo_client.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 12 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 28 |
3 files changed, 80 insertions, 12 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 04918c3eb9..a1bf82367b 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -46,6 +46,7 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -define(SOFT_LIMIT, 256). +-define(TIMER_TIME, 10000). -type seq() :: non_neg_integer(). -type action() :: {send_credit_reply, Available :: non_neg_integer()} | @@ -73,15 +74,17 @@ {maybe(term()), rabbit_fifo:command()}}, consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() => #consumer{}}, - block_handler = fun() -> ok end :: fun(() -> ok), + block_handler = fun() -> ok end :: fun(() -> term()), unblock_handler = fun() -> ok end :: fun(() -> ok), + timer_state :: term(), timeout :: non_neg_integer() }). -opaque state() :: #state{}. -export_type([ - state/0 + state/0, + actions/0 ]). @@ -140,9 +143,9 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow, % by default there is no correlation id Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg), case send_command(Node, Correlation, Cmd, low, State1) of - {slow, _} = Ret when not Slow -> + {slow, State} when not Slow -> BlockFun(), - Ret; + {slow, set_timer(State)}; Any -> Any end. @@ -478,9 +481,14 @@ handle_ra_event(From, {applied, Seqs}, case maps:size(State1#state.pending) < SftLmt of true when State1#state.slow == true -> % we have exited soft limit state - % send any unsent commands - State2 = State1#state{slow = false, - unsent_commands = #{}}, + % send any unsent commands and cancel the time as + % TODO: really the timer should only be cancelled when the channel + % exits flow state (which depends on the state of all queues the + % channel is interacting with) + % but the fact the queue has just applied suggests + % it's ok to cancel here anyway + State2 = cancel_timer(State1#state{slow = false, + unsent_commands = #{}}), % build up a list of commands to issue Commands = maps:fold( fun (Cid, {Settled, Returns, Discards}, Acc) -> @@ -523,6 +531,15 @@ handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> State1 = State0#state{leader = Leader}, State = resend(Seq, State1), {internal, [], [], State}; +handle_ra_event(_, timeout, #state{servers = Servers} = State0) -> + case find_leader(Servers) of + undefined -> + %% still no leader, set the timer again + {internal, [], [], set_timer(State0)}; + Leader -> + State = resend_all_pending(State0#state{leader = Leader}), + {internal, [], [], State} + end; handle_ra_event(_Leader, {machine, eol}, _State0) -> eol. @@ -729,3 +746,24 @@ add_command(Cid, return, MsgIds, Acc) -> [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]; add_command(Cid, discard, MsgIds, Acc) -> [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]. + +set_timer(#state{servers = [Server | _]} = State) -> + Ref = erlang:send_after(?TIMER_TIME, self(), + {ra_event, Server, timeout}), + State#state{timer_state = Ref}. + +cancel_timer(#state{timer_state = undefined} = State) -> + State; +cancel_timer(#state{timer_state = Ref} = State) -> + erlang:cancel_timer(Ref, [{async, true}, {info, false}]), + State#state{timer_state = undefined}. + +find_leader([]) -> + undefined; +find_leader([Server | Servers]) -> + case ra:members(Server, 500) of + {ok, _, Leader} -> Leader; + _ -> + find_leader(Servers) + end. + diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 0ef6d88086..9e73981541 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -83,13 +83,15 @@ init_state({Name, _}, QName = #resource{}) -> Servers0 = [{Name, N} || N <- Nodes], Servers = [Leader | lists:delete(Leader, Servers0)], rabbit_fifo_client:init(QName, Servers, SoftLimit, - fun() -> credit_flow:block(Name), ok end, + fun() -> credit_flow:block(Name) end, fun() -> credit_flow:unblock(Name), ok end). --spec handle_event({'ra_event', amqqueue:ra_server_id(), any()}, rabbit_fifo_client:state()) -> - {internal, Correlators :: [term()], rabbit_fifo_client:actions(), rabbit_fifo_client:state()} | - {rabbit_fifo:client_msg(), rabbit_fifo_client:state()} | eol. - +-spec handle_event({'ra_event', amqqueue:ra_server_id(), any()}, + rabbit_fifo_client:state()) -> + {internal, Correlators :: [term()], rabbit_fifo_client:actions(), + rabbit_fifo_client:state()} | + {rabbit_fifo:client_msg(), rabbit_fifo_client:state()} | + eol. handle_event({ra_event, From, Evt}, QState) -> rabbit_fifo_client:handle_ra_event(From, Evt, QState). diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index e5d631dfcf..406c02de83 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -61,6 +61,7 @@ groups() -> {cluster_size_3, [], [ declare_during_node_down, simple_confirm_availability_on_leader_change, + publishing_to_unavailable_queue, confirm_availability_on_leader_change, recover_from_single_failure, recover_from_multiple_failures, @@ -913,6 +914,33 @@ recover_from_multiple_failures(Config) -> wait_for_messages_ready(Servers, RaName, 6), wait_for_messages_pending_ack(Servers, RaName, 0). +publishing_to_unavailable_queue(Config) -> + [Server, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + TCh = rabbit_ct_client_helpers:open_channel(Config, Server1), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(TCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish_many(Ch, QQ, 300), + timer:sleep(1000), + ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + %% check we get at least on ack + ok = receive + #'basic.ack'{} -> ok; + #'basic.nack'{} -> fail + after 30000 -> + exit(confirm_timeout) + end, + ok = rabbit_ct_broker_helpers:start_node(Config, Server2), + ok. + leadership_takeover(Config) -> %% Kill nodes in succession forcing the takeover of leadership, and all messages that %% are in the queue. |
