diff options
| -rw-r--r-- | src/rabbit_channel.erl | 35 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_vhost_process.erl | 2 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 68 |
5 files changed, 106 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index bc273bf100..56b43c1415 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -670,11 +670,12 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, #'basic.credit_drained'{consumer_tag = CTag, credit_drained = Credit}) end, Actions), - noreply_coalesce(confirm(MsgSeqNos, From, State)); + noreply_coalesce(confirm(MsgSeqNos, Name, State)); eol -> State1 = handle_consuming_queue_down_or_eol(From, State0), State2 = handle_delivering_queue_down(From, State1), - {MXs, UC1} = dtree:take(From, State2#ch.unconfirmed), + %% TODO: this should use dtree:take/3 + {MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed), State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}), case maps:find(From, QNames) of {ok, QName} -> erase_queue_stats(QName); @@ -1968,7 +1969,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ Qs = rabbit_amqqueue:lookup(DelQNames), {DeliveredQPids, DeliveredQQPids, QueueStates} = rabbit_amqqueue:deliver(Qs, Delivery, QueueStates0), - AllDeliveredQPids = DeliveredQPids ++ DeliveredQQPids, + AllDeliveredQRefs = DeliveredQPids ++ [N || {N, _} <- DeliveredQQPids], %% The maybe_monitor_all/2 monitors all queues to which we %% delivered. But we want to monitor even queues we didn't deliver %% to, since we need their 'DOWN' messages to clean @@ -1991,40 +1992,40 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ queue_monitors = QMons1}, %% NB: the order here is important since basic.returns must be %% sent before confirms. - State2 = process_routing_mandatory(Mandatory, AllDeliveredQPids, MsgSeqNo, + State2 = process_routing_mandatory(Mandatory, AllDeliveredQRefs , MsgSeqNo, Message, State1), - State3 = process_routing_confirm( Confirm, AllDeliveredQPids, MsgSeqNo, - XName, State2), + State3 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo, + XName, State2), case rabbit_event:stats_level(State3, #ch.stats_timer) of fine -> ?INCR_STATS(exchange_stats, XName, 1, publish), [?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) || - QPid <- AllDeliveredQPids, + QPid <- AllDeliveredQRefs, {ok, QName} <- [maps:find(QPid, QNames1)]]; _ -> ok end, State3#ch{queue_states = QueueStates}. -process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) -> +process_routing_mandatory(false, _, _, _, State) -> State; -process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) -> +process_routing_mandatory(true, [], _, Msg, State) -> ok = basic_return(Msg, State, no_route), State; -process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) -> - State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg, +process_routing_mandatory(true, QRefs, MsgSeqNo, Msg, State) -> + State#ch{mandatory = dtree:insert(MsgSeqNo, QRefs, Msg, State#ch.mandatory)}. -process_routing_confirm(false, _, _MsgSeqNo, _XName, State) -> +process_routing_confirm(false, _, _, _, State) -> State; -process_routing_confirm(true, [], MsgSeqNo, XName, State) -> +process_routing_confirm(true, [], MsgSeqNo, XName, State) -> record_confirms([{MsgSeqNo, XName}], State); -process_routing_confirm(true, QPids, MsgSeqNo, XName, State) -> - State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, +process_routing_confirm(true, QRefs, MsgSeqNo, XName, State) -> + State#ch{unconfirmed = dtree:insert(MsgSeqNo, QRefs, XName, State#ch.unconfirmed)}. -confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), +confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take(MsgSeqNos, QRef, UC), %% NB: don't call noreply/1 since we don't want to send confirms. record_confirms(MXs, State#ch{unconfirmed = UC1}). diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index c12a6ec464..ca9003dd8b 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -106,7 +106,7 @@ -type applied_mfa() :: {module(), atom(), list()}. % represents a partially applied module call --define(SHADOW_COPY_INTERVAL, 4096). +-define(SHADOW_COPY_INTERVAL, 4096 * 4). -define(USE_AVG_HALF_LIFE, 10000.0). -record(consumer, diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index c063ef9a17..8742d74b3b 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -432,10 +432,20 @@ update_machine_state(Node, Conf) -> {rabbit_fifo:client_msg(), state()} | eol. handle_ra_event(From, {applied, Seqs}, #state{soft_limit = SftLmt, + leader = CurLeader, + last_applied = _Last, unblock_handler = UnblockFun} = State0) -> {Corrs, Actions, State1} = lists:foldl(fun seq_applied/2, {[], [], State0#state{leader = From}}, Seqs), + case From of + CurLeader -> ok; + _ -> + ?INFO("rabbit_fifo_client: leader change from ~w to ~w~n" + "applying ~w last ~w~n" + "STate1 ~p~n", + [CurLeader, From, Seqs, _Last, State1]) + end, case maps:size(State1#state.pending) < SftLmt of true when State1#state.slow == true -> % we have exited soft limit state @@ -447,7 +457,8 @@ handle_ra_event(From, {applied, Seqs}, fun (Cid, {Settled, Returns, Discards}, Acc) -> add_command(Cid, settle, Settled, add_command(Cid, return, Returns, - add_command(Cid, discard, Discards, Acc))) + add_command(Cid, discard, + Discards, Acc))) end, [], State1#state.unsent_commands), Node = pick_node(State2), %% send all the settlements and returns @@ -468,7 +479,9 @@ handle_ra_event(Leader, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) -> % TODO: how should these be handled? re-sent on timer or try random {internal, [], [], State0}; -handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> +handle_ra_event(From, {rejected, {not_leader, Leader, Seq}}, State0) -> + ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n", + [Seq, From, Leader]), State1 = State0#state{leader = Leader}, State = resend(Seq, State1), {internal, [], [], State}; @@ -511,6 +524,7 @@ try_process_command([Server | Rem], Cmd, State) -> seq_applied({Seq, MaybeAction}, {Corrs, Actions0, #state{last_applied = Last} = State0}) when Seq > Last orelse Last =:= undefined -> + % ?INFO("rabbit_fifo_client: applying seq ~b last ~w", [Seq, Last]), State1 = case Last of undefined -> State0; _ -> @@ -525,10 +539,12 @@ seq_applied({Seq, MaybeAction}, {[Corr | Corrs], Actions, State#state{pending = Pending, last_applied = Seq}}; error -> + ?INFO("rabbit_fifo_client: pending not found ~w", [Seq]), % must have already been resent or removed for some other reason {Corrs, Actions, State} end; seq_applied(_Seq, Acc) -> + ?INFO("rabbit_fifo_client: dropping seq ~b", [_Seq]), Acc. maybe_add_action(ok, Acc, State) -> @@ -619,6 +635,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) -> Missing. pick_node(#state{leader = undefined, servers = [N | _]}) -> + %% TODO: pick random rather that first? N; pick_node(#state{leader = Leader}) -> Leader. diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl index 487308c25d..d5c225a9da 100644 --- a/src/rabbit_vhost_process.erl +++ b/src/rabbit_vhost_process.erl @@ -57,7 +57,7 @@ init([VHost]) -> rabbit_vhost_sup_sup:save_vhost_process(VHost, self()), Interval = interval(), timer:send_interval(Interval, check_vhost), - true = erlang:garbage_collect(), + % true = erlang:garbage_collect(), {ok, VHost} catch _:Reason -> rabbit_amqqueue:mark_local_durable_queues_stopped(VHost), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 88c79acf79..380b8ab59e 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -48,6 +48,7 @@ groups() -> ++ all_tests()}, {cluster_size_3, [], [ declare_during_node_down, + confirm_availability_on_leader_change, recover_from_single_failure, recover_from_multiple_failures, leadership_takeover, @@ -575,6 +576,19 @@ publish(Config) -> wait_for_messages_ready(Servers, Name, 1), wait_for_messages_pending_ack(Servers, Name, 0). +publish_confirm(Ch, QName) -> + publish(Ch, QName), + amqp_channel:register_confirm_handler(Ch, self()), + ct:pal("waiting for confirms from ~s", [QName]), + ok = receive + #'basic.ack'{} -> ok; + #'basic.nack'{} -> fail + after 2500 -> + exit(confirm_timeout) + end, + ct:pal("CONFIRMED! ~s", [QName]), + ok. + ra_name(Q) -> binary_to_atom(<<"%2F_", Q/binary>>, utf8). @@ -1549,6 +1563,58 @@ declare_during_node_down(Config) -> wait_for_messages_ready(Servers, RaName, 1), ok. +confirm_availability_on_leader_change(Config) -> + [Node1, Node2, _Node3] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% declare a queue on node2 - this _should_ host the leader on node 2 + DCh = rabbit_ct_client_helpers:open_channel(Config, Node2), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + erlang:process_flag(trap_exit, true), + Pid = spawn_link(fun () -> + %% open a channel to another node + Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + ConfirmLoop = fun Loop() -> + publish_confirm(Ch, QQ), + receive {done, P} -> + P ! done, + ok + after 0 -> Loop() end + end, + ConfirmLoop() + end), + + timer:sleep(500), + %% stop the node hosting the leader + stop_node(Config, Node2), + %% this should not fail as the channel should detect the new leader and + %% resend to that + timer:sleep(500), + Pid ! {done, self()}, + receive + done -> ok; + {'EXIT', Pid, Err} -> + exit(Err) + after 5500 -> + flush(100), + exit(bah) + end, + ok = rabbit_ct_broker_helpers:start_node(Config, Node2), + ok. + +flush(T) -> + receive X -> + ct:pal("flushed ~w", [X]), + flush(T) + after T -> + ok + end. + + add_member_not_running(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1966,7 +2032,7 @@ filter_queues(Expected, Got) -> end, Got). publish(Ch, Queue) -> - ok = amqp_channel:call(Ch, + ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, #amqp_msg{props = #'P_basic'{delivery_mode = 2}, payload = <<"msg">>}). |
