summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmqctl.1.pod11
-rw-r--r--include/rabbit.hrl2
-rwxr-xr-xscripts/rabbitmq-multi21
-rwxr-xr-xscripts/rabbitmq-multi.bat25
-rwxr-xr-xscripts/rabbitmq-server17
-rwxr-xr-xscripts/rabbitmq-server.bat25
-rwxr-xr-xscripts/rabbitmq-service.bat21
-rw-r--r--src/rabbit.erl1
-rw-r--r--src/rabbit_control.erl46
-rw-r--r--src/rabbit_exchange.erl18
-rw-r--r--src/rabbit_exchange_type.erl107
-rw-r--r--src/rabbit_multi.erl68
-rw-r--r--src/rabbit_networking.erl48
-rw-r--r--src/rabbit_plugin_activator.erl24
-rw-r--r--src/rabbit_reader.erl50
15 files changed, 348 insertions, 136 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index 6b4208725f..5255be28a0 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -198,9 +198,9 @@ whether the queue will be deleted when no longer used
queue arguments
-=item node
+=item pid
-node on which the process associated with the queue resides
+id of the Erlang process associated with the queue
=item messages_ready
@@ -297,7 +297,7 @@ I<user>, I<peer_address>, I<peer_port> and I<state> are assumed.
=item node
-node on which the process associated with the connection resides
+id of the Erlang process associated with the connection
=item address
@@ -340,6 +340,11 @@ connection timeout
maximum frame size (bytes)
+=item client_properties
+
+informational properties transmitted by the client during connection
+establishment
+
=item recv_oct
octets received
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 5703d0d619..4b157cbc46 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -36,7 +36,7 @@
-record(vhost, {virtual_host, dummy}).
--record(connection, {user, timeout_sec, frame_max, vhost}).
+-record(connection, {user, timeout_sec, frame_max, vhost, client_properties}).
-record(content,
{class_id,
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 7db4cb70b2..1a7eb97e08 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -36,23 +36,37 @@ SCRIPT_HOME=$(dirname $0)
PIDS_FILE=/var/lib/rabbitmq/pids
MULTI_ERL_ARGS=
MULTI_START_ARGS=
+CONFIG_FILE=/etc/rabbitmq/rabbitmq
. `dirname $0`/rabbitmq-env
+if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ]
+then
+ if [ "x" != "x$RABBITMQ_NODE_PORT" ]
+ then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+ fi
+else
+ if [ "x" = "x$RABBITMQ_NODE_PORT" ]
+ then RABBITMQ_NODE_PORT=${NODE_PORT}
+ fi
+fi
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
-[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
[ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=${SCRIPT_HOME}
[ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=${PIDS_FILE}
[ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=${MULTI_ERL_ARGS}
[ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=${MULTI_START_ARGS}
+[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE}
export \
RABBITMQ_NODENAME \
RABBITMQ_NODE_IP_ADDRESS \
RABBITMQ_NODE_PORT \
RABBITMQ_SCRIPT_HOME \
- RABBITMQ_PIDS_FILE
+ RABBITMQ_PIDS_FILE \
+ RABBITMQ_CONFIG_FILE
+
+RABBITMQ_CONFIG_ARG=
+[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}"
# we need to turn off path expansion because some of the vars, notably
# RABBITMQ_MULTI_ERL_ARGS, may contain terms that look like globs and
@@ -65,6 +79,7 @@ exec erl \
-hidden \
${RABBITMQ_MULTI_ERL_ARGS} \
-sname rabbitmq_multi$$ \
+ ${RABBITMQ_CONFIG_ARG} \
-s rabbit_multi \
${RABBITMQ_MULTI_START_ARGS} \
-extra "$@"
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index 8de18405b7..6dda13af37 100755
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -41,20 +41,32 @@ if "%RABBITMQ_NODENAME%"=="" (
)
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
set RABBITMQ_SCRIPT_HOME=%~sdp0%
+if "%RABBITMQ_CONFIG_FILE%"=="" (
+ set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+)
+
+if exist "%RABBITMQ_CONFIG_FILE%.config" (
+ set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+) else (
+ set RABBITMQ_CONFIG_ARG=
+)
+
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -68,6 +80,7 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
-noinput -hidden ^
%RABBITMQ_MULTI_ERL_ARGS% ^
-sname rabbitmq_multi ^
+%RABBITMQ_CONFIG_ARG% ^
-s rabbit_multi ^
%RABBITMQ_MULTI_START_ARGS% ^
-extra %*
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 310afe9426..7f08cd9d75 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -44,9 +44,17 @@ SERVER_START_ARGS=
. `dirname $0`/rabbitmq-env
+if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ]
+then
+ if [ "x" != "x$RABBITMQ_NODE_PORT" ]
+ then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+ fi
+else
+ if [ "x" = "x$RABBITMQ_NODE_PORT" ]
+ then RABBITMQ_NODE_PORT=${NODE_PORT}
+ fi
+fi
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
-[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS}
[ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE}
[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE}
@@ -89,6 +97,9 @@ fi
RABBITMQ_CONFIG_ARG=
[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}"
+RABBITMQ_LISTEN_ARG=
+[ "x" != "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_LISTEN_ARG="-rabbit tcp_listeners [{\""${RABBITMQ_NODE_IP_ADDRESS}"\","${RABBITMQ_NODE_PORT}"}]"
+
# we need to turn off path expansion because some of the vars, notably
# RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and
# there is no other way of preventing their expansion.
@@ -102,7 +113,7 @@ exec erl \
${RABBITMQ_CONFIG_ARG} \
+W w \
${RABBITMQ_SERVER_ERL_ARGS} \
- -rabbit tcp_listeners '[{"'${RABBITMQ_NODE_IP_ADDRESS}'", '${RABBITMQ_NODE_PORT}'}]' \
+ ${RABBITMQ_LISTEN_ARG} \
-sasl errlog_type error \
-kernel error_logger '{file,"'${RABBITMQ_LOGS}'"}' \
-sasl sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 211fc78190..5110285128 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -41,17 +41,19 @@ if "%RABBITMQ_NODENAME%"=="" (
)
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -114,13 +116,20 @@ if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
if "%RABBITMQ_CONFIG_FILE%"=="" (
set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
)
-
+
if exist "%RABBITMQ_CONFIG_FILE%.config" (
set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
) else (
set RABBITMQ_CONFIG_ARG=
)
+set RABBITMQ_LISTEN_ARG=
+if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners [{\""%RABBITMQ_NODE_IP_ADDRESS%"\","%RABBITMQ_NODE_PORT%"}]
+ )
+)
+
"%ERLANG_HOME%\bin\erl.exe" ^
%RABBITMQ_EBIN_PATH% ^
-noinput ^
@@ -132,7 +141,7 @@ if exist "%RABBITMQ_CONFIG_FILE%.config" (
+A30 ^
-kernel inet_default_listen_options "[{nodelay, true}, {sndbuf, 16384}, {recbuf, 4096}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
--rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]" ^
+%RABBITMQ_LISTEN_ARG% ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index d80df9677f..d960d29dea 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -45,11 +45,13 @@ if "%RABBITMQ_NODENAME%"=="" (
)
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
if "%ERLANG_SERVICE_MANAGER_PATH%"=="" (
@@ -177,7 +179,12 @@ if exist "%RABBITMQ_CONFIG_FILE%.config" (
set RABBITMQ_CONFIG_ARG=
)
-
+set RABBITMQ_LISTEN_ARG=
+if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]"
+ )
+)
set ERLANG_SERVICE_ARGUMENTS= ^
%RABBITMQ_EBIN_PATH% ^
@@ -188,7 +195,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
+A30 ^
-kernel inet_default_listen_options "[{nodelay,true},{sndbuf,16384},{recbuf,4096}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
--rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\",%RABBITMQ_NODE_PORT%}]" ^
+%RABBITMQ_LISTEN_ARG% ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 4cba1fa9b8..a90e682d5a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -134,6 +134,7 @@ start(normal, []) ->
fun () -> ok = rabbit_mnesia:init() end},
{"core processes",
fun () ->
+ ok = start_child(rabbit_exchange_type),
ok = start_child(rabbit_log),
ok = rabbit_hooks:start(),
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 1957972990..ddd0c00263 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -173,7 +173,7 @@ The list_queues, list_exchanges and list_bindings commands accept an optional
virtual host parameter for which to display results. The default value is \"/\".
<QueueInfoItem> must be a member of the list [name, durable, auto_delete,
-arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted,
+arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted,
messages, acks_uncommitted, consumers, transactions, memory]. The default is
to display name and (number of) messages.
@@ -183,10 +183,10 @@ auto_delete, arguments]. The default is to display name and type.
The output format for \"list_bindings\" is a list of rows containing
exchange name, queue name, routing key and arguments, in that order.
-<ConnectionInfoItem> must be a member of the list [node, address, port,
+<ConnectionInfoItem> must be a member of the list [pid, address, port,
peer_address, peer_port, state, channels, user, vhost, timeout, frame_max,
-recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display
-user, peer_address, peer_port and state.
+client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend].
+The default is to display user, peer_address, peer_port and state.
"),
halt(1).
@@ -268,8 +268,7 @@ action(list_user_permissions, Node, Args = [_Username], Inform) ->
action(list_queues, Node, Args, Inform) ->
Inform("Listing queues", []),
{VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
- ArgAtoms = list_replace(node, pid,
- default_if_empty(RemainingArgs, [name, messages])),
+ ArgAtoms = default_if_empty(RemainingArgs, [name, messages]),
display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
[VHostArg, ArgAtoms]),
ArgAtoms);
@@ -294,9 +293,7 @@ action(list_bindings, Node, Args, Inform) ->
action(list_connections, Node, Args, Inform) ->
Inform("Listing connections", []),
- ArgAtoms = list_replace(node, pid,
- default_if_empty(Args, [user, peer_address,
- peer_port, state])),
+ ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
@@ -358,12 +355,15 @@ format_info_item(Key, Items) ->
is_tuple(Value) ->
inet_parse:ntoa(Value);
Value when is_pid(Value) ->
- atom_to_list(node(Value));
+ pid_to_string(Value);
Value when is_binary(Value) ->
escape(Value);
Value when is_atom(Value) ->
- escape(atom_to_list(Value));
- Value ->
+ escape(atom_to_list(Value));
+ Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _]
+ when is_binary(TableEntryKey) andalso is_atom(TableEntryType) ->
+ io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+ Value ->
io_lib:format("~w", [Value])
end.
@@ -388,14 +388,14 @@ rpc_call(Node, Mod, Fun, Args) ->
%% characters. We don't escape characters above 127, since they may
%% form part of UTF-8 strings.
-escape(Bin) when binary(Bin) ->
+escape(Bin) when is_binary(Bin) ->
escape(binary_to_list(Bin));
escape(L) when is_list(L) ->
escape_char(lists:reverse(L), []).
escape_char([$\\ | T], Acc) ->
escape_char(T, [$\\, $\\ | Acc]);
-escape_char([X | T], Acc) when X > 32, X /= 127 ->
+escape_char([X | T], Acc) when X >= 32, X /= 127 ->
escape_char(T, [X | Acc]);
escape_char([X | T], Acc) ->
escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3),
@@ -403,6 +403,20 @@ escape_char([X | T], Acc) ->
escape_char([], Acc) ->
Acc.
-list_replace(Find, Replace, List) ->
- [case X of Find -> Replace; _ -> X end || X <- List].
+prettify_amqp_table(Table) ->
+ [{escape(K), prettify_typed_amqp_value(T, V)} || {K, T, V} <- Table].
+prettify_typed_amqp_value(Type, Value) ->
+ case Type of
+ longstr -> escape(Value);
+ table -> prettify_amqp_table(Value);
+ array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value];
+ _ -> Value
+ end.
+
+%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7)
+pid_to_string(Pid) ->
+ <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>>
+ = term_to_binary(Pid),
+ Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
+ lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 2c98deee2e..495fc4b3f3 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -128,18 +128,18 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
end
end).
-typename_to_plugin_module(T) when is_binary(T) ->
- case catch list_to_existing_atom("rabbit_exchange_type_" ++ binary_to_list(T)) of
- {'EXIT', {badarg, _}} ->
+typename_to_plugin_module(T) ->
+ case rabbit_exchange_type:lookup_module(T) of
+ {ok, Module} ->
+ Module;
+ {error, not_found} ->
rabbit_misc:protocol_error(
- command_invalid, "invalid exchange type '~s'", [T]);
- Module ->
- Module
+ command_invalid, "invalid exchange type '~s'", [T])
end.
-plugin_module_to_typename(M) when is_atom(M) ->
- "rabbit_exchange_type_" ++ S = atom_to_list(M),
- list_to_binary(S).
+plugin_module_to_typename(M) ->
+ {ok, TypeName} = rabbit_exchange_type:lookup_name(M),
+ TypeName.
check_type(T) ->
Module = typename_to_plugin_module(T),
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
new file mode 100644
index 0000000000..58dcfbb6a2
--- /dev/null
+++ b/src/rabbit_exchange_type.erl
@@ -0,0 +1,107 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type).
+
+-behaviour(gen_server).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([register/2, lookup_module/1, lookup_name/1]).
+
+-define(SERVER, ?MODULE).
+
+%%---------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%---------------------------------------------------------------------------
+
+register(TypeName, ModuleName) ->
+ gen_server:call(?SERVER, {register, TypeName, ModuleName}).
+
+lookup_module(T) when is_binary(T) ->
+ case ets:lookup(rabbit_exchange_type_modules, T) of
+ [{_, Module}] ->
+ {ok, Module};
+ [] ->
+ {error, not_found}
+ end.
+
+lookup_name(M) when is_atom(M) ->
+ [{_, TypeName}] = ets:lookup(rabbit_exchange_type_names, M),
+ {ok, TypeName}.
+
+%%---------------------------------------------------------------------------
+
+internal_register(TypeName, ModuleName)
+ when is_binary(TypeName), is_atom(ModuleName) ->
+ true = ets:insert(rabbit_exchange_type_modules, {TypeName, ModuleName}),
+ true = ets:insert(rabbit_exchange_type_names, {ModuleName, TypeName}),
+ ok.
+
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ rabbit_exchange_type_modules =
+ ets:new(rabbit_exchange_type_modules, [protected, set, named_table]),
+ rabbit_exchange_type_names =
+ ets:new(rabbit_exchange_type_names, [protected, set, named_table]),
+
+ %% TODO: split out into separate boot startup steps.
+ ok = internal_register(<<"direct">>, rabbit_exchange_type_direct),
+ ok = internal_register(<<"fanout">>, rabbit_exchange_type_fanout),
+ ok = internal_register(<<"headers">>, rabbit_exchange_type_headers),
+ ok = internal_register(<<"topic">>, rabbit_exchange_type_topic),
+
+ {ok, none}.
+
+handle_call({register, TypeName, ModuleName}, _From, State) ->
+ ok = internal_register(TypeName, ModuleName),
+ {reply, ok, State};
+handle_call(Request, _From, State) ->
+ {stop, {unhandled_call, Request}, State}.
+
+handle_cast(Request, State) ->
+ {stop, {unhandled_cast, Request}, State}.
+
+handle_info(Message, State) ->
+ {stop, {unhandled_info, Message}, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index f364872eca..dc642df403 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -99,13 +99,16 @@ Available commands:
action(start_all, [NodeCount], RpcTimeout) ->
io:format("Starting all nodes...~n", []),
- N = list_to_integer(NodeCount),
+ application:load(rabbit),
+ NodeName = rabbit_misc:nodeparts(getenv("RABBITMQ_NODENAME")),
{NodePids, Running} =
- start_nodes(N, N, [], true,
- rabbit_misc:nodeparts(
- getenv("RABBITMQ_NODENAME")),
- list_to_integer(getenv("RABBITMQ_NODE_PORT")),
- RpcTimeout),
+ case list_to_integer(NodeCount) of
+ 1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName),
+ RpcTimeout),
+ {[NodePid], Started};
+ N -> start_nodes(N, N, [], true, NodeName,
+ get_node_tcp_listener(), RpcTimeout)
+ end,
write_pids_file(NodePids),
case Running of
true -> ok;
@@ -158,26 +161,29 @@ action(rotate_logs, [Suffix], RpcTimeout) ->
%% Running is a boolean exhibiting success at some moment
start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running};
-start_nodes(N, Total, PNodePid, Running,
- NodeNameBase, NodePortBase, RpcTimeout) ->
+start_nodes(N, Total, PNodePid, Running, NodeNameBase, Listener, RpcTimeout) ->
{NodePre, NodeSuff} = NodeNameBase,
NodeNumber = Total - N,
- NodePre1 = if NodeNumber == 0 ->
- %% For compatibility with running a single node
- NodePre;
- true ->
- NodePre ++ "_" ++ integer_to_list(NodeNumber)
+ NodePre1 = case NodeNumber of
+ %% For compatibility with running a single node
+ 0 -> NodePre;
+ _ -> NodePre ++ "_" ++ integer_to_list(NodeNumber)
end,
- {NodePid, Started} = start_node(rabbit_misc:makenode({NodePre1, NodeSuff}),
- NodePortBase + NodeNumber,
- RpcTimeout),
+ Node = rabbit_misc:makenode({NodePre1, NodeSuff}),
+ os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)),
+ case Listener of
+ {NodeIpAddress, NodePortBase} ->
+ NodePort = NodePortBase + NodeNumber,
+ os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)),
+ os:putenv("RABBITMQ_NODE_IP_ADDRESS", NodeIpAddress);
+ undefined ->
+ ok
+ end,
+ {NodePid, Started} = start_node(Node, RpcTimeout),
start_nodes(N - 1, Total, [NodePid | PNodePid],
- Started and Running,
- NodeNameBase, NodePortBase, RpcTimeout).
+ Started and Running, NodeNameBase, Listener, RpcTimeout).
-start_node(Node, NodePort, RpcTimeout) ->
- os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)),
- os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)),
+start_node(Node, RpcTimeout) ->
io:format("Starting node ~s...~n", [Node]),
case rpc:call(Node, os, getpid, []) of
{badrpc, _} ->
@@ -293,7 +299,7 @@ kill_wait(Pid, TimeLeft, Forceful) ->
io:format(".", []),
is_dead(Pid) orelse kill_wait(Pid, TimeLeft - ?RPC_SLEEP, Forceful).
-% Test using some OS clunkiness since we shouldn't trust
+% Test using some OS clunkiness since we shouldn't trust
% rpc:call(os, getpid, []) at this point
is_dead(Pid) ->
PidS = integer_to_list(Pid),
@@ -321,3 +327,21 @@ getenv(Var) ->
false -> throw({missing_env_var, Var});
Value -> Value
end.
+
+get_node_tcp_listener() ->
+ try
+ {getenv("RABBITMQ_NODE_IP_ADDRESS"),
+ list_to_integer(getenv("RABBITMQ_NODE_PORT"))}
+ catch _ ->
+ case application:get_env(rabbit, tcp_listeners) of
+ {ok, [{_IpAddy, _Port} = Listener]} ->
+ Listener;
+ {ok, []} ->
+ undefined;
+ {ok, Other} ->
+ throw({cannot_start_multiple_nodes, multiple_tcp_listeners,
+ Other});
+ undefined ->
+ throw({missing_configuration, tcp_listeners})
+ end
+ end.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 1bc17a324c..3a0f9240dd 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -53,6 +53,9 @@
%% {delay_send, true},
{exit_on_close, false}
]).
+
+-define(SSL_TIMEOUT, 5). %% seconds
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -160,36 +163,31 @@ node_listeners(Node) ->
on_node_down(Node) ->
ok = mnesia:dirty_delete(rabbit_listener, Node).
-start_client(Sock) ->
+start_client(Sock, SockTransform) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
ok = rabbit_net:controlling_process(Sock, Child),
- Child ! {go, Sock},
+ Child ! {go, Sock, SockTransform},
Child.
+start_client(Sock) ->
+ start_client(Sock, fun (S) -> {ok, S} end).
+
start_ssl_client(SslOpts, Sock) ->
- case rabbit_net:peername(Sock) of
- {ok, {PeerAddress, PeerPort}} ->
- PeerIp = inet_parse:ntoa(PeerAddress),
- case ssl:ssl_accept(Sock, SslOpts) of
- {ok, SslSock} ->
- rabbit_log:info("upgraded TCP connection "
- "from ~s:~p to SSL~n",
- [PeerIp, PeerPort]),
- RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock},
- start_client(RabbitSslSock);
- {error, Reason} ->
- gen_tcp:close(Sock),
- rabbit_log:error("failed to upgrade TCP connection "
- "from ~s:~p to SSL: ~n~p~n",
- [PeerIp, PeerPort, Reason]),
- {error, Reason}
- end;
- {error, Reason} ->
- gen_tcp:close(Sock),
- rabbit_log:error("failed to upgrade TCP connection to SSL: ~p~n",
- [Reason]),
- {error, Reason}
- end.
+ start_client(
+ Sock,
+ fun (Sock1) ->
+ case catch ssl:ssl_accept(Sock1, SslOpts, ?SSL_TIMEOUT * 1000) of
+ {ok, SslSock} ->
+ rabbit_log:info("upgraded TCP connection ~p to SSL~n",
+ [self()]),
+ {ok, #ssl_socket{tcp = Sock1, ssl = SslSock}};
+ {error, Reason} ->
+ {error, {ssl_upgrade_error, Reason}};
+ {'EXIT', Reason} ->
+ {error, {ssl_upgrade_failure, Reason}}
+
+ end
+ end).
connections() ->
[Pid || {_, Pid, _, _} <- supervisor:which_children(
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index a2ac74ef81..9f7879209b 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -44,7 +44,7 @@
-ifdef(use_specs).
--spec(start/0 :: () -> no_return()).
+-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
-endif.
@@ -73,7 +73,7 @@ start() ->
%% Build the entire set of dependencies - this will load the
%% applications along the way
AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of
- {failed_to_load_app, App, Err} ->
+ {failed_to_load_app, App, Err} ->
error("failed to load application ~s:~n~p", [App, Err]);
AppList ->
AppList
@@ -82,8 +82,8 @@ start() ->
{rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions),
%% Build the overall release descriptor
- RDesc = {release,
- {"rabbit", RabbitVersion},
+ RDesc = {release,
+ {"rabbit", RabbitVersion},
{erts, erlang:system_info(version)},
AppVersions},
@@ -93,15 +93,15 @@ start() ->
%% Compile the script
ScriptFile = RootName ++ ".script",
case systools:make_script(RootName, [local, silent]) of
- {ok, Module, Warnings} ->
+ {ok, Module, Warnings} ->
%% This gets lots of spurious no-source warnings when we
%% have .ez files, so we want to supress them to prevent
%% hiding real issues.
WarningStr = Module:format_warning(
- [W || W <- Warnings,
- case W of
- {warning, {source_not_found, _}} -> false;
- _ -> true
+ [W || W <- Warnings,
+ case W of
+ {warning, {source_not_found, _}} -> false;
+ _ -> true
end]),
case length(WarningStr) of
0 -> ok;
@@ -136,8 +136,8 @@ get_env(Key, Default) ->
end.
determine_version(App) ->
- application:load(App),
- {ok, Vsn} = application:get_key(App, vsn),
+ application:load(App),
+ {ok, Vsn} = application:get_key(App, vsn),
{App, Vsn}.
assert_dir(Dir) ->
@@ -236,7 +236,7 @@ post_process_script(ScriptFile) ->
{error, {failed_to_load_script, Reason}}
end.
-process_entries([]) ->
+process_entries([]) ->
[];
process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} |
Rest]) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e21485b517..e78d889d58 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -58,7 +58,7 @@
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
recv_oct, recv_cnt, send_oct, send_cnt, send_pend,
- state, channels, user, vhost, timeout, frame_max]).
+ state, channels, user, vhost, timeout, frame_max, client_properties]).
%% connection lifecycle
%%
@@ -142,7 +142,8 @@ start_link() ->
init(Parent) ->
Deb = sys:debug_options([]),
receive
- {go, Sock} -> start_connection(Parent, Deb, Sock)
+ {go, Sock, SockTransform} ->
+ start_connection(Parent, Deb, Sock, SockTransform)
end.
system_continue(Parent, Deb, State) ->
@@ -192,34 +193,35 @@ teardown_profiling(Value) ->
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
-peername(Sock) ->
- try
- {Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end),
- AddressS = inet_parse:ntoa(Address),
- {AddressS, Port}
- catch
- Ex -> rabbit_log:error("error on TCP connection ~p:~p~n",
- [self(), Ex]),
- rabbit_log:info("closing TCP connection ~p", [self()]),
- exit(normal)
+socket_op(Sock, Fun) ->
+ case Fun(Sock) of
+ {ok, Res} -> Res;
+ {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n",
+ [self(), Reason]),
+ rabbit_log:info("closing TCP connection ~p~n",
+ [self()]),
+ exit(normal)
end.
-start_connection(Parent, Deb, ClientSock) ->
+start_connection(Parent, Deb, Sock, SockTransform) ->
process_flag(trap_exit, true),
- {PeerAddressS, PeerPort} = peername(ClientSock),
+ {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
+ PeerAddressS = inet_parse:ntoa(PeerAddress),
+ rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
+ [self(), PeerAddressS, PeerPort]),
+ ClientSock = socket_op(Sock, SockTransform),
+ erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
+ handshake_timeout),
ProfilingValue = setup_profiling(),
try
- rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
- erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
- handshake_timeout),
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
connection = #connection{
user = none,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
- vhost = none},
+ vhost = none,
+ client_properties = none},
callback = uninitialized_callback,
recv_ref = none,
connection_state = pre_init},
@@ -558,7 +560,8 @@ handle_method0(MethodName, FieldsBin, State) ->
end.
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
- response = Response},
+ response = Response,
+ client_properties = ClientProperties},
State = #v1{connection_state = starting,
connection = Connection,
sock = Sock}) ->
@@ -570,7 +573,9 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
frame_max = 131072,
heartbeat = 0}),
State#v1{connection_state = tuning,
- connection = Connection#connection{user = User}};
+ connection = Connection#connection{
+ user = User,
+ client_properties = ClientProperties}};
handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
frame_max = FrameMax,
heartbeat = ClientHeartbeat},
@@ -689,6 +694,9 @@ i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
Timeout;
i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
FrameMax;
+i(client_properties, #v1{connection = #connection{
+ client_properties = ClientProperties}}) ->
+ ClientProperties;
i(Item, #v1{}) ->
throw({bad_argument, Item}).