summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-17 12:01:00 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-17 12:01:00 +0100
commitc6d51e7aa6c33e8cd3ecb60bae072ec91407dfd5 (patch)
tree938fb8d4184e09714f1cd2acb8f0193604fa1bba /src
parent55c65208603aa194b6959813e911754214730365 (diff)
downloadrabbitmq-server-git-c6d51e7aa6c33e8cd3ecb60bae072ec91407dfd5.tar.gz
extract correlation between consumer_count and limiter registration
...and fix a bug in the process: during consumer registration we were using the *old* cr, with the old (and usually undefined) limiter, to check whether the channel is blocked. Thus we would end up running through the message queue unnecessarily. No big deal, but certainly not what was intended. Also, make the update_ch_record return the record, which makes the function more composable.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl44
1 files changed, 21 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 82ed2ec8cf..b66109e38a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -354,7 +354,8 @@ update_ch_record(C = #cr{consumer_count = ConsumerCount,
case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of
{0, 0, 0} -> ok = erase_ch_record(C);
_ -> ok = store_ch_record(C)
- end.
+ end,
+ C.
store_ch_record(C = #cr{ch_pid = ChPid}) ->
put({ch, ChPid}, C),
@@ -368,6 +369,16 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
+update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
+ ok = rabbit_limiter:register(Limiter, self()),
+ update_ch_record(C#cr{consumer_count = 1});
+update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) ->
+ ok = rabbit_limiter:unregister(Limiter, self()),
+ update_ch_record(C#cr{consumer_count = 0,
+ limiter = rabbit_limiter:make_token()});
+update_consumer_count(C = #cr{consumer_count = Count}, Delta) ->
+ update_ch_record(C#cr{consumer_count = Count + Delta}).
+
all_ch_record() -> [C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
@@ -405,9 +416,9 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
true -> sets:add_element(AckTag, ChAckTags);
false -> ChAckTags
end,
- NewC = C#cr{unsent_message_count = Count + 1,
- acktags = ChAckTags1},
- update_ch_record(NewC),
+ NewC = update_ch_record(
+ C#cr{unsent_message_count = Count + 1,
+ acktags = ChAckTags1}),
{NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
@@ -607,8 +618,7 @@ possibly_unblock(State, ChPid, Update) ->
not_found ->
State;
C ->
- NewC = Update(C),
- update_ch_record(NewC),
+ NewC = update_ch_record(Update(C)),
case ch_record_state_transition(C, NewC) of
ok -> State;
unblock -> {NewBlockedConsumers, NewActiveConsumers} =
@@ -962,15 +972,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ C = ch_record(ChPid),
+ C1 = update_consumer_count(C#cr{limiter = Limiter}, +1),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
- update_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter = Limiter}),
- ok = case ConsumerCount of
- 0 -> rabbit_limiter:register(Limiter, self());
- _ -> ok
- end,
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> ExistingHolder
end,
@@ -978,7 +983,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
State2 =
- case is_ch_blocked(C) of
+ case is_ch_blocked(C1) of
true -> State1#q{
blocked_consumers =
add_consumer(ChPid, Consumer,
@@ -1000,15 +1005,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumer_count = ConsumerCount,
- limiter = Limiter} ->
- C1 = C#cr{consumer_count = ConsumerCount -1},
- update_ch_record(
- case ConsumerCount of
- 1 -> ok = rabbit_limiter:unregister(Limiter, self()),
- C1#cr{limiter = rabbit_limiter:make_token()};
- _ -> C1
- end),
+ C ->
+ update_consumer_count(C, -1),
emit_consumer_deleted(ChPid, ConsumerTag),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =