diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-03-03 12:26:39 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-03-03 12:26:39 +0000 |
| commit | 9c957923ec5228a35f8094e20f321b46b61c70e3 (patch) | |
| tree | f53bb6b1ccac6d8c7ed95188f2945ae8f4d372ba | |
| parent | 54e3ef0d54a2f93e79213c6fb838aa4e08083c0b (diff) | |
| download | rabbitmq-server-git-9c957923ec5228a35f8094e20f321b46b61c70e3.tar.gz | |
Fixes to limiter - logic mistake in maybe_notify and some refactoring and cosmetics
| -rw-r--r-- | src/rabbit_channel.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 41 |
2 files changed, 23 insertions, 19 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 38a6a8447f..48e204d84a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -801,6 +801,7 @@ handle_method(#'channel.flow'{active = false}, _, false -> LimiterPid end, ok = rabbit_limiter:block(LimiterPid1), + %% FIXME: need to go and notify the queues and not reply now {reply, #'channel.flow_ok'{active = false}, State#ch{limiter_pid = LimiterPid1}}; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 43f31511c9..4cb8725b0a 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -139,31 +139,27 @@ handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; - true -> Volume - end}} + false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end}} end; handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; -handle_call({limit, PrefetchCount}, _From, State = #lim{blocked = true}) -> - {reply, ok, State#lim{prefetch_count = PrefetchCount}}; handle_call({limit, PrefetchCount}, _From, State) -> - State1 = maybe_notify(State, State#lim{prefetch_count = PrefetchCount}), - case PrefetchCount == 0 of - true -> {stop, normal, stopped, State1}; - false -> {reply, ok, State1} + case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of + {cont, State1} -> {reply, ok, State1}; + {stop, State1} -> {stop, normal, stopped, State1} end; handle_call(block, _From, State) -> {reply, ok, State#lim{blocked = true}}; -handle_call(unblock, _From, State = #lim{prefetch_count = PrefetchCount}) -> - State1 = maybe_notify(State, State#lim{blocked = false}), - case PrefetchCount == 0 of - true -> {stop, normal, stopped, State1}; - false -> {reply, ok, State1} +handle_call(unblock, _From, State) -> + case maybe_notify(State, State#lim{blocked = false}) of + {cont, State1} -> {reply, ok, State1}; + {stop, State1} -> {stop, normal, stopped, State1} end. handle_cast(shutdown, State) -> @@ -173,7 +169,10 @@ handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count end, - {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; + case maybe_notify(State, State#lim{volume = NewVolume}) of + {cont, State1} -> {noreply, State1}; + {stop, State1} -> {stop, normal, State1} + end; handle_cast({register, QPid}, State) -> {noreply, remember_queue(QPid, State)}; @@ -195,10 +194,14 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case (limit_reached(OldState) andalso not limit_reached(NewState)) orelse - (is_blocked(OldState) andalso not is_blocked(NewState)) of - true -> notify_queues(NewState); - false -> NewState + case (limit_reached(OldState) orelse is_blocked(OldState)) andalso + not (limit_reached(NewState) orelse is_blocked(NewState)) of + true -> NewState1 = notify_queues(NewState), + {case NewState1#lim.prefetch_count == 0 of + true -> stop; + false -> cont + end, NewState1}; + false -> {cont, NewState} end. limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> |
