diff options
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_flow.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 2 |
3 files changed, 31 insertions, 6 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 31afa47325..c9714c3def 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -317,7 +317,7 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). handle_info({bump_credit, Msg}, State) -> - rabbit_flow:bump(Msg), + rabbit_flow:handle_bump_msg(Msg), noreply(State); handle_info(timeout, State) -> diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl index 54d019cd10..3c2c0eded4 100644 --- a/src/rabbit_flow.erl +++ b/src/rabbit_flow.erl @@ -16,10 +16,35 @@ -module(rabbit_flow). +%% Credit starts at ?MAX_CREDIT and goes down. Both sides keep +%% track. When the receiver goes below ?MORE_CREDIT_AT it issues more +%% credit by sending a message to the sender. The sender should pass +%% this message in to handle_bump_msg/1. The sender should block when +%% it goes below 0 (check by invoking blocked/0). If a process is both +%% a sender and a receiver it will not grant any more credit to its +%% senders when it is itself blocked - thus the only processes that +%% need to check blocked/0 are ones that read from network sockets. + -define(MAX_CREDIT, 200). -define(MORE_CREDIT_AT, 150). --export([ack/1, bump/1, blocked/0, send/1, receiver_down/1]). +-export([ack/1, handle_bump_msg/1, blocked/0, send/1, receiver_down/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-opaque(bump_msg() :: {pid(), non_neg_integer()}). + +-spec(ack/1 :: (pid()) -> 'ok'). +-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok'). +-spec(blocked/0 :: () -> boolean()). +-spec(send/1 :: (pid()) -> 'ok'). +-spec(receiver_down/1 :: (pid()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- %% There are two "flows" here; of messages and of credit, going in %% opposite directions. The variable names "From" and "To" refer to @@ -38,13 +63,13 @@ ack(To) -> end, put({credit_to, To}, Credit). -bump({From, MoreCredit}) -> +handle_bump_msg({From, MoreCredit}) -> Credit = get({credit_from, From}, 0) + MoreCredit, put({credit_from, From}, Credit), case Credit > 0 of true -> unblock(From), - false; - false -> true + ok; + false -> ok end. blocked() -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index bc25e8174c..57aa880bff 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -343,7 +343,7 @@ handle_other(emit_stats, Deb, State) -> handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); handle_other({bump_credit, Msg}, Deb, State) -> - rabbit_flow:bump(Msg), + rabbit_flow:handle_bump_msg(Msg), recvloop(Deb, update_blockers(false, self(), State)); handle_other(Other, _Deb, _State) -> %% internal error -> something worth dying for |
