summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorD Corbacho <diana@rabbitmq.com>2018-12-05 10:07:56 +0000
committerGitHub <noreply@github.com>2018-12-05 10:07:56 +0000
commit105204861abfd270b4ba8c1b6e5878e00c6049f5 (patch)
tree8cad648011789345fdf87db888efdbec804f50b7 /src
parentdb888df2a9156fefda626f57cbb9d9591e27d41f (diff)
parent26c7dfba5b31118010256ebe79b1043e470ce452 (diff)
downloadrabbitmq-server-git-105204861abfd270b4ba8c1b6e5878e00c6049f5.tar.gz
Merge pull request #1782 from rabbitmq/qq-confirm-availability
Quorum queue confirm availability
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_channel.erl111
-rw-r--r--src/rabbit_fifo.erl182
-rw-r--r--src/rabbit_fifo_client.erl41
4 files changed, 245 insertions, 92 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index e19914d5ab..d5bfbbb5e3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -1094,7 +1094,8 @@ notify_down_all(QPids, ChPid, Timeout) ->
Error -> {error, Error}
end.
-activate_limit_all(QPids, ChPid) ->
+activate_limit_all(QRefs, ChPid) ->
+ QPids = [P || P <- QRefs, ?IS_CLASSIC(P)],
delegate:invoke_no_result(QPids, {gen_server2, cast,
[{activate_limit, ChPid}]}).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index bc273bf100..8fb73d6156 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -110,7 +110,7 @@
%% when queue.bind's queue field is empty,
%% this name will be used instead
most_recently_declared_queue,
- %% a map of queue pid to queue name
+ %% a map of queue ref to queue name
queue_names,
%% queue processes are monitored to update
%% queue names
@@ -670,19 +670,20 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
#'basic.credit_drained'{consumer_tag = CTag,
credit_drained = Credit})
end, Actions),
- noreply_coalesce(confirm(MsgSeqNos, From, State));
+ noreply_coalesce(confirm(MsgSeqNos, Name, State));
eol ->
- State1 = handle_consuming_queue_down_or_eol(From, State0),
- State2 = handle_delivering_queue_down(From, State1),
- {MXs, UC1} = dtree:take(From, State2#ch.unconfirmed),
+ State1 = handle_consuming_queue_down_or_eol(Name, State0),
+ State2 = handle_delivering_queue_down(Name, State1),
+ %% TODO: this should use dtree:take/3
+ {MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed),
State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}),
- case maps:find(From, QNames) of
+ case maps:find(Name, QNames) of
{ok, QName} -> erase_queue_stats(QName);
error -> ok
end,
noreply_coalesce(
State3#ch{queue_states = maps:remove(Name, QueueStates),
- queue_names = maps:remove(From, QNames)})
+ queue_names = maps:remove(Name, QNames)})
end;
_ ->
%% the assumption here is that the queue state has been cleaned up and
@@ -1335,13 +1336,14 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
return_ok(State, NoWait, OkMsg);
{ok, {Q = #amqqueue{pid = QPid}, _CParams}} ->
ConsumerMapping1 = maps:remove(ConsumerTag, ConsumerMapping),
+ QRef = qpid_to_ref(QPid),
QCons1 =
- case maps:find(QPid, QCons) of
+ case maps:find(QRef, QCons) of
error -> QCons;
{ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags),
case gb_sets:is_empty(CTags1) of
- true -> maps:remove(QPid, QCons);
- false -> maps:put(QPid, CTags1, QCons)
+ true -> maps:remove(QRef, QCons);
+ false -> maps:put(QRef, CTags1, QCons)
end
end,
NewState = State#ch{consumer_mapping = ConsumerMapping1,
@@ -1394,7 +1396,7 @@ handle_method(#'basic.qos'{global = true,
case ((not rabbit_limiter:is_active(Limiter)) andalso
rabbit_limiter:is_active(Limiter1)) of
true -> rabbit_amqqueue:activate_limit_all(
- consumer_queues(State#ch.consumer_mapping), self());
+ consumer_queue_refs(State#ch.consumer_mapping), self());
false -> ok
end,
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
@@ -1640,25 +1642,26 @@ consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
queue_monitors = QMons,
queue_consumers = QCons}) ->
- {#amqqueue{pid = QPid}, _} =
- maps:get(ConsumerTag, ConsumerMapping),
- CTags1 = case maps:find(QPid, QCons) of
+ {#amqqueue{pid = QPid}, _} = maps:get(ConsumerTag, ConsumerMapping),
+ QRef = qpid_to_ref(QPid),
+ CTags1 = case maps:find(QRef, QCons) of
{ok, CTags} -> gb_sets:insert(ConsumerTag, CTags);
error -> gb_sets:singleton(ConsumerTag)
end,
- QCons1 = maps:put(QPid, CTags1, QCons),
- State#ch{queue_monitors = maybe_monitor(QPid, QMons),
+ QCons1 = maps:put(QRef, CTags1, QCons),
+ State#ch{queue_monitors = maybe_monitor(QRef, QMons),
queue_consumers = QCons1}.
track_delivering_queue(NoAck, QPid, QName,
State = #ch{queue_names = QNames,
queue_monitors = QMons,
delivering_queues = DQ}) ->
- State#ch{queue_names = maps:put(QPid, QName, QNames),
- queue_monitors = maybe_monitor(QPid, QMons),
+ QRef = qpid_to_ref(QPid),
+ State#ch{queue_names = maps:put(QRef, QName, QNames),
+ queue_monitors = maybe_monitor(QRef, QMons),
delivering_queues = case NoAck of
true -> DQ;
- false -> sets:add_element(QPid, DQ)
+ false -> sets:add_element(QRef, DQ)
end}.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
@@ -1677,16 +1680,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
error(quorum_queues_should_never_be_monitored).
-handle_consuming_queue_down_or_eol(QPid,
- State = #ch{queue_consumers = QCons,
- queue_names = QNames}) ->
- ConsumerTags = case maps:find(QPid, QCons) of
+handle_consuming_queue_down_or_eol(QRef,
+ State = #ch{queue_consumers = QCons,
+ queue_names = QNames}) ->
+ ConsumerTags = case maps:find(QRef, QCons) of
error -> gb_sets:new();
{ok, CTags} -> CTags
end,
gb_sets:fold(
fun (CTag, StateN = #ch{consumer_mapping = CMap}) ->
- QName = maps:get(QPid, QNames),
+ QName = maps:get(QRef, QNames),
case queue_down_consumer_action(CTag, CMap) of
remove ->
cancel_consumer(CTag, QName, StateN);
@@ -1698,7 +1701,7 @@ handle_consuming_queue_down_or_eol(QPid,
_ -> cancel_consumer(CTag, QName, StateN)
end
end
- end, State#ch{queue_consumers = maps:remove(QPid, QCons)}, ConsumerTags).
+ end, State#ch{queue_consumers = maps:remove(QRef, QCons)}, ConsumerTags).
%% [0] There is a slight danger here that if a queue is deleted and
%% then recreated again the reconsume will succeed even though it was
@@ -1725,8 +1728,8 @@ queue_down_consumer_action(CTag, CMap) ->
_ -> {recover, ConsumeSpec}
end.
-handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
- State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
+handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) ->
+ State#ch{delivering_queues = sets:del_element(QRef, DQ)}.
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
RoutingKey, Arguments, VHostPath, ConnPid,
@@ -1881,7 +1884,7 @@ ack(Acked, State = #ch{queue_names = QNames,
State#ch{queue_states = QueueStates}.
incr_queue_stats(QPid, QNames, MsgIds, State) ->
- case maps:find(QPid, QNames) of
+ case maps:find(qpid_to_ref(QPid), QNames) of
{ok, QName} -> Count = length(MsgIds),
?INCR_STATS(queue_stats, QName, Count, ack, State);
error -> ok
@@ -1905,10 +1908,10 @@ notify_queues(State = #ch{state = closing}) ->
{ok, State};
notify_queues(State = #ch{consumer_mapping = Consumers,
delivering_queues = DQ }) ->
- QPids0 = sets:to_list(
- sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
+ QRefs0 = sets:to_list(
+ sets:union(sets:from_list(consumer_queue_refs(Consumers)), DQ)),
%% filter to only include pids to avoid trying to notify quorum queues
- QPids = [P || P <- QPids0, ?IS_CLASSIC(P)],
+ QPids = [P || P <- QRefs0, ?IS_CLASSIC(P)],
Timeout = get_operation_timeout(),
{rabbit_amqqueue:notify_down_all(QPids, self(), Timeout),
State#ch{state = closing}}.
@@ -1924,8 +1927,8 @@ foreach_per_queue(F, UAL, Acc) ->
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T).
-consumer_queues(Consumers) ->
- lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}}
+consumer_queue_refs(Consumers) ->
+ lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}}
<- maps:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
@@ -1968,7 +1971,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
Qs = rabbit_amqqueue:lookup(DelQNames),
{DeliveredQPids, DeliveredQQPids, QueueStates} =
rabbit_amqqueue:deliver(Qs, Delivery, QueueStates0),
- AllDeliveredQPids = DeliveredQPids ++ DeliveredQQPids,
+ AllDeliveredQRefs = DeliveredQPids ++ [N || {N, _} <- DeliveredQQPids],
%% The maybe_monitor_all/2 monitors all queues to which we
%% delivered. But we want to monitor even queues we didn't deliver
%% to, since we need their 'DOWN' messages to clean
@@ -1982,49 +1985,50 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
{QNames1, QMons1} =
lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
{QNames0, QMons0}) ->
- {case maps:is_key(QPid, QNames0) of
+ QRef = qpid_to_ref(QPid),
+ {case maps:is_key(QRef, QNames0) of
true -> QNames0;
- false -> maps:put(QPid, QName, QNames0)
+ false -> maps:put(QRef, QName, QNames0)
end, maybe_monitor(QPid, QMons0)}
end, {QNames, maybe_monitor_all(DeliveredQPids, QMons)}, Qs),
State1 = State#ch{queue_names = QNames1,
queue_monitors = QMons1},
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
- State2 = process_routing_mandatory(Mandatory, AllDeliveredQPids, MsgSeqNo,
+ State2 = process_routing_mandatory(Mandatory, AllDeliveredQRefs , MsgSeqNo,
Message, State1),
- State3 = process_routing_confirm( Confirm, AllDeliveredQPids, MsgSeqNo,
- XName, State2),
+ State3 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo,
+ XName, State2),
case rabbit_event:stats_level(State3, #ch.stats_timer) of
fine ->
?INCR_STATS(exchange_stats, XName, 1, publish),
[?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) ||
- QPid <- AllDeliveredQPids,
- {ok, QName} <- [maps:find(QPid, QNames1)]];
+ QRef <- AllDeliveredQRefs,
+ {ok, QName} <- [maps:find(QRef, QNames1)]];
_ ->
ok
end,
State3#ch{queue_states = QueueStates}.
-process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) ->
+process_routing_mandatory(false, _, _, _, State) ->
State;
-process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) ->
+process_routing_mandatory(true, [], _, Msg, State) ->
ok = basic_return(Msg, State, no_route),
State;
-process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) ->
- State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg,
+process_routing_mandatory(true, QRefs, MsgSeqNo, Msg, State) ->
+ State#ch{mandatory = dtree:insert(MsgSeqNo, QRefs, Msg,
State#ch.mandatory)}.
-process_routing_confirm(false, _, _MsgSeqNo, _XName, State) ->
+process_routing_confirm(false, _, _, _, State) ->
State;
-process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
+process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
record_confirms([{MsgSeqNo, XName}], State);
-process_routing_confirm(true, QPids, MsgSeqNo, XName, State) ->
- State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
+process_routing_confirm(true, QRefs, MsgSeqNo, XName, State) ->
+ State#ch{unconfirmed = dtree:insert(MsgSeqNo, QRefs, XName,
State#ch.unconfirmed)}.
-confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take(MsgSeqNos, QRef, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
record_confirms(MXs, State#ch{unconfirmed = UC1}).
@@ -2493,3 +2497,8 @@ maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items).
add_delivery_count_header(MsgHeader, Msg) ->
Count = maps:get(delivery_count, MsgHeader, 0),
rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg).
+
+qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
+qpid_to_ref({Name, _}) -> Name;
+%% assume it already is a ref
+qpid_to_ref(Ref) -> Ref.
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index c12a6ec464..740c6e202c 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -1,3 +1,19 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
-module(rabbit_fifo).
-behaviour(ra_machine).
@@ -106,7 +122,7 @@
-type applied_mfa() :: {module(), atom(), list()}.
% represents a partially applied module call
--define(SHADOW_COPY_INTERVAL, 4096).
+-define(SHADOW_COPY_INTERVAL, 4096 * 4).
-define(USE_AVG_HALF_LIFE, 10000.0).
-record(consumer,
@@ -228,9 +244,9 @@ update_state(Conf, State) ->
{state(), ra_machine:effects(), Reply :: term()}.
apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of
- {ok, State0, Effects} ->
- State = append_to_master_index(RaftIdx, State0),
- checkout(State, Effects);
+ {ok, State0, Effects1} ->
+ {State, Effects, ok} = checkout(State0, Effects1),
+ {append_to_master_index(RaftIdx, State), Effects, ok};
{duplicate, State, Effects} ->
{State, Effects, ok}
end;
@@ -314,6 +330,7 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0,
apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0,
#state{messages = M,
prefix_msg_count = 0} = State0) when map_size(M) == 0 ->
+ %% FIX: also check if there are returned messages
%% TODO do we need metric visibility of empty get requests?
{State0, Effects0, {dequeue, empty}};
apply(Meta, {checkout, {dequeue, settled}, ConsumerId},
@@ -357,7 +374,7 @@ apply(#{index := RaftIdx}, purge, Effects0,
{StateAcc0, EffectsAcc0, ok}) ->
MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}}
<- maps:values(Checked0)],
- complete(ConsumerId, MsgRaftIdxs, C,
+ complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C,
#{}, EffectsAcc0, StateAcc0)
end, {State0, Effects0, ok}, Cons0),
{State, Effects, _} =
@@ -456,20 +473,26 @@ apply(_, {update_state, Conf}, Effects, State) ->
{update_state(Conf, State), Effects, ok}.
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
-state_enter(leader, #state{consumers = Custs,
+state_enter(leader, #state{consumers = Cons,
enqueuers = Enqs,
name = Name,
+ prefix_msg_count = 0,
become_leader_handler = BLH}) ->
% return effects to monitor all current consumers and enqueuers
- ConMons = [{monitor, process, P} || {_, P} <- maps:keys(Custs)],
- EnqMons = [{monitor, process, P} || P <- maps:keys(Enqs)],
- Effects = ConMons ++ EnqMons,
+ Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]),
+ Mons = [{monitor, process, P} || P <- Pids],
+ Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
+ Effects = Mons ++ Nots,
case BLH of
undefined ->
Effects;
{Mod, Fun, Args} ->
[{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
end;
+state_enter(recovered, #state{prefix_msg_count = PrefixMsgCount})
+ when PrefixMsgCount =/= 0 ->
+ %% TODO: remove assertion?
+ exit({rabbit_fifo, unexpected_prefix_msg_count, PrefixMsgCount});
state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
[{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))];
@@ -699,7 +722,8 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
end,
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- State1 = lists:foldl(fun('$prefix_msg', #state{prefix_msg_count = MsgCount} = S0) ->
+ State1 = lists:foldl(fun('$prefix_msg',
+ #state{prefix_msg_count = MsgCount} = S0) ->
S0#state{prefix_msg_count = MsgCount + 1};
({MsgNum, Msg}, S0) ->
return_one(MsgNum, Msg, S0)
@@ -709,14 +733,14 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
Effects).
% used to processes messages that are finished
-complete(ConsumerId, MsgRaftIdxs,
+complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
Con0, Checked, Effects0,
#state{consumers = Cons0, service_queue = SQ0,
ra_indexes = Indexes0} = State0) ->
- %% credit_mode = simple_prefetch should automatically top-up credit as messages
- %% are simple_prefetch or otherwise returned
+ %% credit_mode = simple_prefetch should automatically top-up credit
+ %% as messages are simple_prefetch or otherwise returned
Con = Con0#consumer{checked_out = Checked,
- credit = increase_credit(Con0, length(MsgRaftIdxs))},
+ credit = increase_credit(Con0, NumDiscarded)},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs),
@@ -742,7 +766,10 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
Checked = maps:without(MsgIds, Checked0),
Discarded = maps:with(MsgIds, Checked0),
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
+ %% need to pass the length of discarded as $prefix_msgs would be filtered
+ %% by the above list comprehension
{State1, Effects1, _} = complete(ConsumerId, MsgRaftIdxs,
+ maps:size(Discarded),
Con0, Checked, Effects0, State0),
{State, Effects, _} = checkout(State1, Effects1),
% settle metrics are incremented separately
@@ -770,6 +797,7 @@ cancel_consumer_effects(Pid, Name,
update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
#state{ra_indexes = Indexes,
+ % prefix_msg_count = 0,
messages = Messages} = State, Effects) ->
case rabbit_fifo_index:size(Indexes) of
0 when map_size(Messages) =:= 0 ->
@@ -810,7 +838,10 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
returns = queue:in(MsgNum, Returns)}.
return_all(State, Checked) ->
- maps:fold(fun (_, {MsgNum, Msg}, S) ->
+ maps:fold(fun (_, '$prefix_msg',
+ #state{prefix_msg_count = MsgCount} = S) ->
+ S#state{prefix_msg_count = MsgCount + 1};
+ (_, {MsgNum, Msg}, S) ->
return_one(MsgNum, Msg, S)
end, State, Checked).
@@ -1010,11 +1041,17 @@ dehydrate_state(#state{messages = Messages0,
ra_indexes = rabbit_fifo_index:empty(),
low_msg_num = undefined,
consumers = maps:map(fun (_, C) ->
- C#consumer{checked_out = #{}}
+ dehydrate_consumer(C)
+ % C#consumer{checked_out = #{}}
end, Consumers),
returns = queue:new(),
+ %% messages include returns
prefix_msg_count = maps:size(Messages0) + MsgCount}.
+dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
+ Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0),
+ Con#consumer{checked_out = Checked}.
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -1392,7 +1429,7 @@ tick_test() ->
ok.
enq_deq_snapshot_recover_test() ->
- Tag = <<"release_cursor_snapshot_state_test">>,
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
% OthPid = spawn(fun () -> ok end),
% Oth = {<<"oth">>, OthPid},
@@ -1449,20 +1486,49 @@ snapshot_recover_test() ->
],
run_snapshot_test(?FUNCTION_NAME, Commands).
-enq_deq_return_snapshot_recover_test() ->
+enq_deq_return_settle_snapshot_test() ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {enqueue, self(), 1, one}, %% to Cid
+ {checkout, {auto, 1, simple_prefetch}, Cid},
+ {return, [0], Cid}, %% should be re-delivered to Cid
+ {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2
+ {settle, [1], Cid},
+ {settle, [2], Cid}
+ ],
+ run_snapshot_test(?FUNCTION_NAME, Commands).
+
+return_prefix_msg_count_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
- OthPid = spawn(fun () -> ok end),
- Oth = {<<"oth">>, OthPid},
Commands = [
{enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {checkout, {dequeue, unsettled}, Oth},
- {checkout, {dequeue, unsettled}, Cid},
- {settle, [0], Oth},
- {return, [0], Cid},
+ {checkout, {auto, 1, simple_prefetch}, Cid},
+ {checkout, cancel, Cid},
+ {enqueue, self(), 2, two} %% Cid prefix_msg_count: 2
+ ],
+ Indexes = lists:seq(1, length(Commands)),
+ Entries = lists:zip(Indexes, Commands),
+ {State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries),
+ ?debugFmt("return_prefix_msg_count_test state ~n~p~n", [State]),
+ ok.
+
+
+return_settle_snapshot_test() ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {enqueue, self(), 1, one}, %% to Cid
+ {checkout, {auto, 1, simple_prefetch}, Cid},
+ {return, [0], Cid}, %% should be re-delivered to Oth
+ {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2
+ {settle, [1], Cid},
+ {return, [2], Cid},
+ {settle, [3], Cid},
{enqueue, self(), 3, three},
- purge
+ purge,
+ {enqueue, self(), 4, four}
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1477,17 +1543,47 @@ enq_check_settle_snapshot_recover_test() ->
{settle, [0], Cid},
{enqueue, self(), 3, three},
{settle, [2], Cid}
+ ],
+ % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
+ run_snapshot_test(?FUNCTION_NAME, Commands).
+enq_check_settle_snapshot_purge_test() ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {checkout, {auto, 2, simple_prefetch}, Cid},
+ {enqueue, self(), 1, one},
+ {enqueue, self(), 2, two},
+ {settle, [1], Cid},
+ {settle, [0], Cid},
+ {enqueue, self(), 3, three},
+ purge
],
% ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
run_snapshot_test(?FUNCTION_NAME, Commands).
+enq_check_settle_duplicate_test() ->
+ %% duplicate settle commands are likely
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {checkout, {auto, 2, simple_prefetch}, Cid},
+ {enqueue, self(), 1, one}, %% 0
+ {enqueue, self(), 2, two}, %% 0
+ {settle, [0], Cid},
+ {settle, [1], Cid},
+ {settle, [1], Cid},
+ {enqueue, self(), 3, three},
+ {settle, [2], Cid}
+ ],
+ % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
+ run_snapshot_test(?FUNCTION_NAME, Commands).
run_snapshot_test(Name, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
[begin
- % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
+ ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
run_snapshot_test0(Name, C)
end || C <- prefixes(Commands, 1, [])].
@@ -1500,6 +1596,7 @@ run_snapshot_test0(Name, Commands) ->
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
+ ?debugFmt("running from snapshot: ~b", [SnapIdx]),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
% ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n",
@@ -1515,7 +1612,7 @@ prefixes(Source, N, Acc) ->
prefixes(Source, N+1, [X | Acc]).
delivery_query_returns_deliveries_test() ->
- Tag = <<"release_cursor_snapshot_state_test">>,
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
{checkout, {auto, 5, simple_prefetch}, Cid},
@@ -1555,19 +1652,30 @@ state_enter_test() ->
[{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
ok.
-leader_monitors_on_state_enter_test() ->
- Cid = {<<"cid">>, self()},
- {State0, [_, _]} = enq(1, 1, first, test_init(test)),
- {State1, _} = check_auto(Cid, 2, State0),
+state_enter_montors_and_notifications_test() ->
+ Oth = spawn(fun () -> ok end),
+ {State0, _} = enq(1, 1, first, test_init(test)),
+ Cid = {<<"adf">>, self()},
+ OthCid = {<<"oth">>, Oth},
+ {State1, _} = check(Cid, 2, State0),
+ {State, _} = check(OthCid, 3, State1),
Self = self(),
- %% as we have an enqueuer _and_ a consumer we chould
- %% get two monitor effects in total, even if they are for the same
- %% processs
+ Effects = state_enter(leader, State),
+
+ %% monitor all enqueuers and consumers
[{monitor, process, Self},
- {monitor, process, Self}] = state_enter(leader, State1),
+ {monitor, process, Oth}] =
+ lists:filter(fun ({monitor, process, _}) -> true;
+ (_) -> false
+ end, Effects),
+ [{send_msg, Self, leader_change, ra_event},
+ {send_msg, Oth, leader_change, ra_event}] =
+ lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true;
+ (_) -> false
+ end, Effects),
+ ?ASSERT_EFF({monitor, process, _}, Effects),
ok.
-
purge_test() ->
Cid = {<<"purge_test">>, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index c063ef9a17..635d85be4a 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -1,3 +1,19 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
%% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.}
%% state machine implementation running inside a `ra' raft system.
%%
@@ -447,7 +463,8 @@ handle_ra_event(From, {applied, Seqs},
fun (Cid, {Settled, Returns, Discards}, Acc) ->
add_command(Cid, settle, Settled,
add_command(Cid, return, Returns,
- add_command(Cid, discard, Discards, Acc)))
+ add_command(Cid, discard,
+ Discards, Acc)))
end, [], State1#state.unsent_commands),
Node = pick_node(State2),
%% send all the settlements and returns
@@ -465,10 +482,21 @@ handle_ra_event(From, {applied, Seqs},
end;
handle_ra_event(Leader, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
handle_delivery(Leader, Del, State0);
+handle_ra_event(Leader, {machine, leader_change},
+ #state{leader = Leader} = State) ->
+ %% leader already known
+ {internal, [], [], State};
+handle_ra_event(Leader, {machine, leader_change}, State0) ->
+ %% we need to update leader
+ %% and resend any pending commands
+ State = resend_all_pending(State0#state{leader = Leader}),
+ {internal, [], [], State};
handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) ->
% TODO: how should these be handled? re-sent on timer or try random
{internal, [], [], State0};
handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
+ % ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n",
+ % [Seq, From, Leader]),
State1 = State0#state{leader = Leader},
State = resend(Seq, State1),
{internal, [], [], State};
@@ -526,7 +554,9 @@ seq_applied({Seq, MaybeAction},
last_applied = Seq}};
error ->
% must have already been resent or removed for some other reason
- {Corrs, Actions, State}
+ % still need to update last_applied or we may inadvertently resend
+ % stuff later
+ {Corrs, Actions, State#state{last_applied = Seq}}
end;
seq_applied(_Seq, Acc) ->
Acc.
@@ -550,7 +580,7 @@ maybe_add_action(Action, Acc, State) ->
{[Action | Acc], State}.
do_resends(From, To, State) when From =< To ->
- ?INFO("doing resends From ~w To ~w~n", [From, To]),
+ % ?INFO("rabbit_fifo_client: doing resends From ~w To ~w~n", [From, To]),
lists:foldl(fun resend/2, State, lists:seq(From, To));
do_resends(_, _, State) ->
State.
@@ -565,6 +595,10 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
State
end.
+resend_all_pending(#state{pending = Pend} = State) ->
+ Seqs = lists:sort(maps:keys(Pend)),
+ lists:foldl(fun resend/2, State, Seqs).
+
handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
#state{consumer_deliveries = CDels0} = State0) ->
{LastId, _} = lists:last(IdMsgs),
@@ -619,6 +653,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
Missing.
pick_node(#state{leader = undefined, servers = [N | _]}) ->
+ %% TODO: pick random rather that first?
N;
pick_node(#state{leader = Leader}) ->
Leader.