diff options
| author | Marek Majkowski <majek@lshift.net> | 2009-10-12 13:20:41 +0100 |
|---|---|---|
| committer | Marek Majkowski <majek@lshift.net> | 2009-10-12 13:20:41 +0100 |
| commit | e6f2c09625f9cc81a11fe0f5150df36b4ff62525 (patch) | |
| tree | a26a32dcdbef875c1ab09219c7fc830adfe02ec8 /src | |
| parent | 11137ab1adfde763d079a468cb9eb64dad08f9b0 (diff) | |
| parent | 0aca55dbe898a61a2379cff6b151822de2dd3834 (diff) | |
| download | rabbitmq-server-git-e6f2c09625f9cc81a11fe0f5150df36b4ff62525.tar.gz | |
Default merged into bug21457
Diffstat (limited to 'src')
| -rw-r--r-- | src/priority_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_plugin_activator.erl | 74 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 5 | ||||
| -rw-r--r-- | src/tcp_acceptor.erl | 27 |
15 files changed, 201 insertions, 92 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl index c74b39a957..74b41a910c 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -67,8 +67,8 @@ -type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). -spec(new/0 :: () -> pqueue()). --spec(is_queue/1 :: (any()) -> bool()). --spec(is_empty/1 :: (pqueue()) -> bool()). +-spec(is_queue/1 :: (any()) -> boolean()). +-spec(is_empty/1 :: (pqueue()) -> boolean()). -spec(len/1 :: (pqueue()) -> non_neg_integer()). -spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). -spec(in/2 :: (any(), pqueue()) -> pqueue()). diff --git a/src/rabbit.erl b/src/rabbit.erl index 6d24ff731b..be74283f14 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -33,7 +33,7 @@ -behaviour(application). --export([start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]). +-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]). -export([start/2, stop/1]). @@ -57,6 +57,7 @@ -type(log_location() :: 'tty' | 'undefined' | string()). -type(file_suffix() :: binary()). +-spec(prepare/0 :: () -> 'ok'). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_halt/0 :: () -> 'ok'). @@ -71,11 +72,14 @@ %%---------------------------------------------------------------------------- +prepare() -> + ok = ensure_working_log_handlers(), + ok = rabbit_mnesia:ensure_mnesia_dir(). + start() -> try - ok = ensure_working_log_handlers(), - ok = rabbit_mnesia:ensure_mnesia_dir(), - ok = rabbit_misc:start_applications(?APPS) + ok = prepare(), + ok = rabbit_misc:start_applications(?APPS) after %%give the error loggers some time to catch up timer:sleep(100) @@ -226,6 +230,12 @@ app_location() -> {ok, Application} = application:get_application(), filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")). +home_dir() -> + case init:get_argument(home) of + {ok, [[Home]]} -> Home; + Other -> Other + end. + %--------------------------------------------------------------------------- print_banner() -> @@ -250,6 +260,8 @@ print_banner() -> ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), Settings = [{"node", node()}, {"app descriptor", app_location()}, + {"home dir", home_dir()}, + {"cookie hash", rabbit_misc:cookie_hash()}, {"log", log_location(kernel)}, {"sasl log", log_location(sasl)}, {"database dir", rabbit_mnesia:dir()}], diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f05f7880b7..1a5e82d714 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -63,7 +63,7 @@ -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). --spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> +-spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). @@ -83,8 +83,8 @@ {'error', 'in_use'} | {'error', 'not_empty'}). -spec(purge/1 :: (amqqueue()) -> qlen()). --spec(deliver/2 :: (pid(), delivery()) -> bool()). --spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). +-spec(deliver/2 :: (pid(), delivery()) -> boolean()). +-spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). @@ -92,16 +92,16 @@ -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). --spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> +-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), msg()} | 'empty'). -spec(basic_consume/8 :: - (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> + (amqqueue(), boolean(), pid(), pid(), pid(), ctag(), boolean(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()). +-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 4033aaafda..bec2cd0845 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -45,13 +45,14 @@ -type(publish_result() :: ({ok, routing_result(), [pid()]} | not_found())). -spec(publish/1 :: (delivery()) -> publish_result()). --spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()). +-spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) -> + delivery()). -spec(message/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> message()). -spec(properties/1 :: (properties_input()) -> amqp_properties()). -spec(publish/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> publish_result()). --spec(publish/7 :: (exchange_name(), routing_key(), bool(), bool(), +-spec(publish/7 :: (exchange_name(), routing_key(), boolean(), boolean(), maybe(txn()), properties_input(), binary()) -> publish_result()). -spec(build_content/2 :: (amqp_properties(), binary()) -> content()). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a1fa106665..c20cb16ca1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -60,8 +60,8 @@ -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok'). --spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). +-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). +-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -endif. @@ -260,12 +260,6 @@ expand_routing_key_shortcut(<<>>, <<>>, expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> RoutingKey. -die_precondition_failed(Fmt, Params) -> - %% FIXME: 406 should be replaced with precondition_failed when we - %% move to AMQP spec >=8.1 - rabbit_misc:protocol_error({false, 406, <<"PRECONDITION_FAILED">>}, - Fmt, Params). - %% check that an exchange/queue name does not contain the reserved %% "amq." prefix. %% @@ -610,8 +604,8 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, {error, not_found} -> rabbit_misc:not_found(ExchangeName); {error, in_use} -> - die_precondition_failed( - "~s in use", [rabbit_misc:rs(ExchangeName)]); + rabbit_misc:protocol_error( + precondition_failed, "~s in use", [rabbit_misc:rs(ExchangeName)]); ok -> return_ok(State, NoWait, #'exchange.delete_ok'{}) end; @@ -685,11 +679,11 @@ handle_method(#'queue.delete'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> - die_precondition_failed( - "~s in use", [rabbit_misc:rs(QueueName)]); + rabbit_misc:protocol_error( + precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]); {error, not_empty} -> - die_precondition_failed( - "~s not empty", [rabbit_misc:rs(QueueName)]); + rabbit_misc:protocol_error( + precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]); {ok, PurgedMessageCount} -> return_ok(State, NoWait, #'queue.delete_ok'{ diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index f701c4aafb..a53ac289f2 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -52,10 +52,12 @@ %%---------------------------------------------------------------------------- start() -> + {ok, [[NodeNameStr|_]|_]} = init:get_argument(nodename), + NodeName = list_to_atom(NodeNameStr), FullCommand = init:get_plain_arguments(), #params{quiet = Quiet, node = Node, command = Command, args = Args} = parse_args(FullCommand, #params{quiet = false, - node = rabbit_misc:localnode(rabbit)}), + node = rabbit_misc:localnode(NodeName)}), Inform = case Quiet of true -> fun(_Format, _Args1) -> ok end; false -> fun(Format, Args1) -> @@ -80,13 +82,38 @@ start() -> {error, Reason} -> error("~p", [Reason]), halt(2); + {badrpc, Reason} -> + error("unable to connect to node ~w: ~w", [Node, Reason]), + print_badrpc_diagnostics(Node), + halt(2); Other -> error("~p", [Other]), halt(2) end. -error(Format, Args) -> - rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args). +fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). + +error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). + +print_badrpc_diagnostics(Node) -> + fmt_stderr("diagnostics:", []), + NodeHost = rabbit_misc:nodehost(Node), + case net_adm:names(NodeHost) of + {error, EpmdReason} -> + fmt_stderr("- unable to connect to epmd on ~s: ~w", + [NodeHost, EpmdReason]); + {ok, NamePorts} -> + fmt_stderr("- nodes and their ports on ~s: ~p", + [NodeHost, [{list_to_atom(Name), Port} || + {Name, Port} <- NamePorts]]) + end, + fmt_stderr("- current node: ~w", [node()]), + case init:get_argument(home) of + {ok, [[Home]]} -> fmt_stderr("- current node home dir: ~s", [Home]); + Other -> fmt_stderr("- no current node home dir: ~p", [Other]) + end, + fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]), + ok. parse_args(["-n", NodeS | Args], Params) -> Node = case lists:member($@, NodeS) of diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 8fb9eae304..33dea8c7ce 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -61,7 +61,7 @@ 'exchange_not_found' | 'exchange_and_queue_not_found'}). -spec(recover/0 :: () -> 'ok'). --spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(), +-spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(), amqp_table()) -> exchange()). -spec(check_type/1 :: (binary()) -> atom()). -spec(assert_type/2 :: (exchange(), atom()) -> 'ok'). @@ -83,9 +83,9 @@ [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). -spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). --spec(topic_matches/2 :: (binary(), binary()) -> bool()). --spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). --spec(delete/2 :: (exchange_name(), bool()) -> +-spec(topic_matches/2 :: (binary(), binary()) -> boolean()). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). +-spec(delete/2 :: (exchange_name(), boolean()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> [{exchange_name(), routing_key(), amqp_table()}]). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 9f3dcbd071..087a9f64d9 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -47,7 +47,7 @@ -spec(start_link/1 :: (pid()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()). +-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 5843143839..b20e9a86b6 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -47,7 +47,7 @@ -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). --export([localnode/1, tcp_name/3]). +-export([localnode/1, nodehost/1, cookie_hash/0, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). -export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). @@ -106,6 +106,8 @@ -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). -spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). -spec(localnode/1 :: (atom()) -> erlang_node()). +-spec(nodehost/1 :: (erlang_node()) -> string()). +-spec(cookie_hash/0 :: () -> string()). -spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). @@ -307,11 +309,15 @@ ensure_ok(ok, _) -> ok; ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). localnode(Name) -> + list_to_atom(lists:append([atom_to_list(Name), "@", nodehost(node())])). + +nodehost(Node) -> %% This is horrible, but there doesn't seem to be a way to split a %% nodename into its constituent parts. - list_to_atom(lists:append(atom_to_list(Name), - lists:dropwhile(fun (E) -> E =/= $@ end, - atom_to_list(node())))). + tl(lists:dropwhile(fun (E) -> E =/= $@ end, atom_to_list(Node))). + +cookie_hash() -> + ssl_base64:encode(erlang:md5(atom_to_list(erlang:get_cookie()))). tcp_name(Prefix, IPAddress, Port) when is_atom(Prefix) andalso is_number(Port) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 37e20335bb..c4d5aac684 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -50,7 +50,7 @@ -spec(dir/0 :: () -> string()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). --spec(is_db_empty/0 :: () -> bool()). +-spec(is_db_empty/0 :: () -> boolean()). -spec(cluster/1 :: ([erlang_node()]) -> 'ok'). -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 1dd935187a..1bc17a324c 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -39,8 +39,8 @@ %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). --export([tcp_listener_started/2, ssl_connection_upgrade/2, - tcp_listener_stopped/2, start_client/1]). +-export([tcp_listener_started/2, tcp_listener_stopped/2, + start_client/1, start_ssl_client/2]). -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). @@ -112,7 +112,7 @@ start_tcp_listener(Host, Port) -> start_ssl_listener(Host, Port, SslOpts) -> start_listener(Host, Port, "SSL Listener", - {?MODULE, ssl_connection_upgrade, [SslOpts]}). + {?MODULE, start_ssl_client, [SslOpts]}). start_listener(Host, Port, Label, OnConnect) -> {IPAddress, Name} = @@ -166,20 +166,28 @@ start_client(Sock) -> Child ! {go, Sock}, Child. -ssl_connection_upgrade(SslOpts, Sock) -> - {ok, {PeerAddress, PeerPort}} = rabbit_net:peername(Sock), - PeerIp = inet_parse:ntoa(PeerAddress), - - case ssl:ssl_accept(Sock, SslOpts) of - {ok, SslSock} -> - rabbit_log:info("upgraded TCP connection from ~s:~p to SSL~n", - [PeerIp, PeerPort]), - RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock}, - start_client(RabbitSslSock); +start_ssl_client(SslOpts, Sock) -> + case rabbit_net:peername(Sock) of + {ok, {PeerAddress, PeerPort}} -> + PeerIp = inet_parse:ntoa(PeerAddress), + case ssl:ssl_accept(Sock, SslOpts) of + {ok, SslSock} -> + rabbit_log:info("upgraded TCP connection " + "from ~s:~p to SSL~n", + [PeerIp, PeerPort]), + RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock}, + start_client(RabbitSslSock); + {error, Reason} -> + gen_tcp:close(Sock), + rabbit_log:error("failed to upgrade TCP connection " + "from ~s:~p to SSL: ~n~p~n", + [PeerIp, PeerPort, Reason]), + {error, Reason} + end; {error, Reason} -> gen_tcp:close(Sock), - rabbit_log:error("failed to upgrade TCP connection from ~s:~p " - "to SSL: ~n~p~n", [PeerIp, PeerPort, Reason]), + rabbit_log:error("failed to upgrade TCP connection to SSL: ~p~n", + [Reason]), {error, Reason} end. diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 0206f73e9f..f28c4a6ec5 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -49,6 +49,8 @@ start() -> UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir), RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin), + RootName = RabbitEBin ++ "/rabbit", + %% Unpack any .ez plugins unpack_ez_plugins(PluginDir, UnpackedPluginDir), @@ -60,10 +62,8 @@ start() -> %% Build the entire set of dependencies - this will load the %% applications along the way AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of - {unknown_app, {App, Err}} -> - io:format("ERROR: Failed to load application " ++ - "~s: ~p~n", [App, Err]), - halt(1); + {failed_to_load_app, App, Err} -> + error("failed to load application ~s: ~p", [App, Err]); AppList -> AppList end, @@ -77,11 +77,11 @@ start() -> AppVersions}, %% Write it out to ebin/rabbit.rel - file:write_file(RabbitEBin ++ "/rabbit.rel", - io_lib:format("~p.~n", [RDesc])), + file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])), %% Compile the script - case systools:make_script(RabbitEBin ++ "/rabbit", [local, silent]) of + ScriptFile = RootName ++ ".script", + case systools:make_script(RootName, [local, silent]) of {ok, Module, Warnings} -> %% This gets lots of spurious no-source warnings when we %% have .ez files, so we want to supress them to prevent @@ -98,9 +98,19 @@ start() -> end, ok; {error, Module, Error} -> - io:format("Boot file generation failed: ~s~n", - [Module:format_error(Error)]), - halt(1) + error("generation of boot script file ~s failed: ~w", + [ScriptFile, Module:format_error(Error)]) + end, + + case post_process_script(ScriptFile) of + ok -> ok; + {error, Reason} -> + error("post processing of boot script file ~s failed: ~w", + [ScriptFile, Reason]) + end, + case systools:script2boot(RootName) of + ok -> ok; + error -> error("failed to compile boot script file ~s", [ScriptFile]) end, halt(), ok. @@ -122,10 +132,10 @@ determine_version(App) -> assert_dir(Dir) -> case filelib:is_dir(Dir) of true -> ok; - false -> - ok = filelib:ensure_dir(Dir), - ok = file:make_dir(Dir) + false -> ok = filelib:ensure_dir(Dir), + ok = file:make_dir(Dir) end. + delete_dir(Dir) -> case filelib:is_dir(Dir) of true -> @@ -143,6 +153,7 @@ delete_dir(Dir) -> false -> ok end. + is_symlink(Name) -> case file:read_link(Name) of {ok, _} -> true; @@ -185,14 +196,43 @@ expand_dependencies(Current, [Next|Rest]) -> expand_dependencies(Current, Rest); false -> case application:load(Next) of - ok -> + ok -> ok; - {error, {already_loaded, _}} -> + {error, {already_loaded, _}} -> ok; - X -> - throw({unknown_app, {Next, X}}) + {error, Reason} -> + throw({failed_to_load_app, Next, Reason}) end, {ok, Required} = application:get_key(Next, applications), Unique = [A || A <- Required, not(sets:is_element(A, Current))], expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique) end. + +post_process_script(ScriptFile) -> + case file:consult(ScriptFile) of + {ok, [{script, Name, Entries}]} -> + NewEntries = process_entries(Entries), + case file:open(ScriptFile, [write]) of + {ok, Fd} -> + io:format(Fd, "%% script generated at ~w ~w~n~p.~n", + [date(), time(), {script, Name, NewEntries}]), + file:close(Fd), + ok; + {error, OReason} -> + {error, {failed_to_open_script_file_for_writing, OReason}} + end; + {error, Reason} -> + {error, {failed_to_load_script, Reason}} + end. + +process_entries([]) -> + []; +process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} | + Rest]) -> + [Entry, {apply,{rabbit,prepare,[]}} | Rest]; +process_entries([Entry|Rest]) -> + [Entry | process_entries(Rest)]. + +error(Fmt, Args) -> + io:format("ERROR: " ++ Fmt ++ "~n", Args), + halt(1). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 5cc989929f..e21485b517 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -751,18 +751,27 @@ map_exception(Channel, Reason) -> end, {ShouldClose, CloseChannel, CloseMethod}. -lookup_amqp_exception( - #amqp_error{name = Name, explanation = Expl, method = Method}) -> +%% FIXME: this clause can go when we move to AMQP spec >=8.1 +lookup_amqp_exception(#amqp_error{name = precondition_failed, + explanation = Expl, + method = Method}) -> + ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl), + {false, 406, ExplBin, Method}; +lookup_amqp_exception(#amqp_error{name = Name, + explanation = Expl, + method = Method}) -> {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name), - ExplBin = list_to_binary(Expl), - CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, - SafeTextBin = if size(CompleteTextBin) > 255 -> - <<CompleteTextBin:252/binary, "...">>; - true -> CompleteTextBin - end, - {ShouldClose, Code, SafeTextBin, Method}; + ExplBin = amqp_exception_explanation(Text, Expl), + {ShouldClose, Code, ExplBin, Method}; lookup_amqp_exception(Other) -> rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(internal_error), {ShouldClose, Code, Text, none}. + +amqp_exception_explanation(Text, Expl) -> + ExplBin = list_to_binary(Expl), + CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, + if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; + true -> CompleteTextBin + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b4cd30bc92..5c5c55f1e4 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -666,7 +666,10 @@ test_server_status() -> {ok, _} = rabbit_amqqueue:delete(Q, false, false), %% list connections - [#listener{host = H, port = P} | _] = rabbit_networking:active_listeners(), + [#listener{host = H, port = P} | _] = + [L || L = #listener{node = N} <- rabbit_networking:active_listeners(), + N =:= node()], + {ok, C} = gen_tcp:connect(H, P, []), timer:sleep(100), ok = info_action( diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index aa8b8ad5a7..bc7425613f 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -67,15 +67,20 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, {ok, Mod} = inet_db:lookup_socket(LSock), inet_db:register_socket(Sock, Mod), - %% report - {ok, {Address, Port}} = inet:sockname(LSock), - {ok, {PeerAddress, PeerPort}} = inet:peername(Sock), - error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", - [inet_parse:ntoa(Address), Port, - inet_parse:ntoa(PeerAddress), PeerPort]), - - %% handle - apply(M, F, A ++ [Sock]), + try + %% report + {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end), + {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end), + error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", + [inet_parse:ntoa(Address), Port, + inet_parse:ntoa(PeerAddress), PeerPort]), + %% handle + apply(M, F, A ++ [Sock]) + catch {inet_error, Reason} -> + gen_tcp:close(Sock), + error_logger:error_msg("unable to accept TCP connection: ~p~n", + [Reason]) + end, %% accept more case prim_inet:async_accept(LSock, -1) of @@ -95,3 +100,7 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +%%-------------------------------------------------------------------- + +inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). |
