summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@lshift.net>2009-11-04 11:56:02 +0000
committerMichael Bridgen <mikeb@lshift.net>2009-11-04 11:56:02 +0000
commit997ca6aa36dbec1e46321bb626c8e3e8e6193f10 (patch)
tree72c46d142c65249e46a1252a4a3dafc0fcd6d2bc
parent11b5c08721455051fef2dc547510a087c87ae9ea (diff)
parent3394efabb87ad14d20a8df7bd681345658594e09 (diff)
downloadrabbitmq-server-git-997ca6aa36dbec1e46321bb626c8e3e8e6193f10.tar.gz
Merge from default to get, among other things, better memory management and synchronous auto_deletes
-rw-r--r--docs/rabbitmqctl.1.pod2
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--scripts/rabbitmq-activate-plugins.bat19
-rw-r--r--scripts/rabbitmq-deactivate-plugins.bat8
-rwxr-xr-xscripts/rabbitmq-multi.bat12
-rwxr-xr-xscripts/rabbitmq-server4
-rwxr-xr-xscripts/rabbitmq-server.bat33
-rwxr-xr-xscripts/rabbitmq-service.bat28
-rwxr-xr-xscripts/rabbitmqctl.bat4
-rw-r--r--src/rabbit.erl17
-rw-r--r--src/rabbit_alarm.erl110
-rw-r--r--src/rabbit_amqqueue_process.erl25
-rw-r--r--src/rabbit_control.erl19
-rw-r--r--src/rabbit_guid.erl9
-rw-r--r--src/rabbit_memsup.erl142
-rw-r--r--src/rabbit_memsup_darwin.erl88
-rw-r--r--src/rabbit_memsup_linux.erl101
-rw-r--r--src/rabbit_misc.erl28
-rw-r--r--src/rabbit_multi.erl26
-rw-r--r--src/rabbit_plugin_activator.erl6
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/vm_memory_monitor.erl322
23 files changed, 486 insertions, 525 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index c43ed2ea25..6b4208725f 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -279,7 +279,7 @@ exchange arguments
=item list_bindings [-p I<vhostpath>]
List bindings by virtual host. Each line printed describes a binding,
-with the exchange name, routing key, queue name and arguments,
+with the exchange name, queue name, routing key and arguments,
separated by tab characters.
=item list_connections [I<connectioninfoitem> ...]
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index dd907d1a02..39f98cbe56 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -20,5 +20,4 @@
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_vhost, <<"/">>},
- {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
- {memory_alarms, auto}]}]}.
+ {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}]}]}.
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 3a5cc2b068..62fb1dfbc5 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -75,9 +75,8 @@ echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm
%pre
if [ $1 -gt 1 ]; then
- #Upgrade - stop and remove previous instance of rabbitmq-server init.d script
+ # Upgrade - stop previous instance of rabbitmq-server init.d script
/sbin/service rabbitmq-server stop
- /sbin/chkconfig --del rabbitmq-server
fi
# create rabbitmq group
diff --git a/scripts/rabbitmq-activate-plugins.bat b/scripts/rabbitmq-activate-plugins.bat
index 3540bf2d9b..e7aa709544 100644
--- a/scripts/rabbitmq-activate-plugins.bat
+++ b/scripts/rabbitmq-activate-plugins.bat
@@ -30,6 +30,8 @@ REM
REM Contributor(s): ______________________________________.
REM
+setlocal
+
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
@@ -42,8 +44,17 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
exit /B
)
-set RABBITMQ_PLUGINS_DIR="%~dp0..\plugins"
-set RABBITMQ_PLUGINS_EXPAND_DIR="%~dp0..\priv\plugins"
-set RABBITMQ_EBIN_DIR="%~dp0..\ebin"
+set RABBITMQ_PLUGINS_DIR=%~dp0..\plugins
+set RABBITMQ_PLUGINS_EXPAND_DIR=%~dp0..\priv\plugins
+set RABBITMQ_EBIN_DIR=%~dp0..\ebin
+
+"%ERLANG_HOME%\bin\erl.exe" ^
+-pa "%RABBITMQ_EBIN_DIR%" ^
+-noinput -hidden ^
+-s rabbit_plugin_activator ^
+-rabbit plugins_dir \""%RABBITMQ_PLUGINS_DIR:\=/%"\" ^
+-rabbit plugins_expand_dir \""%RABBITMQ_PLUGINS_EXPAND_DIR:\=/%"\" ^
+-rabbit rabbit_ebin \""%RABBITMQ_EBIN_DIR:\=/%"\" ^
+-extra %*
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -s rabbit_plugin_activator -rabbit plugins_dir \"%RABBITMQ_PLUGINS_DIR:\=/%\" -rabbit plugins_expand_dir \"%RABBITMQ_PLUGINS_EXPAND_DIR:\=/%\" -rabbit rabbit_ebin \"%RABBITMQ_EBIN_DIR:\=/%\" -extra %*
+endlocal
diff --git a/scripts/rabbitmq-deactivate-plugins.bat b/scripts/rabbitmq-deactivate-plugins.bat
index 190fdef7b7..40155183a1 100644
--- a/scripts/rabbitmq-deactivate-plugins.bat
+++ b/scripts/rabbitmq-deactivate-plugins.bat
@@ -30,6 +30,10 @@ REM
REM Contributor(s): ______________________________________.
REM
-set RABBITMQ_EBIN_DIR="%~dp0..\ebin"
+setlocal
-del /f %RABBITMQ_EBIN_DIR%\rabbit.rel %RABBITMQ_EBIN_DIR%\rabbit.script %RABBITMQ_EBIN_DIR%\rabbit.boot
+set RABBITMQ_EBIN_DIR=%~dp0..\ebin
+
+del /f "%RABBITMQ_EBIN_DIR%"\rabbit.rel "%RABBITMQ_EBIN_DIR%"\rabbit.script "%RABBITMQ_EBIN_DIR%"\rabbit.boot
+
+endlocal
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index 8abf13f192..8de18405b7 100755
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -30,6 +30,8 @@ REM
REM Contributor(s): ______________________________________.
REM
+setlocal
+
if "%RABBITMQ_BASE%"=="" (
set RABBITMQ_BASE=%APPDATA%\RabbitMQ
)
@@ -61,5 +63,13 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_MULTI_ERL_ARGS% -sname rabbitmq_multi -s rabbit_multi %RABBITMQ_MULTI_START_ARGS% -extra %*
+"%ERLANG_HOME%\bin\erl.exe" ^
+-pa "%~dp0..\ebin" ^
+-noinput -hidden ^
+%RABBITMQ_MULTI_ERL_ARGS% ^
+-sname rabbitmq_multi ^
+-s rabbit_multi ^
+%RABBITMQ_MULTI_START_ARGS% ^
+-extra %*
+endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 67768c0e86..34904850be 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -109,9 +109,7 @@ exec erl \
-os_mon start_cpu_sup true \
-os_mon start_disksup false \
-os_mon start_memsup false \
- -os_mon start_os_sup false \
- -os_mon memsup_system_only true \
- -os_mon system_memory_high_watermark 0.95 \
+ -os_mon vm_memory_high_watermark 0.4 \
-mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \
${RABBITMQ_CLUSTER_CONFIG_OPTION} \
${RABBITMQ_SERVER_START_ARGS} \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 40f47c4b20..5b82ec1562 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -30,6 +30,8 @@ REM
REM Contributor(s): ______________________________________.
REM
+setlocal
+
if "%RABBITMQ_BASE%"=="" (
set RABBITMQ_BASE=%APPDATA%\RabbitMQ
)
@@ -73,17 +75,17 @@ rem Log management (rotation, filtering based of size...) is left as an exercice
set BACKUP_EXTENSION=.1
-set LOGS="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log"
-set SASL_LOGS="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log"
+set LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log
+set SASL_LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log
-set LOGS_BACKUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%"
-set SASL_LOGS_BAKCUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%"
+set LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%
+set SASL_LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%
-if exist %LOGS% (
- type %LOGS% >> %LOGS_BACKUP%
+if exist "%LOGS%" (
+ type "%LOGS%" >> "%LOGS_BACKUP%"
)
-if exist %SASL_LOGS% (
- type %SASL_LOGS% >> %SASL_LOGS_BAKCUP%
+if exist "%SASL_LOGS%" (
+ type "%SASL_LOGS%" >> "%SASL_LOGS_BACKUP%"
)
rem End of log management
@@ -103,26 +105,27 @@ if "%RABBITMQ_MNESIA_DIR%"=="" (
set RABBITMQ_EBIN_ROOT=%~dp0..\ebin
if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
echo Using Custom Boot File "%RABBITMQ_EBIN_ROOT%\rabbit.boot"
- set RABBITMQ_BOOT_FILE="%RABBITMQ_EBIN_ROOT%\rabbit"
+ set RABBITMQ_BOOT_FILE=%RABBITMQ_EBIN_ROOT%\rabbit
set RABBITMQ_EBIN_PATH=
) else (
set RABBITMQ_BOOT_FILE=start_sasl
set RABBITMQ_EBIN_PATH=-pa "%RABBITMQ_EBIN_ROOT%"
)
if "%RABBITMQ_CONFIG_FILE%"=="" (
- set RABBITMQ_CONFIG_FILE="%RABBITMQ_BASE%\rabbitmq"
+ set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
)
if exist "%RABBITMQ_CONFIG_FILE%.config" (
set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
) else (
- set RABBITMQ_CONFIG_ARG=""
+ set RABBITMQ_CONFIG_ARG=
)
"%ERLANG_HOME%\bin\erl.exe" ^
%RABBITMQ_EBIN_PATH% ^
-noinput ^
--boot %RABBITMQ_BOOT_FILE% %RABBITMQ_CONFIG_ARG% ^
+-boot "%RABBITMQ_BOOT_FILE%" ^
+%RABBITMQ_CONFIG_ARG% ^
-sname %RABBITMQ_NODENAME% ^
-s rabbit ^
+W w ^
@@ -137,10 +140,10 @@ if exist "%RABBITMQ_CONFIG_FILE%.config" (
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
--os_mon start_os_sup false ^
--os_mon memsup_system_only true ^
--os_mon system_memory_high_watermark 0.95 ^
+-os_mon vm_memory_high_watermark 0.4 ^
-mnesia dir \""%RABBITMQ_MNESIA_DIR%"\" ^
%CLUSTER_CONFIG% ^
%RABBITMQ_SERVER_START_ARGS% ^
%*
+
+endlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 29be174284..0cc7bfa915 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -30,6 +30,8 @@ REM
REM Contributor(s): ______________________________________.
REM
+setlocal
+
if "%RABBITMQ_SERVICENAME%"=="" (
set RABBITMQ_SERVICENAME=RabbitMQ
)
@@ -51,7 +53,7 @@ if "%RABBITMQ_NODE_PORT%"=="" (
)
if "%ERLANG_SERVICE_MANAGER_PATH%"=="" (
- set ERLANG_SERVICE_MANAGER_PATH=C:\Program Files\erl5.5.5\erts-5.5.5\bin
+ set ERLANG_SERVICE_MANAGER_PATH=C:\Program Files\erl5.6.5\erts-5.6.5\bin
)
set CONSOLE_FLAG=
@@ -69,7 +71,7 @@ if not exist "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe" (
echo ERLANG_SERVICE_MANAGER_PATH not set correctly.
echo **********************************************
echo.
- echo %ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe not found!
+ echo "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe" not found!
echo Please set ERLANG_SERVICE_MANAGER_PATH to the folder containing "erlsrv.exe".
echo.
exit /B 1
@@ -91,17 +93,17 @@ rem Log management (rotation, filtering based on size...) is left as an exercise
set BACKUP_EXTENSION=.1
-set LOGS="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log"
-set SASL_LOGS="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log"
+set LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log
+set SASL_LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log
-set LOGS_BACKUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%"
-set SASL_LOGS_BACKUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%"
+set LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%
+set SASL_LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%
-if exist %LOGS% (
- type %LOGS% >> %LOGS_BACKUP%
+if exist "%LOGS%" (
+ type "%LOGS%" >> "%LOGS_BACKUP%"
)
-if exist %SASL_LOGS% (
- type %SASL_LOGS% >> %SASL_LOGS_BACKUP%
+if exist "%SASL_LOGS%" (
+ type "%SASL_LOGS%" >> "%SASL_LOGS_BACKUP%"
)
rem End of log management
@@ -173,9 +175,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
--os_mon start_os_sup false ^
--os_mon memsup_system_only true ^
--os_mon system_memory_high_watermark 0.95 ^
+-os_mon vm_memory_high_watermark 0.4 ^
-mnesia dir \""%RABBITMQ_MNESIA_DIR%"\" ^
%CLUSTER_CONFIG% ^
%RABBITMQ_SERVER_START_ARGS% ^
@@ -202,3 +202,5 @@ goto END
:END
+
+endlocal
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 8a4e5445e5..512e8587dc 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -30,6 +30,8 @@ REM
REM Contributor(s): ______________________________________.
REM
+setlocal
+
if "%RABBITMQ_NODENAME%"=="" (
set RABBITMQ_NODENAME=rabbit
)
@@ -47,3 +49,5 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
)
"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -nodename %RABBITMQ_NODENAME% -extra %*
+
+endlocal
diff --git a/src/rabbit.erl b/src/rabbit.erl
index acc8f84ae9..3906f2f76d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -140,8 +140,16 @@ start(normal, []) ->
ok = rabbit_binary_generator:
check_empty_content_body_frame_size(),
- {ok, MemoryAlarms} = application:get_env(memory_alarms),
- ok = rabbit_alarm:start(MemoryAlarms),
+ ok = rabbit_alarm:start(),
+ MemoryWatermark =
+ application:get_env(os_mon, vm_memory_high_watermark),
+ ok = case MemoryWatermark of
+ {ok, Float} when Float == 0 -> ok;
+ {ok, Float} -> start_child(vm_memory_monitor, [Float]);
+ undefined ->
+ throw({undefined, os_mon,
+ vm_memory_high_watermark, settings})
+ end,
ok = rabbit_amqqueue:start(),
@@ -265,8 +273,11 @@ print_banner() ->
io:nl().
start_child(Mod) ->
+ start_child(Mod, []).
+
+start_child(Mod, Args) ->
{ok,_} = supervisor:start_child(rabbit_sup,
- {Mod, {Mod, start_link, []},
+ {Mod, {Mod, start_link, Args},
transient, 100, worker, [Mod]}),
ok.
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 7a2fbcb826..9a639ed40f 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -33,24 +33,19 @@
-behaviour(gen_event).
--export([start/1, stop/0, register/2]).
+-export([start/0, stop/0, register/2]).
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
--define(MEMSUP_CHECK_INTERVAL, 1000).
-
-%% OSes on which we know memory alarms to be trustworthy
--define(SUPPORTED_OS, [{unix, linux}, {unix, darwin}]).
-
--record(alarms, {alertees, system_memory_high_watermark = false}).
+-record(alarms, {alertees, vm_memory_high_watermark = false}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-type(mfa_tuple() :: {atom(), atom(), list()}).
--spec(start/1 :: (boolean() | 'auto') -> 'ok').
+-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
@@ -58,20 +53,8 @@
%%----------------------------------------------------------------------------
-start(MemoryAlarms) ->
- EnableAlarms = case MemoryAlarms of
- true -> true;
- false -> false;
- auto -> lists:member(os:type(), ?SUPPORTED_OS)
- end,
- ok = alarm_handler:add_alarm_handler(?MODULE, [EnableAlarms]),
- case whereis(memsup) of
- undefined -> if EnableAlarms -> ok = start_memsup(),
- ok = adjust_memsup_interval();
- true -> ok
- end;
- _ -> ok = adjust_memsup_interval()
- end.
+start() ->
+ ok = alarm_handler:add_alarm_handler(?MODULE, []).
stop() ->
ok = alarm_handler:delete_alarm_handler(?MODULE).
@@ -83,43 +66,33 @@ register(Pid, HighMemMFA) ->
%%----------------------------------------------------------------------------
-init([MemoryAlarms]) ->
- {ok, #alarms{alertees = case MemoryAlarms of
- true -> dict:new();
- false -> undefined
- end}}.
+init([]) ->
+ {ok, #alarms{alertees = dict:new()}}.
-handle_call({register, _Pid, _HighMemMFA},
- State = #alarms{alertees = undefined}) ->
- {ok, ok, State};
-handle_call({register, Pid, HighMemMFA},
+handle_call({register, Pid, {M, F, A} = HighMemMFA},
State = #alarms{alertees = Alertess}) ->
_MRef = erlang:monitor(process, Pid),
- case State#alarms.system_memory_high_watermark of
- true -> {M, F, A} = HighMemMFA,
- ok = erlang:apply(M, F, A ++ [Pid, true]);
- false -> ok
- end,
+ ok = case State#alarms.vm_memory_high_watermark of
+ true -> apply(M, F, A ++ [Pid, true]);
+ false -> ok
+ end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
{ok, ok, State#alarms{alertees = NewAlertees}};
handle_call(_Request, State) ->
{ok, not_understood, State}.
-handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
+handle_event({set_alarm, {vm_memory_high_watermark, []}}, State) ->
ok = alert(true, State#alarms.alertees),
- {ok, State#alarms{system_memory_high_watermark = true}};
+ {ok, State#alarms{vm_memory_high_watermark = true}};
-handle_event({clear_alarm, system_memory_high_watermark}, State) ->
+handle_event({clear_alarm, vm_memory_high_watermark}, State) ->
ok = alert(false, State#alarms.alertees),
- {ok, State#alarms{system_memory_high_watermark = false}};
+ {ok, State#alarms{vm_memory_high_watermark = false}};
handle_event(_Event, State) ->
{ok, State}.
-handle_info({'DOWN', _MRef, process, _Pid, _Reason},
- State = #alarms{alertees = undefined}) ->
- {ok, State};
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #alarms{alertees = Alertess}) ->
{ok, State#alarms{alertees = dict:erase(Pid, Alertess)}};
@@ -134,57 +107,6 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
-
-start_memsup() ->
- {Mod, Args} =
- case os:type() of
- %% memsup doesn't take account of buffers or cache when
- %% considering "free" memory - therefore on Linux we can
- %% get memory alarms very easily without any pressure
- %% existing on memory at all. Therefore we need to use
- %% our own simple memory monitor.
- %%
- {unix, linux} -> {rabbit_memsup, [rabbit_memsup_linux]};
- {unix, darwin} -> {rabbit_memsup, [rabbit_memsup_darwin]};
-
- %% Start memsup programmatically rather than via the
- %% rabbitmq-server script. This is not quite the right
- %% thing to do as os_mon checks to see if memsup is
- %% available before starting it, but as memsup is
- %% available everywhere (even on VXWorks) it should be
- %% ok.
- %%
- %% One benefit of the programmatic startup is that we
- %% can add our alarm_handler before memsup is running,
- %% thus ensuring that we notice memory alarms that go
- %% off on startup.
- %%
- _ -> {memsup, []}
- end,
- %% This is based on os_mon:childspec(memsup, true)
- {ok, _} = supervisor:start_child(
- os_mon_sup,
- {memsup, {Mod, start_link, Args},
- permanent, 2000, worker, [Mod]}),
- ok.
-
-adjust_memsup_interval() ->
- %% The default memsup check interval is 1 minute, which is way too
- %% long - rabbit can gobble up all memory in a matter of seconds.
- %% Unfortunately the memory_check_interval configuration parameter
- %% and memsup:set_check_interval/1 function only provide a
- %% granularity of minutes. So we have to peel off one layer of the
- %% API to get to the underlying layer which operates at the
- %% granularity of milliseconds.
- %%
- %% Note that the new setting will only take effect after the first
- %% check has completed, i.e. after one minute. So if rabbit eats
- %% all the memory within the first minute after startup then we
- %% are out of luck.
- ok = os_mon:call(memsup,
- {set_check_interval, ?MEMSUP_CHECK_INTERVAL},
- infinity).
-
alert(_Alert, undefined) ->
ok;
alert(Alert, Alertees) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cf08e85a60..29ebc87348 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -299,7 +299,7 @@ should_auto_delete(State) -> is_unused(State).
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
- not_found -> noreply(State);
+ not_found -> {ok, State};
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
unacked_messages = UAM} ->
erlang:demonitor(MonitorRef),
@@ -323,8 +323,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
blocked_consumers = remove_consumers(
ChPid, State#q.blocked_consumers)}),
case should_auto_delete(NewState) of
- false -> noreply(NewState);
- true -> {stop, normal, NewState}
+ false -> {ok, NewState};
+ true -> {stop, NewState}
end
end.
@@ -574,10 +574,16 @@ handle_call({commit, Txn}, From, State) ->
erase_tx(Txn),
noreply(NewState);
-handle_call({notify_down, ChPid}, From, State) ->
- %% optimisation: we reply straight away so the sender can continue
- gen_server2:reply(From, ok),
- handle_ch_down(ChPid, State);
+handle_call({notify_down, ChPid}, _From, State) ->
+ %% we want to do this synchronously, so that auto_deleted queues
+ %% are no longer visible by the time we send a response to the
+ %% client. The queue is ultimately deleted in terminate/2; if we
+ %% return stop with a reply, terminate/2 will be called by
+ %% gen_server2 *before* the reply is sent.
+ case handle_ch_down(ChPid, State) of
+ {ok, NewState} -> reply(ok, NewState);
+ {stop, NewState} -> {stop, normal, ok, NewState}
+ end;
handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName},
@@ -775,7 +781,10 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% Exclusively owned queues must disappear with their owner.
{stop, normal, State};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
- handle_ch_down(DownPid, State);
+ case handle_ch_down(DownPid, State) of
+ {ok, NewState} -> noreply(NewState);
+ {stop, NewState} -> {stop, normal, NewState}
+ end;
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 2ad99aebec..79034554e0 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -52,12 +52,11 @@
%%----------------------------------------------------------------------------
start() ->
- {ok, [[NodeNameStr|_]|_]} = init:get_argument(nodename),
- NodeName = list_to_atom(NodeNameStr),
+ {ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
FullCommand = init:get_plain_arguments(),
#params{quiet = Quiet, node = Node, command = Command, args = Args} =
parse_args(FullCommand, #params{quiet = false,
- node = rabbit_misc:localnode(NodeName)}),
+ node = rabbit_misc:makenode(NodeStr)}),
Inform = case Quiet of
true -> fun(_Format, _Args1) -> ok end;
false -> fun(Format, Args1) ->
@@ -97,12 +96,12 @@ error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
print_badrpc_diagnostics(Node) ->
fmt_stderr("diagnostics:", []),
- NodeHost = rabbit_misc:nodehost(Node),
+ {_NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
case net_adm:names(NodeHost) of
{error, EpmdReason} ->
fmt_stderr("- unable to connect to epmd on ~s: ~w",
[NodeHost, EpmdReason]);
- {ok, NamePorts} ->
+ {ok, NamePorts} ->
fmt_stderr("- nodes and their ports on ~s: ~p",
[NodeHost, [{list_to_atom(Name), Port} ||
{Name, Port} <- NamePorts]])
@@ -116,11 +115,7 @@ print_badrpc_diagnostics(Node) ->
ok.
parse_args(["-n", NodeS | Args], Params) ->
- Node = case lists:member($@, NodeS) of
- true -> list_to_atom(NodeS);
- false -> rabbit_misc:localnode(list_to_atom(NodeS))
- end,
- parse_args(Args, Params#params{node = Node});
+ parse_args(Args, Params#params{node = rabbit_misc:makenode(NodeS)});
parse_args(["-q" | Args], Params) ->
parse_args(Args, Params#params{quiet = true});
parse_args([Command | Args], Params) ->
@@ -186,7 +181,7 @@ messages, acks_uncommitted, consumers, transactions, memory]. The default is
arguments]. The default is to display name and type.
The output format for \"list_bindings\" is a list of rows containing
-exchange name, routing key, queue name and arguments, in that order.
+exchange name, queue name, routing key and arguments, in that order.
<ConnectionInfoItem> must be a member of the list [node, address, port,
peer_address, peer_port, state, channels, user, vhost, timeout, frame_max,
@@ -290,7 +285,7 @@ action(list_exchanges, Node, Args, Inform) ->
action(list_bindings, Node, Args, Inform) ->
Inform("Listing bindings", []),
{VHostArg, _} = parse_vhost_flag_bin(Args),
- InfoKeys = [exchange_name, routing_key, queue_name, args],
+ InfoKeys = [exchange_name, queue_name, routing_key, args],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index b789fbd1e0..ea61a679e8 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -104,13 +104,8 @@ guid() ->
%% generate a readable string representation of a guid. Note that any
%% monotonicity of the guid is not preserved in the encoding.
string_guid(Prefix) ->
- %% we use the (undocumented) ssl_base64 module here because it is
- %% present throughout OTP R11 and R12 whereas base64 only becomes
- %% available in R11B-4.
- %%
- %% TODO: once debian stable and EPEL have moved from R11B-2 to
- %% R11B-4 or later we should change this to use base64.
- Prefix ++ "-" ++ ssl_base64:encode(erlang:md5(term_to_binary(guid()))).
+ Prefix ++ "-" ++ base64:encode_to_string(
+ erlang:md5(term_to_binary(guid()))).
binstring_guid(Prefix) ->
list_to_binary(string_guid(Prefix)).
diff --git a/src/rabbit_memsup.erl b/src/rabbit_memsup.erl
deleted file mode 100644
index b0d57cb27e..0000000000
--- a/src/rabbit_memsup.erl
+++ /dev/null
@@ -1,142 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_memsup).
-
--behaviour(gen_server).
-
--export([start_link/1]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--export([update/0]).
-
--record(state, {memory_fraction,
- timeout,
- timer,
- mod,
- mod_state,
- alarmed
- }).
-
--define(SERVER, memsup). %% must be the same as the standard memsup
-
--define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--spec(start_link/1 :: (atom()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
--spec(update/0 :: () -> 'ok').
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start_link(Args) ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
-
-update() ->
- gen_server:cast(?SERVER, update).
-
-%%----------------------------------------------------------------------------
-
-init([Mod]) ->
- Fraction = os_mon:get_env(memsup, system_memory_high_watermark),
- TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
- InitState = Mod:init(),
- State = #state { memory_fraction = Fraction,
- timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
- timer = TRef,
- mod = Mod,
- mod_state = InitState,
- alarmed = false },
- {ok, internal_update(State)}.
-
-start_timer(Timeout) ->
- {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
- TRef.
-
-%% Export the same API as the real memsup. Note that
-%% get_sysmem_high_watermark gives an int in the range 0 - 100, while
-%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0.
-handle_call(get_sysmem_high_watermark, _From, State) ->
- {reply, trunc(100 * State#state.memory_fraction), State};
-
-handle_call({set_sysmem_high_watermark, Float}, _From, State) ->
- {reply, ok, State#state{memory_fraction = Float}};
-
-handle_call(get_check_interval, _From, State) ->
- {reply, State#state.timeout, State};
-
-handle_call({set_check_interval, Timeout}, _From, State) ->
- {ok, cancel} = timer:cancel(State#state.timer),
- {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
-
-handle_call(get_memory_data, _From,
- State = #state { mod = Mod, mod_state = ModState }) ->
- {reply, Mod:get_memory_data(ModState), State};
-
-handle_call(_Request, _From, State) ->
- {noreply, State}.
-
-handle_cast(update, State) ->
- {noreply, internal_update(State)};
-
-handle_cast(_Request, State) ->
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-internal_update(State = #state { memory_fraction = MemoryFraction,
- alarmed = Alarmed,
- mod = Mod, mod_state = ModState }) ->
- ModState1 = Mod:update(ModState),
- {MemTotal, MemUsed, _BigProc} = Mod:get_memory_data(ModState1),
- NewAlarmed = MemUsed / MemTotal > MemoryFraction,
- case {Alarmed, NewAlarmed} of
- {false, true} ->
- alarm_handler:set_alarm({system_memory_high_watermark, []});
- {true, false} ->
- alarm_handler:clear_alarm(system_memory_high_watermark);
- _ ->
- ok
- end,
- State #state { mod_state = ModState1, alarmed = NewAlarmed }.
diff --git a/src/rabbit_memsup_darwin.erl b/src/rabbit_memsup_darwin.erl
deleted file mode 100644
index 3de2d8430e..0000000000
--- a/src/rabbit_memsup_darwin.erl
+++ /dev/null
@@ -1,88 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_memsup_darwin).
-
--export([init/0, update/1, get_memory_data/1]).
-
--record(state, {total_memory,
- allocated_memory}).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()),
- allocated_memory :: ('undefined' | non_neg_integer())
- }).
-
--spec(init/0 :: () -> state()).
--spec(update/1 :: (state()) -> state()).
--spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(),
- ('undefined' | pid())}).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-init() ->
- #state{total_memory = undefined,
- allocated_memory = undefined}.
-
-update(State) ->
- File = os:cmd("/usr/bin/vm_stat"),
- Lines = string:tokens(File, "\n"),
- Dict = dict:from_list(lists:map(fun parse_line/1, Lines)),
- [PageSize, Inactive, Active, Free, Wired] =
- [dict:fetch(Key, Dict) ||
- Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free',
- 'Pages wired down']],
- MemTotal = PageSize * (Inactive + Active + Free + Wired),
- MemUsed = PageSize * (Active + Wired),
- State#state{total_memory = MemTotal, allocated_memory = MemUsed}.
-
-get_memory_data(State) ->
- {State#state.total_memory, State#state.allocated_memory, undefined}.
-
-%%----------------------------------------------------------------------------
-
-%% A line looks like "Foo bar: 123456."
-parse_line(Line) ->
- [Name, RHS | _Rest] = string:tokens(Line, ":"),
- case Name of
- "Mach Virtual Memory Statistics" ->
- ["(page", "size", "of", PageSize, "bytes)"] =
- string:tokens(RHS, " "),
- {page_size, list_to_integer(PageSize)};
- _ ->
- [Value | _Rest1] = string:tokens(RHS, " ."),
- {list_to_atom(Name), list_to_integer(Value)}
- end.
diff --git a/src/rabbit_memsup_linux.erl b/src/rabbit_memsup_linux.erl
deleted file mode 100644
index ca942d7caa..0000000000
--- a/src/rabbit_memsup_linux.erl
+++ /dev/null
@@ -1,101 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_memsup_linux).
-
--export([init/0, update/1, get_memory_data/1]).
-
--record(state, {total_memory,
- allocated_memory}).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()),
- allocated_memory :: ('undefined' | non_neg_integer())
- }).
-
--spec(init/0 :: () -> state()).
--spec(update/1 :: (state()) -> state()).
--spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(),
- ('undefined' | pid())}).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-init() ->
- #state{total_memory = undefined,
- allocated_memory = undefined}.
-
-update(State) ->
- File = read_proc_file("/proc/meminfo"),
- Lines = string:tokens(File, "\n"),
- Dict = dict:from_list(lists:map(fun parse_line/1, Lines)),
- [MemTotal, MemFree, Buffers, Cached] =
- [dict:fetch(Key, Dict) ||
- Key <- ['MemTotal', 'MemFree', 'Buffers', 'Cached']],
- MemUsed = MemTotal - MemFree - Buffers - Cached,
- State#state{total_memory = MemTotal, allocated_memory = MemUsed}.
-
-get_memory_data(State) ->
- {State#state.total_memory, State#state.allocated_memory, undefined}.
-
-%%----------------------------------------------------------------------------
-
--define(BUFFER_SIZE, 1024).
-
-%% file:read_file does not work on files in /proc as it seems to get
-%% the size of the file first and then read that many bytes. But files
-%% in /proc always have length 0, we just have to read until we get
-%% eof.
-read_proc_file(File) ->
- {ok, IoDevice} = file:open(File, [read, raw]),
- Res = read_proc_file(IoDevice, []),
- file:close(IoDevice),
- lists:flatten(lists:reverse(Res)).
-
-read_proc_file(IoDevice, Acc) ->
- case file:read(IoDevice, ?BUFFER_SIZE) of
- {ok, Res} -> read_proc_file(IoDevice, [Res | Acc]);
- eof -> Acc
- end.
-
-%% A line looks like "FooBar: 123456 kB"
-parse_line(Line) ->
- [Name, RHS | _Rest] = string:tokens(Line, ":"),
- [Value | UnitsRest] = string:tokens(RHS, " "),
- Value1 = case UnitsRest of
- [] -> list_to_integer(Value); %% no units
- ["kB"] -> list_to_integer(Value) * 1024
- end,
- {list_to_atom(Name), Value1}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index b20e9a86b6..21764fce6d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -47,7 +47,7 @@
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
--export([localnode/1, nodehost/1, cookie_hash/0, tcp_name/3]).
+-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
-export([table_foreach/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
@@ -105,8 +105,8 @@
-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
--spec(localnode/1 :: (atom()) -> erlang_node()).
--spec(nodehost/1 :: (erlang_node()) -> string()).
+-spec(makenode/1 :: ({string(), string()} | string()) -> erlang_node()).
+-spec(nodeparts/1 :: (erlang_node() | string()) -> {string(), string()}).
-spec(cookie_hash/0 :: () -> string()).
-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
-spec(intersperse/2 :: (A, [A]) -> [A]).
@@ -308,16 +308,22 @@ execute_mnesia_transaction(TxFun) ->
ensure_ok(ok, _) -> ok;
ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}).
-localnode(Name) ->
- list_to_atom(lists:append([atom_to_list(Name), "@", nodehost(node())])).
-
-nodehost(Node) ->
- %% This is horrible, but there doesn't seem to be a way to split a
- %% nodename into its constituent parts.
- tl(lists:dropwhile(fun (E) -> E =/= $@ end, atom_to_list(Node))).
+makenode({Prefix, Suffix}) ->
+ list_to_atom(lists:append([Prefix, "@", Suffix]));
+makenode(NodeStr) ->
+ makenode(nodeparts(NodeStr)).
+
+nodeparts(Node) when is_atom(Node) ->
+ nodeparts(atom_to_list(Node));
+nodeparts(NodeStr) ->
+ case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of
+ {Prefix, []} -> {_, Suffix} = nodeparts(node()),
+ {Prefix, Suffix};
+ {Prefix, Suffix} -> {Prefix, tl(Suffix)}
+ end.
cookie_hash() ->
- ssl_base64:encode(erlang:md5(atom_to_list(erlang:get_cookie()))).
+ base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
tcp_name(Prefix, IPAddress, Port)
when is_atom(Prefix) andalso is_number(Port) ->
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index b1cc4d028f..f364872eca 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -100,10 +100,12 @@ Available commands:
action(start_all, [NodeCount], RpcTimeout) ->
io:format("Starting all nodes...~n", []),
N = list_to_integer(NodeCount),
- {NodePids, Running} = start_nodes(N, N, [], true,
- getenv("RABBITMQ_NODENAME"),
- getenv("RABBITMQ_NODE_PORT"),
- RpcTimeout),
+ {NodePids, Running} =
+ start_nodes(N, N, [], true,
+ rabbit_misc:nodeparts(
+ getenv("RABBITMQ_NODENAME")),
+ list_to_integer(getenv("RABBITMQ_NODE_PORT")),
+ RpcTimeout),
write_pids_file(NodePids),
case Running of
true -> ok;
@@ -158,24 +160,24 @@ start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running};
start_nodes(N, Total, PNodePid, Running,
NodeNameBase, NodePortBase, RpcTimeout) ->
+ {NodePre, NodeSuff} = NodeNameBase,
NodeNumber = Total - N,
- NodeName = if NodeNumber == 0 ->
+ NodePre1 = if NodeNumber == 0 ->
%% For compatibility with running a single node
- NodeNameBase;
+ NodePre;
true ->
- NodeNameBase ++ "_" ++ integer_to_list(NodeNumber)
+ NodePre ++ "_" ++ integer_to_list(NodeNumber)
end,
- {NodePid, Started} = start_node(NodeName,
- list_to_integer(NodePortBase) + NodeNumber,
+ {NodePid, Started} = start_node(rabbit_misc:makenode({NodePre1, NodeSuff}),
+ NodePortBase + NodeNumber,
RpcTimeout),
start_nodes(N - 1, Total, [NodePid | PNodePid],
Started and Running,
NodeNameBase, NodePortBase, RpcTimeout).
-start_node(NodeName, NodePort, RpcTimeout) ->
- os:putenv("RABBITMQ_NODENAME", NodeName),
+start_node(Node, NodePort, RpcTimeout) ->
+ os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)),
os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)),
- Node = rabbit_misc:localnode(list_to_atom(NodeName)),
io:format("Starting node ~s...~n", [Node]),
case rpc:call(Node, os, getpid, []) of
{badrpc, _} ->
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index f28c4a6ec5..e22d844fdf 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -63,7 +63,7 @@ start() ->
%% applications along the way
AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of
{failed_to_load_app, App, Err} ->
- error("failed to load application ~s: ~p", [App, Err]);
+ error("failed to load application ~s:~n~p", [App, Err]);
AppList ->
AppList
end,
@@ -98,14 +98,14 @@ start() ->
end,
ok;
{error, Module, Error} ->
- error("generation of boot script file ~s failed: ~w",
+ error("generation of boot script file ~s failed:~n~s",
[ScriptFile, Module:format_error(Error)])
end,
case post_process_script(ScriptFile) of
ok -> ok;
{error, Reason} ->
- error("post processing of boot script file ~s failed: ~w",
+ error("post processing of boot script file ~s failed:~n~w",
[ScriptFile, Reason])
end,
case systools:script2boot(RootName) of
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 52139ba75c..c32805080e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -495,7 +495,7 @@ test_cluster_management() ->
ok = control_action(cluster, ["invalid1@invalid",
"invalid2@invalid"]),
- SecondaryNode = rabbit_misc:localnode(hare),
+ SecondaryNode = rabbit_misc:makenode("hare"),
case net_adm:ping(SecondaryNode) of
pong -> passed = test_cluster_management2(SecondaryNode);
pang -> io:format("Skipping clustering tests with node ~p~n",
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
new file mode 100644
index 0000000000..6da47933a4
--- /dev/null
+++ b/src/vm_memory_monitor.erl
@@ -0,0 +1,322 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+%% In practice Erlang shouldn't be allowed to grow to more than a half
+%% of available memory. The pessimistic scenario is when the Erlang VM
+%% has a single process that's consuming all memory. In such a case,
+%% during garbage collection, Erlang tries to allocate a huge chunk of
+%% continuous memory, which can result in a crash or heavy swapping.
+%%
+%% This module tries to warn Rabbit before such situations occur, so
+%% that it has a higher chance to avoid running out of memory.
+%%
+%% This code depends on Erlang os_mon application.
+
+-module(vm_memory_monitor).
+
+-behaviour(gen_server2).
+
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([update/0, get_total_memory/0,
+ get_check_interval/0, set_check_interval/1,
+ get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1]).
+
+
+-define(SERVER, ?MODULE).
+-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
+
+%% For an unknown OS, we assume that we have 1GB of memory. It'll be
+%% wrong. Scale by vm_memory_high_watermark in configuration to get a
+%% sensible value.
+-define(MEMORY_SIZE_FOR_UNKNOWN_OS, 1073741824).
+
+-record(state, {total_memory,
+ memory_limit,
+ timeout,
+ timer,
+ alarmed
+ }).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (float()) -> ('ignore' | {error, any()} | {'ok', pid()})).
+-spec(update/0 :: () -> 'ok').
+-spec(get_total_memory/0 :: () -> (non_neg_integer() | unknown)).
+-spec(get_check_interval/0 :: () -> non_neg_integer()).
+-spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok').
+-spec(get_vm_memory_high_watermark/0 :: () -> float()).
+-spec(set_vm_memory_high_watermark/1 :: (float()) -> 'ok').
+
+-endif.
+
+
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+start_link(Args) ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [Args], []).
+
+init([MemFraction]) ->
+ TotalMemory =
+ case get_total_memory() of
+ unknown ->
+ rabbit_log:warning(
+ "Unknown total memory size for your OS ~p. "
+ "Assuming memory size is ~pMB.~n",
+ [os:type(), trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/1048576)]),
+ ?MEMORY_SIZE_FOR_UNKNOWN_OS;
+ M -> M
+ end,
+ MemLimit = get_mem_limit(MemFraction, TotalMemory),
+ rabbit_log:info("Memory limit set to ~pMB.~n", [trunc(MemLimit/1048576)]),
+ TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
+ State = #state { total_memory = TotalMemory,
+ memory_limit = MemLimit,
+ timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
+ timer = TRef,
+ alarmed = false},
+ {ok, internal_update(State)}.
+
+handle_call(get_vm_memory_high_watermark, _From, State) ->
+ {reply, State#state.memory_limit / State#state.total_memory, State};
+
+handle_call({set_vm_memory_high_watermark, MemFraction}, _From, State) ->
+ MemLimit = get_mem_limit(MemFraction, State#state.total_memory),
+ rabbit_log:info("Memory alarm changed to ~p, ~p bytes.~n",
+ [MemFraction, MemLimit]),
+ {reply, ok, State#state{memory_limit = MemLimit}};
+
+handle_call(get_check_interval, _From, State) ->
+ {reply, State#state.timeout, State};
+
+handle_call({set_check_interval, Timeout}, _From, State) ->
+ {ok, cancel} = timer:cancel(State#state.timer),
+ {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(update, State) ->
+ {noreply, internal_update(State)};
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
+update() ->
+ gen_server2:cast(?SERVER, update).
+
+get_total_memory() ->
+ get_total_memory(os:type()).
+
+get_check_interval() ->
+ gen_server2:call(?MODULE, get_check_interval).
+
+set_check_interval(Fraction) ->
+ gen_server2:call(?MODULE, {set_check_interval, Fraction}).
+
+get_vm_memory_high_watermark() ->
+ gen_server2:call(?MODULE, get_vm_memory_high_watermark).
+
+set_vm_memory_high_watermark(Fraction) ->
+ gen_server2:call(?MODULE, {set_vm_memory_high_watermark, Fraction}).
+
+%%----------------------------------------------------------------------------
+%% Server Internals
+%%----------------------------------------------------------------------------
+
+internal_update(State = #state { memory_limit = MemLimit,
+ alarmed = Alarmed}) ->
+ MemUsed = erlang:memory(total),
+ NewAlarmed = MemUsed > MemLimit,
+ case {Alarmed, NewAlarmed} of
+ {false, true} ->
+ emit_update_info(set, MemUsed, MemLimit),
+ alarm_handler:set_alarm({vm_memory_high_watermark, []});
+ {true, false} ->
+ emit_update_info(clear, MemUsed, MemLimit),
+ alarm_handler:clear_alarm(vm_memory_high_watermark);
+ _ ->
+ ok
+ end,
+ State #state {alarmed = NewAlarmed}.
+
+emit_update_info(State, MemUsed, MemLimit) ->
+ rabbit_log:info("vm_memory_high_watermark ~p. Memory used:~p allowed:~p~n",
+ [State, MemUsed, MemLimit]).
+
+start_timer(Timeout) ->
+ {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
+ TRef.
+
+%% On a 32-bit machine, if you're using more than 2 gigs of RAM you're
+%% in big trouble anyway.
+get_vm_limit() ->
+ case erlang:system_info(wordsize) of
+ 4 -> 4294967296; %% 4 GB for 32 bits 2^32
+ 8 -> 281474976710656 %% 256 TB for 64 bits 2^48
+ %% http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
+ end.
+
+get_mem_limit(MemFraction, TotalMemory) ->
+ lists:min([trunc(TotalMemory * MemFraction), get_vm_limit()]).
+
+%%----------------------------------------------------------------------------
+%% Internal Helpers
+%%----------------------------------------------------------------------------
+cmd(Command) ->
+ Exec = hd(string:tokens(Command, " ")),
+ case os:find_executable(Exec) of
+ false -> throw({command_not_found, Exec});
+ _ -> os:cmd(Command)
+ end.
+
+%% get_total_memory(OS) -> Total
+%% Windows and Freebsd code based on: memsup:get_memory_usage/1
+%% Original code was part of OTP and released under "Erlang Public License".
+
+get_total_memory({unix,darwin}) ->
+ File = cmd("/usr/bin/vm_stat"),
+ Lines = string:tokens(File, "\n"),
+ Dict = dict:from_list(lists:map(fun parse_line_mach/1, Lines)),
+ [PageSize, Inactive, Active, Free, Wired] =
+ [dict:fetch(Key, Dict) ||
+ Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free',
+ 'Pages wired down']],
+ PageSize * (Inactive + Active + Free + Wired);
+
+get_total_memory({unix,freebsd}) ->
+ PageSize = freebsd_sysctl("vm.stats.vm.v_page_size"),
+ PageCount = freebsd_sysctl("vm.stats.vm.v_page_count"),
+ PageCount * PageSize;
+
+get_total_memory({win32,_OSname}) ->
+ [Result|_] = os_mon_sysinfo:get_mem_info(),
+ {ok, [_MemLoad, TotPhys, _AvailPhys,
+ _TotPage, _AvailPage, _TotV, _AvailV], _RestStr} =
+ io_lib:fread("~d~d~d~d~d~d~d", Result),
+ TotPhys;
+
+get_total_memory({unix, linux}) ->
+ File = read_proc_file("/proc/meminfo"),
+ Lines = string:tokens(File, "\n"),
+ Dict = dict:from_list(lists:map(fun parse_line_linux/1, Lines)),
+ dict:fetch('MemTotal', Dict);
+
+get_total_memory({unix, sunos}) ->
+ File = cmd("/usr/sbin/prtconf"),
+ Lines = string:tokens(File, "\n"),
+ Dict = dict:from_list(lists:map(fun parse_line_sunos/1, Lines)),
+ dict:fetch('Memory size', Dict);
+
+get_total_memory(_OsType) ->
+ unknown.
+
+%% A line looks like "Foo bar: 123456."
+parse_line_mach(Line) ->
+ [Name, RHS | _Rest] = string:tokens(Line, ":"),
+ case Name of
+ "Mach Virtual Memory Statistics" ->
+ ["(page", "size", "of", PageSize, "bytes)"] =
+ string:tokens(RHS, " "),
+ {page_size, list_to_integer(PageSize)};
+ _ ->
+ [Value | _Rest1] = string:tokens(RHS, " ."),
+ {list_to_atom(Name), list_to_integer(Value)}
+ end.
+
+%% A line looks like "FooBar: 123456 kB"
+parse_line_linux(Line) ->
+ [Name, RHS | _Rest] = string:tokens(Line, ":"),
+ [Value | UnitsRest] = string:tokens(RHS, " "),
+ Value1 = case UnitsRest of
+ [] -> list_to_integer(Value); %% no units
+ ["kB"] -> list_to_integer(Value) * 1024
+ end,
+ {list_to_atom(Name), Value1}.
+
+%% A line looks like "Memory size: 1024 Megabytes"
+parse_line_sunos(Line) ->
+ case string:tokens(Line, ":") of
+ [Name, RHS | _Rest] ->
+ [Value1 | UnitsRest] = string:tokens(RHS, " "),
+ Value2 = case UnitsRest of
+ ["Gigabytes"] ->
+ list_to_integer(Value1) * 1024 * 1024 * 1024;
+ ["Megabytes"] ->
+ list_to_integer(Value1) * 1024 * 1024;
+ ["Kilobytes"] ->
+ list_to_integer(Value1) * 1024;
+ _ ->
+ Value1 ++ UnitsRest %% no known units
+ end,
+ {list_to_atom(Name), Value2};
+ [Name] -> {list_to_atom(Name), none}
+ end.
+
+freebsd_sysctl(Def) ->
+ list_to_integer(cmd("/sbin/sysctl -n " ++ Def) -- "\n").
+
+%% file:read_file does not work on files in /proc as it seems to get
+%% the size of the file first and then read that many bytes. But files
+%% in /proc always have length 0, we just have to read until we get
+%% eof.
+read_proc_file(File) ->
+ {ok, IoDevice} = file:open(File, [read, raw]),
+ Res = read_proc_file(IoDevice, []),
+ file:close(IoDevice),
+ lists:flatten(lists:reverse(Res)).
+
+-define(BUFFER_SIZE, 1024).
+read_proc_file(IoDevice, Acc) ->
+ case file:read(IoDevice, ?BUFFER_SIZE) of
+ {ok, Res} -> read_proc_file(IoDevice, [Res | Acc]);
+ eof -> Acc
+ end.