diff options
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.erl | 53 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.hrl | 4 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_client.erl | 62 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_v1.erl | 352 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_v1.hrl | 5 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_fifo_prop_SUITE.erl | 16 |
6 files changed, 231 insertions, 261 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index c7cbb55aa6..59e3962278 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -684,6 +684,7 @@ convert_v1_to_v2(V1State) -> max_in_memory_bytes = rabbit_fifo_v1:get_cfg_field(max_in_memory_bytes, V1State), expires = rabbit_fifo_v1:get_cfg_field(expires, V1State) }, + #?MODULE{ cfg = Cfg, messages = MessagesV2, @@ -713,16 +714,8 @@ purge_node(Meta, Node, State, Effects) -> %% any downs that re not noconnection handle_down(Meta, Pid, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> - % Remove any enqueuer for the same pid and enqueue any pending messages - % This should be ok as we won't see any more enqueues from this pid - State1 = case maps:take(Pid, Enqs0) of - {#enqueuer{pending = Pend}, Enqs} -> - lists:foldl(fun ({_, RIdx, RawMsg}, S) -> - enqueue(RIdx, RawMsg, S) - end, State0#?MODULE{enqueuers = Enqs}, Pend); - error -> - State0 - end, + % Remove any enqueuer for the down pid + State1 = State0#?MODULE{enqueuers = maps:remove(Pid, Enqs0)}, {Effects1, State2} = handle_waiting_consumer_down(Pid, State1), % return checked out messages to main queue % Find the consumers for the down pid @@ -1343,6 +1336,8 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> State2 = incr_enqueue_count(incr_total(State1)), {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false), {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects}; + {out_of_sequence, State, Effects} -> + {State, not_enqueued, Effects}; {duplicate, State, Effects} -> {State, ok, Effects} end. @@ -1433,50 +1428,30 @@ maybe_store_dehydrated_state(RaftIdx, maybe_store_dehydrated_state(_RaftIdx, State) -> State. -enqueue_pending(From, - #enqueuer{next_seqno = Next, - pending = [{Next, RaftIdx, RawMsg} | Pending]} = Enq0, - State0) -> - State = enqueue(RaftIdx, RawMsg, State0), - Enq = Enq0#enqueuer{next_seqno = Next + 1, pending = Pending}, - enqueue_pending(From, Enq, State); -enqueue_pending(From, Enq, #?MODULE{enqueuers = Enqueuers0} = State) -> - State#?MODULE{enqueuers = Enqueuers0#{From => Enq}}. - maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) -> % direct enqueue without tracking State = enqueue(RaftIdx, RawMsg, State0), {ok, State, Effects}; maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, - #?MODULE{enqueuers = Enqueuers0, - ra_indexes = Indexes0} = State0) -> - + #?MODULE{enqueuers = Enqueuers0} = State0) -> case maps:get(From, Enqueuers0, undefined) of undefined -> State1 = State0#?MODULE{enqueuers = Enqueuers0#{From => #enqueuer{}}}, - {ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo, - RawMsg, Effects0, State1), - {ok, State, [{monitor, process, From} | Effects]}; + {Res, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo, + RawMsg, Effects0, State1), + {Res, State, [{monitor, process, From} | Effects]}; #enqueuer{next_seqno = MsgSeqNo} = Enq0 -> % it is the next expected seqno State1 = enqueue(RaftIdx, RawMsg, State0), Enq = Enq0#enqueuer{next_seqno = MsgSeqNo + 1}, - State = enqueue_pending(From, Enq, State1), + State = State1#?MODULE{enqueuers = Enqueuers0#{From => Enq}}, {ok, State, Effects0}; - #enqueuer{next_seqno = Next, - pending = Pending0} = Enq0 + #enqueuer{next_seqno = Next} when MsgSeqNo > Next -> - % out of order delivery - Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0], - Enq = Enq0#enqueuer{pending = lists:sort(Pending)}, - %% if the enqueue it out of order we need to mark it in the - %% index - Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0), - {ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq}, - ra_indexes = Indexes}, Effects0}; + %% TODO: when can this happen? + {out_of_sequence, State0, Effects0}; #enqueuer{next_seqno = Next} when MsgSeqNo =< Next -> - % duplicate delivery - remove the raft index from the ra_indexes - % map as it was added earlier + % duplicate delivery {duplicate, State0, Effects0} end. diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index 95e7003460..a8d539fa1a 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -123,9 +123,7 @@ {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list unused, - pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}], - status = up :: up | - suspected_down, + status = up :: up | suspected_down, %% it is useful to have a record of when this was blocked %% so that we can retry sending the block effect if %% the publisher did not receive the initial one diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index e102ad2207..5947f6ddae 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -40,8 +40,6 @@ -define(TIMER_TIME, 10000). -type seq() :: non_neg_integer(). -%% last_applied is initialised to -1 --type maybe_seq() :: integer(). -type action() :: {send_credit_reply, Available :: non_neg_integer()} | {send_drained, CTagCredit :: {rabbit_fifo:consumer_tag(), non_neg_integer()}}. @@ -65,10 +63,6 @@ leader :: undefined | ra:server_id(), queue_status :: undefined | go | reject_publish, next_seq = 0 :: seq(), - %% Last applied is initialise to -1 to note that no command has yet been - %% applied, but allowing to resend messages if the first ones on the sequence - %% are lost (messages are sent from last_applied + 1) - last_applied = -1 :: maybe_seq(), next_enqueue_seq = 1 :: seq(), %% indicates that we've exceeded the soft limit slow = false :: boolean(), @@ -585,18 +579,26 @@ handle_ra_event(Leader, {machine, leader_change}, #state{leader = Leader} = State) -> %% leader already known {ok, State, []}; -handle_ra_event(Leader, {machine, leader_change}, State0) -> +handle_ra_event(Leader, {machine, leader_change}, + #state{leader = OldLeader} = State0) -> %% we need to update leader %% and resend any pending commands + rabbit_log:debug("~s: Detected QQ leader change from ~w to ~w", + [?MODULE, OldLeader, Leader]), + State = resend_all_pending(State0#state{leader = Leader}), + {ok, State, []}; +handle_ra_event(_From, {rejected, {not_leader, Leader, _Seq}}, + #state{leader = Leader} = State) -> + {ok, State, []}; +handle_ra_event(_From, {rejected, {not_leader, Leader, _Seq}}, + #state{leader = OldLeader} = State0) -> + rabbit_log:debug("~s: Detected QQ leader change (rejection) from ~w to ~w", + [?MODULE, OldLeader, Leader]), State = resend_all_pending(State0#state{leader = Leader}), {ok, State, []}; -handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) -> +handle_ra_event(_From, {rejected, {not_leader, _UndefinedMaybe, _Seq}}, State0) -> % TODO: how should these be handled? re-sent on timer or try random {ok, State0, []}; -handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> - State1 = State0#state{leader = Leader}, - State = resend(Seq, State1), - {ok, State, []}; handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) -> case find_leader(Servers) of undefined -> @@ -642,29 +644,27 @@ try_process_command([Server | Rem], Cmd, State) -> try_process_command(Rem, Cmd, State) end. -seq_applied({Seq, MaybeAction}, - {Corrs, Actions0, #state{last_applied = Last} = State0}) - when Seq > Last -> - State1 = do_resends(Last+1, Seq-1, State0), - {Actions, State} = maybe_add_action(MaybeAction, Actions0, State1), +seq_applied({Seq, Response}, + {Corrs, Actions0, #state{} = State0}) -> + %% sequences aren't guaranteed to be applied in order as enqueues are + %% low priority commands and may be overtaken by others with a normal priority. + {Actions, State} = maybe_add_action(Response, Actions0, State0), case maps:take(Seq, State#state.pending) of {{undefined, _}, Pending} -> - {Corrs, Actions, State#state{pending = Pending, - last_applied = Seq}}; - {{Corr, _}, Pending} -> - {[Corr | Corrs], Actions, State#state{pending = Pending, - last_applied = Seq}}; - error -> - % must have already been resent or removed for some other reason - % still need to update last_applied or we may inadvertently resend - % stuff later - {Corrs, Actions, State#state{last_applied = Seq}} + {Corrs, Actions, State#state{pending = Pending}}; + {{Corr, _}, Pending} + when Response /= not_enqueued -> + {[Corr | Corrs], Actions, State#state{pending = Pending}}; + _ -> + {Corrs, Actions, State#state{}} end; seq_applied(_Seq, Acc) -> Acc. maybe_add_action(ok, Acc, State) -> {Acc, State}; +maybe_add_action(not_enqueued, Acc, State) -> + {Acc, State}; maybe_add_action({multi, Actions}, Acc0, State0) -> lists:foldl(fun (Act, {Acc, State}) -> maybe_add_action(Act, Acc, State) @@ -681,10 +681,10 @@ maybe_add_action(Action, Acc, State) -> %% anything else is assumed to be an action {[Action | Acc], State}. -do_resends(From, To, State) when From =< To -> - lists:foldl(fun resend/2, State, lists:seq(From, To)); -do_resends(_, _, State) -> - State. +% do_resends(From, To, State) when From =< To -> +% lists:foldl(fun resend/2, State, lists:seq(From, To)); +% do_resends(_, _, State) -> +% State. % resends a command with a new sequence number resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) -> diff --git a/deps/rabbit/src/rabbit_fifo_v1.erl b/deps/rabbit/src/rabbit_fifo_v1.erl index a59a5c9250..1859251f9f 100644 --- a/deps/rabbit/src/rabbit_fifo_v1.erl +++ b/deps/rabbit/src/rabbit_fifo_v1.erl @@ -113,7 +113,7 @@ -type client_msg() :: delivery(). %% the messages `rabbit_fifo' can send to consumers. --opaque state() :: #?MODULE{}. +-opaque state() :: #?STATE{}. -export_type([protocol/0, delivery/0, @@ -133,7 +133,7 @@ -spec init(config()) -> state(). init(#{name := Name, queue_resource := Resource} = Conf) -> - update_config(Conf, #?MODULE{cfg = #cfg{name = Name, + update_config(Conf, #?STATE{cfg = #cfg{name = Name, resource = Resource}}). update_config(Conf, State) -> @@ -153,11 +153,11 @@ update_config(Conf, State) -> false -> competing end, - Cfg = State#?MODULE.cfg, + Cfg = State#?STATE.cfg, RCISpec = {RCI, RCI}, LastActive = maps:get(created, Conf, undefined), - State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCISpec, + State#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCISpec, dead_letter_handler = DLH, become_leader_handler = BLH, overflow_strategy = Overflow, @@ -182,7 +182,7 @@ apply(Meta, #enqueue{pid = From, seq = Seq, msg = RawMsg}, State00) -> apply_enqueue(Meta, From, Seq, RawMsg, State00); apply(_Meta, #register_enqueuer{pid = Pid}, - #?MODULE{enqueuers = Enqueuers0, + #?STATE{enqueuers = Enqueuers0, cfg = #cfg{overflow_strategy = Overflow}} = State0) -> State = case maps:is_key(Pid, Enqueuers0) of @@ -190,7 +190,7 @@ apply(_Meta, #register_enqueuer{pid = Pid}, %% if the enqueuer exits just echo the overflow state State0; false -> - State0#?MODULE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}} + State0#?STATE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}} end, Res = case is_over_limit(State) of true when Overflow == reject_publish -> @@ -201,7 +201,7 @@ apply(_Meta, #register_enqueuer{pid = Pid}, {State, Res, [{monitor, process, Pid}]}; apply(Meta, #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, - #?MODULE{consumers = Cons0} = State) -> + #?STATE{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0} -> % need to increment metrics before completing as any snapshot @@ -213,7 +213,7 @@ apply(Meta, end; apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, - #?MODULE{consumers = Cons0} = State0) -> + #?STATE{consumers = Cons0} = State0) -> case Cons0 of #{ConsumerId := Con0} -> Discarded = maps:with(MsgIds, Con0#consumer.checked_out), @@ -224,7 +224,7 @@ apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, {State0, ok} end; apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, - #?MODULE{consumers = Cons0} = State) -> + #?STATE{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := #consumer{checked_out = Checked0}} -> Returned = maps:with(MsgIds, Checked0), @@ -234,7 +234,7 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, end; apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, drain = Drain, consumer_id = ConsumerId}, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, service_queue = ServiceQueue0, waiting_consumers = Waiting0} = State0) -> case Cons0 of @@ -248,7 +248,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, Cons = maps:put(ConsumerId, Con1, Cons0), {State1, ok, Effects} = checkout(Meta, State0, - State0#?MODULE{service_queue = ServiceQueue, + State0#?STATE{service_queue = ServiceQueue, consumers = Cons}, [], false), Response = {send_credit_reply, messages_ready(State1)}, %% by this point all checkouts for the updated credit value @@ -259,16 +259,16 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {State1, Response, Effects}; true -> Con = #consumer{credit = PostCred} = - maps:get(ConsumerId, State1#?MODULE.consumers), + maps:get(ConsumerId, State1#?STATE.consumers), %% add the outstanding credit to the delivery count DeliveryCount = Con#consumer.delivery_count + PostCred, Consumers = maps:put(ConsumerId, Con#consumer{delivery_count = DeliveryCount, credit = 0}, - State1#?MODULE.consumers), + State1#?STATE.consumers), Drained = Con#consumer.credit, {CTag, _} = ConsumerId, - {State1#?MODULE{consumers = Consumers}, + {State1#?STATE{consumers = Consumers}, %% returning a multi response with two client actions %% for the channel to execute {multi, [Response, {send_drained, {CTag, Drained}}]}, @@ -282,7 +282,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, %% grant the credit C = max(0, RemoteDelCnt + NewCredit - DelCnt), Con = Con0#consumer{credit = C}, - State = State0#?MODULE{waiting_consumers = + State = State0#?STATE{waiting_consumers = [{ConsumerId, Con} | Waiting]}, {State, {send_credit_reply, messages_ready(State)}}; false -> @@ -293,16 +293,16 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {State0, ok} end; apply(_, #checkout{spec = {dequeue, _}}, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) -> + #?STATE{cfg = #cfg{consumer_strategy = single_active}} = State0) -> {State0, {error, {unsupported, single_active_consumer}}}; apply(#{index := Index, system_time := Ts, from := From} = Meta, #checkout{spec = {dequeue, Settlement}, meta = ConsumerMeta, consumer_id = ConsumerId}, - #?MODULE{consumers = Consumers} = State00) -> + #?STATE{consumers = Consumers} = State00) -> %% dequeue always updates last_active - State0 = State00#?MODULE{last_active = Ts}, + State0 = State00#?STATE{last_active = Ts}, %% all dequeue operations result in keeping the queue from expiring Exists = maps:is_key(ConsumerId, Consumers), case messages_ready(State0) of @@ -361,7 +361,7 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, Priority, State0), checkout(Meta, State0, State1, [{monitor, process, Pid}]); apply(#{index := Index}, #purge{}, - #?MODULE{ra_indexes = Indexes0, + #?STATE{ra_indexes = Indexes0, returns = Returns, messages = Messages} = State0) -> Total = messages_ready(State0), @@ -370,7 +370,7 @@ apply(#{index := Index}, #purge{}, Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1, [I || {_, {I, _}} <- lqueue:to_list(Returns)]), - State1 = State0#?MODULE{ra_indexes = Indexes, + State1 = State0#?STATE{ra_indexes = Indexes, messages = lqueue:new(), returns = lqueue:new(), msg_bytes_enqueue = 0, @@ -385,7 +385,7 @@ apply(#{index := Index}, #purge{}, apply(#{index := Idx}, #garbage_collection{}, State) -> update_smallest_raft_index(Idx, ok, State, [{aux, garbage_collection}]); apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = Waiting0, enqueuers = Enqs0} = State0) -> @@ -405,13 +405,13 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, Cid, C0#consumer{credit = Credit}), %% if the consumer was cancelled there is a chance it got %% removed when returning hence we need to be defensive here - Waiting = case St#?MODULE.consumers of + Waiting = case St#?STATE.consumers of #{Cid := C} -> Waiting0 ++ [{Cid, C}]; _ -> Waiting0 end, - {St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers), + {St#?STATE{consumers = maps:remove(Cid, St#?STATE.consumers), waiting_consumers = Waiting, last_active = Ts}, Effs1}; @@ -422,7 +422,7 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, suspected_down), %% select a new consumer from the waiting queue and run a checkout - State2 = State1#?MODULE{waiting_consumers = WaitingConsumers}, + State2 = State1#?STATE{waiting_consumers = WaitingConsumers}, {State, Effects1} = activate_next_consumer(State2, Effects0), %% mark any enquers as suspected @@ -431,9 +431,9 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, (_, E) -> E end, Enqs0), Effects = [{monitor, node, Node} | Effects1], - checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects); + checkout(Meta, State0, State#?STATE{enqueuers = Enqs}, Effects); apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, enqueuers = Enqs0} = State0) -> %% A node has been disconnected. This doesn't necessarily mean that %% any processes on this node are down, they _may_ come back so here @@ -469,18 +469,18 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, % comes back, then re-issue all monitors and discover the final fate of % these processes - Effects = case maps:size(State#?MODULE.consumers) of + Effects = case maps:size(State#?STATE.consumers) of 0 -> [{aux, inactive}, {monitor, node, Node}]; _ -> [{monitor, node, Node}] end ++ Effects1, - checkout(Meta, State0, State#?MODULE{enqueuers = Enqs, + checkout(Meta, State0, State#?STATE{enqueuers = Enqs, last_active = Ts}, Effects); apply(Meta, {down, Pid, _Info}, State0) -> {State, Effects} = handle_down(Meta, Pid, State0), checkout(Meta, State0, State, Effects); -apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, +apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0, enqueuers = Enqs0, service_queue = _SQ0} = State0) -> %% A node we are monitoring has come back. @@ -509,7 +509,7 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, Acc end, {State0, Monitors}, Cons0), Waiting = update_waiting_consumer_status(Node, State1, up), - State2 = State1#?MODULE{ + State2 = State1#?STATE{ enqueuers = Enqs1, waiting_consumers = Waiting}, {State, Effects} = activate_next_consumer(State2, Effects1), @@ -566,7 +566,7 @@ convert_v0_to_v1(V0State0) -> max_in_memory_bytes = rabbit_fifo_v0:get_cfg_field(max_in_memory_bytes, V0State) }, - #?MODULE{cfg = Cfg, + #?STATE{cfg = Cfg, messages = V1Msgs, next_msg_num = rabbit_fifo_v0:get_field(next_msg_num, V0State), returns = rabbit_fifo_v0:get_field(returns, V0State), @@ -591,7 +591,7 @@ purge_node(Meta, Node, State, Effects) -> end, {State, Effects}, all_pids_for(Node, State)). %% any downs that re not noconnection -handle_down(Meta, Pid, #?MODULE{consumers = Cons0, +handle_down(Meta, Pid, #?STATE{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid @@ -599,7 +599,7 @@ handle_down(Meta, Pid, #?MODULE{consumers = Cons0, {#enqueuer{pending = Pend}, Enqs} -> lists:foldl(fun ({_, RIdx, RawMsg}, S) -> enqueue(RIdx, RawMsg, S) - end, State0#?MODULE{enqueuers = Enqs}, Pend); + end, State0#?STATE{enqueuers = Enqs}, Pend); error -> State0 end, @@ -612,25 +612,25 @@ handle_down(Meta, Pid, #?MODULE{consumers = Cons0, cancel_consumer(Meta, ConsumerId, S, E, down) end, {State2, Effects1}, DownConsumers). -consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) -> +consumer_active_flag_update_function(#?STATE{cfg = #cfg{consumer_strategy = competing}}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) end; -consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = single_active}}) -> +consumer_active_flag_update_function(#?STATE{cfg = #cfg{consumer_strategy = single_active}}) -> fun(_, _, _, _, _, Effects) -> Effects end. handle_waiting_consumer_down(_Pid, - #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) -> + #?STATE{cfg = #cfg{consumer_strategy = competing}} = State) -> {[], State}; handle_waiting_consumer_down(_Pid, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + #?STATE{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = []} = State) -> {[], State}; handle_waiting_consumer_down(Pid, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + #?STATE{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = WaitingConsumers0} = State0) -> % get cancel effects for down waiting consumers Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end, @@ -642,11 +642,11 @@ handle_waiting_consumer_down(Pid, % update state to have only up waiting consumers StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end, WaitingConsumers0), - State = State0#?MODULE{waiting_consumers = StillUp}, + State = State0#?STATE{waiting_consumers = StillUp}, {Effects, State}. update_waiting_consumer_status(Node, - #?MODULE{waiting_consumers = WaitingConsumers}, + #?STATE{waiting_consumers = WaitingConsumers}, Status) -> [begin case node(Pid) of @@ -659,7 +659,7 @@ update_waiting_consumer_status(Node, Consumer#consumer.status =/= cancelled]. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). -state_enter(leader, #?MODULE{consumers = Cons, +state_enter(leader, #?STATE{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers, cfg = #cfg{name = Name, @@ -682,7 +682,7 @@ state_enter(leader, #?MODULE{consumers = Cons, {Mod, Fun, Args} -> [{mod_call, Mod, Fun, Args ++ [Name]} | Effects] end; -state_enter(eol, #?MODULE{enqueuers = Enqs, +state_enter(eol, #?STATE{enqueuers = Enqs, consumers = Custs0, waiting_consumers = WaitingConsumers0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), @@ -693,7 +693,7 @@ state_enter(eol, #?MODULE{enqueuers = Enqs, || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++ [{aux, eol}, {mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}]; -state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= leader -> +state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader -> FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}, [FHReservation]; state_enter(_, _) -> @@ -702,7 +702,7 @@ state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= le -spec tick(non_neg_integer(), state()) -> ra_machine:effects(). -tick(Ts, #?MODULE{cfg = #cfg{name = Name, +tick(Ts, #?STATE{cfg = #cfg{name = Name, resource = QName}, msg_bytes_enqueue = EnqueueBytes, msg_bytes_checkout = CheckoutBytes} = State) -> @@ -722,7 +722,7 @@ tick(Ts, #?MODULE{cfg = #cfg{name = Name, end. -spec overview(state()) -> map(). -overview(#?MODULE{consumers = Cons, +overview(#?STATE{consumers = Cons, enqueuers = Enqs, release_cursors = Cursors, enqueue_count = EnqCount, @@ -743,7 +743,7 @@ overview(#?MODULE{consumers = Cons, delivery_limit => Cfg#cfg.delivery_limit }, Smallest = rabbit_fifo_index:smallest(Indexes), - #{type => ?MODULE, + #{type => ?STATE, config => Conf, num_consumers => maps:size(Cons), num_checked_out => num_checked_out(State), @@ -759,7 +759,7 @@ overview(#?MODULE{consumers = Cons, -spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) -> [delivery_msg()]. -get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) -> +get_checked_out(Cid, From, To, #?STATE{consumers = Consumers}) -> case Consumers of #{Cid := #consumer{checked_out = Checked}} -> [{K, snd(snd(maps:get(K, Checked)))} @@ -773,7 +773,7 @@ get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) -> version() -> 1. which_module(0) -> rabbit_fifo_v0; -which_module(1) -> ?MODULE. +which_module(1) -> ?STATE. -record(aux_gc, {last_raft_idx = 0 :: ra:index()}). -record(aux, {name :: atom(), @@ -812,7 +812,7 @@ handle_aux(_RaState, cast, eol, #aux{name = Name} = Aux, Log, _) -> ets:delete(rabbit_fifo_usage, Name), {no_reply, Aux, Log}; handle_aux(_RaState, {call, _From}, oldest_entry_timestamp, Aux, - Log, #?MODULE{ra_indexes = Indexes}) -> + Log, #?STATE{ra_indexes = Indexes}) -> Ts = case rabbit_fifo_index:smallest(Indexes) of %% if there are no entries, we return current timestamp %% so that any previously obtained entries are considered older than this @@ -839,7 +839,7 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0, end. -eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState, +eval_gc(Log, #?STATE{cfg = #cfg{resource = QR}} = MacState, #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) -> {Idx, _} = ra_log:last_index_term(Log), {memory, Mem} = erlang:process_info(self(), memory), @@ -856,7 +856,7 @@ eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState, AuxState end. -force_eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}}, +force_eval_gc(Log, #?STATE{cfg = #cfg{resource = QR}}, #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) -> {Idx, _} = ra_log:last_index_term(Log), {memory, Mem} = erlang:process_info(self(), memory), @@ -877,7 +877,7 @@ force_eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}}, query_messages_ready(State) -> messages_ready(State). -query_messages_checked_out(#?MODULE{consumers = Consumers}) -> +query_messages_checked_out(#?STATE{consumers = Consumers}) -> maps:fold(fun (_, #consumer{checked_out = C}, S) -> maps:size(C) + S end, 0, Consumers). @@ -885,22 +885,22 @@ query_messages_checked_out(#?MODULE{consumers = Consumers}) -> query_messages_total(State) -> messages_total(State). -query_processes(#?MODULE{enqueuers = Enqs, consumers = Cons0}) -> +query_processes(#?STATE{enqueuers = Enqs, consumers = Cons0}) -> Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0), maps:keys(maps:merge(Enqs, Cons)). -query_ra_indexes(#?MODULE{ra_indexes = RaIndexes}) -> +query_ra_indexes(#?STATE{ra_indexes = RaIndexes}) -> RaIndexes. -query_consumer_count(#?MODULE{consumers = Consumers, +query_consumer_count(#?STATE{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> Up = maps:filter(fun(_ConsumerId, #consumer{status = Status}) -> Status =/= suspected_down end, Consumers), maps:size(Up) + length(WaitingConsumers). -query_consumers(#?MODULE{consumers = Consumers, +query_consumers(#?STATE{consumers = Consumers, waiting_consumers = WaitingConsumers, cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) -> ActiveActivityStatusFun = @@ -961,7 +961,7 @@ query_consumers(#?MODULE{consumers = Consumers, maps:merge(FromConsumers, FromWaitingConsumers). -query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_active}, +query_single_active_consumer(#?STATE{cfg = #cfg{consumer_strategy = single_active}, consumers = Consumers}) -> case maps:size(Consumers) of 0 -> @@ -975,10 +975,10 @@ query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_acti query_single_active_consumer(_) -> disabled. -query_stat(#?MODULE{consumers = Consumers} = State) -> +query_stat(#?STATE{consumers = Consumers} = State) -> {messages_ready(State), maps:size(Consumers)}. -query_in_memory_usage(#?MODULE{msg_bytes_in_memory = Bytes, +query_in_memory_usage(#?STATE{msg_bytes_in_memory = Bytes, msgs_ready_in_memory = Length}) -> {Length, Bytes}. @@ -993,7 +993,7 @@ query_peek(Pos, State0) when Pos > 0 -> query_peek(Pos-1, State) end. -query_notify_decorators_info(#?MODULE{consumers = Consumers} = State) -> +query_notify_decorators_info(#?STATE{consumers = Consumers} = State) -> MaxActivePriority = maps:fold(fun(_, #consumer{credit = C, status = up, priority = P0}, MaxP) when C > 0 -> @@ -1018,14 +1018,14 @@ usage(Name) when is_atom(Name) -> %%% Internal -messages_ready(#?MODULE{messages = M, +messages_ready(#?STATE{messages = M, prefix_msgs = {RCnt, _R, PCnt, _P}, returns = R}) -> %% prefix messages will rarely have anything in them during normal %% operations so length/1 is fine here lqueue:len(M) + lqueue:len(R) + RCnt + PCnt. -messages_total(#?MODULE{ra_indexes = I, +messages_total(#?STATE{ra_indexes = I, prefix_msgs = {RCnt, _R, PCnt, _P}}) -> rabbit_fifo_index:size(I) + RCnt + PCnt. @@ -1059,23 +1059,23 @@ moving_average(Time, HalfLife, Next, Current) -> Weight = math:exp(Time * math:log(0.5) / HalfLife), Next * (1 - Weight) + Current * Weight. -num_checked_out(#?MODULE{consumers = Cons}) -> +num_checked_out(#?STATE{consumers = Cons}) -> maps:fold(fun (_, #consumer{checked_out = C}, Acc) -> maps:size(C) + Acc end, 0, Cons). cancel_consumer(Meta, ConsumerId, - #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State, + #?STATE{cfg = #cfg{consumer_strategy = competing}} = State, Effects, Reason) -> cancel_consumer0(Meta, ConsumerId, State, Effects, Reason); cancel_consumer(Meta, ConsumerId, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + #?STATE{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = []} = State, Effects, Reason) -> %% single active consumer on, no consumers are waiting cancel_consumer0(Meta, ConsumerId, State, Effects, Reason); cancel_consumer(Meta, ConsumerId, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = Waiting0} = State0, Effects0, Reason) -> @@ -1093,10 +1093,10 @@ cancel_consumer(Meta, ConsumerId, Effects = cancel_consumer_effects(ConsumerId, State0, Effects0), % A waiting consumer isn't supposed to have any checked out messages, % so nothing special to do here - {State0#?MODULE{waiting_consumers = Waiting}, Effects} + {State0#?STATE{waiting_consumers = Waiting}, Effects} end. -consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, +consumer_update_active_effects(#?STATE{cfg = #cfg{resource = QName}}, ConsumerId, #consumer{meta = Meta}, Active, ActivityStatus, Effects) -> @@ -1108,7 +1108,7 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, | Effects]. cancel_consumer0(Meta, ConsumerId, - #?MODULE{consumers = C0} = S0, Effects0, Reason) -> + #?STATE{consumers = C0} = S0, Effects0, Reason) -> case C0 of #{ConsumerId := Consumer} -> {S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer, @@ -1120,7 +1120,7 @@ cancel_consumer0(Meta, ConsumerId, %% view) Effects = cancel_consumer_effects(ConsumerId, S, Effects2), - case maps:size(S#?MODULE.consumers) of + case maps:size(S#?STATE.consumers) of 0 -> {S, [{aux, inactive} | Effects]}; _ -> @@ -1131,7 +1131,7 @@ cancel_consumer0(Meta, ConsumerId, {S0, Effects0} end. -activate_next_consumer(#?MODULE{consumers = Cons, +activate_next_consumer(#?STATE{consumers = Cons, waiting_consumers = Waiting0} = State0, Effects0) -> case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of @@ -1143,11 +1143,11 @@ activate_next_consumer(#?MODULE{consumers = Cons, [{NextConsumerId, NextConsumer} | _] -> %% there is a potential next active consumer Remaining = lists:keydelete(NextConsumerId, 1, Waiting0), - #?MODULE{service_queue = ServiceQueue} = State0, + #?STATE{service_queue = ServiceQueue} = State0, ServiceQueue1 = maybe_queue_consumer(NextConsumerId, NextConsumer, ServiceQueue), - State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer}, + State = State0#?STATE{consumers = Cons#{NextConsumerId => NextConsumer}, service_queue = ServiceQueue1, waiting_consumers = Remaining}, Effects = consumer_update_active_effects(State, NextConsumerId, @@ -1173,7 +1173,7 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerId, Consumer, S0, Effects0 S0), Effects0}; down -> {S1, Effects1} = return_all(Meta, S0, Effects0, ConsumerId, Consumer), - {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers), + {S1#?STATE{consumers = maps:remove(ConsumerId, S1#?STATE.consumers), last_active = Ts}, Effects1} end. @@ -1188,12 +1188,12 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> {State, ok, Effects} end. -drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> +drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects0) -> case take_next_msg(State0) of {FullMsg = {_MsgId, {RaftIdxToDrop, {Header, Msg}}}, State1} -> Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0), - State2 = add_bytes_drop(Header, State1#?MODULE{ra_indexes = Indexes}), + State2 = add_bytes_drop(Header, State1#?STATE{ra_indexes = Indexes}), State = case Msg of 'empty' -> State2; _ -> subtract_in_memory_counts(Header, State2) @@ -1211,7 +1211,7 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> {State0, Effects0} end. -enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, +enqueue(RaftIdx, RawMsg, #?STATE{messages = Messages, next_msg_num = NextMsgNum} = State0) -> %% the initial header is an integer only - it will get expanded to a map %% when the next required key is added @@ -1227,17 +1227,17 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, {RaftIdx, {Header, RawMsg}}} % indexed message with header map end, State = add_bytes_enqueue(Header, State1), - State#?MODULE{messages = lqueue:in({NextMsgNum, Msg}, Messages), + State#?STATE{messages = lqueue:in({NextMsgNum, Msg}, Messages), next_msg_num = NextMsgNum + 1}. append_to_master_index(RaftIdx, - #?MODULE{ra_indexes = Indexes0} = State0) -> + #?STATE{ra_indexes = Indexes0} = State0) -> State = incr_enqueue_count(State0), Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0), - State#?MODULE{ra_indexes = Indexes}. + State#?STATE{ra_indexes = Indexes}. -incr_enqueue_count(#?MODULE{enqueue_count = EC, +incr_enqueue_count(#?STATE{enqueue_count = EC, cfg = #cfg{release_cursor_interval = {_Base, C}} } = State0) when EC >= C-> %% this will trigger a dehydrated version of the state to be stored @@ -1245,12 +1245,12 @@ incr_enqueue_count(#?MODULE{enqueue_count = EC, %% Q: Why don't we just stash the release cursor here? %% A: Because it needs to be the very last thing we do and we %% first needs to run the checkout logic. - State0#?MODULE{enqueue_count = 0}; -incr_enqueue_count(#?MODULE{enqueue_count = C} = State) -> - State#?MODULE{enqueue_count = C + 1}. + State0#?STATE{enqueue_count = 0}; +incr_enqueue_count(#?STATE{enqueue_count = C} = State) -> + State#?STATE{enqueue_count = C + 1}. maybe_store_dehydrated_state(RaftIdx, - #?MODULE{cfg = + #?STATE{cfg = #cfg{release_cursor_interval = {Base, _}} = Cfg, ra_indexes = Indexes, @@ -1267,12 +1267,12 @@ maybe_store_dehydrated_state(RaftIdx, Total = messages_total(State0), min(max(Total, Base), ?RELEASE_CURSOR_EVERY_MAX) end, - State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = + State = State0#?STATE{cfg = Cfg#cfg{release_cursor_interval = {Base, Interval}}}, Dehydrated = dehydrate_state(State), Cursor = {release_cursor, RaftIdx, Dehydrated}, Cursors = lqueue:in(Cursor, Cursors0), - State#?MODULE{release_cursors = Cursors} + State#?STATE{release_cursors = Cursors} end; maybe_store_dehydrated_state(_RaftIdx, State) -> State. @@ -1284,18 +1284,18 @@ enqueue_pending(From, State = enqueue(RaftIdx, RawMsg, State0), Enq = Enq0#enqueuer{next_seqno = Next + 1, pending = Pending}, enqueue_pending(From, Enq, State); -enqueue_pending(From, Enq, #?MODULE{enqueuers = Enqueuers0} = State) -> - State#?MODULE{enqueuers = Enqueuers0#{From => Enq}}. +enqueue_pending(From, Enq, #?STATE{enqueuers = Enqueuers0} = State) -> + State#?STATE{enqueuers = Enqueuers0#{From => Enq}}. maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) -> % direct enqueue without tracking State = enqueue(RaftIdx, RawMsg, State0), {ok, State, Effects}; maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, - #?MODULE{enqueuers = Enqueuers0} = State0) -> + #?STATE{enqueuers = Enqueuers0} = State0) -> case maps:get(From, Enqueuers0, undefined) of undefined -> - State1 = State0#?MODULE{enqueuers = Enqueuers0#{From => #enqueuer{}}}, + State1 = State0#?STATE{enqueuers = Enqueuers0#{From => #enqueuer{}}}, {ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, State1), {ok, State, [{monitor, process, From} | Effects]}; @@ -1311,7 +1311,7 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, % out of order delivery Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0], Enq = Enq0#enqueuer{pending = lists:sort(Pending)}, - {ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq}}, Effects0}; + {ok, State0#?STATE{enqueuers = Enqueuers0#{From => Enq}}, Effects0}; #enqueuer{next_seqno = Next} when MsgSeqNo =< Next -> % duplicate delivery - remove the raft index from the ra_indexes % map as it was added earlier @@ -1333,7 +1333,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, ConsumerId) end, {State0, Effects0}, Returned), State2 = - case State1#?MODULE.consumers of + case State1#?STATE.consumers of #{ConsumerId := Con0} -> Con = Con0#consumer{credit = increase_credit(Con0, map_size(Returned))}, @@ -1347,7 +1347,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, % used to processes messages that are finished complete(Meta, ConsumerId, Discarded, #consumer{checked_out = Checked} = Con0, Effects, - #?MODULE{ra_indexes = Indexes0} = State0) -> + #?STATE{ra_indexes = Indexes0} = State0) -> %% TODO optimise use of Discarded map here MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], %% credit_mode = simple_prefetch should automatically top-up credit @@ -1365,7 +1365,7 @@ complete(Meta, ConsumerId, Discarded, ({'$empty_msg', Header}, Acc) -> add_bytes_settle(Header, Acc) end, State1, maps:values(Discarded)), - {State2#?MODULE{ra_indexes = Indexes}, Effects}. + {State2#?STATE{ra_indexes = Indexes}, Effects}. increase_credit(#consumer{lifetime = once, credit = Credit}, _) -> @@ -1389,11 +1389,11 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, update_smallest_raft_index(IncomingRaftIdx, State, Effects). dead_letter_effects(_Reason, _Discarded, - #?MODULE{cfg = #cfg{dead_letter_handler = undefined}}, + #?STATE{cfg = #cfg{dead_letter_handler = undefined}}, Effects) -> Effects; dead_letter_effects(Reason, Discarded, - #?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}}, + #?STATE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}}, Effects) -> RaftIdxs = maps:fold( fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) -> @@ -1417,7 +1417,7 @@ dead_letter_effects(Reason, Discarded, end} | Effects]. cancel_consumer_effects(ConsumerId, - #?MODULE{cfg = #cfg{resource = QName}} = State, Effects) -> + #?STATE{cfg = #cfg{resource = QName}} = State, Effects) -> [{mod_call, rabbit_quorum_queue, cancel_consumer_handler, [QName, ConsumerId]}, notify_decorators_effect(State) | Effects]. @@ -1426,7 +1426,7 @@ update_smallest_raft_index(Idx, State, Effects) -> update_smallest_raft_index(Idx, ok, State, Effects). update_smallest_raft_index(IncomingRaftIdx, Reply, - #?MODULE{cfg = Cfg, + #?STATE{cfg = Cfg, ra_indexes = Indexes, release_cursors = Cursors0} = State0, Effects) -> @@ -1438,7 +1438,7 @@ update_smallest_raft_index(IncomingRaftIdx, Reply, %% reset the release cursor interval #cfg{release_cursor_interval = {Base, _}} = Cfg, RCI = {Base, Base}, - State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI}, + State = State0#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCI}, release_cursors = lqueue:new(), enqueue_count = 0}, {State, Reply, Effects ++ [{release_cursor, IncomingRaftIdx, State}]}; @@ -1446,11 +1446,11 @@ update_smallest_raft_index(IncomingRaftIdx, Reply, Smallest = rabbit_fifo_index:smallest(Indexes), case find_next_cursor(Smallest, Cursors0) of {empty, Cursors} -> - {State0#?MODULE{release_cursors = Cursors}, Reply, Effects}; + {State0#?STATE{release_cursors = Cursors}, Reply, Effects}; {Cursor, Cursors} -> %% we can emit a release cursor when we've passed the smallest %% release cursor available. - {State0#?MODULE{release_cursors = Cursors}, Reply, + {State0#?STATE{release_cursors = Cursors}, Reply, Effects ++ [Cursor]} end end. @@ -1475,7 +1475,7 @@ update_header(Key, UpdateFun, Default, Header) -> return_one(Meta, MsgId, 0, {Tag, Header0}, - #?MODULE{returns = Returns, + #?STATE{returns = Returns, consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0, ConsumerId) @@ -1501,12 +1501,12 @@ return_one(Meta, MsgId, 0, {Tag, Header0}, end, {add_bytes_return( Header, - State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, + State1#?STATE{consumers = Consumers#{ConsumerId => Con}, returns = lqueue:in(Msg, Returns)}), Effects0} end; return_one(Meta, MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, - #?MODULE{returns = Returns, + #?STATE{returns = Returns, consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0, ConsumerId) -> @@ -1535,17 +1535,17 @@ return_one(Meta, MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, end, {add_bytes_return( Header, - State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, + State1#?STATE{consumers = Consumers#{ConsumerId => Con}, returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0} end. -return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, +return_all(Meta, #?STATE{consumers = Cons} = State0, Effects0, ConsumerId, #consumer{checked_out = Checked0} = Con) -> %% need to sort the list so that we return messages in the order %% they were checked out Checked = lists:sort(maps:to_list(Checked0)), - State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}}, + State = State0#?STATE{consumers = Cons#{ConsumerId => Con}}, lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) -> return_one(Meta, MsgId, 0, Msg, S, E, ConsumerId); ({MsgId, {'$empty_msg', _} = Msg}, {S, E}) -> @@ -1558,7 +1558,7 @@ return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, checkout(Meta, OldState, State, Effects) -> checkout(Meta, OldState, State, Effects, true). -checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldState, State0, +checkout(#{index := Index} = Meta, #?STATE{cfg = #cfg{resource = QName}} = OldState, State0, Effects0, HandleConsumerChanges) -> {State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0), Effects0, #{}), @@ -1608,12 +1608,12 @@ checkout0(_Meta, {Activity, State0}, Effects0, SendAcc) -> {State0, ok, lists:reverse(Effects1)}. evaluate_limit(_Index, Result, _BeforeState, - #?MODULE{cfg = #cfg{max_length = undefined, + #?STATE{cfg = #cfg{max_length = undefined, max_bytes = undefined}} = State, Effects) -> {State, Result, Effects}; evaluate_limit(Index, Result, BeforeState, - #?MODULE{cfg = #cfg{overflow_strategy = Strategy}, + #?STATE{cfg = #cfg{overflow_strategy = Strategy}, enqueuers = Enqs0} = State0, Effects0) -> case is_over_limit(State0) of @@ -1633,7 +1633,7 @@ evaluate_limit(Index, Result, BeforeState, (_P, _E, Acc) -> Acc end, {Enqs0, Effects0}, Enqs0), - {State0#?MODULE{enqueuers = Enqs}, Result, Effects}; + {State0#?STATE{enqueuers = Enqs}, Result, Effects}; false when Strategy == reject_publish -> %% TODO: optimise as this case gets called for every command %% pretty much @@ -1651,7 +1651,7 @@ evaluate_limit(Index, Result, BeforeState, (_P, _E, Acc) -> Acc end, {Enqs0, Effects0}, Enqs0), - {State0#?MODULE{enqueuers = Enqs}, Result, Effects}; + {State0#?STATE{enqueuers = Enqs}, Result, Effects}; _ -> {State0, Result, Effects0} end; @@ -1660,13 +1660,13 @@ evaluate_limit(Index, Result, BeforeState, end. evaluate_memory_limit(_Header, - #?MODULE{cfg = #cfg{max_in_memory_length = undefined, + #?STATE{cfg = #cfg{max_in_memory_length = undefined, max_in_memory_bytes = undefined}}) -> false; evaluate_memory_limit(#{size := Size}, State) -> evaluate_memory_limit(Size, State); evaluate_memory_limit(Size, - #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength, + #?STATE{cfg = #cfg{max_in_memory_length = MaxLength, max_in_memory_bytes = MaxBytes}, msg_bytes_in_memory = Bytes, msgs_ready_in_memory = Length}) @@ -1690,18 +1690,18 @@ append_delivery_effects(Effects0, AccMap) -> %% %% When we return it is always done to the current return queue %% for both prefix messages and current messages -take_next_msg(#?MODULE{prefix_msgs = {R, P}} = State) -> +take_next_msg(#?STATE{prefix_msgs = {R, P}} = State) -> %% conversion - take_next_msg(State#?MODULE{prefix_msgs = {length(R), R, length(P), P}}); -take_next_msg(#?MODULE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem], + take_next_msg(State#?STATE{prefix_msgs = {length(R), R, length(P), P}}); +take_next_msg(#?STATE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem], NumP, P}} = State) -> %% there are prefix returns, these should be served first - {Msg, State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; -take_next_msg(#?MODULE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) -> + {Msg, State#?STATE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; +take_next_msg(#?STATE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) -> %% there are prefix returns, these should be served first {{'$prefix_msg', Header}, - State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; -take_next_msg(#?MODULE{returns = Returns, + State#?STATE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; +take_next_msg(#?STATE{returns = Returns, messages = Messages0, prefix_msgs = {NumR, R, NumP, P}} = State) -> %% use peek rather than out there as the most likely case is an empty @@ -1709,13 +1709,13 @@ take_next_msg(#?MODULE{returns = Returns, case lqueue:peek(Returns) of {value, NextMsg} -> {NextMsg, - State#?MODULE{returns = lqueue:drop(Returns)}}; + State#?STATE{returns = lqueue:drop(Returns)}}; empty when P == [] -> case lqueue:out(Messages0) of {empty, _} -> empty; {{value, {_, _} = SeqMsg}, Messages} -> - {SeqMsg, State#?MODULE{messages = Messages }} + {SeqMsg, State#?STATE{messages = Messages }} end; empty -> [Msg | Rem] = P, @@ -1723,10 +1723,10 @@ take_next_msg(#?MODULE{returns = Returns, {Header, 'empty'} -> %% There are prefix msgs {{'$empty_msg', Header}, - State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}}; + State#?STATE{prefix_msgs = {NumR, R, NumP-1, Rem}}}; Header -> {{'$prefix_msg', Header}, - State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}} + State#?STATE{prefix_msgs = {NumR, R, NumP-1, Rem}}} end end. @@ -1757,7 +1757,7 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) -> {dequeue, {MsgId, {Header, Msg}}, Ready}}}] end}. -checkout_one(Meta, #?MODULE{service_queue = SQ0, +checkout_one(Meta, #?STATE{service_queue = SQ0, messages = Messages0, consumers = Cons0} = InitState) -> case priority_queue:out(SQ0) of @@ -1772,11 +1772,11 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, %% no credit but was still on queue %% can happen when draining %% recurse without consumer on queue - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); + checkout_one(Meta, InitState#?STATE{service_queue = SQ1}); #consumer{status = cancelled} -> - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); + checkout_one(Meta, InitState#?STATE{service_queue = SQ1}); #consumer{status = suspected_down} -> - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); + checkout_one(Meta, InitState#?STATE{service_queue = SQ1}); #consumer{checked_out = Checked0, next_msg_id = Next, credit = Credit, @@ -1788,7 +1788,7 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, delivery_count = DelCnt + 1}, State1 = update_or_remove_sub(Meta, ConsumerId, Con, - State0#?MODULE{service_queue = SQ1}), + State0#?STATE{service_queue = SQ1}), {State, Msg} = case ConsumerMsg of {'$prefix_msg', Header} -> @@ -1814,7 +1814,7 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, end; {{value, _ConsumerId}, SQ1} -> %% consumer did not exist but was queued, recurse - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); + checkout_one(Meta, InitState#?STATE{service_queue = SQ1}); {empty, _} -> case lqueue:len(Messages0) of 0 -> {nochange, InitState}; @@ -1824,31 +1824,31 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto, credit = 0} = Con, - #?MODULE{consumers = Cons} = State) -> - State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)}; + #?STATE{consumers = Cons} = State) -> + State#?STATE{consumers = maps:put(ConsumerId, Con, Cons)}; update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto} = Con, - #?MODULE{consumers = Cons, + #?STATE{consumers = Cons, service_queue = ServiceQueue} = State) -> - State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons), + State#?STATE{consumers = maps:put(ConsumerId, Con, Cons), service_queue = uniq_queue_in(ConsumerId, Con, ServiceQueue)}; update_or_remove_sub(#{system_time := Ts}, ConsumerId, #consumer{lifetime = once, checked_out = Checked, credit = 0} = Con, - #?MODULE{consumers = Cons} = State) -> + #?STATE{consumers = Cons} = State) -> case maps:size(Checked) of 0 -> % we're done with this consumer - State#?MODULE{consumers = maps:remove(ConsumerId, Cons), + State#?STATE{consumers = maps:remove(ConsumerId, Cons), last_active = Ts}; _ -> % there are unsettled items so need to keep around - State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)} + State#?STATE{consumers = maps:put(ConsumerId, Con, Cons)} end; update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = once} = Con, - #?MODULE{consumers = Cons, + #?STATE{consumers = Cons, service_queue = ServiceQueue} = State) -> - State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons), + State#?STATE{consumers = maps:put(ConsumerId, Con, Cons), service_queue = uniq_queue_in(ConsumerId, Con, ServiceQueue)}. uniq_queue_in(Key, #consumer{priority = P}, Queue) -> @@ -1862,17 +1862,17 @@ uniq_queue_in(Key, #consumer{priority = P}, Queue) -> end. update_consumer(ConsumerId, Meta, Spec, Priority, - #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) -> + #?STATE{cfg = #cfg{consumer_strategy = competing}} = State0) -> %% general case, single active consumer off update_consumer0(ConsumerId, Meta, Spec, Priority, State0); update_consumer(ConsumerId, Meta, Spec, Priority, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}} = State0) when map_size(Cons0) == 0 -> %% single active consumer on, no one is consuming yet update_consumer0(ConsumerId, Meta, Spec, Priority, State0); update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, Priority, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + #?STATE{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = WaitingConsumers0} = State0) -> %% single active consumer on and one active consumer already %% adding the new consumer to the waiting list @@ -1880,10 +1880,10 @@ update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, Priority, priority = Priority, credit = Credit, credit_mode = Mode}, WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}], - State0#?MODULE{waiting_consumers = WaitingConsumers1}. + State0#?STATE{waiting_consumers = WaitingConsumers1}. update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, Priority, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, service_queue = ServiceQueue0} = State0) -> %% TODO: this logic may not be correct for updating a pre-existing consumer Init = #consumer{lifetime = Life, meta = Meta, @@ -1899,7 +1899,7 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, Priority, end, Init, Cons0), ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons), ServiceQueue0), - State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}. + State0#?STATE{consumers = Cons, service_queue = ServiceQueue}. maybe_queue_consumer(ConsumerId, #consumer{credit = Credit} = Con, ServiceQueue0) -> @@ -1913,7 +1913,7 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit} = Con, %% creates a dehydrated version of the current state to be cached and %% potentially used to for a snaphot at a later point -dehydrate_state(#?MODULE{messages = Messages, +dehydrate_state(#?STATE{messages = Messages, consumers = Consumers, returns = Returns, prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0}, @@ -1937,7 +1937,7 @@ dehydrate_state(#?MODULE{messages = Messages, %% recovering from a snapshot PrefMsgs = PrefMsg0 ++ PrefMsgsSuff, Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0], - State#?MODULE{messages = lqueue:new(), + State#?STATE{messages = lqueue:new(), ra_indexes = rabbit_fifo_index:empty(), release_cursors = lqueue:new(), consumers = maps:map(fun (_, C) -> @@ -1973,23 +1973,23 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Con#consumer{checked_out = Checked}. %% make the state suitable for equality comparison -normalize(#?MODULE{messages = Messages, +normalize(#?STATE{messages = Messages, release_cursors = Cursors} = State) -> - State#?MODULE{messages = lqueue:from_list(lqueue:to_list(Messages)), + State#?STATE{messages = lqueue:from_list(lqueue:to_list(Messages)), release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}. -is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined, +is_over_limit(#?STATE{cfg = #cfg{max_length = undefined, max_bytes = undefined}}) -> false; -is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, +is_over_limit(#?STATE{cfg = #cfg{max_length = MaxLength, max_bytes = MaxBytes}, msg_bytes_enqueue = BytesEnq} = State) -> messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes). -is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = undefined, +is_below_soft_limit(#?STATE{cfg = #cfg{max_length = undefined, max_bytes = undefined}}) -> false; -is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, +is_below_soft_limit(#?STATE{cfg = #cfg{max_length = MaxLength, max_bytes = MaxBytes}, msg_bytes_enqueue = BytesEnq} = State) -> is_below(MaxLength, messages_ready(State)) andalso @@ -2049,58 +2049,58 @@ make_update_config(Config) -> #update_config{config = Config}. add_bytes_enqueue(Bytes, - #?MODULE{msg_bytes_enqueue = Enqueue} = State) + #?STATE{msg_bytes_enqueue = Enqueue} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}; + State#?STATE{msg_bytes_enqueue = Enqueue + Bytes}; add_bytes_enqueue(#{size := Bytes}, State) -> add_bytes_enqueue(Bytes, State). add_bytes_drop(Bytes, - #?MODULE{msg_bytes_enqueue = Enqueue} = State) + #?STATE{msg_bytes_enqueue = Enqueue} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}; + State#?STATE{msg_bytes_enqueue = Enqueue - Bytes}; add_bytes_drop(#{size := Bytes}, State) -> add_bytes_drop(Bytes, State). add_bytes_checkout(Bytes, - #?MODULE{msg_bytes_checkout = Checkout, + #?STATE{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue } = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_checkout = Checkout + Bytes, + State#?STATE{msg_bytes_checkout = Checkout + Bytes, msg_bytes_enqueue = Enqueue - Bytes}; add_bytes_checkout(#{size := Bytes}, State) -> add_bytes_checkout(Bytes, State). add_bytes_settle(Bytes, - #?MODULE{msg_bytes_checkout = Checkout} = State) + #?STATE{msg_bytes_checkout = Checkout} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_checkout = Checkout - Bytes}; + State#?STATE{msg_bytes_checkout = Checkout - Bytes}; add_bytes_settle(#{size := Bytes}, State) -> add_bytes_settle(Bytes, State). add_bytes_return(Bytes, - #?MODULE{msg_bytes_checkout = Checkout, + #?STATE{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_checkout = Checkout - Bytes, + State#?STATE{msg_bytes_checkout = Checkout - Bytes, msg_bytes_enqueue = Enqueue + Bytes}; add_bytes_return(#{size := Bytes}, State) -> add_bytes_return(Bytes, State). add_in_memory_counts(Bytes, - #?MODULE{msg_bytes_in_memory = InMemoryBytes, + #?STATE{msg_bytes_in_memory = InMemoryBytes, msgs_ready_in_memory = InMemoryCount} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes, + State#?STATE{msg_bytes_in_memory = InMemoryBytes + Bytes, msgs_ready_in_memory = InMemoryCount + 1}; add_in_memory_counts(#{size := Bytes}, State) -> add_in_memory_counts(Bytes, State). subtract_in_memory_counts(Bytes, - #?MODULE{msg_bytes_in_memory = InMemoryBytes, + #?STATE{msg_bytes_in_memory = InMemoryBytes, msgs_ready_in_memory = InMemoryCount} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes, + State#?STATE{msg_bytes_in_memory = InMemoryBytes - Bytes, msgs_ready_in_memory = InMemoryCount - 1}; subtract_in_memory_counts(#{size := Bytes}, State) -> subtract_in_memory_counts(Bytes, State). @@ -2124,7 +2124,7 @@ get_size_from_header(#{size := B}) -> B. -all_nodes(#?MODULE{consumers = Cons0, +all_nodes(#?STATE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> Nodes0 = maps:fold(fun({_, P}, _, Acc) -> @@ -2138,7 +2138,7 @@ all_nodes(#?MODULE{consumers = Cons0, Acc#{node(P) => ok} end, Nodes1, WaitingConsumers0)). -all_pids_for(Node, #?MODULE{consumers = Cons0, +all_pids_for(Node, #?STATE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> Cons = maps:fold(fun({_, P}, _, Acc) @@ -2157,7 +2157,7 @@ all_pids_for(Node, #?MODULE{consumers = Cons0, (_, Acc) -> Acc end, Enqs, WaitingConsumers0). -suspected_pids_for(Node, #?MODULE{consumers = Cons0, +suspected_pids_for(Node, #?STATE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc) @@ -2177,7 +2177,7 @@ suspected_pids_for(Node, #?MODULE{consumers = Cons0, (_, Acc) -> Acc end, Enqs, WaitingConsumers0). -is_expired(Ts, #?MODULE{cfg = #cfg{expires = Expires}, +is_expired(Ts, #?STATE{cfg = #cfg{expires = Expires}, last_active = LastActive, consumers = Consumers}) when is_number(LastActive) andalso is_number(Expires) -> @@ -2206,7 +2206,7 @@ maybe_notify_decorators(_, false) -> maybe_notify_decorators(State, _) -> {true, query_notify_decorators_info(State)}. -notify_decorators_effect(#?MODULE{cfg = #cfg{resource = QName}} = State) -> +notify_decorators_effect(#?STATE{cfg = #cfg{resource = QName}} = State) -> {MaxActivePriority, IsEmpty} = query_notify_decorators_info(State), notify_decorators_effect(QName, MaxActivePriority, IsEmpty). @@ -2215,11 +2215,11 @@ notify_decorators_effect(QName, MaxActivePriority, IsEmpty) -> [QName, consumer_state_changed, [MaxActivePriority, IsEmpty]]}. get_field(Field, State) -> - Fields = record_info(fields, ?MODULE), + Fields = record_info(fields, ?STATE), Index = record_index_of(Field, Fields), element(Index, State). -get_cfg_field(Field, #?MODULE{cfg = Cfg} ) -> +get_cfg_field(Field, #?STATE{cfg = Cfg} ) -> Fields = record_info(fields, cfg), Index = record_index_of(Field, Fields), element(Index, Cfg). diff --git a/deps/rabbit/src/rabbit_fifo_v1.hrl b/deps/rabbit/src/rabbit_fifo_v1.hrl index 3df9883445..4a427f8fed 100644 --- a/deps/rabbit/src/rabbit_fifo_v1.hrl +++ b/deps/rabbit/src/rabbit_fifo_v1.hrl @@ -76,6 +76,7 @@ -define(MB, 1048576). -define(LOW_LIMIT, 0.8). +-define(STATE, rabbit_fifo). -record(consumer, {meta = #{} :: consumer_meta(), @@ -142,7 +143,7 @@ {non_neg_integer(), list(), non_neg_integer(), list()}. --record(rabbit_fifo_v1, +-record(?STATE, {cfg :: #cfg{}, % unassigned messages messages = lqueue:new() :: lqueue:lqueue({msg_in_id(), indexed_msg()}), @@ -164,7 +165,7 @@ % for normal appending operations as it's backed by a map ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(), release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor, - ra:index(), #rabbit_fifo_v1{}}), + ra:index(), #?STATE{}}), % consumers need to reflect consumer state at time of snapshot % needs to be part of snapshot consumers = #{} :: #{consumer_id() => #consumer{}}, diff --git a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl index 22cfc0d719..5aefc4debc 100644 --- a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl @@ -1164,16 +1164,12 @@ messages_total_prop(Conf0, Commands) -> messages_total_invariant() -> fun(#rabbit_fifo{messages = M, consumers = C, - enqueuers = E, prefix_msgs = {PTot, _, RTot, _}, returns = R} = S) -> Base = lqueue:len(M) + lqueue:len(R) + PTot + RTot, - CTot = maps:fold(fun (_, #consumer{checked_out = Ch}, Acc) -> + Tot = maps:fold(fun (_, #consumer{checked_out = Ch}, Acc) -> Acc + map_size(Ch) - end, Base, C), - Tot = maps:fold(fun (_, #enqueuer{pending = P}, Acc) -> - Acc + length(P) - end, CTot, E), + end, Base, C), QTot = rabbit_fifo:query_messages_total(S), case Tot == QTot of true -> true; @@ -1355,10 +1351,10 @@ nodeup_gen(Nodes) -> enqueue_gen(Pid) -> enqueue_gen(Pid, 10, 1). -enqueue_gen(Pid, Enq, Del) -> - ?LET(E, {enqueue, Pid, - frequency([{Enq, enqueue}, - {Del, delay}]), +enqueue_gen(Pid, _Enq, _Del) -> + ?LET(E, {enqueue, Pid, enqueue, + % frequency([{Enq, enqueue}, + % {Del, delay}]), binary()}, E). checkout_cancel_gen(Pid) -> |