summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJerry Kuch <jerryk@vmware.com>2011-09-13 09:17:09 -0700
committerJerry Kuch <jerryk@vmware.com>2011-09-13 09:17:09 -0700
commitb114149300ca0f3abd2cfab72141448250b3871a (patch)
treedebbcf02264160507e642952279663523fe1edc0 /src
parent7ae03f5419de41a00d9fd0e5df09d44ff5015c85 (diff)
parent090eee48366f13314f7b86584c0a7977f322d96a (diff)
downloadrabbitmq-server-git-b114149300ca0f3abd2cfab72141448250b3871a.tar.gz
Merge bug 23056
Diffstat (limited to 'src')
-rw-r--r--src/gen_server2.erl4
-rw-r--r--src/rabbit.erl15
-rw-r--r--src/rabbit_basic.erl74
-rw-r--r--src/rabbit_control.erl31
-rw-r--r--src/rabbit_direct.erl2
-rw-r--r--src/rabbit_misc.erl8
-rw-r--r--src/rabbit_prelaunch.erl36
-rw-r--r--src/rabbit_tests.erl13
-rw-r--r--src/rabbit_upgrade.erl8
9 files changed, 78 insertions, 113 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 35258139ca..ab6c4e64f3 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -616,11 +616,11 @@ in(Input, Priority, GS2State = #gs2_state { queue = Queue }) ->
process_msg({system, From, Req},
GS2State = #gs2_state { parent = Parent, debug = Debug }) ->
+ %% gen_server puts Hib on the end as the 7th arg, but that version
+ %% of the fun seems not to be documented so leaving out for now.
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State);
process_msg({'EXIT', Parent, Reason} = Msg,
GS2State = #gs2_state { parent = Parent }) ->
- %% gen_server puts Hib on the end as the 7th arg, but that version
- %% of the fun seems not to be documented so leaving out for now.
terminate(Reason, Msg, GS2State);
process_msg(Msg, GS2State = #gs2_state { debug = [] }) ->
handle_msg(Msg, GS2State);
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 20b3e275d2..3e3117471f 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -18,7 +18,8 @@
-behaviour(application).
--export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, environment/0,
+-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0,
+ is_running/0 , is_running/1, environment/0,
rotate_logs/1, force_event_refresh/0]).
-export([start/2, stop/1]).
@@ -196,6 +197,8 @@
{os, {atom(), atom()}} |
{erlang_version, string()} |
{memory, any()}]).
+-spec(is_running/0 :: () -> boolean()).
+-spec(is_running/1 :: (node()) -> boolean()).
-spec(environment/0 :: () -> [{atom() | term()}]).
-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
@@ -241,11 +244,19 @@ stop_and_halt() ->
status() ->
[{pid, list_to_integer(os:getpid())},
- {running_applications, application:which_applications()},
+ {running_applications, application:which_applications(infinity)},
{os, os:type()},
{erlang_version, erlang:system_info(system_version)},
{memory, erlang:memory()}].
+is_running() -> is_running(node()).
+
+is_running(Node) ->
+ case rpc:call(Node, application, which_applications, [infinity]) of
+ {badrpc, _} -> false;
+ Apps -> proplists:is_defined(rabbit, Apps)
+ end.
+
environment() ->
lists:keysort(
1, [P || P = {K, _} <- application:get_all_env(rabbit),
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 9cc406e718..b266d3664d 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -18,8 +18,8 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/3, message/4, properties/1, delivery/4]).
--export([publish/4, publish/6]).
+-export([publish/4, publish/6, publish/1,
+ message/3, message/4, properties/1, delivery/4]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -35,6 +35,12 @@
-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())).
-type(body_input() :: (binary() | [binary()])).
+-spec(publish/4 ::
+ (exchange_input(), rabbit_router:routing_key(), properties_input(),
+ body_input()) -> publish_result()).
+-spec(publish/6 ::
+ (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
+ properties_input(), body_input()) -> publish_result()).
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
-spec(delivery/4 ::
@@ -49,12 +55,6 @@
rabbit_types:ok_or_error2(rabbit_types:message(), any())).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
--spec(publish/4 ::
- (exchange_input(), rabbit_router:routing_key(), properties_input(),
- body_input()) -> publish_result()).
--spec(publish/6 ::
- (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
- properties_input(), body_input()) -> publish_result()).
-spec(build_content/2 :: (rabbit_framing:amqp_property_record(),
binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
@@ -64,13 +64,34 @@
%%----------------------------------------------------------------------------
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+publish(Exchange, RoutingKeyBin, Properties, Body) ->
+ publish(Exchange, RoutingKeyBin, false, false, Properties, Body).
+
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) ->
+ publish(X, delivery(Mandatory, Immediate,
+ message(XName, RKey, properties(Props), Body),
+ undefined));
+publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
+ publish(delivery(Mandatory, Immediate,
+ message(XName, RKey, properties(Props), Body),
+ undefined)).
+
publish(Delivery = #delivery{
- message = #basic_message{exchange_name = ExchangeName}}) ->
- case rabbit_exchange:lookup(ExchangeName) of
+ message = #basic_message{exchange_name = XName}}) ->
+ case rabbit_exchange:lookup(XName) of
{ok, X} -> publish(X, Delivery);
- Other -> Other
+ Err -> Err
end.
+publish(X, Delivery) ->
+ {RoutingRes, DeliveredQPids} =
+ rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
+ {ok, RoutingRes, DeliveredQPids}.
+
delivery(Mandatory, Immediate, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, immediate = Immediate, sender = self(),
message = Message, msg_seq_no = MsgSeqNo}.
@@ -113,11 +134,10 @@ strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
headers = Headers0}})
end.
-message(ExchangeName, RoutingKey,
- #content{properties = Props} = DecodedContent) ->
+message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
try
{ok, #basic_message{
- exchange_name = ExchangeName,
+ exchange_name = XName,
content = strip_header(DecodedContent, ?DELETED_HEADER),
id = rabbit_guid:guid(),
is_persistent = is_message_persistent(DecodedContent),
@@ -127,10 +147,10 @@ message(ExchangeName, RoutingKey,
{error, _Reason} = Error -> Error
end.
-message(ExchangeName, RoutingKey, RawProperties, Body) ->
+message(XName, RoutingKey, RawProperties, Body) ->
Properties = properties(RawProperties),
Content = build_content(Properties, Body),
- {ok, Msg} = message(ExchangeName, RoutingKey, Content),
+ {ok, Msg} = message(XName, RoutingKey, Content),
Msg.
properties(P = #'P_basic'{}) ->
@@ -152,28 +172,6 @@ indexof([], _Element, _N) -> 0;
indexof([Element | _Rest], Element, N) -> N;
indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
-%% Convenience function, for avoiding round-trips in calls across the
-%% erlang distributed network.
-publish(Exchange, RoutingKeyBin, Properties, Body) ->
- publish(Exchange, RoutingKeyBin, false, false, Properties, Body).
-
-%% Convenience function, for avoiding round-trips in calls across the
-%% erlang distributed network.
-publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) ->
- publish(X, delivery(Mandatory, Immediate,
- message(XName, RKey, properties(Props), Body),
- undefined));
-publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
- case rabbit_exchange:lookup(XName) of
- {ok, X} -> publish(X, RKey, Mandatory, Immediate, Props, Body);
- Err -> Err
- end.
-
-publish(X, Delivery) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
- {ok, RoutingRes, DeliveredQPids}.
-
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
case Mode of
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index b9e550c9f1..1163ae9d86 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -17,7 +17,7 @@
-module(rabbit_control).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5, diagnostics/1, log_action/3]).
+-export([start/0, stop/0, action/5, diagnostics/1]).
-define(RPC_TIMEOUT, infinity).
@@ -50,7 +50,6 @@
-> 'ok').
-spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]).
-spec(usage/0 :: () -> no_return()).
--spec(log_action/3 :: (node(), string(), [term()]) -> ok).
-endif.
@@ -73,7 +72,6 @@ start() ->
Command = list_to_atom(Command0),
Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
Node = proplists:get_value(?NODE_OPT, Opts1),
- rpc_call(Node, rabbit_control, log_action, [node(), Command0, Args]),
Inform = case Quiet of
true -> fun (_Format, _Args1) -> ok end;
false -> fun (Format, Args1) ->
@@ -362,7 +360,7 @@ wait_for_application(Node, PidFile, Inform) ->
wait_for_application(Node, Pid) ->
case process_up(Pid) of
- true -> case node_up(Node) of
+ true -> case rabbit:is_running(Node) of
true -> ok;
false -> timer:sleep(1000),
wait_for_application(Node, Pid)
@@ -378,12 +376,6 @@ wait_and_read_pid_file(PidFile) ->
{error, _} = E -> exit({error, {could_not_read_pid, E}})
end.
-node_up(Node) ->
- case rpc_call(Node, application, which_applications, [infinity]) of
- {badrpc, _} -> false;
- Apps -> proplists:is_defined(rabbit, Apps)
- end.
-
% Test using some OS clunkiness since we shouldn't trust
% rpc:call(os, getpid, []) at this point
process_up(Pid) ->
@@ -521,22 +513,3 @@ quit(Status) ->
{unix, _} -> halt(Status);
{win32, _} -> init:stop(Status)
end.
-
-log_action(Node, Command, Args) ->
- rabbit_misc:with_local_io(
- fun () ->
- error_logger:info_msg("~p executing~n rabbitmqctl ~s ~s~n",
- [Node, Command,
- format_args(mask_args(Command, Args))])
- end).
-
-%% Mask passwords and other sensitive info before logging.
-mask_args("add_user", [Name, _Password | Args]) ->
- [Name, "****" | Args];
-mask_args("change_password", [Name, _Password | Args]) ->
- [Name, "****" | Args];
-mask_args(_, Args) ->
- Args.
-
-format_args(Args) ->
- string:join([io_lib:format("~p", [A]) || A <- Args], " ").
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 68afaf5d7c..6f9a46504f 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -72,7 +72,7 @@ list() ->
%%----------------------------------------------------------------------------
connect(Username, VHost, Protocol, Pid, Infos) ->
- case lists:keymember(rabbit, 1, application:which_applications()) of
+ case rabbit:is_running() of
true ->
case rabbit_access_control:check_user_login(Username, []) of
{ok, User} ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ae28722ab2..0b39a20927 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -749,7 +749,7 @@ recursive_delete(Files) ->
end, ok, Files).
recursive_delete1(Path) ->
- case filelib:is_dir(Path) of
+ case filelib:is_dir(Path) and not(is_symlink(Path)) of
false -> case file:delete(Path) of
ok -> ok;
{error, enoent} -> ok; %% Path doesn't exist anyway
@@ -777,6 +777,12 @@ recursive_delete1(Path) ->
end
end.
+is_symlink(Name) ->
+ case file:read_link(Name) of
+ {ok, _} -> true;
+ _ -> false
+ end.
+
recursive_copy(Src, Dest) ->
case filelib:is_dir(Src) of
false -> case file:copy(Src, Dest) of
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index e3cf8ebef0..9fe073d997 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -139,38 +139,10 @@ determine_version(App) ->
{App, Vsn}.
delete_recursively(Fn) ->
- case filelib:is_dir(Fn) and not(is_symlink(Fn)) of
- true ->
- case file:list_dir(Fn) of
- {ok, Files} ->
- case lists:foldl(fun ( Fn1, ok) -> delete_recursively(
- Fn ++ "/" ++ Fn1);
- (_Fn1, Err) -> Err
- end, ok, Files) of
- ok -> case file:del_dir(Fn) of
- ok -> ok;
- {error, E} -> {error,
- {cannot_delete, Fn, E}}
- end;
- Err -> Err
- end;
- {error, E} ->
- {error, {cannot_list_files, Fn, E}}
- end;
- false ->
- case filelib:is_file(Fn) of
- true -> case file:delete(Fn) of
- ok -> ok;
- {error, E} -> {error, {cannot_delete, Fn, E}}
- end;
- false -> ok
- end
- end.
-
-is_symlink(Name) ->
- case file:read_link(Name) of
- {ok, _} -> true;
- _ -> false
+ case rabbit_misc:recursive_delete([Fn]) of
+ ok -> ok;
+ {error, {Path, E}} -> {error, {cannot_delete, Path, E}};
+ Error -> Error
end.
unpack_ez_plugins(SrcDir, DestDir) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index cd5d9be0cc..7e84251fc4 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -757,13 +757,23 @@ test_topic_expect_match(X, List) ->
end, List).
test_app_management() ->
- %% starting, stopping, status
+ control_action(wait, [rabbit_mnesia:dir() ++ ".pid"]),
+ %% Starting, stopping and diagnostics. Note that we don't try
+ %% 'report' when the rabbit app is stopped and that we enable
+ %% tracing for the duration of this function.
+ ok = control_action(trace_on, []),
ok = control_action(stop_app, []),
ok = control_action(stop_app, []),
ok = control_action(status, []),
+ ok = control_action(cluster_status, []),
+ ok = control_action(environment, []),
ok = control_action(start_app, []),
ok = control_action(start_app, []),
ok = control_action(status, []),
+ ok = control_action(report, []),
+ ok = control_action(cluster_status, []),
+ ok = control_action(environment, []),
+ ok = control_action(trace_off, []),
passed.
test_log_management() ->
@@ -1146,6 +1156,7 @@ test_user_management() ->
ok = control_action(add_user, ["foo", "bar"]),
{error, {user_already_exists, _}} =
control_action(add_user, ["foo", "bar"]),
+ ok = control_action(clear_password, ["foo"]),
ok = control_action(change_password, ["foo", "baz"]),
TestTags = fun (Tags) ->
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 9739f6b7d5..e7a302f80d 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -228,13 +228,7 @@ secondary_upgrade(AllNodes) ->
ok.
nodes_running(Nodes) ->
- [N || N <- Nodes, node_running(N)].
-
-node_running(Node) ->
- case rpc:call(Node, application, which_applications, []) of
- {badrpc, _} -> false;
- Apps -> lists:keysearch(rabbit, 1, Apps) =/= false
- end.
+ [N || N <- Nodes, rabbit:is_running(N)].
%% -------------------------------------------------------------------