summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-11-08 12:53:27 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-11-08 12:53:27 +0000
commit5a4ab1b1548638698878d435acbfe3e9e02a78ac (patch)
treee0e07b024a3dc4692931a210b1bd20f92afeb9cd
parentaaba39a9aeb52f0b63a5e76581de8e8516ea117d (diff)
downloadrabbitmq-server-git-5a4ab1b1548638698878d435acbfe3e9e02a78ac.tar.gz
Pass length to prioritise_*
-rw-r--r--src/file_handle_cache.erl4
-rw-r--r--src/gen_server2.erl12
-rw-r--r--src/gm.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl10
-rw-r--r--src/rabbit_channel.erl10
-rw-r--r--src/rabbit_limiter.erl6
-rw-r--r--src/rabbit_mirror_queue_slave.erl10
-rw-r--r--src/rabbit_msg_store.erl10
-rw-r--r--src/rabbit_msg_store_gc.erl6
-rw-r--r--src/worker_pool_worker.erl6
10 files changed, 43 insertions, 39 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 3260d36986..f39ba01d4f 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -152,7 +152,7 @@
-export([ulimit/0]).
-export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3, prioritise_cast/2]).
+ handle_info/2, terminate/2, code_change/3, prioritise_cast/3]).
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
@@ -848,7 +848,7 @@ init([AlarmSet, AlarmClear]) ->
alarm_set = AlarmSet,
alarm_clear = AlarmClear }}.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
{release, _, _} -> 5;
_ -> 0
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 78bbbe0627..7bdfa91a45 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -1179,16 +1179,20 @@ find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
function_exported_or_default(Mod, Fun, Arity, Default) ->
case erlang:function_exported(Mod, Fun, Arity) of
true -> case Arity of
- 2 -> fun (Msg, GS2State = #gs2_state { state = State }) ->
- case catch Mod:Fun(Msg, State) of
+ 2 -> fun (Msg, GS2State = #gs2_state { queue = Queue,
+ state = State }) ->
+ Length = priority_queue:len(Queue),
+ case catch Mod:Fun(Msg, Length, State) of
Res when is_integer(Res) ->
Res;
Err ->
handle_common_termination(Err, Msg, GS2State)
end
end;
- 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) ->
- case catch Mod:Fun(Msg, From, State) of
+ 3 -> fun (Msg, From, GS2State = #gs2_state { queue = Queue,
+ state = State }) ->
+ Length = priority_queue:len(Queue),
+ case catch Mod:Fun(Msg, From, Length, State) of
Res when is_integer(Res) ->
Res;
Err ->
diff --git a/src/gm.erl b/src/gm.erl
index 4a95de0dd1..98685ebb8c 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -380,7 +380,7 @@
confirmed_broadcast/2, info/1, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, prioritise_info/2]).
+ code_change/3, prioritise_info/3]).
-ifndef(use_specs).
-export([behaviour_info/1]).
@@ -718,12 +718,12 @@ terminate(Reason, State = #state { module = Module,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-prioritise_info(flush, _State) ->
+prioritise_info(flush, _Len, _State) ->
1;
-prioritise_info({'DOWN', _MRef, process, _Pid, _Reason},
+prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len,
#state { members_state = MS }) when MS /= undefined ->
1;
-prioritise_info(_, _State) ->
+prioritise_info(_, _Len, _State) ->
0.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 43fe35781d..d706de3c0e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -29,8 +29,8 @@
-export([init_with_backing_queue_state/7]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
+ prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
%% Queue's state
-record(q, {q,
@@ -956,7 +956,7 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
%%----------------------------------------------------------------------------
-prioritise_call(Msg, _From, _State) ->
+prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
{info, _Items} -> 9;
@@ -965,7 +965,7 @@ prioritise_call(Msg, _From, _State) ->
_ -> 0
end.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
@@ -974,7 +974,7 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
-prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
+prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
{'DOWN', _, process, DownPid, _} -> 8;
update_ram_duration -> 8;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a94d2ab53c..c14a8b3443 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -27,8 +27,8 @@
-export([force_event_refresh/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
+ prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
%% Internal
-export([list_local/0]).
@@ -213,20 +213,20 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
{ok, State1, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-prioritise_call(Msg, _From, _State) ->
+prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
{info, _Items} -> 9;
_ -> 0
end.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
{confirm, _MsgSeqNos, _QPid} -> 5;
_ -> 0
end.
-prioritise_info(Msg, _State) ->
+prioritise_info(Msg, _Len, _State) ->
case Msg of
emit_stats -> 7;
_ -> 0
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 2b15498ed9..05c2725053 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -19,7 +19,7 @@
-behaviour(gen_server2).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, prioritise_call/3]).
+ handle_info/2, prioritise_call/4]).
-export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
disable/1]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
@@ -126,8 +126,8 @@ is_blocked(Limiter) ->
init([]) ->
{ok, #lim{}}.
-prioritise_call(get_limit, _From, _State) -> 9;
-prioritise_call(_Msg, _From, _State) -> 0.
+prioritise_call(get_limit, _From, _Len, _State) -> 9;
+prioritise_call(_Msg, _From, _Len, _State) -> 0.
handle_call({can_send, QPid, _AckRequired}, _From,
State = #lim{blocked = true}) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 1ba1420f42..1d733cf79f 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -27,8 +27,8 @@
-export([start_link/1, set_maximum_since_use/2, info/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2]).
+ code_change/3, handle_pre_hibernate/1, prioritise_call/4,
+ prioritise_cast/3, prioritise_info/3]).
-export([joined/2, members_changed/3, handle_msg/3]).
@@ -305,14 +305,14 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
BQS3 = BQ:handle_pre_hibernate(BQS2),
{hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}.
-prioritise_call(Msg, _From, _State) ->
+prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
{gm_deaths, _Deaths} -> 5;
_ -> 0
end.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
@@ -322,7 +322,7 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
-prioritise_info(Msg, _State) ->
+prioritise_info(Msg, _Len, _State) ->
case Msg of
update_ram_duration -> 8;
sync_timeout -> 6;
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index c2e55022d9..d656098a79 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -29,8 +29,8 @@
-export([transform_dir/3, force_recovery/2]). %% upgrade
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, prioritise_call/3, prioritise_cast/2,
- prioritise_info/2, format_message_queue/2]).
+ code_change/3, prioritise_call/4, prioritise_cast/3,
+ prioritise_info/3, format_message_queue/2]).
%%----------------------------------------------------------------------------
@@ -738,7 +738,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-prioritise_call(Msg, _From, _State) ->
+prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
successfully_recovered_state -> 7;
{new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7;
@@ -746,7 +746,7 @@ prioritise_call(Msg, _From, _State) ->
_ -> 0
end.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
{combine_files, _Source, _Destination, _Reclaimed} -> 8;
{delete_file, _File, _Reclaimed} -> 8;
@@ -755,7 +755,7 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
-prioritise_info(Msg, _State) ->
+prioritise_info(Msg, _Len, _State) ->
case Msg of
sync -> 8;
_ -> 0
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 3b61ed0bd7..bdabf406bf 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -23,7 +23,7 @@
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, prioritise_cast/2]).
+ terminate/2, code_change/3, prioritise_cast/3]).
-record(state,
{ pending_no_readers,
@@ -79,8 +79,8 @@ init([MsgStoreState]) ->
msg_store_state = MsgStoreState }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
-prioritise_cast(_Msg, _State) -> 0.
+prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
+prioritise_cast(_Msg, _Len, _State) -> 0.
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 1ddcebb23d..6db6d156ad 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -23,7 +23,7 @@
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, prioritise_cast/2]).
+ terminate/2, code_change/3, prioritise_cast/3]).
%%----------------------------------------------------------------------------
@@ -73,8 +73,8 @@ init([WId]) ->
{ok, WId, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
-prioritise_cast(_Msg, _State) -> 0.
+prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
+prioritise_cast(_Msg, _Len, _State) -> 0.
handle_call({submit, Fun}, From, WId) ->
gen_server2:reply(From, run(Fun)),