summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl35
-rw-r--r--src/rabbit_fifo.erl2
-rw-r--r--src/rabbit_fifo_client.erl21
-rw-r--r--src/rabbit_vhost_process.erl2
-rw-r--r--test/quorum_queue_SUITE.erl68
5 files changed, 106 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index bc273bf100..56b43c1415 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -670,11 +670,12 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
#'basic.credit_drained'{consumer_tag = CTag,
credit_drained = Credit})
end, Actions),
- noreply_coalesce(confirm(MsgSeqNos, From, State));
+ noreply_coalesce(confirm(MsgSeqNos, Name, State));
eol ->
State1 = handle_consuming_queue_down_or_eol(From, State0),
State2 = handle_delivering_queue_down(From, State1),
- {MXs, UC1} = dtree:take(From, State2#ch.unconfirmed),
+ %% TODO: this should use dtree:take/3
+ {MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed),
State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}),
case maps:find(From, QNames) of
{ok, QName} -> erase_queue_stats(QName);
@@ -1968,7 +1969,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
Qs = rabbit_amqqueue:lookup(DelQNames),
{DeliveredQPids, DeliveredQQPids, QueueStates} =
rabbit_amqqueue:deliver(Qs, Delivery, QueueStates0),
- AllDeliveredQPids = DeliveredQPids ++ DeliveredQQPids,
+ AllDeliveredQRefs = DeliveredQPids ++ [N || {N, _} <- DeliveredQQPids],
%% The maybe_monitor_all/2 monitors all queues to which we
%% delivered. But we want to monitor even queues we didn't deliver
%% to, since we need their 'DOWN' messages to clean
@@ -1991,40 +1992,40 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
queue_monitors = QMons1},
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
- State2 = process_routing_mandatory(Mandatory, AllDeliveredQPids, MsgSeqNo,
+ State2 = process_routing_mandatory(Mandatory, AllDeliveredQRefs , MsgSeqNo,
Message, State1),
- State3 = process_routing_confirm( Confirm, AllDeliveredQPids, MsgSeqNo,
- XName, State2),
+ State3 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo,
+ XName, State2),
case rabbit_event:stats_level(State3, #ch.stats_timer) of
fine ->
?INCR_STATS(exchange_stats, XName, 1, publish),
[?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) ||
- QPid <- AllDeliveredQPids,
+ QPid <- AllDeliveredQRefs,
{ok, QName} <- [maps:find(QPid, QNames1)]];
_ ->
ok
end,
State3#ch{queue_states = QueueStates}.
-process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) ->
+process_routing_mandatory(false, _, _, _, State) ->
State;
-process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) ->
+process_routing_mandatory(true, [], _, Msg, State) ->
ok = basic_return(Msg, State, no_route),
State;
-process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) ->
- State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg,
+process_routing_mandatory(true, QRefs, MsgSeqNo, Msg, State) ->
+ State#ch{mandatory = dtree:insert(MsgSeqNo, QRefs, Msg,
State#ch.mandatory)}.
-process_routing_confirm(false, _, _MsgSeqNo, _XName, State) ->
+process_routing_confirm(false, _, _, _, State) ->
State;
-process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
+process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
record_confirms([{MsgSeqNo, XName}], State);
-process_routing_confirm(true, QPids, MsgSeqNo, XName, State) ->
- State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
+process_routing_confirm(true, QRefs, MsgSeqNo, XName, State) ->
+ State#ch{unconfirmed = dtree:insert(MsgSeqNo, QRefs, XName,
State#ch.unconfirmed)}.
-confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take(MsgSeqNos, QRef, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
record_confirms(MXs, State#ch{unconfirmed = UC1}).
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index c12a6ec464..ca9003dd8b 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -106,7 +106,7 @@
-type applied_mfa() :: {module(), atom(), list()}.
% represents a partially applied module call
--define(SHADOW_COPY_INTERVAL, 4096).
+-define(SHADOW_COPY_INTERVAL, 4096 * 4).
-define(USE_AVG_HALF_LIFE, 10000.0).
-record(consumer,
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index c063ef9a17..8742d74b3b 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -432,10 +432,20 @@ update_machine_state(Node, Conf) ->
{rabbit_fifo:client_msg(), state()} | eol.
handle_ra_event(From, {applied, Seqs},
#state{soft_limit = SftLmt,
+ leader = CurLeader,
+ last_applied = _Last,
unblock_handler = UnblockFun} = State0) ->
{Corrs, Actions, State1} = lists:foldl(fun seq_applied/2,
{[], [], State0#state{leader = From}},
Seqs),
+ case From of
+ CurLeader -> ok;
+ _ ->
+ ?INFO("rabbit_fifo_client: leader change from ~w to ~w~n"
+ "applying ~w last ~w~n"
+ "STate1 ~p~n",
+ [CurLeader, From, Seqs, _Last, State1])
+ end,
case maps:size(State1#state.pending) < SftLmt of
true when State1#state.slow == true ->
% we have exited soft limit state
@@ -447,7 +457,8 @@ handle_ra_event(From, {applied, Seqs},
fun (Cid, {Settled, Returns, Discards}, Acc) ->
add_command(Cid, settle, Settled,
add_command(Cid, return, Returns,
- add_command(Cid, discard, Discards, Acc)))
+ add_command(Cid, discard,
+ Discards, Acc)))
end, [], State1#state.unsent_commands),
Node = pick_node(State2),
%% send all the settlements and returns
@@ -468,7 +479,9 @@ handle_ra_event(Leader, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) ->
% TODO: how should these be handled? re-sent on timer or try random
{internal, [], [], State0};
-handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
+handle_ra_event(From, {rejected, {not_leader, Leader, Seq}}, State0) ->
+ ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n",
+ [Seq, From, Leader]),
State1 = State0#state{leader = Leader},
State = resend(Seq, State1),
{internal, [], [], State};
@@ -511,6 +524,7 @@ try_process_command([Server | Rem], Cmd, State) ->
seq_applied({Seq, MaybeAction},
{Corrs, Actions0, #state{last_applied = Last} = State0})
when Seq > Last orelse Last =:= undefined ->
+ % ?INFO("rabbit_fifo_client: applying seq ~b last ~w", [Seq, Last]),
State1 = case Last of
undefined -> State0;
_ ->
@@ -525,10 +539,12 @@ seq_applied({Seq, MaybeAction},
{[Corr | Corrs], Actions, State#state{pending = Pending,
last_applied = Seq}};
error ->
+ ?INFO("rabbit_fifo_client: pending not found ~w", [Seq]),
% must have already been resent or removed for some other reason
{Corrs, Actions, State}
end;
seq_applied(_Seq, Acc) ->
+ ?INFO("rabbit_fifo_client: dropping seq ~b", [_Seq]),
Acc.
maybe_add_action(ok, Acc, State) ->
@@ -619,6 +635,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
Missing.
pick_node(#state{leader = undefined, servers = [N | _]}) ->
+ %% TODO: pick random rather than first?
N;
pick_node(#state{leader = Leader}) ->
Leader.
diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl
index 487308c25d..d5c225a9da 100644
--- a/src/rabbit_vhost_process.erl
+++ b/src/rabbit_vhost_process.erl
@@ -57,7 +57,7 @@ init([VHost]) ->
rabbit_vhost_sup_sup:save_vhost_process(VHost, self()),
Interval = interval(),
timer:send_interval(Interval, check_vhost),
- true = erlang:garbage_collect(),
+ % true = erlang:garbage_collect(),
{ok, VHost}
catch _:Reason ->
rabbit_amqqueue:mark_local_durable_queues_stopped(VHost),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 88c79acf79..380b8ab59e 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -48,6 +48,7 @@ groups() ->
++ all_tests()},
{cluster_size_3, [], [
declare_during_node_down,
+ confirm_availability_on_leader_change,
recover_from_single_failure,
recover_from_multiple_failures,
leadership_takeover,
@@ -575,6 +576,19 @@ publish(Config) ->
wait_for_messages_ready(Servers, Name, 1),
wait_for_messages_pending_ack(Servers, Name, 0).
+publish_confirm(Ch, QName) ->
+ publish(Ch, QName),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ ct:pal("waiting for confirms from ~s", [QName]),
+ ok = receive
+ #'basic.ack'{} -> ok;
+ #'basic.nack'{} -> fail
+ after 2500 ->
+ exit(confirm_timeout)
+ end,
+ ct:pal("CONFIRMED! ~s", [QName]),
+ ok.
+
ra_name(Q) ->
binary_to_atom(<<"%2F_", Q/binary>>, utf8).
@@ -1549,6 +1563,58 @@ declare_during_node_down(Config) ->
wait_for_messages_ready(Servers, RaName, 1),
ok.
+confirm_availability_on_leader_change(Config) ->
+ [Node1, Node2, _Node3] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ %% declare a queue on node2 - this _should_ host the leader on node 2
+ DCh = rabbit_ct_client_helpers:open_channel(Config, Node2),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ erlang:process_flag(trap_exit, true),
+ Pid = spawn_link(fun () ->
+ %% open a channel to another node
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ ConfirmLoop = fun Loop() ->
+ publish_confirm(Ch, QQ),
+ receive {done, P} ->
+ P ! done,
+ ok
+ after 0 -> Loop() end
+ end,
+ ConfirmLoop()
+ end),
+
+ timer:sleep(500),
+ %% stop the node hosting the leader
+ stop_node(Config, Node2),
+ %% this should not fail as the channel should detect the new leader and
+ %% resend to that
+ timer:sleep(500),
+ Pid ! {done, self()},
+ receive
+ done -> ok;
+ {'EXIT', Pid, Err} ->
+ exit(Err)
+ after 5500 ->
+ flush(100),
+ exit(bah)
+ end,
+ ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
+ ok.
+
+flush(T) ->
+ receive X ->
+ ct:pal("flushed ~w", [X]),
+ flush(T)
+ after T ->
+ ok
+ end.
+
+
add_member_not_running(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1966,7 +2032,7 @@ filter_queues(Expected, Got) ->
end, Got).
publish(Ch, Queue) ->
- ok = amqp_channel:call(Ch,
+ ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = Queue},
#amqp_msg{props = #'P_basic'{delivery_mode = 2},
payload = <<"msg">>}).