summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-12-24 14:57:47 +0000
committerMatthias Radestock <matthias@lshift.net>2008-12-24 14:57:47 +0000
commit0dce1d0a71b9add835f722e38419760428b69d92 (patch)
treee32ac67dc7a53659004ce81275d4f388d4373944
parentd9dcfb42bd6763bd066d53fb2f1effc675eb14c6 (diff)
downloadrabbitmq-server-git-0dce1d0a71b9add835f722e38419760428b69d92.tar.gz
destroy limiter when a channel becomes unlimited
which results in far more efficient handling of subsequent deliveries
-rw-r--r--src/rabbit_amqqueue_process.erl25
-rw-r--r--src/rabbit_channel.erl25
-rw-r--r--src/rabbit_limiter.erl10
3 files changed, 38 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c6bb0502d8..c49b06e5d6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -784,17 +784,20 @@ handle_cast({notify_sent, ChPid}, State) ->
end));
handle_cast({limit, ChPid, LimiterPid}, State) ->
- case lookup_ch(ChPid) of
- not_found ->
- ok;
- C = #cr{consumers = Consumers} ->
- if Consumers =/= [] ->
- ok = rabbit_limiter:register(LimiterPid, self());
- true -> ok
- end,
- store_ch_record(C#cr{limiter_pid = LimiterPid})
- end,
- noreply(State).
+ noreply(
+ possibly_unblock(
+ State, ChPid,
+ fun (C = #cr{consumers = Consumers,
+ limiter_pid = OldLimiterPid,
+ is_limit_active = Limited}) ->
+ if Consumers =/= [] andalso OldLimiterPid == undefined ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true ->
+ ok
+ end,
+ NewLimited = Limited andalso LimiterPid =/= undefined,
+ C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
+ end)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e7678cdf68..a4bfacbb3f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -154,6 +154,12 @@ handle_message({conserve_memory, Conserve}, State) ->
State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
State;
+handle_message({'EXIT', Pid, Reason}, State = #ch{proxy_pid = Pid}) ->
+ terminate(Reason, State);
+
+handle_message({'EXIT', _Pid, normal}, State) ->
+ State;
+
handle_message({'EXIT', _Pid, Reason}, State) ->
terminate(Reason, State);
@@ -431,21 +437,22 @@ handle_method(#'basic.qos'{prefetch_size = Size},
"Pre-fetch size (~s) for basic.qos not implementented",
[Size]);
-handle_method(#'basic.qos'{prefetch_count = 0},
- _, State = #ch{ limiter_pid = undefined }) ->
- {reply, #'basic.qos_ok'{}, State};
-
handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
_, State = #ch{ limiter_pid = LimiterPid,
proxy_pid = ProxyPid }) ->
- %% TODO: terminate limiter when transitioning to 'unlimited'
- NewLimiterPid = case LimiterPid of
- undefined ->
+ NewLimiterPid = case {LimiterPid, PrefetchCount} of
+ {undefined, 0} ->
+ undefined;
+ {undefined, _} ->
LPid = rabbit_limiter:start_link(ProxyPid),
ok = limit_queues(LPid, State),
LPid;
- LPid ->
- LPid
+ {_, 0} ->
+ ok = rabbit_limiter:shutdown(LimiterPid),
+ ok = limit_queues(undefined, State),
+ undefined;
+ {_, _} ->
+ LimiterPid
end,
ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount),
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}};
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 6ffa8c23b1..3e09bb3756 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -72,13 +72,19 @@ shutdown(undefined) ->
shutdown(LimiterPid) ->
gen_server:cast(LimiterPid, shutdown).
+limit(undefined, 0) ->
+ ok;
limit(LimiterPid, PrefetchCount) ->
gen_server:cast(LimiterPid, {limit, PrefetchCount}).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
-can_send(undefined, _QPid) -> true;
-can_send(LimiterPid, QPid) -> gen_server:call(LimiterPid, {can_send, QPid}).
+can_send(undefined, _QPid) ->
+ true;
+can_send(LimiterPid, QPid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> true end,
+ fun () -> gen_server:call(LimiterPid, {can_send, QPid}) end).
%% Let the limiter know that the channel has received some acks from a
%% consumer