diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-02-25 12:02:25 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-25 12:02:25 +0000 |
| commit | fecd996a4b2d0b79fbe2f870ed48f7299a59b56e (patch) | |
| tree | ab64900e7960ec47cad422a0b6dd87f3417bfb07 | |
| parent | 20bc8c429b6a3e1056e73056afec2bba5299743e (diff) | |
| download | rabbitmq-server-git-fecd996a4b2d0b79fbe2f870ed48f7299a59b56e.tar.gz | |
Formatting
| -rw-r--r-- | src/rabbit_fifo.erl | 206 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 2 |
2 files changed, 104 insertions, 104 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 36341259be..863375671b 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -138,12 +138,12 @@ update_config(Conf, State) -> end, Cfg = State#?MODULE.cfg, State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI, - dead_letter_handler = DLH, - become_leader_handler = BLH, - max_length = MaxLength, - max_bytes = MaxBytes, - consumer_strategy = ConsumerStrategy, - delivery_limit = DeliveryLimit}}. + dead_letter_handler = DLH, + become_leader_handler = BLH, + max_length = MaxLength, + max_bytes = MaxBytes, + consumer_strategy = ConsumerStrategy, + delivery_limit = DeliveryLimit}}. zero(_) -> 0. @@ -206,7 +206,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, Cons = maps:put(ConsumerId, Con1, Cons0), {State1, ok, Effects} = checkout(Meta, State0#?MODULE{service_queue = ServiceQueue, - consumers = Cons}, []), + consumers = Cons}, []), Response = {send_credit_reply, maps:size(State1#?MODULE.messages)}, %% by this point all checkouts for the updated credit value %% should be processed so we can evaluate the drain @@ -274,8 +274,8 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, checkout(Meta, State1, [{monitor, process, Pid}]); apply(#{index := RaftIdx}, #purge{}, #?MODULE{ra_indexes = Indexes0, - returns = Returns, - messages = Messages} = State0) -> + returns = Returns, + messages = Messages} = State0) -> Total = messages_ready(State0), Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, [I || {I, _} <- lists:sort(maps:values(Messages))]), @@ -284,11 +284,11 @@ apply(#{index := RaftIdx}, #purge{}, {State, _, Effects} = update_smallest_raft_index(RaftIdx, State0#?MODULE{ra_indexes = Indexes, - messages = #{}, - returns = lqueue:new(), - msg_bytes_enqueue = 0, - prefix_msgs = {[], []}, - low_msg_num = undefined}, + messages = #{}, + returns = lqueue:new(), + msg_bytes_enqueue = 0, + prefix_msgs = {[], []}, + low_msg_num = undefined}, []), %% as we're not checking out after a purge (no point) we have to %% reverse the effects ourselves @@ -296,7 +296,7 @@ apply(#{index := RaftIdx}, #purge{}, lists:reverse([garbage_collection | Effects])}; apply(_, {down, ConsumerPid, noconnection}, #?MODULE{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + enqueuers = Enqs0} = State0) -> Node = node(ConsumerPid), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), % mark all consumers and enqueuers as suspected down @@ -335,9 +335,9 @@ apply(_, {down, ConsumerPid, noconnection}, end ++ Effects1, %% TODO: should we run a checkout here? {State#?MODULE{consumers = Cons, enqueuers = Enqs, - waiting_consumers = WaitingConsumers}, ok, Effects2}; + waiting_consumers = WaitingConsumers}, ok, Effects2}; apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid State1 = case maps:take(Pid, Enqs0) of @@ -358,8 +358,8 @@ apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0, end, {State2, Effects1}, DownConsumers), checkout(Meta, State, Effects); apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, - enqueuers = Enqs0, - service_queue = SQ0} = State0) -> + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -389,8 +389,8 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, end, {Cons0, SQ0, Monitors}, Cons0), checkout(Meta, State0#?MODULE{consumers = Cons1, enqueuers = Enqs1, - service_queue = SQ, - waiting_consumers = WaitingConsumers}, Effects); + service_queue = SQ, + waiting_consumers = WaitingConsumers}, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; apply(Meta, #update_config{config = Conf}, State) -> @@ -411,11 +411,11 @@ handle_waiting_consumer_down(_Pid, {[], State}; handle_waiting_consumer_down(_Pid, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = []} = State) -> + waiting_consumers = []} = State) -> {[], State}; handle_waiting_consumer_down(Pid, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = WaitingConsumers0} = State0) -> + waiting_consumers = WaitingConsumers0} = State0) -> % get cancel effects for down waiting consumers Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end, WaitingConsumers0), @@ -434,12 +434,12 @@ update_waiting_consumer_status(_Node, #?MODULE{cfg = #cfg{consumer_strategy = de []; update_waiting_consumer_status(_Node, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = []}, + waiting_consumers = []}, _Status) -> []; update_waiting_consumer_status(Node, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = WaitingConsumers}, + waiting_consumers = WaitingConsumers}, Status) -> [begin case node(P) of @@ -453,12 +453,12 @@ update_waiting_consumer_status(Node, -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #?MODULE{consumers = Cons, - enqueuers = Enqs, - waiting_consumers = WaitingConsumers, - cfg = #cfg{name = Name, - become_leader_handler = BLH}, - prefix_msgs = {[], []} - }) -> + enqueuers = Enqs, + waiting_consumers = WaitingConsumers, + cfg = #cfg{name = Name, + become_leader_handler = BLH}, + prefix_msgs = {[], []} + }) -> % return effects to monitor all current consumers and enqueuers Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)] @@ -478,8 +478,8 @@ state_enter(recovered, #?MODULE{prefix_msgs = PrefixMsgCounts}) %% TODO: remove assertion? exit({rabbit_fifo, unexpected_prefix_msgs, PrefixMsgCounts}); state_enter(eol, #?MODULE{enqueuers = Enqs, - consumers = Custs0, - waiting_consumers = WaitingConsumers0}) -> + consumers = Custs0, + waiting_consumers = WaitingConsumers0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end, #{}, WaitingConsumers0), @@ -493,9 +493,9 @@ state_enter(_, _) -> -spec tick(non_neg_integer(), state()) -> ra_machine:effects(). tick(_Ts, #?MODULE{cfg = #cfg{name = Name, - resource = QName}, - msg_bytes_enqueue = EnqueueBytes, - msg_bytes_checkout = CheckoutBytes} = State) -> + resource = QName}, + msg_bytes_enqueue = EnqueueBytes, + msg_bytes_checkout = CheckoutBytes} = State) -> Metrics = {Name, messages_ready(State), num_checked_out(State), % checked out @@ -508,10 +508,10 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name, -spec overview(state()) -> map(). overview(#?MODULE{consumers = Cons, - enqueuers = Enqs, - release_cursors = Cursors, - msg_bytes_enqueue = EnqueueBytes, - msg_bytes_checkout = CheckoutBytes} = State) -> + enqueuers = Enqs, + release_cursors = Cursors, + msg_bytes_enqueue = EnqueueBytes, + msg_bytes_checkout = CheckoutBytes} = State) -> #{type => ?MODULE, num_consumers => maps:size(Cons), num_checked_out => num_checked_out(State), @@ -575,12 +575,12 @@ query_ra_indexes(#?MODULE{ra_indexes = RaIndexes}) -> RaIndexes. query_consumer_count(#?MODULE{consumers = Consumers, - waiting_consumers = WaitingConsumers}) -> + waiting_consumers = WaitingConsumers}) -> maps:size(Consumers) + length(WaitingConsumers). query_consumers(#?MODULE{consumers = Consumers, - waiting_consumers = WaitingConsumers, - cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) -> + waiting_consumers = WaitingConsumers, + cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) -> ActiveActivityStatusFun = case ConsumerStrategy of default -> @@ -639,7 +639,7 @@ query_consumers(#?MODULE{consumers = Consumers, maps:merge(FromConsumers, FromWaitingConsumers). query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_active}, - consumers = Consumers}) -> + consumers = Consumers}) -> case maps:size(Consumers) of 0 -> {error, no_value}; @@ -665,14 +665,14 @@ usage(Name) when is_atom(Name) -> %%% Internal messages_ready(#?MODULE{messages = M, - prefix_msgs = {PreR, PreM}, - returns = R}) -> + prefix_msgs = {PreR, PreM}, + returns = R}) -> %% TODO: optimise to avoid length/1 call maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM). messages_total(#?MODULE{ra_indexes = I, - prefix_msgs = {PreR, PreM}}) -> + prefix_msgs = {PreR, PreM}}) -> rabbit_fifo_index:size(I) + length(PreR) + length(PreM). update_use({inactive, _, _, _} = CUInfo, inactive) -> @@ -714,14 +714,14 @@ cancel_consumer(ConsumerId, cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = []} = State, + waiting_consumers = []} = State, Effects, Reason) -> %% single active consumer on, no consumers are waiting cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #?MODULE{consumers = Cons0, - cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = WaitingConsumers0} = State0, + cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = WaitingConsumers0} = State0, Effects0, Reason) -> %% single active consumer on, consumers are waiting case maps:take(ConsumerId, Cons0) of @@ -739,10 +739,10 @@ cancel_consumer(ConsumerId, NewActiveConsumer, ServiceQueue), State = State1#?MODULE{consumers = maps:put(NewActiveConsumerId, - NewActiveConsumer, - State1#?MODULE.consumers), - service_queue = ServiceQueue1, - waiting_consumers = RemainingWaitingConsumers}, + NewActiveConsumer, + State1#?MODULE.consumers), + service_queue = ServiceQueue1, + waiting_consumers = RemainingWaitingConsumers}, Effects = consumer_update_active_effects(State, NewActiveConsumerId, NewActiveConsumer, true, single_active, Effects2), @@ -788,9 +788,10 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> {S0, Effects0} end. -maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1, - #?MODULE{consumers = C0, - service_queue = SQ0} = S0, Effects0, Reason) -> +maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, + Cons1, #?MODULE{consumers = C0, + service_queue = SQ0} = S0, + Effects0, Reason) -> case Reason of consumer_cancel -> {Cons, SQ, Effects1} = @@ -801,7 +802,8 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1 C0, SQ0, Effects0), {S0#?MODULE{consumers = Cons, service_queue = SQ}, Effects1}; down -> - {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId, Consumer), + {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId, + Consumer), {S1#?MODULE{consumers = Cons1}, Effects1} end. @@ -852,7 +854,7 @@ append_to_master_index(RaftIdx, incr_enqueue_count(#?MODULE{enqueue_count = C, - cfg = #cfg{release_cursor_interval = C}} = State0) -> + cfg = #cfg{release_cursor_interval = C}} = State0) -> % this will trigger a dehydrated version of the state to be stored % at this raft index for potential future snapshot generation State0#?MODULE{enqueue_count = 0}; @@ -861,8 +863,8 @@ incr_enqueue_count(#?MODULE{enqueue_count = C} = State) -> maybe_store_dehydrated_state(RaftIdx, #?MODULE{ra_indexes = Indexes, - enqueue_count = 0, - release_cursors = Cursors} = State) -> + enqueue_count = 0, + release_cursors = Cursors} = State) -> case rabbit_fifo_index:exists(RaftIdx, Indexes) of false -> %% the incoming enqueue must already have been dropped @@ -934,14 +936,14 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, ConsumerId, Con) end, {State0, Effects1}, MsgNumMsgs), checkout(Meta, State1#?MODULE{consumers = Cons, - service_queue = SQ}, + service_queue = SQ}, Effects2). % used to processes messages that are finished complete(ConsumerId, MsgRaftIdxs, NumDiscarded, Con0, Checked, Effects0, #?MODULE{consumers = Cons0, service_queue = SQ0, - ra_indexes = Indexes0} = State0) -> + ra_indexes = Indexes0} = State0) -> %% credit_mode = simple_prefetch should automatically top-up credit %% as messages are simple_prefetch or otherwise returned Con = Con0#consumer{checked_out = Checked, @@ -951,8 +953,8 @@ complete(ConsumerId, MsgRaftIdxs, NumDiscarded, Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), {State0#?MODULE{consumers = Cons, - ra_indexes = Indexes, - service_queue = SQ}, Effects}. + ra_indexes = Indexes, + service_queue = SQ}, Effects}. increase_credit(#consumer{lifetime = once, credit = Credit}, _) -> @@ -1005,7 +1007,7 @@ cancel_consumer_effects(ConsumerId, update_smallest_raft_index(IncomingRaftIdx, #?MODULE{ra_indexes = Indexes, - release_cursors = Cursors0} = State0, + release_cursors = Cursors0} = State0, Effects) -> case rabbit_fifo_index:size(Indexes) of 0 -> @@ -1043,8 +1045,8 @@ find_next_cursor(Smallest, Cursors0, Potential) -> return_one(0, {'$prefix_msg', Header0}, #?MODULE{returns = Returns, - cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0, - ConsumerId, Con) -> + cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, + Effects0, ConsumerId, Con) -> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), @@ -1063,7 +1065,7 @@ return_one(0, {'$prefix_msg', Header0}, end; return_one(MsgNum, {RaftId, {Header0, RawMsg}}, #?MODULE{returns = Returns, - cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, + cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0, ConsumerId, Con) -> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, @@ -1082,7 +1084,7 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, %% this should not affect the release cursor in any way {add_bytes_return(RawMsg, State0#?MODULE{returns = - lqueue:in({MsgNum, Msg}, Returns)}), + lqueue:in({MsgNum, Msg}, Returns)}), Effects0} end. @@ -1127,7 +1129,7 @@ checkout0({Activity, State0}, Effects0, Acc) -> evaluate_limit(_OldIndexes, Result, #?MODULE{cfg = #cfg{max_length = undefined, - max_bytes = undefined}} = State, + max_bytes = undefined}} = State, Effects) -> {State, Result, Effects}; evaluate_limit(OldIndexes, Result, @@ -1161,9 +1163,9 @@ take_next_msg(#?MODULE{prefix_msgs = {[Header | Rem], P}} = State) -> {{'$prefix_msg', Header}, State#?MODULE{prefix_msgs = {Rem, P}}}; take_next_msg(#?MODULE{returns = Returns, - low_msg_num = Low0, - messages = Messages0, - prefix_msgs = {R, P}} = State) -> + low_msg_num = Low0, + messages = Messages0, + prefix_msgs = {R, P}} = State) -> %% use peek rather than out there as the most likely case is an empty %% queue case lqueue:peek(Returns) of @@ -1180,11 +1182,11 @@ take_next_msg(#?MODULE{returns = Returns, 0 -> {{Low0, Msg}, State#?MODULE{messages = Messages, - low_msg_num = undefined}}; + low_msg_num = undefined}}; _ -> {{Low0, Msg}, State#?MODULE{messages = Messages, - low_msg_num = Low0 + 1}} + low_msg_num = Low0 + 1}} end end; empty -> @@ -1198,8 +1200,8 @@ send_msg_effect({CTag, CPid}, Msgs) -> {send_msg, CPid, {delivery, CTag, Msgs}, ra_event}. checkout_one(#?MODULE{service_queue = SQ0, - messages = Messages0, - consumers = Cons0} = InitState) -> + messages = Messages0, + consumers = Cons0} = InitState) -> case queue:peek(SQ0) of {value, ConsumerId} -> case take_next_msg(InitState) of @@ -1230,7 +1232,7 @@ checkout_one(#?MODULE{service_queue = SQ0, update_or_remove_sub(ConsumerId, Con, Cons0, SQ1, []), State1 = State0#?MODULE{service_queue = SQ, - consumers = Cons}, + consumers = Cons}, {State, Msg} = case ConsumerMsg of {'$prefix_msg', _} -> @@ -1298,13 +1300,13 @@ update_consumer(ConsumerId, Meta, Spec, update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, Spec, #?MODULE{consumers = Cons0, - cfg = #cfg{consumer_strategy = single_active}} = State0) + cfg = #cfg{consumer_strategy = single_active}} = State0) when map_size(Cons0) == 0 -> %% single active consumer on, no one is consuming yet update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = WaitingConsumers0} = State0) -> + waiting_consumers = WaitingConsumers0} = State0) -> %% single active consumer on and one active consumer already %% adding the new consumer to the waiting list Consumer = #consumer{lifetime = Life, meta = Meta, @@ -1314,7 +1316,7 @@ update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, #?MODULE{consumers = Cons0, - service_queue = ServiceQueue0} = State0) -> + service_queue = ServiceQueue0} = State0) -> %% TODO: this logic may not be correct for updating a pre-existing consumer Init = #consumer{lifetime = Life, meta = Meta, credit = Credit, credit_mode = Mode}, @@ -1345,9 +1347,9 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, %% creates a dehydrated version of the current state to be cached and %% potentially used to for a snaphot at a later point dehydrate_state(#?MODULE{messages = Messages, - consumers = Consumers, - returns = Returns, - prefix_msgs = {PrefRet0, PrefMsg0}} = State) -> + consumers = Consumers, + returns = Returns, + prefix_msgs = {PrefRet0, PrefMsg0}} = State) -> %% TODO: optimise this function as far as possible PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) -> [Header | Acc]; @@ -1362,15 +1364,15 @@ dehydrate_state(#?MODULE{messages = Messages, lists:reverse(PrefMsg0), lists:sort(maps:to_list(Messages))), State#?MODULE{messages = #{}, - ra_indexes = rabbit_fifo_index:empty(), - release_cursors = lqueue:new(), - low_msg_num = undefined, - consumers = maps:map(fun (_, C) -> - dehydrate_consumer(C) - end, Consumers), - returns = lqueue:new(), - prefix_msgs = {lists:reverse(PrefRet), - lists:reverse(PrefMsgs)}}. + ra_indexes = rabbit_fifo_index:empty(), + release_cursors = lqueue:new(), + low_msg_num = undefined, + consumers = maps:map(fun (_, C) -> + dehydrate_consumer(C) + end, Consumers), + returns = lqueue:new(), + prefix_msgs = {lists:reverse(PrefRet), + lists:reverse(PrefMsgs)}}. dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> @@ -1385,11 +1387,11 @@ normalize(#?MODULE{release_cursors = Cursors} = State) -> State#?MODULE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}. is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined, - max_bytes = undefined}}) -> + max_bytes = undefined}}) -> false; is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, - max_bytes = MaxBytes}, - msg_bytes_enqueue = BytesEnq} = State) -> + max_bytes = MaxBytes}, + msg_bytes_enqueue = BytesEnq} = State) -> messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes). @@ -1439,17 +1441,17 @@ add_bytes_checkout(Msg, #?MODULE{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue } = State) -> Bytes = message_size(Msg), State#?MODULE{msg_bytes_checkout = Checkout + Bytes, - msg_bytes_enqueue = Enqueue - Bytes}. + msg_bytes_enqueue = Enqueue - Bytes}. add_bytes_settle(Msg, #?MODULE{msg_bytes_checkout = Checkout} = State) -> Bytes = message_size(Msg), State#?MODULE{msg_bytes_checkout = Checkout - Bytes}. add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout, - msg_bytes_enqueue = Enqueue} = State) -> + msg_bytes_enqueue = Enqueue} = State) -> Bytes = message_size(Msg), State#?MODULE{msg_bytes_checkout = Checkout - Bytes, - msg_bytes_enqueue = Enqueue + Bytes}. + msg_bytes_enqueue = Enqueue + Bytes}. message_size(#basic_message{content = Content}) -> #content{payload_fragments_rev = PFR} = Content, @@ -1463,8 +1465,8 @@ message_size(Msg) -> erts_debug:size(Msg). suspected_pids_for(Node, #?MODULE{consumers = Cons0, - enqueuers = Enqs0, - waiting_consumers = WaitingConsumers0}) -> + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc) when node(P) =:= Node -> [P | Acc]; diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index 42fc5d0b10..ebe5f3328a 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -159,8 +159,6 @@ waiting_consumers = [] :: [{consumer_id(), consumer()}] }). - - -type config() :: #{name := atom(), queue_resource := rabbit_types:r('queue'), dead_letter_handler => applied_mfa(), |
