summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl1586
-rw-r--r--src/rabbit_fifo.hrl172
2 files changed, 303 insertions, 1455 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 26842d5c49..36341259be 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -23,6 +23,7 @@
-compile({no_auto_import, [apply/3]}).
-include_lib("ra/include/ra.hrl").
+-include("rabbit_fifo.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-export([
@@ -64,66 +65,6 @@
make_update_config/1
]).
--type raw_msg() :: term().
-%% The raw message. It is opaque to rabbit_fifo.
-
--type msg_in_id() :: non_neg_integer().
-% a queue scoped monotonically incrementing integer used to enforce order
-% in the unassigned messages map
-
--type msg_id() :: non_neg_integer().
-%% A consumer-scoped monotonically incrementing integer included with a
-%% {@link delivery/0.}. Used to settle deliveries using
-%% {@link rabbit_fifo_client:settle/3.}
-
--type msg_seqno() :: non_neg_integer().
-%% A sender process scoped monotonically incrementing integer included
-%% in enqueue messages. Used to ensure ordering of messages send from the
-%% same process
-
--type msg_header() :: #{size := msg_size(),
- delivery_count => non_neg_integer()}.
-%% The message header map:
-%% delivery_count: the number of unsuccessful delivery attempts.
-%% A non-zero value indicates a previous attempt.
-
--type msg() :: {msg_header(), raw_msg()}.
-%% message with a header map.
-
--type msg_size() :: non_neg_integer().
-%% the size in bytes of the msg payload
-
--type indexed_msg() :: {ra_index(), msg()}.
-
--type prefix_msg() :: {'$prefix_msg', msg_header()}.
-
--type delivery_msg() :: {msg_id(), msg()}.
-%% A tuple consisting of the message id and the headered message.
-
--type consumer_tag() :: binary().
-%% An arbitrary binary tag used to distinguish between different consumers
-%% set up by the same process. See: {@link rabbit_fifo_client:checkout/3.}
-
--type delivery() :: {delivery, consumer_tag(), [delivery_msg()]}.
-%% Represents the delivery of one or more rabbit_fifo messages.
-
--type consumer_id() :: {consumer_tag(), pid()}.
-%% The entity that receives messages. Uniquely identifies a consumer.
-
--type credit_mode() :: simple_prefetch | credited.
-%% determines how credit is replenished
-
--type checkout_spec() :: {once | auto, Num :: non_neg_integer(),
- credit_mode()} |
- {dequeue, settled | unsettled} |
- cancel.
-
--type consumer_meta() :: #{ack => boolean(),
- username => binary(),
- prefetch => non_neg_integer(),
- args => list()}.
-%% static meta data associated with a consumer
-
%% command records representing all the protocol actions that are supported
-record(enqueue, {pid :: maybe(pid()),
seq :: maybe(msg_seqno()),
@@ -144,8 +85,6 @@
-record(purge, {}).
-record(update_config, {config :: config()}).
-
-
-opaque protocol() ::
#enqueue{} |
#checkout{} |
@@ -155,123 +94,13 @@
#credit{} |
#purge{} |
#update_config{}.
-
-type command() :: protocol() | ra_machine:builtin_command().
%% all the command types supported by ra fifo
-type client_msg() :: delivery().
%% the messages `rabbit_fifo' can send to consumers.
--type applied_mfa() :: {module(), atom(), list()}.
-% represents a partially applied module call
-
--define(RELEASE_CURSOR_EVERY, 64000).
--define(USE_AVG_HALF_LIFE, 10000.0).
-
--record(consumer,
- {meta = #{} :: consumer_meta(),
- checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}},
- next_msg_id = 0 :: msg_id(), % part of snapshot data
- %% max number of messages that can be sent
- %% decremented for each delivery
- credit = 0 : non_neg_integer(),
- %% total number of checked out messages - ever
- %% incremented for each delivery
- delivery_count = 0 :: non_neg_integer(),
- %% the mode of how credit is incremented
- %% simple_prefetch: credit is re-filled as deliveries are settled
- %% or returned.
- %% credited: credit can only be changed by receiving a consumer_credit
- %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
- credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
- lifetime = once :: once | auto,
- status = up :: up | suspected_down | cancelled
- }).
-
--type consumer() :: #consumer{}.
-
--record(enqueuer,
- {next_seqno = 1 :: msg_seqno(),
- % out of order enqueues - sorted list
- pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}],
- status = up :: up | suspected_down
- }).
-
--record(cfg,
- {name :: atom(),
- resource :: rabbit_types:r('queue'),
- release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(),
- dead_letter_handler :: maybe(applied_mfa()),
- become_leader_handler :: maybe(applied_mfa()),
- max_length :: maybe(non_neg_integer()),
- max_bytes :: maybe(non_neg_integer()),
- %% whether single active consumer is on or not for this queue
- consumer_strategy = default :: default | single_active,
- delivery_limit :: maybe(non_neg_integer())
- }).
-
--record(state,
- {cfg :: #cfg{},
- % unassigned messages
- messages = #{} :: #{msg_in_id() => indexed_msg()},
- % defines the lowest message in id available in the messages map
- % that isn't a return
- low_msg_num :: msg_in_id() | undefined,
- % defines the next message in id to be added to the messages map
- next_msg_num = 1 :: msg_in_id(),
- % list of returned msg_in_ids - when checking out it picks from
- % this list first before taking low_msg_num
- returns = lqueue:new() :: lqueue:lqueue(prefix_msg() |
- {msg_in_id(), indexed_msg()}),
- % a counter of enqueues - used to trigger shadow copy points
- enqueue_count = 0 :: non_neg_integer(),
- % a map containing all the live processes that have ever enqueued
- % a message to this queue as well as a cached value of the smallest
- % ra_index of all pending enqueues
- enqueuers = #{} :: #{pid() => #enqueuer{}},
- % master index of all enqueue raft indexes including pending
- % enqueues
- % rabbit_fifo_index can be slow when calculating the smallest
- % index when there are large gaps but should be faster than gb_trees
- % for normal appending operations as it's backed by a map
- ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
- release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor,
- ra_index(), state()}),
- % consumers need to reflect consumer state at time of snapshot
- % needs to be part of snapshot
- consumers = #{} :: #{consumer_id() => #consumer{}},
- % consumers that require further service are queued here
- % needs to be part of snapshot
- service_queue = queue:new() :: queue:queue(consumer_id()),
- %% This is a special field that is only used for snapshots
- %% It represents the queued messages at the time the
- %% dehydrated snapshot state was cached.
- %% As release_cursors are only emitted for raft indexes where all
- %% prior messages no longer contribute to the current state we can
- %% replace all message payloads with their sizes (to be used for
- %% overflow calculations).
- %% This is done so that consumers are still served in a deterministic
- %% order on recovery.
- prefix_msgs = {[], []} :: {Return :: [msg_header()],
- PrefixMsgs :: [msg_header()]},
- msg_bytes_enqueue = 0 :: non_neg_integer(),
- msg_bytes_checkout = 0 :: non_neg_integer(),
- %% waiting consumers, one is picked active consumer is cancelled or dies
- %% used only when single active consumer is on
- waiting_consumers = [] :: [{consumer_id(), consumer()}]
- }).
-
--opaque state() :: #state{}.
-
--type config() :: #{name := atom(),
- queue_resource := rabbit_types:r('queue'),
- dead_letter_handler => applied_mfa(),
- become_leader_handler => applied_mfa(),
- release_cursor_interval => non_neg_integer(),
- max_length => non_neg_integer(),
- max_bytes => non_neg_integer(),
- single_active_consumer_on => boolean(),
- delivery_limit => non_neg_integer()}.
+-opaque state() :: #?MODULE{}.
-export_type([protocol/0,
delivery/0,
@@ -291,7 +120,7 @@
-spec init(config()) -> state().
init(#{name := Name,
queue_resource := Resource} = Conf) ->
- update_config(Conf, #state{cfg = #cfg{name = Name,
+ update_config(Conf, #?MODULE{cfg = #cfg{name = Name,
resource = Resource}}).
update_config(Conf, State) ->
@@ -307,8 +136,8 @@ update_config(Conf, State) ->
false ->
default
end,
- Cfg = State#state.cfg,
- State#state{cfg = Cfg#cfg{release_cursor_interval = SHI,
+ Cfg = State#?MODULE.cfg,
+ State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI,
dead_letter_handler = DLH,
become_leader_handler = BLH,
max_length = MaxLength,
@@ -329,7 +158,7 @@ apply(Metadata, #enqueue{pid = From, seq = Seq,
apply_enqueue(Metadata, From, Seq, RawMsg, State00);
apply(Meta,
#settle{msg_ids = MsgIds, consumer_id = ConsumerId},
- #state{consumers = Cons0} = State) ->
+ #?MODULE{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0} ->
% need to increment metrics before completing as any snapshot
@@ -341,7 +170,7 @@ apply(Meta,
end;
apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
- #state{consumers = Cons0} = State0) ->
+ #?MODULE{consumers = Cons0} = State0) ->
case Cons0 of
#{ConsumerId := Con0} ->
Discarded = maps:with(MsgIds, Con0#consumer.checked_out),
@@ -352,7 +181,7 @@ apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
{State0, ok}
end;
apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
- #state{consumers = Cons0} = State) ->
+ #?MODULE{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
Checked = maps:without(MsgIds, Checked0),
@@ -364,7 +193,7 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
end;
apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
drain = Drain, consumer_id = ConsumerId},
- #state{consumers = Cons0,
+ #?MODULE{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
case Cons0 of
#{ConsumerId := #consumer{delivery_count = DelCnt} = Con0} ->
@@ -376,9 +205,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
ServiceQueue0),
Cons = maps:put(ConsumerId, Con1, Cons0),
{State1, ok, Effects} =
- checkout(Meta, State0#state{service_queue = ServiceQueue,
+ checkout(Meta, State0#?MODULE{service_queue = ServiceQueue,
consumers = Cons}, []),
- Response = {send_credit_reply, maps:size(State1#state.messages)},
+ Response = {send_credit_reply, maps:size(State1#?MODULE.messages)},
%% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain
case Drain of
@@ -387,16 +216,16 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{State1, Response, Effects};
true ->
Con = #consumer{credit = PostCred} =
- maps:get(ConsumerId, State1#state.consumers),
+ maps:get(ConsumerId, State1#?MODULE.consumers),
%% add the outstanding credit to the delivery count
DeliveryCount = Con#consumer.delivery_count + PostCred,
Consumers = maps:put(ConsumerId,
Con#consumer{delivery_count = DeliveryCount,
credit = 0},
- State1#state.consumers),
+ State1#?MODULE.consumers),
Drained = Con#consumer.credit,
{CTag, _} = ConsumerId,
- {State1#state{consumers = Consumers},
+ {State1#?MODULE{consumers = Consumers},
%% returning a multi response with two client actions
%% for the channel to execute
{multi, [Response, {send_drained, [{CTag, Drained}]}]},
@@ -409,7 +238,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
apply(Meta, #checkout{spec = {dequeue, Settlement},
meta = ConsumerMeta,
consumer_id = ConsumerId},
- #state{consumers = Consumers} = State0) ->
+ #?MODULE{consumers = Consumers} = State0) ->
Exists = maps:is_key(ConsumerId, Consumers),
case messages_ready(State0) of
0 ->
@@ -444,7 +273,7 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
checkout(Meta, State1, [{monitor, process, Pid}]);
apply(#{index := RaftIdx}, #purge{},
- #state{ra_indexes = Indexes0,
+ #?MODULE{ra_indexes = Indexes0,
returns = Returns,
messages = Messages} = State0) ->
Total = messages_ready(State0),
@@ -454,7 +283,7 @@ apply(#{index := RaftIdx}, #purge{},
[I || {_, {I, _}} <- lqueue:to_list(Returns)]),
{State, _, Effects} =
update_smallest_raft_index(RaftIdx,
- State0#state{ra_indexes = Indexes,
+ State0#?MODULE{ra_indexes = Indexes,
messages = #{},
returns = lqueue:new(),
msg_bytes_enqueue = 0,
@@ -466,7 +295,7 @@ apply(#{index := RaftIdx}, #purge{},
{State, {purge, Total},
lists:reverse([garbage_collection | Effects])};
apply(_, {down, ConsumerPid, noconnection},
- #state{consumers = Cons0,
+ #?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
Node = node(ConsumerPid),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
@@ -505,9 +334,9 @@ apply(_, {down, ConsumerPid, noconnection},
[{monitor, node, Node}]
end ++ Effects1,
%% TODO: should we run a checkout here?
- {State#state{consumers = Cons, enqueuers = Enqs,
+ {State#?MODULE{consumers = Cons, enqueuers = Enqs,
waiting_consumers = WaitingConsumers}, ok, Effects2};
-apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
+apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
@@ -515,7 +344,7 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
{#enqueuer{pending = Pend}, Enqs} ->
lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
enqueue(RIdx, RawMsg, S)
- end, State0#state{enqueuers = Enqs}, Pend);
+ end, State0#?MODULE{enqueuers = Enqs}, Pend);
error ->
State0
end,
@@ -528,7 +357,7 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
cancel_consumer(ConsumerId, S, E, down)
end, {State2, Effects1}, DownConsumers),
checkout(Meta, State, Effects);
-apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
+apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
enqueuers = Enqs0,
service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
@@ -559,7 +388,7 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
Acc
end, {Cons0, SQ0, Monitors}, Cons0),
- checkout(Meta, State0#state{consumers = Cons1, enqueuers = Enqs1,
+ checkout(Meta, State0#?MODULE{consumers = Cons1, enqueuers = Enqs1,
service_queue = SQ,
waiting_consumers = WaitingConsumers}, Effects);
apply(_, {nodedown, _Node}, State) ->
@@ -567,25 +396,25 @@ apply(_, {nodedown, _Node}, State) ->
apply(Meta, #update_config{config = Conf}, State) ->
checkout(Meta, update_config(Conf, State), []).
-consumer_active_flag_update_function(#state{cfg = #cfg{consumer_strategy = default}}) ->
+consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = default}}) ->
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
consumer_update_active_effects(State, ConsumerId, Consumer, Active,
ActivityStatus, Effects)
end;
-consumer_active_flag_update_function(#state{cfg = #cfg{consumer_strategy = single_active}}) ->
+consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = single_active}}) ->
fun(_, _, _, _, _, Effects) ->
Effects
end.
handle_waiting_consumer_down(_Pid,
- #state{cfg = #cfg{consumer_strategy = default}} = State) ->
+ #?MODULE{cfg = #cfg{consumer_strategy = default}} = State) ->
{[], State};
handle_waiting_consumer_down(_Pid,
- #state{cfg = #cfg{consumer_strategy = single_active},
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = []} = State) ->
{[], State};
handle_waiting_consumer_down(Pid,
- #state{cfg = #cfg{consumer_strategy = single_active},
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers0} = State0) ->
% get cancel effects for down waiting consumers
Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end,
@@ -597,19 +426,19 @@ handle_waiting_consumer_down(Pid,
% update state to have only up waiting consumers
StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end,
WaitingConsumers0),
- State = State0#state{waiting_consumers = StillUp},
+ State = State0#?MODULE{waiting_consumers = StillUp},
{Effects, State}.
-update_waiting_consumer_status(_Node, #state{cfg = #cfg{consumer_strategy = default}},
+update_waiting_consumer_status(_Node, #?MODULE{cfg = #cfg{consumer_strategy = default}},
_Status) ->
[];
update_waiting_consumer_status(_Node,
- #state{cfg = #cfg{consumer_strategy = single_active},
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = []},
_Status) ->
[];
update_waiting_consumer_status(Node,
- #state{cfg = #cfg{consumer_strategy = single_active},
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers},
Status) ->
[begin
@@ -623,7 +452,7 @@ update_waiting_consumer_status(Node,
Consumer#consumer.status =/= cancelled].
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
-state_enter(leader, #state{consumers = Cons,
+state_enter(leader, #?MODULE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
@@ -644,11 +473,11 @@ state_enter(leader, #state{consumers = Cons,
{Mod, Fun, Args} ->
[{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
end;
-state_enter(recovered, #state{prefix_msgs = PrefixMsgCounts})
+state_enter(recovered, #?MODULE{prefix_msgs = PrefixMsgCounts})
when PrefixMsgCounts =/= {[], []} ->
%% TODO: remove assertion?
exit({rabbit_fifo, unexpected_prefix_msgs, PrefixMsgCounts});
-state_enter(eol, #state{enqueuers = Enqs,
+state_enter(eol, #?MODULE{enqueuers = Enqs,
consumers = Custs0,
waiting_consumers = WaitingConsumers0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
@@ -663,7 +492,7 @@ state_enter(_, _) ->
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
-tick(_Ts, #state{cfg = #cfg{name = Name,
+tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
resource = QName},
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
@@ -678,7 +507,7 @@ tick(_Ts, #state{cfg = #cfg{name = Name,
handle_tick, [QName, Metrics]}, {aux, emit}].
-spec overview(state()) -> map().
-overview(#state{consumers = Cons,
+overview(#?MODULE{consumers = Cons,
enqueuers = Enqs,
release_cursors = Cursors,
msg_bytes_enqueue = EnqueueBytes,
@@ -695,7 +524,7 @@ overview(#state{consumers = Cons,
-spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) ->
[delivery_msg()].
-get_checked_out(Cid, From, To, #state{consumers = Consumers}) ->
+get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) ->
case Consumers of
#{Cid := #consumer{checked_out = Checked}} ->
[{K, snd(snd(maps:get(K, Checked)))}
@@ -729,7 +558,7 @@ handle_aux(_, cast, Cmd, {Name, Use0}, Log, _) ->
query_messages_ready(State) ->
messages_ready(State).
-query_messages_checked_out(#state{consumers = Consumers}) ->
+query_messages_checked_out(#?MODULE{consumers = Consumers}) ->
maps:fold(fun (_, #consumer{checked_out = C}, S) ->
maps:size(C) + S
end, 0, Consumers).
@@ -737,19 +566,19 @@ query_messages_checked_out(#state{consumers = Consumers}) ->
query_messages_total(State) ->
messages_total(State).
-query_processes(#state{enqueuers = Enqs, consumers = Cons0}) ->
+query_processes(#?MODULE{enqueuers = Enqs, consumers = Cons0}) ->
Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0),
maps:keys(maps:merge(Enqs, Cons)).
-query_ra_indexes(#state{ra_indexes = RaIndexes}) ->
+query_ra_indexes(#?MODULE{ra_indexes = RaIndexes}) ->
RaIndexes.
-query_consumer_count(#state{consumers = Consumers,
+query_consumer_count(#?MODULE{consumers = Consumers,
waiting_consumers = WaitingConsumers}) ->
maps:size(Consumers) + length(WaitingConsumers).
-query_consumers(#state{consumers = Consumers,
+query_consumers(#?MODULE{consumers = Consumers,
waiting_consumers = WaitingConsumers,
cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) ->
ActiveActivityStatusFun =
@@ -809,7 +638,7 @@ query_consumers(#state{consumers = Consumers,
end, #{}, WaitingConsumers),
maps:merge(FromConsumers, FromWaitingConsumers).
-query_single_active_consumer(#state{cfg = #cfg{consumer_strategy = single_active},
+query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_active},
consumers = Consumers}) ->
case maps:size(Consumers) of
0 ->
@@ -823,7 +652,7 @@ query_single_active_consumer(#state{cfg = #cfg{consumer_strategy = single_active
query_single_active_consumer(_) ->
disabled.
-query_stat(#state{consumers = Consumers} = State) ->
+query_stat(#?MODULE{consumers = Consumers} = State) ->
{messages_ready(State), maps:size(Consumers)}.
-spec usage(atom()) -> float().
@@ -835,14 +664,14 @@ usage(Name) when is_atom(Name) ->
%%% Internal
-messages_ready(#state{messages = M,
+messages_ready(#?MODULE{messages = M,
prefix_msgs = {PreR, PreM},
returns = R}) ->
%% TODO: optimise to avoid length/1 call
maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM).
-messages_total(#state{ra_indexes = I,
+messages_total(#?MODULE{ra_indexes = I,
prefix_msgs = {PreR, PreM}}) ->
rabbit_fifo_index:size(I) + length(PreR) + length(PreM).
@@ -874,23 +703,23 @@ moving_average(Time, HalfLife, Next, Current) ->
Weight = math:exp(Time * math:log(0.5) / HalfLife),
Next * (1 - Weight) + Current * Weight.
-num_checked_out(#state{consumers = Cons}) ->
+num_checked_out(#?MODULE{consumers = Cons}) ->
lists:foldl(fun (#consumer{checked_out = C}, Acc) ->
maps:size(C) + Acc
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
- #state{cfg = #cfg{consumer_strategy = default}} = State, Effects, Reason) ->
+ #?MODULE{cfg = #cfg{consumer_strategy = default}} = State, Effects, Reason) ->
%% general case, single active consumer off
cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
- #state{cfg = #cfg{consumer_strategy = single_active},
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = []} = State,
Effects, Reason) ->
%% single active consumer on, no consumers are waiting
cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
- #state{consumers = Cons0,
+ #?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers0} = State0,
Effects0, Reason) ->
@@ -905,13 +734,13 @@ cancel_consumer(ConsumerId,
% Take another one from the waiting consumers and put it in consumers
[{NewActiveConsumerId, NewActiveConsumer}
| RemainingWaitingConsumers] = WaitingConsumers0,
- #state{service_queue = ServiceQueue} = State1,
+ #?MODULE{service_queue = ServiceQueue} = State1,
ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId,
NewActiveConsumer,
ServiceQueue),
- State = State1#state{consumers = maps:put(NewActiveConsumerId,
+ State = State1#?MODULE{consumers = maps:put(NewActiveConsumerId,
NewActiveConsumer,
- State1#state.consumers),
+ State1#?MODULE.consumers),
service_queue = ServiceQueue1,
waiting_consumers = RemainingWaitingConsumers},
Effects = consumer_update_active_effects(State, NewActiveConsumerId,
@@ -926,10 +755,10 @@ cancel_consumer(ConsumerId,
Effects = cancel_consumer_effects(ConsumerId, State0, Effects0),
% A waiting consumer isn't supposed to have any checked out messages,
% so nothing special to do here
- {State0#state{waiting_consumers = WaitingConsumers}, Effects}
+ {State0#?MODULE{waiting_consumers = WaitingConsumers}, Effects}
end.
-consumer_update_active_effects(#state{cfg = #cfg{resource = QName}},
+consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
ConsumerId, #consumer{meta = Meta},
Active, ActivityStatus,
Effects) ->
@@ -942,13 +771,13 @@ consumer_update_active_effects(#state{cfg = #cfg{resource = QName}},
[QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
| Effects].
-cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) ->
+cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
case maps:take(ConsumerId, C0) of
{Consumer, Cons1} ->
{S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0,
Effects0, Reason),
Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
- case maps:size(S#state.consumers) of
+ case maps:size(S#?MODULE.consumers) of
0 ->
{S, [{aux, inactive} | Effects]};
_ ->
@@ -960,7 +789,7 @@ cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) ->
end.
maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1,
- #state{consumers = C0,
+ #?MODULE{consumers = C0,
service_queue = SQ0} = S0, Effects0, Reason) ->
case Reason of
consumer_cancel ->
@@ -970,10 +799,10 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1
credit = 0,
status = cancelled},
C0, SQ0, Effects0),
- {S0#state{consumers = Cons, service_queue = SQ}, Effects1};
+ {S0#?MODULE{consumers = Cons, service_queue = SQ}, Effects1};
down ->
{S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId, Consumer),
- {S1#state{consumers = Cons1}, Effects1}
+ {S1#?MODULE{consumers = Cons1}, Effects1}
end.
apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
@@ -986,13 +815,13 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
{State, ok, Effects}
end.
-drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) ->
+drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
case take_next_msg(State0) of
{FullMsg = {_MsgId, {RaftIdxToDrop, {_Header, Msg}}},
State1} ->
Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
Bytes = message_size(Msg),
- State = add_bytes_drop(Bytes, State1#state{ra_indexes = Indexes}),
+ State = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}),
Effects = dead_letter_effects(maxlen, maps:put(none, FullMsg, #{}),
State, Effects0),
{State, Effects};
@@ -1003,35 +832,35 @@ drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) ->
{State0, Effects0}
end.
-enqueue(RaftIdx, RawMsg, #state{messages = Messages,
+enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
Size = message_size(RawMsg),
Msg = {RaftIdx, {#{size => Size}, RawMsg}}, % indexed message with header map
State = add_bytes_enqueue(Size, State0),
- State#state{messages = Messages#{NextMsgNum => Msg},
+ State#?MODULE{messages = Messages#{NextMsgNum => Msg},
% this is probably only done to record it when low_msg_num
% is undefined
low_msg_num = min(LowMsgNum, NextMsgNum),
next_msg_num = NextMsgNum + 1}.
append_to_master_index(RaftIdx,
- #state{ra_indexes = Indexes0} = State0) ->
+ #?MODULE{ra_indexes = Indexes0} = State0) ->
State = incr_enqueue_count(State0),
Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0),
- State#state{ra_indexes = Indexes}.
+ State#?MODULE{ra_indexes = Indexes}.
-incr_enqueue_count(#state{enqueue_count = C,
+incr_enqueue_count(#?MODULE{enqueue_count = C,
cfg = #cfg{release_cursor_interval = C}} = State0) ->
% this will trigger a dehydrated version of the state to be stored
% at this raft index for potential future snapshot generation
- State0#state{enqueue_count = 0};
-incr_enqueue_count(#state{enqueue_count = C} = State) ->
- State#state{enqueue_count = C + 1}.
+ State0#?MODULE{enqueue_count = 0};
+incr_enqueue_count(#?MODULE{enqueue_count = C} = State) ->
+ State#?MODULE{enqueue_count = C + 1}.
maybe_store_dehydrated_state(RaftIdx,
- #state{ra_indexes = Indexes,
+ #?MODULE{ra_indexes = Indexes,
enqueue_count = 0,
release_cursors = Cursors} = State) ->
case rabbit_fifo_index:exists(RaftIdx, Indexes) of
@@ -1041,7 +870,7 @@ maybe_store_dehydrated_state(RaftIdx,
true ->
Dehydrated = dehydrate_state(State),
Cursor = {release_cursor, RaftIdx, Dehydrated},
- State#state{release_cursors = lqueue:in(Cursor, Cursors)}
+ State#?MODULE{release_cursors = lqueue:in(Cursor, Cursors)}
end;
maybe_store_dehydrated_state(_RaftIdx, State) ->
State.
@@ -1053,18 +882,18 @@ enqueue_pending(From,
State = enqueue(RaftIdx, RawMsg, State0),
Enq = Enq0#enqueuer{next_seqno = Next + 1, pending = Pending},
enqueue_pending(From, Enq, State);
-enqueue_pending(From, Enq, #state{enqueuers = Enqueuers0} = State) ->
- State#state{enqueuers = Enqueuers0#{From => Enq}}.
+enqueue_pending(From, Enq, #?MODULE{enqueuers = Enqueuers0} = State) ->
+ State#?MODULE{enqueuers = Enqueuers0#{From => Enq}}.
maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) ->
% direct enqueue without tracking
State = enqueue(RaftIdx, RawMsg, State0),
{ok, State, Effects};
maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
- #state{enqueuers = Enqueuers0} = State0) ->
+ #?MODULE{enqueuers = Enqueuers0} = State0) ->
case maps:get(From, Enqueuers0, undefined) of
undefined ->
- State1 = State0#state{enqueuers = Enqueuers0#{From => #enqueuer{}}},
+ State1 = State0#?MODULE{enqueuers = Enqueuers0#{From => #enqueuer{}}},
{ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo,
RawMsg, Effects0, State1),
{ok, State, [{monitor, process, From} | Effects]};
@@ -1080,7 +909,7 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
% out of order delivery
Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0],
Enq = Enq0#enqueuer{pending = lists:sort(Pending)},
- {ok, State0#state{enqueuers = Enqueuers0#{From => Enq}}, Effects0};
+ {ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq}}, Effects0};
#enqueuer{next_seqno = Next} when MsgSeqNo =< Next ->
% duplicate delivery - remove the raft index from the ra_indexes
% map as it was added earlier
@@ -1091,7 +920,7 @@ snd(T) ->
element(2, T).
return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
- Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) ->
+ Effects0, #?MODULE{consumers = Cons0, service_queue = SQ0} = State0) ->
Con = Con0#consumer{checked_out = Checked,
credit = increase_credit(Con0, length(MsgNumMsgs))},
{Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0,
@@ -1104,14 +933,14 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
return_one(MsgNum, Msg, S0, E0,
ConsumerId, Con)
end, {State0, Effects1}, MsgNumMsgs),
- checkout(Meta, State1#state{consumers = Cons,
+ checkout(Meta, State1#?MODULE{consumers = Cons,
service_queue = SQ},
Effects2).
% used to processes messages that are finished
complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
Con0, Checked, Effects0,
- #state{consumers = Cons0, service_queue = SQ0,
+ #?MODULE{consumers = Cons0, service_queue = SQ0,
ra_indexes = Indexes0} = State0) ->
%% credit_mode = simple_prefetch should automatically top-up credit
%% as messages are simple_prefetch or otherwise returned
@@ -1121,7 +950,7 @@ complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
SQ0, Effects0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
MsgRaftIdxs),
- {State0#state{consumers = Cons,
+ {State0#?MODULE{consumers = Cons,
ra_indexes = Indexes,
service_queue = SQ}, Effects}.
@@ -1158,11 +987,11 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Reason, _Discarded,
- #state{cfg = #cfg{dead_letter_handler = undefined}},
+ #?MODULE{cfg = #cfg{dead_letter_handler = undefined}},
Effects) ->
Effects;
dead_letter_effects(Reason, Discarded,
- #state{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}},
+ #?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}},
Effects) ->
DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) ->
[{Reason, Msg} | Acc]
@@ -1170,12 +999,12 @@ dead_letter_effects(Reason, Discarded,
[{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects].
cancel_consumer_effects(ConsumerId,
- #state{cfg = #cfg{resource = QName}}, Effects) ->
+ #?MODULE{cfg = #cfg{resource = QName}}, Effects) ->
[{mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [QName, ConsumerId]} | Effects].
update_smallest_raft_index(IncomingRaftIdx,
- #state{ra_indexes = Indexes,
+ #?MODULE{ra_indexes = Indexes,
release_cursors = Cursors0} = State0,
Effects) ->
case rabbit_fifo_index:size(Indexes) of
@@ -1183,19 +1012,19 @@ update_smallest_raft_index(IncomingRaftIdx,
% there are no messages on queue anymore and no pending enqueues
% we can forward release_cursor all the way until
% the last received command, hooray
- State = State0#state{release_cursors = lqueue:new()},
+ State = State0#?MODULE{release_cursors = lqueue:new()},
{State, ok,
[{release_cursor, IncomingRaftIdx, State} | Effects]};
_ ->
Smallest = rabbit_fifo_index:smallest(Indexes),
case find_next_cursor(Smallest, Cursors0) of
{empty, Cursors} ->
- {State0#state{release_cursors = Cursors},
+ {State0#?MODULE{release_cursors = Cursors},
ok, Effects};
{Cursor, Cursors} ->
%% we can emit a release cursor we've passed the smallest
%% release cursor available.
- {State0#state{release_cursors = Cursors}, ok,
+ {State0#?MODULE{release_cursors = Cursors}, ok,
[Cursor | Effects]}
end
end.
@@ -1213,7 +1042,7 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
end.
return_one(0, {'$prefix_msg', Header0},
- #state{returns = Returns,
+ #?MODULE{returns = Returns,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0,
ConsumerId, Con) ->
Header = maps:update_with(delivery_count,
@@ -1229,11 +1058,11 @@ return_one(0, {'$prefix_msg', Header0},
_ ->
%% this should not affect the release cursor in any way
{add_bytes_return(Msg,
- State0#state{returns = lqueue:in(Msg, Returns)}),
+ State0#?MODULE{returns = lqueue:in(Msg, Returns)}),
Effects0}
end;
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
- #state{returns = Returns,
+ #?MODULE{returns = Returns,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
Effects0, ConsumerId, Con) ->
Header = maps:update_with(delivery_count,
@@ -1252,7 +1081,7 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
_ ->
%% this should not affect the release cursor in any way
{add_bytes_return(RawMsg,
- State0#state{returns =
+ State0#?MODULE{returns =
lqueue:in({MsgNum, Msg}, Returns)}),
Effects0}
end.
@@ -1272,7 +1101,7 @@ return_all(State0, Checked0, Effects0, ConsumerId, Consumer) ->
checkout(#{index := Index}, State0, Effects0) ->
{State1, _Result, Effects1} = checkout0(checkout_one(State0),
Effects0, #{}),
- case evaluate_limit(State0#state.ra_indexes, false,
+ case evaluate_limit(State0#?MODULE.ra_indexes, false,
State1, Effects1) of
{State, true, Effects} ->
update_smallest_raft_index(Index, State, Effects);
@@ -1297,7 +1126,7 @@ checkout0({Activity, State0}, Effects0, Acc) ->
{State0, ok, lists:reverse(Effects1)}.
evaluate_limit(_OldIndexes, Result,
- #state{cfg = #cfg{max_length = undefined,
+ #?MODULE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
@@ -1327,11 +1156,11 @@ append_send_msg_effects(Effects0, AccMap) ->
%%
%% When we return it is always done to the current return queue
%% for both prefix messages and current messages
-take_next_msg(#state{prefix_msgs = {[Header | Rem], P}} = State) ->
+take_next_msg(#?MODULE{prefix_msgs = {[Header | Rem], P}} = State) ->
%% there are prefix returns, these should be served first
{{'$prefix_msg', Header},
- State#state{prefix_msgs = {Rem, P}}};
-take_next_msg(#state{returns = Returns,
+ State#?MODULE{prefix_msgs = {Rem, P}}};
+take_next_msg(#?MODULE{returns = Returns,
low_msg_num = Low0,
messages = Messages0,
prefix_msgs = {R, P}} = State) ->
@@ -1340,7 +1169,7 @@ take_next_msg(#state{returns = Returns,
case lqueue:peek(Returns) of
{value, NextMsg} ->
{NextMsg,
- State#state{returns = lqueue:drop(Returns)}};
+ State#?MODULE{returns = lqueue:drop(Returns)}};
empty when P == [] ->
case Low0 of
undefined ->
@@ -1350,11 +1179,11 @@ take_next_msg(#state{returns = Returns,
case maps:size(Messages) of
0 ->
{{Low0, Msg},
- State#state{messages = Messages,
+ State#?MODULE{messages = Messages,
low_msg_num = undefined}};
_ ->
{{Low0, Msg},
- State#state{messages = Messages,
+ State#?MODULE{messages = Messages,
low_msg_num = Low0 + 1}}
end
end;
@@ -1362,13 +1191,13 @@ take_next_msg(#state{returns = Returns,
[Header | Rem] = P,
%% There are prefix msgs
{{'$prefix_msg', Header},
- State#state{prefix_msgs = {R, Rem}}}
+ State#?MODULE{prefix_msgs = {R, Rem}}}
end.
send_msg_effect({CTag, CPid}, Msgs) ->
{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}.
-checkout_one(#state{service_queue = SQ0,
+checkout_one(#?MODULE{service_queue = SQ0,
messages = Messages0,
consumers = Cons0} = InitState) ->
case queue:peek(SQ0) of
@@ -1383,11 +1212,11 @@ checkout_one(#state{service_queue = SQ0,
%% no credit but was still on queue
%% can happen when draining
%% recurse without consumer on queue
- checkout_one(InitState#state{service_queue = SQ1});
+ checkout_one(InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{status = cancelled}} ->
- checkout_one(InitState#state{service_queue = SQ1});
+ checkout_one(InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{status = suspected_down}} ->
- checkout_one(InitState#state{service_queue = SQ1});
+ checkout_one(InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
@@ -1400,7 +1229,7 @@ checkout_one(#state{service_queue = SQ0,
{Cons, SQ, []} = % we expect no effects
update_or_remove_sub(ConsumerId, Con,
Cons0, SQ1, []),
- State1 = State0#state{service_queue = SQ,
+ State1 = State0#?MODULE{service_queue = SQ,
consumers = Cons},
{State, Msg} =
case ConsumerMsg of
@@ -1414,7 +1243,7 @@ checkout_one(#state{service_queue = SQ0,
{success, ConsumerId, Next, Msg, State};
error ->
%% consumer did not exist but was queued, recurse
- checkout_one(InitState#state{service_queue = SQ1})
+ checkout_one(InitState#?MODULE{service_queue = SQ1})
end;
empty ->
{nochange, InitState}
@@ -1464,27 +1293,27 @@ uniq_queue_in(Key, Queue) ->
end.
update_consumer(ConsumerId, Meta, Spec,
- #state{cfg = #cfg{consumer_strategy = default}} = State0) ->
+ #?MODULE{cfg = #cfg{consumer_strategy = default}} = State0) ->
%% general case, single active consumer off
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, Spec,
- #state{consumers = Cons0,
+ #?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active}} = State0)
when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
- #state{cfg = #cfg{consumer_strategy = single_active},
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers0} = State0) ->
%% single active consumer on and one active consumer already
%% adding the new consumer to the waiting list
Consumer = #consumer{lifetime = Life, meta = Meta,
credit = Credit, credit_mode = Mode},
WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}],
- State0#state{waiting_consumers = WaitingConsumers1}.
+ State0#?MODULE{waiting_consumers = WaitingConsumers1}.
update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
- #state{consumers = Cons0,
+ #?MODULE{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
%% TODO: this logic may not be correct for updating a pre-existing consumer
Init = #consumer{lifetime = Life, meta = Meta,
@@ -1500,7 +1329,7 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
ServiceQueue0),
- State0#state{consumers = Cons, service_queue = ServiceQueue}.
+ State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}.
maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
ServiceQueue0) ->
@@ -1515,7 +1344,7 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
%% creates a dehydrated version of the current state to be cached and
%% potentially used to for a snaphot at a later point
-dehydrate_state(#state{messages = Messages,
+dehydrate_state(#?MODULE{messages = Messages,
consumers = Consumers,
returns = Returns,
prefix_msgs = {PrefRet0, PrefMsg0}} = State) ->
@@ -1532,7 +1361,7 @@ dehydrate_state(#state{messages = Messages,
end,
lists:reverse(PrefMsg0),
lists:sort(maps:to_list(Messages))),
- State#state{messages = #{},
+ State#?MODULE{messages = #{},
ra_indexes = rabbit_fifo_index:empty(),
release_cursors = lqueue:new(),
low_msg_num = undefined,
@@ -1552,13 +1381,13 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Con#consumer{checked_out = Checked}.
%% make the state suitable for equality comparison
-normalize(#state{release_cursors = Cursors} = State) ->
- State#state{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
+normalize(#?MODULE{release_cursors = Cursors} = State) ->
+ State#?MODULE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
-is_over_limit(#state{cfg = #cfg{max_length = undefined,
+is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}}) ->
false;
-is_over_limit(#state{cfg = #cfg{max_length = MaxLength,
+is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
max_bytes = MaxBytes},
msg_bytes_enqueue = BytesEnq} = State) ->
@@ -1600,26 +1429,26 @@ make_purge() -> #purge{}.
make_update_config(Config) ->
#update_config{config = Config}.
-add_bytes_enqueue(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) ->
- State#state{msg_bytes_enqueue = Enqueue + Bytes}.
+add_bytes_enqueue(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
+ State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}.
-add_bytes_drop(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) ->
- State#state{msg_bytes_enqueue = Enqueue - Bytes}.
+add_bytes_drop(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
+ State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}.
-add_bytes_checkout(Msg, #state{msg_bytes_checkout = Checkout,
+add_bytes_checkout(Msg, #?MODULE{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue } = State) ->
Bytes = message_size(Msg),
- State#state{msg_bytes_checkout = Checkout + Bytes,
+ State#?MODULE{msg_bytes_checkout = Checkout + Bytes,
msg_bytes_enqueue = Enqueue - Bytes}.
-add_bytes_settle(Msg, #state{msg_bytes_checkout = Checkout} = State) ->
+add_bytes_settle(Msg, #?MODULE{msg_bytes_checkout = Checkout} = State) ->
Bytes = message_size(Msg),
- State#state{msg_bytes_checkout = Checkout - Bytes}.
+ State#?MODULE{msg_bytes_checkout = Checkout - Bytes}.
-add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout,
+add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue} = State) ->
Bytes = message_size(Msg),
- State#state{msg_bytes_checkout = Checkout - Bytes,
+ State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
msg_bytes_enqueue = Enqueue + Bytes}.
message_size(#basic_message{content = Content}) ->
@@ -1633,7 +1462,7 @@ message_size(Msg) ->
%% probably only hit this for testing so ok to use erts_debug
erts_debug:size(Msg).
-suspected_pids_for(Node, #state{consumers = Cons0,
+suspected_pids_for(Node, #?MODULE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc)
@@ -1652,1156 +1481,3 @@ suspected_pids_for(Node, #state{consumers = Cons0,
[P | Acc];
(_, Acc) -> Acc
end, Enqs, WaitingConsumers0).
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
--define(ASSERT_EFF(EfxPat, Effects),
- ?ASSERT_EFF(EfxPat, true, Effects)).
-
--define(ASSERT_EFF(EfxPat, Guard, Effects),
- ?assert(lists:any(fun (EfxPat) when Guard -> true;
- (_) -> false
- end, Effects))).
-
--define(ASSERT_NO_EFF(EfxPat, Effects),
- ?assert(not lists:any(fun (EfxPat) -> true;
- (_) -> false
- end, Effects))).
-
--define(assertNoEffect(EfxPat, Effects),
- ?assert(not lists:any(fun (EfxPat) -> true;
- (_) -> false
- end, Effects))).
-
-test_init(Name) ->
- init(#{name => Name,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(Name, utf8)),
- release_cursor_interval => 0}).
-
-% To launch these tests: make eunit EUNIT_MODS="rabbit_fifo"
-
-enq_enq_checkout_test() ->
- Cid = {<<"enq_enq_checkout_test">>, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _} = enq(2, 2, second, State1),
- {_State3, _, Effects} =
- apply(meta(3),
- make_checkout(Cid, {once, 2, simple_prefetch}, #{}),
- State2),
- ?ASSERT_EFF({monitor, _, _}, Effects),
- ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects),
- ok.
-
-credit_enq_enq_checkout_settled_credit_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _} = enq(2, 2, second, State1),
- {State3, _, Effects} =
- apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}), State2),
- ?ASSERT_EFF({monitor, _, _}, Effects),
- Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true;
- (_) -> false
- end, Effects),
- ?assertEqual(1, length(Deliveries)),
- %% settle the delivery this should _not_ result in further messages being
- %% delivered
- {State4, SettledEffects} = settle(Cid, 4, 1, State3),
- ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) ->
- true;
- (_) -> false
- end, SettledEffects)),
- %% granting credit (3) should deliver the second msg if the receivers
- %% delivery count is (1)
- {State5, CreditEffects} = credit(Cid, 5, 1, 1, false, State4),
- % ?debugFmt("CreditEffects ~p ~n~p", [CreditEffects, State4]),
- ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, CreditEffects),
- {_State6, FinalEffects} = enq(6, 3, third, State5),
- ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) ->
- true;
- (_) -> false
- end, FinalEffects)),
- ok.
-
-credit_with_drained_test() ->
- Cid = {?FUNCTION_NAME, self()},
- State0 = test_init(test),
- %% checkout with a single credit
- {State1, _, _} =
- apply(meta(1), make_checkout(Cid, {auto, 1, credited},#{}),
- State0),
- ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 1,
- delivery_count = 0}}},
- State1),
- {State, Result, _} =
- apply(meta(3), make_credit(Cid, 0, 5, true), State1),
- ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0,
- delivery_count = 5}}},
- State),
- ?assertEqual({multi, [{send_credit_reply, 0},
- {send_drained, [{?FUNCTION_NAME, 5}]}]},
- Result),
- ok.
-
-credit_and_drain_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _} = enq(2, 2, second, State1),
- %% checkout without any initial credit (like AMQP 1.0 would)
- {State3, _, CheckEffs} =
- apply(meta(3), make_checkout(Cid, {auto, 0, credited}, #{}),
- State2),
-
- ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs),
- {State4, {multi, [{send_credit_reply, 0},
- {send_drained, [{?FUNCTION_NAME, 2}]}]},
- Effects} = apply(meta(4), make_credit(Cid, 4, 0, true), State3),
- ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0,
- delivery_count = 4}}},
- State4),
-
- ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}},
- {_, {_, second}}]}, _}, Effects),
- {_State5, EnqEffs} = enq(5, 2, third, State4),
- ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, EnqEffs),
- ok.
-
-
-
-enq_enq_deq_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _} = enq(2, 2, second, State1),
- % get returns a reply value
- NumReady = 1,
- {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} =
- apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
- State2),
- ok.
-
-enq_enq_deq_deq_settle_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _} = enq(2, 2, second, State1),
- % get returns a reply value
- {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} =
- apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
- State2),
- {_State4, {dequeue, empty}} =
- apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}),
- State3),
- ok.
-
-enq_enq_checkout_get_settled_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- % get returns a reply value
- {_State2, {dequeue, {0, {_, first}}, _}, _Effs} =
- apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
- State1),
- ok.
-
-checkout_get_empty_test() ->
- Cid = {?FUNCTION_NAME, self()},
- State = test_init(test),
- {_State2, {dequeue, empty}} =
- apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}), State),
- ok.
-
-untracked_enq_deq_test() ->
- Cid = {?FUNCTION_NAME, self()},
- State0 = test_init(test),
- {State1, _, _} = apply(meta(1),
- make_enqueue(undefined, undefined, first),
- State0),
- {_State2, {dequeue, {0, {_, first}}, _}, _} =
- apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1),
- ok.
-
-release_cursor_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _} = enq(2, 2, second, State1),
- {State3, _} = check(Cid, 3, 10, State2),
- % no release cursor effect at this point
- {State4, _} = settle(Cid, 4, 1, State3),
- {_Final, Effects1} = settle(Cid, 5, 0, State4),
- % empty queue forwards release cursor all the way
- ?ASSERT_EFF({release_cursor, 5, _}, Effects1),
- ok.
-
-checkout_enq_settle_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)),
- {State2, Effects0} = enq(2, 1, first, State1),
- ?ASSERT_EFF({send_msg, _,
- {delivery, ?FUNCTION_NAME,
- [{0, {_, first}}]}, _},
- Effects0),
- {State3, [_Inactive]} = enq(3, 2, second, State2),
- {_, _Effects} = settle(Cid, 4, 0, State3),
- % the release cursor is the smallest raft index that does not
- % contribute to the state of the application
- % ?ASSERT_EFF({release_cursor, 2, _}, Effects),
- ok.
-
-out_of_order_enqueue_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
- {State2, Effects2} = enq(2, 1, first, State1),
- ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
- % assert monitor was set up
- ?ASSERT_EFF({monitor, _, _}, Effects2),
- % enqueue seq num 3 and 4 before 2
- {State3, Effects3} = enq(3, 3, third, State2),
- ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects3),
- {State4, Effects4} = enq(4, 4, fourth, State3),
- % assert no further deliveries where made
- ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects4),
- {_State5, Effects5} = enq(5, 2, second, State4),
- % assert two deliveries were now made
- ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, second}},
- {_, {_, third}},
- {_, {_, fourth}}]}, _},
- Effects5),
- ok.
-
-out_of_order_first_enqueue_test() ->
- Cid = {?FUNCTION_NAME, self()},
- {State1, _} = check_n(Cid, 5, 5, test_init(test)),
- {_State2, Effects2} = enq(2, 10, first, State1),
- ?ASSERT_EFF({monitor, process, _}, Effects2),
- ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _},
- Effects2),
- ok.
-
-duplicate_enqueue_test() ->
- Cid = {<<"duplicate_enqueue_test">>, self()},
- {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
- {State2, Effects2} = enq(2, 1, first, State1),
- ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
- {_State3, Effects3} = enq(3, 1, first, State2),
- ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects3),
- ok.
-
-return_non_existent_test() ->
- Cid = {<<"cid">>, self()},
- {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)),
- % return non-existent
- {_State2, _} = apply(meta(3), make_return(Cid, [99]), State0),
- ok.
-
-return_checked_out_test() ->
- Cid = {<<"cid">>, self()},
- {State0, [_, _]} = enq(1, 1, first, test_init(test)),
- {State1, [_Monitor,
- {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
- {aux, active} | _ ]} = check_auto(Cid, 2, State0),
- % returning immediately checks out the same message again
- {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, ra_event},
- {aux, active}]} =
- apply(meta(3), make_return(Cid, [MsgId]), State1),
- ok.
-
-return_checked_out_limit_test() ->
- Cid = {<<"cid">>, self()},
- Init = init(#{name => test,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(test, utf8)),
- release_cursor_interval => 0,
- delivery_limit => 1}),
- {State0, [_, _]} = enq(1, 1, first, Init),
- {State1, [_Monitor,
- {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
- {aux, active} | _ ]} = check_auto(Cid, 2, State0),
- % returning immediately checks out the same message again
- {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, ra_event},
- {aux, active}]} =
- apply(meta(3), make_return(Cid, [MsgId]), State1),
- {#state{ra_indexes = RaIdxs}, ok, []} =
- apply(meta(4), make_return(Cid, [MsgId2]), State2),
- ?assertEqual(0, rabbit_fifo_index:size(RaIdxs)),
- ok.
-
-return_auto_checked_out_test() ->
- Cid = {<<"cid">>, self()},
- {State00, [_, _]} = enq(1, 1, first, test_init(test)),
- {State0, [_]} = enq(2, 2, second, State00),
- % it first active then inactive as the consumer took on but cannot take
- % any more
- {State1, [_Monitor,
- {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
- {aux, active},
- {aux, inactive}
- ]} = check_auto(Cid, 2, State0),
- % return should include another delivery
- {_State2, _, Effects} = apply(meta(3), make_return(Cid, [MsgId]), State1),
- ?ASSERT_EFF({send_msg, _,
- {delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _},
- Effects),
- ok.
-
-cancelled_checkout_out_test() ->
- Cid = {<<"cid">>, self()},
- {State00, [_, _]} = enq(1, 1, first, test_init(test)),
- {State0, [_]} = enq(2, 2, second, State00),
- {State1, _} = check_auto(Cid, 2, State0),
- % cancelled checkout should not return pending messages to queue
- {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1),
- ?assertEqual(1, maps:size(State2#state.messages)),
- ?assertEqual(0, lqueue:len(State2#state.returns)),
-
- {State3, {dequeue, empty}} =
- apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
- %% settle
- {State4, ok, _} =
- apply(meta(4), make_settle(Cid, [0]), State3),
-
- {_State, {dequeue, {_, {_, second}}, _}, _} =
- apply(meta(5), make_checkout(Cid, {dequeue, settled}, #{}), State4),
- ok.
-
-down_with_noproc_consumer_returns_unsettled_test() ->
- Cid = {<<"down_consumer_returns_unsettled_test">>, self()},
- {State0, [_, _]} = enq(1, 1, second, test_init(test)),
- {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0),
- {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1),
- {_State, Effects} = check(Cid, 4, State2),
- ?ASSERT_EFF({monitor, process, _}, Effects),
- ok.
-
-down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
- Pid = spawn(fun() -> ok end),
- Cid = {<<"down_with_noconnect">>, Pid},
- Self = self(),
- Node = node(Pid),
- {State0, Effects0} = enq(1, 1, second, test_init(test)),
- ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0),
- {State1, Effects1} = check_auto(Cid, 2, State0),
- #consumer{credit = 0} = maps:get(Cid, State1#state.consumers),
- ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1),
- % monitor both enqueuer and consumer
- % because we received a noconnection we now need to monitor the node
- {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- #consumer{credit = 1} = maps:get(Cid, State2a#state.consumers),
- %% validate consumer has credit
- {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a),
- ?ASSERT_EFF({monitor, node, _}, Effects2),
- ?assertNoEffect({demonitor, process, _}, Effects2),
- % when the node comes up we need to retry the process monitors for the
- % disconnected processes
- {_State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2),
- % try to re-monitor the suspect processes
- ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3),
- ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
- ok.
-
-down_with_noconnection_returns_unack_test() ->
- Pid = spawn(fun() -> ok end),
- Cid = {<<"down_with_noconnect">>, Pid},
- {State0, _} = enq(1, 1, second, test_init(test)),
- ?assertEqual(1, maps:size(State0#state.messages)),
- ?assertEqual(0, lqueue:len(State0#state.returns)),
- {State1, {_, _}} = deq(2, Cid, unsettled, State0),
- ?assertEqual(0, maps:size(State1#state.messages)),
- ?assertEqual(0, lqueue:len(State1#state.returns)),
- {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- ?assertEqual(0, maps:size(State2a#state.messages)),
- ?assertEqual(1, lqueue:len(State2a#state.returns)),
- ok.
-
-down_with_noproc_enqueuer_is_cleaned_up_test() ->
- State00 = test_init(test),
- Pid = spawn(fun() -> ok end),
- {State0, _, Effects0} = apply(meta(1), make_enqueue(Pid, 1, first), State00),
- ?ASSERT_EFF({monitor, process, _}, Effects0),
- {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0),
- % ensure there are no enqueuers
- ?assert(0 =:= maps:size(State1#state.enqueuers)),
- ok.
-
-discarded_message_without_dead_letter_handler_is_removed_test() ->
- Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
- {State0, [_, _]} = enq(1, 1, first, test_init(test)),
- {State1, Effects1} = check_n(Cid, 2, 10, State0),
- ?ASSERT_EFF({send_msg, _,
- {delivery, _, [{0, {#{}, first}}]}, _},
- Effects1),
- {_State2, _, Effects2} = apply(meta(1), make_discard(Cid, [0]), State1),
- ?assertNoEffect({send_msg, _,
- {delivery, _, [{0, {#{}, first}}]}, _},
- Effects2),
- ok.
-
-discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
- Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
- State00 = init(#{name => test,
- queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
- dead_letter_handler =>
- {somemod, somefun, [somearg]}}),
- {State0, [_, _]} = enq(1, 1, first, State00),
- {State1, Effects1} = check_n(Cid, 2, 10, State0),
- ?ASSERT_EFF({send_msg, _,
- {delivery, _, [{0, {#{}, first}}]}, _},
- Effects1),
- {_State2, _, Effects2} = apply(meta(1), make_discard(Cid, [0]), State1),
- % assert mod call effect with appended reason and message
- ?ASSERT_EFF({mod_call, somemod, somefun, [somearg, [{rejected, first}]]},
- Effects2),
- ok.
-
-tick_test() ->
- Cid = {<<"c">>, self()},
- Cid2 = {<<"c2">>, self()},
- {S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)),
- {S1, _} = enq(2, 2, <<"snd">>, S0),
- {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1),
- {S3, {_, _}} = deq(4, Cid2, unsettled, S2),
- {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), S3),
-
- [{mod_call, _, _,
- [#resource{},
- {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = tick(1, S4),
- ok.
-
-enq_deq_snapshot_recover_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_enqueue(self(), 1, one),
- make_enqueue(self(), 2, two),
- make_checkout(Cid, {dequeue, settled}, #{}),
- make_enqueue(self(), 3, three),
- make_enqueue(self(), 4, four),
- make_checkout(Cid, {dequeue, settled}, #{}),
- make_enqueue(self(), 5, five),
- make_checkout(Cid, {dequeue, settled}, #{})
- ],
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-enq_deq_settle_snapshot_recover_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- % OthPid = spawn(fun () -> ok end),
- % Oth = {<<"oth">>, OthPid},
- Commands = [
- make_enqueue(self(), 1, one),
- make_enqueue(self(), 2, two),
- make_checkout(Cid, {dequeue, unsettled}, #{}),
- make_settle(Cid, [0])
- ],
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-enq_deq_settle_snapshot_recover_2_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- OthPid = spawn(fun () -> ok end),
- Oth = {<<"oth">>, OthPid},
- Commands = [
- make_enqueue(self(), 1, one),
- make_enqueue(self(), 2, two),
- make_checkout(Cid, {dequeue, unsettled}, #{}),
- make_settle(Cid, [0]),
- make_enqueue(self(), 3, two),
- make_checkout(Cid, {dequeue, unsettled}, #{}),
- make_settle(Oth, [0])
- ],
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-snapshot_recover_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_checkout(Cid, {auto, 2, simple_prefetch}, #{}),
- make_enqueue(self(), 1, one),
- make_enqueue(self(), 2, two),
- make_enqueue(self(), 3, three),
- make_purge()
- ],
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-enq_deq_return_settle_snapshot_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_enqueue(self(), 1, one), %% to Cid
- make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
- make_return(Cid, [0]), %% should be re-delivered to Cid
- make_enqueue(self(), 2, two), %% Cid prefix_msg_count: 2
- make_settle(Cid, [1]),
- make_settle(Cid, [2])
- ],
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-return_prefix_msg_count_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_enqueue(self(), 1, one),
- make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
- make_checkout(Cid, cancel, #{}),
- make_enqueue(self(), 2, two) %% Cid prefix_msg_count: 2
- ],
- Indexes = lists:seq(1, length(Commands)),
- Entries = lists:zip(Indexes, Commands),
- {_State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries),
- ok.
-
-
-return_settle_snapshot_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_enqueue(self(), 1, one), %% to Cid
- make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
- make_return(Cid, [0]), %% should be re-delivered to Oth
- make_enqueue(self(), 2, two), %% Cid prefix_msg_count: 2
- make_settle(Cid, [1]),
- make_return(Cid, [2]),
- make_settle(Cid, [3]),
- make_enqueue(self(), 3, three),
- make_purge(),
- make_enqueue(self(), 4, four)
- ],
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-enq_check_settle_snapshot_recover_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_checkout(Cid, {auto, 2, simple_prefetch}, #{}),
- make_enqueue(self(), 1, one),
- make_enqueue(self(), 2, two),
- make_settle(Cid, [1]),
- make_settle(Cid, [0]),
- make_enqueue(self(), 3, three),
- make_settle(Cid, [2])
- ],
- % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-enq_check_settle_snapshot_purge_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_checkout(Cid, {auto, 2, simple_prefetch},#{}),
- make_enqueue(self(), 1, one),
- make_enqueue(self(), 2, two),
- make_settle(Cid, [1]),
- make_settle(Cid, [0]),
- make_enqueue(self(), 3, three),
- make_purge()
- ],
- % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-enq_check_settle_duplicate_test() ->
- %% duplicate settle commands are likely
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_checkout(Cid, {auto, 2, simple_prefetch}, #{}),
- make_enqueue(self(), 1, one), %% 0
- make_enqueue(self(), 2, two), %% 0
- make_settle(Cid, [0]),
- make_settle(Cid, [1]),
- make_settle(Cid, [1]),
- make_enqueue(self(), 3, three),
- make_settle(Cid, [2])
- ],
- % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
- run_snapshot_test(?FUNCTION_NAME, Commands).
-
-run_snapshot_test(Name, Commands) ->
- %% create every incremental permutation of the commands lists
- %% and run the snapshot tests against that
- [begin
- run_snapshot_test0(Name, C)
- end || C <- prefixes(Commands, 1, [])].
-
-run_snapshot_test0(Name, Commands) ->
- Indexes = lists:seq(1, length(Commands)),
- Entries = lists:zip(Indexes, Commands),
- {State, Effects} = run_log(test_init(Name), Entries),
-
- [begin
- Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
- (_) -> false
- end, Entries),
- {S, _} = run_log(SnapState, Filtered),
- % assert log can be restored from any release cursor index
- ?assertEqual(normalize(State), normalize(S))
- end || {release_cursor, SnapIdx, SnapState} <- Effects],
- ok.
-
-prefixes(Source, N, Acc) when N > length(Source) ->
- lists:reverse(Acc);
-prefixes(Source, N, Acc) ->
- {X, _} = lists:split(N, Source),
- prefixes(Source, N+1, [X | Acc]).
-
-delivery_query_returns_deliveries_test() ->
- Tag = atom_to_binary(?FUNCTION_NAME, utf8),
- Cid = {Tag, self()},
- Commands = [
- make_checkout(Cid, {auto, 5, simple_prefetch}, #{}),
- make_enqueue(self(), 1, one),
- make_enqueue(self(), 2, two),
- make_enqueue(self(), 3, tre),
- make_enqueue(self(), 4, for)
- ],
- Indexes = lists:seq(1, length(Commands)),
- Entries = lists:zip(Indexes, Commands),
- {State, _Effects} = run_log(test_init(help), Entries),
- % 3 deliveries are returned
- [{0, {#{}, one}}] = get_checked_out(Cid, 0, 0, State),
- [_, _, _] = get_checked_out(Cid, 1, 3, State),
- ok.
-
-pending_enqueue_is_enqueued_on_down_test() ->
- Cid = {<<"cid">>, self()},
- Pid = self(),
- {State0, _} = enq(1, 2, first, test_init(test)),
- {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0),
- {_State2, {dequeue, {0, {_, first}}, 0}, _} =
- apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1),
- ok.
-
-duplicate_delivery_test() ->
- {State0, _} = enq(1, 1, first, test_init(test)),
- {#state{ra_indexes = RaIdxs,
- messages = Messages}, _} = enq(2, 1, first, State0),
- ?assertEqual(1, rabbit_fifo_index:size(RaIdxs)),
- ?assertEqual(1, maps:size(Messages)),
- ok.
-
-state_enter_test() ->
- S0 = init(#{name => the_name,
- queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
- become_leader_handler => {m, f, [a]}}),
- [{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
- ok.
-
-state_enter_monitors_and_notifications_test() ->
- Oth = spawn(fun () -> ok end),
- {State0, _} = enq(1, 1, first, test_init(test)),
- Cid = {<<"adf">>, self()},
- OthCid = {<<"oth">>, Oth},
- {State1, _} = check(Cid, 2, State0),
- {State, _} = check(OthCid, 3, State1),
- Self = self(),
- Effects = state_enter(leader, State),
-
- %% monitor all enqueuers and consumers
- [{monitor, process, Self},
- {monitor, process, Oth}] =
- lists:filter(fun ({monitor, process, _}) -> true;
- (_) -> false
- end, Effects),
- [{send_msg, Self, leader_change, ra_event},
- {send_msg, Oth, leader_change, ra_event}] =
- lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true;
- (_) -> false
- end, Effects),
- ?ASSERT_EFF({monitor, process, _}, Effects),
- ok.
-
-purge_test() ->
- Cid = {<<"purge_test">>, self()},
- {State1, _} = enq(1, 1, first, test_init(test)),
- {State2, {purge, 1}, _} = apply(meta(2), make_purge(), State1),
- {State3, _} = enq(3, 2, second, State2),
- % get returns a reply value
- {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} =
- apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), State3),
- ok.
-
-purge_with_checkout_test() ->
- Cid = {<<"purge_test">>, self()},
- {State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)),
- {State1, _} = enq(2, 1, <<"first">>, State0),
- {State2, _} = enq(3, 2, <<"second">>, State1),
- %% assert message bytes are non zero
- ?assert(State2#state.msg_bytes_checkout > 0),
- ?assert(State2#state.msg_bytes_enqueue > 0),
- {State3, {purge, 1}, _} = apply(meta(2), make_purge(), State2),
- ?assert(State2#state.msg_bytes_checkout > 0),
- ?assertEqual(0, State3#state.msg_bytes_enqueue),
- ?assertEqual(1, rabbit_fifo_index:size(State3#state.ra_indexes)),
- #consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers),
- ?assertEqual(1, maps:size(Checked)),
- ok.
-
-down_returns_checked_out_in_order_test() ->
- S0 = test_init(?FUNCTION_NAME),
- %% enqueue 100
- S1 = lists:foldl(fun (Num, FS0) ->
- {FS, _} = enq(Num, Num, Num, FS0),
- FS
- end, S0, lists:seq(1, 100)),
- ?assertEqual(100, maps:size(S1#state.messages)),
- Cid = {<<"cid">>, self()},
- {S2, _} = check(Cid, 101, 1000, S1),
- #consumer{checked_out = Checked} = maps:get(Cid, S2#state.consumers),
- ?assertEqual(100, maps:size(Checked)),
- %% simulate down
- {S, _, _} = apply(meta(102), {down, self(), noproc}, S2),
- Returns = lqueue:to_list(S#state.returns),
- ?assertEqual(100, length(Returns)),
- %% validate returns are in order
- ?assertEqual(lists:sort(Returns), Returns),
- ok.
-
-single_active_consumer_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
- ?assertEqual(single_active, State0#state.cfg#cfg.consumer_strategy),
- ?assertEqual(0, map_size(State0#state.consumers)),
-
- % adding some consumers
- AddConsumer = fun(CTag, State) ->
- {NewState, _, _} = apply(
- meta(1),
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, self()}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
-
- % the first registered consumer is the active one, the others are waiting
- ?assertEqual(1, map_size(State1#state.consumers)),
- ?assert(maps:is_key({<<"ctag1">>, self()}, State1#state.consumers)),
- ?assertEqual(3, length(State1#state.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#state.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#state.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)),
-
- % cancelling a waiting consumer
- {State2, _, Effects1} = apply(meta(2),
- #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
- % the active consumer should still be in place
- ?assertEqual(1, map_size(State2#state.consumers)),
- ?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)),
- % the cancelled consumer has been removed from waiting consumers
- ?assertEqual(2, length(State2#state.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#state.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#state.waiting_consumers)),
- % there are some effects to unregister the consumer
- ?assertEqual(1, length(Effects1)),
-
- % cancelling the active consumer
- {State3, _, Effects2} = apply(meta(3),
- #checkout{spec = cancel,
- consumer_id = {<<"ctag1">>, self()}},
- State2),
- % the second registered consumer is now the active one
- ?assertEqual(1, map_size(State3#state.consumers)),
- ?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)),
- % the new active consumer is no longer in the waiting list
- ?assertEqual(1, length(State3#state.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)),
- % there are some effects to unregister the consumer and to update the new active one (metrics)
- ?assertEqual(2, length(Effects2)),
-
- % cancelling the active consumer
- {State4, _, Effects3} = apply(meta(4), #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
- % the last waiting consumer became the active one
- ?assertEqual(1, map_size(State4#state.consumers)),
- ?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)),
- % the waiting consumer list is now empty
- ?assertEqual(0, length(State4#state.waiting_consumers)),
- % there are some effects to unregister the consumer and to update the new active one (metrics)
- ?assertEqual(2, length(Effects3)),
-
- % cancelling the last consumer
- {State5, _, Effects4} = apply(meta(5), #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
- % no active consumer anymore
- ?assertEqual(0, map_size(State5#state.consumers)),
- % still nothing in the waiting list
- ?assertEqual(0, length(State5#state.waiting_consumers)),
- % there is an effect to unregister the consumer + queue inactive effect
- ?assertEqual(1 + 1, length(Effects4)),
-
- ok.
-
-single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
-
- DummyFunction = fun() -> ok end,
- Pid1 = spawn(DummyFunction),
- Pid2 = spawn(DummyFunction),
- Pid3 = spawn(DummyFunction),
-
- % adding some consumers
- AddConsumer = fun({CTag, ChannelId}, State) ->
- {NewState, _, _} = apply(
- #{index => 1},
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, ChannelId}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
-
- % the channel of the active consumer goes down
- {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1),
- % fell back to another consumer
- ?assertEqual(1, map_size(State2#state.consumers)),
- % there are still waiting consumers
- ?assertEqual(2, length(State2#state.waiting_consumers)),
- % effects to unregister the consumer and
- % to update the new active one (metrics) are there
- ?assertEqual(2, length(Effects)),
-
- % the channel of the active consumer and a waiting consumer goes down
- {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2),
- % fell back to another consumer
- ?assertEqual(1, map_size(State3#state.consumers)),
- % no more waiting consumer
- ?assertEqual(0, length(State3#state.waiting_consumers)),
- % effects to cancel both consumers of this channel + effect to update the new active one (metrics)
- ?assertEqual(3, length(Effects2)),
-
- % the last channel goes down
- {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
- % no more consumers
- ?assertEqual(0, map_size(State4#state.consumers)),
- ?assertEqual(0, length(State4#state.waiting_consumers)),
- % there is an effect to unregister the consumer + queue inactive effect
- ?assertEqual(1 + 1, length(Effects3)),
-
- ok.
-
-single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnection_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
-
- Meta = #{index => 1},
- % adding some consumers
- AddConsumer = fun(CTag, State) ->
- {NewState, _, _} = apply(
- Meta,
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, self()}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0,
- [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
-
- % simulate node goes down
- {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1),
-
- % all the waiting consumers should be suspected down
- ?assertEqual(3, length(State2#state.waiting_consumers)),
- lists:foreach(fun({_, #consumer{status = Status}}) ->
- ?assert(Status == suspected_down)
- end, State2#state.waiting_consumers),
-
- % simulate node goes back up
- {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2),
-
- % all the waiting consumers should be un-suspected
- ?assertEqual(3, length(State3#state.waiting_consumers)),
- lists:foreach(fun({_, #consumer{status = Status}}) ->
- ?assert(Status /= suspected_down)
- end, State3#state.waiting_consumers),
-
- ok.
-
-single_active_consumer_state_enter_leader_include_waiting_consumers_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
-
- DummyFunction = fun() -> ok end,
- Pid1 = spawn(DummyFunction),
- Pid2 = spawn(DummyFunction),
- Pid3 = spawn(DummyFunction),
-
- Meta = #{index => 1},
- % adding some consumers
- AddConsumer = fun({CTag, ChannelId}, State) ->
- {NewState, _, _} = apply(
- Meta,
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, ChannelId}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
-
- Effects = state_enter(leader, State1),
- % 2 effects for each consumer process (channel process), 1 effect for the node
- ?assertEqual(2 * 3 + 1, length(Effects)).
-
-single_active_consumer_state_enter_eol_include_waiting_consumers_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
-
- DummyFunction = fun() -> ok end,
- Pid1 = spawn(DummyFunction),
- Pid2 = spawn(DummyFunction),
- Pid3 = spawn(DummyFunction),
-
- Meta = #{index => 1},
- % adding some consumers
- AddConsumer = fun({CTag, ChannelId}, State) ->
- {NewState, _, _} = apply(
- Meta,
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, ChannelId}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
-
- Effects = state_enter(eol, State1),
- % 1 effect for each consumer process (channel process)
- ?assertEqual(3, length(Effects)).
-
-query_consumers_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => false}),
-
- % adding some consumers
- AddConsumer = fun(CTag, State) ->
- {NewState, _, _} = apply(
- #{index => 1},
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, self()}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
- Consumers0 = State1#state.consumers,
- Consumer = maps:get({<<"ctag2">>, self()}, Consumers0),
- Consumers1 = maps:put({<<"ctag2">>, self()},
- Consumer#consumer{status = suspected_down}, Consumers0),
- State2 = State1#state{consumers = Consumers1},
-
- ?assertEqual(4, query_consumer_count(State2)),
- Consumers2 = query_consumers(State2),
- ?assertEqual(4, maps:size(Consumers2)),
- maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
- ?assertEqual(self(), Pid),
- case Tag of
- <<"ctag2">> ->
- ?assertNot(Active),
- ?assertEqual(suspected_down, ActivityStatus);
- _ ->
- ?assert(Active),
- ?assertEqual(up, ActivityStatus)
- end
- end, [], Consumers2).
-
-query_consumers_when_single_active_consumer_is_on_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
- Meta = #{index => 1},
- % adding some consumers
- AddConsumer = fun(CTag, State) ->
- {NewState, _, _} = apply(
- Meta,
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, self()}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
-
- ?assertEqual(4, query_consumer_count(State1)),
- Consumers = query_consumers(State1),
- ?assertEqual(4, maps:size(Consumers)),
- maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
- ?assertEqual(self(), Pid),
- case Tag of
- <<"ctag1">> ->
- ?assert(Active),
- ?assertEqual(single_active, ActivityStatus);
- _ ->
- ?assertNot(Active),
- ?assertEqual(waiting, ActivityStatus)
- end
- end, [], Consumers).
-
-active_flag_updated_when_consumer_suspected_unsuspected_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => false}),
-
- DummyFunction = fun() -> ok end,
- Pid1 = spawn(DummyFunction),
- Pid2 = spawn(DummyFunction),
- Pid3 = spawn(DummyFunction),
-
- % adding some consumers
- AddConsumer = fun({CTag, ChannelId}, State) ->
- {NewState, _, _} = apply(
- #{index => 1},
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, ChannelId}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
-
- {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
- % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
- ?assertEqual(4 + 1, length(Effects2)),
-
- {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
- % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID
- ?assertEqual(4 + 4, length(Effects3)).
-
-active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test() ->
- State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
-
- DummyFunction = fun() -> ok end,
- Pid1 = spawn(DummyFunction),
- Pid2 = spawn(DummyFunction),
- Pid3 = spawn(DummyFunction),
-
- % adding some consumers
- AddConsumer = fun({CTag, ChannelId}, State) ->
- {NewState, _, _} = apply(
- #{index => 1},
- #checkout{spec = {once, 1, simple_prefetch},
- meta = #{},
- consumer_id = {CTag, ChannelId}},
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
-
- {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
- % only 1 effect to monitor the node
- ?assertEqual(1, length(Effects2)),
-
- {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
- % for each consumer: 1 effect to monitor the consumer PID
- ?assertEqual(4, length(Effects3)).
-
-meta(Idx) ->
- #{index => Idx, term => 1}.
-
-enq(Idx, MsgSeq, Msg, State) ->
- strip_reply(
- apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), State)).
-
-deq(Idx, Cid, Settlement, State0) ->
- {State, {dequeue, {MsgId, Msg}, _}, _} =
- apply(meta(Idx),
- make_checkout(Cid, {dequeue, Settlement}, #{}),
- State0),
- {State, {MsgId, Msg}}.
-
-check_n(Cid, Idx, N, State) ->
- strip_reply(
- apply(meta(Idx),
- make_checkout(Cid, {auto, N, simple_prefetch}, #{}),
- State)).
-
-check(Cid, Idx, State) ->
- strip_reply(
- apply(meta(Idx),
- make_checkout(Cid, {once, 1, simple_prefetch}, #{}),
- State)).
-
-check_auto(Cid, Idx, State) ->
- strip_reply(
- apply(meta(Idx),
- make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
- State)).
-
-check(Cid, Idx, Num, State) ->
- strip_reply(
- apply(meta(Idx),
- make_checkout(Cid, {auto, Num, simple_prefetch}, #{}),
- State)).
-
-settle(Cid, Idx, MsgId, State) ->
- strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), State)).
-
-credit(Cid, Idx, Credit, DelCnt, Drain, State) ->
- strip_reply(apply(meta(Idx), make_credit(Cid, Credit, DelCnt, Drain),
- State)).
-
-strip_reply({State, _, Effects}) ->
- {State, Effects}.
-
-run_log(InitState, Entries) ->
- lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) ->
- case apply(meta(Idx), E, Acc0) of
- {Acc, _, Efx} when is_list(Efx) ->
- {Acc, Efx0 ++ Efx};
- {Acc, _, Efx} ->
- {Acc, Efx0 ++ [Efx]};
- {Acc, _} ->
- {Acc, Efx0}
- end
- end, {InitState, []}, Entries).
-
-
-%% AUX Tests
-
-aux_test() ->
- _ = ra_machine_ets:start_link(),
- Aux0 = init_aux(aux_test),
- MacState = init(#{name => aux_test,
- queue_resource =>
- rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
- Log = undefined,
- {no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0,
- Log, MacState),
- {no_reply, _Aux, undefined} = handle_aux(leader, cast, emit, Aux,
- Log, MacState),
- [X] = ets:lookup(rabbit_fifo_usage, aux_test),
- ?assert(X > 0.0),
- ok.
-
-
--endif.
-
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
new file mode 100644
index 0000000000..42fc5d0b10
--- /dev/null
+++ b/src/rabbit_fifo.hrl
@@ -0,0 +1,172 @@
+
+-type raw_msg() :: term().
+%% The raw message. It is opaque to rabbit_fifo.
+
+-type msg_in_id() :: non_neg_integer().
+% a queue scoped monotonically incrementing integer used to enforce order
+% in the unassigned messages map
+
+-type msg_id() :: non_neg_integer().
+%% A consumer-scoped monotonically incrementing integer included with a
+%% {@link delivery/0.}. Used to settle deliveries using
+%% {@link rabbit_fifo_client:settle/3.}
+
+-type msg_seqno() :: non_neg_integer().
+%% A sender process scoped monotonically incrementing integer included
+%% in enqueue messages. Used to ensure ordering of messages send from the
+%% same process
+
+-type msg_header() :: #{size := msg_size(),
+ delivery_count => non_neg_integer()}.
+%% The message header map:
+%% delivery_count: the number of unsuccessful delivery attempts.
+%% A non-zero value indicates a previous attempt.
+
+-type msg() :: {msg_header(), raw_msg()}.
+%% message with a header map.
+
+-type msg_size() :: non_neg_integer().
+%% the size in bytes of the msg payload
+
+-type indexed_msg() :: {ra_index(), msg()}.
+
+-type prefix_msg() :: {'$prefix_msg', msg_header()}.
+
+-type delivery_msg() :: {msg_id(), msg()}.
+%% A tuple consisting of the message id and the headered message.
+
+-type consumer_tag() :: binary().
+%% An arbitrary binary tag used to distinguish between different consumers
+%% set up by the same process. See: {@link rabbit_fifo_client:checkout/3.}
+
+-type delivery() :: {delivery, consumer_tag(), [delivery_msg()]}.
+%% Represents the delivery of one or more rabbit_fifo messages.
+
+-type consumer_id() :: {consumer_tag(), pid()}.
+%% The entity that receives messages. Uniquely identifies a consumer.
+
+-type credit_mode() :: simple_prefetch | credited.
+%% determines how credit is replenished
+
+-type checkout_spec() :: {once | auto, Num :: non_neg_integer(),
+ credit_mode()} |
+ {dequeue, settled | unsettled} |
+ cancel.
+
+-type consumer_meta() :: #{ack => boolean(),
+ username => binary(),
+ prefetch => non_neg_integer(),
+ args => list()}.
+%% static meta data associated with a consumer
+
+
+-type applied_mfa() :: {module(), atom(), list()}.
+% represents a partially applied module call
+
+-define(RELEASE_CURSOR_EVERY, 64000).
+-define(USE_AVG_HALF_LIFE, 10000.0).
+
+-record(consumer,
+ {meta = #{} :: consumer_meta(),
+ checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}},
+ next_msg_id = 0 :: msg_id(), % part of snapshot data
+ %% max number of messages that can be sent
+ %% decremented for each delivery
+ credit = 0 : non_neg_integer(),
+ %% total number of checked out messages - ever
+ %% incremented for each delivery
+ delivery_count = 0 :: non_neg_integer(),
+ %% the mode of how credit is incremented
+ %% simple_prefetch: credit is re-filled as deliveries are settled
+ %% or returned.
+ %% credited: credit can only be changed by receiving a consumer_credit
+ %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
+ credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
+ lifetime = once :: once | auto,
+ status = up :: up | suspected_down | cancelled
+ }).
+
+-type consumer() :: #consumer{}.
+
+-record(enqueuer,
+ {next_seqno = 1 :: msg_seqno(),
+ % out of order enqueues - sorted list
+ pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}],
+ status = up :: up | suspected_down
+ }).
+
+-record(cfg,
+ {name :: atom(),
+ resource :: rabbit_types:r('queue'),
+ release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(),
+ dead_letter_handler :: maybe(applied_mfa()),
+ become_leader_handler :: maybe(applied_mfa()),
+ max_length :: maybe(non_neg_integer()),
+ max_bytes :: maybe(non_neg_integer()),
+ %% whether single active consumer is on or not for this queue
+ consumer_strategy = default :: default | single_active,
+ delivery_limit :: maybe(non_neg_integer())
+ }).
+
+-record(rabbit_fifo,
+ {cfg :: #cfg{},
+ % unassigned messages
+ messages = #{} :: #{msg_in_id() => indexed_msg()},
+ % defines the lowest message in id available in the messages map
+ % that isn't a return
+ low_msg_num :: msg_in_id() | undefined,
+ % defines the next message in id to be added to the messages map
+ next_msg_num = 1 :: msg_in_id(),
+ % list of returned msg_in_ids - when checking out it picks from
+ % this list first before taking low_msg_num
+ returns = lqueue:new() :: lqueue:lqueue(prefix_msg() |
+ {msg_in_id(), indexed_msg()}),
+ % a counter of enqueues - used to trigger shadow copy points
+ enqueue_count = 0 :: non_neg_integer(),
+ % a map containing all the live processes that have ever enqueued
+ % a message to this queue as well as a cached value of the smallest
+ % ra_index of all pending enqueues
+ enqueuers = #{} :: #{pid() => #enqueuer{}},
+ % master index of all enqueue raft indexes including pending
+ % enqueues
+ % rabbit_fifo_index can be slow when calculating the smallest
+ % index when there are large gaps but should be faster than gb_trees
+ % for normal appending operations as it's backed by a map
+ ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
+ release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor,
+ ra_index(), #rabbit_fifo{}}),
+ % consumers need to reflect consumer state at time of snapshot
+ % needs to be part of snapshot
+ consumers = #{} :: #{consumer_id() => #consumer{}},
+ % consumers that require further service are queued here
+ % needs to be part of snapshot
+ service_queue = queue:new() :: queue:queue(consumer_id()),
+ %% This is a special field that is only used for snapshots
+ %% It represents the queued messages at the time the
+ %% dehydrated snapshot state was cached.
+ %% As release_cursors are only emitted for raft indexes where all
+ %% prior messages no longer contribute to the current state we can
+ %% replace all message payloads with their sizes (to be used for
+ %% overflow calculations).
+ %% This is done so that consumers are still served in a deterministic
+ %% order on recovery.
+ prefix_msgs = {[], []} :: {Return :: [msg_header()],
+ PrefixMsgs :: [msg_header()]},
+ msg_bytes_enqueue = 0 :: non_neg_integer(),
+ msg_bytes_checkout = 0 :: non_neg_integer(),
+ %% waiting consumers, one is picked active consumer is cancelled or dies
+ %% used only when single active consumer is on
+ waiting_consumers = [] :: [{consumer_id(), consumer()}]
+ }).
+
+
+
+-type config() :: #{name := atom(),
+ queue_resource := rabbit_types:r('queue'),
+ dead_letter_handler => applied_mfa(),
+ become_leader_handler => applied_mfa(),
+ release_cursor_interval => non_neg_integer(),
+ max_length => non_neg_integer(),
+ max_bytes => non_neg_integer(),
+ single_active_consumer_on => boolean(),
+ delivery_limit => non_neg_integer()}.