diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-11-14 12:03:10 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-11-14 12:03:10 +0000 |
| commit | 655d872d0d9b0ff29bba5ecbe2402c2b66e949f8 (patch) | |
| tree | 7cc5f7892f8d784a2e47d695c4c046042c3b2829 /src | |
| parent | cbdd7758327e497d9a5285d3333da4625a73daf6 (diff) | |
| download | rabbitmq-server-git-655d872d0d9b0ff29bba5ecbe2402c2b66e949f8.tar.gz | |
consumer_bound info item.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 23 |
1 files changed, 14 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4ff30ce0b8..d4cba9440d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,6 +39,7 @@ backing_queue, backing_queue_state, active_consumers, + active_consumers_last_empty, expires, sync_timer_ref, rate_timer_ref, @@ -95,6 +96,7 @@ messages_unacknowledged, messages, consumers, + consumer_bound, memory, slave_pids, synchronised_slave_pids, @@ -145,14 +147,15 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State3. init_state(Q) -> - State = #q{q = Q, - exclusive_consumer = none, - has_had_consumers = false, - active_consumers = priority_queue:new(), - senders = pmon:new(delegate), - msg_id_to_channel = gb_trees:empty(), - status = running, - args_policy_version = 0}, + State = #q{q = Q, + exclusive_consumer = none, + has_had_consumers = false, + active_consumers = priority_queue:new(), + active_consumers_last_empty = erlang:now(), + senders = pmon:new(delegate), + msg_id_to_channel = gb_trees:empty(), + status = running, + args_policy_version = 0}, rabbit_event:init_stats_timer(State, #q.stats_timer). terminate(shutdown = R, State = #q{backing_queue = BQ}) -> @@ -485,7 +488,7 @@ deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> case priority_queue:out_p(ActiveConsumers) of {empty, _} -> - {false, State}; + {false, State#q{active_consumers_last_empty = erlang:now()}}; {{value, QEntry, Priority}, Tail} -> {Stop, State1} = deliver_msg_to_consumer( DeliverFun, QEntry, Priority, @@ -1037,6 +1040,8 @@ i(messages, State) -> messages_unacknowledged]]); i(consumers, _) -> consumer_count(); +i(consumer_bound, #q{active_consumers_last_empty = Last}) -> + timer:now_diff(erlang:now(), Last) < 1000000; i(memory, _) -> {memory, M} = process_info(self(), memory), M; |
