summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl34
1 files changed, 16 insertions, 18 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b77d26a0c8..c586df0260 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -277,7 +277,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
- limiter = Limiter,
next_tag = NextDeliveryTag,
unacked_message_q = UAMQ}) ->
if DeliveryTag >= NextDeliveryTag ->
@@ -286,24 +285,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
true -> ok
end,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- % CC the limiter on the number of acks that have been received
- % but don't include any acks from a basic.get bottom half
- % (hence the differentiation between tags set to none and other tags)
- % TODO - this is quite crude and is probably more expensive than it should
- % be - according to the OTP documentation, len/1 runs in O(n), probably
- % not so cool for a queuing system
- NotBasicGet = queue:filter(
- fun({_CurrentDeliveryTag, ConsumerTag, _Msg}) ->
- case ConsumerTag of
- none -> false;
- _ -> true
- end
- end, Acked),
- % TODO Optimization: Probably don't need to send this if len = 0
- rabbit_limiter:decrement_capacity(Limiter, queue:len(NotBasicGet)),
Participants = ack(State#ch.proxy_pid, TxnKey, Acked),
{noreply, case TxnKey of
- none -> State#ch{unacked_message_q = Remaining};
+ none -> ok = notify_limiter(State#ch.limiter, Acked),
+ State#ch{unacked_message_q = Remaining};
_ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
Acked),
add_tx_participants(
@@ -789,7 +774,9 @@ internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
TxnKey) of
- ok -> new_tx(State);
+ ok -> ok = notify_limiter(State#ch.limiter,
+ State#ch.uncommitted_ack_q),
+ new_tx(State);
{error, Errors} -> rabbit_misc:protocol_error(
internal_error, "commit failed: ~w", [Errors])
end.
@@ -840,6 +827,17 @@ notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
end],
ProxyPid).
+%% tell the limiter about the number of acks that have been received
+%% for messages delivered to subscribed consumers, rather than those
+%% for messages sent in a response to a basic.get
+notify_limiter(Limiter, Acked) ->
+ case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, queue:to_list(Acked)) of
+ 0 -> ok;
+ Count -> rabbit_limiter:decrement_capacity(Limiter, Count)
+ end.
+
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
case Mode of