diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-10-07 21:01:39 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-10-07 21:01:39 +0100 |
| commit | 98a679a1e632dbeb768e8dc436d9bf7aac91e582 (patch) | |
| tree | 47a28ff5bc64a64515083f872e05f9858edd63b7 /src | |
| parent | e1c21592a5bd33e6189b8ef71b9cf678e89a29d6 (diff) | |
| parent | 0aca55dbe898a61a2379cff6b151822de2dd3834 (diff) | |
| download | rabbitmq-server-git-98a679a1e632dbeb768e8dc436d9bf7aac91e582.tar.gz | |
merge default into bug21368
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_plugin_activator.erl | 74 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 5 | ||||
| -rw-r--r-- | src/tcp_acceptor.erl | 27 |
8 files changed, 133 insertions, 64 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 9b0470311b..892e3c8ba6 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -33,7 +33,7 @@ -behaviour(application). --export([start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]). +-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]). -export([start/2, stop/1]). @@ -57,6 +57,7 @@ -type(log_location() :: 'tty' | 'undefined' | string()). -type(file_suffix() :: binary()). +-spec(prepare/0 :: () -> 'ok'). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_halt/0 :: () -> 'ok'). @@ -71,11 +72,14 @@ %%---------------------------------------------------------------------------- +prepare() -> + ok = ensure_working_log_handlers(), + ok = rabbit_mnesia:ensure_mnesia_dir(). + start() -> try - ok = ensure_working_log_handlers(), - ok = rabbit_mnesia:ensure_mnesia_dir(), - ok = rabbit_misc:start_applications(?APPS) + ok = prepare(), + ok = rabbit_misc:start_applications(?APPS) after %%give the error loggers some time to catch up timer:sleep(100) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0093c28566..4209773926 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -263,12 +263,6 @@ expand_routing_key_shortcut(<<>>, <<>>, expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> RoutingKey. -die_precondition_failed(Fmt, Params) -> - %% FIXME: 406 should be replaced with precondition_failed when we - %% move to AMQP spec >=8.1 - rabbit_misc:protocol_error({false, 406, <<"PRECONDITION_FAILED">>}, - Fmt, Params). - %% check that an exchange/queue name does not contain the reserved %% "amq." prefix. %% @@ -610,8 +604,8 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, {error, not_found} -> rabbit_misc:not_found(ExchangeName); {error, in_use} -> - die_precondition_failed( - "~s in use", [rabbit_misc:rs(ExchangeName)]); + rabbit_misc:protocol_error( + precondition_failed, "~s in use", [rabbit_misc:rs(ExchangeName)]); ok -> return_ok(State, NoWait, #'exchange.delete_ok'{}) end; @@ -685,11 +679,11 @@ handle_method(#'queue.delete'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> - die_precondition_failed( - "~s in use", [rabbit_misc:rs(QueueName)]); + rabbit_misc:protocol_error( + precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]); {error, not_empty} -> - die_precondition_failed( - "~s not empty", [rabbit_misc:rs(QueueName)]); + rabbit_misc:protocol_error( + precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]); {ok, PurgedMessageCount} -> return_ok(State, NoWait, #'queue.delete_ok'{ diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 5323afbe42..c7b81d3788 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -52,10 +52,12 @@ %%---------------------------------------------------------------------------- start() -> + {ok, [[NodeNameStr|_]|_]} = init:get_argument(nodename), + NodeName = list_to_atom(NodeNameStr), FullCommand = init:get_plain_arguments(), #params{quiet = Quiet, node = Node, command = Command, args = Args} = parse_args(FullCommand, #params{quiet = false, - node = rabbit_misc:localnode(rabbit)}), + node = rabbit_misc:localnode(NodeName)}), Inform = case Quiet of true -> fun(_Format, _Args1) -> ok end; false -> fun(Format, Args1) -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 4bbdb65eb5..1bc17a324c 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -167,19 +167,27 @@ start_client(Sock) -> Child. start_ssl_client(SslOpts, Sock) -> - {ok, {PeerAddress, PeerPort}} = rabbit_net:peername(Sock), - PeerIp = inet_parse:ntoa(PeerAddress), - - case ssl:ssl_accept(Sock, SslOpts) of - {ok, SslSock} -> - rabbit_log:info("upgraded TCP connection from ~s:~p to SSL~n", - [PeerIp, PeerPort]), - RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock}, - start_client(RabbitSslSock); + case rabbit_net:peername(Sock) of + {ok, {PeerAddress, PeerPort}} -> + PeerIp = inet_parse:ntoa(PeerAddress), + case ssl:ssl_accept(Sock, SslOpts) of + {ok, SslSock} -> + rabbit_log:info("upgraded TCP connection " + "from ~s:~p to SSL~n", + [PeerIp, PeerPort]), + RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock}, + start_client(RabbitSslSock); + {error, Reason} -> + gen_tcp:close(Sock), + rabbit_log:error("failed to upgrade TCP connection " + "from ~s:~p to SSL: ~n~p~n", + [PeerIp, PeerPort, Reason]), + {error, Reason} + end; {error, Reason} -> gen_tcp:close(Sock), - rabbit_log:error("failed to upgrade TCP connection from ~s:~p " - "to SSL: ~n~p~n", [PeerIp, PeerPort, Reason]), + rabbit_log:error("failed to upgrade TCP connection to SSL: ~p~n", + [Reason]), {error, Reason} end. diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 0206f73e9f..f28c4a6ec5 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -49,6 +49,8 @@ start() -> UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir), RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin), + RootName = RabbitEBin ++ "/rabbit", + %% Unpack any .ez plugins unpack_ez_plugins(PluginDir, UnpackedPluginDir), @@ -60,10 +62,8 @@ start() -> %% Build the entire set of dependencies - this will load the %% applications along the way AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of - {unknown_app, {App, Err}} -> - io:format("ERROR: Failed to load application " ++ - "~s: ~p~n", [App, Err]), - halt(1); + {failed_to_load_app, App, Err} -> + error("failed to load application ~s: ~p", [App, Err]); AppList -> AppList end, @@ -77,11 +77,11 @@ start() -> AppVersions}, %% Write it out to ebin/rabbit.rel - file:write_file(RabbitEBin ++ "/rabbit.rel", - io_lib:format("~p.~n", [RDesc])), + file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])), %% Compile the script - case systools:make_script(RabbitEBin ++ "/rabbit", [local, silent]) of + ScriptFile = RootName ++ ".script", + case systools:make_script(RootName, [local, silent]) of {ok, Module, Warnings} -> %% This gets lots of spurious no-source warnings when we %% have .ez files, so we want to supress them to prevent @@ -98,9 +98,19 @@ start() -> end, ok; {error, Module, Error} -> - io:format("Boot file generation failed: ~s~n", - [Module:format_error(Error)]), - halt(1) + error("generation of boot script file ~s failed: ~w", + [ScriptFile, Module:format_error(Error)]) + end, + + case post_process_script(ScriptFile) of + ok -> ok; + {error, Reason} -> + error("post processing of boot script file ~s failed: ~w", + [ScriptFile, Reason]) + end, + case systools:script2boot(RootName) of + ok -> ok; + error -> error("failed to compile boot script file ~s", [ScriptFile]) end, halt(), ok. @@ -122,10 +132,10 @@ determine_version(App) -> assert_dir(Dir) -> case filelib:is_dir(Dir) of true -> ok; - false -> - ok = filelib:ensure_dir(Dir), - ok = file:make_dir(Dir) + false -> ok = filelib:ensure_dir(Dir), + ok = file:make_dir(Dir) end. + delete_dir(Dir) -> case filelib:is_dir(Dir) of true -> @@ -143,6 +153,7 @@ delete_dir(Dir) -> false -> ok end. + is_symlink(Name) -> case file:read_link(Name) of {ok, _} -> true; @@ -185,14 +196,43 @@ expand_dependencies(Current, [Next|Rest]) -> expand_dependencies(Current, Rest); false -> case application:load(Next) of - ok -> + ok -> ok; - {error, {already_loaded, _}} -> + {error, {already_loaded, _}} -> ok; - X -> - throw({unknown_app, {Next, X}}) + {error, Reason} -> + throw({failed_to_load_app, Next, Reason}) end, {ok, Required} = application:get_key(Next, applications), Unique = [A || A <- Required, not(sets:is_element(A, Current))], expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique) end. + +post_process_script(ScriptFile) -> + case file:consult(ScriptFile) of + {ok, [{script, Name, Entries}]} -> + NewEntries = process_entries(Entries), + case file:open(ScriptFile, [write]) of + {ok, Fd} -> + io:format(Fd, "%% script generated at ~w ~w~n~p.~n", + [date(), time(), {script, Name, NewEntries}]), + file:close(Fd), + ok; + {error, OReason} -> + {error, {failed_to_open_script_file_for_writing, OReason}} + end; + {error, Reason} -> + {error, {failed_to_load_script, Reason}} + end. + +process_entries([]) -> + []; +process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} | + Rest]) -> + [Entry, {apply,{rabbit,prepare,[]}} | Rest]; +process_entries([Entry|Rest]) -> + [Entry | process_entries(Rest)]. + +error(Fmt, Args) -> + io:format("ERROR: " ++ Fmt ++ "~n", Args), + halt(1). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 5cc989929f..e21485b517 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -751,18 +751,27 @@ map_exception(Channel, Reason) -> end, {ShouldClose, CloseChannel, CloseMethod}. -lookup_amqp_exception( - #amqp_error{name = Name, explanation = Expl, method = Method}) -> +%% FIXME: this clause can go when we move to AMQP spec >=8.1 +lookup_amqp_exception(#amqp_error{name = precondition_failed, + explanation = Expl, + method = Method}) -> + ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl), + {false, 406, ExplBin, Method}; +lookup_amqp_exception(#amqp_error{name = Name, + explanation = Expl, + method = Method}) -> {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name), - ExplBin = list_to_binary(Expl), - CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, - SafeTextBin = if size(CompleteTextBin) > 255 -> - <<CompleteTextBin:252/binary, "...">>; - true -> CompleteTextBin - end, - {ShouldClose, Code, SafeTextBin, Method}; + ExplBin = amqp_exception_explanation(Text, Expl), + {ShouldClose, Code, ExplBin, Method}; lookup_amqp_exception(Other) -> rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(internal_error), {ShouldClose, Code, Text, none}. + +amqp_exception_explanation(Text, Expl) -> + ExplBin = list_to_binary(Expl), + CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, + if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; + true -> CompleteTextBin + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 9d9e60ba44..259f120a17 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -668,7 +668,10 @@ test_server_status() -> {ok, _} = rabbit_amqqueue:delete(Q, false, false), %% list connections - [#listener{host = H, port = P} | _] = rabbit_networking:active_listeners(), + [#listener{host = H, port = P} | _] = + [L || L = #listener{node = N} <- rabbit_networking:active_listeners(), + N =:= node()], + {ok, C} = gen_tcp:connect(H, P, []), timer:sleep(100), ok = info_action( diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index aa8b8ad5a7..bc7425613f 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -67,15 +67,20 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, {ok, Mod} = inet_db:lookup_socket(LSock), inet_db:register_socket(Sock, Mod), - %% report - {ok, {Address, Port}} = inet:sockname(LSock), - {ok, {PeerAddress, PeerPort}} = inet:peername(Sock), - error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", - [inet_parse:ntoa(Address), Port, - inet_parse:ntoa(PeerAddress), PeerPort]), - - %% handle - apply(M, F, A ++ [Sock]), + try + %% report + {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end), + {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end), + error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", + [inet_parse:ntoa(Address), Port, + inet_parse:ntoa(PeerAddress), PeerPort]), + %% handle + apply(M, F, A ++ [Sock]) + catch {inet_error, Reason} -> + gen_tcp:close(Sock), + error_logger:error_msg("unable to accept TCP connection: ~p~n", + [Reason]) + end, %% accept more case prim_inet:async_accept(LSock, -1) of @@ -95,3 +100,7 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +%%-------------------------------------------------------------------- + +inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). |
