diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-03-03 13:18:34 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-03-03 13:18:34 +0000 |
| commit | 96ddd6baec383e6a8f97b29497e719b68f3880f9 (patch) | |
| tree | affcea75965227e7c916352e225252957b0fed63 | |
| parent | 9c957923ec5228a35f8094e20f321b46b61c70e3 (diff) | |
| download | rabbitmq-server-git-96ddd6baec383e6a8f97b29497e719b68f3880f9.tar.gz | |
Implemented the rest
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 45 |
3 files changed, 47 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 99fd5d76c0..285445f278 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -40,7 +40,7 @@ -export([consumers/1, consumers_all/1]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2]). +-export([notify_sent/2, unblock/2, invoke/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -107,6 +107,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(invoke/2 :: (pid(), (fun ((pid()) -> any()))) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -334,6 +335,9 @@ notify_sent(QPid, ChPid) -> unblock(QPid, ChPid) -> gen_server2:pcast(QPid, 7, {unblock, ChPid}). +invoke(QPid, Fun) -> + gen_server2:cast(QPid, {invoke, Fun}). + internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e4791f9524..269148672f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -826,7 +826,11 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + end)); + +handle_cast({invoke, Fun}, State) -> + Fun(self()), + noreply(State). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 48e204d84a..281cdf2712 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -45,8 +45,9 @@ -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, - username, virtual_host, - most_recently_declared_queue, consumer_mapping}). + username, virtual_host, most_recently_declared_queue, + consumer_mapping, blocking + }). -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). @@ -152,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}, + consumer_mapping = dict:new(), + blocking = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -190,6 +192,9 @@ handle_cast({method, Method, Content}, State) -> {stop, {Reason, erlang:get_stacktrace()}, State} end; +handle_cast({from_queue, QPid}, State) -> + {noreply, queue_blocked(QPid, State)}; + handle_cast(terminate, State) -> {stop, normal, State}; @@ -217,7 +222,9 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, handle_info({'EXIT', _OldLimiterPid, normal}, State) -> {noreply, State}; handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}. + {stop, Reason, State}; +handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> + {noreply, queue_blocked(QPid, State)}. handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -333,6 +340,22 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) -> check_name(_Kind, NameBin) -> NameBin. +queue_blocked(QPid, State = #ch{blocking = Blocking}) -> + case dict:find(QPid, Blocking) of + error -> State; + {ok, MRef} -> true = erlang:demonitor(MRef), + Blocking1 = dict:erase(QPid, Blocking), + ok = case dict:size(Blocking1) =:= 0 of + true -> + rabbit_writer:send_command( + State#ch.writer_pid, + #'channel.flow_ok'{active = false}); + false -> + ok + end, + State#ch{blocking = Blocking1} + end. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -795,15 +818,21 @@ handle_method(#'channel.flow'{active = true}, _, State#ch{limiter_pid = LimiterPid1}}; handle_method(#'channel.flow'{active = false}, _, - State = #ch{limiter_pid = LimiterPid}) -> + State = #ch{limiter_pid = LimiterPid, + consumer_mapping = Consumers}) -> LimiterPid1 = case LimiterPid =:= undefined of true -> start_limiter(State); false -> LimiterPid end, ok = rabbit_limiter:block(LimiterPid1), - %% FIXME: need to go and notify the queues and not reply now - {reply, #'channel.flow_ok'{active = false}, - State#ch{limiter_pid = LimiterPid1}}; + Me = self(), + Fun = fun(QPid) -> gen_server2:cast(Me, {from_queue, QPid}) end, + Queues = [begin MRef = erlang:monitor(process, QPid), + rabbit_amqqueue:invoke(QPid, Fun), + {QPid, MRef} + end || QPid <- consumer_queues(Consumers)], + {noreply, State#ch{limiter_pid = LimiterPid1, + blocking = dict:from_list(Queues)}}; handle_method(#'channel.flow_ok'{active = _}, _, State) -> %% TODO: We may want to correlate this to channel.flow messages we |
