diff options
| author | Jerry Kuch <jerryk@vmware.com> | 2011-09-13 09:17:09 -0700 |
|---|---|---|
| committer | Jerry Kuch <jerryk@vmware.com> | 2011-09-13 09:17:09 -0700 |
| commit | b114149300ca0f3abd2cfab72141448250b3871a (patch) | |
| tree | debbcf02264160507e642952279663523fe1edc0 /src | |
| parent | 7ae03f5419de41a00d9fd0e5df09d44ff5015c85 (diff) | |
| parent | 090eee48366f13314f7b86584c0a7977f322d96a (diff) | |
| download | rabbitmq-server-git-b114149300ca0f3abd2cfab72141448250b3871a.tar.gz | |
Merge bug 23056
Diffstat (limited to 'src')
| -rw-r--r-- | src/gen_server2.erl | 4 | ||||
| -rw-r--r-- | src/rabbit.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 74 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_prelaunch.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 8 |
9 files changed, 78 insertions, 113 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 35258139ca..ab6c4e64f3 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -616,11 +616,11 @@ in(Input, Priority, GS2State = #gs2_state { queue = Queue }) -> process_msg({system, From, Req}, GS2State = #gs2_state { parent = Parent, debug = Debug }) -> + %% gen_server puts Hib on the end as the 7th arg, but that version + %% of the fun seems not to be documented so leaving out for now. sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State); process_msg({'EXIT', Parent, Reason} = Msg, GS2State = #gs2_state { parent = Parent }) -> - %% gen_server puts Hib on the end as the 7th arg, but that version - %% of the fun seems not to be documented so leaving out for now. terminate(Reason, Msg, GS2State); process_msg(Msg, GS2State = #gs2_state { debug = [] }) -> handle_msg(Msg, GS2State); diff --git a/src/rabbit.erl b/src/rabbit.erl index 20b3e275d2..3e3117471f 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -18,7 +18,8 @@ -behaviour(application). --export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, environment/0, +-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, + is_running/0 , is_running/1, environment/0, rotate_logs/1, force_event_refresh/0]). -export([start/2, stop/1]). @@ -196,6 +197,8 @@ {os, {atom(), atom()}} | {erlang_version, string()} | {memory, any()}]). +-spec(is_running/0 :: () -> boolean()). +-spec(is_running/1 :: (node()) -> boolean()). -spec(environment/0 :: () -> [{atom() | term()}]). -spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). @@ -241,11 +244,19 @@ stop_and_halt() -> status() -> [{pid, list_to_integer(os:getpid())}, - {running_applications, application:which_applications()}, + {running_applications, application:which_applications(infinity)}, {os, os:type()}, {erlang_version, erlang:system_info(system_version)}, {memory, erlang:memory()}]. +is_running() -> is_running(node()). + +is_running(Node) -> + case rpc:call(Node, application, which_applications, [infinity]) of + {badrpc, _} -> false; + Apps -> proplists:is_defined(rabbit, Apps) + end. + environment() -> lists:keysort( 1, [P || P = {K, _} <- application:get_all_env(rabbit), diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 9cc406e718..b266d3664d 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -18,8 +18,8 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/3, message/4, properties/1, delivery/4]). --export([publish/4, publish/6]). +-export([publish/4, publish/6, publish/1, + message/3, message/4, properties/1, delivery/4]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -35,6 +35,12 @@ -type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())). -type(body_input() :: (binary() | [binary()])). +-spec(publish/4 :: + (exchange_input(), rabbit_router:routing_key(), properties_input(), + body_input()) -> publish_result()). +-spec(publish/6 :: + (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(), + properties_input(), body_input()) -> publish_result()). -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). -spec(delivery/4 :: @@ -49,12 +55,6 @@ rabbit_types:ok_or_error2(rabbit_types:message(), any())). -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). --spec(publish/4 :: - (exchange_input(), rabbit_router:routing_key(), properties_input(), - body_input()) -> publish_result()). --spec(publish/6 :: - (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(), - properties_input(), body_input()) -> publish_result()). -spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary() | [binary()]) -> rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> @@ -64,13 +64,34 @@ %%---------------------------------------------------------------------------- +%% Convenience function, for avoiding round-trips in calls across the +%% erlang distributed network. +publish(Exchange, RoutingKeyBin, Properties, Body) -> + publish(Exchange, RoutingKeyBin, false, false, Properties, Body). + +%% Convenience function, for avoiding round-trips in calls across the +%% erlang distributed network. +publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) -> + publish(X, delivery(Mandatory, Immediate, + message(XName, RKey, properties(Props), Body), + undefined)); +publish(XName, RKey, Mandatory, Immediate, Props, Body) -> + publish(delivery(Mandatory, Immediate, + message(XName, RKey, properties(Props), Body), + undefined)). + publish(Delivery = #delivery{ - message = #basic_message{exchange_name = ExchangeName}}) -> - case rabbit_exchange:lookup(ExchangeName) of + message = #basic_message{exchange_name = XName}}) -> + case rabbit_exchange:lookup(XName) of {ok, X} -> publish(X, Delivery); - Other -> Other + Err -> Err end. +publish(X, Delivery) -> + {RoutingRes, DeliveredQPids} = + rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery), + {ok, RoutingRes, DeliveredQPids}. + delivery(Mandatory, Immediate, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, sender = self(), message = Message, msg_seq_no = MsgSeqNo}. @@ -113,11 +134,10 @@ strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} headers = Headers0}}) end. -message(ExchangeName, RoutingKey, - #content{properties = Props} = DecodedContent) -> +message(XName, RoutingKey, #content{properties = Props} = DecodedContent) -> try {ok, #basic_message{ - exchange_name = ExchangeName, + exchange_name = XName, content = strip_header(DecodedContent, ?DELETED_HEADER), id = rabbit_guid:guid(), is_persistent = is_message_persistent(DecodedContent), @@ -127,10 +147,10 @@ message(ExchangeName, RoutingKey, {error, _Reason} = Error -> Error end. -message(ExchangeName, RoutingKey, RawProperties, Body) -> +message(XName, RoutingKey, RawProperties, Body) -> Properties = properties(RawProperties), Content = build_content(Properties, Body), - {ok, Msg} = message(ExchangeName, RoutingKey, Content), + {ok, Msg} = message(XName, RoutingKey, Content), Msg. properties(P = #'P_basic'{}) -> @@ -152,28 +172,6 @@ indexof([], _Element, _N) -> 0; indexof([Element | _Rest], Element, N) -> N; indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). -%% Convenience function, for avoiding round-trips in calls across the -%% erlang distributed network. -publish(Exchange, RoutingKeyBin, Properties, Body) -> - publish(Exchange, RoutingKeyBin, false, false, Properties, Body). - -%% Convenience function, for avoiding round-trips in calls across the -%% erlang distributed network. -publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) -> - publish(X, delivery(Mandatory, Immediate, - message(XName, RKey, properties(Props), Body), - undefined)); -publish(XName, RKey, Mandatory, Immediate, Props, Body) -> - case rabbit_exchange:lookup(XName) of - {ok, X} -> publish(X, RKey, Mandatory, Immediate, Props, Body); - Err -> Err - end. - -publish(X, Delivery) -> - {RoutingRes, DeliveredQPids} = - rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery), - {ok, RoutingRes, DeliveredQPids}. - is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> case Mode of diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index b9e550c9f1..1163ae9d86 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -17,7 +17,7 @@ -module(rabbit_control). -include("rabbit.hrl"). --export([start/0, stop/0, action/5, diagnostics/1, log_action/3]). +-export([start/0, stop/0, action/5, diagnostics/1]). -define(RPC_TIMEOUT, infinity). @@ -50,7 +50,6 @@ -> 'ok'). -spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]). -spec(usage/0 :: () -> no_return()). --spec(log_action/3 :: (node(), string(), [term()]) -> ok). -endif. @@ -73,7 +72,6 @@ start() -> Command = list_to_atom(Command0), Quiet = proplists:get_bool(?QUIET_OPT, Opts1), Node = proplists:get_value(?NODE_OPT, Opts1), - rpc_call(Node, rabbit_control, log_action, [node(), Command0, Args]), Inform = case Quiet of true -> fun (_Format, _Args1) -> ok end; false -> fun (Format, Args1) -> @@ -362,7 +360,7 @@ wait_for_application(Node, PidFile, Inform) -> wait_for_application(Node, Pid) -> case process_up(Pid) of - true -> case node_up(Node) of + true -> case rabbit:is_running(Node) of true -> ok; false -> timer:sleep(1000), wait_for_application(Node, Pid) @@ -378,12 +376,6 @@ wait_and_read_pid_file(PidFile) -> {error, _} = E -> exit({error, {could_not_read_pid, E}}) end. -node_up(Node) -> - case rpc_call(Node, application, which_applications, [infinity]) of - {badrpc, _} -> false; - Apps -> proplists:is_defined(rabbit, Apps) - end. - % Test using some OS clunkiness since we shouldn't trust % rpc:call(os, getpid, []) at this point process_up(Pid) -> @@ -521,22 +513,3 @@ quit(Status) -> {unix, _} -> halt(Status); {win32, _} -> init:stop(Status) end. - -log_action(Node, Command, Args) -> - rabbit_misc:with_local_io( - fun () -> - error_logger:info_msg("~p executing~n rabbitmqctl ~s ~s~n", - [Node, Command, - format_args(mask_args(Command, Args))]) - end). - -%% Mask passwords and other sensitive info before logging. -mask_args("add_user", [Name, _Password | Args]) -> - [Name, "****" | Args]; -mask_args("change_password", [Name, _Password | Args]) -> - [Name, "****" | Args]; -mask_args(_, Args) -> - Args. - -format_args(Args) -> - string:join([io_lib:format("~p", [A]) || A <- Args], " "). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 68afaf5d7c..6f9a46504f 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -72,7 +72,7 @@ list() -> %%---------------------------------------------------------------------------- connect(Username, VHost, Protocol, Pid, Infos) -> - case lists:keymember(rabbit, 1, application:which_applications()) of + case rabbit:is_running() of true -> case rabbit_access_control:check_user_login(Username, []) of {ok, User} -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ae28722ab2..0b39a20927 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -749,7 +749,7 @@ recursive_delete(Files) -> end, ok, Files). recursive_delete1(Path) -> - case filelib:is_dir(Path) of + case filelib:is_dir(Path) and not(is_symlink(Path)) of false -> case file:delete(Path) of ok -> ok; {error, enoent} -> ok; %% Path doesn't exist anyway @@ -777,6 +777,12 @@ recursive_delete1(Path) -> end end. +is_symlink(Name) -> + case file:read_link(Name) of + {ok, _} -> true; + _ -> false + end. + recursive_copy(Src, Dest) -> case filelib:is_dir(Src) of false -> case file:copy(Src, Dest) of diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index e3cf8ebef0..9fe073d997 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -139,38 +139,10 @@ determine_version(App) -> {App, Vsn}. delete_recursively(Fn) -> - case filelib:is_dir(Fn) and not(is_symlink(Fn)) of - true -> - case file:list_dir(Fn) of - {ok, Files} -> - case lists:foldl(fun ( Fn1, ok) -> delete_recursively( - Fn ++ "/" ++ Fn1); - (_Fn1, Err) -> Err - end, ok, Files) of - ok -> case file:del_dir(Fn) of - ok -> ok; - {error, E} -> {error, - {cannot_delete, Fn, E}} - end; - Err -> Err - end; - {error, E} -> - {error, {cannot_list_files, Fn, E}} - end; - false -> - case filelib:is_file(Fn) of - true -> case file:delete(Fn) of - ok -> ok; - {error, E} -> {error, {cannot_delete, Fn, E}} - end; - false -> ok - end - end. - -is_symlink(Name) -> - case file:read_link(Name) of - {ok, _} -> true; - _ -> false + case rabbit_misc:recursive_delete([Fn]) of + ok -> ok; + {error, {Path, E}} -> {error, {cannot_delete, Path, E}}; + Error -> Error end. unpack_ez_plugins(SrcDir, DestDir) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index cd5d9be0cc..7e84251fc4 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -757,13 +757,23 @@ test_topic_expect_match(X, List) -> end, List). test_app_management() -> - %% starting, stopping, status + control_action(wait, [rabbit_mnesia:dir() ++ ".pid"]), + %% Starting, stopping and diagnostics. Note that we don't try + %% 'report' when the rabbit app is stopped and that we enable + %% tracing for the duration of this function. + ok = control_action(trace_on, []), ok = control_action(stop_app, []), ok = control_action(stop_app, []), ok = control_action(status, []), + ok = control_action(cluster_status, []), + ok = control_action(environment, []), ok = control_action(start_app, []), ok = control_action(start_app, []), ok = control_action(status, []), + ok = control_action(report, []), + ok = control_action(cluster_status, []), + ok = control_action(environment, []), + ok = control_action(trace_off, []), passed. test_log_management() -> @@ -1146,6 +1156,7 @@ test_user_management() -> ok = control_action(add_user, ["foo", "bar"]), {error, {user_already_exists, _}} = control_action(add_user, ["foo", "bar"]), + ok = control_action(clear_password, ["foo"]), ok = control_action(change_password, ["foo", "baz"]), TestTags = fun (Tags) -> diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 9739f6b7d5..e7a302f80d 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -228,13 +228,7 @@ secondary_upgrade(AllNodes) -> ok. nodes_running(Nodes) -> - [N || N <- Nodes, node_running(N)]. - -node_running(Node) -> - case rpc:call(Node, application, which_applications, []) of - {badrpc, _} -> false; - Apps -> lists:keysearch(rabbit, 1, Apps) =/= false - end. + [N || N <- Nodes, rabbit:is_running(N)]. %% ------------------------------------------------------------------- |
