diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-06-29 12:30:02 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-09-07 09:42:10 +0100 |
| commit | 613ca58f81b99643c14b944f1ea73896c79d9cf1 (patch) | |
| tree | cfced0990c43cc84c346ef1ef8f6558c51e679b1 | |
| parent | 931f7d4fbca334b5da99ec2cc45121c8a2917dde (diff) | |
| download | rabbitmq-server-git-613ca58f81b99643c14b944f1ea73896c79d9cf1.tar.gz | |
rabbit_fifo machine version 1
First commit in a series that will evolve the rabbit_fifo state machine
into version 1.
| -rw-r--r-- | src/rabbit_fifo.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_fifo_v0.erl | 1924 | ||||
| -rw-r--r-- | src/rabbit_fifo_v0.hrl | 195 | ||||
| -rw-r--r-- | test/rabbit_fifo_v0_SUITE.erl | 1395 |
4 files changed, 3527 insertions, 1 deletion
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 0d0e37830a..fb1794cae4 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -23,6 +23,9 @@ tick/2, overview/1, get_checked_out/4, + %% versioning + version/0, + which_module/1, %% aux init_aux/1, handle_aux/6, @@ -463,7 +466,10 @@ apply(_, #purge_nodes{nodes = Nodes}, State0) -> end, {State0, []}, Nodes), {State, ok, Effects}; apply(Meta, #update_config{config = Conf}, State) -> - checkout(Meta, update_config(Conf, State), []). + checkout(Meta, update_config(Conf, State), []); +apply(_Meta, {machine_version, 0, 1}, State) -> + %% quick hack to "convert" the state from version one + {setelement(1, State, ?MODULE), ok, []}. purge_node(Node, State, Effects) -> lists:foldl(fun(Pid, {S0, E0}) -> @@ -637,6 +643,12 @@ get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) -> [] end. +-spec version() -> pos_integer(). +version() -> 1. + +which_module(0) -> rabbit_fifo_v0; +which_module(1) -> ?MODULE. + -record(aux_gc, {last_raft_idx = 0 :: ra:index()}). -record(aux, {name :: atom(), utilisation :: term(), diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl new file mode 100644 index 0000000000..01330fe54f --- /dev/null +++ b/src/rabbit_fifo_v0.erl @@ -0,0 +1,1924 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_fifo_v0). 
+ +-behaviour(ra_machine). + +-compile(inline_list_funcs). +-compile(inline). +-compile({no_auto_import, [apply/3]}). + +-include("rabbit_fifo_v0.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). + +-export([ + init/1, + apply/3, + state_enter/2, + tick/2, + overview/1, + get_checked_out/4, + %% aux + init_aux/1, + handle_aux/6, + % queries + query_messages_ready/1, + query_messages_checked_out/1, + query_messages_total/1, + query_processes/1, + query_ra_indexes/1, + query_consumer_count/1, + query_consumers/1, + query_stat/1, + query_single_active_consumer/1, + query_in_memory_usage/1, + usage/1, + + zero/1, + + %% misc + dehydrate_state/1, + normalize/1, + + %% protocol helpers + make_enqueue/3, + make_checkout/3, + make_settle/2, + make_return/2, + make_discard/2, + make_credit/4, + make_purge/0, + make_purge_nodes/1, + make_update_config/1 + ]). + +%% command records representing all the protocol actions that are supported +-record(enqueue, {pid :: option(pid()), + seq :: option(msg_seqno()), + msg :: raw_msg()}). +-record(checkout, {consumer_id :: consumer_id(), + spec :: checkout_spec(), + meta :: consumer_meta()}). +-record(settle, {consumer_id :: consumer_id(), + msg_ids :: [msg_id()]}). +-record(return, {consumer_id :: consumer_id(), + msg_ids :: [msg_id()]}). +-record(discard, {consumer_id :: consumer_id(), + msg_ids :: [msg_id()]}). +-record(credit, {consumer_id :: consumer_id(), + credit :: non_neg_integer(), + delivery_count :: non_neg_integer(), + drain :: boolean()}). +-record(purge, {}). +-record(purge_nodes, {nodes :: [node()]}). +-record(update_config, {config :: config()}). + +-opaque protocol() :: + #enqueue{} | + #checkout{} | + #settle{} | + #return{} | + #discard{} | + #credit{} | + #purge{} | + #purge_nodes{} | + #update_config{}. + +-type command() :: protocol() | ra_machine:builtin_command(). +%% all the command types supported by ra fifo + +-type client_msg() :: delivery(). +%% the messages `rabbit_fifo' can send to consumers. 
+ +-opaque state() :: #?MODULE{}. + +-export_type([protocol/0, + delivery/0, + command/0, + credit_mode/0, + consumer_tag/0, + consumer_meta/0, + consumer_id/0, + client_msg/0, + msg/0, + msg_id/0, + msg_seqno/0, + delivery_msg/0, + state/0, + config/0]). + +-spec init(config()) -> state(). +init(#{name := Name, + queue_resource := Resource} = Conf) -> + update_config(Conf, #?MODULE{cfg = #cfg{name = Name, + resource = Resource}}). + +update_config(Conf, State) -> + DLH = maps:get(dead_letter_handler, Conf, undefined), + BLH = maps:get(become_leader_handler, Conf, undefined), + SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY), + MaxLength = maps:get(max_length, Conf, undefined), + MaxBytes = maps:get(max_bytes, Conf, undefined), + MaxMemoryLength = maps:get(max_in_memory_length, Conf, undefined), + MaxMemoryBytes = maps:get(max_in_memory_bytes, Conf, undefined), + DeliveryLimit = maps:get(delivery_limit, Conf, undefined), + ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of + true -> + single_active; + false -> + competing + end, + Cfg = State#?MODULE.cfg, + SHICur = case State#?MODULE.cfg of + #cfg{release_cursor_interval = {_, C}} -> + C; + #cfg{release_cursor_interval = undefined} -> + SHI; + #cfg{release_cursor_interval = C} -> + C + end, + + State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur}, + dead_letter_handler = DLH, + become_leader_handler = BLH, + max_length = MaxLength, + max_bytes = MaxBytes, + max_in_memory_length = MaxMemoryLength, + max_in_memory_bytes = MaxMemoryBytes, + consumer_strategy = ConsumerStrategy, + delivery_limit = DeliveryLimit}}. + +zero(_) -> + 0. + +% msg_ids are scoped per consumer +% ra_indexes holds all raft indexes for enqueues currently on queue +-spec apply(ra_machine:command_meta_data(), command(), state()) -> + {state(), Reply :: term(), ra_machine:effects()} | + {state(), Reply :: term()}. 
+apply(Metadata, #enqueue{pid = From, seq = Seq, + msg = RawMsg}, State00) -> + apply_enqueue(Metadata, From, Seq, RawMsg, State00); +apply(Meta, + #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, + #?MODULE{consumers = Cons0} = State) -> + case Cons0 of + #{ConsumerId := Con0} -> + % need to increment metrics before completing as any snapshot + % states taken need to include them + complete_and_checkout(Meta, MsgIds, ConsumerId, + Con0, [], State); + _ -> + {State, ok} + + end; +apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, + #?MODULE{consumers = Cons0} = State0) -> + case Cons0 of + #{ConsumerId := Con0} -> + Discarded = maps:with(MsgIds, Con0#consumer.checked_out), + Effects = dead_letter_effects(rejected, Discarded, State0, []), + complete_and_checkout(Meta, MsgIds, ConsumerId, Con0, + Effects, State0); + _ -> + {State0, ok} + end; +apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, + #?MODULE{consumers = Cons0} = State) -> + case Cons0 of + #{ConsumerId := #consumer{checked_out = Checked0}} -> + Returned = maps:with(MsgIds, Checked0), + return(Meta, ConsumerId, Returned, [], State); + _ -> + {State, ok} + end; +apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, + drain = Drain, consumer_id = ConsumerId}, + #?MODULE{consumers = Cons0, + service_queue = ServiceQueue0, + waiting_consumers = Waiting0} = State0) -> + case Cons0 of + #{ConsumerId := #consumer{delivery_count = DelCnt} = Con0} -> + %% this can go below 0 when credit is reduced + C = max(0, RemoteDelCnt + NewCredit - DelCnt), + %% grant the credit + Con1 = Con0#consumer{credit = C}, + ServiceQueue = maybe_queue_consumer(ConsumerId, Con1, + ServiceQueue0), + Cons = maps:put(ConsumerId, Con1, Cons0), + {State1, ok, Effects} = + checkout(Meta, State0#?MODULE{service_queue = ServiceQueue, + consumers = Cons}, []), + Response = {send_credit_reply, messages_ready(State1)}, + %% by this point all checkouts for the updated credit value + %% should be 
processed so we can evaluate the drain + case Drain of + false -> + %% just return the result of the checkout + {State1, Response, Effects}; + true -> + Con = #consumer{credit = PostCred} = + maps:get(ConsumerId, State1#?MODULE.consumers), + %% add the outstanding credit to the delivery count + DeliveryCount = Con#consumer.delivery_count + PostCred, + Consumers = maps:put(ConsumerId, + Con#consumer{delivery_count = DeliveryCount, + credit = 0}, + State1#?MODULE.consumers), + Drained = Con#consumer.credit, + {CTag, _} = ConsumerId, + {State1#?MODULE{consumers = Consumers}, + %% returning a multi response with two client actions + %% for the channel to execute + {multi, [Response, {send_drained, {CTag, Drained}}]}, + Effects} + end; + _ when Waiting0 /= [] -> + %% there are waiting consuemrs + case lists:keytake(ConsumerId, 1, Waiting0) of + {value, {_, Con0 = #consumer{delivery_count = DelCnt}}, Waiting} -> + %% the consumer is a waiting one + %% grant the credit + C = max(0, RemoteDelCnt + NewCredit - DelCnt), + Con = Con0#consumer{credit = C}, + State = State0#?MODULE{waiting_consumers = + [{ConsumerId, Con} | Waiting]}, + {State, {send_credit_reply, messages_ready(State)}}; + false -> + {State0, ok} + end; + _ -> + %% credit for unknown consumer - just ignore + {State0, ok} + end; +apply(_, #checkout{spec = {dequeue, _}}, + #?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) -> + {State0, {error, unsupported}}; +apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement}, + meta = ConsumerMeta, + consumer_id = ConsumerId}, + #?MODULE{consumers = Consumers} = State0) -> + Exists = maps:is_key(ConsumerId, Consumers), + case messages_ready(State0) of + 0 -> + {State0, {dequeue, empty}}; + _ when Exists -> + %% a dequeue using the same consumer_id isn't possible at this point + {State0, {dequeue, empty}}; + Ready -> + State1 = update_consumer(ConsumerId, ConsumerMeta, + {once, 1, simple_prefetch}, + State0), + {success, _, MsgId, Msg, State2} = 
checkout_one(State1), + {State, Effects} = case Settlement of + unsettled -> + {_, Pid} = ConsumerId, + {State2, [{monitor, process, Pid}]}; + settled -> + %% immediately settle the checkout + {State3, _, Effects0} = + apply(Meta, make_settle(ConsumerId, [MsgId]), + State2), + {State3, Effects0} + end, + case Msg of + {RaftIdx, {Header, 'empty'}} -> + %% TODO add here new log effect with reply + {State, '$ra_no_reply', + reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From)}; + _ -> + {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects} + end + end; +apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> + {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel), + checkout(Meta, State, Effects); +apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, + consumer_id = {_, Pid} = ConsumerId}, + State0) -> + State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0), + checkout(Meta, State1, [{monitor, process, Pid}]); +apply(#{index := RaftIdx}, #purge{}, + #?MODULE{ra_indexes = Indexes0, + returns = Returns, + messages = Messages} = State0) -> + Total = messages_ready(State0), + Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, + [I || {I, _} <- lists:sort(maps:values(Messages))]), + Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1, + [I || {_, {I, _}} <- lqueue:to_list(Returns)]), + {State, _, Effects} = + update_smallest_raft_index(RaftIdx, + State0#?MODULE{ra_indexes = Indexes, + messages = #{}, + returns = lqueue:new(), + msg_bytes_enqueue = 0, + prefix_msgs = {0, [], 0, []}, + low_msg_num = undefined, + msg_bytes_in_memory = 0, + msgs_ready_in_memory = 0}, + []), + %% as we're not checking out after a purge (no point) we have to + %% reverse the effects ourselves + {State, {purge, Total}, + lists:reverse([garbage_collection | Effects])}; +apply(Meta, {down, Pid, noconnection}, + #?MODULE{consumers = Cons0, + cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = Waiting0, 
+ enqueuers = Enqs0} = State0) -> + Node = node(Pid), + %% if the pid refers to an active or cancelled consumer, + %% mark it as suspected and return it to the waiting queue + {State1, Effects0} = + maps:fold(fun({_, P} = Cid, C0, {S0, E0}) + when node(P) =:= Node -> + %% the consumer should be returned to waiting + %% and checked out messages should be returned + Effs = consumer_update_active_effects( + S0, Cid, C0, false, suspected_down, E0), + Checked = C0#consumer.checked_out, + Credit = increase_credit(C0, maps:size(Checked)), + {St, Effs1} = return_all(S0, Effs, + Cid, C0#consumer{credit = Credit}), + %% if the consumer was cancelled there is a chance it got + %% removed when returning hence we need to be defensive here + Waiting = case St#?MODULE.consumers of + #{Cid := C} -> + Waiting0 ++ [{Cid, C}]; + _ -> + Waiting0 + end, + {St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers), + waiting_consumers = Waiting}, + Effs1}; + (_, _, S) -> + S + end, {State0, []}, Cons0), + WaitingConsumers = update_waiting_consumer_status(Node, State1, + suspected_down), + + %% select a new consumer from the waiting queue and run a checkout + State2 = State1#?MODULE{waiting_consumers = WaitingConsumers}, + {State, Effects1} = activate_next_consumer(State2, Effects0), + + %% mark any enquers as suspected + Enqs = maps:map(fun(P, E) when node(P) =:= Node -> + E#enqueuer{status = suspected_down}; + (_, E) -> E + end, Enqs0), + Effects = [{monitor, node, Node} | Effects1], + checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); +apply(Meta, {down, Pid, noconnection}, + #?MODULE{consumers = Cons0, + enqueuers = Enqs0} = State0) -> + %% A node has been disconnected. 
This doesn't necessarily mean that + %% any processes on this node are down, they _may_ come back so here + %% we just mark them as suspected (effectively deactivated) + %% and return all checked out messages to the main queue for delivery to any + %% live consumers + %% + %% all pids for the disconnected node will be marked as suspected not just + %% the one we got the `down' command for + Node = node(Pid), + + {State, Effects1} = + maps:fold( + fun({_, P} = Cid, #consumer{checked_out = Checked0, + status = up} = C0, + {St0, Eff}) when node(P) =:= Node -> + Credit = increase_credit(C0, map_size(Checked0)), + C = C0#consumer{status = suspected_down, + credit = Credit}, + {St, Eff0} = return_all(St0, Eff, Cid, C), + Eff1 = consumer_update_active_effects(St, Cid, C, false, + suspected_down, Eff0), + {St, Eff1}; + (_, _, {St, Eff}) -> + {St, Eff} + end, {State0, []}, Cons0), + Enqs = maps:map(fun(P, E) when node(P) =:= Node -> + E#enqueuer{status = suspected_down}; + (_, E) -> E + end, Enqs0), + + % Monitor the node so that we can "unsuspect" these processes when the node + % comes back, then re-issue all monitors and discover the final fate of + % these processes + Effects = case maps:size(State#?MODULE.consumers) of + 0 -> + [{aux, inactive}, {monitor, node, Node}]; + _ -> + [{monitor, node, Node}] + end ++ Effects1, + checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); +apply(Meta, {down, Pid, _Info}, State0) -> + {State, Effects} = handle_down(Pid, State0), + checkout(Meta, State, Effects); +apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> + %% A node we are monitoring has come back. 
+ %% If we have suspected any processes of being + %% down we should now re-issue the monitors for them to detect if they're + %% actually down or not + Monitors = [{monitor, process, P} + || P <- suspected_pids_for(Node, State0)], + + Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> + E#enqueuer{status = up}; + (_, E) -> E + end, Enqs0), + ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), + %% mark all consumers as up + {Cons1, SQ, Effects1} = + maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) + when (node(P) =:= Node) and + (C#consumer.status =/= cancelled) -> + EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, + C, true, up, EAcc), + update_or_remove_sub(ConsumerId, + C#consumer{status = up}, CAcc, + SQAcc, EAcc1); + (_, _, Acc) -> + Acc + end, {Cons0, SQ0, Monitors}, Cons0), + Waiting = update_waiting_consumer_status(Node, State0, up), + State1 = State0#?MODULE{consumers = Cons1, + enqueuers = Enqs1, + service_queue = SQ, + waiting_consumers = Waiting}, + {State, Effects} = activate_next_consumer(State1, Effects1), + checkout(Meta, State, Effects); +apply(_, {nodedown, _Node}, State) -> + {State, ok}; +apply(_, #purge_nodes{nodes = Nodes}, State0) -> + {State, Effects} = lists:foldl(fun(Node, {S, E}) -> + purge_node(Node, S, E) + end, {State0, []}, Nodes), + {State, ok, Effects}; +apply(Meta, #update_config{config = Conf}, State) -> + checkout(Meta, update_config(Conf, State), []). + +purge_node(Node, State, Effects) -> + lists:foldl(fun(Pid, {S0, E0}) -> + {S, E} = handle_down(Pid, S0), + {S, E0 ++ E} + end, {State, Effects}, all_pids_for(Node, State)). 
+ +%% any downs that re not noconnection +handle_down(Pid, #?MODULE{consumers = Cons0, + enqueuers = Enqs0} = State0) -> + % Remove any enqueuer for the same pid and enqueue any pending messages + % This should be ok as we won't see any more enqueues from this pid + State1 = case maps:take(Pid, Enqs0) of + {#enqueuer{pending = Pend}, Enqs} -> + lists:foldl(fun ({_, RIdx, RawMsg}, S) -> + enqueue(RIdx, RawMsg, S) + end, State0#?MODULE{enqueuers = Enqs}, Pend); + error -> + State0 + end, + {Effects1, State2} = handle_waiting_consumer_down(Pid, State1), + % return checked out messages to main queue + % Find the consumers for the down pid + DownConsumers = maps:keys( + maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), + lists:foldl(fun(ConsumerId, {S, E}) -> + cancel_consumer(ConsumerId, S, E, down) + end, {State2, Effects1}, DownConsumers). + +consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) -> + fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> + consumer_update_active_effects(State, ConsumerId, Consumer, Active, + ActivityStatus, Effects) + end; +consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = single_active}}) -> + fun(_, _, _, _, _, Effects) -> + Effects + end. 
+ +handle_waiting_consumer_down(_Pid, + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) -> + {[], State}; +handle_waiting_consumer_down(_Pid, + #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = []} = State) -> + {[], State}; +handle_waiting_consumer_down(Pid, + #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = WaitingConsumers0} = State0) -> + % get cancel effects for down waiting consumers + Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end, + WaitingConsumers0), + Effects = lists:foldl(fun ({ConsumerId, _}, Effects) -> + cancel_consumer_effects(ConsumerId, State0, + Effects) + end, [], Down), + % update state to have only up waiting consumers + StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end, + WaitingConsumers0), + State = State0#?MODULE{waiting_consumers = StillUp}, + {Effects, State}. + +update_waiting_consumer_status(Node, + #?MODULE{waiting_consumers = WaitingConsumers}, + Status) -> + [begin + case node(Pid) of + Node -> + {ConsumerId, Consumer#consumer{status = Status}}; + _ -> + {ConsumerId, Consumer} + end + end || {{_, Pid} = ConsumerId, Consumer} <- WaitingConsumers, + Consumer#consumer.status =/= cancelled]. + +-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). 
+state_enter(leader, #?MODULE{consumers = Cons, + enqueuers = Enqs, + waiting_consumers = WaitingConsumers, + cfg = #cfg{name = Name, + resource = Resource, + become_leader_handler = BLH}, + prefix_msgs = {0, [], 0, []} + }) -> + % return effects to monitor all current consumers and enqueuers + Pids = lists:usort(maps:keys(Enqs) + ++ [P || {_, P} <- maps:keys(Cons)] + ++ [P || {{_, P}, _} <- WaitingConsumers]), + Mons = [{monitor, process, P} || P <- Pids], + Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], + NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), + FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}], + Effects = Mons ++ Nots ++ NodeMons ++ FHReservation, + case BLH of + undefined -> + Effects; + {Mod, Fun, Args} -> + [{mod_call, Mod, Fun, Args ++ [Name]} | Effects] + end; +state_enter(eol, #?MODULE{enqueuers = Enqs, + consumers = Custs0, + waiting_consumers = WaitingConsumers0}) -> + Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), + WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end, + #{}, WaitingConsumers0), + AllConsumers = maps:merge(Custs, WaitingConsumers1), + [{send_msg, P, eol, ra_event} + || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++ + [{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}]; +state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= leader -> + FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}, + [FHReservation]; + state_enter(_, _) -> + %% catch all as not handling all states + []. + + +-spec tick(non_neg_integer(), state()) -> ra_machine:effects(). 
+tick(_Ts, #?MODULE{cfg = #cfg{name = Name, + resource = QName}, + msg_bytes_enqueue = EnqueueBytes, + msg_bytes_checkout = CheckoutBytes} = State) -> + Metrics = {Name, + messages_ready(State), + num_checked_out(State), % checked out + messages_total(State), + query_consumer_count(State), % Consumers + EnqueueBytes, + CheckoutBytes}, + [{mod_call, rabbit_quorum_queue, + handle_tick, [QName, Metrics, all_nodes(State)]}]. + +-spec overview(state()) -> map(). +overview(#?MODULE{consumers = Cons, + enqueuers = Enqs, + release_cursors = Cursors, + enqueue_count = EnqCount, + msg_bytes_enqueue = EnqueueBytes, + msg_bytes_checkout = CheckoutBytes, + cfg = Cfg} = State) -> + Conf = #{name => Cfg#cfg.name, + resource => Cfg#cfg.resource, + release_cursor_interval => Cfg#cfg.release_cursor_interval, + dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler, + max_length => Cfg#cfg.max_length, + max_bytes => Cfg#cfg.max_bytes, + consumer_strategy => Cfg#cfg.consumer_strategy, + max_in_memory_length => Cfg#cfg.max_in_memory_length, + max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes}, + #{type => ?MODULE, + config => Conf, + num_consumers => maps:size(Cons), + num_checked_out => num_checked_out(State), + num_enqueuers => maps:size(Enqs), + num_ready_messages => messages_ready(State), + num_messages => messages_total(State), + num_release_cursors => lqueue:len(Cursors), + release_crusor_enqueue_counter => EnqCount, + enqueue_message_bytes => EnqueueBytes, + checkout_message_bytes => CheckoutBytes}. + +-spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) -> + [delivery_msg()]. +get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) -> + case Consumers of + #{Cid := #consumer{checked_out = Checked}} -> + [{K, snd(snd(maps:get(K, Checked)))} + || K <- lists:seq(From, To), + maps:is_key(K, Checked)]; + _ -> + [] + end. + +-record(aux_gc, {last_raft_idx = 0 :: ra:index()}). 
+-record(aux, {name :: atom(), + utilisation :: term(), + gc = #aux_gc{} :: #aux_gc{}}). + +init_aux(Name) when is_atom(Name) -> + %% TODO: catch specific exception throw if table already exists + ok = ra_machine_ets:create_table(rabbit_fifo_usage, + [named_table, set, public, + {write_concurrency, true}]), + Now = erlang:monotonic_time(micro_seconds), + #aux{name = Name, + utilisation = {inactive, Now, 1, 1.0}}. + +handle_aux(_RaState, cast, Cmd, #aux{name = Name, + utilisation = Use0} = State0, + Log, MacState) -> + State = case Cmd of + _ when Cmd == active orelse Cmd == inactive -> + State0#aux{utilisation = update_use(Use0, Cmd)}; + tick -> + true = ets:insert(rabbit_fifo_usage, + {Name, utilisation(Use0)}), + eval_gc(Log, MacState, State0); + eval -> + State0 + end, + {no_reply, State, Log}. + +eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState, + #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) -> + {Idx, _} = ra_log:last_index_term(Log), + {memory, Mem} = erlang:process_info(self(), memory), + case messages_total(MacState) of + 0 when Idx > LastGcIdx andalso + Mem > ?GC_MEM_LIMIT_B -> + garbage_collect(), + {memory, MemAfter} = erlang:process_info(self(), memory), + rabbit_log:debug("~s: full GC sweep complete. " + "Process memory reduced from ~.2fMB to ~.2fMB.", + [rabbit_misc:rs(QR), Mem/?MB, MemAfter/?MB]), + AuxState#aux{gc = Gc#aux_gc{last_raft_idx = Idx}}; + _ -> + AuxState + end. + +%%% Queries + +query_messages_ready(State) -> + messages_ready(State). + +query_messages_checked_out(#?MODULE{consumers = Consumers}) -> + maps:fold(fun (_, #consumer{checked_out = C}, S) -> + maps:size(C) + S + end, 0, Consumers). + +query_messages_total(State) -> + messages_total(State). + +query_processes(#?MODULE{enqueuers = Enqs, consumers = Cons0}) -> + Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0), + maps:keys(maps:merge(Enqs, Cons)). + + +query_ra_indexes(#?MODULE{ra_indexes = RaIndexes}) -> + RaIndexes. 
+ +query_consumer_count(#?MODULE{consumers = Consumers, + waiting_consumers = WaitingConsumers}) -> + maps:size(Consumers) + length(WaitingConsumers). + +query_consumers(#?MODULE{consumers = Consumers, + waiting_consumers = WaitingConsumers, + cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) -> + ActiveActivityStatusFun = + case ConsumerStrategy of + competing -> + fun(_ConsumerId, + #consumer{status = Status}) -> + case Status of + suspected_down -> + {false, Status}; + _ -> + {true, Status} + end + end; + single_active -> + SingleActiveConsumer = query_single_active_consumer(State), + fun({Tag, Pid} = _Consumer, _) -> + case SingleActiveConsumer of + {value, {Tag, Pid}} -> + {true, single_active}; + _ -> + {false, waiting} + end + end + end, + FromConsumers = + maps:fold(fun (_, #consumer{status = cancelled}, Acc) -> + Acc; + ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) -> + {Active, ActivityStatus} = + ActiveActivityStatusFun({Tag, Pid}, Consumer), + maps:put({Tag, Pid}, + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + Active, + ActivityStatus, + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)}, + Acc) + end, #{}, Consumers), + FromWaitingConsumers = + lists:foldl(fun ({_, #consumer{status = cancelled}}, Acc) -> + Acc; + ({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) -> + {Active, ActivityStatus} = + ActiveActivityStatusFun({Tag, Pid}, Consumer), + maps:put({Tag, Pid}, + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + Active, + ActivityStatus, + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)}, + Acc) + end, #{}, WaitingConsumers), + maps:merge(FromConsumers, FromWaitingConsumers). 
+ +query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_active}, + consumers = Consumers}) -> + case maps:size(Consumers) of + 0 -> + {error, no_value}; + 1 -> + {value, lists:nth(1, maps:keys(Consumers))}; + _ + -> + {error, illegal_size} + end ; +query_single_active_consumer(_) -> + disabled. + +query_stat(#?MODULE{consumers = Consumers} = State) -> + {messages_ready(State), maps:size(Consumers)}. + +query_in_memory_usage(#?MODULE{msg_bytes_in_memory = Bytes, + msgs_ready_in_memory = Length}) -> + {Length, Bytes}. + +-spec usage(atom()) -> float(). +usage(Name) when is_atom(Name) -> + case ets:lookup(rabbit_fifo_usage, Name) of + [] -> 0.0; + [{_, Use}] -> Use + end. + +%%% Internal + +messages_ready(#?MODULE{messages = M, + prefix_msgs = {RCnt, _R, PCnt, _P}, + returns = R}) -> + + %% prefix messages will rarely have anything in them during normal + %% operations so length/1 is fine here + maps:size(M) + lqueue:len(R) + RCnt + PCnt. + +messages_total(#?MODULE{ra_indexes = I, + prefix_msgs = {RCnt, _R, PCnt, _P}}) -> + rabbit_fifo_index:size(I) + RCnt + PCnt. + +update_use({inactive, _, _, _} = CUInfo, inactive) -> + CUInfo; +update_use({active, _, _} = CUInfo, active) -> + CUInfo; +update_use({active, Since, Avg}, inactive) -> + Now = erlang:monotonic_time(micro_seconds), + {inactive, Now, Now - Since, Avg}; +update_use({inactive, Since, Active, Avg}, active) -> + Now = erlang:monotonic_time(micro_seconds), + {active, Now, use_avg(Active, Now - Since, Avg)}. + +utilisation({active, Since, Avg}) -> + use_avg(erlang:monotonic_time(micro_seconds) - Since, 0, Avg); +utilisation({inactive, Since, Active, Avg}) -> + use_avg(Active, erlang:monotonic_time(micro_seconds) - Since, Avg). + +use_avg(0, 0, Avg) -> + Avg; +use_avg(Active, Inactive, Avg) -> + Time = Inactive + Active, + moving_average(Time, ?USE_AVG_HALF_LIFE, Active / Time, Avg). 
+ +moving_average(_Time, _, Next, undefined) -> + Next; +moving_average(Time, HalfLife, Next, Current) -> + Weight = math:exp(Time * math:log(0.5) / HalfLife), + Next * (1 - Weight) + Current * Weight. + +num_checked_out(#?MODULE{consumers = Cons}) -> + maps:fold(fun (_, #consumer{checked_out = C}, Acc) -> + maps:size(C) + Acc + end, 0, Cons). + +cancel_consumer(ConsumerId, + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State, + Effects, Reason) -> + cancel_consumer0(ConsumerId, State, Effects, Reason); +cancel_consumer(ConsumerId, + #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = []} = State, + Effects, Reason) -> + %% single active consumer on, no consumers are waiting + cancel_consumer0(ConsumerId, State, Effects, Reason); +cancel_consumer(ConsumerId, + #?MODULE{consumers = Cons0, + cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = Waiting0} = State0, + Effects0, Reason) -> + %% single active consumer on, consumers are waiting + case maps:is_key(ConsumerId, Cons0) of + true -> + % The active consumer is to be removed + {State1, Effects1} = cancel_consumer0(ConsumerId, State0, + Effects0, Reason), + activate_next_consumer(State1, Effects1); + false -> + % The cancelled consumer is not active or cancelled + % Just remove it from idle_consumers + Waiting = lists:keydelete(ConsumerId, 1, Waiting0), + Effects = cancel_consumer_effects(ConsumerId, State0, Effects0), + % A waiting consumer isn't supposed to have any checked out messages, + % so nothing special to do here + {State0#?MODULE{waiting_consumers = Waiting}, Effects} + end. 
+ +consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, + ConsumerId, #consumer{meta = Meta}, + Active, ActivityStatus, + Effects) -> + Ack = maps:get(ack, Meta, undefined), + Prefetch = maps:get(prefetch, Meta, undefined), + Args = maps:get(args, Meta, []), + [{mod_call, rabbit_quorum_queue, update_consumer_handler, + [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]} + | Effects]. + +cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> + case C0 of + #{ConsumerId := Consumer} -> + {S, Effects2} = maybe_return_all(ConsumerId, Consumer, S0, + Effects0, Reason), + %% The effects are emitted before the consumer is actually removed + %% if the consumer has unacked messages. This is a bit weird but + %% in line with what classic queues do (from an external point of + %% view) + Effects = cancel_consumer_effects(ConsumerId, S, Effects2), + case maps:size(S#?MODULE.consumers) of + 0 -> + {S, [{aux, inactive} | Effects]}; + _ -> + {S, Effects} + end; + _ -> + %% already removed: do nothing + {S0, Effects0} + end. 
+ +activate_next_consumer(#?MODULE{consumers = Cons, + waiting_consumers = Waiting0} = State0, + Effects0) -> + case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of + Up when map_size(Up) == 0 -> + %% there are no active consumer in the consumer map + case lists:filter(fun ({_, #consumer{status = Status}}) -> + Status == up + end, Waiting0) of + [{NextConsumerId, NextConsumer} | _] -> + %% there is a potential next active consumer + Remaining = lists:keydelete(NextConsumerId, 1, Waiting0), + #?MODULE{service_queue = ServiceQueue} = State0, + ServiceQueue1 = maybe_queue_consumer(NextConsumerId, + NextConsumer, + ServiceQueue), + State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer}, + service_queue = ServiceQueue1, + waiting_consumers = Remaining}, + Effects = consumer_update_active_effects(State, NextConsumerId, + NextConsumer, true, + single_active, Effects0), + {State, Effects}; + [] -> + {State0, [{aux, inactive} | Effects0]} + end; + _ -> + {State0, Effects0} + end. + + + +maybe_return_all(ConsumerId, Consumer, + #?MODULE{consumers = C0, + service_queue = SQ0} = S0, + Effects0, Reason) -> + case Reason of + consumer_cancel -> + {Cons, SQ, Effects1} = + update_or_remove_sub(ConsumerId, + Consumer#consumer{lifetime = once, + credit = 0, + status = cancelled}, + C0, SQ0, Effects0), + {S0#?MODULE{consumers = Cons, + service_queue = SQ}, Effects1}; + down -> + {S1, Effects1} = return_all(S0, Effects0, ConsumerId, Consumer), + {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers)}, + Effects1} + end. + +apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> + case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of + {ok, State1, Effects1} -> + State2 = append_to_master_index(RaftIdx, State1), + {State, ok, Effects} = checkout(Meta, State2, Effects1), + {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects}; + {duplicate, State, Effects} -> + {State, ok, Effects} + end. 
+ +drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> + case take_next_msg(State0) of + {FullMsg = {_MsgId, {RaftIdxToDrop, {Header, Msg}}}, + State1} -> + Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0), + State2 = add_bytes_drop(Header, State1#?MODULE{ra_indexes = Indexes}), + State = case Msg of + 'empty' -> State2; + _ -> subtract_in_memory_counts(Header, State2) + end, + Effects = dead_letter_effects(maxlen, #{none => FullMsg}, + State, Effects0), + {State, Effects}; + {{'$prefix_msg', Header}, State1} -> + State2 = subtract_in_memory_counts(Header, add_bytes_drop(Header, State1)), + {State2, Effects0}; + {{'$empty_msg', Header}, State1} -> + State2 = add_bytes_drop(Header, State1), + {State2, Effects0}; + empty -> + {State0, Effects0} + end. + +enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, + low_msg_num = LowMsgNum, + next_msg_num = NextMsgNum} = State0) -> + %% the initial header is an integer only - it will get expanded to a map + %% when the next required key is added + Header = message_size(RawMsg), + {State1, Msg} = + case evaluate_memory_limit(Header, State0) of + true -> + % indexed message with header map + {State0, {RaftIdx, {Header, 'empty'}}}; + false -> + {add_in_memory_counts(Header, State0), + {RaftIdx, {Header, RawMsg}}} % indexed message with header map + end, + State = add_bytes_enqueue(Header, State1), + State#?MODULE{messages = Messages#{NextMsgNum => Msg}, + %% this is probably only done to record it when low_msg_num + %% is undefined + low_msg_num = min(LowMsgNum, NextMsgNum), + next_msg_num = NextMsgNum + 1}. + +append_to_master_index(RaftIdx, + #?MODULE{ra_indexes = Indexes0} = State0) -> + State = incr_enqueue_count(State0), + Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0), + State#?MODULE{ra_indexes = Indexes}. 
+ + +incr_enqueue_count(#?MODULE{enqueue_count = C, + cfg = #cfg{release_cursor_interval = {_Base, C}} + } = State0) -> + %% this will trigger a dehydrated version of the state to be stored + %% at this raft index for potential future snapshot generation + %% Q: Why don't we just stash the release cursor here? + %% A: Because it needs to be the very last thing we do and we + %% first needs to run the checkout logic. + State0#?MODULE{enqueue_count = 0}; +incr_enqueue_count(#?MODULE{cfg = #cfg{release_cursor_interval = C} = Cfg} + = State0) + when is_integer(C) -> + %% conversion to new release cursor interval format + State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}}, + incr_enqueue_count(State); +incr_enqueue_count(#?MODULE{enqueue_count = C} = State) -> + State#?MODULE{enqueue_count = C + 1}. + +maybe_store_dehydrated_state(RaftIdx, + #?MODULE{cfg = + #cfg{release_cursor_interval = {Base, _}} + = Cfg, + ra_indexes = Indexes, + enqueue_count = 0, + release_cursors = Cursors0} = State0) -> + case rabbit_fifo_index:exists(RaftIdx, Indexes) of + false -> + %% the incoming enqueue must already have been dropped + State0; + true -> + Interval = case Base of + 0 -> 0; + _ -> + Total = messages_total(State0), + min(max(Total, Base), + ?RELEASE_CURSOR_EVERY_MAX) + end, + State = convert_prefix_msgs( + State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = + {Base, Interval}}}), + Dehydrated = dehydrate_state(State), + Cursor = {release_cursor, RaftIdx, Dehydrated}, + Cursors = lqueue:in(Cursor, Cursors0), + State#?MODULE{release_cursors = Cursors} + end; +maybe_store_dehydrated_state(RaftIdx, + #?MODULE{cfg = + #cfg{release_cursor_interval = C} = Cfg} + = State0) + when is_integer(C) -> + %% convert to new format + State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}}, + maybe_store_dehydrated_state(RaftIdx, State); +maybe_store_dehydrated_state(_RaftIdx, State) -> + State. 
+ +enqueue_pending(From, + #enqueuer{next_seqno = Next, + pending = [{Next, RaftIdx, RawMsg} | Pending]} = Enq0, + State0) -> + State = enqueue(RaftIdx, RawMsg, State0), + Enq = Enq0#enqueuer{next_seqno = Next + 1, pending = Pending}, + enqueue_pending(From, Enq, State); +enqueue_pending(From, Enq, #?MODULE{enqueuers = Enqueuers0} = State) -> + State#?MODULE{enqueuers = Enqueuers0#{From => Enq}}. + +maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) -> + % direct enqueue without tracking + State = enqueue(RaftIdx, RawMsg, State0), + {ok, State, Effects}; +maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, + #?MODULE{enqueuers = Enqueuers0} = State0) -> + case maps:get(From, Enqueuers0, undefined) of + undefined -> + State1 = State0#?MODULE{enqueuers = Enqueuers0#{From => #enqueuer{}}}, + {ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo, + RawMsg, Effects0, State1), + {ok, State, [{monitor, process, From} | Effects]}; + #enqueuer{next_seqno = MsgSeqNo} = Enq0 -> + % it is the next expected seqno + State1 = enqueue(RaftIdx, RawMsg, State0), + Enq = Enq0#enqueuer{next_seqno = MsgSeqNo + 1}, + State = enqueue_pending(From, Enq, State1), + {ok, State, Effects0}; + #enqueuer{next_seqno = Next, + pending = Pending0} = Enq0 + when MsgSeqNo > Next -> + % out of order delivery + Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0], + Enq = Enq0#enqueuer{pending = lists:sort(Pending)}, + {ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq}}, Effects0}; + #enqueuer{next_seqno = Next} when MsgSeqNo =< Next -> + % duplicate delivery - remove the raft index from the ra_indexes + % map as it was added earlier + {duplicate, State0, Effects0} + end. + +snd(T) -> + element(2, T). 
+ +return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, + Effects0, #?MODULE{service_queue = SQ0} = State0) -> + {State1, Effects1} = maps:fold( + fun(MsgId, {Tag, _} = Msg, {S0, E0}) + when Tag == '$prefix_msg'; + Tag == '$empty_msg'-> + return_one(MsgId, 0, Msg, S0, E0, ConsumerId); + (MsgId, {MsgNum, Msg}, {S0, E0}) -> + return_one(MsgId, MsgNum, Msg, S0, E0, + ConsumerId) + end, {State0, Effects0}, Returned), + {State2, Effects3} = + case State1#?MODULE.consumers of + #{ConsumerId := Con0} = Cons0 -> + Con = Con0#consumer{credit = increase_credit(Con0, + map_size(Returned))}, + {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con, + Cons0, SQ0, Effects1), + {State1#?MODULE{consumers = Cons, + service_queue = SQ}, Effects2}; + _ -> + {State1, Effects1} + end, + {State, ok, Effects} = checkout(Meta, State2, Effects3), + update_smallest_raft_index(IncomingRaftIdx, State, Effects). + +% used to processes messages that are finished +complete(ConsumerId, Discarded, + #consumer{checked_out = Checked} = Con0, Effects0, + #?MODULE{consumers = Cons0, service_queue = SQ0, + ra_indexes = Indexes0} = State0) -> + %% TODO optimise use of Discarded map here + MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], + %% credit_mode = simple_prefetch should automatically top-up credit + %% as messages are simple_prefetch or otherwise returned + Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked), + credit = increase_credit(Con0, map_size(Discarded))}, + {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, + SQ0, Effects0), + Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, + MsgRaftIdxs), + %% TODO: use maps:fold instead + State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) -> + add_bytes_settle(Header, Acc); + ({'$prefix_msg', Header}, Acc) -> + add_bytes_settle(Header, Acc); + ({'$empty_msg', Header}, Acc) -> + add_bytes_settle(Header, Acc) + end, State0, maps:values(Discarded)), + 
{State1#?MODULE{consumers = Cons, + ra_indexes = Indexes, + service_queue = SQ}, Effects}. + +increase_credit(#consumer{lifetime = once, + credit = Credit}, _) -> + %% once consumers cannot increment credit + Credit; +increase_credit(#consumer{lifetime = auto, + credit_mode = credited, + credit = Credit}, _) -> + %% credit_mode: credit also doesn't automatically increment credit + Credit; +increase_credit(#consumer{credit = Current}, Credit) -> + Current + Credit. + +complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, + #consumer{checked_out = Checked0} = Con0, + Effects0, State0) -> + Discarded = maps:with(MsgIds, Checked0), + {State2, Effects1} = complete(ConsumerId, Discarded, Con0, + Effects0, State0), + {State, ok, Effects} = checkout(Meta, State2, Effects1), + update_smallest_raft_index(IncomingRaftIdx, State, Effects). + +dead_letter_effects(_Reason, _Discarded, + #?MODULE{cfg = #cfg{dead_letter_handler = undefined}}, + Effects) -> + Effects; +dead_letter_effects(Reason, Discarded, + #?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}}, + Effects) -> + RaftIdxs = maps:fold( + fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) -> + [RaftIdx | Acc]; + (_, _, Acc) -> + Acc + end, [], Discarded), + [{log, RaftIdxs, + fun (Log) -> + Lookup = maps:from_list(lists:zip(RaftIdxs, Log)), + DeadLetters = maps:fold( + fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) -> + {enqueue, _, _, Msg} = maps:get(RaftIdx, Lookup), + [{Reason, Msg} | Acc]; + (_, {_, {_, {_Header, Msg}}}, Acc) -> + [{Reason, Msg} | Acc]; + (_, _, Acc) -> + Acc + end, [], Discarded), + [{mod_call, Mod, Fun, Args ++ [DeadLetters]}] + end} | Effects]. + +cancel_consumer_effects(ConsumerId, + #?MODULE{cfg = #cfg{resource = QName}}, Effects) -> + [{mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [QName, ConsumerId]} | Effects]. 
+ +update_smallest_raft_index(IncomingRaftIdx, + #?MODULE{ra_indexes = Indexes, + release_cursors = Cursors0} = State0, + Effects) -> + case rabbit_fifo_index:size(Indexes) of + 0 -> + % there are no messages on queue anymore and no pending enqueues + % we can forward release_cursor all the way until + % the last received command, hooray + State = State0#?MODULE{release_cursors = lqueue:new()}, + {State, ok, Effects ++ [{release_cursor, IncomingRaftIdx, State}]}; + _ -> + Smallest = rabbit_fifo_index:smallest(Indexes), + case find_next_cursor(Smallest, Cursors0) of + {empty, Cursors} -> + {State0#?MODULE{release_cursors = Cursors}, + ok, Effects}; + {Cursor, Cursors} -> + %% we can emit a release cursor we've passed the smallest + %% release cursor available. + {State0#?MODULE{release_cursors = Cursors}, ok, + Effects ++ [Cursor]} + end + end. + +find_next_cursor(Idx, Cursors) -> + find_next_cursor(Idx, Cursors, empty). + +find_next_cursor(Smallest, Cursors0, Potential) -> + case lqueue:out(Cursors0) of + {{value, {_, Idx, _} = Cursor}, Cursors} when Idx < Smallest -> + %% we found one but it may not be the largest one + find_next_cursor(Smallest, Cursors, Cursor); + _ -> + {Potential, Cursors0} + end. + +update_header(Key, UpdateFun, Default, Header) + when is_integer(Header) -> + update_header(Key, UpdateFun, Default, #{size => Header}); +update_header(Key, UpdateFun, Default, Header) -> + maps:update_with(Key, UpdateFun, Default, Header). 
+ + +return_one(MsgId, 0, {Tag, Header0}, + #?MODULE{returns = Returns, + consumers = Consumers, + cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, + Effects0, ConsumerId) + when Tag == '$prefix_msg'; Tag == '$empty_msg' -> + #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), + Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0), + Msg0 = {Tag, Header}, + case maps:get(delivery_count, Header) of + DeliveryCount when DeliveryCount > DeliveryLimit -> + complete(ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0); + _ -> + %% this should not affect the release cursor in any way + Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, + {Msg, State1} = case Tag of + '$empty_msg' -> + {Msg0, State0}; + _ -> case evaluate_memory_limit(Header, State0) of + true -> + {{'$empty_msg', Header}, State0}; + false -> + {Msg0, add_in_memory_counts(Header, State0)} + end + end, + {add_bytes_return( + Header, + State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, + returns = lqueue:in(Msg, Returns)}), + Effects0} + end; +return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, + #?MODULE{returns = Returns, + consumers = Consumers, + cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, + Effects0, ConsumerId) -> + #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), + Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0), + Msg0 = {RaftId, {Header, RawMsg}}, + case maps:get(delivery_count, Header) of + DeliveryCount when DeliveryCount > DeliveryLimit -> + DlMsg = {MsgNum, Msg0}, + Effects = dead_letter_effects(delivery_limit, #{none => DlMsg}, + State0, Effects0), + complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0); + _ -> + Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, + %% this should not affect the release cursor in any way + {Msg, State1} = case RawMsg of + 'empty' -> + {Msg0, State0}; + _ -> + case evaluate_memory_limit(Header, State0) of + true 
-> + {{RaftId, {Header, 'empty'}}, State0}; + false -> + {Msg0, add_in_memory_counts(Header, State0)} + end + end, + {add_bytes_return( + Header, + State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, + returns = lqueue:in({MsgNum, Msg}, Returns)}), + Effects0} + end. + +return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, + #consumer{checked_out = Checked0} = Con) -> + %% need to sort the list so that we return messages in the order + %% they were checked out + Checked = lists:sort(maps:to_list(Checked0)), + State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}}, + lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) -> + return_one(MsgId, 0, Msg, S, E, ConsumerId); + ({MsgId, {'$empty_msg', _} = Msg}, {S, E}) -> + return_one(MsgId, 0, Msg, S, E, ConsumerId); + ({MsgId, {MsgNum, Msg}}, {S, E}) -> + return_one(MsgId, MsgNum, Msg, S, E, ConsumerId) + end, {State, Effects0}, Checked). + +%% checkout new messages to consumers +checkout(#{index := Index}, State0, Effects0) -> + {State1, _Result, Effects1} = checkout0(checkout_one(State0), + Effects0, {#{}, #{}}), + case evaluate_limit(false, State1, Effects1) of + {State, true, Effects} -> + update_smallest_raft_index(Index, State, Effects); + {State, false, Effects} -> + {State, ok, Effects} + end. 
+ +checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, + Effects, {SendAcc, LogAcc0}) -> + DelMsg = {RaftIdx, {MsgId, Header}}, + LogAcc = maps:update_with(ConsumerId, + fun (M) -> [DelMsg | M] end, + [DelMsg], LogAcc0), + checkout0(checkout_one(State), Effects, {SendAcc, LogAcc}); +checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, + {SendAcc0, LogAcc}) -> + DelMsg = {MsgId, Msg}, + SendAcc = maps:update_with(ConsumerId, + fun (M) -> [DelMsg | M] end, + [DelMsg], SendAcc0), + checkout0(checkout_one(State), Effects, {SendAcc, LogAcc}); +checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) -> + Effects1 = case Activity of + nochange -> + append_send_msg_effects( + append_log_effects(Effects0, LogAcc), SendAcc); + inactive -> + [{aux, inactive} + | append_send_msg_effects( + append_log_effects(Effects0, LogAcc), SendAcc)] + end, + {State0, ok, lists:reverse(Effects1)}. + +evaluate_limit(Result, + #?MODULE{cfg = #cfg{max_length = undefined, + max_bytes = undefined}} = State, + Effects) -> + {State, Result, Effects}; +evaluate_limit(Result, State00, Effects0) -> + State0 = convert_prefix_msgs(State00), + case is_over_limit(State0) of + true -> + {State, Effects} = drop_head(State0, Effects0), + evaluate_limit(true, State, Effects); + false -> + {State0, Result, Effects0} + end. + +evaluate_memory_limit(_Header, + #?MODULE{cfg = #cfg{max_in_memory_length = undefined, + max_in_memory_bytes = undefined}}) -> + false; +evaluate_memory_limit(#{size := Size}, State) -> + evaluate_memory_limit(Size, State); +evaluate_memory_limit(Size, + #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength, + max_in_memory_bytes = MaxBytes}, + msg_bytes_in_memory = Bytes, + msgs_ready_in_memory = Length}) + when is_integer(Size) -> + (Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes). 
+ +append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 -> + Effects; +append_send_msg_effects(Effects0, AccMap) -> + Effects = maps:fold(fun (C, Msgs, Ef) -> + [send_msg_effect(C, lists:reverse(Msgs)) | Ef] + end, Effects0, AccMap), + [{aux, active} | Effects]. + +append_log_effects(Effects0, AccMap) -> + maps:fold(fun (C, Msgs, Ef) -> + [send_log_effect(C, lists:reverse(Msgs)) | Ef] + end, Effects0, AccMap). + +%% next message is determined as follows: +%% First we check if there are are prefex returns +%% Then we check if there are current returns +%% then we check prefix msgs +%% then we check current messages +%% +%% When we return it is always done to the current return queue +%% for both prefix messages and current messages +take_next_msg(#?MODULE{prefix_msgs = {R, P}} = State) -> + %% conversion + take_next_msg(State#?MODULE{prefix_msgs = {length(R), R, length(P), P}}); +take_next_msg(#?MODULE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem], + NumP, P}} = State) -> + %% there are prefix returns, these should be served first + {Msg, State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; +take_next_msg(#?MODULE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) -> + %% there are prefix returns, these should be served first + {{'$prefix_msg', Header}, + State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; +take_next_msg(#?MODULE{returns = Returns, + low_msg_num = Low0, + messages = Messages0, + prefix_msgs = {NumR, R, NumP, P}} = State) -> + %% use peek rather than out there as the most likely case is an empty + %% queue + case lqueue:peek(Returns) of + {value, NextMsg} -> + {NextMsg, + State#?MODULE{returns = lqueue:drop(Returns)}}; + empty when P == [] -> + case Low0 of + undefined -> + empty; + _ -> + {Msg, Messages} = maps:take(Low0, Messages0), + case maps:size(Messages) of + 0 -> + {{Low0, Msg}, + State#?MODULE{messages = Messages, + low_msg_num = undefined}}; + _ -> + {{Low0, Msg}, + State#?MODULE{messages = Messages, + 
low_msg_num = Low0 + 1}} + end + end; + empty -> + [Msg | Rem] = P, + case Msg of + {Header, 'empty'} -> + %% There are prefix msgs + {{'$empty_msg', Header}, + State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}}; + Header -> + {{'$prefix_msg', Header}, + State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}} + end + end. + +send_msg_effect({CTag, CPid}, Msgs) -> + {send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}. + +send_log_effect({CTag, CPid}, IdxMsgs) -> + {RaftIdxs, Data} = lists:unzip(IdxMsgs), + {log, RaftIdxs, + fun(Log) -> + Msgs = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) -> + {MsgId, {Header, Msg}} + end, Log, Data), + [{send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}] + end, + {local, node(CPid)}}. + +reply_log_effect(RaftIdx, MsgId, Header, Ready, From) -> + {log, [RaftIdx], + fun([{enqueue, _, _, Msg}]) -> + [{reply, From, {wrap_reply, + {dequeue, {MsgId, {Header, Msg}}, Ready}}}] + end}. + +checkout_one(#?MODULE{service_queue = SQ0, + messages = Messages0, + consumers = Cons0} = InitState) -> + case queue:peek(SQ0) of + {value, ConsumerId} -> + case take_next_msg(InitState) of + {ConsumerMsg, State0} -> + SQ1 = queue:drop(SQ0), + %% there are consumers waiting to be serviced + %% process consumer checkout + case maps:find(ConsumerId, Cons0) of + {ok, #consumer{credit = 0}} -> + %% no credit but was still on queue + %% can happen when draining + %% recurse without consumer on queue + checkout_one(InitState#?MODULE{service_queue = SQ1}); + {ok, #consumer{status = cancelled}} -> + checkout_one(InitState#?MODULE{service_queue = SQ1}); + {ok, #consumer{status = suspected_down}} -> + checkout_one(InitState#?MODULE{service_queue = SQ1}); + {ok, #consumer{checked_out = Checked0, + next_msg_id = Next, + credit = Credit, + delivery_count = DelCnt} = Con0} -> + Checked = maps:put(Next, ConsumerMsg, Checked0), + Con = Con0#consumer{checked_out = Checked, + next_msg_id = Next + 1, + credit = Credit - 1, + delivery_count = 
DelCnt + 1}, + {Cons, SQ, []} = % we expect no effects + update_or_remove_sub(ConsumerId, Con, + Cons0, SQ1, []), + State1 = State0#?MODULE{service_queue = SQ, + consumers = Cons}, + {State, Msg} = + case ConsumerMsg of + {'$prefix_msg', Header} -> + {subtract_in_memory_counts( + Header, add_bytes_checkout(Header, State1)), + ConsumerMsg}; + {'$empty_msg', Header} -> + {add_bytes_checkout(Header, State1), + ConsumerMsg}; + {_, {_, {Header, 'empty'}} = M} -> + {add_bytes_checkout(Header, State1), + M}; + {_, {_, {Header, _} = M}} -> + {subtract_in_memory_counts( + Header, + add_bytes_checkout(Header, State1)), + M} + end, + {success, ConsumerId, Next, Msg, State}; + error -> + %% consumer did not exist but was queued, recurse + checkout_one(InitState#?MODULE{service_queue = SQ1}) + end; + empty -> + {nochange, InitState} + end; + empty -> + case maps:size(Messages0) of + 0 -> {nochange, InitState}; + _ -> {inactive, InitState} + end + end. + +update_or_remove_sub(ConsumerId, #consumer{lifetime = auto, + credit = 0} = Con, + Cons, ServiceQueue, Effects) -> + {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects}; +update_or_remove_sub(ConsumerId, #consumer{lifetime = auto} = Con, + Cons, ServiceQueue, Effects) -> + {maps:put(ConsumerId, Con, Cons), + uniq_queue_in(ConsumerId, ServiceQueue), Effects}; +update_or_remove_sub(ConsumerId, #consumer{lifetime = once, + checked_out = Checked, + credit = 0} = Con, + Cons, ServiceQueue, Effects) -> + case maps:size(Checked) of + 0 -> + % we're done with this consumer + % TODO: demonitor consumer pid but _only_ if there are no other + % monitors for this pid + {maps:remove(ConsumerId, Cons), ServiceQueue, Effects}; + _ -> + % there are unsettled items so need to keep around + {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects} + end; +update_or_remove_sub(ConsumerId, #consumer{lifetime = once} = Con, + Cons, ServiceQueue, Effects) -> + {maps:put(ConsumerId, Con, Cons), + uniq_queue_in(ConsumerId, ServiceQueue), Effects}. 
+ +uniq_queue_in(Key, Queue) -> + % TODO: queue:member could surely be quite expensive, however the practical + % number of unique consumers may not be large enough for it to matter + case queue:member(Key, Queue) of + true -> + Queue; + false -> + queue:in(Key, Queue) + end. + +update_consumer(ConsumerId, Meta, Spec, + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) -> + %% general case, single active consumer off + update_consumer0(ConsumerId, Meta, Spec, State0); +update_consumer(ConsumerId, Meta, Spec, + #?MODULE{consumers = Cons0, + cfg = #cfg{consumer_strategy = single_active}} = State0) + when map_size(Cons0) == 0 -> + %% single active consumer on, no one is consuming yet + update_consumer0(ConsumerId, Meta, Spec, State0); +update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, + #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = WaitingConsumers0} = State0) -> + %% single active consumer on and one active consumer already + %% adding the new consumer to the waiting list + Consumer = #consumer{lifetime = Life, meta = Meta, + credit = Credit, credit_mode = Mode}, + WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}], + State0#?MODULE{waiting_consumers = WaitingConsumers1}. + +update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, + #?MODULE{consumers = Cons0, + service_queue = ServiceQueue0} = State0) -> + %% TODO: this logic may not be correct for updating a pre-existing consumer + Init = #consumer{lifetime = Life, meta = Meta, + credit = Credit, credit_mode = Mode}, + Cons = maps:update_with(ConsumerId, + fun(S) -> + %% remove any in-flight messages from + %% the credit update + N = maps:size(S#consumer.checked_out), + C = max(0, Credit - N), + S#consumer{lifetime = Life, credit = C} + end, Init, Cons0), + ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons), + ServiceQueue0), + State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}. 
+ +maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, + ServiceQueue0) -> + case Credit > 0 of + true -> + % consumerect needs service - check if already on service queue + uniq_queue_in(ConsumerId, ServiceQueue0); + false -> + ServiceQueue0 + end. + +convert_prefix_msgs(#?MODULE{prefix_msgs = {R, P}} = State) -> + State#?MODULE{prefix_msgs = {length(R), R, length(P), P}}; +convert_prefix_msgs(State) -> + State. + +%% creates a dehydrated version of the current state to be cached and +%% potentially used to for a snaphot at a later point +dehydrate_state(#?MODULE{messages = Messages, + consumers = Consumers, + returns = Returns, + low_msg_num = Low, + next_msg_num = Next, + prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0}, + waiting_consumers = Waiting0} = State) -> + RCnt = lqueue:len(Returns), + %% TODO: optimise this function as far as possible + PrefRet1 = lists:foldr(fun ({'$prefix_msg', Header}, Acc) -> + [Header | Acc]; + ({'$empty_msg', _} = Msg, Acc) -> + [Msg | Acc]; + ({_, {_, {Header, 'empty'}}}, Acc) -> + [{'$empty_msg', Header} | Acc]; + ({_, {_, {Header, _}}}, Acc) -> + [Header | Acc] + end, + [], + lqueue:to_list(Returns)), + PrefRet = PrefRet0 ++ PrefRet1, + PrefMsgsSuff = dehydrate_messages(Low, Next - 1, Messages, []), + %% prefix messages are not populated in normal operation only after + %% recovering from a snapshot + PrefMsgs = PrefMsg0 ++ PrefMsgsSuff, + Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0], + State#?MODULE{messages = #{}, + ra_indexes = rabbit_fifo_index:empty(), + release_cursors = lqueue:new(), + low_msg_num = undefined, + consumers = maps:map(fun (_, C) -> + dehydrate_consumer(C) + end, Consumers), + returns = lqueue:new(), + prefix_msgs = {PRCnt + RCnt, PrefRet, + PPCnt + maps:size(Messages), PrefMsgs}, + waiting_consumers = Waiting}. 
+ +dehydrate_messages(Low, Next, _Msgs, Acc) + when Next < Low -> + Acc; +dehydrate_messages(Low, Next, Msgs, Acc0) -> + Acc = case maps:get(Next, Msgs) of + {_RaftIdx, {_, 'empty'} = Msg} -> + [Msg | Acc0]; + {_RaftIdx, {Header, _}} -> + [Header | Acc0] + end, + dehydrate_messages(Low, Next - 1, Msgs, Acc). + +dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> + Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> + M; + (_, {'$empty_msg', _} = M) -> + M; + (_, {_, {_, {Header, 'empty'}}}) -> + {'$empty_msg', Header}; + (_, {_, {_, {Header, _}}}) -> + {'$prefix_msg', Header} + end, Checked0), + Con#consumer{checked_out = Checked}. + +%% make the state suitable for equality comparison +normalize(#?MODULE{release_cursors = Cursors} = State) -> + State#?MODULE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}. + +is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined, + max_bytes = undefined}}) -> + false; +is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, + max_bytes = MaxBytes}, + msg_bytes_enqueue = BytesEnq} = State) -> + + messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes). + +-spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol(). +make_enqueue(Pid, Seq, Msg) -> + #enqueue{pid = Pid, seq = Seq, msg = Msg}. +-spec make_checkout(consumer_id(), + checkout_spec(), consumer_meta()) -> protocol(). +make_checkout(ConsumerId, Spec, Meta) -> + #checkout{consumer_id = ConsumerId, + spec = Spec, meta = Meta}. + +-spec make_settle(consumer_id(), [msg_id()]) -> protocol(). +make_settle(ConsumerId, MsgIds) -> + #settle{consumer_id = ConsumerId, msg_ids = MsgIds}. + +-spec make_return(consumer_id(), [msg_id()]) -> protocol(). +make_return(ConsumerId, MsgIds) -> + #return{consumer_id = ConsumerId, msg_ids = MsgIds}. + +-spec make_discard(consumer_id(), [msg_id()]) -> protocol(). +make_discard(ConsumerId, MsgIds) -> + #discard{consumer_id = ConsumerId, msg_ids = MsgIds}. 
+ +-spec make_credit(consumer_id(), non_neg_integer(), non_neg_integer(), + boolean()) -> protocol(). +make_credit(ConsumerId, Credit, DeliveryCount, Drain) -> + #credit{consumer_id = ConsumerId, + credit = Credit, + delivery_count = DeliveryCount, + drain = Drain}. + +-spec make_purge() -> protocol(). +make_purge() -> #purge{}. + +-spec make_purge_nodes([node()]) -> protocol(). +make_purge_nodes(Nodes) -> + #purge_nodes{nodes = Nodes}. + +-spec make_update_config(config()) -> protocol(). +make_update_config(Config) -> + #update_config{config = Config}. + +add_bytes_enqueue(Bytes, + #?MODULE{msg_bytes_enqueue = Enqueue} = State) + when is_integer(Bytes) -> + State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}; +add_bytes_enqueue(#{size := Bytes}, State) -> + add_bytes_enqueue(Bytes, State). + +add_bytes_drop(Bytes, + #?MODULE{msg_bytes_enqueue = Enqueue} = State) + when is_integer(Bytes) -> + State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}; +add_bytes_drop(#{size := Bytes}, State) -> + add_bytes_drop(Bytes, State). + +add_bytes_checkout(Bytes, + #?MODULE{msg_bytes_checkout = Checkout, + msg_bytes_enqueue = Enqueue } = State) + when is_integer(Bytes) -> + State#?MODULE{msg_bytes_checkout = Checkout + Bytes, + msg_bytes_enqueue = Enqueue - Bytes}; +add_bytes_checkout(#{size := Bytes}, State) -> + add_bytes_checkout(Bytes, State). + +add_bytes_settle(Bytes, + #?MODULE{msg_bytes_checkout = Checkout} = State) + when is_integer(Bytes) -> + State#?MODULE{msg_bytes_checkout = Checkout - Bytes}; +add_bytes_settle(#{size := Bytes}, State) -> + add_bytes_settle(Bytes, State). + +add_bytes_return(Bytes, + #?MODULE{msg_bytes_checkout = Checkout, + msg_bytes_enqueue = Enqueue} = State) + when is_integer(Bytes) -> + State#?MODULE{msg_bytes_checkout = Checkout - Bytes, + msg_bytes_enqueue = Enqueue + Bytes}; +add_bytes_return(#{size := Bytes}, State) -> + add_bytes_return(Bytes, State). 
+ +add_in_memory_counts(Bytes, + #?MODULE{msg_bytes_in_memory = InMemoryBytes, + msgs_ready_in_memory = InMemoryCount} = State) + when is_integer(Bytes) -> + State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes, + msgs_ready_in_memory = InMemoryCount + 1}; +add_in_memory_counts(#{size := Bytes}, State) -> + add_in_memory_counts(Bytes, State). + +subtract_in_memory_counts(Bytes, + #?MODULE{msg_bytes_in_memory = InMemoryBytes, + msgs_ready_in_memory = InMemoryCount} = State) + when is_integer(Bytes) -> + State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes, + msgs_ready_in_memory = InMemoryCount - 1}; +subtract_in_memory_counts(#{size := Bytes}, State) -> + subtract_in_memory_counts(Bytes, State). + +message_size(#basic_message{content = Content}) -> + #content{payload_fragments_rev = PFR} = Content, + iolist_size(PFR); +message_size({'$prefix_msg', H}) -> + get_size_from_header(H); +message_size({'$empty_msg', H}) -> + get_size_from_header(H); +message_size(B) when is_binary(B) -> + byte_size(B); +message_size(Msg) -> + %% probably only hit this for testing so ok to use erts_debug + erts_debug:size(Msg). + +get_size_from_header(Size) when is_integer(Size) -> + Size; +get_size_from_header(#{size := B}) -> + B. + + +all_nodes(#?MODULE{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> + Nodes0 = maps:fold(fun({_, P}, _, Acc) -> + Acc#{node(P) => ok} + end, #{}, Cons0), + Nodes1 = maps:fold(fun(P, _, Acc) -> + Acc#{node(P) => ok} + end, Nodes0, Enqs0), + maps:keys( + lists:foldl(fun({{_, P}, _}, Acc) -> + Acc#{node(P) => ok} + end, Nodes1, WaitingConsumers0)). 
+ +all_pids_for(Node, #?MODULE{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> + Cons = maps:fold(fun({_, P}, _, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> Acc + end, [], Cons0), + Enqs = maps:fold(fun(P, _, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> Acc + end, Cons, Enqs0), + lists:foldl(fun({{_, P}, _}, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, Acc) -> Acc + end, Enqs, WaitingConsumers0). + +suspected_pids_for(Node, #?MODULE{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> + Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> Acc + end, [], Cons0), + Enqs = maps:fold(fun(P, #enqueuer{status = suspected_down}, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> Acc + end, Cons, Enqs0), + lists:foldl(fun({{_, P}, + #consumer{status = suspected_down}}, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, Acc) -> Acc + end, Enqs, WaitingConsumers0). diff --git a/src/rabbit_fifo_v0.hrl b/src/rabbit_fifo_v0.hrl new file mode 100644 index 0000000000..26a988ee10 --- /dev/null +++ b/src/rabbit_fifo_v0.hrl @@ -0,0 +1,195 @@ + +-type option(T) :: undefined | T. + +-type raw_msg() :: term(). +%% The raw message. It is opaque to rabbit_fifo. + +-type msg_in_id() :: non_neg_integer(). +% a queue scoped monotonically incrementing integer used to enforce order +% in the unassigned messages map + +-type msg_id() :: non_neg_integer(). +%% A consumer-scoped monotonically incrementing integer included with a +%% {@link delivery/0.}. Used to settle deliveries using +%% {@link rabbit_fifo_client:settle/3.} + +-type msg_seqno() :: non_neg_integer(). +%% A sender process scoped monotonically incrementing integer included +%% in enqueue messages. 
Used to ensure ordering of messages sent from the
+%% same process
+
+-type msg_header() :: msg_size() |
+                      #{size := msg_size(),
+                        delivery_count => non_neg_integer()}.
+%% The message header:
+%%     delivery_count: the number of unsuccessful delivery attempts.
+%%                     A non-zero value indicates a previous attempt.
+%% If it only contains the size it can be condensed to an integer only
+
+-type msg() :: {msg_header(), raw_msg()}.
+%% message with a header map.
+
+-type msg_size() :: non_neg_integer().
+%% the size in bytes of the msg payload
+
+-type indexed_msg() :: {ra:index(), msg()}.
+
+-type prefix_msg() :: {'$prefix_msg', msg_header()}.
+
+-type delivery_msg() :: {msg_id(), msg()}.
+%% A tuple consisting of the message id and the headered message.
+
+-type consumer_tag() :: binary().
+%% An arbitrary binary tag used to distinguish between different consumers
+%% set up by the same process. See: {@link rabbit_fifo_client:checkout/3.}
+
+-type delivery() :: {delivery, consumer_tag(), [delivery_msg()]}.
+%% Represents the delivery of one or more rabbit_fifo messages.
+
+-type consumer_id() :: {consumer_tag(), pid()}.
+%% The entity that receives messages. Uniquely identifies a consumer.
+
+-type credit_mode() :: simple_prefetch | credited.
+%% determines how credit is replenished
+
+-type checkout_spec() :: {once | auto, Num :: non_neg_integer(),
+                          credit_mode()} |
+                         {dequeue, settled | unsettled} |
+                         cancel.
+
+-type consumer_meta() :: #{ack => boolean(),
+                           username => binary(),
+                           prefetch => non_neg_integer(),
+                           args => list()}.
+%% static meta data associated with a consumer
+
+
+-type applied_mfa() :: {module(), atom(), list()}.
+% represents a partially applied module call
+
+-define(RELEASE_CURSOR_EVERY, 64000).
+-define(RELEASE_CURSOR_EVERY_MAX, 3200000).
+-define(USE_AVG_HALF_LIFE, 10000.0).
+%% an average QQ without any message uses about 100KB so setting this limit
+%% to ~10 times that should be relatively safe.
+-define(GC_MEM_LIMIT_B, 2000000).
+
+-define(MB, 1048576).
+-define(RABBIT_FIFO, rabbit_fifo_v0).
+
+-record(consumer,
+        {meta = #{} :: consumer_meta(),
+         checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}},
+         next_msg_id = 0 :: msg_id(), % part of snapshot data
+         %% max number of messages that can be sent
+         %% decremented for each delivery
+         credit = 0 :: non_neg_integer(),
+         %% total number of checked out messages - ever
+         %% incremented for each delivery
+         delivery_count = 0 :: non_neg_integer(),
+         %% the mode of how credit is incremented
+         %% simple_prefetch: credit is re-filled as deliveries are settled
+         %% or returned.
+         %% credited: credit can only be changed by receiving a consumer_credit
+         %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
+         credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
+         lifetime = once :: once | auto,
+         status = up :: up | suspected_down | cancelled
+        }).
+
+-type consumer() :: #consumer{}.
+
+-type consumer_strategy() :: competing | single_active.
+
+-record(enqueuer,
+        {next_seqno = 1 :: msg_seqno(),
+         % out of order enqueues - sorted list
+         pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
+         status = up :: up | suspected_down
+        }).
+
+-record(cfg,
+        {name :: atom(),
+         resource :: rabbit_types:r('queue'),
+         release_cursor_interval ::
+             undefined | non_neg_integer() |
+             {non_neg_integer(), non_neg_integer()},
+         dead_letter_handler :: option(applied_mfa()),
+         become_leader_handler :: option(applied_mfa()),
+         max_length :: option(non_neg_integer()),
+         max_bytes :: option(non_neg_integer()),
+         %% whether single active consumer is on or not for this queue
+         consumer_strategy = competing :: consumer_strategy(),
+         %% the maximum number of unsuccessful delivery attempts permitted
+         delivery_limit :: option(non_neg_integer()),
+         max_in_memory_length :: option(non_neg_integer()),
+         max_in_memory_bytes :: option(non_neg_integer())
+        }).
+ +-type prefix_msgs() :: {list(), list()} | + {non_neg_integer(), list(), + non_neg_integer(), list()}. + +-record(?RABBIT_FIFO, + {cfg :: #cfg{}, + % unassigned messages + messages = #{} :: #{msg_in_id() => indexed_msg()}, + % defines the lowest message in id available in the messages map + % that isn't a return + low_msg_num :: option(msg_in_id()), + % defines the next message in id to be added to the messages map + next_msg_num = 1 :: msg_in_id(), + % list of returned msg_in_ids - when checking out it picks from + % this list first before taking low_msg_num + returns = lqueue:new() :: lqueue:lqueue(prefix_msg() | + {msg_in_id(), indexed_msg()}), + % a counter of enqueues - used to trigger shadow copy points + enqueue_count = 0 :: non_neg_integer(), + % a map containing all the live processes that have ever enqueued + % a message to this queue as well as a cached value of the smallest + % ra_index of all pending enqueues + enqueuers = #{} :: #{pid() => #enqueuer{}}, + % master index of all enqueue raft indexes including pending + % enqueues + % rabbit_fifo_index can be slow when calculating the smallest + % index when there are large gaps but should be faster than gb_trees + % for normal appending operations as it's backed by a map + ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(), + release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor, + ra:index(), #?RABBIT_FIFO{}}), + % consumers need to reflect consumer state at time of snapshot + % needs to be part of snapshot + consumers = #{} :: #{consumer_id() => #consumer{}}, + % consumers that require further service are queued here + % needs to be part of snapshot + service_queue = queue:new() :: queue:queue(consumer_id()), + %% This is a special field that is only used for snapshots + %% It represents the queued messages at the time the + %% dehydrated snapshot state was cached. 
+         %% As release_cursors are only emitted for raft indexes where all
+         %% prior messages no longer contribute to the current state we can
+         %% replace all message payloads with their sizes (to be used for
+         %% overflow calculations).
+         %% This is done so that consumers are still served in a deterministic
+         %% order on recovery.
+         prefix_msgs = {0, [], 0, []} :: prefix_msgs(),
+         msg_bytes_enqueue = 0 :: non_neg_integer(),
+         msg_bytes_checkout = 0 :: non_neg_integer(),
+         %% waiting consumers, one is picked when the active consumer is cancelled or dies
+         %% used only when single active consumer is on
+         waiting_consumers = [] :: [{consumer_id(), consumer()}],
+         msg_bytes_in_memory = 0 :: non_neg_integer(),
+         msgs_ready_in_memory = 0 :: non_neg_integer()
+        }).
+
+-type config() :: #{name := atom(),
+                    queue_resource := rabbit_types:r('queue'),
+                    dead_letter_handler => applied_mfa(),
+                    become_leader_handler => applied_mfa(),
+                    release_cursor_interval => non_neg_integer(),
+                    max_length => non_neg_integer(),
+                    max_bytes => non_neg_integer(),
+                    max_in_memory_length => non_neg_integer(),
+                    max_in_memory_bytes => non_neg_integer(),
+                    single_active_consumer_on => boolean(),
+                    delivery_limit => non_neg_integer()}.
diff --git a/test/rabbit_fifo_v0_SUITE.erl b/test/rabbit_fifo_v0_SUITE.erl
new file mode 100644
index 0000000000..6b84911d7f
--- /dev/null
+++ b/test/rabbit_fifo_v0_SUITE.erl
@@ -0,0 +1,1395 @@
+-module(rabbit_fifo_v0_SUITE).
+
+%% rabbit_fifo unit tests suite
+
+-compile(export_all).
+
+-compile({no_auto_import, [apply/3]}).
+-export([
+         ]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("src/rabbit_fifo_v0.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+    [
+     {group, tests}
+    ].
+
+
+%% replicate eunit like test resolution
+all_tests() ->
+    [F || {F, _} <- ?MODULE:module_info(functions),
+          re:run(atom_to_list(F), "_test$") /= nomatch].
+
+groups() ->
+    [
+     {tests, [], all_tests()}
+    ].
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    ok.
+
+init_per_group(_Group, Config) ->
+    Config.
+
+end_per_group(_Group, _Config) ->
+    ok.
+
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+end_per_testcase(_TestCase, _Config) ->
+    ok.
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+-define(ASSERT_EFF(EfxPat, Effects),
+        ?ASSERT_EFF(EfxPat, true, Effects)).
+
+-define(ASSERT_EFF(EfxPat, Guard, Effects),
+        ?assert(lists:any(fun (EfxPat) when Guard -> true;
+                              (_) -> false
+                          end, Effects))).
+
+-define(ASSERT_NO_EFF(EfxPat, Effects),
+        ?assert(not lists:any(fun (EfxPat) -> true;
+                                  (_) -> false
+                              end, Effects))).
+
+-define(assertNoEffect(EfxPat, Effects),
+        ?assert(not lists:any(fun (EfxPat) -> true;
+                                  (_) -> false
+                              end, Effects))).
+
+test_init(Name) ->
+    init(#{name => Name,
+           queue_resource => rabbit_misc:r("/", queue,
+                                           atom_to_binary(Name, utf8)),
+           release_cursor_interval => 0}).
+
+enq_enq_checkout_test(_) ->
+    Cid = {<<"enq_enq_checkout_test">>, self()},
+    {State1, _} = enq(1, 1, first, test_init(test)),
+    {State2, _} = enq(2, 2, second, State1),
+    {_State3, _, Effects} =
+        apply(meta(3),
+              rabbit_fifo_v0:make_checkout(Cid, {once, 2, simple_prefetch}, #{}),
+              State2),
+    ?ASSERT_EFF({monitor, _, _}, Effects),
+    ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects),
+    ok.
+ +credit_enq_enq_checkout_settled_credit_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + {State3, _, Effects} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {auto, 1, credited}, #{}), State2), + ?ASSERT_EFF({monitor, _, _}, Effects), + Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true; + (_) -> false + end, Effects), + ?assertEqual(1, length(Deliveries)), + %% settle the delivery this should _not_ result in further messages being + %% delivered + {State4, SettledEffects} = settle(Cid, 4, 1, State3), + ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) -> + true; + (_) -> false + end, SettledEffects)), + %% granting credit (3) should deliver the second msg if the receivers + %% delivery count is (1) + {State5, CreditEffects} = credit(Cid, 5, 1, 1, false, State4), + % ?debugFmt("CreditEffects ~p ~n~p", [CreditEffects, State4]), + ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, CreditEffects), + {_State6, FinalEffects} = enq(6, 3, third, State5), + ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) -> + true; + (_) -> false + end, FinalEffects)), + ok. + +credit_with_drained_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State0 = test_init(test), + %% checkout with a single credit + {State1, _, _} = + apply(meta(1), rabbit_fifo_v0:make_checkout(Cid, {auto, 1, credited},#{}), + State0), + ?assertMatch(#?RABBIT_FIFO{consumers = #{Cid := #consumer{credit = 1, + delivery_count = 0}}}, + State1), + {State, Result, _} = + apply(meta(3), rabbit_fifo_v0:make_credit(Cid, 0, 5, true), State1), + ?assertMatch(#?RABBIT_FIFO{consumers = #{Cid := #consumer{credit = 0, + delivery_count = 5}}}, + State), + ?assertEqual({multi, [{send_credit_reply, 0}, + {send_drained, {?FUNCTION_NAME, 5}}]}, + Result), + ok. 
+ +credit_and_drain_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + %% checkout without any initial credit (like AMQP 1.0 would) + {State3, _, CheckEffs} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {auto, 0, credited}, #{}), + State2), + + ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs), + {State4, {multi, [{send_credit_reply, 0}, + {send_drained, {?FUNCTION_NAME, 2}}]}, + Effects} = apply(meta(4), rabbit_fifo_v0:make_credit(Cid, 4, 0, true), State3), + ?assertMatch(#?RABBIT_FIFO{consumers = #{Cid := #consumer{credit = 0, + delivery_count = 4}}}, + State4), + + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}, + {_, {_, second}}]}, _}, Effects), + {_State5, EnqEffs} = enq(5, 2, third, State4), + ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, EnqEffs), + ok. + + + +enq_enq_deq_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + % get returns a reply value + NumReady = 1, + {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), + State2), + ok. + +enq_enq_deq_deq_settle_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + % get returns a reply value + {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), + State2), + {_State4, {dequeue, empty}} = + apply(meta(4), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), + State3), + ok. 
+ +enq_enq_checkout_get_settled_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + % get returns a reply value + {_State2, {dequeue, {0, {_, first}}, _}, _Effs} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), + State1), + ok. + +checkout_get_empty_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State = test_init(test), + {_State2, {dequeue, empty}} = + apply(meta(1), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), State), + ok. + +untracked_enq_deq_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State0 = test_init(test), + {State1, _, _} = apply(meta(1), + rabbit_fifo_v0:make_enqueue(undefined, undefined, first), + State0), + {_State2, {dequeue, {0, {_, first}}, _}, _} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State1), + ok. + +release_cursor_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + {State3, _} = check(Cid, 3, 10, State2), + % no release cursor effect at this point + {State4, _} = settle(Cid, 4, 1, State3), + {_Final, Effects1} = settle(Cid, 5, 0, State4), + % empty queue forwards release cursor all the way + ?ASSERT_EFF({release_cursor, 5, _}, Effects1), + ok. + +checkout_enq_settle_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)), + {State2, Effects0} = enq(2, 1, first, State1), + ?ASSERT_EFF({send_msg, _, + {delivery, ?FUNCTION_NAME, + [{0, {_, first}}]}, _}, + Effects0), + {State3, [_Inactive]} = enq(3, 2, second, State2), + {_, _Effects} = settle(Cid, 4, 0, State3), + % the release cursor is the smallest raft index that does not + % contribute to the state of the application + % ?ASSERT_EFF({release_cursor, 2, _}, Effects), + ok. 
+ +out_of_order_enqueue_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), + {State2, Effects2} = enq(2, 1, first, State1), + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), + % assert monitor was set up + ?ASSERT_EFF({monitor, _, _}, Effects2), + % enqueue seq num 3 and 4 before 2 + {State3, Effects3} = enq(3, 3, third, State2), + ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects3), + {State4, Effects4} = enq(4, 4, fourth, State3), + % assert no further deliveries where made + ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects4), + {_State5, Effects5} = enq(5, 2, second, State4), + % assert two deliveries were now made + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, second}}, + {_, {_, third}}, + {_, {_, fourth}}]}, _}, + Effects5), + ok. + +out_of_order_first_enqueue_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = check_n(Cid, 5, 5, test_init(test)), + {_State2, Effects2} = enq(2, 10, first, State1), + ?ASSERT_EFF({monitor, process, _}, Effects2), + ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, + Effects2), + ok. + +duplicate_enqueue_test(_) -> + Cid = {<<"duplicate_enqueue_test">>, self()}, + {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), + {State2, Effects2} = enq(2, 1, first, State1), + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), + {_State3, Effects3} = enq(3, 1, first, State2), + ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects3), + ok. 
+ +return_test(_) -> + Cid = {<<"cid">>, self()}, + Cid2 = {<<"cid2">>, self()}, + {State0, _} = enq(1, 1, msg, test_init(test)), + {State1, _} = check_auto(Cid, 2, State0), + {State2, _} = check_auto(Cid2, 3, State1), + {State3, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid, [0]), State2), + ?assertMatch(#{Cid := #consumer{checked_out = C}} when map_size(C) == 0, + State3#?RABBIT_FIFO.consumers), + ?assertMatch(#{Cid2 := #consumer{checked_out = C2}} when map_size(C2) == 1, + State3#?RABBIT_FIFO.consumers), + ok. + +return_dequeue_delivery_limit_test(_) -> + Init = init(#{name => test, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(test, utf8)), + release_cursor_interval => 0, + delivery_limit => 1}), + {State0, _} = enq(1, 1, msg, Init), + + Cid = {<<"cid">>, self()}, + Cid2 = {<<"cid2">>, self()}, + + {State1, {MsgId1, _}} = deq(2, Cid, unsettled, State0), + {State2, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid, [MsgId1]), + State1), + + {State3, {MsgId2, _}} = deq(2, Cid2, unsettled, State2), + {State4, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid2, [MsgId2]), + State3), + ?assertMatch(#{num_messages := 0}, rabbit_fifo_v0:overview(State4)), + ok. + +return_non_existent_test(_) -> + Cid = {<<"cid">>, self()}, + {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)), + % return non-existent + {_State2, _} = apply(meta(3), rabbit_fifo_v0:make_return(Cid, [99]), State0), + ok. + +return_checked_out_test(_) -> + Cid = {<<"cid">>, self()}, + {State0, [_, _]} = enq(1, 1, first, test_init(test)), + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, + {aux, active} | _ ]} = check_auto(Cid, 2, State0), + % returning immediately checks out the same message again + {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, _}, + {aux, active}]} = + apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1), + ok. 
+ +return_checked_out_limit_test(_) -> + Cid = {<<"cid">>, self()}, + Init = init(#{name => test, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(test, utf8)), + release_cursor_interval => 0, + delivery_limit => 1}), + {State0, [_, _]} = enq(1, 1, first, Init), + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, + {aux, active} | _ ]} = check_auto(Cid, 2, State0), + % returning immediately checks out the same message again + {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, _}, + {aux, active}]} = + apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1), + {#?RABBIT_FIFO{ra_indexes = RaIdxs}, ok, [_ReleaseEff]} = + apply(meta(4), rabbit_fifo_v0:make_return(Cid, [MsgId2]), State2), + ?assertEqual(0, rabbit_fifo_index:size(RaIdxs)), + ok. + +return_auto_checked_out_test(_) -> + Cid = {<<"cid">>, self()}, + {State00, [_, _]} = enq(1, 1, first, test_init(test)), + {State0, [_]} = enq(2, 2, second, State00), + % it first active then inactive as the consumer took on but cannot take + % any more + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, + {aux, active}, + {aux, inactive} + ]} = check_auto(Cid, 2, State0), + % return should include another delivery + {_State2, _, Effects} = apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1), + ?ASSERT_EFF({send_msg, _, + {delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _}, + Effects), + ok. 
+ +cancelled_checkout_out_test(_) -> + Cid = {<<"cid">>, self()}, + {State00, [_, _]} = enq(1, 1, first, test_init(test)), + {State0, [_]} = enq(2, 2, second, State00), + {State1, _} = check_auto(Cid, 2, State0), + % cancelled checkout should not return pending messages to queue + {State2, _, _} = apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, cancel, #{}), State1), + ?assertEqual(1, maps:size(State2#?RABBIT_FIFO.messages)), + ?assertEqual(0, lqueue:len(State2#?RABBIT_FIFO.returns)), + + {State3, {dequeue, empty}} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State2), + %% settle + {State4, ok, _} = + apply(meta(4), rabbit_fifo_v0:make_settle(Cid, [0]), State3), + + {_State, {dequeue, {_, {_, second}}, _}, _} = + apply(meta(5), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State4), + ok. + +down_with_noproc_consumer_returns_unsettled_test(_) -> + Cid = {<<"down_consumer_returns_unsettled_test">>, self()}, + {State0, [_, _]} = enq(1, 1, second, test_init(test)), + {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0), + {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1), + {_State, Effects} = check(Cid, 4, State2), + ?ASSERT_EFF({monitor, process, _}, Effects), + ok. 
+ +down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) -> + Pid = spawn(fun() -> ok end), + Cid = {<<"down_with_noconnect">>, Pid}, + Self = self(), + Node = node(Pid), + {State0, Effects0} = enq(1, 1, second, test_init(test)), + ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0), + {State1, Effects1} = check_auto(Cid, 2, State0), + #consumer{credit = 0} = maps:get(Cid, State1#?RABBIT_FIFO.consumers), + ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1), + % monitor both enqueuer and consumer + % because we received a noconnection we now need to monitor the node + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), + #consumer{credit = 1, + checked_out = Ch, + status = suspected_down} = maps:get(Cid, State2a#?RABBIT_FIFO.consumers), + ?assertEqual(#{}, Ch), + %% validate consumer has credit + {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a), + ?ASSERT_EFF({monitor, node, _}, Effects2), + ?assertNoEffect({demonitor, process, _}, Effects2), + % when the node comes up we need to retry the process monitors for the + % disconnected processes + {State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2), + #consumer{status = up} = maps:get(Cid, State3#?RABBIT_FIFO.consumers), + % try to re-monitor the suspect processes + ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3), + ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), + ok. 
+ +down_with_noconnection_returns_unack_test(_) -> + Pid = spawn(fun() -> ok end), + Cid = {<<"down_with_noconnect">>, Pid}, + {State0, _} = enq(1, 1, second, test_init(test)), + ?assertEqual(1, maps:size(State0#?RABBIT_FIFO.messages)), + ?assertEqual(0, lqueue:len(State0#?RABBIT_FIFO.returns)), + {State1, {_, _}} = deq(2, Cid, unsettled, State0), + ?assertEqual(0, maps:size(State1#?RABBIT_FIFO.messages)), + ?assertEqual(0, lqueue:len(State1#?RABBIT_FIFO.returns)), + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), + ?assertEqual(0, maps:size(State2a#?RABBIT_FIFO.messages)), + ?assertEqual(1, lqueue:len(State2a#?RABBIT_FIFO.returns)), + ?assertMatch(#consumer{checked_out = Ch, + status = suspected_down} + when map_size(Ch) == 0, + maps:get(Cid, State2a#?RABBIT_FIFO.consumers)), + ok. + +down_with_noproc_enqueuer_is_cleaned_up_test(_) -> + State00 = test_init(test), + Pid = spawn(fun() -> ok end), + {State0, _, Effects0} = apply(meta(1), rabbit_fifo_v0:make_enqueue(Pid, 1, first), State00), + ?ASSERT_EFF({monitor, process, _}, Effects0), + {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0), + % ensure there are no enqueuers + ?assert(0 =:= maps:size(State1#?RABBIT_FIFO.enqueuers)), + ok. + +discarded_message_without_dead_letter_handler_is_removed_test(_) -> + Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, + {State0, [_, _]} = enq(1, 1, first, test_init(test)), + {State1, Effects1} = check_n(Cid, 2, 10, State0), + ?ASSERT_EFF({send_msg, _, + {delivery, _, [{0, {_, first}}]}, _}, + Effects1), + {_State2, _, Effects2} = apply(meta(1), + rabbit_fifo_v0:make_discard(Cid, [0]), State1), + ?assertNoEffect({send_msg, _, + {delivery, _, [{0, {_, first}}]}, _}, + Effects2), + ok. 
+ +discarded_message_with_dead_letter_handler_emits_log_effect_test(_) -> + Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, + State00 = init(#{name => test, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), + dead_letter_handler => + {somemod, somefun, [somearg]}}), + {State0, [_, _]} = enq(1, 1, first, State00), + {State1, Effects1} = check_n(Cid, 2, 10, State0), + ?ASSERT_EFF({send_msg, _, + {delivery, _, [{0, {_, first}}]}, _}, + Effects1), + {_State2, _, Effects2} = apply(meta(1), rabbit_fifo_v0:make_discard(Cid, [0]), State1), + % assert mod call effect with appended reason and message + ?ASSERT_EFF({log, _RaftIdxs, _}, Effects2), + ok. + +tick_test(_) -> + Cid = {<<"c">>, self()}, + Cid2 = {<<"c2">>, self()}, + {S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)), + {S1, _} = enq(2, 2, <<"snd">>, S0), + {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1), + {S3, {_, _}} = deq(4, Cid2, unsettled, S2), + {S4, _, _} = apply(meta(5), rabbit_fifo_v0:make_return(Cid, [MsgId]), S3), + + [{mod_call, rabbit_quorum_queue, handle_tick, + [#resource{}, + {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}, + [_Node] + ]}] = rabbit_fifo_v0:tick(1, S4), + ok. + + +delivery_query_returns_deliveries_test(_) -> + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + rabbit_fifo_v0:make_checkout(Cid, {auto, 5, simple_prefetch}, #{}), + rabbit_fifo_v0:make_enqueue(self(), 1, one), + rabbit_fifo_v0:make_enqueue(self(), 2, two), + rabbit_fifo_v0:make_enqueue(self(), 3, tre), + rabbit_fifo_v0:make_enqueue(self(), 4, for) + ], + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + {State, _Effects} = run_log(test_init(help), Entries), + % 3 deliveries are returned + [{0, {_, one}}] = rabbit_fifo_v0:get_checked_out(Cid, 0, 0, State), + [_, _, _] = rabbit_fifo_v0:get_checked_out(Cid, 1, 3, State), + ok. 
+ +pending_enqueue_is_enqueued_on_down_test(_) -> + Cid = {<<"cid">>, self()}, + Pid = self(), + {State0, _} = enq(1, 2, first, test_init(test)), + {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0), + {_State2, {dequeue, {0, {_, first}}, 0}, _} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State1), + ok. + +duplicate_delivery_test(_) -> + {State0, _} = enq(1, 1, first, test_init(test)), + {#?RABBIT_FIFO{ra_indexes = RaIdxs, + messages = Messages}, _} = enq(2, 1, first, State0), + ?assertEqual(1, rabbit_fifo_index:size(RaIdxs)), + ?assertEqual(1, maps:size(Messages)), + ok. + +state_enter_file_handle_leader_reservation_test(_) -> + S0 = init(#{name => the_name, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), + become_leader_handler => {m, f, [a]}}), + + Resource = {resource, <<"/">>, queue, <<"test">>}, + Effects = rabbit_fifo_v0:state_enter(leader, S0), + ?assertEqual([ + {mod_call, m, f, [a, the_name]}, + {mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]} + ], Effects), + ok. + +state_enter_file_handle_other_reservation_test(_) -> + S0 = init(#{name => the_name, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}), + Effects = rabbit_fifo_v0:state_enter(other, S0), + ?assertEqual([ + {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []} + ], + Effects), + ok. 
+ +state_enter_monitors_and_notifications_test(_) -> + Oth = spawn(fun () -> ok end), + {State0, _} = enq(1, 1, first, test_init(test)), + Cid = {<<"adf">>, self()}, + OthCid = {<<"oth">>, Oth}, + {State1, _} = check(Cid, 2, State0), + {State, _} = check(OthCid, 3, State1), + Self = self(), + Effects = rabbit_fifo_v0:state_enter(leader, State), + + %% monitor all enqueuers and consumers + [{monitor, process, Self}, + {monitor, process, Oth}] = + lists:filter(fun ({monitor, process, _}) -> true; + (_) -> false + end, Effects), + [{send_msg, Self, leader_change, ra_event}, + {send_msg, Oth, leader_change, ra_event}] = + lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true; + (_) -> false + end, Effects), + ?ASSERT_EFF({monitor, process, _}, Effects), + ok. + +purge_test(_) -> + Cid = {<<"purge_test">>, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, {purge, 1}, _} = apply(meta(2), rabbit_fifo_v0:make_purge(), State1), + {State3, _} = enq(3, 2, second, State2), + % get returns a reply value + {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} = + apply(meta(4), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), State3), + ok. + +purge_with_checkout_test(_) -> + Cid = {<<"purge_test">>, self()}, + {State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)), + {State1, _} = enq(2, 1, <<"first">>, State0), + {State2, _} = enq(3, 2, <<"second">>, State1), + %% assert message bytes are non zero + ?assert(State2#?RABBIT_FIFO.msg_bytes_checkout > 0), + ?assert(State2#?RABBIT_FIFO.msg_bytes_enqueue > 0), + {State3, {purge, 1}, _} = apply(meta(2), rabbit_fifo_v0:make_purge(), State2), + ?assert(State2#?RABBIT_FIFO.msg_bytes_checkout > 0), + ?assertEqual(0, State3#?RABBIT_FIFO.msg_bytes_enqueue), + ?assertEqual(1, rabbit_fifo_index:size(State3#?RABBIT_FIFO.ra_indexes)), + #consumer{checked_out = Checked} = maps:get(Cid, State3#?RABBIT_FIFO.consumers), + ?assertEqual(1, maps:size(Checked)), + ok. 
+ +down_noproc_returns_checked_out_in_order_test(_) -> + S0 = test_init(?FUNCTION_NAME), + %% enqueue 100 + S1 = lists:foldl(fun (Num, FS0) -> + {FS, _} = enq(Num, Num, Num, FS0), + FS + end, S0, lists:seq(1, 100)), + ?assertEqual(100, maps:size(S1#?RABBIT_FIFO.messages)), + Cid = {<<"cid">>, self()}, + {S2, _} = check(Cid, 101, 1000, S1), + #consumer{checked_out = Checked} = maps:get(Cid, S2#?RABBIT_FIFO.consumers), + ?assertEqual(100, maps:size(Checked)), + %% simulate down + {S, _, _} = apply(meta(102), {down, self(), noproc}, S2), + Returns = lqueue:to_list(S#?RABBIT_FIFO.returns), + ?assertEqual(100, length(Returns)), + ?assertEqual(0, maps:size(S#?RABBIT_FIFO.consumers)), + %% validate returns are in order + ?assertEqual(lists:sort(Returns), Returns), + ok. + +down_noconnection_returns_checked_out_test(_) -> + S0 = test_init(?FUNCTION_NAME), + NumMsgs = 20, + S1 = lists:foldl(fun (Num, FS0) -> + {FS, _} = enq(Num, Num, Num, FS0), + FS + end, S0, lists:seq(1, NumMsgs)), + ?assertEqual(NumMsgs, maps:size(S1#?RABBIT_FIFO.messages)), + Cid = {<<"cid">>, self()}, + {S2, _} = check(Cid, 101, 1000, S1), + #consumer{checked_out = Checked} = maps:get(Cid, S2#?RABBIT_FIFO.consumers), + ?assertEqual(NumMsgs, maps:size(Checked)), + %% simulate down + {S, _, _} = apply(meta(102), {down, self(), noconnection}, S2), + Returns = lqueue:to_list(S#?RABBIT_FIFO.returns), + ?assertEqual(NumMsgs, length(Returns)), + ?assertMatch(#consumer{checked_out = Ch} + when map_size(Ch) == 0, + maps:get(Cid, S#?RABBIT_FIFO.consumers)), + %% validate returns are in order + ?assertEqual(lists:sort(Returns), Returns), + ok. 
+
+single_active_consumer_basic_get_test(_) ->
+    %% basic get (dequeue) is not supported on single-active-consumer queues.
+    Cid = {?FUNCTION_NAME, self()},
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => rabbit_misc:r("/", queue,
+                        atom_to_binary(?FUNCTION_NAME, utf8)),
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => true}),
+    ?assertEqual(single_active, State0#?RABBIT_FIFO.cfg#cfg.consumer_strategy),
+    ?assertEqual(0, map_size(State0#?RABBIT_FIFO.consumers)),
+    {State1, _} = enq(1, 1, first, State0),
+    {_State, {error, unsupported}} =
+        apply(meta(2), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}),
+              State1),
+    ok.
+
+single_active_consumer_test(_) ->
+    %% With single-active-consumer: the first registered consumer is active,
+    %% later ones wait; cancelling promotes the next waiting consumer in
+    %% registration order, emitting cancel/update handler mod_call effects.
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => rabbit_misc:r("/", queue,
+                        atom_to_binary(?FUNCTION_NAME, utf8)),
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => true}),
+    ?assertEqual(single_active, State0#?RABBIT_FIFO.cfg#cfg.consumer_strategy),
+    ?assertEqual(0, map_size(State0#?RABBIT_FIFO.consumers)),
+
+    % adding some consumers
+    AddConsumer = fun(CTag, State) ->
+                          {NewState, _, _} = apply(
+                                               meta(1),
+                                               make_checkout({CTag, self()},
+                                                             {once, 1, simple_prefetch},
+                                                             #{}),
+                                               State),
+                          NewState
+                  end,
+    State1 = lists:foldl(AddConsumer, State0,
+                         [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+    C1 = {<<"ctag1">>, self()},
+    C2 = {<<"ctag2">>, self()},
+    C3 = {<<"ctag3">>, self()},
+    C4 = {<<"ctag4">>, self()},
+
+    % the first registered consumer is the active one, the others are waiting
+    ?assertEqual(1, map_size(State1#?RABBIT_FIFO.consumers)),
+    ?assertMatch(#{C1 := _}, State1#?RABBIT_FIFO.consumers),
+    ?assertEqual(3, length(State1#?RABBIT_FIFO.waiting_consumers)),
+    ?assertNotEqual(false, lists:keyfind(C2, 1, State1#?RABBIT_FIFO.waiting_consumers)),
+    ?assertNotEqual(false, lists:keyfind(C3, 1, State1#?RABBIT_FIFO.waiting_consumers)),
+    ?assertNotEqual(false, lists:keyfind(C4, 1, State1#?RABBIT_FIFO.waiting_consumers)),
+
+    % cancelling a waiting consumer
+    {State2, _, Effects1} = apply(meta(2),
+                                  make_checkout(C3, cancel, #{}),
+                                  State1),
+    % the active consumer should still be in place
+    ?assertEqual(1, map_size(State2#?RABBIT_FIFO.consumers)),
+    ?assertMatch(#{C1 := _}, State2#?RABBIT_FIFO.consumers),
+    % the cancelled consumer has been removed from waiting consumers
+    ?assertEqual(2, length(State2#?RABBIT_FIFO.waiting_consumers)),
+    ?assertNotEqual(false, lists:keyfind(C2, 1, State2#?RABBIT_FIFO.waiting_consumers)),
+    ?assertNotEqual(false, lists:keyfind(C4, 1, State2#?RABBIT_FIFO.waiting_consumers)),
+    % there are some effects to unregister the consumer
+    ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+                 cancel_consumer_handler, [_, C]}, C == C3, Effects1),
+
+    % cancelling the active consumer
+    {State3, _, Effects2} = apply(meta(3),
+                                  make_checkout(C1, cancel, #{}),
+                                  State2),
+    % the second registered consumer is now the active one
+    ?assertEqual(1, map_size(State3#?RABBIT_FIFO.consumers)),
+    ?assertMatch(#{C2 := _}, State3#?RABBIT_FIFO.consumers),
+    % the new active consumer is no longer in the waiting list
+    ?assertEqual(1, length(State3#?RABBIT_FIFO.waiting_consumers)),
+    ?assertNotEqual(false, lists:keyfind(C4, 1,
+                                         State3#?RABBIT_FIFO.waiting_consumers)),
+    %% should have a cancel consumer handler mod_call effect and
+    %% an active new consumer effect
+    ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+                 cancel_consumer_handler, [_, C]}, C == C1, Effects2),
+    ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+                 update_consumer_handler, _}, Effects2),
+
+    % cancelling the active consumer
+    {State4, _, Effects3} = apply(meta(4),
+                                  make_checkout(C2, cancel, #{}),
+                                  State3),
+    % the last waiting consumer became the active one
+    ?assertEqual(1, map_size(State4#?RABBIT_FIFO.consumers)),
+    ?assertMatch(#{C4 := _}, State4#?RABBIT_FIFO.consumers),
+    % the waiting consumer list is now empty
+    ?assertEqual(0, length(State4#?RABBIT_FIFO.waiting_consumers)),
+    % there are some effects to unregister the consumer and
+    % to update the new active one (metrics)
+    ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+                 cancel_consumer_handler, [_, C]}, C == C2, Effects3),
+    ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+                 update_consumer_handler, _}, Effects3),
+
+    % cancelling the last consumer
+    {State5, _, Effects4} = apply(meta(5),
+                                  make_checkout(C4, cancel, #{}),
+                                  State4),
+    % no active consumer anymore
+    ?assertEqual(0, map_size(State5#?RABBIT_FIFO.consumers)),
+    % still nothing in the waiting list
+    ?assertEqual(0, length(State5#?RABBIT_FIFO.waiting_consumers)),
+    % there is an effect to unregister the consumer + queue inactive effect
+    ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+                 cancel_consumer_handler, _}, Effects4),
+
+    ok.
+
+single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
+    %% A channel (pid) going down cancels every consumer registered from it;
+    %% the active role fails over to a consumer on a surviving channel.
+    %% Note Pid2 hosts two consumers (ctag2 and ctag3).
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => rabbit_misc:r("/", queue,
+                        atom_to_binary(?FUNCTION_NAME, utf8)),
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => true}),
+
+    DummyFunction = fun() -> ok  end,
+    Pid1 = spawn(DummyFunction),
+    Pid2 = spawn(DummyFunction),
+    Pid3 = spawn(DummyFunction),
+
+    [C1, C2, C3, C4] = Consumers =
+        [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+         {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}],
+    % adding some consumers
+    AddConsumer = fun({CTag, ChannelId}, State) ->
+                          {NewState, _, _} = apply(
+                                               #{index => 1},
+                                               make_checkout({CTag, ChannelId}, {once, 1, simple_prefetch}, #{}),
+                                               State),
+                          NewState
+                  end,
+    State1 = lists:foldl(AddConsumer, State0, Consumers),
+
+    % the channel of the active consumer goes down
+    {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1),
+    % fell back to another consumer
+    ?assertEqual(1, map_size(State2#?RABBIT_FIFO.consumers)),
+    % there are still waiting consumers
+    ?assertEqual(2, length(State2#?RABBIT_FIFO.waiting_consumers)),
+    % effects to unregister the consumer and
+    % to update the new active one (metrics) are there
+    ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+                 cancel_consumer_handler, [_, C]}, C == C1, Effects),
+    ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+                 update_consumer_handler, _}, Effects),
+
+    % the channel of the active consumer and a waiting consumer goes down
+    {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2),
+    % fell back to another consumer
+    ?assertEqual(1, map_size(State3#?RABBIT_FIFO.consumers)),
+    % no more waiting consumer
+    ?assertEqual(0, length(State3#?RABBIT_FIFO.waiting_consumers)),
+    % effects to cancel both consumers of this channel + effect to update the new active one (metrics)
+    ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+                 cancel_consumer_handler, [_, C]}, C == C2, Effects2),
+    ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+                 cancel_consumer_handler, [_, C]}, C == C3, Effects2),
+    ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+                 update_consumer_handler, _}, Effects2),
+
+    % the last channel goes down
+    {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
+    % no more consumers
+    ?assertEqual(0, map_size(State4#?RABBIT_FIFO.consumers)),
+    ?assertEqual(0, length(State4#?RABBIT_FIFO.waiting_consumers)),
+    % there is an effect to unregister the consumer + queue inactive effect
+    ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+                 cancel_consumer_handler, [_, C]}, C == C4, Effects3),
+
+    ok.
+
+single_active_returns_messages_on_noconnection_test(_) ->
+    %% When the single active consumer's node disconnects its checked-out
+    %% message is returned and the consumer is parked in waiting_consumers.
+    R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => R,
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => true}),
+    Meta = #{index => 1},
+    Nodes = [n1],
+    ConsumerIds = [{_, DownPid}] =
+        [begin
+             B = atom_to_binary(N, utf8),
+             {<<"ctag_", B/binary>>,
+              test_util:fake_pid(N)}
+         end || N <- Nodes],
+    % adding some consumers
+    State1 = lists:foldl(
+               fun(CId, Acc0) ->
+                       {Acc, _, _} =
+                           apply(Meta,
+                                 make_checkout(CId,
+                                               {once, 1, simple_prefetch}, #{}),
+                                 Acc0),
+                       Acc
+               end, State0, ConsumerIds),
+    {State2, _} = enq(4, 1, msg1, State1),
+    % simulate node goes down
+    {State3, _, _} = apply(meta(5), {down, DownPid, noconnection}, State2),
+    %% assert the message was returned and the consumer is now waiting
+    %% with nothing checked out
+    ?assertMatch([_], lqueue:to_list(State3#?RABBIT_FIFO.returns)),
+    ?assertMatch([{_, #consumer{checked_out = Checked}}]
+                   when map_size(Checked) == 0,
+                 State3#?RABBIT_FIFO.waiting_consumers),
+
+    ok.
+
+single_active_consumer_replaces_consumer_when_down_noconnection_test(_) ->
+    %% On noconnection the active consumer is replaced by the next waiting
+    %% one; when the node comes back the replacement stays active and the
+    %% disconnected consumer is un-suspected in the waiting list.
+    R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => R,
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => true}),
+    Meta = #{index => 1},
+    Nodes = [n1, n2, node()],
+    ConsumerIds = [C1 = {_, DownPid}, C2, _C3] =
+        [begin
+             B = atom_to_binary(N, utf8),
+             {<<"ctag_", B/binary>>,
+              test_util:fake_pid(N)}
+         end || N <- Nodes],
+    % adding some consumers
+    State1a = lists:foldl(
+                fun(CId, Acc0) ->
+                        {Acc, _, _} =
+                            apply(Meta,
+                                  make_checkout(CId,
+                                                {once, 1, simple_prefetch}, #{}),
+                                  Acc0),
+                        Acc
+                end, State0, ConsumerIds),
+
+    %% assert the consumer is up
+    ?assertMatch(#{C1 := #consumer{status = up}},
+                 State1a#?RABBIT_FIFO.consumers),
+
+    {State1, _} = enq(10, 1, msg, State1a),
+
+    % simulate node goes down
+    {State2, _, _} = apply(meta(5), {down, DownPid, noconnection}, State1),
+
+    %% assert a new consumer is in place and it is up
+    ?assertMatch([{C2, #consumer{status = up,
+                                 checked_out = Ch}}]
+                   when map_size(Ch) == 1,
+                 maps:to_list(State2#?RABBIT_FIFO.consumers)),
+
+    %% the disconnected consumer has been returned to waiting
+    ?assert(lists:any(fun ({C,_}) -> C =:= C1 end,
+                      State2#?RABBIT_FIFO.waiting_consumers)),
+    ?assertEqual(2, length(State2#?RABBIT_FIFO.waiting_consumers)),
+
+    % simulate node comes back up
+    {State3, _, _} = apply(#{index => 2}, {nodeup, node(DownPid)}, State2),
+
+    %% the consumer is still active and the same as before
+    ?assertMatch([{C2, #consumer{status = up}}],
+                 maps:to_list(State3#?RABBIT_FIFO.consumers)),
+    % the waiting consumers should be un-suspected
+    ?assertEqual(2, length(State3#?RABBIT_FIFO.waiting_consumers)),
+    lists:foreach(fun({_, #consumer{status = Status}}) ->
+                          ?assert(Status /= suspected_down)
+                  end, State3#?RABBIT_FIFO.waiting_consumers),
+    ok.
+
+single_active_consumer_all_disconnected_test(_) ->
+    %% If every candidate node disconnects there is no active consumer;
+    %% the first node to come back gets its consumer reactivated.
+    R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => R,
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => true}),
+    Meta = #{index => 1},
+    Nodes = [n1, n2],
+    ConsumerIds = [C1 = {_, C1Pid}, C2 = {_, C2Pid}] =
+        [begin
+             B = atom_to_binary(N, utf8),
+             {<<"ctag_", B/binary>>,
+              test_util:fake_pid(N)}
+         end || N <- Nodes],
+    % adding some consumers
+    State1 = lists:foldl(
+               fun(CId, Acc0) ->
+                       {Acc, _, _} =
+                           apply(Meta,
+                                 make_checkout(CId,
+                                               {once, 1, simple_prefetch}, #{}),
+                                 Acc0),
+                       Acc
+               end, State0, ConsumerIds),
+
+    %% assert the consumer is up
+    ?assertMatch(#{C1 := #consumer{status = up}}, State1#?RABBIT_FIFO.consumers),
+
+    % simulate node goes down
+    {State2, _, _} = apply(meta(5), {down, C1Pid, noconnection}, State1),
+    %% assert the consumer fails over to the consumer on n2
+    ?assertMatch(#{C2 := #consumer{status = up}}, State2#?RABBIT_FIFO.consumers),
+    {State3, _, _} = apply(meta(6), {down, C2Pid, noconnection}, State2),
+    %% assert there is no active consumer after both nodes are marked as down
+    ?assertMatch([], maps:to_list(State3#?RABBIT_FIFO.consumers)),
+    %% n2 comes back
+    {State4, _, _} = apply(meta(7), {nodeup, node(C2Pid)}, State3),
+    %% ensure n2 is the active consumer as this node has been registered
+    %% as up again
+    ?assertMatch([{{<<"ctag_n2">>, _}, #consumer{status = up,
+                                                 credit = 1}}],
+                 maps:to_list(State4#?RABBIT_FIFO.consumers)),
+    ok.
+
+single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
+    %% state_enter(leader, _) must cover waiting consumers too: a monitor and
+    %% a notification per channel process, plus node monitor and file handle
+    %% reservation effects.
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource =>
+                    rabbit_misc:r("/", queue,
+                                  atom_to_binary(?FUNCTION_NAME, utf8)),
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => true}),
+
+    DummyFunction = fun() -> ok  end,
+    Pid1 = spawn(DummyFunction),
+    Pid2 = spawn(DummyFunction),
+    Pid3 = spawn(DummyFunction),
+
+    Meta = #{index => 1},
+    % adding some consumers
+    AddConsumer = fun({CTag, ChannelId}, State) ->
+                          {NewState, _, _} = apply(
+                                               Meta,
+                                               make_checkout({CTag, ChannelId},
+                                                             {once, 1, simple_prefetch}, #{}),
+                                               State),
+                          NewState
+                  end,
+    State1 = lists:foldl(AddConsumer, State0,
+                         [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+    Effects = rabbit_fifo_v0:state_enter(leader, State1),
+    %% 2 effects for each consumer process (channel process), 1 effect for the node,
+    %% 1 effect for file handle reservation
+    ?assertEqual(2 * 3 + 1 + 1, length(Effects)).
+
+single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) ->
+    %% state_enter(eol, _) must notify waiting consumers' channels as well:
+    %% one effect per distinct channel process plus the file handle release.
+    Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => Resource,
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => true}),
+
+    DummyFunction = fun() -> ok  end,
+    Pid1 = spawn(DummyFunction),
+    Pid2 = spawn(DummyFunction),
+    Pid3 = spawn(DummyFunction),
+
+    Meta = #{index => 1},
+    % adding some consumers
+    AddConsumer = fun({CTag, ChannelId}, State) ->
+                          {NewState, _, _} = apply(
+                                               Meta,
+                                               make_checkout({CTag, ChannelId},
+                                                             {once, 1, simple_prefetch}, #{}),
+                                               State),
+                          NewState
+                  end,
+    State1 = lists:foldl(AddConsumer, State0,
+                         [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+                          {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+    Effects = rabbit_fifo_v0:state_enter(eol, State1),
+    %% 1 effect for each consumer process (channel process),
+    %% 1 effect for file handle reservation
+    ?assertEqual(4, length(Effects)).
+
+query_consumers_test(_) ->
+    %% query_consumers/1 reports every consumer with its active flag and
+    %% activity status; without single-active, all are active except those
+    %% explicitly marked suspected_down.
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => rabbit_misc:r("/", queue,
+                        atom_to_binary(?FUNCTION_NAME, utf8)),
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => false}),
+
+    % adding some consumers
+    AddConsumer = fun(CTag, State) ->
+                          {NewState, _, _} = apply(
+                                               #{index => 1},
+                                               make_checkout({CTag, self()},
+                                                             {once, 1, simple_prefetch}, #{}),
+                                               State),
+                          NewState
+                  end,
+    State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+    Consumers0 = State1#?RABBIT_FIFO.consumers,
+    Consumer = maps:get({<<"ctag2">>, self()}, Consumers0),
+    Consumers1 = maps:put({<<"ctag2">>, self()},
+                          Consumer#consumer{status = suspected_down}, Consumers0),
+    State2 = State1#?RABBIT_FIFO{consumers = Consumers1},
+
+    ?assertEqual(4, rabbit_fifo_v0:query_consumer_count(State2)),
+    Consumers2 = rabbit_fifo_v0:query_consumers(State2),
+    ?assertEqual(4, maps:size(Consumers2)),
+    maps:fold(fun(_Key, {Pid, Tag, _, _,
+                         Active, ActivityStatus, _, _}, _Acc) ->
+                      ?assertEqual(self(), Pid),
+                      case Tag of
+                          <<"ctag2">> ->
+                              ?assertNot(Active),
+                              ?assertEqual(suspected_down, ActivityStatus);
+                          _ ->
+                              ?assert(Active),
+                              ?assertEqual(up, ActivityStatus)
+                      end
+              end, [], Consumers2).
+
+query_consumers_when_single_active_consumer_is_on_test(_) ->
+    %% With single-active, only the first registered consumer reports as
+    %% active (single_active); the rest report as waiting.
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => rabbit_misc:r("/", queue,
+                        atom_to_binary(?FUNCTION_NAME, utf8)),
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => true}),
+    Meta = #{index => 1},
+    % adding some consumers
+    AddConsumer = fun(CTag, State) ->
+                          {NewState, _, _} = apply(
+                                               Meta,
+                                               make_checkout({CTag, self()},
+                                                             {once, 1, simple_prefetch}, #{}),
+                                               State),
+                          NewState
+                  end,
+    State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+
+    ?assertEqual(4, rabbit_fifo_v0:query_consumer_count(State1)),
+    Consumers = rabbit_fifo_v0:query_consumers(State1),
+    ?assertEqual(4, maps:size(Consumers)),
+    maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+                      ?assertEqual(self(), Pid),
+                      case Tag of
+                          <<"ctag1">> ->
+                              ?assert(Active),
+                              ?assertEqual(single_active, ActivityStatus);
+                          _ ->
+                              ?assertNot(Active),
+                              ?assertEqual(waiting, ActivityStatus)
+                      end
+              end, [], Consumers).
+
+active_flag_updated_when_consumer_suspected_unsuspected_test(_) ->
+    %% Without single-active, node down/up transitions emit an activity
+    %% update effect per consumer (all channels here live on the same node).
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => rabbit_misc:r("/", queue,
+                        atom_to_binary(?FUNCTION_NAME, utf8)),
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => false}),
+
+    DummyFunction = fun() -> ok  end,
+    Pid1 = spawn(DummyFunction),
+    Pid2 = spawn(DummyFunction),
+    Pid3 = spawn(DummyFunction),
+
+    % adding some consumers
+    AddConsumer = fun({CTag, ChannelId}, State) ->
+                          {NewState, _, _} =
+                              apply(
+                                #{index => 1},
+                                rabbit_fifo_v0:make_checkout({CTag, ChannelId},
+                                                             {once, 1, simple_prefetch},
+                                                             #{}),
+                                State),
+                          NewState
+                  end,
+    State1 = lists:foldl(AddConsumer, State0,
+                         [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+    {State2, _, Effects2} = apply(#{index => 3}, {down, Pid1, noconnection}, State1),
+    % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
+    ?assertEqual(4 + 1, length(Effects2)),
+
+    {_, _, Effects3} = apply(#{index => 4}, {nodeup, node(self())}, State2),
+    % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID
+    ?assertEqual(4 + 4, length(Effects3)).
+
+active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) ->
+    %% With single-active, node down/up must NOT emit per-consumer activity
+    %% updates for waiting consumers — only monitors and the active
+    %% consumer's status change.
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => rabbit_misc:r("/", queue,
+                        atom_to_binary(?FUNCTION_NAME, utf8)),
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => true}),
+
+    DummyFunction = fun() -> ok  end,
+    Pid1 = spawn(DummyFunction),
+    Pid2 = spawn(DummyFunction),
+    Pid3 = spawn(DummyFunction),
+
+    % adding some consumers
+    AddConsumer = fun({CTag, ChannelId}, State) ->
+                          {NewState, _, _} = apply(
+                                               #{index => 1},
+                                               make_checkout({CTag, ChannelId},
+                                                             {once, 1, simple_prefetch}, #{}),
+                                               State),
+                          NewState
+                  end,
+    State1 = lists:foldl(AddConsumer, State0,
+                         [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+                          {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+    {State2, _, Effects2} = apply(#{index => 2}, {down, Pid1, noconnection}, State1),
+    % one monitor and one consumer status update (deactivated)
+    %% NOTE(review): comment names 2 effects but the assertion expects 3 —
+    %% verify what the third effect is (likely the failover consumer update).
+    ?assertEqual(3, length(Effects2)),
+
+    {_, _, Effects3} = apply(#{index => 3}, {nodeup, node(self())}, State2),
+    % for each consumer: 1 effect to monitor the consumer PID
+    ?assertEqual(5, length(Effects3)).
+
+single_active_cancelled_with_unacked_test(_) ->
+    %% Cancelling the active consumer while it has an unacked message keeps
+    %% it around as status = cancelled until the message is settled, at
+    %% which point it is removed entirely.
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => rabbit_misc:r("/", queue,
+                        atom_to_binary(?FUNCTION_NAME, utf8)),
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => true}),
+
+    C1 = {<<"ctag1">>, self()},
+    C2 = {<<"ctag2">>, self()},
+    % adding some consumers
+    AddConsumer = fun(C, S0) ->
+                          {S, _, _} = apply(
+                                        meta(1),
+                                        make_checkout(C,
+                                                      {auto, 1, simple_prefetch},
+                                                      #{}),
+                                        S0),
+                          S
+                  end,
+    State1 = lists:foldl(AddConsumer, State0, [C1, C2]),
+
+    %% enqueue 2 messages
+    {State2, _Effects2} = enq(3, 1, msg1, State1),
+    {State3, _Effects3} = enq(4, 2, msg2, State2),
+    %% one should be checked out to C1
+    %% cancel C1
+    {State4, _, _} = apply(meta(5),
+                           make_checkout(C1, cancel, #{}),
+                           State3),
+    %% C2 should be the active consumer
+    ?assertMatch(#{C2 := #consumer{status = up,
+                                   checked_out = #{0 := _}}},
+                 State4#?RABBIT_FIFO.consumers),
+    %% C1 should be a cancelled consumer
+    ?assertMatch(#{C1 := #consumer{status = cancelled,
+                                   lifetime = once,
+                                   checked_out = #{0 := _}}},
+                 State4#?RABBIT_FIFO.consumers),
+    ?assertMatch([], State4#?RABBIT_FIFO.waiting_consumers),
+
+    %% Ack both messages
+    {State5, _Effects5} = settle(C1, 1, 0, State4),
+    %% C1 should now be cancelled
+    {State6, _Effects6} = settle(C2, 2, 0, State5),
+
+    %% C2 should remain
+    ?assertMatch(#{C2 := #consumer{status = up}},
+                 State6#?RABBIT_FIFO.consumers),
+    %% C1 should be gone
+    ?assertNotMatch(#{C1 := _},
+                    State6#?RABBIT_FIFO.consumers),
+    ?assertMatch([], State6#?RABBIT_FIFO.waiting_consumers),
+    ok.
+
+single_active_with_credited_test(_) ->
+    %% Credit granted to a waiting consumer must be recorded on it even
+    %% though it is not active.
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => rabbit_misc:r("/", queue,
+                        atom_to_binary(?FUNCTION_NAME, utf8)),
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => true}),
+
+    C1 = {<<"ctag1">>, self()},
+    C2 = {<<"ctag2">>, self()},
+    % adding some consumers
+    AddConsumer = fun(C, S0) ->
+                          {S, _, _} = apply(
+                                        meta(1),
+                                        make_checkout(C,
+                                                      {auto, 0, credited},
+                                                      #{}),
+                                        S0),
+                          S
+                  end,
+    State1 = lists:foldl(AddConsumer, State0, [C1, C2]),
+
+    %% add some credit
+    C1Cred = rabbit_fifo_v0:make_credit(C1, 5, 0, false),
+    {State2, _, _Effects2} = apply(meta(3), C1Cred, State1),
+    C2Cred = rabbit_fifo_v0:make_credit(C2, 4, 0, false),
+    %% NOTE(review): this matches a 2-tuple while the call above matches a
+    %% 3-tuple — presumably credit for a waiting consumer returns without
+    %% effects; confirm against rabbit_fifo_v0:apply/3.
+    {State3, _} = apply(meta(4), C2Cred, State2),
+    %% both consumers should have credit
+    ?assertMatch(#{C1 := #consumer{credit = 5}},
+                 State3#?RABBIT_FIFO.consumers),
+    ?assertMatch([{C2, #consumer{credit = 4}}],
+                 State3#?RABBIT_FIFO.waiting_consumers),
+    ok.
+
+purge_nodes_test(_) ->
+    %% purge_nodes removes enqueuers and consumers that live on the given
+    %% (presumed permanently lost) node; tick's node list shrinks to match.
+    Node = purged@node,
+    ThisNode = node(),
+    EnqPid = test_util:fake_pid(Node),
+    EnqPid2 = test_util:fake_pid(node()),
+    ConPid = test_util:fake_pid(Node),
+    Cid = {<<"tag">>, ConPid},
+    % WaitingPid = test_util:fake_pid(Node),
+
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => rabbit_misc:r("/", queue,
+                        atom_to_binary(?FUNCTION_NAME, utf8)),
+                    single_active_consumer_on => false}),
+    {State1, _, _} = apply(meta(1),
+                           rabbit_fifo_v0:make_enqueue(EnqPid, 1, msg1),
+                           State0),
+    {State2, _, _} = apply(meta(2),
+                           rabbit_fifo_v0:make_enqueue(EnqPid2, 1, msg2),
+                           State1),
+    {State3, _} = check(Cid, 3, 1000, State2),
+    {State4, _, _} = apply(meta(4),
+                           {down, EnqPid, noconnection},
+                           State3),
+    ?assertMatch(
+       [{mod_call, rabbit_quorum_queue, handle_tick,
+         [#resource{}, _Metrics,
+          [ThisNode, Node]
+         ]}] , rabbit_fifo_v0:tick(1, State4)),
+    %% assert there are both enqueuers and consumers
+    {State, _, _} = apply(meta(5),
+                          rabbit_fifo_v0:make_purge_nodes([Node]),
+                          State4),
+
+    %% assert there are no enqueuers nor consumers
+    ?assertMatch(#?RABBIT_FIFO{enqueuers = Enqs} when map_size(Enqs) == 1,
+                 State),
+
+    ?assertMatch(#?RABBIT_FIFO{consumers = Cons} when map_size(Cons) == 0,
+                 State),
+    ?assertMatch(
+       [{mod_call, rabbit_quorum_queue, handle_tick,
+         [#resource{}, _Metrics,
+          [ThisNode]
+         ]}] , rabbit_fifo_v0:tick(1, State)),
+    ok.
+
+%% build a minimal ra_machine command metadata map for index Idx
+meta(Idx) ->
+    #{index => Idx, term => 1,
+      from => {make_ref(), self()}}.
+
+%% enqueue Msg with sequence MsgSeq at raft index Idx; returns {State, Effects}
+enq(Idx, MsgSeq, Msg, State) ->
+    strip_reply(
+      apply(meta(Idx), rabbit_fifo_v0:make_enqueue(self(), MsgSeq, Msg), State)).
+
+%% basic-get style dequeue; returns {State, {MsgId, Msg}}
+deq(Idx, Cid, Settlement, State0) ->
+    {State, {dequeue, {MsgId, Msg}, _}, _} =
+        apply(meta(Idx),
+              rabbit_fifo_v0:make_checkout(Cid, {dequeue, Settlement}, #{}),
+              State0),
+    {State, {MsgId, Msg}}.
+
+%% auto checkout with prefetch N
+check_n(Cid, Idx, N, State) ->
+    strip_reply(
+      apply(meta(Idx),
+            rabbit_fifo_v0:make_checkout(Cid, {auto, N, simple_prefetch}, #{}),
+            State)).
+
+%% once checkout with prefetch 1
+check(Cid, Idx, State) ->
+    strip_reply(
+      apply(meta(Idx),
+            rabbit_fifo_v0:make_checkout(Cid, {once, 1, simple_prefetch}, #{}),
+            State)).
+
+%% auto checkout with prefetch 1
+check_auto(Cid, Idx, State) ->
+    strip_reply(
+      apply(meta(Idx),
+            rabbit_fifo_v0:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
+            State)).
+
+%% auto checkout with prefetch Num
+check(Cid, Idx, Num, State) ->
+    strip_reply(
+      apply(meta(Idx),
+            rabbit_fifo_v0:make_checkout(Cid, {auto, Num, simple_prefetch}, #{}),
+            State)).
+
+%% settle (ack) a single message id
+settle(Cid, Idx, MsgId, State) ->
+    strip_reply(apply(meta(Idx), rabbit_fifo_v0:make_settle(Cid, [MsgId]), State)).
+
+%% grant credit to a consumer
+credit(Cid, Idx, Credit, DelCnt, Drain, State) ->
+    strip_reply(apply(meta(Idx), rabbit_fifo_v0:make_credit(Cid, Credit, DelCnt, Drain),
+                      State)).
+
+%% drop the reply element of an apply/3 result, keeping {State, Effects}
+strip_reply({State, _, Effects}) ->
+    {State, Effects}.
+
+%% apply a list of {Idx, Entry} commands, accumulating all effects
+run_log(InitState, Entries) ->
+    lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) ->
+                        case apply(meta(Idx), E, Acc0) of
+                            {Acc, _, Efx} when is_list(Efx) ->
+                                {Acc, Efx0 ++ Efx};
+                            {Acc, _, Efx} ->
+                                {Acc, Efx0 ++ [Efx]};
+                            {Acc, _}  ->
+                                {Acc, Efx0}
+                        end
+                end, {InitState, []}, Entries).
+
+
+%% AUX Tests
+
+aux_test(_) ->
+    %% exercise the aux (non-replicated) state machine: 'active' and 'tick'
+    %% casts should publish a utilisation sample into the rabbit_fifo_usage
+    %% ets table. ra_log is mocked as the log is not exercised here.
+    _ = ra_machine_ets:start_link(),
+    Aux0 = init_aux(aux_test),
+    MacState = init(#{name => aux_test,
+                      queue_resource =>
+                      rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
+    ok = meck:new(ra_log, []),
+    Log = mock_log,
+    meck:expect(ra_log, last_index_term, fun (_) -> {0, 0} end),
+    {no_reply, Aux, mock_log} = handle_aux(leader, cast, active, Aux0,
+                                           Log, MacState),
+    {no_reply, _Aux, mock_log} = handle_aux(leader, cast, tick, Aux,
+                                            Log, MacState),
+    [X] = ets:lookup(rabbit_fifo_usage, aux_test),
+    meck:unload(),
+    %% NOTE(review): X is the whole ets tuple ({aux_test, Use}); in Erlang
+    %% term order a tuple always compares greater than a float, so this
+    %% assertion is vacuous — presumably element(2, X) > 0.0 was intended.
+    ?assert(X > 0.0),
+    ok.
+
+%% Utility
+
+%% thin wrappers so the suite body reads without the rabbit_fifo_v0: prefix
+init(Conf) -> rabbit_fifo_v0:init(Conf).
+apply(Meta, Entry, State) -> rabbit_fifo_v0:apply(Meta, Entry, State).
+init_aux(Conf) -> rabbit_fifo_v0:init_aux(Conf).
+handle_aux(S, T, C, A, L, M) -> rabbit_fifo_v0:handle_aux(S, T, C, A, L, M).
+make_checkout(C, S, M) -> rabbit_fifo_v0:make_checkout(C, S, M).
