summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-03-19 20:21:50 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-03-19 20:21:50 +0000
commitf3bfe1e4ab43c8317317be4b2159b81704505247 (patch)
tree7d2f95b74f64299d591c448931ad92315cbf3e27 /src
parent30339e71c3f4a3b6c244ccf5195bba518b490861 (diff)
downloadrabbitmq-server-git-f3bfe1e4ab43c8317317be4b2159b81704505247.tar.gz
some documentation
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl15
-rw-r--r--src/rabbit_limiter.erl79
2 files changed, 93 insertions, 1 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index eb248a4c11..17bf5c8363 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1089,6 +1089,17 @@ handle_method(#'channel.flow'{active = false}, _,
false -> Limiter1 = rabbit_limiter:block(Limiter),
State1 = maybe_limit_queues(Limiter, Limiter1,
State#ch{limiter = Limiter1}),
+ %% The semantics of channel.flow{active=false}
+ %% require that no messages are delivered after the
+ %% channel.flow_ok has been sent. We accomplish that
+ %% by "flushing" all messages in flight from the
+ %% consumer queues to us. To do this we tell all the
+ %% queues to invoke rabbit_channel:flushed/2, which
+ %% will send us a {flushed, ...} message that appears
+ %% *after* all the {deliver, ...} messages. We keep
+ %% track of all the QPids thus asked, and once all of
+ %% them have responded (or died) we send the
+ %% channel.flow_ok.
QPids = consumer_queues(Consumers),
ok = rabbit_amqqueue:flush_all(QPids, self()),
{noreply, maybe_send_flow_ok(
@@ -1347,7 +1358,9 @@ consumer_queues(Consumers) ->
%% messages sent in a response to a basic.get (identified by their
%% 'none' consumer tag)
notify_limiter(Limiter, Acked) ->
- case rabbit_limiter:is_limited(Limiter) of
+ %% optimisation: avoid the potentially expensive 'foldl' in the
+ %% common case.
+ case rabbit_limiter:is_limited(Limiter) of
false -> ok;
true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
({_, _, _}, Acc) -> Acc + 1
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 235c69c2a8..4059fdb089 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -14,6 +14,85 @@
%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
+%% The purpose of the limiter is to stem the flow of messages from
+%% queues to channels, in order to act upon various protocol-level
+%% flow control mechanisms, specifically AMQP's basic.qos
+%% prefetch_count and channel.flow.
+%%
+%% Each channel has an associated limiter process, created with
+%% start_link/1, which it passes to queues on consumer creation with
+%% rabbit_amqqueue:basic_consume/8. This process holds state that is,
+%% in effect, shared between the channel and all queues from which the
+%% channel is consuming. Essentially all these queues are competing
+%% for access to a single, limited resource - the ability to deliver
+%% messages via the channel - and it is the job of the limiter process
+%% to mediate that access.
+%%
+%% The limiter process is separate from the channel process for two
+%% reasons: separation of concerns, and efficiency. Channels can get
+%% very busy, particularly if they are also dealing with publishes.
+%% With a separate limiter process all the aforementioned access
+%% mediation can take place without touching the channel.
+%%
+%% For efficiency, both the channel and the queues keep some local
+%% state, initialised from the limiter pid with new/1 and client/1,
+%% respectively. In particular this allows them to avoid any
+%% interaction with the limiter process when it is 'inactive', i.e. no
+%% protocol-level flow control is taking place.
+%%
+%% This optimisation does come at the cost of some complexity though:
+%% when a limiter becomes active, the channel needs to inform all its
+%% consumer queues of this change in status. It does this by invoking
+%% rabbit_amqqueue:activate_limit_all/2. Note that there is no inverse
+%% transition, i.e. once a queue has been told about an active
+%% limiter, it is not subsequently told when that limiter becomes
+%% inactive. In practice it is rare for that to happen, though we
+%% could optimise this case in the future.
+%%
+%% The interactions with the limiter are as follows:
+%%
+%% 1. Channels tell the limiter about basic.qos prefetch counts -
+%% that's what the limit/3, unlimit/1, is_limited/1, get_limit/1
+%% API functions are about - and channel.flow blocking - that's
+%% what block/1, unblock/1 and is_blocked/1 are for.
+%%
+%% 2. Queues register with the limiter - this happens as part of
+%% activate/1.
+%%
+%% 4. The limiter process maintains an internal counter of 'messages
+%% sent but not yet acknowledged', called the 'volume'.
+%%
+%% 5. Queues ask the limiter for permission (with can_send/2) whenever
+%% they want to deliver a message to a channel. The limiter checks
+%% whether a) the channel isn't blocked by channel.flow, and b) the
+%% volume has not yet reached the prefetch limit. If so it
+%% increments the volume and tells the queue to proceed. Otherwise
+%% it marks the queue as requiring notification (see below) and
+%% tells the queue not to proceed.
+%%
+%% 6. A queue that has told to proceed (by the return value of
+%% can_send/2) sends the message to the channel. Conversely, a
+%% queue that has been told not to proceed, will not attempt to
+%% deliver that message, or any future messages, to the
+%% channel. This is accomplished by can_send/2 capturing the
+%% outcome in the local state, where it can be accessed with
+%% is_suspended/1.
+%%
+%% 7. When a channel receives an ack it tells the limiter (via ack/2)
+%% how many messages were ack'ed. The limiter process decrements
+%% the volume and if it falls below the prefetch_count then it
+%% notifies (through rabbit_amqqueue:resume/2) all the queues
+%% requiring notification, i.e. all those that had a can_send/2
+%% request denied.
+%%
+%% 8. Upon receipt of such a notification, queues resume delivery to
+%% the channel, i.e. they will once again start asking limiter, as
+%% described in (5).
+%%
+%% 9. When a queues has no more consumers associated with a particular
+%% channel, it unregisters with the limiter and forgets about it -
+%% all via forget/1.
+
-module(rabbit_limiter).
-behaviour(gen_server2).