summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-02-25 12:02:25 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-25 12:02:25 +0000
commitfecd996a4b2d0b79fbe2f870ed48f7299a59b56e (patch)
treeab64900e7960ec47cad422a0b6dd87f3417bfb07 /src
parent20bc8c429b6a3e1056e73056afec2bba5299743e (diff)
downloadrabbitmq-server-git-fecd996a4b2d0b79fbe2f870ed48f7299a59b56e.tar.gz
Formatting
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl206
-rw-r--r--src/rabbit_fifo.hrl2
2 files changed, 104 insertions, 104 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 36341259be..863375671b 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -138,12 +138,12 @@ update_config(Conf, State) ->
end,
Cfg = State#?MODULE.cfg,
State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI,
- dead_letter_handler = DLH,
- become_leader_handler = BLH,
- max_length = MaxLength,
- max_bytes = MaxBytes,
- consumer_strategy = ConsumerStrategy,
- delivery_limit = DeliveryLimit}}.
+ dead_letter_handler = DLH,
+ become_leader_handler = BLH,
+ max_length = MaxLength,
+ max_bytes = MaxBytes,
+ consumer_strategy = ConsumerStrategy,
+ delivery_limit = DeliveryLimit}}.
zero(_) ->
0.
@@ -206,7 +206,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
Cons = maps:put(ConsumerId, Con1, Cons0),
{State1, ok, Effects} =
checkout(Meta, State0#?MODULE{service_queue = ServiceQueue,
- consumers = Cons}, []),
+ consumers = Cons}, []),
Response = {send_credit_reply, maps:size(State1#?MODULE.messages)},
%% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain
@@ -274,8 +274,8 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
checkout(Meta, State1, [{monitor, process, Pid}]);
apply(#{index := RaftIdx}, #purge{},
#?MODULE{ra_indexes = Indexes0,
- returns = Returns,
- messages = Messages} = State0) ->
+ returns = Returns,
+ messages = Messages} = State0) ->
Total = messages_ready(State0),
Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
[I || {I, _} <- lists:sort(maps:values(Messages))]),
@@ -284,11 +284,11 @@ apply(#{index := RaftIdx}, #purge{},
{State, _, Effects} =
update_smallest_raft_index(RaftIdx,
State0#?MODULE{ra_indexes = Indexes,
- messages = #{},
- returns = lqueue:new(),
- msg_bytes_enqueue = 0,
- prefix_msgs = {[], []},
- low_msg_num = undefined},
+ messages = #{},
+ returns = lqueue:new(),
+ msg_bytes_enqueue = 0,
+ prefix_msgs = {[], []},
+ low_msg_num = undefined},
[]),
%% as we're not checking out after a purge (no point) we have to
%% reverse the effects ourselves
@@ -296,7 +296,7 @@ apply(#{index := RaftIdx}, #purge{},
lists:reverse([garbage_collection | Effects])};
apply(_, {down, ConsumerPid, noconnection},
#?MODULE{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ enqueuers = Enqs0} = State0) ->
Node = node(ConsumerPid),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
% mark all consumers and enqueuers as suspected down
@@ -335,9 +335,9 @@ apply(_, {down, ConsumerPid, noconnection},
end ++ Effects1,
%% TODO: should we run a checkout here?
{State#?MODULE{consumers = Cons, enqueuers = Enqs,
- waiting_consumers = WaitingConsumers}, ok, Effects2};
+ waiting_consumers = WaitingConsumers}, ok, Effects2};
apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
State1 = case maps:take(Pid, Enqs0) of
@@ -358,8 +358,8 @@ apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0,
end, {State2, Effects1}, DownConsumers),
checkout(Meta, State, Effects);
apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
- enqueuers = Enqs0,
- service_queue = SQ0} = State0) ->
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -389,8 +389,8 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
end, {Cons0, SQ0, Monitors}, Cons0),
checkout(Meta, State0#?MODULE{consumers = Cons1, enqueuers = Enqs1,
- service_queue = SQ,
- waiting_consumers = WaitingConsumers}, Effects);
+ service_queue = SQ,
+ waiting_consumers = WaitingConsumers}, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
apply(Meta, #update_config{config = Conf}, State) ->
@@ -411,11 +411,11 @@ handle_waiting_consumer_down(_Pid,
{[], State};
handle_waiting_consumer_down(_Pid,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
- waiting_consumers = []} = State) ->
+ waiting_consumers = []} = State) ->
{[], State};
handle_waiting_consumer_down(Pid,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
- waiting_consumers = WaitingConsumers0} = State0) ->
+ waiting_consumers = WaitingConsumers0} = State0) ->
% get cancel effects for down waiting consumers
Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end,
WaitingConsumers0),
@@ -434,12 +434,12 @@ update_waiting_consumer_status(_Node, #?MODULE{cfg = #cfg{consumer_strategy = de
[];
update_waiting_consumer_status(_Node,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
- waiting_consumers = []},
+ waiting_consumers = []},
_Status) ->
[];
update_waiting_consumer_status(Node,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
- waiting_consumers = WaitingConsumers},
+ waiting_consumers = WaitingConsumers},
Status) ->
[begin
case node(P) of
@@ -453,12 +453,12 @@ update_waiting_consumer_status(Node,
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #?MODULE{consumers = Cons,
- enqueuers = Enqs,
- waiting_consumers = WaitingConsumers,
- cfg = #cfg{name = Name,
- become_leader_handler = BLH},
- prefix_msgs = {[], []}
- }) ->
+ enqueuers = Enqs,
+ waiting_consumers = WaitingConsumers,
+ cfg = #cfg{name = Name,
+ become_leader_handler = BLH},
+ prefix_msgs = {[], []}
+ }) ->
% return effects to monitor all current consumers and enqueuers
Pids = lists:usort(maps:keys(Enqs)
++ [P || {_, P} <- maps:keys(Cons)]
@@ -478,8 +478,8 @@ state_enter(recovered, #?MODULE{prefix_msgs = PrefixMsgCounts})
%% TODO: remove assertion?
exit({rabbit_fifo, unexpected_prefix_msgs, PrefixMsgCounts});
state_enter(eol, #?MODULE{enqueuers = Enqs,
- consumers = Custs0,
- waiting_consumers = WaitingConsumers0}) ->
+ consumers = Custs0,
+ waiting_consumers = WaitingConsumers0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end,
#{}, WaitingConsumers0),
@@ -493,9 +493,9 @@ state_enter(_, _) ->
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
- resource = QName},
- msg_bytes_enqueue = EnqueueBytes,
- msg_bytes_checkout = CheckoutBytes} = State) ->
+ resource = QName},
+ msg_bytes_enqueue = EnqueueBytes,
+ msg_bytes_checkout = CheckoutBytes} = State) ->
Metrics = {Name,
messages_ready(State),
num_checked_out(State), % checked out
@@ -508,10 +508,10 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
-spec overview(state()) -> map().
overview(#?MODULE{consumers = Cons,
- enqueuers = Enqs,
- release_cursors = Cursors,
- msg_bytes_enqueue = EnqueueBytes,
- msg_bytes_checkout = CheckoutBytes} = State) ->
+ enqueuers = Enqs,
+ release_cursors = Cursors,
+ msg_bytes_enqueue = EnqueueBytes,
+ msg_bytes_checkout = CheckoutBytes} = State) ->
#{type => ?MODULE,
num_consumers => maps:size(Cons),
num_checked_out => num_checked_out(State),
@@ -575,12 +575,12 @@ query_ra_indexes(#?MODULE{ra_indexes = RaIndexes}) ->
RaIndexes.
query_consumer_count(#?MODULE{consumers = Consumers,
- waiting_consumers = WaitingConsumers}) ->
+ waiting_consumers = WaitingConsumers}) ->
maps:size(Consumers) + length(WaitingConsumers).
query_consumers(#?MODULE{consumers = Consumers,
- waiting_consumers = WaitingConsumers,
- cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) ->
+ waiting_consumers = WaitingConsumers,
+ cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) ->
ActiveActivityStatusFun =
case ConsumerStrategy of
default ->
@@ -639,7 +639,7 @@ query_consumers(#?MODULE{consumers = Consumers,
maps:merge(FromConsumers, FromWaitingConsumers).
query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_active},
- consumers = Consumers}) ->
+ consumers = Consumers}) ->
case maps:size(Consumers) of
0 ->
{error, no_value};
@@ -665,14 +665,14 @@ usage(Name) when is_atom(Name) ->
%%% Internal
messages_ready(#?MODULE{messages = M,
- prefix_msgs = {PreR, PreM},
- returns = R}) ->
+ prefix_msgs = {PreR, PreM},
+ returns = R}) ->
%% TODO: optimise to avoid length/1 call
maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM).
messages_total(#?MODULE{ra_indexes = I,
- prefix_msgs = {PreR, PreM}}) ->
+ prefix_msgs = {PreR, PreM}}) ->
rabbit_fifo_index:size(I) + length(PreR) + length(PreM).
update_use({inactive, _, _, _} = CUInfo, inactive) ->
@@ -714,14 +714,14 @@ cancel_consumer(ConsumerId,
cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
- waiting_consumers = []} = State,
+ waiting_consumers = []} = State,
Effects, Reason) ->
%% single active consumer on, no consumers are waiting
cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
#?MODULE{consumers = Cons0,
- cfg = #cfg{consumer_strategy = single_active},
- waiting_consumers = WaitingConsumers0} = State0,
+ cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = WaitingConsumers0} = State0,
Effects0, Reason) ->
%% single active consumer on, consumers are waiting
case maps:take(ConsumerId, Cons0) of
@@ -739,10 +739,10 @@ cancel_consumer(ConsumerId,
NewActiveConsumer,
ServiceQueue),
State = State1#?MODULE{consumers = maps:put(NewActiveConsumerId,
- NewActiveConsumer,
- State1#?MODULE.consumers),
- service_queue = ServiceQueue1,
- waiting_consumers = RemainingWaitingConsumers},
+ NewActiveConsumer,
+ State1#?MODULE.consumers),
+ service_queue = ServiceQueue1,
+ waiting_consumers = RemainingWaitingConsumers},
Effects = consumer_update_active_effects(State, NewActiveConsumerId,
NewActiveConsumer, true,
single_active, Effects2),
@@ -788,9 +788,10 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
{S0, Effects0}
end.
-maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1,
- #?MODULE{consumers = C0,
- service_queue = SQ0} = S0, Effects0, Reason) ->
+maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer,
+ Cons1, #?MODULE{consumers = C0,
+ service_queue = SQ0} = S0,
+ Effects0, Reason) ->
case Reason of
consumer_cancel ->
{Cons, SQ, Effects1} =
@@ -801,7 +802,8 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1
C0, SQ0, Effects0),
{S0#?MODULE{consumers = Cons, service_queue = SQ}, Effects1};
down ->
- {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId, Consumer),
+ {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId,
+ Consumer),
{S1#?MODULE{consumers = Cons1}, Effects1}
end.
@@ -852,7 +854,7 @@ append_to_master_index(RaftIdx,
incr_enqueue_count(#?MODULE{enqueue_count = C,
- cfg = #cfg{release_cursor_interval = C}} = State0) ->
+ cfg = #cfg{release_cursor_interval = C}} = State0) ->
% this will trigger a dehydrated version of the state to be stored
% at this raft index for potential future snapshot generation
State0#?MODULE{enqueue_count = 0};
@@ -861,8 +863,8 @@ incr_enqueue_count(#?MODULE{enqueue_count = C} = State) ->
maybe_store_dehydrated_state(RaftIdx,
#?MODULE{ra_indexes = Indexes,
- enqueue_count = 0,
- release_cursors = Cursors} = State) ->
+ enqueue_count = 0,
+ release_cursors = Cursors} = State) ->
case rabbit_fifo_index:exists(RaftIdx, Indexes) of
false ->
%% the incoming enqueue must already have been dropped
@@ -934,14 +936,14 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
ConsumerId, Con)
end, {State0, Effects1}, MsgNumMsgs),
checkout(Meta, State1#?MODULE{consumers = Cons,
- service_queue = SQ},
+ service_queue = SQ},
Effects2).
% used to processes messages that are finished
complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
Con0, Checked, Effects0,
#?MODULE{consumers = Cons0, service_queue = SQ0,
- ra_indexes = Indexes0} = State0) ->
+ ra_indexes = Indexes0} = State0) ->
%% credit_mode = simple_prefetch should automatically top-up credit
%% as messages are simple_prefetch or otherwise returned
Con = Con0#consumer{checked_out = Checked,
@@ -951,8 +953,8 @@ complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
MsgRaftIdxs),
{State0#?MODULE{consumers = Cons,
- ra_indexes = Indexes,
- service_queue = SQ}, Effects}.
+ ra_indexes = Indexes,
+ service_queue = SQ}, Effects}.
increase_credit(#consumer{lifetime = once,
credit = Credit}, _) ->
@@ -1005,7 +1007,7 @@ cancel_consumer_effects(ConsumerId,
update_smallest_raft_index(IncomingRaftIdx,
#?MODULE{ra_indexes = Indexes,
- release_cursors = Cursors0} = State0,
+ release_cursors = Cursors0} = State0,
Effects) ->
case rabbit_fifo_index:size(Indexes) of
0 ->
@@ -1043,8 +1045,8 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
return_one(0, {'$prefix_msg', Header0},
#?MODULE{returns = Returns,
- cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0,
- ConsumerId, Con) ->
+ cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
+ Effects0, ConsumerId, Con) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
1, Header0),
@@ -1063,7 +1065,7 @@ return_one(0, {'$prefix_msg', Header0},
end;
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
#?MODULE{returns = Returns,
- cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
+ cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
Effects0, ConsumerId, Con) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
@@ -1082,7 +1084,7 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
%% this should not affect the release cursor in any way
{add_bytes_return(RawMsg,
State0#?MODULE{returns =
- lqueue:in({MsgNum, Msg}, Returns)}),
+ lqueue:in({MsgNum, Msg}, Returns)}),
Effects0}
end.
@@ -1127,7 +1129,7 @@ checkout0({Activity, State0}, Effects0, Acc) ->
evaluate_limit(_OldIndexes, Result,
#?MODULE{cfg = #cfg{max_length = undefined,
- max_bytes = undefined}} = State,
+ max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
evaluate_limit(OldIndexes, Result,
@@ -1161,9 +1163,9 @@ take_next_msg(#?MODULE{prefix_msgs = {[Header | Rem], P}} = State) ->
{{'$prefix_msg', Header},
State#?MODULE{prefix_msgs = {Rem, P}}};
take_next_msg(#?MODULE{returns = Returns,
- low_msg_num = Low0,
- messages = Messages0,
- prefix_msgs = {R, P}} = State) ->
+ low_msg_num = Low0,
+ messages = Messages0,
+ prefix_msgs = {R, P}} = State) ->
%% use peek rather than out there as the most likely case is an empty
%% queue
case lqueue:peek(Returns) of
@@ -1180,11 +1182,11 @@ take_next_msg(#?MODULE{returns = Returns,
0 ->
{{Low0, Msg},
State#?MODULE{messages = Messages,
- low_msg_num = undefined}};
+ low_msg_num = undefined}};
_ ->
{{Low0, Msg},
State#?MODULE{messages = Messages,
- low_msg_num = Low0 + 1}}
+ low_msg_num = Low0 + 1}}
end
end;
empty ->
@@ -1198,8 +1200,8 @@ send_msg_effect({CTag, CPid}, Msgs) ->
{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}.
checkout_one(#?MODULE{service_queue = SQ0,
- messages = Messages0,
- consumers = Cons0} = InitState) ->
+ messages = Messages0,
+ consumers = Cons0} = InitState) ->
case queue:peek(SQ0) of
{value, ConsumerId} ->
case take_next_msg(InitState) of
@@ -1230,7 +1232,7 @@ checkout_one(#?MODULE{service_queue = SQ0,
update_or_remove_sub(ConsumerId, Con,
Cons0, SQ1, []),
State1 = State0#?MODULE{service_queue = SQ,
- consumers = Cons},
+ consumers = Cons},
{State, Msg} =
case ConsumerMsg of
{'$prefix_msg', _} ->
@@ -1298,13 +1300,13 @@ update_consumer(ConsumerId, Meta, Spec,
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, Spec,
#?MODULE{consumers = Cons0,
- cfg = #cfg{consumer_strategy = single_active}} = State0)
+ cfg = #cfg{consumer_strategy = single_active}} = State0)
when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
- waiting_consumers = WaitingConsumers0} = State0) ->
+ waiting_consumers = WaitingConsumers0} = State0) ->
%% single active consumer on and one active consumer already
%% adding the new consumer to the waiting list
Consumer = #consumer{lifetime = Life, meta = Meta,
@@ -1314,7 +1316,7 @@ update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
#?MODULE{consumers = Cons0,
- service_queue = ServiceQueue0} = State0) ->
+ service_queue = ServiceQueue0} = State0) ->
%% TODO: this logic may not be correct for updating a pre-existing consumer
Init = #consumer{lifetime = Life, meta = Meta,
credit = Credit, credit_mode = Mode},
@@ -1345,9 +1347,9 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
%% creates a dehydrated version of the current state to be cached and
%% potentially used to for a snaphot at a later point
dehydrate_state(#?MODULE{messages = Messages,
- consumers = Consumers,
- returns = Returns,
- prefix_msgs = {PrefRet0, PrefMsg0}} = State) ->
+ consumers = Consumers,
+ returns = Returns,
+ prefix_msgs = {PrefRet0, PrefMsg0}} = State) ->
%% TODO: optimise this function as far as possible
PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) ->
[Header | Acc];
@@ -1362,15 +1364,15 @@ dehydrate_state(#?MODULE{messages = Messages,
lists:reverse(PrefMsg0),
lists:sort(maps:to_list(Messages))),
State#?MODULE{messages = #{},
- ra_indexes = rabbit_fifo_index:empty(),
- release_cursors = lqueue:new(),
- low_msg_num = undefined,
- consumers = maps:map(fun (_, C) ->
- dehydrate_consumer(C)
- end, Consumers),
- returns = lqueue:new(),
- prefix_msgs = {lists:reverse(PrefRet),
- lists:reverse(PrefMsgs)}}.
+ ra_indexes = rabbit_fifo_index:empty(),
+ release_cursors = lqueue:new(),
+ low_msg_num = undefined,
+ consumers = maps:map(fun (_, C) ->
+ dehydrate_consumer(C)
+ end, Consumers),
+ returns = lqueue:new(),
+ prefix_msgs = {lists:reverse(PrefRet),
+ lists:reverse(PrefMsgs)}}.
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
@@ -1385,11 +1387,11 @@ normalize(#?MODULE{release_cursors = Cursors} = State) ->
State#?MODULE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined,
- max_bytes = undefined}}) ->
+ max_bytes = undefined}}) ->
false;
is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
- max_bytes = MaxBytes},
- msg_bytes_enqueue = BytesEnq} = State) ->
+ max_bytes = MaxBytes},
+ msg_bytes_enqueue = BytesEnq} = State) ->
messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
@@ -1439,17 +1441,17 @@ add_bytes_checkout(Msg, #?MODULE{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue } = State) ->
Bytes = message_size(Msg),
State#?MODULE{msg_bytes_checkout = Checkout + Bytes,
- msg_bytes_enqueue = Enqueue - Bytes}.
+ msg_bytes_enqueue = Enqueue - Bytes}.
add_bytes_settle(Msg, #?MODULE{msg_bytes_checkout = Checkout} = State) ->
Bytes = message_size(Msg),
State#?MODULE{msg_bytes_checkout = Checkout - Bytes}.
add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout,
- msg_bytes_enqueue = Enqueue} = State) ->
+ msg_bytes_enqueue = Enqueue} = State) ->
Bytes = message_size(Msg),
State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
- msg_bytes_enqueue = Enqueue + Bytes}.
+ msg_bytes_enqueue = Enqueue + Bytes}.
message_size(#basic_message{content = Content}) ->
#content{payload_fragments_rev = PFR} = Content,
@@ -1463,8 +1465,8 @@ message_size(Msg) ->
erts_debug:size(Msg).
suspected_pids_for(Node, #?MODULE{consumers = Cons0,
- enqueuers = Enqs0,
- waiting_consumers = WaitingConsumers0}) ->
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc)
when node(P) =:= Node ->
[P | Acc];
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index 42fc5d0b10..ebe5f3328a 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -159,8 +159,6 @@
waiting_consumers = [] :: [{consumer_id(), consumer()}]
}).
-
-
-type config() :: #{name := atom(),
queue_resource := rabbit_types:r('queue'),
dead_letter_handler => applied_mfa(),