diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-12 15:19:56 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-12 15:19:56 +0000 |
| commit | 31459cd1ea66eab4555fc469e72247e8538a4373 (patch) | |
| tree | ff381014e31231f569654062ea1a52f77416a6a8 | |
| parent | bd24d59b6010d6fea72f0e39a0d3b312f2f31fce (diff) | |
| download | rabbitmq-server-git-31459cd1ea66eab4555fc469e72247e8538a4373.tar.gz | |
Rename bump/1 to handle_bump_msg/1, add a comment and specs.
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_flow.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 2 |
3 files changed, 31 insertions, 6 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 31afa47325..c9714c3def 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -317,7 +317,7 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). handle_info({bump_credit, Msg}, State) -> - rabbit_flow:bump(Msg), + rabbit_flow:handle_bump_msg(Msg), noreply(State); handle_info(timeout, State) -> diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl index 54d019cd10..3c2c0eded4 100644 --- a/src/rabbit_flow.erl +++ b/src/rabbit_flow.erl @@ -16,10 +16,35 @@ -module(rabbit_flow). +%% Credit starts at ?MAX_CREDIT and goes down. Both sides keep +%% track. When the receiver goes below ?MORE_CREDIT_AT it issues more +%% credit by sending a message to the sender. The sender should pass +%% this message in to handle_bump_msg/1. The sender should block when +%% it goes below 0 (check by invoking blocked/0). If a process is both +%% a sender and a receiver it will not grant any more credit to its +%% senders when it is itself blocked - thus the only processes that +%% need to check blocked/0 are ones that read from network sockets. + -define(MAX_CREDIT, 200). -define(MORE_CREDIT_AT, 150). --export([ack/1, bump/1, blocked/0, send/1, receiver_down/1]). +-export([ack/1, handle_bump_msg/1, blocked/0, send/1, receiver_down/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-opaque(bump_msg() :: {pid(), non_neg_integer()}). + +-spec(ack/1 :: (pid()) -> 'ok'). +-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok'). +-spec(blocked/0 :: () -> boolean()). +-spec(send/1 :: (pid()) -> 'ok'). +-spec(receiver_down/1 :: (pid()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- %% There are two "flows" here; of messages and of credit, going in %% opposite directions. The variable names "From" and "To" refer to @@ -38,13 +63,13 @@ ack(To) -> end, put({credit_to, To}, Credit). -bump({From, MoreCredit}) -> +handle_bump_msg({From, MoreCredit}) -> Credit = get({credit_from, From}, 0) + MoreCredit, put({credit_from, From}, Credit), case Credit > 0 of true -> unblock(From), - false; - false -> true + ok; + false -> ok end. blocked() -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index bc25e8174c..57aa880bff 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -343,7 +343,7 @@ handle_other(emit_stats, Deb, State) -> handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); handle_other({bump_credit, Msg}, Deb, State) -> - rabbit_flow:bump(Msg), + rabbit_flow:handle_bump_msg(Msg), recvloop(Deb, update_blockers(false, self(), State)); handle_other(Other, _Deb, _State) -> %% internal error -> something worth dying for |
