summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-23 15:10:02 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-23 15:10:02 +0100
commit63d4e1774c79f6d55fd295823661cff5bdff062a (patch)
treeb43b67cc999221d30e09cdff07bf2950b39b3315 /src
parent275559bdb7a77e7a0b78715b64d3551ccaa9fa84 (diff)
downloadrabbitmq-server-git-63d4e1774c79f6d55fd295823661cff5bdff062a.tar.gz
unprioritized calls, casts and infos are passed through the prioritize functions
PCalls and PCasts have the priorities assigned to them by the pcall, pcast call. Everything else gets its priority set by prioritize_call/3, prioritize_cast/2 or prioritize_info/2.
Diffstat (limited to 'src')
-rw-r--r--src/gen_server2.erl77
1 files changed, 48 insertions, 29 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 2dde103bf1..a7600150af 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -181,8 +181,8 @@
%% State record
-record(gs2_state, {parent, name, state, mod, time,
- timeout_state, queue, debug, prioritise_call,
- prioritise_cast, prioritise_info}).
+ timeout_state, queue, debug, prioritize_call,
+ prioritize_cast, prioritize_info}).
%%%=========================================================================
%%% Specs. These exist only to shut up dialyzer's warnings
@@ -410,21 +410,12 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
Name = name(Name0),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
- PrioriCall = function_exported_or_default(
- Mod, 'prioritize_call', 3,
- fun (_Msg, _From, _State) -> 0 end),
- PrioriCast = function_exported_or_default(Mod, 'prioritize_cast', 2,
- fun (_Msg, _State) -> 0 end),
- PrioriInfo = function_exported_or_default(Mod, 'prioritize_info', 2,
- fun (_Msg, _State) -> 0 end),
- GS2State = #gs2_state { parent = Parent,
- name = Name,
- mod = Mod,
- queue = Queue,
- debug = Debug,
- prioritise_call = PrioriCall,
- prioritise_cast = PrioriCast,
- prioritise_info = PrioriInfo },
+ GS2State = find_prioritizers(
+ #gs2_state { parent = Parent,
+ name = Name,
+ mod = Mod,
+ queue = Queue,
+ debug = Debug }),
case catch Mod:init(Args) of
{ok, State} ->
proc_lib:init_ack(Starter, {ok, self()}),
@@ -497,11 +488,11 @@ loop(GS2State = #gs2_state { time = hibernate,
timeout_state = undefined }) ->
pre_hibernate(GS2State);
loop(GS2State = #gs2_state { queue = Queue }) ->
- process_next_msg(GS2State #gs2_state { queue = drain(Queue) }).
+ process_next_msg(GS2State #gs2_state { queue = drain(Queue, GS2State) }).
-drain(Queue) ->
+drain(Queue, GS2State) ->
receive
- Input -> drain(in(Input, Queue))
+ Input -> drain(in(Input, Queue, GS2State), GS2State)
after 0 -> Queue
end.
@@ -531,7 +522,8 @@ process_next_msg(GS2State = #gs2_state { time = Time,
Input ->
%% Time could be 'hibernate' here, so *don't* call loop
process_next_msg(
- GS2State #gs2_state { queue = drain(in(Input, Queue1)) })
+ GS2State #gs2_state {
+ queue = drain(in(Input, Queue1, GS2State), GS2State) })
after Time1 ->
case HibOnTimeout of
true ->
@@ -553,7 +545,7 @@ wake_hib(GS2State = #gs2_state { timeout_state = TS,
adjust_timeout_state(SleptAt, now(), TimeoutState)
end,
post_hibernate(GS2State #gs2_state { timeout_state = TimeoutState1,
- queue = drain(Queue) }).
+ queue = drain(Queue, GS2State) }).
hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) ->
TS = case TimeoutState of
@@ -619,13 +611,19 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
CurrentTO1 = Base + Extra,
{backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
-%% THE MAGIC HAPPENS HERE
-in({'$gen_pcast', {Priority, Msg}}, Queue) ->
+in({'$gen_pcast', {Priority, Msg}}, Queue, _GS2State) ->
priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
-in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
+in({'$gen_pcall', From, {Priority, Msg}}, Queue, _GS2State) ->
priority_queue:in({'$gen_call', From, Msg}, Priority, Queue);
-in(Input, Queue) ->
- priority_queue:in(Input, Queue).
+in({'$gen_cast', Msg}, Queue, #gs2_state { prioritize_cast = PC,
+ state = State }) ->
+ priority_queue:in({'$gen_cast', Msg}, PC(Msg, State), Queue);
+in({'$gan_call', From, Msg}, Queue, #gs2_state { prioritize_call = PC,
+ state = State }) ->
+ priority_queue:in({'$gen_call', Msg}, PC(Msg, From, State), Queue);
+in(Input, Queue, #gs2_state { prioritize_info = PI,
+ state = State }) ->
+ priority_queue:in(Input, PI(Input, State), Queue).
process_msg(Msg,
GS2State = #gs2_state { parent = Parent,
@@ -993,7 +991,9 @@ system_code_change(GS2State = #gs2_state { mod = Mod,
_Module, OldVsn, Extra) ->
case catch Mod:code_change(OldVsn, State, Extra) of
{ok, NewState} ->
- {ok, [GS2State #gs2_state { state = NewState }]};
+ NewGS2State = find_prioritizers(
+ GS2State #gs2_state { state = NewState }),
+ {ok, [NewGS2State]};
Else ->
Else
end.
@@ -1159,9 +1159,28 @@ name_to_pid(Name) ->
Pid
end.
+find_prioritizers(GS2State = #gs2_state { mod = Mod }) ->
+ PrioriCall = function_exported_or_default(
+ Mod, 'prioritize_call', 3,
+ fun (_Msg, _From, _State) -> 0 end),
+ PrioriCast = function_exported_or_default(Mod, 'prioritize_cast', 2,
+ fun (_Msg, _State) -> 0 end),
+ PrioriInfo = function_exported_or_default(Mod, 'prioritize_info', 2,
+ fun (_Msg, _State) -> 0 end),
+ GS2State #gs2_state { prioritize_call = PrioriCall,
+ prioritize_cast = PrioriCast,
+ prioritize_info = PrioriInfo }.
+
function_exported_or_default(Mod, Fun, Ar, Default) ->
case erlang:function_exported(Mod, Fun, Ar) of
- true -> fun (Args) -> apply(Mod, Fun, Args) end;
+ true -> case Ar of
+ 2 -> fun (Msg, State) ->
+ Mod:Fun(Msg, State)
+ end;
+ 3 -> fun (Msg, From, State) ->
+ Mod:Fun(Msg, From, State)
+ end
+ end;
false -> Default
end.