summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-03-03 17:58:51 +0000
committerMatthias Radestock <matthias@lshift.net>2010-03-03 17:58:51 +0000
commitf53aee7bf6cc84caf3521c3945b87666c6574111 (patch)
treefd3c0180d5b72ed9f712ed51d130ca86314a61e0
parenta0e433961a44679244dbe3817b2c4abdba244be6 (diff)
parent7a17fac3276320fd9b49021c1d9bd81dd7dd7203 (diff)
downloadrabbitmq-server-git-f53aee7bf6cc84caf3521c3945b87666c6574111.tar.gz
merge bug22423 into default
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_channel.erl101
-rw-r--r--src/rabbit_limiter.erl77
4 files changed, 149 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 99fd5d76c0..31787466bb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,7 +40,7 @@
-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2]).
+-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -107,6 +107,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -334,6 +335,12 @@ notify_sent(QPid, ChPid) ->
unblock(QPid, ChPid) ->
gen_server2:pcast(QPid, 7, {unblock, ChPid}).
+flush_all(QPids, ChPid) ->
+ safe_pmap_ok(
+ fun (_) -> ok end,
+ fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end,
+ QPids).
+
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e4791f9524..19cb5c711f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -826,7 +826,11 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end)).
+ end));
+
+handle_cast({flush, ChPid}, State) ->
+ ok = rabbit_channel:flushed(ChPid, self()),
+ noreply(State).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a56955cb96..41085fb738 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,7 +36,7 @@
-behaviour(gen_server2).
-export([start_link/5, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, conserve_memory/2]).
+-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([init/1, terminate/2, code_change/3,
@@ -45,8 +45,8 @@
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
- username, virtual_host,
- most_recently_declared_queue, consumer_mapping}).
+ username, virtual_host, most_recently_declared_queue,
+ consumer_mapping, blocking}).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
@@ -77,6 +77,7 @@
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
@@ -112,6 +113,9 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
conserve_memory(Pid, Conserve) ->
gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}).
+flushed(Pid, QPid) ->
+ gen_server2:cast(Pid, {flushed, QPid}).
+
list() ->
pg_local:get_members(rabbit_channels).
@@ -152,7 +156,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new()},
+ consumer_mapping = dict:new(),
+ blocking = dict:new()},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -190,6 +195,9 @@ handle_cast({method, Method, Content}, State) ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
+handle_cast({flushed, QPid}, State) ->
+ {noreply, queue_blocked(QPid, State)};
+
handle_cast(terminate, State) ->
{stop, normal, State};
@@ -215,7 +223,9 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
{stop, normal, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State}.
+ {stop, Reason, State};
+handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ {noreply, queue_blocked(QPid, State)}.
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -331,6 +341,20 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
check_name(_Kind, NameBin) ->
NameBin.
+queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
+ case dict:find(QPid, Blocking) of
+ error -> State;
+ {ok, MRef} -> true = erlang:demonitor(MRef),
+ Blocking1 = dict:erase(QPid, Blocking),
+ ok = case dict:size(Blocking1) of
+ 0 -> rabbit_writer:send_command(
+ State#ch.writer_pid,
+ #'channel.flow_ok'{active = false});
+ _ -> ok
+ end,
+ State#ch{blocking = Blocking1}
+ end.
+
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -540,25 +564,17 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
"prefetch_size!=0 (~w)", [Size]);
handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
- _, State = #ch{ limiter_pid = LimiterPid,
- unacked_message_q = UAMQ }) ->
- NewLimiterPid = case {LimiterPid, PrefetchCount} of
- {undefined, 0} ->
- undefined;
- {undefined, _} ->
- LPid = rabbit_limiter:start_link(self(),
- queue:len(UAMQ)),
- ok = limit_queues(LPid, State),
- LPid;
- {_, 0} ->
- ok = rabbit_limiter:shutdown(LimiterPid),
- ok = limit_queues(undefined, State),
- undefined;
- {_, _} ->
- LimiterPid
- end,
- ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount),
- {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}};
+ _, State = #ch{limiter_pid = LimiterPid}) ->
+ LimiterPid1 = case {LimiterPid, PrefetchCount} of
+ {undefined, 0} -> undefined;
+ {undefined, _} -> start_limiter(State);
+ {_, _} -> LimiterPid
+ end,
+ LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of
+ ok -> LimiterPid1;
+ stopped -> unlimit_queues(State)
+ end,
+ {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
handle_method(#'basic.recover'{requeue = true},
_, State = #ch{ transaction_id = none,
@@ -791,9 +807,31 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
-handle_method(#'channel.flow'{active = _}, _, State) ->
- %% FIXME: implement
- {reply, #'channel.flow_ok'{active = true}, State};
+handle_method(#'channel.flow'{active = true}, _,
+ State = #ch{limiter_pid = LimiterPid}) ->
+ LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
+ ok -> LimiterPid;
+ stopped -> unlimit_queues(State)
+ end,
+ {reply, #'channel.flow_ok'{active = true},
+ State#ch{limiter_pid = LimiterPid1}};
+
+handle_method(#'channel.flow'{active = false}, _,
+ State = #ch{limiter_pid = LimiterPid,
+ consumer_mapping = Consumers}) ->
+ LimiterPid1 = case LimiterPid of
+ undefined -> start_limiter(State);
+ Other -> Other
+ end,
+ ok = rabbit_limiter:block(LimiterPid1),
+ QPids = consumer_queues(Consumers),
+ Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids],
+ ok = rabbit_amqqueue:flush_all(QPids, self()),
+ case Queues of
+ [] -> {reply, #'channel.flow_ok'{active = false}, State};
+ _ -> {noreply, State#ch{limiter_pid = LimiterPid1,
+ blocking = dict:from_list(Queues)}}
+ end;
handle_method(#'channel.flow_ok'{active = _}, _, State) ->
%% TODO: We may want to correlate this to channel.flow messages we
@@ -940,9 +978,18 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
+start_limiter(State = #ch{unacked_message_q = UAMQ}) ->
+ LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)),
+ ok = limit_queues(LPid, State),
+ LPid.
+
notify_queues(#ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()).
+unlimit_queues(State) ->
+ ok = limit_queues(undefined, State),
+ undefined.
+
limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index c9f8183fc9..7d84086108 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -37,7 +37,7 @@
handle_info/2]).
-export([start_link/2, shutdown/1]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
--export([get_limit/1]).
+-export([get_limit/1, block/1, unblock/1]).
%%----------------------------------------------------------------------------
@@ -47,12 +47,14 @@
-spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
--spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
+-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
+-spec(block/1 :: (maybe_pid()) -> 'ok').
+-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped').
-endif.
@@ -60,6 +62,7 @@
-record(lim, {prefetch_count = 0,
ch_pid,
+ blocked = false,
queues = dict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
@@ -77,13 +80,14 @@ start_link(ChPid, UnackedMsgCount) ->
shutdown(undefined) ->
ok;
shutdown(LimiterPid) ->
- unlink(LimiterPid),
+ true = unlink(LimiterPid),
gen_server2:cast(LimiterPid, shutdown).
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- gen_server2:cast(LimiterPid, {limit, PrefetchCount}).
+ unlink_on_stopped(LimiterPid,
+ gen_server2:call(LimiterPid, {limit, PrefetchCount})).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
@@ -113,6 +117,17 @@ get_limit(Pid) ->
fun () -> 0 end,
fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end).
+block(undefined) ->
+ ok;
+block(LimiterPid) ->
+ gen_server2:call(LimiterPid, block, infinity).
+
+unblock(undefined) ->
+ ok;
+unblock(LimiterPid) ->
+ unlink_on_stopped(LimiterPid,
+ gen_server2:call(LimiterPid, unblock, infinity)).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -120,29 +135,45 @@ get_limit(Pid) ->
init([ChPid, UnackedMsgCount]) ->
{ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}.
+handle_call({can_send, _QPid, _AckRequired}, _From,
+ State = #lim{blocked = true}) ->
+ {reply, false, State};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
case limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
- true -> Volume
- end}}
+ false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end}}
end;
handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
- {reply, PrefetchCount, State}.
+ {reply, PrefetchCount, State};
+
+handle_call({limit, PrefetchCount}, _From, State) ->
+ case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of
+ {cont, State1} -> {reply, ok, State1};
+ {stop, State1} -> {stop, normal, stopped, State1}
+ end;
+
+handle_call(block, _From, State) ->
+ {reply, ok, State#lim{blocked = true}};
+
+handle_call(unblock, _From, State) ->
+ case maybe_notify(State, State#lim{blocked = false}) of
+ {cont, State1} -> {reply, ok, State1};
+ {stop, State1} -> {stop, normal, stopped, State1}
+ end.
handle_cast(shutdown, State) ->
{stop, normal, State};
-handle_cast({limit, PrefetchCount}, State) ->
- {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})};
-
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
end,
- {noreply, maybe_notify(State, State#lim{volume = NewVolume})};
+ {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}),
+ {noreply, State1};
handle_cast({register, QPid}, State) ->
{noreply, remember_queue(QPid, State)};
@@ -164,14 +195,21 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case limit_reached(OldState) andalso not(limit_reached(NewState)) of
- true -> notify_queues(NewState);
- false -> NewState
+ case (limit_reached(OldState) orelse is_blocked(OldState)) andalso
+ not (limit_reached(NewState) orelse is_blocked(NewState)) of
+ true -> NewState1 = notify_queues(NewState),
+ {case NewState1#lim.prefetch_count of
+ 0 -> stop;
+ _ -> cont
+ end, NewState1};
+ false -> {cont, NewState}
end.
limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
+is_blocked(#lim{blocked = Blocked}) -> Blocked.
+
remember_queue(QPid, State = #lim{queues = Queues}) ->
case dict:is_key(QPid, Queues) of
false -> MRef = erlang:monitor(process, QPid),
@@ -209,3 +247,12 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
ok
end,
State#lim{queues = NewQueues}.
+
+unlink_on_stopped(LimiterPid, stopped) ->
+ true = unlink(LimiterPid),
+ ok = receive {'EXIT', LimiterPid, _Reason} -> ok
+ after 0 -> ok
+ end,
+ stopped;
+unlink_on_stopped(_LimiterPid, Result) ->
+ Result.