summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-06 10:06:48 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-06 10:06:48 +0100
commit23ff3a598c34dc603ef7c2f95f73489060a23a8c (patch)
treec42ed7d535046939b598e77d21b5c0cef31c6933
parent6f8272c47d45a0d07e90738052f9bf2affde3e6d (diff)
parentd53d2e4cbf5a4c10623b469fb153aae28c593276 (diff)
downloadrabbitmq-server-git-23ff3a598c34dc603ef7c2f95f73489060a23a8c.tar.gz
merge default into bug21673
-rw-r--r--Makefile3
-rw-r--r--src/delegate.erl206
-rw-r--r--src/delegate_sup.erl63
-rw-r--r--src/gen_server2.erl2
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_amqqueue.erl97
-rw-r--r--src/rabbit_channel.erl11
-rw-r--r--src/rabbit_exchange.erl11
-rw-r--r--src/rabbit_misc.erl50
-rw-r--r--src/rabbit_persister.erl4
-rw-r--r--src/rabbit_router.erl151
-rw-r--r--src/rabbit_tests.erl109
12 files changed, 473 insertions, 242 deletions
diff --git a/Makefile b/Makefile
index 2b08e071c2..3d39ccb071 100644
--- a/Makefile
+++ b/Makefile
@@ -167,6 +167,9 @@ start-cover: all
echo "rabbit_misc:start_cover([\"rabbit\", \"hare\"])." | $(ERL_CALL)
echo "rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL)
+start-secondary-cover: all
+ echo "rabbit_misc:start_cover([\"hare\"])." | $(ERL_CALL)
+
stop-cover: all
echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL)
cat cover/summary.txt
diff --git a/src/delegate.erl b/src/delegate.erl
new file mode 100644
index 0000000000..12eb814f8f
--- /dev/null
+++ b/src/delegate.erl
@@ -0,0 +1,206 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(delegate).
+
+-define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2).
+
+-behaviour(gen_server2).
+
+-export([start_link/1, invoke_no_result/2, invoke/2, process_count/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}).
+-spec(invoke_no_result/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok').
+-spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A).
+
+-spec(process_count/0 :: () -> non_neg_integer()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+%%----------------------------------------------------------------------------
+
+start_link(Hash) ->
+ gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []).
+
+invoke(Pid, Fun) when is_pid(Pid) ->
+ [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun),
+ case Res of
+ {ok, Result, _} ->
+ Result;
+ {error, {Class, Reason, StackTrace}, _} ->
+ erlang:raise(Class, Reason, StackTrace)
+ end;
+
+invoke(Pids, Fun) when is_list(Pids) ->
+ lists:foldl(
+ fun({Status, Result, Pid}, {Good, Bad}) ->
+ case Status of
+ ok -> {[{Pid, Result}|Good], Bad};
+ error -> {Good, [{Pid, Result}|Bad]}
+ end
+ end,
+ {[], []},
+ invoke_per_node(split_delegate_per_node(Pids), Fun)).
+
+invoke_no_result(Pid, Fun) when is_pid(Pid) ->
+ invoke_no_result_per_node([{node(Pid), [Pid]}], Fun),
+ ok;
+
+invoke_no_result(Pids, Fun) when is_list(Pids) ->
+ invoke_no_result_per_node(split_delegate_per_node(Pids), Fun),
+ ok.
+
+%%----------------------------------------------------------------------------
+
+internal_call(Node, Thunk) when is_atom(Node) ->
+ gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity).
+
+internal_cast(Node, Thunk) when is_atom(Node) ->
+ gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
+
+split_delegate_per_node(Pids) ->
+ orddict:to_list(
+ lists:foldl(
+ fun (Pid, D) ->
+ orddict:update(node(Pid),
+ fun (Pids1) -> [Pid | Pids1] end,
+ [Pid], D)
+ end,
+ orddict:new(), Pids)).
+
+invoke_per_node([{Node, Pids}], Fun) when Node == node() ->
+ safe_invoke(Pids, Fun);
+invoke_per_node(NodePids, Fun) ->
+ lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)).
+
+invoke_no_result_per_node([{Node, Pids}], Fun) when Node == node() ->
+ %% This is not actually async! However, in practice Fun will
+ %% always be something that does a gen_server:cast or similar, so
+ %% I don't think it's a problem unless someone misuses this
+ %% function. Making this *actually* async would be painful as we
+ %% can't spawn at this point or we break effect ordering.
+ safe_invoke(Pids, Fun);
+invoke_no_result_per_node(NodePids, Fun) ->
+ delegate_per_node(NodePids, Fun, fun internal_cast/2),
+ ok.
+
+delegate_per_node(NodePids, Fun, DelegateFun) ->
+ Self = self(),
+ %% Note that this is unsafe if the Fun requires reentrancy to the
+ %% local_server. I.e. if self() == local_server(Node) then we'll
+ %% block forever.
+ [gen_server2:cast(
+ local_server(Node),
+ {thunk, fun() ->
+ Self ! {result,
+ DelegateFun(
+ Node, fun() -> safe_invoke(Pids, Fun) end)}
+ end}) || {Node, Pids} <- NodePids],
+ [receive {result, Result} -> Result end || _ <- NodePids].
+
+local_server(Node) ->
+ case get({delegate_local_server_name, Node}) of
+ undefined ->
+ Name = server(erlang:phash2({self(), Node}, process_count())),
+ put({delegate_local_server_name, Node}, Name),
+ Name;
+ Name -> Name
+ end.
+
+remote_server(Node) ->
+ case get({delegate_remote_server_name, Node}) of
+ undefined ->
+ case rpc:call(Node, delegate, process_count, []) of
+ {badrpc, _} ->
+ %% Have to return something, if we're just casting
+ %% then we don't want to blow up
+ server(1);
+ Count ->
+ Name = server(erlang:phash2({self(), Node}, Count)),
+ put({delegate_remote_server_name, Node}, Name),
+ Name
+ end;
+ Name -> Name
+ end.
+
+server(Hash) ->
+ list_to_atom("delegate_process_" ++ integer_to_list(Hash)).
+
+safe_invoke(Pids, Fun) when is_list(Pids) ->
+ [safe_invoke(Pid, Fun) || Pid <- Pids];
+safe_invoke(Pid, Fun) when is_pid(Pid) ->
+ try
+ {ok, Fun(Pid), Pid}
+ catch
+ Class:Reason ->
+ {error, {Class, Reason, erlang:get_stacktrace()}, Pid}
+ end.
+
+process_count() ->
+ ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers).
+
+%%--------------------------------------------------------------------
+
+init([]) ->
+ {ok, no_state, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+%% We don't need a catch here; we always go via safe_invoke. A catch here would
+%% be the wrong thing anyway since the Thunk can throw multiple errors.
+handle_call({thunk, Thunk}, _From, State) ->
+ {reply, Thunk(), State, hibernate}.
+
+handle_cast({thunk, Thunk}, State) ->
+ Thunk(),
+ {noreply, State, hibernate}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
new file mode 100644
index 0000000000..1c1d62a95d
--- /dev/null
+++ b/src/delegate_sup.erl
@@ -0,0 +1,63 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(delegate_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+%%----------------------------------------------------------------------------
+
+init(_Args) ->
+ {ok, {{one_for_one, 10, 10},
+ [{Hash, {delegate, start_link, [Hash]},
+ transient, 16#ffffffff, worker, [delegate]} ||
+ Hash <- lists:seq(0, delegate:process_count() - 1)]}}.
+
+%%----------------------------------------------------------------------------
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 94a23fb90a..5b899cdbc7 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -443,7 +443,7 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
name({local,Name}) -> Name;
name({global,Name}) -> Name;
%% name(Pid) when is_pid(Pid) -> Pid;
-%% when R11 goes away, drop the line beneath and uncomment the line above
+%% when R12 goes away, drop the line beneath and uncomment the line above
name(Name) -> Name.
unregister_name({local,Name}) ->
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7ca5b07b4b..67f8df947b 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -112,10 +112,10 @@
{requires, kernel_ready},
{enables, core_initialized}]}).
--rabbit_boot_step({rabbit_router,
- [{description, "cluster router"},
- {mfa, {rabbit_sup, start_restartable_child,
- [rabbit_router]}},
+-rabbit_boot_step({delegate_sup,
+ [{description, "cluster delegate"},
+ {mfa, {rabbit_sup, start_child,
+ [delegate_sup]}},
{requires, kernel_ready},
{enables, core_initialized}]}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2d75b15b64..41799a9210 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -94,7 +94,7 @@
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
--spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
+-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
@@ -231,10 +231,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- gen_server2:pcall(QPid, 9, info, infinity).
+ delegate_pcall(QPid, 9, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of
+ case delegate_pcall(QPid, 9, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -244,7 +244,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
consumers(#amqqueue{ pid = QPid }) ->
- gen_server2:pcall(QPid, 9, consumers, infinity).
+ delegate_pcall(QPid, 9, consumers, infinity).
consumers_all(VHostPath) ->
lists:concat(
@@ -253,15 +253,15 @@ consumers_all(VHostPath) ->
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
end)).
-stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
+stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
stat_all() ->
lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity).
+ delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
-purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
+purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity).
deliver(QPid, #delivery{immediate = true,
txn = Txn, sender = ChPid, message = Message}) ->
@@ -276,25 +276,23 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
true.
requeue(QPid, MsgIds, ChPid) ->
- gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
+ delegate_cast(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
+ delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn, ChPid) ->
- safe_pmap_ok(
+ safe_delegate_call_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end,
QPids).
rollback_all(QPids, Txn, ChPid) ->
- safe_pmap_ok(
- fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end,
- QPids).
+ delegate:invoke_no_result(
+ QPids, fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end).
notify_down_all(QPids, ChPid) ->
- safe_pmap_ok(
+ safe_delegate_call_ok(
%% we don't care if the queue process has terminated in the
%% meantime
fun (_) -> ok end,
@@ -302,38 +300,36 @@ notify_down_all(QPids, ChPid) ->
QPids).
limit_all(QPids, ChPid, LimiterPid) ->
- safe_pmap_ok(
- fun (_) -> ok end,
- fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end,
- QPids).
+ delegate:invoke_no_result(
+ QPids, fun (QPid) ->
+ gen_server2:cast(QPid, {limit, ChPid, LimiterPid})
+ end).
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity).
+ delegate_call(QPid, {claim_queue, ReaderPid}, infinity).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
+ delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity).
basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
- LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
- infinity).
+ delegate_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
+ infinity).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
- infinity).
+ ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
+ infinity).
notify_sent(QPid, ChPid) ->
- gen_server2:pcast(QPid, 7, {notify_sent, ChPid}).
+ delegate_pcast(QPid, 7, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- gen_server2:pcast(QPid, 7, {unblock, ChPid}).
+ delegate_pcast(QPid, 7, {unblock, ChPid}).
flush_all(QPids, ChPid) ->
- safe_pmap_ok(
- fun (_) -> ok end,
- fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end,
- QPids).
+ delegate:invoke_no_result(
+ QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end).
internal_delete(QueueName) ->
case
@@ -392,17 +388,28 @@ pseudo_queue(QueueName, Pid) ->
arguments = [],
pid = Pid}.
-safe_pmap_ok(H, F, L) ->
- case [R || R <- rabbit_misc:upmap(
- fun (V) ->
- try
- rabbit_misc:with_exit_handler(
- fun () -> H(V) end,
- fun () -> F(V) end)
- catch Class:Reason -> {Class, Reason}
- end
- end, L),
- R =/= ok] of
- [] -> ok;
- Errors -> {error, Errors}
+safe_delegate_call_ok(H, F, Pids) ->
+ {_, Bad} = delegate:invoke(Pids,
+ fun (Pid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> H(Pid) end,
+ fun () -> F(Pid) end)
+ end),
+ case Bad of
+ [] -> ok;
+ _ -> {error, Bad}
end.
+
+delegate_call(Pid, Msg, Timeout) ->
+ delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end).
+
+delegate_pcall(Pid, Pri, Msg, Timeout) ->
+ delegate:invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
+
+delegate_cast(Pid, Msg) ->
+ delegate:invoke_no_result(Pid, fun(P) -> gen_server2:cast(P, Msg) end).
+
+delegate_pcast(Pid, Pri, Msg) ->
+ delegate:invoke_no_result(Pid,
+ fun(P) -> gen_server2:pcast(P, Pri, Msg) end).
+
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7d3cd7225d..1f16ec080e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -944,13 +944,10 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
[self(),
queue:len(UAQ),
queue:len(UAMQ)]),
- case rabbit_amqqueue:rollback_all(sets:to_list(Participants),
- TxnKey, self()) of
- ok -> NewUAMQ = queue:join(UAQ, UAMQ),
- new_tx(State#ch{unacked_message_q = NewUAMQ});
- {error, Errors} -> rabbit_misc:protocol_error(
- internal_error, "rollback failed: ~w", [Errors])
- end.
+ ok = rabbit_amqqueue:rollback_all(sets:to_list(Participants),
+ TxnKey, self()),
+ NewUAMQ = queue:join(UAQ, UAMQ),
+ new_tx(State#ch{unacked_message_q = NewUAMQ}).
rollback_and_notify(State = #ch{transaction_id = none}) ->
notify_queues(State);
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 6f52dd7c08..8f41392f83 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -341,16 +341,7 @@ delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
contains(Table, MatchHead) ->
- try
- continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
- catch exit:{aborted, {badarg, _}} ->
- %% work around OTP-7025, which was fixed in R12B-1, by
- %% falling back on a less efficient method
- case mnesia:match_object(Table, MatchHead, read) of
- [] -> false;
- [_|_] -> true
- end
- end.
+ continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
continue('$end_of_table') -> false;
continue({[_|_], _}) -> true;
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 59ba277610..723b818b41 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -537,51 +537,19 @@ pid_to_string(Pid) when is_pid(Pid) ->
%% inverse of above
string_to_pid(Str) ->
- ErrorFun = fun () -> throw({error, {invalid_pid_syntax, Str}}) end,
- %% TODO: simplify this code by using the 're' module, once we drop
- %% support for R11
- %%
- %% 1) sanity check
%% The \ before the trailing $ is only there to keep emacs
%% font-lock from getting confused.
- case regexp:first_match(Str, "^<.*\\.[0-9]+\\.[0-9]+>\$") of
- {match, _, _} ->
- %% 2) strip <>
- Str1 = string:substr(Str, 2, string:len(Str) - 2),
- %% 3) extract three constituent parts, taking care to
- %% handle dots in the node part (hence the reverse and concat)
- [SerStr, IdStr | Rest] = lists:reverse(string:tokens(Str1, ".")),
- NodeStr = lists:concat(lists:reverse(Rest)),
- %% 4) construct a triple term from the three parts
- TripleStr = lists:flatten(io_lib:format("{~s,~s,~s}.",
- [NodeStr, IdStr, SerStr])),
- %% 5) parse the triple
- Tokens = case erl_scan:string(TripleStr) of
- {ok, Tokens1, _} -> Tokens1;
- {error, _, _} -> ErrorFun()
- end,
- Term = case erl_parse:parse_term(Tokens) of
- {ok, Term1} -> Term1;
- {error, _} -> ErrorFun()
- end,
- {Node, Id, Ser} =
- case Term of
- {Node1, Id1, Ser1} when is_atom(Node1) andalso
- is_integer(Id1) andalso
- is_integer(Ser1) ->
- Term;
- _ ->
- ErrorFun()
- end,
- %% 6) turn the triple into a pid - see pid_to_string
- <<131,NodeEnc/binary>> = term_to_binary(Node),
+ case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$",
+ [{capture,all_but_first,list}]) of
+ {match, [NodeStr, IdStr, SerStr]} ->
+ %% turn the triple into a pid - see pid_to_string
+ <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)),
+ Id = list_to_integer(IdStr),
+ Ser = list_to_integer(SerStr),
binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
nomatch ->
- ErrorFun();
- Error ->
- %% invalid regexp - shouldn't happen
- throw(Error)
- end.
+ throw({error, {invalid_pid_syntax, Str}})
+ end.
version_compare(A, B, lte) ->
case version_compare(A, B) of
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index a8e41baf74..3cd42e4753 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -184,9 +184,7 @@ handle_cast(_Msg, State) ->
handle_info(timeout, State = #pstate{deadline = infinity}) ->
State1 = flush(true, State),
- %% TODO: Once we drop support for R11B-5, we can change this to
- %% {noreply, State1, hibernate};
- proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]);
+ {noreply, State1, hibernate};
handle_info(timeout, State) ->
do_noreply(flush(State));
handle_info(_Info, State) ->
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index a449e19eb4..03979d6c60 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -33,100 +33,40 @@
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--behaviour(gen_server2).
-
--export([start_link/0,
- deliver/2,
+-export([deliver/2,
match_bindings/2,
match_routing_key/2]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--define(SERVER, ?MODULE).
-
-%% cross-node routing optimisation is disabled because of bug 19758.
--define(BUG19758, true).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-
--ifdef(BUG19758).
-
-deliver(QPids, Delivery) ->
- check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- run_bindings(QPids, Delivery)).
-
--else.
+deliver(QPids, Delivery = #delivery{mandatory = false,
+ immediate = false}) ->
+ %% optimisation: when Mandatory = false and Immediate = false,
+ %% rabbit_amqqueue:deliver will deliver the message to the queue
+ %% process asynchronously, and return true, which means all the
+ %% QPids will always be returned. It is therefore safe to use a
+ %% fire-and-forget cast here and return the QPids - the semantics
+ %% is preserved. This scales much better than the non-immediate
+ %% case below.
+ delegate:invoke_no_result(
+ QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
+ {routed, QPids};
deliver(QPids, Delivery) ->
- %% we reduce inter-node traffic by grouping the qpids by node and
- %% only delivering one copy of the message to each node involved,
- %% which then in turn delivers it to its queues.
- deliver_per_node(
- dict:to_list(
- lists:foldl(fun (QPid, D) ->
- rabbit_misc:dict_cons(node(QPid), QPid, D)
- end, dict:new(), QPids)),
- Delivery).
-
-deliver_per_node([{Node, QPids}], Delivery) when Node == node() ->
- %% optimisation
- check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- run_bindings(QPids, Delivery));
-deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false,
- immediate = false}) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver in run_bindings below will deliver the
- %% message to the queue process asynchronously, and return true,
- %% which means all the QPids will always be returned. It is
- %% therefore safe to use a fire-and-forget cast here and return
- %% the QPids - the semantics is preserved. This scales much better
- %% than the non-immediate case below.
- {routed,
- lists:flatmap(
- fun ({Node, QPids}) ->
- gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}),
- QPids
- end,
- NodeQPids)};
-deliver_per_node(NodeQPids, Delivery) ->
- R = rabbit_misc:upmap(
- fun ({Node, QPids}) ->
- try gen_server2:call({?SERVER, Node},
- {deliver, QPids, Delivery},
- infinity)
- catch
- _Class:_Reason ->
- %% TODO: figure out what to log (and do!) here
- {false, []}
- end
- end,
- NodeQPids),
- {Routed, Handled} =
- lists:foldl(fun ({Routed, Handled}, {RoutedAcc, HandledAcc}) ->
- {Routed or RoutedAcc,
- %% we do the concatenation below, which
- %% should be faster
- [Handled | HandledAcc]}
- end,
- {false, []},
- R),
+ {Success, _} =
+ delegate:invoke(QPids,
+ fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
+ {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success),
check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- {Routed, lists:append(Handled)}).
-
--endif.
+ {Routed, Handled}).
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same exchange
@@ -137,20 +77,7 @@ match_bindings(Name, Match) ->
mnesia:table(rabbit_route),
ExchangeName == Name,
Match(Binding)]),
- lookup_qpids(
- try
- mnesia:async_dirty(fun qlc:e/1, [Query])
- catch exit:{aborted, {badarg, _}} ->
- %% work around OTP-7025, which was fixed in R12B-1, by
- %% falling back on a less efficient method
- [QName || #route{binding = Binding = #binding{
- queue_name = QName}} <-
- mnesia:dirty_match_object(
- rabbit_route,
- #route{binding = #binding{exchange_name = Name,
- _ = '_'}}),
- Match(Binding)]
- end).
+ lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])).
match_routing_key(Name, RoutingKey) ->
MatchHead = #route{binding = #binding{exchange_name = Name,
@@ -170,44 +97,8 @@ lookup_qpids(Queues) ->
%%--------------------------------------------------------------------
-init([]) ->
- {ok, no_state}.
-
-handle_call({deliver, QPids, Delivery}, From, State) ->
- spawn(
- fun () ->
- R = run_bindings(QPids, Delivery),
- gen_server2:reply(From, R)
- end),
- {noreply, State}.
-
-handle_cast({deliver, QPids, Delivery}, State) ->
- %% in order to preserve message ordering we must not spawn here
- run_bindings(QPids, Delivery),
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
-
-run_bindings(QPids, Delivery) ->
- lists:foldl(
- fun (QPid, {Routed, Handled}) ->
- case catch rabbit_amqqueue:deliver(QPid, Delivery) of
- true -> {true, [QPid | Handled]};
- false -> {true, Handled};
- {'EXIT', _Reason} -> {Routed, Handled}
- end
- end,
- {false, []},
- QPids).
+fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]};
+fold_deliveries({_, false},{_, Handled}) -> {true, Handled}.
%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
check_delivery(true, _ , {false, []}) -> {unroutable, []};
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c8de79845d..9c659652e5 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -68,7 +68,32 @@ all_tests() ->
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
- passed = test_hooks(),
+ passed = maybe_run_cluster_dependent_tests(),
+ passed.
+
+
+maybe_run_cluster_dependent_tests() ->
+ SecondaryNode = rabbit_misc:makenode("hare"),
+
+ case net_adm:ping(SecondaryNode) of
+ pong -> passed = run_cluster_dependent_tests(SecondaryNode);
+ pang -> io:format("Skipping cluster dependent tests with node ~p~n",
+ [SecondaryNode])
+ end,
+ passed.
+
+run_cluster_dependent_tests(SecondaryNode) ->
+ SecondaryNodeS = atom_to_list(SecondaryNode),
+
+ ok = control_action(stop_app, []),
+ ok = control_action(reset, []),
+ ok = control_action(cluster, [SecondaryNodeS]),
+ ok = control_action(start_app, []),
+
+ io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
+ passed = test_delegates_async(SecondaryNode),
+ passed = test_delegates_sync(SecondaryNode),
+
passed.
test_priority_queue() ->
@@ -905,6 +930,88 @@ test_hooks() ->
end,
passed.
+test_delegates_async(SecondaryNode) ->
+ Self = self(),
+ Sender = fun(Pid) -> Pid ! {invoked, Self} end,
+
+ Responder = make_responder(fun({invoked, Pid}) -> Pid ! response end),
+
+ ok = delegate:invoke_no_result(spawn(Responder), Sender),
+ ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender),
+ await_response(2),
+
+ LocalPids = spawn_responders(node(), Responder, 10),
+ RemotePids = spawn_responders(SecondaryNode, Responder, 10),
+ ok = delegate:invoke_no_result(LocalPids ++ RemotePids, Sender),
+ await_response(20),
+
+ passed.
+
+make_responder(FMsg) ->
+ fun() ->
+ receive Msg -> FMsg(Msg)
+ after 1000 -> throw(timeout)
+ end
+ end.
+
+spawn_responders(Node, Responder, Count) ->
+ [spawn(Node, Responder) || _ <- lists:seq(1, Count)].
+
+await_response(0) ->
+ ok;
+await_response(Count) ->
+ receive
+ response -> ok,
+ await_response(Count - 1)
+ after 1000 ->
+ io:format("Async reply not received~n"),
+ throw(timeout)
+ end.
+
+must_exit(Fun) ->
+ try
+ Fun(),
+ throw(exit_not_thrown)
+ catch
+ exit:_ -> ok
+ end.
+
+test_delegates_sync(SecondaryNode) ->
+ Sender = fun(Pid) -> gen_server:call(Pid, invoked) end,
+ BadSender = fun(_Pid) -> exit(exception) end,
+
+ Responder = make_responder(fun({'$gen_call', From, invoked}) ->
+ gen_server:reply(From, response)
+ end),
+
+ response = delegate:invoke(spawn(Responder), Sender),
+ response = delegate:invoke(spawn(SecondaryNode, Responder), Sender),
+
+ must_exit(fun() -> delegate:invoke(spawn(Responder), BadSender) end),
+ must_exit(fun() ->
+ delegate:invoke(spawn(SecondaryNode, Responder), BadSender) end),
+
+ LocalGoodPids = spawn_responders(node(), Responder, 2),
+ RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2),
+ LocalBadPids = spawn_responders(node(), Responder, 2),
+ RemoteBadPids = spawn_responders(SecondaryNode, Responder, 2),
+
+ {GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender),
+ true = lists:all(fun ({_, response}) -> true end, GoodRes),
+ GoodResPids = [Pid || {Pid, _} <- GoodRes],
+
+ Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids),
+ Good = ordsets:from_list(GoodResPids),
+
+ {[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender),
+ true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes),
+ BadResPids = [Pid || {Pid, _} <- BadRes],
+
+ Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids),
+ Bad = ordsets:from_list(BadResPids),
+
+ passed.
+
%---------------------------------------------------------------------
control_action(Command, Args) -> control_action(Command, node(), Args).