diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-06-30 15:40:31 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-09-07 09:42:10 +0100 |
| commit | cb2b424fa947b0d885ccf7d21a20f9d240cccb1f (patch) | |
| tree | 225b1b499ee1f757e5a0baff83142f109aa51059 | |
| parent | bd3827b0cf5f2199268a5b579f27483a9c581ab2 (diff) | |
| download | rabbitmq-server-git-cb2b424fa947b0d885ccf7d21a20f9d240cccb1f.tar.gz | |
rabbit_fifo: Fixes for move to lqueue
Also rename v0 state back to rabbit_fifo else it could never work with
old state!
| -rw-r--r-- | src/rabbit_fifo.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_fifo_v0.erl | 342 | ||||
| -rw-r--r-- | src/rabbit_fifo_v0.hrl | 6 | ||||
| -rw-r--r-- | test/rabbit_fifo_v0_SUITE.erl | 181 |
4 files changed, 280 insertions, 270 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 13c532af47..4982f008c2 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -310,7 +310,7 @@ apply(#{index := RaftIdx}, #purge{}, messages = Messages} = State0) -> Total = messages_ready(State0), Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, - [I || {I, _} <- lists:sort(lqueue:to_list(Messages))]), + [I || {_, {I, _}} <- lqueue:to_list(Messages)]), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1, [I || {_, {I, _}} <- lqueue:to_list(Returns)]), {State, _, Effects} = @@ -320,7 +320,6 @@ apply(#{index := RaftIdx}, #purge{}, returns = lqueue:new(), msg_bytes_enqueue = 0, prefix_msgs = {0, [], 0, []}, - % low_msg_num = undefined, msg_bytes_in_memory = 0, msgs_ready_in_memory = 0}, []), @@ -469,8 +468,9 @@ apply(_Meta, {machine_version, 0, 1}, V0State0) -> V0State = rabbit_fifo_v0:normalize_for_v1(V0State0), %% quick hack to "convert" the state from version one State = setelement(1, V0State, ?MODULE), - V0Msgs = rabbit_fifo_v0:messages_map(V0State), + V0Msgs = rabbit_fifo_v0:get_field(messages, V0State), V1Msgs = lqueue:from_list(lists:sort(maps:to_list(V0Msgs))), + %% TODOD each release cursor needs converting too! {State#?MODULE{messages = V1Msgs}, ok, []}. purge_node(Node, State, Effects) -> @@ -629,7 +629,7 @@ overview(#?MODULE{consumers = Cons, num_ready_messages => messages_ready(State), num_messages => messages_total(State), num_release_cursors => lqueue:len(Cursors), - release_crusor_enqueue_counter => EnqCount, + release_cursor_enqueue_counter => EnqCount, enqueue_message_bytes => EnqueueBytes, checkout_message_bytes => CheckoutBytes}. @@ -1244,7 +1244,7 @@ update_smallest_raft_index(IncomingRaftIdx, {State0#?MODULE{release_cursors = Cursors}, ok, Effects}; {Cursor, Cursors} -> - %% we can emit a release cursor we've passed the smallest + %% we can emit a release cursor when we've passed the smallest %% release cursor available. {State0#?MODULE{release_cursors = Cursors}, ok, Effects ++ [Cursor]} @@ -1680,7 +1680,6 @@ dehydrate_state(#?MODULE{messages = Messages, State#?MODULE{messages = lqueue:new(), ra_indexes = rabbit_fifo_index:empty(), release_cursors = lqueue:new(), - % low_msg_num = undefined, consumers = maps:map(fun (_, C) -> dehydrate_consumer(C) end, Consumers), @@ -1693,9 +1692,9 @@ dehydrate_state(#?MODULE{messages = Messages, dehydrate_messages(Msgs0, Acc0) -> {OutRes, Msgs} = lqueue:out(Msgs0), case OutRes of - {value, {_, 'empty'} = Msg} -> + {value, {_MsgId, {_RaftId, {_, 'empty'} = Msg}}} -> dehydrate_messages(Msgs, [Msg | Acc0]); - {value, {Header, _}} -> + {value, {_MsgId, {_RaftId, {Header, _}}}} -> dehydrate_messages(Msgs, [Header | Acc0]); empty -> lists:reverse(Acc0) @@ -1714,8 +1713,10 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Con#consumer{checked_out = Checked}. %% make the state suitable for equality comparison -normalize(#?MODULE{release_cursors = Cursors} = State) -> - State#?MODULE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}. +normalize(#?MODULE{messages = Messages, + release_cursors = Cursors} = State) -> + State#?MODULE{messages = lqueue:from_list(lqueue:to_list(Messages)), + release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}. is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined, max_bytes = undefined}}) -> diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl index 52706aec1f..ad261317c9 100644 --- a/src/rabbit_fifo_v0.erl +++ b/src/rabbit_fifo_v0.erl @@ -55,7 +55,7 @@ normalize/1, normalize_for_v1/1, %% getters for coversions - messages_map/1, + get_field/2, %% protocol helpers make_enqueue/3, @@ -107,7 +107,7 @@ -type client_msg() :: delivery(). %% the messages `rabbit_fifo' can send to consumers. --opaque state() :: #?MODULE{}. +-opaque state() :: #?STATE{}. -export_type([protocol/0, delivery/0, @@ -128,7 +128,7 @@ init(#{name := Name, queue_resource := Resource} = Conf) -> rabbit_log:info("rabbit_fifo: init v0 ~p", [Conf]), - update_config(Conf, #?MODULE{cfg = #cfg{name = Name, + update_config(Conf, #?STATE{cfg = #cfg{name = Name, resource = Resource}}). update_config(Conf, State) -> @@ -146,8 +146,8 @@ update_config(Conf, State) -> false -> competing end, - Cfg = State#?MODULE.cfg, - SHICur = case State#?MODULE.cfg of + Cfg = State#?STATE.cfg, + SHICur = case State#?STATE.cfg of #cfg{release_cursor_interval = {_, C}} -> C; #cfg{release_cursor_interval = undefined} -> @@ -156,7 +156,7 @@ update_config(Conf, State) -> C end, - State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur}, + State#?STATE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur}, dead_letter_handler = DLH, become_leader_handler = BLH, max_length = MaxLength, @@ -179,7 +179,7 @@ apply(Metadata, #enqueue{pid = From, seq = Seq, apply_enqueue(Metadata, From, Seq, RawMsg, State00); apply(Meta, #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, - #?MODULE{consumers = Cons0} = State) -> + #?STATE{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0} -> % need to increment metrics before completing as any snapshot @@ -191,7 +191,7 @@ apply(Meta, end; apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, - #?MODULE{consumers = Cons0} = State0) -> + #?STATE{consumers = Cons0} = State0) -> case Cons0 of #{ConsumerId := Con0} -> Discarded = maps:with(MsgIds, Con0#consumer.checked_out), @@ -202,7 +202,7 @@ apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, {State0, ok} end; apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, - #?MODULE{consumers = Cons0} = State) -> + #?STATE{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := #consumer{checked_out = Checked0}} -> Returned = maps:with(MsgIds, Checked0), @@ -212,7 +212,7 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, end; apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, drain = Drain, consumer_id = ConsumerId}, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, service_queue = ServiceQueue0, waiting_consumers = Waiting0} = State0) -> case Cons0 of @@ -225,7 +225,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, ServiceQueue0), Cons = maps:put(ConsumerId, Con1, Cons0), {State1, ok, Effects} = - checkout(Meta, State0#?MODULE{service_queue = ServiceQueue, + checkout(Meta, State0#?STATE{service_queue = ServiceQueue, consumers = Cons}, []), Response = {send_credit_reply, messages_ready(State1)}, %% by this point all checkouts for the updated credit value @@ -236,16 +236,16 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {State1, Response, Effects}; true -> Con = #consumer{credit = PostCred} = - maps:get(ConsumerId, State1#?MODULE.consumers), + maps:get(ConsumerId, State1#?STATE.consumers), %% add the outstanding credit to the delivery count DeliveryCount = Con#consumer.delivery_count + PostCred, Consumers = maps:put(ConsumerId, Con#consumer{delivery_count = DeliveryCount, credit = 0}, - State1#?MODULE.consumers), + State1#?STATE.consumers), Drained = Con#consumer.credit, {CTag, _} = ConsumerId, - {State1#?MODULE{consumers = Consumers}, + {State1#?STATE{consumers = Consumers}, %% returning a multi response with two client actions %% for the channel to execute {multi, [Response, {send_drained, {CTag, Drained}}]}, @@ -259,7 +259,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, %% grant the credit C = max(0, RemoteDelCnt + NewCredit - DelCnt), Con = Con0#consumer{credit = C}, - State = State0#?MODULE{waiting_consumers = + State = State0#?STATE{waiting_consumers = [{ConsumerId, Con} | Waiting]}, {State, {send_credit_reply, messages_ready(State)}}; false -> @@ -270,12 +270,12 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {State0, ok} end; apply(_, #checkout{spec = {dequeue, _}}, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) -> + #?STATE{cfg = #cfg{consumer_strategy = single_active}} = State0) -> {State0, {error, unsupported}}; apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement}, meta = ConsumerMeta, consumer_id = ConsumerId}, - #?MODULE{consumers = Consumers} = State0) -> + #?STATE{consumers = Consumers} = State0) -> Exists = maps:is_key(ConsumerId, Consumers), case messages_ready(State0) of 0 -> @@ -317,7 +317,7 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0), checkout(Meta, State1, [{monitor, process, Pid}]); apply(#{index := RaftIdx}, #purge{}, - #?MODULE{ra_indexes = Indexes0, + #?STATE{ra_indexes = Indexes0, returns = Returns, messages = Messages} = State0) -> Total = messages_ready(State0), @@ -327,7 +327,7 @@ apply(#{index := RaftIdx}, #purge{}, [I || {_, {I, _}} <- lqueue:to_list(Returns)]), {State, _, Effects} = update_smallest_raft_index(RaftIdx, - State0#?MODULE{ra_indexes = Indexes, + State0#?STATE{ra_indexes = Indexes, messages = #{}, returns = lqueue:new(), msg_bytes_enqueue = 0, @@ -341,7 +341,7 @@ apply(#{index := RaftIdx}, #purge{}, {State, {purge, Total}, lists:reverse([garbage_collection | Effects])}; apply(Meta, {down, Pid, noconnection}, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = Waiting0, enqueuers = Enqs0} = State0) -> @@ -361,13 +361,13 @@ apply(Meta, {down, Pid, noconnection}, Cid, C0#consumer{credit = Credit}), %% if the consumer was cancelled there is a chance it got %% removed when returning hence we need to be defensive here - Waiting = case St#?MODULE.consumers of + Waiting = case St#?STATE.consumers of #{Cid := C} -> Waiting0 ++ [{Cid, C}]; _ -> Waiting0 end, - {St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers), + {St#?STATE{consumers = maps:remove(Cid, St#?STATE.consumers), waiting_consumers = Waiting}, Effs1}; (_, _, S) -> @@ -377,7 +377,7 @@ apply(Meta, {down, Pid, noconnection}, suspected_down), %% select a new consumer from the waiting queue and run a checkout - State2 = State1#?MODULE{waiting_consumers = WaitingConsumers}, + State2 = State1#?STATE{waiting_consumers = WaitingConsumers}, {State, Effects1} = activate_next_consumer(State2, Effects0), %% mark any enquers as suspected @@ -386,9 +386,9 @@ apply(Meta, {down, Pid, noconnection}, (_, E) -> E end, Enqs0), Effects = [{monitor, node, Node} | Effects1], - checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); + checkout(Meta, State#?STATE{enqueuers = Enqs}, Effects); apply(Meta, {down, Pid, noconnection}, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, enqueuers = Enqs0} = State0) -> %% A node has been disconnected. This doesn't necessarily mean that %% any processes on this node are down, they _may_ come back so here @@ -423,17 +423,17 @@ apply(Meta, {down, Pid, noconnection}, % Monitor the node so that we can "unsuspect" these processes when the node % comes back, then re-issue all monitors and discover the final fate of % these processes - Effects = case maps:size(State#?MODULE.consumers) of + Effects = case maps:size(State#?STATE.consumers) of 0 -> [{aux, inactive}, {monitor, node, Node}]; _ -> [{monitor, node, Node}] end ++ Effects1, - checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); + checkout(Meta, State#?STATE{enqueuers = Enqs}, Effects); apply(Meta, {down, Pid, _Info}, State0) -> {State, Effects} = handle_down(Pid, State0), checkout(Meta, State, Effects); -apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, +apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0, enqueuers = Enqs0, service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. @@ -462,7 +462,7 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, Acc end, {Cons0, SQ0, Monitors}, Cons0), Waiting = update_waiting_consumer_status(Node, State0, up), - State1 = State0#?MODULE{consumers = Cons1, + State1 = State0#?STATE{consumers = Cons1, enqueuers = Enqs1, service_queue = SQ, waiting_consumers = Waiting}, @@ -485,7 +485,7 @@ purge_node(Node, State, Effects) -> end, {State, Effects}, all_pids_for(Node, State)). %% any downs that re not noconnection -handle_down(Pid, #?MODULE{consumers = Cons0, +handle_down(Pid, #?STATE{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid @@ -493,7 +493,7 @@ handle_down(Pid, #?MODULE{consumers = Cons0, {#enqueuer{pending = Pend}, Enqs} -> lists:foldl(fun ({_, RIdx, RawMsg}, S) -> enqueue(RIdx, RawMsg, S) - end, State0#?MODULE{enqueuers = Enqs}, Pend); + end, State0#?STATE{enqueuers = Enqs}, Pend); error -> State0 end, @@ -506,25 +506,25 @@ handle_down(Pid, #?MODULE{consumers = Cons0, cancel_consumer(ConsumerId, S, E, down) end, {State2, Effects1}, DownConsumers). -consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) -> +consumer_active_flag_update_function(#?STATE{cfg = #cfg{consumer_strategy = competing}}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) end; -consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = single_active}}) -> +consumer_active_flag_update_function(#?STATE{cfg = #cfg{consumer_strategy = single_active}}) -> fun(_, _, _, _, _, Effects) -> Effects end. handle_waiting_consumer_down(_Pid, - #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) -> + #?STATE{cfg = #cfg{consumer_strategy = competing}} = State) -> {[], State}; handle_waiting_consumer_down(_Pid, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + #?STATE{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = []} = State) -> {[], State}; handle_waiting_consumer_down(Pid, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + #?STATE{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = WaitingConsumers0} = State0) -> % get cancel effects for down waiting consumers Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end, @@ -536,11 +536,11 @@ handle_waiting_consumer_down(Pid, % update state to have only up waiting consumers StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end, WaitingConsumers0), - State = State0#?MODULE{waiting_consumers = StillUp}, + State = State0#?STATE{waiting_consumers = StillUp}, {Effects, State}. update_waiting_consumer_status(Node, - #?MODULE{waiting_consumers = WaitingConsumers}, + #?STATE{waiting_consumers = WaitingConsumers}, Status) -> [begin case node(Pid) of @@ -553,7 +553,7 @@ update_waiting_consumer_status(Node, Consumer#consumer.status =/= cancelled]. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). -state_enter(leader, #?MODULE{consumers = Cons, +state_enter(leader, #?STATE{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers, cfg = #cfg{name = Name, @@ -576,7 +576,7 @@ state_enter(leader, #?MODULE{consumers = Cons, {Mod, Fun, Args} -> [{mod_call, Mod, Fun, Args ++ [Name]} | Effects] end; -state_enter(eol, #?MODULE{enqueuers = Enqs, +state_enter(eol, #?STATE{enqueuers = Enqs, consumers = Custs0, waiting_consumers = WaitingConsumers0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), @@ -586,7 +586,7 @@ state_enter(eol, #?MODULE{enqueuers = Enqs, [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++ [{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}]; -state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= leader -> +state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader -> FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}, [FHReservation]; state_enter(_, _) -> @@ -595,7 +595,7 @@ state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= le -spec tick(non_neg_integer(), state()) -> ra_machine:effects(). -tick(_Ts, #?MODULE{cfg = #cfg{name = Name, +tick(_Ts, #?STATE{cfg = #cfg{name = Name, resource = QName}, msg_bytes_enqueue = EnqueueBytes, msg_bytes_checkout = CheckoutBytes} = State) -> @@ -610,7 +610,7 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name, handle_tick, [QName, Metrics, all_nodes(State)]}]. -spec overview(state()) -> map(). -overview(#?MODULE{consumers = Cons, +overview(#?STATE{consumers = Cons, enqueuers = Enqs, release_cursors = Cursors, enqueue_count = EnqCount, @@ -640,7 +640,7 @@ overview(#?MODULE{consumers = Cons, -spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) -> [delivery_msg()]. -get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) -> +get_checked_out(Cid, From, To, #?STATE{consumers = Consumers}) -> case Consumers of #{Cid := #consumer{checked_out = Checked}} -> [{K, snd(snd(maps:get(K, Checked)))} @@ -679,7 +679,7 @@ handle_aux(_RaState, cast, Cmd, #aux{name = Name, end, {no_reply, State, Log}. -eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState, +eval_gc(Log, #?STATE{cfg = #cfg{resource = QR}} = MacState, #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) -> {Idx, _} = ra_log:last_index_term(Log), {memory, Mem} = erlang:process_info(self(), memory), @@ -701,7 +701,7 @@ eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState, query_messages_ready(State) -> messages_ready(State). -query_messages_checked_out(#?MODULE{consumers = Consumers}) -> +query_messages_checked_out(#?STATE{consumers = Consumers}) -> maps:fold(fun (_, #consumer{checked_out = C}, S) -> maps:size(C) + S end, 0, Consumers). @@ -709,19 +709,19 @@ query_messages_checked_out(#?MODULE{consumers = Consumers}) -> query_messages_total(State) -> messages_total(State). -query_processes(#?MODULE{enqueuers = Enqs, consumers = Cons0}) -> +query_processes(#?STATE{enqueuers = Enqs, consumers = Cons0}) -> Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0), maps:keys(maps:merge(Enqs, Cons)). -query_ra_indexes(#?MODULE{ra_indexes = RaIndexes}) -> +query_ra_indexes(#?STATE{ra_indexes = RaIndexes}) -> RaIndexes. -query_consumer_count(#?MODULE{consumers = Consumers, +query_consumer_count(#?STATE{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> maps:size(Consumers) + length(WaitingConsumers). -query_consumers(#?MODULE{consumers = Consumers, +query_consumers(#?STATE{consumers = Consumers, waiting_consumers = WaitingConsumers, cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) -> ActiveActivityStatusFun = @@ -781,7 +781,7 @@ query_consumers(#?MODULE{consumers = Consumers, end, #{}, WaitingConsumers), maps:merge(FromConsumers, FromWaitingConsumers). -query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_active}, +query_single_active_consumer(#?STATE{cfg = #cfg{consumer_strategy = single_active}, consumers = Consumers}) -> case maps:size(Consumers) of 0 -> @@ -795,10 +795,10 @@ query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_acti query_single_active_consumer(_) -> disabled. -query_stat(#?MODULE{consumers = Consumers} = State) -> +query_stat(#?STATE{consumers = Consumers} = State) -> {messages_ready(State), maps:size(Consumers)}. -query_in_memory_usage(#?MODULE{msg_bytes_in_memory = Bytes, +query_in_memory_usage(#?STATE{msg_bytes_in_memory = Bytes, msgs_ready_in_memory = Length}) -> {Length, Bytes}. @@ -811,7 +811,7 @@ usage(Name) when is_atom(Name) -> %%% Internal -messages_ready(#?MODULE{messages = M, +messages_ready(#?STATE{messages = M, prefix_msgs = {RCnt, _R, PCnt, _P}, returns = R}) -> @@ -819,7 +819,7 @@ messages_ready(#?MODULE{messages = M, %% operations so length/1 is fine here maps:size(M) + lqueue:len(R) + RCnt + PCnt. -messages_total(#?MODULE{ra_indexes = I, +messages_total(#?STATE{ra_indexes = I, prefix_msgs = {RCnt, _R, PCnt, _P}}) -> rabbit_fifo_index:size(I) + RCnt + PCnt. @@ -851,23 +851,23 @@ moving_average(Time, HalfLife, Next, Current) -> Weight = math:exp(Time * math:log(0.5) / HalfLife), Next * (1 - Weight) + Current * Weight. -num_checked_out(#?MODULE{consumers = Cons}) -> +num_checked_out(#?STATE{consumers = Cons}) -> maps:fold(fun (_, #consumer{checked_out = C}, Acc) -> maps:size(C) + Acc end, 0, Cons). cancel_consumer(ConsumerId, - #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State, + #?STATE{cfg = #cfg{consumer_strategy = competing}} = State, Effects, Reason) -> cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + #?STATE{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = []} = State, Effects, Reason) -> %% single active consumer on, no consumers are waiting cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = Waiting0} = State0, Effects0, Reason) -> @@ -885,10 +885,10 @@ cancel_consumer(ConsumerId, Effects = cancel_consumer_effects(ConsumerId, State0, Effects0), % A waiting consumer isn't supposed to have any checked out messages, % so nothing special to do here - {State0#?MODULE{waiting_consumers = Waiting}, Effects} + {State0#?STATE{waiting_consumers = Waiting}, Effects} end. -consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, +consumer_update_active_effects(#?STATE{cfg = #cfg{resource = QName}}, ConsumerId, #consumer{meta = Meta}, Active, ActivityStatus, Effects) -> @@ -899,7 +899,7 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]} | Effects]. -cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> +cancel_consumer0(ConsumerId, #?STATE{consumers = C0} = S0, Effects0, Reason) -> case C0 of #{ConsumerId := Consumer} -> {S, Effects2} = maybe_return_all(ConsumerId, Consumer, S0, @@ -909,7 +909,7 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> %% in line with what classic queues do (from an external point of %% view) Effects = cancel_consumer_effects(ConsumerId, S, Effects2), - case maps:size(S#?MODULE.consumers) of + case maps:size(S#?STATE.consumers) of 0 -> {S, [{aux, inactive} | Effects]}; _ -> @@ -920,7 +920,7 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> {S0, Effects0} end. -activate_next_consumer(#?MODULE{consumers = Cons, +activate_next_consumer(#?STATE{consumers = Cons, waiting_consumers = Waiting0} = State0, Effects0) -> case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of @@ -932,11 +932,11 @@ activate_next_consumer(#?MODULE{consumers = Cons, [{NextConsumerId, NextConsumer} | _] -> %% there is a potential next active consumer Remaining = lists:keydelete(NextConsumerId, 1, Waiting0), - #?MODULE{service_queue = ServiceQueue} = State0, + #?STATE{service_queue = ServiceQueue} = State0, ServiceQueue1 = maybe_queue_consumer(NextConsumerId, NextConsumer, ServiceQueue), - State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer}, + State = State0#?STATE{consumers = Cons#{NextConsumerId => NextConsumer}, service_queue = ServiceQueue1, waiting_consumers = Remaining}, Effects = consumer_update_active_effects(State, NextConsumerId, @@ -953,7 +953,7 @@ activate_next_consumer(#?MODULE{consumers = Cons, maybe_return_all(ConsumerId, Consumer, - #?MODULE{consumers = C0, + #?STATE{consumers = C0, service_queue = SQ0} = S0, Effects0, Reason) -> case Reason of @@ -964,11 +964,11 @@ maybe_return_all(ConsumerId, Consumer, credit = 0, status = cancelled}, C0, SQ0, Effects0), - {S0#?MODULE{consumers = Cons, + {S0#?STATE{consumers = Cons, service_queue = SQ}, Effects1}; down -> {S1, Effects1} = return_all(S0, Effects0, ConsumerId, Consumer), - {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers)}, + {S1#?STATE{consumers = maps:remove(ConsumerId, S1#?STATE.consumers)}, Effects1} end. @@ -982,12 +982,12 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> {State, ok, Effects} end. -drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> +drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects0) -> case take_next_msg(State0) of {FullMsg = {_MsgId, {RaftIdxToDrop, {Header, Msg}}}, State1} -> Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0), - State2 = add_bytes_drop(Header, State1#?MODULE{ra_indexes = Indexes}), + State2 = add_bytes_drop(Header, State1#?STATE{ra_indexes = Indexes}), State = case Msg of 'empty' -> State2; _ -> subtract_in_memory_counts(Header, State2) @@ -1005,7 +1005,7 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> {State0, Effects0} end. -enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, +enqueue(RaftIdx, RawMsg, #?STATE{messages = Messages, low_msg_num = LowMsgNum, next_msg_num = NextMsgNum} = State0) -> %% the initial header is an integer only - it will get expanded to a map @@ -1021,20 +1021,20 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, {RaftIdx, {Header, RawMsg}}} % indexed message with header map end, State = add_bytes_enqueue(Header, State1), - State#?MODULE{messages = Messages#{NextMsgNum => Msg}, + State#?STATE{messages = Messages#{NextMsgNum => Msg}, %% this is probably only done to record it when low_msg_num %% is undefined low_msg_num = min(LowMsgNum, NextMsgNum), next_msg_num = NextMsgNum + 1}. append_to_master_index(RaftIdx, - #?MODULE{ra_indexes = Indexes0} = State0) -> + #?STATE{ra_indexes = Indexes0} = State0) -> State = incr_enqueue_count(State0), Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0), - State#?MODULE{ra_indexes = Indexes}. + State#?STATE{ra_indexes = Indexes}. -incr_enqueue_count(#?MODULE{enqueue_count = C, +incr_enqueue_count(#?STATE{enqueue_count = C, cfg = #cfg{release_cursor_interval = {_Base, C}} } = State0) -> %% this will trigger a dehydrated version of the state to be stored @@ -1042,18 +1042,18 @@ incr_enqueue_count(#?MODULE{enqueue_count = C, %% Q: Why don't we just stash the release cursor here? %% A: Because it needs to be the very last thing we do and we %% first needs to run the checkout logic. - State0#?MODULE{enqueue_count = 0}; -incr_enqueue_count(#?MODULE{cfg = #cfg{release_cursor_interval = C} = Cfg} + State0#?STATE{enqueue_count = 0}; +incr_enqueue_count(#?STATE{cfg = #cfg{release_cursor_interval = C} = Cfg} = State0) when is_integer(C) -> %% conversion to new release cursor interval format - State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}}, + State = State0#?STATE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}}, incr_enqueue_count(State); -incr_enqueue_count(#?MODULE{enqueue_count = C} = State) -> - State#?MODULE{enqueue_count = C + 1}. +incr_enqueue_count(#?STATE{enqueue_count = C} = State) -> + State#?STATE{enqueue_count = C + 1}. maybe_store_dehydrated_state(RaftIdx, - #?MODULE{cfg = + #?STATE{cfg = #cfg{release_cursor_interval = {Base, _}} = Cfg, ra_indexes = Indexes, @@ -1072,20 +1072,20 @@ maybe_store_dehydrated_state(RaftIdx, ?RELEASE_CURSOR_EVERY_MAX) end, State = convert_prefix_msgs( - State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = + State0#?STATE{cfg = Cfg#cfg{release_cursor_interval = {Base, Interval}}}), Dehydrated = dehydrate_state(State), Cursor = {release_cursor, RaftIdx, Dehydrated}, Cursors = lqueue:in(Cursor, Cursors0), - State#?MODULE{release_cursors = Cursors} + State#?STATE{release_cursors = Cursors} end; maybe_store_dehydrated_state(RaftIdx, - #?MODULE{cfg = + #?STATE{cfg = #cfg{release_cursor_interval = C} = Cfg} = State0) when is_integer(C) -> %% convert to new format - State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}}, + State = State0#?STATE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}}, maybe_store_dehydrated_state(RaftIdx, State); maybe_store_dehydrated_state(_RaftIdx, State) -> State. @@ -1097,18 +1097,18 @@ enqueue_pending(From, State = enqueue(RaftIdx, RawMsg, State0), Enq = Enq0#enqueuer{next_seqno = Next + 1, pending = Pending}, enqueue_pending(From, Enq, State); -enqueue_pending(From, Enq, #?MODULE{enqueuers = Enqueuers0} = State) -> - State#?MODULE{enqueuers = Enqueuers0#{From => Enq}}. +enqueue_pending(From, Enq, #?STATE{enqueuers = Enqueuers0} = State) -> + State#?STATE{enqueuers = Enqueuers0#{From => Enq}}. maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) -> % direct enqueue without tracking State = enqueue(RaftIdx, RawMsg, State0), {ok, State, Effects}; maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, - #?MODULE{enqueuers = Enqueuers0} = State0) -> + #?STATE{enqueuers = Enqueuers0} = State0) -> case maps:get(From, Enqueuers0, undefined) of undefined -> - State1 = State0#?MODULE{enqueuers = Enqueuers0#{From => #enqueuer{}}}, + State1 = State0#?STATE{enqueuers = Enqueuers0#{From => #enqueuer{}}}, {ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, State1), {ok, State, [{monitor, process, From} | Effects]}; @@ -1124,7 +1124,7 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, % out of order delivery Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0], Enq = Enq0#enqueuer{pending = lists:sort(Pending)}, - {ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq}}, Effects0}; + {ok, State0#?STATE{enqueuers = Enqueuers0#{From => Enq}}, Effects0}; #enqueuer{next_seqno = Next} when MsgSeqNo =< Next -> % duplicate delivery - remove the raft index from the ra_indexes % map as it was added earlier @@ -1135,7 +1135,7 @@ snd(T) -> element(2, T). return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, - Effects0, #?MODULE{service_queue = SQ0} = State0) -> + Effects0, #?STATE{service_queue = SQ0} = State0) -> {State1, Effects1} = maps:fold( fun(MsgId, {Tag, _} = Msg, {S0, E0}) when Tag == '$prefix_msg'; @@ -1146,13 +1146,13 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, ConsumerId) end, {State0, Effects0}, Returned), {State2, Effects3} = - case State1#?MODULE.consumers of + case State1#?STATE.consumers of #{ConsumerId := Con0} = Cons0 -> Con = Con0#consumer{credit = increase_credit(Con0, map_size(Returned))}, {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects1), - {State1#?MODULE{consumers = Cons, + {State1#?STATE{consumers = Cons, service_queue = SQ}, Effects2}; _ -> {State1, Effects1} @@ -1163,7 +1163,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, % used to processes messages that are finished complete(ConsumerId, Discarded, #consumer{checked_out = Checked} = Con0, Effects0, - #?MODULE{consumers = Cons0, service_queue = SQ0, + #?STATE{consumers = Cons0, service_queue = SQ0, ra_indexes = Indexes0} = State0) -> %% TODO optimise use of Discarded map here MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], @@ -1183,7 +1183,7 @@ complete(ConsumerId, Discarded, ({'$empty_msg', Header}, Acc) -> add_bytes_settle(Header, Acc) end, State0, maps:values(Discarded)), - {State1#?MODULE{consumers = Cons, + {State1#?STATE{consumers = Cons, ra_indexes = Indexes, service_queue = SQ}, Effects}. @@ -1209,11 +1209,11 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, update_smallest_raft_index(IncomingRaftIdx, State, Effects). dead_letter_effects(_Reason, _Discarded, - #?MODULE{cfg = #cfg{dead_letter_handler = undefined}}, + #?STATE{cfg = #cfg{dead_letter_handler = undefined}}, Effects) -> Effects; dead_letter_effects(Reason, Discarded, - #?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}}, + #?STATE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}}, Effects) -> RaftIdxs = maps:fold( fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) -> @@ -1237,12 +1237,12 @@ dead_letter_effects(Reason, Discarded, end} | Effects]. cancel_consumer_effects(ConsumerId, - #?MODULE{cfg = #cfg{resource = QName}}, Effects) -> + #?STATE{cfg = #cfg{resource = QName}}, Effects) -> [{mod_call, rabbit_quorum_queue, cancel_consumer_handler, [QName, ConsumerId]} | Effects]. update_smallest_raft_index(IncomingRaftIdx, - #?MODULE{ra_indexes = Indexes, + #?STATE{ra_indexes = Indexes, release_cursors = Cursors0} = State0, Effects) -> case rabbit_fifo_index:size(Indexes) of @@ -1250,18 +1250,18 @@ update_smallest_raft_index(IncomingRaftIdx, % there are no messages on queue anymore and no pending enqueues % we can forward release_cursor all the way until % the last received command, hooray - State = State0#?MODULE{release_cursors = lqueue:new()}, + State = State0#?STATE{release_cursors = lqueue:new()}, {State, ok, Effects ++ [{release_cursor, IncomingRaftIdx, State}]}; _ -> Smallest = rabbit_fifo_index:smallest(Indexes), case find_next_cursor(Smallest, Cursors0) of {empty, Cursors} -> - {State0#?MODULE{release_cursors = Cursors}, + {State0#?STATE{release_cursors = Cursors}, ok, Effects}; {Cursor, Cursors} -> %% we can emit a release cursor we've passed the smallest %% release cursor available. - {State0#?MODULE{release_cursors = Cursors}, ok, + {State0#?STATE{release_cursors = Cursors}, ok, Effects ++ [Cursor]} end end. @@ -1286,7 +1286,7 @@ update_header(Key, UpdateFun, Default, Header) -> return_one(MsgId, 0, {Tag, Header0}, - #?MODULE{returns = Returns, + #?STATE{returns = Returns, consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0, ConsumerId) @@ -1312,12 +1312,12 @@ return_one(MsgId, 0, {Tag, Header0}, end, {add_bytes_return( Header, - State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, + State1#?STATE{consumers = Consumers#{ConsumerId => Con}, returns = lqueue:in(Msg, Returns)}), Effects0} end; return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, - #?MODULE{returns = Returns, + #?STATE{returns = Returns, consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0, ConsumerId) -> @@ -1346,17 +1346,17 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, end, {add_bytes_return( Header, - State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, + State1#?STATE{consumers = Consumers#{ConsumerId => Con}, returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0} end. -return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, +return_all(#?STATE{consumers = Cons} = State0, Effects0, ConsumerId, #consumer{checked_out = Checked0} = Con) -> %% need to sort the list so that we return messages in the order %% they were checked out Checked = lists:sort(maps:to_list(Checked0)), - State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}}, + State = State0#?STATE{consumers = Cons#{ConsumerId => Con}}, lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) -> return_one(MsgId, 0, Msg, S, E, ConsumerId); ({MsgId, {'$empty_msg', _} = Msg}, {S, E}) -> @@ -1403,7 +1403,7 @@ checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) -> {State0, ok, lists:reverse(Effects1)}. evaluate_limit(Result, - #?MODULE{cfg = #cfg{max_length = undefined, + #?STATE{cfg = #cfg{max_length = undefined, max_bytes = undefined}} = State, Effects) -> {State, Result, Effects}; @@ -1418,13 +1418,13 @@ evaluate_limit(Result, State00, Effects0) -> end. evaluate_memory_limit(_Header, - #?MODULE{cfg = #cfg{max_in_memory_length = undefined, + #?STATE{cfg = #cfg{max_in_memory_length = undefined, max_in_memory_bytes = undefined}}) -> false; evaluate_memory_limit(#{size := Size}, State) -> evaluate_memory_limit(Size, State); evaluate_memory_limit(Size, - #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength, + #?STATE{cfg = #cfg{max_in_memory_length = MaxLength, max_in_memory_bytes = MaxBytes}, msg_bytes_in_memory = Bytes, msgs_ready_in_memory = Length}) @@ -1452,18 +1452,18 @@ append_log_effects(Effects0, AccMap) -> %% %% When we return it is always done to the current return queue %% for both prefix messages and current messages -take_next_msg(#?MODULE{prefix_msgs = {R, P}} = State) -> +take_next_msg(#?STATE{prefix_msgs = {R, P}} = State) -> %% conversion - take_next_msg(State#?MODULE{prefix_msgs = {length(R), R, length(P), P}}); -take_next_msg(#?MODULE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem], + take_next_msg(State#?STATE{prefix_msgs = {length(R), R, length(P), P}}); +take_next_msg(#?STATE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem], NumP, P}} = State) -> %% there are prefix returns, these should be served first - {Msg, State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; -take_next_msg(#?MODULE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) -> + {Msg, State#?STATE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; +take_next_msg(#?STATE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) -> %% there are prefix returns, these should be served first {{'$prefix_msg', Header}, - State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; -take_next_msg(#?MODULE{returns = Returns, + State#?STATE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; +take_next_msg(#?STATE{returns = Returns, low_msg_num = Low0, messages = Messages0, prefix_msgs = {NumR, R, NumP, P}} = State) -> @@ -1472,7 +1472,7 @@ take_next_msg(#?MODULE{returns = Returns, case lqueue:peek(Returns) of {value, NextMsg} -> {NextMsg, - State#?MODULE{returns = lqueue:drop(Returns)}}; + State#?STATE{returns = lqueue:drop(Returns)}}; empty when P == [] -> case Low0 of undefined -> @@ -1482,11 +1482,11 @@ take_next_msg(#?MODULE{returns = Returns, case maps:size(Messages) of 0 -> {{Low0, Msg}, - State#?MODULE{messages = Messages, + State#?STATE{messages = Messages, low_msg_num = undefined}}; _ -> {{Low0, Msg}, - State#?MODULE{messages = Messages, + State#?STATE{messages = Messages, low_msg_num = Low0 + 1}} end end; @@ -1496,10 +1496,10 @@ take_next_msg(#?MODULE{returns = Returns, {Header, 'empty'} -> %% There are prefix msgs {{'$empty_msg', Header}, - State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}}; + State#?STATE{prefix_msgs = {NumR, R, NumP-1, Rem}}}; Header -> {{'$prefix_msg', Header}, - State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}} + State#?STATE{prefix_msgs = {NumR, R, NumP-1, Rem}}} end end. @@ -1524,7 +1524,7 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) -> {dequeue, {MsgId, {Header, Msg}}, Ready}}}] end}. -checkout_one(#?MODULE{service_queue = SQ0, +checkout_one(#?STATE{service_queue = SQ0, messages = Messages0, consumers = Cons0} = InitState) -> case queue:peek(SQ0) of @@ -1539,11 +1539,11 @@ checkout_one(#?MODULE{service_queue = SQ0, %% no credit but was still on queue %% can happen when draining %% recurse without consumer on queue - checkout_one(InitState#?MODULE{service_queue = SQ1}); + checkout_one(InitState#?STATE{service_queue = SQ1}); {ok, #consumer{status = cancelled}} -> - checkout_one(InitState#?MODULE{service_queue = SQ1}); + checkout_one(InitState#?STATE{service_queue = SQ1}); {ok, #consumer{status = suspected_down}} -> - checkout_one(InitState#?MODULE{service_queue = SQ1}); + checkout_one(InitState#?STATE{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, next_msg_id = Next, credit = Credit, @@ -1556,7 +1556,7 @@ checkout_one(#?MODULE{service_queue = SQ0, {Cons, SQ, []} = % we expect no effects update_or_remove_sub(ConsumerId, Con, Cons0, SQ1, []), - State1 = State0#?MODULE{service_queue = SQ, + State1 = State0#?STATE{service_queue = SQ, consumers = Cons}, {State, Msg} = case ConsumerMsg of @@ -1579,7 +1579,7 @@ checkout_one(#?MODULE{service_queue = SQ0, {success, ConsumerId, Next, Msg, State}; error -> %% consumer did not exist but was queued, recurse - checkout_one(InitState#?MODULE{service_queue = SQ1}) + checkout_one(InitState#?STATE{service_queue = SQ1}) end; empty -> {nochange, InitState} @@ -1629,27 +1629,27 @@ uniq_queue_in(Key, Queue) -> end. update_consumer(ConsumerId, Meta, Spec, - #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) -> + #?STATE{cfg = #cfg{consumer_strategy = competing}} = State0) -> %% general case, single active consumer off update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, Spec, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}} = State0) when map_size(Cons0) == 0 -> %% single active consumer on, no one is consuming yet update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, + #?STATE{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = WaitingConsumers0} = State0) -> %% single active consumer on and one active consumer already %% adding the new consumer to the waiting list Consumer = #consumer{lifetime = Life, meta = Meta, credit = Credit, credit_mode = Mode}, WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}], - State0#?MODULE{waiting_consumers = WaitingConsumers1}. + State0#?STATE{waiting_consumers = WaitingConsumers1}. update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, - #?MODULE{consumers = Cons0, + #?STATE{consumers = Cons0, service_queue = ServiceQueue0} = State0) -> %% TODO: this logic may not be correct for updating a pre-existing consumer Init = #consumer{lifetime = Life, meta = Meta, @@ -1664,7 +1664,7 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, end, Init, Cons0), ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons), ServiceQueue0), - State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}. + State0#?STATE{consumers = Cons, service_queue = ServiceQueue}. maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, ServiceQueue0) -> @@ -1676,14 +1676,14 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, ServiceQueue0 end. -convert_prefix_msgs(#?MODULE{prefix_msgs = {R, P}} = State) -> - State#?MODULE{prefix_msgs = {length(R), R, length(P), P}}; +convert_prefix_msgs(#?STATE{prefix_msgs = {R, P}} = State) -> + State#?STATE{prefix_msgs = {length(R), R, length(P), P}}; convert_prefix_msgs(State) -> State. %% creates a dehydrated version of the current state to be cached and %% potentially used to for a snaphot at a later point -dehydrate_state(#?MODULE{messages = Messages, +dehydrate_state(#?STATE{messages = Messages, consumers = Consumers, returns = Returns, low_msg_num = Low, @@ -1709,7 +1709,7 @@ dehydrate_state(#?MODULE{messages = Messages, %% recovering from a snapshot PrefMsgs = PrefMsg0 ++ PrefMsgsSuff, Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0], - State#?MODULE{messages = #{}, + State#?STATE{messages = #{}, ra_indexes = rabbit_fifo_index:empty(), release_cursors = lqueue:new(), low_msg_num = undefined, @@ -1746,19 +1746,19 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Con#consumer{checked_out = Checked}. %% make the state suitable for equality comparison -normalize(#?MODULE{release_cursors = Cursors} = State) -> - State#?MODULE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}. +normalize(#?STATE{release_cursors = Cursors} = State) -> + State#?STATE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}. -is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined, +is_over_limit(#?STATE{cfg = #cfg{max_length = undefined, max_bytes = undefined}}) -> false; -is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, +is_over_limit(#?STATE{cfg = #cfg{max_length = MaxLength, max_bytes = MaxBytes}, msg_bytes_enqueue = BytesEnq} = State) -> messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes). -normalize_for_v1(#?MODULE{cfg = Cfg} = State) -> +normalize_for_v1(#?STATE{cfg = Cfg} = State) -> %% run all v0 conversions so that v1 does not have to have this code RCI = case Cfg of #cfg{release_cursor_interval = {_, _} = R} -> @@ -1769,10 +1769,22 @@ normalize_for_v1(#?MODULE{cfg = Cfg} = State) -> {?RELEASE_CURSOR_EVERY, C} end, convert_prefix_msgs( - State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI}}). + State#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCI}}). -messages_map(#?MODULE{messages = Messages}) -> - Messages. +get_field(Field, State) -> + Fields = record_info(fields, ?STATE), + Index = record_index_of(Field, Fields), + element(Index, State). + +record_index_of(F, Fields) -> + index_of(2, F, Fields). + +index_of(_, F, []) -> + exit({field_not_found, F}); +index_of(N, F, [F | _]) -> + N; +index_of(N, F, [_ | T]) -> + index_of(N+1, F, T). -spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol(). make_enqueue(Pid, Seq, Msg) -> @@ -1815,58 +1827,58 @@ make_update_config(Config) -> #update_config{config = Config}. add_bytes_enqueue(Bytes, - #?MODULE{msg_bytes_enqueue = Enqueue} = State) + #?STATE{msg_bytes_enqueue = Enqueue} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}; + State#?STATE{msg_bytes_enqueue = Enqueue + Bytes}; add_bytes_enqueue(#{size := Bytes}, State) -> add_bytes_enqueue(Bytes, State). add_bytes_drop(Bytes, - #?MODULE{msg_bytes_enqueue = Enqueue} = State) + #?STATE{msg_bytes_enqueue = Enqueue} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}; + State#?STATE{msg_bytes_enqueue = Enqueue - Bytes}; add_bytes_drop(#{size := Bytes}, State) -> add_bytes_drop(Bytes, State). add_bytes_checkout(Bytes, - #?MODULE{msg_bytes_checkout = Checkout, + #?STATE{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue } = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_checkout = Checkout + Bytes, + State#?STATE{msg_bytes_checkout = Checkout + Bytes, msg_bytes_enqueue = Enqueue - Bytes}; add_bytes_checkout(#{size := Bytes}, State) -> add_bytes_checkout(Bytes, State). add_bytes_settle(Bytes, - #?MODULE{msg_bytes_checkout = Checkout} = State) + #?STATE{msg_bytes_checkout = Checkout} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_checkout = Checkout - Bytes}; + State#?STATE{msg_bytes_checkout = Checkout - Bytes}; add_bytes_settle(#{size := Bytes}, State) -> add_bytes_settle(Bytes, State). add_bytes_return(Bytes, - #?MODULE{msg_bytes_checkout = Checkout, + #?STATE{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_checkout = Checkout - Bytes, + State#?STATE{msg_bytes_checkout = Checkout - Bytes, msg_bytes_enqueue = Enqueue + Bytes}; add_bytes_return(#{size := Bytes}, State) -> add_bytes_return(Bytes, State). add_in_memory_counts(Bytes, - #?MODULE{msg_bytes_in_memory = InMemoryBytes, + #?STATE{msg_bytes_in_memory = InMemoryBytes, msgs_ready_in_memory = InMemoryCount} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes, + State#?STATE{msg_bytes_in_memory = InMemoryBytes + Bytes, msgs_ready_in_memory = InMemoryCount + 1}; add_in_memory_counts(#{size := Bytes}, State) -> add_in_memory_counts(Bytes, State). subtract_in_memory_counts(Bytes, - #?MODULE{msg_bytes_in_memory = InMemoryBytes, + #?STATE{msg_bytes_in_memory = InMemoryBytes, msgs_ready_in_memory = InMemoryCount} = State) when is_integer(Bytes) -> - State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes, + State#?STATE{msg_bytes_in_memory = InMemoryBytes - Bytes, msgs_ready_in_memory = InMemoryCount - 1}; subtract_in_memory_counts(#{size := Bytes}, State) -> subtract_in_memory_counts(Bytes, State). @@ -1890,7 +1902,7 @@ get_size_from_header(#{size := B}) -> B. -all_nodes(#?MODULE{consumers = Cons0, +all_nodes(#?STATE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> Nodes0 = maps:fold(fun({_, P}, _, Acc) -> @@ -1904,7 +1916,7 @@ all_nodes(#?MODULE{consumers = Cons0, Acc#{node(P) => ok} end, Nodes1, WaitingConsumers0)). -all_pids_for(Node, #?MODULE{consumers = Cons0, +all_pids_for(Node, #?STATE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> Cons = maps:fold(fun({_, P}, _, Acc) @@ -1923,7 +1935,7 @@ all_pids_for(Node, #?MODULE{consumers = Cons0, (_, Acc) -> Acc end, Enqs, WaitingConsumers0). -suspected_pids_for(Node, #?MODULE{consumers = Cons0, +suspected_pids_for(Node, #?STATE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc) diff --git a/src/rabbit_fifo_v0.hrl b/src/rabbit_fifo_v0.hrl index 26a988ee10..333ccb4d77 100644 --- a/src/rabbit_fifo_v0.hrl +++ b/src/rabbit_fifo_v0.hrl @@ -75,7 +75,7 @@ -define(GC_MEM_LIMIT_B, 2000000). -define(MB, 1048576). --define(RABBIT_FIFO, rabbit_fifo_v0). +-define(STATE, rabbit_fifo). -record(consumer, {meta = #{} :: consumer_meta(), @@ -130,7 +130,7 @@ {non_neg_integer(), list(), non_neg_integer(), list()}. --record(?RABBIT_FIFO, +-record(?STATE, {cfg :: #cfg{}, % unassigned messages messages = #{} :: #{msg_in_id() => indexed_msg()}, @@ -156,7 +156,7 @@ % for normal appending operations as it's backed by a map ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(), release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor, - ra:index(), #?RABBIT_FIFO{}}), + ra:index(), #?STATE{}}), % consumers need to reflect consumer state at time of snapshot % needs to be part of snapshot consumers = #{} :: #{consumer_id() => #consumer{}}, diff --git a/test/rabbit_fifo_v0_SUITE.erl b/test/rabbit_fifo_v0_SUITE.erl index 6b84911d7f..fcb84377de 100644 --- a/test/rabbit_fifo_v0_SUITE.erl +++ b/test/rabbit_fifo_v0_SUITE.erl @@ -128,12 +128,12 @@ credit_with_drained_test(_) -> {State1, _, _} = apply(meta(1), rabbit_fifo_v0:make_checkout(Cid, {auto, 1, credited},#{}), State0), - ?assertMatch(#?RABBIT_FIFO{consumers = #{Cid := #consumer{credit = 1, + ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 1, delivery_count = 0}}}, State1), {State, Result, _} = apply(meta(3), rabbit_fifo_v0:make_credit(Cid, 0, 5, true), State1), - ?assertMatch(#?RABBIT_FIFO{consumers = #{Cid := #consumer{credit = 0, + ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 0, delivery_count = 5}}}, State), ?assertEqual({multi, [{send_credit_reply, 0}, @@ -154,7 +154,7 @@ credit_and_drain_test(_) -> {State4, {multi, [{send_credit_reply, 0}, {send_drained, {?FUNCTION_NAME, 2}}]}, Effects} = apply(meta(4), rabbit_fifo_v0:make_credit(Cid, 4, 0, true), State3), - ?assertMatch(#?RABBIT_FIFO{consumers = #{Cid := #consumer{credit = 0, + ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 0, delivery_count = 4}}}, State4), @@ -290,9 +290,9 @@ return_test(_) -> {State2, _} = check_auto(Cid2, 3, State1), {State3, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid, [0]), State2), ?assertMatch(#{Cid := #consumer{checked_out = C}} when map_size(C) == 0, - State3#?RABBIT_FIFO.consumers), + State3#?STATE.consumers), ?assertMatch(#{Cid2 := #consumer{checked_out = C2}} when map_size(C2) == 1, - State3#?RABBIT_FIFO.consumers), + State3#?STATE.consumers), ok. return_dequeue_delivery_limit_test(_) -> @@ -350,7 +350,7 @@ return_checked_out_limit_test(_) -> {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, _}, {aux, active}]} = apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1), - {#?RABBIT_FIFO{ra_indexes = RaIdxs}, ok, [_ReleaseEff]} = + {#?STATE{ra_indexes = RaIdxs}, ok, [_ReleaseEff]} = apply(meta(4), rabbit_fifo_v0:make_return(Cid, [MsgId2]), State2), ?assertEqual(0, rabbit_fifo_index:size(RaIdxs)), ok. @@ -380,8 +380,8 @@ cancelled_checkout_out_test(_) -> {State1, _} = check_auto(Cid, 2, State0), % cancelled checkout should not return pending messages to queue {State2, _, _} = apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, cancel, #{}), State1), - ?assertEqual(1, maps:size(State2#?RABBIT_FIFO.messages)), - ?assertEqual(0, lqueue:len(State2#?RABBIT_FIFO.returns)), + ?assertEqual(1, maps:size(State2#?STATE.messages)), + ?assertEqual(0, lqueue:len(State2#?STATE.returns)), {State3, {dequeue, empty}} = apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State2), @@ -410,14 +410,14 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) -> {State0, Effects0} = enq(1, 1, second, test_init(test)), ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0), {State1, Effects1} = check_auto(Cid, 2, State0), - #consumer{credit = 0} = maps:get(Cid, State1#?RABBIT_FIFO.consumers), + #consumer{credit = 0} = maps:get(Cid, State1#?STATE.consumers), ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1), % monitor both enqueuer and consumer % because we received a noconnection we now need to monitor the node {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), #consumer{credit = 1, checked_out = Ch, - status = suspected_down} = maps:get(Cid, State2a#?RABBIT_FIFO.consumers), + status = suspected_down} = maps:get(Cid, State2a#?STATE.consumers), ?assertEqual(#{}, Ch), %% validate consumer has credit {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a), @@ -426,7 +426,7 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) -> % when the node comes up we need to retry the process monitors for the % disconnected processes {State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2), - #consumer{status = up} = maps:get(Cid, State3#?RABBIT_FIFO.consumers), + #consumer{status = up} = maps:get(Cid, State3#?STATE.consumers), % try to re-monitor the suspect processes ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3), ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), @@ -436,18 +436,18 @@ down_with_noconnection_returns_unack_test(_) -> Pid = spawn(fun() -> ok end), Cid = {<<"down_with_noconnect">>, Pid}, {State0, _} = enq(1, 1, second, test_init(test)), - ?assertEqual(1, maps:size(State0#?RABBIT_FIFO.messages)), - ?assertEqual(0, lqueue:len(State0#?RABBIT_FIFO.returns)), + ?assertEqual(1, maps:size(State0#?STATE.messages)), + ?assertEqual(0, lqueue:len(State0#?STATE.returns)), {State1, {_, _}} = deq(2, Cid, unsettled, State0), - ?assertEqual(0, maps:size(State1#?RABBIT_FIFO.messages)), - ?assertEqual(0, lqueue:len(State1#?RABBIT_FIFO.returns)), + ?assertEqual(0, maps:size(State1#?STATE.messages)), + ?assertEqual(0, lqueue:len(State1#?STATE.returns)), {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), - ?assertEqual(0, maps:size(State2a#?RABBIT_FIFO.messages)), - ?assertEqual(1, lqueue:len(State2a#?RABBIT_FIFO.returns)), + ?assertEqual(0, maps:size(State2a#?STATE.messages)), + ?assertEqual(1, lqueue:len(State2a#?STATE.returns)), ?assertMatch(#consumer{checked_out = Ch, status = suspected_down} when map_size(Ch) == 0, - maps:get(Cid, State2a#?RABBIT_FIFO.consumers)), + maps:get(Cid, State2a#?STATE.consumers)), ok. down_with_noproc_enqueuer_is_cleaned_up_test(_) -> @@ -457,7 +457,7 @@ down_with_noproc_enqueuer_is_cleaned_up_test(_) -> ?ASSERT_EFF({monitor, process, _}, Effects0), {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0), % ensure there are no enqueuers - ?assert(0 =:= maps:size(State1#?RABBIT_FIFO.enqueuers)), + ?assert(0 =:= maps:size(State1#?STATE.enqueuers)), ok. discarded_message_without_dead_letter_handler_is_removed_test(_) -> @@ -536,7 +536,7 @@ pending_enqueue_is_enqueued_on_down_test(_) -> duplicate_delivery_test(_) -> {State0, _} = enq(1, 1, first, test_init(test)), - {#?RABBIT_FIFO{ra_indexes = RaIdxs, + {#?STATE{ra_indexes = RaIdxs, messages = Messages}, _} = enq(2, 1, first, State0), ?assertEqual(1, rabbit_fifo_index:size(RaIdxs)), ?assertEqual(1, maps:size(Messages)), @@ -605,13 +605,13 @@ purge_with_checkout_test(_) -> {State1, _} = enq(2, 1, <<"first">>, State0), {State2, _} = enq(3, 2, <<"second">>, State1), %% assert message bytes are non zero - ?assert(State2#?RABBIT_FIFO.msg_bytes_checkout > 0), - ?assert(State2#?RABBIT_FIFO.msg_bytes_enqueue > 0), + ?assert(State2#?STATE.msg_bytes_checkout > 0), + ?assert(State2#?STATE.msg_bytes_enqueue > 0), {State3, {purge, 1}, _} = apply(meta(2), rabbit_fifo_v0:make_purge(), State2), - ?assert(State2#?RABBIT_FIFO.msg_bytes_checkout > 0), - ?assertEqual(0, State3#?RABBIT_FIFO.msg_bytes_enqueue), - ?assertEqual(1, rabbit_fifo_index:size(State3#?RABBIT_FIFO.ra_indexes)), - #consumer{checked_out = Checked} = maps:get(Cid, State3#?RABBIT_FIFO.consumers), + ?assert(State2#?STATE.msg_bytes_checkout > 0), + ?assertEqual(0, State3#?STATE.msg_bytes_enqueue), + ?assertEqual(1, rabbit_fifo_index:size(State3#?STATE.ra_indexes)), + #consumer{checked_out = Checked} = maps:get(Cid, State3#?STATE.consumers), ?assertEqual(1, maps:size(Checked)), ok. @@ -622,16 +622,16 @@ down_noproc_returns_checked_out_in_order_test(_) -> {FS, _} = enq(Num, Num, Num, FS0), FS end, S0, lists:seq(1, 100)), - ?assertEqual(100, maps:size(S1#?RABBIT_FIFO.messages)), + ?assertEqual(100, maps:size(S1#?STATE.messages)), Cid = {<<"cid">>, self()}, {S2, _} = check(Cid, 101, 1000, S1), - #consumer{checked_out = Checked} = maps:get(Cid, S2#?RABBIT_FIFO.consumers), + #consumer{checked_out = Checked} = maps:get(Cid, S2#?STATE.consumers), ?assertEqual(100, maps:size(Checked)), %% simulate down {S, _, _} = apply(meta(102), {down, self(), noproc}, S2), - Returns = lqueue:to_list(S#?RABBIT_FIFO.returns), + Returns = lqueue:to_list(S#?STATE.returns), ?assertEqual(100, length(Returns)), - ?assertEqual(0, maps:size(S#?RABBIT_FIFO.consumers)), + ?assertEqual(0, maps:size(S#?STATE.consumers)), %% validate returns are in order ?assertEqual(lists:sort(Returns), Returns), ok. @@ -643,18 +643,18 @@ down_noconnection_returns_checked_out_test(_) -> {FS, _} = enq(Num, Num, Num, FS0), FS end, S0, lists:seq(1, NumMsgs)), - ?assertEqual(NumMsgs, maps:size(S1#?RABBIT_FIFO.messages)), + ?assertEqual(NumMsgs, maps:size(S1#?STATE.messages)), Cid = {<<"cid">>, self()}, {S2, _} = check(Cid, 101, 1000, S1), - #consumer{checked_out = Checked} = maps:get(Cid, S2#?RABBIT_FIFO.consumers), + #consumer{checked_out = Checked} = maps:get(Cid, S2#?STATE.consumers), ?assertEqual(NumMsgs, maps:size(Checked)), %% simulate down {S, _, _} = apply(meta(102), {down, self(), noconnection}, S2), - Returns = lqueue:to_list(S#?RABBIT_FIFO.returns), + Returns = lqueue:to_list(S#?STATE.returns), ?assertEqual(NumMsgs, length(Returns)), ?assertMatch(#consumer{checked_out = Ch} when map_size(Ch) == 0, - maps:get(Cid, S#?RABBIT_FIFO.consumers)), + maps:get(Cid, S#?STATE.consumers)), %% validate returns are in order ?assertEqual(lists:sort(Returns), Returns), ok. @@ -666,8 +666,8 @@ single_active_consumer_basic_get_test(_) -> atom_to_binary(?FUNCTION_NAME, utf8)), release_cursor_interval => 0, single_active_consumer_on => true}), - ?assertEqual(single_active, State0#?RABBIT_FIFO.cfg#cfg.consumer_strategy), - ?assertEqual(0, map_size(State0#?RABBIT_FIFO.consumers)), + ?assertEqual(single_active, State0#?STATE.cfg#cfg.consumer_strategy), + ?assertEqual(0, map_size(State0#?STATE.consumers)), {State1, _} = enq(1, 1, first, State0), {_State, {error, unsupported}} = apply(meta(2), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), @@ -680,8 +680,8 @@ single_active_consumer_test(_) -> atom_to_binary(?FUNCTION_NAME, utf8)), release_cursor_interval => 0, single_active_consumer_on => true}), - ?assertEqual(single_active, State0#?RABBIT_FIFO.cfg#cfg.consumer_strategy), - ?assertEqual(0, map_size(State0#?RABBIT_FIFO.consumers)), + ?assertEqual(single_active, State0#?STATE.cfg#cfg.consumer_strategy), + ?assertEqual(0, map_size(State0#?STATE.consumers)), % adding some consumers AddConsumer = fun(CTag, State) -> @@ -701,24 +701,24 @@ single_active_consumer_test(_) -> C4 = {<<"ctag4">>, self()}, % the first registered consumer is the active one, the others are waiting - ?assertEqual(1, map_size(State1#?RABBIT_FIFO.consumers)), - ?assertMatch(#{C1 := _}, State1#?RABBIT_FIFO.consumers), - ?assertEqual(3, length(State1#?RABBIT_FIFO.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind(C2, 1, State1#?RABBIT_FIFO.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind(C3, 1, State1#?RABBIT_FIFO.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind(C4, 1, State1#?RABBIT_FIFO.waiting_consumers)), + ?assertEqual(1, map_size(State1#?STATE.consumers)), + ?assertMatch(#{C1 := _}, State1#?STATE.consumers), + ?assertEqual(3, length(State1#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C2, 1, State1#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C3, 1, State1#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C4, 1, State1#?STATE.waiting_consumers)), % cancelling a waiting consumer {State2, _, Effects1} = apply(meta(2), make_checkout(C3, cancel, #{}), State1), % the active consumer should still be in place - ?assertEqual(1, map_size(State2#?RABBIT_FIFO.consumers)), - ?assertMatch(#{C1 := _}, State2#?RABBIT_FIFO.consumers), + ?assertEqual(1, map_size(State2#?STATE.consumers)), + ?assertMatch(#{C1 := _}, State2#?STATE.consumers), % the cancelled consumer has been removed from waiting consumers - ?assertEqual(2, length(State2#?RABBIT_FIFO.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind(C2, 1, State2#?RABBIT_FIFO.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind(C4, 1, State2#?RABBIT_FIFO.waiting_consumers)), + ?assertEqual(2, length(State2#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C2, 1, State2#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C4, 1, State2#?STATE.waiting_consumers)), % there are some effects to unregister the consumer ?ASSERT_EFF({mod_call, rabbit_quorum_queue, cancel_consumer_handler, [_, C]}, C == C3, Effects1), @@ -728,12 +728,12 @@ single_active_consumer_test(_) -> make_checkout(C1, cancel, #{}), State2), % the second registered consumer is now the active one - ?assertEqual(1, map_size(State3#?RABBIT_FIFO.consumers)), - ?assertMatch(#{C2 := _}, State3#?RABBIT_FIFO.consumers), + ?assertEqual(1, map_size(State3#?STATE.consumers)), + ?assertMatch(#{C2 := _}, State3#?STATE.consumers), % the new active consumer is no longer in the waiting list - ?assertEqual(1, length(State3#?RABBIT_FIFO.waiting_consumers)), + ?assertEqual(1, length(State3#?STATE.waiting_consumers)), ?assertNotEqual(false, lists:keyfind(C4, 1, - State3#?RABBIT_FIFO.waiting_consumers)), + State3#?STATE.waiting_consumers)), %% should have a cancel consumer handler mod_call effect and %% an active new consumer effect ?ASSERT_EFF({mod_call, rabbit_quorum_queue, @@ -746,10 +746,10 @@ single_active_consumer_test(_) -> make_checkout(C2, cancel, #{}), State3), % the last waiting consumer became the active one - ?assertEqual(1, map_size(State4#?RABBIT_FIFO.consumers)), - ?assertMatch(#{C4 := _}, State4#?RABBIT_FIFO.consumers), + ?assertEqual(1, map_size(State4#?STATE.consumers)), + ?assertMatch(#{C4 := _}, State4#?STATE.consumers), % the waiting consumer list is now empty - ?assertEqual(0, length(State4#?RABBIT_FIFO.waiting_consumers)), + ?assertEqual(0, length(State4#?STATE.waiting_consumers)), % there are some effects to unregister the consumer and % to update the new active one (metrics) ?ASSERT_EFF({mod_call, rabbit_quorum_queue, @@ -762,9 +762,9 @@ single_active_consumer_test(_) -> make_checkout(C4, cancel, #{}), State4), % no active consumer anymore - ?assertEqual(0, map_size(State5#?RABBIT_FIFO.consumers)), + ?assertEqual(0, map_size(State5#?STATE.consumers)), % still nothing in the waiting list - ?assertEqual(0, length(State5#?RABBIT_FIFO.waiting_consumers)), + ?assertEqual(0, length(State5#?STATE.waiting_consumers)), % there is an effect to unregister the consumer + queue inactive effect ?ASSERT_EFF({mod_call, rabbit_quorum_queue, cancel_consumer_handler, _}, Effects4), @@ -799,9 +799,9 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> % the channel of the active consumer goes down {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1), % fell back to another consumer - ?assertEqual(1, map_size(State2#?RABBIT_FIFO.consumers)), + ?assertEqual(1, map_size(State2#?STATE.consumers)), % there are still waiting consumers - ?assertEqual(2, length(State2#?RABBIT_FIFO.waiting_consumers)), + ?assertEqual(2, length(State2#?STATE.waiting_consumers)), % effects to unregister the consumer and % to update the new active one (metrics) are there ?ASSERT_EFF({mod_call, rabbit_quorum_queue, @@ -812,9 +812,9 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> % the channel of the active consumer and a waiting consumer goes down {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2), % fell back to another consumer - ?assertEqual(1, map_size(State3#?RABBIT_FIFO.consumers)), + ?assertEqual(1, map_size(State3#?STATE.consumers)), % no more waiting consumer - ?assertEqual(0, length(State3#?RABBIT_FIFO.waiting_consumers)), + ?assertEqual(0, length(State3#?STATE.waiting_consumers)), % effects to cancel both consumers of this channel + effect to update the new active one (metrics) ?ASSERT_EFF({mod_call, rabbit_quorum_queue, cancel_consumer_handler, [_, C]}, C == C2, Effects2), @@ -826,8 +826,8 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> % the last channel goes down {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3), % no more consumers - ?assertEqual(0, map_size(State4#?RABBIT_FIFO.consumers)), - ?assertEqual(0, length(State4#?RABBIT_FIFO.waiting_consumers)), + ?assertEqual(0, map_size(State4#?STATE.consumers)), + ?assertEqual(0, length(State4#?STATE.waiting_consumers)), % there is an effect to unregister the consumer + queue inactive effect ?ASSERT_EFF({mod_call, rabbit_quorum_queue, cancel_consumer_handler, [_, C]}, C == C4, Effects3), @@ -862,10 +862,10 @@ single_active_returns_messages_on_noconnection_test(_) -> % simulate node goes down {State3, _, _} = apply(meta(5), {down, DownPid, noconnection}, State2), %% assert the consumer is up - ?assertMatch([_], lqueue:to_list(State3#?RABBIT_FIFO.returns)), + ?assertMatch([_], lqueue:to_list(State3#?STATE.returns)), ?assertMatch([{_, #consumer{checked_out = Checked}}] when map_size(Checked) == 0, - State3#?RABBIT_FIFO.waiting_consumers), + State3#?STATE.waiting_consumers), ok. @@ -896,7 +896,7 @@ single_active_consumer_replaces_consumer_when_down_noconnection_test(_) -> %% assert the consumer is up ?assertMatch(#{C1 := #consumer{status = up}}, - State1a#?RABBIT_FIFO.consumers), + State1a#?STATE.consumers), {State1, _} = enq(10, 1, msg, State1a), @@ -907,24 +907,24 @@ single_active_consumer_replaces_consumer_when_down_noconnection_test(_) -> ?assertMatch([{C2, #consumer{status = up, checked_out = Ch}}] when map_size(Ch) == 1, - maps:to_list(State2#?RABBIT_FIFO.consumers)), + maps:to_list(State2#?STATE.consumers)), %% the disconnected consumer has been returned to waiting ?assert(lists:any(fun ({C,_}) -> C =:= C1 end, - State2#?RABBIT_FIFO.waiting_consumers)), - ?assertEqual(2, length(State2#?RABBIT_FIFO.waiting_consumers)), + State2#?STATE.waiting_consumers)), + ?assertEqual(2, length(State2#?STATE.waiting_consumers)), % simulate node comes back up {State3, _, _} = apply(#{index => 2}, {nodeup, node(DownPid)}, State2), %% the consumer is still active and the same as before ?assertMatch([{C2, #consumer{status = up}}], - maps:to_list(State3#?RABBIT_FIFO.consumers)), + maps:to_list(State3#?STATE.consumers)), % the waiting consumers should be un-suspected - ?assertEqual(2, length(State3#?RABBIT_FIFO.waiting_consumers)), + ?assertEqual(2, length(State3#?STATE.waiting_consumers)), lists:foreach(fun({_, #consumer{status = Status}}) -> ?assert(Status /= suspected_down) - end, State3#?RABBIT_FIFO.waiting_consumers), + end, State3#?STATE.waiting_consumers), ok. single_active_consumer_all_disconnected_test(_) -> @@ -953,22 +953,22 @@ single_active_consumer_all_disconnected_test(_) -> end, State0, ConsumerIds), %% assert the consumer is up - ?assertMatch(#{C1 := #consumer{status = up}}, State1#?RABBIT_FIFO.consumers), + ?assertMatch(#{C1 := #consumer{status = up}}, State1#?STATE.consumers), % simulate node goes down {State2, _, _} = apply(meta(5), {down, C1Pid, noconnection}, State1), %% assert the consumer fails over to the consumer on n2 - ?assertMatch(#{C2 := #consumer{status = up}}, State2#?RABBIT_FIFO.consumers), + ?assertMatch(#{C2 := #consumer{status = up}}, State2#?STATE.consumers), {State3, _, _} = apply(meta(6), {down, C2Pid, noconnection}, State2), %% assert these no active consumer after both nodes are maked as down - ?assertMatch([], maps:to_list(State3#?RABBIT_FIFO.consumers)), + ?assertMatch([], maps:to_list(State3#?STATE.consumers)), %% n2 comes back {State4, _, _} = apply(meta(7), {nodeup, node(C2Pid)}, State3), %% ensure n2 is the active consumer as this node as been registered %% as up again ?assertMatch([{{<<"ctag_n2">>, _}, #consumer{status = up, credit = 1}}], - maps:to_list(State4#?RABBIT_FIFO.consumers)), + maps:to_list(State4#?STATE.consumers)), ok. single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) -> @@ -1050,11 +1050,11 @@ query_consumers_test(_) -> NewState end, State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), - Consumers0 = State1#?RABBIT_FIFO.consumers, + Consumers0 = State1#?STATE.consumers, Consumer = maps:get({<<"ctag2">>, self()}, Consumers0), Consumers1 = maps:put({<<"ctag2">>, self()}, Consumer#consumer{status = suspected_down}, Consumers0), - State2 = State1#?RABBIT_FIFO{consumers = Consumers1}, + State2 = State1#?STATE{consumers = Consumers1}, ?assertEqual(4, rabbit_fifo_v0:query_consumer_count(State2)), Consumers2 = rabbit_fifo_v0:query_consumers(State2), @@ -1203,13 +1203,13 @@ single_active_cancelled_with_unacked_test(_) -> %% C2 should be the active consumer ?assertMatch(#{C2 := #consumer{status = up, checked_out = #{0 := _}}}, - State4#?RABBIT_FIFO.consumers), + State4#?STATE.consumers), %% C1 should be a cancelled consumer ?assertMatch(#{C1 := #consumer{status = cancelled, lifetime = once, checked_out = #{0 := _}}}, - State4#?RABBIT_FIFO.consumers), - ?assertMatch([], State4#?RABBIT_FIFO.waiting_consumers), + State4#?STATE.consumers), + ?assertMatch([], State4#?STATE.waiting_consumers), %% Ack both messages {State5, _Effects5} = settle(C1, 1, 0, State4), @@ -1218,11 +1218,11 @@ single_active_cancelled_with_unacked_test(_) -> %% C2 should remain ?assertMatch(#{C2 := #consumer{status = up}}, - State6#?RABBIT_FIFO.consumers), + State6#?STATE.consumers), %% C1 should be gone ?assertNotMatch(#{C1 := _}, - State6#?RABBIT_FIFO.consumers), - ?assertMatch([], State6#?RABBIT_FIFO.waiting_consumers), + State6#?STATE.consumers), + ?assertMatch([], State6#?STATE.waiting_consumers), ok. single_active_with_credited_test(_) -> @@ -1253,9 +1253,9 @@ single_active_with_credited_test(_) -> {State3, _} = apply(meta(4), C2Cred, State2), %% both consumers should have credit ?assertMatch(#{C1 := #consumer{credit = 5}}, - State3#?RABBIT_FIFO.consumers), + State3#?STATE.consumers), ?assertMatch([{C2, #consumer{credit = 4}}], - State3#?RABBIT_FIFO.waiting_consumers), + State3#?STATE.waiting_consumers), ok. purge_nodes_test(_) -> @@ -1292,11 +1292,8 @@ purge_nodes_test(_) -> State4), %% assert there are no enqueuers nor consumers - ?assertMatch(#?RABBIT_FIFO{enqueuers = Enqs} when map_size(Enqs) == 1, - State), - - ?assertMatch(#?RABBIT_FIFO{consumers = Cons} when map_size(Cons) == 0, - State), + ?assertMatch(#?STATE{enqueuers = Enqs} when map_size(Enqs) == 1, State), + ?assertMatch(#?STATE{consumers = Cons} when map_size(Cons) == 0, State), ?assertMatch( [{mod_call, rabbit_quorum_queue, handle_tick, [#resource{}, _Metrics, |
