summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl33
-rw-r--r--src/rabbit_queue_consumers.erl18
2 files changed, 27 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8fb73d6156..e143f8a1a8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -161,6 +161,7 @@
queue_cleanup_timer
}).
+-define(QUEUE, lqueue).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -433,7 +434,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
limiter = Limiter,
tx = none,
next_tag = 1,
- unacked_message_q = queue:new(),
+ unacked_message_q = ?QUEUE:new(),
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -1181,7 +1182,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
DQ = {Delivery#delivery{flow = Flow}, QNames},
{noreply, case Tx of
none -> deliver_to_queues(DQ, State1);
- {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
+ {Msgs, Acks} -> Msgs1 = ?QUEUE:in(DQ, Msgs),
State1#ch{tx = {Msgs1, Acks}}
end};
{error, Reason} ->
@@ -1389,10 +1390,10 @@ handle_method(#'basic.qos'{global = true,
handle_method(#'basic.qos'{global = true,
prefetch_count = PrefetchCount},
_, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
- %% TODO queue:len(UAMQ) is not strictly right since that counts
+ %% TODO ?QUEUE:len(UAMQ) is not strictly right since that counts
%% unacked messages from basic.get too. Pretty obscure though.
Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
- PrefetchCount, queue:len(UAMQ)),
+ PrefetchCount, ?QUEUE:len(UAMQ)),
case ((not rabbit_limiter:is_active(Limiter)) andalso
rabbit_limiter:is_active(Limiter1)) of
true -> rabbit_amqqueue:activate_limit_all(
@@ -1405,7 +1406,7 @@ handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ, limiter = Limiter,
queue_states = QueueStates0}) ->
OkFun = fun () -> ok end,
- UAMQL = queue:to_list(UAMQ),
+ UAMQL = ?QUEUE:to_list(UAMQ),
QueueStates =
foreach_per_queue(
fun ({QPid, CTag}, MsgIds, Acc0) ->
@@ -1419,7 +1420,7 @@ handle_method(#'basic.recover_async'{requeue = true},
ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
- {noreply, State#ch{unacked_message_q = queue:new(),
+ {noreply, State#ch{unacked_message_q = ?QUEUE:new(),
queue_states = QueueStates}};
handle_method(#'basic.recover_async'{requeue = false}, _, _State) ->
@@ -1543,7 +1544,7 @@ handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
tx = {_Msgs, Acks}}) ->
AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])),
- UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
+ UAMQ1 = ?QUEUE:from_list(lists:usort(AcksL ++ ?QUEUE:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
tx = new_tx()}};
@@ -1835,28 +1836,28 @@ record_sent(Type, Tag, AckRequired,
end,
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
UAMQ1 = case AckRequired of
- true -> queue:in({DeliveryTag, Tag, {QPid, MsgId}},
- UAMQ);
+ true -> ?QUEUE:in({DeliveryTag, Tag, {QPid, MsgId}},
+ UAMQ);
false -> UAMQ
end,
State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
%% NB: returns acks in youngest-first order
collect_acks(Q, 0, true) ->
- {lists:reverse(queue:to_list(Q)), queue:new()};
+ {lists:reverse(?QUEUE:to_list(Q)), ?QUEUE:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
collect_acks([], [], Q, DeliveryTag, Multiple).
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
- case queue:out(Q) of
+ case ?QUEUE:out(Q) of
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
{[UnackedMsg | ToAcc],
case PrefixAcc of
[] -> QTail;
- _ -> queue:join(
- queue:from_list(lists:reverse(PrefixAcc)),
+ _ -> ?QUEUE:join(
+ ?QUEUE:from_list(lists:reverse(PrefixAcc)),
QTail)
end};
Multiple ->
@@ -1902,7 +1903,7 @@ incr_queue_stats(QPid, QNames, MsgIds, State) ->
%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as
%% well as the list overall, are in "most-recent (generally youngest)
%% ack first" order.
-new_tx() -> {queue:new(), []}.
+new_tx() -> {?QUEUE:new(), []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -2145,8 +2146,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
-i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ);
-i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> ?QUEUE:len(UAMQ);
+i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index e3322f3f64..98582c8117 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -25,6 +25,8 @@
%%----------------------------------------------------------------------------
+-define(QUEUE, lqueue).
+
-define(UNSENT_MESSAGE_LIMIT, 200).
%% Utilisation average calculations are all in μs.
@@ -122,7 +124,7 @@ consumers(Consumers, Acc) ->
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
- lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
+ lists:sum([?QUEUE:len(C#cr.acktags) || C <- all_ch_record()]).
add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
Username, State = #state{consumers = Consumers,
@@ -179,7 +181,7 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
All = priority_queue:join(Consumers, BlockedQ),
ok = erase_ch_record(C),
Filtered = priority_queue:filter(chan_pred(ChPid, true), All),
- {[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)],
+ {[AckTag || {AckTag, _CTag} <- ?QUEUE:to_list(ChAckTags)],
tags(priority_queue:to_list(Filtered)),
State#state{consumers = remove_consumers(ChPid, Consumers)}}
end.
@@ -237,7 +239,7 @@ deliver_to_consumer(FetchFun,
rabbit_channel:deliver(ChPid, CTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
- true -> queue:in({AckTag, CTag}, ChAckTags);
+ true -> ?QUEUE:in({AckTag, CTag}, ChAckTags);
false -> ChAckTags
end,
update_ch_record(C#cr{acktags = ChAckTags1,
@@ -246,7 +248,7 @@ deliver_to_consumer(FetchFun,
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
- update_ch_record(C#cr{acktags = queue:in({AckTag, none}, ChAckTags)}),
+ update_ch_record(C#cr{acktags = ?QUEUE:in({AckTag, none}, ChAckTags)}),
ok.
subtract_acks(ChPid, AckTags, State) ->
@@ -274,9 +276,9 @@ subtract_acks(ChPid, AckTags, State) ->
subtract_acks([], [], CTagCounts, AckQ) ->
{CTagCounts, AckQ};
subtract_acks([], Prefix, CTagCounts, AckQ) ->
- {CTagCounts, queue:join(queue:from_list(lists:reverse(Prefix)), AckQ)};
+ {CTagCounts, ?QUEUE:join(?QUEUE:from_list(lists:reverse(Prefix)), AckQ)};
subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
- case queue:out(AckQ) of
+ case ?QUEUE:out(AckQ) of
{{value, {T, CTag}}, QTail} ->
subtract_acks(TL, Prefix,
maps:update_with(CTag, fun (Old) -> Old + 1 end, 1, CTagCounts), QTail);
@@ -380,7 +382,7 @@ ch_record(ChPid, LimiterPid) ->
Limiter = rabbit_limiter:client(LimiterPid),
C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
- acktags = queue:new(),
+ acktags = ?QUEUE:new(),
consumer_count = 0,
blocked_consumers = priority_queue:new(),
limiter = Limiter,
@@ -393,7 +395,7 @@ ch_record(ChPid, LimiterPid) ->
update_ch_record(C = #cr{consumer_count = ConsumerCount,
acktags = ChAckTags,
unsent_message_count = UnsentMessageCount}) ->
- case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ case {?QUEUE:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
{true, 0, 0} -> ok = erase_ch_record(C);
_ -> ok = store_ch_record(C)
end,