summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-03-03 13:18:34 +0000
committerMatthew Sackman <matthew@lshift.net>2010-03-03 13:18:34 +0000
commit96ddd6baec383e6a8f97b29497e719b68f3880f9 (patch)
treeaffcea75965227e7c916352e225252957b0fed63
parent9c957923ec5228a35f8094e20f321b46b61c70e3 (diff)
downloadrabbitmq-server-git-96ddd6baec383e6a8f97b29497e719b68f3880f9.tar.gz
Implemented the rest
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_channel.erl45
3 files changed, 47 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 99fd5d76c0..285445f278 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,7 +40,7 @@
-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2]).
+-export([notify_sent/2, unblock/2, invoke/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -107,6 +107,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(invoke/2 :: (pid(), (fun ((pid()) -> any()))) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -334,6 +335,9 @@ notify_sent(QPid, ChPid) ->
unblock(QPid, ChPid) ->
gen_server2:pcast(QPid, 7, {unblock, ChPid}).
+invoke(QPid, Fun) ->
+ gen_server2:cast(QPid, {invoke, Fun}).
+
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e4791f9524..269148672f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -826,7 +826,11 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end)).
+ end));
+
+handle_cast({invoke, Fun}, State) ->
+ Fun(self()),
+ noreply(State).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 48e204d84a..281cdf2712 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -45,8 +45,9 @@
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
- username, virtual_host,
- most_recently_declared_queue, consumer_mapping}).
+ username, virtual_host, most_recently_declared_queue,
+ consumer_mapping, blocking
+ }).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
@@ -152,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new()},
+ consumer_mapping = dict:new(),
+ blocking = dict:new()},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -190,6 +192,9 @@ handle_cast({method, Method, Content}, State) ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
+handle_cast({from_queue, QPid}, State) ->
+ {noreply, queue_blocked(QPid, State)};
+
handle_cast(terminate, State) ->
{stop, normal, State};
@@ -217,7 +222,9 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
handle_info({'EXIT', _OldLimiterPid, normal}, State) ->
{noreply, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State}.
+ {stop, Reason, State};
+handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ {noreply, queue_blocked(QPid, State)}.
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -333,6 +340,22 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
check_name(_Kind, NameBin) ->
NameBin.
+queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
+ case dict:find(QPid, Blocking) of
+ error -> State;
+ {ok, MRef} -> true = erlang:demonitor(MRef),
+ Blocking1 = dict:erase(QPid, Blocking),
+ ok = case dict:size(Blocking1) =:= 0 of
+ true ->
+ rabbit_writer:send_command(
+ State#ch.writer_pid,
+ #'channel.flow_ok'{active = false});
+ false ->
+ ok
+ end,
+ State#ch{blocking = Blocking1}
+ end.
+
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -795,15 +818,21 @@ handle_method(#'channel.flow'{active = true}, _,
State#ch{limiter_pid = LimiterPid1}};
handle_method(#'channel.flow'{active = false}, _,
- State = #ch{limiter_pid = LimiterPid}) ->
+ State = #ch{limiter_pid = LimiterPid,
+ consumer_mapping = Consumers}) ->
LimiterPid1 = case LimiterPid =:= undefined of
true -> start_limiter(State);
false -> LimiterPid
end,
ok = rabbit_limiter:block(LimiterPid1),
- %% FIXME: need to go and notify the queues and not reply now
- {reply, #'channel.flow_ok'{active = false},
- State#ch{limiter_pid = LimiterPid1}};
+ Me = self(),
+ Fun = fun(QPid) -> gen_server2:cast(Me, {from_queue, QPid}) end,
+ Queues = [begin MRef = erlang:monitor(process, QPid),
+ rabbit_amqqueue:invoke(QPid, Fun),
+ {QPid, MRef}
+ end || QPid <- consumer_queues(Consumers)],
+ {noreply, State#ch{limiter_pid = LimiterPid1,
+ blocking = dict:from_list(Queues)}};
handle_method(#'channel.flow_ok'{active = _}, _, State) ->
%% TODO: We may want to correlate this to channel.flow messages we