diff options
| -rw-r--r-- | src/rabbit_channel.erl | 40 |
1 files changed, 20 insertions, 20 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 51e550ed15..710097477a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -39,10 +39,10 @@ %% callbacks -export([init/2, handle_message/2]). --record(ch, {state, proxy_pid, reader_pid, writer_pid, +-record(ch, {state, proxy_pid, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, - username, virtual_host, limiter, + username, virtual_host, most_recently_declared_queue, consumer_mapping}). %%---------------------------------------------------------------------------- @@ -100,6 +100,7 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> proxy_pid = ProxyPid, reader_pid = ReaderPid, writer_pid = WriterPid, + limiter_pid = undefined, transaction_id = none, tx_participants = sets:new(), next_tag = 1, @@ -108,7 +109,6 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - limiter = undefined, consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> @@ -291,7 +291,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), Participants = ack(State#ch.proxy_pid, TxnKey, Acked), {noreply, case TxnKey of - none -> ok = notify_limiter(State#ch.limiter, Acked), + none -> ok = notify_limiter(State#ch.limiter_pid, Acked), State#ch{unacked_message_q = Remaining}; _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, Acked), @@ -336,7 +336,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{ proxy_pid = ProxyPid, reader_pid = ReaderPid, - limiter = LimiterPid, + limiter_pid = LimiterPid, consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -430,23 +430,23 @@ handle_method(#'basic.qos'{prefetch_size = Size}, [Size]); handle_method(#'basic.qos'{prefetch_count = 0}, - _, State = #ch{ limiter = undefined }) -> + _, State = #ch{ limiter_pid = undefined }) -> {reply, #'basic.qos_ok'{}, State}; handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{ limiter = Limiter, + _, State = #ch{ limiter_pid = LimiterPid, proxy_pid = ProxyPid }) -> %% TODO: terminate limiter when transitioning to 'unlimited' - NewLimiter = case Limiter of - undefined -> - LPid = rabbit_limiter:start_link(ProxyPid), - ok = limit_queues(LPid, State), - LPid; - LPid -> - LPid - end, - ok = rabbit_limiter:limit(NewLimiter, PrefetchCount), - {reply, #'basic.qos_ok'{}, State#ch{limiter = NewLimiter}}; + NewLimiterPid = case LimiterPid of + undefined -> + LPid = rabbit_limiter:start_link(ProxyPid), + ok = limit_queues(LPid, State), + LPid; + LPid -> + LPid + end, + ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), + {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, @@ -792,7 +792,7 @@ internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of - ok -> ok = notify_limiter(State#ch.limiter, + ok -> ok = notify_limiter(State#ch.limiter_pid, State#ch.uncommitted_ack_q), new_tx(State); {error, Errors} -> rabbit_misc:protocol_error( @@ -854,12 +854,12 @@ consumer_queues(Consumers) -> %% for messages sent in a response to a basic.get notify_limiter(undefined, _Acked) -> ok; -notify_limiter(Limiter, Acked) -> +notify_limiter(LimiterPid, Acked) -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; ({_, _, _}, Acc) -> Acc + 1 end, 0, queue:to_list(Acked)) of 0 -> ok; - Count -> rabbit_limiter:ack(Limiter, Count) + Count -> rabbit_limiter:ack(LimiterPid, Count) end. is_message_persistent(#content{properties = #'P_basic'{ |
