diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 37 |
1 files changed, 17 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7a16865bd0..e30a98394d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -451,26 +451,23 @@ deliver_msgs_to_consumers(DeliverFun, false, deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> C = ch_record(ChPid), case is_ch_blocked(C) of - true -> - block_consumer(C, E), - {false, State}; - false -> - #cr{limiter = Limiter, ch_pid = ChPid} = C, - #consumer{tag = CTag} = Consumer, - case rabbit_limiter:can_send( - Limiter, self(), Consumer#consumer.ack_required, CTag) of - {consumer_blocked, Limiter2} -> - block_consumer(C#cr{limiter = Limiter2}, E), - {false, State}; - channel_blocked -> - block_consumer(C#cr{is_limit_active = true}, E), - {false, State}; - Limiter2 -> - AC1 = queue:in(E, State#q.active_consumers), - deliver_msg_to_consumer( - DeliverFun, Consumer, C#cr{limiter = Limiter2}, - State#q{active_consumers = AC1}) - end + true -> block_consumer(C, E), + {false, State}; + false -> case rabbit_limiter:can_send(C#cr.limiter, self(), + Consumer#consumer.ack_required, + Consumer#consumer.tag) of + {consumer_blocked, Limiter2} -> + block_consumer(C#cr{limiter = Limiter2}, E), + {false, State}; + channel_blocked -> + block_consumer(C#cr{is_limit_active = true}, E), + {false, State}; + Limiter2 -> + AC1 = queue:in(E, State#q.active_consumers), + deliver_msg_to_consumer( + DeliverFun, Consumer, C#cr{limiter = Limiter2}, + State#q{active_consumers = AC1}) + end end. deliver_msg_to_consumer(DeliverFun, |
