diff options
| author | Matthias Radestock <matthias@lshift.net> | 2008-12-24 14:57:47 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2008-12-24 14:57:47 +0000 |
| commit | 0dce1d0a71b9add835f722e38419760428b69d92 (patch) | |
| tree | e32ac67dc7a53659004ce81275d4f388d4373944 /src | |
| parent | d9dcfb42bd6763bd066d53fb2f1effc675eb14c6 (diff) | |
| download | rabbitmq-server-git-0dce1d0a71b9add835f722e38419760428b69d92.tar.gz | |
destroy limiter when a channel becomes unlimited
which results in far more efficient handling of subsequent deliveries
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 10 |
3 files changed, 38 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c6bb0502d8..c49b06e5d6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -784,17 +784,20 @@ handle_cast({notify_sent, ChPid}, State) -> end)); handle_cast({limit, ChPid, LimiterPid}, State) -> - case lookup_ch(ChPid) of - not_found -> - ok; - C = #cr{consumers = Consumers} -> - if Consumers =/= [] -> - ok = rabbit_limiter:register(LimiterPid, self()); - true -> ok - end, - store_ch_record(C#cr{limiter_pid = LimiterPid}) - end, - noreply(State). + noreply( + possibly_unblock( + State, ChPid, + fun (C = #cr{consumers = Consumers, + limiter_pid = OldLimiterPid, + is_limit_active = Limited}) -> + if Consumers =/= [] andalso OldLimiterPid == undefined -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> + ok + end, + NewLimited = Limited andalso LimiterPid =/= undefined, + C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} + end)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e7678cdf68..a4bfacbb3f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -154,6 +154,12 @@ handle_message({conserve_memory, Conserve}, State) -> State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), State; +handle_message({'EXIT', Pid, Reason}, State = #ch{proxy_pid = Pid}) -> + terminate(Reason, State); + +handle_message({'EXIT', _Pid, normal}, State) -> + State; + handle_message({'EXIT', _Pid, Reason}, State) -> terminate(Reason, State); @@ -431,21 +437,22 @@ handle_method(#'basic.qos'{prefetch_size = Size}, "Pre-fetch size (~s) for basic.qos not implementented", [Size]); -handle_method(#'basic.qos'{prefetch_count = 0}, - _, State = #ch{ limiter_pid = undefined }) -> - {reply, #'basic.qos_ok'{}, State}; - handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, State = #ch{ limiter_pid = LimiterPid, proxy_pid = ProxyPid }) -> - %% TODO: terminate limiter when transitioning to 'unlimited' - NewLimiterPid = case LimiterPid of - undefined -> + NewLimiterPid = case {LimiterPid, PrefetchCount} of + {undefined, 0} -> + undefined; + {undefined, _} -> LPid = rabbit_limiter:start_link(ProxyPid), ok = limit_queues(LPid, State), LPid; - LPid -> - LPid + {_, 0} -> + ok = rabbit_limiter:shutdown(LimiterPid), + ok = limit_queues(undefined, State), + undefined; + {_, _} -> + LimiterPid end, ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 6ffa8c23b1..3e09bb3756 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -72,13 +72,19 @@ shutdown(undefined) -> shutdown(LimiterPid) -> gen_server:cast(LimiterPid, shutdown). +limit(undefined, 0) -> + ok; limit(LimiterPid, PrefetchCount) -> gen_server:cast(LimiterPid, {limit, PrefetchCount}). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit -can_send(undefined, _QPid) -> true; -can_send(LimiterPid, QPid) -> gen_server:call(LimiterPid, {can_send, QPid}). +can_send(undefined, _QPid) -> + true; +can_send(LimiterPid, QPid) -> + rabbit_misc:with_exit_handler( + fun () -> true end, + fun () -> gen_server:call(LimiterPid, {can_send, QPid}) end). %% Let the limiter know that the channel has received some acks from a %% consumer |
