diff options
| -rw-r--r-- | docs/rabbitmqctl.1.pod | 11 | ||||
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rwxr-xr-x | scripts/rabbitmq-multi | 21 | ||||
| -rwxr-xr-x | scripts/rabbitmq-multi.bat | 25 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 17 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server.bat | 25 | ||||
| -rwxr-xr-x | scripts/rabbitmq-service.bat | 21 | ||||
| -rw-r--r-- | src/rabbit.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_exchange_type.erl | 107 | ||||
| -rw-r--r-- | src/rabbit_multi.erl | 68 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 48 | ||||
| -rw-r--r-- | src/rabbit_plugin_activator.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 50 |
15 files changed, 348 insertions, 136 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 6b4208725f..5255be28a0 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -198,9 +198,9 @@ whether the queue will be deleted when no longer used queue arguments -=item node +=item pid -node on which the process associated with the queue resides +id of the Erlang process associated with the queue =item messages_ready @@ -297,7 +297,7 @@ I<user>, I<peer_address>, I<peer_port> and I<state> are assumed. =item node -node on which the process associated with the connection resides +id of the Erlang process associated with the connection =item address @@ -340,6 +340,11 @@ connection timeout maximum frame size (bytes) +=item client_properties + +informational properties transmitted by the client during connection +establishment + =item recv_oct octets received diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5703d0d619..4b157cbc46 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -36,7 +36,7 @@ -record(vhost, {virtual_host, dummy}). --record(connection, {user, timeout_sec, frame_max, vhost}). +-record(connection, {user, timeout_sec, frame_max, vhost, client_properties}). -record(content, {class_id, diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 7db4cb70b2..1a7eb97e08 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -36,23 +36,37 @@ SCRIPT_HOME=$(dirname $0) PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= MULTI_START_ARGS= +CONFIG_FILE=/etc/rabbitmq/rabbitmq . `dirname $0`/rabbitmq-env +if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] +then + if [ "x" != "x$RABBITMQ_NODE_PORT" ] + then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} + fi +else + if [ "x" = "x$RABBITMQ_NODE_PORT" ] + then RABBITMQ_NODE_PORT=${NODE_PORT} + fi +fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} [ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=${SCRIPT_HOME} [ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=${PIDS_FILE} [ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=${MULTI_ERL_ARGS} [ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=${MULTI_START_ARGS} +[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} export \ RABBITMQ_NODENAME \ RABBITMQ_NODE_IP_ADDRESS \ RABBITMQ_NODE_PORT \ RABBITMQ_SCRIPT_HOME \ - RABBITMQ_PIDS_FILE + RABBITMQ_PIDS_FILE \ + RABBITMQ_CONFIG_FILE + +RABBITMQ_CONFIG_ARG= +[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}" # we need to turn off path expansion because some of the vars, notably # RABBITMQ_MULTI_ERL_ARGS, may contain terms that look like globs and @@ -65,6 +79,7 @@ exec erl \ -hidden \ ${RABBITMQ_MULTI_ERL_ARGS} \ -sname rabbitmq_multi$$ \ + ${RABBITMQ_CONFIG_ARG} \ -s rabbit_multi \ ${RABBITMQ_MULTI_START_ARGS} \ -extra "$@" diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index 8de18405b7..6dda13af37 100755 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -41,20 +41,32 @@ if "%RABBITMQ_NODENAME%"=="" ( )
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
set RABBITMQ_SCRIPT_HOME=%~sdp0%
+if "%RABBITMQ_CONFIG_FILE%"=="" (
+ set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+)
+
+if exist "%RABBITMQ_CONFIG_FILE%.config" (
+ set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+) else (
+ set RABBITMQ_CONFIG_ARG=
+)
+
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -68,6 +80,7 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( -noinput -hidden ^
%RABBITMQ_MULTI_ERL_ARGS% ^
-sname rabbitmq_multi ^
+%RABBITMQ_CONFIG_ARG% ^
-s rabbit_multi ^
%RABBITMQ_MULTI_START_ARGS% ^
-extra %*
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 310afe9426..7f08cd9d75 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -44,9 +44,17 @@ SERVER_START_ARGS= . `dirname $0`/rabbitmq-env +if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] +then + if [ "x" != "x$RABBITMQ_NODE_PORT" ] + then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} + fi +else + if [ "x" = "x$RABBITMQ_NODE_PORT" ] + then RABBITMQ_NODE_PORT=${NODE_PORT} + fi +fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} [ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} [ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} @@ -89,6 +97,9 @@ fi RABBITMQ_CONFIG_ARG= [ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}" +RABBITMQ_LISTEN_ARG= +[ "x" != "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_LISTEN_ARG="-rabbit tcp_listeners [{\""${RABBITMQ_NODE_IP_ADDRESS}"\","${RABBITMQ_NODE_PORT}"}]" + # we need to turn off path expansion because some of the vars, notably # RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and # there is no other way of preventing their expansion. @@ -102,7 +113,7 @@ exec erl \ ${RABBITMQ_CONFIG_ARG} \ +W w \ ${RABBITMQ_SERVER_ERL_ARGS} \ - -rabbit tcp_listeners '[{"'${RABBITMQ_NODE_IP_ADDRESS}'", '${RABBITMQ_NODE_PORT}'}]' \ + ${RABBITMQ_LISTEN_ARG} \ -sasl errlog_type error \ -kernel error_logger '{file,"'${RABBITMQ_LOGS}'"}' \ -sasl sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 211fc78190..5110285128 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -41,17 +41,19 @@ if "%RABBITMQ_NODENAME%"=="" ( )
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -114,13 +116,20 @@ if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" ( if "%RABBITMQ_CONFIG_FILE%"=="" (
set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
)
-
+
if exist "%RABBITMQ_CONFIG_FILE%.config" (
set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
) else (
set RABBITMQ_CONFIG_ARG=
)
+set RABBITMQ_LISTEN_ARG=
+if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners [{\""%RABBITMQ_NODE_IP_ADDRESS%"\","%RABBITMQ_NODE_PORT%"}]
+ )
+)
+
"%ERLANG_HOME%\bin\erl.exe" ^
%RABBITMQ_EBIN_PATH% ^
-noinput ^
@@ -132,7 +141,7 @@ if exist "%RABBITMQ_CONFIG_FILE%.config" ( +A30 ^
-kernel inet_default_listen_options "[{nodelay, true}, {sndbuf, 16384}, {recbuf, 4096}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
--rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]" ^
+%RABBITMQ_LISTEN_ARG% ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index d80df9677f..d960d29dea 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -45,11 +45,13 @@ if "%RABBITMQ_NODENAME%"=="" ( )
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
if "%ERLANG_SERVICE_MANAGER_PATH%"=="" (
@@ -177,7 +179,12 @@ if exist "%RABBITMQ_CONFIG_FILE%.config" ( set RABBITMQ_CONFIG_ARG=
)
-
+set RABBITMQ_LISTEN_ARG=
+if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]"
+ )
+)
set ERLANG_SERVICE_ARGUMENTS= ^
%RABBITMQ_EBIN_PATH% ^
@@ -188,7 +195,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^ +A30 ^
-kernel inet_default_listen_options "[{nodelay,true},{sndbuf,16384},{recbuf,4096}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
--rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\",%RABBITMQ_NODE_PORT%}]" ^
+%RABBITMQ_LISTEN_ARG% ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
diff --git a/src/rabbit.erl b/src/rabbit.erl index 4cba1fa9b8..a90e682d5a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -134,6 +134,7 @@ start(normal, []) -> fun () -> ok = rabbit_mnesia:init() end}, {"core processes", fun () -> + ok = start_child(rabbit_exchange_type), ok = start_child(rabbit_log), ok = rabbit_hooks:start(), diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 1957972990..ddd0c00263 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -173,7 +173,7 @@ The list_queues, list_exchanges and list_bindings commands accept an optional virtual host parameter for which to display results. The default value is \"/\". <QueueInfoItem> must be a member of the list [name, durable, auto_delete, -arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted, +arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted, consumers, transactions, memory]. The default is to display name and (number of) messages. @@ -183,10 +183,10 @@ auto_delete, arguments]. The default is to display name and type. The output format for \"list_bindings\" is a list of rows containing exchange name, queue name, routing key and arguments, in that order. -<ConnectionInfoItem> must be a member of the list [node, address, port, +<ConnectionInfoItem> must be a member of the list [pid, address, port, peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, -recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display -user, peer_address, peer_port and state. +client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. +The default is to display user, peer_address, peer_port and state. "), halt(1). @@ -268,8 +268,7 @@ action(list_user_permissions, Node, Args = [_Username], Inform) -> action(list_queues, Node, Args, Inform) -> Inform("Listing queues", []), {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), - ArgAtoms = list_replace(node, pid, - default_if_empty(RemainingArgs, [name, messages])), + ArgAtoms = default_if_empty(RemainingArgs, [name, messages]), display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, [VHostArg, ArgAtoms]), ArgAtoms); @@ -294,9 +293,7 @@ action(list_bindings, Node, Args, Inform) -> action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), - ArgAtoms = list_replace(node, pid, - default_if_empty(Args, [user, peer_address, - peer_port, state])), + ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms); @@ -358,12 +355,15 @@ format_info_item(Key, Items) -> is_tuple(Value) -> inet_parse:ntoa(Value); Value when is_pid(Value) -> - atom_to_list(node(Value)); + pid_to_string(Value); Value when is_binary(Value) -> escape(Value); Value when is_atom(Value) -> - escape(atom_to_list(Value)); - Value -> + escape(atom_to_list(Value)); + Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _] + when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> + io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); + Value -> io_lib:format("~w", [Value]) end. @@ -388,14 +388,14 @@ rpc_call(Node, Mod, Fun, Args) -> %% characters. We don't escape characters above 127, since they may %% form part of UTF-8 strings. -escape(Bin) when binary(Bin) -> +escape(Bin) when is_binary(Bin) -> escape(binary_to_list(Bin)); escape(L) when is_list(L) -> escape_char(lists:reverse(L), []). escape_char([$\\ | T], Acc) -> escape_char(T, [$\\, $\\ | Acc]); -escape_char([X | T], Acc) when X > 32, X /= 127 -> +escape_char([X | T], Acc) when X >= 32, X /= 127 -> escape_char(T, [X | Acc]); escape_char([X | T], Acc) -> escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3), @@ -403,6 +403,20 @@ escape_char([X | T], Acc) -> escape_char([], Acc) -> Acc. -list_replace(Find, Replace, List) -> - [case X of Find -> Replace; _ -> X end || X <- List]. +prettify_amqp_table(Table) -> + [{escape(K), prettify_typed_amqp_value(T, V)} || {K, T, V} <- Table]. +prettify_typed_amqp_value(Type, Value) -> + case Type of + longstr -> escape(Value); + table -> prettify_amqp_table(Value); + array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value]; + _ -> Value + end. + +%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7) +pid_to_string(Pid) -> + <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>> + = term_to_binary(Pid), + Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), + lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 2c98deee2e..495fc4b3f3 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -128,18 +128,18 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> end end). -typename_to_plugin_module(T) when is_binary(T) -> - case catch list_to_existing_atom("rabbit_exchange_type_" ++ binary_to_list(T)) of - {'EXIT', {badarg, _}} -> +typename_to_plugin_module(T) -> + case rabbit_exchange_type:lookup_module(T) of + {ok, Module} -> + Module; + {error, not_found} -> rabbit_misc:protocol_error( - command_invalid, "invalid exchange type '~s'", [T]); - Module -> - Module + command_invalid, "invalid exchange type '~s'", [T]) end. -plugin_module_to_typename(M) when is_atom(M) -> - "rabbit_exchange_type_" ++ S = atom_to_list(M), - list_to_binary(S). +plugin_module_to_typename(M) -> + {ok, TypeName} = rabbit_exchange_type:lookup_name(M), + TypeName. check_type(T) -> Module = typename_to_plugin_module(T), diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl new file mode 100644 index 0000000000..58dcfbb6a2 --- /dev/null +++ b/src/rabbit_exchange_type.erl @@ -0,0 +1,107 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([register/2, lookup_module/1, lookup_name/1]). + +-define(SERVER, ?MODULE). + +%%--------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%--------------------------------------------------------------------------- + +register(TypeName, ModuleName) -> + gen_server:call(?SERVER, {register, TypeName, ModuleName}). + +lookup_module(T) when is_binary(T) -> + case ets:lookup(rabbit_exchange_type_modules, T) of + [{_, Module}] -> + {ok, Module}; + [] -> + {error, not_found} + end. + +lookup_name(M) when is_atom(M) -> + [{_, TypeName}] = ets:lookup(rabbit_exchange_type_names, M), + {ok, TypeName}. + +%%--------------------------------------------------------------------------- + +internal_register(TypeName, ModuleName) + when is_binary(TypeName), is_atom(ModuleName) -> + true = ets:insert(rabbit_exchange_type_modules, {TypeName, ModuleName}), + true = ets:insert(rabbit_exchange_type_names, {ModuleName, TypeName}), + ok. + +%%--------------------------------------------------------------------------- + +init([]) -> + rabbit_exchange_type_modules = + ets:new(rabbit_exchange_type_modules, [protected, set, named_table]), + rabbit_exchange_type_names = + ets:new(rabbit_exchange_type_names, [protected, set, named_table]), + + %% TODO: split out into separate boot startup steps. + ok = internal_register(<<"direct">>, rabbit_exchange_type_direct), + ok = internal_register(<<"fanout">>, rabbit_exchange_type_fanout), + ok = internal_register(<<"headers">>, rabbit_exchange_type_headers), + ok = internal_register(<<"topic">>, rabbit_exchange_type_topic), + + {ok, none}. + +handle_call({register, TypeName, ModuleName}, _From, State) -> + ok = internal_register(TypeName, ModuleName), + {reply, ok, State}; +handle_call(Request, _From, State) -> + {stop, {unhandled_call, Request}, State}. + +handle_cast(Request, State) -> + {stop, {unhandled_cast, Request}, State}. + +handle_info(Message, State) -> + {stop, {unhandled_info, Message}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index f364872eca..dc642df403 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -99,13 +99,16 @@ Available commands: action(start_all, [NodeCount], RpcTimeout) -> io:format("Starting all nodes...~n", []), - N = list_to_integer(NodeCount), + application:load(rabbit), + NodeName = rabbit_misc:nodeparts(getenv("RABBITMQ_NODENAME")), {NodePids, Running} = - start_nodes(N, N, [], true, - rabbit_misc:nodeparts( - getenv("RABBITMQ_NODENAME")), - list_to_integer(getenv("RABBITMQ_NODE_PORT")), - RpcTimeout), + case list_to_integer(NodeCount) of + 1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName), + RpcTimeout), + {[NodePid], Started}; + N -> start_nodes(N, N, [], true, NodeName, + get_node_tcp_listener(), RpcTimeout) + end, write_pids_file(NodePids), case Running of true -> ok; @@ -158,26 +161,29 @@ action(rotate_logs, [Suffix], RpcTimeout) -> %% Running is a boolean exhibiting success at some moment start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running}; -start_nodes(N, Total, PNodePid, Running, - NodeNameBase, NodePortBase, RpcTimeout) -> +start_nodes(N, Total, PNodePid, Running, NodeNameBase, Listener, RpcTimeout) -> {NodePre, NodeSuff} = NodeNameBase, NodeNumber = Total - N, - NodePre1 = if NodeNumber == 0 -> - %% For compatibility with running a single node - NodePre; - true -> - NodePre ++ "_" ++ integer_to_list(NodeNumber) + NodePre1 = case NodeNumber of + %% For compatibility with running a single node + 0 -> NodePre; + _ -> NodePre ++ "_" ++ integer_to_list(NodeNumber) end, - {NodePid, Started} = start_node(rabbit_misc:makenode({NodePre1, NodeSuff}), - NodePortBase + NodeNumber, - RpcTimeout), + Node = rabbit_misc:makenode({NodePre1, NodeSuff}), + os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)), + case Listener of + {NodeIpAddress, NodePortBase} -> + NodePort = NodePortBase + NodeNumber, + os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)), + os:putenv("RABBITMQ_NODE_IP_ADDRESS", NodeIpAddress); + undefined -> + ok + end, + {NodePid, Started} = start_node(Node, RpcTimeout), start_nodes(N - 1, Total, [NodePid | PNodePid], - Started and Running, - NodeNameBase, NodePortBase, RpcTimeout). + Started and Running, NodeNameBase, Listener, RpcTimeout). -start_node(Node, NodePort, RpcTimeout) -> - os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)), - os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)), +start_node(Node, RpcTimeout) -> io:format("Starting node ~s...~n", [Node]), case rpc:call(Node, os, getpid, []) of {badrpc, _} -> @@ -293,7 +299,7 @@ kill_wait(Pid, TimeLeft, Forceful) -> io:format(".", []), is_dead(Pid) orelse kill_wait(Pid, TimeLeft - ?RPC_SLEEP, Forceful). -% Test using some OS clunkiness since we shouldn't trust +% Test using some OS clunkiness since we shouldn't trust % rpc:call(os, getpid, []) at this point is_dead(Pid) -> PidS = integer_to_list(Pid), @@ -321,3 +327,21 @@ getenv(Var) -> false -> throw({missing_env_var, Var}); Value -> Value end. + +get_node_tcp_listener() -> + try + {getenv("RABBITMQ_NODE_IP_ADDRESS"), + list_to_integer(getenv("RABBITMQ_NODE_PORT"))} + catch _ -> + case application:get_env(rabbit, tcp_listeners) of + {ok, [{_IpAddy, _Port} = Listener]} -> + Listener; + {ok, []} -> + undefined; + {ok, Other} -> + throw({cannot_start_multiple_nodes, multiple_tcp_listeners, + Other}); + undefined -> + throw({missing_configuration, tcp_listeners}) + end + end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 1bc17a324c..3a0f9240dd 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -53,6 +53,9 @@ %% {delay_send, true}, {exit_on_close, false} ]). + +-define(SSL_TIMEOUT, 5). %% seconds + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -160,36 +163,31 @@ node_listeners(Node) -> on_node_down(Node) -> ok = mnesia:dirty_delete(rabbit_listener, Node). -start_client(Sock) -> +start_client(Sock, SockTransform) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), ok = rabbit_net:controlling_process(Sock, Child), - Child ! {go, Sock}, + Child ! {go, Sock, SockTransform}, Child. +start_client(Sock) -> + start_client(Sock, fun (S) -> {ok, S} end). + start_ssl_client(SslOpts, Sock) -> - case rabbit_net:peername(Sock) of - {ok, {PeerAddress, PeerPort}} -> - PeerIp = inet_parse:ntoa(PeerAddress), - case ssl:ssl_accept(Sock, SslOpts) of - {ok, SslSock} -> - rabbit_log:info("upgraded TCP connection " - "from ~s:~p to SSL~n", - [PeerIp, PeerPort]), - RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock}, - start_client(RabbitSslSock); - {error, Reason} -> - gen_tcp:close(Sock), - rabbit_log:error("failed to upgrade TCP connection " - "from ~s:~p to SSL: ~n~p~n", - [PeerIp, PeerPort, Reason]), - {error, Reason} - end; - {error, Reason} -> - gen_tcp:close(Sock), - rabbit_log:error("failed to upgrade TCP connection to SSL: ~p~n", - [Reason]), - {error, Reason} - end. + start_client( + Sock, + fun (Sock1) -> + case catch ssl:ssl_accept(Sock1, SslOpts, ?SSL_TIMEOUT * 1000) of + {ok, SslSock} -> + rabbit_log:info("upgraded TCP connection ~p to SSL~n", + [self()]), + {ok, #ssl_socket{tcp = Sock1, ssl = SslSock}}; + {error, Reason} -> + {error, {ssl_upgrade_error, Reason}}; + {'EXIT', Reason} -> + {error, {ssl_upgrade_failure, Reason}} + + end + end). connections() -> [Pid || {_, Pid, _, _} <- supervisor:which_children( diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index a2ac74ef81..9f7879209b 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -44,7 +44,7 @@ -ifdef(use_specs). --spec(start/0 :: () -> no_return()). +-spec(start/0 :: () -> no_return()). -spec(stop/0 :: () -> 'ok'). -endif. @@ -73,7 +73,7 @@ start() -> %% Build the entire set of dependencies - this will load the %% applications along the way AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of - {failed_to_load_app, App, Err} -> + {failed_to_load_app, App, Err} -> error("failed to load application ~s:~n~p", [App, Err]); AppList -> AppList @@ -82,8 +82,8 @@ start() -> {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions), %% Build the overall release descriptor - RDesc = {release, - {"rabbit", RabbitVersion}, + RDesc = {release, + {"rabbit", RabbitVersion}, {erts, erlang:system_info(version)}, AppVersions}, @@ -93,15 +93,15 @@ start() -> %% Compile the script ScriptFile = RootName ++ ".script", case systools:make_script(RootName, [local, silent]) of - {ok, Module, Warnings} -> + {ok, Module, Warnings} -> %% This gets lots of spurious no-source warnings when we %% have .ez files, so we want to supress them to prevent %% hiding real issues. WarningStr = Module:format_warning( - [W || W <- Warnings, - case W of - {warning, {source_not_found, _}} -> false; - _ -> true + [W || W <- Warnings, + case W of + {warning, {source_not_found, _}} -> false; + _ -> true end]), case length(WarningStr) of 0 -> ok; @@ -136,8 +136,8 @@ get_env(Key, Default) -> end. determine_version(App) -> - application:load(App), - {ok, Vsn} = application:get_key(App, vsn), + application:load(App), + {ok, Vsn} = application:get_key(App, vsn), {App, Vsn}. assert_dir(Dir) -> @@ -236,7 +236,7 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entries([]) -> +process_entries([]) -> []; process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} | Rest]) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e21485b517..e78d889d58 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -58,7 +58,7 @@ -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, - state, channels, user, vhost, timeout, frame_max]). + state, channels, user, vhost, timeout, frame_max, client_properties]). %% connection lifecycle %% @@ -142,7 +142,8 @@ start_link() -> init(Parent) -> Deb = sys:debug_options([]), receive - {go, Sock} -> start_connection(Parent, Deb, Sock) + {go, Sock, SockTransform} -> + start_connection(Parent, Deb, Sock, SockTransform) end. system_continue(Parent, Deb, State) -> @@ -192,34 +193,35 @@ teardown_profiling(Value) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). -peername(Sock) -> - try - {Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end), - AddressS = inet_parse:ntoa(Address), - {AddressS, Port} - catch - Ex -> rabbit_log:error("error on TCP connection ~p:~p~n", - [self(), Ex]), - rabbit_log:info("closing TCP connection ~p", [self()]), - exit(normal) +socket_op(Sock, Fun) -> + case Fun(Sock) of + {ok, Res} -> Res; + {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n", + [self(), Reason]), + rabbit_log:info("closing TCP connection ~p~n", + [self()]), + exit(normal) end. -start_connection(Parent, Deb, ClientSock) -> +start_connection(Parent, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), - {PeerAddressS, PeerPort} = peername(ClientSock), + {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), + PeerAddressS = inet_parse:ntoa(PeerAddress), + rabbit_log:info("starting TCP connection ~p from ~s:~p~n", + [self(), PeerAddressS, PeerPort]), + ClientSock = socket_op(Sock, SockTransform), + erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), + handshake_timeout), ProfilingValue = setup_profiling(), try - rabbit_log:info("starting TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), - erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), - handshake_timeout), mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, connection = #connection{ user = none, timeout_sec = ?HANDSHAKE_TIMEOUT, frame_max = ?FRAME_MIN_SIZE, - vhost = none}, + vhost = none, + client_properties = none}, callback = uninitialized_callback, recv_ref = none, connection_state = pre_init}, @@ -558,7 +560,8 @@ handle_method0(MethodName, FieldsBin, State) -> end. handle_method0(#'connection.start_ok'{mechanism = Mechanism, - response = Response}, + response = Response, + client_properties = ClientProperties}, State = #v1{connection_state = starting, connection = Connection, sock = Sock}) -> @@ -570,7 +573,9 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, frame_max = 131072, heartbeat = 0}), State#v1{connection_state = tuning, - connection = Connection#connection{user = User}}; + connection = Connection#connection{ + user = User, + client_properties = ClientProperties}}; handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax, frame_max = FrameMax, heartbeat = ClientHeartbeat}, @@ -689,6 +694,9 @@ i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> Timeout; i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) -> FrameMax; +i(client_properties, #v1{connection = #connection{ + client_properties = ClientProperties}}) -> + ClientProperties; i(Item, #v1{}) -> throw({bad_argument, Item}). |
