diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-19 20:21:50 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-19 20:21:50 +0000 |
| commit | f3bfe1e4ab43c8317317be4b2159b81704505247 (patch) | |
| tree | 7d2f95b74f64299d591c448931ad92315cbf3e27 /src | |
| parent | 30339e71c3f4a3b6c244ccf5195bba518b490861 (diff) | |
| download | rabbitmq-server-git-f3bfe1e4ab43c8317317be4b2159b81704505247.tar.gz | |
some documentation
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 79 |
2 files changed, 93 insertions, 1 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index eb248a4c11..17bf5c8363 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1089,6 +1089,17 @@ handle_method(#'channel.flow'{active = false}, _, false -> Limiter1 = rabbit_limiter:block(Limiter), State1 = maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1}), + %% The semantics of channel.flow{active=false} + %% require that no messages are delivered after the + %% channel.flow_ok has been sent. We accomplish that + %% by "flushing" all messages in flight from the + %% consumer queues to us. To do this we tell all the + %% queues to invoke rabbit_channel:flushed/2, which + %% will send us a {flushed, ...} message that appears + %% *after* all the {deliver, ...} messages. We keep + %% track of all the QPids thus asked, and once all of + %% them have responded (or died) we send the + %% channel.flow_ok. QPids = consumer_queues(Consumers), ok = rabbit_amqqueue:flush_all(QPids, self()), {noreply, maybe_send_flow_ok( @@ -1347,7 +1358,9 @@ consumer_queues(Consumers) -> %% messages sent in a response to a basic.get (identified by their %% 'none' consumer tag) notify_limiter(Limiter, Acked) -> - case rabbit_limiter:is_limited(Limiter) of + %% optimisation: avoid the potentially expensive 'foldl' in the + %% common case. + case rabbit_limiter:is_limited(Limiter) of false -> ok; true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; ({_, _, _}, Acc) -> Acc + 1 diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 235c69c2a8..4059fdb089 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -14,6 +14,85 @@ %% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% +%% The purpose of the limiter is to stem the flow of messages from +%% queues to channels, in order to act upon various protocol-level +%% flow control mechanisms, specifically AMQP's basic.qos +%% prefetch_count and channel.flow. +%% +%% Each channel has an associated limiter process, created with +%% start_link/1, which it passes to queues on consumer creation with +%% rabbit_amqqueue:basic_consume/8. This process holds state that is, +%% in effect, shared between the channel and all queues from which the +%% channel is consuming. Essentially all these queues are competing +%% for access to a single, limited resource - the ability to deliver +%% messages via the channel - and it is the job of the limiter process +%% to mediate that access. +%% +%% The limiter process is separate from the channel process for two +%% reasons: separation of concerns, and efficiency. Channels can get +%% very busy, particularly if they are also dealing with publishes. +%% With a separate limiter process all the aforementioned access +%% mediation can take place without touching the channel. +%% +%% For efficiency, both the channel and the queues keep some local +%% state, initialised from the limiter pid with new/1 and client/1, +%% respectively. In particular this allows them to avoid any +%% interaction with the limiter process when it is 'inactive', i.e. no +%% protocol-level flow control is taking place. +%% +%% This optimisation does come at the cost of some complexity though: +%% when a limiter becomes active, the channel needs to inform all its +%% consumer queues of this change in status. It does this by invoking +%% rabbit_amqqueue:activate_limit_all/2. Note that there is no inverse +%% transition, i.e. once a queue has been told about an active +%% limiter, it is not subsequently told when that limiter becomes +%% inactive. In practice it is rare for that to happen, though we +%% could optimise this case in the future. +%% +%% The interactions with the limiter are as follows: +%% +%% 1. Channels tell the limiter about basic.qos prefetch counts - +%% that's what the limit/3, unlimit/1, is_limited/1, get_limit/1 +%% API functions are about - and channel.flow blocking - that's +%% what block/1, unblock/1 and is_blocked/1 are for. +%% +%% 2. Queues register with the limiter - this happens as part of +%% activate/1. +%% +%% 4. The limiter process maintains an internal counter of 'messages +%% sent but not yet acknowledged', called the 'volume'. +%% +%% 5. Queues ask the limiter for permission (with can_send/2) whenever +%% they want to deliver a message to a channel. The limiter checks +%% whether a) the channel isn't blocked by channel.flow, and b) the +%% volume has not yet reached the prefetch limit. If so it +%% increments the volume and tells the queue to proceed. Otherwise +%% it marks the queue as requiring notification (see below) and +%% tells the queue not to proceed. +%% +%% 6. A queue that has told to proceed (by the return value of +%% can_send/2) sends the message to the channel. Conversely, a +%% queue that has been told not to proceed, will not attempt to +%% deliver that message, or any future messages, to the +%% channel. This is accomplished by can_send/2 capturing the +%% outcome in the local state, where it can be accessed with +%% is_suspended/1. +%% +%% 7. When a channel receives an ack it tells the limiter (via ack/2) +%% how many messages were ack'ed. The limiter process decrements +%% the volume and if it falls below the prefetch_count then it +%% notifies (through rabbit_amqqueue:resume/2) all the queues +%% requiring notification, i.e. all those that had a can_send/2 +%% request denied. +%% +%% 8. Upon receipt of such a notification, queues resume delivery to +%% the channel, i.e. they will once again start asking limiter, as +%% described in (5). +%% +%% 9. When a queues has no more consumers associated with a particular +%% channel, it unregisters with the limiter and forgets about it - +%% all via forget/1. + -module(rabbit_limiter). -behaviour(gen_server2). |
