summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/rabbit/src/rabbit_fifo.erl53
-rw-r--r--deps/rabbit/src/rabbit_fifo.hrl4
-rw-r--r--deps/rabbit/src/rabbit_fifo_client.erl62
-rw-r--r--deps/rabbit/src/rabbit_fifo_v1.erl352
-rw-r--r--deps/rabbit/src/rabbit_fifo_v1.hrl5
-rw-r--r--deps/rabbit/test/rabbit_fifo_prop_SUITE.erl16
6 files changed, 231 insertions, 261 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index c7cbb55aa6..59e3962278 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -684,6 +684,7 @@ convert_v1_to_v2(V1State) ->
max_in_memory_bytes = rabbit_fifo_v1:get_cfg_field(max_in_memory_bytes, V1State),
expires = rabbit_fifo_v1:get_cfg_field(expires, V1State)
},
+
#?MODULE{
cfg = Cfg,
messages = MessagesV2,
@@ -713,16 +714,8 @@ purge_node(Meta, Node, State, Effects) ->
%% any downs that re not noconnection
handle_down(Meta, Pid, #?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
- % Remove any enqueuer for the same pid and enqueue any pending messages
- % This should be ok as we won't see any more enqueues from this pid
- State1 = case maps:take(Pid, Enqs0) of
- {#enqueuer{pending = Pend}, Enqs} ->
- lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
- enqueue(RIdx, RawMsg, S)
- end, State0#?MODULE{enqueuers = Enqs}, Pend);
- error ->
- State0
- end,
+ % Remove any enqueuer for the down pid
+ State1 = State0#?MODULE{enqueuers = maps:remove(Pid, Enqs0)},
{Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
% return checked out messages to main queue
% Find the consumers for the down pid
@@ -1343,6 +1336,8 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
State2 = incr_enqueue_count(incr_total(State1)),
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
{maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
+ {out_of_sequence, State, Effects} ->
+ {State, not_enqueued, Effects};
{duplicate, State, Effects} ->
{State, ok, Effects}
end.
@@ -1433,50 +1428,30 @@ maybe_store_dehydrated_state(RaftIdx,
maybe_store_dehydrated_state(_RaftIdx, State) ->
State.
-enqueue_pending(From,
- #enqueuer{next_seqno = Next,
- pending = [{Next, RaftIdx, RawMsg} | Pending]} = Enq0,
- State0) ->
- State = enqueue(RaftIdx, RawMsg, State0),
- Enq = Enq0#enqueuer{next_seqno = Next + 1, pending = Pending},
- enqueue_pending(From, Enq, State);
-enqueue_pending(From, Enq, #?MODULE{enqueuers = Enqueuers0} = State) ->
- State#?MODULE{enqueuers = Enqueuers0#{From => Enq}}.
-
maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) ->
% direct enqueue without tracking
State = enqueue(RaftIdx, RawMsg, State0),
{ok, State, Effects};
maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
- #?MODULE{enqueuers = Enqueuers0,
- ra_indexes = Indexes0} = State0) ->
-
+ #?MODULE{enqueuers = Enqueuers0} = State0) ->
case maps:get(From, Enqueuers0, undefined) of
undefined ->
State1 = State0#?MODULE{enqueuers = Enqueuers0#{From => #enqueuer{}}},
- {ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo,
- RawMsg, Effects0, State1),
- {ok, State, [{monitor, process, From} | Effects]};
+ {Res, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo,
+ RawMsg, Effects0, State1),
+ {Res, State, [{monitor, process, From} | Effects]};
#enqueuer{next_seqno = MsgSeqNo} = Enq0 ->
% it is the next expected seqno
State1 = enqueue(RaftIdx, RawMsg, State0),
Enq = Enq0#enqueuer{next_seqno = MsgSeqNo + 1},
- State = enqueue_pending(From, Enq, State1),
+ State = State1#?MODULE{enqueuers = Enqueuers0#{From => Enq}},
{ok, State, Effects0};
- #enqueuer{next_seqno = Next,
- pending = Pending0} = Enq0
+ #enqueuer{next_seqno = Next}
when MsgSeqNo > Next ->
- % out of order delivery
- Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0],
- Enq = Enq0#enqueuer{pending = lists:sort(Pending)},
- %% if the enqueue it out of order we need to mark it in the
- %% index
- Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0),
- {ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq},
- ra_indexes = Indexes}, Effects0};
+ %% TODO: when can this happen?
+ {out_of_sequence, State0, Effects0};
#enqueuer{next_seqno = Next} when MsgSeqNo =< Next ->
- % duplicate delivery - remove the raft index from the ra_indexes
- % map as it was added earlier
+ % duplicate delivery
{duplicate, State0, Effects0}
end.
diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl
index 95e7003460..a8d539fa1a 100644
--- a/deps/rabbit/src/rabbit_fifo.hrl
+++ b/deps/rabbit/src/rabbit_fifo.hrl
@@ -123,9 +123,7 @@
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
unused,
- pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
- status = up :: up |
- suspected_down,
+ status = up :: up | suspected_down,
%% it is useful to have a record of when this was blocked
%% so that we can retry sending the block effect if
%% the publisher did not receive the initial one
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl
index e102ad2207..5947f6ddae 100644
--- a/deps/rabbit/src/rabbit_fifo_client.erl
+++ b/deps/rabbit/src/rabbit_fifo_client.erl
@@ -40,8 +40,6 @@
-define(TIMER_TIME, 10000).
-type seq() :: non_neg_integer().
-%% last_applied is initialised to -1
--type maybe_seq() :: integer().
-type action() :: {send_credit_reply, Available :: non_neg_integer()} |
{send_drained, CTagCredit ::
{rabbit_fifo:consumer_tag(), non_neg_integer()}}.
@@ -65,10 +63,6 @@
leader :: undefined | ra:server_id(),
queue_status :: undefined | go | reject_publish,
next_seq = 0 :: seq(),
- %% Last applied is initialise to -1 to note that no command has yet been
- %% applied, but allowing to resend messages if the first ones on the sequence
- %% are lost (messages are sent from last_applied + 1)
- last_applied = -1 :: maybe_seq(),
next_enqueue_seq = 1 :: seq(),
%% indicates that we've exceeded the soft limit
slow = false :: boolean(),
@@ -585,18 +579,26 @@ handle_ra_event(Leader, {machine, leader_change},
#state{leader = Leader} = State) ->
%% leader already known
{ok, State, []};
-handle_ra_event(Leader, {machine, leader_change}, State0) ->
+handle_ra_event(Leader, {machine, leader_change},
+ #state{leader = OldLeader} = State0) ->
%% we need to update leader
%% and resend any pending commands
+ rabbit_log:debug("~s: Detected QQ leader change from ~w to ~w",
+ [?MODULE, OldLeader, Leader]),
+ State = resend_all_pending(State0#state{leader = Leader}),
+ {ok, State, []};
+handle_ra_event(_From, {rejected, {not_leader, Leader, _Seq}},
+ #state{leader = Leader} = State) ->
+ {ok, State, []};
+handle_ra_event(_From, {rejected, {not_leader, Leader, _Seq}},
+ #state{leader = OldLeader} = State0) ->
+ rabbit_log:debug("~s: Detected QQ leader change (rejection) from ~w to ~w",
+ [?MODULE, OldLeader, Leader]),
State = resend_all_pending(State0#state{leader = Leader}),
{ok, State, []};
-handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) ->
+handle_ra_event(_From, {rejected, {not_leader, _UndefinedMaybe, _Seq}}, State0) ->
% TODO: how should these be handled? re-sent on timer or try random
{ok, State0, []};
-handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
- State1 = State0#state{leader = Leader},
- State = resend(Seq, State1),
- {ok, State, []};
handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) ->
case find_leader(Servers) of
undefined ->
@@ -642,29 +644,27 @@ try_process_command([Server | Rem], Cmd, State) ->
try_process_command(Rem, Cmd, State)
end.
-seq_applied({Seq, MaybeAction},
- {Corrs, Actions0, #state{last_applied = Last} = State0})
- when Seq > Last ->
- State1 = do_resends(Last+1, Seq-1, State0),
- {Actions, State} = maybe_add_action(MaybeAction, Actions0, State1),
+seq_applied({Seq, Response},
+ {Corrs, Actions0, #state{} = State0}) ->
+ %% sequences aren't guaranteed to be applied in order as enqueues are
+ %% low priority commands and may be overtaken by others with a normal priority.
+ {Actions, State} = maybe_add_action(Response, Actions0, State0),
case maps:take(Seq, State#state.pending) of
{{undefined, _}, Pending} ->
- {Corrs, Actions, State#state{pending = Pending,
- last_applied = Seq}};
- {{Corr, _}, Pending} ->
- {[Corr | Corrs], Actions, State#state{pending = Pending,
- last_applied = Seq}};
- error ->
- % must have already been resent or removed for some other reason
- % still need to update last_applied or we may inadvertently resend
- % stuff later
- {Corrs, Actions, State#state{last_applied = Seq}}
+ {Corrs, Actions, State#state{pending = Pending}};
+ {{Corr, _}, Pending}
+ when Response /= not_enqueued ->
+ {[Corr | Corrs], Actions, State#state{pending = Pending}};
+ _ ->
+ {Corrs, Actions, State#state{}}
end;
seq_applied(_Seq, Acc) ->
Acc.
maybe_add_action(ok, Acc, State) ->
{Acc, State};
+maybe_add_action(not_enqueued, Acc, State) ->
+ {Acc, State};
maybe_add_action({multi, Actions}, Acc0, State0) ->
lists:foldl(fun (Act, {Acc, State}) ->
maybe_add_action(Act, Acc, State)
@@ -681,10 +681,10 @@ maybe_add_action(Action, Acc, State) ->
%% anything else is assumed to be an action
{[Action | Acc], State}.
-do_resends(From, To, State) when From =< To ->
- lists:foldl(fun resend/2, State, lists:seq(From, To));
-do_resends(_, _, State) ->
- State.
+% do_resends(From, To, State) when From =< To ->
+% lists:foldl(fun resend/2, State, lists:seq(From, To));
+% do_resends(_, _, State) ->
+% State.
% resends a command with a new sequence number
resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
diff --git a/deps/rabbit/src/rabbit_fifo_v1.erl b/deps/rabbit/src/rabbit_fifo_v1.erl
index a59a5c9250..1859251f9f 100644
--- a/deps/rabbit/src/rabbit_fifo_v1.erl
+++ b/deps/rabbit/src/rabbit_fifo_v1.erl
@@ -113,7 +113,7 @@
-type client_msg() :: delivery().
%% the messages `rabbit_fifo' can send to consumers.
--opaque state() :: #?MODULE{}.
+-opaque state() :: #?STATE{}.
-export_type([protocol/0,
delivery/0,
@@ -133,7 +133,7 @@
-spec init(config()) -> state().
init(#{name := Name,
queue_resource := Resource} = Conf) ->
- update_config(Conf, #?MODULE{cfg = #cfg{name = Name,
+ update_config(Conf, #?STATE{cfg = #cfg{name = Name,
resource = Resource}}).
update_config(Conf, State) ->
@@ -153,11 +153,11 @@ update_config(Conf, State) ->
false ->
competing
end,
- Cfg = State#?MODULE.cfg,
+ Cfg = State#?STATE.cfg,
RCISpec = {RCI, RCI},
LastActive = maps:get(created, Conf, undefined),
- State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCISpec,
+ State#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCISpec,
dead_letter_handler = DLH,
become_leader_handler = BLH,
overflow_strategy = Overflow,
@@ -182,7 +182,7 @@ apply(Meta, #enqueue{pid = From, seq = Seq,
msg = RawMsg}, State00) ->
apply_enqueue(Meta, From, Seq, RawMsg, State00);
apply(_Meta, #register_enqueuer{pid = Pid},
- #?MODULE{enqueuers = Enqueuers0,
+ #?STATE{enqueuers = Enqueuers0,
cfg = #cfg{overflow_strategy = Overflow}} = State0) ->
State = case maps:is_key(Pid, Enqueuers0) of
@@ -190,7 +190,7 @@ apply(_Meta, #register_enqueuer{pid = Pid},
%% if the enqueuer exits just echo the overflow state
State0;
false ->
- State0#?MODULE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}}
+ State0#?STATE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}}
end,
Res = case is_over_limit(State) of
true when Overflow == reject_publish ->
@@ -201,7 +201,7 @@ apply(_Meta, #register_enqueuer{pid = Pid},
{State, Res, [{monitor, process, Pid}]};
apply(Meta,
#settle{msg_ids = MsgIds, consumer_id = ConsumerId},
- #?MODULE{consumers = Cons0} = State) ->
+ #?STATE{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0} ->
% need to increment metrics before completing as any snapshot
@@ -213,7 +213,7 @@ apply(Meta,
end;
apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
- #?MODULE{consumers = Cons0} = State0) ->
+ #?STATE{consumers = Cons0} = State0) ->
case Cons0 of
#{ConsumerId := Con0} ->
Discarded = maps:with(MsgIds, Con0#consumer.checked_out),
@@ -224,7 +224,7 @@ apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
{State0, ok}
end;
apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
- #?MODULE{consumers = Cons0} = State) ->
+ #?STATE{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := #consumer{checked_out = Checked0}} ->
Returned = maps:with(MsgIds, Checked0),
@@ -234,7 +234,7 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
end;
apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
drain = Drain, consumer_id = ConsumerId},
- #?MODULE{consumers = Cons0,
+ #?STATE{consumers = Cons0,
service_queue = ServiceQueue0,
waiting_consumers = Waiting0} = State0) ->
case Cons0 of
@@ -248,7 +248,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
Cons = maps:put(ConsumerId, Con1, Cons0),
{State1, ok, Effects} =
checkout(Meta, State0,
- State0#?MODULE{service_queue = ServiceQueue,
+ State0#?STATE{service_queue = ServiceQueue,
consumers = Cons}, [], false),
Response = {send_credit_reply, messages_ready(State1)},
%% by this point all checkouts for the updated credit value
@@ -259,16 +259,16 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{State1, Response, Effects};
true ->
Con = #consumer{credit = PostCred} =
- maps:get(ConsumerId, State1#?MODULE.consumers),
+ maps:get(ConsumerId, State1#?STATE.consumers),
%% add the outstanding credit to the delivery count
DeliveryCount = Con#consumer.delivery_count + PostCred,
Consumers = maps:put(ConsumerId,
Con#consumer{delivery_count = DeliveryCount,
credit = 0},
- State1#?MODULE.consumers),
+ State1#?STATE.consumers),
Drained = Con#consumer.credit,
{CTag, _} = ConsumerId,
- {State1#?MODULE{consumers = Consumers},
+ {State1#?STATE{consumers = Consumers},
%% returning a multi response with two client actions
%% for the channel to execute
{multi, [Response, {send_drained, {CTag, Drained}}]},
@@ -282,7 +282,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
%% grant the credit
C = max(0, RemoteDelCnt + NewCredit - DelCnt),
Con = Con0#consumer{credit = C},
- State = State0#?MODULE{waiting_consumers =
+ State = State0#?STATE{waiting_consumers =
[{ConsumerId, Con} | Waiting]},
{State, {send_credit_reply, messages_ready(State)}};
false ->
@@ -293,16 +293,16 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{State0, ok}
end;
apply(_, #checkout{spec = {dequeue, _}},
- #?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
+ #?STATE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
{State0, {error, {unsupported, single_active_consumer}}};
apply(#{index := Index,
system_time := Ts,
from := From} = Meta, #checkout{spec = {dequeue, Settlement},
meta = ConsumerMeta,
consumer_id = ConsumerId},
- #?MODULE{consumers = Consumers} = State00) ->
+ #?STATE{consumers = Consumers} = State00) ->
%% dequeue always updates last_active
- State0 = State00#?MODULE{last_active = Ts},
+ State0 = State00#?STATE{last_active = Ts},
%% all dequeue operations result in keeping the queue from expiring
Exists = maps:is_key(ConsumerId, Consumers),
case messages_ready(State0) of
@@ -361,7 +361,7 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, Priority, State0),
checkout(Meta, State0, State1, [{monitor, process, Pid}]);
apply(#{index := Index}, #purge{},
- #?MODULE{ra_indexes = Indexes0,
+ #?STATE{ra_indexes = Indexes0,
returns = Returns,
messages = Messages} = State0) ->
Total = messages_ready(State0),
@@ -370,7 +370,7 @@ apply(#{index := Index}, #purge{},
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1,
[I || {_, {I, _}} <- lqueue:to_list(Returns)]),
- State1 = State0#?MODULE{ra_indexes = Indexes,
+ State1 = State0#?STATE{ra_indexes = Indexes,
messages = lqueue:new(),
returns = lqueue:new(),
msg_bytes_enqueue = 0,
@@ -385,7 +385,7 @@ apply(#{index := Index}, #purge{},
apply(#{index := Idx}, #garbage_collection{}, State) ->
update_smallest_raft_index(Idx, ok, State, [{aux, garbage_collection}]);
apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
- #?MODULE{consumers = Cons0,
+ #?STATE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = Waiting0,
enqueuers = Enqs0} = State0) ->
@@ -405,13 +405,13 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
Cid, C0#consumer{credit = Credit}),
%% if the consumer was cancelled there is a chance it got
%% removed when returning hence we need to be defensive here
- Waiting = case St#?MODULE.consumers of
+ Waiting = case St#?STATE.consumers of
#{Cid := C} ->
Waiting0 ++ [{Cid, C}];
_ ->
Waiting0
end,
- {St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers),
+ {St#?STATE{consumers = maps:remove(Cid, St#?STATE.consumers),
waiting_consumers = Waiting,
last_active = Ts},
Effs1};
@@ -422,7 +422,7 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
suspected_down),
%% select a new consumer from the waiting queue and run a checkout
- State2 = State1#?MODULE{waiting_consumers = WaitingConsumers},
+ State2 = State1#?STATE{waiting_consumers = WaitingConsumers},
{State, Effects1} = activate_next_consumer(State2, Effects0),
%% mark any enquers as suspected
@@ -431,9 +431,9 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
(_, E) -> E
end, Enqs0),
Effects = [{monitor, node, Node} | Effects1],
- checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects);
+ checkout(Meta, State0, State#?STATE{enqueuers = Enqs}, Effects);
apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
- #?MODULE{consumers = Cons0,
+ #?STATE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
%% A node has been disconnected. This doesn't necessarily mean that
%% any processes on this node are down, they _may_ come back so here
@@ -469,18 +469,18 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
% comes back, then re-issue all monitors and discover the final fate of
% these processes
- Effects = case maps:size(State#?MODULE.consumers) of
+ Effects = case maps:size(State#?STATE.consumers) of
0 ->
[{aux, inactive}, {monitor, node, Node}];
_ ->
[{monitor, node, Node}]
end ++ Effects1,
- checkout(Meta, State0, State#?MODULE{enqueuers = Enqs,
+ checkout(Meta, State0, State#?STATE{enqueuers = Enqs,
last_active = Ts}, Effects);
apply(Meta, {down, Pid, _Info}, State0) ->
{State, Effects} = handle_down(Meta, Pid, State0),
checkout(Meta, State0, State, Effects);
-apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
+apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
enqueuers = Enqs0,
service_queue = _SQ0} = State0) ->
%% A node we are monitoring has come back.
@@ -509,7 +509,7 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
Acc
end, {State0, Monitors}, Cons0),
Waiting = update_waiting_consumer_status(Node, State1, up),
- State2 = State1#?MODULE{
+ State2 = State1#?STATE{
enqueuers = Enqs1,
waiting_consumers = Waiting},
{State, Effects} = activate_next_consumer(State2, Effects1),
@@ -566,7 +566,7 @@ convert_v0_to_v1(V0State0) ->
max_in_memory_bytes = rabbit_fifo_v0:get_cfg_field(max_in_memory_bytes, V0State)
},
- #?MODULE{cfg = Cfg,
+ #?STATE{cfg = Cfg,
messages = V1Msgs,
next_msg_num = rabbit_fifo_v0:get_field(next_msg_num, V0State),
returns = rabbit_fifo_v0:get_field(returns, V0State),
@@ -591,7 +591,7 @@ purge_node(Meta, Node, State, Effects) ->
end, {State, Effects}, all_pids_for(Node, State)).
%% any downs that re not noconnection
-handle_down(Meta, Pid, #?MODULE{consumers = Cons0,
+handle_down(Meta, Pid, #?STATE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
@@ -599,7 +599,7 @@ handle_down(Meta, Pid, #?MODULE{consumers = Cons0,
{#enqueuer{pending = Pend}, Enqs} ->
lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
enqueue(RIdx, RawMsg, S)
- end, State0#?MODULE{enqueuers = Enqs}, Pend);
+ end, State0#?STATE{enqueuers = Enqs}, Pend);
error ->
State0
end,
@@ -612,25 +612,25 @@ handle_down(Meta, Pid, #?MODULE{consumers = Cons0,
cancel_consumer(Meta, ConsumerId, S, E, down)
end, {State2, Effects1}, DownConsumers).
-consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) ->
+consumer_active_flag_update_function(#?STATE{cfg = #cfg{consumer_strategy = competing}}) ->
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
consumer_update_active_effects(State, ConsumerId, Consumer, Active,
ActivityStatus, Effects)
end;
-consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = single_active}}) ->
+consumer_active_flag_update_function(#?STATE{cfg = #cfg{consumer_strategy = single_active}}) ->
fun(_, _, _, _, _, Effects) ->
Effects
end.
handle_waiting_consumer_down(_Pid,
- #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) ->
+ #?STATE{cfg = #cfg{consumer_strategy = competing}} = State) ->
{[], State};
handle_waiting_consumer_down(_Pid,
- #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ #?STATE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = []} = State) ->
{[], State};
handle_waiting_consumer_down(Pid,
- #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ #?STATE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers0} = State0) ->
% get cancel effects for down waiting consumers
Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end,
@@ -642,11 +642,11 @@ handle_waiting_consumer_down(Pid,
% update state to have only up waiting consumers
StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end,
WaitingConsumers0),
- State = State0#?MODULE{waiting_consumers = StillUp},
+ State = State0#?STATE{waiting_consumers = StillUp},
{Effects, State}.
update_waiting_consumer_status(Node,
- #?MODULE{waiting_consumers = WaitingConsumers},
+ #?STATE{waiting_consumers = WaitingConsumers},
Status) ->
[begin
case node(Pid) of
@@ -659,7 +659,7 @@ update_waiting_consumer_status(Node,
Consumer#consumer.status =/= cancelled].
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
-state_enter(leader, #?MODULE{consumers = Cons,
+state_enter(leader, #?STATE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
@@ -682,7 +682,7 @@ state_enter(leader, #?MODULE{consumers = Cons,
{Mod, Fun, Args} ->
[{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
end;
-state_enter(eol, #?MODULE{enqueuers = Enqs,
+state_enter(eol, #?STATE{enqueuers = Enqs,
consumers = Custs0,
waiting_consumers = WaitingConsumers0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
@@ -693,7 +693,7 @@ state_enter(eol, #?MODULE{enqueuers = Enqs,
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{aux, eol},
{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
-state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
+state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation];
state_enter(_, _) ->
@@ -702,7 +702,7 @@ state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= le
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
-tick(Ts, #?MODULE{cfg = #cfg{name = Name,
+tick(Ts, #?STATE{cfg = #cfg{name = Name,
resource = QName},
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
@@ -722,7 +722,7 @@ tick(Ts, #?MODULE{cfg = #cfg{name = Name,
end.
-spec overview(state()) -> map().
-overview(#?MODULE{consumers = Cons,
+overview(#?STATE{consumers = Cons,
enqueuers = Enqs,
release_cursors = Cursors,
enqueue_count = EnqCount,
@@ -743,7 +743,7 @@ overview(#?MODULE{consumers = Cons,
delivery_limit => Cfg#cfg.delivery_limit
},
Smallest = rabbit_fifo_index:smallest(Indexes),
- #{type => ?MODULE,
+ #{type => ?STATE,
config => Conf,
num_consumers => maps:size(Cons),
num_checked_out => num_checked_out(State),
@@ -759,7 +759,7 @@ overview(#?MODULE{consumers = Cons,
-spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) ->
[delivery_msg()].
-get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) ->
+get_checked_out(Cid, From, To, #?STATE{consumers = Consumers}) ->
case Consumers of
#{Cid := #consumer{checked_out = Checked}} ->
[{K, snd(snd(maps:get(K, Checked)))}
@@ -773,7 +773,7 @@ get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) ->
version() -> 1.
which_module(0) -> rabbit_fifo_v0;
-which_module(1) -> ?MODULE.
+which_module(1) -> ?STATE.
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
-record(aux, {name :: atom(),
@@ -812,7 +812,7 @@ handle_aux(_RaState, cast, eol, #aux{name = Name} = Aux, Log, _) ->
ets:delete(rabbit_fifo_usage, Name),
{no_reply, Aux, Log};
handle_aux(_RaState, {call, _From}, oldest_entry_timestamp, Aux,
- Log, #?MODULE{ra_indexes = Indexes}) ->
+ Log, #?STATE{ra_indexes = Indexes}) ->
Ts = case rabbit_fifo_index:smallest(Indexes) of
%% if there are no entries, we return current timestamp
%% so that any previously obtained entries are considered older than this
@@ -839,7 +839,7 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
end.
-eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState,
+eval_gc(Log, #?STATE{cfg = #cfg{resource = QR}} = MacState,
#aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) ->
{Idx, _} = ra_log:last_index_term(Log),
{memory, Mem} = erlang:process_info(self(), memory),
@@ -856,7 +856,7 @@ eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState,
AuxState
end.
-force_eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}},
+force_eval_gc(Log, #?STATE{cfg = #cfg{resource = QR}},
#aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) ->
{Idx, _} = ra_log:last_index_term(Log),
{memory, Mem} = erlang:process_info(self(), memory),
@@ -877,7 +877,7 @@ force_eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}},
query_messages_ready(State) ->
messages_ready(State).
-query_messages_checked_out(#?MODULE{consumers = Consumers}) ->
+query_messages_checked_out(#?STATE{consumers = Consumers}) ->
maps:fold(fun (_, #consumer{checked_out = C}, S) ->
maps:size(C) + S
end, 0, Consumers).
@@ -885,22 +885,22 @@ query_messages_checked_out(#?MODULE{consumers = Consumers}) ->
query_messages_total(State) ->
messages_total(State).
-query_processes(#?MODULE{enqueuers = Enqs, consumers = Cons0}) ->
+query_processes(#?STATE{enqueuers = Enqs, consumers = Cons0}) ->
Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0),
maps:keys(maps:merge(Enqs, Cons)).
-query_ra_indexes(#?MODULE{ra_indexes = RaIndexes}) ->
+query_ra_indexes(#?STATE{ra_indexes = RaIndexes}) ->
RaIndexes.
-query_consumer_count(#?MODULE{consumers = Consumers,
+query_consumer_count(#?STATE{consumers = Consumers,
waiting_consumers = WaitingConsumers}) ->
Up = maps:filter(fun(_ConsumerId, #consumer{status = Status}) ->
Status =/= suspected_down
end, Consumers),
maps:size(Up) + length(WaitingConsumers).
-query_consumers(#?MODULE{consumers = Consumers,
+query_consumers(#?STATE{consumers = Consumers,
waiting_consumers = WaitingConsumers,
cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) ->
ActiveActivityStatusFun =
@@ -961,7 +961,7 @@ query_consumers(#?MODULE{consumers = Consumers,
maps:merge(FromConsumers, FromWaitingConsumers).
-query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_active},
+query_single_active_consumer(#?STATE{cfg = #cfg{consumer_strategy = single_active},
consumers = Consumers}) ->
case maps:size(Consumers) of
0 ->
@@ -975,10 +975,10 @@ query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_acti
query_single_active_consumer(_) ->
disabled.
-query_stat(#?MODULE{consumers = Consumers} = State) ->
+query_stat(#?STATE{consumers = Consumers} = State) ->
{messages_ready(State), maps:size(Consumers)}.
-query_in_memory_usage(#?MODULE{msg_bytes_in_memory = Bytes,
+query_in_memory_usage(#?STATE{msg_bytes_in_memory = Bytes,
msgs_ready_in_memory = Length}) ->
{Length, Bytes}.
@@ -993,7 +993,7 @@ query_peek(Pos, State0) when Pos > 0 ->
query_peek(Pos-1, State)
end.
-query_notify_decorators_info(#?MODULE{consumers = Consumers} = State) ->
+query_notify_decorators_info(#?STATE{consumers = Consumers} = State) ->
MaxActivePriority = maps:fold(fun(_, #consumer{credit = C,
status = up,
priority = P0}, MaxP) when C > 0 ->
@@ -1018,14 +1018,14 @@ usage(Name) when is_atom(Name) ->
%%% Internal
-messages_ready(#?MODULE{messages = M,
+messages_ready(#?STATE{messages = M,
prefix_msgs = {RCnt, _R, PCnt, _P},
returns = R}) ->
%% prefix messages will rarely have anything in them during normal
%% operations so length/1 is fine here
lqueue:len(M) + lqueue:len(R) + RCnt + PCnt.
-messages_total(#?MODULE{ra_indexes = I,
+messages_total(#?STATE{ra_indexes = I,
prefix_msgs = {RCnt, _R, PCnt, _P}}) ->
rabbit_fifo_index:size(I) + RCnt + PCnt.
@@ -1059,23 +1059,23 @@ moving_average(Time, HalfLife, Next, Current) ->
Weight = math:exp(Time * math:log(0.5) / HalfLife),
Next * (1 - Weight) + Current * Weight.
-num_checked_out(#?MODULE{consumers = Cons}) ->
+num_checked_out(#?STATE{consumers = Cons}) ->
maps:fold(fun (_, #consumer{checked_out = C}, Acc) ->
maps:size(C) + Acc
end, 0, Cons).
cancel_consumer(Meta, ConsumerId,
- #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State,
+ #?STATE{cfg = #cfg{consumer_strategy = competing}} = State,
Effects, Reason) ->
cancel_consumer0(Meta, ConsumerId, State, Effects, Reason);
cancel_consumer(Meta, ConsumerId,
- #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ #?STATE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = []} = State,
Effects, Reason) ->
%% single active consumer on, no consumers are waiting
cancel_consumer0(Meta, ConsumerId, State, Effects, Reason);
cancel_consumer(Meta, ConsumerId,
- #?MODULE{consumers = Cons0,
+ #?STATE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = Waiting0} = State0,
Effects0, Reason) ->
@@ -1093,10 +1093,10 @@ cancel_consumer(Meta, ConsumerId,
Effects = cancel_consumer_effects(ConsumerId, State0, Effects0),
% A waiting consumer isn't supposed to have any checked out messages,
% so nothing special to do here
- {State0#?MODULE{waiting_consumers = Waiting}, Effects}
+ {State0#?STATE{waiting_consumers = Waiting}, Effects}
end.
-consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
+consumer_update_active_effects(#?STATE{cfg = #cfg{resource = QName}},
ConsumerId, #consumer{meta = Meta},
Active, ActivityStatus,
Effects) ->
@@ -1108,7 +1108,7 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
| Effects].
cancel_consumer0(Meta, ConsumerId,
- #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
+ #?STATE{consumers = C0} = S0, Effects0, Reason) ->
case C0 of
#{ConsumerId := Consumer} ->
{S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer,
@@ -1120,7 +1120,7 @@ cancel_consumer0(Meta, ConsumerId,
%% view)
Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
- case maps:size(S#?MODULE.consumers) of
+ case maps:size(S#?STATE.consumers) of
0 ->
{S, [{aux, inactive} | Effects]};
_ ->
@@ -1131,7 +1131,7 @@ cancel_consumer0(Meta, ConsumerId,
{S0, Effects0}
end.
-activate_next_consumer(#?MODULE{consumers = Cons,
+activate_next_consumer(#?STATE{consumers = Cons,
waiting_consumers = Waiting0} = State0,
Effects0) ->
case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of
@@ -1143,11 +1143,11 @@ activate_next_consumer(#?MODULE{consumers = Cons,
[{NextConsumerId, NextConsumer} | _] ->
%% there is a potential next active consumer
Remaining = lists:keydelete(NextConsumerId, 1, Waiting0),
- #?MODULE{service_queue = ServiceQueue} = State0,
+ #?STATE{service_queue = ServiceQueue} = State0,
ServiceQueue1 = maybe_queue_consumer(NextConsumerId,
NextConsumer,
ServiceQueue),
- State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer},
+ State = State0#?STATE{consumers = Cons#{NextConsumerId => NextConsumer},
service_queue = ServiceQueue1,
waiting_consumers = Remaining},
Effects = consumer_update_active_effects(State, NextConsumerId,
@@ -1173,7 +1173,7 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerId, Consumer, S0, Effects0
S0), Effects0};
down ->
{S1, Effects1} = return_all(Meta, S0, Effects0, ConsumerId, Consumer),
- {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers),
+ {S1#?STATE{consumers = maps:remove(ConsumerId, S1#?STATE.consumers),
last_active = Ts},
Effects1}
end.
@@ -1188,12 +1188,12 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
{State, ok, Effects}
end.
-drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
+drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects0) ->
case take_next_msg(State0) of
{FullMsg = {_MsgId, {RaftIdxToDrop, {Header, Msg}}},
State1} ->
Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
- State2 = add_bytes_drop(Header, State1#?MODULE{ra_indexes = Indexes}),
+ State2 = add_bytes_drop(Header, State1#?STATE{ra_indexes = Indexes}),
State = case Msg of
'empty' -> State2;
_ -> subtract_in_memory_counts(Header, State2)
@@ -1211,7 +1211,7 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
{State0, Effects0}
end.
-enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
+enqueue(RaftIdx, RawMsg, #?STATE{messages = Messages,
next_msg_num = NextMsgNum} = State0) ->
%% the initial header is an integer only - it will get expanded to a map
%% when the next required key is added
@@ -1227,17 +1227,17 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
{RaftIdx, {Header, RawMsg}}} % indexed message with header map
end,
State = add_bytes_enqueue(Header, State1),
- State#?MODULE{messages = lqueue:in({NextMsgNum, Msg}, Messages),
+ State#?STATE{messages = lqueue:in({NextMsgNum, Msg}, Messages),
next_msg_num = NextMsgNum + 1}.
append_to_master_index(RaftIdx,
- #?MODULE{ra_indexes = Indexes0} = State0) ->
+ #?STATE{ra_indexes = Indexes0} = State0) ->
State = incr_enqueue_count(State0),
Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0),
- State#?MODULE{ra_indexes = Indexes}.
+ State#?STATE{ra_indexes = Indexes}.
-incr_enqueue_count(#?MODULE{enqueue_count = EC,
+incr_enqueue_count(#?STATE{enqueue_count = EC,
cfg = #cfg{release_cursor_interval = {_Base, C}}
} = State0) when EC >= C->
%% this will trigger a dehydrated version of the state to be stored
@@ -1245,12 +1245,12 @@ incr_enqueue_count(#?MODULE{enqueue_count = EC,
%% Q: Why don't we just stash the release cursor here?
%% A: Because it needs to be the very last thing we do and we
%% first needs to run the checkout logic.
- State0#?MODULE{enqueue_count = 0};
-incr_enqueue_count(#?MODULE{enqueue_count = C} = State) ->
- State#?MODULE{enqueue_count = C + 1}.
+ State0#?STATE{enqueue_count = 0};
+incr_enqueue_count(#?STATE{enqueue_count = C} = State) ->
+ State#?STATE{enqueue_count = C + 1}.
maybe_store_dehydrated_state(RaftIdx,
- #?MODULE{cfg =
+ #?STATE{cfg =
#cfg{release_cursor_interval = {Base, _}}
= Cfg,
ra_indexes = Indexes,
@@ -1267,12 +1267,12 @@ maybe_store_dehydrated_state(RaftIdx,
Total = messages_total(State0),
min(max(Total, Base), ?RELEASE_CURSOR_EVERY_MAX)
end,
- State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval =
+ State = State0#?STATE{cfg = Cfg#cfg{release_cursor_interval =
{Base, Interval}}},
Dehydrated = dehydrate_state(State),
Cursor = {release_cursor, RaftIdx, Dehydrated},
Cursors = lqueue:in(Cursor, Cursors0),
- State#?MODULE{release_cursors = Cursors}
+ State#?STATE{release_cursors = Cursors}
end;
maybe_store_dehydrated_state(_RaftIdx, State) ->
State.
@@ -1284,18 +1284,18 @@ enqueue_pending(From,
State = enqueue(RaftIdx, RawMsg, State0),
Enq = Enq0#enqueuer{next_seqno = Next + 1, pending = Pending},
enqueue_pending(From, Enq, State);
-enqueue_pending(From, Enq, #?MODULE{enqueuers = Enqueuers0} = State) ->
- State#?MODULE{enqueuers = Enqueuers0#{From => Enq}}.
+enqueue_pending(From, Enq, #?STATE{enqueuers = Enqueuers0} = State) ->
+ State#?STATE{enqueuers = Enqueuers0#{From => Enq}}.
maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) ->
% direct enqueue without tracking
State = enqueue(RaftIdx, RawMsg, State0),
{ok, State, Effects};
maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
- #?MODULE{enqueuers = Enqueuers0} = State0) ->
+ #?STATE{enqueuers = Enqueuers0} = State0) ->
case maps:get(From, Enqueuers0, undefined) of
undefined ->
- State1 = State0#?MODULE{enqueuers = Enqueuers0#{From => #enqueuer{}}},
+ State1 = State0#?STATE{enqueuers = Enqueuers0#{From => #enqueuer{}}},
{ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo,
RawMsg, Effects0, State1),
{ok, State, [{monitor, process, From} | Effects]};
@@ -1311,7 +1311,7 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
% out of order delivery
Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0],
Enq = Enq0#enqueuer{pending = lists:sort(Pending)},
- {ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq}}, Effects0};
+ {ok, State0#?STATE{enqueuers = Enqueuers0#{From => Enq}}, Effects0};
#enqueuer{next_seqno = Next} when MsgSeqNo =< Next ->
% duplicate delivery - remove the raft index from the ra_indexes
% map as it was added earlier
@@ -1333,7 +1333,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
ConsumerId)
end, {State0, Effects0}, Returned),
State2 =
- case State1#?MODULE.consumers of
+ case State1#?STATE.consumers of
#{ConsumerId := Con0} ->
Con = Con0#consumer{credit = increase_credit(Con0,
map_size(Returned))},
@@ -1347,7 +1347,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
% used to processes messages that are finished
complete(Meta, ConsumerId, Discarded,
#consumer{checked_out = Checked} = Con0, Effects,
- #?MODULE{ra_indexes = Indexes0} = State0) ->
+ #?STATE{ra_indexes = Indexes0} = State0) ->
%% TODO optimise use of Discarded map here
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
%% credit_mode = simple_prefetch should automatically top-up credit
@@ -1365,7 +1365,7 @@ complete(Meta, ConsumerId, Discarded,
({'$empty_msg', Header}, Acc) ->
add_bytes_settle(Header, Acc)
end, State1, maps:values(Discarded)),
- {State2#?MODULE{ra_indexes = Indexes}, Effects}.
+ {State2#?STATE{ra_indexes = Indexes}, Effects}.
increase_credit(#consumer{lifetime = once,
credit = Credit}, _) ->
@@ -1389,11 +1389,11 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Reason, _Discarded,
- #?MODULE{cfg = #cfg{dead_letter_handler = undefined}},
+ #?STATE{cfg = #cfg{dead_letter_handler = undefined}},
Effects) ->
Effects;
dead_letter_effects(Reason, Discarded,
- #?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}},
+ #?STATE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}},
Effects) ->
RaftIdxs = maps:fold(
fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) ->
@@ -1417,7 +1417,7 @@ dead_letter_effects(Reason, Discarded,
end} | Effects].
cancel_consumer_effects(ConsumerId,
- #?MODULE{cfg = #cfg{resource = QName}} = State, Effects) ->
+ #?STATE{cfg = #cfg{resource = QName}} = State, Effects) ->
[{mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [QName, ConsumerId]},
notify_decorators_effect(State) | Effects].
@@ -1426,7 +1426,7 @@ update_smallest_raft_index(Idx, State, Effects) ->
update_smallest_raft_index(Idx, ok, State, Effects).
update_smallest_raft_index(IncomingRaftIdx, Reply,
- #?MODULE{cfg = Cfg,
+ #?STATE{cfg = Cfg,
ra_indexes = Indexes,
release_cursors = Cursors0} = State0,
Effects) ->
@@ -1438,7 +1438,7 @@ update_smallest_raft_index(IncomingRaftIdx, Reply,
%% reset the release cursor interval
#cfg{release_cursor_interval = {Base, _}} = Cfg,
RCI = {Base, Base},
- State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI},
+ State = State0#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCI},
release_cursors = lqueue:new(),
enqueue_count = 0},
{State, Reply, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
@@ -1446,11 +1446,11 @@ update_smallest_raft_index(IncomingRaftIdx, Reply,
Smallest = rabbit_fifo_index:smallest(Indexes),
case find_next_cursor(Smallest, Cursors0) of
{empty, Cursors} ->
- {State0#?MODULE{release_cursors = Cursors}, Reply, Effects};
+ {State0#?STATE{release_cursors = Cursors}, Reply, Effects};
{Cursor, Cursors} ->
%% we can emit a release cursor when we've passed the smallest
%% release cursor available.
- {State0#?MODULE{release_cursors = Cursors}, Reply,
+ {State0#?STATE{release_cursors = Cursors}, Reply,
Effects ++ [Cursor]}
end
end.
@@ -1475,7 +1475,7 @@ update_header(Key, UpdateFun, Default, Header) ->
return_one(Meta, MsgId, 0, {Tag, Header0},
- #?MODULE{returns = Returns,
+ #?STATE{returns = Returns,
consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
Effects0, ConsumerId)
@@ -1501,12 +1501,12 @@ return_one(Meta, MsgId, 0, {Tag, Header0},
end,
{add_bytes_return(
Header,
- State1#?MODULE{consumers = Consumers#{ConsumerId => Con},
+ State1#?STATE{consumers = Consumers#{ConsumerId => Con},
returns = lqueue:in(Msg, Returns)}),
Effects0}
end;
return_one(Meta, MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
- #?MODULE{returns = Returns,
+ #?STATE{returns = Returns,
consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
Effects0, ConsumerId) ->
@@ -1535,17 +1535,17 @@ return_one(Meta, MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
end,
{add_bytes_return(
Header,
- State1#?MODULE{consumers = Consumers#{ConsumerId => Con},
+ State1#?STATE{consumers = Consumers#{ConsumerId => Con},
returns = lqueue:in({MsgNum, Msg}, Returns)}),
Effects0}
end.
-return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
+return_all(Meta, #?STATE{consumers = Cons} = State0, Effects0, ConsumerId,
#consumer{checked_out = Checked0} = Con) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
- State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}},
+ State = State0#?STATE{consumers = Cons#{ConsumerId => Con}},
lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) ->
return_one(Meta, MsgId, 0, Msg, S, E, ConsumerId);
({MsgId, {'$empty_msg', _} = Msg}, {S, E}) ->
@@ -1558,7 +1558,7 @@ return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
checkout(Meta, OldState, State, Effects) ->
checkout(Meta, OldState, State, Effects, true).
-checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldState, State0,
+checkout(#{index := Index} = Meta, #?STATE{cfg = #cfg{resource = QName}} = OldState, State0,
Effects0, HandleConsumerChanges) ->
{State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0),
Effects0, #{}),
@@ -1608,12 +1608,12 @@ checkout0(_Meta, {Activity, State0}, Effects0, SendAcc) ->
{State0, ok, lists:reverse(Effects1)}.
evaluate_limit(_Index, Result, _BeforeState,
- #?MODULE{cfg = #cfg{max_length = undefined,
+ #?STATE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
evaluate_limit(Index, Result, BeforeState,
- #?MODULE{cfg = #cfg{overflow_strategy = Strategy},
+ #?STATE{cfg = #cfg{overflow_strategy = Strategy},
enqueuers = Enqs0} = State0,
Effects0) ->
case is_over_limit(State0) of
@@ -1633,7 +1633,7 @@ evaluate_limit(Index, Result, BeforeState,
(_P, _E, Acc) ->
Acc
end, {Enqs0, Effects0}, Enqs0),
- {State0#?MODULE{enqueuers = Enqs}, Result, Effects};
+ {State0#?STATE{enqueuers = Enqs}, Result, Effects};
false when Strategy == reject_publish ->
%% TODO: optimise as this case gets called for every command
%% pretty much
@@ -1651,7 +1651,7 @@ evaluate_limit(Index, Result, BeforeState,
(_P, _E, Acc) ->
Acc
end, {Enqs0, Effects0}, Enqs0),
- {State0#?MODULE{enqueuers = Enqs}, Result, Effects};
+ {State0#?STATE{enqueuers = Enqs}, Result, Effects};
_ ->
{State0, Result, Effects0}
end;
@@ -1660,13 +1660,13 @@ evaluate_limit(Index, Result, BeforeState,
end.
evaluate_memory_limit(_Header,
- #?MODULE{cfg = #cfg{max_in_memory_length = undefined,
+ #?STATE{cfg = #cfg{max_in_memory_length = undefined,
max_in_memory_bytes = undefined}}) ->
false;
evaluate_memory_limit(#{size := Size}, State) ->
evaluate_memory_limit(Size, State);
evaluate_memory_limit(Size,
- #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength,
+ #?STATE{cfg = #cfg{max_in_memory_length = MaxLength,
max_in_memory_bytes = MaxBytes},
msg_bytes_in_memory = Bytes,
msgs_ready_in_memory = Length})
@@ -1690,18 +1690,18 @@ append_delivery_effects(Effects0, AccMap) ->
%%
%% When we return it is always done to the current return queue
%% for both prefix messages and current messages
-take_next_msg(#?MODULE{prefix_msgs = {R, P}} = State) ->
+take_next_msg(#?STATE{prefix_msgs = {R, P}} = State) ->
%% conversion
- take_next_msg(State#?MODULE{prefix_msgs = {length(R), R, length(P), P}});
-take_next_msg(#?MODULE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem],
+ take_next_msg(State#?STATE{prefix_msgs = {length(R), R, length(P), P}});
+take_next_msg(#?STATE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem],
NumP, P}} = State) ->
%% there are prefix returns, these should be served first
- {Msg, State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
-take_next_msg(#?MODULE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) ->
+ {Msg, State#?STATE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
+take_next_msg(#?STATE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) ->
%% there are prefix returns, these should be served first
{{'$prefix_msg', Header},
- State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
-take_next_msg(#?MODULE{returns = Returns,
+ State#?STATE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
+take_next_msg(#?STATE{returns = Returns,
messages = Messages0,
prefix_msgs = {NumR, R, NumP, P}} = State) ->
%% use peek rather than out there as the most likely case is an empty
@@ -1709,13 +1709,13 @@ take_next_msg(#?MODULE{returns = Returns,
case lqueue:peek(Returns) of
{value, NextMsg} ->
{NextMsg,
- State#?MODULE{returns = lqueue:drop(Returns)}};
+ State#?STATE{returns = lqueue:drop(Returns)}};
empty when P == [] ->
case lqueue:out(Messages0) of
{empty, _} ->
empty;
{{value, {_, _} = SeqMsg}, Messages} ->
- {SeqMsg, State#?MODULE{messages = Messages }}
+ {SeqMsg, State#?STATE{messages = Messages }}
end;
empty ->
[Msg | Rem] = P,
@@ -1723,10 +1723,10 @@ take_next_msg(#?MODULE{returns = Returns,
{Header, 'empty'} ->
%% There are prefix msgs
{{'$empty_msg', Header},
- State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}};
+ State#?STATE{prefix_msgs = {NumR, R, NumP-1, Rem}}};
Header ->
{{'$prefix_msg', Header},
- State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}}
+ State#?STATE{prefix_msgs = {NumR, R, NumP-1, Rem}}}
end
end.
@@ -1757,7 +1757,7 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
{dequeue, {MsgId, {Header, Msg}}, Ready}}}]
end}.
-checkout_one(Meta, #?MODULE{service_queue = SQ0,
+checkout_one(Meta, #?STATE{service_queue = SQ0,
messages = Messages0,
consumers = Cons0} = InitState) ->
case priority_queue:out(SQ0) of
@@ -1772,11 +1772,11 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
%% no credit but was still on queue
%% can happen when draining
%% recurse without consumer on queue
- checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
+ checkout_one(Meta, InitState#?STATE{service_queue = SQ1});
#consumer{status = cancelled} ->
- checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
+ checkout_one(Meta, InitState#?STATE{service_queue = SQ1});
#consumer{status = suspected_down} ->
- checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
+ checkout_one(Meta, InitState#?STATE{service_queue = SQ1});
#consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
@@ -1788,7 +1788,7 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
delivery_count = DelCnt + 1},
State1 = update_or_remove_sub(Meta,
ConsumerId, Con,
- State0#?MODULE{service_queue = SQ1}),
+ State0#?STATE{service_queue = SQ1}),
{State, Msg} =
case ConsumerMsg of
{'$prefix_msg', Header} ->
@@ -1814,7 +1814,7 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
end;
{{value, _ConsumerId}, SQ1} ->
%% consumer did not exist but was queued, recurse
- checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
+ checkout_one(Meta, InitState#?STATE{service_queue = SQ1});
{empty, _} ->
case lqueue:len(Messages0) of
0 -> {nochange, InitState};
@@ -1824,31 +1824,31 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto,
credit = 0} = Con,
- #?MODULE{consumers = Cons} = State) ->
- State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)};
+ #?STATE{consumers = Cons} = State) ->
+ State#?STATE{consumers = maps:put(ConsumerId, Con, Cons)};
update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto} = Con,
- #?MODULE{consumers = Cons,
+ #?STATE{consumers = Cons,
service_queue = ServiceQueue} = State) ->
- State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons),
+ State#?STATE{consumers = maps:put(ConsumerId, Con, Cons),
service_queue = uniq_queue_in(ConsumerId, Con, ServiceQueue)};
update_or_remove_sub(#{system_time := Ts},
ConsumerId, #consumer{lifetime = once,
checked_out = Checked,
credit = 0} = Con,
- #?MODULE{consumers = Cons} = State) ->
+ #?STATE{consumers = Cons} = State) ->
case maps:size(Checked) of
0 ->
% we're done with this consumer
- State#?MODULE{consumers = maps:remove(ConsumerId, Cons),
+ State#?STATE{consumers = maps:remove(ConsumerId, Cons),
last_active = Ts};
_ ->
% there are unsettled items so need to keep around
- State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)}
+ State#?STATE{consumers = maps:put(ConsumerId, Con, Cons)}
end;
update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = once} = Con,
- #?MODULE{consumers = Cons,
+ #?STATE{consumers = Cons,
service_queue = ServiceQueue} = State) ->
- State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons),
+ State#?STATE{consumers = maps:put(ConsumerId, Con, Cons),
service_queue = uniq_queue_in(ConsumerId, Con, ServiceQueue)}.
uniq_queue_in(Key, #consumer{priority = P}, Queue) ->
@@ -1862,17 +1862,17 @@ uniq_queue_in(Key, #consumer{priority = P}, Queue) ->
end.
update_consumer(ConsumerId, Meta, Spec, Priority,
- #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) ->
+ #?STATE{cfg = #cfg{consumer_strategy = competing}} = State0) ->
%% general case, single active consumer off
update_consumer0(ConsumerId, Meta, Spec, Priority, State0);
update_consumer(ConsumerId, Meta, Spec, Priority,
- #?MODULE{consumers = Cons0,
+ #?STATE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active}} = State0)
when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
update_consumer0(ConsumerId, Meta, Spec, Priority, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, Priority,
- #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ #?STATE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers0} = State0) ->
%% single active consumer on and one active consumer already
%% adding the new consumer to the waiting list
@@ -1880,10 +1880,10 @@ update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, Priority,
priority = Priority,
credit = Credit, credit_mode = Mode},
WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}],
- State0#?MODULE{waiting_consumers = WaitingConsumers1}.
+ State0#?STATE{waiting_consumers = WaitingConsumers1}.
update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, Priority,
- #?MODULE{consumers = Cons0,
+ #?STATE{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
%% TODO: this logic may not be correct for updating a pre-existing consumer
Init = #consumer{lifetime = Life, meta = Meta,
@@ -1899,7 +1899,7 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, Priority,
end, Init, Cons0),
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
ServiceQueue0),
- State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}.
+ State0#?STATE{consumers = Cons, service_queue = ServiceQueue}.
maybe_queue_consumer(ConsumerId, #consumer{credit = Credit} = Con,
ServiceQueue0) ->
@@ -1913,7 +1913,7 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit} = Con,
%% creates a dehydrated version of the current state to be cached and
%% potentially used to for a snaphot at a later point
-dehydrate_state(#?MODULE{messages = Messages,
+dehydrate_state(#?STATE{messages = Messages,
consumers = Consumers,
returns = Returns,
prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0},
@@ -1937,7 +1937,7 @@ dehydrate_state(#?MODULE{messages = Messages,
%% recovering from a snapshot
PrefMsgs = PrefMsg0 ++ PrefMsgsSuff,
Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0],
- State#?MODULE{messages = lqueue:new(),
+ State#?STATE{messages = lqueue:new(),
ra_indexes = rabbit_fifo_index:empty(),
release_cursors = lqueue:new(),
consumers = maps:map(fun (_, C) ->
@@ -1973,23 +1973,23 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Con#consumer{checked_out = Checked}.
%% make the state suitable for equality comparison
-normalize(#?MODULE{messages = Messages,
+normalize(#?STATE{messages = Messages,
release_cursors = Cursors} = State) ->
- State#?MODULE{messages = lqueue:from_list(lqueue:to_list(Messages)),
+ State#?STATE{messages = lqueue:from_list(lqueue:to_list(Messages)),
release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
-is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined,
+is_over_limit(#?STATE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}}) ->
false;
-is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
+is_over_limit(#?STATE{cfg = #cfg{max_length = MaxLength,
max_bytes = MaxBytes},
msg_bytes_enqueue = BytesEnq} = State) ->
messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
-is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = undefined,
+is_below_soft_limit(#?STATE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}}) ->
false;
-is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
+is_below_soft_limit(#?STATE{cfg = #cfg{max_length = MaxLength,
max_bytes = MaxBytes},
msg_bytes_enqueue = BytesEnq} = State) ->
is_below(MaxLength, messages_ready(State)) andalso
@@ -2049,58 +2049,58 @@ make_update_config(Config) ->
#update_config{config = Config}.
add_bytes_enqueue(Bytes,
- #?MODULE{msg_bytes_enqueue = Enqueue} = State)
+ #?STATE{msg_bytes_enqueue = Enqueue} = State)
when is_integer(Bytes) ->
- State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes};
+ State#?STATE{msg_bytes_enqueue = Enqueue + Bytes};
add_bytes_enqueue(#{size := Bytes}, State) ->
add_bytes_enqueue(Bytes, State).
add_bytes_drop(Bytes,
- #?MODULE{msg_bytes_enqueue = Enqueue} = State)
+ #?STATE{msg_bytes_enqueue = Enqueue} = State)
when is_integer(Bytes) ->
- State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes};
+ State#?STATE{msg_bytes_enqueue = Enqueue - Bytes};
add_bytes_drop(#{size := Bytes}, State) ->
add_bytes_drop(Bytes, State).
add_bytes_checkout(Bytes,
- #?MODULE{msg_bytes_checkout = Checkout,
+ #?STATE{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue } = State)
when is_integer(Bytes) ->
- State#?MODULE{msg_bytes_checkout = Checkout + Bytes,
+ State#?STATE{msg_bytes_checkout = Checkout + Bytes,
msg_bytes_enqueue = Enqueue - Bytes};
add_bytes_checkout(#{size := Bytes}, State) ->
add_bytes_checkout(Bytes, State).
add_bytes_settle(Bytes,
- #?MODULE{msg_bytes_checkout = Checkout} = State)
+ #?STATE{msg_bytes_checkout = Checkout} = State)
when is_integer(Bytes) ->
- State#?MODULE{msg_bytes_checkout = Checkout - Bytes};
+ State#?STATE{msg_bytes_checkout = Checkout - Bytes};
add_bytes_settle(#{size := Bytes}, State) ->
add_bytes_settle(Bytes, State).
add_bytes_return(Bytes,
- #?MODULE{msg_bytes_checkout = Checkout,
+ #?STATE{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue} = State)
when is_integer(Bytes) ->
- State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
+ State#?STATE{msg_bytes_checkout = Checkout - Bytes,
msg_bytes_enqueue = Enqueue + Bytes};
add_bytes_return(#{size := Bytes}, State) ->
add_bytes_return(Bytes, State).
add_in_memory_counts(Bytes,
- #?MODULE{msg_bytes_in_memory = InMemoryBytes,
+ #?STATE{msg_bytes_in_memory = InMemoryBytes,
msgs_ready_in_memory = InMemoryCount} = State)
when is_integer(Bytes) ->
- State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes,
+ State#?STATE{msg_bytes_in_memory = InMemoryBytes + Bytes,
msgs_ready_in_memory = InMemoryCount + 1};
add_in_memory_counts(#{size := Bytes}, State) ->
add_in_memory_counts(Bytes, State).
subtract_in_memory_counts(Bytes,
- #?MODULE{msg_bytes_in_memory = InMemoryBytes,
+ #?STATE{msg_bytes_in_memory = InMemoryBytes,
msgs_ready_in_memory = InMemoryCount} = State)
when is_integer(Bytes) ->
- State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes,
+ State#?STATE{msg_bytes_in_memory = InMemoryBytes - Bytes,
msgs_ready_in_memory = InMemoryCount - 1};
subtract_in_memory_counts(#{size := Bytes}, State) ->
subtract_in_memory_counts(Bytes, State).
@@ -2124,7 +2124,7 @@ get_size_from_header(#{size := B}) ->
B.
-all_nodes(#?MODULE{consumers = Cons0,
+all_nodes(#?STATE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
Nodes0 = maps:fold(fun({_, P}, _, Acc) ->
@@ -2138,7 +2138,7 @@ all_nodes(#?MODULE{consumers = Cons0,
Acc#{node(P) => ok}
end, Nodes1, WaitingConsumers0)).
-all_pids_for(Node, #?MODULE{consumers = Cons0,
+all_pids_for(Node, #?STATE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
Cons = maps:fold(fun({_, P}, _, Acc)
@@ -2157,7 +2157,7 @@ all_pids_for(Node, #?MODULE{consumers = Cons0,
(_, Acc) -> Acc
end, Enqs, WaitingConsumers0).
-suspected_pids_for(Node, #?MODULE{consumers = Cons0,
+suspected_pids_for(Node, #?STATE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc)
@@ -2177,7 +2177,7 @@ suspected_pids_for(Node, #?MODULE{consumers = Cons0,
(_, Acc) -> Acc
end, Enqs, WaitingConsumers0).
-is_expired(Ts, #?MODULE{cfg = #cfg{expires = Expires},
+is_expired(Ts, #?STATE{cfg = #cfg{expires = Expires},
last_active = LastActive,
consumers = Consumers})
when is_number(LastActive) andalso is_number(Expires) ->
@@ -2206,7 +2206,7 @@ maybe_notify_decorators(_, false) ->
maybe_notify_decorators(State, _) ->
{true, query_notify_decorators_info(State)}.
-notify_decorators_effect(#?MODULE{cfg = #cfg{resource = QName}} = State) ->
+notify_decorators_effect(#?STATE{cfg = #cfg{resource = QName}} = State) ->
{MaxActivePriority, IsEmpty} = query_notify_decorators_info(State),
notify_decorators_effect(QName, MaxActivePriority, IsEmpty).
@@ -2215,11 +2215,11 @@ notify_decorators_effect(QName, MaxActivePriority, IsEmpty) ->
[QName, consumer_state_changed, [MaxActivePriority, IsEmpty]]}.
get_field(Field, State) ->
- Fields = record_info(fields, ?MODULE),
+ Fields = record_info(fields, ?STATE),
Index = record_index_of(Field, Fields),
element(Index, State).
-get_cfg_field(Field, #?MODULE{cfg = Cfg} ) ->
+get_cfg_field(Field, #?STATE{cfg = Cfg} ) ->
Fields = record_info(fields, cfg),
Index = record_index_of(Field, Fields),
element(Index, Cfg).
diff --git a/deps/rabbit/src/rabbit_fifo_v1.hrl b/deps/rabbit/src/rabbit_fifo_v1.hrl
index 3df9883445..4a427f8fed 100644
--- a/deps/rabbit/src/rabbit_fifo_v1.hrl
+++ b/deps/rabbit/src/rabbit_fifo_v1.hrl
@@ -76,6 +76,7 @@
-define(MB, 1048576).
-define(LOW_LIMIT, 0.8).
+-define(STATE, rabbit_fifo).
-record(consumer,
{meta = #{} :: consumer_meta(),
@@ -142,7 +143,7 @@
{non_neg_integer(), list(),
non_neg_integer(), list()}.
--record(rabbit_fifo_v1,
+-record(?STATE,
{cfg :: #cfg{},
% unassigned messages
messages = lqueue:new() :: lqueue:lqueue({msg_in_id(), indexed_msg()}),
@@ -164,7 +165,7 @@
% for normal appending operations as it's backed by a map
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor,
- ra:index(), #rabbit_fifo_v1{}}),
+ ra:index(), #?STATE{}}),
% consumers need to reflect consumer state at time of snapshot
% needs to be part of snapshot
consumers = #{} :: #{consumer_id() => #consumer{}},
diff --git a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl
index 22cfc0d719..5aefc4debc 100644
--- a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl
+++ b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl
@@ -1164,16 +1164,12 @@ messages_total_prop(Conf0, Commands) ->
messages_total_invariant() ->
fun(#rabbit_fifo{messages = M,
consumers = C,
- enqueuers = E,
prefix_msgs = {PTot, _, RTot, _},
returns = R} = S) ->
Base = lqueue:len(M) + lqueue:len(R) + PTot + RTot,
- CTot = maps:fold(fun (_, #consumer{checked_out = Ch}, Acc) ->
+ Tot = maps:fold(fun (_, #consumer{checked_out = Ch}, Acc) ->
Acc + map_size(Ch)
- end, Base, C),
- Tot = maps:fold(fun (_, #enqueuer{pending = P}, Acc) ->
- Acc + length(P)
- end, CTot, E),
+ end, Base, C),
QTot = rabbit_fifo:query_messages_total(S),
case Tot == QTot of
true -> true;
@@ -1355,10 +1351,10 @@ nodeup_gen(Nodes) ->
enqueue_gen(Pid) ->
enqueue_gen(Pid, 10, 1).
-enqueue_gen(Pid, Enq, Del) ->
- ?LET(E, {enqueue, Pid,
- frequency([{Enq, enqueue},
- {Del, delay}]),
+enqueue_gen(Pid, _Enq, _Del) ->
+ ?LET(E, {enqueue, Pid, enqueue,
+ % frequency([{Enq, enqueue},
+ % {Del, delay}]),
binary()}, E).
checkout_cancel_gen(Pid) ->