summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-12-23 21:14:46 +0000
committerMatthias Radestock <matthias@lshift.net>2008-12-23 21:14:46 +0000
commitd3cf883b712e6f5f815aaa8dbcbf993a5ed57ef1 (patch)
tree308e9952d6626d0a98990ae446edab49528a2124 /src
parent699abf69df84406d5a56f6851353d9a608b8d860 (diff)
downloadrabbitmq-server-git-d3cf883b712e6f5f815aaa8dbcbf993a5ed57ef1.tar.gz
cosmetic
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl40
1 files changed, 20 insertions, 20 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 51e550ed15..710097477a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -39,10 +39,10 @@
%% callbacks
-export([init/2, handle_message/2]).
--record(ch, {state, proxy_pid, reader_pid, writer_pid,
+-record(ch, {state, proxy_pid, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
- username, virtual_host, limiter,
+ username, virtual_host,
most_recently_declared_queue, consumer_mapping}).
%%----------------------------------------------------------------------------
@@ -100,6 +100,7 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
proxy_pid = ProxyPid,
reader_pid = ReaderPid,
writer_pid = WriterPid,
+ limiter_pid = undefined,
transaction_id = none,
tx_participants = sets:new(),
next_tag = 1,
@@ -108,7 +109,6 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- limiter = undefined,
consumer_mapping = dict:new()}.
handle_message({method, Method, Content}, State) ->
@@ -291,7 +291,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
Participants = ack(State#ch.proxy_pid, TxnKey, Acked),
{noreply, case TxnKey of
- none -> ok = notify_limiter(State#ch.limiter, Acked),
+ none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
State#ch{unacked_message_q = Remaining};
_ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
Acked),
@@ -336,7 +336,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
nowait = NoWait},
_, State = #ch{ proxy_pid = ProxyPid,
reader_pid = ReaderPid,
- limiter = LimiterPid,
+ limiter_pid = LimiterPid,
consumer_mapping = ConsumerMapping }) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -430,23 +430,23 @@ handle_method(#'basic.qos'{prefetch_size = Size},
[Size]);
handle_method(#'basic.qos'{prefetch_count = 0},
- _, State = #ch{ limiter = undefined }) ->
+ _, State = #ch{ limiter_pid = undefined }) ->
{reply, #'basic.qos_ok'{}, State};
handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
- _, State = #ch{ limiter = Limiter,
+ _, State = #ch{ limiter_pid = LimiterPid,
proxy_pid = ProxyPid }) ->
%% TODO: terminate limiter when transitioning to 'unlimited'
- NewLimiter = case Limiter of
- undefined ->
- LPid = rabbit_limiter:start_link(ProxyPid),
- ok = limit_queues(LPid, State),
- LPid;
- LPid ->
- LPid
- end,
- ok = rabbit_limiter:limit(NewLimiter, PrefetchCount),
- {reply, #'basic.qos_ok'{}, State#ch{limiter = NewLimiter}};
+ NewLimiterPid = case LimiterPid of
+ undefined ->
+ LPid = rabbit_limiter:start_link(ProxyPid),
+ ok = limit_queues(LPid, State),
+ LPid;
+ LPid ->
+ LPid
+ end,
+ ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount),
+ {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}};
handle_method(#'basic.recover'{requeue = true},
_, State = #ch{ transaction_id = none,
@@ -792,7 +792,7 @@ internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
TxnKey) of
- ok -> ok = notify_limiter(State#ch.limiter,
+ ok -> ok = notify_limiter(State#ch.limiter_pid,
State#ch.uncommitted_ack_q),
new_tx(State);
{error, Errors} -> rabbit_misc:protocol_error(
@@ -854,12 +854,12 @@ consumer_queues(Consumers) ->
%% for messages sent in a response to a basic.get
notify_limiter(undefined, _Acked) ->
ok;
-notify_limiter(Limiter, Acked) ->
+notify_limiter(LimiterPid, Acked) ->
case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
({_, _, _}, Acc) -> Acc + 1
end, 0, queue:to_list(Acked)) of
0 -> ok;
- Count -> rabbit_limiter:ack(Limiter, Count)
+ Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
is_message_persistent(#content{properties = #'P_basic'{