summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-12 12:01:21 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-12 12:01:21 +0000
commit34d09736e9231813d8e1b74a325dde2c547cbb62 (patch)
tree213b7aff3ac8956adb524fe61148d21e6d0de598 /src
parentb694e7f8e3756b47934d309803eb5cf9d277084d (diff)
downloadrabbitmq-server-git-34d09736e9231813d8e1b74a325dde2c547cbb62.tar.gz
Allow one sending process to be blocked by more than one receiving process simultaneously.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_flow.erl22
1 files changed, 14 insertions, 8 deletions
diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl
index 3dbf8b4338..cf7035f2a1 100644
--- a/src/rabbit_flow.erl
+++ b/src/rabbit_flow.erl
@@ -42,19 +42,18 @@ bump({From, MoreCredit}) ->
Credit = get({credit_from, From}, 0) + MoreCredit,
put({credit_from, From}, Credit),
case Credit > 0 of
- true -> unblock(),
+ true -> unblock(From),
false;
false -> true
end.
-%% TODO we assume only one From can block at once. Is this true?
blocked() ->
- get(credit_blocked) =:= true.
+ get(credit_blocked, []) =/= [].
send(From) ->
Credit = get({credit_from, From}, ?MAX_CREDIT) - 1,
case Credit of
- 0 -> put(credit_blocked, true);
+ 0 -> block(From);
_ -> ok
end,
put({credit_from, From}, Credit).
@@ -69,10 +68,17 @@ grant(To, Quantity) ->
put(credit_deferred, [{To, Msg} | Deferred])
end.
-unblock() ->
- erase(credit_blocked),
- [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
- erase(credit_deferred).
+block(From) ->
+ put(credit_blocked, [From | get(credit_blocked, [])]).
+
+unblock(From) ->
+ NewBlocks = get(credit_blocked, []) -- [From],
+ put(credit_blocked, NewBlocks),
+ case NewBlocks of
+ [] -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
+ erase(credit_deferred);
+ _ -> ok
+ end.
get(Key, Default) ->
case get(Key) of