diff options
| author | Matthias Radestock <matthias@lshift.net> | 2008-12-23 20:47:16 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2008-12-23 20:47:16 +0000 |
| commit | 699abf69df84406d5a56f6851353d9a608b8d860 (patch) | |
| tree | 23c59906cff7bed5746a1e39f3aec7cc93969a39 | |
| parent | fa2c6bbd15c90722180b82e92685bdb31f9e089e (diff) | |
| download | rabbitmq-server-git-699abf69df84406d5a56f6851353d9a608b8d860.tar.gz | |
deal with limiting after consumer subscription
| -rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 38 |
3 files changed, 43 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 24ded98cc0..a345f5ab58 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -40,7 +40,7 @@ -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2]). -export([unblock/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). @@ -92,6 +92,7 @@ -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). +-spec(limit_all/3 :: ([pid()], pid(), pid()) -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). @@ -261,6 +262,12 @@ notify_down_all(QPids, ChPid) -> fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). +limit_all(QPids, ChPid, LimiterPid) -> + safe_pmap_ok( + fun (_) -> ok end, + fun (QPid) -> gen_server:cast(QPid, {limit, ChPid, LimiterPid}) end, + QPids). + claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> gen_server:call(QPid, {claim_queue, ReaderPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c01f08df83..c6bb0502d8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -781,7 +781,20 @@ handle_cast({notify_sent, ChPid}, State) -> possibly_unblock(State, ChPid, fun (C = #cr{unsent_message_count = Count}) -> C#cr{unsent_message_count = Count - 1} - end)). + end)); + +handle_cast({limit, ChPid, LimiterPid}, State) -> + case lookup_ch(ChPid) of + not_found -> + ok; + C = #cr{consumers = Consumers} -> + if Consumers =/= [] -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> ok + end, + store_ch_record(C#cr{limiter_pid = LimiterPid}) + end, + noreply(State). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 001fa4af75..51e550ed15 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -439,11 +439,11 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, %% TODO: terminate limiter when transitioning to 'unlimited' NewLimiter = case Limiter of undefined -> - %% TODO: tell queues with subscribers about - %% the limiter - rabbit_limiter:start_link(ProxyPid); - Pid -> - Pid + LPid = rabbit_limiter:start_link(ProxyPid), + ok = limit_queues(LPid, State), + LPid; + LPid -> + LPid end, ok = rabbit_limiter:limit(NewLimiter, PrefetchCount), {reply, #'basic.qos_ok'{}, State#ch{limiter = NewLimiter}}; @@ -832,18 +832,22 @@ fold_per_queue(F, Acc0, UAQ) -> Acc0, D). notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - rabbit_amqqueue:notify_down_all( - [QPid || QueueName <- - sets:to_list( - dict:fold(fun (_ConsumerTag, QueueName, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)), - case rabbit_amqqueue:lookup(QueueName) of - {ok, Q} -> QPid = Q#amqqueue.pid, true; - %% queue has been deleted in the meantime - {error, not_found} -> QPid = none, false - end], - ProxyPid). + rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), ProxyPid). + +limit_queues(LPid, #ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> + rabbit_amqqueue:limit_all(consumer_queues(Consumers), ProxyPid, LPid). + +consumer_queues(Consumers) -> + [QPid || QueueName <- + sets:to_list( + dict:fold(fun (_ConsumerTag, QueueName, S) -> + sets:add_element(QueueName, S) + end, sets:new(), Consumers)), + case rabbit_amqqueue:lookup(QueueName) of + {ok, Q} -> QPid = Q#amqqueue.pid, true; + %% queue has been deleted in the meantime + {error, not_found} -> QPid = none, false + end]. %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, rather than those |
