summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-03-03 12:26:39 +0000
committerMatthew Sackman <matthew@lshift.net>2010-03-03 12:26:39 +0000
commit9c957923ec5228a35f8094e20f321b46b61c70e3 (patch)
treef53bb6b1ccac6d8c7ed95188f2945ae8f4d372ba
parent54e3ef0d54a2f93e79213c6fb838aa4e08083c0b (diff)
downloadrabbitmq-server-git-9c957923ec5228a35f8094e20f321b46b61c70e3.tar.gz
Fixes to limiter - logic mistake in maybe_notify and some refactoring and cosmetics
-rw-r--r--src/rabbit_channel.erl1
-rw-r--r--src/rabbit_limiter.erl41
2 files changed, 23 insertions, 19 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 38a6a8447f..48e204d84a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -801,6 +801,7 @@ handle_method(#'channel.flow'{active = false}, _,
false -> LimiterPid
end,
ok = rabbit_limiter:block(LimiterPid1),
+ %% FIXME: need to go and notify the queues and not reply now
{reply, #'channel.flow_ok'{active = false},
State#ch{limiter_pid = LimiterPid1}};
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 43f31511c9..4cb8725b0a 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -139,31 +139,27 @@ handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
case limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
- true -> Volume
- end}}
+ false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end}}
end;
handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
{reply, PrefetchCount, State};
-handle_call({limit, PrefetchCount}, _From, State = #lim{blocked = true}) ->
- {reply, ok, State#lim{prefetch_count = PrefetchCount}};
handle_call({limit, PrefetchCount}, _From, State) ->
- State1 = maybe_notify(State, State#lim{prefetch_count = PrefetchCount}),
- case PrefetchCount == 0 of
- true -> {stop, normal, stopped, State1};
- false -> {reply, ok, State1}
+ case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of
+ {cont, State1} -> {reply, ok, State1};
+ {stop, State1} -> {stop, normal, stopped, State1}
end;
handle_call(block, _From, State) ->
{reply, ok, State#lim{blocked = true}};
-handle_call(unblock, _From, State = #lim{prefetch_count = PrefetchCount}) ->
- State1 = maybe_notify(State, State#lim{blocked = false}),
- case PrefetchCount == 0 of
- true -> {stop, normal, stopped, State1};
- false -> {reply, ok, State1}
+handle_call(unblock, _From, State) ->
+ case maybe_notify(State, State#lim{blocked = false}) of
+ {cont, State1} -> {reply, ok, State1};
+ {stop, State1} -> {stop, normal, stopped, State1}
end.
handle_cast(shutdown, State) ->
@@ -173,7 +169,10 @@ handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
end,
- {noreply, maybe_notify(State, State#lim{volume = NewVolume})};
+ case maybe_notify(State, State#lim{volume = NewVolume}) of
+ {cont, State1} -> {noreply, State1};
+ {stop, State1} -> {stop, normal, State1}
+ end;
handle_cast({register, QPid}, State) ->
{noreply, remember_queue(QPid, State)};
@@ -195,10 +194,14 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case (limit_reached(OldState) andalso not limit_reached(NewState)) orelse
- (is_blocked(OldState) andalso not is_blocked(NewState)) of
- true -> notify_queues(NewState);
- false -> NewState
+ case (limit_reached(OldState) orelse is_blocked(OldState)) andalso
+ not (limit_reached(NewState) orelse is_blocked(NewState)) of
+ true -> NewState1 = notify_queues(NewState),
+ {case NewState1#lim.prefetch_count == 0 of
+ true -> stop;
+ false -> cont
+ end, NewState1};
+ false -> {cont, NewState}
end.
limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->