summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-06-30 15:40:31 +0100
committerkjnilsson <knilsson@pivotal.io>2020-09-07 09:42:10 +0100
commitcb2b424fa947b0d885ccf7d21a20f9d240cccb1f (patch)
tree225b1b499ee1f757e5a0baff83142f109aa51059
parentbd3827b0cf5f2199268a5b579f27483a9c581ab2 (diff)
downloadrabbitmq-server-git-cb2b424fa947b0d885ccf7d21a20f9d240cccb1f.tar.gz
rabbit_fifo: Fixes for move to lqueue
Also rename v0 state back to rabbit_fifo else it could never work with old state!
-rw-r--r--src/rabbit_fifo.erl21
-rw-r--r--src/rabbit_fifo_v0.erl342
-rw-r--r--src/rabbit_fifo_v0.hrl6
-rw-r--r--test/rabbit_fifo_v0_SUITE.erl181
4 files changed, 280 insertions, 270 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 13c532af47..4982f008c2 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -310,7 +310,7 @@ apply(#{index := RaftIdx}, #purge{},
messages = Messages} = State0) ->
Total = messages_ready(State0),
Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
- [I || {I, _} <- lists:sort(lqueue:to_list(Messages))]),
+ [I || {_, {I, _}} <- lqueue:to_list(Messages)]),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1,
[I || {_, {I, _}} <- lqueue:to_list(Returns)]),
{State, _, Effects} =
@@ -320,7 +320,6 @@ apply(#{index := RaftIdx}, #purge{},
returns = lqueue:new(),
msg_bytes_enqueue = 0,
prefix_msgs = {0, [], 0, []},
- % low_msg_num = undefined,
msg_bytes_in_memory = 0,
msgs_ready_in_memory = 0},
[]),
@@ -469,8 +468,9 @@ apply(_Meta, {machine_version, 0, 1}, V0State0) ->
V0State = rabbit_fifo_v0:normalize_for_v1(V0State0),
%% quick hack to "convert" the state from version one
State = setelement(1, V0State, ?MODULE),
- V0Msgs = rabbit_fifo_v0:messages_map(V0State),
+ V0Msgs = rabbit_fifo_v0:get_field(messages, V0State),
V1Msgs = lqueue:from_list(lists:sort(maps:to_list(V0Msgs))),
+ %% TODOD each release cursor needs converting too!
{State#?MODULE{messages = V1Msgs}, ok, []}.
purge_node(Node, State, Effects) ->
@@ -629,7 +629,7 @@ overview(#?MODULE{consumers = Cons,
num_ready_messages => messages_ready(State),
num_messages => messages_total(State),
num_release_cursors => lqueue:len(Cursors),
- release_crusor_enqueue_counter => EnqCount,
+ release_cursor_enqueue_counter => EnqCount,
enqueue_message_bytes => EnqueueBytes,
checkout_message_bytes => CheckoutBytes}.
@@ -1244,7 +1244,7 @@ update_smallest_raft_index(IncomingRaftIdx,
{State0#?MODULE{release_cursors = Cursors},
ok, Effects};
{Cursor, Cursors} ->
- %% we can emit a release cursor we've passed the smallest
+ %% we can emit a release cursor when we've passed the smallest
%% release cursor available.
{State0#?MODULE{release_cursors = Cursors}, ok,
Effects ++ [Cursor]}
@@ -1680,7 +1680,6 @@ dehydrate_state(#?MODULE{messages = Messages,
State#?MODULE{messages = lqueue:new(),
ra_indexes = rabbit_fifo_index:empty(),
release_cursors = lqueue:new(),
- % low_msg_num = undefined,
consumers = maps:map(fun (_, C) ->
dehydrate_consumer(C)
end, Consumers),
@@ -1693,9 +1692,9 @@ dehydrate_state(#?MODULE{messages = Messages,
dehydrate_messages(Msgs0, Acc0) ->
{OutRes, Msgs} = lqueue:out(Msgs0),
case OutRes of
- {value, {_, 'empty'} = Msg} ->
+ {value, {_MsgId, {_RaftId, {_, 'empty'} = Msg}}} ->
dehydrate_messages(Msgs, [Msg | Acc0]);
- {value, {Header, _}} ->
+ {value, {_MsgId, {_RaftId, {Header, _}}}} ->
dehydrate_messages(Msgs, [Header | Acc0]);
empty ->
lists:reverse(Acc0)
@@ -1714,8 +1713,10 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Con#consumer{checked_out = Checked}.
%% make the state suitable for equality comparison
-normalize(#?MODULE{release_cursors = Cursors} = State) ->
- State#?MODULE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
+normalize(#?MODULE{messages = Messages,
+ release_cursors = Cursors} = State) ->
+ State#?MODULE{messages = lqueue:from_list(lqueue:to_list(Messages)),
+ release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}}) ->
diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl
index 52706aec1f..ad261317c9 100644
--- a/src/rabbit_fifo_v0.erl
+++ b/src/rabbit_fifo_v0.erl
@@ -55,7 +55,7 @@
normalize/1,
normalize_for_v1/1,
%% getters for coversions
- messages_map/1,
+ get_field/2,
%% protocol helpers
make_enqueue/3,
@@ -107,7 +107,7 @@
-type client_msg() :: delivery().
%% the messages `rabbit_fifo' can send to consumers.
--opaque state() :: #?MODULE{}.
+-opaque state() :: #?STATE{}.
-export_type([protocol/0,
delivery/0,
@@ -128,7 +128,7 @@
init(#{name := Name,
queue_resource := Resource} = Conf) ->
rabbit_log:info("rabbit_fifo: init v0 ~p", [Conf]),
- update_config(Conf, #?MODULE{cfg = #cfg{name = Name,
+ update_config(Conf, #?STATE{cfg = #cfg{name = Name,
resource = Resource}}).
update_config(Conf, State) ->
@@ -146,8 +146,8 @@ update_config(Conf, State) ->
false ->
competing
end,
- Cfg = State#?MODULE.cfg,
- SHICur = case State#?MODULE.cfg of
+ Cfg = State#?STATE.cfg,
+ SHICur = case State#?STATE.cfg of
#cfg{release_cursor_interval = {_, C}} ->
C;
#cfg{release_cursor_interval = undefined} ->
@@ -156,7 +156,7 @@ update_config(Conf, State) ->
C
end,
- State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur},
+ State#?STATE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur},
dead_letter_handler = DLH,
become_leader_handler = BLH,
max_length = MaxLength,
@@ -179,7 +179,7 @@ apply(Metadata, #enqueue{pid = From, seq = Seq,
apply_enqueue(Metadata, From, Seq, RawMsg, State00);
apply(Meta,
#settle{msg_ids = MsgIds, consumer_id = ConsumerId},
- #?MODULE{consumers = Cons0} = State) ->
+ #?STATE{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0} ->
% need to increment metrics before completing as any snapshot
@@ -191,7 +191,7 @@ apply(Meta,
end;
apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
- #?MODULE{consumers = Cons0} = State0) ->
+ #?STATE{consumers = Cons0} = State0) ->
case Cons0 of
#{ConsumerId := Con0} ->
Discarded = maps:with(MsgIds, Con0#consumer.checked_out),
@@ -202,7 +202,7 @@ apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
{State0, ok}
end;
apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
- #?MODULE{consumers = Cons0} = State) ->
+ #?STATE{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := #consumer{checked_out = Checked0}} ->
Returned = maps:with(MsgIds, Checked0),
@@ -212,7 +212,7 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
end;
apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
drain = Drain, consumer_id = ConsumerId},
- #?MODULE{consumers = Cons0,
+ #?STATE{consumers = Cons0,
service_queue = ServiceQueue0,
waiting_consumers = Waiting0} = State0) ->
case Cons0 of
@@ -225,7 +225,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
ServiceQueue0),
Cons = maps:put(ConsumerId, Con1, Cons0),
{State1, ok, Effects} =
- checkout(Meta, State0#?MODULE{service_queue = ServiceQueue,
+ checkout(Meta, State0#?STATE{service_queue = ServiceQueue,
consumers = Cons}, []),
Response = {send_credit_reply, messages_ready(State1)},
%% by this point all checkouts for the updated credit value
@@ -236,16 +236,16 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{State1, Response, Effects};
true ->
Con = #consumer{credit = PostCred} =
- maps:get(ConsumerId, State1#?MODULE.consumers),
+ maps:get(ConsumerId, State1#?STATE.consumers),
%% add the outstanding credit to the delivery count
DeliveryCount = Con#consumer.delivery_count + PostCred,
Consumers = maps:put(ConsumerId,
Con#consumer{delivery_count = DeliveryCount,
credit = 0},
- State1#?MODULE.consumers),
+ State1#?STATE.consumers),
Drained = Con#consumer.credit,
{CTag, _} = ConsumerId,
- {State1#?MODULE{consumers = Consumers},
+ {State1#?STATE{consumers = Consumers},
%% returning a multi response with two client actions
%% for the channel to execute
{multi, [Response, {send_drained, {CTag, Drained}}]},
@@ -259,7 +259,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
%% grant the credit
C = max(0, RemoteDelCnt + NewCredit - DelCnt),
Con = Con0#consumer{credit = C},
- State = State0#?MODULE{waiting_consumers =
+ State = State0#?STATE{waiting_consumers =
[{ConsumerId, Con} | Waiting]},
{State, {send_credit_reply, messages_ready(State)}};
false ->
@@ -270,12 +270,12 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{State0, ok}
end;
apply(_, #checkout{spec = {dequeue, _}},
- #?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
+ #?STATE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
{State0, {error, unsupported}};
apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
meta = ConsumerMeta,
consumer_id = ConsumerId},
- #?MODULE{consumers = Consumers} = State0) ->
+ #?STATE{consumers = Consumers} = State0) ->
Exists = maps:is_key(ConsumerId, Consumers),
case messages_ready(State0) of
0 ->
@@ -317,7 +317,7 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
checkout(Meta, State1, [{monitor, process, Pid}]);
apply(#{index := RaftIdx}, #purge{},
- #?MODULE{ra_indexes = Indexes0,
+ #?STATE{ra_indexes = Indexes0,
returns = Returns,
messages = Messages} = State0) ->
Total = messages_ready(State0),
@@ -327,7 +327,7 @@ apply(#{index := RaftIdx}, #purge{},
[I || {_, {I, _}} <- lqueue:to_list(Returns)]),
{State, _, Effects} =
update_smallest_raft_index(RaftIdx,
- State0#?MODULE{ra_indexes = Indexes,
+ State0#?STATE{ra_indexes = Indexes,
messages = #{},
returns = lqueue:new(),
msg_bytes_enqueue = 0,
@@ -341,7 +341,7 @@ apply(#{index := RaftIdx}, #purge{},
{State, {purge, Total},
lists:reverse([garbage_collection | Effects])};
apply(Meta, {down, Pid, noconnection},
- #?MODULE{consumers = Cons0,
+ #?STATE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = Waiting0,
enqueuers = Enqs0} = State0) ->
@@ -361,13 +361,13 @@ apply(Meta, {down, Pid, noconnection},
Cid, C0#consumer{credit = Credit}),
%% if the consumer was cancelled there is a chance it got
%% removed when returning hence we need to be defensive here
- Waiting = case St#?MODULE.consumers of
+ Waiting = case St#?STATE.consumers of
#{Cid := C} ->
Waiting0 ++ [{Cid, C}];
_ ->
Waiting0
end,
- {St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers),
+ {St#?STATE{consumers = maps:remove(Cid, St#?STATE.consumers),
waiting_consumers = Waiting},
Effs1};
(_, _, S) ->
@@ -377,7 +377,7 @@ apply(Meta, {down, Pid, noconnection},
suspected_down),
%% select a new consumer from the waiting queue and run a checkout
- State2 = State1#?MODULE{waiting_consumers = WaitingConsumers},
+ State2 = State1#?STATE{waiting_consumers = WaitingConsumers},
{State, Effects1} = activate_next_consumer(State2, Effects0),
%% mark any enquers as suspected
@@ -386,9 +386,9 @@ apply(Meta, {down, Pid, noconnection},
(_, E) -> E
end, Enqs0),
Effects = [{monitor, node, Node} | Effects1],
- checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
+ checkout(Meta, State#?STATE{enqueuers = Enqs}, Effects);
apply(Meta, {down, Pid, noconnection},
- #?MODULE{consumers = Cons0,
+ #?STATE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
%% A node has been disconnected. This doesn't necessarily mean that
%% any processes on this node are down, they _may_ come back so here
@@ -423,17 +423,17 @@ apply(Meta, {down, Pid, noconnection},
% Monitor the node so that we can "unsuspect" these processes when the node
% comes back, then re-issue all monitors and discover the final fate of
% these processes
- Effects = case maps:size(State#?MODULE.consumers) of
+ Effects = case maps:size(State#?STATE.consumers) of
0 ->
[{aux, inactive}, {monitor, node, Node}];
_ ->
[{monitor, node, Node}]
end ++ Effects1,
- checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
+ checkout(Meta, State#?STATE{enqueuers = Enqs}, Effects);
apply(Meta, {down, Pid, _Info}, State0) ->
{State, Effects} = handle_down(Pid, State0),
checkout(Meta, State, Effects);
-apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
+apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
enqueuers = Enqs0,
service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
@@ -462,7 +462,7 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
Acc
end, {Cons0, SQ0, Monitors}, Cons0),
Waiting = update_waiting_consumer_status(Node, State0, up),
- State1 = State0#?MODULE{consumers = Cons1,
+ State1 = State0#?STATE{consumers = Cons1,
enqueuers = Enqs1,
service_queue = SQ,
waiting_consumers = Waiting},
@@ -485,7 +485,7 @@ purge_node(Node, State, Effects) ->
end, {State, Effects}, all_pids_for(Node, State)).
%% any downs that re not noconnection
-handle_down(Pid, #?MODULE{consumers = Cons0,
+handle_down(Pid, #?STATE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
@@ -493,7 +493,7 @@ handle_down(Pid, #?MODULE{consumers = Cons0,
{#enqueuer{pending = Pend}, Enqs} ->
lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
enqueue(RIdx, RawMsg, S)
- end, State0#?MODULE{enqueuers = Enqs}, Pend);
+ end, State0#?STATE{enqueuers = Enqs}, Pend);
error ->
State0
end,
@@ -506,25 +506,25 @@ handle_down(Pid, #?MODULE{consumers = Cons0,
cancel_consumer(ConsumerId, S, E, down)
end, {State2, Effects1}, DownConsumers).
-consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) ->
+consumer_active_flag_update_function(#?STATE{cfg = #cfg{consumer_strategy = competing}}) ->
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
consumer_update_active_effects(State, ConsumerId, Consumer, Active,
ActivityStatus, Effects)
end;
-consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = single_active}}) ->
+consumer_active_flag_update_function(#?STATE{cfg = #cfg{consumer_strategy = single_active}}) ->
fun(_, _, _, _, _, Effects) ->
Effects
end.
handle_waiting_consumer_down(_Pid,
- #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) ->
+ #?STATE{cfg = #cfg{consumer_strategy = competing}} = State) ->
{[], State};
handle_waiting_consumer_down(_Pid,
- #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ #?STATE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = []} = State) ->
{[], State};
handle_waiting_consumer_down(Pid,
- #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ #?STATE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers0} = State0) ->
% get cancel effects for down waiting consumers
Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end,
@@ -536,11 +536,11 @@ handle_waiting_consumer_down(Pid,
% update state to have only up waiting consumers
StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end,
WaitingConsumers0),
- State = State0#?MODULE{waiting_consumers = StillUp},
+ State = State0#?STATE{waiting_consumers = StillUp},
{Effects, State}.
update_waiting_consumer_status(Node,
- #?MODULE{waiting_consumers = WaitingConsumers},
+ #?STATE{waiting_consumers = WaitingConsumers},
Status) ->
[begin
case node(Pid) of
@@ -553,7 +553,7 @@ update_waiting_consumer_status(Node,
Consumer#consumer.status =/= cancelled].
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
-state_enter(leader, #?MODULE{consumers = Cons,
+state_enter(leader, #?STATE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
@@ -576,7 +576,7 @@ state_enter(leader, #?MODULE{consumers = Cons,
{Mod, Fun, Args} ->
[{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
end;
-state_enter(eol, #?MODULE{enqueuers = Enqs,
+state_enter(eol, #?STATE{enqueuers = Enqs,
consumers = Custs0,
waiting_consumers = WaitingConsumers0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
@@ -586,7 +586,7 @@ state_enter(eol, #?MODULE{enqueuers = Enqs,
[{send_msg, P, eol, ra_event}
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
-state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
+state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation];
state_enter(_, _) ->
@@ -595,7 +595,7 @@ state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= le
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
-tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
+tick(_Ts, #?STATE{cfg = #cfg{name = Name,
resource = QName},
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
@@ -610,7 +610,7 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
handle_tick, [QName, Metrics, all_nodes(State)]}].
-spec overview(state()) -> map().
-overview(#?MODULE{consumers = Cons,
+overview(#?STATE{consumers = Cons,
enqueuers = Enqs,
release_cursors = Cursors,
enqueue_count = EnqCount,
@@ -640,7 +640,7 @@ overview(#?MODULE{consumers = Cons,
-spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) ->
[delivery_msg()].
-get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) ->
+get_checked_out(Cid, From, To, #?STATE{consumers = Consumers}) ->
case Consumers of
#{Cid := #consumer{checked_out = Checked}} ->
[{K, snd(snd(maps:get(K, Checked)))}
@@ -679,7 +679,7 @@ handle_aux(_RaState, cast, Cmd, #aux{name = Name,
end,
{no_reply, State, Log}.
-eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState,
+eval_gc(Log, #?STATE{cfg = #cfg{resource = QR}} = MacState,
#aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) ->
{Idx, _} = ra_log:last_index_term(Log),
{memory, Mem} = erlang:process_info(self(), memory),
@@ -701,7 +701,7 @@ eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState,
query_messages_ready(State) ->
messages_ready(State).
-query_messages_checked_out(#?MODULE{consumers = Consumers}) ->
+query_messages_checked_out(#?STATE{consumers = Consumers}) ->
maps:fold(fun (_, #consumer{checked_out = C}, S) ->
maps:size(C) + S
end, 0, Consumers).
@@ -709,19 +709,19 @@ query_messages_checked_out(#?MODULE{consumers = Consumers}) ->
query_messages_total(State) ->
messages_total(State).
-query_processes(#?MODULE{enqueuers = Enqs, consumers = Cons0}) ->
+query_processes(#?STATE{enqueuers = Enqs, consumers = Cons0}) ->
Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0),
maps:keys(maps:merge(Enqs, Cons)).
-query_ra_indexes(#?MODULE{ra_indexes = RaIndexes}) ->
+query_ra_indexes(#?STATE{ra_indexes = RaIndexes}) ->
RaIndexes.
-query_consumer_count(#?MODULE{consumers = Consumers,
+query_consumer_count(#?STATE{consumers = Consumers,
waiting_consumers = WaitingConsumers}) ->
maps:size(Consumers) + length(WaitingConsumers).
-query_consumers(#?MODULE{consumers = Consumers,
+query_consumers(#?STATE{consumers = Consumers,
waiting_consumers = WaitingConsumers,
cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) ->
ActiveActivityStatusFun =
@@ -781,7 +781,7 @@ query_consumers(#?MODULE{consumers = Consumers,
end, #{}, WaitingConsumers),
maps:merge(FromConsumers, FromWaitingConsumers).
-query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_active},
+query_single_active_consumer(#?STATE{cfg = #cfg{consumer_strategy = single_active},
consumers = Consumers}) ->
case maps:size(Consumers) of
0 ->
@@ -795,10 +795,10 @@ query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_acti
query_single_active_consumer(_) ->
disabled.
-query_stat(#?MODULE{consumers = Consumers} = State) ->
+query_stat(#?STATE{consumers = Consumers} = State) ->
{messages_ready(State), maps:size(Consumers)}.
-query_in_memory_usage(#?MODULE{msg_bytes_in_memory = Bytes,
+query_in_memory_usage(#?STATE{msg_bytes_in_memory = Bytes,
msgs_ready_in_memory = Length}) ->
{Length, Bytes}.
@@ -811,7 +811,7 @@ usage(Name) when is_atom(Name) ->
%%% Internal
-messages_ready(#?MODULE{messages = M,
+messages_ready(#?STATE{messages = M,
prefix_msgs = {RCnt, _R, PCnt, _P},
returns = R}) ->
@@ -819,7 +819,7 @@ messages_ready(#?MODULE{messages = M,
%% operations so length/1 is fine here
maps:size(M) + lqueue:len(R) + RCnt + PCnt.
-messages_total(#?MODULE{ra_indexes = I,
+messages_total(#?STATE{ra_indexes = I,
prefix_msgs = {RCnt, _R, PCnt, _P}}) ->
rabbit_fifo_index:size(I) + RCnt + PCnt.
@@ -851,23 +851,23 @@ moving_average(Time, HalfLife, Next, Current) ->
Weight = math:exp(Time * math:log(0.5) / HalfLife),
Next * (1 - Weight) + Current * Weight.
-num_checked_out(#?MODULE{consumers = Cons}) ->
+num_checked_out(#?STATE{consumers = Cons}) ->
maps:fold(fun (_, #consumer{checked_out = C}, Acc) ->
maps:size(C) + Acc
end, 0, Cons).
cancel_consumer(ConsumerId,
- #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State,
+ #?STATE{cfg = #cfg{consumer_strategy = competing}} = State,
Effects, Reason) ->
cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
- #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ #?STATE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = []} = State,
Effects, Reason) ->
%% single active consumer on, no consumers are waiting
cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
- #?MODULE{consumers = Cons0,
+ #?STATE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = Waiting0} = State0,
Effects0, Reason) ->
@@ -885,10 +885,10 @@ cancel_consumer(ConsumerId,
Effects = cancel_consumer_effects(ConsumerId, State0, Effects0),
% A waiting consumer isn't supposed to have any checked out messages,
% so nothing special to do here
- {State0#?MODULE{waiting_consumers = Waiting}, Effects}
+ {State0#?STATE{waiting_consumers = Waiting}, Effects}
end.
-consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
+consumer_update_active_effects(#?STATE{cfg = #cfg{resource = QName}},
ConsumerId, #consumer{meta = Meta},
Active, ActivityStatus,
Effects) ->
@@ -899,7 +899,7 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
[QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
| Effects].
-cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
+cancel_consumer0(ConsumerId, #?STATE{consumers = C0} = S0, Effects0, Reason) ->
case C0 of
#{ConsumerId := Consumer} ->
{S, Effects2} = maybe_return_all(ConsumerId, Consumer, S0,
@@ -909,7 +909,7 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
%% in line with what classic queues do (from an external point of
%% view)
Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
- case maps:size(S#?MODULE.consumers) of
+ case maps:size(S#?STATE.consumers) of
0 ->
{S, [{aux, inactive} | Effects]};
_ ->
@@ -920,7 +920,7 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
{S0, Effects0}
end.
-activate_next_consumer(#?MODULE{consumers = Cons,
+activate_next_consumer(#?STATE{consumers = Cons,
waiting_consumers = Waiting0} = State0,
Effects0) ->
case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of
@@ -932,11 +932,11 @@ activate_next_consumer(#?MODULE{consumers = Cons,
[{NextConsumerId, NextConsumer} | _] ->
%% there is a potential next active consumer
Remaining = lists:keydelete(NextConsumerId, 1, Waiting0),
- #?MODULE{service_queue = ServiceQueue} = State0,
+ #?STATE{service_queue = ServiceQueue} = State0,
ServiceQueue1 = maybe_queue_consumer(NextConsumerId,
NextConsumer,
ServiceQueue),
- State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer},
+ State = State0#?STATE{consumers = Cons#{NextConsumerId => NextConsumer},
service_queue = ServiceQueue1,
waiting_consumers = Remaining},
Effects = consumer_update_active_effects(State, NextConsumerId,
@@ -953,7 +953,7 @@ activate_next_consumer(#?MODULE{consumers = Cons,
maybe_return_all(ConsumerId, Consumer,
- #?MODULE{consumers = C0,
+ #?STATE{consumers = C0,
service_queue = SQ0} = S0,
Effects0, Reason) ->
case Reason of
@@ -964,11 +964,11 @@ maybe_return_all(ConsumerId, Consumer,
credit = 0,
status = cancelled},
C0, SQ0, Effects0),
- {S0#?MODULE{consumers = Cons,
+ {S0#?STATE{consumers = Cons,
service_queue = SQ}, Effects1};
down ->
{S1, Effects1} = return_all(S0, Effects0, ConsumerId, Consumer),
- {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers)},
+ {S1#?STATE{consumers = maps:remove(ConsumerId, S1#?STATE.consumers)},
Effects1}
end.
@@ -982,12 +982,12 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
{State, ok, Effects}
end.
-drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
+drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects0) ->
case take_next_msg(State0) of
{FullMsg = {_MsgId, {RaftIdxToDrop, {Header, Msg}}},
State1} ->
Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
- State2 = add_bytes_drop(Header, State1#?MODULE{ra_indexes = Indexes}),
+ State2 = add_bytes_drop(Header, State1#?STATE{ra_indexes = Indexes}),
State = case Msg of
'empty' -> State2;
_ -> subtract_in_memory_counts(Header, State2)
@@ -1005,7 +1005,7 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
{State0, Effects0}
end.
-enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
+enqueue(RaftIdx, RawMsg, #?STATE{messages = Messages,
low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
%% the initial header is an integer only - it will get expanded to a map
@@ -1021,20 +1021,20 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
{RaftIdx, {Header, RawMsg}}} % indexed message with header map
end,
State = add_bytes_enqueue(Header, State1),
- State#?MODULE{messages = Messages#{NextMsgNum => Msg},
+ State#?STATE{messages = Messages#{NextMsgNum => Msg},
%% this is probably only done to record it when low_msg_num
%% is undefined
low_msg_num = min(LowMsgNum, NextMsgNum),
next_msg_num = NextMsgNum + 1}.
append_to_master_index(RaftIdx,
- #?MODULE{ra_indexes = Indexes0} = State0) ->
+ #?STATE{ra_indexes = Indexes0} = State0) ->
State = incr_enqueue_count(State0),
Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0),
- State#?MODULE{ra_indexes = Indexes}.
+ State#?STATE{ra_indexes = Indexes}.
-incr_enqueue_count(#?MODULE{enqueue_count = C,
+incr_enqueue_count(#?STATE{enqueue_count = C,
cfg = #cfg{release_cursor_interval = {_Base, C}}
} = State0) ->
%% this will trigger a dehydrated version of the state to be stored
@@ -1042,18 +1042,18 @@ incr_enqueue_count(#?MODULE{enqueue_count = C,
%% Q: Why don't we just stash the release cursor here?
%% A: Because it needs to be the very last thing we do and we
%% first needs to run the checkout logic.
- State0#?MODULE{enqueue_count = 0};
-incr_enqueue_count(#?MODULE{cfg = #cfg{release_cursor_interval = C} = Cfg}
+ State0#?STATE{enqueue_count = 0};
+incr_enqueue_count(#?STATE{cfg = #cfg{release_cursor_interval = C} = Cfg}
= State0)
when is_integer(C) ->
%% conversion to new release cursor interval format
- State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
+ State = State0#?STATE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
incr_enqueue_count(State);
-incr_enqueue_count(#?MODULE{enqueue_count = C} = State) ->
- State#?MODULE{enqueue_count = C + 1}.
+incr_enqueue_count(#?STATE{enqueue_count = C} = State) ->
+ State#?STATE{enqueue_count = C + 1}.
maybe_store_dehydrated_state(RaftIdx,
- #?MODULE{cfg =
+ #?STATE{cfg =
#cfg{release_cursor_interval = {Base, _}}
= Cfg,
ra_indexes = Indexes,
@@ -1072,20 +1072,20 @@ maybe_store_dehydrated_state(RaftIdx,
?RELEASE_CURSOR_EVERY_MAX)
end,
State = convert_prefix_msgs(
- State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval =
+ State0#?STATE{cfg = Cfg#cfg{release_cursor_interval =
{Base, Interval}}}),
Dehydrated = dehydrate_state(State),
Cursor = {release_cursor, RaftIdx, Dehydrated},
Cursors = lqueue:in(Cursor, Cursors0),
- State#?MODULE{release_cursors = Cursors}
+ State#?STATE{release_cursors = Cursors}
end;
maybe_store_dehydrated_state(RaftIdx,
- #?MODULE{cfg =
+ #?STATE{cfg =
#cfg{release_cursor_interval = C} = Cfg}
= State0)
when is_integer(C) ->
%% convert to new format
- State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
+ State = State0#?STATE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
maybe_store_dehydrated_state(RaftIdx, State);
maybe_store_dehydrated_state(_RaftIdx, State) ->
State.
@@ -1097,18 +1097,18 @@ enqueue_pending(From,
State = enqueue(RaftIdx, RawMsg, State0),
Enq = Enq0#enqueuer{next_seqno = Next + 1, pending = Pending},
enqueue_pending(From, Enq, State);
-enqueue_pending(From, Enq, #?MODULE{enqueuers = Enqueuers0} = State) ->
- State#?MODULE{enqueuers = Enqueuers0#{From => Enq}}.
+enqueue_pending(From, Enq, #?STATE{enqueuers = Enqueuers0} = State) ->
+ State#?STATE{enqueuers = Enqueuers0#{From => Enq}}.
maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) ->
% direct enqueue without tracking
State = enqueue(RaftIdx, RawMsg, State0),
{ok, State, Effects};
maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
- #?MODULE{enqueuers = Enqueuers0} = State0) ->
+ #?STATE{enqueuers = Enqueuers0} = State0) ->
case maps:get(From, Enqueuers0, undefined) of
undefined ->
- State1 = State0#?MODULE{enqueuers = Enqueuers0#{From => #enqueuer{}}},
+ State1 = State0#?STATE{enqueuers = Enqueuers0#{From => #enqueuer{}}},
{ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo,
RawMsg, Effects0, State1),
{ok, State, [{monitor, process, From} | Effects]};
@@ -1124,7 +1124,7 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
% out of order delivery
Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0],
Enq = Enq0#enqueuer{pending = lists:sort(Pending)},
- {ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq}}, Effects0};
+ {ok, State0#?STATE{enqueuers = Enqueuers0#{From => Enq}}, Effects0};
#enqueuer{next_seqno = Next} when MsgSeqNo =< Next ->
% duplicate delivery - remove the raft index from the ra_indexes
% map as it was added earlier
@@ -1135,7 +1135,7 @@ snd(T) ->
element(2, T).
return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
- Effects0, #?MODULE{service_queue = SQ0} = State0) ->
+ Effects0, #?STATE{service_queue = SQ0} = State0) ->
{State1, Effects1} = maps:fold(
fun(MsgId, {Tag, _} = Msg, {S0, E0})
when Tag == '$prefix_msg';
@@ -1146,13 +1146,13 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
ConsumerId)
end, {State0, Effects0}, Returned),
{State2, Effects3} =
- case State1#?MODULE.consumers of
+ case State1#?STATE.consumers of
#{ConsumerId := Con0} = Cons0 ->
Con = Con0#consumer{credit = increase_credit(Con0,
map_size(Returned))},
{Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con,
Cons0, SQ0, Effects1),
- {State1#?MODULE{consumers = Cons,
+ {State1#?STATE{consumers = Cons,
service_queue = SQ}, Effects2};
_ ->
{State1, Effects1}
@@ -1163,7 +1163,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
% used to processes messages that are finished
complete(ConsumerId, Discarded,
#consumer{checked_out = Checked} = Con0, Effects0,
- #?MODULE{consumers = Cons0, service_queue = SQ0,
+ #?STATE{consumers = Cons0, service_queue = SQ0,
ra_indexes = Indexes0} = State0) ->
%% TODO optimise use of Discarded map here
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
@@ -1183,7 +1183,7 @@ complete(ConsumerId, Discarded,
({'$empty_msg', Header}, Acc) ->
add_bytes_settle(Header, Acc)
end, State0, maps:values(Discarded)),
- {State1#?MODULE{consumers = Cons,
+ {State1#?STATE{consumers = Cons,
ra_indexes = Indexes,
service_queue = SQ}, Effects}.
@@ -1209,11 +1209,11 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Reason, _Discarded,
- #?MODULE{cfg = #cfg{dead_letter_handler = undefined}},
+ #?STATE{cfg = #cfg{dead_letter_handler = undefined}},
Effects) ->
Effects;
dead_letter_effects(Reason, Discarded,
- #?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}},
+ #?STATE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}},
Effects) ->
RaftIdxs = maps:fold(
fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) ->
@@ -1237,12 +1237,12 @@ dead_letter_effects(Reason, Discarded,
end} | Effects].
cancel_consumer_effects(ConsumerId,
- #?MODULE{cfg = #cfg{resource = QName}}, Effects) ->
+ #?STATE{cfg = #cfg{resource = QName}}, Effects) ->
[{mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [QName, ConsumerId]} | Effects].
update_smallest_raft_index(IncomingRaftIdx,
- #?MODULE{ra_indexes = Indexes,
+ #?STATE{ra_indexes = Indexes,
release_cursors = Cursors0} = State0,
Effects) ->
case rabbit_fifo_index:size(Indexes) of
@@ -1250,18 +1250,18 @@ update_smallest_raft_index(IncomingRaftIdx,
% there are no messages on queue anymore and no pending enqueues
% we can forward release_cursor all the way until
% the last received command, hooray
- State = State0#?MODULE{release_cursors = lqueue:new()},
+ State = State0#?STATE{release_cursors = lqueue:new()},
{State, ok, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
_ ->
Smallest = rabbit_fifo_index:smallest(Indexes),
case find_next_cursor(Smallest, Cursors0) of
{empty, Cursors} ->
- {State0#?MODULE{release_cursors = Cursors},
+ {State0#?STATE{release_cursors = Cursors},
ok, Effects};
{Cursor, Cursors} ->
%% we can emit a release cursor we've passed the smallest
%% release cursor available.
- {State0#?MODULE{release_cursors = Cursors}, ok,
+ {State0#?STATE{release_cursors = Cursors}, ok,
Effects ++ [Cursor]}
end
end.
@@ -1286,7 +1286,7 @@ update_header(Key, UpdateFun, Default, Header) ->
return_one(MsgId, 0, {Tag, Header0},
- #?MODULE{returns = Returns,
+ #?STATE{returns = Returns,
consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
Effects0, ConsumerId)
@@ -1312,12 +1312,12 @@ return_one(MsgId, 0, {Tag, Header0},
end,
{add_bytes_return(
Header,
- State1#?MODULE{consumers = Consumers#{ConsumerId => Con},
+ State1#?STATE{consumers = Consumers#{ConsumerId => Con},
returns = lqueue:in(Msg, Returns)}),
Effects0}
end;
return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
- #?MODULE{returns = Returns,
+ #?STATE{returns = Returns,
consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
Effects0, ConsumerId) ->
@@ -1346,17 +1346,17 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
end,
{add_bytes_return(
Header,
- State1#?MODULE{consumers = Consumers#{ConsumerId => Con},
+ State1#?STATE{consumers = Consumers#{ConsumerId => Con},
returns = lqueue:in({MsgNum, Msg}, Returns)}),
Effects0}
end.
-return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
+return_all(#?STATE{consumers = Cons} = State0, Effects0, ConsumerId,
#consumer{checked_out = Checked0} = Con) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
- State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}},
+ State = State0#?STATE{consumers = Cons#{ConsumerId => Con}},
lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) ->
return_one(MsgId, 0, Msg, S, E, ConsumerId);
({MsgId, {'$empty_msg', _} = Msg}, {S, E}) ->
@@ -1403,7 +1403,7 @@ checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
{State0, ok, lists:reverse(Effects1)}.
evaluate_limit(Result,
- #?MODULE{cfg = #cfg{max_length = undefined,
+ #?STATE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
@@ -1418,13 +1418,13 @@ evaluate_limit(Result, State00, Effects0) ->
end.
evaluate_memory_limit(_Header,
- #?MODULE{cfg = #cfg{max_in_memory_length = undefined,
+ #?STATE{cfg = #cfg{max_in_memory_length = undefined,
max_in_memory_bytes = undefined}}) ->
false;
evaluate_memory_limit(#{size := Size}, State) ->
evaluate_memory_limit(Size, State);
evaluate_memory_limit(Size,
- #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength,
+ #?STATE{cfg = #cfg{max_in_memory_length = MaxLength,
max_in_memory_bytes = MaxBytes},
msg_bytes_in_memory = Bytes,
msgs_ready_in_memory = Length})
@@ -1452,18 +1452,18 @@ append_log_effects(Effects0, AccMap) ->
%%
%% When we return it is always done to the current return queue
%% for both prefix messages and current messages
-take_next_msg(#?MODULE{prefix_msgs = {R, P}} = State) ->
+take_next_msg(#?STATE{prefix_msgs = {R, P}} = State) ->
%% conversion
- take_next_msg(State#?MODULE{prefix_msgs = {length(R), R, length(P), P}});
-take_next_msg(#?MODULE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem],
+ take_next_msg(State#?STATE{prefix_msgs = {length(R), R, length(P), P}});
+take_next_msg(#?STATE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem],
NumP, P}} = State) ->
%% there are prefix returns, these should be served first
- {Msg, State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
-take_next_msg(#?MODULE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) ->
+ {Msg, State#?STATE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
+take_next_msg(#?STATE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) ->
%% there are prefix returns, these should be served first
{{'$prefix_msg', Header},
- State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
-take_next_msg(#?MODULE{returns = Returns,
+ State#?STATE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
+take_next_msg(#?STATE{returns = Returns,
low_msg_num = Low0,
messages = Messages0,
prefix_msgs = {NumR, R, NumP, P}} = State) ->
@@ -1472,7 +1472,7 @@ take_next_msg(#?MODULE{returns = Returns,
case lqueue:peek(Returns) of
{value, NextMsg} ->
{NextMsg,
- State#?MODULE{returns = lqueue:drop(Returns)}};
+ State#?STATE{returns = lqueue:drop(Returns)}};
empty when P == [] ->
case Low0 of
undefined ->
@@ -1482,11 +1482,11 @@ take_next_msg(#?MODULE{returns = Returns,
case maps:size(Messages) of
0 ->
{{Low0, Msg},
- State#?MODULE{messages = Messages,
+ State#?STATE{messages = Messages,
low_msg_num = undefined}};
_ ->
{{Low0, Msg},
- State#?MODULE{messages = Messages,
+ State#?STATE{messages = Messages,
low_msg_num = Low0 + 1}}
end
end;
@@ -1496,10 +1496,10 @@ take_next_msg(#?MODULE{returns = Returns,
{Header, 'empty'} ->
%% There are prefix msgs
{{'$empty_msg', Header},
- State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}};
+ State#?STATE{prefix_msgs = {NumR, R, NumP-1, Rem}}};
Header ->
{{'$prefix_msg', Header},
- State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}}
+ State#?STATE{prefix_msgs = {NumR, R, NumP-1, Rem}}}
end
end.
@@ -1524,7 +1524,7 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
{dequeue, {MsgId, {Header, Msg}}, Ready}}}]
end}.
-checkout_one(#?MODULE{service_queue = SQ0,
+checkout_one(#?STATE{service_queue = SQ0,
messages = Messages0,
consumers = Cons0} = InitState) ->
case queue:peek(SQ0) of
@@ -1539,11 +1539,11 @@ checkout_one(#?MODULE{service_queue = SQ0,
%% no credit but was still on queue
%% can happen when draining
%% recurse without consumer on queue
- checkout_one(InitState#?MODULE{service_queue = SQ1});
+ checkout_one(InitState#?STATE{service_queue = SQ1});
{ok, #consumer{status = cancelled}} ->
- checkout_one(InitState#?MODULE{service_queue = SQ1});
+ checkout_one(InitState#?STATE{service_queue = SQ1});
{ok, #consumer{status = suspected_down}} ->
- checkout_one(InitState#?MODULE{service_queue = SQ1});
+ checkout_one(InitState#?STATE{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
@@ -1556,7 +1556,7 @@ checkout_one(#?MODULE{service_queue = SQ0,
{Cons, SQ, []} = % we expect no effects
update_or_remove_sub(ConsumerId, Con,
Cons0, SQ1, []),
- State1 = State0#?MODULE{service_queue = SQ,
+ State1 = State0#?STATE{service_queue = SQ,
consumers = Cons},
{State, Msg} =
case ConsumerMsg of
@@ -1579,7 +1579,7 @@ checkout_one(#?MODULE{service_queue = SQ0,
{success, ConsumerId, Next, Msg, State};
error ->
%% consumer did not exist but was queued, recurse
- checkout_one(InitState#?MODULE{service_queue = SQ1})
+ checkout_one(InitState#?STATE{service_queue = SQ1})
end;
empty ->
{nochange, InitState}
@@ -1629,27 +1629,27 @@ uniq_queue_in(Key, Queue) ->
end.
update_consumer(ConsumerId, Meta, Spec,
- #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) ->
+ #?STATE{cfg = #cfg{consumer_strategy = competing}} = State0) ->
%% general case, single active consumer off
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, Spec,
- #?MODULE{consumers = Cons0,
+ #?STATE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active}} = State0)
when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
- #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ #?STATE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers0} = State0) ->
%% single active consumer on and one active consumer already
%% adding the new consumer to the waiting list
Consumer = #consumer{lifetime = Life, meta = Meta,
credit = Credit, credit_mode = Mode},
WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}],
- State0#?MODULE{waiting_consumers = WaitingConsumers1}.
+ State0#?STATE{waiting_consumers = WaitingConsumers1}.
update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
- #?MODULE{consumers = Cons0,
+ #?STATE{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
%% TODO: this logic may not be correct for updating a pre-existing consumer
Init = #consumer{lifetime = Life, meta = Meta,
@@ -1664,7 +1664,7 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
end, Init, Cons0),
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
ServiceQueue0),
- State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}.
+ State0#?STATE{consumers = Cons, service_queue = ServiceQueue}.
maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
ServiceQueue0) ->
@@ -1676,14 +1676,14 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
ServiceQueue0
end.
-convert_prefix_msgs(#?MODULE{prefix_msgs = {R, P}} = State) ->
- State#?MODULE{prefix_msgs = {length(R), R, length(P), P}};
+convert_prefix_msgs(#?STATE{prefix_msgs = {R, P}} = State) ->
+ State#?STATE{prefix_msgs = {length(R), R, length(P), P}};
convert_prefix_msgs(State) ->
State.
%% creates a dehydrated version of the current state to be cached and
%% potentially used to for a snaphot at a later point
-dehydrate_state(#?MODULE{messages = Messages,
+dehydrate_state(#?STATE{messages = Messages,
consumers = Consumers,
returns = Returns,
low_msg_num = Low,
@@ -1709,7 +1709,7 @@ dehydrate_state(#?MODULE{messages = Messages,
%% recovering from a snapshot
PrefMsgs = PrefMsg0 ++ PrefMsgsSuff,
Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0],
- State#?MODULE{messages = #{},
+ State#?STATE{messages = #{},
ra_indexes = rabbit_fifo_index:empty(),
release_cursors = lqueue:new(),
low_msg_num = undefined,
@@ -1746,19 +1746,19 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Con#consumer{checked_out = Checked}.
%% make the state suitable for equality comparison
-normalize(#?MODULE{release_cursors = Cursors} = State) ->
- State#?MODULE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
+normalize(#?STATE{release_cursors = Cursors} = State) ->
+ State#?STATE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
-is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined,
+is_over_limit(#?STATE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}}) ->
false;
-is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
+is_over_limit(#?STATE{cfg = #cfg{max_length = MaxLength,
max_bytes = MaxBytes},
msg_bytes_enqueue = BytesEnq} = State) ->
messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
-normalize_for_v1(#?MODULE{cfg = Cfg} = State) ->
+normalize_for_v1(#?STATE{cfg = Cfg} = State) ->
%% run all v0 conversions so that v1 does not have to have this code
RCI = case Cfg of
#cfg{release_cursor_interval = {_, _} = R} ->
@@ -1769,10 +1769,22 @@ normalize_for_v1(#?MODULE{cfg = Cfg} = State) ->
{?RELEASE_CURSOR_EVERY, C}
end,
convert_prefix_msgs(
- State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI}}).
+ State#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCI}}).
-messages_map(#?MODULE{messages = Messages}) ->
- Messages.
+get_field(Field, State) ->
+ Fields = record_info(fields, ?STATE),
+ Index = record_index_of(Field, Fields),
+ element(Index, State).
+
+record_index_of(F, Fields) ->
+ index_of(2, F, Fields).
+
+index_of(_, F, []) ->
+ exit({field_not_found, F});
+index_of(N, F, [F | _]) ->
+ N;
+index_of(N, F, [_ | T]) ->
+ index_of(N+1, F, T).
-spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol().
make_enqueue(Pid, Seq, Msg) ->
@@ -1815,58 +1827,58 @@ make_update_config(Config) ->
#update_config{config = Config}.
add_bytes_enqueue(Bytes,
- #?MODULE{msg_bytes_enqueue = Enqueue} = State)
+ #?STATE{msg_bytes_enqueue = Enqueue} = State)
when is_integer(Bytes) ->
- State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes};
+ State#?STATE{msg_bytes_enqueue = Enqueue + Bytes};
add_bytes_enqueue(#{size := Bytes}, State) ->
add_bytes_enqueue(Bytes, State).
add_bytes_drop(Bytes,
- #?MODULE{msg_bytes_enqueue = Enqueue} = State)
+ #?STATE{msg_bytes_enqueue = Enqueue} = State)
when is_integer(Bytes) ->
- State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes};
+ State#?STATE{msg_bytes_enqueue = Enqueue - Bytes};
add_bytes_drop(#{size := Bytes}, State) ->
add_bytes_drop(Bytes, State).
add_bytes_checkout(Bytes,
- #?MODULE{msg_bytes_checkout = Checkout,
+ #?STATE{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue } = State)
when is_integer(Bytes) ->
- State#?MODULE{msg_bytes_checkout = Checkout + Bytes,
+ State#?STATE{msg_bytes_checkout = Checkout + Bytes,
msg_bytes_enqueue = Enqueue - Bytes};
add_bytes_checkout(#{size := Bytes}, State) ->
add_bytes_checkout(Bytes, State).
add_bytes_settle(Bytes,
- #?MODULE{msg_bytes_checkout = Checkout} = State)
+ #?STATE{msg_bytes_checkout = Checkout} = State)
when is_integer(Bytes) ->
- State#?MODULE{msg_bytes_checkout = Checkout - Bytes};
+ State#?STATE{msg_bytes_checkout = Checkout - Bytes};
add_bytes_settle(#{size := Bytes}, State) ->
add_bytes_settle(Bytes, State).
add_bytes_return(Bytes,
- #?MODULE{msg_bytes_checkout = Checkout,
+ #?STATE{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue} = State)
when is_integer(Bytes) ->
- State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
+ State#?STATE{msg_bytes_checkout = Checkout - Bytes,
msg_bytes_enqueue = Enqueue + Bytes};
add_bytes_return(#{size := Bytes}, State) ->
add_bytes_return(Bytes, State).
add_in_memory_counts(Bytes,
- #?MODULE{msg_bytes_in_memory = InMemoryBytes,
+ #?STATE{msg_bytes_in_memory = InMemoryBytes,
msgs_ready_in_memory = InMemoryCount} = State)
when is_integer(Bytes) ->
- State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes,
+ State#?STATE{msg_bytes_in_memory = InMemoryBytes + Bytes,
msgs_ready_in_memory = InMemoryCount + 1};
add_in_memory_counts(#{size := Bytes}, State) ->
add_in_memory_counts(Bytes, State).
subtract_in_memory_counts(Bytes,
- #?MODULE{msg_bytes_in_memory = InMemoryBytes,
+ #?STATE{msg_bytes_in_memory = InMemoryBytes,
msgs_ready_in_memory = InMemoryCount} = State)
when is_integer(Bytes) ->
- State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes,
+ State#?STATE{msg_bytes_in_memory = InMemoryBytes - Bytes,
msgs_ready_in_memory = InMemoryCount - 1};
subtract_in_memory_counts(#{size := Bytes}, State) ->
subtract_in_memory_counts(Bytes, State).
@@ -1890,7 +1902,7 @@ get_size_from_header(#{size := B}) ->
B.
-all_nodes(#?MODULE{consumers = Cons0,
+all_nodes(#?STATE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
Nodes0 = maps:fold(fun({_, P}, _, Acc) ->
@@ -1904,7 +1916,7 @@ all_nodes(#?MODULE{consumers = Cons0,
Acc#{node(P) => ok}
end, Nodes1, WaitingConsumers0)).
-all_pids_for(Node, #?MODULE{consumers = Cons0,
+all_pids_for(Node, #?STATE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
Cons = maps:fold(fun({_, P}, _, Acc)
@@ -1923,7 +1935,7 @@ all_pids_for(Node, #?MODULE{consumers = Cons0,
(_, Acc) -> Acc
end, Enqs, WaitingConsumers0).
-suspected_pids_for(Node, #?MODULE{consumers = Cons0,
+suspected_pids_for(Node, #?STATE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc)
diff --git a/src/rabbit_fifo_v0.hrl b/src/rabbit_fifo_v0.hrl
index 26a988ee10..333ccb4d77 100644
--- a/src/rabbit_fifo_v0.hrl
+++ b/src/rabbit_fifo_v0.hrl
@@ -75,7 +75,7 @@
-define(GC_MEM_LIMIT_B, 2000000).
-define(MB, 1048576).
--define(RABBIT_FIFO, rabbit_fifo_v0).
+-define(STATE, rabbit_fifo).
-record(consumer,
{meta = #{} :: consumer_meta(),
@@ -130,7 +130,7 @@
{non_neg_integer(), list(),
non_neg_integer(), list()}.
--record(?RABBIT_FIFO,
+-record(?STATE,
{cfg :: #cfg{},
% unassigned messages
messages = #{} :: #{msg_in_id() => indexed_msg()},
@@ -156,7 +156,7 @@
% for normal appending operations as it's backed by a map
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor,
- ra:index(), #?RABBIT_FIFO{}}),
+ ra:index(), #?STATE{}}),
% consumers need to reflect consumer state at time of snapshot
% needs to be part of snapshot
consumers = #{} :: #{consumer_id() => #consumer{}},
diff --git a/test/rabbit_fifo_v0_SUITE.erl b/test/rabbit_fifo_v0_SUITE.erl
index 6b84911d7f..fcb84377de 100644
--- a/test/rabbit_fifo_v0_SUITE.erl
+++ b/test/rabbit_fifo_v0_SUITE.erl
@@ -128,12 +128,12 @@ credit_with_drained_test(_) ->
{State1, _, _} =
apply(meta(1), rabbit_fifo_v0:make_checkout(Cid, {auto, 1, credited},#{}),
State0),
- ?assertMatch(#?RABBIT_FIFO{consumers = #{Cid := #consumer{credit = 1,
+ ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 1,
delivery_count = 0}}},
State1),
{State, Result, _} =
apply(meta(3), rabbit_fifo_v0:make_credit(Cid, 0, 5, true), State1),
- ?assertMatch(#?RABBIT_FIFO{consumers = #{Cid := #consumer{credit = 0,
+ ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 0,
delivery_count = 5}}},
State),
?assertEqual({multi, [{send_credit_reply, 0},
@@ -154,7 +154,7 @@ credit_and_drain_test(_) ->
{State4, {multi, [{send_credit_reply, 0},
{send_drained, {?FUNCTION_NAME, 2}}]},
Effects} = apply(meta(4), rabbit_fifo_v0:make_credit(Cid, 4, 0, true), State3),
- ?assertMatch(#?RABBIT_FIFO{consumers = #{Cid := #consumer{credit = 0,
+ ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 0,
delivery_count = 4}}},
State4),
@@ -290,9 +290,9 @@ return_test(_) ->
{State2, _} = check_auto(Cid2, 3, State1),
{State3, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid, [0]), State2),
?assertMatch(#{Cid := #consumer{checked_out = C}} when map_size(C) == 0,
- State3#?RABBIT_FIFO.consumers),
+ State3#?STATE.consumers),
?assertMatch(#{Cid2 := #consumer{checked_out = C2}} when map_size(C2) == 1,
- State3#?RABBIT_FIFO.consumers),
+ State3#?STATE.consumers),
ok.
return_dequeue_delivery_limit_test(_) ->
@@ -350,7 +350,7 @@ return_checked_out_limit_test(_) ->
{State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, _},
{aux, active}]} =
apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1),
- {#?RABBIT_FIFO{ra_indexes = RaIdxs}, ok, [_ReleaseEff]} =
+ {#?STATE{ra_indexes = RaIdxs}, ok, [_ReleaseEff]} =
apply(meta(4), rabbit_fifo_v0:make_return(Cid, [MsgId2]), State2),
?assertEqual(0, rabbit_fifo_index:size(RaIdxs)),
ok.
@@ -380,8 +380,8 @@ cancelled_checkout_out_test(_) ->
{State1, _} = check_auto(Cid, 2, State0),
% cancelled checkout should not return pending messages to queue
{State2, _, _} = apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, cancel, #{}), State1),
- ?assertEqual(1, maps:size(State2#?RABBIT_FIFO.messages)),
- ?assertEqual(0, lqueue:len(State2#?RABBIT_FIFO.returns)),
+ ?assertEqual(1, maps:size(State2#?STATE.messages)),
+ ?assertEqual(0, lqueue:len(State2#?STATE.returns)),
{State3, {dequeue, empty}} =
apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State2),
@@ -410,14 +410,14 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) ->
{State0, Effects0} = enq(1, 1, second, test_init(test)),
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0),
{State1, Effects1} = check_auto(Cid, 2, State0),
- #consumer{credit = 0} = maps:get(Cid, State1#?RABBIT_FIFO.consumers),
+ #consumer{credit = 0} = maps:get(Cid, State1#?STATE.consumers),
?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1),
% monitor both enqueuer and consumer
% because we received a noconnection we now need to monitor the node
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
#consumer{credit = 1,
checked_out = Ch,
- status = suspected_down} = maps:get(Cid, State2a#?RABBIT_FIFO.consumers),
+ status = suspected_down} = maps:get(Cid, State2a#?STATE.consumers),
?assertEqual(#{}, Ch),
%% validate consumer has credit
{State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a),
@@ -426,7 +426,7 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) ->
% when the node comes up we need to retry the process monitors for the
% disconnected processes
{State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2),
- #consumer{status = up} = maps:get(Cid, State3#?RABBIT_FIFO.consumers),
+ #consumer{status = up} = maps:get(Cid, State3#?STATE.consumers),
% try to re-monitor the suspect processes
?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3),
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
@@ -436,18 +436,18 @@ down_with_noconnection_returns_unack_test(_) ->
Pid = spawn(fun() -> ok end),
Cid = {<<"down_with_noconnect">>, Pid},
{State0, _} = enq(1, 1, second, test_init(test)),
- ?assertEqual(1, maps:size(State0#?RABBIT_FIFO.messages)),
- ?assertEqual(0, lqueue:len(State0#?RABBIT_FIFO.returns)),
+ ?assertEqual(1, maps:size(State0#?STATE.messages)),
+ ?assertEqual(0, lqueue:len(State0#?STATE.returns)),
{State1, {_, _}} = deq(2, Cid, unsettled, State0),
- ?assertEqual(0, maps:size(State1#?RABBIT_FIFO.messages)),
- ?assertEqual(0, lqueue:len(State1#?RABBIT_FIFO.returns)),
+ ?assertEqual(0, maps:size(State1#?STATE.messages)),
+ ?assertEqual(0, lqueue:len(State1#?STATE.returns)),
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- ?assertEqual(0, maps:size(State2a#?RABBIT_FIFO.messages)),
- ?assertEqual(1, lqueue:len(State2a#?RABBIT_FIFO.returns)),
+ ?assertEqual(0, maps:size(State2a#?STATE.messages)),
+ ?assertEqual(1, lqueue:len(State2a#?STATE.returns)),
?assertMatch(#consumer{checked_out = Ch,
status = suspected_down}
when map_size(Ch) == 0,
- maps:get(Cid, State2a#?RABBIT_FIFO.consumers)),
+ maps:get(Cid, State2a#?STATE.consumers)),
ok.
down_with_noproc_enqueuer_is_cleaned_up_test(_) ->
@@ -457,7 +457,7 @@ down_with_noproc_enqueuer_is_cleaned_up_test(_) ->
?ASSERT_EFF({monitor, process, _}, Effects0),
{State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0),
% ensure there are no enqueuers
- ?assert(0 =:= maps:size(State1#?RABBIT_FIFO.enqueuers)),
+ ?assert(0 =:= maps:size(State1#?STATE.enqueuers)),
ok.
discarded_message_without_dead_letter_handler_is_removed_test(_) ->
@@ -536,7 +536,7 @@ pending_enqueue_is_enqueued_on_down_test(_) ->
duplicate_delivery_test(_) ->
{State0, _} = enq(1, 1, first, test_init(test)),
- {#?RABBIT_FIFO{ra_indexes = RaIdxs,
+ {#?STATE{ra_indexes = RaIdxs,
messages = Messages}, _} = enq(2, 1, first, State0),
?assertEqual(1, rabbit_fifo_index:size(RaIdxs)),
?assertEqual(1, maps:size(Messages)),
@@ -605,13 +605,13 @@ purge_with_checkout_test(_) ->
{State1, _} = enq(2, 1, <<"first">>, State0),
{State2, _} = enq(3, 2, <<"second">>, State1),
%% assert message bytes are non zero
- ?assert(State2#?RABBIT_FIFO.msg_bytes_checkout > 0),
- ?assert(State2#?RABBIT_FIFO.msg_bytes_enqueue > 0),
+ ?assert(State2#?STATE.msg_bytes_checkout > 0),
+ ?assert(State2#?STATE.msg_bytes_enqueue > 0),
{State3, {purge, 1}, _} = apply(meta(2), rabbit_fifo_v0:make_purge(), State2),
- ?assert(State2#?RABBIT_FIFO.msg_bytes_checkout > 0),
- ?assertEqual(0, State3#?RABBIT_FIFO.msg_bytes_enqueue),
- ?assertEqual(1, rabbit_fifo_index:size(State3#?RABBIT_FIFO.ra_indexes)),
- #consumer{checked_out = Checked} = maps:get(Cid, State3#?RABBIT_FIFO.consumers),
+ ?assert(State2#?STATE.msg_bytes_checkout > 0),
+ ?assertEqual(0, State3#?STATE.msg_bytes_enqueue),
+ ?assertEqual(1, rabbit_fifo_index:size(State3#?STATE.ra_indexes)),
+ #consumer{checked_out = Checked} = maps:get(Cid, State3#?STATE.consumers),
?assertEqual(1, maps:size(Checked)),
ok.
@@ -622,16 +622,16 @@ down_noproc_returns_checked_out_in_order_test(_) ->
{FS, _} = enq(Num, Num, Num, FS0),
FS
end, S0, lists:seq(1, 100)),
- ?assertEqual(100, maps:size(S1#?RABBIT_FIFO.messages)),
+ ?assertEqual(100, maps:size(S1#?STATE.messages)),
Cid = {<<"cid">>, self()},
{S2, _} = check(Cid, 101, 1000, S1),
- #consumer{checked_out = Checked} = maps:get(Cid, S2#?RABBIT_FIFO.consumers),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#?STATE.consumers),
?assertEqual(100, maps:size(Checked)),
%% simulate down
{S, _, _} = apply(meta(102), {down, self(), noproc}, S2),
- Returns = lqueue:to_list(S#?RABBIT_FIFO.returns),
+ Returns = lqueue:to_list(S#?STATE.returns),
?assertEqual(100, length(Returns)),
- ?assertEqual(0, maps:size(S#?RABBIT_FIFO.consumers)),
+ ?assertEqual(0, maps:size(S#?STATE.consumers)),
%% validate returns are in order
?assertEqual(lists:sort(Returns), Returns),
ok.
@@ -643,18 +643,18 @@ down_noconnection_returns_checked_out_test(_) ->
{FS, _} = enq(Num, Num, Num, FS0),
FS
end, S0, lists:seq(1, NumMsgs)),
- ?assertEqual(NumMsgs, maps:size(S1#?RABBIT_FIFO.messages)),
+ ?assertEqual(NumMsgs, maps:size(S1#?STATE.messages)),
Cid = {<<"cid">>, self()},
{S2, _} = check(Cid, 101, 1000, S1),
- #consumer{checked_out = Checked} = maps:get(Cid, S2#?RABBIT_FIFO.consumers),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#?STATE.consumers),
?assertEqual(NumMsgs, maps:size(Checked)),
%% simulate down
{S, _, _} = apply(meta(102), {down, self(), noconnection}, S2),
- Returns = lqueue:to_list(S#?RABBIT_FIFO.returns),
+ Returns = lqueue:to_list(S#?STATE.returns),
?assertEqual(NumMsgs, length(Returns)),
?assertMatch(#consumer{checked_out = Ch}
when map_size(Ch) == 0,
- maps:get(Cid, S#?RABBIT_FIFO.consumers)),
+ maps:get(Cid, S#?STATE.consumers)),
%% validate returns are in order
?assertEqual(lists:sort(Returns), Returns),
ok.
@@ -666,8 +666,8 @@ single_active_consumer_basic_get_test(_) ->
atom_to_binary(?FUNCTION_NAME, utf8)),
release_cursor_interval => 0,
single_active_consumer_on => true}),
- ?assertEqual(single_active, State0#?RABBIT_FIFO.cfg#cfg.consumer_strategy),
- ?assertEqual(0, map_size(State0#?RABBIT_FIFO.consumers)),
+ ?assertEqual(single_active, State0#?STATE.cfg#cfg.consumer_strategy),
+ ?assertEqual(0, map_size(State0#?STATE.consumers)),
{State1, _} = enq(1, 1, first, State0),
{_State, {error, unsupported}} =
apply(meta(2), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}),
@@ -680,8 +680,8 @@ single_active_consumer_test(_) ->
atom_to_binary(?FUNCTION_NAME, utf8)),
release_cursor_interval => 0,
single_active_consumer_on => true}),
- ?assertEqual(single_active, State0#?RABBIT_FIFO.cfg#cfg.consumer_strategy),
- ?assertEqual(0, map_size(State0#?RABBIT_FIFO.consumers)),
+ ?assertEqual(single_active, State0#?STATE.cfg#cfg.consumer_strategy),
+ ?assertEqual(0, map_size(State0#?STATE.consumers)),
% adding some consumers
AddConsumer = fun(CTag, State) ->
@@ -701,24 +701,24 @@ single_active_consumer_test(_) ->
C4 = {<<"ctag4">>, self()},
% the first registered consumer is the active one, the others are waiting
- ?assertEqual(1, map_size(State1#?RABBIT_FIFO.consumers)),
- ?assertMatch(#{C1 := _}, State1#?RABBIT_FIFO.consumers),
- ?assertEqual(3, length(State1#?RABBIT_FIFO.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind(C2, 1, State1#?RABBIT_FIFO.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind(C3, 1, State1#?RABBIT_FIFO.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind(C4, 1, State1#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(1, map_size(State1#?STATE.consumers)),
+ ?assertMatch(#{C1 := _}, State1#?STATE.consumers),
+ ?assertEqual(3, length(State1#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C2, 1, State1#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C3, 1, State1#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1, State1#?STATE.waiting_consumers)),
% cancelling a waiting consumer
{State2, _, Effects1} = apply(meta(2),
make_checkout(C3, cancel, #{}),
State1),
% the active consumer should still be in place
- ?assertEqual(1, map_size(State2#?RABBIT_FIFO.consumers)),
- ?assertMatch(#{C1 := _}, State2#?RABBIT_FIFO.consumers),
+ ?assertEqual(1, map_size(State2#?STATE.consumers)),
+ ?assertMatch(#{C1 := _}, State2#?STATE.consumers),
% the cancelled consumer has been removed from waiting consumers
- ?assertEqual(2, length(State2#?RABBIT_FIFO.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind(C2, 1, State2#?RABBIT_FIFO.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind(C4, 1, State2#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(2, length(State2#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C2, 1, State2#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1, State2#?STATE.waiting_consumers)),
% there are some effects to unregister the consumer
?ASSERT_EFF({mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [_, C]}, C == C3, Effects1),
@@ -728,12 +728,12 @@ single_active_consumer_test(_) ->
make_checkout(C1, cancel, #{}),
State2),
% the second registered consumer is now the active one
- ?assertEqual(1, map_size(State3#?RABBIT_FIFO.consumers)),
- ?assertMatch(#{C2 := _}, State3#?RABBIT_FIFO.consumers),
+ ?assertEqual(1, map_size(State3#?STATE.consumers)),
+ ?assertMatch(#{C2 := _}, State3#?STATE.consumers),
% the new active consumer is no longer in the waiting list
- ?assertEqual(1, length(State3#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(1, length(State3#?STATE.waiting_consumers)),
?assertNotEqual(false, lists:keyfind(C4, 1,
- State3#?RABBIT_FIFO.waiting_consumers)),
+ State3#?STATE.waiting_consumers)),
%% should have a cancel consumer handler mod_call effect and
%% an active new consumer effect
?ASSERT_EFF({mod_call, rabbit_quorum_queue,
@@ -746,10 +746,10 @@ single_active_consumer_test(_) ->
make_checkout(C2, cancel, #{}),
State3),
% the last waiting consumer became the active one
- ?assertEqual(1, map_size(State4#?RABBIT_FIFO.consumers)),
- ?assertMatch(#{C4 := _}, State4#?RABBIT_FIFO.consumers),
+ ?assertEqual(1, map_size(State4#?STATE.consumers)),
+ ?assertMatch(#{C4 := _}, State4#?STATE.consumers),
% the waiting consumer list is now empty
- ?assertEqual(0, length(State4#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(0, length(State4#?STATE.waiting_consumers)),
% there are some effects to unregister the consumer and
% to update the new active one (metrics)
?ASSERT_EFF({mod_call, rabbit_quorum_queue,
@@ -762,9 +762,9 @@ single_active_consumer_test(_) ->
make_checkout(C4, cancel, #{}),
State4),
% no active consumer anymore
- ?assertEqual(0, map_size(State5#?RABBIT_FIFO.consumers)),
+ ?assertEqual(0, map_size(State5#?STATE.consumers)),
% still nothing in the waiting list
- ?assertEqual(0, length(State5#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(0, length(State5#?STATE.waiting_consumers)),
% there is an effect to unregister the consumer + queue inactive effect
?ASSERT_EFF({mod_call, rabbit_quorum_queue,
cancel_consumer_handler, _}, Effects4),
@@ -799,9 +799,9 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
% the channel of the active consumer goes down
{State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1),
% fell back to another consumer
- ?assertEqual(1, map_size(State2#?RABBIT_FIFO.consumers)),
+ ?assertEqual(1, map_size(State2#?STATE.consumers)),
% there are still waiting consumers
- ?assertEqual(2, length(State2#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(2, length(State2#?STATE.waiting_consumers)),
% effects to unregister the consumer and
% to update the new active one (metrics) are there
?ASSERT_EFF({mod_call, rabbit_quorum_queue,
@@ -812,9 +812,9 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
% the channel of the active consumer and a waiting consumer goes down
{State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2),
% fell back to another consumer
- ?assertEqual(1, map_size(State3#?RABBIT_FIFO.consumers)),
+ ?assertEqual(1, map_size(State3#?STATE.consumers)),
% no more waiting consumer
- ?assertEqual(0, length(State3#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(0, length(State3#?STATE.waiting_consumers)),
% effects to cancel both consumers of this channel + effect to update the new active one (metrics)
?ASSERT_EFF({mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [_, C]}, C == C2, Effects2),
@@ -826,8 +826,8 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
% the last channel goes down
{State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
% no more consumers
- ?assertEqual(0, map_size(State4#?RABBIT_FIFO.consumers)),
- ?assertEqual(0, length(State4#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(0, map_size(State4#?STATE.consumers)),
+ ?assertEqual(0, length(State4#?STATE.waiting_consumers)),
% there is an effect to unregister the consumer + queue inactive effect
?ASSERT_EFF({mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [_, C]}, C == C4, Effects3),
@@ -862,10 +862,10 @@ single_active_returns_messages_on_noconnection_test(_) ->
% simulate node goes down
{State3, _, _} = apply(meta(5), {down, DownPid, noconnection}, State2),
%% assert the consumer is up
- ?assertMatch([_], lqueue:to_list(State3#?RABBIT_FIFO.returns)),
+ ?assertMatch([_], lqueue:to_list(State3#?STATE.returns)),
?assertMatch([{_, #consumer{checked_out = Checked}}]
when map_size(Checked) == 0,
- State3#?RABBIT_FIFO.waiting_consumers),
+ State3#?STATE.waiting_consumers),
ok.
@@ -896,7 +896,7 @@ single_active_consumer_replaces_consumer_when_down_noconnection_test(_) ->
%% assert the consumer is up
?assertMatch(#{C1 := #consumer{status = up}},
- State1a#?RABBIT_FIFO.consumers),
+ State1a#?STATE.consumers),
{State1, _} = enq(10, 1, msg, State1a),
@@ -907,24 +907,24 @@ single_active_consumer_replaces_consumer_when_down_noconnection_test(_) ->
?assertMatch([{C2, #consumer{status = up,
checked_out = Ch}}]
when map_size(Ch) == 1,
- maps:to_list(State2#?RABBIT_FIFO.consumers)),
+ maps:to_list(State2#?STATE.consumers)),
%% the disconnected consumer has been returned to waiting
?assert(lists:any(fun ({C,_}) -> C =:= C1 end,
- State2#?RABBIT_FIFO.waiting_consumers)),
- ?assertEqual(2, length(State2#?RABBIT_FIFO.waiting_consumers)),
+ State2#?STATE.waiting_consumers)),
+ ?assertEqual(2, length(State2#?STATE.waiting_consumers)),
% simulate node comes back up
{State3, _, _} = apply(#{index => 2}, {nodeup, node(DownPid)}, State2),
%% the consumer is still active and the same as before
?assertMatch([{C2, #consumer{status = up}}],
- maps:to_list(State3#?RABBIT_FIFO.consumers)),
+ maps:to_list(State3#?STATE.consumers)),
% the waiting consumers should be un-suspected
- ?assertEqual(2, length(State3#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(2, length(State3#?STATE.waiting_consumers)),
lists:foreach(fun({_, #consumer{status = Status}}) ->
?assert(Status /= suspected_down)
- end, State3#?RABBIT_FIFO.waiting_consumers),
+ end, State3#?STATE.waiting_consumers),
ok.
single_active_consumer_all_disconnected_test(_) ->
@@ -953,22 +953,22 @@ single_active_consumer_all_disconnected_test(_) ->
end, State0, ConsumerIds),
%% assert the consumer is up
- ?assertMatch(#{C1 := #consumer{status = up}}, State1#?RABBIT_FIFO.consumers),
+ ?assertMatch(#{C1 := #consumer{status = up}}, State1#?STATE.consumers),
% simulate node goes down
{State2, _, _} = apply(meta(5), {down, C1Pid, noconnection}, State1),
%% assert the consumer fails over to the consumer on n2
- ?assertMatch(#{C2 := #consumer{status = up}}, State2#?RABBIT_FIFO.consumers),
+ ?assertMatch(#{C2 := #consumer{status = up}}, State2#?STATE.consumers),
{State3, _, _} = apply(meta(6), {down, C2Pid, noconnection}, State2),
%% assert these no active consumer after both nodes are maked as down
- ?assertMatch([], maps:to_list(State3#?RABBIT_FIFO.consumers)),
+ ?assertMatch([], maps:to_list(State3#?STATE.consumers)),
%% n2 comes back
{State4, _, _} = apply(meta(7), {nodeup, node(C2Pid)}, State3),
%% ensure n2 is the active consumer as this node as been registered
%% as up again
?assertMatch([{{<<"ctag_n2">>, _}, #consumer{status = up,
credit = 1}}],
- maps:to_list(State4#?RABBIT_FIFO.consumers)),
+ maps:to_list(State4#?STATE.consumers)),
ok.
single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
@@ -1050,11 +1050,11 @@ query_consumers_test(_) ->
NewState
end,
State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
- Consumers0 = State1#?RABBIT_FIFO.consumers,
+ Consumers0 = State1#?STATE.consumers,
Consumer = maps:get({<<"ctag2">>, self()}, Consumers0),
Consumers1 = maps:put({<<"ctag2">>, self()},
Consumer#consumer{status = suspected_down}, Consumers0),
- State2 = State1#?RABBIT_FIFO{consumers = Consumers1},
+ State2 = State1#?STATE{consumers = Consumers1},
?assertEqual(4, rabbit_fifo_v0:query_consumer_count(State2)),
Consumers2 = rabbit_fifo_v0:query_consumers(State2),
@@ -1203,13 +1203,13 @@ single_active_cancelled_with_unacked_test(_) ->
%% C2 should be the active consumer
?assertMatch(#{C2 := #consumer{status = up,
checked_out = #{0 := _}}},
- State4#?RABBIT_FIFO.consumers),
+ State4#?STATE.consumers),
%% C1 should be a cancelled consumer
?assertMatch(#{C1 := #consumer{status = cancelled,
lifetime = once,
checked_out = #{0 := _}}},
- State4#?RABBIT_FIFO.consumers),
- ?assertMatch([], State4#?RABBIT_FIFO.waiting_consumers),
+ State4#?STATE.consumers),
+ ?assertMatch([], State4#?STATE.waiting_consumers),
%% Ack both messages
{State5, _Effects5} = settle(C1, 1, 0, State4),
@@ -1218,11 +1218,11 @@ single_active_cancelled_with_unacked_test(_) ->
%% C2 should remain
?assertMatch(#{C2 := #consumer{status = up}},
- State6#?RABBIT_FIFO.consumers),
+ State6#?STATE.consumers),
%% C1 should be gone
?assertNotMatch(#{C1 := _},
- State6#?RABBIT_FIFO.consumers),
- ?assertMatch([], State6#?RABBIT_FIFO.waiting_consumers),
+ State6#?STATE.consumers),
+ ?assertMatch([], State6#?STATE.waiting_consumers),
ok.
single_active_with_credited_test(_) ->
@@ -1253,9 +1253,9 @@ single_active_with_credited_test(_) ->
{State3, _} = apply(meta(4), C2Cred, State2),
%% both consumers should have credit
?assertMatch(#{C1 := #consumer{credit = 5}},
- State3#?RABBIT_FIFO.consumers),
+ State3#?STATE.consumers),
?assertMatch([{C2, #consumer{credit = 4}}],
- State3#?RABBIT_FIFO.waiting_consumers),
+ State3#?STATE.waiting_consumers),
ok.
purge_nodes_test(_) ->
@@ -1292,11 +1292,8 @@ purge_nodes_test(_) ->
State4),
%% assert there are no enqueuers nor consumers
- ?assertMatch(#?RABBIT_FIFO{enqueuers = Enqs} when map_size(Enqs) == 1,
- State),
-
- ?assertMatch(#?RABBIT_FIFO{consumers = Cons} when map_size(Cons) == 0,
- State),
+ ?assertMatch(#?STATE{enqueuers = Enqs} when map_size(Enqs) == 1, State),
+ ?assertMatch(#?STATE{consumers = Cons} when map_size(Cons) == 0, State),
?assertMatch(
[{mod_call, rabbit_quorum_queue, handle_tick,
[#resource{}, _Metrics,