summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-12 15:19:56 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-12 15:19:56 +0000
commit31459cd1ea66eab4555fc469e72247e8538a4373 (patch)
treeff381014e31231f569654062ea1a52f77416a6a8 /src
parentbd24d59b6010d6fea72f0e39a0d3b312f2f31fce (diff)
downloadrabbitmq-server-git-31459cd1ea66eab4555fc469e72247e8538a4373.tar.gz
Rename bump/1 to handle_bump_msg/1, add a comment and specs.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_flow.erl33
-rw-r--r--src/rabbit_reader.erl2
3 files changed, 31 insertions, 6 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 31afa47325..c9714c3def 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -317,7 +317,7 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
handle_info({bump_credit, Msg}, State) ->
- rabbit_flow:bump(Msg),
+ rabbit_flow:handle_bump_msg(Msg),
noreply(State);
handle_info(timeout, State) ->
diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl
index 54d019cd10..3c2c0eded4 100644
--- a/src/rabbit_flow.erl
+++ b/src/rabbit_flow.erl
@@ -16,10 +16,35 @@
-module(rabbit_flow).
+%% Credit starts at ?MAX_CREDIT and goes down. Both sides keep
+%% track. When the receiver goes below ?MORE_CREDIT_AT it issues more
+%% credit by sending a message to the sender. The sender should pass
+%% this message in to handle_bump_msg/1. The sender should block when
+%% it goes below 0 (check by invoking blocked/0). If a process is both
+%% a sender and a receiver it will not grant any more credit to its
+%% senders when it is itself blocked - thus the only processes that
+%% need to check blocked/0 are ones that read from network sockets.
+
-define(MAX_CREDIT, 200).
-define(MORE_CREDIT_AT, 150).
--export([ack/1, bump/1, blocked/0, send/1, receiver_down/1]).
+-export([ack/1, handle_bump_msg/1, blocked/0, send/1, receiver_down/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-opaque(bump_msg() :: {pid(), non_neg_integer()}).
+
+-spec(ack/1 :: (pid()) -> 'ok').
+-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok').
+-spec(blocked/0 :: () -> boolean()).
+-spec(send/1 :: (pid()) -> 'ok').
+-spec(receiver_down/1 :: (pid()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
%% There are two "flows" here; of messages and of credit, going in
%% opposite directions. The variable names "From" and "To" refer to
@@ -38,13 +63,13 @@ ack(To) ->
end,
put({credit_to, To}, Credit).
-bump({From, MoreCredit}) ->
+handle_bump_msg({From, MoreCredit}) ->
Credit = get({credit_from, From}, 0) + MoreCredit,
put({credit_from, From}, Credit),
case Credit > 0 of
true -> unblock(From),
- false;
- false -> true
+ ok;
+ false -> ok
end.
blocked() ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index bc25e8174c..57aa880bff 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -343,7 +343,7 @@ handle_other(emit_stats, Deb, State) ->
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
handle_other({bump_credit, Msg}, Deb, State) ->
- rabbit_flow:bump(Msg),
+ rabbit_flow:handle_bump_msg(Msg),
recvloop(Deb, update_blockers(false, self(), State));
handle_other(Other, _Deb, _State) ->
%% internal error -> something worth dying for