diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-20 15:05:58 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-20 15:05:58 +0000 |
| commit | bdb5239b384f266424c6f3e1dbad4f28f49fc801 (patch) | |
| tree | b68a76385eb71630381a4e318268447af9e141ae /src | |
| parent | f3bfe1e4ab43c8317317be4b2159b81704505247 (diff) | |
| download | rabbitmq-server-git-bdb5239b384f266424c6f3e1dbad4f28f49fc801.tar.gz | |
always hold a valid limiter in queue's channel records
This is less fiddly, but does mean we have to pass the limiter pid in
basic_get.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 35 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 7 |
5 files changed, 48 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index be8ab38513..3f0a7f9c41 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,7 +26,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/3, basic_consume/8, basic_cancel/4]). +-export([basic_get/4, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). -export([notify_down_all/2, activate_limit_all/2]). -export([on_node_down/1]). @@ -145,7 +145,7 @@ -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()). --spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> +-spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/8 :: (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), @@ -540,8 +540,8 @@ notify_down_all(QPids, ChPid) -> activate_limit_all(QPids, ChPid) -> delegate:cast(QPids, {activate_limit, ChPid}). -basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - delegate:call(QPid, {basic_get, ChPid, NoAck}). +basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> + delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive, ConsumerTag, ExclusiveConsume, OkMsg) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d9264736d0..6fc79dca81 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -361,16 +361,17 @@ lookup_ch(ChPid) -> C -> C end. -ch_record(ChPid) -> +ch_record(ChPid, LimiterPid) -> Key = {ch, ChPid}, case get(Key) of undefined -> MonitorRef = erlang:monitor(process, ChPid), + Limiter = rabbit_limiter:client(LimiterPid), C = #cr{ch_pid = ChPid, monitor_ref = MonitorRef, acktags = queue:new(), consumer_count = 0, blocked_consumers = queue:new(), - limiter = undefined, + limiter = Limiter, unsent_message_count = 0}, put(Key, C), C; @@ -401,9 +402,7 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}). is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> - Count >= ?UNSENT_MESSAGE_LIMIT - orelse (Limiter =/= undefined andalso - rabbit_limiter:is_suspended(Limiter)). + Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; @@ -1090,7 +1089,7 @@ handle_call({notify_down, ChPid}, From, State) -> {stop, State1} -> stop(From, ok, State1) end; -handle_call({basic_get, ChPid, NoAck}, _From, +handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), @@ -1100,7 +1099,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, {{Message, IsDelivered, AckTag}, State2} -> State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + true -> C = #cr{acktags = ChAckTags} = + ch_record(ChPid, LimiterPid), ChAckTags1 = queue:in(AckTag, ChAckTags), update_ch_record(C#cr{acktags = ChAckTags1}), State2; @@ -1118,11 +1118,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, reply({error, exclusive_consume_unavailable}, State); ok -> C = #cr{consumer_count = Count, - limiter = Limiter0} = ch_record(ChPid), - Limiter = case Limiter0 of - undefined -> rabbit_limiter:client(LimiterPid); - _ -> Limiter0 - end, + limiter = Limiter} = ch_record(ChPid, LimiterPid), Limiter1 = case LimiterActive of true -> rabbit_limiter:activate(Limiter); false -> Limiter @@ -1155,7 +1151,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), Limiter1 = case Count of - 1 -> rabbit_limiter:forget(Limiter); + 1 -> rabbit_limiter:deactivate(Limiter); _ -> Limiter end, update_ch_record(C#cr{consumer_count = Count - 1, @@ -1291,8 +1287,7 @@ handle_cast(delete_immediately, State) -> handle_cast({resume, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, - fun (C = #cr{limiter = undefined}) -> C; - (C = #cr{limiter = Limiter}) -> + fun (C = #cr{limiter = Limiter}) -> C#cr{limiter = rabbit_limiter:resume(Limiter)} end)); @@ -1305,12 +1300,10 @@ handle_cast({notify_sent, ChPid, Credit}, State) -> handle_cast({activate_limit, ChPid}, State) -> noreply( - possibly_unblock( - State, ChPid, - fun (C = #cr{limiter = Limiter, consumer_count = Count}) -> - true = Limiter =/= undefined andalso Count =/= 0, %% assertion - C#cr{limiter = rabbit_limiter:activate(Limiter)} - end)); + possibly_unblock(State, ChPid, + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:activate(Limiter)} + end)); handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 17bf5c8363..67cabcfbfb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -676,12 +676,15 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{writer_pid = WriterPid, conn_pid = ConnPid, + limiter = Limiter, next_tag = DeliveryTag}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, - fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of + fun (Q) -> rabbit_amqqueue:basic_get( + Q, self(), NoAck, rabbit_limiter:pid(Limiter)) + end) of {ok, MessageCount, Msg = {QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 4059fdb089..b914306bd4 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -21,12 +21,17 @@ %% %% Each channel has an associated limiter process, created with %% start_link/1, which it passes to queues on consumer creation with -%% rabbit_amqqueue:basic_consume/8. This process holds state that is, -%% in effect, shared between the channel and all queues from which the -%% channel is consuming. Essentially all these queues are competing -%% for access to a single, limited resource - the ability to deliver -%% messages via the channel - and it is the job of the limiter process -%% to mediate that access. +%% rabbit_amqqueue:basic_consume/8, and rabbit_amqqueue:basic_get/4. +%% The latter isn't strictly necessary, since basic.get is not +%% subject to limiting, but it means that whenever a queue knows about +%% a channel, it also knows about its limiter, which is less fiddly. +%% +%% Th limiter process holds state that is, in effect, shared between +%% the channel and all queues from which the channel is +%% consuming. Essentially all these queues are competing for access to +%% a single, limited resource - the ability to deliver messages via +%% the channel - and it is the job of the limiter process to mediate +%% that access. %% %% The limiter process is separate from the channel process for two %% reasons: separation of concerns, and efficiency. Channels can get @@ -90,8 +95,10 @@ %% described in (5). %% %% 9. When a queues has no more consumers associated with a particular -%% channel, it unregisters with the limiter and forgets about it - -%% all via forget/1. +%% channel, it deactivates use of the limiter with deactivate/1, +%% which alters the local state such that no further interactions +%% with the limiter process take place until a subsequent +%% activate/1. -module(rabbit_limiter). @@ -102,7 +109,8 @@ -export([new/1, limit/3, unlimit/1, block/1, unblock/1, is_limited/1, is_blocked/1, is_active/1, get_limit/1, ack/2, pid/1]). %% queue API --export([client/1, activate/1, can_send/2, resume/1, forget/1, is_suspended/1]). +-export([client/1, activate/1, can_send/2, resume/1, deactivate/1, + is_suspended/1]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). @@ -140,7 +148,7 @@ -spec(can_send/2 :: (qstate(), boolean()) -> {'continue' | 'suspend', qstate()}). -spec(resume/1 :: (qstate()) -> qstate()). --spec(forget/1 :: (qstate()) -> undefined). +-spec(deactivate/1 :: (qstate()) -> qstate()). -spec(is_suspended/1 :: (qstate()) -> boolean()). -endif. @@ -219,10 +227,10 @@ can_send(L, _AckRequired) -> {continue, L}. resume(L) -> L#qstate{state = active}. -forget(#qstate{state = dormant}) -> undefined; -forget(L) -> +deactivate(L = #qstate{state = dormant}) -> L; +deactivate(L) -> ok = gen_server:cast(L#qstate.pid, {unregister, self()}), - undefined. + L#qstate{state = dormant}. is_suspended(#qstate{state = suspended}) -> true; is_suspended(#qstate{}) -> false. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b67be54404..d1ae38bef8 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2718,12 +2718,13 @@ test_queue_recover() -> end, rabbit_amqqueue:stop(), rabbit_amqqueue:start(rabbit_amqqueue:recover()), + {ok, Limiter} = rabbit_limiter:start_link(), rabbit_amqqueue:with_or_die( QName, fun (Q1 = #amqqueue { pid = QPid1 }) -> CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = - rabbit_amqqueue:basic_get(Q1, self(), false), + rabbit_amqqueue:basic_get(Q1, self(), false, Limiter), exit(QPid1, shutdown), VQ1 = variable_queue_init(Q, true), {{_Msg1, true, _AckTag1}, VQ2} = @@ -2744,9 +2745,11 @@ test_variable_queue_delete_msg_store_files_callback() -> rabbit_amqqueue:set_ram_duration_target(QPid, 0), + {ok, Limiter} = rabbit_limiter:start_link(), + CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} = - rabbit_amqqueue:basic_get(Q, self(), true), + rabbit_amqqueue:basic_get(Q, self(), true, Limiter), {ok, CountMinusOne} = rabbit_amqqueue:purge(Q), %% give the queue a second to receive the close_fds callback msg |
