diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 34 |
1 files changed, 16 insertions, 18 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b77d26a0c8..c586df0260 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -277,7 +277,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, - limiter = Limiter, next_tag = NextDeliveryTag, unacked_message_q = UAMQ}) -> if DeliveryTag >= NextDeliveryTag -> @@ -286,24 +285,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, true -> ok end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - % CC the limiter on the number of acks that have been received - % but don't include any acks from a basic.get bottom half - % (hence the differentiation between tags set to none and other tags) - % TODO - this is quite crude and is probably more expensive than it should - % be - according to the OTP documentation, len/1 runs in O(n), probably - % not so cool for a queuing system - NotBasicGet = queue:filter( - fun({_CurrentDeliveryTag, ConsumerTag, _Msg}) -> - case ConsumerTag of - none -> false; - _ -> true - end - end, Acked), - % TODO Optimization: Probably don't need to send this if len = 0 - rabbit_limiter:decrement_capacity(Limiter, queue:len(NotBasicGet)), Participants = ack(State#ch.proxy_pid, TxnKey, Acked), {noreply, case TxnKey of - none -> State#ch{unacked_message_q = Remaining}; + none -> ok = notify_limiter(State#ch.limiter, Acked), + State#ch{unacked_message_q = Remaining}; _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, Acked), add_tx_participants( @@ -789,7 +774,9 @@ internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of - ok -> new_tx(State); + ok -> ok = notify_limiter(State#ch.limiter, + State#ch.uncommitted_ack_q), + new_tx(State); {error, Errors} -> rabbit_misc:protocol_error( internal_error, "commit failed: ~w", [Errors]) end. @@ -840,6 +827,17 @@ notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> end], ProxyPid). +%% tell the limiter about the number of acks that have been received +%% for messages delivered to subscribed consumers, rather than those +%% for messages sent in a response to a basic.get +notify_limiter(Limiter, Acked) -> + case lists:foldl(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, queue:to_list(Acked)) of + 0 -> ok; + Count -> rabbit_limiter:decrement_capacity(Limiter, Count) + end. + is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> case Mode of |
