diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-11-08 12:53:27 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-11-08 12:53:27 +0000 |
| commit | 5a4ab1b1548638698878d435acbfe3e9e02a78ac (patch) | |
| tree | e0e07b024a3dc4692931a210b1bd20f92afeb9cd | |
| parent | aaba39a9aeb52f0b63a5e76581de8e8516ea117d (diff) | |
| download | rabbitmq-server-git-5a4ab1b1548638698878d435acbfe3e9e02a78ac.tar.gz | |
Pass length to prioritise_*
| -rw-r--r-- | src/file_handle_cache.erl | 4 | ||||
| -rw-r--r-- | src/gen_server2.erl | 12 | ||||
| -rw-r--r-- | src/gm.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 6 | ||||
| -rw-r--r-- | src/worker_pool_worker.erl | 6 |
10 files changed, 43 insertions, 39 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 3260d36986..f39ba01d4f 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -152,7 +152,7 @@ -export([ulimit/0]). -export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2, code_change/3, prioritise_cast/2]). + handle_info/2, terminate/2, code_change/3, prioritise_cast/3]). -define(SERVER, ?MODULE). -define(RESERVED_FOR_OTHERS, 100). @@ -848,7 +848,7 @@ init([AlarmSet, AlarmClear]) -> alarm_set = AlarmSet, alarm_clear = AlarmClear }}. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {release, _, _} -> 5; _ -> 0 diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 78bbbe0627..7bdfa91a45 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -1179,16 +1179,20 @@ find_prioritisers(GS2State = #gs2_state { mod = Mod }) -> function_exported_or_default(Mod, Fun, Arity, Default) -> case erlang:function_exported(Mod, Fun, Arity) of true -> case Arity of - 2 -> fun (Msg, GS2State = #gs2_state { state = State }) -> - case catch Mod:Fun(Msg, State) of + 2 -> fun (Msg, GS2State = #gs2_state { queue = Queue, + state = State }) -> + Length = priority_queue:len(Queue), + case catch Mod:Fun(Msg, Length, State) of Res when is_integer(Res) -> Res; Err -> handle_common_termination(Err, Msg, GS2State) end end; - 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) -> - case catch Mod:Fun(Msg, From, State) of + 3 -> fun (Msg, From, GS2State = #gs2_state { queue = Queue, + state = State }) -> + Length = priority_queue:len(Queue), + case catch Mod:Fun(Msg, From, Length, State) of Res when is_integer(Res) -> Res; Err -> diff --git a/src/gm.erl b/src/gm.erl index 4a95de0dd1..98685ebb8c 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -380,7 +380,7 @@ confirmed_broadcast/2, info/1, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_info/2]). + code_change/3, prioritise_info/3]). -ifndef(use_specs). -export([behaviour_info/1]). @@ -718,12 +718,12 @@ terminate(Reason, State = #state { module = Module, code_change(_OldVsn, State, _Extra) -> {ok, State}. -prioritise_info(flush, _State) -> +prioritise_info(flush, _Len, _State) -> 1; -prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, +prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len, #state { members_state = MS }) when MS /= undefined -> 1; -prioritise_info(_, _State) -> +prioritise_info(_, _Len, _State) -> 0. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 43fe35781d..d706de3c0e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -29,8 +29,8 @@ -export([init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2, format_message_queue/2]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/4, + prioritise_cast/3, prioritise_info/3, format_message_queue/2]). %% Queue's state -record(q, {q, @@ -956,7 +956,7 @@ emit_consumer_deleted(ChPid, ConsumerTag) -> %%---------------------------------------------------------------------------- -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; {info, _Items} -> 9; @@ -965,7 +965,7 @@ prioritise_call(Msg, _From, _State) -> _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; @@ -974,7 +974,7 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> +prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of {'DOWN', _, process, DownPid, _} -> 8; update_ram_duration -> 8; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a94d2ab53c..c14a8b3443 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -27,8 +27,8 @@ -export([force_event_refresh/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2, format_message_queue/2]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/4, + prioritise_cast/3, prioritise_info/3, format_message_queue/2]). %% Internal -export([list_local/0]). @@ -213,20 +213,20 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, {ok, State1, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; {info, _Items} -> 9; _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {confirm, _MsgSeqNos, _QPid} -> 5; _ -> 0 end. -prioritise_info(Msg, _State) -> +prioritise_info(Msg, _Len, _State) -> case Msg of emit_stats -> 7; _ -> 0 diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 2b15498ed9..05c2725053 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -19,7 +19,7 @@ -behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, prioritise_call/3]). + handle_info/2, prioritise_call/4]). -export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, disable/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). @@ -126,8 +126,8 @@ is_blocked(Limiter) -> init([]) -> {ok, #lim{}}. -prioritise_call(get_limit, _From, _State) -> 9; -prioritise_call(_Msg, _From, _State) -> 0. +prioritise_call(get_limit, _From, _Len, _State) -> 9; +prioritise_call(_Msg, _From, _Len, _State) -> 0. handle_call({can_send, QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 1ba1420f42..1d733cf79f 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -27,8 +27,8 @@ -export([start_link/1, set_maximum_since_use/2, info/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2]). + code_change/3, handle_pre_hibernate/1, prioritise_call/4, + prioritise_cast/3, prioritise_info/3]). -export([joined/2, members_changed/3, handle_msg/3]). @@ -305,14 +305,14 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, BQS3 = BQ:handle_pre_hibernate(BQS2), {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}. -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; {gm_deaths, _Deaths} -> 5; _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; @@ -322,7 +322,7 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -prioritise_info(Msg, _State) -> +prioritise_info(Msg, _Len, _State) -> case Msg of update_ram_duration -> 8; sync_timeout -> 6; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index c2e55022d9..d656098a79 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -29,8 +29,8 @@ -export([transform_dir/3, force_recovery/2]). %% upgrade -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_call/3, prioritise_cast/2, - prioritise_info/2, format_message_queue/2]). + code_change/3, prioritise_call/4, prioritise_cast/3, + prioritise_info/3, format_message_queue/2]). %%---------------------------------------------------------------------------- @@ -738,7 +738,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of successfully_recovered_state -> 7; {new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7; @@ -746,7 +746,7 @@ prioritise_call(Msg, _From, _State) -> _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {combine_files, _Source, _Destination, _Reclaimed} -> 8; {delete_file, _File, _Reclaimed} -> 8; @@ -755,7 +755,7 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -prioritise_info(Msg, _State) -> +prioritise_info(Msg, _Len, _State) -> case Msg of sync -> 8; _ -> 0 diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 3b61ed0bd7..bdabf406bf 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -23,7 +23,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_cast/2]). + terminate/2, code_change/3, prioritise_cast/3]). -record(state, { pending_no_readers, @@ -79,8 +79,8 @@ init([MsgStoreState]) -> msg_store_state = MsgStoreState }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; -prioritise_cast(_Msg, _State) -> 0. +prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; +prioritise_cast(_Msg, _Len, _State) -> 0. handle_call(stop, _From, State) -> {stop, normal, ok, State}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 1ddcebb23d..6db6d156ad 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -23,7 +23,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_cast/2]). + terminate/2, code_change/3, prioritise_cast/3]). %%---------------------------------------------------------------------------- @@ -73,8 +73,8 @@ init([WId]) -> {ok, WId, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; -prioritise_cast(_Msg, _State) -> 0. +prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; +prioritise_cast(_Msg, _Len, _State) -> 0. handle_call({submit, Fun}, From, WId) -> gen_server2:reply(From, run(Fun)), |
