diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-02-26 00:57:49 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-02-26 00:57:49 +0300 |
| commit | b9873465666d143bd1fc70a828d417ce48b5b1c3 (patch) | |
| tree | 2b1c7da2b56dc86fbc5a267cfb9dd309c4fbea56 | |
| parent | 20d3d6a11dd4836a904cdcb2c7f27b26e2db9611 (diff) | |
| parent | 4fd34a4a05e926a016bf31a1f2e496aa6a172d13 (diff) | |
| download | rabbitmq-server-git-b9873465666d143bd1fc70a828d417ce48b5b1c3.tar.gz | |
Merge pull request #1889 from rabbitmq/poison-handling-qq
Poison handling in quorum queues
| -rw-r--r-- | src/rabbit_dead_letter.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 1883 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 170 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 9 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 155 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 1617 | ||||
| -rw-r--r-- | test/rabbit_fifo_int_SUITE.erl | 640 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 104 |
9 files changed, 2383 insertions, 2213 deletions
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index e26ea8297b..7b8cffb6fa 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -23,7 +23,7 @@ %%---------------------------------------------------------------------------- --type reason() :: 'expired' | 'rejected' | 'maxlen'. +-type reason() :: 'expired' | 'rejected' | 'maxlen' | delivery_limit. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index af3441df86..b06d34e83a 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -23,6 +23,7 @@ -compile({no_auto_import, [apply/3]}). -include_lib("ra/include/ra.hrl"). +-include("rabbit_fifo.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -export([ @@ -64,65 +65,6 @@ make_update_config/1 ]). --type raw_msg() :: term(). -%% The raw message. It is opaque to rabbit_fifo. - --type msg_in_id() :: non_neg_integer(). -% a queue scoped monotonically incrementing integer used to enforce order -% in the unassigned messages map - --type msg_id() :: non_neg_integer(). -%% A consumer-scoped monotonically incrementing integer included with a -%% {@link delivery/0.}. Used to settle deliveries using -%% {@link rabbit_fifo_client:settle/3.} - --type msg_seqno() :: non_neg_integer(). -%% A sender process scoped monotonically incrementing integer included -%% in enqueue messages. Used to ensure ordering of messages send from the -%% same process - --type msg_header() :: #{delivery_count => non_neg_integer()}. -%% The message header map: -%% delivery_count: the number of unsuccessful delivery attempts. -%% A non-zero value indicates a previous attempt. - --type msg() :: {msg_header(), raw_msg()}. -%% message with a header map. - --type msg_size() :: non_neg_integer(). -%% the size in bytes of the msg payload - --type indexed_msg() :: {ra_index(), msg()}. - --type prefix_msg() :: {'$prefix_msg', msg_size()}. - --type delivery_msg() :: {msg_id(), msg()}. -%% A tuple consisting of the message id and the headered message. - --type consumer_tag() :: binary(). -%% An arbitrary binary tag used to distinguish between different consumers -%% set up by the same process. See: {@link rabbit_fifo_client:checkout/3.} - --type delivery() :: {delivery, consumer_tag(), [delivery_msg()]}. -%% Represents the delivery of one or more rabbit_fifo messages. - --type consumer_id() :: {consumer_tag(), pid()}. -%% The entity that receives messages. Uniquely identifies a consumer. - --type credit_mode() :: simple_prefetch | credited. -%% determines how credit is replenished - --type checkout_spec() :: {once | auto, Num :: non_neg_integer(), - credit_mode()} | - {dequeue, settled | unsettled} | - cancel. - --type consumer_meta() :: #{ack => boolean(), - username => binary(), - prefetch => non_neg_integer(), - args => list()}. -%% static meta data associated with a consumer - %% command records representing all the protocol actions that are supported -record(enqueue, {pid :: maybe(pid()), seq :: maybe(msg_seqno()), @@ -143,8 +85,6 @@ -record(purge, {}). -record(update_config, {config :: config()}). - - -opaque protocol() :: #enqueue{} | #checkout{} | @@ -154,117 +94,13 @@ #credit{} | #purge{} | #update_config{}. - -type command() :: protocol() | ra_machine:builtin_command(). %% all the command types supported by ra fifo -type client_msg() :: delivery(). %% the messages `rabbit_fifo' can send to consumers. --type applied_mfa() :: {module(), atom(), list()}. -% represents a partially applied module call - --define(RELEASE_CURSOR_EVERY, 64000). --define(USE_AVG_HALF_LIFE, 10000.0). - --record(consumer, - {meta = #{} :: consumer_meta(), - checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}}, - next_msg_id = 0 :: msg_id(), % part of snapshot data - %% max number of messages that can be sent - %% decremented for each delivery - credit = 0 : non_neg_integer(), - %% total number of checked out messages - ever - %% incremented for each delivery - delivery_count = 0 :: non_neg_integer(), - %% the mode of how credit is incremented - %% simple_prefetch: credit is re-filled as deliveries are settled - %% or returned. - %% credited: credit can only be changed by receiving a consumer_credit - %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}' - credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data - lifetime = once :: once | auto, - status = up :: up | suspected_down | cancelled - }). - --type consumer() :: #consumer{}. - --record(enqueuer, - {next_seqno = 1 :: msg_seqno(), - % out of order enqueues - sorted list - pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}], - status = up :: up | suspected_down - }). - --record(state, - {name :: atom(), - queue_resource :: rabbit_types:r('queue'), - release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(), - % unassigned messages - messages = #{} :: #{msg_in_id() => indexed_msg()}, - % defines the lowest message in id available in the messages map - % that isn't a return - low_msg_num :: msg_in_id() | undefined, - % defines the next message in id to be added to the messages map - next_msg_num = 1 :: msg_in_id(), - % list of returned msg_in_ids - when checking out it picks from - % this list first before taking low_msg_num - returns = lqueue:new() :: lqueue:lqueue(prefix_msg() | - {msg_in_id(), indexed_msg()}), - % a counter of enqueues - used to trigger shadow copy points - enqueue_count = 0 :: non_neg_integer(), - % a map containing all the live processes that have ever enqueued - % a message to this queue as well as a cached value of the smallest - % ra_index of all pending enqueues - enqueuers = #{} :: #{pid() => #enqueuer{}}, - % master index of all enqueue raft indexes including pending - % enqueues - % rabbit_fifo_index can be slow when calculating the smallest - % index when there are large gaps but should be faster than gb_trees - % for normal appending operations as it's backed by a map - ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(), - release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor, - ra_index(), state()}), - % consumers need to reflect consumer state at time of snapshot - % needs to be part of snapshot - consumers = #{} :: #{consumer_id() => #consumer{}}, - % consumers that require further service are queued here - % needs to be part of snapshot - service_queue = queue:new() :: queue:queue(consumer_id()), - dead_letter_handler :: maybe(applied_mfa()), - become_leader_handler :: maybe(applied_mfa()), - %% This is a special field that is only used for snapshots - %% It represents the queued messages at the time the - %% dehydrated snapshot state was cached. - %% As release_cursors are only emitted for raft indexes where all - %% prior messages no longer contribute to the current state we can - %% replace all message payloads with their sizes (to be used for - %% overflow calculations). - %% This is done so that consumers are still served in a deterministic - %% order on recovery. - prefix_msgs = {[], []} :: {Return :: [msg_size()], - PrefixMsgs :: [msg_size()]}, - msg_bytes_enqueue = 0 :: non_neg_integer(), - msg_bytes_checkout = 0 :: non_neg_integer(), - max_length :: maybe(non_neg_integer()), - max_bytes :: maybe(non_neg_integer()), - %% whether single active consumer is on or not for this queue - consumer_strategy = default :: default | single_active, - %% waiting consumers, one is picked active consumer is cancelled or dies - %% used only when single active consumer is on - waiting_consumers = [] :: [{consumer_id(), consumer()}] - }). - --opaque state() :: #state{}. - --type config() :: #{name := atom(), - queue_resource := rabbit_types:r('queue'), - dead_letter_handler => applied_mfa(), - become_leader_handler => applied_mfa(), - release_cursor_interval => non_neg_integer(), - max_length => non_neg_integer(), - max_bytes => non_neg_integer(), - single_active_consumer_on => boolean()}. +-opaque state() :: #?MODULE{}. -export_type([protocol/0, delivery/0, @@ -284,8 +120,8 @@ -spec init(config()) -> state(). init(#{name := Name, queue_resource := Resource} = Conf) -> - update_config(Conf, #state{name = Name, - queue_resource = Resource}). + update_config(Conf, #?MODULE{cfg = #cfg{name = Name, + resource = Resource}}). update_config(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), @@ -293,18 +129,21 @@ update_config(Conf, State) -> SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY), MaxLength = maps:get(max_length, Conf, undefined), MaxBytes = maps:get(max_bytes, Conf, undefined), + DeliveryLimit = maps:get(delivery_limit, Conf, undefined), ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of true -> single_active; false -> default end, - State#state{dead_letter_handler = DLH, - become_leader_handler = BLH, - release_cursor_interval = SHI, - max_length = MaxLength, - max_bytes = MaxBytes, - consumer_strategy = ConsumerStrategy}. + Cfg = State#?MODULE.cfg, + State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI, + dead_letter_handler = DLH, + become_leader_handler = BLH, + max_length = MaxLength, + max_bytes = MaxBytes, + consumer_strategy = ConsumerStrategy, + delivery_limit = DeliveryLimit}}. zero(_) -> 0. @@ -319,7 +158,7 @@ apply(Metadata, #enqueue{pid = From, seq = Seq, apply_enqueue(Metadata, From, Seq, RawMsg, State00); apply(Meta, #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, - #state{consumers = Cons0} = State) -> + #?MODULE{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0} -> % need to increment metrics before completing as any snapshot @@ -331,7 +170,7 @@ apply(Meta, end; apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, - #state{consumers = Cons0} = State0) -> + #?MODULE{consumers = Cons0} = State0) -> case Cons0 of #{ConsumerId := Con0} -> Discarded = maps:with(MsgIds, Con0#consumer.checked_out), @@ -342,7 +181,7 @@ apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, {State0, ok} end; apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, - #state{consumers = Cons0} = State) -> + #?MODULE{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} -> Checked = maps:without(MsgIds, Checked0), @@ -354,7 +193,7 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, end; apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, drain = Drain, consumer_id = ConsumerId}, - #state{consumers = Cons0, + #?MODULE{consumers = Cons0, service_queue = ServiceQueue0} = State0) -> case Cons0 of #{ConsumerId := #consumer{delivery_count = DelCnt} = Con0} -> @@ -366,9 +205,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, ServiceQueue0), Cons = maps:put(ConsumerId, Con1, Cons0), {State1, ok, Effects} = - checkout(Meta, State0#state{service_queue = ServiceQueue, - consumers = Cons}, []), - Response = {send_credit_reply, maps:size(State1#state.messages)}, + checkout(Meta, State0#?MODULE{service_queue = ServiceQueue, + consumers = Cons}, []), + Response = {send_credit_reply, maps:size(State1#?MODULE.messages)}, %% by this point all checkouts for the updated credit value %% should be processed so we can evaluate the drain case Drain of @@ -377,16 +216,16 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {State1, Response, Effects}; true -> Con = #consumer{credit = PostCred} = - maps:get(ConsumerId, State1#state.consumers), + maps:get(ConsumerId, State1#?MODULE.consumers), %% add the outstanding credit to the delivery count DeliveryCount = Con#consumer.delivery_count + PostCred, Consumers = maps:put(ConsumerId, Con#consumer{delivery_count = DeliveryCount, credit = 0}, - State1#state.consumers), + State1#?MODULE.consumers), Drained = Con#consumer.credit, {CTag, _} = ConsumerId, - {State1#state{consumers = Consumers}, + {State1#?MODULE{consumers = Consumers}, %% returning a multi response with two client actions %% for the channel to execute {multi, [Response, {send_drained, [{CTag, Drained}]}]}, @@ -399,7 +238,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, apply(Meta, #checkout{spec = {dequeue, Settlement}, meta = ConsumerMeta, consumer_id = ConsumerId}, - #state{consumers = Consumers} = State0) -> + #?MODULE{consumers = Consumers} = State0) -> Exists = maps:is_key(ConsumerId, Consumers), case messages_ready(State0) of 0 -> @@ -434,9 +273,9 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0), checkout(Meta, State1, [{monitor, process, Pid}]); apply(#{index := RaftIdx}, #purge{}, - #state{ra_indexes = Indexes0, - returns = Returns, - messages = Messages} = State0) -> + #?MODULE{ra_indexes = Indexes0, + returns = Returns, + messages = Messages} = State0) -> Total = messages_ready(State0), Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, [I || {I, _} <- lists:sort(maps:values(Messages))]), @@ -444,20 +283,20 @@ apply(#{index := RaftIdx}, #purge{}, [I || {_, {I, _}} <- lqueue:to_list(Returns)]), {State, _, Effects} = update_smallest_raft_index(RaftIdx, - State0#state{ra_indexes = Indexes, - messages = #{}, - returns = lqueue:new(), - msg_bytes_enqueue = 0, - prefix_msgs = {[], []}, - low_msg_num = undefined}, + State0#?MODULE{ra_indexes = Indexes, + messages = #{}, + returns = lqueue:new(), + msg_bytes_enqueue = 0, + prefix_msgs = {[], []}, + low_msg_num = undefined}, []), %% as we're not checking out after a purge (no point) we have to %% reverse the effects ourselves {State, {purge, Total}, lists:reverse([garbage_collection | Effects])}; apply(_, {down, ConsumerPid, noconnection}, - #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + #?MODULE{consumers = Cons0, + enqueuers = Enqs0} = State0) -> Node = node(ConsumerPid), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), % mark all consumers and enqueuers as suspected down @@ -468,10 +307,10 @@ apply(_, {down, ConsumerPid, noconnection}, #consumer{checked_out = Checked0} = C, {Co, St0, Eff}) when (node(P) =:= Node) and (C#consumer.status =/= cancelled)-> - St = return_all(St0, Checked0), + {St, Eff0} = return_all(St0, Checked0, Eff, K, C), Credit = increase_credit(C, maps:size(Checked0)), Eff1 = ConsumerUpdateActiveFun(St, K, C, false, - suspected_down, Eff), + suspected_down, Eff0), {maps:put(K, C#consumer{status = suspected_down, credit = Credit, @@ -495,17 +334,17 @@ apply(_, {down, ConsumerPid, noconnection}, [{monitor, node, Node}] end ++ Effects1, %% TODO: should we run a checkout here? - {State#state{consumers = Cons, enqueuers = Enqs, - waiting_consumers = WaitingConsumers}, ok, Effects2}; -apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + {State#?MODULE{consumers = Cons, enqueuers = Enqs, + waiting_consumers = WaitingConsumers}, ok, Effects2}; +apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0, + enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid State1 = case maps:take(Pid, Enqs0) of {#enqueuer{pending = Pend}, Enqs} -> lists:foldl(fun ({_, RIdx, RawMsg}, S) -> enqueue(RIdx, RawMsg, S) - end, State0#state{enqueuers = Enqs}, Pend); + end, State0#?MODULE{enqueuers = Enqs}, Pend); error -> State0 end, @@ -518,9 +357,9 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0, cancel_consumer(ConsumerId, S, E, down) end, {State2, Effects1}, DownConsumers), checkout(Meta, State, Effects); -apply(Meta, {nodeup, Node}, #state{consumers = Cons0, - enqueuers = Enqs0, - service_queue = SQ0} = State0) -> +apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -549,34 +388,34 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0, Acc end, {Cons0, SQ0, Monitors}, Cons0), - checkout(Meta, State0#state{consumers = Cons1, enqueuers = Enqs1, - service_queue = SQ, - waiting_consumers = WaitingConsumers}, Effects); + checkout(Meta, State0#?MODULE{consumers = Cons1, enqueuers = Enqs1, + service_queue = SQ, + waiting_consumers = WaitingConsumers}, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; apply(Meta, #update_config{config = Conf}, State) -> checkout(Meta, update_config(Conf, State), []). -consumer_active_flag_update_function(#state{consumer_strategy = default}) -> +consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = default}}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) end; -consumer_active_flag_update_function(#state{consumer_strategy = single_active}) -> +consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = single_active}}) -> fun(_, _, _, _, _, Effects) -> Effects end. handle_waiting_consumer_down(_Pid, - #state{consumer_strategy = default} = State) -> + #?MODULE{cfg = #cfg{consumer_strategy = default}} = State) -> {[], State}; handle_waiting_consumer_down(_Pid, - #state{consumer_strategy = single_active, - waiting_consumers = []} = State) -> + #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = []} = State) -> {[], State}; handle_waiting_consumer_down(Pid, - #state{consumer_strategy = single_active, - waiting_consumers = WaitingConsumers0} = State0) -> + #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = WaitingConsumers0} = State0) -> % get cancel effects for down waiting consumers Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end, WaitingConsumers0), @@ -587,20 +426,20 @@ handle_waiting_consumer_down(Pid, % update state to have only up waiting consumers StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end, WaitingConsumers0), - State = State0#state{waiting_consumers = StillUp}, + State = State0#?MODULE{waiting_consumers = StillUp}, {Effects, State}. -update_waiting_consumer_status(_Node, #state{consumer_strategy = default}, +update_waiting_consumer_status(_Node, #?MODULE{cfg = #cfg{consumer_strategy = default}}, _Status) -> []; update_waiting_consumer_status(_Node, - #state{consumer_strategy = single_active, - waiting_consumers = []}, + #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = []}, _Status) -> []; update_waiting_consumer_status(Node, - #state{consumer_strategy = single_active, - waiting_consumers = WaitingConsumers}, + #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = WaitingConsumers}, Status) -> [begin case node(P) of @@ -613,12 +452,13 @@ update_waiting_consumer_status(Node, Consumer#consumer.status =/= cancelled]. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). -state_enter(leader, #state{consumers = Cons, - enqueuers = Enqs, - waiting_consumers = WaitingConsumers, - name = Name, - prefix_msgs = {[], []}, - become_leader_handler = BLH}) -> +state_enter(leader, #?MODULE{consumers = Cons, + enqueuers = Enqs, + waiting_consumers = WaitingConsumers, + cfg = #cfg{name = Name, + become_leader_handler = BLH}, + prefix_msgs = {[], []} + }) -> % return effects to monitor all current consumers and enqueuers Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)] @@ -633,13 +473,13 @@ state_enter(leader, #state{consumers = Cons, {Mod, Fun, Args} -> [{mod_call, Mod, Fun, Args ++ [Name]} | Effects] end; -state_enter(recovered, #state{prefix_msgs = PrefixMsgCounts}) +state_enter(recovered, #?MODULE{prefix_msgs = PrefixMsgCounts}) when PrefixMsgCounts =/= {[], []} -> %% TODO: remove assertion? exit({rabbit_fifo, unexpected_prefix_msgs, PrefixMsgCounts}); -state_enter(eol, #state{enqueuers = Enqs, - consumers = Custs0, - waiting_consumers = WaitingConsumers0}) -> +state_enter(eol, #?MODULE{enqueuers = Enqs, + consumers = Custs0, + waiting_consumers = WaitingConsumers0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end, #{}, WaitingConsumers0), @@ -652,10 +492,10 @@ state_enter(_, _) -> -spec tick(non_neg_integer(), state()) -> ra_machine:effects(). -tick(_Ts, #state{name = Name, - queue_resource = QName, - msg_bytes_enqueue = EnqueueBytes, - msg_bytes_checkout = CheckoutBytes} = State) -> +tick(_Ts, #?MODULE{cfg = #cfg{name = Name, + resource = QName}, + msg_bytes_enqueue = EnqueueBytes, + msg_bytes_checkout = CheckoutBytes} = State) -> Metrics = {Name, messages_ready(State), num_checked_out(State), % checked out @@ -667,11 +507,11 @@ tick(_Ts, #state{name = Name, handle_tick, [QName, Metrics]}, {aux, emit}]. -spec overview(state()) -> map(). -overview(#state{consumers = Cons, - enqueuers = Enqs, - release_cursors = Cursors, - msg_bytes_enqueue = EnqueueBytes, - msg_bytes_checkout = CheckoutBytes} = State) -> +overview(#?MODULE{consumers = Cons, + enqueuers = Enqs, + release_cursors = Cursors, + msg_bytes_enqueue = EnqueueBytes, + msg_bytes_checkout = CheckoutBytes} = State) -> #{type => ?MODULE, num_consumers => maps:size(Cons), num_checked_out => num_checked_out(State), @@ -684,7 +524,7 @@ overview(#state{consumers = Cons, -spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) -> [delivery_msg()]. -get_checked_out(Cid, From, To, #state{consumers = Consumers}) -> +get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) -> case Consumers of #{Cid := #consumer{checked_out = Checked}} -> [{K, snd(snd(maps:get(K, Checked)))} @@ -718,7 +558,7 @@ handle_aux(_, cast, Cmd, {Name, Use0}, Log, _) -> query_messages_ready(State) -> messages_ready(State). -query_messages_checked_out(#state{consumers = Consumers}) -> +query_messages_checked_out(#?MODULE{consumers = Consumers}) -> maps:fold(fun (_, #consumer{checked_out = C}, S) -> maps:size(C) + S end, 0, Consumers). @@ -726,21 +566,21 @@ query_messages_checked_out(#state{consumers = Consumers}) -> query_messages_total(State) -> messages_total(State). -query_processes(#state{enqueuers = Enqs, consumers = Cons0}) -> +query_processes(#?MODULE{enqueuers = Enqs, consumers = Cons0}) -> Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0), maps:keys(maps:merge(Enqs, Cons)). -query_ra_indexes(#state{ra_indexes = RaIndexes}) -> +query_ra_indexes(#?MODULE{ra_indexes = RaIndexes}) -> RaIndexes. -query_consumer_count(#state{consumers = Consumers, - waiting_consumers = WaitingConsumers}) -> +query_consumer_count(#?MODULE{consumers = Consumers, + waiting_consumers = WaitingConsumers}) -> maps:size(Consumers) + length(WaitingConsumers). -query_consumers(#state{consumers = Consumers, - waiting_consumers = WaitingConsumers, - consumer_strategy = ConsumerStrategy } = State) -> +query_consumers(#?MODULE{consumers = Consumers, + waiting_consumers = WaitingConsumers, + cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) -> ActiveActivityStatusFun = case ConsumerStrategy of default -> @@ -798,8 +638,8 @@ query_consumers(#state{consumers = Consumers, end, #{}, WaitingConsumers), maps:merge(FromConsumers, FromWaitingConsumers). -query_single_active_consumer(#state{consumer_strategy = single_active, - consumers = Consumers}) -> +query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_active}, + consumers = Consumers}) -> case maps:size(Consumers) of 0 -> {error, no_value}; @@ -812,7 +652,7 @@ query_single_active_consumer(#state{consumer_strategy = single_active, query_single_active_consumer(_) -> disabled. -query_stat(#state{consumers = Consumers} = State) -> +query_stat(#?MODULE{consumers = Consumers} = State) -> {messages_ready(State), maps:size(Consumers)}. -spec usage(atom()) -> float(). @@ -824,15 +664,15 @@ usage(Name) when is_atom(Name) -> %%% Internal -messages_ready(#state{messages = M, - prefix_msgs = {PreR, PreM}, - returns = R}) -> +messages_ready(#?MODULE{messages = M, + prefix_msgs = {PreR, PreM}, + returns = R}) -> %% TODO: optimise to avoid length/1 call maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM). -messages_total(#state{ra_indexes = I, - prefix_msgs = {PreR, PreM}}) -> +messages_total(#?MODULE{ra_indexes = I, + prefix_msgs = {PreR, PreM}}) -> rabbit_fifo_index:size(I) + length(PreR) + length(PreM). update_use({inactive, _, _, _} = CUInfo, inactive) -> @@ -863,25 +703,25 @@ moving_average(Time, HalfLife, Next, Current) -> Weight = math:exp(Time * math:log(0.5) / HalfLife), Next * (1 - Weight) + Current * Weight. -num_checked_out(#state{consumers = Cons}) -> +num_checked_out(#?MODULE{consumers = Cons}) -> lists:foldl(fun (#consumer{checked_out = C}, Acc) -> maps:size(C) + Acc end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, - #state{consumer_strategy = default} = State, Effects, Reason) -> + #?MODULE{cfg = #cfg{consumer_strategy = default}} = State, Effects, Reason) -> %% general case, single active consumer off cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, - #state{consumer_strategy = single_active, - waiting_consumers = []} = State, + #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = []} = State, Effects, Reason) -> %% single active consumer on, no consumers are waiting cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, - #state{consumers = Cons0, - consumer_strategy = single_active, - waiting_consumers = WaitingConsumers0} = State0, + #?MODULE{consumers = Cons0, + cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = WaitingConsumers0} = State0, Effects0, Reason) -> %% single active consumer on, consumers are waiting case maps:take(ConsumerId, Cons0) of @@ -894,18 +734,18 @@ cancel_consumer(ConsumerId, % Take another one from the waiting consumers and put it in consumers [{NewActiveConsumerId, NewActiveConsumer} | RemainingWaitingConsumers] = WaitingConsumers0, - #state{service_queue = ServiceQueue} = State1, + #?MODULE{service_queue = ServiceQueue} = State1, ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue), - State = State1#state{consumers = maps:put(NewActiveConsumerId, - NewActiveConsumer, - State1#state.consumers), - service_queue = ServiceQueue1, - waiting_consumers = RemainingWaitingConsumers}, + State = State1#?MODULE{consumers = maps:put(NewActiveConsumerId, + NewActiveConsumer, + State1#?MODULE.consumers), + service_queue = ServiceQueue1, + waiting_consumers = RemainingWaitingConsumers}, Effects = consumer_update_active_effects(State, NewActiveConsumerId, - NewActiveConsumer, true, - single_active, Effects2), + NewActiveConsumer, true, + single_active, Effects2), {State, Effects}; error -> % The cancelled consumer is not the active one @@ -915,10 +755,10 @@ cancel_consumer(ConsumerId, Effects = cancel_consumer_effects(ConsumerId, State0, Effects0), % A waiting consumer isn't supposed to have any checked out messages, % so nothing special to do here - {State0#state{waiting_consumers = WaitingConsumers}, Effects} + {State0#?MODULE{waiting_consumers = WaitingConsumers}, Effects} end. -consumer_update_active_effects(#state{queue_resource = QName }, +consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, ConsumerId, #consumer{meta = Meta}, Active, ActivityStatus, Effects) -> @@ -931,13 +771,13 @@ consumer_update_active_effects(#state{queue_resource = QName }, [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]} | Effects]. -cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) -> +cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> case maps:take(ConsumerId, C0) of {Consumer, Cons1} -> {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, Effects0, Reason), Effects = cancel_consumer_effects(ConsumerId, S, Effects2), - case maps:size(S#state.consumers) of + case maps:size(S#?MODULE.consumers) of 0 -> {S, [{aux, inactive} | Effects]}; _ -> @@ -948,9 +788,10 @@ cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) -> {S0, Effects0} end. -maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1, - #state{consumers = C0, - service_queue = SQ0} = S0, Effects0, Reason) -> +maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, + Cons1, #?MODULE{consumers = C0, + service_queue = SQ0} = S0, + Effects0, Reason) -> case Reason of consumer_cancel -> {Cons, SQ, Effects1} = @@ -959,69 +800,71 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1 credit = 0, status = cancelled}, C0, SQ0, Effects0), - {S0#state{consumers = Cons, service_queue = SQ}, Effects1}; + {S0#?MODULE{consumers = Cons, service_queue = SQ}, Effects1}; down -> - S1 = return_all(S0, Checked0), - {S1#state{consumers = Cons1}, Effects0} + {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId, + Consumer), + {S1#?MODULE{consumers = Cons1}, Effects1} end. apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> - Bytes = message_size(RawMsg), case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of {ok, State1, Effects1} -> - State2 = append_to_master_index(RaftIdx, - add_bytes_enqueue(Bytes, State1)), + State2 = append_to_master_index(RaftIdx, State1), {State, ok, Effects} = checkout(Meta, State2, Effects1), {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects}; {duplicate, State, Effects} -> {State, ok, Effects} end. -drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) -> +drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> case take_next_msg(State0) of {FullMsg = {_MsgId, {RaftIdxToDrop, {_Header, Msg}}}, State1} -> Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0), Bytes = message_size(Msg), - State = add_bytes_drop(Bytes, State1#state{ra_indexes = Indexes}), - Effects = dead_letter_effects(maxlen, maps:put(none, FullMsg, #{}), + State = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}), + Effects = dead_letter_effects(maxlen, #{none => FullMsg}, State, Effects0), {State, Effects}; - {{'$prefix_msg', Bytes}, State1} -> + {{'$prefix_msg', #{size := Bytes}}, State1} -> State = add_bytes_drop(Bytes, State1), {State, Effects0}; empty -> {State0, Effects0} end. -enqueue(RaftIdx, RawMsg, #state{messages = Messages, +enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, low_msg_num = LowMsgNum, next_msg_num = NextMsgNum} = State0) -> - Msg = {RaftIdx, {#{}, RawMsg}}, % indexed message with header map - State0#state{messages = Messages#{NextMsgNum => Msg}, - % this is probably only done to record it when low_msg_num - % is undefined - low_msg_num = min(LowMsgNum, NextMsgNum), - next_msg_num = NextMsgNum + 1}. + Size = message_size(RawMsg), + Msg = {RaftIdx, {#{size => Size}, RawMsg}}, % indexed message with header map + State = add_bytes_enqueue(Size, State0), + State#?MODULE{messages = Messages#{NextMsgNum => Msg}, + % this is probably only done to record it when low_msg_num + % is undefined + low_msg_num = min(LowMsgNum, NextMsgNum), + next_msg_num = NextMsgNum + 1}. append_to_master_index(RaftIdx, - #state{ra_indexes = Indexes0} = State0) -> + #?MODULE{ra_indexes = Indexes0} = State0) -> State = incr_enqueue_count(State0), Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0), - State#state{ra_indexes = Indexes}. + State#?MODULE{ra_indexes = Indexes}. -incr_enqueue_count(#state{enqueue_count = C, - release_cursor_interval = C} = State0) -> + +incr_enqueue_count(#?MODULE{enqueue_count = C, + cfg = #cfg{release_cursor_interval = C}} = State0) -> % this will trigger a dehydrated version of the state to be stored % at this raft index for potential future snapshot generation - State0#state{enqueue_count = 0}; -incr_enqueue_count(#state{enqueue_count = C} = State) -> - State#state{enqueue_count = C + 1}. + State0#?MODULE{enqueue_count = 0}; +incr_enqueue_count(#?MODULE{enqueue_count = C} = State) -> + State#?MODULE{enqueue_count = C + 1}. maybe_store_dehydrated_state(RaftIdx, - #state{ra_indexes = Indexes, - enqueue_count = 0, - release_cursors = Cursors} = State) -> + #?MODULE{ra_indexes = Indexes, + enqueue_count = 0, + release_cursors = Cursors} = State) -> case rabbit_fifo_index:exists(RaftIdx, Indexes) of false -> %% the incoming enqueue must already have been dropped @@ -1029,7 +872,7 @@ maybe_store_dehydrated_state(RaftIdx, true -> Dehydrated = dehydrate_state(State), Cursor = {release_cursor, RaftIdx, Dehydrated}, - State#state{release_cursors = lqueue:in(Cursor, Cursors)} + State#?MODULE{release_cursors = lqueue:in(Cursor, Cursors)} end; maybe_store_dehydrated_state(_RaftIdx, State) -> State. @@ -1041,18 +884,18 @@ enqueue_pending(From, State = enqueue(RaftIdx, RawMsg, State0), Enq = Enq0#enqueuer{next_seqno = Next + 1, pending = Pending}, enqueue_pending(From, Enq, State); -enqueue_pending(From, Enq, #state{enqueuers = Enqueuers0} = State) -> - State#state{enqueuers = Enqueuers0#{From => Enq}}. +enqueue_pending(From, Enq, #?MODULE{enqueuers = Enqueuers0} = State) -> + State#?MODULE{enqueuers = Enqueuers0#{From => Enq}}. maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) -> % direct enqueue without tracking State = enqueue(RaftIdx, RawMsg, State0), {ok, State, Effects}; maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, - #state{enqueuers = Enqueuers0} = State0) -> + #?MODULE{enqueuers = Enqueuers0} = State0) -> case maps:get(From, Enqueuers0, undefined) of undefined -> - State1 = State0#state{enqueuers = Enqueuers0#{From => #enqueuer{}}}, + State1 = State0#?MODULE{enqueuers = Enqueuers0#{From => #enqueuer{}}}, {ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, State1), {ok, State, [{monitor, process, From} | Effects]}; @@ -1068,7 +911,7 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, % out of order delivery Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0], Enq = Enq0#enqueuer{pending = lists:sort(Pending)}, - {ok, State0#state{enqueuers = Enqueuers0#{From => Enq}}, Effects0}; + {ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq}}, Effects0}; #enqueuer{next_seqno = Next} when MsgSeqNo =< Next -> % duplicate delivery - remove the raft index from the ra_indexes % map as it was added earlier @@ -1079,25 +922,28 @@ snd(T) -> element(2, T). return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, - Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) -> + Effects0, #?MODULE{consumers = Cons0, service_queue = SQ0} = State0) -> Con = Con0#consumer{checked_out = Checked, credit = increase_credit(Con0, length(MsgNumMsgs))}, - {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, + {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), - State1 = lists:foldl(fun({'$prefix_msg', _} = Msg, S0) -> - return_one(0, Msg, S0); - ({MsgNum, Msg}, S0) -> - return_one(MsgNum, Msg, S0) - end, State0, MsgNumMsgs), - checkout(Meta, State1#state{consumers = Cons, - service_queue = SQ}, - Effects). + {State1, Effects2} = lists:foldl( + fun({'$prefix_msg', _} = Msg, {S0, E0}) -> + return_one(0, Msg, S0, E0, + ConsumerId, Con); + ({MsgNum, Msg}, {S0, E0}) -> + return_one(MsgNum, Msg, S0, E0, + ConsumerId, Con) + end, {State0, Effects1}, MsgNumMsgs), + checkout(Meta, State1#?MODULE{consumers = Cons, + service_queue = SQ}, + Effects2). % used to processes messages that are finished complete(ConsumerId, MsgRaftIdxs, NumDiscarded, Con0, Checked, Effects0, - #state{consumers = Cons0, service_queue = SQ0, - ra_indexes = Indexes0} = State0) -> + #?MODULE{consumers = Cons0, service_queue = SQ0, + ra_indexes = Indexes0} = State0) -> %% credit_mode = simple_prefetch should automatically top-up credit %% as messages are simple_prefetch or otherwise returned Con = Con0#consumer{checked_out = Checked, @@ -1106,9 +952,9 @@ complete(ConsumerId, MsgRaftIdxs, NumDiscarded, SQ0, Effects0), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), - {State0#state{consumers = Cons, - ra_indexes = Indexes, - service_queue = SQ}, Effects}. + {State0#?MODULE{consumers = Cons, + ra_indexes = Indexes, + service_queue = SQ}, Effects}. increase_credit(#consumer{lifetime = once, credit = Credit}, _) -> @@ -1143,43 +989,44 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, update_smallest_raft_index(IncomingRaftIdx, State, Effects). dead_letter_effects(_Reason, _Discarded, - #state{dead_letter_handler = undefined}, + #?MODULE{cfg = #cfg{dead_letter_handler = undefined}}, Effects) -> Effects; dead_letter_effects(Reason, Discarded, - #state{dead_letter_handler = {Mod, Fun, Args}}, Effects) -> - DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, - % MsgId, MsgIdID, RaftId, Header - Acc) -> [{Reason, Msg} | Acc] + #?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}}, + Effects) -> + DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) -> + [{Reason, Msg} | Acc] end, [], Discarded), [{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects]. -cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) -> +cancel_consumer_effects(ConsumerId, + #?MODULE{cfg = #cfg{resource = QName}}, Effects) -> [{mod_call, rabbit_quorum_queue, cancel_consumer_handler, [QName, ConsumerId]} | Effects]. update_smallest_raft_index(IncomingRaftIdx, - #state{ra_indexes = Indexes, - release_cursors = Cursors0} = State0, + #?MODULE{ra_indexes = Indexes, + release_cursors = Cursors0} = State0, Effects) -> case rabbit_fifo_index:size(Indexes) of 0 -> % there are no messages on queue anymore and no pending enqueues % we can forward release_cursor all the way until % the last received command, hooray - State = State0#state{release_cursors = lqueue:new()}, + State = State0#?MODULE{release_cursors = lqueue:new()}, {State, ok, [{release_cursor, IncomingRaftIdx, State} | Effects]}; _ -> Smallest = rabbit_fifo_index:smallest(Indexes), case find_next_cursor(Smallest, Cursors0) of {empty, Cursors} -> - {State0#state{release_cursors = Cursors}, + {State0#?MODULE{release_cursors = Cursors}, ok, Effects}; {Cursor, Cursors} -> %% we can emit a release cursor we've passed the smallest %% release cursor available. - {State0#state{release_cursors = Cursors}, ok, + {State0#?MODULE{release_cursors = Cursors}, ok, [Cursor | Effects]} end end. @@ -1196,36 +1043,68 @@ find_next_cursor(Smallest, Cursors0, Potential) -> {Potential, Cursors0} end. -return_one(0, {'$prefix_msg', _} = Msg, - #state{returns = Returns} = State0) -> - add_bytes_return(Msg, - State0#state{returns = lqueue:in(Msg, Returns)}); +return_one(0, {'$prefix_msg', Header0}, + #?MODULE{returns = Returns, + cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, + Effects0, ConsumerId, Con) -> + Header = maps:update_with(delivery_count, + fun (C) -> C+1 end, + 1, Header0), + Msg = {'$prefix_msg', Header}, + case maps:get(delivery_count, Header) of + DeliveryCount when DeliveryCount > DeliveryLimit -> + Checked = Con#consumer.checked_out, + {State1, Effects} = complete(ConsumerId, [], 1, Con, Checked, + Effects0, State0), + {add_bytes_settle(Msg, State1), Effects}; + _ -> + %% this should not affect the release cursor in any way + {add_bytes_return(Msg, + State0#?MODULE{returns = lqueue:in(Msg, Returns)}), + Effects0} + end; return_one(MsgNum, {RaftId, {Header0, RawMsg}}, - #state{returns = Returns} = State0) -> + #?MODULE{returns = Returns, + cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, + Effects0, ConsumerId, Con) -> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), Msg = {RaftId, {Header, RawMsg}}, - % this should not affect the release cursor in any way - add_bytes_return(RawMsg, - State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}). + case maps:get(delivery_count, Header) of + DeliveryCount when DeliveryCount > DeliveryLimit -> + DlMsg = {MsgNum, Msg}, + Effects = dead_letter_effects(delivery_limit, + #{none => DlMsg}, + State0, Effects0), + Checked = Con#consumer.checked_out, + {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, + Effects, State0), + {add_bytes_settle(RawMsg, State1), Effects1}; + _ -> + %% this should not affect the release cursor in any way + {add_bytes_return(RawMsg, + State0#?MODULE{returns = + lqueue:in({MsgNum, Msg}, Returns)}), + Effects0} + end. -return_all(State0, Checked0) -> +return_all(State0, Checked0, Effects0, ConsumerId, Consumer) -> %% need to sort the list so that we return messages in the order %% they were checked out Checked = lists:sort(maps:to_list(Checked0)), - lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, S) -> - return_one(0, Msg, S); - ({_, {MsgNum, Msg}}, S) -> - return_one(MsgNum, Msg, S) - end, State0, Checked). + lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) -> + return_one(0, Msg, S, E, ConsumerId, Consumer); + ({_, {MsgNum, Msg}}, {S, E}) -> + return_one(MsgNum, Msg, S, E, ConsumerId, Consumer) + end, {State0, Effects0}, Checked). %% checkout new messages to consumers %% reverses the effects list checkout(#{index := Index}, State0, Effects0) -> {State1, _Result, Effects1} = checkout0(checkout_one(State0), Effects0, #{}), - case evaluate_limit(State0#state.ra_indexes, false, + case evaluate_limit(State0#?MODULE.ra_indexes, false, State1, Effects1) of {State, true, Effects} -> update_smallest_raft_index(Index, State, Effects); @@ -1250,8 +1129,8 @@ checkout0({Activity, State0}, Effects0, Acc) -> {State0, ok, lists:reverse(Effects1)}. evaluate_limit(_OldIndexes, Result, - #state{max_length = undefined, - max_bytes = undefined} = State, + #?MODULE{cfg = #cfg{max_length = undefined, + max_bytes = undefined}} = State, Effects) -> {State, Result, Effects}; evaluate_limit(OldIndexes, Result, @@ -1280,20 +1159,20 @@ append_send_msg_effects(Effects0, AccMap) -> %% %% When we return it is always done to the current return queue %% for both prefix messages and current messages -take_next_msg(#state{prefix_msgs = {[Bytes | Rem], P}} = State) -> +take_next_msg(#?MODULE{prefix_msgs = {[Header | Rem], P}} = State) -> %% there are prefix returns, these should be served first - {{'$prefix_msg', Bytes}, - State#state{prefix_msgs = {Rem, P}}}; -take_next_msg(#state{returns = Returns, - low_msg_num = Low0, - messages = Messages0, - prefix_msgs = {R, P}} = State) -> + {{'$prefix_msg', Header}, + State#?MODULE{prefix_msgs = {Rem, P}}}; +take_next_msg(#?MODULE{returns = Returns, + low_msg_num = Low0, + messages = Messages0, + prefix_msgs = {R, P}} = State) -> %% use peek rather than out there as the most likely case is an empty %% queue case lqueue:peek(Returns) of {value, NextMsg} -> {NextMsg, - State#state{returns = lqueue:drop(Returns)}}; + State#?MODULE{returns = lqueue:drop(Returns)}}; empty when P == [] -> case Low0 of undefined -> @@ -1303,27 +1182,27 @@ take_next_msg(#state{returns = Returns, case maps:size(Messages) of 0 -> {{Low0, Msg}, - State#state{messages = Messages, - low_msg_num = undefined}}; + State#?MODULE{messages = Messages, + low_msg_num = undefined}}; _ -> {{Low0, Msg}, - State#state{messages = Messages, - low_msg_num = Low0 + 1}} + State#?MODULE{messages = Messages, + low_msg_num = Low0 + 1}} end end; empty -> - [Bytes | Rem] = P, + [Header | Rem] = P, %% There are prefix msgs - {{'$prefix_msg', Bytes}, - State#state{prefix_msgs = {R, Rem}}} + {{'$prefix_msg', Header}, + State#?MODULE{prefix_msgs = {R, Rem}}} end. send_msg_effect({CTag, CPid}, Msgs) -> {send_msg, CPid, {delivery, CTag, Msgs}, ra_event}. -checkout_one(#state{service_queue = SQ0, - messages = Messages0, - consumers = Cons0} = InitState) -> +checkout_one(#?MODULE{service_queue = SQ0, + messages = Messages0, + consumers = Cons0} = InitState) -> case queue:peek(SQ0) of {value, ConsumerId} -> case take_next_msg(InitState) of @@ -1336,11 +1215,11 @@ checkout_one(#state{service_queue = SQ0, %% no credit but was still on queue %% can happen when draining %% recurse without consumer on queue - checkout_one(InitState#state{service_queue = SQ1}); + checkout_one(InitState#?MODULE{service_queue = SQ1}); {ok, #consumer{status = cancelled}} -> - checkout_one(InitState#state{service_queue = SQ1}); + checkout_one(InitState#?MODULE{service_queue = SQ1}); {ok, #consumer{status = suspected_down}} -> - checkout_one(InitState#state{service_queue = SQ1}); + checkout_one(InitState#?MODULE{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, next_msg_id = Next, credit = Credit, @@ -1353,8 +1232,8 @@ checkout_one(#state{service_queue = SQ0, {Cons, SQ, []} = % we expect no effects update_or_remove_sub(ConsumerId, Con, Cons0, SQ1, []), - State1 = State0#state{service_queue = SQ, - consumers = Cons}, + State1 = State0#?MODULE{service_queue = SQ, + consumers = Cons}, {State, Msg} = case ConsumerMsg of {'$prefix_msg', _} -> @@ -1367,7 +1246,7 @@ checkout_one(#state{service_queue = SQ0, {success, ConsumerId, Next, Msg, State}; error -> %% consumer did not exist but was queued, recurse - checkout_one(InitState#state{service_queue = SQ1}) + checkout_one(InitState#?MODULE{service_queue = SQ1}) end; empty -> {nochange, InitState} @@ -1417,28 +1296,28 @@ uniq_queue_in(Key, Queue) -> end. update_consumer(ConsumerId, Meta, Spec, - #state{consumer_strategy = default} = State0) -> + #?MODULE{cfg = #cfg{consumer_strategy = default}} = State0) -> %% general case, single active consumer off update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, Spec, - #state{consumers = Cons0, - consumer_strategy = single_active} = State0) + #?MODULE{consumers = Cons0, + cfg = #cfg{consumer_strategy = single_active}} = State0) when map_size(Cons0) == 0 -> %% single active consumer on, no one is consuming yet update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, - #state{consumer_strategy = single_active, - waiting_consumers = WaitingConsumers0} = State0) -> + #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = WaitingConsumers0} = State0) -> %% single active consumer on and one active consumer already %% adding the new consumer to the waiting list Consumer = #consumer{lifetime = Life, meta = Meta, credit = Credit, credit_mode = Mode}, WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}], - State0#state{waiting_consumers = WaitingConsumers1}. + State0#?MODULE{waiting_consumers = WaitingConsumers1}. update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, - #state{consumers = Cons0, - service_queue = ServiceQueue0} = State0) -> + #?MODULE{consumers = Cons0, + service_queue = ServiceQueue0} = State0) -> %% TODO: this logic may not be correct for updating a pre-existing consumer Init = #consumer{lifetime = Life, meta = Meta, credit = Credit, credit_mode = Mode}, @@ -1453,7 +1332,7 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons), ServiceQueue0), - State0#state{consumers = Cons, service_queue = ServiceQueue}. + State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}. maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, ServiceQueue0) -> @@ -1468,52 +1347,52 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, %% creates a dehydrated version of the current state to be cached and %% potentially used to for a snaphot at a later point -dehydrate_state(#state{messages = Messages, - consumers = Consumers, - returns = Returns, - prefix_msgs = {PrefRet0, PrefMsg0}} = State) -> +dehydrate_state(#?MODULE{messages = Messages, + consumers = Consumers, + returns = Returns, + prefix_msgs = {PrefRet0, PrefMsg0}} = State) -> %% TODO: optimise this function as far as possible - PrefRet = lists:foldl(fun ({'$prefix_msg', Bytes}, Acc) -> - [Bytes | Acc]; - ({_, {_, {_, Raw}}}, Acc) -> - [message_size(Raw) | Acc] + PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) -> + [Header | Acc]; + ({_, {_, {Header, _}}}, Acc) -> + [Header | Acc] end, lists:reverse(PrefRet0), lqueue:to_list(Returns)), - PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_H, Raw}}}, Acc) -> - [message_size(Raw) | Acc] + PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {Header, _}}}, Acc) -> + [Header| Acc] end, lists:reverse(PrefMsg0), lists:sort(maps:to_list(Messages))), - State#state{messages = #{}, - ra_indexes = rabbit_fifo_index:empty(), - release_cursors = lqueue:new(), - low_msg_num = undefined, - consumers = maps:map(fun (_, C) -> - dehydrate_consumer(C) - end, Consumers), - returns = lqueue:new(), - prefix_msgs = {lists:reverse(PrefRet), - lists:reverse(PrefMsgs)}}. + State#?MODULE{messages = #{}, + ra_indexes = rabbit_fifo_index:empty(), + release_cursors = lqueue:new(), + low_msg_num = undefined, + consumers = maps:map(fun (_, C) -> + dehydrate_consumer(C) + end, Consumers), + returns = lqueue:new(), + prefix_msgs = {lists:reverse(PrefRet), + lists:reverse(PrefMsgs)}}. dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> M; - (_, {_, {_, {_, Raw}}}) -> - {'$prefix_msg', message_size(Raw)} + (_, {_, {_, {Header, _}}}) -> + {'$prefix_msg', Header} end, Checked0), Con#consumer{checked_out = Checked}. %% make the state suitable for equality comparison -normalize(#state{release_cursors = Cursors} = State) -> - State#state{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}. +normalize(#?MODULE{release_cursors = Cursors} = State) -> + State#?MODULE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}. -is_over_limit(#state{max_length = undefined, - max_bytes = undefined}) -> +is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined, + max_bytes = undefined}}) -> false; -is_over_limit(#state{max_length = MaxLength, - max_bytes = MaxBytes, - msg_bytes_enqueue = BytesEnq} = State) -> +is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, + max_bytes = MaxBytes}, + msg_bytes_enqueue = BytesEnq} = State) -> messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes). @@ -1553,32 +1432,32 @@ make_purge() -> #purge{}. make_update_config(Config) -> #update_config{config = Config}. -add_bytes_enqueue(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) -> - State#state{msg_bytes_enqueue = Enqueue + Bytes}. +add_bytes_enqueue(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> + State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}. -add_bytes_drop(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) -> - State#state{msg_bytes_enqueue = Enqueue - Bytes}. +add_bytes_drop(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> + State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}. -add_bytes_checkout(Msg, #state{msg_bytes_checkout = Checkout, +add_bytes_checkout(Msg, #?MODULE{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue } = State) -> Bytes = message_size(Msg), - State#state{msg_bytes_checkout = Checkout + Bytes, - msg_bytes_enqueue = Enqueue - Bytes}. + State#?MODULE{msg_bytes_checkout = Checkout + Bytes, + msg_bytes_enqueue = Enqueue - Bytes}. -add_bytes_settle(Msg, #state{msg_bytes_checkout = Checkout} = State) -> +add_bytes_settle(Msg, #?MODULE{msg_bytes_checkout = Checkout} = State) -> Bytes = message_size(Msg), - State#state{msg_bytes_checkout = Checkout - Bytes}. + State#?MODULE{msg_bytes_checkout = Checkout - Bytes}. -add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout, - msg_bytes_enqueue = Enqueue} = State) -> +add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout, + msg_bytes_enqueue = Enqueue} = State) -> Bytes = message_size(Msg), - State#state{msg_bytes_checkout = Checkout - Bytes, - msg_bytes_enqueue = Enqueue + Bytes}. + State#?MODULE{msg_bytes_checkout = Checkout - Bytes, + msg_bytes_enqueue = Enqueue + Bytes}. message_size(#basic_message{content = Content}) -> #content{payload_fragments_rev = PFR} = Content, iolist_size(PFR); -message_size({'$prefix_msg', B}) -> +message_size({'$prefix_msg', #{size := B}}) -> B; message_size(B) when is_binary(B) -> byte_size(B); @@ -1586,9 +1465,9 @@ message_size(Msg) -> %% probably only hit this for testing so ok to use erts_debug erts_debug:size(Msg). -suspected_pids_for(Node, #state{consumers = Cons0, - enqueuers = Enqs0, - waiting_consumers = WaitingConsumers0}) -> +suspected_pids_for(Node, #?MODULE{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc) when node(P) =:= Node -> [P | Acc]; @@ -1605,1137 +1484,3 @@ suspected_pids_for(Node, #state{consumers = Cons0, [P | Acc]; (_, Acc) -> Acc end, Enqs, WaitingConsumers0). - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). - --define(ASSERT_EFF(EfxPat, Effects), - ?ASSERT_EFF(EfxPat, true, Effects)). - --define(ASSERT_EFF(EfxPat, Guard, Effects), - ?assert(lists:any(fun (EfxPat) when Guard -> true; - (_) -> false - end, Effects))). - --define(ASSERT_NO_EFF(EfxPat, Effects), - ?assert(not lists:any(fun (EfxPat) -> true; - (_) -> false - end, Effects))). - --define(assertNoEffect(EfxPat, Effects), - ?assert(not lists:any(fun (EfxPat) -> true; - (_) -> false - end, Effects))). - -test_init(Name) -> - init(#{name => Name, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(Name, utf8)), - release_cursor_interval => 0}). - -% To launch these tests: make eunit EUNIT_MODS="rabbit_fifo" - -enq_enq_checkout_test() -> - Cid = {<<"enq_enq_checkout_test">>, self()}, - {State1, _} = enq(1, 1, first, test_init(test)), - {State2, _} = enq(2, 2, second, State1), - {_State3, _, Effects} = - apply(meta(3), - make_checkout(Cid, {once, 2, simple_prefetch}, #{}), - State2), - ?ASSERT_EFF({monitor, _, _}, Effects), - ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects), - ok. - -credit_enq_enq_checkout_settled_credit_test() -> - Cid = {?FUNCTION_NAME, self()}, - {State1, _} = enq(1, 1, first, test_init(test)), - {State2, _} = enq(2, 2, second, State1), - {State3, _, Effects} = - apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}), State2), - ?ASSERT_EFF({monitor, _, _}, Effects), - Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true; - (_) -> false - end, Effects), - ?assertEqual(1, length(Deliveries)), - %% settle the delivery this should _not_ result in further messages being - %% delivered - {State4, SettledEffects} = settle(Cid, 4, 1, State3), - ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) -> - true; - (_) -> false - end, SettledEffects)), - %% granting credit (3) should deliver the second msg if the receivers - %% delivery count is (1) - {State5, CreditEffects} = credit(Cid, 5, 1, 1, false, State4), - % ?debugFmt("CreditEffects ~p ~n~p", [CreditEffects, State4]), - ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, CreditEffects), - {_State6, FinalEffects} = enq(6, 3, third, State5), - ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) -> - true; - (_) -> false - end, FinalEffects)), - ok. - -credit_with_drained_test() -> - Cid = {?FUNCTION_NAME, self()}, - State0 = test_init(test), - %% checkout with a single credit - {State1, _, _} = - apply(meta(1), make_checkout(Cid, {auto, 1, credited},#{}), - State0), - ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 1, - delivery_count = 0}}}, - State1), - {State, Result, _} = - apply(meta(3), make_credit(Cid, 0, 5, true), State1), - ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0, - delivery_count = 5}}}, - State), - ?assertEqual({multi, [{send_credit_reply, 0}, - {send_drained, [{?FUNCTION_NAME, 5}]}]}, - Result), - ok. - -credit_and_drain_test() -> - Cid = {?FUNCTION_NAME, self()}, - {State1, _} = enq(1, 1, first, test_init(test)), - {State2, _} = enq(2, 2, second, State1), - %% checkout without any initial credit (like AMQP 1.0 would) - {State3, _, CheckEffs} = - apply(meta(3), make_checkout(Cid, {auto, 0, credited}, #{}), - State2), - - ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs), - {State4, {multi, [{send_credit_reply, 0}, - {send_drained, [{?FUNCTION_NAME, 2}]}]}, - Effects} = apply(meta(4), make_credit(Cid, 4, 0, true), State3), - ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0, - delivery_count = 4}}}, - State4), - - ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}, - {_, {_, second}}]}, _}, Effects), - {_State5, EnqEffs} = enq(5, 2, third, State4), - ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, EnqEffs), - ok. - - - -enq_enq_deq_test() -> - Cid = {?FUNCTION_NAME, self()}, - {State1, _} = enq(1, 1, first, test_init(test)), - {State2, _} = enq(2, 2, second, State1), - % get returns a reply value - NumReady = 1, - {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} = - apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}), - State2), - ok. - -enq_enq_deq_deq_settle_test() -> - Cid = {?FUNCTION_NAME, self()}, - {State1, _} = enq(1, 1, first, test_init(test)), - {State2, _} = enq(2, 2, second, State1), - % get returns a reply value - {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} = - apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}), - State2), - {_State4, {dequeue, empty}} = - apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), - State3), - ok. - -enq_enq_checkout_get_settled_test() -> - Cid = {?FUNCTION_NAME, self()}, - {State1, _} = enq(1, 1, first, test_init(test)), - % get returns a reply value - {_State2, {dequeue, {0, {_, first}}, _}, _Effs} = - apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), - State1), - ok. - -checkout_get_empty_test() -> - Cid = {?FUNCTION_NAME, self()}, - State = test_init(test), - {_State2, {dequeue, empty}} = - apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}), State), - ok. - -untracked_enq_deq_test() -> - Cid = {?FUNCTION_NAME, self()}, - State0 = test_init(test), - {State1, _, _} = apply(meta(1), - make_enqueue(undefined, undefined, first), - State0), - {_State2, {dequeue, {0, {_, first}}, _}, _} = - apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1), - ok. - -release_cursor_test() -> - Cid = {?FUNCTION_NAME, self()}, - {State1, _} = enq(1, 1, first, test_init(test)), - {State2, _} = enq(2, 2, second, State1), - {State3, _} = check(Cid, 3, 10, State2), - % no release cursor effect at this point - {State4, _} = settle(Cid, 4, 1, State3), - {_Final, Effects1} = settle(Cid, 5, 0, State4), - % empty queue forwards release cursor all the way - ?ASSERT_EFF({release_cursor, 5, _}, Effects1), - ok. - -checkout_enq_settle_test() -> - Cid = {?FUNCTION_NAME, self()}, - {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)), - {State2, Effects0} = enq(2, 1, first, State1), - ?ASSERT_EFF({send_msg, _, - {delivery, ?FUNCTION_NAME, - [{0, {_, first}}]}, _}, - Effects0), - {State3, [_Inactive]} = enq(3, 2, second, State2), - {_, _Effects} = settle(Cid, 4, 0, State3), - % the release cursor is the smallest raft index that does not - % contribute to the state of the application - % ?ASSERT_EFF({release_cursor, 2, _}, Effects), - ok. - -out_of_order_enqueue_test() -> - Cid = {?FUNCTION_NAME, self()}, - {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), - {State2, Effects2} = enq(2, 1, first, State1), - ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), - % assert monitor was set up - ?ASSERT_EFF({monitor, _, _}, Effects2), - % enqueue seq num 3 and 4 before 2 - {State3, Effects3} = enq(3, 3, third, State2), - ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects3), - {State4, Effects4} = enq(4, 4, fourth, State3), - % assert no further deliveries where made - ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects4), - {_State5, Effects5} = enq(5, 2, second, State4), - % assert two deliveries were now made - ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, second}}, - {_, {_, third}}, - {_, {_, fourth}}]}, _}, - Effects5), - ok. - -out_of_order_first_enqueue_test() -> - Cid = {?FUNCTION_NAME, self()}, - {State1, _} = check_n(Cid, 5, 5, test_init(test)), - {_State2, Effects2} = enq(2, 10, first, State1), - ?ASSERT_EFF({monitor, process, _}, Effects2), - ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, - Effects2), - ok. - -duplicate_enqueue_test() -> - Cid = {<<"duplicate_enqueue_test">>, self()}, - {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), - {State2, Effects2} = enq(2, 1, first, State1), - ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), - {_State3, Effects3} = enq(3, 1, first, State2), - ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects3), - ok. - -return_non_existent_test() -> - Cid = {<<"cid">>, self()}, - {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)), - % return non-existent - {_State2, _} = apply(meta(3), make_return(Cid, [99]), State0), - ok. - -return_checked_out_test() -> - Cid = {<<"cid">>, self()}, - {State0, [_, _]} = enq(1, 1, first, test_init(test)), - {State1, [_Monitor, - {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event}, - {aux, active} | _ ]} = check_auto(Cid, 2, State0), - % returning immediately checks out the same message again - {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, ra_event}, - {aux, active}]} = - apply(meta(3), make_return(Cid, [MsgId]), State1), - ok. - -return_auto_checked_out_test() -> - Cid = {<<"cid">>, self()}, - {State00, [_, _]} = enq(1, 1, first, test_init(test)), - {State0, [_]} = enq(2, 2, second, State00), - % it first active then inactive as the consumer took on but cannot take - % any more - {State1, [_Monitor, - {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, - {aux, active}, - {aux, inactive} - ]} = check_auto(Cid, 2, State0), - % return should include another delivery - {_State2, _, Effects} = apply(meta(3), make_return(Cid, [MsgId]), State1), - ?ASSERT_EFF({send_msg, _, - {delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _}, - Effects), - ok. - - -cancelled_checkout_out_test() -> - Cid = {<<"cid">>, self()}, - {State00, [_, _]} = enq(1, 1, first, test_init(test)), - {State0, [_]} = enq(2, 2, second, State00), - {State1, _} = check_auto(Cid, 2, State0), - % cancelled checkout should not return pending messages to queue - {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1), - ?assertEqual(1, maps:size(State2#state.messages)), - ?assertEqual(0, lqueue:len(State2#state.returns)), - - {State3, {dequeue, empty}} = - apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2), - %% settle - {State4, ok, _} = - apply(meta(4), make_settle(Cid, [0]), State3), - - {_State, {dequeue, {_, {_, second}}, _}, _} = - apply(meta(5), make_checkout(Cid, {dequeue, settled}, #{}), State4), - ok. - -down_with_noproc_consumer_returns_unsettled_test() -> - Cid = {<<"down_consumer_returns_unsettled_test">>, self()}, - {State0, [_, _]} = enq(1, 1, second, test_init(test)), - {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0), - {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1), - {_State, Effects} = check(Cid, 4, State2), - ?ASSERT_EFF({monitor, process, _}, Effects), - ok. - -down_with_noconnection_marks_suspect_and_node_is_monitored_test() -> - Pid = spawn(fun() -> ok end), - Cid = {<<"down_with_noconnect">>, Pid}, - Self = self(), - Node = node(Pid), - {State0, Effects0} = enq(1, 1, second, test_init(test)), - ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0), - {State1, Effects1} = check_auto(Cid, 2, State0), - #consumer{credit = 0} = maps:get(Cid, State1#state.consumers), - ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1), - % monitor both enqueuer and consumer - % because we received a noconnection we now need to monitor the node - {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), - #consumer{credit = 1} = maps:get(Cid, State2a#state.consumers), - %% validate consumer has credit - {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a), - ?ASSERT_EFF({monitor, node, _}, Effects2), - ?assertNoEffect({demonitor, process, _}, Effects2), - % when the node comes up we need to retry the process monitors for the - % disconnected processes - {_State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2), - % try to re-monitor the suspect processes - ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3), - ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), - ok. - -down_with_noconnection_returns_unack_test() -> - Pid = spawn(fun() -> ok end), - Cid = {<<"down_with_noconnect">>, Pid}, - {State0, _} = enq(1, 1, second, test_init(test)), - ?assertEqual(1, maps:size(State0#state.messages)), - ?assertEqual(0, lqueue:len(State0#state.returns)), - {State1, {_, _}} = deq(2, Cid, unsettled, State0), - ?assertEqual(0, maps:size(State1#state.messages)), - ?assertEqual(0, lqueue:len(State1#state.returns)), - {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), - ?assertEqual(0, maps:size(State2a#state.messages)), - ?assertEqual(1, lqueue:len(State2a#state.returns)), - ok. - -down_with_noproc_enqueuer_is_cleaned_up_test() -> - State00 = test_init(test), - Pid = spawn(fun() -> ok end), - {State0, _, Effects0} = apply(meta(1), make_enqueue(Pid, 1, first), State00), - ?ASSERT_EFF({monitor, process, _}, Effects0), - {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0), - % ensure there are no enqueuers - ?assert(0 =:= maps:size(State1#state.enqueuers)), - ok. - -discarded_message_without_dead_letter_handler_is_removed_test() -> - Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, - {State0, [_, _]} = enq(1, 1, first, test_init(test)), - {State1, Effects1} = check_n(Cid, 2, 10, State0), - ?ASSERT_EFF({send_msg, _, - {delivery, _, [{0, {#{}, first}}]}, _}, - Effects1), - {_State2, _, Effects2} = apply(meta(1), make_discard(Cid, [0]), State1), - ?assertNoEffect({send_msg, _, - {delivery, _, [{0, {#{}, first}}]}, _}, - Effects2), - ok. - -discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() -> - Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, - State00 = init(#{name => test, - queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), - dead_letter_handler => - {somemod, somefun, [somearg]}}), - {State0, [_, _]} = enq(1, 1, first, State00), - {State1, Effects1} = check_n(Cid, 2, 10, State0), - ?ASSERT_EFF({send_msg, _, - {delivery, _, [{0, {#{}, first}}]}, _}, - Effects1), - {_State2, _, Effects2} = apply(meta(1), make_discard(Cid, [0]), State1), - % assert mod call effect with appended reason and message - ?ASSERT_EFF({mod_call, somemod, somefun, [somearg, [{rejected, first}]]}, - Effects2), - ok. - -tick_test() -> - Cid = {<<"c">>, self()}, - Cid2 = {<<"c2">>, self()}, - {S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)), - {S1, _} = enq(2, 2, <<"snd">>, S0), - {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1), - {S3, {_, _}} = deq(4, Cid2, unsettled, S2), - {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), S3), - - [{mod_call, _, _, - [#resource{}, - {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = tick(1, S4), - ok. - -enq_deq_snapshot_recover_test() -> - Tag = atom_to_binary(?FUNCTION_NAME, utf8), - Cid = {Tag, self()}, - Commands = [ - make_enqueue(self(), 1, one), - make_enqueue(self(), 2, two), - make_checkout(Cid, {dequeue, settled}, #{}), - make_enqueue(self(), 3, three), - make_enqueue(self(), 4, four), - make_checkout(Cid, {dequeue, settled}, #{}), - make_enqueue(self(), 5, five), - make_checkout(Cid, {dequeue, settled}, #{}) - ], - run_snapshot_test(?FUNCTION_NAME, Commands). - -enq_deq_settle_snapshot_recover_test() -> - Tag = atom_to_binary(?FUNCTION_NAME, utf8), - Cid = {Tag, self()}, - % OthPid = spawn(fun () -> ok end), - % Oth = {<<"oth">>, OthPid}, - Commands = [ - make_enqueue(self(), 1, one), - make_enqueue(self(), 2, two), - make_checkout(Cid, {dequeue, unsettled}, #{}), - make_settle(Cid, [0]) - ], - run_snapshot_test(?FUNCTION_NAME, Commands). - -enq_deq_settle_snapshot_recover_2_test() -> - Tag = atom_to_binary(?FUNCTION_NAME, utf8), - Cid = {Tag, self()}, - OthPid = spawn(fun () -> ok end), - Oth = {<<"oth">>, OthPid}, - Commands = [ - make_enqueue(self(), 1, one), - make_enqueue(self(), 2, two), - make_checkout(Cid, {dequeue, unsettled}, #{}), - make_settle(Cid, [0]), - make_enqueue(self(), 3, two), - make_checkout(Cid, {dequeue, unsettled}, #{}), - make_settle(Oth, [0]) - ], - run_snapshot_test(?FUNCTION_NAME, Commands). - -snapshot_recover_test() -> - Tag = atom_to_binary(?FUNCTION_NAME, utf8), - Cid = {Tag, self()}, - Commands = [ - make_checkout(Cid, {auto, 2, simple_prefetch}, #{}), - make_enqueue(self(), 1, one), - make_enqueue(self(), 2, two), - make_enqueue(self(), 3, three), - make_purge() - ], - run_snapshot_test(?FUNCTION_NAME, Commands). - -enq_deq_return_settle_snapshot_test() -> - Tag = atom_to_binary(?FUNCTION_NAME, utf8), - Cid = {Tag, self()}, - Commands = [ - make_enqueue(self(), 1, one), %% to Cid - make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), - make_return(Cid, [0]), %% should be re-delivered to Cid - make_enqueue(self(), 2, two), %% Cid prefix_msg_count: 2 - make_settle(Cid, [1]), - make_settle(Cid, [2]) - ], - run_snapshot_test(?FUNCTION_NAME, Commands). - -return_prefix_msg_count_test() -> - Tag = atom_to_binary(?FUNCTION_NAME, utf8), - Cid = {Tag, self()}, - Commands = [ - make_enqueue(self(), 1, one), - make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), - make_checkout(Cid, cancel, #{}), - make_enqueue(self(), 2, two) %% Cid prefix_msg_count: 2 - ], - Indexes = lists:seq(1, length(Commands)), - Entries = lists:zip(Indexes, Commands), - {_State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries), - ok. - - -return_settle_snapshot_test() -> - Tag = atom_to_binary(?FUNCTION_NAME, utf8), - Cid = {Tag, self()}, - Commands = [ - make_enqueue(self(), 1, one), %% to Cid - make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), - make_return(Cid, [0]), %% should be re-delivered to Oth - make_enqueue(self(), 2, two), %% Cid prefix_msg_count: 2 - make_settle(Cid, [1]), - make_return(Cid, [2]), - make_settle(Cid, [3]), - make_enqueue(self(), 3, three), - make_purge(), - make_enqueue(self(), 4, four) - ], - run_snapshot_test(?FUNCTION_NAME, Commands). - -enq_check_settle_snapshot_recover_test() -> - Tag = atom_to_binary(?FUNCTION_NAME, utf8), - Cid = {Tag, self()}, - Commands = [ - make_checkout(Cid, {auto, 2, simple_prefetch}, #{}), - make_enqueue(self(), 1, one), - make_enqueue(self(), 2, two), - make_settle(Cid, [1]), - make_settle(Cid, [0]), - make_enqueue(self(), 3, three), - make_settle(Cid, [2]) - ], - % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), - run_snapshot_test(?FUNCTION_NAME, Commands). - -enq_check_settle_snapshot_purge_test() -> - Tag = atom_to_binary(?FUNCTION_NAME, utf8), - Cid = {Tag, self()}, - Commands = [ - make_checkout(Cid, {auto, 2, simple_prefetch},#{}), - make_enqueue(self(), 1, one), - make_enqueue(self(), 2, two), - make_settle(Cid, [1]), - make_settle(Cid, [0]), - make_enqueue(self(), 3, three), - make_purge() - ], - % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), - run_snapshot_test(?FUNCTION_NAME, Commands). - -enq_check_settle_duplicate_test() -> - %% duplicate settle commands are likely - Tag = atom_to_binary(?FUNCTION_NAME, utf8), - Cid = {Tag, self()}, - Commands = [ - make_checkout(Cid, {auto, 2, simple_prefetch}, #{}), - make_enqueue(self(), 1, one), %% 0 - make_enqueue(self(), 2, two), %% 0 - make_settle(Cid, [0]), - make_settle(Cid, [1]), - make_settle(Cid, [1]), - make_enqueue(self(), 3, three), - make_settle(Cid, [2]) - ], - % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), - run_snapshot_test(?FUNCTION_NAME, Commands). - -run_snapshot_test(Name, Commands) -> - %% create every incremental permutation of the commands lists - %% and run the snapshot tests against that - [begin - run_snapshot_test0(Name, C) - end || C <- prefixes(Commands, 1, [])]. - -run_snapshot_test0(Name, Commands) -> - Indexes = lists:seq(1, length(Commands)), - Entries = lists:zip(Indexes, Commands), - {State, Effects} = run_log(test_init(Name), Entries), - - [begin - Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; - (_) -> false - end, Entries), - {S, _} = run_log(SnapState, Filtered), - % assert log can be restored from any release cursor index - ?assertEqual(normalize(State), normalize(S)) - end || {release_cursor, SnapIdx, SnapState} <- Effects], - ok. - -prefixes(Source, N, Acc) when N > length(Source) -> - lists:reverse(Acc); -prefixes(Source, N, Acc) -> - {X, _} = lists:split(N, Source), - prefixes(Source, N+1, [X | Acc]). - -delivery_query_returns_deliveries_test() -> - Tag = atom_to_binary(?FUNCTION_NAME, utf8), - Cid = {Tag, self()}, - Commands = [ - make_checkout(Cid, {auto, 5, simple_prefetch}, #{}), - make_enqueue(self(), 1, one), - make_enqueue(self(), 2, two), - make_enqueue(self(), 3, tre), - make_enqueue(self(), 4, for) - ], - Indexes = lists:seq(1, length(Commands)), - Entries = lists:zip(Indexes, Commands), - {State, _Effects} = run_log(test_init(help), Entries), - % 3 deliveries are returned - [{0, {#{}, one}}] = get_checked_out(Cid, 0, 0, State), - [_, _, _] = get_checked_out(Cid, 1, 3, State), - ok. - -pending_enqueue_is_enqueued_on_down_test() -> - Cid = {<<"cid">>, self()}, - Pid = self(), - {State0, _} = enq(1, 2, first, test_init(test)), - {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0), - {_State2, {dequeue, {0, {_, first}}, 0}, _} = - apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1), - ok. - -duplicate_delivery_test() -> - {State0, _} = enq(1, 1, first, test_init(test)), - {#state{ra_indexes = RaIdxs, - messages = Messages}, _} = enq(2, 1, first, State0), - ?assertEqual(1, rabbit_fifo_index:size(RaIdxs)), - ?assertEqual(1, maps:size(Messages)), - ok. - -state_enter_test() -> - S0 = init(#{name => the_name, - queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), - become_leader_handler => {m, f, [a]}}), - [{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0), - ok. - -state_enter_monitors_and_notifications_test() -> - Oth = spawn(fun () -> ok end), - {State0, _} = enq(1, 1, first, test_init(test)), - Cid = {<<"adf">>, self()}, - OthCid = {<<"oth">>, Oth}, - {State1, _} = check(Cid, 2, State0), - {State, _} = check(OthCid, 3, State1), - Self = self(), - Effects = state_enter(leader, State), - - %% monitor all enqueuers and consumers - [{monitor, process, Self}, - {monitor, process, Oth}] = - lists:filter(fun ({monitor, process, _}) -> true; - (_) -> false - end, Effects), - [{send_msg, Self, leader_change, ra_event}, - {send_msg, Oth, leader_change, ra_event}] = - lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true; - (_) -> false - end, Effects), - ?ASSERT_EFF({monitor, process, _}, Effects), - ok. - -purge_test() -> - Cid = {<<"purge_test">>, self()}, - {State1, _} = enq(1, 1, first, test_init(test)), - {State2, {purge, 1}, _} = apply(meta(2), make_purge(), State1), - {State3, _} = enq(3, 2, second, State2), - % get returns a reply value - {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} = - apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), State3), - ok. - -purge_with_checkout_test() -> - Cid = {<<"purge_test">>, self()}, - {State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)), - {State1, _} = enq(2, 1, <<"first">>, State0), - {State2, _} = enq(3, 2, <<"second">>, State1), - %% assert message bytes are non zero - ?assert(State2#state.msg_bytes_checkout > 0), - ?assert(State2#state.msg_bytes_enqueue > 0), - {State3, {purge, 1}, _} = apply(meta(2), make_purge(), State2), - ?assert(State2#state.msg_bytes_checkout > 0), - ?assertEqual(0, State3#state.msg_bytes_enqueue), - ?assertEqual(1, rabbit_fifo_index:size(State3#state.ra_indexes)), - #consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers), - ?assertEqual(1, maps:size(Checked)), - ok. - -down_returns_checked_out_in_order_test() -> - S0 = test_init(?FUNCTION_NAME), - %% enqueue 100 - S1 = lists:foldl(fun (Num, FS0) -> - {FS, _} = enq(Num, Num, Num, FS0), - FS - end, S0, lists:seq(1, 100)), - ?assertEqual(100, maps:size(S1#state.messages)), - Cid = {<<"cid">>, self()}, - {S2, _} = check(Cid, 101, 1000, S1), - #consumer{checked_out = Checked} = maps:get(Cid, S2#state.consumers), - ?assertEqual(100, maps:size(Checked)), - %% simulate down - {S, _, _} = apply(meta(102), {down, self(), noproc}, S2), - Returns = lqueue:to_list(S#state.returns), - ?assertEqual(100, length(Returns)), - %% validate returns are in order - ?assertEqual(lists:sort(Returns), Returns), - ok. - -single_active_consumer_test() -> - State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), - ?assertEqual(single_active, State0#state.consumer_strategy), - ?assertEqual(0, map_size(State0#state.consumers)), - - % adding some consumers - AddConsumer = fun(CTag, State) -> - {NewState, _, _} = apply( - meta(1), - #checkout{spec = {once, 1, simple_prefetch}, - meta = #{}, - consumer_id = {CTag, self()}}, - State), - NewState - end, - State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), - - % the first registered consumer is the active one, the others are waiting - ?assertEqual(1, map_size(State1#state.consumers)), - ?assert(maps:is_key({<<"ctag1">>, self()}, State1#state.consumers)), - ?assertEqual(3, length(State1#state.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#state.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#state.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)), - - % cancelling a waiting consumer - {State2, _, Effects1} = apply(meta(2), - #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1), - % the active consumer should still be in place - ?assertEqual(1, map_size(State2#state.consumers)), - ?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)), - % the cancelled consumer has been removed from waiting consumers - ?assertEqual(2, length(State2#state.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#state.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#state.waiting_consumers)), - % there are some effects to unregister the consumer - ?assertEqual(1, length(Effects1)), - - % cancelling the active consumer - {State3, _, Effects2} = apply(meta(3), - #checkout{spec = cancel, - consumer_id = {<<"ctag1">>, self()}}, - State2), - % the second registered consumer is now the active one - ?assertEqual(1, map_size(State3#state.consumers)), - ?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)), - % the new active consumer is no longer in the waiting list - ?assertEqual(1, length(State3#state.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)), - % there are some effects to unregister the consumer and to update the new active one (metrics) - ?assertEqual(2, length(Effects2)), - - % cancelling the active consumer - {State4, _, Effects3} = apply(meta(4), #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3), - % the last waiting consumer became the active one - ?assertEqual(1, map_size(State4#state.consumers)), - ?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)), - % the waiting consumer list is now empty - ?assertEqual(0, length(State4#state.waiting_consumers)), - % there are some effects to unregister the consumer and to update the new active one (metrics) - ?assertEqual(2, length(Effects3)), - - % cancelling the last consumer - {State5, _, Effects4} = apply(meta(5), #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4), - % no active consumer anymore - ?assertEqual(0, map_size(State5#state.consumers)), - % still nothing in the waiting list - ?assertEqual(0, length(State5#state.waiting_consumers)), - % there is an effect to unregister the consumer + queue inactive effect - ?assertEqual(1 + 1, length(Effects4)), - - ok. - -single_active_consumer_cancel_consumer_when_channel_is_down_test() -> - State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), - - DummyFunction = fun() -> ok end, - Pid1 = spawn(DummyFunction), - Pid2 = spawn(DummyFunction), - Pid3 = spawn(DummyFunction), - - % adding some consumers - AddConsumer = fun({CTag, ChannelId}, State) -> - {NewState, _, _} = apply( - #{index => 1}, - #checkout{spec = {once, 1, simple_prefetch}, - meta = #{}, - consumer_id = {CTag, ChannelId}}, - State), - NewState - end, - State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), - - % the channel of the active consumer goes down - {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1), - % fell back to another consumer - ?assertEqual(1, map_size(State2#state.consumers)), - % there are still waiting consumers - ?assertEqual(2, length(State2#state.waiting_consumers)), - % effects to unregister the consumer and - % to update the new active one (metrics) are there - ?assertEqual(2, length(Effects)), - - % the channel of the active consumer and a waiting consumer goes down - {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2), - % fell back to another consumer - ?assertEqual(1, map_size(State3#state.consumers)), - % no more waiting consumer - ?assertEqual(0, length(State3#state.waiting_consumers)), - % effects to cancel both consumers of this channel + effect to update the new active one (metrics) - ?assertEqual(3, length(Effects2)), - - % the last channel goes down - {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3), - % no more consumers - ?assertEqual(0, map_size(State4#state.consumers)), - ?assertEqual(0, length(State4#state.waiting_consumers)), - % there is an effect to unregister the consumer + queue inactive effect - ?assertEqual(1 + 1, length(Effects3)), - - ok. - -single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnection_test() -> - State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), - - Meta = #{index => 1}, - % adding some consumers - AddConsumer = fun(CTag, State) -> - {NewState, _, _} = apply( - Meta, - #checkout{spec = {once, 1, simple_prefetch}, - meta = #{}, - consumer_id = {CTag, self()}}, - State), - NewState - end, - State1 = lists:foldl(AddConsumer, State0, - [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), - - % simulate node goes down - {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1), - - % all the waiting consumers should be suspected down - ?assertEqual(3, length(State2#state.waiting_consumers)), - lists:foreach(fun({_, #consumer{status = Status}}) -> - ?assert(Status == suspected_down) - end, State2#state.waiting_consumers), - - % simulate node goes back up - {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2), - - % all the waiting consumers should be un-suspected - ?assertEqual(3, length(State3#state.waiting_consumers)), - lists:foreach(fun({_, #consumer{status = Status}}) -> - ?assert(Status /= suspected_down) - end, State3#state.waiting_consumers), - - ok. - -single_active_consumer_state_enter_leader_include_waiting_consumers_test() -> - State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), - - DummyFunction = fun() -> ok end, - Pid1 = spawn(DummyFunction), - Pid2 = spawn(DummyFunction), - Pid3 = spawn(DummyFunction), - - Meta = #{index => 1}, - % adding some consumers - AddConsumer = fun({CTag, ChannelId}, State) -> - {NewState, _, _} = apply( - Meta, - #checkout{spec = {once, 1, simple_prefetch}, - meta = #{}, - consumer_id = {CTag, ChannelId}}, - State), - NewState - end, - State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), - - Effects = state_enter(leader, State1), - % 2 effects for each consumer process (channel process), 1 effect for the node - ?assertEqual(2 * 3 + 1, length(Effects)). - -single_active_consumer_state_enter_eol_include_waiting_consumers_test() -> - State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), - - DummyFunction = fun() -> ok end, - Pid1 = spawn(DummyFunction), - Pid2 = spawn(DummyFunction), - Pid3 = spawn(DummyFunction), - - Meta = #{index => 1}, - % adding some consumers - AddConsumer = fun({CTag, ChannelId}, State) -> - {NewState, _, _} = apply( - Meta, - #checkout{spec = {once, 1, simple_prefetch}, - meta = #{}, - consumer_id = {CTag, ChannelId}}, - State), - NewState - end, - State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), - - Effects = state_enter(eol, State1), - % 1 effect for each consumer process (channel process) - ?assertEqual(3, length(Effects)). - -query_consumers_test() -> - State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => false}), - - % adding some consumers - AddConsumer = fun(CTag, State) -> - {NewState, _, _} = apply( - #{index => 1}, - #checkout{spec = {once, 1, simple_prefetch}, - meta = #{}, - consumer_id = {CTag, self()}}, - State), - NewState - end, - State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), - Consumers0 = State1#state.consumers, - Consumer = maps:get({<<"ctag2">>, self()}, Consumers0), - Consumers1 = maps:put({<<"ctag2">>, self()}, - Consumer#consumer{status = suspected_down}, Consumers0), - State2 = State1#state{consumers = Consumers1}, - - ?assertEqual(4, query_consumer_count(State2)), - Consumers2 = query_consumers(State2), - ?assertEqual(4, maps:size(Consumers2)), - maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) -> - ?assertEqual(self(), Pid), - case Tag of - <<"ctag2">> -> - ?assertNot(Active), - ?assertEqual(suspected_down, ActivityStatus); - _ -> - ?assert(Active), - ?assertEqual(up, ActivityStatus) - end - end, [], Consumers2). - -query_consumers_when_single_active_consumer_is_on_test() -> - State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), - Meta = #{index => 1}, - % adding some consumers - AddConsumer = fun(CTag, State) -> - {NewState, _, _} = apply( - Meta, - #checkout{spec = {once, 1, simple_prefetch}, - meta = #{}, - consumer_id = {CTag, self()}}, - State), - NewState - end, - State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), - - ?assertEqual(4, query_consumer_count(State1)), - Consumers = query_consumers(State1), - ?assertEqual(4, maps:size(Consumers)), - maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) -> - ?assertEqual(self(), Pid), - case Tag of - <<"ctag1">> -> - ?assert(Active), - ?assertEqual(single_active, ActivityStatus); - _ -> - ?assertNot(Active), - ?assertEqual(waiting, ActivityStatus) - end - end, [], Consumers). - -active_flag_updated_when_consumer_suspected_unsuspected_test() -> - State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => false}), - - DummyFunction = fun() -> ok end, - Pid1 = spawn(DummyFunction), - Pid2 = spawn(DummyFunction), - Pid3 = spawn(DummyFunction), - - % adding some consumers - AddConsumer = fun({CTag, ChannelId}, State) -> - {NewState, _, _} = apply( - #{index => 1}, - #checkout{spec = {once, 1, simple_prefetch}, - meta = #{}, - consumer_id = {CTag, ChannelId}}, - State), - NewState - end, - State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), - - {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1), - % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node - ?assertEqual(4 + 1, length(Effects2)), - - {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2), - % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID - ?assertEqual(4 + 4, length(Effects3)). - -active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test() -> - State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), - - DummyFunction = fun() -> ok end, - Pid1 = spawn(DummyFunction), - Pid2 = spawn(DummyFunction), - Pid3 = spawn(DummyFunction), - - % adding some consumers - AddConsumer = fun({CTag, ChannelId}, State) -> - {NewState, _, _} = apply( - #{index => 1}, - #checkout{spec = {once, 1, simple_prefetch}, - meta = #{}, - consumer_id = {CTag, ChannelId}}, - State), - NewState - end, - State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), - - {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1), - % only 1 effect to monitor the node - ?assertEqual(1, length(Effects2)), - - {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2), - % for each consumer: 1 effect to monitor the consumer PID - ?assertEqual(4, length(Effects3)). - -meta(Idx) -> - #{index => Idx, term => 1}. - -enq(Idx, MsgSeq, Msg, State) -> - strip_reply( - apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), State)). - -deq(Idx, Cid, Settlement, State0) -> - {State, {dequeue, {MsgId, Msg}, _}, _} = - apply(meta(Idx), - make_checkout(Cid, {dequeue, Settlement}, #{}), - State0), - {State, {MsgId, Msg}}. - -check_n(Cid, Idx, N, State) -> - strip_reply( - apply(meta(Idx), - make_checkout(Cid, {auto, N, simple_prefetch}, #{}), - State)). - -check(Cid, Idx, State) -> - strip_reply( - apply(meta(Idx), - make_checkout(Cid, {once, 1, simple_prefetch}, #{}), - State)). - -check_auto(Cid, Idx, State) -> - strip_reply( - apply(meta(Idx), - make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), - State)). - -check(Cid, Idx, Num, State) -> - strip_reply( - apply(meta(Idx), - make_checkout(Cid, {auto, Num, simple_prefetch}, #{}), - State)). - -settle(Cid, Idx, MsgId, State) -> - strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), State)). - -credit(Cid, Idx, Credit, DelCnt, Drain, State) -> - strip_reply(apply(meta(Idx), make_credit(Cid, Credit, DelCnt, Drain), - State)). - -strip_reply({State, _, Effects}) -> - {State, Effects}. - -run_log(InitState, Entries) -> - lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) -> - case apply(meta(Idx), E, Acc0) of - {Acc, _, Efx} when is_list(Efx) -> - {Acc, Efx0 ++ Efx}; - {Acc, _, Efx} -> - {Acc, Efx0 ++ [Efx]}; - {Acc, _} -> - {Acc, Efx0} - end - end, {InitState, []}, Entries). - - -%% AUX Tests - -aux_test() -> - _ = ra_machine_ets:start_link(), - Aux0 = init_aux(aux_test), - MacState = init(#{name => aux_test, - queue_resource => - rabbit_misc:r(<<"/">>, queue, <<"test">>)}), - Log = undefined, - {no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0, - Log, MacState), - {no_reply, _Aux, undefined} = handle_aux(leader, cast, emit, Aux, - Log, MacState), - [X] = ets:lookup(rabbit_fifo_usage, aux_test), - ?assert(X > 0.0), - ok. - - --endif. - diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl new file mode 100644 index 0000000000..ebe5f3328a --- /dev/null +++ b/src/rabbit_fifo.hrl @@ -0,0 +1,170 @@ + +-type raw_msg() :: term(). +%% The raw message. It is opaque to rabbit_fifo. + +-type msg_in_id() :: non_neg_integer(). +% a queue scoped monotonically incrementing integer used to enforce order +% in the unassigned messages map + +-type msg_id() :: non_neg_integer(). +%% A consumer-scoped monotonically incrementing integer included with a +%% {@link delivery/0.}. Used to settle deliveries using +%% {@link rabbit_fifo_client:settle/3.} + +-type msg_seqno() :: non_neg_integer(). +%% A sender process scoped monotonically incrementing integer included +%% in enqueue messages. Used to ensure ordering of messages send from the +%% same process + +-type msg_header() :: #{size := msg_size(), + delivery_count => non_neg_integer()}. +%% The message header map: +%% delivery_count: the number of unsuccessful delivery attempts. +%% A non-zero value indicates a previous attempt. + +-type msg() :: {msg_header(), raw_msg()}. +%% message with a header map. + +-type msg_size() :: non_neg_integer(). +%% the size in bytes of the msg payload + +-type indexed_msg() :: {ra_index(), msg()}. + +-type prefix_msg() :: {'$prefix_msg', msg_header()}. + +-type delivery_msg() :: {msg_id(), msg()}. +%% A tuple consisting of the message id and the headered message. + +-type consumer_tag() :: binary(). +%% An arbitrary binary tag used to distinguish between different consumers +%% set up by the same process. See: {@link rabbit_fifo_client:checkout/3.} + +-type delivery() :: {delivery, consumer_tag(), [delivery_msg()]}. +%% Represents the delivery of one or more rabbit_fifo messages. + +-type consumer_id() :: {consumer_tag(), pid()}. +%% The entity that receives messages. Uniquely identifies a consumer. + +-type credit_mode() :: simple_prefetch | credited. +%% determines how credit is replenished + +-type checkout_spec() :: {once | auto, Num :: non_neg_integer(), + credit_mode()} | + {dequeue, settled | unsettled} | + cancel. + +-type consumer_meta() :: #{ack => boolean(), + username => binary(), + prefetch => non_neg_integer(), + args => list()}. +%% static meta data associated with a consumer + + +-type applied_mfa() :: {module(), atom(), list()}. +% represents a partially applied module call + +-define(RELEASE_CURSOR_EVERY, 64000). +-define(USE_AVG_HALF_LIFE, 10000.0). + +-record(consumer, + {meta = #{} :: consumer_meta(), + checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}}, + next_msg_id = 0 :: msg_id(), % part of snapshot data + %% max number of messages that can be sent + %% decremented for each delivery + credit = 0 : non_neg_integer(), + %% total number of checked out messages - ever + %% incremented for each delivery + delivery_count = 0 :: non_neg_integer(), + %% the mode of how credit is incremented + %% simple_prefetch: credit is re-filled as deliveries are settled + %% or returned. + %% credited: credit can only be changed by receiving a consumer_credit + %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}' + credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data + lifetime = once :: once | auto, + status = up :: up | suspected_down | cancelled + }). + +-type consumer() :: #consumer{}. + +-record(enqueuer, + {next_seqno = 1 :: msg_seqno(), + % out of order enqueues - sorted list + pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}], + status = up :: up | suspected_down + }). + +-record(cfg, + {name :: atom(), + resource :: rabbit_types:r('queue'), + release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(), + dead_letter_handler :: maybe(applied_mfa()), + become_leader_handler :: maybe(applied_mfa()), + max_length :: maybe(non_neg_integer()), + max_bytes :: maybe(non_neg_integer()), + %% whether single active consumer is on or not for this queue + consumer_strategy = default :: default | single_active, + delivery_limit :: maybe(non_neg_integer()) + }). + +-record(rabbit_fifo, + {cfg :: #cfg{}, + % unassigned messages + messages = #{} :: #{msg_in_id() => indexed_msg()}, + % defines the lowest message in id available in the messages map + % that isn't a return + low_msg_num :: msg_in_id() | undefined, + % defines the next message in id to be added to the messages map + next_msg_num = 1 :: msg_in_id(), + % list of returned msg_in_ids - when checking out it picks from + % this list first before taking low_msg_num + returns = lqueue:new() :: lqueue:lqueue(prefix_msg() | + {msg_in_id(), indexed_msg()}), + % a counter of enqueues - used to trigger shadow copy points + enqueue_count = 0 :: non_neg_integer(), + % a map containing all the live processes that have ever enqueued + % a message to this queue as well as a cached value of the smallest + % ra_index of all pending enqueues + enqueuers = #{} :: #{pid() => #enqueuer{}}, + % master index of all enqueue raft indexes including pending + % enqueues + % rabbit_fifo_index can be slow when calculating the smallest + % index when there are large gaps but should be faster than gb_trees + % for normal appending operations as it's backed by a map + ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(), + release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor, + ra_index(), #rabbit_fifo{}}), + % consumers need to reflect consumer state at time of snapshot + % needs to be part of snapshot + consumers = #{} :: #{consumer_id() => #consumer{}}, + % consumers that require further service are queued here + % needs to be part of snapshot + service_queue = queue:new() :: queue:queue(consumer_id()), + %% This is a special field that is only used for snapshots + %% It represents the queued messages at the time the + %% dehydrated snapshot state was cached. + %% As release_cursors are only emitted for raft indexes where all + %% prior messages no longer contribute to the current state we can + %% replace all message payloads with their sizes (to be used for + %% overflow calculations). + %% This is done so that consumers are still served in a deterministic + %% order on recovery. + prefix_msgs = {[], []} :: {Return :: [msg_header()], + PrefixMsgs :: [msg_header()]}, + msg_bytes_enqueue = 0 :: non_neg_integer(), + msg_bytes_checkout = 0 :: non_neg_integer(), + %% waiting consumers, one is picked active consumer is cancelled or dies + %% used only when single active consumer is on + waiting_consumers = [] :: [{consumer_id(), consumer()}] + }). + +-type config() :: #{name := atom(), + queue_resource := rabbit_types:r('queue'), + dead_letter_handler => applied_mfa(), + become_leader_handler => applied_mfa(), + release_cursor_interval => non_neg_integer(), + max_length => non_neg_integer(), + max_bytes => non_neg_integer(), + single_active_consumer_on => boolean(), + delivery_limit => non_neg_integer()}. diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 555e4e2e87..95bf067539 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -43,14 +43,17 @@ register() -> {policy_validator, <<"max-length-bytes">>}, {policy_validator, <<"queue-mode">>}, {policy_validator, <<"overflow">>}, + {policy_validator, <<"delivery-limit">>}, {operator_policy_validator, <<"expires">>}, {operator_policy_validator, <<"message-ttl">>}, {operator_policy_validator, <<"max-length">>}, {operator_policy_validator, <<"max-length-bytes">>}, + {operator_policy_validator, <<"delivery-limit">>}, {policy_merge_strategy, <<"expires">>}, {policy_merge_strategy, <<"message-ttl">>}, {policy_merge_strategy, <<"max-length">>}, - {policy_merge_strategy, <<"max-length-bytes">>}]], + {policy_merge_strategy, <<"max-length-bytes">>}, + {policy_merge_strategy, <<"delivery-limit">>}]], ok. validate_policy(Terms) -> @@ -111,9 +114,16 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) -> validate_policy0(<<"overflow">>, <<"reject-publish">>) -> ok; validate_policy0(<<"overflow">>, Value) -> - {error, "~p is not a valid overflow value", [Value]}. + {error, "~p is not a valid overflow value", [Value]}; + +validate_policy0(<<"delivery-limit">>, Value) + when is_integer(Value), Value >= 0 -> + ok; +validate_policy0(<<"delivery-limit">>, Value) -> + {error, "~p is not a valid delivery limit", [Value]}. merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal); -merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal). +merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal); +merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 9e73981541..e811bfffb3 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -163,13 +163,16 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> %% take the minimum value of the policy and the queue arg if present MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q), MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), + DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q), #{name => Name, queue_resource => QName, dead_letter_handler => dlx_mfa(Q), become_leader_handler => {?MODULE, become_leader, [QName]}, max_length => MaxLength, max_bytes => MaxBytes, - single_active_consumer_on => single_active_consumer_on(Q)}. + single_active_consumer_on => single_active_consumer_on(Q), + delivery_limit => DeliveryLimit + }. single_active_consumer_on(Q) -> QArguments = amqqueue:get_arguments(Q), @@ -680,14 +683,12 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> rabbit_misc:execute_mnesia_transaction( fun() -> rabbit_amqqueue:update(QName, Fun) end), ok; - timeout -> + {timeout, _} -> {error, timeout}; E -> %% TODO should we stop the ra process here? E end; - timeout -> - {error, timeout}; E -> E end. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 406c02de83..0ec65c31b8 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -111,7 +111,10 @@ all_tests() -> consume_redelivery_count, subscribe_redelivery_count, message_bytes_metrics, - queue_length_limit_drop_head + queue_length_limit_drop_head, + subscribe_redelivery_limit, + subscribe_redelivery_policy, + subscribe_redelivery_limit_with_dead_letter ]. memory_tests() -> @@ -1487,12 +1490,12 @@ subscribe_redelivery_count(Config) -> wait_for_messages_pending_ack(Servers, RaName, 0), subscribe(Ch, QQ, false), - DTag = <<"x-delivery-count">>, + DCHeader = <<"x-delivery-count">>, receive {#'basic.deliver'{delivery_tag = DeliveryTag, redelivered = false}, #amqp_msg{props = #'P_basic'{headers = H0}}} -> - ?assertMatch(undefined, rabbit_basic:header(DTag, H0)), + ?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}) @@ -1502,7 +1505,7 @@ subscribe_redelivery_count(Config) -> {#'basic.deliver'{delivery_tag = DeliveryTag1, redelivered = true}, #amqp_msg{props = #'P_basic'{headers = H1}}} -> - ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)), + ?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, multiple = false, requeue = true}) @@ -1512,13 +1515,147 @@ subscribe_redelivery_count(Config) -> {#'basic.deliver'{delivery_tag = DeliveryTag2, redelivered = true}, #amqp_msg{props = #'P_basic'{headers = H2}}} -> - ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)), + ?assertMatch({DCHeader, _, 2}, rabbit_basic:header(DCHeader, H2)), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2, multiple = false}), wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 0) end. +subscribe_redelivery_limit(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-delivery-limit">>, long, 1}])), + + publish(Ch, QQ), + wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QQ, false), + + DCHeader = <<"x-delivery-count">>, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} -> + ?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}) + end, + + wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} -> + ?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}) + end, + + wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]), + receive + {#'basic.deliver'{redelivered = true}, #amqp_msg{}} -> + throw(unexpected_redelivery) + after 2000 -> + ok + end. + +subscribe_redelivery_policy(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"delivery-limit">>, <<".*">>, <<"queues">>, + [{<<"delivery-limit">>, 1}]), + + publish(Ch, QQ), + wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QQ, false), + + DCHeader = <<"x-delivery-count">>, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} -> + ?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}) + end, + + wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} -> + ?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}) + end, + + wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]), + receive + {#'basic.deliver'{redelivered = true}, #amqp_msg{}} -> + throw(unexpected_redelivery) + after 2000 -> + ok + end, + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"delivery-limit">>). + +subscribe_redelivery_limit_with_dead_letter(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + DLX = <<"subcribe_redelivery_limit_with_dead_letter_dlx">>, + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-delivery-limit">>, long, 1}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, DLX} + ])), + ?assertEqual({'queue.declare_ok', DLX, 0, 0}, + declare(Ch, DLX, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + publish(Ch, QQ), + wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QQ, false), + + DCHeader = <<"x-delivery-count">>, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} -> + ?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}) + end, + + wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} -> + ?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}) + end, + + wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, [[DLX, <<"1">>, <<"1">>, <<"0">>]]). + consume_redelivery_count(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1531,14 +1668,14 @@ consume_redelivery_count(Config) -> wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0), - DTag = <<"x-delivery-count">>, + DCHeader = <<"x-delivery-count">>, {#'basic.get_ok'{delivery_tag = DeliveryTag, redelivered = false}, #amqp_msg{props = #'P_basic'{headers = H0}}} = amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = false}), - ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)), + ?assertMatch({DCHeader, _, 0}, rabbit_basic:header(DCHeader, H0)), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}), @@ -1550,7 +1687,7 @@ consume_redelivery_count(Config) -> #amqp_msg{props = #'P_basic'{headers = H1}}} = amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = false}), - ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)), + ?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, multiple = false, requeue = true}), @@ -1560,7 +1697,7 @@ consume_redelivery_count(Config) -> #amqp_msg{props = #'P_basic'{headers = H2}}} = amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = false}), - ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)), + ?assertMatch({DCHeader, _, 2}, rabbit_basic:header(DCHeader, H2)), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2, multiple = false, requeue = true}), diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 60402b3a7b..ceed092d0f 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -1,638 +1,1035 @@ -module(rabbit_fifo_SUITE). +%% rabbit_fifo unit tests suite + -compile(export_all). +-compile({no_auto_import, [apply/3]}). +-export([ + ]). + -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("ra/include/ra.hrl"). +-include_lib("rabbit/src/rabbit_fifo.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + all() -> [ {group, tests} ]. + +%% replicate eunit like test resultion all_tests() -> - [ - basics, - return, - rabbit_fifo_returns_correlation, - resends_lost_command, - returns_after_down, - resends_after_lost_applied, - handles_reject_notification, - two_quick_enqueues, - detects_lost_delivery, - dequeue, - discard, - cancel_checkout, - credit, - untracked_enqueue, - flow, - test_queries, - duplicate_delivery, - usage - ]. + [F || {F, _} <- ?MODULE:module_info(functions), + re:run(atom_to_list(F), "_test$") /= nomatch] + . groups() -> [ {tests, [], all_tests()} ]. -init_per_group(_, Config) -> - PrivDir = ?config(priv_dir, Config), - _ = application:load(ra), - ok = application:set_env(ra, data_dir, PrivDir), - application:ensure_all_started(ra), - application:ensure_all_started(lg), +init_per_suite(Config) -> Config. -end_per_group(_, Config) -> - _ = application:stop(ra), +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> Config. -init_per_testcase(TestCase, Config) -> - meck:new(rabbit_quorum_queue, [passthrough]), - meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _) -> ok end), - meck:expect(rabbit_quorum_queue, cancel_consumer_handler, - fun (_, _) -> ok end), - ra_server_sup_sup:remove_all(), - ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"), - ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"), - ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)), - [ - {cluster_name, ClusterName}, - {uid, atom_to_binary(TestCase, utf8)}, - {node_id, {TestCase, node()}}, - {uid2, atom_to_binary(ServerName2, utf8)}, - {node_id2, {ServerName2, node()}}, - {uid3, atom_to_binary(ServerName3, utf8)}, - {node_id3, {ServerName3, node()}} - | Config]. - -end_per_testcase(_, Config) -> - meck:unload(), +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> Config. -basics(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - UId = ?config(uid, Config), - CustomerTag = UId, - ok = start_cluster(ClusterName, [ServerId]), - FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, undefined, FState0), - - ra_log_wal:force_roll_over(ra_log_wal), - % create segment the segment will trigger a snapshot - timer:sleep(1000), - - {ok, FState2} = rabbit_fifo_client:enqueue(one, FState1), - % process ra events - FState3 = process_ra_event(FState2, 250), - - FState5 = receive - {ra_event, From, Evt} -> - case rabbit_fifo_client:handle_ra_event(From, Evt, FState3) of - {internal, _AcceptedSeqs, _Actions, _FState4} -> - exit(unexpected_internal_event); - {{delivery, C, [{MsgId, _Msg}]}, FState4} -> - {ok, S} = rabbit_fifo_client:settle(C, [MsgId], - FState4), - S - end - after 5000 -> - exit(await_msg_timeout) - end, - - % process settle applied notification - FState5b = process_ra_event(FState5, 250), - _ = ra:stop_server(ServerId), - _ = ra:restart_server(ServerId), - - %% wait for leader change to notice server is up again - receive - {ra_event, _, {machine, leader_change}} -> ok - after 5000 -> - exit(leader_change_timeout) - end, - - {ok, FState6} = rabbit_fifo_client:enqueue(two, FState5b), - % process applied event - FState6b = process_ra_event(FState6, 250), - - receive - {ra_event, Frm, E} -> - case rabbit_fifo_client:handle_ra_event(Frm, E, FState6b) of - {internal, _, _, _FState7} -> - exit({unexpected_internal_event, E}); - {{delivery, Ctag, [{Mid, {_, two}}]}, FState7} -> - {ok, _S} = rabbit_fifo_client:return(Ctag, [Mid], FState7), - ok - end - after 2000 -> - exit(await_msg_timeout) - end, - ra:stop_server(ServerId), - ok. - -return(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - ServerId2 = ?config(node_id2, Config), - ok = start_cluster(ClusterName, [ServerId, ServerId2]), - - F00 = rabbit_fifo_client:init(ClusterName, [ServerId, ServerId2]), - {ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00), - {ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0), - {_, _, F2} = process_ra_events(F1, 100), - {ok, {{MsgId, _}, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2), - {ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F), - - ra:stop_server(ServerId), - ok. - -rabbit_fifo_returns_correlation(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - ok = start_cluster(ClusterName, [ServerId]), - F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:enqueue(corr1, msg1, F0), - receive - {ra_event, Frm, E} -> - case rabbit_fifo_client:handle_ra_event(Frm, E, F1) of - {internal, [corr1], [], _F2} -> - ok; - {Del, _} -> - exit({unexpected, Del}) - end - after 2000 -> - exit(await_msg_timeout) - end, - ra:stop_server(ServerId), - ok. - -duplicate_delivery(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - ok = start_cluster(ClusterName, [ServerId]), - F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), - {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), - Fun = fun Loop(S0) -> - receive - {ra_event, Frm, E} = Evt -> - case rabbit_fifo_client:handle_ra_event(Frm, E, S0) of - {internal, [corr1], [], S1} -> - Loop(S1); - {_Del, S1} -> - %% repeat event delivery - self() ! Evt, - %% check that then next received delivery doesn't - %% repeat or crash - receive - {ra_event, F, E1} -> - case rabbit_fifo_client:handle_ra_event(F, E1, S1) of - {internal, [], [], S2} -> - S2 - end - end - end - after 2000 -> - exit(await_msg_timeout) - end - end, - Fun(F2), - ra:stop_server(ServerId), - ok. - -usage(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - ok = start_cluster(ClusterName, [ServerId]), - F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), - {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), - {ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2), - {_, _, _} = process_ra_events(F3, 50), - % force tick and usage stats emission - ServerId ! tick_timeout, - timer:sleep(50), - Use = rabbit_fifo:usage(element(1, ServerId)), - ra:stop_server(ServerId), - ?assert(Use > 0.0), - ok. - -resends_lost_command(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - ok = start_cluster(ClusterName, [ServerId]), - - ok = meck:new(ra, [passthrough]), - - F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0), - % lose the enqueue - meck:expect(ra, pipeline_command, fun (_, _, _) -> ok end), - {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), - meck:unload(ra), - {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), - {_, _, F4} = process_ra_events(F3, 500), - {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), - {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), - {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), - ra:stop_server(ServerId), - ok. - -two_quick_enqueues(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - ok = start_cluster(ClusterName, [ServerId]), - - F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - F1 = element(2, rabbit_fifo_client:enqueue(msg1, F0)), - {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), - _ = process_ra_events(F2, 500), - ra:stop_server(ServerId), - ok. - -detects_lost_delivery(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - ok = start_cluster(ClusterName, [ServerId]), - - F000 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F00} = rabbit_fifo_client:enqueue(msg1, F000), - {_, _, F0} = process_ra_events(F00, 100), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), - {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), - {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), - % lose first delivery - receive - {ra_event, _, {machine, {delivery, _, [{_, {_, msg1}}]}}} -> - ok - after 500 -> - exit(await_delivery_timeout) - end, - - % assert three deliveries were received - {[_, _, _], _, _} = process_ra_events(F3, 500), - ra:stop_server(ServerId), - ok. - -returns_after_down(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - ok = start_cluster(ClusterName, [ServerId]), - - F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0), - {_, _, F2} = process_ra_events(F1, 500), - % start a customer in a separate processes - % that exits after checkout +end_per_testcase(_TestCase, _Config) -> + ok. + +%%%=================================================================== +%%% Test cases +%%%=================================================================== + +-define(ASSERT_EFF(EfxPat, Effects), + ?ASSERT_EFF(EfxPat, true, Effects)). + +-define(ASSERT_EFF(EfxPat, Guard, Effects), + ?assert(lists:any(fun (EfxPat) when Guard -> true; + (_) -> false + end, Effects))). + +-define(ASSERT_NO_EFF(EfxPat, Effects), + ?assert(not lists:any(fun (EfxPat) -> true; + (_) -> false + end, Effects))). + +-define(assertNoEffect(EfxPat, Effects), + ?assert(not lists:any(fun (EfxPat) -> true; + (_) -> false + end, Effects))). + +test_init(Name) -> + init(#{name => Name, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(Name, utf8)), + release_cursor_interval => 0}). + +enq_enq_checkout_test(_) -> + Cid = {<<"enq_enq_checkout_test">>, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + {_State3, _, Effects} = + apply(meta(3), + rabbit_fifo:make_checkout(Cid, {once, 2, simple_prefetch}, #{}), + State2), + ?ASSERT_EFF({monitor, _, _}, Effects), + ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects), + ok. + +credit_enq_enq_checkout_settled_credit_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + {State3, _, Effects} = + apply(meta(3), rabbit_fifo:make_checkout(Cid, {auto, 1, credited}, #{}), State2), + ?ASSERT_EFF({monitor, _, _}, Effects), + Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true; + (_) -> false + end, Effects), + ?assertEqual(1, length(Deliveries)), + %% settle the delivery this should _not_ result in further messages being + %% delivered + {State4, SettledEffects} = settle(Cid, 4, 1, State3), + ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) -> + true; + (_) -> false + end, SettledEffects)), + %% granting credit (3) should deliver the second msg if the receivers + %% delivery count is (1) + {State5, CreditEffects} = credit(Cid, 5, 1, 1, false, State4), + % ?debugFmt("CreditEffects ~p ~n~p", [CreditEffects, State4]), + ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, CreditEffects), + {_State6, FinalEffects} = enq(6, 3, third, State5), + ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) -> + true; + (_) -> false + end, FinalEffects)), + ok. + +credit_with_drained_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State0 = test_init(test), + %% checkout with a single credit + {State1, _, _} = + apply(meta(1), rabbit_fifo:make_checkout(Cid, {auto, 1, credited},#{}), + State0), + ?assertMatch(#rabbit_fifo{consumers = #{Cid := #consumer{credit = 1, + delivery_count = 0}}}, + State1), + {State, Result, _} = + apply(meta(3), rabbit_fifo:make_credit(Cid, 0, 5, true), State1), + ?assertMatch(#rabbit_fifo{consumers = #{Cid := #consumer{credit = 0, + delivery_count = 5}}}, + State), + ?assertEqual({multi, [{send_credit_reply, 0}, + {send_drained, [{?FUNCTION_NAME, 5}]}]}, + Result), + ok. + +credit_and_drain_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + %% checkout without any initial credit (like AMQP 1.0 would) + {State3, _, CheckEffs} = + apply(meta(3), rabbit_fifo:make_checkout(Cid, {auto, 0, credited}, #{}), + State2), + + ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs), + {State4, {multi, [{send_credit_reply, 0}, + {send_drained, [{?FUNCTION_NAME, 2}]}]}, + Effects} = apply(meta(4), rabbit_fifo:make_credit(Cid, 4, 0, true), State3), + ?assertMatch(#rabbit_fifo{consumers = #{Cid := #consumer{credit = 0, + delivery_count = 4}}}, + State4), + + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}, + {_, {_, second}}]}, _}, Effects), + {_State5, EnqEffs} = enq(5, 2, third, State4), + ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, EnqEffs), + ok. + + + +enq_enq_deq_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + % get returns a reply value + NumReady = 1, + {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} = + apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), + State2), + ok. + +enq_enq_deq_deq_settle_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + % get returns a reply value + {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} = + apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), + State2), + {_State4, {dequeue, empty}} = + apply(meta(4), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), + State3), + ok. + +enq_enq_checkout_get_settled_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + % get returns a reply value + {_State2, {dequeue, {0, {_, first}}, _}, _Effs} = + apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), + State1), + ok. + +checkout_get_empty_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State = test_init(test), + {_State2, {dequeue, empty}} = + apply(meta(1), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State), + ok. + +untracked_enq_deq_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State0 = test_init(test), + {State1, _, _} = apply(meta(1), + rabbit_fifo:make_enqueue(undefined, undefined, first), + State0), + {_State2, {dequeue, {0, {_, first}}, _}, _} = + apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State1), + ok. + +release_cursor_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + {State3, _} = check(Cid, 3, 10, State2), + % no release cursor effect at this point + {State4, _} = settle(Cid, 4, 1, State3), + {_Final, Effects1} = settle(Cid, 5, 0, State4), + % empty queue forwards release cursor all the way + ?ASSERT_EFF({release_cursor, 5, _}, Effects1), + ok. + +checkout_enq_settle_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)), + {State2, Effects0} = enq(2, 1, first, State1), + ?ASSERT_EFF({send_msg, _, + {delivery, ?FUNCTION_NAME, + [{0, {_, first}}]}, _}, + Effects0), + {State3, [_Inactive]} = enq(3, 2, second, State2), + {_, _Effects} = settle(Cid, 4, 0, State3), + % the release cursor is the smallest raft index that does not + % contribute to the state of the application + % ?ASSERT_EFF({release_cursor, 2, _}, Effects), + ok. + +out_of_order_enqueue_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), + {State2, Effects2} = enq(2, 1, first, State1), + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), + % assert monitor was set up + ?ASSERT_EFF({monitor, _, _}, Effects2), + % enqueue seq num 3 and 4 before 2 + {State3, Effects3} = enq(3, 3, third, State2), + ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects3), + {State4, Effects4} = enq(4, 4, fourth, State3), + % assert no further deliveries where made + ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects4), + {_State5, Effects5} = enq(5, 2, second, State4), + % assert two deliveries were now made + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, second}}, + {_, {_, third}}, + {_, {_, fourth}}]}, _}, + Effects5), + ok. + +out_of_order_first_enqueue_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = check_n(Cid, 5, 5, test_init(test)), + {_State2, Effects2} = enq(2, 10, first, State1), + ?ASSERT_EFF({monitor, process, _}, Effects2), + ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, + Effects2), + ok. + +duplicate_enqueue_test(_) -> + Cid = {<<"duplicate_enqueue_test">>, self()}, + {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), + {State2, Effects2} = enq(2, 1, first, State1), + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), + {_State3, Effects3} = enq(3, 1, first, State2), + ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects3), + ok. + +return_non_existent_test(_) -> + Cid = {<<"cid">>, self()}, + {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)), + % return non-existent + {_State2, _} = apply(meta(3), rabbit_fifo:make_return(Cid, [99]), State0), + ok. + +return_checked_out_test(_) -> + Cid = {<<"cid">>, self()}, + {State0, [_, _]} = enq(1, 1, first, test_init(test)), + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event}, + {aux, active} | _ ]} = check_auto(Cid, 2, State0), + % returning immediately checks out the same message again + {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, ra_event}, + {aux, active}]} = + apply(meta(3), rabbit_fifo:make_return(Cid, [MsgId]), State1), + ok. + +return_checked_out_limit_test(_) -> + Cid = {<<"cid">>, self()}, + Init = init(#{name => test, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(test, utf8)), + release_cursor_interval => 0, + delivery_limit => 1}), + {State0, [_, _]} = enq(1, 1, first, Init), + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event}, + {aux, active} | _ ]} = check_auto(Cid, 2, State0), + % returning immediately checks out the same message again + {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, ra_event}, + {aux, active}]} = + apply(meta(3), rabbit_fifo:make_return(Cid, [MsgId]), State1), + {#rabbit_fifo{ra_indexes = RaIdxs}, ok, []} = + apply(meta(4), rabbit_fifo:make_return(Cid, [MsgId2]), State2), + ?assertEqual(0, rabbit_fifo_index:size(RaIdxs)), + ok. + +return_auto_checked_out_test(_) -> + Cid = {<<"cid">>, self()}, + {State00, [_, _]} = enq(1, 1, first, test_init(test)), + {State0, [_]} = enq(2, 2, second, State00), + % it first active then inactive as the consumer took on but cannot take + % any more + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, + {aux, active}, + {aux, inactive} + ]} = check_auto(Cid, 2, State0), + % return should include another delivery + {_State2, _, Effects} = apply(meta(3), rabbit_fifo:make_return(Cid, [MsgId]), State1), + ?ASSERT_EFF({send_msg, _, + {delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _}, + Effects), + ok. + +cancelled_checkout_out_test(_) -> + Cid = {<<"cid">>, self()}, + {State00, [_, _]} = enq(1, 1, first, test_init(test)), + {State0, [_]} = enq(2, 2, second, State00), + {State1, _} = check_auto(Cid, 2, State0), + % cancelled checkout should not return pending messages to queue + {State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), + ?assertEqual(1, maps:size(State2#rabbit_fifo.messages)), + ?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)), + + {State3, {dequeue, empty}} = + apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State2), + %% settle + {State4, ok, _} = + apply(meta(4), rabbit_fifo:make_settle(Cid, [0]), State3), + + {_State, {dequeue, {_, {_, second}}, _}, _} = + apply(meta(5), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State4), + ok. + +down_with_noproc_consumer_returns_unsettled_test(_) -> + Cid = {<<"down_consumer_returns_unsettled_test">>, self()}, + {State0, [_, _]} = enq(1, 1, second, test_init(test)), + {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0), + {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1), + {_State, Effects} = check(Cid, 4, State2), + ?ASSERT_EFF({monitor, process, _}, Effects), + ok. + +down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) -> + Pid = spawn(fun() -> ok end), + Cid = {<<"down_with_noconnect">>, Pid}, Self = self(), - _Pid = spawn(fun () -> - F = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10, - undefined, F), - Self ! checkout_done - end), - receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end, - timer:sleep(1000), - % message should be available for dequeue - {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2), - ra:stop_server(ServerId), - ok. - -resends_after_lost_applied(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - ok = start_cluster(ClusterName, [ServerId]), - - F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {_, _, F1} = process_ra_events(element(2, rabbit_fifo_client:enqueue(msg1, F0)), - 500), - {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), - % lose an applied event - receive - {ra_event, _, {applied, _}} -> - ok - after 500 -> - exit(await_ra_event_timeout) - end, - % send another message - {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), - {_, _, F4} = process_ra_events(F3, 500), - {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), - {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), - {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), - ra:stop_server(ServerId), - ok. - -handles_reject_notification(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId1 = ?config(node_id, Config), - ServerId2 = ?config(node_id2, Config), - UId1 = ?config(uid, Config), - CId = {UId1, self()}, - - ok = start_cluster(ClusterName, [ServerId1, ServerId2]), - _ = ra:process_command(ServerId1, - rabbit_fifo:make_checkout( - CId, - {auto, 10, simple_prefetch}, - #{})), - % reverse order - should try the first node in the list first - F0 = rabbit_fifo_client:init(ClusterName, [ServerId2, ServerId1]), - {ok, F1} = rabbit_fifo_client:enqueue(one, F0), - - timer:sleep(500), - - % the applied notification - _F2 = process_ra_event(F1, 250), - ra:stop_server(ServerId1), - ra:stop_server(ServerId2), - ok. - -discard(Config) -> - PrivDir = ?config(priv_dir, Config), - ServerId = ?config(node_id, Config), - UId = ?config(uid, Config), - ClusterName = ?config(cluster_name, Config), - Conf = #{cluster_name => ClusterName#resource.name, - id => ServerId, - uid => UId, - log_init_args => #{data_dir => PrivDir, uid => UId}, - initial_member => [], - machine => {module, rabbit_fifo, - #{queue_resource => discard, - dead_letter_handler => - {?MODULE, dead_letter_handler, [self()]}}}}, - _ = ra:start_server(Conf), - ok = ra:trigger_election(ServerId), - _ = ra:members(ServerId), - - F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), - {ok, F2} = rabbit_fifo_client:enqueue(msg1, F1), - F3 = discard_next_delivery(F2, 500), - {ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3), - receive - {dead_letter, Letters} -> - [{_, msg1}] = Letters, - ok - after 500 -> - exit(dead_letter_timeout) - end, - ra:stop_server(ServerId), - ok. - -cancel_checkout(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - ok = start_cluster(ClusterName, [ServerId]), - F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), - {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), - {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1), - {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end), - {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), - {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4), - {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5), - ok. - -credit(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - ok = start_cluster(ClusterName, [ServerId]), - F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), - {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), - {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), - {_, _, F3} = process_ra_events(F2, [], 250), - %% checkout with 0 prefetch - {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, undefined, F3), - %% assert no deliveries - {_, _, F5} = process_ra_events0(F4, [], [], 250, - fun - (D, _) -> error({unexpected_delivery, D}) - end), - %% provide some credit - {ok, F6} = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5), - {[{_, {_, m1}}], [{send_credit_reply, _}], F7} = - process_ra_events(F6, [], 250), - - %% credit and drain - {ok, F8} = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7), - {[{_, {_, m2}}], [{send_credit_reply, _}, {send_drained, _}], F9} = - process_ra_events(F8, [], 250), - flush(), - - %% enqueue another message - at this point the consumer credit should be - %% all used up due to the drain - {ok, F10} = rabbit_fifo_client:enqueue(m3, F9), - %% assert no deliveries - {_, _, F11} = process_ra_events0(F10, [], [], 250, - fun - (D, _) -> error({unexpected_delivery, D}) - end), - %% credit again and receive the last message - {ok, F12} = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11), - {[{_, {_, m3}}], _, _} = process_ra_events(F12, [], 250), - ok. - -untracked_enqueue(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - ok = start_cluster(ClusterName, [ServerId]), - - ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1), - timer:sleep(100), - F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0), - ra:stop_server(ServerId), - ok. - - -flow(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - ok = start_cluster(ClusterName, [ServerId]), - F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 3), - {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), - {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), - {ok, F3} = rabbit_fifo_client:enqueue(m3, F2), - {slow, F4} = rabbit_fifo_client:enqueue(m4, F3), - {_, _, F5} = process_ra_events(F4, 500), - {ok, _} = rabbit_fifo_client:enqueue(m5, F5), - ra:stop_server(ServerId), - ok. - -test_queries(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - ok = start_cluster(ClusterName, [ServerId]), - P = spawn(fun () -> - F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), - {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), - {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), - process_ra_events(F2, 100), - receive stop -> ok end - end), - F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), - {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0), - {ok, {_, Ready}, _} = ra:local_query(ServerId, - fun rabbit_fifo:query_messages_ready/1), - ?assertEqual(1, Ready), - {ok, {_, Checked}, _} = ra:local_query(ServerId, - fun rabbit_fifo:query_messages_checked_out/1), - ?assertEqual(1, Checked), - {ok, {_, Processes}, _} = ra:local_query(ServerId, - fun rabbit_fifo:query_processes/1), - ?assertEqual(2, length(Processes)), - P ! stop, - ra:stop_server(ServerId), - ok. - -dead_letter_handler(Pid, Msgs) -> - Pid ! {dead_letter, Msgs}. - -dequeue(Config) -> - ClusterName = ?config(cluster_name, Config), - ServerId = ?config(node_id, Config), - UId = ?config(uid, Config), - Tag = UId, - ok = start_cluster(ClusterName, [ServerId]), - F1 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1), - {ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b), - {_, _, F2} = process_ra_events(F2_, 100), - - {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2), - {ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3), - {_, _, F4} = process_ra_events(F4_, 100), - {ok, {{MsgId, {_, msg2}}, _}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4), - {ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5), - ra:stop_server(ServerId), - ok. - -enq_deq_n(N, F0) -> - enq_deq_n(N, F0, []). - -enq_deq_n(0, F0, Acc) -> - {_, _, F} = process_ra_events(F0, 100), - {F, Acc}; -enq_deq_n(N, F, Acc) -> - {ok, F1} = rabbit_fifo_client:enqueue(N, F), - {_, _, F2} = process_ra_events(F1, 10), - {ok, {{_, {_, Deq}}, _}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2), - - {_, _, F4} = process_ra_events(F3, 5), - enq_deq_n(N-1, F4, [Deq | Acc]). - -conf(ClusterName, UId, ServerId, _, Peers) -> - #{cluster_name => ClusterName, - id => ServerId, - uid => UId, - log_init_args => #{uid => UId}, - initial_members => Peers, - machine => {module, rabbit_fifo, #{}}}. - -process_ra_event(State, Wait) -> - receive - {ra_event, From, Evt} -> - ct:pal("processed ra event ~p~n", [Evt]), - {internal, _, _, S} = - rabbit_fifo_client:handle_ra_event(From, Evt, State), - S - after Wait -> - exit(ra_event_timeout) - end. - -process_ra_events(State0, Wait) -> - process_ra_events(State0, [], Wait). - -process_ra_events(State, Acc, Wait) -> - DeliveryFun = fun ({delivery, Tag, Msgs}, S) -> - MsgIds = [element(1, M) || M <- Msgs], - {ok, S2} = rabbit_fifo_client:settle(Tag, MsgIds, S), - S2 + Node = node(Pid), + {State0, Effects0} = enq(1, 1, second, test_init(test)), + ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0), + {State1, Effects1} = check_auto(Cid, 2, State0), + #consumer{credit = 0} = maps:get(Cid, State1#rabbit_fifo.consumers), + ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1), + % monitor both enqueuer and consumer + % because we received a noconnection we now need to monitor the node + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), + #consumer{credit = 1} = maps:get(Cid, State2a#rabbit_fifo.consumers), + %% validate consumer has credit + {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a), + ?ASSERT_EFF({monitor, node, _}, Effects2), + ?assertNoEffect({demonitor, process, _}, Effects2), + % when the node comes up we need to retry the process monitors for the + % disconnected processes + {_State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2), + % try to re-monitor the suspect processes + ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3), + ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), + ok. + +down_with_noconnection_returns_unack_test(_) -> + Pid = spawn(fun() -> ok end), + Cid = {<<"down_with_noconnect">>, Pid}, + {State0, _} = enq(1, 1, second, test_init(test)), + ?assertEqual(1, maps:size(State0#rabbit_fifo.messages)), + ?assertEqual(0, lqueue:len(State0#rabbit_fifo.returns)), + {State1, {_, _}} = deq(2, Cid, unsettled, State0), + ?assertEqual(0, maps:size(State1#rabbit_fifo.messages)), + ?assertEqual(0, lqueue:len(State1#rabbit_fifo.returns)), + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), + ?assertEqual(0, maps:size(State2a#rabbit_fifo.messages)), + ?assertEqual(1, lqueue:len(State2a#rabbit_fifo.returns)), + ok. + +down_with_noproc_enqueuer_is_cleaned_up_test(_) -> + State00 = test_init(test), + Pid = spawn(fun() -> ok end), + {State0, _, Effects0} = apply(meta(1), rabbit_fifo:make_enqueue(Pid, 1, first), State00), + ?ASSERT_EFF({monitor, process, _}, Effects0), + {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0), + % ensure there are no enqueuers + ?assert(0 =:= maps:size(State1#rabbit_fifo.enqueuers)), + ok. + +discarded_message_without_dead_letter_handler_is_removed_test(_) -> + Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, + {State0, [_, _]} = enq(1, 1, first, test_init(test)), + {State1, Effects1} = check_n(Cid, 2, 10, State0), + ?ASSERT_EFF({send_msg, _, + {delivery, _, [{0, {#{}, first}}]}, _}, + Effects1), + {_State2, _, Effects2} = apply(meta(1), rabbit_fifo:make_discard(Cid, [0]), State1), + ?assertNoEffect({send_msg, _, + {delivery, _, [{0, {#{}, first}}]}, _}, + Effects2), + ok. + +discarded_message_with_dead_letter_handler_emits_mod_call_effect_test(_) -> + Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, + State00 = init(#{name => test, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), + dead_letter_handler => + {somemod, somefun, [somearg]}}), + {State0, [_, _]} = enq(1, 1, first, State00), + {State1, Effects1} = check_n(Cid, 2, 10, State0), + ?ASSERT_EFF({send_msg, _, + {delivery, _, [{0, {#{}, first}}]}, _}, + Effects1), + {_State2, _, Effects2} = apply(meta(1), rabbit_fifo:make_discard(Cid, [0]), State1), + % assert mod call effect with appended reason and message + ?ASSERT_EFF({mod_call, somemod, somefun, [somearg, [{rejected, first}]]}, + Effects2), + ok. + +tick_test(_) -> + Cid = {<<"c">>, self()}, + Cid2 = {<<"c2">>, self()}, + {S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)), + {S1, _} = enq(2, 2, <<"snd">>, S0), + {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1), + {S3, {_, _}} = deq(4, Cid2, unsettled, S2), + {S4, _, _} = apply(meta(5), rabbit_fifo:make_return(Cid, [MsgId]), S3), + + [{mod_call, _, _, + [#resource{}, + {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = rabbit_fifo:tick(1, S4), + ok. + + +delivery_query_returns_deliveries_test(_) -> + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + rabbit_fifo:make_checkout(Cid, {auto, 5, simple_prefetch}, #{}), + rabbit_fifo:make_enqueue(self(), 1, one), + rabbit_fifo:make_enqueue(self(), 2, two), + rabbit_fifo:make_enqueue(self(), 3, tre), + rabbit_fifo:make_enqueue(self(), 4, for) + ], + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + {State, _Effects} = run_log(test_init(help), Entries), + % 3 deliveries are returned + [{0, {#{}, one}}] = rabbit_fifo:get_checked_out(Cid, 0, 0, State), + [_, _, _] = rabbit_fifo:get_checked_out(Cid, 1, 3, State), + ok. + +pending_enqueue_is_enqueued_on_down_test(_) -> + Cid = {<<"cid">>, self()}, + Pid = self(), + {State0, _} = enq(1, 2, first, test_init(test)), + {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0), + {_State2, {dequeue, {0, {_, first}}, 0}, _} = + apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State1), + ok. + +duplicate_delivery_test(_) -> + {State0, _} = enq(1, 1, first, test_init(test)), + {#rabbit_fifo{ra_indexes = RaIdxs, + messages = Messages}, _} = enq(2, 1, first, State0), + ?assertEqual(1, rabbit_fifo_index:size(RaIdxs)), + ?assertEqual(1, maps:size(Messages)), + ok. + +state_enter_test(_) -> + S0 = init(#{name => the_name, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), + become_leader_handler => {m, f, [a]}}), + [{mod_call, m, f, [a, the_name]}] = rabbit_fifo:state_enter(leader, S0), + ok. + +state_enter_monitors_and_notifications_test(_) -> + Oth = spawn(fun () -> ok end), + {State0, _} = enq(1, 1, first, test_init(test)), + Cid = {<<"adf">>, self()}, + OthCid = {<<"oth">>, Oth}, + {State1, _} = check(Cid, 2, State0), + {State, _} = check(OthCid, 3, State1), + Self = self(), + Effects = rabbit_fifo:state_enter(leader, State), + + %% monitor all enqueuers and consumers + [{monitor, process, Self}, + {monitor, process, Oth}] = + lists:filter(fun ({monitor, process, _}) -> true; + (_) -> false + end, Effects), + [{send_msg, Self, leader_change, ra_event}, + {send_msg, Oth, leader_change, ra_event}] = + lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true; + (_) -> false + end, Effects), + ?ASSERT_EFF({monitor, process, _}, Effects), + ok. + +purge_test(_) -> + Cid = {<<"purge_test">>, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, {purge, 1}, _} = apply(meta(2), rabbit_fifo:make_purge(), State1), + {State3, _} = enq(3, 2, second, State2), + % get returns a reply value + {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} = + apply(meta(4), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State3), + ok. + +purge_with_checkout_test(_) -> + Cid = {<<"purge_test">>, self()}, + {State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)), + {State1, _} = enq(2, 1, <<"first">>, State0), + {State2, _} = enq(3, 2, <<"second">>, State1), + %% assert message bytes are non zero + ?assert(State2#rabbit_fifo.msg_bytes_checkout > 0), + ?assert(State2#rabbit_fifo.msg_bytes_enqueue > 0), + {State3, {purge, 1}, _} = apply(meta(2), rabbit_fifo:make_purge(), State2), + ?assert(State2#rabbit_fifo.msg_bytes_checkout > 0), + ?assertEqual(0, State3#rabbit_fifo.msg_bytes_enqueue), + ?assertEqual(1, rabbit_fifo_index:size(State3#rabbit_fifo.ra_indexes)), + #consumer{checked_out = Checked} = maps:get(Cid, State3#rabbit_fifo.consumers), + ?assertEqual(1, maps:size(Checked)), + ok. + +down_returns_checked_out_in_order_test(_) -> + S0 = test_init(?FUNCTION_NAME), + %% enqueue 100 + S1 = lists:foldl(fun (Num, FS0) -> + {FS, _} = enq(Num, Num, Num, FS0), + FS + end, S0, lists:seq(1, 100)), + ?assertEqual(100, maps:size(S1#rabbit_fifo.messages)), + Cid = {<<"cid">>, self()}, + {S2, _} = check(Cid, 101, 1000, S1), + #consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers), + ?assertEqual(100, maps:size(Checked)), + %% simulate down + {S, _, _} = apply(meta(102), {down, self(), noproc}, S2), + Returns = lqueue:to_list(S#rabbit_fifo.returns), + ?assertEqual(100, length(Returns)), + %% validate returns are in order + ?assertEqual(lists:sort(Returns), Returns), + ok. + +single_active_consumer_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + ?assertEqual(single_active, State0#rabbit_fifo.cfg#cfg.consumer_strategy), + ?assertEqual(0, map_size(State0#rabbit_fifo.consumers)), + + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + meta(1), + make_checkout({CTag, self()}, + {once, 1, simple_prefetch}, + #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + + % the first registered consumer is the active one, the others are waiting + ?assertEqual(1, map_size(State1#rabbit_fifo.consumers)), + ?assert(maps:is_key({<<"ctag1">>, self()}, State1#rabbit_fifo.consumers)), + ?assertEqual(3, length(State1#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)), + + % cancelling a waiting consumer + {State2, _, Effects1} = apply(meta(2), + make_checkout({<<"ctag3">>, self()}, + cancel, #{}), State1), + % the active consumer should still be in place + ?assertEqual(1, map_size(State2#rabbit_fifo.consumers)), + ?assert(maps:is_key({<<"ctag1">>, self()}, State2#rabbit_fifo.consumers)), + % the cancelled consumer has been removed from waiting consumers + ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#rabbit_fifo.waiting_consumers)), + % there are some effects to unregister the consumer + ?assertEqual(1, length(Effects1)), + + % cancelling the active consumer + {State3, _, Effects2} = apply(meta(3), + make_checkout({<<"ctag1">>, self()}, + cancel, #{}), + State2), + % the second registered consumer is now the active one + ?assertEqual(1, map_size(State3#rabbit_fifo.consumers)), + ?assert(maps:is_key({<<"ctag2">>, self()}, State3#rabbit_fifo.consumers)), + % the new active consumer is no longer in the waiting list + ?assertEqual(1, length(State3#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#rabbit_fifo.waiting_consumers)), + % there are some effects to unregister the consumer and to update the new active one (metrics) + ?assertEqual(2, length(Effects2)), + + % cancelling the active consumer + {State4, _, Effects3} = apply(meta(4), + make_checkout({<<"ctag2">>, self()}, + cancel, #{}), + State3), + % the last waiting consumer became the active one + ?assertEqual(1, map_size(State4#rabbit_fifo.consumers)), + ?assert(maps:is_key({<<"ctag4">>, self()}, State4#rabbit_fifo.consumers)), + % the waiting consumer list is now empty + ?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)), + % there are some effects to unregister the consumer and to update the new active one (metrics) + ?assertEqual(2, length(Effects3)), + + % cancelling the last consumer + {State5, _, Effects4} = apply(meta(5), + make_checkout({<<"ctag4">>, self()}, + cancel, #{}), + State4), + % no active consumer anymore + ?assertEqual(0, map_size(State5#rabbit_fifo.consumers)), + % still nothing in the waiting list + ?assertEqual(0, length(State5#rabbit_fifo.waiting_consumers)), + % there is an effect to unregister the consumer + queue inactive effect + ?assertEqual(1 + 1, length(Effects4)), + + ok. + +single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{index => 1}, + make_checkout({CTag, ChannelId}, {once, 1, simple_prefetch}, #{}), + State), + NewState end, - process_ra_events0(State, Acc, [], Wait, DeliveryFun). - -process_ra_events0(State0, Acc, Actions0, Wait, DeliveryFun) -> - receive - {ra_event, From, Evt} -> - case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of - {internal, _, Actions, State} -> - process_ra_events0(State, Acc, Actions0 ++ Actions, - Wait, DeliveryFun); - {{delivery, _Tag, Msgs} = Del, State1} -> - State = DeliveryFun(Del, State1), - process_ra_events0(State, Acc ++ Msgs, Actions0, Wait, DeliveryFun); - eol -> - eol - end - after Wait -> - {Acc, Actions0, State0} - end. - -discard_next_delivery(State0, Wait) -> - receive - {ra_event, From, Evt} -> - case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of - {internal, _, _Actions, State} -> - discard_next_delivery(State, Wait); - {{delivery, Tag, Msgs}, State1} -> - MsgIds = [element(1, M) || M <- Msgs], - {ok, State} = rabbit_fifo_client:discard(Tag, MsgIds, - State1), - State - end - after Wait -> - State0 - end. - -return_next_delivery(State0, Wait) -> - receive - {ra_event, From, Evt} -> - case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of - {internal, _, _, State} -> - return_next_delivery(State, Wait); - {{delivery, Tag, Msgs}, State1} -> - MsgIds = [element(1, M) || M <- Msgs], - {ok, State} = rabbit_fifo_client:return(Tag, MsgIds, - State1), - State - end - after Wait -> - State0 - end. - -validate_process_down(Name, 0) -> - exit({process_not_down, Name}); -validate_process_down(Name, Num) -> - case whereis(Name) of - undefined -> - ok; - _ -> - timer:sleep(100), - validate_process_down(Name, Num-1) - end. - -start_cluster(ClusterName, ServerIds, RaFifoConfig) -> - {ok, Started, _} = ra:start_cluster(ClusterName#resource.name, - {module, rabbit_fifo, RaFifoConfig}, - ServerIds), - ?assertEqual(length(Started), length(ServerIds)), - ok. - -start_cluster(ClusterName, ServerIds) -> - start_cluster(ClusterName, ServerIds, #{name => some_name, - queue_resource => ClusterName}). - -flush() -> - receive - Msg -> - ct:pal("flushed: ~w~n", [Msg]), - flush() - after 10 -> - ok - end. + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + % the channel of the active consumer goes down + {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1), + % fell back to another consumer + ?assertEqual(1, map_size(State2#rabbit_fifo.consumers)), + % there are still waiting consumers + ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)), + % effects to unregister the consumer and + % to update the new active one (metrics) are there + ?assertEqual(2, length(Effects)), + + % the channel of the active consumer and a waiting consumer goes down + {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2), + % fell back to another consumer + ?assertEqual(1, map_size(State3#rabbit_fifo.consumers)), + % no more waiting consumer + ?assertEqual(0, length(State3#rabbit_fifo.waiting_consumers)), + % effects to cancel both consumers of this channel + effect to update the new active one (metrics) + ?assertEqual(3, length(Effects2)), + + % the last channel goes down + {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3), + % no more consumers + ?assertEqual(0, map_size(State4#rabbit_fifo.consumers)), + ?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)), + % there is an effect to unregister the consumer + queue inactive effect + ?assertEqual(1 + 1, length(Effects3)), + + ok. + +single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnection_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + Meta = #{index => 1}, + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + Meta, + make_checkout({CTag, self()}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + + % simulate node goes down + {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1), + + % all the waiting consumers should be suspected down + ?assertEqual(3, length(State2#rabbit_fifo.waiting_consumers)), + lists:foreach(fun({_, #consumer{status = Status}}) -> + ?assert(Status == suspected_down) + end, State2#rabbit_fifo.waiting_consumers), + + % simulate node goes back up + {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2), + + % all the waiting consumers should be un-suspected + ?assertEqual(3, length(State3#rabbit_fifo.waiting_consumers)), + lists:foreach(fun({_, #consumer{status = Status}}) -> + ?assert(Status /= suspected_down) + end, State3#rabbit_fifo.waiting_consumers), + + ok. + +single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + Meta = #{index => 1}, + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + Meta, + make_checkout({CTag, ChannelId}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + Effects = rabbit_fifo:state_enter(leader, State1), + % 2 effects for each consumer process (channel process), 1 effect for the node + ?assertEqual(2 * 3 + 1, length(Effects)). + +single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + Meta = #{index => 1}, + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + Meta, + make_checkout({CTag, ChannelId}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + Effects = rabbit_fifo:state_enter(eol, State1), + % 1 effect for each consumer process (channel process) + ?assertEqual(3, length(Effects)). + +query_consumers_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => false}), + + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + #{index => 1}, + make_checkout({CTag, self()}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + Consumers0 = State1#rabbit_fifo.consumers, + Consumer = maps:get({<<"ctag2">>, self()}, Consumers0), + Consumers1 = maps:put({<<"ctag2">>, self()}, + Consumer#consumer{status = suspected_down}, Consumers0), + State2 = State1#rabbit_fifo{consumers = Consumers1}, + + ?assertEqual(4, rabbit_fifo:query_consumer_count(State2)), + Consumers2 = rabbit_fifo:query_consumers(State2), + ?assertEqual(4, maps:size(Consumers2)), + maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) -> + ?assertEqual(self(), Pid), + case Tag of + <<"ctag2">> -> + ?assertNot(Active), + ?assertEqual(suspected_down, ActivityStatus); + _ -> + ?assert(Active), + ?assertEqual(up, ActivityStatus) + end + end, [], Consumers2). + +query_consumers_when_single_active_consumer_is_on_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + Meta = #{index => 1}, + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + Meta, + make_checkout({CTag, self()}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + + ?assertEqual(4, rabbit_fifo:query_consumer_count(State1)), + Consumers = rabbit_fifo:query_consumers(State1), + ?assertEqual(4, maps:size(Consumers)), + maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) -> + ?assertEqual(self(), Pid), + case Tag of + <<"ctag1">> -> + ?assert(Active), + ?assertEqual(single_active, ActivityStatus); + _ -> + ?assertNot(Active), + ?assertEqual(waiting, ActivityStatus) + end + end, [], Consumers). + +active_flag_updated_when_consumer_suspected_unsuspected_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => false}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = + apply( + #{index => 1}, + rabbit_fifo:make_checkout({CTag, ChannelId}, + {once, 1, simple_prefetch}, + #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1), + % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node + ?assertEqual(4 + 1, length(Effects2)), + + {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2), + % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID + ?assertEqual(4 + 4, length(Effects3)). + +active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{index => 1}, + make_checkout({CTag, ChannelId}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1), + % only 1 effect to monitor the node + ?assertEqual(1, length(Effects2)), + + {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2), + % for each consumer: 1 effect to monitor the consumer PID + ?assertEqual(4, length(Effects3)). + +meta(Idx) -> + #{index => Idx, term => 1}. + +enq(Idx, MsgSeq, Msg, State) -> + strip_reply( + apply(meta(Idx), rabbit_fifo:make_enqueue(self(), MsgSeq, Msg), State)). + +deq(Idx, Cid, Settlement, State0) -> + {State, {dequeue, {MsgId, Msg}, _}, _} = + apply(meta(Idx), + rabbit_fifo:make_checkout(Cid, {dequeue, Settlement}, #{}), + State0), + {State, {MsgId, Msg}}. + +check_n(Cid, Idx, N, State) -> + strip_reply( + apply(meta(Idx), + rabbit_fifo:make_checkout(Cid, {auto, N, simple_prefetch}, #{}), + State)). + +check(Cid, Idx, State) -> + strip_reply( + apply(meta(Idx), + rabbit_fifo:make_checkout(Cid, {once, 1, simple_prefetch}, #{}), + State)). + +check_auto(Cid, Idx, State) -> + strip_reply( + apply(meta(Idx), + rabbit_fifo:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + State)). + +check(Cid, Idx, Num, State) -> + strip_reply( + apply(meta(Idx), + rabbit_fifo:make_checkout(Cid, {auto, Num, simple_prefetch}, #{}), + State)). + +settle(Cid, Idx, MsgId, State) -> + strip_reply(apply(meta(Idx), rabbit_fifo:make_settle(Cid, [MsgId]), State)). + +credit(Cid, Idx, Credit, DelCnt, Drain, State) -> + strip_reply(apply(meta(Idx), rabbit_fifo:make_credit(Cid, Credit, DelCnt, Drain), + State)). + +strip_reply({State, _, Effects}) -> + {State, Effects}. + +run_log(InitState, Entries) -> + lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) -> + case apply(meta(Idx), E, Acc0) of + {Acc, _, Efx} when is_list(Efx) -> + {Acc, Efx0 ++ Efx}; + {Acc, _, Efx} -> + {Acc, Efx0 ++ [Efx]}; + {Acc, _} -> + {Acc, Efx0} + end + end, {InitState, []}, Entries). + + +%% AUX Tests + +aux_test(_) -> + _ = ra_machine_ets:start_link(), + Aux0 = init_aux(aux_test), + MacState = init(#{name => aux_test, + queue_resource => + rabbit_misc:r(<<"/">>, queue, <<"test">>)}), + Log = undefined, + {no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0, + Log, MacState), + {no_reply, _Aux, undefined} = handle_aux(leader, cast, emit, Aux, + Log, MacState), + [X] = ets:lookup(rabbit_fifo_usage, aux_test), + ?assert(X > 0.0), + ok. + +%% Utility + +init(Conf) -> rabbit_fifo:init(Conf). +apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State). +init_aux(Conf) -> rabbit_fifo:init_aux(Conf). +handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M). +make_checkout(C, S, M) -> rabbit_fifo:make_checkout(C, S, M). diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl new file mode 100644 index 0000000000..f281d15795 --- /dev/null +++ b/test/rabbit_fifo_int_SUITE.erl @@ -0,0 +1,640 @@ +-module(rabbit_fifo_int_SUITE). + +%% rabbit_fifo and rabbit_fifo_client integration suite + +-compile(export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). + +all() -> + [ + {group, tests} + ]. + +all_tests() -> + [ + basics, + return, + rabbit_fifo_returns_correlation, + resends_lost_command, + returns_after_down, + resends_after_lost_applied, + handles_reject_notification, + two_quick_enqueues, + detects_lost_delivery, + dequeue, + discard, + cancel_checkout, + credit, + untracked_enqueue, + flow, + test_queries, + duplicate_delivery, + usage + ]. + +groups() -> + [ + {tests, [], all_tests()} + ]. + +init_per_group(_, Config) -> + PrivDir = ?config(priv_dir, Config), + _ = application:load(ra), + ok = application:set_env(ra, data_dir, PrivDir), + application:ensure_all_started(ra), + application:ensure_all_started(lg), + Config. + +end_per_group(_, Config) -> + _ = application:stop(ra), + Config. + +init_per_testcase(TestCase, Config) -> + meck:new(rabbit_quorum_queue, [passthrough]), + meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _) -> ok end), + meck:expect(rabbit_quorum_queue, cancel_consumer_handler, + fun (_, _) -> ok end), + ra_server_sup_sup:remove_all(), + ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"), + ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"), + ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)), + [ + {cluster_name, ClusterName}, + {uid, atom_to_binary(TestCase, utf8)}, + {node_id, {TestCase, node()}}, + {uid2, atom_to_binary(ServerName2, utf8)}, + {node_id2, {ServerName2, node()}}, + {uid3, atom_to_binary(ServerName3, utf8)}, + {node_id3, {ServerName3, node()}} + | Config]. + +end_per_testcase(_, Config) -> + meck:unload(), + Config. + +basics(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + UId = ?config(uid, Config), + CustomerTag = UId, + ok = start_cluster(ClusterName, [ServerId]), + FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, undefined, FState0), + + ra_log_wal:force_roll_over(ra_log_wal), + % create segment the segment will trigger a snapshot + timer:sleep(1000), + + {ok, FState2} = rabbit_fifo_client:enqueue(one, FState1), + % process ra events + FState3 = process_ra_event(FState2, 250), + + FState5 = receive + {ra_event, From, Evt} -> + case rabbit_fifo_client:handle_ra_event(From, Evt, FState3) of + {internal, _AcceptedSeqs, _Actions, _FState4} -> + exit(unexpected_internal_event); + {{delivery, C, [{MsgId, _Msg}]}, FState4} -> + {ok, S} = rabbit_fifo_client:settle(C, [MsgId], + FState4), + S + end + after 5000 -> + exit(await_msg_timeout) + end, + + % process settle applied notification + FState5b = process_ra_event(FState5, 250), + _ = ra:stop_server(ServerId), + _ = ra:restart_server(ServerId), + + %% wait for leader change to notice server is up again + receive + {ra_event, _, {machine, leader_change}} -> ok + after 5000 -> + exit(leader_change_timeout) + end, + + {ok, FState6} = rabbit_fifo_client:enqueue(two, FState5b), + % process applied event + FState6b = process_ra_event(FState6, 250), + + receive + {ra_event, Frm, E} -> + case rabbit_fifo_client:handle_ra_event(Frm, E, FState6b) of + {internal, _, _, _FState7} -> + exit({unexpected_internal_event, E}); + {{delivery, Ctag, [{Mid, {_, two}}]}, FState7} -> + {ok, _S} = rabbit_fifo_client:return(Ctag, [Mid], FState7), + ok + end + after 2000 -> + exit(await_msg_timeout) + end, + ra:stop_server(ServerId), + ok. + +return(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ServerId2 = ?config(node_id2, Config), + ok = start_cluster(ClusterName, [ServerId, ServerId2]), + + F00 = rabbit_fifo_client:init(ClusterName, [ServerId, ServerId2]), + {ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00), + {ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0), + {_, _, F2} = process_ra_events(F1, 100), + {ok, {{MsgId, _}, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2), + {ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F), + + ra:stop_server(ServerId), + ok. + +rabbit_fifo_returns_correlation(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, F1} = rabbit_fifo_client:enqueue(corr1, msg1, F0), + receive + {ra_event, Frm, E} -> + case rabbit_fifo_client:handle_ra_event(Frm, E, F1) of + {internal, [corr1], [], _F2} -> + ok; + {Del, _} -> + exit({unexpected, Del}) + end + after 2000 -> + exit(await_msg_timeout) + end, + ra:stop_server(ServerId), + ok. + +duplicate_delivery(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), + {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), + Fun = fun Loop(S0) -> + receive + {ra_event, Frm, E} = Evt -> + case rabbit_fifo_client:handle_ra_event(Frm, E, S0) of + {internal, [corr1], [], S1} -> + Loop(S1); + {_Del, S1} -> + %% repeat event delivery + self() ! Evt, + %% check that then next received delivery doesn't + %% repeat or crash + receive + {ra_event, F, E1} -> + case rabbit_fifo_client:handle_ra_event(F, E1, S1) of + {internal, [], [], S2} -> + S2 + end + end + end + after 2000 -> + exit(await_msg_timeout) + end + end, + Fun(F2), + ra:stop_server(ServerId), + ok. + +usage(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), + {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), + {ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2), + {_, _, _} = process_ra_events(F3, 50), + % force tick and usage stats emission + ServerId ! tick_timeout, + timer:sleep(50), + Use = rabbit_fifo:usage(element(1, ServerId)), + ra:stop_server(ServerId), + ?assert(Use > 0.0), + ok. + +resends_lost_command(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + + ok = meck:new(ra, [passthrough]), + + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0), + % lose the enqueue + meck:expect(ra, pipeline_command, fun (_, _, _) -> ok end), + {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), + meck:unload(ra), + {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), + {_, _, F4} = process_ra_events(F3, 500), + {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), + {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), + {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), + ra:stop_server(ServerId), + ok. + +two_quick_enqueues(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + F1 = element(2, rabbit_fifo_client:enqueue(msg1, F0)), + {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), + _ = process_ra_events(F2, 500), + ra:stop_server(ServerId), + ok. + +detects_lost_delivery(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + + F000 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, F00} = rabbit_fifo_client:enqueue(msg1, F000), + {_, _, F0} = process_ra_events(F00, 100), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), + {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), + {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), + % lose first delivery + receive + {ra_event, _, {machine, {delivery, _, [{_, {_, msg1}}]}}} -> + ok + after 500 -> + exit(await_delivery_timeout) + end, + + % assert three deliveries were received + {[_, _, _], _, _} = process_ra_events(F3, 500), + ra:stop_server(ServerId), + ok. + +returns_after_down(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0), + {_, _, F2} = process_ra_events(F1, 500), + % start a customer in a separate processes + % that exits after checkout + Self = self(), + _Pid = spawn(fun () -> + F = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10, + undefined, F), + Self ! checkout_done + end), + receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end, + timer:sleep(1000), + % message should be available for dequeue + {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2), + ra:stop_server(ServerId), + ok. + +resends_after_lost_applied(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {_, _, F1} = process_ra_events(element(2, rabbit_fifo_client:enqueue(msg1, F0)), + 500), + {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), + % lose an applied event + receive + {ra_event, _, {applied, _}} -> + ok + after 500 -> + exit(await_ra_event_timeout) + end, + % send another message + {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), + {_, _, F4} = process_ra_events(F3, 500), + {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), + {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), + {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), + ra:stop_server(ServerId), + ok. + +handles_reject_notification(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId1 = ?config(node_id, Config), + ServerId2 = ?config(node_id2, Config), + UId1 = ?config(uid, Config), + CId = {UId1, self()}, + + ok = start_cluster(ClusterName, [ServerId1, ServerId2]), + _ = ra:process_command(ServerId1, + rabbit_fifo:make_checkout( + CId, + {auto, 10, simple_prefetch}, + #{})), + % reverse order - should try the first node in the list first + F0 = rabbit_fifo_client:init(ClusterName, [ServerId2, ServerId1]), + {ok, F1} = rabbit_fifo_client:enqueue(one, F0), + + timer:sleep(500), + + % the applied notification + _F2 = process_ra_event(F1, 250), + ra:stop_server(ServerId1), + ra:stop_server(ServerId2), + ok. + +discard(Config) -> + PrivDir = ?config(priv_dir, Config), + ServerId = ?config(node_id, Config), + UId = ?config(uid, Config), + ClusterName = ?config(cluster_name, Config), + Conf = #{cluster_name => ClusterName#resource.name, + id => ServerId, + uid => UId, + log_init_args => #{data_dir => PrivDir, uid => UId}, + initial_member => [], + machine => {module, rabbit_fifo, + #{queue_resource => discard, + dead_letter_handler => + {?MODULE, dead_letter_handler, [self()]}}}}, + _ = ra:start_server(Conf), + ok = ra:trigger_election(ServerId), + _ = ra:members(ServerId), + + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), + {ok, F2} = rabbit_fifo_client:enqueue(msg1, F1), + F3 = discard_next_delivery(F2, 500), + {ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3), + receive + {dead_letter, Letters} -> + [{_, msg1}] = Letters, + ok + after 500 -> + exit(dead_letter_timeout) + end, + ra:stop_server(ServerId), + ok. + +cancel_checkout(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), + {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), + {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1), + {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end), + {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), + {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4), + {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5), + ok. + +credit(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), + {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), + {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), + {_, _, F3} = process_ra_events(F2, [], 250), + %% checkout with 0 prefetch + {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, undefined, F3), + %% assert no deliveries + {_, _, F5} = process_ra_events0(F4, [], [], 250, + fun + (D, _) -> error({unexpected_delivery, D}) + end), + %% provide some credit + {ok, F6} = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5), + {[{_, {_, m1}}], [{send_credit_reply, _}], F7} = + process_ra_events(F6, [], 250), + + %% credit and drain + {ok, F8} = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7), + {[{_, {_, m2}}], [{send_credit_reply, _}, {send_drained, _}], F9} = + process_ra_events(F8, [], 250), + flush(), + + %% enqueue another message - at this point the consumer credit should be + %% all used up due to the drain + {ok, F10} = rabbit_fifo_client:enqueue(m3, F9), + %% assert no deliveries + {_, _, F11} = process_ra_events0(F10, [], [], 250, + fun + (D, _) -> error({unexpected_delivery, D}) + end), + %% credit again and receive the last message + {ok, F12} = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11), + {[{_, {_, m3}}], _, _} = process_ra_events(F12, [], 250), + ok. + +untracked_enqueue(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + + ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1), + timer:sleep(100), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0), + ra:stop_server(ServerId), + ok. + + +flow(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 3), + {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), + {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), + {ok, F3} = rabbit_fifo_client:enqueue(m3, F2), + {slow, F4} = rabbit_fifo_client:enqueue(m4, F3), + {_, _, F5} = process_ra_events(F4, 500), + {ok, _} = rabbit_fifo_client:enqueue(m5, F5), + ra:stop_server(ServerId), + ok. + +test_queries(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + P = spawn(fun () -> + F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), + {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), + {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), + process_ra_events(F2, 100), + receive stop -> ok end + end), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), + {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0), + {ok, {_, Ready}, _} = ra:local_query(ServerId, + fun rabbit_fifo:query_messages_ready/1), + ?assertEqual(1, Ready), + {ok, {_, Checked}, _} = ra:local_query(ServerId, + fun rabbit_fifo:query_messages_checked_out/1), + ?assertEqual(1, Checked), + {ok, {_, Processes}, _} = ra:local_query(ServerId, + fun rabbit_fifo:query_processes/1), + ?assertEqual(2, length(Processes)), + P ! stop, + ra:stop_server(ServerId), + ok. + +dead_letter_handler(Pid, Msgs) -> + Pid ! {dead_letter, Msgs}. + +dequeue(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + UId = ?config(uid, Config), + Tag = UId, + ok = start_cluster(ClusterName, [ServerId]), + F1 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1), + {ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b), + {_, _, F2} = process_ra_events(F2_, 100), + + {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2), + {ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3), + {_, _, F4} = process_ra_events(F4_, 100), + {ok, {{MsgId, {_, msg2}}, _}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4), + {ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5), + ra:stop_server(ServerId), + ok. + +enq_deq_n(N, F0) -> + enq_deq_n(N, F0, []). + +enq_deq_n(0, F0, Acc) -> + {_, _, F} = process_ra_events(F0, 100), + {F, Acc}; +enq_deq_n(N, F, Acc) -> + {ok, F1} = rabbit_fifo_client:enqueue(N, F), + {_, _, F2} = process_ra_events(F1, 10), + {ok, {{_, {_, Deq}}, _}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2), + + {_, _, F4} = process_ra_events(F3, 5), + enq_deq_n(N-1, F4, [Deq | Acc]). + +conf(ClusterName, UId, ServerId, _, Peers) -> + #{cluster_name => ClusterName, + id => ServerId, + uid => UId, + log_init_args => #{uid => UId}, + initial_members => Peers, + machine => {module, rabbit_fifo, #{}}}. + +process_ra_event(State, Wait) -> + receive + {ra_event, From, Evt} -> + ct:pal("processed ra event ~p~n", [Evt]), + {internal, _, _, S} = + rabbit_fifo_client:handle_ra_event(From, Evt, State), + S + after Wait -> + exit(ra_event_timeout) + end. + +process_ra_events(State0, Wait) -> + process_ra_events(State0, [], Wait). + +process_ra_events(State, Acc, Wait) -> + DeliveryFun = fun ({delivery, Tag, Msgs}, S) -> + MsgIds = [element(1, M) || M <- Msgs], + {ok, S2} = rabbit_fifo_client:settle(Tag, MsgIds, S), + S2 + end, + process_ra_events0(State, Acc, [], Wait, DeliveryFun). + +process_ra_events0(State0, Acc, Actions0, Wait, DeliveryFun) -> + receive + {ra_event, From, Evt} -> + case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of + {internal, _, Actions, State} -> + process_ra_events0(State, Acc, Actions0 ++ Actions, + Wait, DeliveryFun); + {{delivery, _Tag, Msgs} = Del, State1} -> + State = DeliveryFun(Del, State1), + process_ra_events0(State, Acc ++ Msgs, Actions0, Wait, DeliveryFun); + eol -> + eol + end + after Wait -> + {Acc, Actions0, State0} + end. + +discard_next_delivery(State0, Wait) -> + receive + {ra_event, From, Evt} -> + case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of + {internal, _, _Actions, State} -> + discard_next_delivery(State, Wait); + {{delivery, Tag, Msgs}, State1} -> + MsgIds = [element(1, M) || M <- Msgs], + {ok, State} = rabbit_fifo_client:discard(Tag, MsgIds, + State1), + State + end + after Wait -> + State0 + end. + +return_next_delivery(State0, Wait) -> + receive + {ra_event, From, Evt} -> + case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of + {internal, _, _, State} -> + return_next_delivery(State, Wait); + {{delivery, Tag, Msgs}, State1} -> + MsgIds = [element(1, M) || M <- Msgs], + {ok, State} = rabbit_fifo_client:return(Tag, MsgIds, + State1), + State + end + after Wait -> + State0 + end. + +validate_process_down(Name, 0) -> + exit({process_not_down, Name}); +validate_process_down(Name, Num) -> + case whereis(Name) of + undefined -> + ok; + _ -> + timer:sleep(100), + validate_process_down(Name, Num-1) + end. + +start_cluster(ClusterName, ServerIds, RaFifoConfig) -> + {ok, Started, _} = ra:start_cluster(ClusterName#resource.name, + {module, rabbit_fifo, RaFifoConfig}, + ServerIds), + ?assertEqual(length(Started), length(ServerIds)), + ok. + +start_cluster(ClusterName, ServerIds) -> + start_cluster(ClusterName, ServerIds, #{name => some_name, + queue_resource => ClusterName}). + +flush() -> + receive + Msg -> + ct:pal("flushed: ~w~n", [Msg]), + flush() + after 10 -> + ok + end. diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 437cd02e25..da72c030cd 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -5,8 +5,8 @@ -export([ ]). --include_lib("common_test/include/ct.hrl"). -include_lib("proper/include/proper.hrl"). +-include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). %%%=================================================================== @@ -35,7 +35,10 @@ all_tests() -> scenario11, scenario12, scenario13, - scenario14 + scenario14, + scenario15, + scenario16, + fake_pid ]. groups() -> @@ -236,26 +239,80 @@ scenario14(_Config) -> max_bytes => 1}, Commands), ok. +scenario15(_Config) -> + C1 = {<<>>, c:pid(0,179,1)}, + E = c:pid(0,176,1), + Commands = [make_checkout(C1, {auto,2,simple_prefetch}), + make_enqueue(E, 1, msg1), + make_enqueue(E, 2, msg2), + make_return(C1, [0]), + make_return(C1, [2]), + make_settle(C1, [1]) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + delivery_limit => 1}, Commands), + ok. + +scenario16(_Config) -> + C1Pid = c:pid(0,883,1), + C1 = {<<>>, C1Pid}, + C2 = {<<>>, c:pid(0,882,1)}, + E = c:pid(0,176,1), + Commands = [ + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E, 1, msg1), + make_checkout(C2, {auto,1,simple_prefetch}), + {down, C1Pid, noproc}, %% msg1 allocated to C2 + make_return(C2, [0]), %% msg1 returned + make_enqueue(E, 2, <<>>), + make_settle(C2, [0]) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + delivery_limit => 1}, Commands), + ok. + +fake_pid(_Config) -> + Pid = fake_external_pid(<<"mynode@banana">>), + ?assertNotEqual(node(Pid), node()), + ?assert(is_pid(Pid)), + ok. + +fake_external_pid(Node) when is_binary(Node) -> + ThisNodeSize = size(term_to_binary(node())) + 1, + Pid = spawn(fun () -> ok end), + %% drop the local node data from a local pid + <<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid), + S = size(Node), + %% replace it with the incoming node binary + Final = <<131,103, 100, 0, S, Node/binary, LocalPidData/binary>>, + binary_to_term(Final). + snapshots(_Config) -> run_proper( fun () -> - ?FORALL({Length, Bytes, SingleActiveConsumer}, - frequency([{10, {0, 0, false}}, - {5, {non_neg_integer(), non_neg_integer(), - boolean()}}]), - ?FORALL(O, ?LET(Ops, log_gen(200), expand(Ops)), - collect({Length, Bytes}, + ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit}, + frequency([{10, {0, 0, false, 0}}, + {5, {oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]), + boolean(), + oneof([range(1, 3), undefined]) + }}]), + ?FORALL(O, ?LET(Ops, log_gen(250), expand(Ops)), + collect({log_size, length(O)}, snapshots_prop( config(?FUNCTION_NAME, - Length, Bytes, - SingleActiveConsumer), O)))) - end, [], 2000). + Length, + Bytes, + SingleActiveConsumer, + DeliveryLimit), O)))) + end, [], 2500). -config(Name, Length, Bytes, SingleActive) -> +config(Name, Length, Bytes, SingleActive, DeliveryLimit) -> #{name => Name, max_length => map_max(Length), max_bytes => map_max(Bytes), - single_active_consumer_on => SingleActive}. + single_active_consumer_on => SingleActive, + delivery_limit => map_max(DeliveryLimit)}. map_max(0) -> undefined; map_max(N) -> N. @@ -271,8 +328,12 @@ snapshots_prop(Conf, Commands) -> end. log_gen(Size) -> - ?LET(EPids, vector(2, pid_gen()), - ?LET(CPids, vector(2, pid_gen()), + Nodes = [node(), + fakenode@fake, + fakenode@fake2 + ], + ?LET(EPids, vector(2, pid_gen(Nodes)), + ?LET(CPids, vector(2, pid_gen(Nodes)), resize(Size, list( frequency( @@ -285,15 +346,20 @@ log_gen(Size) -> {2, checkout_gen(oneof(CPids))}, {1, checkout_cancel_gen(oneof(CPids))}, {1, down_gen(oneof(EPids ++ CPids))}, + {1, nodeup_gen(Nodes)}, {1, purge} ]))))). -pid_gen() -> - ?LET(_, integer(), spawn(fun () -> ok end)). +pid_gen(Nodes) -> + ?LET(Node, oneof(Nodes), + fake_external_pid(atom_to_binary(Node, utf8))). down_gen(Pid) -> ?LET(E, {down, Pid, oneof([noconnection, noproc])}, E). +nodeup_gen(Nodes) -> + {nodeup, oneof(Nodes)}. + enqueue_gen(Pid) -> ?LET(E, {enqueue, Pid, frequency([{10, enqueue}, @@ -391,6 +457,8 @@ handle_op({down, Pid, Reason} = Cmd, #t{down = Down} = T) -> %% it is either not down or down with noconnection do_apply(Cmd, T#t{down = maps:put(Pid, Reason, Down)}) end; +handle_op({nodeup, _} = Cmd, T) -> + do_apply(Cmd, T); handle_op({input_event, requeue}, #t{effects = Effs} = T) -> %% this simulates certain settlements arriving out of order case queue:out(Effs) of @@ -477,6 +545,7 @@ run_snapshot_test0(Conf, Commands) -> State = rabbit_fifo:normalize(State0), [begin + % ct:pal("release_cursor: ~b~n", [SnapIdx]), %% drop all entries below and including the snapshot Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; (_) -> false @@ -490,6 +559,7 @@ run_snapshot_test0(Conf, Commands) -> ct:pal("Snapshot tests failed run log:~n" "~p~n from ~n~p~n Entries~n~p~n", [Filtered, SnapState, Entries]), + ct:pal("Expected~n~p~nGot:~n~p", [State, S]), ?assertEqual(State, S) end end || {release_cursor, SnapIdx, SnapState} <- Effects], |
