summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2019-02-26 00:57:49 +0300
committerGitHub <noreply@github.com>2019-02-26 00:57:49 +0300
commitb9873465666d143bd1fc70a828d417ce48b5b1c3 (patch)
tree2b1c7da2b56dc86fbc5a267cfb9dd309c4fbea56
parent20d3d6a11dd4836a904cdcb2c7f27b26e2db9611 (diff)
parent4fd34a4a05e926a016bf31a1f2e496aa6a172d13 (diff)
downloadrabbitmq-server-git-b9873465666d143bd1fc70a828d417ce48b5b1c3.tar.gz
Merge pull request #1889 from rabbitmq/poison-handling-qq
Poison handling in quorum queues
-rw-r--r--src/rabbit_dead_letter.erl2
-rw-r--r--src/rabbit_fifo.erl1883
-rw-r--r--src/rabbit_fifo.hrl170
-rw-r--r--src/rabbit_policies.erl16
-rw-r--r--src/rabbit_quorum_queue.erl9
-rw-r--r--test/quorum_queue_SUITE.erl155
-rw-r--r--test/rabbit_fifo_SUITE.erl1617
-rw-r--r--test/rabbit_fifo_int_SUITE.erl640
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl104
9 files changed, 2383 insertions, 2213 deletions
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index e26ea8297b..7b8cffb6fa 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -23,7 +23,7 @@
%%----------------------------------------------------------------------------
--type reason() :: 'expired' | 'rejected' | 'maxlen'.
+-type reason() :: 'expired' | 'rejected' | 'maxlen' | delivery_limit.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index af3441df86..b06d34e83a 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -23,6 +23,7 @@
-compile({no_auto_import, [apply/3]}).
-include_lib("ra/include/ra.hrl").
+-include("rabbit_fifo.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-export([
@@ -64,65 +65,6 @@
make_update_config/1
]).
--type raw_msg() :: term().
-%% The raw message. It is opaque to rabbit_fifo.
-
--type msg_in_id() :: non_neg_integer().
-% a queue scoped monotonically incrementing integer used to enforce order
-% in the unassigned messages map
-
--type msg_id() :: non_neg_integer().
-%% A consumer-scoped monotonically incrementing integer included with a
-%% {@link delivery/0.}. Used to settle deliveries using
-%% {@link rabbit_fifo_client:settle/3.}
-
--type msg_seqno() :: non_neg_integer().
-%% A sender process scoped monotonically incrementing integer included
-%% in enqueue messages. Used to ensure ordering of messages send from the
-%% same process
-
--type msg_header() :: #{delivery_count => non_neg_integer()}.
-%% The message header map:
-%% delivery_count: the number of unsuccessful delivery attempts.
-%% A non-zero value indicates a previous attempt.
-
--type msg() :: {msg_header(), raw_msg()}.
-%% message with a header map.
-
--type msg_size() :: non_neg_integer().
-%% the size in bytes of the msg payload
-
--type indexed_msg() :: {ra_index(), msg()}.
-
--type prefix_msg() :: {'$prefix_msg', msg_size()}.
-
--type delivery_msg() :: {msg_id(), msg()}.
-%% A tuple consisting of the message id and the headered message.
-
--type consumer_tag() :: binary().
-%% An arbitrary binary tag used to distinguish between different consumers
-%% set up by the same process. See: {@link rabbit_fifo_client:checkout/3.}
-
--type delivery() :: {delivery, consumer_tag(), [delivery_msg()]}.
-%% Represents the delivery of one or more rabbit_fifo messages.
-
--type consumer_id() :: {consumer_tag(), pid()}.
-%% The entity that receives messages. Uniquely identifies a consumer.
-
--type credit_mode() :: simple_prefetch | credited.
-%% determines how credit is replenished
-
--type checkout_spec() :: {once | auto, Num :: non_neg_integer(),
- credit_mode()} |
- {dequeue, settled | unsettled} |
- cancel.
-
--type consumer_meta() :: #{ack => boolean(),
- username => binary(),
- prefetch => non_neg_integer(),
- args => list()}.
-%% static meta data associated with a consumer
-
%% command records representing all the protocol actions that are supported
-record(enqueue, {pid :: maybe(pid()),
seq :: maybe(msg_seqno()),
@@ -143,8 +85,6 @@
-record(purge, {}).
-record(update_config, {config :: config()}).
-
-
-opaque protocol() ::
#enqueue{} |
#checkout{} |
@@ -154,117 +94,13 @@
#credit{} |
#purge{} |
#update_config{}.
-
-type command() :: protocol() | ra_machine:builtin_command().
%% all the command types supported by ra fifo
-type client_msg() :: delivery().
%% the messages `rabbit_fifo' can send to consumers.
--type applied_mfa() :: {module(), atom(), list()}.
-% represents a partially applied module call
-
--define(RELEASE_CURSOR_EVERY, 64000).
--define(USE_AVG_HALF_LIFE, 10000.0).
-
--record(consumer,
- {meta = #{} :: consumer_meta(),
- checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}},
- next_msg_id = 0 :: msg_id(), % part of snapshot data
- %% max number of messages that can be sent
- %% decremented for each delivery
- credit = 0 :: non_neg_integer(),
- %% total number of checked out messages - ever
- %% incremented for each delivery
- delivery_count = 0 :: non_neg_integer(),
- %% the mode of how credit is incremented
- %% simple_prefetch: credit is re-filled as deliveries are settled
- %% or returned.
- %% credited: credit can only be changed by receiving a consumer_credit
- %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
- credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
- lifetime = once :: once | auto,
- status = up :: up | suspected_down | cancelled
- }).
-
--type consumer() :: #consumer{}.
-
--record(enqueuer,
- {next_seqno = 1 :: msg_seqno(),
- % out of order enqueues - sorted list
- pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}],
- status = up :: up | suspected_down
- }).
-
--record(state,
- {name :: atom(),
- queue_resource :: rabbit_types:r('queue'),
- release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(),
- % unassigned messages
- messages = #{} :: #{msg_in_id() => indexed_msg()},
- % defines the lowest message in id available in the messages map
- % that isn't a return
- low_msg_num :: msg_in_id() | undefined,
- % defines the next message in id to be added to the messages map
- next_msg_num = 1 :: msg_in_id(),
- % list of returned msg_in_ids - when checking out it picks from
- % this list first before taking low_msg_num
- returns = lqueue:new() :: lqueue:lqueue(prefix_msg() |
- {msg_in_id(), indexed_msg()}),
- % a counter of enqueues - used to trigger shadow copy points
- enqueue_count = 0 :: non_neg_integer(),
- % a map containing all the live processes that have ever enqueued
- % a message to this queue as well as a cached value of the smallest
- % ra_index of all pending enqueues
- enqueuers = #{} :: #{pid() => #enqueuer{}},
- % master index of all enqueue raft indexes including pending
- % enqueues
- % rabbit_fifo_index can be slow when calculating the smallest
- % index when there are large gaps but should be faster than gb_trees
- % for normal appending operations as it's backed by a map
- ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
- release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor,
- ra_index(), state()}),
- % consumers need to reflect consumer state at time of snapshot
- % needs to be part of snapshot
- consumers = #{} :: #{consumer_id() => #consumer{}},
- % consumers that require further service are queued here
- % needs to be part of snapshot
- service_queue = queue:new() :: queue:queue(consumer_id()),
- dead_letter_handler :: maybe(applied_mfa()),
- become_leader_handler :: maybe(applied_mfa()),
- %% This is a special field that is only used for snapshots
- %% It represents the queued messages at the time the
- %% dehydrated snapshot state was cached.
- %% As release_cursors are only emitted for raft indexes where all
- %% prior messages no longer contribute to the current state we can
- %% replace all message payloads with their sizes (to be used for
- %% overflow calculations).
- %% This is done so that consumers are still served in a deterministic
- %% order on recovery.
- prefix_msgs = {[], []} :: {Return :: [msg_size()],
- PrefixMsgs :: [msg_size()]},
- msg_bytes_enqueue = 0 :: non_neg_integer(),
- msg_bytes_checkout = 0 :: non_neg_integer(),
- max_length :: maybe(non_neg_integer()),
- max_bytes :: maybe(non_neg_integer()),
- %% whether single active consumer is on or not for this queue
- consumer_strategy = default :: default | single_active,
- %% waiting consumers, one is picked active consumer is cancelled or dies
- %% used only when single active consumer is on
- waiting_consumers = [] :: [{consumer_id(), consumer()}]
- }).
-
--opaque state() :: #state{}.
-
--type config() :: #{name := atom(),
- queue_resource := rabbit_types:r('queue'),
- dead_letter_handler => applied_mfa(),
- become_leader_handler => applied_mfa(),
- release_cursor_interval => non_neg_integer(),
- max_length => non_neg_integer(),
- max_bytes => non_neg_integer(),
- single_active_consumer_on => boolean()}.
+-opaque state() :: #?MODULE{}.
-export_type([protocol/0,
delivery/0,
@@ -284,8 +120,8 @@
-spec init(config()) -> state().
init(#{name := Name,
queue_resource := Resource} = Conf) ->
- update_config(Conf, #state{name = Name,
- queue_resource = Resource}).
+ update_config(Conf, #?MODULE{cfg = #cfg{name = Name,
+ resource = Resource}}).
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
@@ -293,18 +129,21 @@ update_config(Conf, State) ->
SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
MaxLength = maps:get(max_length, Conf, undefined),
MaxBytes = maps:get(max_bytes, Conf, undefined),
+ DeliveryLimit = maps:get(delivery_limit, Conf, undefined),
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
true ->
single_active;
false ->
default
end,
- State#state{dead_letter_handler = DLH,
- become_leader_handler = BLH,
- release_cursor_interval = SHI,
- max_length = MaxLength,
- max_bytes = MaxBytes,
- consumer_strategy = ConsumerStrategy}.
+ Cfg = State#?MODULE.cfg,
+ State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI,
+ dead_letter_handler = DLH,
+ become_leader_handler = BLH,
+ max_length = MaxLength,
+ max_bytes = MaxBytes,
+ consumer_strategy = ConsumerStrategy,
+ delivery_limit = DeliveryLimit}}.
zero(_) ->
0.
@@ -319,7 +158,7 @@ apply(Metadata, #enqueue{pid = From, seq = Seq,
apply_enqueue(Metadata, From, Seq, RawMsg, State00);
apply(Meta,
#settle{msg_ids = MsgIds, consumer_id = ConsumerId},
- #state{consumers = Cons0} = State) ->
+ #?MODULE{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0} ->
% need to increment metrics before completing as any snapshot
@@ -331,7 +170,7 @@ apply(Meta,
end;
apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
- #state{consumers = Cons0} = State0) ->
+ #?MODULE{consumers = Cons0} = State0) ->
case Cons0 of
#{ConsumerId := Con0} ->
Discarded = maps:with(MsgIds, Con0#consumer.checked_out),
@@ -342,7 +181,7 @@ apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
{State0, ok}
end;
apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
- #state{consumers = Cons0} = State) ->
+ #?MODULE{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
Checked = maps:without(MsgIds, Checked0),
@@ -354,7 +193,7 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
end;
apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
drain = Drain, consumer_id = ConsumerId},
- #state{consumers = Cons0,
+ #?MODULE{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
case Cons0 of
#{ConsumerId := #consumer{delivery_count = DelCnt} = Con0} ->
@@ -366,9 +205,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
ServiceQueue0),
Cons = maps:put(ConsumerId, Con1, Cons0),
{State1, ok, Effects} =
- checkout(Meta, State0#state{service_queue = ServiceQueue,
- consumers = Cons}, []),
- Response = {send_credit_reply, maps:size(State1#state.messages)},
+ checkout(Meta, State0#?MODULE{service_queue = ServiceQueue,
+ consumers = Cons}, []),
+ Response = {send_credit_reply, maps:size(State1#?MODULE.messages)},
%% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain
case Drain of
@@ -377,16 +216,16 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{State1, Response, Effects};
true ->
Con = #consumer{credit = PostCred} =
- maps:get(ConsumerId, State1#state.consumers),
+ maps:get(ConsumerId, State1#?MODULE.consumers),
%% add the outstanding credit to the delivery count
DeliveryCount = Con#consumer.delivery_count + PostCred,
Consumers = maps:put(ConsumerId,
Con#consumer{delivery_count = DeliveryCount,
credit = 0},
- State1#state.consumers),
+ State1#?MODULE.consumers),
Drained = Con#consumer.credit,
{CTag, _} = ConsumerId,
- {State1#state{consumers = Consumers},
+ {State1#?MODULE{consumers = Consumers},
%% returning a multi response with two client actions
%% for the channel to execute
{multi, [Response, {send_drained, [{CTag, Drained}]}]},
@@ -399,7 +238,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
apply(Meta, #checkout{spec = {dequeue, Settlement},
meta = ConsumerMeta,
consumer_id = ConsumerId},
- #state{consumers = Consumers} = State0) ->
+ #?MODULE{consumers = Consumers} = State0) ->
Exists = maps:is_key(ConsumerId, Consumers),
case messages_ready(State0) of
0 ->
@@ -434,9 +273,9 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
checkout(Meta, State1, [{monitor, process, Pid}]);
apply(#{index := RaftIdx}, #purge{},
- #state{ra_indexes = Indexes0,
- returns = Returns,
- messages = Messages} = State0) ->
+ #?MODULE{ra_indexes = Indexes0,
+ returns = Returns,
+ messages = Messages} = State0) ->
Total = messages_ready(State0),
Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
[I || {I, _} <- lists:sort(maps:values(Messages))]),
@@ -444,20 +283,20 @@ apply(#{index := RaftIdx}, #purge{},
[I || {_, {I, _}} <- lqueue:to_list(Returns)]),
{State, _, Effects} =
update_smallest_raft_index(RaftIdx,
- State0#state{ra_indexes = Indexes,
- messages = #{},
- returns = lqueue:new(),
- msg_bytes_enqueue = 0,
- prefix_msgs = {[], []},
- low_msg_num = undefined},
+ State0#?MODULE{ra_indexes = Indexes,
+ messages = #{},
+ returns = lqueue:new(),
+ msg_bytes_enqueue = 0,
+ prefix_msgs = {[], []},
+ low_msg_num = undefined},
[]),
%% as we're not checking out after a purge (no point) we have to
%% reverse the effects ourselves
{State, {purge, Total},
lists:reverse([garbage_collection | Effects])};
apply(_, {down, ConsumerPid, noconnection},
- #state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
Node = node(ConsumerPid),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
% mark all consumers and enqueuers as suspected down
@@ -468,10 +307,10 @@ apply(_, {down, ConsumerPid, noconnection},
#consumer{checked_out = Checked0} = C,
{Co, St0, Eff}) when (node(P) =:= Node) and
(C#consumer.status =/= cancelled)->
- St = return_all(St0, Checked0),
+ {St, Eff0} = return_all(St0, Checked0, Eff, K, C),
Credit = increase_credit(C, maps:size(Checked0)),
Eff1 = ConsumerUpdateActiveFun(St, K, C, false,
- suspected_down, Eff),
+ suspected_down, Eff0),
{maps:put(K,
C#consumer{status = suspected_down,
credit = Credit,
@@ -495,17 +334,17 @@ apply(_, {down, ConsumerPid, noconnection},
[{monitor, node, Node}]
end ++ Effects1,
%% TODO: should we run a checkout here?
- {State#state{consumers = Cons, enqueuers = Enqs,
- waiting_consumers = WaitingConsumers}, ok, Effects2};
-apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ {State#?MODULE{consumers = Cons, enqueuers = Enqs,
+ waiting_consumers = WaitingConsumers}, ok, Effects2};
+apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
State1 = case maps:take(Pid, Enqs0) of
{#enqueuer{pending = Pend}, Enqs} ->
lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
enqueue(RIdx, RawMsg, S)
- end, State0#state{enqueuers = Enqs}, Pend);
+ end, State0#?MODULE{enqueuers = Enqs}, Pend);
error ->
State0
end,
@@ -518,9 +357,9 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
cancel_consumer(ConsumerId, S, E, down)
end, {State2, Effects1}, DownConsumers),
checkout(Meta, State, Effects);
-apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
- enqueuers = Enqs0,
- service_queue = SQ0} = State0) ->
+apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -549,34 +388,34 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
Acc
end, {Cons0, SQ0, Monitors}, Cons0),
- checkout(Meta, State0#state{consumers = Cons1, enqueuers = Enqs1,
- service_queue = SQ,
- waiting_consumers = WaitingConsumers}, Effects);
+ checkout(Meta, State0#?MODULE{consumers = Cons1, enqueuers = Enqs1,
+ service_queue = SQ,
+ waiting_consumers = WaitingConsumers}, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
apply(Meta, #update_config{config = Conf}, State) ->
checkout(Meta, update_config(Conf, State), []).
-consumer_active_flag_update_function(#state{consumer_strategy = default}) ->
+consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = default}}) ->
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
consumer_update_active_effects(State, ConsumerId, Consumer, Active,
ActivityStatus, Effects)
end;
-consumer_active_flag_update_function(#state{consumer_strategy = single_active}) ->
+consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = single_active}}) ->
fun(_, _, _, _, _, Effects) ->
Effects
end.
handle_waiting_consumer_down(_Pid,
- #state{consumer_strategy = default} = State) ->
+ #?MODULE{cfg = #cfg{consumer_strategy = default}} = State) ->
{[], State};
handle_waiting_consumer_down(_Pid,
- #state{consumer_strategy = single_active,
- waiting_consumers = []} = State) ->
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = []} = State) ->
{[], State};
handle_waiting_consumer_down(Pid,
- #state{consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers0} = State0) ->
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = WaitingConsumers0} = State0) ->
% get cancel effects for down waiting consumers
Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end,
WaitingConsumers0),
@@ -587,20 +426,20 @@ handle_waiting_consumer_down(Pid,
% update state to have only up waiting consumers
StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end,
WaitingConsumers0),
- State = State0#state{waiting_consumers = StillUp},
+ State = State0#?MODULE{waiting_consumers = StillUp},
{Effects, State}.
-update_waiting_consumer_status(_Node, #state{consumer_strategy = default},
+update_waiting_consumer_status(_Node, #?MODULE{cfg = #cfg{consumer_strategy = default}},
_Status) ->
[];
update_waiting_consumer_status(_Node,
- #state{consumer_strategy = single_active,
- waiting_consumers = []},
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = []},
_Status) ->
[];
update_waiting_consumer_status(Node,
- #state{consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers},
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = WaitingConsumers},
Status) ->
[begin
case node(P) of
@@ -613,12 +452,13 @@ update_waiting_consumer_status(Node,
Consumer#consumer.status =/= cancelled].
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
-state_enter(leader, #state{consumers = Cons,
- enqueuers = Enqs,
- waiting_consumers = WaitingConsumers,
- name = Name,
- prefix_msgs = {[], []},
- become_leader_handler = BLH}) ->
+state_enter(leader, #?MODULE{consumers = Cons,
+ enqueuers = Enqs,
+ waiting_consumers = WaitingConsumers,
+ cfg = #cfg{name = Name,
+ become_leader_handler = BLH},
+ prefix_msgs = {[], []}
+ }) ->
% return effects to monitor all current consumers and enqueuers
Pids = lists:usort(maps:keys(Enqs)
++ [P || {_, P} <- maps:keys(Cons)]
@@ -633,13 +473,13 @@ state_enter(leader, #state{consumers = Cons,
{Mod, Fun, Args} ->
[{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
end;
-state_enter(recovered, #state{prefix_msgs = PrefixMsgCounts})
+state_enter(recovered, #?MODULE{prefix_msgs = PrefixMsgCounts})
when PrefixMsgCounts =/= {[], []} ->
%% TODO: remove assertion?
exit({rabbit_fifo, unexpected_prefix_msgs, PrefixMsgCounts});
-state_enter(eol, #state{enqueuers = Enqs,
- consumers = Custs0,
- waiting_consumers = WaitingConsumers0}) ->
+state_enter(eol, #?MODULE{enqueuers = Enqs,
+ consumers = Custs0,
+ waiting_consumers = WaitingConsumers0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end,
#{}, WaitingConsumers0),
@@ -652,10 +492,10 @@ state_enter(_, _) ->
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
-tick(_Ts, #state{name = Name,
- queue_resource = QName,
- msg_bytes_enqueue = EnqueueBytes,
- msg_bytes_checkout = CheckoutBytes} = State) ->
+tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
+ resource = QName},
+ msg_bytes_enqueue = EnqueueBytes,
+ msg_bytes_checkout = CheckoutBytes} = State) ->
Metrics = {Name,
messages_ready(State),
num_checked_out(State), % checked out
@@ -667,11 +507,11 @@ tick(_Ts, #state{name = Name,
handle_tick, [QName, Metrics]}, {aux, emit}].
-spec overview(state()) -> map().
-overview(#state{consumers = Cons,
- enqueuers = Enqs,
- release_cursors = Cursors,
- msg_bytes_enqueue = EnqueueBytes,
- msg_bytes_checkout = CheckoutBytes} = State) ->
+overview(#?MODULE{consumers = Cons,
+ enqueuers = Enqs,
+ release_cursors = Cursors,
+ msg_bytes_enqueue = EnqueueBytes,
+ msg_bytes_checkout = CheckoutBytes} = State) ->
#{type => ?MODULE,
num_consumers => maps:size(Cons),
num_checked_out => num_checked_out(State),
@@ -684,7 +524,7 @@ overview(#state{consumers = Cons,
-spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) ->
[delivery_msg()].
-get_checked_out(Cid, From, To, #state{consumers = Consumers}) ->
+get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) ->
case Consumers of
#{Cid := #consumer{checked_out = Checked}} ->
[{K, snd(snd(maps:get(K, Checked)))}
@@ -718,7 +558,7 @@ handle_aux(_, cast, Cmd, {Name, Use0}, Log, _) ->
query_messages_ready(State) ->
messages_ready(State).
-query_messages_checked_out(#state{consumers = Consumers}) ->
+query_messages_checked_out(#?MODULE{consumers = Consumers}) ->
maps:fold(fun (_, #consumer{checked_out = C}, S) ->
maps:size(C) + S
end, 0, Consumers).
@@ -726,21 +566,21 @@ query_messages_checked_out(#state{consumers = Consumers}) ->
query_messages_total(State) ->
messages_total(State).
-query_processes(#state{enqueuers = Enqs, consumers = Cons0}) ->
+query_processes(#?MODULE{enqueuers = Enqs, consumers = Cons0}) ->
Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0),
maps:keys(maps:merge(Enqs, Cons)).
-query_ra_indexes(#state{ra_indexes = RaIndexes}) ->
+query_ra_indexes(#?MODULE{ra_indexes = RaIndexes}) ->
RaIndexes.
-query_consumer_count(#state{consumers = Consumers,
- waiting_consumers = WaitingConsumers}) ->
+query_consumer_count(#?MODULE{consumers = Consumers,
+ waiting_consumers = WaitingConsumers}) ->
maps:size(Consumers) + length(WaitingConsumers).
-query_consumers(#state{consumers = Consumers,
- waiting_consumers = WaitingConsumers,
- consumer_strategy = ConsumerStrategy } = State) ->
+query_consumers(#?MODULE{consumers = Consumers,
+ waiting_consumers = WaitingConsumers,
+ cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) ->
ActiveActivityStatusFun =
case ConsumerStrategy of
default ->
@@ -798,8 +638,8 @@ query_consumers(#state{consumers = Consumers,
end, #{}, WaitingConsumers),
maps:merge(FromConsumers, FromWaitingConsumers).
-query_single_active_consumer(#state{consumer_strategy = single_active,
- consumers = Consumers}) ->
+query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ consumers = Consumers}) ->
case maps:size(Consumers) of
0 ->
{error, no_value};
@@ -812,7 +652,7 @@ query_single_active_consumer(#state{consumer_strategy = single_active,
query_single_active_consumer(_) ->
disabled.
-query_stat(#state{consumers = Consumers} = State) ->
+query_stat(#?MODULE{consumers = Consumers} = State) ->
{messages_ready(State), maps:size(Consumers)}.
-spec usage(atom()) -> float().
@@ -824,15 +664,15 @@ usage(Name) when is_atom(Name) ->
%%% Internal
-messages_ready(#state{messages = M,
- prefix_msgs = {PreR, PreM},
- returns = R}) ->
+messages_ready(#?MODULE{messages = M,
+ prefix_msgs = {PreR, PreM},
+ returns = R}) ->
%% TODO: optimise to avoid length/1 call
maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM).
-messages_total(#state{ra_indexes = I,
- prefix_msgs = {PreR, PreM}}) ->
+messages_total(#?MODULE{ra_indexes = I,
+ prefix_msgs = {PreR, PreM}}) ->
rabbit_fifo_index:size(I) + length(PreR) + length(PreM).
update_use({inactive, _, _, _} = CUInfo, inactive) ->
@@ -863,25 +703,25 @@ moving_average(Time, HalfLife, Next, Current) ->
Weight = math:exp(Time * math:log(0.5) / HalfLife),
Next * (1 - Weight) + Current * Weight.
-num_checked_out(#state{consumers = Cons}) ->
+num_checked_out(#?MODULE{consumers = Cons}) ->
lists:foldl(fun (#consumer{checked_out = C}, Acc) ->
maps:size(C) + Acc
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
- #state{consumer_strategy = default} = State, Effects, Reason) ->
+ #?MODULE{cfg = #cfg{consumer_strategy = default}} = State, Effects, Reason) ->
%% general case, single active consumer off
cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
- #state{consumer_strategy = single_active,
- waiting_consumers = []} = State,
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = []} = State,
Effects, Reason) ->
%% single active consumer on, no consumers are waiting
cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
- #state{consumers = Cons0,
- consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers0} = State0,
+ #?MODULE{consumers = Cons0,
+ cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = WaitingConsumers0} = State0,
Effects0, Reason) ->
%% single active consumer on, consumers are waiting
case maps:take(ConsumerId, Cons0) of
@@ -894,18 +734,18 @@ cancel_consumer(ConsumerId,
% Take another one from the waiting consumers and put it in consumers
[{NewActiveConsumerId, NewActiveConsumer}
| RemainingWaitingConsumers] = WaitingConsumers0,
- #state{service_queue = ServiceQueue} = State1,
+ #?MODULE{service_queue = ServiceQueue} = State1,
ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId,
NewActiveConsumer,
ServiceQueue),
- State = State1#state{consumers = maps:put(NewActiveConsumerId,
- NewActiveConsumer,
- State1#state.consumers),
- service_queue = ServiceQueue1,
- waiting_consumers = RemainingWaitingConsumers},
+ State = State1#?MODULE{consumers = maps:put(NewActiveConsumerId,
+ NewActiveConsumer,
+ State1#?MODULE.consumers),
+ service_queue = ServiceQueue1,
+ waiting_consumers = RemainingWaitingConsumers},
Effects = consumer_update_active_effects(State, NewActiveConsumerId,
- NewActiveConsumer, true,
- single_active, Effects2),
+ NewActiveConsumer, true,
+ single_active, Effects2),
{State, Effects};
error ->
% The cancelled consumer is not the active one
@@ -915,10 +755,10 @@ cancel_consumer(ConsumerId,
Effects = cancel_consumer_effects(ConsumerId, State0, Effects0),
% A waiting consumer isn't supposed to have any checked out messages,
% so nothing special to do here
- {State0#state{waiting_consumers = WaitingConsumers}, Effects}
+ {State0#?MODULE{waiting_consumers = WaitingConsumers}, Effects}
end.
-consumer_update_active_effects(#state{queue_resource = QName },
+consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
ConsumerId, #consumer{meta = Meta},
Active, ActivityStatus,
Effects) ->
@@ -931,13 +771,13 @@ consumer_update_active_effects(#state{queue_resource = QName },
[QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
| Effects].
-cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) ->
+cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
case maps:take(ConsumerId, C0) of
{Consumer, Cons1} ->
{S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0,
Effects0, Reason),
Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
- case maps:size(S#state.consumers) of
+ case maps:size(S#?MODULE.consumers) of
0 ->
{S, [{aux, inactive} | Effects]};
_ ->
@@ -948,9 +788,10 @@ cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) ->
{S0, Effects0}
end.
-maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1,
- #state{consumers = C0,
- service_queue = SQ0} = S0, Effects0, Reason) ->
+maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer,
+ Cons1, #?MODULE{consumers = C0,
+ service_queue = SQ0} = S0,
+ Effects0, Reason) ->
case Reason of
consumer_cancel ->
{Cons, SQ, Effects1} =
@@ -959,69 +800,71 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1
credit = 0,
status = cancelled},
C0, SQ0, Effects0),
- {S0#state{consumers = Cons, service_queue = SQ}, Effects1};
+ {S0#?MODULE{consumers = Cons, service_queue = SQ}, Effects1};
down ->
- S1 = return_all(S0, Checked0),
- {S1#state{consumers = Cons1}, Effects0}
+ {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId,
+ Consumer),
+ {S1#?MODULE{consumers = Cons1}, Effects1}
end.
apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
- Bytes = message_size(RawMsg),
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
{ok, State1, Effects1} ->
- State2 = append_to_master_index(RaftIdx,
- add_bytes_enqueue(Bytes, State1)),
+ State2 = append_to_master_index(RaftIdx, State1),
{State, ok, Effects} = checkout(Meta, State2, Effects1),
{maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
{duplicate, State, Effects} ->
{State, ok, Effects}
end.
-drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) ->
+drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
case take_next_msg(State0) of
{FullMsg = {_MsgId, {RaftIdxToDrop, {_Header, Msg}}},
State1} ->
Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
Bytes = message_size(Msg),
- State = add_bytes_drop(Bytes, State1#state{ra_indexes = Indexes}),
- Effects = dead_letter_effects(maxlen, maps:put(none, FullMsg, #{}),
+ State = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}),
+ Effects = dead_letter_effects(maxlen, #{none => FullMsg},
State, Effects0),
{State, Effects};
- {{'$prefix_msg', Bytes}, State1} ->
+ {{'$prefix_msg', #{size := Bytes}}, State1} ->
State = add_bytes_drop(Bytes, State1),
{State, Effects0};
empty ->
{State0, Effects0}
end.
-enqueue(RaftIdx, RawMsg, #state{messages = Messages,
+enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
- Msg = {RaftIdx, {#{}, RawMsg}}, % indexed message with header map
- State0#state{messages = Messages#{NextMsgNum => Msg},
- % this is probably only done to record it when low_msg_num
- % is undefined
- low_msg_num = min(LowMsgNum, NextMsgNum),
- next_msg_num = NextMsgNum + 1}.
+ Size = message_size(RawMsg),
+ Msg = {RaftIdx, {#{size => Size}, RawMsg}}, % indexed message with header map
+ State = add_bytes_enqueue(Size, State0),
+ State#?MODULE{messages = Messages#{NextMsgNum => Msg},
+ % this is probably only done to record it when low_msg_num
+ % is undefined
+ low_msg_num = min(LowMsgNum, NextMsgNum),
+ next_msg_num = NextMsgNum + 1}.
append_to_master_index(RaftIdx,
- #state{ra_indexes = Indexes0} = State0) ->
+ #?MODULE{ra_indexes = Indexes0} = State0) ->
State = incr_enqueue_count(State0),
Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0),
- State#state{ra_indexes = Indexes}.
+ State#?MODULE{ra_indexes = Indexes}.
-incr_enqueue_count(#state{enqueue_count = C,
- release_cursor_interval = C} = State0) ->
+
+incr_enqueue_count(#?MODULE{enqueue_count = C,
+ cfg = #cfg{release_cursor_interval = C}} = State0) ->
% this will trigger a dehydrated version of the state to be stored
% at this raft index for potential future snapshot generation
- State0#state{enqueue_count = 0};
-incr_enqueue_count(#state{enqueue_count = C} = State) ->
- State#state{enqueue_count = C + 1}.
+ State0#?MODULE{enqueue_count = 0};
+incr_enqueue_count(#?MODULE{enqueue_count = C} = State) ->
+ State#?MODULE{enqueue_count = C + 1}.
maybe_store_dehydrated_state(RaftIdx,
- #state{ra_indexes = Indexes,
- enqueue_count = 0,
- release_cursors = Cursors} = State) ->
+ #?MODULE{ra_indexes = Indexes,
+ enqueue_count = 0,
+ release_cursors = Cursors} = State) ->
case rabbit_fifo_index:exists(RaftIdx, Indexes) of
false ->
%% the incoming enqueue must already have been dropped
@@ -1029,7 +872,7 @@ maybe_store_dehydrated_state(RaftIdx,
true ->
Dehydrated = dehydrate_state(State),
Cursor = {release_cursor, RaftIdx, Dehydrated},
- State#state{release_cursors = lqueue:in(Cursor, Cursors)}
+ State#?MODULE{release_cursors = lqueue:in(Cursor, Cursors)}
end;
maybe_store_dehydrated_state(_RaftIdx, State) ->
State.
@@ -1041,18 +884,18 @@ enqueue_pending(From,
State = enqueue(RaftIdx, RawMsg, State0),
Enq = Enq0#enqueuer{next_seqno = Next + 1, pending = Pending},
enqueue_pending(From, Enq, State);
-enqueue_pending(From, Enq, #state{enqueuers = Enqueuers0} = State) ->
- State#state{enqueuers = Enqueuers0#{From => Enq}}.
+enqueue_pending(From, Enq, #?MODULE{enqueuers = Enqueuers0} = State) ->
+ State#?MODULE{enqueuers = Enqueuers0#{From => Enq}}.
maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) ->
% direct enqueue without tracking
State = enqueue(RaftIdx, RawMsg, State0),
{ok, State, Effects};
maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
- #state{enqueuers = Enqueuers0} = State0) ->
+ #?MODULE{enqueuers = Enqueuers0} = State0) ->
case maps:get(From, Enqueuers0, undefined) of
undefined ->
- State1 = State0#state{enqueuers = Enqueuers0#{From => #enqueuer{}}},
+ State1 = State0#?MODULE{enqueuers = Enqueuers0#{From => #enqueuer{}}},
{ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo,
RawMsg, Effects0, State1),
{ok, State, [{monitor, process, From} | Effects]};
@@ -1068,7 +911,7 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
% out of order delivery
Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0],
Enq = Enq0#enqueuer{pending = lists:sort(Pending)},
- {ok, State0#state{enqueuers = Enqueuers0#{From => Enq}}, Effects0};
+ {ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq}}, Effects0};
#enqueuer{next_seqno = Next} when MsgSeqNo =< Next ->
% duplicate delivery - remove the raft index from the ra_indexes
% map as it was added earlier
@@ -1079,25 +922,28 @@ snd(T) ->
element(2, T).
return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
- Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) ->
+ Effects0, #?MODULE{consumers = Cons0, service_queue = SQ0} = State0) ->
Con = Con0#consumer{checked_out = Checked,
credit = increase_credit(Con0, length(MsgNumMsgs))},
- {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
+ {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- State1 = lists:foldl(fun({'$prefix_msg', _} = Msg, S0) ->
- return_one(0, Msg, S0);
- ({MsgNum, Msg}, S0) ->
- return_one(MsgNum, Msg, S0)
- end, State0, MsgNumMsgs),
- checkout(Meta, State1#state{consumers = Cons,
- service_queue = SQ},
- Effects).
+ {State1, Effects2} = lists:foldl(
+ fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
+ return_one(0, Msg, S0, E0,
+ ConsumerId, Con);
+ ({MsgNum, Msg}, {S0, E0}) ->
+ return_one(MsgNum, Msg, S0, E0,
+ ConsumerId, Con)
+ end, {State0, Effects1}, MsgNumMsgs),
+ checkout(Meta, State1#?MODULE{consumers = Cons,
+ service_queue = SQ},
+ Effects2).
% used to process messages that are finished
complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
Con0, Checked, Effects0,
- #state{consumers = Cons0, service_queue = SQ0,
- ra_indexes = Indexes0} = State0) ->
+ #?MODULE{consumers = Cons0, service_queue = SQ0,
+ ra_indexes = Indexes0} = State0) ->
%% credit_mode = simple_prefetch should automatically top-up credit
%% as messages are simple_prefetch or otherwise returned
Con = Con0#consumer{checked_out = Checked,
@@ -1106,9 +952,9 @@ complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
SQ0, Effects0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
MsgRaftIdxs),
- {State0#state{consumers = Cons,
- ra_indexes = Indexes,
- service_queue = SQ}, Effects}.
+ {State0#?MODULE{consumers = Cons,
+ ra_indexes = Indexes,
+ service_queue = SQ}, Effects}.
increase_credit(#consumer{lifetime = once,
credit = Credit}, _) ->
@@ -1143,43 +989,44 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Reason, _Discarded,
- #state{dead_letter_handler = undefined},
+ #?MODULE{cfg = #cfg{dead_letter_handler = undefined}},
Effects) ->
Effects;
dead_letter_effects(Reason, Discarded,
- #state{dead_letter_handler = {Mod, Fun, Args}}, Effects) ->
- DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}},
- % MsgId, MsgIdID, RaftId, Header
- Acc) -> [{Reason, Msg} | Acc]
+ #?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}},
+ Effects) ->
+ DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) ->
+ [{Reason, Msg} | Acc]
end, [], Discarded),
[{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects].
-cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) ->
+cancel_consumer_effects(ConsumerId,
+ #?MODULE{cfg = #cfg{resource = QName}}, Effects) ->
[{mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [QName, ConsumerId]} | Effects].
update_smallest_raft_index(IncomingRaftIdx,
- #state{ra_indexes = Indexes,
- release_cursors = Cursors0} = State0,
+ #?MODULE{ra_indexes = Indexes,
+ release_cursors = Cursors0} = State0,
Effects) ->
case rabbit_fifo_index:size(Indexes) of
0 ->
% there are no messages on queue anymore and no pending enqueues
% we can forward release_cursor all the way until
% the last received command, hooray
- State = State0#state{release_cursors = lqueue:new()},
+ State = State0#?MODULE{release_cursors = lqueue:new()},
{State, ok,
[{release_cursor, IncomingRaftIdx, State} | Effects]};
_ ->
Smallest = rabbit_fifo_index:smallest(Indexes),
case find_next_cursor(Smallest, Cursors0) of
{empty, Cursors} ->
- {State0#state{release_cursors = Cursors},
+ {State0#?MODULE{release_cursors = Cursors},
ok, Effects};
{Cursor, Cursors} ->
%% we can emit a release cursor we've passed the smallest
%% release cursor available.
- {State0#state{release_cursors = Cursors}, ok,
+ {State0#?MODULE{release_cursors = Cursors}, ok,
[Cursor | Effects]}
end
end.
@@ -1196,36 +1043,68 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
{Potential, Cursors0}
end.
-return_one(0, {'$prefix_msg', _} = Msg,
- #state{returns = Returns} = State0) ->
- add_bytes_return(Msg,
- State0#state{returns = lqueue:in(Msg, Returns)});
+return_one(0, {'$prefix_msg', Header0},
+ #?MODULE{returns = Returns,
+ cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
+ Effects0, ConsumerId, Con) ->
+ Header = maps:update_with(delivery_count,
+ fun (C) -> C+1 end,
+ 1, Header0),
+ Msg = {'$prefix_msg', Header},
+ case maps:get(delivery_count, Header) of
+ DeliveryCount when DeliveryCount > DeliveryLimit ->
+ Checked = Con#consumer.checked_out,
+ {State1, Effects} = complete(ConsumerId, [], 1, Con, Checked,
+ Effects0, State0),
+ {add_bytes_settle(Msg, State1), Effects};
+ _ ->
+ %% this should not affect the release cursor in any way
+ {add_bytes_return(Msg,
+ State0#?MODULE{returns = lqueue:in(Msg, Returns)}),
+ Effects0}
+ end;
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
- #state{returns = Returns} = State0) ->
+ #?MODULE{returns = Returns,
+ cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
+ Effects0, ConsumerId, Con) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
1, Header0),
Msg = {RaftId, {Header, RawMsg}},
- % this should not affect the release cursor in any way
- add_bytes_return(RawMsg,
- State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}).
+ case maps:get(delivery_count, Header) of
+ DeliveryCount when DeliveryCount > DeliveryLimit ->
+ DlMsg = {MsgNum, Msg},
+ Effects = dead_letter_effects(delivery_limit,
+ #{none => DlMsg},
+ State0, Effects0),
+ Checked = Con#consumer.checked_out,
+ {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked,
+ Effects, State0),
+ {add_bytes_settle(RawMsg, State1), Effects1};
+ _ ->
+ %% this should not affect the release cursor in any way
+ {add_bytes_return(RawMsg,
+ State0#?MODULE{returns =
+ lqueue:in({MsgNum, Msg}, Returns)}),
+ Effects0}
+ end.
-return_all(State0, Checked0) ->
+return_all(State0, Checked0, Effects0, ConsumerId, Consumer) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
- lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, S) ->
- return_one(0, Msg, S);
- ({_, {MsgNum, Msg}}, S) ->
- return_one(MsgNum, Msg, S)
- end, State0, Checked).
+ lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) ->
+ return_one(0, Msg, S, E, ConsumerId, Consumer);
+ ({_, {MsgNum, Msg}}, {S, E}) ->
+ return_one(MsgNum, Msg, S, E, ConsumerId, Consumer)
+ end, {State0, Effects0}, Checked).
%% checkout new messages to consumers
%% reverses the effects list
checkout(#{index := Index}, State0, Effects0) ->
{State1, _Result, Effects1} = checkout0(checkout_one(State0),
Effects0, #{}),
- case evaluate_limit(State0#state.ra_indexes, false,
+ case evaluate_limit(State0#?MODULE.ra_indexes, false,
State1, Effects1) of
{State, true, Effects} ->
update_smallest_raft_index(Index, State, Effects);
@@ -1250,8 +1129,8 @@ checkout0({Activity, State0}, Effects0, Acc) ->
{State0, ok, lists:reverse(Effects1)}.
evaluate_limit(_OldIndexes, Result,
- #state{max_length = undefined,
- max_bytes = undefined} = State,
+ #?MODULE{cfg = #cfg{max_length = undefined,
+ max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
evaluate_limit(OldIndexes, Result,
@@ -1280,20 +1159,20 @@ append_send_msg_effects(Effects0, AccMap) ->
%%
%% When we return it is always done to the current return queue
%% for both prefix messages and current messages
-take_next_msg(#state{prefix_msgs = {[Bytes | Rem], P}} = State) ->
+take_next_msg(#?MODULE{prefix_msgs = {[Header | Rem], P}} = State) ->
%% there are prefix returns, these should be served first
- {{'$prefix_msg', Bytes},
- State#state{prefix_msgs = {Rem, P}}};
-take_next_msg(#state{returns = Returns,
- low_msg_num = Low0,
- messages = Messages0,
- prefix_msgs = {R, P}} = State) ->
+ {{'$prefix_msg', Header},
+ State#?MODULE{prefix_msgs = {Rem, P}}};
+take_next_msg(#?MODULE{returns = Returns,
+ low_msg_num = Low0,
+ messages = Messages0,
+ prefix_msgs = {R, P}} = State) ->
%% use peek rather than out there as the most likely case is an empty
%% queue
case lqueue:peek(Returns) of
{value, NextMsg} ->
{NextMsg,
- State#state{returns = lqueue:drop(Returns)}};
+ State#?MODULE{returns = lqueue:drop(Returns)}};
empty when P == [] ->
case Low0 of
undefined ->
@@ -1303,27 +1182,27 @@ take_next_msg(#state{returns = Returns,
case maps:size(Messages) of
0 ->
{{Low0, Msg},
- State#state{messages = Messages,
- low_msg_num = undefined}};
+ State#?MODULE{messages = Messages,
+ low_msg_num = undefined}};
_ ->
{{Low0, Msg},
- State#state{messages = Messages,
- low_msg_num = Low0 + 1}}
+ State#?MODULE{messages = Messages,
+ low_msg_num = Low0 + 1}}
end
end;
empty ->
- [Bytes | Rem] = P,
+ [Header | Rem] = P,
%% There are prefix msgs
- {{'$prefix_msg', Bytes},
- State#state{prefix_msgs = {R, Rem}}}
+ {{'$prefix_msg', Header},
+ State#?MODULE{prefix_msgs = {R, Rem}}}
end.
send_msg_effect({CTag, CPid}, Msgs) ->
{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}.
-checkout_one(#state{service_queue = SQ0,
- messages = Messages0,
- consumers = Cons0} = InitState) ->
+checkout_one(#?MODULE{service_queue = SQ0,
+ messages = Messages0,
+ consumers = Cons0} = InitState) ->
case queue:peek(SQ0) of
{value, ConsumerId} ->
case take_next_msg(InitState) of
@@ -1336,11 +1215,11 @@ checkout_one(#state{service_queue = SQ0,
%% no credit but was still on queue
%% can happen when draining
%% recurse without consumer on queue
- checkout_one(InitState#state{service_queue = SQ1});
+ checkout_one(InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{status = cancelled}} ->
- checkout_one(InitState#state{service_queue = SQ1});
+ checkout_one(InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{status = suspected_down}} ->
- checkout_one(InitState#state{service_queue = SQ1});
+ checkout_one(InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
@@ -1353,8 +1232,8 @@ checkout_one(#state{service_queue = SQ0,
{Cons, SQ, []} = % we expect no effects
update_or_remove_sub(ConsumerId, Con,
Cons0, SQ1, []),
- State1 = State0#state{service_queue = SQ,
- consumers = Cons},
+ State1 = State0#?MODULE{service_queue = SQ,
+ consumers = Cons},
{State, Msg} =
case ConsumerMsg of
{'$prefix_msg', _} ->
@@ -1367,7 +1246,7 @@ checkout_one(#state{service_queue = SQ0,
{success, ConsumerId, Next, Msg, State};
error ->
%% consumer did not exist but was queued, recurse
- checkout_one(InitState#state{service_queue = SQ1})
+ checkout_one(InitState#?MODULE{service_queue = SQ1})
end;
empty ->
{nochange, InitState}
@@ -1417,28 +1296,28 @@ uniq_queue_in(Key, Queue) ->
end.
update_consumer(ConsumerId, Meta, Spec,
- #state{consumer_strategy = default} = State0) ->
+ #?MODULE{cfg = #cfg{consumer_strategy = default}} = State0) ->
%% general case, single active consumer off
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, Spec,
- #state{consumers = Cons0,
- consumer_strategy = single_active} = State0)
+ #?MODULE{consumers = Cons0,
+ cfg = #cfg{consumer_strategy = single_active}} = State0)
when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
- #state{consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers0} = State0) ->
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = WaitingConsumers0} = State0) ->
%% single active consumer on and one active consumer already
%% adding the new consumer to the waiting list
Consumer = #consumer{lifetime = Life, meta = Meta,
credit = Credit, credit_mode = Mode},
WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}],
- State0#state{waiting_consumers = WaitingConsumers1}.
+ State0#?MODULE{waiting_consumers = WaitingConsumers1}.
update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
- #state{consumers = Cons0,
- service_queue = ServiceQueue0} = State0) ->
+ #?MODULE{consumers = Cons0,
+ service_queue = ServiceQueue0} = State0) ->
%% TODO: this logic may not be correct for updating a pre-existing consumer
Init = #consumer{lifetime = Life, meta = Meta,
credit = Credit, credit_mode = Mode},
@@ -1453,7 +1332,7 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
ServiceQueue0),
- State0#state{consumers = Cons, service_queue = ServiceQueue}.
+ State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}.
maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
ServiceQueue0) ->
@@ -1468,52 +1347,52 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
%% creates a dehydrated version of the current state to be cached and
%% potentially used for a snapshot at a later point
-dehydrate_state(#state{messages = Messages,
- consumers = Consumers,
- returns = Returns,
- prefix_msgs = {PrefRet0, PrefMsg0}} = State) ->
+dehydrate_state(#?MODULE{messages = Messages,
+ consumers = Consumers,
+ returns = Returns,
+ prefix_msgs = {PrefRet0, PrefMsg0}} = State) ->
%% TODO: optimise this function as far as possible
- PrefRet = lists:foldl(fun ({'$prefix_msg', Bytes}, Acc) ->
- [Bytes | Acc];
- ({_, {_, {_, Raw}}}, Acc) ->
- [message_size(Raw) | Acc]
+ PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) ->
+ [Header | Acc];
+ ({_, {_, {Header, _}}}, Acc) ->
+ [Header | Acc]
end,
lists:reverse(PrefRet0),
lqueue:to_list(Returns)),
- PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_H, Raw}}}, Acc) ->
- [message_size(Raw) | Acc]
+ PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {Header, _}}}, Acc) ->
+ [Header| Acc]
end,
lists:reverse(PrefMsg0),
lists:sort(maps:to_list(Messages))),
- State#state{messages = #{},
- ra_indexes = rabbit_fifo_index:empty(),
- release_cursors = lqueue:new(),
- low_msg_num = undefined,
- consumers = maps:map(fun (_, C) ->
- dehydrate_consumer(C)
- end, Consumers),
- returns = lqueue:new(),
- prefix_msgs = {lists:reverse(PrefRet),
- lists:reverse(PrefMsgs)}}.
+ State#?MODULE{messages = #{},
+ ra_indexes = rabbit_fifo_index:empty(),
+ release_cursors = lqueue:new(),
+ low_msg_num = undefined,
+ consumers = maps:map(fun (_, C) ->
+ dehydrate_consumer(C)
+ end, Consumers),
+ returns = lqueue:new(),
+ prefix_msgs = {lists:reverse(PrefRet),
+ lists:reverse(PrefMsgs)}}.
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
M;
- (_, {_, {_, {_, Raw}}}) ->
- {'$prefix_msg', message_size(Raw)}
+ (_, {_, {_, {Header, _}}}) ->
+ {'$prefix_msg', Header}
end, Checked0),
Con#consumer{checked_out = Checked}.
%% make the state suitable for equality comparison
-normalize(#state{release_cursors = Cursors} = State) ->
- State#state{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
+normalize(#?MODULE{release_cursors = Cursors} = State) ->
+ State#?MODULE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
-is_over_limit(#state{max_length = undefined,
- max_bytes = undefined}) ->
+is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined,
+ max_bytes = undefined}}) ->
false;
-is_over_limit(#state{max_length = MaxLength,
- max_bytes = MaxBytes,
- msg_bytes_enqueue = BytesEnq} = State) ->
+is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
+ max_bytes = MaxBytes},
+ msg_bytes_enqueue = BytesEnq} = State) ->
messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
@@ -1553,32 +1432,32 @@ make_purge() -> #purge{}.
make_update_config(Config) ->
#update_config{config = Config}.
-add_bytes_enqueue(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) ->
- State#state{msg_bytes_enqueue = Enqueue + Bytes}.
+add_bytes_enqueue(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
+ State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}.
-add_bytes_drop(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) ->
- State#state{msg_bytes_enqueue = Enqueue - Bytes}.
+add_bytes_drop(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
+ State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}.
-add_bytes_checkout(Msg, #state{msg_bytes_checkout = Checkout,
+add_bytes_checkout(Msg, #?MODULE{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue } = State) ->
Bytes = message_size(Msg),
- State#state{msg_bytes_checkout = Checkout + Bytes,
- msg_bytes_enqueue = Enqueue - Bytes}.
+ State#?MODULE{msg_bytes_checkout = Checkout + Bytes,
+ msg_bytes_enqueue = Enqueue - Bytes}.
-add_bytes_settle(Msg, #state{msg_bytes_checkout = Checkout} = State) ->
+add_bytes_settle(Msg, #?MODULE{msg_bytes_checkout = Checkout} = State) ->
Bytes = message_size(Msg),
- State#state{msg_bytes_checkout = Checkout - Bytes}.
+ State#?MODULE{msg_bytes_checkout = Checkout - Bytes}.
-add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout,
- msg_bytes_enqueue = Enqueue} = State) ->
+add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue} = State) ->
Bytes = message_size(Msg),
- State#state{msg_bytes_checkout = Checkout - Bytes,
- msg_bytes_enqueue = Enqueue + Bytes}.
+ State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
+ msg_bytes_enqueue = Enqueue + Bytes}.
message_size(#basic_message{content = Content}) ->
#content{payload_fragments_rev = PFR} = Content,
iolist_size(PFR);
-message_size({'$prefix_msg', B}) ->
+message_size({'$prefix_msg', #{size := B}}) ->
B;
message_size(B) when is_binary(B) ->
byte_size(B);
@@ -1586,9 +1465,9 @@ message_size(Msg) ->
%% probably only hit this for testing so ok to use erts_debug
erts_debug:size(Msg).
-suspected_pids_for(Node, #state{consumers = Cons0,
- enqueuers = Enqs0,
- waiting_consumers = WaitingConsumers0}) ->
+suspected_pids_for(Node, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc)
when node(P) =:= Node ->
[P | Acc];
@@ -1605,1137 +1484,3 @@ suspected_pids_for(Node, #state{consumers = Cons0,
[P | Acc];
(_, Acc) -> Acc
end, Enqs, WaitingConsumers0).
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
--define(ASSERT_EFF(EfxPat, Effects),
- ?ASSERT_EFF(EfxPat, true, Effects)).
-
--define(ASSERT_EFF(EfxPat, Guard, Effects),
- ?assert(lists:any(fun (EfxPat) when Guard -> true;
- (_) -> false
- end, Effects))).
-
--define(ASSERT_NO_EFF(EfxPat, Effects),
- ?assert(not lists:any(fun (EfxPat) -> true;
- (_) -> false
- end, Effects))).
-
--define(assertNoEffect(EfxPat, Effects),
- ?assert(not lists:any(fun (EfxPat) -> true;
- (_) -> false
- end, Effects))).
-
-test_init(Name) ->
- init(#{name => Name,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(Name, utf8)),
- release_cursor_interval => 0}).
-
-% To launch these tests: make eunit EUNIT_MODS="rabbit_fifo"
-
-enq_enq_checkout_test() ->
- Cid = {<<"enq_enq_checkout_test">>, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _} = enq(2, 2, second, State1),
- {_State3, _, Effects} =
- apply(meta(3),
- make_checkout(Cid, {once, 2, simple_prefetch}, #{}),
- State2),
- ?ASSERT_EFF({monitor, _, _}, Effects),
- ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects),
- ok.
-
-credit_enq_enq_checkout_settled_credit_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _} = enq(2, 2, second, State1),
- {State3, _, Effects} =
- apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}), State2),
- ?ASSERT_EFF({monitor, _, _}, Effects),
- Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true;
- (_) -> false
- end, Effects),
- ?assertEqual(1, length(Deliveries)),
- %% settle the delivery this should _not_ result in further messages being
- %% delivered
- {State4, SettledEffects} = settle(Cid, 4, 1, State3),
- ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) ->
- true;
- (_) -> false
- end, SettledEffects)),
- %% granting credit (3) should deliver the second msg if the receivers
- %% delivery count is (1)
- {State5, CreditEffects} = credit(Cid, 5, 1, 1, false, State4),
- % ?debugFmt("CreditEffects ~p ~n~p", [CreditEffects, State4]),
- ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, CreditEffects),
- {_State6, FinalEffects} = enq(6, 3, third, State5),
- ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) ->
- true;
- (_) -> false
- end, FinalEffects)),
- ok.
-
-credit_with_drained_test() ->
- Cid = {?FUNCTION_NAME, self()},
- State0 = test_init(test),
- %% checkout with a single credit
- {State1, _, _} =
- apply(meta(1), make_checkout(Cid, {auto, 1, credited},#{}),
- State0),
- ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 1,
- delivery_count = 0}}},
- State1),
- {State, Result, _} =
- apply(meta(3), make_credit(Cid, 0, 5, true), State1),
- ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0,
- delivery_count = 5}}},
- State),
- ?assertEqual({multi, [{send_credit_reply, 0},
- {send_drained, [{?FUNCTION_NAME, 5}]}]},
- Result),
- ok.
-
-credit_and_drain_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _} = enq(2, 2, second, State1),
- %% checkout without any initial credit (like AMQP 1.0 would)
- {State3, _, CheckEffs} =
- apply(meta(3), make_checkout(Cid, {auto, 0, credited}, #{}),
- State2),
-
- ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs),
- {State4, {multi, [{send_credit_reply, 0},
- {send_drained, [{?FUNCTION_NAME, 2}]}]},
- Effects} = apply(meta(4), make_credit(Cid, 4, 0, true), State3),
- ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0,
- delivery_count = 4}}},
- State4),
-
- ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}},
- {_, {_, second}}]}, _}, Effects),
- {_State5, EnqEffs} = enq(5, 2, third, State4),
- ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, EnqEffs),
- ok.
-
-
-
-enq_enq_deq_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _} = enq(2, 2, second, State1),
- % get returns a reply value
- NumReady = 1,
- {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} =
- apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
- State2),
- ok.
-
-enq_enq_deq_deq_settle_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _} = enq(2, 2, second, State1),
- % get returns a reply value
- {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} =
- apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
- State2),
- {_State4, {dequeue, empty}} =
- apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}),
- State3),
- ok.
-
-enq_enq_checkout_get_settled_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- % get returns a reply value
- {_State2, {dequeue, {0, {_, first}}, _}, _Effs} =
- apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
- State1),
- ok.
-
-checkout_get_empty_test() ->
- Cid = {?FUNCTION_NAME, self()},
- State = test_init(test),
- {_State2, {dequeue, empty}} =
- apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}), State),
- ok.
-
-untracked_enq_deq_test() ->
- Cid = {?FUNCTION_NAME, self()},
- State0 = test_init(test),
- {State1, _, _} = apply(meta(1),
- make_enqueue(undefined, undefined, first),
- State0),
- {_State2, {dequeue, {0, {_, first}}, _}, _} =
- apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1),
- ok.
-
-release_cursor_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _} = enq(2, 2, second, State1),
- {State3, _} = check(Cid, 3, 10, State2),
- % no release cursor effect at this point
- {State4, _} = settle(Cid, 4, 1, State3),
- {_Final, Effects1} = settle(Cid, 5, 0, State4),
- % empty queue forwards release cursor all the way
- ?ASSERT_EFF({release_cursor, 5, _}, Effects1),
- ok.
-
-checkout_enq_settle_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)),
- {State2, Effects0} = enq(2, 1, first, State1),
- ?ASSERT_EFF({send_msg, _,
- {delivery, ?FUNCTION_NAME,
- [{0, {_, first}}]}, _},
- Effects0),
- {State3, [_Inactive]} = enq(3, 2, second, State2),
- {_, _Effects} = settle(Cid, 4, 0, State3),
- % the release cursor is the smallest raft index that does not
- % contribute to the state of the application
- % ?ASSERT_EFF({release_cursor, 2, _}, Effects),
- ok.
-
-out_of_order_enqueue_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
- {State2, Effects2} = enq(2, 1, first, State1),
- ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
- % assert monitor was set up
- ?ASSERT_EFF({monitor, _, _}, Effects2),
- % enqueue seq num 3 and 4 before 2
- {State3, Effects3} = enq(3, 3, third, State2),
- ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects3),
- {State4, Effects4} = enq(4, 4, fourth, State3),
- % assert no further deliveries where made
- ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects4),
- {_State5, Effects5} = enq(5, 2, second, State4),
- % assert two deliveries were now made
- ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, second}},
- {_, {_, third}},
- {_, {_, fourth}}]}, _},
- Effects5),
- ok.
-
-out_of_order_first_enqueue_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, _} = check_n(Cid, 5, 5, test_init(test)),
- {_State2, Effects2} = enq(2, 10, first, State1),
- ?ASSERT_EFF({monitor, process, _}, Effects2),
- ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _},
- Effects2),
- ok.
-
-duplicate_enqueue_test() ->
- Cid = {<<"duplicate_enqueue_test">>, self()},
- {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
- {State2, Effects2} = enq(2, 1, first, State1),
- ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
- {_State3, Effects3} = enq(3, 1, first, State2),
- ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects3),
- ok.
-
-return_non_existent_test() ->
- Cid = {<<"cid">>, self()},
- {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)),
- % return non-existent
- {_State2, _} = apply(meta(3), make_return(Cid, [99]), State0),
- ok.
-
-return_checked_out_test() ->
- Cid = {<<"cid">>, self()},
- {State0, [_, _]} = enq(1, 1, first, test_init(test)),
- {State1, [_Monitor,
- {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
- {aux, active} | _ ]} = check_auto(Cid, 2, State0),
- % returning immediately checks out the same message again
- {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, ra_event},
- {aux, active}]} =
- apply(meta(3), make_return(Cid, [MsgId]), State1),
- ok.
-
-return_auto_checked_out_test() ->
- Cid = {<<"cid">>, self()},
- {State00, [_, _]} = enq(1, 1, first, test_init(test)),
- {State0, [_]} = enq(2, 2, second, State00),
- % it first active then inactive as the consumer took on but cannot take
- % any more
- {State1, [_Monitor,
- {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
- {aux, active},
- {aux, inactive}
- ]} = check_auto(Cid, 2, State0),
- % return should include another delivery
- {_State2, _, Effects} = apply(meta(3), make_return(Cid, [MsgId]), State1),
- ?ASSERT_EFF({send_msg, _,
- {delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _},
- Effects),
- ok.
-
-
-cancelled_checkout_out_test() ->
- Cid = {<<"cid">>, self()},
- {State00, [_, _]} = enq(1, 1, first, test_init(test)),
- {State0, [_]} = enq(2, 2, second, State00),
- {State1, _} = check_auto(Cid, 2, State0),
- % cancelled checkout should not return pending messages to queue
- {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1),
- ?assertEqual(1, maps:size(State2#state.messages)),
- ?assertEqual(0, lqueue:len(State2#state.returns)),
-
- {State3, {dequeue, empty}} =
- apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
- %% settle
- {State4, ok, _} =
- apply(meta(4), make_settle(Cid, [0]), State3),
-
- {_State, {dequeue, {_, {_, second}}, _}, _} =
- apply(meta(5), make_checkout(Cid, {dequeue, settled}, #{}), State4),
- ok.
-
-down_with_noproc_consumer_returns_unsettled_test() ->
- Cid = {<<"down_consumer_returns_unsettled_test">>, self()},
- {State0, [_, _]} = enq(1, 1, second, test_init(test)),
- {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0),
- {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1),
- {_State, Effects} = check(Cid, 4, State2),
- ?ASSERT_EFF({monitor, process, _}, Effects),
- ok.
-
-down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
- Pid = spawn(fun() -> ok end),
- Cid = {<<"down_with_noconnect">>, Pid},
- Self = self(),
- Node = node(Pid),
- {State0, Effects0} = enq(1, 1, second, test_init(test)),
- ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0),
- {State1, Effects1} = check_auto(Cid, 2, State0),
- #consumer{credit = 0} = maps:get(Cid, State1#state.consumers),
- ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1),
- % monitor both enqueuer and consumer
- % because we received a noconnection we now need to monitor the node
- {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- #consumer{credit = 1} = maps:get(Cid, State2a#state.consumers),
- %% validate consumer has credit
- {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a),
- ?ASSERT_EFF({monitor, node, _}, Effects2),
- ?assertNoEffect({demonitor, process, _}, Effects2),
- % when the node comes up we need to retry the process monitors for the
- % disconnected processes
- {_State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2),
- % try to re-monitor the suspect processes
- ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3),
- ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
- ok.
-
-down_with_noconnection_returns_unack_test() ->
- Pid = spawn(fun() -> ok end),
- Cid = {<<"down_with_noconnect">>, Pid},
- {State0, _} = enq(1, 1, second, test_init(test)),
- ?assertEqual(1, maps:size(State0#state.messages)),
- ?assertEqual(0, lqueue:len(State0#state.returns)),
- {State1, {_, _}} = deq(2, Cid, unsettled, State0),
- ?assertEqual(0, maps:size(State1#state.messages)),
- ?assertEqual(0, lqueue:len(State1#state.returns)),
- {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- ?assertEqual(0, maps:size(State2a#state.messages)),
- ?assertEqual(1, lqueue:len(State2a#state.returns)),
- ok.
-
-down_with_noproc_enqueuer_is_cleaned_up_test() ->
- State00 = test_init(test),
- Pid = spawn(fun() -> ok end),
- {State0, _, Effects0} = apply(meta(1), make_enqueue(Pid, 1, first), State00),
- ?ASSERT_EFF({monitor, process, _}, Effects0),
- {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0),
- % ensure there are no enqueuers
- ?assert(0 =:= maps:size(State1#state.enqueuers)),
- ok.
-
-discarded_message_without_dead_letter_handler_is_removed_test() ->
- Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
- {State0, [_, _]} = enq(1, 1, first, test_init(test)),
- {State1, Effects1} = check_n(Cid, 2, 10, State0),
- ?ASSERT_EFF({send_msg, _,
- {delivery, _, [{0, {#{}, first}}]}, _},
- Effects1),
- {_State2, _, Effects2} = apply(meta(1), make_discard(Cid, [0]), State1),
- ?assertNoEffect({send_msg, _,
- {delivery, _, [{0, {#{}, first}}]}, _},
- Effects2),
- ok.
-
-discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
- Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
- State00 = init(#{name => test,
- queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
- dead_letter_handler =>
- {somemod, somefun, [somearg]}}),
- {State0, [_, _]} = enq(1, 1, first, State00),
- {State1, Effects1} = check_n(Cid, 2, 10, State0),
- ?ASSERT_EFF({send_msg, _,
- {delivery, _, [{0, {#{}, first}}]}, _},
- Effects1),
- {_State2, _, Effects2} = apply(meta(1), make_discard(Cid, [0]), State1),
- % assert mod call effect with appended reason and message
- ?ASSERT_EFF({mod_call, somemod, somefun, [somearg, [{rejected, first}]]},
- Effects2),
- ok.
-
-tick_test() ->
- Cid = {<<"c">>, self()},
- Cid2 = {<<"c2">>, self()},
- {S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)),
- {S1, _} = enq(2, 2, <<"snd">>, S0),
- {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1),
- {S3, {_, _}} = deq(4, Cid2, unsettled, S2),
- {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), S3),
-
- [{mod_call, _, _,
- [#resource{},
- {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = tick(1, S4),
- ok.
-
-enq_deq_snapshot_recover_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_enqueue(self(), 1, one),
- make_enqueue(self(), 2, two),
- make_checkout(Cid, {dequeue, settled}, #{}),
- make_enqueue(self(), 3, three),
- make_enqueue(self(), 4, four),
- make_checkout(Cid, {dequeue, settled}, #{}),
- make_enqueue(self(), 5, five),
- make_checkout(Cid, {dequeue, settled}, #{})
- ],
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-enq_deq_settle_snapshot_recover_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- % OthPid = spawn(fun () -> ok end),
- % Oth = {<<"oth">>, OthPid},
- Commands = [
- make_enqueue(self(), 1, one),
- make_enqueue(self(), 2, two),
- make_checkout(Cid, {dequeue, unsettled}, #{}),
- make_settle(Cid, [0])
- ],
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-enq_deq_settle_snapshot_recover_2_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- OthPid = spawn(fun () -> ok end),
- Oth = {<<"oth">>, OthPid},
- Commands = [
- make_enqueue(self(), 1, one),
- make_enqueue(self(), 2, two),
- make_checkout(Cid, {dequeue, unsettled}, #{}),
- make_settle(Cid, [0]),
- make_enqueue(self(), 3, two),
- make_checkout(Cid, {dequeue, unsettled}, #{}),
- make_settle(Oth, [0])
- ],
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-snapshot_recover_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_checkout(Cid, {auto, 2, simple_prefetch}, #{}),
- make_enqueue(self(), 1, one),
- make_enqueue(self(), 2, two),
- make_enqueue(self(), 3, three),
- make_purge()
- ],
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-enq_deq_return_settle_snapshot_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_enqueue(self(), 1, one), %% to Cid
- make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
- make_return(Cid, [0]), %% should be re-delivered to Cid
- make_enqueue(self(), 2, two), %% Cid prefix_msg_count: 2
- make_settle(Cid, [1]),
- make_settle(Cid, [2])
- ],
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-return_prefix_msg_count_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_enqueue(self(), 1, one),
- make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
- make_checkout(Cid, cancel, #{}),
- make_enqueue(self(), 2, two) %% Cid prefix_msg_count: 2
- ],
- Indexes = lists:seq(1, length(Commands)),
- Entries = lists:zip(Indexes, Commands),
- {_State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries),
- ok.
-
-
-return_settle_snapshot_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_enqueue(self(), 1, one), %% to Cid
- make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
- make_return(Cid, [0]), %% should be re-delivered to Oth
- make_enqueue(self(), 2, two), %% Cid prefix_msg_count: 2
- make_settle(Cid, [1]),
- make_return(Cid, [2]),
- make_settle(Cid, [3]),
- make_enqueue(self(), 3, three),
- make_purge(),
- make_enqueue(self(), 4, four)
- ],
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-enq_check_settle_snapshot_recover_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_checkout(Cid, {auto, 2, simple_prefetch}, #{}),
- make_enqueue(self(), 1, one),
- make_enqueue(self(), 2, two),
- make_settle(Cid, [1]),
- make_settle(Cid, [0]),
- make_enqueue(self(), 3, three),
- make_settle(Cid, [2])
- ],
- % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-enq_check_settle_snapshot_purge_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_checkout(Cid, {auto, 2, simple_prefetch},#{}),
- make_enqueue(self(), 1, one),
- make_enqueue(self(), 2, two),
- make_settle(Cid, [1]),
- make_settle(Cid, [0]),
- make_enqueue(self(), 3, three),
- make_purge()
- ],
- % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-enq_check_settle_duplicate_test() ->
- %% duplicate settle commands are likely
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_checkout(Cid, {auto, 2, simple_prefetch}, #{}),
- make_enqueue(self(), 1, one), %% 0
- make_enqueue(self(), 2, two), %% 0
- make_settle(Cid, [0]),
- make_settle(Cid, [1]),
- make_settle(Cid, [1]),
- make_enqueue(self(), 3, three),
- make_settle(Cid, [2])
- ],
- % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-run_snapshot_test(Name, Commands) ->
- %% create every incremental permutation of the commands lists
- %% and run the snapshot tests against that
- [begin
- run_snapshot_test0(Name, C)
- end || C <- prefixes(Commands, 1, [])].
-
-run_snapshot_test0(Name, Commands) ->
- Indexes = lists:seq(1, length(Commands)),
- Entries = lists:zip(Indexes, Commands),
- {State, Effects} = run_log(test_init(Name), Entries),
-
- [begin
- Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
- (_) -> false
- end, Entries),
- {S, _} = run_log(SnapState, Filtered),
- % assert log can be restored from any release cursor index
- ?assertEqual(normalize(State), normalize(S))
- end || {release_cursor, SnapIdx, SnapState} <- Effects],
- ok.
-
-prefixes(Source, N, Acc) when N > length(Source) ->
- lists:reverse(Acc);
-prefixes(Source, N, Acc) ->
- {X, _} = lists:split(N, Source),
- prefixes(Source, N+1, [X | Acc]).
-
-delivery_query_returns_deliveries_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_checkout(Cid, {auto, 5, simple_prefetch}, #{}),
- make_enqueue(self(), 1, one),
- make_enqueue(self(), 2, two),
- make_enqueue(self(), 3, tre),
- make_enqueue(self(), 4, for)
- ],
- Indexes = lists:seq(1, length(Commands)),
- Entries = lists:zip(Indexes, Commands),
- {State, _Effects} = run_log(test_init(help), Entries),
- % 3 deliveries are returned
- [{0, {#{}, one}}] = get_checked_out(Cid, 0, 0, State),
- [_, _, _] = get_checked_out(Cid, 1, 3, State),
- ok.
-
-pending_enqueue_is_enqueued_on_down_test() ->
- Cid = {<<"cid">>, self()},
- Pid = self(),
- {State0, _} = enq(1, 2, first, test_init(test)),
- {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0),
- {_State2, {dequeue, {0, {_, first}}, 0}, _} =
- apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1),
- ok.
-
-duplicate_delivery_test() ->
- {State0, _} = enq(1, 1, first, test_init(test)),
- {#state{ra_indexes = RaIdxs,
- messages = Messages}, _} = enq(2, 1, first, State0),
- ?assertEqual(1, rabbit_fifo_index:size(RaIdxs)),
- ?assertEqual(1, maps:size(Messages)),
- ok.
-
-state_enter_test() ->
- S0 = init(#{name => the_name,
- queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
- become_leader_handler => {m, f, [a]}}),
- [{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
- ok.
-
-state_enter_monitors_and_notifications_test() ->
- Oth = spawn(fun () -> ok end),
- {State0, _} = enq(1, 1, first, test_init(test)),
- Cid = {<<"adf">>, self()},
- OthCid = {<<"oth">>, Oth},
- {State1, _} = check(Cid, 2, State0),
- {State, _} = check(OthCid, 3, State1),
- Self = self(),
- Effects = state_enter(leader, State),
-
- %% monitor all enqueuers and consumers
- [{monitor, process, Self},
- {monitor, process, Oth}] =
- lists:filter(fun ({monitor, process, _}) -> true;
- (_) -> false
- end, Effects),
- [{send_msg, Self, leader_change, ra_event},
- {send_msg, Oth, leader_change, ra_event}] =
- lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true;
- (_) -> false
- end, Effects),
- ?ASSERT_EFF({monitor, process, _}, Effects),
- ok.
-
-purge_test() ->
- Cid = {<<"purge_test">>, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- {State2, {purge, 1}, _} = apply(meta(2), make_purge(), State1),
- {State3, _} = enq(3, 2, second, State2),
- % get returns a reply value
- {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} =
- apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), State3),
- ok.
-
-purge_with_checkout_test() ->
- Cid = {<<"purge_test">>, self()},
- {State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)),
- {State1, _} = enq(2, 1, <<"first">>, State0),
- {State2, _} = enq(3, 2, <<"second">>, State1),
- %% assert message bytes are non zero
- ?assert(State2#state.msg_bytes_checkout > 0),
- ?assert(State2#state.msg_bytes_enqueue > 0),
- {State3, {purge, 1}, _} = apply(meta(2), make_purge(), State2),
- ?assert(State2#state.msg_bytes_checkout > 0),
- ?assertEqual(0, State3#state.msg_bytes_enqueue),
- ?assertEqual(1, rabbit_fifo_index:size(State3#state.ra_indexes)),
- #consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers),
- ?assertEqual(1, maps:size(Checked)),
- ok.
-
-down_returns_checked_out_in_order_test() ->
- S0 = test_init(?FUNCTION_NAME),
- %% enqueue 100
- S1 = lists:foldl(fun (Num, FS0) ->
- {FS, _} = enq(Num, Num, Num, FS0),
- FS
- end, S0, lists:seq(1, 100)),
- ?assertEqual(100, maps:size(S1#state.messages)),
- Cid = {<<"cid">>, self()},
- {S2, _} = check(Cid, 101, 1000, S1),
- #consumer{checked_out = Checked} = maps:get(Cid, S2#state.consumers),
- ?assertEqual(100, maps:size(Checked)),
- %% simulate down
- {S, _, _} = apply(meta(102), {down, self(), noproc}, S2),
- Returns = lqueue:to_list(S#state.returns),
- ?assertEqual(100, length(Returns)),
- %% validate returns are in order
- ?assertEqual(lists:sort(Returns), Returns),
- ok.
-
-single_active_consumer_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
- ?assertEqual(single_active, State0#state.consumer_strategy),
- ?assertEqual(0, map_size(State0#state.consumers)),
-
- % adding some consumers
- AddConsumer = fun(CTag, State) ->
- {NewState, _, _} = apply(
- meta(1),
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, self()}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
-
- % the first registered consumer is the active one, the others are waiting
- ?assertEqual(1, map_size(State1#state.consumers)),
- ?assert(maps:is_key({<<"ctag1">>, self()}, State1#state.consumers)),
- ?assertEqual(3, length(State1#state.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#state.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#state.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)),
-
- % cancelling a waiting consumer
- {State2, _, Effects1} = apply(meta(2),
- #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
- % the active consumer should still be in place
- ?assertEqual(1, map_size(State2#state.consumers)),
- ?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)),
- % the cancelled consumer has been removed from waiting consumers
- ?assertEqual(2, length(State2#state.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#state.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#state.waiting_consumers)),
- % there are some effects to unregister the consumer
- ?assertEqual(1, length(Effects1)),
-
- % cancelling the active consumer
- {State3, _, Effects2} = apply(meta(3),
- #checkout{spec = cancel,
- consumer_id = {<<"ctag1">>, self()}},
- State2),
- % the second registered consumer is now the active one
- ?assertEqual(1, map_size(State3#state.consumers)),
- ?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)),
- % the new active consumer is no longer in the waiting list
- ?assertEqual(1, length(State3#state.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)),
- % there are some effects to unregister the consumer and to update the new active one (metrics)
- ?assertEqual(2, length(Effects2)),
-
- % cancelling the active consumer
- {State4, _, Effects3} = apply(meta(4), #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
- % the last waiting consumer became the active one
- ?assertEqual(1, map_size(State4#state.consumers)),
- ?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)),
- % the waiting consumer list is now empty
- ?assertEqual(0, length(State4#state.waiting_consumers)),
- % there are some effects to unregister the consumer and to update the new active one (metrics)
- ?assertEqual(2, length(Effects3)),
-
- % cancelling the last consumer
- {State5, _, Effects4} = apply(meta(5), #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
- % no active consumer anymore
- ?assertEqual(0, map_size(State5#state.consumers)),
- % still nothing in the waiting list
- ?assertEqual(0, length(State5#state.waiting_consumers)),
- % there is an effect to unregister the consumer + queue inactive effect
- ?assertEqual(1 + 1, length(Effects4)),
-
- ok.
-
-single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
-
- DummyFunction = fun() -> ok end,
- Pid1 = spawn(DummyFunction),
- Pid2 = spawn(DummyFunction),
- Pid3 = spawn(DummyFunction),
-
- % adding some consumers
- AddConsumer = fun({CTag, ChannelId}, State) ->
- {NewState, _, _} = apply(
- #{index => 1},
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, ChannelId}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
-
- % the channel of the active consumer goes down
- {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1),
- % fell back to another consumer
- ?assertEqual(1, map_size(State2#state.consumers)),
- % there are still waiting consumers
- ?assertEqual(2, length(State2#state.waiting_consumers)),
- % effects to unregister the consumer and
- % to update the new active one (metrics) are there
- ?assertEqual(2, length(Effects)),
-
- % the channel of the active consumer and a waiting consumer goes down
- {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2),
- % fell back to another consumer
- ?assertEqual(1, map_size(State3#state.consumers)),
- % no more waiting consumer
- ?assertEqual(0, length(State3#state.waiting_consumers)),
- % effects to cancel both consumers of this channel + effect to update the new active one (metrics)
- ?assertEqual(3, length(Effects2)),
-
- % the last channel goes down
- {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
- % no more consumers
- ?assertEqual(0, map_size(State4#state.consumers)),
- ?assertEqual(0, length(State4#state.waiting_consumers)),
- % there is an effect to unregister the consumer + queue inactive effect
- ?assertEqual(1 + 1, length(Effects3)),
-
- ok.
-
-single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnection_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
-
- Meta = #{index => 1},
- % adding some consumers
- AddConsumer = fun(CTag, State) ->
- {NewState, _, _} = apply(
- Meta,
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, self()}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0,
- [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
-
- % simulate node goes down
- {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1),
-
- % all the waiting consumers should be suspected down
- ?assertEqual(3, length(State2#state.waiting_consumers)),
- lists:foreach(fun({_, #consumer{status = Status}}) ->
- ?assert(Status == suspected_down)
- end, State2#state.waiting_consumers),
-
- % simulate node goes back up
- {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2),
-
- % all the waiting consumers should be un-suspected
- ?assertEqual(3, length(State3#state.waiting_consumers)),
- lists:foreach(fun({_, #consumer{status = Status}}) ->
- ?assert(Status /= suspected_down)
- end, State3#state.waiting_consumers),
-
- ok.
-
-single_active_consumer_state_enter_leader_include_waiting_consumers_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
-
- DummyFunction = fun() -> ok end,
- Pid1 = spawn(DummyFunction),
- Pid2 = spawn(DummyFunction),
- Pid3 = spawn(DummyFunction),
-
- Meta = #{index => 1},
- % adding some consumers
- AddConsumer = fun({CTag, ChannelId}, State) ->
- {NewState, _, _} = apply(
- Meta,
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, ChannelId}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
-
- Effects = state_enter(leader, State1),
- % 2 effects for each consumer process (channel process), 1 effect for the node
- ?assertEqual(2 * 3 + 1, length(Effects)).
-
-single_active_consumer_state_enter_eol_include_waiting_consumers_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
-
- DummyFunction = fun() -> ok end,
- Pid1 = spawn(DummyFunction),
- Pid2 = spawn(DummyFunction),
- Pid3 = spawn(DummyFunction),
-
- Meta = #{index => 1},
- % adding some consumers
- AddConsumer = fun({CTag, ChannelId}, State) ->
- {NewState, _, _} = apply(
- Meta,
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, ChannelId}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
-
- Effects = state_enter(eol, State1),
- % 1 effect for each consumer process (channel process)
- ?assertEqual(3, length(Effects)).
-
-query_consumers_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => false}),
-
- % adding some consumers
- AddConsumer = fun(CTag, State) ->
- {NewState, _, _} = apply(
- #{index => 1},
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, self()}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
- Consumers0 = State1#state.consumers,
- Consumer = maps:get({<<"ctag2">>, self()}, Consumers0),
- Consumers1 = maps:put({<<"ctag2">>, self()},
- Consumer#consumer{status = suspected_down}, Consumers0),
- State2 = State1#state{consumers = Consumers1},
-
- ?assertEqual(4, query_consumer_count(State2)),
- Consumers2 = query_consumers(State2),
- ?assertEqual(4, maps:size(Consumers2)),
- maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
- ?assertEqual(self(), Pid),
- case Tag of
- <<"ctag2">> ->
- ?assertNot(Active),
- ?assertEqual(suspected_down, ActivityStatus);
- _ ->
- ?assert(Active),
- ?assertEqual(up, ActivityStatus)
- end
- end, [], Consumers2).
-
-query_consumers_when_single_active_consumer_is_on_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
- Meta = #{index => 1},
- % adding some consumers
- AddConsumer = fun(CTag, State) ->
- {NewState, _, _} = apply(
- Meta,
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, self()}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
-
- ?assertEqual(4, query_consumer_count(State1)),
- Consumers = query_consumers(State1),
- ?assertEqual(4, maps:size(Consumers)),
- maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
- ?assertEqual(self(), Pid),
- case Tag of
- <<"ctag1">> ->
- ?assert(Active),
- ?assertEqual(single_active, ActivityStatus);
- _ ->
- ?assertNot(Active),
- ?assertEqual(waiting, ActivityStatus)
- end
- end, [], Consumers).
-
-active_flag_updated_when_consumer_suspected_unsuspected_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => false}),
-
- DummyFunction = fun() -> ok end,
- Pid1 = spawn(DummyFunction),
- Pid2 = spawn(DummyFunction),
- Pid3 = spawn(DummyFunction),
-
- % adding some consumers
- AddConsumer = fun({CTag, ChannelId}, State) ->
- {NewState, _, _} = apply(
- #{index => 1},
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, ChannelId}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
-
- {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
- % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
- ?assertEqual(4 + 1, length(Effects2)),
-
- {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
- % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID
- ?assertEqual(4 + 4, length(Effects3)).
-
-active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
-
- DummyFunction = fun() -> ok end,
- Pid1 = spawn(DummyFunction),
- Pid2 = spawn(DummyFunction),
- Pid3 = spawn(DummyFunction),
-
- % adding some consumers
- AddConsumer = fun({CTag, ChannelId}, State) ->
- {NewState, _, _} = apply(
- #{index => 1},
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, ChannelId}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
-
- {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
- % only 1 effect to monitor the node
- ?assertEqual(1, length(Effects2)),
-
- {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
- % for each consumer: 1 effect to monitor the consumer PID
- ?assertEqual(4, length(Effects3)).
-
-meta(Idx) ->
- #{index => Idx, term => 1}.
-
-enq(Idx, MsgSeq, Msg, State) ->
- strip_reply(
- apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), State)).
-
-deq(Idx, Cid, Settlement, State0) ->
- {State, {dequeue, {MsgId, Msg}, _}, _} =
- apply(meta(Idx),
- make_checkout(Cid, {dequeue, Settlement}, #{}),
- State0),
- {State, {MsgId, Msg}}.
-
-check_n(Cid, Idx, N, State) ->
- strip_reply(
- apply(meta(Idx),
- make_checkout(Cid, {auto, N, simple_prefetch}, #{}),
- State)).
-
-check(Cid, Idx, State) ->
- strip_reply(
- apply(meta(Idx),
- make_checkout(Cid, {once, 1, simple_prefetch}, #{}),
- State)).
-
-check_auto(Cid, Idx, State) ->
- strip_reply(
- apply(meta(Idx),
- make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
- State)).
-
-check(Cid, Idx, Num, State) ->
- strip_reply(
- apply(meta(Idx),
- make_checkout(Cid, {auto, Num, simple_prefetch}, #{}),
- State)).
-
-settle(Cid, Idx, MsgId, State) ->
- strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), State)).
-
-credit(Cid, Idx, Credit, DelCnt, Drain, State) ->
- strip_reply(apply(meta(Idx), make_credit(Cid, Credit, DelCnt, Drain),
- State)).
-
-strip_reply({State, _, Effects}) ->
- {State, Effects}.
-
-run_log(InitState, Entries) ->
- lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) ->
- case apply(meta(Idx), E, Acc0) of
- {Acc, _, Efx} when is_list(Efx) ->
- {Acc, Efx0 ++ Efx};
- {Acc, _, Efx} ->
- {Acc, Efx0 ++ [Efx]};
- {Acc, _} ->
- {Acc, Efx0}
- end
- end, {InitState, []}, Entries).
-
-
-%% AUX Tests
-
-aux_test() ->
- _ = ra_machine_ets:start_link(),
- Aux0 = init_aux(aux_test),
- MacState = init(#{name => aux_test,
- queue_resource =>
- rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
- Log = undefined,
- {no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0,
- Log, MacState),
- {no_reply, _Aux, undefined} = handle_aux(leader, cast, emit, Aux,
- Log, MacState),
- [X] = ets:lookup(rabbit_fifo_usage, aux_test),
- ?assert(X > 0.0),
- ok.
-
-
--endif.
-
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
new file mode 100644
index 0000000000..ebe5f3328a
--- /dev/null
+++ b/src/rabbit_fifo.hrl
@@ -0,0 +1,170 @@
+
+-type raw_msg() :: term().
+%% The raw message. It is opaque to rabbit_fifo.
+
+-type msg_in_id() :: non_neg_integer().
+% a queue scoped monotonically incrementing integer used to enforce order
+% in the unassigned messages map
+
+-type msg_id() :: non_neg_integer().
+%% A consumer-scoped monotonically incrementing integer included with a
+%% {@link delivery/0.}. Used to settle deliveries using
+%% {@link rabbit_fifo_client:settle/3.}
+
+-type msg_seqno() :: non_neg_integer().
+%% A sender process scoped monotonically incrementing integer included
+%% in enqueue messages. Used to ensure ordering of messages sent from the
+%% same process
+
+-type msg_header() :: #{size := msg_size(),
+ delivery_count => non_neg_integer()}.
+%% The message header map:
+%% delivery_count: the number of unsuccessful delivery attempts.
+%% A non-zero value indicates a previous attempt.
+
+-type msg() :: {msg_header(), raw_msg()}.
+%% message with a header map.
+
+-type msg_size() :: non_neg_integer().
+%% the size in bytes of the msg payload
+
+-type indexed_msg() :: {ra_index(), msg()}.
+
+-type prefix_msg() :: {'$prefix_msg', msg_header()}.
+
+-type delivery_msg() :: {msg_id(), msg()}.
+%% A tuple consisting of the message id and the headered message.
+
+-type consumer_tag() :: binary().
+%% An arbitrary binary tag used to distinguish between different consumers
+%% set up by the same process. See: {@link rabbit_fifo_client:checkout/3.}
+
+-type delivery() :: {delivery, consumer_tag(), [delivery_msg()]}.
+%% Represents the delivery of one or more rabbit_fifo messages.
+
+-type consumer_id() :: {consumer_tag(), pid()}.
+%% The entity that receives messages. Uniquely identifies a consumer.
+
+-type credit_mode() :: simple_prefetch | credited.
+%% determines how credit is replenished
+
+-type checkout_spec() :: {once | auto, Num :: non_neg_integer(),
+ credit_mode()} |
+ {dequeue, settled | unsettled} |
+ cancel.
+
+-type consumer_meta() :: #{ack => boolean(),
+ username => binary(),
+ prefetch => non_neg_integer(),
+ args => list()}.
+%% static meta data associated with a consumer
+
+
+-type applied_mfa() :: {module(), atom(), list()}.
+% represents a partially applied module call
+
+-define(RELEASE_CURSOR_EVERY, 64000).
+-define(USE_AVG_HALF_LIFE, 10000.0).
+
+-record(consumer,
+ {meta = #{} :: consumer_meta(),
+ checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}},
+ next_msg_id = 0 :: msg_id(), % part of snapshot data
+ %% max number of messages that can be sent
+ %% decremented for each delivery
+         credit = 0 :: non_neg_integer(),
+ %% total number of checked out messages - ever
+ %% incremented for each delivery
+ delivery_count = 0 :: non_neg_integer(),
+ %% the mode of how credit is incremented
+ %% simple_prefetch: credit is re-filled as deliveries are settled
+ %% or returned.
+ %% credited: credit can only be changed by receiving a consumer_credit
+ %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
+ credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
+ lifetime = once :: once | auto,
+ status = up :: up | suspected_down | cancelled
+ }).
+
+-type consumer() :: #consumer{}.
+
+-record(enqueuer,
+ {next_seqno = 1 :: msg_seqno(),
+ % out of order enqueues - sorted list
+ pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}],
+ status = up :: up | suspected_down
+ }).
+
+-record(cfg,
+ {name :: atom(),
+ resource :: rabbit_types:r('queue'),
+ release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(),
+ dead_letter_handler :: maybe(applied_mfa()),
+ become_leader_handler :: maybe(applied_mfa()),
+ max_length :: maybe(non_neg_integer()),
+ max_bytes :: maybe(non_neg_integer()),
+ %% whether single active consumer is on or not for this queue
+ consumer_strategy = default :: default | single_active,
+ delivery_limit :: maybe(non_neg_integer())
+ }).
+
+-record(rabbit_fifo,
+ {cfg :: #cfg{},
+ % unassigned messages
+ messages = #{} :: #{msg_in_id() => indexed_msg()},
+ % defines the lowest message in id available in the messages map
+ % that isn't a return
+ low_msg_num :: msg_in_id() | undefined,
+ % defines the next message in id to be added to the messages map
+ next_msg_num = 1 :: msg_in_id(),
+ % list of returned msg_in_ids - when checking out it picks from
+ % this list first before taking low_msg_num
+ returns = lqueue:new() :: lqueue:lqueue(prefix_msg() |
+ {msg_in_id(), indexed_msg()}),
+ % a counter of enqueues - used to trigger shadow copy points
+ enqueue_count = 0 :: non_neg_integer(),
+ % a map containing all the live processes that have ever enqueued
+ % a message to this queue as well as a cached value of the smallest
+ % ra_index of all pending enqueues
+ enqueuers = #{} :: #{pid() => #enqueuer{}},
+ % master index of all enqueue raft indexes including pending
+ % enqueues
+ % rabbit_fifo_index can be slow when calculating the smallest
+ % index when there are large gaps but should be faster than gb_trees
+ % for normal appending operations as it's backed by a map
+ ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
+ release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor,
+ ra_index(), #rabbit_fifo{}}),
+ % consumers need to reflect consumer state at time of snapshot
+ % needs to be part of snapshot
+ consumers = #{} :: #{consumer_id() => #consumer{}},
+ % consumers that require further service are queued here
+ % needs to be part of snapshot
+ service_queue = queue:new() :: queue:queue(consumer_id()),
+ %% This is a special field that is only used for snapshots
+ %% It represents the queued messages at the time the
+ %% dehydrated snapshot state was cached.
+ %% As release_cursors are only emitted for raft indexes where all
+ %% prior messages no longer contribute to the current state we can
+ %% replace all message payloads with their sizes (to be used for
+ %% overflow calculations).
+ %% This is done so that consumers are still served in a deterministic
+ %% order on recovery.
+ prefix_msgs = {[], []} :: {Return :: [msg_header()],
+ PrefixMsgs :: [msg_header()]},
+ msg_bytes_enqueue = 0 :: non_neg_integer(),
+ msg_bytes_checkout = 0 :: non_neg_integer(),
+         %% waiting consumers, one is picked when the active consumer is cancelled or dies
+ %% used only when single active consumer is on
+ waiting_consumers = [] :: [{consumer_id(), consumer()}]
+ }).
+
+-type config() :: #{name := atom(),
+ queue_resource := rabbit_types:r('queue'),
+ dead_letter_handler => applied_mfa(),
+ become_leader_handler => applied_mfa(),
+ release_cursor_interval => non_neg_integer(),
+ max_length => non_neg_integer(),
+ max_bytes => non_neg_integer(),
+ single_active_consumer_on => boolean(),
+ delivery_limit => non_neg_integer()}.
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index 555e4e2e87..95bf067539 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -43,14 +43,17 @@ register() ->
{policy_validator, <<"max-length-bytes">>},
{policy_validator, <<"queue-mode">>},
{policy_validator, <<"overflow">>},
+ {policy_validator, <<"delivery-limit">>},
{operator_policy_validator, <<"expires">>},
{operator_policy_validator, <<"message-ttl">>},
{operator_policy_validator, <<"max-length">>},
{operator_policy_validator, <<"max-length-bytes">>},
+ {operator_policy_validator, <<"delivery-limit">>},
{policy_merge_strategy, <<"expires">>},
{policy_merge_strategy, <<"message-ttl">>},
{policy_merge_strategy, <<"max-length">>},
- {policy_merge_strategy, <<"max-length-bytes">>}]],
+ {policy_merge_strategy, <<"max-length-bytes">>},
+ {policy_merge_strategy, <<"delivery-limit">>}]],
ok.
validate_policy(Terms) ->
@@ -111,9 +114,16 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) ->
validate_policy0(<<"overflow">>, <<"reject-publish">>) ->
ok;
validate_policy0(<<"overflow">>, Value) ->
- {error, "~p is not a valid overflow value", [Value]}.
+ {error, "~p is not a valid overflow value", [Value]};
+
+validate_policy0(<<"delivery-limit">>, Value)
+ when is_integer(Value), Value >= 0 ->
+ ok;
+validate_policy0(<<"delivery-limit">>, Value) ->
+ {error, "~p is not a valid delivery limit", [Value]}.
merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal);
-merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal).
+merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal);
+merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal).
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 9e73981541..e811bfffb3 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -163,13 +163,16 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
%% take the minimum value of the policy and the queue arg if present
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
+ DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
#{name => Name,
queue_resource => QName,
dead_letter_handler => dlx_mfa(Q),
become_leader_handler => {?MODULE, become_leader, [QName]},
max_length => MaxLength,
max_bytes => MaxBytes,
- single_active_consumer_on => single_active_consumer_on(Q)}.
+ single_active_consumer_on => single_active_consumer_on(Q),
+ delivery_limit => DeliveryLimit
+ }.
single_active_consumer_on(Q) ->
QArguments = amqqueue:get_arguments(Q),
@@ -680,14 +683,12 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
rabbit_misc:execute_mnesia_transaction(
fun() -> rabbit_amqqueue:update(QName, Fun) end),
ok;
- timeout ->
+ {timeout, _} ->
{error, timeout};
E ->
%% TODO should we stop the ra process here?
E
end;
- timeout ->
- {error, timeout};
E ->
E
end.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 406c02de83..0ec65c31b8 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -111,7 +111,10 @@ all_tests() ->
consume_redelivery_count,
subscribe_redelivery_count,
message_bytes_metrics,
- queue_length_limit_drop_head
+ queue_length_limit_drop_head,
+ subscribe_redelivery_limit,
+ subscribe_redelivery_policy,
+ subscribe_redelivery_limit_with_dead_letter
].
memory_tests() ->
@@ -1487,12 +1490,12 @@ subscribe_redelivery_count(Config) ->
wait_for_messages_pending_ack(Servers, RaName, 0),
subscribe(Ch, QQ, false),
- DTag = <<"x-delivery-count">>,
+ DCHeader = <<"x-delivery-count">>,
receive
{#'basic.deliver'{delivery_tag = DeliveryTag,
redelivered = false},
#amqp_msg{props = #'P_basic'{headers = H0}}} ->
- ?assertMatch(undefined, rabbit_basic:header(DTag, H0)),
+ ?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = true})
@@ -1502,7 +1505,7 @@ subscribe_redelivery_count(Config) ->
{#'basic.deliver'{delivery_tag = DeliveryTag1,
redelivered = true},
#amqp_msg{props = #'P_basic'{headers = H1}}} ->
- ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ ?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
multiple = false,
requeue = true})
@@ -1512,13 +1515,147 @@ subscribe_redelivery_count(Config) ->
{#'basic.deliver'{delivery_tag = DeliveryTag2,
redelivered = true},
#amqp_msg{props = #'P_basic'{headers = H2}}} ->
- ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
+ ?assertMatch({DCHeader, _, 2}, rabbit_basic:header(DCHeader, H2)),
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
multiple = false}),
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 0)
end.
+subscribe_redelivery_limit(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-delivery-limit">>, long, 1}])),
+
+ publish(Ch, QQ),
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QQ, false),
+
+ DCHeader = <<"x-delivery-count">>,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} ->
+ ?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} ->
+ ?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
+ receive
+ {#'basic.deliver'{redelivered = true}, #amqp_msg{}} ->
+ throw(unexpected_redelivery)
+ after 2000 ->
+ ok
+ end.
+
+subscribe_redelivery_policy(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"delivery-limit">>, <<".*">>, <<"queues">>,
+ [{<<"delivery-limit">>, 1}]),
+
+ publish(Ch, QQ),
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QQ, false),
+
+ DCHeader = <<"x-delivery-count">>,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} ->
+ ?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} ->
+ ?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
+ receive
+ {#'basic.deliver'{redelivered = true}, #amqp_msg{}} ->
+ throw(unexpected_redelivery)
+ after 2000 ->
+ ok
+ end,
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"delivery-limit">>).
+
+subscribe_redelivery_limit_with_dead_letter(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ DLX = <<"subcribe_redelivery_limit_with_dead_letter_dlx">>,
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-delivery-limit">>, long, 1},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>},
+ {<<"x-dead-letter-routing-key">>, longstr, DLX}
+ ])),
+ ?assertEqual({'queue.declare_ok', DLX, 0, 0},
+ declare(Ch, DLX, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ publish(Ch, QQ),
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QQ, false),
+
+ DCHeader = <<"x-delivery-count">>,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} ->
+ ?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} ->
+ ?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
+ wait_for_messages(Config, [[DLX, <<"1">>, <<"1">>, <<"0">>]]).
+
consume_redelivery_count(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1531,14 +1668,14 @@ consume_redelivery_count(Config) ->
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0),
- DTag = <<"x-delivery-count">>,
+ DCHeader = <<"x-delivery-count">>,
{#'basic.get_ok'{delivery_tag = DeliveryTag,
redelivered = false},
#amqp_msg{props = #'P_basic'{headers = H0}}} =
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = false}),
- ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)),
+ ?assertMatch({DCHeader, _, 0}, rabbit_basic:header(DCHeader, H0)),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = true}),
@@ -1550,7 +1687,7 @@ consume_redelivery_count(Config) ->
#amqp_msg{props = #'P_basic'{headers = H1}}} =
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = false}),
- ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ ?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
multiple = false,
requeue = true}),
@@ -1560,7 +1697,7 @@ consume_redelivery_count(Config) ->
#amqp_msg{props = #'P_basic'{headers = H2}}} =
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = false}),
- ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
+ ?assertMatch({DCHeader, _, 2}, rabbit_basic:header(DCHeader, H2)),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2,
multiple = false,
requeue = true}),
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 60402b3a7b..ceed092d0f 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -1,638 +1,1035 @@
-module(rabbit_fifo_SUITE).
+%% rabbit_fifo unit tests suite
+
-compile(export_all).
+-compile({no_auto_import, [apply/3]}).
+-export([
+ ]).
+
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
+-include_lib("ra/include/ra.hrl").
+-include_lib("rabbit/src/rabbit_fifo.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
all() ->
[
{group, tests}
].
+
+%% replicate eunit-like test resolution
all_tests() ->
- [
- basics,
- return,
- rabbit_fifo_returns_correlation,
- resends_lost_command,
- returns_after_down,
- resends_after_lost_applied,
- handles_reject_notification,
- two_quick_enqueues,
- detects_lost_delivery,
- dequeue,
- discard,
- cancel_checkout,
- credit,
- untracked_enqueue,
- flow,
- test_queries,
- duplicate_delivery,
- usage
- ].
+ [F || {F, _} <- ?MODULE:module_info(functions),
+ re:run(atom_to_list(F), "_test$") /= nomatch]
+ .
groups() ->
[
{tests, [], all_tests()}
].
-init_per_group(_, Config) ->
- PrivDir = ?config(priv_dir, Config),
- _ = application:load(ra),
- ok = application:set_env(ra, data_dir, PrivDir),
- application:ensure_all_started(ra),
- application:ensure_all_started(lg),
+init_per_suite(Config) ->
Config.
-end_per_group(_, Config) ->
- _ = application:stop(ra),
+end_per_suite(_Config) ->
+ ok.
+
+init_per_group(_Group, Config) ->
Config.
-init_per_testcase(TestCase, Config) ->
- meck:new(rabbit_quorum_queue, [passthrough]),
- meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _) -> ok end),
- meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
- fun (_, _) -> ok end),
- ra_server_sup_sup:remove_all(),
- ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
- ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
- ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)),
- [
- {cluster_name, ClusterName},
- {uid, atom_to_binary(TestCase, utf8)},
- {node_id, {TestCase, node()}},
- {uid2, atom_to_binary(ServerName2, utf8)},
- {node_id2, {ServerName2, node()}},
- {uid3, atom_to_binary(ServerName3, utf8)},
- {node_id3, {ServerName3, node()}}
- | Config].
-
-end_per_testcase(_, Config) ->
- meck:unload(),
+end_per_group(_Group, _Config) ->
+ ok.
+
+init_per_testcase(_TestCase, Config) ->
Config.
-basics(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- UId = ?config(uid, Config),
- CustomerTag = UId,
- ok = start_cluster(ClusterName, [ServerId]),
- FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, undefined, FState0),
-
- ra_log_wal:force_roll_over(ra_log_wal),
- % create segment the segment will trigger a snapshot
- timer:sleep(1000),
-
- {ok, FState2} = rabbit_fifo_client:enqueue(one, FState1),
- % process ra events
- FState3 = process_ra_event(FState2, 250),
-
- FState5 = receive
- {ra_event, From, Evt} ->
- case rabbit_fifo_client:handle_ra_event(From, Evt, FState3) of
- {internal, _AcceptedSeqs, _Actions, _FState4} ->
- exit(unexpected_internal_event);
- {{delivery, C, [{MsgId, _Msg}]}, FState4} ->
- {ok, S} = rabbit_fifo_client:settle(C, [MsgId],
- FState4),
- S
- end
- after 5000 ->
- exit(await_msg_timeout)
- end,
-
- % process settle applied notification
- FState5b = process_ra_event(FState5, 250),
- _ = ra:stop_server(ServerId),
- _ = ra:restart_server(ServerId),
-
- %% wait for leader change to notice server is up again
- receive
- {ra_event, _, {machine, leader_change}} -> ok
- after 5000 ->
- exit(leader_change_timeout)
- end,
-
- {ok, FState6} = rabbit_fifo_client:enqueue(two, FState5b),
- % process applied event
- FState6b = process_ra_event(FState6, 250),
-
- receive
- {ra_event, Frm, E} ->
- case rabbit_fifo_client:handle_ra_event(Frm, E, FState6b) of
- {internal, _, _, _FState7} ->
- exit({unexpected_internal_event, E});
- {{delivery, Ctag, [{Mid, {_, two}}]}, FState7} ->
- {ok, _S} = rabbit_fifo_client:return(Ctag, [Mid], FState7),
- ok
- end
- after 2000 ->
- exit(await_msg_timeout)
- end,
- ra:stop_server(ServerId),
- ok.
-
-return(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- ServerId2 = ?config(node_id2, Config),
- ok = start_cluster(ClusterName, [ServerId, ServerId2]),
-
- F00 = rabbit_fifo_client:init(ClusterName, [ServerId, ServerId2]),
- {ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00),
- {ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0),
- {_, _, F2} = process_ra_events(F1, 100),
- {ok, {{MsgId, _}, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
- {ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F),
-
- ra:stop_server(ServerId),
- ok.
-
-rabbit_fifo_returns_correlation(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- ok = start_cluster(ClusterName, [ServerId]),
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:enqueue(corr1, msg1, F0),
- receive
- {ra_event, Frm, E} ->
- case rabbit_fifo_client:handle_ra_event(Frm, E, F1) of
- {internal, [corr1], [], _F2} ->
- ok;
- {Del, _} ->
- exit({unexpected, Del})
- end
- after 2000 ->
- exit(await_msg_timeout)
- end,
- ra:stop_server(ServerId),
- ok.
-
-duplicate_delivery(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- ok = start_cluster(ClusterName, [ServerId]),
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
- {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
- Fun = fun Loop(S0) ->
- receive
- {ra_event, Frm, E} = Evt ->
- case rabbit_fifo_client:handle_ra_event(Frm, E, S0) of
- {internal, [corr1], [], S1} ->
- Loop(S1);
- {_Del, S1} ->
- %% repeat event delivery
- self() ! Evt,
- %% check that then next received delivery doesn't
- %% repeat or crash
- receive
- {ra_event, F, E1} ->
- case rabbit_fifo_client:handle_ra_event(F, E1, S1) of
- {internal, [], [], S2} ->
- S2
- end
- end
- end
- after 2000 ->
- exit(await_msg_timeout)
- end
- end,
- Fun(F2),
- ra:stop_server(ServerId),
- ok.
-
-usage(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- ok = start_cluster(ClusterName, [ServerId]),
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
- {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
- {ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2),
- {_, _, _} = process_ra_events(F3, 50),
- % force tick and usage stats emission
- ServerId ! tick_timeout,
- timer:sleep(50),
- Use = rabbit_fifo:usage(element(1, ServerId)),
- ra:stop_server(ServerId),
- ?assert(Use > 0.0),
- ok.
-
-resends_lost_command(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- ok = start_cluster(ClusterName, [ServerId]),
-
- ok = meck:new(ra, [passthrough]),
-
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0),
- % lose the enqueue
- meck:expect(ra, pipeline_command, fun (_, _, _) -> ok end),
- {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
- meck:unload(ra),
- {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
- {_, _, F4} = process_ra_events(F3, 500),
- {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
- {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
- {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
- ra:stop_server(ServerId),
- ok.
-
-two_quick_enqueues(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- ok = start_cluster(ClusterName, [ServerId]),
-
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- F1 = element(2, rabbit_fifo_client:enqueue(msg1, F0)),
- {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
- _ = process_ra_events(F2, 500),
- ra:stop_server(ServerId),
- ok.
-
-detects_lost_delivery(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- ok = start_cluster(ClusterName, [ServerId]),
-
- F000 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F00} = rabbit_fifo_client:enqueue(msg1, F000),
- {_, _, F0} = process_ra_events(F00, 100),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
- {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
- {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
- % lose first delivery
- receive
- {ra_event, _, {machine, {delivery, _, [{_, {_, msg1}}]}}} ->
- ok
- after 500 ->
- exit(await_delivery_timeout)
- end,
-
- % assert three deliveries were received
- {[_, _, _], _, _} = process_ra_events(F3, 500),
- ra:stop_server(ServerId),
- ok.
-
-returns_after_down(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- ok = start_cluster(ClusterName, [ServerId]),
-
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0),
- {_, _, F2} = process_ra_events(F1, 500),
- % start a customer in a separate processes
- % that exits after checkout
+end_per_testcase(_TestCase, _Config) ->
+ ok.
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+-define(ASSERT_EFF(EfxPat, Effects),
+ ?ASSERT_EFF(EfxPat, true, Effects)).
+
+-define(ASSERT_EFF(EfxPat, Guard, Effects),
+ ?assert(lists:any(fun (EfxPat) when Guard -> true;
+ (_) -> false
+ end, Effects))).
+
+-define(ASSERT_NO_EFF(EfxPat, Effects),
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
+
+-define(assertNoEffect(EfxPat, Effects),
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
+
+test_init(Name) ->
+ init(#{name => Name,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(Name, utf8)),
+ release_cursor_interval => 0}).
+
+enq_enq_checkout_test(_) ->
+ Cid = {<<"enq_enq_checkout_test">>, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ {_State3, _, Effects} =
+ apply(meta(3),
+ rabbit_fifo:make_checkout(Cid, {once, 2, simple_prefetch}, #{}),
+ State2),
+ ?ASSERT_EFF({monitor, _, _}, Effects),
+ ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects),
+ ok.
+
+credit_enq_enq_checkout_settled_credit_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ {State3, _, Effects} =
+ apply(meta(3), rabbit_fifo:make_checkout(Cid, {auto, 1, credited}, #{}), State2),
+ ?ASSERT_EFF({monitor, _, _}, Effects),
+ Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true;
+ (_) -> false
+ end, Effects),
+ ?assertEqual(1, length(Deliveries)),
+ %% settle the delivery this should _not_ result in further messages being
+ %% delivered
+ {State4, SettledEffects} = settle(Cid, 4, 1, State3),
+ ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) ->
+ true;
+ (_) -> false
+ end, SettledEffects)),
+ %% granting credit (3) should deliver the second msg if the receivers
+ %% delivery count is (1)
+ {State5, CreditEffects} = credit(Cid, 5, 1, 1, false, State4),
+ % ?debugFmt("CreditEffects ~p ~n~p", [CreditEffects, State4]),
+ ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, CreditEffects),
+ {_State6, FinalEffects} = enq(6, 3, third, State5),
+ ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) ->
+ true;
+ (_) -> false
+ end, FinalEffects)),
+ ok.
+
+credit_with_drained_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ State0 = test_init(test),
+ %% checkout with a single credit
+ {State1, _, _} =
+ apply(meta(1), rabbit_fifo:make_checkout(Cid, {auto, 1, credited},#{}),
+ State0),
+ ?assertMatch(#rabbit_fifo{consumers = #{Cid := #consumer{credit = 1,
+ delivery_count = 0}}},
+ State1),
+ {State, Result, _} =
+ apply(meta(3), rabbit_fifo:make_credit(Cid, 0, 5, true), State1),
+ ?assertMatch(#rabbit_fifo{consumers = #{Cid := #consumer{credit = 0,
+ delivery_count = 5}}},
+ State),
+ ?assertEqual({multi, [{send_credit_reply, 0},
+ {send_drained, [{?FUNCTION_NAME, 5}]}]},
+ Result),
+ ok.
+
+credit_and_drain_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ %% checkout without any initial credit (like AMQP 1.0 would)
+ {State3, _, CheckEffs} =
+ apply(meta(3), rabbit_fifo:make_checkout(Cid, {auto, 0, credited}, #{}),
+ State2),
+
+ ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs),
+ {State4, {multi, [{send_credit_reply, 0},
+ {send_drained, [{?FUNCTION_NAME, 2}]}]},
+ Effects} = apply(meta(4), rabbit_fifo:make_credit(Cid, 4, 0, true), State3),
+ ?assertMatch(#rabbit_fifo{consumers = #{Cid := #consumer{credit = 0,
+ delivery_count = 4}}},
+ State4),
+
+ ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}},
+ {_, {_, second}}]}, _}, Effects),
+ {_State5, EnqEffs} = enq(5, 2, third, State4),
+ ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, EnqEffs),
+ ok.
+
+
+
+enq_enq_deq_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ % get returns a reply value
+ NumReady = 1,
+ {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} =
+ apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ State2),
+ ok.
+
+enq_enq_deq_deq_settle_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ % get returns a reply value
+ {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} =
+ apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ State2),
+ {_State4, {dequeue, empty}} =
+ apply(meta(4), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ State3),
+ ok.
+
+enq_enq_checkout_get_settled_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ % get returns a reply value
+ {_State2, {dequeue, {0, {_, first}}, _}, _Effs} =
+ apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}),
+ State1),
+ ok.
+
+checkout_get_empty_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ State = test_init(test),
+ {_State2, {dequeue, empty}} =
+ apply(meta(1), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State),
+ ok.
+
+untracked_enq_deq_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ State0 = test_init(test),
+ {State1, _, _} = apply(meta(1),
+ rabbit_fifo:make_enqueue(undefined, undefined, first),
+ State0),
+ {_State2, {dequeue, {0, {_, first}}, _}, _} =
+ apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State1),
+ ok.
+
+release_cursor_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ {State3, _} = check(Cid, 3, 10, State2),
+ % no release cursor effect at this point
+ {State4, _} = settle(Cid, 4, 1, State3),
+ {_Final, Effects1} = settle(Cid, 5, 0, State4),
+ % empty queue forwards release cursor all the way
+ ?ASSERT_EFF({release_cursor, 5, _}, Effects1),
+ ok.
+
+checkout_enq_settle_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)),
+ {State2, Effects0} = enq(2, 1, first, State1),
+ ?ASSERT_EFF({send_msg, _,
+ {delivery, ?FUNCTION_NAME,
+ [{0, {_, first}}]}, _},
+ Effects0),
+ {State3, [_Inactive]} = enq(3, 2, second, State2),
+ {_, _Effects} = settle(Cid, 4, 0, State3),
+ % the release cursor is the smallest raft index that does not
+ % contribute to the state of the application
+ % ?ASSERT_EFF({release_cursor, 2, _}, Effects),
+ ok.
+
+out_of_order_enqueue_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
+ {State2, Effects2} = enq(2, 1, first, State1),
+ ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
+ % assert monitor was set up
+ ?ASSERT_EFF({monitor, _, _}, Effects2),
+ % enqueue seq num 3 and 4 before 2
+ {State3, Effects3} = enq(3, 3, third, State2),
+ ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects3),
+ {State4, Effects4} = enq(4, 4, fourth, State3),
+ % assert no further deliveries where made
+ ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects4),
+ {_State5, Effects5} = enq(5, 2, second, State4),
+ % assert two deliveries were now made
+ ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, second}},
+ {_, {_, third}},
+ {_, {_, fourth}}]}, _},
+ Effects5),
+ ok.
+
+out_of_order_first_enqueue_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = check_n(Cid, 5, 5, test_init(test)),
+ {_State2, Effects2} = enq(2, 10, first, State1),
+ ?ASSERT_EFF({monitor, process, _}, Effects2),
+ ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _},
+ Effects2),
+ ok.
+
+duplicate_enqueue_test(_) ->
+ Cid = {<<"duplicate_enqueue_test">>, self()},
+ {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
+ {State2, Effects2} = enq(2, 1, first, State1),
+ ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
+ {_State3, Effects3} = enq(3, 1, first, State2),
+ ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects3),
+ ok.
+
+return_non_existent_test(_) ->
+ Cid = {<<"cid">>, self()},
+ {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)),
+ % return non-existent
+ {_State2, _} = apply(meta(3), rabbit_fifo:make_return(Cid, [99]), State0),
+ ok.
+
+return_checked_out_test(_) ->
+ Cid = {<<"cid">>, self()},
+ {State0, [_, _]} = enq(1, 1, first, test_init(test)),
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
+ {aux, active} | _ ]} = check_auto(Cid, 2, State0),
+ % returning immediately checks out the same message again
+ {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, ra_event},
+ {aux, active}]} =
+ apply(meta(3), rabbit_fifo:make_return(Cid, [MsgId]), State1),
+ ok.
+
+return_checked_out_limit_test(_) ->
+ Cid = {<<"cid">>, self()},
+ Init = init(#{name => test,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(test, utf8)),
+ release_cursor_interval => 0,
+ delivery_limit => 1}),
+ {State0, [_, _]} = enq(1, 1, first, Init),
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
+ {aux, active} | _ ]} = check_auto(Cid, 2, State0),
+ % returning immediately checks out the same message again
+ {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, ra_event},
+ {aux, active}]} =
+ apply(meta(3), rabbit_fifo:make_return(Cid, [MsgId]), State1),
+ {#rabbit_fifo{ra_indexes = RaIdxs}, ok, []} =
+ apply(meta(4), rabbit_fifo:make_return(Cid, [MsgId2]), State2),
+ ?assertEqual(0, rabbit_fifo_index:size(RaIdxs)),
+ ok.
+
+return_auto_checked_out_test(_) ->
+ Cid = {<<"cid">>, self()},
+ {State00, [_, _]} = enq(1, 1, first, test_init(test)),
+ {State0, [_]} = enq(2, 2, second, State00),
+ % it first active then inactive as the consumer took on but cannot take
+ % any more
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
+ {aux, active},
+ {aux, inactive}
+ ]} = check_auto(Cid, 2, State0),
+ % return should include another delivery
+ {_State2, _, Effects} = apply(meta(3), rabbit_fifo:make_return(Cid, [MsgId]), State1),
+ ?ASSERT_EFF({send_msg, _,
+ {delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _},
+ Effects),
+ ok.
+
+cancelled_checkout_out_test(_) ->
+ Cid = {<<"cid">>, self()},
+ {State00, [_, _]} = enq(1, 1, first, test_init(test)),
+ {State0, [_]} = enq(2, 2, second, State00),
+ {State1, _} = check_auto(Cid, 2, State0),
+ % cancelled checkout should not return pending messages to queue
+ {State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
+ ?assertEqual(1, maps:size(State2#rabbit_fifo.messages)),
+ ?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)),
+
+ {State3, {dequeue, empty}} =
+ apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State2),
+ %% settle
+ {State4, ok, _} =
+ apply(meta(4), rabbit_fifo:make_settle(Cid, [0]), State3),
+
+ {_State, {dequeue, {_, {_, second}}, _}, _} =
+ apply(meta(5), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State4),
+ ok.
+
+down_with_noproc_consumer_returns_unsettled_test(_) ->
+ Cid = {<<"down_consumer_returns_unsettled_test">>, self()},
+ {State0, [_, _]} = enq(1, 1, second, test_init(test)),
+ {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0),
+ {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1),
+ {_State, Effects} = check(Cid, 4, State2),
+ ?ASSERT_EFF({monitor, process, _}, Effects),
+ ok.
+
+down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) ->
+ Pid = spawn(fun() -> ok end),
+ Cid = {<<"down_with_noconnect">>, Pid},
Self = self(),
- _Pid = spawn(fun () ->
- F = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10,
- undefined, F),
- Self ! checkout_done
- end),
- receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
- timer:sleep(1000),
- % message should be available for dequeue
- {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
- ra:stop_server(ServerId),
- ok.
-
-resends_after_lost_applied(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- ok = start_cluster(ClusterName, [ServerId]),
-
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {_, _, F1} = process_ra_events(element(2, rabbit_fifo_client:enqueue(msg1, F0)),
- 500),
- {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
- % lose an applied event
- receive
- {ra_event, _, {applied, _}} ->
- ok
- after 500 ->
- exit(await_ra_event_timeout)
- end,
- % send another message
- {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
- {_, _, F4} = process_ra_events(F3, 500),
- {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
- {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
- {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
- ra:stop_server(ServerId),
- ok.
-
-handles_reject_notification(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId1 = ?config(node_id, Config),
- ServerId2 = ?config(node_id2, Config),
- UId1 = ?config(uid, Config),
- CId = {UId1, self()},
-
- ok = start_cluster(ClusterName, [ServerId1, ServerId2]),
- _ = ra:process_command(ServerId1,
- rabbit_fifo:make_checkout(
- CId,
- {auto, 10, simple_prefetch},
- #{})),
- % reverse order - should try the first node in the list first
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId2, ServerId1]),
- {ok, F1} = rabbit_fifo_client:enqueue(one, F0),
-
- timer:sleep(500),
-
- % the applied notification
- _F2 = process_ra_event(F1, 250),
- ra:stop_server(ServerId1),
- ra:stop_server(ServerId2),
- ok.
-
-discard(Config) ->
- PrivDir = ?config(priv_dir, Config),
- ServerId = ?config(node_id, Config),
- UId = ?config(uid, Config),
- ClusterName = ?config(cluster_name, Config),
- Conf = #{cluster_name => ClusterName#resource.name,
- id => ServerId,
- uid => UId,
- log_init_args => #{data_dir => PrivDir, uid => UId},
- initial_member => [],
- machine => {module, rabbit_fifo,
- #{queue_resource => discard,
- dead_letter_handler =>
- {?MODULE, dead_letter_handler, [self()]}}}},
- _ = ra:start_server(Conf),
- ok = ra:trigger_election(ServerId),
- _ = ra:members(ServerId),
-
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
- {ok, F2} = rabbit_fifo_client:enqueue(msg1, F1),
- F3 = discard_next_delivery(F2, 500),
- {ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3),
- receive
- {dead_letter, Letters} ->
- [{_, msg1}] = Letters,
- ok
- after 500 ->
- exit(dead_letter_timeout)
- end,
- ra:stop_server(ServerId),
- ok.
-
-cancel_checkout(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- ok = start_cluster(ClusterName, [ServerId]),
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
- {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
- {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1),
- {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end),
- {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
- {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4),
- {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5),
- ok.
-
-credit(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- ok = start_cluster(ClusterName, [ServerId]),
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
- {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
- {ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
- {_, _, F3} = process_ra_events(F2, [], 250),
- %% checkout with 0 prefetch
- {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, undefined, F3),
- %% assert no deliveries
- {_, _, F5} = process_ra_events0(F4, [], [], 250,
- fun
- (D, _) -> error({unexpected_delivery, D})
- end),
- %% provide some credit
- {ok, F6} = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5),
- {[{_, {_, m1}}], [{send_credit_reply, _}], F7} =
- process_ra_events(F6, [], 250),
-
- %% credit and drain
- {ok, F8} = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7),
- {[{_, {_, m2}}], [{send_credit_reply, _}, {send_drained, _}], F9} =
- process_ra_events(F8, [], 250),
- flush(),
-
- %% enqueue another message - at this point the consumer credit should be
- %% all used up due to the drain
- {ok, F10} = rabbit_fifo_client:enqueue(m3, F9),
- %% assert no deliveries
- {_, _, F11} = process_ra_events0(F10, [], [], 250,
- fun
- (D, _) -> error({unexpected_delivery, D})
- end),
- %% credit again and receive the last message
- {ok, F12} = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11),
- {[{_, {_, m3}}], _, _} = process_ra_events(F12, [], 250),
- ok.
-
-untracked_enqueue(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- ok = start_cluster(ClusterName, [ServerId]),
-
- ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1),
- timer:sleep(100),
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
- ra:stop_server(ServerId),
- ok.
-
-
-flow(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- ok = start_cluster(ClusterName, [ServerId]),
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 3),
- {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
- {ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
- {ok, F3} = rabbit_fifo_client:enqueue(m3, F2),
- {slow, F4} = rabbit_fifo_client:enqueue(m4, F3),
- {_, _, F5} = process_ra_events(F4, 500),
- {ok, _} = rabbit_fifo_client:enqueue(m5, F5),
- ra:stop_server(ServerId),
- ok.
-
-test_queries(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- ok = start_cluster(ClusterName, [ServerId]),
- P = spawn(fun () ->
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
- {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
- {ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
- process_ra_events(F2, 100),
- receive stop -> ok end
- end),
- F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
- {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0),
- {ok, {_, Ready}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_messages_ready/1),
- ?assertEqual(1, Ready),
- {ok, {_, Checked}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_messages_checked_out/1),
- ?assertEqual(1, Checked),
- {ok, {_, Processes}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_processes/1),
- ?assertEqual(2, length(Processes)),
- P ! stop,
- ra:stop_server(ServerId),
- ok.
-
-dead_letter_handler(Pid, Msgs) ->
- Pid ! {dead_letter, Msgs}.
-
-dequeue(Config) ->
- ClusterName = ?config(cluster_name, Config),
- ServerId = ?config(node_id, Config),
- UId = ?config(uid, Config),
- Tag = UId,
- ok = start_cluster(ClusterName, [ServerId]),
- F1 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1),
- {ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b),
- {_, _, F2} = process_ra_events(F2_, 100),
-
- {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
- {ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3),
- {_, _, F4} = process_ra_events(F4_, 100),
- {ok, {{MsgId, {_, msg2}}, _}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
- {ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5),
- ra:stop_server(ServerId),
- ok.
-
-enq_deq_n(N, F0) ->
- enq_deq_n(N, F0, []).
-
-enq_deq_n(0, F0, Acc) ->
- {_, _, F} = process_ra_events(F0, 100),
- {F, Acc};
-enq_deq_n(N, F, Acc) ->
- {ok, F1} = rabbit_fifo_client:enqueue(N, F),
- {_, _, F2} = process_ra_events(F1, 10),
- {ok, {{_, {_, Deq}}, _}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2),
-
- {_, _, F4} = process_ra_events(F3, 5),
- enq_deq_n(N-1, F4, [Deq | Acc]).
-
-conf(ClusterName, UId, ServerId, _, Peers) ->
- #{cluster_name => ClusterName,
- id => ServerId,
- uid => UId,
- log_init_args => #{uid => UId},
- initial_members => Peers,
- machine => {module, rabbit_fifo, #{}}}.
-
-process_ra_event(State, Wait) ->
- receive
- {ra_event, From, Evt} ->
- ct:pal("processed ra event ~p~n", [Evt]),
- {internal, _, _, S} =
- rabbit_fifo_client:handle_ra_event(From, Evt, State),
- S
- after Wait ->
- exit(ra_event_timeout)
- end.
-
-process_ra_events(State0, Wait) ->
- process_ra_events(State0, [], Wait).
-
-process_ra_events(State, Acc, Wait) ->
- DeliveryFun = fun ({delivery, Tag, Msgs}, S) ->
- MsgIds = [element(1, M) || M <- Msgs],
- {ok, S2} = rabbit_fifo_client:settle(Tag, MsgIds, S),
- S2
+ Node = node(Pid),
+ {State0, Effects0} = enq(1, 1, second, test_init(test)),
+ ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0),
+ {State1, Effects1} = check_auto(Cid, 2, State0),
+ #consumer{credit = 0} = maps:get(Cid, State1#rabbit_fifo.consumers),
+ ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1),
+ % monitor both enqueuer and consumer
+ % because we received a noconnection we now need to monitor the node
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
+ #consumer{credit = 1} = maps:get(Cid, State2a#rabbit_fifo.consumers),
+ %% validate consumer has credit
+ {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a),
+ ?ASSERT_EFF({monitor, node, _}, Effects2),
+ ?assertNoEffect({demonitor, process, _}, Effects2),
+ % when the node comes up we need to retry the process monitors for the
+ % disconnected processes
+ {_State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2),
+ % try to re-monitor the suspect processes
+ ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3),
+ ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
+ ok.
+
+down_with_noconnection_returns_unack_test(_) ->
+ Pid = spawn(fun() -> ok end),
+ Cid = {<<"down_with_noconnect">>, Pid},
+ {State0, _} = enq(1, 1, second, test_init(test)),
+ ?assertEqual(1, maps:size(State0#rabbit_fifo.messages)),
+ ?assertEqual(0, lqueue:len(State0#rabbit_fifo.returns)),
+ {State1, {_, _}} = deq(2, Cid, unsettled, State0),
+ ?assertEqual(0, maps:size(State1#rabbit_fifo.messages)),
+ ?assertEqual(0, lqueue:len(State1#rabbit_fifo.returns)),
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
+ ?assertEqual(0, maps:size(State2a#rabbit_fifo.messages)),
+ ?assertEqual(1, lqueue:len(State2a#rabbit_fifo.returns)),
+ ok.
+
+down_with_noproc_enqueuer_is_cleaned_up_test(_) ->
+ State00 = test_init(test),
+ Pid = spawn(fun() -> ok end),
+ {State0, _, Effects0} = apply(meta(1), rabbit_fifo:make_enqueue(Pid, 1, first), State00),
+ ?ASSERT_EFF({monitor, process, _}, Effects0),
+ {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0),
+ % ensure there are no enqueuers
+ ?assert(0 =:= maps:size(State1#rabbit_fifo.enqueuers)),
+ ok.
+
+discarded_message_without_dead_letter_handler_is_removed_test(_) ->
+ Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
+ {State0, [_, _]} = enq(1, 1, first, test_init(test)),
+ {State1, Effects1} = check_n(Cid, 2, 10, State0),
+ ?ASSERT_EFF({send_msg, _,
+ {delivery, _, [{0, {#{}, first}}]}, _},
+ Effects1),
+ {_State2, _, Effects2} = apply(meta(1), rabbit_fifo:make_discard(Cid, [0]), State1),
+ ?assertNoEffect({send_msg, _,
+ {delivery, _, [{0, {#{}, first}}]}, _},
+ Effects2),
+ ok.
+
+discarded_message_with_dead_letter_handler_emits_mod_call_effect_test(_) ->
+ Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
+ State00 = init(#{name => test,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ dead_letter_handler =>
+ {somemod, somefun, [somearg]}}),
+ {State0, [_, _]} = enq(1, 1, first, State00),
+ {State1, Effects1} = check_n(Cid, 2, 10, State0),
+ ?ASSERT_EFF({send_msg, _,
+ {delivery, _, [{0, {#{}, first}}]}, _},
+ Effects1),
+ {_State2, _, Effects2} = apply(meta(1), rabbit_fifo:make_discard(Cid, [0]), State1),
+ % assert mod call effect with appended reason and message
+ ?ASSERT_EFF({mod_call, somemod, somefun, [somearg, [{rejected, first}]]},
+ Effects2),
+ ok.
+
+tick_test(_) ->
+ Cid = {<<"c">>, self()},
+ Cid2 = {<<"c2">>, self()},
+ {S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)),
+ {S1, _} = enq(2, 2, <<"snd">>, S0),
+ {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1),
+ {S3, {_, _}} = deq(4, Cid2, unsettled, S2),
+ {S4, _, _} = apply(meta(5), rabbit_fifo:make_return(Cid, [MsgId]), S3),
+
+ [{mod_call, _, _,
+ [#resource{},
+ {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = rabbit_fifo:tick(1, S4),
+ ok.
+
+
+delivery_query_returns_deliveries_test(_) ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ rabbit_fifo:make_checkout(Cid, {auto, 5, simple_prefetch}, #{}),
+ rabbit_fifo:make_enqueue(self(), 1, one),
+ rabbit_fifo:make_enqueue(self(), 2, two),
+ rabbit_fifo:make_enqueue(self(), 3, tre),
+ rabbit_fifo:make_enqueue(self(), 4, for)
+ ],
+ Indexes = lists:seq(1, length(Commands)),
+ Entries = lists:zip(Indexes, Commands),
+ {State, _Effects} = run_log(test_init(help), Entries),
+ % 3 deliveries are returned
+ [{0, {#{}, one}}] = rabbit_fifo:get_checked_out(Cid, 0, 0, State),
+ [_, _, _] = rabbit_fifo:get_checked_out(Cid, 1, 3, State),
+ ok.
+
+pending_enqueue_is_enqueued_on_down_test(_) ->
+ Cid = {<<"cid">>, self()},
+ Pid = self(),
+ {State0, _} = enq(1, 2, first, test_init(test)),
+ {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0),
+ {_State2, {dequeue, {0, {_, first}}, 0}, _} =
+ apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State1),
+ ok.
+
+duplicate_delivery_test(_) ->
+ {State0, _} = enq(1, 1, first, test_init(test)),
+ {#rabbit_fifo{ra_indexes = RaIdxs,
+ messages = Messages}, _} = enq(2, 1, first, State0),
+ ?assertEqual(1, rabbit_fifo_index:size(RaIdxs)),
+ ?assertEqual(1, maps:size(Messages)),
+ ok.
+
+state_enter_test(_) ->
+ S0 = init(#{name => the_name,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ become_leader_handler => {m, f, [a]}}),
+ [{mod_call, m, f, [a, the_name]}] = rabbit_fifo:state_enter(leader, S0),
+ ok.
+
+state_enter_monitors_and_notifications_test(_) ->
+ Oth = spawn(fun () -> ok end),
+ {State0, _} = enq(1, 1, first, test_init(test)),
+ Cid = {<<"adf">>, self()},
+ OthCid = {<<"oth">>, Oth},
+ {State1, _} = check(Cid, 2, State0),
+ {State, _} = check(OthCid, 3, State1),
+ Self = self(),
+ Effects = rabbit_fifo:state_enter(leader, State),
+
+ %% monitor all enqueuers and consumers
+ [{monitor, process, Self},
+ {monitor, process, Oth}] =
+ lists:filter(fun ({monitor, process, _}) -> true;
+ (_) -> false
+ end, Effects),
+ [{send_msg, Self, leader_change, ra_event},
+ {send_msg, Oth, leader_change, ra_event}] =
+ lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true;
+ (_) -> false
+ end, Effects),
+ ?ASSERT_EFF({monitor, process, _}, Effects),
+ ok.
+
+purge_test(_) ->
+ Cid = {<<"purge_test">>, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, {purge, 1}, _} = apply(meta(2), rabbit_fifo:make_purge(), State1),
+ {State3, _} = enq(3, 2, second, State2),
+ % get returns a reply value
+ {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} =
+ apply(meta(4), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State3),
+ ok.
+
+purge_with_checkout_test(_) ->
+ Cid = {<<"purge_test">>, self()},
+ {State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)),
+ {State1, _} = enq(2, 1, <<"first">>, State0),
+ {State2, _} = enq(3, 2, <<"second">>, State1),
+ %% assert message bytes are non zero
+ ?assert(State2#rabbit_fifo.msg_bytes_checkout > 0),
+ ?assert(State2#rabbit_fifo.msg_bytes_enqueue > 0),
+ {State3, {purge, 1}, _} = apply(meta(2), rabbit_fifo:make_purge(), State2),
+ ?assert(State2#rabbit_fifo.msg_bytes_checkout > 0),
+ ?assertEqual(0, State3#rabbit_fifo.msg_bytes_enqueue),
+ ?assertEqual(1, rabbit_fifo_index:size(State3#rabbit_fifo.ra_indexes)),
+ #consumer{checked_out = Checked} = maps:get(Cid, State3#rabbit_fifo.consumers),
+ ?assertEqual(1, maps:size(Checked)),
+ ok.
+
+down_returns_checked_out_in_order_test(_) ->
+ S0 = test_init(?FUNCTION_NAME),
+ %% enqueue 100
+ S1 = lists:foldl(fun (Num, FS0) ->
+ {FS, _} = enq(Num, Num, Num, FS0),
+ FS
+ end, S0, lists:seq(1, 100)),
+ ?assertEqual(100, maps:size(S1#rabbit_fifo.messages)),
+ Cid = {<<"cid">>, self()},
+ {S2, _} = check(Cid, 101, 1000, S1),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers),
+ ?assertEqual(100, maps:size(Checked)),
+ %% simulate down
+ {S, _, _} = apply(meta(102), {down, self(), noproc}, S2),
+ Returns = lqueue:to_list(S#rabbit_fifo.returns),
+ ?assertEqual(100, length(Returns)),
+ %% validate returns are in order
+ ?assertEqual(lists:sort(Returns), Returns),
+ ok.
+
+single_active_consumer_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ ?assertEqual(single_active, State0#rabbit_fifo.cfg#cfg.consumer_strategy),
+ ?assertEqual(0, map_size(State0#rabbit_fifo.consumers)),
+
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ meta(1),
+ make_checkout({CTag, self()},
+ {once, 1, simple_prefetch},
+ #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+
+ % the first registered consumer is the active one, the others are waiting
+ ?assertEqual(1, map_size(State1#rabbit_fifo.consumers)),
+ ?assert(maps:is_key({<<"ctag1">>, self()}, State1#rabbit_fifo.consumers)),
+ ?assertEqual(3, length(State1#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)),
+
+ % cancelling a waiting consumer
+ {State2, _, Effects1} = apply(meta(2),
+ make_checkout({<<"ctag3">>, self()},
+ cancel, #{}), State1),
+ % the active consumer should still be in place
+ ?assertEqual(1, map_size(State2#rabbit_fifo.consumers)),
+ ?assert(maps:is_key({<<"ctag1">>, self()}, State2#rabbit_fifo.consumers)),
+ % the cancelled consumer has been removed from waiting consumers
+ ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#rabbit_fifo.waiting_consumers)),
+ % there are some effects to unregister the consumer
+ ?assertEqual(1, length(Effects1)),
+
+ % cancelling the active consumer
+ {State3, _, Effects2} = apply(meta(3),
+ make_checkout({<<"ctag1">>, self()},
+ cancel, #{}),
+ State2),
+ % the second registered consumer is now the active one
+ ?assertEqual(1, map_size(State3#rabbit_fifo.consumers)),
+ ?assert(maps:is_key({<<"ctag2">>, self()}, State3#rabbit_fifo.consumers)),
+ % the new active consumer is no longer in the waiting list
+ ?assertEqual(1, length(State3#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#rabbit_fifo.waiting_consumers)),
+ % there are some effects to unregister the consumer and to update the new active one (metrics)
+ ?assertEqual(2, length(Effects2)),
+
+ % cancelling the active consumer
+ {State4, _, Effects3} = apply(meta(4),
+ make_checkout({<<"ctag2">>, self()},
+ cancel, #{}),
+ State3),
+ % the last waiting consumer became the active one
+ ?assertEqual(1, map_size(State4#rabbit_fifo.consumers)),
+ ?assert(maps:is_key({<<"ctag4">>, self()}, State4#rabbit_fifo.consumers)),
+ % the waiting consumer list is now empty
+ ?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)),
+ % there are some effects to unregister the consumer and to update the new active one (metrics)
+ ?assertEqual(2, length(Effects3)),
+
+ % cancelling the last consumer
+ {State5, _, Effects4} = apply(meta(5),
+ make_checkout({<<"ctag4">>, self()},
+ cancel, #{}),
+ State4),
+ % no active consumer anymore
+ ?assertEqual(0, map_size(State5#rabbit_fifo.consumers)),
+ % still nothing in the waiting list
+ ?assertEqual(0, length(State5#rabbit_fifo.waiting_consumers)),
+ % there is an effect to unregister the consumer + queue inactive effect
+ ?assertEqual(1 + 1, length(Effects4)),
+
+ ok.
+
+single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{index => 1},
+ make_checkout({CTag, ChannelId}, {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
end,
- process_ra_events0(State, Acc, [], Wait, DeliveryFun).
-
-process_ra_events0(State0, Acc, Actions0, Wait, DeliveryFun) ->
- receive
- {ra_event, From, Evt} ->
- case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
- {internal, _, Actions, State} ->
- process_ra_events0(State, Acc, Actions0 ++ Actions,
- Wait, DeliveryFun);
- {{delivery, _Tag, Msgs} = Del, State1} ->
- State = DeliveryFun(Del, State1),
- process_ra_events0(State, Acc ++ Msgs, Actions0, Wait, DeliveryFun);
- eol ->
- eol
- end
- after Wait ->
- {Acc, Actions0, State0}
- end.
-
-discard_next_delivery(State0, Wait) ->
- receive
- {ra_event, From, Evt} ->
- case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
- {internal, _, _Actions, State} ->
- discard_next_delivery(State, Wait);
- {{delivery, Tag, Msgs}, State1} ->
- MsgIds = [element(1, M) || M <- Msgs],
- {ok, State} = rabbit_fifo_client:discard(Tag, MsgIds,
- State1),
- State
- end
- after Wait ->
- State0
- end.
-
-return_next_delivery(State0, Wait) ->
- receive
- {ra_event, From, Evt} ->
- case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
- {internal, _, _, State} ->
- return_next_delivery(State, Wait);
- {{delivery, Tag, Msgs}, State1} ->
- MsgIds = [element(1, M) || M <- Msgs],
- {ok, State} = rabbit_fifo_client:return(Tag, MsgIds,
- State1),
- State
- end
- after Wait ->
- State0
- end.
-
-validate_process_down(Name, 0) ->
- exit({process_not_down, Name});
-validate_process_down(Name, Num) ->
- case whereis(Name) of
- undefined ->
- ok;
- _ ->
- timer:sleep(100),
- validate_process_down(Name, Num-1)
- end.
-
-start_cluster(ClusterName, ServerIds, RaFifoConfig) ->
- {ok, Started, _} = ra:start_cluster(ClusterName#resource.name,
- {module, rabbit_fifo, RaFifoConfig},
- ServerIds),
- ?assertEqual(length(Started), length(ServerIds)),
- ok.
-
-start_cluster(ClusterName, ServerIds) ->
- start_cluster(ClusterName, ServerIds, #{name => some_name,
- queue_resource => ClusterName}).
-
-flush() ->
- receive
- Msg ->
- ct:pal("flushed: ~w~n", [Msg]),
- flush()
- after 10 ->
- ok
- end.
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ % the channel of the active consumer goes down
+ {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1),
+ % fell back to another consumer
+ ?assertEqual(1, map_size(State2#rabbit_fifo.consumers)),
+ % there are still waiting consumers
+ ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)),
+ % effects to unregister the consumer and
+ % to update the new active one (metrics) are there
+ ?assertEqual(2, length(Effects)),
+
+ % the channel of the active consumer and a waiting consumer goes down
+ {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2),
+ % fell back to another consumer
+ ?assertEqual(1, map_size(State3#rabbit_fifo.consumers)),
+ % no more waiting consumer
+ ?assertEqual(0, length(State3#rabbit_fifo.waiting_consumers)),
+ % effects to cancel both consumers of this channel + effect to update the new active one (metrics)
+ ?assertEqual(3, length(Effects2)),
+
+ % the last channel goes down
+ {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
+ % no more consumers
+ ?assertEqual(0, map_size(State4#rabbit_fifo.consumers)),
+ ?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)),
+ % there is an effect to unregister the consumer + queue inactive effect
+ ?assertEqual(1 + 1, length(Effects3)),
+
+ ok.
+
+single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnection_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ Meta = #{index => 1},
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ Meta,
+ make_checkout({CTag, self()},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+
+ % simulate node goes down
+ {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1),
+
+ % all the waiting consumers should be suspected down
+ ?assertEqual(3, length(State2#rabbit_fifo.waiting_consumers)),
+ lists:foreach(fun({_, #consumer{status = Status}}) ->
+ ?assert(Status == suspected_down)
+ end, State2#rabbit_fifo.waiting_consumers),
+
+ % simulate node goes back up
+ {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2),
+
+ % all the waiting consumers should be un-suspected
+ ?assertEqual(3, length(State3#rabbit_fifo.waiting_consumers)),
+ lists:foreach(fun({_, #consumer{status = Status}}) ->
+ ?assert(Status /= suspected_down)
+ end, State3#rabbit_fifo.waiting_consumers),
+
+ ok.
+
+single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ Meta = #{index => 1},
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ Meta,
+ make_checkout({CTag, ChannelId},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ Effects = rabbit_fifo:state_enter(leader, State1),
+ % 2 effects for each consumer process (channel process), 1 effect for the node
+ ?assertEqual(2 * 3 + 1, length(Effects)).
+
+single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ Meta = #{index => 1},
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ Meta,
+ make_checkout({CTag, ChannelId},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ Effects = rabbit_fifo:state_enter(eol, State1),
+ % 1 effect for each consumer process (channel process)
+ ?assertEqual(3, length(Effects)).
+
+query_consumers_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => false}),
+
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ #{index => 1},
+ make_checkout({CTag, self()},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ Consumers0 = State1#rabbit_fifo.consumers,
+ Consumer = maps:get({<<"ctag2">>, self()}, Consumers0),
+ Consumers1 = maps:put({<<"ctag2">>, self()},
+ Consumer#consumer{status = suspected_down}, Consumers0),
+ State2 = State1#rabbit_fifo{consumers = Consumers1},
+
+ ?assertEqual(4, rabbit_fifo:query_consumer_count(State2)),
+ Consumers2 = rabbit_fifo:query_consumers(State2),
+ ?assertEqual(4, maps:size(Consumers2)),
+ maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid),
+ case Tag of
+ <<"ctag2">> ->
+ ?assertNot(Active),
+ ?assertEqual(suspected_down, ActivityStatus);
+ _ ->
+ ?assert(Active),
+ ?assertEqual(up, ActivityStatus)
+ end
+ end, [], Consumers2).
+
+query_consumers_when_single_active_consumer_is_on_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ Meta = #{index => 1},
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ Meta,
+ make_checkout({CTag, self()},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+
+ ?assertEqual(4, rabbit_fifo:query_consumer_count(State1)),
+ Consumers = rabbit_fifo:query_consumers(State1),
+ ?assertEqual(4, maps:size(Consumers)),
+ maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid),
+ case Tag of
+ <<"ctag1">> ->
+ ?assert(Active),
+ ?assertEqual(single_active, ActivityStatus);
+ _ ->
+ ?assertNot(Active),
+ ?assertEqual(waiting, ActivityStatus)
+ end
+ end, [], Consumers).
+
+active_flag_updated_when_consumer_suspected_unsuspected_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => false}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} =
+ apply(
+ #{index => 1},
+ rabbit_fifo:make_checkout({CTag, ChannelId},
+ {once, 1, simple_prefetch},
+ #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
+ % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
+ ?assertEqual(4 + 1, length(Effects2)),
+
+ {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
+ % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID
+ ?assertEqual(4 + 4, length(Effects3)).
+
+active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{index => 1},
+ make_checkout({CTag, ChannelId},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
+ % only 1 effect to monitor the node
+ ?assertEqual(1, length(Effects2)),
+
+ {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
+ % for each consumer: 1 effect to monitor the consumer PID
+ ?assertEqual(4, length(Effects3)).
+
+meta(Idx) ->
+ #{index => Idx, term => 1}.
+
+enq(Idx, MsgSeq, Msg, State) ->
+ strip_reply(
+ apply(meta(Idx), rabbit_fifo:make_enqueue(self(), MsgSeq, Msg), State)).
+
+deq(Idx, Cid, Settlement, State0) ->
+ {State, {dequeue, {MsgId, Msg}, _}, _} =
+ apply(meta(Idx),
+ rabbit_fifo:make_checkout(Cid, {dequeue, Settlement}, #{}),
+ State0),
+ {State, {MsgId, Msg}}.
+
+check_n(Cid, Idx, N, State) ->
+ strip_reply(
+ apply(meta(Idx),
+ rabbit_fifo:make_checkout(Cid, {auto, N, simple_prefetch}, #{}),
+ State)).
+
+check(Cid, Idx, State) ->
+ strip_reply(
+ apply(meta(Idx),
+ rabbit_fifo:make_checkout(Cid, {once, 1, simple_prefetch}, #{}),
+ State)).
+
+check_auto(Cid, Idx, State) ->
+ strip_reply(
+ apply(meta(Idx),
+ rabbit_fifo:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
+ State)).
+
+check(Cid, Idx, Num, State) ->
+ strip_reply(
+ apply(meta(Idx),
+ rabbit_fifo:make_checkout(Cid, {auto, Num, simple_prefetch}, #{}),
+ State)).
+
+settle(Cid, Idx, MsgId, State) ->
+ strip_reply(apply(meta(Idx), rabbit_fifo:make_settle(Cid, [MsgId]), State)).
+
+credit(Cid, Idx, Credit, DelCnt, Drain, State) ->
+ strip_reply(apply(meta(Idx), rabbit_fifo:make_credit(Cid, Credit, DelCnt, Drain),
+ State)).
+
+strip_reply({State, _, Effects}) ->
+ {State, Effects}.
+
+run_log(InitState, Entries) ->
+ lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) ->
+ case apply(meta(Idx), E, Acc0) of
+ {Acc, _, Efx} when is_list(Efx) ->
+ {Acc, Efx0 ++ Efx};
+ {Acc, _, Efx} ->
+ {Acc, Efx0 ++ [Efx]};
+ {Acc, _} ->
+ {Acc, Efx0}
+ end
+ end, {InitState, []}, Entries).
+
+
+%% AUX Tests
+
+aux_test(_) ->
+ _ = ra_machine_ets:start_link(),
+ Aux0 = init_aux(aux_test),
+ MacState = init(#{name => aux_test,
+ queue_resource =>
+ rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
+ Log = undefined,
+ {no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0,
+ Log, MacState),
+ {no_reply, _Aux, undefined} = handle_aux(leader, cast, emit, Aux,
+ Log, MacState),
+ [X] = ets:lookup(rabbit_fifo_usage, aux_test),
+ ?assert(X > 0.0),
+ ok.
+
+%% Utility
+
+init(Conf) -> rabbit_fifo:init(Conf).
+apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).
+init_aux(Conf) -> rabbit_fifo:init_aux(Conf).
+handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M).
+make_checkout(C, S, M) -> rabbit_fifo:make_checkout(C, S, M).
diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl
new file mode 100644
index 0000000000..f281d15795
--- /dev/null
+++ b/test/rabbit_fifo_int_SUITE.erl
@@ -0,0 +1,640 @@
+-module(rabbit_fifo_int_SUITE).
+
+%% rabbit_fifo and rabbit_fifo_client integration suite
+
+-compile(export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+all_tests() ->
+ [
+ basics,
+ return,
+ rabbit_fifo_returns_correlation,
+ resends_lost_command,
+ returns_after_down,
+ resends_after_lost_applied,
+ handles_reject_notification,
+ two_quick_enqueues,
+ detects_lost_delivery,
+ dequeue,
+ discard,
+ cancel_checkout,
+ credit,
+ untracked_enqueue,
+ flow,
+ test_queries,
+ duplicate_delivery,
+ usage
+ ].
+
+groups() ->
+ [
+ {tests, [], all_tests()}
+ ].
+
+init_per_group(_, Config) ->
+ PrivDir = ?config(priv_dir, Config),
+ _ = application:load(ra),
+ ok = application:set_env(ra, data_dir, PrivDir),
+ application:ensure_all_started(ra),
+ application:ensure_all_started(lg),
+ Config.
+
+end_per_group(_, Config) ->
+ _ = application:stop(ra),
+ Config.
+
+init_per_testcase(TestCase, Config) ->
+ meck:new(rabbit_quorum_queue, [passthrough]),
+ meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _) -> ok end),
+ meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
+ fun (_, _) -> ok end),
+ ra_server_sup_sup:remove_all(),
+ ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
+ ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
+ ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)),
+ [
+ {cluster_name, ClusterName},
+ {uid, atom_to_binary(TestCase, utf8)},
+ {node_id, {TestCase, node()}},
+ {uid2, atom_to_binary(ServerName2, utf8)},
+ {node_id2, {ServerName2, node()}},
+ {uid3, atom_to_binary(ServerName3, utf8)},
+ {node_id3, {ServerName3, node()}}
+ | Config].
+
+end_per_testcase(_, Config) ->
+ meck:unload(),
+ Config.
+
+basics(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ UId = ?config(uid, Config),
+ CustomerTag = UId,
+ ok = start_cluster(ClusterName, [ServerId]),
+ FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, undefined, FState0),
+
+ ra_log_wal:force_roll_over(ra_log_wal),
+    % creating a segment will trigger a snapshot
+ timer:sleep(1000),
+
+ {ok, FState2} = rabbit_fifo_client:enqueue(one, FState1),
+ % process ra events
+ FState3 = process_ra_event(FState2, 250),
+
+ FState5 = receive
+ {ra_event, From, Evt} ->
+ case rabbit_fifo_client:handle_ra_event(From, Evt, FState3) of
+ {internal, _AcceptedSeqs, _Actions, _FState4} ->
+ exit(unexpected_internal_event);
+ {{delivery, C, [{MsgId, _Msg}]}, FState4} ->
+ {ok, S} = rabbit_fifo_client:settle(C, [MsgId],
+ FState4),
+ S
+ end
+ after 5000 ->
+ exit(await_msg_timeout)
+ end,
+
+ % process settle applied notification
+ FState5b = process_ra_event(FState5, 250),
+ _ = ra:stop_server(ServerId),
+ _ = ra:restart_server(ServerId),
+
+ %% wait for leader change to notice server is up again
+ receive
+ {ra_event, _, {machine, leader_change}} -> ok
+ after 5000 ->
+ exit(leader_change_timeout)
+ end,
+
+ {ok, FState6} = rabbit_fifo_client:enqueue(two, FState5b),
+ % process applied event
+ FState6b = process_ra_event(FState6, 250),
+
+ receive
+ {ra_event, Frm, E} ->
+ case rabbit_fifo_client:handle_ra_event(Frm, E, FState6b) of
+ {internal, _, _, _FState7} ->
+ exit({unexpected_internal_event, E});
+ {{delivery, Ctag, [{Mid, {_, two}}]}, FState7} ->
+ {ok, _S} = rabbit_fifo_client:return(Ctag, [Mid], FState7),
+ ok
+ end
+ after 2000 ->
+ exit(await_msg_timeout)
+ end,
+ ra:stop_server(ServerId),
+ ok.
+
+return(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ServerId2 = ?config(node_id2, Config),
+ ok = start_cluster(ClusterName, [ServerId, ServerId2]),
+
+ F00 = rabbit_fifo_client:init(ClusterName, [ServerId, ServerId2]),
+ {ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00),
+ {ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0),
+ {_, _, F2} = process_ra_events(F1, 100),
+ {ok, {{MsgId, _}, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
+ {ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F),
+
+ ra:stop_server(ServerId),
+ ok.
+
+rabbit_fifo_returns_correlation(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:enqueue(corr1, msg1, F0),
+ receive
+ {ra_event, Frm, E} ->
+ case rabbit_fifo_client:handle_ra_event(Frm, E, F1) of
+ {internal, [corr1], [], _F2} ->
+ ok;
+ {Del, _} ->
+ exit({unexpected, Del})
+ end
+ after 2000 ->
+ exit(await_msg_timeout)
+ end,
+ ra:stop_server(ServerId),
+ ok.
+
+duplicate_delivery(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
+ Fun = fun Loop(S0) ->
+ receive
+ {ra_event, Frm, E} = Evt ->
+ case rabbit_fifo_client:handle_ra_event(Frm, E, S0) of
+ {internal, [corr1], [], S1} ->
+ Loop(S1);
+ {_Del, S1} ->
+ %% repeat event delivery
+ self() ! Evt,
+                          %% check that the next received delivery doesn't
+ %% repeat or crash
+ receive
+ {ra_event, F, E1} ->
+ case rabbit_fifo_client:handle_ra_event(F, E1, S1) of
+ {internal, [], [], S2} ->
+ S2
+ end
+ end
+ end
+ after 2000 ->
+ exit(await_msg_timeout)
+ end
+ end,
+ Fun(F2),
+ ra:stop_server(ServerId),
+ ok.
+
+usage(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
+ {ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2),
+ {_, _, _} = process_ra_events(F3, 50),
+ % force tick and usage stats emission
+ ServerId ! tick_timeout,
+ timer:sleep(50),
+ Use = rabbit_fifo:usage(element(1, ServerId)),
+ ra:stop_server(ServerId),
+ ?assert(Use > 0.0),
+ ok.
+
+resends_lost_command(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ ok = meck:new(ra, [passthrough]),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0),
+ % lose the enqueue
+ meck:expect(ra, pipeline_command, fun (_, _, _) -> ok end),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
+ meck:unload(ra),
+ {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
+ {_, _, F4} = process_ra_events(F3, 500),
+ {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
+ {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
+ {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
+ ra:stop_server(ServerId),
+ ok.
+
+two_quick_enqueues(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ F1 = element(2, rabbit_fifo_client:enqueue(msg1, F0)),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
+ _ = process_ra_events(F2, 500),
+ ra:stop_server(ServerId),
+ ok.
+
+detects_lost_delivery(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ F000 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F00} = rabbit_fifo_client:enqueue(msg1, F000),
+ {_, _, F0} = process_ra_events(F00, 100),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
+ {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
+ % lose first delivery
+ receive
+ {ra_event, _, {machine, {delivery, _, [{_, {_, msg1}}]}}} ->
+ ok
+ after 500 ->
+ exit(await_delivery_timeout)
+ end,
+
+ % assert three deliveries were received
+ {[_, _, _], _, _} = process_ra_events(F3, 500),
+ ra:stop_server(ServerId),
+ ok.
+
+returns_after_down(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0),
+ {_, _, F2} = process_ra_events(F1, 500),
+    % start a customer in a separate process
+ % that exits after checkout
+ Self = self(),
+ _Pid = spawn(fun () ->
+ F = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10,
+ undefined, F),
+ Self ! checkout_done
+ end),
+ receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
+ timer:sleep(1000),
+ % message should be available for dequeue
+ {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
+ ra:stop_server(ServerId),
+ ok.
+
+resends_after_lost_applied(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {_, _, F1} = process_ra_events(element(2, rabbit_fifo_client:enqueue(msg1, F0)),
+ 500),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
+ % lose an applied event
+ receive
+ {ra_event, _, {applied, _}} ->
+ ok
+ after 500 ->
+ exit(await_ra_event_timeout)
+ end,
+ % send another message
+ {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
+ {_, _, F4} = process_ra_events(F3, 500),
+ {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
+ {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
+ {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
+ ra:stop_server(ServerId),
+ ok.
+
+handles_reject_notification(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId1 = ?config(node_id, Config),
+ ServerId2 = ?config(node_id2, Config),
+ UId1 = ?config(uid, Config),
+ CId = {UId1, self()},
+
+ ok = start_cluster(ClusterName, [ServerId1, ServerId2]),
+ _ = ra:process_command(ServerId1,
+ rabbit_fifo:make_checkout(
+ CId,
+ {auto, 10, simple_prefetch},
+ #{})),
+ % reverse order - should try the first node in the list first
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId2, ServerId1]),
+ {ok, F1} = rabbit_fifo_client:enqueue(one, F0),
+
+ timer:sleep(500),
+
+ % the applied notification
+ _F2 = process_ra_event(F1, 250),
+ ra:stop_server(ServerId1),
+ ra:stop_server(ServerId2),
+ ok.
+
+discard(Config) ->
+ PrivDir = ?config(priv_dir, Config),
+ ServerId = ?config(node_id, Config),
+ UId = ?config(uid, Config),
+ ClusterName = ?config(cluster_name, Config),
+ Conf = #{cluster_name => ClusterName#resource.name,
+ id => ServerId,
+ uid => UId,
+ log_init_args => #{data_dir => PrivDir, uid => UId},
+ initial_member => [],
+ machine => {module, rabbit_fifo,
+ #{queue_resource => discard,
+ dead_letter_handler =>
+ {?MODULE, dead_letter_handler, [self()]}}}},
+ _ = ra:start_server(Conf),
+ ok = ra:trigger_election(ServerId),
+ _ = ra:members(ServerId),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg1, F1),
+ F3 = discard_next_delivery(F2, 500),
+ {ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3),
+ receive
+ {dead_letter, Letters} ->
+ [{_, msg1}] = Letters,
+ ok
+ after 500 ->
+ exit(dead_letter_timeout)
+ end,
+ ra:stop_server(ServerId),
+ ok.
+
+cancel_checkout(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
+ {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
+ {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1),
+ {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end),
+ {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
+ {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4),
+ {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5),
+ ok.
+
+credit(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
+ {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
+ {_, _, F3} = process_ra_events(F2, [], 250),
+ %% checkout with 0 prefetch
+ {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, undefined, F3),
+ %% assert no deliveries
+ {_, _, F5} = process_ra_events0(F4, [], [], 250,
+ fun
+ (D, _) -> error({unexpected_delivery, D})
+ end),
+ %% provide some credit
+ {ok, F6} = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5),
+ {[{_, {_, m1}}], [{send_credit_reply, _}], F7} =
+ process_ra_events(F6, [], 250),
+
+ %% credit and drain
+ {ok, F8} = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7),
+ {[{_, {_, m2}}], [{send_credit_reply, _}, {send_drained, _}], F9} =
+ process_ra_events(F8, [], 250),
+ flush(),
+
+ %% enqueue another message - at this point the consumer credit should be
+ %% all used up due to the drain
+ {ok, F10} = rabbit_fifo_client:enqueue(m3, F9),
+ %% assert no deliveries
+ {_, _, F11} = process_ra_events0(F10, [], [], 250,
+ fun
+ (D, _) -> error({unexpected_delivery, D})
+ end),
+ %% credit again and receive the last message
+ {ok, F12} = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11),
+ {[{_, {_, m3}}], _, _} = process_ra_events(F12, [], 250),
+ ok.
+
+untracked_enqueue(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1),
+ timer:sleep(100),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
+ ra:stop_server(ServerId),
+ ok.
+
+
+flow(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 3),
+ {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
+ {ok, F3} = rabbit_fifo_client:enqueue(m3, F2),
+ {slow, F4} = rabbit_fifo_client:enqueue(m4, F3),
+ {_, _, F5} = process_ra_events(F4, 500),
+ {ok, _} = rabbit_fifo_client:enqueue(m5, F5),
+ ra:stop_server(ServerId),
+ ok.
+
+test_queries(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ P = spawn(fun () ->
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
+ {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
+ process_ra_events(F2, 100),
+ receive stop -> ok end
+ end),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
+ {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0),
+ {ok, {_, Ready}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_messages_ready/1),
+ ?assertEqual(1, Ready),
+ {ok, {_, Checked}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_messages_checked_out/1),
+ ?assertEqual(1, Checked),
+ {ok, {_, Processes}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_processes/1),
+ ?assertEqual(2, length(Processes)),
+ P ! stop,
+ ra:stop_server(ServerId),
+ ok.
+
+dead_letter_handler(Pid, Msgs) ->
+ Pid ! {dead_letter, Msgs}.
+
+dequeue(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ UId = ?config(uid, Config),
+ Tag = UId,
+ ok = start_cluster(ClusterName, [ServerId]),
+ F1 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1),
+ {ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b),
+ {_, _, F2} = process_ra_events(F2_, 100),
+
+ {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
+ {ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3),
+ {_, _, F4} = process_ra_events(F4_, 100),
+ {ok, {{MsgId, {_, msg2}}, _}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
+ {ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5),
+ ra:stop_server(ServerId),
+ ok.
+
+enq_deq_n(N, F0) ->
+ enq_deq_n(N, F0, []).
+
+enq_deq_n(0, F0, Acc) ->
+ {_, _, F} = process_ra_events(F0, 100),
+ {F, Acc};
+enq_deq_n(N, F, Acc) ->
+ {ok, F1} = rabbit_fifo_client:enqueue(N, F),
+ {_, _, F2} = process_ra_events(F1, 10),
+ {ok, {{_, {_, Deq}}, _}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2),
+
+ {_, _, F4} = process_ra_events(F3, 5),
+ enq_deq_n(N-1, F4, [Deq | Acc]).
+
+conf(ClusterName, UId, ServerId, _, Peers) ->
+ #{cluster_name => ClusterName,
+ id => ServerId,
+ uid => UId,
+ log_init_args => #{uid => UId},
+ initial_members => Peers,
+ machine => {module, rabbit_fifo, #{}}}.
+
+process_ra_event(State, Wait) ->
+ receive
+ {ra_event, From, Evt} ->
+ ct:pal("processed ra event ~p~n", [Evt]),
+ {internal, _, _, S} =
+ rabbit_fifo_client:handle_ra_event(From, Evt, State),
+ S
+ after Wait ->
+ exit(ra_event_timeout)
+ end.
+
+process_ra_events(State0, Wait) ->
+ process_ra_events(State0, [], Wait).
+
+process_ra_events(State, Acc, Wait) ->
+ DeliveryFun = fun ({delivery, Tag, Msgs}, S) ->
+ MsgIds = [element(1, M) || M <- Msgs],
+ {ok, S2} = rabbit_fifo_client:settle(Tag, MsgIds, S),
+ S2
+ end,
+ process_ra_events0(State, Acc, [], Wait, DeliveryFun).
+
+process_ra_events0(State0, Acc, Actions0, Wait, DeliveryFun) ->
+ receive
+ {ra_event, From, Evt} ->
+ case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
+ {internal, _, Actions, State} ->
+ process_ra_events0(State, Acc, Actions0 ++ Actions,
+ Wait, DeliveryFun);
+ {{delivery, _Tag, Msgs} = Del, State1} ->
+ State = DeliveryFun(Del, State1),
+ process_ra_events0(State, Acc ++ Msgs, Actions0, Wait, DeliveryFun);
+ eol ->
+ eol
+ end
+ after Wait ->
+ {Acc, Actions0, State0}
+ end.
+
+discard_next_delivery(State0, Wait) ->
+ receive
+ {ra_event, From, Evt} ->
+ case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
+ {internal, _, _Actions, State} ->
+ discard_next_delivery(State, Wait);
+ {{delivery, Tag, Msgs}, State1} ->
+ MsgIds = [element(1, M) || M <- Msgs],
+ {ok, State} = rabbit_fifo_client:discard(Tag, MsgIds,
+ State1),
+ State
+ end
+ after Wait ->
+ State0
+ end.
+
+return_next_delivery(State0, Wait) ->
+ receive
+ {ra_event, From, Evt} ->
+ case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
+ {internal, _, _, State} ->
+ return_next_delivery(State, Wait);
+ {{delivery, Tag, Msgs}, State1} ->
+ MsgIds = [element(1, M) || M <- Msgs],
+ {ok, State} = rabbit_fifo_client:return(Tag, MsgIds,
+ State1),
+ State
+ end
+ after Wait ->
+ State0
+ end.
+
+validate_process_down(Name, 0) ->
+ exit({process_not_down, Name});
+validate_process_down(Name, Num) ->
+ case whereis(Name) of
+ undefined ->
+ ok;
+ _ ->
+ timer:sleep(100),
+ validate_process_down(Name, Num-1)
+ end.
+
+start_cluster(ClusterName, ServerIds, RaFifoConfig) ->
+ {ok, Started, _} = ra:start_cluster(ClusterName#resource.name,
+ {module, rabbit_fifo, RaFifoConfig},
+ ServerIds),
+ ?assertEqual(length(Started), length(ServerIds)),
+ ok.
+
+start_cluster(ClusterName, ServerIds) ->
+ start_cluster(ClusterName, ServerIds, #{name => some_name,
+ queue_resource => ClusterName}).
+
+flush() ->
+ receive
+ Msg ->
+ ct:pal("flushed: ~w~n", [Msg]),
+ flush()
+ after 10 ->
+ ok
+ end.
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 437cd02e25..da72c030cd 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -5,8 +5,8 @@
-export([
]).
--include_lib("common_test/include/ct.hrl").
-include_lib("proper/include/proper.hrl").
+-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
%%%===================================================================
@@ -35,7 +35,10 @@ all_tests() ->
scenario11,
scenario12,
scenario13,
- scenario14
+ scenario14,
+ scenario15,
+ scenario16,
+ fake_pid
].
groups() ->
@@ -236,26 +239,80 @@ scenario14(_Config) ->
max_bytes => 1}, Commands),
ok.
+scenario15(_Config) ->
+ C1 = {<<>>, c:pid(0,179,1)},
+ E = c:pid(0,176,1),
+ Commands = [make_checkout(C1, {auto,2,simple_prefetch}),
+ make_enqueue(E, 1, msg1),
+ make_enqueue(E, 2, msg2),
+ make_return(C1, [0]),
+ make_return(C1, [2]),
+ make_settle(C1, [1])
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ delivery_limit => 1}, Commands),
+ ok.
+
+scenario16(_Config) ->
+ C1Pid = c:pid(0,883,1),
+ C1 = {<<>>, C1Pid},
+ C2 = {<<>>, c:pid(0,882,1)},
+ E = c:pid(0,176,1),
+ Commands = [
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E, 1, msg1),
+ make_checkout(C2, {auto,1,simple_prefetch}),
+ {down, C1Pid, noproc}, %% msg1 allocated to C2
+ make_return(C2, [0]), %% msg1 returned
+ make_enqueue(E, 2, <<>>),
+ make_settle(C2, [0])
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ delivery_limit => 1}, Commands),
+ ok.
+
+fake_pid(_Config) ->
+ Pid = fake_external_pid(<<"mynode@banana">>),
+ ?assertNotEqual(node(Pid), node()),
+ ?assert(is_pid(Pid)),
+ ok.
+
+fake_external_pid(Node) when is_binary(Node) ->
+ ThisNodeSize = size(term_to_binary(node())) + 1,
+ Pid = spawn(fun () -> ok end),
+ %% drop the local node data from a local pid
+ <<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid),
+ S = size(Node),
+ %% replace it with the incoming node binary
+ Final = <<131,103, 100, 0, S, Node/binary, LocalPidData/binary>>,
+ binary_to_term(Final).
+
snapshots(_Config) ->
run_proper(
fun () ->
- ?FORALL({Length, Bytes, SingleActiveConsumer},
- frequency([{10, {0, 0, false}},
- {5, {non_neg_integer(), non_neg_integer(),
- boolean()}}]),
- ?FORALL(O, ?LET(Ops, log_gen(200), expand(Ops)),
- collect({Length, Bytes},
+ ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit},
+ frequency([{10, {0, 0, false, 0}},
+ {5, {oneof([range(1, 10), undefined]),
+ oneof([range(1, 1000), undefined]),
+ boolean(),
+ oneof([range(1, 3), undefined])
+ }}]),
+ ?FORALL(O, ?LET(Ops, log_gen(250), expand(Ops)),
+ collect({log_size, length(O)},
snapshots_prop(
config(?FUNCTION_NAME,
- Length, Bytes,
- SingleActiveConsumer), O))))
- end, [], 2000).
+ Length,
+ Bytes,
+ SingleActiveConsumer,
+ DeliveryLimit), O))))
+ end, [], 2500).
-config(Name, Length, Bytes, SingleActive) ->
+config(Name, Length, Bytes, SingleActive, DeliveryLimit) ->
#{name => Name,
max_length => map_max(Length),
max_bytes => map_max(Bytes),
- single_active_consumer_on => SingleActive}.
+ single_active_consumer_on => SingleActive,
+ delivery_limit => map_max(DeliveryLimit)}.
map_max(0) -> undefined;
map_max(N) -> N.
@@ -271,8 +328,12 @@ snapshots_prop(Conf, Commands) ->
end.
log_gen(Size) ->
- ?LET(EPids, vector(2, pid_gen()),
- ?LET(CPids, vector(2, pid_gen()),
+ Nodes = [node(),
+ fakenode@fake,
+ fakenode@fake2
+ ],
+ ?LET(EPids, vector(2, pid_gen(Nodes)),
+ ?LET(CPids, vector(2, pid_gen(Nodes)),
resize(Size,
list(
frequency(
@@ -285,15 +346,20 @@ log_gen(Size) ->
{2, checkout_gen(oneof(CPids))},
{1, checkout_cancel_gen(oneof(CPids))},
{1, down_gen(oneof(EPids ++ CPids))},
+ {1, nodeup_gen(Nodes)},
{1, purge}
]))))).
-pid_gen() ->
- ?LET(_, integer(), spawn(fun () -> ok end)).
+pid_gen(Nodes) ->
+ ?LET(Node, oneof(Nodes),
+ fake_external_pid(atom_to_binary(Node, utf8))).
down_gen(Pid) ->
?LET(E, {down, Pid, oneof([noconnection, noproc])}, E).
+nodeup_gen(Nodes) ->
+ {nodeup, oneof(Nodes)}.
+
enqueue_gen(Pid) ->
?LET(E, {enqueue, Pid,
frequency([{10, enqueue},
@@ -391,6 +457,8 @@ handle_op({down, Pid, Reason} = Cmd, #t{down = Down} = T) ->
%% it is either not down or down with noconnection
do_apply(Cmd, T#t{down = maps:put(Pid, Reason, Down)})
end;
+handle_op({nodeup, _} = Cmd, T) ->
+ do_apply(Cmd, T);
handle_op({input_event, requeue}, #t{effects = Effs} = T) ->
%% this simulates certain settlements arriving out of order
case queue:out(Effs) of
@@ -477,6 +545,7 @@ run_snapshot_test0(Conf, Commands) ->
State = rabbit_fifo:normalize(State0),
[begin
+ % ct:pal("release_cursor: ~b~n", [SnapIdx]),
%% drop all entries below and including the snapshot
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
@@ -490,6 +559,7 @@ run_snapshot_test0(Conf, Commands) ->
ct:pal("Snapshot tests failed run log:~n"
"~p~n from ~n~p~n Entries~n~p~n",
[Filtered, SnapState, Entries]),
+ ct:pal("Expected~n~p~nGot:~n~p", [State, S]),
?assertEqual(State, S)
end
end || {release_cursor, SnapIdx, SnapState} <- Effects],