diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-25 21:09:53 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-25 21:09:53 +0200 |
| commit | c5dc37f9bb70a2daf95e79cc5248fdc9bbe229fb (patch) | |
| tree | 592ae4bb47af56f615190b71e2849cac43dfa128 | |
| parent | e35e05833ad9a36f7461e94f59666b16083cc984 (diff) | |
| parent | 31a4b251e4ce303458031d09f29730a8c27aff6d (diff) | |
| download | rabbitmq-server-git-c5dc37f9bb70a2daf95e79cc5248fdc9bbe229fb.tar.gz | |
Merge branch 'stable'
| -rw-r--r-- | src/credit_flow.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
8 files changed, 80 insertions, 3 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl index 02fd19620b..8c8d340601 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -27,6 +27,24 @@ %% receiver it will not grant any more credit to its senders when it %% is itself blocked - thus the only processes that need to check %% blocked/0 are ones that read from network sockets. +%% +%% Credit flows left to right when process send messags down the +%% chain, starting at the rabbit_reader, ending at the msg_store: +%% reader -> channel -> queue_process -> msg_store. +%% +%% If the message store has a back log, then it will block the +%% queue_process, which will block the channel, and finally the reader +%% will be blocked, throttling down publishers. +%% +%% Once a process is unblocked, it will grant credits up the chain, +%% possibly unblocking other processes: +%% reader <--grant channel <--grant queue_process <--grant msg_store. +%% +%% Grepping the project files for `credit_flow` will reveal the places +%% where this module is currently used, with extra comments on what's +%% going on at each instance. Note that credit flow between mirrors +%% synchronization has not been documented, since this doesn't affect +%% client publishes. -define(DEFAULT_INITIAL_CREDIT, 200). -define(DEFAULT_MORE_CREDIT_AFTER, 50). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 52b8ed6e06..65e4255a73 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -879,6 +879,9 @@ deliver(Qs, Delivery = #delivery{flow = Flow}) -> %% the slave receives the message direct from the channel, and the %% other when it receives it via GM. case Flow of + %% Here we are tracking messages sent by the rabbit_channel + %% process. We are accessing the rabbit_channel process + %% dictionary. flow -> [credit_flow:send(QPid) || QPid <- QPids], [credit_flow:send(QPid) || QPid <- SPids]; noflow -> ok diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c1e6e5286e..15fde37c9c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -662,9 +662,21 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, exclusive_consumer = Holder, senders = Senders}) -> State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of - false -> Senders; - true -> credit_flow:peer_down(DownPid), - pmon:demonitor(DownPid, Senders) + false -> + Senders; + true -> + %% A rabbit_channel process died. Here credit_flow will take care + %% of cleaning up the rabbit_amqqueue_process process dictionary + %% with regards to the credit we were tracking for the channel + %% process. See handle_cast({deliver, Deliver}, State) in this + %% module. In that cast function we process deliveries from the + %% channel, which means we credit_flow:ack/1 said + %% messages. credit_flow:ack'ing messages means we are increasing + %% a counter to know when we need to send MoreCreditAfter. Since + %% the process died, the credit_flow flow module will clean up + %% that for us. + credit_flow:peer_down(DownPid), + pmon:demonitor(DownPid, Senders) end}, case rabbit_queue_consumers:erase_ch(DownPid, Consumers) of not_found -> @@ -1109,6 +1121,9 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, SlaveWhenPublished}, State = #q{senders = Senders}) -> Senders1 = case Flow of + %% In both credit_flow:ack/1 we are acking messages to the channel + %% process that sent us the message delivery. See handle_ch_down + %% for more info. flow -> credit_flow:ack(Sender), case SlaveWhenPublished of true -> credit_flow:ack(Sender); %% [0] @@ -1288,6 +1303,12 @@ handle_info({'EXIT', _Pid, Reason}, State) -> handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + %% The message_store is granting us more credit. This means the + %% backing queue (for the rabbit_variable_queue case) might + %% continue paging messages to disk if it still needs to. We + %% consume credits from the message_store whenever we need to + %% persist a message to disk. See: + %% rabbit_variable_queue:msg_store_write/4. credit_flow:handle_bump_msg(Msg), noreply(State#q{backing_queue_state = BQ:resume(BQS)}); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index db16817845..f8ed9cae9f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -239,6 +239,8 @@ do(Pid, Method, Content) -> gen_server2:cast(Pid, {method, Method, Content, noflow}). do_flow(Pid, Method, Content) -> + %% Here we are tracking messages sent by the rabbit_reader + %% process. We are accessing the rabbit_reader process dictionary. credit_flow:send(Pid), gen_server2:cast(Pid, {method, Method, Content, flow}). @@ -436,6 +438,9 @@ handle_cast({method, Method, Content, Flow}, State = #ch{reader_pid = Reader, interceptor_state = IState}) -> case Flow of + %% We are going to process a message from the rabbit_reader + %% process, so here we ack it. In this case we are accessing + %% the rabbit_channel process dictionary. flow -> credit_flow:ack(Reader); noflow -> ok end, @@ -545,6 +550,12 @@ handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) -> noreply_coalesce(record_confirms(MXs, State#ch{unconfirmed = UC1})). handle_info({bump_credit, Msg}, State) -> + %% A rabbit_amqqueue_process is granting credit to our channel. If + %% our channel was being blocked by this process, and no other + %% process is blocking our channel, then this channel will be + %% unblocked. This means that any credit that was deferred will be + %% sent to rabbit_reader processs that might be blocked by this + %% particular channel. credit_flow:handle_bump_msg(Msg), noreply(State); @@ -562,6 +573,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), State3 = handle_consuming_queue_down(QPid, State1), State4 = handle_delivering_queue_down(QPid, State3), + %% A rabbit_amqqueue_process has died. If our channel was being + %% blocked by this process, and no other process is blocking our + %% channel, then this channel will be unblocked. This means that + %% any credit that was deferred will be sent to the rabbit_reader + %% processs that might be blocked by this particular channel. credit_flow:peer_down(QPid), #ch{queue_names = QNames, queue_monitors = QMons} = State4, case dict:find(QPid, QNames) of diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 168e71170b..7f309ab0b7 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -257,6 +257,9 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true}, State) -> %% Asynchronous, non-"mandatory", deliver mode. case Flow of + %% We are acking messages to the channel process that sent us + %% the message delivery. See + %% rabbit_amqqueue_process:handle_ch_down for more info. flow -> credit_flow:ack(Sender); noflow -> ok end, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 1a6beb5438..e463f5ffec 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -510,6 +510,10 @@ write_flow(MsgId, Msg, CState = #client_msstate { server = Server, credit_disc_bound = CreditDiscBound }) -> + %% Here we are tracking messages sent by the + %% rabbit_amqqueue_process process via the + %% rabbit_variable_queue. We are accessing the + %% rabbit_amqqueue_process process dictionary. credit_flow:send(whereis(Server), CreditDiscBound), client_write(MsgId, Msg, flow, CState). @@ -864,6 +868,9 @@ handle_cast({write, CRef, MsgId, Flow}, credit_disc_bound = CreditDiscBound }) -> case Flow of flow -> {CPid, _, _} = dict:fetch(CRef, Clients), + %% We are going to process a message sent by the + %% rabbit_amqqueue_process. Now we are accessing the + %% msg_store process dictionary. credit_flow:ack(CPid, CreditDiscBound); noflow -> ok end, @@ -939,6 +946,10 @@ handle_info(timeout, State) -> noreply(internal_sync(State)); handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> + %% similar to what happens in + %% rabbit_amqqueue_process:handle_ch_down but with a relation of + %% msg_store -> rabbit_amqqueue_process instead of + %% rabbit_amqqueue_process -> rabbit_channel. credit_flow:peer_down(Pid), noreply(State); diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index c1cfb10c67..58c5aef8fd 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -585,6 +585,7 @@ handle_other(ensure_stats, State) -> handle_other(emit_stats, State) -> emit_stats(State); handle_other({bump_credit, Msg}, State) -> + %% Here we are receiving credit by some channel process. credit_flow:handle_bump_msg(Msg), control_throttle(State); handle_other(Other, State) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 59232a13c7..8a8945114c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -2176,6 +2176,10 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, TargetRamCount >= RamMsgCount -> {Quota, ui(State)}; push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> + %% We consume credits from the message_store whenever we need to + %% persist a message to disk. See: + %% rabbit_variable_queue:msg_store_write/4. So perhaps the + %% msg_store is trying to throttle down our queue. case credit_flow:blocked() of true -> {Quota, ui(State)}; false -> case Generator(Q) of |
