summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-11-14 12:03:10 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-11-14 12:03:10 +0000
commit655d872d0d9b0ff29bba5ecbe2402c2b66e949f8 (patch)
tree7cc5f7892f8d784a2e47d695c4c046042c3b2829 /src
parentcbdd7758327e497d9a5285d3333da4625a73daf6 (diff)
downloadrabbitmq-server-git-655d872d0d9b0ff29bba5ecbe2402c2b66e949f8.tar.gz
consumer_bound info item.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl23
1 files changed, 14 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4ff30ce0b8..d4cba9440d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,6 +39,7 @@
backing_queue,
backing_queue_state,
active_consumers,
+ active_consumers_last_empty,
expires,
sync_timer_ref,
rate_timer_ref,
@@ -95,6 +96,7 @@
messages_unacknowledged,
messages,
consumers,
+ consumer_bound,
memory,
slave_pids,
synchronised_slave_pids,
@@ -145,14 +147,15 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State3.
init_state(Q) ->
- State = #q{q = Q,
- exclusive_consumer = none,
- has_had_consumers = false,
- active_consumers = priority_queue:new(),
- senders = pmon:new(delegate),
- msg_id_to_channel = gb_trees:empty(),
- status = running,
- args_policy_version = 0},
+ State = #q{q = Q,
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ active_consumers = priority_queue:new(),
+ active_consumers_last_empty = erlang:now(),
+ senders = pmon:new(delegate),
+ msg_id_to_channel = gb_trees:empty(),
+ status = running,
+ args_policy_version = 0},
rabbit_event:init_stats_timer(State, #q.stats_timer).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
@@ -485,7 +488,7 @@ deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
case priority_queue:out_p(ActiveConsumers) of
{empty, _} ->
- {false, State};
+ {false, State#q{active_consumers_last_empty = erlang:now()}};
{{value, QEntry, Priority}, Tail} ->
{Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry, Priority,
@@ -1037,6 +1040,8 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
+i(consumer_bound, #q{active_consumers_last_empty = Last}) ->
+ timer:now_diff(erlang:now(), Last) < 1000000;
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;