diff options
| author | Jean-Sébastien Pédron <jean-sebastien.pedron@dumbbell.fr> | 2015-09-25 15:39:36 +0200 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien.pedron@dumbbell.fr> | 2015-09-25 15:39:36 +0200 |
| commit | 31a4b251e4ce303458031d09f29730a8c27aff6d (patch) | |
| tree | 1f56568295e4c79743ffc7bf3ed914170ccfdee6 | |
| parent | 3136aa25752542dfdbc7af3f77d8a66eb8d5d844 (diff) | |
| parent | b2a63a047c598fccd2fe020c7128e5344efbcec7 (diff) | |
| download | rabbitmq-server-git-31a4b251e4ce303458031d09f29730a8c27aff6d.tar.gz | |
Merge pull request #331 from rabbitmq/rabbitmq-server-330
Adds documentation for credit_flow usage
| -rw-r--r-- | src/credit_flow.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
8 files changed, 80 insertions, 3 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl index 1271819c5d..b9547cff1c 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -27,6 +27,24 @@ %% receiver it will not grant any more credit to its senders when it %% is itself blocked - thus the only processes that need to check %% blocked/0 are ones that read from network sockets. +%% +%% Credit flows left to right when process send messags down the +%% chain, starting at the rabbit_reader, ending at the msg_store: +%% reader -> channel -> queue_process -> msg_store. +%% +%% If the message store has a back log, then it will block the +%% queue_process, which will block the channel, and finally the reader +%% will be blocked, throttling down publishers. +%% +%% Once a process is unblocked, it will grant credits up the chain, +%% possibly unblocking other processes: +%% reader <--grant channel <--grant queue_process <--grant msg_store. +%% +%% Grepping the project files for `credit_flow` will reveal the places +%% where this module is currently used, with extra comments on what's +%% going on at each instance. Note that credit flow between mirrors +%% synchronization has not been documented, since this doesn't affect +%% client publishes. -define(DEFAULT_INITIAL_CREDIT, 200). -define(DEFAULT_MORE_CREDIT_AFTER, 50). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5bfa006e09..f6cc0fbdda 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -873,6 +873,9 @@ deliver(Qs, Delivery = #delivery{flow = Flow}) -> %% the slave receives the message direct from the channel, and the %% other when it receives it via GM. case Flow of + %% Here we are tracking messages sent by the rabbit_channel + %% process. We are accessing the rabbit_channel process + %% dictionary. flow -> [credit_flow:send(QPid) || QPid <- QPids], [credit_flow:send(QPid) || QPid <- SPids]; noflow -> ok diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c5e4206fe3..999e66aee3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -662,9 +662,21 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, exclusive_consumer = Holder, senders = Senders}) -> State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of - false -> Senders; - true -> credit_flow:peer_down(DownPid), - pmon:demonitor(DownPid, Senders) + false -> + Senders; + true -> + %% A rabbit_channel process died. Here credit_flow will take care + %% of cleaning up the rabbit_amqqueue_process process dictionary + %% with regards to the credit we were tracking for the channel + %% process. See handle_cast({deliver, Deliver}, State) in this + %% module. In that cast function we process deliveries from the + %% channel, which means we credit_flow:ack/1 said + %% messages. credit_flow:ack'ing messages means we are increasing + %% a counter to know when we need to send MoreCreditAfter. Since + %% the process died, the credit_flow flow module will clean up + %% that for us. + credit_flow:peer_down(DownPid), + pmon:demonitor(DownPid, Senders) end}, case rabbit_queue_consumers:erase_ch(DownPid, Consumers) of not_found -> @@ -1110,6 +1122,9 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, SlaveWhenPublished}, State = #q{senders = Senders}) -> Senders1 = case Flow of + %% In both credit_flow:ack/1 we are acking messages to the channel + %% process that sent us the message delivery. See handle_ch_down + %% for more info. flow -> credit_flow:ack(Sender), case SlaveWhenPublished of true -> credit_flow:ack(Sender); %% [0] @@ -1289,6 +1304,12 @@ handle_info({'EXIT', _Pid, Reason}, State) -> handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + %% The message_store is granting us more credit. This means the + %% backing queue (for the rabbit_variable_queue case) might + %% continue paging messages to disk if it still needs to. We + %% consume credits from the message_store whenever we need to + %% persist a message to disk. See: + %% rabbit_variable_queue:msg_store_write/4. credit_flow:handle_bump_msg(Msg), noreply(State#q{backing_queue_state = BQ:resume(BQS)}); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 489f7b3469..b23a8410c5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -133,6 +133,8 @@ do(Pid, Method, Content) -> gen_server2:cast(Pid, {method, Method, Content, noflow}). do_flow(Pid, Method, Content) -> + %% Here we are tracking messages sent by the rabbit_reader + %% process. We are accessing the rabbit_reader process dictionary. credit_flow:send(Pid), gen_server2:cast(Pid, {method, Method, Content, flow}). @@ -327,6 +329,9 @@ handle_cast({method, Method, Content, Flow}, State = #ch{reader_pid = Reader, virtual_host = VHost}) -> case Flow of + %% We are going to process a message from the rabbit_reader + %% process, so here we ack it. In this case we are accessing + %% the rabbit_channel process dictionary. flow -> credit_flow:ack(Reader); noflow -> ok end, @@ -435,6 +440,12 @@ handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) -> noreply_coalesce(record_confirms(MXs, State#ch{unconfirmed = UC1})). handle_info({bump_credit, Msg}, State) -> + %% A rabbit_amqqueue_process is granting credit to our channel. If + %% our channel was being blocked by this process, and no other + %% process is blocking our channel, then this channel will be + %% unblocked. This means that any credit that was deferred will be + %% sent to rabbit_reader processs that might be blocked by this + %% particular channel. credit_flow:handle_bump_msg(Msg), noreply(State); @@ -452,6 +463,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), State3 = handle_consuming_queue_down(QPid, State1), State4 = handle_delivering_queue_down(QPid, State3), + %% A rabbit_amqqueue_process has died. If our channel was being + %% blocked by this process, and no other process is blocking our + %% channel, then this channel will be unblocked. This means that + %% any credit that was deferred will be sent to the rabbit_reader + %% processs that might be blocked by this particular channel. credit_flow:peer_down(QPid), #ch{queue_names = QNames, queue_monitors = QMons} = State4, case dict:find(QPid, QNames) of diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 168e71170b..7f309ab0b7 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -257,6 +257,9 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true}, State) -> %% Asynchronous, non-"mandatory", deliver mode. case Flow of + %% We are acking messages to the channel process that sent us + %% the message delivery. See + %% rabbit_amqqueue_process:handle_ch_down for more info. flow -> credit_flow:ack(Sender); noflow -> ok end, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index d0969f1b0e..8909484984 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -475,6 +475,10 @@ write_flow(MsgId, Msg, CState = #client_msstate { server = Server, credit_disc_bound = CreditDiscBound }) -> + %% Here we are tracking messages sent by the + %% rabbit_amqqueue_process process via the + %% rabbit_variable_queue. We are accessing the + %% rabbit_amqqueue_process process dictionary. credit_flow:send(whereis(Server), CreditDiscBound), client_write(MsgId, Msg, flow, CState). @@ -829,6 +833,9 @@ handle_cast({write, CRef, MsgId, Flow}, credit_disc_bound = CreditDiscBound }) -> case Flow of flow -> {CPid, _, _} = dict:fetch(CRef, Clients), + %% We are going to process a message sent by the + %% rabbit_amqqueue_process. Now we are accessing the + %% msg_store process dictionary. credit_flow:ack(CPid, CreditDiscBound); noflow -> ok end, @@ -904,6 +911,10 @@ handle_info(timeout, State) -> noreply(internal_sync(State)); handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> + %% similar to what happens in + %% rabbit_amqqueue_process:handle_ch_down but with a relation of + %% msg_store -> rabbit_amqqueue_process instead of + %% rabbit_amqqueue_process -> rabbit_channel. credit_flow:peer_down(Pid), noreply(State); diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index d296c41344..8812e1d0e1 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -478,6 +478,7 @@ handle_other(ensure_stats, State) -> handle_other(emit_stats, State) -> emit_stats(State); handle_other({bump_credit, Msg}, State) -> + %% Here we are receiving credit by some channel process. credit_flow:handle_bump_msg(Msg), control_throttle(State); handle_other(Other, State) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 995282b344..f50c1bde7e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -2115,6 +2115,10 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, TargetRamCount >= RamMsgCount -> {Quota, ui(State)}; push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> + %% We consume credits from the message_store whenever we need to + %% persist a message to disk. See: + %% rabbit_variable_queue:msg_store_write/4. So perhaps the + %% msg_store is trying to throttle down our queue. case credit_flow:blocked() of true -> {Quota, ui(State)}; false -> case Generator(Q) of |
