summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2018-11-30 14:07:22 +0000
committerkjnilsson <knilsson@pivotal.io>2018-12-04 17:16:08 +0000
commitea0e192dc193add8bf4c1250fbe1c9b15b999d29 (patch)
treef7fbe97129f96ab514a51db1d0abab94e716df04 /src
parenta4c00475eb6cf261ac7775244e518fc63ea8fa94 (diff)
downloadrabbitmq-server-git-ea0e192dc193add8bf4c1250fbe1c9b15b999d29.tar.gz
Better handle changing quorum queue leaders
All maps that track queues inside the channel should use the queue name atom rather than the server id as they key so that leader changes don't impact this tracking.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_channel.erl80
-rw-r--r--src/rabbit_fifo.erl53
-rw-r--r--src/rabbit_fifo_client.erl42
4 files changed, 114 insertions, 64 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index e19914d5ab..d5bfbbb5e3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -1094,7 +1094,8 @@ notify_down_all(QPids, ChPid, Timeout) ->
Error -> {error, Error}
end.
-activate_limit_all(QPids, ChPid) ->
+activate_limit_all(QRefs, ChPid) ->
+ QPids = [P || P <- QRefs, ?IS_CLASSIC(P)],
delegate:invoke_no_result(QPids, {gen_server2, cast,
[{activate_limit, ChPid}]}).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 56b43c1415..1fafd3a6f8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -110,7 +110,7 @@
%% when queue.bind's queue field is empty,
%% this name will be used instead
most_recently_declared_queue,
- %% a map of queue pid to queue name
+ %% a map of queue ref to queue name
queue_names,
%% queue processes are monitored to update
%% queue names
@@ -210,7 +210,7 @@
end).
-define(IS_CLASSIC(QPid), is_pid(QPid)).
--define(IS_QUORUM(QPid), is_tuple(QPid)).
+-define(IS_QUORUM(QPid), is_tuple(QPid) orelse is_atom(QPid)).
%%----------------------------------------------------------------------------
@@ -672,18 +672,18 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
end, Actions),
noreply_coalesce(confirm(MsgSeqNos, Name, State));
eol ->
- State1 = handle_consuming_queue_down_or_eol(From, State0),
- State2 = handle_delivering_queue_down(From, State1),
+ State1 = handle_consuming_queue_down_or_eol(Name, State0),
+ State2 = handle_delivering_queue_down(Name, State1),
%% TODO: this should use dtree:take/3
{MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed),
State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}),
- case maps:find(From, QNames) of
+ case maps:find(Name, QNames) of
{ok, QName} -> erase_queue_stats(QName);
error -> ok
end,
noreply_coalesce(
State3#ch{queue_states = maps:remove(Name, QueueStates),
- queue_names = maps:remove(From, QNames)})
+ queue_names = maps:remove(Name, QNames)})
end;
_ ->
%% the assumption here is that the queue state has been cleaned up and
@@ -1336,13 +1336,14 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
return_ok(State, NoWait, OkMsg);
{ok, {Q = #amqqueue{pid = QPid}, _CParams}} ->
ConsumerMapping1 = maps:remove(ConsumerTag, ConsumerMapping),
+ QRef = qpid_to_ref(QPid),
QCons1 =
- case maps:find(QPid, QCons) of
+ case maps:find(QRef, QCons) of
error -> QCons;
{ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags),
case gb_sets:is_empty(CTags1) of
- true -> maps:remove(QPid, QCons);
- false -> maps:put(QPid, CTags1, QCons)
+ true -> maps:remove(QRef, QCons);
+ false -> maps:put(QRef, CTags1, QCons)
end
end,
NewState = State#ch{consumer_mapping = ConsumerMapping1,
@@ -1395,7 +1396,7 @@ handle_method(#'basic.qos'{global = true,
case ((not rabbit_limiter:is_active(Limiter)) andalso
rabbit_limiter:is_active(Limiter1)) of
true -> rabbit_amqqueue:activate_limit_all(
- consumer_queues(State#ch.consumer_mapping), self());
+ consumer_queue_refs(State#ch.consumer_mapping), self());
false -> ok
end,
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
@@ -1641,25 +1642,26 @@ consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
queue_monitors = QMons,
queue_consumers = QCons}) ->
- {#amqqueue{pid = QPid}, _} =
- maps:get(ConsumerTag, ConsumerMapping),
- CTags1 = case maps:find(QPid, QCons) of
+ {#amqqueue{pid = QPid}, _} = maps:get(ConsumerTag, ConsumerMapping),
+ QRef = qpid_to_ref(QPid),
+ CTags1 = case maps:find(QRef, QCons) of
{ok, CTags} -> gb_sets:insert(ConsumerTag, CTags);
error -> gb_sets:singleton(ConsumerTag)
end,
- QCons1 = maps:put(QPid, CTags1, QCons),
- State#ch{queue_monitors = maybe_monitor(QPid, QMons),
+ QCons1 = maps:put(QRef, CTags1, QCons),
+ State#ch{queue_monitors = maybe_monitor(QRef, QMons),
queue_consumers = QCons1}.
track_delivering_queue(NoAck, QPid, QName,
State = #ch{queue_names = QNames,
queue_monitors = QMons,
delivering_queues = DQ}) ->
- State#ch{queue_names = maps:put(QPid, QName, QNames),
- queue_monitors = maybe_monitor(QPid, QMons),
+ QRef = qpid_to_ref(QPid),
+ State#ch{queue_names = maps:put(QRef, QName, QNames),
+ queue_monitors = maybe_monitor(QRef, QMons),
delivering_queues = case NoAck of
true -> DQ;
- false -> sets:add_element(QPid, DQ)
+ false -> sets:add_element(QRef, DQ)
end}.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
@@ -1678,16 +1680,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
error(quorum_queues_should_never_be_monitored).
-handle_consuming_queue_down_or_eol(QPid,
- State = #ch{queue_consumers = QCons,
- queue_names = QNames}) ->
- ConsumerTags = case maps:find(QPid, QCons) of
+handle_consuming_queue_down_or_eol(QRef,
+ State = #ch{queue_consumers = QCons,
+ queue_names = QNames}) ->
+ ConsumerTags = case maps:find(QRef, QCons) of
error -> gb_sets:new();
{ok, CTags} -> CTags
end,
gb_sets:fold(
fun (CTag, StateN = #ch{consumer_mapping = CMap}) ->
- QName = maps:get(QPid, QNames),
+ QName = maps:get(QRef, QNames),
case queue_down_consumer_action(CTag, CMap) of
remove ->
cancel_consumer(CTag, QName, StateN);
@@ -1699,7 +1701,7 @@ handle_consuming_queue_down_or_eol(QPid,
_ -> cancel_consumer(CTag, QName, StateN)
end
end
- end, State#ch{queue_consumers = maps:remove(QPid, QCons)}, ConsumerTags).
+ end, State#ch{queue_consumers = maps:remove(QRef, QCons)}, ConsumerTags).
%% [0] There is a slight danger here that if a queue is deleted and
%% then recreated again the reconsume will succeed even though it was
@@ -1726,8 +1728,8 @@ queue_down_consumer_action(CTag, CMap) ->
_ -> {recover, ConsumeSpec}
end.
-handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
- State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
+handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) ->
+ State#ch{delivering_queues = sets:del_element(QRef, DQ)}.
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
RoutingKey, Arguments, VHostPath, ConnPid,
@@ -1882,7 +1884,7 @@ ack(Acked, State = #ch{queue_names = QNames,
State#ch{queue_states = QueueStates}.
incr_queue_stats(QPid, QNames, MsgIds, State) ->
- case maps:find(QPid, QNames) of
+ case maps:find(qpid_to_ref(QPid), QNames) of
{ok, QName} -> Count = length(MsgIds),
?INCR_STATS(queue_stats, QName, Count, ack, State);
error -> ok
@@ -1906,10 +1908,10 @@ notify_queues(State = #ch{state = closing}) ->
{ok, State};
notify_queues(State = #ch{consumer_mapping = Consumers,
delivering_queues = DQ }) ->
- QPids0 = sets:to_list(
- sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
+ QRefs0 = sets:to_list(
+ sets:union(sets:from_list(consumer_queue_refs(Consumers)), DQ)),
%% filter to only include pids to avoid trying to notify quorum queues
- QPids = [P || P <- QPids0, ?IS_CLASSIC(P)],
+ QPids = [P || P <- QRefs0, ?IS_CLASSIC(P)],
Timeout = get_operation_timeout(),
{rabbit_amqqueue:notify_down_all(QPids, self(), Timeout),
State#ch{state = closing}}.
@@ -1925,8 +1927,8 @@ foreach_per_queue(F, UAL, Acc) ->
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T).
-consumer_queues(Consumers) ->
- lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}}
+consumer_queue_refs(Consumers) ->
+ lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}}
<- maps:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
@@ -1983,9 +1985,10 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
{QNames1, QMons1} =
lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
{QNames0, QMons0}) ->
- {case maps:is_key(QPid, QNames0) of
+ QRef = qpid_to_ref(QPid),
+ {case maps:is_key(QRef, QNames0) of
true -> QNames0;
- false -> maps:put(QPid, QName, QNames0)
+ false -> maps:put(QRef, QName, QNames0)
end, maybe_monitor(QPid, QMons0)}
end, {QNames, maybe_monitor_all(DeliveredQPids, QMons)}, Qs),
State1 = State#ch{queue_names = QNames1,
@@ -2000,8 +2003,8 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
fine ->
?INCR_STATS(exchange_stats, XName, 1, publish),
[?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) ||
- QPid <- AllDeliveredQRefs,
- {ok, QName} <- [maps:find(QPid, QNames1)]];
+ QRef <- AllDeliveredQRefs,
+ {ok, QName} <- [maps:find(QRef, QNames1)]];
_ ->
ok
end,
@@ -2494,3 +2497,8 @@ maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items).
add_delivery_count_header(MsgHeader, Msg) ->
Count = maps:get(delivery_count, MsgHeader, 0),
rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg).
+
+qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
+qpid_to_ref({Name, _}) -> Name;
+%% assume it already is a ref
+qpid_to_ref(Ref) -> Ref.
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index a455264560..740c6e202c 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -1,3 +1,19 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
-module(rabbit_fifo).
-behaviour(ra_machine).
@@ -228,9 +244,9 @@ update_state(Conf, State) ->
{state(), ra_machine:effects(), Reply :: term()}.
apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of
- {ok, State0, Effects} ->
- State = append_to_master_index(RaftIdx, State0),
- checkout(State, Effects);
+ {ok, State0, Effects1} ->
+ {State, Effects, ok} = checkout(State0, Effects1),
+ {append_to_master_index(RaftIdx, State), Effects, ok};
{duplicate, State, Effects} ->
{State, Effects, ok}
end;
@@ -314,6 +330,7 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0,
apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0,
#state{messages = M,
prefix_msg_count = 0} = State0) when map_size(M) == 0 ->
+ %% FIX: also check if there are returned messages
%% TODO do we need metric visibility of empty get requests?
{State0, Effects0, {dequeue, empty}};
apply(Meta, {checkout, {dequeue, settled}, ConsumerId},
@@ -780,6 +797,7 @@ cancel_consumer_effects(Pid, Name,
update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
#state{ra_indexes = Indexes,
+ % prefix_msg_count = 0,
messages = Messages} = State, Effects) ->
case rabbit_fifo_index:size(Indexes) of
0 when map_size(Messages) =:= 0 ->
@@ -1023,12 +1041,17 @@ dehydrate_state(#state{messages = Messages0,
ra_indexes = rabbit_fifo_index:empty(),
low_msg_num = undefined,
consumers = maps:map(fun (_, C) ->
- C#consumer{checked_out = #{}}
+ dehydrate_consumer(C)
+ % C#consumer{checked_out = #{}}
end, Consumers),
returns = queue:new(),
%% messages include returns
prefix_msg_count = maps:size(Messages0) + MsgCount}.
+dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
+ Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0),
+ Con#consumer{checked_out = Checked}.
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -1406,7 +1429,7 @@ tick_test() ->
ok.
enq_deq_snapshot_recover_test() ->
- Tag = <<"release_cursor_snapshot_state_test">>,
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
% OthPid = spawn(fun () -> ok end),
% Oth = {<<"oth">>, OthPid},
@@ -1539,6 +1562,23 @@ enq_check_settle_snapshot_purge_test() ->
% ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
run_snapshot_test(?FUNCTION_NAME, Commands).
+enq_check_settle_duplicate_test() ->
+ %% duplicate settle commands are likely
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {checkout, {auto, 2, simple_prefetch}, Cid},
+ {enqueue, self(), 1, one}, %% 0
+ {enqueue, self(), 2, two}, %% 0
+ {settle, [0], Cid},
+ {settle, [1], Cid},
+ {settle, [1], Cid},
+ {enqueue, self(), 3, three},
+ {settle, [2], Cid}
+ ],
+ % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
+ run_snapshot_test(?FUNCTION_NAME, Commands).
+
run_snapshot_test(Name, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
@@ -1556,6 +1596,7 @@ run_snapshot_test0(Name, Commands) ->
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
+ ?debugFmt("running from snapshot: ~b", [SnapIdx]),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
% ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n",
@@ -1571,7 +1612,7 @@ prefixes(Source, N, Acc) ->
prefixes(Source, N+1, [X | Acc]).
delivery_query_returns_deliveries_test() ->
- Tag = <<"release_cursor_snapshot_state_test">>,
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
{checkout, {auto, 5, simple_prefetch}, Cid},
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index fc35d26e0f..635d85be4a 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -1,3 +1,19 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
%% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.}
%% state machine implementation running inside a `ra' raft system.
%%
@@ -432,19 +448,10 @@ update_machine_state(Node, Conf) ->
{rabbit_fifo:client_msg(), state()} | eol.
handle_ra_event(From, {applied, Seqs},
#state{soft_limit = SftLmt,
- leader = CurLeader,
- last_applied = _Last,
unblock_handler = UnblockFun} = State0) ->
{Corrs, Actions, State1} = lists:foldl(fun seq_applied/2,
{[], [], State0#state{leader = From}},
Seqs),
- case From of
- CurLeader -> ok;
- _ ->
- ?INFO("rabbit_fifo_client: leader change from ~w to ~w~n"
- "applying ~w last ~w~n",
- [CurLeader, From, Seqs, _Last])
- end,
case maps:size(State1#state.pending) < SftLmt of
true when State1#state.slow == true ->
% we have exited soft limit state
@@ -479,10 +486,7 @@ handle_ra_event(Leader, {machine, leader_change},
#state{leader = Leader} = State) ->
%% leader already known
{internal, [], [], State};
-handle_ra_event(Leader, {machine, leader_change},
- #state{leader = OldLeader} = State0) ->
- ?INFO("rabbit_fifo_client: leader changed from ~w to ~w~n",
- [OldLeader, Leader]),
+handle_ra_event(Leader, {machine, leader_change}, State0) ->
%% we need to update leader
%% and resend any pending commands
State = resend_all_pending(State0#state{leader = Leader}),
@@ -490,9 +494,9 @@ handle_ra_event(Leader, {machine, leader_change},
handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) ->
% TODO: how should these be handled? re-sent on timer or try random
{internal, [], [], State0};
-handle_ra_event(From, {rejected, {not_leader, Leader, Seq}}, State0) ->
- ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n",
- [Seq, From, Leader]),
+handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
+ % ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n",
+ % [Seq, From, Leader]),
State1 = State0#state{leader = Leader},
State = resend(Seq, State1),
{internal, [], [], State};
@@ -535,7 +539,6 @@ try_process_command([Server | Rem], Cmd, State) ->
seq_applied({Seq, MaybeAction},
{Corrs, Actions0, #state{last_applied = Last} = State0})
when Seq > Last orelse Last =:= undefined ->
- % ?INFO("rabbit_fifo_client: applying seq ~b last ~w", [Seq, Last]),
State1 = case Last of
undefined -> State0;
_ ->
@@ -550,14 +553,12 @@ seq_applied({Seq, MaybeAction},
{[Corr | Corrs], Actions, State#state{pending = Pending,
last_applied = Seq}};
error ->
- ?INFO("rabbit_fifo_client: pending not found ~w", [Seq]),
% must have already been resent or removed for some other reason
% still need to update last_applied or we may inadvertently resend
% stuff later
{Corrs, Actions, State#state{last_applied = Seq}}
end;
seq_applied(_Seq, Acc) ->
- ?INFO("rabbit_fifo_client: dropping seq ~b", [_Seq]),
Acc.
maybe_add_action(ok, Acc, State) ->
@@ -579,7 +580,7 @@ maybe_add_action(Action, Acc, State) ->
{[Action | Acc], State}.
do_resends(From, To, State) when From =< To ->
- ?INFO("doing resends From ~w To ~w~n", [From, To]),
+ % ?INFO("rabbit_fifo_client: doing resends From ~w To ~w~n", [From, To]),
lists:foldl(fun resend/2, State, lists:seq(From, To));
do_resends(_, _, State) ->
State.
@@ -596,7 +597,6 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
resend_all_pending(#state{pending = Pend} = State) ->
Seqs = lists:sort(maps:keys(Pend)),
- ?INFO ("rabbit_fifo_client: resending all ~w~n", [Seqs]),
lists:foldl(fun resend/2, State, Seqs).
handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,