summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2020-09-15 13:51:32 +0100
committerGitHub <noreply@github.com>2020-09-15 13:51:32 +0100
commit48f38874d701e34727b4f482ea42bb0e3e82c460 (patch)
treea29bbd0db16ee8047118c9a160bda4b28d250d74
parentc2943570b94642122145facc12e3f37d5e6bc5c1 (diff)
parentddc2eacbe2690ec61dda3a8648f7acf5cec39a03 (diff)
downloadrabbitmq-server-git-48f38874d701e34727b4f482ea42bb0e3e82c460.tar.gz
Merge pull request #2407 from rabbitmq/qq-reject-publish
reject-publish strategy support for quorum queues
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/rabbit_fifo.erl677
-rw-r--r--src/rabbit_fifo.hrl45
-rw-r--r--src/rabbit_fifo_client.erl144
-rw-r--r--src/rabbit_fifo_v0.erl1961
-rw-r--r--src/rabbit_fifo_v0.hrl195
-rw-r--r--src/rabbit_quorum_queue.erl82
-rw-r--r--test/consumer_timeout_SUITE.erl2
-rw-r--r--test/dead_lettering_SUITE.erl2
-rw-r--r--test/dynamic_qq_SUITE.erl100
-rw-r--r--test/publisher_confirms_parallel_SUITE.erl10
-rw-r--r--test/queue_length_limits_SUITE.erl2
-rw-r--r--test/queue_parallel_SUITE.erl2
-rw-r--r--test/quorum_queue_SUITE.erl214
-rw-r--r--test/quorum_queue_utils.erl24
-rw-r--r--test/rabbit_fifo_SUITE.erl280
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl25
-rw-r--r--test/rabbit_fifo_v0_SUITE.erl1392
-rw-r--r--test/rabbitmq_queues_cli_integration_SUITE.erl9
19 files changed, 4672 insertions, 499 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 74d400950e..1755b4b8e2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1871,8 +1871,9 @@ handle_publishing_queue_down(QPid, Reason,
record_rejects(RejectMXs, State1)
end
end;
-handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
- error(quorum_queues_should_never_be_monitored).
+handle_publishing_queue_down(QPid, _Reason, State) when ?IS_QUORUM(QPid) ->
+ %% this should never happen after the queue type refactoring in 3.9
+ State.
handle_consuming_queue_down_or_eol(QRef,
State = #ch{queue_consumers = QCons,
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 0d0e37830a..7745789593 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -23,6 +23,9 @@
tick/2,
overview/1,
get_checked_out/4,
+ %% versioning
+ version/0,
+ which_module/1,
%% aux
init_aux/1,
handle_aux/6,
@@ -47,6 +50,7 @@
%% protocol helpers
make_enqueue/3,
+ make_register_enqueuer/1,
make_checkout/3,
make_settle/2,
make_return/2,
@@ -61,6 +65,7 @@
-record(enqueue, {pid :: option(pid()),
seq :: option(msg_seqno()),
msg :: raw_msg()}).
+-record(register_enqueuer, {pid :: pid()}).
-record(checkout, {consumer_id :: consumer_id(),
spec :: checkout_spec(),
meta :: consumer_meta()}).
@@ -80,6 +85,7 @@
-opaque protocol() ::
#enqueue{} |
+ #register_enqueuer{} |
#checkout{} |
#settle{} |
#return{} |
@@ -121,12 +127,14 @@ init(#{name := Name,
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
- SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
+ RCI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
+ Overflow = maps:get(overflow_strategy, Conf, drop_head),
MaxLength = maps:get(max_length, Conf, undefined),
MaxBytes = maps:get(max_bytes, Conf, undefined),
MaxMemoryLength = maps:get(max_in_memory_length, Conf, undefined),
MaxMemoryBytes = maps:get(max_in_memory_bytes, Conf, undefined),
DeliveryLimit = maps:get(delivery_limit, Conf, undefined),
+ Expires = maps:get(expires, Conf, undefined),
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
true ->
single_active;
@@ -134,24 +142,21 @@ update_config(Conf, State) ->
competing
end,
Cfg = State#?MODULE.cfg,
- SHICur = case State#?MODULE.cfg of
- #cfg{release_cursor_interval = {_, C}} ->
- C;
- #cfg{release_cursor_interval = undefined} ->
- SHI;
- #cfg{release_cursor_interval = C} ->
- C
- end,
+ RCISpec = {RCI, RCI},
- State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur},
+ LastActive = maps:get(created, Conf, undefined),
+ State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCISpec,
dead_letter_handler = DLH,
become_leader_handler = BLH,
+ overflow_strategy = Overflow,
max_length = MaxLength,
max_bytes = MaxBytes,
max_in_memory_length = MaxMemoryLength,
max_in_memory_bytes = MaxMemoryBytes,
consumer_strategy = ConsumerStrategy,
- delivery_limit = DeliveryLimit}}.
+ delivery_limit = DeliveryLimit,
+ expires = Expires},
+ last_active = LastActive}.
zero(_) ->
0.
@@ -161,9 +166,27 @@ zero(_) ->
-spec apply(ra_machine:command_meta_data(), command(), state()) ->
{state(), Reply :: term(), ra_machine:effects()} |
{state(), Reply :: term()}.
-apply(Metadata, #enqueue{pid = From, seq = Seq,
- msg = RawMsg}, State00) ->
- apply_enqueue(Metadata, From, Seq, RawMsg, State00);
+apply(Meta, #enqueue{pid = From, seq = Seq,
+ msg = RawMsg}, State00) ->
+ apply_enqueue(Meta, From, Seq, RawMsg, State00);
+apply(_Meta, #register_enqueuer{pid = Pid},
+ #?MODULE{enqueuers = Enqueuers0,
+ cfg = #cfg{overflow_strategy = Overflow}} = State0) ->
+
+ State = case maps:is_key(Pid, Enqueuers0) of
+ true ->
+ %% if the enqueuer exits just echo the overflow state
+ State0;
+ false ->
+ State0#?MODULE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}}
+ end,
+ Res = case is_over_limit(State) of
+ true when Overflow == reject_publish ->
+ reject_publish;
+ _ ->
+ ok
+ end,
+ {State, Res, [{monitor, process, Pid}]};
apply(Meta,
#settle{msg_ids = MsgIds, consumer_id = ConsumerId},
#?MODULE{consumers = Cons0} = State) ->
@@ -212,8 +235,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
ServiceQueue0),
Cons = maps:put(ConsumerId, Con1, Cons0),
{State1, ok, Effects} =
- checkout(Meta, State0#?MODULE{service_queue = ServiceQueue,
- consumers = Cons}, []),
+ checkout(Meta, State0,
+ State0#?MODULE{service_queue = ServiceQueue,
+ consumers = Cons}, []),
Response = {send_credit_reply, messages_ready(State1)},
%% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain
@@ -259,10 +283,15 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
apply(_, #checkout{spec = {dequeue, _}},
#?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
{State0, {error, unsupported}};
-apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
+apply(#{index := Index,
+ system_time := Ts,
+ from := From} = Meta, #checkout{spec = {dequeue, Settlement},
meta = ConsumerMeta,
consumer_id = ConsumerId},
- #?MODULE{consumers = Consumers} = State0) ->
+ #?MODULE{consumers = Consumers} = State00) ->
+ %% dequeue always updates last_active
+ State0 = State00#?MODULE{last_active = Ts},
+ %% all dequeue operations result in keeping the queue from expiring
Exists = maps:is_key(ConsumerId, Consumers),
case messages_ready(State0) of
0 ->
@@ -274,60 +303,69 @@ apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
State1 = update_consumer(ConsumerId, ConsumerMeta,
{once, 1, simple_prefetch},
State0),
- {success, _, MsgId, Msg, State2} = checkout_one(State1),
- {State, Effects} = case Settlement of
- unsettled ->
- {_, Pid} = ConsumerId,
- {State2, [{monitor, process, Pid}]};
- settled ->
- %% immediately settle the checkout
- {State3, _, Effects0} =
- apply(Meta, make_settle(ConsumerId, [MsgId]),
- State2),
- {State3, Effects0}
- end,
+ {success, _, MsgId, Msg, State2} = checkout_one(Meta, State1),
+ {State4, Effects1} = case Settlement of
+ unsettled ->
+ {_, Pid} = ConsumerId,
+ {State2, [{monitor, process, Pid}]};
+ settled ->
+ %% immediately settle the checkout
+ {State3, _, Effects0} =
+ apply(Meta, make_settle(ConsumerId, [MsgId]),
+ State2),
+ {State3, Effects0}
+ end,
+ {Reply, Effects2} =
case Msg of
{RaftIdx, {Header, 'empty'}} ->
%% TODO add here new log effect with reply
- {State, '$ra_no_reply',
- reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From)};
+ {'$ra_no_reply',
+ [reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From) |
+ Effects1]};
_ ->
- {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects}
+ {{dequeue, {MsgId, Msg}, Ready-1}, Effects1}
+
+ end,
+
+ case evaluate_limit(Index, false, State0, State4, Effects2) of
+ {State, true, Effects} ->
+ update_smallest_raft_index(Index, Reply, State, Effects);
+ {State, false, Effects} ->
+ {State, Reply, Effects}
end
end;
apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
- {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel),
- checkout(Meta, State, Effects);
+ {State, Effects} = cancel_consumer(Meta, ConsumerId, State0, [],
+ consumer_cancel),
+ checkout(Meta, State0, State, Effects);
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
consumer_id = {_, Pid} = ConsumerId},
State0) ->
State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
- checkout(Meta, State1, [{monitor, process, Pid}]);
-apply(#{index := RaftIdx}, #purge{},
+ checkout(Meta, State0, State1, [{monitor, process, Pid}]);
+apply(#{index := Index}, #purge{},
#?MODULE{ra_indexes = Indexes0,
returns = Returns,
messages = Messages} = State0) ->
Total = messages_ready(State0),
Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
- [I || {I, _} <- lists:sort(maps:values(Messages))]),
+ [I || {_, {I, _}} <- lqueue:to_list(Messages)]),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1,
[I || {_, {I, _}} <- lqueue:to_list(Returns)]),
- {State, _, Effects} =
- update_smallest_raft_index(RaftIdx,
- State0#?MODULE{ra_indexes = Indexes,
- messages = #{},
- returns = lqueue:new(),
- msg_bytes_enqueue = 0,
- prefix_msgs = {0, [], 0, []},
- low_msg_num = undefined,
- msg_bytes_in_memory = 0,
- msgs_ready_in_memory = 0},
- []),
- %% as we're not checking out after a purge (no point) we have to
- %% reverse the effects ourselves
- {State, {purge, Total},
- lists:reverse([garbage_collection | Effects])};
-apply(Meta, {down, Pid, noconnection},
+
+ State1 = State0#?MODULE{ra_indexes = Indexes,
+ messages = lqueue:new(),
+ returns = lqueue:new(),
+ msg_bytes_enqueue = 0,
+ prefix_msgs = {0, [], 0, []},
+ msg_bytes_in_memory = 0,
+ msgs_ready_in_memory = 0},
+ Effects0 = [garbage_collection],
+ Reply = {purge, Total},
+ {State, _, Effects} = evaluate_limit(Index, false, State0,
+ State1, Effects0),
+ update_smallest_raft_index(Index, Reply, State, Effects);
+apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = Waiting0,
@@ -344,7 +382,7 @@ apply(Meta, {down, Pid, noconnection},
S0, Cid, C0, false, suspected_down, E0),
Checked = C0#consumer.checked_out,
Credit = increase_credit(C0, maps:size(Checked)),
- {St, Effs1} = return_all(S0, Effs,
+ {St, Effs1} = return_all(Meta, S0, Effs,
Cid, C0#consumer{credit = Credit}),
%% if the consumer was cancelled there is a chance it got
%% removed when returning hence we need to be defensive here
@@ -355,7 +393,8 @@ apply(Meta, {down, Pid, noconnection},
Waiting0
end,
{St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers),
- waiting_consumers = Waiting},
+ waiting_consumers = Waiting,
+ last_active = Ts},
Effs1};
(_, _, S) ->
S
@@ -373,8 +412,8 @@ apply(Meta, {down, Pid, noconnection},
(_, E) -> E
end, Enqs0),
Effects = [{monitor, node, Node} | Effects1],
- checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
-apply(Meta, {down, Pid, noconnection},
+ checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects);
+apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
%% A node has been disconnected. This doesn't necessarily mean that
@@ -395,7 +434,7 @@ apply(Meta, {down, Pid, noconnection},
Credit = increase_credit(C0, map_size(Checked0)),
C = C0#consumer{status = suspected_down,
credit = Credit},
- {St, Eff0} = return_all(St0, Eff, Cid, C),
+ {St, Eff0} = return_all(Meta, St0, Eff, Cid, C),
Eff1 = consumer_update_active_effects(St, Cid, C, false,
suspected_down, Eff0),
{St, Eff1};
@@ -416,13 +455,14 @@ apply(Meta, {down, Pid, noconnection},
_ ->
[{monitor, node, Node}]
end ++ Effects1,
- checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
+ checkout(Meta, State0, State#?MODULE{enqueuers = Enqs,
+ last_active = Ts}, Effects);
apply(Meta, {down, Pid, _Info}, State0) ->
- {State, Effects} = handle_down(Pid, State0),
- checkout(Meta, State, Effects);
+ {State, Effects} = handle_down(Meta, Pid, State0),
+ checkout(Meta, State0, State, Effects);
apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
enqueuers = Enqs0,
- service_queue = SQ0} = State0) ->
+ service_queue = _SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -436,44 +476,90 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
end, Enqs0),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
%% mark all consumers as up
- {Cons1, SQ, Effects1} =
- maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
+ {State1, Effects1} =
+ maps:fold(fun({_, P} = ConsumerId, C, {SAcc, EAcc})
when (node(P) =:= Node) and
(C#consumer.status =/= cancelled) ->
- EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId,
+ EAcc1 = ConsumerUpdateActiveFun(SAcc, ConsumerId,
C, true, up, EAcc),
- update_or_remove_sub(ConsumerId,
- C#consumer{status = up}, CAcc,
- SQAcc, EAcc1);
+ {update_or_remove_sub(Meta, ConsumerId,
+ C#consumer{status = up},
+ SAcc), EAcc1};
(_, _, Acc) ->
Acc
- end, {Cons0, SQ0, Monitors}, Cons0),
- Waiting = update_waiting_consumer_status(Node, State0, up),
- State1 = State0#?MODULE{consumers = Cons1,
+ end, {State0, Monitors}, Cons0),
+ Waiting = update_waiting_consumer_status(Node, State1, up),
+ State2 = State1#?MODULE{
enqueuers = Enqs1,
- service_queue = SQ,
waiting_consumers = Waiting},
- {State, Effects} = activate_next_consumer(State1, Effects1),
- checkout(Meta, State, Effects);
+ {State, Effects} = activate_next_consumer(State2, Effects1),
+ checkout(Meta, State0, State, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
-apply(_, #purge_nodes{nodes = Nodes}, State0) ->
+apply(Meta, #purge_nodes{nodes = Nodes}, State0) ->
{State, Effects} = lists:foldl(fun(Node, {S, E}) ->
- purge_node(Node, S, E)
+ purge_node(Meta, Node, S, E)
end, {State0, []}, Nodes),
{State, ok, Effects};
apply(Meta, #update_config{config = Conf}, State) ->
- checkout(Meta, update_config(Conf, State), []).
-
-purge_node(Node, State, Effects) ->
+ checkout(Meta, State, update_config(Conf, State), []);
+apply(_Meta, {machine_version, 0, 1}, V0State) ->
+ State = convert_v0_to_v1(V0State),
+ {State, ok, []}.
+
+convert_v0_to_v1(V0State0) ->
+ V0State = rabbit_fifo_v0:normalize_for_v1(V0State0),
+ V0Msgs = rabbit_fifo_v0:get_field(messages, V0State),
+ V1Msgs = lqueue:from_list(lists:sort(maps:to_list(V0Msgs))),
+ V0Enqs = rabbit_fifo_v0:get_field(enqueuers, V0State),
+ V1Enqs = maps:map(
+ fun (_EPid, E) ->
+ #enqueuer{next_seqno = element(2, E),
+ pending = element(3, E),
+ status = element(4, E)}
+ end, V0Enqs),
+ Cfg = #cfg{name = rabbit_fifo_v0:get_cfg_field(name, V0State),
+ resource = rabbit_fifo_v0:get_cfg_field(resource, V0State),
+ release_cursor_interval = rabbit_fifo_v0:get_cfg_field(release_cursor_interval, V0State),
+ dead_letter_handler = rabbit_fifo_v0:get_cfg_field(dead_letter_handler, V0State),
+ become_leader_handler = rabbit_fifo_v0:get_cfg_field(become_leader_handler, V0State),
+ %% TODO: what if policy enabling reject_publish was applied before conversion?
+ overflow_strategy = drop_head,
+ max_length = rabbit_fifo_v0:get_cfg_field(max_length, V0State),
+ max_bytes = rabbit_fifo_v0:get_cfg_field(max_bytes, V0State),
+ consumer_strategy = rabbit_fifo_v0:get_cfg_field(consumer_strategy, V0State),
+ delivery_limit = rabbit_fifo_v0:get_cfg_field(delivery_limit, V0State),
+ max_in_memory_length = rabbit_fifo_v0:get_cfg_field(max_in_memory_length, V0State),
+ max_in_memory_bytes = rabbit_fifo_v0:get_cfg_field(max_in_memory_bytes, V0State)
+ },
+
+ #?MODULE{cfg = Cfg,
+ messages = V1Msgs,
+ next_msg_num = rabbit_fifo_v0:get_field(next_msg_num, V0State),
+ returns = rabbit_fifo_v0:get_field(returns, V0State),
+ enqueue_count = rabbit_fifo_v0:get_field(enqueue_count, V0State),
+ enqueuers = V1Enqs,
+ ra_indexes = rabbit_fifo_v0:get_field(ra_indexes, V0State),
+ release_cursors = rabbit_fifo_v0:get_field(release_cursors, V0State),
+ consumers = rabbit_fifo_v0:get_field(consumers, V0State),
+ service_queue = rabbit_fifo_v0:get_field(service_queue, V0State),
+ prefix_msgs = rabbit_fifo_v0:get_field(prefix_msgs, V0State),
+ msg_bytes_enqueue = rabbit_fifo_v0:get_field(msg_bytes_enqueue, V0State),
+ msg_bytes_checkout = rabbit_fifo_v0:get_field(msg_bytes_checkout, V0State),
+ waiting_consumers = rabbit_fifo_v0:get_field(waiting_consumers, V0State),
+ msg_bytes_in_memory = rabbit_fifo_v0:get_field(msg_bytes_in_memory, V0State),
+ msgs_ready_in_memory = rabbit_fifo_v0:get_field(msgs_ready_in_memory, V0State)
+ }.
+
+purge_node(Meta, Node, State, Effects) ->
lists:foldl(fun(Pid, {S0, E0}) ->
- {S, E} = handle_down(Pid, S0),
+ {S, E} = handle_down(Meta, Pid, S0),
{S, E0 ++ E}
end, {State, Effects}, all_pids_for(Node, State)).
%% any downs that re not noconnection
-handle_down(Pid, #?MODULE{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+handle_down(Meta, Pid, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
State1 = case maps:take(Pid, Enqs0) of
@@ -490,7 +576,7 @@ handle_down(Pid, #?MODULE{consumers = Cons0,
DownConsumers = maps:keys(
maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
lists:foldl(fun(ConsumerId, {S, E}) ->
- cancel_consumer(ConsumerId, S, E, down)
+ cancel_consumer(Meta, ConsumerId, S, E, down)
end, {State2, Effects1}, DownConsumers).
consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) ->
@@ -582,19 +668,24 @@ state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= le
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
-tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
- resource = QName},
+tick(Ts, #?MODULE{cfg = #cfg{name = Name,
+ resource = QName},
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
- Metrics = {Name,
- messages_ready(State),
- num_checked_out(State), % checked out
- messages_total(State),
- query_consumer_count(State), % Consumers
- EnqueueBytes,
- CheckoutBytes},
- [{mod_call, rabbit_quorum_queue,
- handle_tick, [QName, Metrics, all_nodes(State)]}].
+ case is_expired(Ts, State) of
+ true ->
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}];
+ false ->
+ Metrics = {Name,
+ messages_ready(State),
+ num_checked_out(State), % checked out
+ messages_total(State),
+ query_consumer_count(State), % Consumers
+ EnqueueBytes,
+ CheckoutBytes},
+ [{mod_call, rabbit_quorum_queue,
+ handle_tick, [QName, Metrics, all_nodes(State)]}]
+ end.
-spec overview(state()) -> map().
overview(#?MODULE{consumers = Cons,
@@ -612,7 +703,10 @@ overview(#?MODULE{consumers = Cons,
max_bytes => Cfg#cfg.max_bytes,
consumer_strategy => Cfg#cfg.consumer_strategy,
max_in_memory_length => Cfg#cfg.max_in_memory_length,
- max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes},
+ max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes,
+ expires => Cfg#cfg.expires,
+ delivery_limit => Cfg#cfg.delivery_limit
+ },
#{type => ?MODULE,
config => Conf,
num_consumers => maps:size(Cons),
@@ -621,7 +715,8 @@ overview(#?MODULE{consumers = Cons,
num_ready_messages => messages_ready(State),
num_messages => messages_total(State),
num_release_cursors => lqueue:len(Cursors),
- release_crusor_enqueue_counter => EnqCount,
+ release_cursors => [I || {_, I, _} <- lqueue:to_list(Cursors)],
+ release_cursor_enqueue_counter => EnqCount,
enqueue_message_bytes => EnqueueBytes,
checkout_message_bytes => CheckoutBytes}.
@@ -637,6 +732,12 @@ get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) ->
[]
end.
+-spec version() -> pos_integer().
+version() -> 1.
+
+which_module(0) -> rabbit_fifo_v0;
+which_module(1) -> ?MODULE.
+
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
-record(aux, {name :: atom(),
utilisation :: term(),
@@ -804,7 +905,7 @@ messages_ready(#?MODULE{messages = M,
%% prefix messages will rarely have anything in them during normal
%% operations so length/1 is fine here
- maps:size(M) + lqueue:len(R) + RCnt + PCnt.
+ lqueue:len(M) + lqueue:len(R) + RCnt + PCnt.
messages_total(#?MODULE{ra_indexes = I,
prefix_msgs = {RCnt, _R, PCnt, _P}}) ->
@@ -843,17 +944,17 @@ num_checked_out(#?MODULE{consumers = Cons}) ->
maps:size(C) + Acc
end, 0, Cons).
-cancel_consumer(ConsumerId,
+cancel_consumer(Meta, ConsumerId,
#?MODULE{cfg = #cfg{consumer_strategy = competing}} = State,
Effects, Reason) ->
- cancel_consumer0(ConsumerId, State, Effects, Reason);
-cancel_consumer(ConsumerId,
+ cancel_consumer0(Meta, ConsumerId, State, Effects, Reason);
+cancel_consumer(Meta, ConsumerId,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = []} = State,
Effects, Reason) ->
%% single active consumer on, no consumers are waiting
- cancel_consumer0(ConsumerId, State, Effects, Reason);
-cancel_consumer(ConsumerId,
+ cancel_consumer0(Meta, ConsumerId, State, Effects, Reason);
+cancel_consumer(Meta, ConsumerId,
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = Waiting0} = State0,
@@ -862,7 +963,7 @@ cancel_consumer(ConsumerId,
case maps:is_key(ConsumerId, Cons0) of
true ->
% The active consumer is to be removed
- {State1, Effects1} = cancel_consumer0(ConsumerId, State0,
+ {State1, Effects1} = cancel_consumer0(Meta, ConsumerId, State0,
Effects0, Reason),
activate_next_consumer(State1, Effects1);
false ->
@@ -886,11 +987,12 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
[QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
| Effects].
-cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
+cancel_consumer0(Meta, ConsumerId,
+ #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
case C0 of
#{ConsumerId := Consumer} ->
- {S, Effects2} = maybe_return_all(ConsumerId, Consumer, S0,
- Effects0, Reason),
+ {S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer,
+ S0, Effects0, Reason),
%% The effects are emitted before the consumer is actually removed
%% if the consumer has unacked messages. This is a bit weird but
%% in line with what classic queues do (from an external point of
@@ -939,23 +1041,18 @@ activate_next_consumer(#?MODULE{consumers = Cons,
-maybe_return_all(ConsumerId, Consumer,
- #?MODULE{consumers = C0,
- service_queue = SQ0} = S0,
- Effects0, Reason) ->
+maybe_return_all(#{system_time := Ts} = Meta, ConsumerId, Consumer, S0, Effects0, Reason) ->
case Reason of
consumer_cancel ->
- {Cons, SQ, Effects1} =
- update_or_remove_sub(ConsumerId,
- Consumer#consumer{lifetime = once,
- credit = 0,
- status = cancelled},
- C0, SQ0, Effects0),
- {S0#?MODULE{consumers = Cons,
- service_queue = SQ}, Effects1};
+ {update_or_remove_sub(Meta, ConsumerId,
+ Consumer#consumer{lifetime = once,
+ credit = 0,
+ status = cancelled},
+ S0), Effects0};
down ->
- {S1, Effects1} = return_all(S0, Effects0, ConsumerId, Consumer),
- {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers)},
+ {S1, Effects1} = return_all(Meta, S0, Effects0, ConsumerId, Consumer),
+ {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers),
+ last_active = Ts},
Effects1}
end.
@@ -963,7 +1060,7 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
{ok, State1, Effects1} ->
State2 = append_to_master_index(RaftIdx, State1),
- {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
{maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
{duplicate, State, Effects} ->
{State, ok, Effects}
@@ -993,7 +1090,6 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
end.
enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
- low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
%% the initial header is an integer only - it will get expanded to a map
%% when the next required key is added
@@ -1008,10 +1104,7 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
{RaftIdx, {Header, RawMsg}}} % indexed message with header map
end,
State = add_bytes_enqueue(Header, State1),
- State#?MODULE{messages = Messages#{NextMsgNum => Msg},
- %% this is probably only done to record it when low_msg_num
- %% is undefined
- low_msg_num = min(LowMsgNum, NextMsgNum),
+ State#?MODULE{messages = lqueue:in({NextMsgNum, Msg}, Messages),
next_msg_num = NextMsgNum + 1}.
append_to_master_index(RaftIdx,
@@ -1021,21 +1114,15 @@ append_to_master_index(RaftIdx,
State#?MODULE{ra_indexes = Indexes}.
-incr_enqueue_count(#?MODULE{enqueue_count = C,
+incr_enqueue_count(#?MODULE{enqueue_count = EC,
cfg = #cfg{release_cursor_interval = {_Base, C}}
- } = State0) ->
+ } = State0) when EC >= C->
%% this will trigger a dehydrated version of the state to be stored
%% at this raft index for potential future snapshot generation
%% Q: Why don't we just stash the release cursor here?
%% A: Because it needs to be the very last thing we do and we
%% first needs to run the checkout logic.
State0#?MODULE{enqueue_count = 0};
-incr_enqueue_count(#?MODULE{cfg = #cfg{release_cursor_interval = C} = Cfg}
- = State0)
- when is_integer(C) ->
- %% conversion to new release cursor interval format
- State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
- incr_enqueue_count(State);
incr_enqueue_count(#?MODULE{enqueue_count = C} = State) ->
State#?MODULE{enqueue_count = C + 1}.
@@ -1055,25 +1142,15 @@ maybe_store_dehydrated_state(RaftIdx,
0 -> 0;
_ ->
Total = messages_total(State0),
- min(max(Total, Base),
- ?RELEASE_CURSOR_EVERY_MAX)
+ min(max(Total, Base), ?RELEASE_CURSOR_EVERY_MAX)
end,
- State = convert_prefix_msgs(
- State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval =
- {Base, Interval}}}),
+ State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval =
+ {Base, Interval}}},
Dehydrated = dehydrate_state(State),
Cursor = {release_cursor, RaftIdx, Dehydrated},
Cursors = lqueue:in(Cursor, Cursors0),
State#?MODULE{release_cursors = Cursors}
end;
-maybe_store_dehydrated_state(RaftIdx,
- #?MODULE{cfg =
- #cfg{release_cursor_interval = C} = Cfg}
- = State0)
- when is_integer(C) ->
- %% convert to new format
- State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
- maybe_store_dehydrated_state(RaftIdx, State);
maybe_store_dehydrated_state(_RaftIdx, State) ->
State.
@@ -1122,57 +1199,50 @@ snd(T) ->
element(2, T).
return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
- Effects0, #?MODULE{service_queue = SQ0} = State0) ->
+ Effects0, State0) ->
{State1, Effects1} = maps:fold(
fun(MsgId, {Tag, _} = Msg, {S0, E0})
when Tag == '$prefix_msg';
Tag == '$empty_msg'->
- return_one(MsgId, 0, Msg, S0, E0, ConsumerId);
+ return_one(Meta, MsgId, 0, Msg, S0, E0, ConsumerId);
(MsgId, {MsgNum, Msg}, {S0, E0}) ->
- return_one(MsgId, MsgNum, Msg, S0, E0,
+ return_one(Meta, MsgId, MsgNum, Msg, S0, E0,
ConsumerId)
end, {State0, Effects0}, Returned),
- {State2, Effects3} =
+ State2 =
case State1#?MODULE.consumers of
- #{ConsumerId := Con0} = Cons0 ->
+ #{ConsumerId := Con0} ->
Con = Con0#consumer{credit = increase_credit(Con0,
map_size(Returned))},
- {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con,
- Cons0, SQ0, Effects1),
- {State1#?MODULE{consumers = Cons,
- service_queue = SQ}, Effects2};
+ update_or_remove_sub(Meta, ConsumerId, Con, State1);
_ ->
- {State1, Effects1}
+ State1
end,
- {State, ok, Effects} = checkout(Meta, State2, Effects3),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
% used to processes messages that are finished
-complete(ConsumerId, Discarded,
- #consumer{checked_out = Checked} = Con0, Effects0,
- #?MODULE{consumers = Cons0, service_queue = SQ0,
- ra_indexes = Indexes0} = State0) ->
+complete(Meta, ConsumerId, Discarded,
+ #consumer{checked_out = Checked} = Con0, Effects,
+ #?MODULE{ra_indexes = Indexes0} = State0) ->
%% TODO optimise use of Discarded map here
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
%% credit_mode = simple_prefetch should automatically top-up credit
%% as messages are simple_prefetch or otherwise returned
Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked),
credit = increase_credit(Con0, map_size(Discarded))},
- {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
- SQ0, Effects0),
+ State1 = update_or_remove_sub(Meta, ConsumerId, Con, State0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
MsgRaftIdxs),
%% TODO: use maps:fold instead
- State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
+ State2 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
add_bytes_settle(Header, Acc);
({'$prefix_msg', Header}, Acc) ->
add_bytes_settle(Header, Acc);
({'$empty_msg', Header}, Acc) ->
add_bytes_settle(Header, Acc)
- end, State0, maps:values(Discarded)),
- {State1#?MODULE{consumers = Cons,
- ra_indexes = Indexes,
- service_queue = SQ}, Effects}.
+ end, State1, maps:values(Discarded)),
+ {State2#?MODULE{ra_indexes = Indexes}, Effects}.
increase_credit(#consumer{lifetime = once,
credit = Credit}, _) ->
@@ -1190,9 +1260,9 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
#consumer{checked_out = Checked0} = Con0,
Effects0, State0) ->
Discarded = maps:with(MsgIds, Checked0),
- {State2, Effects1} = complete(ConsumerId, Discarded, Con0,
+ {State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0,
Effects0, State0),
- {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Reason, _Discarded,
@@ -1228,8 +1298,12 @@ cancel_consumer_effects(ConsumerId,
[{mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [QName, ConsumerId]} | Effects].
-update_smallest_raft_index(IncomingRaftIdx,
- #?MODULE{ra_indexes = Indexes,
+update_smallest_raft_index(Idx, State, Effects) ->
+ update_smallest_raft_index(Idx, ok, State, Effects).
+
+update_smallest_raft_index(IncomingRaftIdx, Reply,
+ #?MODULE{cfg = Cfg,
+ ra_indexes = Indexes,
release_cursors = Cursors0} = State0,
Effects) ->
case rabbit_fifo_index:size(Indexes) of
@@ -1237,18 +1311,22 @@ update_smallest_raft_index(IncomingRaftIdx,
% there are no messages on queue anymore and no pending enqueues
% we can forward release_cursor all the way until
% the last received command, hooray
- State = State0#?MODULE{release_cursors = lqueue:new()},
- {State, ok, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
+ %% reset the release cursor interval
+ #cfg{release_cursor_interval = {Base, _}} = Cfg,
+ RCI = {Base, Base},
+ State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI},
+ release_cursors = lqueue:new(),
+ enqueue_count = 0},
+ {State, Reply, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
_ ->
Smallest = rabbit_fifo_index:smallest(Indexes),
case find_next_cursor(Smallest, Cursors0) of
{empty, Cursors} ->
- {State0#?MODULE{release_cursors = Cursors},
- ok, Effects};
+ {State0#?MODULE{release_cursors = Cursors}, Reply, Effects};
{Cursor, Cursors} ->
- %% we can emit a release cursor we've passed the smallest
+ %% we can emit a release cursor when we've passed the smallest
%% release cursor available.
- {State0#?MODULE{release_cursors = Cursors}, ok,
+ {State0#?MODULE{release_cursors = Cursors}, Reply,
Effects ++ [Cursor]}
end
end.
@@ -1272,7 +1350,7 @@ update_header(Key, UpdateFun, Default, Header) ->
maps:update_with(Key, UpdateFun, Default, Header).
-return_one(MsgId, 0, {Tag, Header0},
+return_one(Meta, MsgId, 0, {Tag, Header0},
#?MODULE{returns = Returns,
consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
@@ -1283,7 +1361,7 @@ return_one(MsgId, 0, {Tag, Header0},
Msg0 = {Tag, Header},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
- complete(ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0);
+ complete(Meta, ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0);
_ ->
%% this should not affect the release cursor in any way
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
@@ -1303,7 +1381,7 @@ return_one(MsgId, 0, {Tag, Header0},
returns = lqueue:in(Msg, Returns)}),
Effects0}
end;
-return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
+return_one(Meta, MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
#?MODULE{returns = Returns,
consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
@@ -1316,7 +1394,7 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
DlMsg = {MsgNum, Msg0},
Effects = dead_letter_effects(delivery_limit, #{none => DlMsg},
State0, Effects0),
- complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0);
+ complete(Meta, ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0);
_ ->
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
%% this should not affect the release cursor in any way
@@ -1338,46 +1416,46 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
Effects0}
end.
-return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
+return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
#consumer{checked_out = Checked0} = Con) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}},
lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) ->
- return_one(MsgId, 0, Msg, S, E, ConsumerId);
+ return_one(Meta, MsgId, 0, Msg, S, E, ConsumerId);
({MsgId, {'$empty_msg', _} = Msg}, {S, E}) ->
- return_one(MsgId, 0, Msg, S, E, ConsumerId);
+ return_one(Meta, MsgId, 0, Msg, S, E, ConsumerId);
({MsgId, {MsgNum, Msg}}, {S, E}) ->
- return_one(MsgId, MsgNum, Msg, S, E, ConsumerId)
+ return_one(Meta, MsgId, MsgNum, Msg, S, E, ConsumerId)
end, {State, Effects0}, Checked).
%% checkout new messages to consumers
-checkout(#{index := Index}, State0, Effects0) ->
- {State1, _Result, Effects1} = checkout0(checkout_one(State0),
+checkout(#{index := Index} = Meta, OldState, State0, Effects0) ->
+ {State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0),
Effects0, {#{}, #{}}),
- case evaluate_limit(false, State1, Effects1) of
+ case evaluate_limit(Index, false, OldState, State1, Effects1) of
{State, true, Effects} ->
update_smallest_raft_index(Index, State, Effects);
{State, false, Effects} ->
{State, ok, Effects}
end.
-checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
+checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
Effects, {SendAcc, LogAcc0}) ->
DelMsg = {RaftIdx, {MsgId, Header}},
LogAcc = maps:update_with(ConsumerId,
fun (M) -> [DelMsg | M] end,
[DelMsg], LogAcc0),
- checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
-checkout0({success, ConsumerId, MsgId, Msg, State}, Effects,
+ checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc});
+checkout0(Meta, {success, ConsumerId, MsgId, Msg, State}, Effects,
{SendAcc0, LogAcc}) ->
DelMsg = {MsgId, Msg},
SendAcc = maps:update_with(ConsumerId,
fun (M) -> [DelMsg | M] end,
[DelMsg], SendAcc0),
- checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
-checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
+ checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc});
+checkout0(_Meta, {Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
Effects1 = case Activity of
nochange ->
append_send_msg_effects(
@@ -1389,17 +1467,54 @@ checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
end,
{State0, ok, lists:reverse(Effects1)}.
-evaluate_limit(Result,
+evaluate_limit(_Index, Result, _BeforeState,
#?MODULE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
-evaluate_limit(Result, State00, Effects0) ->
- State0 = convert_prefix_msgs(State00),
+evaluate_limit(Index, Result, BeforeState,
+ #?MODULE{cfg = #cfg{overflow_strategy = Strategy},
+ enqueuers = Enqs0} = State0,
+ Effects0) ->
case is_over_limit(State0) of
- true ->
+ true when Strategy == drop_head ->
{State, Effects} = drop_head(State0, Effects0),
- evaluate_limit(true, State, Effects);
+ evaluate_limit(Index, true, BeforeState, State, Effects);
+ true when Strategy == reject_publish ->
+ %% generate send_msg effect for each enqueuer to let them know
+ %% they need to block
+ {Enqs, Effects} =
+ maps:fold(
+ fun (P, #enqueuer{blocked = undefined} = E0, {Enqs, Acc}) ->
+ E = E0#enqueuer{blocked = Index},
+ {Enqs#{P => E},
+ [{send_msg, P, {queue_status, reject_publish},
+ [ra_event]} | Acc]};
+ (_P, _E, Acc) ->
+ Acc
+ end, {Enqs0, Effects0}, Enqs0),
+ {State0#?MODULE{enqueuers = Enqs}, Result, Effects};
+ false when Strategy == reject_publish ->
+ %% TODO: optimise as this case gets called for every command
+ %% pretty much
+ Before = is_below_soft_limit(BeforeState),
+ case {Before, is_below_soft_limit(State0)} of
+ {false, true} ->
+ %% we have moved below the lower limit which
+ {Enqs, Effects} =
+ maps:fold(
+ fun (P, #enqueuer{} = E0, {Enqs, Acc}) ->
+ E = E0#enqueuer{blocked = undefined},
+ {Enqs#{P => E},
+ [{send_msg, P, {queue_status, go}, [ra_event]}
+ | Acc]};
+ (_P, _E, Acc) ->
+ Acc
+ end, {Enqs0, Effects0}, Enqs0),
+ {State0#?MODULE{enqueuers = Enqs}, Result, Effects};
+ _ ->
+ {State0, Result, Effects0}
+ end;
false ->
{State0, Result, Effects0}
end.
@@ -1451,7 +1566,6 @@ take_next_msg(#?MODULE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) -
{{'$prefix_msg', Header},
State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
take_next_msg(#?MODULE{returns = Returns,
- low_msg_num = Low0,
messages = Messages0,
prefix_msgs = {NumR, R, NumP, P}} = State) ->
%% use peek rather than out there as the most likely case is an empty
@@ -1461,21 +1575,11 @@ take_next_msg(#?MODULE{returns = Returns,
{NextMsg,
State#?MODULE{returns = lqueue:drop(Returns)}};
empty when P == [] ->
- case Low0 of
- undefined ->
+ case lqueue:out(Messages0) of
+ {empty, _} ->
empty;
- _ ->
- {Msg, Messages} = maps:take(Low0, Messages0),
- case maps:size(Messages) of
- 0 ->
- {{Low0, Msg},
- State#?MODULE{messages = Messages,
- low_msg_num = undefined}};
- _ ->
- {{Low0, Msg},
- State#?MODULE{messages = Messages,
- low_msg_num = Low0 + 1}}
- end
+ {{value, {_, _} = SeqMsg}, Messages} ->
+ {SeqMsg, State#?MODULE{messages = Messages }}
end;
empty ->
[Msg | Rem] = P,
@@ -1511,7 +1615,7 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
{dequeue, {MsgId, {Header, Msg}}, Ready}}}]
end}.
-checkout_one(#?MODULE{service_queue = SQ0,
+checkout_one(Meta, #?MODULE{service_queue = SQ0,
messages = Messages0,
consumers = Cons0} = InitState) ->
case queue:peek(SQ0) of
@@ -1526,11 +1630,11 @@ checkout_one(#?MODULE{service_queue = SQ0,
%% no credit but was still on queue
%% can happen when draining
%% recurse without consumer on queue
- checkout_one(InitState#?MODULE{service_queue = SQ1});
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{status = cancelled}} ->
- checkout_one(InitState#?MODULE{service_queue = SQ1});
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{status = suspected_down}} ->
- checkout_one(InitState#?MODULE{service_queue = SQ1});
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
@@ -1540,11 +1644,9 @@ checkout_one(#?MODULE{service_queue = SQ0,
next_msg_id = Next + 1,
credit = Credit - 1,
delivery_count = DelCnt + 1},
- {Cons, SQ, []} = % we expect no effects
- update_or_remove_sub(ConsumerId, Con,
- Cons0, SQ1, []),
- State1 = State0#?MODULE{service_queue = SQ,
- consumers = Cons},
+ State1 = update_or_remove_sub(Meta,
+ ConsumerId, Con,
+ State0#?MODULE{service_queue = SQ1}),
{State, Msg} =
case ConsumerMsg of
{'$prefix_msg', Header} ->
@@ -1566,44 +1668,46 @@ checkout_one(#?MODULE{service_queue = SQ0,
{success, ConsumerId, Next, Msg, State};
error ->
%% consumer did not exist but was queued, recurse
- checkout_one(InitState#?MODULE{service_queue = SQ1})
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1})
end;
empty ->
{nochange, InitState}
end;
empty ->
- case maps:size(Messages0) of
+ case lqueue:len(Messages0) of
0 -> {nochange, InitState};
_ -> {inactive, InitState}
end
end.
-update_or_remove_sub(ConsumerId, #consumer{lifetime = auto,
- credit = 0} = Con,
- Cons, ServiceQueue, Effects) ->
- {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects};
-update_or_remove_sub(ConsumerId, #consumer{lifetime = auto} = Con,
- Cons, ServiceQueue, Effects) ->
- {maps:put(ConsumerId, Con, Cons),
- uniq_queue_in(ConsumerId, ServiceQueue), Effects};
-update_or_remove_sub(ConsumerId, #consumer{lifetime = once,
+update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto,
+ credit = 0} = Con,
+ #?MODULE{consumers = Cons} = State) ->
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)};
+update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto} = Con,
+ #?MODULE{consumers = Cons,
+ service_queue = ServiceQueue} = State) ->
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons),
+ service_queue = uniq_queue_in(ConsumerId, ServiceQueue)};
+update_or_remove_sub(#{system_time := Ts},
+ ConsumerId, #consumer{lifetime = once,
checked_out = Checked,
credit = 0} = Con,
- Cons, ServiceQueue, Effects) ->
- case maps:size(Checked) of
+ #?MODULE{consumers = Cons} = State) ->
+ case maps:size(Checked) of
0 ->
% we're done with this consumer
- % TODO: demonitor consumer pid but _only_ if there are no other
- % monitors for this pid
- {maps:remove(ConsumerId, Cons), ServiceQueue, Effects};
+ State#?MODULE{consumers = maps:remove(ConsumerId, Cons),
+ last_active = Ts};
_ ->
% there are unsettled items so need to keep around
- {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects}
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)}
end;
-update_or_remove_sub(ConsumerId, #consumer{lifetime = once} = Con,
- Cons, ServiceQueue, Effects) ->
- {maps:put(ConsumerId, Con, Cons),
- uniq_queue_in(ConsumerId, ServiceQueue), Effects}.
+update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = once} = Con,
+ #?MODULE{consumers = Cons,
+ service_queue = ServiceQueue} = State) ->
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons),
+ service_queue = uniq_queue_in(ConsumerId, ServiceQueue)}.
uniq_queue_in(Key, Queue) ->
% TODO: queue:member could surely be quite expensive, however the practical
@@ -1663,18 +1767,11 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
ServiceQueue0
end.
-convert_prefix_msgs(#?MODULE{prefix_msgs = {R, P}} = State) ->
- State#?MODULE{prefix_msgs = {length(R), R, length(P), P}};
-convert_prefix_msgs(State) ->
- State.
-
%% creates a dehydrated version of the current state to be cached and
%% potentially used to for a snaphot at a later point
dehydrate_state(#?MODULE{messages = Messages,
consumers = Consumers,
returns = Returns,
- low_msg_num = Low,
- next_msg_num = Next,
prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0},
waiting_consumers = Waiting0} = State) ->
RCnt = lqueue:len(Returns),
@@ -1691,34 +1788,33 @@ dehydrate_state(#?MODULE{messages = Messages,
[],
lqueue:to_list(Returns)),
PrefRet = PrefRet0 ++ PrefRet1,
- PrefMsgsSuff = dehydrate_messages(Low, Next - 1, Messages, []),
+ PrefMsgsSuff = dehydrate_messages(Messages, []),
%% prefix messages are not populated in normal operation only after
%% recovering from a snapshot
PrefMsgs = PrefMsg0 ++ PrefMsgsSuff,
Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0],
- State#?MODULE{messages = #{},
+ State#?MODULE{messages = lqueue:new(),
ra_indexes = rabbit_fifo_index:empty(),
release_cursors = lqueue:new(),
- low_msg_num = undefined,
consumers = maps:map(fun (_, C) ->
dehydrate_consumer(C)
end, Consumers),
returns = lqueue:new(),
prefix_msgs = {PRCnt + RCnt, PrefRet,
- PPCnt + maps:size(Messages), PrefMsgs},
+ PPCnt + lqueue:len(Messages), PrefMsgs},
waiting_consumers = Waiting}.
-dehydrate_messages(Low, Next, _Msgs, Acc)
- when Next < Low ->
- Acc;
-dehydrate_messages(Low, Next, Msgs, Acc0) ->
- Acc = case maps:get(Next, Msgs) of
- {_RaftIdx, {_, 'empty'} = Msg} ->
- [Msg | Acc0];
- {_RaftIdx, {Header, _}} ->
- [Header | Acc0]
- end,
- dehydrate_messages(Low, Next - 1, Msgs, Acc).
+%% TODO make body recursive to avoid allocating lists:reverse call
+dehydrate_messages(Msgs0, Acc0) ->
+ {OutRes, Msgs} = lqueue:out(Msgs0),
+ case OutRes of
+ {value, {_MsgId, {_RaftId, {_, 'empty'} = Msg}}} ->
+ dehydrate_messages(Msgs, [Msg | Acc0]);
+ {value, {_MsgId, {_RaftId, {Header, _}}}} ->
+ dehydrate_messages(Msgs, [Header | Acc0]);
+ empty ->
+ lists:reverse(Acc0)
+ end.
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
@@ -1733,8 +1829,10 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Con#consumer{checked_out = Checked}.
%% make the state suitable for equality comparison
-normalize(#?MODULE{release_cursors = Cursors} = State) ->
- State#?MODULE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
+normalize(#?MODULE{messages = Messages,
+ release_cursors = Cursors} = State) ->
+ State#?MODULE{messages = lqueue:from_list(lqueue:to_list(Messages)),
+ release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}}) ->
@@ -1742,12 +1840,30 @@ is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined,
is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
max_bytes = MaxBytes},
msg_bytes_enqueue = BytesEnq} = State) ->
-
messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
+is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = undefined,
+ max_bytes = undefined}}) ->
+ false;
+is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
+ max_bytes = MaxBytes},
+ msg_bytes_enqueue = BytesEnq} = State) ->
+ is_below(MaxLength, messages_ready(State)) andalso
+ is_below(MaxBytes, BytesEnq).
+
+is_below(undefined, _Num) ->
+ true;
+is_below(Val, Num) when is_integer(Val) andalso is_integer(Num) ->
+ Num =< trunc(Val * ?LOW_LIMIT).
+
-spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol().
make_enqueue(Pid, Seq, Msg) ->
#enqueue{pid = Pid, seq = Seq, msg = Msg}.
+
+-spec make_register_enqueuer(pid()) -> protocol().
+make_register_enqueuer(Pid) ->
+ #register_enqueuer{pid = Pid}.
+
-spec make_checkout(consumer_id(),
checkout_spec(), consumer_meta()) -> protocol().
make_checkout(ConsumerId, Spec, Meta) ->
@@ -1755,7 +1871,7 @@ make_checkout(ConsumerId, Spec, Meta) ->
spec = Spec, meta = Meta}.
-spec make_settle(consumer_id(), [msg_id()]) -> protocol().
-make_settle(ConsumerId, MsgIds) ->
+make_settle(ConsumerId, MsgIds) when is_list(MsgIds) ->
#settle{consumer_id = ConsumerId, msg_ids = MsgIds}.
-spec make_return(consumer_id(), [msg_id()]) -> protocol().
@@ -1817,7 +1933,7 @@ add_bytes_settle(#{size := Bytes}, State) ->
add_bytes_return(Bytes,
#?MODULE{msg_bytes_checkout = Checkout,
- msg_bytes_enqueue = Enqueue} = State)
+ msg_bytes_enqueue = Enqueue} = State)
when is_integer(Bytes) ->
State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
msg_bytes_enqueue = Enqueue + Bytes};
@@ -1913,3 +2029,18 @@ suspected_pids_for(Node, #?MODULE{consumers = Cons0,
[P | Acc];
(_, Acc) -> Acc
end, Enqs, WaitingConsumers0).
+
+is_expired(Ts, #?MODULE{cfg = #cfg{expires = Expires},
+ last_active = LastActive,
+ consumers = Consumers})
+ when is_number(LastActive) andalso is_number(Expires) ->
+ %% TODO: should it be active consumers?
+ Active = maps:filter(fun (_, #consumer{status = suspected_down}) ->
+ false;
+ (_, _) ->
+ true
+ end, Consumers),
+
+ Ts > (LastActive + Expires) andalso maps:size(Active) == 0;
+is_expired(_Ts, _State) ->
+ false.
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index ebbaa9e1eb..4c87167ea1 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -67,7 +67,7 @@
-type applied_mfa() :: {module(), atom(), list()}.
% represents a partially applied module call
--define(RELEASE_CURSOR_EVERY, 64000).
+-define(RELEASE_CURSOR_EVERY, 2048).
-define(RELEASE_CURSOR_EVERY_MAX, 3200000).
-define(USE_AVG_HALF_LIFE, 10000.0).
%% an average QQ without any message uses about 100KB so setting this limit
@@ -75,6 +75,7 @@
-define(GC_MEM_LIMIT_B, 2000000).
-define(MB, 1048576).
+-define(LOW_LIMIT, 0.8).
-record(consumer,
{meta = #{} :: consumer_meta(),
@@ -100,21 +101,29 @@
-type consumer_strategy() :: competing | single_active.
+-type milliseconds() :: non_neg_integer().
+
-record(enqueuer,
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
- status = up :: up | suspected_down
+ status = up :: up |
+ suspected_down,
+ %% it is useful to have a record of when this was blocked
+ %% so that we can retry sending the block effect if
+ %% the publisher did not receive the initial one
+ blocked :: undefined | ra:index(),
+ unused_1,
+ unused_2
}).
-record(cfg,
{name :: atom(),
resource :: rabbit_types:r('queue'),
- release_cursor_interval ::
- undefined | non_neg_integer() |
- {non_neg_integer(), non_neg_integer()},
+ release_cursor_interval :: option({non_neg_integer(), non_neg_integer()}),
dead_letter_handler :: option(applied_mfa()),
become_leader_handler :: option(applied_mfa()),
+ overflow_strategy = drop_head :: drop_head | reject_publish,
max_length :: option(non_neg_integer()),
max_bytes :: option(non_neg_integer()),
%% whether single active consumer is on or not for this queue
@@ -122,7 +131,10 @@
%% the maximum number of unsuccessful delivery attempts permitted
delivery_limit :: option(non_neg_integer()),
max_in_memory_length :: option(non_neg_integer()),
- max_in_memory_bytes :: option(non_neg_integer())
+ max_in_memory_bytes :: option(non_neg_integer()),
+ expires :: undefined | milliseconds(),
+ unused_1,
+ unused_2
}).
-type prefix_msgs() :: {list(), list()} |
@@ -132,14 +144,10 @@
-record(rabbit_fifo,
{cfg :: #cfg{},
% unassigned messages
- messages = #{} :: #{msg_in_id() => indexed_msg()},
- % defines the lowest message in id available in the messages map
- % that isn't a return
- low_msg_num :: option(msg_in_id()),
- % defines the next message in id to be added to the messages map
+ messages = lqueue:new() :: lqueue:queue(),
+ % defines the next message id
next_msg_num = 1 :: msg_in_id(),
- % list of returned msg_in_ids - when checking out it picks from
- % this list first before taking low_msg_num
+ % queue of returned msg_in_ids - when checking out it picks from
returns = lqueue:new() :: lqueue:lqueue(prefix_msg() |
{msg_in_id(), indexed_msg()}),
% a counter of enqueues - used to trigger shadow copy points
@@ -178,7 +186,10 @@
%% used only when single active consumer is on
waiting_consumers = [] :: [{consumer_id(), consumer()}],
msg_bytes_in_memory = 0 :: non_neg_integer(),
- msgs_ready_in_memory = 0 :: non_neg_integer()
+ msgs_ready_in_memory = 0 :: non_neg_integer(),
+ last_active :: undefined | non_neg_integer(),
+ unused_1,
+ unused_2
}).
-type config() :: #{name := atom(),
@@ -190,5 +201,9 @@
max_bytes => non_neg_integer(),
max_in_memory_length => non_neg_integer(),
max_in_memory_bytes => non_neg_integer(),
+ overflow_strategy => drop_head | reject_publish,
single_active_consumer_on => boolean(),
- delivery_limit => non_neg_integer()}.
+ delivery_limit => non_neg_integer(),
+ expires => non_neg_integer(),
+ created => non_neg_integer()
+ }.
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 2ac9f6787a..dc98210817 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -53,9 +53,17 @@
-record(consumer, {last_msg_id :: seq() | -1,
delivery_count = 0 :: non_neg_integer()}).
--record(state, {cluster_name :: cluster_name(),
- servers = [] :: [ra:server_id()],
+-record(cfg, {cluster_name :: cluster_name(),
+ servers = [] :: [ra:server_id()],
+ soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
+ block_handler = fun() -> ok end :: fun(() -> term()),
+ unblock_handler = fun() -> ok end :: fun(() -> ok),
+ timeout :: non_neg_integer(),
+ version = 0 :: non_neg_integer()}).
+
+-record(state, {cfg :: #cfg{},
leader :: undefined | ra:server_id(),
+ queue_status :: undefined | go | reject_publish,
next_seq = 0 :: seq(),
%% Last applied is initialise to -1 to note that no command has yet been
%% applied, but allowing to resend messages if the first ones on the sequence
@@ -66,15 +74,11 @@
slow = false :: boolean(),
unsent_commands = #{} :: #{rabbit_fifo:consumer_id() =>
{[seq()], [seq()], [seq()]}},
- soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
pending = #{} :: #{seq() =>
{term(), rabbit_fifo:command()}},
consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
#consumer{}},
- block_handler = fun() -> ok end :: fun(() -> term()),
- unblock_handler = fun() -> ok end :: fun(() -> ok),
- timer_state :: term(),
- timeout :: non_neg_integer()
+ timer_state :: term()
}).
-opaque state() :: #state{}.
@@ -102,22 +106,24 @@ init(ClusterName, Servers) ->
%% @param MaxPending size defining the max number of pending commands.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit) ->
- Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
- #state{cluster_name = ClusterName,
- servers = Servers,
- soft_limit = SoftLimit,
- timeout = Timeout}.
+ Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
+ #state{cfg = #cfg{cluster_name = ClusterName,
+ servers = Servers,
+ soft_limit = SoftLimit,
+ timeout = Timeout * 1000}}.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok),
fun(() -> ok)) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
- Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
- #state{cluster_name = ClusterName,
- servers = Servers,
- block_handler = BlockFun,
- unblock_handler = UnblockFun,
- soft_limit = SoftLimit,
- timeout = Timeout}.
+ %% net ticktime is in seconds
+ Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
+ #state{cfg = #cfg{cluster_name = ClusterName,
+ servers = Servers,
+ block_handler = BlockFun,
+ unblock_handler = UnblockFun,
+ soft_limit = SoftLimit,
+ timeout = Timeout * 1000}}.
+
%% @doc Enqueues a message.
%% @param Correlation an arbitrary erlang term used to correlate this
@@ -132,10 +138,47 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
%% SequenceNumber can be correlated to the applied sequence numbers returned
%% by the {@link handle_ra_event/2. handle_ra_event/2} function.
-spec enqueue(Correlation :: term(), Msg :: term(), State :: state()) ->
- {ok | slow, state()}.
-enqueue(Correlation, Msg, State0 = #state{slow = Slow,
- block_handler = BlockFun}) ->
- Node = pick_node(State0),
+ {ok | slow | reject_publish, state()}.
+enqueue(Correlation, Msg,
+ #state{queue_status = undefined,
+ next_enqueue_seq = 1,
+ cfg = #cfg{timeout = Timeout}} = State0) ->
+ %% it is the first enqueue, check the version
+ {_, Node} = Server = pick_server(State0),
+ case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of
+ 0 ->
+ %% the leader is running the old version
+ %% so we can't initialize the enqueuer session safely
+ %% fall back on old behavour
+ enqueue(Correlation, Msg, State0#state{queue_status = go});
+ 1 ->
+ %% were running the new version on the leader do sync initialisation
+ %% of enqueuer session
+ Reg = rabbit_fifo:make_register_enqueuer(self()),
+ case ra:process_command(Server, Reg, Timeout) of
+ {ok, reject_publish, _} ->
+ {reject_publish, State0#state{queue_status = reject_publish}};
+ {ok, ok, _} ->
+ enqueue(Correlation, Msg, State0#state{queue_status = go});
+ {timeout, _} ->
+ %% if we timeout it is probably better to reject
+ %% the message than being uncertain
+ {reject_publish, State0};
+ Err ->
+ exit(Err)
+ end;
+ {badrpc, nodedown} ->
+ {reject_publish, State0}
+ end;
+enqueue(_Correlation, _Msg,
+ #state{queue_status = reject_publish,
+ cfg = #cfg{}} = State) ->
+ {reject_publish, State};
+enqueue(Correlation, Msg,
+ #state{slow = Slow,
+ queue_status = go,
+ cfg = #cfg{block_handler = BlockFun}} = State0) ->
+ Node = pick_server(State0),
{Next, State1} = next_enqueue_seq(State0),
% by default there is no correlation id
Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg),
@@ -159,7 +202,7 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow,
%% by the {@link handle_ra_event/2. handle_ra_event/2} function.
%%
-spec enqueue(Msg :: term(), State :: state()) ->
- {ok | slow, state()}.
+ {ok | slow | reject_publish, state()}.
enqueue(Msg, State) ->
enqueue(undefined, Msg, State).
@@ -178,8 +221,9 @@ enqueue(Msg, State) ->
Settlement :: settled | unsettled, state()) ->
{ok, {rabbit_fifo:delivery_msg(), non_neg_integer()}
| empty, state()} | {error | timeout, term()}.
-dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
- Node = pick_node(State0),
+dequeue(ConsumerTag, Settlement,
+ #state{cfg = #cfg{timeout = Timeout}} = State0) ->
+ Node = pick_server(State0),
ConsumerId = consumer_id(ConsumerTag),
case ra:process_command(Node,
rabbit_fifo:make_checkout(ConsumerId,
@@ -189,7 +233,8 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
{ok, {dequeue, empty}, Leader} ->
{ok, empty, State0#state{leader = Leader}};
{ok, {dequeue, Msg, NumReady}, Leader} ->
- {ok, {Msg, NumReady}, State0#state{leader = Leader}};
+ {ok, {Msg, NumReady},
+ State0#state{leader = Leader}};
{ok, {error, _} = Err, _Leader} ->
Err;
Err ->
@@ -208,7 +253,7 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
-spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{ok, state()}.
settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_node(State0),
+ Node = pick_server(State0),
Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
@@ -241,7 +286,7 @@ settle(ConsumerTag, [_|_] = MsgIds,
-spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{ok, state()}.
return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_node(State0),
+ Node = pick_server(State0),
% TODO: make rabbit_fifo return support lists of message ids
Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
@@ -275,7 +320,7 @@ return(ConsumerTag, [_|_] = MsgIds,
-spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{ok | slow, state()}.
discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_node(State0),
+ Node = pick_server(State0),
Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
@@ -363,7 +408,7 @@ credit(ConsumerTag, Credit, Drain,
%% the last received msgid provides us with the delivery count if we
%% add one as it is 0 indexed
C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}),
- Node = pick_node(State0),
+ Node = pick_server(State0),
Cmd = rabbit_fifo:make_credit(ConsumerId, Credit,
C#consumer.last_msg_id + 1, Drain),
case send_command(Node, undefined, Cmd, normal, State0) of
@@ -429,11 +474,11 @@ stat(Leader, Timeout) ->
%% @doc returns the cluster name
-spec cluster_name(state()) -> cluster_name().
-cluster_name(#state{cluster_name = ClusterName}) ->
+cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) ->
ClusterName.
-update_machine_state(Node, Conf) ->
- case ra:process_command(Node, rabbit_fifo:make_update_config(Conf)) of
+update_machine_state(Server, Conf) ->
+ case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of
{ok, ok, _} ->
ok;
Err ->
@@ -487,10 +532,11 @@ update_machine_state(Node, Conf) ->
{internal, Correlators :: [term()], actions(), state()} |
{rabbit_fifo:client_msg(), state()} | eol.
handle_ra_event(From, {applied, Seqs},
- #state{soft_limit = SftLmt,
- unblock_handler = UnblockFun} = State0) ->
+ #state{cfg = #cfg{soft_limit = SftLmt,
+ unblock_handler = UnblockFun}} = State00) ->
+ State0 = State00#state{leader = From},
{Corrs, Actions, State1} = lists:foldl(fun seq_applied/2,
- {[], [], State0#state{leader = From}},
+ {[], [], State0},
Seqs),
case maps:size(State1#state.pending) < SftLmt of
true when State1#state.slow == true ->
@@ -511,7 +557,7 @@ handle_ra_event(From, {applied, Seqs},
add_command(Cid, discard,
Discards, Acc)))
end, [], State1#state.unsent_commands),
- Node = pick_node(State2),
+ Node = pick_server(State2),
%% send all the settlements and returns
State = lists:foldl(fun (C, S0) ->
case send_command(Node, undefined,
@@ -527,6 +573,10 @@ handle_ra_event(From, {applied, Seqs},
end;
handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
handle_delivery(From, Del, State0);
+handle_ra_event(_, {machine, {queue_status, Status}},
+ #state{} = State) ->
+ %% just set the queue status
+ {internal, [], [], State#state{queue_status = Status}};
handle_ra_event(Leader, {machine, leader_change},
#state{leader = Leader} = State) ->
%% leader already known
@@ -543,7 +593,7 @@ handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
State1 = State0#state{leader = Leader},
State = resend(Seq, State1),
{internal, [], [], State};
-handle_ra_event(_, timeout, #state{servers = Servers} = State0) ->
+handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) ->
case find_leader(Servers) of
undefined ->
%% still no leader, set the timer again
@@ -645,6 +695,7 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
resend_all_pending(#state{pending = Pend} = State) ->
Seqs = lists:sort(maps:keys(Pend)),
+ rabbit_log:info("rabbit_fifo_client: resend all pending ~w", [Seqs]),
lists:foldl(fun resend/2, State, Seqs).
handle_delivery(From, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
@@ -719,16 +770,19 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
[Leader])
end.
-pick_node(#state{leader = undefined, servers = [N | _]}) ->
+pick_server(#state{leader = undefined,
+ cfg = #cfg{servers = [N | _]}}) ->
%% TODO: pick random rather that first?
N;
-pick_node(#state{leader = Leader}) ->
+pick_server(#state{leader = Leader}) ->
Leader.
% servers sorted by last known leader
-sorted_servers(#state{leader = undefined, servers = Servers}) ->
+sorted_servers(#state{leader = undefined,
+ cfg = #cfg{servers = Servers}}) ->
Servers;
-sorted_servers(#state{leader = Leader, servers = Servers}) ->
+sorted_servers(#state{leader = Leader,
+ cfg = #cfg{servers = Servers}}) ->
[Leader | lists:delete(Leader, Servers)].
next_seq(#state{next_seq = Seq} = State) ->
@@ -742,7 +796,7 @@ consumer_id(ConsumerTag) ->
send_command(Server, Correlation, Command, Priority,
#state{pending = Pending,
- soft_limit = SftLmt} = State0) ->
+ cfg = #cfg{soft_limit = SftLmt}} = State0) ->
{Seq, State} = next_seq(State0),
ok = ra:pipeline_command(Server, Command, Seq, Priority),
Tag = case maps:size(Pending) >= SftLmt of
@@ -767,7 +821,7 @@ add_command(Cid, return, MsgIds, Acc) ->
add_command(Cid, discard, MsgIds, Acc) ->
[rabbit_fifo:make_discard(Cid, MsgIds) | Acc].
-set_timer(#state{servers = [Server | _]} = State) ->
+set_timer(#state{cfg = #cfg{servers = [Server | _]}} = State) ->
Ref = erlang:send_after(?TIMER_TIME, self(),
{ra_event, Server, timeout}),
State#state{timer_state = Ref}.
diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl
new file mode 100644
index 0000000000..51f6bd133e
--- /dev/null
+++ b/src/rabbit_fifo_v0.erl
@@ -0,0 +1,1961 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_fifo_v0).
+
+-behaviour(ra_machine).
+
+-compile(inline_list_funcs).
+-compile(inline).
+-compile({no_auto_import, [apply/3]}).
+
+-include("rabbit_fifo_v0.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-export([
+ init/1,
+ apply/3,
+ state_enter/2,
+ tick/2,
+ overview/1,
+ get_checked_out/4,
+ %% aux
+ init_aux/1,
+ handle_aux/6,
+ % queries
+ query_messages_ready/1,
+ query_messages_checked_out/1,
+ query_messages_total/1,
+ query_processes/1,
+ query_ra_indexes/1,
+ query_consumer_count/1,
+ query_consumers/1,
+ query_stat/1,
+ query_single_active_consumer/1,
+ query_in_memory_usage/1,
+ usage/1,
+
+ zero/1,
+
+ %% misc
+ dehydrate_state/1,
+ normalize/1,
+ normalize_for_v1/1,
+ %% getters for coversions
+ get_field/2,
+ get_cfg_field/2,
+
+ %% protocol helpers
+ make_enqueue/3,
+ make_checkout/3,
+ make_settle/2,
+ make_return/2,
+ make_discard/2,
+ make_credit/4,
+ make_purge/0,
+ make_purge_nodes/1,
+ make_update_config/1
+ ]).
+
+%% command records representing all the protocol actions that are supported
+-record(enqueue, {pid :: option(pid()),
+ seq :: option(msg_seqno()),
+ msg :: raw_msg()}).
+-record(checkout, {consumer_id :: consumer_id(),
+ spec :: checkout_spec(),
+ meta :: consumer_meta()}).
+-record(settle, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+-record(return, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+-record(discard, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+-record(credit, {consumer_id :: consumer_id(),
+ credit :: non_neg_integer(),
+ delivery_count :: non_neg_integer(),
+ drain :: boolean()}).
+-record(purge, {}).
+-record(purge_nodes, {nodes :: [node()]}).
+-record(update_config, {config :: config()}).
+
+-opaque protocol() ::
+ #enqueue{} |
+ #checkout{} |
+ #settle{} |
+ #return{} |
+ #discard{} |
+ #credit{} |
+ #purge{} |
+ #purge_nodes{} |
+ #update_config{}.
+
+-type command() :: protocol() | ra_machine:builtin_command().
+%% all the command types supported by ra fifo
+
+-type client_msg() :: delivery().
+%% the messages `rabbit_fifo' can send to consumers.
+
+-opaque state() :: #?STATE{}.
+
+-export_type([protocol/0,
+ delivery/0,
+ command/0,
+ credit_mode/0,
+ consumer_tag/0,
+ consumer_meta/0,
+ consumer_id/0,
+ client_msg/0,
+ msg/0,
+ msg_id/0,
+ msg_seqno/0,
+ delivery_msg/0,
+ state/0,
+ config/0]).
+
+-spec init(config()) -> state().
+init(#{name := Name,
+ queue_resource := Resource} = Conf) ->
+ update_config(Conf, #?STATE{cfg = #cfg{name = Name,
+ resource = Resource}}).
+
+update_config(Conf, State) ->
+ DLH = maps:get(dead_letter_handler, Conf, undefined),
+ BLH = maps:get(become_leader_handler, Conf, undefined),
+ SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
+ MaxLength = maps:get(max_length, Conf, undefined),
+ MaxBytes = maps:get(max_bytes, Conf, undefined),
+ MaxMemoryLength = maps:get(max_in_memory_length, Conf, undefined),
+ MaxMemoryBytes = maps:get(max_in_memory_bytes, Conf, undefined),
+ DeliveryLimit = maps:get(delivery_limit, Conf, undefined),
+ ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
+ true ->
+ single_active;
+ false ->
+ competing
+ end,
+ Cfg = State#?STATE.cfg,
+ SHICur = case State#?STATE.cfg of
+ #cfg{release_cursor_interval = {_, C}} ->
+ C;
+ #cfg{release_cursor_interval = undefined} ->
+ SHI;
+ #cfg{release_cursor_interval = C} ->
+ C
+ end,
+
+ State#?STATE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur},
+ dead_letter_handler = DLH,
+ become_leader_handler = BLH,
+ max_length = MaxLength,
+ max_bytes = MaxBytes,
+ max_in_memory_length = MaxMemoryLength,
+ max_in_memory_bytes = MaxMemoryBytes,
+ consumer_strategy = ConsumerStrategy,
+ delivery_limit = DeliveryLimit}}.
+
+zero(_) ->
+ 0.
+
+% msg_ids are scoped per consumer
+% ra_indexes holds all raft indexes for enqueues currently on queue
+-spec apply(ra_machine:command_meta_data(), command(), state()) ->
+ {state(), Reply :: term(), ra_machine:effects()} |
+ {state(), Reply :: term()}.
+apply(Metadata, #enqueue{pid = From, seq = Seq,
+ msg = RawMsg}, State00) ->
+ apply_enqueue(Metadata, From, Seq, RawMsg, State00);
+apply(Meta,
+ #settle{msg_ids = MsgIds, consumer_id = ConsumerId},
+ #?STATE{consumers = Cons0} = State) ->
+ case Cons0 of
+ #{ConsumerId := Con0} ->
+ % need to increment metrics before completing as any snapshot
+ % states taken need to include them
+ complete_and_checkout(Meta, MsgIds, ConsumerId,
+ Con0, [], State);
+ _ ->
+ {State, ok}
+
+ end;
+apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
+ #?STATE{consumers = Cons0} = State0) ->
+ case Cons0 of
+ #{ConsumerId := Con0} ->
+ Discarded = maps:with(MsgIds, Con0#consumer.checked_out),
+ Effects = dead_letter_effects(rejected, Discarded, State0, []),
+ complete_and_checkout(Meta, MsgIds, ConsumerId, Con0,
+ Effects, State0);
+ _ ->
+ {State0, ok}
+ end;
+apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
+ #?STATE{consumers = Cons0} = State) ->
+ case Cons0 of
+ #{ConsumerId := #consumer{checked_out = Checked0}} ->
+ Returned = maps:with(MsgIds, Checked0),
+ return(Meta, ConsumerId, Returned, [], State);
+ _ ->
+ {State, ok}
+ end;
+apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
+ drain = Drain, consumer_id = ConsumerId},
+ #?STATE{consumers = Cons0,
+ service_queue = ServiceQueue0,
+ waiting_consumers = Waiting0} = State0) ->
+ case Cons0 of
+ #{ConsumerId := #consumer{delivery_count = DelCnt} = Con0} ->
+ %% this can go below 0 when credit is reduced
+ C = max(0, RemoteDelCnt + NewCredit - DelCnt),
+ %% grant the credit
+ Con1 = Con0#consumer{credit = C},
+ ServiceQueue = maybe_queue_consumer(ConsumerId, Con1,
+ ServiceQueue0),
+ Cons = maps:put(ConsumerId, Con1, Cons0),
+ {State1, ok, Effects} =
+ checkout(Meta, State0#?STATE{service_queue = ServiceQueue,
+ consumers = Cons}, []),
+ Response = {send_credit_reply, messages_ready(State1)},
+ %% by this point all checkouts for the updated credit value
+ %% should be processed so we can evaluate the drain
+ case Drain of
+ false ->
+ %% just return the result of the checkout
+ {State1, Response, Effects};
+ true ->
+ Con = #consumer{credit = PostCred} =
+ maps:get(ConsumerId, State1#?STATE.consumers),
+ %% add the outstanding credit to the delivery count
+ DeliveryCount = Con#consumer.delivery_count + PostCred,
+ Consumers = maps:put(ConsumerId,
+ Con#consumer{delivery_count = DeliveryCount,
+ credit = 0},
+ State1#?STATE.consumers),
+ Drained = Con#consumer.credit,
+ {CTag, _} = ConsumerId,
+ {State1#?STATE{consumers = Consumers},
+ %% returning a multi response with two client actions
+ %% for the channel to execute
+ {multi, [Response, {send_drained, {CTag, Drained}}]},
+ Effects}
+ end;
+ _ when Waiting0 /= [] ->
+ %% there are waiting consuemrs
+ case lists:keytake(ConsumerId, 1, Waiting0) of
+ {value, {_, Con0 = #consumer{delivery_count = DelCnt}}, Waiting} ->
+ %% the consumer is a waiting one
+ %% grant the credit
+ C = max(0, RemoteDelCnt + NewCredit - DelCnt),
+ Con = Con0#consumer{credit = C},
+ State = State0#?STATE{waiting_consumers =
+ [{ConsumerId, Con} | Waiting]},
+ {State, {send_credit_reply, messages_ready(State)}};
+ false ->
+ {State0, ok}
+ end;
+ _ ->
+ %% credit for unknown consumer - just ignore
+ {State0, ok}
+ end;
+apply(_, #checkout{spec = {dequeue, _}},
+ #?STATE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
+ {State0, {error, unsupported}};
+apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
+ meta = ConsumerMeta,
+ consumer_id = ConsumerId},
+ #?STATE{consumers = Consumers} = State0) ->
+ Exists = maps:is_key(ConsumerId, Consumers),
+ case messages_ready(State0) of
+ 0 ->
+ {State0, {dequeue, empty}};
+ _ when Exists ->
+ %% a dequeue using the same consumer_id isn't possible at this point
+ {State0, {dequeue, empty}};
+ Ready ->
+ State1 = update_consumer(ConsumerId, ConsumerMeta,
+ {once, 1, simple_prefetch},
+ State0),
+ {success, _, MsgId, Msg, State2} = checkout_one(State1),
+ {State, Effects} = case Settlement of
+ unsettled ->
+ {_, Pid} = ConsumerId,
+ {State2, [{monitor, process, Pid}]};
+ settled ->
+ %% immediately settle the checkout
+ {State3, _, Effects0} =
+ apply(Meta, make_settle(ConsumerId, [MsgId]),
+ State2),
+ {State3, Effects0}
+ end,
+ case Msg of
+ {RaftIdx, {Header, 'empty'}} ->
+ %% TODO add here new log effect with reply
+ {State, '$ra_no_reply',
+ reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From)};
+ _ ->
+ {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects}
+ end
+ end;
+apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
+ {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel),
+ checkout(Meta, State, Effects);
+apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
+ consumer_id = {_, Pid} = ConsumerId},
+ State0) ->
+ State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
+ checkout(Meta, State1, [{monitor, process, Pid}]);
+apply(#{index := RaftIdx}, #purge{},
+ #?STATE{ra_indexes = Indexes0,
+ returns = Returns,
+ messages = Messages} = State0) ->
+ Total = messages_ready(State0),
+ Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
+ [I || {I, _} <- lists:sort(maps:values(Messages))]),
+ Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1,
+ [I || {_, {I, _}} <- lqueue:to_list(Returns)]),
+ {State, _, Effects} =
+ update_smallest_raft_index(RaftIdx,
+ State0#?STATE{ra_indexes = Indexes,
+ messages = #{},
+ returns = lqueue:new(),
+ msg_bytes_enqueue = 0,
+ prefix_msgs = {0, [], 0, []},
+ low_msg_num = undefined,
+ msg_bytes_in_memory = 0,
+ msgs_ready_in_memory = 0},
+ []),
+ %% as we're not checking out after a purge (no point) we have to
+ %% reverse the effects ourselves
+ {State, {purge, Total},
+ lists:reverse([garbage_collection | Effects])};
+apply(Meta, {down, Pid, noconnection},
+ #?STATE{consumers = Cons0,
+ cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = Waiting0,
+ enqueuers = Enqs0} = State0) ->
+ Node = node(Pid),
+ %% if the pid refers to an active or cancelled consumer,
+ %% mark it as suspected and return it to the waiting queue
+ {State1, Effects0} =
+ maps:fold(fun({_, P} = Cid, C0, {S0, E0})
+ when node(P) =:= Node ->
+ %% the consumer should be returned to waiting
+ %% and checked out messages should be returned
+ Effs = consumer_update_active_effects(
+ S0, Cid, C0, false, suspected_down, E0),
+ Checked = C0#consumer.checked_out,
+ Credit = increase_credit(C0, maps:size(Checked)),
+ {St, Effs1} = return_all(S0, Effs,
+ Cid, C0#consumer{credit = Credit}),
+ %% if the consumer was cancelled there is a chance it got
+ %% removed when returning hence we need to be defensive here
+ Waiting = case St#?STATE.consumers of
+ #{Cid := C} ->
+ Waiting0 ++ [{Cid, C}];
+ _ ->
+ Waiting0
+ end,
+ {St#?STATE{consumers = maps:remove(Cid, St#?STATE.consumers),
+ waiting_consumers = Waiting},
+ Effs1};
+ (_, _, S) ->
+ S
+ end, {State0, []}, Cons0),
+ WaitingConsumers = update_waiting_consumer_status(Node, State1,
+ suspected_down),
+
+ %% select a new consumer from the waiting queue and run a checkout
+ State2 = State1#?STATE{waiting_consumers = WaitingConsumers},
+ {State, Effects1} = activate_next_consumer(State2, Effects0),
+
+ %% mark any enquers as suspected
+ Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{status = suspected_down};
+ (_, E) -> E
+ end, Enqs0),
+ Effects = [{monitor, node, Node} | Effects1],
+ checkout(Meta, State#?STATE{enqueuers = Enqs}, Effects);
+apply(Meta, {down, Pid, noconnection},
+ #?STATE{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
+ %% A node has been disconnected. This doesn't necessarily mean that
+ %% any processes on this node are down, they _may_ come back so here
+ %% we just mark them as suspected (effectively deactivated)
+ %% and return all checked out messages to the main queue for delivery to any
+ %% live consumers
+ %%
+ %% all pids for the disconnected node will be marked as suspected not just
+ %% the one we got the `down' command for
+ Node = node(Pid),
+
+ {State, Effects1} =
+ maps:fold(
+ fun({_, P} = Cid, #consumer{checked_out = Checked0,
+ status = up} = C0,
+ {St0, Eff}) when node(P) =:= Node ->
+ Credit = increase_credit(C0, map_size(Checked0)),
+ C = C0#consumer{status = suspected_down,
+ credit = Credit},
+ {St, Eff0} = return_all(St0, Eff, Cid, C),
+ Eff1 = consumer_update_active_effects(St, Cid, C, false,
+ suspected_down, Eff0),
+ {St, Eff1};
+ (_, _, {St, Eff}) ->
+ {St, Eff}
+ end, {State0, []}, Cons0),
+ Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{status = suspected_down};
+ (_, E) -> E
+ end, Enqs0),
+
+ % Monitor the node so that we can "unsuspect" these processes when the node
+ % comes back, then re-issue all monitors and discover the final fate of
+ % these processes
+ Effects = case maps:size(State#?STATE.consumers) of
+ 0 ->
+ [{aux, inactive}, {monitor, node, Node}];
+ _ ->
+ [{monitor, node, Node}]
+ end ++ Effects1,
+ checkout(Meta, State#?STATE{enqueuers = Enqs}, Effects);
+apply(Meta, {down, Pid, _Info}, State0) ->
+ {State, Effects} = handle_down(Pid, State0),
+ checkout(Meta, State, Effects);
+apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
+ %% A node we are monitoring has come back.
+ %% If we have suspected any processes of being
+ %% down we should now re-issue the monitors for them to detect if they're
+ %% actually down or not
+ Monitors = [{monitor, process, P}
+ || P <- suspected_pids_for(Node, State0)],
+
+ Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{status = up};
+ (_, E) -> E
+ end, Enqs0),
+ ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
+ %% mark all consumers as up
+ {Cons1, SQ, Effects1} =
+ maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
+ when (node(P) =:= Node) and
+ (C#consumer.status =/= cancelled) ->
+ EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId,
+ C, true, up, EAcc),
+ update_or_remove_sub(ConsumerId,
+ C#consumer{status = up}, CAcc,
+ SQAcc, EAcc1);
+ (_, _, Acc) ->
+ Acc
+ end, {Cons0, SQ0, Monitors}, Cons0),
+ Waiting = update_waiting_consumer_status(Node, State0, up),
+ State1 = State0#?STATE{consumers = Cons1,
+ enqueuers = Enqs1,
+ service_queue = SQ,
+ waiting_consumers = Waiting},
+ {State, Effects} = activate_next_consumer(State1, Effects1),
+ checkout(Meta, State, Effects);
+apply(_, {nodedown, _Node}, State) ->
+ {State, ok};
+apply(_, #purge_nodes{nodes = Nodes}, State0) ->
+ {State, Effects} = lists:foldl(fun(Node, {S, E}) ->
+ purge_node(Node, S, E)
+ end, {State0, []}, Nodes),
+ {State, ok, Effects};
+apply(Meta, #update_config{config = Conf}, State) ->
+ checkout(Meta, update_config(Conf, State), []).
+
+purge_node(Node, State, Effects) ->
+ lists:foldl(fun(Pid, {S0, E0}) ->
+ {S, E} = handle_down(Pid, S0),
+ {S, E0 ++ E}
+ end, {State, Effects}, all_pids_for(Node, State)).
+
+%% any downs that re not noconnection
+handle_down(Pid, #?STATE{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
+ % Remove any enqueuer for the same pid and enqueue any pending messages
+ % This should be ok as we won't see any more enqueues from this pid
+ State1 = case maps:take(Pid, Enqs0) of
+ {#enqueuer{pending = Pend}, Enqs} ->
+ lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
+ enqueue(RIdx, RawMsg, S)
+ end, State0#?STATE{enqueuers = Enqs}, Pend);
+ error ->
+ State0
+ end,
+ {Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
+ % return checked out messages to main queue
+ % Find the consumers for the down pid
+ DownConsumers = maps:keys(
+ maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
+ lists:foldl(fun(ConsumerId, {S, E}) ->
+ cancel_consumer(ConsumerId, S, E, down)
+ end, {State2, Effects1}, DownConsumers).
+
+consumer_active_flag_update_function(#?STATE{cfg = #cfg{consumer_strategy = competing}}) ->
+ fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
+ consumer_update_active_effects(State, ConsumerId, Consumer, Active,
+ ActivityStatus, Effects)
+ end;
+consumer_active_flag_update_function(#?STATE{cfg = #cfg{consumer_strategy = single_active}}) ->
+ fun(_, _, _, _, _, Effects) ->
+ Effects
+ end.
+
+handle_waiting_consumer_down(_Pid,
+ #?STATE{cfg = #cfg{consumer_strategy = competing}} = State) ->
+ {[], State};
+handle_waiting_consumer_down(_Pid,
+ #?STATE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = []} = State) ->
+ {[], State};
+handle_waiting_consumer_down(Pid,
+ #?STATE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = WaitingConsumers0} = State0) ->
+ % get cancel effects for down waiting consumers
+ Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end,
+ WaitingConsumers0),
+ Effects = lists:foldl(fun ({ConsumerId, _}, Effects) ->
+ cancel_consumer_effects(ConsumerId, State0,
+ Effects)
+ end, [], Down),
+ % update state to have only up waiting consumers
+ StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end,
+ WaitingConsumers0),
+ State = State0#?STATE{waiting_consumers = StillUp},
+ {Effects, State}.
+
+update_waiting_consumer_status(Node,
+ #?STATE{waiting_consumers = WaitingConsumers},
+ Status) ->
+ [begin
+ case node(Pid) of
+ Node ->
+ {ConsumerId, Consumer#consumer{status = Status}};
+ _ ->
+ {ConsumerId, Consumer}
+ end
+ end || {{_, Pid} = ConsumerId, Consumer} <- WaitingConsumers,
+ Consumer#consumer.status =/= cancelled].
+
+-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
+state_enter(leader, #?STATE{consumers = Cons,
+ enqueuers = Enqs,
+ waiting_consumers = WaitingConsumers,
+ cfg = #cfg{name = Name,
+ resource = Resource,
+ become_leader_handler = BLH},
+ prefix_msgs = {0, [], 0, []}
+ }) ->
+ % return effects to monitor all current consumers and enqueuers
+ Pids = lists:usort(maps:keys(Enqs)
+ ++ [P || {_, P} <- maps:keys(Cons)]
+ ++ [P || {{_, P}, _} <- WaitingConsumers]),
+ Mons = [{monitor, process, P} || P <- Pids],
+ Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
+ NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
+ FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
+ Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
+ case BLH of
+ undefined ->
+ Effects;
+ {Mod, Fun, Args} ->
+ [{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
+ end;
+state_enter(eol, #?STATE{enqueuers = Enqs,
+ consumers = Custs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
+ WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end,
+ #{}, WaitingConsumers0),
+ AllConsumers = maps:merge(Custs, WaitingConsumers1),
+ [{send_msg, P, eol, ra_event}
+ || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
+ [{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
+state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
+ FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
+ [FHReservation];
+ state_enter(_, _) ->
+ %% catch all as not handling all states
+ [].
+
+
+-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
+tick(_Ts, #?STATE{cfg = #cfg{name = Name,
+ resource = QName},
+ msg_bytes_enqueue = EnqueueBytes,
+ msg_bytes_checkout = CheckoutBytes} = State) ->
+ Metrics = {Name,
+ messages_ready(State),
+ num_checked_out(State), % checked out
+ messages_total(State),
+ query_consumer_count(State), % Consumers
+ EnqueueBytes,
+ CheckoutBytes},
+ [{mod_call, rabbit_quorum_queue,
+ handle_tick, [QName, Metrics, all_nodes(State)]}].
+
+-spec overview(state()) -> map().
+overview(#?STATE{consumers = Cons,
+ enqueuers = Enqs,
+ release_cursors = Cursors,
+ enqueue_count = EnqCount,
+ msg_bytes_enqueue = EnqueueBytes,
+ msg_bytes_checkout = CheckoutBytes,
+ cfg = Cfg} = State) ->
+ Conf = #{name => Cfg#cfg.name,
+ resource => Cfg#cfg.resource,
+ release_cursor_interval => Cfg#cfg.release_cursor_interval,
+ dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler,
+ max_length => Cfg#cfg.max_length,
+ max_bytes => Cfg#cfg.max_bytes,
+ consumer_strategy => Cfg#cfg.consumer_strategy,
+ max_in_memory_length => Cfg#cfg.max_in_memory_length,
+ max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes},
+ #{type => ?MODULE,
+ config => Conf,
+ num_consumers => maps:size(Cons),
+ num_checked_out => num_checked_out(State),
+ num_enqueuers => maps:size(Enqs),
+ num_ready_messages => messages_ready(State),
+ num_messages => messages_total(State),
+ num_release_cursors => lqueue:len(Cursors),
+ release_crusor_enqueue_counter => EnqCount,
+ enqueue_message_bytes => EnqueueBytes,
+ checkout_message_bytes => CheckoutBytes}.
+
+-spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) ->
+ [delivery_msg()].
+get_checked_out(Cid, From, To, #?STATE{consumers = Consumers}) ->
+ case Consumers of
+ #{Cid := #consumer{checked_out = Checked}} ->
+ [{K, snd(snd(maps:get(K, Checked)))}
+ || K <- lists:seq(From, To),
+ maps:is_key(K, Checked)];
+ _ ->
+ []
+ end.
+
+-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
+-record(aux, {name :: atom(),
+ utilisation :: term(),
+ gc = #aux_gc{} :: #aux_gc{}}).
+
+init_aux(Name) when is_atom(Name) ->
+ %% TODO: catch specific exception throw if table already exists
+ ok = ra_machine_ets:create_table(rabbit_fifo_usage,
+ [named_table, set, public,
+ {write_concurrency, true}]),
+ Now = erlang:monotonic_time(micro_seconds),
+ #aux{name = Name,
+ utilisation = {inactive, Now, 1, 1.0}}.
+
+handle_aux(_RaState, cast, Cmd, #aux{name = Name,
+ utilisation = Use0} = State0,
+ Log, MacState) ->
+ State = case Cmd of
+ _ when Cmd == active orelse Cmd == inactive ->
+ State0#aux{utilisation = update_use(Use0, Cmd)};
+ tick ->
+ true = ets:insert(rabbit_fifo_usage,
+ {Name, utilisation(Use0)}),
+ eval_gc(Log, MacState, State0);
+ eval ->
+ State0
+ end,
+ {no_reply, State, Log}.
+
+eval_gc(Log, #?STATE{cfg = #cfg{resource = QR}} = MacState,
+ #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) ->
+ {Idx, _} = ra_log:last_index_term(Log),
+ {memory, Mem} = erlang:process_info(self(), memory),
+ case messages_total(MacState) of
+ 0 when Idx > LastGcIdx andalso
+ Mem > ?GC_MEM_LIMIT_B ->
+ garbage_collect(),
+ {memory, MemAfter} = erlang:process_info(self(), memory),
+ rabbit_log:debug("~s: full GC sweep complete. "
+ "Process memory reduced from ~.2fMB to ~.2fMB.",
+ [rabbit_misc:rs(QR), Mem/?MB, MemAfter/?MB]),
+ AuxState#aux{gc = Gc#aux_gc{last_raft_idx = Idx}};
+ _ ->
+ AuxState
+ end.
+
+%%% Queries
+
+query_messages_ready(State) ->
+ messages_ready(State).
+
+query_messages_checked_out(#?STATE{consumers = Consumers}) ->
+ maps:fold(fun (_, #consumer{checked_out = C}, S) ->
+ maps:size(C) + S
+ end, 0, Consumers).
+
+query_messages_total(State) ->
+ messages_total(State).
+
+query_processes(#?STATE{enqueuers = Enqs, consumers = Cons0}) ->
+ Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0),
+ maps:keys(maps:merge(Enqs, Cons)).
+
+
+query_ra_indexes(#?STATE{ra_indexes = RaIndexes}) ->
+ RaIndexes.
+
+query_consumer_count(#?STATE{consumers = Consumers,
+ waiting_consumers = WaitingConsumers}) ->
+ maps:size(Consumers) + length(WaitingConsumers).
+
+query_consumers(#?STATE{consumers = Consumers,
+ waiting_consumers = WaitingConsumers,
+ cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) ->
+ ActiveActivityStatusFun =
+ case ConsumerStrategy of
+ competing ->
+ fun(_ConsumerId,
+ #consumer{status = Status}) ->
+ case Status of
+ suspected_down ->
+ {false, Status};
+ _ ->
+ {true, Status}
+ end
+ end;
+ single_active ->
+ SingleActiveConsumer = query_single_active_consumer(State),
+ fun({Tag, Pid} = _Consumer, _) ->
+ case SingleActiveConsumer of
+ {value, {Tag, Pid}} ->
+ {true, single_active};
+ _ ->
+ {false, waiting}
+ end
+ end
+ end,
+ FromConsumers =
+ maps:fold(fun (_, #consumer{status = cancelled}, Acc) ->
+ Acc;
+ ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) ->
+ {Active, ActivityStatus} =
+ ActiveActivityStatusFun({Tag, Pid}, Consumer),
+ maps:put({Tag, Pid},
+ {Pid, Tag,
+ maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined),
+ Active,
+ ActivityStatus,
+ maps:get(args, Meta, []),
+ maps:get(username, Meta, undefined)},
+ Acc)
+ end, #{}, Consumers),
+ FromWaitingConsumers =
+ lists:foldl(fun ({_, #consumer{status = cancelled}}, Acc) ->
+ Acc;
+ ({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) ->
+ {Active, ActivityStatus} =
+ ActiveActivityStatusFun({Tag, Pid}, Consumer),
+ maps:put({Tag, Pid},
+ {Pid, Tag,
+ maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined),
+ Active,
+ ActivityStatus,
+ maps:get(args, Meta, []),
+ maps:get(username, Meta, undefined)},
+ Acc)
+ end, #{}, WaitingConsumers),
+ maps:merge(FromConsumers, FromWaitingConsumers).
+
+query_single_active_consumer(#?STATE{cfg = #cfg{consumer_strategy = single_active},
+ consumers = Consumers}) ->
+ case maps:size(Consumers) of
+ 0 ->
+ {error, no_value};
+ 1 ->
+ {value, lists:nth(1, maps:keys(Consumers))};
+ _
+ ->
+ {error, illegal_size}
+ end ;
+query_single_active_consumer(_) ->
+ disabled.
+
+query_stat(#?STATE{consumers = Consumers} = State) ->
+ {messages_ready(State), maps:size(Consumers)}.
+
+query_in_memory_usage(#?STATE{msg_bytes_in_memory = Bytes,
+ msgs_ready_in_memory = Length}) ->
+ {Length, Bytes}.
+
+-spec usage(atom()) -> float().
+usage(Name) when is_atom(Name) ->
+ case ets:lookup(rabbit_fifo_usage, Name) of
+ [] -> 0.0;
+ [{_, Use}] -> Use
+ end.
+
+%%% Internal
+
+messages_ready(#?STATE{messages = M,
+ prefix_msgs = {RCnt, _R, PCnt, _P},
+ returns = R}) ->
+
+ %% prefix messages will rarely have anything in them during normal
+ %% operations so length/1 is fine here
+ maps:size(M) + lqueue:len(R) + RCnt + PCnt.
+
+messages_total(#?STATE{ra_indexes = I,
+ prefix_msgs = {RCnt, _R, PCnt, _P}}) ->
+ rabbit_fifo_index:size(I) + RCnt + PCnt.
+
+update_use({inactive, _, _, _} = CUInfo, inactive) ->
+ CUInfo;
+update_use({active, _, _} = CUInfo, active) ->
+ CUInfo;
+update_use({active, Since, Avg}, inactive) ->
+ Now = erlang:monotonic_time(micro_seconds),
+ {inactive, Now, Now - Since, Avg};
+update_use({inactive, Since, Active, Avg}, active) ->
+ Now = erlang:monotonic_time(micro_seconds),
+ {active, Now, use_avg(Active, Now - Since, Avg)}.
+
+utilisation({active, Since, Avg}) ->
+ use_avg(erlang:monotonic_time(micro_seconds) - Since, 0, Avg);
+utilisation({inactive, Since, Active, Avg}) ->
+ use_avg(Active, erlang:monotonic_time(micro_seconds) - Since, Avg).
+
+use_avg(0, 0, Avg) ->
+ Avg;
+use_avg(Active, Inactive, Avg) ->
+ Time = Inactive + Active,
+ moving_average(Time, ?USE_AVG_HALF_LIFE, Active / Time, Avg).
+
+moving_average(_Time, _, Next, undefined) ->
+ Next;
+moving_average(Time, HalfLife, Next, Current) ->
+ Weight = math:exp(Time * math:log(0.5) / HalfLife),
+ Next * (1 - Weight) + Current * Weight.
+
+num_checked_out(#?STATE{consumers = Cons}) ->
+ maps:fold(fun (_, #consumer{checked_out = C}, Acc) ->
+ maps:size(C) + Acc
+ end, 0, Cons).
+
+cancel_consumer(ConsumerId,
+ #?STATE{cfg = #cfg{consumer_strategy = competing}} = State,
+ Effects, Reason) ->
+ cancel_consumer0(ConsumerId, State, Effects, Reason);
+cancel_consumer(ConsumerId,
+ #?STATE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = []} = State,
+ Effects, Reason) ->
+ %% single active consumer on, no consumers are waiting
+ cancel_consumer0(ConsumerId, State, Effects, Reason);
+cancel_consumer(ConsumerId,
+ #?STATE{consumers = Cons0,
+ cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = Waiting0} = State0,
+ Effects0, Reason) ->
+ %% single active consumer on, consumers are waiting
+ case maps:is_key(ConsumerId, Cons0) of
+ true ->
+ % The active consumer is to be removed
+ {State1, Effects1} = cancel_consumer0(ConsumerId, State0,
+ Effects0, Reason),
+ activate_next_consumer(State1, Effects1);
+ false ->
+ % The cancelled consumer is not active or cancelled
+ % Just remove it from idle_consumers
+ Waiting = lists:keydelete(ConsumerId, 1, Waiting0),
+ Effects = cancel_consumer_effects(ConsumerId, State0, Effects0),
+ % A waiting consumer isn't supposed to have any checked out messages,
+ % so nothing special to do here
+ {State0#?STATE{waiting_consumers = Waiting}, Effects}
+ end.
+
+consumer_update_active_effects(#?STATE{cfg = #cfg{resource = QName}},
+ ConsumerId, #consumer{meta = Meta},
+ Active, ActivityStatus,
+ Effects) ->
+ Ack = maps:get(ack, Meta, undefined),
+ Prefetch = maps:get(prefetch, Meta, undefined),
+ Args = maps:get(args, Meta, []),
+ [{mod_call, rabbit_quorum_queue, update_consumer_handler,
+ [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
+ | Effects].
+
+cancel_consumer0(ConsumerId, #?STATE{consumers = C0} = S0, Effects0, Reason) ->
+ case C0 of
+ #{ConsumerId := Consumer} ->
+ {S, Effects2} = maybe_return_all(ConsumerId, Consumer, S0,
+ Effects0, Reason),
+ %% The effects are emitted before the consumer is actually removed
+ %% if the consumer has unacked messages. This is a bit weird but
+ %% in line with what classic queues do (from an external point of
+ %% view)
+ Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
+ case maps:size(S#?STATE.consumers) of
+ 0 ->
+ {S, [{aux, inactive} | Effects]};
+ _ ->
+ {S, Effects}
+ end;
+ _ ->
+ %% already removed: do nothing
+ {S0, Effects0}
+ end.
+
+activate_next_consumer(#?STATE{consumers = Cons,
+ waiting_consumers = Waiting0} = State0,
+ Effects0) ->
+ case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of
+ Up when map_size(Up) == 0 ->
+ %% there are no active consumer in the consumer map
+ case lists:filter(fun ({_, #consumer{status = Status}}) ->
+ Status == up
+ end, Waiting0) of
+ [{NextConsumerId, NextConsumer} | _] ->
+ %% there is a potential next active consumer
+ Remaining = lists:keydelete(NextConsumerId, 1, Waiting0),
+ #?STATE{service_queue = ServiceQueue} = State0,
+ ServiceQueue1 = maybe_queue_consumer(NextConsumerId,
+ NextConsumer,
+ ServiceQueue),
+ State = State0#?STATE{consumers = Cons#{NextConsumerId => NextConsumer},
+ service_queue = ServiceQueue1,
+ waiting_consumers = Remaining},
+ Effects = consumer_update_active_effects(State, NextConsumerId,
+ NextConsumer, true,
+ single_active, Effects0),
+ {State, Effects};
+ [] ->
+ {State0, [{aux, inactive} | Effects0]}
+ end;
+ _ ->
+ {State0, Effects0}
+ end.
+
+
+
+maybe_return_all(ConsumerId, Consumer,
+ #?STATE{consumers = C0,
+ service_queue = SQ0} = S0,
+ Effects0, Reason) ->
+ case Reason of
+ consumer_cancel ->
+ {Cons, SQ, Effects1} =
+ update_or_remove_sub(ConsumerId,
+ Consumer#consumer{lifetime = once,
+ credit = 0,
+ status = cancelled},
+ C0, SQ0, Effects0),
+ {S0#?STATE{consumers = Cons,
+ service_queue = SQ}, Effects1};
+ down ->
+ {S1, Effects1} = return_all(S0, Effects0, ConsumerId, Consumer),
+ {S1#?STATE{consumers = maps:remove(ConsumerId, S1#?STATE.consumers)},
+ Effects1}
+ end.
+
+apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
+ case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
+ {ok, State1, Effects1} ->
+ State2 = append_to_master_index(RaftIdx, State1),
+ {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
+ {duplicate, State, Effects} ->
+ {State, ok, Effects}
+ end.
+
+drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects0) ->
+ case take_next_msg(State0) of
+ {FullMsg = {_MsgId, {RaftIdxToDrop, {Header, Msg}}},
+ State1} ->
+ Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
+ State2 = add_bytes_drop(Header, State1#?STATE{ra_indexes = Indexes}),
+ State = case Msg of
+ 'empty' -> State2;
+ _ -> subtract_in_memory_counts(Header, State2)
+ end,
+ Effects = dead_letter_effects(maxlen, #{none => FullMsg},
+ State, Effects0),
+ {State, Effects};
+ {{'$prefix_msg', Header}, State1} ->
+ State2 = subtract_in_memory_counts(Header, add_bytes_drop(Header, State1)),
+ {State2, Effects0};
+ {{'$empty_msg', Header}, State1} ->
+ State2 = add_bytes_drop(Header, State1),
+ {State2, Effects0};
+ empty ->
+ {State0, Effects0}
+ end.
+
+enqueue(RaftIdx, RawMsg, #?STATE{messages = Messages,
+ low_msg_num = LowMsgNum,
+ next_msg_num = NextMsgNum} = State0) ->
+ %% the initial header is an integer only - it will get expanded to a map
+ %% when the next required key is added
+ Header = message_size(RawMsg),
+ {State1, Msg} =
+ case evaluate_memory_limit(Header, State0) of
+ true ->
+ % indexed message with header map
+ {State0, {RaftIdx, {Header, 'empty'}}};
+ false ->
+ {add_in_memory_counts(Header, State0),
+ {RaftIdx, {Header, RawMsg}}} % indexed message with header map
+ end,
+ State = add_bytes_enqueue(Header, State1),
+ State#?STATE{messages = Messages#{NextMsgNum => Msg},
+ %% this is probably only done to record it when low_msg_num
+ %% is undefined
+ low_msg_num = min(LowMsgNum, NextMsgNum),
+ next_msg_num = NextMsgNum + 1}.
+
+append_to_master_index(RaftIdx,
+ #?STATE{ra_indexes = Indexes0} = State0) ->
+ State = incr_enqueue_count(State0),
+ Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0),
+ State#?STATE{ra_indexes = Indexes}.
+
+
+incr_enqueue_count(#?STATE{enqueue_count = C,
+ cfg = #cfg{release_cursor_interval = {_Base, C}}
+ } = State0) ->
+ %% this will trigger a dehydrated version of the state to be stored
+ %% at this raft index for potential future snapshot generation
+ %% Q: Why don't we just stash the release cursor here?
+ %% A: Because it needs to be the very last thing we do and we
+ %% first needs to run the checkout logic.
+ State0#?STATE{enqueue_count = 0};
+incr_enqueue_count(#?STATE{cfg = #cfg{release_cursor_interval = C} = Cfg}
+ = State0)
+ when is_integer(C) ->
+ %% conversion to new release cursor interval format
+ State = State0#?STATE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
+ incr_enqueue_count(State);
+incr_enqueue_count(#?STATE{enqueue_count = C} = State) ->
+ State#?STATE{enqueue_count = C + 1}.
+
+maybe_store_dehydrated_state(RaftIdx,
+ #?STATE{cfg =
+ #cfg{release_cursor_interval = {Base, _}}
+ = Cfg,
+ ra_indexes = Indexes,
+ enqueue_count = 0,
+ release_cursors = Cursors0} = State0) ->
+ case rabbit_fifo_index:exists(RaftIdx, Indexes) of
+ false ->
+ %% the incoming enqueue must already have been dropped
+ State0;
+ true ->
+ Interval = case Base of
+ 0 -> 0;
+ _ ->
+ Total = messages_total(State0),
+ min(max(Total, Base),
+ ?RELEASE_CURSOR_EVERY_MAX)
+ end,
+ State = convert_prefix_msgs(
+ State0#?STATE{cfg = Cfg#cfg{release_cursor_interval =
+ {Base, Interval}}}),
+ Dehydrated = dehydrate_state(State),
+ Cursor = {release_cursor, RaftIdx, Dehydrated},
+ Cursors = lqueue:in(Cursor, Cursors0),
+ State#?STATE{release_cursors = Cursors}
+ end;
+maybe_store_dehydrated_state(RaftIdx,
+ #?STATE{cfg =
+ #cfg{release_cursor_interval = C} = Cfg}
+ = State0)
+ when is_integer(C) ->
+ %% convert to new format
+ State = State0#?STATE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
+ maybe_store_dehydrated_state(RaftIdx, State);
+maybe_store_dehydrated_state(_RaftIdx, State) ->
+ State.
+
+enqueue_pending(From,
+ #enqueuer{next_seqno = Next,
+ pending = [{Next, RaftIdx, RawMsg} | Pending]} = Enq0,
+ State0) ->
+ State = enqueue(RaftIdx, RawMsg, State0),
+ Enq = Enq0#enqueuer{next_seqno = Next + 1, pending = Pending},
+ enqueue_pending(From, Enq, State);
+enqueue_pending(From, Enq, #?STATE{enqueuers = Enqueuers0} = State) ->
+ State#?STATE{enqueuers = Enqueuers0#{From => Enq}}.
+
+maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) ->
+ % direct enqueue without tracking
+ State = enqueue(RaftIdx, RawMsg, State0),
+ {ok, State, Effects};
+maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
+ #?STATE{enqueuers = Enqueuers0} = State0) ->
+ case maps:get(From, Enqueuers0, undefined) of
+ undefined ->
+ State1 = State0#?STATE{enqueuers = Enqueuers0#{From => #enqueuer{}}},
+ {ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo,
+ RawMsg, Effects0, State1),
+ {ok, State, [{monitor, process, From} | Effects]};
+ #enqueuer{next_seqno = MsgSeqNo} = Enq0 ->
+ % it is the next expected seqno
+ State1 = enqueue(RaftIdx, RawMsg, State0),
+ Enq = Enq0#enqueuer{next_seqno = MsgSeqNo + 1},
+ State = enqueue_pending(From, Enq, State1),
+ {ok, State, Effects0};
+ #enqueuer{next_seqno = Next,
+ pending = Pending0} = Enq0
+ when MsgSeqNo > Next ->
+ % out of order delivery
+ Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0],
+ Enq = Enq0#enqueuer{pending = lists:sort(Pending)},
+ {ok, State0#?STATE{enqueuers = Enqueuers0#{From => Enq}}, Effects0};
+ #enqueuer{next_seqno = Next} when MsgSeqNo =< Next ->
+ % duplicate delivery - remove the raft index from the ra_indexes
+ % map as it was added earlier
+ {duplicate, State0, Effects0}
+ end.
+
+snd(T) ->
+ element(2, T).
+
+return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
+ Effects0, #?STATE{service_queue = SQ0} = State0) ->
+ {State1, Effects1} = maps:fold(
+ fun(MsgId, {Tag, _} = Msg, {S0, E0})
+ when Tag == '$prefix_msg';
+ Tag == '$empty_msg'->
+ return_one(MsgId, 0, Msg, S0, E0, ConsumerId);
+ (MsgId, {MsgNum, Msg}, {S0, E0}) ->
+ return_one(MsgId, MsgNum, Msg, S0, E0,
+ ConsumerId)
+ end, {State0, Effects0}, Returned),
+ {State2, Effects3} =
+ case State1#?STATE.consumers of
+ #{ConsumerId := Con0} = Cons0 ->
+ Con = Con0#consumer{credit = increase_credit(Con0,
+ map_size(Returned))},
+ {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con,
+ Cons0, SQ0, Effects1),
+ {State1#?STATE{consumers = Cons,
+ service_queue = SQ}, Effects2};
+ _ ->
+ {State1, Effects1}
+ end,
+ {State, ok, Effects} = checkout(Meta, State2, Effects3),
+ update_smallest_raft_index(IncomingRaftIdx, State, Effects).
+
+% used to processes messages that are finished
+complete(ConsumerId, Discarded,
+ #consumer{checked_out = Checked} = Con0, Effects0,
+ #?STATE{consumers = Cons0, service_queue = SQ0,
+ ra_indexes = Indexes0} = State0) ->
+ %% TODO optimise use of Discarded map here
+ MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
+ %% credit_mode = simple_prefetch should automatically top-up credit
+ %% as messages are simple_prefetch or otherwise returned
+ Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked),
+ credit = increase_credit(Con0, map_size(Discarded))},
+ {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
+ SQ0, Effects0),
+ Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
+ MsgRaftIdxs),
+ %% TODO: use maps:fold instead
+ State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
+ add_bytes_settle(Header, Acc);
+ ({'$prefix_msg', Header}, Acc) ->
+ add_bytes_settle(Header, Acc);
+ ({'$empty_msg', Header}, Acc) ->
+ add_bytes_settle(Header, Acc)
+ end, State0, maps:values(Discarded)),
+ {State1#?STATE{consumers = Cons,
+ ra_indexes = Indexes,
+ service_queue = SQ}, Effects}.
+
+increase_credit(#consumer{lifetime = once,
+ credit = Credit}, _) ->
+ %% once consumers cannot increment credit
+ Credit;
+increase_credit(#consumer{lifetime = auto,
+ credit_mode = credited,
+ credit = Credit}, _) ->
+ %% credit_mode: credit also doesn't automatically increment credit
+ Credit;
+increase_credit(#consumer{credit = Current}, Credit) ->
+ Current + Credit.
+
+complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
+ #consumer{checked_out = Checked0} = Con0,
+ Effects0, State0) ->
+ Discarded = maps:with(MsgIds, Checked0),
+ {State2, Effects1} = complete(ConsumerId, Discarded, Con0,
+ Effects0, State0),
+ {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ update_smallest_raft_index(IncomingRaftIdx, State, Effects).
+
+dead_letter_effects(_Reason, _Discarded,
+ #?STATE{cfg = #cfg{dead_letter_handler = undefined}},
+ Effects) ->
+ Effects;
+dead_letter_effects(Reason, Discarded,
+ #?STATE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}},
+ Effects) ->
+ RaftIdxs = maps:fold(
+ fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) ->
+ [RaftIdx | Acc];
+ (_, _, Acc) ->
+ Acc
+ end, [], Discarded),
+ [{log, RaftIdxs,
+ fun (Log) ->
+ Lookup = maps:from_list(lists:zip(RaftIdxs, Log)),
+ DeadLetters = maps:fold(
+ fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) ->
+ {enqueue, _, _, Msg} = maps:get(RaftIdx, Lookup),
+ [{Reason, Msg} | Acc];
+ (_, {_, {_, {_Header, Msg}}}, Acc) ->
+ [{Reason, Msg} | Acc];
+ (_, _, Acc) ->
+ Acc
+ end, [], Discarded),
+ [{mod_call, Mod, Fun, Args ++ [DeadLetters]}]
+ end} | Effects].
+
+cancel_consumer_effects(ConsumerId,
+ #?STATE{cfg = #cfg{resource = QName}}, Effects) ->
+ [{mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [QName, ConsumerId]} | Effects].
+
+update_smallest_raft_index(IncomingRaftIdx,
+ #?STATE{ra_indexes = Indexes,
+ release_cursors = Cursors0} = State0,
+ Effects) ->
+ case rabbit_fifo_index:size(Indexes) of
+ 0 ->
+ % there are no messages on queue anymore and no pending enqueues
+ % we can forward release_cursor all the way until
+ % the last received command, hooray
+ State = State0#?STATE{release_cursors = lqueue:new()},
+ {State, ok, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
+ _ ->
+ Smallest = rabbit_fifo_index:smallest(Indexes),
+ case find_next_cursor(Smallest, Cursors0) of
+ {empty, Cursors} ->
+ {State0#?STATE{release_cursors = Cursors},
+ ok, Effects};
+ {Cursor, Cursors} ->
+ %% we can emit a release cursor we've passed the smallest
+ %% release cursor available.
+ {State0#?STATE{release_cursors = Cursors}, ok,
+ Effects ++ [Cursor]}
+ end
+ end.
+
+find_next_cursor(Idx, Cursors) ->
+ find_next_cursor(Idx, Cursors, empty).
+
+find_next_cursor(Smallest, Cursors0, Potential) ->
+ case lqueue:out(Cursors0) of
+ {{value, {_, Idx, _} = Cursor}, Cursors} when Idx < Smallest ->
+ %% we found one but it may not be the largest one
+ find_next_cursor(Smallest, Cursors, Cursor);
+ _ ->
+ {Potential, Cursors0}
+ end.
+
+update_header(Key, UpdateFun, Default, Header)
+ when is_integer(Header) ->
+ update_header(Key, UpdateFun, Default, #{size => Header});
+update_header(Key, UpdateFun, Default, Header) ->
+ maps:update_with(Key, UpdateFun, Default, Header).
+
+
+return_one(MsgId, 0, {Tag, Header0},
+ #?STATE{returns = Returns,
+ consumers = Consumers,
+ cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
+ Effects0, ConsumerId)
+ when Tag == '$prefix_msg'; Tag == '$empty_msg' ->
+ #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
+ Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0),
+ Msg0 = {Tag, Header},
+ case maps:get(delivery_count, Header) of
+ DeliveryCount when DeliveryCount > DeliveryLimit ->
+ complete(ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0);
+ _ ->
+ %% this should not affect the release cursor in any way
+ Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
+ {Msg, State1} = case Tag of
+ '$empty_msg' ->
+ {Msg0, State0};
+ _ -> case evaluate_memory_limit(Header, State0) of
+ true ->
+ {{'$empty_msg', Header}, State0};
+ false ->
+ {Msg0, add_in_memory_counts(Header, State0)}
+ end
+ end,
+ {add_bytes_return(
+ Header,
+ State1#?STATE{consumers = Consumers#{ConsumerId => Con},
+ returns = lqueue:in(Msg, Returns)}),
+ Effects0}
+ end;
+return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
+ #?STATE{returns = Returns,
+ consumers = Consumers,
+ cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
+ Effects0, ConsumerId) ->
+ #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
+ Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0),
+ Msg0 = {RaftId, {Header, RawMsg}},
+ case maps:get(delivery_count, Header) of
+ DeliveryCount when DeliveryCount > DeliveryLimit ->
+ DlMsg = {MsgNum, Msg0},
+ Effects = dead_letter_effects(delivery_limit, #{none => DlMsg},
+ State0, Effects0),
+ complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0);
+ _ ->
+ Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
+ %% this should not affect the release cursor in any way
+ {Msg, State1} = case RawMsg of
+ 'empty' ->
+ {Msg0, State0};
+ _ ->
+ case evaluate_memory_limit(Header, State0) of
+ true ->
+ {{RaftId, {Header, 'empty'}}, State0};
+ false ->
+ {Msg0, add_in_memory_counts(Header, State0)}
+ end
+ end,
+ {add_bytes_return(
+ Header,
+ State1#?STATE{consumers = Consumers#{ConsumerId => Con},
+ returns = lqueue:in({MsgNum, Msg}, Returns)}),
+ Effects0}
+ end.
+
+return_all(#?STATE{consumers = Cons} = State0, Effects0, ConsumerId,
+ #consumer{checked_out = Checked0} = Con) ->
+ %% need to sort the list so that we return messages in the order
+ %% they were checked out
+ Checked = lists:sort(maps:to_list(Checked0)),
+ State = State0#?STATE{consumers = Cons#{ConsumerId => Con}},
+ lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) ->
+ return_one(MsgId, 0, Msg, S, E, ConsumerId);
+ ({MsgId, {'$empty_msg', _} = Msg}, {S, E}) ->
+ return_one(MsgId, 0, Msg, S, E, ConsumerId);
+ ({MsgId, {MsgNum, Msg}}, {S, E}) ->
+ return_one(MsgId, MsgNum, Msg, S, E, ConsumerId)
+ end, {State, Effects0}, Checked).
+
+%% checkout new messages to consumers
+checkout(#{index := Index}, State0, Effects0) ->
+ {State1, _Result, Effects1} = checkout0(checkout_one(State0),
+ Effects0, {#{}, #{}}),
+ case evaluate_limit(false, State1, Effects1) of
+ {State, true, Effects} ->
+ update_smallest_raft_index(Index, State, Effects);
+ {State, false, Effects} ->
+ {State, ok, Effects}
+ end.
+
+checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
+ Effects, {SendAcc, LogAcc0}) ->
+ DelMsg = {RaftIdx, {MsgId, Header}},
+ LogAcc = maps:update_with(ConsumerId,
+ fun (M) -> [DelMsg | M] end,
+ [DelMsg], LogAcc0),
+ checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
+checkout0({success, ConsumerId, MsgId, Msg, State}, Effects,
+ {SendAcc0, LogAcc}) ->
+ DelMsg = {MsgId, Msg},
+ SendAcc = maps:update_with(ConsumerId,
+ fun (M) -> [DelMsg | M] end,
+ [DelMsg], SendAcc0),
+ checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
+checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
+ Effects1 = case Activity of
+ nochange ->
+ append_send_msg_effects(
+ append_log_effects(Effects0, LogAcc), SendAcc);
+ inactive ->
+ [{aux, inactive}
+ | append_send_msg_effects(
+ append_log_effects(Effects0, LogAcc), SendAcc)]
+ end,
+ {State0, ok, lists:reverse(Effects1)}.
+
+evaluate_limit(Result,
+ #?STATE{cfg = #cfg{max_length = undefined,
+ max_bytes = undefined}} = State,
+ Effects) ->
+ {State, Result, Effects};
+evaluate_limit(Result, State00, Effects0) ->
+ State0 = convert_prefix_msgs(State00),
+ case is_over_limit(State0) of
+ true ->
+ {State, Effects} = drop_head(State0, Effects0),
+ evaluate_limit(true, State, Effects);
+ false ->
+ {State0, Result, Effects0}
+ end.
+
+evaluate_memory_limit(_Header,
+ #?STATE{cfg = #cfg{max_in_memory_length = undefined,
+ max_in_memory_bytes = undefined}}) ->
+ false;
+evaluate_memory_limit(#{size := Size}, State) ->
+ evaluate_memory_limit(Size, State);
+evaluate_memory_limit(Size,
+ #?STATE{cfg = #cfg{max_in_memory_length = MaxLength,
+ max_in_memory_bytes = MaxBytes},
+ msg_bytes_in_memory = Bytes,
+ msgs_ready_in_memory = Length})
+ when is_integer(Size) ->
+ (Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes).
+
+append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
+ Effects;
+append_send_msg_effects(Effects0, AccMap) ->
+ Effects = maps:fold(fun (C, Msgs, Ef) ->
+ [send_msg_effect(C, lists:reverse(Msgs)) | Ef]
+ end, Effects0, AccMap),
+ [{aux, active} | Effects].
+
+append_log_effects(Effects0, AccMap) ->
+ maps:fold(fun (C, Msgs, Ef) ->
+ [send_log_effect(C, lists:reverse(Msgs)) | Ef]
+ end, Effects0, AccMap).
+
+%% next message is determined as follows:
+%% First we check if there are are prefex returns
+%% Then we check if there are current returns
+%% then we check prefix msgs
+%% then we check current messages
+%%
+%% When we return it is always done to the current return queue
+%% for both prefix messages and current messages
+take_next_msg(#?STATE{prefix_msgs = {R, P}} = State) ->
+ %% conversion
+ take_next_msg(State#?STATE{prefix_msgs = {length(R), R, length(P), P}});
+take_next_msg(#?STATE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem],
+ NumP, P}} = State) ->
+ %% there are prefix returns, these should be served first
+ {Msg, State#?STATE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
+take_next_msg(#?STATE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) ->
+ %% there are prefix returns, these should be served first
+ {{'$prefix_msg', Header},
+ State#?STATE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
+take_next_msg(#?STATE{returns = Returns,
+ low_msg_num = Low0,
+ messages = Messages0,
+ prefix_msgs = {NumR, R, NumP, P}} = State) ->
+ %% use peek rather than out there as the most likely case is an empty
+ %% queue
+ case lqueue:peek(Returns) of
+ {value, NextMsg} ->
+ {NextMsg,
+ State#?STATE{returns = lqueue:drop(Returns)}};
+ empty when P == [] ->
+ case Low0 of
+ undefined ->
+ empty;
+ _ ->
+ {Msg, Messages} = maps:take(Low0, Messages0),
+ case maps:size(Messages) of
+ 0 ->
+ {{Low0, Msg},
+ State#?STATE{messages = Messages,
+ low_msg_num = undefined}};
+ _ ->
+ {{Low0, Msg},
+ State#?STATE{messages = Messages,
+ low_msg_num = Low0 + 1}}
+ end
+ end;
+ empty ->
+ [Msg | Rem] = P,
+ case Msg of
+ {Header, 'empty'} ->
+ %% There are prefix msgs
+ {{'$empty_msg', Header},
+ State#?STATE{prefix_msgs = {NumR, R, NumP-1, Rem}}};
+ Header ->
+ {{'$prefix_msg', Header},
+ State#?STATE{prefix_msgs = {NumR, R, NumP-1, Rem}}}
+ end
+ end.
+
+send_msg_effect({CTag, CPid}, Msgs) ->
+ {send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}.
+
+send_log_effect({CTag, CPid}, IdxMsgs) ->
+ {RaftIdxs, Data} = lists:unzip(IdxMsgs),
+ {log, RaftIdxs,
+ fun(Log) ->
+ Msgs = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) ->
+ {MsgId, {Header, Msg}}
+ end, Log, Data),
+ [{send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}]
+ end,
+ {local, node(CPid)}}.
+
+reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
+ {log, [RaftIdx],
+ fun([{enqueue, _, _, Msg}]) ->
+ [{reply, From, {wrap_reply,
+ {dequeue, {MsgId, {Header, Msg}}, Ready}}}]
+ end}.
+
+checkout_one(#?STATE{service_queue = SQ0,
+ messages = Messages0,
+ consumers = Cons0} = InitState) ->
+ case queue:peek(SQ0) of
+ {value, ConsumerId} ->
+ case take_next_msg(InitState) of
+ {ConsumerMsg, State0} ->
+ SQ1 = queue:drop(SQ0),
+ %% there are consumers waiting to be serviced
+ %% process consumer checkout
+ case maps:find(ConsumerId, Cons0) of
+ {ok, #consumer{credit = 0}} ->
+ %% no credit but was still on queue
+ %% can happen when draining
+ %% recurse without consumer on queue
+ checkout_one(InitState#?STATE{service_queue = SQ1});
+ {ok, #consumer{status = cancelled}} ->
+ checkout_one(InitState#?STATE{service_queue = SQ1});
+ {ok, #consumer{status = suspected_down}} ->
+ checkout_one(InitState#?STATE{service_queue = SQ1});
+ {ok, #consumer{checked_out = Checked0,
+ next_msg_id = Next,
+ credit = Credit,
+ delivery_count = DelCnt} = Con0} ->
+ Checked = maps:put(Next, ConsumerMsg, Checked0),
+ Con = Con0#consumer{checked_out = Checked,
+ next_msg_id = Next + 1,
+ credit = Credit - 1,
+ delivery_count = DelCnt + 1},
+ {Cons, SQ, []} = % we expect no effects
+ update_or_remove_sub(ConsumerId, Con,
+ Cons0, SQ1, []),
+ State1 = State0#?STATE{service_queue = SQ,
+ consumers = Cons},
+ {State, Msg} =
+ case ConsumerMsg of
+ {'$prefix_msg', Header} ->
+ {subtract_in_memory_counts(
+ Header, add_bytes_checkout(Header, State1)),
+ ConsumerMsg};
+ {'$empty_msg', Header} ->
+ {add_bytes_checkout(Header, State1),
+ ConsumerMsg};
+ {_, {_, {Header, 'empty'}} = M} ->
+ {add_bytes_checkout(Header, State1),
+ M};
+ {_, {_, {Header, _} = M}} ->
+ {subtract_in_memory_counts(
+ Header,
+ add_bytes_checkout(Header, State1)),
+ M}
+ end,
+ {success, ConsumerId, Next, Msg, State};
+ error ->
+ %% consumer did not exist but was queued, recurse
+ checkout_one(InitState#?STATE{service_queue = SQ1})
+ end;
+ empty ->
+ {nochange, InitState}
+ end;
+ empty ->
+ case maps:size(Messages0) of
+ 0 -> {nochange, InitState};
+ _ -> {inactive, InitState}
+ end
+ end.
+
+update_or_remove_sub(ConsumerId, #consumer{lifetime = auto,
+ credit = 0} = Con,
+ Cons, ServiceQueue, Effects) ->
+ {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects};
+update_or_remove_sub(ConsumerId, #consumer{lifetime = auto} = Con,
+ Cons, ServiceQueue, Effects) ->
+ {maps:put(ConsumerId, Con, Cons),
+ uniq_queue_in(ConsumerId, ServiceQueue), Effects};
+update_or_remove_sub(ConsumerId, #consumer{lifetime = once,
+ checked_out = Checked,
+ credit = 0} = Con,
+ Cons, ServiceQueue, Effects) ->
+ case maps:size(Checked) of
+ 0 ->
+ % we're done with this consumer
+ % TODO: demonitor consumer pid but _only_ if there are no other
+ % monitors for this pid
+ {maps:remove(ConsumerId, Cons), ServiceQueue, Effects};
+ _ ->
+ % there are unsettled items so need to keep around
+ {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects}
+ end;
+update_or_remove_sub(ConsumerId, #consumer{lifetime = once} = Con,
+ Cons, ServiceQueue, Effects) ->
+ {maps:put(ConsumerId, Con, Cons),
+ uniq_queue_in(ConsumerId, ServiceQueue), Effects}.
+
+uniq_queue_in(Key, Queue) ->
+ % TODO: queue:member could surely be quite expensive, however the practical
+ % number of unique consumers may not be large enough for it to matter
+ case queue:member(Key, Queue) of
+ true ->
+ Queue;
+ false ->
+ queue:in(Key, Queue)
+ end.
+
+update_consumer(ConsumerId, Meta, Spec,
+ #?STATE{cfg = #cfg{consumer_strategy = competing}} = State0) ->
+ %% general case, single active consumer off
+ update_consumer0(ConsumerId, Meta, Spec, State0);
+update_consumer(ConsumerId, Meta, Spec,
+ #?STATE{consumers = Cons0,
+ cfg = #cfg{consumer_strategy = single_active}} = State0)
+ when map_size(Cons0) == 0 ->
+ %% single active consumer on, no one is consuming yet
+ update_consumer0(ConsumerId, Meta, Spec, State0);
+update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
+ #?STATE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = WaitingConsumers0} = State0) ->
+ %% single active consumer on and one active consumer already
+ %% adding the new consumer to the waiting list
+ Consumer = #consumer{lifetime = Life, meta = Meta,
+ credit = Credit, credit_mode = Mode},
+ WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}],
+ State0#?STATE{waiting_consumers = WaitingConsumers1}.
+
+update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
+ #?STATE{consumers = Cons0,
+ service_queue = ServiceQueue0} = State0) ->
+ %% TODO: this logic may not be correct for updating a pre-existing consumer
+ Init = #consumer{lifetime = Life, meta = Meta,
+ credit = Credit, credit_mode = Mode},
+ Cons = maps:update_with(ConsumerId,
+ fun(S) ->
+ %% remove any in-flight messages from
+ %% the credit update
+ N = maps:size(S#consumer.checked_out),
+ C = max(0, Credit - N),
+ S#consumer{lifetime = Life, credit = C}
+ end, Init, Cons0),
+ ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
+ ServiceQueue0),
+ State0#?STATE{consumers = Cons, service_queue = ServiceQueue}.
+
+maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
+ ServiceQueue0) ->
+ case Credit > 0 of
+ true ->
+ % consumerect needs service - check if already on service queue
+ uniq_queue_in(ConsumerId, ServiceQueue0);
+ false ->
+ ServiceQueue0
+ end.
+
+convert_prefix_msgs(#?STATE{prefix_msgs = {R, P}} = State) ->
+ State#?STATE{prefix_msgs = {length(R), R, length(P), P}};
+convert_prefix_msgs(State) ->
+ State.
+
+%% creates a dehydrated version of the current state to be cached and
+%% potentially used to for a snaphot at a later point
+dehydrate_state(#?STATE{messages = Messages,
+ consumers = Consumers,
+ returns = Returns,
+ low_msg_num = Low,
+ next_msg_num = Next,
+ prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0},
+ waiting_consumers = Waiting0} = State) ->
+ RCnt = lqueue:len(Returns),
+ %% TODO: optimise this function as far as possible
+ PrefRet1 = lists:foldr(fun ({'$prefix_msg', Header}, Acc) ->
+ [Header | Acc];
+ ({'$empty_msg', _} = Msg, Acc) ->
+ [Msg | Acc];
+ ({_, {_, {Header, 'empty'}}}, Acc) ->
+ [{'$empty_msg', Header} | Acc];
+ ({_, {_, {Header, _}}}, Acc) ->
+ [Header | Acc]
+ end,
+ [],
+ lqueue:to_list(Returns)),
+ PrefRet = PrefRet0 ++ PrefRet1,
+ PrefMsgsSuff = dehydrate_messages(Low, Next - 1, Messages, []),
+ %% prefix messages are not populated in normal operation only after
+ %% recovering from a snapshot
+ PrefMsgs = PrefMsg0 ++ PrefMsgsSuff,
+ Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0],
+ State#?STATE{messages = #{},
+ ra_indexes = rabbit_fifo_index:empty(),
+ release_cursors = lqueue:new(),
+ low_msg_num = undefined,
+ consumers = maps:map(fun (_, C) ->
+ dehydrate_consumer(C)
+ end, Consumers),
+ returns = lqueue:new(),
+ prefix_msgs = {PRCnt + RCnt, PrefRet,
+ PPCnt + maps:size(Messages), PrefMsgs},
+ waiting_consumers = Waiting}.
+
+dehydrate_messages(Low, Next, _Msgs, Acc)
+ when Next < Low ->
+ Acc;
+dehydrate_messages(Low, Next, Msgs, Acc0) ->
+ Acc = case maps:get(Next, Msgs) of
+ {_RaftIdx, {_, 'empty'} = Msg} ->
+ [Msg | Acc0];
+ {_RaftIdx, {Header, _}} ->
+ [Header | Acc0]
+ end,
+ dehydrate_messages(Low, Next - 1, Msgs, Acc).
+
+dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
+ Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
+ M;
+ (_, {'$empty_msg', _} = M) ->
+ M;
+ (_, {_, {_, {Header, 'empty'}}}) ->
+ {'$empty_msg', Header};
+ (_, {_, {_, {Header, _}}}) ->
+ {'$prefix_msg', Header}
+ end, Checked0),
+ Con#consumer{checked_out = Checked}.
+
+%% make the state suitable for equality comparison
+normalize(#?STATE{release_cursors = Cursors} = State) ->
+ State#?STATE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
+
+is_over_limit(#?STATE{cfg = #cfg{max_length = undefined,
+ max_bytes = undefined}}) ->
+ false;
+is_over_limit(#?STATE{cfg = #cfg{max_length = MaxLength,
+ max_bytes = MaxBytes},
+ msg_bytes_enqueue = BytesEnq} = State) ->
+
+ messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
+
+normalize_for_v1(#?STATE{cfg = Cfg} = State) ->
+ %% run all v0 conversions so that v1 does not have to have this code
+ RCI = case Cfg of
+ #cfg{release_cursor_interval = {_, _} = R} ->
+ R;
+ #cfg{release_cursor_interval = undefined} ->
+ {?RELEASE_CURSOR_EVERY, ?RELEASE_CURSOR_EVERY};
+ #cfg{release_cursor_interval = C} ->
+ {?RELEASE_CURSOR_EVERY, C}
+ end,
+ convert_prefix_msgs(
+ State#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCI}}).
+
+get_field(Field, State) ->
+ Fields = record_info(fields, ?STATE),
+ Index = record_index_of(Field, Fields),
+ element(Index, State).
+
+get_cfg_field(Field, #?STATE{cfg = Cfg} ) ->
+ Fields = record_info(fields, cfg),
+ Index = record_index_of(Field, Fields),
+ element(Index, Cfg).
+
+record_index_of(F, Fields) ->
+ index_of(2, F, Fields).
+
+index_of(_, F, []) ->
+ exit({field_not_found, F});
+index_of(N, F, [F | _]) ->
+ N;
+index_of(N, F, [_ | T]) ->
+ index_of(N+1, F, T).
+
+-spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol().
+make_enqueue(Pid, Seq, Msg) ->
+ #enqueue{pid = Pid, seq = Seq, msg = Msg}.
+-spec make_checkout(consumer_id(),
+ checkout_spec(), consumer_meta()) -> protocol().
+make_checkout(ConsumerId, Spec, Meta) ->
+ #checkout{consumer_id = ConsumerId,
+ spec = Spec, meta = Meta}.
+
+-spec make_settle(consumer_id(), [msg_id()]) -> protocol().
+make_settle(ConsumerId, MsgIds) ->
+ #settle{consumer_id = ConsumerId, msg_ids = MsgIds}.
+
+-spec make_return(consumer_id(), [msg_id()]) -> protocol().
+make_return(ConsumerId, MsgIds) ->
+ #return{consumer_id = ConsumerId, msg_ids = MsgIds}.
+
+-spec make_discard(consumer_id(), [msg_id()]) -> protocol().
+make_discard(ConsumerId, MsgIds) ->
+ #discard{consumer_id = ConsumerId, msg_ids = MsgIds}.
+
+-spec make_credit(consumer_id(), non_neg_integer(), non_neg_integer(),
+ boolean()) -> protocol().
+make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
+ #credit{consumer_id = ConsumerId,
+ credit = Credit,
+ delivery_count = DeliveryCount,
+ drain = Drain}.
+
+-spec make_purge() -> protocol().
+make_purge() -> #purge{}.
+
+-spec make_purge_nodes([node()]) -> protocol().
+make_purge_nodes(Nodes) ->
+ #purge_nodes{nodes = Nodes}.
+
+-spec make_update_config(config()) -> protocol().
+make_update_config(Config) ->
+ #update_config{config = Config}.
+
+add_bytes_enqueue(Bytes,
+ #?STATE{msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
+ State#?STATE{msg_bytes_enqueue = Enqueue + Bytes};
+add_bytes_enqueue(#{size := Bytes}, State) ->
+ add_bytes_enqueue(Bytes, State).
+
+add_bytes_drop(Bytes,
+ #?STATE{msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
+ State#?STATE{msg_bytes_enqueue = Enqueue - Bytes};
+add_bytes_drop(#{size := Bytes}, State) ->
+ add_bytes_drop(Bytes, State).
+
+add_bytes_checkout(Bytes,
+ #?STATE{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue } = State)
+ when is_integer(Bytes) ->
+ State#?STATE{msg_bytes_checkout = Checkout + Bytes,
+ msg_bytes_enqueue = Enqueue - Bytes};
+add_bytes_checkout(#{size := Bytes}, State) ->
+ add_bytes_checkout(Bytes, State).
+
+add_bytes_settle(Bytes,
+ #?STATE{msg_bytes_checkout = Checkout} = State)
+ when is_integer(Bytes) ->
+ State#?STATE{msg_bytes_checkout = Checkout - Bytes};
+add_bytes_settle(#{size := Bytes}, State) ->
+ add_bytes_settle(Bytes, State).
+
+add_bytes_return(Bytes,
+ #?STATE{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
+ State#?STATE{msg_bytes_checkout = Checkout - Bytes,
+ msg_bytes_enqueue = Enqueue + Bytes};
+add_bytes_return(#{size := Bytes}, State) ->
+ add_bytes_return(Bytes, State).
+
+add_in_memory_counts(Bytes,
+ #?STATE{msg_bytes_in_memory = InMemoryBytes,
+ msgs_ready_in_memory = InMemoryCount} = State)
+ when is_integer(Bytes) ->
+ State#?STATE{msg_bytes_in_memory = InMemoryBytes + Bytes,
+ msgs_ready_in_memory = InMemoryCount + 1};
+add_in_memory_counts(#{size := Bytes}, State) ->
+ add_in_memory_counts(Bytes, State).
+
+subtract_in_memory_counts(Bytes,
+ #?STATE{msg_bytes_in_memory = InMemoryBytes,
+ msgs_ready_in_memory = InMemoryCount} = State)
+ when is_integer(Bytes) ->
+ State#?STATE{msg_bytes_in_memory = InMemoryBytes - Bytes,
+ msgs_ready_in_memory = InMemoryCount - 1};
+subtract_in_memory_counts(#{size := Bytes}, State) ->
+ subtract_in_memory_counts(Bytes, State).
+
+message_size(#basic_message{content = Content}) ->
+ #content{payload_fragments_rev = PFR} = Content,
+ iolist_size(PFR);
+message_size({'$prefix_msg', H}) ->
+ get_size_from_header(H);
+message_size({'$empty_msg', H}) ->
+ get_size_from_header(H);
+message_size(B) when is_binary(B) ->
+ byte_size(B);
+message_size(Msg) ->
+ %% probably only hit this for testing so ok to use erts_debug
+ erts_debug:size(Msg).
+
+get_size_from_header(Size) when is_integer(Size) ->
+ Size;
+get_size_from_header(#{size := B}) ->
+ B.
+
+
+all_nodes(#?STATE{consumers = Cons0,
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Nodes0 = maps:fold(fun({_, P}, _, Acc) ->
+ Acc#{node(P) => ok}
+ end, #{}, Cons0),
+ Nodes1 = maps:fold(fun(P, _, Acc) ->
+ Acc#{node(P) => ok}
+ end, Nodes0, Enqs0),
+ maps:keys(
+ lists:foldl(fun({{_, P}, _}, Acc) ->
+ Acc#{node(P) => ok}
+ end, Nodes1, WaitingConsumers0)).
+
+all_pids_for(Node, #?STATE{consumers = Cons0,
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Cons = maps:fold(fun({_, P}, _, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, [], Cons0),
+ Enqs = maps:fold(fun(P, _, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, Cons, Enqs0),
+ lists:foldl(fun({{_, P}, _}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, Acc) -> Acc
+ end, Enqs, WaitingConsumers0).
+
+suspected_pids_for(Node, #?STATE{consumers = Cons0,
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, [], Cons0),
+ Enqs = maps:fold(fun(P, #enqueuer{status = suspected_down}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, Cons, Enqs0),
+ lists:foldl(fun({{_, P},
+ #consumer{status = suspected_down}}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, Acc) -> Acc
+ end, Enqs, WaitingConsumers0).
diff --git a/src/rabbit_fifo_v0.hrl b/src/rabbit_fifo_v0.hrl
new file mode 100644
index 0000000000..333ccb4d77
--- /dev/null
+++ b/src/rabbit_fifo_v0.hrl
@@ -0,0 +1,195 @@
+
+-type option(T) :: undefined | T.
+
+-type raw_msg() :: term().
+%% The raw message. It is opaque to rabbit_fifo.
+
+-type msg_in_id() :: non_neg_integer().
+% a queue scoped monotonically incrementing integer used to enforce order
+% in the unassigned messages map
+
+-type msg_id() :: non_neg_integer().
+%% A consumer-scoped monotonically incrementing integer included with a
+%% {@link delivery/0.}. Used to settle deliveries using
+%% {@link rabbit_fifo_client:settle/3.}
+
+-type msg_seqno() :: non_neg_integer().
+%% A sender process scoped monotonically incrementing integer included
+%% in enqueue messages. Used to ensure ordering of messages send from the
+%% same process
+
+-type msg_header() :: msg_size() |
+ #{size := msg_size(),
+ delivery_count => non_neg_integer()}.
+%% The message header:
+%% delivery_count: the number of unsuccessful delivery attempts.
+%% A non-zero value indicates a previous attempt.
+%% If it only contains the size it can be condensed to an integer only
+
+-type msg() :: {msg_header(), raw_msg()}.
+%% message with a header map.
+
+-type msg_size() :: non_neg_integer().
+%% the size in bytes of the msg payload
+
+-type indexed_msg() :: {ra:index(), msg()}.
+
+-type prefix_msg() :: {'$prefix_msg', msg_header()}.
+
+-type delivery_msg() :: {msg_id(), msg()}.
+%% A tuple consisting of the message id and the headered message.
+
+-type consumer_tag() :: binary().
+%% An arbitrary binary tag used to distinguish between different consumers
+%% set up by the same process. See: {@link rabbit_fifo_client:checkout/3.}
+
+-type delivery() :: {delivery, consumer_tag(), [delivery_msg()]}.
+%% Represents the delivery of one or more rabbit_fifo messages.
+
+-type consumer_id() :: {consumer_tag(), pid()}.
+%% The entity that receives messages. Uniquely identifies a consumer.
+
+-type credit_mode() :: simple_prefetch | credited.
+%% determines how credit is replenished
+
+-type checkout_spec() :: {once | auto, Num :: non_neg_integer(),
+ credit_mode()} |
+ {dequeue, settled | unsettled} |
+ cancel.
+
+-type consumer_meta() :: #{ack => boolean(),
+ username => binary(),
+ prefetch => non_neg_integer(),
+ args => list()}.
+%% static meta data associated with a consumer
+
+
+-type applied_mfa() :: {module(), atom(), list()}.
+% represents a partially applied module call
+
+-define(RELEASE_CURSOR_EVERY, 64000).
+-define(RELEASE_CURSOR_EVERY_MAX, 3200000).
+-define(USE_AVG_HALF_LIFE, 10000.0).
+%% an average QQ without any message uses about 100KB so setting this limit
+%% to ~10 times that should be relatively safe.
+-define(GC_MEM_LIMIT_B, 2000000).
+
+-define(MB, 1048576).
+-define(STATE, rabbit_fifo).
+
+-record(consumer,
+ {meta = #{} :: consumer_meta(),
+ checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}},
+ next_msg_id = 0 :: msg_id(), % part of snapshot data
+ %% max number of messages that can be sent
+ %% decremented for each delivery
+ credit = 0 : non_neg_integer(),
+ %% total number of checked out messages - ever
+ %% incremented for each delivery
+ delivery_count = 0 :: non_neg_integer(),
+ %% the mode of how credit is incremented
+ %% simple_prefetch: credit is re-filled as deliveries are settled
+ %% or returned.
+ %% credited: credit can only be changed by receiving a consumer_credit
+ %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
+ credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
+ lifetime = once :: once | auto,
+ status = up :: up | suspected_down | cancelled
+ }).
+
+-type consumer() :: #consumer{}.
+
+-type consumer_strategy() :: competing | single_active.
+
+-record(enqueuer,
+ {next_seqno = 1 :: msg_seqno(),
+ % out of order enqueues - sorted list
+ pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
+ status = up :: up | suspected_down
+ }).
+
+-record(cfg,
+ {name :: atom(),
+ resource :: rabbit_types:r('queue'),
+ release_cursor_interval ::
+ undefined | non_neg_integer() |
+ {non_neg_integer(), non_neg_integer()},
+ dead_letter_handler :: option(applied_mfa()),
+ become_leader_handler :: option(applied_mfa()),
+ max_length :: option(non_neg_integer()),
+ max_bytes :: option(non_neg_integer()),
+ %% whether single active consumer is on or not for this queue
+ consumer_strategy = competing :: consumer_strategy(),
+ %% the maximum number of unsuccessful delivery attempts permitted
+ delivery_limit :: option(non_neg_integer()),
+ max_in_memory_length :: option(non_neg_integer()),
+ max_in_memory_bytes :: option(non_neg_integer())
+ }).
+
+-type prefix_msgs() :: {list(), list()} |
+ {non_neg_integer(), list(),
+ non_neg_integer(), list()}.
+
+-record(?STATE,
+ {cfg :: #cfg{},
+ % unassigned messages
+ messages = #{} :: #{msg_in_id() => indexed_msg()},
+ % defines the lowest message in id available in the messages map
+ % that isn't a return
+ low_msg_num :: option(msg_in_id()),
+ % defines the next message in id to be added to the messages map
+ next_msg_num = 1 :: msg_in_id(),
+ % list of returned msg_in_ids - when checking out it picks from
+ % this list first before taking low_msg_num
+ returns = lqueue:new() :: lqueue:lqueue(prefix_msg() |
+ {msg_in_id(), indexed_msg()}),
+ % a counter of enqueues - used to trigger shadow copy points
+ enqueue_count = 0 :: non_neg_integer(),
+ % a map containing all the live processes that have ever enqueued
+ % a message to this queue as well as a cached value of the smallest
+ % ra_index of all pending enqueues
+ enqueuers = #{} :: #{pid() => #enqueuer{}},
+ % master index of all enqueue raft indexes including pending
+ % enqueues
+ % rabbit_fifo_index can be slow when calculating the smallest
+ % index when there are large gaps but should be faster than gb_trees
+ % for normal appending operations as it's backed by a map
+ ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
+ release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor,
+ ra:index(), #?STATE{}}),
+ % consumers need to reflect consumer state at time of snapshot
+ % needs to be part of snapshot
+ consumers = #{} :: #{consumer_id() => #consumer{}},
+ % consumers that require further service are queued here
+ % needs to be part of snapshot
+ service_queue = queue:new() :: queue:queue(consumer_id()),
+ %% This is a special field that is only used for snapshots
+ %% It represents the queued messages at the time the
+ %% dehydrated snapshot state was cached.
+ %% As release_cursors are only emitted for raft indexes where all
+ %% prior messages no longer contribute to the current state we can
+ %% replace all message payloads with their sizes (to be used for
+ %% overflow calculations).
+ %% This is done so that consumers are still served in a deterministic
+ %% order on recovery.
+ prefix_msgs = {0, [], 0, []} :: prefix_msgs(),
+ msg_bytes_enqueue = 0 :: non_neg_integer(),
+ msg_bytes_checkout = 0 :: non_neg_integer(),
+ %% waiting consumers, one is picked active consumer is cancelled or dies
+ %% used only when single active consumer is on
+ waiting_consumers = [] :: [{consumer_id(), consumer()}],
+ msg_bytes_in_memory = 0 :: non_neg_integer(),
+ msgs_ready_in_memory = 0 :: non_neg_integer()
+ }).
+
+-type config() :: #{name := atom(),
+ queue_resource := rabbit_types:r('queue'),
+ dead_letter_handler => applied_mfa(),
+ become_leader_handler => applied_mfa(),
+ release_cursor_interval => non_neg_integer(),
+ max_length => non_neg_integer(),
+ max_bytes => non_neg_integer(),
+ max_in_memory_length => non_neg_integer(),
+ max_in_memory_bytes => non_neg_integer(),
+ single_active_consumer_on => boolean(),
+ delivery_limit => non_neg_integer()}.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 915cf9d527..327fc11a5e 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -19,7 +19,7 @@
-export([cluster_state/1, status/2]).
-export([update_consumer_handler/8, update_consumer/9]).
-export([cancel_consumer_handler/2, cancel_consumer/3]).
--export([become_leader/2, handle_tick/3]).
+-export([become_leader/2, handle_tick/3, spawn_deleter/1]).
-export([rpc_delete_metrics/1]).
-export([format/1]).
-export([open_files/1]).
@@ -102,6 +102,7 @@ handle_event({ra_event, From, Evt}, QState) ->
{new | existing, amqqueue:amqqueue()} | rabbit_types:channel_exit().
declare(Q) when ?amqqueue_is_quorum(Q) ->
+ rabbit_log:info("quorum_queue declaring ~w", [Q]),
QName = amqqueue:get_name(Q),
Durable = amqqueue:is_durable(Q),
AutoDelete = amqqueue:is_auto_delete(Q),
@@ -125,6 +126,12 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
|| ServerId <- members(NewQ)],
case ra:start_cluster(RaConfs) of
{ok, _, _} ->
+ %% TODO: handle error - what should be done if the
+ %% config cannot be updated
+ ok = rabbit_fifo_client:update_machine_state(Id,
+ ra_machine_config(NewQ)),
+ %% force a policy change to ensure the latest config is
+ %% updated even when running the machine version from 0
rabbit_event:notify(queue_created,
[{name, QName},
{durable, Durable},
@@ -152,10 +159,15 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
%% take the minimum value of the policy and the queue arg if present
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
+ %% prefer the policy defined strategy if available
+ Overflow = args_policy_lookup(<<"overflow">>, fun (A, _B) -> A end , Q),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q),
MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q),
DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
+ Expires = args_policy_lookup(<<"expires">>,
+ fun (A, _B) -> A end,
+ Q),
#{name => Name,
queue_resource => QName,
dead_letter_handler => dlx_mfa(Q),
@@ -165,7 +177,10 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
max_in_memory_length => MaxMemoryLength,
max_in_memory_bytes => MaxMemoryBytes,
single_active_consumer_on => single_active_consumer_on(Q),
- delivery_limit => DeliveryLimit
+ delivery_limit => DeliveryLimit,
+ overflow_strategy => overflow(Overflow, drop_head),
+ created => erlang:system_time(millisecond),
+ expires => Expires
}.
single_active_consumer_on(Q) ->
@@ -292,8 +307,14 @@ filter_quorum_critical(Queues, ReplicaStates) ->
-spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
is_policy_applicable(_Q, Policy) ->
- Applicable = [<<"max-length">>, <<"max-length-bytes">>, <<"max-in-memory-length">>,
- <<"max-in-memory-bytes">>, <<"delivery-limit">>, <<"dead-letter-exchange">>,
+ Applicable = [<<"max-length">>,
+ <<"max-length-bytes">>,
+ <<"overflow">>,
+ <<"expires">>,
+ <<"max-in-memory-length">>,
+ <<"max-in-memory-bytes">>,
+ <<"delivery-limit">>,
+ <<"dead-letter-exchange">>,
<<"dead-letter-routing-key">>],
lists:all(fun({P, _}) ->
lists:member(P, Applicable)
@@ -304,6 +325,12 @@ rpc_delete_metrics(QName) ->
ets:delete(queue_metrics, QName),
ok.
+spawn_deleter(QName) ->
+ spawn(fun () ->
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ delete(Q, false, false, <<"expired">>)
+ end).
+
handle_tick(QName,
{Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack},
Nodes) ->
@@ -694,13 +721,29 @@ stateless_deliver(ServerId, Delivery) ->
-spec deliver(Confirm :: boolean(), rabbit_types:delivery(),
rabbit_fifo_client:state()) ->
- {ok | slow, rabbit_fifo_client:state()}.
+ {ok | slow, rabbit_fifo_client:state()} |
+ {reject_publish, rabbit_fifo_client:state()}.
deliver(false, Delivery, QState0) ->
- rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0);
+ case rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0) of
+ {ok, _} = Res -> Res;
+ {slow, _} = Res -> Res;
+ {reject_publish, State} ->
+ {ok, State}
+ end;
deliver(true, Delivery, QState0) ->
- rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
- Delivery#delivery.message, QState0).
+ Seq = Delivery#delivery.msg_seq_no,
+ case rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
+ Delivery#delivery.message, QState0) of
+ {ok, _} = Res -> Res;
+ {slow, _} = Res -> Res;
+ {reject_publish, State} ->
+ %% TODO: this works fine but once the queue types interface is in
+ %% place it could be replaced with an action or similar to avoid
+ %% self publishing messages.
+ gen_server2:cast(self(), {reject_publish, Seq, undefined}),
+ {ok, State}
+ end.
-spec info(amqqueue:amqqueue()) -> rabbit_types:infos().
@@ -783,9 +826,9 @@ maybe_delete_data_dir(UId) ->
ok
end.
-policy_changed(QName, Node) ->
+policy_changed(QName, Server) ->
{ok, Q} = rabbit_amqqueue:lookup(QName),
- rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)).
+ rabbit_fifo_client:update_machine_state(Server, ra_machine_config(Q)).
-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
@@ -1320,8 +1363,8 @@ maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
check_invalid_arguments(QueueName, Args) ->
- Keys = [<<"x-expires">>, <<"x-message-ttl">>,
- <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>],
+ Keys = [<<"x-message-ttl">>,
+ <<"x-max-priority">>, <<"x-queue-mode">>],
[case rabbit_misc:table_lookup(Args, Key) of
undefined -> ok;
_TypeVal -> rabbit_misc:protocol_error(
@@ -1329,6 +1372,17 @@ check_invalid_arguments(QueueName, Args) ->
"invalid arg '~s' for ~s",
[Key, rabbit_misc:rs(QueueName)])
end || Key <- Keys],
+
+ case rabbit_misc:table_lookup(Args, <<"x-overflow">>) of
+ undefined -> ok;
+ {_, <<"reject-publish-dlx">>} ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "invalid arg 'x-overflow' with value 'reject-publish-dlx' for ~s",
+ [rabbit_misc:rs(QueueName)]);
+ _ ->
+ ok
+ end,
ok.
check_auto_delete(Q) when ?amqqueue_is_auto_delete(Q) ->
@@ -1413,3 +1467,7 @@ get_nodes(Q) when ?is_amqqueue(Q) ->
update_type_state(Q, Fun) when ?is_amqqueue(Q) ->
Ts = amqqueue:get_type_state(Q),
amqqueue:set_type_state(Q, Fun(Ts)).
+
+overflow(undefined, Def) -> Def;
+overflow(<<"reject-publish">>, _Def) -> reject_publish;
+overflow(<<"drop-head">>, _Def) -> drop_head.
diff --git a/test/consumer_timeout_SUITE.erl b/test/consumer_timeout_SUITE.erl
index 6144eca20b..468714328d 100644
--- a/test/consumer_timeout_SUITE.erl
+++ b/test/consumer_timeout_SUITE.erl
@@ -78,7 +78,7 @@ init_per_group(mirrored_queue, Config) ->
init_per_group(Group, Config0) ->
case lists:member({group, Group}, all()) of
true ->
- ClusterSize = 2,
+ ClusterSize = 3,
Config = rabbit_ct_helpers:merge_app_env(
Config0, {rabbit, [{channel_tick_interval, 1000},
{quorum_tick_interval, 1000},
diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl
index d1196e79fc..87b5566c57 100644
--- a/test/dead_lettering_SUITE.erl
+++ b/test/dead_lettering_SUITE.erl
@@ -106,7 +106,7 @@ init_per_group(mirrored_queue, Config) ->
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
- ClusterSize = 2,
+ ClusterSize = 3,
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl
index 0376b2b838..9a8f2110d6 100644
--- a/test/dynamic_qq_SUITE.erl
+++ b/test/dynamic_qq_SUITE.erl
@@ -24,15 +24,13 @@ all() ->
groups() ->
[
{clustered, [], [
- {cluster_size_2, [], [
+ {cluster_size_3, [], [
+ recover_follower_after_standalone_restart,
vhost_deletion,
force_delete_if_no_consensus,
takeover_on_failure,
takeover_on_shutdown,
quorum_unaffected_after_vhost_failure
- ]},
- {cluster_size_3, [], [
- recover_follower_after_standalone_restart
]}
]}
].
@@ -108,7 +106,7 @@ vhost_deletion(Config) ->
ok.
force_delete_if_no_consensus(Config) ->
- [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
QName = ?config(queue_name, Config),
Args = ?config(queue_args, Config),
@@ -119,6 +117,7 @@ force_delete_if_no_consensus(Config) ->
rabbit_ct_client_helpers:publish(ACh, QName, 10),
ok = rabbit_ct_broker_helpers:restart_node(Config, B),
ok = rabbit_ct_broker_helpers:stop_node(Config, A),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, C),
BCh = rabbit_ct_client_helpers:open_channel(Config, B),
?assertMatch(
@@ -140,7 +139,7 @@ takeover_on_shutdown(Config) ->
takeover_on(Config, stop_node).
takeover_on(Config, Fun) ->
- [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
QName = ?config(queue_name, Config),
@@ -152,10 +151,11 @@ takeover_on(Config, Fun) ->
rabbit_ct_client_helpers:publish(ACh, QName, 10),
ok = rabbit_ct_broker_helpers:restart_node(Config, B),
+ ok = rabbit_ct_broker_helpers:Fun(Config, C),
ok = rabbit_ct_broker_helpers:Fun(Config, A),
BCh = rabbit_ct_client_helpers:open_channel(Config, B),
- #'queue.declare_ok'{message_count = 0} =
+ #'queue.declare_ok'{} =
amqp_channel:call(
BCh, #'queue.declare'{queue = QName,
arguments = Args,
@@ -170,7 +170,7 @@ takeover_on(Config, Fun) ->
ok.
quorum_unaffected_after_vhost_failure(Config) ->
- [A, B] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [A, B, _] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Servers = lists:sort(Servers0),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
@@ -197,46 +197,50 @@ quorum_unaffected_after_vhost_failure(Config) ->
?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))).
recover_follower_after_standalone_restart(Config) ->
- %% Tests that followers can be brought up standalone after forgetting the rest
- %% of the cluster. Consensus won't be reached as there is only one node in the
- %% new cluster.
- Servers = [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
- Ch = rabbit_ct_client_helpers:open_channel(Config, A),
-
- QName = ?config(queue_name, Config),
- Args = ?config(queue_args, Config),
- amqp_channel:call(Ch, #'queue.declare'{queue = QName,
- arguments = Args,
- durable = true
- }),
-
- rabbit_ct_client_helpers:publish(Ch, QName, 15),
- rabbit_ct_client_helpers:close_channel(Ch),
-
- Name = ra_name(QName),
- wait_for_messages_ready(Servers, Name, 15),
-
- rabbit_ct_broker_helpers:stop_node(Config, C),
- rabbit_ct_broker_helpers:stop_node(Config, B),
- rabbit_ct_broker_helpers:stop_node(Config, A),
-
- %% Restart one follower
- forget_cluster_node(Config, B, C),
- forget_cluster_node(Config, B, A),
-
- ok = rabbit_ct_broker_helpers:start_node(Config, B),
- wait_for_messages_ready([B], Name, 15),
- ok = rabbit_ct_broker_helpers:stop_node(Config, B),
-
- %% Restart the other
- forget_cluster_node(Config, C, B),
- forget_cluster_node(Config, C, A),
-
- ok = rabbit_ct_broker_helpers:start_node(Config, C),
- wait_for_messages_ready([C], Name, 15),
- ok = rabbit_ct_broker_helpers:stop_node(Config, C),
-
- ok.
+ case os:getenv("SECONDARY_UMBRELLA") of
+ false ->
+ %% Tests that followers can be brought up standalone after forgetting the
+ %% rest of the cluster. Consensus won't be reached as there is only one node in the
+ %% new cluster.
+ Servers = [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+
+ QName = ?config(queue_name, Config),
+ Args = ?config(queue_args, Config),
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = true
+ }),
+
+ rabbit_ct_client_helpers:publish(Ch, QName, 15),
+ rabbit_ct_client_helpers:close_channel(Ch),
+
+ Name = ra_name(QName),
+ wait_for_messages_ready(Servers, Name, 15),
+
+ rabbit_ct_broker_helpers:stop_node(Config, C),
+ rabbit_ct_broker_helpers:stop_node(Config, B),
+ rabbit_ct_broker_helpers:stop_node(Config, A),
+
+ %% Restart one follower
+ forget_cluster_node(Config, B, C),
+ forget_cluster_node(Config, B, A),
+
+ ok = rabbit_ct_broker_helpers:start_node(Config, B),
+ wait_for_messages_ready([B], Name, 15),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, B),
+
+ %% Restart the other
+ forget_cluster_node(Config, C, B),
+ forget_cluster_node(Config, C, A),
+
+ ok = rabbit_ct_broker_helpers:start_node(Config, C),
+ wait_for_messages_ready([C], Name, 15),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, C),
+ ok;
+ _ ->
+ {skip, "cannot be run in mixed mode"}
+ end.
%%----------------------------------------------------------------------------
forget_cluster_node(Config, Node, NodeToRemove) ->
diff --git a/test/publisher_confirms_parallel_SUITE.erl b/test/publisher_confirms_parallel_SUITE.erl
index 410dabb08a..c31527f0ba 100644
--- a/test/publisher_confirms_parallel_SUITE.erl
+++ b/test/publisher_confirms_parallel_SUITE.erl
@@ -82,7 +82,7 @@ init_per_group(mirrored_queue, Config) ->
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
- ClusterSize = 2,
+ ClusterSize = 3,
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
@@ -302,21 +302,23 @@ confirm_nack1(Config) ->
%% The closest to a nack behaviour that we can get on quorum queues is not answering while
%% the cluster is in minority. Once the cluster recovers, a 'basic.ack' will be issued.
confirm_minority(Config) ->
- [_A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [_A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = ?config(queue_name, Config),
declare_queue(Ch, Config, QName),
ok = rabbit_ct_broker_helpers:stop_node(Config, B),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, C),
amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, QName, [<<"msg1">>]),
receive
- #'basic.nack'{} -> throw(unexpected_nack);
+ #'basic.nack'{} -> ok;
#'basic.ack'{} -> throw(unexpected_ack)
- after 30000 ->
+ after 120000 ->
ok
end,
ok = rabbit_ct_broker_helpers:start_node(Config, B),
+ publish(Ch, QName, [<<"msg2">>]),
receive
#'basic.nack'{} -> throw(unexpected_nack);
#'basic.ack'{} -> ok
diff --git a/test/queue_length_limits_SUITE.erl b/test/queue_length_limits_SUITE.erl
index 20c7f46c86..f9b6f2f368 100644
--- a/test/queue_length_limits_SUITE.erl
+++ b/test/queue_length_limits_SUITE.erl
@@ -87,7 +87,7 @@ init_per_group(max_length_mirrored, Config) ->
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
- ClusterSize = 2,
+ ClusterSize = 3,
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index d13675bf58..c4d16a5900 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -125,7 +125,7 @@ init_per_group(mirrored_queue, Config) ->
init_per_group(Group, Config0) ->
case lists:member({group, Group}, all()) of
true ->
- ClusterSize = 2,
+ ClusterSize = 3,
Config = rabbit_ct_helpers:merge_app_env(
Config0, {rabbit, [{channel_tick_interval, 1000},
{quorum_tick_interval, 1000}]}),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index be3b46fbfb..cc54aae78e 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -16,7 +16,8 @@
wait_for_messages_total/3,
wait_for_messages/2,
dirty_query/3,
- ra_name/1]).
+ ra_name/1,
+ is_mixed_versions/0]).
-compile(export_all).
@@ -113,6 +114,7 @@ all_tests() ->
subscribe_redelivery_count,
message_bytes_metrics,
queue_length_limit_drop_head,
+ queue_length_limit_reject_publish,
subscribe_redelivery_limit,
subscribe_redelivery_policy,
subscribe_redelivery_limit_with_dead_letter,
@@ -128,7 +130,8 @@ all_tests() ->
consumer_metrics,
invalid_policy,
delete_if_empty,
- delete_if_unused
+ delete_if_unused,
+ queue_ttl
].
memory_tests() ->
@@ -156,7 +159,12 @@ init_per_group(clustered, Config) ->
init_per_group(unclustered, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]);
init_per_group(clustered_with_partitions, Config) ->
- rabbit_ct_helpers:set_config(Config, [{net_ticktime, 10}]);
+ case is_mixed_versions() of
+ true ->
+ {skip, "clustered_with_partitions is too unreliable in mixed mode"};
+ false ->
+ rabbit_ct_helpers:set_config(Config, [{net_ticktime, 10}])
+ end;
init_per_group(Group, Config) ->
ClusterSize = case Group of
single_node -> 1;
@@ -164,38 +172,44 @@ init_per_group(Group, Config) ->
cluster_size_3 -> 3;
cluster_size_5 -> 5
end,
- Config1 = rabbit_ct_helpers:set_config(Config,
- [{rmq_nodes_count, ClusterSize},
- {rmq_nodename_suffix, Group},
- {tcp_ports_base}]),
- Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
- Ret = rabbit_ct_helpers:run_steps(Config1b,
- [fun merge_app_env/1 ] ++
- rabbit_ct_broker_helpers:setup_steps()),
- case Ret of
- {skip, _} ->
- Ret;
- Config2 ->
- EnableFF = rabbit_ct_broker_helpers:enable_feature_flag(
- Config2, quorum_queue),
- case EnableFF of
- ok ->
- ok = rabbit_ct_broker_helpers:rpc(
- Config2, 0, application, set_env,
- [rabbit, channel_tick_interval, 100]),
- %% HACK: the larger cluster sizes benefit for a bit
- %% more time after clustering before running the
- %% tests.
- case Group of
- cluster_size_5 ->
- timer:sleep(5000),
- Config2;
- _ ->
- Config2
- end;
- Skip ->
- end_per_group(Group, Config2),
- Skip
+ IsMixed = not (false == os:getenv("SECONDARY_UMBRELLA")),
+ case ClusterSize of
+ 2 when IsMixed ->
+ {skip, "cluster size 2 isn't mixed versions compatible"};
+ _ ->
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_nodes_count, ClusterSize},
+ {rmq_nodename_suffix, Group},
+ {tcp_ports_base}]),
+ Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
+ Ret = rabbit_ct_helpers:run_steps(Config1b,
+ [fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps()),
+ case Ret of
+ {skip, _} ->
+ Ret;
+ Config2 ->
+ EnableFF = rabbit_ct_broker_helpers:enable_feature_flag(
+ Config2, quorum_queue),
+ case EnableFF of
+ ok ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, application, set_env,
+ [rabbit, channel_tick_interval, 100]),
+ %% HACK: the larger cluster sizes benefit for a bit
+ %% more time after clustering before running the
+ %% tests.
+ case Group of
+ cluster_size_5 ->
+ timer:sleep(5000),
+ Config2;
+ _ ->
+ Config2
+ end;
+ Skip ->
+ end_per_group(Group, Config2),
+ Skip
+ end
end
end.
@@ -327,11 +341,6 @@ declare_invalid_args(Config) ->
{{shutdown, {server_initiated_close, 406, _}}, _},
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-expires">>, long, 2000}])),
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-message-ttl">>, long, 2000}])),
?assertExit(
@@ -345,7 +354,7 @@ declare_invalid_args(Config) ->
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-overflow">>, longstr, XOverflow}]))
- || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]],
+ || XOverflow <- [<<"reject-publish-dlx">>]],
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
@@ -612,14 +621,16 @@ publish_confirm(Ch, QName) ->
publish(Ch, QName),
amqp_channel:register_confirm_handler(Ch, self()),
ct:pal("waiting for confirms from ~s", [QName]),
- ok = receive
- #'basic.ack'{} -> ok;
- #'basic.nack'{} -> fail
- after 2500 ->
- exit(confirm_timeout)
- end,
- ct:pal("CONFIRMED! ~s", [QName]),
- ok.
+ receive
+ #'basic.ack'{} ->
+ ct:pal("CONFIRMED! ~s", [QName]),
+ ok;
+ #'basic.nack'{} ->
+ ct:pal("NOT CONFIRMED! ~s", [QName]),
+ fail
+ after 2500 ->
+ exit(confirm_timeout)
+ end.
publish_and_restart(Config) ->
%% Test the node restart with both types of queues (quorum and classic) to
@@ -685,6 +696,14 @@ shrink_all(Config) ->
ok.
rebalance(Config) ->
+ case is_mixed_versions() of
+ true ->
+ {skip, "rebalance tests isn't mixed version compatible"};
+ false ->
+ rebalance0(Config)
+ end.
+
+rebalance0(Config) ->
[Server0, _, _] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1025,9 +1044,11 @@ recover_from_multiple_failures(Config) ->
wait_for_messages_pending_ack(Servers, RaName, 0).
publishing_to_unavailable_queue(Config) ->
+ %% publishing to an unavialable queue but with a reachable member should result
+ %% in the initial enqueuer session timing out and the message being nacked
[Server, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
- TCh = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ TCh = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(TCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
@@ -1035,19 +1056,29 @@ publishing_to_unavailable_queue(Config) ->
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+ ct:pal("opening channel to ~w", [Server]),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
- publish_many(Ch, QQ, 300),
- timer:sleep(1000),
+ publish_many(Ch, QQ, 1),
+ %% this should result in a nack
+ ok = receive
+ #'basic.ack'{} -> fail;
+ #'basic.nack'{} -> ok
+ after 90000 ->
+ exit(confirm_timeout)
+ end,
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
- %% check we get at least on ack
+ timer:sleep(2000),
+ publish_many(Ch, QQ, 1),
+ %% this should now be acked
ok = receive
#'basic.ack'{} -> ok;
#'basic.nack'{} -> fail
- after 30000 ->
+ after 90000 ->
exit(confirm_timeout)
end,
+ %% check we get at least on ack
ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
ok.
@@ -1087,6 +1118,14 @@ leadership_takeover(Config) ->
wait_for_messages_pending_ack(Servers, RaName, 0).
metrics_cleanup_on_leadership_takeover(Config) ->
+ case is_mixed_versions() of
+ true ->
+ {skip, "metrics_cleanup_on_leadership_takeover tests isn't mixed version compatible"};
+ false ->
+ metrics_cleanup_on_leadership_takeover0(Config)
+ end.
+
+metrics_cleanup_on_leadership_takeover0(Config) ->
%% Queue core metrics should be deleted from a node once the leadership is transferred
%% to another follower
[Server, _, _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1244,13 +1283,13 @@ simple_confirm_availability_on_leader_change(Config) ->
%% open a channel to another node
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
%% stop the node hosting the leader
ok = rabbit_ct_broker_helpers:stop_node(Config, Node2),
%% this should not fail as the channel should detect the new leader and
%% resend to that
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
ok.
@@ -1270,7 +1309,7 @@ confirm_availability_on_leader_change(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
ConfirmLoop = fun Loop() ->
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
receive {done, P} ->
P ! done,
ok
@@ -1462,7 +1501,16 @@ node_removal_is_not_quorum_critical(Config) ->
Qs = rpc:call(Server, rabbit_quorum_queue, list_with_minimum_quorum, []),
?assertEqual([], Qs).
+
file_handle_reservations(Config) ->
+ case is_mixed_versions() of
+ true ->
+ {skip, "file_handle_reservations tests isn't mixed version compatible"};
+ false ->
+ file_handle_reservations0(Config)
+ end.
+
+file_handle_reservations0(Config) ->
Servers = [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
QQ = ?config(queue_name, Config),
@@ -2020,6 +2068,35 @@ queue_length_limit_drop_head(Config) ->
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = true})).
+queue_length_limit_reject_publish(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ RaName = ra_name(QQ),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 1},
+ {<<"x-overflow">>, longstr, <<"reject-publish">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ ok = publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
+ %% give the channel some time to process the async reject_publish notification
+ %% now that we are over the limit it should start failing
+ wait_for_messages_total(Servers, RaName, 2),
+ fail = publish_confirm(Ch, QQ),
+ %% remove all messages
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+ %% publish should be allowed again now
+ ok = publish_confirm(Ch, QQ),
+ ok.
+
queue_length_in_memory_limit_basic_get(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -2381,6 +2458,29 @@ delete_if_unused(Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = QQ,
if_unused = true})).
+queue_ttl(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-expires">>, long, 1000}])),
+ timer:sleep(5500),
+ %% check queue no longer exists
+ ?assertExit(
+ {{shutdown,
+ {server_initiated_close,404,
+ <<"NOT_FOUND - no queue 'queue_ttl' in vhost '/'">>}},
+ _},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QQ,
+ passive = true,
+ durable = true,
+ auto_delete = false,
+ arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-expires">>, long, 1000}]})),
+ ok.
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl
index 9c988bc066..4b65bc2737 100644
--- a/test/quorum_queue_utils.erl
+++ b/test/quorum_queue_utils.erl
@@ -8,7 +8,8 @@
wait_for_messages_total/3,
wait_for_messages/2,
dirty_query/3,
- ra_name/1
+ ra_name/1,
+ is_mixed_versions/0
]).
wait_for_messages_ready(Servers, QName, Ready) ->
@@ -34,11 +35,19 @@ wait_for_messages(Servers, QName, Number, Fun, 0) ->
wait_for_messages(Servers, QName, Number, Fun, N) ->
Msgs = dirty_query(Servers, QName, Fun),
ct:pal("Got messages ~p", [Msgs]),
- case lists:all(fun(C) when is_integer(C) ->
- C == Number;
- (_) ->
- false
- end, Msgs) of
+ %% hack to allow the check to succeed in mixed versions clusters if at
+ %% least one node matches the criteria rather than all nodes for
+ F = case is_mixed_versions() of
+ true ->
+ any;
+ false ->
+ all
+ end,
+ case lists:F(fun(C) when is_integer(C) ->
+ C == Number;
+ (_) ->
+ false
+ end, Msgs) of
true ->
ok;
_ ->
@@ -88,3 +97,6 @@ filter_queues(Expected, Got) ->
lists:filter(fun([K, _, _, _]) ->
lists:member(K, Keys)
end, Got).
+
+is_mixed_versions() ->
+ not (false == os:getenv("SECONDARY_UMBRELLA")).
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 9f2daac762..7778e04afb 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -59,19 +59,24 @@ end_per_testcase(_TestCase, _Config) ->
?ASSERT_EFF(EfxPat, true, Effects)).
-define(ASSERT_EFF(EfxPat, Guard, Effects),
- ?assert(lists:any(fun (EfxPat) when Guard -> true;
- (_) -> false
- end, Effects))).
+ ?assert(lists:any(fun (EfxPat) when Guard -> true;
+ (_) -> false
+ end, Effects))).
-define(ASSERT_NO_EFF(EfxPat, Effects),
- ?assert(not lists:any(fun (EfxPat) -> true;
- (_) -> false
- end, Effects))).
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
+
+-define(ASSERT_NO_EFF(EfxPat, Guard, Effects),
+ ?assert(not lists:any(fun (EfxPat) when Guard -> true;
+ (_) -> false
+ end, Effects))).
-define(assertNoEffect(EfxPat, Effects),
- ?assert(not lists:any(fun (EfxPat) -> true;
- (_) -> false
- end, Effects))).
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
test_init(Name) ->
init(#{name => Name,
@@ -380,7 +385,7 @@ cancelled_checkout_out_test(_) ->
{State1, _} = check_auto(Cid, 2, State0),
% cancelled checkout should not return pending messages to queue
{State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
- ?assertEqual(1, maps:size(State2#rabbit_fifo.messages)),
+ ?assertEqual(1, lqueue:len(State2#rabbit_fifo.messages)),
?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)),
{State3, {dequeue, empty}} =
@@ -436,13 +441,13 @@ down_with_noconnection_returns_unack_test(_) ->
Pid = spawn(fun() -> ok end),
Cid = {<<"down_with_noconnect">>, Pid},
{State0, _} = enq(1, 1, second, test_init(test)),
- ?assertEqual(1, maps:size(State0#rabbit_fifo.messages)),
+ ?assertEqual(1, lqueue:len(State0#rabbit_fifo.messages)),
?assertEqual(0, lqueue:len(State0#rabbit_fifo.returns)),
{State1, {_, _}} = deq(2, Cid, unsettled, State0),
- ?assertEqual(0, maps:size(State1#rabbit_fifo.messages)),
+ ?assertEqual(0, lqueue:len(State1#rabbit_fifo.messages)),
?assertEqual(0, lqueue:len(State1#rabbit_fifo.returns)),
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- ?assertEqual(0, maps:size(State2a#rabbit_fifo.messages)),
+ ?assertEqual(0, lqueue:len(State2a#rabbit_fifo.messages)),
?assertEqual(1, lqueue:len(State2a#rabbit_fifo.returns)),
?assertMatch(#consumer{checked_out = Ch,
status = suspected_down}
@@ -539,7 +544,7 @@ duplicate_delivery_test(_) ->
{#rabbit_fifo{ra_indexes = RaIdxs,
messages = Messages}, _} = enq(2, 1, first, State0),
?assertEqual(1, rabbit_fifo_index:size(RaIdxs)),
- ?assertEqual(1, maps:size(Messages)),
+ ?assertEqual(1, lqueue:len(Messages)),
ok.
state_enter_file_handle_leader_reservation_test(_) ->
@@ -622,7 +627,7 @@ down_noproc_returns_checked_out_in_order_test(_) ->
{FS, _} = enq(Num, Num, Num, FS0),
FS
end, S0, lists:seq(1, 100)),
- ?assertEqual(100, maps:size(S1#rabbit_fifo.messages)),
+ ?assertEqual(100, lqueue:len(S1#rabbit_fifo.messages)),
Cid = {<<"cid">>, self()},
{S2, _} = check(Cid, 101, 1000, S1),
#consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers),
@@ -643,7 +648,7 @@ down_noconnection_returns_checked_out_test(_) ->
{FS, _} = enq(Num, Num, Num, FS0),
FS
end, S0, lists:seq(1, NumMsgs)),
- ?assertEqual(NumMsgs, maps:size(S1#rabbit_fifo.messages)),
+ ?assertEqual(NumMsgs, lqueue:len(S1#rabbit_fifo.messages)),
Cid = {<<"cid">>, self()},
{S2, _} = check(Cid, 101, 1000, S1),
#consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers),
@@ -797,7 +802,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
State1 = lists:foldl(AddConsumer, State0, Consumers),
% the channel of the active consumer goes down
- {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1),
+ {State2, _, Effects} = apply(meta(2), {down, Pid1, noproc}, State1),
% fell back to another consumer
?assertEqual(1, map_size(State2#rabbit_fifo.consumers)),
% there are still waiting consumers
@@ -810,7 +815,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
update_consumer_handler, _}, Effects),
% the channel of the active consumer and a waiting consumer goes down
- {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2),
+ {State3, _, Effects2} = apply(meta(3), {down, Pid2, noproc}, State2),
% fell back to another consumer
?assertEqual(1, map_size(State3#rabbit_fifo.consumers)),
% no more waiting consumer
@@ -824,7 +829,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
update_consumer_handler, _}, Effects2),
% the last channel goes down
- {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
+ {State4, _, Effects3} = apply(meta(4), {down, Pid3, doesnotmatter}, State3),
% no more consumers
?assertEqual(0, map_size(State4#rabbit_fifo.consumers)),
?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)),
@@ -1130,7 +1135,8 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) ->
State1 = lists:foldl(AddConsumer, State0,
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
- {State2, _, Effects2} = apply(#{index => 3}, {down, Pid1, noconnection}, State1),
+ {State2, _, Effects2} = apply(#{index => 3,
+ system_time => 1500}, {down, Pid1, noconnection}, State1),
% 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
?assertEqual(4 + 1, length(Effects2)),
@@ -1163,11 +1169,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
{<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
- {State2, _, Effects2} = apply(#{index => 2}, {down, Pid1, noconnection}, State1),
+ {State2, _, Effects2} = apply(meta(2), {down, Pid1, noconnection}, State1),
% one monitor and one consumer status update (deactivated)
?assertEqual(3, length(Effects2)),
- {_, _, Effects3} = apply(#{index => 3}, {nodeup, node(self())}, State2),
+ {_, _, Effects3} = apply(meta(3), {nodeup, node(self())}, State2),
% for each consumer: 1 effect to monitor the consumer PID
?assertEqual(5, length(Effects3)).
@@ -1258,6 +1264,99 @@ single_active_with_credited_test(_) ->
State3#rabbit_fifo.waiting_consumers),
ok.
+
+register_enqueuer_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish}),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ %% register another enqueuer shoudl be ok
+ Pid2 = test_util:fake_pid(node()),
+ {State3, ok, [_]} = apply(meta(3), make_register_enqueuer(Pid2), State2),
+
+ {State4, ok, _} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 2, two), State3),
+ {State5, ok, Efx} = apply(meta(5), rabbit_fifo:make_enqueue(Pid1, 3, three), State4),
+ % ct:pal("Efx ~p", [Efx]),
+ %% validate all registered enqueuers are notified of overflow state
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid2, Efx),
+
+ %% this time, registry should return reject_publish
+ {State6, reject_publish, [_]} = apply(meta(6), make_register_enqueuer(
+ test_util:fake_pid(node())), State5),
+ ?assertMatch(#{num_enqueuers := 3}, rabbit_fifo:overview(State6)),
+
+
+ %% remove two messages this should make the queue fall below the 0.8 limit
+ {State7, {dequeue, _, _}, _Efx7} =
+ apply(meta(7),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State6),
+ ct:pal("Efx7 ~p", [_Efx7]),
+ {State8, {dequeue, _, _}, Efx8} =
+ apply(meta(8),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State7),
+ ct:pal("Efx8 ~p", [Efx8]),
+ %% validate all registered enqueuers are notified of overflow state
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx8),
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid2, Efx8),
+ {_State9, {dequeue, _, _}, Efx9} =
+ apply(meta(9),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State8),
+ ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid1, Efx9),
+ ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid2, Efx9),
+ ok.
+
+reject_publish_purge_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish}),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2),
+ {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3),
+ % ct:pal("Efx ~p", [Efx]),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ {_State5, {purge, 3}, Efx1} = apply(meta(5), rabbit_fifo:make_purge(), State4),
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx1),
+ ok.
+
+reject_publish_applied_after_limit_test(_) ->
+ InitConf = #{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8))
+ },
+ State0 = init(InitConf),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2),
+ {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3),
+ % ct:pal("Efx ~p", [Efx]),
+ ?ASSERT_NO_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ %% apply new config
+ Conf = #{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish
+ },
+ {State5, ok, Efx1} = apply(meta(5), rabbit_fifo:make_update_config(Conf), State4),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx1),
+ Pid2 = test_util:fake_pid(node()),
+ {_State6, reject_publish, _} = apply(meta(1), make_register_enqueuer(Pid2), State5),
+ ok.
+
purge_nodes_test(_) ->
Node = purged@node,
ThisNode = node(),
@@ -1305,7 +1404,12 @@ purge_nodes_test(_) ->
ok.
meta(Idx) ->
- #{index => Idx, term => 1,
+ meta(Idx, 0).
+
+meta(Idx, Timestamp) ->
+ #{index => Idx,
+ term => 1,
+ system_time => Timestamp,
from => {make_ref(), self()}}.
enq(Idx, MsgSeq, Msg, State) ->
@@ -1386,9 +1490,139 @@ aux_test(_) ->
?assert(X > 0.0),
ok.
+
+%% machine version conversion test
+
+machine_version_test(_) ->
+ V0 = rabbit_fifo_v0,
+ S0 = V0:init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
+ Idx = 1,
+ {#rabbit_fifo{}, ok, []} = apply(meta(Idx), {machine_version, 0, 1}, S0),
+
+ Cid = {atom_to_binary(?FUNCTION_NAME, utf8), self()},
+ Entries = [
+ {1, rabbit_fifo_v0:make_enqueue(self(), 1, banana)},
+ {2, rabbit_fifo_v0:make_enqueue(self(), 2, apple)},
+ {3, rabbit_fifo_v0:make_checkout(Cid, {auto, 1, unsettled}, #{})}
+ ],
+ {S1, _Effects} = rabbit_fifo_v0_SUITE:run_log(S0, Entries),
+ Self = self(),
+ {#rabbit_fifo{enqueuers = #{Self := #enqueuer{}},
+ messages = Msgs}, ok, []} = apply(meta(Idx),
+ {machine_version, 0, 1}, S1),
+ %% validate message conversion to lqueue
+ ?assertEqual(1, lqueue:len(Msgs)),
+ ok.
+
+queue_ttl_test(_) ->
+ QName = rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ Conf = #{name => ?FUNCTION_NAME,
+ queue_resource => QName,
+ created => 1000,
+ expires => 1000},
+ S0 = rabbit_fifo:init(Conf),
+ Now = 1500,
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S0),
+ %% this should delete the queue
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 1000, S0),
+ %% adding a consumer should not ever trigger deletion
+ Cid = {<<"cid1">>, self()},
+ {S1, _} = check_auto(Cid, 1, S0),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S1),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1),
+ %% cancelling the consumer should then
+ {S2, _, _} = apply(meta(2, Now),
+ rabbit_fifo:make_checkout(Cid, cancel, #{}), S1),
+ %% last_active should have been reset when consumer was cancelled
+ %% last_active = 2500
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S2),
+
+ %% Same for downs
+ {S2D, _, _} = apply(meta(2, Now),
+ {down, self(), noconnection}, S1),
+ %% last_active should have been reset when consumer was cancelled
+ %% last_active = 2500
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2D),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S2D),
+
+ %% dequeue should set last applied
+ {S1Deq, {dequeue, empty}} =
+ apply(meta(2, Now),
+ rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ S0),
+
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1Deq),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S1Deq),
+ %% Enqueue message,
+ {E1, _, _} = apply(meta(2, Now),
+ rabbit_fifo:make_enqueue(self(), 1, msg1), S0),
+ Deq = {<<"deq1">>, self()},
+ {E2, {dequeue, {MsgId, _}, _}, _} =
+ apply(meta(3, Now),
+ rabbit_fifo:make_checkout(Deq, {dequeue, unsettled}, #{}),
+ E1),
+ {E3, _, _} = apply(meta(3, Now + 1000),
+ rabbit_fifo:make_settle(Deq, [MsgId]), E2),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1500, E3),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 3000, E3),
+
+ ok.
+
+queue_ttl_with_single_active_consumer_test(_) ->
+ QName = rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ Conf = #{name => ?FUNCTION_NAME,
+ queue_resource => QName,
+ created => 1000,
+ expires => 1000,
+ single_active_consumer_on => true},
+ S0 = rabbit_fifo:init(Conf),
+ Now = 1500,
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S0),
+ %% this should delete the queue
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 1000, S0),
+ %% adding a consumer should not ever trigger deletion
+ Cid = {<<"cid1">>, self()},
+ {S1, _} = check_auto(Cid, 1, S0),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S1),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1),
+ %% cancelling the consumer should then
+ {S2, _, _} = apply(meta(2, Now),
+ rabbit_fifo:make_checkout(Cid, cancel, #{}), S1),
+ %% last_active should have been reset when consumer was cancelled
+ %% last_active = 2500
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S2),
+
+ %% Same for downs
+ {S2D, _, _} = apply(meta(2, Now),
+ {down, self(), noconnection}, S1),
+ %% last_active should have been reset when consumer was cancelled
+ %% last_active = 2500
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2D),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S2D),
+
+ ok.
+
%% Utility
init(Conf) -> rabbit_fifo:init(Conf).
+make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid).
apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).
init_aux(Conf) -> rabbit_fifo:init_aux(Conf).
handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M).
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 23522e71f9..859db2178f 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -479,15 +479,17 @@ test_run_log(_Config) ->
snapshots(_Config) ->
run_proper(
fun () ->
- ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength,
- InMemoryBytes},
- frequency([{10, {0, 0, false, 0, 0, 0}},
+ ?FORALL({Length, Bytes, SingleActiveConsumer,
+ DeliveryLimit, InMemoryLength, InMemoryBytes,
+ Overflow},
+ frequency([{10, {0, 0, false, 0, 0, 0, drop_head}},
{5, {oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined]),
boolean(),
oneof([range(1, 3), undefined]),
oneof([range(1, 10), undefined]),
- oneof([range(1, 1000), undefined])
+ oneof([range(1, 1000), undefined]),
+ oneof([drop_head, reject_publish])
}}]),
begin
Config = config(?FUNCTION_NAME,
@@ -496,7 +498,8 @@ snapshots(_Config) ->
SingleActiveConsumer,
DeliveryLimit,
InMemoryLength,
- InMemoryBytes),
+ InMemoryBytes,
+ Overflow),
?FORALL(O, ?LET(Ops, log_gen(256), expand(Ops, Config)),
collect({log_size, length(O)},
snapshots_prop(Config, O)))
@@ -681,6 +684,11 @@ max_length(_Config) ->
config(Name, Length, Bytes, SingleActive, DeliveryLimit,
InMemoryLength, InMemoryBytes) ->
+config(Name, Length, Bytes, SingleActive, DeliveryLimit,
+ InMemoryLength, InMemoryBytes, drop_head).
+
+config(Name, Length, Bytes, SingleActive, DeliveryLimit,
+ InMemoryLength, InMemoryBytes, Overflow) ->
#{name => Name,
max_length => map_max(Length),
max_bytes => map_max(Bytes),
@@ -688,7 +696,8 @@ config(Name, Length, Bytes, SingleActive, DeliveryLimit,
single_active_consumer_on => SingleActive,
delivery_limit => map_max(DeliveryLimit),
max_in_memory_length => map_max(InMemoryLength),
- max_in_memory_bytes => map_max(InMemoryBytes)}.
+ max_in_memory_bytes => map_max(InMemoryBytes),
+ overflow_strategy => Overflow}.
map_max(0) -> undefined;
map_max(N) -> N.
@@ -1072,7 +1081,7 @@ do_apply(Cmd, #t{effects = Effs,
%% down
T;
_ ->
- {St, Effects} = case rabbit_fifo:apply(#{index => Index}, Cmd, S0) of
+ {St, Effects} = case rabbit_fifo:apply(meta(Index), Cmd, S0) of
{S, _, E} when is_list(E) ->
{S, E};
{S, _, E} ->
@@ -1187,7 +1196,7 @@ test_init(Conf) ->
rabbit_fifo:init(maps:merge(Default, Conf)).
meta(Idx) ->
- #{index => Idx, term => 1}.
+ #{index => Idx, term => 1, system_time => 0}.
make_checkout(Cid, Spec) ->
rabbit_fifo:make_checkout(Cid, Spec, #{}).
diff --git a/test/rabbit_fifo_v0_SUITE.erl b/test/rabbit_fifo_v0_SUITE.erl
new file mode 100644
index 0000000000..fcb84377de
--- /dev/null
+++ b/test/rabbit_fifo_v0_SUITE.erl
@@ -0,0 +1,1392 @@
+-module(rabbit_fifo_v0_SUITE).
+
+%% rabbit_fifo unit tests suite
+
+-compile(export_all).
+
+-compile({no_auto_import, [apply/3]}).
+-export([
+ ]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("src/rabbit_fifo_v0.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+
+%% replicate eunit like test resultion
+all_tests() ->
+ [F || {F, _} <- ?MODULE:module_info(functions),
+ re:run(atom_to_list(F), "_test$") /= nomatch].
+
+groups() ->
+ [
+ {tests, [], all_tests()}
+ ].
+
+init_per_suite(Config) ->
+ Config.
+
+end_per_suite(_Config) ->
+ ok.
+
+init_per_group(_Group, Config) ->
+ Config.
+
+end_per_group(_Group, _Config) ->
+ ok.
+
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(_TestCase, _Config) ->
+ ok.
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+-define(ASSERT_EFF(EfxPat, Effects),
+ ?ASSERT_EFF(EfxPat, true, Effects)).
+
+-define(ASSERT_EFF(EfxPat, Guard, Effects),
+ ?assert(lists:any(fun (EfxPat) when Guard -> true;
+ (_) -> false
+ end, Effects))).
+
+-define(ASSERT_NO_EFF(EfxPat, Effects),
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
+
+-define(assertNoEffect(EfxPat, Effects),
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
+
+test_init(Name) ->
+ init(#{name => Name,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(Name, utf8)),
+ release_cursor_interval => 0}).
+
+enq_enq_checkout_test(_) ->
+ Cid = {<<"enq_enq_checkout_test">>, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ {_State3, _, Effects} =
+ apply(meta(3),
+ rabbit_fifo_v0:make_checkout(Cid, {once, 2, simple_prefetch}, #{}),
+ State2),
+ ?ASSERT_EFF({monitor, _, _}, Effects),
+ ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects),
+ ok.
+
+credit_enq_enq_checkout_settled_credit_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ {State3, _, Effects} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {auto, 1, credited}, #{}), State2),
+ ?ASSERT_EFF({monitor, _, _}, Effects),
+ Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true;
+ (_) -> false
+ end, Effects),
+ ?assertEqual(1, length(Deliveries)),
+ %% settle the delivery this should _not_ result in further messages being
+ %% delivered
+ {State4, SettledEffects} = settle(Cid, 4, 1, State3),
+ ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) ->
+ true;
+ (_) -> false
+ end, SettledEffects)),
+ %% granting credit (3) should deliver the second msg if the receivers
+ %% delivery count is (1)
+ {State5, CreditEffects} = credit(Cid, 5, 1, 1, false, State4),
+ % ?debugFmt("CreditEffects ~p ~n~p", [CreditEffects, State4]),
+ ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, CreditEffects),
+ {_State6, FinalEffects} = enq(6, 3, third, State5),
+ ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) ->
+ true;
+ (_) -> false
+ end, FinalEffects)),
+ ok.
+
+credit_with_drained_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ State0 = test_init(test),
+ %% checkout with a single credit
+ {State1, _, _} =
+ apply(meta(1), rabbit_fifo_v0:make_checkout(Cid, {auto, 1, credited},#{}),
+ State0),
+ ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 1,
+ delivery_count = 0}}},
+ State1),
+ {State, Result, _} =
+ apply(meta(3), rabbit_fifo_v0:make_credit(Cid, 0, 5, true), State1),
+ ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 0,
+ delivery_count = 5}}},
+ State),
+ ?assertEqual({multi, [{send_credit_reply, 0},
+ {send_drained, {?FUNCTION_NAME, 5}}]},
+ Result),
+ ok.
+
+credit_and_drain_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ %% checkout without any initial credit (like AMQP 1.0 would)
+ {State3, _, CheckEffs} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {auto, 0, credited}, #{}),
+ State2),
+
+ ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs),
+ {State4, {multi, [{send_credit_reply, 0},
+ {send_drained, {?FUNCTION_NAME, 2}}]},
+ Effects} = apply(meta(4), rabbit_fifo_v0:make_credit(Cid, 4, 0, true), State3),
+ ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 0,
+ delivery_count = 4}}},
+ State4),
+
+ ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}},
+ {_, {_, second}}]}, _}, Effects),
+ {_State5, EnqEffs} = enq(5, 2, third, State4),
+ ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, EnqEffs),
+ ok.
+
+
+
+enq_enq_deq_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ % get returns a reply value
+ NumReady = 1,
+ {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ State2),
+ ok.
+
+enq_enq_deq_deq_settle_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ % get returns a reply value
+ {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ State2),
+ {_State4, {dequeue, empty}} =
+ apply(meta(4), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ State3),
+ ok.
+
+enq_enq_checkout_get_settled_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ % get returns a reply value
+ {_State2, {dequeue, {0, {_, first}}, _}, _Effs} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}),
+ State1),
+ ok.
+
+checkout_get_empty_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ State = test_init(test),
+ {_State2, {dequeue, empty}} =
+ apply(meta(1), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), State),
+ ok.
+
+untracked_enq_deq_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ State0 = test_init(test),
+ {State1, _, _} = apply(meta(1),
+ rabbit_fifo_v0:make_enqueue(undefined, undefined, first),
+ State0),
+ {_State2, {dequeue, {0, {_, first}}, _}, _} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State1),
+ ok.
+
+release_cursor_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ {State3, _} = check(Cid, 3, 10, State2),
+ % no release cursor effect at this point
+ {State4, _} = settle(Cid, 4, 1, State3),
+ {_Final, Effects1} = settle(Cid, 5, 0, State4),
+ % empty queue forwards release cursor all the way
+ ?ASSERT_EFF({release_cursor, 5, _}, Effects1),
+ ok.
+
+checkout_enq_settle_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)),
+ {State2, Effects0} = enq(2, 1, first, State1),
+ ?ASSERT_EFF({send_msg, _,
+ {delivery, ?FUNCTION_NAME,
+ [{0, {_, first}}]}, _},
+ Effects0),
+ {State3, [_Inactive]} = enq(3, 2, second, State2),
+ {_, _Effects} = settle(Cid, 4, 0, State3),
+ % the release cursor is the smallest raft index that does not
+ % contribute to the state of the application
+ % ?ASSERT_EFF({release_cursor, 2, _}, Effects),
+ ok.
+
+out_of_order_enqueue_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
+ {State2, Effects2} = enq(2, 1, first, State1),
+ ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
+ % assert monitor was set up
+ ?ASSERT_EFF({monitor, _, _}, Effects2),
+ % enqueue seq num 3 and 4 before 2
+ {State3, Effects3} = enq(3, 3, third, State2),
+ ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects3),
+ {State4, Effects4} = enq(4, 4, fourth, State3),
+ % assert no further deliveries where made
+ ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects4),
+ {_State5, Effects5} = enq(5, 2, second, State4),
+ % assert two deliveries were now made
+ ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, second}},
+ {_, {_, third}},
+ {_, {_, fourth}}]}, _},
+ Effects5),
+ ok.
+
+out_of_order_first_enqueue_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = check_n(Cid, 5, 5, test_init(test)),
+ {_State2, Effects2} = enq(2, 10, first, State1),
+ ?ASSERT_EFF({monitor, process, _}, Effects2),
+ ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _},
+ Effects2),
+ ok.
+
+duplicate_enqueue_test(_) ->
+ Cid = {<<"duplicate_enqueue_test">>, self()},
+ {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
+ {State2, Effects2} = enq(2, 1, first, State1),
+ ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
+ {_State3, Effects3} = enq(3, 1, first, State2),
+ ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects3),
+ ok.
+
+return_test(_) ->
+ Cid = {<<"cid">>, self()},
+ Cid2 = {<<"cid2">>, self()},
+ {State0, _} = enq(1, 1, msg, test_init(test)),
+ {State1, _} = check_auto(Cid, 2, State0),
+ {State2, _} = check_auto(Cid2, 3, State1),
+ {State3, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid, [0]), State2),
+ ?assertMatch(#{Cid := #consumer{checked_out = C}} when map_size(C) == 0,
+ State3#?STATE.consumers),
+ ?assertMatch(#{Cid2 := #consumer{checked_out = C2}} when map_size(C2) == 1,
+ State3#?STATE.consumers),
+ ok.
+
+return_dequeue_delivery_limit_test(_) ->
+ Init = init(#{name => test,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(test, utf8)),
+ release_cursor_interval => 0,
+ delivery_limit => 1}),
+ {State0, _} = enq(1, 1, msg, Init),
+
+ Cid = {<<"cid">>, self()},
+ Cid2 = {<<"cid2">>, self()},
+
+ {State1, {MsgId1, _}} = deq(2, Cid, unsettled, State0),
+ {State2, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid, [MsgId1]),
+ State1),
+
+ {State3, {MsgId2, _}} = deq(2, Cid2, unsettled, State2),
+ {State4, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid2, [MsgId2]),
+ State3),
+ ?assertMatch(#{num_messages := 0}, rabbit_fifo_v0:overview(State4)),
+ ok.
+
+return_non_existent_test(_) ->
+ Cid = {<<"cid">>, self()},
+ {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)),
+ % return non-existent
+ {_State2, _} = apply(meta(3), rabbit_fifo_v0:make_return(Cid, [99]), State0),
+ ok.
+
+return_checked_out_test(_) ->
+ Cid = {<<"cid">>, self()},
+ {State0, [_, _]} = enq(1, 1, first, test_init(test)),
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
+ {aux, active} | _ ]} = check_auto(Cid, 2, State0),
+ % returning immediately checks out the same message again
+ {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, _},
+ {aux, active}]} =
+ apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1),
+ ok.
+
+return_checked_out_limit_test(_) ->
+ Cid = {<<"cid">>, self()},
+ Init = init(#{name => test,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(test, utf8)),
+ release_cursor_interval => 0,
+ delivery_limit => 1}),
+ {State0, [_, _]} = enq(1, 1, first, Init),
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
+ {aux, active} | _ ]} = check_auto(Cid, 2, State0),
+ % returning immediately checks out the same message again
+ {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, _},
+ {aux, active}]} =
+ apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1),
+ {#?STATE{ra_indexes = RaIdxs}, ok, [_ReleaseEff]} =
+ apply(meta(4), rabbit_fifo_v0:make_return(Cid, [MsgId2]), State2),
+ ?assertEqual(0, rabbit_fifo_index:size(RaIdxs)),
+ ok.
+
+return_auto_checked_out_test(_) ->
+ Cid = {<<"cid">>, self()},
+ {State00, [_, _]} = enq(1, 1, first, test_init(test)),
+ {State0, [_]} = enq(2, 2, second, State00),
+ % it first active then inactive as the consumer took on but cannot take
+ % any more
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
+ {aux, active},
+ {aux, inactive}
+ ]} = check_auto(Cid, 2, State0),
+ % return should include another delivery
+ {_State2, _, Effects} = apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1),
+ ?ASSERT_EFF({send_msg, _,
+ {delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _},
+ Effects),
+ ok.
+
+cancelled_checkout_out_test(_) ->
+ Cid = {<<"cid">>, self()},
+ {State00, [_, _]} = enq(1, 1, first, test_init(test)),
+ {State0, [_]} = enq(2, 2, second, State00),
+ {State1, _} = check_auto(Cid, 2, State0),
+ % cancelled checkout should not return pending messages to queue
+ {State2, _, _} = apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, cancel, #{}), State1),
+ ?assertEqual(1, maps:size(State2#?STATE.messages)),
+ ?assertEqual(0, lqueue:len(State2#?STATE.returns)),
+
+ {State3, {dequeue, empty}} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State2),
+ %% settle
+ {State4, ok, _} =
+ apply(meta(4), rabbit_fifo_v0:make_settle(Cid, [0]), State3),
+
+ {_State, {dequeue, {_, {_, second}}, _}, _} =
+ apply(meta(5), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State4),
+ ok.
+
+down_with_noproc_consumer_returns_unsettled_test(_) ->
+ Cid = {<<"down_consumer_returns_unsettled_test">>, self()},
+ {State0, [_, _]} = enq(1, 1, second, test_init(test)),
+ {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0),
+ {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1),
+ {_State, Effects} = check(Cid, 4, State2),
+ ?ASSERT_EFF({monitor, process, _}, Effects),
+ ok.
+
+down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) ->
+ Pid = spawn(fun() -> ok end),
+ Cid = {<<"down_with_noconnect">>, Pid},
+ Self = self(),
+ Node = node(Pid),
+ {State0, Effects0} = enq(1, 1, second, test_init(test)),
+ ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0),
+ {State1, Effects1} = check_auto(Cid, 2, State0),
+ #consumer{credit = 0} = maps:get(Cid, State1#?STATE.consumers),
+ ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1),
+ % monitor both enqueuer and consumer
+ % because we received a noconnection we now need to monitor the node
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
+ #consumer{credit = 1,
+ checked_out = Ch,
+ status = suspected_down} = maps:get(Cid, State2a#?STATE.consumers),
+ ?assertEqual(#{}, Ch),
+ %% validate consumer has credit
+ {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a),
+ ?ASSERT_EFF({monitor, node, _}, Effects2),
+ ?assertNoEffect({demonitor, process, _}, Effects2),
+ % when the node comes up we need to retry the process monitors for the
+ % disconnected processes
+ {State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2),
+ #consumer{status = up} = maps:get(Cid, State3#?STATE.consumers),
+ % try to re-monitor the suspect processes
+ ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3),
+ ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
+ ok.
+
+down_with_noconnection_returns_unack_test(_) ->
+ Pid = spawn(fun() -> ok end),
+ Cid = {<<"down_with_noconnect">>, Pid},
+ {State0, _} = enq(1, 1, second, test_init(test)),
+ ?assertEqual(1, maps:size(State0#?STATE.messages)),
+ ?assertEqual(0, lqueue:len(State0#?STATE.returns)),
+ {State1, {_, _}} = deq(2, Cid, unsettled, State0),
+ ?assertEqual(0, maps:size(State1#?STATE.messages)),
+ ?assertEqual(0, lqueue:len(State1#?STATE.returns)),
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
+ ?assertEqual(0, maps:size(State2a#?STATE.messages)),
+ ?assertEqual(1, lqueue:len(State2a#?STATE.returns)),
+ ?assertMatch(#consumer{checked_out = Ch,
+ status = suspected_down}
+ when map_size(Ch) == 0,
+ maps:get(Cid, State2a#?STATE.consumers)),
+ ok.
+
+down_with_noproc_enqueuer_is_cleaned_up_test(_) ->
+ State00 = test_init(test),
+ Pid = spawn(fun() -> ok end),
+ {State0, _, Effects0} = apply(meta(1), rabbit_fifo_v0:make_enqueue(Pid, 1, first), State00),
+ ?ASSERT_EFF({monitor, process, _}, Effects0),
+ {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0),
+ % ensure there are no enqueuers
+ ?assert(0 =:= maps:size(State1#?STATE.enqueuers)),
+ ok.
+
+discarded_message_without_dead_letter_handler_is_removed_test(_) ->
+ Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
+ {State0, [_, _]} = enq(1, 1, first, test_init(test)),
+ {State1, Effects1} = check_n(Cid, 2, 10, State0),
+ ?ASSERT_EFF({send_msg, _,
+ {delivery, _, [{0, {_, first}}]}, _},
+ Effects1),
+ {_State2, _, Effects2} = apply(meta(1),
+ rabbit_fifo_v0:make_discard(Cid, [0]), State1),
+ ?assertNoEffect({send_msg, _,
+ {delivery, _, [{0, {_, first}}]}, _},
+ Effects2),
+ ok.
+
+discarded_message_with_dead_letter_handler_emits_log_effect_test(_) ->
+ Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
+ State00 = init(#{name => test,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ dead_letter_handler =>
+ {somemod, somefun, [somearg]}}),
+ {State0, [_, _]} = enq(1, 1, first, State00),
+ {State1, Effects1} = check_n(Cid, 2, 10, State0),
+ ?ASSERT_EFF({send_msg, _,
+ {delivery, _, [{0, {_, first}}]}, _},
+ Effects1),
+ {_State2, _, Effects2} = apply(meta(1), rabbit_fifo_v0:make_discard(Cid, [0]), State1),
+ % assert mod call effect with appended reason and message
+ ?ASSERT_EFF({log, _RaftIdxs, _}, Effects2),
+ ok.
+
+tick_test(_) ->
+ Cid = {<<"c">>, self()},
+ Cid2 = {<<"c2">>, self()},
+ {S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)),
+ {S1, _} = enq(2, 2, <<"snd">>, S0),
+ {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1),
+ {S3, {_, _}} = deq(4, Cid2, unsettled, S2),
+ {S4, _, _} = apply(meta(5), rabbit_fifo_v0:make_return(Cid, [MsgId]), S3),
+
+ [{mod_call, rabbit_quorum_queue, handle_tick,
+ [#resource{},
+ {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3},
+ [_Node]
+ ]}] = rabbit_fifo_v0:tick(1, S4),
+ ok.
+
+
+delivery_query_returns_deliveries_test(_) ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ rabbit_fifo_v0:make_checkout(Cid, {auto, 5, simple_prefetch}, #{}),
+ rabbit_fifo_v0:make_enqueue(self(), 1, one),
+ rabbit_fifo_v0:make_enqueue(self(), 2, two),
+ rabbit_fifo_v0:make_enqueue(self(), 3, tre),
+ rabbit_fifo_v0:make_enqueue(self(), 4, for)
+ ],
+ Indexes = lists:seq(1, length(Commands)),
+ Entries = lists:zip(Indexes, Commands),
+ {State, _Effects} = run_log(test_init(help), Entries),
+ % 3 deliveries are returned
+ [{0, {_, one}}] = rabbit_fifo_v0:get_checked_out(Cid, 0, 0, State),
+ [_, _, _] = rabbit_fifo_v0:get_checked_out(Cid, 1, 3, State),
+ ok.
+
+pending_enqueue_is_enqueued_on_down_test(_) ->
+ Cid = {<<"cid">>, self()},
+ Pid = self(),
+ {State0, _} = enq(1, 2, first, test_init(test)),
+ {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0),
+ {_State2, {dequeue, {0, {_, first}}, 0}, _} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State1),
+ ok.
+
+duplicate_delivery_test(_) ->
+ {State0, _} = enq(1, 1, first, test_init(test)),
+ {#?STATE{ra_indexes = RaIdxs,
+ messages = Messages}, _} = enq(2, 1, first, State0),
+ ?assertEqual(1, rabbit_fifo_index:size(RaIdxs)),
+ ?assertEqual(1, maps:size(Messages)),
+ ok.
+
+state_enter_file_handle_leader_reservation_test(_) ->
+ S0 = init(#{name => the_name,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ become_leader_handler => {m, f, [a]}}),
+
+ Resource = {resource, <<"/">>, queue, <<"test">>},
+ Effects = rabbit_fifo_v0:state_enter(leader, S0),
+ ?assertEqual([
+ {mod_call, m, f, [a, the_name]},
+ {mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}
+ ], Effects),
+ ok.
+
+state_enter_file_handle_other_reservation_test(_) ->
+ S0 = init(#{name => the_name,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
+ Effects = rabbit_fifo_v0:state_enter(other, S0),
+ ?assertEqual([
+ {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}
+ ],
+ Effects),
+ ok.
+
+state_enter_monitors_and_notifications_test(_) ->
+ Oth = spawn(fun () -> ok end),
+ {State0, _} = enq(1, 1, first, test_init(test)),
+ Cid = {<<"adf">>, self()},
+ OthCid = {<<"oth">>, Oth},
+ {State1, _} = check(Cid, 2, State0),
+ {State, _} = check(OthCid, 3, State1),
+ Self = self(),
+ Effects = rabbit_fifo_v0:state_enter(leader, State),
+
+ %% monitor all enqueuers and consumers
+ [{monitor, process, Self},
+ {monitor, process, Oth}] =
+ lists:filter(fun ({monitor, process, _}) -> true;
+ (_) -> false
+ end, Effects),
+ [{send_msg, Self, leader_change, ra_event},
+ {send_msg, Oth, leader_change, ra_event}] =
+ lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true;
+ (_) -> false
+ end, Effects),
+ ?ASSERT_EFF({monitor, process, _}, Effects),
+ ok.
+
+purge_test(_) ->
+ Cid = {<<"purge_test">>, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, {purge, 1}, _} = apply(meta(2), rabbit_fifo_v0:make_purge(), State1),
+ {State3, _} = enq(3, 2, second, State2),
+ % get returns a reply value
+ {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} =
+ apply(meta(4), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), State3),
+ ok.
+
+purge_with_checkout_test(_) ->
+ Cid = {<<"purge_test">>, self()},
+ {State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)),
+ {State1, _} = enq(2, 1, <<"first">>, State0),
+ {State2, _} = enq(3, 2, <<"second">>, State1),
+ %% assert message bytes are non zero
+ ?assert(State2#?STATE.msg_bytes_checkout > 0),
+ ?assert(State2#?STATE.msg_bytes_enqueue > 0),
+ {State3, {purge, 1}, _} = apply(meta(2), rabbit_fifo_v0:make_purge(), State2),
+ ?assert(State2#?STATE.msg_bytes_checkout > 0),
+ ?assertEqual(0, State3#?STATE.msg_bytes_enqueue),
+ ?assertEqual(1, rabbit_fifo_index:size(State3#?STATE.ra_indexes)),
+ #consumer{checked_out = Checked} = maps:get(Cid, State3#?STATE.consumers),
+ ?assertEqual(1, maps:size(Checked)),
+ ok.
+
+down_noproc_returns_checked_out_in_order_test(_) ->
+ S0 = test_init(?FUNCTION_NAME),
+ %% enqueue 100
+ S1 = lists:foldl(fun (Num, FS0) ->
+ {FS, _} = enq(Num, Num, Num, FS0),
+ FS
+ end, S0, lists:seq(1, 100)),
+ ?assertEqual(100, maps:size(S1#?STATE.messages)),
+ Cid = {<<"cid">>, self()},
+ {S2, _} = check(Cid, 101, 1000, S1),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#?STATE.consumers),
+ ?assertEqual(100, maps:size(Checked)),
+ %% simulate down
+ {S, _, _} = apply(meta(102), {down, self(), noproc}, S2),
+ Returns = lqueue:to_list(S#?STATE.returns),
+ ?assertEqual(100, length(Returns)),
+ ?assertEqual(0, maps:size(S#?STATE.consumers)),
+ %% validate returns are in order
+ ?assertEqual(lists:sort(Returns), Returns),
+ ok.
+
+down_noconnection_returns_checked_out_test(_) ->
+ S0 = test_init(?FUNCTION_NAME),
+ NumMsgs = 20,
+ S1 = lists:foldl(fun (Num, FS0) ->
+ {FS, _} = enq(Num, Num, Num, FS0),
+ FS
+ end, S0, lists:seq(1, NumMsgs)),
+ ?assertEqual(NumMsgs, maps:size(S1#?STATE.messages)),
+ Cid = {<<"cid">>, self()},
+ {S2, _} = check(Cid, 101, 1000, S1),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#?STATE.consumers),
+ ?assertEqual(NumMsgs, maps:size(Checked)),
+ %% simulate down
+ {S, _, _} = apply(meta(102), {down, self(), noconnection}, S2),
+ Returns = lqueue:to_list(S#?STATE.returns),
+ ?assertEqual(NumMsgs, length(Returns)),
+ ?assertMatch(#consumer{checked_out = Ch}
+ when map_size(Ch) == 0,
+ maps:get(Cid, S#?STATE.consumers)),
+ %% validate returns are in order
+ ?assertEqual(lists:sort(Returns), Returns),
+ ok.
+
+single_active_consumer_basic_get_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ ?assertEqual(single_active, State0#?STATE.cfg#cfg.consumer_strategy),
+ ?assertEqual(0, map_size(State0#?STATE.consumers)),
+ {State1, _} = enq(1, 1, first, State0),
+ {_State, {error, unsupported}} =
+ apply(meta(2), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ State1),
+ ok.
+
+single_active_consumer_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ ?assertEqual(single_active, State0#?STATE.cfg#cfg.consumer_strategy),
+ ?assertEqual(0, map_size(State0#?STATE.consumers)),
+
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ meta(1),
+ make_checkout({CTag, self()},
+ {once, 1, simple_prefetch},
+ #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ C1 = {<<"ctag1">>, self()},
+ C2 = {<<"ctag2">>, self()},
+ C3 = {<<"ctag3">>, self()},
+ C4 = {<<"ctag4">>, self()},
+
+ % the first registered consumer is the active one, the others are waiting
+ ?assertEqual(1, map_size(State1#?STATE.consumers)),
+ ?assertMatch(#{C1 := _}, State1#?STATE.consumers),
+ ?assertEqual(3, length(State1#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C2, 1, State1#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C3, 1, State1#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1, State1#?STATE.waiting_consumers)),
+
+ % cancelling a waiting consumer
+ {State2, _, Effects1} = apply(meta(2),
+ make_checkout(C3, cancel, #{}),
+ State1),
+ % the active consumer should still be in place
+ ?assertEqual(1, map_size(State2#?STATE.consumers)),
+ ?assertMatch(#{C1 := _}, State2#?STATE.consumers),
+ % the cancelled consumer has been removed from waiting consumers
+ ?assertEqual(2, length(State2#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C2, 1, State2#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1, State2#?STATE.waiting_consumers)),
+ % there are some effects to unregister the consumer
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C3, Effects1),
+
+ % cancelling the active consumer
+ {State3, _, Effects2} = apply(meta(3),
+ make_checkout(C1, cancel, #{}),
+ State2),
+ % the second registered consumer is now the active one
+ ?assertEqual(1, map_size(State3#?STATE.consumers)),
+ ?assertMatch(#{C2 := _}, State3#?STATE.consumers),
+ % the new active consumer is no longer in the waiting list
+ ?assertEqual(1, length(State3#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1,
+ State3#?STATE.waiting_consumers)),
+ %% should have a cancel consumer handler mod_call effect and
+ %% an active new consumer effect
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C1, Effects2),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects2),
+
+ % cancelling the active consumer
+ {State4, _, Effects3} = apply(meta(4),
+ make_checkout(C2, cancel, #{}),
+ State3),
+ % the last waiting consumer became the active one
+ ?assertEqual(1, map_size(State4#?STATE.consumers)),
+ ?assertMatch(#{C4 := _}, State4#?STATE.consumers),
+ % the waiting consumer list is now empty
+ ?assertEqual(0, length(State4#?STATE.waiting_consumers)),
+ % there are some effects to unregister the consumer and
+ % to update the new active one (metrics)
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C2, Effects3),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects3),
+
+ % cancelling the last consumer
+ {State5, _, Effects4} = apply(meta(5),
+ make_checkout(C4, cancel, #{}),
+ State4),
+ % no active consumer anymore
+ ?assertEqual(0, map_size(State5#?STATE.consumers)),
+ % still nothing in the waiting list
+ ?assertEqual(0, length(State5#?STATE.waiting_consumers)),
+ % there is an effect to unregister the consumer + queue inactive effect
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, _}, Effects4),
+
+ ok.
+
+single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ [C1, C2, C3, C4] = Consumers =
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+ {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}],
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{index => 1},
+ make_checkout({CTag, ChannelId}, {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, Consumers),
+
+ % the channel of the active consumer goes down
+ {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1),
+ % fell back to another consumer
+ ?assertEqual(1, map_size(State2#?STATE.consumers)),
+ % there are still waiting consumers
+ ?assertEqual(2, length(State2#?STATE.waiting_consumers)),
+ % effects to unregister the consumer and
+ % to update the new active one (metrics) are there
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C1, Effects),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects),
+
+ % the channel of the active consumer and a waiting consumer goes down
+ {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2),
+ % fell back to another consumer
+ ?assertEqual(1, map_size(State3#?STATE.consumers)),
+ % no more waiting consumer
+ ?assertEqual(0, length(State3#?STATE.waiting_consumers)),
+ % effects to cancel both consumers of this channel + effect to update the new active one (metrics)
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C2, Effects2),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C3, Effects2),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects2),
+
+ % the last channel goes down
+ {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
+ % no more consumers
+ ?assertEqual(0, map_size(State4#?STATE.consumers)),
+ ?assertEqual(0, length(State4#?STATE.waiting_consumers)),
+ % there is an effect to unregister the consumer + queue inactive effect
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C4, Effects3),
+
+ ok.
+
+single_active_returns_messages_on_noconnection_test(_) ->
+ R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => R,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ Meta = #{index => 1},
+ Nodes = [n1],
+ ConsumerIds = [{_, DownPid}] =
+ [begin
+ B = atom_to_binary(N, utf8),
+ {<<"ctag_", B/binary>>,
+ test_util:fake_pid(N)}
+ end || N <- Nodes],
+ % adding some consumers
+ State1 = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
+ {State2, _} = enq(4, 1, msg1, State1),
+ % simulate node goes down
+ {State3, _, _} = apply(meta(5), {down, DownPid, noconnection}, State2),
+ %% assert the consumer is up
+ ?assertMatch([_], lqueue:to_list(State3#?STATE.returns)),
+ ?assertMatch([{_, #consumer{checked_out = Checked}}]
+ when map_size(Checked) == 0,
+ State3#?STATE.waiting_consumers),
+
+ ok.
+
+single_active_consumer_replaces_consumer_when_down_noconnection_test(_) ->
+ R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => R,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ Meta = #{index => 1},
+ Nodes = [n1, n2, node()],
+ ConsumerIds = [C1 = {_, DownPid}, C2, _C3] =
+ [begin
+ B = atom_to_binary(N, utf8),
+ {<<"ctag_", B/binary>>,
+ test_util:fake_pid(N)}
+ end || N <- Nodes],
+ % adding some consumers
+ State1a = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
+
+ %% assert the consumer is up
+ ?assertMatch(#{C1 := #consumer{status = up}},
+ State1a#?STATE.consumers),
+
+ {State1, _} = enq(10, 1, msg, State1a),
+
+ % simulate node goes down
+ {State2, _, _} = apply(meta(5), {down, DownPid, noconnection}, State1),
+
+ %% assert a new consumer is in place and it is up
+ ?assertMatch([{C2, #consumer{status = up,
+ checked_out = Ch}}]
+ when map_size(Ch) == 1,
+ maps:to_list(State2#?STATE.consumers)),
+
+ %% the disconnected consumer has been returned to waiting
+ ?assert(lists:any(fun ({C,_}) -> C =:= C1 end,
+ State2#?STATE.waiting_consumers)),
+ ?assertEqual(2, length(State2#?STATE.waiting_consumers)),
+
+ % simulate node comes back up
+ {State3, _, _} = apply(#{index => 2}, {nodeup, node(DownPid)}, State2),
+
+ %% the consumer is still active and the same as before
+ ?assertMatch([{C2, #consumer{status = up}}],
+ maps:to_list(State3#?STATE.consumers)),
+ % the waiting consumers should be un-suspected
+ ?assertEqual(2, length(State3#?STATE.waiting_consumers)),
+ lists:foreach(fun({_, #consumer{status = Status}}) ->
+ ?assert(Status /= suspected_down)
+ end, State3#?STATE.waiting_consumers),
+ ok.
+
+single_active_consumer_all_disconnected_test(_) ->
+ R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => R,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ Meta = #{index => 1},
+ Nodes = [n1, n2],
+ ConsumerIds = [C1 = {_, C1Pid}, C2 = {_, C2Pid}] =
+ [begin
+ B = atom_to_binary(N, utf8),
+ {<<"ctag_", B/binary>>,
+ test_util:fake_pid(N)}
+ end || N <- Nodes],
+ % adding some consumers
+ State1 = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
+
+ %% assert the consumer is up
+ ?assertMatch(#{C1 := #consumer{status = up}}, State1#?STATE.consumers),
+
+ % simulate node goes down
+ {State2, _, _} = apply(meta(5), {down, C1Pid, noconnection}, State1),
+ %% assert the consumer fails over to the consumer on n2
+ ?assertMatch(#{C2 := #consumer{status = up}}, State2#?STATE.consumers),
+ {State3, _, _} = apply(meta(6), {down, C2Pid, noconnection}, State2),
+ %% assert these no active consumer after both nodes are maked as down
+ ?assertMatch([], maps:to_list(State3#?STATE.consumers)),
+ %% n2 comes back
+ {State4, _, _} = apply(meta(7), {nodeup, node(C2Pid)}, State3),
+ %% ensure n2 is the active consumer as this node as been registered
+ %% as up again
+ ?assertMatch([{{<<"ctag_n2">>, _}, #consumer{status = up,
+ credit = 1}}],
+ maps:to_list(State4#?STATE.consumers)),
+ ok.
+
+single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource =>
+ rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ Meta = #{index => 1},
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ Meta,
+ make_checkout({CTag, ChannelId},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ Effects = rabbit_fifo_v0:state_enter(leader, State1),
+ %% 2 effects for each consumer process (channel process), 1 effect for the node,
+ %% 1 effect for file handle reservation
+ ?assertEqual(2 * 3 + 1 + 1, length(Effects)).
+
+single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) ->
+ Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => Resource,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ Meta = #{index => 1},
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ Meta,
+ make_checkout({CTag, ChannelId},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+ {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ Effects = rabbit_fifo_v0:state_enter(eol, State1),
+ %% 1 effect for each consumer process (channel process),
+ %% 1 effect for file handle reservation
+ ?assertEqual(4, length(Effects)).
+
+query_consumers_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => false}),
+
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ #{index => 1},
+ make_checkout({CTag, self()},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ Consumers0 = State1#?STATE.consumers,
+ Consumer = maps:get({<<"ctag2">>, self()}, Consumers0),
+ Consumers1 = maps:put({<<"ctag2">>, self()},
+ Consumer#consumer{status = suspected_down}, Consumers0),
+ State2 = State1#?STATE{consumers = Consumers1},
+
+ ?assertEqual(4, rabbit_fifo_v0:query_consumer_count(State2)),
+ Consumers2 = rabbit_fifo_v0:query_consumers(State2),
+ ?assertEqual(4, maps:size(Consumers2)),
+ maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid),
+ case Tag of
+ <<"ctag2">> ->
+ ?assertNot(Active),
+ ?assertEqual(suspected_down, ActivityStatus);
+ _ ->
+ ?assert(Active),
+ ?assertEqual(up, ActivityStatus)
+ end
+ end, [], Consumers2).
+
+query_consumers_when_single_active_consumer_is_on_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ Meta = #{index => 1},
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ Meta,
+ make_checkout({CTag, self()},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+
+ ?assertEqual(4, rabbit_fifo_v0:query_consumer_count(State1)),
+ Consumers = rabbit_fifo_v0:query_consumers(State1),
+ ?assertEqual(4, maps:size(Consumers)),
+ maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid),
+ case Tag of
+ <<"ctag1">> ->
+ ?assert(Active),
+ ?assertEqual(single_active, ActivityStatus);
+ _ ->
+ ?assertNot(Active),
+ ?assertEqual(waiting, ActivityStatus)
+ end
+ end, [], Consumers).
+
+active_flag_updated_when_consumer_suspected_unsuspected_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => false}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} =
+ apply(
+ #{index => 1},
+ rabbit_fifo_v0:make_checkout({CTag, ChannelId},
+ {once, 1, simple_prefetch},
+ #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ {State2, _, Effects2} = apply(#{index => 3}, {down, Pid1, noconnection}, State1),
+ % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
+ ?assertEqual(4 + 1, length(Effects2)),
+
+ {_, _, Effects3} = apply(#{index => 4}, {nodeup, node(self())}, State2),
+ % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID
+ ?assertEqual(4 + 4, length(Effects3)).
+
+active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{index => 1},
+ make_checkout({CTag, ChannelId},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+ {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ {State2, _, Effects2} = apply(#{index => 2}, {down, Pid1, noconnection}, State1),
+ % one monitor and one consumer status update (deactivated)
+ ?assertEqual(3, length(Effects2)),
+
+ {_, _, Effects3} = apply(#{index => 3}, {nodeup, node(self())}, State2),
+ % for each consumer: 1 effect to monitor the consumer PID
+ ?assertEqual(5, length(Effects3)).
+
+single_active_cancelled_with_unacked_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ C1 = {<<"ctag1">>, self()},
+ C2 = {<<"ctag2">>, self()},
+ % adding some consumers
+ AddConsumer = fun(C, S0) ->
+ {S, _, _} = apply(
+ meta(1),
+ make_checkout(C,
+ {auto, 1, simple_prefetch},
+ #{}),
+ S0),
+ S
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [C1, C2]),
+
+ %% enqueue 2 messages
+ {State2, _Effects2} = enq(3, 1, msg1, State1),
+ {State3, _Effects3} = enq(4, 2, msg2, State2),
+ %% one should be checked ou to C1
+ %% cancel C1
+ {State4, _, _} = apply(meta(5),
+ make_checkout(C1, cancel, #{}),
+ State3),
+ %% C2 should be the active consumer
+ ?assertMatch(#{C2 := #consumer{status = up,
+ checked_out = #{0 := _}}},
+ State4#?STATE.consumers),
+ %% C1 should be a cancelled consumer
+ ?assertMatch(#{C1 := #consumer{status = cancelled,
+ lifetime = once,
+ checked_out = #{0 := _}}},
+ State4#?STATE.consumers),
+ ?assertMatch([], State4#?STATE.waiting_consumers),
+
+ %% Ack both messages
+ {State5, _Effects5} = settle(C1, 1, 0, State4),
+ %% C1 should now be cancelled
+ {State6, _Effects6} = settle(C2, 2, 0, State5),
+
+ %% C2 should remain
+ ?assertMatch(#{C2 := #consumer{status = up}},
+ State6#?STATE.consumers),
+ %% C1 should be gone
+ ?assertNotMatch(#{C1 := _},
+ State6#?STATE.consumers),
+ ?assertMatch([], State6#?STATE.waiting_consumers),
+ ok.
+
+single_active_with_credited_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ C1 = {<<"ctag1">>, self()},
+ C2 = {<<"ctag2">>, self()},
+ % adding some consumers
+ AddConsumer = fun(C, S0) ->
+ {S, _, _} = apply(
+ meta(1),
+ make_checkout(C,
+ {auto, 0, credited},
+ #{}),
+ S0),
+ S
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [C1, C2]),
+
+ %% add some credit
+ C1Cred = rabbit_fifo_v0:make_credit(C1, 5, 0, false),
+ {State2, _, _Effects2} = apply(meta(3), C1Cred, State1),
+ C2Cred = rabbit_fifo_v0:make_credit(C2, 4, 0, false),
+ {State3, _} = apply(meta(4), C2Cred, State2),
+ %% both consumers should have credit
+ ?assertMatch(#{C1 := #consumer{credit = 5}},
+ State3#?STATE.consumers),
+ ?assertMatch([{C2, #consumer{credit = 4}}],
+ State3#?STATE.waiting_consumers),
+ ok.
+
+purge_nodes_test(_) ->
+ Node = purged@node,
+ ThisNode = node(),
+ EnqPid = test_util:fake_pid(Node),
+ EnqPid2 = test_util:fake_pid(node()),
+ ConPid = test_util:fake_pid(Node),
+ Cid = {<<"tag">>, ConPid},
+ % WaitingPid = test_util:fake_pid(Node),
+
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ single_active_consumer_on => false}),
+ {State1, _, _} = apply(meta(1),
+ rabbit_fifo_v0:make_enqueue(EnqPid, 1, msg1),
+ State0),
+ {State2, _, _} = apply(meta(2),
+ rabbit_fifo_v0:make_enqueue(EnqPid2, 1, msg2),
+ State1),
+ {State3, _} = check(Cid, 3, 1000, State2),
+ {State4, _, _} = apply(meta(4),
+ {down, EnqPid, noconnection},
+ State3),
+ ?assertMatch(
+ [{mod_call, rabbit_quorum_queue, handle_tick,
+ [#resource{}, _Metrics,
+ [ThisNode, Node]
+ ]}] , rabbit_fifo_v0:tick(1, State4)),
+ %% assert there are both enqueuers and consumers
+ {State, _, _} = apply(meta(5),
+ rabbit_fifo_v0:make_purge_nodes([Node]),
+ State4),
+
+ %% assert there are no enqueuers nor consumers
+ ?assertMatch(#?STATE{enqueuers = Enqs} when map_size(Enqs) == 1, State),
+ ?assertMatch(#?STATE{consumers = Cons} when map_size(Cons) == 0, State),
+ ?assertMatch(
+ [{mod_call, rabbit_quorum_queue, handle_tick,
+ [#resource{}, _Metrics,
+ [ThisNode]
+ ]}] , rabbit_fifo_v0:tick(1, State)),
+ ok.
+
+meta(Idx) ->
+ #{index => Idx, term => 1,
+ from => {make_ref(), self()}}.
+
+enq(Idx, MsgSeq, Msg, State) ->
+ strip_reply(
+ apply(meta(Idx), rabbit_fifo_v0:make_enqueue(self(), MsgSeq, Msg), State)).
+
+deq(Idx, Cid, Settlement, State0) ->
+ {State, {dequeue, {MsgId, Msg}, _}, _} =
+ apply(meta(Idx),
+ rabbit_fifo_v0:make_checkout(Cid, {dequeue, Settlement}, #{}),
+ State0),
+ {State, {MsgId, Msg}}.
+
+check_n(Cid, Idx, N, State) ->
+ strip_reply(
+ apply(meta(Idx),
+ rabbit_fifo_v0:make_checkout(Cid, {auto, N, simple_prefetch}, #{}),
+ State)).
+
+check(Cid, Idx, State) ->
+ strip_reply(
+ apply(meta(Idx),
+ rabbit_fifo_v0:make_checkout(Cid, {once, 1, simple_prefetch}, #{}),
+ State)).
+
+check_auto(Cid, Idx, State) ->
+ strip_reply(
+ apply(meta(Idx),
+ rabbit_fifo_v0:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
+ State)).
+
+check(Cid, Idx, Num, State) ->
+ strip_reply(
+ apply(meta(Idx),
+ rabbit_fifo_v0:make_checkout(Cid, {auto, Num, simple_prefetch}, #{}),
+ State)).
+
+settle(Cid, Idx, MsgId, State) ->
+ strip_reply(apply(meta(Idx), rabbit_fifo_v0:make_settle(Cid, [MsgId]), State)).
+
+credit(Cid, Idx, Credit, DelCnt, Drain, State) ->
+ strip_reply(apply(meta(Idx), rabbit_fifo_v0:make_credit(Cid, Credit, DelCnt, Drain),
+ State)).
+
+strip_reply({State, _, Effects}) ->
+ {State, Effects}.
+
+run_log(InitState, Entries) ->
+ lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) ->
+ case apply(meta(Idx), E, Acc0) of
+ {Acc, _, Efx} when is_list(Efx) ->
+ {Acc, Efx0 ++ Efx};
+ {Acc, _, Efx} ->
+ {Acc, Efx0 ++ [Efx]};
+ {Acc, _} ->
+ {Acc, Efx0}
+ end
+ end, {InitState, []}, Entries).
+
+
+%% AUX Tests
+
+aux_test(_) ->
+ _ = ra_machine_ets:start_link(),
+ Aux0 = init_aux(aux_test),
+ MacState = init(#{name => aux_test,
+ queue_resource =>
+ rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
+ ok = meck:new(ra_log, []),
+ Log = mock_log,
+ meck:expect(ra_log, last_index_term, fun (_) -> {0, 0} end),
+ {no_reply, Aux, mock_log} = handle_aux(leader, cast, active, Aux0,
+ Log, MacState),
+ {no_reply, _Aux, mock_log} = handle_aux(leader, cast, tick, Aux,
+ Log, MacState),
+ [X] = ets:lookup(rabbit_fifo_usage, aux_test),
+ meck:unload(),
+ ?assert(X > 0.0),
+ ok.
+
+%% Utility
+
+init(Conf) -> rabbit_fifo_v0:init(Conf).
+apply(Meta, Entry, State) -> rabbit_fifo_v0:apply(Meta, Entry, State).
+init_aux(Conf) -> rabbit_fifo_v0:init_aux(Conf).
+handle_aux(S, T, C, A, L, M) -> rabbit_fifo_v0:handle_aux(S, T, C, A, L, M).
+make_checkout(C, S, M) -> rabbit_fifo_v0:make_checkout(C, S, M).
diff --git a/test/rabbitmq_queues_cli_integration_SUITE.erl b/test/rabbitmq_queues_cli_integration_SUITE.erl
index a41ee02427..bf5e9ee79e 100644
--- a/test/rabbitmq_queues_cli_integration_SUITE.erl
+++ b/test/rabbitmq_queues_cli_integration_SUITE.erl
@@ -27,8 +27,13 @@ groups() ->
].
init_per_suite(Config) ->
- rabbit_ct_helpers:log_environment(),
- rabbit_ct_helpers:run_setup_steps(Config).
+ case os:getenv("SECONDARY_UMBRELLA") of
+ false ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config);
+ _ ->
+ {skip, "growing and shrinking cannot be done in mixed mode"}
+ end.
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).