summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl22
1 files changed, 7 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a43dbdcc43..c02fd6b5a6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1142,6 +1142,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
Limiter, ConsumerTag, Credit, Drain)
end,
C1 = update_consumer_count(C#cr{limiter = Limiter2}, +1),
+ case is_empty(State) of
+ true -> send_drained(C1);
+ false -> ok
+ end,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1150,22 +1154,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
- E = {ChPid, Consumer},
- State2 =
- case is_ch_blocked(C1) of
- true -> block_consumer(C1, E),
- State1;
- false -> update_ch_record(C1),
- AC1 = queue:in(E, State1#q.active_consumers),
- run_message_queue(State1#q{active_consumers = AC1})
- end,
- case is_empty(State2) of
- true -> send_drained(lookup_ch(ChPid));
- false -> ok
- end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State2)),
- reply(ok, State2)
+ not NoAck, qname(State1)),
+ AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers),
+ reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,