summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-02-21 11:38:53 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-02-21 11:38:53 +0000
commit07827524a7b9a6c2ca1ebf3561e5c2839bec3aba (patch)
treea7985a90e2dd6d433b8732cffb83193d4014d39f
parentc000bc9b2632615828f92d2dbf79f02cc8aa4a1e (diff)
downloadrabbitmq-server-git-07827524a7b9a6c2ca1ebf3561e5c2839bec3aba.tar.gz
simplify queue's basic_consume handler
- the call to update_ch_record in the is_ch_blocked(C1) == false branch was superfluos since the preceding update_consumer_count calls update_ch_record - all the checking whether the channel is blocked, and associated branching was just an optimisation. And not a particularly important one, since a) the "a new consumer comes along while its channel is blocked" case is hardly on the critical path, and b) exactly the same check is performed as part of run_message_queue (in deliver_msg_to_consumer/3). So get rid of it. - the is_empty & send_drained logic can be invoked earlier, which allows us to use the #cr we have rather than looking it up again. We can do this since the only case we need to catch here is that of a consumer coming along while the queue is empty already. If it becomes empty as part of run_message_queue then send_drained will be invoked in 'fetch'.
-rw-r--r--src/rabbit_amqqueue_process.erl22
1 files changed, 7 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a43dbdcc43..c02fd6b5a6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1142,6 +1142,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
Limiter, ConsumerTag, Credit, Drain)
end,
C1 = update_consumer_count(C#cr{limiter = Limiter2}, +1),
+ case is_empty(State) of
+ true -> send_drained(C1);
+ false -> ok
+ end,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1150,22 +1154,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
- E = {ChPid, Consumer},
- State2 =
- case is_ch_blocked(C1) of
- true -> block_consumer(C1, E),
- State1;
- false -> update_ch_record(C1),
- AC1 = queue:in(E, State1#q.active_consumers),
- run_message_queue(State1#q{active_consumers = AC1})
- end,
- case is_empty(State2) of
- true -> send_drained(lookup_ch(ChPid));
- false -> ok
- end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State2)),
- reply(ok, State2)
+ not NoAck, qname(State1)),
+ AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers),
+ reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,