diff options
27 files changed, 653 insertions, 497 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 180a0dc392..706a92af7a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -43,11 +43,14 @@ -record(exchange, {name, type, durable, auto_delete, arguments}). --record(amqqueue, {name, durable, auto_delete, arguments, binding_specs, pid}). --record(binding_spec, {exchange_name, routing_key, arguments}). +-record(amqqueue, {name, durable, auto_delete, arguments, pid}). --record(binding, {key, handlers}). --record(handler, {binding_spec, queue, qpid}). +%% mnesia doesn't like unary records, so we add a dummy 'value' field +-record(route, {binding, value = const}). +-record(reverse_route, {reverse_binding, value = const}). + +-record(binding, {exchange_name, key, queue_name, args = []}). +-record(reverse_binding, {queue_name, key, exchange_name, args = []}). -record(listener, {node, protocol, host, port}). @@ -77,16 +80,11 @@ -type(user() :: #user{username :: username(), password :: password()}). --type(binding_spec() :: - #binding_spec{exchange_name :: exchange_name(), - routing_key :: routing_key(), - arguments :: amqp_table()}). -type(amqqueue() :: #amqqueue{name :: queue_name(), durable :: bool(), auto_delete :: bool(), arguments :: amqp_table(), - binding_specs :: [binding_spec()], pid :: maybe(pid())}). -type(exchange() :: #exchange{name :: exchange_name(), @@ -94,6 +92,10 @@ durable :: bool(), auto_delete :: bool(), arguments :: amqp_table()}). +-type(binding() :: + #binding{exchange_name :: exchange_name(), + queue_name :: queue_name(), + key :: binding_key()}). %% TODO: make this more precise by tying specific class_ids to %% specific properties -type(undecoded_content() :: diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl index e9e650929b..130001535a 100644 --- a/include/rabbit_framing_spec.hrl +++ b/include/rabbit_framing_spec.hrl @@ -53,3 +53,4 @@ -type(vhost() :: binary()). -type(ctag() :: binary()). -type(exchange_type() :: 'direct' | 'topic' | 'fanout'). +-type(binding_key() :: binary()). diff --git a/packaging/Makefile b/packaging/Makefile deleted file mode 100644 index 44a9b328e8..0000000000 --- a/packaging/Makefile +++ /dev/null @@ -1,3 +0,0 @@ -check_tools: - @sh ./checks.sh - @echo All the needed tools seem to be installed, great! diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 6cc3579bab..33032f118f 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -4,32 +4,23 @@ VERSION=0.0.0 SOURCE_TARBALL_DIR=../../../dist TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz TOP_DIR=$(shell pwd) -RPM_VERSION=$(shell echo $(VERSION) | tr - _) -DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)' +DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'debian 1' rpms: clean server #Create proper environment for making rpms prepare: - mkdir -p $(TOP_DIR)/BUILD - mkdir -p $(TOP_DIR)/SOURCES - mkdir -p $(TOP_DIR)/SPECS - mkdir -p $(TOP_DIR)/SRPMS - mkdir -p $(TOP_DIR)/RPMS - mkdir -p $(TOP_DIR)/tmp - cp $(TOP_DIR)/$(TARBALL) $(TOP_DIR)/SOURCES - cp $(TOP_DIR)/rabbitmq-server.spec $(TOP_DIR)/SPECS - cp $(TOP_DIR)/init.d $(TOP_DIR)/BUILD - cp $(TOP_DIR)/rabbitmqctl_wrapper $(TOP_DIR)/BUILD - cp $(TOP_DIR)/rabbitmq-server.logrotate $(TOP_DIR)/BUILD + mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp + cp $(TOP_DIR)/$(TARBALL) SOURCES + cp rabbitmq-server.spec SPECS + sed -i 's/%%VERSION%%/$(VERSION)/' SPECS/rabbitmq-server.spec + + cp init.d SOURCES/rabbitmq-server.init + cp rabbitmqctl_wrapper SOURCES/rabbitmq-server.wrapper + cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare - rpmbuild -ba $(TOP_DIR)/SPECS/rabbitmq-server.spec $(DEFINES) --target noarch + rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target noarch clean: - rm -rf $(TOP_DIR)/SOURCES/ - rm -rf $(TOP_DIR)/SPECS/ - rm -rf $(TOP_DIR)/RPMS/ - rm -rf $(TOP_DIR)/SRPMS/ - rm -rf $(TOP_DIR)/BUILD/ - rm -rf $(TOP_DIR)/tmp/ + rm -rf SOURCES SPECS RPMS SRPMS BUILD tmp diff --git a/packaging/RPMS/Fedora/rabbitmq-server.logrotate b/packaging/RPMS/Fedora/rabbitmq-server.logrotate index 64cd01a185..ab87e4a5c6 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.logrotate +++ b/packaging/RPMS/Fedora/rabbitmq-server.logrotate @@ -9,4 +9,4 @@ postrotate /sbin/service rabbitmq-server rotate-logs endscript -}
\ No newline at end of file +} diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 08694c096c..214f6918fd 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -1,14 +1,21 @@ Name: rabbitmq-server -Version: %{rpm_version} +Version: %%VERSION%% Release: 1 License: MPLv1.1 Group: Development/Libraries -Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{main_version}/%{name}-%{main_version}.tar.gz +Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz +Source1: rabbitmq-server.init +Source2: rabbitmq-server.wrapper +Source3: rabbitmq-server.logrotate URL: http://www.rabbitmq.com/ Vendor: LShift Ltd., Cohesive Financial Technologies LLC., Rabbit Technlogies Ltd. +%if 0%{?debian} +%else +BuildRequires: python, python-json +%endif Requires: erlang, logrotate Packager: Hubert Plociniczak <hubert@lshift.net> -BuildRoot: %{_tmppath}/%{name}-%{main_version}-%{release}-root +BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root Summary: The RabbitMQ server Requires(post): chkconfig Requires(pre): chkconfig initscripts @@ -19,10 +26,8 @@ performance enterprise messaging. The RabbitMQ server is a robust and scalable implementation of an AMQP broker. -%define _mandir /usr/share/man -%define _sbindir /usr/sbin -%define _libdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().") -%define _maindir %{buildroot}%{_libdir}/rabbitmq_server-%{main_version} +%define _erllibdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().") +%define _maindir %{buildroot}%{_erllibdir}/rabbitmq_server-%{version} %pre @@ -33,7 +38,7 @@ if [ $1 -gt 1 ]; then fi %prep -%setup -n %{name}-%{main_version} +%setup -n %{name}-%{version} %build make @@ -44,24 +49,23 @@ rm -rf %{buildroot} make install TARGET_DIR=%{_maindir} \ SBIN_DIR=%{buildroot}%{_sbindir} \ MAN_DIR=%{buildroot}%{_mandir} - VERSION=%{main_version} + VERSION=%{version} mkdir -p %{buildroot}/var/lib/rabbitmq/mnesia mkdir -p %{buildroot}/var/log/rabbitmq mkdir -p %{buildroot}/etc/rc.d/init.d/ #Copy all necessary lib files etc. -cp ../init.d %{buildroot}/etc/rc.d/init.d/rabbitmq-server +install -m 0755 %SOURCE1 %{buildroot}/etc/rc.d/init.d/rabbitmq-server chmod 0755 %{buildroot}/etc/rc.d/init.d/rabbitmq-server mv %{buildroot}/usr/sbin/rabbitmqctl %{buildroot}/usr/sbin/rabbitmqctl_real -cp ../rabbitmqctl_wrapper %{buildroot}/usr/sbin/rabbitmqctl -chmod 0755 %{buildroot}/usr/sbin/rabbitmqctl +install -m 0755 %SOURCE2 %{buildroot}/usr/sbin/rabbitmqctl cp %{buildroot}%{_mandir}/man1/rabbitmqctl.1.gz %{buildroot}%{_mandir}/man1/rabbitmqctl_real.1.gz mkdir -p %{buildroot}/etc/logrotate.d -cp ../rabbitmq-server.logrotate %{buildroot}/etc/logrotate.d/rabbitmq-server +install %SOURCE3 %{buildroot}/etc/logrotate.d/rabbitmq-server %post # create rabbitmq group @@ -93,7 +97,7 @@ fi %files %defattr(-,root,root,-) -%{_libdir}/rabbitmq_server-%{main_version}/ +%{_erllibdir}/rabbitmq_server-%{version}/ %{_mandir}/man1/rabbitmq-multi.1.gz %{_mandir}/man1/rabbitmq-server.1.gz %{_mandir}/man1/rabbitmqctl.1.gz diff --git a/packaging/checks.sh b/packaging/checks.sh deleted file mode 100755 index 63e88701f3..0000000000 --- a/packaging/checks.sh +++ /dev/null @@ -1,45 +0,0 @@ -#! /bin/sh - -# We check for the presence of the tools necessary to build a release on a -# Debian based OS. - -TOOLS_STOP=0 - -checker () { - if [ ! `which $1` ] - then - echo "$1 is missing, please install it" - TOOLS_STOP=1 - NEW_NAME=`echo $1 | sed -e 's/-/_/g'` - eval "$NEW_NAME=1" - else - echo "$1 found" - fi -}; - -echo ~~~~~~~~~~~~ Looking for mandatory programs ~~~~~~~~~~~~ - -for i in cdbs-edit-patch reprepro rpm elinks wget zip gpg rsync -do - checker $i -done -echo ~~~~~~~~~~~~~~~~~~~~~~~~~~ DONE ~~~~~~~~~~~~~~~~~~~~~~~ - -if [ 1 = $TOOLS_STOP ] -then - [ $cdbs_edit_patch ] && cdbs_edit_patch="cdbs " - [ $reprepro ] && reprepro="reprepro " - [ $rpm ] && rpm="rpm " - [ $elinks ] && elinks="elinks " - [ $wget ] && wget="wget " - [ $zip ] && zip="zip " - [ $gpg ] && gpg="gpg " - [ $rsync ] && rsync="rsync " - - echo - echo We suggest you run the command - echo "apt-get install ${cdbs_edit_patch}${reprepro}${rpm}${elinks}${wget}${zip}${gpg}${rsync}" - echo -fi - -exit $TOOLS_STOP diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index dd74c31ea1..9479feb001 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -1,5 +1,6 @@ TARBALL_DIR=../../../dist TARBALL=$(shell (cd $(TARBALL_DIR); echo rabbitmq-server-[0-9]*.tar.gz)) +DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g') VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') UNPACKED_DIR=rabbitmq-server-$(VERSION) PACKAGENAME=rabbitmq-server @@ -15,8 +16,8 @@ all: @echo 'Please choose a target from the Makefile.' package: clean - make -C ../.. check_tools - tar -zxvf $(TARBALL_DIR)/$(TARBALL) + cp $(TARBALL_DIR)/$(TARBALL) $(DEBIAN_ORIG_TARBALL) + tar -zxvf $(DEBIAN_ORIG_TARBALL) cp -r debian $(UNPACKED_DIR) chmod a+x $(UNPACKED_DIR)/debian/rules UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR) diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index bc691bc7b6..749791a4bd 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,7 +2,7 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com> -Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, make, python +Build-Depends: cdbs, debhelper (>= 5), erlang-nox, erlang-dev, python-json Standards-Version: 3.7.2 Package: rabbitmq-server diff --git a/packaging/debs/Debian/debian/rabbitmq-server.logrotate b/packaging/debs/Debian/debian/rabbitmq-server.logrotate index 247635d19a..bfd6b8da0b 100644 --- a/packaging/debs/Debian/debian/rabbitmq-server.logrotate +++ b/packaging/debs/Debian/debian/rabbitmq-server.logrotate @@ -9,4 +9,4 @@ postrotate /etc/init.d/rabbitmq-server rotate-logs endscript -}
\ No newline at end of file +} diff --git a/packaging/windows/rabbitmq-service.pod b/packaging/windows/rabbitmq-service.pod index b87e4bafc3..d2ee48f9ac 100644 --- a/packaging/windows/rabbitmq-service.pod +++ b/packaging/windows/rabbitmq-service.pod @@ -102,6 +102,20 @@ If this file is present it is used by the server to auto-configure a RabbitMQ cluster. See the clustering guide at L<http://www.rabbitmq.com/clustering.html> for details. +=head2 CONSOLE_LOG + +Set this varable to B<new> or B<reuse> to have the console +output from the server redirected to a file named B<SERVICENAME>.debug +in the application data directory of the user that installed the service. +Under Vista this will be F<C:\Documents and Settings\User\AppData\username\SERVICENAME>. +Under previous versions of Windows this will be +F<C:\Documents and Settings\username\Application Data\SERVICENAME>. +If B<CONSOLE_LOG> is set to B<new> then a new file will be created +each time the service starts. If B<CONSOLE_LOG> is set to B<reuse> +then the file will be overwritten each time the service starts. +The default behaviour when B<CONSOLE_LOG> is not set or set to a +value other than B<new> or B<reuse> is to discard the server output. + =head1 EXAMPLES Start a previously-installed RabbitMQ AMQP service: diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index b930c8edd8..c953a75312 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -66,8 +66,10 @@ erl \ -sasl sasl_error_logger '{file,"'${SASL_LOGS}'"}' \ -os_mon start_cpu_sup true \ -os_mon start_disksup false \ - -os_mon start_memsup false \ + -os_mon start_memsup true \ -os_mon start_os_sup false \ + -os_mon memsup_system_only true \ + -os_mon system_memory_high_watermark 0.95 \ -mnesia dir "\"${MNESIA_DIR}\"" \ ${CLUSTER_CONFIG} \ ${RABBIT_ARGS} \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index f08027d237..38b8cc5307 100644..100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -107,8 +107,10 @@ set MNESIA_DIR=%MNESIA_BASE%/%NODENAME%-mnesia -sasl sasl_error_logger {file,\""%LOG_BASE%/%NODENAME%-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
--os_mon start_memsup false ^
+-os_mon start_memsup true ^
-os_mon start_os_sup false ^
+-os_mon memsup_system_only true ^
+-os_mon system_memory_high_watermark 0.95 ^
-mnesia dir \""%MNESIA_DIR%"\" ^
%CLUSTER_CONFIG% ^
%RABBIT_ARGS% ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 8af9ee38d6..5a176726a0 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -48,6 +48,13 @@ if "%ERLANG_SERVICE_MANAGER_PATH%"=="" ( set ERLANG_SERVICE_MANAGER_PATH=C:\Program Files\erl5.5.5\erts-5.5.5\bin
)
+set CONSOLE_FLAG=
+set CONSOLE_LOG_VALID=
+for %%i in (new reuse) do if "%%i" == "%CONSOLE_LOG%" set CONSOLE_LOG_VALID=TRUE
+if "%CONSOLE_LOG_VALID%" == "TRUE" (
+ set CONSOLE_FLAG=-debugtype %CONSOLE_LOG%
+)
+
rem *** End of configuration ***
if not exist "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe" (
@@ -59,7 +66,7 @@ if not exist "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe" ( echo %ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe not found!
echo Please set ERLANG_SERVICE_MANAGER_PATH to the folder containing "erlsrv.exe".
echo.
- exit /B
+ exit /B 1
)
rem erlang prefers forwardslash as separator in paths
@@ -71,7 +78,7 @@ set LOG_BASE=%RABBITMQ_BASE_UNIX%/log rem We save the previous logs in their respective backup
rem Log management (rotation, filtering based on size...) is left as an exercise for the user.
-set BACKUP_EXTENSION=.bak
+set BACKUP_EXTENSION=.1
set LOGS="%RABBITMQ_BASE%\log\%NODENAME%.log"
set SASL_LOGS="%RABBITMQ_BASE%\log\%NODENAME%-sasl.log"
@@ -99,12 +106,7 @@ set MNESIA_DIR=%MNESIA_BASE%/%NODENAME%-mnesia if "%1" == "install" goto INSTALL_SERVICE
-if "%1" == "start" goto MODIFY_SERVICE
-if "%1" == "stop" goto MODIFY_SERVICE
-if "%1" == "disable" goto MODIFY_SERVICE
-if "%1" == "enable" goto MODIFY_SERVICE
-if "%1" == "list" goto MODIFY_SERVICE
-if "%1" == "remove" goto MODIFY_SERVICE
+for %%i in (start stop disable enable list remove) do if "%%i" == "%1" goto MODIFY_SERVICE
echo.
echo *********************
@@ -123,9 +125,7 @@ echo %~n0 stop - Stop the %SERVICENAME% service echo %~n0 disable - Disable the %SERVICENAME% service
echo %~n0 enable - Enable the %SERVICENAME% service
echo.
-if "%1" == "" pause
exit /B
-goto END
:INSTALL_SERVICE
@@ -141,8 +141,8 @@ if errorlevel 1 ( echo %SERVICENAME% service is already present - only updating service parameters
)
-set RABBIT_EBIN=%~dp0%
-set RABBIT_EBIN=%RABBIT_LIB:\=/%../ebin
+set RABBIT_EBIN_TMP=%~dp0%
+set RABBIT_EBIN=%RABBIT_EBIN_TMP:\=/%../ebin
set ERLANG_SERVICE_ARGUMENTS= ^
-pa "%RABBIT_EBIN%" ^
@@ -150,7 +150,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -s rabbit ^
+W w ^
+A30 ^
--kernel inet_default_listen_options "[{sndbuf,16384},{recbuf,4096}]" ^
+-kernel inet_default_listen_options "[{nodelay,true},{sndbuf,16384},{recbuf,4096}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
-rabbit tcp_listeners "[{\"%NODE_IP_ADDRESS%\",%NODE_PORT%}]" ^
-kernel error_logger {file,\""%LOG_BASE%/%NODENAME%.log"\"} ^
@@ -158,8 +158,10 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -sasl sasl_error_logger {file,\""%LOG_BASE%/%NODENAME%-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
--os_mon start_memsup false ^
+-os_mon start_memsup true ^
-os_mon start_os_sup false ^
+-os_mon memsup_system_only true ^
+-os_mon system_memory_high_watermark 0.95 ^
-mnesia dir \""%MNESIA_DIR%"\" ^
%CLUSTER_CONFIG% ^
%RABBIT_ARGS% ^
@@ -174,7 +176,7 @@ set ERLANG_SERVICE_ARGUMENTS=%ERLANG_SERVICE_ARGUMENTS:"=\"% -workdir "%RABBITMQ_BASE%" ^
-stopaction "rabbit:stop_and_halt()." ^
-sname %NODENAME% ^
--debugtype reuse ^
+%CONSOLE_FLAG% ^
-args "%ERLANG_SERVICE_ARGUMENTS%" > NUL
goto END
diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl index d250570198..fcb7b412e2 100644 --- a/src/buffering_proxy.erl +++ b/src/buffering_proxy.erl @@ -32,6 +32,8 @@ -export([mainloop/4, drain/2]). -export([proxy_loop/3]). +-define(HIBERNATE_AFTER, 5000). + %%---------------------------------------------------------------------------- start_link(M, A) -> @@ -40,7 +42,8 @@ start_link(M, A) -> ProxyPid = self(), Ref = make_ref(), Pid = spawn_link( - fun () -> mainloop(ProxyPid, Ref, M, + fun () -> ProxyPid ! Ref, + mainloop(ProxyPid, Ref, M, M:init(ProxyPid, A)) end), proxy_loop(Ref, Pid, empty) end). @@ -48,14 +51,19 @@ start_link(M, A) -> %%---------------------------------------------------------------------------- mainloop(ProxyPid, Ref, M, State) -> - ProxyPid ! Ref, NewState = receive {Ref, Messages} -> - lists:foldl(fun (Msg, S) -> - drain(M, M:handle_message(Msg, S)) - end, State, lists:reverse(Messages)); + NewSt = + lists:foldl(fun (Msg, S) -> + drain(M, M:handle_message(Msg, S)) + end, State, lists:reverse(Messages)), + ProxyPid ! Ref, + NewSt; Msg -> M:handle_message(Msg, State) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, mainloop, + [ProxyPid, Ref, M, State]) end, ?MODULE:mainloop(ProxyPid, Ref, M, NewState). @@ -89,4 +97,6 @@ proxy_loop(Ref, Pid, State) -> waiting -> Pid ! {Ref, [Msg]}, empty; Messages -> [Msg | Messages] end) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State]) end. diff --git a/src/rabbit.erl b/src/rabbit.erl index c6ef1749f2..a33c5b7bcb 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -157,6 +157,8 @@ start(normal, []) -> ok = rabbit_amqqueue:start(), + ok = rabbit_alarm:start(), + ok = rabbit_binary_generator: check_empty_content_body_frame_size(), @@ -198,6 +200,7 @@ start(normal, []) -> stop(_State) -> terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), + ok = rabbit_alarm:stop(), ok. %--------------------------------------------------------------------------- diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl new file mode 100644 index 0000000000..d9c1c45042 --- /dev/null +++ b/src/rabbit_alarm.erl @@ -0,0 +1,126 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_alarm). + +-behaviour(gen_event). + +-export([start/0, stop/0, register/2]). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +-define(MEMSUP_CHECK_INTERVAL, 1000). + +-record(alarms, {alertees, system_memory_high_watermark = false}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(mfa_tuple() :: {atom(), atom(), list()}). +-spec(start/0 :: () -> 'ok'). +-spec(stop/0 :: () -> 'ok'). +-spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start() -> + %% The default memsup check interval is 1 minute, which is way too + %% long - rabbit can gobble up all memory in a matter of + %% seconds. Unfortunately the memory_check_interval configuration + %% parameter and memsup:set_check_interval/1 function only provide + %% a granularity of minutes. So we have to peel off one layer of + %% the API to get to the underlying layer which operates at the + %% granularity of milliseconds. + %% + %% Note that the new setting will only take effect after the first + %% check has completed, i.e. after one minute. So if rabbit eats + %% all the memory within the first minute after startup then we + %% are out of luck. + ok = os_mon:call(memsup, {set_check_interval, ?MEMSUP_CHECK_INTERVAL}, + infinity), + + ok = alarm_handler:add_alarm_handler(?MODULE). + +stop() -> + ok = alarm_handler:delete_alarm_handler(?MODULE). + +register(Pid, HighMemMFA) -> + ok = gen_event:call(alarm_handler, ?MODULE, + {register, Pid, HighMemMFA}). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #alarms{alertees = dict:new()}}. + +handle_call({register, Pid, HighMemMFA}, + State = #alarms{alertees = Alertess}) -> + _MRef = erlang:monitor(process, Pid), + case State#alarms.system_memory_high_watermark of + true -> {M, F, A} = HighMemMFA, + ok = erlang:apply(M, F, A ++ [Pid, true]); + false -> ok + end, + NewAlertees = dict:store(Pid, HighMemMFA, Alertess), + {ok, ok, State#alarms{alertees = NewAlertees}}; + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> + ok = alert(true, State#alarms.alertees), + {ok, State#alarms{system_memory_high_watermark = true}}; + +handle_event({clear_alarm, system_memory_high_watermark}, State) -> + ok = alert(false, State#alarms.alertees), + {ok, State#alarms{system_memory_high_watermark = false}}; + +handle_event(_Event, State) -> + {ok, State}. + +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #alarms{alertees = Alertess}) -> + {ok, State#alarms{alertees = dict:erase(Pid, Alertess)}}; + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Arg, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +alert(Alert, Alertees) -> + dict:fold(fun (Pid, {M, F, A}, Acc) -> + ok = erlang:apply(M, F, A ++ [Pid, Alert]), + Acc + end, ok, Alertees). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index bd64f1e48d..56d2c35d94 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,6 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). --export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2]). @@ -53,21 +52,12 @@ -type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}). -type(qlen() :: {'ok', non_neg_integer()}). -type(qfun(A) :: fun ((amqqueue()) -> A)). --type(bind_res() :: {'ok', non_neg_integer()} | - {'error', 'queue_not_found' | 'exchange_not_found'}). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). - -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> amqqueue()). --spec(add_binding/4 :: - (queue_name(), exchange_name(), routing_key(), amqp_table()) -> - bind_res() | {'error', 'durability_settings_incompatible'}). --spec(delete_binding/4 :: - (queue_name(), exchange_name(), routing_key(), amqp_table()) -> - bind_res() | {'error', 'binding_not_found'}). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). @@ -89,7 +79,6 @@ -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). --spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok'). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). @@ -131,7 +120,7 @@ recover_durable_queues() -> Queues = lists:map(fun start_queue_process/1, R), rabbit_misc:execute_mnesia_transaction( fun () -> - lists:foreach(fun recover_queue/1, Queues), + lists:foreach(fun store_queue/1, Queues), ok end). @@ -140,12 +129,12 @@ declare(QueueName, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args, - binding_specs = [], pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({amqqueue, QueueName}) of - [] -> ok = recover_queue(Q), + [] -> ok = store_queue(Q), + ok = add_default_binding(Q), Q; [ExistingQ] -> ExistingQ end @@ -167,83 +156,12 @@ start_queue_process(Q) -> {ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]), Q#amqqueue{pid = Pid}. -recover_queue(Q) -> - ok = store_queue(Q), - ok = recover_bindings(Q), - ok. - -default_binding_spec(#resource{virtual_host = VHost, name = Name}) -> - #binding_spec{exchange_name = rabbit_misc:r(VHost, exchange, <<>>), - routing_key = Name, - arguments = []}. - -recover_bindings(Q = #amqqueue{name = QueueName, binding_specs = Specs}) -> - ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q), - lists:foreach(fun (B) -> - ok = rabbit_exchange:add_binding(B, Q) - end, Specs), +add_default_binding(#amqqueue{name = QueueName}) -> + Exchange = rabbit_misc:r(QueueName, exchange, <<>>), + RoutingKey = QueueName#resource.name, + rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []), ok. -modify_bindings(QueueName, ExchangeName, RoutingKey, Arguments, - SpecPresentFun, SpecAbsentFun) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({amqqueue, QueueName}) of - [Q = #amqqueue{binding_specs = Specs0}] -> - Spec = #binding_spec{exchange_name = ExchangeName, - routing_key = RoutingKey, - arguments = Arguments}, - case (case lists:member(Spec, Specs0) of - true -> SpecPresentFun; - false -> SpecAbsentFun - end)(Q, Spec) of - {ok, #amqqueue{binding_specs = Specs}} -> - {ok, length(Specs)}; - {error, not_found} -> - {error, exchange_not_found}; - Other -> Other - end; - [] -> {error, queue_not_found} - end - end). - -update_bindings(Q = #amqqueue{binding_specs = Specs0}, Spec, - UpdateSpecFun, UpdateExchangeFun) -> - Q1 = Q#amqqueue{binding_specs = UpdateSpecFun(Spec, Specs0)}, - case UpdateExchangeFun(Spec, Q1) of - ok -> store_queue(Q1), - {ok, Q1}; - Other -> Other - end. - -add_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> - modify_bindings( - QueueName, ExchangeName, RoutingKey, Arguments, - fun (Q, _Spec) -> {ok, Q} end, - fun (Q, Spec) -> update_bindings( - Q, Spec, - fun (S, Specs) -> [S | Specs] end, - fun rabbit_exchange:add_binding/2) - end). - -delete_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> - modify_bindings( - QueueName, ExchangeName, RoutingKey, Arguments, - fun (Q, Spec) -> update_bindings( - Q, Spec, - fun lists:delete/2, - fun rabbit_exchange:delete_binding/2) - end, - fun (Q, Spec) -> - %% the following is essentially a no-op, though crucially - %% it produces {error, not_found} when the exchange does - %% not exist. - case rabbit_exchange:delete_binding(Spec, Q) of - ok -> {error, binding_not_found}; - Other -> Other - end - end). - lookup(Name) -> rabbit_misc:dirty_read({amqqueue, Name}). @@ -295,38 +213,25 @@ ack(QPid, Txn, MsgIds, ChPid) -> commit_all(QPids, Txn) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( + fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, QPids). rollback_all(QPids, Txn) -> safe_pmap_ok( + fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, QPids). notify_down_all(QPids, ChPid) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( - fun (QPid) -> - rabbit_misc:with_exit_handler( - %% we don't care if the queue process has terminated - %% in the meantime - fun () -> ok end, - fun () -> gen_server:call(QPid, {notify_down, ChPid}, - Timeout) end) - end, + %% we don't care if the queue process has terminated in the + %% meantime + fun (_) -> ok end, + fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). -binding_forcibly_removed(BindingSpec, QueueName) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({amqqueue, QueueName}) of - [] -> ok; - [Q = #amqqueue{binding_specs = Specs}] -> - store_queue(Q#amqqueue{binding_specs = - lists:delete(BindingSpec, Specs)}) - end - end). - claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> gen_server:call(QPid, {claim_queue, ReaderPid}). @@ -344,12 +249,6 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> notify_sent(QPid, ChPid) -> gen_server:cast(QPid, {notify_sent, ChPid}). -delete_bindings(Q = #amqqueue{binding_specs = Specs}) -> - lists:foreach(fun (BindingSpec) -> - ok = rabbit_exchange:delete_binding( - BindingSpec, Q) - end, Specs). - internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> @@ -362,10 +261,8 @@ internal_delete(QueueName) -> end end). -delete_queue(Q = #amqqueue{name = QueueName}) -> - ok = delete_bindings(Q), - ok = rabbit_exchange:delete_binding( - default_binding_spec(QueueName), Q), +delete_queue(#amqqueue{name = QueueName}) -> + ok = rabbit_exchange:delete_bindings_for_queue(QueueName), ok = mnesia:delete({amqqueue, QueueName}), ok. @@ -385,13 +282,15 @@ pseudo_queue(QueueName, Pid) -> durable = false, auto_delete = false, arguments = [], - binding_specs = [], pid = Pid}. -safe_pmap_ok(F, L) -> +safe_pmap_ok(H, F, L) -> case [R || R <- rabbit_misc:upmap( fun (V) -> - try F(V) + try + rabbit_misc:with_exit_handler( + fun () -> H(V) end, + fun () -> F(V) end) catch Class:Reason -> {Class, Reason} end end, L), @@ -399,4 +298,3 @@ safe_pmap_ok(F, L) -> [] -> ok; Errors -> {error, Errors} end. - diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7716ef1646..e687df846a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -30,6 +30,7 @@ -behaviour(gen_server). -define(UNSENT_MESSAGE_LIMIT, 100). +-define(HIBERNATE_AFTER, 1000). -export([start_link/1]). @@ -75,7 +76,7 @@ init(Q) -> has_had_consumers = false, next_msg_id = 1, message_buffer = queue:new(), - round_robin = queue:new()}}. + round_robin = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -90,6 +91,10 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}. + +noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. + lookup_ch(ChPid) -> case get({ch, ChPid}) of undefined -> not_found; @@ -254,7 +259,7 @@ check_auto_delete(State = #q{round_robin = RoundRobin}) -> handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, round_robin = ActiveConsumers}) -> case lookup_ch(DownPid) of - not_found -> {noreply, State}; + not_found -> noreply(State); #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} -> NewActive = block_consumers(ChPid, ActiveConsumers), erlang:demonitor(MonitorRef), @@ -270,7 +275,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, end, round_robin = NewActive})) of {continue, NewState} -> - {noreply, NewState}; + noreply(NewState); {stop, NewState} -> {stop, normal, NewState} end @@ -470,12 +475,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% queues discarding the message? %% {Delivered, NewState} = attempt_delivery(Txn, Message, State), - {reply, Delivered, NewState}; + reply(Delivered, NewState); handle_call({deliver, Txn, Message}, _From, State) -> %% Synchronous, "mandatory" delivery mode {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), - {reply, Delivered, NewState}; + reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> ok = commit_work(Txn, qname(State)), @@ -483,7 +488,7 @@ handle_call({commit, Txn}, From, State) -> gen_server:reply(From, ok), NewState = process_pending(Txn, State), erase_tx(Txn), - {noreply, NewState}; + noreply(NewState); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue @@ -507,10 +512,11 @@ handle_call({basic_get, ChPid, NoAck}, _From, persist_auto_ack(QName, Message) end, Msg = {QName, self(), NextId, Delivered, Message}, - {reply, {ok, queue:len(BufferTail), Msg}, - State#q{message_buffer = BufferTail, next_msg_id = NextId + 1}}; + reply({ok, queue:len(BufferTail), Msg}, + State#q{message_buffer = BufferTail, + next_msg_id = NextId + 1}); {empty, _} -> - {reply, empty, State} + reply(empty, State) end; handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, @@ -520,11 +526,11 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, round_robin = RoundRobin}) -> case check_queue_owner(Owner, ReaderPid) of mismatch -> - {reply, {error, queue_owned_by_another_connection}, State}; + reply({error, queue_owned_by_another_connection}, State); ok -> case check_exclusive_access(ExistingHolder, ExclusiveConsume) of in_use -> - {reply, {error, exclusive_consume_unavailable}, State}; + reply({error, exclusive_consume_unavailable}, State); ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, @@ -538,7 +544,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, end, round_robin = queue:in({ChPid, Consumer}, RoundRobin)}, ok = maybe_send_reply(ChPid, OkMsg), - {reply, ok, run_poke_burst(State1)} + reply(ok, run_poke_burst(State1)) end end; @@ -548,7 +554,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, case lookup_ch(ChPid) of not_found -> ok = maybe_send_reply(ChPid, OkMsg), - {reply, ok, State}; + reply(ok, State); C = #cr{consumers = Consumers} -> NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, @@ -564,7 +570,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, ConsumerTag, RoundRobin)}) of {continue, State1} -> - {reply, ok, State1}; + reply(ok, State1); {stop, State1} -> {stop, normal, ok, State1} end @@ -573,7 +579,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, message_buffer = MessageBuffer, round_robin = RoundRobin}) -> - {reply, {ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State}; + reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{message_buffer = MessageBuffer}) -> @@ -581,16 +587,17 @@ handle_call({delete, IfUnused, IfEmpty}, _From, IsUnused = is_unused(), if IfEmpty and not(IsEmpty) -> - {reply, {error, not_empty}, State}; + reply({error, not_empty}, State); IfUnused and not(IsUnused) -> - {reply, {error, in_use}, State}; + reply({error, in_use}, State); true -> {stop, normal, {ok, queue:len(MessageBuffer)}, State} end; handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> ok = purge_message_buffer(qname(State), MessageBuffer), - {reply, {ok, queue:len(MessageBuffer)}, State#q{message_buffer = queue:new()}}; + reply({ok, queue:len(MessageBuffer)}, + State#q{message_buffer = queue:new()}); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -604,25 +611,25 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, %% to check, we'd need to hold not just the ch %% pid for each consumer, but also its reader %% pid... - {reply, locked, State}; + reply(locked, State); ok -> - {reply, ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}} + reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}) end; {ReaderPid, _MonitorRef} -> - {reply, ok, State}; + reply(ok, State); _ -> - {reply, locked, State} + reply(locked, State) end. handle_cast({deliver, Txn, Message}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), - {noreply, NewState}; + noreply(NewState); handle_cast({ack, Txn, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> - {noreply, State}; + noreply(State); C = #cr{unacked_messages = UAM} -> {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), @@ -632,37 +639,37 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> _ -> record_pending_acks(Txn, ChPid, MsgIds) end, - {noreply, State} + noreply(State) end; handle_cast({rollback, Txn}, State) -> ok = rollback_work(Txn, qname(State)), erase_tx(Txn), - {noreply, State}; + noreply(State); handle_cast({redeliver, Messages}, State) -> - {noreply, deliver_or_enqueue_n(Messages, State)}; + noreply(deliver_or_enqueue_n(Messages, State)); handle_cast({requeue, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", [ChPid]), - {noreply, State}; + noreply(State); C = #cr{unacked_messages = UAM} -> {Messages, NewUAM} = collect_messages(MsgIds, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), - {noreply, deliver_or_enqueue_n( - [{Message, true} || Message <- Messages], State)} + noreply(deliver_or_enqueue_n( + [{Message, true} || Message <- Messages], State)) end; handle_cast({notify_sent, ChPid}, State) -> case lookup_ch(ChPid) of - not_found -> {noreply, State}; + not_found -> noreply(State); T = #cr{unsent_message_count =Count} -> - {noreply, possibly_unblock( - T#cr{unsent_message_count = Count - 1}, - State)} + noreply(possibly_unblock( + T#cr{unsent_message_count = Count - 1}, + State)) end. handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, @@ -681,6 +688,9 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); +handle_info(timeout, State) -> + {noreply, State, hibernate}; + handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a9278898ea..1eb421cad4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -28,7 +28,7 @@ -include("rabbit.hrl"). -export([start_link/4, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4]). +-export([send_command/2, deliver/4, conserve_memory/2]). %% callbacks -export([init/2, handle_message/2]). @@ -49,6 +49,7 @@ -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok'). +-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). -endif. @@ -77,11 +78,18 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> Pid ! {deliver, ConsumerTag, AckRequired, Msg}, ok. +conserve_memory(Pid, Conserve) -> + Pid ! {conserve_memory, Conserve}, + ok. + %%--------------------------------------------------------------------------- init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), + %% this is bypassing the proxy so alarms can "jump the queue" and + %% be handled promptly + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), #ch{state = starting, proxy_pid = ProxyPid, reader_pid = ReaderPid, @@ -129,6 +137,11 @@ handle_message({deliver, ConsumerTag, AckRequired, Msg}, true, ConsumerTag, DeliveryTag, Msg), State1#ch{next_tag = DeliveryTag + 1}; +handle_message({conserve_memory, Conserve}, State) -> + ok = rabbit_writer:send_command( + State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), + State; + handle_message({'EXIT', _Pid, Reason}, State) -> terminate(Reason, State); @@ -572,29 +585,18 @@ handle_method(#'queue.bind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, nowait = NoWait, - arguments = Arguments}, - _, State = #ch{ virtual_host = VHostPath }) -> - %% FIXME: connection exception (!) on failure?? (see rule named "failure" in spec-XML) - %% FIXME: don't allow binding to internal exchanges - including the one named "" ! - QueueName = expand_queue_name_shortcut(QueueNameBin, State), - ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, - State), - ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - case rabbit_amqqueue:add_binding(QueueName, ExchangeName, - ActualRoutingKey, Arguments) of - {error, queue_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(QueueName)]); - {error, exchange_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); - {error, durability_settings_incompatible} -> - rabbit_misc:protocol_error( - not_allowed, "durability settings of ~s incompatible with ~s", - [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); - {ok, _BindingCount} -> - return_ok(State, NoWait, #'queue.bind_ok'{}) - end; + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin, + QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, + NoWait, State); + +handle_method(#'queue.unbind'{queue = QueueNameBin, + exchange = ExchangeNameBin, + routing_key = RoutingKey, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin, + QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, + false, State); handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, @@ -630,12 +632,47 @@ handle_method(#'channel.flow'{active = _}, _, State) -> %% FIXME: implement {reply, #'channel.flow_ok'{active = true}, State}; +handle_method(#'channel.flow_ok'{active = _}, _, State) -> + %% TODO: We may want to correlate this to channel.flow messages we + %% have sent, and complain if we get an unsolicited + %% channel.flow_ok, or the client refuses our flow request. + {noreply, State}; + handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). %%---------------------------------------------------------------------------- +binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> + %% FIXME: connection exception (!) on failure?? + %% (see rule named "failure" in spec-XML) + %% FIXME: don't allow binding to internal exchanges - + %% including the one named "" ! + QueueName = expand_queue_name_shortcut(QueueNameBin, State), + ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, + State), + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of + {error, queue_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(QueueName)]); + {error, exchange_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); + {error, binding_not_found} -> + rabbit_misc:protocol_error( + not_found, "no binding ~s between ~s and ~s", + [RoutingKey, rabbit_misc:rs(ExchangeName), + rabbit_misc:rs(QueueName)]); + {error, durability_settings_incompatible} -> + rabbit_misc:protocol_error( + not_allowed, "durability settings of ~s incompatible with ~s", + [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); + ok -> return_ok(State, NoWait, ReturnMethod) + end. + publish(Mandatory, Immediate, Message, QPids, State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) -> Handled = deliver(QPids, Mandatory, Immediate, TxnKey, @@ -717,7 +754,8 @@ internal_commit(State = #ch{transaction_id = TxnKey, case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of ok -> new_tx(State); - {error, Errors} -> exit({commit_failed, Errors}) + {error, Errors} -> rabbit_misc:protocol_error( + internal_error, "commit failed: ~w", [Errors]) end. internal_rollback(State = #ch{transaction_id = TxnKey, @@ -732,7 +770,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey, TxnKey) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); - {error, Errors} -> exit({rollback_failed, Errors}) + {error, Errors} -> rabbit_misc:protocol_error( + internal_error, "rollback failed: ~w", [Errors]) end. fold_per_queue(F, Acc0, UAQ) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index bb132a5048..a8c54438a2 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -29,13 +29,18 @@ -include("rabbit_framing.hrl"). -export([recover/0, declare/5, lookup/1, lookup_or_die/1, - list_vhost_exchanges/1, list_exchange_bindings/1, + list_vhost_exchanges/1, simple_publish/6, simple_publish/3, route/2]). --export([add_binding/2, delete_binding/2]). +-export([add_binding/4, delete_binding/4]). -export([delete/2]). +-export([delete_bindings_for_queue/1]). -export([check_type/1, assert_type/2, topic_matches/2]). +%% EXTENDED API +-export([list_exchange_bindings/1]). +-export([list_queue_bindings/1]). + -import(mnesia). -import(sets). -import(lists). @@ -48,7 +53,8 @@ -type(publish_res() :: {'ok', [pid()]} | not_found() | {'error', 'unroutable' | 'not_delivered'}). - +-type(bind_res() :: 'ok' | + {'error', 'queue_not_found' | 'exchange_not_found'}). -spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(), amqp_table()) -> exchange()). @@ -57,37 +63,46 @@ -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). -spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). -spec(list_vhost_exchanges/1 :: (vhost()) -> [exchange()]). --spec(list_exchange_bindings/1 :: (exchange_name()) -> - [{queue_name(), routing_key(), amqp_table()}]). -spec(simple_publish/6 :: (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). -spec(route/2 :: (exchange(), routing_key()) -> [pid()]). --spec(add_binding/2 :: (binding_spec(), amqqueue()) -> - 'ok' | not_found() | - {'error', 'durability_settings_incompatible'}). --spec(delete_binding/2 :: (binding_spec(), amqqueue()) -> - 'ok' | not_found()). +-spec(add_binding/4 :: + (exchange_name(), queue_name(), routing_key(), amqp_table()) -> + bind_res() | {'error', 'durability_settings_incompatible'}). +-spec(delete_binding/4 :: + (exchange_name(), queue_name(), routing_key(), amqp_table()) -> + bind_res() | {'error', 'binding_not_found'}). +-spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). +-spec(list_queue_bindings/1 :: (queue_name()) -> + [{exchange_name(), routing_key(), amqp_table()}]). +-spec(list_exchange_bindings/1 :: (exchange_name()) -> + [{queue_name(), routing_key(), amqp_table()}]). -endif. %%---------------------------------------------------------------------------- recover() -> - ok = recover_durable_exchanges(), - ok. - -recover_durable_exchanges() -> rabbit_misc:execute_mnesia_transaction( fun () -> - mnesia:foldl(fun (Exchange, Acc) -> - ok = mnesia:write(Exchange), - Acc - end, ok, durable_exchanges) + mnesia:foldl( + fun (Exchange, Acc) -> + ok = mnesia:write(Exchange), + Acc + end, ok, durable_exchanges), + mnesia:foldl( + fun (Route, Acc) -> + {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(Route), + ok = mnesia:write(ReverseRoute), + Acc + end, ok, durable_routes), + ok end). declare(ExchangeName, Type, Durable, AutoDelete, Args) -> @@ -143,22 +158,9 @@ list_vhost_exchanges(VHostPath) -> mnesia:dirty_match_object( #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). -list_exchange_bindings(Name) -> - [{QueueName, RoutingKey, Arguments} || - #binding{handlers = Handlers} <- bindings_for_exchange(Name), - #handler{binding_spec = #binding_spec{routing_key = RoutingKey, - arguments = Arguments}, - queue = QueueName} <- Handlers]. - -bindings_for_exchange(Name) -> - qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding), - element(1, K) == Name])). - -empty_handlers() -> - []. - %% Usable by Erlang code that wants to publish messages. -simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> +simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, + ContentTypeBin, BodyBin) -> {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), Content = #content{class_id = ClassId, properties = #'P_basic'{content_type = ContentTypeBin}, @@ -188,121 +190,173 @@ simple_publish(Mandatory, Immediate, %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. +%% +%% TODO: Maybe this should be handled by a cursor instead. route(#exchange{name = Name, type = topic}, RoutingKey) -> - sets:to_list( - sets:union( - mnesia:activity( - async_dirty, - fun () -> - qlc:e(qlc:q([handler_qpids(H) || - #binding{key = {Name1, PatternKey}, - handlers = H} - <- mnesia:table(binding), - Name == Name1, - topic_matches(PatternKey, RoutingKey)])) - end))); - -route(#exchange{name = Name, type = Type}, RoutingKey) -> - BindingKey = delivery_key_for_type(Type, Name, RoutingKey), - case rabbit_misc:dirty_read({binding, BindingKey}) of - {ok, #binding{handlers = H}} -> sets:to_list(handler_qpids(H)); - {error, not_found} -> [] - end. + Query = qlc:q([QName || + #route{binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName, + key = BindingKey}} <- mnesia:table(route), + ExchangeName == Name, + %% TODO: This causes a full scan for each entry + %% with the same exchange (see bug 19336) + topic_matches(BindingKey, RoutingKey)]), + lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])); + +route(X = #exchange{type = fanout}, _) -> + route_internal(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey) -> + route_internal(X, RoutingKey). + +route_internal(#exchange{name = Name}, RoutingKey) -> + MatchHead = #route{binding = #binding{exchange_name = Name, + queue_name = '$1', + key = RoutingKey, + _ = '_'}}, + lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])). + +lookup_qpids(Queues) -> + sets:fold( + fun(Key, Acc) -> + [#amqqueue{pid = QPid}] = mnesia:dirty_read({amqqueue, Key}), + [QPid | Acc] + end, [], sets:from_list(Queues)). + +%% TODO: Should all of the route and binding management not be +%% refactored to its own module, especially seeing as unbind will have +%% to be implemented for 0.91 ? + +delete_bindings_for_exchange(ExchangeName) -> + indexed_delete( + #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, + fun delete_forward_routes/1, fun mnesia:delete_object/1). + +delete_bindings_for_queue(QueueName) -> + Exchanges = exchanges_for_queue(QueueName), + indexed_delete( + reverse_route(#route{binding = #binding{queue_name = QueueName, + _ = '_'}}), + fun mnesia:delete_object/1, fun delete_forward_routes/1), + [begin + [X] = mnesia:read({exchange, ExchangeName}), + ok = maybe_auto_delete(X) + end || ExchangeName <- Exchanges], + ok. -delivery_key_for_type(fanout, Name, _RoutingKey) -> - {Name, fanout}; -delivery_key_for_type(_Type, Name, RoutingKey) -> - {Name, RoutingKey}. +indexed_delete(Match, ForwardsDeleteFun, ReverseDeleteFun) -> + [begin + ok = ReverseDeleteFun(reverse_route(Route)), + ok = ForwardsDeleteFun(Route) + end || Route <- mnesia:match_object(Match)], + ok. -call_with_exchange(Name, Fun) -> - case mnesia:wread({exchange, Name}) of - [] -> {error, not_found}; - [X] -> Fun(X) - end. +delete_forward_routes(Route) -> + ok = mnesia:delete_object(Route), + ok = mnesia:delete_object(durable_routes, Route, write). -make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) -> - #handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}. +exchanges_for_queue(QueueName) -> + MatchHead = reverse_route( + #route{binding = #binding{exchange_name = '$1', + queue_name = QueueName, + _ = '_'}}), + sets:to_list( + sets:from_list( + mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))). -add_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName, - routing_key = RoutingKey}, Q) -> - call_with_exchange( - ExchangeName, - fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> - internal_add_binding( - X, RoutingKey, make_handler(BindingSpec, Q)) - end +has_bindings(ExchangeName) -> + MatchHead = #route{binding = #binding{exchange_name = ExchangeName, + queue_name = '$1', + _ = '_'}}, + continue(mnesia:select(route, [{MatchHead, [], ['$1']}], 1, read)). + +continue('$end_of_table') -> false; +continue({[_|_], _}) -> true; +continue({[], Continuation}) -> continue(mnesia:select(Continuation)). + +call_with_exchange(Exchange, Fun) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> case mnesia:read({exchange, Exchange}) of + [] -> {error, exchange_not_found}; + [X] -> Fun(X) + end end). -delete_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName, - routing_key = RoutingKey}, Q) -> +call_with_exchange_and_queue(Exchange, Queue, Fun) -> call_with_exchange( - ExchangeName, - fun (X) -> ok = internal_delete_binding( - X, RoutingKey, make_handler(BindingSpec, Q)), - maybe_auto_delete(X) + Exchange, + fun(X) -> case mnesia:read({amqqueue, Queue}) of + [] -> {error, queue_not_found}; + [Q] -> Fun(X, Q) + end end). -%% Must run within a transaction. -maybe_auto_delete(#exchange{auto_delete = false}) -> - ok; -maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) -> - case internal_delete(ExchangeName, true) of - {error, in_use} -> ok; - ok -> ok - end. - -handlers_isempty([]) -> true; -handlers_isempty([_|_]) -> false. - -extend_handlers(Handlers, Handler) -> [Handler | Handlers]. - -delete_handler(Handlers, Handler) -> lists:delete(Handler, Handlers). - -handler_qpids(Handlers) -> - sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]). +add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> + call_with_exchange_and_queue( + ExchangeName, QueueName, + fun (X, Q) -> + if Q#amqqueue.durable and not(X#exchange.durable) -> + {error, durability_settings_incompatible}; + true -> ok = sync_binding( + ExchangeName, QueueName, RoutingKey, Arguments, + Q#amqqueue.durable, fun mnesia:write/3) + end + end). -%% Must run within a transaction. -internal_add_binding(#exchange{name = ExchangeName, type = Type}, - RoutingKey, Handler) -> - BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), - ok = add_handler_to_binding(BindingKey, Handler). +delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> + call_with_exchange_and_queue( + ExchangeName, QueueName, + fun (X, Q) -> + ok = sync_binding( + ExchangeName, QueueName, RoutingKey, Arguments, + Q#amqqueue.durable, fun mnesia:delete_object/3), + maybe_auto_delete(X) + end). -%% Must run within a transaction. -internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, Handler) -> - BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), - remove_handler_from_binding(BindingKey, Handler), +sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> + Binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = Arguments}, + ok = case Durable of + true -> Fun(durable_routes, #route{binding = Binding}, write); + false -> ok + end, + [ok, ok] = [Fun(element(1, R), R, write) || + R <- tuple_to_list(route_with_reverse(Binding))], ok. -%% Must run within a transaction. -add_handler_to_binding(BindingKey, Handler) -> - ok = case mnesia:wread({binding, BindingKey}) of - [] -> - ok = mnesia:write( - #binding{key = BindingKey, - handlers = extend_handlers( - empty_handlers(), Handler)}); - [B = #binding{handlers = H}] -> - ok = mnesia:write( - B#binding{handlers = extend_handlers(H, Handler)}) - end. - -%% Must run within a transaction. -remove_handler_from_binding(BindingKey, Handler) -> - case mnesia:wread({binding, BindingKey}) of - [] -> empty; - [B = #binding{handlers = H}] -> - H1 = delete_handler(H, Handler), - case handlers_isempty(H1) of - true -> - ok = mnesia:delete({binding, BindingKey}), - empty; - _ -> - ok = mnesia:write(B#binding{handlers = H1}), - not_empty - end - end. +route_with_reverse(#route{binding = Binding}) -> + route_with_reverse(Binding); +route_with_reverse(Binding = #binding{}) -> + Route = #route{binding = Binding}, + {Route, reverse_route(Route)}. + +reverse_route(#route{binding = Binding}) -> + #reverse_route{reverse_binding = reverse_binding(Binding)}; + +reverse_route(#reverse_route{reverse_binding = Binding}) -> + #route{binding = reverse_binding(Binding)}. + +reverse_binding(#reverse_binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}) -> + #binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}; + +reverse_binding(#binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}) -> + #reverse_binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}. split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), @@ -331,46 +385,50 @@ last_topic_match(P, R, []) -> last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). -delete(ExchangeName, IfUnused) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> internal_delete(ExchangeName, IfUnused) end). - -internal_delete(ExchangeName, _IfUnused = true) -> - Bindings = bindings_for_exchange(ExchangeName), - case Bindings of - [] -> do_internal_delete(ExchangeName, Bindings); - _ -> - case lists:all(fun (#binding{handlers = H}) -> handlers_isempty(H) end, - Bindings) of - true -> - %% There are no handlers anywhere in any of the - %% bindings for this exchange. - do_internal_delete(ExchangeName, Bindings); - false -> - %% There was at least one real handler - %% present. It's still in use. - {error, in_use} - end - end; -internal_delete(ExchangeName, false) -> - do_internal_delete(ExchangeName, bindings_for_exchange(ExchangeName)). - -forcibly_remove_handlers(Handlers) -> - lists:foreach( - fun (#handler{binding_spec = BindingSpec, queue = QueueName}) -> - ok = rabbit_amqqueue:binding_forcibly_removed( - BindingSpec, QueueName) - end, Handlers), +delete(ExchangeName, _IfUnused = true) -> + call_with_exchange(ExchangeName, fun conditional_delete/1); +delete(ExchangeName, _IfUnused = false) -> + call_with_exchange(ExchangeName, fun unconditional_delete/1). + +maybe_auto_delete(#exchange{auto_delete = false}) -> + ok; +maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> + conditional_delete(Exchange), ok. -do_internal_delete(ExchangeName, Bindings) -> - case mnesia:wread({exchange, ExchangeName}) of - [] -> {error, not_found}; - _ -> - lists:foreach(fun (#binding{key = K, handlers = H}) -> - ok = forcibly_remove_handlers(H), - ok = mnesia:delete({binding, K}) - end, Bindings), - ok = mnesia:delete({durable_exchanges, ExchangeName}), - ok = mnesia:delete({exchange, ExchangeName}) +conditional_delete(Exchange = #exchange{name = ExchangeName}) -> + case has_bindings(ExchangeName) of + false -> unconditional_delete(Exchange); + true -> {error, in_use} end. + +unconditional_delete(#exchange{name = ExchangeName}) -> + ok = delete_bindings_for_exchange(ExchangeName), + ok = mnesia:delete({durable_exchanges, ExchangeName}), + ok = mnesia:delete({exchange, ExchangeName}). + +%%---------------------------------------------------------------------------- +%% EXTENDED API +%% These are API calls that are not used by the server internally, +%% they are exported for embedded clients to use + +%% This is currently used in mod_rabbit.erl (XMPP) and expects this to +%% return {QueueName, RoutingKey, Arguments} tuples +list_exchange_bindings(ExchangeName) -> + Route = #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, + [{QueueName, RoutingKey, Arguments} || + #route{binding = #binding{queue_name = QueueName, + key = RoutingKey, + args = Arguments}} + <- mnesia:dirty_match_object(Route)]. + +% Refactoring is left as an exercise for the reader +list_queue_bindings(QueueName) -> + Route = #route{binding = #binding{queue_name = QueueName, + _ = '_'}}, + [{ExchangeName, RoutingKey, Arguments} || + #route{binding = #binding{exchange_name = ExchangeName, + key = RoutingKey, + args = Arguments}} + <- mnesia:dirty_match_object(Route)]. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3e4ed8f36f..7638af582d 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -34,7 +34,7 @@ -export([dirty_read/1]). -export([r/3, r/2, rs/1]). -export([enable_cover/0, report_cover/0]). --export([with_exit_handler/2]). +-export([throw_on_error/2, with_exit_handler/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). @@ -68,7 +68,8 @@ -spec(get_config/2 :: (atom(), A) -> A). -spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()). --spec(r/3 :: (vhost(), K, resource_name()) -> r(K) when is_subtype(K, atom())). +-spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> r(K) + when is_subtype(K, atom())). -spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(), kind :: K, name :: '_'} @@ -76,6 +77,8 @@ -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). -spec(report_cover/0 :: () -> 'ok'). +-spec(throw_on_error/2 :: + (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). -spec(with_user/2 :: (username(), thunk(A)) -> A). -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). @@ -197,11 +200,19 @@ report_coverage_percentage(File, Cov, NotCov, Mod) -> end, Mod]). +throw_on_error(E, Thunk) -> + case Thunk() of + {error, Reason} -> throw({E, Reason}); + {ok, Res} -> Res; + Res -> Res + end. + with_exit_handler(Handler, Thunk) -> try Thunk() catch - exit:{R, _} when R =:= noproc; R =:= normal -> Handler() + exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown -> + Handler() end. with_user(Username, Thunk) -> @@ -227,6 +238,7 @@ with_vhost(VHostPath, Thunk) -> with_user_and_vhost(Username, VHostPath, Thunk) -> with_user(Username, with_vhost(VHostPath, Thunk)). + execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 4ae367ba4b..9b67135def 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -105,7 +105,13 @@ table_definitions() -> {rabbit_config, [{disc_copies, [node()]}]}, {listener, [{type, bag}, {attributes, record_info(fields, listener)}]}, - {binding, [{attributes, record_info(fields, binding)}]}, + {durable_routes, [{disc_copies, [node()]}, + {record_name, route}, + {attributes, record_info(fields, route)}]}, + {route, [{type, ordered_set}, + {attributes, record_info(fields, route)}]}, + {reverse_route, [{type, ordered_set}, + {attributes, record_info(fields, reverse_route)}]}, {durable_exchanges, [{disc_copies, [node()]}, {record_name, exchange}, {attributes, record_info(fields, exchange)}]}, @@ -255,7 +261,7 @@ init_db(ClusterNodes) -> end. create_schema() -> - mnesia:stop(), + mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema), rabbit_misc:ensure_ok(mnesia:start(), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 7e68b3eddd..ce26c11a0b 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -166,14 +166,27 @@ teardown_profiling(Value) -> fprof:analyse([{dest, []}, {cols, 100}]) end. +inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). + +peername(Sock) -> + try + {Address, Port} = inet_op(fun () -> inet:peername(Sock) end), + AddressS = inet_parse:ntoa(Address), + {AddressS, Port} + catch + Ex -> rabbit_log:error("error on TCP connection ~p:~p~n", + [self(), Ex]), + rabbit_log:info("closing TCP connection ~p", [self()]), + exit(normal) + end. + start_connection(Parent, Deb, ClientSock) -> - ProfilingValue = setup_profiling(), process_flag(trap_exit, true), - {ok, {PeerAddress, PeerPort}} = inet:peername(ClientSock), - PeerAddressS = inet_parse:ntoa(PeerAddress), - rabbit_log:info("starting TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), + {PeerAddressS, PeerPort} = peername(ClientSock), + ProfilingValue = setup_profiling(), try + rabbit_log:info("starting TCP connection ~p from ~s:~p~n", + [self(), PeerAddressS, PeerPort]), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), mainloop(Parent, Deb, switch_callback( @@ -266,7 +279,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end. switch_callback(OldState, NewCallback, Length) -> - {ok, Ref} = prim_inet:async_recv(OldState#v1.sock, Length, -1), + Ref = inet_op(fun () -> prim_inet:async_recv( + OldState#v1.sock, Length, -1) end), OldState#v1{callback = NewCallback, recv_ref = Ref}. @@ -472,7 +486,10 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, end; handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = gen_tcp:send(Sock, <<"AMQP",1,1,?PROTOCOL_VERSION_MAJOR,?PROTOCOL_VERSION_MINOR>>), + ok = inet_op(fun () -> gen_tcp:send( + Sock, <<"AMQP",1,1, + ?PROTOCOL_VERSION_MAJOR, + ?PROTOCOL_VERSION_MINOR>>) end), throw({bad_header, Other}); handle_input(Callback, Data, _State) -> diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 41a8d64cb3..a233764766 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -150,11 +150,9 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) -> fun (QPid, {Routed, Handled}) -> case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate, Txn, Message, QPid) of - true -> {true, [QPid | Handled]}; - false -> {true, Handled}; - {'EXIT', Reason} -> rabbit_log:warning("delivery to ~p failed:~n~p~n", - [QPid, Reason]), - {Routed, Handled} + true -> {true, [QPid | Handled]}; + false -> {true, Handled}; + {'EXIT', _Reason} -> {Routed, Handled} end end, {false, []}, diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 0f6bca91bc..a2688625be 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -36,6 +36,8 @@ -record(wstate, {sock, channel, frame_max}). +-define(HIBERNATE_AFTER, 5000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -63,6 +65,8 @@ start(Sock, Channel, FrameMax) -> mainloop(State) -> receive Message -> ?MODULE:mainloop(handle_message(Message, State)) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, mainloop, [State]) end. handle_message({send_command, MethodRecord}, @@ -127,12 +131,16 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) -> Channel, Content, FrameMax), [MethodFrame | ContentFrames]. +tcp_send(Sock, Data) -> + rabbit_misc:throw_on_error(inet_error, + fun () -> gen_tcp:send(Sock, Data) end). + internal_send_command(Sock, Channel, MethodRecord) -> - ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord)). + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> - ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)). + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, + Content, FrameMax)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index 3943161a9f..dc38b5941c 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -58,9 +58,9 @@ init({IPAddress, Port, SocketOpts, AcceptorSup, [LSock]) end, lists:duplicate(ConcurrentAcceptorCount, dummy)), - error_logger:info_msg( - "started TCP listener on ~s:~p~n", - [inet_parse:ntoa(IPAddress), Port]), + {ok, {LIPAddress, LPort}} = inet:sockname(LSock), + error_logger:info_msg("started TCP listener on ~s:~p~n", + [inet_parse:ntoa(LIPAddress), LPort]), apply(M, F, A ++ [IPAddress, Port]), {ok, #state{sock=LSock, on_startup = OnStartup, on_shutdown = OnShutdown}}; |
