diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 16 |
1 files changed, 4 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ef48bb5d5e..fba58d38d7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1097,7 +1097,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, reply({error, exclusive_consume_unavailable}, State); ok -> C = ch_record(ChPid), - C1 = update_consumer_count(C#cr{limiter = Limiter}, +1), + update_consumer_count(C#cr{limiter = Limiter}, +1), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -1106,18 +1106,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), - E = {ChPid, Consumer}, - State2 = - case is_ch_blocked(C1) of - true -> block_consumer(C1, E), - State1; - false -> update_ch_record(C1), - AC1 = queue:in(E, State1#q.active_consumers), - run_message_queue(State1#q{active_consumers = AC1}) - end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State2)), - reply(ok, State2) + not NoAck, qname(State1)), + AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers), + reply(ok, run_message_queue(State1#q{active_consumers = AC1})) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, |
