summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien.pedron@dumbbell.fr>2015-09-25 15:39:36 +0200
committerJean-Sébastien Pédron <jean-sebastien.pedron@dumbbell.fr>2015-09-25 15:39:36 +0200
commit31a4b251e4ce303458031d09f29730a8c27aff6d (patch)
tree1f56568295e4c79743ffc7bf3ed914170ccfdee6
parent3136aa25752542dfdbc7af3f77d8a66eb8d5d844 (diff)
parentb2a63a047c598fccd2fe020c7128e5344efbcec7 (diff)
downloadrabbitmq-server-git-31a4b251e4ce303458031d09f29730a8c27aff6d.tar.gz
Merge pull request #331 from rabbitmq/rabbitmq-server-330
Adds documentation for credit_flow usage
-rw-r--r--src/credit_flow.erl18
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl27
-rw-r--r--src/rabbit_channel.erl16
-rw-r--r--src/rabbit_mirror_queue_slave.erl3
-rw-r--r--src/rabbit_msg_store.erl11
-rw-r--r--src/rabbit_reader.erl1
-rw-r--r--src/rabbit_variable_queue.erl4
8 files changed, 80 insertions, 3 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index 1271819c5d..b9547cff1c 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -27,6 +27,24 @@
%% receiver it will not grant any more credit to its senders when it
%% is itself blocked - thus the only processes that need to check
%% blocked/0 are ones that read from network sockets.
+%%
+%% Credit flows left to right when process send messags down the
+%% chain, starting at the rabbit_reader, ending at the msg_store:
+%% reader -> channel -> queue_process -> msg_store.
+%%
+%% If the message store has a back log, then it will block the
+%% queue_process, which will block the channel, and finally the reader
+%% will be blocked, throttling down publishers.
+%%
+%% Once a process is unblocked, it will grant credits up the chain,
+%% possibly unblocking other processes:
+%% reader <--grant channel <--grant queue_process <--grant msg_store.
+%%
+%% Grepping the project files for `credit_flow` will reveal the places
+%% where this module is currently used, with extra comments on what's
+%% going on at each instance. Note that credit flow between mirrors
+%% synchronization has not been documented, since this doesn't affect
+%% client publishes.
-define(DEFAULT_INITIAL_CREDIT, 200).
-define(DEFAULT_MORE_CREDIT_AFTER, 50).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5bfa006e09..f6cc0fbdda 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -873,6 +873,9 @@ deliver(Qs, Delivery = #delivery{flow = Flow}) ->
%% the slave receives the message direct from the channel, and the
%% other when it receives it via GM.
case Flow of
+ %% Here we are tracking messages sent by the rabbit_channel
+ %% process. We are accessing the rabbit_channel process
+ %% dictionary.
flow -> [credit_flow:send(QPid) || QPid <- QPids],
[credit_flow:send(QPid) || QPid <- SPids];
noflow -> ok
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c5e4206fe3..999e66aee3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -662,9 +662,21 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
exclusive_consumer = Holder,
senders = Senders}) ->
State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
- false -> Senders;
- true -> credit_flow:peer_down(DownPid),
- pmon:demonitor(DownPid, Senders)
+ false ->
+ Senders;
+ true ->
+ %% A rabbit_channel process died. Here credit_flow will take care
+ %% of cleaning up the rabbit_amqqueue_process process dictionary
+ %% with regards to the credit we were tracking for the channel
+ %% process. See handle_cast({deliver, Deliver}, State) in this
+ %% module. In that cast function we process deliveries from the
+ %% channel, which means we credit_flow:ack/1 said
+ %% messages. credit_flow:ack'ing messages means we are increasing
+ %% a counter to know when we need to send MoreCreditAfter. Since
+ %% the process died, the credit_flow flow module will clean up
+ %% that for us.
+ credit_flow:peer_down(DownPid),
+ pmon:demonitor(DownPid, Senders)
end},
case rabbit_queue_consumers:erase_ch(DownPid, Consumers) of
not_found ->
@@ -1110,6 +1122,9 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender,
flow = Flow}, SlaveWhenPublished},
State = #q{senders = Senders}) ->
Senders1 = case Flow of
+ %% In both credit_flow:ack/1 we are acking messages to the channel
+ %% process that sent us the message delivery. See handle_ch_down
+ %% for more info.
flow -> credit_flow:ack(Sender),
case SlaveWhenPublished of
true -> credit_flow:ack(Sender); %% [0]
@@ -1289,6 +1304,12 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ %% The message_store is granting us more credit. This means the
+ %% backing queue (for the rabbit_variable_queue case) might
+ %% continue paging messages to disk if it still needs to. We
+ %% consume credits from the message_store whenever we need to
+ %% persist a message to disk. See:
+ %% rabbit_variable_queue:msg_store_write/4.
credit_flow:handle_bump_msg(Msg),
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 489f7b3469..b23a8410c5 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -133,6 +133,8 @@ do(Pid, Method, Content) ->
gen_server2:cast(Pid, {method, Method, Content, noflow}).
do_flow(Pid, Method, Content) ->
+ %% Here we are tracking messages sent by the rabbit_reader
+ %% process. We are accessing the rabbit_reader process dictionary.
credit_flow:send(Pid),
gen_server2:cast(Pid, {method, Method, Content, flow}).
@@ -327,6 +329,9 @@ handle_cast({method, Method, Content, Flow},
State = #ch{reader_pid = Reader,
virtual_host = VHost}) ->
case Flow of
+ %% We are going to process a message from the rabbit_reader
+ %% process, so here we ack it. In this case we are accessing
+ %% the rabbit_channel process dictionary.
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
@@ -435,6 +440,12 @@ handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
noreply_coalesce(record_confirms(MXs, State#ch{unconfirmed = UC1})).
handle_info({bump_credit, Msg}, State) ->
+ %% A rabbit_amqqueue_process is granting credit to our channel. If
+ %% our channel was being blocked by this process, and no other
+ %% process is blocking our channel, then this channel will be
+ %% unblocked. This means that any credit that was deferred will be
+ %% sent to rabbit_reader processs that might be blocked by this
+ %% particular channel.
credit_flow:handle_bump_msg(Msg),
noreply(State);
@@ -452,6 +463,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
State3 = handle_consuming_queue_down(QPid, State1),
State4 = handle_delivering_queue_down(QPid, State3),
+ %% A rabbit_amqqueue_process has died. If our channel was being
+ %% blocked by this process, and no other process is blocking our
+ %% channel, then this channel will be unblocked. This means that
+ %% any credit that was deferred will be sent to the rabbit_reader
+ %% processs that might be blocked by this particular channel.
credit_flow:peer_down(QPid),
#ch{queue_names = QNames, queue_monitors = QMons} = State4,
case dict:find(QPid, QNames) of
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 168e71170b..7f309ab0b7 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -257,6 +257,9 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
State) ->
%% Asynchronous, non-"mandatory", deliver mode.
case Flow of
+ %% We are acking messages to the channel process that sent us
+ %% the message delivery. See
+ %% rabbit_amqqueue_process:handle_ch_down for more info.
flow -> credit_flow:ack(Sender);
noflow -> ok
end,
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index d0969f1b0e..8909484984 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -475,6 +475,10 @@ write_flow(MsgId, Msg,
CState = #client_msstate {
server = Server,
credit_disc_bound = CreditDiscBound }) ->
+ %% Here we are tracking messages sent by the
+ %% rabbit_amqqueue_process process via the
+ %% rabbit_variable_queue. We are accessing the
+ %% rabbit_amqqueue_process process dictionary.
credit_flow:send(whereis(Server), CreditDiscBound),
client_write(MsgId, Msg, flow, CState).
@@ -829,6 +833,9 @@ handle_cast({write, CRef, MsgId, Flow},
credit_disc_bound = CreditDiscBound }) ->
case Flow of
flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
+ %% We are going to process a message sent by the
+ %% rabbit_amqqueue_process. Now we are accessing the
+ %% msg_store process dictionary.
credit_flow:ack(CPid, CreditDiscBound);
noflow -> ok
end,
@@ -904,6 +911,10 @@ handle_info(timeout, State) ->
noreply(internal_sync(State));
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
+ %% similar to what happens in
+ %% rabbit_amqqueue_process:handle_ch_down but with a relation of
+ %% msg_store -> rabbit_amqqueue_process instead of
+ %% rabbit_amqqueue_process -> rabbit_channel.
credit_flow:peer_down(Pid),
noreply(State);
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index d296c41344..8812e1d0e1 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -478,6 +478,7 @@ handle_other(ensure_stats, State) ->
handle_other(emit_stats, State) ->
emit_stats(State);
handle_other({bump_credit, Msg}, State) ->
+ %% Here we are receiving credit by some channel process.
credit_flow:handle_bump_msg(Msg),
control_throttle(State);
handle_other(Other, State) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 995282b344..f50c1bde7e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -2115,6 +2115,10 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
TargetRamCount >= RamMsgCount ->
{Quota, ui(State)};
push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
+ %% We consume credits from the message_store whenever we need to
+ %% persist a message to disk. See:
+ %% rabbit_variable_queue:msg_store_write/4. So perhaps the
+ %% msg_store is trying to throttle down our queue.
case credit_flow:blocked() of
true -> {Quota, ui(State)};
false -> case Generator(Q) of