diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-19 13:38:27 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-19 13:38:27 +0000 |
| commit | cbbe139eaf0de198b54ffb56f207a9cc2f37d4b4 (patch) | |
| tree | dc43d81d1f8ea2d71b96d5d78623baffc9b96598 /src | |
| parent | 8614b6e32381b6b499483d689afd4db6c9bc3856 (diff) | |
| download | rabbitmq-server-git-cbbe139eaf0de198b54ffb56f207a9cc2f37d4b4.tar.gz | |
limiter API revision - part 2/2 - queue-side API
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 86 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 87 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 19 |
5 files changed, 123 insertions, 108 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index bd5de23923..be8ab38513 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,9 +26,9 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). --export([notify_down_all/2, limit_all/3]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). +-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). +-export([notify_down_all/2, activate_limit_all/2]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, @@ -144,19 +144,18 @@ -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). --spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:lstate()) -> - ok_or_errors()). +-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). --spec(basic_consume/7 :: - (rabbit_types:amqqueue(), boolean(), pid(), - rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any()) +-spec(basic_consume/8 :: + (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), + rabbit_types:ctag(), boolean(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). --spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(resume/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | @@ -538,16 +537,16 @@ notify_down_all(QPids, ChPid) -> Bads1 -> {error, Bads1} end. -limit_all(QPids, ChPid, Limiter) -> - delegate:cast(QPids, {limit, ChPid, Limiter}). +activate_limit_all(QPids, ChPid) -> + delegate:cast(QPids, {activate_limit, ChPid}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate:call(QPid, {basic_get, ChPid, NoAck}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, +basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive, ConsumerTag, ExclusiveConsume, OkMsg) -> - delegate:call(QPid, {basic_consume, NoAck, ChPid, - Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). + delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). @@ -569,7 +568,7 @@ notify_sent_queue_down(QPid) -> erase({consumer_credit_to, QPid}), ok. -unblock(QPid, ChPid) -> delegate:cast(QPid, {unblock, ChPid}). +resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}). flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0ddc9eba11..37daa0dfca 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -68,7 +68,6 @@ consumer_count, blocked_consumers, limiter, - is_limit_active, unsent_message_count}). %%---------------------------------------------------------------------------- @@ -371,7 +370,6 @@ ch_record(ChPid) -> acktags = queue:new(), consumer_count = 0, blocked_consumers = queue:new(), - is_limit_active = false, limiter = undefined, unsent_message_count = 0}, put(Key, C), @@ -392,30 +390,18 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> put({ch, ChPid}, C), ok. -erase_ch_record(#cr{ch_pid = ChPid, - limiter = Limiter, - monitor_ref = MonitorRef}) -> - ok = rabbit_limiter:unregister(Limiter), +erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> erlang:demonitor(MonitorRef), erase({ch, ChPid}), ok. -update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) -> - ok = rabbit_limiter:register(Limiter), - update_ch_record(C#cr{consumer_count = 1}); -update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) -> - ok = rabbit_limiter:unregister(Limiter), - update_ch_record(C#cr{consumer_count = 0, limiter = undefined}); -update_consumer_count(C = #cr{consumer_count = Count}, Delta) -> - update_ch_record(C#cr{consumer_count = Count + Delta}). - all_ch_record() -> [C || {{ch, _}, C} <- get()]. block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}). -is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> - Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. +is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> + Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). ch_record_state_transition(OldCR, NewCR) -> case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of @@ -439,18 +425,20 @@ deliver_msgs_to_consumers(DeliverFun, false, end. deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> - C = ch_record(ChPid), + C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required) of - false -> block_consumer(C#cr{is_limit_active = true}, E), - {false, State}; - true -> AC1 = queue:in(E, State#q.active_consumers), - deliver_msg_to_consumer( - DeliverFun, Consumer, C, - State#q{active_consumers = AC1}) + {suspend, Limiter} -> + block_consumer(C#cr{limiter = Limiter}, E), + {false, State}; + {continue, Limiter} -> + AC1 = queue:in(E, State#q.active_consumers), + deliver_msg_to_consumer( + DeliverFun, Consumer, C#cr{limiter = Limiter}, + State#q{active_consumers = AC1}) end end. @@ -1127,15 +1115,25 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply({ok, BQ:len(BQS), Msg}, State3) end; -handle_call({basic_consume, NoAck, ChPid, Limiter, +handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> - C = ch_record(ChPid), - update_consumer_count(C#cr{limiter = Limiter}, +1), + C = #cr{consumer_count = Count, + limiter = Limiter0} = ch_record(ChPid), + Limiter = case Limiter0 of + undefined -> rabbit_limiter:client(LimiterPid); + _ -> Limiter0 + end, + Limiter1 = case LimiterActive of + true -> rabbit_limiter:activate(Limiter); + false -> Limiter + end, + update_ch_record(C#cr{consumer_count = Count + 1, + limiter = Limiter1}), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -1156,10 +1154,18 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, case lookup_ch(ChPid) of not_found -> reply(ok, State); - C = #cr{blocked_consumers = Blocked} -> + C = #cr{consumer_count = Count, + limiter = Limiter, + blocked_consumers = Blocked} -> emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), - update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1), + Limiter1 = case Count of + 1 -> rabbit_limiter:forget(Limiter); + _ -> Limiter + end, + update_ch_record(C#cr{consumer_count = Count - 1, + limiter = Limiter1, + blocked_consumers = Blocked1}), State1 = State#q{ exclusive_consumer = case Holder of {ChPid, ConsumerTag} -> none; @@ -1287,10 +1293,13 @@ handle_cast({reject, AckTags, false, ChPid}, State) -> handle_cast(delete_immediately, State) -> stop(State); -handle_cast({unblock, ChPid}, State) -> +handle_cast({resume, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, - fun (C) -> C#cr{is_limit_active = false} end)); + fun (C = #cr{limiter = undefined}) -> C; + (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:resume(Limiter)} + end)); handle_cast({notify_sent, ChPid, Credit}, State) -> noreply( @@ -1299,20 +1308,13 @@ handle_cast({notify_sent, ChPid, Credit}, State) -> C#cr{unsent_message_count = Count - Credit} end)); -handle_cast({limit, ChPid, Limiter}, State) -> +handle_cast({activate_limit, ChPid}, State) -> noreply( possibly_unblock( State, ChPid, - fun (C = #cr{consumer_count = ConsumerCount, - limiter = OldLimiter, - is_limit_active = OldLimited}) -> - case (ConsumerCount =/= 0 andalso - not rabbit_limiter:is_active(OldLimiter)) of - true -> ok = rabbit_limiter:register(Limiter); - false -> ok - end, - Limited = OldLimited andalso rabbit_limiter:is_active(Limiter), - C#cr{limiter = Limiter, is_limit_active = Limited} + fun (C = #cr{limiter = Limiter, consumer_count = Count}) -> + true = Limiter =/= undefined andalso Count =/= 0, %% assertion + C#cr{limiter = rabbit_limiter:activate(Limiter)} end)); handle_cast({flush, ChPid}, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index cda5747ad4..005200f84c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -81,8 +81,8 @@ -spec(start_link/11 :: (channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), - rabbit_framing:amqp_table(), - pid(), pid()) -> rabbit_types:ok_pid_or_error()). + rabbit_framing:amqp_table(), pid(), pid()) -> + rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -728,7 +728,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> {rabbit_amqqueue:basic_consume( - Q, NoAck, self(), Limiter, + Q, NoAck, self(), + rabbit_limiter:pid(Limiter), + rabbit_limiter:is_active(Limiter), ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), @@ -1326,7 +1328,7 @@ maybe_limit_queues(OldLimiter, NewLimiter, State) -> case ((not rabbit_limiter:is_active(OldLimiter)) andalso rabbit_limiter:is_active(NewLimiter)) of true -> Queues = consumer_queues(State#ch.consumer_mapping), - rabbit_amqqueue:limit_all(Queues, self(), NewLimiter); + rabbit_amqqueue:activate_limit_all(Queues, self()); false -> ok end, State. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index ae6563286e..235c69c2a8 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -19,24 +19,27 @@ -behaviour(gen_server2). -export([start_link/0]). +%% channel API -export([new/1, limit/3, unlimit/1, block/1, unblock/1, - is_limited/1, is_blocked/1, is_active/1, get_limit/1, ack/2]). --export([can_send/2, register/1, unregister/1]). - + is_limited/1, is_blocked/1, is_active/1, get_limit/1, ack/2, pid/1]). +%% queue API +-export([client/1, activate/1, can_send/2, resume/1, forget/1, is_suspended/1]). +%% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). %%---------------------------------------------------------------------------- -record(lstate, {pid, limited, blocked}). +-record(qstate, {pid, state}). -ifdef(use_specs). --export_type([lstate/0]). - --opaque(lstate() :: #lstate {pid :: pid(), - limited :: boolean(), - blocked :: boolean()}). +-type(lstate() :: #lstate{pid :: pid(), + limited :: boolean(), + blocked :: boolean()}). +-type(qstate() :: #qstate{pid :: pid(), + state :: 'dormant' | 'active' | 'suspended'}). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(new/1 :: (pid()) -> lstate()). @@ -51,10 +54,15 @@ -spec(is_active/1 :: (lstate()) -> boolean()). -spec(get_limit/1 :: (lstate()) -> non_neg_integer()). -spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok'). +-spec(pid/1 :: (lstate()) -> pid()). --spec(can_send/2 :: (lstate(), boolean()) -> boolean()). --spec(register/1 :: (lstate()) -> 'ok'). --spec(unregister/1 :: (lstate()) -> 'ok'). +-spec(client/1 :: (pid()) -> qstate()). +-spec(activate/1 :: (qstate()) -> qstate()). +-spec(can_send/2 :: (qstate(), boolean()) -> + {'continue' | 'suspend', qstate()}). +-spec(resume/1 :: (qstate()) -> qstate()). +-spec(forget/1 :: (qstate()) -> undefined). +-spec(is_suspended/1 :: (qstate()) -> boolean()). -endif. @@ -108,29 +116,37 @@ get_limit(L) -> gen_server:call(L#lstate.pid, get_limit). ack(#lstate{limited = false}, _AckCount) -> ok; ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}). -%% Ask the limiter whether the queue can deliver a message without -%% breaching a limit. -can_send(L, AckRequired) -> - case is_active(L) of - false -> true; - true -> rabbit_misc:with_exit_handler( - fun () -> true end, - fun () -> Msg = {can_send, self(), AckRequired}, - gen_server2:call(L#lstate.pid, Msg, infinity) - end) - end. +pid(#lstate{pid = Pid}) -> Pid. -register(L) -> - case is_active(L) of - false -> ok; - true -> gen_server:cast(L#lstate.pid, {register, self()}) - end. +client(Pid) -> #qstate{pid = Pid, state = dormant}. -unregister(L) -> - case is_active(L) of - false -> ok; - true -> gen_server:cast(L#lstate.pid, {unregister, self()}) - end. +activate(L = #qstate{state = dormant}) -> + ok = gen_server:cast(L#qstate.pid, {register, self()}), + L#qstate{state = active}; +activate(L) -> L. + +%% Ask the limiter whether the queue can deliver a message without +%% breaching a limit. +can_send(L = #qstate{state = active}, AckRequired) -> + rabbit_misc:with_exit_handler( + fun () -> {continue, L} end, + fun () -> Msg = {can_send, self(), AckRequired}, + case gen_server2:call(L#qstate.pid, Msg, infinity) of + true -> {continue, L}; + false -> {suspend, L#qstate{state = suspended}} + end + end); +can_send(L, _AckRequired) -> {continue, L}. + +resume(L) -> L#qstate{state = active}. + +forget(#qstate{state = dormant}) -> undefined; +forget(L) -> + ok = gen_server:cast(L#qstate.pid, {unregister, self()}), + undefined. + +is_suspended(#qstate{state = suspended}) -> true; +is_suspended(#qstate{}) -> false. %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -220,10 +236,9 @@ remember_queue(QPid, State = #lim{queues = Queues}) -> true -> State end. -forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> +forget_queue(QPid, State = #lim{queues = Queues}) -> case orddict:find(QPid, Queues) of {ok, {MRef, _}} -> true = erlang:demonitor(MRef), - ok = rabbit_amqqueue:unblock(QPid, ChPid), State#lim{queues = orddict:erase(QPid, Queues)}; error -> State end. @@ -240,13 +255,13 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> end, {[], Queues}, Queues), case length(QList) of 0 -> ok; - 1 -> ok = rabbit_amqqueue:unblock(hd(QList), ChPid); %% common case + 1 -> ok = rabbit_amqqueue:resume(hd(QList), ChPid); %% common case L -> %% We randomly vary the position of queues in the list, %% thus ensuring that each queue has an equal chance of %% being notified first. {L1, L2} = lists:split(random:uniform(L), QList), - [[ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L3] + [[ok = rabbit_amqqueue:resume(Q, ChPid) || Q <- L3] || L3 <- [L2, L1]], ok end, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1188c5549a..31a56ac80c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1100,20 +1100,14 @@ test_policy_validation() -> test_server_status() -> %% create a few things so there is some useful information to list - Writer = spawn(fun test_writer/0), - {ok, Ch} = rabbit_channel:start_link( - 1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1, - user(<<"user">>), <<"/">>, [], self(), - rabbit_limiter:make_token(self())), + {_Writer, Limiter, Ch} = test_channel(), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, Name), false, false, [], none)]], - ok = rabbit_amqqueue:basic_consume( - Q, true, Ch, rabbit_limiter:make_token(), - <<"ctag">>, true, undefined), + Q, true, Ch, Limiter, false, <<"ctag">>, true, undefined), %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), @@ -1191,8 +1185,6 @@ find_listener() -> N =:= node()], {H, P}. -test_writer() -> test_writer(none). - test_writer(Pid) -> receive {'$gen_call', From, flush} -> gen_server:reply(From, ok), @@ -1202,13 +1194,18 @@ test_writer(Pid) -> shutdown -> ok end. -test_spawn() -> +test_channel() -> Me = self(), Writer = spawn(fun () -> test_writer(Me) end), + {ok, Limiter} = rabbit_limiter:start_link(), {ok, Ch} = rabbit_channel:start_link( 1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1, user(<<"guest">>), <<"/">>, [], Me, rabbit_limiter:make_token(self())), + {Writer, Limiter, Ch}. + +test_spawn() -> + {Writer, _Limiter, Ch} = test_channel(), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok) |
