diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-11-15 13:40:40 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-11-15 13:40:40 +0000 |
| commit | f2bc5ebe36a2ae2ef5c05d722bdac442007bcde5 (patch) | |
| tree | 6c405e28183ddaf1826e130b3c017cd9211b2811 /src | |
| parent | bfdb6491ade85e63c238a469987eb0b6c906ccb3 (diff) | |
| download | rabbitmq-server-git-f2bc5ebe36a2ae2ef5c05d722bdac442007bcde5.tar.gz | |
Add a BQ function for this.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 8 |
4 files changed, 17 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 78f955e7d8..7fa04c7abf 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1114,9 +1114,7 @@ prioritise_cast(Msg, _Len, State) -> {run_backing_queue, _Mod, _Fun} -> 6; {notify_sent, _ChPid, _Credit} -> #q{backing_queue = BQ, backing_queue_state = BQS} = State, - BQSProps = BQ:status(BQS), - [Ingress, Egress] = [proplists:get_value(K, BQSProps) || - K <- [avg_ingress_rate, avg_egress_rate]], + {Ingress, Egress} = BQ:msg_rates(BQS), case ?BIAS of B when B > 0.0 andalso Ingress >= (1.0 - B) * Egress -> +1; B when B < 0.0 andalso Egress >= (1.0 + B) * Ingress -> -1; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 61b504bc29..1bbd1543f3 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -203,6 +203,10 @@ %% Called immediately before the queue hibernates. -callback handle_pre_hibernate(state()) -> state(). +%% Used to help prioritisation in rabbit_amqqueue_process. The rate of +%% inbound messages and outbound messages at the moment. +-callback msg_rates(state()) -> {float(), float()}. + %% Exists for debugging purposes, to be able to expose state via %% rabbitmqctl list_queues backing_queue_status -callback status(state()) -> [{atom(), any()}]. @@ -230,7 +234,8 @@ behaviour_info(callbacks) -> {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, - {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; + {handle_pre_hibernate, 1}, {msg_rates, 1}, {status, 1}, + {invoke, 3}, {is_duplicate, 2}] ; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 3abd81f56a..8b87c56a2f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2]). + msg_rates/1, status/1, invoke/3, is_duplicate/2]). -export([start/1, stop/0]). @@ -345,6 +345,9 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. +msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:msg_rates(BQS). + status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:status(BQS) ++ [ {mirror_seen, dict:size(State #state.seen_status)}, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ac2b9f52d0..aa090ed4a0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -21,8 +21,8 @@ dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, - needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, multiple_routing_keys/0]). + needs_timeout/1, timeout/1, handle_pre_hibernate/1, msg_rates/1, + status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -789,6 +789,10 @@ timeout(State = #vqstate { index_state = IndexState }) -> handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. +msg_rates(#vqstate { rates = #rates { avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate } }) -> + {AvgIngressRate, AvgEgressRate}. + status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, |
