summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-03-19 14:45:32 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-03-19 14:45:32 +0000
commit30339e71c3f4a3b6c244ccf5195bba518b490861 (patch)
tree11582a9a552c5d3c126a033a872e9d2da8e32136 /src
parentad5aeddb95b3fb47ed45c73ca9f4880b2baf2560 (diff)
downloadrabbitmq-server-git-30339e71c3f4a3b6c244ccf5195bba518b490861.tar.gz
niggles--
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl29
-rw-r--r--src/rabbit_channel.erl5
2 files changed, 17 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 37daa0dfca..d9264736d0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -401,14 +401,9 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}).
is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
- Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
-
-ch_record_state_transition(OldCR, NewCR) ->
- case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of
- {true, false} -> unblock;
- {false, true} -> block;
- {_, _} -> ok
- end.
+ Count >= ?UNSENT_MESSAGE_LIMIT
+ orelse (Limiter =/= undefined andalso
+ rabbit_limiter:is_suspended(Limiter)).
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
@@ -629,15 +624,15 @@ possibly_unblock(State, ChPid, Update) ->
State;
C ->
C1 = Update(C),
- case ch_record_state_transition(C, C1) of
- ok -> update_ch_record(C1),
- State;
- unblock -> #cr{blocked_consumers = Consumers} = C1,
- update_ch_record(
- C1#cr{blocked_consumers = queue:new()}),
- AC1 = queue:join(State#q.active_consumers,
- Consumers),
- run_message_queue(State#q{active_consumers = AC1})
+ case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
+ false -> update_ch_record(C1),
+ State;
+ true -> #cr{blocked_consumers = Consumers} = C1,
+ update_ch_record(
+ C1#cr{blocked_consumers = queue:new()}),
+ AC1 = queue:join(State#q.active_consumers,
+ Consumers),
+ run_message_queue(State#q{active_consumers = AC1})
end
end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 005200f84c..eb248a4c11 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -805,6 +805,11 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
rabbit_misc:protocol_error(not_implemented,
"prefetch_size!=0 (~w)", [Size]);
+handle_method(#'basic.qos'{prefetch_count = 0}, _,
+ State = #ch{limiter = Limiter}) ->
+ Limiter1 = rabbit_limiter:unlimit(Limiter),
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
+
handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
Limiter1 = rabbit_limiter:limit(Limiter, PrefetchCount, queue:len(UAMQ)),