diff options
| author | Matthias Radestock <matthias@lshift.net> | 2008-12-23 16:28:22 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2008-12-23 16:28:22 +0000 |
| commit | fa2c6bbd15c90722180b82e92685bdb31f9e089e (patch) | |
| tree | 2b7a497e23759e73b38018f3bd0970ad7378e368 /src | |
| parent | b231622c007710b6bed38804b08d7c19aa1c7e62 (diff) | |
| download | rabbitmq-server-git-fa2c6bbd15c90722180b82e92685bdb31f9e089e.tar.gz | |
create limiter lazily
This makes an 'unlimited' channel as efficient as it used to be
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 2 |
2 files changed, 22 insertions, 5 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index af1923a79b..001fa4af75 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -108,8 +108,7 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - % TODO See point 3.1.1 of the design - start the limiter lazily - limiter = rabbit_limiter:start_link(ProxyPid), + limiter = undefined, consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> @@ -430,11 +429,25 @@ handle_method(#'basic.qos'{prefetch_size = Size}, "Pre-fetch size (~s) for basic.qos not implementented", [Size]); -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{limiter = Limiter}) -> - ok = rabbit_limiter:limit(Limiter, PrefetchCount), +handle_method(#'basic.qos'{prefetch_count = 0}, + _, State = #ch{ limiter = undefined }) -> {reply, #'basic.qos_ok'{}, State}; +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, + _, State = #ch{ limiter = Limiter, + proxy_pid = ProxyPid }) -> + %% TODO: terminate limiter when transitioning to 'unlimited' + NewLimiter = case Limiter of + undefined -> + %% TODO: tell queues with subscribers about + %% the limiter + rabbit_limiter:start_link(ProxyPid); + Pid -> + Pid + end, + ok = rabbit_limiter:limit(NewLimiter, PrefetchCount), + {reply, #'basic.qos_ok'{}, State#ch{limiter = NewLimiter}}; + handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, proxy_pid = ProxyPid, @@ -835,6 +848,8 @@ notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, rather than those %% for messages sent in a response to a basic.get +notify_limiter(undefined, _Acked) -> + ok; notify_limiter(Limiter, Acked) -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; ({_, _, _}, Acc) -> Acc + 1 diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index f1a45415ba..824de0724f 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -70,6 +70,8 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit +can_send(undefined, _QPid) -> + true; can_send(LimiterPid, QPid) -> gen_server:call(LimiterPid, {can_send, QPid}). |
