summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-19 00:10:52 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-19 00:10:52 +0100
commit5911f5c71b5716523c672c90e39dc75b3c1bc369 (patch)
treede48052f1bba728503c1d35de47beb2a2ad36b4b /src
parent6c7be0a138dbe19987b01f971a193f25fa9be1a6 (diff)
downloadrabbitmq-server-git-5911f5c71b5716523c672c90e39dc75b3c1bc369.tar.gz
count queue consumers as required by the spec
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl19
1 files changed, 9 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 47a9865e94..b9fb2f8544 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -648,8 +648,13 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-consumer_count() ->
- lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
+consumer_count() -> consumer_count(fun (_) -> false end).
+
+active_consumer_count() -> consumer_count(fun is_ch_blocked/1).
+
+consumer_count(Exclude) ->
+ lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(),
+ not Exclude(C)]).
is_unused(_State) -> consumer_count() == 0.
@@ -1008,15 +1013,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end;
handle_call(stat, _From, State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS,
- active_consumers = ActiveConsumers} =
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(ensure_expiry_timer(State)),
- %% TODO: According to the spec the returned consumer count should
- %% not include blocked consumers. Since we remove those from
- %% ActiveConsumers lazily - when come across them when attempting
- %% to deliver messages - the count we are returning here may
- %% contain some blocked consumers.
- reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State1);
+ reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->