diff options
| author | Luke Bakken <lbakken@pivotal.io> | 2018-12-05 18:43:24 -0800 |
|---|---|---|
| committer | Luke Bakken <lbakken@pivotal.io> | 2018-12-05 18:43:24 -0800 |
| commit | 19f3901df38384f626c2d355a8c4e6ad216ef6e3 (patch) | |
| tree | 63614f331a1b4fadb4dbb7f5c39ca2957b0e93ed | |
| parent | dbe1dea0debc248bd0480ae210b7d518ac82aa01 (diff) | |
| download | rabbitmq-server-git-19f3901df38384f626c2d355a8c4e6ad216ef6e3.tar.gz | |
Replace a couple instances of queue with lqueue
| -rw-r--r-- | src/rabbit_channel.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 18 |
2 files changed, 27 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8fb73d6156..e143f8a1a8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -161,6 +161,7 @@ queue_cleanup_timer }). +-define(QUEUE, lqueue). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -433,7 +434,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, limiter = Limiter, tx = none, next_tag = 1, - unacked_message_q = queue:new(), + unacked_message_q = ?QUEUE:new(), user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -1181,7 +1182,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, DQ = {Delivery#delivery{flow = Flow}, QNames}, {noreply, case Tx of none -> deliver_to_queues(DQ, State1); - {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs), + {Msgs, Acks} -> Msgs1 = ?QUEUE:in(DQ, Msgs), State1#ch{tx = {Msgs1, Acks}} end}; {error, Reason} -> @@ -1389,10 +1390,10 @@ handle_method(#'basic.qos'{global = true, handle_method(#'basic.qos'{global = true, prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> - %% TODO queue:len(UAMQ) is not strictly right since that counts + %% TODO ?QUEUE:len(UAMQ) is not strictly right since that counts %% unacked messages from basic.get too. Pretty obscure though. Limiter1 = rabbit_limiter:limit_prefetch(Limiter, - PrefetchCount, queue:len(UAMQ)), + PrefetchCount, ?QUEUE:len(UAMQ)), case ((not rabbit_limiter:is_active(Limiter)) andalso rabbit_limiter:is_active(Limiter1)) of true -> rabbit_amqqueue:activate_limit_all( @@ -1405,7 +1406,7 @@ handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter, queue_states = QueueStates0}) -> OkFun = fun () -> ok end, - UAMQL = queue:to_list(UAMQ), + UAMQL = ?QUEUE:to_list(UAMQ), QueueStates = foreach_per_queue( fun ({QPid, CTag}, MsgIds, Acc0) -> @@ -1419,7 +1420,7 @@ handle_method(#'basic.recover_async'{requeue = true}, ok = notify_limiter(Limiter, UAMQL), %% No answer required - basic.recover is the newer, synchronous %% variant of this method - {noreply, State#ch{unacked_message_q = queue:new(), + {noreply, State#ch{unacked_message_q = ?QUEUE:new(), queue_states = QueueStates}}; handle_method(#'basic.recover_async'{requeue = false}, _, _State) -> @@ -1543,7 +1544,7 @@ handle_method(#'tx.rollback'{}, _, #ch{tx = none}) -> handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, tx = {_Msgs, Acks}}) -> AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])), - UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))), + UAMQ1 = ?QUEUE:from_list(lists:usort(AcksL ++ ?QUEUE:to_list(UAMQ))), {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1, tx = new_tx()}}; @@ -1835,28 +1836,28 @@ record_sent(Type, Tag, AckRequired, end, rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), UAMQ1 = case AckRequired of - true -> queue:in({DeliveryTag, Tag, {QPid, MsgId}}, - UAMQ); + true -> ?QUEUE:in({DeliveryTag, Tag, {QPid, MsgId}}, + UAMQ); false -> UAMQ end, State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. %% NB: returns acks in youngest-first order collect_acks(Q, 0, true) -> - {lists:reverse(queue:to_list(Q)), queue:new()}; + {lists:reverse(?QUEUE:to_list(Q)), ?QUEUE:new()}; collect_acks(Q, DeliveryTag, Multiple) -> collect_acks([], [], Q, DeliveryTag, Multiple). collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> - case queue:out(Q) of + case ?QUEUE:out(Q) of {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, QTail} -> if CurrentDeliveryTag == DeliveryTag -> {[UnackedMsg | ToAcc], case PrefixAcc of [] -> QTail; - _ -> queue:join( - queue:from_list(lists:reverse(PrefixAcc)), + _ -> ?QUEUE:join( + ?QUEUE:from_list(lists:reverse(PrefixAcc)), QTail) end}; Multiple -> @@ -1902,7 +1903,7 @@ incr_queue_stats(QPid, QNames, MsgIds, State) -> %% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as %% well as the list overall, are in "most-recent (generally youngest) %% ack first" order. -new_tx() -> {queue:new(), []}. +new_tx() -> {?QUEUE:new(), []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -2145,8 +2146,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM); i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); -i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); -i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs); +i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> ?QUEUE:len(UAMQ); +i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index e3322f3f64..98582c8117 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -25,6 +25,8 @@ %%---------------------------------------------------------------------------- +-define(QUEUE, lqueue). + -define(UNSENT_MESSAGE_LIMIT, 200). %% Utilisation average calculations are all in μs. @@ -122,7 +124,7 @@ consumers(Consumers, Acc) -> count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> - lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). + lists:sum([?QUEUE:len(C#cr.acktags) || C <- all_ch_record()]). add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty, Username, State = #state{consumers = Consumers, @@ -179,7 +181,7 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> All = priority_queue:join(Consumers, BlockedQ), ok = erase_ch_record(C), Filtered = priority_queue:filter(chan_pred(ChPid, true), All), - {[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)], + {[AckTag || {AckTag, _CTag} <- ?QUEUE:to_list(ChAckTags)], tags(priority_queue:to_list(Filtered)), State#state{consumers = remove_consumers(ChPid, Consumers)}} end. @@ -237,7 +239,7 @@ deliver_to_consumer(FetchFun, rabbit_channel:deliver(ChPid, CTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of - true -> queue:in({AckTag, CTag}, ChAckTags); + true -> ?QUEUE:in({AckTag, CTag}, ChAckTags); false -> ChAckTags end, update_ch_record(C#cr{acktags = ChAckTags1, @@ -246,7 +248,7 @@ deliver_to_consumer(FetchFun, record_ack(ChPid, LimiterPid, AckTag) -> C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), - update_ch_record(C#cr{acktags = queue:in({AckTag, none}, ChAckTags)}), + update_ch_record(C#cr{acktags = ?QUEUE:in({AckTag, none}, ChAckTags)}), ok. subtract_acks(ChPid, AckTags, State) -> @@ -274,9 +276,9 @@ subtract_acks(ChPid, AckTags, State) -> subtract_acks([], [], CTagCounts, AckQ) -> {CTagCounts, AckQ}; subtract_acks([], Prefix, CTagCounts, AckQ) -> - {CTagCounts, queue:join(queue:from_list(lists:reverse(Prefix)), AckQ)}; + {CTagCounts, ?QUEUE:join(?QUEUE:from_list(lists:reverse(Prefix)), AckQ)}; subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) -> - case queue:out(AckQ) of + case ?QUEUE:out(AckQ) of {{value, {T, CTag}}, QTail} -> subtract_acks(TL, Prefix, maps:update_with(CTag, fun (Old) -> Old + 1 end, 1, CTagCounts), QTail); @@ -380,7 +382,7 @@ ch_record(ChPid, LimiterPid) -> Limiter = rabbit_limiter:client(LimiterPid), C = #cr{ch_pid = ChPid, monitor_ref = MonitorRef, - acktags = queue:new(), + acktags = ?QUEUE:new(), consumer_count = 0, blocked_consumers = priority_queue:new(), limiter = Limiter, @@ -393,7 +395,7 @@ ch_record(ChPid, LimiterPid) -> update_ch_record(C = #cr{consumer_count = ConsumerCount, acktags = ChAckTags, unsent_message_count = UnsentMessageCount}) -> - case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of + case {?QUEUE:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of {true, 0, 0} -> ok = erase_ch_record(C); _ -> ok = store_ch_record(C) end, |
