summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-12-21 14:06:58 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-12-21 14:06:58 +0100
commitf841d7a34810a89055096c2ec6ddeb73fbf30cf8 (patch)
treef87ad0ec3d52afa37200c66184c11c749c23a52c /src
parent1f052db8900e388535249603c28e44ee3e40a607 (diff)
parent7e347885de96e95ce85fab577861b98aa0fa7dbf (diff)
downloadrabbitmq-server-git-f841d7a34810a89055096c2ec6ddeb73fbf30cf8.tar.gz
Merge branch 'master' into rabbitmq-server-1799-single-active-consumer-in-qq
Conflicts: src/rabbit_quorum_queue.erl
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl12
-rw-r--r--src/rabbit_channel.erl14
-rw-r--r--src/rabbit_fifo.erl474
-rw-r--r--src/rabbit_fifo_client.erl33
-rw-r--r--src/rabbit_quorum_memory_manager.erl2
-rw-r--r--src/rabbit_quorum_queue.erl71
6 files changed, 342 insertions, 264 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index aa9889aef6..704ead75a7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -584,7 +584,17 @@ retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, Retries
{stopped, false} ->
E({absent, Q, stopped});
_ ->
- false = rabbit_mnesia:is_process_alive(QPid),
+ case rabbit_mnesia:is_process_alive(QPid) of
+ true ->
+ % rabbitmq-server#1682
+ % The old check would have crashed here,
+ % instead, log it and run the exit fun. absent & alive is weird,
+ % but better than crashing with badmatch,true
+ rabbit_log:debug("Unexpected alive queue process ~p~n", [QPid]),
+ E({absent, Q, alive});
+ false ->
+ ok % Expected result
+ end,
timer:sleep(30),
with(Name, F, E, RetriesLeft - 1)
end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f749d9f30e..d1f3b06528 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -654,6 +654,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
consumer_mapping = ConsumerMapping} = State0) ->
case QueueStates of
#{Name := QState0} ->
+ QName = rabbit_quorum_queue:queue_name(QState0),
case rabbit_quorum_queue:handle_event(Evt, QState0) of
{{delivery, CTag, Msgs}, QState1} ->
AckRequired = case maps:find(CTag, ConsumerMapping) of
@@ -670,7 +671,6 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
true ->
QState1
end,
- QName = rabbit_quorum_queue:queue_name(QState2),
State = lists:foldl(
fun({MsgId, {MsgHeader, Msg}}, Acc) ->
IsDelivered = maps:is_key(delivery_count, MsgHeader),
@@ -702,10 +702,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
%% TODO: this should use dtree:take/3
{MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed),
State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}),
- case maps:find(Name, QNames) of
- {ok, QName} -> erase_queue_stats(QName);
- error -> ok
- end,
+ erase_queue_stats(QName),
noreply_coalesce(
State3#ch{queue_states = maps:remove(Name, QueueStates),
queue_names = maps:remove(Name, QNames)})
@@ -2530,9 +2527,10 @@ maybe_monitor_all([], S) -> S; %% optimisation
maybe_monitor_all([Item], S) -> maybe_monitor(Item, S); %% optimisation
maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items).
-add_delivery_count_header(MsgHeader, Msg) ->
- Count = maps:get(delivery_count, MsgHeader, 0),
- rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg).
+add_delivery_count_header(#{delivery_count := Count}, Msg) ->
+ rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg);
+add_delivery_count_header(_, Msg) ->
+ Msg.
qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
qpid_to_ref({Name, _}) -> Name;
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 73883aefde..706094037f 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -20,13 +20,14 @@
-compile(inline_list_funcs).
-compile(inline).
+-compile({no_auto_import, [apply/3]}).
-include_lib("ra/include/ra.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-export([
init/1,
- apply/4,
+ apply/3,
state_enter/2,
tick/2,
overview/1,
@@ -56,7 +57,7 @@
make_discard/2,
make_credit/4,
make_purge/0,
- make_update_state/1
+ make_update_config/1
]).
-type raw_msg() :: term().
@@ -131,7 +132,7 @@
delivery_count :: non_neg_integer(),
drain :: boolean()}).
-record(purge, {}).
--record(update_state, {config :: config()}).
+-record(update_config, {config :: config()}).
@@ -143,7 +144,7 @@
#discard{} |
#credit{} |
#purge{} |
- #update_state{}.
+ #update_config{}.
-type command() :: protocol() | ra_machine:builtin_command().
%% all the command types suppored by ra fifo
@@ -265,10 +266,10 @@
-spec init(config()) -> state().
init(#{name := Name,
queue_resource := Resource} = Conf) ->
- update_state(Conf, #state{name = Name,
+ update_config(Conf, #state{name = Name,
queue_resource = Resource}).
-update_state(Conf, State) ->
+update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
@@ -284,55 +285,58 @@ zero(_) ->
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
-spec apply(ra_machine:command_meta_data(), command(),
- ra_machine:effects(), state()) ->
- {state(), ra_machine:effects(), Reply :: term()}.
+ state()) ->
+ {state(), Reply :: term(), ra_machine:effects()}.
apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq,
- msg = RawMsg}, Effects0, State00) ->
- case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of
+ msg = RawMsg}, State00) ->
+ case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State00) of
{ok, State0, Effects1} ->
- {State, Effects, ok} = checkout(add_bytes_enqueue(RawMsg, State0),
- Effects1),
- {append_to_master_index(RaftIdx, State), Effects, ok};
+ %% need to checkout before capturing the shadow copy else
+ %% snapshots may not be complete
+ {State, ok, Effects} = checkout(
+ add_bytes_enqueue(RawMsg, State0),
+ Effects1),
+ append_to_master_index(RaftIdx, Effects, State);
{duplicate, State, Effects} ->
- {State, Effects, ok}
+ {State, ok, lists:reverse(Effects)}
end;
apply(#{index := RaftIdx},
- #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0,
+ #settle{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0} ->
% need to increment metrics before completing as any snapshot
% states taken need to includ them
complete_and_checkout(RaftIdx, MsgIds, ConsumerId,
- Con0, Effects0, State);
+ Con0, [], State);
_ ->
- {State, Effects0, ok}
+ {State, ok}
+
end;
apply(#{index := RaftIdx}, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
- Effects0, #state{consumers = Cons0} = State0) ->
+ #state{consumers = Cons0} = State0) ->
case Cons0 of
#{ConsumerId := Con0} ->
- {State, Effects, Res} = complete_and_checkout(RaftIdx, MsgIds,
- ConsumerId, Con0,
- Effects0, State0),
Discarded = maps:with(MsgIds, Con0#consumer.checked_out),
- {State, dead_letter_effects(Discarded, State, Effects), Res};
+ Effects = dead_letter_effects(Discarded, State0, []),
+ complete_and_checkout(RaftIdx, MsgIds, ConsumerId, Con0,
+ Effects, State0);
_ ->
- {State0, Effects0, ok}
+ {State0, ok}
end;
-apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0,
+apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
Checked = maps:without(MsgIds, Checked0),
Returned = maps:with(MsgIds, Checked0),
MsgNumMsgs = maps:values(Returned),
- return(ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, State);
+ return(ConsumerId, MsgNumMsgs, Con0, Checked, [], State);
_ ->
- {State, Effects0, ok}
+ {State, ok}
end;
apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
- drain = Drain, consumer_id = ConsumerId}, Effects0,
+ drain = Drain, consumer_id = ConsumerId},
#state{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
case Cons0 of
@@ -344,16 +348,16 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
ServiceQueue = maybe_queue_consumer(ConsumerId, Con1,
ServiceQueue0),
Cons = maps:put(ConsumerId, Con1, Cons0),
- {State1, Effects, ok} =
+ {State1, ok, Effects} =
checkout(State0#state{service_queue = ServiceQueue,
- consumers = Cons}, Effects0),
+ consumers = Cons}, []),
Response = {send_credit_reply, maps:size(State1#state.messages)},
%% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain
case Drain of
false ->
%% just return the result of the checkout
- {State1, Effects, Response};
+ {State1, Response, Effects};
true ->
Con = #consumer{credit = PostCred} =
maps:get(ConsumerId, State1#state.consumers),
@@ -366,108 +370,116 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
Drained = Con#consumer.credit,
{CTag, _} = ConsumerId,
{State1#state{consumers = Consumers},
- Effects,
%% returning a multi response with two client actions
%% for the channel to execute
- {multi, [Response, {send_drained, [{CTag, Drained}]}]}}
+ {multi, [Response, {send_drained, [{CTag, Drained}]}]},
+ Effects}
end;
_ ->
%% credit for unknown consumer - just ignore
- {State0, Effects0, ok}
+ {State0, ok}
end;
-apply(_, #checkout{spec = {dequeue, _}}, Effects0,
+apply(_, #checkout{spec = {dequeue, _}},
#state{messages = M,
prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 ->
%% FIXME: also check if there are returned messages
%% TODO do we need metric visibility of empty get requests?
- {State0, Effects0, {dequeue, empty}};
+ {State0, {dequeue, empty}};
apply(Meta, #checkout{spec = {dequeue, settled}, meta = ConsumerMeta,
consumer_id = ConsumerId},
- Effects0, State0) ->
+ State0) ->
% TODO: this clause could probably be optimised
State1 = update_consumer(ConsumerId, ConsumerMeta,
{once, 1, simple_prefetch}, State0),
% turn send msg effect into reply
{success, _, MsgId, Msg, State2} = checkout_one(State1),
% immediately settle
- {State, Effects, _} = apply(Meta, make_settle(ConsumerId, [MsgId]),
- Effects0, State2),
- {State, Effects, {dequeue, {MsgId, Msg}}};
+ {State, _, Effects} = apply(Meta, make_settle(ConsumerId, [MsgId]), State2),
+ {State, {dequeue, {MsgId, Msg}}, Effects};
apply(_, #checkout{spec = {dequeue, unsettled},
meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId},
- Effects0, State0) ->
+ State0) ->
State1 = update_consumer(ConsumerId, ConsumerMeta,
{once, 1, simple_prefetch}, State0),
- Effects1 = [{monitor, process, Pid} | Effects0],
- {State, Reply, Effects} = case checkout_one(State1) of
- {success, _, MsgId, Msg, S} ->
- {S, {MsgId, Msg}, Effects1};
- {inactive, S} ->
- {S, empty, [{aux, inactive} | Effects1]};
- S ->
- {S, empty, Effects1}
- end,
- {State, Effects, {dequeue, Reply}};
-apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, Effects0, State0) ->
- {CancelEffects, State1} = cancel_consumer(ConsumerId, {Effects0, State0}),
+ case checkout_one(State1) of
+ {success, _, MsgId, Msg, S} ->
+ {S, {dequeue, {MsgId, Msg}}, [{monitor, process, Pid}]};
+ {inactive, S} ->
+ {S, {dequeue, empty}, [{aux, inactive}]};
+ S ->
+ {S, {dequeue, empty}}
+ end;
+apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
+ {CancelEffects, State1} = cancel_consumer(ConsumerId, {[], State0}),
% TODO: here we should really demonitor the pid but _only_ if it has no
% other consumers or enqueuers.
checkout(State1, CancelEffects);
-apply(_, #checkout{spec = Spec, meta = Meta, consumer_id = {_, Pid} = ConsumerId},
- Effects0, State0) ->
+apply(_, #checkout{spec = Spec, meta = Meta,
+ consumer_id = {_, Pid} = ConsumerId},
+ State0) ->
State1 = update_consumer(ConsumerId, Meta, Spec, State0),
- {State, Effects, Res} = checkout(State1, Effects0),
- {State, [{monitor, process, Pid} | Effects], Res};
-apply(#{index := RaftIdx}, #purge{}, Effects0,
+ checkout(State1, [{monitor, process, Pid}]);
+apply(#{index := RaftIdx}, #purge{},
#state{consumers = Cons0, ra_indexes = Indexes } = State0) ->
Total = rabbit_fifo_index:size(Indexes),
- {State1, Effects1, _} =
+ {State1, Effects1} =
maps:fold(
fun(ConsumerId, C = #consumer{checked_out = Checked0},
- {StateAcc0, EffectsAcc0, ok}) ->
+ {StateAcc0, EffectsAcc0}) ->
MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}}
<- maps:values(Checked0)],
complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C,
#{}, EffectsAcc0, StateAcc0)
- end, {State0, Effects0, ok}, Cons0),
- {State, Effects, _} =
+ end, {State0, []}, Cons0),
+ {State, _, Effects} =
update_smallest_raft_index(
RaftIdx, Indexes,
State1#state{ra_indexes = rabbit_fifo_index:empty(),
messages = #{},
returns = lqueue:new(),
+ msg_bytes_enqueue = 0,
+ msg_bytes_checkout = 0,
low_msg_num = undefined}, Effects1),
- {State, [garbage_collection | Effects], {purge, Total}};
+ %% as we're not checking out after a purge (no point) we have to
+ %% reverse the effects ourselves
+ {State, {purge, Total},
+ lists:reverse([garbage_collection | Effects])};
apply(_, {down, ConsumerPid, noconnection},
- Effects0, #state{consumers = Cons0,
+ #state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
Node = node(ConsumerPid),
- % mark all consumers and enqueuers as suspect
- % and monitor the node
- {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C,
- {Co, St0}) when node(P) =:= Node ->
- St = return_all(St0, Checked0),
- {maps:put(K, C#consumer{suspected_down = true,
- checked_out = #{}},
- Co),
- St};
- (K, C, {Co, St}) ->
- {maps:put(K, C, Co), St}
- end, {#{}, State0}, Cons0),
+ % mark all consumers and enqueuers as suspected down
+ % and monitor the node so that we can find out the final state of the
+ % process at some later point
+ {Cons, State} = maps:fold(
+ fun({_, P} = K,
+ #consumer{checked_out = Checked0} = C,
+ {Co, St0}) when node(P) =:= Node ->
+ St = return_all(St0, Checked0),
+ %% TODO: need to increment credit here
+ %% with the size of the Checked map
+ Credit = increase_credit(C, maps:size(Checked0)),
+ {maps:put(K, C#consumer{suspected_down = true,
+ credit = Credit,
+ checked_out = #{}}, Co),
+ St};
+ (K, C, {Co, St}) ->
+ {maps:put(K, C, Co), St}
+ end, {#{}, State0}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = true};
(_, E) -> E
end, Enqs0),
Effects = case maps:size(Cons) of
0 ->
- [{aux, inactive}, {monitor, node, Node} | Effects0];
+ [{aux, inactive}, {monitor, node, Node}];
_ ->
- [{monitor, node, Node} | Effects0]
+ [{monitor, node, Node}]
end,
- {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
-apply(_, {down, Pid, _Info}, Effects0,
- #state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ %% TODO: should we run a checkout here?
+ {State#state{consumers = Cons, enqueuers = Enqs}, ok, Effects};
+apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
State1 = case maps:take(Pid, Enqs0) of
@@ -482,13 +494,12 @@ apply(_, {down, Pid, _Info}, Effects0,
% Find the consumers for the down pid
DownConsumers = maps:keys(
maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
- {Effects1, State2} = lists:foldl(fun cancel_consumer/2, {Effects0, State1},
+ {Effects1, State2} = lists:foldl(fun cancel_consumer/2, {[], State1},
DownConsumers),
checkout(State2, Effects1);
-apply(_, {nodeup, Node}, Effects0,
- #state{consumers = Cons0,
- enqueuers = Enqs0,
- service_queue = SQ0} = State0) ->
+apply(_, {nodeup, Node}, #state{consumers = Cons0,
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -516,14 +527,14 @@ apply(_, {nodeup, Node}, Effects0,
CAcc, SQAcc, EAcc);
(_, _, Acc) ->
Acc
- end, {Cons0, SQ0, Effects0}, Cons0),
+ end, {Cons0, SQ0, Monitors}, Cons0),
% TODO: avoid list concat
checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
- service_queue = SQ}, Monitors ++ Effects);
-apply(_, {nodedown, _Node}, Effects, State) ->
- {State, Effects, ok};
-apply(_, #update_state{config = Conf}, Effects, State) ->
- {update_state(Conf, State), Effects, ok}.
+ service_queue = SQ}, Effects);
+apply(_, {nodedown, _Node}, State) ->
+ {State, ok};
+apply(_, #update_config{config = Conf}, State) ->
+ {update_config(Conf, State), ok}.
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Cons,
@@ -594,7 +605,9 @@ overview(#state{consumers = Cons,
get_checked_out(Cid, From, To, #state{consumers = Consumers}) ->
case Consumers of
#{Cid := #consumer{checked_out = Checked}} ->
- [{K, snd(snd(maps:get(K, Checked)))} || K <- lists:seq(From, To)];
+ [{K, snd(snd(maps:get(K, Checked)))}
+ || K <- lists:seq(From, To),
+ maps:is_key(K, Checked)];
_ ->
[]
end.
@@ -756,14 +769,6 @@ cancel_consumer0(ConsumerId,
{Effects0, S0}
end.
-incr_enqueue_count(#state{enqueue_count = C,
- shadow_copy_interval = C} = State0) ->
- % time to stash a dehydrated state version
- State = State0#state{enqueue_count = 0},
- {State, dehydrate_state(State)};
-incr_enqueue_count(#state{enqueue_count = C} = State) ->
- {State#state{enqueue_count = C + 1}, undefined}.
-
enqueue(RaftIdx, RawMsg, #state{messages = Messages,
low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
@@ -774,11 +779,20 @@ enqueue(RaftIdx, RawMsg, #state{messages = Messages,
low_msg_num = min(LowMsgNum, NextMsgNum),
next_msg_num = NextMsgNum + 1}.
-append_to_master_index(RaftIdx,
+append_to_master_index(RaftIdx, Effects,
#state{ra_indexes = Indexes0} = State0) ->
{State, Shadow} = incr_enqueue_count(State0),
Indexes = rabbit_fifo_index:append(RaftIdx, Shadow, Indexes0),
- State#state{ra_indexes = Indexes}.
+ {State#state{ra_indexes = Indexes}, ok, Effects}.
+
+incr_enqueue_count(#state{enqueue_count = C,
+ shadow_copy_interval = C} = State0) ->
+ % time to stash a dehydrated state version
+ State = State0#state{enqueue_count = 0},
+ {State, dehydrate_state(State)};
+incr_enqueue_count(#state{enqueue_count = C} = State) ->
+ {State#state{enqueue_count = C + 1}, undefined}.
+
enqueue_pending(From,
#enqueuer{next_seqno = Next,
@@ -824,16 +838,10 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
snd(T) ->
element(2, T).
-return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
+return(ConsumerId, MsgNumMsgs, Con0, Checked,
Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) ->
- Con = case Life of
- auto ->
- Num = length(MsgNumMsgs),
- Con0#consumer{checked_out = Checked,
- credit = increase_credit(Con0, Num)};
- once ->
- Con0#consumer{checked_out = Checked}
- end,
+ Con = Con0#consumer{checked_out = Checked,
+ credit = increase_credit(Con0, length(MsgNumMsgs))},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) ->
@@ -859,7 +867,7 @@ complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs),
{State0#state{consumers = Cons,
ra_indexes = Indexes,
- service_queue = SQ}, Effects, ok}.
+ service_queue = SQ}, Effects}.
increase_credit(#consumer{lifetime = once,
credit = Credit}, _) ->
@@ -886,10 +894,10 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
end, State0, maps:values(Discarded)),
%% need to pass the length of discarded as $prefix_msgs would be filtered
%% by the above list comprehension
- {State2, Effects1, _} = complete(ConsumerId, MsgRaftIdxs,
- maps:size(Discarded),
- Con0, Checked, Effects0, State1),
- {State, Effects, _} = checkout(State2, Effects1),
+ {State2, Effects1} = complete(ConsumerId, MsgRaftIdxs,
+ maps:size(Discarded),
+ Con0, Checked, Effects0, State1),
+ {State, ok, Effects} = checkout(State2, Effects1),
% settle metrics are incremented separately
update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects).
@@ -918,7 +926,7 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
% there are no messages on queue anymore and no pending enqueues
% we can forward release_cursor all the way until
% the last received command
- {State, [{release_cursor, IncomingRaftIdx, State} | Effects], ok};
+ {State, ok, [{release_cursor, IncomingRaftIdx, State} | Effects]};
_ ->
NewSmallest = rabbit_fifo_index:smallest(Indexes),
% Take the smallest raft index available in the index when starting
@@ -927,15 +935,15 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
{{Smallest, _}, {Smallest, _}} ->
% smallest has not changed, do not issue release cursor
% effects
- {State, Effects, ok};
+ {State, ok, Effects};
{_, {Smallest, Shadow}} when Shadow =/= undefined ->
% ?INFO("RELEASE ~w ~w ~w~n", [IncomingRaftIdx, Smallest,
% Shadow]),
- {State, [{release_cursor, Smallest, Shadow} | Effects], ok};
+ {State, ok, [{release_cursor, Smallest, Shadow}]};
_ -> % smallest
% no shadow taken for this index,
% no release cursor increase
- {State, Effects, ok}
+ {State, ok, Effects}
end
end.
@@ -955,13 +963,18 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
State0#state{messages = maps:put(MsgNum, Msg, Messages),
returns = lqueue:in(MsgNum, Returns)}).
-return_all(State, Checked) ->
- maps:fold(fun (_, '$prefix_msg', S) ->
- return_one(0, '$prefix_msg', S);
- (_, {MsgNum, Msg}, S) ->
- return_one(MsgNum, Msg, S)
- end, State, Checked).
-
+return_all(State, Checked0) ->
+ %% need to sort the list so that we return messages in the order
+ %% they were checked out
+ Checked = lists:sort(maps:to_list(Checked0)),
+ lists:foldl(fun ({_, '$prefix_msg'}, S) ->
+ return_one(0, '$prefix_msg', S);
+ ({_, {MsgNum, Msg}}, S) ->
+ return_one(MsgNum, Msg, S)
+ end, State, Checked).
+
+%% checkout new messages to consumers
+%% reverses the effects list
checkout(State, Effects) ->
checkout0(checkout_one(State), Effects, #{}).
@@ -973,10 +986,10 @@ checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) ->
checkout0(checkout_one(State), Effects, Acc);
checkout0({inactive, State}, Effects0, Acc) ->
Effects = append_send_msg_effects(Effects0, Acc),
- {State, [{aux, inactive} | Effects], ok};
+ {State, ok, lists:reverse([{aux, inactive} | Effects])};
checkout0(State, Effects0, Acc) ->
Effects = append_send_msg_effects(Effects0, Acc),
- {State, Effects, ok}.
+ {State, ok, lists:reverse(Effects)}.
append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
Effects;
@@ -1244,9 +1257,9 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
-spec make_purge() -> protocol().
make_purge() -> #purge{}.
--spec make_update_state(config()) -> protocol().
-make_update_state(Config) ->
- #update_state{config = Config}.
+-spec make_update_config(config()) -> protocol().
+make_update_config(Config) ->
+ #update_config{config = Config}.
add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
Bytes = message_size(Msg),
@@ -1308,10 +1321,10 @@ enq_enq_checkout_test() ->
Cid = {<<"enq_enq_checkout_test">>, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
- {_State3, Effects, _} =
+ {_State3, _, Effects} =
apply(meta(3),
make_checkout(Cid, {once, 2, simple_prefetch}, #{}),
- [], State2),
+ State2),
?ASSERT_EFF({monitor, _, _}, Effects),
?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects),
ok.
@@ -1320,9 +1333,8 @@ credit_enq_enq_checkout_settled_credit_test() ->
Cid = {?FUNCTION_NAME, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
- {State3, Effects, _} =
- apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}),
- [], State2),
+ {State3, _, Effects} =
+ apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}), State2),
?ASSERT_EFF({monitor, _, _}, Effects),
Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true;
(_) -> false
@@ -1353,12 +1365,12 @@ credit_with_drained_test() ->
%% checkout with a single credit
{State1, _, _} =
apply(meta(1), make_checkout(Cid, {auto, 1, credited},#{}),
- [], State0),
+ State0),
?assertMatch(#state{consumers = #{Cid := #consumer{credit = 1,
delivery_count = 0}}},
State1),
- {State, _Effs, Result} =
- apply(meta(3), make_credit(Cid, 0, 5, true), [], State1),
+ {State, Result, _} =
+ apply(meta(3), make_credit(Cid, 0, 5, true), State1),
?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0,
delivery_count = 5}}},
State),
@@ -1372,14 +1384,14 @@ credit_and_drain_test() ->
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
%% checkout without any initial credit (like AMQP 1.0 would)
- {State3, CheckEffs, _} =
+ {State3, _, CheckEffs} =
apply(meta(3), make_checkout(Cid, {auto, 0, credited}, #{}),
- [], State2),
+ State2),
?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs),
- {State4, Effects, {multi, [{send_credit_reply, 0},
- {send_drained, [{?FUNCTION_NAME, 2}]}]}} =
- apply(meta(4), make_credit(Cid, 4, 0, true), [], State3),
+ {State4, {multi, [{send_credit_reply, 0},
+ {send_drained, [{?FUNCTION_NAME, 2}]}]},
+ Effects} = apply(meta(4), make_credit(Cid, 4, 0, true), State3),
?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0,
delivery_count = 4}}},
State4),
@@ -1397,9 +1409,9 @@ enq_enq_deq_test() ->
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
% get returns a reply value
- {_State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} =
+ {_State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} =
apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
- [], State2),
+ State2),
ok.
enq_enq_deq_deq_settle_test() ->
@@ -1407,39 +1419,38 @@ enq_enq_deq_deq_settle_test() ->
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
% get returns a reply value
- {State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} =
+ {State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} =
apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
- [], State2),
- {_State4, _Effects4, {dequeue, empty}} =
+ State2),
+ {_State4, {dequeue, empty}, _} =
apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}),
- [], State3),
+ State3),
ok.
enq_enq_checkout_get_settled_test() ->
Cid = {?FUNCTION_NAME, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
% get returns a reply value
- {_State2, _Effects, {dequeue, {0, {_, first}}}} =
+ {_State2, {dequeue, {0, {_, first}}}, _Effs} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
- [], State1),
+ State1),
ok.
checkout_get_empty_test() ->
Cid = {?FUNCTION_NAME, self()},
State = test_init(test),
- {_State2, [], {dequeue, empty}} =
- apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}),
- [], State),
+ {_State2, {dequeue, empty}} =
+ apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}), State),
ok.
untracked_enq_deq_test() ->
Cid = {?FUNCTION_NAME, self()},
State0 = test_init(test),
{State1, _, _} = apply(meta(1),
- make_enqueue(undefined, undefined, first), [], State0),
- {_State2, _, {dequeue, {0, {_, first}}}} =
- apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
- [], State1),
+ make_enqueue(undefined, undefined, first),
+ State0),
+ {_State2, {dequeue, {0, {_, first}}}, _} =
+ apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1),
ok.
release_cursor_test() ->
@@ -1512,18 +1523,18 @@ return_non_existent_test() ->
Cid = {<<"cid">>, self()},
{State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)),
% return non-existent
- {_State2, [], _} = apply(meta(3), make_return(Cid, [99]), [], State0),
+ {_State2, _} = apply(meta(3), make_return(Cid, [99]), State0),
ok.
return_checked_out_test() ->
Cid = {<<"cid">>, self()},
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
- {State1, [_Monitor, {aux, active},
- {send_msg, _, {delivery, _, [{MsgId, _}]}, _}]} =
- check(Cid, 2, State0),
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
+ {aux, active}
+ ]} = check(Cid, 2, State0),
% return
- {_State2, [_, _], _} = apply(meta(3), make_return(Cid, [MsgId]),
- [], State1),
+ {_State2, _, [_, _]} = apply(meta(3), make_return(Cid, [MsgId]), State1),
ok.
return_auto_checked_out_test() ->
@@ -1532,11 +1543,13 @@ return_auto_checked_out_test() ->
{State0, [_]} = enq(2, 2, second, State00),
% it first active then inactive as the consumer took on but cannot take
% any more
- {State1, [_Monitor, {aux, inactive}, {aux, active},
- {send_msg, _, {delivery, _, [{MsgId, _}]}, _} | _]} =
- check_auto(Cid, 2, State0),
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
+ {aux, active},
+ {aux, inactive}
+ ]} = check_auto(Cid, 2, State0),
% return should include another delivery
- {_State2, Effects, _} = apply(meta(3), make_return(Cid, [MsgId]), [], State1),
+ {_State2, _, Effects} = apply(meta(3), make_return(Cid, [MsgId]), State1),
?ASSERT_EFF({send_msg, _,
{delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _},
Effects),
@@ -1549,22 +1562,21 @@ cancelled_checkout_out_test() ->
{State0, [_]} = enq(2, 2, second, State00),
{State1, _} = check_auto(Cid, 2, State0),
% cancelled checkout should return all pending messages to queue
- {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}),
- [], State1),
+ {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1),
?assertEqual(2, maps:size(State2#state.messages)),
- {State3, _, {dequeue, {0, {_, first}}}} =
- apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), [], State2),
+ {State3, {dequeue, {0, {_, first}}}, _} =
+ apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
?debugFmt("State3 ~p", [State3]),
- {_State, _, {dequeue, {_, {_, second}}}} =
- apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), [], State3),
+ {_State, {dequeue, {_, {_, second}}}, _} =
+ apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3),
ok.
down_with_noproc_consumer_returns_unsettled_test() ->
Cid = {<<"down_consumer_returns_unsettled_test">>, self()},
{State0, [_, _]} = enq(1, 1, second, test_init(test)),
{State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0),
- {State2, _, _} = apply(meta(3), {down, Pid, noproc}, [], State1),
+ {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1),
{_State, Effects} = check(Cid, 4, State2),
?ASSERT_EFF({monitor, process, _}, Effects),
ok.
@@ -1576,17 +1588,20 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
Node = node(Pid),
{State0, Effects0} = enq(1, 1, second, test_init(test)),
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0),
- {State1, Effects1} = check(Cid, 2, State0),
+ {State1, Effects1} = check_auto(Cid, 2, State0),
+ #consumer{credit = 0} = maps:get(Cid, State1#state.consumers),
?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1),
% monitor both enqueuer and consumer
% because we received a noconnection we now need to monitor the node
- {State2a, _Effects2a, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
- {State2, Effects2, _} = apply(meta(3), {down, Self, noconnection}, [], State2a),
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
+ #consumer{credit = 1} = maps:get(Cid, State2a#state.consumers),
+ %% validate consumer has credit
+ {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a),
?ASSERT_EFF({monitor, node, _}, Effects2),
?assertNoEffect({demonitor, process, _}, Effects2),
% when the node comes up we need to retry the process monitors for the
% disconnected processes
- {_State3, Effects3, _} = apply(meta(3), {nodeup, Node}, [], State2),
+ {_State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2),
% try to re-monitor the suspect processes
?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3),
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
@@ -1601,7 +1616,7 @@ down_with_noconnection_returns_unack_test() ->
{State1, {_, _}} = deq(2, Cid, unsettled, State0),
?assertEqual(0, maps:size(State1#state.messages)),
?assertEqual(0, lqueue:len(State1#state.returns)),
- {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
?assertEqual(1, maps:size(State2a#state.messages)),
?assertEqual(1, lqueue:len(State2a#state.returns)),
ok.
@@ -1609,9 +1624,9 @@ down_with_noconnection_returns_unack_test() ->
down_with_noproc_enqueuer_is_cleaned_up_test() ->
State00 = test_init(test),
Pid = spawn(fun() -> ok end),
- {State0, Effects0, _} = apply(meta(1), {enqueue, Pid, 1, first}, [], State00),
+ {State0, _, Effects0} = apply(meta(1), {enqueue, Pid, 1, first}, State00),
?ASSERT_EFF({monitor, process, _}, Effects0),
- {State1, _Effects1, _} = apply(meta(3), {down, Pid, noproc}, [], State0),
+ {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0),
% ensure there are no enqueuers
?assert(0 =:= maps:size(State1#state.enqueuers)),
ok.
@@ -1633,7 +1648,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() ->
?ASSERT_EFF({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects1),
- {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]), [], State1),
+ {_State2, _, Effects2} = apply(meta(1), make_discard(Cid, [0]), State1),
?assertNoEffect({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects2),
@@ -1650,8 +1665,7 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
?ASSERT_EFF({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects1),
- {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]),
- [], State1),
+ {_State2, _, Effects2} = apply(meta(1), make_discard(Cid, [0]), State1),
% assert mod call effect with appended reason and message
?ASSERT_EFF({mod_call, somemod, somefun, [somearg, [{rejected, first}]]},
Effects2),
@@ -1664,7 +1678,7 @@ tick_test() ->
{S1, _} = enq(2, 2, <<"snd">>, S0),
{S2, {MsgId, _}} = deq(3, Cid, unsettled, S1),
{S3, {_, _}} = deq(4, Cid2, unsettled, S2),
- {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), [], S3),
+ {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), S3),
[{mod_call, _, _,
[#resource{},
@@ -1840,8 +1854,8 @@ run_snapshot_test0(Name, Commands) ->
?debugFmt("running from snapshot: ~b", [SnapIdx]),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
- % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n",
- % [Name, SnapIdx, S, State, SnapState, Filtered]),
+ ?debugFmt("Name ~p~nS~p~nState~p~nn",
+ [Name, S, State]),
?assertEqual(State, S)
end || {release_cursor, SnapIdx, SnapState} <- Effects],
ok.
@@ -1874,10 +1888,9 @@ pending_enqueue_is_enqueued_on_down_test() ->
Cid = {<<"cid">>, self()},
Pid = self(),
{State0, _} = enq(1, 2, first, test_init(test)),
- {State1, _, _} = apply(meta(2), {down, Pid, noproc}, [], State0),
- {_State2, _, {dequeue, {0, {_, first}}}} =
- apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
- [], State1),
+ {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0),
+ {_State2, {dequeue, {0, {_, first}}}, _} =
+ apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1),
ok.
duplicate_delivery_test() ->
@@ -1922,76 +1935,105 @@ state_enter_montors_and_notifications_test() ->
purge_test() ->
Cid = {<<"purge_test">>, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _, {purge, 1}} = apply(meta(2), make_purge(), [], State1),
+ {State2, {purge, 1}, _} = apply(meta(2), make_purge(), State1),
{State3, _} = enq(3, 2, second, State2),
% get returns a reply value
- {_State4, [{monitor, _, _}], {dequeue, {0, {_, second}}}} =
- apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), [], State3),
+ {_State4, {dequeue, {0, {_, second}}}, [{monitor, _, _}]} =
+ apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), State3),
ok.
purge_with_checkout_test() ->
Cid = {<<"purge_test">>, self()},
{State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)),
- {State1, _} = enq(2, 1, first, State0),
- {State2, _} = enq(3, 2, second, State1),
- {State3, _, {purge, 2}} = apply(meta(2), make_purge(), [], State2),
+ {State1, _} = enq(2, 1, <<"first">>, State0),
+ {State2, _} = enq(3, 2, <<"second">>, State1),
+ %% assert message bytes are non zero
+ ?assert(State2#state.msg_bytes_checkout > 0),
+ ?assert(State2#state.msg_bytes_enqueue > 0),
+ {State3, {purge, 2}, _} = apply(meta(2), make_purge(), State2),
+ ?assertEqual(0, State3#state.msg_bytes_checkout),
+ ?assertEqual(0, State3#state.msg_bytes_enqueue),
#consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers),
?assertEqual(0, maps:size(Checked)),
ok.
+down_returns_checked_out_in_order_test() ->
+ S0 = test_init(?FUNCTION_NAME),
+ %% enqueue 100
+ S1 = lists:foldl(fun (Num, FS0) ->
+ {FS, _} = enq(Num, Num, Num, FS0),
+ FS
+ end, S0, lists:seq(1, 100)),
+ ?assertEqual(100, maps:size(S1#state.messages)),
+ Cid = {<<"cid">>, self()},
+ {S2, _} = check(Cid, 101, 1000, S1),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#state.consumers),
+ ?assertEqual(100, maps:size(Checked)),
+ %% simulate down
+ {S, _, _} = apply(meta(102), {down, self(), noproc}, S2),
+ Returns = lqueue:to_list(S#state.returns),
+ ?assertEqual(100, length(Returns)),
+ %% validate returns are in order
+ ?assertEqual(lists:sort(Returns), Returns),
+ ok.
+
meta(Idx) ->
#{index => Idx, term => 1}.
enq(Idx, MsgSeq, Msg, State) ->
strip_reply(
- apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), [], State)).
+ apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), State)).
deq(Idx, Cid, Settlement, State0) ->
- {State, _, {dequeue, Msg}} =
+ {State, {dequeue, Msg}, _} =
apply(meta(Idx),
- make_checkout(Cid, {dequeue, Settlement}, #{}),
- [], State0),
+ make_checkout(Cid, {dequeue, Settlement}, #{}),
+ State0),
{State, Msg}.
check_n(Cid, Idx, N, State) ->
strip_reply(
apply(meta(Idx),
make_checkout(Cid, {auto, N, simple_prefetch}, #{}),
- [], State)).
+ State)).
check(Cid, Idx, State) ->
strip_reply(
apply(meta(Idx),
make_checkout(Cid, {once, 1, simple_prefetch}, #{}),
- [], State)).
+ State)).
check_auto(Cid, Idx, State) ->
strip_reply(
apply(meta(Idx),
make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
- [], State)).
+ State)).
check(Cid, Idx, Num, State) ->
strip_reply(
apply(meta(Idx),
- make_checkout(Cid, {once, Num, simple_prefetch}, #{}),
- [], State)).
+ make_checkout(Cid, {auto, Num, simple_prefetch}, #{}),
+ State)).
settle(Cid, Idx, MsgId, State) ->
- strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), [], State)).
+ strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), State)).
credit(Cid, Idx, Credit, DelCnt, Drain, State) ->
strip_reply(apply(meta(Idx), make_credit(Cid, Credit, DelCnt, Drain),
- [], State)).
+ State)).
-strip_reply({State, Effects, _Reply}) ->
+strip_reply({State, _, Effects}) ->
{State, Effects}.
run_log(InitState, Entries) ->
lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) ->
- case apply(meta(Idx), E, Efx0, Acc0) of
- {Acc, Efx, _} ->
- {Acc, Efx}
+ case apply(meta(Idx), E, Acc0) of
+ {Acc, _, Efx} when is_list(Efx) ->
+ {Acc, Efx0 ++ Efx};
+ {Acc, _, Efx} ->
+ {Acc, Efx0 ++ [Efx]};
+ {Acc, _} ->
+ {Acc, Efx0}
end
end, {InitState, []}, Entries).
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 9cdb1dfbe7..955c0e4d9d 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -52,10 +52,12 @@
{rabbit_fifo:consumer_tag(), non_neg_integer()}}.
-type actions() :: [action()].
+-type cluster_name() :: rabbit_types:r(queue).
+
-record(consumer, {last_msg_id :: seq() | -1,
delivery_count = 0 :: non_neg_integer()}).
--record(state, {cluster_name :: ra_cluster_name(),
+-record(state, {cluster_name :: cluster_name(),
servers = [] :: [ra_server_id()],
leader :: maybe(ra_server_id()),
next_seq = 0 :: seq(),
@@ -88,7 +90,7 @@
%% @param ClusterName the id of the cluster to interact with
%% @param Servers The known servers of the queue. If the current leader is known
%% ensure the leader node is at the head of the list.
--spec init(ra_cluster_name(), [ra_server_id()]) -> state().
+-spec init(cluster_name(), [ra_server_id()]) -> state().
init(ClusterName, Servers) ->
init(ClusterName, Servers, ?SOFT_LIMIT).
@@ -98,7 +100,7 @@ init(ClusterName, Servers) ->
%% @param Servers The known servers of the queue. If the current leader is known
%% ensure the leader node is at the head of the list.
%% @param MaxPending size defining the max number of pending commands.
--spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer()) -> state().
+-spec init(cluster_name(), [ra_server_id()], non_neg_integer()) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
#state{cluster_name = ClusterName,
@@ -106,7 +108,7 @@ init(ClusterName = #resource{}, Servers, SoftLimit) ->
soft_limit = SoftLimit,
timeout = Timeout}.
--spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok),
+-spec init(cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok),
fun(() -> ok)) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
@@ -397,12 +399,12 @@ purge(Node) ->
end.
%% @doc returns the cluster name
--spec cluster_name(state()) -> ra_cluster_name().
+-spec cluster_name(state()) -> cluster_name().
cluster_name(#state{cluster_name = ClusterName}) ->
ClusterName.
update_machine_state(Node, Conf) ->
- case ra:process_command(Node, rabbit_fifo:make_update_state(Conf)) of
+ case ra:process_command(Node, rabbit_fifo:make_update_config(Conf)) of
{ok, ok, _} ->
ok;
Err ->
@@ -620,11 +622,18 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
CDels0)}};
#consumer{last_msg_id = Prev} = C
when FstId > Prev+1 ->
+ NumMissing = FstId - Prev + 1,
+ %% there may actually be fewer missing messages returned than expected
+ %% This can happen when a node the channel is on gets disconnected
+ %% from the node the leader is on and then reconnected afterwards.
+ %% When the node is disconnected the leader will return all checked
+ %% out messages to the main queue to ensure they don't get stuck in
+ %% case the node never comes back.
Missing = get_missing_deliveries(Leader, Prev+1, FstId-1, Tag),
Del = {delivery, Tag, Missing ++ IdMsgs},
{Del, State0#state{consumer_deliveries =
update_consumer(Tag, LastId,
- length(IdMsgs) + length(Missing),
+ length(IdMsgs) + NumMissing,
C, CDels0)}};
#consumer{last_msg_id = Prev}
when FstId =< Prev ->
@@ -714,7 +723,11 @@ resend_command(Node, Correlation, Command,
ok = ra:pipeline_command(Node, Command, Seq),
State#state{pending = Pending#{Seq => {Correlation, Command}}}.
-add_command(_Cid, _Tag, [], Acc) ->
+add_command(_, _, [], Acc) ->
Acc;
-add_command(Cid, Tag, MsgIds, Acc) ->
- [{Tag, MsgIds, Cid} | Acc].
+add_command(Cid, settle, MsgIds, Acc) ->
+ [rabbit_fifo:make_settle(Cid, MsgIds) | Acc];
+add_command(Cid, return, MsgIds, Acc) ->
+ [rabbit_fifo:make_settle(Cid, MsgIds) | Acc];
+add_command(Cid, discard, MsgIds, Acc) ->
+ [rabbit_fifo:make_settle(Cid, MsgIds) | Acc].
diff --git a/src/rabbit_quorum_memory_manager.erl b/src/rabbit_quorum_memory_manager.erl
index 347f7f205e..f567561f31 100644
--- a/src/rabbit_quorum_memory_manager.erl
+++ b/src/rabbit_quorum_memory_manager.erl
@@ -15,7 +15,7 @@
%%
-module(rabbit_quorum_memory_manager).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 8672a01ce8..ce5b4b5b6b 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -55,10 +55,6 @@
{'ok', rabbit_fifo_client:state()}.
-spec reject(Confirm :: boolean(), rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
--spec basic_get(rabbit_types:amqqueue(), NoAck :: boolean(), rabbit_types:ctag(),
- rabbit_fifo_client:state()) ->
- {'ok', 'empty', rabbit_fifo_client:state()} |
- {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
-spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
-spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'.
@@ -83,6 +79,8 @@
open_files
]).
+-define(TICK_TIME, 1000). %% the ra server tick time
+
%%----------------------------------------------------------------------------
-spec init_state(ra_server_id(), rabbit_types:r('queue')) ->
@@ -148,9 +146,11 @@ declare(#amqqueue{name = QName,
ra_machine(Q) ->
{module, rabbit_fifo, ra_machine_config(Q)}.
-ra_machine_config(Q = #amqqueue{name = QName}) ->
- #{dead_letter_handler => dlx_mfa(Q),
+ra_machine_config(Q = #amqqueue{name = QName,
+ pid = {Name, _}}) ->
+ #{name => Name,
queue_resource => QName,
+ dead_letter_handler => dlx_mfa(Q),
become_leader_handler => {?MODULE, become_leader, [QName]},
single_active_consumer_on => single_active_consumer_on(Q)}.
@@ -165,13 +165,15 @@ cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
case Node == node() of
true -> cancel_consumer(QName, ChPid, ConsumerTag);
false ->
+ %% this could potentially block for a while if the node is
+ %% in disconnected state or tcp buffers are full
rpc:cast(Node, rabbit_quorum_queue,
cancel_consumer,
[QName, ChPid, ConsumerTag])
end.
cancel_consumer(QName, ChPid, ConsumerTag) ->
- rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
+ catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
rabbit_event:notify(consumer_deleted,
[{consumer_tag, ConsumerTag},
{channel, ChPid},
@@ -193,7 +195,8 @@ become_leader(QName, Name) ->
end),
case rabbit_amqqueue:lookup(QName) of
{ok, #amqqueue{quorum_nodes = Nodes}} ->
- [rpc:call(Node, ?MODULE, rpc_delete_metrics, [QName])
+ [rpc:call(Node, ?MODULE, rpc_delete_metrics,
+ [QName], ?TICK_TIME)
|| Node <- Nodes, Node =/= node()];
_ ->
ok
@@ -206,22 +209,29 @@ rpc_delete_metrics(QName) ->
ok.
update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
- R = reductions(Name),
- rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
- Util = case C of
- 0 -> 0;
- _ -> rabbit_fifo:usage(Name)
- end,
- Infos = [{consumers, C}, {consumer_utilisation, Util},
- {message_bytes_ready, MsgBytesReady},
- {message_bytes_unacknowledged, MsgBytesUnack},
- {message_bytes, MsgBytesReady + MsgBytesUnack} | infos(QName)],
- rabbit_core_metrics:queue_stats(QName, Infos),
- rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
- {messages, M},
- {messages_ready, MR},
- {messages_unacknowledged, MU},
- {reductions, R}]).
+ %% this makes calls to remote processes so cannot be run inside the
+ %% ra server
+ _ = spawn(fun() ->
+ R = reductions(Name),
+ rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
+ Util = case C of
+ 0 -> 0;
+ _ -> rabbit_fifo:usage(Name)
+ end,
+ Infos = [{consumers, C}, {consumer_utilisation, Util},
+ {message_bytes_ready, MsgBytesReady},
+ {message_bytes_unacknowledged, MsgBytesUnack},
+ {message_bytes, MsgBytesReady + MsgBytesUnack}
+ | infos(QName)],
+ rabbit_core_metrics:queue_stats(QName, Infos),
+ rabbit_event:notify(queue_stats,
+ Infos ++ [{name, QName},
+ {messages, M},
+ {messages_ready, MR},
+ {messages_unacknowledged, MU},
+ {reductions, R}])
+ end),
+ ok.
reductions(Name) ->
try
@@ -276,7 +286,7 @@ stop(VHost) ->
_ = [ra:stop_server(Pid) || #amqqueue{pid = Pid} <- find_quorum_queues(VHost)],
ok.
--spec delete(rabbit_types:amqqueue(),
+-spec delete(#amqqueue{},
boolean(), boolean(),
rabbit_types:username()) ->
{ok, QLen :: non_neg_integer()}.
@@ -294,7 +304,8 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
{'DOWN', MRef, process, _, _} ->
ok
end,
- rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName]),
+ rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
+ ?TICK_TIME),
{ok, Msgs};
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
@@ -330,6 +341,10 @@ reject(false, CTag, MsgIds, QState) ->
credit(CTag, Credit, Drain, QState) ->
rabbit_fifo_client:credit(quorum_ctag(CTag), Credit, Drain, QState).
+-spec basic_get(#amqqueue{}, NoAck :: boolean(), rabbit_types:ctag(),
+ rabbit_fifo_client:state()) ->
+ {'ok', 'empty', rabbit_fifo_client:state()} |
+ {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
CTag0, QState0) ->
CTag = quorum_ctag(CTag0),
@@ -665,7 +680,7 @@ i(memory, #amqqueue{pid = {Name, _}}) ->
end;
i(state, #amqqueue{pid = {Name, Node}}) ->
%% Check against the leader or last known leader
- case rpc:call(Node, ?MODULE, cluster_state, [Name]) of
+ case rpc:call(Node, ?MODULE, cluster_state, [Name], ?TICK_TIME) of
{badrpc, _} -> down;
State -> State
end;
@@ -714,7 +729,7 @@ format(#amqqueue{quorum_nodes = Nodes} = Q) ->
[{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}].
is_process_alive(Name, Node) ->
- erlang:is_pid(rpc:call(Node, erlang, whereis, [Name])).
+ erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?TICK_TIME)).
quorum_messages(QName) ->
case ets:lookup(queue_coarse_metrics, QName) of