diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-19 14:45:32 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-19 14:45:32 +0000 |
| commit | 30339e71c3f4a3b6c244ccf5195bba518b490861 (patch) | |
| tree | 11582a9a552c5d3c126a033a872e9d2da8e32136 /src | |
| parent | ad5aeddb95b3fb47ed45c73ca9f4880b2baf2560 (diff) | |
| download | rabbitmq-server-git-30339e71c3f4a3b6c244ccf5195bba518b490861.tar.gz | |
niggles--
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 5 |
2 files changed, 17 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 37daa0dfca..d9264736d0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -401,14 +401,9 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}). is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> - Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). - -ch_record_state_transition(OldCR, NewCR) -> - case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of - {true, false} -> unblock; - {false, true} -> block; - {_, _} -> ok - end. + Count >= ?UNSENT_MESSAGE_LIMIT + orelse (Limiter =/= undefined andalso + rabbit_limiter:is_suspended(Limiter)). deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; @@ -629,15 +624,15 @@ possibly_unblock(State, ChPid, Update) -> State; C -> C1 = Update(C), - case ch_record_state_transition(C, C1) of - ok -> update_ch_record(C1), - State; - unblock -> #cr{blocked_consumers = Consumers} = C1, - update_ch_record( - C1#cr{blocked_consumers = queue:new()}), - AC1 = queue:join(State#q.active_consumers, - Consumers), - run_message_queue(State#q{active_consumers = AC1}) + case is_ch_blocked(C) andalso not is_ch_blocked(C1) of + false -> update_ch_record(C1), + State; + true -> #cr{blocked_consumers = Consumers} = C1, + update_ch_record( + C1#cr{blocked_consumers = queue:new()}), + AC1 = queue:join(State#q.active_consumers, + Consumers), + run_message_queue(State#q{active_consumers = AC1}) end end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 005200f84c..eb248a4c11 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -805,6 +805,11 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, "prefetch_size!=0 (~w)", [Size]); +handle_method(#'basic.qos'{prefetch_count = 0}, _, + State = #ch{limiter = Limiter}) -> + Limiter1 = rabbit_limiter:unlimit(Limiter), + {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; + handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> Limiter1 = rabbit_limiter:limit(Limiter, PrefetchCount, queue:len(UAMQ)), |
