author | Karl Nilsson <kjnilsson@gmail.com> | 2021-11-29 13:19:58 +0000 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2021-12-03 11:05:22 +0000 |
commit | 88d8bd98acfc3f3487a10af5aae118a4ffa50f4f (patch) | |
tree | 44d7eef6e7d44e3244a876aa56e8a4cfd0a9170c | |
parent | b831dbc88bffab8707140362e84f2e6b23fc3b0e (diff) | |
download | rabbitmq-server-git-qq-resend-protocol-refactor.tar.gz |
QQ: resend protocol refactor (qq-resend-protocol-refactor)
Instead of resending individual commands when a gap is detected,
we resend all pending commands whenever a leader change is detected.
The quorum queue will drop any duplicate enqueues based on its enqueue
sequence number, and all other pipelined commands are idempotent.
This makes the resend protocol simpler and removes the need to keep
pending enqueues in the state machine.
Also remove the now unused pending field from the enqueuer record.
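For context, here is a minimal sketch (not the actual rabbit_fifo_client code) of what "resend all pending commands on a leader change" amounts to. The module name, the state record, and the send_command/3 stub are illustrative assumptions; in the real client the send is a ra pipeline command to the new leader.

```erlang
-module(qq_resend_sketch).
-export([handle_leader_change/2]).

%% pending maps a client sequence number to the original command term,
%% kept until the command is confirmed applied
-record(state, {leader,
                pending = #{}}).

handle_leader_change(NewLeader, #state{leader = NewLeader} = State) ->
    %% leader already known, nothing to resend
    State;
handle_leader_change(NewLeader, #state{pending = Pending} = State0) ->
    State = State0#state{leader = NewLeader},
    %% resend every pending command in sequence order; the queue's state
    %% machine drops any that were already applied, using the enqueue
    %% sequence number
    lists:foldl(fun({_Seq, Cmd}, S) -> send_command(NewLeader, Cmd, S) end,
                State,
                lists:sort(maps:to_list(Pending))).

%% stand-in for the ra pipeline send used by the real client
send_command(_Leader, _Cmd, State) ->
    State.
```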
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.erl | 53
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.hrl | 4
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_client.erl | 62
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_v1.erl | 352
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_v1.hrl | 5
-rw-r--r-- | deps/rabbit/test/rabbit_fifo_prop_SUITE.erl | 16
6 files changed, 231 insertions, 261 deletions
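The safety of the blanket resend rests on the per-enqueuer sequence-number check in rabbit_fifo (see the maybe_enqueue changes in the diff below). A simplified, self-contained sketch of that check, with a hypothetical module name and a flattened return value:

```erlang
-module(qq_enqueue_dedup_sketch).
-export([maybe_enqueue/3]).

-record(enqueuer, {next_seqno = 1 :: pos_integer()}).

%% Decide the outcome for one (possibly resent) enqueue from pid From
%% carrying client-side sequence number MsgSeqNo.
maybe_enqueue(From, MsgSeqNo, Enqueuers) ->
    case maps:get(From, Enqueuers, #enqueuer{}) of
        #enqueuer{next_seqno = MsgSeqNo} = Enq ->
            %% the next expected seqno: accept it and advance the counter
            {ok, Enqueuers#{From => Enq#enqueuer{next_seqno = MsgSeqNo + 1}}};
        #enqueuer{next_seqno = Next} when MsgSeqNo > Next ->
            %% a gap: with the resend-all protocol this is not expected
            {out_of_sequence, Enqueuers};
        #enqueuer{next_seqno = Next} when MsgSeqNo < Next ->
            %% already applied before the resend - drop the duplicate
            {duplicate, Enqueuers}
    end.
```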
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index c7cbb55aa6..59e3962278 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -684,6 +684,7 @@ convert_v1_to_v2(V1State) -> max_in_memory_bytes = rabbit_fifo_v1:get_cfg_field(max_in_memory_bytes, V1State), expires = rabbit_fifo_v1:get_cfg_field(expires, V1State) }, + #?MODULE{ cfg = Cfg, messages = MessagesV2, @@ -713,16 +714,8 @@ purge_node(Meta, Node, State, Effects) -> %% any downs that re not noconnection handle_down(Meta, Pid, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> - % Remove any enqueuer for the same pid and enqueue any pending messages - % This should be ok as we won't see any more enqueues from this pid - State1 = case maps:take(Pid, Enqs0) of - {#enqueuer{pending = Pend}, Enqs} -> - lists:foldl(fun ({_, RIdx, RawMsg}, S) -> - enqueue(RIdx, RawMsg, S) - end, State0#?MODULE{enqueuers = Enqs}, Pend); - error -> - State0 - end, + % Remove any enqueuer for the down pid + State1 = State0#?MODULE{enqueuers = maps:remove(Pid, Enqs0)}, {Effects1, State2} = handle_waiting_consumer_down(Pid, State1), % return checked out messages to main queue % Find the consumers for the down pid @@ -1343,6 +1336,8 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> State2 = incr_enqueue_count(incr_total(State1)), {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false), {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects}; + {out_of_sequence, State, Effects} -> + {State, not_enqueued, Effects}; {duplicate, State, Effects} -> {State, ok, Effects} end. @@ -1433,50 +1428,30 @@ maybe_store_dehydrated_state(RaftIdx, maybe_store_dehydrated_state(_RaftIdx, State) -> State. -enqueue_pending(From, - #enqueuer{next_seqno = Next, - pending = [{Next, RaftIdx, RawMsg} | Pending]} = Enq0, - State0) -> - State = enqueue(RaftIdx, RawMsg, State0), - Enq = Enq0#enqueuer{next_seqno = Next + 1, pending = Pending}, - enqueue_pending(From, Enq, State); -enqueue_pending(From, Enq, #?MODULE{enqueuers = Enqueuers0} = State) -> - State#?MODULE{enqueuers = Enqueuers0#{From => Enq}}. 
- maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) -> % direct enqueue without tracking State = enqueue(RaftIdx, RawMsg, State0), {ok, State, Effects}; maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, - #?MODULE{enqueuers = Enqueuers0, - ra_indexes = Indexes0} = State0) -> - + #?MODULE{enqueuers = Enqueuers0} = State0) -> case maps:get(From, Enqueuers0, undefined) of undefined -> State1 = State0#?MODULE{enqueuers = Enqueuers0#{From => #enqueuer{}}}, - {ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo, - RawMsg, Effects0, State1), - {ok, State, [{monitor, process, From} | Effects]}; + {Res, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo, + RawMsg, Effects0, State1), + {Res, State, [{monitor, process, From} | Effects]}; #enqueuer{next_seqno = MsgSeqNo} = Enq0 -> % it is the next expected seqno State1 = enqueue(RaftIdx, RawMsg, State0), Enq = Enq0#enqueuer{next_seqno = MsgSeqNo + 1}, - State = enqueue_pending(From, Enq, State1), + State = State1#?MODULE{enqueuers = Enqueuers0#{From => Enq}}, {ok, State, Effects0}; - #enqueuer{next_seqno = Next, - pending = Pending0} = Enq0 + #enqueuer{next_seqno = Next} when MsgSeqNo > Next -> - % out of order delivery - Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0], - Enq = Enq0#enqueuer{pending = lists:sort(Pending)}, - %% if the enqueue it out of order we need to mark it in the - %% index - Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0), - {ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq}, - ra_indexes = Indexes}, Effects0}; + %% TODO: when can this happen? + {out_of_sequence, State0, Effects0}; #enqueuer{next_seqno = Next} when MsgSeqNo =< Next -> - % duplicate delivery - remove the raft index from the ra_indexes - % map as it was added earlier + % duplicate delivery {duplicate, State0, Effects0} end. diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index 95e7003460..a8d539fa1a 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -123,9 +123,7 @@ {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list unused, - pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}], - status = up :: up | - suspected_down, + status = up :: up | suspected_down, %% it is useful to have a record of when this was blocked %% so that we can retry sending the block effect if %% the publisher did not receive the initial one diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index e102ad2207..5947f6ddae 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -40,8 +40,6 @@ -define(TIMER_TIME, 10000). -type seq() :: non_neg_integer(). -%% last_applied is initialised to -1 --type maybe_seq() :: integer(). -type action() :: {send_credit_reply, Available :: non_neg_integer()} | {send_drained, CTagCredit :: {rabbit_fifo:consumer_tag(), non_neg_integer()}}. 
@@ -65,10 +63,6 @@ leader :: undefined | ra:server_id(), queue_status :: undefined | go | reject_publish, next_seq = 0 :: seq(), - %% Last applied is initialise to -1 to note that no command has yet been - %% applied, but allowing to resend messages if the first ones on the sequence - %% are lost (messages are sent from last_applied + 1) - last_applied = -1 :: maybe_seq(), next_enqueue_seq = 1 :: seq(), %% indicates that we've exceeded the soft limit slow = false :: boolean(), @@ -585,18 +579,26 @@ handle_ra_event(Leader, {machine, leader_change}, #state{leader = Leader} = State) -> %% leader already known {ok, State, []}; -handle_ra_event(Leader, {machine, leader_change}, State0) -> +handle_ra_event(Leader, {machine, leader_change}, + #state{leader = OldLeader} = State0) -> %% we need to update leader %% and resend any pending commands + rabbit_log:debug("~s: Detected QQ leader change from ~w to ~w", + [?MODULE, OldLeader, Leader]), + State = resend_all_pending(State0#state{leader = Leader}), + {ok, State, []}; +handle_ra_event(_From, {rejected, {not_leader, Leader, _Seq}}, + #state{leader = Leader} = State) -> + {ok, State, []}; +handle_ra_event(_From, {rejected, {not_leader, Leader, _Seq}}, + #state{leader = OldLeader} = State0) -> + rabbit_log:debug("~s: Detected QQ leader change (rejection) from ~w to ~w", + [?MODULE, OldLeader, Leader]), State = resend_all_pending(State0#state{leader = Leader}), {ok, State, []}; -handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) -> +handle_ra_event(_From, {rejected, {not_leader, _UndefinedMaybe, _Seq}}, State0) -> % TODO: how should these be handled? re-sent on timer or try random {ok, State0, []}; -handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> - State1 = State0#state{leader = Leader}, - State = resend(Seq, State1), - {ok, State, []}; handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) -> case find_leader(Servers) of undefined -> @@ -642,29 +644,27 @@ try_process_command([Server | Rem], Cmd, State) -> try_process_command(Rem, Cmd, State) end. -seq_applied({Seq, MaybeAction}, - {Corrs, Actions0, #state{last_applied = Last} = State0}) - when Seq > Last -> - State1 = do_resends(Last+1, Seq-1, State0), - {Actions, State} = maybe_add_action(MaybeAction, Actions0, State1), +seq_applied({Seq, Response}, + {Corrs, Actions0, #state{} = State0}) -> + %% sequences aren't guaranteed to be applied in order as enqueues are + %% low priority commands and may be overtaken by others with a normal priority. + {Actions, State} = maybe_add_action(Response, Actions0, State0), case maps:take(Seq, State#state.pending) of {{undefined, _}, Pending} -> - {Corrs, Actions, State#state{pending = Pending, - last_applied = Seq}}; - {{Corr, _}, Pending} -> - {[Corr | Corrs], Actions, State#state{pending = Pending, - last_applied = Seq}}; - error -> - % must have already been resent or removed for some other reason - % still need to update last_applied or we may inadvertently resend - % stuff later - {Corrs, Actions, State#state{last_applied = Seq}} + {Corrs, Actions, State#state{pending = Pending}}; + {{Corr, _}, Pending} + when Response /= not_enqueued -> + {[Corr | Corrs], Actions, State#state{pending = Pending}}; + _ -> + {Corrs, Actions, State#state{}} end; seq_applied(_Seq, Acc) -> Acc. 
maybe_add_action(ok, Acc, State) -> {Acc, State}; +maybe_add_action(not_enqueued, Acc, State) -> + {Acc, State}; maybe_add_action({multi, Actions}, Acc0, State0) -> lists:foldl(fun (Act, {Acc, State}) -> maybe_add_action(Act, Acc, State) @@ -681,10 +681,10 @@ maybe_add_action(Action, Acc, State) -> %% anything else is assumed to be an action {[Action | Acc], State}. -do_resends(From, To, State) when From =< To -> - lists:foldl(fun resend/2, State, lists:seq(From, To)); -do_resends(_, _, State) -> - State. +% do_resends(From, To, State) when From =< To -> +% lists:foldl(fun resend/2, State, lists:seq(From, To)); +% do_resends(_, _, State) -> +% State. % resends a command with a new sequence number resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) -> diff --git a/deps/rabbit/src/rabbit_fifo_v1.erl b/deps/rabbit/src/rabbit_fifo_v1.erl index a59a5c9250..1859251f9f 100644 --- a/deps/rabbit/src/rabbit_fifo_v1.erl +++ b/deps/rabbit/src/rabbit_fifo_v1.erl @@ -113,7 +113,7 @@ -type client_msg() :: delivery(). %% the messages `rabbit_fifo' can send to consumers. --opaque state() :: #?MODULE{}. +-opaque state() :: #?STATE{}. -export_type([protocol/0, delivery/0, @@ -133,7 +133,7 @@ -spec init(config()) -> state(). init(#{name := Name, queue_resource := Resource} = Conf) -> - update_config(Conf, #?MODULE{cfg = #cfg{name = Name, + update_config(Conf, #?STATE{cfg = #cfg{name = Name, resource = Resource}}). update_config(Conf, State) -> @@ -153,11 +153,11 @@ update_config(Conf, State) -> false -> competing end, - Cfg = State#?MODULE.cfg, + Cfg = State#?STATE.cfg, RCISpec = {RCI, RCI}, LastActive = maps:get(created, Conf, undefined), - State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCISpec, + State#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCISpec, dead_letter_handler = DLH, become_leader_handler = BLH, overflow_strategy = Overflow, @@ -182,7 +182,7 @@ apply(Meta, #enqueue{pid = From, seq = Seq, msg = RawMsg}, State00) -> apply_enqueue(Meta, From, Seq, RawMsg, State00); apply(_Meta, #register_enqueuer{pid = Pid}, - #?MODULE{enqueuers = Enqueuers0, + #?STATE{enqueuers = Enqueuers0, cfg = #cfg{overflow_strategy = Overflow}} = State0) -> State = case maps:is_key(Pid, Enqueuers0) of @@ -190,7 +190,7 @@ apply(_Meta, #register_enqueuer{pid = Pid}, %% if the enqueuer exits just echo the overflow state State0; false -> - State0#?MODULE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}} + State0#?STATE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}} end, Res = case is_over_limit(State) of true when Overflow == reject_publish -> @@ -201,7 +201,7 @@ apply(_Meta, #register_enqueuer{pid = Pid}, {State, Res, [{monitor, process, Pid}]}; apply(Meta, #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, - #?MODULE{consumers = Cons0} = State) -> + #?STATE{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0} -> % need to increment metrics before completing as any snapshot @@ -213,7 +213,7 @@ apply(Meta, end; apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, - #?MODULE{consumers = Cons0} = State0) -> + #?STATE{consumers = Cons0} = State0) -> case Cons0 of #{ConsumerId := Con0} -> Discarded = maps:with(MsgIds, Con0#consumer.checked_out), @@ -224,7 +224,7 @@ apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, {State0, ok} end; apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, - #?MODULE{consumers = Cons0} = State) -> + #?STATE{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := #consumer{checked_out = Checked0}} -> Returned = 
maps:with(MsgIds, Checked0), @@ -234,7 +234,7 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, end; apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, drain = Drain, consumer_id = ConsumerId}, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, service_queue = ServiceQueue0, waiting_consumers = Waiting0} = State0) -> case Cons0 of @@ -248,7 +248,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, Cons = maps:put(ConsumerId, Con1, Cons0), {State1, ok, Effects} = checkout(Meta, State0, - State0#?MODULE{service_queue = ServiceQueue, + State0#?STATE{service_queue = ServiceQueue, consumers = Cons}, [], false), Response = {send_credit_reply, messages_ready(State1)}, %% by this point all checkouts for the updated credit value @@ -259,16 +259,16 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {State1, Response, Effects}; true -> Con = #consumer{credit = PostCred} = - maps:get(ConsumerId, State1#?MODULE.consumers), + maps:get(ConsumerId, State1#?STATE.consumers), %% add the outstanding credit to the delivery count DeliveryCount = Con#consumer.delivery_count + PostCred, Consumers = maps:put(ConsumerId, Con#consumer{delivery_count = DeliveryCount, credit = 0}, - State1#?MODULE.consumers), + State1#?STATE.consumers), Drained = Con#consumer.credit, {CTag, _} = ConsumerId, - {State1#?MODULE{consumers = Consumers}, + {State1#?STATE{consumers = Consumers}, %% returning a multi response with two client actions %% for the channel to execute {multi, [Response, {send_drained, {CTag, Drained}}]}, @@ -282,7 +282,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, %% grant the credit C = max(0, RemoteDelCnt + NewCredit - DelCnt), Con = Con0#consumer{credit = C}, - State = State0#?MODULE{waiting_consumers = + State = State0#?STATE{waiting_consumers = [{ConsumerId, Con} | Waiting]}, {State, {send_credit_reply, messages_ready(State)}}; false -> @@ -293,16 +293,16 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {State0, ok} end; apply(_, #checkout{spec = {dequeue, _}}, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) -> + #?STATE{cfg = #cfg{consumer_strategy = single_active}} = State0) -> {State0, {error, {unsupported, single_active_consumer}}}; apply(#{index := Index, system_time := Ts, from := From} = Meta, #checkout{spec = {dequeue, Settlement}, meta = ConsumerMeta, consumer_id = ConsumerId}, - #?MODULE{consumers = Consumers} = State00) -> + #?STATE{consumers = Consumers} = State00) -> %% dequeue always updates last_active - State0 = State00#?MODULE{last_active = Ts}, + State0 = State00#?STATE{last_active = Ts}, %% all dequeue operations result in keeping the queue from expiring Exists = maps:is_key(ConsumerId, Consumers), case messages_ready(State0) of @@ -361,7 +361,7 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, Priority, State0), checkout(Meta, State0, State1, [{monitor, process, Pid}]); apply(#{index := Index}, #purge{}, - #?MODULE{ra_indexes = Indexes0, + #?STATE{ra_indexes = Indexes0, returns = Returns, messages = Messages} = State0) -> Total = messages_ready(State0), @@ -370,7 +370,7 @@ apply(#{index := Index}, #purge{}, Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1, [I || {_, {I, _}} <- lqueue:to_list(Returns)]), - State1 = State0#?MODULE{ra_indexes = Indexes, + State1 = State0#?STATE{ra_indexes = Indexes, messages = lqueue:new(), returns = lqueue:new(), 
msg_bytes_enqueue = 0, @@ -385,7 +385,7 @@ apply(#{index := Index}, #purge{}, apply(#{index := Idx}, #garbage_collection{}, State) -> update_smallest_raft_index(Idx, ok, State, [{aux, garbage_collection}]); apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = Waiting0, enqueuers = Enqs0} = State0) -> @@ -405,13 +405,13 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, Cid, C0#consumer{credit = Credit}), %% if the consumer was cancelled there is a chance it got %% removed when returning hence we need to be defensive here - Waiting = case St#?MODULE.consumers of + Waiting = case St#?STATE.consumers of #{Cid := C} -> Waiting0 ++ [{Cid, C}]; _ -> Waiting0 end, - {St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers), + {St#?STATE{consumers = maps:remove(Cid, St#?STATE.consumers), waiting_consumers = Waiting, last_active = Ts}, Effs1}; @@ -422,7 +422,7 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, suspected_down), %% select a new consumer from the waiting queue and run a checkout - State2 = State1#?MODULE{waiting_consumers = WaitingConsumers}, + State2 = State1#?STATE{waiting_consumers = WaitingConsumers}, {State, Effects1} = activate_next_consumer(State2, Effects0), %% mark any enquers as suspected @@ -431,9 +431,9 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, (_, E) -> E end, Enqs0), Effects = [{monitor, node, Node} | Effects1], - checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects); + checkout(Meta, State0, State#?STATE{enqueuers = Enqs}, Effects); apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, enqueuers = Enqs0} = State0) -> %% A node has been disconnected. This doesn't necessarily mean that %% any processes on this node are down, they _may_ come back so here @@ -469,18 +469,18 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, % comes back, then re-issue all monitors and discover the final fate of % these processes - Effects = case maps:size(State#?MODULE.consumers) of + Effects = case maps:size(State#?STATE.consumers) of 0 -> [{aux, inactive}, {monitor, node, Node}]; _ -> [{monitor, node, Node}] end ++ Effects1, - checkout(Meta, State0, State#?MODULE{enqueuers = Enqs, + checkout(Meta, State0, State#?STATE{enqueuers = Enqs, last_active = Ts}, Effects); apply(Meta, {down, Pid, _Info}, State0) -> {State, Effects} = handle_down(Meta, Pid, State0), checkout(Meta, State0, State, Effects); -apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, +apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0, enqueuers = Enqs0, service_queue = _SQ0} = State0) -> %% A node we are monitoring has come back. 
@@ -509,7 +509,7 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, Acc end, {State0, Monitors}, Cons0), Waiting = update_waiting_consumer_status(Node, State1, up), - State2 = State1#?MODULE{ + State2 = State1#?STATE{ enqueuers = Enqs1, waiting_consumers = Waiting}, {State, Effects} = activate_next_consumer(State2, Effects1), @@ -566,7 +566,7 @@ convert_v0_to_v1(V0State0) -> max_in_memory_bytes = rabbit_fifo_v0:get_cfg_field(max_in_memory_bytes, V0State) }, - #?MODULE{cfg = Cfg, + #?STATE{cfg = Cfg, messages = V1Msgs, next_msg_num = rabbit_fifo_v0:get_field(next_msg_num, V0State), returns = rabbit_fifo_v0:get_field(returns, V0State), @@ -591,7 +591,7 @@ purge_node(Meta, Node, State, Effects) -> end, {State, Effects}, all_pids_for(Node, State)). %% any downs that re not noconnection -handle_down(Meta, Pid, #?MODULE{consumers = Cons0, +handle_down(Meta, Pid, #?STATE{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid @@ -599,7 +599,7 @@ handle_down(Meta, Pid, #?MODULE{consumers = Cons0, {#enqueuer{pending = Pend}, Enqs} -> lists:foldl(fun ({_, RIdx, RawMsg}, S) -> enqueue(RIdx, RawMsg, S) - end, State0#?MODULE{enqueuers = Enqs}, Pend); + end, State0#?STATE{enqueuers = Enqs}, Pend); error -> State0 end, @@ -612,25 +612,25 @@ handle_down(Meta, Pid, #?MODULE{consumers = Cons0, cancel_consumer(Meta, ConsumerId, S, E, down) end, {State2, Effects1}, DownConsumers). -consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) -> +consumer_active_flag_update_function(#?STATE{cfg = #cfg{consumer_strategy = competing}}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) end; -consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = single_active}}) -> +consumer_active_flag_update_function(#?STATE{cfg = #cfg{consumer_strategy = single_active}}) -> fun(_, _, _, _, _, Effects) -> Effects end. handle_waiting_consumer_down(_Pid, - #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) -> + #?STATE{cfg = #cfg{consumer_strategy = competing}} = State) -> {[], State}; handle_waiting_consumer_down(_Pid, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + #?STATE{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = []} = State) -> {[], State}; handle_waiting_consumer_down(Pid, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + #?STATE{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = WaitingConsumers0} = State0) -> % get cancel effects for down waiting consumers Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end, @@ -642,11 +642,11 @@ handle_waiting_consumer_down(Pid, % update state to have only up waiting consumers StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end, WaitingConsumers0), - State = State0#?MODULE{waiting_consumers = StillUp}, + State = State0#?STATE{waiting_consumers = StillUp}, {Effects, State}. update_waiting_consumer_status(Node, - #?MODULE{waiting_consumers = WaitingConsumers}, + #?STATE{waiting_consumers = WaitingConsumers}, Status) -> [begin case node(Pid) of @@ -659,7 +659,7 @@ update_waiting_consumer_status(Node, Consumer#consumer.status =/= cancelled]. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). 
-state_enter(leader, #?MODULE{consumers = Cons, +state_enter(leader, #?STATE{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers, cfg = #cfg{name = Name, @@ -682,7 +682,7 @@ state_enter(leader, #?MODULE{consumers = Cons, {Mod, Fun, Args} -> [{mod_call, Mod, Fun, Args ++ [Name]} | Effects] end; -state_enter(eol, #?MODULE{enqueuers = Enqs, +state_enter(eol, #?STATE{enqueuers = Enqs, consumers = Custs0, waiting_consumers = WaitingConsumers0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), @@ -693,7 +693,7 @@ state_enter(eol, #?MODULE{enqueuers = Enqs, || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++ [{aux, eol}, {mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}]; -state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= leader -> +state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader -> FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}, [FHReservation]; state_enter(_, _) -> @@ -702,7 +702,7 @@ state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= le -spec tick(non_neg_integer(), state()) -> ra_machine:effects(). -tick(Ts, #?MODULE{cfg = #cfg{name = Name, +tick(Ts, #?STATE{cfg = #cfg{name = Name, resource = QName}, msg_bytes_enqueue = EnqueueBytes, msg_bytes_checkout = CheckoutBytes} = State) -> @@ -722,7 +722,7 @@ tick(Ts, #?MODULE{cfg = #cfg{name = Name, end. -spec overview(state()) -> map(). -overview(#?MODULE{consumers = Cons, +overview(#?STATE{consumers = Cons, enqueuers = Enqs, release_cursors = Cursors, enqueue_count = EnqCount, @@ -743,7 +743,7 @@ overview(#?MODULE{consumers = Cons, delivery_limit => Cfg#cfg.delivery_limit }, Smallest = rabbit_fifo_index:smallest(Indexes), - #{type => ?MODULE, + #{type => ?STATE, config => Conf, num_consumers => maps:size(Cons), num_checked_out => num_checked_out(State), @@ -759,7 +759,7 @@ overview(#?MODULE{consumers = Cons, -spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) -> [delivery_msg()]. -get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) -> +get_checked_out(Cid, From, To, #?STATE{consumers = Consumers}) -> case Consumers of #{Cid := #consumer{checked_out = Checked}} -> [{K, snd(snd(maps:get(K, Checked)))} @@ -773,7 +773,7 @@ get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) -> version() -> 1. which_module(0) -> rabbit_fifo_v0; -which_module(1) -> ?MODULE. +which_module(1) -> ?STATE. -record(aux_gc, {last_raft_idx = 0 :: ra:index()}). -record(aux, {name :: atom(), @@ -812,7 +812,7 @@ handle_aux(_RaState, cast, eol, #aux{name = Name} = Aux, Log, _) -> ets:delete(rabbit_fifo_usage, Name), {no_reply, Aux, Log}; handle_aux(_RaState, {call, _From}, oldest_entry_timestamp, Aux, - Log, #?MODULE{ra_indexes = Indexes}) -> + Log, #?STATE{ra_indexes = Indexes}) -> Ts = case rabbit_fifo_index:smallest(Indexes) of %% if there are no entries, we return current timestamp %% so that any previously obtained entries are considered older than this @@ -839,7 +839,7 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0, end. -eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState, +eval_gc(Log, #?STATE{cfg = #cfg{resource = QR}} = MacState, #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) -> {Idx, _} = ra_log:last_index_term(Log), {memory, Mem} = erlang:process_info(self(), memory), @@ -856,7 +856,7 @@ eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState, AuxState end. 
-force_eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}}, +force_eval_gc(Log, #?STATE{cfg = #cfg{resource = QR}}, #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) -> {Idx, _} = ra_log:last_index_term(Log), {memory, Mem} = erlang:process_info(self(), memory), @@ -877,7 +877,7 @@ force_eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}}, query_messages_ready(State) -> messages_ready(State). -query_messages_checked_out(#?MODULE{consumers = Consumers}) -> +query_messages_checked_out(#?STATE{consumers = Consumers}) -> maps:fold(fun (_, #consumer{checked_out = C}, S) -> maps:size(C) + S end, 0, Consumers). @@ -885,22 +885,22 @@ query_messages_checked_out(#?MODULE{consumers = Consumers}) -> query_messages_total(State) -> messages_total(State). -query_processes(#?MODULE{enqueuers = Enqs, consumers = Cons0}) -> +query_processes(#?STATE{enqueuers = Enqs, consumers = Cons0}) -> Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0), maps:keys(maps:merge(Enqs, Cons)). -query_ra_indexes(#?MODULE{ra_indexes = RaIndexes}) -> +query_ra_indexes(#?STATE{ra_indexes = RaIndexes}) -> RaIndexes. -query_consumer_count(#?MODULE{consumers = Consumers, +query_consumer_count(#?STATE{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> Up = maps:filter(fun(_ConsumerId, #consumer{status = Status}) -> Status =/= suspected_down end, Consumers), maps:size(Up) + length(WaitingConsumers). -query_consumers(#?MODULE{consumers = Consumers, +query_consumers(#?STATE{consumers = Consumers, waiting_consumers = WaitingConsumers, cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) -> ActiveActivityStatusFun = @@ -961,7 +961,7 @@ query_consumers(#?MODULE{consumers = Consumers, maps:merge(FromConsumers, FromWaitingConsumers). -query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_active}, +query_single_active_consumer(#?STATE{cfg = #cfg{consumer_strategy = single_active}, consumers = Consumers}) -> case maps:size(Consumers) of 0 -> @@ -975,10 +975,10 @@ query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_acti query_single_active_consumer(_) -> disabled. -query_stat(#?MODULE{consumers = Consumers} = State) -> +query_stat(#?STATE{consumers = Consumers} = State) -> {messages_ready(State), maps:size(Consumers)}. -query_in_memory_usage(#?MODULE{msg_bytes_in_memory = Bytes, +query_in_memory_usage(#?STATE{msg_bytes_in_memory = Bytes, msgs_ready_in_memory = Length}) -> {Length, Bytes}. @@ -993,7 +993,7 @@ query_peek(Pos, State0) when Pos > 0 -> query_peek(Pos-1, State) end. -query_notify_decorators_info(#?MODULE{consumers = Consumers} = State) -> +query_notify_decorators_info(#?STATE{consumers = Consumers} = State) -> MaxActivePriority = maps:fold(fun(_, #consumer{credit = C, status = up, priority = P0}, MaxP) when C > 0 -> @@ -1018,14 +1018,14 @@ usage(Name) when is_atom(Name) -> %%% Internal -messages_ready(#?MODULE{messages = M, +messages_ready(#?STATE{messages = M, prefix_msgs = {RCnt, _R, PCnt, _P}, returns = R}) -> %% prefix messages will rarely have anything in them during normal %% operations so length/1 is fine here lqueue:len(M) + lqueue:len(R) + RCnt + PCnt. -messages_total(#?MODULE{ra_indexes = I, +messages_total(#?STATE{ra_indexes = I, prefix_msgs = {RCnt, _R, PCnt, _P}}) -> rabbit_fifo_index:size(I) + RCnt + PCnt. @@ -1059,23 +1059,23 @@ moving_average(Time, HalfLife, Next, Current) -> Weight = math:exp(Time * math:log(0.5) / HalfLife), Next * (1 - Weight) + Current * Weight. 
-num_checked_out(#?MODULE{consumers = Cons}) -> +num_checked_out(#?STATE{consumers = Cons}) -> maps:fold(fun (_, #consumer{checked_out = C}, Acc) -> maps:size(C) + Acc end, 0, Cons). cancel_consumer(Meta, ConsumerId, - #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State, + #?STATE{cfg = #cfg{consumer_strategy = competing}} = State, Effects, Reason) -> cancel_consumer0(Meta, ConsumerId, State, Effects, Reason); cancel_consumer(Meta, ConsumerId, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + #?STATE{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = []} = State, Effects, Reason) -> %% single active consumer on, no consumers are waiting cancel_consumer0(Meta, ConsumerId, State, Effects, Reason); cancel_consumer(Meta, ConsumerId, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = Waiting0} = State0, Effects0, Reason) -> @@ -1093,10 +1093,10 @@ cancel_consumer(Meta, ConsumerId, Effects = cancel_consumer_effects(ConsumerId, State0, Effects0), % A waiting consumer isn't supposed to have any checked out messages, % so nothing special to do here - {State0#?MODULE{waiting_consumers = Waiting}, Effects} + {State0#?STATE{waiting_consumers = Waiting}, Effects} end. -consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, +consumer_update_active_effects(#?STATE{cfg = #cfg{resource = QName}}, ConsumerId, #consumer{meta = Meta}, Active, ActivityStatus, Effects) -> @@ -1108,7 +1108,7 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, | Effects]. cancel_consumer0(Meta, ConsumerId, - #?MODULE{consumers = C0} = S0, Effects0, Reason) -> + #?STATE{consumers = C0} = S0, Effects0, Reason) -> case C0 of #{ConsumerId := Consumer} -> {S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer, @@ -1120,7 +1120,7 @@ cancel_consumer0(Meta, ConsumerId, %% view) Effects = cancel_consumer_effects(ConsumerId, S, Effects2), - case maps:size(S#?MODULE.consumers) of + case maps:size(S#?STATE.consumers) of 0 -> {S, [{aux, inactive} | Effects]}; _ -> @@ -1131,7 +1131,7 @@ cancel_consumer0(Meta, ConsumerId, {S0, Effects0} end. -activate_next_consumer(#?MODULE{consumers = Cons, +activate_next_consumer(#?STATE{consumers = Cons, waiting_consumers = Waiting0} = State0, Effects0) -> case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of @@ -1143,11 +1143,11 @@ activate_next_consumer(#?MODULE{consumers = Cons, [{NextConsumerId, NextConsumer} | _] -> %% there is a potential next active consumer Remaining = lists:keydelete(NextConsumerId, 1, Waiting0), - #?MODULE{service_queue = ServiceQueue} = State0, + #?STATE{service_queue = ServiceQueue} = State0, ServiceQueue1 = maybe_queue_consumer(NextConsumerId, NextConsumer, ServiceQueue), - State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer}, + State = State0#?STATE{consumers = Cons#{NextConsumerId => NextConsumer}, service_queue = ServiceQueue1, waiting_consumers = Remaining}, Effects = consumer_update_active_effects(State, NextConsumerId, @@ -1173,7 +1173,7 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerId, Consumer, S0, Effects0 S0), Effects0}; down -> {S1, Effects1} = return_all(Meta, S0, Effects0, ConsumerId, Consumer), - {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers), + {S1#?STATE{consumers = maps:remove(ConsumerId, S1#?STATE.consumers), last_active = Ts}, Effects1} end. 
@@ -1188,12 +1188,12 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> {State, ok, Effects} end. -drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> +drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects0) -> case take_next_msg(State0) of {FullMsg = {_MsgId, {RaftIdxToDrop, {Header, Msg}}}, State1} -> Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0), - State2 = add_bytes_drop(Header, State1#?MODULE{ra_indexes = Indexes}), + State2 = add_bytes_drop(Header, State1#?STATE{ra_indexes = Indexes}), State = case Msg of 'empty' -> State2; _ -> subtract_in_memory_counts(Header, State2) @@ -1211,7 +1211,7 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> {State0, Effects0} end. -enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, +enqueue(RaftIdx, RawMsg, #?STATE{messages = Messages, next_msg_num = NextMsgNum} = State0) -> %% the initial header is an integer only - it will get expanded to a map %% when the next required key is added @@ -1227,17 +1227,17 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, {RaftIdx, {Header, RawMsg}}} % indexed message with header map end, State = add_bytes_enqueue(Header, State1), - State#?MODULE{messages = lqueue:in({NextMsgNum, Msg}, Messages), + State#?STATE{messages = lqueue:in({NextMsgNum, Msg}, Messages), next_msg_num = NextMsgNum + 1}. append_to_master_index(RaftIdx, - #?MODULE{ra_indexes = Indexes0} = State0) -> + #?STATE{ra_indexes = Indexes0} = State0) -> State = incr_enqueue_count(State0), Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0), - State#?MODULE{ra_indexes = Indexes}. + State#?STATE{ra_indexes = Indexes}. -incr_enqueue_count(#?MODULE{enqueue_count = EC, +incr_enqueue_count(#?STATE{enqueue_count = EC, cfg = #cfg{release_cursor_interval = {_Base, C}} } = State0) when EC >= C-> %% this will trigger a dehydrated version of the state to be stored @@ -1245,12 +1245,12 @@ incr_enqueue_count(#?MODULE{enqueue_count = EC, %% Q: Why don't we just stash the release cursor here? %% A: Because it needs to be the very last thing we do and we %% first needs to run the checkout logic. - State0#?MODULE{enqueue_count = 0}; -incr_enqueue_count(#?MODULE{enqueue_count = C} = State) -> - State#?MODULE{enqueue_count = C + 1}. + State0#?STATE{enqueue_count = 0}; +incr_enqueue_count(#?STATE{enqueue_count = C} = State) -> + State#?STATE{enqueue_count = C + 1}. maybe_store_dehydrated_state(RaftIdx, - #?MODULE{cfg = + #?STATE{cfg = #cfg{release_cursor_interval = {Base, _}} = Cfg, ra_indexes = Indexes, @@ -1267,12 +1267,12 @@ maybe_store_dehydrated_state(RaftIdx, Total = messages_total(State0), min(max(Total, Base), ?RELEASE_CURSOR_EVERY_MAX) end, - State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = + State = State0#?STATE{cfg = Cfg#cfg{release_cursor_interval = {Base, Interval}}}, Dehydrated = dehydrate_state(State), Cursor = {release_cursor, RaftIdx, Dehydrated}, Cursors = lqueue:in(Cursor, Cursors0), - State#?MODULE{release_cursors = Cursors} + State#?STATE{release_cursors = Cursors} end; maybe_store_dehydrated_state(_RaftIdx, State) -> State. @@ -1284,18 +1284,18 @@ enqueue_pending(From, State = enqueue(RaftIdx, RawMsg, State0), Enq = Enq0#enqueuer{next_seqno = Next + 1, pending = Pending}, enqueue_pending(From, Enq, State); -enqueue_pending(From, Enq, #?MODULE{enqueuers = Enqueuers0} = State) -> - State#?MODULE{enqueuers = Enqueuers0#{From => Enq}}. +enqueue_pending(From, Enq, #?STATE{enqueuers = Enqueuers0} = State) -> + State#?STATE{enqueuers = Enqueuers0#{From => Enq}}. 
maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) -> % direct enqueue without tracking State = enqueue(RaftIdx, RawMsg, State0), {ok, State, Effects}; maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, - #?MODULE{enqueuers = Enqueuers0} = State0) -> + #?STATE{enqueuers = Enqueuers0} = State0) -> case maps:get(From, Enqueuers0, undefined) of undefined -> - State1 = State0#?MODULE{enqueuers = Enqueuers0#{From => #enqueuer{}}}, + State1 = State0#?STATE{enqueuers = Enqueuers0#{From => #enqueuer{}}}, {ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, State1), {ok, State, [{monitor, process, From} | Effects]}; @@ -1311,7 +1311,7 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, % out of order delivery Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0], Enq = Enq0#enqueuer{pending = lists:sort(Pending)}, - {ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq}}, Effects0}; + {ok, State0#?STATE{enqueuers = Enqueuers0#{From => Enq}}, Effects0}; #enqueuer{next_seqno = Next} when MsgSeqNo =< Next -> % duplicate delivery - remove the raft index from the ra_indexes % map as it was added earlier @@ -1333,7 +1333,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, ConsumerId) end, {State0, Effects0}, Returned), State2 = - case State1#?MODULE.consumers of + case State1#?STATE.consumers of #{ConsumerId := Con0} -> Con = Con0#consumer{credit = increase_credit(Con0, map_size(Returned))}, @@ -1347,7 +1347,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, % used to processes messages that are finished complete(Meta, ConsumerId, Discarded, #consumer{checked_out = Checked} = Con0, Effects, - #?MODULE{ra_indexes = Indexes0} = State0) -> + #?STATE{ra_indexes = Indexes0} = State0) -> %% TODO optimise use of Discarded map here MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], %% credit_mode = simple_prefetch should automatically top-up credit @@ -1365,7 +1365,7 @@ complete(Meta, ConsumerId, Discarded, ({'$empty_msg', Header}, Acc) -> add_bytes_settle(Header, Acc) end, State1, maps:values(Discarded)), - {State2#?MODULE{ra_indexes = Indexes}, Effects}. + {State2#?STATE{ra_indexes = Indexes}, Effects}. increase_credit(#consumer{lifetime = once, credit = Credit}, _) -> @@ -1389,11 +1389,11 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, update_smallest_raft_index(IncomingRaftIdx, State, Effects). dead_letter_effects(_Reason, _Discarded, - #?MODULE{cfg = #cfg{dead_letter_handler = undefined}}, + #?STATE{cfg = #cfg{dead_letter_handler = undefined}}, Effects) -> Effects; dead_letter_effects(Reason, Discarded, - #?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}}, + #?STATE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}}, Effects) -> RaftIdxs = maps:fold( fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) -> @@ -1417,7 +1417,7 @@ dead_letter_effects(Reason, Discarded, end} | Effects]. cancel_consumer_effects(ConsumerId, - #?MODULE{cfg = #cfg{resource = QName}} = State, Effects) -> + #?STATE{cfg = #cfg{resource = QName}} = State, Effects) -> [{mod_call, rabbit_quorum_queue, cancel_consumer_handler, [QName, ConsumerId]}, notify_decorators_effect(State) | Effects]. @@ -1426,7 +1426,7 @@ update_smallest_raft_index(Idx, State, Effects) -> update_smallest_raft_index(Idx, ok, State, Effects). 
update_smallest_raft_index(IncomingRaftIdx, Reply, - #?MODULE{cfg = Cfg, + #?STATE{cfg = Cfg, ra_indexes = Indexes, release_cursors = Cursors0} = State0, Effects) -> @@ -1438,7 +1438,7 @@ update_smallest_raft_index(IncomingRaftIdx, Reply, %% reset the release cursor interval #cfg{release_cursor_interval = {Base, _}} = Cfg, RCI = {Base, Base}, - State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI}, + State = State0#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCI}, release_cursors = lqueue:new(), enqueue_count = 0}, {State, Reply, Effects ++ [{release_cursor, IncomingRaftIdx, State}]}; @@ -1446,11 +1446,11 @@ update_smallest_raft_index(IncomingRaftIdx, Reply, Smallest = rabbit_fifo_index:smallest(Indexes), case find_next_cursor(Smallest, Cursors0) of {empty, Cursors} -> - {State0#?MODULE{release_cursors = Cursors}, Reply, Effects}; + {State0#?STATE{release_cursors = Cursors}, Reply, Effects}; {Cursor, Cursors} -> %% we can emit a release cursor when we've passed the smallest %% release cursor available. - {State0#?MODULE{release_cursors = Cursors}, Reply, + {State0#?STATE{release_cursors = Cursors}, Reply, Effects ++ [Cursor]} end end. @@ -1475,7 +1475,7 @@ update_header(Key, UpdateFun, Default, Header) -> return_one(Meta, MsgId, 0, {Tag, Header0}, - #?MODULE{returns = Returns, + #?STATE{returns = Returns, consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0, ConsumerId) @@ -1501,12 +1501,12 @@ return_one(Meta, MsgId, 0, {Tag, Header0}, end, {add_bytes_return( Header, - State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, + State1#?STATE{consumers = Consumers#{ConsumerId => Con}, returns = lqueue:in(Msg, Returns)}), Effects0} end; return_one(Meta, MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, - #?MODULE{returns = Returns, + #?STATE{returns = Returns, consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0, ConsumerId) -> @@ -1535,17 +1535,17 @@ return_one(Meta, MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, end, {add_bytes_return( Header, - State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, + State1#?STATE{consumers = Consumers#{ConsumerId => Con}, returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0} end. -return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, +return_all(Meta, #?STATE{consumers = Cons} = State0, Effects0, ConsumerId, #consumer{checked_out = Checked0} = Con) -> %% need to sort the list so that we return messages in the order %% they were checked out Checked = lists:sort(maps:to_list(Checked0)), - State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}}, + State = State0#?STATE{consumers = Cons#{ConsumerId => Con}}, lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) -> return_one(Meta, MsgId, 0, Msg, S, E, ConsumerId); ({MsgId, {'$empty_msg', _} = Msg}, {S, E}) -> @@ -1558,7 +1558,7 @@ return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, checkout(Meta, OldState, State, Effects) -> checkout(Meta, OldState, State, Effects, true). -checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldState, State0, +checkout(#{index := Index} = Meta, #?STATE{cfg = #cfg{resource = QName}} = OldState, State0, Effects0, HandleConsumerChanges) -> {State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0), Effects0, #{}), @@ -1608,12 +1608,12 @@ checkout0(_Meta, {Activity, State0}, Effects0, SendAcc) -> {State0, ok, lists:reverse(Effects1)}. 
evaluate_limit(_Index, Result, _BeforeState, - #?MODULE{cfg = #cfg{max_length = undefined, + #?STATE{cfg = #cfg{max_length = undefined, max_bytes = undefined}} = State, Effects) -> {State, Result, Effects}; evaluate_limit(Index, Result, BeforeState, - #?MODULE{cfg = #cfg{overflow_strategy = Strategy}, + #?STATE{cfg = #cfg{overflow_strategy = Strategy}, enqueuers = Enqs0} = State0, Effects0) -> case is_over_limit(State0) of @@ -1633,7 +1633,7 @@ evaluate_limit(Index, Result, BeforeState, (_P, _E, Acc) -> Acc end, {Enqs0, Effects0}, Enqs0), - {State0#?MODULE{enqueuers = Enqs}, Result, Effects}; + {State0#?STATE{enqueuers = Enqs}, Result, Effects}; false when Strategy == reject_publish -> %% TODO: optimise as this case gets called for every command %% pretty much @@ -1651,7 +1651,7 @@ evaluate_limit(Index, Result, BeforeState, (_P, _E, Acc) -> Acc end, {Enqs0, Effects0}, Enqs0), - {State0#?MODULE{enqueuers = Enqs}, Result, Effects}; + {State0#?STATE{enqueuers = Enqs}, Result, Effects}; _ -> {State0, Result, Effects0} end; @@ -1660,13 +1660,13 @@ evaluate_limit(Index, Result, BeforeState, end. evaluate_memory_limit(_Header, - #?MODULE{cfg = #cfg{max_in_memory_length = undefined, + #?STATE{cfg = #cfg{max_in_memory_length = undefined, max_in_memory_bytes = undefined}}) -> false; evaluate_memory_limit(#{size := Size}, State) -> evaluate_memory_limit(Size, State); evaluate_memory_limit(Size, - #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength, + #?STATE{cfg = #cfg{max_in_memory_length = MaxLength, max_in_memory_bytes = MaxBytes}, msg_bytes_in_memory = Bytes, msgs_ready_in_memory = Length}) @@ -1690,18 +1690,18 @@ append_delivery_effects(Effects0, AccMap) -> %% %% When we return it is always done to the current return queue %% for both prefix messages and current messages -take_next_msg(#?MODULE{prefix_msgs = {R, P}} = State) -> +take_next_msg(#?STATE{prefix_msgs = {R, P}} = State) -> %% conversion - take_next_msg(State#?MODULE{prefix_msgs = {length(R), R, length(P), P}}); -take_next_msg(#?MODULE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem], + take_next_msg(State#?STATE{prefix_msgs = {length(R), R, length(P), P}}); +take_next_msg(#?STATE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem], NumP, P}} = State) -> %% there are prefix returns, these should be served first - {Msg, State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; -take_next_msg(#?MODULE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) -> + {Msg, State#?STATE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; +take_next_msg(#?STATE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) -> %% there are prefix returns, these should be served first {{'$prefix_msg', Header}, - State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; -take_next_msg(#?MODULE{returns = Returns, + State#?STATE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; +take_next_msg(#?STATE{returns = Returns, messages = Messages0, prefix_msgs = {NumR, R, NumP, P}} = State) -> %% use peek rather than out there as the most likely case is an empty @@ -1709,13 +1709,13 @@ take_next_msg(#?MODULE{returns = Returns, case lqueue:peek(Returns) of {value, NextMsg} -> {NextMsg, - State#?MODULE{returns = lqueue:drop(Returns)}}; + State#?STATE{returns = lqueue:drop(Returns)}}; empty when P == [] -> case lqueue:out(Messages0) of {empty, _} -> empty; {{value, {_, _} = SeqMsg}, Messages} -> - {SeqMsg, State#?MODULE{messages = Messages }} + {SeqMsg, State#?STATE{messages = Messages }} end; empty -> [Msg | Rem] = P, @@ -1723,10 +1723,10 @@ take_next_msg(#?MODULE{returns = Returns, 
{Header, 'empty'} -> %% There are prefix msgs {{'$empty_msg', Header}, - State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}}; + State#?STATE{prefix_msgs = {NumR, R, NumP-1, Rem}}}; Header -> {{'$prefix_msg', Header}, - State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}} + State#?STATE{prefix_msgs = {NumR, R, NumP-1, Rem}}} end end. @@ -1757,7 +1757,7 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) -> {dequeue, {MsgId, {Header, Msg}}, Ready}}}] end}. -checkout_one(Meta, #?MODULE{service_queue = SQ0, +checkout_one(Meta, #?STATE{service_queue = SQ0, messages = Messages0, consumers = Cons0} = InitState) -> case priority_queue:out(SQ0) of @@ -1772,11 +1772,11 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, %% no credit but was still on queue %% can happen when draining %% recurse without consumer on queue - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); + checkout_one(Meta, InitState#?STATE{service_queue = SQ1}); #consumer{status = cancelled} -> - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); + checkout_one(Meta, InitState#?STATE{service_queue = SQ1}); #consumer{status = suspected_down} -> - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); + checkout_one(Meta, InitState#?STATE{service_queue = SQ1}); #consumer{checked_out = Checked0, next_msg_id = Next, credit = Credit, @@ -1788,7 +1788,7 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, delivery_count = DelCnt + 1}, State1 = update_or_remove_sub(Meta, ConsumerId, Con, - State0#?MODULE{service_queue = SQ1}), + State0#?STATE{service_queue = SQ1}), {State, Msg} = case ConsumerMsg of {'$prefix_msg', Header} -> @@ -1814,7 +1814,7 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, end; {{value, _ConsumerId}, SQ1} -> %% consumer did not exist but was queued, recurse - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); + checkout_one(Meta, InitState#?STATE{service_queue = SQ1}); {empty, _} -> case lqueue:len(Messages0) of 0 -> {nochange, InitState}; @@ -1824,31 +1824,31 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto, credit = 0} = Con, - #?MODULE{consumers = Cons} = State) -> - State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)}; + #?STATE{consumers = Cons} = State) -> + State#?STATE{consumers = maps:put(ConsumerId, Con, Cons)}; update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto} = Con, - #?MODULE{consumers = Cons, + #?STATE{consumers = Cons, service_queue = ServiceQueue} = State) -> - State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons), + State#?STATE{consumers = maps:put(ConsumerId, Con, Cons), service_queue = uniq_queue_in(ConsumerId, Con, ServiceQueue)}; update_or_remove_sub(#{system_time := Ts}, ConsumerId, #consumer{lifetime = once, checked_out = Checked, credit = 0} = Con, - #?MODULE{consumers = Cons} = State) -> + #?STATE{consumers = Cons} = State) -> case maps:size(Checked) of 0 -> % we're done with this consumer - State#?MODULE{consumers = maps:remove(ConsumerId, Cons), + State#?STATE{consumers = maps:remove(ConsumerId, Cons), last_active = Ts}; _ -> % there are unsettled items so need to keep around - State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)} + State#?STATE{consumers = maps:put(ConsumerId, Con, Cons)} end; update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = once} = Con, - #?MODULE{consumers = Cons, + #?STATE{consumers = Cons, service_queue = ServiceQueue} = State) -> - State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons), + State#?STATE{consumers = 
maps:put(ConsumerId, Con, Cons), service_queue = uniq_queue_in(ConsumerId, Con, ServiceQueue)}. uniq_queue_in(Key, #consumer{priority = P}, Queue) -> @@ -1862,17 +1862,17 @@ uniq_queue_in(Key, #consumer{priority = P}, Queue) -> end. update_consumer(ConsumerId, Meta, Spec, Priority, - #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) -> + #?STATE{cfg = #cfg{consumer_strategy = competing}} = State0) -> %% general case, single active consumer off update_consumer0(ConsumerId, Meta, Spec, Priority, State0); update_consumer(ConsumerId, Meta, Spec, Priority, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}} = State0) when map_size(Cons0) == 0 -> %% single active consumer on, no one is consuming yet update_consumer0(ConsumerId, Meta, Spec, Priority, State0); update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, Priority, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + #?STATE{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = WaitingConsumers0} = State0) -> %% single active consumer on and one active consumer already %% adding the new consumer to the waiting list @@ -1880,10 +1880,10 @@ update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, Priority, priority = Priority, credit = Credit, credit_mode = Mode}, WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}], - State0#?MODULE{waiting_consumers = WaitingConsumers1}. + State0#?STATE{waiting_consumers = WaitingConsumers1}. update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, Priority, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, service_queue = ServiceQueue0} = State0) -> %% TODO: this logic may not be correct for updating a pre-existing consumer Init = #consumer{lifetime = Life, meta = Meta, @@ -1899,7 +1899,7 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, Priority, end, Init, Cons0), ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons), ServiceQueue0), - State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}. + State0#?STATE{consumers = Cons, service_queue = ServiceQueue}. maybe_queue_consumer(ConsumerId, #consumer{credit = Credit} = Con, ServiceQueue0) -> @@ -1913,7 +1913,7 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit} = Con, %% creates a dehydrated version of the current state to be cached and %% potentially used to for a snaphot at a later point -dehydrate_state(#?MODULE{messages = Messages, +dehydrate_state(#?STATE{messages = Messages, consumers = Consumers, returns = Returns, prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0}, @@ -1937,7 +1937,7 @@ dehydrate_state(#?MODULE{messages = Messages, %% recovering from a snapshot PrefMsgs = PrefMsg0 ++ PrefMsgsSuff, Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0], - State#?MODULE{messages = lqueue:new(), + State#?STATE{messages = lqueue:new(), ra_indexes = rabbit_fifo_index:empty(), release_cursors = lqueue:new(), consumers = maps:map(fun (_, C) -> @@ -1973,23 +1973,23 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Con#consumer{checked_out = Checked}. %% make the state suitable for equality comparison -normalize(#?MODULE{messages = Messages, +normalize(#?STATE{messages = Messages, release_cursors = Cursors} = State) -> - State#?MODULE{messages = lqueue:from_list(lqueue:to_list(Messages)), + State#?STATE{messages = lqueue:from_list(lqueue:to_list(Messages)), release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}. 
-is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined, +is_over_limit(#?STATE{cfg = #cfg{max_length = undefined, max_bytes = undefined}}) -> false; -is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, +is_over_limit(#?STATE{cfg = #cfg{max_length = MaxLength, max_bytes = MaxBytes}, msg_bytes_enqueue = BytesEnq} = State) -> messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes). -is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = undefined, +is_below_soft_limit(#?STATE{cfg = #cfg{max_length = undefined, max_bytes = undefined}}) -> false; -is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, +is_below_soft_limit(#?STATE{cfg = #cfg{max_length = MaxLength, max_bytes = MaxBytes}, msg_bytes_enqueue = BytesEnq} = State) -> is_below(MaxLength, messages_ready(State)) andalso @@ -2049,58 +2049,58 @@ make_update_config(Config) -> #update_config{config = Config}. add_bytes_enqueue(Bytes, - #?MODULE{msg_bytes_enqueue = Enqueue} = State) + #?STATE{msg_bytes_enqueue = Enqueue} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}; + State#?STATE{msg_bytes_enqueue = Enqueue + Bytes}; add_bytes_enqueue(#{size := Bytes}, State) -> add_bytes_enqueue(Bytes, State). add_bytes_drop(Bytes, - #?MODULE{msg_bytes_enqueue = Enqueue} = State) + #?STATE{msg_bytes_enqueue = Enqueue} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}; + State#?STATE{msg_bytes_enqueue = Enqueue - Bytes}; add_bytes_drop(#{size := Bytes}, State) -> add_bytes_drop(Bytes, State). add_bytes_checkout(Bytes, - #?MODULE{msg_bytes_checkout = Checkout, + #?STATE{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue } = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_checkout = Checkout + Bytes, + State#?STATE{msg_bytes_checkout = Checkout + Bytes, msg_bytes_enqueue = Enqueue - Bytes}; add_bytes_checkout(#{size := Bytes}, State) -> add_bytes_checkout(Bytes, State). add_bytes_settle(Bytes, - #?MODULE{msg_bytes_checkout = Checkout} = State) + #?STATE{msg_bytes_checkout = Checkout} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_checkout = Checkout - Bytes}; + State#?STATE{msg_bytes_checkout = Checkout - Bytes}; add_bytes_settle(#{size := Bytes}, State) -> add_bytes_settle(Bytes, State). add_bytes_return(Bytes, - #?MODULE{msg_bytes_checkout = Checkout, + #?STATE{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_checkout = Checkout - Bytes, + State#?STATE{msg_bytes_checkout = Checkout - Bytes, msg_bytes_enqueue = Enqueue + Bytes}; add_bytes_return(#{size := Bytes}, State) -> add_bytes_return(Bytes, State). add_in_memory_counts(Bytes, - #?MODULE{msg_bytes_in_memory = InMemoryBytes, + #?STATE{msg_bytes_in_memory = InMemoryBytes, msgs_ready_in_memory = InMemoryCount} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes, + State#?STATE{msg_bytes_in_memory = InMemoryBytes + Bytes, msgs_ready_in_memory = InMemoryCount + 1}; add_in_memory_counts(#{size := Bytes}, State) -> add_in_memory_counts(Bytes, State). 
subtract_in_memory_counts(Bytes, - #?MODULE{msg_bytes_in_memory = InMemoryBytes, + #?STATE{msg_bytes_in_memory = InMemoryBytes, msgs_ready_in_memory = InMemoryCount} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes, + State#?STATE{msg_bytes_in_memory = InMemoryBytes - Bytes, msgs_ready_in_memory = InMemoryCount - 1}; subtract_in_memory_counts(#{size := Bytes}, State) -> subtract_in_memory_counts(Bytes, State). @@ -2124,7 +2124,7 @@ get_size_from_header(#{size := B}) -> B. -all_nodes(#?MODULE{consumers = Cons0, +all_nodes(#?STATE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> Nodes0 = maps:fold(fun({_, P}, _, Acc) -> @@ -2138,7 +2138,7 @@ all_nodes(#?MODULE{consumers = Cons0, Acc#{node(P) => ok} end, Nodes1, WaitingConsumers0)). -all_pids_for(Node, #?MODULE{consumers = Cons0, +all_pids_for(Node, #?STATE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> Cons = maps:fold(fun({_, P}, _, Acc) @@ -2157,7 +2157,7 @@ all_pids_for(Node, #?MODULE{consumers = Cons0, (_, Acc) -> Acc end, Enqs, WaitingConsumers0). -suspected_pids_for(Node, #?MODULE{consumers = Cons0, +suspected_pids_for(Node, #?STATE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc) @@ -2177,7 +2177,7 @@ suspected_pids_for(Node, #?MODULE{consumers = Cons0, (_, Acc) -> Acc end, Enqs, WaitingConsumers0). -is_expired(Ts, #?MODULE{cfg = #cfg{expires = Expires}, +is_expired(Ts, #?STATE{cfg = #cfg{expires = Expires}, last_active = LastActive, consumers = Consumers}) when is_number(LastActive) andalso is_number(Expires) -> @@ -2206,7 +2206,7 @@ maybe_notify_decorators(_, false) -> maybe_notify_decorators(State, _) -> {true, query_notify_decorators_info(State)}. -notify_decorators_effect(#?MODULE{cfg = #cfg{resource = QName}} = State) -> +notify_decorators_effect(#?STATE{cfg = #cfg{resource = QName}} = State) -> {MaxActivePriority, IsEmpty} = query_notify_decorators_info(State), notify_decorators_effect(QName, MaxActivePriority, IsEmpty). @@ -2215,11 +2215,11 @@ notify_decorators_effect(QName, MaxActivePriority, IsEmpty) -> [QName, consumer_state_changed, [MaxActivePriority, IsEmpty]]}. get_field(Field, State) -> - Fields = record_info(fields, ?MODULE), + Fields = record_info(fields, ?STATE), Index = record_index_of(Field, Fields), element(Index, State). -get_cfg_field(Field, #?MODULE{cfg = Cfg} ) -> +get_cfg_field(Field, #?STATE{cfg = Cfg} ) -> Fields = record_info(fields, cfg), Index = record_index_of(Field, Fields), element(Index, Cfg). diff --git a/deps/rabbit/src/rabbit_fifo_v1.hrl b/deps/rabbit/src/rabbit_fifo_v1.hrl index 3df9883445..4a427f8fed 100644 --- a/deps/rabbit/src/rabbit_fifo_v1.hrl +++ b/deps/rabbit/src/rabbit_fifo_v1.hrl @@ -76,6 +76,7 @@ -define(MB, 1048576). -define(LOW_LIMIT, 0.8). +-define(STATE, rabbit_fifo). -record(consumer, {meta = #{} :: consumer_meta(), @@ -142,7 +143,7 @@ {non_neg_integer(), list(), non_neg_integer(), list()}. 
--record(rabbit_fifo_v1, +-record(?STATE, {cfg :: #cfg{}, % unassigned messages messages = lqueue:new() :: lqueue:lqueue({msg_in_id(), indexed_msg()}), @@ -164,7 +165,7 @@ % for normal appending operations as it's backed by a map ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(), release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor, - ra:index(), #rabbit_fifo_v1{}}), + ra:index(), #?STATE{}}), % consumers need to reflect consumer state at time of snapshot % needs to be part of snapshot consumers = #{} :: #{consumer_id() => #consumer{}}, diff --git a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl index 22cfc0d719..5aefc4debc 100644 --- a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl @@ -1164,16 +1164,12 @@ messages_total_prop(Conf0, Commands) -> messages_total_invariant() -> fun(#rabbit_fifo{messages = M, consumers = C, - enqueuers = E, prefix_msgs = {PTot, _, RTot, _}, returns = R} = S) -> Base = lqueue:len(M) + lqueue:len(R) + PTot + RTot, - CTot = maps:fold(fun (_, #consumer{checked_out = Ch}, Acc) -> + Tot = maps:fold(fun (_, #consumer{checked_out = Ch}, Acc) -> Acc + map_size(Ch) - end, Base, C), - Tot = maps:fold(fun (_, #enqueuer{pending = P}, Acc) -> - Acc + length(P) - end, CTot, E), + end, Base, C), QTot = rabbit_fifo:query_messages_total(S), case Tot == QTot of true -> true; @@ -1355,10 +1351,10 @@ nodeup_gen(Nodes) -> enqueue_gen(Pid) -> enqueue_gen(Pid, 10, 1). -enqueue_gen(Pid, Enq, Del) -> - ?LET(E, {enqueue, Pid, - frequency([{Enq, enqueue}, - {Del, delay}]), +enqueue_gen(Pid, _Enq, _Del) -> + ?LET(E, {enqueue, Pid, enqueue, + % frequency([{Enq, enqueue}, + % {Del, delay}]), binary()}, E). checkout_cancel_gen(Pid) -> |