diff options
| author | Simon MacMullen <simon@lshift.net> | 2010-03-17 18:21:05 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@lshift.net> | 2010-03-17 18:21:05 +0000 |
| commit | ce5c7109aedf7bcd2fdd75d23682d242e3c64c8d (patch) | |
| tree | 70460793a6b64482167886de911968f52e7c7f6d | |
| parent | 7402cf2c5de8c9174cc0cbdce1c37475fad14480 (diff) | |
| download | rabbitmq-server-git-ce5c7109aedf7bcd2fdd75d23682d242e3c64c8d.tar.gz | |
Return statuses (and pids where appropriate).
| -rw-r--r-- | src/delegate.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 38 |
2 files changed, 45 insertions, 21 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 3a26c41099..d469e464ad 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -45,13 +45,14 @@ start_link(Hash) -> gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). + delegate_sync(Node, Thunk) when is_atom(Node) -> gen_server2:call({server(), Node}, {thunk, Thunk}, infinity); delegate_sync(Pid, FPid) when is_pid(Pid) -> - [[Res]] = delegate_per_node([{node(Pid), [Pid]}], - f_pid_node(fun delegate_sync/2, FPid)), - Res; + [[{Status, Res, _}]] = delegate_per_node([{node(Pid), [Pid]}], + f_pid_node(fun delegate_sync/2, FPid)), + {Status, Res}; delegate_sync(Pids, FPid) when is_list(Pids) -> lists:flatten( @@ -63,11 +64,13 @@ delegate_async(Node, Thunk) when is_atom(Node) -> delegate_async(Pid, FPid) when is_pid(Pid) -> delegate_per_node([{node(Pid), [Pid]}], - f_pid_node(fun delegate_async/2, FPid)); + f_pid_node(fun delegate_async/2, FPid)), + ok; delegate_async(Pids, FPid) when is_list(Pids) -> delegate_per_node(split_per_node(Pids), - f_pid_node(fun delegate_async/2, FPid)). + f_pid_node(fun delegate_async/2, FPid)), + ok. %%---------------------------------------------------------------------------- @@ -90,15 +93,18 @@ f_pid_node(DelegateFun, FPid) -> % we improve this? delegate_per_node([{Node, Pids}], FPidNode) when Node == node() -> % optimisation - [[FPidNode(Pid, node()) || Pid <- Pids]]; + [[add_pid(FPidNode(Pid, Node), Pid) || Pid <- Pids]]; delegate_per_node(NodePids, FPidNode) -> rabbit_misc:upmap( fun ({Node, Pids}) -> - [FPidNode(Pid, Node) || Pid <- Pids] + [add_pid(FPidNode(Pid, Node), Pid) || Pid <- Pids] end, NodePids). +add_pid({Status, Result}, Pid) -> {Status, Result, Pid}; +add_pid(Status, Pid) -> {Status, Pid}. + server() -> server(erlang:phash(self(), ?DELEGATE_PROCESSES)). @@ -111,7 +117,13 @@ init([]) -> {ok, no_state}. handle_call({thunk, Thunk}, _From, State) -> - {reply, catch Thunk(), State}. + Res = case catch Thunk() of + {'EXIT', Reason} -> + {error, {'EXIT', Reason}}; + Result -> + {ok, Result} + end, + {reply, Res, State}. handle_cast({thunk, Thunk}, State) -> catch Thunk(), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 29ec79999d..c40248319d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -830,13 +830,13 @@ test_delegates_async() -> end end, - delegate:delegate_async(spawn(Receiver), Sender), - delegate:delegate_async(spawn(SecondaryNode, Receiver), Sender), + ok = delegate:delegate_async(spawn(Receiver), Sender), + ok = delegate:delegate_async(spawn(SecondaryNode, Receiver), Sender), await_response(2), LocalPids = [spawn(Receiver) || _ <- lists:seq(1,10)], RemotePids = [spawn(SecondaryNode, Receiver) || _ <- lists:seq(1,10)], - delegate:delegate_async(LocalPids ++ RemotePids, Sender), + ok = delegate:delegate_async(LocalPids ++ RemotePids, Sender), await_response(20), passed. @@ -855,8 +855,8 @@ await_response(Count) -> test_delegates_sync() -> SecondaryNode = rabbit_misc:makenode("hare"), - "foo" = delegate:delegate_sync(node(), fun() -> "foo" end), - "bar" = delegate:delegate_sync(SecondaryNode, fun() -> "bar" end), + {ok, "foo"} = delegate:delegate_sync(node(), fun() -> "foo" end), + {ok, "bar"} = delegate:delegate_sync(SecondaryNode, fun() -> "bar" end), Sender = fun(Pid) -> gen_server2:call(Pid, invoked) @@ -882,21 +882,33 @@ test_delegates_sync() -> end end, - response = delegate:delegate_sync(spawn(Responder), Sender), - response = delegate:delegate_sync(spawn(SecondaryNode, Responder), Sender), + {ok, response} = delegate:delegate_sync(spawn(Responder), Sender), + {ok, response} = delegate:delegate_sync(spawn(SecondaryNode, Responder), Sender), - {'EXIT', _} = delegate:delegate_sync(spawn(BadResponder), Sender), - {'EXIT', _} = delegate:delegate_sync(spawn(SecondaryNode, BadResponder), Sender), + {error, _} = delegate:delegate_sync(spawn(BadResponder), Sender), + {error, _} = delegate:delegate_sync(spawn(SecondaryNode, BadResponder), Sender), LocalGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)], RemoteGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)], LocalBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)], RemoteBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)], - [response, response, response, response] = - delegate:delegate_sync(LocalGoodPids ++ RemoteGoodPids, Sender), - [{'EXIT', _}, {'EXIT', _}, {'EXIT', _}, {'EXIT', _}] = - delegate:delegate_sync(LocalBadPids ++ RemoteBadPids, Sender), + GoodRes = delegate:delegate_sync(LocalGoodPids ++ RemoteGoodPids, Sender), + [{ok, response, _}, {ok, response, _}, + {ok, response, _}, {ok, response, _}] = GoodRes, + + BadRes = delegate:delegate_sync(LocalBadPids ++ RemoteBadPids, Sender), + [{error, _, _}, {error, _, _}, + {error, _, _}, {error, _, _}] = BadRes, + + GoodResPids = [Pid || {_, _, Pid} <- GoodRes], + BadResPids = [Pid || {_, _, Pid} <- BadRes], + + Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids), + Good = ordsets:from_list(GoodResPids), + + Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids), + Bad = ordsets:from_list(BadResPids), passed. |
