summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-03-17 18:21:05 +0000
committerSimon MacMullen <simon@lshift.net>2010-03-17 18:21:05 +0000
commitce5c7109aedf7bcd2fdd75d23682d242e3c64c8d (patch)
tree70460793a6b64482167886de911968f52e7c7f6d
parent7402cf2c5de8c9174cc0cbdce1c37475fad14480 (diff)
downloadrabbitmq-server-git-ce5c7109aedf7bcd2fdd75d23682d242e3c64c8d.tar.gz
Return statuses (and pids where appropriate).
-rw-r--r--src/delegate.erl28
-rw-r--r--src/rabbit_tests.erl38
2 files changed, 45 insertions, 21 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 3a26c41099..d469e464ad 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -45,13 +45,14 @@
start_link(Hash) ->
gen_server2:start_link({local, server(Hash)},
?MODULE, [], []).
+
delegate_sync(Node, Thunk) when is_atom(Node) ->
gen_server2:call({server(), Node}, {thunk, Thunk}, infinity);
delegate_sync(Pid, FPid) when is_pid(Pid) ->
- [[Res]] = delegate_per_node([{node(Pid), [Pid]}],
- f_pid_node(fun delegate_sync/2, FPid)),
- Res;
+ [[{Status, Res, _}]] = delegate_per_node([{node(Pid), [Pid]}],
+ f_pid_node(fun delegate_sync/2, FPid)),
+ {Status, Res};
delegate_sync(Pids, FPid) when is_list(Pids) ->
lists:flatten(
@@ -63,11 +64,13 @@ delegate_async(Node, Thunk) when is_atom(Node) ->
delegate_async(Pid, FPid) when is_pid(Pid) ->
delegate_per_node([{node(Pid), [Pid]}],
- f_pid_node(fun delegate_async/2, FPid));
+ f_pid_node(fun delegate_async/2, FPid)),
+ ok;
delegate_async(Pids, FPid) when is_list(Pids) ->
delegate_per_node(split_per_node(Pids),
- f_pid_node(fun delegate_async/2, FPid)).
+ f_pid_node(fun delegate_async/2, FPid)),
+ ok.
%%----------------------------------------------------------------------------
@@ -90,15 +93,18 @@ f_pid_node(DelegateFun, FPid) ->
% we improve this?
delegate_per_node([{Node, Pids}], FPidNode) when Node == node() ->
% optimisation
- [[FPidNode(Pid, node()) || Pid <- Pids]];
+ [[add_pid(FPidNode(Pid, Node), Pid) || Pid <- Pids]];
delegate_per_node(NodePids, FPidNode) ->
rabbit_misc:upmap(
fun ({Node, Pids}) ->
- [FPidNode(Pid, Node) || Pid <- Pids]
+ [add_pid(FPidNode(Pid, Node), Pid) || Pid <- Pids]
end,
NodePids).
+add_pid({Status, Result}, Pid) -> {Status, Result, Pid};
+add_pid(Status, Pid) -> {Status, Pid}.
+
server() ->
server(erlang:phash(self(), ?DELEGATE_PROCESSES)).
@@ -111,7 +117,13 @@ init([]) ->
{ok, no_state}.
handle_call({thunk, Thunk}, _From, State) ->
- {reply, catch Thunk(), State}.
+ Res = case catch Thunk() of
+ {'EXIT', Reason} ->
+ {error, {'EXIT', Reason}};
+ Result ->
+ {ok, Result}
+ end,
+ {reply, Res, State}.
handle_cast({thunk, Thunk}, State) ->
catch Thunk(),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 29ec79999d..c40248319d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -830,13 +830,13 @@ test_delegates_async() ->
end
end,
- delegate:delegate_async(spawn(Receiver), Sender),
- delegate:delegate_async(spawn(SecondaryNode, Receiver), Sender),
+ ok = delegate:delegate_async(spawn(Receiver), Sender),
+ ok = delegate:delegate_async(spawn(SecondaryNode, Receiver), Sender),
await_response(2),
LocalPids = [spawn(Receiver) || _ <- lists:seq(1,10)],
RemotePids = [spawn(SecondaryNode, Receiver) || _ <- lists:seq(1,10)],
- delegate:delegate_async(LocalPids ++ RemotePids, Sender),
+ ok = delegate:delegate_async(LocalPids ++ RemotePids, Sender),
await_response(20),
passed.
@@ -855,8 +855,8 @@ await_response(Count) ->
test_delegates_sync() ->
SecondaryNode = rabbit_misc:makenode("hare"),
- "foo" = delegate:delegate_sync(node(), fun() -> "foo" end),
- "bar" = delegate:delegate_sync(SecondaryNode, fun() -> "bar" end),
+ {ok, "foo"} = delegate:delegate_sync(node(), fun() -> "foo" end),
+ {ok, "bar"} = delegate:delegate_sync(SecondaryNode, fun() -> "bar" end),
Sender = fun(Pid) ->
gen_server2:call(Pid, invoked)
@@ -882,21 +882,33 @@ test_delegates_sync() ->
end
end,
- response = delegate:delegate_sync(spawn(Responder), Sender),
- response = delegate:delegate_sync(spawn(SecondaryNode, Responder), Sender),
+ {ok, response} = delegate:delegate_sync(spawn(Responder), Sender),
+ {ok, response} = delegate:delegate_sync(spawn(SecondaryNode, Responder), Sender),
- {'EXIT', _} = delegate:delegate_sync(spawn(BadResponder), Sender),
- {'EXIT', _} = delegate:delegate_sync(spawn(SecondaryNode, BadResponder), Sender),
+ {error, _} = delegate:delegate_sync(spawn(BadResponder), Sender),
+ {error, _} = delegate:delegate_sync(spawn(SecondaryNode, BadResponder), Sender),
LocalGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)],
RemoteGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)],
LocalBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)],
RemoteBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)],
- [response, response, response, response] =
- delegate:delegate_sync(LocalGoodPids ++ RemoteGoodPids, Sender),
- [{'EXIT', _}, {'EXIT', _}, {'EXIT', _}, {'EXIT', _}] =
- delegate:delegate_sync(LocalBadPids ++ RemoteBadPids, Sender),
+ GoodRes = delegate:delegate_sync(LocalGoodPids ++ RemoteGoodPids, Sender),
+ [{ok, response, _}, {ok, response, _},
+ {ok, response, _}, {ok, response, _}] = GoodRes,
+
+ BadRes = delegate:delegate_sync(LocalBadPids ++ RemoteBadPids, Sender),
+ [{error, _, _}, {error, _, _},
+ {error, _, _}, {error, _, _}] = BadRes,
+
+ GoodResPids = [Pid || {_, _, Pid} <- GoodRes],
+ BadResPids = [Pid || {_, _, Pid} <- BadRes],
+
+ Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids),
+ Good = ordsets:from_list(GoodResPids),
+
+ Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids),
+ Bad = ordsets:from_list(BadResPids),
passed.