diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-12 12:01:21 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-12 12:01:21 +0000 |
| commit | 34d09736e9231813d8e1b74a325dde2c547cbb62 (patch) | |
| tree | 213b7aff3ac8956adb524fe61148d21e6d0de598 /src | |
| parent | b694e7f8e3756b47934d309803eb5cf9d277084d (diff) | |
| download | rabbitmq-server-git-34d09736e9231813d8e1b74a325dde2c547cbb62.tar.gz | |
Allow one sending process to be blocked by more than one receiving process simultaneously.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_flow.erl | 22 |
1 files changed, 14 insertions, 8 deletions
diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl index 3dbf8b4338..cf7035f2a1 100644 --- a/src/rabbit_flow.erl +++ b/src/rabbit_flow.erl @@ -42,19 +42,18 @@ bump({From, MoreCredit}) -> Credit = get({credit_from, From}, 0) + MoreCredit, put({credit_from, From}, Credit), case Credit > 0 of - true -> unblock(), + true -> unblock(From), false; false -> true end. -%% TODO we assume only one From can block at once. Is this true? blocked() -> - get(credit_blocked) =:= true. + get(credit_blocked, []) =/= []. send(From) -> Credit = get({credit_from, From}, ?MAX_CREDIT) - 1, case Credit of - 0 -> put(credit_blocked, true); + 0 -> block(From); _ -> ok end, put({credit_from, From}, Credit). @@ -69,10 +68,17 @@ grant(To, Quantity) -> put(credit_deferred, [{To, Msg} | Deferred]) end. -unblock() -> - erase(credit_blocked), - [To ! Msg || {To, Msg} <- get(credit_deferred, [])], - erase(credit_deferred). +block(From) -> + put(credit_blocked, [From | get(credit_blocked, [])]). + +unblock(From) -> + NewBlocks = get(credit_blocked, []) -- [From], + put(credit_blocked, NewBlocks), + case NewBlocks of + [] -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])], + erase(credit_deferred); + _ -> ok + end. get(Key, Default) -> case get(Key) of |
