diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2011-04-06 16:41:37 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-04-06 16:41:37 +0100 |
| commit | 4047821b9426947dedd69f8963c4fcfed20411e5 (patch) | |
| tree | 945564bb389a3fa6a960a0af9697e315843eb11c /src/delegate.erl | |
| parent | 88e5e89a024fc517054d503a6e10651dc286dafc (diff) | |
| parent | ae329cdb93b355bc73824d46245a5ee077be1644 (diff) | |
| download | rabbitmq-server-git-4047821b9426947dedd69f8963c4fcfed20411e5.tar.gz | |
merge default into bug23559
Diffstat (limited to 'src/delegate.erl')
| -rw-r--r-- | src/delegate.erl | 243 |
1 files changed, 90 insertions, 153 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 11abe73b0d..17046201ad 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -1,41 +1,24 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. %% -%% The Original Code is RabbitMQ. +%% The Original Code is RabbitMQ. %% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. %% -module(delegate). --define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2). - -behaviour(gen_server2). --export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]). +-export([start_link/1, invoke_no_result/2, invoke/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -44,13 +27,14 @@ -ifdef(use_specs). --spec(start_link/2 :: - (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/1 :: + (non_neg_integer()) -> {'ok', pid()} | {'error', any()}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). --spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A). - --spec(process_count/0 :: () -> non_neg_integer()). +-spec(invoke/2 :: + ( pid(), fun ((pid()) -> A)) -> A; + ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], + [{pid(), term()}]}). -endif. @@ -61,157 +45,110 @@ %%---------------------------------------------------------------------------- -start_link(Prefix, Hash) -> - gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []). +start_link(Num) -> + gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []). +invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> + Fun(Pid); invoke(Pid, Fun) when is_pid(Pid) -> - [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), - case Res of - {ok, Result, _} -> + case invoke([Pid], Fun) of + {[{Pid, Result}], []} -> Result; - {error, {Class, Reason, StackTrace}, _} -> + {[], [{Pid, {Class, Reason, StackTrace}}]} -> erlang:raise(Class, Reason, StackTrace) end; invoke(Pids, Fun) when is_list(Pids) -> - lists:foldl( - fun ({Status, Result, Pid}, {Good, Bad}) -> - case Status of - ok -> {[{Pid, Result}|Good], Bad}; - error -> {Good, [{Pid, Result}|Bad]} - end + {LocalPids, Grouped} = group_pids_by_node(Pids), + %% The use of multi_call is only safe because the timeout is + %% infinity, and thus there is no process spawned in order to do + %% the sending. Thus calls can't overtake preceding calls/casts. + {Replies, BadNodes} = + case orddict:fetch_keys(Grouped) of + [] -> {[], []}; + RemoteNodes -> gen_server2:multi_call( + RemoteNodes, delegate(RemoteNodes), + {invoke, Fun, Grouped}, infinity) end, - {[], []}, - invoke_per_node(split_delegate_per_node(Pids), Fun)). + BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || + BadNode <- BadNodes, + Pid <- orddict:fetch(BadNode, Grouped)], + ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) | + [Results || {_Node, Results} <- Replies]]), + lists:foldl( + fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad}; + ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]} + end, {[], BadPids}, ResultsNoNode). -invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun), +invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> + safe_invoke(Pid, Fun), %% we don't care about any error ok; +invoke_no_result(Pid, Fun) when is_pid(Pid) -> + invoke_no_result([Pid], Fun); invoke_no_result(Pids, Fun) when is_list(Pids) -> - invoke_no_result_per_node(split_delegate_per_node(Pids), Fun), + {LocalPids, Grouped} = group_pids_by_node(Pids), + case orddict:fetch_keys(Grouped) of + [] -> ok; + RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes), + {invoke, Fun, Grouped}) + end, + safe_invoke(LocalPids, Fun), %% must not die ok. %%---------------------------------------------------------------------------- -internal_call(Node, Thunk) when is_atom(Node) -> - gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity). - -internal_cast(Node, Thunk) when is_atom(Node) -> - gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}). - -split_delegate_per_node(Pids) -> +group_pids_by_node(Pids) -> LocalNode = node(), - {Local, Remote} = - lists:foldl( - fun (Pid, {L, D}) -> - Node = node(Pid), - case Node of - LocalNode -> {[Pid|L], D}; - _ -> {L, orddict:append(Node, Pid, D)} - end - end, - {[], orddict:new()}, Pids), - {Local, orddict:to_list(Remote)}. - -invoke_per_node(NodePids, Fun) -> - lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)). - -invoke_no_result_per_node(NodePids, Fun) -> - delegate_per_node(NodePids, Fun, fun internal_cast/2), - ok. - -delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) -> - %% In the case where DelegateFun is internal_cast, the safe_invoke - %% is not actually async! However, in practice Fun will always be - %% something that does a gen_server:cast or similar, so I don't - %% think it's a problem unless someone misuses this - %% function. Making this *actually* async would be painful as we - %% can't spawn at this point or we break effect ordering. - [safe_invoke(LocalPids, Fun)| - delegate_per_remote_node(NodePids, Fun, DelegateFun)]. - -delegate_per_remote_node(NodePids, Fun, DelegateFun) -> - Self = self(), - %% Note that this is unsafe if the Fun requires reentrancy to the - %% local_server. I.e. if self() == local_server(Node) then we'll - %% block forever. - [gen_server2:cast( - local_server(Node), - {thunk, fun () -> - Self ! {result, - DelegateFun( - Node, fun () -> safe_invoke(Pids, Fun) end)} - end}) || {Node, Pids} <- NodePids], - [receive {result, Result} -> Result end || _ <- NodePids]. - -local_server(Node) -> - case get({delegate_local_server_name, Node}) of - undefined -> - Name = server(outgoing, - erlang:phash2({self(), Node}, process_count())), - put({delegate_local_server_name, Node}, Name), - Name; - Name -> Name - end. - -remote_server(Node) -> - case get({delegate_remote_server_name, Node}) of - undefined -> - case rpc:call(Node, delegate, process_count, []) of - {badrpc, _} -> - %% Have to return something, if we're just casting - %% then we don't want to blow up - server(incoming, 1); - Count -> - Name = server(incoming, - erlang:phash2({self(), Node}, Count)), - put({delegate_remote_server_name, Node}, Name), - Name - end; - Name -> Name + lists:foldl( + fun (Pid, {Local, Remote}) when node(Pid) =:= LocalNode -> + {[Pid | Local], Remote}; + (Pid, {Local, Remote}) -> + {Local, + orddict:update( + node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} + end, {[], orddict:new()}, Pids). + +delegate_name(Hash) -> + list_to_atom("delegate_" ++ integer_to_list(Hash)). + +delegate(RemoteNodes) -> + case get(delegate) of + undefined -> Name = delegate_name( + erlang:phash2(self(), + delegate_sup:count(RemoteNodes))), + put(delegate, Name), + Name; + Name -> Name end. -server(Prefix, Hash) -> - list_to_atom("delegate_" ++ - atom_to_list(Prefix) ++ "_" ++ - integer_to_list(Hash)). - safe_invoke(Pids, Fun) when is_list(Pids) -> [safe_invoke(Pid, Fun) || Pid <- Pids]; safe_invoke(Pid, Fun) when is_pid(Pid) -> try - {ok, Fun(Pid), Pid} - catch - Class:Reason -> - {error, {Class, Reason, erlang:get_stacktrace()}, Pid} + {ok, Pid, Fun(Pid)} + catch Class:Reason -> + {error, Pid, {Class, Reason, erlang:get_stacktrace()}} end. -process_count() -> - ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers). - -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------- init([]) -> - {ok, no_state, hibernate, + {ok, node(), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -%% We don't need a catch here; we always go via safe_invoke. A catch here would -%% be the wrong thing anyway since the Thunk can throw multiple errors. -handle_call({thunk, Thunk}, _From, State) -> - {reply, Thunk(), State, hibernate}. +handle_call({invoke, Fun, Grouped}, _From, Node) -> + {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}. -handle_cast({thunk, Thunk}, State) -> - Thunk(), - {noreply, State, hibernate}. +handle_cast({invoke, Fun, Grouped}, Node) -> + safe_invoke(orddict:fetch(Node, Grouped), Fun), + {noreply, Node, hibernate}. -handle_info(_Info, State) -> - {noreply, State, hibernate}. +handle_info(_Info, Node) -> + {noreply, Node, hibernate}. terminate(_Reason, _State) -> ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- +code_change(_OldVsn, Node, _Extra) -> + {ok, Node}. |
