summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-11-15 13:40:40 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-11-15 13:40:40 +0000
commitf2bc5ebe36a2ae2ef5c05d722bdac442007bcde5 (patch)
tree6c405e28183ddaf1826e130b3c017cd9211b2811 /src
parentbfdb6491ade85e63c238a469987eb0b6c906ccb3 (diff)
downloadrabbitmq-server-git-f2bc5ebe36a2ae2ef5c05d722bdac442007bcde5.tar.gz
Add a BQ function for this.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_backing_queue.erl7
-rw-r--r--src/rabbit_mirror_queue_master.erl5
-rw-r--r--src/rabbit_variable_queue.erl8
4 files changed, 17 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 78f955e7d8..7fa04c7abf 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1114,9 +1114,7 @@ prioritise_cast(Msg, _Len, State) ->
{run_backing_queue, _Mod, _Fun} -> 6;
{notify_sent, _ChPid, _Credit} ->
#q{backing_queue = BQ, backing_queue_state = BQS} = State,
- BQSProps = BQ:status(BQS),
- [Ingress, Egress] = [proplists:get_value(K, BQSProps) ||
- K <- [avg_ingress_rate, avg_egress_rate]],
+ {Ingress, Egress} = BQ:msg_rates(BQS),
case ?BIAS of
B when B > 0.0 andalso Ingress >= (1.0 - B) * Egress -> +1;
B when B < 0.0 andalso Egress >= (1.0 + B) * Ingress -> -1;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 61b504bc29..1bbd1543f3 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -203,6 +203,10 @@
%% Called immediately before the queue hibernates.
-callback handle_pre_hibernate(state()) -> state().
+%% Used to help prioritisation in rabbit_amqqueue_process. The rate of
+%% inbound messages and outbound messages at the moment.
+-callback msg_rates(state()) -> {float(), float()}.
+
%% Exists for debugging purposes, to be able to expose state via
%% rabbitmqctl list_queues backing_queue_status
-callback status(state()) -> [{atom(), any()}].
@@ -230,7 +234,8 @@ behaviour_info(callbacks) ->
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
- {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
+ {handle_pre_hibernate, 1}, {msg_rates, 1}, {status, 1},
+ {invoke, 3}, {is_duplicate, 2}] ;
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 3abd81f56a..8b87c56a2f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2]).
+ msg_rates/1, status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -345,6 +345,9 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
+msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:msg_rates(BQS).
+
status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:status(BQS) ++
[ {mirror_seen, dict:size(State #state.seen_status)},
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ac2b9f52d0..aa090ed4a0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -21,8 +21,8 @@
dropwhile/2, fetchwhile/4,
fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
- needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0]).
+ needs_timeout/1, timeout/1, handle_pre_hibernate/1, msg_rates/1,
+ status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -789,6 +789,10 @@ timeout(State = #vqstate { index_state = IndexState }) ->
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
+msg_rates(#vqstate { rates = #rates { avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate } }) ->
+ {AvgIngressRate, AvgEgressRate}.
+
status(#vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,